Merge "CPS 1824: Delta Between 2 Anchors release notes"
authorToine Siebelink <toine.siebelink@est.tech>
Thu, 25 Jan 2024 12:42:14 +0000 (12:42 +0000)
committerGerrit Code Review <gerrit@onap.org>
Thu, 25 Jan 2024 12:42:14 +0000 (12:42 +0000)
19 files changed:
cps-ncmp-rest/src/test/groovy/org/onap/cps/ncmp/rest/controller/handlers/NcmpDatastoreRequestHandlerSpec.groovy
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmSubscriptionNcmpInEventConsumer.java [new file with mode: 0644]
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsCmHandleStateHandlerImpl.java
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/inventory/CompositeStateUtils.java
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/init/CmDataSubscriptionModelLoader.java
cps-ncmp-service/src/main/resources/models/subscription.yang [deleted file]
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmSubscriptionNcmpInEventConsumerSpec.groovy [new file with mode: 0644]
cps-ncmp-service/src/test/resources/cmSubscription/cmSubscriptionNcmpInEvent.json [new file with mode: 0644]
cps-ncmp-service/src/test/resources/deprecatedCmSubscription/cmSubscriptionDmiInEvent.json [deleted file]
cps-ncmp-service/src/test/resources/deprecatedCmSubscription/cmSubscriptionDmiOutEvent.json [deleted file]
cps-ncmp-service/src/test/resources/deprecatedCmSubscription/cmSubscriptionEvent.json [deleted file]
cps-ncmp-service/src/test/resources/deprecatedCmSubscription/cmSubscriptionNcmpInEvent.json [deleted file]
cps-ncmp-service/src/test/resources/deprecatedCmSubscription/cmSubscriptionNcmpOutEvent.json [deleted file]
cps-ncmp-service/src/test/resources/deprecatedCmSubscription/cmSubscriptionNcmpOutEvent2.json [deleted file]
cps-ri/src/main/java/org/onap/cps/spi/impl/CpsDataPersistenceServiceImpl.java
csit/tests/cps-data-operations/cps-data-operations.robot
csit/tests/cps-data-sync/cps-data-sync.robot
csit/tests/cps-model-sync/cps-model-sync.robot
csit/tests/cps-trust-level/cps-trust-level.robot

index ae7c564..f06af6c 100644 (file)
@@ -1,6 +1,6 @@
 /*
  *  ============LICENSE_START=======================================================
- *  Copyright (C) 2023 Nordix Foundation
+ *  Copyright (C) 2023-2024 Nordix Foundation
  *  ================================================================================
  *  Licensed under the Apache License, Version 2.0 (the "License");
  *  you may not use this file except in compliance with the License.
@@ -37,19 +37,27 @@ class NcmpDatastoreRequestHandlerSpec extends Specification {
 
     def objectUnderTest = new NcmpPassthroughResourceRequestHandler(spiedCpsNcmpTaskExecutor, mockNetworkCmProxyDataService)
 
+    def setup() {
+        objectUnderTest.timeOutInMilliSeconds = 100
+    }
+
     def 'Attempt to execute async get request with #scenario.'() {
         given: 'notification feature is turned on/off'
             objectUnderTest.notificationFeatureEnabled = notificationFeatureEnabled
+        and: ' a flag to track the network service call'
+            def networkServiceMethodCalled = false
+        and: 'the (mocked) service will use the flag to indicate if it is called'
+            mockNetworkCmProxyDataService.getResourceDataForCmHandle('ds', 'ch1', 'resource1', 'options', _, _) >> {
+                networkServiceMethodCalled = true
+            }
         when: 'get request is executed with topic = #topic'
             objectUnderTest.executeRequest('ds', 'ch1', 'resource1', 'options', topic, false)
-        and: 'wait a little for async execution (only if expected)'
-            if (expectedCalls > 0) {
-                Thread.sleep(500)
-            }
         then: 'the task is executed in an async fashion or not'
             expectedCalls * spiedCpsNcmpTaskExecutor.executeTask(*_)
-        /*and: 'the service request is always invoked'
-            1 * mockNetworkCmProxyDataService.getResourceDataForCmHandle('ds', 'ch1', 'resource1', 'options', _, _)*/
+        and: 'the service request is always invoked within 1 seconds'
+            new PollingConditions().within(1) {
+                assert networkServiceMethodCalled == true
+            }
         where: 'the following parameters are used'
             scenario                   | notificationFeatureEnabled | topic   || expectedCalls
             'feature on, valid topic'  | true                       | 'valid' || 1
