315fc793cff507d8917bb171aeafc5bb02288ad7
[dcaegen2/services.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * BBS-RELOCATION-CPE-AUTHENTICATION-HANDLER
4  * ================================================================================
5  * Copyright (C) 2019 NOKIA Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.bbs.event.processor.config;
22
23 import com.google.gson.Gson;
24 import com.google.gson.GsonBuilder;
25 import com.google.gson.JsonElement;
26 import com.google.gson.JsonObject;
27
28 import java.time.Duration;
29 import java.util.AbstractMap;
30 import java.util.HashMap;
31 import java.util.HashSet;
32 import java.util.Map;
33 import java.util.Set;
34
35 import org.jetbrains.annotations.NotNull;
36 import org.onap.bbs.event.processor.exceptions.ApplicationEnvironmentException;
37 import org.onap.bbs.event.processor.model.GeneratedAppConfigObject;
38 import org.onap.bbs.event.processor.model.ImmutableDmaapInfo;
39 import org.onap.bbs.event.processor.model.ImmutableGeneratedAppConfigObject;
40 import org.onap.bbs.event.processor.model.ImmutableStreamsObject;
41 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory;
42 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties;
43 import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext;
44 import org.slf4j.Logger;
45 import org.slf4j.LoggerFactory;
46 import org.springframework.beans.factory.annotation.Autowired;
47 import org.springframework.stereotype.Component;
48
49 import reactor.core.Disposable;
50
51 @Component
52 public class ConsulConfigurationGateway {
53
54     private static final Logger LOGGER = LoggerFactory.getLogger(ConsulConfigurationGateway.class);
55
56     private static final String CONSUL_HOST = "CONSUL_HOST";
57     private static final String CONFIG_BINDING_SERVICE = "CONFIG_BINDING_SERVICE";
58     private static final String HOSTNAME = "HOSTNAME";
59
60     private final ApplicationConfiguration configuration;
61     private Gson gson;
62     private Disposable cbsFetchPipeline;
63
64     @Autowired
65     ConsulConfigurationGateway(ApplicationConfiguration configuration) {
66         this.configuration = configuration;
67         gson = new GsonBuilder().setPrettyPrinting().create();
68     }
69
70     /**
71      * Periodically fetch application configuration via CBS service of DCAE.
72      * @param initialDelay initial delay before initiation of polling
73      * @param period polling interval
74      */
75     public void periodicallyFetchConfigFromCbs(Duration initialDelay, Duration period) {
76         if (environmentNotReady()) {
77             throw new ApplicationEnvironmentException(
78                     String.format("Application Environment missing critical parameters: %s",
79                             getMissingEnvironmentVariables()));
80         }
81
82         RequestDiagnosticContext diagnosticContext = RequestDiagnosticContext.create();
83
84         // Necessary properties from the environment (Consul host:port, service-name (hostname), CBS name)
85         EnvProperties env = EnvProperties.fromEnvironment();
86
87         // Create the client and use it to get the configuration
88         cbsFetchPipeline = CbsClientFactory.createCbsClient(env)
89                 .doOnError(e -> LOGGER.warn("CBS Configuration fetch failed with error: {}", e))
90                 .retry(e -> true)
91                 .flatMapMany(cbsClient -> cbsClient.updates(diagnosticContext, initialDelay, period))
92                 .subscribe(this::parseConsulRetrievedConfiguration, this::handleErrors);
93     }
94
95     boolean environmentNotReady() {
96         String consulHost = System.getenv().get(CONSUL_HOST);
97         String cbs = System.getenv().get(CONFIG_BINDING_SERVICE);
98         String hostname = System.getenv().get(HOSTNAME);
99         return consulHost == null || cbs == null || hostname == null;
100     }
101
102     /**
103      * Reschedule application configuration periodic retrieval via CBS service of DCAE.
104      * @param initialDelay initial delay before rescheduling
105      * @param period new polling interval
106      */
107     public void rescheduleCbsConfigurationRetrieval(Duration initialDelay, Duration period) {
108         if (cbsFetchPipeline != null && !cbsFetchPipeline.isDisposed()) {
109             LOGGER.info("Disposing old CBS Config fetch job");
110             cbsFetchPipeline.dispose();
111         }
112         periodicallyFetchConfigFromCbs(initialDelay, period);
113     }
114
115     private void parseConsulRetrievedConfiguration(JsonObject jsonObject) {
116
117         GeneratedAppConfigObject generatedAppConfigObject = generateAppConfigObject(jsonObject);
118         LOGGER.trace("Consul-Retrieved Application Generated Object:\n{}", generatedAppConfigObject);
119         configuration.updateCurrentConfiguration(generatedAppConfigObject);
120     }
121
122     private void handleErrors(Throwable throwable) {
123         LOGGER.error("Periodic CBS configuration polling was terminated with error: {}", throwable);
124     }
125
126     @NotNull
127     GeneratedAppConfigObject generateAppConfigObject(JsonObject configObject) {
128
129         if (LOGGER.isInfoEnabled()) {
130             String configAsString = gson.toJson(configObject);
131             LOGGER.info("Received App Config object\n{}", configAsString);
132         }
133
134         final String dmaapProtocol = configObject.get("dmaap.protocol").getAsString();
135         final String dmaapContentType  = configObject.get("dmaap.contentType").getAsString();
136         final String dmaapConsumerId  = configObject.get("dmaap.consumer.consumerId").getAsString();
137         final String dmaapConsumerGroup = configObject.get("dmaap.consumer.consumerGroup").getAsString();
138         final int dmaapMessageLimit  = configObject.get("dmaap.messageLimit").getAsInt();
139         final int dmaapTimeoutMs  = configObject.get("dmaap.timeoutMs").getAsInt();
140
141         final String aaiHost = configObject.get("aai.host").getAsString();
142         final int aaiPort = configObject.get("aai.port").getAsInt();
143         final String aaiProtocol = configObject.get("aai.protocol").getAsString();
144         final String aaiUsername = configObject.get("aai.username").getAsString();
145         final String aaiPassword = configObject.get("aai.password").getAsString();
146         final boolean aaiIgnoreSslCertificateErrors =
147                 configObject.get("aai.aaiIgnoreSslCertificateErrors").getAsBoolean();
148
149         final int pipelinesPollingIntervalSec = configObject.get("application.pipelinesPollingIntervalSec").getAsInt();
150         final int pipelinesTimeoutSec = configObject.get("application.pipelinesTimeoutSec").getAsInt();
151         final int cbsPollingIntervalSec = configObject.get("application.cbsPollingIntervalSec").getAsInt();
152         final String reRegPolicyScope = configObject.get("application.reregistration.policyScope").getAsString();
153         final String reRegClControlName = configObject.get("application.reregistration.clControlName").getAsString();
154         final String cpeAuthPolicyScope = configObject.get("application.cpe.authentication.policyScope").getAsString();
155         final String cpeAuthClControlName =
156                 configObject.get("application.cpe.authentication.clControlName").getAsString();
157         final String reRegConfigKey = configObject.get("application.reregistration.configKey").getAsString();
158         final String cpeAuthConfigKey = configObject.get("application.cpeAuth.configKey").getAsString();
159         final String closeLoopConfigKey = configObject.get("application.closeLoop.configKey").getAsString();
160
161         final JsonObject streamsPublishes = configObject.getAsJsonObject("streams_publishes");
162         final JsonObject streamsSubscribes = configObject.getAsJsonObject("streams_subscribes");
163
164         return ImmutableGeneratedAppConfigObject.builder()
165                 .dmaapProtocol(dmaapProtocol)
166                 .dmaapContentType(dmaapContentType)
167                 .dmaapConsumerConsumerId(dmaapConsumerId)
168                 .dmaapConsumerConsumerGroup(dmaapConsumerGroup)
169                 .dmaapMessageLimit(dmaapMessageLimit)
170                 .dmaapTimeoutMs(dmaapTimeoutMs)
171                 .aaiHost(aaiHost)
172                 .aaiPort(aaiPort)
173                 .aaiProtocol(aaiProtocol)
174                 .aaiUsername(aaiUsername)
175                 .aaiPassword(aaiPassword)
176                 .aaiIgnoreSslCertificateErrors(aaiIgnoreSslCertificateErrors)
177                 .pipelinesPollingIntervalSec(pipelinesPollingIntervalSec)
178                 .pipelinesTimeoutSec(pipelinesTimeoutSec)
179                 .cbsPollingIntervalSec(cbsPollingIntervalSec)
180                 .reRegistrationPolicyScope(reRegPolicyScope)
181                 .reRegistrationClControlName(reRegClControlName)
182                 .cpeAuthPolicyScope(cpeAuthPolicyScope)
183                 .cpeAuthClControlName(cpeAuthClControlName)
184                 .reRegConfigKey(reRegConfigKey)
185                 .cpeAuthConfigKey(cpeAuthConfigKey)
186                 .closeLoopConfigKey(closeLoopConfigKey)
187                 .streamSubscribesMap(parseStreamsObjects(streamsSubscribes))
188                 .streamPublishesMap(parseStreamsObjects(streamsPublishes))
189                 .build();
190     }
191
192     private Set<String> getMissingEnvironmentVariables() {
193         Set<String> missingVars = new HashSet<>();
194         if (System.getenv().get(CONSUL_HOST) == null) {
195             missingVars.add(CONSUL_HOST);
196         }
197         if (System.getenv().get(CONFIG_BINDING_SERVICE) == null) {
198             missingVars.add(CONFIG_BINDING_SERVICE);
199         }
200         if (System.getenv().get(HOSTNAME) == null) {
201             missingVars.add(HOSTNAME);
202         }
203         return missingVars;
204     }
205
206     private Map<String, GeneratedAppConfigObject.StreamsObject> parseStreamsObjects(
207             JsonObject jsonObject) {
208         Map<String, GeneratedAppConfigObject.StreamsObject> streams = new HashMap<>();
209
210         jsonObject.entrySet().stream()
211                 .map(this::parseStreamsSingleObject)
212                 .forEach(e -> streams.put(e.getKey(), e.getValue()));
213
214         return streams;
215     }
216
217     private Map.Entry<String, GeneratedAppConfigObject.StreamsObject> parseStreamsSingleObject(
218             Map.Entry<String, JsonElement> jsonEntry) {
219
220         JsonObject closeLoopOutput = (JsonObject) jsonEntry.getValue();
221
222         String type = closeLoopOutput.get("type").getAsString();
223         String aafUsername = closeLoopOutput.get("aaf_username") != null
224                 ? closeLoopOutput.get("aaf_username").getAsString() : "";
225         String aafPassword = closeLoopOutput.get("aaf_password") != null
226                 ? closeLoopOutput.get("aaf_password").getAsString() : "";
227
228         JsonObject dmaapInfo = closeLoopOutput.getAsJsonObject("dmaap_info");
229         String clientId = dmaapInfo.get("client_id") != null
230                 ? dmaapInfo.get("client_id").getAsString() : "";
231         String clientRole = dmaapInfo.get("client_role") != null
232                 ? dmaapInfo.get("client_role").getAsString() : "";
233         String location = dmaapInfo.get("location") != null
234                 ? dmaapInfo.get("location").getAsString() : "";
235         String topicUrl = dmaapInfo.get("topic_url").getAsString();
236
237         GeneratedAppConfigObject.DmaapInfo dmaapInfoObject = ImmutableDmaapInfo.builder()
238                 .clientId(clientId)
239                 .clientRole(clientRole)
240                 .location(location)
241                 .topicUrl(topicUrl)
242                 .build();
243         GeneratedAppConfigObject.StreamsObject streamsObject = ImmutableStreamsObject.builder()
244                 .type(type)
245                 .aafUsername(aafUsername)
246                 .aafPassword(aafPassword)
247                 .dmaapInfo(dmaapInfoObject)
248                 .build();
249
250         return new AbstractMap.SimpleEntry<>(jsonEntry.getKey(), streamsObject);
251     }
252 }