Update CBS client to fetch config periodically
[dcaegen2/services/son-handler.git] / src / main / java / org / onap / dcaegen2 / services / sonhms / controller / ConfigFetchFromCbs.java
index c2e7b63..afa26d8 100644 (file)
@@ -27,6 +27,7 @@ import com.google.gson.JsonObject;
 import com.google.gson.reflect.TypeToken;
 
 import java.lang.reflect.Type;
+import java.time.Duration;
 import java.util.List;
 import java.util.Map;
 
@@ -40,14 +41,26 @@ import org.onap.dcaegen2.services.sonhms.Configuration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class ConfigFetchFromCbs {
+import reactor.core.Disposable;
+
+public class ConfigFetchFromCbs implements Runnable {
 
     private static Logger log = LoggerFactory.getLogger(ConfigFetchFromCbs.class);
 
+    private Duration interval;
+
+    public ConfigFetchFromCbs() {
+
+    }
+
+    public ConfigFetchFromCbs(Duration interval) {
+        this.interval = interval;
+    }
+
     /**
      * Gets app config from CBS.
      */
-    public void getAppConfig() {
+    private Disposable getAppConfig() {
 
         // Generate RequestID and InvocationID which will be used when logging and in
         // HTTP requests
@@ -58,23 +71,36 @@ public class ConfigFetchFromCbs {
         log.debug("environments {}", env);
         ConfigPolicy configPolicy = ConfigPolicy.getInstance();
 
+        // Polling properties
+        final Duration initialDelay = Duration.ofSeconds(5);
+        final Duration period = interval;
+
         // Create the client and use it to get the configuration
         final CbsRequest request = CbsRequests.getAll(diagnosticContext);
-        CbsClientFactory.createCbsClient(env).flatMap(cbsClient -> cbsClient.get(request)).subscribe(jsonObject -> {
-            log.info("configuration and policy from CBS {}", jsonObject);
-            JsonObject config = jsonObject.getAsJsonObject("config");
-
-            updateConfigurationFromJsonObject(config);
-
-            Type mapType = new TypeToken<Map<String, Object>>() {
-            }.getType();
-            JsonObject policyJson = jsonObject.getAsJsonObject("policies").getAsJsonArray("items").get(0)
-                    .getAsJsonObject().getAsJsonObject("config");
-            Map<String, Object> policy = new Gson().fromJson(policyJson, mapType);
-            configPolicy.setConfig(policy);
-            log.info("Config policy {}", configPolicy);
-        }, throwable -> log.warn("Ooops", throwable));
-
+        return CbsClientFactory.createCbsClient(env)
+                .flatMapMany(cbsClient -> cbsClient.updates(request, initialDelay, period)).subscribe(jsonObject -> {
+                    log.info("configuration and policy from CBS {}", jsonObject);
+                    JsonObject config = jsonObject.getAsJsonObject("config");
+                    Duration newPeriod = Duration.ofSeconds(config.get("cbsPollingInterval").getAsInt());
+                    if (!newPeriod.equals(period)) {
+                        interval = newPeriod;
+                        synchronized (this) {
+                            this.notifyAll();
+                        }
+
+                    }
+                    updateConfigurationFromJsonObject(config);
+
+                    Type mapType = new TypeToken<Map<String, Object>>() {
+                    }.getType();
+                    if (jsonObject.getAsJsonObject("policies") != null) {
+                        JsonObject policyJson = jsonObject.getAsJsonObject("policies").getAsJsonArray("items").get(0)
+                                .getAsJsonObject().getAsJsonObject("config");
+                        Map<String, Object> policy = new Gson().fromJson(policyJson, mapType);
+                        configPolicy.setConfig(policy);
+                        log.info("Config policy {}", configPolicy);
+                    }
+                }, throwable -> log.warn("Ooops", throwable));
     }
 
     private void updateConfigurationFromJsonObject(JsonObject jsonObject) {
@@ -160,9 +186,26 @@ public class ConfigFetchFromCbs {
         configuration.setOofTriggerCountTimer(oofTriggerCountTimer);
         configuration.setOofTriggerCountThreshold(oofTriggerCountThreshold);
         configuration.setPolicyRespTimer(policyRespTimer);
-        
-        log.info("configuration from CBS {}", configuration.toString());
 
+        log.info("configuration from CBS {}", configuration);
+
+    }
+
+    @Override
+    public void run() {
+        Boolean done = false;
+        while (!done) {
+            try {
+                Disposable disp = getAppConfig();
+                synchronized (this) {
+                    this.wait();
+                }
+                log.info("Polling interval changed");
+                disp.dispose();
+            } catch (Exception e) {
+                done = true;
+            }
+        }
     }
 
 }