2  * ============LICENSE_START=======================================================
 
   3  * BBS-RELOCATION-CPE-AUTHENTICATION-HANDLER
 
   4  * ================================================================================
 
   5  * Copyright (C) 2019 NOKIA Intellectual Property. All rights reserved.
 
   6  * ================================================================================
 
   7  * Licensed under the Apache License, Version 2.0 (the "License");
 
   8  * you may not use this file except in compliance with the License.
 
   9  * You may obtain a copy of the License at
 
  11  *      http://www.apache.org/licenses/LICENSE-2.0
 
  13  * Unless required by applicable law or agreed to in writing, software
 
  14  * distributed under the License is distributed on an "AS IS" BASIS,
 
  15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
  16  * See the License for the specific language governing permissions and
 
  17  * limitations under the License.
 
  18  * ============LICENSE_END=========================================================
 
  21 package org.onap.bbs.event.processor.config;
 
  23 import com.google.gson.Gson;
 
  24 import com.google.gson.GsonBuilder;
 
  25 import com.google.gson.JsonElement;
 
  26 import com.google.gson.JsonObject;
 
  28 import java.time.Duration;
 
  29 import java.util.AbstractMap;
 
  30 import java.util.HashMap;
 
  31 import java.util.HashSet;
 
  35 import org.jetbrains.annotations.NotNull;
 
  36 import org.onap.bbs.event.processor.exceptions.ApplicationEnvironmentException;
 
  37 import org.onap.bbs.event.processor.model.GeneratedAppConfigObject;
 
  38 import org.onap.bbs.event.processor.model.ImmutableDmaapInfo;
 
  39 import org.onap.bbs.event.processor.model.ImmutableGeneratedAppConfigObject;
 
  40 import org.onap.bbs.event.processor.model.ImmutableStreamsObject;
 
  41 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory;
 
  42 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests;
 
  43 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsRequest;
 
  44 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties;
 
  45 import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext;
 
  46 import org.slf4j.Logger;
 
  47 import org.slf4j.LoggerFactory;
 
  48 import org.springframework.beans.factory.annotation.Autowired;
 
  49 import org.springframework.stereotype.Component;
 
  51 import reactor.core.Disposable;
 
  54 public class ConsulConfigurationGateway {
     // Polls the CBS service for the application configuration and hands each parsed
     // result to the ApplicationConfiguration instance supplied at construction time.

  56     private static final Logger LOGGER = LoggerFactory.getLogger(ConsulConfigurationGateway.class);

         // Names of the environment variables that must all be set before polling is started
         // (checked by environmentNotReady / getMissingEnvironmentVariables).
  58     private static final String CONSUL_HOST = "CONSUL_HOST";

  59     private static final String CONFIG_BINDING_SERVICE = "CONFIG_BINDING_SERVICE";

  60     private static final String HOSTNAME = "HOSTNAME";

         // Receives every configuration object produced from a CBS update.
  62     private final ApplicationConfiguration configuration;

         // Handle of the current CBS subscription; disposed when polling is rescheduled.
  64     private Disposable cbsFetchPipeline;
 
  67     ConsulConfigurationGateway(ApplicationConfiguration configuration) {
 
  68         this.configuration = configuration;
 
  69         gson = new GsonBuilder().setPrettyPrinting().create();
 
  73      * Periodically fetch application configuration via CBS service of DCAE.
 
  74      * @param initialDelay initial delay before initiation of polling
 
  75      * @param period polling interval
 
  77     public void periodicallyFetchConfigFromCbs(Duration initialDelay, Duration period) {
 
  78         if (environmentNotReady()) {
 
  79             throw new ApplicationEnvironmentException(
 
  80                     String.format("Application Environment missing critical parameters: %s",
 
  81                             getMissingEnvironmentVariables()));
 
  84         fetchConfig(initialDelay, period);
 
  87     boolean environmentNotReady() {
 
  88         String consulHost = System.getenv().get(CONSUL_HOST);
 
  89         String cbs = System.getenv().get(CONFIG_BINDING_SERVICE);
 
  90         String hostname = System.getenv().get(HOSTNAME);
 
  91         return consulHost == null || cbs == null || hostname == null;
 
  95      * Reschedule application configuration periodic retrieval via CBS service of DCAE.
 
  96      * @param initialDelay initial delay before rescheduling
 
  97      * @param period new polling interval
 
  99     public void rescheduleCbsConfigurationRetrieval(Duration initialDelay, Duration period) {
 
 100         if (cbsFetchPipeline != null && !cbsFetchPipeline.isDisposed()) {
 
 101             LOGGER.info("Disposing old CBS Config fetch job");
 
 102             cbsFetchPipeline.dispose();
 
 104         periodicallyFetchConfigFromCbs(initialDelay, period);
 
 107     private void parseConsulRetrievedConfiguration(JsonObject jsonObject) {
 
 109         GeneratedAppConfigObject generatedAppConfigObject = generateAppConfigObject(jsonObject);
 
 110         LOGGER.trace("Consul-Retrieved Application Generated Object:\n{}", generatedAppConfigObject);
 
 111         configuration.updateCurrentConfiguration(generatedAppConfigObject);
 
 114     private void handleErrors(Throwable throwable) {
 
 115         LOGGER.error("Periodic CBS configuration polling was terminated with error: {}", throwable);
 
 116         LOGGER.info("Will restart CBS configuration fetching job due to abnormal termination."
 
 117                 + " Will start fetching after 60 seconds (please correct configuration in the meantime)"
 
 118                 + " and it will then poll every {} seconds (reverting to default)",
 
 119                 configuration.getCbsPollingInterval());
 
 120         fetchConfig(Duration.ofSeconds(60), Duration.ofSeconds(configuration.getCbsPollingInterval()));
 
 123     private void fetchConfig(Duration initialDelay, Duration period) {
         // Subscribes to CBS configuration updates and stores the subscription handle in
         // cbsFetchPipeline. Each update is passed to parseConsulRetrievedConfiguration and a
         // terminal error is passed to handleErrors.

 124         RequestDiagnosticContext diagnosticContext = RequestDiagnosticContext.create();

 126         // Necessary properties from the environment (Consul host:port, service-name (hostname), CBS name)

 127         EnvProperties env = EnvProperties.fromEnvironment();

 128         CbsRequest cbsRequest = CbsRequests.getConfiguration(diagnosticContext);

 129         // Create the client and use it to get the configuration

 130         cbsFetchPipeline = CbsClientFactory.createCbsClient(env)

                 // NOTE(review): the only argument is the Throwable itself; loggers usually treat
                 // a trailing Throwable as the exception instead of filling the placeholder -
                 // confirm the message renders as intended.
 131                 .doOnError(e -> LOGGER.warn("CBS Configuration fetch failed with error: {}", e))

                 // NOTE(review): the original numbering skips a line here, so a pipeline step
                 // may be missing from this view - verify against the full file.
 133                 .flatMapMany(cbsClient -> cbsClient.updates(cbsRequest, initialDelay, period))

 134                 .subscribe(this::parseConsulRetrievedConfiguration, this::handleErrors);
 
 138     GeneratedAppConfigObject generateAppConfigObject(JsonObject configObject) {
         // Maps the flat JSON configuration object onto a GeneratedAppConfigObject built with
         // ImmutableGeneratedAppConfigObject.builder().
         // None of the scalar keys below is null-checked (unlike the optional fields handled in
         // parseStreamsSingleObject), so presumably a missing key fails here - TODO confirm.

 140         if (LOGGER.isInfoEnabled()) {

                 // The JSON text is produced only when INFO logging is enabled.
 141             String configAsString = gson.toJson(configObject);

 142             LOGGER.info("Received App Config object\n{}", configAsString);

             // DMaaP settings
 145         final String dmaapProtocol = configObject.get("dmaap.protocol").getAsString();

 146         final String dmaapContentType  = configObject.get("dmaap.contentType").getAsString();

 147         final String dmaapConsumerId  = configObject.get("dmaap.consumer.consumerId").getAsString();

 148         final String dmaapConsumerGroup = configObject.get("dmaap.consumer.consumerGroup").getAsString();

 149         final int dmaapMessageLimit  = configObject.get("dmaap.messageLimit").getAsInt();

 150         final int dmaapTimeoutMs  = configObject.get("dmaap.timeoutMs").getAsInt();

             // AAI connection settings
 152         final String aaiHost = configObject.get("aai.host").getAsString();

 153         final int aaiPort = configObject.get("aai.port").getAsInt();

 154         final String aaiProtocol = configObject.get("aai.protocol").getAsString();

 155         final String aaiUsername = configObject.get("aai.username").getAsString();

 156         final String aaiPassword = configObject.get("aai.password").getAsString();

 157         final boolean aaiIgnoreSslCertificateErrors =

 158                 configObject.get("aai.aaiIgnoreSslCertificateErrors").getAsBoolean();

             // Polling and timeout intervals, all in seconds
 160         final int pipelinesPollingIntervalSec = configObject.get("application.pipelinesPollingIntervalSec").getAsInt();

 161         final int pipelinesTimeoutSec = configObject.get("application.pipelinesTimeoutSec").getAsInt();

 162         final int cbsPollingIntervalSec = configObject.get("application.cbsPollingIntervalSec").getAsInt();

             // Policy scope and control-loop name per use case (re-registration, CPE authentication)
 164         final String reRegPolicyScope = configObject.get("application.reregistration.policyScope").getAsString();

 165         final String reRegClControlName = configObject.get("application.reregistration.clControlName").getAsString();

 166         final String cpeAuthPolicyScope = configObject.get("application.cpe.authentication.policyScope").getAsString();

 167         final String cpeAuthClControlName =

 168                 configObject.get("application.cpe.authentication.clControlName").getAsString();

             // Policy version and close-loop attributes
 170         final String policyVersion = configObject.get("application.policyVersion").getAsString();

 171         final String closeLoopTargetType = configObject.get("application.clTargetType").getAsString();

 172         final String closeLoopEventStatus = configObject.get("application.clEventStatus").getAsString();

 173         final String closeLoopVersion = configObject.get("application.clVersion").getAsString();

 174         final String closeLoopTarget = configObject.get("application.clTarget").getAsString();

 175         final String closeLoopOriginator = configObject.get("application.clOriginator").getAsString();

             // Configuration keys
 177         final String reRegConfigKey = configObject.get("application.reregistration.configKey").getAsString();

 178         final String cpeAuthConfigKey = configObject.get("application.cpeAuth.configKey").getAsString();

 179         final String closeLoopConfigKey = configObject.get("application.closeLoop.configKey").getAsString();

 181         final String loggingLevel = configObject.get("application.loggingLevel").getAsString();

             // SSL store paths and certificate-authentication switches
 183         final String keyStorePath = configObject.get("application.ssl.keyStorePath").getAsString();

 184         final String keyStorePasswordPath = configObject.get("application.ssl.keyStorePasswordPath").getAsString();

 185         final String trustStorePath = configObject.get("application.ssl.trustStorePath").getAsString();

 186         final String trustStorePasswordPath = configObject.get("application.ssl.trustStorePasswordPath").getAsString();

 187         final boolean aaiEnableCertAuth = configObject.get("application.ssl.enableAaiCertAuth").getAsBoolean();

 188         final boolean dmaapEnableCertAuth = configObject.get("application.ssl.enableDmaapCertAuth").getAsBoolean();

             // Nested stream definitions, converted by parseStreamsObjects in the builder chain below
 190         final JsonObject streamsPublishes = configObject.getAsJsonObject("streams_publishes");

 191         final JsonObject streamsSubscribes = configObject.getAsJsonObject("streams_subscribes");

             // NOTE(review): aaiHost and aaiPort are read above but no builder call for them is
             // visible below (the original numbering skips two lines after dmaapTimeoutMs), and the
             // terminating build() call is likewise not visible - verify against the full file.
 193         return ImmutableGeneratedAppConfigObject.builder()

 194                 .dmaapProtocol(dmaapProtocol)

 195                 .dmaapContentType(dmaapContentType)

 196                 .dmaapConsumerConsumerId(dmaapConsumerId)

 197                 .dmaapConsumerConsumerGroup(dmaapConsumerGroup)

 198                 .dmaapMessageLimit(dmaapMessageLimit)

 199                 .dmaapTimeoutMs(dmaapTimeoutMs)

 202                 .aaiProtocol(aaiProtocol)

 203                 .aaiUsername(aaiUsername)

 204                 .aaiPassword(aaiPassword)

 205                 .aaiIgnoreSslCertificateErrors(aaiIgnoreSslCertificateErrors)

 206                 .pipelinesPollingIntervalSec(pipelinesPollingIntervalSec)

 207                 .pipelinesTimeoutSec(pipelinesTimeoutSec)

 208                 .cbsPollingIntervalSec(cbsPollingIntervalSec)

 209                 .reRegistrationPolicyScope(reRegPolicyScope)

 210                 .reRegistrationClControlName(reRegClControlName)

 211                 .policyVersion(policyVersion)

 212                 .closeLoopTargetType(closeLoopTargetType)

 213                 .closeLoopEventStatus(closeLoopEventStatus)

 214                 .closeLoopVersion(closeLoopVersion)

 215                 .closeLoopTarget(closeLoopTarget)

 216                 .closeLoopOriginator(closeLoopOriginator)

 217                 .cpeAuthPolicyScope(cpeAuthPolicyScope)

 218                 .cpeAuthClControlName(cpeAuthClControlName)

 219                 .reRegConfigKey(reRegConfigKey)

 220                 .cpeAuthConfigKey(cpeAuthConfigKey)

 221                 .closeLoopConfigKey(closeLoopConfigKey)

 222                 .loggingLevel(loggingLevel)

 223                 .keyStorePath(keyStorePath)

 224                 .keyStorePasswordPath(keyStorePasswordPath)

 225                 .trustStorePath(trustStorePath)

 226                 .trustStorePasswordPath(trustStorePasswordPath)

 227                 .enableAaiCertAuth(aaiEnableCertAuth)

 228                 .enableDmaapCertAuth(dmaapEnableCertAuth)

 229                 .streamSubscribesMap(parseStreamsObjects(streamsSubscribes))

 230                 .streamPublishesMap(parseStreamsObjects(streamsPublishes))
 
 234     private Set<String> getMissingEnvironmentVariables() {
         // Collects the names of the required environment variables that are not set; the
         // result is used in the error message of periodicallyFetchConfigFromCbs.
         // NOTE(review): no return statement is visible in this view; presumably missingVars
         // is returned - verify against the full file.

 235         Set<String> missingVars = new HashSet<>();

 236         if (System.getenv().get(CONSUL_HOST) == null) {

 237             missingVars.add(CONSUL_HOST);

 239         if (System.getenv().get(CONFIG_BINDING_SERVICE) == null) {

 240             missingVars.add(CONFIG_BINDING_SERVICE);

 242         if (System.getenv().get(HOSTNAME) == null) {

 243             missingVars.add(HOSTNAME);
 
 248     private Map<String, GeneratedAppConfigObject.StreamsObject> parseStreamsObjects(

 249             JsonObject jsonObject) {
             // Converts every member of the given JSON object with parseStreamsSingleObject and
             // stores the result in a HashMap under the member's name.
             // NOTE(review): no return statement is visible in this view; presumably streams is
             // returned - verify against the full file.

 250         Map<String, GeneratedAppConfigObject.StreamsObject> streams = new HashMap<>();

 252         jsonObject.entrySet().stream()

 253                 .map(this::parseStreamsSingleObject)

 254                 .forEach(e -> streams.put(e.getKey(), e.getValue()));
 
 259     private Map.Entry<String, GeneratedAppConfigObject.StreamsObject> parseStreamsSingleObject(

 260             Map.Entry<String, JsonElement> jsonEntry) {
             // Builds a StreamsObject from one stream definition and returns it paired with the
             // original entry key.

             // The entry value is cast without a type check, so it has to be a JSON object.
 262         JsonObject closeLoopOutput = (JsonObject) jsonEntry.getValue();

             // "type" is read without a null check; the AAF credentials fall back to "" when absent.
 264         String type = closeLoopOutput.get("type").getAsString();

 265         String aafUsername = closeLoopOutput.get("aaf_username") != null

 266                 ? closeLoopOutput.get("aaf_username").getAsString() : "";

 267         String aafPassword = closeLoopOutput.get("aaf_password") != null

 268                 ? closeLoopOutput.get("aaf_password").getAsString() : "";

             // Inside "dmaap_info" only "topic_url" is read without a null check; the other
             // members fall back to "" when absent.
 270         JsonObject dmaapInfo = closeLoopOutput.getAsJsonObject("dmaap_info");

 271         String clientId = dmaapInfo.get("client_id") != null

 272                 ? dmaapInfo.get("client_id").getAsString() : "";

 273         String clientRole = dmaapInfo.get("client_role") != null

 274                 ? dmaapInfo.get("client_role").getAsString() : "";

 275         String location = dmaapInfo.get("location") != null

 276                 ? dmaapInfo.get("location").getAsString() : "";

 277         String topicUrl = dmaapInfo.get("topic_url").getAsString();

             // NOTE(review): several builder calls (for clientId, location, topicUrl and type) and
             // both build() calls are not visible in this view - verify against the full file.
 279         GeneratedAppConfigObject.DmaapInfo dmaapInfoObject = ImmutableDmaapInfo.builder()

 281                 .clientRole(clientRole)

 285         GeneratedAppConfigObject.StreamsObject streamsObject = ImmutableStreamsObject.builder()

 287                 .aafUsername(aafUsername)

 288                 .aafPassword(aafPassword)

 289                 .dmaapInfo(dmaapInfoObject)

 292         return new AbstractMap.SimpleEntry<>(jsonEntry.getKey(), streamsObject);