From 5765f951f81688805b61eb8313bd0469f30534ae Mon Sep 17 00:00:00 2001 From: shikha0203 Date: Mon, 25 Aug 2025 11:43:08 +0100 Subject: [PATCH] updating NcmpInEventConsumer to only log the event information -event handling will be done in next commit -rebased Issue-ID: CPS-2893 Change-Id: I6fe334ce66ef2b4f9e6f7916083821a73a5f7e8f Signed-off-by: shikha0203 --- .../ncmp/NcmpInEventConsumer.java | 52 +++++------- .../ncmp/NcmpInEventConsumerSpec.groovy | 94 +++++----------------- .../sample_dataJobSubscriptionInEvent.json | 22 +++++ 3 files changed, 65 insertions(+), 103 deletions(-) create mode 100644 cps-ncmp-service/src/test/resources/sample_dataJobSubscriptionInEvent.json diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/NcmpInEventConsumer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/NcmpInEventConsumer.java index b3fb133432..83c21099ca 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/NcmpInEventConsumer.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/NcmpInEventConsumer.java @@ -1,6 +1,6 @@ /* * ============LICENSE_START======================================================= - * Copyright (C) 2024-2025 OpenInfra Foundation Europe. All rights reserved. + * Copyright (c) 2024-2025 OpenInfra Foundation Europe. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -9,7 +9,7 @@ * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, + * distributed under the License is distributed on an 'AS IS' BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. @@ -20,15 +20,11 @@ package org.onap.cps.ncmp.impl.cmnotificationsubscription.ncmp; -import static org.onap.cps.ncmp.utils.events.CloudEventMapper.toTargetEvent; - -import io.cloudevents.CloudEvent; import java.util.List; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.client_to_ncmp.NcmpInEvent; -import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.client_to_ncmp.Predicate; +import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.client_to_ncmp.DataJobSubscriptionOperationInEvent; +import org.onap.cps.ncmp.impl.utils.JexParser; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; @@ -39,34 +35,28 @@ import org.springframework.stereotype.Component; @ConditionalOnProperty(name = "notification.enabled", havingValue = "true", matchIfMissing = true) public class NcmpInEventConsumer { - private final CmSubscriptionHandler cmSubscriptionHandler; - /** * Consume the specified event. * - * @param ncmpInEventAsConsumerRecord the event to be consumed + * @param dataJobSubscriptionOperationInEvent the event to be consumed */ @KafkaListener(topics = "${app.ncmp.avc.cm-subscription-ncmp-in}", - containerFactory = "cloudEventConcurrentKafkaListenerContainerFactory") - public void consumeSubscriptionEvent(final ConsumerRecord ncmpInEventAsConsumerRecord) { - final CloudEvent cloudEvent = ncmpInEventAsConsumerRecord.value(); - final NcmpInEvent ncmpInEvent = toTargetEvent(cloudEvent, NcmpInEvent.class); - if (ncmpInEvent != null) { - log.info("Subscription with name {} to be mapped to hazelcast object...", - ncmpInEvent.getData().getSubscriptionId()); + containerFactory = "legacyEventConcurrentKafkaListenerContainerFactory", + properties = {"spring.json.value.default.type=" + + "org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0" + + ".client_to_ncmp.DataJobSubscriptionOperationInEvent"}) + public void consumeSubscriptionEvent( + final DataJobSubscriptionOperationInEvent dataJobSubscriptionOperationInEvent) { + + final String eventType = dataJobSubscriptionOperationInEvent.getEventType(); + final String dataNodeSelector = dataJobSubscriptionOperationInEvent.getEvent().getDataJob() + .getProductionJobDefinition().getTargetSelector().getDataNodeSelector(); + final List fdns = JexParser.extractFdnsFromLocationPaths(dataNodeSelector); + final String dataJobId = dataJobSubscriptionOperationInEvent.getEvent().getDataJob().getId(); + final String dataTypeId = dataJobSubscriptionOperationInEvent.getEvent().getDataType() != null + ? dataJobSubscriptionOperationInEvent.getEvent().getDataType().getDataTypeId() : "UNKNOWN"; - final String subscriptionId = ncmpInEvent.getData().getSubscriptionId(); - final List predicates = ncmpInEvent.getData().getPredicates(); - if ("subscriptionCreateRequest".equals(cloudEvent.getType())) { - log.info("Subscription create request for source {} with subscription id {} ...", - cloudEvent.getSource(), subscriptionId); - cmSubscriptionHandler.processSubscriptionCreateRequest(subscriptionId, predicates); - } - if ("subscriptionDeleteRequest".equals(cloudEvent.getType())) { - log.info("Subscription delete request for source {} with subscription id {} ...", - cloudEvent.getSource(), subscriptionId); - cmSubscriptionHandler.processSubscriptionDeleteRequest(subscriptionId); - } - } + log.info("Consumed subscription event with details: | jobId={} | eventType={} | fdns={} | dataType={}", + dataJobId, eventType, fdns, dataTypeId); } } diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/NcmpInEventConsumerSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/NcmpInEventConsumerSpec.groovy index 3b8fb0f756..9e4602856e 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/NcmpInEventConsumerSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/NcmpInEventConsumerSpec.groovy @@ -25,23 +25,19 @@ import ch.qos.logback.classic.Logger import ch.qos.logback.classic.spi.ILoggingEvent import ch.qos.logback.core.read.ListAppender import com.fasterxml.jackson.databind.ObjectMapper -import io.cloudevents.CloudEvent -import io.cloudevents.core.builder.CloudEventBuilder -import org.apache.kafka.clients.consumer.ConsumerRecord -import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.client_to_ncmp.NcmpInEvent +import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.client_to_ncmp.DataJobSubscriptionOperationInEvent import org.onap.cps.ncmp.utils.TestUtils -import org.onap.cps.ncmp.utils.events.MessagingBaseSpec import org.onap.cps.utils.JsonObjectMapper import org.slf4j.LoggerFactory import org.springframework.beans.factory.annotation.Autowired import org.springframework.boot.test.context.SpringBootTest +import spock.lang.Specification @SpringBootTest(classes = [ObjectMapper, JsonObjectMapper]) -class NcmpInEventConsumerSpec extends MessagingBaseSpec { +class NcmpInEventConsumerSpec extends Specification { - def mockCmSubscriptionHandler = Mock(CmSubscriptionHandler) - def objectUnderTest = new NcmpInEventConsumer(mockCmSubscriptionHandler) - def logger = Spy(ListAppender) + def objectUnderTest = new NcmpInEventConsumer() + def logger = new ListAppender() @Autowired JsonObjectMapper jsonObjectMapper @@ -58,69 +54,23 @@ class NcmpInEventConsumerSpec extends MessagingBaseSpec { ((Logger) LoggerFactory.getLogger(NcmpInEventConsumer.class)).detachAndStopAllAppenders() } - - def 'Consume valid CmNotificationSubscriptionNcmpInEvent create message.'() { - given: 'a cmNotificationSubscription event' - def jsonData = TestUtils.getResourceFileContent('cmSubscription/cmNotificationSubscriptionNcmpInEvent.json') - def testEventSent = jsonObjectMapper.convertJsonString(jsonData, NcmpInEvent) - def testCloudEventSent = CloudEventBuilder.v1() - .withData(objectMapper.writeValueAsBytes(testEventSent)) - .withId('subscriptionCreated') - .withType('subscriptionCreateRequest') - .withSource(URI.create('some-resource')) - .withExtension('correlationid', 'test-cmhandle1').build() - def consumerRecord = new ConsumerRecord('topic-name', 0, 0, 'event-key', testCloudEventSent) - when: 'the valid event is consumed' - objectUnderTest.consumeSubscriptionEvent(consumerRecord) - then: 'an event is logged with level INFO' - def loggingEvent = getLoggingEvent() + def 'Consuming CM Data Notification #scenario data type id.'() { + given: 'a JSON file containing a subscription event' + def jsonData = TestUtils.getResourceFileContent('sample_dataJobSubscriptionInEvent.json') + jsonData = jsonData.replace('#dataTypeId', dataTypeId) + def event = objectMapper.readValue(jsonData, DataJobSubscriptionOperationInEvent) + when: 'the event is consumed' + objectUnderTest.consumeSubscriptionEvent(event) + then: 'event details are logged at level INFO' + def loggingEvent = logger.list.last() assert loggingEvent.level == Level.INFO - and: 'the log indicates the task completed successfully' - assert loggingEvent.formattedMessage == 'Subscription create request for source some-resource with subscription id test-id ...' - and: 'the subscription handler service is called once' - 1 * mockCmSubscriptionHandler.processSubscriptionCreateRequest('test-id',_) + assert loggingEvent.formattedMessage.contains('jobId=my job id') + assert loggingEvent.formattedMessage.contains('eventType=my event type') + assert loggingEvent.formattedMessage.contains("dataType=${dataTypeId}") + assert loggingEvent.formattedMessage.contains('fdns=[/SubNetwork=SN1]') + where: 'the following data type ids are used' + scenario | dataTypeId + 'with' | 'my data type' + 'without' | 'null' } - - def 'Consume valid CmNotificationSubscriptionNcmpInEvent delete message.'() { - given: 'a cmNotificationSubscription event' - def jsonData = TestUtils.getResourceFileContent('cmSubscription/cmNotificationSubscriptionNcmpInEvent.json') - def testEventSent = jsonObjectMapper.convertJsonString(jsonData, NcmpInEvent) - def testCloudEventSent = CloudEventBuilder.v1() - .withData(objectMapper.writeValueAsBytes(testEventSent)) - .withId('sub-id') - .withType('subscriptionDeleteRequest') - .withSource(URI.create('some-resource')) - .withExtension('correlationid', 'test-cmhandle1').build() - def consumerRecord = new ConsumerRecord('topic-name', 0, 0, 'event-key', testCloudEventSent) - when: 'the valid event is consumed' - objectUnderTest.consumeSubscriptionEvent(consumerRecord) - then: 'an event is logged with level INFO' - def loggingEvent = getLoggingEvent() - assert loggingEvent.level == Level.INFO - and: 'the log indicates the task completed successfully' - assert loggingEvent.formattedMessage == 'Subscription delete request for source some-resource with subscription id test-id ...' - and: 'the subscription handler service is called once' - 1 * mockCmSubscriptionHandler.processSubscriptionDeleteRequest('test-id') - } - - def 'Attempt to consume unsupported Event.'() { - given: 'a unsupported event with a valid supported type' - def unsupportedEvent = Mock(CloudEvent) - def cloudEventWithUnsupportedEvent = CloudEventBuilder.v1() - .withId('some id') - .withType('subscriptionCreateRequest') // this is valid but does not match the event object - .withSource(URI.create('some-resource')) - .withData(objectMapper.writeValueAsBytes(unsupportedEvent)).build() - def consumerRecord = new ConsumerRecord('some topic', 0, 0, 'some key', cloudEventWithUnsupportedEvent) - when: 'attempt to consume the unsupported event' - objectUnderTest.consumeSubscriptionEvent(consumerRecord) - then: 'the subscription handler service is not called at all' - 0 * mockCmSubscriptionHandler.processSubscriptionDeleteRequest(*_) - 0 * mockCmSubscriptionHandler.processSubscriptionCreateRequest(*_) - } - - def getLoggingEvent() { - return logger.list[1] - } - } diff --git a/cps-ncmp-service/src/test/resources/sample_dataJobSubscriptionInEvent.json b/cps-ncmp-service/src/test/resources/sample_dataJobSubscriptionInEvent.json new file mode 100644 index 0000000000..f81e7e1215 --- /dev/null +++ b/cps-ncmp-service/src/test/resources/sample_dataJobSubscriptionInEvent.json @@ -0,0 +1,22 @@ +{ + "eventType": "my event type", + "event": { + "dataJob": { + "id": "my job id", + "productionJobDefinition": { + "targetSelector": { + "dataNodeSelector": "/SubNetwork[id=\"SN1\"]" + }, + "dataSelector": { + "notificationTypes": [ + "" + ], + "notificationFilter": "" + } + } + }, + "dataType": { + "dataTypeId": "#dataTypeId" + } + } +} \ No newline at end of file -- 2.16.6