1 /*******************************************************************************
2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2020-2021 Wipro Limited.
6 * ==============================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
20 *******************************************************************************/
22 package org.onap.slice.analysis.ms.controller;
24 import com.google.gson.Gson;
25 import com.google.gson.JsonObject;
26 import com.google.gson.reflect.TypeToken;
28 import java.lang.reflect.Type;
29 import java.time.Duration;
32 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory;
33 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests;
34 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsRequest;
35 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsClientConfiguration;
36 import org.onap.dcaegen2.services.sdk.rest.services.model.logging.ImmutableRequestDiagnosticContext;
37 import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext;
38 import org.onap.slice.analysis.ms.models.ConfigPolicy;
39 import org.onap.slice.analysis.ms.models.Configuration;
40 import org.slf4j.Logger;
41 import org.slf4j.LoggerFactory;
43 import reactor.core.Disposable;
46 * This class provides a method to fetch the application configuration from CBS.
49 public class ConfigFetchFromCbs implements Runnable {
51 private static Logger log = LoggerFactory.getLogger(ConfigFetchFromCbs.class);
53 private Duration interval;
55 public ConfigFetchFromCbs() {
59 public ConfigFetchFromCbs(Duration interval) {
60 this.interval = interval;
64 * Gets app config from CBS.
66 private Disposable getAppConfig() {
68 // Generate RequestID and InvocationID which will be used when logging and in
70 log.info("getAppconfig start ..");
71 RequestDiagnosticContext diagnosticContext = RequestDiagnosticContext.create();
72 // Read necessary properties from the environment
73 final CbsClientConfiguration cbsClientConfiguration = CbsClientConfiguration.fromEnvironment();
75 log.debug("environments {}", cbsClientConfiguration);
76 ConfigPolicy configPolicy = ConfigPolicy.getInstance();
79 final Duration initialDelay = Duration.ofSeconds(5);
80 final Duration period = interval;
82 // Create the client and use it to get the configuration
83 final CbsRequest request = CbsRequests.getAll(diagnosticContext);
84 return CbsClientFactory.createCbsClient(cbsClientConfiguration)
85 .flatMapMany(cbsClient -> cbsClient.updates(request, initialDelay, period)).subscribe(jsonObject -> {
86 log.info("configuration and policy from CBS {}", jsonObject);
87 JsonObject config = jsonObject.getAsJsonObject("config");
88 Duration newPeriod = Duration.ofSeconds(config.get("cbsPollingInterval").getAsInt());
89 if (!newPeriod.equals(period)) {
96 Configuration.getInstance().updateConfigurationFromJsonObject(config);
98 Type mapType = new TypeToken<Map<String, Object>>() {
100 if (jsonObject.getAsJsonObject("policies") != null) {
101 JsonObject policyJson = jsonObject.getAsJsonObject("policies").getAsJsonArray("items").get(0)
102 .getAsJsonObject().getAsJsonObject("config");
103 Map<String, Object> policy = new Gson().fromJson(policyJson, mapType);
104 configPolicy.setConfig(policy);
105 log.info("Config policy {}", configPolicy);
107 }, throwable -> log.warn("Ooops", throwable));
114 Boolean done = false;
117 Disposable disp = getAppConfig();
118 synchronized (this) {
121 log.info("Polling interval changed");
123 } catch (Exception e) {