From 0720a8a4e336d516ee00c515a392bb48a23404fd Mon Sep 17 00:00:00 2001 From: sushant53 Date: Thu, 29 Feb 2024 11:51:05 +0530 Subject: [PATCH] [SO] Code improvement in bpmn-infra supporting kafka change Code improvement in bpmn-infra supporting kafka change Issue-ID: SO-4122 Change-Id: I3924418d16f8f6d9270278f1894e224a216d1cf2 Signed-off-by: sushant53 --- .../delegate/CreateAndActivatePnfResourceTest.java | 20 ++++++------ ...lientTestImpl.java => KafkaClientTestImpl.java} | 5 ++- ...scription.java => CancelKafkaSubscription.java} | 12 ++++---- ...formDmaapClient.java => InformKafkaClient.java} | 12 ++++---- .../pnf/delegate/RegisterForPnfReadyEvent.java | 12 ++++---- .../JsonUtilForPnfCorrelationId.java | 2 +- .../DmaapClient.java => kafka/KafkaClient.java} | 4 +-- .../PnfEventReadyKafkaClient.java} | 36 +++++++++++----------- ...nTest.java => CancelKafkaSubscriptionTest.java} | 12 ++++---- ...pClientTest.java => InformKafkaClientTest.java} | 28 ++++++++--------- ...lientTestImpl.java => KafkaClientTestImpl.java} | 4 +-- .../pnf/delegate/RegisterForPnfReadyEventTest.java | 22 ++++++------- .../JsonUtilForPnfCorrelationIdTest.java | 2 +- .../PnfEventReadyKafkaClientTest.java} | 24 +++++++-------- .../workflow/tasks/ebb/loader/PnfEBBLoader.java | 1 + 15 files changed, 98 insertions(+), 98 deletions(-) rename bpmn/mso-infrastructure-bpmn/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/{DmaapClientTestImpl.java => KafkaClientTestImpl.java} (93%) rename bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/delegate/{CancelDmaapSubscription.java => CancelKafkaSubscription.java} (82%) rename bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/delegate/{InformDmaapClient.java => InformKafkaClient.java} (84%) rename bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/{dmaap => kafka}/JsonUtilForPnfCorrelationId.java (98%) rename bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/{dmaap/DmaapClient.java => kafka/KafkaClient.java} (93%) rename bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/{dmaap/PnfEventReadyDmaapClient.java => kafka/PnfEventReadyKafkaClient.java} (85%) rename bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/{CancelDmaapSubscriptionTest.java => CancelKafkaSubscriptionTest.java} (84%) rename bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/{InformDmaapClientTest.java => InformKafkaClientTest.java} (81%) rename bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/{DmaapClientTestImpl.java => KafkaClientTestImpl.java} (94%) rename bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/{dmaap => kafka}/JsonUtilForPnfCorrelationIdTest.java (98%) rename bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/{dmaap/PnfEventReadyDmaapClientTest.java => kafka/PnfEventReadyKafkaClientTest.java} (92%) diff --git a/bpmn/mso-infrastructure-bpmn/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/CreateAndActivatePnfResourceTest.java b/bpmn/mso-infrastructure-bpmn/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/CreateAndActivatePnfResourceTest.java index 0c001b1192..32ec69e5fa 100644 --- a/bpmn/mso-infrastructure-bpmn/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/CreateAndActivatePnfResourceTest.java +++ b/bpmn/mso-infrastructure-bpmn/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/CreateAndActivatePnfResourceTest.java @@ -49,7 +49,7 @@ public class CreateAndActivatePnfResourceTest extends BaseIntegrationTest { private PnfManagementTestImpl pnfManagementTest; @Autowired - private DmaapClientTestImpl dmaapClientTestImpl; + private KafkaClientTestImpl kafkaClientTestImpl; @Before public void setup() { @@ -60,7 +60,7 @@ public class CreateAndActivatePnfResourceTest extends BaseIntegrationTest { } @Test - public void shouldWaitForMessageFromDmaapAndUpdateAaiEntryWhenAaiEntryExists() { + public void shouldWaitForMessageFromKafkaAndUpdateAaiEntryWhenAaiEntryExists() { // given variables.put(PNF_CORRELATION_ID, PnfManagementTestImpl.ID_WITH_ENTRY); ResourceInput ri = getUpdateResInputObj("OLT"); @@ -72,19 +72,19 @@ public class CreateAndActivatePnfResourceTest extends BaseIntegrationTest { // when ProcessInstance instance = runtimeService.startProcessInstanceByKey("CreateAndActivatePnfResource", "businessKey", variables); - assertThat(instance).isWaitingAt("WaitForDmaapPnfReadyNotification").isWaitingFor("WorkflowMessage"); - dmaapClientTestImpl.sendMessage(); + assertThat(instance).isWaitingAt("WaitForKafkaPnfReadyNotification").isWaitingFor("WorkflowMessage"); + kafkaClientTestImpl.sendMessage(); // then assertThat(instance).isEnded().hasPassedInOrder("CreateAndActivatePnf_StartEvent", "CheckInputs", - "CheckAiiForPnfCorrelationId", "DoesAaiContainInfoAboutPnf", "AaiEntryExists", "InformDmaapClient", - "WaitForDmaapPnfReadyNotification", "CreateRelationId", "AaiEntryUpdated"); + "CheckAiiForPnfCorrelationId", "DoesAaiContainInfoAboutPnf", "AaiEntryExists", "InformKafkaClient", + "WaitForKafkaPnfReadyNotification", "CreateRelationId", "AaiEntryUpdated"); Assertions.assertThat(pnfManagementTest.getServiceAndPnfRelationMap()) .containsOnly(MapEntry.entry(SERVICE_INSTANCE_ID, PnfManagementTestImpl.ID_WITH_ENTRY)); } @Test - public void shouldCreateAaiEntryWaitForMessageFromDmaapAndUpdateAaiEntryWhenNoAaiEntryExists() { + public void shouldCreateAaiEntryWaitForMessageFromKafkaAndUpdateAaiEntryWhenNoAaiEntryExists() { // given variables.put(PNF_CORRELATION_ID, PnfManagementTestImpl.ID_WITHOUT_ENTRY); ResourceInput ri = getUpdateResInputObj("OLT"); @@ -96,13 +96,13 @@ public class CreateAndActivatePnfResourceTest extends BaseIntegrationTest { // when ProcessInstance instance = runtimeService.startProcessInstanceByKey("CreateAndActivatePnfResource", "businessKey", variables); - assertThat(instance).isWaitingAt("WaitForDmaapPnfReadyNotification").isWaitingFor("WorkflowMessage"); - dmaapClientTestImpl.sendMessage(); + assertThat(instance).isWaitingAt("WaitForKafkaPnfReadyNotification").isWaitingFor("WorkflowMessage"); + kafkaClientTestImpl.sendMessage(); // then assertThat(instance).isEnded().hasPassedInOrder("CreateAndActivatePnf_StartEvent", "CheckInputs", "CheckAiiForPnfCorrelationId", "DoesAaiContainInfoAboutPnf", "CreatePnfEntryInAai", "AaiEntryExists", - "InformDmaapClient", "WaitForDmaapPnfReadyNotification", "CreateRelationId", "AaiEntryUpdated"); + "InformKafkaClient", "WaitForKafkaPnfReadyNotification", "CreateRelationId", "AaiEntryUpdated"); Assertions.assertThat(pnfManagementTest.getCreated()).containsOnlyKeys(PnfManagementTestImpl.ID_WITHOUT_ENTRY); Assertions.assertThat(pnfManagementTest.getServiceAndPnfRelationMap()) .containsOnly(MapEntry.entry(SERVICE_INSTANCE_ID, PnfManagementTestImpl.ID_WITHOUT_ENTRY)); diff --git a/bpmn/mso-infrastructure-bpmn/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/DmaapClientTestImpl.java b/bpmn/mso-infrastructure-bpmn/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/KafkaClientTestImpl.java similarity index 93% rename from bpmn/mso-infrastructure-bpmn/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/DmaapClientTestImpl.java rename to bpmn/mso-infrastructure-bpmn/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/KafkaClientTestImpl.java index 43fbc59b3d..e4c1717752 100644 --- a/bpmn/mso-infrastructure-bpmn/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/DmaapClientTestImpl.java +++ b/bpmn/mso-infrastructure-bpmn/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/KafkaClientTestImpl.java @@ -20,15 +20,14 @@ package org.onap.so.bpmn.infrastructure.pnf.delegate; -import java.util.Map; import java.util.Objects; -import org.onap.so.bpmn.infrastructure.pnf.dmaap.DmaapClient; +import org.onap.so.bpmn.infrastructure.pnf.kafka.KafkaClient; import org.springframework.context.annotation.Primary; import org.springframework.stereotype.Component; @Component @Primary -public class DmaapClientTestImpl implements DmaapClient { +public class KafkaClientTestImpl implements KafkaClient { private String pnfCorrelationId; private Runnable informConsumer; diff --git a/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/delegate/CancelDmaapSubscription.java b/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/delegate/CancelKafkaSubscription.java similarity index 82% rename from bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/delegate/CancelDmaapSubscription.java rename to bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/delegate/CancelKafkaSubscription.java index 439591a295..d0a6e3a59f 100644 --- a/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/delegate/CancelDmaapSubscription.java +++ b/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/delegate/CancelKafkaSubscription.java @@ -22,23 +22,23 @@ package org.onap.so.bpmn.infrastructure.pnf.delegate; import org.camunda.bpm.engine.delegate.DelegateExecution; import org.camunda.bpm.engine.delegate.JavaDelegate; -import org.onap.so.bpmn.infrastructure.pnf.dmaap.DmaapClient; +import org.onap.so.bpmn.infrastructure.pnf.kafka.KafkaClient; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Component -public class CancelDmaapSubscription implements JavaDelegate { +public class CancelKafkaSubscription implements JavaDelegate { - private DmaapClient dmaapClient; + private KafkaClient kafkaClient; @Override public void execute(DelegateExecution execution) { String pnfCorrelationId = (String) execution.getVariable(ExecutionVariableNames.PNF_CORRELATION_ID); - dmaapClient.unregister(pnfCorrelationId); + kafkaClient.unregister(pnfCorrelationId); } @Autowired - public void setDmaapClient(DmaapClient dmaapClient) { - this.dmaapClient = dmaapClient; + public void setKafkaClient(KafkaClient kafkaClient) { + this.kafkaClient = kafkaClient; } } diff --git a/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/delegate/InformDmaapClient.java b/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/delegate/InformKafkaClient.java similarity index 84% rename from bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/delegate/InformDmaapClient.java rename to bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/delegate/InformKafkaClient.java index 5cbd530a93..6506450c52 100644 --- a/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/delegate/InformDmaapClient.java +++ b/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/delegate/InformKafkaClient.java @@ -23,25 +23,25 @@ package org.onap.so.bpmn.infrastructure.pnf.delegate; import org.camunda.bpm.engine.RuntimeService; import org.camunda.bpm.engine.delegate.DelegateExecution; import org.camunda.bpm.engine.delegate.JavaDelegate; -import org.onap.so.bpmn.infrastructure.pnf.dmaap.DmaapClient; +import org.onap.so.bpmn.infrastructure.pnf.kafka.KafkaClient; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Component -public class InformDmaapClient implements JavaDelegate { +public class InformKafkaClient implements JavaDelegate { - private DmaapClient dmaapClient; + private KafkaClient kafkaClient; @Override public void execute(DelegateExecution execution) { String pnfCorrelationId = (String) execution.getVariable(ExecutionVariableNames.PNF_CORRELATION_ID); RuntimeService runtimeService = execution.getProcessEngineServices().getRuntimeService(); - dmaapClient.registerForUpdate(pnfCorrelationId, () -> runtimeService.createMessageCorrelation("WorkflowMessage") + kafkaClient.registerForUpdate(pnfCorrelationId, () -> runtimeService.createMessageCorrelation("WorkflowMessage") .processInstanceBusinessKey(execution.getProcessBusinessKey()).correlateWithResult()); } @Autowired - public void setDmaapClient(DmaapClient dmaapClient) { - this.dmaapClient = dmaapClient; + public void setKafkaClient(KafkaClient kafkaClient) { + this.kafkaClient = kafkaClient; } } diff --git a/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/delegate/RegisterForPnfReadyEvent.java b/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/delegate/RegisterForPnfReadyEvent.java index 9e1b5d5a0b..ef5da92e78 100644 --- a/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/delegate/RegisterForPnfReadyEvent.java +++ b/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/delegate/RegisterForPnfReadyEvent.java @@ -26,7 +26,7 @@ import org.camunda.bpm.engine.RuntimeService; import org.camunda.bpm.engine.delegate.DelegateExecution; import org.camunda.bpm.engine.delegate.JavaDelegate; import org.onap.so.bpmn.common.BuildingBlockExecution; -import org.onap.so.bpmn.infrastructure.pnf.dmaap.DmaapClient; +import org.onap.so.bpmn.infrastructure.pnf.kafka.KafkaClient; import org.onap.so.bpmn.servicedecomposition.bbobjects.Pnf; import org.onap.so.bpmn.servicedecomposition.entities.ResourceKey; import org.onap.so.bpmn.servicedecomposition.tasks.ExtractPojosForBB; @@ -45,19 +45,19 @@ import org.springframework.stereotype.Component; public class RegisterForPnfReadyEvent implements JavaDelegate { private static final String ERROR_MESSAGE_PNF_NOT_FOUND = - "pnf resource not found in buildingBlockExecution while registering to dmaap listener"; + "pnf resource not found in buildingBlockExecution while registering to kafka listener"; private static final Logger LOGGER = LoggerFactory.getLogger(RegisterForPnfReadyEvent.class); - private DmaapClient dmaapClient; + private KafkaClient kafkaClient; private ExtractPojosForBB extractPojosForBB; private ExceptionBuilder exceptionBuilder; private String pnfEntryNotificationTimeout; @Autowired - public RegisterForPnfReadyEvent(DmaapClient dmaapClient, ExtractPojosForBB extractPojosForBB, + public RegisterForPnfReadyEvent(KafkaClient kafkaClient, ExtractPojosForBB extractPojosForBB, ExceptionBuilder exceptionBuilder, @Value("${aai.pnfEntryNotificationTimeout}") String pnfEntryNotificationTimeout) { - this.dmaapClient = dmaapClient; + this.kafkaClient = kafkaClient; this.extractPojosForBB = extractPojosForBB; this.exceptionBuilder = exceptionBuilder; this.pnfEntryNotificationTimeout = pnfEntryNotificationTimeout; @@ -69,7 +69,7 @@ public class RegisterForPnfReadyEvent implements JavaDelegate { String pnfName = getPnfName(execution); fillExecution(execution, pnfName); RuntimeService runtimeService = execution.getProcessEngineServices().getRuntimeService(); - dmaapClient.registerForUpdate(pnfName, () -> runtimeService.createMessageCorrelation("WorkflowMessage") + kafkaClient.registerForUpdate(pnfName, () -> runtimeService.createMessageCorrelation("WorkflowMessage") .processInstanceId(execution.getProcessInstanceId()).correlateWithResult()); } catch (BBObjectNotFoundException e) { LOGGER.error(ERROR_MESSAGE_PNF_NOT_FOUND); diff --git a/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/JsonUtilForPnfCorrelationId.java b/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/kafka/JsonUtilForPnfCorrelationId.java similarity index 98% rename from bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/JsonUtilForPnfCorrelationId.java rename to bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/kafka/JsonUtilForPnfCorrelationId.java index 9cb566f49b..3c65cbaad3 100644 --- a/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/JsonUtilForPnfCorrelationId.java +++ b/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/kafka/JsonUtilForPnfCorrelationId.java @@ -21,7 +21,7 @@ * ============LICENSE_END========================================================= */ -package org.onap.so.bpmn.infrastructure.pnf.dmaap; +package org.onap.so.bpmn.infrastructure.pnf.kafka; import com.google.gson.JsonArray; import com.google.gson.JsonElement; diff --git a/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/DmaapClient.java b/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/kafka/KafkaClient.java similarity index 93% rename from bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/DmaapClient.java rename to bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/kafka/KafkaClient.java index fd7eb153b6..941c565bca 100644 --- a/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/DmaapClient.java +++ b/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/kafka/KafkaClient.java @@ -19,9 +19,9 @@ * ============LICENSE_END========================================================= */ -package org.onap.so.bpmn.infrastructure.pnf.dmaap; +package org.onap.so.bpmn.infrastructure.pnf.kafka; -public interface DmaapClient { +public interface KafkaClient { void registerForUpdate(String pnfCorrelationId, Runnable informConsumer); diff --git a/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/PnfEventReadyDmaapClient.java b/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/kafka/PnfEventReadyKafkaClient.java similarity index 85% rename from bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/PnfEventReadyDmaapClient.java rename to bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/kafka/PnfEventReadyKafkaClient.java index 44b16dad28..0d3e0e0230 100644 --- a/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/PnfEventReadyDmaapClient.java +++ b/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/kafka/PnfEventReadyKafkaClient.java @@ -19,7 +19,7 @@ * limitations under the License. * ============LICENSE_END========================================================= */ -package org.onap.so.bpmn.infrastructure.pnf.dmaap; +package org.onap.so.bpmn.infrastructure.pnf.kafka; import java.io.IOException; import java.util.Collections; @@ -36,12 +36,12 @@ import org.springframework.core.env.Environment; import org.springframework.stereotype.Component; @Component -public class PnfEventReadyDmaapClient implements DmaapClient { - private static final Logger logger = LoggerFactory.getLogger(PnfEventReadyDmaapClient.class); +public class PnfEventReadyKafkaClient implements KafkaClient { + private static final Logger logger = LoggerFactory.getLogger(PnfEventReadyKafkaClient.class); private Map pnfCorrelationIdToThreadMap; private int topicListenerDelayInSeconds; private volatile ScheduledThreadPoolExecutor executor; - private volatile boolean dmaapThreadListenerIsRunning; + private volatile boolean kafkaThreadListenerIsRunning; private KafkaConsumerImpl consumerForPnfReady; private KafkaConsumerImpl consumerForPnfUpdate; private String pnfReadyTopic; @@ -53,9 +53,9 @@ public class PnfEventReadyDmaapClient implements DmaapClient { @Autowired - public PnfEventReadyDmaapClient(Environment env) throws IOException { + public PnfEventReadyKafkaClient(Environment env) throws IOException { pnfCorrelationIdToThreadMap = new ConcurrentHashMap<>(); - topicListenerDelayInSeconds = env.getProperty("pnf.dmaap.topicListenerDelayInSeconds", Integer.class); + topicListenerDelayInSeconds = env.getProperty("pnf.kafka.topicListenerDelayInSeconds", Integer.class); executor = null; try { consumerForPnfReady = new KafkaConsumerImpl(env.getProperty("pnf.kafka.kafkaBootstrapServers")); @@ -75,8 +75,8 @@ public class PnfEventReadyDmaapClient implements DmaapClient { public synchronized void registerForUpdate(String pnfCorrelationId, Runnable informConsumer) { logger.debug("registering for pnf ready kafka event for pnf correlation id: {}", pnfCorrelationId); pnfCorrelationIdToThreadMap.put(pnfCorrelationId, informConsumer); - if (!dmaapThreadListenerIsRunning) { - startDmaapThreadListener(); + if (!kafkaThreadListenerIsRunning) { + startKafkaThreadListener(); } } @@ -87,31 +87,31 @@ public class PnfEventReadyDmaapClient implements DmaapClient { if (pnfCorrelationIdToThreadMap.isEmpty()) { consumerForPnfUpdate.close(); consumerForPnfReady.close(); - stopDmaapThreadListener(); + stopKafkaThreadListener(); } return runnable; } - private synchronized void startDmaapThreadListener() { - if (!dmaapThreadListenerIsRunning) { + private synchronized void startKafkaThreadListener() { + if (!kafkaThreadListenerIsRunning) { executor = new ScheduledThreadPoolExecutor(1); executor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false); executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false); - executor.scheduleWithFixedDelay(new DmaapTopicListenerThread(), 0, topicListenerDelayInSeconds, + executor.scheduleWithFixedDelay(new KafkaTopicListenerThread(), 0, topicListenerDelayInSeconds, TimeUnit.SECONDS); - dmaapThreadListenerIsRunning = true; + kafkaThreadListenerIsRunning = true; } } - private synchronized void stopDmaapThreadListener() { - if (dmaapThreadListenerIsRunning) { + private synchronized void stopKafkaThreadListener() { + if (kafkaThreadListenerIsRunning) { executor.shutdown(); - dmaapThreadListenerIsRunning = false; + kafkaThreadListenerIsRunning = false; executor = null; } } - class DmaapTopicListenerThread implements Runnable { + class KafkaTopicListenerThread implements Runnable { @Override public void run() { try { @@ -141,7 +141,7 @@ public class PnfEventReadyDmaapClient implements DmaapClient { private void informAboutPnfReadyIfPnfCorrelationIdFound(String pnfCorrelationId) { Runnable runnable = unregister(pnfCorrelationId); if (runnable != null) { - logger.debug("dmaap listener gets pnf ready event for pnfCorrelationId: {}", pnfCorrelationId); + logger.debug("kafka listener gets pnf ready event for pnfCorrelationId: {}", pnfCorrelationId); runnable.run(); } } diff --git a/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/CancelDmaapSubscriptionTest.java b/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/CancelKafkaSubscriptionTest.java similarity index 84% rename from bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/CancelDmaapSubscriptionTest.java rename to bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/CancelKafkaSubscriptionTest.java index c2e87d57bf..6230dab3fa 100644 --- a/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/CancelDmaapSubscriptionTest.java +++ b/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/CancelKafkaSubscriptionTest.java @@ -27,25 +27,25 @@ import static org.mockito.Mockito.when; import org.camunda.bpm.engine.delegate.DelegateExecution; import org.junit.Test; -public class CancelDmaapSubscriptionTest { +public class CancelKafkaSubscriptionTest { private static final String TEST_PNF_CORRELATION_ID = "testPnfCorrelationId"; @Test public void shouldCancelSubscription() { // given - CancelDmaapSubscription delegate = new CancelDmaapSubscription(); - DmaapClientTestImpl dmaapClientTest = new DmaapClientTestImpl(); - delegate.setDmaapClient(dmaapClientTest); + CancelKafkaSubscription delegate = new CancelKafkaSubscription(); + KafkaClientTestImpl kafkaClientTest = new KafkaClientTestImpl(); + delegate.setKafkaClient(kafkaClientTest); DelegateExecution delegateExecution = mock(DelegateExecution.class); when(delegateExecution.getVariable(eq(ExecutionVariableNames.PNF_CORRELATION_ID))) .thenReturn(TEST_PNF_CORRELATION_ID); when(delegateExecution.getProcessBusinessKey()).thenReturn("testBusinessKey"); - dmaapClientTest.registerForUpdate("testPnfCorrelationId", () -> { + kafkaClientTest.registerForUpdate("testPnfCorrelationId", () -> { }); // when delegate.execute(delegateExecution); // then - assertThat(dmaapClientTest.haveRegisteredConsumer()).isFalse(); + assertThat(kafkaClientTest.haveRegisteredConsumer()).isFalse(); } } diff --git a/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/InformDmaapClientTest.java b/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/InformKafkaClientTest.java similarity index 81% rename from bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/InformDmaapClientTest.java rename to bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/InformKafkaClientTest.java index 94aa1427a4..d8a102f2b4 100644 --- a/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/InformDmaapClientTest.java +++ b/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/InformKafkaClientTest.java @@ -35,40 +35,40 @@ import org.junit.Before; import org.junit.Test; import org.mockito.InOrder; -public class InformDmaapClientTest { +public class InformKafkaClientTest { @Before public void setUp() { - informDmaapClient = new InformDmaapClient(); - dmaapClientTest = new DmaapClientTestImpl(); - informDmaapClient.setDmaapClient(dmaapClientTest); + informKafkaClient = new InformKafkaClient(); + kafkaClientTest = new KafkaClientTestImpl(); + informKafkaClient.setKafkaClient(kafkaClientTest); delegateExecution = mockDelegateExecution(); } - private InformDmaapClient informDmaapClient; + private InformKafkaClient informKafkaClient; - private DmaapClientTestImpl dmaapClientTest; + private KafkaClientTestImpl kafkaClientTest; private DelegateExecution delegateExecution; private MessageCorrelationBuilder messageCorrelationBuilder; @Test - public void shouldSendListenerToDmaapClient() { + public void shouldSendListenerToKafkaClient() { // when - informDmaapClient.execute(delegateExecution); + informKafkaClient.execute(delegateExecution); // then - assertThat(dmaapClientTest.getPnfCorrelationId()).isEqualTo("testPnfCorrelationId"); - assertThat(dmaapClientTest.getInformConsumer()).isNotNull(); + assertThat(kafkaClientTest.getPnfCorrelationId()).isEqualTo("testPnfCorrelationId"); + assertThat(kafkaClientTest.getInformConsumer()).isNotNull(); verifyZeroInteractions(messageCorrelationBuilder); } @Test - public void shouldSendListenerToDmaapClientAndSendMessageToCamunda() { + public void shouldSendListenerToKafkaClientAndSendMessageToCamunda() { // when - informDmaapClient.execute(delegateExecution); - dmaapClientTest.getInformConsumer().run(); + informKafkaClient.execute(delegateExecution); + kafkaClientTest.getInformConsumer().run(); // then - assertThat(dmaapClientTest.getPnfCorrelationId()).isEqualTo("testPnfCorrelationId"); + assertThat(kafkaClientTest.getPnfCorrelationId()).isEqualTo("testPnfCorrelationId"); InOrder inOrder = inOrder(messageCorrelationBuilder); inOrder.verify(messageCorrelationBuilder).processInstanceBusinessKey("testBusinessKey"); inOrder.verify(messageCorrelationBuilder).correlateWithResult(); diff --git a/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/DmaapClientTestImpl.java b/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/KafkaClientTestImpl.java similarity index 94% rename from bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/DmaapClientTestImpl.java rename to bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/KafkaClientTestImpl.java index 0ec0ac8214..c374ba40c2 100644 --- a/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/DmaapClientTestImpl.java +++ b/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/KafkaClientTestImpl.java @@ -22,9 +22,9 @@ package org.onap.so.bpmn.infrastructure.pnf.delegate; import java.util.Objects; -import org.onap.so.bpmn.infrastructure.pnf.dmaap.DmaapClient; +import org.onap.so.bpmn.infrastructure.pnf.kafka.KafkaClient; -public class DmaapClientTestImpl implements DmaapClient { +public class KafkaClientTestImpl implements KafkaClient { private String pnfCorrelationId; private Runnable informConsumer; diff --git a/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/RegisterForPnfReadyEventTest.java b/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/RegisterForPnfReadyEventTest.java index 7ec05fda04..6422e5f88e 100644 --- a/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/RegisterForPnfReadyEventTest.java +++ b/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/RegisterForPnfReadyEventTest.java @@ -29,7 +29,7 @@ public class RegisterForPnfReadyEventTest { private DelegateExecution delegateExecution; private ExtractPojosForBB extractPojosForBBMock; - private DmaapClientTestImpl dmaapClientTest; + private KafkaClientTestImpl kafkaClientTest; private MessageCorrelationBuilder messageCorrelationBuilder; private ExceptionBuilder exceptionBuilderMock; private BuildingBlockExecution buildingBlockExecution; @@ -39,7 +39,7 @@ public class RegisterForPnfReadyEventTest { @Before public void init() { delegateExecution = prepareExecution(); - dmaapClientTest = new DmaapClientTestImpl(); + kafkaClientTest = new KafkaClientTestImpl(); exceptionBuilderMock = mock(ExceptionBuilder.class); extractPojosForBBMock = mock(ExtractPojosForBB.class); buildingBlockExecution = new DelegateExecutionImpl(new HashMap<>()); @@ -47,9 +47,9 @@ public class RegisterForPnfReadyEventTest { } @Test - public void shouldRegisterForDmaapClient() throws BBObjectNotFoundException { + public void shouldRegisterForKafkaClient() throws BBObjectNotFoundException { // given - testedObject = new RegisterForPnfReadyEvent(dmaapClientTest, extractPojosForBBMock, exceptionBuilderMock, + testedObject = new RegisterForPnfReadyEvent(kafkaClientTest, extractPojosForBBMock, exceptionBuilderMock, PNF_ENTRY_NOTIFICATION_TIMEOUT); Pnf pnf = new Pnf(); pnf.setPnfName(PNF_NAME); @@ -60,13 +60,13 @@ public class RegisterForPnfReadyEventTest { verify(delegateExecution).setVariable(ExecutionVariableNames.PNF_CORRELATION_ID, PNF_NAME); verify(delegateExecution).setVariable(ExecutionVariableNames.TIMEOUT_FOR_NOTIFICATION, PNF_ENTRY_NOTIFICATION_TIMEOUT); - checkIfInformConsumerThreadIsRunProperly(dmaapClientTest); + checkIfInformConsumerThreadIsRunProperly(kafkaClientTest); } @Test public void pnfNotFoundInBBexecution_WorkflowExIsThrown() throws BBObjectNotFoundException { // given - testedObject = new RegisterForPnfReadyEvent(dmaapClientTest, extractPojosForBBMock, exceptionBuilderMock, + testedObject = new RegisterForPnfReadyEvent(kafkaClientTest, extractPojosForBBMock, exceptionBuilderMock, PNF_ENTRY_NOTIFICATION_TIMEOUT); when(extractPojosForBBMock.extractByKey(buildingBlockExecution, ResourceKey.PNF)) .thenThrow(BBObjectNotFoundException.class); @@ -74,13 +74,13 @@ public class RegisterForPnfReadyEventTest { testedObject.execute(delegateExecution); // then verify(exceptionBuilderMock).buildAndThrowWorkflowException(delegateExecution, 7000, - "pnf resource not found in buildingBlockExecution while registering to dmaap listener"); + "pnf resource not found in buildingBlockExecution while registering to kafka listener"); } @Test public void pnfNameIsNull_WorkflowExIsThrown() throws BBObjectNotFoundException { // given - testedObject = new RegisterForPnfReadyEvent(dmaapClientTest, extractPojosForBBMock, exceptionBuilderMock, + testedObject = new RegisterForPnfReadyEvent(kafkaClientTest, extractPojosForBBMock, exceptionBuilderMock, PNF_ENTRY_NOTIFICATION_TIMEOUT); when(extractPojosForBBMock.extractByKey(buildingBlockExecution, ResourceKey.PNF)).thenReturn(new Pnf()); // when @@ -92,7 +92,7 @@ public class RegisterForPnfReadyEventTest { @Test public void pnfEventNotificationTimeoutNotSet_WorkflowExIsThrown() throws BBObjectNotFoundException { // given - testedObject = new RegisterForPnfReadyEvent(dmaapClientTest, extractPojosForBBMock, exceptionBuilderMock, null); + testedObject = new RegisterForPnfReadyEvent(kafkaClientTest, extractPojosForBBMock, exceptionBuilderMock, null); when(extractPojosForBBMock.extractByKey(buildingBlockExecution, ResourceKey.PNF)).thenReturn(new Pnf()); // when testedObject.execute(delegateExecution); @@ -101,8 +101,8 @@ public class RegisterForPnfReadyEventTest { "pnfEntryNotificationTimeout value not defined"); } - private void checkIfInformConsumerThreadIsRunProperly(DmaapClientTestImpl dmaapClientTest) { - dmaapClientTest.getInformConsumer().run(); + private void checkIfInformConsumerThreadIsRunProperly(KafkaClientTestImpl kafkaClientTest) { + kafkaClientTest.getInformConsumer().run(); InOrder inOrder = inOrder(messageCorrelationBuilder); inOrder.verify(messageCorrelationBuilder).processInstanceId(PROCESS_INSTANCE_ID); inOrder.verify(messageCorrelationBuilder).correlateWithResult(); diff --git a/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/JsonUtilForPnfCorrelationIdTest.java b/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/kafka/JsonUtilForPnfCorrelationIdTest.java similarity index 98% rename from bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/JsonUtilForPnfCorrelationIdTest.java rename to bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/kafka/JsonUtilForPnfCorrelationIdTest.java index f9e4cb4c88..71d60305e4 100644 --- a/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/JsonUtilForPnfCorrelationIdTest.java +++ b/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/kafka/JsonUtilForPnfCorrelationIdTest.java @@ -20,7 +20,7 @@ * ============LICENSE_END========================================================= */ -package org.onap.so.bpmn.infrastructure.pnf.dmaap; +package org.onap.so.bpmn.infrastructure.pnf.kafka; import static org.assertj.core.api.Assertions.assertThat; import java.util.ArrayList; diff --git a/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/PnfEventReadyDmaapClientTest.java b/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/kafka/PnfEventReadyKafkaClientTest.java similarity index 92% rename from bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/PnfEventReadyDmaapClientTest.java rename to bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/kafka/PnfEventReadyKafkaClientTest.java index 546e644fbd..41887e3586 100644 --- a/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/PnfEventReadyDmaapClientTest.java +++ b/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/kafka/PnfEventReadyKafkaClientTest.java @@ -20,7 +20,7 @@ * ============LICENSE_END========================================================= */ -package org.onap.so.bpmn.infrastructure.pnf.dmaap; +package org.onap.so.bpmn.infrastructure.pnf.kafka; import static org.junit.Assert.assertEquals; @@ -42,13 +42,13 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mock; import org.mockito.junit.MockitoJUnitRunner; -import org.onap.so.bpmn.infrastructure.pnf.dmaap.PnfEventReadyDmaapClient.DmaapTopicListenerThread; +import org.onap.so.bpmn.infrastructure.pnf.kafka.PnfEventReadyKafkaClient.KafkaTopicListenerThread; import org.onap.so.client.kafka.KafkaConsumerImpl; import org.springframework.core.env.Environment; @RunWith(MockitoJUnitRunner.class) -public class PnfEventReadyDmaapClientTest { +public class PnfEventReadyKafkaClientTest { private static final String KAFKA_BOOTSTRAP_SERVERS = "localhost:9092"; private static final String PNF_CORRELATION_ID = "corrTestId"; private static final String PNF_CORRELATION_ID_NOT_FOUND_IN_MAP = "otherCorrId"; @@ -66,9 +66,9 @@ public class PnfEventReadyDmaapClientTest { @Mock private Environment env; - private PnfEventReadyDmaapClient testedObject; + private PnfEventReadyKafkaClient testedObject; - private DmaapTopicListenerThread testedObjectInnerClassThread; + private KafkaTopicListenerThread testedObjectInnerClassThread; private KafkaConsumerImpl kafkaConsumerMock; private Runnable threadMockToNotifyCamundaFlow; private ScheduledThreadPoolExecutor executorMock; @@ -81,10 +81,10 @@ public class PnfEventReadyDmaapClientTest { when(env.getProperty(eq("pnf.kafka.consumerId"))).thenReturn(CONSUMER_ID); when(env.getProperty(eq("pnf.kafka.consumerIdUpdate"))).thenReturn(CONSUMER_ID_UPDATE); when(env.getProperty(eq("pnf.kafka.consumerGroup"))).thenReturn(CONSUMER_GROUP); - when(env.getProperty(eq("pnf.dmaap.topicListenerDelayInSeconds"), eq(Integer.class))) + when(env.getProperty(eq("pnf.kafka.topicListenerDelayInSeconds"), eq(Integer.class))) .thenReturn(TOPIC_LISTENER_DELAY_IN_SECONDS); - testedObject = new PnfEventReadyDmaapClient(env); - testedObjectInnerClassThread = testedObject.new DmaapTopicListenerThread(); + testedObject = new PnfEventReadyKafkaClient(env); + testedObjectInnerClassThread = testedObject.new KafkaTopicListenerThread(); kafkaConsumerMock = mock(KafkaConsumerImpl.class); threadMockToNotifyCamundaFlow = mock(Runnable.class); executorMock = mock(ScheduledThreadPoolExecutor.class); @@ -94,7 +94,7 @@ public class PnfEventReadyDmaapClientTest { /** * Test run method, where the are following conditions: *

- * - DmaapThreadListener is running, flag is set to true + * - KafkaThreadListener is running, flag is set to true *

* - map is filled with one entry with the key that we get from response *

@@ -130,7 +130,7 @@ public class PnfEventReadyDmaapClientTest { /** * Test run method, where the are following conditions: *

- * - DmaapThreadListener is running, flag is set to true + * - KafkaThreadListener is running, flag is set to true *

* - map is filled with one entry with the pnfCorrelationId that does not match to pnfCorrelationId taken from http * response. run method should not do anything with the map not run any thread to notify camunda process @@ -147,7 +147,7 @@ public class PnfEventReadyDmaapClientTest { /** * Test run method, where the are following conditions: *

- * - DmaapThreadListener is running, flag is set to true + * - KafkaThreadListener is running, flag is set to true *

* - map is filled with one entry with the pnfCorrelationId but no correlation id is taken from HttpResponse run * method should not do anything with the map and not run any thread to notify camunda process @@ -182,7 +182,7 @@ public class PnfEventReadyDmaapClientTest { pnfCorrelationToThreadMap.put(PNF_CORRELATION_ID, threadMockToNotifyCamundaFlow); pnfCorrelationToThreadMapField.set(testedObject, pnfCorrelationToThreadMap); - Field threadRunFlag = testedObject.getClass().getDeclaredField("dmaapThreadListenerIsRunning"); + Field threadRunFlag = testedObject.getClass().getDeclaredField("kafkaThreadListenerIsRunning"); threadRunFlag.setAccessible(true); threadRunFlag.set(testedObject, true); threadRunFlag.setAccessible(false); diff --git a/bpmn/so-bpmn-tasks/src/main/java/org/onap/so/bpmn/infrastructure/workflow/tasks/ebb/loader/PnfEBBLoader.java b/bpmn/so-bpmn-tasks/src/main/java/org/onap/so/bpmn/infrastructure/workflow/tasks/ebb/loader/PnfEBBLoader.java index 761219c6be..a44093298d 100644 --- a/bpmn/so-bpmn-tasks/src/main/java/org/onap/so/bpmn/infrastructure/workflow/tasks/ebb/loader/PnfEBBLoader.java +++ b/bpmn/so-bpmn-tasks/src/main/java/org/onap/so/bpmn/infrastructure/workflow/tasks/ebb/loader/PnfEBBLoader.java @@ -82,6 +82,7 @@ public class PnfEBBLoader { aaiResourceIds.add(new Pair<>(WorkflowType.PNF, pnf.getPnfId())); Resource pnfResource = new Resource(WorkflowType.PNF, pnf.getPnfId(), false, serviceResource); org.onap.aai.domain.yang.Pnf aaiPnf = bbInputSetupUtils.getAAIPnf(pnf.getPnfName()); + pnfResource.setInstanceName(pnf.getPnfName()); pnfResource.setModelCustomizationId(aaiPnf.getModelCustomizationId()); pnfResource.setModelVersionId(aaiPnf.getModelVersionId()); resourceList.add(pnfResource); -- 2.16.6