/*
 * ============LICENSE_START=======================================================
 * BBS-RELOCATION-CPE-AUTHENTICATION-HANDLER
 * ================================================================================
 * Copyright (C) 2019 NOKIA Intellectual Property. All rights reserved.
 * ================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * ============LICENSE_END=========================================================
 */
21 package org.onap.bbs.event.processor.config;
23 import com.google.gson.Gson;
24 import com.google.gson.GsonBuilder;
25 import com.google.gson.JsonElement;
26 import com.google.gson.JsonObject;
28 import java.time.Duration;
29 import java.util.AbstractMap;
30 import java.util.HashMap;
31 import java.util.HashSet;
35 import org.jetbrains.annotations.NotNull;
36 import org.onap.bbs.event.processor.exceptions.ApplicationEnvironmentException;
37 import org.onap.bbs.event.processor.model.GeneratedAppConfigObject;
38 import org.onap.bbs.event.processor.model.ImmutableDmaapInfo;
39 import org.onap.bbs.event.processor.model.ImmutableGeneratedAppConfigObject;
40 import org.onap.bbs.event.processor.model.ImmutableStreamsObject;
41 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory;
42 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests;
43 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsRequest;
44 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties;
45 import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext;
46 import org.slf4j.Logger;
47 import org.slf4j.LoggerFactory;
48 import org.springframework.beans.factory.annotation.Autowired;
49 import org.springframework.stereotype.Component;
51 import reactor.core.Disposable;
54 public class ConsulConfigurationGateway {
56 private static final Logger LOGGER = LoggerFactory.getLogger(ConsulConfigurationGateway.class);
58 private static final String CONSUL_HOST = "CONSUL_HOST";
59 private static final String CONFIG_BINDING_SERVICE = "CONFIG_BINDING_SERVICE";
60 private static final String HOSTNAME = "HOSTNAME";
62 private final ApplicationConfiguration configuration;
64 private Disposable cbsFetchPipeline;
/**
 * Creates the gateway and prepares the pretty-printing JSON serializer used for logging.
 *
 * @param configuration object that is updated with each configuration retrieved from CBS
 */
ConsulConfigurationGateway(ApplicationConfiguration configuration) {
    this.gson = new GsonBuilder().setPrettyPrinting().create();
    this.configuration = configuration;
}
73 * Periodically fetch application configuration via CBS service of DCAE.
74 * @param initialDelay initial delay before initiation of polling
75 * @param period polling interval
77 public void periodicallyFetchConfigFromCbs(Duration initialDelay, Duration period) {
78 if (environmentNotReady()) {
79 throw new ApplicationEnvironmentException(
80 String.format("Application Environment missing critical parameters: %s",
81 getMissingEnvironmentVariables()));
84 fetchConfig(initialDelay, period);
87 boolean environmentNotReady() {
88 String consulHost = System.getenv().get(CONSUL_HOST);
89 String cbs = System.getenv().get(CONFIG_BINDING_SERVICE);
90 String hostname = System.getenv().get(HOSTNAME);
91 return consulHost == null || cbs == null || hostname == null;
95 * Reschedule application configuration periodic retrieval via CBS service of DCAE.
96 * @param initialDelay initial delay before rescheduling
97 * @param period new polling interval
99 public void rescheduleCbsConfigurationRetrieval(Duration initialDelay, Duration period) {
100 if (cbsFetchPipeline != null && !cbsFetchPipeline.isDisposed()) {
101 LOGGER.info("Disposing old CBS Config fetch job");
102 cbsFetchPipeline.dispose();
104 periodicallyFetchConfigFromCbs(initialDelay, period);
107 private void parseConsulRetrievedConfiguration(JsonObject jsonObject) {
109 GeneratedAppConfigObject generatedAppConfigObject = generateAppConfigObject(jsonObject);
110 LOGGER.trace("Consul-Retrieved Application Generated Object:\n{}", generatedAppConfigObject);
111 configuration.updateCurrentConfiguration(generatedAppConfigObject);
114 private void handleErrors(Throwable throwable) {
115 LOGGER.error("Periodic CBS configuration polling was terminated with error: {}", throwable);
116 LOGGER.info("Will restart CBS configuration fetching job due to abnormal termination."
117 + " Will start fetching after 60 seconds (please correct configuration in the meantime)"
118 + " and it will then poll every {} seconds (reverting to default)",
119 configuration.getCbsPollingInterval());
120 fetchConfig(Duration.ofSeconds(60), Duration.ofSeconds(configuration.getCbsPollingInterval()));
123 private void fetchConfig(Duration initialDelay, Duration period) {
124 RequestDiagnosticContext diagnosticContext = RequestDiagnosticContext.create();
126 // Necessary properties from the environment (Consul host:port, service-name (hostname), CBS name)
127 EnvProperties env = EnvProperties.fromEnvironment();
128 CbsRequest cbsRequest = CbsRequests.getConfiguration(diagnosticContext);
129 // Create the client and use it to get the configuration
130 cbsFetchPipeline = CbsClientFactory.createCbsClient(env)
131 .doOnError(e -> LOGGER.warn("CBS Configuration fetch failed with error: {}", e))
133 .flatMapMany(cbsClient -> cbsClient.updates(cbsRequest, initialDelay, period))
134 .subscribe(this::parseConsulRetrievedConfiguration, this::handleErrors);
138 GeneratedAppConfigObject generateAppConfigObject(JsonObject configObject) {
140 if (LOGGER.isInfoEnabled()) {
141 String configAsString = gson.toJson(configObject);
142 LOGGER.info("Received App Config object\n{}", configAsString);
145 final String dmaapProtocol = configObject.get("dmaap.protocol").getAsString();
146 final String dmaapContentType = configObject.get("dmaap.contentType").getAsString();
147 final String dmaapConsumerId = configObject.get("dmaap.consumer.consumerId").getAsString();
148 final String dmaapConsumerGroup = configObject.get("dmaap.consumer.consumerGroup").getAsString();
149 final int dmaapMessageLimit = configObject.get("dmaap.messageLimit").getAsInt();
150 final int dmaapTimeoutMs = configObject.get("dmaap.timeoutMs").getAsInt();
152 final String aaiHost = configObject.get("aai.host").getAsString();
153 final int aaiPort = configObject.get("aai.port").getAsInt();
154 final String aaiProtocol = configObject.get("aai.protocol").getAsString();
155 final String aaiUsername = configObject.get("aai.username").getAsString();
156 final String aaiPassword = configObject.get("aai.password").getAsString();
157 final boolean aaiIgnoreSslCertificateErrors =
158 configObject.get("aai.aaiIgnoreSslCertificateErrors").getAsBoolean();
160 final int pipelinesPollingIntervalSec = configObject.get("application.pipelinesPollingIntervalSec").getAsInt();
161 final int pipelinesTimeoutSec = configObject.get("application.pipelinesTimeoutSec").getAsInt();
162 final int cbsPollingIntervalSec = configObject.get("application.cbsPollingIntervalSec").getAsInt();
164 final String reRegPolicyScope = configObject.get("application.reregistration.policyScope").getAsString();
165 final String reRegClControlName = configObject.get("application.reregistration.clControlName").getAsString();
166 final String cpeAuthPolicyScope = configObject.get("application.cpe.authentication.policyScope").getAsString();
167 final String cpeAuthClControlName =
168 configObject.get("application.cpe.authentication.clControlName").getAsString();
170 final String policyVersion = configObject.get("application.policyVersion").getAsString();
171 final String closeLoopTargetType = configObject.get("application.clTargetType").getAsString();
172 final String closeLoopEventStatus = configObject.get("application.clEventStatus").getAsString();
173 final String closeLoopVersion = configObject.get("application.clVersion").getAsString();
174 final String closeLoopTarget = configObject.get("application.clTarget").getAsString();
175 final String closeLoopOriginator = configObject.get("application.clOriginator").getAsString();
177 final String reRegConfigKey = configObject.get("application.reregistration.configKey").getAsString();
178 final String cpeAuthConfigKey = configObject.get("application.cpeAuth.configKey").getAsString();
179 final String closeLoopConfigKey = configObject.get("application.closeLoop.configKey").getAsString();
181 final String loggingLevel = configObject.get("application.loggingLevel").getAsString();
183 final String keyStorePath = configObject.get("application.ssl.keyStorePath").getAsString();
184 final String keyStorePasswordPath = configObject.get("application.ssl.keyStorePasswordPath").getAsString();
185 final String trustStorePath = configObject.get("application.ssl.trustStorePath").getAsString();
186 final String trustStorePasswordPath = configObject.get("application.ssl.trustStorePasswordPath").getAsString();
187 final boolean aaiEnableCertAuth = configObject.get("application.ssl.enableAaiCertAuth").getAsBoolean();
188 final boolean dmaapEnableCertAuth = configObject.get("application.ssl.enableDmaapCertAuth").getAsBoolean();
190 final JsonObject streamsPublishes = configObject.getAsJsonObject("streams_publishes");
191 final JsonObject streamsSubscribes = configObject.getAsJsonObject("streams_subscribes");
193 return ImmutableGeneratedAppConfigObject.builder()
194 .dmaapProtocol(dmaapProtocol)
195 .dmaapContentType(dmaapContentType)
196 .dmaapConsumerConsumerId(dmaapConsumerId)
197 .dmaapConsumerConsumerGroup(dmaapConsumerGroup)
198 .dmaapMessageLimit(dmaapMessageLimit)
199 .dmaapTimeoutMs(dmaapTimeoutMs)
202 .aaiProtocol(aaiProtocol)
203 .aaiUsername(aaiUsername)
204 .aaiPassword(aaiPassword)
205 .aaiIgnoreSslCertificateErrors(aaiIgnoreSslCertificateErrors)
206 .pipelinesPollingIntervalSec(pipelinesPollingIntervalSec)
207 .pipelinesTimeoutSec(pipelinesTimeoutSec)
208 .cbsPollingIntervalSec(cbsPollingIntervalSec)
209 .reRegistrationPolicyScope(reRegPolicyScope)
210 .reRegistrationClControlName(reRegClControlName)
211 .policyVersion(policyVersion)
212 .closeLoopTargetType(closeLoopTargetType)
213 .closeLoopEventStatus(closeLoopEventStatus)
214 .closeLoopVersion(closeLoopVersion)
215 .closeLoopTarget(closeLoopTarget)
216 .closeLoopOriginator(closeLoopOriginator)
217 .cpeAuthPolicyScope(cpeAuthPolicyScope)
218 .cpeAuthClControlName(cpeAuthClControlName)
219 .reRegConfigKey(reRegConfigKey)
220 .cpeAuthConfigKey(cpeAuthConfigKey)
221 .closeLoopConfigKey(closeLoopConfigKey)
222 .loggingLevel(loggingLevel)
223 .keyStorePath(keyStorePath)
224 .keyStorePasswordPath(keyStorePasswordPath)
225 .trustStorePath(trustStorePath)
226 .trustStorePasswordPath(trustStorePasswordPath)
227 .enableAaiCertAuth(aaiEnableCertAuth)
228 .enableDmaapCertAuth(dmaapEnableCertAuth)
229 .streamSubscribesMap(parseStreamsObjects(streamsSubscribes))
230 .streamPublishesMap(parseStreamsObjects(streamsPublishes))
234 private Set<String> getMissingEnvironmentVariables() {
235 Set<String> missingVars = new HashSet<>();
236 if (System.getenv().get(CONSUL_HOST) == null) {
237 missingVars.add(CONSUL_HOST);
239 if (System.getenv().get(CONFIG_BINDING_SERVICE) == null) {
240 missingVars.add(CONFIG_BINDING_SERVICE);
242 if (System.getenv().get(HOSTNAME) == null) {
243 missingVars.add(HOSTNAME);
248 private Map<String, GeneratedAppConfigObject.StreamsObject> parseStreamsObjects(
249 JsonObject jsonObject) {
250 Map<String, GeneratedAppConfigObject.StreamsObject> streams = new HashMap<>();
252 jsonObject.entrySet().stream()
253 .map(this::parseStreamsSingleObject)
254 .forEach(e -> streams.put(e.getKey(), e.getValue()));
259 private Map.Entry<String, GeneratedAppConfigObject.StreamsObject> parseStreamsSingleObject(
260 Map.Entry<String, JsonElement> jsonEntry) {
262 JsonObject closeLoopOutput = (JsonObject) jsonEntry.getValue();
264 String type = closeLoopOutput.get("type").getAsString();
265 String aafUsername = closeLoopOutput.get("aaf_username") != null
266 ? closeLoopOutput.get("aaf_username").getAsString() : "";
267 String aafPassword = closeLoopOutput.get("aaf_password") != null
268 ? closeLoopOutput.get("aaf_password").getAsString() : "";
270 JsonObject dmaapInfo = closeLoopOutput.getAsJsonObject("dmaap_info");
271 String clientId = dmaapInfo.get("client_id") != null
272 ? dmaapInfo.get("client_id").getAsString() : "";
273 String clientRole = dmaapInfo.get("client_role") != null
274 ? dmaapInfo.get("client_role").getAsString() : "";
275 String location = dmaapInfo.get("location") != null
276 ? dmaapInfo.get("location").getAsString() : "";
277 String topicUrl = dmaapInfo.get("topic_url").getAsString();
279 GeneratedAppConfigObject.DmaapInfo dmaapInfoObject = ImmutableDmaapInfo.builder()
281 .clientRole(clientRole)
285 GeneratedAppConfigObject.StreamsObject streamsObject = ImmutableStreamsObject.builder()
287 .aafUsername(aafUsername)
288 .aafPassword(aafPassword)
289 .dmaapInfo(dmaapInfoObject)
292 return new AbstractMap.SimpleEntry<>(jsonEntry.getKey(), streamsObject);