"DmiInEvent": {
"description": "The payload for cm notification subscription event incoming message from NCMP.",
"type": "object",
- "javaType": "org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.ncmp_to_dmi.DmiInEvent",
+ "javaType": "org.onap.cps.ncmp.impl.datajobs.subscription.ncmp_to_dmi.DmiInEvent",
"additionalProperties": false,
"properties": {
"data": {
]
}
}
-}
\ No newline at end of file
+}
"description": "The payload for cm notification subscription merge event coming out from DMI Plugin.",
"type": "object",
"additionalProperties": false,
- "javaType": "org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.dmi_to_ncmp.DmiOutEvent",
+ "javaType": "org.onap.cps.ncmp.impl.datajobs.subscription.dmi_to_ncmp.DmiOutEvent",
"properties": {
"data": {
"$ref": "#/definitions/Data"
"title": "Data"
}
}
-}
\ No newline at end of file
+}
"definitions": {
"NcmpInEvent": {
"description": "The payload for subscription merge event.",
- "javaType": "org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.client_to_ncmp.NcmpInEvent",
+ "javaType": "org.onap.cps.ncmp.impl.datajobs.subscription.client_to_ncmp.NcmpInEvent",
"properties": {
"data": {
"properties": {
]
}
}
-}
\ No newline at end of file
+}
"NcmpOutEvent": {
"type": "object",
"description": "The payload applied cm subscription merge event coming out from NCMP.",
- "javaType": "org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.ncmp_to_client.NcmpOutEvent",
+ "javaType": "org.onap.cps.ncmp.impl.datajobs.subscription.ncmp_to_client.NcmpOutEvent",
"additionalProperties": false,
"properties": {
"data": {
}
-}
\ No newline at end of file
+}
"DmiInEvent": {
"description": "The payload for cm notification data job subscription event incoming message from NCMP.",
"type": "object",
- "javaType": "org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.ncmp_to_dmi.DataJobSubscriptionDmiInEvent",
+ "javaType": "org.onap.cps.ncmp.impl.datajobs.subscription.ncmp_to_dmi.DataJobSubscriptionDmiInEvent",
"properties": {
"data": {
"$ref": "#/definitions/data"
"cmHandles"
]
}
-}
\ No newline at end of file
+}
-
{
"$schema": "http://json-schema.org/draft-04/schema#",
"title": "Data job subscription operation event format",
"type": "object",
- "javaType": "org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.client_to_ncmp.DataJobSubscriptionOperationInEvent",
+ "javaType": "org.onap.cps.ncmp.impl.datajobs.subscription.client_to_ncmp.DataJobSubscriptionOperationInEvent",
"properties": {
"eventType": {
"description": "the type of the event",
}
}
}
-}
\ No newline at end of file
+}
* ============LICENSE_END=========================================================
*/
-package org.onap.cps.ncmp.impl.cmnotificationsubscription.cache;
+package org.onap.cps.ncmp.impl.datajobs.subscription.cache;
import com.hazelcast.config.MapConfig;
import com.hazelcast.map.IMap;
import java.util.Map;
import org.onap.cps.ncmp.impl.cache.HazelcastCacheConfig;
-import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.DmiCmSubscriptionDetails;
+import org.onap.cps.ncmp.impl.datajobs.subscription.models.DmiCmSubscriptionDetails;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
public class CmSubscriptionConfig extends HazelcastCacheConfig {
private static final MapConfig cmNotificationSubscriptionCacheMapConfig =
- createGenericMapConfig("cmNotificationSubscriptionCacheMapConfig");
+ createGenericMapConfig("cmNotificationSubscriptionCacheMapConfig");
/**
* Distributed instance of cm notification subscription information
@Bean
public IMap<String, Map<String, DmiCmSubscriptionDetails>> cmNotificationSubscriptionCache() {
return getOrCreateHazelcastInstance(cmNotificationSubscriptionCacheMapConfig).getMap(
- "cmNotificationSubscriptionCache");
+ "cmNotificationSubscriptionCache");
}
}
* ============LICENSE_END=========================================================
*/
-package org.onap.cps.ncmp.impl.cmnotificationsubscription.cache;
+package org.onap.cps.ncmp.impl.datajobs.subscription.cache;
-import static org.onap.cps.ncmp.impl.cmnotificationsubscription.models.CmSubscriptionStatus.PENDING;
+import static org.onap.cps.ncmp.impl.datajobs.subscription.models.CmSubscriptionStatus.PENDING;
import java.util.ArrayList;
import java.util.Collection;
import java.util.stream.Collectors;
import lombok.RequiredArgsConstructor;
import org.onap.cps.ncmp.api.data.models.DatastoreType;
-import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.CmSubscriptionStatus;
-import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.DmiCmSubscriptionDetails;
-import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.DmiCmSubscriptionPredicate;
-import org.onap.cps.ncmp.impl.cmnotificationsubscription.utils.CmDataJobSubscriptionPersistenceService;
-import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.client_to_ncmp.Predicate;
+import org.onap.cps.ncmp.impl.datajobs.subscription.client_to_ncmp.Predicate;
+import org.onap.cps.ncmp.impl.datajobs.subscription.models.CmSubscriptionStatus;
+import org.onap.cps.ncmp.impl.datajobs.subscription.models.DmiCmSubscriptionDetails;
+import org.onap.cps.ncmp.impl.datajobs.subscription.models.DmiCmSubscriptionPredicate;
+import org.onap.cps.ncmp.impl.datajobs.subscription.utils.CmDataJobSubscriptionPersistenceService;
import org.onap.cps.ncmp.impl.inventory.InventoryPersistence;
import org.onap.cps.ncmp.impl.inventory.models.YangModelCmHandle;
import org.springframework.stereotype.Component;
/**
* Adds subscription to the subscription cache.
*
- * @param subscriptionId subscription id
- * @param predicates subscription request predicates
+ * @param subscriptionId subscription id
+ * @param predicates subscription request predicates
*/
public void add(final String subscriptionId, final List<Predicate> predicates) {
cmNotificationSubscriptionCache.put(subscriptionId, createDmiSubscriptionsPerDmi(predicates));
/**
* Adds subscription to the subscription cache.
*
- * @param subscriptionId subscription id
+ * @param subscriptionId subscription id
* @param dmiCmSubscriptionDetailsPerDmi map of dmi cm notification subscription details per dmi
*/
public void add(final String subscriptionId,
final Map<String, DmiCmSubscriptionDetails>
- dmiCmSubscriptionDetailsPerDmi) {
+ dmiCmSubscriptionDetailsPerDmi) {
cmNotificationSubscriptionCache.put(subscriptionId, dmiCmSubscriptionDetailsPerDmi);
}
/**
* Get cm notification subscription cache entry via subscription id.
*
- * @param subscriptionId subscription id
+ * @param subscriptionId subscription id
* @return map of dmi cm notification subscriptions per dmi
*/
public Map<String, DmiCmSubscriptionDetails> get(final String subscriptionId) {
*/
public void removeAcceptedAndRejectedDmiSubscriptionEntries(final String subscriptionId) {
final Map<String, DmiCmSubscriptionDetails> dmiSubscriptionsPerDmi =
- cmNotificationSubscriptionCache.get(subscriptionId);
+ cmNotificationSubscriptionCache.get(subscriptionId);
final Map<String, DmiCmSubscriptionDetails> updatedDmiSubscriptionsPerDmi =
- dmiSubscriptionsPerDmi.entrySet().stream()
- .filter(dmiCmNotificationSubscription -> !isAcceptedOrRejected(
- dmiCmNotificationSubscription.getValue()))
- .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+ dmiSubscriptionsPerDmi.entrySet().stream()
+ .filter(dmiCmNotificationSubscription -> !isAcceptedOrRejected(
+ dmiCmNotificationSubscription.getValue()))
+ .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
cmNotificationSubscriptionCache.put(subscriptionId, updatedDmiSubscriptionsPerDmi);
}
/**
- * Creates map of subscription details per DMI.
+ * Creates map of subscription details per DMI.
*
- * @param predicates CM Subscription Create Request Predicates
- * @return Map of DmiCmNotificationSubscription per DMI plugin
+ * @param predicates CM Subscription Create Request Predicates
+ * @return Map of DmiCmNotificationSubscription per DMI plugin
*/
public Map<String, DmiCmSubscriptionDetails> createDmiSubscriptionsPerDmi(
- final List<Predicate> predicates) {
+ final List<Predicate> predicates) {
final Map<String, DmiCmSubscriptionDetails> dmiSubscriptionsPerDmi =
- new HashMap<>();
+ new HashMap<>();
for (final Predicate requestPredicate : predicates) {
final List<String> targetFilter = requestPredicate.getTargetFilter();
final DatastoreType datastoreType = DatastoreType.fromDatastoreName(
- requestPredicate.getScopeFilter().getDatastore().toString());
+ requestPredicate.getScopeFilter().getDatastore().toString());
final Set<String> xpaths = new HashSet<>(requestPredicate.getScopeFilter().getXpathFilter());
final Map<String, Set<String>> targetCmHandlesByDmiMap = groupTargetCmHandleIdsByDmi(targetFilter);
- for (final Map.Entry<String, Set<String>> targetCmHandlesByDmi: targetCmHandlesByDmiMap.entrySet()) {
+ for (final Map.Entry<String, Set<String>> targetCmHandlesByDmi : targetCmHandlesByDmiMap.entrySet()) {
final DmiCmSubscriptionPredicate dmiCmSubscriptionPredicate =
- new DmiCmSubscriptionPredicate(targetCmHandlesByDmi.getValue(),
- datastoreType, xpaths);
+ new DmiCmSubscriptionPredicate(targetCmHandlesByDmi.getValue(),
+ datastoreType, xpaths);
updateDmiSubscriptionDetailsPerDmi(targetCmHandlesByDmi.getKey(),
- dmiCmSubscriptionPredicate,
- dmiSubscriptionsPerDmi);
+ dmiCmSubscriptionPredicate,
+ dmiSubscriptionsPerDmi);
}
}
return dmiSubscriptionsPerDmi;
}
/**
- * Update status in map of subscription details per DMI.
- *
- * @param subscriptionId String of subscription id
- * @param dmiServiceName String of dmiServiceName
- * @param status String of status
+ * Update status in map of subscription details per DMI.
*
+ * @param subscriptionId String of subscription id
+ * @param dmiServiceName String of dmiServiceName
+ * @param status String of status
*/
public void updateDmiSubscriptionStatus(final String subscriptionId, final String dmiServiceName,
final CmSubscriptionStatus status) {
final Map<String, DmiCmSubscriptionDetails> dmiSubscriptionsPerDmi =
- cmNotificationSubscriptionCache.get(subscriptionId);
+ cmNotificationSubscriptionCache.get(subscriptionId);
dmiSubscriptionsPerDmi.get(dmiServiceName).setCmSubscriptionStatus(status);
cmNotificationSubscriptionCache.put(subscriptionId, dmiSubscriptionsPerDmi);
}
/**
- * Persist map of subscription details per DMI.
- *
- * @param subscriptionId String of subscription id
- * @param dmiServiceName String of dmiServiceName
+ * Persist map of subscription details per DMI.
*
+ * @param subscriptionId String of subscription id
+ * @param dmiServiceName String of dmiServiceName
*/
public void persistIntoDatabasePerDmi(final String subscriptionId, final String dmiServiceName) {
final List<DmiCmSubscriptionPredicate> dmiCmSubscriptionPredicates =
- cmNotificationSubscriptionCache.get(subscriptionId).get(dmiServiceName)
- .getDmiCmSubscriptionPredicates();
+ cmNotificationSubscriptionCache.get(subscriptionId).get(dmiServiceName)
+ .getDmiCmSubscriptionPredicates();
for (final DmiCmSubscriptionPredicate dmiCmSubscriptionPredicate : dmiCmSubscriptionPredicates) {
final DatastoreType datastoreType = dmiCmSubscriptionPredicate.getDatastoreType();
final Set<String> cmHandles = dmiCmSubscriptionPredicate.getTargetCmHandleIds();
- for (final String cmHandle: cmHandles) {
+ for (final String cmHandle : cmHandles) {
cmDataJobSubscriptionPersistenceService.addSubscription(datastoreType.getDatastoreName(),
- cmHandle, subscriptionId);
+ cmHandle, subscriptionId);
}
}
}
/**
- * Remove subscription from database per DMI service name.
- *
- * @param subscriptionId String of subscription id
- * @param dmiServiceName String of dmiServiceName
+ * Remove subscription from database per DMI service name.
*
+ * @param subscriptionId String of subscription id
+ * @param dmiServiceName String of dmiServiceName
*/
public void removeFromDatabase(final String subscriptionId, final String dmiServiceName) {
final List<DmiCmSubscriptionPredicate> dmiCmSubscriptionPredicates =
- cmNotificationSubscriptionCache.get(subscriptionId).get(dmiServiceName)
- .getDmiCmSubscriptionPredicates();
+ cmNotificationSubscriptionCache.get(subscriptionId).get(dmiServiceName)
+ .getDmiCmSubscriptionPredicates();
for (final DmiCmSubscriptionPredicate dmiCmSubscriptionPredicate : dmiCmSubscriptionPredicates) {
final DatastoreType datastoreType = dmiCmSubscriptionPredicate.getDatastoreType();
final Set<String> cmHandles = dmiCmSubscriptionPredicate.getTargetCmHandleIds();
- for (final String cmHandle: cmHandles) {
+ for (final String cmHandle : cmHandles) {
cmDataJobSubscriptionPersistenceService.removeSubscription(datastoreType.getDatastoreName(),
- cmHandle, subscriptionId);
+ cmHandle, subscriptionId);
}
}
}
private void updateDmiSubscriptionDetailsPerDmi(
- final String dmiServiceName,
- final DmiCmSubscriptionPredicate dmiCmSubscriptionPredicate,
- final Map<String, DmiCmSubscriptionDetails> dmiSubscriptionsPerDmi) {
+ final String dmiServiceName,
+ final DmiCmSubscriptionPredicate dmiCmSubscriptionPredicate,
+ final Map<String, DmiCmSubscriptionDetails> dmiSubscriptionsPerDmi) {
if (dmiSubscriptionsPerDmi.containsKey(dmiServiceName)) {
dmiSubscriptionsPerDmi.get(dmiServiceName)
- .getDmiCmSubscriptionPredicates().add(dmiCmSubscriptionPredicate);
+ .getDmiCmSubscriptionPredicates().add(dmiCmSubscriptionPredicate);
} else {
dmiSubscriptionsPerDmi.put(dmiServiceName,
- new DmiCmSubscriptionDetails(
- new ArrayList<>(List.of(dmiCmSubscriptionPredicate)),
- PENDING));
+ new DmiCmSubscriptionDetails(
+ new ArrayList<>(List.of(dmiCmSubscriptionPredicate)),
+ PENDING));
}
}
private Map<String, Set<String>> groupTargetCmHandleIdsByDmi(final List<String> targetCmHandleIds) {
final Map<String, Set<String>> targetCmHandlesByDmiServiceNames = new HashMap<>();
final Collection<YangModelCmHandle> yangModelCmHandles =
- inventoryPersistence.getYangModelCmHandles(targetCmHandleIds);
+ inventoryPersistence.getYangModelCmHandles(targetCmHandleIds);
for (final YangModelCmHandle yangModelCmHandle : yangModelCmHandles) {
final String dmiServiceName = yangModelCmHandle.getDmiServiceName();
private boolean isAcceptedOrRejected(final DmiCmSubscriptionDetails dmiCmSubscription) {
return dmiCmSubscription.getCmSubscriptionStatus().toString().equals("ACCEPTED")
- || dmiCmSubscription.getCmSubscriptionStatus().toString().equals("REJECTED");
+ || dmiCmSubscription.getCmSubscriptionStatus().toString().equals("REJECTED");
}
}
* ============LICENSE_END=========================================================
*/
-package org.onap.cps.ncmp.impl.cmnotificationsubscription.cmavc;
+package org.onap.cps.ncmp.impl.datajobs.subscription.cmavc;
import static org.onap.cps.ncmp.utils.events.CloudEventMapper.toTargetEvent;
* @param cmAvcEventAsConsumerRecord Incoming raw consumer record
*/
@KafkaListener(topics = "${app.dmi.cm-events.topic}",
- containerFactory = "cloudEventConcurrentKafkaListenerContainerFactory")
+ containerFactory = "cloudEventConcurrentKafkaListenerContainerFactory")
@Timed(value = "cps.ncmp.cm.notifications.consume.and.forward", description = "Time taken to forward CM AVC events")
public void consumeAndForward(final ConsumerRecord<String, CloudEvent> cmAvcEventAsConsumerRecord) {
if (isEventFromOnapDmiPlugin(cmAvcEventAsConsumerRecord.headers())) {
* ============LICENSE_END=========================================================
*/
-package org.onap.cps.ncmp.impl.cmnotificationsubscription.cmavc;
+package org.onap.cps.ncmp.impl.datajobs.subscription.cmavc;
import static org.onap.cps.cpspath.parser.CpsPathUtil.NO_PARENT_PATH;
import static org.onap.cps.cpspath.parser.CpsPathUtil.getNormalizedParentXpath;
public void processCmAvcEvent(final String cmHandleId, final AvcEvent cmAvcEvent) {
final List<Edit> edits =
- cmAvcEvent.getData().getPushChangeUpdate().getDatastoreChanges().getIetfYangPatchYangPatch().getEdit();
+ cmAvcEvent.getData().getPushChangeUpdate().getDatastoreChanges().getIetfYangPatchYangPatch().getEdit();
edits.forEach(
- edit -> handleCmAvcEventOperation(CmAvcOperationEnum.fromValue(edit.getOperation()), cmHandleId, edit));
+ edit -> handleCmAvcEventOperation(CmAvcOperationEnum.fromValue(edit.getOperation()), cmHandleId, edit));
}
private void handleCmAvcEventOperation(final CmAvcOperationEnum cmAvcOperation, final String cmHandleId,
- final Edit cmAvcEventEdit) {
+ final Edit cmAvcEventEdit) {
log.info("Operation : {} requested for cmHandleId : {}", cmAvcOperation.getValue(), cmHandleId);
final String cpsPathFromRestConfStylePath = getCpsPath(cmHandleId, cmAvcEventEdit.getTarget());
final String parentXpath = getNormalizedParentXpath(cpsPathFromRestConfStylePath);
log.debug("Handling update operation for cmHandleId : {} , cpsPath : {} and parent-xpath : {}", cmHandleId,
- cpsPathFromRestConfStylePath, parentXpath);
+ cpsPathFromRestConfStylePath, parentXpath);
cpsDataService.updateDataNodeAndDescendants(NFP_OPERATIONAL_DATASTORE_DATASPACE_NAME, cmHandleId,
- resolveParentNodeXpath(parentXpath), jsonData, NO_TIMESTAMP, JSON);
+ resolveParentNodeXpath(parentXpath), jsonData, NO_TIMESTAMP, JSON);
}
private void handlePatch(final String cmHandleId, final Edit cmAvcEventEdit) {
final String cpsPathFromRestConfStylePath = getCpsPath(cmHandleId, cmAvcEventEdit.getTarget());
final String parentXpath = getNormalizedParentXpath(cpsPathFromRestConfStylePath);
log.debug("Handling patch operation for cmHandleId : {} , cpsPath : {} and parent-xpath : {}", cmHandleId,
- cpsPathFromRestConfStylePath, parentXpath);
+ cpsPathFromRestConfStylePath, parentXpath);
cpsDataService.updateNodeLeaves(NFP_OPERATIONAL_DATASTORE_DATASPACE_NAME, cmHandleId,
- resolveParentNodeXpath(parentXpath), jsonData, NO_TIMESTAMP, JSON);
+ resolveParentNodeXpath(parentXpath), jsonData, NO_TIMESTAMP, JSON);
}
final String cpsPathFromRestConfStylePath = getCpsPath(cmHandleId, cmAvcEventEdit.getTarget());
log.debug("Deleting data for xpath : {}", cpsPathFromRestConfStylePath);
cpsDataService.deleteDataNode(NFP_OPERATIONAL_DATASTORE_DATASPACE_NAME, cmHandleId,
- cpsPathFromRestConfStylePath, NO_TIMESTAMP);
+ cpsPathFromRestConfStylePath, NO_TIMESTAMP);
}
}
private String extractNodeData(final Edit cmAvcEventEdit) {
return jsonObjectMapper.convertJsonString(jsonObjectMapper.asJsonString(cmAvcEventEdit.getValue()),
- String.class);
+ String.class);
}
private String getCpsPath(final String cmHandleId, final String restConfStylePath) {
* ============LICENSE_END=========================================================
*/
-package org.onap.cps.ncmp.impl.cmnotificationsubscription.cmavc;
+package org.onap.cps.ncmp.impl.datajobs.subscription.cmavc;
import com.fasterxml.jackson.annotation.JsonValue;
* ============LICENSE_END=========================================================
*/
-package org.onap.cps.ncmp.impl.cmnotificationsubscription.dmi;
+package org.onap.cps.ncmp.impl.datajobs.subscription.dmi;
import static org.onap.cps.ncmp.api.data.models.DatastoreType.fromDatastoreName;
-import static org.onap.cps.ncmp.impl.cmnotificationsubscription.models.CmSubscriptionStatus.PENDING;
+import static org.onap.cps.ncmp.impl.datajobs.subscription.models.CmSubscriptionStatus.PENDING;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Set;
import java.util.stream.Collectors;
import org.onap.cps.ncmp.api.data.models.DatastoreType;
-import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.DmiCmSubscriptionDetails;
-import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.DmiCmSubscriptionKey;
-import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.DmiCmSubscriptionPredicate;
+import org.onap.cps.ncmp.impl.datajobs.subscription.models.DmiCmSubscriptionDetails;
+import org.onap.cps.ncmp.impl.datajobs.subscription.models.DmiCmSubscriptionKey;
+import org.onap.cps.ncmp.impl.datajobs.subscription.models.DmiCmSubscriptionPredicate;
import org.springframework.stereotype.Component;
@Component
* @return Grouped Dmi Subscription details per dmi plugin
*/
public Map<String, DmiCmSubscriptionDetails> toDmiCmSubscriptionsPerDmi(
- final Map<String, Collection<DmiCmSubscriptionKey>> subscribersPerDmi) {
+ final Map<String, Collection<DmiCmSubscriptionKey>> subscribersPerDmi) {
final Map<String, DmiCmSubscriptionDetails> dmiSubscriptionsPerDmi = new HashMap<>();
subscribersPerDmi.forEach((dmiPluginName, dmiCmSubscriptionKeys) -> {
final Map<DatastoreTypeAndXpath, List<DmiCmSubscriptionKey>> groupedByDatastoreTypeAndXpath =
- groupByDatastoreTypeAndXpath(dmiCmSubscriptionKeys);
+ groupByDatastoreTypeAndXpath(dmiCmSubscriptionKeys);
final List<DmiCmSubscriptionPredicate> dmiSubscriptionPredicates =
- createDmiCmSubscriptionPredicates(groupedByDatastoreTypeAndXpath);
+ createDmiCmSubscriptionPredicates(groupedByDatastoreTypeAndXpath);
final DmiCmSubscriptionDetails dmiCmSubscriptionDetails =
- new DmiCmSubscriptionDetails(dmiSubscriptionPredicates, PENDING);
+ new DmiCmSubscriptionDetails(dmiSubscriptionPredicates, PENDING);
dmiSubscriptionsPerDmi.put(dmiPluginName, dmiCmSubscriptionDetails);
});
}
private static Map<DatastoreTypeAndXpath, List<DmiCmSubscriptionKey>> groupByDatastoreTypeAndXpath(
- final Collection<DmiCmSubscriptionKey> dmiCmSubscriptionKeys) {
+ final Collection<DmiCmSubscriptionKey> dmiCmSubscriptionKeys) {
return dmiCmSubscriptionKeys.stream().collect(Collectors.groupingBy(
- datastoreTypeAndXpath -> new DatastoreTypeAndXpath(
- fromDatastoreName(datastoreTypeAndXpath.datastoreName()), datastoreTypeAndXpath.xpath())));
+ datastoreTypeAndXpath -> new DatastoreTypeAndXpath(
+ fromDatastoreName(datastoreTypeAndXpath.datastoreName()), datastoreTypeAndXpath.xpath())));
}
private static List<DmiCmSubscriptionPredicate> createDmiCmSubscriptionPredicates(
- final Map<DatastoreTypeAndXpath, List<DmiCmSubscriptionKey>> groupedByDatastoreTypeAndXpath) {
+ final Map<DatastoreTypeAndXpath, List<DmiCmSubscriptionKey>> groupedByDatastoreTypeAndXpath) {
final List<DmiCmSubscriptionPredicate> dmiCmSubscriptionPredicates = new ArrayList<>();
for (final Map.Entry<DatastoreTypeAndXpath, List<DmiCmSubscriptionKey>> datastoreTypeXpathGroupEntry :
- groupedByDatastoreTypeAndXpath.entrySet()) {
+ groupedByDatastoreTypeAndXpath.entrySet()) {
final DatastoreTypeAndXpath datastoreTypeAndXpath = datastoreTypeXpathGroupEntry.getKey();
final Set<String> cmHandleIds = new HashSet<>();
final Set<String> xpaths = Collections.singleton(datastoreTypeAndXpath.xpath());
dmiCmSubscriptionPredicates.add(
- new DmiCmSubscriptionPredicate(cmHandleIds, datastoreTypeAndXpath.datastoreType(), xpaths));
+ new DmiCmSubscriptionPredicate(cmHandleIds, datastoreTypeAndXpath.datastoreType(), xpaths));
}
return dmiCmSubscriptionPredicates;
}
- private record DatastoreTypeAndXpath(DatastoreType datastoreType, String xpath) { }
+ private record DatastoreTypeAndXpath(DatastoreType datastoreType, String xpath) {
+ }
}
* ============LICENSE_END=========================================================
*/
-package org.onap.cps.ncmp.impl.cmnotificationsubscription.dmi;
+package org.onap.cps.ncmp.impl.datajobs.subscription.dmi;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import lombok.RequiredArgsConstructor;
-import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.DmiCmSubscriptionPredicate;
-import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.ncmp_to_dmi.CmHandle;
-import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.ncmp_to_dmi.Data;
-import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.ncmp_to_dmi.DmiInEvent;
-import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.ncmp_to_dmi.Predicate;
-import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.ncmp_to_dmi.ScopeFilter;
+import org.onap.cps.ncmp.impl.datajobs.subscription.models.DmiCmSubscriptionPredicate;
+import org.onap.cps.ncmp.impl.datajobs.subscription.ncmp_to_dmi.CmHandle;
+import org.onap.cps.ncmp.impl.datajobs.subscription.ncmp_to_dmi.Data;
+import org.onap.cps.ncmp.impl.datajobs.subscription.ncmp_to_dmi.DmiInEvent;
+import org.onap.cps.ncmp.impl.datajobs.subscription.ncmp_to_dmi.Predicate;
+import org.onap.cps.ncmp.impl.datajobs.subscription.ncmp_to_dmi.ScopeFilter;
import org.onap.cps.ncmp.impl.inventory.InventoryPersistence;
import org.springframework.stereotype.Component;
final Data cmSubscriptionData = new Data();
cmSubscriptionData.setPredicates(mapToDmiInEventPredicates(dmiCmSubscriptionPredicates));
cmSubscriptionData.setCmHandles(mapToCmSubscriptionCmHandleWithAdditionalProperties(
- extractUniqueCmHandleIds(dmiCmSubscriptionPredicates)));
+ extractUniqueCmHandleIds(dmiCmSubscriptionPredicates)));
dmiInEvent.setData(cmSubscriptionData);
return dmiInEvent;
}
private List<Predicate> mapToDmiInEventPredicates(
- final List<DmiCmSubscriptionPredicate> dmiCmSubscriptionPredicates) {
+ final List<DmiCmSubscriptionPredicate> dmiCmSubscriptionPredicates) {
final List<Predicate> predicates = new ArrayList<>();
final Predicate predicate = new Predicate();
final ScopeFilter scopeFilter = new ScopeFilter();
scopeFilter.setDatastore(ScopeFilter.Datastore.fromValue(
- dmiCmNotificationSubscriptionPredicate.getDatastoreType().getDatastoreName()));
+ dmiCmNotificationSubscriptionPredicate.getDatastoreType().getDatastoreName()));
scopeFilter.setXpathFilter(dmiCmNotificationSubscriptionPredicate.getXpaths().stream().toList());
predicate.setScopeFilter(scopeFilter);
predicate.setTargetFilter(dmiCmNotificationSubscriptionPredicate.getTargetCmHandleIds().stream().toList());
final CmHandle cmHandle = new CmHandle();
final Map<String, String> cmHandleAdditionalProperties = new LinkedHashMap<>();
yangModelCmHandle.getAdditionalProperties()
- .forEach(additionalProperty -> cmHandleAdditionalProperties.put(additionalProperty.name(),
- additionalProperty.value()));
+ .forEach(additionalProperty -> cmHandleAdditionalProperties.put(additionalProperty.name(),
+ additionalProperty.value()));
cmHandle.setCmhandleId(yangModelCmHandle.getId());
cmHandle.setPrivateProperties(cmHandleAdditionalProperties);
cmSubscriptionCmHandles.add(cmHandle);
final Set<String> cmHandleIds = new HashSet<>();
dmiCmSubscriptionPredicates.forEach(dmiCmNotificationSubscriptionPredicate -> cmHandleIds.addAll(
- dmiCmNotificationSubscriptionPredicate.getTargetCmHandleIds()));
+ dmiCmNotificationSubscriptionPredicate.getTargetCmHandleIds()));
return cmHandleIds;
}
* ============LICENSE_END=========================================================
*/
-package org.onap.cps.ncmp.impl.cmnotificationsubscription.dmi;
+package org.onap.cps.ncmp.impl.datajobs.subscription.dmi;
import static org.onap.cps.ncmp.events.NcmpEventDataSchema.SUBSCRIPTIONS_V1;
import java.util.Map;
import lombok.RequiredArgsConstructor;
import org.onap.cps.events.EventsProducer;
-import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.ncmp_to_dmi.DmiInEvent;
+import org.onap.cps.ncmp.impl.datajobs.subscription.ncmp_to_dmi.DmiInEvent;
import org.onap.cps.ncmp.utils.events.NcmpEvent;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
public void sendDmiInEvent(final String subscriptionId, final String dmiPluginName,
final String eventType, final DmiInEvent dmiInEvent) {
eventsProducer.sendCloudEvent(dmiInEventTopic, subscriptionId,
- buildAndGetDmiInEventAsCloudEvent(subscriptionId, dmiPluginName, eventType, dmiInEvent));
+ buildAndGetDmiInEventAsCloudEvent(subscriptionId, dmiPluginName, eventType, dmiInEvent));
}
private CloudEvent buildAndGetDmiInEventAsCloudEvent(final String subscriptionId, final String dmiPluginName,
- final String eventType, final DmiInEvent dmiInEvent) {
+ final String eventType, final DmiInEvent dmiInEvent) {
return NcmpEvent.builder()
- .type(eventType)
- .dataSchema(SUBSCRIPTIONS_V1.getDataSchema())
- .extensions(Map.of("correlationid", String.join("#", subscriptionId, dmiPluginName)))
- .data(dmiInEvent)
- .build()
- .asCloudEvent();
+ .type(eventType)
+ .dataSchema(SUBSCRIPTIONS_V1.getDataSchema())
+ .extensions(Map.of("correlationid", String.join("#", subscriptionId, dmiPluginName)))
+ .data(dmiInEvent)
+ .build()
+ .asCloudEvent();
}
* ============LICENSE_END=========================================================
*/
-package org.onap.cps.ncmp.impl.cmnotificationsubscription.dmi;
+package org.onap.cps.ncmp.impl.datajobs.subscription.dmi;
import static org.onap.cps.ncmp.api.NcmpResponseStatus.CM_DATA_SUBSCRIPTION_ACCEPTED;
import static org.onap.cps.ncmp.api.NcmpResponseStatus.CM_DATA_SUBSCRIPTION_REJECTED;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.onap.cps.ncmp.api.NcmpResponseStatus;
-import org.onap.cps.ncmp.impl.cmnotificationsubscription.cache.DmiCacheHandler;
-import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.CmSubscriptionStatus;
-import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.DmiCmSubscriptionDetails;
-import org.onap.cps.ncmp.impl.cmnotificationsubscription.ncmp.NcmpOutEventMapper;
-import org.onap.cps.ncmp.impl.cmnotificationsubscription.ncmp.NcmpOutEventProducer;
-import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.dmi_to_ncmp.Data;
-import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.dmi_to_ncmp.DmiOutEvent;
-import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.ncmp_to_client.NcmpOutEvent;
+import org.onap.cps.ncmp.impl.datajobs.subscription.cache.DmiCacheHandler;
+import org.onap.cps.ncmp.impl.datajobs.subscription.dmi_to_ncmp.Data;
+import org.onap.cps.ncmp.impl.datajobs.subscription.dmi_to_ncmp.DmiOutEvent;
+import org.onap.cps.ncmp.impl.datajobs.subscription.models.CmSubscriptionStatus;
+import org.onap.cps.ncmp.impl.datajobs.subscription.models.DmiCmSubscriptionDetails;
+import org.onap.cps.ncmp.impl.datajobs.subscription.ncmp.NcmpOutEventMapper;
+import org.onap.cps.ncmp.impl.datajobs.subscription.ncmp.NcmpOutEventProducer;
+import org.onap.cps.ncmp.impl.datajobs.subscription.ncmp_to_client.NcmpOutEvent;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
* @param dmiOutEventAsConsumerRecord the event to be consumed
*/
@KafkaListener(topics = "${app.ncmp.avc.cm-subscription-dmi-out}",
- containerFactory = "cloudEventConcurrentKafkaListenerContainerFactory")
+ containerFactory = "cloudEventConcurrentKafkaListenerContainerFactory")
public void consumeDmiOutEvent(final ConsumerRecord<String, CloudEvent> dmiOutEventAsConsumerRecord) {
final CloudEvent cloudEvent = dmiOutEventAsConsumerRecord.value();
final DmiOutEvent dmiOutEvent = toTargetEvent(cloudEvent, DmiOutEvent.class);
}
private void handleDmiOutEvent(final String correlationId, final String eventType,
- final DmiOutEvent dmiOutEvent) {
+ final DmiOutEvent dmiOutEvent) {
final String subscriptionId = correlationId.split(CM_SUBSCRIPTION_CORRELATION_ID_SEPARATOR)[0];
final String dmiPluginName = correlationId.split(CM_SUBSCRIPTION_CORRELATION_ID_SEPARATOR)[1];
}
log.info("Cm Subscription with id : {} handled by the dmi-plugin : {} has the status : {}", subscriptionId,
- dmiPluginName, dmiOutEvent.getData().getStatusMessage());
+ dmiPluginName, dmiOutEvent.getData().getStatusMessage());
}
private void handleCacheStatusPerDmi(final String subscriptionId, final String dmiPluginName,
- final CmSubscriptionStatus cmSubscriptionStatus) {
+ final CmSubscriptionStatus cmSubscriptionStatus) {
dmiCacheHandler.updateDmiSubscriptionStatus(subscriptionId, dmiPluginName,
- cmSubscriptionStatus);
+ cmSubscriptionStatus);
}
private void handleEventsStatusPerDmi(final String subscriptionId, final String eventType) {
}
private boolean checkStatusCodeAndMessage(final NcmpResponseStatus ncmpResponseStatus,
- final Data dmiOutData) {
+ final Data dmiOutData) {
return ncmpResponseStatus.getCode().equals(dmiOutData.getStatusCode())
- && ncmpResponseStatus.getMessage()
- .equals(dmiOutData.getStatusMessage());
+ && ncmpResponseStatus.getMessage().equals(dmiOutData.getStatusMessage());
}
}
* ============LICENSE_END=========================================================
*/
-package org.onap.cps.ncmp.impl.cmnotificationsubscription.models;
+package org.onap.cps.ncmp.impl.datajobs.subscription.models;
public enum CmSubscriptionStatus {
ACCEPTED,
* ============LICENSE_END=========================================================
*/
-package org.onap.cps.ncmp.impl.cmnotificationsubscription.models;
+package org.onap.cps.ncmp.impl.datajobs.subscription.models;
import java.util.List;
import lombok.AllArgsConstructor;
* ============LICENSE_END=========================================================
*/
-package org.onap.cps.ncmp.impl.cmnotificationsubscription.models;
+package org.onap.cps.ncmp.impl.datajobs.subscription.models;
/**
* Key used to find the records to be sent to the DMI plugin.
* @param cmHandleId cmhandle id
* @param xpath xpath
*/
-public record DmiCmSubscriptionKey(String datastoreName, String cmHandleId, String xpath) { }
+public record DmiCmSubscriptionKey(String datastoreName, String cmHandleId, String xpath) {
+}
* ============LICENSE_END=========================================================
*/
-package org.onap.cps.ncmp.impl.cmnotificationsubscription.models;
+package org.onap.cps.ncmp.impl.datajobs.subscription.models;
import java.util.Set;
import lombok.AllArgsConstructor;
* ============LICENSE_END=========================================================
*/
-package org.onap.cps.ncmp.impl.cmnotificationsubscription.models;
+package org.onap.cps.ncmp.impl.datajobs.subscription.models;
import java.util.Collection;
import java.util.Map;
/**
* Tuple to be used during for to delete use case.
*
- * @param lastRemainingSubscriptionsPerDmi subscriptions that are used by only one subscriber grouped per dmi
- * @param overlappingSubscriptionsPerDmi subscriptions that are shared by multiple subscribers grouped per dmi
+ * @param lastRemainingSubscriptionsPerDmi subscriptions that are used by only one subscriber grouped per dmi
+ * @param overlappingSubscriptionsPerDmi subscriptions that are shared by multiple subscribers grouped per dmi
*/
public record DmiCmSubscriptionTuple(Map<String, Collection<DmiCmSubscriptionKey>> lastRemainingSubscriptionsPerDmi,
Map<String, Collection<DmiCmSubscriptionKey>> overlappingSubscriptionsPerDmi) {
* ============LICENSE_END=========================================================
*/
-package org.onap.cps.ncmp.impl.cmnotificationsubscription.ncmp;
+package org.onap.cps.ncmp.impl.datajobs.subscription.ncmp;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Set;
import lombok.RequiredArgsConstructor;
import org.onap.cps.ncmp.api.data.models.DatastoreType;
-import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.DmiCmSubscriptionPredicate;
-import org.onap.cps.ncmp.impl.cmnotificationsubscription.utils.CmDataJobSubscriptionPersistenceService;
+import org.onap.cps.ncmp.impl.datajobs.subscription.models.DmiCmSubscriptionPredicate;
+import org.onap.cps.ncmp.impl.datajobs.subscription.utils.CmDataJobSubscriptionPersistenceService;
import org.springframework.stereotype.Component;
@Component
* @return new list of DmiCmNotificationSubscriptionPredicates
*/
public List<DmiCmSubscriptionPredicate> getNewDmiSubscriptionPredicates(
- final List<DmiCmSubscriptionPredicate> existingDmiCmSubscriptionPredicates) {
+ final List<DmiCmSubscriptionPredicate> existingDmiCmSubscriptionPredicates) {
final List<DmiCmSubscriptionPredicate> newDmiCmSubscriptionPredicates =
- new ArrayList<>();
+ new ArrayList<>();
for (final DmiCmSubscriptionPredicate dmiCmSubscriptionPredicate : existingDmiCmSubscriptionPredicates) {
final Set<String> targetCmHandleIds = new HashSet<>();
final Set<String> xpaths = new HashSet<>();
final DatastoreType datastoreType = dmiCmSubscriptionPredicate.getDatastoreType();
for (final String cmHandleId : dmiCmSubscriptionPredicate.getTargetCmHandleIds()) {
if (!cmDataJobSubscriptionPersistenceService.hasAtLeastOneSubscription(
- datastoreType.getDatastoreName(), cmHandleId)) {
+ datastoreType.getDatastoreName(), cmHandleId)) {
targetCmHandleIds.add(cmHandleId);
}
}
populateValidDmiSubscriptionPredicates(targetCmHandleIds, xpaths, datastoreType,
- newDmiCmSubscriptionPredicates);
+ newDmiCmSubscriptionPredicates);
}
return newDmiCmSubscriptionPredicates;
}
private void populateValidDmiSubscriptionPredicates(final Set<String> targetCmHandleIds,
- final Set<String> xpaths, final DatastoreType datastoreType,
- final List<DmiCmSubscriptionPredicate> dmiCmSubscriptionPredicates) {
+ final Set<String> xpaths,
+ final DatastoreType datastoreType,
+ final List<DmiCmSubscriptionPredicate>
+ dmiCmSubscriptionPredicates) {
if (!targetCmHandleIds.isEmpty()) {
final DmiCmSubscriptionPredicate dmiCmSubscriptionPredicate =
- new DmiCmSubscriptionPredicate(targetCmHandleIds, datastoreType, xpaths);
+ new DmiCmSubscriptionPredicate(targetCmHandleIds, datastoreType, xpaths);
dmiCmSubscriptionPredicates.add(dmiCmSubscriptionPredicate);
}
}
* ============LICENSE_END=========================================================
*/
-package org.onap.cps.ncmp.impl.cmnotificationsubscription.ncmp;
+package org.onap.cps.ncmp.impl.datajobs.subscription.ncmp;
import java.util.List;
-import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.client_to_ncmp.Predicate;
+import org.onap.cps.ncmp.impl.datajobs.subscription.client_to_ncmp.Predicate;
public interface CmSubscriptionHandler {
* Process cm notification subscription create request.
*
* @param subscriptionId subscription id
- * @param predicates subscription predicates
+ * @param predicates subscription predicates
*/
void processSubscriptionCreateRequest(final String subscriptionId, final List<Predicate> predicates);
*/
void processSubscriptionDeleteRequest(final String subscriptionId);
-}
\ No newline at end of file
+}
* ============LICENSE_END=========================================================
*/
-package org.onap.cps.ncmp.impl.cmnotificationsubscription.ncmp;
+package org.onap.cps.ncmp.impl.datajobs.subscription.ncmp;
import java.util.ArrayList;
import java.util.Collection;
import java.util.stream.Collectors;
import lombok.RequiredArgsConstructor;
import org.onap.cps.api.model.DataNode;
-import org.onap.cps.ncmp.impl.cmnotificationsubscription.cache.DmiCacheHandler;
-import org.onap.cps.ncmp.impl.cmnotificationsubscription.dmi.DmiCmSubscriptionDetailsPerDmiMapper;
-import org.onap.cps.ncmp.impl.cmnotificationsubscription.dmi.DmiInEventMapper;
-import org.onap.cps.ncmp.impl.cmnotificationsubscription.dmi.DmiInEventProducer;
-import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.CmSubscriptionStatus;
-import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.DmiCmSubscriptionDetails;
-import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.DmiCmSubscriptionKey;
-import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.DmiCmSubscriptionPredicate;
-import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.DmiCmSubscriptionTuple;
-import org.onap.cps.ncmp.impl.cmnotificationsubscription.utils.CmDataJobSubscriptionPersistenceService;
-import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.client_to_ncmp.Predicate;
-import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.ncmp_to_client.NcmpOutEvent;
-import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.ncmp_to_dmi.DmiInEvent;
+import org.onap.cps.ncmp.impl.datajobs.subscription.cache.DmiCacheHandler;
+import org.onap.cps.ncmp.impl.datajobs.subscription.client_to_ncmp.Predicate;
+import org.onap.cps.ncmp.impl.datajobs.subscription.dmi.DmiCmSubscriptionDetailsPerDmiMapper;
+import org.onap.cps.ncmp.impl.datajobs.subscription.dmi.DmiInEventMapper;
+import org.onap.cps.ncmp.impl.datajobs.subscription.dmi.DmiInEventProducer;
+import org.onap.cps.ncmp.impl.datajobs.subscription.models.CmSubscriptionStatus;
+import org.onap.cps.ncmp.impl.datajobs.subscription.models.DmiCmSubscriptionDetails;
+import org.onap.cps.ncmp.impl.datajobs.subscription.models.DmiCmSubscriptionKey;
+import org.onap.cps.ncmp.impl.datajobs.subscription.models.DmiCmSubscriptionPredicate;
+import org.onap.cps.ncmp.impl.datajobs.subscription.models.DmiCmSubscriptionTuple;
+import org.onap.cps.ncmp.impl.datajobs.subscription.ncmp_to_client.NcmpOutEvent;
+import org.onap.cps.ncmp.impl.datajobs.subscription.ncmp_to_dmi.DmiInEvent;
+import org.onap.cps.ncmp.impl.datajobs.subscription.utils.CmDataJobSubscriptionPersistenceService;
import org.onap.cps.ncmp.impl.inventory.InventoryPersistence;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Service;
public class CmSubscriptionHandlerImpl implements CmSubscriptionHandler {
private static final Pattern SUBSCRIPTION_KEY_FROM_XPATH_PATTERN = Pattern.compile(
- "^/datastores/datastore\\[@name='([^']*)']/cm-handles/cm-handle\\[@id='([^']*)']/"
- + "filters/filter\\[@xpath='(.*)']$");
+ "^/datastores/datastore\\[@name='([^']*)']/cm-handles/cm-handle\\[@id='([^']*)']/"
+ + "filters/filter\\[@xpath='(.*)']$");
private final CmDataJobSubscriptionPersistenceService cmDataJobSubscriptionPersistenceService;
private final CmSubscriptionComparator cmSubscriptionComparator;
@Override
public void processSubscriptionDeleteRequest(final String subscriptionId) {
final Collection<DataNode> subscriptionDataNodes =
- cmDataJobSubscriptionPersistenceService.getAffectedDataNodes(subscriptionId);
+ cmDataJobSubscriptionPersistenceService.getAffectedDataNodes(subscriptionId);
final DmiCmSubscriptionTuple dmiCmSubscriptionTuple =
- getLastRemainingAndOverlappingSubscriptionsPerDmi(subscriptionDataNodes);
+ getLastRemainingAndOverlappingSubscriptionsPerDmi(subscriptionDataNodes);
dmiCacheHandler.add(subscriptionId, mergeDmiCmSubscriptionDetailsPerDmiMaps(dmiCmSubscriptionTuple));
if (dmiCmSubscriptionTuple.lastRemainingSubscriptionsPerDmi().isEmpty()) {
acceptAndSendDeleteRequest(subscriptionId);
} else {
sendSubscriptionDeleteRequestToDmi(subscriptionId,
- dmiCmSubscriptionDetailsPerDmiMapper.toDmiCmSubscriptionsPerDmi(
- dmiCmSubscriptionTuple.lastRemainingSubscriptionsPerDmi()));
+ dmiCmSubscriptionDetailsPerDmiMapper.toDmiCmSubscriptionsPerDmi(
+ dmiCmSubscriptionTuple.lastRemainingSubscriptionsPerDmi()));
scheduleNcmpOutEventResponse(subscriptionId, "subscriptionDeleteResponse");
}
}
private Map<String, DmiCmSubscriptionDetails> mergeDmiCmSubscriptionDetailsPerDmiMaps(
- final DmiCmSubscriptionTuple dmiCmSubscriptionTuple) {
+ final DmiCmSubscriptionTuple dmiCmSubscriptionTuple) {
final Map<String, DmiCmSubscriptionDetails> lastRemainingDmiSubscriptionsPerDmi =
- dmiCmSubscriptionDetailsPerDmiMapper.toDmiCmSubscriptionsPerDmi(
- dmiCmSubscriptionTuple.lastRemainingSubscriptionsPerDmi());
+ dmiCmSubscriptionDetailsPerDmiMapper.toDmiCmSubscriptionsPerDmi(
+ dmiCmSubscriptionTuple.lastRemainingSubscriptionsPerDmi());
final Map<String, DmiCmSubscriptionDetails> overlappingDmiSubscriptionsPerDmi =
- dmiCmSubscriptionDetailsPerDmiMapper.toDmiCmSubscriptionsPerDmi(
- dmiCmSubscriptionTuple.overlappingSubscriptionsPerDmi());
+ dmiCmSubscriptionDetailsPerDmiMapper.toDmiCmSubscriptionsPerDmi(
+ dmiCmSubscriptionTuple.overlappingSubscriptionsPerDmi());
final Map<String, DmiCmSubscriptionDetails> mergedDmiSubscriptionsPerDmi =
- new HashMap<>(lastRemainingDmiSubscriptionsPerDmi);
+ new HashMap<>(lastRemainingDmiSubscriptionsPerDmi);
overlappingDmiSubscriptionsPerDmi.forEach((dmiServiceName, dmiCmSubscriptionDetails) ->
- mergedDmiSubscriptionsPerDmi.merge(dmiServiceName, dmiCmSubscriptionDetails,
- this::mergeDmiCmSubscriptionDetails));
+ mergedDmiSubscriptionsPerDmi.merge(dmiServiceName, dmiCmSubscriptionDetails,
+ this::mergeDmiCmSubscriptionDetails));
return mergedDmiSubscriptionsPerDmi;
}
private DmiCmSubscriptionDetails mergeDmiCmSubscriptionDetails(
- final DmiCmSubscriptionDetails dmiCmSubscriptionDetails,
- final DmiCmSubscriptionDetails otherDmiCmSubscriptionDetails) {
+ final DmiCmSubscriptionDetails dmiCmSubscriptionDetails,
+ final DmiCmSubscriptionDetails otherDmiCmSubscriptionDetails) {
final List<DmiCmSubscriptionPredicate> mergedDmiCmSubscriptionPredicates =
- new ArrayList<>(dmiCmSubscriptionDetails.getDmiCmSubscriptionPredicates());
+ new ArrayList<>(dmiCmSubscriptionDetails.getDmiCmSubscriptionPredicates());
mergedDmiCmSubscriptionPredicates.addAll(otherDmiCmSubscriptionDetails.getDmiCmSubscriptionPredicates());
return new DmiCmSubscriptionDetails(mergedDmiCmSubscriptionPredicates, CmSubscriptionStatus.PENDING);
}
private void rejectAndSendCreateRequest(final String subscriptionId, final List<Predicate> predicates) {
final Set<String> subscriptionTargetFilters =
- predicates.stream().flatMap(predicate -> predicate.getTargetFilter().stream())
- .collect(Collectors.toSet());
+ predicates.stream().flatMap(predicate -> predicate.getTargetFilter().stream())
+ .collect(Collectors.toSet());
final NcmpOutEvent ncmpOutEvent = ncmpOutEventMapper.toNcmpOutEventForRejectedRequest(subscriptionId,
- new ArrayList<>(subscriptionTargetFilters));
+ new ArrayList<>(subscriptionTargetFilters));
ncmpOutEventProducer.sendNcmpOutEvent(subscriptionId, "subscriptionCreateResponse", ncmpOutEvent, false);
}
final Set<String> dmiServiceNames = dmiCacheHandler.get(subscriptionId).keySet();
for (final String dmiServiceName : dmiServiceNames) {
dmiCacheHandler.updateDmiSubscriptionStatus(subscriptionId, dmiServiceName,
- CmSubscriptionStatus.ACCEPTED);
+ CmSubscriptionStatus.ACCEPTED);
dmiCacheHandler.removeFromDatabase(subscriptionId, dmiServiceName);
}
final NcmpOutEvent ncmpOutEvent = ncmpOutEventMapper.toNcmpOutEvent(subscriptionId,
- dmiCacheHandler.get(subscriptionId));
+ dmiCacheHandler.get(subscriptionId));
ncmpOutEventProducer.sendNcmpOutEvent(subscriptionId, "subscriptionDeleteResponse", ncmpOutEvent,
- false);
+ false);
}
private void handleNewCmSubscription(final String subscriptionId) {
final Map<String, DmiCmSubscriptionDetails> dmiSubscriptionsPerDmi =
- dmiCacheHandler.get(subscriptionId);
+ dmiCacheHandler.get(subscriptionId);
dmiSubscriptionsPerDmi.forEach((dmiPluginName, dmiSubscriptionDetails) -> {
final List<DmiCmSubscriptionPredicate> dmiCmSubscriptionPredicates =
- cmSubscriptionComparator.getNewDmiSubscriptionPredicates(
- dmiSubscriptionDetails.getDmiCmSubscriptionPredicates());
+ cmSubscriptionComparator.getNewDmiSubscriptionPredicates(
+ dmiSubscriptionDetails.getDmiCmSubscriptionPredicates());
if (dmiCmSubscriptionPredicates.isEmpty()) {
acceptAndPersistCmSubscriptionPerDmi(subscriptionId, dmiPluginName);
final List<DmiCmSubscriptionPredicate> dmiCmSubscriptionPredicates) {
final DmiInEvent dmiInEvent = dmiInEventMapper.toDmiInEvent(dmiCmSubscriptionPredicates);
dmiInEventProducer.sendDmiInEvent(subscriptionId, dmiPluginName,
- "subscriptionCreateRequest", dmiInEvent);
+ "subscriptionCreateRequest", dmiInEvent);
}
private void acceptAndPersistCmSubscriptionPerDmi(final String subscriptionId, final String dmiPluginName) {
dmiCacheHandler.updateDmiSubscriptionStatus(subscriptionId, dmiPluginName,
- CmSubscriptionStatus.ACCEPTED);
+ CmSubscriptionStatus.ACCEPTED);
dmiCacheHandler.persistIntoDatabasePerDmi(subscriptionId, dmiPluginName);
}
private void sendSubscriptionDeleteRequestToDmi(final String subscriptionId,
final Map<String, DmiCmSubscriptionDetails>
- dmiCmSubscriptionsPerDmi) {
+ dmiCmSubscriptionsPerDmi) {
dmiCmSubscriptionsPerDmi.forEach((dmiPluginName, dmiCmSubscriptionDetails) -> {
final DmiInEvent dmiInEvent =
- dmiInEventMapper.toDmiInEvent(
- dmiCmSubscriptionDetails.getDmiCmSubscriptionPredicates());
+ dmiInEventMapper.toDmiInEvent(
+ dmiCmSubscriptionDetails.getDmiCmSubscriptionPredicates());
dmiInEventProducer.sendDmiInEvent(subscriptionId,
- dmiPluginName, "subscriptionDeleteRequest", dmiInEvent);
+ dmiPluginName, "subscriptionDeleteRequest", dmiInEvent);
});
}
private DmiCmSubscriptionTuple getLastRemainingAndOverlappingSubscriptionsPerDmi(
- final Collection<DataNode> subscriptionNodes) {
+ final Collection<DataNode> subscriptionNodes) {
final Map<String, Collection<DmiCmSubscriptionKey>> lastRemainingSubscriptionsPerDmi = new HashMap<>();
final Map<String, Collection<DmiCmSubscriptionKey>> overlappingSubscriptionsPerDmi = new HashMap<>();
for (final DataNode subscriptionNode : subscriptionNodes) {
final DmiCmSubscriptionKey dmiCmSubscriptionKey = extractCmSubscriptionKey(subscriptionNode.getXpath());
final String dmiServiceName = inventoryPersistence.getYangModelCmHandle(
- dmiCmSubscriptionKey.cmHandleId()).getDmiServiceName();
- @SuppressWarnings("unchecked")
- final List<String> subscribers = (List<String>) subscriptionNode.getLeaves().get("subscriptionIds");
+ dmiCmSubscriptionKey.cmHandleId()).getDmiServiceName();
+ @SuppressWarnings("unchecked") final List<String> subscribers =
+ (List<String>) subscriptionNode.getLeaves().get("subscriptionIds");
populateDmiCmSubscriptionTuple(subscribers, overlappingSubscriptionsPerDmi,
- lastRemainingSubscriptionsPerDmi, dmiServiceName, dmiCmSubscriptionKey);
+ lastRemainingSubscriptionsPerDmi, dmiServiceName, dmiCmSubscriptionKey);
}
return new DmiCmSubscriptionTuple(lastRemainingSubscriptionsPerDmi, overlappingSubscriptionsPerDmi);
}
private static void populateDmiCmSubscriptionTuple(final List<String> subscribers,
final Map<String, Collection<DmiCmSubscriptionKey>>
- overlappingSubscriptionsPerDmi,
+ overlappingSubscriptionsPerDmi,
final Map<String, Collection<DmiCmSubscriptionKey>>
- lastRemainingSubscriptionsPerDmi,
+ lastRemainingSubscriptionsPerDmi,
final String dmiServiceName,
final DmiCmSubscriptionKey dmiCmSubscriptionKey) {
final Map<String, Collection<DmiCmSubscriptionKey>> targetMap =
- subscribers.size() > 1 ? overlappingSubscriptionsPerDmi : lastRemainingSubscriptionsPerDmi;
+ subscribers.size() > 1 ? overlappingSubscriptionsPerDmi : lastRemainingSubscriptionsPerDmi;
targetMap.computeIfAbsent(dmiServiceName, dmiName -> new HashSet<>()).add(dmiCmSubscriptionKey);
}
* ============LICENSE_END=========================================================
*/
-package org.onap.cps.ncmp.impl.cmnotificationsubscription.ncmp;
+package org.onap.cps.ncmp.impl.datajobs.subscription.ncmp;
import java.util.List;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
-import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.client_to_ncmp.DataJobSubscriptionOperationInEvent;
+import org.onap.cps.ncmp.impl.datajobs.subscription.client_to_ncmp.DataJobSubscriptionOperationInEvent;
import org.onap.cps.ncmp.impl.utils.JexParser;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.kafka.annotation.KafkaListener;
* @param dataJobSubscriptionOperationInEvent the event to be consumed
*/
@KafkaListener(topics = "${app.ncmp.avc.cm-subscription-ncmp-in}",
- containerFactory = "legacyEventConcurrentKafkaListenerContainerFactory",
- properties = {"spring.json.value.default.type="
- + "org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0"
- + ".client_to_ncmp.DataJobSubscriptionOperationInEvent"})
+ containerFactory = "legacyEventConcurrentKafkaListenerContainerFactory",
+ properties = {"spring.json.value.default.type="
+ + "org.onap.cps.ncmp.impl.datajobs.subscription.client_to_ncmp.DataJobSubscriptionOperationInEvent"})
public void consumeSubscriptionEvent(
- final DataJobSubscriptionOperationInEvent dataJobSubscriptionOperationInEvent) {
+ final DataJobSubscriptionOperationInEvent dataJobSubscriptionOperationInEvent) {
final String eventType = dataJobSubscriptionOperationInEvent.getEventType();
final String dataNodeSelector = dataJobSubscriptionOperationInEvent.getEvent().getDataJob()
- .getProductionJobDefinition().getTargetSelector().getDataNodeSelector();
+ .getProductionJobDefinition().getTargetSelector().getDataNodeSelector();
final List<String> fdns = JexParser.extractFdnsFromLocationPaths(dataNodeSelector);
final String dataJobId = dataJobSubscriptionOperationInEvent.getEvent().getDataJob().getId();
final String dataTypeId = dataJobSubscriptionOperationInEvent.getEvent().getDataType() != null
- ? dataJobSubscriptionOperationInEvent.getEvent().getDataType().getDataTypeId() : "UNKNOWN";
+ ? dataJobSubscriptionOperationInEvent.getEvent().getDataType().getDataTypeId() : "UNKNOWN";
log.info("Consumed subscription event with details: | jobId={} | eventType={} | fdns={} | dataType={}",
- dataJobId, eventType, fdns, dataTypeId);
+ dataJobId, eventType, fdns, dataTypeId);
}
}
* ============LICENSE_END=========================================================
*/
-package org.onap.cps.ncmp.impl.cmnotificationsubscription.ncmp;
+package org.onap.cps.ncmp.impl.datajobs.subscription.ncmp;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import lombok.RequiredArgsConstructor;
-import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.CmSubscriptionStatus;
-import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.DmiCmSubscriptionDetails;
-import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.DmiCmSubscriptionPredicate;
-import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.ncmp_to_client.Data;
-import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.ncmp_to_client.NcmpOutEvent;
+import org.onap.cps.ncmp.impl.datajobs.subscription.models.CmSubscriptionStatus;
+import org.onap.cps.ncmp.impl.datajobs.subscription.models.DmiCmSubscriptionDetails;
+import org.onap.cps.ncmp.impl.datajobs.subscription.models.DmiCmSubscriptionPredicate;
+import org.onap.cps.ncmp.impl.datajobs.subscription.ncmp_to_client.Data;
+import org.onap.cps.ncmp.impl.datajobs.subscription.ncmp_to_client.NcmpOutEvent;
import org.springframework.stereotype.Component;
@Component
* @return CmNotificationSubscriptionNcmpOutEvent to sent back to the client
*/
public NcmpOutEvent toNcmpOutEvent(final String subscriptionId,
- final Map<String, DmiCmSubscriptionDetails> dmiSubscriptionsPerDmi) {
+ final Map<String, DmiCmSubscriptionDetails> dmiSubscriptionsPerDmi) {
final NcmpOutEvent ncmpOutEvent = new NcmpOutEvent();
final Data cmSubscriptionData = new Data();
cmSubscriptionData.setSubscriptionId(subscriptionId);
populateNcmpOutEventWithCmHandleIds(dmiSubscriptionsPerDmi,
- cmSubscriptionData);
+ cmSubscriptionData);
ncmpOutEvent.setData(cmSubscriptionData);
return ncmpOutEvent;
/**
* Mapper to form a rejected response for the client for the Cm Notification Subscription Request.
*
- * @param subscriptionId subscription id
+ * @param subscriptionId subscription id
* @param rejectedTargetFilters list of rejected target filters for the subscription request
* @return to sent back to the client
*/
public NcmpOutEvent toNcmpOutEventForRejectedRequest(final String subscriptionId,
- final List<String> rejectedTargetFilters) {
+ final List<String> rejectedTargetFilters) {
final NcmpOutEvent ncmpOutEvent = new NcmpOutEvent();
final Data cmSubscriptionData = new Data();
cmSubscriptionData.setSubscriptionId(subscriptionId);
}
private void populateNcmpOutEventWithCmHandleIds(
- final Map<String, DmiCmSubscriptionDetails> dmiSubscriptionsPerDmi,
- final Data cmSubscriptionData) {
+ final Map<String, DmiCmSubscriptionDetails> dmiSubscriptionsPerDmi,
+ final Data cmSubscriptionData) {
final Collection<String> acceptedCmHandleIds = new HashSet<>();
final Collection<String> pendingCmHandleIds = new HashSet<>();
dmiSubscriptionsPerDmi.forEach((dmiPluginName, dmiSubscriptionDetails) -> {
final CmSubscriptionStatus cmSubscriptionStatus =
- dmiSubscriptionDetails.getCmSubscriptionStatus();
+ dmiSubscriptionDetails.getCmSubscriptionStatus();
final List<DmiCmSubscriptionPredicate> dmiCmSubscriptionPredicates =
- dmiSubscriptionDetails.getDmiCmSubscriptionPredicates();
+ dmiSubscriptionDetails.getDmiCmSubscriptionPredicates();
switch (cmSubscriptionStatus) {
case ACCEPTED -> acceptedCmHandleIds.addAll(
- extractCmHandleIds(dmiCmSubscriptionPredicates));
+ extractCmHandleIds(dmiCmSubscriptionPredicates));
case PENDING -> pendingCmHandleIds.addAll(extractCmHandleIds(dmiCmSubscriptionPredicates));
default -> rejectedCmHandleIds.addAll(extractCmHandleIds(dmiCmSubscriptionPredicates));
}
}
private List<String> extractCmHandleIds(
- final List<DmiCmSubscriptionPredicate> dmiCmSubscriptionPredicates) {
+ final List<DmiCmSubscriptionPredicate> dmiCmSubscriptionPredicates) {
final List<String> cmHandleIds = new ArrayList<>();
dmiCmSubscriptionPredicates.forEach(dmiSubscriptionPredicate -> cmHandleIds.addAll(
- dmiSubscriptionPredicate.getTargetCmHandleIds()));
+ dmiSubscriptionPredicate.getTargetCmHandleIds()));
return cmHandleIds;
}
* ============LICENSE_END=========================================================
*/
-package org.onap.cps.ncmp.impl.cmnotificationsubscription.ncmp;
+package org.onap.cps.ncmp.impl.datajobs.subscription.ncmp;
import static org.onap.cps.ncmp.events.NcmpEventDataSchema.SUBSCRIPTIONS_V1;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.onap.cps.events.EventsProducer;
-import org.onap.cps.ncmp.impl.cmnotificationsubscription.cache.DmiCacheHandler;
-import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.ncmp_to_client.NcmpOutEvent;
+import org.onap.cps.ncmp.impl.datajobs.subscription.cache.DmiCacheHandler;
+import org.onap.cps.ncmp.impl.datajobs.subscription.ncmp_to_client.NcmpOutEvent;
import org.onap.cps.ncmp.utils.events.NcmpEvent;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
private final DmiCacheHandler dmiCacheHandler;
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
private static final Map<String, ScheduledFuture<?>> scheduledTasksPerSubscriptionIdAndEventType =
- new ConcurrentHashMap<>();
+ new ConcurrentHashMap<>();
/**
* Send the event to the client who requested the subscription with key as subscription id and event is Cloud
final ScheduledFuture<?> scheduledFuture = scheduleAndSendNcmpOutEvent(subscriptionId, eventType);
scheduledTasksPerSubscriptionIdAndEventType.putIfAbsent(taskKey, scheduledFuture);
log.debug("Scheduled the Cm Subscription Event for subscriptionId : {} and eventType : {}", subscriptionId,
- eventType);
+ eventType);
} else {
cancelScheduledTask(taskKey);
if (ncmpOutEvent != null) {
sendNcmpOutEventNow(subscriptionId, eventType, ncmpOutEvent);
log.debug("Sent Cm Subscription Event on demand for subscriptionId : {} and eventType : {}",
- subscriptionId, eventType);
+ subscriptionId, eventType);
}
}
}
/**
* Get an NCMP out event as cloud event.
*
- * @param subscriptionId subscription id
- * @param eventType event type
- * @param ncmpOutEvent cm notification subscription NCMP out event
+ * @param subscriptionId subscription id
+ * @param eventType event type
+ * @param ncmpOutEvent cm notification subscription NCMP out event
* @return cm notification subscription NCMP out event as cloud event
*/
public static CloudEvent buildAndGetNcmpOutEventAsCloudEvent(
- final String subscriptionId, final String eventType, final NcmpOutEvent ncmpOutEvent) {
+ final String subscriptionId, final String eventType, final NcmpOutEvent ncmpOutEvent) {
return NcmpEvent.builder()
- .type(eventType)
- .dataSchema(SUBSCRIPTIONS_V1.getDataSchema())
- .extensions(Map.of("correlationid", subscriptionId))
- .data(ncmpOutEvent)
- .build()
- .asCloudEvent();
+ .type(eventType)
+ .dataSchema(SUBSCRIPTIONS_V1.getDataSchema())
+ .extensions(Map.of("correlationid", subscriptionId))
+ .data(ncmpOutEvent)
+ .build()
+ .asCloudEvent();
}
private ScheduledFuture<?> scheduleAndSendNcmpOutEvent(final String subscriptionId, final String eventType) {
final NcmpOutEventPublishingTask ncmpOutEventPublishingTask =
- new NcmpOutEventPublishingTask(ncmpOutEventTopic, subscriptionId, eventType, eventsProducer,
- ncmpOutEventMapper, dmiCacheHandler);
+ new NcmpOutEventPublishingTask(ncmpOutEventTopic, subscriptionId, eventType, eventsProducer,
+ ncmpOutEventMapper, dmiCacheHandler);
return scheduledExecutorService.schedule(ncmpOutEventPublishingTask, dmiOutEventTimeoutInMs,
- TimeUnit.MILLISECONDS);
+ TimeUnit.MILLISECONDS);
}
private void cancelScheduledTask(final String taskKey) {
private void sendNcmpOutEventNow(final String subscriptionId, final String eventType,
final NcmpOutEvent ncmpOutEvent) {
final CloudEvent ncmpOutEventAsCloudEvent =
- buildAndGetNcmpOutEventAsCloudEvent(subscriptionId, eventType, ncmpOutEvent);
+ buildAndGetNcmpOutEventAsCloudEvent(subscriptionId, eventType, ncmpOutEvent);
eventsProducer.sendCloudEvent(ncmpOutEventTopic, subscriptionId, ncmpOutEventAsCloudEvent);
dmiCacheHandler.removeAcceptedAndRejectedDmiSubscriptionEntries(subscriptionId);
}
* ============LICENSE_END=========================================================
*/
-package org.onap.cps.ncmp.impl.cmnotificationsubscription.ncmp;
+package org.onap.cps.ncmp.impl.datajobs.subscription.ncmp;
-import static org.onap.cps.ncmp.impl.cmnotificationsubscription.ncmp.NcmpOutEventProducer.buildAndGetNcmpOutEventAsCloudEvent;
+import static org.onap.cps.ncmp.impl.datajobs.subscription.ncmp.NcmpOutEventProducer.buildAndGetNcmpOutEventAsCloudEvent;
import io.cloudevents.CloudEvent;
import java.util.Map;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.onap.cps.events.EventsProducer;
-import org.onap.cps.ncmp.impl.cmnotificationsubscription.cache.DmiCacheHandler;
-import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.DmiCmSubscriptionDetails;
-import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.ncmp_to_client.NcmpOutEvent;
+import org.onap.cps.ncmp.impl.datajobs.subscription.cache.DmiCacheHandler;
+import org.onap.cps.ncmp.impl.datajobs.subscription.models.DmiCmSubscriptionDetails;
+import org.onap.cps.ncmp.impl.datajobs.subscription.ncmp_to_client.NcmpOutEvent;
@Slf4j
@RequiredArgsConstructor
@Override
public void run() {
final Map<String, DmiCmSubscriptionDetails> dmiSubscriptionsPerDmi =
- dmiCacheHandler.get(subscriptionId);
+ dmiCacheHandler.get(subscriptionId);
final NcmpOutEvent ncmpOutEvent = ncmpOutEventMapper.toNcmpOutEvent(subscriptionId,
- dmiSubscriptionsPerDmi);
+ dmiSubscriptionsPerDmi);
eventsProducer.sendCloudEvent(topicName, subscriptionId,
- buildAndGetNcmpOutEventAsCloudEvent(subscriptionId, eventType, ncmpOutEvent));
+ buildAndGetNcmpOutEventAsCloudEvent(subscriptionId, eventType, ncmpOutEvent));
dmiCacheHandler.removeAcceptedAndRejectedDmiSubscriptionEntries(subscriptionId);
}
}
* ============LICENSE_END=========================================================
*/
-package org.onap.cps.ncmp.impl.cmnotificationsubscription.utils;
+package org.onap.cps.ncmp.impl.datajobs.subscription.utils;
import static org.onap.cps.api.parameters.FetchDescendantsOption.OMIT_DESCENDANTS;
private static final String CM_DATA_JOB_SUBSCRIPTIONS_ANCHOR_NAME = "cm-data-job-subscriptions";
private static final String CM_DATA_JOB_SUBSCRIPTIONS_PARENT_NODE_XPATH = "/dataJob";
private static final String CPS_PATH_TEMPLATE_FOR_SUBSCRIPTION_WITH_ALTERNATE_ID_AND_DATATYPE =
- "/dataJob/subscription[@alternateId='%s' and @dataTypeId='%s']";
+ "/dataJob/subscription[@alternateId='%s' and @dataTypeId='%s']";
private static final String CPS_PATH_TEMPLATE_FOR_SUBSCRIPTION_WITH_DATA_JOB_ID =
- "//subscription/dataJobId[text()='%s']";
+ "//subscription/dataJobId[text()='%s']";
private final JsonObjectMapper jsonObjectMapper;
private final CpsQueryService cpsQueryService;
/**
* Check if we have a cm data job subscription for the given data type and target (FDN).
*
- * @param dataType the data type of the data job subscription
- * @param alternateId the alternate id target of the data job subscription
- * @return true if the subscription details has at least one subscriber , otherwise false
+ * @param dataType the data type of the data job subscription
+ * @param alternateId the alternate id target of the data job subscription
+ * @return true if the subscription details has at least one subscriber , otherwise false
*/
public boolean hasAtLeastOneSubscription(final String dataType, final String alternateId) {
return !getSubscriptionIds(dataType, alternateId).isEmpty();
public boolean isNewSubscriptionId(final String subscriptionId) {
final String query = CPS_PATH_TEMPLATE_FOR_SUBSCRIPTION_WITH_DATA_JOB_ID.formatted(subscriptionId);
return cpsQueryService.queryDataNodes(NCMP_DATASPACE_NAME, CM_DATA_JOB_SUBSCRIPTIONS_ANCHOR_NAME,
- query, OMIT_DESCENDANTS).isEmpty();
+ query, OMIT_DESCENDANTS).isEmpty();
}
/**
* Get the ids for the subscriptions for the given data type and targets.
*
- * @param dataType the data type of the data job subscription
- * @param alternateId the alternate id target of the data job subscription
- * @return collection of subscription ids of ongoing cm notification subscription
+ * @param dataType the data type of the data job subscription
+ * @param alternateId the alternate id target of the data job subscription
+ * @return collection of subscription ids of ongoing cm notification subscription
*/
@SuppressWarnings("unchecked")
public Collection<String> getSubscriptionIds(final String dataType, final String alternateId) {
final String query = CPS_PATH_TEMPLATE_FOR_SUBSCRIPTION_WITH_ALTERNATE_ID_AND_DATATYPE.formatted(
- alternateId, dataType);
+ alternateId, dataType);
final Collection<DataNode> existingNodes =
- cpsQueryService.queryDataNodes(NCMP_DATASPACE_NAME, CM_DATA_JOB_SUBSCRIPTIONS_ANCHOR_NAME,
- query, OMIT_DESCENDANTS);
+ cpsQueryService.queryDataNodes(NCMP_DATASPACE_NAME, CM_DATA_JOB_SUBSCRIPTIONS_ANCHOR_NAME,
+ query, OMIT_DESCENDANTS);
if (existingNodes.isEmpty()) {
return Collections.emptyList();
}
/**
* Add cm notification data job subscription.
*
- * @param dataType the data type of the data job subscription
- * @param alternateId the alternate id target of the data job subscription
+ * @param dataType the data type of the data job subscription
+ * @param alternateId the alternate id target of the data job subscription
* @param subscriptionId data job subscription id to be added
*/
public void addSubscription(final String dataType, final String alternateId, final String subscriptionId) {
/**
* Remove cm notification data job Subscription.
*
- * @param dataType the data type of the data job subscription
- * @param alternateId the alternate id target of the data job subscription
- * @param subscriptionId data subscription id to remove
+ * @param dataType the data type of the data job subscription
+ * @param alternateId the alternate id target of the data job subscription
+ * @param subscriptionId data subscription id to remove
*/
public void removeSubscription(final String dataType, final String alternateId, final String subscriptionId) {
final Collection<String> subscriptionIds = getSubscriptionIds(dataType, alternateId);
/**
* Retrieve all existing data nodes for given data job subscription id.
*
- * @param subscriptionId data job subscription id
- * @return collection of DataNodes
+ * @param subscriptionId data job subscription id
+ * @return collection of DataNodes
*/
public Collection<DataNode> getAffectedDataNodes(final String subscriptionId) {
final String query = CPS_PATH_TEMPLATE_FOR_SUBSCRIPTION_WITH_DATA_JOB_ID.formatted(subscriptionId);
return cpsQueryService.queryDataNodes(NCMP_DATASPACE_NAME, CM_DATA_JOB_SUBSCRIPTIONS_ANCHOR_NAME,
- query, OMIT_DESCENDANTS);
+ query, OMIT_DESCENDANTS);
}
private void deleteUnusedSubscriptionDetails(final String dataType, final String alternateId) {
final String deleteListOfSubscriptionCpsPathQuery =
- CPS_PATH_TEMPLATE_FOR_SUBSCRIPTION_WITH_ALTERNATE_ID_AND_DATATYPE.formatted(alternateId,
- dataType);
+ CPS_PATH_TEMPLATE_FOR_SUBSCRIPTION_WITH_ALTERNATE_ID_AND_DATATYPE.formatted(alternateId,
+ dataType);
cpsDataService.deleteDataNode(NCMP_DATASPACE_NAME, CM_DATA_JOB_SUBSCRIPTIONS_ANCHOR_NAME,
- deleteListOfSubscriptionCpsPathQuery, OffsetDateTime.now());
+ deleteListOfSubscriptionCpsPathQuery, OffsetDateTime.now());
}
private void addNewSubscriptionDetails(final String dataType,
final String subscriptionId) {
final Collection<String> newSubscriptionList = Collections.singletonList(subscriptionId);
final String subscriptionDetailsAsJson = getSubscriptionDetailsAsJson(newSubscriptionList, dataType,
- alternateId);
+ alternateId);
cpsDataService.saveData(NCMP_DATASPACE_NAME, CM_DATA_JOB_SUBSCRIPTIONS_ANCHOR_NAME, subscriptionDetailsAsJson,
- OffsetDateTime.now(), ContentType.JSON);
+ OffsetDateTime.now(), ContentType.JSON);
}
private void updateSubscriptionDetails(final Collection<String> subscriptionIds, final String dataType,
final String alternateId) {
final String subscriptionDetailsAsJson = getSubscriptionDetailsAsJson(subscriptionIds, dataType, alternateId);
cpsDataService.updateNodeLeaves(NCMP_DATASPACE_NAME, CM_DATA_JOB_SUBSCRIPTIONS_ANCHOR_NAME,
- CM_DATA_JOB_SUBSCRIPTIONS_PARENT_NODE_XPATH, subscriptionDetailsAsJson, OffsetDateTime.now(),
- ContentType.JSON);
+ CM_DATA_JOB_SUBSCRIPTIONS_PARENT_NODE_XPATH, subscriptionDetailsAsJson, OffsetDateTime.now(),
+ ContentType.JSON);
}
private String getSubscriptionDetailsAsJson(final Collection<String> subscriptionIds,
final String dataTypeId,
final String alternateId) {
final Map<String, Serializable> subscriptionDetailsAsMap =
- Map.of("dataTypeId", dataTypeId,
- "alternateId", alternateId,
- "dataJobId", (Serializable) subscriptionIds);
+ Map.of("dataTypeId", dataTypeId,
+ "alternateId", alternateId,
+ "dataJobId", (Serializable) subscriptionIds);
return "{\"subscription\":[" + jsonObjectMapper.asJsonString(subscriptionDetailsAsMap) + "]}";
}
revision "2022-05-10" {
description
- "Added data-sync-enabled, sync-state with state, last-sync-time, data-store-sync-state with operational and running syncstate";
+ "Added data-sync-enabled, sync-state with state, last-sync-time, data-store-sync-state with operational and running sync state";
}
revision "2022-02-10" {
* ============LICENSE_END=========================================================
*/
-package org.onap.cps.ncmp.impl.cmnotificationsubscription.cache
+package org.onap.cps.ncmp.impl.datajobs.subscription.cache
import com.hazelcast.core.Hazelcast
import com.hazelcast.map.IMap
import org.onap.cps.ncmp.api.data.models.DatastoreType
-import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.CmSubscriptionStatus
-import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.DmiCmSubscriptionDetails
-import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.DmiCmSubscriptionPredicate
+import org.onap.cps.ncmp.impl.datajobs.subscription.models.CmSubscriptionStatus
+import org.onap.cps.ncmp.impl.datajobs.subscription.models.DmiCmSubscriptionDetails
+import org.onap.cps.ncmp.impl.datajobs.subscription.models.DmiCmSubscriptionPredicate
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.test.context.SpringBootTest
import spock.lang.Specification
* ============LICENSE_END=========================================================
*/
-package org.onap.cps.ncmp.impl.cmnotificationsubscription.cache
+package org.onap.cps.ncmp.impl.datajobs.subscription.cache
import com.fasterxml.jackson.databind.ObjectMapper
import io.cloudevents.CloudEvent
import io.cloudevents.core.builder.CloudEventBuilder
import org.apache.kafka.clients.consumer.ConsumerRecord
-import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.CmSubscriptionStatus
-import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.DmiCmSubscriptionDetails
-import org.onap.cps.ncmp.impl.cmnotificationsubscription.utils.CmDataJobSubscriptionPersistenceService
-import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.client_to_ncmp.NcmpInEvent
+import org.onap.cps.ncmp.impl.datajobs.subscription.client_to_ncmp.NcmpInEvent
+import org.onap.cps.ncmp.impl.datajobs.subscription.models.CmSubscriptionStatus
+import org.onap.cps.ncmp.impl.datajobs.subscription.models.DmiCmSubscriptionDetails
+import org.onap.cps.ncmp.impl.datajobs.subscription.utils.CmDataJobSubscriptionPersistenceService
import org.onap.cps.ncmp.impl.inventory.InventoryPersistence
import org.onap.cps.ncmp.impl.inventory.models.YangModelCmHandle
import org.onap.cps.ncmp.utils.TestUtils
def objectUnderTest = new DmiCacheHandler(mockCmSubscriptionPersistenceService, testCache, mockInventoryPersistence)
NcmpInEvent ncmpInEvent
- def yangModelCmHandle1 = new YangModelCmHandle(id:'ch1',dmiServiceName:'dmi-1')
- def yangModelCmHandle2 = new YangModelCmHandle(id:'ch2',dmiServiceName:'dmi-2')
- def yangModelCmHandle3 = new YangModelCmHandle(id:'ch3',dmiServiceName:'dmi-1')
- def yangModelCmHandle4 = new YangModelCmHandle(id:'ch4',dmiServiceName:'dmi-2')
+ def yangModelCmHandle1 = new YangModelCmHandle(id: 'ch1', dmiServiceName: 'dmi-1')
+ def yangModelCmHandle2 = new YangModelCmHandle(id: 'ch2', dmiServiceName: 'dmi-2')
+ def yangModelCmHandle3 = new YangModelCmHandle(id: 'ch3', dmiServiceName: 'dmi-1')
+ def yangModelCmHandle4 = new YangModelCmHandle(id: 'ch4', dmiServiceName: 'dmi-2')
def setup() {
setUpTestEvent()
def 'Get cache entry via subscription id'() {
given: 'the cache contains value for some-id'
- testCache.put('some-id',[:])
+ testCache.put('some-id', [:])
when: 'the get method is called'
def result = objectUnderTest.get('some-id')
then: 'correct value is returned as expected'
given: 'a map as the value for cache entry for some-id'
def testMap = [:]
testMap.put("dmi-1",
- new DmiCmSubscriptionDetails([],CmSubscriptionStatus.ACCEPTED))
+ new DmiCmSubscriptionDetails([], CmSubscriptionStatus.ACCEPTED))
testMap.put("dmi-2",
- new DmiCmSubscriptionDetails([],CmSubscriptionStatus.REJECTED))
+ new DmiCmSubscriptionDetails([], CmSubscriptionStatus.REJECTED))
testMap.put("dmi-3",
- new DmiCmSubscriptionDetails([],CmSubscriptionStatus.PENDING))
+ new DmiCmSubscriptionDetails([], CmSubscriptionStatus.PENDING))
testCache.put("test-id", testMap)
assert testCache.get("test-id").size() == 3
when: 'the method to remove accepted and rejected entries for test-id is called'
objectUnderTest.removeAcceptedAndRejectedDmiSubscriptionEntries("test-id")
then: 'all entries with status accepted/rejected are no longer present for test-id'
- testCache.get("test-id").each { key, testResultMap ->
+ testCache.get("test-id").each {key, testResultMap ->
assert testResultMap.cmSubscriptionStatus != CmSubscriptionStatus.ACCEPTED
|| testResultMap.cmSubscriptionStatus != CmSubscriptionStatus.REJECTED
}
def subscriptionId = ncmpInEvent.getData().getSubscriptionId()
objectUnderTest.add(subscriptionId, predicates)
when: 'subscription status per dmi is updated in cache'
- objectUnderTest.updateDmiSubscriptionStatus(subscriptionId,'dmi-1', CmSubscriptionStatus.ACCEPTED)
+ objectUnderTest.updateDmiSubscriptionStatus(subscriptionId, 'dmi-1', CmSubscriptionStatus.ACCEPTED)
then: 'verify status has been updated in cache'
def predicate = testCache.get(subscriptionId)
assert predicate.get('dmi-1').cmSubscriptionStatus == CmSubscriptionStatus.ACCEPTED
def subscriptionId = ncmpInEvent.getData().getSubscriptionId()
objectUnderTest.add(subscriptionId, predicates)
when: 'subscription is persisted in database'
- objectUnderTest.persistIntoDatabasePerDmi(subscriptionId,'dmi-1')
+ objectUnderTest.persistIntoDatabasePerDmi(subscriptionId, 'dmi-1')
then: 'persistence service is called the correct number of times per dmi'
2 * mockCmSubscriptionPersistenceService.addSubscription(*_)
}
def subscriptionId = ncmpInEvent.getData().getSubscriptionId()
objectUnderTest.add(subscriptionId, predicates)
when: 'subscription is persisted in database'
- objectUnderTest.removeFromDatabase(subscriptionId,'dmi-1')
+ objectUnderTest.removeFromDatabase(subscriptionId, 'dmi-1')
then: 'persistence service is called the correct number of times per dmi'
2 * mockCmSubscriptionPersistenceService.removeSubscription(*_)
}
- def setUpTestEvent(){
+ def setUpTestEvent() {
def jsonData = TestUtils.getResourceFileContent('cmSubscription/cmNotificationSubscriptionNcmpInEvent.json')
def testEventSent = jsonObjectMapper.convertJsonString(jsonData, NcmpInEvent.class)
def testCloudEventSent = CloudEventBuilder.v1()
ncmpInEvent = toTargetEvent(cloudEvent, NcmpInEvent.class);
}
- def initialiseMockInventoryPersistenceResponses(){
- mockInventoryPersistence.getYangModelCmHandles(['ch1','ch2'])
+ def initialiseMockInventoryPersistenceResponses() {
+ mockInventoryPersistence.getYangModelCmHandles(['ch1', 'ch2'])
>> [yangModelCmHandle1, yangModelCmHandle2]
- mockInventoryPersistence.getYangModelCmHandles(['ch3','ch4'])
+ mockInventoryPersistence.getYangModelCmHandles(['ch3', 'ch4'])
>> [yangModelCmHandle3, yangModelCmHandle4]
}
* ============LICENSE_END=========================================================
*/
-package org.onap.cps.ncmp.impl.cmnotificationsubscription.cmavc
+package org.onap.cps.ncmp.impl.datajobs.subscription.cmavc
import com.fasterxml.jackson.databind.ObjectMapper
import io.cloudevents.CloudEvent
def sourceSystem = 'ONAP-DMI-PLUGIN'
def testCloudEventSent = buildCloudEvent(sourceSystem, 'some-cmhandle-id')
and: 'a separate topic for this test'
- cmAvcEventConsumer.cmEventsTopicName = 'some-topic-for-Test-B'
+ cmAvcEventConsumer.cmEventsTopicName = 'some-topic-for-Test-B'
and: 'inventory persistence service has #scenario'
def compositeState = new CompositeState(dataSyncEnabled: dataSyncFlag)
1 * mockInventoryPersistence.getCmHandleState(_) >> compositeState
when: 'the event is consumed'
cmAvcEventConsumer.consumeAndForward(consumerRecord)
then: 'cm avc event is processed for updating the cached data'
- expectedCallToProcessCmAvcEvent * mockCmAvcEventService.processCmAvcEvent(testEventKey, _) >> { args ->
+ expectedCallToProcessCmAvcEvent * mockCmAvcEventService.processCmAvcEvent(testEventKey, _) >> {args ->
{
assert args[1] instanceof AvcEvent
}
}
- def buildCloudEvent(sourceSystem, cmHandleId){
+ def buildCloudEvent(sourceSystem, cmHandleId) {
return CloudEventBuilder.v1()
.withData(jsonObjectMapper.asJsonBytes(testAvcEvent))
* ============LICENSE_END=========================================================
*/
-package org.onap.cps.ncmp.impl.cmnotificationsubscription.cmavc
+package org.onap.cps.ncmp.impl.datajobs.subscription.cmavc
import com.fasterxml.jackson.databind.ObjectMapper
import org.onap.cps.api.CpsAnchorService
* ============LICENSE_END=========================================================
*/
-package org.onap.cps.ncmp.impl.cmnotificationsubscription.dmi
+package org.onap.cps.ncmp.impl.datajobs.subscription.dmi
-import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.DmiCmSubscriptionKey
+import org.onap.cps.ncmp.impl.datajobs.subscription.models.DmiCmSubscriptionKey
import spock.lang.Specification
class DmiCmSubscriptionDetailsPerDmiMapperSpec extends Specification {
* ============LICENSE_END=========================================================
*/
-package org.onap.cps.ncmp.impl.cmnotificationsubscription.dmi
+package org.onap.cps.ncmp.impl.datajobs.subscription.dmi
-import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.DmiCmSubscriptionPredicate
+import org.onap.cps.ncmp.impl.datajobs.subscription.models.DmiCmSubscriptionPredicate
import org.onap.cps.ncmp.impl.inventory.InventoryPersistence
import org.onap.cps.ncmp.impl.inventory.models.YangModelCmHandle
import spock.lang.Specification
def 'Check for Cm Notification Subscription DMI In Event mapping'() {
given: 'a collection of cm subscription predicates'
def dmiSubscriptionPredicates = [new DmiCmSubscriptionPredicate(['ch-1'].toSet(), PASSTHROUGH_RUNNING, ['/ch-1'].toSet()),
- new DmiCmSubscriptionPredicate(['ch-2'].toSet(), PASSTHROUGH_OPERATIONAL, ['/ch-2'].toSet())]
+ new DmiCmSubscriptionPredicate(['ch-2'].toSet(), PASSTHROUGH_OPERATIONAL, ['/ch-2'].toSet())]
when: 'we try to map the values'
def result = objectUnderTest.toDmiInEvent(dmiSubscriptionPredicates)
then: 'it contains correct cm notification subscription cmhandle object'
* ============LICENSE_END=========================================================
*/
-package org.onap.cps.ncmp.impl.cmnotificationsubscription.dmi
+package org.onap.cps.ncmp.impl.datajobs.subscription.dmi
import com.fasterxml.jackson.databind.ObjectMapper
import io.cloudevents.CloudEvent
import io.cloudevents.core.v1.CloudEventBuilder
import org.onap.cps.events.EventsProducer
import org.onap.cps.ncmp.config.CpsApplicationContext
-import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.ncmp_to_dmi.CmHandle
-import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.ncmp_to_dmi.Data
-import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.ncmp_to_dmi.DmiInEvent
+import org.onap.cps.ncmp.impl.datajobs.subscription.ncmp_to_dmi.CmHandle
+import org.onap.cps.ncmp.impl.datajobs.subscription.ncmp_to_dmi.Data
+import org.onap.cps.ncmp.impl.datajobs.subscription.ncmp_to_dmi.DmiInEvent
import org.onap.cps.ncmp.utils.events.CloudEventMapper
import org.onap.cps.utils.JsonObjectMapper
import org.springframework.boot.test.context.SpringBootTest
* ============LICENSE_END=========================================================
*/
-package org.onap.cps.ncmp.impl.cmnotificationsubscription.dmi
+package org.onap.cps.ncmp.impl.datajobs.subscription.dmi
import ch.qos.logback.classic.Level
import ch.qos.logback.classic.Logger
import io.cloudevents.CloudEvent
import io.cloudevents.core.builder.CloudEventBuilder
import org.apache.kafka.clients.consumer.ConsumerRecord
-import org.onap.cps.ncmp.impl.cmnotificationsubscription.cache.DmiCacheHandler
-import org.onap.cps.ncmp.impl.cmnotificationsubscription.ncmp.NcmpOutEventMapper
-import org.onap.cps.ncmp.impl.cmnotificationsubscription.ncmp.NcmpOutEventProducer
-import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.dmi_to_ncmp.Data
-import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.dmi_to_ncmp.DmiOutEvent
+import org.onap.cps.ncmp.impl.datajobs.subscription.dmi_to_ncmp.Data
+import org.onap.cps.ncmp.impl.datajobs.subscription.dmi_to_ncmp.DmiOutEvent
+import org.onap.cps.ncmp.impl.datajobs.subscription.cache.DmiCacheHandler
+import org.onap.cps.ncmp.impl.datajobs.subscription.ncmp.NcmpOutEventMapper
+import org.onap.cps.ncmp.impl.datajobs.subscription.ncmp.NcmpOutEventProducer
import org.onap.cps.ncmp.utils.TestUtils
import org.onap.cps.ncmp.utils.events.MessagingBaseSpec
import org.onap.cps.utils.JsonObjectMapper
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.test.context.SpringBootTest
-import static org.onap.cps.ncmp.impl.cmnotificationsubscription.models.CmSubscriptionStatus.ACCEPTED
-import static org.onap.cps.ncmp.impl.cmnotificationsubscription.models.CmSubscriptionStatus.REJECTED
+import static org.onap.cps.ncmp.impl.datajobs.subscription.models.CmSubscriptionStatus.ACCEPTED
+import static org.onap.cps.ncmp.impl.datajobs.subscription.models.CmSubscriptionStatus.REJECTED
@SpringBootTest(classes = [ObjectMapper, JsonObjectMapper])
class DmiOutEventConsumerSpec extends MessagingBaseSpec {
when: 'the event is consumed'
objectUnderTest.consumeDmiOutEvent(consumerRecord)
then: 'correct number of calls to cache'
- expectedCacheCalls * mockDmiCacheHandler.updateDmiSubscriptionStatus('sub-1','test-dmi-plugin-name', subscriptionStatus)
+ expectedCacheCalls * mockDmiCacheHandler.updateDmiSubscriptionStatus('sub-1', 'test-dmi-plugin-name', subscriptionStatus)
and: 'correct number of calls to persist cache'
- expectedPersistenceCalls * mockDmiCacheHandler.persistIntoDatabasePerDmi('sub-1','test-dmi-plugin-name')
+ expectedPersistenceCalls * mockDmiCacheHandler.persistIntoDatabasePerDmi('sub-1', 'test-dmi-plugin-name')
and: 'correct number of calls to map the ncmp out event'
1 * mockNcmpOutEventMapper.toNcmpOutEvent('sub-1', _)
and: 'correct number of calls to send the ncmp out event to client'
* SPDX-License-Identifier: Apache-2.0
* ============LICENSE_END=========================================================
*/
-package org.onap.cps.ncmp.impl.cmnotificationsubscription.ncmp
+package org.onap.cps.ncmp.impl.datajobs.subscription.ncmp
-import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.DmiCmSubscriptionPredicate
-import org.onap.cps.ncmp.impl.cmnotificationsubscription.utils.CmDataJobSubscriptionPersistenceService
+import org.onap.cps.ncmp.impl.datajobs.subscription.models.DmiCmSubscriptionPredicate
+import org.onap.cps.ncmp.impl.datajobs.subscription.utils.CmDataJobSubscriptionPersistenceService
import spock.lang.Specification
import static org.onap.cps.ncmp.api.data.models.DatastoreType.PASSTHROUGH_OPERATIONAL
* ============LICENSE_END=========================================================
*/
-package org.onap.cps.ncmp.impl.cmnotificationsubscription.ncmp
+package org.onap.cps.ncmp.impl.datajobs.subscription.ncmp
import com.fasterxml.jackson.databind.ObjectMapper
-import org.onap.cps.ncmp.impl.cmnotificationsubscription.cache.DmiCacheHandler
-import org.onap.cps.ncmp.impl.cmnotificationsubscription.dmi.DmiCmSubscriptionDetailsPerDmiMapper
-import org.onap.cps.ncmp.impl.cmnotificationsubscription.dmi.DmiInEventMapper
-import org.onap.cps.ncmp.impl.cmnotificationsubscription.dmi.DmiInEventProducer
-import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.DmiCmSubscriptionDetails
-import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.DmiCmSubscriptionPredicate
-import org.onap.cps.ncmp.impl.cmnotificationsubscription.utils.CmDataJobSubscriptionPersistenceService
-import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.client_to_ncmp.NcmpInEvent
-import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.ncmp_to_client.NcmpOutEvent
-import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.ncmp_to_dmi.DmiInEvent
+import org.onap.cps.api.model.DataNode
+import org.onap.cps.ncmp.impl.datajobs.subscription.client_to_ncmp.NcmpInEvent
+import org.onap.cps.ncmp.impl.datajobs.subscription.ncmp_to_client.NcmpOutEvent
+import org.onap.cps.ncmp.impl.datajobs.subscription.ncmp_to_dmi.DmiInEvent
+import org.onap.cps.ncmp.impl.datajobs.subscription.cache.DmiCacheHandler
+import org.onap.cps.ncmp.impl.datajobs.subscription.dmi.DmiCmSubscriptionDetailsPerDmiMapper
+import org.onap.cps.ncmp.impl.datajobs.subscription.dmi.DmiInEventMapper
+import org.onap.cps.ncmp.impl.datajobs.subscription.dmi.DmiInEventProducer
+import org.onap.cps.ncmp.impl.datajobs.subscription.models.DmiCmSubscriptionDetails
+import org.onap.cps.ncmp.impl.datajobs.subscription.models.DmiCmSubscriptionPredicate
+import org.onap.cps.ncmp.impl.datajobs.subscription.utils.CmDataJobSubscriptionPersistenceService
import org.onap.cps.ncmp.impl.inventory.InventoryPersistence
import org.onap.cps.ncmp.impl.inventory.models.YangModelCmHandle
import org.onap.cps.ncmp.utils.TestUtils
-import org.onap.cps.api.model.DataNode
import org.onap.cps.utils.JsonObjectMapper
import spock.lang.Specification
import static org.onap.cps.ncmp.api.data.models.DatastoreType.PASSTHROUGH_OPERATIONAL
-import static org.onap.cps.ncmp.impl.cmnotificationsubscription.models.CmSubscriptionStatus.ACCEPTED
-import static org.onap.cps.ncmp.impl.cmnotificationsubscription.models.CmSubscriptionStatus.PENDING
+import static org.onap.cps.ncmp.impl.datajobs.subscription.models.CmSubscriptionStatus.ACCEPTED
+import static org.onap.cps.ncmp.impl.datajobs.subscription.models.CmSubscriptionStatus.PENDING
class CmSubscriptionHandlerImplSpec extends Specification {
when: 'the subscription delete request is processed'
objectUnderTest.processSubscriptionDeleteRequest(subscriptionId)
then: 'the method to send a dmi event is called with correct parameters'
- 1 * mockDmiInEventProducer.sendDmiInEvent(subscriptionId,'dmi-1','subscriptionDeleteRequest',_)
- 1 * mockDmiInEventProducer.sendDmiInEvent(subscriptionId,'dmi-2','subscriptionDeleteRequest',_)
+ 1 * mockDmiInEventProducer.sendDmiInEvent(subscriptionId, 'dmi-1', 'subscriptionDeleteRequest', _)
+ 1 * mockDmiInEventProducer.sendDmiInEvent(subscriptionId, 'dmi-2', 'subscriptionDeleteRequest', _)
and: 'the method to send nmcp out event is called with correct parameters'
1 * mockNcmpOutEventProducer.sendNcmpOutEvent(subscriptionId, 'subscriptionDeleteResponse', null, true)
}
def subscriptionId = 'test-id'
and: 'the persistence service returns datanodes with multiple subscribers'
1 * mockCmSubscriptionPersistenceService.getAffectedDataNodes(subscriptionId) >>
- [new DataNode(xpath: "/datastores/datastore[@name='ncmp-datastore:passthrough-running']/cm-handles/cm-handle[@id='ch-1']/filters/filter[@xpath='x/y']", leaves: ['xpath': 'x/y', 'subscriptionIds': ['test-id','other-id']]),
- new DataNode(xpath: "/datastores/datastore[@name='ncmp-datastore:passthrough-running']/cm-handles/cm-handle[@id='ch-2']/filters/filter[@xpath='y/z']", leaves: ['xpath': 'y/z', 'subscriptionIds': ['test-id','other-id']])]
+ [new DataNode(xpath: "/datastores/datastore[@name='ncmp-datastore:passthrough-running']/cm-handles/cm-handle[@id='ch-1']/filters/filter[@xpath='x/y']", leaves: ['xpath': 'x/y', 'subscriptionIds': ['test-id', 'other-id']]),
+ new DataNode(xpath: "/datastores/datastore[@name='ncmp-datastore:passthrough-running']/cm-handles/cm-handle[@id='ch-2']/filters/filter[@xpath='y/z']", leaves: ['xpath': 'y/z', 'subscriptionIds': ['test-id', 'other-id']])]
and: 'the inventory persistence returns yang model cm handles'
1 * mockInventoryPersistence.getYangModelCmHandle('ch-1') >> new YangModelCmHandle(dmiServiceName: 'dmi-1')
1 * mockInventoryPersistence.getYangModelCmHandle('ch-2') >> new YangModelCmHandle(dmiServiceName: 'dmi-2')
and: 'the cache handler returns the relevant maps whenever called'
- 2 * mockDmiCacheHandler.get(subscriptionId) >> ['dmi-1':[:],'dmi-2':[:]]
+ 2 * mockDmiCacheHandler.get(subscriptionId) >> ['dmi-1': [:], 'dmi-2': [:]]
when: 'the subscription delete request is processed'
objectUnderTest.processSubscriptionDeleteRequest(subscriptionId)
then: 'the method to send a dmi event is never called'
- 0 * mockDmiInEventProducer.sendDmiInEvent(_,_,_,_)
+ 0 * mockDmiInEventProducer.sendDmiInEvent(_, _, _, _)
and: 'the cache handler is called to remove subscriber from database per dmi'
1 * mockDmiCacheHandler.removeFromDatabase('test-id', 'dmi-1')
1 * mockDmiCacheHandler.removeFromDatabase('test-id', 'dmi-2')
* ============LICENSE_END=========================================================
*/
-package org.onap.cps.ncmp.impl.cmnotificationsubscription.ncmp
+package org.onap.cps.ncmp.impl.datajobs.subscription.ncmp
import ch.qos.logback.classic.Level
import ch.qos.logback.classic.Logger
import ch.qos.logback.classic.spi.ILoggingEvent
import ch.qos.logback.core.read.ListAppender
import com.fasterxml.jackson.databind.ObjectMapper
-import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.client_to_ncmp.DataJobSubscriptionOperationInEvent
+import org.onap.cps.ncmp.impl.datajobs.subscription.client_to_ncmp.DataJobSubscriptionOperationInEvent
import org.onap.cps.ncmp.utils.TestUtils
import org.onap.cps.utils.JsonObjectMapper
import org.slf4j.LoggerFactory
* ============LICENSE_END=========================================================
*/
-package org.onap.cps.ncmp.impl.cmnotificationsubscription.ncmp
+package org.onap.cps.ncmp.impl.datajobs.subscription.ncmp
-import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.DmiCmSubscriptionDetails
-import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.DmiCmSubscriptionPredicate
+import org.onap.cps.ncmp.impl.datajobs.subscription.models.DmiCmSubscriptionDetails
+import org.onap.cps.ncmp.impl.datajobs.subscription.models.DmiCmSubscriptionPredicate
import spock.lang.Specification
import static org.onap.cps.ncmp.api.data.models.DatastoreType.PASSTHROUGH_OPERATIONAL
import static org.onap.cps.ncmp.api.data.models.DatastoreType.PASSTHROUGH_RUNNING
-import static org.onap.cps.ncmp.impl.cmnotificationsubscription.models.CmSubscriptionStatus.ACCEPTED
-import static org.onap.cps.ncmp.impl.cmnotificationsubscription.models.CmSubscriptionStatus.PENDING
-import static org.onap.cps.ncmp.impl.cmnotificationsubscription.models.CmSubscriptionStatus.REJECTED
+import static org.onap.cps.ncmp.impl.datajobs.subscription.models.CmSubscriptionStatus.ACCEPTED
+import static org.onap.cps.ncmp.impl.datajobs.subscription.models.CmSubscriptionStatus.PENDING
+import static org.onap.cps.ncmp.impl.datajobs.subscription.models.CmSubscriptionStatus.REJECTED
class NcmpOutEventMapperSpec extends Specification {
* ============LICENSE_END=========================================================
*/
-package org.onap.cps.ncmp.impl.cmnotificationsubscription.ncmp
+package org.onap.cps.ncmp.impl.datajobs.subscription.ncmp
import com.fasterxml.jackson.databind.ObjectMapper
import io.cloudevents.CloudEvent
import io.cloudevents.core.v1.CloudEventBuilder
import org.onap.cps.events.EventsProducer
import org.onap.cps.ncmp.config.CpsApplicationContext
-import org.onap.cps.ncmp.impl.cmnotificationsubscription.cache.DmiCacheHandler
-import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.ncmp_to_client.Data
-import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.ncmp_to_client.NcmpOutEvent
+import org.onap.cps.ncmp.impl.datajobs.subscription.ncmp_to_client.Data
+import org.onap.cps.ncmp.impl.datajobs.subscription.ncmp_to_client.NcmpOutEvent
+import org.onap.cps.ncmp.impl.datajobs.subscription.cache.DmiCacheHandler
import org.onap.cps.ncmp.utils.events.CloudEventMapper
import org.onap.cps.utils.JsonObjectMapper
import org.springframework.boot.test.context.SpringBootTest
}
}
where: 'following scenarios are considered'
- scenario | delayInMs | eventPublishingTaskToBeScheduled
+ scenario | delayInMs | eventPublishingTaskToBeScheduled
'send event now' | 0 | false
'schedule and send after the configured time ' | 1500 | true
}
* ============LICENSE_END=========================================================
*/
-package org.onap.cps.ncmp.impl.cmnotificationsubscription.utils
-
-import static CmDataJobSubscriptionPersistenceService.CM_DATA_JOB_SUBSCRIPTIONS_PARENT_NODE_XPATH
-import static CmDataJobSubscriptionPersistenceService.CPS_PATH_TEMPLATE_FOR_SUBSCRIPTION_WITH_ALTERNATE_ID_AND_DATATYPE
-import static CmDataJobSubscriptionPersistenceService.CPS_PATH_TEMPLATE_FOR_SUBSCRIPTION_WITH_DATA_JOB_ID
-import static org.onap.cps.api.parameters.FetchDescendantsOption.OMIT_DESCENDANTS
+package org.onap.cps.ncmp.impl.datajobs.subscription.utils
import com.fasterxml.jackson.databind.ObjectMapper
import org.onap.cps.api.CpsDataService
import org.onap.cps.utils.JsonObjectMapper
import spock.lang.Specification
+import static CmDataJobSubscriptionPersistenceService.CM_DATA_JOB_SUBSCRIPTIONS_PARENT_NODE_XPATH
+import static CmDataJobSubscriptionPersistenceService.CPS_PATH_TEMPLATE_FOR_SUBSCRIPTION_WITH_ALTERNATE_ID_AND_DATATYPE
+import static CmDataJobSubscriptionPersistenceService.CPS_PATH_TEMPLATE_FOR_SUBSCRIPTION_WITH_DATA_JOB_ID
+import static org.onap.cps.api.parameters.FetchDescendantsOption.OMIT_DESCENDANTS
+
class CmSubscriptionPersistenceServiceSpec extends Specification {
def jsonObjectMapper = new JsonObjectMapper(new ObjectMapper())
then: 'we get expected result'
assert result == hasAtLeastOneSubscription
where: 'following scenarios are used'
- scenario | dataNode || hasAtLeastOneSubscription
- 'valid datanodes present' | [new DataNode(leaves: ['dataJobId': ['dataJobId1']])]|| true
- 'no datanodes present' | [] || false
+ scenario | dataNode || hasAtLeastOneSubscription
+ 'valid datanodes present' | [new DataNode(leaves: ['dataJobId': ['dataJobId1']])] || true
+ 'no datanodes present' | [] || false
}
def 'Checking uniqueness of incoming subscription ID'() {
given: 'a subscription exists when queried'
def query = CPS_PATH_TEMPLATE_FOR_SUBSCRIPTION_WITH_ALTERNATE_ID_AND_DATATYPE.formatted('altId1', 'dataType1')
mockCpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-job-subscriptions', query, OMIT_DESCENDANTS)
- >> [new DataNode(leaves: ['dataJobId': ['existingId','subIdToRemove'], 'dataTypeId': 'dataType1', 'alternateId': 'altId1'])]
+ >> [new DataNode(leaves: ['dataJobId': ['existingId', 'subIdToRemove'], 'dataTypeId': 'dataType1', 'alternateId': 'altId1'])]
and: 'updated cm data job subscription details as json'
def subscriptionDetailsAsJson = objectUnderTest.getSubscriptionDetailsAsJson(['existingId'], 'dataType1', 'altId1')
when: 'the subscriber is removed'
- objectUnderTest.removeSubscription('dataType1', 'altId1','subIdToRemove')
+ objectUnderTest.removeSubscription('dataType1', 'altId1', 'subIdToRemove')
then: 'the list of subscribers is updated'
1 * mockCpsDataService.updateNodeLeaves('NCMP-Admin', 'cm-data-job-subscriptions', CM_DATA_JOB_SUBSCRIPTIONS_PARENT_NODE_XPATH, subscriptionDetailsAsJson, _, ContentType.JSON)
}
and: 'a cps path with alternate id and data type for deleting a node'
def cpsPath = CPS_PATH_TEMPLATE_FOR_SUBSCRIPTION_WITH_ALTERNATE_ID_AND_DATATYPE.formatted('last-alt-id', 'last-data-type')
when: 'that last ongoing subscription is removed'
- objectUnderTest.removeSubscription('last-data-type', 'last-alt-id','subIdToRemove')
+ objectUnderTest.removeSubscription('last-data-type', 'last-alt-id', 'subIdToRemove')
then: 'the data job subscription with empty subscribers list is removed'
1 * mockCpsDataService.deleteDataNode('NCMP-Admin', 'cm-data-job-subscriptions', cpsPath, _)
}
def query = CPS_PATH_TEMPLATE_FOR_SUBSCRIPTION_WITH_ALTERNATE_ID_AND_DATATYPE.formatted('some-alt-id', 'some-data-type')
mockCpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-job-subscriptions', query, OMIT_DESCENDANTS) >> [new DataNode(leaves: ['dataJobId': ['otherDataJobId']])]
when: 'the remove subscription method is with a non existing id'
- objectUnderTest.removeSubscription('some-data-type', 'some-alt-id','nonExistingSubId')
+ objectUnderTest.removeSubscription('some-data-type', 'some-alt-id', 'nonExistingSubId')
then: 'no calls to cps data service is made'
0 * mockCpsDataService.deleteDataNode(*_)
and: 'removal of non existent subscription id silently ignored with no exception thrown'
"definitions": {
"NcmpInEvent": {
"description": "The payload for subscription merge event.",
- "javaType": "org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.client_to_ncmp.NcmpInEvent",
+ "javaType": "org.onap.cps.ncmp.impl.datajobs.subscription.client_to_ncmp.NcmpInEvent",
"properties": {
"data": {
"properties": {
"datastore": {
"description": "Datastore which is to be used by the subscription",
"type": "string",
- "enum": ["ncmp-datastore:passthrough-operational", "ncmp-datastore:passthrough-running"]
+ "enum": [
+ "ncmp-datastore:passthrough-operational",
+ "ncmp-datastore:passthrough-running"
+ ]
},
"xpathFilter": {
"description": "Filter to be applied to the CM Handles through this event",
]
}
}
-}
\ No newline at end of file
+}
"NcmpOutEvent": {
"type": "object",
"description": "The payload applied cm subscription merge event coming out from NCMP.",
- "javaType": "org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.ncmp_to_client.NcmpOutEvent",
+ "javaType": "org.onap.cps.ncmp.impl.datajobs.subscription.ncmp_to_client.NcmpOutEvent",
"additionalProperties": false,
"properties": {
"data": {
}
-}
\ No newline at end of file
+}