[SO] Code improvement in bpmn-infra supporting kafka change
[so.git] / bpmn / so-bpmn-infrastructure-common / src / main / java / org / onap / so / bpmn / infrastructure / pnf / kafka / PnfEventReadyKafkaClient.java
@@ -19,7 +19,7 @@
  * limitations under the License.
  * ============LICENSE_END=========================================================
  */
-package org.onap.so.bpmn.infrastructure.pnf.dmaap;
+package org.onap.so.bpmn.infrastructure.pnf.kafka;
 
 import java.io.IOException;
 import java.util.Collections;
@@ -36,12 +36,12 @@ import org.springframework.core.env.Environment;
 import org.springframework.stereotype.Component;
 
 @Component
-public class PnfEventReadyDmaapClient implements DmaapClient {
-    private static final Logger logger = LoggerFactory.getLogger(PnfEventReadyDmaapClient.class);
+public class PnfEventReadyKafkaClient implements KafkaClient {
+    private static final Logger logger = LoggerFactory.getLogger(PnfEventReadyKafkaClient.class);
     private Map<String, Runnable> pnfCorrelationIdToThreadMap;
     private int topicListenerDelayInSeconds;
     private volatile ScheduledThreadPoolExecutor executor;
-    private volatile boolean dmaapThreadListenerIsRunning;
+    private volatile boolean kafkaThreadListenerIsRunning;
     private KafkaConsumerImpl consumerForPnfReady;
     private KafkaConsumerImpl consumerForPnfUpdate;
     private String pnfReadyTopic;
@@ -53,9 +53,9 @@ public class PnfEventReadyDmaapClient implements DmaapClient {
 
 
     @Autowired
-    public PnfEventReadyDmaapClient(Environment env) throws IOException {
+    public PnfEventReadyKafkaClient(Environment env) throws IOException {
         pnfCorrelationIdToThreadMap = new ConcurrentHashMap<>();
-        topicListenerDelayInSeconds = env.getProperty("pnf.dmaap.topicListenerDelayInSeconds", Integer.class);
+        topicListenerDelayInSeconds = env.getProperty("pnf.kafka.topicListenerDelayInSeconds", Integer.class);
         executor = null;
         try {
             consumerForPnfReady = new KafkaConsumerImpl(env.getProperty("pnf.kafka.kafkaBootstrapServers"));
@@ -75,8 +75,8 @@ public class PnfEventReadyDmaapClient implements DmaapClient {
     public synchronized void registerForUpdate(String pnfCorrelationId, Runnable informConsumer) {
         logger.debug("registering for pnf ready kafka event for pnf correlation id: {}", pnfCorrelationId);
         pnfCorrelationIdToThreadMap.put(pnfCorrelationId, informConsumer);
-        if (!dmaapThreadListenerIsRunning) {
-            startDmaapThreadListener();
+        if (!kafkaThreadListenerIsRunning) {
+            startKafkaThreadListener();
         }
     }
 
@@ -87,31 +87,31 @@ public class PnfEventReadyDmaapClient implements DmaapClient {
         if (pnfCorrelationIdToThreadMap.isEmpty()) {
             consumerForPnfUpdate.close();
             consumerForPnfReady.close();
-            stopDmaapThreadListener();
+            stopKafkaThreadListener();
         }
         return runnable;
     }
 
-    private synchronized void startDmaapThreadListener() {
-        if (!dmaapThreadListenerIsRunning) {
+    private synchronized void startKafkaThreadListener() {
+        if (!kafkaThreadListenerIsRunning) {
             executor = new ScheduledThreadPoolExecutor(1);
             executor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
             executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
-            executor.scheduleWithFixedDelay(new DmaapTopicListenerThread(), 0, topicListenerDelayInSeconds,
+            executor.scheduleWithFixedDelay(new KafkaTopicListenerThread(), 0, topicListenerDelayInSeconds,
                     TimeUnit.SECONDS);
-            dmaapThreadListenerIsRunning = true;
+            kafkaThreadListenerIsRunning = true;
         }
     }
 
-    private synchronized void stopDmaapThreadListener() {
-        if (dmaapThreadListenerIsRunning) {
+    private synchronized void stopKafkaThreadListener() {
+        if (kafkaThreadListenerIsRunning) {
             executor.shutdown();
-            dmaapThreadListenerIsRunning = false;
+            kafkaThreadListenerIsRunning = false;
             executor = null;
         }
     }
 
-    class DmaapTopicListenerThread implements Runnable {
+    class KafkaTopicListenerThread implements Runnable {
         @Override
         public void run() {
             try {
@@ -141,7 +141,7 @@ public class PnfEventReadyDmaapClient implements DmaapClient {
         private void informAboutPnfReadyIfPnfCorrelationIdFound(String pnfCorrelationId) {
             Runnable runnable = unregister(pnfCorrelationId);
             if (runnable != null) {
-                logger.debug("dmaap listener gets pnf ready event for pnfCorrelationId: {}", pnfCorrelationId);
+                logger.debug("kafka listener gets pnf ready event for pnfCorrelationId: {}", pnfCorrelationId);
                 runnable.run();
             }
         }