Modify methods in CmSubscriptionPersistenceService 71/141671/6
authoremaclee <lee.anjella.macabuhay@est.tech>
Fri, 8 Aug 2025 16:36:11 +0000 (17:36 +0100)
committeremaclee <lee.anjella.macabuhay@est.tech>
Tue, 19 Aug 2025 09:03:58 +0000 (10:03 +0100)
- existing methods of CmSubscriptionPersistenceService to fit the new cm subscription model
- unit tests updated, not used tests removed
- DmiCacheHandler, DmiCacheHandlerSpec, CmSubscriptionComparator, CmSubscriptionComparatorSpec was only modified to pass tests due to changes on persistence service methods - these classes will be changed as part of CPS-2166 development

Issue-ID: CPS-2919
Change-Id: Ic0e470a9993b8a9d7414e301a199564490d0a044
Signed-off-by: emaclee <lee.anjella.macabuhay@est.tech>
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/cache/DmiCacheHandler.java
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/CmSubscriptionComparator.java
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/CmSubscriptionHandlerImpl.java
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/utils/CmDataJobSubscriptionPersistenceService.java [new file with mode: 0644]
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/utils/CmSubscriptionPersistenceService.java [deleted file]
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/cache/DmiCacheHandlerSpec.groovy
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/CmSubscriptionComparatorSpec.groovy
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/CmSubscriptionHandlerImplSpec.groovy
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/utils/CmSubscriptionPersistenceServiceSpec.groovy

