Update CBS client to fetch config periodically 45/93345/2
authorkrishna <krishna.moorthy6@wipro.com>
Mon, 12 Aug 2019 10:54:53 +0000 (16:24 +0530)
committerkrishna <krishna.moorthy6@wipro.com>
Mon, 19 Aug 2019 05:59:52 +0000 (11:29 +0530)
Change-Id: I29c5880072631dcd4b6425452316724fcfd7bd13
Issue-ID: DCAEGEN2-1642
Signed-off-by: krishna <krishna.moorthy6@wipro.com>
.gitignore
dpo/blueprints/k8s-sonhms-inputs.yaml
pom.xml
src/main/java/org/onap/dcaegen2/services/sonhms/Application.java
src/main/java/org/onap/dcaegen2/services/sonhms/controller/ConfigFetchFromCbs.java

index 26bb426..14ef475 100644 (file)
@@ -5,3 +5,4 @@ target/
 .classpath
 *.jar
 /bin/
+.checkstyle
index 5659e07..0efdc1f 100644 (file)
@@ -18,7 +18,7 @@
  
 pgaas_cluster_name: dcae-pg-primary.onap
 database_name: sonhms
-tag_version: nexus3.onap.org:10001/onap/org.onap.dcaegen2.services.son-handler:1.1.0
+tag_version: nexus3.onap.org:10001/onap/org.onap.dcaegen2.services.son-handler:1.1.1
 replicas: 1
 aaf_username: 
 aaf_password: 
diff --git a/pom.xml b/pom.xml
index 14f8dff..ef1c771 100644 (file)
--- a/pom.xml
+++ b/pom.xml
                </dependency>
 
                <dependency>
-                       <groupId>org.functionaljava</groupId>
-                       <artifactId>functionaljava</artifactId>
-                       <version>3.0</version>
-               </dependency>
+                   <groupId>org.functionaljava</groupId>
+                   <artifactId>functionaljava</artifactId>
+                   <version>3.0</version>
+            </dependency>
+           <dependency>
+                   <groupId>org.springframework</groupId>
+                   <artifactId>spring-core</artifactId>
+                   <version>5.0.9.RELEASE</version>
+           </dependency>
+           <dependency>
+                   <groupId>org.springframework</groupId>
+                   <artifactId>spring-expression</artifactId>
+                   <version>5.0.9.RELEASE</version>
+           </dependency>
+           <dependency>
+                   <groupId>org.springframework</groupId>
+                   <artifactId>spring-web</artifactId>
+                   <version>5.0.9.RELEASE</version>
+           </dependency>
+           <dependency>
+                   <groupId>org.springframework</groupId>
+                   <artifactId>spring-webmvc</artifactId>
+                   <version>5.0.9.RELEASE</version>
+           </dependency>
+           <dependency>
+                   <groupId>org.apache.tomcat.embed</groupId>
+                   <artifactId>tomcat-embed-core</artifactId>
+                   <version>9.0.14</version>
+           </dependency>
+           <dependency>
+                   <groupId>org.apache.httpcomponents</groupId>
+                   <artifactId>httpclient</artifactId>
+               <version>4.5.7</version>
+           </dependency>
 
 
        </dependencies>
index 22f458a..9919bed 100644 (file)
@@ -21,6 +21,8 @@
 
 package org.onap.dcaegen2.services.sonhms;
 
+import java.time.Duration;
+
 import javax.sql.DataSource;
 
 import org.onap.dcaegen2.services.sonhms.controller.ConfigFetchFromCbs;
