[SO] Code improvement in bpmn-infra supporting kafka change 61/137161/3 1.13.0
authorsushant53 <sushant.jadhav@t-systems.com>
Thu, 29 Feb 2024 06:21:05 +0000 (11:51 +0530)
committerSushant Jadhav <sushant.jadhav@t-systems.com>
Tue, 5 Mar 2024 09:29:55 +0000 (09:29 +0000)
Code improvement in bpmn-infra supporting kafka change

Issue-ID: SO-4122
Change-Id: I3924418d16f8f6d9270278f1894e224a216d1cf2
Signed-off-by: sushant53 <sushant.jadhav@t-systems.com>
15 files changed:
bpmn/mso-infrastructure-bpmn/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/CreateAndActivatePnfResourceTest.java
bpmn/mso-infrastructure-bpmn/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/KafkaClientTestImpl.java [moved from bpmn/mso-infrastructure-bpmn/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/DmaapClientTestImpl.java with 93% similarity]
bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/delegate/CancelKafkaSubscription.java [moved from bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/delegate/CancelDmaapSubscription.java with 82% similarity]
bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/delegate/InformKafkaClient.java [moved from bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/delegate/InformDmaapClient.java with 84% similarity]
bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/delegate/RegisterForPnfReadyEvent.java
bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/kafka/JsonUtilForPnfCorrelationId.java [moved from bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/JsonUtilForPnfCorrelationId.java with 98% similarity]
bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/kafka/KafkaClient.java [moved from bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/DmaapClient.java with 93% similarity]
bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/kafka/PnfEventReadyKafkaClient.java [moved from bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/PnfEventReadyDmaapClient.java with 85% similarity]
bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/CancelKafkaSubscriptionTest.java [moved from bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/CancelDmaapSubscriptionTest.java with 84% similarity]
bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/InformKafkaClientTest.java [moved from bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/InformDmaapClientTest.java with 81% similarity]
bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/KafkaClientTestImpl.java [moved from bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/DmaapClientTestImpl.java with 94% similarity]
bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/RegisterForPnfReadyEventTest.java
bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/kafka/JsonUtilForPnfCorrelationIdTest.java [moved from bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/JsonUtilForPnfCorrelationIdTest.java with 98% similarity]
bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/kafka/PnfEventReadyKafkaClientTest.java [moved from bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/PnfEventReadyDmaapClientTest.java with 92% similarity]
bpmn/so-bpmn-tasks/src/main/java/org/onap/so/bpmn/infrastructure/workflow/tasks/ebb/loader/PnfEBBLoader.java

