2 * ============LICENSE_START=======================================================
3 * BBS-RELOCATION-CPE-AUTHENTICATION-HANDLER
4 * ================================================================================
5 * Copyright (C) 2019 NOKIA Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.bbs.event.processor.config;
23 import com.google.gson.Gson;
24 import com.google.gson.GsonBuilder;
25 import com.google.gson.JsonElement;
26 import com.google.gson.JsonObject;
28 import java.time.Duration;
29 import java.util.AbstractMap;
30 import java.util.HashMap;
31 import java.util.HashSet;
35 import org.jetbrains.annotations.NotNull;
36 import org.onap.bbs.event.processor.exceptions.ApplicationEnvironmentException;
37 import org.onap.bbs.event.processor.model.GeneratedAppConfigObject;
38 import org.onap.bbs.event.processor.model.ImmutableDmaapInfo;
39 import org.onap.bbs.event.processor.model.ImmutableGeneratedAppConfigObject;
40 import org.onap.bbs.event.processor.model.ImmutableStreamsObject;
41 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory;
42 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties;
43 import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext;
44 import org.slf4j.Logger;
45 import org.slf4j.LoggerFactory;
46 import org.springframework.beans.factory.annotation.Autowired;
47 import org.springframework.stereotype.Component;
49 import reactor.core.Disposable;
52 public class ConsulConfigurationGateway {
54 private static final Logger LOGGER = LoggerFactory.getLogger(ConsulConfigurationGateway.class);
56 private static final String CONSUL_HOST = "CONSUL_HOST";
57 private static final String CONFIG_BINDING_SERVICE = "CONFIG_BINDING_SERVICE";
58 private static final String HOSTNAME = "HOSTNAME";
60 private final ApplicationConfiguration configuration;
62 private Disposable cbsFetchPipeline;
65 ConsulConfigurationGateway(ApplicationConfiguration configuration) {
66 this.configuration = configuration;
67 gson = new GsonBuilder().setPrettyPrinting().create();
71 * Periodically fetch application configuration via CBS service of DCAE.
72 * @param initialDelay initial delay before initiation of polling
73 * @param period polling interval
75 public void periodicallyFetchConfigFromCbs(Duration initialDelay, Duration period) {
76 if (environmentNotReady()) {
77 throw new ApplicationEnvironmentException(
78 String.format("Application Environment missing critical parameters: %s",
79 getMissingEnvironmentVariables()));
82 RequestDiagnosticContext diagnosticContext = RequestDiagnosticContext.create();
84 // Necessary properties from the environment (Consul host:port, service-name (hostname), CBS name)
85 EnvProperties env = EnvProperties.fromEnvironment();
87 // Create the client and use it to get the configuration
88 cbsFetchPipeline = CbsClientFactory.createCbsClient(env)
89 .doOnError(e -> LOGGER.warn("CBS Configuration fetch failed with error: {}", e))
91 .flatMapMany(cbsClient -> cbsClient.updates(diagnosticContext, initialDelay, period))
92 .subscribe(this::parseConsulRetrievedConfiguration, this::handleErrors);
95 boolean environmentNotReady() {
96 String consulHost = System.getenv().get(CONSUL_HOST);
97 String cbs = System.getenv().get(CONFIG_BINDING_SERVICE);
98 String hostname = System.getenv().get(HOSTNAME);
99 return consulHost == null || cbs == null || hostname == null;
103 * Reschedule application configuration periodic retrieval via CBS service of DCAE.
104 * @param initialDelay initial delay before rescheduling
105 * @param period new polling interval
107 public void rescheduleCbsConfigurationRetrieval(Duration initialDelay, Duration period) {
108 if (cbsFetchPipeline != null && !cbsFetchPipeline.isDisposed()) {
109 LOGGER.info("Disposing old CBS Config fetch job");
110 cbsFetchPipeline.dispose();
112 periodicallyFetchConfigFromCbs(initialDelay, period);
115 private void parseConsulRetrievedConfiguration(JsonObject jsonObject) {
117 GeneratedAppConfigObject generatedAppConfigObject = generateAppConfigObject(jsonObject);
118 LOGGER.trace("Consul-Retrieved Application Generated Object:\n{}", generatedAppConfigObject);
119 configuration.updateCurrentConfiguration(generatedAppConfigObject);
122 private void handleErrors(Throwable throwable) {
123 LOGGER.error("Periodic CBS configuration polling was terminated with error: {}", throwable);
127 GeneratedAppConfigObject generateAppConfigObject(JsonObject configObject) {
129 if (LOGGER.isInfoEnabled()) {
130 String configAsString = gson.toJson(configObject);
131 LOGGER.info("Received App Config object\n{}", configAsString);
134 final String dmaapProtocol = configObject.get("dmaap.protocol").getAsString();
135 final String dmaapContentType = configObject.get("dmaap.contentType").getAsString();
136 final String dmaapConsumerId = configObject.get("dmaap.consumer.consumerId").getAsString();
137 final String dmaapConsumerGroup = configObject.get("dmaap.consumer.consumerGroup").getAsString();
138 final int dmaapMessageLimit = configObject.get("dmaap.messageLimit").getAsInt();
139 final int dmaapTimeoutMs = configObject.get("dmaap.timeoutMs").getAsInt();
141 final String aaiHost = configObject.get("aai.host").getAsString();
142 final int aaiPort = configObject.get("aai.port").getAsInt();
143 final String aaiProtocol = configObject.get("aai.protocol").getAsString();
144 final String aaiUsername = configObject.get("aai.username").getAsString();
145 final String aaiPassword = configObject.get("aai.password").getAsString();
146 final boolean aaiIgnoreSslCertificateErrors =
147 configObject.get("aai.aaiIgnoreSslCertificateErrors").getAsBoolean();
149 final int pipelinesPollingIntervalSec = configObject.get("application.pipelinesPollingIntervalSec").getAsInt();
150 final int pipelinesTimeoutSec = configObject.get("application.pipelinesTimeoutSec").getAsInt();
151 final int cbsPollingIntervalSec = configObject.get("application.cbsPollingIntervalSec").getAsInt();
152 final String reRegPolicyScope = configObject.get("application.reregistration.policyScope").getAsString();
153 final String reRegClControlName = configObject.get("application.reregistration.clControlName").getAsString();
154 final String cpeAuthPolicyScope = configObject.get("application.cpe.authentication.policyScope").getAsString();
155 final String cpeAuthClControlName =
156 configObject.get("application.cpe.authentication.clControlName").getAsString();
157 final String reRegConfigKey = configObject.get("application.reregistration.configKey").getAsString();
158 final String cpeAuthConfigKey = configObject.get("application.cpeAuth.configKey").getAsString();
159 final String closeLoopConfigKey = configObject.get("application.closeLoop.configKey").getAsString();
161 final JsonObject streamsPublishes = configObject.getAsJsonObject("streams_publishes");
162 final JsonObject streamsSubscribes = configObject.getAsJsonObject("streams_subscribes");
164 return ImmutableGeneratedAppConfigObject.builder()
165 .dmaapProtocol(dmaapProtocol)
166 .dmaapContentType(dmaapContentType)
167 .dmaapConsumerConsumerId(dmaapConsumerId)
168 .dmaapConsumerConsumerGroup(dmaapConsumerGroup)
169 .dmaapMessageLimit(dmaapMessageLimit)
170 .dmaapTimeoutMs(dmaapTimeoutMs)
173 .aaiProtocol(aaiProtocol)
174 .aaiUsername(aaiUsername)
175 .aaiPassword(aaiPassword)
176 .aaiIgnoreSslCertificateErrors(aaiIgnoreSslCertificateErrors)
177 .pipelinesPollingIntervalSec(pipelinesPollingIntervalSec)
178 .pipelinesTimeoutSec(pipelinesTimeoutSec)
179 .cbsPollingIntervalSec(cbsPollingIntervalSec)
180 .reRegistrationPolicyScope(reRegPolicyScope)
181 .reRegistrationClControlName(reRegClControlName)
182 .cpeAuthPolicyScope(cpeAuthPolicyScope)
183 .cpeAuthClControlName(cpeAuthClControlName)
184 .reRegConfigKey(reRegConfigKey)
185 .cpeAuthConfigKey(cpeAuthConfigKey)
186 .closeLoopConfigKey(closeLoopConfigKey)
187 .streamSubscribesMap(parseStreamsObjects(streamsSubscribes))
188 .streamPublishesMap(parseStreamsObjects(streamsPublishes))
192 private Set<String> getMissingEnvironmentVariables() {
193 Set<String> missingVars = new HashSet<>();
194 if (System.getenv().get(CONSUL_HOST) == null) {
195 missingVars.add(CONSUL_HOST);
197 if (System.getenv().get(CONFIG_BINDING_SERVICE) == null) {
198 missingVars.add(CONFIG_BINDING_SERVICE);
200 if (System.getenv().get(HOSTNAME) == null) {
201 missingVars.add(HOSTNAME);
206 private Map<String, GeneratedAppConfigObject.StreamsObject> parseStreamsObjects(
207 JsonObject jsonObject) {
208 Map<String, GeneratedAppConfigObject.StreamsObject> streams = new HashMap<>();
210 jsonObject.entrySet().stream()
211 .map(this::parseStreamsSingleObject)
212 .forEach(e -> streams.put(e.getKey(), e.getValue()));
217 private Map.Entry<String, GeneratedAppConfigObject.StreamsObject> parseStreamsSingleObject(
218 Map.Entry<String, JsonElement> jsonEntry) {
220 JsonObject closeLoopOutput = (JsonObject) jsonEntry.getValue();
222 String type = closeLoopOutput.get("type").getAsString();
223 String aafUsername = closeLoopOutput.get("aaf_username") != null
224 ? closeLoopOutput.get("aaf_username").getAsString() : "";
225 String aafPassword = closeLoopOutput.get("aaf_password") != null
226 ? closeLoopOutput.get("aaf_password").getAsString() : "";
228 JsonObject dmaapInfo = closeLoopOutput.getAsJsonObject("dmaap_info");
229 String clientId = dmaapInfo.get("client_id") != null
230 ? dmaapInfo.get("client_id").getAsString() : "";
231 String clientRole = dmaapInfo.get("client_role") != null
232 ? dmaapInfo.get("client_role").getAsString() : "";
233 String location = dmaapInfo.get("location") != null
234 ? dmaapInfo.get("location").getAsString() : "";
235 String topicUrl = dmaapInfo.get("topic_url").getAsString();
237 GeneratedAppConfigObject.DmaapInfo dmaapInfoObject = ImmutableDmaapInfo.builder()
239 .clientRole(clientRole)
243 GeneratedAppConfigObject.StreamsObject streamsObject = ImmutableStreamsObject.builder()
245 .aafUsername(aafUsername)
246 .aafPassword(aafPassword)
247 .dmaapInfo(dmaapInfoObject)
250 return new AbstractMap.SimpleEntry<>(jsonEntry.getKey(), streamsObject);