Code to persist or update the yang model subscription into db 01/134801/7
authorhalil.cakal <halil.cakal@est.tech>
Fri, 9 Jun 2023 13:58:00 +0000 (14:58 +0100)
committerhalil.cakal <halil.cakal@est.tech>
Mon, 19 Jun 2023 11:48:10 +0000 (12:48 +0100)
- Change subscription persistence for subscription registry
  to handle adding or updating cm handles into db

Issue-ID: CPS-1735
Change-Id: Icae609e810b2a097a4a174de1691892526f6b5c0
Signed-off-by: halil.cakal <halil.cakal@est.tech>
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventResponseOutcome.java
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/subscriptions/SubscriptionPersistence.java
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/subscriptions/SubscriptionPersistenceImpl.java
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/subscriptions/SubscriptionStatus.java
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/DataNodeHelper.java
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/subscriptions/SubscriptionPersistenceSpec.groovy
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/utils/DataNodeHelperSpec.groovy

index a746825..1bfc4ab 100644 (file)
@@ -22,8 +22,6 @@ package org.onap.cps.ncmp.api.impl.events.avcsubscription;
 
 import java.io.Serializable;
 import java.util.Collection;
-import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import lombok.RequiredArgsConstructor;
@@ -104,29 +102,12 @@ public class SubscriptionEventResponseOutcome {
     private SubscriptionEventResponse toSubscriptionEventResponse(
             final List<Collection<Serializable>> cmHandleIdToStatus, final String subscriptionClientId,
             final String subscriptionName) {
-        final Map<String, SubscriptionStatus> cmHandleIdToStatusMap = new HashMap<>();
+        final Map<String, SubscriptionStatus> cmHandleIdToStatusMap =
+                DataNodeHelper.getCmHandleIdToStatusMap(cmHandleIdToStatus);
+
         final SubscriptionEventResponse subscriptionEventResponse = new SubscriptionEventResponse();
         subscriptionEventResponse.setClientId(subscriptionClientId);
         subscriptionEventResponse.setSubscriptionName(subscriptionName);
-
-        for (final Collection<Serializable> cmHandleToStatusBucket: cmHandleIdToStatus) {
-            final Iterator<Serializable> bucketIterator = cmHandleToStatusBucket.iterator();
-            while (bucketIterator.hasNext()) {
-                final String item = (String) bucketIterator.next();
-                if ("PENDING".equals(item)) {
-                    cmHandleIdToStatusMap.put((String) bucketIterator.next(),
-                            SubscriptionStatus.PENDING);
-                }
-                if ("REJECTED".equals(item)) {
-                    cmHandleIdToStatusMap.put((String) bucketIterator.next(),
-                            SubscriptionStatus.REJECTED);
-                }
-                if ("ACCEPTED".equals(item)) {
-                    cmHandleIdToStatusMap.put((String) bucketIterator.next(),
-                            SubscriptionStatus.ACCEPTED);
-                }
-            }
-        }
         subscriptionEventResponse.setCmHandleIdToStatus(cmHandleIdToStatusMap);
 
         return subscriptionEventResponse;
index f240c45..27d4266 100644 (file)
@@ -39,4 +39,11 @@ public interface SubscriptionPersistence {
      * @return the DataNode as collection.
      */
     Collection<DataNode> getDataNodesForSubscriptionEvent();
+
+    /**
+     * Get data nodes by xpath.
+     *
+     * @return the DataNode as collection.
+     */
+    Collection<DataNode> getCmHandlesForSubscriptionEvent(String clientId, String subscriptionName);
 }
index 9a063d6..d2b1237 100644 (file)
@@ -22,11 +22,18 @@ package org.onap.cps.ncmp.api.impl.subscriptions;
 
 import static org.onap.cps.ncmp.api.impl.constants.DmiRegistryConstants.NO_TIMESTAMP;
 
+import java.io.Serializable;
+import java.util.Arrays;
 import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.Optional;
+import java.util.stream.Collectors;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.onap.cps.api.CpsDataService;
+import org.onap.cps.ncmp.api.impl.utils.DataNodeHelper;
 import org.onap.cps.ncmp.api.impl.yangmodels.YangModelSubscriptionEvent;
 import org.onap.cps.spi.FetchDescendantsOption;
 import org.onap.cps.spi.model.DataNode;
@@ -41,35 +48,86 @@ public class SubscriptionPersistenceImpl implements SubscriptionPersistence {
     private static final String SUBSCRIPTION_DATASPACE_NAME = "NCMP-Admin";
     private static final String SUBSCRIPTION_ANCHOR_NAME = "AVC-Subscriptions";
     private static final String SUBSCRIPTION_REGISTRY_PARENT = "/subscription-registry";
-
     private final JsonObjectMapper jsonObjectMapper;
     private final CpsDataService cpsDataService;
 
     @Override
     public void saveSubscriptionEvent(final YangModelSubscriptionEvent yangModelSubscriptionEvent) {
-        final String subscriptionEventJsonData =
-                createSubscriptionEventJsonData(jsonObjectMapper.asJsonString(yangModelSubscriptionEvent));
+        final String clientId = yangModelSubscriptionEvent.getClientId();
+        final String subscriptionName = yangModelSubscriptionEvent.getSubscriptionName();
+
         final Collection<DataNode> dataNodes = cpsDataService.getDataNodes(SUBSCRIPTION_DATASPACE_NAME,
                 SUBSCRIPTION_ANCHOR_NAME, SUBSCRIPTION_REGISTRY_PARENT, FetchDescendantsOption.INCLUDE_ALL_DESCENDANTS);
+
+        if (isSubscriptionRegistryEmptyOrNonExist(dataNodes, clientId, subscriptionName)) {
+            saveSubscriptionEventYangModel(createSubscriptionEventJsonData(
+                    jsonObjectMapper.asJsonString(yangModelSubscriptionEvent)));
+        } else {
+            findDeltaCmHandlesAddOrUpdateInDatabase(yangModelSubscriptionEvent, clientId, subscriptionName, dataNodes);
+        }
+    }
+
+    private void findDeltaCmHandlesAddOrUpdateInDatabase(final YangModelSubscriptionEvent yangModelSubscriptionEvent,
+                                                         final String clientId, final String subscriptionName,
+                                                         final Collection<DataNode> dataNodes) {
+        final Map<String, SubscriptionStatus> cmHandleIdsFromYangModel =
+                extractCmHandleFromYangModelAsMap(yangModelSubscriptionEvent);
+        final Map<String, SubscriptionStatus> cmHandleIdsFromDatabase =
+                extractCmHandleFromDbAsMap(dataNodes);
+
+        final Map<String, SubscriptionStatus> newCmHandles =
+                mapDifference(cmHandleIdsFromYangModel, cmHandleIdsFromDatabase);
+        traverseCmHandleList(newCmHandles, clientId, subscriptionName, true);
+
+        final Map<String, SubscriptionStatus> existingCmHandles =
+                mapDifference(cmHandleIdsFromYangModel, newCmHandles);
+        traverseCmHandleList(existingCmHandles, clientId, subscriptionName, false);
+    }
+
+    private boolean isSubscriptionRegistryEmptyOrNonExist(final Collection<DataNode> dataNodes,
+                                                          final String clientId, final String subscriptionName) {
         final Optional<DataNode> dataNodeFirst = dataNodes.stream().findFirst();
-        final boolean isCreateOperation =
-                dataNodeFirst.isPresent() && dataNodeFirst.get().getChildDataNodes().isEmpty();
-        saveOrUpdateSubscriptionEventYangModel(subscriptionEventJsonData, isCreateOperation);
+        return ((dataNodeFirst.isPresent() && dataNodeFirst.get().getChildDataNodes().isEmpty())
+                || getCmHandlesForSubscriptionEvent(clientId, subscriptionName).isEmpty());
+    }
+
+    private void traverseCmHandleList(final Map<String, SubscriptionStatus> cmHandleMap,
+                                      final String clientId,
+                                      final String subscriptionName,
+                                      final boolean isAddListElementOperation) {
+        final List<YangModelSubscriptionEvent.TargetCmHandle> cmHandleList =
+                targetCmHandlesAsList(cmHandleMap);
+        for (final YangModelSubscriptionEvent.TargetCmHandle targetCmHandle : cmHandleList) {
+            final String targetCmHandleAsJson =
+                    createTargetCmHandleJsonData(jsonObjectMapper.asJsonString(targetCmHandle));
+            addOrReplaceCmHandlePredicateListElement(targetCmHandleAsJson, clientId, subscriptionName,
+                    isAddListElementOperation);
+        }
     }
 
-    private void saveOrUpdateSubscriptionEventYangModel(final String subscriptionEventJsonData,
-                                                        final boolean isCreateOperation) {
-        if (isCreateOperation) {
-            log.info("SubscriptionEventJsonData to be saved into DB {}", subscriptionEventJsonData);
-            cpsDataService.saveListElements(SUBSCRIPTION_DATASPACE_NAME, SUBSCRIPTION_ANCHOR_NAME,
-                    SUBSCRIPTION_REGISTRY_PARENT, subscriptionEventJsonData, NO_TIMESTAMP);
+    private void addOrReplaceCmHandlePredicateListElement(final String targetCmHandleAsJson,
+                                                          final String clientId,
+                                                          final String subscriptionName,
+                                                          final boolean isAddListElementOperation) {
+        if (isAddListElementOperation) {
+            log.info("targetCmHandleAsJson to be added into DB {}", targetCmHandleAsJson);
+            cpsDataService.saveListElements(SUBSCRIPTION_DATASPACE_NAME,
+                    SUBSCRIPTION_ANCHOR_NAME, createCmHandleXpathPredicates(clientId, subscriptionName),
+                    targetCmHandleAsJson, NO_TIMESTAMP);
         } else {
-            log.info("SubscriptionEventJsonData to be updated into DB {}", subscriptionEventJsonData);
-            cpsDataService.updateDataNodeAndDescendants(SUBSCRIPTION_DATASPACE_NAME, SUBSCRIPTION_ANCHOR_NAME,
-                    SUBSCRIPTION_REGISTRY_PARENT, subscriptionEventJsonData, NO_TIMESTAMP);
+            log.info("targetCmHandleAsJson to be updated into DB {}", targetCmHandleAsJson);
+            cpsDataService.updateNodeLeaves(SUBSCRIPTION_DATASPACE_NAME,
+                    SUBSCRIPTION_ANCHOR_NAME, createCmHandleXpathPredicates(clientId, subscriptionName),
+                    targetCmHandleAsJson, NO_TIMESTAMP);
         }
     }
 
+    private void saveSubscriptionEventYangModel(final String subscriptionEventJsonData) {
+        log.info("SubscriptionEventJsonData to be saved into DB {}", subscriptionEventJsonData);
+        cpsDataService.saveListElements(SUBSCRIPTION_DATASPACE_NAME, SUBSCRIPTION_ANCHOR_NAME,
+                SUBSCRIPTION_REGISTRY_PARENT, subscriptionEventJsonData, NO_TIMESTAMP);
+    }
+
     @Override
     public Collection<DataNode> getDataNodesForSubscriptionEvent() {
         return cpsDataService.getDataNodes(SUBSCRIPTION_DATASPACE_NAME,
@@ -77,7 +135,58 @@ public class SubscriptionPersistenceImpl implements SubscriptionPersistence {
                 FetchDescendantsOption.INCLUDE_ALL_DESCENDANTS);
     }
 
+    @Override
+    public Collection<DataNode> getCmHandlesForSubscriptionEvent(final String clientId, final String subscriptionName) {
+        return cpsDataService.getDataNodesForMultipleXpaths(SUBSCRIPTION_DATASPACE_NAME,
+                SUBSCRIPTION_ANCHOR_NAME, Arrays.asList(createCmHandleXpath(clientId, subscriptionName)),
+                FetchDescendantsOption.INCLUDE_ALL_DESCENDANTS);
+    }
+
+    private static Map<String, SubscriptionStatus> extractCmHandleFromDbAsMap(final Collection<DataNode> dataNodes) {
+        final List<Map<String, Serializable>> dataNodeLeaves = DataNodeHelper.getDataNodeLeaves(dataNodes);
+        final List<Collection<Serializable>> cmHandleIdToStatus = DataNodeHelper.getCmHandleIdToStatus(dataNodeLeaves);
+        return DataNodeHelper.getCmHandleIdToStatusMap(cmHandleIdToStatus);
+    }
+
+    private static Map<String, SubscriptionStatus> extractCmHandleFromYangModelAsMap(
+            final YangModelSubscriptionEvent yangModelSubscriptionEvent) {
+        return yangModelSubscriptionEvent.getPredicates().getTargetCmHandles()
+                .stream().collect(Collectors.toMap(
+                        YangModelSubscriptionEvent.TargetCmHandle::getCmHandleId,
+                        YangModelSubscriptionEvent.TargetCmHandle::getStatus));
+    }
+
+    private static List<YangModelSubscriptionEvent.TargetCmHandle> targetCmHandlesAsList(
+            final Map<String, SubscriptionStatus> newCmHandles) {
+        return newCmHandles.entrySet().stream().map(entry ->
+                new YangModelSubscriptionEvent.TargetCmHandle(entry.getKey(),
+                        entry.getValue())).collect(Collectors.toList());
+    }
+
     private static String createSubscriptionEventJsonData(final String yangModelSubscriptionAsJson) {
         return "{\"subscription\":[" + yangModelSubscriptionAsJson + "]}";
     }
+
+    private static String createTargetCmHandleJsonData(final String targetCmHandleAsJson) {
+        return "{\"targetCmHandles\":[" + targetCmHandleAsJson + "]}";
+    }
+
+    private static String createCmHandleXpathPredicates(final String clientId, final String subscriptionName) {
+        return "/subscription-registry/subscription[@clientID='" + clientId
+                + "' and @subscriptionName='" + subscriptionName + "']/predicates";
+    }
+
+    private static String createCmHandleXpath(final String clientId, final String subscriptionName) {
+        return "/subscription-registry/subscription[@clientID='" + clientId
+                + "' and @subscriptionName='" + subscriptionName + "']";
+    }
+
+    private static <K, V> Map<K, V> mapDifference(final Map<? extends K, ? extends V> left,
+                                                  final Map<? extends K, ? extends V> right) {
+        final Map<K, V> difference = new HashMap<>();
+        difference.putAll(left);
+        difference.putAll(right);
+        difference.entrySet().removeAll(right.entrySet());
+        return difference;
+    }
 }
index 0b4f91f..ce3b88b 100644 (file)
 
 package org.onap.cps.ncmp.api.impl.subscriptions;
 
+import java.io.Serializable;
+import java.util.Iterator;
+import java.util.Map;
+
 public enum SubscriptionStatus {
     ACCEPTED,
     REJECTED,
-    PENDING
+    PENDING;
+
+
+    /**
+     * Populates a map with a key of cm handle id and a value of subscription status.
+     *
+     * @param resultMap the map is being populated
+     * @param bucketIterator to iterate over the collection
+     */
+    public static void populateCmHandleToSubscriptionStatusMap(final Map<String, SubscriptionStatus> resultMap,
+                                                          final Iterator<Serializable> bucketIterator) {
+        final String item = (String) bucketIterator.next();
+        if ("PENDING".equals(item)) {
+            resultMap.put((String) bucketIterator.next(),
+                    SubscriptionStatus.PENDING);
+        }
+        if ("REJECTED".equals(item)) {
+            resultMap.put((String) bucketIterator.next(),
+                    SubscriptionStatus.REJECTED);
+        }
+        if ("ACCEPTED".equals(item)) {
+            resultMap.put((String) bucketIterator.next(),
+                    SubscriptionStatus.ACCEPTED);
+        }
+    }
 }
index 1648ac4..8d44592 100644 (file)
@@ -22,12 +22,15 @@ package org.onap.cps.ncmp.api.impl.utils;
 
 import java.io.Serializable;
 import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
+import org.onap.cps.ncmp.api.impl.subscriptions.SubscriptionStatus;
 import org.onap.cps.spi.model.DataNode;
 
 @NoArgsConstructor(access = AccessLevel.PRIVATE)
@@ -72,4 +75,22 @@ public class DataNodeHelper {
                         || col.contains("REJECTED"))
                 .collect(Collectors.toList());
     }
+
+    /**
+     * The cm handle and status is returned as a map.
+     *
+     * @param cmHandleIdToStatus as a list of collection
+     * @return a map of cm handle id to status
+     */
+    public static Map<String, SubscriptionStatus> getCmHandleIdToStatusMap(
+            final List<Collection<Serializable>> cmHandleIdToStatus) {
+        final Map<String, SubscriptionStatus> resultMap = new HashMap<>();
+        for (final Collection<Serializable> cmHandleToStatusBucket: cmHandleIdToStatus) {
+            final Iterator<Serializable> bucketIterator = cmHandleToStatusBucket.iterator();
+            while (bucketIterator.hasNext()) {
+                SubscriptionStatus.populateCmHandleToSubscriptionStatusMap(resultMap, bucketIterator);
+            }
+        }
+        return resultMap;
+    }
 }
index a372abe..ec54e89 100644 (file)
@@ -34,6 +34,7 @@ class SubscriptionPersistenceSpec extends Specification {
     private static final String SUBSCRIPTION_DATASPACE_NAME = "NCMP-Admin";
     private static final String SUBSCRIPTION_ANCHOR_NAME = "AVC-Subscriptions";
     private static final String SUBSCRIPTION_REGISTRY_PARENT = "/subscription-registry";
+    private static final String SUBSCRIPTION_REGISTRY_PREDICATES_XPATH = "/subscription-registry/subscription[@clientID='some-client-id' and @subscriptionName='some-subscription-name']/predicates";
 
     def jsonObjectMapper = new JsonObjectMapper(new ObjectMapper())
     def mockCpsDataService = Mock(CpsDataService)
@@ -45,11 +46,11 @@ class SubscriptionPersistenceSpec extends Specification {
     def yangModelSubscriptionEvent = new YangModelSubscriptionEvent(clientId: 'some-client-id',
         subscriptionName: 'some-subscription-name', tagged: true, topic: 'some-topic', predicates: predicates)
 
-   def 'save a subscription event' () {
-       given: 'a data node that does not exist in db'
+   def 'save a subscription event as yang model into db for the #scenarios' () {
+       given: 'a blank data node that exist in db'
            def blankDataNode = new DataNodeBuilder().withDataspace('NCMP-Admin')
                 .withAnchor('AVC-Subscriptions').withXpath('/subscription-registry').build()
-       and: 'cps data service return non existing data node'
+       and: 'cps data service return an empty data node'
             mockCpsDataService.getDataNodes(*_) >> [blankDataNode]
        when: 'the yangModelSubscriptionEvent is saved into db'
             objectUnderTest.saveSubscriptionEvent(yangModelSubscriptionEvent)
@@ -63,24 +64,28 @@ class SubscriptionPersistenceSpec extends Specification {
                 NO_TIMESTAMP)
    }
 
-    def 'update a subscription event' () {
-        given: 'a data node exist in db'
+    def 'add or replace cm handle list element into db' () {
+        given: 'a data node with child node exist in db'
+            def leaves1 = [status:'PENDING', cmHandleId:'cmhandle1'] as Map
             def childDataNode = new DataNodeBuilder().withDataspace('NCMP-Admin')
-                .withAnchor('AVC-Subscriptions').withXpath('/subscription-registry/subscription').build()
+                .withAnchor('AVC-Subscriptions').withXpath('/subscription-registry/subscription')
+                .withLeaves(leaves1).build()
             def engagedDataNode = new DataNodeBuilder().withDataspace('NCMP-Admin')
                 .withAnchor('AVC-Subscriptions').withXpath('/subscription-registry')
                 .withChildDataNodes([childDataNode]).build()
-        and: 'cps data service return existing data node'
+        and: 'cps data service return data node including a child data node'
             mockCpsDataService.getDataNodes(*_) >> [engagedDataNode]
-        when: 'the yangModelSubscriptionEvent is saved into db'
+        and: 'cps data service return data node for querying by xpaths'
+            mockCpsDataService.getDataNodesForMultipleXpaths(*_) >> [engagedDataNode]
+        when: 'the yang model subscription event is saved into db'
             objectUnderTest.saveSubscriptionEvent(yangModelSubscriptionEvent)
-        then: 'the cpsDataService update operation is called with the correct data'
-            1 * mockCpsDataService.updateDataNodeAndDescendants(SUBSCRIPTION_DATASPACE_NAME, SUBSCRIPTION_ANCHOR_NAME,
-                SUBSCRIPTION_REGISTRY_PARENT,
-                '{"subscription":[{' +
-                    '"topic":"some-topic",' +
-                    '"predicates":{"datastore":"some-datastore","targetCmHandles":[{"cmHandleId":"cmhandle1","status":"PENDING"},{"cmHandleId":"cmhandle2","status":"PENDING"}]},' +
-                    '"clientID":"some-client-id","subscriptionName":"some-subscription-name","isTagged":true}]}',
+        then: 'the cpsDataService save non-existing cm handle with the correct data'
+            1 * mockCpsDataService.saveListElements(SUBSCRIPTION_DATASPACE_NAME, SUBSCRIPTION_ANCHOR_NAME,
+                SUBSCRIPTION_REGISTRY_PREDICATES_XPATH, '{"targetCmHandles":[{"cmHandleId":"cmhandle2","status":"PENDING"}]}',
+                NO_TIMESTAMP)
+        and: 'the cpsDataService update existing cm handle with the correct data'
+            1 * mockCpsDataService.updateNodeLeaves(SUBSCRIPTION_DATASPACE_NAME, SUBSCRIPTION_ANCHOR_NAME,
+                SUBSCRIPTION_REGISTRY_PREDICATES_XPATH, '{"targetCmHandles":[{"cmHandleId":"cmhandle1","status":"PENDING"}]}',
                 NO_TIMESTAMP)
     }
 
index e527ae1..ee726a9 100644 (file)
@@ -20,6 +20,7 @@
 
 package org.onap.cps.ncmp.api.impl.utils
 
+import org.onap.cps.ncmp.api.impl.subscriptions.SubscriptionStatus
 import org.onap.cps.spi.model.DataNodeBuilder
 
 class DataNodeHelperSpec extends DataNodeBaseSpec {
@@ -51,8 +52,22 @@ class DataNodeHelperSpec extends DataNodeBaseSpec {
         and: 'the nested data node is flatten and retrieves the leaves '
             def leaves = DataNodeHelper.getDataNodeLeaves([dataNode])
         when:'cm handle id to status is retrieved'
-            def result = DataNodeHelper.getCmHandleIdToStatus(leaves);
+            def result = DataNodeHelper.getCmHandleIdToStatus(leaves)
         then: 'the result list size is 3'
             result.size() == 3
+        and: 'the result contains expected values'
+            result[0] as List == ['PENDING', 'CMHandle3']
+            result[1] as List == ['ACCEPTED', 'CMHandle2']
+            result[2] as List == ['REJECTED', 'CMHandle1']
+    }
+
+    def 'Get cm handle id to status map as expected from list of collection' () {
+        given: 'a list of collection'
+            def cmHandleCollection = [['PENDING', 'CMHandle3'], ['ACCEPTED', 'CMHandle2'], ['REJECTED', 'CMHandle1']]
+        when: 'the map is formed up with a method call'
+            def result = DataNodeHelper.getCmHandleIdToStatusMap(cmHandleCollection)
+        then: 'the map values are as expected'
+            result.keySet() == ['CMHandle3', 'CMHandle2', 'CMHandle1'] as Set
+            result.values() as List == [SubscriptionStatus.PENDING, SubscriptionStatus.ACCEPTED, SubscriptionStatus.REJECTED]
     }
 }