index 0c001b1..32ec69e 100644 (file)
@@ -49,7 +49,7 @@ public class CreateAndActivatePnfResourceTest extends BaseIntegrationTest {
     private PnfManagementTestImpl pnfManagementTest;
 
     @Autowired
-    private DmaapClientTestImpl dmaapClientTestImpl;
+    private KafkaClientTestImpl kafkaClientTestImpl;
 
     @Before
     public void setup() {
@@ -60,7 +60,7 @@ public class CreateAndActivatePnfResourceTest extends BaseIntegrationTest {
     }
 
     @Test
-    public void shouldWaitForMessageFromDmaapAndUpdateAaiEntryWhenAaiEntryExists() {
+    public void shouldWaitForMessageFromKafkaAndUpdateAaiEntryWhenAaiEntryExists() {
         // given
         variables.put(PNF_CORRELATION_ID, PnfManagementTestImpl.ID_WITH_ENTRY);
         ResourceInput ri = getUpdateResInputObj("OLT");
@@ -72,19 +72,19 @@ public class CreateAndActivatePnfResourceTest extends BaseIntegrationTest {
         // when
         ProcessInstance instance =
                 runtimeService.startProcessInstanceByKey("CreateAndActivatePnfResource", "businessKey", variables);
-        assertThat(instance).isWaitingAt("WaitForDmaapPnfReadyNotification").isWaitingFor("WorkflowMessage");
-        dmaapClientTestImpl.sendMessage();
+        assertThat(instance).isWaitingAt("WaitForKafkaPnfReadyNotification").isWaitingFor("WorkflowMessage");
+        kafkaClientTestImpl.sendMessage();
 
         // then
         assertThat(instance).isEnded().hasPassedInOrder("CreateAndActivatePnf_StartEvent", "CheckInputs",
-                "CheckAiiForPnfCorrelationId", "DoesAaiContainInfoAboutPnf", "AaiEntryExists", "InformDmaapClient",
-                "WaitForDmaapPnfReadyNotification", "CreateRelationId", "AaiEntryUpdated");
+                "CheckAiiForPnfCorrelationId", "DoesAaiContainInfoAboutPnf", "AaiEntryExists", "InformKafkaClient",
+                "WaitForKafkaPnfReadyNotification", "CreateRelationId", "AaiEntryUpdated");
         Assertions.assertThat(pnfManagementTest.getServiceAndPnfRelationMap())
                 .containsOnly(MapEntry.entry(SERVICE_INSTANCE_ID, PnfManagementTestImpl.ID_WITH_ENTRY));
     }
 
     @Test
-    public void shouldCreateAaiEntryWaitForMessageFromDmaapAndUpdateAaiEntryWhenNoAaiEntryExists() {
+    public void shouldCreateAaiEntryWaitForMessageFromKafkaAndUpdateAaiEntryWhenNoAaiEntryExists() {
         // given
         variables.put(PNF_CORRELATION_ID, PnfManagementTestImpl.ID_WITHOUT_ENTRY);
         ResourceInput ri = getUpdateResInputObj("OLT");
@@ -96,13 +96,13 @@ public class CreateAndActivatePnfResourceTest extends BaseIntegrationTest {
         // when
         ProcessInstance instance =
                 runtimeService.startProcessInstanceByKey("CreateAndActivatePnfResource", "businessKey", variables);
-        assertThat(instance).isWaitingAt("WaitForDmaapPnfReadyNotification").isWaitingFor("WorkflowMessage");
-        dmaapClientTestImpl.sendMessage();
+        assertThat(instance).isWaitingAt("WaitForKafkaPnfReadyNotification").isWaitingFor("WorkflowMessage");
+        kafkaClientTestImpl.sendMessage();
 
         // then
         assertThat(instance).isEnded().hasPassedInOrder("CreateAndActivatePnf_StartEvent", "CheckInputs",
                 "CheckAiiForPnfCorrelationId", "DoesAaiContainInfoAboutPnf", "CreatePnfEntryInAai", "AaiEntryExists",
-                "InformDmaapClient", "WaitForDmaapPnfReadyNotification", "CreateRelationId", "AaiEntryUpdated");
+                "InformKafkaClient", "WaitForKafkaPnfReadyNotification", "CreateRelationId", "AaiEntryUpdated");
         Assertions.assertThat(pnfManagementTest.getCreated()).containsOnlyKeys(PnfManagementTestImpl.ID_WITHOUT_ENTRY);
         Assertions.assertThat(pnfManagementTest.getServiceAndPnfRelationMap())
                 .containsOnly(MapEntry.entry(SERVICE_INSTANCE_ID, PnfManagementTestImpl.ID_WITHOUT_ENTRY));
 
 package org.onap.so.bpmn.infrastructure.pnf.delegate;
 
-import java.util.Map;
 import java.util.Objects;
-import org.onap.so.bpmn.infrastructure.pnf.dmaap.DmaapClient;
+import org.onap.so.bpmn.infrastructure.pnf.kafka.KafkaClient;
 import org.springframework.context.annotation.Primary;
 import org.springframework.stereotype.Component;
 
 @Component
 @Primary
-public class DmaapClientTestImpl implements DmaapClient {
+public class KafkaClientTestImpl implements KafkaClient {
 
     private String pnfCorrelationId;
     private Runnable informConsumer;
@@ -22,23 +22,23 @@ package org.onap.so.bpmn.infrastructure.pnf.delegate;
 
 import org.camunda.bpm.engine.delegate.DelegateExecution;
 import org.camunda.bpm.engine.delegate.JavaDelegate;
-import org.onap.so.bpmn.infrastructure.pnf.dmaap.DmaapClient;
+import org.onap.so.bpmn.infrastructure.pnf.kafka.KafkaClient;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
 @Component
-public class CancelDmaapSubscription implements JavaDelegate {
+public class CancelKafkaSubscription implements JavaDelegate {
 
-    private DmaapClient dmaapClient;
+    private KafkaClient kafkaClient;
 
     @Override
     public void execute(DelegateExecution execution) {
         String pnfCorrelationId = (String) execution.getVariable(ExecutionVariableNames.PNF_CORRELATION_ID);
-        dmaapClient.unregister(pnfCorrelationId);
+        kafkaClient.unregister(pnfCorrelationId);
     }
 
     @Autowired
-    public void setDmaapClient(DmaapClient dmaapClient) {
-        this.dmaapClient = dmaapClient;
+    public void setKafkaClient(KafkaClient kafkaClient) {
+        this.kafkaClient = kafkaClient;
     }
 }
@@ -23,25 +23,25 @@ package org.onap.so.bpmn.infrastructure.pnf.delegate;
 import org.camunda.bpm.engine.RuntimeService;
 import org.camunda.bpm.engine.delegate.DelegateExecution;
 import org.camunda.bpm.engine.delegate.JavaDelegate;
-import org.onap.so.bpmn.infrastructure.pnf.dmaap.DmaapClient;
+import org.onap.so.bpmn.infrastructure.pnf.kafka.KafkaClient;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
 @Component
-public class InformDmaapClient implements JavaDelegate {
+public class InformKafkaClient implements JavaDelegate {
 
-    private DmaapClient dmaapClient;
+    private KafkaClient kafkaClient;
 
     @Override
     public void execute(DelegateExecution execution) {
         String pnfCorrelationId = (String) execution.getVariable(ExecutionVariableNames.PNF_CORRELATION_ID);
         RuntimeService runtimeService = execution.getProcessEngineServices().getRuntimeService();
-        dmaapClient.registerForUpdate(pnfCorrelationId, () -> runtimeService.createMessageCorrelation("WorkflowMessage")
+        kafkaClient.registerForUpdate(pnfCorrelationId, () -> runtimeService.createMessageCorrelation("WorkflowMessage")
                 .processInstanceBusinessKey(execution.getProcessBusinessKey()).correlateWithResult());
     }
 
     @Autowired
-    public void setDmaapClient(DmaapClient dmaapClient) {
-        this.dmaapClient = dmaapClient;
+    public void setKafkaClient(KafkaClient kafkaClient) {
+        this.kafkaClient = kafkaClient;
     }
 }
index 9e1b5d5..ef5da92 100644 (file)
@@ -26,7 +26,7 @@ import org.camunda.bpm.engine.RuntimeService;
 import org.camunda.bpm.engine.delegate.DelegateExecution;
 import org.camunda.bpm.engine.delegate.JavaDelegate;
 import org.onap.so.bpmn.common.BuildingBlockExecution;
-import org.onap.so.bpmn.infrastructure.pnf.dmaap.DmaapClient;
+import org.onap.so.bpmn.infrastructure.pnf.kafka.KafkaClient;
 import org.onap.so.bpmn.servicedecomposition.bbobjects.Pnf;
 import org.onap.so.bpmn.servicedecomposition.entities.ResourceKey;
 import org.onap.so.bpmn.servicedecomposition.tasks.ExtractPojosForBB;
@@ -45,19 +45,19 @@ import org.springframework.stereotype.Component;
 public class RegisterForPnfReadyEvent implements JavaDelegate {
 
     private static final String ERROR_MESSAGE_PNF_NOT_FOUND =
-            "pnf resource not found in buildingBlockExecution while registering to dmaap listener";
+            "pnf resource not found in buildingBlockExecution while registering to kafka listener";
     private static final Logger LOGGER = LoggerFactory.getLogger(RegisterForPnfReadyEvent.class);
 
-    private DmaapClient dmaapClient;
+    private KafkaClient kafkaClient;
     private ExtractPojosForBB extractPojosForBB;
     private ExceptionBuilder exceptionBuilder;
     private String pnfEntryNotificationTimeout;
 
     @Autowired
-    public RegisterForPnfReadyEvent(DmaapClient dmaapClient, ExtractPojosForBB extractPojosForBB,
+    public RegisterForPnfReadyEvent(KafkaClient kafkaClient, ExtractPojosForBB extractPojosForBB,
             ExceptionBuilder exceptionBuilder,
             @Value("${aai.pnfEntryNotificationTimeout}") String pnfEntryNotificationTimeout) {
-        this.dmaapClient = dmaapClient;
+        this.kafkaClient = kafkaClient;
         this.extractPojosForBB = extractPojosForBB;
         this.exceptionBuilder = exceptionBuilder;
         this.pnfEntryNotificationTimeout = pnfEntryNotificationTimeout;
@@ -69,7 +69,7 @@ public class RegisterForPnfReadyEvent implements JavaDelegate {
             String pnfName = getPnfName(execution);
             fillExecution(execution, pnfName);
             RuntimeService runtimeService = execution.getProcessEngineServices().getRuntimeService();
-            dmaapClient.registerForUpdate(pnfName, () -> runtimeService.createMessageCorrelation("WorkflowMessage")
+            kafkaClient.registerForUpdate(pnfName, () -> runtimeService.createMessageCorrelation("WorkflowMessage")
                     .processInstanceId(execution.getProcessInstanceId()).correlateWithResult());
         } catch (BBObjectNotFoundException e) {
             LOGGER.error(ERROR_MESSAGE_PNF_NOT_FOUND);
@@ -19,9 +19,9 @@
  * ============LICENSE_END=========================================================
  */
 
-package org.onap.so.bpmn.infrastructure.pnf.dmaap;
+package org.onap.so.bpmn.infrastructure.pnf.kafka;
 
-public interface DmaapClient {
+public interface KafkaClient {
 
     void registerForUpdate(String pnfCorrelationId, Runnable informConsumer);
 
@@ -19,7 +19,7 @@
  * limitations under the License.
  * ============LICENSE_END=========================================================
  */
-package org.onap.so.bpmn.infrastructure.pnf.dmaap;
+package org.onap.so.bpmn.infrastructure.pnf.kafka;
 
 import java.io.IOException;
 import java.util.Collections;
@@ -36,12 +36,12 @@ import org.springframework.core.env.Environment;
 import org.springframework.stereotype.Component;
 
 @Component
-public class PnfEventReadyDmaapClient implements DmaapClient {
-    private static final Logger logger = LoggerFactory.getLogger(PnfEventReadyDmaapClient.class);
+public class PnfEventReadyKafkaClient implements KafkaClient {
+    private static final Logger logger = LoggerFactory.getLogger(PnfEventReadyKafkaClient.class);
     private Map<String, Runnable> pnfCorrelationIdToThreadMap;
     private int topicListenerDelayInSeconds;
     private volatile ScheduledThreadPoolExecutor executor;
-    private volatile boolean dmaapThreadListenerIsRunning;
+    private volatile boolean kafkaThreadListenerIsRunning;
     private KafkaConsumerImpl consumerForPnfReady;
     private KafkaConsumerImpl consumerForPnfUpdate;
     private String pnfReadyTopic;
@@ -53,9 +53,9 @@ public class PnfEventReadyDmaapClient implements DmaapClient {
 
 
     @Autowired
-    public PnfEventReadyDmaapClient(Environment env) throws IOException {
+    public PnfEventReadyKafkaClient(Environment env) throws IOException {
         pnfCorrelationIdToThreadMap = new ConcurrentHashMap<>();
-        topicListenerDelayInSeconds = env.getProperty("pnf.dmaap.topicListenerDelayInSeconds", Integer.class);
+        topicListenerDelayInSeconds = env.getProperty("pnf.kafka.topicListenerDelayInSeconds", Integer.class);
         executor = null;
         try {
             consumerForPnfReady = new KafkaConsumerImpl(env.getProperty("pnf.kafka.kafkaBootstrapServers"));
@@ -75,8 +75,8 @@ public class PnfEventReadyDmaapClient implements DmaapClient {
     public synchronized void registerForUpdate(String pnfCorrelationId, Runnable informConsumer) {
         logger.debug("registering for pnf ready kafka event for pnf correlation id: {}", pnfCorrelationId);
         pnfCorrelationIdToThreadMap.put(pnfCorrelationId, informConsumer);
-        if (!dmaapThreadListenerIsRunning) {
-            startDmaapThreadListener();
+        if (!kafkaThreadListenerIsRunning) {
+            startKafkaThreadListener();
         }
     }
 
@@ -87,31 +87,31 @@ public class PnfEventReadyDmaapClient implements DmaapClient {
         if (pnfCorrelationIdToThreadMap.isEmpty()) {
             consumerForPnfUpdate.close();
             consumerForPnfReady.close();
-            stopDmaapThreadListener();
+            stopKafkaThreadListener();
         }
         return runnable;
     }
 
-    private synchronized void startDmaapThreadListener() {
-        if (!dmaapThreadListenerIsRunning) {
+    private synchronized void startKafkaThreadListener() {
+        if (!kafkaThreadListenerIsRunning) {
             executor = new ScheduledThreadPoolExecutor(1);
             executor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
             executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
-            executor.scheduleWithFixedDelay(new DmaapTopicListenerThread(), 0, topicListenerDelayInSeconds,
+            executor.scheduleWithFixedDelay(new KafkaTopicListenerThread(), 0, topicListenerDelayInSeconds,
                     TimeUnit.SECONDS);
-            dmaapThreadListenerIsRunning = true;
+            kafkaThreadListenerIsRunning = true;
         }
     }
 
-    private synchronized void stopDmaapThreadListener() {
-        if (dmaapThreadListenerIsRunning) {
+    private synchronized void stopKafkaThreadListener() {
+        if (kafkaThreadListenerIsRunning) {
             executor.shutdown();
-            dmaapThreadListenerIsRunning = false;
+            kafkaThreadListenerIsRunning = false;
             executor = null;
         }
     }
 
-    class DmaapTopicListenerThread implements Runnable {
+    class KafkaTopicListenerThread implements Runnable {
         @Override
         public void run() {
             try {
@@ -141,7 +141,7 @@ public class PnfEventReadyDmaapClient implements DmaapClient {
         private void informAboutPnfReadyIfPnfCorrelationIdFound(String pnfCorrelationId) {
             Runnable runnable = unregister(pnfCorrelationId);
             if (runnable != null) {
-                logger.debug("dmaap listener gets pnf ready event for pnfCorrelationId: {}", pnfCorrelationId);
+                logger.debug("kafka listener gets pnf ready event for pnfCorrelationId: {}", pnfCorrelationId);
                 runnable.run();
             }
         }
@@ -27,25 +27,25 @@ import static org.mockito.Mockito.when;
 import org.camunda.bpm.engine.delegate.DelegateExecution;
 import org.junit.Test;
 
-public class CancelDmaapSubscriptionTest {
+public class CancelKafkaSubscriptionTest {
 
     private static final String TEST_PNF_CORRELATION_ID = "testPnfCorrelationId";
 
     @Test
     public void shouldCancelSubscription() {
         // given
-        CancelDmaapSubscription delegate = new CancelDmaapSubscription();
-        DmaapClientTestImpl dmaapClientTest = new DmaapClientTestImpl();
-        delegate.setDmaapClient(dmaapClientTest);
+        CancelKafkaSubscription delegate = new CancelKafkaSubscription();
+        KafkaClientTestImpl kafkaClientTest = new KafkaClientTestImpl();
+        delegate.setKafkaClient(kafkaClientTest);
         DelegateExecution delegateExecution = mock(DelegateExecution.class);
         when(delegateExecution.getVariable(eq(ExecutionVariableNames.PNF_CORRELATION_ID)))
                 .thenReturn(TEST_PNF_CORRELATION_ID);
         when(delegateExecution.getProcessBusinessKey()).thenReturn("testBusinessKey");
-        dmaapClientTest.registerForUpdate("testPnfCorrelationId", () -> {
+        kafkaClientTest.registerForUpdate("testPnfCorrelationId", () -> {
         });
         // when
         delegate.execute(delegateExecution);
         // then
-        assertThat(dmaapClientTest.haveRegisteredConsumer()).isFalse();
+        assertThat(kafkaClientTest.haveRegisteredConsumer()).isFalse();
     }
 }
@@ -35,40 +35,40 @@ import org.junit.Before;
 import org.junit.Test;
 import org.mockito.InOrder;
 
-public class InformDmaapClientTest {
+public class InformKafkaClientTest {
     @Before
     public void setUp() {
-        informDmaapClient = new InformDmaapClient();
-        dmaapClientTest = new DmaapClientTestImpl();
-        informDmaapClient.setDmaapClient(dmaapClientTest);
+        informKafkaClient = new InformKafkaClient();
+        kafkaClientTest = new KafkaClientTestImpl();
+        informKafkaClient.setKafkaClient(kafkaClientTest);
         delegateExecution = mockDelegateExecution();
     }
 
-    private InformDmaapClient informDmaapClient;
+    private InformKafkaClient informKafkaClient;
 
-    private DmaapClientTestImpl dmaapClientTest;
+    private KafkaClientTestImpl kafkaClientTest;
 
     private DelegateExecution delegateExecution;
 
     private MessageCorrelationBuilder messageCorrelationBuilder;
 
     @Test
-    public void shouldSendListenerToDmaapClient() {
+    public void shouldSendListenerToKafkaClient() {
         // when
-        informDmaapClient.execute(delegateExecution);
+        informKafkaClient.execute(delegateExecution);
         // then
-        assertThat(dmaapClientTest.getPnfCorrelationId()).isEqualTo("testPnfCorrelationId");
-        assertThat(dmaapClientTest.getInformConsumer()).isNotNull();
+        assertThat(kafkaClientTest.getPnfCorrelationId()).isEqualTo("testPnfCorrelationId");
+        assertThat(kafkaClientTest.getInformConsumer()).isNotNull();
         verifyZeroInteractions(messageCorrelationBuilder);
     }
 
     @Test
-    public void shouldSendListenerToDmaapClientAndSendMessageToCamunda() {
+    public void shouldSendListenerToKafkaClientAndSendMessageToCamunda() {
         // when
-        informDmaapClient.execute(delegateExecution);
-        dmaapClientTest.getInformConsumer().run();
+        informKafkaClient.execute(delegateExecution);
+        kafkaClientTest.getInformConsumer().run();
         // then
-        assertThat(dmaapClientTest.getPnfCorrelationId()).isEqualTo("testPnfCorrelationId");
+        assertThat(kafkaClientTest.getPnfCorrelationId()).isEqualTo("testPnfCorrelationId");
         InOrder inOrder = inOrder(messageCorrelationBuilder);
         inOrder.verify(messageCorrelationBuilder).processInstanceBusinessKey("testBusinessKey");
         inOrder.verify(messageCorrelationBuilder).correlateWithResult();
@@ -22,9 +22,9 @@
 package org.onap.so.bpmn.infrastructure.pnf.delegate;
 
 import java.util.Objects;
-import org.onap.so.bpmn.infrastructure.pnf.dmaap.DmaapClient;
+import org.onap.so.bpmn.infrastructure.pnf.kafka.KafkaClient;
 
-public class DmaapClientTestImpl implements DmaapClient {
+public class KafkaClientTestImpl implements KafkaClient {
 
     private String pnfCorrelationId;
     private Runnable informConsumer;
index 7ec05fd..6422e5f 100644 (file)
@@ -29,7 +29,7 @@ public class RegisterForPnfReadyEventTest {
 
     private DelegateExecution delegateExecution;
     private ExtractPojosForBB extractPojosForBBMock;
-    private DmaapClientTestImpl dmaapClientTest;
+    private KafkaClientTestImpl kafkaClientTest;
     private MessageCorrelationBuilder messageCorrelationBuilder;
     private ExceptionBuilder exceptionBuilderMock;
     private BuildingBlockExecution buildingBlockExecution;
@@ -39,7 +39,7 @@ public class RegisterForPnfReadyEventTest {
     @Before
     public void init() {
         delegateExecution = prepareExecution();
-        dmaapClientTest = new DmaapClientTestImpl();
+        kafkaClientTest = new KafkaClientTestImpl();
         exceptionBuilderMock = mock(ExceptionBuilder.class);
         extractPojosForBBMock = mock(ExtractPojosForBB.class);
         buildingBlockExecution = new DelegateExecutionImpl(new HashMap<>());
@@ -47,9 +47,9 @@ public class RegisterForPnfReadyEventTest {
     }
 
     @Test
-    public void shouldRegisterForDmaapClient() throws BBObjectNotFoundException {
+    public void shouldRegisterForKafkaClient() throws BBObjectNotFoundException {
         // given
-        testedObject = new RegisterForPnfReadyEvent(dmaapClientTest, extractPojosForBBMock, exceptionBuilderMock,
+        testedObject = new RegisterForPnfReadyEvent(kafkaClientTest, extractPojosForBBMock, exceptionBuilderMock,
                 PNF_ENTRY_NOTIFICATION_TIMEOUT);
         Pnf pnf = new Pnf();
         pnf.setPnfName(PNF_NAME);
@@ -60,13 +60,13 @@ public class RegisterForPnfReadyEventTest {
         verify(delegateExecution).setVariable(ExecutionVariableNames.PNF_CORRELATION_ID, PNF_NAME);
         verify(delegateExecution).setVariable(ExecutionVariableNames.TIMEOUT_FOR_NOTIFICATION,
                 PNF_ENTRY_NOTIFICATION_TIMEOUT);
-        checkIfInformConsumerThreadIsRunProperly(dmaapClientTest);
+        checkIfInformConsumerThreadIsRunProperly(kafkaClientTest);
     }
 
     @Test
     public void pnfNotFoundInBBexecution_WorkflowExIsThrown() throws BBObjectNotFoundException {
         // given
-        testedObject = new RegisterForPnfReadyEvent(dmaapClientTest, extractPojosForBBMock, exceptionBuilderMock,
+        testedObject = new RegisterForPnfReadyEvent(kafkaClientTest, extractPojosForBBMock, exceptionBuilderMock,
                 PNF_ENTRY_NOTIFICATION_TIMEOUT);
         when(extractPojosForBBMock.extractByKey(buildingBlockExecution, ResourceKey.PNF))
                 .thenThrow(BBObjectNotFoundException.class);
@@ -74,13 +74,13 @@ public class RegisterForPnfReadyEventTest {
         testedObject.execute(delegateExecution);
         // then
         verify(exceptionBuilderMock).buildAndThrowWorkflowException(delegateExecution, 7000,
-                "pnf resource not found in buildingBlockExecution while registering to dmaap listener");
+                "pnf resource not found in buildingBlockExecution while registering to kafka listener");
     }
 
     @Test
     public void pnfNameIsNull_WorkflowExIsThrown() throws BBObjectNotFoundException {
         // given
-        testedObject = new RegisterForPnfReadyEvent(dmaapClientTest, extractPojosForBBMock, exceptionBuilderMock,
+        testedObject = new RegisterForPnfReadyEvent(kafkaClientTest, extractPojosForBBMock, exceptionBuilderMock,
                 PNF_ENTRY_NOTIFICATION_TIMEOUT);
         when(extractPojosForBBMock.extractByKey(buildingBlockExecution, ResourceKey.PNF)).thenReturn(new Pnf());
         // when
@@ -92,7 +92,7 @@ public class RegisterForPnfReadyEventTest {
     @Test
     public void pnfEventNotificationTimeoutNotSet_WorkflowExIsThrown() throws BBObjectNotFoundException {
         // given
-        testedObject = new RegisterForPnfReadyEvent(dmaapClientTest, extractPojosForBBMock, exceptionBuilderMock, null);
+        testedObject = new RegisterForPnfReadyEvent(kafkaClientTest, extractPojosForBBMock, exceptionBuilderMock, null);
         when(extractPojosForBBMock.extractByKey(buildingBlockExecution, ResourceKey.PNF)).thenReturn(new Pnf());
         // when
         testedObject.execute(delegateExecution);
@@ -101,8 +101,8 @@ public class RegisterForPnfReadyEventTest {
                 "pnfEntryNotificationTimeout value not defined");
     }
 
-    private void checkIfInformConsumerThreadIsRunProperly(DmaapClientTestImpl dmaapClientTest) {
-        dmaapClientTest.getInformConsumer().run();
+    private void checkIfInformConsumerThreadIsRunProperly(KafkaClientTestImpl kafkaClientTest) {
+        kafkaClientTest.getInformConsumer().run();
         InOrder inOrder = inOrder(messageCorrelationBuilder);
         inOrder.verify(messageCorrelationBuilder).processInstanceId(PROCESS_INSTANCE_ID);
         inOrder.verify(messageCorrelationBuilder).correlateWithResult();
@@ -20,7 +20,7 @@
  * ============LICENSE_END=========================================================
  */
 
-package org.onap.so.bpmn.infrastructure.pnf.dmaap;
+package org.onap.so.bpmn.infrastructure.pnf.kafka;
 
 
 import static org.junit.Assert.assertEquals;
@@ -42,13 +42,13 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.Mock;
 import org.mockito.junit.MockitoJUnitRunner;
-import org.onap.so.bpmn.infrastructure.pnf.dmaap.PnfEventReadyDmaapClient.DmaapTopicListenerThread;
+import org.onap.so.bpmn.infrastructure.pnf.kafka.PnfEventReadyKafkaClient.KafkaTopicListenerThread;
 import org.onap.so.client.kafka.KafkaConsumerImpl;
 import org.springframework.core.env.Environment;
 
 
 @RunWith(MockitoJUnitRunner.class)
-public class PnfEventReadyDmaapClientTest {
+public class PnfEventReadyKafkaClientTest {
     private static final String KAFKA_BOOTSTRAP_SERVERS = "localhost:9092";
     private static final String PNF_CORRELATION_ID = "corrTestId";
     private static final String PNF_CORRELATION_ID_NOT_FOUND_IN_MAP = "otherCorrId";
@@ -66,9 +66,9 @@ public class PnfEventReadyDmaapClientTest {
 
     @Mock
     private Environment env;
-    private PnfEventReadyDmaapClient testedObject;
+    private PnfEventReadyKafkaClient testedObject;
 
-    private DmaapTopicListenerThread testedObjectInnerClassThread;
+    private KafkaTopicListenerThread testedObjectInnerClassThread;
     private KafkaConsumerImpl kafkaConsumerMock;
     private Runnable threadMockToNotifyCamundaFlow;
     private ScheduledThreadPoolExecutor executorMock;
@@ -81,10 +81,10 @@ public class PnfEventReadyDmaapClientTest {
         when(env.getProperty(eq("pnf.kafka.consumerId"))).thenReturn(CONSUMER_ID);
         when(env.getProperty(eq("pnf.kafka.consumerIdUpdate"))).thenReturn(CONSUMER_ID_UPDATE);
         when(env.getProperty(eq("pnf.kafka.consumerGroup"))).thenReturn(CONSUMER_GROUP);
-        when(env.getProperty(eq("pnf.dmaap.topicListenerDelayInSeconds"), eq(Integer.class)))
+        when(env.getProperty(eq("pnf.kafka.topicListenerDelayInSeconds"), eq(Integer.class)))
                 .thenReturn(TOPIC_LISTENER_DELAY_IN_SECONDS);
-        testedObject = new PnfEventReadyDmaapClient(env);
-        testedObjectInnerClassThread = testedObject.new DmaapTopicListenerThread();
+        testedObject = new PnfEventReadyKafkaClient(env);
+        testedObjectInnerClassThread = testedObject.new KafkaTopicListenerThread();
         kafkaConsumerMock = mock(KafkaConsumerImpl.class);
         threadMockToNotifyCamundaFlow = mock(Runnable.class);
         executorMock = mock(ScheduledThreadPoolExecutor.class);
@@ -94,7 +94,7 @@ public class PnfEventReadyDmaapClientTest {
     /**
      * Test run method, where the are following conditions:
      * <p>
-     * - DmaapThreadListener is running, flag is set to true
+     * - KafkaThreadListener is running, flag is set to true
      * <p>
      * - map is filled with one entry with the key that we get from response
      * <p>
@@ -130,7 +130,7 @@ public class PnfEventReadyDmaapClientTest {
     /**
      * Test run method, where the are following conditions:
      * <p>
-     * - DmaapThreadListener is running, flag is set to true
+     * - KafkaThreadListener is running, flag is set to true
      * <p>
      * - map is filled with one entry with the pnfCorrelationId that does not match to pnfCorrelationId taken from http
      * response. run method should not do anything with the map not run any thread to notify camunda process
@@ -147,7 +147,7 @@ public class PnfEventReadyDmaapClientTest {
     /**
      * Test run method, where the are following conditions:
      * <p>
-     * - DmaapThreadListener is running, flag is set to true
+     * - KafkaThreadListener is running, flag is set to true
      * <p>
      * - map is filled with one entry with the pnfCorrelationId but no correlation id is taken from HttpResponse run
      * method should not do anything with the map and not run any thread to notify camunda process
@@ -182,7 +182,7 @@ public class PnfEventReadyDmaapClientTest {
         pnfCorrelationToThreadMap.put(PNF_CORRELATION_ID, threadMockToNotifyCamundaFlow);
         pnfCorrelationToThreadMapField.set(testedObject, pnfCorrelationToThreadMap);
 
-        Field threadRunFlag = testedObject.getClass().getDeclaredField("dmaapThreadListenerIsRunning");
+        Field threadRunFlag = testedObject.getClass().getDeclaredField("kafkaThreadListenerIsRunning");
         threadRunFlag.setAccessible(true);
         threadRunFlag.set(testedObject, true);
         threadRunFlag.setAccessible(false);
index 761219c..a440932 100644 (file)
@@ -82,6 +82,7 @@ public class PnfEBBLoader {
                 aaiResourceIds.add(new Pair<>(WorkflowType.PNF, pnf.getPnfId()));
                 Resource pnfResource = new Resource(WorkflowType.PNF, pnf.getPnfId(), false, serviceResource);
                 org.onap.aai.domain.yang.Pnf aaiPnf = bbInputSetupUtils.getAAIPnf(pnf.getPnfName());
+                pnfResource.setInstanceName(pnf.getPnfName());
                 pnfResource.setModelCustomizationId(aaiPnf.getModelCustomizationId());
                 pnfResource.setModelVersionId(aaiPnf.getModelVersionId());
                 resourceList.add(pnfResource);