index b5ab7f6..b0b1cc5 100644 (file)
@@ -1,6 +1,6 @@
 /*
  *  ============LICENSE_START=======================================================
- *  Copyright (C) 2024 Nordix Foundation
+ *  Copyright (C) 2024-2025 OpenInfra Foundation Europe. All rights reserved.
  *  ================================================================================
  *  Licensed under the Apache License, Version 2.0 (the "License");
  *  you may not use this file except in compliance with the License.
@@ -35,7 +35,7 @@ import org.onap.cps.ncmp.api.data.models.DatastoreType;
 import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.CmSubscriptionStatus;
 import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.DmiCmSubscriptionDetails;
 import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.DmiCmSubscriptionPredicate;
-import org.onap.cps.ncmp.impl.cmnotificationsubscription.utils.CmSubscriptionPersistenceService;
+import org.onap.cps.ncmp.impl.cmnotificationsubscription.utils.CmDataJobSubscriptionPersistenceService;
 import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.client_to_ncmp.Predicate;
 import org.onap.cps.ncmp.impl.inventory.InventoryPersistence;
 import org.onap.cps.ncmp.impl.inventory.models.YangModelCmHandle;
@@ -45,7 +45,7 @@ import org.springframework.stereotype.Component;
 @RequiredArgsConstructor
 public class DmiCacheHandler {
 
-    private final CmSubscriptionPersistenceService cmSubscriptionPersistenceService;
+    private final CmDataJobSubscriptionPersistenceService cmDataJobSubscriptionPersistenceService;
     private final Map<String, Map<String, DmiCmSubscriptionDetails>> cmNotificationSubscriptionCache;
     private final InventoryPersistence inventoryPersistence;
 
@@ -156,13 +156,9 @@ public class DmiCacheHandler {
         for (final DmiCmSubscriptionPredicate dmiCmSubscriptionPredicate : dmiCmSubscriptionPredicates) {
             final DatastoreType datastoreType = dmiCmSubscriptionPredicate.getDatastoreType();
             final Set<String> cmHandles = dmiCmSubscriptionPredicate.getTargetCmHandleIds();
-            final Set<String> xpaths = dmiCmSubscriptionPredicate.getXpaths();
-
             for (final String cmHandle: cmHandles) {
-                for (final String xpath: xpaths) {
-                    cmSubscriptionPersistenceService.addCmSubscription(datastoreType, cmHandle,
-                            xpath, subscriptionId);
-                }
+                cmDataJobSubscriptionPersistenceService.addSubscription(datastoreType.getDatastoreName(),
+                        cmHandle, subscriptionId);
             }
         }
     }
@@ -181,13 +177,9 @@ public class DmiCacheHandler {
         for (final DmiCmSubscriptionPredicate dmiCmSubscriptionPredicate : dmiCmSubscriptionPredicates) {
             final DatastoreType datastoreType = dmiCmSubscriptionPredicate.getDatastoreType();
             final Set<String> cmHandles = dmiCmSubscriptionPredicate.getTargetCmHandleIds();
-            final Set<String> xpaths = dmiCmSubscriptionPredicate.getXpaths();
-
             for (final String cmHandle: cmHandles) {
-                for (final String xpath: xpaths) {
-                    cmSubscriptionPersistenceService.removeCmSubscription(datastoreType,
-                            cmHandle, xpath, subscriptionId);
-                }
+                cmDataJobSubscriptionPersistenceService.removeSubscription(datastoreType.getDatastoreName(),
+                        cmHandle, subscriptionId);
             }
         }
     }
index 99c5695..cf1db6e 100644 (file)
@@ -1,6 +1,6 @@
 /*
  * ============LICENSE_START=======================================================
- *  Copyright (C) 2024 Nordix Foundation
+ *  Copyright (C) 2024-2025 OpenInfra Foundation Europe. All rights reserved.
  *  ================================================================================
  *  Licensed under the Apache License, Version 2.0 (the "License");
  *  you may not use this file except in compliance with the License.
@@ -27,14 +27,14 @@ import java.util.Set;
 import lombok.RequiredArgsConstructor;
 import org.onap.cps.ncmp.api.data.models.DatastoreType;
 import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.DmiCmSubscriptionPredicate;
-import org.onap.cps.ncmp.impl.cmnotificationsubscription.utils.CmSubscriptionPersistenceService;
+import org.onap.cps.ncmp.impl.cmnotificationsubscription.utils.CmDataJobSubscriptionPersistenceService;
 import org.springframework.stereotype.Component;
 
 @Component
 @RequiredArgsConstructor
 public class CmSubscriptionComparator {
 
-    private final CmSubscriptionPersistenceService cmSubscriptionPersistenceService;
+    private final CmDataJobSubscriptionPersistenceService cmDataJobSubscriptionPersistenceService;
 
     /**
      * Get the new Dmi Predicates for a given predicates list.
@@ -51,13 +51,9 @@ public class CmSubscriptionComparator {
             final Set<String> xpaths = new HashSet<>();
             final DatastoreType datastoreType = dmiCmSubscriptionPredicate.getDatastoreType();
             for (final String cmHandleId : dmiCmSubscriptionPredicate.getTargetCmHandleIds()) {
-                for (final String xpath : dmiCmSubscriptionPredicate.getXpaths()) {
-                    if (!cmSubscriptionPersistenceService.isOngoingCmSubscription(datastoreType,
-                            cmHandleId, xpath)) {
-                        targetCmHandleIds.add(cmHandleId);
-                        xpaths.add(xpath);
-
-                    }
+                if (!cmDataJobSubscriptionPersistenceService.hasAtLeastOneSubscription(
+                        datastoreType.getDatastoreName(), cmHandleId)) {
+                    targetCmHandleIds.add(cmHandleId);
                 }
             }
             populateValidDmiSubscriptionPredicates(targetCmHandleIds, xpaths, datastoreType,
index f6ac0cf..d0285e1 100644 (file)
@@ -41,7 +41,7 @@ import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.DmiCmSubscriptio
 import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.DmiCmSubscriptionKey;
 import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.DmiCmSubscriptionPredicate;
 import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.DmiCmSubscriptionTuple;
-import org.onap.cps.ncmp.impl.cmnotificationsubscription.utils.CmSubscriptionPersistenceService;
+import org.onap.cps.ncmp.impl.cmnotificationsubscription.utils.CmDataJobSubscriptionPersistenceService;
 import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.client_to_ncmp.Predicate;
 import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.ncmp_to_client.NcmpOutEvent;
 import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.ncmp_to_dmi.DmiInEvent;
@@ -58,7 +58,7 @@ public class CmSubscriptionHandlerImpl implements CmSubscriptionHandler {
             "^/datastores/datastore\\[@name='([^']*)']/cm-handles/cm-handle\\[@id='([^']*)']/"
                     + "filters/filter\\[@xpath='(.*)']$");
 
-    private final CmSubscriptionPersistenceService cmSubscriptionPersistenceService;
+    private final CmDataJobSubscriptionPersistenceService cmDataJobSubscriptionPersistenceService;
     private final CmSubscriptionComparator cmSubscriptionComparator;
     private final NcmpOutEventMapper ncmpOutEventMapper;
     private final DmiInEventMapper dmiInEventMapper;
@@ -70,7 +70,7 @@ public class CmSubscriptionHandlerImpl implements CmSubscriptionHandler {
 
     @Override
     public void processSubscriptionCreateRequest(final String subscriptionId, final List<Predicate> predicates) {
-        if (cmSubscriptionPersistenceService.isUniqueSubscriptionId(subscriptionId)) {
+        if (cmDataJobSubscriptionPersistenceService.isNewSubscriptionId(subscriptionId)) {
             dmiCacheHandler.add(subscriptionId, predicates);
             handleNewCmSubscription(subscriptionId);
             scheduleNcmpOutEventResponse(subscriptionId, "subscriptionCreateResponse");
@@ -82,7 +82,7 @@ public class CmSubscriptionHandlerImpl implements CmSubscriptionHandler {
     @Override
     public void processSubscriptionDeleteRequest(final String subscriptionId) {
         final Collection<DataNode> subscriptionDataNodes =
-                cmSubscriptionPersistenceService.getAllNodesForSubscriptionId(subscriptionId);
+                cmDataJobSubscriptionPersistenceService.getAffectedDataNodes(subscriptionId);
         final DmiCmSubscriptionTuple dmiCmSubscriptionTuple =
                 getLastRemainingAndOverlappingSubscriptionsPerDmi(subscriptionDataNodes);
         dmiCacheHandler.add(subscriptionId, mergeDmiCmSubscriptionDetailsPerDmiMaps(dmiCmSubscriptionTuple));
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/utils/CmDataJobSubscriptionPersistenceService.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/utils/CmDataJobSubscriptionPersistenceService.java
new file mode 100644 (file)
index 0000000..5c53f3d
--- /dev/null
@@ -0,0 +1,186 @@
+/*
+ * ============LICENSE_START=======================================================
+ *  Copyright (C) 2024-2025 OpenInfra Foundation Europe. All rights reserved.
+ *  Modifications Copyright (C) 2024 TechMahindra Ltd.
+ *  ================================================================================
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ *  SPDX-License-Identifier: Apache-2.0
+ *  ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.impl.cmnotificationsubscription.utils;
+
+import static org.onap.cps.api.parameters.FetchDescendantsOption.OMIT_DESCENDANTS;
+
+import java.io.Serializable;
+import java.time.OffsetDateTime;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.onap.cps.api.CpsDataService;
+import org.onap.cps.api.CpsQueryService;
+import org.onap.cps.api.model.DataNode;
+import org.onap.cps.utils.ContentType;
+import org.onap.cps.utils.JsonObjectMapper;
+import org.springframework.stereotype.Service;
+
+@Slf4j
+@Service
+@RequiredArgsConstructor
+public class CmDataJobSubscriptionPersistenceService {
+
+    private static final String NCMP_DATASPACE_NAME = "NCMP-Admin";
+    private static final String CM_DATA_JOB_SUBSCRIPTIONS_ANCHOR_NAME = "cm-data-job-subscriptions";
+    private static final String CM_DATA_JOB_SUBSCRIPTIONS_PARENT_NODE_XPATH = "/dataJob";
+    private static final String CPS_PATH_TEMPLATE_FOR_SUBSCRIPTION_WITH_ALTERNATE_ID_AND_DATATYPE =
+            "/dataJob/subscription[@alternateId='%s' and @dataTypeId='%s']";
+    private static final String CPS_PATH_TEMPLATE_FOR_SUBSCRIPTION_WITH_DATA_JOB_ID =
+            "//subscription/dataJobId[text()='%s']";
+
+    private final JsonObjectMapper jsonObjectMapper;
+    private final CpsQueryService cpsQueryService;
+    private final CpsDataService cpsDataService;
+
+    /**
+     * Check if we have a cm data job subscription for the given data type and target (FDN).
+     *
+     * @param dataType      the data type of the data job subscription
+     * @param alternateId   the alternate id target of the data job subscription
+     * @return              true if the subscription details has at least one subscriber , otherwise false
+     */
+    public boolean hasAtLeastOneSubscription(final String dataType, final String alternateId) {
+        return !getSubscriptionIds(dataType, alternateId).isEmpty();
+    }
+
+    /**
+     * Check if the input is a new subscription ID against ongoing subscriptions.
+     *
+     * @param subscriptionId subscription ID
+     * @return true if subscriptionId is not used in active subscriptions, otherwise false
+     */
+    public boolean isNewSubscriptionId(final String subscriptionId) {
+        final String query = CPS_PATH_TEMPLATE_FOR_SUBSCRIPTION_WITH_DATA_JOB_ID.formatted(subscriptionId);
+        return cpsQueryService.queryDataNodes(NCMP_DATASPACE_NAME, CM_DATA_JOB_SUBSCRIPTIONS_ANCHOR_NAME,
+                query, OMIT_DESCENDANTS).isEmpty();
+    }
+
+    /**
+     * Get the ids for the subscriptions for the given data type and targets.
+     *
+     * @param dataType      the data type of the data job subscription
+     * @param alternateId   the alternate id target of the data job subscription
+     * @return              collection of subscription ids of ongoing cm notification subscription
+     */
+    public Collection<String> getSubscriptionIds(final String dataType, final String alternateId) {
+        final String query = CPS_PATH_TEMPLATE_FOR_SUBSCRIPTION_WITH_ALTERNATE_ID_AND_DATATYPE.formatted(
+                alternateId, dataType);
+        final Collection<DataNode> existingNodes =
+                cpsQueryService.queryDataNodes(NCMP_DATASPACE_NAME, CM_DATA_JOB_SUBSCRIPTIONS_ANCHOR_NAME,
+                        query, OMIT_DESCENDANTS);
+        if (existingNodes.isEmpty()) {
+            return Collections.emptyList();
+        }
+        return (List<String>) existingNodes.iterator().next().getLeaves().get("dataJobId");
+    }
+
+    /**
+     * Add cm notification data job subscription.
+     *
+     * @param dataType          the data type of the data job subscription
+     * @param alternateId       the alternate id target of the data job subscription
+     * @param subscriptionId data job subscription id to be added
+     */
+    public void addSubscription(final String dataType, final String alternateId, final String subscriptionId) {
+        final Collection<String> subscriptionIds =
+                getSubscriptionIds(dataType, alternateId);
+        if (subscriptionIds.isEmpty()) {
+            addNewSubscriptionDetails(dataType, alternateId, subscriptionId);
+        } else {
+            subscriptionIds.add(subscriptionId);
+            updateSubscriptionDetails(subscriptionIds, dataType, alternateId);
+        }
+    }
+
+    /**
+     * Remove cm notification data job Subscription.
+     *
+     * @param dataType          the data type of the data job subscription
+     * @param alternateId       the alternate id target of the data job subscription
+     * @param subscriptionId    data subscription id to remove
+     */
+    public void removeSubscription(final String dataType, final String alternateId, final String subscriptionId) {
+        final Collection<String> subscriptionIds = getSubscriptionIds(dataType, alternateId);
+        if (subscriptionIds.remove(subscriptionId)) {
+            updateSubscriptionDetails(subscriptionIds, dataType, alternateId);
+            log.info("There is at least one subscriber left for dataType {} on {}", dataType, alternateId);
+            if (subscriptionIds.isEmpty()) {
+                log.info("There are no subscribers left for dataType {} on {}", dataType, alternateId);
+                deleteUnusedSubscriptionDetails(dataType, alternateId);
+            }
+        }
+    }
+
+    /**
+     * Retrieve all existing data nodes for given data job subscription id.
+     *
+     * @param subscriptionId  data job subscription id
+     * @return                collection of DataNodes
+     */
+    public Collection<DataNode> getAffectedDataNodes(final String subscriptionId) {
+        final String query = CPS_PATH_TEMPLATE_FOR_SUBSCRIPTION_WITH_DATA_JOB_ID.formatted(subscriptionId);
+        return cpsQueryService.queryDataNodes(NCMP_DATASPACE_NAME, CM_DATA_JOB_SUBSCRIPTIONS_ANCHOR_NAME,
+                query, OMIT_DESCENDANTS);
+    }
+
+    private void deleteUnusedSubscriptionDetails(final String dataType, final String alternateId) {
+        final String deleteListOfSubscriptionCpsPathQuery =
+                CPS_PATH_TEMPLATE_FOR_SUBSCRIPTION_WITH_ALTERNATE_ID_AND_DATATYPE.formatted(alternateId,
+                        dataType);
+        cpsDataService.deleteDataNode(NCMP_DATASPACE_NAME, CM_DATA_JOB_SUBSCRIPTIONS_ANCHOR_NAME,
+                deleteListOfSubscriptionCpsPathQuery, OffsetDateTime.now());
+    }
+
+    private void addNewSubscriptionDetails(final String dataType,
+                                           final String alternateId,
+                                           final String subscriptionId) {
+        final Collection<String> newSubscriptionList = Collections.singletonList(subscriptionId);
+        final String subscriptionDetailsAsJson = getSubscriptionDetailsAsJson(newSubscriptionList, dataType,
+                alternateId);
+        cpsDataService.saveData(NCMP_DATASPACE_NAME, CM_DATA_JOB_SUBSCRIPTIONS_ANCHOR_NAME, subscriptionDetailsAsJson,
+                OffsetDateTime.now(), ContentType.JSON);
+    }
+
+    private void updateSubscriptionDetails(final Collection<String> subscriptionIds, final String dataType,
+                                           final String alternateId) {
+        final String subscriptionDetailsAsJson = getSubscriptionDetailsAsJson(subscriptionIds, dataType, alternateId);
+        cpsDataService.updateNodeLeaves(NCMP_DATASPACE_NAME, CM_DATA_JOB_SUBSCRIPTIONS_ANCHOR_NAME,
+                CM_DATA_JOB_SUBSCRIPTIONS_PARENT_NODE_XPATH, subscriptionDetailsAsJson, OffsetDateTime.now(),
+                ContentType.JSON);
+    }
+
+    private String getSubscriptionDetailsAsJson(final Collection<String> subscriptionIds,
+                                                final String dataTypeId,
+                                                final String alternateId) {
+        final Map<String, Serializable> subscriptionDetailsAsMap =
+                Map.of("dataTypeId", dataTypeId,
+                        "alternateId", alternateId,
+                        "dataJobId", (Serializable) subscriptionIds);
+        return "{\"subscription\":[" + jsonObjectMapper.asJsonString(subscriptionDetailsAsMap) + "]}";
+    }
+
+}
+
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/utils/CmSubscriptionPersistenceService.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/utils/CmSubscriptionPersistenceService.java
deleted file mode 100644 (file)
index e0f2531..0000000
+++ /dev/null
@@ -1,244 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- *  Copyright (C) 2024 Nordix Foundation
- *  Modifications Copyright (C) 2024 TechMahindra Ltd.
- *  ================================================================================
- *  Licensed under the Apache License, Version 2.0 (the "License");
- *  you may not use this file except in compliance with the License.
- *  You may obtain a copy of the License at
- *
- *        http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- *  SPDX-License-Identifier: Apache-2.0
- *  ============LICENSE_END=========================================================
- */
-
-package org.onap.cps.ncmp.impl.cmnotificationsubscription.utils;
-
-import static org.onap.cps.api.parameters.FetchDescendantsOption.DIRECT_CHILDREN_ONLY;
-import static org.onap.cps.api.parameters.FetchDescendantsOption.OMIT_DESCENDANTS;
-
-import java.io.Serializable;
-import java.time.OffsetDateTime;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.onap.cps.api.CpsDataService;
-import org.onap.cps.api.CpsQueryService;
-import org.onap.cps.api.model.DataNode;
-import org.onap.cps.ncmp.api.data.models.DatastoreType;
-import org.onap.cps.utils.ContentType;
-import org.onap.cps.utils.JsonObjectMapper;
-import org.springframework.stereotype.Service;
-
-@Slf4j
-@Service
-@RequiredArgsConstructor
-public class CmSubscriptionPersistenceService {
-
-    private static final String NCMP_DATASPACE_NAME = "NCMP-Admin";
-    private static final String CM_SUBSCRIPTIONS_ANCHOR_NAME = "cm-data-subscriptions";
-
-    private static final String SUBSCRIPTION_ANCHOR_NAME = "cm-data-subscriptions";
-    private static final String CPS_PATH_QUERY_FOR_CM_SUBSCRIPTION_WITH_DATASTORE_AND_CMHANDLE = """
-            /datastores/datastore[@name='%s']/cm-handles/cm-handle[@id='%s']
-            """.trim();
-    private static final String CPS_PATH_QUERY_FOR_CM_SUBSCRIPTION_FILTERS_WITH_DATASTORE_AND_CMHANDLE =
-            CPS_PATH_QUERY_FOR_CM_SUBSCRIPTION_WITH_DATASTORE_AND_CMHANDLE + "/filters";
-
-    private static final String CPS_PATH_QUERY_FOR_CM_SUBSCRIPTION_WITH_DATASTORE_CMHANDLE_AND_XPATH =
-            CPS_PATH_QUERY_FOR_CM_SUBSCRIPTION_FILTERS_WITH_DATASTORE_AND_CMHANDLE + "/filter[@xpath='%s']";
-
-
-    private static final String CPS_PATH_QUERY_FOR_CM_SUBSCRIPTION_WITH_ID = """
-            //filter/subscriptionIds[text()='%s']
-            """.trim();
-
-    private final JsonObjectMapper jsonObjectMapper;
-    private final CpsQueryService cpsQueryService;
-    private final CpsDataService cpsDataService;
-
-    /**
-     * Check if we have an ongoing cm subscription based on the parameters.
-     *
-     * @param datastoreType the susbcription target datastore type
-     * @param cmHandleId    the id of the cm handle for the susbcription
-     * @param xpath         the target xpath
-     * @return true for ongoing cmsubscription , otherwise false
-     */
-    public boolean isOngoingCmSubscription(final DatastoreType datastoreType, final String cmHandleId,
-            final String xpath) {
-        return !getOngoingCmSubscriptionIds(datastoreType, cmHandleId, xpath).isEmpty();
-    }
-
-    /**
-     * Check if the subscription ID is unique against ongoing subscriptions.
-     *
-     * @param subscriptionId subscription ID
-     * @return true if subscriptionId is not used in active subscriptions, otherwise false
-     */
-    public boolean isUniqueSubscriptionId(final String subscriptionId) {
-        return cpsQueryService.queryDataNodes(NCMP_DATASPACE_NAME, SUBSCRIPTION_ANCHOR_NAME,
-                CPS_PATH_QUERY_FOR_CM_SUBSCRIPTION_WITH_ID.formatted(subscriptionId), OMIT_DESCENDANTS).isEmpty();
-    }
-
-    /**
-     * Get all ongoing cm notification subscription based on the parameters.
-     *
-     * @param datastoreType the susbcription target datastore type
-     * @param cmHandleId    the id of the cm handle for the susbcription
-     * @param xpath         the target xpath
-     * @return collection of subscription ids of ongoing cm notification subscription
-     */
-    public Collection<String> getOngoingCmSubscriptionIds(final DatastoreType datastoreType,
-            final String cmHandleId, final String xpath) {
-
-        final String isOngoingCmSubscriptionCpsPathQuery =
-                CPS_PATH_QUERY_FOR_CM_SUBSCRIPTION_WITH_DATASTORE_CMHANDLE_AND_XPATH.formatted(
-                        datastoreType.getDatastoreName(), cmHandleId, escapeQuotesByDoublingThem(xpath));
-        final Collection<DataNode> existingNodes =
-                cpsQueryService.queryDataNodes(NCMP_DATASPACE_NAME, CM_SUBSCRIPTIONS_ANCHOR_NAME,
-                        isOngoingCmSubscriptionCpsPathQuery, OMIT_DESCENDANTS);
-        if (existingNodes.isEmpty()) {
-            return Collections.emptyList();
-        }
-        return (List<String>) existingNodes.iterator().next().getLeaves().get("subscriptionIds");
-    }
-
-    /**
-     * Add cm notification subscription.
-     *
-     * @param datastoreType     the susbcription target datastore type
-     * @param cmHandleId        the id of the cm handle for the susbcription
-     * @param xpath             the target xpath
-     * @param newSubscriptionId subscription id to be added
-     */
-    public void addCmSubscription(final DatastoreType datastoreType, final String cmHandleId,
-            final String xpath, final String newSubscriptionId) {
-        final Collection<String> subscriptionIds =
-                getOngoingCmSubscriptionIds(datastoreType, cmHandleId, xpath);
-        if (subscriptionIds.isEmpty()) {
-            addFirstSubscriptionForDatastoreCmHandleAndXpath(datastoreType, cmHandleId, xpath, newSubscriptionId);
-        } else if (!subscriptionIds.contains(newSubscriptionId)) {
-            subscriptionIds.add(newSubscriptionId);
-            saveSubscriptionDetails(datastoreType, cmHandleId, xpath, subscriptionIds);
-        }
-    }
-
-    /**
-     * Remove cm notification Subscription.
-     *
-     * @param datastoreType  the susbcription target datastore type
-     * @param cmHandleId     the id of the cm handle for the susbcription
-     * @param xpath          the target xpath
-     * @param subscriptionId subscription id to remove
-     */
-    public void removeCmSubscription(final DatastoreType datastoreType, final String cmHandleId,
-            final String xpath, final String subscriptionId) {
-        final Collection<String> subscriptionIds =
-                getOngoingCmSubscriptionIds(datastoreType, cmHandleId, xpath);
-        if (subscriptionIds.remove(subscriptionId)) {
-            saveSubscriptionDetails(datastoreType, cmHandleId, xpath, subscriptionIds);
-            log.info("There are subscribers left for the following cps path {} :",
-                    CPS_PATH_QUERY_FOR_CM_SUBSCRIPTION_WITH_DATASTORE_CMHANDLE_AND_XPATH.formatted(
-                            datastoreType.getDatastoreName(), cmHandleId, escapeQuotesByDoublingThem(xpath)));
-            if (subscriptionIds.isEmpty()) {
-                log.info("No subscribers left for the following cps path {} :",
-                        CPS_PATH_QUERY_FOR_CM_SUBSCRIPTION_WITH_DATASTORE_CMHANDLE_AND_XPATH.formatted(
-                                datastoreType.getDatastoreName(), cmHandleId, escapeQuotesByDoublingThem(xpath)));
-                deleteListOfSubscriptionsFor(datastoreType, cmHandleId, xpath);
-            }
-        }
-    }
-
-    /**
-     * Retrieve all existing dataNodes for given subscription id.
-     *
-     * @param subscriptionId  subscription id
-     * @return collection of DataNodes
-     */
-    public Collection<DataNode> getAllNodesForSubscriptionId(final String subscriptionId) {
-        return cpsQueryService.queryDataNodes(NCMP_DATASPACE_NAME, SUBSCRIPTION_ANCHOR_NAME,
-                CPS_PATH_QUERY_FOR_CM_SUBSCRIPTION_WITH_ID.formatted(subscriptionId),
-                OMIT_DESCENDANTS);
-    }
-
-    private void deleteListOfSubscriptionsFor(final DatastoreType datastoreType, final String cmHandleId,
-            final String xpath) {
-        cpsDataService.deleteDataNode(NCMP_DATASPACE_NAME, SUBSCRIPTION_ANCHOR_NAME,
-                CPS_PATH_QUERY_FOR_CM_SUBSCRIPTION_WITH_DATASTORE_CMHANDLE_AND_XPATH.formatted(
-                        datastoreType.getDatastoreName(), cmHandleId, escapeQuotesByDoublingThem(xpath)),
-                OffsetDateTime.now());
-        final Collection<DataNode> existingFiltersForCmHandle =
-                cpsQueryService.queryDataNodes(NCMP_DATASPACE_NAME, CM_SUBSCRIPTIONS_ANCHOR_NAME,
-                                CPS_PATH_QUERY_FOR_CM_SUBSCRIPTION_FILTERS_WITH_DATASTORE_AND_CMHANDLE.formatted(
-                                        datastoreType.getDatastoreName(), cmHandleId),
-                                DIRECT_CHILDREN_ONLY).iterator().next()
-                        .getChildDataNodes();
-        if (existingFiltersForCmHandle.isEmpty()) {
-            removeCmHandleFromDatastore(datastoreType.getDatastoreName(), cmHandleId);
-        }
-    }
-
-    private void removeCmHandleFromDatastore(final String datastoreName, final String cmHandleId) {
-        cpsDataService.deleteDataNode(NCMP_DATASPACE_NAME, SUBSCRIPTION_ANCHOR_NAME,
-                CPS_PATH_QUERY_FOR_CM_SUBSCRIPTION_WITH_DATASTORE_AND_CMHANDLE.formatted(datastoreName, cmHandleId),
-                OffsetDateTime.now());
-    }
-
-    private boolean isFirstSubscriptionForCmHandle(final DatastoreType datastoreType, final String cmHandleId) {
-        return cpsQueryService.queryDataNodes(NCMP_DATASPACE_NAME, CM_SUBSCRIPTIONS_ANCHOR_NAME,
-                CPS_PATH_QUERY_FOR_CM_SUBSCRIPTION_FILTERS_WITH_DATASTORE_AND_CMHANDLE.formatted(
-                        datastoreType.getDatastoreName(), cmHandleId), OMIT_DESCENDANTS).isEmpty();
-    }
-
-    private void addFirstSubscriptionForDatastoreCmHandleAndXpath(final DatastoreType datastoreType,
-            final String cmHandleId, final String xpath, final String subscriptionId) {
-        final Collection<String> newSubscriptionList = Collections.singletonList(subscriptionId);
-        final String subscriptionDetailsAsJson = getSubscriptionDetailsAsJson(xpath, newSubscriptionList);
-        if (isFirstSubscriptionForCmHandle(datastoreType, cmHandleId)) {
-            final String parentXpath =
-                    "/datastores/datastore[@name='%s']/cm-handles".formatted(datastoreType.getDatastoreName());
-            final String subscriptionAsJson =
-                    String.format("{\"cm-handle\":[{\"id\":\"%s\",\"filters\":%s}]}", cmHandleId,
-                            subscriptionDetailsAsJson);
-            cpsDataService.saveData(NCMP_DATASPACE_NAME, SUBSCRIPTION_ANCHOR_NAME, parentXpath, subscriptionAsJson,
-                    OffsetDateTime.now(), ContentType.JSON);
-        } else {
-            cpsDataService.saveListElements(NCMP_DATASPACE_NAME, CM_SUBSCRIPTIONS_ANCHOR_NAME,
-                    CPS_PATH_QUERY_FOR_CM_SUBSCRIPTION_FILTERS_WITH_DATASTORE_AND_CMHANDLE.formatted(
-                            datastoreType.getDatastoreName(), cmHandleId), subscriptionDetailsAsJson,
-                    OffsetDateTime.now(), ContentType.JSON);
-        }
-    }
-
-    private void saveSubscriptionDetails(final DatastoreType datastoreType, final String cmHandleId, final String xpath,
-            final Collection<String> subscriptionIds) {
-        final String subscriptionDetailsAsJson = getSubscriptionDetailsAsJson(xpath, subscriptionIds);
-        cpsDataService.updateNodeLeaves(NCMP_DATASPACE_NAME, CM_SUBSCRIPTIONS_ANCHOR_NAME,
-                CPS_PATH_QUERY_FOR_CM_SUBSCRIPTION_FILTERS_WITH_DATASTORE_AND_CMHANDLE.formatted(
-                        datastoreType.getDatastoreName(), cmHandleId), subscriptionDetailsAsJson, OffsetDateTime.now(),
-                ContentType.JSON);
-    }
-
-    private String getSubscriptionDetailsAsJson(final String xpath, final Collection<String> subscriptionIds) {
-        final Map<String, Serializable> subscriptionDetailsAsMap =
-                Map.of("xpath", xpath, "subscriptionIds", (Serializable) subscriptionIds);
-        return "{\"filter\":[" + jsonObjectMapper.asJsonString(subscriptionDetailsAsMap) + "]}";
-    }
-
-    private static String escapeQuotesByDoublingThem(final String inputXpath) {
-        return inputXpath.replace("'", "''");
-    }
-
-}
-
index 791a154..5e2c29b 100644 (file)
@@ -1,6 +1,6 @@
 /*
  *  ============LICENSE_START=======================================================
- *  Copyright (C) 2024 Nordix Foundation
+ *  Copyright (C) 2024-2025 OpenInfra Foundation Europe. All rights reserved.
  *  ================================================================================
  *  Licensed under the Apache License, Version 2.0 (the 'License');
  *  you may not use this file except in compliance with the License.
@@ -26,7 +26,7 @@ import io.cloudevents.core.builder.CloudEventBuilder
 import org.apache.kafka.clients.consumer.ConsumerRecord
 import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.CmSubscriptionStatus
 import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.DmiCmSubscriptionDetails
-import org.onap.cps.ncmp.impl.cmnotificationsubscription.utils.CmSubscriptionPersistenceService
+import org.onap.cps.ncmp.impl.cmnotificationsubscription.utils.CmDataJobSubscriptionPersistenceService
 import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.client_to_ncmp.NcmpInEvent
 import org.onap.cps.ncmp.impl.inventory.InventoryPersistence
 import org.onap.cps.ncmp.impl.inventory.models.YangModelCmHandle
@@ -49,7 +49,7 @@ class DmiCacheHandlerSpec extends MessagingBaseSpec {
     @SpringBean
     InventoryPersistence mockInventoryPersistence = Mock(InventoryPersistence)
     @SpringBean
-    CmSubscriptionPersistenceService mockCmSubscriptionPersistenceService = Mock(CmSubscriptionPersistenceService)
+    CmDataJobSubscriptionPersistenceService mockCmSubscriptionPersistenceService = Mock(CmDataJobSubscriptionPersistenceService)
 
     def testCache = [:]
     def objectUnderTest = new DmiCacheHandler(mockCmSubscriptionPersistenceService, testCache, mockInventoryPersistence)
@@ -188,7 +188,7 @@ class DmiCacheHandlerSpec extends MessagingBaseSpec {
         when: 'subscription is persisted in database'
             objectUnderTest.persistIntoDatabasePerDmi(subscriptionId,'dmi-1')
         then: 'persistence service is called the correct number of times per dmi'
-            4 * mockCmSubscriptionPersistenceService.addCmSubscription(_,_,_,subscriptionId)
+            2 * mockCmSubscriptionPersistenceService.addSubscription(*_)
     }
 
     def 'Remove subscription from database per dmi'() {
@@ -199,7 +199,7 @@ class DmiCacheHandlerSpec extends MessagingBaseSpec {
         when: 'subscription is persisted in database'
             objectUnderTest.removeFromDatabase(subscriptionId,'dmi-1')
         then: 'persistence service is called the correct number of times per dmi'
-            4 * mockCmSubscriptionPersistenceService.removeCmSubscription(_,_,_,subscriptionId)
+            2 * mockCmSubscriptionPersistenceService.removeSubscription(*_)
     }
 
     def setUpTestEvent(){
index 0ebf9a6..c06e16a 100644 (file)
@@ -1,6 +1,6 @@
 /*
  * ============LICENSE_START=======================================================
- * Copyright (c) 2024 Nordix Foundation.
+ * Copyright (c) 2024-2025 OpenInfra Foundation Europe. All rights reserved.
  *  ================================================================================
  *  Licensed under the Apache License, Version 2.0 (the "License");
  *  you may not use this file except in compliance with the License.
  */
 package org.onap.cps.ncmp.impl.cmnotificationsubscription.ncmp
 
