ebdf0474f2ec0721bf40d4d122419735f620a7fb
[dcaegen2/collectors/ves.git] / src / main / java / org / onap / dcae / configuration / ConfigurationHandler.java
1 /*
2  * ============LICENSE_START=======================================================
3  * VES Collector
4  * ================================================================================
5  * Copyright (C) 2020 Nokia. All rights reserved.s
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20 package org.onap.dcae.configuration;
21
22 import com.google.gson.JsonObject;
23 import io.vavr.control.Option;
24 import org.json.JSONObject;
25 import org.onap.dcae.configuration.cbs.CbsClientConfigurationProvider;
26 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient;
27 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory;
28 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests;
29 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsClientConfiguration;
30 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsRequest;
31 import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext;
32 import org.slf4j.Logger;
33 import org.slf4j.LoggerFactory;
34 import reactor.core.Disposable;
35 import reactor.core.publisher.Mono;
36
37 import java.time.Duration;
38
39 /**
40  * ConfigurationHandler is responsible for receiving configuration updates from Consul.
41  * Any change made in the Consul will be reported as a notification.
42  */
43 public class ConfigurationHandler {
44
45     private static Logger log = LoggerFactory.getLogger(ConfigurationHandler.class);
46     private static final String CONFIG_DICT = "config";
47
48     private final CbsClientConfigurationProvider cbsClientConfigurationProvider;
49     private final ConfigUpdater configUpdater;
50
51     /**
52      * Constructor
53      * @param cbsClientConfigurationProvider provides configuration to connect with Consul
54      * @param configUpdater for updating application configuration
55      */
56     public ConfigurationHandler(CbsClientConfigurationProvider cbsClientConfigurationProvider, ConfigUpdater configUpdater) {
57         this.cbsClientConfigurationProvider = cbsClientConfigurationProvider;
58         this.configUpdater = configUpdater;
59     }
60
61     /**
62      * Start listen for application configuration notifications with configuration changes
63      * @param interval defines period of time when notification can come
64      * @return {@link Disposable} handler to close Consul listener at the end
65      */
66     public Disposable startListen(Duration interval) {
67
68         log.info("Start listening for configuration from Consul ...");
69         log.info(String.format("Consul configuration will be fetched in %s period.", interval));
70
71         // Polling properties
72         final Duration initialDelay = Duration.ofSeconds(5);
73         final Duration period = interval;
74
75         final CbsRequest request = createCbsRequest();
76         final CbsClientConfiguration cbsClientConfiguration = cbsClientConfigurationProvider.get();
77
78         return createCbsClient(cbsClientConfiguration)
79                 .flatMapMany(cbsClient -> cbsClient.updates(request, initialDelay, period))
80                 .subscribe(
81                         this::handleConfigurationFromConsul,
82                         this::handleError
83                 );
84     }
85
86     Mono<CbsClient> createCbsClient(CbsClientConfiguration cbsClientConfiguration) {
87         return CbsClientFactory.createCbsClient(cbsClientConfiguration);
88     }
89
90     void handleConfigurationFromConsul(JsonObject jsonObject) {
91         log.info("Configuration update from Consul {}", jsonObject);
92         if(jsonObject.has(CONFIG_DICT)) {
93             JsonObject config = jsonObject.getAsJsonObject(CONFIG_DICT);
94             JSONObject jObject = new JSONObject(config.toString());
95             configUpdater.updateConfig(Option.of(jObject));
96         } else {
97             throw new IllegalArgumentException(String.format("Invalid application configuration: %s ", jsonObject));
98         }
99     }
100
101     private void handleError(Throwable throwable) {
102         log.error("Unexpected error occurred during fetching configuration from Consul", throwable);
103     }
104
105     private CbsRequest createCbsRequest() {
106         RequestDiagnosticContext diagnosticContext = RequestDiagnosticContext.create();
107         return CbsRequests.getAll(diagnosticContext);
108     }
109 }