Consume 'ACCEPTED' create response event from DMI 60/142060/6
authoremaclee <lee.anjella.macabuhay@est.tech>
Wed, 17 Sep 2025 14:09:08 +0000 (15:09 +0100)
committeremaclee <lee.anjella.macabuhay@est.tech>
Tue, 23 Sep 2025 08:54:18 +0000 (09:54 +0100)
- added agreed schema for dmi out event response
- added method to consume event from DMI for create response
- added method to update subscription status where status is
  ACCEPTED

Issue-ID: CPS-2982
Change-Id: Idbb3046a2fa1aad9d03f06cbbc17d163696c3b7a
Signed-off-by: emaclee <lee.anjella.macabuhay@est.tech>
13 files changed:
cps-application/src/main/resources/application.yml
cps-application/src/test/resources/application.yml
cps-ncmp-events/src/main/resources/schemas/ncmp/datajobs.subscription/data-job-subscription-dmi-out-event-schema-1.0.0.json [new file with mode: 0644]
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/subscription/dmi/EventConsumer.java [new file with mode: 0644]
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/subscription/ncmp/CmSubscriptionHandler.java
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/subscription/ncmp/CmSubscriptionHandlerImpl.java
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/subscription/utils/CmDataJobSubscriptionPersistenceService.java
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/subscription/dmi/EventConsumerSpec.groovy [new file with mode: 0644]
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/subscription/ncmp/CmSubscriptionHandlerImplSpec.groovy
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/subscription/ncmp/NcmpInEventConsumerSpec.groovy
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/subscription/utils/CmSubscriptionPersistenceServiceSpec.groovy
cps-ncmp-service/src/test/resources/datajobs/subscription/cmNotificationSubscriptionDmiOutEvent.json
integration-test/src/test/resources/application.yml

index 245c051..d735a1c 100644 (file)
@@ -104,7 +104,7 @@ app:
         avc:
             cm-subscription-ncmp-in: ${CM_SUBSCRIPTION_NCMP_IN_TOPIC:subscription}
             cm-subscription-dmi-in: ${CM_SUBSCRIPTION_DMI_IN_TOPIC:ncmp-dmi-cm-avc-subscription}
-            cm-subscription-ncmp-out: ${CM_SUBSCRIPTION_NCMP_OUT_TOPIC:subscription-response}
+            cm-subscription-dmi-out: ${CM_SUBSCRIPTION_DMI_OUT_TOPIC:dmi-ncmp-cm-avc-subscription}
             cm-events-topic: ${NCMP_CM_EVENTS_TOPIC:cm-events}
             inventory-events-topic: ncmp-inventory-events
     lcm:
index a24e65a..b08a375 100644 (file)
@@ -101,7 +101,7 @@ app:
         avc:
             cm-subscription-ncmp-in: ${CM_SUBSCRIPTION_NCMP_IN_TOPIC:subscription}
             cm-subscription-dmi-in: ${CM_SUBSCRIPTION_DMI_IN_TOPIC:ncmp-dmi-cm-avc-subscription}
-            cm-subscription-ncmp-out: ${CM_SUBSCRIPTION_NCMP_OUT_TOPIC:subscription-response}
+            cm-subscription-dmi-out: ${CM_SUBSCRIPTION_DMI_OUT_TOPIC:dmi-ncmp-cm-avc-subscription}
             cm-events-topic: ${NCMP_CM_EVENTS_TOPIC:cm-events}
             inventory-events-topic: ncmp-inventory-events
     lcm:
