Refactor Consumers/Producers based on agreed format #2 07/140607/3
author emaclee <lee.anjella.macabuhay@est.tech>
Thu, 27 Mar 2025 12:40:26 +0000 (12:40 +0000)
committer emaclee <lee.anjella.macabuhay@est.tech>
Sun, 6 Apr 2025 20:17:19 +0000 (21:17 +0100)
- 'EventsPublisher' is renamed to 'EventsProducer'
- the term 'publish' is replaced with 'send' to align with Kafka
  method naming (see the usage sketch below)
- LcmEventsProducer is not fully changed as it may affect
  metrics; this will be handled in a separate patch
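
A minimal usage sketch (illustrative only, not part of this change) of the renamed
API, based on the calls visible in the diff below; 'ExampleForwarder' and the topic
name are hypothetical placeholders:

    import io.cloudevents.CloudEvent;
    import org.onap.cps.events.EventsProducer;

    class ExampleForwarder {

        private final EventsProducer<CloudEvent> eventsProducer;

        ExampleForwarder(final EventsProducer<CloudEvent> eventsProducer) {
            this.eventsProducer = eventsProducer;
        }

        void forwardCloudEvent(final String eventKey, final CloudEvent cloudEvent) {
            // formerly: eventsPublisher.publishCloudEvent(topic, key, event)
            eventsProducer.sendCloudEvent("example-topic", eventKey, cloudEvent);
        }
    }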

Issue-ID: CPS-2597
Change-Id: I310fc60fd0ff85eb83f2f3c6f9b54c569b3ff902
Signed-off-by: emaclee <lee.anjella.macabuhay@est.tech>
36 files changed:
cps-ncmp-rest/src/test/groovy/org/onap/cps/ncmp/rest/controller/NetworkCmProxyControllerSpec.groovy
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/config/KafkaConfig.java
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/cmavc/CmAvcEventConsumer.java
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/dmi/DmiInEventProducer.java
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/dmi/DmiOutEventConsumer.java
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/CmSubscriptionHandlerImpl.java
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/NcmpOutEventProducer.java
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/NcmpOutEventPublishingTask.java
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/data/DmiDataOperations.java
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/data/async/DataOperationEventConsumer.java
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/data/async/DmiAsyncRequestResponseEventConsumer.java
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/data/utils/DmiDataOperationsHelper.java
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/lcm/LcmEventsProducer.java
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/trustlevel/TrustLevelManager.java
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/utils/events/InventoryEventProducer.java
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/cmavc/CmAvcEventConsumerSpec.groovy
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/dmi/DmiInEventProducerSpec.groovy
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/dmi/DmiOutEventConsumerSpec.groovy
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/CmSubscriptionHandlerImplSpec.groovy
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/NcmpOutEventProducerSpec.groovy
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/data/DmiDataOperationsSpec.groovy
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/data/async/CpsAsyncRequestResponseEventIntegrationSpec.groovy
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/data/async/DataOperationEventConsumerSpec.groovy
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/data/async/FilterStrategiesIntegrationSpec.groovy
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/data/async/SerializationIntegrationSpec.groovy
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/data/utils/DmiDataOperationsHelperSpec.groovy
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/lcm/EventsProducerSpec.groovy [moved from cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/lcm/LcmEventsPublisherSpec.groovy with 90% similarity]
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/lcm/LcmEventsProducerSpec.groovy
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/trustlevel/TrustLevelManagerSpec.groovy
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/utils/events/InventoryEventProducerSpec.groovy
cps-service/src/main/java/org/onap/cps/events/CpsDataUpdateEventsProducer.java
cps-service/src/main/java/org/onap/cps/events/EventsProducer.java [moved from cps-service/src/main/java/org/onap/cps/events/EventsPublisher.java with 76% similarity]
cps-service/src/main/java/org/onap/cps/impl/CpsDataServiceImpl.java
cps-service/src/test/groovy/org/onap/cps/events/CpsDataUpdateEventsProducerSpec.groovy
cps-service/src/test/groovy/org/onap/cps/events/EventsProducerSpec.groovy [moved from cps-service/src/test/groovy/org/onap/cps/events/EventsPublisherSpec.groovy with 79% similarity]
cps-service/src/test/groovy/org/onap/cps/impl/CpsDataServiceImplSpec.groovy

index 94c113c..e934530 100644 (file)
@@ -2,7 +2,7 @@
  *  ============LICENSE_START=======================================================
  *  Copyright (C) 2021 Pantheon.tech
  *  Modifications Copyright (C) 2021 highstreet technologies GmbH
- *  Modifications Copyright (C) 2021-2025 OpenInfra Foundation Europe
+ *  Modifications Copyright (C) 2021-2025 OpenInfra Foundation Europe. All rights reserved.
  *  Modifications Copyright (C) 2021-2022 Bell Canada.
  *  ================================================================================
  *  Licensed under the Apache License, Version 2.0 (the "License");
@@ -31,7 +31,7 @@ import com.fasterxml.jackson.databind.ObjectMapper
 import groovy.json.JsonSlurper
 import org.mapstruct.factory.Mappers
 import org.onap.cps.TestUtils
-import org.onap.cps.events.EventsPublisher
+import org.onap.cps.events.EventsProducer
 import org.onap.cps.ncmp.impl.NetworkCmProxyInventoryFacadeImpl
 import org.onap.cps.ncmp.api.inventory.models.CompositeState
 import org.onap.cps.ncmp.api.inventory.models.NcmpServiceCmHandle
@@ -134,7 +134,7 @@ class NetworkCmProxyControllerSpec extends Specification {
     }
 
     def cleanup() {
-        ((Logger) LoggerFactory.getLogger(EventsPublisher.class)).detachAndStopAllAppenders()
+        ((Logger) LoggerFactory.getLogger(EventsProducer.class)).detachAndStopAllAppenders()
     }
 
     def 'Get Resource Data from pass-through operational.'() {
index 3d3c3db..8475be6 100644 (file)
@@ -1,6 +1,6 @@
 /*
  * ============LICENSE_START=======================================================
- * Copyright (C) 2023-2024 Nordix Foundation
+ * Copyright (C) 2023-2025 OpenInfra Foundation Europe. All rights reserved.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -47,7 +47,7 @@ import org.springframework.kafka.support.serializer.JsonSerializer;
 /**
  * kafka Configuration for legacy and cloud events.
  *
- * @param <T> valid legacy event to be published over the wire.
+ * @param <T> valid legacy event to be sent over the wire.
  */
 @Configuration
 @EnableKafka
index 2d1f648..eca8380 100644 (file)
@@ -1,6 +1,6 @@
 /*
  * ============LICENSE_START=======================================================
- * Copyright (c) 2023-2024 Nordix Foundation.
+ * Copyright (c) 2023-2025 OpenInfra Foundation Europe. All rights reserved.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -24,7 +24,7 @@ import io.cloudevents.CloudEvent;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.onap.cps.events.EventsPublisher;
+import org.onap.cps.events.EventsProducer;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 import org.springframework.kafka.annotation.KafkaListener;
@@ -43,7 +43,7 @@ public class CmAvcEventConsumer {
     @Value("${app.ncmp.avc.cm-events-topic}")
     private String cmEventsTopicName;
 
-    private final EventsPublisher<CloudEvent> eventsPublisher;
+    private final EventsProducer<CloudEvent> eventsProducer;
 
     /**
      * Incoming Cm AvcEvent in the form of Consumer Record, it will be forwarded as is to a target topic.
@@ -58,6 +58,6 @@ public class CmAvcEventConsumer {
         final CloudEvent outgoingAvcEvent = cmAvcEventAsConsumerRecord.value();
         final String outgoingAvcEventKey = cmAvcEventAsConsumerRecord.key();
         log.debug("Consuming AVC event with key : {} and value : {}", outgoingAvcEventKey, outgoingAvcEvent);
-        eventsPublisher.publishCloudEvent(cmEventsTopicName, outgoingAvcEventKey, outgoingAvcEvent);
+        eventsProducer.sendCloudEvent(cmEventsTopicName, outgoingAvcEventKey, outgoingAvcEvent);
     }
 }
index 232803a..baa9926 100644 (file)
@@ -1,6 +1,6 @@
 /*
  * ============LICENSE_START=======================================================
- *  Copyright (C) 2024-2025 Nordix Foundation
+ *  Copyright (C) 2024-2025 OpenInfra Foundation Europe. All rights reserved.
  *  ================================================================================
  *  Licensed under the Apache License, Version 2.0 (the "License");
  *  you may not use this file except in compliance with the License.
@@ -25,7 +25,7 @@ import static org.onap.cps.ncmp.events.NcmpEventDataSchema.SUBSCRIPTIONS_V1;
 import io.cloudevents.CloudEvent;
 import java.util.Map;
 import lombok.RequiredArgsConstructor;
-import org.onap.cps.events.EventsPublisher;
+import org.onap.cps.events.EventsProducer;
 import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.ncmp_to_dmi.DmiInEvent;
 import org.onap.cps.ncmp.utils.events.NcmpEvent;
 import org.springframework.beans.factory.annotation.Value;
@@ -37,22 +37,22 @@ import org.springframework.stereotype.Component;
 @ConditionalOnProperty(name = "notification.enabled", havingValue = "true", matchIfMissing = true)
 public class DmiInEventProducer {
 
-    private final EventsPublisher<CloudEvent> eventsPublisher;
+    private final EventsProducer<CloudEvent> eventsProducer;
 
     @Value("${app.ncmp.avc.cm-subscription-dmi-in}")
     private String dmiInEventTopic;
 
     /**
-     * Publish the event to the provided dmi plugin with key as subscription id and the event is in Cloud Event format.
+     * Send the event to the provided dmi plugin with key as subscription id and the event is in Cloud Event format.
      *
      * @param subscriptionId Cm Subscription Id
      * @param dmiPluginName  Dmi Plugin Name
      * @param eventType      Type of event
      * @param dmiInEvent     Cm Notification Subscription event for Dmi
      */
-    public void publishDmiInEvent(final String subscriptionId, final String dmiPluginName,
-            final String eventType, final DmiInEvent dmiInEvent) {
-        eventsPublisher.publishCloudEvent(dmiInEventTopic, subscriptionId,
+    public void sendDmiInEvent(final String subscriptionId, final String dmiPluginName,
+                               final String eventType, final DmiInEvent dmiInEvent) {
+        eventsProducer.sendCloudEvent(dmiInEventTopic, subscriptionId,
                 buildAndGetDmiInEventAsCloudEvent(subscriptionId, dmiPluginName, eventType, dmiInEvent));
 
     }
index 98c66af..d5e7106 100644 (file)
@@ -1,6 +1,6 @@
 /*
  * ============LICENSE_START=======================================================
- *  Copyright (C) 2024 Nordix Foundation
+ *  Copyright (C) 2024-2025 OpenInfra Foundation Europe. All rights reserved.
  *  ================================================================================
  *  Licensed under the Apache License, Version 2.0 (the "License");
  *  you may not use this file except in compliance with the License.
@@ -105,7 +105,7 @@ public class DmiOutEventConsumer {
     private void handleEventsStatusPerDmi(final String subscriptionId, final String eventType) {
         final Map<String, DmiCmSubscriptionDetails> dmiSubscriptionsPerDmi = dmiCacheHandler.get(subscriptionId);
         final NcmpOutEvent ncmpOutEvent = ncmpOutEventMapper.toNcmpOutEvent(subscriptionId, dmiSubscriptionsPerDmi);
-        ncmpOutEventProducer.publishNcmpOutEvent(subscriptionId, eventType, ncmpOutEvent, false);
+        ncmpOutEventProducer.sendNcmpOutEvent(subscriptionId, eventType, ncmpOutEvent, false);
     }
 
     private boolean checkStatusCodeAndMessage(final NcmpResponseStatus ncmpResponseStatus,
index 1b368dd..f6ac0cf 100644 (file)
@@ -1,6 +1,6 @@
 /*
  * ============LICENSE_START=======================================================
- *  Copyright (C) 2024 Nordix Foundation
+ *  Copyright (C) 2024-2025 OpenInfra Foundation Europe. All rights reserved.
  *  ================================================================================
  *  Licensed under the Apache License, Version 2.0 (the "License");
  *  you may not use this file except in compliance with the License.
@@ -75,7 +75,7 @@ public class CmSubscriptionHandlerImpl implements CmSubscriptionHandler {
             handleNewCmSubscription(subscriptionId);
             scheduleNcmpOutEventResponse(subscriptionId, "subscriptionCreateResponse");
         } else {
-            rejectAndPublishCreateRequest(subscriptionId, predicates);
+            rejectAndSendCreateRequest(subscriptionId, predicates);
         }
     }
 
@@ -87,7 +87,7 @@ public class CmSubscriptionHandlerImpl implements CmSubscriptionHandler {
                 getLastRemainingAndOverlappingSubscriptionsPerDmi(subscriptionDataNodes);
         dmiCacheHandler.add(subscriptionId, mergeDmiCmSubscriptionDetailsPerDmiMaps(dmiCmSubscriptionTuple));
         if (dmiCmSubscriptionTuple.lastRemainingSubscriptionsPerDmi().isEmpty()) {
-            acceptAndPublishDeleteRequest(subscriptionId);
+            acceptAndSendDeleteRequest(subscriptionId);
         } else {
             sendSubscriptionDeleteRequestToDmi(subscriptionId,
                     dmiCmSubscriptionDetailsPerDmiMapper.toDmiCmSubscriptionsPerDmi(
@@ -122,19 +122,19 @@ public class CmSubscriptionHandlerImpl implements CmSubscriptionHandler {
     }
 
     private void scheduleNcmpOutEventResponse(final String subscriptionId, final String eventType) {
-        ncmpOutEventProducer.publishNcmpOutEvent(subscriptionId, eventType, null, true);
+        ncmpOutEventProducer.sendNcmpOutEvent(subscriptionId, eventType, null, true);
     }
 
-    private void rejectAndPublishCreateRequest(final String subscriptionId, final List<Predicate> predicates) {
+    private void rejectAndSendCreateRequest(final String subscriptionId, final List<Predicate> predicates) {
         final Set<String> subscriptionTargetFilters =
                 predicates.stream().flatMap(predicate -> predicate.getTargetFilter().stream())
                         .collect(Collectors.toSet());
         final NcmpOutEvent ncmpOutEvent = ncmpOutEventMapper.toNcmpOutEventForRejectedRequest(subscriptionId,
                 new ArrayList<>(subscriptionTargetFilters));
-        ncmpOutEventProducer.publishNcmpOutEvent(subscriptionId, "subscriptionCreateResponse", ncmpOutEvent, false);
+        ncmpOutEventProducer.sendNcmpOutEvent(subscriptionId, "subscriptionCreateResponse", ncmpOutEvent, false);
     }
 
-    private void acceptAndPublishDeleteRequest(final String subscriptionId) {
+    private void acceptAndSendDeleteRequest(final String subscriptionId) {
         final Set<String> dmiServiceNames = dmiCacheHandler.get(subscriptionId).keySet();
         for (final String dmiServiceName : dmiServiceNames) {
             dmiCacheHandler.updateDmiSubscriptionStatus(subscriptionId, dmiServiceName,
@@ -143,7 +143,7 @@ public class CmSubscriptionHandlerImpl implements CmSubscriptionHandler {
         }
         final NcmpOutEvent ncmpOutEvent = ncmpOutEventMapper.toNcmpOutEvent(subscriptionId,
                 dmiCacheHandler.get(subscriptionId));
-        ncmpOutEventProducer.publishNcmpOutEvent(subscriptionId, "subscriptionDeleteResponse", ncmpOutEvent,
+        ncmpOutEventProducer.sendNcmpOutEvent(subscriptionId, "subscriptionDeleteResponse", ncmpOutEvent,
                 false);
     }
 
@@ -158,15 +158,15 @@ public class CmSubscriptionHandlerImpl implements CmSubscriptionHandler {
             if (dmiCmSubscriptionPredicates.isEmpty()) {
                 acceptAndPersistCmSubscriptionPerDmi(subscriptionId, dmiPluginName);
             } else {
-                publishDmiInEventPerDmi(subscriptionId, dmiPluginName, dmiCmSubscriptionPredicates);
+                sendDmiInEventPerDmi(subscriptionId, dmiPluginName, dmiCmSubscriptionPredicates);
             }
         });
     }
 
-    private void publishDmiInEventPerDmi(final String subscriptionId, final String dmiPluginName,
-                                         final List<DmiCmSubscriptionPredicate> dmiCmSubscriptionPredicates) {
+    private void sendDmiInEventPerDmi(final String subscriptionId, final String dmiPluginName,
+                                      final List<DmiCmSubscriptionPredicate> dmiCmSubscriptionPredicates) {
         final DmiInEvent dmiInEvent = dmiInEventMapper.toDmiInEvent(dmiCmSubscriptionPredicates);
-        dmiInEventProducer.publishDmiInEvent(subscriptionId, dmiPluginName,
+        dmiInEventProducer.sendDmiInEvent(subscriptionId, dmiPluginName,
                 "subscriptionCreateRequest", dmiInEvent);
     }
 
@@ -183,7 +183,7 @@ public class CmSubscriptionHandlerImpl implements CmSubscriptionHandler {
             final DmiInEvent dmiInEvent =
                     dmiInEventMapper.toDmiInEvent(
                             dmiCmSubscriptionDetails.getDmiCmSubscriptionPredicates());
-            dmiInEventProducer.publishDmiInEvent(subscriptionId,
+            dmiInEventProducer.sendDmiInEvent(subscriptionId,
                     dmiPluginName, "subscriptionDeleteRequest", dmiInEvent);
         });
     }
index 8cfb3ad..639fb65 100644 (file)
@@ -1,6 +1,6 @@
 /*
  * ============LICENSE_START=======================================================
- *  Copyright (C) 2024-2025 Nordix Foundation
+ *  Copyright (C) 2024-2025 OpenInfra Foundation Europe. All rights reserved.
  *  ================================================================================
  *  Licensed under the Apache License, Version 2.0 (the "License");
  *  you may not use this file except in compliance with the License.
@@ -31,7 +31,7 @@ import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
-import org.onap.cps.events.EventsPublisher;
+import org.onap.cps.events.EventsProducer;
 import org.onap.cps.ncmp.impl.cmnotificationsubscription.cache.DmiCacheHandler;
 import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.ncmp_to_client.NcmpOutEvent;
 import org.onap.cps.ncmp.utils.events.NcmpEvent;
@@ -51,7 +51,7 @@ public class NcmpOutEventProducer {
     @Value("${ncmp.timers.subscription-forwarding.dmi-response-timeout-ms}")
     private Integer dmiOutEventTimeoutInMs;
 
-    private final EventsPublisher<CloudEvent> eventsPublisher;
+    private final EventsProducer<CloudEvent> eventsProducer;
     private final NcmpOutEventMapper ncmpOutEventMapper;
     private final DmiCacheHandler dmiCacheHandler;
     private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
@@ -59,7 +59,7 @@ public class NcmpOutEventProducer {
             new ConcurrentHashMap<>();
 
     /**
-     * Publish the event to the client who requested the subscription with key as subscription id and event is Cloud
+     * Send the event to the client who requested the subscription with key as subscription id and event is Cloud
      * Event compliant.
      *
      * @param subscriptionId   Cm Subscription Id
@@ -67,23 +67,23 @@ public class NcmpOutEventProducer {
      * @param ncmpOutEvent     Cm Notification Subscription Event for the
      *                         client
      * @param isScheduledEvent Determines if the event is to be scheduled
-     *                         or published now
+     *                         or send now
      */
-    public void publishNcmpOutEvent(final String subscriptionId, final String eventType,
-            final NcmpOutEvent ncmpOutEvent, final boolean isScheduledEvent) {
+    public void sendNcmpOutEvent(final String subscriptionId, final String eventType,
+                                 final NcmpOutEvent ncmpOutEvent, final boolean isScheduledEvent) {
 
         final String taskKey = subscriptionId.concat(eventType);
 
         if (isScheduledEvent && !scheduledTasksPerSubscriptionIdAndEventType.containsKey(taskKey)) {
-            final ScheduledFuture<?> scheduledFuture = scheduleAndPublishNcmpOutEvent(subscriptionId, eventType);
+            final ScheduledFuture<?> scheduledFuture = scheduleAndSendNcmpOutEvent(subscriptionId, eventType);
             scheduledTasksPerSubscriptionIdAndEventType.putIfAbsent(taskKey, scheduledFuture);
             log.debug("Scheduled the Cm Subscription Event for subscriptionId : {} and eventType : {}", subscriptionId,
                     eventType);
         } else {
             cancelScheduledTask(taskKey);
             if (ncmpOutEvent != null) {
-                publishNcmpOutEventNow(subscriptionId, eventType, ncmpOutEvent);
-                log.debug("Published Cm Subscription Event on demand for subscriptionId : {} and eventType : {}",
+                sendNcmpOutEventNow(subscriptionId, eventType, ncmpOutEvent);
+                log.debug("Sent Cm Subscription Event on demand for subscriptionId : {} and eventType : {}",
                         subscriptionId, eventType);
             }
         }
@@ -109,9 +109,9 @@ public class NcmpOutEventProducer {
                        .asCloudEvent();
     }
 
-    private ScheduledFuture<?> scheduleAndPublishNcmpOutEvent(final String subscriptionId, final String eventType) {
+    private ScheduledFuture<?> scheduleAndSendNcmpOutEvent(final String subscriptionId, final String eventType) {
         final NcmpOutEventPublishingTask ncmpOutEventPublishingTask =
-                new NcmpOutEventPublishingTask(ncmpOutEventTopic, subscriptionId, eventType, eventsPublisher,
+                new NcmpOutEventPublishingTask(ncmpOutEventTopic, subscriptionId, eventType, eventsProducer,
                         ncmpOutEventMapper, dmiCacheHandler);
         return scheduledExecutorService.schedule(ncmpOutEventPublishingTask, dmiOutEventTimeoutInMs,
                 TimeUnit.MILLISECONDS);
@@ -125,11 +125,11 @@ public class NcmpOutEventProducer {
         }
     }
 
-    private void publishNcmpOutEventNow(final String subscriptionId, final String eventType,
-            final NcmpOutEvent ncmpOutEvent) {
+    private void sendNcmpOutEventNow(final String subscriptionId, final String eventType,
+                                     final NcmpOutEvent ncmpOutEvent) {
         final CloudEvent ncmpOutEventAsCloudEvent =
                 buildAndGetNcmpOutEventAsCloudEvent(subscriptionId, eventType, ncmpOutEvent);
-        eventsPublisher.publishCloudEvent(ncmpOutEventTopic, subscriptionId, ncmpOutEventAsCloudEvent);
+        eventsProducer.sendCloudEvent(ncmpOutEventTopic, subscriptionId, ncmpOutEventAsCloudEvent);
         dmiCacheHandler.removeAcceptedAndRejectedDmiSubscriptionEntries(subscriptionId);
     }
 
index e9d6d78..80d7981 100644 (file)
@@ -1,6 +1,6 @@
 /*
  *  ============LICENSE_START=======================================================
- *  Copyright (C) 2024-2025 Nordix Foundation
+ *  Copyright (C) 2024-2025 OpenInfra Foundation Europe. All rights reserved.
  *  ================================================================================
  *  Licensed under the Apache License, Version 2.0 (the "License");
  *  you may not use this file except in compliance with the License.
@@ -26,7 +26,7 @@ import io.cloudevents.CloudEvent;
 import java.util.Map;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
-import org.onap.cps.events.EventsPublisher;
+import org.onap.cps.events.EventsProducer;
 import org.onap.cps.ncmp.impl.cmnotificationsubscription.cache.DmiCacheHandler;
 import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.DmiCmSubscriptionDetails;
 import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.ncmp_to_client.NcmpOutEvent;
@@ -38,12 +38,12 @@ public class NcmpOutEventPublishingTask implements Runnable {
     private final String topicName;
     private final String subscriptionId;
     private final String eventType;
-    private final EventsPublisher<CloudEvent> eventsPublisher;
+    private final EventsProducer<CloudEvent> eventsProducer;
     private final NcmpOutEventMapper ncmpOutEventMapper;
     private final DmiCacheHandler dmiCacheHandler;
 
     /**
-     * Delegating the responsibility of publishing NcmpOutEvent as a separate task which will
+     * Delegating the responsibility of sending NcmpOutEvent as a separate task which will
      * be called after a specified delay.
      */
     @Override
@@ -52,7 +52,7 @@ public class NcmpOutEventPublishingTask implements Runnable {
                 dmiCacheHandler.get(subscriptionId);
         final NcmpOutEvent ncmpOutEvent = ncmpOutEventMapper.toNcmpOutEvent(subscriptionId,
                 dmiSubscriptionsPerDmi);
-        eventsPublisher.publishCloudEvent(topicName, subscriptionId,
+        eventsProducer.sendCloudEvent(topicName, subscriptionId,
                 buildAndGetNcmpOutEventAsCloudEvent(subscriptionId, eventType, ncmpOutEvent));
         dmiCacheHandler.removeAcceptedAndRejectedDmiSubscriptionEntries(subscriptionId);
     }
index 2a0d2f5..0e9db3d 100644 (file)
@@ -1,6 +1,6 @@
 /*
  *  ============LICENSE_START=======================================================
- *  Copyright (C) 2021-2025 Nordix Foundation
+ *  Copyright (C) 2021-2025 OpenInfra Foundation Europe. All rights reserved.
  *  Modifications Copyright (C) 2022 Bell Canada
  *  ================================================================================
  *  Licensed under the Apache License, Version 2.0 (the "License");
@@ -309,7 +309,7 @@ public class DmiDataOperations {
             cmHandleIdsPerResponseCodesPerOperation.add(dmiDataOperationRequestBody,
                     Map.of(dmiClientRequestException.getNcmpResponseStatus(), cmHandleIds));
         });
-        DmiDataOperationsHelper.publishErrorMessageToClientTopic(topicName, requestId,
+        DmiDataOperationsHelper.sendErrorMessageToClientTopic(topicName, requestId,
                 cmHandleIdsPerResponseCodesPerOperation);
     }
 }
index 6f368da..22f20c8 100644 (file)
@@ -1,6 +1,6 @@
 /*
  * ============LICENSE_START=======================================================
- * Copyright (C) 2023-2024 Nordix Foundation
+ * Copyright (C) 2023-2025 OpenInfra Foundation Europe. All rights reserved.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -25,7 +25,7 @@ import io.cloudevents.kafka.impl.KafkaHeaders;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.onap.cps.events.EventsPublisher;
+import org.onap.cps.events.EventsProducer;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 import org.springframework.kafka.annotation.KafkaListener;
 import org.springframework.stereotype.Component;
@@ -39,11 +39,11 @@ import org.springframework.stereotype.Component;
 @ConditionalOnProperty(name = "notification.enabled", havingValue = "true", matchIfMissing = true)
 public class DataOperationEventConsumer {
 
-    private final EventsPublisher<CloudEvent> eventsPublisher;
+    private final EventsProducer<CloudEvent> eventsProducer;
 
     /**
-     * Consume the DataOperation cloud event published by producer to topic 'async-m2m.topic'
-     * and publish the same to client specified topic.
+     * Consume the DataOperation cloud event sent by producer to topic 'async-m2m.topic'
+     * and send the same to client specified topic.
      *
      * @param dataOperationEventConsumerRecord consuming event as a ConsumerRecord.
      */
@@ -52,12 +52,12 @@ public class DataOperationEventConsumer {
             filter = "includeDataOperationEventsOnly",
             groupId = "ncmp-data-operation-event-group",
             containerFactory = "cloudEventConcurrentKafkaListenerContainerFactory")
-    public void consumeAndPublish(final ConsumerRecord<String, CloudEvent> dataOperationEventConsumerRecord) {
+    public void consumeAndSend(final ConsumerRecord<String, CloudEvent> dataOperationEventConsumerRecord) {
         log.debug("Consuming event payload {} ...", dataOperationEventConsumerRecord.value());
         final String eventTarget = KafkaHeaders.getParsedKafkaHeader(
                 dataOperationEventConsumerRecord.headers(), "ce_destination");
         final String correlationId = KafkaHeaders.getParsedKafkaHeader(
                 dataOperationEventConsumerRecord.headers(), "ce_correlationid");
-        eventsPublisher.publishCloudEvent(eventTarget, correlationId, dataOperationEventConsumerRecord.value());
+        eventsProducer.sendCloudEvent(eventTarget, correlationId, dataOperationEventConsumerRecord.value());
     }
 }
index e2803e8..7caa280 100644 (file)
@@ -22,7 +22,7 @@ package org.onap.cps.ncmp.impl.data.async;
 
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
-import org.onap.cps.events.EventsPublisher;
+import org.onap.cps.events.EventsProducer;
 import org.onap.cps.ncmp.event.model.DmiAsyncRequestResponseEvent;
 import org.onap.cps.ncmp.event.model.NcmpAsyncRequestResponseEvent;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
@@ -38,7 +38,7 @@ import org.springframework.stereotype.Component;
 @ConditionalOnProperty(name = "notification.enabled", havingValue = "true", matchIfMissing = true)
 public class DmiAsyncRequestResponseEventConsumer {
 
-    private final EventsPublisher<NcmpAsyncRequestResponseEvent> eventsPublisher;
+    private final EventsProducer<NcmpAsyncRequestResponseEvent> eventsProducer;
     private final NcmpAsyncRequestResponseEventMapper ncmpAsyncRequestResponseEventMapper;
 
     /**
@@ -55,7 +55,7 @@ public class DmiAsyncRequestResponseEventConsumer {
         log.debug("Consuming event {} ...", dmiAsyncRequestResponseEvent);
         final NcmpAsyncRequestResponseEvent ncmpAsyncRequestResponseEvent =
                 ncmpAsyncRequestResponseEventMapper.toNcmpAsyncEvent(dmiAsyncRequestResponseEvent);
-        eventsPublisher.publishEvent(ncmpAsyncRequestResponseEvent.getEventTarget(),
+        eventsProducer.sendEvent(ncmpAsyncRequestResponseEvent.getEventTarget(),
                                      ncmpAsyncRequestResponseEvent.getEventId(),
                                      ncmpAsyncRequestResponseEvent);
     }
index cb435f4..ee3f6fe 100644 (file)
@@ -1,6 +1,6 @@
 /*
  *  ============LICENSE_START=======================================================
- *  Copyright (C) 2023-2024 Nordix Foundation
+ *  Copyright (C) 2023-2025 OpenInfra Foundation Europe. All rights reserved.
  *  ================================================================================
  *  Licensed under the Apache License, Version 2.0 (the "License");
  *  you may not use this file except in compliance with the License.
@@ -34,7 +34,7 @@ import java.util.Set;
 import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
-import org.onap.cps.events.EventsPublisher;
+import org.onap.cps.events.EventsProducer;
 import org.onap.cps.ncmp.api.NcmpResponseStatus;
 import org.onap.cps.ncmp.api.data.models.DataOperationDefinition;
 import org.onap.cps.ncmp.api.data.models.DataOperationRequest;
@@ -114,7 +114,7 @@ public class DmiDataOperationsHelper {
                     DmiDataOperation.buildDmiDataOperationRequestBodyWithoutCmHandles(dataOperationDefinitionIn),
                     CM_HANDLES_NOT_READY, nonReadyCmHandleReferences);
         }
-        publishErrorMessageToClientTopic(topicParamInQuery, requestId, cmHandleReferencesPerResponseCodesPerOperation);
+        sendErrorMessageToClientTopic(topicParamInQuery, requestId, cmHandleReferencesPerResponseCodesPerOperation);
         return dmiDataOperationsOutPerDmiServiceName;
     }
 
@@ -127,24 +127,24 @@ public class DmiDataOperationsHelper {
     }
 
     /**
-     * Creates data operation cloud event and publish it to client topic.
+     * Creates data operation cloud event and sends it to client topic.
      *
      * @param clientTopic                              client given topic
      * @param requestId                                unique identifier per request
      * @param cmHandleIdsPerResponseCodesPerOperation  list of cm handle ids per operation with response code
      */
-    public static void publishErrorMessageToClientTopic(final String clientTopic,
-                                                         final String requestId,
-                                                         final MultiValueMap<DmiDataOperation,
+    public static void sendErrorMessageToClientTopic(final String clientTopic,
+                                                     final String requestId,
+                                                     final MultiValueMap<DmiDataOperation,
                                                                  Map<NcmpResponseStatus, List<String>>>
                                                                     cmHandleIdsPerResponseCodesPerOperation) {
         if (!cmHandleIdsPerResponseCodesPerOperation.isEmpty()) {
             final CloudEvent dataOperationCloudEvent = DataOperationEventCreator.createDataOperationEvent(clientTopic,
                     requestId, cmHandleIdsPerResponseCodesPerOperation);
-            final EventsPublisher<CloudEvent> eventsPublisher = CpsApplicationContext.getCpsBean(EventsPublisher.class);
-            log.warn("publishing error message to client topic: {} ,requestId: {}, data operation cloud event id: {}",
+            final EventsProducer<CloudEvent> eventsProducer = CpsApplicationContext.getCpsBean(EventsProducer.class);
+            log.warn("sending error message to client topic: {} ,requestId: {}, data operation cloud event id: {}",
                     clientTopic, requestId, dataOperationCloudEvent.getId());
-            eventsPublisher.publishCloudEvent(clientTopic, requestId, dataOperationCloudEvent);
+            eventsProducer.sendCloudEvent(clientTopic, requestId, dataOperationCloudEvent);
         }
     }
 
index d62688d..2ed407f 100644 (file)
@@ -28,7 +28,7 @@ import java.util.List;
 import java.util.Map;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
-import org.onap.cps.events.EventsPublisher;
+import org.onap.cps.events.EventsProducer;
 import org.onap.cps.ncmp.events.lcm.v1.LcmEvent;
 import org.onap.cps.ncmp.events.lcm.v1.LcmEventHeader;
 import org.onap.cps.ncmp.events.lcm.v1.Values;
@@ -38,7 +38,7 @@ import org.springframework.kafka.KafkaException;
 import org.springframework.stereotype.Service;
 
 /**
- * LcmEventsProducer to call the publisher and publish on the dedicated topic.
+ * LcmEventsProducer to call the producer and send on the dedicated topic.
  */
 
 @Slf4j
@@ -49,7 +49,7 @@ public class LcmEventsProducer {
     private static final Tag TAG_METHOD = Tag.of("method", "publishLcmEvent");
     private static final Tag TAG_CLASS = Tag.of("class", LcmEventsProducer.class.getName());
     private static final String UNAVAILABLE_CM_HANDLE_STATE = "N/A";
-    private final EventsPublisher<LcmEvent> eventsPublisher;
+    private final EventsProducer<LcmEvent> eventsProducer;
     private final JsonObjectMapper jsonObjectMapper;
     private final MeterRegistry meterRegistry;
 
@@ -74,7 +74,7 @@ public class LcmEventsProducer {
             try {
                 final Map<String, Object> lcmEventHeadersMap =
                         jsonObjectMapper.convertToValueType(lcmEventHeader, Map.class);
-                eventsPublisher.publishEvent(topicName, cmHandleId, lcmEventHeadersMap, lcmEvent);
+                eventsProducer.sendEvent(topicName, cmHandleId, lcmEventHeadersMap, lcmEvent);
             } catch (final KafkaException e) {
                 log.error("Unable to publish message to topic : {} and cause : {}", topicName, e.getMessage());
             } finally {
index 27ad535..944b5eb 100644 (file)
@@ -68,7 +68,7 @@ public class TrustLevelManager {
     }
 
     /**
-     * Add cmHandles to the cache and publish notification for initial trust level of cmHandles if it is NONE.
+     * Add cmHandles to the cache and send notification for initial trust level of cmHandles if it is NONE.
      *
      * @param cmHandlesToBeCreated a list of cmHandles being created
      */
@@ -82,7 +82,7 @@ public class TrustLevelManager {
             }
             trustLevelPerCmHandleIdForCache.put(cmHandleId, initialTrustLevel);
             if (TrustLevel.NONE.equals(initialTrustLevel)) {
-                inventoryEventProducer.publishAvcEvent(cmHandleId,
+                inventoryEventProducer.sendAvcEvent(cmHandleId,
                         AVC_CHANGED_ATTRIBUTE_NAME,
                         AVC_NO_OLD_VALUE,
                         initialTrustLevel.name());
@@ -92,7 +92,7 @@ public class TrustLevelManager {
     }
 
     /**
-     * Updates trust level of dmi plugin in the cache and publish notification for trust level of cmHandles if it
+     * Updates trust level of dmi plugin in the cache and sends notification for trust level of cmHandles if it
      * has changed.
      *
      * @param dmiServiceName        dmi service name
@@ -113,7 +113,7 @@ public class TrustLevelManager {
     }
 
     /**
-     * Updates trust level of device in the cache and publish notification for trust level of device if it has
+     * Updates trust level of device in the cache and send notification for trust level of device if it has
      * changed.
      *
      * @param cmHandleId            cm handle id
@@ -197,7 +197,7 @@ public class TrustLevelManager {
         } else {
             log.info("The trust level for Cm Handle: {} is now: {} ", notificationCandidateCmHandleId,
                     newEffectiveTrustLevel);
-            inventoryEventProducer.publishAvcEvent(notificationCandidateCmHandleId,
+            inventoryEventProducer.sendAvcEvent(notificationCandidateCmHandleId,
                     AVC_CHANGED_ATTRIBUTE_NAME,
                     oldEffectiveTrustLevel.name(),
                     newEffectiveTrustLevel.name());
index f388ee1..8f83e28 100644 (file)
@@ -27,7 +27,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import lombok.RequiredArgsConstructor;
-import org.onap.cps.events.EventsPublisher;
+import org.onap.cps.events.EventsProducer;
 import org.onap.cps.ncmp.events.avc.ncmp_to_client.Avc;
 import org.onap.cps.ncmp.events.avc.ncmp_to_client.AvcEvent;
 import org.onap.cps.ncmp.events.avc.ncmp_to_client.Data;
@@ -38,18 +38,18 @@ import org.springframework.stereotype.Service;
 @RequiredArgsConstructor
 public class InventoryEventProducer {
 
-    private final EventsPublisher<CloudEvent> eventsPublisher;
+    private final EventsProducer<CloudEvent> eventsProducer;
 
     @Value("${app.ncmp.avc.inventory-events-topic}")
     private String ncmpInventoryEventsTopicName;
 
     /**
-     * Publish attribute value change event.
+     * Send attribute value change event.
      *
      * @param eventKey id of the cmHandle being registered
      */
-    public void publishAvcEvent(final String eventKey, final String attributeName,
-                                final String oldAttributeValue, final String newAttributeValue) {
+    public void sendAvcEvent(final String eventKey, final String attributeName,
+                             final String oldAttributeValue, final String newAttributeValue) {
         final AvcEvent avcEvent = buildAvcEvent(attributeName, oldAttributeValue, newAttributeValue);
 
         final Map<String, String> extensions = createAvcEventExtensions(eventKey);
@@ -61,7 +61,7 @@ public class InventoryEventProducer {
                                                  .build()
                                                  .asCloudEvent();
 
-        eventsPublisher.publishCloudEvent(ncmpInventoryEventsTopicName, eventKey, avcCloudEvent);
+        eventsProducer.sendCloudEvent(ncmpInventoryEventsTopicName, eventKey, avcCloudEvent);
     }
 
     private AvcEvent buildAvcEvent(final String attributeName,
index ad5f42e..b0a8f20 100644 (file)
@@ -1,6 +1,6 @@
 /*
  *  ============LICENSE_START=======================================================
- *  Copyright (c) 2023-2024 Nordix Foundation.
+ *  Copyright (c) 2023-2025 OpenInfra Foundation Europe. All rights reserved.
  *  ================================================================================
  *  Licensed under the Apache License, Version 2.0 (the "License");
  *  you may not use this file except in compliance with the License.
@@ -27,7 +27,7 @@ import io.cloudevents.kafka.CloudEventDeserializer
 import io.cloudevents.kafka.impl.KafkaHeaders
 import org.apache.kafka.clients.consumer.ConsumerRecord
 import org.apache.kafka.clients.consumer.KafkaConsumer
-import org.onap.cps.events.EventsPublisher
+import org.onap.cps.events.EventsProducer
 import org.onap.cps.ncmp.events.avc1_0_0.AvcEvent
 import org.onap.cps.ncmp.utils.TestUtils
 import org.onap.cps.ncmp.utils.events.MessagingBaseSpec
@@ -41,16 +41,16 @@ import java.time.Duration
 
 import static org.onap.cps.ncmp.utils.events.CloudEventMapper.toTargetEvent
 
-@SpringBootTest(classes = [EventsPublisher, CmAvcEventConsumer, ObjectMapper, JsonObjectMapper])
+@SpringBootTest(classes = [EventsProducer, CmAvcEventConsumer, ObjectMapper, JsonObjectMapper])
 @Testcontainers
 @DirtiesContext
 class CmAvcEventConsumerSpec extends MessagingBaseSpec {
 
     @SpringBean
-    EventsPublisher eventsPublisher = new EventsPublisher<CloudEvent>(legacyEventKafkaTemplate, cloudEventKafkaTemplate)
+    EventsProducer eventsProducer = new EventsProducer<CloudEvent>(legacyEventKafkaTemplate, cloudEventKafkaTemplate)
 
     @SpringBean
-    CmAvcEventConsumer acvEventConsumer = new CmAvcEventConsumer(eventsPublisher)
+    CmAvcEventConsumer acvEventConsumer = new CmAvcEventConsumer(eventsProducer)
 
     @Autowired
     JsonObjectMapper jsonObjectMapper
index 49e43f9..5a10147 100644 (file)
@@ -1,6 +1,6 @@
 /*
  * ============LICENSE_START=======================================================
- * Copyright (c) 2024-2025 Nordix Foundation.
+ * Copyright (c) 2024-2025 OpenInfra Foundation Europe. All rights reserved.
  *  ================================================================================
  *  Licensed under the Apache License, Version 2.0 (the "License");
  *  you may not use this file except in compliance with the License.
@@ -23,7 +23,7 @@ package org.onap.cps.ncmp.impl.cmnotificationsubscription.dmi
 import com.fasterxml.jackson.databind.ObjectMapper
 import io.cloudevents.CloudEvent
 import io.cloudevents.core.v1.CloudEventBuilder
-import org.onap.cps.events.EventsPublisher
+import org.onap.cps.events.EventsProducer
 import org.onap.cps.ncmp.config.CpsApplicationContext
 import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.ncmp_to_dmi.CmHandle
 import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.ncmp_to_dmi.Data
@@ -38,11 +38,11 @@ import spock.lang.Specification
 @ContextConfiguration(classes = [CpsApplicationContext])
 class DmiInEventProducerSpec extends Specification {
 
-    def mockEventsPublisher = Mock(EventsPublisher)
+    def mockEventsProducer = Mock(EventsProducer)
 
-    def objectUnderTest = new DmiInEventProducer(mockEventsPublisher)
+    def objectUnderTest = new DmiInEventProducer(mockEventsProducer)
 
-    def 'Create and Publish Cm Notification Subscription DMI In Event'() {
+    def 'Create and Send Cm Notification Subscription DMI In Event'() {
         given: 'a cm subscription for a dmi plugin'
             def subscriptionId = 'test-subscription-id'
             def dmiPluginName = 'test-dmiplugin'
@@ -50,10 +50,10 @@ class DmiInEventProducerSpec extends Specification {
             def dmiInEvent = new DmiInEvent(data: new Data(cmHandles: [new CmHandle(cmhandleId: 'test-1', privateProperties: [:])]))
         and: 'also we have target topic for dmiPlugin'
             objectUnderTest.dmiInEventTopic = 'dmiplugin-test-topic'
-        when: 'the event is published'
-            objectUnderTest.publishDmiInEvent(subscriptionId, dmiPluginName, eventType, dmiInEvent)
+        when: 'the event is sent'
+            objectUnderTest.sendDmiInEvent(subscriptionId, dmiPluginName, eventType, dmiInEvent)
         then: 'the event contains the required attributes'
-            1 * mockEventsPublisher.publishCloudEvent(_, _, _) >> {
+            1 * mockEventsProducer.sendCloudEvent(_, _, _) >> {
                 args ->
                     {
                         assert args[0] == 'dmiplugin-test-topic'
index bcf8780..2ab15d2 100644 (file)
@@ -1,6 +1,6 @@
 /*
  * ============LICENSE_START=======================================================
- * Copyright (c) 2024 Nordix Foundation.
+ * Copyright (c) 2024-2025 OpenInfra Foundation Europe. All rights reserved.
  *  ================================================================================
  *  Licensed under the Apache License, Version 2.0 (the "License");
  *  you may not use this file except in compliance with the License.
@@ -108,8 +108,8 @@ class DmiOutEventConsumerSpec extends MessagingBaseSpec {
             expectedPersistenceCalls * mockDmiCacheHandler.persistIntoDatabasePerDmi('sub-1','test-dmi-plugin-name')
         and: 'correct number of calls to map the ncmp out event'
             1 * mockNcmpOutEventMapper.toNcmpOutEvent('sub-1', _)
-        and: 'correct number of calls to publish the ncmp out event to client'
-            1 * mockNcmpOutEventProducer.publishNcmpOutEvent('sub-1', 'subscriptionCreateResponse', _, false)
+        and: 'correct number of calls to send the ncmp out event to client'
+            1 * mockNcmpOutEventProducer.sendNcmpOutEvent('sub-1', 'subscriptionCreateResponse', _, false)
         where: 'the following parameters are used'
             scenario          | subscriptionStatus | statusCode || expectedCacheCalls | expectedPersistenceCalls
             'Accepted Status' | ACCEPTED           | '1'        || 1                  | 1
index 1a54dee..e4321ff 100644 (file)
@@ -1,6 +1,6 @@
 /*
  * ============LICENSE_START=======================================================
- * Copyright (c) 2024 Nordix Foundation.
+ * Copyright (c) 2024-2025 OpenInfra Foundation Europe. All rights reserved.
  *  ================================================================================
  *  Licensed under the Apache License, Version 2.0 (the "License");
  *  you may not use this file except in compliance with the License.
@@ -81,11 +81,11 @@ class CmSubscriptionHandlerImplSpec extends Specification {
             objectUnderTest.processSubscriptionCreateRequest(subscriptionId, predicates)
         then: 'the subscription cache handler is called once'
             1 * mockDmiCacheHandler.add('test-id', _)
-        and: 'the events handler method to publish DMI event is called correct number of times with the correct parameters'
-            testDmiSubscriptionsPerDmi.size() * mockDmiInEventProducer.publishDmiInEvent(
+        and: 'the events handler method to send DMI event is called correct number of times with the correct parameters'
+            testDmiSubscriptionsPerDmi.size() * mockDmiInEventProducer.sendDmiInEvent(
                 "test-id", "dmi-1", "subscriptionCreateRequest", testDmiInEvent)
         and: 'we schedule to send the response after configured time from the cache'
-            1 * mockNcmpOutEventProducer.publishNcmpOutEvent('test-id', 'subscriptionCreateResponse', null, true)
+            1 * mockNcmpOutEventProducer.sendNcmpOutEvent('test-id', 'subscriptionCreateResponse', null, true)
     }
 
     def 'Consume valid and Overlapping Cm Notification Subscription NcmpIn Event'() {
@@ -105,7 +105,7 @@ class CmSubscriptionHandlerImplSpec extends Specification {
         and: 'the subscription details are updated in the cache'
             1 * mockDmiCacheHandler.updateDmiSubscriptionStatus('test-id', _, ACCEPTED)
         and: 'we schedule to send the response after configured time from the cache'
-            1 * mockNcmpOutEventProducer.publishNcmpOutEvent('test-id', 'subscriptionCreateResponse', null, true)
+            1 * mockNcmpOutEventProducer.sendNcmpOutEvent('test-id', 'subscriptionCreateResponse', null, true)
     }
 
     def 'Consume valid and but non-unique CmNotificationSubscription create message'() {
@@ -122,10 +122,10 @@ class CmSubscriptionHandlerImplSpec extends Specification {
                 "test-id", _) >> testNcmpOutEvent
         when: 'the valid but non-unique event is consumed'
             objectUnderTest.processSubscriptionCreateRequest(subscriptionId, predicates)
-        then: 'the events handler method to publish DMI event is never called'
-            0 * mockDmiInEventProducer.publishDmiInEvent(_, _, _, _)
-        and: 'the events handler method to publish NCMP out event is called once'
-            1 * mockNcmpOutEventProducer.publishNcmpOutEvent('test-id', 'subscriptionCreateResponse', testNcmpOutEvent, false)
+        then: 'the events handler method to send DMI event is never called'
+            0 * mockDmiInEventProducer.sendDmiInEvent(_, _, _, _)
+        and: 'the events handler method to send NCMP out event is called once'
+            1 * mockNcmpOutEventProducer.sendNcmpOutEvent('test-id', 'subscriptionCreateResponse', testNcmpOutEvent, false)
     }
 
     def 'Consume valid CmNotificationSubscriptionNcmpInEvent delete message'() {
@@ -140,11 +140,11 @@ class CmSubscriptionHandlerImplSpec extends Specification {
             1 * mockInventoryPersistence.getYangModelCmHandle('ch-2') >> new YangModelCmHandle(dmiServiceName: 'dmi-2')
         when: 'the subscription delete request is processed'
             objectUnderTest.processSubscriptionDeleteRequest(subscriptionId)
-        then: 'the method to publish a dmi event is called with correct parameters'
-            1 * mockDmiInEventProducer.publishDmiInEvent(subscriptionId,'dmi-1','subscriptionDeleteRequest',_)
-            1 * mockDmiInEventProducer.publishDmiInEvent(subscriptionId,'dmi-2','subscriptionDeleteRequest',_)
-        and: 'the method to publish nmcp out event is called with correct parameters'
-            1 * mockNcmpOutEventProducer.publishNcmpOutEvent(subscriptionId, 'subscriptionDeleteResponse', null, true)
+        then: 'the method to send a dmi event is called with correct parameters'
+            1 * mockDmiInEventProducer.sendDmiInEvent(subscriptionId,'dmi-1','subscriptionDeleteRequest',_)
+            1 * mockDmiInEventProducer.sendDmiInEvent(subscriptionId,'dmi-2','subscriptionDeleteRequest',_)
+        and: 'the method to send nmcp out event is called with correct parameters'
+            1 * mockNcmpOutEventProducer.sendNcmpOutEvent(subscriptionId, 'subscriptionDeleteResponse', null, true)
     }
 
     def 'Delete a subscriber for fully overlapping subscriptions'() {
@@ -161,12 +161,12 @@ class CmSubscriptionHandlerImplSpec extends Specification {
             2 * mockDmiCacheHandler.get(subscriptionId) >> ['dmi-1':[:],'dmi-2':[:]]
         when: 'the subscription delete request is processed'
             objectUnderTest.processSubscriptionDeleteRequest(subscriptionId)
-        then: 'the method to publish a dmi event is never called'
-            0 * mockDmiInEventProducer.publishDmiInEvent(_,_,_,_)
+        then: 'the method to send a dmi event is never called'
+            0 * mockDmiInEventProducer.sendDmiInEvent(_,_,_,_)
         and: 'the cache handler is called to remove subscriber from database per dmi'
             1 * mockDmiCacheHandler.removeFromDatabase('test-id', 'dmi-1')
             1 * mockDmiCacheHandler.removeFromDatabase('test-id', 'dmi-2')
-        and: 'the method to publish nmcp out event is called with correct parameters'
-            1 * mockNcmpOutEventProducer.publishNcmpOutEvent(subscriptionId, 'subscriptionDeleteResponse', null, false)
+        and: 'the method to send ncmp out event is called with correct parameters'
+            1 * mockNcmpOutEventProducer.sendNcmpOutEvent(subscriptionId, 'subscriptionDeleteResponse', null, false)
     }
 }
index d8adde2..09aebf3 100644 (file)
@@ -1,6 +1,6 @@
 /*
  * ============LICENSE_START=======================================================
- *  Copyright (C) 2025 Nordix Foundation
+ *  Copyright (C) 2025 OpenInfra Foundation Europe. All rights reserved.
  *  ================================================================================
  *  Licensed under the Apache License, Version 2.0 (the "License");
  *  you may not use this file except in compliance with the License.
@@ -23,7 +23,7 @@ package org.onap.cps.ncmp.impl.cmnotificationsubscription.ncmp
 import com.fasterxml.jackson.databind.ObjectMapper
 import io.cloudevents.CloudEvent
 import io.cloudevents.core.v1.CloudEventBuilder
-import org.onap.cps.events.EventsPublisher
+import org.onap.cps.events.EventsProducer
 import org.onap.cps.ncmp.config.CpsApplicationContext
 import org.onap.cps.ncmp.impl.cmnotificationsubscription.cache.DmiCacheHandler
 import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.ncmp_to_client.Data
@@ -38,27 +38,27 @@ import spock.lang.Specification
 @ContextConfiguration(classes = [CpsApplicationContext])
 class NcmpOutEventProducerSpec extends Specification {
 
-    def mockEventsPublisher = Mock(EventsPublisher)
+    def mockEventsProducer = Mock(EventsProducer)
     def mockNcmpOutEventMapper = Mock(NcmpOutEventMapper)
     def mockDmiCacheHandler = Mock(DmiCacheHandler)
 
-    def objectUnderTest = new NcmpOutEventProducer(mockEventsPublisher, mockNcmpOutEventMapper, mockDmiCacheHandler)
+    def objectUnderTest = new NcmpOutEventProducer(mockEventsProducer, mockNcmpOutEventMapper, mockDmiCacheHandler)
 
     def 'Create and #scenario Cm Notification Subscription NCMP out event'() {
         given: 'a cm subscription response for the client'
             def subscriptionId = 'test-subscription-id-2'
             def eventType = 'subscriptionCreateResponse'
             def ncmpOutEvent = new NcmpOutEvent(data: new Data(subscriptionId: 'test-subscription-id-2', acceptedTargets: ['ch-1', 'ch-2']))
-        and: 'also we have target topic for publishing to client'
+        and: 'also we have target topic for sending to client'
             objectUnderTest.ncmpOutEventTopic = 'client-test-topic'
         and: 'a deadline to an event'
             objectUnderTest.dmiOutEventTimeoutInMs = 1000
-        when: 'the event is published'
-            objectUnderTest.publishNcmpOutEvent(subscriptionId, eventType, ncmpOutEvent, eventPublishingTaskToBeScheduled)
+        when: 'the event is sent'
+            objectUnderTest.sendNcmpOutEvent(subscriptionId, eventType, ncmpOutEvent, eventPublishingTaskToBeScheduled)
         then: 'we conditionally wait for a while'
             Thread.sleep(delayInMs)
         then: 'the event contains the required attributes'
-            1 * mockEventsPublisher.publishCloudEvent(_, _, _) >> {
+            1 * mockEventsProducer.sendCloudEvent(_, _, _) >> {
                 args ->
                     {
                         assert args[0] == 'client-test-topic'
@@ -72,27 +72,27 @@ class NcmpOutEventProducerSpec extends Specification {
             }
         where: 'following scenarios are considered'
             scenario                                          | delayInMs | eventPublishingTaskToBeScheduled
-            'publish event now'                               | 0         | false
-            'schedule and publish after the configured time ' | 1500      | true
+            'send event now'                               | 0         | false
+            'schedule and send after the configured time ' | 1500      | true
     }
 
-    def 'Schedule Cm Notification Subscription NCMP out event but later publish it on demand'() {
+    def 'Schedule Cm Notification Subscription NCMP out event but later send it on demand'() {
         given: 'a cm subscription response for the client'
             def subscriptionId = 'test-subscription-id-3'
             def eventType = 'subscriptionCreateResponse'
             def ncmpOutEvent = new NcmpOutEvent(data: new Data(subscriptionId: 'test-subscription-id-3', acceptedTargets: ['ch-2', 'ch-3']))
-        and: 'also we have target topic for publishing to client'
+        and: 'we also have a target topic for sending to the client'
             objectUnderTest.ncmpOutEventTopic = 'client-test-topic'
         and: 'a deadline to an event'
             objectUnderTest.dmiOutEventTimeoutInMs = 1000
-        when: 'the event is scheduled to be published'
-            objectUnderTest.publishNcmpOutEvent(subscriptionId, eventType, ncmpOutEvent, true)
+        when: 'the event is scheduled to be sent'
+            objectUnderTest.sendNcmpOutEvent(subscriptionId, eventType, ncmpOutEvent, true)
         then: 'we wait for 10ms and then we receive response from DMI'
             Thread.sleep(10)
-        and: 'we receive response from DMI so we publish the message on demand'
-            objectUnderTest.publishNcmpOutEvent(subscriptionId, eventType, ncmpOutEvent, false)
+        and: 'we receive response from DMI so we send the message on demand'
+            objectUnderTest.sendNcmpOutEvent(subscriptionId, eventType, ncmpOutEvent, false)
         then: 'the event contains the required attributes'
-            1 * mockEventsPublisher.publishCloudEvent(_, _, _) >> {
+            1 * mockEventsProducer.sendCloudEvent(_, _, _) >> {
                 args ->
                     {
                         assert args[0] == 'client-test-topic'
@@ -108,23 +108,23 @@ class NcmpOutEventProducerSpec extends Specification {
             1 * mockDmiCacheHandler.removeAcceptedAndRejectedDmiSubscriptionEntries(subscriptionId)
     }
 
-    def 'No event published when NCMP out event is null'() {
+    def 'No event sent when NCMP out event is null'() {
         given: 'a cm subscription response for the client'
             def subscriptionId = 'test-subscription-id-3'
             def eventType = 'subscriptionCreateResponse'
             def ncmpOutEvent = null
-        and: 'also we have target topic for publishing to client'
+        and: 'we also have a target topic for sending to the client'
             objectUnderTest.ncmpOutEventTopic = 'client-test-topic'
         and: 'a deadline to an event'
             objectUnderTest.dmiOutEventTimeoutInMs = 1000
-        when: 'the event is scheduled to be published'
-            objectUnderTest.publishNcmpOutEvent(subscriptionId, eventType, ncmpOutEvent, true)
+        when: 'the event is scheduled to be sent'
+            objectUnderTest.sendNcmpOutEvent(subscriptionId, eventType, ncmpOutEvent, true)
         then: 'we wait for 10ms and then we receive response from DMI'
             Thread.sleep(10)
-        and: 'we receive NO response from DMI so we publish the message on demand'
-            objectUnderTest.publishNcmpOutEvent(subscriptionId, eventType, ncmpOutEvent, false)
-        and: 'no event published'
-            0 * mockEventsPublisher.publishCloudEvent(*_)
+        and: 'we receive NO response from DMI so we send the message on demand'
+            objectUnderTest.sendNcmpOutEvent(subscriptionId, eventType, ncmpOutEvent, false)
+        and: 'no event sent'
+            0 * mockEventsProducer.sendCloudEvent(*_)
     }
 
 }
index 37a9097..93338c9 100644 (file)
@@ -1,6 +1,6 @@
 /*
  *  ============LICENSE_START=======================================================
- *  Copyright (C) 2021-2025 Nordix Foundation
+ *  Copyright (C) 2021-2025 OpenInfra Foundation Europe. All rights reserved.
  *  Modifications Copyright (C) 2022 Bell Canada
  *  ================================================================================
  *  Licensed under the Apache License, Version 2.0 (the "License");
@@ -22,7 +22,7 @@
 package org.onap.cps.ncmp.impl.data
 
 import com.fasterxml.jackson.databind.ObjectMapper
-import org.onap.cps.events.EventsPublisher
+import org.onap.cps.events.EventsProducer
 import org.onap.cps.ncmp.api.data.models.CmResourceAddress
 import org.onap.cps.ncmp.api.data.models.DataOperationRequest
 import org.onap.cps.ncmp.api.exceptions.DmiClientRequestException
@@ -57,7 +57,7 @@ import static org.onap.cps.ncmp.impl.models.RequiredDmiService.DATA
 import static org.onap.cps.ncmp.utils.events.CloudEventMapper.toTargetEvent
 
 @SpringBootTest
-@ContextConfiguration(classes = [EventsPublisher, CpsApplicationContext, DmiProperties, DmiDataOperations, PolicyExecutor])
+@ContextConfiguration(classes = [EventsProducer, CpsApplicationContext, DmiProperties, DmiDataOperations, PolicyExecutor])
 class DmiDataOperationsSpec extends DmiOperationsBaseSpec {
 
     def NO_TOPIC = null
@@ -74,7 +74,7 @@ class DmiDataOperationsSpec extends DmiOperationsBaseSpec {
     DmiDataOperations objectUnderTest
 
     @SpringBean
-    EventsPublisher eventsPublisher = Stub()
+    EventsProducer eventsProducer = Stub()
 
     @SpringBean
     PolicyExecutor policyExecutor = Mock()
@@ -130,9 +130,9 @@ class DmiDataOperationsSpec extends DmiOperationsBaseSpec {
             def dataOperationBatchRequestJsonData = TestUtils.getResourceFileContent('dataOperationRequest.json')
             def dataOperationRequest = spiedJsonObjectMapper.convertJsonString(dataOperationBatchRequestJsonData, DataOperationRequest.class)
             dataOperationRequest.dataOperationDefinitions[0].cmHandleReferences = [cmHandleId]
-        and: 'the published cloud event will be captured'
+        and: 'the sent cloud event will be captured'
             def actualDataOperationCloudEvent = null
-            eventsPublisher.publishCloudEvent('my-topic-name', 'my-request-id', _) >> { args -> actualDataOperationCloudEvent = args[2] }
+            eventsProducer.sendCloudEvent('my-topic-name', 'my-request-id', _) >> { args -> actualDataOperationCloudEvent = args[2] }
         and: 'a DMI client request exception is thrown when DMI service is called'
             mockDmiRestClient.asynchronousPostOperationWithJsonData(*_) >> { Mono.error(new DmiClientRequestException(123, '', '', UNKNOWN_ERROR)) }
         when: 'attempt to get resource data for group of cm handles is invoked'
index c651bb5..8ea73b6 100644 (file)
@@ -24,7 +24,7 @@ import com.fasterxml.jackson.databind.ObjectMapper
 import org.apache.kafka.clients.consumer.KafkaConsumer
 import org.apache.kafka.common.serialization.StringDeserializer
 import org.mapstruct.factory.Mappers
-import org.onap.cps.events.EventsPublisher
+import org.onap.cps.events.EventsProducer
 import org.onap.cps.ncmp.event.model.DmiAsyncRequestResponseEvent
 import org.onap.cps.ncmp.event.model.NcmpAsyncRequestResponseEvent
 import org.onap.cps.ncmp.utils.TestUtils
@@ -37,14 +37,14 @@ import org.springframework.test.annotation.DirtiesContext
 import org.testcontainers.spock.Testcontainers
 import java.time.Duration
 
-@SpringBootTest(classes = [EventsPublisher, DmiAsyncRequestResponseEventConsumer, ObjectMapper, JsonObjectMapper])
+@SpringBootTest(classes = [EventsProducer, DmiAsyncRequestResponseEventConsumer, ObjectMapper, JsonObjectMapper])
 @Testcontainers
 @DirtiesContext
 class NcmpAsyncRequestResponseEventProducerIntegrationSpec extends MessagingBaseSpec {
 
     @SpringBean
-    EventsPublisher cpsAsyncRequestResponseEventPublisher =
-        new EventsPublisher<NcmpAsyncRequestResponseEvent>(legacyEventKafkaTemplate, cloudEventKafkaTemplate);
+    EventsProducer cpsAsyncRequestResponseEventProducer =
+        new EventsProducer<NcmpAsyncRequestResponseEvent>(legacyEventKafkaTemplate, cloudEventKafkaTemplate);
 
 
     @SpringBean
@@ -53,7 +53,7 @@ class NcmpAsyncRequestResponseEventProducerIntegrationSpec extends MessagingBase
 
     @SpringBean
     DmiAsyncRequestResponseEventConsumer dmiAsyncRequestResponseEventConsumer =
-            new DmiAsyncRequestResponseEventConsumer(cpsAsyncRequestResponseEventPublisher,
+            new DmiAsyncRequestResponseEventConsumer(cpsAsyncRequestResponseEventProducer,
                     ncmpAsyncRequestResponseEventMapper)
 
     @Autowired
index afb594a..9c9768a 100644 (file)
@@ -1,6 +1,6 @@
 /*
  * ============LICENSE_START=======================================================
- * Copyright (C) 2023-2024 Nordix Foundation
+ * Copyright (C) 2023-2025 OpenInfra Foundation Europe. All rights reserved.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -29,7 +29,7 @@ import io.cloudevents.kafka.impl.KafkaHeaders
 import org.apache.kafka.clients.consumer.ConsumerRecord
 import org.apache.kafka.clients.consumer.KafkaConsumer
 import org.apache.kafka.common.header.internals.RecordHeaders
-import org.onap.cps.events.EventsPublisher
+import org.onap.cps.events.EventsProducer
 import org.onap.cps.ncmp.events.async1_0_0.DataOperationEvent
 import org.onap.cps.ncmp.utils.TestUtils
 import org.onap.cps.ncmp.utils.events.MessagingBaseSpec
@@ -45,16 +45,16 @@ import java.time.Duration
 
 import static org.onap.cps.ncmp.utils.events.CloudEventMapper.toTargetEvent
 
-@SpringBootTest(classes = [EventsPublisher, DataOperationEventConsumer, RecordFilterStrategies, JsonObjectMapper, ObjectMapper])
+@SpringBootTest(classes = [EventsProducer, DataOperationEventConsumer, RecordFilterStrategies, JsonObjectMapper, ObjectMapper])
 @Testcontainers
 @DirtiesContext
 class DataOperationEventConsumerSpec extends MessagingBaseSpec {
 
     @SpringBean
-    EventsPublisher asyncDataOperationEventPublisher = new EventsPublisher<CloudEvent>(legacyEventKafkaTemplate, cloudEventKafkaTemplate)
+    EventsProducer asyncDataOperationEventProducer = new EventsProducer<CloudEvent>(legacyEventKafkaTemplate, cloudEventKafkaTemplate)
 
     @SpringBean
-    DataOperationEventConsumer objectUnderTest = new DataOperationEventConsumer(asyncDataOperationEventPublisher)
+    DataOperationEventConsumer objectUnderTest = new DataOperationEventConsumer(asyncDataOperationEventProducer)
 
     @Autowired
     JsonObjectMapper jsonObjectMapper
@@ -66,13 +66,13 @@ class DataOperationEventConsumerSpec extends MessagingBaseSpec {
     def static clientTopic = 'client-topic'
     def static dataOperationType = 'org.onap.cps.ncmp.events.async1_0_0.DataOperationEvent'
 
-    def 'Consume and publish event to client specified topic'() {
+    def 'Consume and send event to client specified topic'() {
         given: 'consumer subscribing to client topic'
             cloudEventKafkaConsumer.subscribe([clientTopic])
         and: 'consumer record for data operation event'
             def consumerRecordIn = createConsumerRecord(dataOperationType)
-        when: 'the data operation event is consumed and published to client specified topic'
-            objectUnderTest.consumeAndPublish(consumerRecordIn)
+        when: 'the data operation event is consumed and sent to client specified topic'
+            objectUnderTest.consumeAndSend(consumerRecordIn)
         and: 'the client specified topic is polled'
             def consumerRecordOut = cloudEventKafkaConsumer.poll(Duration.ofMillis(1500))[0]
         then: 'verify cloud compliant headers'
@@ -84,7 +84,7 @@ class DataOperationEventConsumerSpec extends MessagingBaseSpec {
             assert KafkaHeaders.getParsedKafkaHeader(consumerRecordOutHeaders, 'ce_destination') == clientTopic
         and: 'map consumer record to expected event type'
             def dataOperationResponseEvent = toTargetEvent(consumerRecordOut.value(), DataOperationEvent.class)
-        and: 'verify published response data properties'
+        and: 'verify sent response data properties'
             def response = dataOperationResponseEvent.data.responses[0]
             response.operationId == 'some-operation-id'
             response.statusCode == 'any-success-status-code'
index 8039d47..baca445 100644 (file)
@@ -21,7 +21,7 @@
 package org.onap.cps.ncmp.impl.data.async
 
 import io.cloudevents.core.builder.CloudEventBuilder
-import org.onap.cps.events.EventsPublisher
+import org.onap.cps.events.EventsProducer
 import org.onap.cps.ncmp.config.KafkaConfig
 import org.onap.cps.ncmp.event.model.DmiAsyncRequestResponseEvent
 import org.onap.cps.ncmp.utils.events.ConsumerBaseSpec
@@ -42,7 +42,7 @@ import java.util.concurrent.TimeUnit
 class FilterStrategiesIntegrationSpec extends ConsumerBaseSpec {
 
     @SpringBean
-    EventsPublisher mockEventsPublisher = Mock()
+    EventsProducer mockEventsProducer = Mock()
 
     @SpringBean
     NcmpAsyncRequestResponseEventMapper mapper = Stub()
@@ -61,23 +61,23 @@ class FilterStrategiesIntegrationSpec extends ConsumerBaseSpec {
         then: 'wait a little for async processing of message (must wait to try to avoid false positives)'
             TimeUnit.MILLISECONDS.sleep(300)
         and: 'event is not consumed'
-            0 * mockEventsPublisher.publishEvent(*_)
+            0 * mockEventsProducer.sendEvent(*_)
     }
 
     def 'Legacy event consumer with valid legacy event.'() {
         given: 'a legacy event'
             DmiAsyncRequestResponseEvent legacyEvent = new DmiAsyncRequestResponseEvent(eventId:'legacyEventId', eventTarget:'legacyEventTarget')
-        and: 'a flag to track the publish event call'
-            def publishEventMethodCalled = false
-        and: 'the (mocked) events publisher will use the flag to indicate if it is called'
-            mockEventsPublisher.publishEvent(*_) >> {
-                publishEventMethodCalled = true
+        and: 'a flag to track the send event call'
+            def sendEventMethodCalled = false
+        and: 'the (mocked) events producer will use the flag to indicate if it is called'
+            mockEventsProducer.sendEvent(*_) >> {
+                sendEventMethodCalled = true
             }
         when: 'send the cloud event'
             legacyEventKafkaTemplate.send(topic, legacyEvent)
         then: 'the event is consumed by the (legacy) AsynRestRequest consumer'
             new PollingConditions().within(1) {
-                assert publishEventMethodCalled == true
+                assert sendEventMethodCalled == true
             }
     }
 
@@ -87,20 +87,20 @@ class FilterStrategiesIntegrationSpec extends ConsumerBaseSpec {
                 .withType(eventType)
                 .withSource(URI.create('some-source'))
                 .build()
-        and: 'a flag to track the publish event call'
-            def publishEventMethodCalled = false
-        and: 'the (mocked) events publisher will use the flag to indicate if it is called'
-            mockEventsPublisher.publishCloudEvent(*_) >> {
-                publishEventMethodCalled = true
+        and: 'a flag to track the send event call'
+            def sendEventMethodCalled = false
+        and: 'the (mocked) events producer will use the flag to indicate if it is called'
+            mockEventsProducer.sendCloudEvent(*_) >> {
+                sendEventMethodCalled = true
             }
         when: 'send the cloud event'
             cloudEventKafkaTemplate.send(topic, cloudEvent)
         then: 'the event has only been forwarded for the correct type'
             new PollingConditions(initialDelay: 0.3).within(1) {
-                assert publishEventMethodCalled == expectCallToPublishEventMethod
+                assert sendEventMethodCalled == expectCallToSendEventMethod
             }
         where: 'the following event types are used'
-            eventType                                        || expectCallToPublishEventMethod
+            eventType                                        || expectCallToSendEventMethod
             'DataOperationEvent'                             || true
             'other type'                                     || false
             'any type contain the word "DataOperationEvent"' || true
@@ -114,7 +114,7 @@ class FilterStrategiesIntegrationSpec extends ConsumerBaseSpec {
         then: 'wait a little for async processing of message (must wait to try to avoid false positives)'
             TimeUnit.MILLISECONDS.sleep(300)
         and: 'the event is not processed by this consumer'
-            0 * mockEventsPublisher.publishCloudEvent(*_)
+            0 * mockEventsProducer.sendCloudEvent(*_)
     }
 
 }
index 75738b4..65e8af8 100644 (file)
@@ -22,7 +22,7 @@ package org.onap.cps.ncmp.impl.data.async
 
 import com.fasterxml.jackson.databind.ObjectMapper
 import io.cloudevents.core.builder.CloudEventBuilder
-import org.onap.cps.events.EventsPublisher
+import org.onap.cps.events.EventsProducer
 import org.onap.cps.ncmp.config.KafkaConfig
 import org.onap.cps.ncmp.event.model.DmiAsyncRequestResponseEvent
 import org.onap.cps.ncmp.event.model.NcmpAsyncRequestResponseEvent
@@ -46,7 +46,7 @@ import spock.util.concurrent.PollingConditions
 class SerializationIntegrationSpec extends ConsumerBaseSpec {
 
     @SpringBean
-    EventsPublisher mockEventsPublisher = Mock()
+    EventsProducer mockEventsProducer = Mock()
 
     @SpringBean
     NcmpAsyncRequestResponseEventMapper mapper = Stub() { toNcmpAsyncEvent(_) >> new NcmpAsyncRequestResponseEvent(eventId: 'my-event-id', eventTarget: 'some client topic')}
@@ -60,34 +60,34 @@ class SerializationIntegrationSpec extends ConsumerBaseSpec {
     def 'Forwarding DataOperation Event Data.'() {
         given: 'a data operation cloud event'
             def cloudEvent = createCloudEvent()
-        and: 'a flag to track the publish cloud event call'
-            def publishCloudEventMethodCalled = false
-        and: 'the (mocked) events publisher will use the flag to indicate if it is called and will capture the cloud event'
-            mockEventsPublisher.publishCloudEvent('some client topic', 'some-correlation-id', cloudEvent) >> {
-                publishCloudEventMethodCalled = true
+        and: 'a flag to track the send cloud event call'
+            def sendCloudEventMethodCalled = false
+        and: 'the (mocked) events producer will use the flag to indicate if it is called and will capture the cloud event'
+            mockEventsProducer.sendCloudEvent('some client topic', 'some-correlation-id', cloudEvent) >> {
+                sendCloudEventMethodCalled = true
             }
         when: 'send the event'
             cloudEventKafkaTemplate.send(topic, cloudEvent)
         then: 'the event has been forwarded'
             new PollingConditions().within(1) {
-                assert publishCloudEventMethodCalled == true
+                assert sendCloudEventMethodCalled == true
             }
     }
 
     def 'Forwarding AsyncRestRequestResponse Event Data.'() {
         given: 'async request response legacy event'
             def dmiAsyncRequestResponseEvent = new DmiAsyncRequestResponseEvent(eventId: 'my-event-id',eventTarget: 'some client topic')
-        and: 'a flag to track the publish event call'
-            def publishEventMethodCalled = false
-        and: 'the (mocked) events publisher will use the flag to indicate if it is called and will capture the event'
-            mockEventsPublisher.publishEvent(*_) >> {
-                publishEventMethodCalled = true
+        and: 'a flag to track the send event call'
+            def sendEventMethodCalled = false
+        and: 'the (mocked) events producer will use the flag to indicate if it is called and will capture the event'
+            mockEventsProducer.sendEvent(*_) >> {
+                sendEventMethodCalled = true
             }
         when: 'send the event'
             legacyEventKafkaTemplate.send(topic, dmiAsyncRequestResponseEvent)
         then: 'the event has been forwarded'
             new PollingConditions().within(1) {
-                assert publishEventMethodCalled == true
+                assert sendEventMethodCalled == true
             }
     }
 
index 33b8490..22ce4ab 100644 (file)
@@ -1,6 +1,6 @@
 /*
  *  ============LICENSE_START=======================================================
- *  Copyright (C) 2023-2024 Nordix Foundation
+ *  Copyright (C) 2023-2025 OpenInfra Foundation Europe. All rights reserved.
  *  ================================================================================
  *  Licensed under the Apache License, Version 2.0 (the "License");
  *  you may not use this file except in compliance with the License.
@@ -25,7 +25,7 @@ import io.cloudevents.CloudEvent
 import io.cloudevents.kafka.CloudEventDeserializer
 import io.cloudevents.kafka.impl.KafkaHeaders
 import org.apache.kafka.clients.consumer.KafkaConsumer
-import org.onap.cps.events.EventsPublisher
+import org.onap.cps.events.EventsProducer
 import org.onap.cps.ncmp.api.data.models.DataOperationRequest
 import org.onap.cps.ncmp.api.data.models.OperationType
 import org.onap.cps.ncmp.api.inventory.models.CompositeStateBuilder
@@ -46,7 +46,7 @@ import static org.onap.cps.ncmp.api.inventory.models.CmHandleState.ADVISED
 import static org.onap.cps.ncmp.api.inventory.models.CmHandleState.READY
 import static org.onap.cps.ncmp.utils.events.CloudEventMapper.toTargetEvent
 
-@ContextConfiguration(classes = [EventsPublisher, CpsApplicationContext])
+@ContextConfiguration(classes = [EventsProducer, CpsApplicationContext])
 class DmiDataOperationsHelperSpec extends MessagingBaseSpec {
 
     def static clientTopic = 'my-topic-name'
@@ -56,7 +56,7 @@ class DmiDataOperationsHelperSpec extends MessagingBaseSpec {
     JsonObjectMapper jsonObjectMapper = new JsonObjectMapper(new ObjectMapper())
 
     @SpringBean
-    EventsPublisher eventPublisher = new EventsPublisher<CloudEvent>(legacyEventKafkaTemplate, cloudEventKafkaTemplate)
+    EventsProducer eventProducer = new EventsProducer<CloudEvent>(legacyEventKafkaTemplate, cloudEventKafkaTemplate)
 
     def 'Process per data operation request with #serviceName.'() {
         given: 'data operation request with 3 operations'
@@ -106,7 +106,7 @@ class DmiDataOperationsHelperSpec extends MessagingBaseSpec {
             assert cmHandlesInRequestBody[0].moduleSetTag == 'module-set-tag1'
     }
 
-    def 'Process per data operation request with non-ready, non-existing cm handle and publish event to client specified topic'() {
+    def 'Process per data operation request with non-ready, non-existing cm handle and send event to client specified topic'() {
         given: 'consumer subscribing to client topic'
             def cloudEventKafkaConsumer = new KafkaConsumer<>(eventConsumerConfigProperties('test-1', CloudEventDeserializer))
             cloudEventKafkaConsumer.subscribe([clientTopic])
@@ -129,7 +129,7 @@ class DmiDataOperationsHelperSpec extends MessagingBaseSpec {
                 toTargetEvent(consumerRecordOut.value(), DataOperationEvent.class)
         and: 'data operation response event response size is 3'
             dataOperationResponseEvent.data.responses.size() == 3
-        and: 'verify published data operation response as json string'
+        and: 'verify sent data operation response as json string'
             def dataOperationResponseEventJson = TestUtils.getResourceFileContent('dataOperationResponseEvent.json')
             jsonObjectMapper.asJsonString(dataOperationResponseEvent.data.responses) == dataOperationResponseEventJson
     }
@@ -1,6 +1,6 @@
 /*
  * ============LICENSE_START=======================================================
- * Copyright (C) 2022-2024 Nordix Foundation
+ * Copyright (C) 2022-2025 OpenInfra Foundation Europe. All rights reserved.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -23,7 +23,7 @@ package org.onap.cps.ncmp.impl.inventory.sync.lcm
 import com.fasterxml.jackson.databind.ObjectMapper
 import org.apache.kafka.clients.consumer.KafkaConsumer
 import org.apache.kafka.common.serialization.StringDeserializer
-import org.onap.cps.events.EventsPublisher
+import org.onap.cps.events.EventsProducer
 import org.onap.cps.ncmp.events.lcm.v1.Event
 import org.onap.cps.ncmp.events.lcm.v1.LcmEvent
 import org.onap.cps.ncmp.utils.TestUtils
@@ -41,20 +41,20 @@ import java.time.Duration
 @SpringBootTest(classes = [ObjectMapper, JsonObjectMapper])
 @Testcontainers
 @DirtiesContext
-class LcmEventsPublisherSpec extends MessagingBaseSpec {
+class EventsProducerSpec extends MessagingBaseSpec {
 
     def legacyEventKafkaConsumer = new KafkaConsumer<>(eventConsumerConfigProperties('ncmp-group', StringDeserializer))
 
     def testTopic = 'ncmp-events-test'
 
     @SpringBean
-    EventsPublisher<LcmEvent> lcmEventsPublisher = new EventsPublisher(legacyEventKafkaTemplate, cloudEventKafkaTemplate)
+    EventsProducer<LcmEvent> eventsProducer = new EventsProducer(legacyEventKafkaTemplate, cloudEventKafkaTemplate)
 
     @Autowired
     JsonObjectMapper jsonObjectMapper
 
 
-    def 'Produce and Consume Lcm Event'() {
+    def 'Produce and Consume Event'() {
         given: 'event key and event data'
             def eventKey = 'lcm'
             def eventId = 'test-uuid'
@@ -84,8 +84,8 @@ class LcmEventsPublisherSpec extends MessagingBaseSpec {
                 eventSchemaVersion: eventSchemaVersion]
         and: 'consumer has a subscription'
             legacyEventKafkaConsumer.subscribe([testTopic] as List<String>)
-        when: 'an event is published'
-            lcmEventsPublisher.publishEvent(testTopic, eventKey, eventHeader, eventData)
+        when: 'an event is sent'
+            eventsProducer.sendEvent(testTopic, eventKey, eventHeader, eventData)
         and: 'topic is polled'
             def records = legacyEventKafkaConsumer.poll(Duration.ofMillis(1500))
         then: 'poll returns one record'
index a0b6de1..8ae13f8 100644 (file)
@@ -25,7 +25,7 @@ import static org.onap.cps.ncmp.events.lcm.v1.Values.CmHandleState.READY
 
 import io.micrometer.core.instrument.Tag
 import io.micrometer.core.instrument.simple.SimpleMeterRegistry
-import org.onap.cps.events.EventsPublisher
+import org.onap.cps.events.EventsProducer
 import org.onap.cps.ncmp.events.lcm.v1.Event
 import org.onap.cps.ncmp.events.lcm.v1.LcmEvent
 import org.onap.cps.ncmp.events.lcm.v1.LcmEventHeader
@@ -36,13 +36,13 @@ import spock.lang.Specification
 
 class LcmEventsProducerSpec extends Specification {
 
-    def mockLcmEventsPublisher = Mock(EventsPublisher)
+    def mockLcmEventsProducer = Mock(EventsProducer)
     def mockJsonObjectMapper = Mock(JsonObjectMapper)
     def meterRegistry = new SimpleMeterRegistry()
 
-    def objectUnderTest = new LcmEventsProducer(mockLcmEventsPublisher, mockJsonObjectMapper, meterRegistry)
+    def objectUnderTest = new LcmEventsProducer(mockLcmEventsProducer, mockJsonObjectMapper, meterRegistry)
 
-    def 'Create and Publish lcm event where events are #scenario'() {
+    def 'Create and send lcm event where events are #scenario'() {
         given: 'a cm handle id, Lcm Event, and headers'
             def cmHandleId = 'test-cm-handle-id'
             def eventId = UUID.randomUUID().toString()
@@ -54,10 +54,10 @@ class LcmEventsProducerSpec extends Specification {
             objectUnderTest.notificationsEnabled = notificationsEnabled
         and: 'lcm event header is transformed to headers map'
             mockJsonObjectMapper.convertToValueType(lcmEventHeader, Map.class) >> ['eventId': eventId, 'eventCorrelationId': cmHandleId]
-        when: 'service is called to publish lcm event'
+        when: 'service is called to send lcm event'
             objectUnderTest.publishLcmEvent('test-cm-handle-id', lcmEvent, lcmEventHeader)
         then: 'publisher is called #expectedTimesMethodCalled times'
-            expectedTimesMethodCalled * mockLcmEventsPublisher.publishEvent(_, cmHandleId, _, lcmEvent) >> {
+            expectedTimesMethodCalled * mockLcmEventsProducer.sendEvent(_, cmHandleId, _, lcmEvent) >> {
                 args -> {
                     def eventHeaders = (args[2] as Map<String,Object>)
                     assert eventHeaders.containsKey('eventId')
@@ -91,7 +91,7 @@ class LcmEventsProducerSpec extends Specification {
             def lcmEventHeader = new LcmEventHeader(eventId: eventId, eventCorrelationId: cmHandleId)
             objectUnderTest.notificationsEnabled = true
         when: 'publisher set to throw an exception'
-            mockLcmEventsPublisher.publishEvent(_, _, _, _) >> { throw new KafkaException('publishing failed')}
+            mockLcmEventsProducer.sendEvent(_, _, _, _) >> { throw new KafkaException('sending failed')}
         and: 'an event is published'
             objectUnderTest.publishLcmEvent(cmHandleId, lcmEvent, lcmEventHeader)
         then: 'the exception is just logged and not bubbled up'
index 72ca190..020834e 100644 (file)
@@ -71,7 +71,7 @@ class TrustLevelManagerSpec extends Specification {
         when: 'method to register to the cache is called'
             objectUnderTest.registerCmHandles(cmHandleModelsToBeCreated)
         then: 'no notification sent'
-            0 * mockInventoryEventProducer.publishAvcEvent(*_)
+            0 * mockInventoryEventProducer.sendAvcEvent(*_)
         and: 'both cm handles are in the cache and are trusted'
             assert trustLevelPerCmHandleId.get('ch-1') == TrustLevel.COMPLETE
             assert trustLevelPerCmHandleId.get('ch-2') == TrustLevel.COMPLETE
@@ -83,7 +83,7 @@ class TrustLevelManagerSpec extends Specification {
         when: 'method to register to the cache is called'
             objectUnderTest.registerCmHandles(cmHandleModelsToBeCreated)
         then: 'notification is sent'
-            1 * mockInventoryEventProducer.publishAvcEvent(*_)
+            1 * mockInventoryEventProducer.sendAvcEvent(*_)
     }
 
     def 'Dmi trust level updated'() {
@@ -94,7 +94,7 @@ class TrustLevelManagerSpec extends Specification {
         when: 'the update is handled'
             objectUnderTest.updateDmi('my-dmi', ['ch-1'], TrustLevel.NONE)
         then: 'notification is sent'
-            1 * mockInventoryEventProducer.publishAvcEvent('ch-1', 'trustLevel', 'COMPLETE', 'NONE')
+            1 * mockInventoryEventProducer.sendAvcEvent('ch-1', 'trustLevel', 'COMPLETE', 'NONE')
         and: 'the dmi in the cache is not trusted'
             assert trustLevelPerDmiPlugin.get('my-dmi') == TrustLevel.NONE
     }
@@ -107,7 +107,7 @@ class TrustLevelManagerSpec extends Specification {
         when: 'the update is handled'
             objectUnderTest.updateDmi('my-dmi', ['ch-1'], TrustLevel.COMPLETE)
         then: 'no notification is sent'
-            0 * mockInventoryEventProducer.publishAvcEvent(*_)
+            0 * mockInventoryEventProducer.sendAvcEvent(*_)
         and: 'the dmi in the cache is trusted'
             assert trustLevelPerDmiPlugin.get('my-dmi') == TrustLevel.COMPLETE
     }
@@ -124,7 +124,7 @@ class TrustLevelManagerSpec extends Specification {
         then: 'the cm handle in the cache is trusted'
             assert trustLevelPerCmHandleId.get('ch-1', TrustLevel.COMPLETE)
         and: 'notification is sent'
-            1 * mockInventoryEventProducer.publishAvcEvent('ch-1', 'trustLevel', 'NONE', 'COMPLETE')
+            1 * mockInventoryEventProducer.sendAvcEvent('ch-1', 'trustLevel', 'NONE', 'COMPLETE')
     }
 
     def 'CmHandle trust level updated with same value'() {
@@ -139,7 +139,7 @@ class TrustLevelManagerSpec extends Specification {
         then: 'the cm handle in the cache is not trusted'
             assert trustLevelPerCmHandleId.get('ch-1', TrustLevel.NONE)
         and: 'no notification is sent'
-            0 * mockInventoryEventProducer.publishAvcEvent(*_)
+            0 * mockInventoryEventProducer.sendAvcEvent(*_)
     }
 
     def 'Dmi trust level restored to complete with non trusted CmHandle'() {
@@ -152,7 +152,7 @@ class TrustLevelManagerSpec extends Specification {
         then: 'the cm handle in the cache is still NONE'
             assert trustLevelPerCmHandleId.get('ch-1') == TrustLevel.NONE
         and: 'no notification is sent'
-            0 * mockInventoryEventProducer.publishAvcEvent(*_)
+            0 * mockInventoryEventProducer.sendAvcEvent(*_)
     }
 
     def 'Apply effective trust level among CmHandle and dmi plugin'() {
index 1aa7aab..21fc656 100644 (file)
@@ -22,7 +22,7 @@ package org.onap.cps.ncmp.utils.events
 
 import com.fasterxml.jackson.databind.ObjectMapper
 import io.cloudevents.CloudEvent
-import org.onap.cps.events.EventsPublisher
+import org.onap.cps.events.EventsProducer
 import org.onap.cps.ncmp.config.CpsApplicationContext
 import org.onap.cps.ncmp.events.avc.ncmp_to_client.Avc
 import org.onap.cps.ncmp.events.avc.ncmp_to_client.AvcEvent
@@ -32,10 +32,10 @@ import org.springframework.test.context.ContextConfiguration
 @ContextConfiguration(classes = [CpsApplicationContext, ObjectMapper, JsonObjectMapper])
 class InventoryEventProducerSpec extends MessagingBaseSpec {
 
-    def mockEventsPublisher = Mock(EventsPublisher<CloudEvent>)
-    def objectUnderTest = new InventoryEventProducer(mockEventsPublisher)
+    def mockEventsProducer = Mock(EventsProducer<CloudEvent>)
+    def objectUnderTest = new InventoryEventProducer(mockEventsProducer)
 
-    def 'Publish an attribute value change event'() {
+    def 'Send an attribute value change event'() {
         given: 'the event key'
             def someEventKey = 'someEventKey'
         and: 'the name of the attribute being changed'
@@ -44,10 +44,10 @@ class InventoryEventProducerSpec extends MessagingBaseSpec {
             def someOldAttributeValue = 'someOldAttributeValue'
         and: 'the new value of the attribute'
             def someNewAttributeValue = 'someNewAttributeValue'
-        when: 'an attribute value change event is published'
-            objectUnderTest.publishAvcEvent(someEventKey, someAttributeName, someOldAttributeValue, someNewAttributeValue)
-        then: 'the cloud event publisher is invoked with the correct data'
-            1 * mockEventsPublisher.publishCloudEvent(_, someEventKey,
+        when: 'an attribute value change event is sent'
+            objectUnderTest.sendAvcEvent(someEventKey, someAttributeName, someOldAttributeValue, someNewAttributeValue)
+        then: 'the cloud event producer is invoked with the correct data'
+            1 * mockEventsProducer.sendCloudEvent(_, someEventKey,
                 cloudEvent -> {
                     def actualAvcs = CloudEventMapper.toTargetEvent(cloudEvent, AvcEvent.class).data.attributeValueChange
                     def expectedAvc = new Avc(attributeName: someAttributeName,
index 3061fd2..2d2a857 100644 (file)
@@ -42,7 +42,7 @@ import org.springframework.stereotype.Service;
 @RequiredArgsConstructor
 public class CpsDataUpdateEventsProducer {
 
-    private final EventsPublisher<CpsDataUpdatedEvent> eventsPublisher;
+    private final EventsProducer<CpsDataUpdatedEvent> eventsProducer;
 
     private final CpsNotificationService cpsNotificationService;
 
@@ -56,16 +56,16 @@ public class CpsDataUpdateEventsProducer {
     private boolean notificationsEnabled;
 
     /**
-     * Publish the cps data update event with header to the public topic.
+     * Send the cps data update event with header to the public topic.
      *
      * @param anchor Anchor of the updated data
      * @param xpath  xpath of the updated data
      * @param operation operation performed on the data
      * @param observedTimestamp timestamp when data was updated.
      */
-    @Timed(value = "cps.dataupdate.events.publish", description = "Time taken to publish Data Update event")
-    public void publishCpsDataUpdateEvent(final Anchor anchor, final String xpath,
-                                          final Operation operation, final OffsetDateTime observedTimestamp) {
+    @Timed(value = "cps.dataupdate.events.publish", description = "Time taken to send Data Update event")
+    public void sendCpsDataUpdateEvent(final Anchor anchor, final String xpath,
+                                       final Operation operation, final OffsetDateTime observedTimestamp) {
         if (notificationsEnabled && cpsChangeEventNotificationsEnabled && isNotificationEnabledForAnchor(anchor)) {
             final CpsDataUpdatedEvent cpsDataUpdatedEvent = createCpsDataUpdatedEvent(anchor,
                     observedTimestamp, xpath, operation);
@@ -74,7 +74,7 @@ public class CpsDataUpdateEventsProducer {
             final CloudEvent cpsDataUpdatedEventAsCloudEvent =
                     CpsEvent.builder().type(CpsDataUpdatedEvent.class.getTypeName()).data(cpsDataUpdatedEvent)
                             .extensions(extensions).build().asCloudEvent();
-            eventsPublisher.publishCloudEvent(topicName, updateEventId, cpsDataUpdatedEventAsCloudEvent);
+            eventsProducer.sendCloudEvent(topicName, updateEventId, cpsDataUpdatedEventAsCloudEvent);
         } else {
             log.debug("State of Overall Notifications : {} and Cps Change Event Notifications : {}",
                     notificationsEnabled, cpsChangeEventNotificationsEnabled);
@@ -1,6 +1,6 @@
 /*
  * ============LICENSE_START=======================================================
- * Copyright (C) 2022-2024 Nordix Foundation
+ * Copyright (C) 2022-2025 OpenInfra Foundation Europe. All rights reserved.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -34,13 +34,13 @@ import org.springframework.stereotype.Service;
 import org.springframework.util.SerializationUtils;
 
 /**
- * EventsPublisher to publish events.
+ * EventsProducer to send events.
  */
 
 @Slf4j
 @Service
 @RequiredArgsConstructor
-public class EventsPublisher<T> {
+public class EventsProducer<T> {
 
     /**
      * KafkaTemplate for legacy (non-cloud) events.
@@ -51,49 +51,49 @@ public class EventsPublisher<T> {
     private final KafkaTemplate<String, CloudEvent> cloudEventKafkaTemplate;
 
     /**
-     * Generic CloudEvent publisher.
+     * Generic CloudEvent sender.
      *
      * @param topicName valid topic name
      * @param eventKey  message key
      * @param event     message payload
      */
-    public void publishCloudEvent(final String topicName, final String eventKey, final CloudEvent event) {
+    public void sendCloudEvent(final String topicName, final String eventKey, final CloudEvent event) {
         final CompletableFuture<SendResult<String, CloudEvent>> eventFuture =
                 cloudEventKafkaTemplate.send(topicName, eventKey, event);
         eventFuture.whenComplete((result, e) -> {
             if (e == null) {
-                log.debug("Successfully published event to topic : {} , Event : {}", result.getRecordMetadata().topic(),
+                log.debug("Successfully sent event to topic : {} , Event : {}", result.getRecordMetadata().topic(),
                         result.getProducerRecord().value());
 
             } else {
-                log.error("Unable to publish event to topic : {} due to {}", topicName, e.getMessage());
+                log.error("Unable to send event to topic : {} due to {}", topicName, e.getMessage());
             }
         });
     }
 
     /**
-     * Generic Event publisher.
+     * Generic Event sender.
      * Note: Cloud events should be used. This will be addressed as part of  https://lf-onap.atlassian.net/browse/CPS-1717.
      *
      * @param topicName valid topic name
      * @param eventKey  message key
      * @param event     message payload
      */
-    public void publishEvent(final String topicName, final String eventKey, final T event) {
+    public void sendEvent(final String topicName, final String eventKey, final T event) {
         final CompletableFuture<SendResult<String, T>> eventFuture =
                 legacyKafkaEventTemplate.send(topicName, eventKey, event);
         handleLegacyEventCallback(topicName, eventFuture);
     }
 
     /**
-     * Generic Event Publisher with headers.
+     * Generic Event sender with headers.
      *
      * @param topicName    valid topic name
      * @param eventKey     message key
      * @param eventHeaders event headers
      * @param event        message payload
      */
-    public void publishEvent(final String topicName, final String eventKey, final Headers eventHeaders, final T event) {
+    public void sendEvent(final String topicName, final String eventKey, final Headers eventHeaders, final T event) {
 
         final ProducerRecord<String, T> producerRecord =
                 new ProducerRecord<>(topicName, null, eventKey, event, eventHeaders);
@@ -102,27 +102,27 @@ public class EventsPublisher<T> {
     }
 
     /**
-     * Generic Event Publisher with headers.
+     * Generic Event sender with headers.
      *
      * @param topicName    valid topic name
      * @param eventKey     message key
      * @param eventHeaders map of event headers
      * @param event        message payload
      */
-    public void publishEvent(final String topicName, final String eventKey, final Map<String, Object> eventHeaders,
-            final T event) {
+    public void sendEvent(final String topicName, final String eventKey, final Map<String, Object> eventHeaders,
+                          final T event) {
 
-        publishEvent(topicName, eventKey, convertToKafkaHeaders(eventHeaders), event);
+        sendEvent(topicName, eventKey, convertToKafkaHeaders(eventHeaders), event);
     }
 
     private void handleLegacyEventCallback(final String topicName,
             final CompletableFuture<SendResult<String, T>> eventFuture) {
         eventFuture.whenComplete((result, e) -> {
             if (e == null) {
-                log.debug("Successfully published event to topic : {} , Event : {}", result.getRecordMetadata().topic(),
+                log.debug("Successfully sent event to topic : {} , Event : {}", result.getRecordMetadata().topic(),
                         result.getProducerRecord().value());
             } else {
-                log.error("Unable to publish event to topic : {} due to {}", topicName, e.getMessage());
+                log.error("Unable to send event to topic : {} due to {}", topicName, e.getMessage());
             }
         });
     }
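
A minimal caller sketch of the renamed EventsProducer API, assuming an already-wired EventsProducer bean; the ExampleEventSender class name and the topic, key, payload and header values are placeholders:

import io.cloudevents.CloudEvent;
import io.cloudevents.core.builder.CloudEventBuilder;
import java.net.URI;
import java.util.Map;
import org.onap.cps.events.EventsProducer;

public class ExampleEventSender {

    private final EventsProducer<String> eventsProducer;

    public ExampleEventSender(final EventsProducer<String> eventsProducer) {
        this.eventsProducer = eventsProducer;
    }

    public void sendExamples() {
        // cloud event path, previously named publishCloudEvent(...)
        final CloudEvent cloudEvent = CloudEventBuilder.v1()
                .withId("example-id")
                .withType("org.onap.cps.example.ExampleEvent")
                .withSource(URI.create("example-source"))
                .build();
        eventsProducer.sendCloudEvent("example-topic", "example-key", cloudEvent);

        // legacy event path with a headers map, previously named publishEvent(...)
        final Map<String, Object> eventHeaders = Map.of("eventId", "example-event-id");
        eventsProducer.sendEvent("example-topic", "example-key", eventHeaders, "example-payload");
    }
}
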
index 586941a..98f8d66 100644 (file)
@@ -396,7 +396,7 @@ public class CpsDataServiceImpl implements CpsDataService {
                                       final Operation operation,
                                       final OffsetDateTime observedTimestamp) {
         try {
-            cpsDataUpdateEventsProducer.publishCpsDataUpdateEvent(anchor, xpath, operation, observedTimestamp);
+            cpsDataUpdateEventsProducer.sendCpsDataUpdateEvent(anchor, xpath, operation, observedTimestamp);
         } catch (final Exception exception) {
             log.error("Failed to send message to notification service", exception);
         }
index 641e399..07ab2a3 100644 (file)
@@ -40,18 +40,18 @@ import static org.onap.cps.events.model.Data.Operation.UPDATE
 
 @ContextConfiguration(classes = [ObjectMapper, JsonObjectMapper])
 class CpsDataUpdateEventsProducerSpec extends Specification {
-    def mockEventsPublisher = Mock(EventsPublisher)
+    def mockEventsProducer = Mock(EventsProducer)
     def objectMapper = new ObjectMapper();
     def mockCpsNotificationService = Mock(CpsNotificationService)
 
-    def objectUnderTest = new CpsDataUpdateEventsProducer(mockEventsPublisher, mockCpsNotificationService)
+    def objectUnderTest = new CpsDataUpdateEventsProducer(mockEventsProducer, mockCpsNotificationService)
 
     def setup() {
         mockCpsNotificationService.isNotificationEnabled('dataspace01', 'anchor01') >> true
         objectUnderTest.topicName = 'cps-core-event'
     }
 
-    def 'Create and Publish cps update event where events are #scenario.'() {
+    def 'Create and send cps update event where events are #scenario.'() {
         given: 'an anchor, operation and observed timestamp'
             def anchor = new Anchor('anchor01', 'dataspace01', 'schema01');
             def operation = operationInRequest
@@ -60,10 +60,10 @@ class CpsDataUpdateEventsProducerSpec extends Specification {
             objectUnderTest.notificationsEnabled = true
         and: 'cpsChangeEventNotificationsEnabled is also true'
             objectUnderTest.cpsChangeEventNotificationsEnabled = true
-        when: 'service is called to publish data update event'
-            objectUnderTest.publishCpsDataUpdateEvent(anchor, xpath, operation, observedTimestamp)
+        when: 'service is called to send data update event'
+            objectUnderTest.sendCpsDataUpdateEvent(anchor, xpath, operation, observedTimestamp)
         then: 'the event contains the required attributes'
-            1 * mockEventsPublisher.publishCloudEvent('cps-core-event', 'dataspace01:anchor01', _) >> {
+            1 * mockEventsProducer.sendCloudEvent('cps-core-event', 'dataspace01:anchor01', _) >> {
             args ->
                 {
                     def cpsDataUpdatedEvent = (args[2] as CloudEvent)
@@ -86,7 +86,7 @@ class CpsDataUpdateEventsProducerSpec extends Specification {
         'non root node xpath and delete operation' | '/test/path' | DELETE              || UPDATE
     }
 
-    def 'Publish cps update event when no timestamp provided.'() {
+    def 'Send cps update event when no timestamp provided.'() {
         given: 'an anchor, operation and null timestamp'
             def anchor = new Anchor('anchor01', 'dataspace01', 'schema01');
             def observedTimestamp = null
@@ -94,13 +94,13 @@ class CpsDataUpdateEventsProducerSpec extends Specification {
             objectUnderTest.notificationsEnabled = true
         and: 'cpsChangeEventNotificationsEnabled is true'
             objectUnderTest.cpsChangeEventNotificationsEnabled = true
-        when: 'service is called to publish data update event'
-            objectUnderTest.publishCpsDataUpdateEvent(anchor, '/', CREATE, observedTimestamp)
-        then: 'the event is published'
-            1 * mockEventsPublisher.publishCloudEvent('cps-core-event', 'dataspace01:anchor01', _)
+        when: 'service is called to send data update event'
+            objectUnderTest.sendCpsDataUpdateEvent(anchor, '/', CREATE, observedTimestamp)
+        then: 'the event is sent'
+            1 * mockEventsProducer.sendCloudEvent('cps-core-event', 'dataspace01:anchor01', _)
     }
 
-    def 'Enabling and disabling publish cps update events.'() {
+    def 'Enabling and disabling sending cps update events.'() {
         given: 'a different anchor'
             def anchor = new Anchor('anchor02', 'some dataspace', 'some schema');
         and: 'notificationsEnabled is #notificationsEnabled'
@@ -109,12 +109,12 @@ class CpsDataUpdateEventsProducerSpec extends Specification {
             objectUnderTest.cpsChangeEventNotificationsEnabled = cpsChangeEventNotificationsEnabled
         and: 'notification service enabled is: #cpsNotificationServiceisNotificationEnabled'
             mockCpsNotificationService.isNotificationEnabled(_, 'anchor02') >> cpsNotificationServiceisNotificationEnabled
-        when: 'service is called to publish data update event'
-            objectUnderTest.publishCpsDataUpdateEvent(anchor, '/', CREATE, null)
-        then: 'the event is only published when all related flags are true'
-            expectedCallsToPublisher * mockEventsPublisher.publishCloudEvent(*_)
+        when: 'service is called to send data update event'
+            objectUnderTest.sendCpsDataUpdateEvent(anchor, '/', CREATE, null)
+        then: 'the event is only sent when all related flags are true'
+            expectedCallsToProducer * mockEventsProducer.sendCloudEvent(*_)
         where: 'the following flags are used'
-            notificationsEnabled | cpsChangeEventNotificationsEnabled | cpsNotificationServiceisNotificationEnabled  || expectedCallsToPublisher
+            notificationsEnabled | cpsChangeEventNotificationsEnabled | cpsNotificationServiceisNotificationEnabled  || expectedCallsToProducer
             false                | true                               | true                                         || 0
             true                 | false                              | true                                         || 0
             true                 | true                               | false                                        || 0
@@ -1,6 +1,6 @@
 /*
  * ============LICENSE_START=======================================================
- * Copyright (C) 2024 Nordix Foundation
+ * Copyright (C) 2025 OpenInfra Foundation Europe. All rights reserved.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -31,8 +31,6 @@ import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.header.Headers
 import org.apache.kafka.common.header.internals.RecordHeader
 import org.apache.kafka.common.header.internals.RecordHeaders
-import org.junit.jupiter.api.AfterEach
-import org.junit.jupiter.api.BeforeEach
 import org.slf4j.LoggerFactory
 import org.springframework.kafka.core.KafkaTemplate
 import org.springframework.kafka.support.SendResult
@@ -41,27 +39,27 @@ import spock.lang.Specification
 
 import java.util.concurrent.CompletableFuture
 
-class EventsPublisherSpec extends Specification {
+class EventsProducerSpec extends Specification {
 
     def legacyKafkaTemplateMock = Mock(KafkaTemplate)
     def mockCloudEventKafkaTemplate = Mock(KafkaTemplate)
     def logger = Spy(ListAppender<ILoggingEvent>)
 
     void setup() {
-        def setupLogger = ((Logger) LoggerFactory.getLogger(EventsPublisher.class))
+        def setupLogger = ((Logger) LoggerFactory.getLogger(EventsProducer.class))
         setupLogger.setLevel(Level.DEBUG)
         setupLogger.addAppender(logger)
         logger.start()
     }
 
     void cleanup() {
-        ((Logger) LoggerFactory.getLogger(EventsPublisher.class)).detachAndStopAllAppenders()
+        ((Logger) LoggerFactory.getLogger(EventsProducer.class)).detachAndStopAllAppenders()
     }
 
-    def objectUnderTest = new EventsPublisher(legacyKafkaTemplateMock, mockCloudEventKafkaTemplate)
+    def objectUnderTest = new EventsProducer(legacyKafkaTemplateMock, mockCloudEventKafkaTemplate)
 
-    def 'Publish Cloud Event'() {
-        given: 'a successfully published event'
+    def 'Send Cloud Event'() {
+        given: 'a successfully sent event'
             def eventFuture = CompletableFuture.completedFuture(
                 new SendResult(
                     new ProducerRecord('some-topic', 'some-value'),
@@ -70,30 +68,30 @@ class EventsPublisherSpec extends Specification {
             )
             def someCloudEvent = Mock(CloudEvent)
             1 * mockCloudEventKafkaTemplate.send('some-topic', 'some-event-key', someCloudEvent) >> eventFuture
-        when: 'publishing the cloud event'
-            objectUnderTest.publishCloudEvent('some-topic', 'some-event-key', someCloudEvent)
+        when: 'sending the cloud event'
+            objectUnderTest.sendCloudEvent('some-topic', 'some-event-key', someCloudEvent)
         then: 'the correct debug message is logged'
             def lastLoggingEvent = logger.list[0]
             assert lastLoggingEvent.level == Level.DEBUG
-            assert lastLoggingEvent.formattedMessage.contains('Successfully published event')
+            assert lastLoggingEvent.formattedMessage.contains('Successfully sent event')
     }
 
-    def 'Publish Cloud Event with Exception'() {
+    def 'Send Cloud Event with Exception'() {
         given: 'a failed event'
             def eventFutureWithFailure = new CompletableFuture<SendResult<String, String>>()
             eventFutureWithFailure.completeExceptionally(new RuntimeException('some exception'))
             def someCloudEvent = Mock(CloudEvent)
             1 * mockCloudEventKafkaTemplate.send('some-topic', 'some-event-key', someCloudEvent) >> eventFutureWithFailure
-        when: 'publishing the cloud event'
-            objectUnderTest.publishCloudEvent('some-topic', 'some-event-key', someCloudEvent)
+        when: 'sending the cloud event'
+            objectUnderTest.sendCloudEvent('some-topic', 'some-event-key', someCloudEvent)
         then: 'the correct error message is logged'
             def lastLoggingEvent = logger.list[0]
             assert lastLoggingEvent.level == Level.ERROR
-            assert lastLoggingEvent.formattedMessage.contains('Unable to publish event')
+            assert lastLoggingEvent.formattedMessage.contains('Unable to send event')
     }
 
-    def 'Publish Legacy Event'() {
-        given: 'a successfully published event'
+    def 'Send Legacy Event'() {
+        given: 'a successfully sent event'
             def eventFuture = CompletableFuture.completedFuture(
                 new SendResult(
                     new ProducerRecord('some-topic', 'some-value'),
@@ -102,16 +100,16 @@ class EventsPublisherSpec extends Specification {
             )
             def someEvent = Mock(Object)
             1 * legacyKafkaTemplateMock.send('some-topic', 'some-event-key', someEvent) >> eventFuture
-        when: 'publishing the cloud event'
-            objectUnderTest.publishEvent('some-topic', 'some-event-key', someEvent)
+        when: 'sending the legacy event'
+            objectUnderTest.sendEvent('some-topic', 'some-event-key', someEvent)
         then: 'the correct debug message is logged'
             def lastLoggingEvent = logger.list[0]
             assert lastLoggingEvent.level == Level.DEBUG
-            assert lastLoggingEvent.formattedMessage.contains('Successfully published event')
+            assert lastLoggingEvent.formattedMessage.contains('Successfully sent event')
     }
 
-    def 'Publish Legacy Event with Headers as Map'() {
-        given: 'a successfully published event'
+    def 'Send Legacy Event with Headers as Map'() {
+        given: 'a successfully sent event'
             def sampleEventHeaders = ['k1': SerializationUtils.serialize('v1')]
             def eventFuture = CompletableFuture.completedFuture(
                 new SendResult(
@@ -120,18 +118,18 @@ class EventsPublisherSpec extends Specification {
                 )
             )
             def someEvent = Mock(Object.class)
-        when: 'publishing the legacy event'
-            objectUnderTest.publishEvent('some-topic', 'some-event-key', sampleEventHeaders, someEvent)
-        then: 'event is published'
+        when: 'sending the legacy event'
+            objectUnderTest.sendEvent('some-topic', 'some-event-key', sampleEventHeaders, someEvent)
+        then: 'event is sent'
             1 * legacyKafkaTemplateMock.send(_) >> eventFuture
         and: 'the correct debug message is logged'
             def lastLoggingEvent = logger.list[0]
             assert lastLoggingEvent.level == Level.DEBUG
-            assert lastLoggingEvent.formattedMessage.contains('Successfully published event')
+            assert lastLoggingEvent.formattedMessage.contains('Successfully sent event')
     }
 
-    def 'Publish Legacy Event with Record Headers'() {
-        given: 'a successfully published event'
+    def 'Send Legacy Event with Record Headers'() {
+        given: 'a successfully sent event'
             def sampleEventHeaders = new RecordHeaders([new RecordHeader('k1', SerializationUtils.serialize('v1'))])
             def sampleProducerRecord = new ProducerRecord('some-topic', null, 'some-key', 'some-value', sampleEventHeaders)
             def eventFuture = CompletableFuture.completedFuture(
@@ -141,18 +139,18 @@ class EventsPublisherSpec extends Specification {
                 )
             )
             def someEvent = Mock(Object.class)
-        when: 'publishing the legacy event'
-            objectUnderTest.publishEvent('some-topic', 'some-event-key', sampleEventHeaders, someEvent)
-        then: 'event is published'
+        when: 'sending the legacy event'
+            objectUnderTest.sendEvent('some-topic', 'some-event-key', sampleEventHeaders, someEvent)
+        then: 'event is sent'
             1 * legacyKafkaTemplateMock.send(_) >> eventFuture
         and: 'the correct debug message is logged'
             def lastLoggingEvent = logger.list[0]
             assert lastLoggingEvent.level == Level.DEBUG
-            assert lastLoggingEvent.formattedMessage.contains('Successfully published event')
+            assert lastLoggingEvent.formattedMessage.contains('Successfully sent event')
     }
 
     def 'Handle Legacy Event Callback'() {
-        given: 'an event is successfully published'
+        given: 'an event is successfully sent'
             def eventFuture = CompletableFuture.completedFuture(
                 new SendResult(
                     new ProducerRecord('some-topic', 'some-value'),
@@ -164,11 +162,11 @@ class EventsPublisherSpec extends Specification {
         then: 'the correct debug message is logged'
             def lastLoggingEvent = logger.list[0]
             assert lastLoggingEvent.level == Level.DEBUG
-            assert lastLoggingEvent.formattedMessage.contains('Successfully published event')
+            assert lastLoggingEvent.formattedMessage.contains('Successfully sent event')
     }
 
     def 'Handle Legacy Event Callback with Exception'() {
-        given: 'a failure to publish an event'
+        given: 'a failure to send an event'
             def eventFutureWithFailure = new CompletableFuture<SendResult<String, String>>()
             eventFutureWithFailure.completeExceptionally(new RuntimeException('some exception'))
         when: 'handling legacy event callback'
@@ -176,7 +174,7 @@ class EventsPublisherSpec extends Specification {
         then: 'the correct error message is logged'
             def lastLoggingEvent = logger.list[0]
             assert lastLoggingEvent.level == Level.ERROR
-            assert lastLoggingEvent.formattedMessage.contains('Unable to publish event')
+            assert lastLoggingEvent.formattedMessage.contains('Unable to send event')
     }
 
     def 'Convert to kafka headers'() {
index a4bfd56..4085a08 100644 (file)
@@ -577,8 +577,8 @@ class CpsDataServiceImplSpec extends Specification {
         and: 'the persistence service method is invoked with the correct parameters'
             1 * mockCpsDataPersistenceService.deleteDataNodes(dataspaceName, _ as Collection<String>)
         and: 'a data update event is sent for each anchor'
-            1 * mockCpsDataUpdateEventsProducer.publishCpsDataUpdateEvent(anchor1, '/', DELETE, observedTimestamp)
-            1 * mockCpsDataUpdateEventsProducer.publishCpsDataUpdateEvent(anchor2, '/', DELETE, observedTimestamp)
+            1 * mockCpsDataUpdateEventsProducer.sendCpsDataUpdateEvent(anchor1, '/', DELETE, observedTimestamp)
+            1 * mockCpsDataUpdateEventsProducer.sendCpsDataUpdateEvent(anchor2, '/', DELETE, observedTimestamp)
     }
 
     def "Validating #scenario when dry run is enabled."() {
@@ -639,11 +639,11 @@ class CpsDataServiceImplSpec extends Specification {
             1 * mockCpsDataPersistenceService.lockAnchor('some-sessionId', 'some-dataspaceName', 'some-anchorName', 250L)
     }
 
-    def 'Exception is thrown while publishing the notification.'(){
+    def 'Exception is thrown while sending the notification.'(){
         given: 'schema set for given anchor and dataspace references test-tree model'
             setupSchemaSetMocks('test-tree.yang')
-        when: 'publisher set to throw an exception'
-            mockCpsDataUpdateEventsProducer.publishCpsDataUpdateEvent(_, _, _, _) >> { throw new Exception("publishing failed")}
+        when: 'producer is set to throw an exception when sending the event'
+            mockCpsDataUpdateEventsProducer.sendCpsDataUpdateEvent(_, _, _, _) >> { throw new Exception("Sending failed")}
         and: 'an update event is performed'
             objectUnderTest.updateNodeLeaves(dataspaceName, anchorName, '/', '{"test-tree": {"branch": []}}', observedTimestamp, ContentType.JSON)
         then: 'the exception is not bubbled up'