Add or Delete a PNF to an Active Service
[so.git] / bpmn / so-bpmn-infrastructure-common / src / main / java / org / onap / so / bpmn / infrastructure / pnf / dmaap / PnfEventReadyDmaapClient.java
index 02303a6..1a25388 100644 (file)
  * limitations under the License.
  * ============LICENSE_END=========================================================
  */
-
 package org.onap.so.bpmn.infrastructure.pnf.dmaap;
 
 import java.io.IOException;
-import java.util.*;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -38,23 +39,18 @@ import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.core.env.Environment;
 import org.springframework.stereotype.Component;
-import org.onap.so.client.aai.entities.uri.AAIResourceUri;
-import org.onap.so.client.aai.entities.uri.AAIUriFactory;
-import org.onap.so.client.aai.AAIResourcesClient;
-import org.onap.so.client.aai.AAIObjectType;
 
 @Component
 public class PnfEventReadyDmaapClient implements DmaapClient {
-
     private static final Logger logger = LoggerFactory.getLogger(PnfEventReadyDmaapClient.class);
-
     private HttpClient httpClient;
     private Map<String, Runnable> pnfCorrelationIdToThreadMap;
-    private HttpGet getRequest;
+    private HttpGet getRequestForpnfReady;
+    private HttpGet getRequestForPnfUpdate;
     private int topicListenerDelayInSeconds;
     private volatile ScheduledThreadPoolExecutor executor;
     private volatile boolean dmaapThreadListenerIsRunning;
-    private volatile List<Map<String, String>> listOfUpdateInfoMap;
+    private String topicName;
 
     @Autowired
     public PnfEventReadyDmaapClient(Environment env) {
@@ -62,21 +58,34 @@ public class PnfEventReadyDmaapClient implements DmaapClient {
         pnfCorrelationIdToThreadMap = new ConcurrentHashMap<>();
         topicListenerDelayInSeconds = env.getProperty("pnf.dmaap.topicListenerDelayInSeconds", Integer.class);
         executor = null;
-        getRequest = new HttpGet(UriBuilder.fromUri(env.getProperty("pnf.dmaap.uriPathPrefix"))
+        topicName = env.getProperty("pnf.dmaap.topicName");
+        String[] topic = topicName.split("\\s");
+        String pnf_ready = null;
+        String pnf_update = null;
+        for (String t : topic) {
+            if (t.matches("(.*)PNF_READY(.*)")) {
+                pnf_ready = t;
+            } else if (t.matches("(.*)PNF_UPDATE(.*)")) {
+                pnf_update = t;
+            } else {
+                return;
+            }
+        }
+        getRequestForpnfReady = new HttpGet(UriBuilder.fromUri(env.getProperty("pnf.dmaap.uriPathPrefix"))
                 .scheme(env.getProperty("pnf.dmaap.protocol")).host(env.getProperty("pnf.dmaap.host"))
-                .port(env.getProperty("pnf.dmaap.port", Integer.class)).path(env.getProperty("pnf.dmaap.topicName"))
+                .port(env.getProperty("pnf.dmaap.port", Integer.class)).path(pnf_ready)
+                .path(env.getProperty("pnf.dmaap.consumerGroup")).path(env.getProperty("pnf.dmaap.consumerId"))
+                .build());
+        getRequestForPnfUpdate = new HttpGet(UriBuilder.fromUri(env.getProperty("pnf.dmaap.uriPathPrefix"))
+                .scheme(env.getProperty("pnf.dmaap.protocol")).host(env.getProperty("pnf.dmaap.host"))
+                .port(env.getProperty("pnf.dmaap.port", Integer.class)).path(pnf_update)
                 .path(env.getProperty("pnf.dmaap.consumerGroup")).path(env.getProperty("pnf.dmaap.consumerId"))
                 .build());
-        listOfUpdateInfoMap = new ArrayList<>();
     }
 
     @Override
-    public synchronized void registerForUpdate(String pnfCorrelationId, Runnable informConsumer,
-            Map<String, String> updateInfo) {
+    public synchronized void registerForUpdate(String pnfCorrelationId, Runnable informConsumer) {
         logger.debug("registering for pnf ready dmaap event for pnf correlation id: {}", pnfCorrelationId);
-        synchronized (listOfUpdateInfoMap) {
-            listOfUpdateInfoMap.add(updateInfo);
-        }
         pnfCorrelationIdToThreadMap.put(pnfCorrelationId, informConsumer);
         if (!dmaapThreadListenerIsRunning) {
             startDmaapThreadListener();
@@ -87,16 +96,6 @@ public class PnfEventReadyDmaapClient implements DmaapClient {
     public synchronized Runnable unregister(String pnfCorrelationId) {
         logger.debug("unregistering from pnf ready dmaap event for pnf correlation id: {}", pnfCorrelationId);
         Runnable runnable = pnfCorrelationIdToThreadMap.remove(pnfCorrelationId);
-        synchronized (listOfUpdateInfoMap) {
-            for (int i = listOfUpdateInfoMap.size() - 1; i >= 0; i--) {
-                if (!listOfUpdateInfoMap.get(i).containsKey("pnfCorrelationId"))
-                    continue;
-                String id = listOfUpdateInfoMap.get(i).get("pnfCorrelationId");
-                if (id != pnfCorrelationId)
-                    continue;
-                listOfUpdateInfoMap.remove(i);
-            }
-        }
         if (pnfCorrelationIdToThreadMap.isEmpty()) {
             stopDmaapThreadListener();
         }
@@ -123,27 +122,24 @@ public class PnfEventReadyDmaapClient implements DmaapClient {
     }
 
     class DmaapTopicListenerThread implements Runnable {
-
         @Override
         public void run() {
             try {
-                logger.debug("dmaap listener starts listening pnf ready dmaap topic");
-                HttpResponse response = httpClient.execute(getRequest);
-                List<String> idList = getPnfCorrelationIdListFromResponse(response);
-
-                // idList is never null
-                if (!idList.isEmpty()) {
-                    // send only body of response
-                    registerClientResponse(idList.get(0), EntityUtils.toString(response.getEntity(), "UTF-8"));
-                }
-
-                if (idList != null) {
-                    idList.forEach(this::informAboutPnfReadyIfPnfCorrelationIdFound);
+                HttpResponse response;
+                response = httpClient.execute(getRequestForPnfUpdate);
+                List<String> pnfUpdateResponse = getPnfCorrelationIdListFromResponse(response);
+                if (pnfUpdateResponse.isEmpty()) {
+                    response = httpClient.execute(getRequestForpnfReady);
+                    getPnfCorrelationIdListFromResponse(response)
+                            .forEach(this::informAboutPnfReadyIfPnfCorrelationIdFound);
+                } else {
+                    pnfUpdateResponse.forEach(this::informAboutPnfReadyIfPnfCorrelationIdFound);
                 }
             } catch (IOException e) {
                 logger.error("Exception caught during sending rest request to dmaap for listening event topic", e);
             } finally {
-                getRequest.reset();
+                getRequestForpnfReady.reset();
+                getRequestForPnfUpdate.reset();
             }
         }
 
@@ -164,36 +160,5 @@ public class PnfEventReadyDmaapClient implements DmaapClient {
                 runnable.run();
             }
         }
-
-        private void registerClientResponse(String pnfCorrelationId, String response) {
-
-            String customerId = null;
-            String serviceType = null;
-            String serId = null;
-            synchronized (listOfUpdateInfoMap) {
-                for (Map<String, String> map : listOfUpdateInfoMap) {
-                    if (!map.containsKey("pnfCorrelationId"))
-                        continue;
-                    if (pnfCorrelationId != map.get("pnfCorrelationId"))
-                        continue;
-                    if (!map.containsKey("globalSubscriberID"))
-                        continue;
-                    if (!map.containsKey("serviceType"))
-                        continue;
-                    if (!map.containsKey("serviceInstanceId"))
-                        continue;
-                    customerId = map.get("pnfCorrelationId");
-                    serviceType = map.get("serviceType");
-                    serId = map.get("serviceInstanceId");
-                }
-            }
-            if (customerId == null || serviceType == null || serId == null)
-                return;
-            AAIResourcesClient client = new AAIResourcesClient();
-            AAIResourceUri uri = AAIUriFactory.createResourceUri(AAIObjectType.SERVICE_INSTANCE_METADATA, customerId,
-                    serviceType, serId);
-            client.update(uri, response);
-        }
-
     }
 }