- existing methods of CmSubscriptionPersistenceService to fit the new cm subscription model
- unit tests updated, not used tests removed
- DmiCacheHandler, DmiCacheHandlerSpec, CmSubscriptionComparator, CmSubscriptionComparatorSpec was only modified to pass tests due to changes on persistence service methods - these classes will be changed as part of CPS-2166 development
Issue-ID: CPS-2919
Change-Id: Ic0e470a9993b8a9d7414e301a199564490d0a044
Signed-off-by: emaclee <lee.anjella.macabuhay@est.tech>
/*
* ============LICENSE_START=======================================================
- * Copyright (C) 2024 Nordix Foundation
+ * Copyright (C) 2024-2025 OpenInfra Foundation Europe. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.CmSubscriptionStatus;
import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.DmiCmSubscriptionDetails;
import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.DmiCmSubscriptionPredicate;
-import org.onap.cps.ncmp.impl.cmnotificationsubscription.utils.CmSubscriptionPersistenceService;
+import org.onap.cps.ncmp.impl.cmnotificationsubscription.utils.CmDataJobSubscriptionPersistenceService;
import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.client_to_ncmp.Predicate;
import org.onap.cps.ncmp.impl.inventory.InventoryPersistence;
import org.onap.cps.ncmp.impl.inventory.models.YangModelCmHandle;
@RequiredArgsConstructor
public class DmiCacheHandler {
- private final CmSubscriptionPersistenceService cmSubscriptionPersistenceService;
+ private final CmDataJobSubscriptionPersistenceService cmDataJobSubscriptionPersistenceService;
private final Map<String, Map<String, DmiCmSubscriptionDetails>> cmNotificationSubscriptionCache;
private final InventoryPersistence inventoryPersistence;
for (final DmiCmSubscriptionPredicate dmiCmSubscriptionPredicate : dmiCmSubscriptionPredicates) {
final DatastoreType datastoreType = dmiCmSubscriptionPredicate.getDatastoreType();
final Set<String> cmHandles = dmiCmSubscriptionPredicate.getTargetCmHandleIds();
- final Set<String> xpaths = dmiCmSubscriptionPredicate.getXpaths();
-
for (final String cmHandle: cmHandles) {
- for (final String xpath: xpaths) {
- cmSubscriptionPersistenceService.addCmSubscription(datastoreType, cmHandle,
- xpath, subscriptionId);
- }
+ cmDataJobSubscriptionPersistenceService.addSubscription(datastoreType.getDatastoreName(),
+ cmHandle, subscriptionId);
}
}
}
for (final DmiCmSubscriptionPredicate dmiCmSubscriptionPredicate : dmiCmSubscriptionPredicates) {
final DatastoreType datastoreType = dmiCmSubscriptionPredicate.getDatastoreType();
final Set<String> cmHandles = dmiCmSubscriptionPredicate.getTargetCmHandleIds();
- final Set<String> xpaths = dmiCmSubscriptionPredicate.getXpaths();
-
for (final String cmHandle: cmHandles) {
- for (final String xpath: xpaths) {
- cmSubscriptionPersistenceService.removeCmSubscription(datastoreType,
- cmHandle, xpath, subscriptionId);
- }
+ cmDataJobSubscriptionPersistenceService.removeSubscription(datastoreType.getDatastoreName(),
+ cmHandle, subscriptionId);
}
}
}
/*
* ============LICENSE_START=======================================================
- * Copyright (C) 2024 Nordix Foundation
+ * Copyright (C) 2024-2025 OpenInfra Foundation Europe. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
import lombok.RequiredArgsConstructor;
import org.onap.cps.ncmp.api.data.models.DatastoreType;
import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.DmiCmSubscriptionPredicate;
-import org.onap.cps.ncmp.impl.cmnotificationsubscription.utils.CmSubscriptionPersistenceService;
+import org.onap.cps.ncmp.impl.cmnotificationsubscription.utils.CmDataJobSubscriptionPersistenceService;
import org.springframework.stereotype.Component;
@Component
@RequiredArgsConstructor
public class CmSubscriptionComparator {
- private final CmSubscriptionPersistenceService cmSubscriptionPersistenceService;
+ private final CmDataJobSubscriptionPersistenceService cmDataJobSubscriptionPersistenceService;
/**
* Get the new Dmi Predicates for a given predicates list.
final Set<String> xpaths = new HashSet<>();
final DatastoreType datastoreType = dmiCmSubscriptionPredicate.getDatastoreType();
for (final String cmHandleId : dmiCmSubscriptionPredicate.getTargetCmHandleIds()) {
- for (final String xpath : dmiCmSubscriptionPredicate.getXpaths()) {
- if (!cmSubscriptionPersistenceService.isOngoingCmSubscription(datastoreType,
- cmHandleId, xpath)) {
- targetCmHandleIds.add(cmHandleId);
- xpaths.add(xpath);
-
- }
+ if (!cmDataJobSubscriptionPersistenceService.hasAtLeastOneSubscription(
+ datastoreType.getDatastoreName(), cmHandleId)) {
+ targetCmHandleIds.add(cmHandleId);
}
}
populateValidDmiSubscriptionPredicates(targetCmHandleIds, xpaths, datastoreType,
import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.DmiCmSubscriptionKey;
import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.DmiCmSubscriptionPredicate;
import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.DmiCmSubscriptionTuple;
-import org.onap.cps.ncmp.impl.cmnotificationsubscription.utils.CmSubscriptionPersistenceService;
+import org.onap.cps.ncmp.impl.cmnotificationsubscription.utils.CmDataJobSubscriptionPersistenceService;
import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.client_to_ncmp.Predicate;
import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.ncmp_to_client.NcmpOutEvent;
import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.ncmp_to_dmi.DmiInEvent;
"^/datastores/datastore\\[@name='([^']*)']/cm-handles/cm-handle\\[@id='([^']*)']/"
+ "filters/filter\\[@xpath='(.*)']$");
- private final CmSubscriptionPersistenceService cmSubscriptionPersistenceService;
+ private final CmDataJobSubscriptionPersistenceService cmDataJobSubscriptionPersistenceService;
private final CmSubscriptionComparator cmSubscriptionComparator;
private final NcmpOutEventMapper ncmpOutEventMapper;
private final DmiInEventMapper dmiInEventMapper;
@Override
public void processSubscriptionCreateRequest(final String subscriptionId, final List<Predicate> predicates) {
- if (cmSubscriptionPersistenceService.isUniqueSubscriptionId(subscriptionId)) {
+ if (cmDataJobSubscriptionPersistenceService.isNewSubscriptionId(subscriptionId)) {
dmiCacheHandler.add(subscriptionId, predicates);
handleNewCmSubscription(subscriptionId);
scheduleNcmpOutEventResponse(subscriptionId, "subscriptionCreateResponse");
@Override
public void processSubscriptionDeleteRequest(final String subscriptionId) {
final Collection<DataNode> subscriptionDataNodes =
- cmSubscriptionPersistenceService.getAllNodesForSubscriptionId(subscriptionId);
+ cmDataJobSubscriptionPersistenceService.getAffectedDataNodes(subscriptionId);
final DmiCmSubscriptionTuple dmiCmSubscriptionTuple =
getLastRemainingAndOverlappingSubscriptionsPerDmi(subscriptionDataNodes);
dmiCacheHandler.add(subscriptionId, mergeDmiCmSubscriptionDetailsPerDmiMaps(dmiCmSubscriptionTuple));
--- /dev/null
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2024-2025 OpenInfra Foundation Europe. All rights reserved.
+ * Modifications Copyright (C) 2024 TechMahindra Ltd.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.impl.cmnotificationsubscription.utils;
+
+import static org.onap.cps.api.parameters.FetchDescendantsOption.OMIT_DESCENDANTS;
+
+import java.io.Serializable;
+import java.time.OffsetDateTime;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.onap.cps.api.CpsDataService;
+import org.onap.cps.api.CpsQueryService;
+import org.onap.cps.api.model.DataNode;
+import org.onap.cps.utils.ContentType;
+import org.onap.cps.utils.JsonObjectMapper;
+import org.springframework.stereotype.Service;
+
+@Slf4j
+@Service
+@RequiredArgsConstructor
+public class CmDataJobSubscriptionPersistenceService {
+
+ private static final String NCMP_DATASPACE_NAME = "NCMP-Admin";
+ private static final String CM_DATA_JOB_SUBSCRIPTIONS_ANCHOR_NAME = "cm-data-job-subscriptions";
+ private static final String CM_DATA_JOB_SUBSCRIPTIONS_PARENT_NODE_XPATH = "/dataJob";
+ private static final String CPS_PATH_TEMPLATE_FOR_SUBSCRIPTION_WITH_ALTERNATE_ID_AND_DATATYPE =
+ "/dataJob/subscription[@alternateId='%s' and @dataTypeId='%s']";
+ private static final String CPS_PATH_TEMPLATE_FOR_SUBSCRIPTION_WITH_DATA_JOB_ID =
+ "//subscription/dataJobId[text()='%s']";
+
+ private final JsonObjectMapper jsonObjectMapper;
+ private final CpsQueryService cpsQueryService;
+ private final CpsDataService cpsDataService;
+
+ /**
+ * Check if we have a cm data job subscription for the given data type and target (FDN).
+ *
+ * @param dataType the data type of the data job subscription
+ * @param alternateId the alternate id target of the data job subscription
+ * @return true if the subscription details has at least one subscriber , otherwise false
+ */
+ public boolean hasAtLeastOneSubscription(final String dataType, final String alternateId) {
+ return !getSubscriptionIds(dataType, alternateId).isEmpty();
+ }
+
+ /**
+ * Check if the input is a new subscription ID against ongoing subscriptions.
+ *
+ * @param subscriptionId subscription ID
+ * @return true if subscriptionId is not used in active subscriptions, otherwise false
+ */
+ public boolean isNewSubscriptionId(final String subscriptionId) {
+ final String query = CPS_PATH_TEMPLATE_FOR_SUBSCRIPTION_WITH_DATA_JOB_ID.formatted(subscriptionId);
+ return cpsQueryService.queryDataNodes(NCMP_DATASPACE_NAME, CM_DATA_JOB_SUBSCRIPTIONS_ANCHOR_NAME,
+ query, OMIT_DESCENDANTS).isEmpty();
+ }
+
+ /**
+ * Get the ids for the subscriptions for the given data type and targets.
+ *
+ * @param dataType the data type of the data job subscription
+ * @param alternateId the alternate id target of the data job subscription
+ * @return collection of subscription ids of ongoing cm notification subscription
+ */
+ public Collection<String> getSubscriptionIds(final String dataType, final String alternateId) {
+ final String query = CPS_PATH_TEMPLATE_FOR_SUBSCRIPTION_WITH_ALTERNATE_ID_AND_DATATYPE.formatted(
+ alternateId, dataType);
+ final Collection<DataNode> existingNodes =
+ cpsQueryService.queryDataNodes(NCMP_DATASPACE_NAME, CM_DATA_JOB_SUBSCRIPTIONS_ANCHOR_NAME,
+ query, OMIT_DESCENDANTS);
+ if (existingNodes.isEmpty()) {
+ return Collections.emptyList();
+ }
+ return (List<String>) existingNodes.iterator().next().getLeaves().get("dataJobId");
+ }
+
+ /**
+ * Add cm notification data job subscription.
+ *
+ * @param dataType the data type of the data job subscription
+ * @param alternateId the alternate id target of the data job subscription
+ * @param subscriptionId data job subscription id to be added
+ */
+ public void addSubscription(final String dataType, final String alternateId, final String subscriptionId) {
+ final Collection<String> subscriptionIds =
+ getSubscriptionIds(dataType, alternateId);
+ if (subscriptionIds.isEmpty()) {
+ addNewSubscriptionDetails(dataType, alternateId, subscriptionId);
+ } else {
+ subscriptionIds.add(subscriptionId);
+ updateSubscriptionDetails(subscriptionIds, dataType, alternateId);
+ }
+ }
+
+ /**
+ * Remove cm notification data job Subscription.
+ *
+ * @param dataType the data type of the data job subscription
+ * @param alternateId the alternate id target of the data job subscription
+ * @param subscriptionId data subscription id to remove
+ */
+ public void removeSubscription(final String dataType, final String alternateId, final String subscriptionId) {
+ final Collection<String> subscriptionIds = getSubscriptionIds(dataType, alternateId);
+ if (subscriptionIds.remove(subscriptionId)) {
+ updateSubscriptionDetails(subscriptionIds, dataType, alternateId);
+ log.info("There is at least one subscriber left for dataType {} on {}", dataType, alternateId);
+ if (subscriptionIds.isEmpty()) {
+ log.info("There are no subscribers left for dataType {} on {}", dataType, alternateId);
+ deleteUnusedSubscriptionDetails(dataType, alternateId);
+ }
+ }
+ }
+
+ /**
+ * Retrieve all existing data nodes for given data job subscription id.
+ *
+ * @param subscriptionId data job subscription id
+ * @return collection of DataNodes
+ */
+ public Collection<DataNode> getAffectedDataNodes(final String subscriptionId) {
+ final String query = CPS_PATH_TEMPLATE_FOR_SUBSCRIPTION_WITH_DATA_JOB_ID.formatted(subscriptionId);
+ return cpsQueryService.queryDataNodes(NCMP_DATASPACE_NAME, CM_DATA_JOB_SUBSCRIPTIONS_ANCHOR_NAME,
+ query, OMIT_DESCENDANTS);
+ }
+
+ private void deleteUnusedSubscriptionDetails(final String dataType, final String alternateId) {
+ final String deleteListOfSubscriptionCpsPathQuery =
+ CPS_PATH_TEMPLATE_FOR_SUBSCRIPTION_WITH_ALTERNATE_ID_AND_DATATYPE.formatted(alternateId,
+ dataType);
+ cpsDataService.deleteDataNode(NCMP_DATASPACE_NAME, CM_DATA_JOB_SUBSCRIPTIONS_ANCHOR_NAME,
+ deleteListOfSubscriptionCpsPathQuery, OffsetDateTime.now());
+ }
+
+ private void addNewSubscriptionDetails(final String dataType,
+ final String alternateId,
+ final String subscriptionId) {
+ final Collection<String> newSubscriptionList = Collections.singletonList(subscriptionId);
+ final String subscriptionDetailsAsJson = getSubscriptionDetailsAsJson(newSubscriptionList, dataType,
+ alternateId);
+ cpsDataService.saveData(NCMP_DATASPACE_NAME, CM_DATA_JOB_SUBSCRIPTIONS_ANCHOR_NAME, subscriptionDetailsAsJson,
+ OffsetDateTime.now(), ContentType.JSON);
+ }
+
+ private void updateSubscriptionDetails(final Collection<String> subscriptionIds, final String dataType,
+ final String alternateId) {
+ final String subscriptionDetailsAsJson = getSubscriptionDetailsAsJson(subscriptionIds, dataType, alternateId);
+ cpsDataService.updateNodeLeaves(NCMP_DATASPACE_NAME, CM_DATA_JOB_SUBSCRIPTIONS_ANCHOR_NAME,
+ CM_DATA_JOB_SUBSCRIPTIONS_PARENT_NODE_XPATH, subscriptionDetailsAsJson, OffsetDateTime.now(),
+ ContentType.JSON);
+ }
+
+ private String getSubscriptionDetailsAsJson(final Collection<String> subscriptionIds,
+ final String dataTypeId,
+ final String alternateId) {
+ final Map<String, Serializable> subscriptionDetailsAsMap =
+ Map.of("dataTypeId", dataTypeId,
+ "alternateId", alternateId,
+ "dataJobId", (Serializable) subscriptionIds);
+ return "{\"subscription\":[" + jsonObjectMapper.asJsonString(subscriptionDetailsAsMap) + "]}";
+ }
+
+}
+
+++ /dev/null
-/*
- * ============LICENSE_START=======================================================
- * Copyright (C) 2024 Nordix Foundation
- * Modifications Copyright (C) 2024 TechMahindra Ltd.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * SPDX-License-Identifier: Apache-2.0
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.cps.ncmp.impl.cmnotificationsubscription.utils;
-
-import static org.onap.cps.api.parameters.FetchDescendantsOption.DIRECT_CHILDREN_ONLY;
-import static org.onap.cps.api.parameters.FetchDescendantsOption.OMIT_DESCENDANTS;
-
-import java.io.Serializable;
-import java.time.OffsetDateTime;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.onap.cps.api.CpsDataService;
-import org.onap.cps.api.CpsQueryService;
-import org.onap.cps.api.model.DataNode;
-import org.onap.cps.ncmp.api.data.models.DatastoreType;
-import org.onap.cps.utils.ContentType;
-import org.onap.cps.utils.JsonObjectMapper;
-import org.springframework.stereotype.Service;
-
-@Slf4j
-@Service
-@RequiredArgsConstructor
-public class CmSubscriptionPersistenceService {
-
- private static final String NCMP_DATASPACE_NAME = "NCMP-Admin";
- private static final String CM_SUBSCRIPTIONS_ANCHOR_NAME = "cm-data-subscriptions";
-
- private static final String SUBSCRIPTION_ANCHOR_NAME = "cm-data-subscriptions";
- private static final String CPS_PATH_QUERY_FOR_CM_SUBSCRIPTION_WITH_DATASTORE_AND_CMHANDLE = """
- /datastores/datastore[@name='%s']/cm-handles/cm-handle[@id='%s']
- """.trim();
- private static final String CPS_PATH_QUERY_FOR_CM_SUBSCRIPTION_FILTERS_WITH_DATASTORE_AND_CMHANDLE =
- CPS_PATH_QUERY_FOR_CM_SUBSCRIPTION_WITH_DATASTORE_AND_CMHANDLE + "/filters";
-
- private static final String CPS_PATH_QUERY_FOR_CM_SUBSCRIPTION_WITH_DATASTORE_CMHANDLE_AND_XPATH =
- CPS_PATH_QUERY_FOR_CM_SUBSCRIPTION_FILTERS_WITH_DATASTORE_AND_CMHANDLE + "/filter[@xpath='%s']";
-
-
- private static final String CPS_PATH_QUERY_FOR_CM_SUBSCRIPTION_WITH_ID = """
- //filter/subscriptionIds[text()='%s']
- """.trim();
-
- private final JsonObjectMapper jsonObjectMapper;
- private final CpsQueryService cpsQueryService;
- private final CpsDataService cpsDataService;
-
- /**
- * Check if we have an ongoing cm subscription based on the parameters.
- *
- * @param datastoreType the susbcription target datastore type
- * @param cmHandleId the id of the cm handle for the susbcription
- * @param xpath the target xpath
- * @return true for ongoing cmsubscription , otherwise false
- */
- public boolean isOngoingCmSubscription(final DatastoreType datastoreType, final String cmHandleId,
- final String xpath) {
- return !getOngoingCmSubscriptionIds(datastoreType, cmHandleId, xpath).isEmpty();
- }
-
- /**
- * Check if the subscription ID is unique against ongoing subscriptions.
- *
- * @param subscriptionId subscription ID
- * @return true if subscriptionId is not used in active subscriptions, otherwise false
- */
- public boolean isUniqueSubscriptionId(final String subscriptionId) {
- return cpsQueryService.queryDataNodes(NCMP_DATASPACE_NAME, SUBSCRIPTION_ANCHOR_NAME,
- CPS_PATH_QUERY_FOR_CM_SUBSCRIPTION_WITH_ID.formatted(subscriptionId), OMIT_DESCENDANTS).isEmpty();
- }
-
- /**
- * Get all ongoing cm notification subscription based on the parameters.
- *
- * @param datastoreType the susbcription target datastore type
- * @param cmHandleId the id of the cm handle for the susbcription
- * @param xpath the target xpath
- * @return collection of subscription ids of ongoing cm notification subscription
- */
- public Collection<String> getOngoingCmSubscriptionIds(final DatastoreType datastoreType,
- final String cmHandleId, final String xpath) {
-
- final String isOngoingCmSubscriptionCpsPathQuery =
- CPS_PATH_QUERY_FOR_CM_SUBSCRIPTION_WITH_DATASTORE_CMHANDLE_AND_XPATH.formatted(
- datastoreType.getDatastoreName(), cmHandleId, escapeQuotesByDoublingThem(xpath));
- final Collection<DataNode> existingNodes =
- cpsQueryService.queryDataNodes(NCMP_DATASPACE_NAME, CM_SUBSCRIPTIONS_ANCHOR_NAME,
- isOngoingCmSubscriptionCpsPathQuery, OMIT_DESCENDANTS);
- if (existingNodes.isEmpty()) {
- return Collections.emptyList();
- }
- return (List<String>) existingNodes.iterator().next().getLeaves().get("subscriptionIds");
- }
-
- /**
- * Add cm notification subscription.
- *
- * @param datastoreType the susbcription target datastore type
- * @param cmHandleId the id of the cm handle for the susbcription
- * @param xpath the target xpath
- * @param newSubscriptionId subscription id to be added
- */
- public void addCmSubscription(final DatastoreType datastoreType, final String cmHandleId,
- final String xpath, final String newSubscriptionId) {
- final Collection<String> subscriptionIds =
- getOngoingCmSubscriptionIds(datastoreType, cmHandleId, xpath);
- if (subscriptionIds.isEmpty()) {
- addFirstSubscriptionForDatastoreCmHandleAndXpath(datastoreType, cmHandleId, xpath, newSubscriptionId);
- } else if (!subscriptionIds.contains(newSubscriptionId)) {
- subscriptionIds.add(newSubscriptionId);
- saveSubscriptionDetails(datastoreType, cmHandleId, xpath, subscriptionIds);
- }
- }
-
- /**
- * Remove cm notification Subscription.
- *
- * @param datastoreType the susbcription target datastore type
- * @param cmHandleId the id of the cm handle for the susbcription
- * @param xpath the target xpath
- * @param subscriptionId subscription id to remove
- */
- public void removeCmSubscription(final DatastoreType datastoreType, final String cmHandleId,
- final String xpath, final String subscriptionId) {
- final Collection<String> subscriptionIds =
- getOngoingCmSubscriptionIds(datastoreType, cmHandleId, xpath);
- if (subscriptionIds.remove(subscriptionId)) {
- saveSubscriptionDetails(datastoreType, cmHandleId, xpath, subscriptionIds);
- log.info("There are subscribers left for the following cps path {} :",
- CPS_PATH_QUERY_FOR_CM_SUBSCRIPTION_WITH_DATASTORE_CMHANDLE_AND_XPATH.formatted(
- datastoreType.getDatastoreName(), cmHandleId, escapeQuotesByDoublingThem(xpath)));
- if (subscriptionIds.isEmpty()) {
- log.info("No subscribers left for the following cps path {} :",
- CPS_PATH_QUERY_FOR_CM_SUBSCRIPTION_WITH_DATASTORE_CMHANDLE_AND_XPATH.formatted(
- datastoreType.getDatastoreName(), cmHandleId, escapeQuotesByDoublingThem(xpath)));
- deleteListOfSubscriptionsFor(datastoreType, cmHandleId, xpath);
- }
- }
- }
-
- /**
- * Retrieve all existing dataNodes for given subscription id.
- *
- * @param subscriptionId subscription id
- * @return collection of DataNodes
- */
- public Collection<DataNode> getAllNodesForSubscriptionId(final String subscriptionId) {
- return cpsQueryService.queryDataNodes(NCMP_DATASPACE_NAME, SUBSCRIPTION_ANCHOR_NAME,
- CPS_PATH_QUERY_FOR_CM_SUBSCRIPTION_WITH_ID.formatted(subscriptionId),
- OMIT_DESCENDANTS);
- }
-
- private void deleteListOfSubscriptionsFor(final DatastoreType datastoreType, final String cmHandleId,
- final String xpath) {
- cpsDataService.deleteDataNode(NCMP_DATASPACE_NAME, SUBSCRIPTION_ANCHOR_NAME,
- CPS_PATH_QUERY_FOR_CM_SUBSCRIPTION_WITH_DATASTORE_CMHANDLE_AND_XPATH.formatted(
- datastoreType.getDatastoreName(), cmHandleId, escapeQuotesByDoublingThem(xpath)),
- OffsetDateTime.now());
- final Collection<DataNode> existingFiltersForCmHandle =
- cpsQueryService.queryDataNodes(NCMP_DATASPACE_NAME, CM_SUBSCRIPTIONS_ANCHOR_NAME,
- CPS_PATH_QUERY_FOR_CM_SUBSCRIPTION_FILTERS_WITH_DATASTORE_AND_CMHANDLE.formatted(
- datastoreType.getDatastoreName(), cmHandleId),
- DIRECT_CHILDREN_ONLY).iterator().next()
- .getChildDataNodes();
- if (existingFiltersForCmHandle.isEmpty()) {
- removeCmHandleFromDatastore(datastoreType.getDatastoreName(), cmHandleId);
- }
- }
-
- private void removeCmHandleFromDatastore(final String datastoreName, final String cmHandleId) {
- cpsDataService.deleteDataNode(NCMP_DATASPACE_NAME, SUBSCRIPTION_ANCHOR_NAME,
- CPS_PATH_QUERY_FOR_CM_SUBSCRIPTION_WITH_DATASTORE_AND_CMHANDLE.formatted(datastoreName, cmHandleId),
- OffsetDateTime.now());
- }
-
- private boolean isFirstSubscriptionForCmHandle(final DatastoreType datastoreType, final String cmHandleId) {
- return cpsQueryService.queryDataNodes(NCMP_DATASPACE_NAME, CM_SUBSCRIPTIONS_ANCHOR_NAME,
- CPS_PATH_QUERY_FOR_CM_SUBSCRIPTION_FILTERS_WITH_DATASTORE_AND_CMHANDLE.formatted(
- datastoreType.getDatastoreName(), cmHandleId), OMIT_DESCENDANTS).isEmpty();
- }
-
- private void addFirstSubscriptionForDatastoreCmHandleAndXpath(final DatastoreType datastoreType,
- final String cmHandleId, final String xpath, final String subscriptionId) {
- final Collection<String> newSubscriptionList = Collections.singletonList(subscriptionId);
- final String subscriptionDetailsAsJson = getSubscriptionDetailsAsJson(xpath, newSubscriptionList);
- if (isFirstSubscriptionForCmHandle(datastoreType, cmHandleId)) {
- final String parentXpath =
- "/datastores/datastore[@name='%s']/cm-handles".formatted(datastoreType.getDatastoreName());
- final String subscriptionAsJson =
- String.format("{\"cm-handle\":[{\"id\":\"%s\",\"filters\":%s}]}", cmHandleId,
- subscriptionDetailsAsJson);
- cpsDataService.saveData(NCMP_DATASPACE_NAME, SUBSCRIPTION_ANCHOR_NAME, parentXpath, subscriptionAsJson,
- OffsetDateTime.now(), ContentType.JSON);
- } else {
- cpsDataService.saveListElements(NCMP_DATASPACE_NAME, CM_SUBSCRIPTIONS_ANCHOR_NAME,
- CPS_PATH_QUERY_FOR_CM_SUBSCRIPTION_FILTERS_WITH_DATASTORE_AND_CMHANDLE.formatted(
- datastoreType.getDatastoreName(), cmHandleId), subscriptionDetailsAsJson,
- OffsetDateTime.now(), ContentType.JSON);
- }
- }
-
- private void saveSubscriptionDetails(final DatastoreType datastoreType, final String cmHandleId, final String xpath,
- final Collection<String> subscriptionIds) {
- final String subscriptionDetailsAsJson = getSubscriptionDetailsAsJson(xpath, subscriptionIds);
- cpsDataService.updateNodeLeaves(NCMP_DATASPACE_NAME, CM_SUBSCRIPTIONS_ANCHOR_NAME,
- CPS_PATH_QUERY_FOR_CM_SUBSCRIPTION_FILTERS_WITH_DATASTORE_AND_CMHANDLE.formatted(
- datastoreType.getDatastoreName(), cmHandleId), subscriptionDetailsAsJson, OffsetDateTime.now(),
- ContentType.JSON);
- }
-
- private String getSubscriptionDetailsAsJson(final String xpath, final Collection<String> subscriptionIds) {
- final Map<String, Serializable> subscriptionDetailsAsMap =
- Map.of("xpath", xpath, "subscriptionIds", (Serializable) subscriptionIds);
- return "{\"filter\":[" + jsonObjectMapper.asJsonString(subscriptionDetailsAsMap) + "]}";
- }
-
- private static String escapeQuotesByDoublingThem(final String inputXpath) {
- return inputXpath.replace("'", "''");
- }
-
-}
-
/*
* ============LICENSE_START=======================================================
- * Copyright (C) 2024 Nordix Foundation
+ * Copyright (C) 2024-2025 OpenInfra Foundation Europe. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the 'License');
* you may not use this file except in compliance with the License.
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.CmSubscriptionStatus
import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.DmiCmSubscriptionDetails
-import org.onap.cps.ncmp.impl.cmnotificationsubscription.utils.CmSubscriptionPersistenceService
+import org.onap.cps.ncmp.impl.cmnotificationsubscription.utils.CmDataJobSubscriptionPersistenceService
import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.client_to_ncmp.NcmpInEvent
import org.onap.cps.ncmp.impl.inventory.InventoryPersistence
import org.onap.cps.ncmp.impl.inventory.models.YangModelCmHandle
@SpringBean
InventoryPersistence mockInventoryPersistence = Mock(InventoryPersistence)
@SpringBean
- CmSubscriptionPersistenceService mockCmSubscriptionPersistenceService = Mock(CmSubscriptionPersistenceService)
+ CmDataJobSubscriptionPersistenceService mockCmSubscriptionPersistenceService = Mock(CmDataJobSubscriptionPersistenceService)
def testCache = [:]
def objectUnderTest = new DmiCacheHandler(mockCmSubscriptionPersistenceService, testCache, mockInventoryPersistence)
when: 'subscription is persisted in database'
objectUnderTest.persistIntoDatabasePerDmi(subscriptionId,'dmi-1')
then: 'persistence service is called the correct number of times per dmi'
- 4 * mockCmSubscriptionPersistenceService.addCmSubscription(_,_,_,subscriptionId)
+ 2 * mockCmSubscriptionPersistenceService.addSubscription(*_)
}
def 'Remove subscription from database per dmi'() {
when: 'subscription is persisted in database'
objectUnderTest.removeFromDatabase(subscriptionId,'dmi-1')
then: 'persistence service is called the correct number of times per dmi'
- 4 * mockCmSubscriptionPersistenceService.removeCmSubscription(_,_,_,subscriptionId)
+ 2 * mockCmSubscriptionPersistenceService.removeSubscription(*_)
}
def setUpTestEvent(){
/*
* ============LICENSE_START=======================================================
- * Copyright (c) 2024 Nordix Foundation.
+ * Copyright (c) 2024-2025 OpenInfra Foundation Europe. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
*/
package org.onap.cps.ncmp.impl.cmnotificationsubscription.ncmp
-
import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.DmiCmSubscriptionPredicate
-import org.onap.cps.ncmp.impl.cmnotificationsubscription.utils.CmSubscriptionPersistenceService
+import org.onap.cps.ncmp.impl.cmnotificationsubscription.utils.CmDataJobSubscriptionPersistenceService
import spock.lang.Specification
import static org.onap.cps.ncmp.api.data.models.DatastoreType.PASSTHROUGH_OPERATIONAL
class CmSubscriptionComparatorSpec extends Specification {
- def mockCmSubscriptionPersistenceService = Mock(CmSubscriptionPersistenceService)
+ def mockCmSubscriptionPersistenceService = Mock(CmDataJobSubscriptionPersistenceService)
def objectUnderTest = new CmSubscriptionComparator(mockCmSubscriptionPersistenceService)
- def 'Find Delta of given list of predicates'() {
+ def 'Find the difference based on the provided predicates'() {
given: 'A list of predicates'
def predicates = [new DmiCmSubscriptionPredicate(['ch-1', 'ch-2'].toSet(), PASSTHROUGH_OPERATIONAL, ['a/1/', 'b/2'].toSet())]
- and: '3 positive responses and 1 negative.'
- mockCmSubscriptionPersistenceService.isOngoingCmSubscription(PASSTHROUGH_OPERATIONAL, 'ch-1', 'a/1/') >>> true
- mockCmSubscriptionPersistenceService.isOngoingCmSubscription(PASSTHROUGH_OPERATIONAL, 'ch-1', 'b/2') >>> true
- mockCmSubscriptionPersistenceService.isOngoingCmSubscription(PASSTHROUGH_OPERATIONAL, 'ch-2', 'a/1/') >>> true
- mockCmSubscriptionPersistenceService.isOngoingCmSubscription(PASSTHROUGH_OPERATIONAL, 'ch-2', 'b/2') >>> false
- when: 'getDelta is called'
+ and: '1 positive and 1 negative response.'
+ mockCmSubscriptionPersistenceService.hasAtLeastOneSubscription(PASSTHROUGH_OPERATIONAL.getDatastoreName(), 'ch-1') >> true
+ mockCmSubscriptionPersistenceService.hasAtLeastOneSubscription(PASSTHROUGH_OPERATIONAL.getDatastoreName(), 'ch-2') >> false
+ when: 'method to extract only NEW predicates for dmi is called'
def result = objectUnderTest.getNewDmiSubscriptionPredicates(predicates)
- then: 'verify correct delta is returned'
+ then: 'from 2 predicates only 1 remains'
assert result.size() == 1
assert result[0].targetCmHandleIds[0] == 'ch-2'
- assert result[0].xpaths[0] == 'b/2'
-
}
- def 'Find Delta of given list of predicates when it is an ongoing Cm Subscription'() {
+ def 'Find the difference based on the provided predicates when it is an ongoing Cm Subscription'() {
given: 'A list of predicates'
def predicates = [new DmiCmSubscriptionPredicate(['ch-1'].toSet(), PASSTHROUGH_OPERATIONAL, ['a/1/'].toSet())]
and: 'its already present'
- mockCmSubscriptionPersistenceService.isOngoingCmSubscription(PASSTHROUGH_OPERATIONAL, 'ch-1', 'a/1/') >>> true
- when: 'getDelta is called'
+ mockCmSubscriptionPersistenceService.hasAtLeastOneSubscription(PASSTHROUGH_OPERATIONAL.getDatastoreName(), 'ch-1') >> true
+ when: 'method to extract only NEW predicates for dmi is called'
def result = objectUnderTest.getNewDmiSubscriptionPredicates(predicates)
- then: 'verify correct delta is returned'
+ then: 'from 1 predicate, none remains'
assert result.size() == 0
}
import org.onap.cps.ncmp.impl.cmnotificationsubscription.dmi.DmiInEventProducer
import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.DmiCmSubscriptionDetails
import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.DmiCmSubscriptionPredicate
-import org.onap.cps.ncmp.impl.cmnotificationsubscription.utils.CmSubscriptionPersistenceService
+import org.onap.cps.ncmp.impl.cmnotificationsubscription.utils.CmDataJobSubscriptionPersistenceService
import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.client_to_ncmp.NcmpInEvent
import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.ncmp_to_client.NcmpOutEvent
import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.ncmp_to_dmi.DmiInEvent
class CmSubscriptionHandlerImplSpec extends Specification {
def jsonObjectMapper = new JsonObjectMapper(new ObjectMapper())
- def mockCmSubscriptionPersistenceService = Mock(CmSubscriptionPersistenceService)
+ def mockCmSubscriptionPersistenceService = Mock(CmDataJobSubscriptionPersistenceService)
def mockCmSubscriptionComparator = Mock(CmSubscriptionComparator)
def mockNcmpOutEventMapper = Mock(NcmpOutEventMapper)
def mockDmiInEventMapper = Mock(DmiInEventMapper)
def testDmiSubscriptionsPerDmi = ["dmi-1": new DmiCmSubscriptionDetails([], PENDING)]
def 'Consume valid and unique CmNotificationSubscriptionNcmpInEvent create message'() {
- given: 'a cmNotificationSubscriptionNcmp in event with unique subscription id'
+ given: 'a cmNotificationSubscriptionNcmp in event with new subscription id'
def jsonData = TestUtils.getResourceFileContent('cmSubscription/cmNotificationSubscriptionNcmpInEvent.json')
def testEventConsumed = jsonObjectMapper.convertJsonString(jsonData, NcmpInEvent.class)
def testListOfDeltaPredicates = [new DmiCmSubscriptionPredicate(['ch1'].toSet(), PASSTHROUGH_OPERATIONAL, ['/a/b'].toSet())]
- mockCmSubscriptionPersistenceService.isUniqueSubscriptionId("test-id") >> true
+ and: 'the persistence service confirms subscription id is new (not used for other subscription)'
+ mockCmSubscriptionPersistenceService.isNewSubscriptionId('test-id') >> true
and: 'relevant details is extracted from the event'
def subscriptionId = testEventConsumed.getData().getSubscriptionId()
def predicates = testEventConsumed.getData().getPredicates()
def jsonData = TestUtils.getResourceFileContent('cmSubscription/cmNotificationSubscriptionNcmpInEvent.json')
def testEventConsumed = jsonObjectMapper.convertJsonString(jsonData, NcmpInEvent.class)
def noDeltaPredicates = []
- mockCmSubscriptionPersistenceService.isUniqueSubscriptionId("test-id") >> true
+ and: 'the persistence service confirms subscription id is new (not used for other subscription)'
+ mockCmSubscriptionPersistenceService.isNewSubscriptionId('test-id') >> true
and: 'the cache handler returns for relevant subscription id'
1 * mockDmiCacheHandler.get('test-id') >> testDmiSubscriptionsPerDmi
and: 'the delta predicates is returned'
given: 'a cmNotificationSubscriptionNcmp in event'
def jsonData = TestUtils.getResourceFileContent('cmSubscription/cmNotificationSubscriptionNcmpInEvent.json')
def testEventConsumed = jsonObjectMapper.convertJsonString(jsonData, NcmpInEvent.class)
- mockCmSubscriptionPersistenceService.isUniqueSubscriptionId('test-id') >> false
+ and: 'the persistence service confirms subscription id is not new (already used subscription)'
+ mockCmSubscriptionPersistenceService.isNewSubscriptionId('test-id') >> false
and: 'relevant details is extracted from the event'
def subscriptionId = testEventConsumed.getData().getSubscriptionId()
def predicates = testEventConsumed.getData().getPredicates()
given: 'a test subscription id'
def subscriptionId = 'test-id'
and: 'the persistence service returns datanodes'
- 1 * mockCmSubscriptionPersistenceService.getAllNodesForSubscriptionId(subscriptionId) >>
+ 1 * mockCmSubscriptionPersistenceService.getAffectedDataNodes(subscriptionId) >>
[new DataNode(xpath: "/datastores/datastore[@name='ncmp-datastore:passthrough-running']/cm-handles/cm-handle[@id='ch-1']/filters/filter[@xpath='x/y']", leaves: ['xpath': 'x/y', 'subscriptionIds': ['test-id']]),
- new DataNode(xpath: "/datastores/datastore[@name='ncmp-datastore:passthrough-running']/cm-handles/cm-handle[@id='ch-2']/filters/filter[@xpath='y/z']", leaves: ['xpath': 'y/z', 'subscriptionIds': ['test-id']])]
+ new DataNode(xpath: "/datastores/datastore[@name='ncmp-datastore:passthrough-running']/cm-handles/cm-handle[@id='ch-2']/filters/filter[@xpath='y/z']", leaves: ['xpath': 'y/z', 'subscriptionIds': ['test-id']])]
and: 'the inventory persistence returns yang model cm handles'
1 * mockInventoryPersistence.getYangModelCmHandle('ch-1') >> new YangModelCmHandle(dmiServiceName: 'dmi-1')
1 * mockInventoryPersistence.getYangModelCmHandle('ch-2') >> new YangModelCmHandle(dmiServiceName: 'dmi-2')
given: 'a test subscription id'
def subscriptionId = 'test-id'
and: 'the persistence service returns datanodes with multiple subscribers'
- 1 * mockCmSubscriptionPersistenceService.getAllNodesForSubscriptionId(subscriptionId) >>
+ 1 * mockCmSubscriptionPersistenceService.getAffectedDataNodes(subscriptionId) >>
[new DataNode(xpath: "/datastores/datastore[@name='ncmp-datastore:passthrough-running']/cm-handles/cm-handle[@id='ch-1']/filters/filter[@xpath='x/y']", leaves: ['xpath': 'x/y', 'subscriptionIds': ['test-id','other-id']]),
new DataNode(xpath: "/datastores/datastore[@name='ncmp-datastore:passthrough-running']/cm-handles/cm-handle[@id='ch-2']/filters/filter[@xpath='y/z']", leaves: ['xpath': 'y/z', 'subscriptionIds': ['test-id','other-id']])]
and: 'the inventory persistence returns yang model cm handles'
/*
* ============LICENSE_START=======================================================
- * Copyright (c) 2024 Nordix Foundation.
+ * Copyright (c) 2024-2025 OpenInfra Foundation Europe. All rights reserved.
* Modifications Copyright (C) 2024 TechMahindra Ltd.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
package org.onap.cps.ncmp.impl.cmnotificationsubscription.utils
+import static CmDataJobSubscriptionPersistenceService.CM_DATA_JOB_SUBSCRIPTIONS_PARENT_NODE_XPATH
+import static CmDataJobSubscriptionPersistenceService.CPS_PATH_TEMPLATE_FOR_SUBSCRIPTION_WITH_ALTERNATE_ID_AND_DATATYPE
+import static CmDataJobSubscriptionPersistenceService.CPS_PATH_TEMPLATE_FOR_SUBSCRIPTION_WITH_DATA_JOB_ID
+import static org.onap.cps.api.parameters.FetchDescendantsOption.OMIT_DESCENDANTS
+
import com.fasterxml.jackson.databind.ObjectMapper
import org.onap.cps.api.CpsDataService
import org.onap.cps.api.CpsQueryService
import org.onap.cps.utils.JsonObjectMapper
import spock.lang.Specification
-import static CmSubscriptionPersistenceService.CPS_PATH_QUERY_FOR_CM_SUBSCRIPTION_FILTERS_WITH_DATASTORE_AND_CMHANDLE
-import static CmSubscriptionPersistenceService.CPS_PATH_QUERY_FOR_CM_SUBSCRIPTION_WITH_DATASTORE_CMHANDLE_AND_XPATH
-import static CmSubscriptionPersistenceService.CPS_PATH_QUERY_FOR_CM_SUBSCRIPTION_WITH_ID
-import static org.onap.cps.ncmp.api.data.models.DatastoreType.PASSTHROUGH_OPERATIONAL
-import static org.onap.cps.ncmp.api.data.models.DatastoreType.PASSTHROUGH_RUNNING
-import static org.onap.cps.api.parameters.FetchDescendantsOption.DIRECT_CHILDREN_ONLY
-import static org.onap.cps.api.parameters.FetchDescendantsOption.OMIT_DESCENDANTS
-
class CmSubscriptionPersistenceServiceSpec extends Specification {
def jsonObjectMapper = new JsonObjectMapper(new ObjectMapper())
def mockCpsQueryService = Mock(CpsQueryService)
def mockCpsDataService = Mock(CpsDataService)
- def objectUnderTest = new CmSubscriptionPersistenceService(jsonObjectMapper, mockCpsQueryService, mockCpsDataService)
+ def objectUnderTest = new CmDataJobSubscriptionPersistenceService(jsonObjectMapper, mockCpsQueryService, mockCpsDataService)
- def 'Check ongoing cm subscription #scenario'() {
- given: 'a valid cm subscription query'
- def cpsPathQuery = "/datastores/datastore[@name='ncmp-datastore:passthrough-running']/cm-handles/cm-handle[@id='ch-1']/filters/filter[@xpath='/cps/path']"
+ def 'Check cm data job subscription details has at least one subscriber #scenario'() {
+ given: 'a valid cm data job subscription query'
+ def cpsPathQuery = CPS_PATH_TEMPLATE_FOR_SUBSCRIPTION_WITH_ALTERNATE_ID_AND_DATATYPE.formatted('altId1', 'dataType1')
and: 'datanodes optionally returned'
- 1 * mockCpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-subscriptions',
- cpsPathQuery, OMIT_DESCENDANTS) >> dataNode
- when: 'we check for an ongoing cm subscription'
- def response = objectUnderTest.isOngoingCmSubscription(PASSTHROUGH_RUNNING, 'ch-1', '/cps/path')
- then: 'we get expected response'
- assert response == isOngoingCmSubscription
+ 1 * mockCpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-job-subscriptions', cpsPathQuery, OMIT_DESCENDANTS) >> dataNode
+ when: 'we check if subscription details already has at least one subscriber'
+ def result = objectUnderTest.hasAtLeastOneSubscription('dataType1', 'altId1')
+ then: 'we get expected result'
+ assert result == hasAtLeastOneSubscription
where: 'following scenarios are used'
- scenario | dataNode || isOngoingCmSubscription
- 'valid datanodes present' | [new DataNode(xpath: '/cps/path', leaves: ['subscriptionIds': ['sub-1', 'sub-2']])] || true
- 'no datanodes present' | [] || false
+ scenario | dataNode || hasAtLeastOneSubscription
+ 'valid datanodes present' | [new DataNode(leaves: ['dataJobId': ['dataJobId1']])]|| true
+ 'no datanodes present' | [] || false
}
def 'Checking uniqueness of incoming subscription ID'() {
- given: 'a cps path with a subscription ID for querying'
- def cpsPathQuery = CPS_PATH_QUERY_FOR_CM_SUBSCRIPTION_WITH_ID.formatted('some-sub')
- and: 'relevant datanodes are returned'
- 1 * mockCpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-subscriptions', cpsPathQuery, OMIT_DESCENDANTS) >>
- dataNodes
- when: 'a subscription ID is tested for uniqueness'
- def result = objectUnderTest.isUniqueSubscriptionId('some-sub')
+ given: 'a cps path with a data job subscription ID for querying'
+ def cpsPathQuery = CPS_PATH_TEMPLATE_FOR_SUBSCRIPTION_WITH_DATA_JOB_ID.formatted('mySubId')
+ and: 'collection of data nodes are returned'
+ 1 * mockCpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-job-subscriptions', cpsPathQuery, OMIT_DESCENDANTS) >> dataNodes
+ when: 'a data job subscription id is tested for uniqueness'
+ def result = objectUnderTest.isNewSubscriptionId('mySubId')
then: 'result is as expected'
- assert result == isValidSubscriptionId
+ assert result == isValidDataJobSubscriptionId
where: 'following scenarios are used'
- scenario | dataNodes || isValidSubscriptionId
+ scenario | dataNodes || isValidDataJobSubscriptionId
'datanodes present' | [new DataNode()] || false
'no datanodes present' | [] || true
}
- def 'Add new subscriber to an ongoing cm notification subscription'() {
- given: 'a valid cm subscription path query'
- def cpsPathQuery = CPS_PATH_QUERY_FOR_CM_SUBSCRIPTION_WITH_DATASTORE_CMHANDLE_AND_XPATH.formatted('ncmp-datastore:passthrough-running', 'ch-1', '/x/y')
- and: 'a dataNode exists for the given cps path query'
- mockCpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-subscriptions',
- cpsPathQuery, OMIT_DESCENDANTS) >> [new DataNode(xpath: cpsPathQuery, leaves: ['xpath': '/x/y', 'subscriptionIds': ['sub-1']])]
- when: 'the method to add/update cm notification subscription is called'
- objectUnderTest.addCmSubscription(PASSTHROUGH_RUNNING, 'ch-1', '/x/y', 'newSubId')
- then: 'data service method to update list of subscribers is called once'
- 1 * mockCpsDataService.updateNodeLeaves(
- 'NCMP-Admin',
- 'cm-data-subscriptions',
- '/datastores/datastore[@name=\'ncmp-datastore:passthrough-running\']/cm-handles/cm-handle[@id=\'ch-1\']/filters',
- objectUnderTest.getSubscriptionDetailsAsJson('/x/y', ['sub-1', 'newSubId']), _, ContentType.JSON)
+ def 'Get all nodes for subscription id'() {
+ given: 'the query service returns nodes for subscription id'
+ def expectedDataNode = new DataNode(leaves: ['datajobId': ['id1'], 'dataTypeId': 'dataType1', 'alternateId': 'altId1'])
+ def queryServiceResponse = [expectedDataNode].asCollection()
+ def cmDataJobSubscriptionIdCpsPath = CPS_PATH_TEMPLATE_FOR_SUBSCRIPTION_WITH_DATA_JOB_ID.formatted('mySubId')
+ 1 * mockCpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-job-subscriptions', cmDataJobSubscriptionIdCpsPath, OMIT_DESCENDANTS) >> queryServiceResponse
+ when: 'retrieving all nodes for data job subscription id'
+ def result = objectUnderTest.getAffectedDataNodes('mySubId')
+ then: 'the result returns correct number of datanodes'
+ assert result.size() == 1
+ and: 'the attribute of the data nodes is as expected'
+ assert result.iterator().next().leaves.alternateId == expectedDataNode.leaves.alternateId
+ assert result.iterator().next().leaves.dataTypeId == expectedDataNode.leaves.dataTypeId
}
- def 'Add new cm notification subscription for #datastoreType'() {
- given: 'a valid cm subscription path query'
- def cmSubscriptionCpsPathQuery = CPS_PATH_QUERY_FOR_CM_SUBSCRIPTION_WITH_DATASTORE_CMHANDLE_AND_XPATH.formatted(datastoreName, 'ch-1', '/x/y')
- def cmHandleForSubscriptionPathQuery = CPS_PATH_QUERY_FOR_CM_SUBSCRIPTION_FILTERS_WITH_DATASTORE_AND_CMHANDLE.formatted(datastoreName, 'ch-1')
- and: 'a parent node xpath for the cm subscription path above'
- def parentNodeXpath = '/datastores/datastore[@name=\'%s\']/cm-handles'
- and: 'a datanode does not exist for cm subscription path query'
- mockCpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-subscriptions',
- cmSubscriptionCpsPathQuery,
- OMIT_DESCENDANTS) >> []
- and: 'a datanode does not exist for the given cm handle subscription path query'
- mockCpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-subscriptions',
- cmHandleForSubscriptionPathQuery, OMIT_DESCENDANTS) >> []
- and: 'subscription is mapped as JSON'
- def subscriptionAsJson = '{"cm-handle":[{"id":"ch-1","filters":' +
- objectUnderTest.getSubscriptionDetailsAsJson('/x/y', ['newSubId']) + '}]}'
+ def 'Add subscription for a data type and and fdn that have no subscriptions yet.'() {
+ given: 'a valid cm data job subscription path query'
+ def query = CPS_PATH_TEMPLATE_FOR_SUBSCRIPTION_WITH_ALTERNATE_ID_AND_DATATYPE.formatted('altId1', 'dataType1')
+ and: 'a data node does not exist for cm data job subscription path query'
+ mockCpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-job-subscriptions', query, OMIT_DESCENDANTS) >> []
+ and: 'a datanode does not exist for the given cm data job subscription path query'
+ mockCpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-subscriptions', query, OMIT_DESCENDANTS) >> []
+ and: 'data job subscription details is mapped as JSON'
+ def subscriptionIds = ['newSubId']
+ def subscriptionAsJson = objectUnderTest.getSubscriptionDetailsAsJson(subscriptionIds, 'dataType1', 'altId1')
when: 'the method to add/update cm notification subscription is called'
- objectUnderTest.addCmSubscription(datastoreType, 'ch-1', '/x/y', 'newSubId')
+ objectUnderTest.addSubscription('dataType1', 'altId1', 'newSubId')
then: 'data service method to create new subscription for given subscriber is called once with the correct parameters'
- 1 * mockCpsDataService.saveData(
- 'NCMP-Admin',
- 'cm-data-subscriptions',
- parentNodeXpath.formatted(datastoreName),
- subscriptionAsJson, _, ContentType.JSON)
- where:
- scenario | datastoreType || datastoreName
- 'passthrough_running' | PASSTHROUGH_RUNNING || 'ncmp-datastore:passthrough-running'
- 'passthrough_operational' | PASSTHROUGH_OPERATIONAL || 'ncmp-datastore:passthrough-operational'
+ 1 * mockCpsDataService.saveData('NCMP-Admin', 'cm-data-job-subscriptions', subscriptionAsJson, _, ContentType.JSON)
}
- def 'Add new cm notification subscription when xpath does not exist for existing subscription cm handle'() {
+ def 'Add subscription for a data type and fdn that already have subscription(s).'() {
given: 'a valid cm subscription path query'
- def cmSubscriptionCpsPathQuery = CPS_PATH_QUERY_FOR_CM_SUBSCRIPTION_WITH_DATASTORE_CMHANDLE_AND_XPATH.formatted(datastoreName, 'ch-1', '/x/y')
- def cmHandleForSubscriptionPathQuery = CPS_PATH_QUERY_FOR_CM_SUBSCRIPTION_FILTERS_WITH_DATASTORE_AND_CMHANDLE.formatted(datastoreName, 'ch-1')
- and: 'a parent node xpath for given cm handle for subscription path above'
- def parentNodeXpath = '/datastores/datastore[@name=\'%s\']/cm-handles/cm-handle[@id=\'%s\']/filters'
- and: 'a datanode does not exist for cm subscription path query'
- mockCpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-subscriptions',
- cmSubscriptionCpsPathQuery, OMIT_DESCENDANTS) >> []
- and: 'a datanode exists for the given cm handle subscription path query'
- mockCpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-subscriptions',
- cmHandleForSubscriptionPathQuery, OMIT_DESCENDANTS) >> [new DataNode()]
+ def query = CPS_PATH_TEMPLATE_FOR_SUBSCRIPTION_WITH_ALTERNATE_ID_AND_DATATYPE.formatted('altId1', 'dataType1')
+ and: 'a dataNode exists for the given cps path query'
+ mockCpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-job-subscriptions', query, OMIT_DESCENDANTS) >> [new DataNode(leaves: ['dataJobId': ['existingId'], 'dataTypeId': 'dataType1', 'alternateId': 'altId1'])]
+ and: 'updated cm data job subscription details as json'
+ def newListOfSubscriptionIds = ['existingId', 'newSubId']
+ def subscriptionDetailsAsJson = objectUnderTest.getSubscriptionDetailsAsJson(newListOfSubscriptionIds, 'dataType1', 'altId1')
when: 'the method to add/update cm notification subscription is called'
- objectUnderTest.addCmSubscription(datastoreType, 'ch-1', '/x/y', 'newSubId')
- then: 'data service method to create new subscription for given subscriber is called once with the correct parameters'
- 1 * mockCpsDataService.saveListElements(
- 'NCMP-Admin',
- 'cm-data-subscriptions',
- parentNodeXpath.formatted(datastoreName, 'ch-1'),
- objectUnderTest.getSubscriptionDetailsAsJson('/x/y', ['newSubId']), _, ContentType.JSON)
- where:
- scenario | datastoreType || datastoreName
- 'passthrough_running' | PASSTHROUGH_RUNNING || 'ncmp-datastore:passthrough-running'
- 'passthrough_operational' | PASSTHROUGH_OPERATIONAL || 'ncmp-datastore:passthrough-operational'
+ objectUnderTest.addSubscription('dataType1', 'altId1', 'newSubId')
+ then: 'data service method to update list of subscribers is called once'
+ 1 * mockCpsDataService.updateNodeLeaves('NCMP-Admin', 'cm-data-job-subscriptions', CM_DATA_JOB_SUBSCRIPTIONS_PARENT_NODE_XPATH, subscriptionDetailsAsJson, _, ContentType.JSON)
}
- def 'Remove subscriber from a list of an ongoing cm notification subscription'() {
+ def 'Remove subscription (other subscriptions remain for same data type and target).'() {
given: 'a subscription exists when queried'
- def cpsPathQuery = CPS_PATH_QUERY_FOR_CM_SUBSCRIPTION_WITH_DATASTORE_CMHANDLE_AND_XPATH.formatted('ncmp-datastore:passthrough-running', 'ch-1', '/x/y')
- mockCpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-subscriptions',
- cpsPathQuery, OMIT_DESCENDANTS) >> [new DataNode(xpath: cpsPathQuery, leaves: ['xpath': '/x/y', 'subscriptionIds': ['sub-1', 'sub-2']])]
+ def query = CPS_PATH_TEMPLATE_FOR_SUBSCRIPTION_WITH_ALTERNATE_ID_AND_DATATYPE.formatted('altId1', 'dataType1')
+ mockCpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-job-subscriptions', query, OMIT_DESCENDANTS)
+ >> [new DataNode(leaves: ['dataJobId': ['existingId','subIdToRemove'], 'dataTypeId': 'dataType1', 'alternateId': 'altId1'])]
+ and: 'updated cm data job subscription details as json'
+ def subscriptionDetailsAsJson = objectUnderTest.getSubscriptionDetailsAsJson(['existingId'], 'dataType1', 'altId1')
when: 'the subscriber is removed'
- objectUnderTest.removeCmSubscription(PASSTHROUGH_RUNNING, 'ch-1', '/x/y', 'sub-1')
+ objectUnderTest.removeSubscription('dataType1', 'altId1','subIdToRemove')
then: 'the list of subscribers is updated'
- 1 * mockCpsDataService.updateNodeLeaves('NCMP-Admin', 'cm-data-subscriptions',
- '/datastores/datastore[@name=\'ncmp-datastore:passthrough-running\']/cm-handles/cm-handle[@id=\'ch-1\']/filters',
- objectUnderTest.getSubscriptionDetailsAsJson('/x/y', ['sub-2']), _, ContentType.JSON)
+ 1 * mockCpsDataService.updateNodeLeaves('NCMP-Admin', 'cm-data-job-subscriptions', CM_DATA_JOB_SUBSCRIPTIONS_PARENT_NODE_XPATH, subscriptionDetailsAsJson, _, ContentType.JSON)
}
- def 'Removing last ongoing subscription for datastore and cmhandle and xpath'() {
+ def 'Remove last subscription (no subscriptions remain for same data type and target).'() {
given: 'a subscription exists when queried but has only 1 subscriber'
- mockCpsQueryService.queryDataNodes(
- 'NCMP-Admin', 'cm-data-subscriptions',
- CPS_PATH_QUERY_FOR_CM_SUBSCRIPTION_WITH_DATASTORE_CMHANDLE_AND_XPATH.formatted('ncmp-datastore:passthrough-running', 'ch-1', '/x/y'),
- OMIT_DESCENDANTS) >> [new DataNode(leaves: ['xpath': '/x/y', 'subscriptionIds': ['sub-1']])]
- and: 'the #scenario'
- mockCpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-subscriptions',
- CPS_PATH_QUERY_FOR_CM_SUBSCRIPTION_FILTERS_WITH_DATASTORE_AND_CMHANDLE.formatted('ncmp-datastore:passthrough-running', 'ch-1'),
- DIRECT_CHILDREN_ONLY) >> [new DataNode(childDataNodes: listOfChildNodes)]
+ def query = CPS_PATH_TEMPLATE_FOR_SUBSCRIPTION_WITH_ALTERNATE_ID_AND_DATATYPE.formatted('last-alt-id', 'last-data-type')
+ mockCpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-job-subscriptions', query, OMIT_DESCENDANTS)
+ >> [new DataNode(leaves: ['dataJobId': ['subIdToRemove'], 'dataTypeId': 'last-data-type', 'alternateId': 'last-alt-id'])]
+ and: 'a cps path with alternate id and data type for deleting a node'
+ def cpsPath = CPS_PATH_TEMPLATE_FOR_SUBSCRIPTION_WITH_ALTERNATE_ID_AND_DATATYPE.formatted('last-alt-id', 'last-data-type')
when: 'that last ongoing subscription is removed'
- objectUnderTest.removeCmSubscription(PASSTHROUGH_RUNNING, 'ch-1', '/x/y', 'sub-1')
- then: 'the subscription with empty subscriber list is removed'
- 1 * mockCpsDataService.deleteDataNode('NCMP-Admin', 'cm-data-subscriptions',
- '/datastores/datastore[@name=\'ncmp-datastore:passthrough-running\']/cm-handles/cm-handle[@id=\'ch-1\']/filters/filter[@xpath=\'/x/y\']',
- _)
- and: 'method call to delete the cm handle is called the correct number of times'
- numberOfCallsToDeleteCmHandle * mockCpsDataService.deleteDataNode('NCMP-Admin', 'cm-data-subscriptions',
- '/datastores/datastore[@name=\'ncmp-datastore:passthrough-running\']/cm-handles/cm-handle[@id=\'ch-1\']',
- _)
- where:
- scenario | listOfChildNodes || numberOfCallsToDeleteCmHandle
- 'cm handle in same datastore is used for other subscriptions' | [new DataNode()] || 0
- 'cm handle in same datastore is NOT used for other subscriptions' | [] || 1
+ objectUnderTest.removeSubscription('last-data-type', 'last-alt-id','subIdToRemove')
+ then: 'the data job subscription with empty subscribers list is removed'
+ 1 * mockCpsDataService.deleteDataNode('NCMP-Admin', 'cm-data-job-subscriptions', cpsPath, _)
}
- def 'Get all nodes for subscription id'() {
- given: 'the query service returns nodes for subscription id'
- def expectedDataNode = new DataNode(xpath: '/some/xpath')
- def queryServiceResponse = [expectedDataNode].asCollection()
- 1 * mockCpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-subscriptions', '//filter/subscriptionIds[text()=\'some-id\']', OMIT_DESCENDANTS) >> queryServiceResponse
- when: 'retrieving all nodes for subscription id'
- def result = objectUnderTest.getAllNodesForSubscriptionId('some-id')
- then: 'the result returns correct number of datanodes'
- assert result.size() == 1
- and: 'the attribute of the datanode is as expected'
- assert result.iterator().next().xpath == expectedDataNode.xpath
+ def 'Attempt to remove non existing subscription (id).'() {
+ given: 'a subscription exists when queried with other subscriber'
+ def query = CPS_PATH_TEMPLATE_FOR_SUBSCRIPTION_WITH_ALTERNATE_ID_AND_DATATYPE.formatted('some-alt-id', 'some-data-type')
+ mockCpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-job-subscriptions', query, OMIT_DESCENDANTS) >> [new DataNode(leaves: ['dataJobId': ['otherDataJobId']])]
+ when: 'the remove subscription method is with a non existing id'
+ objectUnderTest.removeSubscription('some-data-type', 'some-alt-id','nonExistingSubId')
+ then: 'no calls to cps data service is made'
+ 0 * mockCpsDataService.deleteDataNode(*_)
+ and: 'removal of non existent subscription id silently ignored with no exception thrown'
+ noExceptionThrown()
}
}