updating NcmpInEventConsumer to only log the event information 41/141941/8
authorshikha0203 <shivani.khare@est.tech>
Mon, 25 Aug 2025 10:43:08 +0000 (11:43 +0100)
committershikha0203 <shivani.khare@est.tech>
Fri, 29 Aug 2025 10:13:45 +0000 (11:13 +0100)
-event handling will be done in next commit
-rebased

Issue-ID: CPS-2893
Change-Id: I6fe334ce66ef2b4f9e6f7916083821a73a5f7e8f
Signed-off-by: shikha0203 <shivani.khare@est.tech>
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/NcmpInEventConsumer.java
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/NcmpInEventConsumerSpec.groovy
cps-ncmp-service/src/test/resources/sample_dataJobSubscriptionInEvent.json [new file with mode: 0644]

index b3fb133..83c2109 100644 (file)
@@ -1,6 +1,6 @@
 /*
  * ============LICENSE_START=======================================================
- *  Copyright (C) 2024-2025 OpenInfra Foundation Europe. All rights reserved.
+ * Copyright (c) 2024-2025 OpenInfra Foundation Europe. All rights reserved.
  *  ================================================================================
  *  Licensed under the Apache License, Version 2.0 (the "License");
  *  you may not use this file except in compliance with the License.
@@ -9,7 +9,7 @@
  *        http://www.apache.org/licenses/LICENSE-2.0
  *
  *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  distributed under the License is distributed on an 'AS IS' BASIS,
  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  *  See the License for the specific language governing permissions and
  *  limitations under the License.
 
 package org.onap.cps.ncmp.impl.cmnotificationsubscription.ncmp;
 
-import static org.onap.cps.ncmp.utils.events.CloudEventMapper.toTargetEvent;
-
-import io.cloudevents.CloudEvent;
 import java.util.List;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.client_to_ncmp.NcmpInEvent;
-import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.client_to_ncmp.Predicate;
+import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.client_to_ncmp.DataJobSubscriptionOperationInEvent;
+import org.onap.cps.ncmp.impl.utils.JexParser;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 import org.springframework.kafka.annotation.KafkaListener;
 import org.springframework.stereotype.Component;
@@ -39,34 +35,28 @@ import org.springframework.stereotype.Component;
 @ConditionalOnProperty(name = "notification.enabled", havingValue = "true", matchIfMissing = true)
 public class NcmpInEventConsumer {
 
-    private final CmSubscriptionHandler cmSubscriptionHandler;
-
     /**
      * Consume the specified event.
      *
-     * @param ncmpInEventAsConsumerRecord the event to be consumed
+     * @param dataJobSubscriptionOperationInEvent the event to be consumed
      */
     @KafkaListener(topics = "${app.ncmp.avc.cm-subscription-ncmp-in}",
-            containerFactory = "cloudEventConcurrentKafkaListenerContainerFactory")
-    public void consumeSubscriptionEvent(final ConsumerRecord<String, CloudEvent> ncmpInEventAsConsumerRecord) {
-        final CloudEvent cloudEvent = ncmpInEventAsConsumerRecord.value();
-        final NcmpInEvent ncmpInEvent = toTargetEvent(cloudEvent, NcmpInEvent.class);
-        if (ncmpInEvent != null) {
-            log.info("Subscription with name {} to be mapped to hazelcast object...",
-                ncmpInEvent.getData().getSubscriptionId());
+            containerFactory = "legacyEventConcurrentKafkaListenerContainerFactory",
+            properties = {"spring.json.value.default.type="
+                    + "org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0"
+                    + ".client_to_ncmp.DataJobSubscriptionOperationInEvent"})
+    public void consumeSubscriptionEvent(
+            final DataJobSubscriptionOperationInEvent dataJobSubscriptionOperationInEvent) {
+
+        final String eventType = dataJobSubscriptionOperationInEvent.getEventType();
+        final String dataNodeSelector = dataJobSubscriptionOperationInEvent.getEvent().getDataJob()
+                .getProductionJobDefinition().getTargetSelector().getDataNodeSelector();
+        final List<String> fdns = JexParser.extractFdnsFromLocationPaths(dataNodeSelector);
+        final String dataJobId = dataJobSubscriptionOperationInEvent.getEvent().getDataJob().getId();
+        final String dataTypeId = dataJobSubscriptionOperationInEvent.getEvent().getDataType() != null
+                ? dataJobSubscriptionOperationInEvent.getEvent().getDataType().getDataTypeId() : "UNKNOWN";
 
-            final String subscriptionId = ncmpInEvent.getData().getSubscriptionId();
-            final List<Predicate> predicates = ncmpInEvent.getData().getPredicates();
-            if ("subscriptionCreateRequest".equals(cloudEvent.getType())) {
-                log.info("Subscription create request for source {} with subscription id {} ...",
-                    cloudEvent.getSource(), subscriptionId);
-                cmSubscriptionHandler.processSubscriptionCreateRequest(subscriptionId, predicates);
-            }
-            if ("subscriptionDeleteRequest".equals(cloudEvent.getType())) {
-                log.info("Subscription delete request for source {} with subscription id {} ...",
-                    cloudEvent.getSource(), subscriptionId);
-                cmSubscriptionHandler.processSubscriptionDeleteRequest(subscriptionId);
-            }
-        }
+        log.info("Consumed subscription event with details: | jobId={} | eventType={} | fdns={} | dataType={}",
+                dataJobId, eventType, fdns, dataTypeId);
     }
 }
