private final InventoryPersistence inventoryPersistence;
/**
- * Adds new subscription to the subscription cache.
+ * Adds subscription to the subscription cache.
*
* @param subscriptionId subscription id
* @param predicates subscription request predicates
cmNotificationSubscriptionCache.put(subscriptionId, createDmiSubscriptionsPerDmi(predicates));
}
+ /**
+ * Adds subscription to the subscription cache.
+ *
+ * @param subscriptionId subscription id
+ * @param dmiCmSubscriptionDetailsPerDmi map of dmi cm notification subscription details per dmi
+ */
+ public void add(final String subscriptionId,
+ final Map<String, DmiCmSubscriptionDetails>
+ dmiCmSubscriptionDetailsPerDmi) {
+ cmNotificationSubscriptionCache.put(subscriptionId, dmiCmSubscriptionDetailsPerDmi);
+ }
+
/**
* Get cm notification subscription cache entry via subscription id.
*
* @param status String of status
*
*/
- public void updateDmiSubscriptionStatusPerDmi(final String subscriptionId, final String dmiServiceName,
- final CmSubscriptionStatus status) {
+ public void updateDmiSubscriptionStatus(final String subscriptionId, final String dmiServiceName,
+ final CmSubscriptionStatus status) {
final Map<String, DmiCmSubscriptionDetails> dmiSubscriptionsPerDmi =
cmNotificationSubscriptionCache.get(subscriptionId);
dmiSubscriptionsPerDmi.get(dmiServiceName).setCmSubscriptionStatus(status);
* @param dmiServiceName String of dmiServiceName
*
*/
- public void removeFromDatabasePerDmi(final String subscriptionId, final String dmiServiceName) {
+ public void removeFromDatabase(final String subscriptionId, final String dmiServiceName) {
final List<DmiCmSubscriptionPredicate> dmiCmSubscriptionPredicates =
cmNotificationSubscriptionCache.get(subscriptionId).get(dmiServiceName)
.getDmiCmSubscriptionPredicates();
private boolean isAcceptedOrRejected(final DmiCmSubscriptionDetails dmiCmSubscription) {
return dmiCmSubscription.getCmSubscriptionStatus().toString().equals("ACCEPTED")
- || dmiCmSubscription.getCmSubscriptionStatus().toString().equals("REJECTED");
+ || dmiCmSubscription.getCmSubscriptionStatus().toString().equals("REJECTED");
}
}
dmiCacheHandler.persistIntoDatabasePerDmi(subscriptionId, dmiPluginName);
}
if (eventType.equals("subscriptionDeleteResponse")) {
- dmiCacheHandler.removeFromDatabasePerDmi(subscriptionId, dmiPluginName);
+ dmiCacheHandler.removeFromDatabase(subscriptionId, dmiPluginName);
}
handleEventsStatusPerDmi(subscriptionId, eventType);
}
private void handleCacheStatusPerDmi(final String subscriptionId, final String dmiPluginName,
final CmSubscriptionStatus cmSubscriptionStatus) {
- dmiCacheHandler.updateDmiSubscriptionStatusPerDmi(subscriptionId, dmiPluginName,
+ dmiCacheHandler.updateDmiSubscriptionStatus(subscriptionId, dmiPluginName,
cmSubscriptionStatus);
}
--- /dev/null
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2024 Nordix Foundation
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.impl.cmnotificationsubscription.models;
+
+import java.util.Collection;
+import java.util.Map;
+
+/**
+ * Tuple to be used during for to delete usecase.
+ *
+ * @param lastRemainingSubscriptionsPerDmi subscriptions that are used by only one subscriber grouped per dmi
+ * @param overlappingSubscriptionsPerDmi subscriptions that are shared by multiple subscribers grouped per dmi
+ */
+public record DmiCmSubscriptionTuple(Map<String, Collection<DmiCmSubscriptionKey>> lastRemainingSubscriptionsPerDmi,
+ Map<String, Collection<DmiCmSubscriptionKey>> overlappingSubscriptionsPerDmi) {
+}
* Process cm notification subscription delete request.
*
* @param subscriptionId subscription id
- * @param predicates subscription predicates
*/
- void processSubscriptionDeleteRequest(final String subscriptionId, final List<Predicate> predicates);
+ void processSubscriptionDeleteRequest(final String subscriptionId);
}
\ No newline at end of file
package org.onap.cps.ncmp.impl.cmnotificationsubscription.ncmp;
import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
import java.util.stream.Collectors;
import lombok.RequiredArgsConstructor;
import org.onap.cps.ncmp.impl.cmnotificationsubscription.cache.DmiCacheHandler;
+import org.onap.cps.ncmp.impl.cmnotificationsubscription.dmi.DmiCmSubscriptionDetailsPerDmiMapper;
import org.onap.cps.ncmp.impl.cmnotificationsubscription.dmi.DmiInEventMapper;
import org.onap.cps.ncmp.impl.cmnotificationsubscription.dmi.DmiInEventProducer;
import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.CmSubscriptionStatus;
import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.DmiCmSubscriptionDetails;
+import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.DmiCmSubscriptionKey;
import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.DmiCmSubscriptionPredicate;
+import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.DmiCmSubscriptionTuple;
import org.onap.cps.ncmp.impl.cmnotificationsubscription.utils.CmSubscriptionPersistenceService;
import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.client_to_ncmp.Predicate;
import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.ncmp_to_client.NcmpOutEvent;
import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.ncmp_to_dmi.DmiInEvent;
+import org.onap.cps.ncmp.impl.inventory.InventoryPersistence;
+import org.onap.cps.spi.model.DataNode;
import org.springframework.stereotype.Service;
@Service
@RequiredArgsConstructor
public class CmSubscriptionHandlerImpl implements CmSubscriptionHandler {
+ private static final Pattern SUBSCRIPTION_KEY_FROM_XPATH_PATTERN = Pattern.compile(
+ "^/datastores/datastore\\[@name='([^']*)']/cm-handles/cm-handle\\[@id='([^']*)']/"
+ + "filters/filter\\[@xpath='(.*)']$");
+
private final CmSubscriptionPersistenceService cmSubscriptionPersistenceService;
private final CmSubscriptionComparator cmSubscriptionComparator;
private final NcmpOutEventMapper ncmpOutEventMapper;
private final DmiInEventMapper dmiInEventMapper;
+ private final DmiCmSubscriptionDetailsPerDmiMapper dmiCmSubscriptionDetailsPerDmiMapper;
private final NcmpOutEventProducer ncmpOutEventProducer;
private final DmiInEventProducer dmiInEventProducer;
private final DmiCacheHandler dmiCacheHandler;
+ private final InventoryPersistence inventoryPersistence;
@Override
public void processSubscriptionCreateRequest(final String subscriptionId, final List<Predicate> predicates) {
}
@Override
- public void processSubscriptionDeleteRequest(final String subscriptionId, final List<Predicate> predicates) {
- dmiCacheHandler.add(subscriptionId, predicates);
- sendSubscriptionDeleteRequestToDmi(subscriptionId);
- scheduleNcmpOutEventResponse(subscriptionId, "subscriptionDeleteResponse");
+ public void processSubscriptionDeleteRequest(final String subscriptionId) {
+ final Collection<DataNode> subscriptionDataNodes =
+ cmSubscriptionPersistenceService.getAllNodesForSubscriptionId(subscriptionId);
+ final DmiCmSubscriptionTuple dmiCmSubscriptionTuple =
+ getLastRemainingAndOverlappingSubscriptionsPerDmi(subscriptionDataNodes);
+ dmiCacheHandler.add(subscriptionId, mergeDmiCmSubscriptionDetailsPerDmiMaps(dmiCmSubscriptionTuple));
+ if (dmiCmSubscriptionTuple.lastRemainingSubscriptionsPerDmi().isEmpty()) {
+ acceptAndPublishDeleteRequest(subscriptionId);
+ } else {
+ sendSubscriptionDeleteRequestToDmi(subscriptionId,
+ dmiCmSubscriptionDetailsPerDmiMapper.toDmiCmSubscriptionsPerDmi(
+ dmiCmSubscriptionTuple.lastRemainingSubscriptionsPerDmi()));
+ scheduleNcmpOutEventResponse(subscriptionId, "subscriptionDeleteResponse");
+ }
+ }
+
+ private Map<String, DmiCmSubscriptionDetails> mergeDmiCmSubscriptionDetailsPerDmiMaps(
+ final DmiCmSubscriptionTuple dmiCmSubscriptionTuple) {
+ final Map<String, DmiCmSubscriptionDetails> lastRemainingDmiSubscriptionsPerDmi =
+ dmiCmSubscriptionDetailsPerDmiMapper.toDmiCmSubscriptionsPerDmi(
+ dmiCmSubscriptionTuple.lastRemainingSubscriptionsPerDmi());
+ final Map<String, DmiCmSubscriptionDetails> overlappingDmiSubscriptionsPerDmi =
+ dmiCmSubscriptionDetailsPerDmiMapper.toDmiCmSubscriptionsPerDmi(
+ dmiCmSubscriptionTuple.overlappingSubscriptionsPerDmi());
+ final Map<String, DmiCmSubscriptionDetails> mergedDmiSubscriptionsPerDmi =
+ new HashMap<>(lastRemainingDmiSubscriptionsPerDmi);
+ overlappingDmiSubscriptionsPerDmi.forEach((dmiServiceName, dmiCmSubscriptionDetails) ->
+ mergedDmiSubscriptionsPerDmi.merge(dmiServiceName, dmiCmSubscriptionDetails,
+ this::mergeDmiCmSubscriptionDetails));
+ return mergedDmiSubscriptionsPerDmi;
+ }
+
+ private DmiCmSubscriptionDetails mergeDmiCmSubscriptionDetails(
+ final DmiCmSubscriptionDetails dmiCmSubscriptionDetails,
+ final DmiCmSubscriptionDetails otherDmiCmSubscriptionDetails) {
+ final List<DmiCmSubscriptionPredicate> mergedDmiCmSubscriptionPredicates =
+ new ArrayList<>(dmiCmSubscriptionDetails.getDmiCmSubscriptionPredicates());
+ mergedDmiCmSubscriptionPredicates.addAll(otherDmiCmSubscriptionDetails.getDmiCmSubscriptionPredicates());
+ return new DmiCmSubscriptionDetails(mergedDmiCmSubscriptionPredicates, CmSubscriptionStatus.PENDING);
}
private void scheduleNcmpOutEventResponse(final String subscriptionId, final String eventType) {
ncmpOutEventProducer.publishNcmpOutEvent(subscriptionId, "subscriptionCreateResponse", ncmpOutEvent, false);
}
+ private void acceptAndPublishDeleteRequest(final String subscriptionId) {
+ final Set<String> dmiServiceNames = dmiCacheHandler.get(subscriptionId).keySet();
+ for (final String dmiServiceName : dmiServiceNames) {
+ dmiCacheHandler.updateDmiSubscriptionStatus(subscriptionId, dmiServiceName,
+ CmSubscriptionStatus.ACCEPTED);
+ dmiCacheHandler.removeFromDatabase(subscriptionId, dmiServiceName);
+ }
+ final NcmpOutEvent ncmpOutEvent = ncmpOutEventMapper.toNcmpOutEvent(subscriptionId,
+ dmiCacheHandler.get(subscriptionId));
+ ncmpOutEventProducer.publishNcmpOutEvent(subscriptionId, "subscriptionDeleteResponse", ncmpOutEvent,
+ false);
+ }
+
private void handleNewCmSubscription(final String subscriptionId) {
final Map<String, DmiCmSubscriptionDetails> dmiSubscriptionsPerDmi =
dmiCacheHandler.get(subscriptionId);
}
private void publishDmiInEventPerDmi(final String subscriptionId, final String dmiPluginName,
- final List<DmiCmSubscriptionPredicate> dmiCmSubscriptionPredicates) {
+ final List<DmiCmSubscriptionPredicate> dmiCmSubscriptionPredicates) {
final DmiInEvent dmiInEvent = dmiInEventMapper.toDmiInEvent(dmiCmSubscriptionPredicates);
dmiInEventProducer.publishDmiInEvent(subscriptionId, dmiPluginName,
"subscriptionCreateRequest", dmiInEvent);
}
private void acceptAndPublishNcmpOutEventPerDmi(final String subscriptionId, final String dmiPluginName) {
- dmiCacheHandler.updateDmiSubscriptionStatusPerDmi(subscriptionId, dmiPluginName,
+ dmiCacheHandler.updateDmiSubscriptionStatus(subscriptionId, dmiPluginName,
CmSubscriptionStatus.ACCEPTED);
dmiCacheHandler.persistIntoDatabasePerDmi(subscriptionId, dmiPluginName);
}
- private void sendSubscriptionDeleteRequestToDmi(final String subscriptionId) {
- final Map<String, DmiCmSubscriptionDetails> dmiSubscriptionsPerDmi =
- dmiCacheHandler.get(subscriptionId);
- dmiSubscriptionsPerDmi.forEach((dmiPluginName, dmiSubscriptionDetails) -> {
- final DmiInEvent dmiInEvent = dmiInEventMapper.toDmiInEvent(
- dmiSubscriptionDetails.getDmiCmSubscriptionPredicates());
- dmiInEventProducer.publishDmiInEvent(subscriptionId, dmiPluginName,
- "subscriptionDeleteRequest", dmiInEvent);
+ private void sendSubscriptionDeleteRequestToDmi(final String subscriptionId,
+ final Map<String, DmiCmSubscriptionDetails>
+ dmiCmSubscriptionsPerDmi) {
+ dmiCmSubscriptionsPerDmi.forEach((dmiPluginName, dmiCmSubscriptionDetails) -> {
+ final DmiInEvent dmiInEvent =
+ dmiInEventMapper.toDmiInEvent(
+ dmiCmSubscriptionDetails.getDmiCmSubscriptionPredicates());
+ dmiInEventProducer.publishDmiInEvent(subscriptionId,
+ dmiPluginName, "subscriptionDeleteRequest", dmiInEvent);
});
}
+
+
+ private DmiCmSubscriptionTuple getLastRemainingAndOverlappingSubscriptionsPerDmi(
+ final Collection<DataNode> subscriptionNodes) {
+ final Map<String, Collection<DmiCmSubscriptionKey>> lastRemainingSubscriptionsPerDmi = new HashMap<>();
+ final Map<String, Collection<DmiCmSubscriptionKey>> overlappingSubscriptionsPerDmi = new HashMap<>();
+
+ for (final DataNode subscriptionNode : subscriptionNodes) {
+ final DmiCmSubscriptionKey dmiCmSubscriptionKey = extractCmSubscriptionKey(subscriptionNode.getXpath());
+ final String dmiServiceName = inventoryPersistence.getYangModelCmHandle(
+ dmiCmSubscriptionKey.cmHandleId()).getDmiServiceName();
+ final List<String> subscribers = (List<String>) subscriptionNode.getLeaves().get("subscriptionIds");
+ populateDmiCmSubscriptionTuple(subscribers, overlappingSubscriptionsPerDmi,
+ lastRemainingSubscriptionsPerDmi, dmiServiceName, dmiCmSubscriptionKey);
+ }
+ return new DmiCmSubscriptionTuple(lastRemainingSubscriptionsPerDmi, overlappingSubscriptionsPerDmi);
+ }
+
+ private static void populateDmiCmSubscriptionTuple(final List<String> subscribers,
+ final Map<String, Collection<DmiCmSubscriptionKey>>
+ overlappingSubscriptionsPerDmi,
+ final Map<String, Collection<DmiCmSubscriptionKey>>
+ lastRemainingSubscriptionsPerDmi,
+ final String dmiServiceName,
+ final DmiCmSubscriptionKey dmiCmSubscriptionKey) {
+ final Map<String, Collection<DmiCmSubscriptionKey>> targetMap =
+ subscribers.size() > 1 ? overlappingSubscriptionsPerDmi : lastRemainingSubscriptionsPerDmi;
+ targetMap.computeIfAbsent(dmiServiceName, dmiName -> new HashSet<>()).add(dmiCmSubscriptionKey);
+ }
+
+ private DmiCmSubscriptionKey extractCmSubscriptionKey(final String xpath) {
+ final Matcher matcher = SUBSCRIPTION_KEY_FROM_XPATH_PATTERN.matcher(xpath);
+ if (matcher.find()) {
+ final String datastoreName = matcher.group(1);
+ final String cmHandleId = matcher.group(2);
+ final String filterXpath = matcher.group(3);
+ return new DmiCmSubscriptionKey(datastoreName, cmHandleId, filterXpath);
+ }
+ throw new IllegalArgumentException("DataNode xpath does not represent a subscription key");
+ }
+
}
\ No newline at end of file
if ("subscriptionDeleteRequest".equals(cloudEvent.getType())) {
log.info("Subscription delete request for source {} with subscription id {} ...",
cloudEvent.getSource(), subscriptionId);
- cmSubscriptionHandler.processSubscriptionDeleteRequest(subscriptionId, predicates);
+ cmSubscriptionHandler.processSubscriptionDeleteRequest(subscriptionId);
}
}
}
}
}
+ /**
+ * Retrieve all existing dataNodes for given subscription id.
+ *
+ * @param subscriptionId subscription id
+ * @return collection of DataNodes
+ */
+ public Collection<DataNode> getAllNodesForSubscriptionId(final String subscriptionId) {
+ return cpsQueryService.queryDataNodes(NCMP_DATASPACE_NAME, SUBSCRIPTION_ANCHOR_NAME,
+ CPS_PATH_QUERY_FOR_CM_SUBSCRIPTION_WITH_ID.formatted(subscriptionId),
+ OMIT_DESCENDANTS);
+ }
+
private void deleteListOfSubscriptionsFor(final DatastoreType datastoreType, final String cmHandleId,
final String xpath) {
cpsDataService.deleteDataNode(NCMP_DATASPACE_NAME, SUBSCRIPTION_ANCHOR_NAME,
initialiseMockInventoryPersistenceResponses()
}
- def 'Load CM subscription event to cache'() {
- given: 'a valid subscription event with Id'
+ def 'Load CM subscription event to cache with predicates'() {
+ given: 'a subscription event with id'
def subscriptionId = ncmpInEvent.getData().getSubscriptionId()
and: 'list of predicates'
def predicates = ncmpInEvent.getData().getPredicates()
- when: 'a valid event object loaded in cache'
+ when: 'subscription is loaded to cache with predicates'
objectUnderTest.add(subscriptionId, predicates)
- then: 'the cache contains the correct entry with #subscriptionId subscription ID'
+ then: 'the number of entries in cache is correct'
+ assert testCache.size() == 1
+ and: 'the cache contains the correct entries'
assert testCache.containsKey(subscriptionId)
}
+ def 'Load CM subscription event to cache with dmi subscription details per dmi'() {
+ given: 'a subscription event with id'
+ def subscriptionId = ncmpInEvent.getData().getSubscriptionId()
+ and: 'dmi subscription details per dmi'
+ def dmiSubscriptionsPerDmi = [:]
+ when: 'subscription is loaded to cache with dmi subscription details per dmi'
+ objectUnderTest.add(subscriptionId, dmiSubscriptionsPerDmi)
+ then: 'the number of entries in cache is correct'
+ assert testCache.size() == 1
+ and: 'the cache contains the correct entries'
+ assert testCache.containsKey(subscriptionId)
+ and: 'the entry for the subscription ID matches the provided DMI subscription details'
+ assert testCache.get(subscriptionId) == dmiSubscriptionsPerDmi
+ }
+
def 'Get cache entry via subscription id'() {
given: 'the cache contains value for some-id'
testCache.put('some-id',[:])
def subscriptionId = ncmpInEvent.getData().getSubscriptionId()
objectUnderTest.add(subscriptionId, predicates)
when: 'subscription status per dmi is updated in cache'
- objectUnderTest.updateDmiSubscriptionStatusPerDmi(subscriptionId,'dmi-1', CmSubscriptionStatus.ACCEPTED)
+ objectUnderTest.updateDmiSubscriptionStatus(subscriptionId,'dmi-1', CmSubscriptionStatus.ACCEPTED)
then: 'verify status has been updated in cache'
def predicate = testCache.get(subscriptionId)
assert predicate.get('dmi-1').cmSubscriptionStatus == CmSubscriptionStatus.ACCEPTED
def subscriptionId = ncmpInEvent.getData().getSubscriptionId()
objectUnderTest.add(subscriptionId, predicates)
when: 'subscription is persisted in database'
- objectUnderTest.removeFromDatabasePerDmi(subscriptionId,'dmi-1')
+ objectUnderTest.removeFromDatabase(subscriptionId,'dmi-1')
then: 'persistence service is called the correct number of times per dmi'
4 * mockCmSubscriptionPersistenceService.removeCmSubscription(_,_,_,subscriptionId)
}
when: 'the event is consumed'
objectUnderTest.consumeDmiOutEvent(consumerRecord)
then: 'correct number of calls to cache'
- expectedCacheCalls * mockDmiCacheHandler.updateDmiSubscriptionStatusPerDmi('sub-1','test-dmi-plugin-name', subscriptionStatus)
+ expectedCacheCalls * mockDmiCacheHandler.updateDmiSubscriptionStatus('sub-1','test-dmi-plugin-name', subscriptionStatus)
and: 'correct number of calls to persist cache'
expectedPersistenceCalls * mockDmiCacheHandler.persistIntoDatabasePerDmi('sub-1','test-dmi-plugin-name')
and: 'correct number of calls to map the ncmp out event'
import com.fasterxml.jackson.databind.ObjectMapper
import org.onap.cps.ncmp.impl.cmnotificationsubscription.cache.DmiCacheHandler
+import org.onap.cps.ncmp.impl.cmnotificationsubscription.dmi.DmiCmSubscriptionDetailsPerDmiMapper
import org.onap.cps.ncmp.impl.cmnotificationsubscription.dmi.DmiInEventMapper
import org.onap.cps.ncmp.impl.cmnotificationsubscription.dmi.DmiInEventProducer
import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.DmiCmSubscriptionDetails
import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.client_to_ncmp.NcmpInEvent
import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.ncmp_to_client.NcmpOutEvent
import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.ncmp_to_dmi.DmiInEvent
+import org.onap.cps.ncmp.impl.inventory.InventoryPersistence
+import org.onap.cps.ncmp.impl.inventory.models.YangModelCmHandle
import org.onap.cps.ncmp.utils.TestUtils
+import org.onap.cps.spi.model.DataNode
import org.onap.cps.utils.JsonObjectMapper
import spock.lang.Specification
def mockCmSubscriptionComparator = Mock(CmSubscriptionComparator)
def mockNcmpOutEventMapper = Mock(NcmpOutEventMapper)
def mockDmiInEventMapper = Mock(DmiInEventMapper)
+ def dmiCmSubscriptionDetailsPerDmiMapper = new DmiCmSubscriptionDetailsPerDmiMapper()
def mockNcmpOutEventProducer = Mock(NcmpOutEventProducer)
def mockDmiInEventProducer = Mock(DmiInEventProducer)
def mockDmiCacheHandler = Mock(DmiCacheHandler)
+ def mockInventoryPersistence = Mock(InventoryPersistence)
def objectUnderTest = new CmSubscriptionHandlerImpl(mockCmSubscriptionPersistenceService,
- mockCmSubscriptionComparator, mockNcmpOutEventMapper, mockDmiInEventMapper,
- mockNcmpOutEventProducer, mockDmiInEventProducer, mockDmiCacheHandler)
+ mockCmSubscriptionComparator, mockNcmpOutEventMapper, mockDmiInEventMapper, dmiCmSubscriptionDetailsPerDmiMapper,
+ mockNcmpOutEventProducer, mockDmiInEventProducer, mockDmiCacheHandler, mockInventoryPersistence)
def testDmiSubscriptionsPerDmi = ["dmi-1": new DmiCmSubscriptionDetails([], PENDING)]
then: 'the subscription cache handler is called once'
1 * mockDmiCacheHandler.add('test-id', _)
and: 'the subscription details are updated in the cache'
- 1 * mockDmiCacheHandler.updateDmiSubscriptionStatusPerDmi('test-id', _, ACCEPTED)
+ 1 * mockDmiCacheHandler.updateDmiSubscriptionStatus('test-id', _, ACCEPTED)
and: 'we schedule to send the response after configured time from the cache'
1 * mockNcmpOutEventProducer.publishNcmpOutEvent('test-id', 'subscriptionCreateResponse', null, true)
}
}
def 'Consume valid CmNotificationSubscriptionNcmpInEvent delete message'() {
- given: 'a cmNotificationSubscriptionNcmp in event for delete'
- def jsonData = TestUtils.getResourceFileContent('cmSubscription/cmNotificationSubscriptionNcmpInEvent.json')
- def testEventConsumed = jsonObjectMapper.convertJsonString(jsonData, NcmpInEvent.class)
- and: 'relevant details is extracted from the event'
- def subscriptionId = testEventConsumed.getData().getSubscriptionId()
- def predicates = testEventConsumed.getData().getPredicates()
- and: 'the cache handler returns for relevant subscription id'
- 1 * mockDmiCacheHandler.get('test-id') >> testDmiSubscriptionsPerDmi
- when: 'the valid and unique event is consumed'
- objectUnderTest.processSubscriptionDeleteRequest(subscriptionId, predicates)
- then: 'the subscription cache handler is called once'
- 1 * mockDmiCacheHandler.add('test-id', predicates)
- and: 'the mapper handler to get DMI in event is called once'
- 1 * mockDmiInEventMapper.toDmiInEvent(_)
- and: 'the events handler method to publish DMI event is called correct number of times with the correct parameters'
- testDmiSubscriptionsPerDmi.size() * mockDmiInEventProducer.publishDmiInEvent(
- 'test-id', 'dmi-1', 'subscriptionDeleteRequest', _)
- and: 'we schedule to send the response after configured time from the cache'
- 1 * mockNcmpOutEventProducer.publishNcmpOutEvent('test-id', 'subscriptionDeleteResponse', null, true)
+ given: 'a test subscription id'
+ def subscriptionId = 'test-id'
+ and: 'the persistence service returns datanodes'
+ 1 * mockCmSubscriptionPersistenceService.getAllNodesForSubscriptionId(subscriptionId) >>
+ [new DataNode(xpath: "/datastores/datastore[@name='ncmp-datastore:passthrough-running']/cm-handles/cm-handle[@id='ch-1']/filters/filter[@xpath='x/y']", leaves: ['xpath': 'x/y', 'subscriptionIds': ['test-id']]),
+ new DataNode(xpath: "/datastores/datastore[@name='ncmp-datastore:passthrough-running']/cm-handles/cm-handle[@id='ch-2']/filters/filter[@xpath='y/z']", leaves: ['xpath': 'y/z', 'subscriptionIds': ['test-id']])]
+ and: 'the inventory persistence returns yang model cm handles'
+ 1 * mockInventoryPersistence.getYangModelCmHandle('ch-1') >> new YangModelCmHandle(dmiServiceName: 'dmi-1')
+ 1 * mockInventoryPersistence.getYangModelCmHandle('ch-2') >> new YangModelCmHandle(dmiServiceName: 'dmi-2')
+ when: 'the subscription delete request is processed'
+ objectUnderTest.processSubscriptionDeleteRequest(subscriptionId)
+ then: 'the method to publish a dmi event is called with correct parameters'
+ 1 * mockDmiInEventProducer.publishDmiInEvent(subscriptionId,'dmi-1','subscriptionDeleteRequest',_)
+ 1 * mockDmiInEventProducer.publishDmiInEvent(subscriptionId,'dmi-2','subscriptionDeleteRequest',_)
+ and: 'the method to publish nmcp out event is called with correct parameters'
+ 1 * mockNcmpOutEventProducer.publishNcmpOutEvent(subscriptionId, 'subscriptionDeleteResponse', null, true)
+ }
+
+ def 'Delete a subscriber for fully overlapping subscriptions'() {
+ given: 'a test subscription id'
+ def subscriptionId = 'test-id'
+ and: 'the persistence service returns datanodes with multiple subscribers'
+ 1 * mockCmSubscriptionPersistenceService.getAllNodesForSubscriptionId(subscriptionId) >>
+ [new DataNode(xpath: "/datastores/datastore[@name='ncmp-datastore:passthrough-running']/cm-handles/cm-handle[@id='ch-1']/filters/filter[@xpath='x/y']", leaves: ['xpath': 'x/y', 'subscriptionIds': ['test-id','other-id']]),
+ new DataNode(xpath: "/datastores/datastore[@name='ncmp-datastore:passthrough-running']/cm-handles/cm-handle[@id='ch-2']/filters/filter[@xpath='y/z']", leaves: ['xpath': 'y/z', 'subscriptionIds': ['test-id','other-id']])]
+ and: 'the inventory persistence returns yang model cm handles'
+ 1 * mockInventoryPersistence.getYangModelCmHandle('ch-1') >> new YangModelCmHandle(dmiServiceName: 'dmi-1')
+ 1 * mockInventoryPersistence.getYangModelCmHandle('ch-2') >> new YangModelCmHandle(dmiServiceName: 'dmi-2')
+ and: 'the cache handler returns the relevant maps whenever called'
+ 2 * mockDmiCacheHandler.get(subscriptionId) >> ['dmi-1':[:],'dmi-2':[:]]
+ when: 'the subscription delete request is processed'
+ objectUnderTest.processSubscriptionDeleteRequest(subscriptionId)
+ then: 'the method to publish a dmi event is never called'
+ 0 * mockDmiInEventProducer.publishDmiInEvent(_,_,_,_)
+ and: 'the cache handler is called to remove subscriber from database per dmi'
+ 1 * mockDmiCacheHandler.removeFromDatabase('test-id', 'dmi-1')
+ 1 * mockDmiCacheHandler.removeFromDatabase('test-id', 'dmi-2')
+ and: 'the method to publish nmcp out event is called with correct parameters'
+ 1 * mockNcmpOutEventProducer.publishNcmpOutEvent(subscriptionId, 'subscriptionDeleteResponse', null, false)
}
}
and: 'the log indicates the task completed successfully'
assert loggingEvent.formattedMessage == 'Subscription delete request for source some-resource with subscription id test-id ...'
and: 'the subscription handler service is called once'
- 1 * mockCmSubscriptionHandler.processSubscriptionDeleteRequest('test-id',_)
+ 1 * mockCmSubscriptionHandler.processSubscriptionDeleteRequest('test-id')
}
'cm handle in same datastore is NOT used for other subscriptions' | [] || 1
}
+ def 'Get all nodes for subscription id'() {
+ given: 'the query service returns nodes for subscription id'
+ def expectedDataNode = new DataNode(xpath: '/some/xpath')
+ def queryServiceResponse = [expectedDataNode].asCollection()
+ 1 * mockCpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-subscriptions', '//filter/subscriptionIds[text()=\'some-id\']', OMIT_DESCENDANTS) >> queryServiceResponse
+ when: 'retrieving all nodes for subscription id'
+ def result = objectUnderTest.getAllNodesForSubscriptionId('some-id')
+ then: 'the result returns correct number of datanodes'
+ assert result.size() == 1
+ and: 'the attribute of the datanode is as expected'
+ assert result.iterator().next().xpath == expectedDataNode.xpath
+ }
+
}