2  * ============LICENSE_START=======================================================
 
   3  * BBS-RELOCATION-CPE-AUTHENTICATION-HANDLER
 
   4  * ================================================================================
 
   5  * Copyright (C) 2019 NOKIA Intellectual Property. All rights reserved.
 
   6  * ================================================================================
 
   7  * Licensed under the Apache License, Version 2.0 (the "License");
 
   8  * you may not use this file except in compliance with the License.
 
   9  * You may obtain a copy of the License at
 
  11  *      http://www.apache.org/licenses/LICENSE-2.0
 
  13  * Unless required by applicable law or agreed to in writing, software
 
  14  * distributed under the License is distributed on an "AS IS" BASIS,
 
  15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
  16  * See the License for the specific language governing permissions and
 
  17  * limitations under the License.
 
  18  * ============LICENSE_END=========================================================
 
  21 package org.onap.bbs.event.processor.config;
 
  23 import com.google.gson.Gson;
 
  24 import com.google.gson.GsonBuilder;
 
  25 import com.google.gson.JsonElement;
 
  26 import com.google.gson.JsonObject;
 
  28 import java.time.Duration;
 
  29 import java.util.AbstractMap;
 
  30 import java.util.HashMap;
 
  31 import java.util.HashSet;
 
  35 import org.jetbrains.annotations.NotNull;
 
  36 import org.onap.bbs.event.processor.exceptions.ApplicationEnvironmentException;
 
  37 import org.onap.bbs.event.processor.model.GeneratedAppConfigObject;
 
  38 import org.onap.bbs.event.processor.model.ImmutableDmaapInfo;
 
  39 import org.onap.bbs.event.processor.model.ImmutableGeneratedAppConfigObject;
 
  40 import org.onap.bbs.event.processor.model.ImmutableStreamsObject;
 
  41 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory;
 
  42 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties;
 
  43 import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext;
 
  44 import org.slf4j.Logger;
 
  45 import org.slf4j.LoggerFactory;
 
  46 import org.springframework.beans.factory.annotation.Autowired;
 
  47 import org.springframework.stereotype.Component;
 
  49 import reactor.core.Disposable;
 
  52 public class ConsulConfigurationGateway {
 
  54     private static final Logger LOGGER = LoggerFactory.getLogger(ConsulConfigurationGateway.class);
 
  56     private static final String CONSUL_HOST = "CONSUL_HOST";
 
  57     private static final String CONFIG_BINDING_SERVICE = "CONFIG_BINDING_SERVICE";
 
  58     private static final String HOSTNAME = "HOSTNAME";
 
  60     private final ApplicationConfiguration configuration;
 
  62     private Disposable cbsFetchPipeline;
 
  65     ConsulConfigurationGateway(ApplicationConfiguration configuration) {
 
  66         this.configuration = configuration;
 
  67         gson = new GsonBuilder().setPrettyPrinting().create();
 
  71      * Periodically fetch application configuration via CBS service of DCAE.
 
  72      * @param initialDelay initial delay before initiation of polling
 
  73      * @param period polling interval
 
  75     public void periodicallyFetchConfigFromCbs(Duration initialDelay, Duration period) {
 
  76         if (environmentNotReady()) {
 
  77             throw new ApplicationEnvironmentException(
 
  78                     String.format("Application Environment missing critical parameters: %s",
 
  79                             getMissingEnvironmentVariables()));
 
  82         fetchConfig(initialDelay, period);
 
  85     boolean environmentNotReady() {
 
  86         String consulHost = System.getenv().get(CONSUL_HOST);
 
  87         String cbs = System.getenv().get(CONFIG_BINDING_SERVICE);
 
  88         String hostname = System.getenv().get(HOSTNAME);
 
  89         return consulHost == null || cbs == null || hostname == null;
 
  93      * Reschedule application configuration periodic retrieval via CBS service of DCAE.
 
  94      * @param initialDelay initial delay before rescheduling
 
  95      * @param period new polling interval
 
  97     public void rescheduleCbsConfigurationRetrieval(Duration initialDelay, Duration period) {
 
  98         if (cbsFetchPipeline != null && !cbsFetchPipeline.isDisposed()) {
 
  99             LOGGER.info("Disposing old CBS Config fetch job");
 
 100             cbsFetchPipeline.dispose();
 
 102         periodicallyFetchConfigFromCbs(initialDelay, period);
 
 105     private void parseConsulRetrievedConfiguration(JsonObject jsonObject) {
 
 107         GeneratedAppConfigObject generatedAppConfigObject = generateAppConfigObject(jsonObject);
 
 108         LOGGER.trace("Consul-Retrieved Application Generated Object:\n{}", generatedAppConfigObject);
 
 109         configuration.updateCurrentConfiguration(generatedAppConfigObject);
 
 112     private void handleErrors(Throwable throwable) {
 
 113         LOGGER.error("Periodic CBS configuration polling was terminated with error: {}", throwable);
 
 114         LOGGER.info("Will restart CBS configuration fetching job due to abnormal termination."
 
 115                 + " Will start fetching after 60 seconds (please correct configuration in the meantime)"
 
 116                 + " and it will then poll every {} seconds (reverting to default)",
 
 117                 configuration.getCbsPollingInterval());
 
 118         fetchConfig(Duration.ofSeconds(60), Duration.ofSeconds(configuration.getCbsPollingInterval()));
 
 121     private void fetchConfig(Duration initialDelay, Duration period) {
 
 122         RequestDiagnosticContext diagnosticContext = RequestDiagnosticContext.create();
 
 124         // Necessary properties from the environment (Consul host:port, service-name (hostname), CBS name)
 
 125         EnvProperties env = EnvProperties.fromEnvironment();
 
 127         // Create the client and use it to get the configuration
 
 128         cbsFetchPipeline = CbsClientFactory.createCbsClient(env)
 
 129                 .doOnError(e -> LOGGER.warn("CBS Configuration fetch failed with error: {}", e))
 
 131                 .flatMapMany(cbsClient -> cbsClient.updates(diagnosticContext, initialDelay, period))
 
 132                 .subscribe(this::parseConsulRetrievedConfiguration, this::handleErrors);
 
 136     GeneratedAppConfigObject generateAppConfigObject(JsonObject configObject) {
 
 138         if (LOGGER.isInfoEnabled()) {
 
 139             String configAsString = gson.toJson(configObject);
 
 140             LOGGER.info("Received App Config object\n{}", configAsString);
 
 143         final String dmaapProtocol = configObject.get("dmaap.protocol").getAsString();
 
 144         final String dmaapContentType  = configObject.get("dmaap.contentType").getAsString();
 
 145         final String dmaapConsumerId  = configObject.get("dmaap.consumer.consumerId").getAsString();
 
 146         final String dmaapConsumerGroup = configObject.get("dmaap.consumer.consumerGroup").getAsString();
 
 147         final int dmaapMessageLimit  = configObject.get("dmaap.messageLimit").getAsInt();
 
 148         final int dmaapTimeoutMs  = configObject.get("dmaap.timeoutMs").getAsInt();
 
 150         final String aaiHost = configObject.get("aai.host").getAsString();
 
 151         final int aaiPort = configObject.get("aai.port").getAsInt();
 
 152         final String aaiProtocol = configObject.get("aai.protocol").getAsString();
 
 153         final String aaiUsername = configObject.get("aai.username").getAsString();
 
 154         final String aaiPassword = configObject.get("aai.password").getAsString();
 
 155         final boolean aaiIgnoreSslCertificateErrors =
 
 156                 configObject.get("aai.aaiIgnoreSslCertificateErrors").getAsBoolean();
 
 158         final int pipelinesPollingIntervalSec = configObject.get("application.pipelinesPollingIntervalSec").getAsInt();
 
 159         final int pipelinesTimeoutSec = configObject.get("application.pipelinesTimeoutSec").getAsInt();
 
 160         final int cbsPollingIntervalSec = configObject.get("application.cbsPollingIntervalSec").getAsInt();
 
 162         final String reRegPolicyScope = configObject.get("application.reregistration.policyScope").getAsString();
 
 163         final String reRegClControlName = configObject.get("application.reregistration.clControlName").getAsString();
 
 164         final String cpeAuthPolicyScope = configObject.get("application.cpe.authentication.policyScope").getAsString();
 
 165         final String cpeAuthClControlName =
 
 166                 configObject.get("application.cpe.authentication.clControlName").getAsString();
 
 168         final String policyVersion = configObject.get("application.policyVersion").getAsString();
 
 169         final String closeLoopTargetType = configObject.get("application.clTargetType").getAsString();
 
 170         final String closeLoopEventStatus = configObject.get("application.clEventStatus").getAsString();
 
 171         final String closeLoopVersion = configObject.get("application.clVersion").getAsString();
 
 172         final String closeLoopTarget = configObject.get("application.clTarget").getAsString();
 
 173         final String closeLoopOriginator = configObject.get("application.clOriginator").getAsString();
 
 175         final String reRegConfigKey = configObject.get("application.reregistration.configKey").getAsString();
 
 176         final String cpeAuthConfigKey = configObject.get("application.cpeAuth.configKey").getAsString();
 
 177         final String closeLoopConfigKey = configObject.get("application.closeLoop.configKey").getAsString();
 
 179         final String loggingLevel = configObject.get("application.loggingLevel").getAsString();
 
 181         final JsonObject streamsPublishes = configObject.getAsJsonObject("streams_publishes");
 
 182         final JsonObject streamsSubscribes = configObject.getAsJsonObject("streams_subscribes");
 
 184         return ImmutableGeneratedAppConfigObject.builder()
 
 185                 .dmaapProtocol(dmaapProtocol)
 
 186                 .dmaapContentType(dmaapContentType)
 
 187                 .dmaapConsumerConsumerId(dmaapConsumerId)
 
 188                 .dmaapConsumerConsumerGroup(dmaapConsumerGroup)
 
 189                 .dmaapMessageLimit(dmaapMessageLimit)
 
 190                 .dmaapTimeoutMs(dmaapTimeoutMs)
 
 193                 .aaiProtocol(aaiProtocol)
 
 194                 .aaiUsername(aaiUsername)
 
 195                 .aaiPassword(aaiPassword)
 
 196                 .aaiIgnoreSslCertificateErrors(aaiIgnoreSslCertificateErrors)
 
 197                 .pipelinesPollingIntervalSec(pipelinesPollingIntervalSec)
 
 198                 .pipelinesTimeoutSec(pipelinesTimeoutSec)
 
 199                 .cbsPollingIntervalSec(cbsPollingIntervalSec)
 
 200                 .reRegistrationPolicyScope(reRegPolicyScope)
 
 201                 .reRegistrationClControlName(reRegClControlName)
 
 202                 .policyVersion(policyVersion)
 
 203                 .closeLoopTargetType(closeLoopTargetType)
 
 204                 .closeLoopEventStatus(closeLoopEventStatus)
 
 205                 .closeLoopVersion(closeLoopVersion)
 
 206                 .closeLoopTarget(closeLoopTarget)
 
 207                 .closeLoopOriginator(closeLoopOriginator)
 
 208                 .cpeAuthPolicyScope(cpeAuthPolicyScope)
 
 209                 .cpeAuthClControlName(cpeAuthClControlName)
 
 210                 .reRegConfigKey(reRegConfigKey)
 
 211                 .cpeAuthConfigKey(cpeAuthConfigKey)
 
 212                 .closeLoopConfigKey(closeLoopConfigKey)
 
 213                 .loggingLevel(loggingLevel)
 
 214                 .streamSubscribesMap(parseStreamsObjects(streamsSubscribes))
 
 215                 .streamPublishesMap(parseStreamsObjects(streamsPublishes))
 
 219     private Set<String> getMissingEnvironmentVariables() {
 
 220         Set<String> missingVars = new HashSet<>();
 
 221         if (System.getenv().get(CONSUL_HOST) == null) {
 
 222             missingVars.add(CONSUL_HOST);
 
 224         if (System.getenv().get(CONFIG_BINDING_SERVICE) == null) {
 
 225             missingVars.add(CONFIG_BINDING_SERVICE);
 
 227         if (System.getenv().get(HOSTNAME) == null) {
 
 228             missingVars.add(HOSTNAME);
 
 233     private Map<String, GeneratedAppConfigObject.StreamsObject> parseStreamsObjects(
 
 234             JsonObject jsonObject) {
 
 235         Map<String, GeneratedAppConfigObject.StreamsObject> streams = new HashMap<>();
 
 237         jsonObject.entrySet().stream()
 
 238                 .map(this::parseStreamsSingleObject)
 
 239                 .forEach(e -> streams.put(e.getKey(), e.getValue()));
 
 244     private Map.Entry<String, GeneratedAppConfigObject.StreamsObject> parseStreamsSingleObject(
 
 245             Map.Entry<String, JsonElement> jsonEntry) {
 
 247         JsonObject closeLoopOutput = (JsonObject) jsonEntry.getValue();
 
 249         String type = closeLoopOutput.get("type").getAsString();
 
 250         String aafUsername = closeLoopOutput.get("aaf_username") != null
 
 251                 ? closeLoopOutput.get("aaf_username").getAsString() : "";
 
 252         String aafPassword = closeLoopOutput.get("aaf_password") != null
 
 253                 ? closeLoopOutput.get("aaf_password").getAsString() : "";
 
 255         JsonObject dmaapInfo = closeLoopOutput.getAsJsonObject("dmaap_info");
 
 256         String clientId = dmaapInfo.get("client_id") != null
 
 257                 ? dmaapInfo.get("client_id").getAsString() : "";
 
 258         String clientRole = dmaapInfo.get("client_role") != null
 
 259                 ? dmaapInfo.get("client_role").getAsString() : "";
 
 260         String location = dmaapInfo.get("location") != null
 
 261                 ? dmaapInfo.get("location").getAsString() : "";
 
 262         String topicUrl = dmaapInfo.get("topic_url").getAsString();
 
 264         GeneratedAppConfigObject.DmaapInfo dmaapInfoObject = ImmutableDmaapInfo.builder()
 
 266                 .clientRole(clientRole)
 
 270         GeneratedAppConfigObject.StreamsObject streamsObject = ImmutableStreamsObject.builder()
 
 272                 .aafUsername(aafUsername)
 
 273                 .aafPassword(aafPassword)
 
 274                 .dmaapInfo(dmaapInfoObject)
 
 277         return new AbstractMap.SimpleEntry<>(jsonEntry.getKey(), streamsObject);