Fix for cm handles stuck in LOCKED during registration 11/140011/4
authorToineSiebelink <toine.siebelink@est.tech>
Thu, 23 Jan 2025 12:34:40 +0000 (12:34 +0000)
committerToineSiebelink <toine.siebelink@est.tech>
Thu, 23 Jan 2025 15:34:05 +0000 (15:34 +0000)
- Additional Error logging when cm handles fail module sync
- Swallow already defined exception upon schema and/or anchor creation
- Updated integration test to try to reproduce the problem (but couldn't)
- Ignored integration tests that depend/affected by race conditions
  (they are useful for troubleshooting but not for pipeline checks)
- Removed last remnants of springboot retry annotation option (incl dependencies)

Issue-ID: CPS-2576
Change-Id: I910e802268332f955134c043bd1b46a7ec57233b
Signed-off-by: ToineSiebelink <toine.siebelink@est.tech>
cps-application/src/main/java/org/onap/cps/Application.java
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncService.java
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncTasks.java
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncServiceSpec.groovy
cps-rest/pom.xml
cps-ri/pom.xml
cps-ri/src/main/java/org/onap/cps/ri/CpsModulePersistenceServiceImpl.java
integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/ModuleSyncWatchdogIntegrationSpec.groovy

index 053139f..62103bf 100644 (file)
@@ -22,9 +22,7 @@ package org.onap.cps;
 \r
 import org.springframework.boot.SpringApplication;\r
 import org.springframework.boot.autoconfigure.SpringBootApplication;\r
-import org.springframework.retry.annotation.EnableRetry;\r
 \r
-@EnableRetry\r
 @SpringBootApplication\r
 public class Application {\r
     public static void main(final String[] args) {\r
index 041daa0..6c1dc73 100644 (file)
@@ -35,6 +35,8 @@ import lombok.extern.slf4j.Slf4j;
 import org.onap.cps.api.CpsAnchorService;
 import org.onap.cps.api.CpsDataService;
 import org.onap.cps.api.CpsModuleService;
+import org.onap.cps.api.exceptions.AlreadyDefinedException;
+import org.onap.cps.api.exceptions.DuplicatedYangResourceException;
 import org.onap.cps.api.model.ModuleReference;
 import org.onap.cps.ncmp.impl.inventory.models.YangModelCmHandle;
 import org.onap.cps.utils.ContentType;
@@ -68,7 +70,12 @@ public class ModuleSyncService {
         final String moduleSetTag = yangModelCmHandle.getModuleSetTag();
         final String schemaSetName = getSchemaSetName(cmHandleId, moduleSetTag);
         syncAndCreateSchemaSet(yangModelCmHandle, schemaSetName);
-        cpsAnchorService.createAnchor(NFP_OPERATIONAL_DATASTORE_DATASPACE_NAME, schemaSetName, cmHandleId);
+        try {
+            cpsAnchorService.createAnchor(NFP_OPERATIONAL_DATASTORE_DATASPACE_NAME, schemaSetName, cmHandleId);
+        } catch (final AlreadyDefinedException alreadyDefinedException) {
+            log.warn("Ignoring (anchor) already exist exception for {}. Exception details: ",
+                yangModelCmHandle.getId(), alreadyDefinedException);
+        }
     }
 
     /**
@@ -99,14 +106,27 @@ public class ModuleSyncService {
     private void syncAndCreateSchemaSet(final YangModelCmHandle yangModelCmHandle, final String schemaSetName) {
         if (isNewSchemaSet(schemaSetName)) {
             final ModuleDelta moduleDelta = getModuleDelta(yangModelCmHandle);
-            log.info("Creating Schema Set {} for CM Handle {}", schemaSetName, yangModelCmHandle.getId());
-            cpsModuleService.createSchemaSetFromModules(
+            try {
+                log.info("Creating Schema Set {} for CM Handle {}", schemaSetName, yangModelCmHandle.getId());
+                cpsModuleService.createSchemaSetFromModules(
                     NFP_OPERATIONAL_DATASTORE_DATASPACE_NAME,
                     schemaSetName,
                     moduleDelta.newModuleNameToContentMap,
                     moduleDelta.allModuleReferences
-            );
-            log.info("Successfully created Schema Set {} for CM Handle {}", schemaSetName, yangModelCmHandle.getId());
+                );
+                log.info("Successfully created Schema Set {} for CM Handle {}", schemaSetName,
+                    yangModelCmHandle.getId());
+            } catch (final AlreadyDefinedException alreadyDefinedException) {
+                log.warn("Ignoring already exist (schema set) exception for {}. Exception details: ",
+                     yangModelCmHandle.getId(), alreadyDefinedException);
+            } catch (final DuplicatedYangResourceException duplicatedYangResourceException) {
+                log.warn("Duplicate Yang Resource {} creation for {}. "
+                        + "CM Handle will be LOCKED (for retry). Exception details: ",
+                    duplicatedYangResourceException.getName(),
+                    yangModelCmHandle.getId(),
+                    duplicatedYangResourceException);
+                throw duplicatedYangResourceException;
+            }
         }
     }
 
index 40404b7..b634967 100644 (file)
@@ -114,7 +114,7 @@ public class ModuleSyncTasks {
             compositeState.setLockReason(null);
             return CmHandleState.READY;
         } catch (final Exception e) {
-            log.warn("Processing of {} failed,reason : {}.", yangModelCmHandle.getId(), e.getMessage());
+            log.warn("Processing of {} failed,reason : ", yangModelCmHandle.getId(), e);
             final LockReasonCategory lockReasonCategory = inUpgrade
                     ? LockReasonCategory.MODULE_UPGRADE_FAILED
                     : LockReasonCategory.MODULE_SYNC_FAILED;
index 7881375..868609e 100644 (file)
@@ -90,21 +90,45 @@ class ModuleSyncServiceSpec extends Specification {
             'without' | ''
     }
 
-    def 'Attempt Sync models for a cm handle with existing schema set (#originalException).'() {
+    def 'Sync models for a cm handle with already defined exception upon schema set creation.'() {
         given: 'a cm handle to be synced'
             def yangModelCmHandle = createAdvisedCmHandle('existing tag')
         and: 'dmi returns no new yang resources'
             mockDmiModelOperations.getNewYangResourcesFromDmi(*_) >> [:]
         and: 'already defined exception occurs when creating schema (existing)'
+            mockCpsModuleService.createSchemaSetFromModules(*_) >> { throw AlreadyDefinedException.forSchemaSet('', '', null)  }
+        when: 'module sync is triggered'
+            objectUnderTest.syncAndCreateSchemaSetAndAnchor(yangModelCmHandle)
+        then: 'the exception is ignored'
+            noExceptionThrown()
+    }
+
+    def 'Sync models for a cm handle with already defined exception upon anchor set creation.'() {
+        given: 'a cm handle to be synced'
+            def yangModelCmHandle = createAdvisedCmHandle('existing tag')
+        and: 'dmi returns no new yang resources'
+            mockDmiModelOperations.getNewYangResourcesFromDmi(*_) >> [:]
+        and: 'already defined exception occurs when creating schema (existing)'
+            mockCpsAnchorService.createAnchor(*_) >> { throw AlreadyDefinedException.forAnchor('', '', null)  }
+        when: 'module sync is triggered'
+            objectUnderTest.syncAndCreateSchemaSetAndAnchor(yangModelCmHandle)
+        then: 'the exception is ignored'
+            noExceptionThrown()
+    }
+
+    def 'Attempt Sync models for a cm handle with duplicate yang resources exception).'() {
+        given: 'a cm handle to be synced'
+            def yangModelCmHandle = createAdvisedCmHandle('existing tag')
+        and: 'dmi returns no new yang resources'
+            mockDmiModelOperations.getNewYangResourcesFromDmi(*_) >> [:]
+        and: 'duplicate yang resource exception occurs when creating schema'
+            def originalException = new DuplicatedYangResourceException('', '', null)
             mockCpsModuleService.createSchemaSetFromModules(*_) >> { throw originalException  }
         when: 'module sync is triggered'
             objectUnderTest.syncAndCreateSchemaSetAndAnchor(yangModelCmHandle)
         then: 'same exception is thrown up'
             def thrownException = thrown(Exception)
             assert thrownException == originalException
-        where: 'following exceptions occur'
-            originalException << [AlreadyDefinedException.forSchemaSet('', '', null),
-                                  new DuplicatedYangResourceException('', '', null) ]
     }
 
     def 'Model upgrade without using Module Set Tags (legacy) where the modules are in database.'() {
index b04daf0..f72cef9 100644 (file)
             <groupId>org.springframework.boot</groupId>
             <artifactId>spring-boot-starter-jetty</artifactId>
         </dependency>
-        <dependency>
-            <groupId>org.springframework.retry</groupId>
-            <artifactId>spring-retry</artifactId>
-        </dependency>
         <dependency>
             <groupId>org.springframework</groupId>
             <artifactId>spring-aspects</artifactId>
index 2492cb8..f065421 100644 (file)
             <groupId>org.springframework.boot</groupId>\r
             <artifactId>spring-boot-starter-validation</artifactId>\r
         </dependency>\r
-        <dependency>\r
-            <groupId>org.springframework.retry</groupId>\r
-            <artifactId>spring-retry</artifactId>\r
-        </dependency>\r
         <dependency>\r
             <groupId>org.springframework</groupId>\r
             <artifactId>spring-aspects</artifactId>\r
index 4f7492f..aaf6165 100755 (executable)
@@ -70,8 +70,6 @@ import org.opendaylight.yangtools.yang.model.repo.api.YangTextSchemaSource;
 import org.opendaylight.yangtools.yang.parser.api.YangSyntaxErrorException;
 import org.opendaylight.yangtools.yang.parser.rfc7950.repo.YangModelDependencyInfo;
 import org.springframework.dao.DataIntegrityViolationException;
-import org.springframework.retry.RetryContext;
-import org.springframework.retry.support.RetrySynchronizationManager;
 import org.springframework.stereotype.Component;
 
 @Slf4j
@@ -269,18 +267,10 @@ public class CpsModulePersistenceServiceImpl implements CpsModulePersistenceServ
                 yangResourceRepository.saveAll(newYangResourceEntities);
             } catch (final DataIntegrityViolationException dataIntegrityViolationException) {
                 // Throw a CPS duplicated Yang resource exception if the cause of the error is a yang checksum
-                // database constraint violation.
-                // If it is not, then throw the original exception
+                // database constraint violation. If it is not, then throw the original exception
                 final Optional<DuplicatedYangResourceException> convertedException =
                         convertToDuplicatedYangResourceException(
                                 dataIntegrityViolationException, newYangResourceEntities);
-                convertedException.ifPresent(
-                        e -> {
-                            final RetryContext retryContext = RetrySynchronizationManager.getContext();
-                            int retryCount = retryContext == null ? 0 : retryContext.getRetryCount();
-                            log.warn("Cannot persist duplicated yang resource. System will attempt this method "
-                                    + "up to 5 times. Current retry count : {}", ++retryCount, e);
-                        });
                 throw convertedException.isPresent() ? convertedException.get() : dataIntegrityViolationException;
             }
         }
index 9cb8c29..81fbc3c 100644 (file)
@@ -69,7 +69,7 @@ class ModuleSyncWatchdogIntegrationSpec extends CpsIntegrationSpecBase {
             deregisterSequenceOfCmHandles(DMI1_URL, PARALLEL_SYNC_SAMPLE_SIZE, 1)
     }
 
-    /** this test has intermittent failures, due to timeouts.
+    /** this test has intermittent failures, due to race conditions
      *  Ignored but left here as it might be valuable to further optimization investigations.
      **/
     @Ignore
@@ -79,7 +79,6 @@ class ModuleSyncWatchdogIntegrationSpec extends CpsIntegrationSpecBase {
             def cmHandlesPerTag = 250
             def totalCmHandles = numberOfTags * cmHandlesPerTag
             def offset = 1
-            def minimumBatches = totalCmHandles / 100
             registerSequenceOfCmHandlesWithManyModuleReferencesButDoNotWaitForReady(DMI1_URL, 'cps-2478-A', cmHandlesPerTag, offset)
         and: 'register anther 250 cm handles with module set tag cps-2478-B'
             offset += cmHandlesPerTag
@@ -89,23 +88,21 @@ class ModuleSyncWatchdogIntegrationSpec extends CpsIntegrationSpecBase {
         when: 'sync all advised cm handles'
             objectUnderTest.moduleSyncAdvisedCmHandles()
             Thread.sleep(100)
-        then: 'retry until both schema sets are stored in db (1 schema set for each module set tag)'
-            def dbSchemaSetStorageTimer = meterRegistry.get('cps.module.persistence.schemaset.store').timer()
-            new PollingConditions().within(10, () -> {
-                objectUnderTest.moduleSyncAdvisedCmHandles()
-                Thread.sleep(100)
-                assert dbSchemaSetStorageTimer.count() == 2
-            })
-        then: 'wait till at least 5 batches of state updates are done (often more because of retries of locked cm handles)'
-            def dbStateUpdateTimer = meterRegistry.get('cps.ncmp.cmhandle.state.update.batch').timer()
+        then: 'Keep processing until there are no more LOCKED or ADVISED cm handles'
             new PollingConditions().within(10, () -> {
-                assert dbStateUpdateTimer.count() >= minimumBatches
+                def advised = cmHandlesByState.get('advisedCmHandlesCount')
+                def locked = cmHandlesByState.get('lockedCmHandlesCount')
+                if ( locked > 0 | advised > 0 ) {
+                    println "CPS-2576 Need to retry ${locked} LOCKED / ${advised} ADVISED cm Handles"
+                    objectUnderTest.moduleSyncAdvisedCmHandles()
+                    Thread.sleep(100)
+                }
+                assert cmHandlesByState.get('lockedCmHandlesCount') + cmHandlesByState.get('advisedCmHandlesCount') == 0
             })
-        and: 'one call to DMI per module set tag to get module references (may be more due to parallel processing of batches)'
-            def dmiModuleRetrievalTimer = meterRegistry.get('cps.ncmp.inventory.module.references.from.dmi').timer()
-            assert dmiModuleRetrievalTimer.count() >= numberOfTags && dmiModuleRetrievalTimer.count() <= minimumBatches
-
         and: 'log the relevant instrumentation'
+            def dmiModuleRetrievalTimer = meterRegistry.get('cps.ncmp.inventory.module.references.from.dmi').timer()
+            def dbSchemaSetStorageTimer = meterRegistry.get('cps.module.persistence.schemaset.store').timer()
+            def dbStateUpdateTimer = meterRegistry.get('cps.ncmp.cmhandle.state.update.batch').timer()
             logInstrumentation(dmiModuleRetrievalTimer, 'get modules from DMI   ')
             logInstrumentation(dbSchemaSetStorageTimer, 'store schema sets      ')
             logInstrumentation(dbStateUpdateTimer,      'batch state updates    ')
@@ -134,6 +131,10 @@ class ModuleSyncWatchdogIntegrationSpec extends CpsIntegrationSpecBase {
             deregisterSequenceOfCmHandles(DMI1_URL, PARALLEL_SYNC_SAMPLE_SIZE, 1)
     }
 
+    /** this test has intermittent failures, due to race conditions
+     *  Ignored but left here as it might be valuable to further optimization investigations.
+     **/
+    @Ignore
     def 'Schema sets with overlapping modules processed at the same time (DB constraint violation).'() {
         given: 'register one batch (100) cm handles of tag A (with overlapping module names)'
             registerSequenceOfCmHandlesWithManyModuleReferencesButDoNotWaitForReady(DMI1_URL, 'tagA', 100, 1, ModuleNameStrategy.OVERLAPPING)