diff --git a/cps-ncmp-events/src/main/resources/schemas/ncmp/datajobs.subscription/data-job-subscription-dmi-out-event-schema-1.0.0.json b/cps-ncmp-events/src/main/resources/schemas/ncmp/datajobs.subscription/data-job-subscription-dmi-out-event-schema-1.0.0.json
new file mode 100644 (file)
index 0000000..5dcbdbe
--- /dev/null
@@ -0,0 +1,44 @@
+
+{
+  "$schema": "https://json-schema.org/draft/2019-09/schema",
+  "$id": "urn:cps:org.onap.ncmp.events.subscription:1.0.0",
+  "$ref": "#/definitions/DmiOutEvent",
+  "definitions": {
+    "DmiOutEvent": {
+      "description": "The payload for cm notification subscription event coming out from DMI Plugin.",
+      "type": "object",
+      "additionalProperties": false,
+      "javaType": "org.onap.cps.ncmp.impl.datajobs.subscription.dmi_to_ncmp.DataJobSubscriptionDmiOutEvent",
+      "properties": {
+        "data": {
+          "$ref": "#/definitions/Data"
+        }
+      },
+      "required": [
+        "data"
+      ],
+      "title": "DmiOutEvent"
+    },
+    "Data": {
+      "type": "object",
+      "description": "Information about the targets and subscription",
+      "additionalProperties": false,
+      "properties": {
+        "statusCode": {
+          "type": "string",
+          "format": "integer",
+          "description": "The common status as defined in CPS"
+        },
+        "statusMessage": {
+          "type": "string",
+          "description": "The common status message as defined in CPS"
+        }
+      },
+      "required": [
+        "statusCode",
+        "statusMessage"
+      ],
+      "title": "Data"
+    }
+  }
+}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/subscription/dmi/EventConsumer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/subscription/dmi/EventConsumer.java
new file mode 100644 (file)
index 0000000..141c74a
--- /dev/null
@@ -0,0 +1,89 @@
+
+/*
+ * ============LICENSE_START=======================================================
+ *  Copyright (C) 2024-2025 OpenInfra Foundation Europe. All rights reserved.
+ *  ================================================================================
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ *  SPDX-License-Identifier: Apache-2.0
+ *  ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.impl.datajobs.subscription.dmi;
+
+import static org.onap.cps.ncmp.api.NcmpResponseStatus.CM_DATA_SUBSCRIPTION_ACCEPTED;
+import static org.onap.cps.ncmp.impl.datajobs.subscription.models.CmSubscriptionStatus.ACCEPTED;
+import static org.onap.cps.ncmp.utils.events.CloudEventMapper.toTargetEvent;
+
+import io.cloudevents.CloudEvent;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.onap.cps.ncmp.impl.datajobs.subscription.dmi_to_ncmp.DataJobSubscriptionDmiOutEvent;
+import org.onap.cps.ncmp.impl.datajobs.subscription.models.CmSubscriptionStatus;
+import org.onap.cps.ncmp.impl.datajobs.subscription.ncmp.CmSubscriptionHandler;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.stereotype.Component;
+
+@Component
+@Slf4j
+@RequiredArgsConstructor
+@ConditionalOnProperty(name = "notification.enabled", havingValue = "true", matchIfMissing = true)
+public class EventConsumer {
+
+    private final CmSubscriptionHandler cmSubscriptionHandler;
+    private static final String CORRELATION_ID_SEPARATOR = "#";
+
+    /**
+     * Consume the Cm Notification Subscription response event from the dmi-plugin.
+     *
+     * @param dmiOutEventAsConsumerRecord the event to be consumed
+     */
+    @KafkaListener(topics = "${app.ncmp.avc.cm-subscription-dmi-out}",
+            containerFactory = "cloudEventConcurrentKafkaListenerContainerFactory")
+    public void consumeDmiOutEvent(final ConsumerRecord<String, CloudEvent> dmiOutEventAsConsumerRecord) {
+        final CloudEvent cloudEvent = dmiOutEventAsConsumerRecord.value();
+        final DataJobSubscriptionDmiOutEvent dmiOutEvent =
+                toTargetEvent(cloudEvent, DataJobSubscriptionDmiOutEvent.class);
+        final String correlationId = String.valueOf(cloudEvent.getExtension("correlationid"));
+        final String eventType = cloudEvent.getType();
+
+        log.info("Consumed DMI subscription response event with details: | correlationId={} | eventType={}",
+                correlationId, eventType);
+
+        if (dmiOutEvent != null && correlationId != null) {
+            final String[] parts = correlationId.split(CORRELATION_ID_SEPARATOR);
+            final String subscriptionId = parts[0];
+            final String dmiPluginName = parts[1];
+
+            if ("subscriptionCreateResponse".equals(eventType)) {
+                final CmSubscriptionStatus cmSubscriptionStatus = getCmSubscriptionStatus(dmiOutEvent);
+                if (ACCEPTED.equals(cmSubscriptionStatus)) {
+                    cmSubscriptionHandler.updateCmSubscriptionStatus(
+                            subscriptionId, dmiPluginName, cmSubscriptionStatus);
+                }
+            }
+        }
+    }
+
+    private CmSubscriptionStatus getCmSubscriptionStatus(final DataJobSubscriptionDmiOutEvent dmiOutEvent) {
+        final String statusMessage = dmiOutEvent.getData().getStatusMessage();
+        final String statusCode = dmiOutEvent.getData().getStatusCode();
+        if (statusCode.equals(CM_DATA_SUBSCRIPTION_ACCEPTED.getCode())
+                && statusMessage.equals(CM_DATA_SUBSCRIPTION_ACCEPTED.getMessage())) {
+            return CmSubscriptionStatus.ACCEPTED;
+        }
+        return CmSubscriptionStatus.REJECTED;
+    }
+}
index 47315b3..a23e76b 100644 (file)
@@ -22,6 +22,7 @@ package org.onap.cps.ncmp.impl.datajobs.subscription.ncmp;
 
 import java.util.List;
 import org.onap.cps.ncmp.impl.datajobs.subscription.client_to_ncmp.DataSelector;
