refactor: Remove LcmEventHeader parameter from sendLcmEvent method 41/142641/2
authorToineSiebelink <toine.siebelink@est.tech>
Wed, 3 Dec 2025 10:47:52 +0000 (10:47 +0000)
committerToineSiebelink <toine.siebelink@est.tech>
Thu, 4 Dec 2025 14:43:24 +0000 (14:43 +0000)
- Remove LcmEventHeader parameter from LcmEventsProducer.sendLcmEvent()
- Extract headers directly from LcmEvent object using new extractHeadersAsMap() method
- Remove json definition of now unnecessary lcm event header object
- Fix bug(?) related to recording metrics when sending event failed
- Removed unnecessary null checks for status during metric record
- Refactored LcmEventHelper, LcmEventProducer and LcmEventProducerHelper into 3 classes with clear responsibilities
  LcmEventProducer               : can send events (depends on LcmObjectCreator)
  LcmEventObjectCreator          : create events(depends on CmHandlePropertyChangeDetector)
  CmHandlePropertyChangeDetector : detects the updates
- Cleaned up corresponding test classes
- Renamed cps/events/EventsProducer.java to cps/events/EventProducer (singular, consistent with other classes)
  (apologies, this caused updates for 3/4 of the files in this commit)
- Renamed dmi/EventProducer to dmi/DmiEventProducer to avoid clashes with main EventProducer (in some testware)

Issue-ID: CPS-3072
Change-Id: I2126fed8a1d45c0360a777b4419103804c5ff9f2
Signed-off-by: ToineSiebelink <toine.siebelink@est.tech>
39 files changed:
cps-ncmp-events/src/main/resources/schemas/lcm/lcm-event-header-v1.json [deleted file]
cps-ncmp-rest/src/test/groovy/org/onap/cps/ncmp/rest/controller/NetworkCmProxyControllerSpec.groovy
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/data/async/DataOperationEventConsumer.java
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/data/async/DmiAsyncRequestResponseEventConsumer.java
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/data/utils/DmiDataOperationsHelper.java
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/subscription/cmavc/CmAvcEventConsumer.java
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/subscription/dmi/DmiEventProducer.java [moved from cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/subscription/dmi/EventProducer.java with 93% similarity]
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/subscription/ncmp/CmSubscriptionHandlerImpl.java
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/CmHandleRegistrationServicePropertyHandler.java
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/lcm/CmHandlePropertyChangeDetector.java [new file with mode: 0644]
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/lcm/CmHandlePropertyUpdates.java [moved from cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/lcm/LcmEventHeaderMapper.java with 68% similarity]
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/lcm/LcmEventObjectCreator.java [new file with mode: 0644]
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/lcm/LcmEventProducer.java [new file with mode: 0644]
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/lcm/LcmEventsCmHandleStateHandlerImpl.java
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/lcm/LcmEventsHelper.java [deleted file]
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/lcm/LcmEventsProducer.java [deleted file]
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/lcm/LcmEventsProducerHelper.java [deleted file]
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/utils/events/InventoryEventProducer.java
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/data/DmiDataOperationsSpec.groovy
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/data/async/CpsAsyncRequestResponseEventIntegrationSpec.groovy
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/data/async/DataOperationEventConsumerSpec.groovy
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/data/async/FilterStrategiesIntegrationSpec.groovy
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/data/async/SerializationIntegrationSpec.groovy
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/data/utils/DmiDataOperationsHelperSpec.groovy
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/subscription/cmavc/CmAvcEventConsumerSpec.groovy
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/subscription/dmi/EventProducerSpec.groovy
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/subscription/ncmp/CmSubscriptionHandlerImplSpec.groovy
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/CmHandleRegistrationServicePropertyHandlerSpec.groovy
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/lcm/CmHandleStateMonitorSpec.groovy
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/lcm/EventProducerSpec.groovy [moved from cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/lcm/EventsProducerSpec.groovy with 85% similarity]
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/lcm/LcmEventObjectCreatorSpec.groovy [moved from cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/lcm/LcmEventsProducerHelperSpec.groovy with 85% similarity]
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/lcm/LcmEventProducerSpec.groovy [new file with mode: 0644]
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/lcm/LcmEventsCmHandleStateHandlerImplSpec.groovy
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/lcm/LcmEventsProducerSpec.groovy [deleted file]
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/utils/events/InventoryEventProducerSpec.groovy
cps-service/src/main/java/org/onap/cps/events/CpsDataUpdateEventsProducer.java
cps-service/src/main/java/org/onap/cps/events/EventProducer.java [moved from cps-service/src/main/java/org/onap/cps/events/EventsProducer.java with 97% similarity]
cps-service/src/test/groovy/org/onap/cps/events/CpsDataUpdateEventProducerSpec.groovy [moved from cps-service/src/test/groovy/org/onap/cps/events/CpsDataUpdateEventsProducerSpec.groovy with 94% similarity]
cps-service/src/test/groovy/org/onap/cps/events/EventProducerSpec.groovy [moved from cps-service/src/test/groovy/org/onap/cps/events/EventsProducerSpec.groovy with 94% similarity]

diff --git a/cps-ncmp-events/src/main/resources/schemas/lcm/lcm-event-header-v1.json b/cps-ncmp-events/src/main/resources/schemas/lcm/lcm-event-header-v1.json
deleted file mode 100644 (file)
index 8c9922e..0000000
+++ /dev/null
@@ -1,56 +0,0 @@
-{
-
-  "$schema": "https://json-schema.org/draft/2019-09/schema",
-  "$id": "urn:cps:org.onap.ncmp.cmhandle.lcm-event-header:v1",
-  "$ref": "#/definitions/LcmEventHeader",
-
-  "definitions": {
-    "LcmEventHeader": {
-      "description": "The header for LCM event",
-      "type": "object",
-      "javaType" : "org.onap.cps.ncmp.events.lcm.v1.LcmEventHeader",
-      "properties": {
-        "eventId": {
-          "description": "The unique id identifying the event",
-          "type": "string"
-        },
-        "eventCorrelationId": {
-          "description": "The id identifying the event",
-          "type": "string"
-        },
-        "eventTime": {
-          "description": "The timestamp when original event occurred",
-          "type": "string"
-        },
-        "eventSource": {
-          "description": "The source of the event",
-          "type": "string"
-        },
-        "eventType": {
-          "description": "The type of the event",
-          "type": "string"
-        },
-        "eventSchema": {
-          "description": "The schema that this event adheres to",
-          "type": "string"
-        },
-        "eventSchemaVersion": {
-          "description": "The version of the schema that this event adheres to",
-          "type": "string"
-        }
-      },
-      "required": [
-        "eventId",
-        "eventCorrelationId",
-        "eventTime",
-        "eventSource",
-        "eventType",
-        "eventSchema",
-        "eventSchemaVersion",
-        "event"
-      ],
-      "additionalProperties": false
-    }
-
-  }
-}
index edd4872..fae7b9a 100644 (file)
@@ -33,7 +33,7 @@ import org.onap.cps.TestUtils
 import org.onap.cps.api.exceptions.DataValidationException
 import org.onap.cps.api.model.ModuleDefinition
 import org.onap.cps.api.model.ModuleReference
-import org.onap.cps.events.EventsProducer
+import org.onap.cps.events.EventProducer
 import org.onap.cps.ncmp.api.inventory.DataStoreSyncState
 import org.onap.cps.ncmp.api.inventory.models.CompositeState
 import org.onap.cps.ncmp.api.inventory.models.LockReasonCategory
