607b3b31166451a188a76dd2ed1740a94a787252
[dcaegen2/services.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * BBS-RELOCATION-CPE-AUTHENTICATION-HANDLER
4  * ================================================================================
5  * Copyright (C) 2019 NOKIA Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.bbs.event.processor.config;
22
23 import com.google.gson.Gson;
24 import com.google.gson.GsonBuilder;
25 import com.google.gson.JsonElement;
26 import com.google.gson.JsonObject;
27
28 import java.time.Duration;
29 import java.util.AbstractMap;
30 import java.util.HashMap;
31 import java.util.HashSet;
32 import java.util.Map;
33 import java.util.Set;
34
35 import org.jetbrains.annotations.NotNull;
36 import org.onap.bbs.event.processor.exceptions.ApplicationEnvironmentException;
37 import org.onap.bbs.event.processor.model.GeneratedAppConfigObject;
38 import org.onap.bbs.event.processor.model.ImmutableDmaapInfo;
39 import org.onap.bbs.event.processor.model.ImmutableGeneratedAppConfigObject;
40 import org.onap.bbs.event.processor.model.ImmutableStreamsObject;
41 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory;
42 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests;
43 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsRequest;
44 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties;
45 import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext;
46 import org.slf4j.Logger;
47 import org.slf4j.LoggerFactory;
48 import org.springframework.beans.factory.annotation.Autowired;
49 import org.springframework.stereotype.Component;
50
51 import reactor.core.Disposable;
52
53 @Component
54 public class ConsulConfigurationGateway {
55
56     private static final Logger LOGGER = LoggerFactory.getLogger(ConsulConfigurationGateway.class);
57
58     private static final String CONSUL_HOST = "CONSUL_HOST";
59     private static final String CONFIG_BINDING_SERVICE = "CONFIG_BINDING_SERVICE";
60     private static final String HOSTNAME = "HOSTNAME";
61
62     private final ApplicationConfiguration configuration;
63     private Gson gson;
64     private Disposable cbsFetchPipeline;
65
66     @Autowired
67     ConsulConfigurationGateway(ApplicationConfiguration configuration) {
68         this.configuration = configuration;
69         gson = new GsonBuilder().setPrettyPrinting().create();
70     }
71
72     /**
73      * Periodically fetch application configuration via CBS service of DCAE.
74      * @param initialDelay initial delay before initiation of polling
75      * @param period polling interval
76      */
77     public void periodicallyFetchConfigFromCbs(Duration initialDelay, Duration period) {
78         if (environmentNotReady()) {
79             throw new ApplicationEnvironmentException(
80                     String.format("Application Environment missing critical parameters: %s",
81                             getMissingEnvironmentVariables()));
82         }
83
84         fetchConfig(initialDelay, period);
85     }
86
87     boolean environmentNotReady() {
88         String consulHost = System.getenv().get(CONSUL_HOST);
89         String cbs = System.getenv().get(CONFIG_BINDING_SERVICE);
90         String hostname = System.getenv().get(HOSTNAME);
91         return consulHost == null || cbs == null || hostname == null;
92     }
93
94     /**
95      * Reschedule application configuration periodic retrieval via CBS service of DCAE.
96      * @param initialDelay initial delay before rescheduling
97      * @param period new polling interval
98      */
99     public void rescheduleCbsConfigurationRetrieval(Duration initialDelay, Duration period) {
100         if (cbsFetchPipeline != null && !cbsFetchPipeline.isDisposed()) {
101             LOGGER.info("Disposing old CBS Config fetch job");
102             cbsFetchPipeline.dispose();
103         }
104         periodicallyFetchConfigFromCbs(initialDelay, period);
105     }
106
107     private void parseConsulRetrievedConfiguration(JsonObject jsonObject) {
108
109         GeneratedAppConfigObject generatedAppConfigObject = generateAppConfigObject(jsonObject);
110         LOGGER.trace("Consul-Retrieved Application Generated Object:\n{}", generatedAppConfigObject);
111         configuration.updateCurrentConfiguration(generatedAppConfigObject);
112     }
113
114     private void handleErrors(Throwable throwable) {
115         LOGGER.error("Periodic CBS configuration polling was terminated with error: {}", throwable);
116         LOGGER.info("Will restart CBS configuration fetching job due to abnormal termination."
117                 + " Will start fetching after 60 seconds (please correct configuration in the meantime)"
118                 + " and it will then poll every {} seconds (reverting to default)",
119                 configuration.getCbsPollingInterval());
120         fetchConfig(Duration.ofSeconds(60), Duration.ofSeconds(configuration.getCbsPollingInterval()));
121     }
122
123     private void fetchConfig(Duration initialDelay, Duration period) {
124         RequestDiagnosticContext diagnosticContext = RequestDiagnosticContext.create();
125
126         // Necessary properties from the environment (Consul host:port, service-name (hostname), CBS name)
127         EnvProperties env = EnvProperties.fromEnvironment();
128         CbsRequest cbsRequest = CbsRequests.getConfiguration(diagnosticContext);
129         // Create the client and use it to get the configuration
130         cbsFetchPipeline = CbsClientFactory.createCbsClient(env)
131                 .doOnError(e -> LOGGER.warn("CBS Configuration fetch failed with error: {}", e))
132                 .retry(e -> true)
133                 .flatMapMany(cbsClient -> cbsClient.updates(cbsRequest, initialDelay, period))
134                 .subscribe(this::parseConsulRetrievedConfiguration, this::handleErrors);
135     }
136
137     @NotNull
138     GeneratedAppConfigObject generateAppConfigObject(JsonObject configObject) {
139
140         if (LOGGER.isInfoEnabled()) {
141             String configAsString = gson.toJson(configObject);
142             LOGGER.info("Received App Config object\n{}", configAsString);
143         }
144
145         final String dmaapProtocol = configObject.get("dmaap.protocol").getAsString();
146         final String dmaapContentType  = configObject.get("dmaap.contentType").getAsString();
147         final String dmaapConsumerId  = configObject.get("dmaap.consumer.consumerId").getAsString();
148         final String dmaapConsumerGroup = configObject.get("dmaap.consumer.consumerGroup").getAsString();
149         final int dmaapMessageLimit  = configObject.get("dmaap.messageLimit").getAsInt();
150         final int dmaapTimeoutMs  = configObject.get("dmaap.timeoutMs").getAsInt();
151
152         final String aaiHost = configObject.get("aai.host").getAsString();
153         final int aaiPort = configObject.get("aai.port").getAsInt();
154         final String aaiProtocol = configObject.get("aai.protocol").getAsString();
155         final String aaiUsername = configObject.get("aai.username").getAsString();
156         final String aaiPassword = configObject.get("aai.password").getAsString();
157         final boolean aaiIgnoreSslCertificateErrors =
158                 configObject.get("aai.aaiIgnoreSslCertificateErrors").getAsBoolean();
159
160         final int pipelinesPollingIntervalSec = configObject.get("application.pipelinesPollingIntervalSec").getAsInt();
161         final int pipelinesTimeoutSec = configObject.get("application.pipelinesTimeoutSec").getAsInt();
162         final int cbsPollingIntervalSec = configObject.get("application.cbsPollingIntervalSec").getAsInt();
163
164         final String reRegPolicyScope = configObject.get("application.reregistration.policyScope").getAsString();
165         final String reRegClControlName = configObject.get("application.reregistration.clControlName").getAsString();
166         final String cpeAuthPolicyScope = configObject.get("application.cpe.authentication.policyScope").getAsString();
167         final String cpeAuthClControlName =
168                 configObject.get("application.cpe.authentication.clControlName").getAsString();
169
170         final String policyVersion = configObject.get("application.policyVersion").getAsString();
171         final String closeLoopTargetType = configObject.get("application.clTargetType").getAsString();
172         final String closeLoopEventStatus = configObject.get("application.clEventStatus").getAsString();
173         final String closeLoopVersion = configObject.get("application.clVersion").getAsString();
174         final String closeLoopTarget = configObject.get("application.clTarget").getAsString();
175         final String closeLoopOriginator = configObject.get("application.clOriginator").getAsString();
176
177         final String reRegConfigKey = configObject.get("application.reregistration.configKey").getAsString();
178         final String cpeAuthConfigKey = configObject.get("application.cpeAuth.configKey").getAsString();
179         final String closeLoopConfigKey = configObject.get("application.closeLoop.configKey").getAsString();
180
181         final String loggingLevel = configObject.get("application.loggingLevel").getAsString();
182
183         final String keyStorePath = configObject.get("application.ssl.keyStorePath").getAsString();
184         final String keyStorePasswordPath = configObject.get("application.ssl.keyStorePasswordPath").getAsString();
185         final String trustStorePath = configObject.get("application.ssl.trustStorePath").getAsString();
186         final String trustStorePasswordPath = configObject.get("application.ssl.trustStorePasswordPath").getAsString();
187         final boolean aaiEnableCertAuth = configObject.get("application.ssl.enableAaiCertAuth").getAsBoolean();
188         final boolean dmaapEnableCertAuth = configObject.get("application.ssl.enableDmaapCertAuth").getAsBoolean();
189
190         final JsonObject streamsPublishes = configObject.getAsJsonObject("streams_publishes");
191         final JsonObject streamsSubscribes = configObject.getAsJsonObject("streams_subscribes");
192
193         return ImmutableGeneratedAppConfigObject.builder()
194                 .dmaapProtocol(dmaapProtocol)
195                 .dmaapContentType(dmaapContentType)
196                 .dmaapConsumerConsumerId(dmaapConsumerId)
197                 .dmaapConsumerConsumerGroup(dmaapConsumerGroup)
198                 .dmaapMessageLimit(dmaapMessageLimit)
199                 .dmaapTimeoutMs(dmaapTimeoutMs)
200                 .aaiHost(aaiHost)
201                 .aaiPort(aaiPort)
202                 .aaiProtocol(aaiProtocol)
203                 .aaiUsername(aaiUsername)
204                 .aaiPassword(aaiPassword)
205                 .aaiIgnoreSslCertificateErrors(aaiIgnoreSslCertificateErrors)
206                 .pipelinesPollingIntervalSec(pipelinesPollingIntervalSec)
207                 .pipelinesTimeoutSec(pipelinesTimeoutSec)
208                 .cbsPollingIntervalSec(cbsPollingIntervalSec)
209                 .reRegistrationPolicyScope(reRegPolicyScope)
210                 .reRegistrationClControlName(reRegClControlName)
211                 .policyVersion(policyVersion)
212                 .closeLoopTargetType(closeLoopTargetType)
213                 .closeLoopEventStatus(closeLoopEventStatus)
214                 .closeLoopVersion(closeLoopVersion)
215                 .closeLoopTarget(closeLoopTarget)
216                 .closeLoopOriginator(closeLoopOriginator)
217                 .cpeAuthPolicyScope(cpeAuthPolicyScope)
218                 .cpeAuthClControlName(cpeAuthClControlName)
219                 .reRegConfigKey(reRegConfigKey)
220                 .cpeAuthConfigKey(cpeAuthConfigKey)
221                 .closeLoopConfigKey(closeLoopConfigKey)
222                 .loggingLevel(loggingLevel)
223                 .keyStorePath(keyStorePath)
224                 .keyStorePasswordPath(keyStorePasswordPath)
225                 .trustStorePath(trustStorePath)
226                 .trustStorePasswordPath(trustStorePasswordPath)
227                 .enableAaiCertAuth(aaiEnableCertAuth)
228                 .enableDmaapCertAuth(dmaapEnableCertAuth)
229                 .streamSubscribesMap(parseStreamsObjects(streamsSubscribes))
230                 .streamPublishesMap(parseStreamsObjects(streamsPublishes))
231                 .build();
232     }
233
234     private Set<String> getMissingEnvironmentVariables() {
235         Set<String> missingVars = new HashSet<>();
236         if (System.getenv().get(CONSUL_HOST) == null) {
237             missingVars.add(CONSUL_HOST);
238         }
239         if (System.getenv().get(CONFIG_BINDING_SERVICE) == null) {
240             missingVars.add(CONFIG_BINDING_SERVICE);
241         }
242         if (System.getenv().get(HOSTNAME) == null) {
243             missingVars.add(HOSTNAME);
244         }
245         return missingVars;
246     }
247
248     private Map<String, GeneratedAppConfigObject.StreamsObject> parseStreamsObjects(
249             JsonObject jsonObject) {
250         Map<String, GeneratedAppConfigObject.StreamsObject> streams = new HashMap<>();
251
252         jsonObject.entrySet().stream()
253                 .map(this::parseStreamsSingleObject)
254                 .forEach(e -> streams.put(e.getKey(), e.getValue()));
255
256         return streams;
257     }
258
259     private Map.Entry<String, GeneratedAppConfigObject.StreamsObject> parseStreamsSingleObject(
260             Map.Entry<String, JsonElement> jsonEntry) {
261
262         JsonObject closeLoopOutput = (JsonObject) jsonEntry.getValue();
263
264         String type = closeLoopOutput.get("type").getAsString();
265         String aafUsername = closeLoopOutput.get("aaf_username") != null
266                 ? closeLoopOutput.get("aaf_username").getAsString() : "";
267         String aafPassword = closeLoopOutput.get("aaf_password") != null
268                 ? closeLoopOutput.get("aaf_password").getAsString() : "";
269
270         JsonObject dmaapInfo = closeLoopOutput.getAsJsonObject("dmaap_info");
271         String clientId = dmaapInfo.get("client_id") != null
272                 ? dmaapInfo.get("client_id").getAsString() : "";
273         String clientRole = dmaapInfo.get("client_role") != null
274                 ? dmaapInfo.get("client_role").getAsString() : "";
275         String location = dmaapInfo.get("location") != null
276                 ? dmaapInfo.get("location").getAsString() : "";
277         String topicUrl = dmaapInfo.get("topic_url").getAsString();
278
279         GeneratedAppConfigObject.DmaapInfo dmaapInfoObject = ImmutableDmaapInfo.builder()
280                 .clientId(clientId)
281                 .clientRole(clientRole)
282                 .location(location)
283                 .topicUrl(topicUrl)
284                 .build();
285         GeneratedAppConfigObject.StreamsObject streamsObject = ImmutableStreamsObject.builder()
286                 .type(type)
287                 .aafUsername(aafUsername)
288                 .aafPassword(aafPassword)
289                 .dmaapInfo(dmaapInfoObject)
290                 .build();
291
292         return new AbstractMap.SimpleEntry<>(jsonEntry.getKey(), streamsObject);
293     }
294 }