/*
 * ============LICENSE_START=======================================================
 * BBS-RELOCATION-CPE-AUTHENTICATION-HANDLER
 * ================================================================================
 * Copyright (C) 2019 NOKIA Intellectual Property. All rights reserved.
 * ================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * ============LICENSE_END=========================================================
 */
 
  21 package org.onap.bbs.event.processor.config;
 
  23 import com.google.gson.Gson;
 
  24 import com.google.gson.GsonBuilder;
 
  25 import com.google.gson.JsonElement;
 
  26 import com.google.gson.JsonObject;
 
  28 import java.time.Duration;
 
  29 import java.util.AbstractMap;
 
  30 import java.util.HashMap;
 
  31 import java.util.HashSet;
 
  35 import org.jetbrains.annotations.NotNull;
 
  36 import org.onap.bbs.event.processor.exceptions.ApplicationEnvironmentException;
 
  37 import org.onap.bbs.event.processor.model.GeneratedAppConfigObject;
 
  38 import org.onap.bbs.event.processor.model.ImmutableDmaapInfo;
 
  39 import org.onap.bbs.event.processor.model.ImmutableGeneratedAppConfigObject;
 
  40 import org.onap.bbs.event.processor.model.ImmutableStreamsObject;
 
  41 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory;
 
  42 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties;
 
  43 import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext;
 
  44 import org.slf4j.Logger;
 
  45 import org.slf4j.LoggerFactory;
 
  46 import org.springframework.beans.factory.annotation.Autowired;
 
  47 import org.springframework.stereotype.Component;
 
  49 import reactor.core.Disposable;
 
  52 public class ConsulConfigurationGateway {
 
  54     private static final Logger LOGGER = LoggerFactory.getLogger(ConsulConfigurationGateway.class);
 
  56     private static final String CONSUL_HOST = "CONSUL_HOST";
 
  57     private static final String CONFIG_BINDING_SERVICE = "CONFIG_BINDING_SERVICE";
 
  58     private static final String HOSTNAME = "HOSTNAME";
 
  60     private final ApplicationConfiguration configuration;
 
  62     private Disposable cbsFetchPipeline;
 
  65     ConsulConfigurationGateway(ApplicationConfiguration configuration) {
 
  66         this.configuration = configuration;
 
  67         gson = new GsonBuilder().setPrettyPrinting().create();
 
  71      * Periodically fetch application configuration via CBS service of DCAE.
 
  72      * @param initialDelay initial delay before initiation of polling
 
  73      * @param period polling interval
 
  75     public void periodicallyFetchConfigFromCbs(Duration initialDelay, Duration period) {
 
  76         if (environmentNotReady()) {
 
  77             throw new ApplicationEnvironmentException(
 
  78                     String.format("Application Environment missing critical parameters: %s",
 
  79                             getMissingEnvironmentVariables()));
 
  82         RequestDiagnosticContext diagnosticContext = RequestDiagnosticContext.create();
 
  84         // Necessary properties from the environment (Consul host:port, service-name (hostname), CBS name)
 
  85         EnvProperties env = EnvProperties.fromEnvironment();
 
  87         // Create the client and use it to get the configuration
 
  88         cbsFetchPipeline = CbsClientFactory.createCbsClient(env)
 
  89                 .doOnError(e -> LOGGER.warn("CBS Configuration fetch failed with error: {}", e))
 
  91                 .flatMapMany(cbsClient -> cbsClient.updates(diagnosticContext, initialDelay, period))
 
  92                 .subscribe(this::parseConsulRetrievedConfiguration, this::handleErrors);
 
  95     boolean environmentNotReady() {
 
  96         String consulHost = System.getenv().get(CONSUL_HOST);
 
  97         String cbs = System.getenv().get(CONFIG_BINDING_SERVICE);
 
  98         String hostname = System.getenv().get(HOSTNAME);
 
  99         return consulHost == null || cbs == null || hostname == null;
 
 103      * Reschedule application configuration periodic retrieval via CBS service of DCAE.
 
 104      * @param initialDelay initial delay before rescheduling
 
 105      * @param period new polling interval
 
 107     public void rescheduleCbsConfigurationRetrieval(Duration initialDelay, Duration period) {
 
 108         if (cbsFetchPipeline != null && !cbsFetchPipeline.isDisposed()) {
 
 109             LOGGER.info("Disposing old CBS Config fetch job");
 
 110             cbsFetchPipeline.dispose();
 
 112         periodicallyFetchConfigFromCbs(initialDelay, period);
 
 115     private void parseConsulRetrievedConfiguration(JsonObject jsonObject) {
 
 117         GeneratedAppConfigObject generatedAppConfigObject = generateAppConfigObject(jsonObject);
 
 118         LOGGER.trace("Consul-Retrieved Application Generated Object:\n{}", generatedAppConfigObject);
 
 119         configuration.updateCurrentConfiguration(generatedAppConfigObject);
 
 122     private void handleErrors(Throwable throwable) {
 
 123         LOGGER.error("Periodic CBS configuration polling was terminated with error: {}", throwable);
 
 127     GeneratedAppConfigObject generateAppConfigObject(JsonObject configObject) {
 
 129         if (LOGGER.isInfoEnabled()) {
 
 130             String configAsString = gson.toJson(configObject);
 
 131             LOGGER.info("Received App Config object\n{}", configAsString);
 
 134         final String dmaapProtocol = configObject.get("dmaap.protocol").getAsString();
 
 135         final String dmaapContentType  = configObject.get("dmaap.contentType").getAsString();
 
 136         final String dmaapConsumerId  = configObject.get("dmaap.consumer.consumerId").getAsString();
 
 137         final String dmaapConsumerGroup = configObject.get("dmaap.consumer.consumerGroup").getAsString();
 
 138         final int dmaapMessageLimit  = configObject.get("dmaap.messageLimit").getAsInt();
 
 139         final int dmaapTimeoutMs  = configObject.get("dmaap.timeoutMs").getAsInt();
 
 141         final String aaiHost = configObject.get("aai.host").getAsString();
 
 142         final int aaiPort = configObject.get("aai.port").getAsInt();
 
 143         final String aaiProtocol = configObject.get("aai.protocol").getAsString();
 
 144         final String aaiUsername = configObject.get("aai.username").getAsString();
 
 145         final String aaiPassword = configObject.get("aai.password").getAsString();
 
 146         final boolean aaiIgnoreSslCertificateErrors =
 
 147                 configObject.get("aai.aaiIgnoreSslCertificateErrors").getAsBoolean();
 
 149         final int pipelinesPollingIntervalSec = configObject.get("application.pipelinesPollingIntervalSec").getAsInt();
 
 150         final int pipelinesTimeoutSec = configObject.get("application.pipelinesTimeoutSec").getAsInt();
 
 151         final int cbsPollingIntervalSec = configObject.get("application.cbsPollingIntervalSec").getAsInt();
 
 152         final String reRegPolicyScope = configObject.get("application.reregistration.policyScope").getAsString();
 
 153         final String reRegClControlName = configObject.get("application.reregistration.clControlName").getAsString();
 
 154         final String cpeAuthPolicyScope = configObject.get("application.cpe.authentication.policyScope").getAsString();
 
 155         final String cpeAuthClControlName =
 
 156                 configObject.get("application.cpe.authentication.clControlName").getAsString();
 
 157         final String reRegConfigKey = configObject.get("application.reregistration.configKey").getAsString();
 
 158         final String cpeAuthConfigKey = configObject.get("application.cpeAuth.configKey").getAsString();
 
 159         final String closeLoopConfigKey = configObject.get("application.closeLoop.configKey").getAsString();
 
 161         final JsonObject streamsPublishes = configObject.getAsJsonObject("streams_publishes");
 
 162         final JsonObject streamsSubscribes = configObject.getAsJsonObject("streams_subscribes");
 
 164         return ImmutableGeneratedAppConfigObject.builder()
 
 165                 .dmaapProtocol(dmaapProtocol)
 
 166                 .dmaapContentType(dmaapContentType)
 
 167                 .dmaapConsumerConsumerId(dmaapConsumerId)
 
 168                 .dmaapConsumerConsumerGroup(dmaapConsumerGroup)
 
 169                 .dmaapMessageLimit(dmaapMessageLimit)
 
 170                 .dmaapTimeoutMs(dmaapTimeoutMs)
 
 173                 .aaiProtocol(aaiProtocol)
 
 174                 .aaiUsername(aaiUsername)
 
 175                 .aaiPassword(aaiPassword)
 
 176                 .aaiIgnoreSslCertificateErrors(aaiIgnoreSslCertificateErrors)
 
 177                 .pipelinesPollingIntervalSec(pipelinesPollingIntervalSec)
 
 178                 .pipelinesTimeoutSec(pipelinesTimeoutSec)
 
 179                 .cbsPollingIntervalSec(cbsPollingIntervalSec)
 
 180                 .reRegistrationPolicyScope(reRegPolicyScope)
 
 181                 .reRegistrationClControlName(reRegClControlName)
 
 182                 .cpeAuthPolicyScope(cpeAuthPolicyScope)
 
 183                 .cpeAuthClControlName(cpeAuthClControlName)
 
 184                 .reRegConfigKey(reRegConfigKey)
 
 185                 .cpeAuthConfigKey(cpeAuthConfigKey)
 
 186                 .closeLoopConfigKey(closeLoopConfigKey)
 
 187                 .streamSubscribesMap(parseStreamsObjects(streamsSubscribes))
 
 188                 .streamPublishesMap(parseStreamsObjects(streamsPublishes))
 
 192     private Set<String> getMissingEnvironmentVariables() {
 
 193         Set<String> missingVars = new HashSet<>();
 
 194         if (System.getenv().get(CONSUL_HOST) == null) {
 
 195             missingVars.add(CONSUL_HOST);
 
 197         if (System.getenv().get(CONFIG_BINDING_SERVICE) == null) {
 
 198             missingVars.add(CONFIG_BINDING_SERVICE);
 
 200         if (System.getenv().get(HOSTNAME) == null) {
 
 201             missingVars.add(HOSTNAME);
 
 206     private Map<String, GeneratedAppConfigObject.StreamsObject> parseStreamsObjects(
 
 207             JsonObject jsonObject) {
 
 208         Map<String, GeneratedAppConfigObject.StreamsObject> streams = new HashMap<>();
 
 210         jsonObject.entrySet().stream()
 
 211                 .map(this::parseStreamsSingleObject)
 
 212                 .forEach(e -> streams.put(e.getKey(), e.getValue()));
 
 217     private Map.Entry<String, GeneratedAppConfigObject.StreamsObject> parseStreamsSingleObject(
 
 218             Map.Entry<String, JsonElement> jsonEntry) {
 
 220         JsonObject closeLoopOutput = (JsonObject) jsonEntry.getValue();
 
 222         String type = closeLoopOutput.get("type").getAsString();
 
 223         String aafUsername = closeLoopOutput.get("aaf_username") != null
 
 224                 ? closeLoopOutput.get("aaf_username").getAsString() : "";
 
 225         String aafPassword = closeLoopOutput.get("aaf_password") != null
 
 226                 ? closeLoopOutput.get("aaf_password").getAsString() : "";
 
 228         JsonObject dmaapInfo = closeLoopOutput.getAsJsonObject("dmaap_info");
 
 229         String clientId = dmaapInfo.get("client_id") != null
 
 230                 ? dmaapInfo.get("client_id").getAsString() : "";
 
 231         String clientRole = dmaapInfo.get("client_role") != null
 
 232                 ? dmaapInfo.get("client_role").getAsString() : "";
 
 233         String location = dmaapInfo.get("location") != null
 
 234                 ? dmaapInfo.get("location").getAsString() : "";
 
 235         String topicUrl = dmaapInfo.get("topic_url").getAsString();
 
 237         GeneratedAppConfigObject.DmaapInfo dmaapInfoObject = ImmutableDmaapInfo.builder()
 
 239                 .clientRole(clientRole)
 
 243         GeneratedAppConfigObject.StreamsObject streamsObject = ImmutableStreamsObject.builder()
 
 245                 .aafUsername(aafUsername)
 
 246                 .aafPassword(aafPassword)
 
 247                 .dmaapInfo(dmaapInfoObject)
 
 250         return new AbstractMap.SimpleEntry<>(jsonEntry.getKey(), streamsObject);