2  * ============LICENSE_START=======================================================
 
   3  * BBS-RELOCATION-CPE-AUTHENTICATION-HANDLER
 
   4  * ================================================================================
 
   5  * Copyright (C) 2019 NOKIA Intellectual Property. All rights reserved.
 
   6  * ================================================================================
 
   7  * Licensed under the Apache License, Version 2.0 (the "License");
 
   8  * you may not use this file except in compliance with the License.
 
   9  * You may obtain a copy of the License at
 
  11  *      http://www.apache.org/licenses/LICENSE-2.0
 
  13  * Unless required by applicable law or agreed to in writing, software
 
  14  * distributed under the License is distributed on an "AS IS" BASIS,
 
  15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
  16  * See the License for the specific language governing permissions and
 
  17  * limitations under the License.
 
  18  * ============LICENSE_END=========================================================
 
  21 package org.onap.bbs.event.processor.config;
 
  23 import com.google.gson.Gson;
 
  24 import com.google.gson.GsonBuilder;
 
  25 import com.google.gson.JsonElement;
 
  26 import com.google.gson.JsonObject;
 
  28 import java.time.Duration;
 
  29 import java.util.AbstractMap;
 
  30 import java.util.HashMap;
 
  31 import java.util.HashSet;
 
  35 import org.jetbrains.annotations.NotNull;
 
  36 import org.onap.bbs.event.processor.exceptions.ApplicationEnvironmentException;
 
  37 import org.onap.bbs.event.processor.model.GeneratedAppConfigObject;
 
  38 import org.onap.bbs.event.processor.model.ImmutableDmaapInfo;
 
  39 import org.onap.bbs.event.processor.model.ImmutableGeneratedAppConfigObject;
 
  40 import org.onap.bbs.event.processor.model.ImmutableStreamsObject;
 
  41 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory;
 
  42 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests;
 
  43 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsClientConfiguration;
 
  44 import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext;
 
  45 import org.slf4j.Logger;
 
  46 import org.slf4j.LoggerFactory;
 
  47 import org.springframework.beans.factory.annotation.Autowired;
 
  48 import org.springframework.stereotype.Component;
 
  50 import reactor.core.Disposable;
 
  53 public class ConsulConfigurationGateway {
 
  55     private static final Logger LOGGER = LoggerFactory.getLogger(ConsulConfigurationGateway.class);
 
  57     private static final String CONSUL_HOST = "CONSUL_HOST";
 
  58     private static final String CONFIG_BINDING_SERVICE = "CONFIG_BINDING_SERVICE";
 
  59     private static final String HOSTNAME = "HOSTNAME";
 
  61     private final ApplicationConfiguration configuration;
 
  63     private Disposable cbsFetchPipeline;
 
  66     ConsulConfigurationGateway(ApplicationConfiguration configuration) {
 
  67         this.configuration = configuration;
 
  68         gson = new GsonBuilder().setPrettyPrinting().create();
 
  72      * Periodically fetch application configuration via CBS service of DCAE.
 
  73      * @param initialDelay initial delay before initiation of polling
 
  74      * @param period polling interval
 
  76     public void periodicallyFetchConfigFromCbs(Duration initialDelay, Duration period) {
 
  77         if (environmentNotReady()) {
 
  78             throw new ApplicationEnvironmentException(
 
  79                     String.format("Application Environment missing critical parameters: %s",
 
  80                             getMissingEnvironmentVariables()));
 
  83         fetchConfig(initialDelay, period);
 
  86     boolean environmentNotReady() {
 
  87         var consulHost = System.getenv().get(CONSUL_HOST);
 
  88         var cbs = System.getenv().get(CONFIG_BINDING_SERVICE);
 
  89         var hostname = System.getenv().get(HOSTNAME);
 
  90         return consulHost == null || cbs == null || hostname == null;
 
  94      * Reschedule application configuration periodic retrieval via CBS service of DCAE.
 
  95      * @param initialDelay initial delay before rescheduling
 
  96      * @param period new polling interval
 
  98     public void rescheduleCbsConfigurationRetrieval(Duration initialDelay, Duration period) {
 
  99         if (cbsFetchPipeline != null && !cbsFetchPipeline.isDisposed()) {
 
 100             LOGGER.info("Disposing old CBS Config fetch job");
 
 101             cbsFetchPipeline.dispose();
 
 103         periodicallyFetchConfigFromCbs(initialDelay, period);
 
 106     private void parseConsulRetrievedConfiguration(JsonObject jsonObject) {
 
 108         var generatedAppConfigObject = generateAppConfigObject(jsonObject);
 
 109         LOGGER.trace("Consul-Retrieved Application Generated Object:\n{}", generatedAppConfigObject);
 
 110         configuration.updateCurrentConfiguration(generatedAppConfigObject);
 
 113     private void handleErrors(Throwable throwable) {
 
 114         LOGGER.error("Periodic CBS configuration polling was terminated with error: {}", throwable);
 
 115         LOGGER.info("Will restart CBS configuration fetching job due to abnormal termination."
 
 116                 + " Will start fetching after 60 seconds (please correct configuration in the meantime)"
 
 117                 + " and it will then poll every {} seconds (reverting to default)",
 
 118                 configuration.getCbsPollingInterval());
 
 119         fetchConfig(Duration.ofSeconds(60), Duration.ofSeconds(configuration.getCbsPollingInterval()));
 
 122     private void fetchConfig(Duration initialDelay, Duration period) {
 
 123         RequestDiagnosticContext diagnosticContext = RequestDiagnosticContext.create();
 
 125         // Necessary properties from the environment (Consul host:port, service-name (hostname), CBS name)
 
 126         var cbsClientConfig = CbsClientConfiguration.fromEnvironment();
 
 127         var cbsRequest = CbsRequests.getConfiguration(diagnosticContext);
 
 128         // Create the client and use it to get the configuration
 
 129         cbsFetchPipeline = CbsClientFactory.createCbsClient(cbsClientConfig)
 
 130                 .doOnError(e -> LOGGER.warn("CBS Configuration fetch failed with error: {}", e))
 
 132                 .flatMapMany(cbsClient -> cbsClient.updates(cbsRequest, initialDelay, period))
 
 133                 .subscribe(this::parseConsulRetrievedConfiguration, this::handleErrors);
 
 137     GeneratedAppConfigObject generateAppConfigObject(JsonObject configObject) {
 
 139         if (LOGGER.isInfoEnabled()) {
 
 140             var configAsString = gson.toJson(configObject);
 
 141             LOGGER.info("Received App Config object\n{}", configAsString);
 
 144         final var dmaapProtocol = configObject.get("dmaap.protocol").getAsString();
 
 145         final var dmaapContentType  = configObject.get("dmaap.contentType").getAsString();
 
 146         final var dmaapConsumerId  = configObject.get("dmaap.consumer.consumerId").getAsString();
 
 147         final var dmaapConsumerGroup = configObject.get("dmaap.consumer.consumerGroup").getAsString();
 
 148         final var dmaapMessageLimit  = configObject.get("dmaap.messageLimit").getAsInt();
 
 149         final var dmaapTimeoutMs  = configObject.get("dmaap.timeoutMs").getAsInt();
 
 151         final var aaiHost = configObject.get("aai.host").getAsString();
 
 152         final var aaiPort = configObject.get("aai.port").getAsInt();
 
 153         final var aaiProtocol = configObject.get("aai.protocol").getAsString();
 
 154         final var aaiUsername = configObject.get("aai.username").getAsString();
 
 155         final var aaiPassword = configObject.get("aai.password").getAsString();
 
 156         final var aaiIgnoreSslCertificateErrors =
 
 157                 configObject.get("aai.aaiIgnoreSslCertificateErrors").getAsBoolean();
 
 159         final var pipelinesPollingIntervalSec = configObject.get("application.pipelinesPollingIntervalSec").getAsInt();
 
 160         final var pipelinesTimeoutSec = configObject.get("application.pipelinesTimeoutSec").getAsInt();
 
 161         final var cbsPollingIntervalSec = configObject.get("application.cbsPollingIntervalSec").getAsInt();
 
 163         final var reRegPolicyScope = configObject.get("application.reregistration.policyScope").getAsString();
 
 164         final var reRegClControlName = configObject.get("application.reregistration.clControlName").getAsString();
 
 165         final var cpeAuthPolicyScope = configObject.get("application.cpe.authentication.policyScope").getAsString();
 
 166         final var cpeAuthClControlName =
 
 167                 configObject.get("application.cpe.authentication.clControlName").getAsString();
 
 169         final var policyVersion = configObject.get("application.policyVersion").getAsString();
 
 170         final var closeLoopTargetType = configObject.get("application.clTargetType").getAsString();
 
 171         final var closeLoopEventStatus = configObject.get("application.clEventStatus").getAsString();
 
 172         final var closeLoopVersion = configObject.get("application.clVersion").getAsString();
 
 173         final var closeLoopTarget = configObject.get("application.clTarget").getAsString();
 
 174         final var closeLoopOriginator = configObject.get("application.clOriginator").getAsString();
 
 176         final var reRegConfigKey = configObject.get("application.reregistration.configKey").getAsString();
 
 177         final var cpeAuthConfigKey = configObject.get("application.cpeAuth.configKey").getAsString();
 
 178         final var closeLoopConfigKey = configObject.get("application.closeLoop.configKey").getAsString();
 
 180         final var loggingLevel = configObject.get("application.loggingLevel").getAsString();
 
 182         final var keyStorePath = configObject.get("application.ssl.keyStorePath").getAsString();
 
 183         final var keyStorePasswordPath = configObject.get("application.ssl.keyStorePasswordPath").getAsString();
 
 184         final var trustStorePath = configObject.get("application.ssl.trustStorePath").getAsString();
 
 185         final var trustStorePasswordPath = configObject.get("application.ssl.trustStorePasswordPath").getAsString();
 
 186         final var aaiEnableCertAuth = configObject.get("application.ssl.enableAaiCertAuth").getAsBoolean();
 
 187         final var dmaapEnableCertAuth = configObject.get("application.ssl.enableDmaapCertAuth").getAsBoolean();
 
 189         final var streamsPublishes = configObject.getAsJsonObject("streams_publishes");
 
 190         final var streamsSubscribes = configObject.getAsJsonObject("streams_subscribes");
 
 192         return ImmutableGeneratedAppConfigObject.builder()
 
 193                 .dmaapProtocol(dmaapProtocol)
 
 194                 .dmaapContentType(dmaapContentType)
 
 195                 .dmaapConsumerConsumerId(dmaapConsumerId)
 
 196                 .dmaapConsumerConsumerGroup(dmaapConsumerGroup)
 
 197                 .dmaapMessageLimit(dmaapMessageLimit)
 
 198                 .dmaapTimeoutMs(dmaapTimeoutMs)
 
 201                 .aaiProtocol(aaiProtocol)
 
 202                 .aaiUsername(aaiUsername)
 
 203                 .aaiPassword(aaiPassword)
 
 204                 .aaiIgnoreSslCertificateErrors(aaiIgnoreSslCertificateErrors)
 
 205                 .pipelinesPollingIntervalSec(pipelinesPollingIntervalSec)
 
 206                 .pipelinesTimeoutSec(pipelinesTimeoutSec)
 
 207                 .cbsPollingIntervalSec(cbsPollingIntervalSec)
 
 208                 .reRegistrationPolicyScope(reRegPolicyScope)
 
 209                 .reRegistrationClControlName(reRegClControlName)
 
 210                 .policyVersion(policyVersion)
 
 211                 .closeLoopTargetType(closeLoopTargetType)
 
 212                 .closeLoopEventStatus(closeLoopEventStatus)
 
 213                 .closeLoopVersion(closeLoopVersion)
 
 214                 .closeLoopTarget(closeLoopTarget)
 
 215                 .closeLoopOriginator(closeLoopOriginator)
 
 216                 .cpeAuthPolicyScope(cpeAuthPolicyScope)
 
 217                 .cpeAuthClControlName(cpeAuthClControlName)
 
 218                 .reRegConfigKey(reRegConfigKey)
 
 219                 .cpeAuthConfigKey(cpeAuthConfigKey)
 
 220                 .closeLoopConfigKey(closeLoopConfigKey)
 
 221                 .loggingLevel(loggingLevel)
 
 222                 .keyStorePath(keyStorePath)
 
 223                 .keyStorePasswordPath(keyStorePasswordPath)
 
 224                 .trustStorePath(trustStorePath)
 
 225                 .trustStorePasswordPath(trustStorePasswordPath)
 
 226                 .enableAaiCertAuth(aaiEnableCertAuth)
 
 227                 .enableDmaapCertAuth(dmaapEnableCertAuth)
 
 228                 .streamSubscribesMap(parseStreamsObjects(streamsSubscribes))
 
 229                 .streamPublishesMap(parseStreamsObjects(streamsPublishes))
 
 233     private Set<String> getMissingEnvironmentVariables() {
 
 234         Set<String> missingVars = new HashSet<>();
 
 235         if (System.getenv().get(CONSUL_HOST) == null) {
 
 236             missingVars.add(CONSUL_HOST);
 
 238         if (System.getenv().get(CONFIG_BINDING_SERVICE) == null) {
 
 239             missingVars.add(CONFIG_BINDING_SERVICE);
 
 241         if (System.getenv().get(HOSTNAME) == null) {
 
 242             missingVars.add(HOSTNAME);
 
 247     private Map<String, GeneratedAppConfigObject.StreamsObject> parseStreamsObjects(
 
 248             JsonObject jsonObject) {
 
 249         Map<String, GeneratedAppConfigObject.StreamsObject> streams = new HashMap<>();
 
 251         jsonObject.entrySet().stream()
 
 252                 .map(this::parseStreamsSingleObject)
 
 253                 .forEach(e -> streams.put(e.getKey(), e.getValue()));
 
 258     private Map.Entry<String, GeneratedAppConfigObject.StreamsObject> parseStreamsSingleObject(
 
 259             Map.Entry<String, JsonElement> jsonEntry) {
 
 261         var closeLoopOutput = (JsonObject) jsonEntry.getValue();
 
 263         var type = closeLoopOutput.get("type").getAsString();
 
 264         var aafUsername = closeLoopOutput.get("aaf_username") != null
 
 265                 ? closeLoopOutput.get("aaf_username").getAsString() : "";
 
 266         var aafPassword = closeLoopOutput.get("aaf_password") != null
 
 267                 ? closeLoopOutput.get("aaf_password").getAsString() : "";
 
 269         var dmaapInfo = closeLoopOutput.getAsJsonObject("dmaap_info");
 
 270         var clientId = dmaapInfo.get("client_id") != null
 
 271                 ? dmaapInfo.get("client_id").getAsString() : "";
 
 272         var clientRole = dmaapInfo.get("client_role") != null
 
 273                 ? dmaapInfo.get("client_role").getAsString() : "";
 
 274         var location = dmaapInfo.get("location") != null
 
 275                 ? dmaapInfo.get("location").getAsString() : "";
 
 276         var topicUrl = dmaapInfo.get("topic_url").getAsString();
 
 278         GeneratedAppConfigObject.DmaapInfo dmaapInfoObject = ImmutableDmaapInfo.builder()
 
 280                 .clientRole(clientRole)
 
 284         GeneratedAppConfigObject.StreamsObject streamsObject = ImmutableStreamsObject.builder()
 
 286                 .aafUsername(aafUsername)
 
 287                 .aafPassword(aafPassword)
 
 288                 .dmaapInfo(dmaapInfoObject)
 
 291         return new AbstractMap.SimpleEntry<>(jsonEntry.getKey(), streamsObject);