@@ -41,8 +43,9 @@ public class Application {
      */
     public static void main(String[] args) {
 
-        ConfigFetchFromCbs configFetchFromCbs = new ConfigFetchFromCbs();
-        configFetchFromCbs.getAppConfig();
+        ConfigFetchFromCbs configFetchFromCbs = new ConfigFetchFromCbs(Duration.ofSeconds(60));
+        Thread configFetchThread = new Thread(configFetchFromCbs);
+        configFetchThread.start();
         try {
             Thread.sleep(10000);
         } catch (InterruptedException e) {
index c2e7b63..afa26d8 100644 (file)
@@ -27,6 +27,7 @@ import com.google.gson.JsonObject;
 import com.google.gson.reflect.TypeToken;
 
 import java.lang.reflect.Type;
+import java.time.Duration;
 import java.util.List;
 import java.util.Map;
 
@@ -40,14 +41,26 @@ import org.onap.dcaegen2.services.sonhms.Configuration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class ConfigFetchFromCbs {
+import reactor.core.Disposable;
+
+public class ConfigFetchFromCbs implements Runnable {
 
     private static Logger log = LoggerFactory.getLogger(ConfigFetchFromCbs.class);
 
+    private Duration interval;
+
+    public ConfigFetchFromCbs() {
+
+    }
+
+    public ConfigFetchFromCbs(Duration interval) {
+        this.interval = interval;
+    }
+
     /**
      * Gets app config from CBS.
      */
-    public void getAppConfig() {
+    private Disposable getAppConfig() {
 
         // Generate RequestID and InvocationID which will be used when logging and in
         // HTTP requests
@@ -58,23 +71,36 @@ public class ConfigFetchFromCbs {
         log.debug("environments {}", env);
         ConfigPolicy configPolicy = ConfigPolicy.getInstance();
 
+        // Polling properties
+        final Duration initialDelay = Duration.ofSeconds(5);
+        final Duration period = interval;
+
         // Create the client and use it to get the configuration
         final CbsRequest request = CbsRequests.getAll(diagnosticContext);
-        CbsClientFactory.createCbsClient(env).flatMap(cbsClient -> cbsClient.get(request)).subscribe(jsonObject -> {
-            log.info("configuration and policy from CBS {}", jsonObject);
-            JsonObject config = jsonObject.getAsJsonObject("config");
-
-            updateConfigurationFromJsonObject(config);
-
-            Type mapType = new TypeToken<Map<String, Object>>() {
-            }.getType();
-            JsonObject policyJson = jsonObject.getAsJsonObject("policies").getAsJsonArray("items").get(0)
-                    .getAsJsonObject().getAsJsonObject("config");
-            Map<String, Object> policy = new Gson().fromJson(policyJson, mapType);
-            configPolicy.setConfig(policy);
-            log.info("Config policy {}", configPolicy);
-        }, throwable -> log.warn("Ooops", throwable));
-
+        return CbsClientFactory.createCbsClient(env)
+                .flatMapMany(cbsClient -> cbsClient.updates(request, initialDelay, period)).subscribe(jsonObject -> {
+                    log.info("configuration and policy from CBS {}", jsonObject);
+                    JsonObject config = jsonObject.getAsJsonObject("config");
+                    Duration newPeriod = Duration.ofSeconds(config.get("cbsPollingInterval").getAsInt());
+                    if (!newPeriod.equals(period)) {
+                        interval = newPeriod;
+                        synchronized (this) {
+                            this.notifyAll();
+                        }
+
+                    }
+                    updateConfigurationFromJsonObject(config);
+
+                    Type mapType = new TypeToken<Map<String, Object>>() {
+                    }.getType();
+                    if (jsonObject.getAsJsonObject("policies") != null) {
+                        JsonObject policyJson = jsonObject.getAsJsonObject("policies").getAsJsonArray("items").get(0)
+                                .getAsJsonObject().getAsJsonObject("config");
+                        Map<String, Object> policy = new Gson().fromJson(policyJson, mapType);
+                        configPolicy.setConfig(policy);
+                        log.info("Config policy {}", configPolicy);
+                    }
+                }, throwable -> log.warn("Ooops", throwable));
     }
 
     private void updateConfigurationFromJsonObject(JsonObject jsonObject) {
@@ -160,9 +186,26 @@ public class ConfigFetchFromCbs {
         configuration.setOofTriggerCountTimer(oofTriggerCountTimer);
         configuration.setOofTriggerCountThreshold(oofTriggerCountThreshold);
         configuration.setPolicyRespTimer(policyRespTimer);
-        
-        log.info("configuration from CBS {}", configuration.toString());
 
+        log.info("configuration from CBS {}", configuration);
+
+    }
+
+    @Override
+    public void run() {
+        Boolean done = false;
+        while (!done) {
+            try {
+                Disposable disp = getAppConfig();
+                synchronized (this) {
+                    this.wait();
+                }
+                log.info("Polling interval changed");
+                disp.dispose();
+            } catch (Exception e) {
+                done = true;
+            }
+        }
     }
 
 }