avc:
cm-subscription-ncmp-in: ${CM_SUBSCRIPTION_NCMP_IN_TOPIC:subscription}
cm-subscription-dmi-in: ${CM_SUBSCRIPTION_DMI_IN_TOPIC:ncmp-dmi-cm-avc-subscription}
- cm-subscription-dmi-out: ${CM_SUBSCRIPTION_DMI_OUT_TOPIC:dmi-ncmp-cm-avc-subscription}
cm-subscription-ncmp-out: ${CM_SUBSCRIPTION_NCMP_OUT_TOPIC:subscription-response}
cm-events-topic: ${NCMP_CM_EVENTS_TOPIC:cm-events}
inventory-events-topic: ncmp-inventory-events
cm-handle-data-sync:
initial-delay-ms: 40000
sleep-time-ms: 30000
- subscription-forwarding:
- dmi-response-timeout-ms: 30000
model-loader:
retry-time-ms: 1000
trust-level:
avc:
cm-subscription-ncmp-in: ${CM_SUBSCRIPTION_NCMP_IN_TOPIC:subscription}
cm-subscription-dmi-in: ${CM_SUBSCRIPTION_DMI_IN_TOPIC:ncmp-dmi-cm-avc-subscription}
- cm-subscription-dmi-out: ${CM_SUBSCRIPTION_DMI_OUT_TOPIC:dmi-ncmp-cm-avc-subscription}
cm-subscription-ncmp-out: ${CM_SUBSCRIPTION_NCMP_OUT_TOPIC:subscription-response}
cm-events-topic: ${NCMP_CM_EVENTS_TOPIC:cm-events}
inventory-events-topic: ncmp-inventory-events
cm-handle-data-sync:
initial-delay-ms: 40000
sleep-time-ms: 30000
- subscription-forwarding:
- dmi-response-timeout-ms: 30000
model-loader:
retry-time-ms: 1000
trust-level:
+++ /dev/null
-{
- "$schema": "https://json-schema.org/draft/2019-09/schema",
- "$id": "urn:cps:org.onap.ncmp.events.subscription:1.0.0",
- "$ref": "#/definitions/DmiInEvent",
- "definitions": {
- "DmiInEvent": {
- "description": "The payload for cm notification subscription event incoming message from NCMP.",
- "type": "object",
- "javaType": "org.onap.cps.ncmp.impl.datajobs.subscription.ncmp_to_dmi.DmiInEvent",
- "additionalProperties": false,
- "properties": {
- "data": {
- "$ref": "#/definitions/data"
- }
- },
- "required": [
- "data"
- ]
- },
- "data": {
- "type": "object",
- "description": "Information about the targets and subscription",
- "additionalProperties": false,
- "properties": {
- "cmHandles": {
- "type": "array",
- "items": {
- "type": "object",
- "description": "Details for the target cmhandles",
- "additionalProperties": false,
- "properties": {
- "cmhandleId": {
- "type": "string"
- },
- "privateProperties": {
- "type": "object",
- "existingJavaType": "java.util.Map<String,String>",
- "items": {
- "type": "string"
- }
- }
- }
- }
- },
- "predicates": {
- "type": "array",
- "description": "Additional values to be added into the subscription",
- "items": {
- "type": "object",
- "properties": {
- "targetFilter": {
- "description": "CM Handles to be targeted by the subscription",
- "type": "array",
- "items": {
- "type": "string"
- }
- },
- "scopeFilter": {
- "type": "object",
- "properties": {
- "datastore": {
- "description": "Datastore which is to be used by the subscription",
- "type": "string",
- "enum": ["ncmp-datastore:passthrough-operational", "ncmp-datastore:passthrough-running"]
- },
- "xpathFilter": {
- "description": "Filter to be applied to the CM Handles through this event",
- "type": "array",
- "items": {
- "type": "string"
- }
- }
- },
- "additionalProperties": false,
- "required": [
- "xpathFilter"
- ]
- }
- },
- "additionalProperties": false,
- "required": [
- "targetFilter"
- ]
- },
- "additionalProperties": false
- }
- },
- "required": [
- "cmHandles"
- ]
- }
- }
-}
+++ /dev/null
-{
- "$schema": "https://json-schema.org/draft/2019-09/schema",
- "$id": "urn:cps:org.onap.ncmp.events.subscription:1.0.0",
- "$ref": "#/definitions/DmiOutEvent",
- "definitions": {
- "DmiOutEvent": {
- "description": "The payload for cm notification subscription merge event coming out from DMI Plugin.",
- "type": "object",
- "additionalProperties": false,
- "javaType": "org.onap.cps.ncmp.impl.datajobs.subscription.dmi_to_ncmp.DmiOutEvent",
- "properties": {
- "data": {
- "$ref": "#/definitions/Data"
- }
- },
- "required": [
- "data"
- ],
- "title": "DmiOutEvent"
- },
- "Data": {
- "type": "object",
- "description": "Information about the targets and subscription",
- "additionalProperties": false,
- "properties": {
- "statusCode": {
- "type": "string",
- "format": "integer",
- "description": "The common status as defined in CPS"
- },
- "statusMessage": {
- "type": "string",
- "description": "The common status message as defined in CPS"
- }
- },
- "required": [
- "statusCode",
- "statusMessage"
- ],
- "title": "Data"
- }
- }
-}
+++ /dev/null
-{
- "$id": "urn:cps:org.onap.ncmp.events.subscription:1.0.0",
- "$ref": "#/definitions/NcmpInEvent",
- "$schema": "https://json-schema.org/draft/2019-09/schema",
- "definitions": {
- "NcmpInEvent": {
- "description": "The payload for subscription merge event.",
- "javaType": "org.onap.cps.ncmp.impl.datajobs.subscription.client_to_ncmp.NcmpInEvent",
- "properties": {
- "data": {
- "properties": {
- "subscriptionId": {
- "description": "The subscription details.",
- "type": "string"
- },
- "predicates": {
- "type": "array",
- "description": "Additional values to be added into the subscription",
- "items": {
- "type": "object",
- "properties": {
- "targetFilter": {
- "description": "CM Handles to be targeted by the subscription",
- "type": "array",
- "items": {
- "type": "string"
- }
- },
- "scopeFilter": {
- "type": "object",
- "properties": {
- "datastore": {
- "description": "Datastore which is to be used by the subscription",
- "type": "string",
- "enum": ["ncmp-datastore:passthrough-operational", "ncmp-datastore:passthrough-running"]
- },
- "xpathFilter": {
- "description": "Filter to be applied to the CM Handles through this event",
- "type": "array",
- "items": {
- "type": "string"
- }
- }
- },
- "additionalProperties": false,
- "required": [
- "xpathFilter"
- ]
- }
- },
- "additionalProperties": false,
- "required": [
- "targetFilter"
- ]
- },
- "additionalProperties": false
- }
- },
- "required": [
- "subscriptionId"
- ],
- "type": "object",
- "additionalProperties": false
- }
- },
- "type": "object",
- "additionalProperties": false,
- "required": [
- "data"
- ]
- }
- }
-}
+++ /dev/null
-{
- "$schema": "https://json-schema.org/draft/2019-09/schema",
- "$id": "urn:cps:org.onap.ncmp.events.subscription:1.0.0",
- "$ref": "#/definitions/NcmpOutEvent",
- "definitions": {
- "NcmpOutEvent": {
- "type": "object",
- "description": "The payload applied cm subscription merge event coming out from NCMP.",
- "javaType": "org.onap.cps.ncmp.impl.datajobs.subscription.ncmp_to_client.NcmpOutEvent",
- "additionalProperties": false,
- "properties": {
- "data": {
- "$ref": "#/definitions/Data"
- }
- },
- "required": [
- "data"
- ],
- "title": "NcmpOutEvent"
- },
- "Data": {
- "type": "object",
- "description": "Information about the targets and subscription",
- "additionalProperties": false,
- "properties": {
- "subscriptionId": {
- "type": "string",
- "description": "The unique subscription id"
- },
- "acceptedTargets": {
- "type": "object",
- "existingJavaType": "java.util.Collection<String>",
- "description": "Collection of accepted targets"
- },
- "rejectedTargets": {
- "type": "object",
- "existingJavaType": "java.util.Collection<String>",
- "description": "Collection of rejected targets"
- },
- "pendingTargets": {
- "type": "object",
- "existingJavaType": "java.util.Collection<String>",
- "description": "Collection of pending targets"
- }
- },
- "required": [
- "subscriptionId",
- "acceptedTargets",
- "rejectedTargets",
- "pendingTargets"
- ],
- "title": "Data"
- }
- }
-
-
-}
+++ /dev/null
-/*
- * ============LICENSE_START=======================================================
- * Copyright (C) 2024-2025 OpenInfra Foundation Europe. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * SPDX-License-Identifier: Apache-2.0
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.cps.ncmp.impl.datajobs.subscription.cache;
-
-import static org.onap.cps.ncmp.impl.datajobs.subscription.models.CmSubscriptionStatus.PENDING;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
-import lombok.RequiredArgsConstructor;
-import org.onap.cps.ncmp.api.data.models.DatastoreType;
-import org.onap.cps.ncmp.impl.datajobs.subscription.client_to_ncmp.Predicate;
-import org.onap.cps.ncmp.impl.datajobs.subscription.models.CmSubscriptionStatus;
-import org.onap.cps.ncmp.impl.datajobs.subscription.models.DmiCmSubscriptionDetails;
-import org.onap.cps.ncmp.impl.datajobs.subscription.models.DmiCmSubscriptionPredicate;
-import org.onap.cps.ncmp.impl.datajobs.subscription.utils.CmDataJobSubscriptionPersistenceService;
-import org.onap.cps.ncmp.impl.inventory.InventoryPersistence;
-import org.onap.cps.ncmp.impl.inventory.models.YangModelCmHandle;
-import org.springframework.stereotype.Component;
-
-@Component
-@RequiredArgsConstructor
-public class DmiCacheHandler {
-
- private final CmDataJobSubscriptionPersistenceService cmDataJobSubscriptionPersistenceService;
- private final Map<String, Map<String, DmiCmSubscriptionDetails>> cmNotificationSubscriptionCache;
- private final InventoryPersistence inventoryPersistence;
-
- /**
- * Adds subscription to the subscription cache.
- *
- * @param subscriptionId subscription id
- * @param predicates subscription request predicates
- */
- public void add(final String subscriptionId, final List<Predicate> predicates) {
- cmNotificationSubscriptionCache.put(subscriptionId, createDmiSubscriptionsPerDmi(predicates));
- }
-
- /**
- * Adds subscription to the subscription cache.
- *
- * @param subscriptionId subscription id
- * @param dmiCmSubscriptionDetailsPerDmi map of dmi cm notification subscription details per dmi
- */
- public void add(final String subscriptionId,
- final Map<String, DmiCmSubscriptionDetails>
- dmiCmSubscriptionDetailsPerDmi) {
- cmNotificationSubscriptionCache.put(subscriptionId, dmiCmSubscriptionDetailsPerDmi);
- }
-
- /**
- * Get cm notification subscription cache entry via subscription id.
- *
- * @param subscriptionId subscription id
- * @return map of dmi cm notification subscriptions per dmi
- */
- public Map<String, DmiCmSubscriptionDetails> get(final String subscriptionId) {
- return cmNotificationSubscriptionCache.get(subscriptionId);
- }
-
-
- /**
- * Remove cache entries with CmNotificationSubscriptionStatus ACCEPTED/REJECTED via subscription id.
- *
- * @param subscriptionId subscription id as key in CM notification Subscription cache.
- */
- public void removeAcceptedAndRejectedDmiSubscriptionEntries(final String subscriptionId) {
- final Map<String, DmiCmSubscriptionDetails> dmiSubscriptionsPerDmi =
- cmNotificationSubscriptionCache.get(subscriptionId);
- final Map<String, DmiCmSubscriptionDetails> updatedDmiSubscriptionsPerDmi =
- dmiSubscriptionsPerDmi.entrySet().stream()
- .filter(dmiCmNotificationSubscription -> !isAcceptedOrRejected(
- dmiCmNotificationSubscription.getValue()))
- .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
- cmNotificationSubscriptionCache.put(subscriptionId, updatedDmiSubscriptionsPerDmi);
- }
-
- /**
- * Creates map of subscription details per DMI.
- *
- * @param predicates CM Subscription Create Request Predicates
- * @return Map of DmiCmNotificationSubscription per DMI plugin
- */
- public Map<String, DmiCmSubscriptionDetails> createDmiSubscriptionsPerDmi(
- final List<Predicate> predicates) {
- final Map<String, DmiCmSubscriptionDetails> dmiSubscriptionsPerDmi =
- new HashMap<>();
- for (final Predicate requestPredicate : predicates) {
- final List<String> targetFilter = requestPredicate.getTargetFilter();
- final DatastoreType datastoreType = DatastoreType.fromDatastoreName(
- requestPredicate.getScopeFilter().getDatastore().toString());
- final Set<String> xpaths = new HashSet<>(requestPredicate.getScopeFilter().getXpathFilter());
- final Map<String, Set<String>> targetCmHandlesByDmiMap = groupTargetCmHandleIdsByDmi(targetFilter);
- for (final Map.Entry<String, Set<String>> targetCmHandlesByDmi : targetCmHandlesByDmiMap.entrySet()) {
- final DmiCmSubscriptionPredicate dmiCmSubscriptionPredicate =
- new DmiCmSubscriptionPredicate(targetCmHandlesByDmi.getValue(),
- datastoreType, xpaths);
- updateDmiSubscriptionDetailsPerDmi(targetCmHandlesByDmi.getKey(),
- dmiCmSubscriptionPredicate,
- dmiSubscriptionsPerDmi);
- }
- }
- return dmiSubscriptionsPerDmi;
- }
-
- /**
- * Update status in map of subscription details per DMI.
- *
- * @param subscriptionId String of subscription id
- * @param dmiServiceName String of dmiServiceName
- * @param status String of status
- */
- public void updateDmiSubscriptionStatus(final String subscriptionId, final String dmiServiceName,
- final CmSubscriptionStatus status) {
- final Map<String, DmiCmSubscriptionDetails> dmiSubscriptionsPerDmi =
- cmNotificationSubscriptionCache.get(subscriptionId);
- dmiSubscriptionsPerDmi.get(dmiServiceName).setCmSubscriptionStatus(status);
- cmNotificationSubscriptionCache.put(subscriptionId, dmiSubscriptionsPerDmi);
- }
-
- /**
- * Persist map of subscription details per DMI.
- *
- * @param subscriptionId String of subscription id
- * @param dmiServiceName String of dmiServiceName
- */
- public void persistIntoDatabasePerDmi(final String subscriptionId, final String dmiServiceName) {
- final List<DmiCmSubscriptionPredicate> dmiCmSubscriptionPredicates =
- cmNotificationSubscriptionCache.get(subscriptionId).get(dmiServiceName)
- .getDmiCmSubscriptionPredicates();
- for (final DmiCmSubscriptionPredicate dmiCmSubscriptionPredicate : dmiCmSubscriptionPredicates) {
- final DatastoreType datastoreType = dmiCmSubscriptionPredicate.getDatastoreType();
- final Set<String> cmHandles = dmiCmSubscriptionPredicate.getTargetCmHandleIds();
- for (final String cmHandle : cmHandles) {
- cmDataJobSubscriptionPersistenceService.addSubscription(datastoreType.getDatastoreName(),
- cmHandle, subscriptionId);
- }
- }
- }
-
- /**
- * Remove subscription from database per DMI service name.
- *
- * @param subscriptionId String of subscription id
- * @param dmiServiceName String of dmiServiceName
- */
- public void removeFromDatabase(final String subscriptionId, final String dmiServiceName) {
- final List<DmiCmSubscriptionPredicate> dmiCmSubscriptionPredicates =
- cmNotificationSubscriptionCache.get(subscriptionId).get(dmiServiceName)
- .getDmiCmSubscriptionPredicates();
- for (final DmiCmSubscriptionPredicate dmiCmSubscriptionPredicate : dmiCmSubscriptionPredicates) {
- final DatastoreType datastoreType = dmiCmSubscriptionPredicate.getDatastoreType();
- final Set<String> cmHandles = dmiCmSubscriptionPredicate.getTargetCmHandleIds();
- for (final String cmHandle : cmHandles) {
- cmDataJobSubscriptionPersistenceService.removeSubscription(datastoreType.getDatastoreName(),
- cmHandle, subscriptionId);
- }
- }
- }
-
- private void updateDmiSubscriptionDetailsPerDmi(
- final String dmiServiceName,
- final DmiCmSubscriptionPredicate dmiCmSubscriptionPredicate,
- final Map<String, DmiCmSubscriptionDetails> dmiSubscriptionsPerDmi) {
- if (dmiSubscriptionsPerDmi.containsKey(dmiServiceName)) {
- dmiSubscriptionsPerDmi.get(dmiServiceName)
- .getDmiCmSubscriptionPredicates().add(dmiCmSubscriptionPredicate);
- } else {
- dmiSubscriptionsPerDmi.put(dmiServiceName,
- new DmiCmSubscriptionDetails(
- new ArrayList<>(List.of(dmiCmSubscriptionPredicate)),
- PENDING));
- }
- }
-
- private Map<String, Set<String>> groupTargetCmHandleIdsByDmi(final List<String> targetCmHandleIds) {
- final Map<String, Set<String>> targetCmHandlesByDmiServiceNames = new HashMap<>();
- final Collection<YangModelCmHandle> yangModelCmHandles =
- inventoryPersistence.getYangModelCmHandles(targetCmHandleIds);
-
- for (final YangModelCmHandle yangModelCmHandle : yangModelCmHandles) {
- final String dmiServiceName = yangModelCmHandle.getDmiServiceName();
- targetCmHandlesByDmiServiceNames.putIfAbsent(dmiServiceName, new HashSet<>());
- targetCmHandlesByDmiServiceNames.get(dmiServiceName).add(yangModelCmHandle.getId());
- }
- return targetCmHandlesByDmiServiceNames;
- }
-
- private boolean isAcceptedOrRejected(final DmiCmSubscriptionDetails dmiCmSubscription) {
- return dmiCmSubscription.getCmSubscriptionStatus().toString().equals("ACCEPTED")
- || dmiCmSubscription.getCmSubscriptionStatus().toString().equals("REJECTED");
- }
-}
+++ /dev/null
-/*
- * ============LICENSE_START=======================================================
- * Copyright (C) 2024 Nordix Foundation
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * SPDX-License-Identifier: Apache-2.0
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.cps.ncmp.impl.datajobs.subscription.dmi;
-
-import static org.onap.cps.ncmp.api.data.models.DatastoreType.fromDatastoreName;
-import static org.onap.cps.ncmp.impl.datajobs.subscription.models.CmSubscriptionStatus.PENDING;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
-import org.onap.cps.ncmp.api.data.models.DatastoreType;
-import org.onap.cps.ncmp.impl.datajobs.subscription.models.DmiCmSubscriptionDetails;
-import org.onap.cps.ncmp.impl.datajobs.subscription.models.DmiCmSubscriptionKey;
-import org.onap.cps.ncmp.impl.datajobs.subscription.models.DmiCmSubscriptionPredicate;
-import org.springframework.stereotype.Component;
-
-@Component
-public class DmiCmSubscriptionDetailsPerDmiMapper {
-
- /**
- * Maps Dmi Subscription Keys grouped by Dmi Plugin to DmiCmSubscriptionDetails per Dmi plugin.
- *
- * @param subscribersPerDmi Details managed by each dmi plugin
- * @return Grouped Dmi Subscription details per dmi plugin
- */
- public Map<String, DmiCmSubscriptionDetails> toDmiCmSubscriptionsPerDmi(
- final Map<String, Collection<DmiCmSubscriptionKey>> subscribersPerDmi) {
-
- final Map<String, DmiCmSubscriptionDetails> dmiSubscriptionsPerDmi = new HashMap<>();
-
- subscribersPerDmi.forEach((dmiPluginName, dmiCmSubscriptionKeys) -> {
- final Map<DatastoreTypeAndXpath, List<DmiCmSubscriptionKey>> groupedByDatastoreTypeAndXpath =
- groupByDatastoreTypeAndXpath(dmiCmSubscriptionKeys);
-
- final List<DmiCmSubscriptionPredicate> dmiSubscriptionPredicates =
- createDmiCmSubscriptionPredicates(groupedByDatastoreTypeAndXpath);
-
- final DmiCmSubscriptionDetails dmiCmSubscriptionDetails =
- new DmiCmSubscriptionDetails(dmiSubscriptionPredicates, PENDING);
-
- dmiSubscriptionsPerDmi.put(dmiPluginName, dmiCmSubscriptionDetails);
- });
-
- return dmiSubscriptionsPerDmi;
- }
-
- private static Map<DatastoreTypeAndXpath, List<DmiCmSubscriptionKey>> groupByDatastoreTypeAndXpath(
- final Collection<DmiCmSubscriptionKey> dmiCmSubscriptionKeys) {
- return dmiCmSubscriptionKeys.stream().collect(Collectors.groupingBy(
- datastoreTypeAndXpath -> new DatastoreTypeAndXpath(
- fromDatastoreName(datastoreTypeAndXpath.datastoreName()), datastoreTypeAndXpath.xpath())));
- }
-
- private static List<DmiCmSubscriptionPredicate> createDmiCmSubscriptionPredicates(
- final Map<DatastoreTypeAndXpath, List<DmiCmSubscriptionKey>> groupedByDatastoreTypeAndXpath) {
- final List<DmiCmSubscriptionPredicate> dmiCmSubscriptionPredicates = new ArrayList<>();
-
- for (final Map.Entry<DatastoreTypeAndXpath, List<DmiCmSubscriptionKey>> datastoreTypeXpathGroupEntry :
- groupedByDatastoreTypeAndXpath.entrySet()) {
- final DatastoreTypeAndXpath datastoreTypeAndXpath = datastoreTypeXpathGroupEntry.getKey();
- final Set<String> cmHandleIds = new HashSet<>();
-
- for (final DmiCmSubscriptionKey dmiCmSubscriptionKey : datastoreTypeXpathGroupEntry.getValue()) {
- cmHandleIds.add(dmiCmSubscriptionKey.cmHandleId());
- }
-
- final Set<String> xpaths = Collections.singleton(datastoreTypeAndXpath.xpath());
- dmiCmSubscriptionPredicates.add(
- new DmiCmSubscriptionPredicate(cmHandleIds, datastoreTypeAndXpath.datastoreType(), xpaths));
- }
-
- return dmiCmSubscriptionPredicates;
- }
-
-
- private record DatastoreTypeAndXpath(DatastoreType datastoreType, String xpath) {
- }
-
-}
-
import java.util.Map;
import java.util.Set;
import lombok.RequiredArgsConstructor;
-import org.onap.cps.ncmp.impl.datajobs.subscription.models.DmiCmSubscriptionPredicate;
import org.onap.cps.ncmp.impl.datajobs.subscription.ncmp_to_dmi.CmHandle;
import org.onap.cps.ncmp.impl.datajobs.subscription.ncmp_to_dmi.Data;
-import org.onap.cps.ncmp.impl.datajobs.subscription.ncmp_to_dmi.DmiInEvent;
-import org.onap.cps.ncmp.impl.datajobs.subscription.ncmp_to_dmi.Predicate;
-import org.onap.cps.ncmp.impl.datajobs.subscription.ncmp_to_dmi.ScopeFilter;
+import org.onap.cps.ncmp.impl.datajobs.subscription.ncmp_to_dmi.DataJobSubscriptionDmiInEvent;
+import org.onap.cps.ncmp.impl.datajobs.subscription.ncmp_to_dmi.DataSelector;
+import org.onap.cps.ncmp.impl.datajobs.subscription.ncmp_to_dmi.ProductionJobDefinition;
+import org.onap.cps.ncmp.impl.datajobs.subscription.ncmp_to_dmi.TargetSelector;
import org.onap.cps.ncmp.impl.inventory.InventoryPersistence;
+import org.onap.cps.ncmp.impl.utils.JexParser;
import org.springframework.stereotype.Component;
@Component
private final InventoryPersistence inventoryPersistence;
/**
- * Mapper to form a request for the DMI Plugin for the Cm Notification Subscription.
+ * This method maps relevant details for a subscription to a data job subscription DMI in event.
*
- * @param dmiCmSubscriptionPredicates Collection of Cm Notification Subscription predicates
- * @return DmiInEvent to be sent to DMI Plugin
+ * @param cmHandleIds list of cm handle ID(s)
+ * @param dataNodeSelectors list of data node selectors
+ * @param notificationTypes the list of notification types
+ * @param notificationFilter the notification filter
+ * @return data job subscription DMI in event
*/
- public DmiInEvent toDmiInEvent(final List<DmiCmSubscriptionPredicate> dmiCmSubscriptionPredicates) {
- final DmiInEvent dmiInEvent = new DmiInEvent();
- final Data cmSubscriptionData = new Data();
- cmSubscriptionData.setPredicates(mapToDmiInEventPredicates(dmiCmSubscriptionPredicates));
- cmSubscriptionData.setCmHandles(mapToCmSubscriptionCmHandleWithAdditionalProperties(
- extractUniqueCmHandleIds(dmiCmSubscriptionPredicates)));
- dmiInEvent.setData(cmSubscriptionData);
+ public DataJobSubscriptionDmiInEvent toDmiInEvent(final List<String> cmHandleIds,
+ final List<String> dataNodeSelectors,
+ final List<String> notificationTypes,
+ final String notificationFilter) {
+ final DataJobSubscriptionDmiInEvent dmiInEvent = new DataJobSubscriptionDmiInEvent();
+ final Data data = new Data();
+ final String dataNodeSelectorsAsJson = JexParser.toJsonExpressionsAsString(dataNodeSelectors);
+ data.setCmHandles(mapToCmSubscriptionCmHandleWithAdditionalProperties(new HashSet<>(cmHandleIds)));
+ addProductJobDefinition(data, dataNodeSelectorsAsJson);
+ addDataSelector(data, notificationTypes, notificationFilter);
+ dmiInEvent.setData(data);
return dmiInEvent;
-
}
- private List<Predicate> mapToDmiInEventPredicates(
- final List<DmiCmSubscriptionPredicate> dmiCmSubscriptionPredicates) {
-
- final List<Predicate> predicates = new ArrayList<>();
-
- dmiCmSubscriptionPredicates.forEach(dmiCmNotificationSubscriptionPredicate -> {
- final Predicate predicate = new Predicate();
- final ScopeFilter scopeFilter = new ScopeFilter();
- scopeFilter.setDatastore(ScopeFilter.Datastore.fromValue(
- dmiCmNotificationSubscriptionPredicate.getDatastoreType().getDatastoreName()));
- scopeFilter.setXpathFilter(dmiCmNotificationSubscriptionPredicate.getXpaths().stream().toList());
- predicate.setScopeFilter(scopeFilter);
- predicate.setTargetFilter(dmiCmNotificationSubscriptionPredicate.getTargetCmHandleIds().stream().toList());
- predicates.add(predicate);
- });
-
- return predicates;
+ private static void addProductJobDefinition(final Data data, final String dataNodeSelector) {
+ data.setProductionJobDefinition(new ProductionJobDefinition());
+ data.getProductionJobDefinition().setTargetSelector(new TargetSelector());
+ data.getProductionJobDefinition().getTargetSelector().setDataNodeSelector(dataNodeSelector);
+ }
+ private static void addDataSelector(final Data data, final List<String> notificationTypes,
+ final String notificationFilter) {
+ data.getProductionJobDefinition().setDataSelector(new DataSelector());
+ data.getProductionJobDefinition().getDataSelector().setNotificationTypes(notificationTypes);
+ data.getProductionJobDefinition().getDataSelector().setNotificationFilter(notificationFilter);
}
private List<CmHandle> mapToCmSubscriptionCmHandleWithAdditionalProperties(final Set<String> cmHandleIds) {
}
- private Set<String> extractUniqueCmHandleIds(final List<DmiCmSubscriptionPredicate> dmiCmSubscriptionPredicates) {
-
- final Set<String> cmHandleIds = new HashSet<>();
- dmiCmSubscriptionPredicates.forEach(dmiCmNotificationSubscriptionPredicate -> cmHandleIds.addAll(
- dmiCmNotificationSubscriptionPredicate.getTargetCmHandleIds()));
- return cmHandleIds;
- }
-
-
}
+++ /dev/null
-/*
- * ============LICENSE_START=======================================================
- * Copyright (C) 2024-2025 OpenInfra Foundation Europe. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * SPDX-License-Identifier: Apache-2.0
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.cps.ncmp.impl.datajobs.subscription.dmi;
-
-import static org.onap.cps.ncmp.api.NcmpResponseStatus.CM_DATA_SUBSCRIPTION_ACCEPTED;
-import static org.onap.cps.ncmp.api.NcmpResponseStatus.CM_DATA_SUBSCRIPTION_REJECTED;
-import static org.onap.cps.ncmp.utils.events.CloudEventMapper.toTargetEvent;
-
-import io.cloudevents.CloudEvent;
-import java.util.Map;
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.onap.cps.ncmp.api.NcmpResponseStatus;
-import org.onap.cps.ncmp.impl.datajobs.subscription.cache.DmiCacheHandler;
-import org.onap.cps.ncmp.impl.datajobs.subscription.dmi_to_ncmp.Data;
-import org.onap.cps.ncmp.impl.datajobs.subscription.dmi_to_ncmp.DmiOutEvent;
-import org.onap.cps.ncmp.impl.datajobs.subscription.models.CmSubscriptionStatus;
-import org.onap.cps.ncmp.impl.datajobs.subscription.models.DmiCmSubscriptionDetails;
-import org.onap.cps.ncmp.impl.datajobs.subscription.ncmp.NcmpOutEventMapper;
-import org.onap.cps.ncmp.impl.datajobs.subscription.ncmp.NcmpOutEventProducer;
-import org.onap.cps.ncmp.impl.datajobs.subscription.ncmp_to_client.NcmpOutEvent;
-import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
-import org.springframework.kafka.annotation.KafkaListener;
-import org.springframework.stereotype.Component;
-
-@Component
-@Slf4j
-@RequiredArgsConstructor
-@ConditionalOnProperty(name = "notification.enabled", havingValue = "true", matchIfMissing = true)
-public class DmiOutEventConsumer {
-
- private final DmiCacheHandler dmiCacheHandler;
- private final NcmpOutEventProducer ncmpOutEventProducer;
- private final NcmpOutEventMapper ncmpOutEventMapper;
-
- private static final String CM_SUBSCRIPTION_CORRELATION_ID_SEPARATOR = "#";
-
- /**
- * Consume the Cm Notification Subscription event from the dmi-plugin.
- *
- * @param dmiOutEventAsConsumerRecord the event to be consumed
- */
- @KafkaListener(topics = "${app.ncmp.avc.cm-subscription-dmi-out}",
- containerFactory = "cloudEventConcurrentKafkaListenerContainerFactory")
- public void consumeDmiOutEvent(final ConsumerRecord<String, CloudEvent> dmiOutEventAsConsumerRecord) {
- final CloudEvent cloudEvent = dmiOutEventAsConsumerRecord.value();
- final DmiOutEvent dmiOutEvent = toTargetEvent(cloudEvent, DmiOutEvent.class);
- final String correlationId = String.valueOf(cloudEvent.getExtension("correlationid"));
- if (dmiOutEvent != null && correlationId != null) {
- final String eventType = cloudEvent.getType();
- handleDmiOutEvent(correlationId, eventType, dmiOutEvent);
- }
- }
-
- private void handleDmiOutEvent(final String correlationId, final String eventType,
- final DmiOutEvent dmiOutEvent) {
- final String subscriptionId = correlationId.split(CM_SUBSCRIPTION_CORRELATION_ID_SEPARATOR)[0];
- final String dmiPluginName = correlationId.split(CM_SUBSCRIPTION_CORRELATION_ID_SEPARATOR)[1];
-
- if (checkStatusCodeAndMessage(CM_DATA_SUBSCRIPTION_ACCEPTED, dmiOutEvent.getData())) {
- handleCacheStatusPerDmi(subscriptionId, dmiPluginName, CmSubscriptionStatus.ACCEPTED);
- if (eventType.equals("subscriptionCreateResponse")) {
- dmiCacheHandler.persistIntoDatabasePerDmi(subscriptionId, dmiPluginName);
- }
- if (eventType.equals("subscriptionDeleteResponse")) {
- dmiCacheHandler.removeFromDatabase(subscriptionId, dmiPluginName);
- }
- handleEventsStatusPerDmi(subscriptionId, eventType);
- }
-
- if (checkStatusCodeAndMessage(CM_DATA_SUBSCRIPTION_REJECTED, dmiOutEvent.getData())) {
- handleCacheStatusPerDmi(subscriptionId, dmiPluginName, CmSubscriptionStatus.REJECTED);
- handleEventsStatusPerDmi(subscriptionId, eventType);
- }
-
- log.info("Cm Subscription with id : {} handled by the dmi-plugin : {} has the status : {}", subscriptionId,
- dmiPluginName, dmiOutEvent.getData().getStatusMessage());
- }
-
- private void handleCacheStatusPerDmi(final String subscriptionId, final String dmiPluginName,
- final CmSubscriptionStatus cmSubscriptionStatus) {
- dmiCacheHandler.updateDmiSubscriptionStatus(subscriptionId, dmiPluginName,
- cmSubscriptionStatus);
- }
-
- private void handleEventsStatusPerDmi(final String subscriptionId, final String eventType) {
- final Map<String, DmiCmSubscriptionDetails> dmiSubscriptionsPerDmi = dmiCacheHandler.get(subscriptionId);
- final NcmpOutEvent ncmpOutEvent = ncmpOutEventMapper.toNcmpOutEvent(subscriptionId, dmiSubscriptionsPerDmi);
- ncmpOutEventProducer.sendNcmpOutEvent(subscriptionId, eventType, ncmpOutEvent, false);
- }
-
- private boolean checkStatusCodeAndMessage(final NcmpResponseStatus ncmpResponseStatus,
- final Data dmiOutData) {
- return ncmpResponseStatus.getCode().equals(dmiOutData.getStatusCode())
- && ncmpResponseStatus.getMessage().equals(dmiOutData.getStatusMessage());
- }
-}
import java.util.Map;
import lombok.RequiredArgsConstructor;
import org.onap.cps.events.EventsProducer;
-import org.onap.cps.ncmp.impl.datajobs.subscription.ncmp_to_dmi.DmiInEvent;
+import org.onap.cps.ncmp.impl.datajobs.subscription.ncmp_to_dmi.DataJobSubscriptionDmiInEvent;
import org.onap.cps.ncmp.utils.events.NcmpEvent;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
@Component
@RequiredArgsConstructor
@ConditionalOnProperty(name = "notification.enabled", havingValue = "true", matchIfMissing = true)
-public class DmiInEventProducer {
+public class EventProducer {
private final EventsProducer<CloudEvent> eventsProducer;
* @param subscriptionId CM subscription id
* @param dmiPluginName Dmi plugin Name
* @param eventType Type of event
- * @param dmiInEvent Cm Notification Subscription event for Dmi
+ * @param event Cm Notification Subscription event for Dmi
*/
- public void sendDmiInEvent(final String subscriptionId, final String dmiPluginName,
- final String eventType, final DmiInEvent dmiInEvent) {
+ public void send(final String subscriptionId, final String dmiPluginName,
+ final String eventType, final DataJobSubscriptionDmiInEvent event) {
eventsProducer.sendCloudEvent(dmiInEventTopic, subscriptionId,
- buildAndGetDmiInEventAsCloudEvent(subscriptionId, dmiPluginName, eventType, dmiInEvent));
+ toCloudEvent(eventType, event, subscriptionId, dmiPluginName));
}
- private CloudEvent buildAndGetDmiInEventAsCloudEvent(final String subscriptionId, final String dmiPluginName,
- final String eventType, final DmiInEvent dmiInEvent) {
+ private CloudEvent toCloudEvent(final String eventType, final DataJobSubscriptionDmiInEvent event,
+ final String subscriptionId, final String dmiPluginName) {
return NcmpEvent.builder()
.type(eventType)
.dataSchema(SUBSCRIPTIONS_V1.getDataSchema())
+ .data(event)
.extensions(Map.of("correlationid", String.join("#", subscriptionId, dmiPluginName)))
- .data(dmiInEvent)
.build()
.asCloudEvent();
}
public enum CmSubscriptionStatus {
ACCEPTED,
REJECTED,
- PENDING
+ UNKNOWN
}
+++ /dev/null
-/*
- * ============LICENSE_START=======================================================
- * Copyright (C) 2024-2025 OpenInfra Foundation Europe. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * SPDX-License-Identifier: Apache-2.0
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.cps.ncmp.impl.datajobs.subscription.ncmp;
-
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import lombok.RequiredArgsConstructor;
-import org.onap.cps.ncmp.api.data.models.DatastoreType;
-import org.onap.cps.ncmp.impl.datajobs.subscription.models.DmiCmSubscriptionPredicate;
-import org.onap.cps.ncmp.impl.datajobs.subscription.utils.CmDataJobSubscriptionPersistenceService;
-import org.springframework.stereotype.Component;
-
-@Component
-@RequiredArgsConstructor
-public class CmSubscriptionComparator {
-
- private final CmDataJobSubscriptionPersistenceService cmDataJobSubscriptionPersistenceService;
-
- /**
- * Get the new Dmi Predicates for a given predicates list.
- *
- * @param existingDmiCmSubscriptionPredicates list of DmiCmNotificationSubscriptionPredicates
- * @return new list of DmiCmNotificationSubscriptionPredicates
- */
- public List<DmiCmSubscriptionPredicate> getNewDmiSubscriptionPredicates(
- final List<DmiCmSubscriptionPredicate> existingDmiCmSubscriptionPredicates) {
- final List<DmiCmSubscriptionPredicate> newDmiCmSubscriptionPredicates =
- new ArrayList<>();
- for (final DmiCmSubscriptionPredicate dmiCmSubscriptionPredicate : existingDmiCmSubscriptionPredicates) {
- final Set<String> targetCmHandleIds = new HashSet<>();
- final Set<String> xpaths = new HashSet<>();
- final DatastoreType datastoreType = dmiCmSubscriptionPredicate.getDatastoreType();
- for (final String cmHandleId : dmiCmSubscriptionPredicate.getTargetCmHandleIds()) {
- if (!cmDataJobSubscriptionPersistenceService.hasAtLeastOneSubscription(
- datastoreType.getDatastoreName(), cmHandleId)) {
- targetCmHandleIds.add(cmHandleId);
- }
- }
- populateValidDmiSubscriptionPredicates(targetCmHandleIds, xpaths, datastoreType,
- newDmiCmSubscriptionPredicates);
- }
- return newDmiCmSubscriptionPredicates;
- }
-
- private void populateValidDmiSubscriptionPredicates(final Set<String> targetCmHandleIds,
- final Set<String> xpaths,
- final DatastoreType datastoreType,
- final List<DmiCmSubscriptionPredicate>
- dmiCmSubscriptionPredicates) {
- if (!targetCmHandleIds.isEmpty()) {
- final DmiCmSubscriptionPredicate dmiCmSubscriptionPredicate =
- new DmiCmSubscriptionPredicate(targetCmHandleIds, datastoreType, xpaths);
- dmiCmSubscriptionPredicates.add(dmiCmSubscriptionPredicate);
- }
- }
-
-}
/*
* ============LICENSE_START=======================================================
- * Copyright (C) 2024 Nordix Foundation
+ * Copyright (C) 2024-2025 OpenInfra Foundation Europe. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
package org.onap.cps.ncmp.impl.datajobs.subscription.ncmp;
import java.util.List;
-import org.onap.cps.ncmp.impl.datajobs.subscription.client_to_ncmp.Predicate;
+import org.onap.cps.ncmp.impl.datajobs.subscription.client_to_ncmp.DataSelector;
public interface CmSubscriptionHandler {
/**
* Process cm notification subscription create request.
*
- * @param subscriptionId subscription id
- * @param predicates subscription predicates
+ * @param dataSelector subscription data selector
+ * @param subscriptionId subscription id
+ * @param dataNodeSelectors subscription data node selectors
*/
- void processSubscriptionCreateRequest(final String subscriptionId, final List<Predicate> predicates);
-
- /**
- * Process cm notification subscription delete request.
- *
- * @param subscriptionId subscription id
- */
- void processSubscriptionDeleteRequest(final String subscriptionId);
+ void processSubscriptionCreate(final DataSelector dataSelector, final String subscriptionId,
+ final List<String> dataNodeSelectors);
}
package org.onap.cps.ncmp.impl.datajobs.subscription.ncmp;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-import java.util.stream.Collectors;
import lombok.RequiredArgsConstructor;
-import org.onap.cps.api.model.DataNode;
-import org.onap.cps.ncmp.impl.datajobs.subscription.cache.DmiCacheHandler;
-import org.onap.cps.ncmp.impl.datajobs.subscription.client_to_ncmp.Predicate;
-import org.onap.cps.ncmp.impl.datajobs.subscription.dmi.DmiCmSubscriptionDetailsPerDmiMapper;
+import lombok.extern.slf4j.Slf4j;
+import org.onap.cps.ncmp.impl.datajobs.subscription.client_to_ncmp.DataSelector;
import org.onap.cps.ncmp.impl.datajobs.subscription.dmi.DmiInEventMapper;
-import org.onap.cps.ncmp.impl.datajobs.subscription.dmi.DmiInEventProducer;
-import org.onap.cps.ncmp.impl.datajobs.subscription.models.CmSubscriptionStatus;
-import org.onap.cps.ncmp.impl.datajobs.subscription.models.DmiCmSubscriptionDetails;
-import org.onap.cps.ncmp.impl.datajobs.subscription.models.DmiCmSubscriptionKey;
-import org.onap.cps.ncmp.impl.datajobs.subscription.models.DmiCmSubscriptionPredicate;
-import org.onap.cps.ncmp.impl.datajobs.subscription.models.DmiCmSubscriptionTuple;
-import org.onap.cps.ncmp.impl.datajobs.subscription.ncmp_to_client.NcmpOutEvent;
-import org.onap.cps.ncmp.impl.datajobs.subscription.ncmp_to_dmi.DmiInEvent;
+import org.onap.cps.ncmp.impl.datajobs.subscription.dmi.EventProducer;
+import org.onap.cps.ncmp.impl.datajobs.subscription.ncmp_to_dmi.DataJobSubscriptionDmiInEvent;
import org.onap.cps.ncmp.impl.datajobs.subscription.utils.CmDataJobSubscriptionPersistenceService;
import org.onap.cps.ncmp.impl.inventory.InventoryPersistence;
+import org.onap.cps.ncmp.impl.inventory.models.YangModelCmHandle;
+import org.onap.cps.ncmp.impl.utils.AlternateIdMatcher;
+import org.onap.cps.ncmp.impl.utils.JexParser;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Service;
@Service
@RequiredArgsConstructor
+@Slf4j
@ConditionalOnProperty(name = "notification.enabled", havingValue = "true", matchIfMissing = true)
public class CmSubscriptionHandlerImpl implements CmSubscriptionHandler {
- private static final Pattern SUBSCRIPTION_KEY_FROM_XPATH_PATTERN = Pattern.compile(
- "^/datastores/datastore\\[@name='([^']*)']/cm-handles/cm-handle\\[@id='([^']*)']/"
- + "filters/filter\\[@xpath='(.*)']$");
-
private final CmDataJobSubscriptionPersistenceService cmDataJobSubscriptionPersistenceService;
- private final CmSubscriptionComparator cmSubscriptionComparator;
- private final NcmpOutEventMapper ncmpOutEventMapper;
private final DmiInEventMapper dmiInEventMapper;
- private final DmiCmSubscriptionDetailsPerDmiMapper dmiCmSubscriptionDetailsPerDmiMapper;
- private final NcmpOutEventProducer ncmpOutEventProducer;
- private final DmiInEventProducer dmiInEventProducer;
- private final DmiCacheHandler dmiCacheHandler;
+ private final EventProducer eventProducer;
private final InventoryPersistence inventoryPersistence;
+ private final AlternateIdMatcher alternateIdMatcher;
@Override
- public void processSubscriptionCreateRequest(final String subscriptionId, final List<Predicate> predicates) {
- if (cmDataJobSubscriptionPersistenceService.isNewSubscriptionId(subscriptionId)) {
- dmiCacheHandler.add(subscriptionId, predicates);
- handleNewCmSubscription(subscriptionId);
- scheduleNcmpOutEventResponse(subscriptionId, "subscriptionCreateResponse");
- } else {
- rejectAndSendCreateRequest(subscriptionId, predicates);
+ public void processSubscriptionCreate(final DataSelector dataSelector,
+ final String subscriptionId, final List<String> dataNodeSelectors) {
+ for (final String dataNodeSelector : dataNodeSelectors) {
+ cmDataJobSubscriptionPersistenceService.add(subscriptionId, dataNodeSelector);
}
+ sendCreateEventToDmis(subscriptionId, dataSelector);
}
- @Override
- public void processSubscriptionDeleteRequest(final String subscriptionId) {
- final Collection<DataNode> subscriptionDataNodes =
- cmDataJobSubscriptionPersistenceService.getAffectedDataNodes(subscriptionId);
- final DmiCmSubscriptionTuple dmiCmSubscriptionTuple =
- getLastRemainingAndOverlappingSubscriptionsPerDmi(subscriptionDataNodes);
- dmiCacheHandler.add(subscriptionId, mergeDmiCmSubscriptionDetailsPerDmiMaps(dmiCmSubscriptionTuple));
- if (dmiCmSubscriptionTuple.lastRemainingSubscriptionsPerDmi().isEmpty()) {
- acceptAndSendDeleteRequest(subscriptionId);
- } else {
- sendSubscriptionDeleteRequestToDmi(subscriptionId,
- dmiCmSubscriptionDetailsPerDmiMapper.toDmiCmSubscriptionsPerDmi(
- dmiCmSubscriptionTuple.lastRemainingSubscriptionsPerDmi()));
- scheduleNcmpOutEventResponse(subscriptionId, "subscriptionDeleteResponse");
+ private void sendCreateEventToDmis(final String subscriptionId, final DataSelector dataSelector) {
+ final List<String> dataNodeSelectors =
+ cmDataJobSubscriptionPersistenceService.getInactiveDataNodeSelectors(subscriptionId);
+ final Map<String, CmHandleIdsAndDataNodeSelectors> cmHandleIdsAndDataNodeSelectorsPerDmi =
+ createDmiInEventTargetsPerDmi(dataNodeSelectors);
+
+ for (final Map.Entry<String, CmHandleIdsAndDataNodeSelectors> cmHandleIdsAndDataNodeSelectorsEntry :
+ cmHandleIdsAndDataNodeSelectorsPerDmi.entrySet()) {
+ final String dmiServiceName = cmHandleIdsAndDataNodeSelectorsEntry.getKey();
+ final CmHandleIdsAndDataNodeSelectors cmHandleIdsAndDataNodeSelectors =
+ cmHandleIdsAndDataNodeSelectorsEntry.getValue();
+ final DataJobSubscriptionDmiInEvent dmiInEvent =
+ buildDmiInEvent(cmHandleIdsAndDataNodeSelectors, dataSelector);
+ eventProducer.send(subscriptionId, dmiServiceName, "subscriptionCreateRequest", dmiInEvent);
}
}
- private Map<String, DmiCmSubscriptionDetails> mergeDmiCmSubscriptionDetailsPerDmiMaps(
- final DmiCmSubscriptionTuple dmiCmSubscriptionTuple) {
- final Map<String, DmiCmSubscriptionDetails> lastRemainingDmiSubscriptionsPerDmi =
- dmiCmSubscriptionDetailsPerDmiMapper.toDmiCmSubscriptionsPerDmi(
- dmiCmSubscriptionTuple.lastRemainingSubscriptionsPerDmi());
- final Map<String, DmiCmSubscriptionDetails> overlappingDmiSubscriptionsPerDmi =
- dmiCmSubscriptionDetailsPerDmiMapper.toDmiCmSubscriptionsPerDmi(
- dmiCmSubscriptionTuple.overlappingSubscriptionsPerDmi());
- final Map<String, DmiCmSubscriptionDetails> mergedDmiSubscriptionsPerDmi =
- new HashMap<>(lastRemainingDmiSubscriptionsPerDmi);
- overlappingDmiSubscriptionsPerDmi.forEach((dmiServiceName, dmiCmSubscriptionDetails) ->
- mergedDmiSubscriptionsPerDmi.merge(dmiServiceName, dmiCmSubscriptionDetails,
- this::mergeDmiCmSubscriptionDetails));
- return mergedDmiSubscriptionsPerDmi;
- }
-
- private DmiCmSubscriptionDetails mergeDmiCmSubscriptionDetails(
- final DmiCmSubscriptionDetails dmiCmSubscriptionDetails,
- final DmiCmSubscriptionDetails otherDmiCmSubscriptionDetails) {
- final List<DmiCmSubscriptionPredicate> mergedDmiCmSubscriptionPredicates =
- new ArrayList<>(dmiCmSubscriptionDetails.getDmiCmSubscriptionPredicates());
- mergedDmiCmSubscriptionPredicates.addAll(otherDmiCmSubscriptionDetails.getDmiCmSubscriptionPredicates());
- return new DmiCmSubscriptionDetails(mergedDmiCmSubscriptionPredicates, CmSubscriptionStatus.PENDING);
- }
- private void scheduleNcmpOutEventResponse(final String subscriptionId, final String eventType) {
- ncmpOutEventProducer.sendNcmpOutEvent(subscriptionId, eventType, null, true);
+ private DataJobSubscriptionDmiInEvent buildDmiInEvent(
+ final CmHandleIdsAndDataNodeSelectors cmHandleIdsAndDataNodeSelectors,
+ final DataSelector dataSelector) {
+ final List<String> cmHandleIds = new ArrayList<>(cmHandleIdsAndDataNodeSelectors.cmHandleIds);
+ final List<String> dataNodeSelectors = new ArrayList<>(cmHandleIdsAndDataNodeSelectors.dataNodeSelectors);
+ final List<String> notificationTypes = dataSelector.getNotificationTypes();
+ final String notificationFilter = dataSelector.getNotificationFilter();
+ return dmiInEventMapper.toDmiInEvent(cmHandleIds, dataNodeSelectors, notificationTypes, notificationFilter);
}
- private void rejectAndSendCreateRequest(final String subscriptionId, final List<Predicate> predicates) {
- final Set<String> subscriptionTargetFilters =
- predicates.stream().flatMap(predicate -> predicate.getTargetFilter().stream())
- .collect(Collectors.toSet());
- final NcmpOutEvent ncmpOutEvent = ncmpOutEventMapper.toNcmpOutEventForRejectedRequest(subscriptionId,
- new ArrayList<>(subscriptionTargetFilters));
- ncmpOutEventProducer.sendNcmpOutEvent(subscriptionId, "subscriptionCreateResponse", ncmpOutEvent, false);
- }
-
- private void acceptAndSendDeleteRequest(final String subscriptionId) {
- final Set<String> dmiServiceNames = dmiCacheHandler.get(subscriptionId).keySet();
- for (final String dmiServiceName : dmiServiceNames) {
- dmiCacheHandler.updateDmiSubscriptionStatus(subscriptionId, dmiServiceName,
- CmSubscriptionStatus.ACCEPTED);
- dmiCacheHandler.removeFromDatabase(subscriptionId, dmiServiceName);
- }
- final NcmpOutEvent ncmpOutEvent = ncmpOutEventMapper.toNcmpOutEvent(subscriptionId,
- dmiCacheHandler.get(subscriptionId));
- ncmpOutEventProducer.sendNcmpOutEvent(subscriptionId, "subscriptionDeleteResponse", ncmpOutEvent,
- false);
- }
-
- private void handleNewCmSubscription(final String subscriptionId) {
- final Map<String, DmiCmSubscriptionDetails> dmiSubscriptionsPerDmi =
- dmiCacheHandler.get(subscriptionId);
- dmiSubscriptionsPerDmi.forEach((dmiPluginName, dmiSubscriptionDetails) -> {
- final List<DmiCmSubscriptionPredicate> dmiCmSubscriptionPredicates =
- cmSubscriptionComparator.getNewDmiSubscriptionPredicates(
- dmiSubscriptionDetails.getDmiCmSubscriptionPredicates());
-
- if (dmiCmSubscriptionPredicates.isEmpty()) {
- acceptAndPersistCmSubscriptionPerDmi(subscriptionId, dmiPluginName);
+ private Map<String, CmHandleIdsAndDataNodeSelectors> createDmiInEventTargetsPerDmi(
+ final List<String> dataNodeSelectors) {
+ final Map<String, CmHandleIdsAndDataNodeSelectors> dmiInEventTargetsPerDmi = new HashMap<>();
+ for (final String dataNodeSelector : dataNodeSelectors) {
+ final String cmHandleId = getCmHandleId(dataNodeSelector);
+ if (cmHandleId == null) {
+ log.info("Failed to resolve cm handle ID for dataNodeSelector {}", dataNodeSelector);
} else {
- sendDmiInEventPerDmi(subscriptionId, dmiPluginName, dmiCmSubscriptionPredicates);
+ final String dmiServiceName = getDmiServiceName(cmHandleId);
+ final CmHandleIdsAndDataNodeSelectors cmHandleIdsAndDataNodeSelectors;
+ if (dmiInEventTargetsPerDmi.get(dmiServiceName) == null) {
+ cmHandleIdsAndDataNodeSelectors =
+ new CmHandleIdsAndDataNodeSelectors(new HashSet<>(), new HashSet<>());
+ dmiInEventTargetsPerDmi.put(dmiServiceName, cmHandleIdsAndDataNodeSelectors);
+ } else {
+ cmHandleIdsAndDataNodeSelectors = dmiInEventTargetsPerDmi.get(dmiServiceName);
+ }
+ cmHandleIdsAndDataNodeSelectors.cmHandleIds.add(cmHandleId);
+ cmHandleIdsAndDataNodeSelectors.dataNodeSelectors.add(dataNodeSelector);
}
- });
- }
-
- private void sendDmiInEventPerDmi(final String subscriptionId, final String dmiPluginName,
- final List<DmiCmSubscriptionPredicate> dmiCmSubscriptionPredicates) {
- final DmiInEvent dmiInEvent = dmiInEventMapper.toDmiInEvent(dmiCmSubscriptionPredicates);
- dmiInEventProducer.sendDmiInEvent(subscriptionId, dmiPluginName,
- "subscriptionCreateRequest", dmiInEvent);
- }
-
- private void acceptAndPersistCmSubscriptionPerDmi(final String subscriptionId, final String dmiPluginName) {
- dmiCacheHandler.updateDmiSubscriptionStatus(subscriptionId, dmiPluginName,
- CmSubscriptionStatus.ACCEPTED);
- dmiCacheHandler.persistIntoDatabasePerDmi(subscriptionId, dmiPluginName);
- }
-
- private void sendSubscriptionDeleteRequestToDmi(final String subscriptionId,
- final Map<String, DmiCmSubscriptionDetails>
- dmiCmSubscriptionsPerDmi) {
- dmiCmSubscriptionsPerDmi.forEach((dmiPluginName, dmiCmSubscriptionDetails) -> {
- final DmiInEvent dmiInEvent =
- dmiInEventMapper.toDmiInEvent(
- dmiCmSubscriptionDetails.getDmiCmSubscriptionPredicates());
- dmiInEventProducer.sendDmiInEvent(subscriptionId,
- dmiPluginName, "subscriptionDeleteRequest", dmiInEvent);
- });
+ }
+ return dmiInEventTargetsPerDmi;
}
-
- private DmiCmSubscriptionTuple getLastRemainingAndOverlappingSubscriptionsPerDmi(
- final Collection<DataNode> subscriptionNodes) {
- final Map<String, Collection<DmiCmSubscriptionKey>> lastRemainingSubscriptionsPerDmi = new HashMap<>();
- final Map<String, Collection<DmiCmSubscriptionKey>> overlappingSubscriptionsPerDmi = new HashMap<>();
-
- for (final DataNode subscriptionNode : subscriptionNodes) {
- final DmiCmSubscriptionKey dmiCmSubscriptionKey = extractCmSubscriptionKey(subscriptionNode.getXpath());
- final String dmiServiceName = inventoryPersistence.getYangModelCmHandle(
- dmiCmSubscriptionKey.cmHandleId()).getDmiServiceName();
- @SuppressWarnings("unchecked") final List<String> subscribers =
- (List<String>) subscriptionNode.getLeaves().get("subscriptionIds");
- populateDmiCmSubscriptionTuple(subscribers, overlappingSubscriptionsPerDmi,
- lastRemainingSubscriptionsPerDmi, dmiServiceName, dmiCmSubscriptionKey);
+ private String getCmHandleId(final String dataNodeSelector) {
+ final String alternateId = JexParser.extractFdnPrefix(dataNodeSelector).orElse("");
+ if (alternateId.isEmpty()) {
+ return null;
}
- return new DmiCmSubscriptionTuple(lastRemainingSubscriptionsPerDmi, overlappingSubscriptionsPerDmi);
+ return alternateIdMatcher.getCmHandleId(alternateId);
}
- private static void populateDmiCmSubscriptionTuple(final List<String> subscribers,
- final Map<String, Collection<DmiCmSubscriptionKey>>
- overlappingSubscriptionsPerDmi,
- final Map<String, Collection<DmiCmSubscriptionKey>>
- lastRemainingSubscriptionsPerDmi,
- final String dmiServiceName,
- final DmiCmSubscriptionKey dmiCmSubscriptionKey) {
- final Map<String, Collection<DmiCmSubscriptionKey>> targetMap =
- subscribers.size() > 1 ? overlappingSubscriptionsPerDmi : lastRemainingSubscriptionsPerDmi;
- targetMap.computeIfAbsent(dmiServiceName, dmiName -> new HashSet<>()).add(dmiCmSubscriptionKey);
+ private String getDmiServiceName(final String cmHandleId) {
+ final YangModelCmHandle yangModelCmHandle = inventoryPersistence.getYangModelCmHandle(cmHandleId);
+ return yangModelCmHandle.getDmiServiceName();
}
- private DmiCmSubscriptionKey extractCmSubscriptionKey(final String xpath) {
- final Matcher matcher = SUBSCRIPTION_KEY_FROM_XPATH_PATTERN.matcher(xpath);
- if (matcher.find()) {
- final String datastoreName = matcher.group(1);
- final String cmHandleId = matcher.group(2);
- final String filterXpath = matcher.group(3);
- return new DmiCmSubscriptionKey(datastoreName, cmHandleId, filterXpath);
- }
- throw new IllegalArgumentException("DataNode xpath does not represent a subscription key");
- }
+ private record CmHandleIdsAndDataNodeSelectors(Set<String> cmHandleIds, Set<String> dataNodeSelectors) {}
}
import java.util.List;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
+import org.onap.cps.ncmp.impl.datajobs.subscription.client_to_ncmp.DataJob;
import org.onap.cps.ncmp.impl.datajobs.subscription.client_to_ncmp.DataJobSubscriptionOperationInEvent;
+import org.onap.cps.ncmp.impl.datajobs.subscription.client_to_ncmp.DataSelector;
import org.onap.cps.ncmp.impl.utils.JexParser;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.kafka.annotation.KafkaListener;
@ConditionalOnProperty(name = "notification.enabled", havingValue = "true", matchIfMissing = true)
public class NcmpInEventConsumer {
+ private final CmSubscriptionHandler cmSubscriptionHandler;
+
/**
* Consume the specified event.
*
+ "org.onap.cps.ncmp.impl.datajobs.subscription.client_to_ncmp.DataJobSubscriptionOperationInEvent"})
public void consumeSubscriptionEvent(
final DataJobSubscriptionOperationInEvent dataJobSubscriptionOperationInEvent) {
-
final String eventType = dataJobSubscriptionOperationInEvent.getEventType();
- final String dataNodeSelector = dataJobSubscriptionOperationInEvent.getEvent().getDataJob()
- .getProductionJobDefinition().getTargetSelector().getDataNodeSelector();
- final List<String> fdns = JexParser.toXpaths(dataNodeSelector);
final String dataJobId = dataJobSubscriptionOperationInEvent.getEvent().getDataJob().getId();
- final String dataTypeId = dataJobSubscriptionOperationInEvent.getEvent().getDataType() != null
- ? dataJobSubscriptionOperationInEvent.getEvent().getDataType().getDataTypeId() : "UNKNOWN";
- log.info("Consumed subscription event with details: | jobId={} | eventType={} | fdns={} | dataType={}",
- dataJobId, eventType, fdns, dataTypeId);
+ log.info("Consumed subscription event with details: | dataJobId={} | eventType={}", dataJobId, eventType);
+
+ if (eventType.equals("dataJobCreated")) {
+ final DataJob dataJob = dataJobSubscriptionOperationInEvent.getEvent().getDataJob();
+ final String dataNodeSelector =
+ dataJob.getProductionJobDefinition().getTargetSelector().getDataNodeSelector();
+ final List<String> dataNodeSelectors = JexParser.toXpaths(dataNodeSelector);
+ final DataSelector dataSelector = dataJobSubscriptionOperationInEvent.getEvent().getDataJob()
+ .getProductionJobDefinition().getDataSelector();
+ cmSubscriptionHandler.processSubscriptionCreate(dataSelector, dataJobId, dataNodeSelectors);
+ }
}
}
+++ /dev/null
-/*
- * ============LICENSE_START=======================================================
- * Copyright (C) 2024-2025 OpenInfra Foundation Europe. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * SPDX-License-Identifier: Apache-2.0
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.cps.ncmp.impl.datajobs.subscription.ncmp;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import lombok.RequiredArgsConstructor;
-import org.onap.cps.ncmp.impl.datajobs.subscription.models.CmSubscriptionStatus;
-import org.onap.cps.ncmp.impl.datajobs.subscription.models.DmiCmSubscriptionDetails;
-import org.onap.cps.ncmp.impl.datajobs.subscription.models.DmiCmSubscriptionPredicate;
-import org.onap.cps.ncmp.impl.datajobs.subscription.ncmp_to_client.Data;
-import org.onap.cps.ncmp.impl.datajobs.subscription.ncmp_to_client.NcmpOutEvent;
-import org.springframework.stereotype.Component;
-
-@Component
-@RequiredArgsConstructor
-public class NcmpOutEventMapper {
-
- /**
- * Mapper to form a response for the client for the Cm Notification Subscription.
- *
- * @param subscriptionId CM notification subscription id
- * @param dmiSubscriptionsPerDmi contains CmNotificationSubscriptionDetails per dmi plugin
- * @return CmNotificationSubscriptionNcmpOutEvent to sent back to the client
- */
- public NcmpOutEvent toNcmpOutEvent(final String subscriptionId,
- final Map<String, DmiCmSubscriptionDetails> dmiSubscriptionsPerDmi) {
-
- final NcmpOutEvent ncmpOutEvent = new NcmpOutEvent();
- final Data cmSubscriptionData = new Data();
- cmSubscriptionData.setSubscriptionId(subscriptionId);
- populateNcmpOutEventWithCmHandleIds(dmiSubscriptionsPerDmi,
- cmSubscriptionData);
- ncmpOutEvent.setData(cmSubscriptionData);
-
- return ncmpOutEvent;
- }
-
- /**
- * Mapper to form a rejected response for the client for the Cm Notification Subscription Request.
- *
- * @param subscriptionId subscription id
- * @param rejectedTargetFilters list of rejected target filters for the subscription request
- * @return to sent back to the client
- */
- public NcmpOutEvent toNcmpOutEventForRejectedRequest(final String subscriptionId,
- final List<String> rejectedTargetFilters) {
- final NcmpOutEvent ncmpOutEvent = new NcmpOutEvent();
- final Data cmSubscriptionData = new Data();
- cmSubscriptionData.setSubscriptionId(subscriptionId);
- cmSubscriptionData.setRejectedTargets(rejectedTargetFilters);
- ncmpOutEvent.setData(cmSubscriptionData);
- return ncmpOutEvent;
- }
-
- private void populateNcmpOutEventWithCmHandleIds(
- final Map<String, DmiCmSubscriptionDetails> dmiSubscriptionsPerDmi,
- final Data cmSubscriptionData) {
-
- final Collection<String> acceptedCmHandleIds = new HashSet<>();
- final Collection<String> pendingCmHandleIds = new HashSet<>();
- final Collection<String> rejectedCmHandleIds = new HashSet<>();
-
- dmiSubscriptionsPerDmi.forEach((dmiPluginName, dmiSubscriptionDetails) -> {
- final CmSubscriptionStatus cmSubscriptionStatus =
- dmiSubscriptionDetails.getCmSubscriptionStatus();
- final List<DmiCmSubscriptionPredicate> dmiCmSubscriptionPredicates =
- dmiSubscriptionDetails.getDmiCmSubscriptionPredicates();
-
- switch (cmSubscriptionStatus) {
- case ACCEPTED -> acceptedCmHandleIds.addAll(
- extractCmHandleIds(dmiCmSubscriptionPredicates));
- case PENDING -> pendingCmHandleIds.addAll(extractCmHandleIds(dmiCmSubscriptionPredicates));
- default -> rejectedCmHandleIds.addAll(extractCmHandleIds(dmiCmSubscriptionPredicates));
- }
- });
-
- cmSubscriptionData.setAcceptedTargets(acceptedCmHandleIds);
- cmSubscriptionData.setPendingTargets(pendingCmHandleIds);
- cmSubscriptionData.setRejectedTargets(rejectedCmHandleIds);
-
- }
-
- private List<String> extractCmHandleIds(
- final List<DmiCmSubscriptionPredicate> dmiCmSubscriptionPredicates) {
- final List<String> cmHandleIds = new ArrayList<>();
- dmiCmSubscriptionPredicates.forEach(dmiSubscriptionPredicate -> cmHandleIds.addAll(
- dmiSubscriptionPredicate.getTargetCmHandleIds()));
-
- return cmHandleIds;
- }
-
-}
+++ /dev/null
-/*
- * ============LICENSE_START=======================================================
- * Copyright (C) 2024-2025 OpenInfra Foundation Europe. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * SPDX-License-Identifier: Apache-2.0
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.cps.ncmp.impl.datajobs.subscription.ncmp;
-
-import static org.onap.cps.ncmp.events.NcmpEventDataSchema.SUBSCRIPTIONS_V1;
-
-import io.cloudevents.CloudEvent;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.onap.cps.events.EventsProducer;
-import org.onap.cps.ncmp.impl.datajobs.subscription.cache.DmiCacheHandler;
-import org.onap.cps.ncmp.impl.datajobs.subscription.ncmp_to_client.NcmpOutEvent;
-import org.onap.cps.ncmp.utils.events.NcmpEvent;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
-import org.springframework.stereotype.Component;
-
-@Component
-@Slf4j
-@RequiredArgsConstructor
-@ConditionalOnProperty(name = "notification.enabled", havingValue = "true", matchIfMissing = true)
-public class NcmpOutEventProducer {
-
- @Value("${app.ncmp.avc.cm-subscription-ncmp-out}")
- private String ncmpOutEventTopic;
-
- @Value("${ncmp.timers.subscription-forwarding.dmi-response-timeout-ms}")
- private Integer dmiOutEventTimeoutInMs;
-
- private final EventsProducer<CloudEvent> eventsProducer;
- private final NcmpOutEventMapper ncmpOutEventMapper;
- private final DmiCacheHandler dmiCacheHandler;
- private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
- private static final Map<String, ScheduledFuture<?>> scheduledTasksPerSubscriptionIdAndEventType =
- new ConcurrentHashMap<>();
-
- /**
- * Send the event to the client who requested the subscription with key as subscription id and event is Cloud
- * Event compliant.
- *
- * @param subscriptionId CM subscription id
- * @param eventType Type of event
- * @param ncmpOutEvent Cm Notification Subscription Event for the
- * client
- * @param isScheduledEvent Determines if the event is to be scheduled
- * or send now
- */
- public void sendNcmpOutEvent(final String subscriptionId, final String eventType,
- final NcmpOutEvent ncmpOutEvent, final boolean isScheduledEvent) {
-
- final String taskKey = subscriptionId.concat(eventType);
-
- if (isScheduledEvent && !scheduledTasksPerSubscriptionIdAndEventType.containsKey(taskKey)) {
- final ScheduledFuture<?> scheduledFuture = scheduleAndSendNcmpOutEvent(subscriptionId, eventType);
- scheduledTasksPerSubscriptionIdAndEventType.putIfAbsent(taskKey, scheduledFuture);
- log.debug("Scheduled the Cm Subscription Event for subscriptionId : {} and eventType : {}", subscriptionId,
- eventType);
- } else {
- cancelScheduledTask(taskKey);
- if (ncmpOutEvent != null) {
- sendNcmpOutEventNow(subscriptionId, eventType, ncmpOutEvent);
- log.debug("Sent Cm Subscription Event on demand for subscriptionId : {} and eventType : {}",
- subscriptionId, eventType);
- }
- }
- }
-
- /**
- * Get an NCMP out event as cloud event.
- *
- * @param subscriptionId subscription id
- * @param eventType event type
- * @param ncmpOutEvent cm notification subscription NCMP out event
- * @return cm notification subscription NCMP out event as cloud event
- */
- public static CloudEvent buildAndGetNcmpOutEventAsCloudEvent(
- final String subscriptionId, final String eventType, final NcmpOutEvent ncmpOutEvent) {
-
- return NcmpEvent.builder()
- .type(eventType)
- .dataSchema(SUBSCRIPTIONS_V1.getDataSchema())
- .extensions(Map.of("correlationid", subscriptionId))
- .data(ncmpOutEvent)
- .build()
- .asCloudEvent();
- }
-
- private ScheduledFuture<?> scheduleAndSendNcmpOutEvent(final String subscriptionId, final String eventType) {
- final NcmpOutEventPublishingTask ncmpOutEventPublishingTask =
- new NcmpOutEventPublishingTask(ncmpOutEventTopic, subscriptionId, eventType, eventsProducer,
- ncmpOutEventMapper, dmiCacheHandler);
- return scheduledExecutorService.schedule(ncmpOutEventPublishingTask, dmiOutEventTimeoutInMs,
- TimeUnit.MILLISECONDS);
- }
-
- private void cancelScheduledTask(final String taskKey) {
- final ScheduledFuture<?> scheduledFuture = scheduledTasksPerSubscriptionIdAndEventType.get(taskKey);
- if (scheduledFuture != null) {
- scheduledFuture.cancel(true);
- scheduledTasksPerSubscriptionIdAndEventType.remove(taskKey);
- }
- }
-
- private void sendNcmpOutEventNow(final String subscriptionId, final String eventType,
- final NcmpOutEvent ncmpOutEvent) {
- final CloudEvent ncmpOutEventAsCloudEvent =
- buildAndGetNcmpOutEventAsCloudEvent(subscriptionId, eventType, ncmpOutEvent);
- eventsProducer.sendCloudEvent(ncmpOutEventTopic, subscriptionId, ncmpOutEventAsCloudEvent);
- dmiCacheHandler.removeAcceptedAndRejectedDmiSubscriptionEntries(subscriptionId);
- }
-
-}
+++ /dev/null
-/*
- * ============LICENSE_START=======================================================
- * Copyright (C) 2024-2025 OpenInfra Foundation Europe. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * SPDX-License-Identifier: Apache-2.0
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.cps.ncmp.impl.datajobs.subscription.ncmp;
-
-import static org.onap.cps.ncmp.impl.datajobs.subscription.ncmp.NcmpOutEventProducer.buildAndGetNcmpOutEventAsCloudEvent;
-
-import io.cloudevents.CloudEvent;
-import java.util.Map;
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.onap.cps.events.EventsProducer;
-import org.onap.cps.ncmp.impl.datajobs.subscription.cache.DmiCacheHandler;
-import org.onap.cps.ncmp.impl.datajobs.subscription.models.DmiCmSubscriptionDetails;
-import org.onap.cps.ncmp.impl.datajobs.subscription.ncmp_to_client.NcmpOutEvent;
-
-@Slf4j
-@RequiredArgsConstructor
-public class NcmpOutEventPublishingTask implements Runnable {
-
- private final String topicName;
- private final String subscriptionId;
- private final String eventType;
- private final EventsProducer<CloudEvent> eventsProducer;
- private final NcmpOutEventMapper ncmpOutEventMapper;
- private final DmiCacheHandler dmiCacheHandler;
-
- /**
- * Delegating the responsibility of sending NcmpOutEvent as a separate task which will
- * be called after a specified delay.
- */
- @Override
- public void run() {
- final Map<String, DmiCmSubscriptionDetails> dmiSubscriptionsPerDmi =
- dmiCacheHandler.get(subscriptionId);
- final NcmpOutEvent ncmpOutEvent = ncmpOutEventMapper.toNcmpOutEvent(subscriptionId,
- dmiSubscriptionsPerDmi);
- eventsProducer.sendCloudEvent(topicName, subscriptionId,
- buildAndGetNcmpOutEventAsCloudEvent(subscriptionId, eventType, ncmpOutEvent));
- dmiCacheHandler.removeAcceptedAndRejectedDmiSubscriptionEntries(subscriptionId);
- }
-}
package org.onap.cps.ncmp.impl.datajobs.subscription.utils;
import static org.onap.cps.api.parameters.FetchDescendantsOption.OMIT_DESCENDANTS;
+import static org.onap.cps.ncmp.impl.datajobs.subscription.models.CmSubscriptionStatus.UNKNOWN;
import java.io.Serializable;
import java.time.OffsetDateTime;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.List;
import java.util.Map;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@RequiredArgsConstructor
public class CmDataJobSubscriptionPersistenceService {
- private static final String NCMP_DATASPACE_NAME = "NCMP-Admin";
- private static final String CM_DATA_JOB_SUBSCRIPTIONS_ANCHOR_NAME = "cm-data-job-subscriptions";
- private static final String CM_DATA_JOB_SUBSCRIPTIONS_PARENT_NODE_XPATH = "/dataJob";
- private static final String CPS_PATH_TEMPLATE_FOR_SUBSCRIPTION_WITH_ALTERNATE_ID_AND_DATATYPE =
- "/dataJob/subscription[@alternateId='%s' and @dataTypeId='%s']";
+ private static final String DATASPACE = "NCMP-Admin";
+ private static final String ANCHOR = "cm-data-job-subscriptions";
+
+ private static final String PARENT_NODE_XPATH = "/dataJob";
+ private static final String CPS_PATH_FOR_SUBSCRIPTION_NODE = "//subscription";
+ private static final String CPS_PATH_TEMPLATE_FOR_SUBSCRIPTIONS_WITH_DATA_NODE_SELECTOR =
+ CPS_PATH_FOR_SUBSCRIPTION_NODE + "[@dataNodeSelector='%s']";
private static final String CPS_PATH_TEMPLATE_FOR_SUBSCRIPTION_WITH_DATA_JOB_ID =
- "//subscription/dataJobId[text()='%s']";
+ CPS_PATH_FOR_SUBSCRIPTION_NODE + "/dataJobId[text()='%s']";
+ private static final String CPS_PATH_TEMPLATE_FOR_INACTIVE_SUBSCRIPTIONS =
+ CPS_PATH_FOR_SUBSCRIPTION_NODE + "[@status='UNKNOWN' or @status='REJECTED']/dataJobId[text()='%s']";
private final JsonObjectMapper jsonObjectMapper;
private final CpsQueryService cpsQueryService;
private final CpsDataService cpsDataService;
/**
- * Check if we have a cm data job subscription for the given data type and target (FDN).
+ * Check if we have a cm data job subscription for the given data node selector.
*
- * @param dataType the data type of the data job subscription
- * @param alternateId the alternate id target of the data job subscription
+ * @param dataNodeSelector the target of the data job subscription
* @return true if the subscription details has at least one subscriber , otherwise false
*/
- public boolean hasAtLeastOneSubscription(final String dataType, final String alternateId) {
- return !getSubscriptionIds(dataType, alternateId).isEmpty();
+ public boolean hasAtLeastOneSubscription(final String dataNodeSelector) {
+ return !getSubscriptionIds(dataNodeSelector).isEmpty();
}
/**
*/
public boolean isNewSubscriptionId(final String subscriptionId) {
final String query = CPS_PATH_TEMPLATE_FOR_SUBSCRIPTION_WITH_DATA_JOB_ID.formatted(subscriptionId);
- return cpsQueryService.queryDataNodes(NCMP_DATASPACE_NAME, CM_DATA_JOB_SUBSCRIPTIONS_ANCHOR_NAME,
+ return cpsQueryService.queryDataNodes(DATASPACE, ANCHOR,
query, OMIT_DESCENDANTS).isEmpty();
}
/**
- * Get the ids for the subscriptions for the given data type and targets.
+ * Get the ids for the subscriptions for the given data node selector.
*
- * @param dataType the data type of the data job subscription
- * @param alternateId the alternate id target of the data job subscription
+ * @param dataNodeSelector the target of the data job subscription
* @return collection of subscription ids of ongoing cm notification subscription
*/
@SuppressWarnings("unchecked")
- public Collection<String> getSubscriptionIds(final String dataType, final String alternateId) {
- final String query = CPS_PATH_TEMPLATE_FOR_SUBSCRIPTION_WITH_ALTERNATE_ID_AND_DATATYPE.formatted(
- alternateId, dataType);
+ public Collection<String> getSubscriptionIds(final String dataNodeSelector) {
+ final String query = CPS_PATH_TEMPLATE_FOR_SUBSCRIPTIONS_WITH_DATA_NODE_SELECTOR.formatted(dataNodeSelector);
final Collection<DataNode> existingNodes =
- cpsQueryService.queryDataNodes(NCMP_DATASPACE_NAME, CM_DATA_JOB_SUBSCRIPTIONS_ANCHOR_NAME,
+ cpsQueryService.queryDataNodes(DATASPACE, ANCHOR,
query, OMIT_DESCENDANTS);
if (existingNodes.isEmpty()) {
return Collections.emptyList();
}
/**
- * Add cm notification data job subscription.
+ * Get data node selectors for subscriptions with status UNKNOWN or REJECTED.
*
- * @param dataType the data type of the data job subscription
- * @param alternateId the alternate id target of the data job subscription
- * @param subscriptionId data job subscription id to be added
+ * @param subscriptionId subscription ID
+ * @return a list of data node selectors
*/
- public void addSubscription(final String dataType, final String alternateId, final String subscriptionId) {
- final Collection<String> subscriptionIds = getSubscriptionIds(dataType, alternateId);
- if (subscriptionIds.isEmpty()) {
- addNewSubscriptionDetails(dataType, alternateId, subscriptionId);
- } else {
- subscriptionIds.add(subscriptionId);
- updateSubscriptionDetails(subscriptionIds, dataType, alternateId);
+ public List<String> getInactiveDataNodeSelectors(final String subscriptionId) {
+ final String query = CPS_PATH_TEMPLATE_FOR_INACTIVE_SUBSCRIPTIONS.formatted(subscriptionId);
+ final Collection<DataNode> dataNodes = cpsQueryService.queryDataNodes(DATASPACE, ANCHOR, query,
+ OMIT_DESCENDANTS);
+ final List<String> dataNodeSelectors = new ArrayList<>(dataNodes.size());
+ for (final DataNode dataNode : dataNodes) {
+ final String dataNodeSelector = dataNode.getLeaves().get("dataNodeSelector").toString();
+ dataNodeSelectors.add(dataNodeSelector);
}
+ return dataNodeSelectors;
}
/**
- * Remove cm notification data job Subscription.
+ * Add cm notification data job subscription.
*
- * @param dataType the data type of the data job subscription
- * @param alternateId the alternate id target of the data job subscription
- * @param subscriptionId data subscription id to remove
+ * @param subscriptionId data job subscription id to be added
+ * @param dataNodeSelector the target of the data job subscription
*/
- public void removeSubscription(final String dataType, final String alternateId, final String subscriptionId) {
- final Collection<String> subscriptionIds = getSubscriptionIds(dataType, alternateId);
- if (subscriptionIds.remove(subscriptionId)) {
- updateSubscriptionDetails(subscriptionIds, dataType, alternateId);
- log.info("There is at least one subscriber left for dataType {} on {}", dataType, alternateId);
- if (subscriptionIds.isEmpty()) {
- log.info("There are no subscribers left for dataType {} on {}", dataType, alternateId);
- deleteUnusedSubscriptionDetails(dataType, alternateId);
- }
+ public void add(final String subscriptionId, final String dataNodeSelector) {
+ final String query = CPS_PATH_TEMPLATE_FOR_SUBSCRIPTIONS_WITH_DATA_NODE_SELECTOR.formatted(dataNodeSelector);
+ final Collection<DataNode> dataNodes = cpsQueryService.queryDataNodes(DATASPACE, ANCHOR, query,
+ OMIT_DESCENDANTS);
+ if (dataNodes.isEmpty()) {
+ addNewSubscriptionDetails(subscriptionId, dataNodeSelector);
+ } else {
+ final Collection<String> subscriptionIds = getSubscriptionIds(dataNodeSelector);
+ final String status = dataNodes.iterator().next().getLeaves().get("status").toString();
+ subscriptionIds.add(subscriptionId);
+ updateSubscriptionDetails(dataNodeSelector, subscriptionIds, status);
}
}
- /**
- * Retrieve all existing data nodes for given data job subscription id.
- *
- * @param subscriptionId data job subscription id
- * @return collection of DataNodes
- */
- public Collection<DataNode> getAffectedDataNodes(final String subscriptionId) {
- final String query = CPS_PATH_TEMPLATE_FOR_SUBSCRIPTION_WITH_DATA_JOB_ID.formatted(subscriptionId);
- return cpsQueryService.queryDataNodes(NCMP_DATASPACE_NAME, CM_DATA_JOB_SUBSCRIPTIONS_ANCHOR_NAME,
- query, OMIT_DESCENDANTS);
- }
-
- private void deleteUnusedSubscriptionDetails(final String dataType, final String alternateId) {
- final String deleteListOfSubscriptionCpsPathQuery =
- CPS_PATH_TEMPLATE_FOR_SUBSCRIPTION_WITH_ALTERNATE_ID_AND_DATATYPE.formatted(alternateId,
- dataType);
- cpsDataService.deleteDataNode(NCMP_DATASPACE_NAME, CM_DATA_JOB_SUBSCRIPTIONS_ANCHOR_NAME,
- deleteListOfSubscriptionCpsPathQuery, OffsetDateTime.now());
- }
-
- private void addNewSubscriptionDetails(final String dataType,
- final String alternateId,
- final String subscriptionId) {
+ private void addNewSubscriptionDetails(final String subscriptionId,
+ final String dataNodeSelector) {
final Collection<String> newSubscriptionList = Collections.singletonList(subscriptionId);
- final String subscriptionDetailsAsJson = getSubscriptionDetailsAsJson(newSubscriptionList, dataType,
- alternateId);
- cpsDataService.saveData(NCMP_DATASPACE_NAME, CM_DATA_JOB_SUBSCRIPTIONS_ANCHOR_NAME, subscriptionDetailsAsJson,
+ final String status = UNKNOWN.name();
+ final String subscriptionDetailsAsJson = createSubscriptionDetailsAsJson(dataNodeSelector,
+ newSubscriptionList, status);
+ cpsDataService.saveData(DATASPACE, ANCHOR, subscriptionDetailsAsJson,
OffsetDateTime.now(), ContentType.JSON);
}
- private void updateSubscriptionDetails(final Collection<String> subscriptionIds, final String dataType,
- final String alternateId) {
- final String subscriptionDetailsAsJson = getSubscriptionDetailsAsJson(subscriptionIds, dataType, alternateId);
- cpsDataService.updateNodeLeaves(NCMP_DATASPACE_NAME, CM_DATA_JOB_SUBSCRIPTIONS_ANCHOR_NAME,
- CM_DATA_JOB_SUBSCRIPTIONS_PARENT_NODE_XPATH, subscriptionDetailsAsJson, OffsetDateTime.now(),
+ private void updateSubscriptionDetails(final String dataNodeSelector, final Collection<String> subscriptionIds,
+ final String status) {
+ final String subscriptionDetailsAsJson = createSubscriptionDetailsAsJson(dataNodeSelector,
+ subscriptionIds, status);
+ cpsDataService.updateNodeLeaves(DATASPACE, ANCHOR,
+ PARENT_NODE_XPATH, subscriptionDetailsAsJson, OffsetDateTime.now(),
ContentType.JSON);
}
- private String getSubscriptionDetailsAsJson(final Collection<String> subscriptionIds,
- final String dataTypeId,
- final String alternateId) {
+ private String createSubscriptionDetailsAsJson(final String dataNodeSelector,
+ final Collection<String> subscriptionIds,
+ final String status) {
final Map<String, Serializable> subscriptionDetailsAsMap =
- Map.of("dataTypeId", dataTypeId,
- "alternateId", alternateId,
- "dataJobId", (Serializable) subscriptionIds);
+ Map.of("dataNodeSelector", dataNodeSelector,
+ "dataJobId", (Serializable) subscriptionIds,
+ "status", status);
return "{\"subscription\":[" + jsonObjectMapper.asJsonString(subscriptionDetailsAsMap) + "]}";
}
-
}
* @param jsonExpressionsAsString Multi-line jex string.
* @return List of xpaths
*/
- @SuppressWarnings("unused")
public static List<String> toXpaths(final String jsonExpressionsAsString) {
if (jsonExpressionsAsString == null) {
return Collections.emptyList();
* @param xpath A single json expression.
* @return Optional containing resolved fdn if found; empty otherwise.
*/
- @SuppressWarnings("unused")
public static Optional<String> extractFdnPrefix(final String xpath) {
final List<String> xpathSegments = splitIntoXpaths(xpath);
final StringBuilder fdnBuilder = new StringBuilder();
* @param xpaths List of xpath strings to be joined.
* @return A string representing the concatenated json expression.
*/
- @SuppressWarnings("unused")
public static String toJsonExpressionsAsString(final Collection<String> xpaths) {
return String.join(LINE_JOINER_DELIMITER, xpaths);
}
+++ /dev/null
-/*
- * ============LICENSE_START=======================================================
- * Copyright (C) 2024 Nordix Foundation
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * SPDX-License-Identifier: Apache-2.0
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.cps.ncmp.impl.datajobs.subscription.cache
-
-import com.hazelcast.core.Hazelcast
-import com.hazelcast.map.IMap
-import org.onap.cps.ncmp.api.data.models.DatastoreType
-import org.onap.cps.ncmp.impl.datajobs.subscription.models.CmSubscriptionStatus
-import org.onap.cps.ncmp.impl.datajobs.subscription.models.DmiCmSubscriptionDetails
-import org.onap.cps.ncmp.impl.datajobs.subscription.models.DmiCmSubscriptionPredicate
-import org.springframework.beans.factory.annotation.Autowired
-import org.springframework.boot.test.context.SpringBootTest
-import spock.lang.Specification
-
-@SpringBootTest(classes = [CmSubscriptionConfig])
-class CmSubscriptionConfigSpec extends Specification {
-
- @Autowired
- IMap<String, Map<String, DmiCmSubscriptionDetails>> cmNotificationSubscriptionCache;
-
- def cleanupSpec() {
- Hazelcast.getHazelcastInstanceByName('cps-and-ncmp-hazelcast-instance-test-config').shutdown()
- }
-
- def 'Embedded (hazelcast) cache for Cm Notification Subscription Cache.'() {
- expect: 'system is able to create an instance of the Cm Notification Subscription Cache'
- assert null != cmNotificationSubscriptionCache
- and: 'there is at least 1 instance'
- assert Hazelcast.allHazelcastInstances.size() > 0
- and: 'Cm Notification Subscription Cache is present'
- assert Hazelcast.allHazelcastInstances.name.contains('cps-and-ncmp-hazelcast-instance-test-config')
- }
-
- def 'Provided CM Subscription data'() {
- given: 'a cm subscription properties'
- def subscriptionId = 'sub123'
- def dmiPluginName = 'dummydmi'
- def cmSubscriptionPredicates = new DmiCmSubscriptionPredicate(['cmhandle1', 'cmhandle2'].toSet(), DatastoreType.PASSTHROUGH_RUNNING, ['/a/b/c'].toSet())
- def cmSubscriptionCacheObject = new DmiCmSubscriptionDetails([cmSubscriptionPredicates], CmSubscriptionStatus.PENDING)
- when: 'the cache is populated'
- cmNotificationSubscriptionCache.put(subscriptionId, [(dmiPluginName): cmSubscriptionCacheObject])
- then: 'the values are present in memory'
- assert cmNotificationSubscriptionCache.get(subscriptionId) != null
- and: 'properties match'
- assert dmiPluginName == cmNotificationSubscriptionCache.get(subscriptionId).keySet()[0]
- assert cmSubscriptionCacheObject.cmSubscriptionStatus == cmNotificationSubscriptionCache.get(subscriptionId).values().cmSubscriptionStatus[0]
- assert cmSubscriptionCacheObject.dmiCmSubscriptionPredicates[0].targetCmHandleIds == cmNotificationSubscriptionCache.get(subscriptionId).values().dmiCmSubscriptionPredicates[0].targetCmHandleIds[0]
- }
-}
+++ /dev/null
-/*
- * ============LICENSE_START=======================================================
- * Copyright (C) 2024-2025 OpenInfra Foundation Europe. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the 'License');
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an 'AS IS' BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * SPDX-License-Identifier: Apache-2.0
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.cps.ncmp.impl.datajobs.subscription.cache
-
-import com.fasterxml.jackson.databind.ObjectMapper
-import io.cloudevents.CloudEvent
-import io.cloudevents.core.builder.CloudEventBuilder
-import org.apache.kafka.clients.consumer.ConsumerRecord
-import org.onap.cps.ncmp.impl.datajobs.subscription.client_to_ncmp.NcmpInEvent
-import org.onap.cps.ncmp.impl.datajobs.subscription.models.CmSubscriptionStatus
-import org.onap.cps.ncmp.impl.datajobs.subscription.models.DmiCmSubscriptionDetails
-import org.onap.cps.ncmp.impl.datajobs.subscription.utils.CmDataJobSubscriptionPersistenceService
-import org.onap.cps.ncmp.impl.inventory.InventoryPersistence
-import org.onap.cps.ncmp.impl.inventory.models.YangModelCmHandle
-import org.onap.cps.ncmp.utils.TestUtils
-import org.onap.cps.ncmp.utils.events.MessagingBaseSpec
-import org.onap.cps.utils.JsonObjectMapper
-import org.spockframework.spring.SpringBean
-import org.springframework.beans.factory.annotation.Autowired
-import org.springframework.boot.test.context.SpringBootTest
-
-import static org.onap.cps.ncmp.utils.events.CloudEventMapper.toTargetEvent
-
-@SpringBootTest(classes = [ObjectMapper, JsonObjectMapper])
-class DmiCacheHandlerSpec extends MessagingBaseSpec {
-
- @Autowired
- JsonObjectMapper jsonObjectMapper
- @Autowired
- ObjectMapper objectMapper
- @SpringBean
- InventoryPersistence mockInventoryPersistence = Mock(InventoryPersistence)
- @SpringBean
- CmDataJobSubscriptionPersistenceService mockCmSubscriptionPersistenceService = Mock(CmDataJobSubscriptionPersistenceService)
-
- def testCache = [:]
- def objectUnderTest = new DmiCacheHandler(mockCmSubscriptionPersistenceService, testCache, mockInventoryPersistence)
-
- NcmpInEvent ncmpInEvent
- def yangModelCmHandle1 = new YangModelCmHandle(id: 'ch1', dmiServiceName: 'dmi-1')
- def yangModelCmHandle2 = new YangModelCmHandle(id: 'ch2', dmiServiceName: 'dmi-2')
- def yangModelCmHandle3 = new YangModelCmHandle(id: 'ch3', dmiServiceName: 'dmi-1')
- def yangModelCmHandle4 = new YangModelCmHandle(id: 'ch4', dmiServiceName: 'dmi-2')
-
- def setup() {
- setUpTestEvent()
- initialiseMockInventoryPersistenceResponses()
- }
-
- def 'Load CM subscription event to cache with predicates'() {
- given: 'a subscription event with id'
- def subscriptionId = ncmpInEvent.getData().getSubscriptionId()
- and: 'list of predicates'
- def predicates = ncmpInEvent.getData().getPredicates()
- when: 'subscription is loaded to cache with predicates'
- objectUnderTest.add(subscriptionId, predicates)
- then: 'the number of entries in cache is correct'
- assert testCache.size() == 1
- and: 'the cache contains the correct entries'
- assert testCache.containsKey(subscriptionId)
- }
-
- def 'Load CM subscription event to cache with dmi subscription details per dmi'() {
- given: 'a subscription event with id'
- def subscriptionId = ncmpInEvent.getData().getSubscriptionId()
- and: 'dmi subscription details per dmi'
- def dmiSubscriptionsPerDmi = [:]
- when: 'subscription is loaded to cache with dmi subscription details per dmi'
- objectUnderTest.add(subscriptionId, dmiSubscriptionsPerDmi)
- then: 'the number of entries in cache is correct'
- assert testCache.size() == 1
- and: 'the cache contains the correct entries'
- assert testCache.containsKey(subscriptionId)
- and: 'the entry for the subscription ID matches the provided DMI subscription details'
- assert testCache.get(subscriptionId) == dmiSubscriptionsPerDmi
- }
-
- def 'Get cache entry via subscription id'() {
- given: 'the cache contains value for some-id'
- testCache.put('some-id', [:])
- when: 'the get method is called'
- def result = objectUnderTest.get('some-id')
- then: 'correct value is returned as expected'
- assert result == [:]
- }
-
- def 'Remove accepted and rejected entries from cache via subscription id'() {
- given: 'a map as the value for cache entry for some-id'
- def testMap = [:]
- testMap.put("dmi-1",
- new DmiCmSubscriptionDetails([], CmSubscriptionStatus.ACCEPTED))
- testMap.put("dmi-2",
- new DmiCmSubscriptionDetails([], CmSubscriptionStatus.REJECTED))
- testMap.put("dmi-3",
- new DmiCmSubscriptionDetails([], CmSubscriptionStatus.PENDING))
- testCache.put("test-id", testMap)
- assert testCache.get("test-id").size() == 3
- when: 'the method to remove accepted and rejected entries for test-id is called'
- objectUnderTest.removeAcceptedAndRejectedDmiSubscriptionEntries("test-id")
- then: 'all entries with status accepted/rejected are no longer present for test-id'
- testCache.get("test-id").each {key, testResultMap ->
- assert testResultMap.cmSubscriptionStatus != CmSubscriptionStatus.ACCEPTED
- || testResultMap.cmSubscriptionStatus != CmSubscriptionStatus.REJECTED
- }
- and: 'the size of the map for cache entry test-id is as expected'
- assert testCache.get("test-id").size() == 1
- }
-
- def 'Create map for DMI cm notification subscription per DMI service name'() {
- given: 'list of predicates from the create subscription event'
- def predicates = ncmpInEvent.getData().getPredicates()
- when: 'method to create map of DMI cm notification subscription per DMI service name is called'
- def result = objectUnderTest.createDmiSubscriptionsPerDmi(predicates)
- then: 'the result size of resulting map is correct to the number of DMIs'
- assert result.size() == 2
- and: 'the cache objects per DMI exists'
- def resultMapForDmi1 = result.get('dmi-1')
- def resultMapForDmi2 = result.get('dmi-2')
- assert resultMapForDmi1 != null
- assert resultMapForDmi2 != null
- and: 'the size of predicates in each object is correct'
- assert resultMapForDmi1.dmiCmSubscriptionPredicates.size() == 2
- assert resultMapForDmi2.dmiCmSubscriptionPredicates.size() == 2
- and: 'the subscription status in each object is correct'
- assert resultMapForDmi1.cmSubscriptionStatus.toString() == 'PENDING'
- assert resultMapForDmi2.cmSubscriptionStatus.toString() == 'PENDING'
- and: 'the target cmHandles for each predicate is correct'
- assert resultMapForDmi1.dmiCmSubscriptionPredicates[0].targetCmHandleIds == ['ch1'].toSet()
- assert resultMapForDmi1.dmiCmSubscriptionPredicates[1].targetCmHandleIds == ['ch3'].toSet()
-
- assert resultMapForDmi2.dmiCmSubscriptionPredicates[0].targetCmHandleIds == ['ch2'].toSet()
- assert resultMapForDmi2.dmiCmSubscriptionPredicates[1].targetCmHandleIds == ['ch4'].toSet()
- and: 'the list of xpath for each is correct'
- assert resultMapForDmi1.dmiCmSubscriptionPredicates[0].xpaths
- && resultMapForDmi2.dmiCmSubscriptionPredicates[0].xpaths == ['/x1/y1', 'x2/y2'].toSet()
-
- assert resultMapForDmi1.dmiCmSubscriptionPredicates[1].xpaths
- && resultMapForDmi2.dmiCmSubscriptionPredicates[1].xpaths == ['/x3/y3', 'x4/y4'].toSet()
- }
-
- def 'Get map for cm handle IDs by DMI service name'() {
- given: 'the predicate from the test request CM subscription event'
- def targetFilter = ncmpInEvent.getData().getPredicates().get(0).getTargetFilter()
- when: 'the method to group all target CM handles by DMI service name is called'
- def mapOfCMHandleIDsByDmi = objectUnderTest.groupTargetCmHandleIdsByDmi(targetFilter)
- then: 'the size of the resulting map is correct'
- assert mapOfCMHandleIDsByDmi.size() == 2
- and: 'the values in the map is as expected'
- assert mapOfCMHandleIDsByDmi.get('dmi-1') == ['ch1'].toSet()
- assert mapOfCMHandleIDsByDmi.get('dmi-2') == ['ch2'].toSet()
- }
-
- def 'Update subscription status in cache per DMI service name'() {
- given: 'populated cache'
- def predicates = ncmpInEvent.getData().getPredicates()
- def subscriptionId = ncmpInEvent.getData().getSubscriptionId()
- objectUnderTest.add(subscriptionId, predicates)
- when: 'subscription status per dmi is updated in cache'
- objectUnderTest.updateDmiSubscriptionStatus(subscriptionId, 'dmi-1', CmSubscriptionStatus.ACCEPTED)
- then: 'verify status has been updated in cache'
- def predicate = testCache.get(subscriptionId)
- assert predicate.get('dmi-1').cmSubscriptionStatus == CmSubscriptionStatus.ACCEPTED
- }
-
- def 'Persist Cache into database per dmi'() {
- given: 'populated cache'
- def predicates = ncmpInEvent.getData().getPredicates()
- def subscriptionId = ncmpInEvent.getData().getSubscriptionId()
- objectUnderTest.add(subscriptionId, predicates)
- when: 'subscription is persisted in database'
- objectUnderTest.persistIntoDatabasePerDmi(subscriptionId, 'dmi-1')
- then: 'persistence service is called the correct number of times per dmi'
- 2 * mockCmSubscriptionPersistenceService.addSubscription(*_)
- }
-
- def 'Remove subscription from database per dmi'() {
- given: 'populated cache'
- def predicates = ncmpInEvent.getData().getPredicates()
- def subscriptionId = ncmpInEvent.getData().getSubscriptionId()
- objectUnderTest.add(subscriptionId, predicates)
- when: 'subscription is persisted in database'
- objectUnderTest.removeFromDatabase(subscriptionId, 'dmi-1')
- then: 'persistence service is called the correct number of times per dmi'
- 2 * mockCmSubscriptionPersistenceService.removeSubscription(*_)
- }
-
- def setUpTestEvent() {
- def jsonData = TestUtils.getResourceFileContent('cmSubscription/cmNotificationSubscriptionNcmpInEvent.json')
- def testEventSent = jsonObjectMapper.convertJsonString(jsonData, NcmpInEvent.class)
- def testCloudEventSent = CloudEventBuilder.v1()
- .withData(objectMapper.writeValueAsBytes(testEventSent))
- .withId('subscriptionCreated')
- .withType('subscriptionCreated')
- .withSource(URI.create('some-resource'))
- .withExtension('correlationid', 'test-cmhandle1').build()
- def consumerRecord = new ConsumerRecord<String, CloudEvent>('topic-name', 0, 0, 'event-key', testCloudEventSent)
- def cloudEvent = consumerRecord.value()
-
- ncmpInEvent = toTargetEvent(cloudEvent, NcmpInEvent.class);
- }
-
- def initialiseMockInventoryPersistenceResponses() {
- mockInventoryPersistence.getYangModelCmHandles(['ch1', 'ch2'])
- >> [yangModelCmHandle1, yangModelCmHandle2]
-
- mockInventoryPersistence.getYangModelCmHandles(['ch3', 'ch4'])
- >> [yangModelCmHandle3, yangModelCmHandle4]
- }
-
-}
+++ /dev/null
-/*
- * ============LICENSE_START=======================================================
- * Copyright (C) 2024 Nordix Foundation
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * SPDX-License-Identifier: Apache-2.0
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.cps.ncmp.impl.datajobs.subscription.dmi
-
-import org.onap.cps.ncmp.impl.datajobs.subscription.models.DmiCmSubscriptionKey
-import spock.lang.Specification
-
-class DmiCmSubscriptionDetailsPerDmiMapperSpec extends Specification {
-
- def objectUnderTest = new DmiCmSubscriptionDetailsPerDmiMapper()
-
- def 'Check for grouping of Dmi Subscription Details'() {
- given: 'details in the form of datastore , cmhandle and xpath'
- def subscribersPerDmi = [
- 'dmi-1': [
- new DmiCmSubscriptionKey('ncmp-datastore:passthrough-operational', 'ch-1', '/a/b'),
- new DmiCmSubscriptionKey('ncmp-datastore:passthrough-operational', 'ch-2', '/a/b')
- ],
- 'dmi-2': [
- new DmiCmSubscriptionKey('ncmp-datastore:passthrough-running', 'ch-3', '/c/d'),
- new DmiCmSubscriptionKey('ncmp-datastore:passthrough-running', 'ch-3', '/e/f')
- ]
- ]
- when: 'we try to map the values based on datastore and xpath'
- def result = objectUnderTest.toDmiCmSubscriptionsPerDmi(subscribersPerDmi)
- then: 'the mapped values are grouped as expected for dmi-1'
- assert result['dmi-1'].dmiCmSubscriptionPredicates.size() == 1
- assert result['dmi-1'].dmiCmSubscriptionPredicates[0].targetCmHandleIds.containsAll(['ch-1', 'ch-2'])
- and: 'similarly for dmi-2'
- assert result['dmi-2'].dmiCmSubscriptionPredicates.size() == 2
- assert result['dmi-2'].dmiCmSubscriptionPredicates[0].targetCmHandleIds.contains('ch-3')
- assert result['dmi-2'].dmiCmSubscriptionPredicates[1].targetCmHandleIds.contains('ch-3')
- }
-}
/*
* ============LICENSE_START=======================================================
- * Copyright (C) 2024 Nordix Foundation
+ * Copyright (C) 2024-2025 OpenInfra Foundation Europe. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
package org.onap.cps.ncmp.impl.datajobs.subscription.dmi
-
-import org.onap.cps.ncmp.impl.datajobs.subscription.models.DmiCmSubscriptionPredicate
import org.onap.cps.ncmp.impl.inventory.InventoryPersistence
import org.onap.cps.ncmp.impl.inventory.models.YangModelCmHandle
+import org.onap.cps.ncmp.impl.utils.JexParser
import spock.lang.Specification
-import static org.onap.cps.ncmp.api.data.models.DatastoreType.PASSTHROUGH_OPERATIONAL
-import static org.onap.cps.ncmp.api.data.models.DatastoreType.PASSTHROUGH_RUNNING
-
class DmiInEventMapperSpec extends Specification {
def mockInventoryPersistence = Mock(InventoryPersistence)
}
def 'Check for Cm Notification Subscription DMI In Event mapping'() {
- given: 'a collection of cm subscription predicates'
- def dmiSubscriptionPredicates = [new DmiCmSubscriptionPredicate(['ch-1'].toSet(), PASSTHROUGH_RUNNING, ['/ch-1'].toSet()),
- new DmiCmSubscriptionPredicate(['ch-2'].toSet(), PASSTHROUGH_OPERATIONAL, ['/ch-2'].toSet())]
+ given: 'data job subscription details'
+ def cmHandleIds = ['ch-1', 'ch-2'].asList()
+ def dataNodeSelectors = ['/dataNodeSelector1'].asList()
+ def notificationTypes = []
+ def notificationFilter = ''
+ def dataNodeSelectorAsJsonExpression = JexParser.toJsonExpressionsAsString(dataNodeSelectors)
when: 'we try to map the values'
- def result = objectUnderTest.toDmiInEvent(dmiSubscriptionPredicates)
- then: 'it contains correct cm notification subscription cmhandle object'
- assert result.data.cmHandles.cmhandleId.containsAll(['ch-1', 'ch-2'])
- assert result.data.cmHandles.privateProperties.containsAll([['k1': 'v1'], ['k2': 'v2']])
- and: 'also has the correct dmi cm notification subscription predicates'
- assert result.data.predicates.targetFilter.containsAll([['ch-1'], ['ch-2']])
+ def result = objectUnderTest.toDmiInEvent(cmHandleIds, dataNodeSelectors, notificationTypes, notificationFilter)
+ then: 'it contains correct cm handles'
+ assert result.data.cmHandles.cmhandleId.containsAll(cmHandleIds)
+ and: 'correct data node selector'
+ assert result.data.productionJobDefinition.targetSelector.dataNodeSelector == dataNodeSelectorAsJsonExpression
}
}
+++ /dev/null
-/*
- * ============LICENSE_START=======================================================
- * Copyright (c) 2024-2025 OpenInfra Foundation Europe. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an 'AS IS' BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * SPDX-License-Identifier: Apache-2.0
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.cps.ncmp.impl.datajobs.subscription.dmi
-
-import ch.qos.logback.classic.Level
-import ch.qos.logback.classic.Logger
-import ch.qos.logback.classic.spi.ILoggingEvent
-import ch.qos.logback.core.read.ListAppender
-import com.fasterxml.jackson.databind.ObjectMapper
-import io.cloudevents.CloudEvent
-import io.cloudevents.core.builder.CloudEventBuilder
-import org.apache.kafka.clients.consumer.ConsumerRecord
-import org.onap.cps.ncmp.impl.datajobs.subscription.dmi_to_ncmp.Data
-import org.onap.cps.ncmp.impl.datajobs.subscription.dmi_to_ncmp.DmiOutEvent
-import org.onap.cps.ncmp.impl.datajobs.subscription.cache.DmiCacheHandler
-import org.onap.cps.ncmp.impl.datajobs.subscription.ncmp.NcmpOutEventMapper
-import org.onap.cps.ncmp.impl.datajobs.subscription.ncmp.NcmpOutEventProducer
-import org.onap.cps.ncmp.utils.TestUtils
-import org.onap.cps.ncmp.utils.events.MessagingBaseSpec
-import org.onap.cps.utils.JsonObjectMapper
-import org.slf4j.LoggerFactory
-import org.springframework.beans.factory.annotation.Autowired
-import org.springframework.boot.test.context.SpringBootTest
-
-import static org.onap.cps.ncmp.impl.datajobs.subscription.models.CmSubscriptionStatus.ACCEPTED
-import static org.onap.cps.ncmp.impl.datajobs.subscription.models.CmSubscriptionStatus.REJECTED
-
-@SpringBootTest(classes = [ObjectMapper, JsonObjectMapper])
-class DmiOutEventConsumerSpec extends MessagingBaseSpec {
-
- @Autowired
- JsonObjectMapper jsonObjectMapper
-
- @Autowired
- ObjectMapper objectMapper
-
- def mockDmiCacheHandler = Mock(DmiCacheHandler)
- def mockNcmpOutEventProducer = Mock(NcmpOutEventProducer)
- def mockNcmpOutEventMapper = Mock(NcmpOutEventMapper)
-
- def objectUnderTest = new DmiOutEventConsumer(mockDmiCacheHandler, mockNcmpOutEventProducer, mockNcmpOutEventMapper)
- def logger = Spy(ListAppender<ILoggingEvent>)
-
- void setup() {
- ((Logger) LoggerFactory.getLogger(DmiOutEventConsumer.class)).addAppender(logger)
- logger.start()
- }
-
- void cleanup() {
- ((Logger) LoggerFactory.getLogger(DmiOutEventConsumer.class)).detachAndStopAllAppenders()
- }
-
-
- def 'Consume valid CM Subscription response from DMI Plugin'() {
- given: 'a cmsubscription event'
- def jsonData = TestUtils.getResourceFileContent('cmSubscription/cmNotificationSubscriptionDmiOutEvent.json')
- def testEventSent = jsonObjectMapper.convertJsonString(jsonData, DmiOutEvent.class)
- def testCloudEventSent = CloudEventBuilder.v1()
- .withData(objectMapper.writeValueAsBytes(testEventSent))
- .withId('random-uuid')
- .withType('subscriptionCreateResponse')
- .withSource(URI.create('test-dmi-plugin-name'))
- .withExtension('correlationid', 'sub-1#test-dmi-plugin-name').build()
- def consumerRecord = new ConsumerRecord<String, CloudEvent>('topic-name', 0, 0, 'event-key', testCloudEventSent)
- when: 'the valid event is consumed'
- objectUnderTest.consumeDmiOutEvent(consumerRecord)
- then: 'an event is logged with level INFO'
- def loggingEvent = getLoggingEvent()
- assert loggingEvent.level == Level.INFO
- and: 'the log indicates the task completed successfully'
- assert loggingEvent.formattedMessage == 'Cm Subscription with id : sub-1 handled by the dmi-plugin : test-dmi-plugin-name has the status : accepted'
- }
-
- def 'Consume a valid CM Notification Subscription Event and perform correct actions base on status'() {
- given: 'a cmNotificationSubscription event'
- def dmiOutEventData = new Data(statusCode: statusCode, statusMessage: subscriptionStatus.toString())
- def dmiOutEvent = new DmiOutEvent().withData(dmiOutEventData)
- def testCloudEventSent = CloudEventBuilder.v1()
- .withData(objectMapper.writeValueAsBytes(dmiOutEvent))
- .withId('random-uuid')
- .withType('subscriptionCreateResponse')
- .withSource(URI.create('test-dmi-plugin-name'))
- .withExtension('correlationid', 'sub-1#test-dmi-plugin-name').build()
- def consumerRecord = new ConsumerRecord<String, CloudEvent>('topic-name', 0, 0, 'event-key', testCloudEventSent)
- when: 'the event is consumed'
- objectUnderTest.consumeDmiOutEvent(consumerRecord)
- then: 'correct number of calls to cache'
- expectedCacheCalls * mockDmiCacheHandler.updateDmiSubscriptionStatus('sub-1', 'test-dmi-plugin-name', subscriptionStatus)
- and: 'correct number of calls to persist cache'
- expectedPersistenceCalls * mockDmiCacheHandler.persistIntoDatabasePerDmi('sub-1', 'test-dmi-plugin-name')
- and: 'correct number of calls to map the ncmp out event'
- 1 * mockNcmpOutEventMapper.toNcmpOutEvent('sub-1', _)
- and: 'correct number of calls to send the ncmp out event to client'
- 1 * mockNcmpOutEventProducer.sendNcmpOutEvent('sub-1', 'subscriptionCreateResponse', _, false)
- where: 'the following parameters are used'
- scenario | subscriptionStatus | statusCode || expectedCacheCalls | expectedPersistenceCalls
- 'Accepted Status' | ACCEPTED | '1' || 1 | 1
- 'Rejected Status' | REJECTED | '104' || 1 | 0
- }
-
- def getLoggingEvent() {
- return logger.list[0]
- }
-
-}
import org.onap.cps.ncmp.config.CpsApplicationContext
import org.onap.cps.ncmp.impl.datajobs.subscription.ncmp_to_dmi.CmHandle
import org.onap.cps.ncmp.impl.datajobs.subscription.ncmp_to_dmi.Data
-import org.onap.cps.ncmp.impl.datajobs.subscription.ncmp_to_dmi.DmiInEvent
+import org.onap.cps.ncmp.impl.datajobs.subscription.ncmp_to_dmi.DataJobSubscriptionDmiInEvent
import org.onap.cps.ncmp.utils.events.CloudEventMapper
import org.onap.cps.utils.JsonObjectMapper
import org.springframework.boot.test.context.SpringBootTest
@SpringBootTest(classes = [ObjectMapper, JsonObjectMapper, CloudEventBuilder])
@ContextConfiguration(classes = [CpsApplicationContext])
-class DmiInEventProducerSpec extends Specification {
+class EventProducerSpec extends Specification {
def mockEventsProducer = Mock(EventsProducer)
- def objectUnderTest = new DmiInEventProducer(mockEventsProducer)
+ def objectUnderTest = new EventProducer(mockEventsProducer)
def 'Create and Send Cm Notification Subscription DMI In Event'() {
given: 'a cm subscription for a dmi plugin'
def subscriptionId = 'test-subscription-id'
def dmiPluginName = 'test-dmiplugin'
def eventType = 'subscriptionCreateRequest'
- def dmiInEvent = new DmiInEvent(data: new Data(cmHandles: [new CmHandle(cmhandleId: 'test-1', privateProperties: [:])]))
+ def dmiInEvent = new DataJobSubscriptionDmiInEvent(data: new Data(cmHandles: [new CmHandle(cmhandleId: 'test-1', privateProperties: [:])]))
and: 'also we have target topic for dmiPlugin'
objectUnderTest.dmiInEventTopic = 'dmiplugin-test-topic'
when: 'the event is sent'
- objectUnderTest.sendDmiInEvent(subscriptionId, dmiPluginName, eventType, dmiInEvent)
+ objectUnderTest.send(subscriptionId, dmiPluginName, eventType, dmiInEvent)
then: 'the event contains the required attributes'
1 * mockEventsProducer.sendCloudEvent(_, _, _) >> {
args ->
assert dmiInEventAsCloudEvent.getExtension('correlationid') == subscriptionId + '#' + dmiPluginName
assert dmiInEventAsCloudEvent.type == 'subscriptionCreateRequest'
assert dmiInEventAsCloudEvent.source.toString() == 'NCMP'
- assert CloudEventMapper.toTargetEvent(dmiInEventAsCloudEvent, DmiInEvent) == dmiInEvent
+ assert CloudEventMapper.toTargetEvent(dmiInEventAsCloudEvent, DataJobSubscriptionDmiInEvent) == dmiInEvent
}
}
}
+++ /dev/null
-/*
- * ============LICENSE_START=======================================================
- * Copyright (c) 2024-2025 OpenInfra Foundation Europe. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an 'AS IS' BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * SPDX-License-Identifier: Apache-2.0
- * ============LICENSE_END=========================================================
- */
-package org.onap.cps.ncmp.impl.datajobs.subscription.ncmp
-
-import org.onap.cps.ncmp.impl.datajobs.subscription.models.DmiCmSubscriptionPredicate
-import org.onap.cps.ncmp.impl.datajobs.subscription.utils.CmDataJobSubscriptionPersistenceService
-import spock.lang.Specification
-
-import static org.onap.cps.ncmp.api.data.models.DatastoreType.PASSTHROUGH_OPERATIONAL
-
-class CmSubscriptionComparatorSpec extends Specification {
-
- def mockCmSubscriptionPersistenceService = Mock(CmDataJobSubscriptionPersistenceService)
- def objectUnderTest = new CmSubscriptionComparator(mockCmSubscriptionPersistenceService)
-
- def 'Find the difference based on the provided predicates'() {
- given: 'A list of predicates'
- def predicates = [new DmiCmSubscriptionPredicate(['ch-1', 'ch-2'].toSet(), PASSTHROUGH_OPERATIONAL, ['a/1/', 'b/2'].toSet())]
- and: '1 positive and 1 negative response.'
- mockCmSubscriptionPersistenceService.hasAtLeastOneSubscription(PASSTHROUGH_OPERATIONAL.getDatastoreName(), 'ch-1') >> true
- mockCmSubscriptionPersistenceService.hasAtLeastOneSubscription(PASSTHROUGH_OPERATIONAL.getDatastoreName(), 'ch-2') >> false
- when: 'method to extract only NEW predicates for dmi is called'
- def result = objectUnderTest.getNewDmiSubscriptionPredicates(predicates)
- then: 'from 2 predicates only 1 remains'
- assert result.size() == 1
- assert result[0].targetCmHandleIds[0] == 'ch-2'
- }
-
- def 'Find the difference based on the provided predicates when it is an ongoing Cm Subscription'() {
- given: 'A list of predicates'
- def predicates = [new DmiCmSubscriptionPredicate(['ch-1'].toSet(), PASSTHROUGH_OPERATIONAL, ['a/1/'].toSet())]
- and: 'its already present'
- mockCmSubscriptionPersistenceService.hasAtLeastOneSubscription(PASSTHROUGH_OPERATIONAL.getDatastoreName(), 'ch-1') >> true
- when: 'method to extract only NEW predicates for dmi is called'
- def result = objectUnderTest.getNewDmiSubscriptionPredicates(predicates)
- then: 'from 1 predicate, none remains'
- assert result.size() == 0
- }
-
-}
package org.onap.cps.ncmp.impl.datajobs.subscription.ncmp
-import com.fasterxml.jackson.databind.ObjectMapper
-import org.onap.cps.api.model.DataNode
-import org.onap.cps.ncmp.impl.datajobs.subscription.client_to_ncmp.NcmpInEvent
-import org.onap.cps.ncmp.impl.datajobs.subscription.ncmp_to_client.NcmpOutEvent
-import org.onap.cps.ncmp.impl.datajobs.subscription.ncmp_to_dmi.DmiInEvent
-import org.onap.cps.ncmp.impl.datajobs.subscription.cache.DmiCacheHandler
-import org.onap.cps.ncmp.impl.datajobs.subscription.dmi.DmiCmSubscriptionDetailsPerDmiMapper
+
+import org.onap.cps.ncmp.impl.datajobs.subscription.client_to_ncmp.DataSelector
+import org.onap.cps.ncmp.impl.datajobs.subscription.ncmp_to_dmi.DataJobSubscriptionDmiInEvent
import org.onap.cps.ncmp.impl.datajobs.subscription.dmi.DmiInEventMapper
-import org.onap.cps.ncmp.impl.datajobs.subscription.dmi.DmiInEventProducer
-import org.onap.cps.ncmp.impl.datajobs.subscription.models.DmiCmSubscriptionDetails
-import org.onap.cps.ncmp.impl.datajobs.subscription.models.DmiCmSubscriptionPredicate
+import org.onap.cps.ncmp.impl.datajobs.subscription.dmi.EventProducer
import org.onap.cps.ncmp.impl.datajobs.subscription.utils.CmDataJobSubscriptionPersistenceService
import org.onap.cps.ncmp.impl.inventory.InventoryPersistence
import org.onap.cps.ncmp.impl.inventory.models.YangModelCmHandle
-import org.onap.cps.ncmp.utils.TestUtils
-import org.onap.cps.utils.JsonObjectMapper
+import org.onap.cps.ncmp.impl.utils.AlternateIdMatcher
+import org.onap.cps.ncmp.impl.utils.JexParser
import spock.lang.Specification
-import static org.onap.cps.ncmp.api.data.models.DatastoreType.PASSTHROUGH_OPERATIONAL
-import static org.onap.cps.ncmp.impl.datajobs.subscription.models.CmSubscriptionStatus.ACCEPTED
-import static org.onap.cps.ncmp.impl.datajobs.subscription.models.CmSubscriptionStatus.PENDING
-
class CmSubscriptionHandlerImplSpec extends Specification {
- def jsonObjectMapper = new JsonObjectMapper(new ObjectMapper())
def mockCmSubscriptionPersistenceService = Mock(CmDataJobSubscriptionPersistenceService)
- def mockCmSubscriptionComparator = Mock(CmSubscriptionComparator)
- def mockNcmpOutEventMapper = Mock(NcmpOutEventMapper)
def mockDmiInEventMapper = Mock(DmiInEventMapper)
- def dmiCmSubscriptionDetailsPerDmiMapper = new DmiCmSubscriptionDetailsPerDmiMapper()
- def mockNcmpOutEventProducer = Mock(NcmpOutEventProducer)
- def mockDmiInEventProducer = Mock(DmiInEventProducer)
- def mockDmiCacheHandler = Mock(DmiCacheHandler)
+ def mockDmiInEventProducer = Mock(EventProducer)
def mockInventoryPersistence = Mock(InventoryPersistence)
+ def mockAlternateIdMatcher = Mock(AlternateIdMatcher)
- def objectUnderTest = new CmSubscriptionHandlerImpl(mockCmSubscriptionPersistenceService,
- mockCmSubscriptionComparator, mockNcmpOutEventMapper, mockDmiInEventMapper, dmiCmSubscriptionDetailsPerDmiMapper,
- mockNcmpOutEventProducer, mockDmiInEventProducer, mockDmiCacheHandler, mockInventoryPersistence)
-
- def testDmiSubscriptionsPerDmi = ["dmi-1": new DmiCmSubscriptionDetails([], PENDING)]
-
- def 'Consume valid and unique CmNotificationSubscriptionNcmpInEvent create message'() {
- given: 'a cmNotificationSubscriptionNcmp in event with new subscription id'
- def jsonData = TestUtils.getResourceFileContent('cmSubscription/cmNotificationSubscriptionNcmpInEvent.json')
- def testEventConsumed = jsonObjectMapper.convertJsonString(jsonData, NcmpInEvent.class)
- def testListOfDeltaPredicates = [new DmiCmSubscriptionPredicate(['ch1'].toSet(), PASSTHROUGH_OPERATIONAL, ['/a/b'].toSet())]
- and: 'the persistence service confirms subscription id is new (not used for other subscription)'
- mockCmSubscriptionPersistenceService.isNewSubscriptionId('test-id') >> true
- and: 'relevant details is extracted from the event'
- def subscriptionId = testEventConsumed.getData().getSubscriptionId()
- def predicates = testEventConsumed.getData().getPredicates()
- and: 'the cache handler returns for relevant subscription id'
- 1 * mockDmiCacheHandler.get("test-id") >> testDmiSubscriptionsPerDmi
- and: 'the delta predicates is returned'
- 1 * mockCmSubscriptionComparator.getNewDmiSubscriptionPredicates(_) >> testListOfDeltaPredicates
- and: 'the DMI in event mapper returns cm notification subscription event'
- def testDmiInEvent = new DmiInEvent()
- 1 * mockDmiInEventMapper.toDmiInEvent(testListOfDeltaPredicates) >> testDmiInEvent
- when: 'the valid and unique event is consumed'
- objectUnderTest.processSubscriptionCreateRequest(subscriptionId, predicates)
- then: 'the subscription cache handler is called once'
- 1 * mockDmiCacheHandler.add('test-id', _)
- and: 'the events handler method to send DMI event is called correct number of times with the correct parameters'
- testDmiSubscriptionsPerDmi.size() * mockDmiInEventProducer.sendDmiInEvent(
- "test-id", "dmi-1", "subscriptionCreateRequest", testDmiInEvent)
- and: 'we schedule to send the response after configured time from the cache'
- 1 * mockNcmpOutEventProducer.sendNcmpOutEvent('test-id', 'subscriptionCreateResponse', null, true)
- }
+ def objectUnderTest = new CmSubscriptionHandlerImpl(mockCmSubscriptionPersistenceService, mockDmiInEventMapper,
+ mockDmiInEventProducer, mockInventoryPersistence, mockAlternateIdMatcher)
- def 'Consume valid and Overlapping Cm Notification Subscription NcmpIn Event'() {
- given: 'a cmNotificationSubscriptionNcmp in event with unique subscription id'
- def jsonData = TestUtils.getResourceFileContent('cmSubscription/cmNotificationSubscriptionNcmpInEvent.json')
- def testEventConsumed = jsonObjectMapper.convertJsonString(jsonData, NcmpInEvent.class)
- def noDeltaPredicates = []
- and: 'the persistence service confirms subscription id is new (not used for other subscription)'
- mockCmSubscriptionPersistenceService.isNewSubscriptionId('test-id') >> true
- and: 'the cache handler returns for relevant subscription id'
- 1 * mockDmiCacheHandler.get('test-id') >> testDmiSubscriptionsPerDmi
- and: 'the delta predicates is returned'
- 1 * mockCmSubscriptionComparator.getNewDmiSubscriptionPredicates(_) >> noDeltaPredicates
- when: 'the valid and unique event is consumed'
- objectUnderTest.processSubscriptionCreateRequest('test-id', noDeltaPredicates)
- then: 'the subscription cache handler is called once'
- 1 * mockDmiCacheHandler.add('test-id', _)
- and: 'the subscription details are updated in the cache'
- 1 * mockDmiCacheHandler.updateDmiSubscriptionStatus('test-id', _, ACCEPTED)
- and: 'we schedule to send the response after configured time from the cache'
- 1 * mockNcmpOutEventProducer.sendNcmpOutEvent('test-id', 'subscriptionCreateResponse', null, true)
+ def 'Process subscription CREATE request for new target [non existing]'() {
+ given: 'relevant subscription details'
+ def mySubId = 'dataJobId'
+ def myDataNodeSelectors = ['/parent[id="1"]'].toList()
+ def notificationTypes = []
+ def notificationFilter = ''
+ def dataSelector = new DataSelector(notificationTypes: notificationTypes, notificationFilter: notificationFilter)
+ and: 'alternate Id matcher returns cm handle id for given data node selector'
+ def fdn = getFdn(myDataNodeSelectors.iterator().next())
+ mockAlternateIdMatcher.getCmHandleId(fdn) >> 'myCmHandleId'
+ and: 'returns inactive data node selector(s)'
+ mockCmSubscriptionPersistenceService.getInactiveDataNodeSelectors(mySubId) >> ['/parent[id="1"]']
+ and: 'the inventory persistence service returns cm handle'
+ mockInventoryPersistence.getYangModelCmHandle('myCmHandleId') >> new YangModelCmHandle(dmiServiceName: 'myDmiService')
+ and: 'DMI in event mapper returns event'
+ def myDmiInEvent = new DataJobSubscriptionDmiInEvent()
+ mockDmiInEventMapper.toDmiInEvent(['myCmHandleId'], myDataNodeSelectors, notificationTypes, notificationFilter) >> myDmiInEvent
+ when: 'the method to process subscription create request is called'
+ objectUnderTest.processSubscriptionCreate(dataSelector, mySubId, myDataNodeSelectors)
+ then: 'the persistence service is called'
+ 1 * mockCmSubscriptionPersistenceService.add(mySubId, '/parent[id="1"]')
+ and: 'the event is sent to correct DMI'
+ 1 * mockDmiInEventProducer.send(mySubId, 'myDmiService', 'subscriptionCreateRequest', _)
}
- def 'Consume valid and but non-unique CmNotificationSubscription create message'() {
- given: 'a cmNotificationSubscriptionNcmp in event'
- def jsonData = TestUtils.getResourceFileContent('cmSubscription/cmNotificationSubscriptionNcmpInEvent.json')
- def testEventConsumed = jsonObjectMapper.convertJsonString(jsonData, NcmpInEvent.class)
- and: 'the persistence service confirms subscription id is not new (already used subscription)'
- mockCmSubscriptionPersistenceService.isNewSubscriptionId('test-id') >> false
- and: 'relevant details is extracted from the event'
- def subscriptionId = testEventConsumed.getData().getSubscriptionId()
- def predicates = testEventConsumed.getData().getPredicates()
- and: 'the NCMP out in event mapper returns an event for rejected request'
- def testNcmpOutEvent = new NcmpOutEvent()
- 1 * mockNcmpOutEventMapper.toNcmpOutEventForRejectedRequest(
- "test-id", _) >> testNcmpOutEvent
- when: 'the valid but non-unique event is consumed'
- objectUnderTest.processSubscriptionCreateRequest(subscriptionId, predicates)
- then: 'the events handler method to send DMI event is never called'
- 0 * mockDmiInEventProducer.sendDmiInEvent(_, _, _, _)
- and: 'the events handler method to send NCMP out event is called once'
- 1 * mockNcmpOutEventProducer.sendNcmpOutEvent('test-id', 'subscriptionCreateResponse', testNcmpOutEvent, false)
+ def 'Process subscription CREATE request for new targets [non existing] to be sent to multiple DMIs'() {
+ given: 'relevant subscription details'
+ def mySubId = 'dataJobId'
+ def myDataNodeSelectors = [
+ '/parent[id="forDmi1"]',
+ '/parent[id="forDmi1"]/child',
+ '/parent[id="forDmi2"]'].toList()
+ def someAttr1 = []
+ def someAttr2 = ''
+ def dataSelector = new DataSelector(notificationTypes: someAttr1, notificationFilter: someAttr2)
+ and: 'alternate Id matcher returns cm handle ids for given data node selectors'
+ def fdn1 = getFdn(myDataNodeSelectors.get(0))
+ def fdn2 = getFdn(myDataNodeSelectors.get(1))
+ def fdn3 = getFdn(myDataNodeSelectors.get(2))
+ mockAlternateIdMatcher.getCmHandleId(fdn1) >> 'myCmHandleId1'
+ mockAlternateIdMatcher.getCmHandleId(fdn2) >> 'myCmHandleId1'
+ mockAlternateIdMatcher.getCmHandleId(fdn3) >> 'myCmHandleId2'
+ and: 'returns inactive data node selector(s)'
+ mockCmSubscriptionPersistenceService.getInactiveDataNodeSelectors(mySubId) >> [
+ '/parent[id="forDmi1"]',
+ '/parent[id="forDmi1"]/child',
+ '/parent[id="forDmi2"]']
+ and: 'the inventory persistence service returns cm handles with dmi information'
+ mockInventoryPersistence.getYangModelCmHandle('myCmHandleId1') >> new YangModelCmHandle(dmiServiceName: 'myDmiService1')
+ mockInventoryPersistence.getYangModelCmHandle('myCmHandleId2') >> new YangModelCmHandle(dmiServiceName: 'myDmiService2')
+ and: 'DMI in event mapper returns events'
+ def myDmiInEvent1 = new DataJobSubscriptionDmiInEvent()
+ def myDmiInEvent2 = new DataJobSubscriptionDmiInEvent()
+ mockDmiInEventMapper.toDmiInEvent(['myCmHandleId1'], ['/parent[id="forDmi1"]', '/parent[id="forDmi1"]/child'], someAttr1, someAttr2) >> myDmiInEvent1
+ mockDmiInEventMapper.toDmiInEvent(['myCmHandleId2'], ['/parent[id="forDmi2"]'], someAttr1, someAttr2) >> myDmiInEvent2
+ when: 'the method to process subscription create request is called'
+ objectUnderTest.processSubscriptionCreate(dataSelector, mySubId, myDataNodeSelectors)
+ then: 'the persistence service is called'
+ myDataNodeSelectors.each { dataNodeSelector ->
+ 1 * mockCmSubscriptionPersistenceService.add(_, dataNodeSelector)}
+ and: 'the event is sent to correct DMIs'
+ 1 * mockDmiInEventProducer.send(mySubId, 'myDmiService1', 'subscriptionCreateRequest', myDmiInEvent1)
+ 1 * mockDmiInEventProducer.send(mySubId, 'myDmiService2', 'subscriptionCreateRequest', myDmiInEvent2)
}
- def 'Consume valid CmNotificationSubscriptionNcmpInEvent delete message'() {
- given: 'a test subscription id'
- def subscriptionId = 'test-id'
- and: 'the persistence service returns datanodes'
- 1 * mockCmSubscriptionPersistenceService.getAffectedDataNodes(subscriptionId) >>
- [new DataNode(xpath: "/datastores/datastore[@name='ncmp-datastore:passthrough-running']/cm-handles/cm-handle[@id='ch-1']/filters/filter[@xpath='x/y']", leaves: ['xpath': 'x/y', 'subscriptionIds': ['test-id']]),
- new DataNode(xpath: "/datastores/datastore[@name='ncmp-datastore:passthrough-running']/cm-handles/cm-handle[@id='ch-2']/filters/filter[@xpath='y/z']", leaves: ['xpath': 'y/z', 'subscriptionIds': ['test-id']])]
- and: 'the inventory persistence returns yang model cm handles'
- 1 * mockInventoryPersistence.getYangModelCmHandle('ch-1') >> new YangModelCmHandle(dmiServiceName: 'dmi-1')
- 1 * mockInventoryPersistence.getYangModelCmHandle('ch-2') >> new YangModelCmHandle(dmiServiceName: 'dmi-2')
- when: 'the subscription delete request is processed'
- objectUnderTest.processSubscriptionDeleteRequest(subscriptionId)
- then: 'the method to send a dmi event is called with correct parameters'
- 1 * mockDmiInEventProducer.sendDmiInEvent(subscriptionId, 'dmi-1', 'subscriptionDeleteRequest', _)
- 1 * mockDmiInEventProducer.sendDmiInEvent(subscriptionId, 'dmi-2', 'subscriptionDeleteRequest', _)
- and: 'the method to send nmcp out event is called with correct parameters'
- 1 * mockNcmpOutEventProducer.sendNcmpOutEvent(subscriptionId, 'subscriptionDeleteResponse', null, true)
+ def 'Process subscription CREATE request for overlapping targets [non existing & existing]'() {
+ given: 'relevant subscription details'
+ def myNewSubId = 'newId'
+ def myDataNodeSelectors = ['/newDataNodeSelector[id=""]'].toList()
+ def dataSelector = new DataSelector(notificationTypes: [], notificationFilter: '')
+ and: 'alternate id matcher always returns a cm handle id'
+ mockAlternateIdMatcher.getCmHandleId(_) >> 'someCmHandleId'
+ and: 'the inventory persistence service returns cm handles with dmi information'
+ mockInventoryPersistence.getYangModelCmHandle(_) >> new YangModelCmHandle(dmiServiceName: 'myDmiService')
+ and: 'returns inactive data node selector(s)'
+ mockCmSubscriptionPersistenceService.getInactiveDataNodeSelectors(myNewSubId) >> inactiveDataNodeSelectors
+ when: 'the method to process subscription create request is called'
+ objectUnderTest.processSubscriptionCreate(dataSelector, myNewSubId, myDataNodeSelectors)
+ then: 'the persistence service is called'
+ 1 * mockCmSubscriptionPersistenceService.add(_, myDataNodeSelectors.iterator().next())
+ and: 'the event is sent to correct DMIs'
+ expectedCallsToDmi * mockDmiInEventProducer.send(myNewSubId, 'myDmiService', 'subscriptionCreateRequest', _)
+ where: 'following data are used'
+ scenario | inactiveDataNodeSelectors || expectedCallsToDmi
+ 'new target overlaps with ACCEPTED targets' | [] || 0
+ 'new target overlaps with REJECTED targets' | ['/existingDataNodeSelector[id=""]','/newDataNodeSelector[id=""]']|| 1
+ 'new target overlaps with UNKNOWN targets' | ['/existingDataNodeSelector[id=""]','/newDataNodeSelector[id=""]']|| 1
+ 'new target does not overlap with existing targets'| ['/newDataNodeSelector[id=""]'] || 1
}
- def 'Delete a subscriber for fully overlapping subscriptions'() {
- given: 'a test subscription id'
- def subscriptionId = 'test-id'
- and: 'the persistence service returns datanodes with multiple subscribers'
- 1 * mockCmSubscriptionPersistenceService.getAffectedDataNodes(subscriptionId) >>
- [new DataNode(xpath: "/datastores/datastore[@name='ncmp-datastore:passthrough-running']/cm-handles/cm-handle[@id='ch-1']/filters/filter[@xpath='x/y']", leaves: ['xpath': 'x/y', 'subscriptionIds': ['test-id', 'other-id']]),
- new DataNode(xpath: "/datastores/datastore[@name='ncmp-datastore:passthrough-running']/cm-handles/cm-handle[@id='ch-2']/filters/filter[@xpath='y/z']", leaves: ['xpath': 'y/z', 'subscriptionIds': ['test-id', 'other-id']])]
- and: 'the inventory persistence returns yang model cm handles'
- 1 * mockInventoryPersistence.getYangModelCmHandle('ch-1') >> new YangModelCmHandle(dmiServiceName: 'dmi-1')
- 1 * mockInventoryPersistence.getYangModelCmHandle('ch-2') >> new YangModelCmHandle(dmiServiceName: 'dmi-2')
- and: 'the cache handler returns the relevant maps whenever called'
- 2 * mockDmiCacheHandler.get(subscriptionId) >> ['dmi-1': [:], 'dmi-2': [:]]
- when: 'the subscription delete request is processed'
- objectUnderTest.processSubscriptionDeleteRequest(subscriptionId)
- then: 'the method to send a dmi event is never called'
- 0 * mockDmiInEventProducer.sendDmiInEvent(_, _, _, _)
- and: 'the cache handler is called to remove subscriber from database per dmi'
- 1 * mockDmiCacheHandler.removeFromDatabase('test-id', 'dmi-1')
- 1 * mockDmiCacheHandler.removeFromDatabase('test-id', 'dmi-2')
- and: 'the method to send ncmp out event is called with correct parameters'
- 1 * mockNcmpOutEventProducer.sendNcmpOutEvent(subscriptionId, 'subscriptionDeleteResponse', null, false)
+ def getFdn(dataNodeSelector) {
+ return JexParser.extractFdnPrefix(dataNodeSelector).orElse("")
}
}
import ch.qos.logback.core.read.ListAppender
import com.fasterxml.jackson.databind.ObjectMapper
import org.onap.cps.ncmp.impl.datajobs.subscription.client_to_ncmp.DataJobSubscriptionOperationInEvent
+import org.onap.cps.ncmp.impl.utils.JexParser
import org.onap.cps.ncmp.utils.TestUtils
import org.onap.cps.utils.JsonObjectMapper
import org.slf4j.LoggerFactory
@SpringBootTest(classes = [ObjectMapper, JsonObjectMapper])
class NcmpInEventConsumerSpec extends Specification {
- def objectUnderTest = new NcmpInEventConsumer()
+ def mockCmSubscriptionHandler = Mock(CmSubscriptionHandlerImpl)
+ def objectUnderTest = new NcmpInEventConsumer(mockCmSubscriptionHandler)
def logger = new ListAppender<ILoggingEvent>()
@Autowired
((Logger) LoggerFactory.getLogger(NcmpInEventConsumer.class)).detachAndStopAllAppenders()
}
- def 'Consuming CM Data Notification #scenario data type id.'() {
- given: 'a JSON file containing a subscription event'
- def jsonData = TestUtils.getResourceFileContent('sample_dataJobSubscriptionInEvent.json')
- jsonData = jsonData.replace('#dataTypeId', dataTypeId)
+ def 'Consuming CREATE cm data job subscription request.'() {
+ given: 'a JSON file for create event'
+ def jsonData = TestUtils.getResourceFileContent(
+ 'datajobs/subscription/cmNotificationSubscriptionNcmpInEvent.json')
+ def myEventType = "dataJobCreated"
+ jsonData = jsonData.replace('#myEventType', myEventType)
+ and: 'the event'
def event = objectMapper.readValue(jsonData, DataJobSubscriptionOperationInEvent)
+ and: 'the list of data node selectors'
+ def dataNodeSelectorList = getDataNodeSelectorsAsXpaths(event)
+ and: 'the other data job event attributes'
+ def dataSelector = getDataSelector(event)
when: 'the event is consumed'
objectUnderTest.consumeSubscriptionEvent(event)
then: 'event details are logged at level INFO'
def loggingEvent = logger.list.last()
assert loggingEvent.level == Level.INFO
- assert loggingEvent.formattedMessage.contains('jobId=my job id')
- assert loggingEvent.formattedMessage.contains('eventType=my event type')
- assert loggingEvent.formattedMessage.contains("dataType=${dataTypeId}")
- assert loggingEvent.formattedMessage.contains('fdns=[/SubNetwork[id="SN1"]]')
- where: 'the following data type ids are used'
- scenario | dataTypeId
- 'with' | 'my data type'
- 'without' | 'null'
+ assert loggingEvent.formattedMessage.contains('dataJobId=myDataJobId')
+ assert loggingEvent.formattedMessage.contains("eventType=${myEventType}")
+ and: 'method to handle process subscription create request is called'
+ 1 * mockCmSubscriptionHandler.processSubscriptionCreate(dataSelector, "myDataJobId", dataNodeSelectorList)
+ }
+
+ def getDataNodeSelectorsAsXpaths(event) {
+ return JexParser.toXpaths(event.event.dataJob.productionJobDefinition.targetSelector.dataNodeSelector)
+ }
+
+ def getDataSelector(event) {
+ return event.event.dataJob.productionJobDefinition.dataSelector
}
}
+++ /dev/null
-/*
- * ============LICENSE_START=======================================================
- * Copyright (C) 2024 Nordix Foundation
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * SPDX-License-Identifier: Apache-2.0
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.cps.ncmp.impl.datajobs.subscription.ncmp
-
-
-import org.onap.cps.ncmp.impl.datajobs.subscription.models.DmiCmSubscriptionDetails
-import org.onap.cps.ncmp.impl.datajobs.subscription.models.DmiCmSubscriptionPredicate
-import spock.lang.Specification
-
-import static org.onap.cps.ncmp.api.data.models.DatastoreType.PASSTHROUGH_OPERATIONAL
-import static org.onap.cps.ncmp.api.data.models.DatastoreType.PASSTHROUGH_RUNNING
-import static org.onap.cps.ncmp.impl.datajobs.subscription.models.CmSubscriptionStatus.ACCEPTED
-import static org.onap.cps.ncmp.impl.datajobs.subscription.models.CmSubscriptionStatus.PENDING
-import static org.onap.cps.ncmp.impl.datajobs.subscription.models.CmSubscriptionStatus.REJECTED
-
-class NcmpOutEventMapperSpec extends Specification {
-
- static Map<String, DmiCmSubscriptionDetails> dmiSubscriptionsPerDmi
-
- def objectUnderTest = new NcmpOutEventMapper()
-
- def setup() {
- def dmiSubscriptionPredicateA = new DmiCmSubscriptionPredicate(['ch-A'] as Set, PASSTHROUGH_RUNNING, ['/a'] as Set)
- def dmiSubscriptionPredicateB = new DmiCmSubscriptionPredicate(['ch-B'] as Set, PASSTHROUGH_OPERATIONAL, ['/b'] as Set)
- def dmiSubscriptionPredicateC = new DmiCmSubscriptionPredicate(['ch-C'] as Set, PASSTHROUGH_OPERATIONAL, ['/c'] as Set)
- dmiSubscriptionsPerDmi = ['dmi-1': new DmiCmSubscriptionDetails([dmiSubscriptionPredicateA], PENDING),
- 'dmi-2': new DmiCmSubscriptionDetails([dmiSubscriptionPredicateB], ACCEPTED),
- 'dmi-3': new DmiCmSubscriptionDetails([dmiSubscriptionPredicateC], REJECTED)
- ]
- }
-
- def 'Check for Cm Notification Subscription Outgoing event mapping'() {
- when: 'we try to map the event to send it to client'
- def result = objectUnderTest.toNcmpOutEvent('test-subscription', dmiSubscriptionsPerDmi)
- then: 'event is mapped correctly for the subscription'
- result.data.subscriptionId == 'test-subscription'
- and: 'the cm handle ids are part of correct list'
- result.data.pendingTargets == ['ch-A'] as Set
- result.data.acceptedTargets == ['ch-B'] as Set
- result.data.rejectedTargets == ['ch-C'] as Set
- }
-
- def 'Check for Cm Notification Rejected Subscription Outgoing event mapping'() {
- when: 'we try to map the event to send it to client'
- def result = objectUnderTest.toNcmpOutEventForRejectedRequest('test-subscription', ['ch-1', 'ch-2'])
- then: 'event is mapped correctly for the subscription id'
- result.data.subscriptionId == 'test-subscription'
- and: 'the cm handle ids are part of correct list'
- result.data.withRejectedTargets(['ch-1', 'ch-2'])
- }
-}
+++ /dev/null
-/*
- * ============LICENSE_START=======================================================
- * Copyright (C) 2025 OpenInfra Foundation Europe. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * SPDX-License-Identifier: Apache-2.0
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.cps.ncmp.impl.datajobs.subscription.ncmp
-
-import com.fasterxml.jackson.databind.ObjectMapper
-import io.cloudevents.CloudEvent
-import io.cloudevents.core.v1.CloudEventBuilder
-import org.onap.cps.events.EventsProducer
-import org.onap.cps.ncmp.config.CpsApplicationContext
-import org.onap.cps.ncmp.impl.datajobs.subscription.ncmp_to_client.Data
-import org.onap.cps.ncmp.impl.datajobs.subscription.ncmp_to_client.NcmpOutEvent
-import org.onap.cps.ncmp.impl.datajobs.subscription.cache.DmiCacheHandler
-import org.onap.cps.ncmp.utils.events.CloudEventMapper
-import org.onap.cps.utils.JsonObjectMapper
-import org.springframework.boot.test.context.SpringBootTest
-import org.springframework.test.context.ContextConfiguration
-import spock.lang.Specification
-
-@SpringBootTest(classes = [ObjectMapper, JsonObjectMapper, CloudEventBuilder])
-@ContextConfiguration(classes = [CpsApplicationContext])
-class NcmpOutEventProducerSpec extends Specification {
-
- def mockEventsProducer = Mock(EventsProducer)
- def mockNcmpOutEventMapper = Mock(NcmpOutEventMapper)
- def mockDmiCacheHandler = Mock(DmiCacheHandler)
-
- def objectUnderTest = new NcmpOutEventProducer(mockEventsProducer, mockNcmpOutEventMapper, mockDmiCacheHandler)
-
- def 'Create and #scenario Cm Notification Subscription NCMP out event'() {
- given: 'a cm subscription response for the client'
- def subscriptionId = 'test-subscription-id-2'
- def eventType = 'subscriptionCreateResponse'
- def ncmpOutEvent = new NcmpOutEvent(data: new Data(subscriptionId: 'test-subscription-id-2', acceptedTargets: ['ch-1', 'ch-2']))
- and: 'also we have target topic for sending to client'
- objectUnderTest.ncmpOutEventTopic = 'client-test-topic'
- and: 'a deadline to an event'
- objectUnderTest.dmiOutEventTimeoutInMs = 1000
- when: 'the event is sent'
- objectUnderTest.sendNcmpOutEvent(subscriptionId, eventType, ncmpOutEvent, eventPublishingTaskToBeScheduled)
- then: 'we conditionally wait for a while'
- Thread.sleep(delayInMs)
- then: 'the event contains the required attributes'
- 1 * mockEventsProducer.sendCloudEvent(_, _, _) >> {
- args ->
- {
- assert args[0] == 'client-test-topic'
- assert args[1] == subscriptionId
- def ncmpOutEventAsCloudEvent = (args[2] as CloudEvent)
- assert ncmpOutEventAsCloudEvent.getExtension('correlationid') == subscriptionId
- assert ncmpOutEventAsCloudEvent.type == 'subscriptionCreateResponse'
- assert ncmpOutEventAsCloudEvent.source.toString() == 'NCMP'
- assert CloudEventMapper.toTargetEvent(ncmpOutEventAsCloudEvent, NcmpOutEvent) == ncmpOutEvent
- }
- }
- where: 'following scenarios are considered'
- scenario | delayInMs | eventPublishingTaskToBeScheduled
- 'send event now' | 0 | false
- 'schedule and send after the configured time ' | 1500 | true
- }
-
- def 'Schedule Cm Notification Subscription NCMP out event but later send it on demand'() {
- given: 'a cm subscription response for the client'
- def subscriptionId = 'test-subscription-id-3'
- def eventType = 'subscriptionCreateResponse'
- def ncmpOutEvent = new NcmpOutEvent(data: new Data(subscriptionId: 'test-subscription-id-3', acceptedTargets: ['ch-2', 'ch-3']))
- and: 'also we have target topic for sending to client'
- objectUnderTest.ncmpOutEventTopic = 'client-test-topic'
- and: 'a deadline to an event'
- objectUnderTest.dmiOutEventTimeoutInMs = 1000
- when: 'the event is scheduled to be sent'
- objectUnderTest.sendNcmpOutEvent(subscriptionId, eventType, ncmpOutEvent, true)
- then: 'we wait for 10ms and then we receive response from DMI'
- Thread.sleep(10)
- and: 'we receive response from DMI so we send the message on demand'
- objectUnderTest.sendNcmpOutEvent(subscriptionId, eventType, ncmpOutEvent, false)
- then: 'the event contains the required attributes'
- 1 * mockEventsProducer.sendCloudEvent(_, _, _) >> {
- args ->
- {
- assert args[0] == 'client-test-topic'
- assert args[1] == subscriptionId
- def ncmpOutEventAsCloudEvent = (args[2] as CloudEvent)
- assert ncmpOutEventAsCloudEvent.getExtension('correlationid') == subscriptionId
- assert ncmpOutEventAsCloudEvent.type == 'subscriptionCreateResponse'
- assert ncmpOutEventAsCloudEvent.source.toString() == 'NCMP'
- assert CloudEventMapper.toTargetEvent(ncmpOutEventAsCloudEvent, NcmpOutEvent) == ncmpOutEvent
- }
- }
- then: 'the cache handler is called once to remove accepted and rejected entries in cache'
- 1 * mockDmiCacheHandler.removeAcceptedAndRejectedDmiSubscriptionEntries(subscriptionId)
- }
-
- def 'No event sent when NCMP out event is null'() {
- given: 'a cm subscription response for the client'
- def subscriptionId = 'test-subscription-id-3'
- def eventType = 'subscriptionCreateResponse'
- def ncmpOutEvent = null
- and: 'also we have target topic for sending to client'
- objectUnderTest.ncmpOutEventTopic = 'client-test-topic'
- and: 'a deadline to an event'
- objectUnderTest.dmiOutEventTimeoutInMs = 1000
- when: 'the event is scheduled to be sent'
- objectUnderTest.sendNcmpOutEvent(subscriptionId, eventType, ncmpOutEvent, true)
- then: 'we wait for 10ms and then we receive response from DMI'
- Thread.sleep(10)
- and: 'we receive NO response from DMI so we send the message on demand'
- objectUnderTest.sendNcmpOutEvent(subscriptionId, eventType, ncmpOutEvent, false)
- and: 'no event sent'
- 0 * mockEventsProducer.sendCloudEvent(*_)
- }
-
-}
package org.onap.cps.ncmp.impl.datajobs.subscription.utils
+
+import static CmDataJobSubscriptionPersistenceService.CPS_PATH_TEMPLATE_FOR_SUBSCRIPTIONS_WITH_DATA_NODE_SELECTOR
+import static CmDataJobSubscriptionPersistenceService.CPS_PATH_TEMPLATE_FOR_SUBSCRIPTION_WITH_DATA_JOB_ID
+import static CmDataJobSubscriptionPersistenceService.CPS_PATH_TEMPLATE_FOR_INACTIVE_SUBSCRIPTIONS
+import static CmDataJobSubscriptionPersistenceService.PARENT_NODE_XPATH
+import static org.onap.cps.api.parameters.FetchDescendantsOption.OMIT_DESCENDANTS
+
import com.fasterxml.jackson.databind.ObjectMapper
import org.onap.cps.api.CpsDataService
import org.onap.cps.api.CpsQueryService
import org.onap.cps.utils.JsonObjectMapper
import spock.lang.Specification
-import static CmDataJobSubscriptionPersistenceService.CM_DATA_JOB_SUBSCRIPTIONS_PARENT_NODE_XPATH
-import static CmDataJobSubscriptionPersistenceService.CPS_PATH_TEMPLATE_FOR_SUBSCRIPTION_WITH_ALTERNATE_ID_AND_DATATYPE
-import static CmDataJobSubscriptionPersistenceService.CPS_PATH_TEMPLATE_FOR_SUBSCRIPTION_WITH_DATA_JOB_ID
-import static org.onap.cps.api.parameters.FetchDescendantsOption.OMIT_DESCENDANTS
class CmSubscriptionPersistenceServiceSpec extends Specification {
def 'Check cm data job subscription details has at least one subscriber #scenario'() {
given: 'a valid cm data job subscription query'
- def cpsPathQuery = CPS_PATH_TEMPLATE_FOR_SUBSCRIPTION_WITH_ALTERNATE_ID_AND_DATATYPE.formatted('altId1', 'dataType1')
+ def cpsPathQuery = CPS_PATH_TEMPLATE_FOR_SUBSCRIPTIONS_WITH_DATA_NODE_SELECTOR.formatted('/myDataNodeSelector')
and: 'datanodes optionally returned'
1 * mockCpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-job-subscriptions', cpsPathQuery, OMIT_DESCENDANTS) >> dataNode
when: 'we check if subscription details already has at least one subscriber'
- def result = objectUnderTest.hasAtLeastOneSubscription('dataType1', 'altId1')
+ def result = objectUnderTest.hasAtLeastOneSubscription('/myDataNodeSelector')
then: 'we get expected result'
assert result == hasAtLeastOneSubscription
where: 'following scenarios are used'
'no datanodes present' | [] || true
}
- def 'Get all nodes for subscription id'() {
+ def 'Get all inactive data node selectors for subscription id'() {
given: 'the query service returns nodes for subscription id'
- def expectedDataNode = new DataNode(leaves: ['datajobId': ['id1'], 'dataTypeId': 'dataType1', 'alternateId': 'altId1'])
+ def expectedDataNode = new DataNode(leaves: ['datajobId': ['id1'], 'dataNodeSelector': '/dataNodeSelector', 'status': 'UNKNOWN'])
def queryServiceResponse = [expectedDataNode].asCollection()
- def cmDataJobSubscriptionIdCpsPath = CPS_PATH_TEMPLATE_FOR_SUBSCRIPTION_WITH_DATA_JOB_ID.formatted('mySubId')
+ def cmDataJobSubscriptionIdCpsPath = CPS_PATH_TEMPLATE_FOR_INACTIVE_SUBSCRIPTIONS.formatted('id1')
1 * mockCpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-job-subscriptions', cmDataJobSubscriptionIdCpsPath, OMIT_DESCENDANTS) >> queryServiceResponse
when: 'retrieving all nodes for data job subscription id'
- def result = objectUnderTest.getAffectedDataNodes('mySubId')
+ def result = objectUnderTest.getInactiveDataNodeSelectors('id1')
then: 'the result returns correct number of datanodes'
assert result.size() == 1
and: 'the attribute of the data nodes is as expected'
- assert result.iterator().next().leaves.alternateId == expectedDataNode.leaves.alternateId
- assert result.iterator().next().leaves.dataTypeId == expectedDataNode.leaves.dataTypeId
+ assert result.iterator().next() == expectedDataNode.leaves.dataNodeSelector
}
- def 'Add subscription for a data type and and fdn that have no subscriptions yet.'() {
+ def 'Add subscription for a data node selector that have no subscriptions yet.'() {
given: 'a valid cm data job subscription path query'
- def query = CPS_PATH_TEMPLATE_FOR_SUBSCRIPTION_WITH_ALTERNATE_ID_AND_DATATYPE.formatted('altId1', 'dataType1')
+ def dataNodeSelector = '/myDataNodeSelector'
+ def query = CPS_PATH_TEMPLATE_FOR_SUBSCRIPTIONS_WITH_DATA_NODE_SELECTOR.formatted(dataNodeSelector)
and: 'a data node does not exist for cm data job subscription path query'
mockCpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-job-subscriptions', query, OMIT_DESCENDANTS) >> []
- and: 'a datanode does not exist for the given cm data job subscription path query'
- mockCpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-subscriptions', query, OMIT_DESCENDANTS) >> []
and: 'data job subscription details is mapped as JSON'
def subscriptionIds = ['newSubId']
- def subscriptionAsJson = objectUnderTest.getSubscriptionDetailsAsJson(subscriptionIds, 'dataType1', 'altId1')
- when: 'the method to add/update cm notification subscription is called'
- objectUnderTest.addSubscription('dataType1', 'altId1', 'newSubId')
+ def subscriptionAsJson = objectUnderTest.createSubscriptionDetailsAsJson(dataNodeSelector, subscriptionIds, 'UNKNOWN')
+ when: 'the method to add cm notification subscription is called'
+ objectUnderTest.add('newSubId', dataNodeSelector)
then: 'data service method to create new subscription for given subscriber is called once with the correct parameters'
1 * mockCpsDataService.saveData('NCMP-Admin', 'cm-data-job-subscriptions', subscriptionAsJson, _, ContentType.JSON)
}
- def 'Add subscription for a data type and fdn that already have subscription(s).'() {
+ def 'Add subscription for a data node selector that already have subscription(s).'() {
given: 'a valid cm subscription path query'
- def query = CPS_PATH_TEMPLATE_FOR_SUBSCRIPTION_WITH_ALTERNATE_ID_AND_DATATYPE.formatted('altId1', 'dataType1')
+ def dataNodeSelector = '/myDataNodeSelector'
+ def query = CPS_PATH_TEMPLATE_FOR_SUBSCRIPTIONS_WITH_DATA_NODE_SELECTOR.formatted(dataNodeSelector)
and: 'a dataNode exists for the given cps path query'
- mockCpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-job-subscriptions', query, OMIT_DESCENDANTS) >> [new DataNode(leaves: ['dataJobId': ['existingId'], 'dataTypeId': 'dataType1', 'alternateId': 'altId1'])]
+ mockCpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-job-subscriptions', query, OMIT_DESCENDANTS) >> [new DataNode(leaves: ['dataJobId': ['existingId'], 'dataNodeSelector': dataNodeSelector, 'status': 'ACCEPTED'])]
and: 'updated cm data job subscription details as json'
def newListOfSubscriptionIds = ['existingId', 'newSubId']
- def subscriptionDetailsAsJson = objectUnderTest.getSubscriptionDetailsAsJson(newListOfSubscriptionIds, 'dataType1', 'altId1')
- when: 'the method to add/update cm notification subscription is called'
- objectUnderTest.addSubscription('dataType1', 'altId1', 'newSubId')
+ def subscriptionDetailsAsJson = objectUnderTest.createSubscriptionDetailsAsJson(dataNodeSelector, newListOfSubscriptionIds, 'ACCEPTED')
+ when: 'the method to add cm notification subscription is called'
+ objectUnderTest.add('newSubId', dataNodeSelector)
then: 'data service method to update list of subscribers is called once'
- 1 * mockCpsDataService.updateNodeLeaves('NCMP-Admin', 'cm-data-job-subscriptions', CM_DATA_JOB_SUBSCRIPTIONS_PARENT_NODE_XPATH, subscriptionDetailsAsJson, _, ContentType.JSON)
- }
-
- def 'Remove subscription (other subscriptions remain for same data type and target).'() {
- given: 'a subscription exists when queried'
- def query = CPS_PATH_TEMPLATE_FOR_SUBSCRIPTION_WITH_ALTERNATE_ID_AND_DATATYPE.formatted('altId1', 'dataType1')
- mockCpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-job-subscriptions', query, OMIT_DESCENDANTS)
- >> [new DataNode(leaves: ['dataJobId': ['existingId', 'subIdToRemove'], 'dataTypeId': 'dataType1', 'alternateId': 'altId1'])]
- and: 'updated cm data job subscription details as json'
- def subscriptionDetailsAsJson = objectUnderTest.getSubscriptionDetailsAsJson(['existingId'], 'dataType1', 'altId1')
- when: 'the subscriber is removed'
- objectUnderTest.removeSubscription('dataType1', 'altId1', 'subIdToRemove')
- then: 'the list of subscribers is updated'
- 1 * mockCpsDataService.updateNodeLeaves('NCMP-Admin', 'cm-data-job-subscriptions', CM_DATA_JOB_SUBSCRIPTIONS_PARENT_NODE_XPATH, subscriptionDetailsAsJson, _, ContentType.JSON)
- }
-
- def 'Remove last subscription (no subscriptions remain for same data type and target).'() {
- given: 'a subscription exists when queried but has only 1 subscriber'
- def query = CPS_PATH_TEMPLATE_FOR_SUBSCRIPTION_WITH_ALTERNATE_ID_AND_DATATYPE.formatted('last-alt-id', 'last-data-type')
- mockCpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-job-subscriptions', query, OMIT_DESCENDANTS)
- >> [new DataNode(leaves: ['dataJobId': ['subIdToRemove'], 'dataTypeId': 'last-data-type', 'alternateId': 'last-alt-id'])]
- and: 'a cps path with alternate id and data type for deleting a node'
- def cpsPath = CPS_PATH_TEMPLATE_FOR_SUBSCRIPTION_WITH_ALTERNATE_ID_AND_DATATYPE.formatted('last-alt-id', 'last-data-type')
- when: 'that last ongoing subscription is removed'
- objectUnderTest.removeSubscription('last-data-type', 'last-alt-id', 'subIdToRemove')
- then: 'the data job subscription with empty subscribers list is removed'
- 1 * mockCpsDataService.deleteDataNode('NCMP-Admin', 'cm-data-job-subscriptions', cpsPath, _)
- }
-
- def 'Attempt to remove non existing subscription (id).'() {
- given: 'a subscription exists when queried with other subscriber'
- def query = CPS_PATH_TEMPLATE_FOR_SUBSCRIPTION_WITH_ALTERNATE_ID_AND_DATATYPE.formatted('some-alt-id', 'some-data-type')
- mockCpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-job-subscriptions', query, OMIT_DESCENDANTS) >> [new DataNode(leaves: ['dataJobId': ['otherDataJobId']])]
- when: 'the remove subscription method is with a non existing id'
- objectUnderTest.removeSubscription('some-data-type', 'some-alt-id', 'nonExistingSubId')
- then: 'no calls to cps data service is made'
- 0 * mockCpsDataService.deleteDataNode(*_)
- and: 'removal of non existent subscription id silently ignored with no exception thrown'
- noExceptionThrown()
+ 1 * mockCpsDataService.updateNodeLeaves('NCMP-Admin', 'cm-data-job-subscriptions', PARENT_NODE_XPATH, subscriptionDetailsAsJson, _, ContentType.JSON)
}
}
+++ /dev/null
-{
- "data": {
- "subscriptionId": "test-id",
- "predicates": [
- {
- "targetFilter": ["ch1","ch2"],
- "scopeFilter": {
- "datastore": "ncmp-datastore:passthrough-operational",
- "xpathFilter": ["/x1/y1","x2/y2"]
- }
- },
- {
- "targetFilter": ["ch3","ch4"],
- "scopeFilter": {
- "datastore": "ncmp-datastore:passthrough-operational",
- "xpathFilter": ["/x3/y3","x4/y4"]
- }
- }
- ]
- }
-}
\ No newline at end of file
{
- "eventType": "my event type",
+ "eventType": "#myEventType",
"event": {
"dataJob": {
- "id": "my job id",
+ "id": "myDataJobId",
"productionJobDefinition": {
"targetSelector": {
"dataNodeSelector": "/SubNetwork[id=\"SN1\"]"
}
},
"dataType": {
- "dataTypeId": "#dataTypeId"
+ "dataTypeId": ""
}
}
}
\ No newline at end of file
avc:
cm-subscription-ncmp-in: ${CM_SUBSCRIPTION_NCMP_IN_TOPIC:subscription}
cm-subscription-dmi-in: ${CM_SUBSCRIPTION_DMI_IN_TOPIC:ncmp-dmi-cm-avc-subscription}
- cm-subscription-dmi-out: ${CM_SUBSCRIPTION_DMI_OUT_TOPIC:dmi-ncmp-cm-avc-subscription}
cm-subscription-ncmp-out: ${CM_SUBSCRIPTION_NCMP_OUT_TOPIC:subscription-response}
cm-events-topic: ${NCMP_CM_EVENTS_TOPIC:cm-events}
inventory-events-topic: ncmp-inventory-events
sleep-time-ms: 1000000
cm-handle-data-sync:
sleep-time-ms: 30000
- subscription-forwarding:
- dmi-response-timeout-ms: 30000
model-loader:
retry-time-ms: 1000
trust-level: