avc:
cm-subscription-ncmp-in: ${CM_SUBSCRIPTION_NCMP_IN_TOPIC:subscription}
cm-subscription-dmi-in: ${CM_SUBSCRIPTION_DMI_IN_TOPIC:ncmp-dmi-cm-avc-subscription}
- cm-subscription-ncmp-out: ${CM_SUBSCRIPTION_NCMP_OUT_TOPIC:subscription-response}
+ cm-subscription-dmi-out: ${CM_SUBSCRIPTION_DMI_OUT_TOPIC:dmi-ncmp-cm-avc-subscription}
cm-events-topic: ${NCMP_CM_EVENTS_TOPIC:cm-events}
inventory-events-topic: ncmp-inventory-events
lcm:
avc:
cm-subscription-ncmp-in: ${CM_SUBSCRIPTION_NCMP_IN_TOPIC:subscription}
cm-subscription-dmi-in: ${CM_SUBSCRIPTION_DMI_IN_TOPIC:ncmp-dmi-cm-avc-subscription}
- cm-subscription-ncmp-out: ${CM_SUBSCRIPTION_NCMP_OUT_TOPIC:subscription-response}
+ cm-subscription-dmi-out: ${CM_SUBSCRIPTION_DMI_OUT_TOPIC:dmi-ncmp-cm-avc-subscription}
cm-events-topic: ${NCMP_CM_EVENTS_TOPIC:cm-events}
inventory-events-topic: ncmp-inventory-events
lcm:
--- /dev/null
+
+{
+ "$schema": "https://json-schema.org/draft/2019-09/schema",
+ "$id": "urn:cps:org.onap.ncmp.events.subscription:1.0.0",
+ "$ref": "#/definitions/DmiOutEvent",
+ "definitions": {
+ "DmiOutEvent": {
+ "description": "The payload for cm notification subscription event coming out from DMI Plugin.",
+ "type": "object",
+ "additionalProperties": false,
+ "javaType": "org.onap.cps.ncmp.impl.datajobs.subscription.dmi_to_ncmp.DataJobSubscriptionDmiOutEvent",
+ "properties": {
+ "data": {
+ "$ref": "#/definitions/Data"
+ }
+ },
+ "required": [
+ "data"
+ ],
+ "title": "DmiOutEvent"
+ },
+ "Data": {
+ "type": "object",
+ "description": "Information about the targets and subscription",
+ "additionalProperties": false,
+ "properties": {
+ "statusCode": {
+ "type": "string",
+ "format": "integer",
+ "description": "The common status as defined in CPS"
+ },
+ "statusMessage": {
+ "type": "string",
+ "description": "The common status message as defined in CPS"
+ }
+ },
+ "required": [
+ "statusCode",
+ "statusMessage"
+ ],
+ "title": "Data"
+ }
+ }
+}
--- /dev/null
+
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2024-2025 OpenInfra Foundation Europe. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.impl.datajobs.subscription.dmi;
+
+import static org.onap.cps.ncmp.api.NcmpResponseStatus.CM_DATA_SUBSCRIPTION_ACCEPTED;
+import static org.onap.cps.ncmp.impl.datajobs.subscription.models.CmSubscriptionStatus.ACCEPTED;
+import static org.onap.cps.ncmp.utils.events.CloudEventMapper.toTargetEvent;
+
+import io.cloudevents.CloudEvent;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.onap.cps.ncmp.impl.datajobs.subscription.dmi_to_ncmp.DataJobSubscriptionDmiOutEvent;
+import org.onap.cps.ncmp.impl.datajobs.subscription.models.CmSubscriptionStatus;
+import org.onap.cps.ncmp.impl.datajobs.subscription.ncmp.CmSubscriptionHandler;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.stereotype.Component;
+
+@Component
+@Slf4j
+@RequiredArgsConstructor
+@ConditionalOnProperty(name = "notification.enabled", havingValue = "true", matchIfMissing = true)
+public class EventConsumer {
+
+ private final CmSubscriptionHandler cmSubscriptionHandler;
+ private static final String CORRELATION_ID_SEPARATOR = "#";
+
+ /**
+ * Consume the Cm Notification Subscription response event from the dmi-plugin.
+ *
+ * @param dmiOutEventAsConsumerRecord the event to be consumed
+ */
+ @KafkaListener(topics = "${app.ncmp.avc.cm-subscription-dmi-out}",
+ containerFactory = "cloudEventConcurrentKafkaListenerContainerFactory")
+ public void consumeDmiOutEvent(final ConsumerRecord<String, CloudEvent> dmiOutEventAsConsumerRecord) {
+ final CloudEvent cloudEvent = dmiOutEventAsConsumerRecord.value();
+ final DataJobSubscriptionDmiOutEvent dmiOutEvent =
+ toTargetEvent(cloudEvent, DataJobSubscriptionDmiOutEvent.class);
+ final String correlationId = String.valueOf(cloudEvent.getExtension("correlationid"));
+ final String eventType = cloudEvent.getType();
+
+ log.info("Consumed DMI subscription response event with details: | correlationId={} | eventType={}",
+ correlationId, eventType);
+
+ if (dmiOutEvent != null && correlationId != null) {
+ final String[] parts = correlationId.split(CORRELATION_ID_SEPARATOR);
+ final String subscriptionId = parts[0];
+ final String dmiPluginName = parts[1];
+
+ if ("subscriptionCreateResponse".equals(eventType)) {
+ final CmSubscriptionStatus cmSubscriptionStatus = getCmSubscriptionStatus(dmiOutEvent);
+ if (ACCEPTED.equals(cmSubscriptionStatus)) {
+ cmSubscriptionHandler.updateCmSubscriptionStatus(
+ subscriptionId, dmiPluginName, cmSubscriptionStatus);
+ }
+ }
+ }
+ }
+
+ private CmSubscriptionStatus getCmSubscriptionStatus(final DataJobSubscriptionDmiOutEvent dmiOutEvent) {
+ final String statusMessage = dmiOutEvent.getData().getStatusMessage();
+ final String statusCode = dmiOutEvent.getData().getStatusCode();
+ if (statusCode.equals(CM_DATA_SUBSCRIPTION_ACCEPTED.getCode())
+ && statusMessage.equals(CM_DATA_SUBSCRIPTION_ACCEPTED.getMessage())) {
+ return CmSubscriptionStatus.ACCEPTED;
+ }
+ return CmSubscriptionStatus.REJECTED;
+ }
+}
import java.util.List;
import org.onap.cps.ncmp.impl.datajobs.subscription.client_to_ncmp.DataSelector;
+import org.onap.cps.ncmp.impl.datajobs.subscription.models.CmSubscriptionStatus;
public interface CmSubscriptionHandler {
void processSubscriptionCreate(final DataSelector dataSelector, final String subscriptionId,
final List<String> dataNodeSelectors);
+ /**
+ * Update status of a subscription.
+ *
+ * @param subscriptionId subscription id
+ * @param dmiServiceName relevant DMI service name
+ * @param cmSubscriptionStatus cm subscription status
+ */
+ void updateCmSubscriptionStatus(final String subscriptionId, final String dmiServiceName,
+ final CmSubscriptionStatus cmSubscriptionStatus);
+
}
import org.onap.cps.ncmp.impl.datajobs.subscription.client_to_ncmp.DataSelector;
import org.onap.cps.ncmp.impl.datajobs.subscription.dmi.DmiInEventMapper;
import org.onap.cps.ncmp.impl.datajobs.subscription.dmi.EventProducer;
+import org.onap.cps.ncmp.impl.datajobs.subscription.models.CmSubscriptionStatus;
import org.onap.cps.ncmp.impl.datajobs.subscription.ncmp_to_dmi.DataJobSubscriptionDmiInEvent;
import org.onap.cps.ncmp.impl.datajobs.subscription.utils.CmDataJobSubscriptionPersistenceService;
import org.onap.cps.ncmp.impl.inventory.InventoryPersistence;
sendCreateEventToDmis(subscriptionId, dataSelector);
}
+ @Override
+ public void updateCmSubscriptionStatus(final String subscriptionId,
+ final String dmiServiceName,
+ final CmSubscriptionStatus cmSubscriptionStatus) {
+ final List<String> dataNodeSelectors =
+ cmDataJobSubscriptionPersistenceService.getInactiveDataNodeSelectors(subscriptionId);
+ for (final String dataNodeSelector : dataNodeSelectors) {
+ final String cmHandleId = getCmHandleId(dataNodeSelector);
+ if (cmHandleId == null) {
+ log.info("Failed to resolve cm handle ID for dataNodeSelector {}", dataNodeSelector);
+ } else {
+ final String resolvedDmiServiceName = getDmiServiceName(cmHandleId);
+ if (resolvedDmiServiceName.equals(dmiServiceName)) {
+ cmDataJobSubscriptionPersistenceService.updateCmSubscriptionStatus(dataNodeSelector,
+ cmSubscriptionStatus);
+ }
+ }
+ }
+ }
+
private void sendCreateEventToDmis(final String subscriptionId, final DataSelector dataSelector) {
final List<String> dataNodeSelectors =
cmDataJobSubscriptionPersistenceService.getInactiveDataNodeSelectors(subscriptionId);
import org.onap.cps.api.CpsDataService;
import org.onap.cps.api.CpsQueryService;
import org.onap.cps.api.model.DataNode;
+import org.onap.cps.ncmp.impl.datajobs.subscription.models.CmSubscriptionStatus;
import org.onap.cps.utils.ContentType;
import org.onap.cps.utils.JsonObjectMapper;
import org.springframework.stereotype.Service;
addNewSubscriptionDetails(subscriptionId, dataNodeSelector);
} else {
final Collection<String> subscriptionIds = getSubscriptionIds(dataNodeSelector);
- final String status = dataNodes.iterator().next().getLeaves().get("status").toString();
+ final String cmSubscriptionStatusName = dataNodes.iterator().next().getLeaves().get("status").toString();
subscriptionIds.add(subscriptionId);
- updateSubscriptionDetails(dataNodeSelector, subscriptionIds, status);
+ updateSubscriptionDetails(dataNodeSelector, subscriptionIds, cmSubscriptionStatusName);
}
}
+ /**
+ * Update status of a subscription.
+ *
+ * @param dataNodeSelector data node selector
+ * @param cmSubscriptionStatus cm subscription status
+ */
+ public void updateCmSubscriptionStatus(final String dataNodeSelector,
+ final CmSubscriptionStatus cmSubscriptionStatus) {
+ final Collection<String> subscriptionIds = getSubscriptionIds(dataNodeSelector);
+ updateSubscriptionDetails(dataNodeSelector, subscriptionIds, cmSubscriptionStatus.name());
+ }
+
private void addNewSubscriptionDetails(final String subscriptionId,
final String dataNodeSelector) {
final Collection<String> newSubscriptionList = Collections.singletonList(subscriptionId);
- final String status = UNKNOWN.name();
+ final String cmSubscriptionStatus = UNKNOWN.name();
final String subscriptionDetailsAsJson = createSubscriptionDetailsAsJson(dataNodeSelector,
- newSubscriptionList, status);
+ newSubscriptionList, cmSubscriptionStatus);
cpsDataService.saveData(DATASPACE, ANCHOR, subscriptionDetailsAsJson,
OffsetDateTime.now(), ContentType.JSON);
}
private void updateSubscriptionDetails(final String dataNodeSelector, final Collection<String> subscriptionIds,
- final String status) {
+ final String cmSubscriptionStatusName) {
final String subscriptionDetailsAsJson = createSubscriptionDetailsAsJson(dataNodeSelector,
- subscriptionIds, status);
+ subscriptionIds, cmSubscriptionStatusName);
cpsDataService.updateNodeLeaves(DATASPACE, ANCHOR,
PARENT_NODE_XPATH, subscriptionDetailsAsJson, OffsetDateTime.now(),
ContentType.JSON);
private String createSubscriptionDetailsAsJson(final String dataNodeSelector,
final Collection<String> subscriptionIds,
- final String status) {
+ final String cmSubscriptionStatusName) {
final Map<String, Serializable> subscriptionDetailsAsMap =
Map.of("dataNodeSelector", dataNodeSelector,
"dataJobId", (Serializable) subscriptionIds,
- "status", status);
+ "status", cmSubscriptionStatusName);
return "{\"subscription\":[" + jsonObjectMapper.asJsonString(subscriptionDetailsAsMap) + "]}";
}
}
--- /dev/null
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (c) 2024-2025 OpenInfra Foundation Europe. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an 'AS IS' BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.impl.datajobs.subscription.dmi
+
+import static org.onap.cps.ncmp.impl.datajobs.subscription.models.CmSubscriptionStatus.ACCEPTED
+
+import ch.qos.logback.classic.Level
+import ch.qos.logback.classic.Logger
+import ch.qos.logback.classic.spi.ILoggingEvent
+import ch.qos.logback.core.read.ListAppender
+import com.fasterxml.jackson.databind.ObjectMapper
+import io.cloudevents.CloudEvent
+import io.cloudevents.core.builder.CloudEventBuilder
+import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.onap.cps.ncmp.impl.datajobs.subscription.dmi_to_ncmp.DataJobSubscriptionDmiOutEvent
+import org.onap.cps.ncmp.impl.datajobs.subscription.ncmp.CmSubscriptionHandler
+import org.onap.cps.ncmp.utils.TestUtils
+import org.onap.cps.utils.JsonObjectMapper
+import org.slf4j.LoggerFactory
+import spock.lang.Specification
+
+class EventConsumerSpec extends Specification {
+
+ def logger = new ListAppender<ILoggingEvent>()
+ def objectMapper = new ObjectMapper()
+ def jsonObjectMapper = new JsonObjectMapper(objectMapper)
+
+ def mockCmSubscriptionHandler = Mock(CmSubscriptionHandler)
+ def objectUnderTest = new EventConsumer(mockCmSubscriptionHandler)
+
+ void setup() {
+ ((Logger) LoggerFactory.getLogger(EventConsumer.class)).addAppender(logger)
+ logger.start()
+ }
+
+ void cleanup() {
+ ((Logger) LoggerFactory.getLogger(EventConsumer.class)).detachAndStopAllAppenders()
+ }
+
+ def 'Consume subscription CREATE response with status ACCEPTED from DMI Plugin'() {
+ given: 'a response event from DMI'
+ def jsonData = TestUtils.getResourceFileContent(
+ 'datajobs/subscription/cmNotificationSubscriptionDmiOutEvent.json')
+ def testEventSent = jsonObjectMapper.convertJsonString(jsonData, DataJobSubscriptionDmiOutEvent.class)
+ def testCloudEventSent = CloudEventBuilder.v1()
+ .withData(objectMapper.writeValueAsBytes(testEventSent))
+ .withId('random-uuid')
+ .withType('subscriptionCreateResponse')
+ .withSource(URI.create('myDmi'))
+ .withExtension('correlationid', 'sub-1#myDmi').build()
+ def consumerRecord = new ConsumerRecord<String, CloudEvent>('topic-name', 0, 0, 'event-key', testCloudEventSent)
+ when: 'the event is consumed'
+ objectUnderTest.consumeDmiOutEvent(consumerRecord)
+ then: 'an event is logged with level INFO'
+ def loggingEvent = getLoggingEvent()
+ assert loggingEvent.level == Level.INFO
+ and: 'the log indicates the task completed successfully'
+ assert loggingEvent.formattedMessage == 'Consumed DMI subscription response event with details: | correlationId=sub-1#myDmi | eventType=subscriptionCreateResponse'
+ and: 'the subscription handler is called to update status of subscription with correct details'
+ 1 * mockCmSubscriptionHandler.updateCmSubscriptionStatus('sub-1', 'myDmi', ACCEPTED)
+ }
+
+ def getLoggingEvent() {
+ return logger.list[0]
+ }
+}
package org.onap.cps.ncmp.impl.datajobs.subscription.ncmp
+import static org.onap.cps.ncmp.impl.datajobs.subscription.models.CmSubscriptionStatus.ACCEPTED
import org.onap.cps.ncmp.impl.datajobs.subscription.client_to_ncmp.DataSelector
import org.onap.cps.ncmp.impl.datajobs.subscription.ncmp_to_dmi.DataJobSubscriptionDmiInEvent
'new target does not overlap with existing targets'| ['/newDataNodeSelector[id=""]'] || 1
}
+ def 'Update subscription status to ACCEPTED: #scenario'() {
+ given: 'a subscription ID'
+ def mySubscriptionId = 'mySubId'
+ and: 'the persistence service returns all inactive data node selectors'
+ def myDataNodeSelectors = ['/myDataNodeSelector[id=""]'].asList()
+ mockCmSubscriptionPersistenceService.getInactiveDataNodeSelectors(mySubscriptionId) >> myDataNodeSelectors
+ and: 'alternate id matcher always returns a cm handle id'
+ mockAlternateIdMatcher.getCmHandleId(_) >> 'someCmHandleId'
+ and: 'the inventory persistence service returns a yang model with a dmi service name for the accepted subscription'
+ mockInventoryPersistence.getYangModelCmHandle(_) >> new YangModelCmHandle(dmiServiceName: 'myDmi')
+ when: 'the method to update subscription status is called with status=ACCEPTED and dmi #dmiName'
+ objectUnderTest.updateCmSubscriptionStatus(mySubscriptionId, dmiName, ACCEPTED)
+ then: 'the persistence service to update subscription status is ONLY called for matching dmi name'
+ expectedCallsToPersistenceService * mockCmSubscriptionPersistenceService.updateCmSubscriptionStatus('/myDataNodeSelector[id=""]', ACCEPTED)
+ where: 'the following data are used'
+ scenario |dmiName || expectedCallsToPersistenceService
+ 'data node selector for "myDmi"' |'myDmi' || 1
+ 'data node selector for other dmi'| 'someOtherDmi'|| 0
+ }
+
+
def getFdn(dataNodeSelector) {
return JexParser.extractFdnPrefix(dataNodeSelector).orElse("")
}
import org.onap.cps.ncmp.impl.datajobs.subscription.client_to_ncmp.DataJobSubscriptionOperationInEvent
import org.onap.cps.ncmp.impl.utils.JexParser
import org.onap.cps.ncmp.utils.TestUtils
-import org.onap.cps.utils.JsonObjectMapper
import org.slf4j.LoggerFactory
-import org.springframework.beans.factory.annotation.Autowired
-import org.springframework.boot.test.context.SpringBootTest
import spock.lang.Specification
-@SpringBootTest(classes = [ObjectMapper, JsonObjectMapper])
class NcmpInEventConsumerSpec extends Specification {
- def mockCmSubscriptionHandler = Mock(CmSubscriptionHandlerImpl)
- def objectUnderTest = new NcmpInEventConsumer(mockCmSubscriptionHandler)
def logger = new ListAppender<ILoggingEvent>()
+ def objectMapper = new ObjectMapper()
- @Autowired
- JsonObjectMapper jsonObjectMapper
-
- @Autowired
- ObjectMapper objectMapper
+ def mockCmSubscriptionHandler = Mock(CmSubscriptionHandlerImpl)
+ def objectUnderTest = new NcmpInEventConsumer(mockCmSubscriptionHandler)
void setup() {
((Logger) LoggerFactory.getLogger(NcmpInEventConsumer.class)).addAppender(logger)
package org.onap.cps.ncmp.impl.datajobs.subscription.utils
-
import static CmDataJobSubscriptionPersistenceService.CPS_PATH_TEMPLATE_FOR_SUBSCRIPTIONS_WITH_DATA_NODE_SELECTOR
import static CmDataJobSubscriptionPersistenceService.CPS_PATH_TEMPLATE_FOR_SUBSCRIPTION_WITH_DATA_JOB_ID
import static CmDataJobSubscriptionPersistenceService.CPS_PATH_TEMPLATE_FOR_INACTIVE_SUBSCRIPTIONS
import static CmDataJobSubscriptionPersistenceService.PARENT_NODE_XPATH
import static org.onap.cps.api.parameters.FetchDescendantsOption.OMIT_DESCENDANTS
+import static org.onap.cps.ncmp.impl.datajobs.subscription.models.CmSubscriptionStatus.ACCEPTED
import com.fasterxml.jackson.databind.ObjectMapper
import org.onap.cps.api.CpsDataService
1 * mockCpsDataService.updateNodeLeaves('NCMP-Admin', 'cm-data-job-subscriptions', PARENT_NODE_XPATH, subscriptionDetailsAsJson, _, ContentType.JSON)
}
+ def 'Update subscription status'() {
+ given: 'a data node selector'
+ def myDataNodeSelector = "/myDataNodeSelector"
+ and: 'a status'
+ def status = ACCEPTED
+ and: 'the query service returns data node'
+ def subscriptionIds = ['someId']
+ mockCpsQueryService.queryDataNodes(_,_,_,_) >> [new DataNode(leaves: ['dataJobId': subscriptionIds, 'dataNodeSelector': myDataNodeSelector, 'status': 'UNKNOWN'])]
+ and: 'updated cm data job subscription details as json'
+ def subscriptionDetailsAsJson = objectUnderTest.createSubscriptionDetailsAsJson(myDataNodeSelector, subscriptionIds, status.name())
+ when: 'the method to update subscription status is called'
+ objectUnderTest.updateCmSubscriptionStatus(myDataNodeSelector, status)
+ then: 'data service method to update list of subscribers is called once'
+ 1 * mockCpsDataService.updateNodeLeaves('NCMP-Admin', 'cm-data-job-subscriptions', PARENT_NODE_XPATH, subscriptionDetailsAsJson, _, _)
+
+ }
+
}
{
"data": {
"statusCode": "1",
- "statusMessage": "accepted"
+ "statusMessage": "ACCEPTED"
}
}
\ No newline at end of file
avc:
cm-subscription-ncmp-in: ${CM_SUBSCRIPTION_NCMP_IN_TOPIC:subscription}
cm-subscription-dmi-in: ${CM_SUBSCRIPTION_DMI_IN_TOPIC:ncmp-dmi-cm-avc-subscription}
- cm-subscription-ncmp-out: ${CM_SUBSCRIPTION_NCMP_OUT_TOPIC:subscription-response}
+ cm-subscription-dmi-out: ${CM_SUBSCRIPTION_DMI_OUT_TOPIC:dmi-ncmp-cm-avc-subscription}
cm-events-topic: ${NCMP_CM_EVENTS_TOPIC:cm-events}
inventory-events-topic: ncmp-inventory-events
lcm: