Update CBS client to fetch config periodically
[dcaegen2/services/son-handler.git] / src / main / java / org / onap / dcaegen2 / services / sonhms / controller / ConfigFetchFromCbs.java
1 /*******************************************************************************
2  *  ============LICENSE_START=======================================================
3  *  son-handler
4  *  ================================================================================
5  *   Copyright (C) 2019 Wipro Limited.
6  *   ==============================================================================
7  *     Licensed under the Apache License, Version 2.0 (the "License");
8  *     you may not use this file except in compliance with the License.
9  *     You may obtain a copy of the License at
10  *  
11  *          http://www.apache.org/licenses/LICENSE-2.0
12  *  
13  *     Unless required by applicable law or agreed to in writing, software
14  *     distributed under the License is distributed on an "AS IS" BASIS,
15  *     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  *     See the License for the specific language governing permissions and
17  *     limitations under the License.
18  *     ============LICENSE_END=========================================================
19  *  
20  *******************************************************************************/
21
22 package org.onap.dcaegen2.services.sonhms.controller;
23
24 import com.google.gson.Gson;
25 import com.google.gson.JsonArray;
26 import com.google.gson.JsonObject;
27 import com.google.gson.reflect.TypeToken;
28
29 import java.lang.reflect.Type;
30 import java.time.Duration;
31 import java.util.List;
32 import java.util.Map;
33
34 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory;
35 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests;
36 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsRequest;
37 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties;
38 import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext;
39 import org.onap.dcaegen2.services.sonhms.ConfigPolicy;
40 import org.onap.dcaegen2.services.sonhms.Configuration;
41 import org.slf4j.Logger;
42 import org.slf4j.LoggerFactory;
43
44 import reactor.core.Disposable;
45
46 public class ConfigFetchFromCbs implements Runnable {
47
48     private static Logger log = LoggerFactory.getLogger(ConfigFetchFromCbs.class);
49
50     private Duration interval;
51
52     public ConfigFetchFromCbs() {
53
54     }
55
56     public ConfigFetchFromCbs(Duration interval) {
57         this.interval = interval;
58     }
59
60     /**
61      * Gets app config from CBS.
62      */
63     private Disposable getAppConfig() {
64
65         // Generate RequestID and InvocationID which will be used when logging and in
66         // HTTP requests
67         log.info("getAppconfig start ..");
68         RequestDiagnosticContext diagnosticContext = RequestDiagnosticContext.create();
69         // Read necessary properties from the environment
70         final EnvProperties env = EnvProperties.fromEnvironment();
71         log.debug("environments {}", env);
72         ConfigPolicy configPolicy = ConfigPolicy.getInstance();
73
74         // Polling properties
75         final Duration initialDelay = Duration.ofSeconds(5);
76         final Duration period = interval;
77
78         // Create the client and use it to get the configuration
79         final CbsRequest request = CbsRequests.getAll(diagnosticContext);
80         return CbsClientFactory.createCbsClient(env)
81                 .flatMapMany(cbsClient -> cbsClient.updates(request, initialDelay, period)).subscribe(jsonObject -> {
82                     log.info("configuration and policy from CBS {}", jsonObject);
83                     JsonObject config = jsonObject.getAsJsonObject("config");
84                     Duration newPeriod = Duration.ofSeconds(config.get("cbsPollingInterval").getAsInt());
85                     if (!newPeriod.equals(period)) {
86                         interval = newPeriod;
87                         synchronized (this) {
88                             this.notifyAll();
89                         }
90
91                     }
92                     updateConfigurationFromJsonObject(config);
93
94                     Type mapType = new TypeToken<Map<String, Object>>() {
95                     }.getType();
96                     if (jsonObject.getAsJsonObject("policies") != null) {
97                         JsonObject policyJson = jsonObject.getAsJsonObject("policies").getAsJsonArray("items").get(0)
98                                 .getAsJsonObject().getAsJsonObject("config");
99                         Map<String, Object> policy = new Gson().fromJson(policyJson, mapType);
100                         configPolicy.setConfig(policy);
101                         log.info("Config policy {}", configPolicy);
102                     }
103                 }, throwable -> log.warn("Ooops", throwable));
104     }
105
106     private void updateConfigurationFromJsonObject(JsonObject jsonObject) {
107
108         log.info("Updating configuration from CBS");
109         Configuration configuration = Configuration.getInstance();
110
111         Type mapType = new TypeToken<Map<String, Object>>() {
112         }.getType();
113
114         JsonObject subscribes = jsonObject.getAsJsonObject("streams_subscribes");
115         Map<String, Object> streamsSubscribes = new Gson().fromJson(subscribes, mapType);
116
117         JsonObject publishes = jsonObject.getAsJsonObject("streams_publishes");
118         Map<String, Object> streamsPublishes = new Gson().fromJson(publishes, mapType);
119
120         int pgPort = jsonObject.get("postgres.port").getAsInt();
121         int pollingInterval = jsonObject.get("sonhandler.pollingInterval").getAsInt();
122         String pgPassword = jsonObject.get("postgres.password").getAsString();
123         int numSolutions = jsonObject.get("sonhandler.numSolutions").getAsInt();
124         int minConfusion = jsonObject.get("sonhandler.minConfusion").getAsInt();
125         int maximumClusters = jsonObject.get("sonhandler.maximumClusters").getAsInt();
126         int minCollision = jsonObject.get("sonhandler.minCollision").getAsInt();
127         String sourceId = jsonObject.get("sonhandler.sourceId").getAsString();
128         String pgUsername = jsonObject.get("postgres.username").getAsString();
129         String pgHost = jsonObject.get("postgres.host").getAsString();
130
131         JsonArray servers = jsonObject.getAsJsonArray("sonhandler.dmaap.server");
132         Type listType = new TypeToken<List<String>>() {
133         }.getType();
134         List<String> dmaapServers = new Gson().fromJson(servers, listType);
135
136         String cg = jsonObject.get("sonhandler.cg").getAsString();
137         int bufferTime = jsonObject.get("sonhandler.bufferTime").getAsInt();
138         String cid = jsonObject.get("sonhandler.cid").getAsString();
139         String configDbService = jsonObject.get("sonhandler.configDb.service").getAsString();
140         String namespace = jsonObject.get("sonhandler.namespace").getAsString();
141         String callbackUrl = "http://" + System.getenv("HOSTNAME") + "." + namespace + ":8080/callbackUrl";
142
143         String pciOptimizer = jsonObject.get("sonhandler.pciOptimizer").getAsString();
144         String pciAnrOptimizer = jsonObject.get("sonhandler.pciAnrOptimizer").getAsString();
145
146         String oofService = jsonObject.get("sonhandler.oof.service").getAsString();
147         String oofEndpoint = jsonObject.get("sonhandler.oof.endpoint").getAsString();
148         int pollingTimeout = jsonObject.get("sonhandler.pollingTimeout").getAsInt();
149
150         int badThreshold = jsonObject.get("sonhandler.badThreshold").getAsInt();
151         int poorThreshold = jsonObject.get("sonhandler.poorThreshold").getAsInt();
152
153         int poorCountThreshold = jsonObject.get("sonhandler.poorCountThreshold").getAsInt();
154         int badCountThreshold = jsonObject.get("sonhandler.badCountThreshold").getAsInt();
155         int oofTriggerCountTimer = jsonObject.get("sonhandler.oofTriggerCountTimer").getAsInt();
156         int oofTriggerCountThreshold = jsonObject.get("sonhandler.oofTriggerCountThreshold").getAsInt();
157         int policyRespTimer = jsonObject.get("sonhandler.policyRespTimer").getAsInt();
158
159         configuration.setStreamsSubscribes(streamsSubscribes);
160         configuration.setStreamsPublishes(streamsPublishes);
161         configuration.setPgPassword(pgPassword);
162         configuration.setPgPort(pgPort);
163         configuration.setPollingInterval(pollingInterval);
164         configuration.setNumSolutions(numSolutions);
165         configuration.setMinCollision(minCollision);
166         configuration.setMinConfusion(minConfusion);
167         configuration.setMaximumClusters(maximumClusters);
168         configuration.setPgHost(pgHost);
169         configuration.setPgUsername(pgUsername);
170         configuration.setSourceId(sourceId);
171         configuration.setDmaapServers(dmaapServers);
172         configuration.setCg(cg);
173         configuration.setCid(cid);
174         configuration.setBufferTime(bufferTime);
175         configuration.setConfigDbService(configDbService);
176         configuration.setCallbackUrl(callbackUrl);
177         configuration.setPciOptimizer(pciOptimizer);
178         configuration.setPciAnrOptimizer(pciAnrOptimizer);
179         configuration.setOofService(oofService);
180         configuration.setOofEndpoint(oofEndpoint);
181         configuration.setPollingTimeout(pollingTimeout);
182         configuration.setBadThreshold(badThreshold);
183         configuration.setPoorThreshold(poorThreshold);
184         configuration.setPoorCountThreshold(poorCountThreshold);
185         configuration.setBadCountThreshold(badCountThreshold);
186         configuration.setOofTriggerCountTimer(oofTriggerCountTimer);
187         configuration.setOofTriggerCountThreshold(oofTriggerCountThreshold);
188         configuration.setPolicyRespTimer(policyRespTimer);
189
190         log.info("configuration from CBS {}", configuration);
191
192     }
193
194     @Override
195     public void run() {
196         Boolean done = false;
197         while (!done) {
198             try {
199                 Disposable disp = getAppConfig();
200                 synchronized (this) {
201                     this.wait();
202                 }
203                 log.info("Polling interval changed");
204                 disp.dispose();
205             } catch (Exception e) {
206                 done = true;
207             }
208         }
209     }
210
211 }