2 * ============LICENSE_START=======================================================
3 * BBS-RELOCATION-CPE-AUTHENTICATION-HANDLER
4 * ================================================================================
5 * Copyright (C) 2019 NOKIA Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.bbs.event.processor.config;
23 import com.google.gson.Gson;
24 import com.google.gson.GsonBuilder;
25 import com.google.gson.JsonElement;
26 import com.google.gson.JsonObject;
28 import java.time.Duration;
29 import java.util.AbstractMap;
30 import java.util.HashMap;
31 import java.util.HashSet;
35 import org.jetbrains.annotations.NotNull;
36 import org.onap.bbs.event.processor.exceptions.ApplicationEnvironmentException;
37 import org.onap.bbs.event.processor.model.GeneratedAppConfigObject;
38 import org.onap.bbs.event.processor.model.ImmutableDmaapInfo;
39 import org.onap.bbs.event.processor.model.ImmutableGeneratedAppConfigObject;
40 import org.onap.bbs.event.processor.model.ImmutableStreamsObject;
41 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory;
42 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties;
43 import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext;
44 import org.slf4j.Logger;
45 import org.slf4j.LoggerFactory;
46 import org.springframework.beans.factory.annotation.Autowired;
47 import org.springframework.stereotype.Component;
49 import reactor.core.Disposable;
52 public class ConsulConfigurationGateway {
54 private static final Logger LOGGER = LoggerFactory.getLogger(ConsulConfigurationGateway.class);
56 private static final String CONSUL_HOST = "CONSUL_HOST";
57 private static final String CONFIG_BINDING_SERVICE = "CONFIG_BINDING_SERVICE";
58 private static final String HOSTNAME = "HOSTNAME";
60 private final ApplicationConfiguration configuration;
62 private Disposable cbsFetchPipeline;
65 ConsulConfigurationGateway(ApplicationConfiguration configuration) {
66 this.configuration = configuration;
67 gson = new GsonBuilder().setPrettyPrinting().create();
71 * Periodically fetch application configuration via CBS service of DCAE.
72 * @param initialDelay initial delay before initiation of polling
73 * @param period polling interval
75 public void periodicallyFetchConfigFromCbs(Duration initialDelay, Duration period) {
76 if (environmentNotReady()) {
77 throw new ApplicationEnvironmentException(
78 String.format("Application Environment missing critical parameters: %s",
79 getMissingEnvironmentVariables()));
82 fetchConfig(initialDelay, period);
85 boolean environmentNotReady() {
86 String consulHost = System.getenv().get(CONSUL_HOST);
87 String cbs = System.getenv().get(CONFIG_BINDING_SERVICE);
88 String hostname = System.getenv().get(HOSTNAME);
89 return consulHost == null || cbs == null || hostname == null;
93 * Reschedule application configuration periodic retrieval via CBS service of DCAE.
94 * @param initialDelay initial delay before rescheduling
95 * @param period new polling interval
97 public void rescheduleCbsConfigurationRetrieval(Duration initialDelay, Duration period) {
98 if (cbsFetchPipeline != null && !cbsFetchPipeline.isDisposed()) {
99 LOGGER.info("Disposing old CBS Config fetch job");
100 cbsFetchPipeline.dispose();
102 periodicallyFetchConfigFromCbs(initialDelay, period);
105 private void parseConsulRetrievedConfiguration(JsonObject jsonObject) {
107 GeneratedAppConfigObject generatedAppConfigObject = generateAppConfigObject(jsonObject);
108 LOGGER.trace("Consul-Retrieved Application Generated Object:\n{}", generatedAppConfigObject);
109 configuration.updateCurrentConfiguration(generatedAppConfigObject);
112 private void handleErrors(Throwable throwable) {
113 LOGGER.error("Periodic CBS configuration polling was terminated with error: {}", throwable);
114 LOGGER.info("Will restart CBS configuration fetching job due to abnormal termination."
115 + " Will start fetching after 60 seconds (please correct configuration in the meantime)"
116 + " and it will then poll every {} seconds (reverting to default)",
117 configuration.getCbsPollingInterval());
118 fetchConfig(Duration.ofSeconds(60), Duration.ofSeconds(configuration.getCbsPollingInterval()));
121 private void fetchConfig(Duration initialDelay, Duration period) {
122 RequestDiagnosticContext diagnosticContext = RequestDiagnosticContext.create();
124 // Necessary properties from the environment (Consul host:port, service-name (hostname), CBS name)
125 EnvProperties env = EnvProperties.fromEnvironment();
127 // Create the client and use it to get the configuration
128 cbsFetchPipeline = CbsClientFactory.createCbsClient(env)
129 .doOnError(e -> LOGGER.warn("CBS Configuration fetch failed with error: {}", e))
131 .flatMapMany(cbsClient -> cbsClient.updates(diagnosticContext, initialDelay, period))
132 .subscribe(this::parseConsulRetrievedConfiguration, this::handleErrors);
136 GeneratedAppConfigObject generateAppConfigObject(JsonObject configObject) {
138 if (LOGGER.isInfoEnabled()) {
139 String configAsString = gson.toJson(configObject);
140 LOGGER.info("Received App Config object\n{}", configAsString);
143 final String dmaapProtocol = configObject.get("dmaap.protocol").getAsString();
144 final String dmaapContentType = configObject.get("dmaap.contentType").getAsString();
145 final String dmaapConsumerId = configObject.get("dmaap.consumer.consumerId").getAsString();
146 final String dmaapConsumerGroup = configObject.get("dmaap.consumer.consumerGroup").getAsString();
147 final int dmaapMessageLimit = configObject.get("dmaap.messageLimit").getAsInt();
148 final int dmaapTimeoutMs = configObject.get("dmaap.timeoutMs").getAsInt();
150 final String aaiHost = configObject.get("aai.host").getAsString();
151 final int aaiPort = configObject.get("aai.port").getAsInt();
152 final String aaiProtocol = configObject.get("aai.protocol").getAsString();
153 final String aaiUsername = configObject.get("aai.username").getAsString();
154 final String aaiPassword = configObject.get("aai.password").getAsString();
155 final boolean aaiIgnoreSslCertificateErrors =
156 configObject.get("aai.aaiIgnoreSslCertificateErrors").getAsBoolean();
158 final int pipelinesPollingIntervalSec = configObject.get("application.pipelinesPollingIntervalSec").getAsInt();
159 final int pipelinesTimeoutSec = configObject.get("application.pipelinesTimeoutSec").getAsInt();
160 final int cbsPollingIntervalSec = configObject.get("application.cbsPollingIntervalSec").getAsInt();
162 final String reRegPolicyScope = configObject.get("application.reregistration.policyScope").getAsString();
163 final String reRegClControlName = configObject.get("application.reregistration.clControlName").getAsString();
164 final String cpeAuthPolicyScope = configObject.get("application.cpe.authentication.policyScope").getAsString();
165 final String cpeAuthClControlName =
166 configObject.get("application.cpe.authentication.clControlName").getAsString();
168 final String policyVersion = configObject.get("application.policyVersion").getAsString();
169 final String closeLoopTargetType = configObject.get("application.clTargetType").getAsString();
170 final String closeLoopEventStatus = configObject.get("application.clEventStatus").getAsString();
171 final String closeLoopVersion = configObject.get("application.clVersion").getAsString();
172 final String closeLoopTarget = configObject.get("application.clTarget").getAsString();
173 final String closeLoopOriginator = configObject.get("application.clOriginator").getAsString();
175 final String reRegConfigKey = configObject.get("application.reregistration.configKey").getAsString();
176 final String cpeAuthConfigKey = configObject.get("application.cpeAuth.configKey").getAsString();
177 final String closeLoopConfigKey = configObject.get("application.closeLoop.configKey").getAsString();
179 final String loggingLevel = configObject.get("application.loggingLevel").getAsString();
181 final JsonObject streamsPublishes = configObject.getAsJsonObject("streams_publishes");
182 final JsonObject streamsSubscribes = configObject.getAsJsonObject("streams_subscribes");
184 return ImmutableGeneratedAppConfigObject.builder()
185 .dmaapProtocol(dmaapProtocol)
186 .dmaapContentType(dmaapContentType)
187 .dmaapConsumerConsumerId(dmaapConsumerId)
188 .dmaapConsumerConsumerGroup(dmaapConsumerGroup)
189 .dmaapMessageLimit(dmaapMessageLimit)
190 .dmaapTimeoutMs(dmaapTimeoutMs)
193 .aaiProtocol(aaiProtocol)
194 .aaiUsername(aaiUsername)
195 .aaiPassword(aaiPassword)
196 .aaiIgnoreSslCertificateErrors(aaiIgnoreSslCertificateErrors)
197 .pipelinesPollingIntervalSec(pipelinesPollingIntervalSec)
198 .pipelinesTimeoutSec(pipelinesTimeoutSec)
199 .cbsPollingIntervalSec(cbsPollingIntervalSec)
200 .reRegistrationPolicyScope(reRegPolicyScope)
201 .reRegistrationClControlName(reRegClControlName)
202 .policyVersion(policyVersion)
203 .closeLoopTargetType(closeLoopTargetType)
204 .closeLoopEventStatus(closeLoopEventStatus)
205 .closeLoopVersion(closeLoopVersion)
206 .closeLoopTarget(closeLoopTarget)
207 .closeLoopOriginator(closeLoopOriginator)
208 .cpeAuthPolicyScope(cpeAuthPolicyScope)
209 .cpeAuthClControlName(cpeAuthClControlName)
210 .reRegConfigKey(reRegConfigKey)
211 .cpeAuthConfigKey(cpeAuthConfigKey)
212 .closeLoopConfigKey(closeLoopConfigKey)
213 .loggingLevel(loggingLevel)
214 .streamSubscribesMap(parseStreamsObjects(streamsSubscribes))
215 .streamPublishesMap(parseStreamsObjects(streamsPublishes))
219 private Set<String> getMissingEnvironmentVariables() {
220 Set<String> missingVars = new HashSet<>();
221 if (System.getenv().get(CONSUL_HOST) == null) {
222 missingVars.add(CONSUL_HOST);
224 if (System.getenv().get(CONFIG_BINDING_SERVICE) == null) {
225 missingVars.add(CONFIG_BINDING_SERVICE);
227 if (System.getenv().get(HOSTNAME) == null) {
228 missingVars.add(HOSTNAME);
233 private Map<String, GeneratedAppConfigObject.StreamsObject> parseStreamsObjects(
234 JsonObject jsonObject) {
235 Map<String, GeneratedAppConfigObject.StreamsObject> streams = new HashMap<>();
237 jsonObject.entrySet().stream()
238 .map(this::parseStreamsSingleObject)
239 .forEach(e -> streams.put(e.getKey(), e.getValue()));
244 private Map.Entry<String, GeneratedAppConfigObject.StreamsObject> parseStreamsSingleObject(
245 Map.Entry<String, JsonElement> jsonEntry) {
247 JsonObject closeLoopOutput = (JsonObject) jsonEntry.getValue();
249 String type = closeLoopOutput.get("type").getAsString();
250 String aafUsername = closeLoopOutput.get("aaf_username") != null
251 ? closeLoopOutput.get("aaf_username").getAsString() : "";
252 String aafPassword = closeLoopOutput.get("aaf_password") != null
253 ? closeLoopOutput.get("aaf_password").getAsString() : "";
255 JsonObject dmaapInfo = closeLoopOutput.getAsJsonObject("dmaap_info");
256 String clientId = dmaapInfo.get("client_id") != null
257 ? dmaapInfo.get("client_id").getAsString() : "";
258 String clientRole = dmaapInfo.get("client_role") != null
259 ? dmaapInfo.get("client_role").getAsString() : "";
260 String location = dmaapInfo.get("location") != null
261 ? dmaapInfo.get("location").getAsString() : "";
262 String topicUrl = dmaapInfo.get("topic_url").getAsString();
264 GeneratedAppConfigObject.DmaapInfo dmaapInfoObject = ImmutableDmaapInfo.builder()
266 .clientRole(clientRole)
270 GeneratedAppConfigObject.StreamsObject streamsObject = ImmutableStreamsObject.builder()
272 .aafUsername(aafUsername)
273 .aafPassword(aafPassword)
274 .dmaapInfo(dmaapInfoObject)
277 return new AbstractMap.SimpleEntry<>(jsonEntry.getKey(), streamsObject);