index 3b8fb0f..9e46028 100644 (file)
@@ -25,23 +25,19 @@ import ch.qos.logback.classic.Logger
 import ch.qos.logback.classic.spi.ILoggingEvent
 import ch.qos.logback.core.read.ListAppender
 import com.fasterxml.jackson.databind.ObjectMapper
-import io.cloudevents.CloudEvent
-import io.cloudevents.core.builder.CloudEventBuilder
-import org.apache.kafka.clients.consumer.ConsumerRecord
-import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.client_to_ncmp.NcmpInEvent
+import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.client_to_ncmp.DataJobSubscriptionOperationInEvent
 import org.onap.cps.ncmp.utils.TestUtils
-import org.onap.cps.ncmp.utils.events.MessagingBaseSpec
 import org.onap.cps.utils.JsonObjectMapper
 import org.slf4j.LoggerFactory
 import org.springframework.beans.factory.annotation.Autowired
 import org.springframework.boot.test.context.SpringBootTest
+import spock.lang.Specification
 
 @SpringBootTest(classes = [ObjectMapper, JsonObjectMapper])
-class NcmpInEventConsumerSpec extends MessagingBaseSpec {
+class NcmpInEventConsumerSpec extends Specification {
 
-    def mockCmSubscriptionHandler = Mock(CmSubscriptionHandler)
-    def objectUnderTest = new NcmpInEventConsumer(mockCmSubscriptionHandler)
-    def logger = Spy(ListAppender<ILoggingEvent>)
+    def objectUnderTest = new NcmpInEventConsumer()
+    def logger = new ListAppender<ILoggingEvent>()
 
     @Autowired
     JsonObjectMapper jsonObjectMapper
@@ -58,69 +54,23 @@ class NcmpInEventConsumerSpec extends MessagingBaseSpec {
         ((Logger) LoggerFactory.getLogger(NcmpInEventConsumer.class)).detachAndStopAllAppenders()
     }
 
-
-    def 'Consume valid CmNotificationSubscriptionNcmpInEvent create message.'() {
-        given: 'a cmNotificationSubscription event'
-            def jsonData = TestUtils.getResourceFileContent('cmSubscription/cmNotificationSubscriptionNcmpInEvent.json')
-            def testEventSent = jsonObjectMapper.convertJsonString(jsonData, NcmpInEvent)
-            def testCloudEventSent = CloudEventBuilder.v1()
-                .withData(objectMapper.writeValueAsBytes(testEventSent))
-                .withId('subscriptionCreated')
-                .withType('subscriptionCreateRequest')
-                .withSource(URI.create('some-resource'))
-                .withExtension('correlationid', 'test-cmhandle1').build()
-            def consumerRecord = new ConsumerRecord<String, CloudEvent>('topic-name', 0, 0, 'event-key', testCloudEventSent)
-        when: 'the valid event is consumed'
-            objectUnderTest.consumeSubscriptionEvent(consumerRecord)
-        then: 'an event is logged with level INFO'
-            def loggingEvent = getLoggingEvent()
+    def 'Consuming CM Data Notification #scenario data type id.'() {
+        given: 'a JSON file containing a subscription event'
+            def jsonData = TestUtils.getResourceFileContent('sample_dataJobSubscriptionInEvent.json')
+            jsonData = jsonData.replace('#dataTypeId', dataTypeId)
+            def event = objectMapper.readValue(jsonData, DataJobSubscriptionOperationInEvent)
+        when: 'the event is consumed'
+            objectUnderTest.consumeSubscriptionEvent(event)
+        then: 'event details are logged at level INFO'
+            def loggingEvent = logger.list.last()
             assert loggingEvent.level == Level.INFO
-        and: 'the log indicates the task completed successfully'
-            assert loggingEvent.formattedMessage == 'Subscription create request for source some-resource with subscription id test-id ...'
-        and: 'the subscription handler service is called once'
-            1 * mockCmSubscriptionHandler.processSubscriptionCreateRequest('test-id',_)
+            assert loggingEvent.formattedMessage.contains('jobId=my job id')
+            assert loggingEvent.formattedMessage.contains('eventType=my event type')
+            assert loggingEvent.formattedMessage.contains("dataType=${dataTypeId}")
+            assert loggingEvent.formattedMessage.contains('fdns=[/SubNetwork=SN1]')
+        where: 'the following data type ids are used'
+            scenario  | dataTypeId
+            'with'    | 'my data type'
+            'without' | 'null'
     }
-
-    def 'Consume valid CmNotificationSubscriptionNcmpInEvent delete message.'() {
-        given: 'a cmNotificationSubscription event'
-            def jsonData = TestUtils.getResourceFileContent('cmSubscription/cmNotificationSubscriptionNcmpInEvent.json')
-            def testEventSent = jsonObjectMapper.convertJsonString(jsonData, NcmpInEvent)
-            def testCloudEventSent = CloudEventBuilder.v1()
-                .withData(objectMapper.writeValueAsBytes(testEventSent))
-                .withId('sub-id')
-                .withType('subscriptionDeleteRequest')
-                .withSource(URI.create('some-resource'))
-                .withExtension('correlationid', 'test-cmhandle1').build()
-            def consumerRecord = new ConsumerRecord<String, CloudEvent>('topic-name', 0, 0, 'event-key', testCloudEventSent)
-        when: 'the valid event is consumed'
-            objectUnderTest.consumeSubscriptionEvent(consumerRecord)
-        then: 'an event is logged with level INFO'
-            def loggingEvent = getLoggingEvent()
-            assert loggingEvent.level == Level.INFO
-        and: 'the log indicates the task completed successfully'
-            assert loggingEvent.formattedMessage == 'Subscription delete request for source some-resource with subscription id test-id ...'
-        and: 'the subscription handler service is called once'
-            1 * mockCmSubscriptionHandler.processSubscriptionDeleteRequest('test-id')
-    }
-
-    def 'Attempt to consume unsupported Event.'() {
-        given: 'a unsupported event with a valid supported type'
-            def unsupportedEvent = Mock(CloudEvent)
-            def cloudEventWithUnsupportedEvent = CloudEventBuilder.v1()
-                .withId('some id')
-                .withType('subscriptionCreateRequest') // this is valid but does not match the event object
-                .withSource(URI.create('some-resource'))
-                .withData(objectMapper.writeValueAsBytes(unsupportedEvent)).build()
-            def consumerRecord = new ConsumerRecord<String, CloudEvent>('some topic', 0, 0, 'some key', cloudEventWithUnsupportedEvent)
-        when: 'attempt to consume the unsupported event'
-            objectUnderTest.consumeSubscriptionEvent(consumerRecord)
-        then: 'the subscription handler service is not called at all'
-            0 * mockCmSubscriptionHandler.processSubscriptionDeleteRequest(*_)
-            0 * mockCmSubscriptionHandler.processSubscriptionCreateRequest(*_)
-    }
-
-    def getLoggingEvent() {
-        return logger.list[1]
-    }
-
 }
diff --git a/cps-ncmp-service/src/test/resources/sample_dataJobSubscriptionInEvent.json b/cps-ncmp-service/src/test/resources/sample_dataJobSubscriptionInEvent.json
new file mode 100644 (file)
index 0000000..f81e7e1
--- /dev/null
@@ -0,0 +1,22 @@
+{
+  "eventType": "my event type",
+  "event": {
+    "dataJob": {
+      "id": "my job id",
+      "productionJobDefinition": {
+        "targetSelector": {
+          "dataNodeSelector": "/SubNetwork[id=\"SN1\"]"
+        },
+        "dataSelector": {
+          "notificationTypes": [
+            ""
+          ],
+          "notificationFilter": ""
+        }
+      }
+    },
+    "dataType": {
+      "dataTypeId": "#dataTypeId"
+    }
+  }
+}
\ No newline at end of file