From 3a73935f37342eb7a332e59f091c1efbad3f8921 Mon Sep 17 00:00:00 2001 From: shikha0203 Date: Tue, 11 Nov 2025 16:59:37 +0000 Subject: [PATCH] Replace sleep with polling in CmSubscriptionSpec Issue-ID: CPS-3037 Change-Id: If3da998b1f9a2d41ea774b2e323aa6a1aa51cd57 Signed-off-by: shikha0203 --- .../datajobs/subscription/dmi/EventConsumer.java | 4 +- .../subscription/ncmp/NcmpInEventConsumer.java | 4 +- .../subscription/CmSubscriptionSpec.groovy | 62 +++++++++++++--------- 3 files changed, 43 insertions(+), 27 deletions(-) diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/subscription/dmi/EventConsumer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/subscription/dmi/EventConsumer.java index 141c74a81e..aa351ad228 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/subscription/dmi/EventConsumer.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/subscription/dmi/EventConsumer.java @@ -1,6 +1,6 @@ /* - * ============LICENSE_START======================================================= + * ===========LICENSE_START======================================================== * Copyright (C) 2024-2025 OpenInfra Foundation Europe. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); @@ -75,6 +75,8 @@ public class EventConsumer { } } } + log.info("Finished processing DMI subscription response event with details: | correlationId={} | eventType={}", + correlationId, eventType); } private CmSubscriptionStatus getCmSubscriptionStatus(final DataJobSubscriptionDmiOutEvent dmiOutEvent) { diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/subscription/ncmp/NcmpInEventConsumer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/subscription/ncmp/NcmpInEventConsumer.java index cfa8ba6d4a..4e38e5e82a 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/subscription/ncmp/NcmpInEventConsumer.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/subscription/ncmp/NcmpInEventConsumer.java @@ -1,5 +1,5 @@ /* - * ============LICENSE_START======================================================= + * ===========LICENSE_START======================================================== * Copyright (c) 2024-2025 OpenInfra Foundation Europe. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); @@ -63,7 +63,7 @@ public class NcmpInEventConsumer { default -> log.warn("Unknown eventType={} for dataJobId={}", eventType, dataJobId); } } finally { - log.info("NCMP In Event has been Processed for dataJobId={}", dataJobId); + log.info("NCMP In Event with eventType={} has been Processed for dataJobId={}", eventType, dataJobId); } } diff --git a/integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/datajobs/subscription/CmSubscriptionSpec.groovy b/integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/datajobs/subscription/CmSubscriptionSpec.groovy index d122c0e66a..263a245b14 100644 --- a/integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/datajobs/subscription/CmSubscriptionSpec.groovy +++ b/integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/datajobs/subscription/CmSubscriptionSpec.groovy @@ -32,11 +32,13 @@ import java.time.Duration import org.apache.kafka.clients.producer.ProducerRecord import org.apache.kafka.common.serialization.StringSerializer import org.onap.cps.integration.base.CpsIntegrationSpecBase +import org.onap.cps.ncmp.impl.datajobs.subscription.dmi.EventConsumer import org.onap.cps.ncmp.impl.datajobs.subscription.ncmp.NcmpInEventConsumer import org.onap.cps.ncmp.impl.datajobs.subscription.utils.CmDataJobSubscriptionPersistenceService import org.slf4j.LoggerFactory import org.springframework.beans.factory.annotation.Autowired import org.springframework.beans.factory.annotation.Value +import spock.util.concurrent.PollingConditions class CmSubscriptionSpec extends CpsIntegrationSpecBase { @@ -57,7 +59,8 @@ class CmSubscriptionSpec extends CpsIntegrationSpecBase { def testResponseProducer def listAppender = new ListAppender() - def logger = (Logger) LoggerFactory.getLogger(NcmpInEventConsumer) + def ncmpInEventLogger = (Logger) LoggerFactory.getLogger(NcmpInEventConsumer) + def dmiEventConsumerLogger = (Logger) LoggerFactory.getLogger(EventConsumer) def setup() { registerCmHandlesForSubscriptions() @@ -68,11 +71,13 @@ class CmSubscriptionSpec extends CpsIntegrationSpecBase { testRequestProducer = kafkaTestContainer.createProducer('test-client-id', StringSerializer.class) testResponseProducer = kafkaTestContainer.createProducer('test-client-id', CloudEventSerializer.class) listAppender.start() - logger.addAppender(listAppender) + ncmpInEventLogger.addAppender(listAppender) + dmiEventConsumerLogger.addAppender(listAppender) } def cleanup() { - logger.detachAndStopAllAppenders() + ncmpInEventLogger.detachAndStopAllAppenders() + dmiEventConsumerLogger.detachAndStopAllAppenders() dmiInConsumer.unsubscribe() dmiInConsumer.close() testRequestProducer.close() @@ -92,7 +97,7 @@ class CmSubscriptionSpec extends CpsIntegrationSpecBase { def eventDataNodeSelector = (dmi1DataNodeSelector + dmi2DataNodeSelector) def eventPayload = createSubscriptionEventPayload('dataJobCreated', 'myDataJobId01', eventDataNodeSelector) when: 'a subscription create request is sent' - sendSubscriptionRequest(subscriptionTopic, 'key', eventPayload, 'myDataJobId01') + sendSubscriptionRequest(subscriptionTopic, 'key', eventPayload, 'myDataJobId01', 'dataJobCreated') then: 'log shows event is consumed by ncmp' def messages = listAppender.list*.formattedMessage messages.any { msg -> msg.contains('myDataJobId01') && msg.contains('dataJobCreated') } @@ -108,8 +113,8 @@ class CmSubscriptionSpec extends CpsIntegrationSpecBase { def 'Create subscription accepted by DMI.'() { given: 'a persisted subscription' - def eventPayload = createSubscriptionEventPayload('dataJobCreated', 'newDataJob', '/parent[id=\\\'0\\\']\\n') - sendSubscriptionRequest(subscriptionTopic, 'some key', eventPayload, 'newDataJob') + def eventPayload = createSubscriptionEventPayload('dataJobCreated', 'newDataJob', '/parent[id=\\\"0\\\"]\\n') + sendSubscriptionRequest(subscriptionTopic, 'some key', eventPayload, 'newDataJob', 'dataJobCreated') when: 'dmi accepts the subscription create request' sendDmiResponse('1', 'ACCEPTED', 'subscriptionCreateResponse', 'dmi-0', 'newDataJob#dmi-0') then: 'there are no more inactive data node selector for given datajob id' @@ -129,7 +134,7 @@ class CmSubscriptionSpec extends CpsIntegrationSpecBase { def eventDataNodeSelector = (overlappingDmi1DataNodeSelector + newDmi2DataNodeSelector) def eventPayload = createSubscriptionEventPayload('dataJobCreated', 'partialOverlappingDataJobId', eventDataNodeSelector) when: 'create request event for overlapping subscription is sent' - sendSubscriptionRequest(subscriptionTopic, 'some key', eventPayload, 'partialOverlappingDataJobId') + sendSubscriptionRequest(subscriptionTopic, 'some key', eventPayload, 'partialOverlappingDataJobId', 'dataJobCreated') then: 'log shows event is consumed by ncmp' def messages = listAppender.list*.formattedMessage messages.any { msg -> msg.contains('partialOverlappingDataJobId') && msg.contains('dataJobCreated') } @@ -149,24 +154,24 @@ class CmSubscriptionSpec extends CpsIntegrationSpecBase { given: 'a new data node selector' def dataNodeSelector = '''/parent[id=\\\"1\\\"]\\n/parent[id=\\\"2\\\"]/child\\n''' and: 'an event payload' - def eventPayload = createSubscriptionEventPayload('dataJobCreated', 'fullyOverlappingDataJob', dataNodeSelector) + def eventPayload = createSubscriptionEventPayload('dataJobCreated', 'fullyOverlappingDataJobId', dataNodeSelector) and: 'existing active subscriptions in database' createAndAcceptSubscriptionA() createAndAcceptSubscriptionB() when: 'a new subscription create request is sent' - sendSubscriptionRequest(subscriptionTopic, 'fullyOverlappingDataJob', eventPayload, 'myDataJobId') + sendSubscriptionRequest(subscriptionTopic, 'some key', eventPayload, 'fullyOverlappingDataJobId', 'dataJobCreated') then: 'log shows event is consumed by ncmp' def messages = listAppender.list*.formattedMessage - messages.any { msg -> msg.contains('fullyOverlappingDataJob') && msg.contains('dataJobCreated') } + messages.any { msg -> msg.contains('fullyOverlappingDataJobId') && msg.contains('dataJobCreated') } and: 'the 2 data node selectors for the given data job id is persisted' assert cpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-job-subscriptions', - '//subscription/dataJobId[text()=\'fullyOverlappingDataJob\']', DIRECT_CHILDREN_ONLY).size() == 2 + '//subscription/dataJobId[text()=\'fullyOverlappingDataJobId\']', DIRECT_CHILDREN_ONLY).size() == 2 and: 'there are no inactive data node selector' - assert cmDataJobSubscriptionPersistenceService.getInactiveDataNodeSelectors('fullyOverlappingDataJob').size() == 0 + assert cmDataJobSubscriptionPersistenceService.getInactiveDataNodeSelectors('fullyOverlappingDataJobId').size() == 0 and: 'get correlation ids from event sent to DMIs' def correlationIds = getAllConsumedCorrelationIds() and: 'there is no correlation IDs (event) for any dmi' - assert !correlationIds.any { correlationId -> correlationId.startsWith('fullyOverlappingDataJob') } + assert !correlationIds.any { correlationId -> correlationId.startsWith('fullyOverlappingDataJobId') } } def 'Delete subscription removes last subscriber.'() { @@ -174,7 +179,7 @@ class CmSubscriptionSpec extends CpsIntegrationSpecBase { def dataNodeSelector = '/parent[id=\\\"5\\\"]' and: 'a subscription created' def createEventPayload = createSubscriptionEventPayload('dataJobCreated', 'lastDataJobId', dataNodeSelector) - sendSubscriptionRequest(subscriptionTopic, 'lastDataJobId', createEventPayload, 'lastDataJobId') + sendSubscriptionRequest(subscriptionTopic, 'lastDataJobId', createEventPayload, 'lastDataJobId', 'dataJobCreated') and: 'data nodes is persisted ' def dataNodes = cpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-job-subscriptions', "//subscription/dataJobId[text()=\'lastDataJobId\']", DIRECT_CHILDREN_ONLY) @@ -182,7 +187,7 @@ class CmSubscriptionSpec extends CpsIntegrationSpecBase { assert dataNodes.iterator().next().leaves.dataNodeSelector == '/parent[id="5"]' when: 'a delete event is received for the subscription' def deleteEventPayload = createSubscriptionEventPayload('dataJobDeleted', 'lastDataJobId', dataNodeSelector) - sendSubscriptionRequest(subscriptionTopic, 'lastDataJobId', deleteEventPayload, 'lastDataJobId') + sendSubscriptionRequest(subscriptionTopic, 'lastDataJobId', deleteEventPayload, 'lastDataJobId', 'dataJobDeleted') then: 'the subscription is fully removed from persistence' def remainingDataNodeSelector = cpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-job-subscriptions', "//subscription/dataJobId[text()=\'lastDataJobId\']", DIRECT_CHILDREN_ONLY) @@ -204,10 +209,10 @@ class CmSubscriptionSpec extends CpsIntegrationSpecBase { assert !existingSubscription.isEmpty() and: 'a new subscription' def createEventPayload1 = createSubscriptionEventPayload('dataJobCreated', 'id-to-remove', dataNodeSelector) - sendSubscriptionRequest(subscriptionTopic, 'id-to-remove', createEventPayload1, 'id-to-remove') + sendSubscriptionRequest(subscriptionTopic, 'id-to-remove', createEventPayload1, 'id-to-remove', 'dataJobCreated') when: 'a delete event is received' def deleteEventPayload = createSubscriptionEventPayload('dataJobDeleted', 'id-to-remove', dataNodeSelector) - sendSubscriptionRequest(subscriptionTopic, 'id-to-remove', deleteEventPayload, 'id-to-remove') + sendSubscriptionRequest(subscriptionTopic, 'id-to-remove', deleteEventPayload, 'id-to-remove', 'dataJobDeleted') then: 'the data job id does not exist in database' def resultForDeletedSubscription = cpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-job-subscriptions', "//subscription/dataJobId[text()='id-to-remove']", DIRECT_CHILDREN_ONLY) @@ -226,7 +231,7 @@ class CmSubscriptionSpec extends CpsIntegrationSpecBase { given: 'an event payload' def deleteEventPayload = createSubscriptionEventPayload('dataJobDeleted', 'nonExistingDataJobId', '/nonExisting') when: 'a delete event is received for the non-existent subscription' - sendSubscriptionRequest(subscriptionTopic, 'nonExistingDataJobId', deleteEventPayload, 'myDataJobId') + sendSubscriptionRequest(subscriptionTopic, 'nonExistingDataJobId', deleteEventPayload, 'nonExistingDataJobId', 'dataJobDeleted') then: 'no exception is thrown' noExceptionThrown() and: 'nothing is sent to DMI' @@ -253,7 +258,7 @@ class CmSubscriptionSpec extends CpsIntegrationSpecBase { def createAndAcceptSubscriptionA() { def dataNodeSelector = '''/parent[id=\\\"1\\\"]\\n/parent[id=\\\"2\\\"]/child\\n/parent[id=\\\"3\\\"]/child''' def eventPayload = createSubscriptionEventPayload('dataJobCreated', 'dataJobA', dataNodeSelector) - sendSubscriptionRequest(subscriptionTopic, 'dataJobA', eventPayload, 'dataJobA') + sendSubscriptionRequest(subscriptionTopic, 'dataJobA', eventPayload, 'dataJobA', 'dataJobCreated') sendDmiResponse('1', 'ACCEPTED', 'subscriptionCreateResponse', 'dmi-1', 'dataJobA#dmi-1') sendDmiResponse('1', 'ACCEPTED', 'subscriptionCreateResponse', 'dmi-2', 'dataJobA#dmi-2') } @@ -261,15 +266,15 @@ class CmSubscriptionSpec extends CpsIntegrationSpecBase { def createAndAcceptSubscriptionB() { def dataNodeSelector = '''/parent[id=\\\"1\\\"]\\n/parent[id=\\\"3\\\"]/child\\n/parent[id=\\\"4\\\"]''' def eventPayload = createSubscriptionEventPayload('dataJobCreated', 'dataJobB', dataNodeSelector) - sendSubscriptionRequest(subscriptionTopic, 'dataJobB', eventPayload, 'dataJobB') + sendSubscriptionRequest(subscriptionTopic, 'dataJobB', eventPayload, 'dataJobB', 'dataJobCreated') sendDmiResponse('1', 'ACCEPTED', 'subscriptionCreateResponse', 'dmi-2', 'dataJobB#dmi-2') } - def sendSubscriptionRequest(topic, eventKey, eventPayload, dataJobId) { + def sendSubscriptionRequest(topic, eventKey, eventPayload, dataJobId, eventType) { def event = new ProducerRecord<>(topic, eventKey, eventPayload) testRequestProducer.send(event) - //TODO Add polling within for log to report dataJobId is finished (separate commit) - sleep(2000) + def expectedMessageWhenFinishedProcessingEvent = 'NCMP In Event with eventType=' + eventType + ' has been Processed for dataJobId='+ dataJobId + assertEventProcessedBasedOnLogging(expectedMessageWhenFinishedProcessingEvent) } def sendDmiResponse(statusCode, statusMessage, eventType, eventSource, correlationId) { @@ -284,7 +289,16 @@ class CmSubscriptionSpec extends CpsIntegrationSpecBase { .withExtension('correlationid', correlationId).build() def event = new ProducerRecord<>(dmiOutTopic, 'key', cloudEvent) testResponseProducer.send(event) - sleep(2000) + def expectedMessageWhenFinishedProcessingEvent = 'Finished processing DMI subscription response event with details: | correlationId=' + correlationId + ' | eventType=' + eventType + assertEventProcessedBasedOnLogging(expectedMessageWhenFinishedProcessingEvent) + } + + def assertEventProcessedBasedOnLogging(expectedMessageInLog) { + new PollingConditions().within(2) { + def messages = listAppender.list*.formattedMessage + def eventProcessed = messages.any { msg -> msg.contains(expectedMessageInLog) } + assert eventProcessed + } } def getAllConsumedCorrelationIds() { -- 2.16.6