Merge "Added Junit for GRMClientCallFailed.java"
[so.git] / bpmn / so-bpmn-infrastructure-common / src / main / java / org / onap / so / bpmn / infrastructure / pnf / dmaap / PnfEventReadyDmaapClient.java
index 70323b7..1b7a69e 100644 (file)
@@ -4,6 +4,8 @@
  * ================================================================================
  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
  * ================================================================================
+ * Copyright (C) 2018 Nokia.
+ * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
 package org.onap.so.bpmn.infrastructure.pnf.dmaap;
 
 import java.io.IOException;
-import java.net.URI;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-
 import javax.ws.rs.core.UriBuilder;
-
 import org.apache.http.HttpResponse;
 import org.apache.http.client.HttpClient;
 import org.apache.http.client.methods.HttpGet;
 import org.apache.http.impl.client.HttpClientBuilder;
 import org.apache.http.util.EntityUtils;
-import org.onap.so.logger.MsoLogger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.context.annotation.Scope;
 import org.springframework.core.env.Environment;
 import org.springframework.stereotype.Component;
 
 @Component
-@Scope("prototype")
 public class PnfEventReadyDmaapClient implements DmaapClient {
 
-    private static final MsoLogger LOGGER = MsoLogger.getMsoLogger(MsoLogger.Catalog.RA, PnfEventReadyDmaapClient.class);
+    private static final Logger logger = LoggerFactory.getLogger(PnfEventReadyDmaapClient.class);
 
-    private final Environment env;
     private HttpClient httpClient;
-    private String dmaapHost;
-    private int dmaapPort;
-    private String dmaapProtocol;
-    private String dmaapUriPathPrefix;
-    private String dmaapTopicName;
-    private String consumerId;
-    private String consumerGroup;
     private Map<String, Runnable> pnfCorrelationIdToThreadMap;
     private HttpGet getRequest;
-    private int dmaapClientDelayInSeconds;
+    private int topicListenerDelayInSeconds;
     private volatile ScheduledThreadPoolExecutor executor;
     private volatile boolean dmaapThreadListenerIsRunning;
 
     @Autowired
     public PnfEventReadyDmaapClient(Environment env) {
-        this.env = env;
-    }
-
-    public void init() {
         httpClient = HttpClientBuilder.create().build();
         pnfCorrelationIdToThreadMap = new ConcurrentHashMap<>();
-        dmaapHost = env.getProperty("pnf.dmaap.host");
-        dmaapPort = env.getProperty("pnf.dmaap.port", Integer.class);
+        topicListenerDelayInSeconds = env.getProperty("pnf.dmaap.topicListenerDelayInSeconds", Integer.class);
         executor = null;
-        getRequest = new HttpGet(buildURI());
+        getRequest = new HttpGet(UriBuilder.fromUri(env.getProperty("pnf.dmaap.uriPathPrefix"))
+                .scheme(env.getProperty("pnf.dmaap.protocol"))
+                .host(env.getProperty("pnf.dmaap.host"))
+                .port(env.getProperty("pnf.dmaap.port", Integer.class))
+                .path(env.getProperty("pnf.dmaap.topicName"))
+                .path(env.getProperty("pnf.dmaap.consumerGroup"))
+                .path(env.getProperty("pnf.dmaap.consumerId")).build());
     }
 
     @Override
-    public synchronized void registerForUpdate(String correlationId, Runnable informConsumer) {
-        LOGGER.debug("registering for pnf ready dmaap event for correlation id: " + correlationId);
-        pnfCorrelationIdToThreadMap.put(correlationId, informConsumer);
+    public synchronized void registerForUpdate(String pnfCorrelationId, Runnable informConsumer) {
+        logger.debug("registering for pnf ready dmaap event for pnf correlation id: {}", pnfCorrelationId);
+        pnfCorrelationIdToThreadMap.put(pnfCorrelationId, informConsumer);
         if (!dmaapThreadListenerIsRunning) {
             startDmaapThreadListener();
         }
     }
 
     @Override
-    public synchronized Runnable unregister(String correlationId) {
-        LOGGER.debug("unregistering from pnf ready dmaap event for correlation id: " + correlationId);
-        Runnable runnable = pnfCorrelationIdToThreadMap.remove(correlationId);
+    public synchronized Runnable unregister(String pnfCorrelationId) {
+        logger.debug("unregistering from pnf ready dmaap event for pnf correlation id: {}", pnfCorrelationId);
+        Runnable runnable = pnfCorrelationIdToThreadMap.remove(pnfCorrelationId);
         if (pnfCorrelationIdToThreadMap.isEmpty()) {
             stopDmaapThreadListener();
         }
@@ -102,7 +93,7 @@ public class PnfEventReadyDmaapClient implements DmaapClient {
             executor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
             executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
             executor.scheduleWithFixedDelay(new DmaapTopicListenerThread(), 0,
-                    dmaapClientDelayInSeconds, TimeUnit.SECONDS);
+                    topicListenerDelayInSeconds, TimeUnit.SECONDS);
             dmaapThreadListenerIsRunning = true;
         }
     }
@@ -115,64 +106,36 @@ public class PnfEventReadyDmaapClient implements DmaapClient {
         }
     }
 
-    private URI buildURI() {
-        return UriBuilder.fromUri(dmaapUriPathPrefix)
-                .scheme(dmaapProtocol)
-                .host(dmaapHost)
-                .port(dmaapPort).path(dmaapTopicName)
-                .path(consumerGroup).path(consumerId).build();
-    }
-
-    public void setDmaapProtocol(String dmaapProtocol) {
-        this.dmaapProtocol = dmaapProtocol;
-    }
-
-    public void setDmaapUriPathPrefix(String dmaapUriPathPrefix) {
-        this.dmaapUriPathPrefix = dmaapUriPathPrefix;
-    }
-
-    public void setDmaapTopicName(String dmaapTopicName) {
-        this.dmaapTopicName = dmaapTopicName;
-    }
-
-    public void setConsumerId(String consumerId) {
-        this.consumerId = consumerId;
-    }
-
-    public void setConsumerGroup(String consumerGroup) {
-        this.consumerGroup = consumerGroup;
-    }
-
-    public void setDmaapClientDelayInSeconds(int dmaapClientDelayInSeconds) {
-        this.dmaapClientDelayInSeconds = dmaapClientDelayInSeconds;
-    }
-
     class DmaapTopicListenerThread implements Runnable {
 
         @Override
         public void run() {
             try {
+                logger.debug("dmaap listener starts listening pnf ready dmaap topic");
                 HttpResponse response = httpClient.execute(getRequest);
-                getCorrelationIdListFromResponse(response).forEach(this::informAboutPnfReadyIfCorrelationIdFound);
+                getPnfCorrelationIdListFromResponse(response).forEach(this::informAboutPnfReadyIfPnfCorrelationIdFound);
             } catch (IOException e) {
-                LOGGER.error("Exception caught during sending rest request to dmaap for listening event topic", e);
+                logger.error("Exception caught during sending rest request to dmaap for listening event topic", e);
+            }
+            finally {
+                getRequest.reset();
             }
         }
 
-        private List<String> getCorrelationIdListFromResponse(HttpResponse response) throws IOException {
+        private List<String> getPnfCorrelationIdListFromResponse(HttpResponse response) throws IOException {
             if (response.getStatusLine().getStatusCode() == 200) {
                 String responseString = EntityUtils.toString(response.getEntity(), "UTF-8");
                 if (responseString != null) {
-                    return JsonUtilForCorrelationId.parseJsonToGelAllCorrelationId(responseString);
+                    return JsonUtilForPnfCorrelationId.parseJsonToGelAllPnfCorrelationId(responseString);
                 }
             }
             return Collections.emptyList();
         }
 
-        private void informAboutPnfReadyIfCorrelationIdFound(String correlationId) {
-            Runnable runnable = unregister(correlationId);
+        private void informAboutPnfReadyIfPnfCorrelationIdFound(String pnfCorrelationId) {
+            Runnable runnable = unregister(pnfCorrelationId);
             if (runnable != null) {
-                LOGGER.debug("pnf ready event got from dmaap for correlationId: " + correlationId);
+                logger.debug("dmaap listener gets pnf ready event for pnfCorrelationId: {}", pnfCorrelationId);
                 runnable.run();
             }
         }