@@ -89,9 +97,9 @@ class NcmpDatastoreRequestHandlerSpec extends Specification {
             objectUnderTest.executeRequest('myTopic', dataOperationRequest)
         then: 'the task is executed in an async fashion'
             1 * spiedCpsNcmpTaskExecutor.executeTask(*_)
-        and: 'the network service is invoked (wait max. 5 seconds)'
-            new PollingConditions(timeout: 30).eventually {
-                //TODO Fix test assertion
+        and: 'the network service is invoked within 1 seconds'
+            new PollingConditions().within(1) {
+                assert networkServiceMethodCalled == true
             }
         where: 'the following datastores are used'
             datastore << ['ncmp-datastore:passthrough-running', 'ncmp-datastore:passthrough-operational']
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmSubscriptionNcmpInEventConsumer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmSubscriptionNcmpInEventConsumer.java
new file mode 100644 (file)
index 0000000..8bc3694
--- /dev/null
@@ -0,0 +1,64 @@
+/*
+ * ============LICENSE_START=======================================================
+ *  Copyright (C) 2024 Nordix Foundation
+ *  ================================================================================
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ *  SPDX-License-Identifier: Apache-2.0
+ *  ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.api.impl.events.cmsubscription;
+
+import static org.onap.cps.ncmp.api.impl.events.mapper.CloudEventMapper.toTargetEvent;
+
+import io.cloudevents.CloudEvent;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.onap.cps.ncmp.events.cmsubscription_merge1_0_0.client_to_ncmp.CmSubscriptionNcmpInEvent;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.stereotype.Component;
+
+@Component
+@Slf4j
+public class CmSubscriptionNcmpInEventConsumer {
+
+    @Value("${notification.enabled:true}")
+    private boolean notificationFeatureEnabled;
+
+    @Value("${ncmp.model-loader.subscription:false}")
+    private boolean subscriptionModelLoaderEnabled;
+
+    /**
+     * Consume the specified event.
+     *
+     * @param subscriptionEventConsumerRecord the event to be consumed
+     */
+    @KafkaListener(topics = "${app.ncmp.avc.subscription-topic}",
+        containerFactory = "cloudEventConcurrentKafkaListenerContainerFactory")
+    public void consumeSubscriptionEvent(final ConsumerRecord<String, CloudEvent> subscriptionEventConsumerRecord) {
+        final CloudEvent cloudEvent = subscriptionEventConsumerRecord.value();
+        final CmSubscriptionNcmpInEvent cmSubscriptionNcmpInEvent =
+            toTargetEvent(cloudEvent, CmSubscriptionNcmpInEvent.class);
+        if (subscriptionModelLoaderEnabled) {
+            log.info("Subscription with name {} to be mapped to hazelcast object...",
+                cmSubscriptionNcmpInEvent.getData().getSubscriptionId());
+        }
+        if ("subscriptionCreated".equals(cloudEvent.getType()) && cmSubscriptionNcmpInEvent != null) {
+            log.info("Subscription for ClientID {} with name {} ...",
+                cloudEvent.getSource(),
+                cmSubscriptionNcmpInEvent.getData().getSubscriptionId());
+        }
+    }
+}
index a31332f..0ed95ad 100644 (file)
@@ -1,6 +1,6 @@
 /*
  * ============LICENSE_START=======================================================
- * Copyright (C) 2022-2023 Nordix Foundation
+ * Copyright (C) 2022-2024 Nordix Foundation
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -164,12 +164,12 @@ public class LcmEventsCmHandleStateHandlerImpl implements LcmEventsCmHandleState
     }
 
     private void setInitialStates(final YangModelCmHandle yangModelCmHandle) {
-        CompositeStateUtils.setInitialDataStoreSyncState().accept(yangModelCmHandle.getCompositeState());
-        CompositeStateUtils.setCompositeState(READY).accept(yangModelCmHandle.getCompositeState());
+        CompositeStateUtils.setInitialDataStoreSyncState(yangModelCmHandle.getCompositeState());
+        CompositeStateUtils.setCompositeState(READYyangModelCmHandle.getCompositeState());
     }
 
     private void retryCmHandle(final YangModelCmHandle yangModelCmHandle) {
-        CompositeStateUtils.setCompositeStateForRetry().accept(yangModelCmHandle.getCompositeState());
+        CompositeStateUtils.setCompositeStateForRetry(yangModelCmHandle.getCompositeState());
     }
 
     private void registerNewCmHandle(final YangModelCmHandle yangModelCmHandle) {
@@ -178,7 +178,7 @@ public class LcmEventsCmHandleStateHandlerImpl implements LcmEventsCmHandleState
     }
 
     private void setCmHandleState(final YangModelCmHandle yangModelCmHandle, final CmHandleState targetCmHandleState) {
-        CompositeStateUtils.setCompositeState(targetCmHandleState).accept(yangModelCmHandle.getCompositeState());
+        CompositeStateUtils.setCompositeState(targetCmHandleStateyangModelCmHandle.getCompositeState());
     }
 
     private boolean isNew(final CompositeState existingCompositeState) {
index 99cca8c..35ad54f 100644 (file)
@@ -1,6 +1,6 @@
 /*
  * ============LICENSE_START=======================================================
- * Copyright (C) 2022-2023 Nordix Foundation
+ * Copyright (C) 2022-2024 Nordix Foundation
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -20,7 +20,6 @@
 
 package org.onap.cps.ncmp.api.impl.inventory;
 
-import java.util.function.Consumer;
 import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
@@ -34,31 +33,23 @@ public class CompositeStateUtils {
 
     /**
      * Sets the cmHandleState to the provided state and updates the timestamp.
-     *
-     * @return Updated CompositeState
      */
-    public static Consumer<CompositeState> setCompositeState(final CmHandleState cmHandleState) {
-        return compositeState -> {
-            compositeState.setCmHandleState(cmHandleState);
-            compositeState.setLastUpdateTimeNow();
-        };
+    public static void setCompositeState(final CmHandleState cmHandleState,
+                                                   final CompositeState compositeState) {
+        compositeState.setCmHandleState(cmHandleState);
+        compositeState.setLastUpdateTimeNow();
     }
 
     /**
      * Set the Operational datastore sync state based on the global flag.
-     *
-     * @return Updated CompositeState
      */
-    public static Consumer<CompositeState> setInitialDataStoreSyncState() {
-
-        return compositeState -> {
-            compositeState.setDataSyncEnabled(false);
-            final CompositeState.Operational operational =
-                    getInitialDataStoreSyncState(compositeState.getDataSyncEnabled());
-            final CompositeState.DataStores dataStores =
-                    CompositeState.DataStores.builder().operationalDataStore(operational).build();
-            compositeState.setDataStores(dataStores);
-        };
+    public static void setInitialDataStoreSyncState(final CompositeState compositeState) {
+        compositeState.setDataSyncEnabled(false);
+        final CompositeState.Operational operational =
+            getInitialDataStoreSyncState(compositeState.getDataSyncEnabled());
+        final CompositeState.DataStores dataStores =
+            CompositeState.DataStores.builder().operationalDataStore(operational).build();
+        compositeState.setDataStores(dataStores);
     }
 
     /**
@@ -91,19 +82,15 @@ public class CompositeStateUtils {
 
     /**
      * Sets the cmHandleState to ADVISED and retain the lock details. Used in retry scenarios.
-     *
-     * @return Updated CompositeState
      */
-    public static Consumer<CompositeState> setCompositeStateForRetry() {
-        return compositeState -> {
-            compositeState.setCmHandleState(CmHandleState.ADVISED);
-            compositeState.setLastUpdateTimeNow();
-            final String oldLockReasonDetails = compositeState.getLockReason().getDetails();
-            final CompositeState.LockReason lockReason =
-                    CompositeState.LockReason.builder()
-                            .lockReasonCategory(compositeState.getLockReason().getLockReasonCategory())
-                            .details(oldLockReasonDetails).build();
-            compositeState.setLockReason(lockReason);
-        };
+    public static void setCompositeStateForRetry(final CompositeState compositeState) {
+        compositeState.setCmHandleState(CmHandleState.ADVISED);
+        compositeState.setLastUpdateTimeNow();
+        final String oldLockReasonDetails = compositeState.getLockReason().getDetails();
+        final CompositeState.LockReason lockReason =
+            CompositeState.LockReason.builder()
+                .lockReasonCategory(compositeState.getLockReason().getLockReasonCategory())
+                .details(oldLockReasonDetails).build();
+        compositeState.setLockReason(lockReason);
     }
 }
index 81055db..88ba5e9 100644 (file)
@@ -45,13 +45,6 @@ public class CmDataSubscriptionModelLoader extends AbstractModelLoader {
     private static final String DATASTORE_PASSTHROUGH_OPERATIONAL = "ncmp-datastores:passthrough-operational";
     private static final String DATASTORE_PASSTHROUGH_RUNNING = "ncmp-datastores:passthrough-running";
 
-    private static final String DEPRECATED_MODEL_FILENAME = "subscription.yang";
-    private static final String DEPRECATED_ANCHOR_NAME = "AVC-Subscriptions";
-    private static final String DEPRECATED_SCHEMASET_NAME = "subscriptions";
-    private static final String DEPRECATED_REGISTRY_DATANODE_NAME = "subscription-registry";
-
-
-
     public CmDataSubscriptionModelLoader(final CpsDataspaceService cpsDataspaceService,
                                          final CpsModuleService cpsModuleService,
                                          final CpsAnchorService cpsAnchorService,
@@ -74,10 +67,6 @@ public class CmDataSubscriptionModelLoader extends AbstractModelLoader {
     }
 
     private void onboardSubscriptionModels() {
-        createSchemaSet(NCMP_DATASPACE_NAME, DEPRECATED_SCHEMASET_NAME, DEPRECATED_MODEL_FILENAME);
-        createAnchor(NCMP_DATASPACE_NAME, DEPRECATED_SCHEMASET_NAME, DEPRECATED_ANCHOR_NAME);
-        createTopLevelDataNode(NCMP_DATASPACE_NAME, DEPRECATED_ANCHOR_NAME, DEPRECATED_REGISTRY_DATANODE_NAME);
-
         createSchemaSet(NCMP_DATASPACE_NAME, SCHEMASET_NAME, MODEL_FILENAME);
         createAnchor(NCMP_DATASPACE_NAME, SCHEMASET_NAME, ANCHOR_NAME);
         createTopLevelDataNode(NCMP_DATASPACE_NAME, ANCHOR_NAME, REGISTRY_DATANODE_NAME);
diff --git a/cps-ncmp-service/src/main/resources/models/subscription.yang b/cps-ncmp-service/src/main/resources/models/subscription.yang
deleted file mode 100644 (file)
index 7096c18..0000000
+++ /dev/null
@@ -1,57 +0,0 @@
-module subscription {
-    yang-version 1.1;
-    namespace "org:onap:ncmp:subscription";
-
-    prefix subs;
-
-    revision "2023-03-21" {
-        description
-            "NCMP subscription model";
-    }
-
-    container subscription-registry {
-        list subscription {
-            key "clientID subscriptionName";
-
-            leaf clientID {
-                type string;
-            }
-
-            leaf subscriptionName {
-                type string;
-            }
-
-            leaf topic {
-                type string;
-            }
-
-            leaf isTagged {
-                type boolean;
-            }
-
-            container predicates {
-
-                list targetCmHandles {
-                    key "cmHandleId";
-
-                    leaf cmHandleId {
-                      type string;
-                    }
-
-                    leaf status {
-                      type string;
-                    }
-
-                    leaf details {
-                        type string;
-                    }
-                }
-
-                leaf datastore {
-                    type string;
-                }
-            }
-
-        }
-    }
-}
\ No newline at end of file
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmSubscriptionNcmpInEventConsumerSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmSubscriptionNcmpInEventConsumerSpec.groovy
new file mode 100644 (file)
index 0000000..57e77eb
--- /dev/null
@@ -0,0 +1,93 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (c) 2024 Nordix Foundation.
+ *  ================================================================================
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an 'AS IS' BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ *  SPDX-License-Identifier: Apache-2.0
+ *  ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.api.impl.events.cmsubscription
+
+import ch.qos.logback.classic.Level
+import ch.qos.logback.classic.Logger
+import ch.qos.logback.classic.spi.ILoggingEvent
+import ch.qos.logback.core.read.ListAppender
+import com.fasterxml.jackson.databind.ObjectMapper
+import io.cloudevents.CloudEvent
+import io.cloudevents.core.builder.CloudEventBuilder
+import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.junit.jupiter.api.AfterEach
+import org.junit.jupiter.api.BeforeEach
+import org.onap.cps.ncmp.api.kafka.MessagingBaseSpec
+import org.onap.cps.ncmp.events.cmsubscription_merge1_0_0.client_to_ncmp.CmSubscriptionNcmpInEvent
+import org.onap.cps.ncmp.utils.TestUtils
+import org.onap.cps.utils.JsonObjectMapper
+import org.slf4j.LoggerFactory
+import org.springframework.beans.factory.annotation.Autowired
+import org.springframework.boot.test.context.SpringBootTest
+
+@SpringBootTest(classes = [ObjectMapper, JsonObjectMapper])
+class CmSubscriptionNcmpInEventConsumerSpec extends MessagingBaseSpec {
+
+    def objectUnderTest = new CmSubscriptionNcmpInEventConsumer()
+    def logger = Spy(ListAppender<ILoggingEvent>)
+
+    @Autowired
+    JsonObjectMapper jsonObjectMapper
+
+    @Autowired
+    ObjectMapper objectMapper
+
+    @BeforeEach
+    void setup() {
+        ((Logger) LoggerFactory.getLogger(CmSubscriptionNcmpInEventConsumer.class)).addAppender(logger);
+        logger.start();
+    }
+
+    @AfterEach
+    void teardown() {
+        ((Logger) LoggerFactory.getLogger(CmSubscriptionNcmpInEventConsumer.class)).detachAndStopAllAppenders();
+    }
+
+
+    def 'Consume valid CMSubscription create message'() {
+        given: 'a cmsubscription event'
+            def jsonData = TestUtils.getResourceFileContent('cmSubscription/cmSubscriptionNcmpInEvent.json')
+            def testEventSent = jsonObjectMapper.convertJsonString(jsonData, CmSubscriptionNcmpInEvent.class)
+            def testCloudEventSent = CloudEventBuilder.v1()
+                .withData(objectMapper.writeValueAsBytes(testEventSent))
+                .withId('subscriptionCreated')
+                .withType('subscriptionCreated')
+                .withSource(URI.create('some-resource'))
+                .withExtension('correlationid', 'test-cmhandle1').build()
+            def consumerRecord = new ConsumerRecord<String, CloudEvent>('topic-name', 0, 0, 'event-key', testCloudEventSent)
+        and: 'notifications are enabled'
+            objectUnderTest.notificationFeatureEnabled = true
+        and: 'subscription model loader is enabled'
+            objectUnderTest.subscriptionModelLoaderEnabled = true
+        when: 'the valid event is consumed'
+            objectUnderTest.consumeSubscriptionEvent(consumerRecord)
+        then: 'an event is logged with level INFO'
+            def loggingEvent = getLoggingEvent()
+            assert loggingEvent.level == Level.INFO
+        and: 'the log indicates the task completed successfully'
+            assert loggingEvent.formattedMessage == 'Subscription with name cm-subscription-001 to be mapped to hazelcast object...'
+    }
+
+    def getLoggingEvent() {
+        return logger.list[0]
+    }
+
+}
diff --git a/cps-ncmp-service/src/test/resources/cmSubscription/cmSubscriptionNcmpInEvent.json b/cps-ncmp-service/src/test/resources/cmSubscription/cmSubscriptionNcmpInEvent.json
new file mode 100644 (file)
index 0000000..5246618
--- /dev/null
@@ -0,0 +1,23 @@
+{
+  "data": {
+    "subscriptionId": "cm-subscription-001",
+    "predicates": [
+      {
+        "targetFilter": [
+          "CMHandle1",
+          "CMHandle2",
+          "CMHandle3"
+        ],
+        "scopeFilter": {
+          "datastore": "ncmp-datastore:passthrough-running",
+          "xpath-filter": [
+            "//_3gpp-nr-nrm-gnbdufunction:GNBDUFunction/_3gpp-nr-nrm-nrcelldu:NRCellDU/",
+            "//_3gpp-nr-nrm-gnbcuupfunction:GNBCUUPFunction//",
+            "//_3gpp-nr-nrm-gnbcucpfunction:GNBCUCPFunction/_3gpp-nr-nrm-nrcelldu:NRCellCU//",
+            "//_3gpp-nr-nrm-nrsectorcarrier:NRSectorCarrier//"
+          ]
+        }
+      }
+    ]
+  }
+}
\ No newline at end of file
diff --git a/cps-ncmp-service/src/test/resources/deprecatedCmSubscription/cmSubscriptionDmiInEvent.json b/cps-ncmp-service/src/test/resources/deprecatedCmSubscription/cmSubscriptionDmiInEvent.json
deleted file mode 100644 (file)
index f31362a..0000000
+++ /dev/null
@@ -1,31 +0,0 @@
-{
-  "data": {
-    "subscription": {
-      "clientID": "SCO-9989752",
-      "name": "cm-subscription-001"
-    },
-    "dataType": {
-      "dataspace": "ALL",
-      "dataCategory": "CM",
-      "dataProvider": "CM-SERVICE"
-    },
-    "predicates": {
-      "targets":[
-        {
-          "id":"CMHandle2",
-          "additional-properties":{
-            "Books":"Novel"
-          }
-        },
-        {
-          "id":"CMHandle1",
-          "additional-properties":{
-            "Books":"Social Media"
-          }
-        }
-      ],
-      "datastore": "passthrough-running",
-      "datastore-xpath-filter": "//_3gpp-nr-nrm-gnbdufunction:GNBDUFunction/_3gpp-nr-nrm-nrcelldu:NRCellDU/ | //_3gpp-nr-nrm-gnbcuupfunction:GNBCUUPFunction// | //_3gpp-nr-nrm-gnbcucpfunction:GNBCUCPFunction/_3gpp-nr-nrm-nrcelldu:NRCellCU// | //_3gpp-nr-nrm-nrsectorcarrier:NRSectorCarrier//"
-    }
-  }
-}
\ No newline at end of file
diff --git a/cps-ncmp-service/src/test/resources/deprecatedCmSubscription/cmSubscriptionDmiOutEvent.json b/cps-ncmp-service/src/test/resources/deprecatedCmSubscription/cmSubscriptionDmiOutEvent.json
deleted file mode 100644 (file)
index ae14b5c..0000000
+++ /dev/null
@@ -1,19 +0,0 @@
-{
-  "data": {
-    "clientId": "SCO-9989752",
-    "subscriptionName": "cm-subscription-001",
-    "dmiName": "dminame1",
-    "subscriptionStatus": [
-      {
-        "id": "CMHandle1",
-        "status": "REJECTED",
-        "details": "Some error message from the DMI"
-      },
-      {
-        "id": "CMHandle2",
-        "status": "REJECTED",
-        "details": "Some other error message from the DMI"
-      }
-    ]
-  }
-}
\ No newline at end of file
diff --git a/cps-ncmp-service/src/test/resources/deprecatedCmSubscription/cmSubscriptionEvent.json b/cps-ncmp-service/src/test/resources/deprecatedCmSubscription/cmSubscriptionEvent.json
deleted file mode 100644 (file)
index c38cb79..0000000
+++ /dev/null
@@ -1,31 +0,0 @@
-{
-  "clientId": "SCO-9989752",
-  "subscriptionName": "cm-subscription-001",
-  "cmSubscriptionStatus": [
-    {
-      "id": "CMHandle1",
-      "status": "REJECTED",
-      "details": "Some error message from the DMI"
-    },
-    {
-      "id": "CMHandle2",
-      "status": "REJECTED",
-      "details": "Some other error message from the DMI"
-    },
-    {
-      "id": "CMHandle3",
-      "status": "PENDING",
-      "details": "Some error causes pending"
-    },
-    {
-      "id": "CMHandle4",
-      "status": "PENDING",
-      "details": "Some other error happened"
-    },
-    {
-      "id": "CMHandle5",
-      "status": "PENDING",
-      "details": "Some other error happened"
-    }
-  ]
-}
\ No newline at end of file
diff --git a/cps-ncmp-service/src/test/resources/deprecatedCmSubscription/cmSubscriptionNcmpInEvent.json b/cps-ncmp-service/src/test/resources/deprecatedCmSubscription/cmSubscriptionNcmpInEvent.json
deleted file mode 100644 (file)
index 803fa48..0000000
+++ /dev/null
@@ -1,22 +0,0 @@
-{
-  "data": {
-    "subscription": {
-      "clientID": "SCO-9989752",
-      "name": "cm-subscription-001"
-    },
-    "dataType": {
-      "dataspace": "ALL",
-      "dataCategory": "CM",
-      "dataProvider": "CM-SERVICE"
-    },
-    "predicates": {
-      "targets": [
-        "CMHandle1",
-        "CMHandle2",
-        "CMHandle3"
-      ],
-      "datastore": "ncmp-datastore:passthrough-running",
-      "datastore-xpath-filter": "//_3gpp-nr-nrm-gnbdufunction:GNBDUFunction/_3gpp-nr-nrm-nrcelldu:NRCellDU/ | //_3gpp-nr-nrm-gnbcuupfunction:GNBCUUPFunction// | //_3gpp-nr-nrm-gnbcucpfunction:GNBCUCPFunction/_3gpp-nr-nrm-nrcelldu:NRCellCU// | //_3gpp-nr-nrm-nrsectorcarrier:NRSectorCarrier//"
-    }
-  }
-}
\ No newline at end of file
diff --git a/cps-ncmp-service/src/test/resources/deprecatedCmSubscription/cmSubscriptionNcmpOutEvent.json b/cps-ncmp-service/src/test/resources/deprecatedCmSubscription/cmSubscriptionNcmpOutEvent.json
deleted file mode 100644 (file)
index 856f238..0000000
+++ /dev/null
@@ -1,28 +0,0 @@
-{
-  "data": {
-    "statusCode": 104,
-    "statusMessage": "partially applied subscription",
-    "additionalInfo": {
-      "rejected": [
-        {
-          "details": "Some other error message from the DMI",
-          "targets": ["CMHandle2"]
-        },
-        {
-          "details": "Some error message from the DMI",
-          "targets": ["CMHandle1"]
-        }
-      ],
-      "pending": [
-        {
-          "details": "Some other error happened",
-          "targets": ["CMHandle4", "CMHandle5"]
-        },
-        {
-          "details": "Some error causes pending",
-          "targets": ["CMHandle3"]
-        }
-      ]
-    }
-  }
-}
\ No newline at end of file
diff --git a/cps-ncmp-service/src/test/resources/deprecatedCmSubscription/cmSubscriptionNcmpOutEvent2.json b/cps-ncmp-service/src/test/resources/deprecatedCmSubscription/cmSubscriptionNcmpOutEvent2.json
deleted file mode 100644 (file)
index 35ff024..0000000
+++ /dev/null
@@ -1,20 +0,0 @@
-{
-  "data": {
-    "statusCode": 104,
-    "statusMessage": "partially applied subscription",
-    "additionalInfo": {
-      "rejected": [
-        {
-          "details": "Cm handle does not exist",
-          "targets": ["CMHandle1"]
-        }
-      ],
-      "pending": [
-        {
-          "details": "Subscription forwarded to dmi plugin",
-          "targets": ["CMHandle3"]
-        }
-      ]
-    }
-  }
-}
\ No newline at end of file
index 1cfe21d..fd47793 100644 (file)
@@ -83,67 +83,6 @@ public class CpsDataPersistenceServiceImpl implements CpsDataPersistenceService
 
     private static final String REG_EX_FOR_OPTIONAL_LIST_INDEX = "(\\[@.+?])?)";
 
-    @Override
-    public void addChildDataNodes(final String dataspaceName, final String anchorName,
-                                  final String parentNodeXpath, final Collection<DataNode> dataNodes) {
-        final AnchorEntity anchorEntity = getAnchorEntity(dataspaceName, anchorName);
-        addChildrenDataNodes(anchorEntity, parentNodeXpath, dataNodes);
-    }
-
-    @Override
-    public void addListElements(final String dataspaceName, final String anchorName, final String parentNodeXpath,
-                                final Collection<DataNode> newListElements) {
-        final AnchorEntity anchorEntity = getAnchorEntity(dataspaceName, anchorName);
-        addChildrenDataNodes(anchorEntity, parentNodeXpath, newListElements);
-    }
-
-    private void addNewChildDataNode(final AnchorEntity anchorEntity, final String parentNodeXpath,
-                                     final DataNode newChild) {
-        final FragmentEntity parentFragmentEntity = getFragmentEntity(anchorEntity, parentNodeXpath);
-        final FragmentEntity newChildAsFragmentEntity = convertToFragmentWithAllDescendants(anchorEntity, newChild);
-        newChildAsFragmentEntity.setParentId(parentFragmentEntity.getId());
-        try {
-            fragmentRepository.save(newChildAsFragmentEntity);
-        } catch (final DataIntegrityViolationException dataIntegrityViolationException) {
-            throw AlreadyDefinedException.forDataNodes(Collections.singletonList(newChild.getXpath()),
-                    anchorEntity.getName());
-        }
-    }
-
-    private void addChildrenDataNodes(final AnchorEntity anchorEntity, final String parentNodeXpath,
-                                      final Collection<DataNode> newChildren) {
-        final FragmentEntity parentFragmentEntity = getFragmentEntity(anchorEntity, parentNodeXpath);
-        final List<FragmentEntity> fragmentEntities = new ArrayList<>(newChildren.size());
-        try {
-            for (final DataNode newChildAsDataNode : newChildren) {
-                final FragmentEntity newChildAsFragmentEntity =
-                    convertToFragmentWithAllDescendants(anchorEntity, newChildAsDataNode);
-                newChildAsFragmentEntity.setParentId(parentFragmentEntity.getId());
-                fragmentEntities.add(newChildAsFragmentEntity);
-            }
-            fragmentRepository.saveAll(fragmentEntities);
-        } catch (final DataIntegrityViolationException dataIntegrityViolationException) {
-            log.warn("Exception occurred : {} , While saving : {} children, retrying using individual save operations",
-                    dataIntegrityViolationException, fragmentEntities.size());
-            retrySavingEachChildIndividually(anchorEntity, parentNodeXpath, newChildren);
-        }
-    }
-
-    private void retrySavingEachChildIndividually(final AnchorEntity anchorEntity, final String parentNodeXpath,
-                                                  final Collection<DataNode> newChildren) {
-        final Collection<String> failedXpaths = new HashSet<>();
-        for (final DataNode newChild : newChildren) {
-            try {
-                addNewChildDataNode(anchorEntity, parentNodeXpath, newChild);
-            } catch (final AlreadyDefinedException alreadyDefinedException) {
-                failedXpaths.add(newChild.getXpath());
-            }
-        }
-        if (!failedXpaths.isEmpty()) {
-            throw AlreadyDefinedException.forDataNodes(failedXpaths, anchorEntity.getName());
-        }
-    }
-
     @Override
     public void storeDataNodes(final String dataspaceName, final String anchorName,
                                final Collection<DataNode> dataNodes) {
@@ -157,7 +96,7 @@ public class CpsDataPersistenceServiceImpl implements CpsDataPersistenceService
             fragmentRepository.saveAll(fragmentEntities);
         } catch (final DataIntegrityViolationException exception) {
             log.warn("Exception occurred : {} , While saving : {} data nodes, Retrying saving data nodes individually",
-                    exception, dataNodes.size());
+                exception, dataNodes.size());
             storeDataNodesIndividually(anchorEntity, dataNodes);
         }
     }
@@ -197,79 +136,153 @@ public class CpsDataPersistenceServiceImpl implements CpsDataPersistenceService
         return parentFragment;
     }
 
-    private FragmentEntity toFragmentEntity(final AnchorEntity anchorEntity, final DataNode dataNode) {
-        return FragmentEntity.builder()
-                .anchor(anchorEntity)
-                .xpath(dataNode.getXpath())
-                .attributes(jsonObjectMapper.asJsonString(dataNode.getLeaves()))
-                .build();
+    @Override
+    public void addListElements(final String dataspaceName, final String anchorName, final String parentNodeXpath,
+                                final Collection<DataNode> newListElements) {
+        final AnchorEntity anchorEntity = getAnchorEntity(dataspaceName, anchorName);
+        addChildrenDataNodes(anchorEntity, parentNodeXpath, newListElements);
     }
 
     @Override
-    @Timed(value = "cps.data.persistence.service.datanode.get",
-            description = "Time taken to get a data node")
-    public Collection<DataNode> getDataNodes(final String dataspaceName, final String anchorName,
-                                             final String xpath,
-                                             final FetchDescendantsOption fetchDescendantsOption) {
-        final String targetXpath = getNormalizedXpath(xpath);
-        final Collection<DataNode> dataNodes = getDataNodesForMultipleXpaths(dataspaceName, anchorName,
-                Collections.singletonList(targetXpath), fetchDescendantsOption);
-        if (dataNodes.isEmpty()) {
-            throw new DataNodeNotFoundException(dataspaceName, anchorName, xpath);
+    public void addChildDataNodes(final String dataspaceName, final String anchorName,
+                                  final String parentNodeXpath, final Collection<DataNode> dataNodes) {
+        final AnchorEntity anchorEntity = getAnchorEntity(dataspaceName, anchorName);
+        addChildrenDataNodes(anchorEntity, parentNodeXpath, dataNodes);
+    }
+
+    private void addChildrenDataNodes(final AnchorEntity anchorEntity, final String parentNodeXpath,
+                                      final Collection<DataNode> newChildren) {
+        final FragmentEntity parentFragmentEntity = getFragmentEntity(anchorEntity, parentNodeXpath);
+        final List<FragmentEntity> fragmentEntities = new ArrayList<>(newChildren.size());
+        try {
+            for (final DataNode newChildAsDataNode : newChildren) {
+                final FragmentEntity newChildAsFragmentEntity =
+                    convertToFragmentWithAllDescendants(anchorEntity, newChildAsDataNode);
+                newChildAsFragmentEntity.setParentId(parentFragmentEntity.getId());
+                fragmentEntities.add(newChildAsFragmentEntity);
+            }
+            fragmentRepository.saveAll(fragmentEntities);
+        } catch (final DataIntegrityViolationException dataIntegrityViolationException) {
+            log.warn("Exception occurred : {} , While saving : {} children, retrying using individual save operations",
+                dataIntegrityViolationException, fragmentEntities.size());
+            retrySavingEachChildIndividually(anchorEntity, parentNodeXpath, newChildren);
         }
-        return dataNodes;
     }
 
-    @Override
-    @Timed(value = "cps.data.persistence.service.datanode.batch.get",
-            description = "Time taken to get data nodes")
-    public Collection<DataNode> getDataNodesForMultipleXpaths(final String dataspaceName, final String anchorName,
-                                                              final Collection<String> xpaths,
-                                                              final FetchDescendantsOption fetchDescendantsOption) {
-        final AnchorEntity anchorEntity = getAnchorEntity(dataspaceName, anchorName);
-        Collection<FragmentEntity> fragmentEntities = getFragmentEntities(anchorEntity, xpaths);
-        fragmentEntities = fragmentRepository.prefetchDescendantsOfFragmentEntities(fetchDescendantsOption,
-                fragmentEntities);
-        return createDataNodesFromFragmentEntities(fetchDescendantsOption, fragmentEntities);
+    private void addNewChildDataNode(final AnchorEntity anchorEntity, final String parentNodeXpath,
+                                     final DataNode newChild) {
+        final FragmentEntity parentFragmentEntity = getFragmentEntity(anchorEntity, parentNodeXpath);
+        final FragmentEntity newChildAsFragmentEntity = convertToFragmentWithAllDescendants(anchorEntity, newChild);
+        newChildAsFragmentEntity.setParentId(parentFragmentEntity.getId());
+        try {
+            fragmentRepository.save(newChildAsFragmentEntity);
+        } catch (final DataIntegrityViolationException dataIntegrityViolationException) {
+            throw AlreadyDefinedException.forDataNodes(Collections.singletonList(newChild.getXpath()),
+                anchorEntity.getName());
+        }
     }
 
-    private Collection<FragmentEntity> getFragmentEntities(final AnchorEntity anchorEntity,
-                                                           final Collection<String> xpaths) {
-        final Collection<String> normalizedXpaths = getNormalizedXpaths(xpaths);
+    private void retrySavingEachChildIndividually(final AnchorEntity anchorEntity, final String parentNodeXpath,
+                                                  final Collection<DataNode> newChildren) {
+        final Collection<String> failedXpaths = new HashSet<>();
+        for (final DataNode newChild : newChildren) {
+            try {
+                addNewChildDataNode(anchorEntity, parentNodeXpath, newChild);
+            } catch (final AlreadyDefinedException alreadyDefinedException) {
+                failedXpaths.add(newChild.getXpath());
+            }
+        }
+        if (!failedXpaths.isEmpty()) {
+            throw AlreadyDefinedException.forDataNodes(failedXpaths, anchorEntity.getName());
+        }
+    }
 
-        final boolean haveRootXpath = normalizedXpaths.removeIf(CpsDataPersistenceServiceImpl::isRootXpath);
+    @Override
+    public void batchUpdateDataLeaves(final String dataspaceName, final String anchorName,
+                                      final Map<String, Map<String, Serializable>> updatedLeavesPerXPath) {
+        final AnchorEntity anchorEntity = getAnchorEntity(dataspaceName, anchorName);
 
-        final List<FragmentEntity> fragmentEntities = fragmentRepository.findByAnchorAndXpathIn(anchorEntity,
-                normalizedXpaths);
+        final Collection<String> xpathsOfUpdatedLeaves = updatedLeavesPerXPath.keySet();
+        final Collection<FragmentEntity> fragmentEntities = getFragmentEntities(anchorEntity, xpathsOfUpdatedLeaves);
 
         for (final FragmentEntity fragmentEntity : fragmentEntities) {
-            normalizedXpaths.remove(fragmentEntity.getXpath());
+            final Map<String, Serializable> updatedLeaves = updatedLeavesPerXPath.get(fragmentEntity.getXpath());
+            final String mergedLeaves = mergeLeaves(updatedLeaves, fragmentEntity.getAttributes());
+            fragmentEntity.setAttributes(mergedLeaves);
         }
 
-        for (final String xpath : normalizedXpaths) {
-            if (!CpsPathUtil.isPathToListElement(xpath)) {
-                fragmentEntities.addAll(fragmentRepository.findListByAnchorAndXpath(anchorEntity, xpath));
-            }
+        try {
+            fragmentRepository.saveAll(fragmentEntities);
+        } catch (final StaleStateException staleStateException) {
+            retryUpdateDataNodesIndividually(anchorEntity, fragmentEntities);
         }
+    }
 
-        if (haveRootXpath) {
-            fragmentEntities.addAll(fragmentRepository.findRootsByAnchorId(anchorEntity.getId()));
+    @Override
+    public void updateDataNodesAndDescendants(final String dataspaceName, final String anchorName,
+                                              final Collection<DataNode> updatedDataNodes) {
+        final AnchorEntity anchorEntity = getAnchorEntity(dataspaceName, anchorName);
+
+        final Map<String, DataNode> xpathToUpdatedDataNode = updatedDataNodes.stream()
+            .collect(Collectors.toMap(DataNode::getXpath, dataNode -> dataNode));
+
+        final Collection<String> xpaths = xpathToUpdatedDataNode.keySet();
+        Collection<FragmentEntity> existingFragmentEntities = getFragmentEntities(anchorEntity, xpaths);
+        existingFragmentEntities = fragmentRepository.prefetchDescendantsOfFragmentEntities(
+            FetchDescendantsOption.INCLUDE_ALL_DESCENDANTS, existingFragmentEntities);
+
+        for (final FragmentEntity existingFragmentEntity : existingFragmentEntities) {
+            final DataNode updatedDataNode = xpathToUpdatedDataNode.get(existingFragmentEntity.getXpath());
+            updateFragmentEntityAndDescendantsWithDataNode(existingFragmentEntity, updatedDataNode);
         }
 
-        return fragmentEntities;
+        try {
+            fragmentRepository.saveAll(existingFragmentEntities);
+        } catch (final StaleStateException staleStateException) {
+            retryUpdateDataNodesIndividually(anchorEntity, existingFragmentEntities);
+        }
     }
 
-    private FragmentEntity getFragmentEntity(final AnchorEntity anchorEntity, final String xpath) {
-        final FragmentEntity fragmentEntity;
-        if (isRootXpath(xpath)) {
-            fragmentEntity = fragmentRepository.findOneByAnchorId(anchorEntity.getId()).orElse(null);
-        } else {
-            fragmentEntity = fragmentRepository.getByAnchorAndXpath(anchorEntity, getNormalizedXpath(xpath));
+    private void retryUpdateDataNodesIndividually(final AnchorEntity anchorEntity,
+                                                  final Collection<FragmentEntity> fragmentEntities) {
+        final Collection<String> failedXpaths = new HashSet<>();
+        for (final FragmentEntity dataNodeFragment : fragmentEntities) {
+            try {
+                fragmentRepository.save(dataNodeFragment);
+            } catch (final StaleStateException staleStateException) {
+                failedXpaths.add(dataNodeFragment.getXpath());
+            }
         }
-        if (fragmentEntity == null) {
-            throw new DataNodeNotFoundException(anchorEntity.getDataspace().getName(), anchorEntity.getName(), xpath);
+        if (!failedXpaths.isEmpty()) {
+            final String failedXpathsConcatenated = String.join(",", failedXpaths);
+            throw new ConcurrencyException("Concurrent Transactions", String.format(
+                "DataNodes : %s in Dataspace :'%s' with Anchor : '%s'  are updated by another transaction.",
+                failedXpathsConcatenated, anchorEntity.getDataspace().getName(), anchorEntity.getName()));
         }
-        return fragmentEntity;
+    }
+
+    private void updateFragmentEntityAndDescendantsWithDataNode(final FragmentEntity existingFragmentEntity,
+                                                                final DataNode newDataNode) {
+        copyAttributesFromNewDataNode(existingFragmentEntity, newDataNode);
+
+        final Map<String, FragmentEntity> existingChildrenByXpath = existingFragmentEntity.getChildFragments().stream()
+            .collect(Collectors.toMap(FragmentEntity::getXpath, childFragmentEntity -> childFragmentEntity));
+
+        final Collection<FragmentEntity> updatedChildFragments = new HashSet<>();
+        for (final DataNode newDataNodeChild : newDataNode.getChildDataNodes()) {
+            final FragmentEntity childFragment;
+            if (isNewDataNode(newDataNodeChild, existingChildrenByXpath)) {
+                childFragment = convertToFragmentWithAllDescendants(existingFragmentEntity.getAnchor(),
+                    newDataNodeChild);
+            } else {
+                childFragment = existingChildrenByXpath.get(newDataNodeChild.getXpath());
+                updateFragmentEntityAndDescendantsWithDataNode(childFragment, newDataNodeChild);
+            }
+            updatedChildFragments.add(childFragment);
+        }
+
+        existingFragmentEntity.getChildFragments().clear();
+        existingFragmentEntity.getChildFragments().addAll(updatedChildFragments);
     }
 
     @Override
@@ -338,11 +351,6 @@ public class CpsDataPersistenceServiceImpl implements CpsDataPersistenceService
         return createDataNodesFromFragmentEntities(fetchDescendantsOption, fragmentEntities);
     }
 
-    private List<Long> getAnchorIdsForPagination(final DataspaceEntity dataspaceEntity, final CpsPathQuery cpsPathQuery,
-                                                 final PaginationOption paginationOption) {
-        return fragmentRepository.findAnchorIdsForPagination(dataspaceEntity, cpsPathQuery, paginationOption);
-    }
-
     private List<DataNode> createDataNodesFromFragmentEntities(final FetchDescendantsOption fetchDescendantsOption,
                                                                final Collection<FragmentEntity> fragmentEntities) {
         final List<DataNode> dataNodes = new ArrayList<>(fragmentEntities.size());
@@ -352,29 +360,6 @@ public class CpsDataPersistenceServiceImpl implements CpsDataPersistenceService
         return Collections.unmodifiableList(dataNodes);
     }
 
-    private static String getNormalizedXpath(final String xpathSource) {
-        if (isRootXpath(xpathSource)) {
-            return xpathSource;
-        }
-        try {
-            return CpsPathUtil.getNormalizedXpath(xpathSource);
-        } catch (final PathParsingException pathParsingException) {
-            throw new CpsPathException(pathParsingException.getMessage());
-        }
-    }
-
-    private static Collection<String> getNormalizedXpaths(final Collection<String> xpaths) {
-        final Collection<String> normalizedXpaths = new HashSet<>(xpaths.size());
-        for (final String xpath : xpaths) {
-            try {
-                normalizedXpaths.add(getNormalizedXpath(xpath));
-            } catch (final CpsPathException cpsPathException) {
-                log.warn("Error parsing xpath \"{}\": {}", xpath, cpsPathException.getMessage());
-            }
-        }
-        return normalizedXpaths;
-    }
-
     @Override
     public String startSession() {
         return sessionManager.startSession();
@@ -404,21 +389,6 @@ public class CpsDataPersistenceServiceImpl implements CpsDataPersistenceService
         return anchorIdList.size();
     }
 
-    private static Set<String> processAncestorXpath(final Collection<FragmentEntity> fragmentEntities,
-                                                    final CpsPathQuery cpsPathQuery) {
-        final Set<String> ancestorXpath = new HashSet<>();
-        final Pattern pattern =
-                Pattern.compile("(.*/" + Pattern.quote(cpsPathQuery.getAncestorSchemaNodeIdentifier())
-                        + REG_EX_FOR_OPTIONAL_LIST_INDEX + "/.*");
-        for (final FragmentEntity fragmentEntity : fragmentEntities) {
-            final Matcher matcher = pattern.matcher(fragmentEntity.getXpath());
-            if (matcher.matches()) {
-                ancestorXpath.add(matcher.group(1));
-            }
-        }
-        return ancestorXpath;
-    }
-
     private DataNode toDataNode(final FragmentEntity fragmentEntity,
                                 final FetchDescendantsOption fetchDescendantsOption) {
         final List<DataNode> childDataNodes = getChildDataNodes(fragmentEntity, fetchDescendantsOption);
@@ -434,103 +404,15 @@ public class CpsDataPersistenceServiceImpl implements CpsDataPersistenceService
                 .withChildDataNodes(childDataNodes).build();
     }
 
-    private List<DataNode> getChildDataNodes(final FragmentEntity fragmentEntity,
-                                             final FetchDescendantsOption fetchDescendantsOption) {
-        if (fetchDescendantsOption.hasNext()) {
-            return fragmentEntity.getChildFragments().stream()
-                    .map(childFragmentEntity -> toDataNode(childFragmentEntity, fetchDescendantsOption.next()))
-                    .collect(Collectors.toList());
-        }
-        return Collections.emptyList();
-    }
-
-    @Override
-    public void batchUpdateDataLeaves(final String dataspaceName, final String anchorName,
-                                        final Map<String, Map<String, Serializable>> updatedLeavesPerXPath) {
-        final AnchorEntity anchorEntity = getAnchorEntity(dataspaceName, anchorName);
-
-        final Collection<String> xpathsOfUpdatedLeaves = updatedLeavesPerXPath.keySet();
-        final Collection<FragmentEntity> fragmentEntities = getFragmentEntities(anchorEntity, xpathsOfUpdatedLeaves);
-
-        for (final FragmentEntity fragmentEntity : fragmentEntities) {
-            final Map<String, Serializable> updatedLeaves = updatedLeavesPerXPath.get(fragmentEntity.getXpath());
-            final String mergedLeaves = mergeLeaves(updatedLeaves, fragmentEntity.getAttributes());
-            fragmentEntity.setAttributes(mergedLeaves);
-        }
-
-        try {
-            fragmentRepository.saveAll(fragmentEntities);
-        } catch (final StaleStateException staleStateException) {
-            retryUpdateDataNodesIndividually(anchorEntity, fragmentEntities);
-        }
-    }
-
-    @Override
-    public void updateDataNodesAndDescendants(final String dataspaceName, final String anchorName,
-                                              final Collection<DataNode> updatedDataNodes) {
-        final AnchorEntity anchorEntity = getAnchorEntity(dataspaceName, anchorName);
-
-        final Map<String, DataNode> xpathToUpdatedDataNode = updatedDataNodes.stream()
-            .collect(Collectors.toMap(DataNode::getXpath, dataNode -> dataNode));
-
-        final Collection<String> xpaths = xpathToUpdatedDataNode.keySet();
-        Collection<FragmentEntity> existingFragmentEntities = getFragmentEntities(anchorEntity, xpaths);
-        existingFragmentEntities = fragmentRepository.prefetchDescendantsOfFragmentEntities(
-                FetchDescendantsOption.INCLUDE_ALL_DESCENDANTS, existingFragmentEntities);
-
-        for (final FragmentEntity existingFragmentEntity : existingFragmentEntities) {
-            final DataNode updatedDataNode = xpathToUpdatedDataNode.get(existingFragmentEntity.getXpath());
-            updateFragmentEntityAndDescendantsWithDataNode(existingFragmentEntity, updatedDataNode);
-        }
-
-        try {
-            fragmentRepository.saveAll(existingFragmentEntities);
-        } catch (final StaleStateException staleStateException) {
-            retryUpdateDataNodesIndividually(anchorEntity, existingFragmentEntities);
-        }
-    }
-
-    private void retryUpdateDataNodesIndividually(final AnchorEntity anchorEntity,
-                                                  final Collection<FragmentEntity> fragmentEntities) {
-        final Collection<String> failedXpaths = new HashSet<>();
-        for (final FragmentEntity dataNodeFragment : fragmentEntities) {
-            try {
-                fragmentRepository.save(dataNodeFragment);
-            } catch (final StaleStateException staleStateException) {
-                failedXpaths.add(dataNodeFragment.getXpath());
-            }
-        }
-        if (!failedXpaths.isEmpty()) {
-            final String failedXpathsConcatenated = String.join(",", failedXpaths);
-            throw new ConcurrencyException("Concurrent Transactions", String.format(
-                    "DataNodes : %s in Dataspace :'%s' with Anchor : '%s'  are updated by another transaction.",
-                    failedXpathsConcatenated, anchorEntity.getDataspace().getName(), anchorEntity.getName()));
-        }
+    private FragmentEntity toFragmentEntity(final AnchorEntity anchorEntity, final DataNode dataNode) {
+        return FragmentEntity.builder()
+            .anchor(anchorEntity)
+            .xpath(dataNode.getXpath())
+            .attributes(jsonObjectMapper.asJsonString(dataNode.getLeaves()))
+            .build();
     }
 
-    private void updateFragmentEntityAndDescendantsWithDataNode(final FragmentEntity existingFragmentEntity,
-                                                                final DataNode newDataNode) {
-        copyAttributesFromNewDataNode(existingFragmentEntity, newDataNode);
-
-        final Map<String, FragmentEntity> existingChildrenByXpath = existingFragmentEntity.getChildFragments().stream()
-                .collect(Collectors.toMap(FragmentEntity::getXpath, childFragmentEntity -> childFragmentEntity));
-
-        final Collection<FragmentEntity> updatedChildFragments = new HashSet<>();
-        for (final DataNode newDataNodeChild : newDataNode.getChildDataNodes()) {
-            final FragmentEntity childFragment;
-            if (isNewDataNode(newDataNodeChild, existingChildrenByXpath)) {
-                childFragment = convertToFragmentWithAllDescendants(existingFragmentEntity.getAnchor(),
-                    newDataNodeChild);
-            } else {
-                childFragment = existingChildrenByXpath.get(newDataNodeChild.getXpath());
-                updateFragmentEntityAndDescendantsWithDataNode(childFragment, newDataNodeChild);
-            }
-            updatedChildFragments.add(childFragment);
-        }
 
-        existingFragmentEntity.getChildFragments().clear();
-        existingFragmentEntity.getChildFragments().addAll(updatedChildFragments);
-    }
 
     @Override
     @Transactional
@@ -636,6 +518,116 @@ public class CpsDataPersistenceServiceImpl implements CpsDataPersistenceService
         }
     }
 
+    @Override
+    @Timed(value = "cps.data.persistence.service.datanode.get",
+        description = "Time taken to get a data node")
+    public Collection<DataNode> getDataNodes(final String dataspaceName, final String anchorName,
+                                             final String xpath,
+                                             final FetchDescendantsOption fetchDescendantsOption) {
+        final String targetXpath = getNormalizedXpath(xpath);
+        final Collection<DataNode> dataNodes = getDataNodesForMultipleXpaths(dataspaceName, anchorName,
+            Collections.singletonList(targetXpath), fetchDescendantsOption);
+        if (dataNodes.isEmpty()) {
+            throw new DataNodeNotFoundException(dataspaceName, anchorName, xpath);
+        }
+        return dataNodes;
+    }
+
+    @Override
+    @Timed(value = "cps.data.persistence.service.datanode.batch.get",
+        description = "Time taken to get data nodes")
+    public Collection<DataNode> getDataNodesForMultipleXpaths(final String dataspaceName, final String anchorName,
+                                                              final Collection<String> xpaths,
+                                                              final FetchDescendantsOption fetchDescendantsOption) {
+        final AnchorEntity anchorEntity = getAnchorEntity(dataspaceName, anchorName);
+        Collection<FragmentEntity> fragmentEntities = getFragmentEntities(anchorEntity, xpaths);
+        fragmentEntities = fragmentRepository.prefetchDescendantsOfFragmentEntities(fetchDescendantsOption,
+            fragmentEntities);
+        return createDataNodesFromFragmentEntities(fetchDescendantsOption, fragmentEntities);
+    }
+
+    private List<DataNode> getChildDataNodes(final FragmentEntity fragmentEntity,
+                                             final FetchDescendantsOption fetchDescendantsOption) {
+        if (fetchDescendantsOption.hasNext()) {
+            return fragmentEntity.getChildFragments().stream()
+                .map(childFragmentEntity -> toDataNode(childFragmentEntity, fetchDescendantsOption.next()))
+                .collect(Collectors.toList());
+        }
+        return Collections.emptyList();
+    }
+
+    private AnchorEntity getAnchorEntity(final String dataspaceName, final String anchorName) {
+        final DataspaceEntity dataspaceEntity = dataspaceRepository.getByName(dataspaceName);
+        return anchorRepository.getByDataspaceAndName(dataspaceEntity, anchorName);
+    }
+
+    private List<Long> getAnchorIdsForPagination(final DataspaceEntity dataspaceEntity, final CpsPathQuery cpsPathQuery,
+                                                 final PaginationOption paginationOption) {
+        return fragmentRepository.findAnchorIdsForPagination(dataspaceEntity, cpsPathQuery, paginationOption);
+    }
+
+    private static String getNormalizedXpath(final String xpathSource) {
+        if (isRootXpath(xpathSource)) {
+            return xpathSource;
+        }
+        try {
+            return CpsPathUtil.getNormalizedXpath(xpathSource);
+        } catch (final PathParsingException pathParsingException) {
+            throw new CpsPathException(pathParsingException.getMessage());
+        }
+    }
+
+    private static Collection<String> getNormalizedXpaths(final Collection<String> xpaths) {
+        final Collection<String> normalizedXpaths = new HashSet<>(xpaths.size());
+        for (final String xpath : xpaths) {
+            try {
+                normalizedXpaths.add(getNormalizedXpath(xpath));
+            } catch (final CpsPathException cpsPathException) {
+                log.warn("Error parsing xpath \"{}\": {}", xpath, cpsPathException.getMessage());
+            }
+        }
+        return normalizedXpaths;
+    }
+
+    private FragmentEntity getFragmentEntity(final AnchorEntity anchorEntity, final String xpath) {
+        final FragmentEntity fragmentEntity;
+        if (isRootXpath(xpath)) {
+            fragmentEntity = fragmentRepository.findOneByAnchorId(anchorEntity.getId()).orElse(null);
+        } else {
+            fragmentEntity = fragmentRepository.getByAnchorAndXpath(anchorEntity, getNormalizedXpath(xpath));
+        }
+        if (fragmentEntity == null) {
+            throw new DataNodeNotFoundException(anchorEntity.getDataspace().getName(), anchorEntity.getName(), xpath);
+        }
+        return fragmentEntity;
+    }
+
+    private Collection<FragmentEntity> getFragmentEntities(final AnchorEntity anchorEntity,
+                                                           final Collection<String> xpaths) {
+        final Collection<String> normalizedXpaths = getNormalizedXpaths(xpaths);
+
+        final boolean haveRootXpath = normalizedXpaths.removeIf(CpsDataPersistenceServiceImpl::isRootXpath);
+
+        final List<FragmentEntity> fragmentEntities = fragmentRepository.findByAnchorAndXpathIn(anchorEntity,
+            normalizedXpaths);
+
+        for (final FragmentEntity fragmentEntity : fragmentEntities) {
+            normalizedXpaths.remove(fragmentEntity.getXpath());
+        }
+
+        for (final String xpath : normalizedXpaths) {
+            if (!CpsPathUtil.isPathToListElement(xpath)) {
+                fragmentEntities.addAll(fragmentRepository.findListByAnchorAndXpath(anchorEntity, xpath));
+            }
+        }
+
+        if (haveRootXpath) {
+            fragmentEntities.addAll(fragmentRepository.findRootsByAnchorId(anchorEntity.getId()));
+        }
+
+        return fragmentEntities;
+    }
+
     private static String getListElementXpathPrefix(final Collection<DataNode> newListElements) {
         if (newListElements.isEmpty()) {
             throw new CpsAdminException("Invalid list replacement",
@@ -660,20 +652,6 @@ public class CpsDataPersistenceServiceImpl implements CpsDataPersistenceService
         return existingListElementEntity;
     }
 
-    private static boolean isNewDataNode(final DataNode replacementDataNode,
-                                         final Map<String, FragmentEntity> existingListElementsByXpath) {
-        return !existingListElementsByXpath.containsKey(replacementDataNode.getXpath());
-    }
-
-    private void copyAttributesFromNewDataNode(final FragmentEntity existingFragmentEntity,
-                                               final DataNode newDataNode) {
-        final String oldOrderedLeavesAsJson = getOrderedLeavesAsJson(existingFragmentEntity.getAttributes());
-        final String newOrderedLeavesAsJson = getOrderedLeavesAsJson(newDataNode.getLeaves());
-        if (!oldOrderedLeavesAsJson.equals(newOrderedLeavesAsJson)) {
-            existingFragmentEntity.setAttributes(jsonObjectMapper.asJsonString(newDataNode.getLeaves()));
-        }
-    }
-
     private String getOrderedLeavesAsJson(final Map<String, Serializable> currentLeaves) {
         final Map<String, Serializable> sortedLeaves = new TreeMap<>(String::compareTo);
         sortedLeaves.putAll(currentLeaves);
@@ -685,7 +663,7 @@ public class CpsDataPersistenceServiceImpl implements CpsDataPersistenceService
             return "{}";
         }
         final Map<String, Serializable> sortedLeaves = jsonObjectMapper.convertJsonString(currentLeavesAsString,
-                TreeMap.class);
+            TreeMap.class);
         return jsonObjectMapper.asJsonString(sortedLeaves);
     }
 
@@ -696,10 +674,39 @@ public class CpsDataPersistenceServiceImpl implements CpsDataPersistenceService
                 .collect(Collectors.toMap(FragmentEntity::getXpath, fragmentEntity -> fragmentEntity));
     }
 
+    private static Set<String> processAncestorXpath(final Collection<FragmentEntity> fragmentEntities,
+                                                    final CpsPathQuery cpsPathQuery) {
+        final Set<String> ancestorXpath = new HashSet<>();
+        final Pattern pattern =
+            Pattern.compile("(.*/" + Pattern.quote(cpsPathQuery.getAncestorSchemaNodeIdentifier())
+                + REG_EX_FOR_OPTIONAL_LIST_INDEX + "/.*");
+        for (final FragmentEntity fragmentEntity : fragmentEntities) {
+            final Matcher matcher = pattern.matcher(fragmentEntity.getXpath());
+            if (matcher.matches()) {
+                ancestorXpath.add(matcher.group(1));
+            }
+        }
+        return ancestorXpath;
+    }
+
     private static boolean isRootXpath(final String xpath) {
         return "/".equals(xpath) || "".equals(xpath);
     }
 
+    private static boolean isNewDataNode(final DataNode replacementDataNode,
+                                         final Map<String, FragmentEntity> existingListElementsByXpath) {
+        return !existingListElementsByXpath.containsKey(replacementDataNode.getXpath());
+    }
+
+    private void copyAttributesFromNewDataNode(final FragmentEntity existingFragmentEntity,
+                                               final DataNode newDataNode) {
+        final String oldOrderedLeavesAsJson = getOrderedLeavesAsJson(existingFragmentEntity.getAttributes());
+        final String newOrderedLeavesAsJson = getOrderedLeavesAsJson(newDataNode.getLeaves());
+        if (!oldOrderedLeavesAsJson.equals(newOrderedLeavesAsJson)) {
+            existingFragmentEntity.setAttributes(jsonObjectMapper.asJsonString(newDataNode.getLeaves()));
+        }
+    }
+
     private String mergeLeaves(final Map<String, Serializable> updateLeaves, final String currentLeavesAsString) {
         Map<String, Serializable> currentLeavesAsMap = new HashMap<>();
         if (currentLeavesAsString != null) {
@@ -712,9 +719,4 @@ public class CpsDataPersistenceServiceImpl implements CpsDataPersistenceService
         }
         return jsonObjectMapper.asJsonString(currentLeavesAsMap);
     }
-
-    private AnchorEntity getAnchorEntity(final String dataspaceName, final String anchorName) {
-        final DataspaceEntity dataspaceEntity = dataspaceRepository.getByName(dataspaceName);
-        return anchorRepository.getByDataspaceAndName(dataspaceEntity, anchorName);
-    }
 }
index 1f66119..17dce16 100644 (file)
@@ -46,18 +46,19 @@ NCMP Data Operation, forwarded to DMI, response on Client Topic
         ${params}=                       Create Dictionary   topic=${topic}
         ${headers}=                      Create Dictionary   Content-Type=application/json         Authorization=${auth}
                                          POST On Session     CPS_URL   ncmpInventory/v1/ch         headers=${headers}     data=${newCmHandleRequestBody}
-        Sleep                            8                   wait some time to get updated the cm handle state to READY
+        ${getCmHandleUri}=               Set Variable        ${ncmpBasePath}/v1/ch/CMHandle1
+        ${getCmHandleHeaders}=           Create Dictionary   Authorization=${auth}
+        Wait Until Keyword Succeeds      8sec    100ms       Is CM Handle READY    ${getCmHandleUri}    ${getCmHandleHeaders}    CMHandle1
         ${response}=                     POST On Session     CPS_URL   ${uri}   params=${params}   headers=${headers}     data=${dataOperationReqBody}
         Set Global Variable              ${expectedRequestId}       ${response.json()}[requestId]
         Should Be Equal As Strings       ${response.status_code}   200
-        Sleep                            5                         wait some time to get published a message to the client topic
 
 Consume cloud event from client topic
     ${group_id}=         Create Consumer     auto_offset_reset=earliest
     Subscribe Topic      topics=${topic}     group_id=${group_id}
     ${messages}=         Poll                group_id=${group_id}     only_value=false
-    ${event}                        Set Variable                      ${messages}[0]
-    ${headers}                      Set Variable                      ${event.headers()}
+    ${event}             Set Variable        ${messages}[0]
+    ${headers}           Set Variable        ${event.headers()}
     FOR   ${header_key_value_pair}   IN  @{headers}
         Compare Header Values       ${header_key_value_pair[0]}   ${header_key_value_pair[1]}      "ce_specversion"      "1.0"
         Compare Header Values       ${header_key_value_pair[0]}   ${header_key_value_pair[1]}      "ce_type"             "org.onap.cps.ncmp.events.async1_0_0.DataOperationEvent"
@@ -68,9 +69,19 @@ Consume cloud event from client topic
 
 *** Keywords ***
 Compare Header Values
-    [Arguments]                    ${header_key}        ${header_value}     ${header_to_check}       ${expected_header_value}
+    [Arguments]    ${header_key}    ${header_value}    ${header_to_check}    ${expected_header_value}
     IF   "${header_key}" == ${header_to_check}
-        Should Be Equal As Strings              "${header_value}"    ${expected_header_value}
+        Should Be Equal As Strings    "${header_value}"    ${expected_header_value}
+    END
+
+Is CM Handle READY
+    [Arguments]    ${uri}    ${headers}    ${cmHandle}
+    ${response}=    GET On Session    CPS_URL    ${uri}    headers=${headers}
+    Should Be Equal As Strings    ${response.status_code}    200
+    FOR  ${item}  IN  ${response.json()}
+            IF  "${item['cmHandle']}" == "${cmHandle}"
+                Should Be Equal As Strings    ${item['state']['cmHandleState']}    READY
+            END
     END
 
 Basic Teardown
index 71de4be..c0ee4da 100644 (file)
@@ -34,20 +34,40 @@ ${auth}                   Basic Y3BzdXNlcjpjcHNyMGNrcyE=
 ${ncmpBasePath}           /ncmp
 
 *** Test Cases ***
+
+Check if ietfYang-PNFDemo is READY
+    ${uri}=        Set Variable       ${ncmpBasePath}/v1/ch/ietfYang-PNFDemo
+    ${headers}=    Create Dictionary  Authorization=${auth}
+    Wait Until Keyword Succeeds       10sec    100ms    Is CM Handle READY    ${uri}    ${headers}    ietfYang-PNFDemo
+
 Operational state goes to UNSYNCHRONIZED when data sync (flag) is enabled
     ${uri}=              Set Variable       ${ncmpBasePath}/v1/ch/ietfYang-PNFDemo/data-sync
     ${params}=           Create Dictionary  dataSyncEnabled=true
     ${headers}=          Create Dictionary  Authorization=${auth}
     ${response}=         PUT On Session     CPS_URL   ${uri}   params=${params}   headers=${headers}
     Should Be Equal As Strings              ${response.status_code}   200
-    ${verifyUri}=              Set Variable       ${ncmpBasePath}/v1/ch/ietfYang-PNFDemo/state
-    ${verifyHeaders}=          Create Dictionary  Authorization=${auth}
-    ${verifyResponse}=         GET On Session     CPS_URL   ${verifyUri}   headers=${verifyHeaders}
-    Should Be Equal As Strings                    ${verifyResponse.json()['state']['dataSyncState']['operational']['syncState']}   UNSYNCHRONIZED
-    Sleep    5
+    ${verifyUri}=        Set Variable       ${ncmpBasePath}/v1/ch/ietfYang-PNFDemo/state
+    ${verifyHeaders}=    Create Dictionary  Authorization=${auth}
+    ${verifyResponse}=   GET On Session     CPS_URL   ${verifyUri}   headers=${verifyHeaders}
+    Should Be Equal As Strings    ${verifyResponse.json()['state']['dataSyncState']['operational']['syncState']}   UNSYNCHRONIZED
 
 Operational state goes to SYNCHRONIZED after sometime when data sync (flag) is enabled
-    ${uri}=              Set Variable       ${ncmpBasePath}/v1/ch/ietfYang-PNFDemo/state
-    ${headers}=          Create Dictionary  Authorization=${auth}
-    ${response}=         GET On Session     CPS_URL   ${uri}   headers=${headers}
-    Should Be Equal As Strings              ${response.json()['state']['dataSyncState']['operational']['syncState']}   SYNCHRONIZED
\ No newline at end of file
+    ${uri}=        Set Variable       ${ncmpBasePath}/v1/ch/ietfYang-PNFDemo/state
+    ${headers}=    Create Dictionary  Authorization=${auth}
+    Wait Until Keyword Succeeds    10sec    100ms    Is CM Handle State SYNCHRONIZED    ${uri}    ${headers}
+
+*** Keywords ***
+Is CM Handle READY
+    [Arguments]    ${uri}    ${headers}    ${cmHandle}
+    ${response}=    GET On Session    CPS_URL    ${uri}    headers=${headers}
+    Should Be Equal As Strings    ${response.status_code}    200
+    FOR  ${item}  IN  ${response.json()}
+            IF  "${item['cmHandle']}" == "${cmHandle}"
+                Should Be Equal As Strings    ${item['state']['cmHandleState']}    READY
+            END
+    END
+
+Is CM Handle State SYNCHRONIZED
+    [Arguments]    ${uri}    ${headers}
+    ${response}=   GET On Session     CPS_URL    ${uri}    headers=${headers}
+    Should Be Equal As Strings        ${response.json()['state']['dataSyncState']['operational']['syncState']}    SYNCHRONIZED
index 704d02c..3e8551f 100644 (file)
@@ -85,5 +85,4 @@ Get modules for registered data node
             IF   "${item['moduleName']}" == "stores"
                 Should Be Equal As Strings              "${item['revision']}"   "2020-09-15"
             END
-    END
-    Sleep    10
\ No newline at end of file
+    END
\ No newline at end of file
index 7065900..e4deeff 100644 (file)
@@ -1,6 +1,6 @@
 /*
  *  ============LICENSE_START=======================================================
- *  Copyright (C) 2023 Nordix Foundation
+ *  Copyright (C) 2023-2024 Nordix Foundation
  *  ================================================================================
  *  Licensed under the Apache License, Version 2.0 (the "License");
  *  you may not use this file except in compliance with the License.
@@ -44,14 +44,13 @@ Register data node
     ${headers}=     Create Dictionary    Content-Type=application/json    Authorization=${auth}
     ${response}=    POST On Session      CPS_URL   ${uri}    headers=${headers}    data=${jsonCreateCmHandles}
     Should Be Equal As Strings           ${response.status_code}    200
-    Sleep           5
 
 Verify notification
     ${group_id}=         Create Consumer     auto_offset_reset=earliest
-    Subscribe Topic      topics=cm-events     group_id=${group_id}
-    ${result}=      Poll                    group_id=${group_id}  only_value=False  poll_attempts=5
-    ${headers}                      Set Variable                ${result[0].headers()}
-    ${payload}                      Set Variable                ${result[0].value()}
+    Subscribe Topic      topics=cm-events    group_id=${group_id}
+    ${result}=           Poll                group_id=${group_id}  only_value=False  poll_attempts=5
+    ${headers}           Set Variable        ${result[0].headers()}
+    ${payload}           Set Variable        ${result[0].value()}
     FOR   ${header_key_value_pair}   IN  @{headers}
         Compare Header Values       ${header_key_value_pair[0]}   ${header_key_value_pair[1]}     "ce_specversion"      "1.0"
         Compare Header Values       ${header_key_value_pair[0]}   ${header_key_value_pair[1]}     "ce_source"           "NCMP"