[SO] Remove DMaap Dependency in SO-bpmn-infra
[so.git] / bpmn / so-bpmn-infrastructure-common / src / main / java / org / onap / so / bpmn / infrastructure / pnf / dmaap / PnfEventReadyDmaapClient.java
index 96562fe..44b16da 100644 (file)
@@ -19,7 +19,6 @@
  * limitations under the License.
  * ============LICENSE_END=========================================================
  */
-
 package org.onap.so.bpmn.infrastructure.pnf.dmaap;
 
 import java.io.IOException;
@@ -29,12 +28,7 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-import javax.ws.rs.core.UriBuilder;
-import org.apache.http.HttpResponse;
-import org.apache.http.client.HttpClient;
-import org.apache.http.client.methods.HttpGet;
-import org.apache.http.impl.client.HttpClientBuilder;
-import org.apache.http.util.EntityUtils;
+import org.onap.so.client.kafka.KafkaConsumerImpl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -43,32 +37,43 @@ import org.springframework.stereotype.Component;
 
 @Component
 public class PnfEventReadyDmaapClient implements DmaapClient {
-
     private static final Logger logger = LoggerFactory.getLogger(PnfEventReadyDmaapClient.class);
-
-    private HttpClient httpClient;
     private Map<String, Runnable> pnfCorrelationIdToThreadMap;
-    private HttpGet getRequest;
     private int topicListenerDelayInSeconds;
     private volatile ScheduledThreadPoolExecutor executor;
     private volatile boolean dmaapThreadListenerIsRunning;
+    private KafkaConsumerImpl consumerForPnfReady;
+    private KafkaConsumerImpl consumerForPnfUpdate;
+    private String pnfReadyTopic;
+    private String pnfUpdateTopic;
+    private String consumerGroup;
+    private String consumerId;
+    private String consumerIdUpdate;
+
+
 
     @Autowired
-    public PnfEventReadyDmaapClient(Environment env) {
-        httpClient = HttpClientBuilder.create().build();
+    public PnfEventReadyDmaapClient(Environment env) throws IOException {
         pnfCorrelationIdToThreadMap = new ConcurrentHashMap<>();
         topicListenerDelayInSeconds = env.getProperty("pnf.dmaap.topicListenerDelayInSeconds", Integer.class);
         executor = null;
-        getRequest = new HttpGet(UriBuilder.fromUri(env.getProperty("pnf.dmaap.uriPathPrefix"))
-                .scheme(env.getProperty("pnf.dmaap.protocol")).host(env.getProperty("pnf.dmaap.host"))
-                .port(env.getProperty("pnf.dmaap.port", Integer.class)).path(env.getProperty("pnf.dmaap.topicName"))
-                .path(env.getProperty("pnf.dmaap.consumerGroup")).path(env.getProperty("pnf.dmaap.consumerId"))
-                .build());
+        try {
+            consumerForPnfReady = new KafkaConsumerImpl(env.getProperty("pnf.kafka.kafkaBootstrapServers"));
+            consumerForPnfUpdate = new KafkaConsumerImpl(env.getProperty("pnf.kafka.kafkaBootstrapServers"));
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+        pnfReadyTopic = env.getProperty("pnf.kafka.pnfReadyTopicName");
+        pnfUpdateTopic = env.getProperty("pnf.kafka.pnfUpdateTopicName");
+        consumerGroup = env.getProperty("pnf.kafka.consumerGroup");
+        consumerId = env.getProperty("pnf.kafka.consumerId");
+        consumerIdUpdate = env.getProperty("pnf.kafka.consumerIdUpdate");
     }
 
+
     @Override
     public synchronized void registerForUpdate(String pnfCorrelationId, Runnable informConsumer) {
-        logger.debug("registering for pnf ready dmaap event for pnf correlation id: {}", pnfCorrelationId);
+        logger.debug("registering for pnf ready kafka event for pnf correlation id: {}", pnfCorrelationId);
         pnfCorrelationIdToThreadMap.put(pnfCorrelationId, informConsumer);
         if (!dmaapThreadListenerIsRunning) {
             startDmaapThreadListener();
@@ -77,9 +82,11 @@ public class PnfEventReadyDmaapClient implements DmaapClient {
 
     @Override
     public synchronized Runnable unregister(String pnfCorrelationId) {
-        logger.debug("unregistering from pnf ready dmaap event for pnf correlation id: {}", pnfCorrelationId);
+        logger.debug("unregistering from pnf ready kafka event for pnf correlation id: {}", pnfCorrelationId);
         Runnable runnable = pnfCorrelationIdToThreadMap.remove(pnfCorrelationId);
         if (pnfCorrelationIdToThreadMap.isEmpty()) {
+            consumerForPnfUpdate.close();
+            consumerForPnfReady.close();
             stopDmaapThreadListener();
         }
         return runnable;
@@ -105,26 +112,28 @@ public class PnfEventReadyDmaapClient implements DmaapClient {
     }
 
     class DmaapTopicListenerThread implements Runnable {
-
         @Override
         public void run() {
             try {
-                logger.debug("dmaap listener starts listening pnf ready dmaap topic");
-                HttpResponse response = httpClient.execute(getRequest);
-                getPnfCorrelationIdListFromResponse(response).forEach(this::informAboutPnfReadyIfPnfCorrelationIdFound);
+                List<String> response;
+                System.out.println(pnfUpdateTopic + "   " + consumerGroup);
+                response = consumerForPnfUpdate.get(pnfUpdateTopic, consumerGroup, consumerIdUpdate);
+                if (response.isEmpty()) {
+                    response = consumerForPnfReady.get(pnfReadyTopic, consumerGroup, consumerId);
+                    getPnfCorrelationIdListFromResponse(response)
+                            .forEach(this::informAboutPnfReadyIfPnfCorrelationIdFound);
+                } else {
+                    getPnfCorrelationIdListFromResponse(response)
+                            .forEach(this::informAboutPnfReadyIfPnfCorrelationIdFound);
+                }
             } catch (IOException e) {
-                logger.error("Exception caught during sending rest request to dmaap for listening event topic", e);
-            } finally {
-                getRequest.reset();
+                logger.error("Exception caught during sending rest request to kafka for listening event topic", e);
             }
         }
 
-        private List<String> getPnfCorrelationIdListFromResponse(HttpResponse response) throws IOException {
-            if (response.getStatusLine().getStatusCode() == 200) {
-                String responseString = EntityUtils.toString(response.getEntity(), "UTF-8");
-                if (responseString != null) {
-                    return JsonUtilForPnfCorrelationId.parseJsonToGelAllPnfCorrelationId(responseString);
-                }
+        private List<String> getPnfCorrelationIdListFromResponse(List<String> response) throws IOException {
+            if (response != null) {
+                return JsonUtilForPnfCorrelationId.parseJsonToGelAllPnfCorrelationId(response);
             }
             return Collections.emptyList();
         }
@@ -138,3 +147,4 @@ public class PnfEventReadyDmaapClient implements DmaapClient {
         }
     }
 }
+