From e0df0b0d86c8a85c50d4a4cda3f95049a3e83801 Mon Sep 17 00:00:00 2001
From: emaclee
Date: Fri, 3 Oct 2025 17:55:33 +0100
Subject: [PATCH] Add integration test for create subscription

- add test for creating subscription and sending to multiple DMIs
- update status of a subscription based on dmi response event
- add test for creating subscription wherein it partially overlaps with existing active subscriptions
- add test for creating subscription wherein it fully overlaps with existing active subscriptions

Issue-ID: CPS-2995
Change-Id: Iabbd54168ab8b9b241c357519e65e206440533dd
Signed-off-by: emaclee
---
 .../subscription/CmSubscriptionSpec.groovy         | 243 +++++++++++++++++++++
 .../performance/base/NcmpPerfTestBase.groovy       |   2 +-
 .../onap/cps/integration/KafkaTestContainer.java   |  20 ++
 .../cm-data-subscriptions@2023-09-21.yang          |   0
 .../subscription/createSubscriptionEvent.json      |  22 ++
 .../subscription/dmiSubscriptionResponseEvent.json |   6 +
 6 files changed, 292 insertions(+), 1 deletion(-)
 create mode 100644 integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/datajobs/subscription/CmSubscriptionSpec.groovy
 rename integration-test/src/test/resources/data/{cm-data-subscriptions => datajobs}/cm-data-subscriptions@2023-09-21.yang (100%)
 create mode 100644 integration-test/src/test/resources/data/datajobs/subscription/createSubscriptionEvent.json
 create mode 100644 integration-test/src/test/resources/data/datajobs/subscription/dmiSubscriptionResponseEvent.json

