Merge "fix oauth code"
[ccsdk/features.git] / sdnr / wt / mountpoint-registrar / provider / src / main / java / org / onap / ccsdk / features / sdnr / wt / mountpointregistrar / impl / MountpointRegistrarImpl.java
index 53454ac..deff2e3 100644 (file)
@@ -3,6 +3,7 @@
  * ONAP : ccsdk feature sdnr wt
  * =================================================================================================
  * Copyright (C) 2019 highstreet technologies GmbH Intellectual Property. All rights reserved.
+ * Copyright (C) 2021 Samsung Electronics Intellectual Property. All rights reserved.
  * =================================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
  * in compliance with the License. You may obtain a copy of the License at
@@ -21,9 +22,15 @@ package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import org.onap.ccsdk.features.sdnr.wt.common.configuration.Configuration;
 import org.onap.ccsdk.features.sdnr.wt.common.configuration.ConfigurationFileRepresentation;
 import org.onap.ccsdk.features.sdnr.wt.common.configuration.filechange.IConfigChangedListener;
+import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.FaultConfig;
+import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.GeneralConfig;
+import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.MessageConfig;
+import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.PNFRegistrationConfig;
+import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.ProvisioningConfig;
+import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.StndDefinedFaultConfig;
+import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.StrimziKafkaConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -33,12 +40,13 @@ public class MountpointRegistrarImpl implements AutoCloseable, IConfigChangedLis
     private static final String APPLICATION_NAME = "mountpoint-registrar";
     private static final String CONFIGURATIONFILE = "etc/mountpoint-registrar.properties";
 
-    private Thread dmaapVESMsgConsumerMain = null;
+    private Thread sKafkaVESMsgConsumerMain = null;
 
     private GeneralConfig generalConfig;
