[SO] Remove DMaap Dependency in SO-bpmn-infra
[so.git] / bpmn / so-bpmn-infrastructure-common / src / main / java / org / onap / so / bpmn / infrastructure / pnf / dmaap / PnfEventReadyDmaapClient.java
index f215d49..44b16da 100644 (file)
@@ -28,12 +28,7 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-import javax.ws.rs.core.UriBuilder;
-import org.apache.http.HttpResponse;
-import org.apache.http.client.HttpClient;
-import org.apache.http.client.methods.HttpGet;
-import org.apache.http.impl.client.HttpClientBuilder;
-import org.apache.http.util.EntityUtils;
+import org.onap.so.client.kafka.KafkaConsumerImpl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -43,38 +38,42 @@ import org.springframework.stereotype.Component;
 @Component
 public class PnfEventReadyDmaapClient implements DmaapClient {
     private static final Logger logger = LoggerFactory.getLogger(PnfEventReadyDmaapClient.class);
-    private HttpClient httpClient;
     private Map<String, Runnable> pnfCorrelationIdToThreadMap;
-    private HttpGet getRequestForpnfReady;
-    private HttpGet getRequestForPnfUpdate;
     private int topicListenerDelayInSeconds;
     private volatile ScheduledThreadPoolExecutor executor;
     private volatile boolean dmaapThreadListenerIsRunning;
+    private KafkaConsumerImpl consumerForPnfReady;
+    private KafkaConsumerImpl consumerForPnfUpdate;
+    private String pnfReadyTopic;
+    private String pnfUpdateTopic;
+    private String consumerGroup;
+    private String consumerId;
+    private String consumerIdUpdate;
 
 
 
     @Autowired
-    public PnfEventReadyDmaapClient(Environment env) {
-        httpClient = HttpClientBuilder.create().build();
+    public PnfEventReadyDmaapClient(Environment env) throws IOException {
         pnfCorrelationIdToThreadMap = new ConcurrentHashMap<>();
         topicListenerDelayInSeconds = env.getProperty("pnf.dmaap.topicListenerDelayInSeconds", Integer.class);
         executor = null;
-        getRequestForpnfReady = new HttpGet(UriBuilder.fromUri(env.getProperty("pnf.dmaap.uriPathPrefix"))
-                .scheme(env.getProperty("pnf.dmaap.protocol")).host(env.getProperty("pnf.dmaap.host"))
-                .port(env.getProperty("pnf.dmaap.port", Integer.class))
-                .path(env.getProperty("pnf.dmaap.pnfReadyTopicName")).path(env.getProperty("pnf.dmaap.consumerGroup"))
-                .path(env.getProperty("pnf.dmaap.consumerId")).build());
-        getRequestForPnfUpdate = new HttpGet(UriBuilder.fromUri(env.getProperty("pnf.dmaap.uriPathPrefix"))
-                .scheme(env.getProperty("pnf.dmaap.protocol")).host(env.getProperty("pnf.dmaap.host"))
-                .port(env.getProperty("pnf.dmaap.port", Integer.class))
-                .path(env.getProperty("pnf.dmaap.pnfUpdateTopicName")).path(env.getProperty("pnf.dmaap.consumerGroup"))
-                .path(env.getProperty("pnf.dmaap.consumerIdUpdate")).build());
+        try {
+            consumerForPnfReady = new KafkaConsumerImpl(env.getProperty("pnf.kafka.kafkaBootstrapServers"));
+            consumerForPnfUpdate = new KafkaConsumerImpl(env.getProperty("pnf.kafka.kafkaBootstrapServers"));
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+        pnfReadyTopic = env.getProperty("pnf.kafka.pnfReadyTopicName");
+        pnfUpdateTopic = env.getProperty("pnf.kafka.pnfUpdateTopicName");
+        consumerGroup = env.getProperty("pnf.kafka.consumerGroup");
+        consumerId = env.getProperty("pnf.kafka.consumerId");
+        consumerIdUpdate = env.getProperty("pnf.kafka.consumerIdUpdate");
     }
 
 
     @Override
     public synchronized void registerForUpdate(String pnfCorrelationId, Runnable informConsumer) {
-        logger.debug("registering for pnf ready dmaap event for pnf correlation id: {}", pnfCorrelationId);
+        logger.debug("registering for pnf ready kafka event for pnf correlation id: {}", pnfCorrelationId);
         pnfCorrelationIdToThreadMap.put(pnfCorrelationId, informConsumer);
         if (!dmaapThreadListenerIsRunning) {
             startDmaapThreadListener();
@@ -83,9 +82,11 @@ public class PnfEventReadyDmaapClient implements DmaapClient {
 
     @Override
     public synchronized Runnable unregister(String pnfCorrelationId) {
-        logger.debug("unregistering from pnf ready dmaap event for pnf correlation id: {}", pnfCorrelationId);
+        logger.debug("unregistering from pnf ready kafka event for pnf correlation id: {}", pnfCorrelationId);
         Runnable runnable = pnfCorrelationIdToThreadMap.remove(pnfCorrelationId);
         if (pnfCorrelationIdToThreadMap.isEmpty()) {
+            consumerForPnfUpdate.close();
+            consumerForPnfReady.close();
             stopDmaapThreadListener();
         }
         return runnable;
@@ -114,30 +115,25 @@ public class PnfEventReadyDmaapClient implements DmaapClient {
         @Override
         public void run() {
             try {
-                HttpResponse response;
-                response = httpClient.execute(getRequestForPnfUpdate);
-                List<String> pnfUpdateResponse = getPnfCorrelationIdListFromResponse(response);
-                if (pnfUpdateResponse.isEmpty()) {
-                    response = httpClient.execute(getRequestForpnfReady);
+                List<String> response;
+                System.out.println(pnfUpdateTopic + "   " + consumerGroup);
+                response = consumerForPnfUpdate.get(pnfUpdateTopic, consumerGroup, consumerIdUpdate);
+                if (response.isEmpty()) {
+                    response = consumerForPnfReady.get(pnfReadyTopic, consumerGroup, consumerId);
                     getPnfCorrelationIdListFromResponse(response)
                             .forEach(this::informAboutPnfReadyIfPnfCorrelationIdFound);
                 } else {
-                    pnfUpdateResponse.forEach(this::informAboutPnfReadyIfPnfCorrelationIdFound);
+                    getPnfCorrelationIdListFromResponse(response)
+                            .forEach(this::informAboutPnfReadyIfPnfCorrelationIdFound);
                 }
             } catch (IOException e) {
-                logger.error("Exception caught during sending rest request to dmaap for listening event topic", e);
-            } finally {
-                getRequestForpnfReady.reset();
-                getRequestForPnfUpdate.reset();
+                logger.error("Exception caught during sending rest request to kafka for listening event topic", e);
             }
         }
 
-        private List<String> getPnfCorrelationIdListFromResponse(HttpResponse response) throws IOException {
-            if (response.getStatusLine().getStatusCode() == 200) {
-                String responseString = EntityUtils.toString(response.getEntity(), "UTF-8");
-                if (responseString != null) {
-                    return JsonUtilForPnfCorrelationId.parseJsonToGelAllPnfCorrelationId(responseString);
-                }
+        private List<String> getPnfCorrelationIdListFromResponse(List<String> response) throws IOException {
+            if (response != null) {
+                return JsonUtilForPnfCorrelationId.parseJsonToGelAllPnfCorrelationId(response);
             }
             return Collections.emptyList();
         }