restoring the pnf dmaap client functionality to the proper working version 61/98861/2
authorLukasz Muszkieta <lukasz.muszkieta@nokia.com>
Tue, 26 Nov 2019 16:08:00 +0000 (17:08 +0100)
committerLukasz Muszkieta <lukasz.muszkieta@nokia.com>
Tue, 26 Nov 2019 17:31:11 +0000 (18:31 +0100)
Issue-ID: SO-2339
Change-Id: I2a12517fd7b37d3260058be6c5c27865e207b861
Signed-off-by: Lukasz Muszkieta <lukasz.muszkieta@nokia.com>
bpmn/mso-infrastructure-bpmn/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/DmaapClientTestImpl.java
bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/delegate/InformDmaapClient.java
bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/DmaapClient.java
bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/PnfEventReadyDmaapClient.java
bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/CancelDmaapSubscriptionTest.java
bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/DmaapClientTestImpl.java
bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/InformDmaapClientTest.java

index 0783178..43fbc59 100644 (file)
@@ -34,7 +34,7 @@ public class DmaapClientTestImpl implements DmaapClient {
     private Runnable informConsumer;
 
     @Override
-    public void registerForUpdate(String pnfCorrelationId, Runnable informConsumer, Map<String, String> updateInfo) {
+    public void registerForUpdate(String pnfCorrelationId, Runnable informConsumer) {
         this.pnfCorrelationId = pnfCorrelationId;
         this.informConsumer = informConsumer;
     }
index a55f32a..5cbd530 100644 (file)
@@ -3,7 +3,6 @@
  * ONAP - SO
  * ================================================================================
  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
- * Copyright (C) 2019 Nokia.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
 
 package org.onap.so.bpmn.infrastructure.pnf.delegate;
 
-import java.util.Map;
 import org.camunda.bpm.engine.RuntimeService;
 import org.camunda.bpm.engine.delegate.DelegateExecution;
 import org.camunda.bpm.engine.delegate.JavaDelegate;
-import org.onap.so.bpmn.common.recipe.ResourceInput;
-import org.onap.so.bpmn.common.resource.ResourceRequestBuilder;
 import org.onap.so.bpmn.infrastructure.pnf.dmaap.DmaapClient;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
-import java.util.HashMap;
-import java.util.Optional;
 
 @Component
 public class InformDmaapClient implements JavaDelegate {
 
-    private static final Logger LOGGER = LoggerFactory.getLogger(InformDmaapClient.class);
     private DmaapClient dmaapClient;
 
     @Override
     public void execute(DelegateExecution execution) {
         String pnfCorrelationId = (String) execution.getVariable(ExecutionVariableNames.PNF_CORRELATION_ID);
         RuntimeService runtimeService = execution.getProcessEngineServices().getRuntimeService();
-        String processBusinessKey = execution.getProcessBusinessKey();
-        dmaapClient.registerForUpdate(pnfCorrelationId,
-                () -> runtimeService.createMessageCorrelation("WorkflowMessage")
-                        .processInstanceBusinessKey(processBusinessKey).correlateWithResult(),
-                createUpdateInfoMap(execution));
-    }
-
-    private Map<String, String> createUpdateInfoMap(DelegateExecution execution) {
-        Map<String, String> updateInfoMap = new HashMap<>();
-        updateInfoMap.put("pnfCorrelationId",
-                (String) execution.getVariable(ExecutionVariableNames.PNF_CORRELATION_ID));
-        getResourceInput(execution).ifPresent(resourceInput -> {
-            updateInfoMap.put("globalSubscriberID", resourceInput.getGlobalSubscriberId());
-            updateInfoMap.put("serviceType", resourceInput.getServiceType());
-            updateInfoMap.put("serviceInstanceId", resourceInput.getServiceInstanceId());
-        });
-        return updateInfoMap;
-    }
-
-    private Optional<ResourceInput> getResourceInput(DelegateExecution execution) {
-        ResourceInput resourceInput = null;
-        if (execution.getVariable("resourceInput") != null) {
-            resourceInput = ResourceRequestBuilder.getJsonObject((String) execution.getVariable("resourceInput"),
-                    ResourceInput.class);
-        } else {
-            LOGGER.warn("resourceInput value is null for correlation id: {}",
-                    execution.getVariable(ExecutionVariableNames.PNF_CORRELATION_ID));
-        }
-        return Optional.ofNullable(resourceInput);
+        dmaapClient.registerForUpdate(pnfCorrelationId, () -> runtimeService.createMessageCorrelation("WorkflowMessage")
+                .processInstanceBusinessKey(execution.getProcessBusinessKey()).correlateWithResult());
     }
 
     @Autowired
index bafb749..fd7eb15 100644 (file)
 
 package org.onap.so.bpmn.infrastructure.pnf.dmaap;
 
-import java.util.Map;
-
 public interface DmaapClient {
 
-    void registerForUpdate(String pnfCorrelationId, Runnable informConsumer, Map<String, String> updateInfo);
+    void registerForUpdate(String pnfCorrelationId, Runnable informConsumer);
 
     Runnable unregister(String pnfCorrelationId);
 }
index bd1a45c..a2c73ca 100644 (file)
 package org.onap.so.bpmn.infrastructure.pnf.dmaap;
 
 import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import java.util.*;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import javax.ws.rs.core.UriBuilder;
 import org.apache.http.HttpResponse;
-import org.apache.http.HttpStatus;
 import org.apache.http.client.HttpClient;
 import org.apache.http.client.methods.HttpGet;
 import org.apache.http.impl.client.HttpClientBuilder;
@@ -40,11 +40,6 @@ import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.core.env.Environment;
 import org.springframework.stereotype.Component;
-import org.onap.so.client.aai.entities.uri.AAIResourceUri;
-import org.onap.so.client.aai.entities.uri.AAIUriFactory;
-import org.onap.so.client.aai.AAIResourcesClient;
-import org.onap.so.client.aai.AAIObjectType;
-import static org.onap.so.bpmn.infrastructure.pnf.dmaap.JsonUtilForPnfCorrelationId.*;
 
 @Component
 public class PnfEventReadyDmaapClient implements DmaapClient {
@@ -57,7 +52,6 @@ public class PnfEventReadyDmaapClient implements DmaapClient {
     private int topicListenerDelayInSeconds;
     private volatile ScheduledThreadPoolExecutor executor;
     private volatile boolean dmaapThreadListenerIsRunning;
-    private volatile List<Map<String, String>> listOfUpdateInfoMap;
 
     @Autowired
     public PnfEventReadyDmaapClient(Environment env) {
@@ -70,16 +64,11 @@ public class PnfEventReadyDmaapClient implements DmaapClient {
                 .port(env.getProperty("pnf.dmaap.port", Integer.class)).path(env.getProperty("pnf.dmaap.topicName"))
                 .path(env.getProperty("pnf.dmaap.consumerGroup")).path(env.getProperty("pnf.dmaap.consumerId"))
                 .build());
-        listOfUpdateInfoMap = new ArrayList<>();
     }
 
     @Override
-    public synchronized void registerForUpdate(String pnfCorrelationId, Runnable informConsumer,
-            Map<String, String> updateInfo) {
+    public synchronized void registerForUpdate(String pnfCorrelationId, Runnable informConsumer) {
         logger.debug("registering for pnf ready dmaap event for pnf correlation id: {}", pnfCorrelationId);
-        synchronized (listOfUpdateInfoMap) {
-            listOfUpdateInfoMap.add(updateInfo);
-        }
         pnfCorrelationIdToThreadMap.put(pnfCorrelationId, informConsumer);
         if (!dmaapThreadListenerIsRunning) {
             startDmaapThreadListener();
@@ -90,16 +79,6 @@ public class PnfEventReadyDmaapClient implements DmaapClient {
     public synchronized Runnable unregister(String pnfCorrelationId) {
         logger.debug("unregistering from pnf ready dmaap event for pnf correlation id: {}", pnfCorrelationId);
         Runnable runnable = pnfCorrelationIdToThreadMap.remove(pnfCorrelationId);
-        synchronized (listOfUpdateInfoMap) {
-            for (int i = listOfUpdateInfoMap.size() - 1; i >= 0; i--) {
-                if (!listOfUpdateInfoMap.get(i).containsKey("pnfCorrelationId"))
-                    continue;
-                String id = listOfUpdateInfoMap.get(i).get("pnfCorrelationId");
-                if (id != pnfCorrelationId)
-                    continue;
-                listOfUpdateInfoMap.remove(i);
-            }
-        }
         if (pnfCorrelationIdToThreadMap.isEmpty()) {
             stopDmaapThreadListener();
         }
@@ -132,12 +111,7 @@ public class PnfEventReadyDmaapClient implements DmaapClient {
             try {
                 logger.debug("dmaap listener starts listening pnf ready dmaap topic");
                 HttpResponse response = httpClient.execute(getRequest);
-                if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
-                    String responseString = EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8);
-                    List<String> idList = parseJsonToGelAllPnfCorrelationId(responseString);
-                    idList.stream().findFirst().ifPresent(id -> registerClientResponse(id, responseString));
-                    idList.forEach(this::informAboutPnfReadyIfPnfCorrelationIdFound);
-                }
+                getPnfCorrelationIdListFromResponse(response).forEach(this::informAboutPnfReadyIfPnfCorrelationIdFound);
             } catch (IOException e) {
                 logger.error("Exception caught during sending rest request to dmaap for listening event topic", e);
             } finally {
@@ -145,6 +119,16 @@ public class PnfEventReadyDmaapClient implements DmaapClient {
             }
         }
 
+        private List<String> getPnfCorrelationIdListFromResponse(HttpResponse response) throws IOException {
+            if (response.getStatusLine().getStatusCode() == 200) {
+                String responseString = EntityUtils.toString(response.getEntity(), "UTF-8");
+                if (responseString != null) {
+                    return JsonUtilForPnfCorrelationId.parseJsonToGelAllPnfCorrelationId(responseString);
+                }
+            }
+            return Collections.emptyList();
+        }
+
         private void informAboutPnfReadyIfPnfCorrelationIdFound(String pnfCorrelationId) {
             Runnable runnable = unregister(pnfCorrelationId);
             if (runnable != null) {
@@ -152,36 +136,6 @@ public class PnfEventReadyDmaapClient implements DmaapClient {
                 runnable.run();
             }
         }
-
-        private void registerClientResponse(String pnfCorrelationId, String response) {
-
-            String customerId = null;
-            String serviceType = null;
-            String serId = null;
-            synchronized (listOfUpdateInfoMap) {
-                for (Map<String, String> map : listOfUpdateInfoMap) {
-                    if (!map.containsKey("pnfCorrelationId"))
-                        continue;
-                    if (pnfCorrelationId != map.get("pnfCorrelationId"))
-                        continue;
-                    if (!map.containsKey("globalSubscriberID"))
-                        continue;
-                    if (!map.containsKey("serviceType"))
-                        continue;
-                    if (!map.containsKey("serviceInstanceId"))
-                        continue;
-                    customerId = map.get("pnfCorrelationId");
-                    serviceType = map.get("serviceType");
-                    serId = map.get("serviceInstanceId");
-                }
-            }
-            if (customerId == null || serviceType == null || serId == null)
-                return;
-            AAIResourcesClient client = new AAIResourcesClient();
-            AAIResourceUri uri = AAIUriFactory.createResourceUri(AAIObjectType.SERVICE_INSTANCE_METADATA, customerId,
-                    serviceType, serId);
-            client.update(uri, response);
-        }
-
     }
+
 }
index bfaf9cf..c2e87d5 100644 (file)
@@ -32,7 +32,7 @@ public class CancelDmaapSubscriptionTest {
     private static final String TEST_PNF_CORRELATION_ID = "testPnfCorrelationId";
 
     @Test
-    public void shouldCancelSubscription() throws Exception {
+    public void shouldCancelSubscription() {
         // given
         CancelDmaapSubscription delegate = new CancelDmaapSubscription();
         DmaapClientTestImpl dmaapClientTest = new DmaapClientTestImpl();
@@ -42,7 +42,7 @@ public class CancelDmaapSubscriptionTest {
                 .thenReturn(TEST_PNF_CORRELATION_ID);
         when(delegateExecution.getProcessBusinessKey()).thenReturn("testBusinessKey");
         dmaapClientTest.registerForUpdate("testPnfCorrelationId", () -> {
-        }, null);
+        });
         // when
         delegate.execute(delegateExecution);
         // then
index 598582b..0ec0ac8 100644 (file)
@@ -21,9 +21,8 @@
 
 package org.onap.so.bpmn.infrastructure.pnf.delegate;
 
-import java.util.Map;
-import org.onap.so.bpmn.infrastructure.pnf.dmaap.DmaapClient;
 import java.util.Objects;
+import org.onap.so.bpmn.infrastructure.pnf.dmaap.DmaapClient;
 
 public class DmaapClientTestImpl implements DmaapClient {
 
@@ -31,7 +30,7 @@ public class DmaapClientTestImpl implements DmaapClient {
     private Runnable informConsumer;
 
     @Override
-    public void registerForUpdate(String pnfCorrelationId, Runnable informConsumer, Map<String, String> updateInfo) {
+    public void registerForUpdate(String pnfCorrelationId, Runnable informConsumer) {
         this.pnfCorrelationId = pnfCorrelationId;
         this.informConsumer = informConsumer;
     }
@@ -47,19 +46,15 @@ public class DmaapClientTestImpl implements DmaapClient {
         return null;
     }
 
-    public String getPnfCorrelationId() {
+    String getPnfCorrelationId() {
         return pnfCorrelationId;
     }
 
-    public Runnable getInformConsumer() {
+    Runnable getInformConsumer() {
         return informConsumer;
     }
 
-    public void sendMessage() {
-        informConsumer.run();
-    }
-
-    public boolean haveRegisteredConsumer() {
+    boolean haveRegisteredConsumer() {
         return pnfCorrelationId != null;
     }
 }
index 93a71b3..94aa142 100644 (file)
@@ -34,8 +34,6 @@ import org.camunda.bpm.engine.runtime.MessageCorrelationBuilder;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.InOrder;
-import org.onap.so.bpmn.common.recipe.ResourceInput;
-import org.onap.so.bpmn.common.resource.ResourceRequestBuilder;
 
 public class InformDmaapClientTest {
     @Before
@@ -76,41 +74,11 @@ public class InformDmaapClientTest {
         inOrder.verify(messageCorrelationBuilder).correlateWithResult();
     }
 
-    private ResourceInput getUpdateResInputObj(String modelName) {
-
-        String resourceInput = "{\n" + "\t\"resourceInstanceName\": \"SotnFc-wan-connection_wanconnection-37\",\n"
-                + "\t\"resourceInstanceDes\": null,\n" + "\t\"globalSubscriberId\": \"sdwandemo\",\n"
-                + "\t\"serviceType\": \"CCVPN\",\n" + "\t\"operationId\": \"df3387b5-4fbf-41bd-82a0-13a955ac178a\",\n"
-                + "\t\"serviceModelInfo\": {\n" + "\t\t\"modelName\": \"WanConnectionSvc03\",\n"
-                + "\t\t\"modelUuid\": \"198b066c-0771-4157-9594-1824adfdda7e\",\n"
-                + "\t\t\"modelInvariantUuid\": \"43fb5165-7d03-4009-8951-a8f45d3f0148\",\n"
-                + "\t\t\"modelVersion\": \"1.0\",\n" + "\t\t\"modelCustomizationUuid\": \"\",\n"
-                + "\t\t\"modelCustomizationName\": \"\",\n" + "\t\t\"modelInstanceName\": \"\",\n"
-                + "\t\t\"modelType\": \"\"\n" + "\t},\n" + "\t\"resourceModelInfo\": {\n" + "\t\t\"modelName\": \""
-                + modelName + "\",\n" + "\t\t\"modelUuid\": \"6a0bf88b-343c-415b-88c1-6f73702452c4\",\n"
-                + "\t\t\"modelInvariantUuid\": \"50bc3415-2e01-4e50-a9e1-ec9584599bb3\",\n"
-                + "\t\t\"modelCustomizationUuid\": \"b205d620-84bd-4058-afa0-e3aeee8bb712\",\n"
-                + "\t\t\"modelCustomizationName\": \"\",\n"
-                + "\t\t\"modelInstanceName\": \"SotnFc-wan-connection 0\",\n" + "\t\t\"modelType\": \"\"\n" + "\t},\n"
-                + "\t\"resourceInstancenUuid\": null,\n"
-                + "\t\"resourceParameters\": \"{\\n\\\"locationConstraints\\\":[],\\n\\\"requestInputs\\\":{\\\"sotnfcspecwanconnection0_route-objective-function\\\":null,\\\"sotnfcspecwanconnection0_colorAware\\\":null,\\\"3rdctlspecwanconnection0_thirdPartyAdaptorRpc\\\":null,\\\"sotnfcspecwanconnection0_couplingFlag\\\":null,\\\"sotnfcspecwanconnection0_pbs\\\":null,\\\"3rdctlspecwanconnection0_thirdPartySdncId\\\":null,\\\"sotnfcspecwanconnection0_cbs\\\":null,\\\"3rdctlspecwanconnection0_thirdpartySdncName\\\":null,\\\"sotnfcspecwanconnection0_total-size\\\":null,\\\"3rdctlspecwanconnection0_templateFileName\\\":\\\"sotn_create_zte_template.json\\\",\\\"fcwanconnection0_type\\\":null,\\\"sotnfcspecwanconnection0_cir\\\":null,\\\"fcwanconnection0_uuid\\\":null,\\\"sotnfcspecwanconnection0_diversity-policy\\\":null,\\\"nf_naming\\\":true,\\\"multi_stage_design\\\":false,\\\"availability_zone_max_count\\\":1,\\\"3rdctlspecwanconnection0_restapiUrl\\\":\\\"http://10.80.80.21:8443/restconf/operations/ZTE-API-ConnectivityService:create-connectivity-service\\\",\\\"max_instances\\\":null,\\\"sotnfcspecwanconnection0_reroute\\\":null,\\\"fcwanconnection0_name\\\":null,\\\"sotnfcspecwanconnection0_dualLink\\\":null,\\\"min_instances\\\":null,\\\"sotnfcspecwanconnection0_pir\\\":null,\\\"sotnfcspecwanconnection0_service-type\\\":null}\\n}\",\n"
-                + "\t\"operationType\": \"createInstance\",\n"
-                + "\t\"serviceInstanceId\": \"ffa07ae4-f820-45af-9439-1416b3bc1d39\",\n"
-                + "\t\"requestsInputs\": \"{\\r\\n\\t\\\"service\\\": {\\r\\n\\t\\t\\\"name\\\": \\\"wanconnection-37\\\",\\r\\n\\t\\t\\\"description\\\": \\\"deafe\\\",\\r\\n\\t\\t\\\"serviceInvariantUuid\\\": \\\"43fb5165-7d03-4009-8951-a8f45d3f0148\\\",\\r\\n\\t\\t\\\"serviceUuid\\\": \\\"198b066c-0771-4157-9594-1824adfdda7e\\\",\\r\\n\\t\\t\\\"globalSubscriberId\\\": \\\"sdwandemo\\\",\\r\\n\\t\\t\\\"serviceType\\\": \\\"CCVPN\\\",\\r\\n\\t\\t\\\"parameters\\\": {\\r\\n\\t\\t\\t\\\"resources\\\": [\\r\\n\\t\\t\\t],\\r\\n\\t\\t\\t\\\"requestInputs\\\": {\\r\\n\\t\\t\\t\\t\\\"sotnfcwanconnection0_3rdctlspecwanconnection0_restapiUrl\\\": \\\"http://10.80.80.21:8443/restconf/operations/ZTE-API-ConnectivityService:create-connectivity-service\\\",\\r\\n\\t\\t\\t\\t\\\"sotnfcwanconnection0_3rdctlspecwanconnection0_templateFileName\\\": \\\"sotn_create_zte_template.json\\\",\\r\\n\\t\\t\\t\\t\\\"sdwanfcwanconnection0_3rdctlspecwanconnection0_restapiUrl\\\": \\\"http://10.80.80.21:8443/restconf/operations/ZTE-API-ConnectivityService:create-connectivity-service\\\",\\r\\n\\t\\t\\t\\t\\\"sdwanfcwanconnection0_3rdctlspecwanconnection0_templateFileName\\\": \\\"sdwan_create_zte_template.json\\\",\\\"ont_ont_manufacturer\\\":\\\"huawei\\\",\\\"ont_ont_serial_num\\\":\\\"123\\\"\\r\\n\\t\\t\\t}\\r\\n\\t\\t}\\r\\n\\t}\\r\\n}\"\n"
-                + "}";
-
-        ResourceInput resourceInputObj = ResourceRequestBuilder.getJsonObject(resourceInput, ResourceInput.class);
-        return resourceInputObj;
-    }
-
     private DelegateExecution mockDelegateExecution() {
-        ResourceInput input = getUpdateResInputObj("OLT");
         DelegateExecution delegateExecution = mock(DelegateExecution.class);
-
         when(delegateExecution.getVariable(eq(ExecutionVariableNames.PNF_CORRELATION_ID)))
                 .thenReturn("testPnfCorrelationId");
         when(delegateExecution.getProcessBusinessKey()).thenReturn("testBusinessKey");
-        when(delegateExecution.getVariable("resourceInput")).thenReturn(input.toString());
         ProcessEngineServices processEngineServices = mock(ProcessEngineServices.class);
         when(delegateExecution.getProcessEngineServices()).thenReturn(processEngineServices);
         RuntimeService runtimeService = mock(RuntimeService.class);
@@ -118,7 +86,6 @@ public class InformDmaapClientTest {
         messageCorrelationBuilder = mock(MessageCorrelationBuilder.class);
         when(runtimeService.createMessageCorrelation(any())).thenReturn(messageCorrelationBuilder);
         when(messageCorrelationBuilder.processInstanceBusinessKey(any())).thenReturn(messageCorrelationBuilder);
-
         return delegateExecution;
     }
 }