-    private boolean dmaapEnabled = false;
-    private Map<String, Configuration> configMap = new HashMap<>();
-    private DMaaPVESMsgConsumerMain dmaapConsumerMain = null;
+    private boolean strimziEnabled = false;
+    private Map<String, MessageConfig> configMap = new HashMap<>();
+    private StrimziKafkaVESMsgConsumerMain sKafkaConsumerMain = null;
+    private StrimziKafkaConfig strimziKafkaConfig;
 
     // Blueprint 1
     public MountpointRegistrarImpl() {
@@ -53,20 +61,25 @@ public class MountpointRegistrarImpl implements AutoCloseable, IConfigChangedLis
         configFileRepresentation.registerConfigChangedListener(this);
 
         generalConfig = new GeneralConfig(configFileRepresentation);
+        strimziKafkaConfig = new StrimziKafkaConfig(configFileRepresentation);
         PNFRegistrationConfig pnfRegConfig = new PNFRegistrationConfig(configFileRepresentation);
         FaultConfig faultConfig = new FaultConfig(configFileRepresentation);
+        ProvisioningConfig provisioningConfig = new ProvisioningConfig(configFileRepresentation);
+        StndDefinedFaultConfig stndFaultConfig = new StndDefinedFaultConfig(configFileRepresentation);
 
         configMap.put("pnfRegistration", pnfRegConfig);
         configMap.put("fault", faultConfig);
-
-        dmaapEnabled = generalConfig.getEnabled();
-        if (dmaapEnabled) { // start dmaap consumer thread only if dmaapEnabled=true
-            LOG.info("DMaaP seems to be enabled, starting consumer(s)");
-            dmaapConsumerMain = new DMaaPVESMsgConsumerMain(configMap, generalConfig);
-            dmaapVESMsgConsumerMain = new Thread(dmaapConsumerMain);
-            dmaapVESMsgConsumerMain.start();
+        configMap.put("provisioning", provisioningConfig);
+        configMap.put("stndDefinedFault", stndFaultConfig);
+
+        strimziEnabled = strimziKafkaConfig.getEnabled();
+        if (strimziEnabled) { // start Kafka consumer thread only if strimziEnabled=true
+            LOG.info("Strimzi Kafka seems to be enabled, starting consumer(s)");
+            sKafkaConsumerMain = new StrimziKafkaVESMsgConsumerMain(configMap, generalConfig, strimziKafkaConfig);
+            sKafkaVESMsgConsumerMain = new Thread(sKafkaConsumerMain);
+            sKafkaVESMsgConsumerMain.start();
         } else {
-            LOG.info("DMaaP seems to be disabled, not starting any consumer(s)");
+            LOG.info("Strimzi Kafka seems to be disabled, not starting any consumer(s)");
         }
     }
 
@@ -81,22 +94,30 @@ public class MountpointRegistrarImpl implements AutoCloseable, IConfigChangedLis
 
     @Override
     public void onConfigChanged() {
-        LOG.info("Service configuration state changed. Enabled: {}", generalConfig.getEnabled());
-        boolean dmaapEnabledNewVal = generalConfig.getEnabled();
-        if (!dmaapEnabled && dmaapEnabledNewVal) { // Dmaap disabled earlier (or during bundle startup) but enabled later, start Consumer(s)
-            LOG.info("DMaaP is enabled, starting consumer(s)");
-            dmaapConsumerMain = new DMaaPVESMsgConsumerMain(configMap, generalConfig);
-            dmaapVESMsgConsumerMain = new Thread(dmaapConsumerMain);
-            dmaapVESMsgConsumerMain.start();
-        } else if (dmaapEnabled && !dmaapEnabledNewVal) { // Dmaap enabled earlier (or during bundle startup) but disabled later, stop consumer(s)
-            LOG.info("DMaaP is disabled, stopping consumer(s)");
-            List<DMaaPVESMsgConsumer> consumers = dmaapConsumerMain.getConsumers();
-            for (DMaaPVESMsgConsumer consumer : consumers) {
+        if (generalConfig == null) { // Included as NullPointerException observed once in docker logs
+            LOG.warn("onConfigChange cannot be handled. Unexpected Null for generalConfig");
+            return;
+        }
+        if (strimziKafkaConfig == null) { // Included as NullPointerException observed once in docker logs
+            LOG.warn("onConfigChange cannot be handled. Unexpected Null for strimziKafkaConfig");
+            return;
+        }
+        LOG.info("Service configuration state changed. Enabled: {}", strimziKafkaConfig.getEnabled());
+        boolean strimziEnabledNewVal = strimziKafkaConfig.getEnabled();
+        if (!strimziEnabled && strimziEnabledNewVal) { // Strimzi kafka disabled earlier (or during bundle startup) but enabled later, start Consumer(s)
+            LOG.info("Strimzi Kafka is enabled, starting consumer(s)");
+            sKafkaConsumerMain = new StrimziKafkaVESMsgConsumerMain(configMap, generalConfig, strimziKafkaConfig);
+            sKafkaVESMsgConsumerMain = new Thread(sKafkaConsumerMain);
+            sKafkaVESMsgConsumerMain.start();
+        } else if (strimziEnabled && !strimziEnabledNewVal) { // Strimzi kafka enabled earlier (or during bundle startup) but disabled later, stop consumer(s)
+            LOG.info("Strimzi Kafka is disabled, stopping consumer(s)");
+            List<StrimziKafkaVESMsgConsumer> consumers = sKafkaConsumerMain.getConsumers();
+            for (StrimziKafkaVESMsgConsumer consumer : consumers) {
                 // stop all consumers
                 consumer.stopConsumer();
             }
         }
-        dmaapEnabled = dmaapEnabledNewVal;
+        strimziEnabled = strimziEnabledNewVal;
     }
 
     @Override
@@ -119,6 +140,4 @@ public class MountpointRegistrarImpl implements AutoCloseable, IConfigChangedLis
             }
         }
     }
-
-
 }