Add or Delete a PNF to an Active Service
[so.git] / bpmn / so-bpmn-infrastructure-common / src / main / java / org / onap / so / bpmn / infrastructure / pnf / dmaap / PnfEventReadyDmaapClient.java
index 1b7a69e..f215d49 100644 (file)
@@ -19,7 +19,6 @@
  * limitations under the License.
  * ============LICENSE_END=========================================================
  */
-
 package org.onap.so.bpmn.infrastructure.pnf.dmaap;
 
 import java.io.IOException;
@@ -43,31 +42,36 @@ import org.springframework.stereotype.Component;
 
 @Component
 public class PnfEventReadyDmaapClient implements DmaapClient {
-
     private static final Logger logger = LoggerFactory.getLogger(PnfEventReadyDmaapClient.class);
-
     private HttpClient httpClient;
     private Map<String, Runnable> pnfCorrelationIdToThreadMap;
-    private HttpGet getRequest;
+    private HttpGet getRequestForpnfReady;
+    private HttpGet getRequestForPnfUpdate;
     private int topicListenerDelayInSeconds;
     private volatile ScheduledThreadPoolExecutor executor;
     private volatile boolean dmaapThreadListenerIsRunning;
 
+
+
     @Autowired
     public PnfEventReadyDmaapClient(Environment env) {
         httpClient = HttpClientBuilder.create().build();
         pnfCorrelationIdToThreadMap = new ConcurrentHashMap<>();
         topicListenerDelayInSeconds = env.getProperty("pnf.dmaap.topicListenerDelayInSeconds", Integer.class);
         executor = null;
-        getRequest = new HttpGet(UriBuilder.fromUri(env.getProperty("pnf.dmaap.uriPathPrefix"))
-                .scheme(env.getProperty("pnf.dmaap.protocol"))
-                .host(env.getProperty("pnf.dmaap.host"))
+        getRequestForpnfReady = new HttpGet(UriBuilder.fromUri(env.getProperty("pnf.dmaap.uriPathPrefix"))
+                .scheme(env.getProperty("pnf.dmaap.protocol")).host(env.getProperty("pnf.dmaap.host"))
                 .port(env.getProperty("pnf.dmaap.port", Integer.class))
-                .path(env.getProperty("pnf.dmaap.topicName"))
-                .path(env.getProperty("pnf.dmaap.consumerGroup"))
+                .path(env.getProperty("pnf.dmaap.pnfReadyTopicName")).path(env.getProperty("pnf.dmaap.consumerGroup"))
                 .path(env.getProperty("pnf.dmaap.consumerId")).build());
+        getRequestForPnfUpdate = new HttpGet(UriBuilder.fromUri(env.getProperty("pnf.dmaap.uriPathPrefix"))
+                .scheme(env.getProperty("pnf.dmaap.protocol")).host(env.getProperty("pnf.dmaap.host"))
+                .port(env.getProperty("pnf.dmaap.port", Integer.class))
+                .path(env.getProperty("pnf.dmaap.pnfUpdateTopicName")).path(env.getProperty("pnf.dmaap.consumerGroup"))
+                .path(env.getProperty("pnf.dmaap.consumerIdUpdate")).build());
     }
 
+
     @Override
     public synchronized void registerForUpdate(String pnfCorrelationId, Runnable informConsumer) {
         logger.debug("registering for pnf ready dmaap event for pnf correlation id: {}", pnfCorrelationId);
@@ -92,8 +96,8 @@ public class PnfEventReadyDmaapClient implements DmaapClient {
             executor = new ScheduledThreadPoolExecutor(1);
             executor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
             executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
-            executor.scheduleWithFixedDelay(new DmaapTopicListenerThread(), 0,
-                    topicListenerDelayInSeconds, TimeUnit.SECONDS);
+            executor.scheduleWithFixedDelay(new DmaapTopicListenerThread(), 0, topicListenerDelayInSeconds,
+                    TimeUnit.SECONDS);
             dmaapThreadListenerIsRunning = true;
         }
     }
@@ -107,18 +111,24 @@ public class PnfEventReadyDmaapClient implements DmaapClient {
     }
 
     class DmaapTopicListenerThread implements Runnable {
-
         @Override
         public void run() {
             try {
-                logger.debug("dmaap listener starts listening pnf ready dmaap topic");
-                HttpResponse response = httpClient.execute(getRequest);
-                getPnfCorrelationIdListFromResponse(response).forEach(this::informAboutPnfReadyIfPnfCorrelationIdFound);
+                HttpResponse response;
+                response = httpClient.execute(getRequestForPnfUpdate);
+                List<String> pnfUpdateResponse = getPnfCorrelationIdListFromResponse(response);
+                if (pnfUpdateResponse.isEmpty()) {
+                    response = httpClient.execute(getRequestForpnfReady);
+                    getPnfCorrelationIdListFromResponse(response)
+                            .forEach(this::informAboutPnfReadyIfPnfCorrelationIdFound);
+                } else {
+                    pnfUpdateResponse.forEach(this::informAboutPnfReadyIfPnfCorrelationIdFound);
+                }
             } catch (IOException e) {
                 logger.error("Exception caught during sending rest request to dmaap for listening event topic", e);
-            }
-            finally {
-                getRequest.reset();
+            } finally {
+                getRequestForpnfReady.reset();
+                getRequestForPnfUpdate.reset();
             }
         }
 
@@ -140,5 +150,5 @@ public class PnfEventReadyDmaapClient implements DmaapClient {
             }
         }
     }
-
 }
+