+import org.onap.cps.ncmp.impl.datajobs.subscription.models.CmSubscriptionStatus;
 
 public interface CmSubscriptionHandler {
 
@@ -35,4 +36,14 @@ public interface CmSubscriptionHandler {
     void processSubscriptionCreate(final DataSelector dataSelector, final String subscriptionId,
                                    final List<String> dataNodeSelectors);
 
+    /**
+     * Update status of a subscription.
+     *
+     * @param subscriptionId       subscription id
+     * @param dmiServiceName       relevant DMI service name
+     * @param cmSubscriptionStatus cm subscription status
+     */
+    void updateCmSubscriptionStatus(final String subscriptionId, final String dmiServiceName,
+                                    final CmSubscriptionStatus cmSubscriptionStatus);
+
 }
index 9a0be3b..8811c6a 100644 (file)
@@ -31,6 +31,7 @@ import lombok.extern.slf4j.Slf4j;
 import org.onap.cps.ncmp.impl.datajobs.subscription.client_to_ncmp.DataSelector;
 import org.onap.cps.ncmp.impl.datajobs.subscription.dmi.DmiInEventMapper;
 import org.onap.cps.ncmp.impl.datajobs.subscription.dmi.EventProducer;
+import org.onap.cps.ncmp.impl.datajobs.subscription.models.CmSubscriptionStatus;
 import org.onap.cps.ncmp.impl.datajobs.subscription.ncmp_to_dmi.DataJobSubscriptionDmiInEvent;
 import org.onap.cps.ncmp.impl.datajobs.subscription.utils.CmDataJobSubscriptionPersistenceService;
 import org.onap.cps.ncmp.impl.inventory.InventoryPersistence;
@@ -61,6 +62,26 @@ public class CmSubscriptionHandlerImpl implements CmSubscriptionHandler {
         sendCreateEventToDmis(subscriptionId, dataSelector);
     }
 
+    @Override
+    public void updateCmSubscriptionStatus(final String subscriptionId,
+                                           final String dmiServiceName,
+                                           final CmSubscriptionStatus cmSubscriptionStatus) {
+        final List<String> dataNodeSelectors =
+                cmDataJobSubscriptionPersistenceService.getInactiveDataNodeSelectors(subscriptionId);
+        for (final String dataNodeSelector : dataNodeSelectors) {
+            final String cmHandleId = getCmHandleId(dataNodeSelector);
+            if (cmHandleId == null) {
+                log.info("Failed to resolve cm handle ID for dataNodeSelector {}", dataNodeSelector);
+            } else {
+                final String resolvedDmiServiceName = getDmiServiceName(cmHandleId);
+                if (resolvedDmiServiceName.equals(dmiServiceName)) {
+                    cmDataJobSubscriptionPersistenceService.updateCmSubscriptionStatus(dataNodeSelector,
+                            cmSubscriptionStatus);
+                }
+            }
+        }
+    }
+
     private void sendCreateEventToDmis(final String subscriptionId, final DataSelector dataSelector) {
         final List<String> dataNodeSelectors =
                 cmDataJobSubscriptionPersistenceService.getInactiveDataNodeSelectors(subscriptionId);
index 14259d8..cdb1295 100644 (file)
@@ -36,6 +36,7 @@ import lombok.extern.slf4j.Slf4j;
 import org.onap.cps.api.CpsDataService;
 import org.onap.cps.api.CpsQueryService;
 import org.onap.cps.api.model.DataNode;
+import org.onap.cps.ncmp.impl.datajobs.subscription.models.CmSubscriptionStatus;
 import org.onap.cps.utils.ContentType;
 import org.onap.cps.utils.JsonObjectMapper;
 import org.springframework.stereotype.Service;
@@ -133,26 +134,38 @@ public class CmDataJobSubscriptionPersistenceService {
             addNewSubscriptionDetails(subscriptionId, dataNodeSelector);
         } else {
             final Collection<String> subscriptionIds = getSubscriptionIds(dataNodeSelector);
-            final String status = dataNodes.iterator().next().getLeaves().get("status").toString();
+            final String cmSubscriptionStatusName = dataNodes.iterator().next().getLeaves().get("status").toString();
             subscriptionIds.add(subscriptionId);
-            updateSubscriptionDetails(dataNodeSelector, subscriptionIds, status);
+            updateSubscriptionDetails(dataNodeSelector, subscriptionIds, cmSubscriptionStatusName);
         }
     }
 
+    /**
+     * Update status of a subscription.
+     *
+     * @param dataNodeSelector     data node selector
+     * @param cmSubscriptionStatus cm subscription status
+     */
+    public void updateCmSubscriptionStatus(final String dataNodeSelector,
+                                           final CmSubscriptionStatus cmSubscriptionStatus) {
+        final Collection<String> subscriptionIds = getSubscriptionIds(dataNodeSelector);
+        updateSubscriptionDetails(dataNodeSelector, subscriptionIds, cmSubscriptionStatus.name());
+    }
+
     private void addNewSubscriptionDetails(final String subscriptionId,
                                            final String dataNodeSelector) {
         final Collection<String> newSubscriptionList = Collections.singletonList(subscriptionId);
-        final String status = UNKNOWN.name();
+        final String cmSubscriptionStatus = UNKNOWN.name();
         final String subscriptionDetailsAsJson = createSubscriptionDetailsAsJson(dataNodeSelector,
-                newSubscriptionList, status);
+                newSubscriptionList, cmSubscriptionStatus);
         cpsDataService.saveData(DATASPACE, ANCHOR, subscriptionDetailsAsJson,
             OffsetDateTime.now(), ContentType.JSON);
     }
 
     private void updateSubscriptionDetails(final String dataNodeSelector, final Collection<String> subscriptionIds,
-                                           final String status) {
+                                           final String cmSubscriptionStatusName) {
         final String subscriptionDetailsAsJson = createSubscriptionDetailsAsJson(dataNodeSelector,
-                subscriptionIds, status);
+                subscriptionIds, cmSubscriptionStatusName);
         cpsDataService.updateNodeLeaves(DATASPACE, ANCHOR,
                 PARENT_NODE_XPATH, subscriptionDetailsAsJson, OffsetDateTime.now(),
             ContentType.JSON);
