From 38a6ff81ade39e180b8e5fb0c7ef1a522dbc4d01 Mon Sep 17 00:00:00 2001 From: ToineSiebelink Date: Tue, 2 Dec 2025 17:32:19 +0000 Subject: [PATCH] Expand Integration test to verify LCM Event - Fixed small bug: EventId and EventTime in header and event should be identical - Check LCM event details for common fields - Check Header value and corresponding event fields (duplicated!) - Check old and new value for create and update use cases - Removed unessary public method Issue-ID: CPS-3072 Change-Id: I4eaa44747e0ca4c631dea037a2f55a307e0ad1a4 Signed-off-by: ToineSiebelink --- .../impl/inventory/sync/lcm/LcmEventsProducer.java | 3 +- .../java/org/onap/cps/events/EventsProducer.java | 40 +++---- .../org/onap/cps/events/EventsProducerSpec.groovy | 21 +--- .../integration/base/CpsIntegrationSpecBase.groovy | 12 ++- .../ncmp/inventory/CmHandleCreateSpec.groovy | 117 ++++++++++++--------- 5 files changed, 94 insertions(+), 99 deletions(-) diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/lcm/LcmEventsProducer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/lcm/LcmEventsProducer.java index 9e5a97d4c0..333f0674e0 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/lcm/LcmEventsProducer.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/lcm/LcmEventsProducer.java @@ -68,8 +68,9 @@ public class LcmEventsProducer { * @param lcmEventHeader Optional headers associated with the LCM event */ public void sendLcmEvent(final String cmHandleId, final LcmEvent lcmEvent, final LcmEventHeader lcmEventHeader) { - if (notificationsEnabled) { + lcmEventHeader.setEventId(lcmEvent.getEventId()); + lcmEventHeader.setEventTime(lcmEvent.getEventTime()); final Timer.Sample timerSample = Timer.start(meterRegistry); try { @SuppressWarnings("unchecked") diff --git a/cps-service/src/main/java/org/onap/cps/events/EventsProducer.java b/cps-service/src/main/java/org/onap/cps/events/EventsProducer.java index 61758a08a4..aea8406857 100644 --- a/cps-service/src/main/java/org/onap/cps/events/EventsProducer.java +++ b/cps-service/src/main/java/org/onap/cps/events/EventsProducer.java @@ -86,36 +86,27 @@ public class EventsProducer { handleLegacyEventCallback(topicName, eventFuture); } + /** - * Legacy Event sender with headers. Schemas that implement LegacyEvent are eligible to use this method. + * Legacy Event sender with headers in a Map. Schemas that implement LegacyEvent are eligible to use this method. * * @param topicName valid topic name * @param eventKey message key - * @param eventHeaders event headers - * @param event message payload + * @param headersAsMap map of legacyEvent headers + * @param legacyEvent message payload */ - public void sendLegacyEvent(final String topicName, final String eventKey, final Headers eventHeaders, - final LegacyEvent event) { + public void sendLegacyEvent(final String topicName, + final String eventKey, + final Map headersAsMap, + final LegacyEvent legacyEvent) { + final Headers headers = convertToKafkaHeaders(headersAsMap); final ProducerRecord producerRecord = - new ProducerRecord<>(topicName, null, eventKey, event, eventHeaders); + new ProducerRecord<>(topicName, null, eventKey, legacyEvent, headers); final CompletableFuture> eventFuture = - legacyEventKafkaTemplate.send(producerRecord); + legacyEventKafkaTemplate.send(producerRecord); handleLegacyEventCallback(topicName, eventFuture); } - /** - * Legacy Event sender with headers in a Map. Schemas that implement LegacyEvent are eligible to use this method. - * - * @param topicName valid topic name - * @param eventKey message key - * @param eventHeaders map of event headers - * @param event message payload - */ - public void sendLegacyEvent(final String topicName, final String eventKey, final Map eventHeaders, - final LegacyEvent event) { - sendLegacyEvent(topicName, eventKey, convertToKafkaHeaders(eventHeaders), event); - } - /** * Generic CloudEvent sender ensuring Exactly Once Semantics behaviour. * @@ -129,16 +120,15 @@ public class EventsProducer { eventFuture.whenComplete((result, e) -> logOutcome(topicName, result, e, true)); } - private void handleLegacyEventCallback(final String topicName, final CompletableFuture> eventFuture) { eventFuture.whenComplete((result, e) -> logOutcome(topicName, result, e, false)); } - private Headers convertToKafkaHeaders(final Map eventMessageHeaders) { - final Headers eventHeaders = new RecordHeaders(); - eventMessageHeaders.forEach((key, value) -> eventHeaders.add(key, SerializationUtils.serialize(value))); - return eventHeaders; + private Headers convertToKafkaHeaders(final Map headersAsMap) { + final Headers headers = new RecordHeaders(); + headersAsMap.forEach((key, value) -> headers.add(key, SerializationUtils.serialize(value))); + return headers; } private static void logOutcome(final String topicName, final SendResult result, final Throwable e, diff --git a/cps-service/src/test/groovy/org/onap/cps/events/EventsProducerSpec.groovy b/cps-service/src/test/groovy/org/onap/cps/events/EventsProducerSpec.groovy index 8c71fea492..8cd1cbd782 100644 --- a/cps-service/src/test/groovy/org/onap/cps/events/EventsProducerSpec.groovy +++ b/cps-service/src/test/groovy/org/onap/cps/events/EventsProducerSpec.groovy @@ -150,25 +150,6 @@ class EventsProducerSpec extends Specification { assert verifyLoggingEvent(Level.DEBUG, 'Successfully sent event') == true } - def 'Send Legacy Event with Record Headers'() { - given: 'a successfully sent event' - def sampleEventHeaders = new RecordHeaders([new RecordHeader('k1', SerializationUtils.serialize('v1'))]) - def sampleProducerRecord = new ProducerRecord('some-topic', null, 'some-key', 'some-value', sampleEventHeaders) - def eventFuture = CompletableFuture.completedFuture( - new SendResult( - sampleProducerRecord, - new RecordMetadata(new TopicPartition('some-topic', 0), 0, 0, 0, 0, 0) - ) - ) - def someLegacyEvent = Mock(LegacyEvent) - when: 'sending the legacy event' - objectUnderTest.sendLegacyEvent('some-topic', 'some-event-key', sampleEventHeaders, someLegacyEvent) - then: 'event is sent' - 1 * mockLegacyKafkaTemplate.send(_) >> eventFuture - and: 'the correct debug message is logged' - assert verifyLoggingEvent(Level.DEBUG, 'Successfully sent event') == true - } - def 'Handle Legacy Event Callback'() { given: 'an event is successfully sent' def eventFuture = CompletableFuture.completedFuture( @@ -224,4 +205,4 @@ class EventsProducerSpec extends Specification { lastLoggingEvent.level == expectedLevel && lastLoggingEvent.formattedMessage.contains(expectedFormattedMessage) } -} \ No newline at end of file +} diff --git a/integration-test/src/test/groovy/org/onap/cps/integration/base/CpsIntegrationSpecBase.groovy b/integration-test/src/test/groovy/org/onap/cps/integration/base/CpsIntegrationSpecBase.groovy index 8084673e30..e71511a5c9 100644 --- a/integration-test/src/test/groovy/org/onap/cps/integration/base/CpsIntegrationSpecBase.groovy +++ b/integration-test/src/test/groovy/org/onap/cps/integration/base/CpsIntegrationSpecBase.groovy @@ -46,6 +46,7 @@ import org.onap.cps.ncmp.impl.inventory.ParameterizedCmHandleQueryService import org.onap.cps.ncmp.impl.inventory.sync.ModuleSyncService import org.onap.cps.ncmp.impl.inventory.sync.ModuleSyncWatchdog import org.onap.cps.ncmp.impl.utils.AlternateIdMatcher +import org.onap.cps.ncmp.impl.utils.EventDateTimeFormatter import org.onap.cps.ncmp.rest.controller.NetworkCmProxyInventoryController import org.onap.cps.ri.repository.DataspaceRepository import org.onap.cps.ri.repository.SchemaSetRepository @@ -68,6 +69,7 @@ import spock.lang.Specification import java.time.Duration import java.time.OffsetDateTime +import java.time.ZonedDateTime import java.util.concurrent.BlockingQueue @SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.MOCK, classes = [CpsDataspaceService]) @@ -343,13 +345,15 @@ abstract class CpsIntegrationSpecBase extends Specification { def getLatestConsumerRecordsWithMaxPollOf1Second(kafkaConsumer, numberOfRecordsToRead) { def consumerRecords = [] def retryAttempts = 10 - while (consumerRecords.size() < numberOfRecordsToRead) { - retryAttempts-- + while (consumerRecords.size() < numberOfRecordsToRead && retryAttempts-- > 0) { consumerRecords.addAll(kafkaConsumer.poll(Duration.ofMillis(100))) - if (retryAttempts == 0) - break } return consumerRecords } + def timestampIsVeryRecent(eventTime) { + def eventTimeAsOffsetDateTime = EventDateTimeFormatter.toIsoOffsetDateTime(eventTime) + Duration.between(eventTimeAsOffsetDateTime, ZonedDateTime.now()).seconds < 3 + } + } diff --git a/integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/inventory/CmHandleCreateSpec.groovy b/integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/inventory/CmHandleCreateSpec.groovy index ca8ebb0800..08c2160371 100644 --- a/integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/inventory/CmHandleCreateSpec.groovy +++ b/integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/inventory/CmHandleCreateSpec.groovy @@ -38,7 +38,7 @@ import java.time.Duration class CmHandleCreateSpec extends CpsIntegrationSpecBase { NetworkCmProxyInventoryFacadeImpl objectUnderTest - def uniqueId = 'ch-unique-id-for-create-test' + def uniqueId = 'my-new-cm-handle' KafkaConsumer kafkaConsumer @@ -56,72 +56,79 @@ class CmHandleCreateSpec extends CpsIntegrationSpecBase { given: 'DMI will return modules when requested' dmiDispatcher1.moduleNamesPerCmHandleId['ch-1'] = ['M1', 'M2'] dmiDispatcher1.moduleNamesPerCmHandleId[uniqueId] = ['M1', 'M2'] - when: 'a CM-handle is registered for creation' - def cmHandleToCreate = new NcmpServiceCmHandle(cmHandleId: uniqueId, dataProducerIdentifier: 'my-data-producer-identifier') + def cmHandleToCreate = new NcmpServiceCmHandle(cmHandleId: uniqueId, + alternateId: 'fdn1', + moduleSetTag: 'tag1', + dataProducerIdentifier: 'prod1', + publicProperties: [color:'green']) def dmiPluginRegistration = new DmiPluginRegistration(dmiPlugin: DMI1_URL, createdCmHandles: [cmHandleToCreate]) def dmiPluginRegistrationResponse = objectUnderTest.updateDmiRegistration(dmiPluginRegistration) - then: 'registration gives successful response' assert dmiPluginRegistrationResponse.createdCmHandles == [CmHandleRegistrationResponse.createSuccessResponse(uniqueId)] - and: 'CM-handle is initially in ADVISED state' assert CmHandleState.ADVISED == objectUnderTest.getCmHandleCompositeState(uniqueId).cmHandleState - then: 'the module sync watchdog is triggered' moduleSyncWatchdog.moduleSyncAdvisedCmHandles() - then: 'CM-handle goes to READY state after module sync' assert CmHandleState.READY == objectUnderTest.getCmHandleCompositeState(uniqueId).cmHandleState - and: 'the CM-handle has expected modules' assert ['M1', 'M2'] == objectUnderTest.getYangResourcesModuleReferences(uniqueId).moduleName.sort() - - then: 'get the latest messages' + then: 'get the last 2 messages and related headers' def consumerRecords = getLatestConsumerRecordsWithMaxPollOf1Second(kafkaConsumer, 2) - - and: 'both converted messages are for the correct cm handle' - def notificationMessages = [] - for (def consumerRecord : consumerRecords) { - notificationMessages.add(jsonObjectMapper.convertJsonString(consumerRecord.value().toString(), LcmEvent)) + def messages = [] + def headerMaps = [] + consumerRecords.each { consumerRecord -> + messages.add(jsonObjectMapper.convertJsonString(consumerRecord.value().toString(), LcmEvent)) + headerMaps.add(getHeadersAsMap(consumerRecord)) } - assert notificationMessages.event.cmHandleId == [ uniqueId, uniqueId ] - - and: 'the oldest event is about the update to ADVISED state and it has a data producer id' - assert notificationMessages[0].event.newValues.cmHandleState.value() == 'ADVISED' - assert notificationMessages[0].event.dataProducerIdentifier == 'my-data-producer-identifier' - - and: 'the next event is about update to READY state and it has the same data producer identifier as in in ADVISED state' - assert notificationMessages[1].event.newValues.cmHandleState.value() == 'READY' - assert notificationMessages[1].event.dataProducerIdentifier == 'my-data-producer-identifier' - + and: 'both messages have the correct common attributes (that did not change)' + assert messages.event.every { + it.cmHandleId == uniqueId && + it.alternateId == 'fdn1' && + it.moduleSetTag == 'tag1' && + it.dataProducerIdentifier == 'prod1' + } + and: 'the header fields are populated correctly and stored as kafka headers too' + validateEventHeaders(messages[0], headerMaps[0], 'create') + validateEventHeaders(messages[1], headerMaps[1], 'update') + and: 'the first lcm event has no old values and the initial attributes as new values state ADVISED' + with(messages[0].event) { + assert oldValues == null + assert newValues.cmHandleState.value() == 'ADVISED' + assert newValues.dataSyncEnabled == null + assert newValues.cmHandleProperties[0] == [color:'green'] + } + and: 'the next event is about update to READY state (new value), the old value for state is ADVISED' + assert messages[1].event.oldValues.cmHandleState.value() == 'ADVISED' + assert messages[1].event.newValues.cmHandleState.value() == 'READY' + and: 'the cm handle (public) properties have not changed and are therefore null for old and new values' + assert messages[1].event.oldValues.cmHandleProperties == null + assert messages[1].event.newValues.cmHandleProperties == null + and: 'the data sync flag goes from undefined to false' + assert messages[1].event.oldValues.dataSyncEnabled == null + assert messages[1].event.newValues.dataSyncEnabled == false and: 'there are no more messages to be read' assert getLatestConsumerRecordsWithMaxPollOf1Second(kafkaConsumer, 1).size() == 0 - cleanup: 'deregister CM handle' deregisterCmHandle(DMI1_URL, uniqueId) } - def 'CM Handle goes to LOCKED state when DMI gives error during module sync.'() { + def 'CM Handle registration with DMI error during module sync.'() { given: 'DMI is not available to handle requests' dmiDispatcher1.isAvailable = false - when: 'a CM-handle is registered for creation' def cmHandleToCreate = new NcmpServiceCmHandle(cmHandleId: 'ch-1') def dmiPluginRegistration = new DmiPluginRegistration(dmiPlugin: DMI1_URL, createdCmHandles: [cmHandleToCreate]) objectUnderTest.updateDmiRegistration(dmiPluginRegistration) - and: 'the module sync watchdog is triggered' moduleSyncWatchdog.moduleSyncAdvisedCmHandles() - then: 'CM-handle goes to LOCKED state with reason MODULE_SYNC_FAILED' def cmHandleCompositeState = objectUnderTest.getCmHandleCompositeState('ch-1') assert cmHandleCompositeState.cmHandleState == CmHandleState.LOCKED assert cmHandleCompositeState.lockReason.lockReasonCategory == LockReasonCategory.MODULE_SYNC_FAILED - and: 'CM-handle has no modules' assert objectUnderTest.getYangResourcesModuleReferences('ch-1').empty - cleanup: 'deregister CM handle' deregisterCmHandle(DMI1_URL, 'ch-1') } @@ -132,23 +139,17 @@ class CmHandleCreateSpec extends CpsIntegrationSpecBase { and: 'existing CM-handles cm-1 with moduleSetTag "A", and cm-2 with moduleSetTag "B"' registerCmHandle(DMI1_URL, 'ch-1', 'A') registerCmHandle(DMI1_URL, 'ch-2', 'B') - when: 'a CM-handle is registered for creation with moduleSetTag "B"' def cmHandleToCreate = new NcmpServiceCmHandle(cmHandleId: 'ch-3', moduleSetTag: 'B') objectUnderTest.updateDmiRegistration(new DmiPluginRegistration(dmiPlugin: DMI1_URL, createdCmHandles: [cmHandleToCreate])) - and: 'the module sync watchdog is triggered' moduleSyncWatchdog.moduleSyncAdvisedCmHandles() - then: 'the CM-handle goes to READY state' assert CmHandleState.READY == objectUnderTest.getCmHandleCompositeState('ch-3').cmHandleState - and: 'the CM-handle has expected moduleSetTag' assert objectUnderTest.getNcmpServiceCmHandle('ch-3').moduleSetTag == 'B' - and: 'the CM-handle has expected modules from module set "B": M1 and M3' assert ['M1', 'M3'] == objectUnderTest.getYangResourcesModuleReferences('ch-3').moduleName.sort() - cleanup: 'deregister CM handles' deregisterCmHandles(DMI1_URL, ['ch-1', 'ch-2', 'ch-3']) } @@ -160,7 +161,6 @@ class CmHandleCreateSpec extends CpsIntegrationSpecBase { registerCmHandle(DMI1_URL, 'ch-1', NO_MODULE_SET_TAG, 'existing-alt-id') and: 'an existing CM-handle with no alternate ID' registerCmHandle(DMI1_URL, 'ch-2', NO_MODULE_SET_TAG, NO_ALTERNATE_ID) - when: 'a batch of CM-handles is registered for creation with various alternate IDs' def cmHandlesToCreate = [ new NcmpServiceCmHandle(cmHandleId: 'ch-3', alternateId: NO_ALTERNATE_ID), @@ -171,7 +171,6 @@ class CmHandleCreateSpec extends CpsIntegrationSpecBase { ] def dmiPluginRegistration = new DmiPluginRegistration(dmiPlugin: DMI1_URL, createdCmHandles: cmHandlesToCreate) def dmiPluginRegistrationResponse = objectUnderTest.updateDmiRegistration(dmiPluginRegistration) - then: 'registration gives expected responses' assert dmiPluginRegistrationResponse.createdCmHandles.sort { it.cmHandle } == [ CmHandleRegistrationResponse.createSuccessResponse('ch-3'), @@ -180,7 +179,6 @@ class CmHandleCreateSpec extends CpsIntegrationSpecBase { CmHandleRegistrationResponse.createSuccessResponse('ch-6'), CmHandleRegistrationResponse.createFailureResponse('ch-7', NcmpResponseStatus.CM_HANDLE_ALREADY_EXIST), ] - cleanup: 'deregister CM handles' deregisterCmHandles(DMI1_URL, (1..7).collect{ 'ch-'+it }) } @@ -188,35 +186,27 @@ class CmHandleCreateSpec extends CpsIntegrationSpecBase { def 'CM Handle retry after failed module sync.'() { given: 'DMI is not initially available to handle requests' dmiDispatcher1.isAvailable = false - when: 'CM-handles are registered for creation' def cmHandlesToCreate = [new NcmpServiceCmHandle(cmHandleId: 'ch-1'), new NcmpServiceCmHandle(cmHandleId: 'ch-2')] def dmiPluginRegistration = new DmiPluginRegistration(dmiPlugin: DMI1_URL, createdCmHandles: cmHandlesToCreate) objectUnderTest.updateDmiRegistration(dmiPluginRegistration) - and: 'the module sync watchdog is triggered' moduleSyncWatchdog.moduleSyncAdvisedCmHandles() - then: 'CM-handles go to LOCKED state' assert objectUnderTest.getCmHandleCompositeState('ch-1').cmHandleState == CmHandleState.LOCKED - when: 'DMI is available for retry' dmiDispatcher1.moduleNamesPerCmHandleId = ['ch-1': ['M1', 'M2'], 'ch-2': ['M1', 'M2']] dmiDispatcher1.isAvailable = true - and: 'the module sync watchdog is triggered TWICE' 2.times { moduleSyncWatchdog.moduleSyncAdvisedCmHandles() } - then: 'Both CM-handles go to READY state' ['ch-1', 'ch-2'].each { cmHandleId -> assert objectUnderTest.getCmHandleCompositeState(cmHandleId).cmHandleState == CmHandleState.READY } - and: 'Both CM-handles have expected modules' ['ch-1', 'ch-2'].each { cmHandleId -> assert objectUnderTest.getYangResourcesModuleReferences(cmHandleId).moduleName.sort() == ['M1', 'M2'] } - cleanup: 'deregister CM handles' deregisterCmHandles(DMI1_URL, ['ch-1', 'ch-2']) } @@ -227,4 +217,33 @@ class CmHandleCreateSpec extends CpsIntegrationSpecBase { kafkaConsumer.poll(Duration.ofMillis(500)) } + def getHeadersAsMap(consumerRecord) { + def headersAsMap = [:] + consumerRecord.headers().each { header -> + def value = (new String((byte[]) header.value())).substring(7) // The raw header is prefixed with encoded type + headersAsMap.put(header.key(), value) + } + return headersAsMap + } + + def validateEventHeaders(message, headerAsMap, expectedEventType) { + with(message) { + assert UUID.fromString(eventId) != null + assert headerAsMap.get('eventId') == eventId + assert eventCorrelationId == uniqueId + assert headerAsMap.get('eventCorrelationId') == eventCorrelationId + assert timestampIsVeryRecent(eventTime) + assert headerAsMap.get('eventTime') == eventTime + assert eventSource == 'org.onap.ncmp' + assert headerAsMap.get('eventSource') == eventSource + assert eventType == 'org.onap.ncmp.cmhandle-lcm-event.'+expectedEventType + assert headerAsMap.get('eventType') == eventType + assert eventSchema == 'org.onap.ncmp:cmhandle-lcm-event' + assert headerAsMap.get('eventSchema') == eventSchema + assert eventSchemaVersion == '1.0' + assert headerAsMap.get('eventSchemaVersion') == eventSchemaVersion + } + return true + } + } -- 2.16.6