Expand Integration test to verify LCM Event 29/142629/2
authorToineSiebelink <toine.siebelink@est.tech>
Tue, 2 Dec 2025 17:32:19 +0000 (17:32 +0000)
committerToineSiebelink <toine.siebelink@est.tech>
Wed, 3 Dec 2025 10:01:23 +0000 (10:01 +0000)
- Fixed small bug: EventId and EventTime in header and event should be identical
- Check LCM event details for common fields
- Check Header value and corresponding event fields (duplicated!)
- Check old and new value for create and update use cases
- Removed unessary public method

Issue-ID: CPS-3072
Change-Id: I4eaa44747e0ca4c631dea037a2f55a307e0ad1a4
Signed-off-by: ToineSiebelink <toine.siebelink@est.tech>
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/lcm/LcmEventsProducer.java
cps-service/src/main/java/org/onap/cps/events/EventsProducer.java
cps-service/src/test/groovy/org/onap/cps/events/EventsProducerSpec.groovy
integration-test/src/test/groovy/org/onap/cps/integration/base/CpsIntegrationSpecBase.groovy
integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/inventory/CmHandleCreateSpec.groovy

index 9e5a97d..333f067 100644 (file)
@@ -68,8 +68,9 @@ public class LcmEventsProducer {
      * @param lcmEventHeader Optional headers associated with the LCM event
      */
     public void sendLcmEvent(final String cmHandleId, final LcmEvent lcmEvent, final LcmEventHeader lcmEventHeader) {
-
         if (notificationsEnabled) {
+            lcmEventHeader.setEventId(lcmEvent.getEventId());
+            lcmEventHeader.setEventTime(lcmEvent.getEventTime());
             final Timer.Sample timerSample = Timer.start(meterRegistry);
             try {
                 @SuppressWarnings("unchecked")
index 61758a0..aea8406 100644 (file)
@@ -86,36 +86,27 @@ public class EventsProducer {
         handleLegacyEventCallback(topicName, eventFuture);
     }
 
+
     /**
-     * Legacy Event sender with headers. Schemas that implement LegacyEvent are eligible to use this method.
+     * Legacy Event sender with headers in a Map. Schemas that implement LegacyEvent are eligible to use this method.
      *
      * @param topicName    valid topic name
      * @param eventKey     message key
-     * @param eventHeaders event headers
-     * @param event        message payload
+     * @param headersAsMap map of legacyEvent headers
+     * @param legacyEvent  message payload
      */
-    public void sendLegacyEvent(final String topicName, final String eventKey, final Headers eventHeaders,
-            final LegacyEvent event) {
+    public void sendLegacyEvent(final String topicName,
+                                final String eventKey,
+                                final Map<String, Object> headersAsMap,
+                                final LegacyEvent legacyEvent) {
+        final Headers headers = convertToKafkaHeaders(headersAsMap);
         final ProducerRecord<String, LegacyEvent> producerRecord =
-                new ProducerRecord<>(topicName, null, eventKey, event, eventHeaders);
+            new ProducerRecord<>(topicName, null, eventKey, legacyEvent, headers);
         final CompletableFuture<SendResult<String, LegacyEvent>> eventFuture =
-                legacyEventKafkaTemplate.send(producerRecord);
+            legacyEventKafkaTemplate.send(producerRecord);
         handleLegacyEventCallback(topicName, eventFuture);
     }
 
-    /**
-     * Legacy Event sender with headers in a Map. Schemas that implement LegacyEvent are eligible to use this method.
-     *
-     * @param topicName    valid topic name
-     * @param eventKey     message key
-     * @param eventHeaders map of event headers
-     * @param event        message payload
-     */
-    public void sendLegacyEvent(final String topicName, final String eventKey, final Map<String, Object> eventHeaders,
-            final LegacyEvent event) {
-        sendLegacyEvent(topicName, eventKey, convertToKafkaHeaders(eventHeaders), event);
-    }
-
     /**
      * Generic CloudEvent sender ensuring Exactly Once Semantics behaviour.
      *
@@ -129,16 +120,15 @@ public class EventsProducer {
         eventFuture.whenComplete((result, e) -> logOutcome(topicName, result, e, true));
     }
 
-
     private void handleLegacyEventCallback(final String topicName,
             final CompletableFuture<SendResult<String, LegacyEvent>> eventFuture) {
         eventFuture.whenComplete((result, e) -> logOutcome(topicName, result, e, false));
     }
 
-    private Headers convertToKafkaHeaders(final Map<String, Object> eventMessageHeaders) {
-        final Headers eventHeaders = new RecordHeaders();
-        eventMessageHeaders.forEach((key, value) -> eventHeaders.add(key, SerializationUtils.serialize(value)));
-        return eventHeaders;
+    private Headers convertToKafkaHeaders(final Map<String, Object> headersAsMap) {
+        final Headers headers = new RecordHeaders();
+        headersAsMap.forEach((key, value) -> headers.add(key, SerializationUtils.serialize(value)));
+        return headers;
     }
 
     private static void logOutcome(final String topicName, final SendResult<String, ?> result, final Throwable e,
index 8c71fea..8cd1cbd 100644 (file)
@@ -150,25 +150,6 @@ class EventsProducerSpec extends Specification {
             assert verifyLoggingEvent(Level.DEBUG, 'Successfully sent event') == true
     }
 
-    def 'Send Legacy Event with Record Headers'() {
-        given: 'a successfully sent event'
-            def sampleEventHeaders = new RecordHeaders([new RecordHeader('k1', SerializationUtils.serialize('v1'))])
-            def sampleProducerRecord = new ProducerRecord('some-topic', null, 'some-key', 'some-value', sampleEventHeaders)
-            def eventFuture = CompletableFuture.completedFuture(
-                new SendResult(
-                    sampleProducerRecord,
-                    new RecordMetadata(new TopicPartition('some-topic', 0), 0, 0, 0, 0, 0)
-                )
-            )
-            def someLegacyEvent = Mock(LegacyEvent)
-        when: 'sending the legacy event'
-            objectUnderTest.sendLegacyEvent('some-topic', 'some-event-key', sampleEventHeaders, someLegacyEvent)
-        then: 'event is sent'
-            1 * mockLegacyKafkaTemplate.send(_) >> eventFuture
-        and: 'the correct debug message is logged'
-            assert verifyLoggingEvent(Level.DEBUG, 'Successfully sent event') == true
-    }
-
     def 'Handle Legacy Event Callback'() {
         given: 'an event is successfully sent'
             def eventFuture = CompletableFuture.completedFuture(
@@ -224,4 +205,4 @@ class EventsProducerSpec extends Specification {
         lastLoggingEvent.level == expectedLevel && lastLoggingEvent.formattedMessage.contains(expectedFormattedMessage)
     }
 
-}
\ No newline at end of file
+}
index 8084673..e71511a 100644 (file)
@@ -46,6 +46,7 @@ import org.onap.cps.ncmp.impl.inventory.ParameterizedCmHandleQueryService
 import org.onap.cps.ncmp.impl.inventory.sync.ModuleSyncService
 import org.onap.cps.ncmp.impl.inventory.sync.ModuleSyncWatchdog
 import org.onap.cps.ncmp.impl.utils.AlternateIdMatcher
+import org.onap.cps.ncmp.impl.utils.EventDateTimeFormatter
 import org.onap.cps.ncmp.rest.controller.NetworkCmProxyInventoryController
 import org.onap.cps.ri.repository.DataspaceRepository
 import org.onap.cps.ri.repository.SchemaSetRepository
@@ -68,6 +69,7 @@ import spock.lang.Specification
 
 import java.time.Duration
 import java.time.OffsetDateTime
+import java.time.ZonedDateTime
 import java.util.concurrent.BlockingQueue
 
 @SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.MOCK, classes = [CpsDataspaceService])
@@ -343,13 +345,15 @@ abstract class CpsIntegrationSpecBase extends Specification {
     def getLatestConsumerRecordsWithMaxPollOf1Second(kafkaConsumer, numberOfRecordsToRead) {
         def consumerRecords = []
         def retryAttempts = 10
-        while (consumerRecords.size() < numberOfRecordsToRead) {
-            retryAttempts--
+        while (consumerRecords.size() < numberOfRecordsToRead && retryAttempts-- > 0) {
             consumerRecords.addAll(kafkaConsumer.poll(Duration.ofMillis(100)))
-            if (retryAttempts == 0)
-                break
         }
         return consumerRecords
     }
 
+    def timestampIsVeryRecent(eventTime) {
+        def eventTimeAsOffsetDateTime = EventDateTimeFormatter.toIsoOffsetDateTime(eventTime)
+        Duration.between(eventTimeAsOffsetDateTime, ZonedDateTime.now()).seconds < 3
+    }
+
 }
index ca8ebb0..08c2160 100644 (file)
@@ -38,7 +38,7 @@ import java.time.Duration
 class CmHandleCreateSpec extends CpsIntegrationSpecBase {
 
     NetworkCmProxyInventoryFacadeImpl objectUnderTest
-    def uniqueId = 'ch-unique-id-for-create-test'
+    def uniqueId = 'my-new-cm-handle'
 
     KafkaConsumer<String, LegacyEvent> kafkaConsumer
 
@@ -56,72 +56,79 @@ class CmHandleCreateSpec extends CpsIntegrationSpecBase {
         given: 'DMI will return modules when requested'
             dmiDispatcher1.moduleNamesPerCmHandleId['ch-1'] = ['M1', 'M2']
             dmiDispatcher1.moduleNamesPerCmHandleId[uniqueId] = ['M1', 'M2']
-
         when: 'a CM-handle is registered for creation'
-            def cmHandleToCreate = new NcmpServiceCmHandle(cmHandleId: uniqueId, dataProducerIdentifier: 'my-data-producer-identifier')
+            def cmHandleToCreate = new NcmpServiceCmHandle(cmHandleId: uniqueId,
+                                                           alternateId: 'fdn1',
+                                                           moduleSetTag: 'tag1',
+                                                           dataProducerIdentifier: 'prod1',
+                                                           publicProperties: [color:'green'])
             def dmiPluginRegistration = new DmiPluginRegistration(dmiPlugin: DMI1_URL, createdCmHandles: [cmHandleToCreate])
             def dmiPluginRegistrationResponse = objectUnderTest.updateDmiRegistration(dmiPluginRegistration)
-
         then: 'registration gives successful response'
             assert dmiPluginRegistrationResponse.createdCmHandles == [CmHandleRegistrationResponse.createSuccessResponse(uniqueId)]
-
         and: 'CM-handle is initially in ADVISED state'
             assert CmHandleState.ADVISED == objectUnderTest.getCmHandleCompositeState(uniqueId).cmHandleState
-
         then: 'the module sync watchdog is triggered'
             moduleSyncWatchdog.moduleSyncAdvisedCmHandles()
-
         then: 'CM-handle goes to READY state after module sync'
             assert CmHandleState.READY == objectUnderTest.getCmHandleCompositeState(uniqueId).cmHandleState
-
         and: 'the CM-handle has expected modules'
             assert ['M1', 'M2'] == objectUnderTest.getYangResourcesModuleReferences(uniqueId).moduleName.sort()
-
-        then: 'get the latest messages'
+        then: 'get the last 2 messages and related headers'
             def consumerRecords = getLatestConsumerRecordsWithMaxPollOf1Second(kafkaConsumer, 2)
-
-        and: 'both converted messages are for the correct cm handle'
-            def notificationMessages = []
-            for (def consumerRecord : consumerRecords) {
-                notificationMessages.add(jsonObjectMapper.convertJsonString(consumerRecord.value().toString(), LcmEvent))
+            def messages = []
+            def headerMaps = []
+            consumerRecords.each { consumerRecord ->
+                messages.add(jsonObjectMapper.convertJsonString(consumerRecord.value().toString(), LcmEvent))
+                headerMaps.add(getHeadersAsMap(consumerRecord))
             }
-            assert notificationMessages.event.cmHandleId == [ uniqueId, uniqueId ]
-
-        and: 'the oldest event is about the update to ADVISED state and it has a data producer id'
-            assert notificationMessages[0].event.newValues.cmHandleState.value() == 'ADVISED'
-            assert notificationMessages[0].event.dataProducerIdentifier == 'my-data-producer-identifier'
-
-        and: 'the next event is about update to READY state and it has the same data producer identifier as in in ADVISED state'
-            assert notificationMessages[1].event.newValues.cmHandleState.value() == 'READY'
-            assert notificationMessages[1].event.dataProducerIdentifier == 'my-data-producer-identifier'
-
+        and: 'both messages have the correct common attributes (that did not change)'
+            assert messages.event.every {
+                it.cmHandleId == uniqueId &&
+                it.alternateId == 'fdn1' &&
+                it.moduleSetTag == 'tag1' &&
+                it.dataProducerIdentifier == 'prod1'
+            }
+        and: 'the header fields are populated correctly and stored as kafka headers too'
+            validateEventHeaders(messages[0], headerMaps[0], 'create')
+            validateEventHeaders(messages[1], headerMaps[1], 'update')
+        and: 'the first lcm event has no old values and the initial attributes as new values state ADVISED'
+            with(messages[0].event) {
+                assert oldValues == null
+                assert newValues.cmHandleState.value() == 'ADVISED'
+                assert newValues.dataSyncEnabled == null
+                assert newValues.cmHandleProperties[0] == [color:'green']
+            }
+        and: 'the next event is about update to READY state (new value), the old value for state is ADVISED'
+            assert messages[1].event.oldValues.cmHandleState.value() == 'ADVISED'
+            assert messages[1].event.newValues.cmHandleState.value() == 'READY'
+        and: 'the cm handle (public) properties have not changed and are therefore null for old and new values'
+            assert messages[1].event.oldValues.cmHandleProperties == null
+            assert messages[1].event.newValues.cmHandleProperties == null
+        and: 'the data sync flag goes from undefined to false'
+            assert messages[1].event.oldValues.dataSyncEnabled == null
+            assert messages[1].event.newValues.dataSyncEnabled == false
         and: 'there are no more messages to be read'
             assert getLatestConsumerRecordsWithMaxPollOf1Second(kafkaConsumer, 1).size() == 0
-
         cleanup: 'deregister CM handle'
             deregisterCmHandle(DMI1_URL, uniqueId)
     }
 
-    def 'CM Handle goes to LOCKED state when DMI gives error during module sync.'() {
+    def 'CM Handle registration with DMI error during module sync.'() {
         given: 'DMI is not available to handle requests'
             dmiDispatcher1.isAvailable = false
-
         when: 'a CM-handle is registered for creation'
             def cmHandleToCreate = new NcmpServiceCmHandle(cmHandleId: 'ch-1')
             def dmiPluginRegistration = new DmiPluginRegistration(dmiPlugin: DMI1_URL, createdCmHandles: [cmHandleToCreate])
             objectUnderTest.updateDmiRegistration(dmiPluginRegistration)
-
         and: 'the module sync watchdog is triggered'
             moduleSyncWatchdog.moduleSyncAdvisedCmHandles()
-
         then: 'CM-handle goes to LOCKED state with reason MODULE_SYNC_FAILED'
             def cmHandleCompositeState = objectUnderTest.getCmHandleCompositeState('ch-1')
             assert cmHandleCompositeState.cmHandleState == CmHandleState.LOCKED
             assert cmHandleCompositeState.lockReason.lockReasonCategory == LockReasonCategory.MODULE_SYNC_FAILED
-
         and: 'CM-handle has no modules'
             assert objectUnderTest.getYangResourcesModuleReferences('ch-1').empty
-
         cleanup: 'deregister CM handle'
             deregisterCmHandle(DMI1_URL, 'ch-1')
     }
@@ -132,23 +139,17 @@ class CmHandleCreateSpec extends CpsIntegrationSpecBase {
         and: 'existing CM-handles cm-1 with moduleSetTag "A", and cm-2 with moduleSetTag "B"'
             registerCmHandle(DMI1_URL, 'ch-1', 'A')
             registerCmHandle(DMI1_URL, 'ch-2', 'B')
-
         when: 'a CM-handle is registered for creation with moduleSetTag "B"'
             def cmHandleToCreate = new NcmpServiceCmHandle(cmHandleId: 'ch-3', moduleSetTag: 'B')
             objectUnderTest.updateDmiRegistration(new DmiPluginRegistration(dmiPlugin: DMI1_URL, createdCmHandles: [cmHandleToCreate]))
-
         and: 'the module sync watchdog is triggered'
             moduleSyncWatchdog.moduleSyncAdvisedCmHandles()
-
         then: 'the CM-handle goes to READY state'
             assert CmHandleState.READY == objectUnderTest.getCmHandleCompositeState('ch-3').cmHandleState
-
         and: 'the CM-handle has expected moduleSetTag'
             assert objectUnderTest.getNcmpServiceCmHandle('ch-3').moduleSetTag == 'B'
-
         and: 'the CM-handle has expected modules from module set "B": M1 and M3'
             assert ['M1', 'M3'] == objectUnderTest.getYangResourcesModuleReferences('ch-3').moduleName.sort()
-
         cleanup: 'deregister CM handles'
             deregisterCmHandles(DMI1_URL, ['ch-1', 'ch-2', 'ch-3'])
     }
@@ -160,7 +161,6 @@ class CmHandleCreateSpec extends CpsIntegrationSpecBase {
             registerCmHandle(DMI1_URL, 'ch-1', NO_MODULE_SET_TAG, 'existing-alt-id')
         and: 'an existing CM-handle with no alternate ID'
             registerCmHandle(DMI1_URL, 'ch-2', NO_MODULE_SET_TAG, NO_ALTERNATE_ID)
-
         when: 'a batch of CM-handles is registered for creation with various alternate IDs'
             def cmHandlesToCreate = [
                 new NcmpServiceCmHandle(cmHandleId: 'ch-3', alternateId: NO_ALTERNATE_ID),
@@ -171,7 +171,6 @@ class CmHandleCreateSpec extends CpsIntegrationSpecBase {
             ]
             def dmiPluginRegistration = new DmiPluginRegistration(dmiPlugin: DMI1_URL, createdCmHandles: cmHandlesToCreate)
             def dmiPluginRegistrationResponse = objectUnderTest.updateDmiRegistration(dmiPluginRegistration)
-
         then: 'registration gives expected responses'
             assert dmiPluginRegistrationResponse.createdCmHandles.sort { it.cmHandle } == [
                 CmHandleRegistrationResponse.createSuccessResponse('ch-3'),
@@ -180,7 +179,6 @@ class CmHandleCreateSpec extends CpsIntegrationSpecBase {
                 CmHandleRegistrationResponse.createSuccessResponse('ch-6'),
                 CmHandleRegistrationResponse.createFailureResponse('ch-7', NcmpResponseStatus.CM_HANDLE_ALREADY_EXIST),
             ]
-
         cleanup: 'deregister CM handles'
             deregisterCmHandles(DMI1_URL, (1..7).collect{ 'ch-'+it })
     }
@@ -188,35 +186,27 @@ class CmHandleCreateSpec extends CpsIntegrationSpecBase {
     def 'CM Handle retry after failed module sync.'() {
         given: 'DMI is not initially available to handle requests'
             dmiDispatcher1.isAvailable = false
-
         when: 'CM-handles are registered for creation'
             def cmHandlesToCreate = [new NcmpServiceCmHandle(cmHandleId: 'ch-1'), new NcmpServiceCmHandle(cmHandleId: 'ch-2')]
             def dmiPluginRegistration = new DmiPluginRegistration(dmiPlugin: DMI1_URL, createdCmHandles: cmHandlesToCreate)
             objectUnderTest.updateDmiRegistration(dmiPluginRegistration)
-
         and: 'the module sync watchdog is triggered'
             moduleSyncWatchdog.moduleSyncAdvisedCmHandles()
-
         then: 'CM-handles go to LOCKED state'
             assert objectUnderTest.getCmHandleCompositeState('ch-1').cmHandleState == CmHandleState.LOCKED
-
         when: 'DMI is available for retry'
             dmiDispatcher1.moduleNamesPerCmHandleId = ['ch-1': ['M1', 'M2'], 'ch-2': ['M1', 'M2']]
             dmiDispatcher1.isAvailable = true
-
         and: 'the module sync watchdog is triggered TWICE'
             2.times { moduleSyncWatchdog.moduleSyncAdvisedCmHandles() }
-
         then: 'Both CM-handles go to READY state'
             ['ch-1', 'ch-2'].each { cmHandleId ->
                 assert objectUnderTest.getCmHandleCompositeState(cmHandleId).cmHandleState == CmHandleState.READY
             }
-
         and: 'Both CM-handles have expected modules'
             ['ch-1', 'ch-2'].each { cmHandleId ->
                 assert objectUnderTest.getYangResourcesModuleReferences(cmHandleId).moduleName.sort() == ['M1', 'M2']
             }
-
         cleanup: 'deregister CM handles'
             deregisterCmHandles(DMI1_URL, ['ch-1', 'ch-2'])
     }
@@ -227,4 +217,33 @@ class CmHandleCreateSpec extends CpsIntegrationSpecBase {
         kafkaConsumer.poll(Duration.ofMillis(500))
     }
 
+    def getHeadersAsMap(consumerRecord) {
+        def headersAsMap = [:]
+        consumerRecord.headers().each { header ->
+            def value = (new String((byte[]) header.value())).substring(7) // The raw header is prefixed with encoded type
+            headersAsMap.put(header.key(), value)
+        }
+        return headersAsMap
+    }
+
+    def validateEventHeaders(message, headerAsMap, expectedEventType) {
+        with(message) {
+            assert UUID.fromString(eventId) != null
+            assert headerAsMap.get('eventId') == eventId
+            assert eventCorrelationId == uniqueId
+            assert headerAsMap.get('eventCorrelationId') == eventCorrelationId
+            assert timestampIsVeryRecent(eventTime)
+            assert headerAsMap.get('eventTime') == eventTime
+            assert eventSource == 'org.onap.ncmp'
+            assert headerAsMap.get('eventSource') == eventSource
+            assert eventType == 'org.onap.ncmp.cmhandle-lcm-event.'+expectedEventType
+            assert headerAsMap.get('eventType') == eventType
+            assert eventSchema == 'org.onap.ncmp:cmhandle-lcm-event'
+            assert headerAsMap.get('eventSchema') == eventSchema
+            assert eventSchemaVersion == '1.0'
+            assert headerAsMap.get('eventSchemaVersion') == eventSchemaVersion
+        }
+        return true
+    }
+
 }