From 92374648e87a902a5dff51d22e4b39607f4cc5b6 Mon Sep 17 00:00:00 2001 From: emaclee Date: Mon, 8 Sep 2025 15:58:01 +0100 Subject: [PATCH] Subscription CREATE flow - cover scenarios of the following: add subscription for non existing details in database, add subscription for overlapping details - method first persists subscription and then send relevant details to DMIs - remove classes, files and configuration that was from legacy subscription implementation Issue-ID: CPS-2723 Change-Id: Iee60da409ced76874e0208e4284b7d5cd00c9f6a Signed-off-by: emaclee --- cps-application/src/main/resources/application.yml | 3 - cps-application/src/test/resources/application.yml | 3 - .../ncmp/avc/dmi-in-event-schema-1.0.0.json | 93 -------- .../ncmp/avc/dmi-out-event-schema-1.0.0.json | 43 ---- .../ncmp/avc/ncmp-in-event-schema-1.0.0.json | 73 ------- .../ncmp/avc/ncmp-out-event-schema-1.0.0.json | 57 ----- .../subscription/cache/DmiCacheHandler.java | 216 ------------------- .../dmi/DmiCmSubscriptionDetailsPerDmiMapper.java | 104 --------- .../subscription/dmi/DmiInEventMapper.java | 73 +++---- .../subscription/dmi/DmiOutEventConsumer.java | 116 ---------- ...{DmiInEventProducer.java => EventProducer.java} | 18 +- .../subscription/models/CmSubscriptionStatus.java | 2 +- .../ncmp/CmSubscriptionComparator.java | 77 ------- .../subscription/ncmp/CmSubscriptionHandler.java | 19 +- .../ncmp/CmSubscriptionHandlerImpl.java | 233 ++++++--------------- .../subscription/ncmp/NcmpInEventConsumer.java | 23 +- .../subscription/ncmp/NcmpOutEventMapper.java | 114 ---------- .../subscription/ncmp/NcmpOutEventProducer.java | 136 ------------ .../ncmp/NcmpOutEventPublishingTask.java | 59 ------ .../CmDataJobSubscriptionPersistenceService.java | 148 ++++++------- .../org/onap/cps/ncmp/impl/utils/JexParser.java | 3 - .../cache/CmSubscriptionConfigSpec.groovy | 67 ------ .../subscription/cache/DmiCacheHandlerSpec.groovy | 228 -------------------- ...DmiCmSubscriptionDetailsPerDmiMapperSpec.groovy | 52 ----- .../subscription/dmi/DmiInEventMapperSpec.groovy | 28 ++- .../dmi/DmiOutEventConsumerSpec.groovy | 123 ----------- ...roducerSpec.groovy => EventProducerSpec.groovy} | 12 +- .../ncmp/CmSubscriptionComparatorSpec.groovy | 57 ----- .../ncmp/CmSubscriptionHandlerImplSpec.groovy | 228 +++++++++----------- .../ncmp/NcmpInEventConsumerSpec.groovy | 39 ++-- .../ncmp/NcmpOutEventMapperSpec.groovy | 69 ------ .../ncmp/NcmpOutEventProducerSpec.groovy | 130 ------------ .../CmSubscriptionPersistenceServiceSpec.groovy | 92 +++----- .../cmNotificationSubscriptionNcmpInEvent.json | 21 -- .../cmNotificationSubscriptionDmiOutEvent.json | 0 .../cmNotificationSubscriptionNcmpInEvent.json} | 6 +- .../src/test/resources/application.yml | 3 - 37 files changed, 369 insertions(+), 2399 deletions(-) delete mode 100644 cps-ncmp-events/src/main/resources/schemas/ncmp/avc/dmi-in-event-schema-1.0.0.json delete mode 100644 cps-ncmp-events/src/main/resources/schemas/ncmp/avc/dmi-out-event-schema-1.0.0.json delete mode 100644 cps-ncmp-events/src/main/resources/schemas/ncmp/avc/ncmp-in-event-schema-1.0.0.json delete mode 100644 cps-ncmp-events/src/main/resources/schemas/ncmp/avc/ncmp-out-event-schema-1.0.0.json delete mode 100644 cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/subscription/cache/DmiCacheHandler.java delete mode 100644 cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/subscription/dmi/DmiCmSubscriptionDetailsPerDmiMapper.java delete mode 100644 cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/subscription/dmi/DmiOutEventConsumer.java rename cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/subscription/dmi/{DmiInEventProducer.java => EventProducer.java} (79%) delete mode 100644 cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/subscription/ncmp/CmSubscriptionComparator.java delete mode 100644 cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/subscription/ncmp/NcmpOutEventMapper.java delete mode 100644 cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/subscription/ncmp/NcmpOutEventProducer.java delete mode 100644 cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/subscription/ncmp/NcmpOutEventPublishingTask.java delete mode 100644 cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/subscription/cache/CmSubscriptionConfigSpec.groovy delete mode 100644 cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/subscription/cache/DmiCacheHandlerSpec.groovy delete mode 100644 cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/subscription/dmi/DmiCmSubscriptionDetailsPerDmiMapperSpec.groovy delete mode 100644 cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/subscription/dmi/DmiOutEventConsumerSpec.groovy rename cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/subscription/dmi/{DmiInEventProducerSpec.groovy => EventProducerSpec.groovy} (87%) delete mode 100644 cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/subscription/ncmp/CmSubscriptionComparatorSpec.groovy delete mode 100644 cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/subscription/ncmp/NcmpOutEventMapperSpec.groovy delete mode 100644 cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/subscription/ncmp/NcmpOutEventProducerSpec.groovy delete mode 100644 cps-ncmp-service/src/test/resources/cmSubscription/cmNotificationSubscriptionNcmpInEvent.json rename cps-ncmp-service/src/test/resources/{cmSubscription => datajobs/subscription}/cmNotificationSubscriptionDmiOutEvent.json (100%) rename cps-ncmp-service/src/test/resources/{sample_dataJobSubscriptionInEvent.json => datajobs/subscription/cmNotificationSubscriptionNcmpInEvent.json} (78%) diff --git a/cps-application/src/main/resources/application.yml b/cps-application/src/main/resources/application.yml index d95e69e840..245c051193 100644 --- a/cps-application/src/main/resources/application.yml +++ b/cps-application/src/main/resources/application.yml @@ -104,7 +104,6 @@ app: avc: cm-subscription-ncmp-in: ${CM_SUBSCRIPTION_NCMP_IN_TOPIC:subscription} cm-subscription-dmi-in: ${CM_SUBSCRIPTION_DMI_IN_TOPIC:ncmp-dmi-cm-avc-subscription} - cm-subscription-dmi-out: ${CM_SUBSCRIPTION_DMI_OUT_TOPIC:dmi-ncmp-cm-avc-subscription} cm-subscription-ncmp-out: ${CM_SUBSCRIPTION_NCMP_OUT_TOPIC:subscription-response} cm-events-topic: ${NCMP_CM_EVENTS_TOPIC:cm-events} inventory-events-topic: ncmp-inventory-events @@ -235,8 +234,6 @@ ncmp: cm-handle-data-sync: initial-delay-ms: 40000 sleep-time-ms: 30000 - subscription-forwarding: - dmi-response-timeout-ms: 30000 model-loader: retry-time-ms: 1000 trust-level: diff --git a/cps-application/src/test/resources/application.yml b/cps-application/src/test/resources/application.yml index e0c827bb52..a24e65af94 100644 --- a/cps-application/src/test/resources/application.yml +++ b/cps-application/src/test/resources/application.yml @@ -101,7 +101,6 @@ app: avc: cm-subscription-ncmp-in: ${CM_SUBSCRIPTION_NCMP_IN_TOPIC:subscription} cm-subscription-dmi-in: ${CM_SUBSCRIPTION_DMI_IN_TOPIC:ncmp-dmi-cm-avc-subscription} - cm-subscription-dmi-out: ${CM_SUBSCRIPTION_DMI_OUT_TOPIC:dmi-ncmp-cm-avc-subscription} cm-subscription-ncmp-out: ${CM_SUBSCRIPTION_NCMP_OUT_TOPIC:subscription-response} cm-events-topic: ${NCMP_CM_EVENTS_TOPIC:cm-events} inventory-events-topic: ncmp-inventory-events @@ -232,8 +231,6 @@ ncmp: cm-handle-data-sync: initial-delay-ms: 40000 sleep-time-ms: 30000 - subscription-forwarding: - dmi-response-timeout-ms: 30000 model-loader: retry-time-ms: 1000 trust-level: diff --git a/cps-ncmp-events/src/main/resources/schemas/ncmp/avc/dmi-in-event-schema-1.0.0.json b/cps-ncmp-events/src/main/resources/schemas/ncmp/avc/dmi-in-event-schema-1.0.0.json deleted file mode 100644 index b9c949177f..0000000000 --- a/cps-ncmp-events/src/main/resources/schemas/ncmp/avc/dmi-in-event-schema-1.0.0.json +++ /dev/null @@ -1,93 +0,0 @@ -{ - "$schema": "https://json-schema.org/draft/2019-09/schema", - "$id": "urn:cps:org.onap.ncmp.events.subscription:1.0.0", - "$ref": "#/definitions/DmiInEvent", - "definitions": { - "DmiInEvent": { - "description": "The payload for cm notification subscription event incoming message from NCMP.", - "type": "object", - "javaType": "org.onap.cps.ncmp.impl.datajobs.subscription.ncmp_to_dmi.DmiInEvent", - "additionalProperties": false, - "properties": { - "data": { - "$ref": "#/definitions/data" - } - }, - "required": [ - "data" - ] - }, - "data": { - "type": "object", - "description": "Information about the targets and subscription", - "additionalProperties": false, - "properties": { - "cmHandles": { - "type": "array", - "items": { - "type": "object", - "description": "Details for the target cmhandles", - "additionalProperties": false, - "properties": { - "cmhandleId": { - "type": "string" - }, - "privateProperties": { - "type": "object", - "existingJavaType": "java.util.Map", - "items": { - "type": "string" - } - } - } - } - }, - "predicates": { - "type": "array", - "description": "Additional values to be added into the subscription", - "items": { - "type": "object", - "properties": { - "targetFilter": { - "description": "CM Handles to be targeted by the subscription", - "type": "array", - "items": { - "type": "string" - } - }, - "scopeFilter": { - "type": "object", - "properties": { - "datastore": { - "description": "Datastore which is to be used by the subscription", - "type": "string", - "enum": ["ncmp-datastore:passthrough-operational", "ncmp-datastore:passthrough-running"] - }, - "xpathFilter": { - "description": "Filter to be applied to the CM Handles through this event", - "type": "array", - "items": { - "type": "string" - } - } - }, - "additionalProperties": false, - "required": [ - "xpathFilter" - ] - } - }, - "additionalProperties": false, - "required": [ - "targetFilter" - ] - }, - "additionalProperties": false - } - }, - "required": [ - "cmHandles" - ] - } - } -} diff --git a/cps-ncmp-events/src/main/resources/schemas/ncmp/avc/dmi-out-event-schema-1.0.0.json b/cps-ncmp-events/src/main/resources/schemas/ncmp/avc/dmi-out-event-schema-1.0.0.json deleted file mode 100644 index f9be82ac5f..0000000000 --- a/cps-ncmp-events/src/main/resources/schemas/ncmp/avc/dmi-out-event-schema-1.0.0.json +++ /dev/null @@ -1,43 +0,0 @@ -{ - "$schema": "https://json-schema.org/draft/2019-09/schema", - "$id": "urn:cps:org.onap.ncmp.events.subscription:1.0.0", - "$ref": "#/definitions/DmiOutEvent", - "definitions": { - "DmiOutEvent": { - "description": "The payload for cm notification subscription merge event coming out from DMI Plugin.", - "type": "object", - "additionalProperties": false, - "javaType": "org.onap.cps.ncmp.impl.datajobs.subscription.dmi_to_ncmp.DmiOutEvent", - "properties": { - "data": { - "$ref": "#/definitions/Data" - } - }, - "required": [ - "data" - ], - "title": "DmiOutEvent" - }, - "Data": { - "type": "object", - "description": "Information about the targets and subscription", - "additionalProperties": false, - "properties": { - "statusCode": { - "type": "string", - "format": "integer", - "description": "The common status as defined in CPS" - }, - "statusMessage": { - "type": "string", - "description": "The common status message as defined in CPS" - } - }, - "required": [ - "statusCode", - "statusMessage" - ], - "title": "Data" - } - } -} diff --git a/cps-ncmp-events/src/main/resources/schemas/ncmp/avc/ncmp-in-event-schema-1.0.0.json b/cps-ncmp-events/src/main/resources/schemas/ncmp/avc/ncmp-in-event-schema-1.0.0.json deleted file mode 100644 index 9177d0ed04..0000000000 --- a/cps-ncmp-events/src/main/resources/schemas/ncmp/avc/ncmp-in-event-schema-1.0.0.json +++ /dev/null @@ -1,73 +0,0 @@ -{ - "$id": "urn:cps:org.onap.ncmp.events.subscription:1.0.0", - "$ref": "#/definitions/NcmpInEvent", - "$schema": "https://json-schema.org/draft/2019-09/schema", - "definitions": { - "NcmpInEvent": { - "description": "The payload for subscription merge event.", - "javaType": "org.onap.cps.ncmp.impl.datajobs.subscription.client_to_ncmp.NcmpInEvent", - "properties": { - "data": { - "properties": { - "subscriptionId": { - "description": "The subscription details.", - "type": "string" - }, - "predicates": { - "type": "array", - "description": "Additional values to be added into the subscription", - "items": { - "type": "object", - "properties": { - "targetFilter": { - "description": "CM Handles to be targeted by the subscription", - "type": "array", - "items": { - "type": "string" - } - }, - "scopeFilter": { - "type": "object", - "properties": { - "datastore": { - "description": "Datastore which is to be used by the subscription", - "type": "string", - "enum": ["ncmp-datastore:passthrough-operational", "ncmp-datastore:passthrough-running"] - }, - "xpathFilter": { - "description": "Filter to be applied to the CM Handles through this event", - "type": "array", - "items": { - "type": "string" - } - } - }, - "additionalProperties": false, - "required": [ - "xpathFilter" - ] - } - }, - "additionalProperties": false, - "required": [ - "targetFilter" - ] - }, - "additionalProperties": false - } - }, - "required": [ - "subscriptionId" - ], - "type": "object", - "additionalProperties": false - } - }, - "type": "object", - "additionalProperties": false, - "required": [ - "data" - ] - } - } -} diff --git a/cps-ncmp-events/src/main/resources/schemas/ncmp/avc/ncmp-out-event-schema-1.0.0.json b/cps-ncmp-events/src/main/resources/schemas/ncmp/avc/ncmp-out-event-schema-1.0.0.json deleted file mode 100644 index 86c8ccb8ba..0000000000 --- a/cps-ncmp-events/src/main/resources/schemas/ncmp/avc/ncmp-out-event-schema-1.0.0.json +++ /dev/null @@ -1,57 +0,0 @@ -{ - "$schema": "https://json-schema.org/draft/2019-09/schema", - "$id": "urn:cps:org.onap.ncmp.events.subscription:1.0.0", - "$ref": "#/definitions/NcmpOutEvent", - "definitions": { - "NcmpOutEvent": { - "type": "object", - "description": "The payload applied cm subscription merge event coming out from NCMP.", - "javaType": "org.onap.cps.ncmp.impl.datajobs.subscription.ncmp_to_client.NcmpOutEvent", - "additionalProperties": false, - "properties": { - "data": { - "$ref": "#/definitions/Data" - } - }, - "required": [ - "data" - ], - "title": "NcmpOutEvent" - }, - "Data": { - "type": "object", - "description": "Information about the targets and subscription", - "additionalProperties": false, - "properties": { - "subscriptionId": { - "type": "string", - "description": "The unique subscription id" - }, - "acceptedTargets": { - "type": "object", - "existingJavaType": "java.util.Collection", - "description": "Collection of accepted targets" - }, - "rejectedTargets": { - "type": "object", - "existingJavaType": "java.util.Collection", - "description": "Collection of rejected targets" - }, - "pendingTargets": { - "type": "object", - "existingJavaType": "java.util.Collection", - "description": "Collection of pending targets" - } - }, - "required": [ - "subscriptionId", - "acceptedTargets", - "rejectedTargets", - "pendingTargets" - ], - "title": "Data" - } - } - - -} diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/subscription/cache/DmiCacheHandler.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/subscription/cache/DmiCacheHandler.java deleted file mode 100644 index 6a5252a2ea..0000000000 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/subscription/cache/DmiCacheHandler.java +++ /dev/null @@ -1,216 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * Copyright (C) 2024-2025 OpenInfra Foundation Europe. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * SPDX-License-Identifier: Apache-2.0 - * ============LICENSE_END========================================================= - */ - -package org.onap.cps.ncmp.impl.datajobs.subscription.cache; - -import static org.onap.cps.ncmp.impl.datajobs.subscription.models.CmSubscriptionStatus.PENDING; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.stream.Collectors; -import lombok.RequiredArgsConstructor; -import org.onap.cps.ncmp.api.data.models.DatastoreType; -import org.onap.cps.ncmp.impl.datajobs.subscription.client_to_ncmp.Predicate; -import org.onap.cps.ncmp.impl.datajobs.subscription.models.CmSubscriptionStatus; -import org.onap.cps.ncmp.impl.datajobs.subscription.models.DmiCmSubscriptionDetails; -import org.onap.cps.ncmp.impl.datajobs.subscription.models.DmiCmSubscriptionPredicate; -import org.onap.cps.ncmp.impl.datajobs.subscription.utils.CmDataJobSubscriptionPersistenceService; -import org.onap.cps.ncmp.impl.inventory.InventoryPersistence; -import org.onap.cps.ncmp.impl.inventory.models.YangModelCmHandle; -import org.springframework.stereotype.Component; - -@Component -@RequiredArgsConstructor -public class DmiCacheHandler { - - private final CmDataJobSubscriptionPersistenceService cmDataJobSubscriptionPersistenceService; - private final Map> cmNotificationSubscriptionCache; - private final InventoryPersistence inventoryPersistence; - - /** - * Adds subscription to the subscription cache. - * - * @param subscriptionId subscription id - * @param predicates subscription request predicates - */ - public void add(final String subscriptionId, final List predicates) { - cmNotificationSubscriptionCache.put(subscriptionId, createDmiSubscriptionsPerDmi(predicates)); - } - - /** - * Adds subscription to the subscription cache. - * - * @param subscriptionId subscription id - * @param dmiCmSubscriptionDetailsPerDmi map of dmi cm notification subscription details per dmi - */ - public void add(final String subscriptionId, - final Map - dmiCmSubscriptionDetailsPerDmi) { - cmNotificationSubscriptionCache.put(subscriptionId, dmiCmSubscriptionDetailsPerDmi); - } - - /** - * Get cm notification subscription cache entry via subscription id. - * - * @param subscriptionId subscription id - * @return map of dmi cm notification subscriptions per dmi - */ - public Map get(final String subscriptionId) { - return cmNotificationSubscriptionCache.get(subscriptionId); - } - - - /** - * Remove cache entries with CmNotificationSubscriptionStatus ACCEPTED/REJECTED via subscription id. - * - * @param subscriptionId subscription id as key in CM notification Subscription cache. - */ - public void removeAcceptedAndRejectedDmiSubscriptionEntries(final String subscriptionId) { - final Map dmiSubscriptionsPerDmi = - cmNotificationSubscriptionCache.get(subscriptionId); - final Map updatedDmiSubscriptionsPerDmi = - dmiSubscriptionsPerDmi.entrySet().stream() - .filter(dmiCmNotificationSubscription -> !isAcceptedOrRejected( - dmiCmNotificationSubscription.getValue())) - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); - cmNotificationSubscriptionCache.put(subscriptionId, updatedDmiSubscriptionsPerDmi); - } - - /** - * Creates map of subscription details per DMI. - * - * @param predicates CM Subscription Create Request Predicates - * @return Map of DmiCmNotificationSubscription per DMI plugin - */ - public Map createDmiSubscriptionsPerDmi( - final List predicates) { - final Map dmiSubscriptionsPerDmi = - new HashMap<>(); - for (final Predicate requestPredicate : predicates) { - final List targetFilter = requestPredicate.getTargetFilter(); - final DatastoreType datastoreType = DatastoreType.fromDatastoreName( - requestPredicate.getScopeFilter().getDatastore().toString()); - final Set xpaths = new HashSet<>(requestPredicate.getScopeFilter().getXpathFilter()); - final Map> targetCmHandlesByDmiMap = groupTargetCmHandleIdsByDmi(targetFilter); - for (final Map.Entry> targetCmHandlesByDmi : targetCmHandlesByDmiMap.entrySet()) { - final DmiCmSubscriptionPredicate dmiCmSubscriptionPredicate = - new DmiCmSubscriptionPredicate(targetCmHandlesByDmi.getValue(), - datastoreType, xpaths); - updateDmiSubscriptionDetailsPerDmi(targetCmHandlesByDmi.getKey(), - dmiCmSubscriptionPredicate, - dmiSubscriptionsPerDmi); - } - } - return dmiSubscriptionsPerDmi; - } - - /** - * Update status in map of subscription details per DMI. - * - * @param subscriptionId String of subscription id - * @param dmiServiceName String of dmiServiceName - * @param status String of status - */ - public void updateDmiSubscriptionStatus(final String subscriptionId, final String dmiServiceName, - final CmSubscriptionStatus status) { - final Map dmiSubscriptionsPerDmi = - cmNotificationSubscriptionCache.get(subscriptionId); - dmiSubscriptionsPerDmi.get(dmiServiceName).setCmSubscriptionStatus(status); - cmNotificationSubscriptionCache.put(subscriptionId, dmiSubscriptionsPerDmi); - } - - /** - * Persist map of subscription details per DMI. - * - * @param subscriptionId String of subscription id - * @param dmiServiceName String of dmiServiceName - */ - public void persistIntoDatabasePerDmi(final String subscriptionId, final String dmiServiceName) { - final List dmiCmSubscriptionPredicates = - cmNotificationSubscriptionCache.get(subscriptionId).get(dmiServiceName) - .getDmiCmSubscriptionPredicates(); - for (final DmiCmSubscriptionPredicate dmiCmSubscriptionPredicate : dmiCmSubscriptionPredicates) { - final DatastoreType datastoreType = dmiCmSubscriptionPredicate.getDatastoreType(); - final Set cmHandles = dmiCmSubscriptionPredicate.getTargetCmHandleIds(); - for (final String cmHandle : cmHandles) { - cmDataJobSubscriptionPersistenceService.addSubscription(datastoreType.getDatastoreName(), - cmHandle, subscriptionId); - } - } - } - - /** - * Remove subscription from database per DMI service name. - * - * @param subscriptionId String of subscription id - * @param dmiServiceName String of dmiServiceName - */ - public void removeFromDatabase(final String subscriptionId, final String dmiServiceName) { - final List dmiCmSubscriptionPredicates = - cmNotificationSubscriptionCache.get(subscriptionId).get(dmiServiceName) - .getDmiCmSubscriptionPredicates(); - for (final DmiCmSubscriptionPredicate dmiCmSubscriptionPredicate : dmiCmSubscriptionPredicates) { - final DatastoreType datastoreType = dmiCmSubscriptionPredicate.getDatastoreType(); - final Set cmHandles = dmiCmSubscriptionPredicate.getTargetCmHandleIds(); - for (final String cmHandle : cmHandles) { - cmDataJobSubscriptionPersistenceService.removeSubscription(datastoreType.getDatastoreName(), - cmHandle, subscriptionId); - } - } - } - - private void updateDmiSubscriptionDetailsPerDmi( - final String dmiServiceName, - final DmiCmSubscriptionPredicate dmiCmSubscriptionPredicate, - final Map dmiSubscriptionsPerDmi) { - if (dmiSubscriptionsPerDmi.containsKey(dmiServiceName)) { - dmiSubscriptionsPerDmi.get(dmiServiceName) - .getDmiCmSubscriptionPredicates().add(dmiCmSubscriptionPredicate); - } else { - dmiSubscriptionsPerDmi.put(dmiServiceName, - new DmiCmSubscriptionDetails( - new ArrayList<>(List.of(dmiCmSubscriptionPredicate)), - PENDING)); - } - } - - private Map> groupTargetCmHandleIdsByDmi(final List targetCmHandleIds) { - final Map> targetCmHandlesByDmiServiceNames = new HashMap<>(); - final Collection yangModelCmHandles = - inventoryPersistence.getYangModelCmHandles(targetCmHandleIds); - - for (final YangModelCmHandle yangModelCmHandle : yangModelCmHandles) { - final String dmiServiceName = yangModelCmHandle.getDmiServiceName(); - targetCmHandlesByDmiServiceNames.putIfAbsent(dmiServiceName, new HashSet<>()); - targetCmHandlesByDmiServiceNames.get(dmiServiceName).add(yangModelCmHandle.getId()); - } - return targetCmHandlesByDmiServiceNames; - } - - private boolean isAcceptedOrRejected(final DmiCmSubscriptionDetails dmiCmSubscription) { - return dmiCmSubscription.getCmSubscriptionStatus().toString().equals("ACCEPTED") - || dmiCmSubscription.getCmSubscriptionStatus().toString().equals("REJECTED"); - } -} diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/subscription/dmi/DmiCmSubscriptionDetailsPerDmiMapper.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/subscription/dmi/DmiCmSubscriptionDetailsPerDmiMapper.java deleted file mode 100644 index 979da46466..0000000000 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/subscription/dmi/DmiCmSubscriptionDetailsPerDmiMapper.java +++ /dev/null @@ -1,104 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * Copyright (C) 2024 Nordix Foundation - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * SPDX-License-Identifier: Apache-2.0 - * ============LICENSE_END========================================================= - */ - -package org.onap.cps.ncmp.impl.datajobs.subscription.dmi; - -import static org.onap.cps.ncmp.api.data.models.DatastoreType.fromDatastoreName; -import static org.onap.cps.ncmp.impl.datajobs.subscription.models.CmSubscriptionStatus.PENDING; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.stream.Collectors; -import org.onap.cps.ncmp.api.data.models.DatastoreType; -import org.onap.cps.ncmp.impl.datajobs.subscription.models.DmiCmSubscriptionDetails; -import org.onap.cps.ncmp.impl.datajobs.subscription.models.DmiCmSubscriptionKey; -import org.onap.cps.ncmp.impl.datajobs.subscription.models.DmiCmSubscriptionPredicate; -import org.springframework.stereotype.Component; - -@Component -public class DmiCmSubscriptionDetailsPerDmiMapper { - - /** - * Maps Dmi Subscription Keys grouped by Dmi Plugin to DmiCmSubscriptionDetails per Dmi plugin. - * - * @param subscribersPerDmi Details managed by each dmi plugin - * @return Grouped Dmi Subscription details per dmi plugin - */ - public Map toDmiCmSubscriptionsPerDmi( - final Map> subscribersPerDmi) { - - final Map dmiSubscriptionsPerDmi = new HashMap<>(); - - subscribersPerDmi.forEach((dmiPluginName, dmiCmSubscriptionKeys) -> { - final Map> groupedByDatastoreTypeAndXpath = - groupByDatastoreTypeAndXpath(dmiCmSubscriptionKeys); - - final List dmiSubscriptionPredicates = - createDmiCmSubscriptionPredicates(groupedByDatastoreTypeAndXpath); - - final DmiCmSubscriptionDetails dmiCmSubscriptionDetails = - new DmiCmSubscriptionDetails(dmiSubscriptionPredicates, PENDING); - - dmiSubscriptionsPerDmi.put(dmiPluginName, dmiCmSubscriptionDetails); - }); - - return dmiSubscriptionsPerDmi; - } - - private static Map> groupByDatastoreTypeAndXpath( - final Collection dmiCmSubscriptionKeys) { - return dmiCmSubscriptionKeys.stream().collect(Collectors.groupingBy( - datastoreTypeAndXpath -> new DatastoreTypeAndXpath( - fromDatastoreName(datastoreTypeAndXpath.datastoreName()), datastoreTypeAndXpath.xpath()))); - } - - private static List createDmiCmSubscriptionPredicates( - final Map> groupedByDatastoreTypeAndXpath) { - final List dmiCmSubscriptionPredicates = new ArrayList<>(); - - for (final Map.Entry> datastoreTypeXpathGroupEntry : - groupedByDatastoreTypeAndXpath.entrySet()) { - final DatastoreTypeAndXpath datastoreTypeAndXpath = datastoreTypeXpathGroupEntry.getKey(); - final Set cmHandleIds = new HashSet<>(); - - for (final DmiCmSubscriptionKey dmiCmSubscriptionKey : datastoreTypeXpathGroupEntry.getValue()) { - cmHandleIds.add(dmiCmSubscriptionKey.cmHandleId()); - } - - final Set xpaths = Collections.singleton(datastoreTypeAndXpath.xpath()); - dmiCmSubscriptionPredicates.add( - new DmiCmSubscriptionPredicate(cmHandleIds, datastoreTypeAndXpath.datastoreType(), xpaths)); - } - - return dmiCmSubscriptionPredicates; - } - - - private record DatastoreTypeAndXpath(DatastoreType datastoreType, String xpath) { - } - -} - diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/subscription/dmi/DmiInEventMapper.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/subscription/dmi/DmiInEventMapper.java index d6e9164693..2ca7878278 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/subscription/dmi/DmiInEventMapper.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/subscription/dmi/DmiInEventMapper.java @@ -27,13 +27,14 @@ import java.util.List; import java.util.Map; import java.util.Set; import lombok.RequiredArgsConstructor; -import org.onap.cps.ncmp.impl.datajobs.subscription.models.DmiCmSubscriptionPredicate; import org.onap.cps.ncmp.impl.datajobs.subscription.ncmp_to_dmi.CmHandle; import org.onap.cps.ncmp.impl.datajobs.subscription.ncmp_to_dmi.Data; -import org.onap.cps.ncmp.impl.datajobs.subscription.ncmp_to_dmi.DmiInEvent; -import org.onap.cps.ncmp.impl.datajobs.subscription.ncmp_to_dmi.Predicate; -import org.onap.cps.ncmp.impl.datajobs.subscription.ncmp_to_dmi.ScopeFilter; +import org.onap.cps.ncmp.impl.datajobs.subscription.ncmp_to_dmi.DataJobSubscriptionDmiInEvent; +import org.onap.cps.ncmp.impl.datajobs.subscription.ncmp_to_dmi.DataSelector; +import org.onap.cps.ncmp.impl.datajobs.subscription.ncmp_to_dmi.ProductionJobDefinition; +import org.onap.cps.ncmp.impl.datajobs.subscription.ncmp_to_dmi.TargetSelector; import org.onap.cps.ncmp.impl.inventory.InventoryPersistence; +import org.onap.cps.ncmp.impl.utils.JexParser; import org.springframework.stereotype.Component; @Component @@ -43,40 +44,39 @@ public class DmiInEventMapper { private final InventoryPersistence inventoryPersistence; /** - * Mapper to form a request for the DMI Plugin for the Cm Notification Subscription. + * This method maps relevant details for a subscription to a data job subscription DMI in event. * - * @param dmiCmSubscriptionPredicates Collection of Cm Notification Subscription predicates - * @return DmiInEvent to be sent to DMI Plugin + * @param cmHandleIds list of cm handle ID(s) + * @param dataNodeSelectors list of data node selectors + * @param notificationTypes the list of notification types + * @param notificationFilter the notification filter + * @return data job subscription DMI in event */ - public DmiInEvent toDmiInEvent(final List dmiCmSubscriptionPredicates) { - final DmiInEvent dmiInEvent = new DmiInEvent(); - final Data cmSubscriptionData = new Data(); - cmSubscriptionData.setPredicates(mapToDmiInEventPredicates(dmiCmSubscriptionPredicates)); - cmSubscriptionData.setCmHandles(mapToCmSubscriptionCmHandleWithAdditionalProperties( - extractUniqueCmHandleIds(dmiCmSubscriptionPredicates))); - dmiInEvent.setData(cmSubscriptionData); + public DataJobSubscriptionDmiInEvent toDmiInEvent(final List cmHandleIds, + final List dataNodeSelectors, + final List notificationTypes, + final String notificationFilter) { + final DataJobSubscriptionDmiInEvent dmiInEvent = new DataJobSubscriptionDmiInEvent(); + final Data data = new Data(); + final String dataNodeSelectorsAsJson = JexParser.toJsonExpressionsAsString(dataNodeSelectors); + data.setCmHandles(mapToCmSubscriptionCmHandleWithAdditionalProperties(new HashSet<>(cmHandleIds))); + addProductJobDefinition(data, dataNodeSelectorsAsJson); + addDataSelector(data, notificationTypes, notificationFilter); + dmiInEvent.setData(data); return dmiInEvent; - } - private List mapToDmiInEventPredicates( - final List dmiCmSubscriptionPredicates) { - - final List predicates = new ArrayList<>(); - - dmiCmSubscriptionPredicates.forEach(dmiCmNotificationSubscriptionPredicate -> { - final Predicate predicate = new Predicate(); - final ScopeFilter scopeFilter = new ScopeFilter(); - scopeFilter.setDatastore(ScopeFilter.Datastore.fromValue( - dmiCmNotificationSubscriptionPredicate.getDatastoreType().getDatastoreName())); - scopeFilter.setXpathFilter(dmiCmNotificationSubscriptionPredicate.getXpaths().stream().toList()); - predicate.setScopeFilter(scopeFilter); - predicate.setTargetFilter(dmiCmNotificationSubscriptionPredicate.getTargetCmHandleIds().stream().toList()); - predicates.add(predicate); - }); - - return predicates; + private static void addProductJobDefinition(final Data data, final String dataNodeSelector) { + data.setProductionJobDefinition(new ProductionJobDefinition()); + data.getProductionJobDefinition().setTargetSelector(new TargetSelector()); + data.getProductionJobDefinition().getTargetSelector().setDataNodeSelector(dataNodeSelector); + } + private static void addDataSelector(final Data data, final List notificationTypes, + final String notificationFilter) { + data.getProductionJobDefinition().setDataSelector(new DataSelector()); + data.getProductionJobDefinition().getDataSelector().setNotificationTypes(notificationTypes); + data.getProductionJobDefinition().getDataSelector().setNotificationFilter(notificationFilter); } private List mapToCmSubscriptionCmHandleWithAdditionalProperties(final Set cmHandleIds) { @@ -98,13 +98,4 @@ public class DmiInEventMapper { } - private Set extractUniqueCmHandleIds(final List dmiCmSubscriptionPredicates) { - - final Set cmHandleIds = new HashSet<>(); - dmiCmSubscriptionPredicates.forEach(dmiCmNotificationSubscriptionPredicate -> cmHandleIds.addAll( - dmiCmNotificationSubscriptionPredicate.getTargetCmHandleIds())); - return cmHandleIds; - } - - } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/subscription/dmi/DmiOutEventConsumer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/subscription/dmi/DmiOutEventConsumer.java deleted file mode 100644 index e747426851..0000000000 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/subscription/dmi/DmiOutEventConsumer.java +++ /dev/null @@ -1,116 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * Copyright (C) 2024-2025 OpenInfra Foundation Europe. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * SPDX-License-Identifier: Apache-2.0 - * ============LICENSE_END========================================================= - */ - -package org.onap.cps.ncmp.impl.datajobs.subscription.dmi; - -import static org.onap.cps.ncmp.api.NcmpResponseStatus.CM_DATA_SUBSCRIPTION_ACCEPTED; -import static org.onap.cps.ncmp.api.NcmpResponseStatus.CM_DATA_SUBSCRIPTION_REJECTED; -import static org.onap.cps.ncmp.utils.events.CloudEventMapper.toTargetEvent; - -import io.cloudevents.CloudEvent; -import java.util.Map; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.onap.cps.ncmp.api.NcmpResponseStatus; -import org.onap.cps.ncmp.impl.datajobs.subscription.cache.DmiCacheHandler; -import org.onap.cps.ncmp.impl.datajobs.subscription.dmi_to_ncmp.Data; -import org.onap.cps.ncmp.impl.datajobs.subscription.dmi_to_ncmp.DmiOutEvent; -import org.onap.cps.ncmp.impl.datajobs.subscription.models.CmSubscriptionStatus; -import org.onap.cps.ncmp.impl.datajobs.subscription.models.DmiCmSubscriptionDetails; -import org.onap.cps.ncmp.impl.datajobs.subscription.ncmp.NcmpOutEventMapper; -import org.onap.cps.ncmp.impl.datajobs.subscription.ncmp.NcmpOutEventProducer; -import org.onap.cps.ncmp.impl.datajobs.subscription.ncmp_to_client.NcmpOutEvent; -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; -import org.springframework.kafka.annotation.KafkaListener; -import org.springframework.stereotype.Component; - -@Component -@Slf4j -@RequiredArgsConstructor -@ConditionalOnProperty(name = "notification.enabled", havingValue = "true", matchIfMissing = true) -public class DmiOutEventConsumer { - - private final DmiCacheHandler dmiCacheHandler; - private final NcmpOutEventProducer ncmpOutEventProducer; - private final NcmpOutEventMapper ncmpOutEventMapper; - - private static final String CM_SUBSCRIPTION_CORRELATION_ID_SEPARATOR = "#"; - - /** - * Consume the Cm Notification Subscription event from the dmi-plugin. - * - * @param dmiOutEventAsConsumerRecord the event to be consumed - */ - @KafkaListener(topics = "${app.ncmp.avc.cm-subscription-dmi-out}", - containerFactory = "cloudEventConcurrentKafkaListenerContainerFactory") - public void consumeDmiOutEvent(final ConsumerRecord dmiOutEventAsConsumerRecord) { - final CloudEvent cloudEvent = dmiOutEventAsConsumerRecord.value(); - final DmiOutEvent dmiOutEvent = toTargetEvent(cloudEvent, DmiOutEvent.class); - final String correlationId = String.valueOf(cloudEvent.getExtension("correlationid")); - if (dmiOutEvent != null && correlationId != null) { - final String eventType = cloudEvent.getType(); - handleDmiOutEvent(correlationId, eventType, dmiOutEvent); - } - } - - private void handleDmiOutEvent(final String correlationId, final String eventType, - final DmiOutEvent dmiOutEvent) { - final String subscriptionId = correlationId.split(CM_SUBSCRIPTION_CORRELATION_ID_SEPARATOR)[0]; - final String dmiPluginName = correlationId.split(CM_SUBSCRIPTION_CORRELATION_ID_SEPARATOR)[1]; - - if (checkStatusCodeAndMessage(CM_DATA_SUBSCRIPTION_ACCEPTED, dmiOutEvent.getData())) { - handleCacheStatusPerDmi(subscriptionId, dmiPluginName, CmSubscriptionStatus.ACCEPTED); - if (eventType.equals("subscriptionCreateResponse")) { - dmiCacheHandler.persistIntoDatabasePerDmi(subscriptionId, dmiPluginName); - } - if (eventType.equals("subscriptionDeleteResponse")) { - dmiCacheHandler.removeFromDatabase(subscriptionId, dmiPluginName); - } - handleEventsStatusPerDmi(subscriptionId, eventType); - } - - if (checkStatusCodeAndMessage(CM_DATA_SUBSCRIPTION_REJECTED, dmiOutEvent.getData())) { - handleCacheStatusPerDmi(subscriptionId, dmiPluginName, CmSubscriptionStatus.REJECTED); - handleEventsStatusPerDmi(subscriptionId, eventType); - } - - log.info("Cm Subscription with id : {} handled by the dmi-plugin : {} has the status : {}", subscriptionId, - dmiPluginName, dmiOutEvent.getData().getStatusMessage()); - } - - private void handleCacheStatusPerDmi(final String subscriptionId, final String dmiPluginName, - final CmSubscriptionStatus cmSubscriptionStatus) { - dmiCacheHandler.updateDmiSubscriptionStatus(subscriptionId, dmiPluginName, - cmSubscriptionStatus); - } - - private void handleEventsStatusPerDmi(final String subscriptionId, final String eventType) { - final Map dmiSubscriptionsPerDmi = dmiCacheHandler.get(subscriptionId); - final NcmpOutEvent ncmpOutEvent = ncmpOutEventMapper.toNcmpOutEvent(subscriptionId, dmiSubscriptionsPerDmi); - ncmpOutEventProducer.sendNcmpOutEvent(subscriptionId, eventType, ncmpOutEvent, false); - } - - private boolean checkStatusCodeAndMessage(final NcmpResponseStatus ncmpResponseStatus, - final Data dmiOutData) { - return ncmpResponseStatus.getCode().equals(dmiOutData.getStatusCode()) - && ncmpResponseStatus.getMessage().equals(dmiOutData.getStatusMessage()); - } -} diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/subscription/dmi/DmiInEventProducer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/subscription/dmi/EventProducer.java similarity index 79% rename from cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/subscription/dmi/DmiInEventProducer.java rename to cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/subscription/dmi/EventProducer.java index 71d8d3c93a..46c9457f10 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/subscription/dmi/DmiInEventProducer.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/subscription/dmi/EventProducer.java @@ -26,7 +26,7 @@ import io.cloudevents.CloudEvent; import java.util.Map; import lombok.RequiredArgsConstructor; import org.onap.cps.events.EventsProducer; -import org.onap.cps.ncmp.impl.datajobs.subscription.ncmp_to_dmi.DmiInEvent; +import org.onap.cps.ncmp.impl.datajobs.subscription.ncmp_to_dmi.DataJobSubscriptionDmiInEvent; import org.onap.cps.ncmp.utils.events.NcmpEvent; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; @@ -35,7 +35,7 @@ import org.springframework.stereotype.Component; @Component @RequiredArgsConstructor @ConditionalOnProperty(name = "notification.enabled", havingValue = "true", matchIfMissing = true) -public class DmiInEventProducer { +public class EventProducer { private final EventsProducer eventsProducer; @@ -48,22 +48,22 @@ public class DmiInEventProducer { * @param subscriptionId CM subscription id * @param dmiPluginName Dmi plugin Name * @param eventType Type of event - * @param dmiInEvent Cm Notification Subscription event for Dmi + * @param event Cm Notification Subscription event for Dmi */ - public void sendDmiInEvent(final String subscriptionId, final String dmiPluginName, - final String eventType, final DmiInEvent dmiInEvent) { + public void send(final String subscriptionId, final String dmiPluginName, + final String eventType, final DataJobSubscriptionDmiInEvent event) { eventsProducer.sendCloudEvent(dmiInEventTopic, subscriptionId, - buildAndGetDmiInEventAsCloudEvent(subscriptionId, dmiPluginName, eventType, dmiInEvent)); + toCloudEvent(eventType, event, subscriptionId, dmiPluginName)); } - private CloudEvent buildAndGetDmiInEventAsCloudEvent(final String subscriptionId, final String dmiPluginName, - final String eventType, final DmiInEvent dmiInEvent) { + private CloudEvent toCloudEvent(final String eventType, final DataJobSubscriptionDmiInEvent event, + final String subscriptionId, final String dmiPluginName) { return NcmpEvent.builder() .type(eventType) .dataSchema(SUBSCRIPTIONS_V1.getDataSchema()) + .data(event) .extensions(Map.of("correlationid", String.join("#", subscriptionId, dmiPluginName))) - .data(dmiInEvent) .build() .asCloudEvent(); } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/subscription/models/CmSubscriptionStatus.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/subscription/models/CmSubscriptionStatus.java index cd3f2f294b..c378ee99eb 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/subscription/models/CmSubscriptionStatus.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/subscription/models/CmSubscriptionStatus.java @@ -23,5 +23,5 @@ package org.onap.cps.ncmp.impl.datajobs.subscription.models; public enum CmSubscriptionStatus { ACCEPTED, REJECTED, - PENDING + UNKNOWN } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/subscription/ncmp/CmSubscriptionComparator.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/subscription/ncmp/CmSubscriptionComparator.java deleted file mode 100644 index 84e59d9823..0000000000 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/subscription/ncmp/CmSubscriptionComparator.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * Copyright (C) 2024-2025 OpenInfra Foundation Europe. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * SPDX-License-Identifier: Apache-2.0 - * ============LICENSE_END========================================================= - */ - -package org.onap.cps.ncmp.impl.datajobs.subscription.ncmp; - -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import lombok.RequiredArgsConstructor; -import org.onap.cps.ncmp.api.data.models.DatastoreType; -import org.onap.cps.ncmp.impl.datajobs.subscription.models.DmiCmSubscriptionPredicate; -import org.onap.cps.ncmp.impl.datajobs.subscription.utils.CmDataJobSubscriptionPersistenceService; -import org.springframework.stereotype.Component; - -@Component -@RequiredArgsConstructor -public class CmSubscriptionComparator { - - private final CmDataJobSubscriptionPersistenceService cmDataJobSubscriptionPersistenceService; - - /** - * Get the new Dmi Predicates for a given predicates list. - * - * @param existingDmiCmSubscriptionPredicates list of DmiCmNotificationSubscriptionPredicates - * @return new list of DmiCmNotificationSubscriptionPredicates - */ - public List getNewDmiSubscriptionPredicates( - final List existingDmiCmSubscriptionPredicates) { - final List newDmiCmSubscriptionPredicates = - new ArrayList<>(); - for (final DmiCmSubscriptionPredicate dmiCmSubscriptionPredicate : existingDmiCmSubscriptionPredicates) { - final Set targetCmHandleIds = new HashSet<>(); - final Set xpaths = new HashSet<>(); - final DatastoreType datastoreType = dmiCmSubscriptionPredicate.getDatastoreType(); - for (final String cmHandleId : dmiCmSubscriptionPredicate.getTargetCmHandleIds()) { - if (!cmDataJobSubscriptionPersistenceService.hasAtLeastOneSubscription( - datastoreType.getDatastoreName(), cmHandleId)) { - targetCmHandleIds.add(cmHandleId); - } - } - populateValidDmiSubscriptionPredicates(targetCmHandleIds, xpaths, datastoreType, - newDmiCmSubscriptionPredicates); - } - return newDmiCmSubscriptionPredicates; - } - - private void populateValidDmiSubscriptionPredicates(final Set targetCmHandleIds, - final Set xpaths, - final DatastoreType datastoreType, - final List - dmiCmSubscriptionPredicates) { - if (!targetCmHandleIds.isEmpty()) { - final DmiCmSubscriptionPredicate dmiCmSubscriptionPredicate = - new DmiCmSubscriptionPredicate(targetCmHandleIds, datastoreType, xpaths); - dmiCmSubscriptionPredicates.add(dmiCmSubscriptionPredicate); - } - } - -} diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/subscription/ncmp/CmSubscriptionHandler.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/subscription/ncmp/CmSubscriptionHandler.java index 0ada3ecc2f..47315b3ad4 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/subscription/ncmp/CmSubscriptionHandler.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/subscription/ncmp/CmSubscriptionHandler.java @@ -1,6 +1,6 @@ /* * ============LICENSE_START======================================================= - * Copyright (C) 2024 Nordix Foundation + * Copyright (C) 2024-2025 OpenInfra Foundation Europe. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,23 +21,18 @@ package org.onap.cps.ncmp.impl.datajobs.subscription.ncmp; import java.util.List; -import org.onap.cps.ncmp.impl.datajobs.subscription.client_to_ncmp.Predicate; +import org.onap.cps.ncmp.impl.datajobs.subscription.client_to_ncmp.DataSelector; public interface CmSubscriptionHandler { /** * Process cm notification subscription create request. * - * @param subscriptionId subscription id - * @param predicates subscription predicates + * @param dataSelector subscription data selector + * @param subscriptionId subscription id + * @param dataNodeSelectors subscription data node selectors */ - void processSubscriptionCreateRequest(final String subscriptionId, final List predicates); - - /** - * Process cm notification subscription delete request. - * - * @param subscriptionId subscription id - */ - void processSubscriptionDeleteRequest(final String subscriptionId); + void processSubscriptionCreate(final DataSelector dataSelector, final String subscriptionId, + final List dataNodeSelectors); } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/subscription/ncmp/CmSubscriptionHandlerImpl.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/subscription/ncmp/CmSubscriptionHandlerImpl.java index 85ea1c6be3..9a0be3be39 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/subscription/ncmp/CmSubscriptionHandlerImpl.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/subscription/ncmp/CmSubscriptionHandlerImpl.java @@ -21,212 +21,111 @@ package org.onap.cps.ncmp.impl.datajobs.subscription.ncmp; import java.util.ArrayList; -import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.regex.Matcher; -import java.util.regex.Pattern; -import java.util.stream.Collectors; import lombok.RequiredArgsConstructor; -import org.onap.cps.api.model.DataNode; -import org.onap.cps.ncmp.impl.datajobs.subscription.cache.DmiCacheHandler; -import org.onap.cps.ncmp.impl.datajobs.subscription.client_to_ncmp.Predicate; -import org.onap.cps.ncmp.impl.datajobs.subscription.dmi.DmiCmSubscriptionDetailsPerDmiMapper; +import lombok.extern.slf4j.Slf4j; +import org.onap.cps.ncmp.impl.datajobs.subscription.client_to_ncmp.DataSelector; import org.onap.cps.ncmp.impl.datajobs.subscription.dmi.DmiInEventMapper; -import org.onap.cps.ncmp.impl.datajobs.subscription.dmi.DmiInEventProducer; -import org.onap.cps.ncmp.impl.datajobs.subscription.models.CmSubscriptionStatus; -import org.onap.cps.ncmp.impl.datajobs.subscription.models.DmiCmSubscriptionDetails; -import org.onap.cps.ncmp.impl.datajobs.subscription.models.DmiCmSubscriptionKey; -import org.onap.cps.ncmp.impl.datajobs.subscription.models.DmiCmSubscriptionPredicate; -import org.onap.cps.ncmp.impl.datajobs.subscription.models.DmiCmSubscriptionTuple; -import org.onap.cps.ncmp.impl.datajobs.subscription.ncmp_to_client.NcmpOutEvent; -import org.onap.cps.ncmp.impl.datajobs.subscription.ncmp_to_dmi.DmiInEvent; +import org.onap.cps.ncmp.impl.datajobs.subscription.dmi.EventProducer; +import org.onap.cps.ncmp.impl.datajobs.subscription.ncmp_to_dmi.DataJobSubscriptionDmiInEvent; import org.onap.cps.ncmp.impl.datajobs.subscription.utils.CmDataJobSubscriptionPersistenceService; import org.onap.cps.ncmp.impl.inventory.InventoryPersistence; +import org.onap.cps.ncmp.impl.inventory.models.YangModelCmHandle; +import org.onap.cps.ncmp.impl.utils.AlternateIdMatcher; +import org.onap.cps.ncmp.impl.utils.JexParser; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.stereotype.Service; @Service @RequiredArgsConstructor +@Slf4j @ConditionalOnProperty(name = "notification.enabled", havingValue = "true", matchIfMissing = true) public class CmSubscriptionHandlerImpl implements CmSubscriptionHandler { - private static final Pattern SUBSCRIPTION_KEY_FROM_XPATH_PATTERN = Pattern.compile( - "^/datastores/datastore\\[@name='([^']*)']/cm-handles/cm-handle\\[@id='([^']*)']/" - + "filters/filter\\[@xpath='(.*)']$"); - private final CmDataJobSubscriptionPersistenceService cmDataJobSubscriptionPersistenceService; - private final CmSubscriptionComparator cmSubscriptionComparator; - private final NcmpOutEventMapper ncmpOutEventMapper; private final DmiInEventMapper dmiInEventMapper; - private final DmiCmSubscriptionDetailsPerDmiMapper dmiCmSubscriptionDetailsPerDmiMapper; - private final NcmpOutEventProducer ncmpOutEventProducer; - private final DmiInEventProducer dmiInEventProducer; - private final DmiCacheHandler dmiCacheHandler; + private final EventProducer eventProducer; private final InventoryPersistence inventoryPersistence; + private final AlternateIdMatcher alternateIdMatcher; @Override - public void processSubscriptionCreateRequest(final String subscriptionId, final List predicates) { - if (cmDataJobSubscriptionPersistenceService.isNewSubscriptionId(subscriptionId)) { - dmiCacheHandler.add(subscriptionId, predicates); - handleNewCmSubscription(subscriptionId); - scheduleNcmpOutEventResponse(subscriptionId, "subscriptionCreateResponse"); - } else { - rejectAndSendCreateRequest(subscriptionId, predicates); + public void processSubscriptionCreate(final DataSelector dataSelector, + final String subscriptionId, final List dataNodeSelectors) { + for (final String dataNodeSelector : dataNodeSelectors) { + cmDataJobSubscriptionPersistenceService.add(subscriptionId, dataNodeSelector); } + sendCreateEventToDmis(subscriptionId, dataSelector); } - @Override - public void processSubscriptionDeleteRequest(final String subscriptionId) { - final Collection subscriptionDataNodes = - cmDataJobSubscriptionPersistenceService.getAffectedDataNodes(subscriptionId); - final DmiCmSubscriptionTuple dmiCmSubscriptionTuple = - getLastRemainingAndOverlappingSubscriptionsPerDmi(subscriptionDataNodes); - dmiCacheHandler.add(subscriptionId, mergeDmiCmSubscriptionDetailsPerDmiMaps(dmiCmSubscriptionTuple)); - if (dmiCmSubscriptionTuple.lastRemainingSubscriptionsPerDmi().isEmpty()) { - acceptAndSendDeleteRequest(subscriptionId); - } else { - sendSubscriptionDeleteRequestToDmi(subscriptionId, - dmiCmSubscriptionDetailsPerDmiMapper.toDmiCmSubscriptionsPerDmi( - dmiCmSubscriptionTuple.lastRemainingSubscriptionsPerDmi())); - scheduleNcmpOutEventResponse(subscriptionId, "subscriptionDeleteResponse"); + private void sendCreateEventToDmis(final String subscriptionId, final DataSelector dataSelector) { + final List dataNodeSelectors = + cmDataJobSubscriptionPersistenceService.getInactiveDataNodeSelectors(subscriptionId); + final Map cmHandleIdsAndDataNodeSelectorsPerDmi = + createDmiInEventTargetsPerDmi(dataNodeSelectors); + + for (final Map.Entry cmHandleIdsAndDataNodeSelectorsEntry : + cmHandleIdsAndDataNodeSelectorsPerDmi.entrySet()) { + final String dmiServiceName = cmHandleIdsAndDataNodeSelectorsEntry.getKey(); + final CmHandleIdsAndDataNodeSelectors cmHandleIdsAndDataNodeSelectors = + cmHandleIdsAndDataNodeSelectorsEntry.getValue(); + final DataJobSubscriptionDmiInEvent dmiInEvent = + buildDmiInEvent(cmHandleIdsAndDataNodeSelectors, dataSelector); + eventProducer.send(subscriptionId, dmiServiceName, "subscriptionCreateRequest", dmiInEvent); } } - private Map mergeDmiCmSubscriptionDetailsPerDmiMaps( - final DmiCmSubscriptionTuple dmiCmSubscriptionTuple) { - final Map lastRemainingDmiSubscriptionsPerDmi = - dmiCmSubscriptionDetailsPerDmiMapper.toDmiCmSubscriptionsPerDmi( - dmiCmSubscriptionTuple.lastRemainingSubscriptionsPerDmi()); - final Map overlappingDmiSubscriptionsPerDmi = - dmiCmSubscriptionDetailsPerDmiMapper.toDmiCmSubscriptionsPerDmi( - dmiCmSubscriptionTuple.overlappingSubscriptionsPerDmi()); - final Map mergedDmiSubscriptionsPerDmi = - new HashMap<>(lastRemainingDmiSubscriptionsPerDmi); - overlappingDmiSubscriptionsPerDmi.forEach((dmiServiceName, dmiCmSubscriptionDetails) -> - mergedDmiSubscriptionsPerDmi.merge(dmiServiceName, dmiCmSubscriptionDetails, - this::mergeDmiCmSubscriptionDetails)); - return mergedDmiSubscriptionsPerDmi; - } - - private DmiCmSubscriptionDetails mergeDmiCmSubscriptionDetails( - final DmiCmSubscriptionDetails dmiCmSubscriptionDetails, - final DmiCmSubscriptionDetails otherDmiCmSubscriptionDetails) { - final List mergedDmiCmSubscriptionPredicates = - new ArrayList<>(dmiCmSubscriptionDetails.getDmiCmSubscriptionPredicates()); - mergedDmiCmSubscriptionPredicates.addAll(otherDmiCmSubscriptionDetails.getDmiCmSubscriptionPredicates()); - return new DmiCmSubscriptionDetails(mergedDmiCmSubscriptionPredicates, CmSubscriptionStatus.PENDING); - } - private void scheduleNcmpOutEventResponse(final String subscriptionId, final String eventType) { - ncmpOutEventProducer.sendNcmpOutEvent(subscriptionId, eventType, null, true); + private DataJobSubscriptionDmiInEvent buildDmiInEvent( + final CmHandleIdsAndDataNodeSelectors cmHandleIdsAndDataNodeSelectors, + final DataSelector dataSelector) { + final List cmHandleIds = new ArrayList<>(cmHandleIdsAndDataNodeSelectors.cmHandleIds); + final List dataNodeSelectors = new ArrayList<>(cmHandleIdsAndDataNodeSelectors.dataNodeSelectors); + final List notificationTypes = dataSelector.getNotificationTypes(); + final String notificationFilter = dataSelector.getNotificationFilter(); + return dmiInEventMapper.toDmiInEvent(cmHandleIds, dataNodeSelectors, notificationTypes, notificationFilter); } - private void rejectAndSendCreateRequest(final String subscriptionId, final List predicates) { - final Set subscriptionTargetFilters = - predicates.stream().flatMap(predicate -> predicate.getTargetFilter().stream()) - .collect(Collectors.toSet()); - final NcmpOutEvent ncmpOutEvent = ncmpOutEventMapper.toNcmpOutEventForRejectedRequest(subscriptionId, - new ArrayList<>(subscriptionTargetFilters)); - ncmpOutEventProducer.sendNcmpOutEvent(subscriptionId, "subscriptionCreateResponse", ncmpOutEvent, false); - } - - private void acceptAndSendDeleteRequest(final String subscriptionId) { - final Set dmiServiceNames = dmiCacheHandler.get(subscriptionId).keySet(); - for (final String dmiServiceName : dmiServiceNames) { - dmiCacheHandler.updateDmiSubscriptionStatus(subscriptionId, dmiServiceName, - CmSubscriptionStatus.ACCEPTED); - dmiCacheHandler.removeFromDatabase(subscriptionId, dmiServiceName); - } - final NcmpOutEvent ncmpOutEvent = ncmpOutEventMapper.toNcmpOutEvent(subscriptionId, - dmiCacheHandler.get(subscriptionId)); - ncmpOutEventProducer.sendNcmpOutEvent(subscriptionId, "subscriptionDeleteResponse", ncmpOutEvent, - false); - } - - private void handleNewCmSubscription(final String subscriptionId) { - final Map dmiSubscriptionsPerDmi = - dmiCacheHandler.get(subscriptionId); - dmiSubscriptionsPerDmi.forEach((dmiPluginName, dmiSubscriptionDetails) -> { - final List dmiCmSubscriptionPredicates = - cmSubscriptionComparator.getNewDmiSubscriptionPredicates( - dmiSubscriptionDetails.getDmiCmSubscriptionPredicates()); - - if (dmiCmSubscriptionPredicates.isEmpty()) { - acceptAndPersistCmSubscriptionPerDmi(subscriptionId, dmiPluginName); + private Map createDmiInEventTargetsPerDmi( + final List dataNodeSelectors) { + final Map dmiInEventTargetsPerDmi = new HashMap<>(); + for (final String dataNodeSelector : dataNodeSelectors) { + final String cmHandleId = getCmHandleId(dataNodeSelector); + if (cmHandleId == null) { + log.info("Failed to resolve cm handle ID for dataNodeSelector {}", dataNodeSelector); } else { - sendDmiInEventPerDmi(subscriptionId, dmiPluginName, dmiCmSubscriptionPredicates); + final String dmiServiceName = getDmiServiceName(cmHandleId); + final CmHandleIdsAndDataNodeSelectors cmHandleIdsAndDataNodeSelectors; + if (dmiInEventTargetsPerDmi.get(dmiServiceName) == null) { + cmHandleIdsAndDataNodeSelectors = + new CmHandleIdsAndDataNodeSelectors(new HashSet<>(), new HashSet<>()); + dmiInEventTargetsPerDmi.put(dmiServiceName, cmHandleIdsAndDataNodeSelectors); + } else { + cmHandleIdsAndDataNodeSelectors = dmiInEventTargetsPerDmi.get(dmiServiceName); + } + cmHandleIdsAndDataNodeSelectors.cmHandleIds.add(cmHandleId); + cmHandleIdsAndDataNodeSelectors.dataNodeSelectors.add(dataNodeSelector); } - }); - } - - private void sendDmiInEventPerDmi(final String subscriptionId, final String dmiPluginName, - final List dmiCmSubscriptionPredicates) { - final DmiInEvent dmiInEvent = dmiInEventMapper.toDmiInEvent(dmiCmSubscriptionPredicates); - dmiInEventProducer.sendDmiInEvent(subscriptionId, dmiPluginName, - "subscriptionCreateRequest", dmiInEvent); - } - - private void acceptAndPersistCmSubscriptionPerDmi(final String subscriptionId, final String dmiPluginName) { - dmiCacheHandler.updateDmiSubscriptionStatus(subscriptionId, dmiPluginName, - CmSubscriptionStatus.ACCEPTED); - dmiCacheHandler.persistIntoDatabasePerDmi(subscriptionId, dmiPluginName); - } - - private void sendSubscriptionDeleteRequestToDmi(final String subscriptionId, - final Map - dmiCmSubscriptionsPerDmi) { - dmiCmSubscriptionsPerDmi.forEach((dmiPluginName, dmiCmSubscriptionDetails) -> { - final DmiInEvent dmiInEvent = - dmiInEventMapper.toDmiInEvent( - dmiCmSubscriptionDetails.getDmiCmSubscriptionPredicates()); - dmiInEventProducer.sendDmiInEvent(subscriptionId, - dmiPluginName, "subscriptionDeleteRequest", dmiInEvent); - }); + } + return dmiInEventTargetsPerDmi; } - - private DmiCmSubscriptionTuple getLastRemainingAndOverlappingSubscriptionsPerDmi( - final Collection subscriptionNodes) { - final Map> lastRemainingSubscriptionsPerDmi = new HashMap<>(); - final Map> overlappingSubscriptionsPerDmi = new HashMap<>(); - - for (final DataNode subscriptionNode : subscriptionNodes) { - final DmiCmSubscriptionKey dmiCmSubscriptionKey = extractCmSubscriptionKey(subscriptionNode.getXpath()); - final String dmiServiceName = inventoryPersistence.getYangModelCmHandle( - dmiCmSubscriptionKey.cmHandleId()).getDmiServiceName(); - @SuppressWarnings("unchecked") final List subscribers = - (List) subscriptionNode.getLeaves().get("subscriptionIds"); - populateDmiCmSubscriptionTuple(subscribers, overlappingSubscriptionsPerDmi, - lastRemainingSubscriptionsPerDmi, dmiServiceName, dmiCmSubscriptionKey); + private String getCmHandleId(final String dataNodeSelector) { + final String alternateId = JexParser.extractFdnPrefix(dataNodeSelector).orElse(""); + if (alternateId.isEmpty()) { + return null; } - return new DmiCmSubscriptionTuple(lastRemainingSubscriptionsPerDmi, overlappingSubscriptionsPerDmi); + return alternateIdMatcher.getCmHandleId(alternateId); } - private static void populateDmiCmSubscriptionTuple(final List subscribers, - final Map> - overlappingSubscriptionsPerDmi, - final Map> - lastRemainingSubscriptionsPerDmi, - final String dmiServiceName, - final DmiCmSubscriptionKey dmiCmSubscriptionKey) { - final Map> targetMap = - subscribers.size() > 1 ? overlappingSubscriptionsPerDmi : lastRemainingSubscriptionsPerDmi; - targetMap.computeIfAbsent(dmiServiceName, dmiName -> new HashSet<>()).add(dmiCmSubscriptionKey); + private String getDmiServiceName(final String cmHandleId) { + final YangModelCmHandle yangModelCmHandle = inventoryPersistence.getYangModelCmHandle(cmHandleId); + return yangModelCmHandle.getDmiServiceName(); } - private DmiCmSubscriptionKey extractCmSubscriptionKey(final String xpath) { - final Matcher matcher = SUBSCRIPTION_KEY_FROM_XPATH_PATTERN.matcher(xpath); - if (matcher.find()) { - final String datastoreName = matcher.group(1); - final String cmHandleId = matcher.group(2); - final String filterXpath = matcher.group(3); - return new DmiCmSubscriptionKey(datastoreName, cmHandleId, filterXpath); - } - throw new IllegalArgumentException("DataNode xpath does not represent a subscription key"); - } + private record CmHandleIdsAndDataNodeSelectors(Set cmHandleIds, Set dataNodeSelectors) {} } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/subscription/ncmp/NcmpInEventConsumer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/subscription/ncmp/NcmpInEventConsumer.java index 141bbd072c..464604a5f9 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/subscription/ncmp/NcmpInEventConsumer.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/subscription/ncmp/NcmpInEventConsumer.java @@ -23,7 +23,9 @@ package org.onap.cps.ncmp.impl.datajobs.subscription.ncmp; import java.util.List; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.onap.cps.ncmp.impl.datajobs.subscription.client_to_ncmp.DataJob; import org.onap.cps.ncmp.impl.datajobs.subscription.client_to_ncmp.DataJobSubscriptionOperationInEvent; +import org.onap.cps.ncmp.impl.datajobs.subscription.client_to_ncmp.DataSelector; import org.onap.cps.ncmp.impl.utils.JexParser; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.kafka.annotation.KafkaListener; @@ -35,6 +37,8 @@ import org.springframework.stereotype.Component; @ConditionalOnProperty(name = "notification.enabled", havingValue = "true", matchIfMissing = true) public class NcmpInEventConsumer { + private final CmSubscriptionHandler cmSubscriptionHandler; + /** * Consume the specified event. * @@ -46,16 +50,19 @@ public class NcmpInEventConsumer { + "org.onap.cps.ncmp.impl.datajobs.subscription.client_to_ncmp.DataJobSubscriptionOperationInEvent"}) public void consumeSubscriptionEvent( final DataJobSubscriptionOperationInEvent dataJobSubscriptionOperationInEvent) { - final String eventType = dataJobSubscriptionOperationInEvent.getEventType(); - final String dataNodeSelector = dataJobSubscriptionOperationInEvent.getEvent().getDataJob() - .getProductionJobDefinition().getTargetSelector().getDataNodeSelector(); - final List fdns = JexParser.toXpaths(dataNodeSelector); final String dataJobId = dataJobSubscriptionOperationInEvent.getEvent().getDataJob().getId(); - final String dataTypeId = dataJobSubscriptionOperationInEvent.getEvent().getDataType() != null - ? dataJobSubscriptionOperationInEvent.getEvent().getDataType().getDataTypeId() : "UNKNOWN"; - log.info("Consumed subscription event with details: | jobId={} | eventType={} | fdns={} | dataType={}", - dataJobId, eventType, fdns, dataTypeId); + log.info("Consumed subscription event with details: | dataJobId={} | eventType={}", dataJobId, eventType); + + if (eventType.equals("dataJobCreated")) { + final DataJob dataJob = dataJobSubscriptionOperationInEvent.getEvent().getDataJob(); + final String dataNodeSelector = + dataJob.getProductionJobDefinition().getTargetSelector().getDataNodeSelector(); + final List dataNodeSelectors = JexParser.toXpaths(dataNodeSelector); + final DataSelector dataSelector = dataJobSubscriptionOperationInEvent.getEvent().getDataJob() + .getProductionJobDefinition().getDataSelector(); + cmSubscriptionHandler.processSubscriptionCreate(dataSelector, dataJobId, dataNodeSelectors); + } } } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/subscription/ncmp/NcmpOutEventMapper.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/subscription/ncmp/NcmpOutEventMapper.java deleted file mode 100644 index 8954412085..0000000000 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/subscription/ncmp/NcmpOutEventMapper.java +++ /dev/null @@ -1,114 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * Copyright (C) 2024-2025 OpenInfra Foundation Europe. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * SPDX-License-Identifier: Apache-2.0 - * ============LICENSE_END========================================================= - */ - -package org.onap.cps.ncmp.impl.datajobs.subscription.ncmp; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import lombok.RequiredArgsConstructor; -import org.onap.cps.ncmp.impl.datajobs.subscription.models.CmSubscriptionStatus; -import org.onap.cps.ncmp.impl.datajobs.subscription.models.DmiCmSubscriptionDetails; -import org.onap.cps.ncmp.impl.datajobs.subscription.models.DmiCmSubscriptionPredicate; -import org.onap.cps.ncmp.impl.datajobs.subscription.ncmp_to_client.Data; -import org.onap.cps.ncmp.impl.datajobs.subscription.ncmp_to_client.NcmpOutEvent; -import org.springframework.stereotype.Component; - -@Component -@RequiredArgsConstructor -public class NcmpOutEventMapper { - - /** - * Mapper to form a response for the client for the Cm Notification Subscription. - * - * @param subscriptionId CM notification subscription id - * @param dmiSubscriptionsPerDmi contains CmNotificationSubscriptionDetails per dmi plugin - * @return CmNotificationSubscriptionNcmpOutEvent to sent back to the client - */ - public NcmpOutEvent toNcmpOutEvent(final String subscriptionId, - final Map dmiSubscriptionsPerDmi) { - - final NcmpOutEvent ncmpOutEvent = new NcmpOutEvent(); - final Data cmSubscriptionData = new Data(); - cmSubscriptionData.setSubscriptionId(subscriptionId); - populateNcmpOutEventWithCmHandleIds(dmiSubscriptionsPerDmi, - cmSubscriptionData); - ncmpOutEvent.setData(cmSubscriptionData); - - return ncmpOutEvent; - } - - /** - * Mapper to form a rejected response for the client for the Cm Notification Subscription Request. - * - * @param subscriptionId subscription id - * @param rejectedTargetFilters list of rejected target filters for the subscription request - * @return to sent back to the client - */ - public NcmpOutEvent toNcmpOutEventForRejectedRequest(final String subscriptionId, - final List rejectedTargetFilters) { - final NcmpOutEvent ncmpOutEvent = new NcmpOutEvent(); - final Data cmSubscriptionData = new Data(); - cmSubscriptionData.setSubscriptionId(subscriptionId); - cmSubscriptionData.setRejectedTargets(rejectedTargetFilters); - ncmpOutEvent.setData(cmSubscriptionData); - return ncmpOutEvent; - } - - private void populateNcmpOutEventWithCmHandleIds( - final Map dmiSubscriptionsPerDmi, - final Data cmSubscriptionData) { - - final Collection acceptedCmHandleIds = new HashSet<>(); - final Collection pendingCmHandleIds = new HashSet<>(); - final Collection rejectedCmHandleIds = new HashSet<>(); - - dmiSubscriptionsPerDmi.forEach((dmiPluginName, dmiSubscriptionDetails) -> { - final CmSubscriptionStatus cmSubscriptionStatus = - dmiSubscriptionDetails.getCmSubscriptionStatus(); - final List dmiCmSubscriptionPredicates = - dmiSubscriptionDetails.getDmiCmSubscriptionPredicates(); - - switch (cmSubscriptionStatus) { - case ACCEPTED -> acceptedCmHandleIds.addAll( - extractCmHandleIds(dmiCmSubscriptionPredicates)); - case PENDING -> pendingCmHandleIds.addAll(extractCmHandleIds(dmiCmSubscriptionPredicates)); - default -> rejectedCmHandleIds.addAll(extractCmHandleIds(dmiCmSubscriptionPredicates)); - } - }); - - cmSubscriptionData.setAcceptedTargets(acceptedCmHandleIds); - cmSubscriptionData.setPendingTargets(pendingCmHandleIds); - cmSubscriptionData.setRejectedTargets(rejectedCmHandleIds); - - } - - private List extractCmHandleIds( - final List dmiCmSubscriptionPredicates) { - final List cmHandleIds = new ArrayList<>(); - dmiCmSubscriptionPredicates.forEach(dmiSubscriptionPredicate -> cmHandleIds.addAll( - dmiSubscriptionPredicate.getTargetCmHandleIds())); - - return cmHandleIds; - } - -} diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/subscription/ncmp/NcmpOutEventProducer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/subscription/ncmp/NcmpOutEventProducer.java deleted file mode 100644 index 027414fcde..0000000000 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/subscription/ncmp/NcmpOutEventProducer.java +++ /dev/null @@ -1,136 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * Copyright (C) 2024-2025 OpenInfra Foundation Europe. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * SPDX-License-Identifier: Apache-2.0 - * ============LICENSE_END========================================================= - */ - -package org.onap.cps.ncmp.impl.datajobs.subscription.ncmp; - -import static org.onap.cps.ncmp.events.NcmpEventDataSchema.SUBSCRIPTIONS_V1; - -import io.cloudevents.CloudEvent; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.onap.cps.events.EventsProducer; -import org.onap.cps.ncmp.impl.datajobs.subscription.cache.DmiCacheHandler; -import org.onap.cps.ncmp.impl.datajobs.subscription.ncmp_to_client.NcmpOutEvent; -import org.onap.cps.ncmp.utils.events.NcmpEvent; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; -import org.springframework.stereotype.Component; - -@Component -@Slf4j -@RequiredArgsConstructor -@ConditionalOnProperty(name = "notification.enabled", havingValue = "true", matchIfMissing = true) -public class NcmpOutEventProducer { - - @Value("${app.ncmp.avc.cm-subscription-ncmp-out}") - private String ncmpOutEventTopic; - - @Value("${ncmp.timers.subscription-forwarding.dmi-response-timeout-ms}") - private Integer dmiOutEventTimeoutInMs; - - private final EventsProducer eventsProducer; - private final NcmpOutEventMapper ncmpOutEventMapper; - private final DmiCacheHandler dmiCacheHandler; - private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); - private static final Map> scheduledTasksPerSubscriptionIdAndEventType = - new ConcurrentHashMap<>(); - - /** - * Send the event to the client who requested the subscription with key as subscription id and event is Cloud - * Event compliant. - * - * @param subscriptionId CM subscription id - * @param eventType Type of event - * @param ncmpOutEvent Cm Notification Subscription Event for the - * client - * @param isScheduledEvent Determines if the event is to be scheduled - * or send now - */ - public void sendNcmpOutEvent(final String subscriptionId, final String eventType, - final NcmpOutEvent ncmpOutEvent, final boolean isScheduledEvent) { - - final String taskKey = subscriptionId.concat(eventType); - - if (isScheduledEvent && !scheduledTasksPerSubscriptionIdAndEventType.containsKey(taskKey)) { - final ScheduledFuture scheduledFuture = scheduleAndSendNcmpOutEvent(subscriptionId, eventType); - scheduledTasksPerSubscriptionIdAndEventType.putIfAbsent(taskKey, scheduledFuture); - log.debug("Scheduled the Cm Subscription Event for subscriptionId : {} and eventType : {}", subscriptionId, - eventType); - } else { - cancelScheduledTask(taskKey); - if (ncmpOutEvent != null) { - sendNcmpOutEventNow(subscriptionId, eventType, ncmpOutEvent); - log.debug("Sent Cm Subscription Event on demand for subscriptionId : {} and eventType : {}", - subscriptionId, eventType); - } - } - } - - /** - * Get an NCMP out event as cloud event. - * - * @param subscriptionId subscription id - * @param eventType event type - * @param ncmpOutEvent cm notification subscription NCMP out event - * @return cm notification subscription NCMP out event as cloud event - */ - public static CloudEvent buildAndGetNcmpOutEventAsCloudEvent( - final String subscriptionId, final String eventType, final NcmpOutEvent ncmpOutEvent) { - - return NcmpEvent.builder() - .type(eventType) - .dataSchema(SUBSCRIPTIONS_V1.getDataSchema()) - .extensions(Map.of("correlationid", subscriptionId)) - .data(ncmpOutEvent) - .build() - .asCloudEvent(); - } - - private ScheduledFuture scheduleAndSendNcmpOutEvent(final String subscriptionId, final String eventType) { - final NcmpOutEventPublishingTask ncmpOutEventPublishingTask = - new NcmpOutEventPublishingTask(ncmpOutEventTopic, subscriptionId, eventType, eventsProducer, - ncmpOutEventMapper, dmiCacheHandler); - return scheduledExecutorService.schedule(ncmpOutEventPublishingTask, dmiOutEventTimeoutInMs, - TimeUnit.MILLISECONDS); - } - - private void cancelScheduledTask(final String taskKey) { - final ScheduledFuture scheduledFuture = scheduledTasksPerSubscriptionIdAndEventType.get(taskKey); - if (scheduledFuture != null) { - scheduledFuture.cancel(true); - scheduledTasksPerSubscriptionIdAndEventType.remove(taskKey); - } - } - - private void sendNcmpOutEventNow(final String subscriptionId, final String eventType, - final NcmpOutEvent ncmpOutEvent) { - final CloudEvent ncmpOutEventAsCloudEvent = - buildAndGetNcmpOutEventAsCloudEvent(subscriptionId, eventType, ncmpOutEvent); - eventsProducer.sendCloudEvent(ncmpOutEventTopic, subscriptionId, ncmpOutEventAsCloudEvent); - dmiCacheHandler.removeAcceptedAndRejectedDmiSubscriptionEntries(subscriptionId); - } - -} diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/subscription/ncmp/NcmpOutEventPublishingTask.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/subscription/ncmp/NcmpOutEventPublishingTask.java deleted file mode 100644 index 3e65a352d3..0000000000 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/subscription/ncmp/NcmpOutEventPublishingTask.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * Copyright (C) 2024-2025 OpenInfra Foundation Europe. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * SPDX-License-Identifier: Apache-2.0 - * ============LICENSE_END========================================================= - */ - -package org.onap.cps.ncmp.impl.datajobs.subscription.ncmp; - -import static org.onap.cps.ncmp.impl.datajobs.subscription.ncmp.NcmpOutEventProducer.buildAndGetNcmpOutEventAsCloudEvent; - -import io.cloudevents.CloudEvent; -import java.util.Map; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.onap.cps.events.EventsProducer; -import org.onap.cps.ncmp.impl.datajobs.subscription.cache.DmiCacheHandler; -import org.onap.cps.ncmp.impl.datajobs.subscription.models.DmiCmSubscriptionDetails; -import org.onap.cps.ncmp.impl.datajobs.subscription.ncmp_to_client.NcmpOutEvent; - -@Slf4j -@RequiredArgsConstructor -public class NcmpOutEventPublishingTask implements Runnable { - - private final String topicName; - private final String subscriptionId; - private final String eventType; - private final EventsProducer eventsProducer; - private final NcmpOutEventMapper ncmpOutEventMapper; - private final DmiCacheHandler dmiCacheHandler; - - /** - * Delegating the responsibility of sending NcmpOutEvent as a separate task which will - * be called after a specified delay. - */ - @Override - public void run() { - final Map dmiSubscriptionsPerDmi = - dmiCacheHandler.get(subscriptionId); - final NcmpOutEvent ncmpOutEvent = ncmpOutEventMapper.toNcmpOutEvent(subscriptionId, - dmiSubscriptionsPerDmi); - eventsProducer.sendCloudEvent(topicName, subscriptionId, - buildAndGetNcmpOutEventAsCloudEvent(subscriptionId, eventType, ncmpOutEvent)); - dmiCacheHandler.removeAcceptedAndRejectedDmiSubscriptionEntries(subscriptionId); - } -} diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/subscription/utils/CmDataJobSubscriptionPersistenceService.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/subscription/utils/CmDataJobSubscriptionPersistenceService.java index 7c05789fd9..14259d87b4 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/subscription/utils/CmDataJobSubscriptionPersistenceService.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/subscription/utils/CmDataJobSubscriptionPersistenceService.java @@ -22,11 +22,14 @@ package org.onap.cps.ncmp.impl.datajobs.subscription.utils; import static org.onap.cps.api.parameters.FetchDescendantsOption.OMIT_DESCENDANTS; +import static org.onap.cps.ncmp.impl.datajobs.subscription.models.CmSubscriptionStatus.UNKNOWN; import java.io.Serializable; import java.time.OffsetDateTime; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.List; import java.util.Map; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -42,27 +45,30 @@ import org.springframework.stereotype.Service; @RequiredArgsConstructor public class CmDataJobSubscriptionPersistenceService { - private static final String NCMP_DATASPACE_NAME = "NCMP-Admin"; - private static final String CM_DATA_JOB_SUBSCRIPTIONS_ANCHOR_NAME = "cm-data-job-subscriptions"; - private static final String CM_DATA_JOB_SUBSCRIPTIONS_PARENT_NODE_XPATH = "/dataJob"; - private static final String CPS_PATH_TEMPLATE_FOR_SUBSCRIPTION_WITH_ALTERNATE_ID_AND_DATATYPE = - "/dataJob/subscription[@alternateId='%s' and @dataTypeId='%s']"; + private static final String DATASPACE = "NCMP-Admin"; + private static final String ANCHOR = "cm-data-job-subscriptions"; + + private static final String PARENT_NODE_XPATH = "/dataJob"; + private static final String CPS_PATH_FOR_SUBSCRIPTION_NODE = "//subscription"; + private static final String CPS_PATH_TEMPLATE_FOR_SUBSCRIPTIONS_WITH_DATA_NODE_SELECTOR = + CPS_PATH_FOR_SUBSCRIPTION_NODE + "[@dataNodeSelector='%s']"; private static final String CPS_PATH_TEMPLATE_FOR_SUBSCRIPTION_WITH_DATA_JOB_ID = - "//subscription/dataJobId[text()='%s']"; + CPS_PATH_FOR_SUBSCRIPTION_NODE + "/dataJobId[text()='%s']"; + private static final String CPS_PATH_TEMPLATE_FOR_INACTIVE_SUBSCRIPTIONS = + CPS_PATH_FOR_SUBSCRIPTION_NODE + "[@status='UNKNOWN' or @status='REJECTED']/dataJobId[text()='%s']"; private final JsonObjectMapper jsonObjectMapper; private final CpsQueryService cpsQueryService; private final CpsDataService cpsDataService; /** - * Check if we have a cm data job subscription for the given data type and target (FDN). + * Check if we have a cm data job subscription for the given data node selector. * - * @param dataType the data type of the data job subscription - * @param alternateId the alternate id target of the data job subscription + * @param dataNodeSelector the target of the data job subscription * @return true if the subscription details has at least one subscriber , otherwise false */ - public boolean hasAtLeastOneSubscription(final String dataType, final String alternateId) { - return !getSubscriptionIds(dataType, alternateId).isEmpty(); + public boolean hasAtLeastOneSubscription(final String dataNodeSelector) { + return !getSubscriptionIds(dataNodeSelector).isEmpty(); } /** @@ -73,23 +79,21 @@ public class CmDataJobSubscriptionPersistenceService { */ public boolean isNewSubscriptionId(final String subscriptionId) { final String query = CPS_PATH_TEMPLATE_FOR_SUBSCRIPTION_WITH_DATA_JOB_ID.formatted(subscriptionId); - return cpsQueryService.queryDataNodes(NCMP_DATASPACE_NAME, CM_DATA_JOB_SUBSCRIPTIONS_ANCHOR_NAME, + return cpsQueryService.queryDataNodes(DATASPACE, ANCHOR, query, OMIT_DESCENDANTS).isEmpty(); } /** - * Get the ids for the subscriptions for the given data type and targets. + * Get the ids for the subscriptions for the given data node selector. * - * @param dataType the data type of the data job subscription - * @param alternateId the alternate id target of the data job subscription + * @param dataNodeSelector the target of the data job subscription * @return collection of subscription ids of ongoing cm notification subscription */ @SuppressWarnings("unchecked") - public Collection getSubscriptionIds(final String dataType, final String alternateId) { - final String query = CPS_PATH_TEMPLATE_FOR_SUBSCRIPTION_WITH_ALTERNATE_ID_AND_DATATYPE.formatted( - alternateId, dataType); + public Collection getSubscriptionIds(final String dataNodeSelector) { + final String query = CPS_PATH_TEMPLATE_FOR_SUBSCRIPTIONS_WITH_DATA_NODE_SELECTOR.formatted(dataNodeSelector); final Collection existingNodes = - cpsQueryService.queryDataNodes(NCMP_DATASPACE_NAME, CM_DATA_JOB_SUBSCRIPTIONS_ANCHOR_NAME, + cpsQueryService.queryDataNodes(DATASPACE, ANCHOR, query, OMIT_DESCENDANTS); if (existingNodes.isEmpty()) { return Collections.emptyList(); @@ -98,88 +102,70 @@ public class CmDataJobSubscriptionPersistenceService { } /** - * Add cm notification data job subscription. + * Get data node selectors for subscriptions with status UNKNOWN or REJECTED. * - * @param dataType the data type of the data job subscription - * @param alternateId the alternate id target of the data job subscription - * @param subscriptionId data job subscription id to be added + * @param subscriptionId subscription ID + * @return a list of data node selectors */ - public void addSubscription(final String dataType, final String alternateId, final String subscriptionId) { - final Collection subscriptionIds = getSubscriptionIds(dataType, alternateId); - if (subscriptionIds.isEmpty()) { - addNewSubscriptionDetails(dataType, alternateId, subscriptionId); - } else { - subscriptionIds.add(subscriptionId); - updateSubscriptionDetails(subscriptionIds, dataType, alternateId); + public List getInactiveDataNodeSelectors(final String subscriptionId) { + final String query = CPS_PATH_TEMPLATE_FOR_INACTIVE_SUBSCRIPTIONS.formatted(subscriptionId); + final Collection dataNodes = cpsQueryService.queryDataNodes(DATASPACE, ANCHOR, query, + OMIT_DESCENDANTS); + final List dataNodeSelectors = new ArrayList<>(dataNodes.size()); + for (final DataNode dataNode : dataNodes) { + final String dataNodeSelector = dataNode.getLeaves().get("dataNodeSelector").toString(); + dataNodeSelectors.add(dataNodeSelector); } + return dataNodeSelectors; } /** - * Remove cm notification data job Subscription. + * Add cm notification data job subscription. * - * @param dataType the data type of the data job subscription - * @param alternateId the alternate id target of the data job subscription - * @param subscriptionId data subscription id to remove + * @param subscriptionId data job subscription id to be added + * @param dataNodeSelector the target of the data job subscription */ - public void removeSubscription(final String dataType, final String alternateId, final String subscriptionId) { - final Collection subscriptionIds = getSubscriptionIds(dataType, alternateId); - if (subscriptionIds.remove(subscriptionId)) { - updateSubscriptionDetails(subscriptionIds, dataType, alternateId); - log.info("There is at least one subscriber left for dataType {} on {}", dataType, alternateId); - if (subscriptionIds.isEmpty()) { - log.info("There are no subscribers left for dataType {} on {}", dataType, alternateId); - deleteUnusedSubscriptionDetails(dataType, alternateId); - } + public void add(final String subscriptionId, final String dataNodeSelector) { + final String query = CPS_PATH_TEMPLATE_FOR_SUBSCRIPTIONS_WITH_DATA_NODE_SELECTOR.formatted(dataNodeSelector); + final Collection dataNodes = cpsQueryService.queryDataNodes(DATASPACE, ANCHOR, query, + OMIT_DESCENDANTS); + if (dataNodes.isEmpty()) { + addNewSubscriptionDetails(subscriptionId, dataNodeSelector); + } else { + final Collection subscriptionIds = getSubscriptionIds(dataNodeSelector); + final String status = dataNodes.iterator().next().getLeaves().get("status").toString(); + subscriptionIds.add(subscriptionId); + updateSubscriptionDetails(dataNodeSelector, subscriptionIds, status); } } - /** - * Retrieve all existing data nodes for given data job subscription id. - * - * @param subscriptionId data job subscription id - * @return collection of DataNodes - */ - public Collection getAffectedDataNodes(final String subscriptionId) { - final String query = CPS_PATH_TEMPLATE_FOR_SUBSCRIPTION_WITH_DATA_JOB_ID.formatted(subscriptionId); - return cpsQueryService.queryDataNodes(NCMP_DATASPACE_NAME, CM_DATA_JOB_SUBSCRIPTIONS_ANCHOR_NAME, - query, OMIT_DESCENDANTS); - } - - private void deleteUnusedSubscriptionDetails(final String dataType, final String alternateId) { - final String deleteListOfSubscriptionCpsPathQuery = - CPS_PATH_TEMPLATE_FOR_SUBSCRIPTION_WITH_ALTERNATE_ID_AND_DATATYPE.formatted(alternateId, - dataType); - cpsDataService.deleteDataNode(NCMP_DATASPACE_NAME, CM_DATA_JOB_SUBSCRIPTIONS_ANCHOR_NAME, - deleteListOfSubscriptionCpsPathQuery, OffsetDateTime.now()); - } - - private void addNewSubscriptionDetails(final String dataType, - final String alternateId, - final String subscriptionId) { + private void addNewSubscriptionDetails(final String subscriptionId, + final String dataNodeSelector) { final Collection newSubscriptionList = Collections.singletonList(subscriptionId); - final String subscriptionDetailsAsJson = getSubscriptionDetailsAsJson(newSubscriptionList, dataType, - alternateId); - cpsDataService.saveData(NCMP_DATASPACE_NAME, CM_DATA_JOB_SUBSCRIPTIONS_ANCHOR_NAME, subscriptionDetailsAsJson, + final String status = UNKNOWN.name(); + final String subscriptionDetailsAsJson = createSubscriptionDetailsAsJson(dataNodeSelector, + newSubscriptionList, status); + cpsDataService.saveData(DATASPACE, ANCHOR, subscriptionDetailsAsJson, OffsetDateTime.now(), ContentType.JSON); } - private void updateSubscriptionDetails(final Collection subscriptionIds, final String dataType, - final String alternateId) { - final String subscriptionDetailsAsJson = getSubscriptionDetailsAsJson(subscriptionIds, dataType, alternateId); - cpsDataService.updateNodeLeaves(NCMP_DATASPACE_NAME, CM_DATA_JOB_SUBSCRIPTIONS_ANCHOR_NAME, - CM_DATA_JOB_SUBSCRIPTIONS_PARENT_NODE_XPATH, subscriptionDetailsAsJson, OffsetDateTime.now(), + private void updateSubscriptionDetails(final String dataNodeSelector, final Collection subscriptionIds, + final String status) { + final String subscriptionDetailsAsJson = createSubscriptionDetailsAsJson(dataNodeSelector, + subscriptionIds, status); + cpsDataService.updateNodeLeaves(DATASPACE, ANCHOR, + PARENT_NODE_XPATH, subscriptionDetailsAsJson, OffsetDateTime.now(), ContentType.JSON); } - private String getSubscriptionDetailsAsJson(final Collection subscriptionIds, - final String dataTypeId, - final String alternateId) { + private String createSubscriptionDetailsAsJson(final String dataNodeSelector, + final Collection subscriptionIds, + final String status) { final Map subscriptionDetailsAsMap = - Map.of("dataTypeId", dataTypeId, - "alternateId", alternateId, - "dataJobId", (Serializable) subscriptionIds); + Map.of("dataNodeSelector", dataNodeSelector, + "dataJobId", (Serializable) subscriptionIds, + "status", status); return "{\"subscription\":[" + jsonObjectMapper.asJsonString(subscriptionDetailsAsMap) + "]}"; } - } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/utils/JexParser.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/utils/JexParser.java index aa5cbdee68..ed640b2803 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/utils/JexParser.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/utils/JexParser.java @@ -46,7 +46,6 @@ public class JexParser { * @param jsonExpressionsAsString Multi-line jex string. * @return List of xpaths */ - @SuppressWarnings("unused") public static List toXpaths(final String jsonExpressionsAsString) { if (jsonExpressionsAsString == null) { return Collections.emptyList(); @@ -67,7 +66,6 @@ public class JexParser { * @param xpath A single json expression. * @return Optional containing resolved fdn if found; empty otherwise. */ - @SuppressWarnings("unused") public static Optional extractFdnPrefix(final String xpath) { final List xpathSegments = splitIntoXpaths(xpath); final StringBuilder fdnBuilder = new StringBuilder(); @@ -96,7 +94,6 @@ public class JexParser { * @param xpaths List of xpath strings to be joined. * @return A string representing the concatenated json expression. */ - @SuppressWarnings("unused") public static String toJsonExpressionsAsString(final Collection xpaths) { return String.join(LINE_JOINER_DELIMITER, xpaths); } diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/subscription/cache/CmSubscriptionConfigSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/subscription/cache/CmSubscriptionConfigSpec.groovy deleted file mode 100644 index c9a95ac92e..0000000000 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/subscription/cache/CmSubscriptionConfigSpec.groovy +++ /dev/null @@ -1,67 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * Copyright (C) 2024 Nordix Foundation - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * SPDX-License-Identifier: Apache-2.0 - * ============LICENSE_END========================================================= - */ - -package org.onap.cps.ncmp.impl.datajobs.subscription.cache - -import com.hazelcast.core.Hazelcast -import com.hazelcast.map.IMap -import org.onap.cps.ncmp.api.data.models.DatastoreType -import org.onap.cps.ncmp.impl.datajobs.subscription.models.CmSubscriptionStatus -import org.onap.cps.ncmp.impl.datajobs.subscription.models.DmiCmSubscriptionDetails -import org.onap.cps.ncmp.impl.datajobs.subscription.models.DmiCmSubscriptionPredicate -import org.springframework.beans.factory.annotation.Autowired -import org.springframework.boot.test.context.SpringBootTest -import spock.lang.Specification - -@SpringBootTest(classes = [CmSubscriptionConfig]) -class CmSubscriptionConfigSpec extends Specification { - - @Autowired - IMap> cmNotificationSubscriptionCache; - - def cleanupSpec() { - Hazelcast.getHazelcastInstanceByName('cps-and-ncmp-hazelcast-instance-test-config').shutdown() - } - - def 'Embedded (hazelcast) cache for Cm Notification Subscription Cache.'() { - expect: 'system is able to create an instance of the Cm Notification Subscription Cache' - assert null != cmNotificationSubscriptionCache - and: 'there is at least 1 instance' - assert Hazelcast.allHazelcastInstances.size() > 0 - and: 'Cm Notification Subscription Cache is present' - assert Hazelcast.allHazelcastInstances.name.contains('cps-and-ncmp-hazelcast-instance-test-config') - } - - def 'Provided CM Subscription data'() { - given: 'a cm subscription properties' - def subscriptionId = 'sub123' - def dmiPluginName = 'dummydmi' - def cmSubscriptionPredicates = new DmiCmSubscriptionPredicate(['cmhandle1', 'cmhandle2'].toSet(), DatastoreType.PASSTHROUGH_RUNNING, ['/a/b/c'].toSet()) - def cmSubscriptionCacheObject = new DmiCmSubscriptionDetails([cmSubscriptionPredicates], CmSubscriptionStatus.PENDING) - when: 'the cache is populated' - cmNotificationSubscriptionCache.put(subscriptionId, [(dmiPluginName): cmSubscriptionCacheObject]) - then: 'the values are present in memory' - assert cmNotificationSubscriptionCache.get(subscriptionId) != null - and: 'properties match' - assert dmiPluginName == cmNotificationSubscriptionCache.get(subscriptionId).keySet()[0] - assert cmSubscriptionCacheObject.cmSubscriptionStatus == cmNotificationSubscriptionCache.get(subscriptionId).values().cmSubscriptionStatus[0] - assert cmSubscriptionCacheObject.dmiCmSubscriptionPredicates[0].targetCmHandleIds == cmNotificationSubscriptionCache.get(subscriptionId).values().dmiCmSubscriptionPredicates[0].targetCmHandleIds[0] - } -} diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/subscription/cache/DmiCacheHandlerSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/subscription/cache/DmiCacheHandlerSpec.groovy deleted file mode 100644 index 8be98110e1..0000000000 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/subscription/cache/DmiCacheHandlerSpec.groovy +++ /dev/null @@ -1,228 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * Copyright (C) 2024-2025 OpenInfra Foundation Europe. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the 'License'); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an 'AS IS' BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * SPDX-License-Identifier: Apache-2.0 - * ============LICENSE_END========================================================= - */ - -package org.onap.cps.ncmp.impl.datajobs.subscription.cache - -import com.fasterxml.jackson.databind.ObjectMapper -import io.cloudevents.CloudEvent -import io.cloudevents.core.builder.CloudEventBuilder -import org.apache.kafka.clients.consumer.ConsumerRecord -import org.onap.cps.ncmp.impl.datajobs.subscription.client_to_ncmp.NcmpInEvent -import org.onap.cps.ncmp.impl.datajobs.subscription.models.CmSubscriptionStatus -import org.onap.cps.ncmp.impl.datajobs.subscription.models.DmiCmSubscriptionDetails -import org.onap.cps.ncmp.impl.datajobs.subscription.utils.CmDataJobSubscriptionPersistenceService -import org.onap.cps.ncmp.impl.inventory.InventoryPersistence -import org.onap.cps.ncmp.impl.inventory.models.YangModelCmHandle -import org.onap.cps.ncmp.utils.TestUtils -import org.onap.cps.ncmp.utils.events.MessagingBaseSpec -import org.onap.cps.utils.JsonObjectMapper -import org.spockframework.spring.SpringBean -import org.springframework.beans.factory.annotation.Autowired -import org.springframework.boot.test.context.SpringBootTest - -import static org.onap.cps.ncmp.utils.events.CloudEventMapper.toTargetEvent - -@SpringBootTest(classes = [ObjectMapper, JsonObjectMapper]) -class DmiCacheHandlerSpec extends MessagingBaseSpec { - - @Autowired - JsonObjectMapper jsonObjectMapper - @Autowired - ObjectMapper objectMapper - @SpringBean - InventoryPersistence mockInventoryPersistence = Mock(InventoryPersistence) - @SpringBean - CmDataJobSubscriptionPersistenceService mockCmSubscriptionPersistenceService = Mock(CmDataJobSubscriptionPersistenceService) - - def testCache = [:] - def objectUnderTest = new DmiCacheHandler(mockCmSubscriptionPersistenceService, testCache, mockInventoryPersistence) - - NcmpInEvent ncmpInEvent - def yangModelCmHandle1 = new YangModelCmHandle(id: 'ch1', dmiServiceName: 'dmi-1') - def yangModelCmHandle2 = new YangModelCmHandle(id: 'ch2', dmiServiceName: 'dmi-2') - def yangModelCmHandle3 = new YangModelCmHandle(id: 'ch3', dmiServiceName: 'dmi-1') - def yangModelCmHandle4 = new YangModelCmHandle(id: 'ch4', dmiServiceName: 'dmi-2') - - def setup() { - setUpTestEvent() - initialiseMockInventoryPersistenceResponses() - } - - def 'Load CM subscription event to cache with predicates'() { - given: 'a subscription event with id' - def subscriptionId = ncmpInEvent.getData().getSubscriptionId() - and: 'list of predicates' - def predicates = ncmpInEvent.getData().getPredicates() - when: 'subscription is loaded to cache with predicates' - objectUnderTest.add(subscriptionId, predicates) - then: 'the number of entries in cache is correct' - assert testCache.size() == 1 - and: 'the cache contains the correct entries' - assert testCache.containsKey(subscriptionId) - } - - def 'Load CM subscription event to cache with dmi subscription details per dmi'() { - given: 'a subscription event with id' - def subscriptionId = ncmpInEvent.getData().getSubscriptionId() - and: 'dmi subscription details per dmi' - def dmiSubscriptionsPerDmi = [:] - when: 'subscription is loaded to cache with dmi subscription details per dmi' - objectUnderTest.add(subscriptionId, dmiSubscriptionsPerDmi) - then: 'the number of entries in cache is correct' - assert testCache.size() == 1 - and: 'the cache contains the correct entries' - assert testCache.containsKey(subscriptionId) - and: 'the entry for the subscription ID matches the provided DMI subscription details' - assert testCache.get(subscriptionId) == dmiSubscriptionsPerDmi - } - - def 'Get cache entry via subscription id'() { - given: 'the cache contains value for some-id' - testCache.put('some-id', [:]) - when: 'the get method is called' - def result = objectUnderTest.get('some-id') - then: 'correct value is returned as expected' - assert result == [:] - } - - def 'Remove accepted and rejected entries from cache via subscription id'() { - given: 'a map as the value for cache entry for some-id' - def testMap = [:] - testMap.put("dmi-1", - new DmiCmSubscriptionDetails([], CmSubscriptionStatus.ACCEPTED)) - testMap.put("dmi-2", - new DmiCmSubscriptionDetails([], CmSubscriptionStatus.REJECTED)) - testMap.put("dmi-3", - new DmiCmSubscriptionDetails([], CmSubscriptionStatus.PENDING)) - testCache.put("test-id", testMap) - assert testCache.get("test-id").size() == 3 - when: 'the method to remove accepted and rejected entries for test-id is called' - objectUnderTest.removeAcceptedAndRejectedDmiSubscriptionEntries("test-id") - then: 'all entries with status accepted/rejected are no longer present for test-id' - testCache.get("test-id").each {key, testResultMap -> - assert testResultMap.cmSubscriptionStatus != CmSubscriptionStatus.ACCEPTED - || testResultMap.cmSubscriptionStatus != CmSubscriptionStatus.REJECTED - } - and: 'the size of the map for cache entry test-id is as expected' - assert testCache.get("test-id").size() == 1 - } - - def 'Create map for DMI cm notification subscription per DMI service name'() { - given: 'list of predicates from the create subscription event' - def predicates = ncmpInEvent.getData().getPredicates() - when: 'method to create map of DMI cm notification subscription per DMI service name is called' - def result = objectUnderTest.createDmiSubscriptionsPerDmi(predicates) - then: 'the result size of resulting map is correct to the number of DMIs' - assert result.size() == 2 - and: 'the cache objects per DMI exists' - def resultMapForDmi1 = result.get('dmi-1') - def resultMapForDmi2 = result.get('dmi-2') - assert resultMapForDmi1 != null - assert resultMapForDmi2 != null - and: 'the size of predicates in each object is correct' - assert resultMapForDmi1.dmiCmSubscriptionPredicates.size() == 2 - assert resultMapForDmi2.dmiCmSubscriptionPredicates.size() == 2 - and: 'the subscription status in each object is correct' - assert resultMapForDmi1.cmSubscriptionStatus.toString() == 'PENDING' - assert resultMapForDmi2.cmSubscriptionStatus.toString() == 'PENDING' - and: 'the target cmHandles for each predicate is correct' - assert resultMapForDmi1.dmiCmSubscriptionPredicates[0].targetCmHandleIds == ['ch1'].toSet() - assert resultMapForDmi1.dmiCmSubscriptionPredicates[1].targetCmHandleIds == ['ch3'].toSet() - - assert resultMapForDmi2.dmiCmSubscriptionPredicates[0].targetCmHandleIds == ['ch2'].toSet() - assert resultMapForDmi2.dmiCmSubscriptionPredicates[1].targetCmHandleIds == ['ch4'].toSet() - and: 'the list of xpath for each is correct' - assert resultMapForDmi1.dmiCmSubscriptionPredicates[0].xpaths - && resultMapForDmi2.dmiCmSubscriptionPredicates[0].xpaths == ['/x1/y1', 'x2/y2'].toSet() - - assert resultMapForDmi1.dmiCmSubscriptionPredicates[1].xpaths - && resultMapForDmi2.dmiCmSubscriptionPredicates[1].xpaths == ['/x3/y3', 'x4/y4'].toSet() - } - - def 'Get map for cm handle IDs by DMI service name'() { - given: 'the predicate from the test request CM subscription event' - def targetFilter = ncmpInEvent.getData().getPredicates().get(0).getTargetFilter() - when: 'the method to group all target CM handles by DMI service name is called' - def mapOfCMHandleIDsByDmi = objectUnderTest.groupTargetCmHandleIdsByDmi(targetFilter) - then: 'the size of the resulting map is correct' - assert mapOfCMHandleIDsByDmi.size() == 2 - and: 'the values in the map is as expected' - assert mapOfCMHandleIDsByDmi.get('dmi-1') == ['ch1'].toSet() - assert mapOfCMHandleIDsByDmi.get('dmi-2') == ['ch2'].toSet() - } - - def 'Update subscription status in cache per DMI service name'() { - given: 'populated cache' - def predicates = ncmpInEvent.getData().getPredicates() - def subscriptionId = ncmpInEvent.getData().getSubscriptionId() - objectUnderTest.add(subscriptionId, predicates) - when: 'subscription status per dmi is updated in cache' - objectUnderTest.updateDmiSubscriptionStatus(subscriptionId, 'dmi-1', CmSubscriptionStatus.ACCEPTED) - then: 'verify status has been updated in cache' - def predicate = testCache.get(subscriptionId) - assert predicate.get('dmi-1').cmSubscriptionStatus == CmSubscriptionStatus.ACCEPTED - } - - def 'Persist Cache into database per dmi'() { - given: 'populated cache' - def predicates = ncmpInEvent.getData().getPredicates() - def subscriptionId = ncmpInEvent.getData().getSubscriptionId() - objectUnderTest.add(subscriptionId, predicates) - when: 'subscription is persisted in database' - objectUnderTest.persistIntoDatabasePerDmi(subscriptionId, 'dmi-1') - then: 'persistence service is called the correct number of times per dmi' - 2 * mockCmSubscriptionPersistenceService.addSubscription(*_) - } - - def 'Remove subscription from database per dmi'() { - given: 'populated cache' - def predicates = ncmpInEvent.getData().getPredicates() - def subscriptionId = ncmpInEvent.getData().getSubscriptionId() - objectUnderTest.add(subscriptionId, predicates) - when: 'subscription is persisted in database' - objectUnderTest.removeFromDatabase(subscriptionId, 'dmi-1') - then: 'persistence service is called the correct number of times per dmi' - 2 * mockCmSubscriptionPersistenceService.removeSubscription(*_) - } - - def setUpTestEvent() { - def jsonData = TestUtils.getResourceFileContent('cmSubscription/cmNotificationSubscriptionNcmpInEvent.json') - def testEventSent = jsonObjectMapper.convertJsonString(jsonData, NcmpInEvent.class) - def testCloudEventSent = CloudEventBuilder.v1() - .withData(objectMapper.writeValueAsBytes(testEventSent)) - .withId('subscriptionCreated') - .withType('subscriptionCreated') - .withSource(URI.create('some-resource')) - .withExtension('correlationid', 'test-cmhandle1').build() - def consumerRecord = new ConsumerRecord('topic-name', 0, 0, 'event-key', testCloudEventSent) - def cloudEvent = consumerRecord.value() - - ncmpInEvent = toTargetEvent(cloudEvent, NcmpInEvent.class); - } - - def initialiseMockInventoryPersistenceResponses() { - mockInventoryPersistence.getYangModelCmHandles(['ch1', 'ch2']) - >> [yangModelCmHandle1, yangModelCmHandle2] - - mockInventoryPersistence.getYangModelCmHandles(['ch3', 'ch4']) - >> [yangModelCmHandle3, yangModelCmHandle4] - } - -} diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/subscription/dmi/DmiCmSubscriptionDetailsPerDmiMapperSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/subscription/dmi/DmiCmSubscriptionDetailsPerDmiMapperSpec.groovy deleted file mode 100644 index 2d0a4c54fd..0000000000 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/subscription/dmi/DmiCmSubscriptionDetailsPerDmiMapperSpec.groovy +++ /dev/null @@ -1,52 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * Copyright (C) 2024 Nordix Foundation - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * SPDX-License-Identifier: Apache-2.0 - * ============LICENSE_END========================================================= - */ - -package org.onap.cps.ncmp.impl.datajobs.subscription.dmi - -import org.onap.cps.ncmp.impl.datajobs.subscription.models.DmiCmSubscriptionKey -import spock.lang.Specification - -class DmiCmSubscriptionDetailsPerDmiMapperSpec extends Specification { - - def objectUnderTest = new DmiCmSubscriptionDetailsPerDmiMapper() - - def 'Check for grouping of Dmi Subscription Details'() { - given: 'details in the form of datastore , cmhandle and xpath' - def subscribersPerDmi = [ - 'dmi-1': [ - new DmiCmSubscriptionKey('ncmp-datastore:passthrough-operational', 'ch-1', '/a/b'), - new DmiCmSubscriptionKey('ncmp-datastore:passthrough-operational', 'ch-2', '/a/b') - ], - 'dmi-2': [ - new DmiCmSubscriptionKey('ncmp-datastore:passthrough-running', 'ch-3', '/c/d'), - new DmiCmSubscriptionKey('ncmp-datastore:passthrough-running', 'ch-3', '/e/f') - ] - ] - when: 'we try to map the values based on datastore and xpath' - def result = objectUnderTest.toDmiCmSubscriptionsPerDmi(subscribersPerDmi) - then: 'the mapped values are grouped as expected for dmi-1' - assert result['dmi-1'].dmiCmSubscriptionPredicates.size() == 1 - assert result['dmi-1'].dmiCmSubscriptionPredicates[0].targetCmHandleIds.containsAll(['ch-1', 'ch-2']) - and: 'similarly for dmi-2' - assert result['dmi-2'].dmiCmSubscriptionPredicates.size() == 2 - assert result['dmi-2'].dmiCmSubscriptionPredicates[0].targetCmHandleIds.contains('ch-3') - assert result['dmi-2'].dmiCmSubscriptionPredicates[1].targetCmHandleIds.contains('ch-3') - } -} diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/subscription/dmi/DmiInEventMapperSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/subscription/dmi/DmiInEventMapperSpec.groovy index ccc25255d1..421f13941d 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/subscription/dmi/DmiInEventMapperSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/subscription/dmi/DmiInEventMapperSpec.groovy @@ -1,6 +1,6 @@ /* * ============LICENSE_START======================================================= - * Copyright (C) 2024 Nordix Foundation + * Copyright (C) 2024-2025 OpenInfra Foundation Europe. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,15 +20,11 @@ package org.onap.cps.ncmp.impl.datajobs.subscription.dmi - -import org.onap.cps.ncmp.impl.datajobs.subscription.models.DmiCmSubscriptionPredicate import org.onap.cps.ncmp.impl.inventory.InventoryPersistence import org.onap.cps.ncmp.impl.inventory.models.YangModelCmHandle +import org.onap.cps.ncmp.impl.utils.JexParser import spock.lang.Specification -import static org.onap.cps.ncmp.api.data.models.DatastoreType.PASSTHROUGH_OPERATIONAL -import static org.onap.cps.ncmp.api.data.models.DatastoreType.PASSTHROUGH_RUNNING - class DmiInEventMapperSpec extends Specification { def mockInventoryPersistence = Mock(InventoryPersistence) @@ -42,16 +38,18 @@ class DmiInEventMapperSpec extends Specification { } def 'Check for Cm Notification Subscription DMI In Event mapping'() { - given: 'a collection of cm subscription predicates' - def dmiSubscriptionPredicates = [new DmiCmSubscriptionPredicate(['ch-1'].toSet(), PASSTHROUGH_RUNNING, ['/ch-1'].toSet()), - new DmiCmSubscriptionPredicate(['ch-2'].toSet(), PASSTHROUGH_OPERATIONAL, ['/ch-2'].toSet())] + given: 'data job subscription details' + def cmHandleIds = ['ch-1', 'ch-2'].asList() + def dataNodeSelectors = ['/dataNodeSelector1'].asList() + def notificationTypes = [] + def notificationFilter = '' + def dataNodeSelectorAsJsonExpression = JexParser.toJsonExpressionsAsString(dataNodeSelectors) when: 'we try to map the values' - def result = objectUnderTest.toDmiInEvent(dmiSubscriptionPredicates) - then: 'it contains correct cm notification subscription cmhandle object' - assert result.data.cmHandles.cmhandleId.containsAll(['ch-1', 'ch-2']) - assert result.data.cmHandles.privateProperties.containsAll([['k1': 'v1'], ['k2': 'v2']]) - and: 'also has the correct dmi cm notification subscription predicates' - assert result.data.predicates.targetFilter.containsAll([['ch-1'], ['ch-2']]) + def result = objectUnderTest.toDmiInEvent(cmHandleIds, dataNodeSelectors, notificationTypes, notificationFilter) + then: 'it contains correct cm handles' + assert result.data.cmHandles.cmhandleId.containsAll(cmHandleIds) + and: 'correct data node selector' + assert result.data.productionJobDefinition.targetSelector.dataNodeSelector == dataNodeSelectorAsJsonExpression } } diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/subscription/dmi/DmiOutEventConsumerSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/subscription/dmi/DmiOutEventConsumerSpec.groovy deleted file mode 100644 index 8cdbf88852..0000000000 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/subscription/dmi/DmiOutEventConsumerSpec.groovy +++ /dev/null @@ -1,123 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * Copyright (c) 2024-2025 OpenInfra Foundation Europe. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an 'AS IS' BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * SPDX-License-Identifier: Apache-2.0 - * ============LICENSE_END========================================================= - */ - -package org.onap.cps.ncmp.impl.datajobs.subscription.dmi - -import ch.qos.logback.classic.Level -import ch.qos.logback.classic.Logger -import ch.qos.logback.classic.spi.ILoggingEvent -import ch.qos.logback.core.read.ListAppender -import com.fasterxml.jackson.databind.ObjectMapper -import io.cloudevents.CloudEvent -import io.cloudevents.core.builder.CloudEventBuilder -import org.apache.kafka.clients.consumer.ConsumerRecord -import org.onap.cps.ncmp.impl.datajobs.subscription.dmi_to_ncmp.Data -import org.onap.cps.ncmp.impl.datajobs.subscription.dmi_to_ncmp.DmiOutEvent -import org.onap.cps.ncmp.impl.datajobs.subscription.cache.DmiCacheHandler -import org.onap.cps.ncmp.impl.datajobs.subscription.ncmp.NcmpOutEventMapper -import org.onap.cps.ncmp.impl.datajobs.subscription.ncmp.NcmpOutEventProducer -import org.onap.cps.ncmp.utils.TestUtils -import org.onap.cps.ncmp.utils.events.MessagingBaseSpec -import org.onap.cps.utils.JsonObjectMapper -import org.slf4j.LoggerFactory -import org.springframework.beans.factory.annotation.Autowired -import org.springframework.boot.test.context.SpringBootTest - -import static org.onap.cps.ncmp.impl.datajobs.subscription.models.CmSubscriptionStatus.ACCEPTED -import static org.onap.cps.ncmp.impl.datajobs.subscription.models.CmSubscriptionStatus.REJECTED - -@SpringBootTest(classes = [ObjectMapper, JsonObjectMapper]) -class DmiOutEventConsumerSpec extends MessagingBaseSpec { - - @Autowired - JsonObjectMapper jsonObjectMapper - - @Autowired - ObjectMapper objectMapper - - def mockDmiCacheHandler = Mock(DmiCacheHandler) - def mockNcmpOutEventProducer = Mock(NcmpOutEventProducer) - def mockNcmpOutEventMapper = Mock(NcmpOutEventMapper) - - def objectUnderTest = new DmiOutEventConsumer(mockDmiCacheHandler, mockNcmpOutEventProducer, mockNcmpOutEventMapper) - def logger = Spy(ListAppender) - - void setup() { - ((Logger) LoggerFactory.getLogger(DmiOutEventConsumer.class)).addAppender(logger) - logger.start() - } - - void cleanup() { - ((Logger) LoggerFactory.getLogger(DmiOutEventConsumer.class)).detachAndStopAllAppenders() - } - - - def 'Consume valid CM Subscription response from DMI Plugin'() { - given: 'a cmsubscription event' - def jsonData = TestUtils.getResourceFileContent('cmSubscription/cmNotificationSubscriptionDmiOutEvent.json') - def testEventSent = jsonObjectMapper.convertJsonString(jsonData, DmiOutEvent.class) - def testCloudEventSent = CloudEventBuilder.v1() - .withData(objectMapper.writeValueAsBytes(testEventSent)) - .withId('random-uuid') - .withType('subscriptionCreateResponse') - .withSource(URI.create('test-dmi-plugin-name')) - .withExtension('correlationid', 'sub-1#test-dmi-plugin-name').build() - def consumerRecord = new ConsumerRecord('topic-name', 0, 0, 'event-key', testCloudEventSent) - when: 'the valid event is consumed' - objectUnderTest.consumeDmiOutEvent(consumerRecord) - then: 'an event is logged with level INFO' - def loggingEvent = getLoggingEvent() - assert loggingEvent.level == Level.INFO - and: 'the log indicates the task completed successfully' - assert loggingEvent.formattedMessage == 'Cm Subscription with id : sub-1 handled by the dmi-plugin : test-dmi-plugin-name has the status : accepted' - } - - def 'Consume a valid CM Notification Subscription Event and perform correct actions base on status'() { - given: 'a cmNotificationSubscription event' - def dmiOutEventData = new Data(statusCode: statusCode, statusMessage: subscriptionStatus.toString()) - def dmiOutEvent = new DmiOutEvent().withData(dmiOutEventData) - def testCloudEventSent = CloudEventBuilder.v1() - .withData(objectMapper.writeValueAsBytes(dmiOutEvent)) - .withId('random-uuid') - .withType('subscriptionCreateResponse') - .withSource(URI.create('test-dmi-plugin-name')) - .withExtension('correlationid', 'sub-1#test-dmi-plugin-name').build() - def consumerRecord = new ConsumerRecord('topic-name', 0, 0, 'event-key', testCloudEventSent) - when: 'the event is consumed' - objectUnderTest.consumeDmiOutEvent(consumerRecord) - then: 'correct number of calls to cache' - expectedCacheCalls * mockDmiCacheHandler.updateDmiSubscriptionStatus('sub-1', 'test-dmi-plugin-name', subscriptionStatus) - and: 'correct number of calls to persist cache' - expectedPersistenceCalls * mockDmiCacheHandler.persistIntoDatabasePerDmi('sub-1', 'test-dmi-plugin-name') - and: 'correct number of calls to map the ncmp out event' - 1 * mockNcmpOutEventMapper.toNcmpOutEvent('sub-1', _) - and: 'correct number of calls to send the ncmp out event to client' - 1 * mockNcmpOutEventProducer.sendNcmpOutEvent('sub-1', 'subscriptionCreateResponse', _, false) - where: 'the following parameters are used' - scenario | subscriptionStatus | statusCode || expectedCacheCalls | expectedPersistenceCalls - 'Accepted Status' | ACCEPTED | '1' || 1 | 1 - 'Rejected Status' | REJECTED | '104' || 1 | 0 - } - - def getLoggingEvent() { - return logger.list[0] - } - -} diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/subscription/dmi/DmiInEventProducerSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/subscription/dmi/EventProducerSpec.groovy similarity index 87% rename from cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/subscription/dmi/DmiInEventProducerSpec.groovy rename to cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/subscription/dmi/EventProducerSpec.groovy index 683f10be13..32f4eb8bd0 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/subscription/dmi/DmiInEventProducerSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/subscription/dmi/EventProducerSpec.groovy @@ -27,7 +27,7 @@ import org.onap.cps.events.EventsProducer import org.onap.cps.ncmp.config.CpsApplicationContext import org.onap.cps.ncmp.impl.datajobs.subscription.ncmp_to_dmi.CmHandle import org.onap.cps.ncmp.impl.datajobs.subscription.ncmp_to_dmi.Data -import org.onap.cps.ncmp.impl.datajobs.subscription.ncmp_to_dmi.DmiInEvent +import org.onap.cps.ncmp.impl.datajobs.subscription.ncmp_to_dmi.DataJobSubscriptionDmiInEvent import org.onap.cps.ncmp.utils.events.CloudEventMapper import org.onap.cps.utils.JsonObjectMapper import org.springframework.boot.test.context.SpringBootTest @@ -36,22 +36,22 @@ import spock.lang.Specification @SpringBootTest(classes = [ObjectMapper, JsonObjectMapper, CloudEventBuilder]) @ContextConfiguration(classes = [CpsApplicationContext]) -class DmiInEventProducerSpec extends Specification { +class EventProducerSpec extends Specification { def mockEventsProducer = Mock(EventsProducer) - def objectUnderTest = new DmiInEventProducer(mockEventsProducer) + def objectUnderTest = new EventProducer(mockEventsProducer) def 'Create and Send Cm Notification Subscription DMI In Event'() { given: 'a cm subscription for a dmi plugin' def subscriptionId = 'test-subscription-id' def dmiPluginName = 'test-dmiplugin' def eventType = 'subscriptionCreateRequest' - def dmiInEvent = new DmiInEvent(data: new Data(cmHandles: [new CmHandle(cmhandleId: 'test-1', privateProperties: [:])])) + def dmiInEvent = new DataJobSubscriptionDmiInEvent(data: new Data(cmHandles: [new CmHandle(cmhandleId: 'test-1', privateProperties: [:])])) and: 'also we have target topic for dmiPlugin' objectUnderTest.dmiInEventTopic = 'dmiplugin-test-topic' when: 'the event is sent' - objectUnderTest.sendDmiInEvent(subscriptionId, dmiPluginName, eventType, dmiInEvent) + objectUnderTest.send(subscriptionId, dmiPluginName, eventType, dmiInEvent) then: 'the event contains the required attributes' 1 * mockEventsProducer.sendCloudEvent(_, _, _) >> { args -> @@ -62,7 +62,7 @@ class DmiInEventProducerSpec extends Specification { assert dmiInEventAsCloudEvent.getExtension('correlationid') == subscriptionId + '#' + dmiPluginName assert dmiInEventAsCloudEvent.type == 'subscriptionCreateRequest' assert dmiInEventAsCloudEvent.source.toString() == 'NCMP' - assert CloudEventMapper.toTargetEvent(dmiInEventAsCloudEvent, DmiInEvent) == dmiInEvent + assert CloudEventMapper.toTargetEvent(dmiInEventAsCloudEvent, DataJobSubscriptionDmiInEvent) == dmiInEvent } } } diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/subscription/ncmp/CmSubscriptionComparatorSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/subscription/ncmp/CmSubscriptionComparatorSpec.groovy deleted file mode 100644 index 42f289f13c..0000000000 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/subscription/ncmp/CmSubscriptionComparatorSpec.groovy +++ /dev/null @@ -1,57 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * Copyright (c) 2024-2025 OpenInfra Foundation Europe. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an 'AS IS' BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * SPDX-License-Identifier: Apache-2.0 - * ============LICENSE_END========================================================= - */ -package org.onap.cps.ncmp.impl.datajobs.subscription.ncmp - -import org.onap.cps.ncmp.impl.datajobs.subscription.models.DmiCmSubscriptionPredicate -import org.onap.cps.ncmp.impl.datajobs.subscription.utils.CmDataJobSubscriptionPersistenceService -import spock.lang.Specification - -import static org.onap.cps.ncmp.api.data.models.DatastoreType.PASSTHROUGH_OPERATIONAL - -class CmSubscriptionComparatorSpec extends Specification { - - def mockCmSubscriptionPersistenceService = Mock(CmDataJobSubscriptionPersistenceService) - def objectUnderTest = new CmSubscriptionComparator(mockCmSubscriptionPersistenceService) - - def 'Find the difference based on the provided predicates'() { - given: 'A list of predicates' - def predicates = [new DmiCmSubscriptionPredicate(['ch-1', 'ch-2'].toSet(), PASSTHROUGH_OPERATIONAL, ['a/1/', 'b/2'].toSet())] - and: '1 positive and 1 negative response.' - mockCmSubscriptionPersistenceService.hasAtLeastOneSubscription(PASSTHROUGH_OPERATIONAL.getDatastoreName(), 'ch-1') >> true - mockCmSubscriptionPersistenceService.hasAtLeastOneSubscription(PASSTHROUGH_OPERATIONAL.getDatastoreName(), 'ch-2') >> false - when: 'method to extract only NEW predicates for dmi is called' - def result = objectUnderTest.getNewDmiSubscriptionPredicates(predicates) - then: 'from 2 predicates only 1 remains' - assert result.size() == 1 - assert result[0].targetCmHandleIds[0] == 'ch-2' - } - - def 'Find the difference based on the provided predicates when it is an ongoing Cm Subscription'() { - given: 'A list of predicates' - def predicates = [new DmiCmSubscriptionPredicate(['ch-1'].toSet(), PASSTHROUGH_OPERATIONAL, ['a/1/'].toSet())] - and: 'its already present' - mockCmSubscriptionPersistenceService.hasAtLeastOneSubscription(PASSTHROUGH_OPERATIONAL.getDatastoreName(), 'ch-1') >> true - when: 'method to extract only NEW predicates for dmi is called' - def result = objectUnderTest.getNewDmiSubscriptionPredicates(predicates) - then: 'from 1 predicate, none remains' - assert result.size() == 0 - } - -} diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/subscription/ncmp/CmSubscriptionHandlerImplSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/subscription/ncmp/CmSubscriptionHandlerImplSpec.groovy index a267e277a7..79becb3454 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/subscription/ncmp/CmSubscriptionHandlerImplSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/subscription/ncmp/CmSubscriptionHandlerImplSpec.groovy @@ -20,156 +20,120 @@ package org.onap.cps.ncmp.impl.datajobs.subscription.ncmp -import com.fasterxml.jackson.databind.ObjectMapper -import org.onap.cps.api.model.DataNode -import org.onap.cps.ncmp.impl.datajobs.subscription.client_to_ncmp.NcmpInEvent -import org.onap.cps.ncmp.impl.datajobs.subscription.ncmp_to_client.NcmpOutEvent -import org.onap.cps.ncmp.impl.datajobs.subscription.ncmp_to_dmi.DmiInEvent -import org.onap.cps.ncmp.impl.datajobs.subscription.cache.DmiCacheHandler -import org.onap.cps.ncmp.impl.datajobs.subscription.dmi.DmiCmSubscriptionDetailsPerDmiMapper + +import org.onap.cps.ncmp.impl.datajobs.subscription.client_to_ncmp.DataSelector +import org.onap.cps.ncmp.impl.datajobs.subscription.ncmp_to_dmi.DataJobSubscriptionDmiInEvent import org.onap.cps.ncmp.impl.datajobs.subscription.dmi.DmiInEventMapper -import org.onap.cps.ncmp.impl.datajobs.subscription.dmi.DmiInEventProducer -import org.onap.cps.ncmp.impl.datajobs.subscription.models.DmiCmSubscriptionDetails -import org.onap.cps.ncmp.impl.datajobs.subscription.models.DmiCmSubscriptionPredicate +import org.onap.cps.ncmp.impl.datajobs.subscription.dmi.EventProducer import org.onap.cps.ncmp.impl.datajobs.subscription.utils.CmDataJobSubscriptionPersistenceService import org.onap.cps.ncmp.impl.inventory.InventoryPersistence import org.onap.cps.ncmp.impl.inventory.models.YangModelCmHandle -import org.onap.cps.ncmp.utils.TestUtils -import org.onap.cps.utils.JsonObjectMapper +import org.onap.cps.ncmp.impl.utils.AlternateIdMatcher +import org.onap.cps.ncmp.impl.utils.JexParser import spock.lang.Specification -import static org.onap.cps.ncmp.api.data.models.DatastoreType.PASSTHROUGH_OPERATIONAL -import static org.onap.cps.ncmp.impl.datajobs.subscription.models.CmSubscriptionStatus.ACCEPTED -import static org.onap.cps.ncmp.impl.datajobs.subscription.models.CmSubscriptionStatus.PENDING - class CmSubscriptionHandlerImplSpec extends Specification { - def jsonObjectMapper = new JsonObjectMapper(new ObjectMapper()) def mockCmSubscriptionPersistenceService = Mock(CmDataJobSubscriptionPersistenceService) - def mockCmSubscriptionComparator = Mock(CmSubscriptionComparator) - def mockNcmpOutEventMapper = Mock(NcmpOutEventMapper) def mockDmiInEventMapper = Mock(DmiInEventMapper) - def dmiCmSubscriptionDetailsPerDmiMapper = new DmiCmSubscriptionDetailsPerDmiMapper() - def mockNcmpOutEventProducer = Mock(NcmpOutEventProducer) - def mockDmiInEventProducer = Mock(DmiInEventProducer) - def mockDmiCacheHandler = Mock(DmiCacheHandler) + def mockDmiInEventProducer = Mock(EventProducer) def mockInventoryPersistence = Mock(InventoryPersistence) + def mockAlternateIdMatcher = Mock(AlternateIdMatcher) - def objectUnderTest = new CmSubscriptionHandlerImpl(mockCmSubscriptionPersistenceService, - mockCmSubscriptionComparator, mockNcmpOutEventMapper, mockDmiInEventMapper, dmiCmSubscriptionDetailsPerDmiMapper, - mockNcmpOutEventProducer, mockDmiInEventProducer, mockDmiCacheHandler, mockInventoryPersistence) - - def testDmiSubscriptionsPerDmi = ["dmi-1": new DmiCmSubscriptionDetails([], PENDING)] - - def 'Consume valid and unique CmNotificationSubscriptionNcmpInEvent create message'() { - given: 'a cmNotificationSubscriptionNcmp in event with new subscription id' - def jsonData = TestUtils.getResourceFileContent('cmSubscription/cmNotificationSubscriptionNcmpInEvent.json') - def testEventConsumed = jsonObjectMapper.convertJsonString(jsonData, NcmpInEvent.class) - def testListOfDeltaPredicates = [new DmiCmSubscriptionPredicate(['ch1'].toSet(), PASSTHROUGH_OPERATIONAL, ['/a/b'].toSet())] - and: 'the persistence service confirms subscription id is new (not used for other subscription)' - mockCmSubscriptionPersistenceService.isNewSubscriptionId('test-id') >> true - and: 'relevant details is extracted from the event' - def subscriptionId = testEventConsumed.getData().getSubscriptionId() - def predicates = testEventConsumed.getData().getPredicates() - and: 'the cache handler returns for relevant subscription id' - 1 * mockDmiCacheHandler.get("test-id") >> testDmiSubscriptionsPerDmi - and: 'the delta predicates is returned' - 1 * mockCmSubscriptionComparator.getNewDmiSubscriptionPredicates(_) >> testListOfDeltaPredicates - and: 'the DMI in event mapper returns cm notification subscription event' - def testDmiInEvent = new DmiInEvent() - 1 * mockDmiInEventMapper.toDmiInEvent(testListOfDeltaPredicates) >> testDmiInEvent - when: 'the valid and unique event is consumed' - objectUnderTest.processSubscriptionCreateRequest(subscriptionId, predicates) - then: 'the subscription cache handler is called once' - 1 * mockDmiCacheHandler.add('test-id', _) - and: 'the events handler method to send DMI event is called correct number of times with the correct parameters' - testDmiSubscriptionsPerDmi.size() * mockDmiInEventProducer.sendDmiInEvent( - "test-id", "dmi-1", "subscriptionCreateRequest", testDmiInEvent) - and: 'we schedule to send the response after configured time from the cache' - 1 * mockNcmpOutEventProducer.sendNcmpOutEvent('test-id', 'subscriptionCreateResponse', null, true) - } + def objectUnderTest = new CmSubscriptionHandlerImpl(mockCmSubscriptionPersistenceService, mockDmiInEventMapper, + mockDmiInEventProducer, mockInventoryPersistence, mockAlternateIdMatcher) - def 'Consume valid and Overlapping Cm Notification Subscription NcmpIn Event'() { - given: 'a cmNotificationSubscriptionNcmp in event with unique subscription id' - def jsonData = TestUtils.getResourceFileContent('cmSubscription/cmNotificationSubscriptionNcmpInEvent.json') - def testEventConsumed = jsonObjectMapper.convertJsonString(jsonData, NcmpInEvent.class) - def noDeltaPredicates = [] - and: 'the persistence service confirms subscription id is new (not used for other subscription)' - mockCmSubscriptionPersistenceService.isNewSubscriptionId('test-id') >> true - and: 'the cache handler returns for relevant subscription id' - 1 * mockDmiCacheHandler.get('test-id') >> testDmiSubscriptionsPerDmi - and: 'the delta predicates is returned' - 1 * mockCmSubscriptionComparator.getNewDmiSubscriptionPredicates(_) >> noDeltaPredicates - when: 'the valid and unique event is consumed' - objectUnderTest.processSubscriptionCreateRequest('test-id', noDeltaPredicates) - then: 'the subscription cache handler is called once' - 1 * mockDmiCacheHandler.add('test-id', _) - and: 'the subscription details are updated in the cache' - 1 * mockDmiCacheHandler.updateDmiSubscriptionStatus('test-id', _, ACCEPTED) - and: 'we schedule to send the response after configured time from the cache' - 1 * mockNcmpOutEventProducer.sendNcmpOutEvent('test-id', 'subscriptionCreateResponse', null, true) + def 'Process subscription CREATE request for new target [non existing]'() { + given: 'relevant subscription details' + def mySubId = 'dataJobId' + def myDataNodeSelectors = ['/parent[id="1"]'].toList() + def notificationTypes = [] + def notificationFilter = '' + def dataSelector = new DataSelector(notificationTypes: notificationTypes, notificationFilter: notificationFilter) + and: 'alternate Id matcher returns cm handle id for given data node selector' + def fdn = getFdn(myDataNodeSelectors.iterator().next()) + mockAlternateIdMatcher.getCmHandleId(fdn) >> 'myCmHandleId' + and: 'returns inactive data node selector(s)' + mockCmSubscriptionPersistenceService.getInactiveDataNodeSelectors(mySubId) >> ['/parent[id="1"]'] + and: 'the inventory persistence service returns cm handle' + mockInventoryPersistence.getYangModelCmHandle('myCmHandleId') >> new YangModelCmHandle(dmiServiceName: 'myDmiService') + and: 'DMI in event mapper returns event' + def myDmiInEvent = new DataJobSubscriptionDmiInEvent() + mockDmiInEventMapper.toDmiInEvent(['myCmHandleId'], myDataNodeSelectors, notificationTypes, notificationFilter) >> myDmiInEvent + when: 'the method to process subscription create request is called' + objectUnderTest.processSubscriptionCreate(dataSelector, mySubId, myDataNodeSelectors) + then: 'the persistence service is called' + 1 * mockCmSubscriptionPersistenceService.add(mySubId, '/parent[id="1"]') + and: 'the event is sent to correct DMI' + 1 * mockDmiInEventProducer.send(mySubId, 'myDmiService', 'subscriptionCreateRequest', _) } - def 'Consume valid and but non-unique CmNotificationSubscription create message'() { - given: 'a cmNotificationSubscriptionNcmp in event' - def jsonData = TestUtils.getResourceFileContent('cmSubscription/cmNotificationSubscriptionNcmpInEvent.json') - def testEventConsumed = jsonObjectMapper.convertJsonString(jsonData, NcmpInEvent.class) - and: 'the persistence service confirms subscription id is not new (already used subscription)' - mockCmSubscriptionPersistenceService.isNewSubscriptionId('test-id') >> false - and: 'relevant details is extracted from the event' - def subscriptionId = testEventConsumed.getData().getSubscriptionId() - def predicates = testEventConsumed.getData().getPredicates() - and: 'the NCMP out in event mapper returns an event for rejected request' - def testNcmpOutEvent = new NcmpOutEvent() - 1 * mockNcmpOutEventMapper.toNcmpOutEventForRejectedRequest( - "test-id", _) >> testNcmpOutEvent - when: 'the valid but non-unique event is consumed' - objectUnderTest.processSubscriptionCreateRequest(subscriptionId, predicates) - then: 'the events handler method to send DMI event is never called' - 0 * mockDmiInEventProducer.sendDmiInEvent(_, _, _, _) - and: 'the events handler method to send NCMP out event is called once' - 1 * mockNcmpOutEventProducer.sendNcmpOutEvent('test-id', 'subscriptionCreateResponse', testNcmpOutEvent, false) + def 'Process subscription CREATE request for new targets [non existing] to be sent to multiple DMIs'() { + given: 'relevant subscription details' + def mySubId = 'dataJobId' + def myDataNodeSelectors = [ + '/parent[id="forDmi1"]', + '/parent[id="forDmi1"]/child', + '/parent[id="forDmi2"]'].toList() + def someAttr1 = [] + def someAttr2 = '' + def dataSelector = new DataSelector(notificationTypes: someAttr1, notificationFilter: someAttr2) + and: 'alternate Id matcher returns cm handle ids for given data node selectors' + def fdn1 = getFdn(myDataNodeSelectors.get(0)) + def fdn2 = getFdn(myDataNodeSelectors.get(1)) + def fdn3 = getFdn(myDataNodeSelectors.get(2)) + mockAlternateIdMatcher.getCmHandleId(fdn1) >> 'myCmHandleId1' + mockAlternateIdMatcher.getCmHandleId(fdn2) >> 'myCmHandleId1' + mockAlternateIdMatcher.getCmHandleId(fdn3) >> 'myCmHandleId2' + and: 'returns inactive data node selector(s)' + mockCmSubscriptionPersistenceService.getInactiveDataNodeSelectors(mySubId) >> [ + '/parent[id="forDmi1"]', + '/parent[id="forDmi1"]/child', + '/parent[id="forDmi2"]'] + and: 'the inventory persistence service returns cm handles with dmi information' + mockInventoryPersistence.getYangModelCmHandle('myCmHandleId1') >> new YangModelCmHandle(dmiServiceName: 'myDmiService1') + mockInventoryPersistence.getYangModelCmHandle('myCmHandleId2') >> new YangModelCmHandle(dmiServiceName: 'myDmiService2') + and: 'DMI in event mapper returns events' + def myDmiInEvent1 = new DataJobSubscriptionDmiInEvent() + def myDmiInEvent2 = new DataJobSubscriptionDmiInEvent() + mockDmiInEventMapper.toDmiInEvent(['myCmHandleId1'], ['/parent[id="forDmi1"]', '/parent[id="forDmi1"]/child'], someAttr1, someAttr2) >> myDmiInEvent1 + mockDmiInEventMapper.toDmiInEvent(['myCmHandleId2'], ['/parent[id="forDmi2"]'], someAttr1, someAttr2) >> myDmiInEvent2 + when: 'the method to process subscription create request is called' + objectUnderTest.processSubscriptionCreate(dataSelector, mySubId, myDataNodeSelectors) + then: 'the persistence service is called' + myDataNodeSelectors.each { dataNodeSelector -> + 1 * mockCmSubscriptionPersistenceService.add(_, dataNodeSelector)} + and: 'the event is sent to correct DMIs' + 1 * mockDmiInEventProducer.send(mySubId, 'myDmiService1', 'subscriptionCreateRequest', myDmiInEvent1) + 1 * mockDmiInEventProducer.send(mySubId, 'myDmiService2', 'subscriptionCreateRequest', myDmiInEvent2) } - def 'Consume valid CmNotificationSubscriptionNcmpInEvent delete message'() { - given: 'a test subscription id' - def subscriptionId = 'test-id' - and: 'the persistence service returns datanodes' - 1 * mockCmSubscriptionPersistenceService.getAffectedDataNodes(subscriptionId) >> - [new DataNode(xpath: "/datastores/datastore[@name='ncmp-datastore:passthrough-running']/cm-handles/cm-handle[@id='ch-1']/filters/filter[@xpath='x/y']", leaves: ['xpath': 'x/y', 'subscriptionIds': ['test-id']]), - new DataNode(xpath: "/datastores/datastore[@name='ncmp-datastore:passthrough-running']/cm-handles/cm-handle[@id='ch-2']/filters/filter[@xpath='y/z']", leaves: ['xpath': 'y/z', 'subscriptionIds': ['test-id']])] - and: 'the inventory persistence returns yang model cm handles' - 1 * mockInventoryPersistence.getYangModelCmHandle('ch-1') >> new YangModelCmHandle(dmiServiceName: 'dmi-1') - 1 * mockInventoryPersistence.getYangModelCmHandle('ch-2') >> new YangModelCmHandle(dmiServiceName: 'dmi-2') - when: 'the subscription delete request is processed' - objectUnderTest.processSubscriptionDeleteRequest(subscriptionId) - then: 'the method to send a dmi event is called with correct parameters' - 1 * mockDmiInEventProducer.sendDmiInEvent(subscriptionId, 'dmi-1', 'subscriptionDeleteRequest', _) - 1 * mockDmiInEventProducer.sendDmiInEvent(subscriptionId, 'dmi-2', 'subscriptionDeleteRequest', _) - and: 'the method to send nmcp out event is called with correct parameters' - 1 * mockNcmpOutEventProducer.sendNcmpOutEvent(subscriptionId, 'subscriptionDeleteResponse', null, true) + def 'Process subscription CREATE request for overlapping targets [non existing & existing]'() { + given: 'relevant subscription details' + def myNewSubId = 'newId' + def myDataNodeSelectors = ['/newDataNodeSelector[id=""]'].toList() + def dataSelector = new DataSelector(notificationTypes: [], notificationFilter: '') + and: 'alternate id matcher always returns a cm handle id' + mockAlternateIdMatcher.getCmHandleId(_) >> 'someCmHandleId' + and: 'the inventory persistence service returns cm handles with dmi information' + mockInventoryPersistence.getYangModelCmHandle(_) >> new YangModelCmHandle(dmiServiceName: 'myDmiService') + and: 'returns inactive data node selector(s)' + mockCmSubscriptionPersistenceService.getInactiveDataNodeSelectors(myNewSubId) >> inactiveDataNodeSelectors + when: 'the method to process subscription create request is called' + objectUnderTest.processSubscriptionCreate(dataSelector, myNewSubId, myDataNodeSelectors) + then: 'the persistence service is called' + 1 * mockCmSubscriptionPersistenceService.add(_, myDataNodeSelectors.iterator().next()) + and: 'the event is sent to correct DMIs' + expectedCallsToDmi * mockDmiInEventProducer.send(myNewSubId, 'myDmiService', 'subscriptionCreateRequest', _) + where: 'following data are used' + scenario | inactiveDataNodeSelectors || expectedCallsToDmi + 'new target overlaps with ACCEPTED targets' | [] || 0 + 'new target overlaps with REJECTED targets' | ['/existingDataNodeSelector[id=""]','/newDataNodeSelector[id=""]']|| 1 + 'new target overlaps with UNKNOWN targets' | ['/existingDataNodeSelector[id=""]','/newDataNodeSelector[id=""]']|| 1 + 'new target does not overlap with existing targets'| ['/newDataNodeSelector[id=""]'] || 1 } - def 'Delete a subscriber for fully overlapping subscriptions'() { - given: 'a test subscription id' - def subscriptionId = 'test-id' - and: 'the persistence service returns datanodes with multiple subscribers' - 1 * mockCmSubscriptionPersistenceService.getAffectedDataNodes(subscriptionId) >> - [new DataNode(xpath: "/datastores/datastore[@name='ncmp-datastore:passthrough-running']/cm-handles/cm-handle[@id='ch-1']/filters/filter[@xpath='x/y']", leaves: ['xpath': 'x/y', 'subscriptionIds': ['test-id', 'other-id']]), - new DataNode(xpath: "/datastores/datastore[@name='ncmp-datastore:passthrough-running']/cm-handles/cm-handle[@id='ch-2']/filters/filter[@xpath='y/z']", leaves: ['xpath': 'y/z', 'subscriptionIds': ['test-id', 'other-id']])] - and: 'the inventory persistence returns yang model cm handles' - 1 * mockInventoryPersistence.getYangModelCmHandle('ch-1') >> new YangModelCmHandle(dmiServiceName: 'dmi-1') - 1 * mockInventoryPersistence.getYangModelCmHandle('ch-2') >> new YangModelCmHandle(dmiServiceName: 'dmi-2') - and: 'the cache handler returns the relevant maps whenever called' - 2 * mockDmiCacheHandler.get(subscriptionId) >> ['dmi-1': [:], 'dmi-2': [:]] - when: 'the subscription delete request is processed' - objectUnderTest.processSubscriptionDeleteRequest(subscriptionId) - then: 'the method to send a dmi event is never called' - 0 * mockDmiInEventProducer.sendDmiInEvent(_, _, _, _) - and: 'the cache handler is called to remove subscriber from database per dmi' - 1 * mockDmiCacheHandler.removeFromDatabase('test-id', 'dmi-1') - 1 * mockDmiCacheHandler.removeFromDatabase('test-id', 'dmi-2') - and: 'the method to send ncmp out event is called with correct parameters' - 1 * mockNcmpOutEventProducer.sendNcmpOutEvent(subscriptionId, 'subscriptionDeleteResponse', null, false) + def getFdn(dataNodeSelector) { + return JexParser.extractFdnPrefix(dataNodeSelector).orElse("") } } diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/subscription/ncmp/NcmpInEventConsumerSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/subscription/ncmp/NcmpInEventConsumerSpec.groovy index 8290166989..ad201c6e52 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/subscription/ncmp/NcmpInEventConsumerSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/subscription/ncmp/NcmpInEventConsumerSpec.groovy @@ -26,6 +26,7 @@ import ch.qos.logback.classic.spi.ILoggingEvent import ch.qos.logback.core.read.ListAppender import com.fasterxml.jackson.databind.ObjectMapper import org.onap.cps.ncmp.impl.datajobs.subscription.client_to_ncmp.DataJobSubscriptionOperationInEvent +import org.onap.cps.ncmp.impl.utils.JexParser import org.onap.cps.ncmp.utils.TestUtils import org.onap.cps.utils.JsonObjectMapper import org.slf4j.LoggerFactory @@ -36,7 +37,8 @@ import spock.lang.Specification @SpringBootTest(classes = [ObjectMapper, JsonObjectMapper]) class NcmpInEventConsumerSpec extends Specification { - def objectUnderTest = new NcmpInEventConsumer() + def mockCmSubscriptionHandler = Mock(CmSubscriptionHandlerImpl) + def objectUnderTest = new NcmpInEventConsumer(mockCmSubscriptionHandler) def logger = new ListAppender() @Autowired @@ -54,23 +56,34 @@ class NcmpInEventConsumerSpec extends Specification { ((Logger) LoggerFactory.getLogger(NcmpInEventConsumer.class)).detachAndStopAllAppenders() } - def 'Consuming CM Data Notification #scenario data type id.'() { - given: 'a JSON file containing a subscription event' - def jsonData = TestUtils.getResourceFileContent('sample_dataJobSubscriptionInEvent.json') - jsonData = jsonData.replace('#dataTypeId', dataTypeId) + def 'Consuming CREATE cm data job subscription request.'() { + given: 'a JSON file for create event' + def jsonData = TestUtils.getResourceFileContent( + 'datajobs/subscription/cmNotificationSubscriptionNcmpInEvent.json') + def myEventType = "dataJobCreated" + jsonData = jsonData.replace('#myEventType', myEventType) + and: 'the event' def event = objectMapper.readValue(jsonData, DataJobSubscriptionOperationInEvent) + and: 'the list of data node selectors' + def dataNodeSelectorList = getDataNodeSelectorsAsXpaths(event) + and: 'the other data job event attributes' + def dataSelector = getDataSelector(event) when: 'the event is consumed' objectUnderTest.consumeSubscriptionEvent(event) then: 'event details are logged at level INFO' def loggingEvent = logger.list.last() assert loggingEvent.level == Level.INFO - assert loggingEvent.formattedMessage.contains('jobId=my job id') - assert loggingEvent.formattedMessage.contains('eventType=my event type') - assert loggingEvent.formattedMessage.contains("dataType=${dataTypeId}") - assert loggingEvent.formattedMessage.contains('fdns=[/SubNetwork[id="SN1"]]') - where: 'the following data type ids are used' - scenario | dataTypeId - 'with' | 'my data type' - 'without' | 'null' + assert loggingEvent.formattedMessage.contains('dataJobId=myDataJobId') + assert loggingEvent.formattedMessage.contains("eventType=${myEventType}") + and: 'method to handle process subscription create request is called' + 1 * mockCmSubscriptionHandler.processSubscriptionCreate(dataSelector, "myDataJobId", dataNodeSelectorList) + } + + def getDataNodeSelectorsAsXpaths(event) { + return JexParser.toXpaths(event.event.dataJob.productionJobDefinition.targetSelector.dataNodeSelector) + } + + def getDataSelector(event) { + return event.event.dataJob.productionJobDefinition.dataSelector } } diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/subscription/ncmp/NcmpOutEventMapperSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/subscription/ncmp/NcmpOutEventMapperSpec.groovy deleted file mode 100644 index cefb855207..0000000000 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/subscription/ncmp/NcmpOutEventMapperSpec.groovy +++ /dev/null @@ -1,69 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * Copyright (C) 2024 Nordix Foundation - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * SPDX-License-Identifier: Apache-2.0 - * ============LICENSE_END========================================================= - */ - -package org.onap.cps.ncmp.impl.datajobs.subscription.ncmp - - -import org.onap.cps.ncmp.impl.datajobs.subscription.models.DmiCmSubscriptionDetails -import org.onap.cps.ncmp.impl.datajobs.subscription.models.DmiCmSubscriptionPredicate -import spock.lang.Specification - -import static org.onap.cps.ncmp.api.data.models.DatastoreType.PASSTHROUGH_OPERATIONAL -import static org.onap.cps.ncmp.api.data.models.DatastoreType.PASSTHROUGH_RUNNING -import static org.onap.cps.ncmp.impl.datajobs.subscription.models.CmSubscriptionStatus.ACCEPTED -import static org.onap.cps.ncmp.impl.datajobs.subscription.models.CmSubscriptionStatus.PENDING -import static org.onap.cps.ncmp.impl.datajobs.subscription.models.CmSubscriptionStatus.REJECTED - -class NcmpOutEventMapperSpec extends Specification { - - static Map dmiSubscriptionsPerDmi - - def objectUnderTest = new NcmpOutEventMapper() - - def setup() { - def dmiSubscriptionPredicateA = new DmiCmSubscriptionPredicate(['ch-A'] as Set, PASSTHROUGH_RUNNING, ['/a'] as Set) - def dmiSubscriptionPredicateB = new DmiCmSubscriptionPredicate(['ch-B'] as Set, PASSTHROUGH_OPERATIONAL, ['/b'] as Set) - def dmiSubscriptionPredicateC = new DmiCmSubscriptionPredicate(['ch-C'] as Set, PASSTHROUGH_OPERATIONAL, ['/c'] as Set) - dmiSubscriptionsPerDmi = ['dmi-1': new DmiCmSubscriptionDetails([dmiSubscriptionPredicateA], PENDING), - 'dmi-2': new DmiCmSubscriptionDetails([dmiSubscriptionPredicateB], ACCEPTED), - 'dmi-3': new DmiCmSubscriptionDetails([dmiSubscriptionPredicateC], REJECTED) - ] - } - - def 'Check for Cm Notification Subscription Outgoing event mapping'() { - when: 'we try to map the event to send it to client' - def result = objectUnderTest.toNcmpOutEvent('test-subscription', dmiSubscriptionsPerDmi) - then: 'event is mapped correctly for the subscription' - result.data.subscriptionId == 'test-subscription' - and: 'the cm handle ids are part of correct list' - result.data.pendingTargets == ['ch-A'] as Set - result.data.acceptedTargets == ['ch-B'] as Set - result.data.rejectedTargets == ['ch-C'] as Set - } - - def 'Check for Cm Notification Rejected Subscription Outgoing event mapping'() { - when: 'we try to map the event to send it to client' - def result = objectUnderTest.toNcmpOutEventForRejectedRequest('test-subscription', ['ch-1', 'ch-2']) - then: 'event is mapped correctly for the subscription id' - result.data.subscriptionId == 'test-subscription' - and: 'the cm handle ids are part of correct list' - result.data.withRejectedTargets(['ch-1', 'ch-2']) - } -} diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/subscription/ncmp/NcmpOutEventProducerSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/subscription/ncmp/NcmpOutEventProducerSpec.groovy deleted file mode 100644 index 3ec8514a45..0000000000 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/subscription/ncmp/NcmpOutEventProducerSpec.groovy +++ /dev/null @@ -1,130 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * Copyright (C) 2025 OpenInfra Foundation Europe. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * SPDX-License-Identifier: Apache-2.0 - * ============LICENSE_END========================================================= - */ - -package org.onap.cps.ncmp.impl.datajobs.subscription.ncmp - -import com.fasterxml.jackson.databind.ObjectMapper -import io.cloudevents.CloudEvent -import io.cloudevents.core.v1.CloudEventBuilder -import org.onap.cps.events.EventsProducer -import org.onap.cps.ncmp.config.CpsApplicationContext -import org.onap.cps.ncmp.impl.datajobs.subscription.ncmp_to_client.Data -import org.onap.cps.ncmp.impl.datajobs.subscription.ncmp_to_client.NcmpOutEvent -import org.onap.cps.ncmp.impl.datajobs.subscription.cache.DmiCacheHandler -import org.onap.cps.ncmp.utils.events.CloudEventMapper -import org.onap.cps.utils.JsonObjectMapper -import org.springframework.boot.test.context.SpringBootTest -import org.springframework.test.context.ContextConfiguration -import spock.lang.Specification - -@SpringBootTest(classes = [ObjectMapper, JsonObjectMapper, CloudEventBuilder]) -@ContextConfiguration(classes = [CpsApplicationContext]) -class NcmpOutEventProducerSpec extends Specification { - - def mockEventsProducer = Mock(EventsProducer) - def mockNcmpOutEventMapper = Mock(NcmpOutEventMapper) - def mockDmiCacheHandler = Mock(DmiCacheHandler) - - def objectUnderTest = new NcmpOutEventProducer(mockEventsProducer, mockNcmpOutEventMapper, mockDmiCacheHandler) - - def 'Create and #scenario Cm Notification Subscription NCMP out event'() { - given: 'a cm subscription response for the client' - def subscriptionId = 'test-subscription-id-2' - def eventType = 'subscriptionCreateResponse' - def ncmpOutEvent = new NcmpOutEvent(data: new Data(subscriptionId: 'test-subscription-id-2', acceptedTargets: ['ch-1', 'ch-2'])) - and: 'also we have target topic for sending to client' - objectUnderTest.ncmpOutEventTopic = 'client-test-topic' - and: 'a deadline to an event' - objectUnderTest.dmiOutEventTimeoutInMs = 1000 - when: 'the event is sent' - objectUnderTest.sendNcmpOutEvent(subscriptionId, eventType, ncmpOutEvent, eventPublishingTaskToBeScheduled) - then: 'we conditionally wait for a while' - Thread.sleep(delayInMs) - then: 'the event contains the required attributes' - 1 * mockEventsProducer.sendCloudEvent(_, _, _) >> { - args -> - { - assert args[0] == 'client-test-topic' - assert args[1] == subscriptionId - def ncmpOutEventAsCloudEvent = (args[2] as CloudEvent) - assert ncmpOutEventAsCloudEvent.getExtension('correlationid') == subscriptionId - assert ncmpOutEventAsCloudEvent.type == 'subscriptionCreateResponse' - assert ncmpOutEventAsCloudEvent.source.toString() == 'NCMP' - assert CloudEventMapper.toTargetEvent(ncmpOutEventAsCloudEvent, NcmpOutEvent) == ncmpOutEvent - } - } - where: 'following scenarios are considered' - scenario | delayInMs | eventPublishingTaskToBeScheduled - 'send event now' | 0 | false - 'schedule and send after the configured time ' | 1500 | true - } - - def 'Schedule Cm Notification Subscription NCMP out event but later send it on demand'() { - given: 'a cm subscription response for the client' - def subscriptionId = 'test-subscription-id-3' - def eventType = 'subscriptionCreateResponse' - def ncmpOutEvent = new NcmpOutEvent(data: new Data(subscriptionId: 'test-subscription-id-3', acceptedTargets: ['ch-2', 'ch-3'])) - and: 'also we have target topic for sending to client' - objectUnderTest.ncmpOutEventTopic = 'client-test-topic' - and: 'a deadline to an event' - objectUnderTest.dmiOutEventTimeoutInMs = 1000 - when: 'the event is scheduled to be sent' - objectUnderTest.sendNcmpOutEvent(subscriptionId, eventType, ncmpOutEvent, true) - then: 'we wait for 10ms and then we receive response from DMI' - Thread.sleep(10) - and: 'we receive response from DMI so we send the message on demand' - objectUnderTest.sendNcmpOutEvent(subscriptionId, eventType, ncmpOutEvent, false) - then: 'the event contains the required attributes' - 1 * mockEventsProducer.sendCloudEvent(_, _, _) >> { - args -> - { - assert args[0] == 'client-test-topic' - assert args[1] == subscriptionId - def ncmpOutEventAsCloudEvent = (args[2] as CloudEvent) - assert ncmpOutEventAsCloudEvent.getExtension('correlationid') == subscriptionId - assert ncmpOutEventAsCloudEvent.type == 'subscriptionCreateResponse' - assert ncmpOutEventAsCloudEvent.source.toString() == 'NCMP' - assert CloudEventMapper.toTargetEvent(ncmpOutEventAsCloudEvent, NcmpOutEvent) == ncmpOutEvent - } - } - then: 'the cache handler is called once to remove accepted and rejected entries in cache' - 1 * mockDmiCacheHandler.removeAcceptedAndRejectedDmiSubscriptionEntries(subscriptionId) - } - - def 'No event sent when NCMP out event is null'() { - given: 'a cm subscription response for the client' - def subscriptionId = 'test-subscription-id-3' - def eventType = 'subscriptionCreateResponse' - def ncmpOutEvent = null - and: 'also we have target topic for sending to client' - objectUnderTest.ncmpOutEventTopic = 'client-test-topic' - and: 'a deadline to an event' - objectUnderTest.dmiOutEventTimeoutInMs = 1000 - when: 'the event is scheduled to be sent' - objectUnderTest.sendNcmpOutEvent(subscriptionId, eventType, ncmpOutEvent, true) - then: 'we wait for 10ms and then we receive response from DMI' - Thread.sleep(10) - and: 'we receive NO response from DMI so we send the message on demand' - objectUnderTest.sendNcmpOutEvent(subscriptionId, eventType, ncmpOutEvent, false) - and: 'no event sent' - 0 * mockEventsProducer.sendCloudEvent(*_) - } - -} diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/subscription/utils/CmSubscriptionPersistenceServiceSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/subscription/utils/CmSubscriptionPersistenceServiceSpec.groovy index 2bb770541e..ed2fa33636 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/subscription/utils/CmSubscriptionPersistenceServiceSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/subscription/utils/CmSubscriptionPersistenceServiceSpec.groovy @@ -21,6 +21,13 @@ package org.onap.cps.ncmp.impl.datajobs.subscription.utils + +import static CmDataJobSubscriptionPersistenceService.CPS_PATH_TEMPLATE_FOR_SUBSCRIPTIONS_WITH_DATA_NODE_SELECTOR +import static CmDataJobSubscriptionPersistenceService.CPS_PATH_TEMPLATE_FOR_SUBSCRIPTION_WITH_DATA_JOB_ID +import static CmDataJobSubscriptionPersistenceService.CPS_PATH_TEMPLATE_FOR_INACTIVE_SUBSCRIPTIONS +import static CmDataJobSubscriptionPersistenceService.PARENT_NODE_XPATH +import static org.onap.cps.api.parameters.FetchDescendantsOption.OMIT_DESCENDANTS + import com.fasterxml.jackson.databind.ObjectMapper import org.onap.cps.api.CpsDataService import org.onap.cps.api.CpsQueryService @@ -29,10 +36,6 @@ import org.onap.cps.utils.ContentType import org.onap.cps.utils.JsonObjectMapper import spock.lang.Specification -import static CmDataJobSubscriptionPersistenceService.CM_DATA_JOB_SUBSCRIPTIONS_PARENT_NODE_XPATH -import static CmDataJobSubscriptionPersistenceService.CPS_PATH_TEMPLATE_FOR_SUBSCRIPTION_WITH_ALTERNATE_ID_AND_DATATYPE -import static CmDataJobSubscriptionPersistenceService.CPS_PATH_TEMPLATE_FOR_SUBSCRIPTION_WITH_DATA_JOB_ID -import static org.onap.cps.api.parameters.FetchDescendantsOption.OMIT_DESCENDANTS class CmSubscriptionPersistenceServiceSpec extends Specification { @@ -44,11 +47,11 @@ class CmSubscriptionPersistenceServiceSpec extends Specification { def 'Check cm data job subscription details has at least one subscriber #scenario'() { given: 'a valid cm data job subscription query' - def cpsPathQuery = CPS_PATH_TEMPLATE_FOR_SUBSCRIPTION_WITH_ALTERNATE_ID_AND_DATATYPE.formatted('altId1', 'dataType1') + def cpsPathQuery = CPS_PATH_TEMPLATE_FOR_SUBSCRIPTIONS_WITH_DATA_NODE_SELECTOR.formatted('/myDataNodeSelector') and: 'datanodes optionally returned' 1 * mockCpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-job-subscriptions', cpsPathQuery, OMIT_DESCENDANTS) >> dataNode when: 'we check if subscription details already has at least one subscriber' - def result = objectUnderTest.hasAtLeastOneSubscription('dataType1', 'altId1') + def result = objectUnderTest.hasAtLeastOneSubscription('/myDataNodeSelector') then: 'we get expected result' assert result == hasAtLeastOneSubscription where: 'following scenarios are used' @@ -72,87 +75,48 @@ class CmSubscriptionPersistenceServiceSpec extends Specification { 'no datanodes present' | [] || true } - def 'Get all nodes for subscription id'() { + def 'Get all inactive data node selectors for subscription id'() { given: 'the query service returns nodes for subscription id' - def expectedDataNode = new DataNode(leaves: ['datajobId': ['id1'], 'dataTypeId': 'dataType1', 'alternateId': 'altId1']) + def expectedDataNode = new DataNode(leaves: ['datajobId': ['id1'], 'dataNodeSelector': '/dataNodeSelector', 'status': 'UNKNOWN']) def queryServiceResponse = [expectedDataNode].asCollection() - def cmDataJobSubscriptionIdCpsPath = CPS_PATH_TEMPLATE_FOR_SUBSCRIPTION_WITH_DATA_JOB_ID.formatted('mySubId') + def cmDataJobSubscriptionIdCpsPath = CPS_PATH_TEMPLATE_FOR_INACTIVE_SUBSCRIPTIONS.formatted('id1') 1 * mockCpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-job-subscriptions', cmDataJobSubscriptionIdCpsPath, OMIT_DESCENDANTS) >> queryServiceResponse when: 'retrieving all nodes for data job subscription id' - def result = objectUnderTest.getAffectedDataNodes('mySubId') + def result = objectUnderTest.getInactiveDataNodeSelectors('id1') then: 'the result returns correct number of datanodes' assert result.size() == 1 and: 'the attribute of the data nodes is as expected' - assert result.iterator().next().leaves.alternateId == expectedDataNode.leaves.alternateId - assert result.iterator().next().leaves.dataTypeId == expectedDataNode.leaves.dataTypeId + assert result.iterator().next() == expectedDataNode.leaves.dataNodeSelector } - def 'Add subscription for a data type and and fdn that have no subscriptions yet.'() { + def 'Add subscription for a data node selector that have no subscriptions yet.'() { given: 'a valid cm data job subscription path query' - def query = CPS_PATH_TEMPLATE_FOR_SUBSCRIPTION_WITH_ALTERNATE_ID_AND_DATATYPE.formatted('altId1', 'dataType1') + def dataNodeSelector = '/myDataNodeSelector' + def query = CPS_PATH_TEMPLATE_FOR_SUBSCRIPTIONS_WITH_DATA_NODE_SELECTOR.formatted(dataNodeSelector) and: 'a data node does not exist for cm data job subscription path query' mockCpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-job-subscriptions', query, OMIT_DESCENDANTS) >> [] - and: 'a datanode does not exist for the given cm data job subscription path query' - mockCpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-subscriptions', query, OMIT_DESCENDANTS) >> [] and: 'data job subscription details is mapped as JSON' def subscriptionIds = ['newSubId'] - def subscriptionAsJson = objectUnderTest.getSubscriptionDetailsAsJson(subscriptionIds, 'dataType1', 'altId1') - when: 'the method to add/update cm notification subscription is called' - objectUnderTest.addSubscription('dataType1', 'altId1', 'newSubId') + def subscriptionAsJson = objectUnderTest.createSubscriptionDetailsAsJson(dataNodeSelector, subscriptionIds, 'UNKNOWN') + when: 'the method to add cm notification subscription is called' + objectUnderTest.add('newSubId', dataNodeSelector) then: 'data service method to create new subscription for given subscriber is called once with the correct parameters' 1 * mockCpsDataService.saveData('NCMP-Admin', 'cm-data-job-subscriptions', subscriptionAsJson, _, ContentType.JSON) } - def 'Add subscription for a data type and fdn that already have subscription(s).'() { + def 'Add subscription for a data node selector that already have subscription(s).'() { given: 'a valid cm subscription path query' - def query = CPS_PATH_TEMPLATE_FOR_SUBSCRIPTION_WITH_ALTERNATE_ID_AND_DATATYPE.formatted('altId1', 'dataType1') + def dataNodeSelector = '/myDataNodeSelector' + def query = CPS_PATH_TEMPLATE_FOR_SUBSCRIPTIONS_WITH_DATA_NODE_SELECTOR.formatted(dataNodeSelector) and: 'a dataNode exists for the given cps path query' - mockCpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-job-subscriptions', query, OMIT_DESCENDANTS) >> [new DataNode(leaves: ['dataJobId': ['existingId'], 'dataTypeId': 'dataType1', 'alternateId': 'altId1'])] + mockCpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-job-subscriptions', query, OMIT_DESCENDANTS) >> [new DataNode(leaves: ['dataJobId': ['existingId'], 'dataNodeSelector': dataNodeSelector, 'status': 'ACCEPTED'])] and: 'updated cm data job subscription details as json' def newListOfSubscriptionIds = ['existingId', 'newSubId'] - def subscriptionDetailsAsJson = objectUnderTest.getSubscriptionDetailsAsJson(newListOfSubscriptionIds, 'dataType1', 'altId1') - when: 'the method to add/update cm notification subscription is called' - objectUnderTest.addSubscription('dataType1', 'altId1', 'newSubId') + def subscriptionDetailsAsJson = objectUnderTest.createSubscriptionDetailsAsJson(dataNodeSelector, newListOfSubscriptionIds, 'ACCEPTED') + when: 'the method to add cm notification subscription is called' + objectUnderTest.add('newSubId', dataNodeSelector) then: 'data service method to update list of subscribers is called once' - 1 * mockCpsDataService.updateNodeLeaves('NCMP-Admin', 'cm-data-job-subscriptions', CM_DATA_JOB_SUBSCRIPTIONS_PARENT_NODE_XPATH, subscriptionDetailsAsJson, _, ContentType.JSON) - } - - def 'Remove subscription (other subscriptions remain for same data type and target).'() { - given: 'a subscription exists when queried' - def query = CPS_PATH_TEMPLATE_FOR_SUBSCRIPTION_WITH_ALTERNATE_ID_AND_DATATYPE.formatted('altId1', 'dataType1') - mockCpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-job-subscriptions', query, OMIT_DESCENDANTS) - >> [new DataNode(leaves: ['dataJobId': ['existingId', 'subIdToRemove'], 'dataTypeId': 'dataType1', 'alternateId': 'altId1'])] - and: 'updated cm data job subscription details as json' - def subscriptionDetailsAsJson = objectUnderTest.getSubscriptionDetailsAsJson(['existingId'], 'dataType1', 'altId1') - when: 'the subscriber is removed' - objectUnderTest.removeSubscription('dataType1', 'altId1', 'subIdToRemove') - then: 'the list of subscribers is updated' - 1 * mockCpsDataService.updateNodeLeaves('NCMP-Admin', 'cm-data-job-subscriptions', CM_DATA_JOB_SUBSCRIPTIONS_PARENT_NODE_XPATH, subscriptionDetailsAsJson, _, ContentType.JSON) - } - - def 'Remove last subscription (no subscriptions remain for same data type and target).'() { - given: 'a subscription exists when queried but has only 1 subscriber' - def query = CPS_PATH_TEMPLATE_FOR_SUBSCRIPTION_WITH_ALTERNATE_ID_AND_DATATYPE.formatted('last-alt-id', 'last-data-type') - mockCpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-job-subscriptions', query, OMIT_DESCENDANTS) - >> [new DataNode(leaves: ['dataJobId': ['subIdToRemove'], 'dataTypeId': 'last-data-type', 'alternateId': 'last-alt-id'])] - and: 'a cps path with alternate id and data type for deleting a node' - def cpsPath = CPS_PATH_TEMPLATE_FOR_SUBSCRIPTION_WITH_ALTERNATE_ID_AND_DATATYPE.formatted('last-alt-id', 'last-data-type') - when: 'that last ongoing subscription is removed' - objectUnderTest.removeSubscription('last-data-type', 'last-alt-id', 'subIdToRemove') - then: 'the data job subscription with empty subscribers list is removed' - 1 * mockCpsDataService.deleteDataNode('NCMP-Admin', 'cm-data-job-subscriptions', cpsPath, _) - } - - def 'Attempt to remove non existing subscription (id).'() { - given: 'a subscription exists when queried with other subscriber' - def query = CPS_PATH_TEMPLATE_FOR_SUBSCRIPTION_WITH_ALTERNATE_ID_AND_DATATYPE.formatted('some-alt-id', 'some-data-type') - mockCpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-job-subscriptions', query, OMIT_DESCENDANTS) >> [new DataNode(leaves: ['dataJobId': ['otherDataJobId']])] - when: 'the remove subscription method is with a non existing id' - objectUnderTest.removeSubscription('some-data-type', 'some-alt-id', 'nonExistingSubId') - then: 'no calls to cps data service is made' - 0 * mockCpsDataService.deleteDataNode(*_) - and: 'removal of non existent subscription id silently ignored with no exception thrown' - noExceptionThrown() + 1 * mockCpsDataService.updateNodeLeaves('NCMP-Admin', 'cm-data-job-subscriptions', PARENT_NODE_XPATH, subscriptionDetailsAsJson, _, ContentType.JSON) } } diff --git a/cps-ncmp-service/src/test/resources/cmSubscription/cmNotificationSubscriptionNcmpInEvent.json b/cps-ncmp-service/src/test/resources/cmSubscription/cmNotificationSubscriptionNcmpInEvent.json deleted file mode 100644 index 04d37b8bb9..0000000000 --- a/cps-ncmp-service/src/test/resources/cmSubscription/cmNotificationSubscriptionNcmpInEvent.json +++ /dev/null @@ -1,21 +0,0 @@ -{ - "data": { - "subscriptionId": "test-id", - "predicates": [ - { - "targetFilter": ["ch1","ch2"], - "scopeFilter": { - "datastore": "ncmp-datastore:passthrough-operational", - "xpathFilter": ["/x1/y1","x2/y2"] - } - }, - { - "targetFilter": ["ch3","ch4"], - "scopeFilter": { - "datastore": "ncmp-datastore:passthrough-operational", - "xpathFilter": ["/x3/y3","x4/y4"] - } - } - ] - } -} \ No newline at end of file diff --git a/cps-ncmp-service/src/test/resources/cmSubscription/cmNotificationSubscriptionDmiOutEvent.json b/cps-ncmp-service/src/test/resources/datajobs/subscription/cmNotificationSubscriptionDmiOutEvent.json similarity index 100% rename from cps-ncmp-service/src/test/resources/cmSubscription/cmNotificationSubscriptionDmiOutEvent.json rename to cps-ncmp-service/src/test/resources/datajobs/subscription/cmNotificationSubscriptionDmiOutEvent.json diff --git a/cps-ncmp-service/src/test/resources/sample_dataJobSubscriptionInEvent.json b/cps-ncmp-service/src/test/resources/datajobs/subscription/cmNotificationSubscriptionNcmpInEvent.json similarity index 78% rename from cps-ncmp-service/src/test/resources/sample_dataJobSubscriptionInEvent.json rename to cps-ncmp-service/src/test/resources/datajobs/subscription/cmNotificationSubscriptionNcmpInEvent.json index f81e7e1215..6858bdb88e 100644 --- a/cps-ncmp-service/src/test/resources/sample_dataJobSubscriptionInEvent.json +++ b/cps-ncmp-service/src/test/resources/datajobs/subscription/cmNotificationSubscriptionNcmpInEvent.json @@ -1,8 +1,8 @@ { - "eventType": "my event type", + "eventType": "#myEventType", "event": { "dataJob": { - "id": "my job id", + "id": "myDataJobId", "productionJobDefinition": { "targetSelector": { "dataNodeSelector": "/SubNetwork[id=\"SN1\"]" @@ -16,7 +16,7 @@ } }, "dataType": { - "dataTypeId": "#dataTypeId" + "dataTypeId": "" } } } \ No newline at end of file diff --git a/integration-test/src/test/resources/application.yml b/integration-test/src/test/resources/application.yml index f1ae7efdc4..2593c49ac4 100644 --- a/integration-test/src/test/resources/application.yml +++ b/integration-test/src/test/resources/application.yml @@ -100,7 +100,6 @@ app: avc: cm-subscription-ncmp-in: ${CM_SUBSCRIPTION_NCMP_IN_TOPIC:subscription} cm-subscription-dmi-in: ${CM_SUBSCRIPTION_DMI_IN_TOPIC:ncmp-dmi-cm-avc-subscription} - cm-subscription-dmi-out: ${CM_SUBSCRIPTION_DMI_OUT_TOPIC:dmi-ncmp-cm-avc-subscription} cm-subscription-ncmp-out: ${CM_SUBSCRIPTION_NCMP_OUT_TOPIC:subscription-response} cm-events-topic: ${NCMP_CM_EVENTS_TOPIC:cm-events} inventory-events-topic: ncmp-inventory-events @@ -178,8 +177,6 @@ ncmp: sleep-time-ms: 1000000 cm-handle-data-sync: sleep-time-ms: 30000 - subscription-forwarding: - dmi-response-timeout-ms: 30000 model-loader: retry-time-ms: 1000 trust-level: -- 2.16.6