Replace sleep with polling in CmSubscriptionSpec 12/142412/3
authorshikha0203 <shivani.khare@est.tech>
Tue, 11 Nov 2025 16:59:37 +0000 (16:59 +0000)
committerShivani Khare <shivani.khare@est.tech>
Fri, 14 Nov 2025 09:59:45 +0000 (09:59 +0000)
Issue-ID: CPS-3037
Change-Id: If3da998b1f9a2d41ea774b2e323aa6a1aa51cd57
Signed-off-by: shikha0203 <shivani.khare@est.tech>
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/subscription/dmi/EventConsumer.java
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/subscription/ncmp/NcmpInEventConsumer.java
integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/datajobs/subscription/CmSubscriptionSpec.groovy

index 141c74a..aa351ad 100644 (file)
@@ -1,6 +1,6 @@
 
 /*
- * ============LICENSE_START=======================================================
+ *  ===========LICENSE_START========================================================
  *  Copyright (C) 2024-2025 OpenInfra Foundation Europe. All rights reserved.
  *  ================================================================================
  *  Licensed under the Apache License, Version 2.0 (the "License");
@@ -75,6 +75,8 @@ public class EventConsumer {
                 }
             }
         }
+        log.info("Finished processing DMI subscription response event with details: | correlationId={} | eventType={}",
+                correlationId, eventType);
     }
 
     private CmSubscriptionStatus getCmSubscriptionStatus(final DataJobSubscriptionDmiOutEvent dmiOutEvent) {
index cfa8ba6..4e38e5e 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * ============LICENSE_START=======================================================
+ *  ===========LICENSE_START========================================================
  * Copyright (c) 2024-2025 OpenInfra Foundation Europe. All rights reserved.
  *  ================================================================================
  *  Licensed under the Apache License, Version 2.0 (the "License");
@@ -63,7 +63,7 @@ public class NcmpInEventConsumer {
                 default -> log.warn("Unknown eventType={} for dataJobId={}", eventType, dataJobId);
             }
         } finally {
-            log.info("NCMP In Event has been Processed for dataJobId={}", dataJobId);
+            log.info("NCMP In Event with eventType={} has been Processed for dataJobId={}", eventType, dataJobId);
         }
     }
 
index d122c0e..263a245 100644 (file)
@@ -32,11 +32,13 @@ import java.time.Duration
 import org.apache.kafka.clients.producer.ProducerRecord
 import org.apache.kafka.common.serialization.StringSerializer
 import org.onap.cps.integration.base.CpsIntegrationSpecBase
+import org.onap.cps.ncmp.impl.datajobs.subscription.dmi.EventConsumer
 import org.onap.cps.ncmp.impl.datajobs.subscription.ncmp.NcmpInEventConsumer
 import org.onap.cps.ncmp.impl.datajobs.subscription.utils.CmDataJobSubscriptionPersistenceService
 import org.slf4j.LoggerFactory
 import org.springframework.beans.factory.annotation.Autowired
 import org.springframework.beans.factory.annotation.Value
+import spock.util.concurrent.PollingConditions
 
 class CmSubscriptionSpec extends CpsIntegrationSpecBase {
 
@@ -57,7 +59,8 @@ class CmSubscriptionSpec extends CpsIntegrationSpecBase {
     def testResponseProducer
 
     def listAppender = new ListAppender<ILoggingEvent>()
-    def logger = (Logger) LoggerFactory.getLogger(NcmpInEventConsumer)
+    def ncmpInEventLogger = (Logger) LoggerFactory.getLogger(NcmpInEventConsumer)
+    def dmiEventConsumerLogger = (Logger) LoggerFactory.getLogger(EventConsumer)
 
     def setup() {
         registerCmHandlesForSubscriptions()
@@ -68,11 +71,13 @@ class CmSubscriptionSpec extends CpsIntegrationSpecBase {
         testRequestProducer = kafkaTestContainer.createProducer('test-client-id', StringSerializer.class)
         testResponseProducer = kafkaTestContainer.createProducer('test-client-id', CloudEventSerializer.class)
         listAppender.start()
-        logger.addAppender(listAppender)
+        ncmpInEventLogger.addAppender(listAppender)
+        dmiEventConsumerLogger.addAppender(listAppender)
     }
 
     def cleanup() {
-        logger.detachAndStopAllAppenders()
+        ncmpInEventLogger.detachAndStopAllAppenders()
+        dmiEventConsumerLogger.detachAndStopAllAppenders()
         dmiInConsumer.unsubscribe()
         dmiInConsumer.close()
         testRequestProducer.close()
@@ -92,7 +97,7 @@ class CmSubscriptionSpec extends CpsIntegrationSpecBase {
             def eventDataNodeSelector = (dmi1DataNodeSelector + dmi2DataNodeSelector)
             def eventPayload = createSubscriptionEventPayload('dataJobCreated', 'myDataJobId01', eventDataNodeSelector)
         when: 'a subscription create request is sent'
-            sendSubscriptionRequest(subscriptionTopic, 'key', eventPayload, 'myDataJobId01')
+            sendSubscriptionRequest(subscriptionTopic, 'key', eventPayload, 'myDataJobId01', 'dataJobCreated')
         then: 'log shows event is consumed by ncmp'
             def messages = listAppender.list*.formattedMessage
             messages.any { msg -> msg.contains('myDataJobId01') && msg.contains('dataJobCreated') }
@@ -108,8 +113,8 @@ class CmSubscriptionSpec extends CpsIntegrationSpecBase {
 
     def 'Create subscription accepted by DMI.'() {
         given: 'a persisted subscription'
-            def eventPayload = createSubscriptionEventPayload('dataJobCreated', 'newDataJob', '/parent[id=\\\'0\\\']\\n')
-            sendSubscriptionRequest(subscriptionTopic, 'some key', eventPayload, 'newDataJob')
+            def eventPayload = createSubscriptionEventPayload('dataJobCreated', 'newDataJob', '/parent[id=\\\"0\\\"]\\n')
+            sendSubscriptionRequest(subscriptionTopic, 'some key', eventPayload, 'newDataJob', 'dataJobCreated')
         when: 'dmi accepts the subscription create request'
             sendDmiResponse('1', 'ACCEPTED', 'subscriptionCreateResponse', 'dmi-0', 'newDataJob#dmi-0')
         then: 'there are no more inactive data node selector for given datajob id'
@@ -129,7 +134,7 @@ class CmSubscriptionSpec extends CpsIntegrationSpecBase {
             def eventDataNodeSelector = (overlappingDmi1DataNodeSelector + newDmi2DataNodeSelector)
             def eventPayload = createSubscriptionEventPayload('dataJobCreated', 'partialOverlappingDataJobId', eventDataNodeSelector)
         when: 'create request event for overlapping subscription is sent'
-            sendSubscriptionRequest(subscriptionTopic, 'some key', eventPayload, 'partialOverlappingDataJobId')
+            sendSubscriptionRequest(subscriptionTopic, 'some key', eventPayload, 'partialOverlappingDataJobId', 'dataJobCreated')
         then: 'log shows event is consumed by ncmp'
             def messages = listAppender.list*.formattedMessage
             messages.any { msg -> msg.contains('partialOverlappingDataJobId') && msg.contains('dataJobCreated') }
@@ -149,24 +154,24 @@ class CmSubscriptionSpec extends CpsIntegrationSpecBase {
         given: 'a new data node selector'
             def dataNodeSelector = '''/parent[id=\\\"1\\\"]\\n/parent[id=\\\"2\\\"]/child\\n'''
         and: 'an event payload'
-            def eventPayload = createSubscriptionEventPayload('dataJobCreated', 'fullyOverlappingDataJob', dataNodeSelector)
+            def eventPayload = createSubscriptionEventPayload('dataJobCreated', 'fullyOverlappingDataJobId', dataNodeSelector)
         and: 'existing active subscriptions in database'
             createAndAcceptSubscriptionA()
             createAndAcceptSubscriptionB()
         when: 'a new subscription create request is sent'
-            sendSubscriptionRequest(subscriptionTopic, 'fullyOverlappingDataJob', eventPayload, 'myDataJobId')
+            sendSubscriptionRequest(subscriptionTopic, 'some key', eventPayload, 'fullyOverlappingDataJobId', 'dataJobCreated')
         then: 'log shows event is consumed by ncmp'
             def messages = listAppender.list*.formattedMessage
-            messages.any { msg -> msg.contains('fullyOverlappingDataJob') && msg.contains('dataJobCreated') }
+            messages.any { msg -> msg.contains('fullyOverlappingDataJobId') && msg.contains('dataJobCreated') }
         and: 'the 2 data node selectors for the given data job id is persisted'
             assert cpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-job-subscriptions',
-                    '//subscription/dataJobId[text()=\'fullyOverlappingDataJob\']', DIRECT_CHILDREN_ONLY).size() == 2
+                    '//subscription/dataJobId[text()=\'fullyOverlappingDataJobId\']', DIRECT_CHILDREN_ONLY).size() == 2
         and: 'there are no inactive data node selector'
-            assert cmDataJobSubscriptionPersistenceService.getInactiveDataNodeSelectors('fullyOverlappingDataJob').size() == 0
+            assert cmDataJobSubscriptionPersistenceService.getInactiveDataNodeSelectors('fullyOverlappingDataJobId').size() == 0
         and: 'get correlation ids from event sent to DMIs'
             def correlationIds = getAllConsumedCorrelationIds()
         and: 'there is no correlation IDs (event) for any dmi'
-            assert !correlationIds.any { correlationId -> correlationId.startsWith('fullyOverlappingDataJob') }
+            assert !correlationIds.any { correlationId -> correlationId.startsWith('fullyOverlappingDataJobId') }
     }
 
     def 'Delete subscription removes last subscriber.'() {
@@ -174,7 +179,7 @@ class CmSubscriptionSpec extends CpsIntegrationSpecBase {
             def dataNodeSelector = '/parent[id=\\\"5\\\"]'
         and: 'a subscription created'
             def createEventPayload = createSubscriptionEventPayload('dataJobCreated', 'lastDataJobId', dataNodeSelector)
-            sendSubscriptionRequest(subscriptionTopic, 'lastDataJobId', createEventPayload, 'lastDataJobId')
+            sendSubscriptionRequest(subscriptionTopic, 'lastDataJobId', createEventPayload, 'lastDataJobId', 'dataJobCreated')
         and: 'data nodes is persisted '
             def dataNodes = cpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-job-subscriptions',
                     "//subscription/dataJobId[text()=\'lastDataJobId\']", DIRECT_CHILDREN_ONLY)
@@ -182,7 +187,7 @@ class CmSubscriptionSpec extends CpsIntegrationSpecBase {
             assert dataNodes.iterator().next().leaves.dataNodeSelector == '/parent[id="5"]'
         when: 'a delete event is received for the subscription'
             def deleteEventPayload = createSubscriptionEventPayload('dataJobDeleted', 'lastDataJobId', dataNodeSelector)
-            sendSubscriptionRequest(subscriptionTopic, 'lastDataJobId', deleteEventPayload, 'lastDataJobId')
+            sendSubscriptionRequest(subscriptionTopic, 'lastDataJobId', deleteEventPayload, 'lastDataJobId', 'dataJobDeleted')
         then: 'the subscription is fully removed from persistence'
             def remainingDataNodeSelector = cpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-job-subscriptions',
                     "//subscription/dataJobId[text()=\'lastDataJobId\']", DIRECT_CHILDREN_ONLY)
@@ -204,10 +209,10 @@ class CmSubscriptionSpec extends CpsIntegrationSpecBase {
             assert !existingSubscription.isEmpty()
         and: 'a new subscription'
             def createEventPayload1 = createSubscriptionEventPayload('dataJobCreated', 'id-to-remove', dataNodeSelector)
-            sendSubscriptionRequest(subscriptionTopic, 'id-to-remove', createEventPayload1, 'id-to-remove')
+            sendSubscriptionRequest(subscriptionTopic, 'id-to-remove', createEventPayload1, 'id-to-remove', 'dataJobCreated')
         when: 'a delete event is received'
             def deleteEventPayload = createSubscriptionEventPayload('dataJobDeleted', 'id-to-remove', dataNodeSelector)
-            sendSubscriptionRequest(subscriptionTopic, 'id-to-remove', deleteEventPayload, 'id-to-remove')
+            sendSubscriptionRequest(subscriptionTopic, 'id-to-remove', deleteEventPayload, 'id-to-remove', 'dataJobDeleted')
         then: 'the data job id does not exist in database'
             def resultForDeletedSubscription = cpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-job-subscriptions',
                     "//subscription/dataJobId[text()='id-to-remove']", DIRECT_CHILDREN_ONLY)
@@ -226,7 +231,7 @@ class CmSubscriptionSpec extends CpsIntegrationSpecBase {
         given: 'an event payload'
             def deleteEventPayload = createSubscriptionEventPayload('dataJobDeleted', 'nonExistingDataJobId', '/nonExisting')
         when: 'a delete event is received for the non-existent subscription'
-            sendSubscriptionRequest(subscriptionTopic, 'nonExistingDataJobId', deleteEventPayload, 'myDataJobId')
+            sendSubscriptionRequest(subscriptionTopic, 'nonExistingDataJobId', deleteEventPayload, 'nonExistingDataJobId', 'dataJobDeleted')
         then: 'no exception is thrown'
             noExceptionThrown()
         and: 'nothing is sent to DMI'
@@ -253,7 +258,7 @@ class CmSubscriptionSpec extends CpsIntegrationSpecBase {
     def createAndAcceptSubscriptionA() {
         def dataNodeSelector = '''/parent[id=\\\"1\\\"]\\n/parent[id=\\\"2\\\"]/child\\n/parent[id=\\\"3\\\"]/child'''
         def eventPayload = createSubscriptionEventPayload('dataJobCreated', 'dataJobA', dataNodeSelector)
-        sendSubscriptionRequest(subscriptionTopic, 'dataJobA', eventPayload, 'dataJobA')
+        sendSubscriptionRequest(subscriptionTopic, 'dataJobA', eventPayload, 'dataJobA', 'dataJobCreated')
         sendDmiResponse('1', 'ACCEPTED', 'subscriptionCreateResponse', 'dmi-1', 'dataJobA#dmi-1')
         sendDmiResponse('1', 'ACCEPTED', 'subscriptionCreateResponse', 'dmi-2', 'dataJobA#dmi-2')
     }
@@ -261,15 +266,15 @@ class CmSubscriptionSpec extends CpsIntegrationSpecBase {
     def createAndAcceptSubscriptionB() {
         def dataNodeSelector = '''/parent[id=\\\"1\\\"]\\n/parent[id=\\\"3\\\"]/child\\n/parent[id=\\\"4\\\"]'''
         def eventPayload = createSubscriptionEventPayload('dataJobCreated', 'dataJobB', dataNodeSelector)
-        sendSubscriptionRequest(subscriptionTopic, 'dataJobB', eventPayload, 'dataJobB')
+        sendSubscriptionRequest(subscriptionTopic, 'dataJobB', eventPayload, 'dataJobB', 'dataJobCreated')
         sendDmiResponse('1', 'ACCEPTED', 'subscriptionCreateResponse', 'dmi-2', 'dataJobB#dmi-2')
     }
 
-    def sendSubscriptionRequest(topic, eventKey, eventPayload, dataJobId) {
+    def sendSubscriptionRequest(topic, eventKey, eventPayload, dataJobId, eventType) {
         def event = new ProducerRecord<>(topic, eventKey, eventPayload)
         testRequestProducer.send(event)
-        //TODO Add polling within for log to report dataJobId is finished (separate commit)
-        sleep(2000)
+        def expectedMessageWhenFinishedProcessingEvent = 'NCMP In Event with eventType=' + eventType + ' has been Processed for dataJobId='+ dataJobId
+        assertEventProcessedBasedOnLogging(expectedMessageWhenFinishedProcessingEvent)
     }
 
     def sendDmiResponse(statusCode, statusMessage, eventType, eventSource, correlationId) {
@@ -284,7 +289,16 @@ class CmSubscriptionSpec extends CpsIntegrationSpecBase {
                 .withExtension('correlationid', correlationId).build()
         def event = new ProducerRecord<>(dmiOutTopic, 'key', cloudEvent)
         testResponseProducer.send(event)
-        sleep(2000)
+        def expectedMessageWhenFinishedProcessingEvent = 'Finished processing DMI subscription response event with details: | correlationId=' + correlationId + ' | eventType=' + eventType
+        assertEventProcessedBasedOnLogging(expectedMessageWhenFinishedProcessingEvent)
+    }
+
+    def assertEventProcessedBasedOnLogging(expectedMessageInLog) {
+        new PollingConditions().within(2) {
+            def messages = listAppender.list*.formattedMessage
+            def eventProcessed = messages.any { msg -> msg.contains(expectedMessageInLog) }
+            assert eventProcessed
+        }
     }
 
     def getAllConsumedCorrelationIds() {