@@ -143,7 +143,7 @@ class NetworkCmProxyControllerSpec extends Specification {
     }
 
     def cleanup() {
-        ((Logger) LoggerFactory.getLogger(EventsProducer.class)).detachAndStopAllAppenders()
+        ((Logger) LoggerFactory.getLogger(EventProducer.class)).detachAndStopAllAppenders()
     }
 
     def 'Get Resource Data from pass-through operational.'() {
index 9b43837..efdd579 100644 (file)
@@ -25,7 +25,7 @@ import io.cloudevents.kafka.impl.KafkaHeaders;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.onap.cps.events.EventsProducer;
+import org.onap.cps.events.EventProducer;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 import org.springframework.kafka.annotation.KafkaListener;
 import org.springframework.stereotype.Component;
@@ -39,7 +39,7 @@ import org.springframework.stereotype.Component;
 @ConditionalOnProperty(name = "notification.enabled", havingValue = "true", matchIfMissing = true)
 public class DataOperationEventConsumer {
 
-    private final EventsProducer eventsProducer;
+    private final EventProducer eventProducer;
 
     /**
      * Consume the DataOperation cloud event sent by producer to topic 'async-m2m.topic'
@@ -58,6 +58,6 @@ public class DataOperationEventConsumer {
                 dataOperationEventConsumerRecord.headers(), "ce_destination");
         final String correlationId = KafkaHeaders.getParsedKafkaHeader(
                 dataOperationEventConsumerRecord.headers(), "ce_correlationid");
-        eventsProducer.sendCloudEvent(eventTarget, correlationId, dataOperationEventConsumerRecord.value());
+        eventProducer.sendCloudEvent(eventTarget, correlationId, dataOperationEventConsumerRecord.value());
     }
 }
index 802e15a..9f53f72 100644 (file)
@@ -22,7 +22,7 @@ package org.onap.cps.ncmp.impl.data.async;
 
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
-import org.onap.cps.events.EventsProducer;
+import org.onap.cps.events.EventProducer;
 import org.onap.cps.ncmp.event.model.DmiAsyncRequestResponseEvent;
 import org.onap.cps.ncmp.event.model.NcmpAsyncRequestResponseEvent;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
@@ -38,7 +38,7 @@ import org.springframework.stereotype.Component;
 @ConditionalOnProperty(name = "notification.enabled", havingValue = "true", matchIfMissing = true)
 public class DmiAsyncRequestResponseEventConsumer {
 
-    private final EventsProducer eventsProducer;
+    private final EventProducer eventProducer;
     private final NcmpAsyncRequestResponseEventMapper ncmpAsyncRequestResponseEventMapper;
 
     /**
@@ -56,7 +56,7 @@ public class DmiAsyncRequestResponseEventConsumer {
         log.debug("Consuming event {} ...", dmiAsyncRequestResponseEvent);
         final NcmpAsyncRequestResponseEvent ncmpAsyncRequestResponseEvent =
                 ncmpAsyncRequestResponseEventMapper.toNcmpAsyncEvent(dmiAsyncRequestResponseEvent);
-        eventsProducer.sendLegacyEvent(ncmpAsyncRequestResponseEvent.getEventTarget(),
+        eventProducer.sendLegacyEvent(ncmpAsyncRequestResponseEvent.getEventTarget(),
                                      ncmpAsyncRequestResponseEvent.getEventId(),
                                      ncmpAsyncRequestResponseEvent);
     }
index 8edd21f..ea38e28 100644 (file)
@@ -34,7 +34,7 @@ import java.util.Set;
 import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
-import org.onap.cps.events.EventsProducer;
+import org.onap.cps.events.EventProducer;
 import org.onap.cps.ncmp.api.NcmpResponseStatus;
 import org.onap.cps.ncmp.api.data.models.DataOperationDefinition;
 import org.onap.cps.ncmp.api.data.models.DataOperationRequest;
@@ -141,10 +141,10 @@ public class DmiDataOperationsHelper {
         if (!cmHandleIdsPerResponseCodesPerOperation.isEmpty()) {
             final CloudEvent dataOperationCloudEvent = DataOperationEventCreator.createDataOperationEvent(clientTopic,
                     requestId, cmHandleIdsPerResponseCodesPerOperation);
-            final EventsProducer eventsProducer = CpsApplicationContext.getCpsBean(EventsProducer.class);
+            final EventProducer eventProducer = CpsApplicationContext.getCpsBean(EventProducer.class);
             log.warn("sending error message to client topic: {} ,requestId: {}, data operation cloud event id: {}",
                     clientTopic, requestId, dataOperationCloudEvent.getId());
-            eventsProducer.sendCloudEvent(clientTopic, requestId, dataOperationCloudEvent);
+            eventProducer.sendCloudEvent(clientTopic, requestId, dataOperationCloudEvent);
         }
     }
 
index 3397df7..cb5dc2e 100644 (file)
@@ -29,7 +29,7 @@ import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.header.Headers;
-import org.onap.cps.events.EventsProducer;
+import org.onap.cps.events.EventProducer;
 import org.onap.cps.ncmp.events.avc1_0_0.AvcEvent;
 import org.onap.cps.ncmp.impl.inventory.InventoryPersistence;
 import org.springframework.beans.factory.annotation.Value;
@@ -52,7 +52,7 @@ public class CmAvcEventConsumer {
     @Value("${app.ncmp.avc.cm-events-topic}")
     private String cmEventsTopicName;
 
-    private final EventsProducer eventsProducer;
+    private final EventProducer eventProducer;
     private final CmAvcEventService cmAvcEventService;
     private final InventoryPersistence inventoryPersistence;
 
@@ -76,7 +76,7 @@ public class CmAvcEventConsumer {
         final String outgoingAvcEventKey = cmAvcEventAsConsumerRecord.key();
 
         log.debug("Consuming AVC event with key : {} and value : {}", outgoingAvcEventKey, outgoingAvcEvent);
-        eventsProducer.sendCloudEventUsingEos(cmEventsTopicName, outgoingAvcEventKey, outgoingAvcEvent);
+        eventProducer.sendCloudEventUsingEos(cmEventsTopicName, outgoingAvcEventKey, outgoingAvcEvent);
     }
 
     private void processCmAvcEventChanges(final ConsumerRecord<String, CloudEvent> cmAvcEventAsConsumerRecord) {
@@ -25,7 +25,7 @@ import static org.onap.cps.ncmp.events.NcmpEventDataSchema.SUBSCRIPTIONS_V1;
 import io.cloudevents.CloudEvent;
 import java.util.Map;
 import lombok.RequiredArgsConstructor;
-import org.onap.cps.events.EventsProducer;
+import org.onap.cps.events.EventProducer;
 import org.onap.cps.ncmp.impl.datajobs.subscription.ncmp_to_dmi.DataJobSubscriptionDmiInEvent;
 import org.onap.cps.ncmp.utils.events.NcmpEvent;
 import org.springframework.beans.factory.annotation.Value;
@@ -35,9 +35,9 @@ import org.springframework.stereotype.Component;
 @Component
 @RequiredArgsConstructor
 @ConditionalOnProperty(name = "notification.enabled", havingValue = "true", matchIfMissing = true)
-public class EventProducer {
+public class DmiEventProducer {
 
-    private final EventsProducer eventsProducer;
+    private final EventProducer eventProducer;
 
     @Value("${app.ncmp.avc.cm-subscription-dmi-in}")
     private String dmiInEventTopic;
@@ -52,7 +52,7 @@ public class EventProducer {
      */
     public void send(final String subscriptionId, final String dmiPluginName,
                      final String eventType, final DataJobSubscriptionDmiInEvent event) {
-        eventsProducer.sendCloudEvent(dmiInEventTopic, subscriptionId,
+        eventProducer.sendCloudEvent(dmiInEventTopic, subscriptionId,
             toCloudEvent(eventType, event, subscriptionId, dmiPluginName));
 
     }
@@ -68,5 +68,4 @@ public class EventProducer {
             .asCloudEvent();
     }
 
-
 }
index f4fc169..dcbbdcd 100644 (file)
@@ -32,8 +32,8 @@ import java.util.Set;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.onap.cps.ncmp.impl.datajobs.subscription.client_to_ncmp.DataSelector;
+import org.onap.cps.ncmp.impl.datajobs.subscription.dmi.DmiEventProducer;
 import org.onap.cps.ncmp.impl.datajobs.subscription.dmi.DmiInEventMapper;
-import org.onap.cps.ncmp.impl.datajobs.subscription.dmi.EventProducer;
 import org.onap.cps.ncmp.impl.datajobs.subscription.models.CmSubscriptionStatus;
 import org.onap.cps.ncmp.impl.datajobs.subscription.ncmp_to_dmi.DataJobSubscriptionDmiInEvent;
 import org.onap.cps.ncmp.impl.datajobs.subscription.utils.CmDataJobSubscriptionPersistenceService;
@@ -52,7 +52,7 @@ public class CmSubscriptionHandlerImpl implements CmSubscriptionHandler {
 
     private final CmDataJobSubscriptionPersistenceService cmDataJobSubscriptionPersistenceService;
     private final DmiInEventMapper dmiInEventMapper;
-    private final EventProducer eventProducer;
+    private final DmiEventProducer dmiEventProducer;
     private final InventoryPersistence inventoryPersistence;
     private final AlternateIdMatcher alternateIdMatcher;
 
@@ -134,7 +134,7 @@ public class CmSubscriptionHandlerImpl implements CmSubscriptionHandler {
 
             final DataJobSubscriptionDmiInEvent dmiInEvent;
             dmiInEvent = buildDmiInEvent(cmHandleIdsAndDataNodeSelectors, dataSelector);
-            eventProducer.send(subscriptionId, dmiServiceName, eventType, dmiInEvent);
+            dmiEventProducer.send(subscriptionId, dmiServiceName, eventType, dmiInEvent);
         }
     }
 
index dcba275..a8cd66d 100644 (file)
@@ -51,7 +51,7 @@ import org.onap.cps.ncmp.api.inventory.models.CmHandleRegistrationResponse;
 import org.onap.cps.ncmp.api.inventory.models.NcmpServiceCmHandle;
 import org.onap.cps.ncmp.impl.inventory.models.YangModelCmHandle;
 import org.onap.cps.ncmp.impl.inventory.sync.lcm.CmHandleTransitionPair;
-import org.onap.cps.ncmp.impl.inventory.sync.lcm.LcmEventsHelper;
+import org.onap.cps.ncmp.impl.inventory.sync.lcm.LcmEventProducer;
 import org.onap.cps.ncmp.impl.utils.YangDataConverter;
 import org.onap.cps.utils.JsonObjectMapper;
 import org.springframework.beans.factory.annotation.Qualifier;
@@ -70,7 +70,7 @@ public class CmHandleRegistrationServicePropertyHandler {
     private final AlternateIdChecker alternateIdChecker;
     @Qualifier("cmHandleIdPerAlternateId")
     private final IMap<String, String> cmHandleIdPerAlternateId;
-    private final LcmEventsHelper lcmEventsHelper;
+    private final LcmEventProducer lcmEventProducer;
 
     /**
      * Iterates over incoming updatedNcmpServiceCmHandles and update the dataNodes based on the updated attributes.
@@ -172,7 +172,7 @@ public class CmHandleRegistrationServicePropertyHandler {
         final YangModelCmHandle updatedYangModelCmHandle = inventoryPersistence.getYangModelCmHandle(cmHandleId);
         final CmHandleTransitionPair cmHandleTransitionPair =
             new CmHandleTransitionPair(currentYangModelCmHandle, updatedYangModelCmHandle);
-        lcmEventsHelper.sendLcmEventBatchAsynchronously(List.of(cmHandleTransitionPair));
+        lcmEventProducer.sendLcmEventBatchAsynchronously(List.of(cmHandleTransitionPair));
     }
 
     private void updateProperties(final DataNode existingCmHandleDataNode, final PropertyType propertyType,
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/lcm/CmHandlePropertyChangeDetector.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/lcm/CmHandlePropertyChangeDetector.java
new file mode 100644 (file)
index 0000000..e3cf1fb
--- /dev/null
@@ -0,0 +1,189 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2025 OpenInfra Foundation Europe. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.impl.inventory.sync.lcm;
+
+import com.google.common.collect.MapDifference;
+import com.google.common.collect.Maps;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+import org.onap.cps.ncmp.api.inventory.models.NcmpServiceCmHandle;
+import org.onap.cps.ncmp.events.lcm.v1.Values;
+
+/**
+ * Utility class for examining and determining changes in CM handle properties.
+ */
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public class CmHandlePropertyChangeDetector {
+
+    /**
+     * Determines property (update) values for creating a new CM handle.
+     *
+     * @param ncmpServiceCmHandle the CM handle being created
+     * @return CmHandlePropertyUpdates containing new values for the created CM handle
+     */
+    static CmHandlePropertyUpdates determineUpdatesForCreate(final NcmpServiceCmHandle ncmpServiceCmHandle) {
+        final CmHandlePropertyUpdates cmHandlePropertyUpdates = new CmHandlePropertyUpdates();
+        cmHandlePropertyUpdates.setNewValues(new Values());
+        cmHandlePropertyUpdates.getNewValues().setDataSyncEnabled(getDataSyncEnabledFlag(ncmpServiceCmHandle));
+        cmHandlePropertyUpdates.getNewValues()
+            .setCmHandleState(mapCmHandleStateToLcmEventCmHandleState(ncmpServiceCmHandle));
+        cmHandlePropertyUpdates.getNewValues()
+            .setCmHandleProperties(List.of(ncmpServiceCmHandle.getPublicProperties()));
+        return cmHandlePropertyUpdates;
+    }
+
+    /**
+     * Determines property updates between current and target CM handle states.
+     *
+     * @param currentNcmpServiceCmHandle the current CM handle state
+     * @param targetNcmpServiceCmHandle  the target CM handle state
+     * @return CmHandlePropertyUpdates containing old and new values for changed properties
+     */
+    static CmHandlePropertyUpdates determineUpdates(final NcmpServiceCmHandle currentNcmpServiceCmHandle,
+                                                           final NcmpServiceCmHandle targetNcmpServiceCmHandle) {
+        final boolean hasDataSyncFlagEnabledChanged =
+            hasDataSyncEnabledFlagChanged(currentNcmpServiceCmHandle, targetNcmpServiceCmHandle);
+        final boolean hasCmHandleStateChanged =
+            hasCmHandleStateChanged(currentNcmpServiceCmHandle, targetNcmpServiceCmHandle);
+        final boolean arePublicCmHandlePropertiesEqual =
+            arePublicCmHandlePropertiesEqual(currentNcmpServiceCmHandle.getPublicProperties(),
+                targetNcmpServiceCmHandle.getPublicProperties()
+            );
+
+        final CmHandlePropertyUpdates cmHandlePropertyUpdates = new CmHandlePropertyUpdates();
+
+        if (hasDataSyncFlagEnabledChanged || hasCmHandleStateChanged || (!arePublicCmHandlePropertiesEqual)) {
+            cmHandlePropertyUpdates.setOldValues(new Values());
+            cmHandlePropertyUpdates.setNewValues(new Values());
+        } else {
+            return cmHandlePropertyUpdates;
+        }
+
+        if (hasDataSyncFlagEnabledChanged) {
+            setDataSyncEnabledFlag(currentNcmpServiceCmHandle, targetNcmpServiceCmHandle, cmHandlePropertyUpdates);
+        }
+
+        if (hasCmHandleStateChanged) {
+            setCmHandleStateChange(currentNcmpServiceCmHandle, targetNcmpServiceCmHandle, cmHandlePropertyUpdates);
+        }
+
+        if (!arePublicCmHandlePropertiesEqual) {
+            setPublicCmHandlePropertiesChange(currentNcmpServiceCmHandle, targetNcmpServiceCmHandle,
+                cmHandlePropertyUpdates);
+        }
+
+        return cmHandlePropertyUpdates;
+
+    }
+
+    private static void setDataSyncEnabledFlag(final NcmpServiceCmHandle currentNcmpServiceCmHandle,
+                                               final NcmpServiceCmHandle targetNcmpServiceCmHandle,
+                                               final CmHandlePropertyUpdates cmHandlePropertyUpdates) {
+        cmHandlePropertyUpdates.getOldValues().setDataSyncEnabled(getDataSyncEnabledFlag(currentNcmpServiceCmHandle));
+        cmHandlePropertyUpdates.getNewValues().setDataSyncEnabled(getDataSyncEnabledFlag(targetNcmpServiceCmHandle));
+
+    }
+
+    private static void setCmHandleStateChange(final NcmpServiceCmHandle currentNcmpServiceCmHandle,
+                                               final NcmpServiceCmHandle targetNcmpServiceCmHandle,
+                                               final CmHandlePropertyUpdates cmHandlePropertyUpdates) {
+        cmHandlePropertyUpdates.getOldValues()
+            .setCmHandleState(mapCmHandleStateToLcmEventCmHandleState(currentNcmpServiceCmHandle));
+        cmHandlePropertyUpdates.getNewValues()
+            .setCmHandleState(mapCmHandleStateToLcmEventCmHandleState(targetNcmpServiceCmHandle));
+    }
+
+    private static void setPublicCmHandlePropertiesChange(final NcmpServiceCmHandle currentNcmpServiceCmHandle,
+                                                          final NcmpServiceCmHandle targetNcmpServiceCmHandle,
+                                                          final CmHandlePropertyUpdates cmHandlePropertyUpdates) {
+
+        final Map<String, Map<String, String>> publicCmHandlePropertiesDifference =
+            getPublicCmHandlePropertiesDifference(currentNcmpServiceCmHandle.getPublicProperties(),
+                targetNcmpServiceCmHandle.getPublicProperties()
+            );
+        cmHandlePropertyUpdates.getOldValues()
+            .setCmHandleProperties(List.of(publicCmHandlePropertiesDifference.get("oldValues")));
+        cmHandlePropertyUpdates.getNewValues()
+            .setCmHandleProperties(List.of(publicCmHandlePropertiesDifference.get("newValues")));
+
+    }
+
+    private static Values.CmHandleState mapCmHandleStateToLcmEventCmHandleState(
+        final NcmpServiceCmHandle ncmpServiceCmHandle) {
+        return Values.CmHandleState.fromValue(ncmpServiceCmHandle.getCompositeState().getCmHandleState().name());
+    }
+
+    private static Boolean getDataSyncEnabledFlag(final NcmpServiceCmHandle ncmpServiceCmHandle) {
+        return ncmpServiceCmHandle.getCompositeState().getDataSyncEnabled();
+    }
+
+    private static boolean hasDataSyncEnabledFlagChanged(final NcmpServiceCmHandle currentNcmpServiceCmHandle,
+                                                         final NcmpServiceCmHandle targetNcmpServiceCmHandle) {
+        final Boolean currentDataSyncFlag = currentNcmpServiceCmHandle.getCompositeState().getDataSyncEnabled();
+        final Boolean targetDataSyncFlag = targetNcmpServiceCmHandle.getCompositeState().getDataSyncEnabled();
+
+        if (targetDataSyncFlag == null) {
+            return currentDataSyncFlag != null;
+        }
+
+        return !targetDataSyncFlag.equals(currentDataSyncFlag);
+    }
+
+    private static boolean hasCmHandleStateChanged(final NcmpServiceCmHandle currentNcmpServiceCmHandle,
+                                                   final NcmpServiceCmHandle targetNcmpServiceCmHandle) {
+        return targetNcmpServiceCmHandle.getCompositeState().getCmHandleState()
+            != currentNcmpServiceCmHandle.getCompositeState().getCmHandleState();
+    }
+
+    private static boolean arePublicCmHandlePropertiesEqual(final Map<String, String> currentCmHandleProperties,
+                                                            final Map<String, String> targetCmHandleProperties) {
+        if (targetCmHandleProperties.size() != currentCmHandleProperties.size()) {
+            return false;
+        }
+        return targetCmHandleProperties.equals(currentCmHandleProperties);
+    }
+
+    private static Map<String, Map<String, String>> getPublicCmHandlePropertiesDifference(
+        final Map<String, String> currentCmHandleProperties,
+        final Map<String, String> targetCmHandleProperties) {
+        final Map<String, Map<String, String>> oldAndNewPropertiesDifferenceMap = new HashMap<>(2);
+
+        final MapDifference<String, String> cmHandlePropertiesDifference =
+            Maps.difference(targetCmHandleProperties, currentCmHandleProperties);
+
+        final Map<String, String> oldValues = new HashMap<>(cmHandlePropertiesDifference.entriesOnlyOnRight());
+        final Map<String, String> newValues = new HashMap<>(cmHandlePropertiesDifference.entriesOnlyOnLeft());
+
+        cmHandlePropertiesDifference.entriesDiffering().keySet().forEach(cmHandlePropertyName -> {
+            oldValues.put(cmHandlePropertyName, currentCmHandleProperties.get(cmHandlePropertyName));
+            newValues.put(cmHandlePropertyName, targetCmHandleProperties.get(cmHandlePropertyName));
+        });
+
+        oldAndNewPropertiesDifferenceMap.put("oldValues", oldValues);
+        oldAndNewPropertiesDifferenceMap.put("newValues", newValues);
+
+        return oldAndNewPropertiesDifferenceMap;
+    }
+
+}
@@ -1,6 +1,6 @@
 /*
  * ============LICENSE_START=======================================================
- * Copyright (C) 2023 Nordix Foundation
+ * Copyright (C) 2025 OpenInfra Foundation Europe. All rights reserved.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
 
 package org.onap.cps.ncmp.impl.inventory.sync.lcm;
 
-import org.mapstruct.Mapper;
-import org.onap.cps.ncmp.events.lcm.v1.LcmEvent;
-import org.onap.cps.ncmp.events.lcm.v1.LcmEventHeader;
-
-@Mapper(componentModel = "spring")
-public interface LcmEventHeaderMapper {
-
-    /**
-     * Mapper for converting incoming {@link LcmEvent} to outgoing {@link LcmEventHeader}.
-     */
-
-    LcmEventHeader toLcmEventHeader(LcmEvent lcmEvent);
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+import org.onap.cps.ncmp.events.lcm.v1.Values;
 
+@NoArgsConstructor
+@Getter
+@Setter
+class CmHandlePropertyUpdates {
+    private Values oldValues;
+    private Values newValues;
 }
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/lcm/LcmEventObjectCreator.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/lcm/LcmEventObjectCreator.java
new file mode 100644 (file)
index 0000000..5c7b6fd
--- /dev/null
@@ -0,0 +1,108 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2022-2025 OpenInfra Foundation Europe. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.impl.inventory.sync.lcm;
+
+import static org.onap.cps.ncmp.api.inventory.models.CmHandleState.DELETED;
+import static org.onap.cps.ncmp.impl.inventory.sync.lcm.LcmEventType.CREATE;
+import static org.onap.cps.ncmp.impl.inventory.sync.lcm.LcmEventType.DELETE;
+import static org.onap.cps.ncmp.impl.inventory.sync.lcm.LcmEventType.UPDATE;
+
+import java.util.UUID;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.onap.cps.ncmp.api.inventory.models.NcmpServiceCmHandle;
+import org.onap.cps.ncmp.events.lcm.v1.Event;
+import org.onap.cps.ncmp.events.lcm.v1.LcmEvent;
+import org.onap.cps.ncmp.impl.utils.EventDateTimeFormatter;
+import org.springframework.stereotype.Service;
+
+/**
+ * LcmEventObjectCreator to create the events send by LcmEventProducer.
+ */
+
+@Slf4j
+@Service
+@RequiredArgsConstructor
+public class LcmEventObjectCreator {
+
+    /**
+     * Create Lifecycle Management Event.
+     *
+     * @param currentNcmpServiceCmHandle  current ncmp service cmhandle
+     * @param targetNcmpServiceCmHandle   target ncmp service cmhandle
+     * @return Populated LcmEvent
+     */
+    public LcmEvent createLcmEvent(final NcmpServiceCmHandle currentNcmpServiceCmHandle,
+                                    final NcmpServiceCmHandle targetNcmpServiceCmHandle) {
+        final String cmHandleId = targetNcmpServiceCmHandle.getCmHandleId();
+        final LcmEventType lcmEventType =
+            determineEventType(currentNcmpServiceCmHandle, targetNcmpServiceCmHandle);
+        final LcmEvent lcmEvent = createLcmEventWithHeaderDetails(cmHandleId, lcmEventType);
+        final Event event = new Event();
+        event.setCmHandleId(cmHandleId);
+        event.setAlternateId(targetNcmpServiceCmHandle.getAlternateId());
+        event.setModuleSetTag(targetNcmpServiceCmHandle.getModuleSetTag());
+        event.setDataProducerIdentifier(targetNcmpServiceCmHandle.getDataProducerIdentifier());
+        final CmHandlePropertyUpdates cmHandlePropertyUpdates =
+            determineEventValues(lcmEventType, currentNcmpServiceCmHandle, targetNcmpServiceCmHandle);
+        event.setOldValues(cmHandlePropertyUpdates.getOldValues());
+        event.setNewValues(cmHandlePropertyUpdates.getNewValues());
+        lcmEvent.setEvent(event);
+        return lcmEvent;
+    }
+
+    private static LcmEventType determineEventType(final NcmpServiceCmHandle currentNcmpServiceCmHandle,
+                                                   final NcmpServiceCmHandle targetNcmpServiceCmHandle) {
+
+        if (currentNcmpServiceCmHandle.getCompositeState() == null) {
+            return CREATE;
+        } else if (targetNcmpServiceCmHandle.getCompositeState().getCmHandleState() == DELETED) {
+            return DELETE;
+        }
+        return UPDATE;
+    }
+
+    private static CmHandlePropertyUpdates determineEventValues(final LcmEventType lcmEventType,
+                                                                final NcmpServiceCmHandle currentNcmpServiceCmHandle,
+                                                                final NcmpServiceCmHandle targetNcmpServiceCmHandle) {
+        if (CREATE == lcmEventType) {
+            return CmHandlePropertyChangeDetector.determineUpdatesForCreate(targetNcmpServiceCmHandle);
+        }
+        if (UPDATE == lcmEventType) {
+            return CmHandlePropertyChangeDetector.determineUpdates(currentNcmpServiceCmHandle,
+                targetNcmpServiceCmHandle);
+        }
+        return new CmHandlePropertyUpdates();
+    }
+
+    private LcmEvent createLcmEventWithHeaderDetails(final String eventCorrelationId, final LcmEventType lcmEventType) {
+        final LcmEvent lcmEvent = new LcmEvent();
+        lcmEvent.setEventId(UUID.randomUUID().toString());
+        lcmEvent.setEventCorrelationId(eventCorrelationId);
+        lcmEvent.setEventTime(EventDateTimeFormatter.getCurrentIsoFormattedDateTime());
+        lcmEvent.setEventSource("org.onap.ncmp");
+        lcmEvent.setEventType(lcmEventType.getEventType());
+        lcmEvent.setEventSchema("org.onap.ncmp:cmhandle-lcm-event");
+        lcmEvent.setEventSchemaVersion("1.0");
+        return lcmEvent;
+    }
+
+}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/lcm/LcmEventProducer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/lcm/LcmEventProducer.java
new file mode 100644 (file)
index 0000000..85532b3
--- /dev/null
@@ -0,0 +1,139 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2023-2025 OpenInfra Foundation Europe. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.impl.inventory.sync.lcm;
+
+import io.micrometer.core.instrument.MeterRegistry;
+import io.micrometer.core.instrument.Tag;
+import io.micrometer.core.instrument.Timer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.onap.cps.events.EventProducer;
+import org.onap.cps.ncmp.api.inventory.models.NcmpServiceCmHandle;
+import org.onap.cps.ncmp.events.lcm.v1.LcmEvent;
+import org.onap.cps.ncmp.events.lcm.v1.Values;
+import org.onap.cps.ncmp.impl.utils.YangDataConverter;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.kafka.KafkaException;
+import org.springframework.scheduling.annotation.Async;
+import org.springframework.stereotype.Service;
+
+/**
+ * Producer service for sending Lifecycle Management (LCM) events.
+ * This service is responsible for creating and publishing LCM events to Kafka topics
+ * when CM handle state transitions occur. It supports asynchronous batch processing
+ * and includes metrics collection for monitoring event publishing performance.
+ */
+@Slf4j
+@Service
+@RequiredArgsConstructor
+public class LcmEventProducer {
+
+    private static final Tag METRIC_TAG_METHOD = Tag.of("method", "sendLcmEvent");
+    private static final Tag METRIC_TAG_CLASS = Tag.of("class", LcmEventProducer.class.getName());
+    private final EventProducer eventProducer;
+    private final LcmEventObjectCreator lcmEventObjectCreator;
+    private final MeterRegistry meterRegistry;
+
+    @Value("${app.lcm.events.topic:ncmp-events}")
+    private String topicName;
+
+    @Value("${notification.enabled:true}")
+    private boolean notificationsEnabled;
+
+    /**
+     * Sends LCM events in batches asynchronously for CM handle state transitions.
+     * This method processes a collection of CM handle transition pairs and sends
+     * corresponding LCM events to the configured Kafka topic. The processing is
+     * performed asynchronously using the "notificationExecutor" thread pool.
+     *
+     * @param cmHandleTransitionPairs Collection of pairs containing current and target
+     *                               CM handle states represented as YangModelCmHandle objects
+     */
+    @Async("notificationExecutor")
+    public void sendLcmEventBatchAsynchronously(final Collection<CmHandleTransitionPair> cmHandleTransitionPairs) {
+        cmHandleTransitionPairs.forEach(cmHandleTransitionPair -> sendLcmEvent(
+            YangDataConverter.toNcmpServiceCmHandle(cmHandleTransitionPair.currentYangModelCmHandle()),
+            YangDataConverter.toNcmpServiceCmHandle(cmHandleTransitionPair.targetYangModelCmHandle())
+        ));
+    }
+
+    /**
+     * Sends a single LCM event for a CM handle state transition.
+     * Creates an LCM event using the provided current and target CM handle states,
+     * publishes it to the configured Kafka topic, and records metrics for monitoring.
+     * Event publishing is conditional based on the notifications enabled configuration.
+     *
+     * @param currentNcmpServiceCmHandle The current state of the CM handle
+     * @param targetNcmpServiceCmHandle  The target state of the CM handle
+     */
+    private void sendLcmEvent(final NcmpServiceCmHandle currentNcmpServiceCmHandle,
+                              final NcmpServiceCmHandle targetNcmpServiceCmHandle) {
+        if (notificationsEnabled) {
+            final LcmEvent lcmEvent = lcmEventObjectCreator.createLcmEvent(currentNcmpServiceCmHandle,
+                                                                           targetNcmpServiceCmHandle);
+            final Timer.Sample timerSample = Timer.start(meterRegistry);
+            try {
+                final Map<String, Object> headersAsMap = extractHeadersAsMap(lcmEvent);
+                final String eventKey = currentNcmpServiceCmHandle.getCmHandleId();
+                eventProducer.sendLegacyEvent(topicName, eventKey, headersAsMap, lcmEvent);
+                recordMetrics(lcmEvent, timerSample);
+            } catch (final KafkaException e) {
+                log.error("Unable to send message to topic : {} and cause : {}", topicName, e.getMessage());
+            }
+        } else {
+            log.debug("Notifications disabled.");
+        }
+    }
+
+    private Map<String, Object> extractHeadersAsMap(final LcmEvent lcmEvent) {
+        final Map<String, Object> headersAsMap = new HashMap<>(7);
+        headersAsMap.put("eventId", lcmEvent.getEventId());
+        headersAsMap.put("eventCorrelationId", lcmEvent.getEventCorrelationId());
+        headersAsMap.put("eventTime", lcmEvent.getEventTime());
+        headersAsMap.put("eventSource", lcmEvent.getEventSource());
+        headersAsMap.put("eventType", lcmEvent.getEventType());
+        headersAsMap.put("eventSchema", lcmEvent.getEventSchema());
+        headersAsMap.put("eventSchemaVersion", lcmEvent.getEventSchemaVersion());
+        return headersAsMap;
+    }
+
+    private void recordMetrics(final LcmEvent lcmEvent, final Timer.Sample timerSample) {
+        final List<Tag> tags = new ArrayList<>(4);
+        tags.add(METRIC_TAG_CLASS);
+        tags.add(METRIC_TAG_METHOD);
+        tags.add(createCmHandleStateTag("oldCmHandleState", lcmEvent.getEvent().getOldValues()));
+        tags.add(createCmHandleStateTag("newCmHandleState", lcmEvent.getEvent().getNewValues()));
+        timerSample.stop(Timer.builder("cps.ncmp.lcm.events.send")
+            .description("Time taken to send a LCM event")
+            .tags(tags)
+            .register(meterRegistry));
+    }
+
+    private Tag createCmHandleStateTag(final String tageLabel, final Values values) {
+        return Tag.of(tageLabel, values.getCmHandleState().value());
+    }
+
+}
index 23faf53..37e1ae3 100644 (file)
@@ -48,7 +48,7 @@ import org.springframework.stereotype.Service;
 public class LcmEventsCmHandleStateHandlerImpl implements LcmEventsCmHandleStateHandler {
 
     private final InventoryPersistence inventoryPersistence;
-    private final LcmEventsHelper lcmEventsHelper;
+    private final LcmEventProducer lcmEventProducer;
     private final CmHandleStateMonitor cmHandleStateMonitor;
 
     @Override
@@ -58,8 +58,10 @@ public class LcmEventsCmHandleStateHandlerImpl implements LcmEventsCmHandleState
         final Collection<CmHandleTransitionPair> cmHandleTransitionPairs =
                 prepareCmHandleTransitionBatch(targetCmHandleStatePerCmHandle);
         persistCmHandleBatch(cmHandleTransitionPairs);
-        lcmEventsHelper.sendLcmEventBatchAsynchronously(cmHandleTransitionPairs);
-        cmHandleStateMonitor.updateCmHandleStateMetrics(cmHandleTransitionPairs);
+        if (!cmHandleTransitionPairs.isEmpty()) {
+            lcmEventProducer.sendLcmEventBatchAsynchronously(cmHandleTransitionPairs);
+            cmHandleStateMonitor.updateCmHandleStateMetrics(cmHandleTransitionPairs);
+        }
     }
 
     @Override
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/lcm/LcmEventsHelper.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/lcm/LcmEventsHelper.java
deleted file mode 100644 (file)
index 375a602..0000000
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * Copyright (C) 2023-2025 OpenInfra Foundation Europe. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *       http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * SPDX-License-Identifier: Apache-2.0
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.cps.ncmp.impl.inventory.sync.lcm;
-
-import java.util.Collection;
-import lombok.RequiredArgsConstructor;
-import org.onap.cps.ncmp.api.inventory.models.NcmpServiceCmHandle;
-import org.onap.cps.ncmp.events.lcm.v1.LcmEvent;
-import org.onap.cps.ncmp.events.lcm.v1.LcmEventHeader;
-import org.onap.cps.ncmp.impl.inventory.models.YangModelCmHandle;
-import org.onap.cps.ncmp.impl.utils.YangDataConverter;
-import org.springframework.scheduling.annotation.Async;
-import org.springframework.stereotype.Service;
-
-@Service
-@RequiredArgsConstructor
-public class LcmEventsHelper {
-
-    private final LcmEventsProducerHelper lcmEventsProducerHelper;
-    private final LcmEventsProducer lcmEventsProducer;
-
-    /**
-     * Send LcmEvent in batches and in asynchronous manner.
-     *
-     * @param cmHandleTransitionPairs Pair of existing and modified cm handle represented as YangModelCmHandle
-     */
-    @Async("notificationExecutor")
-    public void sendLcmEventBatchAsynchronously(final Collection<CmHandleTransitionPair> cmHandleTransitionPairs) {
-        cmHandleTransitionPairs.forEach(cmHandleTransitionPair -> sendLcmEvent(
-            toNcmpServiceCmHandle(cmHandleTransitionPair.currentYangModelCmHandle()),
-            toNcmpServiceCmHandle(cmHandleTransitionPair.targetYangModelCmHandle())
-        ));
-    }
-
-    private void sendLcmEvent(final NcmpServiceCmHandle currentNcmpServiceCmHandle,
-                              final NcmpServiceCmHandle targetNcmpServiceCmHandle) {
-        final String cmHandleId = targetNcmpServiceCmHandle.getCmHandleId();
-        final LcmEventHeader lcmEventHeader =
-                lcmEventsProducerHelper.createLcmEventHeader(cmHandleId, currentNcmpServiceCmHandle,
-                    targetNcmpServiceCmHandle
-                );
-        final LcmEvent lcmEvent =
-                lcmEventsProducerHelper.createLcmEvent(cmHandleId, currentNcmpServiceCmHandle,
-                    targetNcmpServiceCmHandle
-                );
-        lcmEventsProducer.sendLcmEvent(cmHandleId, lcmEvent, lcmEventHeader);
-    }
-
-    private static NcmpServiceCmHandle toNcmpServiceCmHandle(final YangModelCmHandle yangModelCmHandle) {
-        return YangDataConverter.toNcmpServiceCmHandle(yangModelCmHandle);
-    }
-}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/lcm/LcmEventsProducer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/lcm/LcmEventsProducer.java
deleted file mode 100644 (file)
index 333f067..0000000
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * Copyright (C) 2022-2025 OpenInfra Foundation Europe. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *       http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * SPDX-License-Identifier: Apache-2.0
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.cps.ncmp.impl.inventory.sync.lcm;
-
-import io.micrometer.core.instrument.MeterRegistry;
-import io.micrometer.core.instrument.Tag;
-import io.micrometer.core.instrument.Timer;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.onap.cps.events.EventsProducer;
-import org.onap.cps.ncmp.events.lcm.v1.LcmEvent;
-import org.onap.cps.ncmp.events.lcm.v1.LcmEventHeader;
-import org.onap.cps.ncmp.events.lcm.v1.Values;
-import org.onap.cps.utils.JsonObjectMapper;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.kafka.KafkaException;
-import org.springframework.stereotype.Service;
-
-/**
- * LcmEventsProducer to call the producer and send on the dedicated topic.
- */
-
-@Slf4j
-@Service
-@RequiredArgsConstructor
-public class LcmEventsProducer {
-
-    private static final Tag TAG_METHOD = Tag.of("method", "sendLcmEvent");
-    private static final Tag TAG_CLASS = Tag.of("class", LcmEventsProducer.class.getName());
-    private static final String UNAVAILABLE_CM_HANDLE_STATE = "N/A";
-    private final EventsProducer eventsProducer;
-    private final JsonObjectMapper jsonObjectMapper;
-    private final MeterRegistry meterRegistry;
-
-    @Value("${app.lcm.events.topic:ncmp-events}")
-    private String topicName;
-
-    @Value("${notification.enabled:true}")
-    private boolean notificationsEnabled;
-
-    /**
-     * Sends an LCM event to the dedicated topic with optional notification headers.
-     * Capture and log KafkaException If an error occurs while sending the event to Kafka
-     *
-     * @param cmHandleId     CM handle id associated with the LCM event
-     * @param lcmEvent       The LCM event object to be sent
-     * @param lcmEventHeader Optional headers associated with the LCM event
-     */
-    public void sendLcmEvent(final String cmHandleId, final LcmEvent lcmEvent, final LcmEventHeader lcmEventHeader) {
-        if (notificationsEnabled) {
-            lcmEventHeader.setEventId(lcmEvent.getEventId());
-            lcmEventHeader.setEventTime(lcmEvent.getEventTime());
-            final Timer.Sample timerSample = Timer.start(meterRegistry);
-            try {
-                @SuppressWarnings("unchecked")
-                final Map<String, Object> lcmEventHeadersMap =
-                        jsonObjectMapper.convertToValueType(lcmEventHeader, Map.class);
-                eventsProducer.sendLegacyEvent(topicName, cmHandleId, lcmEventHeadersMap, lcmEvent);
-            } catch (final KafkaException e) {
-                log.error("Unable to send message to topic : {} and cause : {}", topicName, e.getMessage());
-            } finally {
-                recordMetrics(lcmEvent, timerSample);
-            }
-        } else {
-            log.debug("Notifications disabled.");
-        }
-    }
-
-    private void recordMetrics(final LcmEvent lcmEvent, final Timer.Sample timerSample) {
-        final List<Tag> tags = new ArrayList<>(4);
-        tags.add(TAG_CLASS);
-        tags.add(TAG_METHOD);
-
-        final String oldCmHandleState = extractCmHandleStateValue(lcmEvent.getEvent().getOldValues());
-        tags.add(Tag.of("oldCmHandleState", oldCmHandleState));
-
-        final String newCmHandleState = extractCmHandleStateValue(lcmEvent.getEvent().getNewValues());
-        tags.add(Tag.of("newCmHandleState", newCmHandleState));
-
-        timerSample.stop(Timer.builder("cps.ncmp.lcm.events.send")
-                .description("Time taken to send a LCM event")
-                .tags(tags)
-                .register(meterRegistry));
-    }
-
-    /**
-     * Extracts the CM handle state value from the given Values object.
-     * If the provided Values object or its CM handle state is null, returns a default value.
-     *
-     * @param values The Values object containing CM handle state information.
-     * @return The CM handle state value as a string, or a default value if null.
-     */
-    private String extractCmHandleStateValue(final Values values) {
-        return (values != null && values.getCmHandleState() != null)
-                ? values.getCmHandleState().value()
-                : UNAVAILABLE_CM_HANDLE_STATE;
-    }
-}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/lcm/LcmEventsProducerHelper.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/lcm/LcmEventsProducerHelper.java
deleted file mode 100644 (file)
index 580a3a0..0000000
+++ /dev/null
@@ -1,280 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * Copyright (C) 2022-2025 OpenInfra Foundation Europe. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *       http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * SPDX-License-Identifier: Apache-2.0
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.cps.ncmp.impl.inventory.sync.lcm;
-
-import static org.onap.cps.ncmp.api.inventory.models.CmHandleState.DELETED;
-import static org.onap.cps.ncmp.impl.inventory.sync.lcm.LcmEventType.CREATE;
-import static org.onap.cps.ncmp.impl.inventory.sync.lcm.LcmEventType.DELETE;
-import static org.onap.cps.ncmp.impl.inventory.sync.lcm.LcmEventType.UPDATE;
-
-import com.google.common.collect.MapDifference;
-import com.google.common.collect.Maps;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import lombok.Getter;
-import lombok.NoArgsConstructor;
-import lombok.RequiredArgsConstructor;
-import lombok.Setter;
-import lombok.extern.slf4j.Slf4j;
-import org.onap.cps.ncmp.api.inventory.models.NcmpServiceCmHandle;
-import org.onap.cps.ncmp.events.lcm.v1.Event;
-import org.onap.cps.ncmp.events.lcm.v1.LcmEvent;
-import org.onap.cps.ncmp.events.lcm.v1.LcmEventHeader;
-import org.onap.cps.ncmp.events.lcm.v1.Values;
-import org.onap.cps.ncmp.impl.utils.EventDateTimeFormatter;
-import org.springframework.stereotype.Component;
-
-/**
- * LcmEventsProducerHelper to create LcmEvent based on relevant operation.
- */
-@Slf4j
-@Component
-@RequiredArgsConstructor
-public class LcmEventsProducerHelper {
-
-    private final LcmEventHeaderMapper lcmEventHeaderMapper;
-
-    /**
-     * Create Lifecycle Management Event.
-     *
-     * @param cmHandleId                  cm handle identifier
-     * @param currentNcmpServiceCmHandle  current ncmp service cmhandle
-     * @param targetNcmpServiceCmHandle   target ncmp service cmhandle
-     * @return Populated LcmEvent
-     */
-    public LcmEvent createLcmEvent(final String cmHandleId,
-                                   final NcmpServiceCmHandle currentNcmpServiceCmHandle,
-                                   final NcmpServiceCmHandle targetNcmpServiceCmHandle) {
-        final LcmEventType lcmEventType =
-            determineEventType(currentNcmpServiceCmHandle, targetNcmpServiceCmHandle);
-        final LcmEvent lcmEvent = createLcmEventWithHeaderDetails(cmHandleId, lcmEventType);
-        final Event event = new Event();
-        event.setCmHandleId(cmHandleId);
-        event.setAlternateId(targetNcmpServiceCmHandle.getAlternateId());
-        event.setModuleSetTag(targetNcmpServiceCmHandle.getModuleSetTag());
-        event.setDataProducerIdentifier(targetNcmpServiceCmHandle.getDataProducerIdentifier());
-        final CmHandleValuesHolder cmHandleValuesHolder =
-            determineEventValues(lcmEventType, currentNcmpServiceCmHandle, targetNcmpServiceCmHandle);
-        event.setOldValues(cmHandleValuesHolder.getOldValues());
-        event.setNewValues(cmHandleValuesHolder.getNewValues());
-        lcmEvent.setEvent(event);
-        return lcmEvent;
-    }
-
-    /**
-     * Create Lifecycle Management Event Header.
-     *
-     * @param cmHandleId                 cm handle identifier
-     * @param currentNcmpServiceCmHandle current ncmp service cmhandle
-     * @param targetNcmpServiceCmHandle  target ncmp service cmhandle
-     * @return Populated LcmEventHeader
-     */
-    public LcmEventHeader createLcmEventHeader(final String cmHandleId,
-                                               final NcmpServiceCmHandle currentNcmpServiceCmHandle,
-                                               final NcmpServiceCmHandle targetNcmpServiceCmHandle) {
-        final LcmEventType lcmEventType =
-                determineEventType(currentNcmpServiceCmHandle, targetNcmpServiceCmHandle);
-        final LcmEvent lcmEventWithHeaderDetails = createLcmEventWithHeaderDetails(cmHandleId, lcmEventType);
-        return lcmEventHeaderMapper.toLcmEventHeader(lcmEventWithHeaderDetails);
-    }
-
-    private static LcmEventType determineEventType(final NcmpServiceCmHandle currentNcmpServiceCmHandle,
-                                                   final NcmpServiceCmHandle targetNcmpServiceCmHandle) {
-
-        if (currentNcmpServiceCmHandle.getCompositeState() == null) {
-            return CREATE;
-        } else if (targetNcmpServiceCmHandle.getCompositeState().getCmHandleState() == DELETED) {
-            return DELETE;
-        }
-        return UPDATE;
-    }
-
-    private static CmHandleValuesHolder determineEventValues(final LcmEventType lcmEventType,
-                                                             final NcmpServiceCmHandle currentNcmpServiceCmHandle,
-                                                             final NcmpServiceCmHandle targetNcmpServiceCmHandle) {
-        if (CREATE == lcmEventType) {
-            return determineCreateEventValues(targetNcmpServiceCmHandle);
-        } else if (UPDATE == lcmEventType) {
-            return determineUpdateEventValues(targetNcmpServiceCmHandle, currentNcmpServiceCmHandle);
-        }
-        return new CmHandleValuesHolder();
-
-    }
-
-    private LcmEvent createLcmEventWithHeaderDetails(final String eventCorrelationId, final LcmEventType lcmEventType) {
-        final LcmEvent lcmEvent = new LcmEvent();
-        lcmEvent.setEventId(UUID.randomUUID().toString());
-        lcmEvent.setEventCorrelationId(eventCorrelationId);
-        lcmEvent.setEventTime(EventDateTimeFormatter.getCurrentIsoFormattedDateTime());
-        lcmEvent.setEventSource("org.onap.ncmp");
-        lcmEvent.setEventType(lcmEventType.getEventType());
-        lcmEvent.setEventSchema("org.onap.ncmp:cmhandle-lcm-event");
-        lcmEvent.setEventSchemaVersion("1.0");
-        return lcmEvent;
-    }
-
-
-    private static CmHandleValuesHolder determineCreateEventValues(final NcmpServiceCmHandle ncmpServiceCmHandle) {
-        final CmHandleValuesHolder cmHandleValuesHolder = new CmHandleValuesHolder();
-        cmHandleValuesHolder.setNewValues(new Values());
-        cmHandleValuesHolder.getNewValues().setDataSyncEnabled(getDataSyncEnabledFlag(ncmpServiceCmHandle));
-        cmHandleValuesHolder.getNewValues()
-                .setCmHandleState(mapCmHandleStateToLcmEventCmHandleState(ncmpServiceCmHandle));
-        cmHandleValuesHolder.getNewValues().setCmHandleProperties(List.of(ncmpServiceCmHandle.getPublicProperties()));
-        return cmHandleValuesHolder;
-    }
-
-    private static CmHandleValuesHolder determineUpdateEventValues(final NcmpServiceCmHandle targetNcmpServiceCmHandle,
-                                                                final NcmpServiceCmHandle currentNcmpServiceCmHandle) {
-        final boolean hasDataSyncFlagEnabledChanged =
-                hasDataSyncEnabledFlagChanged(currentNcmpServiceCmHandle, targetNcmpServiceCmHandle);
-        final boolean hasCmHandleStateChanged =
-                hasCmHandleStateChanged(currentNcmpServiceCmHandle, targetNcmpServiceCmHandle);
-        final boolean arePublicCmHandlePropertiesEqual =
-                arePublicCmHandlePropertiesEqual(currentNcmpServiceCmHandle.getPublicProperties(),
-                    targetNcmpServiceCmHandle.getPublicProperties()
-                );
-
-        final CmHandleValuesHolder cmHandleValuesHolder = new CmHandleValuesHolder();
-
-        if (hasDataSyncFlagEnabledChanged || hasCmHandleStateChanged || (!arePublicCmHandlePropertiesEqual)) {
-            cmHandleValuesHolder.setOldValues(new Values());
-            cmHandleValuesHolder.setNewValues(new Values());
-        } else {
-            return cmHandleValuesHolder;
-        }
-
-        if (hasDataSyncFlagEnabledChanged) {
-            setDataSyncEnabledFlag(currentNcmpServiceCmHandle, targetNcmpServiceCmHandle, cmHandleValuesHolder);
-        }
-
-        if (hasCmHandleStateChanged) {
-            setCmHandleStateChange(currentNcmpServiceCmHandle, targetNcmpServiceCmHandle, cmHandleValuesHolder);
-        }
-
-        if (!arePublicCmHandlePropertiesEqual) {
-            setPublicCmHandlePropertiesChange(currentNcmpServiceCmHandle, targetNcmpServiceCmHandle,
-                cmHandleValuesHolder);
-        }
-
-        return cmHandleValuesHolder;
-
-    }
-
-    private static void setDataSyncEnabledFlag(final NcmpServiceCmHandle currentNcmpServiceCmHandle,
-                                               final NcmpServiceCmHandle targetNcmpServiceCmHandle,
-                                               final CmHandleValuesHolder cmHandleValuesHolder) {
-        cmHandleValuesHolder.getOldValues().setDataSyncEnabled(getDataSyncEnabledFlag(currentNcmpServiceCmHandle));
-        cmHandleValuesHolder.getNewValues().setDataSyncEnabled(getDataSyncEnabledFlag(targetNcmpServiceCmHandle));
-
-    }
-
-    private static void setCmHandleStateChange(final NcmpServiceCmHandle currentNcmpServiceCmHandle,
-                                               final NcmpServiceCmHandle targetNcmpServiceCmHandle,
-                                               final CmHandleValuesHolder cmHandleValuesHolder) {
-        cmHandleValuesHolder.getOldValues()
-                .setCmHandleState(mapCmHandleStateToLcmEventCmHandleState(currentNcmpServiceCmHandle));
-        cmHandleValuesHolder.getNewValues()
-                .setCmHandleState(mapCmHandleStateToLcmEventCmHandleState(targetNcmpServiceCmHandle));
-    }
-
-    private static void setPublicCmHandlePropertiesChange(final NcmpServiceCmHandle currentNcmpServiceCmHandle,
-                                                          final NcmpServiceCmHandle targetNcmpServiceCmHandle,
-                                                          final CmHandleValuesHolder cmHandleValuesHolder) {
-
-        final Map<String, Map<String, String>> publicCmHandlePropertiesDifference =
-                getPublicCmHandlePropertiesDifference(currentNcmpServiceCmHandle.getPublicProperties(),
-                    targetNcmpServiceCmHandle.getPublicProperties()
-                );
-        cmHandleValuesHolder.getOldValues()
-                .setCmHandleProperties(List.of(publicCmHandlePropertiesDifference.get("oldValues")));
-        cmHandleValuesHolder.getNewValues()
-                .setCmHandleProperties(List.of(publicCmHandlePropertiesDifference.get("newValues")));
-
-    }
-
-    private static Values.CmHandleState mapCmHandleStateToLcmEventCmHandleState(
-            final NcmpServiceCmHandle ncmpServiceCmHandle) {
-        return Values.CmHandleState.fromValue(ncmpServiceCmHandle.getCompositeState().getCmHandleState().name());
-    }
-
-    private static Boolean getDataSyncEnabledFlag(final NcmpServiceCmHandle ncmpServiceCmHandle) {
-        return ncmpServiceCmHandle.getCompositeState().getDataSyncEnabled();
-    }
-
-    private static boolean hasDataSyncEnabledFlagChanged(final NcmpServiceCmHandle currentNcmpServiceCmHandle,
-                                                         final NcmpServiceCmHandle targetNcmpServiceCmHandle) {
-        final Boolean currentDataSyncFlag = currentNcmpServiceCmHandle.getCompositeState().getDataSyncEnabled();
-        final Boolean targetDataSyncFlag = targetNcmpServiceCmHandle.getCompositeState().getDataSyncEnabled();
-
-        if (targetDataSyncFlag == null) {
-            return currentDataSyncFlag != null;
-        }
-
-        return !targetDataSyncFlag.equals(currentDataSyncFlag);
-    }
-
-    private static boolean hasCmHandleStateChanged(final NcmpServiceCmHandle currentNcmpServiceCmHandle,
-                                                   final NcmpServiceCmHandle targetNcmpServiceCmHandle) {
-        return targetNcmpServiceCmHandle.getCompositeState().getCmHandleState()
-                != currentNcmpServiceCmHandle.getCompositeState().getCmHandleState();
-    }
-
-    private static boolean arePublicCmHandlePropertiesEqual(final Map<String, String> currentCmHandleProperties,
-                                                            final Map<String, String> targetCmHandleProperties) {
-        if (targetCmHandleProperties.size() != currentCmHandleProperties.size()) {
-            return false;
-        }
-        return targetCmHandleProperties.equals(currentCmHandleProperties);
-    }
-
-    private static Map<String, Map<String, String>> getPublicCmHandlePropertiesDifference(
-           final Map<String, String> currentCmHandleProperties,
-           final Map<String, String> targetCmHandleProperties) {
-        final Map<String, Map<String, String>> oldAndNewPropertiesDifferenceMap = new HashMap<>(2);
-
-        final MapDifference<String, String> cmHandlePropertiesDifference =
-                Maps.difference(targetCmHandleProperties, currentCmHandleProperties);
-
-        final Map<String, String> oldValues = new HashMap<>(cmHandlePropertiesDifference.entriesOnlyOnRight());
-        final Map<String, String> newValues = new HashMap<>(cmHandlePropertiesDifference.entriesOnlyOnLeft());
-
-        cmHandlePropertiesDifference.entriesDiffering().keySet().forEach(cmHandlePropertyName -> {
-            oldValues.put(cmHandlePropertyName, currentCmHandleProperties.get(cmHandlePropertyName));
-            newValues.put(cmHandlePropertyName, targetCmHandleProperties.get(cmHandlePropertyName));
-        });
-
-        oldAndNewPropertiesDifferenceMap.put("oldValues", oldValues);
-        oldAndNewPropertiesDifferenceMap.put("newValues", newValues);
-
-        return oldAndNewPropertiesDifferenceMap;
-    }
-
-    @NoArgsConstructor
-    @Getter
-    @Setter
-    static class CmHandleValuesHolder {
-        private Values oldValues;
-        private Values newValues;
-    }
-}
index 7d116cd..d01ba4a 100644 (file)
@@ -27,7 +27,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import lombok.RequiredArgsConstructor;
-import org.onap.cps.events.EventsProducer;
+import org.onap.cps.events.EventProducer;
 import org.onap.cps.ncmp.events.avc.ncmp_to_client.Avc;
 import org.onap.cps.ncmp.events.avc.ncmp_to_client.AvcEvent;
 import org.onap.cps.ncmp.events.avc.ncmp_to_client.Data;
@@ -38,7 +38,7 @@ import org.springframework.stereotype.Service;
 @RequiredArgsConstructor
 public class InventoryEventProducer {
 
-    private final EventsProducer eventsProducer;
+    private final EventProducer eventProducer;
 
     @Value("${app.ncmp.avc.inventory-events-topic}")
     private String ncmpInventoryEventsTopicName;
@@ -61,7 +61,7 @@ public class InventoryEventProducer {
                                                  .build()
                                                  .asCloudEvent();
 
-        eventsProducer.sendCloudEvent(ncmpInventoryEventsTopicName, eventKey, avcCloudEvent);
+        eventProducer.sendCloudEvent(ncmpInventoryEventsTopicName, eventKey, avcCloudEvent);
     }
 
     private AvcEvent buildAvcEvent(final String attributeName,
@@ -84,4 +84,4 @@ public class InventoryEventProducer {
         extensions.put("correlationid", eventKey);
         return extensions;
     }
-}
\ No newline at end of file
+}
index 2eaabeb..a3b5646 100644 (file)
@@ -22,7 +22,7 @@
 package org.onap.cps.ncmp.impl.data
 
 import com.fasterxml.jackson.databind.ObjectMapper
-import org.onap.cps.events.EventsProducer
+import org.onap.cps.events.EventProducer
 import org.onap.cps.ncmp.api.data.models.CmResourceAddress
 import org.onap.cps.ncmp.api.data.models.DataOperationRequest
 import org.onap.cps.ncmp.api.exceptions.CmHandleNotFoundException
@@ -42,7 +42,6 @@ import org.springframework.beans.factory.annotation.Autowired
 import org.springframework.boot.test.context.SpringBootTest
 import org.springframework.http.HttpStatus
 import org.springframework.http.ResponseEntity
-import org.springframework.test.context.ContextConfiguration
 import reactor.core.publisher.Mono
 
 import static org.onap.cps.ncmp.api.NcmpResponseStatus.UNKNOWN_ERROR
@@ -56,8 +55,7 @@ import static org.onap.cps.ncmp.api.data.models.OperationType.UPDATE
 import static org.onap.cps.ncmp.impl.models.RequiredDmiService.DATA
 import static org.onap.cps.ncmp.utils.events.CloudEventMapper.toTargetEvent
 
-@SpringBootTest
-@ContextConfiguration(classes = [EventsProducer, CpsApplicationContext, DmiServiceAuthenticationProperties, DmiDataOperations, PolicyExecutor])
+@SpringBootTest(classes = [EventProducer, CpsApplicationContext, DmiServiceAuthenticationProperties, DmiDataOperations, PolicyExecutor])
 class DmiDataOperationsSpec extends DmiOperationsBaseSpec {
 
     def NO_TOPIC = null
@@ -73,17 +71,17 @@ class DmiDataOperationsSpec extends DmiOperationsBaseSpec {
     DmiDataOperations objectUnderTest
 
     @SpringBean
-    EventsProducer eventsProducer = Stub()
+    EventProducer eventProducerStub = Stub()
 
     @SpringBean
-    PolicyExecutor policyExecutor = Mock()
+    PolicyExecutor mockPolicyExecutor = Mock()
 
     @SpringBean
-    AlternateIdMatcher alternateIdMatcher = Mock()
+    AlternateIdMatcher mockAlternateIdMatcher = Mock()
 
     def 'Get resource data for #expectedDataStore from DMI without topic #scenario.'() {
         given: 'a cm handle for #cmHandleId'
-            alternateIdMatcher.getCmHandleId(cmHandleId) >> cmHandleId
+            mockAlternateIdMatcher.getCmHandleId(cmHandleId) >> cmHandleId
             mockYangModelCmHandleRetrieval(additionalProperties)
         and: 'a positive response from DMI service when it is called with the expected parameters'
             def responseFromDmi = Mono.just(new ResponseEntity<Object>('{some-key:some-value}', HttpStatus.OK))
@@ -127,7 +125,7 @@ class DmiDataOperationsSpec extends DmiOperationsBaseSpec {
             def dataOperationBatchRequestJsonData = TestUtils.getResourceFileContent('dataOperationRequest.json')
             def dataOperationRequest = spiedJsonObjectMapper.convertJsonString(dataOperationBatchRequestJsonData, DataOperationRequest.class)
         and: 'no valid cm handles are found for the request'
-            alternateIdMatcher.getCmHandleId(_) >> { throw new CmHandleNotFoundException('') }
+            mockAlternateIdMatcher.getCmHandleId(_) >> { throw new CmHandleNotFoundException('') }
             mockInventoryPersistence.getYangModelCmHandles(_) >> []
         when: 'get resource data for group of cm handles is invoked'
             objectUnderTest.requestResourceDataFromDmi('my-topic-name', dataOperationRequest, 'requestId', NO_AUTH_HEADER)
@@ -143,7 +141,7 @@ class DmiDataOperationsSpec extends DmiOperationsBaseSpec {
             dataOperationRequest.dataOperationDefinitions[0].cmHandleReferences = [cmHandleId]
         and: 'the sent cloud event will be captured'
             def actualDataOperationCloudEvent = null
-            eventsProducer.sendCloudEvent('my-topic-name', 'my-request-id', _) >> { args -> actualDataOperationCloudEvent = args[2] }
+            eventProducerStub.sendCloudEvent('my-topic-name', 'my-request-id', _) >> {args -> actualDataOperationCloudEvent = args[2] }
         and: 'a DMI client request exception is thrown when DMI service is called'
             mockDmiRestClient.asynchronousPostOperation(*_) >> { Mono.error(new DmiClientRequestException(123, '', '', UNKNOWN_ERROR)) }
         when: 'attempt to get resource data for group of cm handles is invoked'
@@ -174,7 +172,7 @@ class DmiDataOperationsSpec extends DmiOperationsBaseSpec {
     def 'Write data for pass-through:running datastore in DMI.'() {
         given: 'a cm handle for #cmHandleId'
             mockYangModelCmHandleRetrieval([yangModelCmHandleProperty])
-            alternateIdMatcher.getCmHandleId(cmHandleId) >> cmHandleId
+            mockAlternateIdMatcher.getCmHandleId(cmHandleId) >> cmHandleId
         and: 'a positive response from DMI service when it is called with the expected parameters'
             def expectedUrlTemplateParameters = new UrlTemplateParameters('myServiceName/dmi/v1/ch/{cmHandleId}/data/ds/{datastore}?resourceIdentifier={resourceIdentifier}', ['resourceIdentifier': resourceIdentifier, 'datastore': 'ncmp-datastore:passthrough-running', 'cmHandleId': cmHandleId])
             def expectedJson = '{"operation":"' + expectedOperationInUrl + '","dataType":"some data type","data":"requestData","cmHandleProperties":{"prop1":"val1"},"moduleSetTag":""}'
@@ -185,7 +183,7 @@ class DmiDataOperationsSpec extends DmiOperationsBaseSpec {
         then: 'the result is the response from the DMI service'
             assert result == responseFromDmi
         and: 'the permission was checked with the policy executor'
-            1 * policyExecutor.checkPermission(_, operation, NO_AUTH_HEADER, resourceIdentifier, 'requestData' )
+            1 * mockPolicyExecutor.checkPermission(_, operation, NO_AUTH_HEADER, resourceIdentifier, 'requestData' )
         where: 'the following operation is performed'
             operation || expectedOperationInUrl
             CREATE    || 'create'
index 906779c..efd0055 100644 (file)
@@ -24,7 +24,7 @@ import com.fasterxml.jackson.databind.ObjectMapper
 import org.apache.kafka.clients.consumer.KafkaConsumer
 import org.apache.kafka.common.serialization.StringDeserializer
 import org.mapstruct.factory.Mappers
-import org.onap.cps.events.EventsProducer
+import org.onap.cps.events.EventProducer
 import org.onap.cps.ncmp.event.model.DmiAsyncRequestResponseEvent
 import org.onap.cps.ncmp.event.model.NcmpAsyncRequestResponseEvent
 import org.onap.cps.ncmp.utils.TestUtils
@@ -37,14 +37,14 @@ import org.springframework.test.annotation.DirtiesContext
 import org.testcontainers.spock.Testcontainers
 import java.time.Duration
 
-@SpringBootTest(classes = [EventsProducer, DmiAsyncRequestResponseEventConsumer, ObjectMapper, JsonObjectMapper])
+@SpringBootTest(classes = [EventProducer, DmiAsyncRequestResponseEventConsumer, ObjectMapper, JsonObjectMapper])
 @Testcontainers
 @DirtiesContext
 class NcmpAsyncRequestResponseEventProducerIntegrationSpec extends MessagingBaseSpec {
 
     @SpringBean
-    EventsProducer cpsAsyncRequestResponseEventProducer =
-        new EventsProducer(legacyEventKafkaTemplate, cloudEventKafkaTemplate, cloudEventKafkaTemplateForEos)
+    EventProducer cpsAsyncRequestResponseEventProducer =
+        new EventProducer(legacyEventKafkaTemplate, cloudEventKafkaTemplate, cloudEventKafkaTemplateForEos)
 
 
     @SpringBean
index 420da6f..9c32ce9 100644 (file)
@@ -29,7 +29,7 @@ import io.cloudevents.kafka.impl.KafkaHeaders
 import org.apache.kafka.clients.consumer.ConsumerRecord
 import org.apache.kafka.clients.consumer.KafkaConsumer
 import org.apache.kafka.common.header.internals.RecordHeaders
-import org.onap.cps.events.EventsProducer
+import org.onap.cps.events.EventProducer
 import org.onap.cps.ncmp.events.async1_0_0.DataOperationEvent
 import org.onap.cps.ncmp.utils.TestUtils
 import org.onap.cps.ncmp.utils.events.MessagingBaseSpec
@@ -45,13 +45,13 @@ import java.time.Duration
 
 import static org.onap.cps.ncmp.utils.events.CloudEventMapper.toTargetEvent
 
-@SpringBootTest(classes = [EventsProducer, DataOperationEventConsumer, RecordFilterStrategies, JsonObjectMapper, ObjectMapper])
+@SpringBootTest(classes = [EventProducer, DataOperationEventConsumer, RecordFilterStrategies, JsonObjectMapper, ObjectMapper])
 @Testcontainers
 @DirtiesContext
 class DataOperationEventConsumerSpec extends MessagingBaseSpec {
 
     @SpringBean
-    EventsProducer asyncDataOperationEventProducer = new EventsProducer(legacyEventKafkaTemplate, cloudEventKafkaTemplate, cloudEventKafkaTemplateForEos)
+    EventProducer asyncDataOperationEventProducer = new EventProducer(legacyEventKafkaTemplate, cloudEventKafkaTemplate, cloudEventKafkaTemplateForEos)
 
     @SpringBean
     DataOperationEventConsumer objectUnderTest = new DataOperationEventConsumer(asyncDataOperationEventProducer)
index 602a54e..49287d7 100644 (file)
@@ -21,7 +21,7 @@
 package org.onap.cps.ncmp.impl.data.async
 
 import io.cloudevents.core.builder.CloudEventBuilder
-import org.onap.cps.events.EventsProducer
+import org.onap.cps.events.EventProducer
 import org.onap.cps.ncmp.config.KafkaConfig
 import org.onap.cps.ncmp.event.model.DmiAsyncRequestResponseEvent
 import org.onap.cps.ncmp.utils.events.ConsumerBaseSpec
@@ -42,7 +42,7 @@ import java.util.concurrent.TimeUnit
 class FilterStrategiesIntegrationSpec extends ConsumerBaseSpec {
 
     @SpringBean
-    EventsProducer mockEventsProducer = Mock()
+    EventProducer mockEventProducer = Mock()
 
     @SpringBean
     NcmpAsyncRequestResponseEventMapper mapper = Stub()
@@ -61,7 +61,7 @@ class FilterStrategiesIntegrationSpec extends ConsumerBaseSpec {
         then: 'wait a little for async processing of message (must wait to try to avoid false positives)'
             TimeUnit.MILLISECONDS.sleep(300)
         and: 'event is not consumed'
-            0 * mockEventsProducer.sendLegacyEvent(*_)
+            0 * mockEventProducer.sendLegacyEvent(*_)
     }
 
     def 'Legacy event consumer with valid legacy event.'() {
@@ -70,7 +70,7 @@ class FilterStrategiesIntegrationSpec extends ConsumerBaseSpec {
         and: 'a flag to track the send event call'
             def sendEventMethodCalled = false
         and: 'the (mocked) events producer will use the flag to indicate if it is called'
-            mockEventsProducer.sendLegacyEvent(*_) >> {
+            mockEventProducer.sendLegacyEvent(*_) >> {
                 sendEventMethodCalled = true
             }
         when: 'send the cloud event'
@@ -90,7 +90,7 @@ class FilterStrategiesIntegrationSpec extends ConsumerBaseSpec {
         and: 'a flag to track the sent event call'
             def sendEventMethodCalled = false
         and: 'the (mocked) events producer will use the flag to indicate if it is called'
-            mockEventsProducer.sendCloudEvent(*_) >> {
+            mockEventProducer.sendCloudEvent(*_) >> {
                 sendEventMethodCalled = true
             }
         when: 'send the cloud event'
@@ -114,7 +114,7 @@ class FilterStrategiesIntegrationSpec extends ConsumerBaseSpec {
         then: 'wait a little for async processing of message (must wait to try to avoid false positives)'
             TimeUnit.MILLISECONDS.sleep(300)
         and: 'the event is not processed by this consumer'
-            0 * mockEventsProducer.sendCloudEvent(*_)
+            0 * mockEventProducer.sendCloudEvent(*_)
     }
 
 }
index b082945..5e671dc 100644 (file)
@@ -22,7 +22,7 @@ package org.onap.cps.ncmp.impl.data.async
 
 import com.fasterxml.jackson.databind.ObjectMapper
 import io.cloudevents.core.builder.CloudEventBuilder
-import org.onap.cps.events.EventsProducer
+import org.onap.cps.events.EventProducer
 import org.onap.cps.ncmp.config.KafkaConfig
 import org.onap.cps.ncmp.event.model.DmiAsyncRequestResponseEvent
 import org.onap.cps.ncmp.event.model.NcmpAsyncRequestResponseEvent
@@ -46,7 +46,7 @@ import spock.util.concurrent.PollingConditions
 class SerializationIntegrationSpec extends ConsumerBaseSpec {
 
     @SpringBean
-    EventsProducer mockEventsProducer = Mock()
+    EventProducer mockEventProducer = Mock()
 
     @SpringBean
     NcmpAsyncRequestResponseEventMapper mapper = Stub() { toNcmpAsyncEvent(_) >> new NcmpAsyncRequestResponseEvent(eventId: 'my-event-id', eventTarget: 'some client topic')}
@@ -63,7 +63,7 @@ class SerializationIntegrationSpec extends ConsumerBaseSpec {
         and: 'a flag to track the send cloud event call'
             def sendCloudEventMethodCalled = false
         and: 'the (mocked) events producer will use the flag to indicate if it is called and will capture the cloud event'
-            mockEventsProducer.sendCloudEvent('some client topic', 'some-correlation-id', cloudEvent) >> {
+            mockEventProducer.sendCloudEvent('some client topic', 'some-correlation-id', cloudEvent) >> {
                 sendCloudEventMethodCalled = true
             }
         when: 'send the event'
@@ -80,7 +80,7 @@ class SerializationIntegrationSpec extends ConsumerBaseSpec {
         and: 'a flag to track the send event call'
             def sendEventMethodCalled = false
         and: 'the (mocked) events producer will use the flag to indicate if it is called and will capture the event'
-            mockEventsProducer.sendLegacyEvent(*_) >> {
+            mockEventProducer.sendLegacyEvent(*_) >> {
                 sendEventMethodCalled = true
             }
         when: 'send the event'
index b55959d..0d6fda3 100644 (file)
 package org.onap.cps.ncmp.impl.data.utils
 
 import com.fasterxml.jackson.databind.ObjectMapper
-import io.cloudevents.CloudEvent
 import io.cloudevents.kafka.CloudEventDeserializer
 import io.cloudevents.kafka.impl.KafkaHeaders
 import org.apache.kafka.clients.consumer.KafkaConsumer
-import org.onap.cps.events.EventsProducer
+import org.onap.cps.events.EventProducer
 import org.onap.cps.ncmp.api.data.models.DataOperationRequest
 import org.onap.cps.ncmp.api.data.models.OperationType
 import org.onap.cps.ncmp.api.inventory.models.CompositeStateBuilder
@@ -46,7 +45,7 @@ import static org.onap.cps.ncmp.api.inventory.models.CmHandleState.ADVISED
 import static org.onap.cps.ncmp.api.inventory.models.CmHandleState.READY
 import static org.onap.cps.ncmp.utils.events.CloudEventMapper.toTargetEvent
 
-@ContextConfiguration(classes = [EventsProducer, CpsApplicationContext])
+@ContextConfiguration(classes = [EventProducer, CpsApplicationContext])
 class DmiDataOperationsHelperSpec extends MessagingBaseSpec {
 
     def static clientTopic = 'my-topic-name'
@@ -56,7 +55,7 @@ class DmiDataOperationsHelperSpec extends MessagingBaseSpec {
     JsonObjectMapper jsonObjectMapper = new JsonObjectMapper(new ObjectMapper())
 
     @SpringBean
-    EventsProducer eventsProducer = new EventsProducer(legacyEventKafkaTemplate, cloudEventKafkaTemplate, cloudEventKafkaTemplateForEos)
+    EventProducer eventsProducer = new EventProducer(legacyEventKafkaTemplate, cloudEventKafkaTemplate, cloudEventKafkaTemplateForEos)
 
     def 'Process per data operation request with #serviceName.'() {
         given: 'data operation request with 3 operations'
index eedc961..e8b860f 100644 (file)
@@ -27,7 +27,7 @@ import io.cloudevents.kafka.CloudEventDeserializer
 import io.cloudevents.kafka.impl.KafkaHeaders
 import org.apache.kafka.clients.consumer.ConsumerRecord
 import org.apache.kafka.clients.consumer.KafkaConsumer
-import org.onap.cps.events.EventsProducer
+import org.onap.cps.events.EventProducer
 import org.onap.cps.ncmp.api.inventory.models.CompositeState
 import org.onap.cps.ncmp.events.avc1_0_0.AvcEvent
 import org.onap.cps.ncmp.impl.inventory.InventoryPersistence
@@ -45,13 +45,13 @@ import java.time.Duration
 import static org.onap.cps.ncmp.utils.TestUtils.getResourceFileContent
 import static org.onap.cps.ncmp.utils.events.CloudEventMapper.toTargetEvent
 
-@SpringBootTest(classes = [EventsProducer, CmAvcEventConsumer, ObjectMapper, JsonObjectMapper])
+@SpringBootTest(classes = [EventProducer, CmAvcEventConsumer, ObjectMapper, JsonObjectMapper])
 @Testcontainers
 @DirtiesContext
 class CmAvcEventConsumerSpec extends MessagingBaseSpec {
 
     @SpringBean
-    EventsProducer eventsProducer = new EventsProducer(legacyEventKafkaTemplate, cloudEventKafkaTemplate, cloudEventKafkaTemplateForEos)
+    EventProducer eventsProducer = new EventProducer(legacyEventKafkaTemplate, cloudEventKafkaTemplate, cloudEventKafkaTemplateForEos)
 
     def mockCmAvcEventService = Mock(CmAvcEventService)
     def mockInventoryPersistence = Mock(InventoryPersistence)
index 32f4eb8..09e91fe 100644 (file)
@@ -23,7 +23,7 @@ package org.onap.cps.ncmp.impl.datajobs.subscription.dmi
 import com.fasterxml.jackson.databind.ObjectMapper
 import io.cloudevents.CloudEvent
 import io.cloudevents.core.v1.CloudEventBuilder
-import org.onap.cps.events.EventsProducer
+import org.onap.cps.events.EventProducer
 import org.onap.cps.ncmp.config.CpsApplicationContext
 import org.onap.cps.ncmp.impl.datajobs.subscription.ncmp_to_dmi.CmHandle
 import org.onap.cps.ncmp.impl.datajobs.subscription.ncmp_to_dmi.Data
@@ -31,16 +31,14 @@ import org.onap.cps.ncmp.impl.datajobs.subscription.ncmp_to_dmi.DataJobSubscript
 import org.onap.cps.ncmp.utils.events.CloudEventMapper
 import org.onap.cps.utils.JsonObjectMapper
 import org.springframework.boot.test.context.SpringBootTest
-import org.springframework.test.context.ContextConfiguration
 import spock.lang.Specification
 
-@SpringBootTest(classes = [ObjectMapper, JsonObjectMapper, CloudEventBuilder])
-@ContextConfiguration(classes = [CpsApplicationContext])
+@SpringBootTest(classes = [CpsApplicationContext, ObjectMapper, JsonObjectMapper, CloudEventBuilder])
 class EventProducerSpec extends Specification {
 
-    def mockEventsProducer = Mock(EventsProducer)
+    def mockEventProducer = Mock(EventProducer)
 
-    def objectUnderTest = new EventProducer(mockEventsProducer)
+    def objectUnderTest = new DmiEventProducer(mockEventProducer)
 
     def 'Create and Send Cm Notification Subscription DMI In Event'() {
         given: 'a cm subscription for a dmi plugin'
@@ -53,7 +51,7 @@ class EventProducerSpec extends Specification {
         when: 'the event is sent'
             objectUnderTest.send(subscriptionId, dmiPluginName, eventType, dmiInEvent)
         then: 'the event contains the required attributes'
-            1 * mockEventsProducer.sendCloudEvent(_, _, _) >> {
+            1 * mockEventProducer.sendCloudEvent(_, _, _) >> {
                 args ->
                     {
                         assert args[0] == 'dmiplugin-test-topic'
index 14dc906..a098375 100644 (file)
 
 package org.onap.cps.ncmp.impl.datajobs.subscription.ncmp
 
-import static org.onap.cps.ncmp.impl.datajobs.subscription.models.CmSubscriptionStatus.ACCEPTED
-import static org.onap.cps.ncmp.impl.datajobs.subscription.models.CmSubscriptionStatus.REJECTED
-
 import org.onap.cps.ncmp.impl.datajobs.subscription.client_to_ncmp.DataSelector
+import org.onap.cps.ncmp.impl.datajobs.subscription.dmi.DmiEventProducer
 import org.onap.cps.ncmp.impl.datajobs.subscription.dmi.DmiInEventMapper
-import org.onap.cps.ncmp.impl.datajobs.subscription.dmi.EventProducer
 import org.onap.cps.ncmp.impl.datajobs.subscription.ncmp_to_dmi.DataJobSubscriptionDmiInEvent
 import org.onap.cps.ncmp.impl.datajobs.subscription.utils.CmDataJobSubscriptionPersistenceService
 import org.onap.cps.ncmp.impl.inventory.InventoryPersistence
@@ -34,11 +31,14 @@ import org.onap.cps.ncmp.impl.utils.AlternateIdMatcher
 import org.onap.cps.ncmp.impl.utils.JexParser
 import spock.lang.Specification
 
+import static org.onap.cps.ncmp.impl.datajobs.subscription.models.CmSubscriptionStatus.ACCEPTED
+import static org.onap.cps.ncmp.impl.datajobs.subscription.models.CmSubscriptionStatus.REJECTED
+
 class CmSubscriptionHandlerImplSpec extends Specification {
 
     def mockCmSubscriptionPersistenceService = Mock(CmDataJobSubscriptionPersistenceService)
     def mockDmiInEventMapper = Mock(DmiInEventMapper)
-    def mockDmiInEventProducer = Mock(EventProducer)
+    def mockDmiInEventProducer = Mock(DmiEventProducer)
     def mockInventoryPersistence = Mock(InventoryPersistence)
     def mockAlternateIdMatcher = Mock(AlternateIdMatcher)
 
@@ -272,4 +272,4 @@ class CmSubscriptionHandlerImplSpec extends Specification {
     def getFdn(dataNodeSelector) {
         return JexParser.extractFdnPrefix(dataNodeSelector).orElse("")
     }
-}
\ No newline at end of file
+}
index bf8d851..dad0411 100644 (file)
@@ -35,8 +35,7 @@ import org.onap.cps.api.model.DataNode
 import org.onap.cps.impl.DataNodeBuilder
 import org.onap.cps.ncmp.api.inventory.models.NcmpServiceCmHandle
 import org.onap.cps.ncmp.impl.inventory.models.YangModelCmHandle
-import org.onap.cps.ncmp.impl.inventory.sync.lcm.LcmEventsHelper
-import org.onap.cps.utils.ContentType
+import org.onap.cps.ncmp.impl.inventory.sync.lcm.LcmEventProducer
 import org.onap.cps.utils.JsonObjectMapper
 import org.slf4j.LoggerFactory
 import spock.lang.Specification
@@ -56,9 +55,9 @@ class CmHandleRegistrationServicePropertyHandlerSpec extends Specification {
     def jsonObjectMapper = new JsonObjectMapper(new ObjectMapper())
     def mockAlternateIdChecker = Mock(AlternateIdChecker)
     def mockCmHandleIdPerAlternateId = Mock(IMap)
-    def mockLcmEventsHelper = Mock(LcmEventsHelper)
+    def mockLcmEventProducer = Mock(LcmEventProducer)
 
-    def objectUnderTest = new CmHandleRegistrationServicePropertyHandler(mockInventoryPersistence, mockCpsDataService, jsonObjectMapper, mockAlternateIdChecker, mockCmHandleIdPerAlternateId, mockLcmEventsHelper)
+    def objectUnderTest = new CmHandleRegistrationServicePropertyHandler(mockInventoryPersistence, mockCpsDataService, jsonObjectMapper, mockAlternateIdChecker, mockCmHandleIdPerAlternateId, mockLcmEventProducer)
     def logger = Spy(ListAppender<ILoggingEvent>)
 
     void setup() {
@@ -253,7 +252,7 @@ class CmHandleRegistrationServicePropertyHandlerSpec extends Specification {
         then:   'the update node leaves method is invoked once with correct parameters'
             1 * mockInventoryPersistence.updateCmHandleField('cmHandleId', 'data-producer-identifier', 'New Data Producer Identifier')
         and:    'LCM event is sent'
-            1 * mockLcmEventsHelper.sendLcmEventBatchAsynchronously({ cmHandleTransitionPairs ->
+            1 * mockLcmEventProducer.sendLcmEventBatchAsynchronously({cmHandleTransitionPairs ->
                 assert cmHandleTransitionPairs[0].targetYangModelCmHandle.dataProducerIdentifier == 'New Data Producer Identifier'
             })
         where: 'the following scenarios are used'
@@ -272,7 +271,7 @@ class CmHandleRegistrationServicePropertyHandlerSpec extends Specification {
         then: 'the update node leaves method is not invoked'
             0 * mockCpsDataService.updateNodeLeaves(*_)
         and: 'No LCM events are sent'
-            0 * mockLcmEventsHelper.sendLcmEventBatchAsynchronously(*_)
+            0 * mockLcmEventProducer.sendLcmEventBatchAsynchronously(*_)
         and: 'debug information is logged'
             def loggingEvent = logger.list[0]
             assert loggingEvent.level == Level.DEBUG
@@ -291,7 +290,7 @@ class CmHandleRegistrationServicePropertyHandlerSpec extends Specification {
         then: 'the update node leaves method is invoked once with correct parameters'
             1 * mockInventoryPersistence.updateCmHandleField('cmHandleId', 'data-producer-identifier', 'newDataProducerIdentifier')
         and: 'LCM event is sent'
-            1 * mockLcmEventsHelper.sendLcmEventBatchAsynchronously( { cmHandleTransitionPairs ->
+            1 * mockLcmEventProducer.sendLcmEventBatchAsynchronously( {cmHandleTransitionPairs ->
                 assert cmHandleTransitionPairs[0].targetYangModelCmHandle.dataProducerIdentifier == 'newDataProducerIdentifier'
                 assert cmHandleTransitionPairs[0].currentYangModelCmHandle.dataProducerIdentifier == 'oldDataProducerIdentifier'
             })
@@ -307,7 +306,7 @@ class CmHandleRegistrationServicePropertyHandlerSpec extends Specification {
         then: 'the update node leaves method is not invoked'
             0 * mockCpsDataService.updateNodeLeaves(*_)
         and: 'No LCM events are sent'
-            0 * mockLcmEventsHelper.sendLcmEventBatchAsynchronously(*_)
+            0 * mockLcmEventProducer.sendLcmEventBatchAsynchronously(*_)
         and: 'warning is logged'
             def lastLoggingEvent = logger.list[0]
             assert lastLoggingEvent.level == Level.WARN
index b409a1a..f1e3daf 100644 (file)
@@ -65,14 +65,14 @@ class CmHandleStateMonitorSpec extends Specification {
             1 * mockCmHandlesByState.putIfAbsent("lockedCmHandlesCount", expectedValue)
             1 * mockCmHandlesByState.putIfAbsent("deletingCmHandlesCount", expectedValue)
         where:
-            scenario                                 | queryResult                 || expectedValue
-            'query service returns zero cm handle id'| []                          || 0
-            'query service returns 1 cm handle id'   | ['someId']                  || 1
+            scenario                                 | queryResult || expectedValue
+            'query service returns zero cm handle id'| []          || 0
+            'query service returns 1 cm handle id'   | ['someId']  || 1
     }
 
 
     def 'Update cm handle state metric'() {
-        given: 'a collection of cm handle state pair'
+        given: 'a cm handle state pair'
             def cmHandleTransitionPair = new CmHandleTransitionPair(new YangModelCmHandle(compositeState: new CompositeState(cmHandleState: ADVISED)),
                                                                     new YangModelCmHandle(compositeState: new CompositeState(cmHandleState: READY))
             )
@@ -99,9 +99,9 @@ class CmHandleStateMonitorSpec extends Specification {
         then: 'the new value is as expected'
             assert entryProcessingMap.get(key) == expectedValue
         where: 'the following data is used'
-            scenario                        | key                 || expectedValue
-            'current value of count is zero'| 'zeroCmHandlesCount'|| 0
-            'current value of count is >0'  | 'tenCmHandlesCount' || 9
+            scenario                        | key                  || expectedValue
+            'current value of count is zero'| 'zeroCmHandlesCount' || 0
+            'current value of count is >0'  | 'tenCmHandlesCount'  || 9
     }
 
     def 'Applying increasing entry processor to a key on map'() {
@@ -23,7 +23,7 @@ package org.onap.cps.ncmp.impl.inventory.sync.lcm
 import com.fasterxml.jackson.databind.ObjectMapper
 import org.apache.kafka.clients.consumer.KafkaConsumer
 import org.apache.kafka.common.serialization.StringDeserializer
-import org.onap.cps.events.EventsProducer
+import org.onap.cps.events.EventProducer
 import org.onap.cps.events.LegacyEvent
 import org.onap.cps.ncmp.events.lcm.v1.Event
 import org.onap.cps.ncmp.events.lcm.v1.LcmEvent
@@ -42,19 +42,17 @@ import java.time.Duration
 @SpringBootTest(classes = [ObjectMapper, JsonObjectMapper])
 @Testcontainers
 @DirtiesContext
-class EventsProducerSpec extends MessagingBaseSpec {
+class EventProducerSpec extends MessagingBaseSpec {
 
     def legacyEventKafkaConsumer = new KafkaConsumer<String, LegacyEvent>(eventConsumerConfigProperties('ncmp-group', StringDeserializer))
-
     def testTopic = 'ncmp-events-test'
 
     @SpringBean
-    EventsProducer eventsProducer = new EventsProducer(legacyEventKafkaTemplate, cloudEventKafkaTemplate, cloudEventKafkaTemplateForEos)
+    EventProducer eventProducer = new EventProducer(legacyEventKafkaTemplate, cloudEventKafkaTemplate, cloudEventKafkaTemplateForEos)
 
     @Autowired
     JsonObjectMapper jsonObjectMapper
 
-
     def 'Produce and Consume Event'() {
         given: 'event key and event data'
             def eventKey = 'lcm'
@@ -86,20 +84,19 @@ class EventsProducerSpec extends MessagingBaseSpec {
         and: 'consumer has a subscription'
             legacyEventKafkaConsumer.subscribe([testTopic] as List<String>)
         when: 'an event is sent'
-            eventsProducer.sendLegacyEvent(testTopic, eventKey, eventHeader, eventData)
+            eventProducer.sendLegacyEvent(testTopic, eventKey, eventHeader, eventData)
         and: 'topic is polled'
             def records = legacyEventKafkaConsumer.poll(Duration.ofMillis(1500))
         then: 'poll returns one record'
             assert records.size() == 1
         and: 'record key matches the expected event key'
-            def record = records.iterator().next()
-            assert eventKey == record.key
+            assert eventKey == records[0].key
         and: 'record matches the expected event'
             def expectedJsonString = TestUtils.getResourceFileContent('expectedLcmEvent.json')
             def expectedLcmEvent = jsonObjectMapper.convertJsonString(expectedJsonString, LcmEvent.class)
-            assert expectedLcmEvent == jsonObjectMapper.convertJsonString(record.value, LcmEvent.class)
+            assert expectedLcmEvent == jsonObjectMapper.convertJsonString(records[0].value, LcmEvent.class)
         and: 'record header matches the expected parameters'
-            assert SerializationUtils.deserialize(record.headers().lastHeader('eventId').value()) == eventId
-            assert SerializationUtils.deserialize(record.headers().lastHeader('eventCorrelationId').value()) == eventCorrelationId
+            assert SerializationUtils.deserialize(records[0].headers().lastHeader('eventId').value()) == eventId
+            assert SerializationUtils.deserialize(records[0].headers().lastHeader('eventCorrelationId').value()) == eventCorrelationId
     }
 }
@@ -20,7 +20,6 @@
 
 package org.onap.cps.ncmp.impl.inventory.sync.lcm
 
-import org.mapstruct.factory.Mappers
 import org.onap.cps.ncmp.api.inventory.models.CmHandleState
 import org.onap.cps.ncmp.api.inventory.models.CompositeState
 import org.onap.cps.ncmp.api.inventory.models.NcmpServiceCmHandle
@@ -31,21 +30,17 @@ import static org.onap.cps.ncmp.api.inventory.models.CmHandleState.ADVISED
 import static org.onap.cps.ncmp.api.inventory.models.CmHandleState.DELETING
 import static org.onap.cps.ncmp.api.inventory.models.CmHandleState.READY
 
-class LcmEventsProducerHelperSpec extends Specification {
+class LcmEventObjectCreatorSpec extends Specification {
 
-    LcmEventHeaderMapper lcmEventsHeaderMapper = Mappers.getMapper(LcmEventHeaderMapper)
-
-    def objectUnderTest = new LcmEventsProducerHelper(lcmEventsHeaderMapper)
+    def objectUnderTest = new LcmEventObjectCreator()
     def cmHandleId = 'test-cm-handle'
 
     def 'Map the LcmEvent for #operation'() {
         given: 'NCMP cm handle details with current and old properties'
-            def currentNcmpServiceCmHandle = new NcmpServiceCmHandle(cmHandleId: cmHandleId, compositeState: new CompositeState(dataSyncEnabled: true, cmHandleState: currentCmHandleState),
-                    publicProperties: currentPublicProperties)
-            def targetNcmpServiceCmHandle = new NcmpServiceCmHandle(cmHandleId: cmHandleId, compositeState: new CompositeState(dataSyncEnabled: true, cmHandleState: targetCmHandleState),
-                publicProperties: targetPublicProperties)
+            def currentNcmpServiceCmHandle = new NcmpServiceCmHandle(cmHandleId: cmHandleId, compositeState: new CompositeState(dataSyncEnabled: true, cmHandleState: currentCmHandleState), publicProperties: currentPublicProperties)
+            def targetNcmpServiceCmHandle = new NcmpServiceCmHandle(cmHandleId: cmHandleId, compositeState: new CompositeState(dataSyncEnabled: true, cmHandleState: targetCmHandleState), publicProperties: targetPublicProperties)
         when: 'the lcm event is created'
-            def result = objectUnderTest.createLcmEvent(cmHandleId, currentNcmpServiceCmHandle, targetNcmpServiceCmHandle)
+            def result = objectUnderTest.createLcmEvent(currentNcmpServiceCmHandle, targetNcmpServiceCmHandle)
         then: 'event header is mapped correctly'
             assert result.eventSource == 'org.onap.ncmp'
             assert result.eventCorrelationId == cmHandleId
@@ -73,7 +68,7 @@ class LcmEventsProducerHelperSpec extends Specification {
             def targetNcmpServiceCmHandle = new NcmpServiceCmHandle(cmHandleId: cmHandleId, compositeState: new CompositeState(dataSyncEnabled: true, cmHandleState: READY),
                     publicProperties: publicProperties)
         when: 'the lcm event is created'
-            def result = objectUnderTest.createLcmEvent(cmHandleId, currentNcmpServiceCmHandle, targetNcmpServiceCmHandle)
+            def result = objectUnderTest.createLcmEvent(currentNcmpServiceCmHandle, targetNcmpServiceCmHandle)
         then: 'Properties are just the one which are same'
             assert result.event.oldValues == null
             assert result.event.newValues == null
@@ -85,7 +80,7 @@ class LcmEventsProducerHelperSpec extends Specification {
                 publicProperties: ['publicProperty1': 'value11', 'publicProperty2': 'value22'])
             def currentNcmpServiceCmHandle = new NcmpServiceCmHandle(cmHandleId: cmHandleId, publicProperties: ['publicProperty1': 'value1', 'publicProperty2': 'value2'])
         when: 'the lcm event is created'
-            def result = objectUnderTest.createLcmEvent(cmHandleId, currentNcmpServiceCmHandle, targetNcmpServiceCmhandle)
+            def result = objectUnderTest.createLcmEvent(currentNcmpServiceCmHandle, targetNcmpServiceCmhandle)
         then: 'event header is mapped correctly'
             assert result.eventSource == 'org.onap.ncmp'
             assert result.eventCorrelationId == cmHandleId
@@ -106,7 +101,7 @@ class LcmEventsProducerHelperSpec extends Specification {
             def currentNcmpServiceCmHandle = new NcmpServiceCmHandle(cmHandleId: cmHandleId, compositeState: new CompositeState(dataSyncEnabled: true, cmHandleState: DELETING),
                 publicProperties: ['publicProperty1': 'value1'])
         when: 'the lcm event is created'
-            def result = objectUnderTest.createLcmEvent(cmHandleId, currentNcmpServiceCmHandle, targetNcmpServiceCmHandle)
+            def result = objectUnderTest.createLcmEvent(currentNcmpServiceCmHandle, targetNcmpServiceCmHandle)
         then: 'event header is mapped correctly'
             assert result.eventSource == 'org.onap.ncmp'
             assert result.eventCorrelationId == cmHandleId
@@ -122,7 +117,7 @@ class LcmEventsProducerHelperSpec extends Specification {
             def currentNcmpServiceCmHandle = new NcmpServiceCmHandle(cmHandleId: cmHandleId, compositeState: new CompositeState(dataSyncEnabled: currentDataSyncEnableFlag, cmHandleState: ADVISED))
             def targetNcmpServiceCmHandle = new NcmpServiceCmHandle(cmHandleId: cmHandleId, compositeState: new CompositeState(dataSyncEnabled: targetDataSyncEnableFlag, cmHandleState: READY))
         when: 'the lcm event is created'
-            def result = objectUnderTest.createLcmEvent(cmHandleId, currentNcmpServiceCmHandle, targetNcmpServiceCmHandle)
+            def result = objectUnderTest.createLcmEvent(currentNcmpServiceCmHandle, targetNcmpServiceCmHandle)
         then: 'event header is mapped correctly'
             assert result.eventSource == 'org.onap.ncmp'
             assert result.eventCorrelationId == cmHandleId
@@ -150,7 +145,7 @@ class LcmEventsProducerHelperSpec extends Specification {
             def currentNcmpServiceCmHandle = new NcmpServiceCmHandle(cmHandleId: cmHandleId, compositeState: new CompositeState(dataSyncEnabled: currentDataSyncEnableFlag, cmHandleState: ADVISED))
             def targetNcmpServiceCmHandle = new NcmpServiceCmHandle(cmHandleId: cmHandleId, compositeState: new CompositeState(dataSyncEnabled: targetDataSyncEnableFlag, cmHandleState: READY))
         when: 'the lcm event is created'
-            def result = objectUnderTest.createLcmEvent(cmHandleId, currentNcmpServiceCmHandle, targetNcmpServiceCmHandle)
+            def result = objectUnderTest.createLcmEvent(currentNcmpServiceCmHandle, targetNcmpServiceCmHandle)
         then: 'the data sync flag is not present in the event'
             assert result.event.oldValues.dataSyncEnabled == null
             assert result.event.newValues.dataSyncEnabled == null
@@ -161,23 +156,12 @@ class LcmEventsProducerHelperSpec extends Specification {
             'null to null'   | null                      | null
     }
 
-    def 'Map the LcmEventHeader'() {
-        given: 'NCMP cm handle details with current and old details'
-            def currentNcmpServiceCmHandle = new NcmpServiceCmHandle(cmHandleId: cmHandleId, compositeState: new CompositeState(cmHandleState: ADVISED))
-            def targetNcmpServiceCmHandle = new NcmpServiceCmHandle(cmHandleId: cmHandleId, compositeState: new CompositeState(cmHandleState: READY))
-        when: 'the lcm event header is created'
-            def result = objectUnderTest.createLcmEventHeader(cmHandleId, currentNcmpServiceCmHandle, targetNcmpServiceCmHandle)
-        then: 'the header field are populated'
-            assert result.eventCorrelationId == cmHandleId
-            assert result.eventId != null
-    }
-
     def 'Map the LcmEvent for alternate ID, data producer identifier, and module set tag when they contain #scenario'() {
         given: 'NCMP cm handle details with current and old values for alternate ID, module set tag, and data producer identifier'
             def currentNcmpServiceCmHandle = new NcmpServiceCmHandle(cmHandleId: cmHandleId, alternateId: currentAlternateId, moduleSetTag: currentModuleSetTag, dataProducerIdentifier: currentDataProducerIdentifier, compositeState: new CompositeState(dataSyncEnabled: false))
             def targetNcmpServiceCmHandle = new NcmpServiceCmHandle(cmHandleId: cmHandleId, alternateId: targetAlternateId, moduleSetTag: targetModuleSetTag, dataProducerIdentifier: targetDataProducerIdentifier, compositeState: new CompositeState(dataSyncEnabled: false))
         when: 'the lcm event is created'
-            def result = objectUnderTest.createLcmEvent(cmHandleId, currentNcmpServiceCmHandle, targetNcmpServiceCmHandle)
+            def result = objectUnderTest.createLcmEvent(currentNcmpServiceCmHandle, targetNcmpServiceCmHandle)
         then: 'the alternate ID, module set tag, and data producer identifier are present or are an empty string in the payload'
             assert result.event.alternateId == targetAlternateId
             assert result.event.moduleSetTag == targetModuleSetTag
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/lcm/LcmEventProducerSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/lcm/LcmEventProducerSpec.groovy
new file mode 100644 (file)
index 0000000..bfcff01
--- /dev/null
@@ -0,0 +1,87 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2022-2025 OpenInfra Foundation Europe. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.impl.inventory.sync.lcm
+
+import io.micrometer.core.instrument.Tag
+import io.micrometer.core.instrument.simple.SimpleMeterRegistry
+import org.onap.cps.events.EventProducer
+import org.onap.cps.ncmp.api.inventory.models.CompositeState
+import org.onap.cps.ncmp.impl.inventory.models.YangModelCmHandle
+import org.springframework.kafka.KafkaException
+import spock.lang.Specification
+
+import static org.onap.cps.ncmp.api.inventory.models.CmHandleState.ADVISED
+import static org.onap.cps.ncmp.api.inventory.models.CmHandleState.READY
+
+class LcmEventProducerSpec extends Specification {
+
+    def mockEventProducer = Mock(EventProducer)
+    def lcmEventObjectCreator = new LcmEventObjectCreator()
+    def meterRegistry = new SimpleMeterRegistry()
+
+    def objectUnderTest = new LcmEventProducer(mockEventProducer, lcmEventObjectCreator, meterRegistry)
+
+    def cmHandleTransitionPair = new CmHandleTransitionPair(
+        new YangModelCmHandle(id: 'ch-1', compositeState: new CompositeState(cmHandleState: ADVISED), additionalProperties: [], publicProperties: []),
+        new YangModelCmHandle(id: 'ch-1', compositeState: new CompositeState(cmHandleState: READY), additionalProperties: [], publicProperties: [])
+    )
+
+    def 'Create and send lcm event where notifications are #scenario.'() {
+        given: 'notificationsEnabled is #notificationsEnabled'
+            objectUnderTest.notificationsEnabled = notificationsEnabled
+        when: 'service is called to send a batch of lcm events'
+            objectUnderTest.sendLcmEventBatchAsynchronously([cmHandleTransitionPair])
+        then: 'producer is called #expectedTimesMethodCalled times with correct identifiers'
+            expectedTimesMethodCalled * mockEventProducer.sendLegacyEvent(_, 'ch-1', _, _) >> {
+                args -> {
+                    def eventHeaders = args[2]
+                    assert UUID.fromString(eventHeaders.get('eventId')) != null
+                    assert eventHeaders.get('eventCorrelationId') == 'ch-1'
+                }
+            }
+        and: 'metrics are recorded with correct tags'
+            def timer = meterRegistry.find('cps.ncmp.lcm.events.send').timer()
+            if (notificationsEnabled) {
+                assert timer.count() == 1
+                assert timer.id.tags.containsAll(Tag.of('oldCmHandleState', 'ADVISED'), Tag.of('newCmHandleState', 'READY'))
+            } else {
+                assert timer == null
+            }
+        where: 'the following values are used'
+            scenario   | notificationsEnabled || expectedTimesMethodCalled
+            'enabled'  | true                 || 1
+            'disabled' | false                || 0
+    }
+
+    def 'Exception while sending message.'(){
+        given: 'notifications are enabled'
+            objectUnderTest.notificationsEnabled = true
+        when: 'producer set to throw an exception'
+            mockEventProducer.sendLegacyEvent(*_) >> { throw new KafkaException('sending failed')}
+        and: 'attempt to send events'
+            objectUnderTest.sendLcmEventBatchAsynchronously([cmHandleTransitionPair])
+        then: 'the exception is just logged and not bubbled up'
+            noExceptionThrown()
+        and: 'metrics are not recorded'
+            assert  meterRegistry.find('cps.ncmp.lcm.events.send').timer() == null
+    }
+
+}
index 827af61..f5c0fea 100644 (file)
@@ -56,12 +56,10 @@ class LcmEventsCmHandleStateHandlerImplSpec extends Specification {
     }
 
     def mockInventoryPersistence = Mock(InventoryPersistence)
-    def mockLcmEventsCreator = Mock(LcmEventsProducerHelper)
-    def mockLcmEventsProducer = Mock(LcmEventsProducer)
+    def mockLcmEventProducer = Mock(LcmEventProducer)
     def mockCmHandleStateMonitor = Mock(CmHandleStateMonitor)
 
-    def lcmEventsHelper = new LcmEventsHelper(mockLcmEventsCreator, mockLcmEventsProducer)
-    def objectUnderTest = new LcmEventsCmHandleStateHandlerImpl(mockInventoryPersistence, lcmEventsHelper, mockCmHandleStateMonitor)
+    def objectUnderTest = new LcmEventsCmHandleStateHandlerImpl(mockInventoryPersistence, mockLcmEventProducer, mockCmHandleStateMonitor)
 
     def cmHandleId = 'cmhandle-id-1'
     def currentCompositeState
@@ -82,7 +80,7 @@ class LcmEventsCmHandleStateHandlerImplSpec extends Specification {
             assert loggingEvent.level == Level.DEBUG
             assert loggingEvent.formattedMessage == "${cmHandleId} is now in ${toCmHandleState} state"
         and: 'event service is called to send event'
-            1 * mockLcmEventsProducer.sendLcmEvent(cmHandleId, _, _)
+            1 * mockLcmEventProducer.sendLcmEventBatchAsynchronously(_)
         where: 'state change parameters are provided'
             stateChange           | fromCmHandleState | toCmHandleState
             'ADVISED to READY'    | ADVISED           | READY
@@ -99,7 +97,7 @@ class LcmEventsCmHandleStateHandlerImplSpec extends Specification {
         then: 'CM-handle is saved using inventory persistence'
             1 * mockInventoryPersistence.saveCmHandleBatch(List.of(yangModelCmHandle))
         and: 'event service is called to send event'
-            1 * mockLcmEventsProducer.sendLcmEvent(cmHandleId, _, _)
+            1 * mockLcmEventProducer.sendLcmEventBatchAsynchronously(_)
         and: 'a log entry is written'
             assert getLogMessage(0) == "${cmHandleId} is now in ADVISED state"
     }
@@ -115,7 +113,7 @@ class LcmEventsCmHandleStateHandlerImplSpec extends Specification {
                     assert cmHandleStatePerCmHandleId.get(cmHandleId).lockReason.details == 'some lock details'
                 })
         and: 'event service is called to send event'
-            1 * mockLcmEventsProducer.sendLcmEvent(cmHandleId, _, _)
+            1 * mockLcmEventProducer.sendLcmEventBatchAsynchronously(_)
         and: 'a log entry is written'
             assert getLogMessage(0) == "${cmHandleId} is now in ADVISED state"
     }
@@ -134,7 +132,7 @@ class LcmEventsCmHandleStateHandlerImplSpec extends Specification {
                     assert cmHandleStatePerCmHandleId.get(cmHandleId).dataStores.operationalDataStore.dataStoreSyncState == DataStoreSyncState.NONE_REQUESTED
                 })
         and: 'event service is called to send event'
-            1 * mockLcmEventsProducer.sendLcmEvent(cmHandleId, _, _)
+            1 * mockLcmEventProducer.sendLcmEventBatchAsynchronously(_)
         and: 'a log entry is written'
             assert getLogMessage(0) == "${cmHandleId} is now in READY state"
     }
@@ -150,7 +148,7 @@ class LcmEventsCmHandleStateHandlerImplSpec extends Specification {
         and: 'method to persist cm handle state is called once'
             1 * mockInventoryPersistence.saveCmHandleStateBatch([(cmHandleId): yangModelCmHandle.compositeState])
         and: 'the method to send Lcm event is called once'
-            1 * mockLcmEventsProducer.sendLcmEvent(cmHandleId, _, _)
+            1 * mockLcmEventProducer.sendLcmEventBatchAsynchronously(_)
     }
 
     def 'Update cmHandle state to DELETING to DELETED' (){
@@ -162,7 +160,7 @@ class LcmEventsCmHandleStateHandlerImplSpec extends Specification {
         then: 'the cm handle state is as expected'
             yangModelCmHandle.getCompositeState().getCmHandleState() == DELETED
         and: 'the method to send Lcm event is called once'
-            1 * mockLcmEventsProducer.sendLcmEvent(cmHandleId, _, _)
+            1 * mockLcmEventProducer.sendLcmEventBatchAsynchronously(_)
     }
 
     def 'No state change and no event to be sent'() {
@@ -174,7 +172,7 @@ class LcmEventsCmHandleStateHandlerImplSpec extends Specification {
             1 * mockInventoryPersistence.saveCmHandleBatch(EMPTY_LIST)
             1 * mockInventoryPersistence.saveCmHandleStateBatch(EMPTY_MAP)
         and: 'no event will be sent'
-            0 * mockLcmEventsProducer.sendLcmEvent(*_)
+            0 * mockLcmEventProducer.sendLcmEventBatchAsynchronously(*_)
     }
 
     def 'Batch of new cm handles provided'() {
@@ -188,8 +186,8 @@ class LcmEventsCmHandleStateHandlerImplSpec extends Specification {
                 })
         and: 'no state updates are persisted'
             1 * mockInventoryPersistence.saveCmHandleStateBatch(EMPTY_MAP)
-        and: 'event service is called to send events'
-            2 * mockLcmEventsProducer.sendLcmEvent(_, _, _)
+        and: 'event service is called once to send 1 batch of 2 events (TODO Confirm size)'
+            1 * mockLcmEventProducer.sendLcmEventBatchAsynchronously(_)
         and: 'two log entries are written'
             assert getLogMessage(0) == 'cmhandle1 is now in ADVISED state'
             assert getLogMessage(1) == 'cmhandle2 is now in ADVISED state'
@@ -206,8 +204,8 @@ class LcmEventsCmHandleStateHandlerImplSpec extends Specification {
                 })
         and: 'no new handles are persisted'
             1 * mockInventoryPersistence.saveCmHandleBatch(EMPTY_LIST)
-        and: 'event service is called to send events'
-            2 * mockLcmEventsProducer.sendLcmEvent(_, _, _)
+        and: 'event service is called once to send 1 batch of 2 events (TODO Confirm size)'
+            1 * mockLcmEventProducer.sendLcmEventBatchAsynchronously(_)
         and: 'two log entries are written'
             assert getLogMessage(0) == 'cmhandle1 is now in READY state'
             assert getLogMessage(1) == 'cmhandle2 is now in DELETING state'
@@ -222,8 +220,8 @@ class LcmEventsCmHandleStateHandlerImplSpec extends Specification {
             1 * mockInventoryPersistence.saveCmHandleStateBatch(EMPTY_MAP)
         and: 'no new handles are persisted'
             1 * mockInventoryPersistence.saveCmHandleBatch(EMPTY_LIST)
-        and: 'event service is called to send events'
-            2 * mockLcmEventsProducer.sendLcmEvent(_, _, _)
+        and: 'event service is called once to send 1 batch of 2 events (TODO Confirm size)'
+            1 * mockLcmEventProducer.sendLcmEventBatchAsynchronously(_)
         and: 'two log entries are written'
             assert getLogMessage(0) == 'cmhandle1 is now in DELETED state'
             assert getLogMessage(1) == 'cmhandle2 is now in DELETED state'
@@ -239,35 +237,29 @@ class LcmEventsCmHandleStateHandlerImplSpec extends Specification {
         then: 'the exception is not handled'
             thrown(RuntimeException)
         and: 'no events are sent'
-            0 * mockLcmEventsProducer.sendLcmEvent(_, _, _)
+            0 * mockLcmEventProducer.sendLcmEvent(*_)
         and: 'no log entries are written'
             assert logAppender.list.empty
     }
 
     def setupBatch(type) {
-
         def yangModelCmHandle1 = new YangModelCmHandle(id: 'cmhandle1', additionalProperties: [], publicProperties: [])
         def yangModelCmHandle2 = new YangModelCmHandle(id: 'cmhandle2', additionalProperties: [], publicProperties: [])
-
         switch (type) {
             case 'NEW':
                 return [yangModelCmHandle1, yangModelCmHandle2]
-
             case 'DELETED':
                 yangModelCmHandle1.compositeState = new CompositeState(cmHandleState: READY)
                 yangModelCmHandle2.compositeState = new CompositeState(cmHandleState: READY)
                 return [(yangModelCmHandle1): DELETED, (yangModelCmHandle2): DELETED]
-
             case 'UPDATE':
                 yangModelCmHandle1.compositeState = new CompositeState(cmHandleState: ADVISED)
                 yangModelCmHandle2.compositeState = new CompositeState(cmHandleState: READY)
                 return [(yangModelCmHandle1): READY, (yangModelCmHandle2): DELETING]
-
             case 'NO_CHANGE':
                 yangModelCmHandle1.compositeState = new CompositeState(cmHandleState: ADVISED)
                 yangModelCmHandle2.compositeState = new CompositeState(cmHandleState: READY)
                 return [(yangModelCmHandle1): ADVISED, (yangModelCmHandle2): READY]
-
             default:
                 throw new IllegalArgumentException("batch type '${type}' not recognized")
         }
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/lcm/LcmEventsProducerSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/lcm/LcmEventsProducerSpec.groovy
deleted file mode 100644 (file)
index 4bcb89a..0000000
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * Copyright (C) 2022-2025 OpenInfra Foundation Europe. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *       http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * SPDX-License-Identifier: Apache-2.0
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.cps.ncmp.impl.inventory.sync.lcm
-
-import static org.onap.cps.ncmp.events.lcm.v1.Values.CmHandleState.ADVISED
-import static org.onap.cps.ncmp.events.lcm.v1.Values.CmHandleState.READY
-
-import io.micrometer.core.instrument.Tag
-import io.micrometer.core.instrument.simple.SimpleMeterRegistry
-import org.onap.cps.events.EventsProducer
-import org.onap.cps.ncmp.events.lcm.v1.Event
-import org.onap.cps.ncmp.events.lcm.v1.LcmEvent
-import org.onap.cps.ncmp.events.lcm.v1.LcmEventHeader
-import org.onap.cps.ncmp.events.lcm.v1.Values
-import org.onap.cps.utils.JsonObjectMapper
-import org.springframework.kafka.KafkaException
-import spock.lang.Specification
-
-class LcmEventsProducerSpec extends Specification {
-
-    def mockLcmEventsProducer = Mock(EventsProducer)
-    def mockJsonObjectMapper = Mock(JsonObjectMapper)
-    def meterRegistry = new SimpleMeterRegistry()
-
-    def objectUnderTest = new LcmEventsProducer(mockLcmEventsProducer, mockJsonObjectMapper, meterRegistry)
-
-    def 'Create and send lcm event where events are #scenario'() {
-        given: 'a cm handle id and Lcm Event'
-            def cmHandleId = 'test-cm-handle-id'
-            def eventId = UUID.randomUUID().toString()
-            def event = getEventWithCmHandleState(ADVISED, READY)
-            def lcmEvent = new LcmEvent(event: event, eventId: eventId, eventCorrelationId: cmHandleId)
-        and: 'we also have a lcm event header'
-            def lcmEventHeader = new LcmEventHeader(eventId: eventId, eventCorrelationId: cmHandleId)
-        and: 'notificationsEnabled is #notificationsEnabled and it will be true as default'
-            objectUnderTest.notificationsEnabled = notificationsEnabled
-        and: 'lcm event header is transformed to headers map'
-            mockJsonObjectMapper.convertToValueType(lcmEventHeader, Map.class) >> ['eventId': eventId, 'eventCorrelationId': cmHandleId]
-        when: 'service is called to send lcm event'
-            objectUnderTest.sendLcmEvent('test-cm-handle-id', lcmEvent, lcmEventHeader)
-        then: 'producer is called #expectedTimesMethodCalled times'
-            expectedTimesMethodCalled * mockLcmEventsProducer.sendLegacyEvent(_, cmHandleId, _, lcmEvent) >> {
-                args -> {
-                    def eventHeaders = (args[2] as Map<String,Object>)
-                    assert eventHeaders.containsKey('eventId')
-                    assert eventHeaders.containsKey('eventCorrelationId')
-                    assert eventHeaders.get('eventId') == eventId
-                    assert eventHeaders.get('eventCorrelationId') == cmHandleId
-                }
-            }
-        and: 'metrics are recorded with correct tags'
-            def timer = meterRegistry.find('cps.ncmp.lcm.events.send').timer()
-            if (notificationsEnabled) {
-                assert timer != null
-                assert timer.count() == expectedTimesMethodCalled
-                def tags = timer.getId().getTags()
-                assert tags.containsAll(Tag.of('oldCmHandleState', ADVISED.value()), Tag.of('newCmHandleState', READY.value()))
-            } else {
-                assert timer == null
-            }
-        where: 'the following values are used'
-            scenario   | notificationsEnabled || expectedTimesMethodCalled
-            'enabled'  | true                 || 1
-            'disabled' | false                || 0
-    }
-
-    def 'Unable to send message'(){
-        given: 'a cm handle id and Lcm Event and notification enabled'
-            def cmHandleId = 'test-cm-handle-id'
-            def eventId = UUID.randomUUID().toString()
-        and: 'event #event'
-            def lcmEvent = new LcmEvent(event: event, eventId: eventId, eventCorrelationId: cmHandleId)
-            def lcmEventHeader = new LcmEventHeader(eventId: eventId, eventCorrelationId: cmHandleId)
-            objectUnderTest.notificationsEnabled = true
-        when: 'producer set to throw an exception'
-            mockLcmEventsProducer.sendLegacyEvent(_, _, _, _) >> { throw new KafkaException('sending failed')}
-        and: 'an event is publised'
-            objectUnderTest.sendLcmEvent(cmHandleId, lcmEvent, lcmEventHeader)
-        then: 'the exception is just logged and not bubbled up'
-            noExceptionThrown()
-        and: 'metrics are recorded with error tags'
-            def timer = meterRegistry.find('cps.ncmp.lcm.events.send').timer()
-            assert timer != null
-            assert timer.count() == 1
-            def expectedTags = [Tag.of('oldCmHandleState', 'N/A'), Tag.of('newCmHandleState', 'N/A')]
-            def tags = timer.getId().getTags()
-            assert tags.containsAll(expectedTags)
-        where: 'the following values are used'
-            scenario                  | event
-            'without values'          | new Event()
-            'without cm handle state' | getEvent()
-    }
-
-    def getEvent() {
-        def event = new Event()
-        def values = new Values()
-        event.setOldValues(values)
-        event.setNewValues(values)
-        event
-    }
-
-    def getEventWithCmHandleState(oldCmHandleState, newCmHandleState) {
-        def event = new Event()
-        def advisedCmHandleStateValues = new Values()
-        advisedCmHandleStateValues.setCmHandleState(oldCmHandleState)
-        event.setOldValues(advisedCmHandleStateValues)
-        def readyCmHandleStateValues = new Values()
-        readyCmHandleStateValues.setCmHandleState(newCmHandleState)
-        event.setNewValues(readyCmHandleStateValues)
-        return event
-    }
-}
index 1ee936b..27519e6 100644 (file)
@@ -21,8 +21,7 @@
 package org.onap.cps.ncmp.utils.events
 
 import com.fasterxml.jackson.databind.ObjectMapper
-import io.cloudevents.CloudEvent
-import org.onap.cps.events.EventsProducer
+import org.onap.cps.events.EventProducer
 import org.onap.cps.ncmp.config.CpsApplicationContext
 import org.onap.cps.ncmp.events.avc.ncmp_to_client.Avc
 import org.onap.cps.ncmp.events.avc.ncmp_to_client.AvcEvent
@@ -32,8 +31,8 @@ import org.springframework.test.context.ContextConfiguration
 @ContextConfiguration(classes = [CpsApplicationContext, ObjectMapper, JsonObjectMapper])
 class InventoryEventProducerSpec extends MessagingBaseSpec {
 
-    def mockEventsProducer = Mock(EventsProducer)
-    def objectUnderTest = new InventoryEventProducer(mockEventsProducer)
+    def mockEventProducer = Mock(EventProducer)
+    def objectUnderTest = new InventoryEventProducer(mockEventProducer)
 
     def 'Send an attribute value change event'() {
         given: 'the event key'
@@ -47,7 +46,7 @@ class InventoryEventProducerSpec extends MessagingBaseSpec {
         when: 'an attribute value change event is sent'
             objectUnderTest.sendAvcEvent(someEventKey, someAttributeName, someOldAttributeValue, someNewAttributeValue)
         then: 'the cloud event producer is invoked with the correct data'
-            1 * mockEventsProducer.sendCloudEvent(_, someEventKey,
+            1 * mockEventProducer.sendCloudEvent(_, someEventKey,
                 cloudEvent -> {
                     def actualAvcs = CloudEventMapper.toTargetEvent(cloudEvent, AvcEvent.class).data.attributeValueChange
                     def expectedAvc = new Avc(attributeName: someAttributeName,
index 86a9305..5cd4f31 100644 (file)
@@ -43,7 +43,7 @@ import org.springframework.stereotype.Service;
 @RequiredArgsConstructor
 public class CpsDataUpdateEventsProducer {
 
-    private final EventsProducer eventsProducer;
+    private final EventProducer eventProducer;
 
     private final CpsNotificationService cpsNotificationService;
 
@@ -75,7 +75,7 @@ public class CpsDataUpdateEventsProducer {
             final CloudEvent cpsDataUpdatedEventAsCloudEvent =
                     CpsEvent.builder().type(CpsDataUpdatedEvent.class.getTypeName()).data(cpsDataUpdatedEvent)
                             .extensions(extensions).build().asCloudEvent();
-            eventsProducer.sendCloudEvent(topicName, updateEventId, cpsDataUpdatedEventAsCloudEvent);
+            eventProducer.sendCloudEvent(topicName, updateEventId, cpsDataUpdatedEventAsCloudEvent);
         } else {
             log.debug("State of Overall Notifications : {} and Cps Change Event Notifications : {}",
                     notificationsEnabled, cpsChangeEventNotificationsEnabled);
@@ -36,18 +36,18 @@ import org.springframework.stereotype.Service;
 import org.springframework.util.SerializationUtils;
 
 /**
- * EventsProducer to send events.
+ * EventProducer to send events.
  */
 
 @Slf4j
 @Service
 @RequiredArgsConstructor
-public class EventsProducer {
+public class EventProducer {
 
     /**
      * KafkaTemplate for legacy (non-cloud) events.
-     * Note: Cloud events should be used. This will be addressed as part of  <a
-     * href="https://lf-onap.atlassian.net/browse/CPS-1717">...</a>.
+     * Note: Cloud events should be used. This will be addressed as part of
+     * <a href="https://lf-onap.atlassian.net/browse/CPS-1717">...</a>.
      */
     @Qualifier("legacyEventKafkaTemplate")
     private final KafkaTemplate<String, LegacyEvent> legacyEventKafkaTemplate;
@@ -39,17 +39,17 @@ import static org.onap.cps.events.model.EventPayload.Action.REMOVE
 import static org.onap.cps.events.model.EventPayload.Action.REPLACE
 
 @ContextConfiguration(classes = [ObjectMapper, JsonObjectMapper])
-class CpsDataUpdateEventsProducerSpec extends Specification {
+class CpsDataUpdateEventProducerSpec extends Specification {
 
     static def CREATE_ACTION = CREATE.value()
     static def REPLACE_ACTION = REPLACE.value()
     static def REMOVE_ACTION = REMOVE.value()
 
-    def mockEventsProducer = Mock(EventsProducer)
+    def mockEventProducer = Mock(EventProducer)
     def objectMapper = new ObjectMapper();
     def mockCpsNotificationService = Mock(CpsNotificationService)
 
-    def objectUnderTest = new CpsDataUpdateEventsProducer(mockEventsProducer, mockCpsNotificationService)
+    def objectUnderTest = new CpsDataUpdateEventsProducer(mockEventProducer, mockCpsNotificationService)
 
     def setup() {
         mockCpsNotificationService.isNotificationEnabled('dataspace01', 'anchor01') >> true
@@ -66,7 +66,7 @@ class CpsDataUpdateEventsProducerSpec extends Specification {
         when: 'service is called to send data update event'
             objectUnderTest.sendCpsDataUpdateEvent(anchor, xpath, actionInRequest, OffsetDateTime.now())
         then: 'the event contains the required attributes'
-            1 * mockEventsProducer.sendCloudEvent('cps-core-event', 'dataspace01:anchor01', _) >> {
+            1 * mockEventProducer.sendCloudEvent('cps-core-event', 'dataspace01:anchor01', _) >> {
             args ->
                 {
                     def cpsDataUpdatedEvent = (args[2] as CloudEvent)
@@ -99,7 +99,7 @@ class CpsDataUpdateEventsProducerSpec extends Specification {
         when: 'service is called to send data event'
             objectUnderTest.sendCpsDataUpdateEvent(anchor, '/', CREATE_ACTION, null)
         then: 'the event is sent'
-            1 * mockEventsProducer.sendCloudEvent('cps-core-event', 'dataspace01:anchor01', _)
+            1 * mockEventProducer.sendCloudEvent('cps-core-event', 'dataspace01:anchor01', _)
     }
 
     def 'Enabling and disabling sending cps events.'() {
@@ -114,7 +114,7 @@ class CpsDataUpdateEventsProducerSpec extends Specification {
         when: 'service is called to send data event'
             objectUnderTest.sendCpsDataUpdateEvent(anchor, '/', CREATE_ACTION, null)
         then: 'the event is only sent when all related flags are true'
-            expectedCallsToProducer * mockEventsProducer.sendCloudEvent(*_)
+            expectedCallsToProducer * mockEventProducer.sendCloudEvent(*_)
         where: 'the following flags are used'
             notificationsEnabled | cpsChangeEventNotificationsEnabled | cpsNotificationServiceisNotificationEnabled  || expectedCallsToProducer
             false                | true                               | true                                         || 0
@@ -29,8 +29,6 @@ import org.apache.kafka.clients.producer.ProducerRecord
 import org.apache.kafka.clients.producer.RecordMetadata
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.header.Headers
-import org.apache.kafka.common.header.internals.RecordHeader
-import org.apache.kafka.common.header.internals.RecordHeaders
 import org.slf4j.LoggerFactory
 import org.springframework.kafka.core.KafkaTemplate
 import org.springframework.kafka.support.SendResult
@@ -39,7 +37,7 @@ import spock.lang.Specification
 
 import java.util.concurrent.CompletableFuture
 
-class EventsProducerSpec extends Specification {
+class EventProducerSpec extends Specification {
 
     def mockLegacyKafkaTemplate = Mock(KafkaTemplate)
     def mockCloudEventKafkaTemplate = Mock(KafkaTemplate)
@@ -47,17 +45,17 @@ class EventsProducerSpec extends Specification {
     def logger = Spy(ListAppender<ILoggingEvent>)
 
     void setup() {
-        def setupLogger = ((Logger) LoggerFactory.getLogger(EventsProducer.class))
+        def setupLogger = ((Logger) LoggerFactory.getLogger(EventProducer.class))
         setupLogger.setLevel(Level.DEBUG)
         setupLogger.addAppender(logger)
         logger.start()
     }
 
     void cleanup() {
-        ((Logger) LoggerFactory.getLogger(EventsProducer.class)).detachAndStopAllAppenders()
+        ((Logger) LoggerFactory.getLogger(EventProducer.class)).detachAndStopAllAppenders()
     }
 
-    def objectUnderTest = new EventsProducer(mockLegacyKafkaTemplate, mockCloudEventKafkaTemplate, mockCloudEventKafkaTemplateForEos)
+    def objectUnderTest = new EventProducer(mockLegacyKafkaTemplate, mockCloudEventKafkaTemplate, mockCloudEventKafkaTemplateForEos)
 
     def 'Send Cloud Event'() {
         given: 'a successfully sent event'
@@ -180,7 +178,7 @@ class EventsProducerSpec extends Specification {
                 getProducerRecord() >> Mock(ProducerRecord)
             }
             def runtimeException = new RuntimeException('some runtime exception')
-            def logOutcomeMethod = EventsProducer.getDeclaredMethod('logOutcome', String, SendResult, Throwable, boolean)
+            def logOutcomeMethod = EventProducer.getDeclaredMethod('logOutcome', String, SendResult, Throwable, boolean)
             logOutcomeMethod.accessible = true
         when: 'logging the outcome with throwKafkaException set to true'
             logOutcomeMethod.invoke(null, 'some-topic', sendResult, runtimeException, true)