-
 import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.DmiCmSubscriptionPredicate
-import org.onap.cps.ncmp.impl.cmnotificationsubscription.utils.CmSubscriptionPersistenceService
+import org.onap.cps.ncmp.impl.cmnotificationsubscription.utils.CmDataJobSubscriptionPersistenceService
 import spock.lang.Specification
 
 import static org.onap.cps.ncmp.api.data.models.DatastoreType.PASSTHROUGH_OPERATIONAL
 
 class CmSubscriptionComparatorSpec extends Specification {
 
-    def mockCmSubscriptionPersistenceService = Mock(CmSubscriptionPersistenceService)
+    def mockCmSubscriptionPersistenceService = Mock(CmDataJobSubscriptionPersistenceService)
     def objectUnderTest = new CmSubscriptionComparator(mockCmSubscriptionPersistenceService)
 
-    def 'Find Delta of given list of predicates'() {
+    def 'Find the difference based on the provided predicates'() {
         given: 'A list of predicates'
             def predicates = [new DmiCmSubscriptionPredicate(['ch-1', 'ch-2'].toSet(), PASSTHROUGH_OPERATIONAL, ['a/1/', 'b/2'].toSet())]
-        and: '3 positive responses and 1 negative.'
-            mockCmSubscriptionPersistenceService.isOngoingCmSubscription(PASSTHROUGH_OPERATIONAL, 'ch-1', 'a/1/') >>> true
-            mockCmSubscriptionPersistenceService.isOngoingCmSubscription(PASSTHROUGH_OPERATIONAL, 'ch-1', 'b/2') >>> true
-            mockCmSubscriptionPersistenceService.isOngoingCmSubscription(PASSTHROUGH_OPERATIONAL, 'ch-2', 'a/1/') >>> true
-            mockCmSubscriptionPersistenceService.isOngoingCmSubscription(PASSTHROUGH_OPERATIONAL, 'ch-2', 'b/2') >>> false
-        when: 'getDelta is called'
+        and: '1 positive and 1 negative response.'
+            mockCmSubscriptionPersistenceService.hasAtLeastOneSubscription(PASSTHROUGH_OPERATIONAL.getDatastoreName(), 'ch-1') >> true
+            mockCmSubscriptionPersistenceService.hasAtLeastOneSubscription(PASSTHROUGH_OPERATIONAL.getDatastoreName(), 'ch-2') >> false
+        when: 'method to extract only NEW predicates for dmi is called'
             def result = objectUnderTest.getNewDmiSubscriptionPredicates(predicates)
-        then: 'verify correct delta is returned'
+        then: 'from 2 predicates only 1 remains'
             assert result.size() == 1
             assert result[0].targetCmHandleIds[0] == 'ch-2'
-            assert result[0].xpaths[0] == 'b/2'
-
     }
 
-    def 'Find Delta of given list of predicates when it is an ongoing Cm Subscription'() {
+    def 'Find the difference based on the provided predicates when it is an ongoing Cm Subscription'() {
         given: 'A list of predicates'
             def predicates = [new DmiCmSubscriptionPredicate(['ch-1'].toSet(), PASSTHROUGH_OPERATIONAL, ['a/1/'].toSet())]
         and: 'its already present'
-            mockCmSubscriptionPersistenceService.isOngoingCmSubscription(PASSTHROUGH_OPERATIONAL, 'ch-1', 'a/1/') >>> true
-        when: 'getDelta is called'
+            mockCmSubscriptionPersistenceService.hasAtLeastOneSubscription(PASSTHROUGH_OPERATIONAL.getDatastoreName(), 'ch-1') >> true
+        when: 'method to extract only NEW predicates for dmi is called'
             def result = objectUnderTest.getNewDmiSubscriptionPredicates(predicates)
-        then: 'verify correct delta is returned'
+        then: 'from 1 predicate, none remains'
             assert result.size() == 0
     }
 
index e4321ff..5690b0d 100644 (file)
@@ -27,7 +27,7 @@ import org.onap.cps.ncmp.impl.cmnotificationsubscription.dmi.DmiInEventMapper
 import org.onap.cps.ncmp.impl.cmnotificationsubscription.dmi.DmiInEventProducer
 import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.DmiCmSubscriptionDetails
 import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.DmiCmSubscriptionPredicate
-import org.onap.cps.ncmp.impl.cmnotificationsubscription.utils.CmSubscriptionPersistenceService
+import org.onap.cps.ncmp.impl.cmnotificationsubscription.utils.CmDataJobSubscriptionPersistenceService
 import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.client_to_ncmp.NcmpInEvent
 import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.ncmp_to_client.NcmpOutEvent
 import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.ncmp_to_dmi.DmiInEvent
@@ -45,7 +45,7 @@ import static org.onap.cps.ncmp.impl.cmnotificationsubscription.models.CmSubscri
 class CmSubscriptionHandlerImplSpec extends Specification {
 
     def jsonObjectMapper = new JsonObjectMapper(new ObjectMapper())
-    def mockCmSubscriptionPersistenceService = Mock(CmSubscriptionPersistenceService)
+    def mockCmSubscriptionPersistenceService = Mock(CmDataJobSubscriptionPersistenceService)
     def mockCmSubscriptionComparator = Mock(CmSubscriptionComparator)
     def mockNcmpOutEventMapper = Mock(NcmpOutEventMapper)
     def mockDmiInEventMapper = Mock(DmiInEventMapper)
@@ -62,11 +62,12 @@ class CmSubscriptionHandlerImplSpec extends Specification {
     def testDmiSubscriptionsPerDmi = ["dmi-1": new DmiCmSubscriptionDetails([], PENDING)]
 
     def 'Consume valid and unique CmNotificationSubscriptionNcmpInEvent create message'() {
-        given: 'a cmNotificationSubscriptionNcmp in event with unique subscription id'
+        given: 'a cmNotificationSubscriptionNcmp in event with new subscription id'
             def jsonData = TestUtils.getResourceFileContent('cmSubscription/cmNotificationSubscriptionNcmpInEvent.json')
             def testEventConsumed = jsonObjectMapper.convertJsonString(jsonData, NcmpInEvent.class)
             def testListOfDeltaPredicates = [new DmiCmSubscriptionPredicate(['ch1'].toSet(), PASSTHROUGH_OPERATIONAL, ['/a/b'].toSet())]
-            mockCmSubscriptionPersistenceService.isUniqueSubscriptionId("test-id") >> true
+        and: 'the persistence service confirms subscription id is new (not used for other subscription)'
+            mockCmSubscriptionPersistenceService.isNewSubscriptionId('test-id') >> true
         and: 'relevant details is extracted from the event'
             def subscriptionId = testEventConsumed.getData().getSubscriptionId()
             def predicates = testEventConsumed.getData().getPredicates()
@@ -93,7 +94,8 @@ class CmSubscriptionHandlerImplSpec extends Specification {
             def jsonData = TestUtils.getResourceFileContent('cmSubscription/cmNotificationSubscriptionNcmpInEvent.json')
             def testEventConsumed = jsonObjectMapper.convertJsonString(jsonData, NcmpInEvent.class)
             def noDeltaPredicates = []
-            mockCmSubscriptionPersistenceService.isUniqueSubscriptionId("test-id") >> true
+        and: 'the persistence service confirms subscription id is new (not used for other subscription)'
+            mockCmSubscriptionPersistenceService.isNewSubscriptionId('test-id') >> true
         and: 'the cache handler returns for relevant subscription id'
             1 * mockDmiCacheHandler.get('test-id') >> testDmiSubscriptionsPerDmi
         and: 'the delta predicates is returned'
@@ -112,7 +114,8 @@ class CmSubscriptionHandlerImplSpec extends Specification {
         given: 'a cmNotificationSubscriptionNcmp in event'
             def jsonData = TestUtils.getResourceFileContent('cmSubscription/cmNotificationSubscriptionNcmpInEvent.json')
             def testEventConsumed = jsonObjectMapper.convertJsonString(jsonData, NcmpInEvent.class)
-            mockCmSubscriptionPersistenceService.isUniqueSubscriptionId('test-id') >> false
+        and: 'the persistence service confirms subscription id is not new (already used subscription)'
+            mockCmSubscriptionPersistenceService.isNewSubscriptionId('test-id') >> false
         and: 'relevant details is extracted from the event'
             def subscriptionId = testEventConsumed.getData().getSubscriptionId()
             def predicates = testEventConsumed.getData().getPredicates()
@@ -132,9 +135,9 @@ class CmSubscriptionHandlerImplSpec extends Specification {
         given: 'a test subscription id'
             def subscriptionId = 'test-id'
         and: 'the persistence service returns datanodes'
-            1 * mockCmSubscriptionPersistenceService.getAllNodesForSubscriptionId(subscriptionId) >>
+            1 * mockCmSubscriptionPersistenceService.getAffectedDataNodes(subscriptionId) >>
                 [new DataNode(xpath: "/datastores/datastore[@name='ncmp-datastore:passthrough-running']/cm-handles/cm-handle[@id='ch-1']/filters/filter[@xpath='x/y']", leaves: ['xpath': 'x/y', 'subscriptionIds': ['test-id']]),
-                new DataNode(xpath: "/datastores/datastore[@name='ncmp-datastore:passthrough-running']/cm-handles/cm-handle[@id='ch-2']/filters/filter[@xpath='y/z']", leaves: ['xpath': 'y/z', 'subscriptionIds': ['test-id']])]
+                 new DataNode(xpath: "/datastores/datastore[@name='ncmp-datastore:passthrough-running']/cm-handles/cm-handle[@id='ch-2']/filters/filter[@xpath='y/z']", leaves: ['xpath': 'y/z', 'subscriptionIds': ['test-id']])]
         and: 'the inventory persistence returns yang model cm handles'
             1 * mockInventoryPersistence.getYangModelCmHandle('ch-1') >> new YangModelCmHandle(dmiServiceName: 'dmi-1')
             1 * mockInventoryPersistence.getYangModelCmHandle('ch-2') >> new YangModelCmHandle(dmiServiceName: 'dmi-2')
@@ -151,7 +154,7 @@ class CmSubscriptionHandlerImplSpec extends Specification {
         given: 'a test subscription id'
             def subscriptionId = 'test-id'
         and: 'the persistence service returns datanodes with multiple subscribers'
-            1 * mockCmSubscriptionPersistenceService.getAllNodesForSubscriptionId(subscriptionId) >>
+            1 * mockCmSubscriptionPersistenceService.getAffectedDataNodes(subscriptionId) >>
                 [new DataNode(xpath: "/datastores/datastore[@name='ncmp-datastore:passthrough-running']/cm-handles/cm-handle[@id='ch-1']/filters/filter[@xpath='x/y']", leaves: ['xpath': 'x/y', 'subscriptionIds': ['test-id','other-id']]),
                  new DataNode(xpath: "/datastores/datastore[@name='ncmp-datastore:passthrough-running']/cm-handles/cm-handle[@id='ch-2']/filters/filter[@xpath='y/z']", leaves: ['xpath': 'y/z', 'subscriptionIds': ['test-id','other-id']])]
         and: 'the inventory persistence returns yang model cm handles'
index b95d647..b077dea 100644 (file)
@@ -1,6 +1,6 @@
 /*
  * ============LICENSE_START=======================================================
- * Copyright (c) 2024 Nordix Foundation.
+ * Copyright (c) 2024-2025 OpenInfra Foundation Europe. All rights reserved.
  * Modifications Copyright (C) 2024 TechMahindra Ltd.
  *  ================================================================================
  *  Licensed under the Apache License, Version 2.0 (the "License");
 
 package org.onap.cps.ncmp.impl.cmnotificationsubscription.utils
 
+import static CmDataJobSubscriptionPersistenceService.CM_DATA_JOB_SUBSCRIPTIONS_PARENT_NODE_XPATH
+import static CmDataJobSubscriptionPersistenceService.CPS_PATH_TEMPLATE_FOR_SUBSCRIPTION_WITH_ALTERNATE_ID_AND_DATATYPE
+import static CmDataJobSubscriptionPersistenceService.CPS_PATH_TEMPLATE_FOR_SUBSCRIPTION_WITH_DATA_JOB_ID
+import static org.onap.cps.api.parameters.FetchDescendantsOption.OMIT_DESCENDANTS
+
 import com.fasterxml.jackson.databind.ObjectMapper
 import org.onap.cps.api.CpsDataService
 import org.onap.cps.api.CpsQueryService
@@ -29,176 +34,125 @@ import org.onap.cps.utils.ContentType
 import org.onap.cps.utils.JsonObjectMapper
 import spock.lang.Specification
 
-import static CmSubscriptionPersistenceService.CPS_PATH_QUERY_FOR_CM_SUBSCRIPTION_FILTERS_WITH_DATASTORE_AND_CMHANDLE
-import static CmSubscriptionPersistenceService.CPS_PATH_QUERY_FOR_CM_SUBSCRIPTION_WITH_DATASTORE_CMHANDLE_AND_XPATH
-import static CmSubscriptionPersistenceService.CPS_PATH_QUERY_FOR_CM_SUBSCRIPTION_WITH_ID
-import static org.onap.cps.ncmp.api.data.models.DatastoreType.PASSTHROUGH_OPERATIONAL
-import static org.onap.cps.ncmp.api.data.models.DatastoreType.PASSTHROUGH_RUNNING
-import static org.onap.cps.api.parameters.FetchDescendantsOption.DIRECT_CHILDREN_ONLY
-import static org.onap.cps.api.parameters.FetchDescendantsOption.OMIT_DESCENDANTS
-
 class CmSubscriptionPersistenceServiceSpec extends Specification {
 
     def jsonObjectMapper = new JsonObjectMapper(new ObjectMapper())
     def mockCpsQueryService = Mock(CpsQueryService)
     def mockCpsDataService = Mock(CpsDataService)
 
-    def objectUnderTest = new CmSubscriptionPersistenceService(jsonObjectMapper, mockCpsQueryService, mockCpsDataService)
+    def objectUnderTest = new CmDataJobSubscriptionPersistenceService(jsonObjectMapper, mockCpsQueryService, mockCpsDataService)
 
-    def 'Check ongoing cm subscription #scenario'() {
-        given: 'a valid cm subscription query'
-            def cpsPathQuery = "/datastores/datastore[@name='ncmp-datastore:passthrough-running']/cm-handles/cm-handle[@id='ch-1']/filters/filter[@xpath='/cps/path']"
+    def 'Check cm data job subscription details has at least one subscriber #scenario'() {
+        given: 'a valid cm data job subscription query'
+            def cpsPathQuery = CPS_PATH_TEMPLATE_FOR_SUBSCRIPTION_WITH_ALTERNATE_ID_AND_DATATYPE.formatted('altId1', 'dataType1')
         and: 'datanodes optionally returned'
-            1 * mockCpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-subscriptions',
-                cpsPathQuery, OMIT_DESCENDANTS) >> dataNode
-        when: 'we check for an ongoing cm subscription'
-            def response = objectUnderTest.isOngoingCmSubscription(PASSTHROUGH_RUNNING, 'ch-1', '/cps/path')
-        then: 'we get expected response'
-            assert response == isOngoingCmSubscription
+            1 * mockCpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-job-subscriptions', cpsPathQuery, OMIT_DESCENDANTS) >> dataNode
+        when: 'we check if subscription details already has at least one subscriber'
+            def result = objectUnderTest.hasAtLeastOneSubscription('dataType1', 'altId1')
+        then: 'we get expected result'
+            assert result == hasAtLeastOneSubscription
         where: 'following scenarios are used'
-            scenario                  | dataNode                                                                            || isOngoingCmSubscription
-            'valid datanodes present' | [new DataNode(xpath: '/cps/path', leaves: ['subscriptionIds': ['sub-1', 'sub-2']])] || true
-            'no datanodes present'    | []                                                                                  || false
+            scenario                  | dataNode                                             || hasAtLeastOneSubscription
+            'valid datanodes present' | [new DataNode(leaves: ['dataJobId': ['dataJobId1']])]|| true
+            'no datanodes present'    | []                                                   || false
     }
 
     def 'Checking uniqueness of incoming subscription ID'() {
-        given: 'a cps path with a subscription ID for querying'
-            def cpsPathQuery = CPS_PATH_QUERY_FOR_CM_SUBSCRIPTION_WITH_ID.formatted('some-sub')
-        and: 'relevant datanodes are returned'
-            1 * mockCpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-subscriptions', cpsPathQuery, OMIT_DESCENDANTS) >>
-                dataNodes
-        when: 'a subscription ID is tested for uniqueness'
-            def result = objectUnderTest.isUniqueSubscriptionId('some-sub')
+        given: 'a cps path with a data job subscription ID for querying'
+            def cpsPathQuery = CPS_PATH_TEMPLATE_FOR_SUBSCRIPTION_WITH_DATA_JOB_ID.formatted('mySubId')
+        and: 'collection of data nodes are returned'
+            1 * mockCpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-job-subscriptions', cpsPathQuery, OMIT_DESCENDANTS) >> dataNodes
+        when: 'a data job subscription id is tested for uniqueness'
+            def result = objectUnderTest.isNewSubscriptionId('mySubId')
         then: 'result is as expected'
-            assert result == isValidSubscriptionId
+            assert result == isValidDataJobSubscriptionId
         where: 'following scenarios are used'
-            scenario               | dataNodes        || isValidSubscriptionId
+            scenario               | dataNodes        || isValidDataJobSubscriptionId
             'datanodes present'    | [new DataNode()] || false
             'no datanodes present' | []               || true
     }
 
-    def 'Add new subscriber to an ongoing cm notification subscription'() {
-        given: 'a valid cm subscription path query'
-            def cpsPathQuery = CPS_PATH_QUERY_FOR_CM_SUBSCRIPTION_WITH_DATASTORE_CMHANDLE_AND_XPATH.formatted('ncmp-datastore:passthrough-running', 'ch-1', '/x/y')
-        and: 'a dataNode exists for the given cps path query'
-            mockCpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-subscriptions',
-                cpsPathQuery, OMIT_DESCENDANTS) >> [new DataNode(xpath: cpsPathQuery, leaves: ['xpath': '/x/y', 'subscriptionIds': ['sub-1']])]
-        when: 'the method to add/update cm notification subscription is called'
-            objectUnderTest.addCmSubscription(PASSTHROUGH_RUNNING, 'ch-1', '/x/y', 'newSubId')
-        then: 'data service method to update list of subscribers is called once'
-            1 * mockCpsDataService.updateNodeLeaves(
-                'NCMP-Admin',
-                'cm-data-subscriptions',
-                '/datastores/datastore[@name=\'ncmp-datastore:passthrough-running\']/cm-handles/cm-handle[@id=\'ch-1\']/filters',
-                objectUnderTest.getSubscriptionDetailsAsJson('/x/y', ['sub-1', 'newSubId']), _, ContentType.JSON)
+    def 'Get all nodes for subscription id'() {
+        given: 'the query service returns nodes for subscription id'
+            def expectedDataNode = new DataNode(leaves: ['datajobId': ['id1'], 'dataTypeId': 'dataType1', 'alternateId': 'altId1'])
+            def queryServiceResponse = [expectedDataNode].asCollection()
+            def cmDataJobSubscriptionIdCpsPath = CPS_PATH_TEMPLATE_FOR_SUBSCRIPTION_WITH_DATA_JOB_ID.formatted('mySubId')
+            1 * mockCpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-job-subscriptions', cmDataJobSubscriptionIdCpsPath, OMIT_DESCENDANTS) >> queryServiceResponse
+        when: 'retrieving all nodes for data job subscription id'
+            def result = objectUnderTest.getAffectedDataNodes('mySubId')
+        then: 'the result returns correct number of datanodes'
+            assert result.size() == 1
+        and: 'the attribute of the data nodes is as expected'
+            assert result.iterator().next().leaves.alternateId == expectedDataNode.leaves.alternateId
+            assert result.iterator().next().leaves.dataTypeId == expectedDataNode.leaves.dataTypeId
     }
 
-    def 'Add new cm notification subscription for #datastoreType'() {
-        given: 'a valid cm subscription path query'
-            def cmSubscriptionCpsPathQuery = CPS_PATH_QUERY_FOR_CM_SUBSCRIPTION_WITH_DATASTORE_CMHANDLE_AND_XPATH.formatted(datastoreName, 'ch-1', '/x/y')
-            def cmHandleForSubscriptionPathQuery = CPS_PATH_QUERY_FOR_CM_SUBSCRIPTION_FILTERS_WITH_DATASTORE_AND_CMHANDLE.formatted(datastoreName, 'ch-1')
-        and: 'a parent node xpath for the cm subscription path above'
-            def parentNodeXpath = '/datastores/datastore[@name=\'%s\']/cm-handles'
-        and: 'a datanode does not exist for cm subscription path query'
-            mockCpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-subscriptions',
-                cmSubscriptionCpsPathQuery,
-                OMIT_DESCENDANTS) >> []
-        and: 'a datanode does not exist for the given cm handle subscription path query'
-            mockCpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-subscriptions',
-                cmHandleForSubscriptionPathQuery, OMIT_DESCENDANTS) >> []
-        and: 'subscription is mapped as JSON'
-            def subscriptionAsJson = '{"cm-handle":[{"id":"ch-1","filters":' +
-                objectUnderTest.getSubscriptionDetailsAsJson('/x/y', ['newSubId']) + '}]}'
+    def 'Add subscription for a data type and and fdn that have no subscriptions yet.'() {
+        given: 'a valid cm data job subscription path query'
+            def query = CPS_PATH_TEMPLATE_FOR_SUBSCRIPTION_WITH_ALTERNATE_ID_AND_DATATYPE.formatted('altId1', 'dataType1')
+        and: 'a data node does not exist for cm data job subscription path query'
+            mockCpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-job-subscriptions', query, OMIT_DESCENDANTS) >> []
+        and: 'a datanode does not exist for the given cm data job subscription path query'
+            mockCpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-subscriptions', query, OMIT_DESCENDANTS) >> []
+        and: 'data job subscription details is mapped as JSON'
+            def subscriptionIds = ['newSubId']
+            def subscriptionAsJson = objectUnderTest.getSubscriptionDetailsAsJson(subscriptionIds, 'dataType1', 'altId1')
         when: 'the method to add/update cm notification subscription is called'
-            objectUnderTest.addCmSubscription(datastoreType, 'ch-1', '/x/y', 'newSubId')
+            objectUnderTest.addSubscription('dataType1', 'altId1', 'newSubId')
         then: 'data service method to create new subscription for given subscriber is called once with the correct parameters'
-            1 * mockCpsDataService.saveData(
-                'NCMP-Admin',
-                'cm-data-subscriptions',
-                parentNodeXpath.formatted(datastoreName),
-                subscriptionAsJson, _, ContentType.JSON)
-        where:
-            scenario                  | datastoreType           || datastoreName
-            'passthrough_running'     | PASSTHROUGH_RUNNING     || 'ncmp-datastore:passthrough-running'
-            'passthrough_operational' | PASSTHROUGH_OPERATIONAL || 'ncmp-datastore:passthrough-operational'
+            1 * mockCpsDataService.saveData('NCMP-Admin', 'cm-data-job-subscriptions', subscriptionAsJson, _, ContentType.JSON)
     }
 
-    def 'Add new cm notification subscription when xpath does not exist for existing subscription cm handle'() {
+    def 'Add subscription for a data type and fdn that already have subscription(s).'() {
         given: 'a valid cm subscription path query'
-            def cmSubscriptionCpsPathQuery = CPS_PATH_QUERY_FOR_CM_SUBSCRIPTION_WITH_DATASTORE_CMHANDLE_AND_XPATH.formatted(datastoreName, 'ch-1', '/x/y')
-            def cmHandleForSubscriptionPathQuery = CPS_PATH_QUERY_FOR_CM_SUBSCRIPTION_FILTERS_WITH_DATASTORE_AND_CMHANDLE.formatted(datastoreName, 'ch-1')
-        and: 'a parent node xpath for given cm handle for subscription path above'
-            def parentNodeXpath = '/datastores/datastore[@name=\'%s\']/cm-handles/cm-handle[@id=\'%s\']/filters'
-        and: 'a datanode does not exist for cm subscription path query'
-            mockCpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-subscriptions',
-                cmSubscriptionCpsPathQuery, OMIT_DESCENDANTS) >> []
-        and: 'a datanode exists for the given cm handle subscription path query'
-            mockCpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-subscriptions',
-                cmHandleForSubscriptionPathQuery, OMIT_DESCENDANTS) >> [new DataNode()]
+            def query = CPS_PATH_TEMPLATE_FOR_SUBSCRIPTION_WITH_ALTERNATE_ID_AND_DATATYPE.formatted('altId1', 'dataType1')
+        and: 'a dataNode exists for the given cps path query'
+            mockCpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-job-subscriptions', query, OMIT_DESCENDANTS) >> [new DataNode(leaves: ['dataJobId': ['existingId'], 'dataTypeId': 'dataType1', 'alternateId': 'altId1'])]
+        and: 'updated cm data job subscription details as json'
+            def newListOfSubscriptionIds = ['existingId', 'newSubId']
+            def subscriptionDetailsAsJson = objectUnderTest.getSubscriptionDetailsAsJson(newListOfSubscriptionIds, 'dataType1', 'altId1')
         when: 'the method to add/update cm notification subscription is called'
-            objectUnderTest.addCmSubscription(datastoreType, 'ch-1', '/x/y', 'newSubId')
-        then: 'data service method to create new subscription for given subscriber is called once with the correct parameters'
-            1 * mockCpsDataService.saveListElements(
-                'NCMP-Admin',
-                'cm-data-subscriptions',
-                parentNodeXpath.formatted(datastoreName, 'ch-1'),
-                objectUnderTest.getSubscriptionDetailsAsJson('/x/y', ['newSubId']), _, ContentType.JSON)
-        where:
-            scenario                  | datastoreType           || datastoreName
-            'passthrough_running'     | PASSTHROUGH_RUNNING     || 'ncmp-datastore:passthrough-running'
-            'passthrough_operational' | PASSTHROUGH_OPERATIONAL || 'ncmp-datastore:passthrough-operational'
+            objectUnderTest.addSubscription('dataType1', 'altId1', 'newSubId')
+        then: 'data service method to update list of subscribers is called once'
+            1 * mockCpsDataService.updateNodeLeaves('NCMP-Admin', 'cm-data-job-subscriptions', CM_DATA_JOB_SUBSCRIPTIONS_PARENT_NODE_XPATH, subscriptionDetailsAsJson, _, ContentType.JSON)
     }
 
-    def 'Remove subscriber from a list of an ongoing cm notification subscription'() {
+    def 'Remove subscription (other subscriptions remain for same data type and target).'() {
         given: 'a subscription exists when queried'
-            def cpsPathQuery = CPS_PATH_QUERY_FOR_CM_SUBSCRIPTION_WITH_DATASTORE_CMHANDLE_AND_XPATH.formatted('ncmp-datastore:passthrough-running', 'ch-1', '/x/y')
-            mockCpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-subscriptions',
-                cpsPathQuery, OMIT_DESCENDANTS) >> [new DataNode(xpath: cpsPathQuery, leaves: ['xpath': '/x/y', 'subscriptionIds': ['sub-1', 'sub-2']])]
+            def query = CPS_PATH_TEMPLATE_FOR_SUBSCRIPTION_WITH_ALTERNATE_ID_AND_DATATYPE.formatted('altId1', 'dataType1')
+            mockCpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-job-subscriptions', query, OMIT_DESCENDANTS)
+                >> [new DataNode(leaves: ['dataJobId': ['existingId','subIdToRemove'], 'dataTypeId': 'dataType1', 'alternateId': 'altId1'])]
+        and: 'updated cm data job subscription details as json'
+            def subscriptionDetailsAsJson = objectUnderTest.getSubscriptionDetailsAsJson(['existingId'], 'dataType1', 'altId1')
         when: 'the subscriber is removed'
-            objectUnderTest.removeCmSubscription(PASSTHROUGH_RUNNING, 'ch-1', '/x/y', 'sub-1')
+            objectUnderTest.removeSubscription('dataType1', 'altId1','subIdToRemove')
         then: 'the list of subscribers is updated'
-            1 * mockCpsDataService.updateNodeLeaves('NCMP-Admin', 'cm-data-subscriptions',
-                '/datastores/datastore[@name=\'ncmp-datastore:passthrough-running\']/cm-handles/cm-handle[@id=\'ch-1\']/filters',
-                objectUnderTest.getSubscriptionDetailsAsJson('/x/y', ['sub-2']), _, ContentType.JSON)
+            1 * mockCpsDataService.updateNodeLeaves('NCMP-Admin', 'cm-data-job-subscriptions', CM_DATA_JOB_SUBSCRIPTIONS_PARENT_NODE_XPATH, subscriptionDetailsAsJson, _, ContentType.JSON)
     }
 
-    def 'Removing last ongoing subscription for datastore and cmhandle and xpath'() {
+    def 'Remove last subscription (no subscriptions remain for same data type and target).'() {
         given: 'a subscription exists when queried but has only 1 subscriber'
-            mockCpsQueryService.queryDataNodes(
-                'NCMP-Admin', 'cm-data-subscriptions',
-                CPS_PATH_QUERY_FOR_CM_SUBSCRIPTION_WITH_DATASTORE_CMHANDLE_AND_XPATH.formatted('ncmp-datastore:passthrough-running', 'ch-1', '/x/y'),
-                OMIT_DESCENDANTS) >> [new DataNode(leaves: ['xpath': '/x/y', 'subscriptionIds': ['sub-1']])]
-        and: 'the #scenario'
-            mockCpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-subscriptions',
-                CPS_PATH_QUERY_FOR_CM_SUBSCRIPTION_FILTERS_WITH_DATASTORE_AND_CMHANDLE.formatted('ncmp-datastore:passthrough-running', 'ch-1'),
-                DIRECT_CHILDREN_ONLY) >> [new DataNode(childDataNodes: listOfChildNodes)]
+            def query = CPS_PATH_TEMPLATE_FOR_SUBSCRIPTION_WITH_ALTERNATE_ID_AND_DATATYPE.formatted('last-alt-id', 'last-data-type')
+            mockCpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-job-subscriptions', query, OMIT_DESCENDANTS)
+                >> [new DataNode(leaves: ['dataJobId': ['subIdToRemove'], 'dataTypeId': 'last-data-type', 'alternateId': 'last-alt-id'])]
+        and: 'a cps path with alternate id and data type for deleting a node'
+            def cpsPath = CPS_PATH_TEMPLATE_FOR_SUBSCRIPTION_WITH_ALTERNATE_ID_AND_DATATYPE.formatted('last-alt-id', 'last-data-type')
         when: 'that last ongoing subscription is removed'
-            objectUnderTest.removeCmSubscription(PASSTHROUGH_RUNNING, 'ch-1', '/x/y', 'sub-1')
-        then: 'the subscription with empty subscriber list is removed'
-            1 * mockCpsDataService.deleteDataNode('NCMP-Admin', 'cm-data-subscriptions',
-                '/datastores/datastore[@name=\'ncmp-datastore:passthrough-running\']/cm-handles/cm-handle[@id=\'ch-1\']/filters/filter[@xpath=\'/x/y\']',
-                _)
-        and: 'method call to delete the cm handle is called the correct number of times'
-            numberOfCallsToDeleteCmHandle * mockCpsDataService.deleteDataNode('NCMP-Admin', 'cm-data-subscriptions',
-                '/datastores/datastore[@name=\'ncmp-datastore:passthrough-running\']/cm-handles/cm-handle[@id=\'ch-1\']',
-                _)
-        where:
-            scenario                                                          | listOfChildNodes || numberOfCallsToDeleteCmHandle
-            'cm handle in same datastore is used for other subscriptions'     | [new DataNode()] || 0
-            'cm handle in same datastore is NOT used for other subscriptions' | []               || 1
+            objectUnderTest.removeSubscription('last-data-type', 'last-alt-id','subIdToRemove')
+        then: 'the data job subscription with empty subscribers list is removed'
+            1 * mockCpsDataService.deleteDataNode('NCMP-Admin', 'cm-data-job-subscriptions', cpsPath, _)
     }
 
-    def 'Get all nodes for subscription id'() {
-        given: 'the query service returns nodes for subscription id'
-            def expectedDataNode = new DataNode(xpath: '/some/xpath')
-            def queryServiceResponse = [expectedDataNode].asCollection()
-            1 * mockCpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-subscriptions', '//filter/subscriptionIds[text()=\'some-id\']', OMIT_DESCENDANTS) >> queryServiceResponse
-        when: 'retrieving all nodes for subscription id'
-            def result = objectUnderTest.getAllNodesForSubscriptionId('some-id')
-        then: 'the result returns correct number of datanodes'
-            assert result.size() == 1
-        and: 'the attribute of the datanode is as expected'
-            assert result.iterator().next().xpath == expectedDataNode.xpath
+    def 'Attempt to remove non existing subscription (id).'() {
+        given: 'a subscription exists when queried with other subscriber'
+            def query = CPS_PATH_TEMPLATE_FOR_SUBSCRIPTION_WITH_ALTERNATE_ID_AND_DATATYPE.formatted('some-alt-id', 'some-data-type')
+            mockCpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-job-subscriptions', query, OMIT_DESCENDANTS) >> [new DataNode(leaves: ['dataJobId': ['otherDataJobId']])]
+        when: 'the remove subscription method is with a non existing id'
+            objectUnderTest.removeSubscription('some-data-type', 'some-alt-id','nonExistingSubId')
+        then: 'no calls to cps data service is made'
+            0 * mockCpsDataService.deleteDataNode(*_)
+        and: 'removal of non existent subscription id silently ignored with no exception thrown'
+            noExceptionThrown()
     }
 
 }