diff --git a/integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/datajobs/subscription/CmSubscriptionSpec.groovy b/integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/datajobs/subscription/CmSubscriptionSpec.groovy
new file mode 100644
index 0000000000..5a13cb2b3d
--- /dev/null
+++ b/integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/datajobs/subscription/CmSubscriptionSpec.groovy
@@ -0,0 +1,243 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2025 OpenInfra Foundation Europe. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the 'License');
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an 'AS IS' BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.integration.functional.ncmp.datajobs.subscription
+
+import static org.onap.cps.api.parameters.FetchDescendantsOption.DIRECT_CHILDREN_ONLY
+
+import ch.qos.logback.classic.spi.ILoggingEvent
+import ch.qos.logback.core.read.ListAppender
+import io.cloudevents.core.builder.CloudEventBuilder
+import io.cloudevents.kafka.CloudEventSerializer
+import io.cloudevents.kafka.CloudEventDeserializer
+import java.nio.charset.StandardCharsets
+import java.time.Duration
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.common.serialization.StringSerializer
+import org.onap.cps.integration.base.CpsIntegrationSpecBase
+import org.onap.cps.ncmp.impl.datajobs.subscription.ncmp.NcmpInEventConsumer
+import org.onap.cps.ncmp.impl.datajobs.subscription.utils.CmDataJobSubscriptionPersistenceService
+import org.slf4j.LoggerFactory
+import org.springframework.beans.factory.annotation.Autowired
+import org.springframework.beans.factory.annotation.Value
+
+class CmSubscriptionSpec extends CpsIntegrationSpecBase {
+
+    @Autowired
+    CmDataJobSubscriptionPersistenceService cmDataJobSubscriptionPersistenceService
+
+    @Value('${app.ncmp.avc.cm-subscription-ncmp-in}')
+    def subscriptionTopic
+
+    @Value('${app.ncmp.avc.cm-subscription-dmi-out}')
+    def dmiOutTopic
+
+    @Value('${app.ncmp.avc.cm-subscription-dmi-in}')
+    def dmiInTopic
+
+    def dmiInConsumer
+    def testRequestProducer
+    def testResponseProducer
+
+    def listAppender = new ListAppender<ILoggingEvent>()
+    def logger
+
+    def setup() {
+        registerCmHandlesForSubscriptions()
+        kafkaTestContainer.start()
+        dmiInConsumer = kafkaTestContainer.getConsumer('test-group', CloudEventDeserializer.class)
+        dmiInConsumer.subscribe([dmiInTopic])
+        dmiInConsumer.poll(Duration.ofMillis(500))
+        testRequestProducer = kafkaTestContainer.createProducer('test-client-id', StringSerializer.class)
+        testResponseProducer = kafkaTestContainer.createProducer('test-client-id', CloudEventSerializer.class)
+        logger = LoggerFactory.getLogger(NcmpInEventConsumer)
+        listAppender.start()
+        logger.addAppender(listAppender)
+    }
+
+    def cleanup() {
+        dmiInConsumer.unsubscribe()
+        dmiInConsumer.close()
+        testRequestProducer.close()
+        testResponseProducer.close()
+        kafkaTestContainer.close()
+        deregisterCmHandles('dmi-0', ['cmHandle0'])
+        deregisterCmHandles('dmi-1', ['cmHandle1', 'cmHandle2'])
+        deregisterCmHandles('dmi-2', ['cmHandle3', 'cmHandle4'])
+    }
+
+    def 'Create subscription and send to multiple DMIs'() {
+        given: 'a data node selector on DMI-1'
+            def dmi1DataNodeSelector = '''/parent[id=\\\"1\\\"]\\n/parent[id=\\\"2\\\"]/child\\n'''
+        and: 'a data node selector on DMI-2'
+            def dmi2DataNodeSelector = '/parent[id=\\\"3\\\"]/child'
+        and: 'an event payload'
+            def eventDataNodeSelector = (dmi1DataNodeSelector + dmi2DataNodeSelector)
+            def eventPayload = createSubscriptionEventPayload('dataJobCreated', 'myDataJobId', eventDataNodeSelector)
+        when: 'a subscription create request is sent'
+            sendSubscriptionCreateRequest(subscriptionTopic, 'key', eventPayload)
+        then: 'log shows event is consumed by ncmp'
+            def messages = listAppender.list*.formattedMessage
+            messages.any { msg -> msg.contains('myDataJobId') && msg.contains('dataJobCreated')}
+        and: 'the 3 different data node selectors for the given data job id are persisted'
+            assert cmDataJobSubscriptionPersistenceService.getInactiveDataNodeSelectors('myDataJobId').size() == 3
+        and: 'get correlation ids from event sent to DMIs'
+            def correlationIds = getAllConsumedCorrelationIds()
+        and: 'there are correlation IDs (events) for each affected dmi (DMI-1, DMI-2)'
+            assert correlationIds.size() == 2
+            assert correlationIds.containsAll(['myDataJobId#dmi-1', 'myDataJobId#dmi-2'])
+    }
+
+    def 'Update subscription status'() {
+        given: 'a persisted subscription'
+            def eventPayload = createSubscriptionEventPayload('dataJobCreated', 'newDataJob', '/parent[id=\\\'0\\\']\\n')
+            sendSubscriptionCreateRequest(subscriptionTopic, 'newDataJob', eventPayload)
+        when: 'dmi accepts the subscription create request'
+            sendDmiResponse('1', 'ACCEPTED', 'subscriptionCreateResponse', 'dmi-0', 'newDataJob#dmi-0')
+        then: 'there are no more inactive data node selectors for the given data job id'
+            assert cmDataJobSubscriptionPersistenceService.getInactiveDataNodeSelectors('newDataJob').size() == 0
+        and: 'status for the data node selector for the given data job id is ACCEPTED'
+            def affectedDataNodes = cpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-job-subscriptions',
+                    '//subscription/dataJobId[text()=\'newDataJob\']', DIRECT_CHILDREN_ONLY)
+            assert affectedDataNodes.leaves.every( entry -> entry.get('status') == 'ACCEPTED')
+    }
+
+    def 'Create new subscription which partially overlaps with an existing active subscription'() {
+        given: 'an existing data node selector on DMI-1'
+            def existingDmi1DataNodeSelector = '''/parent[id=\\\"1\\\"]\\n/parent[id=\\\"3\\\"]/child\\n'''
+        and: 'a new data node selector on DMI-2'
+            def newDmi2DataNodeSelector = '/parent[id=\\\"4\\\"]'
+        and: 'an event payload'
+            def eventDataNodeSelector = (existingDmi1DataNodeSelector + newDmi2DataNodeSelector)
+            def eventPayload = createSubscriptionEventPayload('dataJobCreated', 'partialOverlappingDataJob', eventDataNodeSelector)
+        and: 'an active subscription in database'
+            createAndAcceptSubscriptionA()
+        when: 'a new subscription create request is sent'
+            sendSubscriptionCreateRequest(subscriptionTopic, 'partialOverlappingDataJob', eventPayload)
+        then: 'log shows event is consumed by ncmp'
+            def messages = listAppender.list*.formattedMessage
+            messages.any { msg -> msg.contains('partialOverlappingDataJob') && msg.contains('dataJobCreated')}
+        and: 'the 3 data node selectors for the given data job id are persisted'
+            assert cpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-job-subscriptions',
+                    '//subscription/dataJobId[text()=\'partialOverlappingDataJob\']', DIRECT_CHILDREN_ONLY).size() == 3
+        and: 'only one data node selector is not active'
+            assert cmDataJobSubscriptionPersistenceService.getInactiveDataNodeSelectors('partialOverlappingDataJob').size() == 1
+        and: 'get correlation ids from event sent to DMIs'
+            def correlationIds = getAllConsumedCorrelationIds()
+        and: 'there are correlation IDs (events) for only the affected dmi (DMI-2)'
+            assert !correlationIds.contains('partialOverlappingDataJob#dmi-1')
+            assert correlationIds.contains('partialOverlappingDataJob#dmi-2')
+    }
+
+    def 'Create new subscription which completely overlaps with existing active subscriptions'() {
+        given: 'a new data node selector'
+            def dataNodeSelector = '''/parent[id=\\\"1\\\"]\\n/parent[id=\\\"2\\\"]/child\\n'''
+        and: 'an event payload'
+            def eventPayload = createSubscriptionEventPayload('dataJobCreated', 'fullyOverlappingDataJob', dataNodeSelector)
+        and: 'existing active subscriptions in database'
+            createAndAcceptSubscriptionA()
+            createAndAcceptSubscriptionB()
+        when: 'a new subscription create request is sent'
+            sendSubscriptionCreateRequest(subscriptionTopic, 'fullyOverlappingDataJob', eventPayload)
+        then: 'log shows event is consumed by ncmp'
+            def messages = listAppender.list*.formattedMessage
+            messages.any { msg -> msg.contains('fullyOverlappingDataJob') && msg.contains('dataJobCreated')}
+        and: 'the 2 data node selectors for the given data job id are persisted'
+            assert cpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-job-subscriptions',
+                    '//subscription/dataJobId[text()=\'fullyOverlappingDataJob\']', DIRECT_CHILDREN_ONLY).size() == 2
+        and: 'there are no inactive data node selectors'
+            assert cmDataJobSubscriptionPersistenceService.getInactiveDataNodeSelectors('fullyOverlappingDataJob').size() == 0
+        and: 'get correlation ids from event sent to DMIs'
+            def correlationIds = getAllConsumedCorrelationIds()
+        and: 'there are no correlation IDs (events) for any dmi'
+            assert !correlationIds.any { correlationId -> correlationId.startsWith('fullyOverlappingDataJob') }
+    }
+
+    def registerCmHandlesForSubscriptions() {
+        registerCmHandle('dmi-0', 'cmHandle0', '','/parent=0')
+        registerCmHandle('dmi-1', 'cmHandle1', '','/parent=1')
+        registerCmHandle('dmi-1', 'cmHandle2', '','/parent=2')
+        registerCmHandle('dmi-2', 'cmHandle3', '','/parent=3')
+        registerCmHandle('dmi-2', 'cmHandle4', '','/parent=4')
+    }
+
+    def createSubscriptionEventPayload(eventType, dataJobId, dataNodeSelector) {
+        def eventPayload = readResourceDataFile('datajobs/subscription/createSubscriptionEvent.json')
+        eventPayload = eventPayload.replace('#eventType', eventType)
+        eventPayload = eventPayload.replace('#dataJobId', dataJobId)
+        eventPayload = eventPayload.replace('#dataNodeSelector', dataNodeSelector)
+        return eventPayload
+    }
+
+    def createAndAcceptSubscriptionA() {
+        def dataNodeSelector = '''/parent[id=\\\"1\\\"]\\n/parent[id=\\\"2\\\"]/child\\n/parent[id=\\\"3\\\"]/child'''
+        def eventPayload = createSubscriptionEventPayload('dataJobCreated', 'dataJobA', dataNodeSelector)
+        sendSubscriptionCreateRequest(subscriptionTopic, 'dataJobA', eventPayload)
+        sendDmiResponse('1', 'ACCEPTED', 'subscriptionCreateResponse', 'dmi-1', 'dataJobA#dmi-1')
+        sendDmiResponse('1', 'ACCEPTED', 'subscriptionCreateResponse', 'dmi-2', 'dataJobA#dmi-2')
+    }
+
+    def createAndAcceptSubscriptionB() {
+        def dataNodeSelector = '''/parent[id=\\\"1\\\"]\\n/parent[id=\\\"3\\\"]/child\\n/parent[id=\\\"4\\\"]'''
+        def eventPayload = createSubscriptionEventPayload('dataJobCreated', 'dataJobB', dataNodeSelector)
+        sendSubscriptionCreateRequest(subscriptionTopic, 'dataJobB', eventPayload)
+        sendDmiResponse('1', 'ACCEPTED', 'subscriptionCreateResponse', 'dmi-2', 'dataJobB#dmi-2')
+    }
+
+    def sendSubscriptionCreateRequest(topic, eventKey, eventPayload) {
+        def event = new ProducerRecord<>(topic, eventKey, eventPayload);
+        testRequestProducer.send(event)
+        sleep(1000)
+    }
+
+    def sendDmiResponse(statusCode, statusMessage, eventType, eventSource, correlationId) {
+        def eventPayload = readResourceDataFile('datajobs/subscription/dmiSubscriptionResponseEvent.json')
+        eventPayload = eventPayload.replace('#statusCode', statusCode)
+        eventPayload = eventPayload.replace('#statusMessage', statusMessage)
+        def cloudEvent = CloudEventBuilder.v1()
+                .withData(eventPayload.getBytes(StandardCharsets.UTF_8))
+                .withId('random-uuid')
+                .withType(eventType)
+                .withSource(URI.create(eventSource))
+                .withExtension('correlationid', correlationId).build()
+        def event = new ProducerRecord<>(dmiOutTopic, 'key', cloudEvent);
+        testResponseProducer.send(event)
+        sleep(2000)
+    }
+
+    def getAllConsumedCorrelationIds() {
+        def consumedEvents = dmiInConsumer.poll(Duration.ofMillis(1000))
+        def headersMap = getAllHeaders(consumedEvents)
+        return headersMap.get('ce_correlationid')
+    }
+
+    def getAllHeaders(consumedEvents) {
+        def headersMap = [:].withDefault { [] }
+        consumedEvents.each { event ->
+            event.headers().each { header ->
+                def key = header.key()
+                def value = new String(header.value())
+                headersMap[key] << value
+            }
+
+        }
+        return headersMap
+    }
+}
diff --git a/integration-test/src/test/groovy/org/onap/cps/integration/performance/base/NcmpPerfTestBase.groovy b/integration-test/src/test/groovy/org/onap/cps/integration/performance/base/NcmpPerfTestBase.groovy
index 585d62cf88..e5b159e0f1 100644
--- a/integration-test/src/test/groovy/org/onap/cps/integration/performance/base/NcmpPerfTestBase.groovy
+++ b/integration-test/src/test/groovy/org/onap/cps/integration/performance/base/NcmpPerfTestBase.groovy
@@ -100,7 +100,7 @@ class NcmpPerfTestBase extends PerfTestBase {
     }
 
     def createCmDataSubscriptionsSchemaSet() {
-        def modelAsString = readResourceDataFile('cm-data-subscriptions/cm-data-subscriptions@2023-09-21.yang')
+        def modelAsString = readResourceDataFile('datajobs/cm-data-subscriptions@2023-09-21.yang')
         cpsModuleService.createSchemaSet(NCMP_PERFORMANCE_TEST_DATASPACE, CM_DATA_SUBSCRIPTIONS_SCHEMA_SET,
                 [registry: modelAsString])
     }
