[SO] Remove DMaap Dependency in SO-bpmn-infra
[so.git] / bpmn / so-bpmn-infrastructure-common / src / main / java / org / onap / so / bpmn / infrastructure / pnf / dmaap / PnfEventReadyDmaapClient.java
index 1a25388..44b16da 100644 (file)
@@ -28,12 +28,7 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-import javax.ws.rs.core.UriBuilder;
-import org.apache.http.HttpResponse;
-import org.apache.http.client.HttpClient;
-import org.apache.http.client.methods.HttpGet;
-import org.apache.http.impl.client.HttpClientBuilder;
-import org.apache.http.util.EntityUtils;
+import org.onap.so.client.kafka.KafkaConsumerImpl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -43,49 +38,42 @@ import org.springframework.stereotype.Component;
 @Component
 public class PnfEventReadyDmaapClient implements DmaapClient {
     private static final Logger logger = LoggerFactory.getLogger(PnfEventReadyDmaapClient.class);
-    private HttpClient httpClient;
     private Map<String, Runnable> pnfCorrelationIdToThreadMap;
-    private HttpGet getRequestForpnfReady;
-    private HttpGet getRequestForPnfUpdate;
     private int topicListenerDelayInSeconds;
     private volatile ScheduledThreadPoolExecutor executor;
     private volatile boolean dmaapThreadListenerIsRunning;
-    private String topicName;
+    private KafkaConsumerImpl consumerForPnfReady;
+    private KafkaConsumerImpl consumerForPnfUpdate;
+    private String pnfReadyTopic;
+    private String pnfUpdateTopic;
+    private String consumerGroup;
+    private String consumerId;
+    private String consumerIdUpdate;
+
+
 
     @Autowired
-    public PnfEventReadyDmaapClient(Environment env) {
-        httpClient = HttpClientBuilder.create().build();
+    public PnfEventReadyDmaapClient(Environment env) throws IOException {
         pnfCorrelationIdToThreadMap = new ConcurrentHashMap<>();
         topicListenerDelayInSeconds = env.getProperty("pnf.dmaap.topicListenerDelayInSeconds", Integer.class);
         executor = null;
-        topicName = env.getProperty("pnf.dmaap.topicName");
-        String[] topic = topicName.split("\\s");
-        String pnf_ready = null;
-        String pnf_update = null;
-        for (String t : topic) {
-            if (t.matches("(.*)PNF_READY(.*)")) {
-                pnf_ready = t;
-            } else if (t.matches("(.*)PNF_UPDATE(.*)")) {
-                pnf_update = t;
-            } else {
-                return;
-            }
+        try {
+            consumerForPnfReady = new KafkaConsumerImpl(env.getProperty("pnf.kafka.kafkaBootstrapServers"));
+            consumerForPnfUpdate = new KafkaConsumerImpl(env.getProperty("pnf.kafka.kafkaBootstrapServers"));
+        } catch (Exception e) {
+            throw new RuntimeException(e);
         }
-        getRequestForpnfReady = new HttpGet(UriBuilder.fromUri(env.getProperty("pnf.dmaap.uriPathPrefix"))
-                .scheme(env.getProperty("pnf.dmaap.protocol")).host(env.getProperty("pnf.dmaap.host"))
-                .port(env.getProperty("pnf.dmaap.port", Integer.class)).path(pnf_ready)
-                .path(env.getProperty("pnf.dmaap.consumerGroup")).path(env.getProperty("pnf.dmaap.consumerId"))
-                .build());
-        getRequestForPnfUpdate = new HttpGet(UriBuilder.fromUri(env.getProperty("pnf.dmaap.uriPathPrefix"))
-                .scheme(env.getProperty("pnf.dmaap.protocol")).host(env.getProperty("pnf.dmaap.host"))
-                .port(env.getProperty("pnf.dmaap.port", Integer.class)).path(pnf_update)
-                .path(env.getProperty("pnf.dmaap.consumerGroup")).path(env.getProperty("pnf.dmaap.consumerId"))
-                .build());
+        pnfReadyTopic = env.getProperty("pnf.kafka.pnfReadyTopicName");
+        pnfUpdateTopic = env.getProperty("pnf.kafka.pnfUpdateTopicName");
+        consumerGroup = env.getProperty("pnf.kafka.consumerGroup");
+        consumerId = env.getProperty("pnf.kafka.consumerId");
+        consumerIdUpdate = env.getProperty("pnf.kafka.consumerIdUpdate");
     }
 
+
     @Override
     public synchronized void registerForUpdate(String pnfCorrelationId, Runnable informConsumer) {
-        logger.debug("registering for pnf ready dmaap event for pnf correlation id: {}", pnfCorrelationId);
+        logger.debug("registering for pnf ready kafka event for pnf correlation id: {}", pnfCorrelationId);
         pnfCorrelationIdToThreadMap.put(pnfCorrelationId, informConsumer);
         if (!dmaapThreadListenerIsRunning) {
             startDmaapThreadListener();
@@ -94,9 +82,11 @@ public class PnfEventReadyDmaapClient implements DmaapClient {
 
     @Override
     public synchronized Runnable unregister(String pnfCorrelationId) {
-        logger.debug("unregistering from pnf ready dmaap event for pnf correlation id: {}", pnfCorrelationId);
+        logger.debug("unregistering from pnf ready kafka event for pnf correlation id: {}", pnfCorrelationId);
         Runnable runnable = pnfCorrelationIdToThreadMap.remove(pnfCorrelationId);
         if (pnfCorrelationIdToThreadMap.isEmpty()) {
+            consumerForPnfUpdate.close();
+            consumerForPnfReady.close();
             stopDmaapThreadListener();
         }
         return runnable;
@@ -125,30 +115,25 @@ public class PnfEventReadyDmaapClient implements DmaapClient {
         @Override
         public void run() {
             try {
-                HttpResponse response;
-                response = httpClient.execute(getRequestForPnfUpdate);
-                List<String> pnfUpdateResponse = getPnfCorrelationIdListFromResponse(response);
-                if (pnfUpdateResponse.isEmpty()) {
-                    response = httpClient.execute(getRequestForpnfReady);
+                List<String> response;
+                System.out.println(pnfUpdateTopic + "   " + consumerGroup);
+                response = consumerForPnfUpdate.get(pnfUpdateTopic, consumerGroup, consumerIdUpdate);
+                if (response.isEmpty()) {
+                    response = consumerForPnfReady.get(pnfReadyTopic, consumerGroup, consumerId);
                     getPnfCorrelationIdListFromResponse(response)
                             .forEach(this::informAboutPnfReadyIfPnfCorrelationIdFound);
                 } else {
-                    pnfUpdateResponse.forEach(this::informAboutPnfReadyIfPnfCorrelationIdFound);
+                    getPnfCorrelationIdListFromResponse(response)
+                            .forEach(this::informAboutPnfReadyIfPnfCorrelationIdFound);
                 }
             } catch (IOException e) {
-                logger.error("Exception caught during sending rest request to dmaap for listening event topic", e);
-            } finally {
-                getRequestForpnfReady.reset();
-                getRequestForPnfUpdate.reset();
+                logger.error("Exception caught during sending rest request to kafka for listening event topic", e);
             }
         }
 
-        private List<String> getPnfCorrelationIdListFromResponse(HttpResponse response) throws IOException {
-            if (response.getStatusLine().getStatusCode() == 200) {
-                String responseString = EntityUtils.toString(response.getEntity(), "UTF-8");
-                if (responseString != null) {
-                    return JsonUtilForPnfCorrelationId.parseJsonToGelAllPnfCorrelationId(responseString);
-                }
+        private List<String> getPnfCorrelationIdListFromResponse(List<String> response) throws IOException {
+            if (response != null) {
+                return JsonUtilForPnfCorrelationId.parseJsonToGelAllPnfCorrelationId(response);
             }
             return Collections.emptyList();
         }
@@ -162,3 +147,4 @@ public class PnfEventReadyDmaapClient implements DmaapClient {
         }
     }
 }
+