handle partial failure on batch state update 23/130923/1
authormpriyank <priyank.maheshwari@est.tech>
Tue, 13 Sep 2022 18:00:59 +0000 (19:00 +0100)
committermpriyank <priyank.maheshwari@est.tech>
Tue, 13 Sep 2022 20:42:14 +0000 (21:42 +0100)
- catching of failures on retry of individual nodes on batch update
- test scenarios for the same

Issue-ID: CPS-1232
Issue-ID: CPS-1126
Change-Id: I9dc13e7bbe44673f8ac14fbde08a85d6a5142487
Signed-off-by: mpriyank <priyank.maheshwari@est.tech>
cps-ri/src/main/java/org/onap/cps/spi/impl/CpsDataPersistenceServiceImpl.java
cps-ri/src/test/groovy/org/onap/cps/spi/impl/CpsDataPersistenceServiceSpec.groovy
cps-service/src/main/java/org/onap/cps/spi/exceptions/ConcurrencyException.java

index d62421c..c13422d 100644 (file)
@@ -234,8 +234,8 @@ public class CpsDataPersistenceServiceImpl implements CpsDataPersistenceService
             } catch (final PathParsingException e) {
                 throw new CpsPathException(e.getMessage());
             }
-            return fragmentRepository.getByDataspaceAndAnchorAndXpath(dataspaceEntity, anchorEntity,
-                    normalizedXpath);
+
+            return fragmentRepository.getByDataspaceAndAnchorAndXpath(dataspaceEntity, anchorEntity, normalizedXpath);
         }
     }
 
@@ -345,8 +345,7 @@ public class CpsDataPersistenceServiceImpl implements CpsDataPersistenceService
         } catch (final StaleStateException staleStateException) {
             throw new ConcurrencyException("Concurrent Transactions",
                     String.format("dataspace :'%s', Anchor : '%s' and xpath: '%s' is updated by another transaction.",
-                            dataspaceName, anchorName, dataNode.getXpath()),
-                    staleStateException);
+                            dataspaceName, anchorName, dataNode.getXpath()));
         }
     }
 
@@ -354,6 +353,7 @@ public class CpsDataPersistenceServiceImpl implements CpsDataPersistenceService
     public void updateDataNodesAndDescendants(final String dataspaceName,
                                               final String anchorName,
                                               final List<DataNode> dataNodes) {
+
         final Map<DataNode, FragmentEntity> dataNodeFragmentEntityMap = dataNodes.stream()
             .collect(Collectors.toMap(
                 dataNode -> dataNode, dataNode -> getFragmentByXpath(dataspaceName, anchorName, dataNode.getXpath())));
@@ -362,10 +362,27 @@ public class CpsDataPersistenceServiceImpl implements CpsDataPersistenceService
         try {
             fragmentRepository.saveAll(dataNodeFragmentEntityMap.values());
         } catch (final StaleStateException staleStateException) {
-            throw new ConcurrencyException("Concurrent Transactions",
-                String.format("A data node in dataspace :'%s' with Anchor : '%s' is updated by another transaction.",
-                    dataspaceName, anchorName),
-                staleStateException);
+            retryUpdateDataNodesIndividually(dataspaceName, anchorName, dataNodeFragmentEntityMap.values());
+        }
+    }
+
+    private void retryUpdateDataNodesIndividually(final String dataspaceName, final String anchorName,
+            final Collection<FragmentEntity> fragmentEntities) {
+        final Collection<String> failedXpaths = new HashSet<>();
+
+        fragmentEntities.forEach(dataNodeFragment -> {
+            try {
+                fragmentRepository.save(dataNodeFragment);
+            } catch (final StaleStateException e) {
+                failedXpaths.add(dataNodeFragment.getXpath());
+            }
+        });
+
+        if (!failedXpaths.isEmpty()) {
+            final String failedXpathsConcatenated = String.join(",", failedXpaths);
+            throw new ConcurrencyException("Concurrent Transactions", String.format(
+                    "DataNodes : %s in Dataspace :'%s' with Anchor : '%s'  are updated by another transaction.",
+                    failedXpathsConcatenated, dataspaceName, anchorName));
         }
     }
 
index 1bbf358..470b03a 100644 (file)
@@ -86,17 +86,25 @@ class CpsDataPersistenceServiceSpec extends Specification {
             assert concurrencyException.getDetails().contains('/some/xpath')
     }
 
-    def 'Handling of StaleStateException (caused by concurrent updates) during  update data nodes and descendants.'() {
-        given: 'the fragment repository returns a list of fragment entities'
-            mockFragmentRepository.getByDataspaceAndAnchorAndXpath(*_) >> new FragmentEntity()
-        and: 'a data node is concurrently updated by another transaction'
+    def 'Handling of StaleStateException (caused by concurrent updates) during update data nodes and descendants.'() {
+        given: 'the system contains and can update one datanode'
+            def dataNode1 = mockDataNodeAndFragmentEntity('/node1', 'OK')
+        and: 'the system contains two more datanodes that throw an exception while updating'
+            def dataNode2 = mockDataNodeAndFragmentEntity('/node2', 'EXCEPTION')
+            def dataNode3 = mockDataNodeAndFragmentEntity('/node3', 'EXCEPTION')
+        and: 'the batch update will therefore also fail'
             mockFragmentRepository.saveAll(*_) >> { throw new StaleStateException("concurrent updates") }
-        when: 'attempt to update data node with submitted data nodes'
-            objectUnderTest.updateDataNodesAndDescendants('some-dataspace', 'some-anchor', [])
+        when: 'attempt batch update data nodes'
+            objectUnderTest.updateDataNodesAndDescendants('some-dataspace', 'some-anchor', [dataNode1, dataNode2, dataNode3])
         then: 'concurrency exception is thrown'
-            def concurrencyException = thrown(ConcurrencyException)
-            assert concurrencyException.getDetails().contains('some-dataspace')
-            assert concurrencyException.getDetails().contains('some-anchor')
+            def thrown = thrown(ConcurrencyException)
+            assert thrown.message == 'Concurrent Transactions'
+        and: 'it does not contain the successfull datanode'
+            assert !thrown.details.contains('/node1')
+        and: 'it contains the failed datanodes'
+            assert thrown.details.contains('/node2')
+            assert thrown.details.contains('/node3')
+
     }
 
     def 'Retrieving a data node with a property JSON value of #scenario'() {
@@ -193,4 +201,14 @@ class CpsDataPersistenceServiceSpec extends Specification {
                 assert fragmentEntities.size() == 2
             }})
     }
+
+    def mockDataNodeAndFragmentEntity(xpath, scenario) {
+        def dataNode = new DataNodeBuilder().withXpath(xpath).build()
+        def fragmentEntity = new FragmentEntity(xpath: xpath, childFragments: [])
+        mockFragmentRepository.getByDataspaceAndAnchorAndXpath(_, _, xpath) >> fragmentEntity
+        if ('EXCEPTION' == scenario) {
+            mockFragmentRepository.save(fragmentEntity) >> { throw new StaleStateException("concurrent updates") }
+        }
+        return dataNode
+    }
 }
\ No newline at end of file
index 3a8a94b..b5eae93 100644 (file)
@@ -20,8 +20,8 @@ package org.onap.cps.spi.exceptions;
 
 public class ConcurrencyException extends CpsException {
 
-    public ConcurrencyException(final String message, final String details, final Throwable cause) {
-        super(message, details, cause);
+    public ConcurrencyException(final String message, final String details) {
+        super(message, details);
     }
 
 }