diff --git a/integration-test/src/test/java/org/onap/cps/integration/KafkaTestContainer.java b/integration-test/src/test/java/org/onap/cps/integration/KafkaTestContainer.java
index 60c1637c5a..6494ab92ed 100644
--- a/integration-test/src/test/java/org/onap/cps/integration/KafkaTestContainer.java
+++ b/integration-test/src/test/java/org/onap/cps/integration/KafkaTestContainer.java
@@ -24,7 +24,10 @@ import java.util.Map;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.testcontainers.kafka.ConfluentKafkaContainer;
 
 /**
@@ -69,6 +72,23 @@
         return new KafkaConsumer<>(consumerProperties(consumerGroupId, valueDeserializer));
     }
 
+    /**
+     * Create kafka producer.
+     *
+     * @param clientId client id
+     * @param valueSerializer value serializer
+     * @return kafka test producer
+     */
+    public static KafkaProducer createProducer(final String clientId, final Object valueSerializer) {
+        final Map configProps = new HashMap<>();
+        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaTestContainer.getBootstrapServers());
+        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer);
+        configProps.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
+
+        return new KafkaProducer<>(configProps);
+    }
+
     @Override
     public void start() {
         if (!isRunning()) {
diff --git a/integration-test/src/test/resources/data/cm-data-subscriptions/cm-data-subscriptions@2023-09-21.yang b/integration-test/src/test/resources/data/datajobs/cm-data-subscriptions@2023-09-21.yang
similarity index 100%
rename from integration-test/src/test/resources/data/cm-data-subscriptions/cm-data-subscriptions@2023-09-21.yang
rename to integration-test/src/test/resources/data/datajobs/cm-data-subscriptions@2023-09-21.yang
diff --git a/integration-test/src/test/resources/data/datajobs/subscription/createSubscriptionEvent.json b/integration-test/src/test/resources/data/datajobs/subscription/createSubscriptionEvent.json
new file mode 100644
index 0000000000..9f4028aeff
--- /dev/null
+++ b/integration-test/src/test/resources/data/datajobs/subscription/createSubscriptionEvent.json
@@ -0,0 +1,22 @@
+{
+  "eventType": "#eventType",
+  "event": {
+    "dataJob": {
+      "id": "#dataJobId",
+      "productionJobDefinition": {
+        "targetSelector": {
+          "dataNodeSelector": "#dataNodeSelector"
+        },
+        "dataSelector": {
+          "notificationTypes": [
+            ""
+          ],
+          "notificationFilter": ""
+        }
+      }
+    },
+    "dataType": {
+      "dataTypeId": ""
+    }
+  }
+}
\ No newline at end of file
diff --git a/integration-test/src/test/resources/data/datajobs/subscription/dmiSubscriptionResponseEvent.json b/integration-test/src/test/resources/data/datajobs/subscription/dmiSubscriptionResponseEvent.json
new file mode 100644
index 0000000000..e57761e06f
--- /dev/null
+++ b/integration-test/src/test/resources/data/datajobs/subscription/dmiSubscriptionResponseEvent.json
@@ -0,0 +1,6 @@
+{
+  "data": {
+    "statusCode": "#statusCode",
+    "statusMessage": "#statusMessage"
+  }
+}
\ No newline at end of file
-- 
2.16.6