From: emaclee Date: Thu, 27 Mar 2025 12:40:26 +0000 (+0000) Subject: Refactor Consumers/Producers based on agreed format #2 X-Git-Tag: 3.6.2~11^2 X-Git-Url: https://gerrit.onap.org/r/gitweb?a=commitdiff_plain;h=39b989709b0c51223d44bd1615c690b33b54e911;p=cps.git Refactor Consumers/Producers based on agreed format #2 - 'EventsPublisher' to 'EventsProducer' - terms 'publish' is replaced with 'send' to follow kafka methods - LcmEventsProducer is not fully changed as it may affect metrics; will handle on seperate patch Issue-ID: CPS-2597 Change-Id: I310fc60fd0ff85eb83f2f3c6f9b54c569b3ff902 Signed-off-by: emaclee --- diff --git a/cps-ncmp-rest/src/test/groovy/org/onap/cps/ncmp/rest/controller/NetworkCmProxyControllerSpec.groovy b/cps-ncmp-rest/src/test/groovy/org/onap/cps/ncmp/rest/controller/NetworkCmProxyControllerSpec.groovy index 94c113c053..e934530d6f 100644 --- a/cps-ncmp-rest/src/test/groovy/org/onap/cps/ncmp/rest/controller/NetworkCmProxyControllerSpec.groovy +++ b/cps-ncmp-rest/src/test/groovy/org/onap/cps/ncmp/rest/controller/NetworkCmProxyControllerSpec.groovy @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * Copyright (C) 2021 Pantheon.tech * Modifications Copyright (C) 2021 highstreet technologies GmbH - * Modifications Copyright (C) 2021-2025 OpenInfra Foundation Europe + * Modifications Copyright (C) 2021-2025 OpenInfra Foundation Europe. All rights reserved. * Modifications Copyright (C) 2021-2022 Bell Canada. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); @@ -31,7 +31,7 @@ import com.fasterxml.jackson.databind.ObjectMapper import groovy.json.JsonSlurper import org.mapstruct.factory.Mappers import org.onap.cps.TestUtils -import org.onap.cps.events.EventsPublisher +import org.onap.cps.events.EventsProducer import org.onap.cps.ncmp.impl.NetworkCmProxyInventoryFacadeImpl import org.onap.cps.ncmp.api.inventory.models.CompositeState import org.onap.cps.ncmp.api.inventory.models.NcmpServiceCmHandle @@ -134,7 +134,7 @@ class NetworkCmProxyControllerSpec extends Specification { } def cleanup() { - ((Logger) LoggerFactory.getLogger(EventsPublisher.class)).detachAndStopAllAppenders() + ((Logger) LoggerFactory.getLogger(EventsProducer.class)).detachAndStopAllAppenders() } def 'Get Resource Data from pass-through operational.'() { diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/config/KafkaConfig.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/config/KafkaConfig.java index 3d3c3db482..8475be6f6a 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/config/KafkaConfig.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/config/KafkaConfig.java @@ -1,6 +1,6 @@ /* * ============LICENSE_START======================================================= - * Copyright (C) 2023-2024 Nordix Foundation + * Copyright (C) 2023-2025 OpenInfra Foundation Europe. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -47,7 +47,7 @@ import org.springframework.kafka.support.serializer.JsonSerializer; /** * kafka Configuration for legacy and cloud events. * - * @param valid legacy event to be published over the wire. + * @param valid legacy event to be sent over the wire. */ @Configuration @EnableKafka diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/cmavc/CmAvcEventConsumer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/cmavc/CmAvcEventConsumer.java index 2d1f64802b..eca8380756 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/cmavc/CmAvcEventConsumer.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/cmavc/CmAvcEventConsumer.java @@ -1,6 +1,6 @@ /* * ============LICENSE_START======================================================= - * Copyright (c) 2023-2024 Nordix Foundation. + * Copyright (c) 2023-2025 OpenInfra Foundation Europe. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -24,7 +24,7 @@ import io.cloudevents.CloudEvent; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.onap.cps.events.EventsPublisher; +import org.onap.cps.events.EventsProducer; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.kafka.annotation.KafkaListener; @@ -43,7 +43,7 @@ public class CmAvcEventConsumer { @Value("${app.ncmp.avc.cm-events-topic}") private String cmEventsTopicName; - private final EventsPublisher eventsPublisher; + private final EventsProducer eventsProducer; /** * Incoming Cm AvcEvent in the form of Consumer Record, it will be forwarded as is to a target topic. @@ -58,6 +58,6 @@ public class CmAvcEventConsumer { final CloudEvent outgoingAvcEvent = cmAvcEventAsConsumerRecord.value(); final String outgoingAvcEventKey = cmAvcEventAsConsumerRecord.key(); log.debug("Consuming AVC event with key : {} and value : {}", outgoingAvcEventKey, outgoingAvcEvent); - eventsPublisher.publishCloudEvent(cmEventsTopicName, outgoingAvcEventKey, outgoingAvcEvent); + eventsProducer.sendCloudEvent(cmEventsTopicName, outgoingAvcEventKey, outgoingAvcEvent); } } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/dmi/DmiInEventProducer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/dmi/DmiInEventProducer.java index 232803a941..baa9926a40 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/dmi/DmiInEventProducer.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/dmi/DmiInEventProducer.java @@ -1,6 +1,6 @@ /* * ============LICENSE_START======================================================= - * Copyright (C) 2024-2025 Nordix Foundation + * Copyright (C) 2024-2025 OpenInfra Foundation Europe. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -25,7 +25,7 @@ import static org.onap.cps.ncmp.events.NcmpEventDataSchema.SUBSCRIPTIONS_V1; import io.cloudevents.CloudEvent; import java.util.Map; import lombok.RequiredArgsConstructor; -import org.onap.cps.events.EventsPublisher; +import org.onap.cps.events.EventsProducer; import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.ncmp_to_dmi.DmiInEvent; import org.onap.cps.ncmp.utils.events.NcmpEvent; import org.springframework.beans.factory.annotation.Value; @@ -37,22 +37,22 @@ import org.springframework.stereotype.Component; @ConditionalOnProperty(name = "notification.enabled", havingValue = "true", matchIfMissing = true) public class DmiInEventProducer { - private final EventsPublisher eventsPublisher; + private final EventsProducer eventsProducer; @Value("${app.ncmp.avc.cm-subscription-dmi-in}") private String dmiInEventTopic; /** - * Publish the event to the provided dmi plugin with key as subscription id and the event is in Cloud Event format. + * Send the event to the provided dmi plugin with key as subscription id and the event is in Cloud Event format. * * @param subscriptionId Cm Subscription Id * @param dmiPluginName Dmi Plugin Name * @param eventType Type of event * @param dmiInEvent Cm Notification Subscription event for Dmi */ - public void publishDmiInEvent(final String subscriptionId, final String dmiPluginName, - final String eventType, final DmiInEvent dmiInEvent) { - eventsPublisher.publishCloudEvent(dmiInEventTopic, subscriptionId, + public void sendDmiInEvent(final String subscriptionId, final String dmiPluginName, + final String eventType, final DmiInEvent dmiInEvent) { + eventsProducer.sendCloudEvent(dmiInEventTopic, subscriptionId, buildAndGetDmiInEventAsCloudEvent(subscriptionId, dmiPluginName, eventType, dmiInEvent)); } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/dmi/DmiOutEventConsumer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/dmi/DmiOutEventConsumer.java index 98c66afe30..d5e7106795 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/dmi/DmiOutEventConsumer.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/dmi/DmiOutEventConsumer.java @@ -1,6 +1,6 @@ /* * ============LICENSE_START======================================================= - * Copyright (C) 2024 Nordix Foundation + * Copyright (C) 2024-2025 OpenInfra Foundation Europe. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -105,7 +105,7 @@ public class DmiOutEventConsumer { private void handleEventsStatusPerDmi(final String subscriptionId, final String eventType) { final Map dmiSubscriptionsPerDmi = dmiCacheHandler.get(subscriptionId); final NcmpOutEvent ncmpOutEvent = ncmpOutEventMapper.toNcmpOutEvent(subscriptionId, dmiSubscriptionsPerDmi); - ncmpOutEventProducer.publishNcmpOutEvent(subscriptionId, eventType, ncmpOutEvent, false); + ncmpOutEventProducer.sendNcmpOutEvent(subscriptionId, eventType, ncmpOutEvent, false); } private boolean checkStatusCodeAndMessage(final NcmpResponseStatus ncmpResponseStatus, diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/CmSubscriptionHandlerImpl.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/CmSubscriptionHandlerImpl.java index 1b368dde6c..f6ac0cf699 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/CmSubscriptionHandlerImpl.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/CmSubscriptionHandlerImpl.java @@ -1,6 +1,6 @@ /* * ============LICENSE_START======================================================= - * Copyright (C) 2024 Nordix Foundation + * Copyright (C) 2024-2025 OpenInfra Foundation Europe. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -75,7 +75,7 @@ public class CmSubscriptionHandlerImpl implements CmSubscriptionHandler { handleNewCmSubscription(subscriptionId); scheduleNcmpOutEventResponse(subscriptionId, "subscriptionCreateResponse"); } else { - rejectAndPublishCreateRequest(subscriptionId, predicates); + rejectAndSendCreateRequest(subscriptionId, predicates); } } @@ -87,7 +87,7 @@ public class CmSubscriptionHandlerImpl implements CmSubscriptionHandler { getLastRemainingAndOverlappingSubscriptionsPerDmi(subscriptionDataNodes); dmiCacheHandler.add(subscriptionId, mergeDmiCmSubscriptionDetailsPerDmiMaps(dmiCmSubscriptionTuple)); if (dmiCmSubscriptionTuple.lastRemainingSubscriptionsPerDmi().isEmpty()) { - acceptAndPublishDeleteRequest(subscriptionId); + acceptAndSendDeleteRequest(subscriptionId); } else { sendSubscriptionDeleteRequestToDmi(subscriptionId, dmiCmSubscriptionDetailsPerDmiMapper.toDmiCmSubscriptionsPerDmi( @@ -122,19 +122,19 @@ public class CmSubscriptionHandlerImpl implements CmSubscriptionHandler { } private void scheduleNcmpOutEventResponse(final String subscriptionId, final String eventType) { - ncmpOutEventProducer.publishNcmpOutEvent(subscriptionId, eventType, null, true); + ncmpOutEventProducer.sendNcmpOutEvent(subscriptionId, eventType, null, true); } - private void rejectAndPublishCreateRequest(final String subscriptionId, final List predicates) { + private void rejectAndSendCreateRequest(final String subscriptionId, final List predicates) { final Set subscriptionTargetFilters = predicates.stream().flatMap(predicate -> predicate.getTargetFilter().stream()) .collect(Collectors.toSet()); final NcmpOutEvent ncmpOutEvent = ncmpOutEventMapper.toNcmpOutEventForRejectedRequest(subscriptionId, new ArrayList<>(subscriptionTargetFilters)); - ncmpOutEventProducer.publishNcmpOutEvent(subscriptionId, "subscriptionCreateResponse", ncmpOutEvent, false); + ncmpOutEventProducer.sendNcmpOutEvent(subscriptionId, "subscriptionCreateResponse", ncmpOutEvent, false); } - private void acceptAndPublishDeleteRequest(final String subscriptionId) { + private void acceptAndSendDeleteRequest(final String subscriptionId) { final Set dmiServiceNames = dmiCacheHandler.get(subscriptionId).keySet(); for (final String dmiServiceName : dmiServiceNames) { dmiCacheHandler.updateDmiSubscriptionStatus(subscriptionId, dmiServiceName, @@ -143,7 +143,7 @@ public class CmSubscriptionHandlerImpl implements CmSubscriptionHandler { } final NcmpOutEvent ncmpOutEvent = ncmpOutEventMapper.toNcmpOutEvent(subscriptionId, dmiCacheHandler.get(subscriptionId)); - ncmpOutEventProducer.publishNcmpOutEvent(subscriptionId, "subscriptionDeleteResponse", ncmpOutEvent, + ncmpOutEventProducer.sendNcmpOutEvent(subscriptionId, "subscriptionDeleteResponse", ncmpOutEvent, false); } @@ -158,15 +158,15 @@ public class CmSubscriptionHandlerImpl implements CmSubscriptionHandler { if (dmiCmSubscriptionPredicates.isEmpty()) { acceptAndPersistCmSubscriptionPerDmi(subscriptionId, dmiPluginName); } else { - publishDmiInEventPerDmi(subscriptionId, dmiPluginName, dmiCmSubscriptionPredicates); + sendDmiInEventPerDmi(subscriptionId, dmiPluginName, dmiCmSubscriptionPredicates); } }); } - private void publishDmiInEventPerDmi(final String subscriptionId, final String dmiPluginName, - final List dmiCmSubscriptionPredicates) { + private void sendDmiInEventPerDmi(final String subscriptionId, final String dmiPluginName, + final List dmiCmSubscriptionPredicates) { final DmiInEvent dmiInEvent = dmiInEventMapper.toDmiInEvent(dmiCmSubscriptionPredicates); - dmiInEventProducer.publishDmiInEvent(subscriptionId, dmiPluginName, + dmiInEventProducer.sendDmiInEvent(subscriptionId, dmiPluginName, "subscriptionCreateRequest", dmiInEvent); } @@ -183,7 +183,7 @@ public class CmSubscriptionHandlerImpl implements CmSubscriptionHandler { final DmiInEvent dmiInEvent = dmiInEventMapper.toDmiInEvent( dmiCmSubscriptionDetails.getDmiCmSubscriptionPredicates()); - dmiInEventProducer.publishDmiInEvent(subscriptionId, + dmiInEventProducer.sendDmiInEvent(subscriptionId, dmiPluginName, "subscriptionDeleteRequest", dmiInEvent); }); } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/NcmpOutEventProducer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/NcmpOutEventProducer.java index 8cfb3ad563..639fb65296 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/NcmpOutEventProducer.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/NcmpOutEventProducer.java @@ -1,6 +1,6 @@ /* * ============LICENSE_START======================================================= - * Copyright (C) 2024-2025 Nordix Foundation + * Copyright (C) 2024-2025 OpenInfra Foundation Europe. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -31,7 +31,7 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.onap.cps.events.EventsPublisher; +import org.onap.cps.events.EventsProducer; import org.onap.cps.ncmp.impl.cmnotificationsubscription.cache.DmiCacheHandler; import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.ncmp_to_client.NcmpOutEvent; import org.onap.cps.ncmp.utils.events.NcmpEvent; @@ -51,7 +51,7 @@ public class NcmpOutEventProducer { @Value("${ncmp.timers.subscription-forwarding.dmi-response-timeout-ms}") private Integer dmiOutEventTimeoutInMs; - private final EventsPublisher eventsPublisher; + private final EventsProducer eventsProducer; private final NcmpOutEventMapper ncmpOutEventMapper; private final DmiCacheHandler dmiCacheHandler; private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); @@ -59,7 +59,7 @@ public class NcmpOutEventProducer { new ConcurrentHashMap<>(); /** - * Publish the event to the client who requested the subscription with key as subscription id and event is Cloud + * Send the event to the client who requested the subscription with key as subscription id and event is Cloud * Event compliant. * * @param subscriptionId Cm Subscription Id @@ -67,23 +67,23 @@ public class NcmpOutEventProducer { * @param ncmpOutEvent Cm Notification Subscription Event for the * client * @param isScheduledEvent Determines if the event is to be scheduled - * or published now + * or send now */ - public void publishNcmpOutEvent(final String subscriptionId, final String eventType, - final NcmpOutEvent ncmpOutEvent, final boolean isScheduledEvent) { + public void sendNcmpOutEvent(final String subscriptionId, final String eventType, + final NcmpOutEvent ncmpOutEvent, final boolean isScheduledEvent) { final String taskKey = subscriptionId.concat(eventType); if (isScheduledEvent && !scheduledTasksPerSubscriptionIdAndEventType.containsKey(taskKey)) { - final ScheduledFuture scheduledFuture = scheduleAndPublishNcmpOutEvent(subscriptionId, eventType); + final ScheduledFuture scheduledFuture = scheduleAndSendNcmpOutEvent(subscriptionId, eventType); scheduledTasksPerSubscriptionIdAndEventType.putIfAbsent(taskKey, scheduledFuture); log.debug("Scheduled the Cm Subscription Event for subscriptionId : {} and eventType : {}", subscriptionId, eventType); } else { cancelScheduledTask(taskKey); if (ncmpOutEvent != null) { - publishNcmpOutEventNow(subscriptionId, eventType, ncmpOutEvent); - log.debug("Published Cm Subscription Event on demand for subscriptionId : {} and eventType : {}", + sendNcmpOutEventNow(subscriptionId, eventType, ncmpOutEvent); + log.debug("Sent Cm Subscription Event on demand for subscriptionId : {} and eventType : {}", subscriptionId, eventType); } } @@ -109,9 +109,9 @@ public class NcmpOutEventProducer { .asCloudEvent(); } - private ScheduledFuture scheduleAndPublishNcmpOutEvent(final String subscriptionId, final String eventType) { + private ScheduledFuture scheduleAndSendNcmpOutEvent(final String subscriptionId, final String eventType) { final NcmpOutEventPublishingTask ncmpOutEventPublishingTask = - new NcmpOutEventPublishingTask(ncmpOutEventTopic, subscriptionId, eventType, eventsPublisher, + new NcmpOutEventPublishingTask(ncmpOutEventTopic, subscriptionId, eventType, eventsProducer, ncmpOutEventMapper, dmiCacheHandler); return scheduledExecutorService.schedule(ncmpOutEventPublishingTask, dmiOutEventTimeoutInMs, TimeUnit.MILLISECONDS); @@ -125,11 +125,11 @@ public class NcmpOutEventProducer { } } - private void publishNcmpOutEventNow(final String subscriptionId, final String eventType, - final NcmpOutEvent ncmpOutEvent) { + private void sendNcmpOutEventNow(final String subscriptionId, final String eventType, + final NcmpOutEvent ncmpOutEvent) { final CloudEvent ncmpOutEventAsCloudEvent = buildAndGetNcmpOutEventAsCloudEvent(subscriptionId, eventType, ncmpOutEvent); - eventsPublisher.publishCloudEvent(ncmpOutEventTopic, subscriptionId, ncmpOutEventAsCloudEvent); + eventsProducer.sendCloudEvent(ncmpOutEventTopic, subscriptionId, ncmpOutEventAsCloudEvent); dmiCacheHandler.removeAcceptedAndRejectedDmiSubscriptionEntries(subscriptionId); } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/NcmpOutEventPublishingTask.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/NcmpOutEventPublishingTask.java index e9d6d78429..80d7981db9 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/NcmpOutEventPublishingTask.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/NcmpOutEventPublishingTask.java @@ -1,6 +1,6 @@ /* * ============LICENSE_START======================================================= - * Copyright (C) 2024-2025 Nordix Foundation + * Copyright (C) 2024-2025 OpenInfra Foundation Europe. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -26,7 +26,7 @@ import io.cloudevents.CloudEvent; import java.util.Map; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.onap.cps.events.EventsPublisher; +import org.onap.cps.events.EventsProducer; import org.onap.cps.ncmp.impl.cmnotificationsubscription.cache.DmiCacheHandler; import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.DmiCmSubscriptionDetails; import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.ncmp_to_client.NcmpOutEvent; @@ -38,12 +38,12 @@ public class NcmpOutEventPublishingTask implements Runnable { private final String topicName; private final String subscriptionId; private final String eventType; - private final EventsPublisher eventsPublisher; + private final EventsProducer eventsProducer; private final NcmpOutEventMapper ncmpOutEventMapper; private final DmiCacheHandler dmiCacheHandler; /** - * Delegating the responsibility of publishing NcmpOutEvent as a separate task which will + * Delegating the responsibility of sending NcmpOutEvent as a separate task which will * be called after a specified delay. */ @Override @@ -52,7 +52,7 @@ public class NcmpOutEventPublishingTask implements Runnable { dmiCacheHandler.get(subscriptionId); final NcmpOutEvent ncmpOutEvent = ncmpOutEventMapper.toNcmpOutEvent(subscriptionId, dmiSubscriptionsPerDmi); - eventsPublisher.publishCloudEvent(topicName, subscriptionId, + eventsProducer.sendCloudEvent(topicName, subscriptionId, buildAndGetNcmpOutEventAsCloudEvent(subscriptionId, eventType, ncmpOutEvent)); dmiCacheHandler.removeAcceptedAndRejectedDmiSubscriptionEntries(subscriptionId); } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/data/DmiDataOperations.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/data/DmiDataOperations.java index 2a0d2f563c..0e9db3d17b 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/data/DmiDataOperations.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/data/DmiDataOperations.java @@ -1,6 +1,6 @@ /* * ============LICENSE_START======================================================= - * Copyright (C) 2021-2025 Nordix Foundation + * Copyright (C) 2021-2025 OpenInfra Foundation Europe. All rights reserved. * Modifications Copyright (C) 2022 Bell Canada * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); @@ -309,7 +309,7 @@ public class DmiDataOperations { cmHandleIdsPerResponseCodesPerOperation.add(dmiDataOperationRequestBody, Map.of(dmiClientRequestException.getNcmpResponseStatus(), cmHandleIds)); }); - DmiDataOperationsHelper.publishErrorMessageToClientTopic(topicName, requestId, + DmiDataOperationsHelper.sendErrorMessageToClientTopic(topicName, requestId, cmHandleIdsPerResponseCodesPerOperation); } } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/data/async/DataOperationEventConsumer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/data/async/DataOperationEventConsumer.java index 6f368da2d0..22f20c8784 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/data/async/DataOperationEventConsumer.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/data/async/DataOperationEventConsumer.java @@ -1,6 +1,6 @@ /* * ============LICENSE_START======================================================= - * Copyright (C) 2023-2024 Nordix Foundation + * Copyright (C) 2023-2025 OpenInfra Foundation Europe. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -25,7 +25,7 @@ import io.cloudevents.kafka.impl.KafkaHeaders; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.onap.cps.events.EventsPublisher; +import org.onap.cps.events.EventsProducer; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; @@ -39,11 +39,11 @@ import org.springframework.stereotype.Component; @ConditionalOnProperty(name = "notification.enabled", havingValue = "true", matchIfMissing = true) public class DataOperationEventConsumer { - private final EventsPublisher eventsPublisher; + private final EventsProducer eventsProducer; /** - * Consume the DataOperation cloud event published by producer to topic 'async-m2m.topic' - * and publish the same to client specified topic. + * Consume the DataOperation cloud event sent by producer to topic 'async-m2m.topic' + * and send the same to client specified topic. * * @param dataOperationEventConsumerRecord consuming event as a ConsumerRecord. */ @@ -52,12 +52,12 @@ public class DataOperationEventConsumer { filter = "includeDataOperationEventsOnly", groupId = "ncmp-data-operation-event-group", containerFactory = "cloudEventConcurrentKafkaListenerContainerFactory") - public void consumeAndPublish(final ConsumerRecord dataOperationEventConsumerRecord) { + public void consumeAndSend(final ConsumerRecord dataOperationEventConsumerRecord) { log.debug("Consuming event payload {} ...", dataOperationEventConsumerRecord.value()); final String eventTarget = KafkaHeaders.getParsedKafkaHeader( dataOperationEventConsumerRecord.headers(), "ce_destination"); final String correlationId = KafkaHeaders.getParsedKafkaHeader( dataOperationEventConsumerRecord.headers(), "ce_correlationid"); - eventsPublisher.publishCloudEvent(eventTarget, correlationId, dataOperationEventConsumerRecord.value()); + eventsProducer.sendCloudEvent(eventTarget, correlationId, dataOperationEventConsumerRecord.value()); } } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/data/async/DmiAsyncRequestResponseEventConsumer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/data/async/DmiAsyncRequestResponseEventConsumer.java index e2803e89a1..7caa280745 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/data/async/DmiAsyncRequestResponseEventConsumer.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/data/async/DmiAsyncRequestResponseEventConsumer.java @@ -22,7 +22,7 @@ package org.onap.cps.ncmp.impl.data.async; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.onap.cps.events.EventsPublisher; +import org.onap.cps.events.EventsProducer; import org.onap.cps.ncmp.event.model.DmiAsyncRequestResponseEvent; import org.onap.cps.ncmp.event.model.NcmpAsyncRequestResponseEvent; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; @@ -38,7 +38,7 @@ import org.springframework.stereotype.Component; @ConditionalOnProperty(name = "notification.enabled", havingValue = "true", matchIfMissing = true) public class DmiAsyncRequestResponseEventConsumer { - private final EventsPublisher eventsPublisher; + private final EventsProducer eventsProducer; private final NcmpAsyncRequestResponseEventMapper ncmpAsyncRequestResponseEventMapper; /** @@ -55,7 +55,7 @@ public class DmiAsyncRequestResponseEventConsumer { log.debug("Consuming event {} ...", dmiAsyncRequestResponseEvent); final NcmpAsyncRequestResponseEvent ncmpAsyncRequestResponseEvent = ncmpAsyncRequestResponseEventMapper.toNcmpAsyncEvent(dmiAsyncRequestResponseEvent); - eventsPublisher.publishEvent(ncmpAsyncRequestResponseEvent.getEventTarget(), + eventsProducer.sendEvent(ncmpAsyncRequestResponseEvent.getEventTarget(), ncmpAsyncRequestResponseEvent.getEventId(), ncmpAsyncRequestResponseEvent); } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/data/utils/DmiDataOperationsHelper.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/data/utils/DmiDataOperationsHelper.java index cb435f4a84..ee3f6fec70 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/data/utils/DmiDataOperationsHelper.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/data/utils/DmiDataOperationsHelper.java @@ -1,6 +1,6 @@ /* * ============LICENSE_START======================================================= - * Copyright (C) 2023-2024 Nordix Foundation + * Copyright (C) 2023-2025 OpenInfra Foundation Europe. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -34,7 +34,7 @@ import java.util.Set; import lombok.AccessLevel; import lombok.NoArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.onap.cps.events.EventsPublisher; +import org.onap.cps.events.EventsProducer; import org.onap.cps.ncmp.api.NcmpResponseStatus; import org.onap.cps.ncmp.api.data.models.DataOperationDefinition; import org.onap.cps.ncmp.api.data.models.DataOperationRequest; @@ -114,7 +114,7 @@ public class DmiDataOperationsHelper { DmiDataOperation.buildDmiDataOperationRequestBodyWithoutCmHandles(dataOperationDefinitionIn), CM_HANDLES_NOT_READY, nonReadyCmHandleReferences); } - publishErrorMessageToClientTopic(topicParamInQuery, requestId, cmHandleReferencesPerResponseCodesPerOperation); + sendErrorMessageToClientTopic(topicParamInQuery, requestId, cmHandleReferencesPerResponseCodesPerOperation); return dmiDataOperationsOutPerDmiServiceName; } @@ -127,24 +127,24 @@ public class DmiDataOperationsHelper { } /** - * Creates data operation cloud event and publish it to client topic. + * Creates data operation cloud event and sends it to client topic. * * @param clientTopic client given topic * @param requestId unique identifier per request * @param cmHandleIdsPerResponseCodesPerOperation list of cm handle ids per operation with response code */ - public static void publishErrorMessageToClientTopic(final String clientTopic, - final String requestId, - final MultiValueMap>> cmHandleIdsPerResponseCodesPerOperation) { if (!cmHandleIdsPerResponseCodesPerOperation.isEmpty()) { final CloudEvent dataOperationCloudEvent = DataOperationEventCreator.createDataOperationEvent(clientTopic, requestId, cmHandleIdsPerResponseCodesPerOperation); - final EventsPublisher eventsPublisher = CpsApplicationContext.getCpsBean(EventsPublisher.class); - log.warn("publishing error message to client topic: {} ,requestId: {}, data operation cloud event id: {}", + final EventsProducer eventsProducer = CpsApplicationContext.getCpsBean(EventsProducer.class); + log.warn("sending error message to client topic: {} ,requestId: {}, data operation cloud event id: {}", clientTopic, requestId, dataOperationCloudEvent.getId()); - eventsPublisher.publishCloudEvent(clientTopic, requestId, dataOperationCloudEvent); + eventsProducer.sendCloudEvent(clientTopic, requestId, dataOperationCloudEvent); } } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/lcm/LcmEventsProducer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/lcm/LcmEventsProducer.java index d62688de1d..2ed407fbfc 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/lcm/LcmEventsProducer.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/lcm/LcmEventsProducer.java @@ -28,7 +28,7 @@ import java.util.List; import java.util.Map; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.onap.cps.events.EventsPublisher; +import org.onap.cps.events.EventsProducer; import org.onap.cps.ncmp.events.lcm.v1.LcmEvent; import org.onap.cps.ncmp.events.lcm.v1.LcmEventHeader; import org.onap.cps.ncmp.events.lcm.v1.Values; @@ -38,7 +38,7 @@ import org.springframework.kafka.KafkaException; import org.springframework.stereotype.Service; /** - * LcmEventsProducer to call the publisher and publish on the dedicated topic. + * LcmEventsProducer to call the producer and send on the dedicated topic. */ @Slf4j @@ -49,7 +49,7 @@ public class LcmEventsProducer { private static final Tag TAG_METHOD = Tag.of("method", "publishLcmEvent"); private static final Tag TAG_CLASS = Tag.of("class", LcmEventsProducer.class.getName()); private static final String UNAVAILABLE_CM_HANDLE_STATE = "N/A"; - private final EventsPublisher eventsPublisher; + private final EventsProducer eventsProducer; private final JsonObjectMapper jsonObjectMapper; private final MeterRegistry meterRegistry; @@ -74,7 +74,7 @@ public class LcmEventsProducer { try { final Map lcmEventHeadersMap = jsonObjectMapper.convertToValueType(lcmEventHeader, Map.class); - eventsPublisher.publishEvent(topicName, cmHandleId, lcmEventHeadersMap, lcmEvent); + eventsProducer.sendEvent(topicName, cmHandleId, lcmEventHeadersMap, lcmEvent); } catch (final KafkaException e) { log.error("Unable to publish message to topic : {} and cause : {}", topicName, e.getMessage()); } finally { diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/trustlevel/TrustLevelManager.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/trustlevel/TrustLevelManager.java index 27ad535344..944b5eb9a0 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/trustlevel/TrustLevelManager.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/trustlevel/TrustLevelManager.java @@ -68,7 +68,7 @@ public class TrustLevelManager { } /** - * Add cmHandles to the cache and publish notification for initial trust level of cmHandles if it is NONE. + * Add cmHandles to the cache and send notification for initial trust level of cmHandles if it is NONE. * * @param cmHandlesToBeCreated a list of cmHandles being created */ @@ -82,7 +82,7 @@ public class TrustLevelManager { } trustLevelPerCmHandleIdForCache.put(cmHandleId, initialTrustLevel); if (TrustLevel.NONE.equals(initialTrustLevel)) { - inventoryEventProducer.publishAvcEvent(cmHandleId, + inventoryEventProducer.sendAvcEvent(cmHandleId, AVC_CHANGED_ATTRIBUTE_NAME, AVC_NO_OLD_VALUE, initialTrustLevel.name()); @@ -92,7 +92,7 @@ public class TrustLevelManager { } /** - * Updates trust level of dmi plugin in the cache and publish notification for trust level of cmHandles if it + * Updates trust level of dmi plugin in the cache and sends notification for trust level of cmHandles if it * has changed. * * @param dmiServiceName dmi service name @@ -113,7 +113,7 @@ public class TrustLevelManager { } /** - * Updates trust level of device in the cache and publish notification for trust level of device if it has + * Updates trust level of device in the cache and send notification for trust level of device if it has * changed. * * @param cmHandleId cm handle id @@ -197,7 +197,7 @@ public class TrustLevelManager { } else { log.info("The trust level for Cm Handle: {} is now: {} ", notificationCandidateCmHandleId, newEffectiveTrustLevel); - inventoryEventProducer.publishAvcEvent(notificationCandidateCmHandleId, + inventoryEventProducer.sendAvcEvent(notificationCandidateCmHandleId, AVC_CHANGED_ATTRIBUTE_NAME, oldEffectiveTrustLevel.name(), newEffectiveTrustLevel.name()); diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/utils/events/InventoryEventProducer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/utils/events/InventoryEventProducer.java index f388ee1b20..8f83e28a7c 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/utils/events/InventoryEventProducer.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/utils/events/InventoryEventProducer.java @@ -27,7 +27,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; import lombok.RequiredArgsConstructor; -import org.onap.cps.events.EventsPublisher; +import org.onap.cps.events.EventsProducer; import org.onap.cps.ncmp.events.avc.ncmp_to_client.Avc; import org.onap.cps.ncmp.events.avc.ncmp_to_client.AvcEvent; import org.onap.cps.ncmp.events.avc.ncmp_to_client.Data; @@ -38,18 +38,18 @@ import org.springframework.stereotype.Service; @RequiredArgsConstructor public class InventoryEventProducer { - private final EventsPublisher eventsPublisher; + private final EventsProducer eventsProducer; @Value("${app.ncmp.avc.inventory-events-topic}") private String ncmpInventoryEventsTopicName; /** - * Publish attribute value change event. + * Send attribute value change event. * * @param eventKey id of the cmHandle being registered */ - public void publishAvcEvent(final String eventKey, final String attributeName, - final String oldAttributeValue, final String newAttributeValue) { + public void sendAvcEvent(final String eventKey, final String attributeName, + final String oldAttributeValue, final String newAttributeValue) { final AvcEvent avcEvent = buildAvcEvent(attributeName, oldAttributeValue, newAttributeValue); final Map extensions = createAvcEventExtensions(eventKey); @@ -61,7 +61,7 @@ public class InventoryEventProducer { .build() .asCloudEvent(); - eventsPublisher.publishCloudEvent(ncmpInventoryEventsTopicName, eventKey, avcCloudEvent); + eventsProducer.sendCloudEvent(ncmpInventoryEventsTopicName, eventKey, avcCloudEvent); } private AvcEvent buildAvcEvent(final String attributeName, diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/cmavc/CmAvcEventConsumerSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/cmavc/CmAvcEventConsumerSpec.groovy index ad5f42ed94..b0a8f20ccb 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/cmavc/CmAvcEventConsumerSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/cmavc/CmAvcEventConsumerSpec.groovy @@ -1,6 +1,6 @@ /* * ============LICENSE_START======================================================= - * Copyright (c) 2023-2024 Nordix Foundation. + * Copyright (c) 2023-2025 OpenInfra Foundation Europe. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -27,7 +27,7 @@ import io.cloudevents.kafka.CloudEventDeserializer import io.cloudevents.kafka.impl.KafkaHeaders import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.clients.consumer.KafkaConsumer -import org.onap.cps.events.EventsPublisher +import org.onap.cps.events.EventsProducer import org.onap.cps.ncmp.events.avc1_0_0.AvcEvent import org.onap.cps.ncmp.utils.TestUtils import org.onap.cps.ncmp.utils.events.MessagingBaseSpec @@ -41,16 +41,16 @@ import java.time.Duration import static org.onap.cps.ncmp.utils.events.CloudEventMapper.toTargetEvent -@SpringBootTest(classes = [EventsPublisher, CmAvcEventConsumer, ObjectMapper, JsonObjectMapper]) +@SpringBootTest(classes = [EventsProducer, CmAvcEventConsumer, ObjectMapper, JsonObjectMapper]) @Testcontainers @DirtiesContext class CmAvcEventConsumerSpec extends MessagingBaseSpec { @SpringBean - EventsPublisher eventsPublisher = new EventsPublisher(legacyEventKafkaTemplate, cloudEventKafkaTemplate) + EventsProducer eventsProducer = new EventsProducer(legacyEventKafkaTemplate, cloudEventKafkaTemplate) @SpringBean - CmAvcEventConsumer acvEventConsumer = new CmAvcEventConsumer(eventsPublisher) + CmAvcEventConsumer acvEventConsumer = new CmAvcEventConsumer(eventsProducer) @Autowired JsonObjectMapper jsonObjectMapper diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/dmi/DmiInEventProducerSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/dmi/DmiInEventProducerSpec.groovy index 49e43f9067..5a101471f5 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/dmi/DmiInEventProducerSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/dmi/DmiInEventProducerSpec.groovy @@ -1,6 +1,6 @@ /* * ============LICENSE_START======================================================= - * Copyright (c) 2024-2025 Nordix Foundation. + * Copyright (c) 2024-2025 OpenInfra Foundation Europe. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -23,7 +23,7 @@ package org.onap.cps.ncmp.impl.cmnotificationsubscription.dmi import com.fasterxml.jackson.databind.ObjectMapper import io.cloudevents.CloudEvent import io.cloudevents.core.v1.CloudEventBuilder -import org.onap.cps.events.EventsPublisher +import org.onap.cps.events.EventsProducer import org.onap.cps.ncmp.config.CpsApplicationContext import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.ncmp_to_dmi.CmHandle import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.ncmp_to_dmi.Data @@ -38,11 +38,11 @@ import spock.lang.Specification @ContextConfiguration(classes = [CpsApplicationContext]) class DmiInEventProducerSpec extends Specification { - def mockEventsPublisher = Mock(EventsPublisher) + def mockEventsProducer = Mock(EventsProducer) - def objectUnderTest = new DmiInEventProducer(mockEventsPublisher) + def objectUnderTest = new DmiInEventProducer(mockEventsProducer) - def 'Create and Publish Cm Notification Subscription DMI In Event'() { + def 'Create and Send Cm Notification Subscription DMI In Event'() { given: 'a cm subscription for a dmi plugin' def subscriptionId = 'test-subscription-id' def dmiPluginName = 'test-dmiplugin' @@ -50,10 +50,10 @@ class DmiInEventProducerSpec extends Specification { def dmiInEvent = new DmiInEvent(data: new Data(cmHandles: [new CmHandle(cmhandleId: 'test-1', privateProperties: [:])])) and: 'also we have target topic for dmiPlugin' objectUnderTest.dmiInEventTopic = 'dmiplugin-test-topic' - when: 'the event is published' - objectUnderTest.publishDmiInEvent(subscriptionId, dmiPluginName, eventType, dmiInEvent) + when: 'the event is sent' + objectUnderTest.sendDmiInEvent(subscriptionId, dmiPluginName, eventType, dmiInEvent) then: 'the event contains the required attributes' - 1 * mockEventsPublisher.publishCloudEvent(_, _, _) >> { + 1 * mockEventsProducer.sendCloudEvent(_, _, _) >> { args -> { assert args[0] == 'dmiplugin-test-topic' diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/dmi/DmiOutEventConsumerSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/dmi/DmiOutEventConsumerSpec.groovy index bcf8780873..2ab15d231c 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/dmi/DmiOutEventConsumerSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/dmi/DmiOutEventConsumerSpec.groovy @@ -1,6 +1,6 @@ /* * ============LICENSE_START======================================================= - * Copyright (c) 2024 Nordix Foundation. + * Copyright (c) 2024-2025 OpenInfra Foundation Europe. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -108,8 +108,8 @@ class DmiOutEventConsumerSpec extends MessagingBaseSpec { expectedPersistenceCalls * mockDmiCacheHandler.persistIntoDatabasePerDmi('sub-1','test-dmi-plugin-name') and: 'correct number of calls to map the ncmp out event' 1 * mockNcmpOutEventMapper.toNcmpOutEvent('sub-1', _) - and: 'correct number of calls to publish the ncmp out event to client' - 1 * mockNcmpOutEventProducer.publishNcmpOutEvent('sub-1', 'subscriptionCreateResponse', _, false) + and: 'correct number of calls to send the ncmp out event to client' + 1 * mockNcmpOutEventProducer.sendNcmpOutEvent('sub-1', 'subscriptionCreateResponse', _, false) where: 'the following parameters are used' scenario | subscriptionStatus | statusCode || expectedCacheCalls | expectedPersistenceCalls 'Accepted Status' | ACCEPTED | '1' || 1 | 1 diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/CmSubscriptionHandlerImplSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/CmSubscriptionHandlerImplSpec.groovy index 1a54deea6a..e4321ff718 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/CmSubscriptionHandlerImplSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/CmSubscriptionHandlerImplSpec.groovy @@ -1,6 +1,6 @@ /* * ============LICENSE_START======================================================= - * Copyright (c) 2024 Nordix Foundation. + * Copyright (c) 2024-2025 OpenInfra Foundation Europe. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -81,11 +81,11 @@ class CmSubscriptionHandlerImplSpec extends Specification { objectUnderTest.processSubscriptionCreateRequest(subscriptionId, predicates) then: 'the subscription cache handler is called once' 1 * mockDmiCacheHandler.add('test-id', _) - and: 'the events handler method to publish DMI event is called correct number of times with the correct parameters' - testDmiSubscriptionsPerDmi.size() * mockDmiInEventProducer.publishDmiInEvent( + and: 'the events handler method to send DMI event is called correct number of times with the correct parameters' + testDmiSubscriptionsPerDmi.size() * mockDmiInEventProducer.sendDmiInEvent( "test-id", "dmi-1", "subscriptionCreateRequest", testDmiInEvent) and: 'we schedule to send the response after configured time from the cache' - 1 * mockNcmpOutEventProducer.publishNcmpOutEvent('test-id', 'subscriptionCreateResponse', null, true) + 1 * mockNcmpOutEventProducer.sendNcmpOutEvent('test-id', 'subscriptionCreateResponse', null, true) } def 'Consume valid and Overlapping Cm Notification Subscription NcmpIn Event'() { @@ -105,7 +105,7 @@ class CmSubscriptionHandlerImplSpec extends Specification { and: 'the subscription details are updated in the cache' 1 * mockDmiCacheHandler.updateDmiSubscriptionStatus('test-id', _, ACCEPTED) and: 'we schedule to send the response after configured time from the cache' - 1 * mockNcmpOutEventProducer.publishNcmpOutEvent('test-id', 'subscriptionCreateResponse', null, true) + 1 * mockNcmpOutEventProducer.sendNcmpOutEvent('test-id', 'subscriptionCreateResponse', null, true) } def 'Consume valid and but non-unique CmNotificationSubscription create message'() { @@ -122,10 +122,10 @@ class CmSubscriptionHandlerImplSpec extends Specification { "test-id", _) >> testNcmpOutEvent when: 'the valid but non-unique event is consumed' objectUnderTest.processSubscriptionCreateRequest(subscriptionId, predicates) - then: 'the events handler method to publish DMI event is never called' - 0 * mockDmiInEventProducer.publishDmiInEvent(_, _, _, _) - and: 'the events handler method to publish NCMP out event is called once' - 1 * mockNcmpOutEventProducer.publishNcmpOutEvent('test-id', 'subscriptionCreateResponse', testNcmpOutEvent, false) + then: 'the events handler method to send DMI event is never called' + 0 * mockDmiInEventProducer.sendDmiInEvent(_, _, _, _) + and: 'the events handler method to send NCMP out event is called once' + 1 * mockNcmpOutEventProducer.sendNcmpOutEvent('test-id', 'subscriptionCreateResponse', testNcmpOutEvent, false) } def 'Consume valid CmNotificationSubscriptionNcmpInEvent delete message'() { @@ -140,11 +140,11 @@ class CmSubscriptionHandlerImplSpec extends Specification { 1 * mockInventoryPersistence.getYangModelCmHandle('ch-2') >> new YangModelCmHandle(dmiServiceName: 'dmi-2') when: 'the subscription delete request is processed' objectUnderTest.processSubscriptionDeleteRequest(subscriptionId) - then: 'the method to publish a dmi event is called with correct parameters' - 1 * mockDmiInEventProducer.publishDmiInEvent(subscriptionId,'dmi-1','subscriptionDeleteRequest',_) - 1 * mockDmiInEventProducer.publishDmiInEvent(subscriptionId,'dmi-2','subscriptionDeleteRequest',_) - and: 'the method to publish nmcp out event is called with correct parameters' - 1 * mockNcmpOutEventProducer.publishNcmpOutEvent(subscriptionId, 'subscriptionDeleteResponse', null, true) + then: 'the method to send a dmi event is called with correct parameters' + 1 * mockDmiInEventProducer.sendDmiInEvent(subscriptionId,'dmi-1','subscriptionDeleteRequest',_) + 1 * mockDmiInEventProducer.sendDmiInEvent(subscriptionId,'dmi-2','subscriptionDeleteRequest',_) + and: 'the method to send nmcp out event is called with correct parameters' + 1 * mockNcmpOutEventProducer.sendNcmpOutEvent(subscriptionId, 'subscriptionDeleteResponse', null, true) } def 'Delete a subscriber for fully overlapping subscriptions'() { @@ -161,12 +161,12 @@ class CmSubscriptionHandlerImplSpec extends Specification { 2 * mockDmiCacheHandler.get(subscriptionId) >> ['dmi-1':[:],'dmi-2':[:]] when: 'the subscription delete request is processed' objectUnderTest.processSubscriptionDeleteRequest(subscriptionId) - then: 'the method to publish a dmi event is never called' - 0 * mockDmiInEventProducer.publishDmiInEvent(_,_,_,_) + then: 'the method to send a dmi event is never called' + 0 * mockDmiInEventProducer.sendDmiInEvent(_,_,_,_) and: 'the cache handler is called to remove subscriber from database per dmi' 1 * mockDmiCacheHandler.removeFromDatabase('test-id', 'dmi-1') 1 * mockDmiCacheHandler.removeFromDatabase('test-id', 'dmi-2') - and: 'the method to publish nmcp out event is called with correct parameters' - 1 * mockNcmpOutEventProducer.publishNcmpOutEvent(subscriptionId, 'subscriptionDeleteResponse', null, false) + and: 'the method to send ncmp out event is called with correct parameters' + 1 * mockNcmpOutEventProducer.sendNcmpOutEvent(subscriptionId, 'subscriptionDeleteResponse', null, false) } } diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/NcmpOutEventProducerSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/NcmpOutEventProducerSpec.groovy index d8adde261c..09aebf3499 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/NcmpOutEventProducerSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/NcmpOutEventProducerSpec.groovy @@ -1,6 +1,6 @@ /* * ============LICENSE_START======================================================= - * Copyright (C) 2025 Nordix Foundation + * Copyright (C) 2025 OpenInfra Foundation Europe. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -23,7 +23,7 @@ package org.onap.cps.ncmp.impl.cmnotificationsubscription.ncmp import com.fasterxml.jackson.databind.ObjectMapper import io.cloudevents.CloudEvent import io.cloudevents.core.v1.CloudEventBuilder -import org.onap.cps.events.EventsPublisher +import org.onap.cps.events.EventsProducer import org.onap.cps.ncmp.config.CpsApplicationContext import org.onap.cps.ncmp.impl.cmnotificationsubscription.cache.DmiCacheHandler import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.ncmp_to_client.Data @@ -38,27 +38,27 @@ import spock.lang.Specification @ContextConfiguration(classes = [CpsApplicationContext]) class NcmpOutEventProducerSpec extends Specification { - def mockEventsPublisher = Mock(EventsPublisher) + def mockEventsProducer = Mock(EventsProducer) def mockNcmpOutEventMapper = Mock(NcmpOutEventMapper) def mockDmiCacheHandler = Mock(DmiCacheHandler) - def objectUnderTest = new NcmpOutEventProducer(mockEventsPublisher, mockNcmpOutEventMapper, mockDmiCacheHandler) + def objectUnderTest = new NcmpOutEventProducer(mockEventsProducer, mockNcmpOutEventMapper, mockDmiCacheHandler) def 'Create and #scenario Cm Notification Subscription NCMP out event'() { given: 'a cm subscription response for the client' def subscriptionId = 'test-subscription-id-2' def eventType = 'subscriptionCreateResponse' def ncmpOutEvent = new NcmpOutEvent(data: new Data(subscriptionId: 'test-subscription-id-2', acceptedTargets: ['ch-1', 'ch-2'])) - and: 'also we have target topic for publishing to client' + and: 'also we have target topic for sending to client' objectUnderTest.ncmpOutEventTopic = 'client-test-topic' and: 'a deadline to an event' objectUnderTest.dmiOutEventTimeoutInMs = 1000 - when: 'the event is published' - objectUnderTest.publishNcmpOutEvent(subscriptionId, eventType, ncmpOutEvent, eventPublishingTaskToBeScheduled) + when: 'the event is sent' + objectUnderTest.sendNcmpOutEvent(subscriptionId, eventType, ncmpOutEvent, eventPublishingTaskToBeScheduled) then: 'we conditionally wait for a while' Thread.sleep(delayInMs) then: 'the event contains the required attributes' - 1 * mockEventsPublisher.publishCloudEvent(_, _, _) >> { + 1 * mockEventsProducer.sendCloudEvent(_, _, _) >> { args -> { assert args[0] == 'client-test-topic' @@ -72,27 +72,27 @@ class NcmpOutEventProducerSpec extends Specification { } where: 'following scenarios are considered' scenario | delayInMs | eventPublishingTaskToBeScheduled - 'publish event now' | 0 | false - 'schedule and publish after the configured time ' | 1500 | true + 'send event now' | 0 | false + 'schedule and send after the configured time ' | 1500 | true } - def 'Schedule Cm Notification Subscription NCMP out event but later publish it on demand'() { + def 'Schedule Cm Notification Subscription NCMP out event but later send it on demand'() { given: 'a cm subscription response for the client' def subscriptionId = 'test-subscription-id-3' def eventType = 'subscriptionCreateResponse' def ncmpOutEvent = new NcmpOutEvent(data: new Data(subscriptionId: 'test-subscription-id-3', acceptedTargets: ['ch-2', 'ch-3'])) - and: 'also we have target topic for publishing to client' + and: 'also we have target topic for sending to client' objectUnderTest.ncmpOutEventTopic = 'client-test-topic' and: 'a deadline to an event' objectUnderTest.dmiOutEventTimeoutInMs = 1000 - when: 'the event is scheduled to be published' - objectUnderTest.publishNcmpOutEvent(subscriptionId, eventType, ncmpOutEvent, true) + when: 'the event is scheduled to be sent' + objectUnderTest.sendNcmpOutEvent(subscriptionId, eventType, ncmpOutEvent, true) then: 'we wait for 10ms and then we receive response from DMI' Thread.sleep(10) - and: 'we receive response from DMI so we publish the message on demand' - objectUnderTest.publishNcmpOutEvent(subscriptionId, eventType, ncmpOutEvent, false) + and: 'we receive response from DMI so we send the message on demand' + objectUnderTest.sendNcmpOutEvent(subscriptionId, eventType, ncmpOutEvent, false) then: 'the event contains the required attributes' - 1 * mockEventsPublisher.publishCloudEvent(_, _, _) >> { + 1 * mockEventsProducer.sendCloudEvent(_, _, _) >> { args -> { assert args[0] == 'client-test-topic' @@ -108,23 +108,23 @@ class NcmpOutEventProducerSpec extends Specification { 1 * mockDmiCacheHandler.removeAcceptedAndRejectedDmiSubscriptionEntries(subscriptionId) } - def 'No event published when NCMP out event is null'() { + def 'No event sent when NCMP out event is null'() { given: 'a cm subscription response for the client' def subscriptionId = 'test-subscription-id-3' def eventType = 'subscriptionCreateResponse' def ncmpOutEvent = null - and: 'also we have target topic for publishing to client' + and: 'also we have target topic for sending to client' objectUnderTest.ncmpOutEventTopic = 'client-test-topic' and: 'a deadline to an event' objectUnderTest.dmiOutEventTimeoutInMs = 1000 - when: 'the event is scheduled to be published' - objectUnderTest.publishNcmpOutEvent(subscriptionId, eventType, ncmpOutEvent, true) + when: 'the event is scheduled to be sent' + objectUnderTest.sendNcmpOutEvent(subscriptionId, eventType, ncmpOutEvent, true) then: 'we wait for 10ms and then we receive response from DMI' Thread.sleep(10) - and: 'we receive NO response from DMI so we publish the message on demand' - objectUnderTest.publishNcmpOutEvent(subscriptionId, eventType, ncmpOutEvent, false) - and: 'no event published' - 0 * mockEventsPublisher.publishCloudEvent(*_) + and: 'we receive NO response from DMI so we send the message on demand' + objectUnderTest.sendNcmpOutEvent(subscriptionId, eventType, ncmpOutEvent, false) + and: 'no event sent' + 0 * mockEventsProducer.sendCloudEvent(*_) } } diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/data/DmiDataOperationsSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/data/DmiDataOperationsSpec.groovy index 37a9097f33..93338c97ec 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/data/DmiDataOperationsSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/data/DmiDataOperationsSpec.groovy @@ -1,6 +1,6 @@ /* * ============LICENSE_START======================================================= - * Copyright (C) 2021-2025 Nordix Foundation + * Copyright (C) 2021-2025 OpenInfra Foundation Europe. All rights reserved. * Modifications Copyright (C) 2022 Bell Canada * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); @@ -22,7 +22,7 @@ package org.onap.cps.ncmp.impl.data import com.fasterxml.jackson.databind.ObjectMapper -import org.onap.cps.events.EventsPublisher +import org.onap.cps.events.EventsProducer import org.onap.cps.ncmp.api.data.models.CmResourceAddress import org.onap.cps.ncmp.api.data.models.DataOperationRequest import org.onap.cps.ncmp.api.exceptions.DmiClientRequestException @@ -57,7 +57,7 @@ import static org.onap.cps.ncmp.impl.models.RequiredDmiService.DATA import static org.onap.cps.ncmp.utils.events.CloudEventMapper.toTargetEvent @SpringBootTest -@ContextConfiguration(classes = [EventsPublisher, CpsApplicationContext, DmiProperties, DmiDataOperations, PolicyExecutor]) +@ContextConfiguration(classes = [EventsProducer, CpsApplicationContext, DmiProperties, DmiDataOperations, PolicyExecutor]) class DmiDataOperationsSpec extends DmiOperationsBaseSpec { def NO_TOPIC = null @@ -74,7 +74,7 @@ class DmiDataOperationsSpec extends DmiOperationsBaseSpec { DmiDataOperations objectUnderTest @SpringBean - EventsPublisher eventsPublisher = Stub() + EventsProducer eventsProducer = Stub() @SpringBean PolicyExecutor policyExecutor = Mock() @@ -130,9 +130,9 @@ class DmiDataOperationsSpec extends DmiOperationsBaseSpec { def dataOperationBatchRequestJsonData = TestUtils.getResourceFileContent('dataOperationRequest.json') def dataOperationRequest = spiedJsonObjectMapper.convertJsonString(dataOperationBatchRequestJsonData, DataOperationRequest.class) dataOperationRequest.dataOperationDefinitions[0].cmHandleReferences = [cmHandleId] - and: 'the published cloud event will be captured' + and: 'the sent cloud event will be captured' def actualDataOperationCloudEvent = null - eventsPublisher.publishCloudEvent('my-topic-name', 'my-request-id', _) >> { args -> actualDataOperationCloudEvent = args[2] } + eventsProducer.sendCloudEvent('my-topic-name', 'my-request-id', _) >> { args -> actualDataOperationCloudEvent = args[2] } and: 'a DMI client request exception is thrown when DMI service is called' mockDmiRestClient.asynchronousPostOperationWithJsonData(*_) >> { Mono.error(new DmiClientRequestException(123, '', '', UNKNOWN_ERROR)) } when: 'attempt to get resource data for group of cm handles is invoked' diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/data/async/CpsAsyncRequestResponseEventIntegrationSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/data/async/CpsAsyncRequestResponseEventIntegrationSpec.groovy index c651bb5d0f..8ea73b672f 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/data/async/CpsAsyncRequestResponseEventIntegrationSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/data/async/CpsAsyncRequestResponseEventIntegrationSpec.groovy @@ -24,7 +24,7 @@ import com.fasterxml.jackson.databind.ObjectMapper import org.apache.kafka.clients.consumer.KafkaConsumer import org.apache.kafka.common.serialization.StringDeserializer import org.mapstruct.factory.Mappers -import org.onap.cps.events.EventsPublisher +import org.onap.cps.events.EventsProducer import org.onap.cps.ncmp.event.model.DmiAsyncRequestResponseEvent import org.onap.cps.ncmp.event.model.NcmpAsyncRequestResponseEvent import org.onap.cps.ncmp.utils.TestUtils @@ -37,14 +37,14 @@ import org.springframework.test.annotation.DirtiesContext import org.testcontainers.spock.Testcontainers import java.time.Duration -@SpringBootTest(classes = [EventsPublisher, DmiAsyncRequestResponseEventConsumer, ObjectMapper, JsonObjectMapper]) +@SpringBootTest(classes = [EventsProducer, DmiAsyncRequestResponseEventConsumer, ObjectMapper, JsonObjectMapper]) @Testcontainers @DirtiesContext class NcmpAsyncRequestResponseEventProducerIntegrationSpec extends MessagingBaseSpec { @SpringBean - EventsPublisher cpsAsyncRequestResponseEventPublisher = - new EventsPublisher(legacyEventKafkaTemplate, cloudEventKafkaTemplate); + EventsProducer cpsAsyncRequestResponseEventProducer = + new EventsProducer(legacyEventKafkaTemplate, cloudEventKafkaTemplate); @SpringBean @@ -53,7 +53,7 @@ class NcmpAsyncRequestResponseEventProducerIntegrationSpec extends MessagingBase @SpringBean DmiAsyncRequestResponseEventConsumer dmiAsyncRequestResponseEventConsumer = - new DmiAsyncRequestResponseEventConsumer(cpsAsyncRequestResponseEventPublisher, + new DmiAsyncRequestResponseEventConsumer(cpsAsyncRequestResponseEventProducer, ncmpAsyncRequestResponseEventMapper) @Autowired diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/data/async/DataOperationEventConsumerSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/data/async/DataOperationEventConsumerSpec.groovy index afb594ab04..9c9768ab1f 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/data/async/DataOperationEventConsumerSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/data/async/DataOperationEventConsumerSpec.groovy @@ -1,6 +1,6 @@ /* * ============LICENSE_START======================================================= - * Copyright (C) 2023-2024 Nordix Foundation + * Copyright (C) 2023-2025 OpenInfra Foundation Europe. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -29,7 +29,7 @@ import io.cloudevents.kafka.impl.KafkaHeaders import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.clients.consumer.KafkaConsumer import org.apache.kafka.common.header.internals.RecordHeaders -import org.onap.cps.events.EventsPublisher +import org.onap.cps.events.EventsProducer import org.onap.cps.ncmp.events.async1_0_0.DataOperationEvent import org.onap.cps.ncmp.utils.TestUtils import org.onap.cps.ncmp.utils.events.MessagingBaseSpec @@ -45,16 +45,16 @@ import java.time.Duration import static org.onap.cps.ncmp.utils.events.CloudEventMapper.toTargetEvent -@SpringBootTest(classes = [EventsPublisher, DataOperationEventConsumer, RecordFilterStrategies, JsonObjectMapper, ObjectMapper]) +@SpringBootTest(classes = [EventsProducer, DataOperationEventConsumer, RecordFilterStrategies, JsonObjectMapper, ObjectMapper]) @Testcontainers @DirtiesContext class DataOperationEventConsumerSpec extends MessagingBaseSpec { @SpringBean - EventsPublisher asyncDataOperationEventPublisher = new EventsPublisher(legacyEventKafkaTemplate, cloudEventKafkaTemplate) + EventsProducer asyncDataOperationEventProducer = new EventsProducer(legacyEventKafkaTemplate, cloudEventKafkaTemplate) @SpringBean - DataOperationEventConsumer objectUnderTest = new DataOperationEventConsumer(asyncDataOperationEventPublisher) + DataOperationEventConsumer objectUnderTest = new DataOperationEventConsumer(asyncDataOperationEventProducer) @Autowired JsonObjectMapper jsonObjectMapper @@ -66,13 +66,13 @@ class DataOperationEventConsumerSpec extends MessagingBaseSpec { def static clientTopic = 'client-topic' def static dataOperationType = 'org.onap.cps.ncmp.events.async1_0_0.DataOperationEvent' - def 'Consume and publish event to client specified topic'() { + def 'Consume and send event to client specified topic'() { given: 'consumer subscribing to client topic' cloudEventKafkaConsumer.subscribe([clientTopic]) and: 'consumer record for data operation event' def consumerRecordIn = createConsumerRecord(dataOperationType) - when: 'the data operation event is consumed and published to client specified topic' - objectUnderTest.consumeAndPublish(consumerRecordIn) + when: 'the data operation event is consumed and sent to client specified topic' + objectUnderTest.consumeAndSend(consumerRecordIn) and: 'the client specified topic is polled' def consumerRecordOut = cloudEventKafkaConsumer.poll(Duration.ofMillis(1500))[0] then: 'verify cloud compliant headers' @@ -84,7 +84,7 @@ class DataOperationEventConsumerSpec extends MessagingBaseSpec { assert KafkaHeaders.getParsedKafkaHeader(consumerRecordOutHeaders, 'ce_destination') == clientTopic and: 'map consumer record to expected event type' def dataOperationResponseEvent = toTargetEvent(consumerRecordOut.value(), DataOperationEvent.class) - and: 'verify published response data properties' + and: 'verify sent response data properties' def response = dataOperationResponseEvent.data.responses[0] response.operationId == 'some-operation-id' response.statusCode == 'any-success-status-code' diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/data/async/FilterStrategiesIntegrationSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/data/async/FilterStrategiesIntegrationSpec.groovy index 8039d4767c..baca4450dd 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/data/async/FilterStrategiesIntegrationSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/data/async/FilterStrategiesIntegrationSpec.groovy @@ -21,7 +21,7 @@ package org.onap.cps.ncmp.impl.data.async import io.cloudevents.core.builder.CloudEventBuilder -import org.onap.cps.events.EventsPublisher +import org.onap.cps.events.EventsProducer import org.onap.cps.ncmp.config.KafkaConfig import org.onap.cps.ncmp.event.model.DmiAsyncRequestResponseEvent import org.onap.cps.ncmp.utils.events.ConsumerBaseSpec @@ -42,7 +42,7 @@ import java.util.concurrent.TimeUnit class FilterStrategiesIntegrationSpec extends ConsumerBaseSpec { @SpringBean - EventsPublisher mockEventsPublisher = Mock() + EventsProducer mockEventsProducer = Mock() @SpringBean NcmpAsyncRequestResponseEventMapper mapper = Stub() @@ -61,23 +61,23 @@ class FilterStrategiesIntegrationSpec extends ConsumerBaseSpec { then: 'wait a little for async processing of message (must wait to try to avoid false positives)' TimeUnit.MILLISECONDS.sleep(300) and: 'event is not consumed' - 0 * mockEventsPublisher.publishEvent(*_) + 0 * mockEventsProducer.sendEvent(*_) } def 'Legacy event consumer with valid legacy event.'() { given: 'a legacy event' DmiAsyncRequestResponseEvent legacyEvent = new DmiAsyncRequestResponseEvent(eventId:'legacyEventId', eventTarget:'legacyEventTarget') - and: 'a flag to track the publish event call' - def publishEventMethodCalled = false - and: 'the (mocked) events publisher will use the flag to indicate if it is called' - mockEventsPublisher.publishEvent(*_) >> { - publishEventMethodCalled = true + and: 'a flag to track the send event call' + def sendEventMethodCalled = false + and: 'the (mocked) events producer will use the flag to indicate if it is called' + mockEventsProducer.sendEvent(*_) >> { + sendEventMethodCalled = true } when: 'send the cloud event' legacyEventKafkaTemplate.send(topic, legacyEvent) then: 'the event is consumed by the (legacy) AsynRestRequest consumer' new PollingConditions().within(1) { - assert publishEventMethodCalled == true + assert sendEventMethodCalled == true } } @@ -87,20 +87,20 @@ class FilterStrategiesIntegrationSpec extends ConsumerBaseSpec { .withType(eventType) .withSource(URI.create('some-source')) .build() - and: 'a flag to track the publish event call' - def publishEventMethodCalled = false - and: 'the (mocked) events publisher will use the flag to indicate if it is called' - mockEventsPublisher.publishCloudEvent(*_) >> { - publishEventMethodCalled = true + and: 'a flag to track the sent event call' + def sendEventMethodCalled = false + and: 'the (mocked) events producer will use the flag to indicate if it is called' + mockEventsProducer.sendCloudEvent(*_) >> { + sendEventMethodCalled = true } when: 'send the cloud event' cloudEventKafkaTemplate.send(topic, cloudEvent) then: 'the event has only been forwarded for the correct type' new PollingConditions(initialDelay: 0.3).within(1) { - assert publishEventMethodCalled == expectCallToPublishEventMethod + assert sendEventMethodCalled == expectCallToSendEventMethod } where: 'the following event types are used' - eventType || expectCallToPublishEventMethod + eventType || expectCallToSendEventMethod 'DataOperationEvent' || true 'other type' || false 'any type contain the word "DataOperationEvent"' || true @@ -114,7 +114,7 @@ class FilterStrategiesIntegrationSpec extends ConsumerBaseSpec { then: 'wait a little for async processing of message (must wait to try to avoid false positives)' TimeUnit.MILLISECONDS.sleep(300) and: 'the event is not processed by this consumer' - 0 * mockEventsPublisher.publishCloudEvent(*_) + 0 * mockEventsProducer.sendCloudEvent(*_) } } diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/data/async/SerializationIntegrationSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/data/async/SerializationIntegrationSpec.groovy index 75738b443f..65e8af8e48 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/data/async/SerializationIntegrationSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/data/async/SerializationIntegrationSpec.groovy @@ -22,7 +22,7 @@ package org.onap.cps.ncmp.impl.data.async import com.fasterxml.jackson.databind.ObjectMapper import io.cloudevents.core.builder.CloudEventBuilder -import org.onap.cps.events.EventsPublisher +import org.onap.cps.events.EventsProducer import org.onap.cps.ncmp.config.KafkaConfig import org.onap.cps.ncmp.event.model.DmiAsyncRequestResponseEvent import org.onap.cps.ncmp.event.model.NcmpAsyncRequestResponseEvent @@ -46,7 +46,7 @@ import spock.util.concurrent.PollingConditions class SerializationIntegrationSpec extends ConsumerBaseSpec { @SpringBean - EventsPublisher mockEventsPublisher = Mock() + EventsProducer mockEventsProducer = Mock() @SpringBean NcmpAsyncRequestResponseEventMapper mapper = Stub() { toNcmpAsyncEvent(_) >> new NcmpAsyncRequestResponseEvent(eventId: 'my-event-id', eventTarget: 'some client topic')} @@ -60,34 +60,34 @@ class SerializationIntegrationSpec extends ConsumerBaseSpec { def 'Forwarding DataOperation Event Data.'() { given: 'a data operation cloud event' def cloudEvent = createCloudEvent() - and: 'a flag to track the publish cloud event call' - def publishCloudEventMethodCalled = false - and: 'the (mocked) events publisher will use the flag to indicate if it is called and will capture the cloud event' - mockEventsPublisher.publishCloudEvent('some client topic', 'some-correlation-id', cloudEvent) >> { - publishCloudEventMethodCalled = true + and: 'a flag to track the send cloud event call' + def sendCloudEventMethodCalled = false + and: 'the (mocked) events producer will use the flag to indicate if it is called and will capture the cloud event' + mockEventsProducer.sendCloudEvent('some client topic', 'some-correlation-id', cloudEvent) >> { + sendCloudEventMethodCalled = true } when: 'send the event' cloudEventKafkaTemplate.send(topic, cloudEvent) then: 'the event has been forwarded' new PollingConditions().within(1) { - assert publishCloudEventMethodCalled == true + assert sendCloudEventMethodCalled == true } } def 'Forwarding AsyncRestRequestResponse Event Data.'() { given: 'async request response legacy event' def dmiAsyncRequestResponseEvent = new DmiAsyncRequestResponseEvent(eventId: 'my-event-id',eventTarget: 'some client topic') - and: 'a flag to track the publish event call' - def publishEventMethodCalled = false - and: 'the (mocked) events publisher will use the flag to indicate if it is called and will capture the event' - mockEventsPublisher.publishEvent(*_) >> { - publishEventMethodCalled = true + and: 'a flag to track the send event call' + def sendEventMethodCalled = false + and: 'the (mocked) events producer will use the flag to indicate if it is called and will capture the event' + mockEventsProducer.sendEvent(*_) >> { + sendEventMethodCalled = true } when: 'send the event' legacyEventKafkaTemplate.send(topic, dmiAsyncRequestResponseEvent) then: 'the event has been forwarded' new PollingConditions().within(1) { - assert publishEventMethodCalled == true + assert sendEventMethodCalled == true } } diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/data/utils/DmiDataOperationsHelperSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/data/utils/DmiDataOperationsHelperSpec.groovy index 33b8490e5e..22ce4ab084 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/data/utils/DmiDataOperationsHelperSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/data/utils/DmiDataOperationsHelperSpec.groovy @@ -1,6 +1,6 @@ /* * ============LICENSE_START======================================================= - * Copyright (C) 2023-2024 Nordix Foundation + * Copyright (C) 2023-2025 OpenInfra Foundation Europe. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -25,7 +25,7 @@ import io.cloudevents.CloudEvent import io.cloudevents.kafka.CloudEventDeserializer import io.cloudevents.kafka.impl.KafkaHeaders import org.apache.kafka.clients.consumer.KafkaConsumer -import org.onap.cps.events.EventsPublisher +import org.onap.cps.events.EventsProducer import org.onap.cps.ncmp.api.data.models.DataOperationRequest import org.onap.cps.ncmp.api.data.models.OperationType import org.onap.cps.ncmp.api.inventory.models.CompositeStateBuilder @@ -46,7 +46,7 @@ import static org.onap.cps.ncmp.api.inventory.models.CmHandleState.ADVISED import static org.onap.cps.ncmp.api.inventory.models.CmHandleState.READY import static org.onap.cps.ncmp.utils.events.CloudEventMapper.toTargetEvent -@ContextConfiguration(classes = [EventsPublisher, CpsApplicationContext]) +@ContextConfiguration(classes = [EventsProducer, CpsApplicationContext]) class DmiDataOperationsHelperSpec extends MessagingBaseSpec { def static clientTopic = 'my-topic-name' @@ -56,7 +56,7 @@ class DmiDataOperationsHelperSpec extends MessagingBaseSpec { JsonObjectMapper jsonObjectMapper = new JsonObjectMapper(new ObjectMapper()) @SpringBean - EventsPublisher eventPublisher = new EventsPublisher(legacyEventKafkaTemplate, cloudEventKafkaTemplate) + EventsProducer eventProducer = new EventsProducer(legacyEventKafkaTemplate, cloudEventKafkaTemplate) def 'Process per data operation request with #serviceName.'() { given: 'data operation request with 3 operations' @@ -106,7 +106,7 @@ class DmiDataOperationsHelperSpec extends MessagingBaseSpec { assert cmHandlesInRequestBody[0].moduleSetTag == 'module-set-tag1' } - def 'Process per data operation request with non-ready, non-existing cm handle and publish event to client specified topic'() { + def 'Process per data operation request with non-ready, non-existing cm handle and send event to client specified topic'() { given: 'consumer subscribing to client topic' def cloudEventKafkaConsumer = new KafkaConsumer<>(eventConsumerConfigProperties('test-1', CloudEventDeserializer)) cloudEventKafkaConsumer.subscribe([clientTopic]) @@ -129,7 +129,7 @@ class DmiDataOperationsHelperSpec extends MessagingBaseSpec { toTargetEvent(consumerRecordOut.value(), DataOperationEvent.class) and: 'data operation response event response size is 3' dataOperationResponseEvent.data.responses.size() == 3 - and: 'verify published data operation response as json string' + and: 'verify sent data operation response as json string' def dataOperationResponseEventJson = TestUtils.getResourceFileContent('dataOperationResponseEvent.json') jsonObjectMapper.asJsonString(dataOperationResponseEvent.data.responses) == dataOperationResponseEventJson } diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/lcm/LcmEventsPublisherSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/lcm/EventsProducerSpec.groovy similarity index 90% rename from cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/lcm/LcmEventsPublisherSpec.groovy rename to cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/lcm/EventsProducerSpec.groovy index 3e7ed9aff6..9d2511a996 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/lcm/LcmEventsPublisherSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/lcm/EventsProducerSpec.groovy @@ -1,6 +1,6 @@ /* * ============LICENSE_START======================================================= - * Copyright (C) 2022-2024 Nordix Foundation + * Copyright (C) 2022-2025 OpenInfra Foundation Europe. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -23,7 +23,7 @@ package org.onap.cps.ncmp.impl.inventory.sync.lcm import com.fasterxml.jackson.databind.ObjectMapper import org.apache.kafka.clients.consumer.KafkaConsumer import org.apache.kafka.common.serialization.StringDeserializer -import org.onap.cps.events.EventsPublisher +import org.onap.cps.events.EventsProducer import org.onap.cps.ncmp.events.lcm.v1.Event import org.onap.cps.ncmp.events.lcm.v1.LcmEvent import org.onap.cps.ncmp.utils.TestUtils @@ -41,20 +41,20 @@ import java.time.Duration @SpringBootTest(classes = [ObjectMapper, JsonObjectMapper]) @Testcontainers @DirtiesContext -class LcmEventsPublisherSpec extends MessagingBaseSpec { +class EventsProducerSpec extends MessagingBaseSpec { def legacyEventKafkaConsumer = new KafkaConsumer<>(eventConsumerConfigProperties('ncmp-group', StringDeserializer)) def testTopic = 'ncmp-events-test' @SpringBean - EventsPublisher lcmEventsPublisher = new EventsPublisher(legacyEventKafkaTemplate, cloudEventKafkaTemplate) + EventsProducer eventsProducer = new EventsProducer(legacyEventKafkaTemplate, cloudEventKafkaTemplate) @Autowired JsonObjectMapper jsonObjectMapper - def 'Produce and Consume Lcm Event'() { + def 'Produce and Consume Event'() { given: 'event key and event data' def eventKey = 'lcm' def eventId = 'test-uuid' @@ -84,8 +84,8 @@ class LcmEventsPublisherSpec extends MessagingBaseSpec { eventSchemaVersion: eventSchemaVersion] and: 'consumer has a subscription' legacyEventKafkaConsumer.subscribe([testTopic] as List) - when: 'an event is published' - lcmEventsPublisher.publishEvent(testTopic, eventKey, eventHeader, eventData) + when: 'an event is sent' + eventsProducer.sendEvent(testTopic, eventKey, eventHeader, eventData) and: 'topic is polled' def records = legacyEventKafkaConsumer.poll(Duration.ofMillis(1500)) then: 'poll returns one record' diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/lcm/LcmEventsProducerSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/lcm/LcmEventsProducerSpec.groovy index a0b6de1aa4..8ae13f83e5 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/lcm/LcmEventsProducerSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/lcm/LcmEventsProducerSpec.groovy @@ -25,7 +25,7 @@ import static org.onap.cps.ncmp.events.lcm.v1.Values.CmHandleState.READY import io.micrometer.core.instrument.Tag import io.micrometer.core.instrument.simple.SimpleMeterRegistry -import org.onap.cps.events.EventsPublisher +import org.onap.cps.events.EventsProducer import org.onap.cps.ncmp.events.lcm.v1.Event import org.onap.cps.ncmp.events.lcm.v1.LcmEvent import org.onap.cps.ncmp.events.lcm.v1.LcmEventHeader @@ -36,13 +36,13 @@ import spock.lang.Specification class LcmEventsProducerSpec extends Specification { - def mockLcmEventsPublisher = Mock(EventsPublisher) + def mockLcmEventsProducer = Mock(EventsProducer) def mockJsonObjectMapper = Mock(JsonObjectMapper) def meterRegistry = new SimpleMeterRegistry() - def objectUnderTest = new LcmEventsProducer(mockLcmEventsPublisher, mockJsonObjectMapper, meterRegistry) + def objectUnderTest = new LcmEventsProducer(mockLcmEventsProducer, mockJsonObjectMapper, meterRegistry) - def 'Create and Publish lcm event where events are #scenario'() { + def 'Create and send lcm event where events are #scenario'() { given: 'a cm handle id, Lcm Event, and headers' def cmHandleId = 'test-cm-handle-id' def eventId = UUID.randomUUID().toString() @@ -54,10 +54,10 @@ class LcmEventsProducerSpec extends Specification { objectUnderTest.notificationsEnabled = notificationsEnabled and: 'lcm event header is transformed to headers map' mockJsonObjectMapper.convertToValueType(lcmEventHeader, Map.class) >> ['eventId': eventId, 'eventCorrelationId': cmHandleId] - when: 'service is called to publish lcm event' + when: 'service is called to send lcm event' objectUnderTest.publishLcmEvent('test-cm-handle-id', lcmEvent, lcmEventHeader) then: 'publisher is called #expectedTimesMethodCalled times' - expectedTimesMethodCalled * mockLcmEventsPublisher.publishEvent(_, cmHandleId, _, lcmEvent) >> { + expectedTimesMethodCalled * mockLcmEventsProducer.sendEvent(_, cmHandleId, _, lcmEvent) >> { args -> { def eventHeaders = (args[2] as Map) assert eventHeaders.containsKey('eventId') @@ -91,7 +91,7 @@ class LcmEventsProducerSpec extends Specification { def lcmEventHeader = new LcmEventHeader(eventId: eventId, eventCorrelationId: cmHandleId) objectUnderTest.notificationsEnabled = true when: 'publisher set to throw an exception' - mockLcmEventsPublisher.publishEvent(_, _, _, _) >> { throw new KafkaException('publishing failed')} + mockLcmEventsProducer.sendEvent(_, _, _, _) >> { throw new KafkaException('publishing failed')} and: 'an event is publised' objectUnderTest.publishLcmEvent(cmHandleId, lcmEvent, lcmEventHeader) then: 'the exception is just logged and not bubbled up' diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/trustlevel/TrustLevelManagerSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/trustlevel/TrustLevelManagerSpec.groovy index 72ca190ff1..020834e6d6 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/trustlevel/TrustLevelManagerSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/trustlevel/TrustLevelManagerSpec.groovy @@ -71,7 +71,7 @@ class TrustLevelManagerSpec extends Specification { when: 'method to register to the cache is called' objectUnderTest.registerCmHandles(cmHandleModelsToBeCreated) then: 'no notification sent' - 0 * mockInventoryEventProducer.publishAvcEvent(*_) + 0 * mockInventoryEventProducer.sendAvcEvent(*_) and: 'both cm handles are in the cache and are trusted' assert trustLevelPerCmHandleId.get('ch-1') == TrustLevel.COMPLETE assert trustLevelPerCmHandleId.get('ch-2') == TrustLevel.COMPLETE @@ -83,7 +83,7 @@ class TrustLevelManagerSpec extends Specification { when: 'method to register to the cache is called' objectUnderTest.registerCmHandles(cmHandleModelsToBeCreated) then: 'notification is sent' - 1 * mockInventoryEventProducer.publishAvcEvent(*_) + 1 * mockInventoryEventProducer.sendAvcEvent(*_) } def 'Dmi trust level updated'() { @@ -94,7 +94,7 @@ class TrustLevelManagerSpec extends Specification { when: 'the update is handled' objectUnderTest.updateDmi('my-dmi', ['ch-1'], TrustLevel.NONE) then: 'notification is sent' - 1 * mockInventoryEventProducer.publishAvcEvent('ch-1', 'trustLevel', 'COMPLETE', 'NONE') + 1 * mockInventoryEventProducer.sendAvcEvent('ch-1', 'trustLevel', 'COMPLETE', 'NONE') and: 'the dmi in the cache is not trusted' assert trustLevelPerDmiPlugin.get('my-dmi') == TrustLevel.NONE } @@ -107,7 +107,7 @@ class TrustLevelManagerSpec extends Specification { when: 'the update is handled' objectUnderTest.updateDmi('my-dmi', ['ch-1'], TrustLevel.COMPLETE) then: 'no notification is sent' - 0 * mockInventoryEventProducer.publishAvcEvent(*_) + 0 * mockInventoryEventProducer.sendAvcEvent(*_) and: 'the dmi in the cache is trusted' assert trustLevelPerDmiPlugin.get('my-dmi') == TrustLevel.COMPLETE } @@ -124,7 +124,7 @@ class TrustLevelManagerSpec extends Specification { then: 'the cm handle in the cache is trusted' assert trustLevelPerCmHandleId.get('ch-1', TrustLevel.COMPLETE) and: 'notification is sent' - 1 * mockInventoryEventProducer.publishAvcEvent('ch-1', 'trustLevel', 'NONE', 'COMPLETE') + 1 * mockInventoryEventProducer.sendAvcEvent('ch-1', 'trustLevel', 'NONE', 'COMPLETE') } def 'CmHandle trust level updated with same value'() { @@ -139,7 +139,7 @@ class TrustLevelManagerSpec extends Specification { then: 'the cm handle in the cache is not trusted' assert trustLevelPerCmHandleId.get('ch-1', TrustLevel.NONE) and: 'no notification is sent' - 0 * mockInventoryEventProducer.publishAvcEvent(*_) + 0 * mockInventoryEventProducer.sendAvcEvent(*_) } def 'Dmi trust level restored to complete with non trusted CmHandle'() { @@ -152,7 +152,7 @@ class TrustLevelManagerSpec extends Specification { then: 'the cm handle in the cache is still NONE' assert trustLevelPerCmHandleId.get('ch-1') == TrustLevel.NONE and: 'no notification is sent' - 0 * mockInventoryEventProducer.publishAvcEvent(*_) + 0 * mockInventoryEventProducer.sendAvcEvent(*_) } def 'Apply effective trust level among CmHandle and dmi plugin'() { diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/utils/events/InventoryEventProducerSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/utils/events/InventoryEventProducerSpec.groovy index 1aa7aab0f3..21fc6563c2 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/utils/events/InventoryEventProducerSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/utils/events/InventoryEventProducerSpec.groovy @@ -22,7 +22,7 @@ package org.onap.cps.ncmp.utils.events import com.fasterxml.jackson.databind.ObjectMapper import io.cloudevents.CloudEvent -import org.onap.cps.events.EventsPublisher +import org.onap.cps.events.EventsProducer import org.onap.cps.ncmp.config.CpsApplicationContext import org.onap.cps.ncmp.events.avc.ncmp_to_client.Avc import org.onap.cps.ncmp.events.avc.ncmp_to_client.AvcEvent @@ -32,10 +32,10 @@ import org.springframework.test.context.ContextConfiguration @ContextConfiguration(classes = [CpsApplicationContext, ObjectMapper, JsonObjectMapper]) class InventoryEventProducerSpec extends MessagingBaseSpec { - def mockEventsPublisher = Mock(EventsPublisher) - def objectUnderTest = new InventoryEventProducer(mockEventsPublisher) + def mockEventsProducer = Mock(EventsProducer) + def objectUnderTest = new InventoryEventProducer(mockEventsProducer) - def 'Publish an attribute value change event'() { + def 'Send an attribute value change event'() { given: 'the event key' def someEventKey = 'someEventKey' and: 'the name of the attribute being changed' @@ -44,10 +44,10 @@ class InventoryEventProducerSpec extends MessagingBaseSpec { def someOldAttributeValue = 'someOldAttributeValue' and: 'the new value of the attribute' def someNewAttributeValue = 'someNewAttributeValue' - when: 'an attribute value change event is published' - objectUnderTest.publishAvcEvent(someEventKey, someAttributeName, someOldAttributeValue, someNewAttributeValue) - then: 'the cloud event publisher is invoked with the correct data' - 1 * mockEventsPublisher.publishCloudEvent(_, someEventKey, + when: 'an attribute value change event is sent' + objectUnderTest.sendAvcEvent(someEventKey, someAttributeName, someOldAttributeValue, someNewAttributeValue) + then: 'the cloud event producer is invoked with the correct data' + 1 * mockEventsProducer.sendCloudEvent(_, someEventKey, cloudEvent -> { def actualAvcs = CloudEventMapper.toTargetEvent(cloudEvent, AvcEvent.class).data.attributeValueChange def expectedAvc = new Avc(attributeName: someAttributeName, diff --git a/cps-service/src/main/java/org/onap/cps/events/CpsDataUpdateEventsProducer.java b/cps-service/src/main/java/org/onap/cps/events/CpsDataUpdateEventsProducer.java index 3061fd2022..2d2a857604 100644 --- a/cps-service/src/main/java/org/onap/cps/events/CpsDataUpdateEventsProducer.java +++ b/cps-service/src/main/java/org/onap/cps/events/CpsDataUpdateEventsProducer.java @@ -42,7 +42,7 @@ import org.springframework.stereotype.Service; @RequiredArgsConstructor public class CpsDataUpdateEventsProducer { - private final EventsPublisher eventsPublisher; + private final EventsProducer eventsProducer; private final CpsNotificationService cpsNotificationService; @@ -56,16 +56,16 @@ public class CpsDataUpdateEventsProducer { private boolean notificationsEnabled; /** - * Publish the cps data update event with header to the public topic. + * Send the cps data update event with header to the public topic. * * @param anchor Anchor of the updated data * @param xpath xpath of the updated data * @param operation operation performed on the data * @param observedTimestamp timestamp when data was updated. */ - @Timed(value = "cps.dataupdate.events.publish", description = "Time taken to publish Data Update event") - public void publishCpsDataUpdateEvent(final Anchor anchor, final String xpath, - final Operation operation, final OffsetDateTime observedTimestamp) { + @Timed(value = "cps.dataupdate.events.publish", description = "Time taken to send Data Update event") + public void sendCpsDataUpdateEvent(final Anchor anchor, final String xpath, + final Operation operation, final OffsetDateTime observedTimestamp) { if (notificationsEnabled && cpsChangeEventNotificationsEnabled && isNotificationEnabledForAnchor(anchor)) { final CpsDataUpdatedEvent cpsDataUpdatedEvent = createCpsDataUpdatedEvent(anchor, observedTimestamp, xpath, operation); @@ -74,7 +74,7 @@ public class CpsDataUpdateEventsProducer { final CloudEvent cpsDataUpdatedEventAsCloudEvent = CpsEvent.builder().type(CpsDataUpdatedEvent.class.getTypeName()).data(cpsDataUpdatedEvent) .extensions(extensions).build().asCloudEvent(); - eventsPublisher.publishCloudEvent(topicName, updateEventId, cpsDataUpdatedEventAsCloudEvent); + eventsProducer.sendCloudEvent(topicName, updateEventId, cpsDataUpdatedEventAsCloudEvent); } else { log.debug("State of Overall Notifications : {} and Cps Change Event Notifications : {}", notificationsEnabled, cpsChangeEventNotificationsEnabled); diff --git a/cps-service/src/main/java/org/onap/cps/events/EventsPublisher.java b/cps-service/src/main/java/org/onap/cps/events/EventsProducer.java similarity index 76% rename from cps-service/src/main/java/org/onap/cps/events/EventsPublisher.java rename to cps-service/src/main/java/org/onap/cps/events/EventsProducer.java index 46384b5933..01e1ad183a 100644 --- a/cps-service/src/main/java/org/onap/cps/events/EventsPublisher.java +++ b/cps-service/src/main/java/org/onap/cps/events/EventsProducer.java @@ -1,6 +1,6 @@ /* * ============LICENSE_START======================================================= - * Copyright (C) 2022-2024 Nordix Foundation + * Copyright (C) 2022-2025 OpenInfra Foundation Europe. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -34,13 +34,13 @@ import org.springframework.stereotype.Service; import org.springframework.util.SerializationUtils; /** - * EventsPublisher to publish events. + * EventsProducer to send events. */ @Slf4j @Service @RequiredArgsConstructor -public class EventsPublisher { +public class EventsProducer { /** * KafkaTemplate for legacy (non-cloud) events. @@ -51,49 +51,49 @@ public class EventsPublisher { private final KafkaTemplate cloudEventKafkaTemplate; /** - * Generic CloudEvent publisher. + * Generic CloudEvent sender. * * @param topicName valid topic name * @param eventKey message key * @param event message payload */ - public void publishCloudEvent(final String topicName, final String eventKey, final CloudEvent event) { + public void sendCloudEvent(final String topicName, final String eventKey, final CloudEvent event) { final CompletableFuture> eventFuture = cloudEventKafkaTemplate.send(topicName, eventKey, event); eventFuture.whenComplete((result, e) -> { if (e == null) { - log.debug("Successfully published event to topic : {} , Event : {}", result.getRecordMetadata().topic(), + log.debug("Successfully sent event to topic : {} , Event : {}", result.getRecordMetadata().topic(), result.getProducerRecord().value()); } else { - log.error("Unable to publish event to topic : {} due to {}", topicName, e.getMessage()); + log.error("Unable to send event to topic : {} due to {}", topicName, e.getMessage()); } }); } /** - * Generic Event publisher. + * Generic Event sender. * Note: Cloud events should be used. This will be addressed as part of https://lf-onap.atlassian.net/browse/CPS-1717. * * @param topicName valid topic name * @param eventKey message key * @param event message payload */ - public void publishEvent(final String topicName, final String eventKey, final T event) { + public void sendEvent(final String topicName, final String eventKey, final T event) { final CompletableFuture> eventFuture = legacyKafkaEventTemplate.send(topicName, eventKey, event); handleLegacyEventCallback(topicName, eventFuture); } /** - * Generic Event Publisher with headers. + * Generic Event sender with headers. * * @param topicName valid topic name * @param eventKey message key * @param eventHeaders event headers * @param event message payload */ - public void publishEvent(final String topicName, final String eventKey, final Headers eventHeaders, final T event) { + public void sendEvent(final String topicName, final String eventKey, final Headers eventHeaders, final T event) { final ProducerRecord producerRecord = new ProducerRecord<>(topicName, null, eventKey, event, eventHeaders); @@ -102,27 +102,27 @@ public class EventsPublisher { } /** - * Generic Event Publisher with headers. + * Generic Event sender with headers. * * @param topicName valid topic name * @param eventKey message key * @param eventHeaders map of event headers * @param event message payload */ - public void publishEvent(final String topicName, final String eventKey, final Map eventHeaders, - final T event) { + public void sendEvent(final String topicName, final String eventKey, final Map eventHeaders, + final T event) { - publishEvent(topicName, eventKey, convertToKafkaHeaders(eventHeaders), event); + sendEvent(topicName, eventKey, convertToKafkaHeaders(eventHeaders), event); } private void handleLegacyEventCallback(final String topicName, final CompletableFuture> eventFuture) { eventFuture.whenComplete((result, e) -> { if (e == null) { - log.debug("Successfully published event to topic : {} , Event : {}", result.getRecordMetadata().topic(), + log.debug("Successfully sent event to topic : {} , Event : {}", result.getRecordMetadata().topic(), result.getProducerRecord().value()); } else { - log.error("Unable to publish event to topic : {} due to {}", topicName, e.getMessage()); + log.error("Unable to send event to topic : {} due to {}", topicName, e.getMessage()); } }); } diff --git a/cps-service/src/main/java/org/onap/cps/impl/CpsDataServiceImpl.java b/cps-service/src/main/java/org/onap/cps/impl/CpsDataServiceImpl.java index 586941a561..98f8d66405 100644 --- a/cps-service/src/main/java/org/onap/cps/impl/CpsDataServiceImpl.java +++ b/cps-service/src/main/java/org/onap/cps/impl/CpsDataServiceImpl.java @@ -396,7 +396,7 @@ public class CpsDataServiceImpl implements CpsDataService { final Operation operation, final OffsetDateTime observedTimestamp) { try { - cpsDataUpdateEventsProducer.publishCpsDataUpdateEvent(anchor, xpath, operation, observedTimestamp); + cpsDataUpdateEventsProducer.sendCpsDataUpdateEvent(anchor, xpath, operation, observedTimestamp); } catch (final Exception exception) { log.error("Failed to send message to notification service", exception); } diff --git a/cps-service/src/test/groovy/org/onap/cps/events/CpsDataUpdateEventsProducerSpec.groovy b/cps-service/src/test/groovy/org/onap/cps/events/CpsDataUpdateEventsProducerSpec.groovy index 641e399622..07ab2a3613 100644 --- a/cps-service/src/test/groovy/org/onap/cps/events/CpsDataUpdateEventsProducerSpec.groovy +++ b/cps-service/src/test/groovy/org/onap/cps/events/CpsDataUpdateEventsProducerSpec.groovy @@ -40,18 +40,18 @@ import static org.onap.cps.events.model.Data.Operation.UPDATE @ContextConfiguration(classes = [ObjectMapper, JsonObjectMapper]) class CpsDataUpdateEventsProducerSpec extends Specification { - def mockEventsPublisher = Mock(EventsPublisher) + def mockEventsProducer = Mock(EventsProducer) def objectMapper = new ObjectMapper(); def mockCpsNotificationService = Mock(CpsNotificationService) - def objectUnderTest = new CpsDataUpdateEventsProducer(mockEventsPublisher, mockCpsNotificationService) + def objectUnderTest = new CpsDataUpdateEventsProducer(mockEventsProducer, mockCpsNotificationService) def setup() { mockCpsNotificationService.isNotificationEnabled('dataspace01', 'anchor01') >> true objectUnderTest.topicName = 'cps-core-event' } - def 'Create and Publish cps update event where events are #scenario.'() { + def 'Create and send cps update event where events are #scenario.'() { given: 'an anchor, operation and observed timestamp' def anchor = new Anchor('anchor01', 'dataspace01', 'schema01'); def operation = operationInRequest @@ -60,10 +60,10 @@ class CpsDataUpdateEventsProducerSpec extends Specification { objectUnderTest.notificationsEnabled = true and: 'cpsChangeEventNotificationsEnabled is also true' objectUnderTest.cpsChangeEventNotificationsEnabled = true - when: 'service is called to publish data update event' - objectUnderTest.publishCpsDataUpdateEvent(anchor, xpath, operation, observedTimestamp) + when: 'service is called to send data update event' + objectUnderTest.sendCpsDataUpdateEvent(anchor, xpath, operation, observedTimestamp) then: 'the event contains the required attributes' - 1 * mockEventsPublisher.publishCloudEvent('cps-core-event', 'dataspace01:anchor01', _) >> { + 1 * mockEventsProducer.sendCloudEvent('cps-core-event', 'dataspace01:anchor01', _) >> { args -> { def cpsDataUpdatedEvent = (args[2] as CloudEvent) @@ -86,7 +86,7 @@ class CpsDataUpdateEventsProducerSpec extends Specification { 'non root node xpath and delete operation' | '/test/path' | DELETE || UPDATE } - def 'Publish cps update event when no timestamp provided.'() { + def 'Send cps update event when no timestamp provided.'() { given: 'an anchor, operation and null timestamp' def anchor = new Anchor('anchor01', 'dataspace01', 'schema01'); def observedTimestamp = null @@ -94,13 +94,13 @@ class CpsDataUpdateEventsProducerSpec extends Specification { objectUnderTest.notificationsEnabled = true and: 'cpsChangeEventNotificationsEnabled is true' objectUnderTest.cpsChangeEventNotificationsEnabled = true - when: 'service is called to publish data update event' - objectUnderTest.publishCpsDataUpdateEvent(anchor, '/', CREATE, observedTimestamp) - then: 'the event is published' - 1 * mockEventsPublisher.publishCloudEvent('cps-core-event', 'dataspace01:anchor01', _) + when: 'service is called to send data update event' + objectUnderTest.sendCpsDataUpdateEvent(anchor, '/', CREATE, observedTimestamp) + then: 'the event is sent' + 1 * mockEventsProducer.sendCloudEvent('cps-core-event', 'dataspace01:anchor01', _) } - def 'Enabling and disabling publish cps update events.'() { + def 'Enabling and disabling sending cps update events.'() { given: 'a different anchor' def anchor = new Anchor('anchor02', 'some dataspace', 'some schema'); and: 'notificationsEnabled is #notificationsEnabled' @@ -109,12 +109,12 @@ class CpsDataUpdateEventsProducerSpec extends Specification { objectUnderTest.cpsChangeEventNotificationsEnabled = cpsChangeEventNotificationsEnabled and: 'notification service enabled is: #cpsNotificationServiceisNotificationEnabled' mockCpsNotificationService.isNotificationEnabled(_, 'anchor02') >> cpsNotificationServiceisNotificationEnabled - when: 'service is called to publish data update event' - objectUnderTest.publishCpsDataUpdateEvent(anchor, '/', CREATE, null) - then: 'the event is only published when all related flags are true' - expectedCallsToPublisher * mockEventsPublisher.publishCloudEvent(*_) + when: 'service is called to send data update event' + objectUnderTest.sendCpsDataUpdateEvent(anchor, '/', CREATE, null) + then: 'the event is only sent when all related flags are true' + expectedCallsToProducer * mockEventsProducer.sendCloudEvent(*_) where: 'the following flags are used' - notificationsEnabled | cpsChangeEventNotificationsEnabled | cpsNotificationServiceisNotificationEnabled || expectedCallsToPublisher + notificationsEnabled | cpsChangeEventNotificationsEnabled | cpsNotificationServiceisNotificationEnabled || expectedCallsToProducer false | true | true || 0 true | false | true || 0 true | true | false || 0 diff --git a/cps-service/src/test/groovy/org/onap/cps/events/EventsPublisherSpec.groovy b/cps-service/src/test/groovy/org/onap/cps/events/EventsProducerSpec.groovy similarity index 79% rename from cps-service/src/test/groovy/org/onap/cps/events/EventsPublisherSpec.groovy rename to cps-service/src/test/groovy/org/onap/cps/events/EventsProducerSpec.groovy index fe67287dec..e36d09387a 100644 --- a/cps-service/src/test/groovy/org/onap/cps/events/EventsPublisherSpec.groovy +++ b/cps-service/src/test/groovy/org/onap/cps/events/EventsProducerSpec.groovy @@ -1,6 +1,6 @@ /* * ============LICENSE_START======================================================= - * Copyright (C) 2024 Nordix Foundation + * Copyright (C) 2025 OpenInfra Foundation Europe. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -31,8 +31,6 @@ import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.header.Headers import org.apache.kafka.common.header.internals.RecordHeader import org.apache.kafka.common.header.internals.RecordHeaders -import org.junit.jupiter.api.AfterEach -import org.junit.jupiter.api.BeforeEach import org.slf4j.LoggerFactory import org.springframework.kafka.core.KafkaTemplate import org.springframework.kafka.support.SendResult @@ -41,27 +39,27 @@ import spock.lang.Specification import java.util.concurrent.CompletableFuture -class EventsPublisherSpec extends Specification { +class EventsProducerSpec extends Specification { def legacyKafkaTemplateMock = Mock(KafkaTemplate) def mockCloudEventKafkaTemplate = Mock(KafkaTemplate) def logger = Spy(ListAppender) void setup() { - def setupLogger = ((Logger) LoggerFactory.getLogger(EventsPublisher.class)) + def setupLogger = ((Logger) LoggerFactory.getLogger(EventsProducer.class)) setupLogger.setLevel(Level.DEBUG) setupLogger.addAppender(logger) logger.start() } void cleanup() { - ((Logger) LoggerFactory.getLogger(EventsPublisher.class)).detachAndStopAllAppenders() + ((Logger) LoggerFactory.getLogger(EventsProducer.class)).detachAndStopAllAppenders() } - def objectUnderTest = new EventsPublisher(legacyKafkaTemplateMock, mockCloudEventKafkaTemplate) + def objectUnderTest = new EventsProducer(legacyKafkaTemplateMock, mockCloudEventKafkaTemplate) - def 'Publish Cloud Event'() { - given: 'a successfully published event' + def 'Send Cloud Event'() { + given: 'a successfully sent event' def eventFuture = CompletableFuture.completedFuture( new SendResult( new ProducerRecord('some-topic', 'some-value'), @@ -70,30 +68,30 @@ class EventsPublisherSpec extends Specification { ) def someCloudEvent = Mock(CloudEvent) 1 * mockCloudEventKafkaTemplate.send('some-topic', 'some-event-key', someCloudEvent) >> eventFuture - when: 'publishing the cloud event' - objectUnderTest.publishCloudEvent('some-topic', 'some-event-key', someCloudEvent) + when: 'sending the cloud event' + objectUnderTest.sendCloudEvent('some-topic', 'some-event-key', someCloudEvent) then: 'the correct debug message is logged' def lastLoggingEvent = logger.list[0] assert lastLoggingEvent.level == Level.DEBUG - assert lastLoggingEvent.formattedMessage.contains('Successfully published event') + assert lastLoggingEvent.formattedMessage.contains('Successfully sent event') } - def 'Publish Cloud Event with Exception'() { + def 'Send Cloud Event with Exception'() { given: 'a failed event' def eventFutureWithFailure = new CompletableFuture>() eventFutureWithFailure.completeExceptionally(new RuntimeException('some exception')) def someCloudEvent = Mock(CloudEvent) 1 * mockCloudEventKafkaTemplate.send('some-topic', 'some-event-key', someCloudEvent) >> eventFutureWithFailure - when: 'publishing the cloud event' - objectUnderTest.publishCloudEvent('some-topic', 'some-event-key', someCloudEvent) + when: 'sending the cloud event' + objectUnderTest.sendCloudEvent('some-topic', 'some-event-key', someCloudEvent) then: 'the correct error message is logged' def lastLoggingEvent = logger.list[0] assert lastLoggingEvent.level == Level.ERROR - assert lastLoggingEvent.formattedMessage.contains('Unable to publish event') + assert lastLoggingEvent.formattedMessage.contains('Unable to send event') } - def 'Publish Legacy Event'() { - given: 'a successfully published event' + def 'Send Legacy Event'() { + given: 'a successfully sent event' def eventFuture = CompletableFuture.completedFuture( new SendResult( new ProducerRecord('some-topic', 'some-value'), @@ -102,16 +100,16 @@ class EventsPublisherSpec extends Specification { ) def someEvent = Mock(Object) 1 * legacyKafkaTemplateMock.send('some-topic', 'some-event-key', someEvent) >> eventFuture - when: 'publishing the cloud event' - objectUnderTest.publishEvent('some-topic', 'some-event-key', someEvent) + when: 'sending the cloud event' + objectUnderTest.sendEvent('some-topic', 'some-event-key', someEvent) then: 'the correct debug message is logged' def lastLoggingEvent = logger.list[0] assert lastLoggingEvent.level == Level.DEBUG - assert lastLoggingEvent.formattedMessage.contains('Successfully published event') + assert lastLoggingEvent.formattedMessage.contains('Successfully sent event') } - def 'Publish Legacy Event with Headers as Map'() { - given: 'a successfully published event' + def 'Send Legacy Event with Headers as Map'() { + given: 'a successfully sent event' def sampleEventHeaders = ['k1': SerializationUtils.serialize('v1')] def eventFuture = CompletableFuture.completedFuture( new SendResult( @@ -120,18 +118,18 @@ class EventsPublisherSpec extends Specification { ) ) def someEvent = Mock(Object.class) - when: 'publishing the legacy event' - objectUnderTest.publishEvent('some-topic', 'some-event-key', sampleEventHeaders, someEvent) - then: 'event is published' + when: 'sending the legacy event' + objectUnderTest.sendEvent('some-topic', 'some-event-key', sampleEventHeaders, someEvent) + then: 'event is sent' 1 * legacyKafkaTemplateMock.send(_) >> eventFuture and: 'the correct debug message is logged' def lastLoggingEvent = logger.list[0] assert lastLoggingEvent.level == Level.DEBUG - assert lastLoggingEvent.formattedMessage.contains('Successfully published event') + assert lastLoggingEvent.formattedMessage.contains('Successfully sent event') } - def 'Publish Legacy Event with Record Headers'() { - given: 'a successfully published event' + def 'Send Legacy Event with Record Headers'() { + given: 'a successfully sent event' def sampleEventHeaders = new RecordHeaders([new RecordHeader('k1', SerializationUtils.serialize('v1'))]) def sampleProducerRecord = new ProducerRecord('some-topic', null, 'some-key', 'some-value', sampleEventHeaders) def eventFuture = CompletableFuture.completedFuture( @@ -141,18 +139,18 @@ class EventsPublisherSpec extends Specification { ) ) def someEvent = Mock(Object.class) - when: 'publishing the legacy event' - objectUnderTest.publishEvent('some-topic', 'some-event-key', sampleEventHeaders, someEvent) - then: 'event is published' + when: 'sending the legacy event' + objectUnderTest.sendEvent('some-topic', 'some-event-key', sampleEventHeaders, someEvent) + then: 'event is sent' 1 * legacyKafkaTemplateMock.send(_) >> eventFuture and: 'the correct debug message is logged' def lastLoggingEvent = logger.list[0] assert lastLoggingEvent.level == Level.DEBUG - assert lastLoggingEvent.formattedMessage.contains('Successfully published event') + assert lastLoggingEvent.formattedMessage.contains('Successfully sent event') } def 'Handle Legacy Event Callback'() { - given: 'an event is successfully published' + given: 'an event is successfully sent' def eventFuture = CompletableFuture.completedFuture( new SendResult( new ProducerRecord('some-topic', 'some-value'), @@ -164,11 +162,11 @@ class EventsPublisherSpec extends Specification { then: 'the correct debug message is logged' def lastLoggingEvent = logger.list[0] assert lastLoggingEvent.level == Level.DEBUG - assert lastLoggingEvent.formattedMessage.contains('Successfully published event') + assert lastLoggingEvent.formattedMessage.contains('Successfully sent event') } def 'Handle Legacy Event Callback with Exception'() { - given: 'a failure to publish an event' + given: 'a failure to send an event' def eventFutureWithFailure = new CompletableFuture>() eventFutureWithFailure.completeExceptionally(new RuntimeException('some exception')) when: 'handling legacy event callback' @@ -176,7 +174,7 @@ class EventsPublisherSpec extends Specification { then: 'the correct error message is logged' def lastLoggingEvent = logger.list[0] assert lastLoggingEvent.level == Level.ERROR - assert lastLoggingEvent.formattedMessage.contains('Unable to publish event') + assert lastLoggingEvent.formattedMessage.contains('Unable to send event') } def 'Convert to kafka headers'() { diff --git a/cps-service/src/test/groovy/org/onap/cps/impl/CpsDataServiceImplSpec.groovy b/cps-service/src/test/groovy/org/onap/cps/impl/CpsDataServiceImplSpec.groovy index a4bfd569c5..4085a0897b 100644 --- a/cps-service/src/test/groovy/org/onap/cps/impl/CpsDataServiceImplSpec.groovy +++ b/cps-service/src/test/groovy/org/onap/cps/impl/CpsDataServiceImplSpec.groovy @@ -577,8 +577,8 @@ class CpsDataServiceImplSpec extends Specification { and: 'the persistence service method is invoked with the correct parameters' 1 * mockCpsDataPersistenceService.deleteDataNodes(dataspaceName, _ as Collection) and: 'a data update event is sent for each anchor' - 1 * mockCpsDataUpdateEventsProducer.publishCpsDataUpdateEvent(anchor1, '/', DELETE, observedTimestamp) - 1 * mockCpsDataUpdateEventsProducer.publishCpsDataUpdateEvent(anchor2, '/', DELETE, observedTimestamp) + 1 * mockCpsDataUpdateEventsProducer.sendCpsDataUpdateEvent(anchor1, '/', DELETE, observedTimestamp) + 1 * mockCpsDataUpdateEventsProducer.sendCpsDataUpdateEvent(anchor2, '/', DELETE, observedTimestamp) } def "Validating #scenario when dry run is enabled."() { @@ -639,11 +639,11 @@ class CpsDataServiceImplSpec extends Specification { 1 * mockCpsDataPersistenceService.lockAnchor('some-sessionId', 'some-dataspaceName', 'some-anchorName', 250L) } - def 'Exception is thrown while publishing the notification.'(){ + def 'Exception is thrown while sending the notification.'(){ given: 'schema set for given anchor and dataspace references test-tree model' setupSchemaSetMocks('test-tree.yang') - when: 'publisher set to throw an exception' - mockCpsDataUpdateEventsProducer.publishCpsDataUpdateEvent(_, _, _, _) >> { throw new Exception("publishing failed")} + when: 'producer throws an exception while sending event' + mockCpsDataUpdateEventsProducer.sendCpsDataUpdateEvent(_, _, _, _) >> { throw new Exception("Sending failed")} and: 'an update event is performed' objectUnderTest.updateNodeLeaves(dataspaceName, anchorName, '/', '{"test-tree": {"branch": []}}', observedTimestamp, ContentType.JSON) then: 'the exception is not bubbled up'