2 * ============LICENSE_START=======================================================
3 * BBS-RELOCATION-CPE-AUTHENTICATION-HANDLER
4 * ================================================================================
5 * Copyright (C) 2019 NOKIA Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.bbs.event.processor.config;
23 import com.google.gson.Gson;
24 import com.google.gson.GsonBuilder;
25 import com.google.gson.JsonElement;
26 import com.google.gson.JsonObject;
28 import java.time.Duration;
29 import java.util.AbstractMap;
30 import java.util.HashMap;
31 import java.util.HashSet;
35 import org.jetbrains.annotations.NotNull;
36 import org.onap.bbs.event.processor.exceptions.ApplicationEnvironmentException;
37 import org.onap.bbs.event.processor.model.GeneratedAppConfigObject;
38 import org.onap.bbs.event.processor.model.ImmutableDmaapInfo;
39 import org.onap.bbs.event.processor.model.ImmutableGeneratedAppConfigObject;
40 import org.onap.bbs.event.processor.model.ImmutableStreamsObject;
41 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory;
42 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties;
43 import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext;
44 import org.slf4j.Logger;
45 import org.slf4j.LoggerFactory;
46 import org.springframework.beans.factory.annotation.Autowired;
47 import org.springframework.stereotype.Component;
49 import reactor.core.Disposable;
52 public class ConsulConfigurationGateway {
54 private static final Logger LOGGER = LoggerFactory.getLogger(ConsulConfigurationGateway.class);
56 private static final String CONSUL_HOST = "CONSUL_HOST";
57 private static final String CONFIG_BINDING_SERVICE = "CONFIG_BINDING_SERVICE";
58 private static final String HOSTNAME = "HOSTNAME";
60 private final ApplicationConfiguration configuration;
62 private Disposable cbsFetchPipeline;
65 ConsulConfigurationGateway(ApplicationConfiguration configuration) {
66 this.configuration = configuration;
67 gson = new GsonBuilder().setPrettyPrinting().create();
71 * Periodically fetch application configuration via CBS service of DCAE.
72 * @param initialDelay initial delay before initiation of polling
73 * @param period polling interval
75 public void periodicallyFetchConfigFromCbs(Duration initialDelay, Duration period) {
76 if (environmentNotReady()) {
77 throw new ApplicationEnvironmentException(
78 String.format("Application Environment missing critical parameters: %s",
79 getMissingEnvironmentVariables()));
82 fetchConfig(initialDelay, period);
85 boolean environmentNotReady() {
86 String consulHost = System.getenv().get(CONSUL_HOST);
87 String cbs = System.getenv().get(CONFIG_BINDING_SERVICE);
88 String hostname = System.getenv().get(HOSTNAME);
89 return consulHost == null || cbs == null || hostname == null;
93 * Reschedule application configuration periodic retrieval via CBS service of DCAE.
94 * @param initialDelay initial delay before rescheduling
95 * @param period new polling interval
97 public void rescheduleCbsConfigurationRetrieval(Duration initialDelay, Duration period) {
98 if (cbsFetchPipeline != null && !cbsFetchPipeline.isDisposed()) {
99 LOGGER.info("Disposing old CBS Config fetch job");
100 cbsFetchPipeline.dispose();
102 periodicallyFetchConfigFromCbs(initialDelay, period);
105 private void parseConsulRetrievedConfiguration(JsonObject jsonObject) {
107 GeneratedAppConfigObject generatedAppConfigObject = generateAppConfigObject(jsonObject);
108 LOGGER.trace("Consul-Retrieved Application Generated Object:\n{}", generatedAppConfigObject);
109 configuration.updateCurrentConfiguration(generatedAppConfigObject);
112 private void handleErrors(Throwable throwable) {
113 LOGGER.error("Periodic CBS configuration polling was terminated with error: {}", throwable);
114 LOGGER.info("Will restart CBS configuration fetching job due to abnormal termination."
115 + " Will start fetching after 60 seconds (please correct configuration in the meantime)"
116 + " and it will then poll every {} seconds (reverting to default)",
117 configuration.getCbsPollingInterval());
118 fetchConfig(Duration.ofSeconds(60), Duration.ofSeconds(configuration.getCbsPollingInterval()));
121 private void fetchConfig(Duration initialDelay, Duration period) {
122 RequestDiagnosticContext diagnosticContext = RequestDiagnosticContext.create();
124 // Necessary properties from the environment (Consul host:port, service-name (hostname), CBS name)
125 EnvProperties env = EnvProperties.fromEnvironment();
127 // Create the client and use it to get the configuration
128 cbsFetchPipeline = CbsClientFactory.createCbsClient(env)
129 .doOnError(e -> LOGGER.warn("CBS Configuration fetch failed with error: {}", e))
131 .flatMapMany(cbsClient -> cbsClient.updates(diagnosticContext, initialDelay, period))
132 .subscribe(this::parseConsulRetrievedConfiguration, this::handleErrors);
136 GeneratedAppConfigObject generateAppConfigObject(JsonObject configObject) {
138 if (LOGGER.isInfoEnabled()) {
139 String configAsString = gson.toJson(configObject);
140 LOGGER.info("Received App Config object\n{}", configAsString);
143 final String dmaapProtocol = configObject.get("dmaap.protocol").getAsString();
144 final String dmaapContentType = configObject.get("dmaap.contentType").getAsString();
145 final String dmaapConsumerId = configObject.get("dmaap.consumer.consumerId").getAsString();
146 final String dmaapConsumerGroup = configObject.get("dmaap.consumer.consumerGroup").getAsString();
147 final int dmaapMessageLimit = configObject.get("dmaap.messageLimit").getAsInt();
148 final int dmaapTimeoutMs = configObject.get("dmaap.timeoutMs").getAsInt();
150 final String aaiHost = configObject.get("aai.host").getAsString();
151 final int aaiPort = configObject.get("aai.port").getAsInt();
152 final String aaiProtocol = configObject.get("aai.protocol").getAsString();
153 final String aaiUsername = configObject.get("aai.username").getAsString();
154 final String aaiPassword = configObject.get("aai.password").getAsString();
155 final boolean aaiIgnoreSslCertificateErrors =
156 configObject.get("aai.aaiIgnoreSslCertificateErrors").getAsBoolean();
158 final int pipelinesPollingIntervalSec = configObject.get("application.pipelinesPollingIntervalSec").getAsInt();
159 final int pipelinesTimeoutSec = configObject.get("application.pipelinesTimeoutSec").getAsInt();
160 final int cbsPollingIntervalSec = configObject.get("application.cbsPollingIntervalSec").getAsInt();
162 final String reRegPolicyScope = configObject.get("application.reregistration.policyScope").getAsString();
163 final String reRegClControlName = configObject.get("application.reregistration.clControlName").getAsString();
164 final String cpeAuthPolicyScope = configObject.get("application.cpe.authentication.policyScope").getAsString();
165 final String cpeAuthClControlName =
166 configObject.get("application.cpe.authentication.clControlName").getAsString();
168 final String policyVersion = configObject.get("application.policyVersion").getAsString();
169 final String closeLoopTargetType = configObject.get("application.clTargetType").getAsString();
170 final String closeLoopEventStatus = configObject.get("application.clEventStatus").getAsString();
171 final String closeLoopVersion = configObject.get("application.clVersion").getAsString();
172 final String closeLoopTarget = configObject.get("application.clTarget").getAsString();
173 final String closeLoopOriginator = configObject.get("application.clOriginator").getAsString();
175 final String reRegConfigKey = configObject.get("application.reregistration.configKey").getAsString();
176 final String cpeAuthConfigKey = configObject.get("application.cpeAuth.configKey").getAsString();
177 final String closeLoopConfigKey = configObject.get("application.closeLoop.configKey").getAsString();
179 final JsonObject streamsPublishes = configObject.getAsJsonObject("streams_publishes");
180 final JsonObject streamsSubscribes = configObject.getAsJsonObject("streams_subscribes");
182 return ImmutableGeneratedAppConfigObject.builder()
183 .dmaapProtocol(dmaapProtocol)
184 .dmaapContentType(dmaapContentType)
185 .dmaapConsumerConsumerId(dmaapConsumerId)
186 .dmaapConsumerConsumerGroup(dmaapConsumerGroup)
187 .dmaapMessageLimit(dmaapMessageLimit)
188 .dmaapTimeoutMs(dmaapTimeoutMs)
191 .aaiProtocol(aaiProtocol)
192 .aaiUsername(aaiUsername)
193 .aaiPassword(aaiPassword)
194 .aaiIgnoreSslCertificateErrors(aaiIgnoreSslCertificateErrors)
195 .pipelinesPollingIntervalSec(pipelinesPollingIntervalSec)
196 .pipelinesTimeoutSec(pipelinesTimeoutSec)
197 .cbsPollingIntervalSec(cbsPollingIntervalSec)
198 .reRegistrationPolicyScope(reRegPolicyScope)
199 .reRegistrationClControlName(reRegClControlName)
200 .policyVersion(policyVersion)
201 .closeLoopTargetType(closeLoopTargetType)
202 .closeLoopEventStatus(closeLoopEventStatus)
203 .closeLoopVersion(closeLoopVersion)
204 .closeLoopTarget(closeLoopTarget)
205 .closeLoopOriginator(closeLoopOriginator)
206 .cpeAuthPolicyScope(cpeAuthPolicyScope)
207 .cpeAuthClControlName(cpeAuthClControlName)
208 .reRegConfigKey(reRegConfigKey)
209 .cpeAuthConfigKey(cpeAuthConfigKey)
210 .closeLoopConfigKey(closeLoopConfigKey)
211 .streamSubscribesMap(parseStreamsObjects(streamsSubscribes))
212 .streamPublishesMap(parseStreamsObjects(streamsPublishes))
216 private Set<String> getMissingEnvironmentVariables() {
217 Set<String> missingVars = new HashSet<>();
218 if (System.getenv().get(CONSUL_HOST) == null) {
219 missingVars.add(CONSUL_HOST);
221 if (System.getenv().get(CONFIG_BINDING_SERVICE) == null) {
222 missingVars.add(CONFIG_BINDING_SERVICE);
224 if (System.getenv().get(HOSTNAME) == null) {
225 missingVars.add(HOSTNAME);
230 private Map<String, GeneratedAppConfigObject.StreamsObject> parseStreamsObjects(
231 JsonObject jsonObject) {
232 Map<String, GeneratedAppConfigObject.StreamsObject> streams = new HashMap<>();
234 jsonObject.entrySet().stream()
235 .map(this::parseStreamsSingleObject)
236 .forEach(e -> streams.put(e.getKey(), e.getValue()));
241 private Map.Entry<String, GeneratedAppConfigObject.StreamsObject> parseStreamsSingleObject(
242 Map.Entry<String, JsonElement> jsonEntry) {
244 JsonObject closeLoopOutput = (JsonObject) jsonEntry.getValue();
246 String type = closeLoopOutput.get("type").getAsString();
247 String aafUsername = closeLoopOutput.get("aaf_username") != null
248 ? closeLoopOutput.get("aaf_username").getAsString() : "";
249 String aafPassword = closeLoopOutput.get("aaf_password") != null
250 ? closeLoopOutput.get("aaf_password").getAsString() : "";
252 JsonObject dmaapInfo = closeLoopOutput.getAsJsonObject("dmaap_info");
253 String clientId = dmaapInfo.get("client_id") != null
254 ? dmaapInfo.get("client_id").getAsString() : "";
255 String clientRole = dmaapInfo.get("client_role") != null
256 ? dmaapInfo.get("client_role").getAsString() : "";
257 String location = dmaapInfo.get("location") != null
258 ? dmaapInfo.get("location").getAsString() : "";
259 String topicUrl = dmaapInfo.get("topic_url").getAsString();
261 GeneratedAppConfigObject.DmaapInfo dmaapInfoObject = ImmutableDmaapInfo.builder()
263 .clientRole(clientRole)
267 GeneratedAppConfigObject.StreamsObject streamsObject = ImmutableStreamsObject.builder()
269 .aafUsername(aafUsername)
270 .aafPassword(aafPassword)
271 .dmaapInfo(dmaapInfoObject)
274 return new AbstractMap.SimpleEntry<>(jsonEntry.getKey(), streamsObject);