6530f0b2b53b1d821166e99b0884519719beaf6b
[dcaegen2/services.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * BBS-RELOCATION-CPE-AUTHENTICATION-HANDLER
4  * ================================================================================
5  * Copyright (C) 2019 NOKIA Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.bbs.event.processor.config;
22
23 import com.google.gson.Gson;
24 import com.google.gson.GsonBuilder;
25 import com.google.gson.JsonElement;
26 import com.google.gson.JsonObject;
27
28 import java.time.Duration;
29 import java.util.AbstractMap;
30 import java.util.HashMap;
31 import java.util.HashSet;
32 import java.util.Map;
33 import java.util.Set;
34
35 import org.jetbrains.annotations.NotNull;
36 import org.onap.bbs.event.processor.exceptions.ApplicationEnvironmentException;
37 import org.onap.bbs.event.processor.model.GeneratedAppConfigObject;
38 import org.onap.bbs.event.processor.model.ImmutableDmaapInfo;
39 import org.onap.bbs.event.processor.model.ImmutableGeneratedAppConfigObject;
40 import org.onap.bbs.event.processor.model.ImmutableStreamsObject;
41 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory;
42 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties;
43 import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext;
44 import org.slf4j.Logger;
45 import org.slf4j.LoggerFactory;
46 import org.springframework.beans.factory.annotation.Autowired;
47 import org.springframework.stereotype.Component;
48
49 import reactor.core.Disposable;
50
51 @Component
52 public class ConsulConfigurationGateway {
53
54     private static final Logger LOGGER = LoggerFactory.getLogger(ConsulConfigurationGateway.class);
55
56     private static final String CONSUL_HOST = "CONSUL_HOST";
57     private static final String CONFIG_BINDING_SERVICE = "CONFIG_BINDING_SERVICE";
58     private static final String HOSTNAME = "HOSTNAME";
59
60     private final ApplicationConfiguration configuration;
61     private Gson gson;
62     private Disposable cbsFetchPipeline;
63
64     @Autowired
65     ConsulConfigurationGateway(ApplicationConfiguration configuration) {
66         this.configuration = configuration;
67         gson = new GsonBuilder().setPrettyPrinting().create();
68     }
69
70     /**
71      * Periodically fetch application configuration via CBS service of DCAE.
72      * @param initialDelay initial delay before initiation of polling
73      * @param period polling interval
74      */
75     public void periodicallyFetchConfigFromCbs(Duration initialDelay, Duration period) {
76         if (environmentNotReady()) {
77             throw new ApplicationEnvironmentException(
78                     String.format("Application Environment missing critical parameters: %s",
79                             getMissingEnvironmentVariables()));
80         }
81
82         fetchConfig(initialDelay, period);
83     }
84
85     boolean environmentNotReady() {
86         String consulHost = System.getenv().get(CONSUL_HOST);
87         String cbs = System.getenv().get(CONFIG_BINDING_SERVICE);
88         String hostname = System.getenv().get(HOSTNAME);
89         return consulHost == null || cbs == null || hostname == null;
90     }
91
92     /**
93      * Reschedule application configuration periodic retrieval via CBS service of DCAE.
94      * @param initialDelay initial delay before rescheduling
95      * @param period new polling interval
96      */
97     public void rescheduleCbsConfigurationRetrieval(Duration initialDelay, Duration period) {
98         if (cbsFetchPipeline != null && !cbsFetchPipeline.isDisposed()) {
99             LOGGER.info("Disposing old CBS Config fetch job");
100             cbsFetchPipeline.dispose();
101         }
102         periodicallyFetchConfigFromCbs(initialDelay, period);
103     }
104
105     private void parseConsulRetrievedConfiguration(JsonObject jsonObject) {
106
107         GeneratedAppConfigObject generatedAppConfigObject = generateAppConfigObject(jsonObject);
108         LOGGER.trace("Consul-Retrieved Application Generated Object:\n{}", generatedAppConfigObject);
109         configuration.updateCurrentConfiguration(generatedAppConfigObject);
110     }
111
112     private void handleErrors(Throwable throwable) {
113         LOGGER.error("Periodic CBS configuration polling was terminated with error: {}", throwable);
114         LOGGER.info("Will restart CBS configuration fetching job due to abnormal termination."
115                 + " Will start fetching after 60 seconds (please correct configuration in the meantime)"
116                 + " and it will then poll every {} seconds (reverting to default)",
117                 configuration.getCbsPollingInterval());
118         fetchConfig(Duration.ofSeconds(60), Duration.ofSeconds(configuration.getCbsPollingInterval()));
119     }
120
121     private void fetchConfig(Duration initialDelay, Duration period) {
122         RequestDiagnosticContext diagnosticContext = RequestDiagnosticContext.create();
123
124         // Necessary properties from the environment (Consul host:port, service-name (hostname), CBS name)
125         EnvProperties env = EnvProperties.fromEnvironment();
126
127         // Create the client and use it to get the configuration
128         cbsFetchPipeline = CbsClientFactory.createCbsClient(env)
129                 .doOnError(e -> LOGGER.warn("CBS Configuration fetch failed with error: {}", e))
130                 .retry(e -> true)
131                 .flatMapMany(cbsClient -> cbsClient.updates(diagnosticContext, initialDelay, period))
132                 .subscribe(this::parseConsulRetrievedConfiguration, this::handleErrors);
133     }
134
135     @NotNull
136     GeneratedAppConfigObject generateAppConfigObject(JsonObject configObject) {
137
138         if (LOGGER.isInfoEnabled()) {
139             String configAsString = gson.toJson(configObject);
140             LOGGER.info("Received App Config object\n{}", configAsString);
141         }
142
143         final String dmaapProtocol = configObject.get("dmaap.protocol").getAsString();
144         final String dmaapContentType  = configObject.get("dmaap.contentType").getAsString();
145         final String dmaapConsumerId  = configObject.get("dmaap.consumer.consumerId").getAsString();
146         final String dmaapConsumerGroup = configObject.get("dmaap.consumer.consumerGroup").getAsString();
147         final int dmaapMessageLimit  = configObject.get("dmaap.messageLimit").getAsInt();
148         final int dmaapTimeoutMs  = configObject.get("dmaap.timeoutMs").getAsInt();
149
150         final String aaiHost = configObject.get("aai.host").getAsString();
151         final int aaiPort = configObject.get("aai.port").getAsInt();
152         final String aaiProtocol = configObject.get("aai.protocol").getAsString();
153         final String aaiUsername = configObject.get("aai.username").getAsString();
154         final String aaiPassword = configObject.get("aai.password").getAsString();
155         final boolean aaiIgnoreSslCertificateErrors =
156                 configObject.get("aai.aaiIgnoreSslCertificateErrors").getAsBoolean();
157
158         final int pipelinesPollingIntervalSec = configObject.get("application.pipelinesPollingIntervalSec").getAsInt();
159         final int pipelinesTimeoutSec = configObject.get("application.pipelinesTimeoutSec").getAsInt();
160         final int cbsPollingIntervalSec = configObject.get("application.cbsPollingIntervalSec").getAsInt();
161
162         final String reRegPolicyScope = configObject.get("application.reregistration.policyScope").getAsString();
163         final String reRegClControlName = configObject.get("application.reregistration.clControlName").getAsString();
164         final String cpeAuthPolicyScope = configObject.get("application.cpe.authentication.policyScope").getAsString();
165         final String cpeAuthClControlName =
166                 configObject.get("application.cpe.authentication.clControlName").getAsString();
167
168         final String policyVersion = configObject.get("application.policyVersion").getAsString();
169         final String closeLoopTargetType = configObject.get("application.clTargetType").getAsString();
170         final String closeLoopEventStatus = configObject.get("application.clEventStatus").getAsString();
171         final String closeLoopVersion = configObject.get("application.clVersion").getAsString();
172         final String closeLoopTarget = configObject.get("application.clTarget").getAsString();
173         final String closeLoopOriginator = configObject.get("application.clOriginator").getAsString();
174
175         final String reRegConfigKey = configObject.get("application.reregistration.configKey").getAsString();
176         final String cpeAuthConfigKey = configObject.get("application.cpeAuth.configKey").getAsString();
177         final String closeLoopConfigKey = configObject.get("application.closeLoop.configKey").getAsString();
178
179         final JsonObject streamsPublishes = configObject.getAsJsonObject("streams_publishes");
180         final JsonObject streamsSubscribes = configObject.getAsJsonObject("streams_subscribes");
181
182         return ImmutableGeneratedAppConfigObject.builder()
183                 .dmaapProtocol(dmaapProtocol)
184                 .dmaapContentType(dmaapContentType)
185                 .dmaapConsumerConsumerId(dmaapConsumerId)
186                 .dmaapConsumerConsumerGroup(dmaapConsumerGroup)
187                 .dmaapMessageLimit(dmaapMessageLimit)
188                 .dmaapTimeoutMs(dmaapTimeoutMs)
189                 .aaiHost(aaiHost)
190                 .aaiPort(aaiPort)
191                 .aaiProtocol(aaiProtocol)
192                 .aaiUsername(aaiUsername)
193                 .aaiPassword(aaiPassword)
194                 .aaiIgnoreSslCertificateErrors(aaiIgnoreSslCertificateErrors)
195                 .pipelinesPollingIntervalSec(pipelinesPollingIntervalSec)
196                 .pipelinesTimeoutSec(pipelinesTimeoutSec)
197                 .cbsPollingIntervalSec(cbsPollingIntervalSec)
198                 .reRegistrationPolicyScope(reRegPolicyScope)
199                 .reRegistrationClControlName(reRegClControlName)
200                 .policyVersion(policyVersion)
201                 .closeLoopTargetType(closeLoopTargetType)
202                 .closeLoopEventStatus(closeLoopEventStatus)
203                 .closeLoopVersion(closeLoopVersion)
204                 .closeLoopTarget(closeLoopTarget)
205                 .closeLoopOriginator(closeLoopOriginator)
206                 .cpeAuthPolicyScope(cpeAuthPolicyScope)
207                 .cpeAuthClControlName(cpeAuthClControlName)
208                 .reRegConfigKey(reRegConfigKey)
209                 .cpeAuthConfigKey(cpeAuthConfigKey)
210                 .closeLoopConfigKey(closeLoopConfigKey)
211                 .streamSubscribesMap(parseStreamsObjects(streamsSubscribes))
212                 .streamPublishesMap(parseStreamsObjects(streamsPublishes))
213                 .build();
214     }
215
216     private Set<String> getMissingEnvironmentVariables() {
217         Set<String> missingVars = new HashSet<>();
218         if (System.getenv().get(CONSUL_HOST) == null) {
219             missingVars.add(CONSUL_HOST);
220         }
221         if (System.getenv().get(CONFIG_BINDING_SERVICE) == null) {
222             missingVars.add(CONFIG_BINDING_SERVICE);
223         }
224         if (System.getenv().get(HOSTNAME) == null) {
225             missingVars.add(HOSTNAME);
226         }
227         return missingVars;
228     }
229
230     private Map<String, GeneratedAppConfigObject.StreamsObject> parseStreamsObjects(
231             JsonObject jsonObject) {
232         Map<String, GeneratedAppConfigObject.StreamsObject> streams = new HashMap<>();
233
234         jsonObject.entrySet().stream()
235                 .map(this::parseStreamsSingleObject)
236                 .forEach(e -> streams.put(e.getKey(), e.getValue()));
237
238         return streams;
239     }
240
241     private Map.Entry<String, GeneratedAppConfigObject.StreamsObject> parseStreamsSingleObject(
242             Map.Entry<String, JsonElement> jsonEntry) {
243
244         JsonObject closeLoopOutput = (JsonObject) jsonEntry.getValue();
245
246         String type = closeLoopOutput.get("type").getAsString();
247         String aafUsername = closeLoopOutput.get("aaf_username") != null
248                 ? closeLoopOutput.get("aaf_username").getAsString() : "";
249         String aafPassword = closeLoopOutput.get("aaf_password") != null
250                 ? closeLoopOutput.get("aaf_password").getAsString() : "";
251
252         JsonObject dmaapInfo = closeLoopOutput.getAsJsonObject("dmaap_info");
253         String clientId = dmaapInfo.get("client_id") != null
254                 ? dmaapInfo.get("client_id").getAsString() : "";
255         String clientRole = dmaapInfo.get("client_role") != null
256                 ? dmaapInfo.get("client_role").getAsString() : "";
257         String location = dmaapInfo.get("location") != null
258                 ? dmaapInfo.get("location").getAsString() : "";
259         String topicUrl = dmaapInfo.get("topic_url").getAsString();
260
261         GeneratedAppConfigObject.DmaapInfo dmaapInfoObject = ImmutableDmaapInfo.builder()
262                 .clientId(clientId)
263                 .clientRole(clientRole)
264                 .location(location)
265                 .topicUrl(topicUrl)
266                 .build();
267         GeneratedAppConfigObject.StreamsObject streamsObject = ImmutableStreamsObject.builder()
268                 .type(type)
269                 .aafUsername(aafUsername)
270                 .aafPassword(aafPassword)
271                 .dmaapInfo(dmaapInfoObject)
272                 .build();
273
274         return new AbstractMap.SimpleEntry<>(jsonEntry.getKey(), streamsObject);
275     }
276 }