1 /*******************************************************************************
2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2019 Wipro Limited.
6 * ==============================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
20 *******************************************************************************/
22 package org.onap.dcaegen2.services.sonhms.controller;
24 import com.google.gson.Gson;
25 import com.google.gson.JsonArray;
26 import com.google.gson.JsonObject;
27 import com.google.gson.reflect.TypeToken;
29 import java.lang.reflect.Type;
30 import java.time.Duration;
31 import java.util.List;
34 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory;
35 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests;
36 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsRequest;
37 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties;
38 import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext;
39 import org.onap.dcaegen2.services.sonhms.ConfigPolicy;
40 import org.onap.dcaegen2.services.sonhms.Configuration;
41 import org.slf4j.Logger;
42 import org.slf4j.LoggerFactory;
44 import reactor.core.Disposable;
46 public class ConfigFetchFromCbs implements Runnable {
48 private static Logger log = LoggerFactory.getLogger(ConfigFetchFromCbs.class);
50 private Duration interval;
52 public ConfigFetchFromCbs() {
/**
 * Creates a fetcher that uses the supplied polling interval.
 *
 * @param interval period stored on this instance and later used as the
 *                 update period when subscribing to CBS
 */
public ConfigFetchFromCbs(Duration interval) {
    this.interval = interval;
}
61 * Gets app config from CBS.
63 private Disposable getAppConfig() {
65 // Generate RequestID and InvocationID which will be used when logging and in
67 log.info("getAppconfig start ..");
68 RequestDiagnosticContext diagnosticContext = RequestDiagnosticContext.create();
69 // Read necessary properties from the environment
70 final EnvProperties env = EnvProperties.fromEnvironment();
71 log.debug("environments {}", env);
72 ConfigPolicy configPolicy = ConfigPolicy.getInstance();
75 final Duration initialDelay = Duration.ofSeconds(5);
76 final Duration period = interval;
78 // Create the client and use it to get the configuration
79 final CbsRequest request = CbsRequests.getAll(diagnosticContext);
80 return CbsClientFactory.createCbsClient(env)
81 .flatMapMany(cbsClient -> cbsClient.updates(request, initialDelay, period)).subscribe(jsonObject -> {
82 log.info("configuration and policy from CBS {}", jsonObject);
83 JsonObject config = jsonObject.getAsJsonObject("config");
84 Duration newPeriod = Duration.ofSeconds(config.get("cbsPollingInterval").getAsInt());
85 if (!newPeriod.equals(period)) {
92 updateConfigurationFromJsonObject(config);
94 Type mapType = new TypeToken<Map<String, Object>>() {
96 if (jsonObject.getAsJsonObject("policies") != null) {
97 JsonObject policyJson = jsonObject.getAsJsonObject("policies").getAsJsonArray("items").get(0)
98 .getAsJsonObject().getAsJsonObject("config");
99 Map<String, Object> policy = new Gson().fromJson(policyJson, mapType);
100 configPolicy.setConfig(policy);
101 log.info("Config policy {}", configPolicy);
103 }, throwable -> log.warn("Ooops", throwable));
106 private void updateConfigurationFromJsonObject(JsonObject jsonObject) {
108 log.info("Updating configuration from CBS");
109 Configuration configuration = Configuration.getInstance();
111 Type mapType = new TypeToken<Map<String, Object>>() {
114 JsonObject subscribes = jsonObject.getAsJsonObject("streams_subscribes");
115 Map<String, Object> streamsSubscribes = new Gson().fromJson(subscribes, mapType);
117 JsonObject publishes = jsonObject.getAsJsonObject("streams_publishes");
118 Map<String, Object> streamsPublishes = new Gson().fromJson(publishes, mapType);
120 int pgPort = jsonObject.get("postgres.port").getAsInt();
121 int pollingInterval = jsonObject.get("sonhandler.pollingInterval").getAsInt();
122 String pgPassword = jsonObject.get("postgres.password").getAsString();
123 int numSolutions = jsonObject.get("sonhandler.numSolutions").getAsInt();
124 int minConfusion = jsonObject.get("sonhandler.minConfusion").getAsInt();
125 int maximumClusters = jsonObject.get("sonhandler.maximumClusters").getAsInt();
126 int minCollision = jsonObject.get("sonhandler.minCollision").getAsInt();
127 String sourceId = jsonObject.get("sonhandler.sourceId").getAsString();
128 String pgUsername = jsonObject.get("postgres.username").getAsString();
129 String pgHost = jsonObject.get("postgres.host").getAsString();
131 JsonArray servers = jsonObject.getAsJsonArray("sonhandler.dmaap.server");
132 Type listType = new TypeToken<List<String>>() {
134 List<String> dmaapServers = new Gson().fromJson(servers, listType);
136 String cg = jsonObject.get("sonhandler.cg").getAsString();
137 int bufferTime = jsonObject.get("sonhandler.bufferTime").getAsInt();
138 String cid = jsonObject.get("sonhandler.cid").getAsString();
139 String configDbService = jsonObject.get("sonhandler.configDb.service").getAsString();
140 String namespace = jsonObject.get("sonhandler.namespace").getAsString();
141 String callbackUrl = "http://" + System.getenv("HOSTNAME") + "." + namespace + ":8080/callbackUrl";
143 String pciOptimizer = jsonObject.get("sonhandler.pciOptimizer").getAsString();
144 String pciAnrOptimizer = jsonObject.get("sonhandler.pciAnrOptimizer").getAsString();
146 String oofService = jsonObject.get("sonhandler.oof.service").getAsString();
147 String oofEndpoint = jsonObject.get("sonhandler.oof.endpoint").getAsString();
148 int pollingTimeout = jsonObject.get("sonhandler.pollingTimeout").getAsInt();
150 int badThreshold = jsonObject.get("sonhandler.badThreshold").getAsInt();
151 int poorThreshold = jsonObject.get("sonhandler.poorThreshold").getAsInt();
153 int poorCountThreshold = jsonObject.get("sonhandler.poorCountThreshold").getAsInt();
154 int badCountThreshold = jsonObject.get("sonhandler.badCountThreshold").getAsInt();
155 int oofTriggerCountTimer = jsonObject.get("sonhandler.oofTriggerCountTimer").getAsInt();
156 int oofTriggerCountThreshold = jsonObject.get("sonhandler.oofTriggerCountThreshold").getAsInt();
157 int policyRespTimer = jsonObject.get("sonhandler.policyRespTimer").getAsInt();
159 configuration.setStreamsSubscribes(streamsSubscribes);
160 configuration.setStreamsPublishes(streamsPublishes);
161 configuration.setPgPassword(pgPassword);
162 configuration.setPgPort(pgPort);
163 configuration.setPollingInterval(pollingInterval);
164 configuration.setNumSolutions(numSolutions);
165 configuration.setMinCollision(minCollision);
166 configuration.setMinConfusion(minConfusion);
167 configuration.setMaximumClusters(maximumClusters);
168 configuration.setPgHost(pgHost);
169 configuration.setPgUsername(pgUsername);
170 configuration.setSourceId(sourceId);
171 configuration.setDmaapServers(dmaapServers);
172 configuration.setCg(cg);
173 configuration.setCid(cid);
174 configuration.setBufferTime(bufferTime);
175 configuration.setConfigDbService(configDbService);
176 configuration.setCallbackUrl(callbackUrl);
177 configuration.setPciOptimizer(pciOptimizer);
178 configuration.setPciAnrOptimizer(pciAnrOptimizer);
179 configuration.setOofService(oofService);
180 configuration.setOofEndpoint(oofEndpoint);
181 configuration.setPollingTimeout(pollingTimeout);
182 configuration.setBadThreshold(badThreshold);
183 configuration.setPoorThreshold(poorThreshold);
184 configuration.setPoorCountThreshold(poorCountThreshold);
185 configuration.setBadCountThreshold(badCountThreshold);
186 configuration.setOofTriggerCountTimer(oofTriggerCountTimer);
187 configuration.setOofTriggerCountThreshold(oofTriggerCountThreshold);
188 configuration.setPolicyRespTimer(policyRespTimer);
190 log.info("configuration from CBS {}", configuration);
196 Boolean done = false;
199 Disposable disp = getAppConfig();
200 synchronized (this) {
203 log.info("Polling interval changed");
205 } catch (Exception e) {