@@ -160,11 +173,11 @@ public class CmDataJobSubscriptionPersistenceService {
 
     private String createSubscriptionDetailsAsJson(final String dataNodeSelector,
                                                    final Collection<String> subscriptionIds,
-                                                   final String status) {
+                                                   final String cmSubscriptionStatusName) {
         final Map<String, Serializable> subscriptionDetailsAsMap =
             Map.of("dataNodeSelector", dataNodeSelector,
                 "dataJobId", (Serializable) subscriptionIds,
-                "status", status);
+                "status", cmSubscriptionStatusName);
         return "{\"subscription\":[" + jsonObjectMapper.asJsonString(subscriptionDetailsAsMap) + "]}";
     }
 }
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/subscription/dmi/EventConsumerSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/subscription/dmi/EventConsumerSpec.groovy
new file mode 100644 (file)
index 0000000..af0d322
--- /dev/null
@@ -0,0 +1,84 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (c) 2024-2025 OpenInfra Foundation Europe. All rights reserved.
+ *  ================================================================================
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an 'AS IS' BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ *  SPDX-License-Identifier: Apache-2.0
+ *  ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.impl.datajobs.subscription.dmi
+
+import static org.onap.cps.ncmp.impl.datajobs.subscription.models.CmSubscriptionStatus.ACCEPTED
+
+import ch.qos.logback.classic.Level
+import ch.qos.logback.classic.Logger
+import ch.qos.logback.classic.spi.ILoggingEvent
+import ch.qos.logback.core.read.ListAppender
+import com.fasterxml.jackson.databind.ObjectMapper
+import io.cloudevents.CloudEvent
+import io.cloudevents.core.builder.CloudEventBuilder
+import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.onap.cps.ncmp.impl.datajobs.subscription.dmi_to_ncmp.DataJobSubscriptionDmiOutEvent
+import org.onap.cps.ncmp.impl.datajobs.subscription.ncmp.CmSubscriptionHandler
+import org.onap.cps.ncmp.utils.TestUtils
+import org.onap.cps.utils.JsonObjectMapper
+import org.slf4j.LoggerFactory
+import spock.lang.Specification
+
+class EventConsumerSpec extends Specification {
+
+    def logger = new ListAppender<ILoggingEvent>()
+    def objectMapper = new ObjectMapper()
+    def jsonObjectMapper = new JsonObjectMapper(objectMapper)
+
+    def mockCmSubscriptionHandler = Mock(CmSubscriptionHandler)
+    def objectUnderTest = new EventConsumer(mockCmSubscriptionHandler)
+
+    void setup() {
+        ((Logger) LoggerFactory.getLogger(EventConsumer.class)).addAppender(logger)
+        logger.start()
+    }
+
+    void cleanup() {
+        ((Logger) LoggerFactory.getLogger(EventConsumer.class)).detachAndStopAllAppenders()
+    }
+
+    def 'Consume subscription CREATE response with status ACCEPTED from DMI Plugin'() {
+        given: 'a response event from DMI'
+            def jsonData = TestUtils.getResourceFileContent(
+                'datajobs/subscription/cmNotificationSubscriptionDmiOutEvent.json')
+            def testEventSent = jsonObjectMapper.convertJsonString(jsonData, DataJobSubscriptionDmiOutEvent.class)
+            def testCloudEventSent = CloudEventBuilder.v1()
+                .withData(objectMapper.writeValueAsBytes(testEventSent))
+                .withId('random-uuid')
+                .withType('subscriptionCreateResponse')
+                .withSource(URI.create('myDmi'))
+                .withExtension('correlationid', 'sub-1#myDmi').build()
+            def consumerRecord = new ConsumerRecord<String, CloudEvent>('topic-name', 0, 0, 'event-key', testCloudEventSent)
+        when: 'the event is consumed'
+            objectUnderTest.consumeDmiOutEvent(consumerRecord)
+        then: 'an event is logged with level INFO'
+            def loggingEvent = getLoggingEvent()
+            assert loggingEvent.level == Level.INFO
+        and: 'the log indicates the task completed successfully'
+            assert loggingEvent.formattedMessage == 'Consumed DMI subscription response event with details: | correlationId=sub-1#myDmi | eventType=subscriptionCreateResponse'
+        and:  'the subscription handler is called to update status of subscription with correct details'
+            1 * mockCmSubscriptionHandler.updateCmSubscriptionStatus('sub-1', 'myDmi', ACCEPTED)
+    }
+
+    def getLoggingEvent() {
+        return logger.list[0]
+    }
+}
index 79becb3..8980d65 100644 (file)
@@ -20,6 +20,7 @@
 
 package org.onap.cps.ncmp.impl.datajobs.subscription.ncmp
 
