2  * ============LICENSE_START=======================================================
 
   3  * BBS-RELOCATION-CPE-AUTHENTICATION-HANDLER
 
   4  * ================================================================================
 
   5  * Copyright (C) 2019 NOKIA Intellectual Property. All rights reserved.
 
   6  * ================================================================================
 
   7  * Licensed under the Apache License, Version 2.0 (the "License");
 
   8  * you may not use this file except in compliance with the License.
 
   9  * You may obtain a copy of the License at
 
  11  *      http://www.apache.org/licenses/LICENSE-2.0
 
  13  * Unless required by applicable law or agreed to in writing, software
 
  14  * distributed under the License is distributed on an "AS IS" BASIS,
 
  15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
  16  * See the License for the specific language governing permissions and
 
  17  * limitations under the License.
 
  18  * ============LICENSE_END=========================================================
 
  21 package org.onap.bbs.event.processor.config;
 
  23 import com.google.gson.Gson;
 
  24 import com.google.gson.GsonBuilder;
 
  25 import com.google.gson.JsonElement;
 
  26 import com.google.gson.JsonObject;
 
  28 import java.time.Duration;
 
  29 import java.util.AbstractMap;
 
  30 import java.util.HashMap;
 
  31 import java.util.HashSet;
 
  35 import org.jetbrains.annotations.NotNull;
 
  36 import org.onap.bbs.event.processor.exceptions.ApplicationEnvironmentException;
 
  37 import org.onap.bbs.event.processor.model.GeneratedAppConfigObject;
 
  38 import org.onap.bbs.event.processor.model.ImmutableDmaapInfo;
 
  39 import org.onap.bbs.event.processor.model.ImmutableGeneratedAppConfigObject;
 
  40 import org.onap.bbs.event.processor.model.ImmutableStreamsObject;
 
  41 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory;
 
  42 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties;
 
  43 import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext;
 
  44 import org.slf4j.Logger;
 
  45 import org.slf4j.LoggerFactory;
 
  46 import org.springframework.beans.factory.annotation.Autowired;
 
  47 import org.springframework.stereotype.Component;
 
  49 import reactor.core.Disposable;
 
  52 public class ConsulConfigurationGateway {
 
    // Class-wide SLF4J logger.
  54     private static final Logger LOGGER = LoggerFactory.getLogger(ConsulConfigurationGateway.class);
 
    // Names of the environment variables that are checked (for presence only) before polling is started;
    // see environmentNotReady() and getMissingEnvironmentVariables().
  56     private static final String CONSUL_HOST = "CONSUL_HOST";
 
  57     private static final String CONFIG_BINDING_SERVICE = "CONFIG_BINDING_SERVICE";
 
  58     private static final String HOSTNAME = "HOSTNAME";
 
    // Receives every configuration object produced from a CBS update and supplies the CBS polling interval
    // used when the fetch job is restarted after an error.
  60     private final ApplicationConfiguration configuration;
 
    // Subscription handle of the currently running CBS fetch job; assigned by fetchConfig() and disposed by
    // rescheduleCbsConfigurationRetrieval(). Stays null until the first fetch job is started.
  62     private Disposable cbsFetchPipeline;
 
  65     ConsulConfigurationGateway(ApplicationConfiguration configuration) {
 
  66         this.configuration = configuration;
 
  67         gson = new GsonBuilder().setPrettyPrinting().create();
 
  71      * Periodically fetch application configuration via CBS service of DCAE.
 
  72      * @param initialDelay initial delay before initiation of polling
 
  73      * @param period polling interval
 
  75     public void periodicallyFetchConfigFromCbs(Duration initialDelay, Duration period) {
 
  76         if (environmentNotReady()) {
 
  77             throw new ApplicationEnvironmentException(
 
  78                     String.format("Application Environment missing critical parameters: %s",
 
  79                             getMissingEnvironmentVariables()));
 
  82         fetchConfig(initialDelay, period);
 
  85     boolean environmentNotReady() {
 
  86         String consulHost = System.getenv().get(CONSUL_HOST);
 
  87         String cbs = System.getenv().get(CONFIG_BINDING_SERVICE);
 
  88         String hostname = System.getenv().get(HOSTNAME);
 
  89         return consulHost == null || cbs == null || hostname == null;
 
  93      * Reschedule application configuration periodic retrieval via CBS service of DCAE.
 
  94      * @param initialDelay initial delay before rescheduling
 
  95      * @param period new polling interval
 
  97     public void rescheduleCbsConfigurationRetrieval(Duration initialDelay, Duration period) {
 
  98         if (cbsFetchPipeline != null && !cbsFetchPipeline.isDisposed()) {
 
  99             LOGGER.info("Disposing old CBS Config fetch job");
 
 100             cbsFetchPipeline.dispose();
 
 102         periodicallyFetchConfigFromCbs(initialDelay, period);
 
 105     private void parseConsulRetrievedConfiguration(JsonObject jsonObject) {
 
 107         GeneratedAppConfigObject generatedAppConfigObject = generateAppConfigObject(jsonObject);
 
 108         LOGGER.trace("Consul-Retrieved Application Generated Object:\n{}", generatedAppConfigObject);
 
 109         configuration.updateCurrentConfiguration(generatedAppConfigObject);
 
 112     private void handleErrors(Throwable throwable) {
 
 113         LOGGER.error("Periodic CBS configuration polling was terminated with error: {}", throwable);
 
 114         LOGGER.info("Will restart CBS configuration fetching job due to abnormal termination."
 
 115                 + " Will start fetching after 60 seconds (please correct configuration in the meantime)"
 
 116                 + " and it will then poll every {} seconds (reverting to default)",
 
 117                 configuration.getCbsPollingInterval());
 
 118         fetchConfig(Duration.ofSeconds(60), Duration.ofSeconds(configuration.getCbsPollingInterval()));
 
    // Creates a CBS client from the process environment and subscribes to its periodic configuration
    // updates. The resulting subscription is stored in cbsFetchPipeline so that it can be disposed later.
    //   initialDelay - passed to cbsClient.updates(...) as the delay before polling starts
    //   period       - passed to cbsClient.updates(...) as the polling interval
 121     private void fetchConfig(Duration initialDelay, Duration period) {
 
 122         RequestDiagnosticContext diagnosticContext = RequestDiagnosticContext.create();
 
 124         // Necessary properties from the environment (Consul host:port, service-name (hostname), CBS name)
 
 125         EnvProperties env = EnvProperties.fromEnvironment();
 
 127         // Create the client and use it to get the configuration
 
 128         cbsFetchPipeline = CbsClientFactory.createCbsClient(env)
 
                 // NOTE(review): "e" is a Throwable, so SLF4J presumably logs it as the exception and leaves
                 // the "{}" placeholder unformatted - confirm and consider dropping the placeholder.
 129                 .doOnError(e -> LOGGER.warn("CBS Configuration fetch failed with error: {}", e))
 
                 // Switch from the client to its stream of periodic configuration updates.
 131                 .flatMapMany(cbsClient -> cbsClient.updates(diagnosticContext, initialDelay, period))
 
                 // Each emitted configuration goes to parseConsulRetrievedConfiguration; an error that
                 // reaches the subscriber is passed to handleErrors, which starts a new fetch job.
 132                 .subscribe(this::parseConsulRetrievedConfiguration, this::handleErrors);
 
 136     GeneratedAppConfigObject generateAppConfigObject(JsonObject configObject) {
 
 138         if (LOGGER.isInfoEnabled()) {
 
 139             String configAsString = gson.toJson(configObject);
 
 140             LOGGER.info("Received App Config object\n{}", configAsString);
 
 143         final String dmaapProtocol = configObject.get("dmaap.protocol").getAsString();
 
 144         final String dmaapContentType  = configObject.get("dmaap.contentType").getAsString();
 
 145         final String dmaapConsumerId  = configObject.get("dmaap.consumer.consumerId").getAsString();
 
 146         final String dmaapConsumerGroup = configObject.get("dmaap.consumer.consumerGroup").getAsString();
 
 147         final int dmaapMessageLimit  = configObject.get("dmaap.messageLimit").getAsInt();
 
 148         final int dmaapTimeoutMs  = configObject.get("dmaap.timeoutMs").getAsInt();
 
 150         final String aaiHost = configObject.get("aai.host").getAsString();
 
 151         final int aaiPort = configObject.get("aai.port").getAsInt();
 
 152         final String aaiProtocol = configObject.get("aai.protocol").getAsString();
 
 153         final String aaiUsername = configObject.get("aai.username").getAsString();
 
 154         final String aaiPassword = configObject.get("aai.password").getAsString();
 
 155         final boolean aaiIgnoreSslCertificateErrors =
 
 156                 configObject.get("aai.aaiIgnoreSslCertificateErrors").getAsBoolean();
 
 158         final int pipelinesPollingIntervalSec = configObject.get("application.pipelinesPollingIntervalSec").getAsInt();
 
 159         final int pipelinesTimeoutSec = configObject.get("application.pipelinesTimeoutSec").getAsInt();
 
 160         final int cbsPollingIntervalSec = configObject.get("application.cbsPollingIntervalSec").getAsInt();
 
 162         final String reRegPolicyScope = configObject.get("application.reregistration.policyScope").getAsString();
 
 163         final String reRegClControlName = configObject.get("application.reregistration.clControlName").getAsString();
 
 164         final String cpeAuthPolicyScope = configObject.get("application.cpe.authentication.policyScope").getAsString();
 
 165         final String cpeAuthClControlName =
 
 166                 configObject.get("application.cpe.authentication.clControlName").getAsString();
 
 168         final String policyVersion = configObject.get("application.policyVersion").getAsString();
 
 169         final String closeLoopTargetType = configObject.get("application.clTargetType").getAsString();
 
 170         final String closeLoopEventStatus = configObject.get("application.clEventStatus").getAsString();
 
 171         final String closeLoopVersion = configObject.get("application.clVersion").getAsString();
 
 172         final String closeLoopTarget = configObject.get("application.clTarget").getAsString();
 
 173         final String closeLoopOriginator = configObject.get("application.clOriginator").getAsString();
 
 175         final String reRegConfigKey = configObject.get("application.reregistration.configKey").getAsString();
 
 176         final String cpeAuthConfigKey = configObject.get("application.cpeAuth.configKey").getAsString();
 
 177         final String closeLoopConfigKey = configObject.get("application.closeLoop.configKey").getAsString();
 
 179         final JsonObject streamsPublishes = configObject.getAsJsonObject("streams_publishes");
 
 180         final JsonObject streamsSubscribes = configObject.getAsJsonObject("streams_subscribes");
 
 182         return ImmutableGeneratedAppConfigObject.builder()
 
 183                 .dmaapProtocol(dmaapProtocol)
 
 184                 .dmaapContentType(dmaapContentType)
 
 185                 .dmaapConsumerConsumerId(dmaapConsumerId)
 
 186                 .dmaapConsumerConsumerGroup(dmaapConsumerGroup)
 
 187                 .dmaapMessageLimit(dmaapMessageLimit)
 
 188                 .dmaapTimeoutMs(dmaapTimeoutMs)
 
 191                 .aaiProtocol(aaiProtocol)
 
 192                 .aaiUsername(aaiUsername)
 
 193                 .aaiPassword(aaiPassword)
 
 194                 .aaiIgnoreSslCertificateErrors(aaiIgnoreSslCertificateErrors)
 
 195                 .pipelinesPollingIntervalSec(pipelinesPollingIntervalSec)
 
 196                 .pipelinesTimeoutSec(pipelinesTimeoutSec)
 
 197                 .cbsPollingIntervalSec(cbsPollingIntervalSec)
 
 198                 .reRegistrationPolicyScope(reRegPolicyScope)
 
 199                 .reRegistrationClControlName(reRegClControlName)
 
 200                 .policyVersion(policyVersion)
 
 201                 .closeLoopTargetType(closeLoopTargetType)
 
 202                 .closeLoopEventStatus(closeLoopEventStatus)
 
 203                 .closeLoopVersion(closeLoopVersion)
 
 204                 .closeLoopTarget(closeLoopTarget)
 
 205                 .closeLoopOriginator(closeLoopOriginator)
 
 206                 .cpeAuthPolicyScope(cpeAuthPolicyScope)
 
 207                 .cpeAuthClControlName(cpeAuthClControlName)
 
 208                 .reRegConfigKey(reRegConfigKey)
 
 209                 .cpeAuthConfigKey(cpeAuthConfigKey)
 
 210                 .closeLoopConfigKey(closeLoopConfigKey)
 
 211                 .streamSubscribesMap(parseStreamsObjects(streamsSubscribes))
 
 212                 .streamPublishesMap(parseStreamsObjects(streamsPublishes))
 
 216     private Set<String> getMissingEnvironmentVariables() {
 
 217         Set<String> missingVars = new HashSet<>();
 
 218         if (System.getenv().get(CONSUL_HOST) == null) {
 
 219             missingVars.add(CONSUL_HOST);
 
 221         if (System.getenv().get(CONFIG_BINDING_SERVICE) == null) {
 
 222             missingVars.add(CONFIG_BINDING_SERVICE);
 
 224         if (System.getenv().get(HOSTNAME) == null) {
 
 225             missingVars.add(HOSTNAME);
 
 230     private Map<String, GeneratedAppConfigObject.StreamsObject> parseStreamsObjects(
 
 231             JsonObject jsonObject) {
 
 232         Map<String, GeneratedAppConfigObject.StreamsObject> streams = new HashMap<>();
 
 234         jsonObject.entrySet().stream()
 
 235                 .map(this::parseStreamsSingleObject)
 
 236                 .forEach(e -> streams.put(e.getKey(), e.getValue()));
 
 241     private Map.Entry<String, GeneratedAppConfigObject.StreamsObject> parseStreamsSingleObject(
 
 242             Map.Entry<String, JsonElement> jsonEntry) {
 
 244         JsonObject closeLoopOutput = (JsonObject) jsonEntry.getValue();
 
 246         String type = closeLoopOutput.get("type").getAsString();
 
 247         String aafUsername = closeLoopOutput.get("aaf_username") != null
 
 248                 ? closeLoopOutput.get("aaf_username").getAsString() : "";
 
 249         String aafPassword = closeLoopOutput.get("aaf_password") != null
 
 250                 ? closeLoopOutput.get("aaf_password").getAsString() : "";
 
 252         JsonObject dmaapInfo = closeLoopOutput.getAsJsonObject("dmaap_info");
 
 253         String clientId = dmaapInfo.get("client_id") != null
 
 254                 ? dmaapInfo.get("client_id").getAsString() : "";
 
 255         String clientRole = dmaapInfo.get("client_role") != null
 
 256                 ? dmaapInfo.get("client_role").getAsString() : "";
 
 257         String location = dmaapInfo.get("location") != null
 
 258                 ? dmaapInfo.get("location").getAsString() : "";
 
 259         String topicUrl = dmaapInfo.get("topic_url").getAsString();
 
 261         GeneratedAppConfigObject.DmaapInfo dmaapInfoObject = ImmutableDmaapInfo.builder()
 
 263                 .clientRole(clientRole)
 
 267         GeneratedAppConfigObject.StreamsObject streamsObject = ImmutableStreamsObject.builder()
 
 269                 .aafUsername(aafUsername)
 
 270                 .aafPassword(aafPassword)
 
 271                 .dmaapInfo(dmaapInfoObject)
 
 274         return new AbstractMap.SimpleEntry<>(jsonEntry.getKey(), streamsObject);