2 * ============LICENSE_START=======================================================
3 * BBS-RELOCATION-CPE-AUTHENTICATION-HANDLER
4 * ================================================================================
5 * Copyright (C) 2019 NOKIA Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.bbs.event.processor.config;
23 import com.google.gson.Gson;
24 import com.google.gson.GsonBuilder;
25 import com.google.gson.JsonElement;
26 import com.google.gson.JsonObject;
28 import java.time.Duration;
29 import java.util.AbstractMap;
30 import java.util.HashMap;
31 import java.util.HashSet;
35 import org.jetbrains.annotations.NotNull;
36 import org.onap.bbs.event.processor.exceptions.ApplicationEnvironmentException;
37 import org.onap.bbs.event.processor.model.GeneratedAppConfigObject;
38 import org.onap.bbs.event.processor.model.ImmutableDmaapInfo;
39 import org.onap.bbs.event.processor.model.ImmutableGeneratedAppConfigObject;
40 import org.onap.bbs.event.processor.model.ImmutableStreamsObject;
41 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory;
42 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests;
43 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsClientConfiguration;
44 import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext;
45 import org.slf4j.Logger;
46 import org.slf4j.LoggerFactory;
47 import org.springframework.beans.factory.annotation.Autowired;
48 import org.springframework.stereotype.Component;
50 import reactor.core.Disposable;
53 public class ConsulConfigurationGateway {
55 private static final Logger LOGGER = LoggerFactory.getLogger(ConsulConfigurationGateway.class);
57 private static final String CONSUL_HOST = "CONSUL_HOST";
58 private static final String CONFIG_BINDING_SERVICE = "CONFIG_BINDING_SERVICE";
59 private static final String HOSTNAME = "HOSTNAME";
61 private final ApplicationConfiguration configuration;
63 private Disposable cbsFetchPipeline;
/**
 * Creates the gateway and prepares the pretty-printing JSON serializer used for logging.
 *
 * @param configuration holder that is updated with every configuration generated from CBS data
 */
ConsulConfigurationGateway(ApplicationConfiguration configuration) {
    this.gson = new GsonBuilder()
            .setPrettyPrinting()
            .create();
    this.configuration = configuration;
}
72 * Periodically fetch application configuration via CBS service of DCAE.
73 * @param initialDelay initial delay before initiation of polling
74 * @param period polling interval
76 public void periodicallyFetchConfigFromCbs(Duration initialDelay, Duration period) {
77 if (environmentNotReady()) {
78 throw new ApplicationEnvironmentException(
79 String.format("Application Environment missing critical parameters: %s",
80 getMissingEnvironmentVariables()));
83 fetchConfig(initialDelay, period);
86 boolean environmentNotReady() {
87 var consulHost = System.getenv().get(CONSUL_HOST);
88 var cbs = System.getenv().get(CONFIG_BINDING_SERVICE);
89 var hostname = System.getenv().get(HOSTNAME);
90 return consulHost == null || cbs == null || hostname == null;
94 * Reschedule application configuration periodic retrieval via CBS service of DCAE.
95 * @param initialDelay initial delay before rescheduling
96 * @param period new polling interval
98 public void rescheduleCbsConfigurationRetrieval(Duration initialDelay, Duration period) {
99 if (cbsFetchPipeline != null && !cbsFetchPipeline.isDisposed()) {
100 LOGGER.info("Disposing old CBS Config fetch job");
101 cbsFetchPipeline.dispose();
103 periodicallyFetchConfigFromCbs(initialDelay, period);
106 private void parseConsulRetrievedConfiguration(JsonObject jsonObject) {
108 var generatedAppConfigObject = generateAppConfigObject(jsonObject);
109 LOGGER.trace("Consul-Retrieved Application Generated Object:\n{}", generatedAppConfigObject);
110 configuration.updateCurrentConfiguration(generatedAppConfigObject);
113 private void handleErrors(Throwable throwable) {
114 LOGGER.error("Periodic CBS configuration polling was terminated with error: {}", throwable);
115 LOGGER.info("Will restart CBS configuration fetching job due to abnormal termination."
116 + " Will start fetching after 60 seconds (please correct configuration in the meantime)"
117 + " and it will then poll every {} seconds (reverting to default)",
118 configuration.getCbsPollingInterval());
119 fetchConfig(Duration.ofSeconds(60), Duration.ofSeconds(configuration.getCbsPollingInterval()));
122 private void fetchConfig(Duration initialDelay, Duration period) {
123 RequestDiagnosticContext diagnosticContext = RequestDiagnosticContext.create();
125 // Necessary properties from the environment (Consul host:port, service-name (hostname), CBS name)
126 var cbsClientConfig = CbsClientConfiguration.fromEnvironment();
127 var cbsRequest = CbsRequests.getConfiguration(diagnosticContext);
128 // Create the client and use it to get the configuration
129 cbsFetchPipeline = CbsClientFactory.createCbsClient(cbsClientConfig)
130 .doOnError(e -> LOGGER.warn("CBS Configuration fetch failed with error: {}", e))
132 .flatMapMany(cbsClient -> cbsClient.updates(cbsRequest, initialDelay, period))
133 .subscribe(this::parseConsulRetrievedConfiguration, this::handleErrors);
137 GeneratedAppConfigObject generateAppConfigObject(JsonObject configObject) {
139 if (LOGGER.isInfoEnabled()) {
140 var configAsString = gson.toJson(configObject);
141 LOGGER.info("Received App Config object\n{}", configAsString);
144 final var dmaapProtocol = configObject.get("dmaap.protocol").getAsString();
145 final var dmaapContentType = configObject.get("dmaap.contentType").getAsString();
146 final var dmaapConsumerId = configObject.get("dmaap.consumer.consumerId").getAsString();
147 final var dmaapConsumerGroup = configObject.get("dmaap.consumer.consumerGroup").getAsString();
148 final var dmaapMessageLimit = configObject.get("dmaap.messageLimit").getAsInt();
149 final var dmaapTimeoutMs = configObject.get("dmaap.timeoutMs").getAsInt();
151 final var aaiHost = configObject.get("aai.host").getAsString();
152 final var aaiPort = configObject.get("aai.port").getAsInt();
153 final var aaiProtocol = configObject.get("aai.protocol").getAsString();
154 final var aaiUsername = configObject.get("aai.username").getAsString();
155 final var aaiPassword = configObject.get("aai.password").getAsString();
156 final var aaiIgnoreSslCertificateErrors =
157 configObject.get("aai.aaiIgnoreSslCertificateErrors").getAsBoolean();
159 final var pipelinesPollingIntervalSec = configObject.get("application.pipelinesPollingIntervalSec").getAsInt();
160 final var pipelinesTimeoutSec = configObject.get("application.pipelinesTimeoutSec").getAsInt();
161 final var cbsPollingIntervalSec = configObject.get("application.cbsPollingIntervalSec").getAsInt();
163 final var reRegPolicyScope = configObject.get("application.reregistration.policyScope").getAsString();
164 final var reRegClControlName = configObject.get("application.reregistration.clControlName").getAsString();
165 final var cpeAuthPolicyScope = configObject.get("application.cpe.authentication.policyScope").getAsString();
166 final var cpeAuthClControlName =
167 configObject.get("application.cpe.authentication.clControlName").getAsString();
169 final var policyVersion = configObject.get("application.policyVersion").getAsString();
170 final var closeLoopTargetType = configObject.get("application.clTargetType").getAsString();
171 final var closeLoopEventStatus = configObject.get("application.clEventStatus").getAsString();
172 final var closeLoopVersion = configObject.get("application.clVersion").getAsString();
173 final var closeLoopTarget = configObject.get("application.clTarget").getAsString();
174 final var closeLoopOriginator = configObject.get("application.clOriginator").getAsString();
176 final var reRegConfigKey = configObject.get("application.reregistration.configKey").getAsString();
177 final var cpeAuthConfigKey = configObject.get("application.cpeAuth.configKey").getAsString();
178 final var closeLoopConfigKey = configObject.get("application.closeLoop.configKey").getAsString();
180 final var loggingLevel = configObject.get("application.loggingLevel").getAsString();
182 final var keyStorePath = configObject.get("application.ssl.keyStorePath").getAsString();
183 final var keyStorePasswordPath = configObject.get("application.ssl.keyStorePasswordPath").getAsString();
184 final var trustStorePath = configObject.get("application.ssl.trustStorePath").getAsString();
185 final var trustStorePasswordPath = configObject.get("application.ssl.trustStorePasswordPath").getAsString();
186 final var aaiEnableCertAuth = configObject.get("application.ssl.enableAaiCertAuth").getAsBoolean();
187 final var dmaapEnableCertAuth = configObject.get("application.ssl.enableDmaapCertAuth").getAsBoolean();
189 final var streamsPublishes = configObject.getAsJsonObject("streams_publishes");
190 final var streamsSubscribes = configObject.getAsJsonObject("streams_subscribes");
192 return ImmutableGeneratedAppConfigObject.builder()
193 .dmaapProtocol(dmaapProtocol)
194 .dmaapContentType(dmaapContentType)
195 .dmaapConsumerConsumerId(dmaapConsumerId)
196 .dmaapConsumerConsumerGroup(dmaapConsumerGroup)
197 .dmaapMessageLimit(dmaapMessageLimit)
198 .dmaapTimeoutMs(dmaapTimeoutMs)
201 .aaiProtocol(aaiProtocol)
202 .aaiUsername(aaiUsername)
203 .aaiPassword(aaiPassword)
204 .aaiIgnoreSslCertificateErrors(aaiIgnoreSslCertificateErrors)
205 .pipelinesPollingIntervalSec(pipelinesPollingIntervalSec)
206 .pipelinesTimeoutSec(pipelinesTimeoutSec)
207 .cbsPollingIntervalSec(cbsPollingIntervalSec)
208 .reRegistrationPolicyScope(reRegPolicyScope)
209 .reRegistrationClControlName(reRegClControlName)
210 .policyVersion(policyVersion)
211 .closeLoopTargetType(closeLoopTargetType)
212 .closeLoopEventStatus(closeLoopEventStatus)
213 .closeLoopVersion(closeLoopVersion)
214 .closeLoopTarget(closeLoopTarget)
215 .closeLoopOriginator(closeLoopOriginator)
216 .cpeAuthPolicyScope(cpeAuthPolicyScope)
217 .cpeAuthClControlName(cpeAuthClControlName)
218 .reRegConfigKey(reRegConfigKey)
219 .cpeAuthConfigKey(cpeAuthConfigKey)
220 .closeLoopConfigKey(closeLoopConfigKey)
221 .loggingLevel(loggingLevel)
222 .keyStorePath(keyStorePath)
223 .keyStorePasswordPath(keyStorePasswordPath)
224 .trustStorePath(trustStorePath)
225 .trustStorePasswordPath(trustStorePasswordPath)
226 .enableAaiCertAuth(aaiEnableCertAuth)
227 .enableDmaapCertAuth(dmaapEnableCertAuth)
228 .streamSubscribesMap(parseStreamsObjects(streamsSubscribes))
229 .streamPublishesMap(parseStreamsObjects(streamsPublishes))
233 private Set<String> getMissingEnvironmentVariables() {
234 Set<String> missingVars = new HashSet<>();
235 if (System.getenv().get(CONSUL_HOST) == null) {
236 missingVars.add(CONSUL_HOST);
238 if (System.getenv().get(CONFIG_BINDING_SERVICE) == null) {
239 missingVars.add(CONFIG_BINDING_SERVICE);
241 if (System.getenv().get(HOSTNAME) == null) {
242 missingVars.add(HOSTNAME);
247 private Map<String, GeneratedAppConfigObject.StreamsObject> parseStreamsObjects(
248 JsonObject jsonObject) {
249 Map<String, GeneratedAppConfigObject.StreamsObject> streams = new HashMap<>();
251 jsonObject.entrySet().stream()
252 .map(this::parseStreamsSingleObject)
253 .forEach(e -> streams.put(e.getKey(), e.getValue()));
258 private Map.Entry<String, GeneratedAppConfigObject.StreamsObject> parseStreamsSingleObject(
259 Map.Entry<String, JsonElement> jsonEntry) {
261 var closeLoopOutput = (JsonObject) jsonEntry.getValue();
263 var type = closeLoopOutput.get("type").getAsString();
264 var aafUsername = closeLoopOutput.get("aaf_username") != null
265 ? closeLoopOutput.get("aaf_username").getAsString() : "";
266 var aafPassword = closeLoopOutput.get("aaf_password") != null
267 ? closeLoopOutput.get("aaf_password").getAsString() : "";
269 var dmaapInfo = closeLoopOutput.getAsJsonObject("dmaap_info");
270 var clientId = dmaapInfo.get("client_id") != null
271 ? dmaapInfo.get("client_id").getAsString() : "";
272 var clientRole = dmaapInfo.get("client_role") != null
273 ? dmaapInfo.get("client_role").getAsString() : "";
274 var location = dmaapInfo.get("location") != null
275 ? dmaapInfo.get("location").getAsString() : "";
276 var topicUrl = dmaapInfo.get("topic_url").getAsString();
278 GeneratedAppConfigObject.DmaapInfo dmaapInfoObject = ImmutableDmaapInfo.builder()
280 .clientRole(clientRole)
284 GeneratedAppConfigObject.StreamsObject streamsObject = ImmutableStreamsObject.builder()
286 .aafUsername(aafUsername)
287 .aafPassword(aafPassword)
288 .dmaapInfo(dmaapInfoObject)
291 return new AbstractMap.SimpleEntry<>(jsonEntry.getKey(), streamsObject);