+import static org.onap.cps.ncmp.impl.datajobs.subscription.models.CmSubscriptionStatus.ACCEPTED
 
 import org.onap.cps.ncmp.impl.datajobs.subscription.client_to_ncmp.DataSelector
 import org.onap.cps.ncmp.impl.datajobs.subscription.ncmp_to_dmi.DataJobSubscriptionDmiInEvent
@@ -133,6 +134,27 @@ class CmSubscriptionHandlerImplSpec extends Specification {
             'new target does not overlap with existing targets'| ['/newDataNodeSelector[id=""]']                                   || 1
     }
 
+    def 'Update subscription status to ACCEPTED: #scenario'() {
+        given: 'a subscription ID'
+            def mySubscriptionId = 'mySubId'
+        and: 'the persistence service returns all inactive data node selectors'
+            def myDataNodeSelectors = ['/myDataNodeSelector[id=""]'].asList()
+            mockCmSubscriptionPersistenceService.getInactiveDataNodeSelectors(mySubscriptionId) >> myDataNodeSelectors
+        and: 'alternate id matcher always returns a cm handle id'
+            mockAlternateIdMatcher.getCmHandleId(_) >> 'someCmHandleId'
+        and: 'the inventory persistence service returns a yang model with a dmi service name for the accepted subscription'
+            mockInventoryPersistence.getYangModelCmHandle(_) >>  new YangModelCmHandle(dmiServiceName: 'myDmi')
+        when: 'the method to update subscription status is called with status=ACCEPTED and dmi #dmiName'
+            objectUnderTest.updateCmSubscriptionStatus(mySubscriptionId, dmiName, ACCEPTED)
+        then: 'the persistence service to update subscription status is ONLY called for matching dmi name'
+            expectedCallsToPersistenceService * mockCmSubscriptionPersistenceService.updateCmSubscriptionStatus('/myDataNodeSelector[id=""]', ACCEPTED)
+        where: 'the following data are used'
+            scenario                          |dmiName        || expectedCallsToPersistenceService
+            'data node selector for "myDmi"'  |'myDmi'        || 1
+            'data node selector for other dmi'| 'someOtherDmi'|| 0
+    }
+
+
     def getFdn(dataNodeSelector) {
         return JexParser.extractFdnPrefix(dataNodeSelector).orElse("")
     }
