/**
* This method maps relevant details for a subscription to a data job subscription DMI in event.
*
- * @param cmHandleIds list of cm handle ID(s)
- * @param dataNodeSelectors list of data node selectors
- * @param notificationTypes the list of notification types
- * @param notificationFilter the notification filter
- * @return data job subscription DMI in event
+ * @param cmHandleIds list of cm handle ID(s)
+ * @param dataNodeSelectors list of data node selectors
+ * @param notificationTypes the list of notification types
+ * @param notificationFilter the notification filter
+ * @return data job subscription DMI in event
*/
public DataJobSubscriptionDmiInEvent toDmiInEvent(final List<String> cmHandleIds,
final List<String> dataNodeSelectors,
final CmHandle cmHandle = new CmHandle();
final Map<String, String> cmHandleAdditionalProperties = new LinkedHashMap<>();
yangModelCmHandle.getAdditionalProperties()
- .forEach(additionalProperty -> cmHandleAdditionalProperties.put(additionalProperty.name(),
- additionalProperty.value()));
+ .forEach(additionalProperty -> cmHandleAdditionalProperties.put(additionalProperty.name(),
+ additionalProperty.value()));
cmHandle.setCmhandleId(yangModelCmHandle.getId());
cmHandle.setPrivateProperties(cmHandleAdditionalProperties);
cmSubscriptionCmHandles.add(cmHandle);
return cmSubscriptionCmHandles;
}
-
}
public interface CmSubscriptionHandler {
/**
- * Process cm notification subscription create request.
+ * Process CM notification subscription create request.
*
* @param dataSelector subscription data selector
* @param subscriptionId subscription id
* @param dataNodeSelectors subscription data node selectors
*/
- void processSubscriptionCreate(final DataSelector dataSelector, final String subscriptionId,
- final List<String> dataNodeSelectors);
+ void createSubscription(final DataSelector dataSelector, final String subscriptionId,
+ final List<String> dataNodeSelectors);
+
+ /**
+ * Process CM notification subscription delete request.
+ *
+ * @param subscriptionId subscription id
+ */
+ void deleteSubscription(final String subscriptionId);
/**
* Update status of a subscription.
*/
void updateCmSubscriptionStatus(final String subscriptionId, final String dmiServiceName,
final CmSubscriptionStatus cmSubscriptionStatus);
-
}
package org.onap.cps.ncmp.impl.datajobs.subscription.ncmp;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Service;
+
@Service
@RequiredArgsConstructor
@Slf4j
private final AlternateIdMatcher alternateIdMatcher;
@Override
- public void processSubscriptionCreate(final DataSelector dataSelector,
- final String subscriptionId, final List<String> dataNodeSelectors) {
+ public void createSubscription(final DataSelector dataSelector,
+ final String subscriptionId, final List<String> dataNodeSelectors) {
for (final String dataNodeSelector : dataNodeSelectors) {
cmDataJobSubscriptionPersistenceService.add(subscriptionId, dataNodeSelector);
}
- sendCreateEventToDmis(subscriptionId, dataSelector);
+ sendEventToDmis(subscriptionId,
+ cmDataJobSubscriptionPersistenceService.getInactiveDataNodeSelectors(subscriptionId),
+ dataSelector, "subscriptionCreateRequest");
+ }
+
+ @Override
+ public void deleteSubscription(final String subscriptionId) {
+ final Collection<String> dataNodeSelectors =
+ cmDataJobSubscriptionPersistenceService.getDataNodeSelectors(subscriptionId);
+ final List<String> dataNodeSelectorsWithoutAnySubscriber = new ArrayList<>();
+ for (final String dataNodeSelector : dataNodeSelectors) {
+ cmDataJobSubscriptionPersistenceService.delete(subscriptionId, dataNodeSelector);
+ if (cmDataJobSubscriptionPersistenceService.getSubscriptionIds(dataNodeSelector).isEmpty()) {
+ dataNodeSelectorsWithoutAnySubscriber.add(dataNodeSelector);
+ }
+ }
+ sendEventToDmis(subscriptionId, dataNodeSelectorsWithoutAnySubscriber, null, "subscriptionDeleteRequest");
}
@Override
}
}
- private void sendCreateEventToDmis(final String subscriptionId, final DataSelector dataSelector) {
- final List<String> dataNodeSelectors =
- cmDataJobSubscriptionPersistenceService.getInactiveDataNodeSelectors(subscriptionId);
+ private void sendEventToDmis(final String subscriptionId,
+ final List<String> dataNodeSelectors,
+ final DataSelector dataSelector,
+ final String eventType) {
final Map<String, CmHandleIdsAndDataNodeSelectors> cmHandleIdsAndDataNodeSelectorsPerDmi =
createDmiInEventTargetsPerDmi(dataNodeSelectors);
-
for (final Map.Entry<String, CmHandleIdsAndDataNodeSelectors> cmHandleIdsAndDataNodeSelectorsEntry :
cmHandleIdsAndDataNodeSelectorsPerDmi.entrySet()) {
final String dmiServiceName = cmHandleIdsAndDataNodeSelectorsEntry.getKey();
final CmHandleIdsAndDataNodeSelectors cmHandleIdsAndDataNodeSelectors =
cmHandleIdsAndDataNodeSelectorsEntry.getValue();
- final DataJobSubscriptionDmiInEvent dmiInEvent =
- buildDmiInEvent(cmHandleIdsAndDataNodeSelectors, dataSelector);
- eventProducer.send(subscriptionId, dmiServiceName, "subscriptionCreateRequest", dmiInEvent);
+
+ final DataJobSubscriptionDmiInEvent dmiInEvent;
+ dmiInEvent = buildDmiInEvent(cmHandleIdsAndDataNodeSelectors, dataSelector);
+ eventProducer.send(subscriptionId, dmiServiceName, eventType, dmiInEvent);
}
}
-
private DataJobSubscriptionDmiInEvent buildDmiInEvent(
final CmHandleIdsAndDataNodeSelectors cmHandleIdsAndDataNodeSelectors,
final DataSelector dataSelector) {
final List<String> cmHandleIds = new ArrayList<>(cmHandleIdsAndDataNodeSelectors.cmHandleIds);
final List<String> dataNodeSelectors = new ArrayList<>(cmHandleIdsAndDataNodeSelectors.dataNodeSelectors);
- final List<String> notificationTypes = dataSelector.getNotificationTypes();
- final String notificationFilter = dataSelector.getNotificationFilter();
+ final List<String> notificationTypes;
+ final String notificationFilter;
+ if (dataSelector != null) {
+ notificationTypes = dataSelector.getNotificationTypes();
+ notificationFilter = dataSelector.getNotificationFilter();
+ } else {
+ notificationTypes = null;
+ notificationFilter = null;
+ }
+
return dmiInEventMapper.toDmiInEvent(cmHandleIds, dataNodeSelectors, notificationTypes, notificationFilter);
}
return yangModelCmHandle.getDmiServiceName();
}
- private record CmHandleIdsAndDataNodeSelectors(Set<String> cmHandleIds, Set<String> dataNodeSelectors) {}
-
+ private record CmHandleIdsAndDataNodeSelectors(Set<String> cmHandleIds, Set<String> dataNodeSelectors) {
+ }
}
public void consumeSubscriptionEvent(
final DataJobSubscriptionOperationInEvent dataJobSubscriptionOperationInEvent) {
final String eventType = dataJobSubscriptionOperationInEvent.getEventType();
- final String dataJobId = dataJobSubscriptionOperationInEvent.getEvent().getDataJob().getId();
+ final DataJob dataJob = dataJobSubscriptionOperationInEvent.getEvent().getDataJob();
+ final String dataJobId = dataJob.getId();
log.info("Consumed subscription event with details: | dataJobId={} | eventType={}", dataJobId, eventType);
- if (eventType.equals("dataJobCreated")) {
- final DataJob dataJob = dataJobSubscriptionOperationInEvent.getEvent().getDataJob();
- final String dataNodeSelector =
- dataJob.getProductionJobDefinition().getTargetSelector().getDataNodeSelector();
- final List<String> dataNodeSelectors = JexParser.toXpaths(dataNodeSelector);
- final DataSelector dataSelector = dataJobSubscriptionOperationInEvent.getEvent().getDataJob()
- .getProductionJobDefinition().getDataSelector();
- cmSubscriptionHandler.processSubscriptionCreate(dataSelector, dataJobId, dataNodeSelectors);
+ switch (eventType) {
+ case "dataJobCreated" -> handleCreate(dataJobId, dataJob);
+ case "dataJobDeleted" -> cmSubscriptionHandler.deleteSubscription(dataJobId);
+ default -> log.warn("Unknown eventType={} for dataJobId={}", eventType, dataJobId);
}
}
+
+ private void handleCreate(final String dataJobId, final DataJob dataJob) {
+ final String dataNodeSelector =
+ dataJob.getProductionJobDefinition().getTargetSelector().getDataNodeSelector();
+ final List<String> dataNodeSelectors = JexParser.toXpaths(dataNodeSelector);
+ final DataSelector dataSelector = dataJob.getProductionJobDefinition().getDataSelector();
+ cmSubscriptionHandler.createSubscription(dataSelector, dataJobId, dataNodeSelectors);
+ }
}
private static final String CPS_PATH_FOR_SUBSCRIPTION_NODE = "//subscription";
private static final String CPS_PATH_TEMPLATE_FOR_SUBSCRIPTIONS_WITH_DATA_NODE_SELECTOR =
CPS_PATH_FOR_SUBSCRIPTION_NODE + "[@dataNodeSelector='%s']";
+ private static final String CPS_PATH_FOR_SUBSCRIPTION_WITH_DATA_NODE_SELECTOR =
+ "/dataJob/subscription[@dataNodeSelector='%s']";
private static final String CPS_PATH_TEMPLATE_FOR_SUBSCRIPTION_WITH_DATA_JOB_ID =
- CPS_PATH_FOR_SUBSCRIPTION_NODE + "/dataJobId[text()='%s']";
+ CPS_PATH_FOR_SUBSCRIPTION_NODE + "/dataJobId[text()='%s']";
private static final String CPS_PATH_TEMPLATE_FOR_INACTIVE_SUBSCRIPTIONS =
CPS_PATH_FOR_SUBSCRIPTION_NODE + "[@status='UNKNOWN' or @status='REJECTED']/dataJobId[text()='%s']";
public boolean isNewSubscriptionId(final String subscriptionId) {
final String query = CPS_PATH_TEMPLATE_FOR_SUBSCRIPTION_WITH_DATA_JOB_ID.formatted(subscriptionId);
return cpsQueryService.queryDataNodes(DATASPACE, ANCHOR,
- query, OMIT_DESCENDANTS).isEmpty();
+ query, OMIT_DESCENDANTS).isEmpty();
}
/**
public Collection<String> getSubscriptionIds(final String dataNodeSelector) {
final String query = CPS_PATH_TEMPLATE_FOR_SUBSCRIPTIONS_WITH_DATA_NODE_SELECTOR.formatted(dataNodeSelector);
final Collection<DataNode> existingNodes =
- cpsQueryService.queryDataNodes(DATASPACE, ANCHOR,
- query, OMIT_DESCENDANTS);
+ cpsQueryService.queryDataNodes(DATASPACE, ANCHOR,
+ query, OMIT_DESCENDANTS);
if (existingNodes.isEmpty()) {
return Collections.emptyList();
}
return (Collection<String>) existingNodes.iterator().next().getLeaves().get("dataJobId");
}
+ /**
+ * Get data node selectors for subscriptions with subscription ID.
+ *
+ * @param subscriptionId subscription ID
+ * @return a list of dataNodeSelectors, or empty list if none found
+ */
+ public Collection<String> getDataNodeSelectors(final String subscriptionId) {
+ final String query = CPS_PATH_TEMPLATE_FOR_SUBSCRIPTION_WITH_DATA_JOB_ID.formatted(subscriptionId);
+ final Collection<DataNode> dataNodes =
+ cpsQueryService.queryDataNodes(DATASPACE, ANCHOR, query, OMIT_DESCENDANTS);
+ final List<String> dataNodeSelectors = new ArrayList<>();
+ for (final DataNode dataNode : dataNodes) {
+ final String dataNodeSelector = dataNode.getLeaves().get("dataNodeSelector").toString();
+ dataNodeSelectors.add(dataNodeSelector);
+ }
+ return dataNodeSelectors;
+ }
+
+ /**
+ * Remove cm notification data job subscription.
+ *
+ * @param subscriptionId data job subscription id to be deleted
+ * @param dataNodeSelector the target of the data job subscription
+ */
+ public void delete(final String subscriptionId, final String dataNodeSelector) {
+ final String query = CPS_PATH_TEMPLATE_FOR_SUBSCRIPTIONS_WITH_DATA_NODE_SELECTOR.formatted(dataNodeSelector);
+ final Collection<DataNode> dataNodes =
+ cpsQueryService.queryDataNodes(DATASPACE, ANCHOR, query, OMIT_DESCENDANTS);
+ final Collection<String> subscriptionIds = getSubscriptionIds(dataNodeSelector);
+ if (!subscriptionIds.remove(subscriptionId)) {
+ log.warn("SubscriptionId={} not found under {}={}", subscriptionId, "dataNodeSelector", dataNodeSelector);
+ return;
+ }
+ if (subscriptionIds.isEmpty()) {
+ deleteEntireSubscription(dataNodeSelector);
+ } else {
+ final String currentStatus = dataNodes.iterator().next().getLeaves().get("status").toString();
+ updateSubscriptionDetails(dataNodeSelector, subscriptionIds, currentStatus);
+ }
+ }
+
+ /**
+ * Delete the entire subscription.
+ *
+ * @param dataNodeSelector data node selector
+ */
+ public void deleteEntireSubscription(final String dataNodeSelector) {
+ final String query = CPS_PATH_FOR_SUBSCRIPTION_WITH_DATA_NODE_SELECTOR.formatted(dataNodeSelector);
+ cpsDataService.deleteDataNode(DATASPACE, ANCHOR, query, OffsetDateTime.now());
+ }
+
/**
* Get data node selectors for subscriptions with status UNKNOWN or REJECTED.
*
- * @param subscriptionId subscription ID
- * @return a list of data node selectors
+ * @param subscriptionId subscription ID
+ * @return a list of data node selectors
*/
public List<String> getInactiveDataNodeSelectors(final String subscriptionId) {
final String query = CPS_PATH_TEMPLATE_FOR_INACTIVE_SUBSCRIPTIONS.formatted(subscriptionId);
private void addNewSubscriptionDetails(final String subscriptionId,
final String dataNodeSelector) {
final Collection<String> newSubscriptionList = Collections.singletonList(subscriptionId);
- final String cmSubscriptionStatus = UNKNOWN.name();
final String subscriptionDetailsAsJson = createSubscriptionDetailsAsJson(dataNodeSelector,
- newSubscriptionList, cmSubscriptionStatus);
+ newSubscriptionList, UNKNOWN.name());
cpsDataService.saveData(DATASPACE, ANCHOR, PARENT_NODE_XPATH, subscriptionDetailsAsJson,
OffsetDateTime.now(), ContentType.JSON);
}
final String cmSubscriptionStatusName) {
final Map<String, Serializable> subscriptionDetailsAsMap =
Map.of("dataNodeSelector", dataNodeSelector,
- "dataJobId", (Serializable) subscriptionIds,
- "status", cmSubscriptionStatusName);
+ "dataJobId", (Serializable) subscriptionIds,
+ "status", cmSubscriptionStatusName);
return "{\"subscription\":[" + jsonObjectMapper.asJsonString(subscriptionDetailsAsMap) + "]}";
}
+
}
package org.onap.cps.ncmp.impl.datajobs.subscription.dmi
+
import org.onap.cps.ncmp.impl.inventory.InventoryPersistence
import org.onap.cps.ncmp.impl.inventory.models.YangModelCmHandle
import org.onap.cps.ncmp.impl.utils.JexParser
mockInventoryPersistence.getYangModelCmHandles(['ch-1', 'ch-2'] as Set) >> yangModelCmHandles
}
- def 'Check for Cm Notification Subscription DMI In Event mapping'() {
+ def 'Check for Cm Notification Subscription DMI In Event mapping.'() {
given: 'data job subscription details'
def cmHandleIds = ['ch-1', 'ch-2'].asList()
def dataNodeSelectors = ['/dataNodeSelector1'].asList()
assert result.data.cmHandles.cmhandleId.containsAll(cmHandleIds)
and: 'correct data node selector'
assert result.data.productionJobDefinition.targetSelector.dataNodeSelector == dataNodeSelectorAsJsonExpression
-
}
}
+
+
+
+
package org.onap.cps.ncmp.impl.datajobs.subscription.ncmp
-import static org.onap.cps.ncmp.impl.datajobs.subscription.models.CmSubscriptionStatus.ACCEPTED
-
import org.onap.cps.ncmp.impl.datajobs.subscription.client_to_ncmp.DataSelector
-import org.onap.cps.ncmp.impl.datajobs.subscription.ncmp_to_dmi.DataJobSubscriptionDmiInEvent
import org.onap.cps.ncmp.impl.datajobs.subscription.dmi.DmiInEventMapper
import org.onap.cps.ncmp.impl.datajobs.subscription.dmi.EventProducer
+import org.onap.cps.ncmp.impl.datajobs.subscription.ncmp_to_dmi.DataJobSubscriptionDmiInEvent
import org.onap.cps.ncmp.impl.datajobs.subscription.utils.CmDataJobSubscriptionPersistenceService
import org.onap.cps.ncmp.impl.inventory.InventoryPersistence
import org.onap.cps.ncmp.impl.inventory.models.YangModelCmHandle
import org.onap.cps.ncmp.impl.utils.JexParser
import spock.lang.Specification
+import static org.onap.cps.ncmp.impl.datajobs.subscription.models.CmSubscriptionStatus.ACCEPTED
+
class CmSubscriptionHandlerImplSpec extends Specification {
def mockCmSubscriptionPersistenceService = Mock(CmDataJobSubscriptionPersistenceService)
def mockAlternateIdMatcher = Mock(AlternateIdMatcher)
def objectUnderTest = new CmSubscriptionHandlerImpl(mockCmSubscriptionPersistenceService, mockDmiInEventMapper,
- mockDmiInEventProducer, mockInventoryPersistence, mockAlternateIdMatcher)
+ mockDmiInEventProducer, mockInventoryPersistence, mockAlternateIdMatcher)
def 'Process subscription CREATE request for new target [non existing]'() {
given: 'relevant subscription details'
and: 'alternate Id matcher returns cm handle id for given data node selector'
def fdn = getFdn(myDataNodeSelectors.iterator().next())
mockAlternateIdMatcher.getCmHandleId(fdn) >> 'myCmHandleId'
- and: 'returns inactive data node selector(s)'
+ and: 'the persistence service returns inactive data node selector(s)'
mockCmSubscriptionPersistenceService.getInactiveDataNodeSelectors(mySubId) >> ['/parent[id="1"]']
and: 'the inventory persistence service returns cm handle'
mockInventoryPersistence.getYangModelCmHandle('myCmHandleId') >> new YangModelCmHandle(dmiServiceName: 'myDmiService')
and: 'DMI in event mapper returns event'
def myDmiInEvent = new DataJobSubscriptionDmiInEvent()
mockDmiInEventMapper.toDmiInEvent(['myCmHandleId'], myDataNodeSelectors, notificationTypes, notificationFilter) >> myDmiInEvent
- when: 'the method to process subscription create request is called'
- objectUnderTest.processSubscriptionCreate(dataSelector, mySubId, myDataNodeSelectors)
- then: 'the persistence service is called'
+ when: 'a subscription is created'
+ objectUnderTest.createSubscription(dataSelector, mySubId, myDataNodeSelectors)
+ then: 'each datanode selector is added using the persistence service'
1 * mockCmSubscriptionPersistenceService.add(mySubId, '/parent[id="1"]')
- and: 'the event is sent to correct DMI'
+ and: 'an event is sent to the correct DMI'
1 * mockDmiInEventProducer.send(mySubId, 'myDmiService', 'subscriptionCreateRequest', _)
}
given: 'relevant subscription details'
def mySubId = 'dataJobId'
def myDataNodeSelectors = [
- '/parent[id="forDmi1"]',
- '/parent[id="forDmi1"]/child',
- '/parent[id="forDmi2"]'].toList()
- def someAttr1 = []
- def someAttr2 = ''
- def dataSelector = new DataSelector(notificationTypes: someAttr1, notificationFilter: someAttr2)
+ '/parent[id="forDmi1"]',
+ '/parent[id="forDmi1"]/child',
+ '/parent[id="forDmi2"]'].toList()
+ def notificationTypes = []
+ def notificationFilter = ''
+ def dataSelector = new DataSelector(notificationTypes: notificationTypes, notificationFilter: notificationFilter)
and: 'alternate Id matcher returns cm handle ids for given data node selectors'
- def fdn1 = getFdn(myDataNodeSelectors.get(0))
- def fdn2 = getFdn(myDataNodeSelectors.get(1))
- def fdn3 = getFdn(myDataNodeSelectors.get(2))
+ def fdn1 = getFdn(myDataNodeSelectors[0])
+ def fdn2 = getFdn(myDataNodeSelectors[1])
+ def fdn3 = getFdn(myDataNodeSelectors[2])
mockAlternateIdMatcher.getCmHandleId(fdn1) >> 'myCmHandleId1'
mockAlternateIdMatcher.getCmHandleId(fdn2) >> 'myCmHandleId1'
mockAlternateIdMatcher.getCmHandleId(fdn3) >> 'myCmHandleId2'
- and: 'returns inactive data node selector(s)'
+ and: 'the persistence service returns inactive data node selector(s)'
mockCmSubscriptionPersistenceService.getInactiveDataNodeSelectors(mySubId) >> [
- '/parent[id="forDmi1"]',
- '/parent[id="forDmi1"]/child',
- '/parent[id="forDmi2"]']
+ '/parent[id="forDmi1"]',
+ '/parent[id="forDmi1"]/child',
+ '/parent[id="forDmi2"]']
and: 'the inventory persistence service returns cm handles with dmi information'
mockInventoryPersistence.getYangModelCmHandle('myCmHandleId1') >> new YangModelCmHandle(dmiServiceName: 'myDmiService1')
mockInventoryPersistence.getYangModelCmHandle('myCmHandleId2') >> new YangModelCmHandle(dmiServiceName: 'myDmiService2')
and: 'DMI in event mapper returns events'
def myDmiInEvent1 = new DataJobSubscriptionDmiInEvent()
def myDmiInEvent2 = new DataJobSubscriptionDmiInEvent()
- mockDmiInEventMapper.toDmiInEvent(['myCmHandleId1'], ['/parent[id="forDmi1"]', '/parent[id="forDmi1"]/child'], someAttr1, someAttr2) >> myDmiInEvent1
- mockDmiInEventMapper.toDmiInEvent(['myCmHandleId2'], ['/parent[id="forDmi2"]'], someAttr1, someAttr2) >> myDmiInEvent2
- when: 'the method to process subscription create request is called'
- objectUnderTest.processSubscriptionCreate(dataSelector, mySubId, myDataNodeSelectors)
- then: 'the persistence service is called'
+ mockDmiInEventMapper.toDmiInEvent(['myCmHandleId1'], ['/parent[id="forDmi1"]', '/parent[id="forDmi1"]/child'], notificationTypes, notificationFilter) >> myDmiInEvent1
+ mockDmiInEventMapper.toDmiInEvent(['myCmHandleId2'], ['/parent[id="forDmi2"]'], notificationTypes, notificationFilter) >> myDmiInEvent2
+ when: 'a subscription is created'
+ objectUnderTest.createSubscription(dataSelector, mySubId, myDataNodeSelectors)
+ then: 'each datanode selector is added using the persistence service'
myDataNodeSelectors.each { dataNodeSelector ->
- 1 * mockCmSubscriptionPersistenceService.add(_, dataNodeSelector)}
- and: 'the event is sent to correct DMIs'
+ 1 * mockCmSubscriptionPersistenceService.add(_, dataNodeSelector)
+ }
+ and: 'an event is sent to each DMI involved'
1 * mockDmiInEventProducer.send(mySubId, 'myDmiService1', 'subscriptionCreateRequest', myDmiInEvent1)
1 * mockDmiInEventProducer.send(mySubId, 'myDmiService2', 'subscriptionCreateRequest', myDmiInEvent2)
}
mockAlternateIdMatcher.getCmHandleId(_) >> 'someCmHandleId'
and: 'the inventory persistence service returns cm handles with dmi information'
mockInventoryPersistence.getYangModelCmHandle(_) >> new YangModelCmHandle(dmiServiceName: 'myDmiService')
- and: 'returns inactive data node selector(s)'
+ and: 'the inventory persistence service returns inactive data node selector(s)'
mockCmSubscriptionPersistenceService.getInactiveDataNodeSelectors(myNewSubId) >> inactiveDataNodeSelectors
- when: 'the method to process subscription create request is called'
- objectUnderTest.processSubscriptionCreate(dataSelector, myNewSubId, myDataNodeSelectors)
- then: 'the persistence service is called'
+ when: 'a subscription is created'
+ objectUnderTest.createSubscription(dataSelector, myNewSubId, myDataNodeSelectors)
+ then: 'each datanode selector is added using the persistence service'
1 * mockCmSubscriptionPersistenceService.add(_, myDataNodeSelectors.iterator().next())
- and: 'the event is sent to correct DMIs'
+ and: 'an event is sent to each DMI involved'
expectedCallsToDmi * mockDmiInEventProducer.send(myNewSubId, 'myDmiService', 'subscriptionCreateRequest', _)
where: 'following data are used'
- scenario | inactiveDataNodeSelectors || expectedCallsToDmi
- 'new target overlaps with ACCEPTED targets' | [] || 0
- 'new target overlaps with REJECTED targets' | ['/existingDataNodeSelector[id=""]','/newDataNodeSelector[id=""]']|| 1
- 'new target overlaps with UNKNOWN targets' | ['/existingDataNodeSelector[id=""]','/newDataNodeSelector[id=""]']|| 1
- 'new target does not overlap with existing targets'| ['/newDataNodeSelector[id=""]'] || 1
+ scenario | inactiveDataNodeSelectors || expectedCallsToDmi
+ 'new target overlaps with ACCEPTED targets' | [] || 0
+ 'new target overlaps with REJECTED targets' | ['/existingDataNodeSelector[id=""]', '/newDataNodeSelector[id=""]'] || 1
+ 'new target overlaps with UNKNOWN targets' | ['/existingDataNodeSelector[id=""]', '/newDataNodeSelector[id=""]'] || 1
+ 'new target does not overlap with existing targets' | ['/newDataNodeSelector[id=""]'] || 1
+ }
+
+ def 'Process subscription DELETE request where all data node selectors become unused'() {
+ given: 'a subscription id and its associated data node selectors'
+ def mySubId = 'deleteJobId'
+ def myDataNodeSelector = ['/node[id="1"]']
+ and: 'the persistence service returns the data node selectors'
+ mockCmSubscriptionPersistenceService.getDataNodeSelectors(mySubId) >> myDataNodeSelector
+ and: 'no other subscriptions exist for the data node selectors'
+ mockCmSubscriptionPersistenceService.getSubscriptionIds('/node[id="1"]') >> []
+ and: 'cm handle resolution setup'
+ def fdn = getFdn('/node[id="1"]')
+ mockAlternateIdMatcher.getCmHandleId(fdn) >> 'cmHandleId1'
+ mockInventoryPersistence.getYangModelCmHandle('cmHandleId1') >> new YangModelCmHandle(dmiServiceName: 'dmiService1')
+ and: 'DMI in event mapper returns events'
+ def deleteEvent = new DataJobSubscriptionDmiInEvent()
+ mockDmiInEventMapper.toDmiInEvent(['cmHandleId1'], ['/node[id="1"]'], null, null) >> deleteEvent
+ when: 'a subscription is deleted'
+ objectUnderTest.deleteSubscription(mySubId)
+ then: 'subscription is removed from persistence'
+ 1 * mockCmSubscriptionPersistenceService.delete(mySubId, '/node[id="1"]')
+ and: 'an event is sent to each DMI involved'
+ 1 * mockDmiInEventProducer.send(mySubId, 'dmiService1', 'subscriptionDeleteRequest', deleteEvent)
+ }
+
+ def 'Process subscription DELETE request where some data node selectors are still in use'() {
+ given: 'a subscription id and two associated selectors'
+ def mySubId = 'deleteJobId2'
+ def dataNodeSelectors = ['/node[id="1"]', '/node[id="2"]']
+ and: 'the persistence service returns the data node selectors'
+ mockCmSubscriptionPersistenceService.getDataNodeSelectors(mySubId) >> dataNodeSelectors
+ and: 'data node selector 1 has no more subscribers, data node selector 2 still has subscribers'
+ mockCmSubscriptionPersistenceService.getSubscriptionIds('/node[id="1"]') >> []
+ mockCmSubscriptionPersistenceService.getSubscriptionIds('/node[id="2"]') >> ['anotherSub']
+ and: 'cm handle resolution for data node selector 1'
+ def fdn = getFdn('/node[id="1"]')
+ mockAlternateIdMatcher.getCmHandleId(fdn) >> 'cmHandleIdX'
+ mockInventoryPersistence.getYangModelCmHandle('cmHandleIdX') >> new YangModelCmHandle(dmiServiceName: 'dmiServiceX')
+ and: 'DMI in event mapper returns events'
+ def deleteEvent = new DataJobSubscriptionDmiInEvent()
+ mockDmiInEventMapper.toDmiInEvent(['cmHandleIdX'], ['/node[id="1"]'], null, null) >> deleteEvent
+ when: 'a subscription is deleted'
+ objectUnderTest.deleteSubscription(mySubId)
+ then: 'subscription is removed from persistence for both data node selectors'
+ 1 * mockCmSubscriptionPersistenceService.delete(mySubId, '/node[id="1"]')
+ 1 * mockCmSubscriptionPersistenceService.delete(mySubId, '/node[id="2"]')
+ and: 'delete event is sent only for data node selectors without any subscriber'
+ 1 * mockDmiInEventProducer.send(mySubId, 'dmiServiceX', 'subscriptionDeleteRequest', deleteEvent)
+ }
+
+ def 'Process subscription DELETE request where cmHandleId cannot be resolved'() {
+ given: 'a subscription id and its data node selector'
+ def mySubId = 'deleteJobId3'
+ def dataNodeSelectors = ['/node[id="unresolvable"]']
+ and: 'the persistence service returns the data node selectors'
+ mockCmSubscriptionPersistenceService.getDataNodeSelectors(mySubId) >> dataNodeSelectors
+ and: 'no more subscriptions exist for the data node selector'
+ mockCmSubscriptionPersistenceService.getSubscriptionIds('/node[id="unresolvable"]') >> []
+ and: 'alternate id matcher cannot resolve cm handle id'
+ def fdn = getFdn('/node[id="unresolvable"]')
+ mockAlternateIdMatcher.getCmHandleId(fdn) >> null
+ and: 'DMI in event mapper returns events'
+ def deleteEvent = new DataJobSubscriptionDmiInEvent()
+ mockDmiInEventMapper.toDmiInEvent(['cmHandleIdX'], ['/node[id="1"]'], null, null) >> deleteEvent
+ when: 'a subscription is deleted'
+ objectUnderTest.deleteSubscription(mySubId)
+ then: 'subscription is removed from persistence'
+ 1 * mockCmSubscriptionPersistenceService.delete(mySubId, '/node[id="unresolvable"]')
+ and: 'no delete event is sent because cmHandleId was not resolved'
+ 0 * mockDmiInEventProducer.send(*_)
}
def 'Update subscription status to ACCEPTED: #scenario'() {
and: 'alternate id matcher always returns a cm handle id'
mockAlternateIdMatcher.getCmHandleId(_) >> 'someCmHandleId'
and: 'the inventory persistence service returns a yang model with a dmi service name for the accepted subscription'
- mockInventoryPersistence.getYangModelCmHandle(_) >> new YangModelCmHandle(dmiServiceName: 'myDmi')
+ mockInventoryPersistence.getYangModelCmHandle(_) >> new YangModelCmHandle(dmiServiceName: 'myDmi')
when: 'the method to update subscription status is called with status=ACCEPTED and dmi #dmiName'
objectUnderTest.updateCmSubscriptionStatus(mySubscriptionId, dmiName, ACCEPTED)
then: 'the persistence service to update subscription status is ONLY called for matching dmi name'
expectedCallsToPersistenceService * mockCmSubscriptionPersistenceService.updateCmSubscriptionStatus('/myDataNodeSelector[id=""]', ACCEPTED)
where: 'the following data are used'
- scenario |dmiName || expectedCallsToPersistenceService
- 'data node selector for "myDmi"' |'myDmi' || 1
- 'data node selector for other dmi'| 'someOtherDmi'|| 0
+ scenario | dmiName || expectedCallsToPersistenceService
+ 'data node selector for "myDmi"' | 'myDmi' || 1
+ 'data node selector for other dmi' | 'someOtherDmi' || 0
}
def getFdn(dataNodeSelector) {
return JexParser.extractFdnPrefix(dataNodeSelector).orElse("")
}
-}
+}
\ No newline at end of file
class NcmpInEventConsumerSpec extends Specification {
- def logger = new ListAppender<ILoggingEvent>()
def objectMapper = new ObjectMapper()
def mockCmSubscriptionHandler = Mock(CmSubscriptionHandlerImpl)
def objectUnderTest = new NcmpInEventConsumer(mockCmSubscriptionHandler)
- void setup() {
- ((Logger) LoggerFactory.getLogger(NcmpInEventConsumer.class)).addAppender(logger)
- logger.start()
- }
-
- void cleanup() {
- ((Logger) LoggerFactory.getLogger(NcmpInEventConsumer.class)).detachAndStopAllAppenders()
- }
-
def 'Consuming CREATE cm data job subscription request.'() {
given: 'a JSON file for create event'
def jsonData = TestUtils.getResourceFileContent(
def dataSelector = getDataSelector(event)
when: 'the event is consumed'
objectUnderTest.consumeSubscriptionEvent(event)
- then: 'event details are logged at level INFO'
- def loggingEvent = logger.list.last()
- assert loggingEvent.level == Level.INFO
- assert loggingEvent.formattedMessage.contains('dataJobId=myDataJobId')
- assert loggingEvent.formattedMessage.contains("eventType=${myEventType}")
- and: 'method to handle process subscription create request is called'
- 1 * mockCmSubscriptionHandler.processSubscriptionCreate(dataSelector, "myDataJobId", dataNodeSelectorList)
+ then: 'subscription create request is called'
+ 1 * mockCmSubscriptionHandler.createSubscription(dataSelector, "myDataJobId", dataNodeSelectorList)
+ }
+
+ def 'Consuming DELETE cm data job subscription request.'() {
+ given: 'a JSON file for delete event'
+ def jsonData = TestUtils.getResourceFileContent(
+ 'datajobs/subscription/cmNotificationSubscriptionNcmpInEvent.json')
+ def myEventType = "dataJobDeleted"
+ jsonData = jsonData.replace('#myEventType', myEventType)
+ and: 'the event'
+ def event = objectMapper.readValue(jsonData, DataJobSubscriptionOperationInEvent)
+ when: 'the event is consumed'
+ objectUnderTest.consumeSubscriptionEvent(event)
+ then: 'subscription delete request is called'
+ 1 * mockCmSubscriptionHandler.deleteSubscription("myDataJobId")
}
def getDataNodeSelectorsAsXpaths(event) {
import static CmDataJobSubscriptionPersistenceService.CPS_PATH_TEMPLATE_FOR_SUBSCRIPTIONS_WITH_DATA_NODE_SELECTOR
import static CmDataJobSubscriptionPersistenceService.CPS_PATH_TEMPLATE_FOR_SUBSCRIPTION_WITH_DATA_JOB_ID
import static CmDataJobSubscriptionPersistenceService.CPS_PATH_TEMPLATE_FOR_INACTIVE_SUBSCRIPTIONS
+import static CmDataJobSubscriptionPersistenceService.CPS_PATH_FOR_SUBSCRIPTION_WITH_DATA_NODE_SELECTOR
import static CmDataJobSubscriptionPersistenceService.PARENT_NODE_XPATH
-import static org.onap.cps.api.parameters.FetchDescendantsOption.OMIT_DESCENDANTS
import static org.onap.cps.ncmp.impl.datajobs.subscription.models.CmSubscriptionStatus.ACCEPTED
+import static org.onap.cps.api.parameters.FetchDescendantsOption.OMIT_DESCENDANTS
import com.fasterxml.jackson.databind.ObjectMapper
import org.onap.cps.api.CpsDataService
import org.onap.cps.utils.JsonObjectMapper
import spock.lang.Specification
-
class CmSubscriptionPersistenceServiceSpec extends Specification {
def jsonObjectMapper = new JsonObjectMapper(new ObjectMapper())
1 * mockCpsDataService.updateNodeLeaves('NCMP-Admin', 'cm-data-job-subscriptions', PARENT_NODE_XPATH, subscriptionDetailsAsJson, _, ContentType.JSON)
}
- def 'Update subscription status'() {
- given: 'a data node selector'
+ def 'Get data node selectors by subscription id.'() {
+ given: 'a subscription id and a corresponding CPS query path'
+ def subscriptionId = 'mySubId'
+ def cpsPathQuery = CPS_PATH_TEMPLATE_FOR_SUBSCRIPTION_WITH_DATA_JOB_ID.formatted(subscriptionId)
+ and: 'the query service returns a collection of DataNodes with dataNodeSelectors'
+ def expectedDataNode1 = new DataNode(leaves: ['dataNodeSelector': '/dataNodeSelector1'])
+ def expectedDataNode2 = new DataNode(leaves: ['dataNodeSelector': '/dataNodeSelector2'])
+ def queryServiceResponse = [expectedDataNode1, expectedDataNode2]
+ 1 * mockCpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-job-subscriptions', cpsPathQuery, OMIT_DESCENDANTS) >> queryServiceResponse
+ when: 'get data node selectors by subscription id is called'
+ def result = objectUnderTest.getDataNodeSelectors(subscriptionId)
+ then: 'the returned list contains the correct data node selectors'
+ assert result.size() == 2
+ assert result.containsAll('/dataNodeSelector1', '/dataNodeSelector2' )
+ }
+
+ def 'Delete subscription removes last subscriber.'() {
+ given: 'a dataNode with only one subscription'
+ def dataNodeSelector = '/myDataNodeSelector'
+ def subscriptionId = 'someId'
+ def queryForDataNode = CPS_PATH_TEMPLATE_FOR_SUBSCRIPTIONS_WITH_DATA_NODE_SELECTOR.formatted(dataNodeSelector)
+ def queryForDelete = CPS_PATH_FOR_SUBSCRIPTION_WITH_DATA_NODE_SELECTOR.formatted(dataNodeSelector)
+ def dataNode = new DataNode(leaves: ['dataJobId': [subscriptionId], 'status': 'ACCEPTED'])
+ mockCpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-job-subscriptions', queryForDataNode, OMIT_DESCENDANTS) >> [dataNode]
+ and: 'subscription IDs for the data node'
+ objectUnderTest = Spy(objectUnderTest)
+ objectUnderTest.getSubscriptionIds(dataNodeSelector) >> [subscriptionId].toList()
+ when: 'delete method is called'
+ objectUnderTest.delete(subscriptionId, dataNodeSelector)
+ then: 'subscription deletion is performed'
+ 1 * mockCpsDataService.deleteDataNode('NCMP-Admin', 'cm-data-job-subscriptions', queryForDelete, _)
+ }
+
+ def 'Delete subscription removes one of multiple subscribers.'() {
+ given: 'a dataNode with multiple subscriptions'
+ def dataNodeSelector = '/myDataNodeSelector'
+ def query = CPS_PATH_TEMPLATE_FOR_SUBSCRIPTIONS_WITH_DATA_NODE_SELECTOR.formatted(dataNodeSelector)
+ def dataNode = new DataNode(leaves: ['dataJobId': ['id-to-remove', 'id-remaining'], 'status': 'ACCEPTED'])
+ mockCpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-job-subscriptions', query, OMIT_DESCENDANTS) >> [dataNode]
+ and: 'subscription IDs for the data node'
+ objectUnderTest.getSubscriptionIds(dataNodeSelector) >> ['id-to-remove', 'id-remaining']
+ when: 'delete method is called'
+ objectUnderTest.delete('id-to-remove', dataNodeSelector)
+ then: 'data service is called to update leaves with remaining subscription'
+ 1 * mockCpsDataService.updateNodeLeaves('NCMP-Admin', 'cm-data-job-subscriptions', PARENT_NODE_XPATH, { json ->
+ json.contains('"status":"ACCEPTED"') &&
+ json.contains('"dataJobId":["id-remaining"]')
+ }, _, ContentType.JSON)
+ }
+
+ def 'Update status of a subscription.'() {
+ given: 'a data node selector and status'
def myDataNodeSelector = "/myDataNodeSelector"
- and: 'a status'
def status = ACCEPTED
and: 'the query service returns data node'
def subscriptionIds = ['someId']
- mockCpsQueryService.queryDataNodes(_,_,_,_) >> [new DataNode(leaves: ['dataJobId': subscriptionIds, 'dataNodeSelector': myDataNodeSelector, 'status': 'UNKNOWN'])]
+ mockCpsQueryService.queryDataNodes(*_) >> [new DataNode(leaves: ['dataJobId': subscriptionIds, 'dataNodeSelector': myDataNodeSelector, 'status': 'UNKNOWN'])]
and: 'updated cm data job subscription details as json'
def subscriptionDetailsAsJson = objectUnderTest.createSubscriptionDetailsAsJson(myDataNodeSelector, subscriptionIds, status.name())
when: 'the method to update subscription status is called'
objectUnderTest.updateCmSubscriptionStatus(myDataNodeSelector, status)
then: 'data service method to update list of subscribers is called once'
1 * mockCpsDataService.updateNodeLeaves('NCMP-Admin', 'cm-data-job-subscriptions', PARENT_NODE_XPATH, subscriptionDetailsAsJson, _, _)
-
}
-
}