1d27fc0ae4e6d2159625e921b4f751c8d876a393
[dcaegen2/services.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * BBS-RELOCATION-CPE-AUTHENTICATION-HANDLER
4  * ================================================================================
5  * Copyright (C) 2019 NOKIA Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.bbs.event.processor.config;
22
23 import com.google.gson.Gson;
24 import com.google.gson.GsonBuilder;
25 import com.google.gson.JsonElement;
26 import com.google.gson.JsonObject;
27
28 import java.time.Duration;
29 import java.util.AbstractMap;
30 import java.util.HashMap;
31 import java.util.HashSet;
32 import java.util.Map;
33 import java.util.Set;
34
35 import org.jetbrains.annotations.NotNull;
36 import org.onap.bbs.event.processor.exceptions.ApplicationEnvironmentException;
37 import org.onap.bbs.event.processor.model.GeneratedAppConfigObject;
38 import org.onap.bbs.event.processor.model.ImmutableDmaapInfo;
39 import org.onap.bbs.event.processor.model.ImmutableGeneratedAppConfigObject;
40 import org.onap.bbs.event.processor.model.ImmutableStreamsObject;
41 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory;
42 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties;
43 import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext;
44 import org.slf4j.Logger;
45 import org.slf4j.LoggerFactory;
46 import org.springframework.beans.factory.annotation.Autowired;
47 import org.springframework.stereotype.Component;
48
49 import reactor.core.Disposable;
50
51 @Component
52 public class ConsulConfigurationGateway {
53
54     private static final Logger LOGGER = LoggerFactory.getLogger(ConsulConfigurationGateway.class);
55
56     private static final String CONSUL_HOST = "CONSUL_HOST";
57     private static final String CONFIG_BINDING_SERVICE = "CONFIG_BINDING_SERVICE";
58     private static final String HOSTNAME = "HOSTNAME";
59
60     private final ApplicationConfiguration configuration;
61     private Gson gson;
62     private Disposable cbsFetchPipeline;
63
64     @Autowired
65     ConsulConfigurationGateway(ApplicationConfiguration configuration) {
66         this.configuration = configuration;
67         gson = new GsonBuilder().setPrettyPrinting().create();
68     }
69
70     /**
71      * Periodically fetch application configuration via CBS service of DCAE.
72      * @param initialDelay initial delay before initiation of polling
73      * @param period polling interval
74      */
75     public void periodicallyFetchConfigFromCbs(Duration initialDelay, Duration period) {
76         if (environmentNotReady()) {
77             throw new ApplicationEnvironmentException(
78                     String.format("Application Environment missing critical parameters: %s",
79                             getMissingEnvironmentVariables()));
80         }
81
82         fetchConfig(initialDelay, period);
83     }
84
85     boolean environmentNotReady() {
86         String consulHost = System.getenv().get(CONSUL_HOST);
87         String cbs = System.getenv().get(CONFIG_BINDING_SERVICE);
88         String hostname = System.getenv().get(HOSTNAME);
89         return consulHost == null || cbs == null || hostname == null;
90     }
91
92     /**
93      * Reschedule application configuration periodic retrieval via CBS service of DCAE.
94      * @param initialDelay initial delay before rescheduling
95      * @param period new polling interval
96      */
97     public void rescheduleCbsConfigurationRetrieval(Duration initialDelay, Duration period) {
98         if (cbsFetchPipeline != null && !cbsFetchPipeline.isDisposed()) {
99             LOGGER.info("Disposing old CBS Config fetch job");
100             cbsFetchPipeline.dispose();
101         }
102         periodicallyFetchConfigFromCbs(initialDelay, period);
103     }
104
105     private void parseConsulRetrievedConfiguration(JsonObject jsonObject) {
106
107         GeneratedAppConfigObject generatedAppConfigObject = generateAppConfigObject(jsonObject);
108         LOGGER.trace("Consul-Retrieved Application Generated Object:\n{}", generatedAppConfigObject);
109         configuration.updateCurrentConfiguration(generatedAppConfigObject);
110     }
111
112     private void handleErrors(Throwable throwable) {
113         LOGGER.error("Periodic CBS configuration polling was terminated with error: {}", throwable);
114         LOGGER.info("Will restart CBS configuration fetching job due to abnormal termination."
115                 + " Will start fetching after 60 seconds (please correct configuration in the meantime)"
116                 + " and it will then poll every {} seconds (reverting to default)",
117                 configuration.getCbsPollingInterval());
118         fetchConfig(Duration.ofSeconds(60), Duration.ofSeconds(configuration.getCbsPollingInterval()));
119     }
120
121     private void fetchConfig(Duration initialDelay, Duration period) {
122         RequestDiagnosticContext diagnosticContext = RequestDiagnosticContext.create();
123
124         // Necessary properties from the environment (Consul host:port, service-name (hostname), CBS name)
125         EnvProperties env = EnvProperties.fromEnvironment();
126
127         // Create the client and use it to get the configuration
128         cbsFetchPipeline = CbsClientFactory.createCbsClient(env)
129                 .doOnError(e -> LOGGER.warn("CBS Configuration fetch failed with error: {}", e))
130                 .retry(e -> true)
131                 .flatMapMany(cbsClient -> cbsClient.updates(diagnosticContext, initialDelay, period))
132                 .subscribe(this::parseConsulRetrievedConfiguration, this::handleErrors);
133     }
134
135     @NotNull
136     GeneratedAppConfigObject generateAppConfigObject(JsonObject configObject) {
137
138         if (LOGGER.isInfoEnabled()) {
139             String configAsString = gson.toJson(configObject);
140             LOGGER.info("Received App Config object\n{}", configAsString);
141         }
142
143         final String dmaapProtocol = configObject.get("dmaap.protocol").getAsString();
144         final String dmaapContentType  = configObject.get("dmaap.contentType").getAsString();
145         final String dmaapConsumerId  = configObject.get("dmaap.consumer.consumerId").getAsString();
146         final String dmaapConsumerGroup = configObject.get("dmaap.consumer.consumerGroup").getAsString();
147         final int dmaapMessageLimit  = configObject.get("dmaap.messageLimit").getAsInt();
148         final int dmaapTimeoutMs  = configObject.get("dmaap.timeoutMs").getAsInt();
149
150         final String aaiHost = configObject.get("aai.host").getAsString();
151         final int aaiPort = configObject.get("aai.port").getAsInt();
152         final String aaiProtocol = configObject.get("aai.protocol").getAsString();
153         final String aaiUsername = configObject.get("aai.username").getAsString();
154         final String aaiPassword = configObject.get("aai.password").getAsString();
155         final boolean aaiIgnoreSslCertificateErrors =
156                 configObject.get("aai.aaiIgnoreSslCertificateErrors").getAsBoolean();
157
158         final int pipelinesPollingIntervalSec = configObject.get("application.pipelinesPollingIntervalSec").getAsInt();
159         final int pipelinesTimeoutSec = configObject.get("application.pipelinesTimeoutSec").getAsInt();
160         final int cbsPollingIntervalSec = configObject.get("application.cbsPollingIntervalSec").getAsInt();
161
162         final String reRegPolicyScope = configObject.get("application.reregistration.policyScope").getAsString();
163         final String reRegClControlName = configObject.get("application.reregistration.clControlName").getAsString();
164         final String cpeAuthPolicyScope = configObject.get("application.cpe.authentication.policyScope").getAsString();
165         final String cpeAuthClControlName =
166                 configObject.get("application.cpe.authentication.clControlName").getAsString();
167
168         final String policyVersion = configObject.get("application.policyVersion").getAsString();
169         final String closeLoopTargetType = configObject.get("application.clTargetType").getAsString();
170         final String closeLoopEventStatus = configObject.get("application.clEventStatus").getAsString();
171         final String closeLoopVersion = configObject.get("application.clVersion").getAsString();
172         final String closeLoopTarget = configObject.get("application.clTarget").getAsString();
173         final String closeLoopOriginator = configObject.get("application.clOriginator").getAsString();
174
175         final String reRegConfigKey = configObject.get("application.reregistration.configKey").getAsString();
176         final String cpeAuthConfigKey = configObject.get("application.cpeAuth.configKey").getAsString();
177         final String closeLoopConfigKey = configObject.get("application.closeLoop.configKey").getAsString();
178
179         final String loggingLevel = configObject.get("application.loggingLevel").getAsString();
180
181         final JsonObject streamsPublishes = configObject.getAsJsonObject("streams_publishes");
182         final JsonObject streamsSubscribes = configObject.getAsJsonObject("streams_subscribes");
183
184         return ImmutableGeneratedAppConfigObject.builder()
185                 .dmaapProtocol(dmaapProtocol)
186                 .dmaapContentType(dmaapContentType)
187                 .dmaapConsumerConsumerId(dmaapConsumerId)
188                 .dmaapConsumerConsumerGroup(dmaapConsumerGroup)
189                 .dmaapMessageLimit(dmaapMessageLimit)
190                 .dmaapTimeoutMs(dmaapTimeoutMs)
191                 .aaiHost(aaiHost)
192                 .aaiPort(aaiPort)
193                 .aaiProtocol(aaiProtocol)
194                 .aaiUsername(aaiUsername)
195                 .aaiPassword(aaiPassword)
196                 .aaiIgnoreSslCertificateErrors(aaiIgnoreSslCertificateErrors)
197                 .pipelinesPollingIntervalSec(pipelinesPollingIntervalSec)
198                 .pipelinesTimeoutSec(pipelinesTimeoutSec)
199                 .cbsPollingIntervalSec(cbsPollingIntervalSec)
200                 .reRegistrationPolicyScope(reRegPolicyScope)
201                 .reRegistrationClControlName(reRegClControlName)
202                 .policyVersion(policyVersion)
203                 .closeLoopTargetType(closeLoopTargetType)
204                 .closeLoopEventStatus(closeLoopEventStatus)
205                 .closeLoopVersion(closeLoopVersion)
206                 .closeLoopTarget(closeLoopTarget)
207                 .closeLoopOriginator(closeLoopOriginator)
208                 .cpeAuthPolicyScope(cpeAuthPolicyScope)
209                 .cpeAuthClControlName(cpeAuthClControlName)
210                 .reRegConfigKey(reRegConfigKey)
211                 .cpeAuthConfigKey(cpeAuthConfigKey)
212                 .closeLoopConfigKey(closeLoopConfigKey)
213                 .loggingLevel(loggingLevel)
214                 .streamSubscribesMap(parseStreamsObjects(streamsSubscribes))
215                 .streamPublishesMap(parseStreamsObjects(streamsPublishes))
216                 .build();
217     }
218
219     private Set<String> getMissingEnvironmentVariables() {
220         Set<String> missingVars = new HashSet<>();
221         if (System.getenv().get(CONSUL_HOST) == null) {
222             missingVars.add(CONSUL_HOST);
223         }
224         if (System.getenv().get(CONFIG_BINDING_SERVICE) == null) {
225             missingVars.add(CONFIG_BINDING_SERVICE);
226         }
227         if (System.getenv().get(HOSTNAME) == null) {
228             missingVars.add(HOSTNAME);
229         }
230         return missingVars;
231     }
232
233     private Map<String, GeneratedAppConfigObject.StreamsObject> parseStreamsObjects(
234             JsonObject jsonObject) {
235         Map<String, GeneratedAppConfigObject.StreamsObject> streams = new HashMap<>();
236
237         jsonObject.entrySet().stream()
238                 .map(this::parseStreamsSingleObject)
239                 .forEach(e -> streams.put(e.getKey(), e.getValue()));
240
241         return streams;
242     }
243
244     private Map.Entry<String, GeneratedAppConfigObject.StreamsObject> parseStreamsSingleObject(
245             Map.Entry<String, JsonElement> jsonEntry) {
246
247         JsonObject closeLoopOutput = (JsonObject) jsonEntry.getValue();
248
249         String type = closeLoopOutput.get("type").getAsString();
250         String aafUsername = closeLoopOutput.get("aaf_username") != null
251                 ? closeLoopOutput.get("aaf_username").getAsString() : "";
252         String aafPassword = closeLoopOutput.get("aaf_password") != null
253                 ? closeLoopOutput.get("aaf_password").getAsString() : "";
254
255         JsonObject dmaapInfo = closeLoopOutput.getAsJsonObject("dmaap_info");
256         String clientId = dmaapInfo.get("client_id") != null
257                 ? dmaapInfo.get("client_id").getAsString() : "";
258         String clientRole = dmaapInfo.get("client_role") != null
259                 ? dmaapInfo.get("client_role").getAsString() : "";
260         String location = dmaapInfo.get("location") != null
261                 ? dmaapInfo.get("location").getAsString() : "";
262         String topicUrl = dmaapInfo.get("topic_url").getAsString();
263
264         GeneratedAppConfigObject.DmaapInfo dmaapInfoObject = ImmutableDmaapInfo.builder()
265                 .clientId(clientId)
266                 .clientRole(clientRole)
267                 .location(location)
268                 .topicUrl(topicUrl)
269                 .build();
270         GeneratedAppConfigObject.StreamsObject streamsObject = ImmutableStreamsObject.builder()
271                 .type(type)
272                 .aafUsername(aafUsername)
273                 .aafPassword(aafPassword)
274                 .dmaapInfo(dmaapInfoObject)
275                 .build();
276
277         return new AbstractMap.SimpleEntry<>(jsonEntry.getKey(), streamsObject);
278     }
279 }