4ac854955c6792dbdb8b92f5ec76e41741f3232d
[dcaegen2/services.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * BBS-RELOCATION-CPE-AUTHENTICATION-HANDLER
4  * ================================================================================
5  * Copyright (C) 2019 NOKIA Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.bbs.event.processor.config;
22
23 import com.google.gson.Gson;
24 import com.google.gson.GsonBuilder;
25 import com.google.gson.JsonElement;
26 import com.google.gson.JsonObject;
27
28 import java.time.Duration;
29 import java.util.AbstractMap;
30 import java.util.HashMap;
31 import java.util.HashSet;
32 import java.util.Map;
33 import java.util.Set;
34
35 import org.jetbrains.annotations.NotNull;
36 import org.onap.bbs.event.processor.exceptions.ApplicationEnvironmentException;
37 import org.onap.bbs.event.processor.model.GeneratedAppConfigObject;
38 import org.onap.bbs.event.processor.model.ImmutableDmaapInfo;
39 import org.onap.bbs.event.processor.model.ImmutableGeneratedAppConfigObject;
40 import org.onap.bbs.event.processor.model.ImmutableStreamsObject;
41 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory;
42 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests;
43 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsClientConfiguration;
44 import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext;
45 import org.slf4j.Logger;
46 import org.slf4j.LoggerFactory;
47 import org.springframework.beans.factory.annotation.Autowired;
48 import org.springframework.stereotype.Component;
49
50 import reactor.core.Disposable;
51
52 @Component
53 public class ConsulConfigurationGateway {
54
55     private static final Logger LOGGER = LoggerFactory.getLogger(ConsulConfigurationGateway.class);
56
57     private static final String CONSUL_HOST = "CONSUL_HOST";
58     private static final String CONFIG_BINDING_SERVICE = "CONFIG_BINDING_SERVICE";
59     private static final String HOSTNAME = "HOSTNAME";
60
61     private final ApplicationConfiguration configuration;
62     private Gson gson;
63     private Disposable cbsFetchPipeline;
64
65     @Autowired
66     ConsulConfigurationGateway(ApplicationConfiguration configuration) {
67         this.configuration = configuration;
68         gson = new GsonBuilder().setPrettyPrinting().create();
69     }
70
71     /**
72      * Periodically fetch application configuration via CBS service of DCAE.
73      * @param initialDelay initial delay before initiation of polling
74      * @param period polling interval
75      */
76     public void periodicallyFetchConfigFromCbs(Duration initialDelay, Duration period) {
77         if (environmentNotReady()) {
78             throw new ApplicationEnvironmentException(
79                     String.format("Application Environment missing critical parameters: %s",
80                             getMissingEnvironmentVariables()));
81         }
82
83         fetchConfig(initialDelay, period);
84     }
85
86     boolean environmentNotReady() {
87         var consulHost = System.getenv().get(CONSUL_HOST);
88         var cbs = System.getenv().get(CONFIG_BINDING_SERVICE);
89         var hostname = System.getenv().get(HOSTNAME);
90         return consulHost == null || cbs == null || hostname == null;
91     }
92
93     /**
94      * Reschedule application configuration periodic retrieval via CBS service of DCAE.
95      * @param initialDelay initial delay before rescheduling
96      * @param period new polling interval
97      */
98     public void rescheduleCbsConfigurationRetrieval(Duration initialDelay, Duration period) {
99         if (cbsFetchPipeline != null && !cbsFetchPipeline.isDisposed()) {
100             LOGGER.info("Disposing old CBS Config fetch job");
101             cbsFetchPipeline.dispose();
102         }
103         periodicallyFetchConfigFromCbs(initialDelay, period);
104     }
105
106     private void parseConsulRetrievedConfiguration(JsonObject jsonObject) {
107
108         var generatedAppConfigObject = generateAppConfigObject(jsonObject);
109         LOGGER.trace("Consul-Retrieved Application Generated Object:\n{}", generatedAppConfigObject);
110         configuration.updateCurrentConfiguration(generatedAppConfigObject);
111     }
112
113     private void handleErrors(Throwable throwable) {
114         LOGGER.error("Periodic CBS configuration polling was terminated with error: {}", throwable);
115         LOGGER.info("Will restart CBS configuration fetching job due to abnormal termination."
116                 + " Will start fetching after 60 seconds (please correct configuration in the meantime)"
117                 + " and it will then poll every {} seconds (reverting to default)",
118                 configuration.getCbsPollingInterval());
119         fetchConfig(Duration.ofSeconds(60), Duration.ofSeconds(configuration.getCbsPollingInterval()));
120     }
121
122     private void fetchConfig(Duration initialDelay, Duration period) {
123         RequestDiagnosticContext diagnosticContext = RequestDiagnosticContext.create();
124
125         // Necessary properties from the environment (Consul host:port, service-name (hostname), CBS name)
126         var cbsClientConfig = CbsClientConfiguration.fromEnvironment();
127         var cbsRequest = CbsRequests.getConfiguration(diagnosticContext);
128         // Create the client and use it to get the configuration
129         cbsFetchPipeline = CbsClientFactory.createCbsClient(cbsClientConfig)
130                 .doOnError(e -> LOGGER.warn("CBS Configuration fetch failed with error: {}", e))
131                 .retry(e -> true)
132                 .flatMapMany(cbsClient -> cbsClient.updates(cbsRequest, initialDelay, period))
133                 .subscribe(this::parseConsulRetrievedConfiguration, this::handleErrors);
134     }
135
136     @NotNull
137     GeneratedAppConfigObject generateAppConfigObject(JsonObject configObject) {
138
139         if (LOGGER.isInfoEnabled()) {
140             var configAsString = gson.toJson(configObject);
141             LOGGER.info("Received App Config object\n{}", configAsString);
142         }
143
144         final var dmaapProtocol = configObject.get("dmaap.protocol").getAsString();
145         final var dmaapContentType  = configObject.get("dmaap.contentType").getAsString();
146         final var dmaapConsumerId  = configObject.get("dmaap.consumer.consumerId").getAsString();
147         final var dmaapConsumerGroup = configObject.get("dmaap.consumer.consumerGroup").getAsString();
148         final var dmaapMessageLimit  = configObject.get("dmaap.messageLimit").getAsInt();
149         final var dmaapTimeoutMs  = configObject.get("dmaap.timeoutMs").getAsInt();
150
151         final var aaiHost = configObject.get("aai.host").getAsString();
152         final var aaiPort = configObject.get("aai.port").getAsInt();
153         final var aaiProtocol = configObject.get("aai.protocol").getAsString();
154         final var aaiUsername = configObject.get("aai.username").getAsString();
155         final var aaiPassword = configObject.get("aai.password").getAsString();
156         final var aaiIgnoreSslCertificateErrors =
157                 configObject.get("aai.aaiIgnoreSslCertificateErrors").getAsBoolean();
158
159         final var pipelinesPollingIntervalSec = configObject.get("application.pipelinesPollingIntervalSec").getAsInt();
160         final var pipelinesTimeoutSec = configObject.get("application.pipelinesTimeoutSec").getAsInt();
161         final var cbsPollingIntervalSec = configObject.get("application.cbsPollingIntervalSec").getAsInt();
162
163         final var reRegPolicyScope = configObject.get("application.reregistration.policyScope").getAsString();
164         final var reRegClControlName = configObject.get("application.reregistration.clControlName").getAsString();
165         final var cpeAuthPolicyScope = configObject.get("application.cpe.authentication.policyScope").getAsString();
166         final var cpeAuthClControlName =
167                 configObject.get("application.cpe.authentication.clControlName").getAsString();
168
169         final var policyVersion = configObject.get("application.policyVersion").getAsString();
170         final var closeLoopTargetType = configObject.get("application.clTargetType").getAsString();
171         final var closeLoopEventStatus = configObject.get("application.clEventStatus").getAsString();
172         final var closeLoopVersion = configObject.get("application.clVersion").getAsString();
173         final var closeLoopTarget = configObject.get("application.clTarget").getAsString();
174         final var closeLoopOriginator = configObject.get("application.clOriginator").getAsString();
175
176         final var reRegConfigKey = configObject.get("application.reregistration.configKey").getAsString();
177         final var cpeAuthConfigKey = configObject.get("application.cpeAuth.configKey").getAsString();
178         final var closeLoopConfigKey = configObject.get("application.closeLoop.configKey").getAsString();
179
180         final var loggingLevel = configObject.get("application.loggingLevel").getAsString();
181
182         final var keyStorePath = configObject.get("application.ssl.keyStorePath").getAsString();
183         final var keyStorePasswordPath = configObject.get("application.ssl.keyStorePasswordPath").getAsString();
184         final var trustStorePath = configObject.get("application.ssl.trustStorePath").getAsString();
185         final var trustStorePasswordPath = configObject.get("application.ssl.trustStorePasswordPath").getAsString();
186         final var aaiEnableCertAuth = configObject.get("application.ssl.enableAaiCertAuth").getAsBoolean();
187         final var dmaapEnableCertAuth = configObject.get("application.ssl.enableDmaapCertAuth").getAsBoolean();
188
189         final var streamsPublishes = configObject.getAsJsonObject("streams_publishes");
190         final var streamsSubscribes = configObject.getAsJsonObject("streams_subscribes");
191
192         return ImmutableGeneratedAppConfigObject.builder()
193                 .dmaapProtocol(dmaapProtocol)
194                 .dmaapContentType(dmaapContentType)
195                 .dmaapConsumerConsumerId(dmaapConsumerId)
196                 .dmaapConsumerConsumerGroup(dmaapConsumerGroup)
197                 .dmaapMessageLimit(dmaapMessageLimit)
198                 .dmaapTimeoutMs(dmaapTimeoutMs)
199                 .aaiHost(aaiHost)
200                 .aaiPort(aaiPort)
201                 .aaiProtocol(aaiProtocol)
202                 .aaiUsername(aaiUsername)
203                 .aaiPassword(aaiPassword)
204                 .aaiIgnoreSslCertificateErrors(aaiIgnoreSslCertificateErrors)
205                 .pipelinesPollingIntervalSec(pipelinesPollingIntervalSec)
206                 .pipelinesTimeoutSec(pipelinesTimeoutSec)
207                 .cbsPollingIntervalSec(cbsPollingIntervalSec)
208                 .reRegistrationPolicyScope(reRegPolicyScope)
209                 .reRegistrationClControlName(reRegClControlName)
210                 .policyVersion(policyVersion)
211                 .closeLoopTargetType(closeLoopTargetType)
212                 .closeLoopEventStatus(closeLoopEventStatus)
213                 .closeLoopVersion(closeLoopVersion)
214                 .closeLoopTarget(closeLoopTarget)
215                 .closeLoopOriginator(closeLoopOriginator)
216                 .cpeAuthPolicyScope(cpeAuthPolicyScope)
217                 .cpeAuthClControlName(cpeAuthClControlName)
218                 .reRegConfigKey(reRegConfigKey)
219                 .cpeAuthConfigKey(cpeAuthConfigKey)
220                 .closeLoopConfigKey(closeLoopConfigKey)
221                 .loggingLevel(loggingLevel)
222                 .keyStorePath(keyStorePath)
223                 .keyStorePasswordPath(keyStorePasswordPath)
224                 .trustStorePath(trustStorePath)
225                 .trustStorePasswordPath(trustStorePasswordPath)
226                 .enableAaiCertAuth(aaiEnableCertAuth)
227                 .enableDmaapCertAuth(dmaapEnableCertAuth)
228                 .streamSubscribesMap(parseStreamsObjects(streamsSubscribes))
229                 .streamPublishesMap(parseStreamsObjects(streamsPublishes))
230                 .build();
231     }
232
233     private Set<String> getMissingEnvironmentVariables() {
234         Set<String> missingVars = new HashSet<>();
235         if (System.getenv().get(CONSUL_HOST) == null) {
236             missingVars.add(CONSUL_HOST);
237         }
238         if (System.getenv().get(CONFIG_BINDING_SERVICE) == null) {
239             missingVars.add(CONFIG_BINDING_SERVICE);
240         }
241         if (System.getenv().get(HOSTNAME) == null) {
242             missingVars.add(HOSTNAME);
243         }
244         return missingVars;
245     }
246
247     private Map<String, GeneratedAppConfigObject.StreamsObject> parseStreamsObjects(
248             JsonObject jsonObject) {
249         Map<String, GeneratedAppConfigObject.StreamsObject> streams = new HashMap<>();
250
251         jsonObject.entrySet().stream()
252                 .map(this::parseStreamsSingleObject)
253                 .forEach(e -> streams.put(e.getKey(), e.getValue()));
254
255         return streams;
256     }
257
258     private Map.Entry<String, GeneratedAppConfigObject.StreamsObject> parseStreamsSingleObject(
259             Map.Entry<String, JsonElement> jsonEntry) {
260
261         var closeLoopOutput = (JsonObject) jsonEntry.getValue();
262
263         var type = closeLoopOutput.get("type").getAsString();
264         var aafUsername = closeLoopOutput.get("aaf_username") != null
265                 ? closeLoopOutput.get("aaf_username").getAsString() : "";
266         var aafPassword = closeLoopOutput.get("aaf_password") != null
267                 ? closeLoopOutput.get("aaf_password").getAsString() : "";
268
269         var dmaapInfo = closeLoopOutput.getAsJsonObject("dmaap_info");
270         var clientId = dmaapInfo.get("client_id") != null
271                 ? dmaapInfo.get("client_id").getAsString() : "";
272         var clientRole = dmaapInfo.get("client_role") != null
273                 ? dmaapInfo.get("client_role").getAsString() : "";
274         var location = dmaapInfo.get("location") != null
275                 ? dmaapInfo.get("location").getAsString() : "";
276         var topicUrl = dmaapInfo.get("topic_url").getAsString();
277
278         GeneratedAppConfigObject.DmaapInfo dmaapInfoObject = ImmutableDmaapInfo.builder()
279                 .clientId(clientId)
280                 .clientRole(clientRole)
281                 .location(location)
282                 .topicUrl(topicUrl)
283                 .build();
284         GeneratedAppConfigObject.StreamsObject streamsObject = ImmutableStreamsObject.builder()
285                 .type(type)
286                 .aafUsername(aafUsername)
287                 .aafPassword(aafPassword)
288                 .dmaapInfo(dmaapInfoObject)
289                 .build();
290
291         return new AbstractMap.SimpleEntry<>(jsonEntry.getKey(), streamsObject);
292     }
293 }