index ad201c6..6230fb5 100644 (file)
@@ -28,24 +28,16 @@ import com.fasterxml.jackson.databind.ObjectMapper
 import org.onap.cps.ncmp.impl.datajobs.subscription.client_to_ncmp.DataJobSubscriptionOperationInEvent
 import org.onap.cps.ncmp.impl.utils.JexParser
 import org.onap.cps.ncmp.utils.TestUtils
-import org.onap.cps.utils.JsonObjectMapper
 import org.slf4j.LoggerFactory
-import org.springframework.beans.factory.annotation.Autowired
-import org.springframework.boot.test.context.SpringBootTest
 import spock.lang.Specification
 
-@SpringBootTest(classes = [ObjectMapper, JsonObjectMapper])
 class NcmpInEventConsumerSpec extends Specification {
 
-    def mockCmSubscriptionHandler = Mock(CmSubscriptionHandlerImpl)
-    def objectUnderTest = new NcmpInEventConsumer(mockCmSubscriptionHandler)
     def logger = new ListAppender<ILoggingEvent>()
+    def objectMapper = new ObjectMapper()
 
-    @Autowired
-    JsonObjectMapper jsonObjectMapper
-
-    @Autowired
-    ObjectMapper objectMapper
+    def mockCmSubscriptionHandler = Mock(CmSubscriptionHandlerImpl)
+    def objectUnderTest = new NcmpInEventConsumer(mockCmSubscriptionHandler)
 
     void setup() {
         ((Logger) LoggerFactory.getLogger(NcmpInEventConsumer.class)).addAppender(logger)
index ed2fa33..a105d1d 100644 (file)
 
 package org.onap.cps.ncmp.impl.datajobs.subscription.utils
 
-
 import static CmDataJobSubscriptionPersistenceService.CPS_PATH_TEMPLATE_FOR_SUBSCRIPTIONS_WITH_DATA_NODE_SELECTOR
 import static CmDataJobSubscriptionPersistenceService.CPS_PATH_TEMPLATE_FOR_SUBSCRIPTION_WITH_DATA_JOB_ID
 import static CmDataJobSubscriptionPersistenceService.CPS_PATH_TEMPLATE_FOR_INACTIVE_SUBSCRIPTIONS
 import static CmDataJobSubscriptionPersistenceService.PARENT_NODE_XPATH
 import static org.onap.cps.api.parameters.FetchDescendantsOption.OMIT_DESCENDANTS
+import static org.onap.cps.ncmp.impl.datajobs.subscription.models.CmSubscriptionStatus.ACCEPTED
 
 import com.fasterxml.jackson.databind.ObjectMapper
 import org.onap.cps.api.CpsDataService
@@ -119,4 +119,21 @@ class CmSubscriptionPersistenceServiceSpec extends Specification {
             1 * mockCpsDataService.updateNodeLeaves('NCMP-Admin', 'cm-data-job-subscriptions', PARENT_NODE_XPATH, subscriptionDetailsAsJson, _, ContentType.JSON)
     }
 
+    def 'Update subscription status'() {
+        given: 'a data node selector'
+            def myDataNodeSelector = "/myDataNodeSelector"
+        and: 'a status'
+            def status = ACCEPTED
+        and: 'the query service returns data node'
+            def subscriptionIds = ['someId']
+            mockCpsQueryService.queryDataNodes(_,_,_,_) >> [new DataNode(leaves: ['dataJobId': subscriptionIds, 'dataNodeSelector': myDataNodeSelector, 'status': 'UNKNOWN'])]
+        and: 'updated cm data job subscription details as json'
+            def subscriptionDetailsAsJson = objectUnderTest.createSubscriptionDetailsAsJson(myDataNodeSelector, subscriptionIds, status.name())
+        when: 'the method to update subscription status is called'
+            objectUnderTest.updateCmSubscriptionStatus(myDataNodeSelector, status)
+        then: 'data service method to update list of subscribers is called once'
+            1 * mockCpsDataService.updateNodeLeaves('NCMP-Admin', 'cm-data-job-subscriptions', PARENT_NODE_XPATH, subscriptionDetailsAsJson, _, _)
+
+    }
+
 }
index 2593c49..cb60de8 100644 (file)
@@ -100,7 +100,7 @@ app:
     avc:
       cm-subscription-ncmp-in: ${CM_SUBSCRIPTION_NCMP_IN_TOPIC:subscription}
       cm-subscription-dmi-in: ${CM_SUBSCRIPTION_DMI_IN_TOPIC:ncmp-dmi-cm-avc-subscription}
-      cm-subscription-ncmp-out: ${CM_SUBSCRIPTION_NCMP_OUT_TOPIC:subscription-response}
+      cm-subscription-dmi-out: ${CM_SUBSCRIPTION_DMI_OUT_TOPIC:dmi-ncmp-cm-avc-subscription}
       cm-events-topic: ${NCMP_CM_EVENTS_TOPIC:cm-events}
       inventory-events-topic: ncmp-inventory-events
   lcm: