Remove retry mechanism for duplicate module(resource) errors 93/139993/4
authorToineSiebelink <toine.siebelink@est.tech>
Tue, 21 Jan 2025 13:35:03 +0000 (13:35 +0000)
committerToineSiebelink <toine.siebelink@est.tech>
Tue, 21 Jan 2025 15:45:22 +0000 (15:45 +0000)
- CM Handle with problems will go into 'LOCKED' state and retried using existing business logic

Issue-ID: CPS-2555
Change-Id: I563f6bc536d8fd8c6675891c62318ff2fb43fea6
Signed-off-by: ToineSiebelink <toine.siebelink@est.tech>
cps-application/src/test/groovy/org/onap/cps/config/MicroMeterConfigSpec.groovy
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncService.java
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncServiceSpec.groovy
cps-ri/src/main/java/org/onap/cps/ri/CpsModulePersistenceServiceImpl.java
cps-ri/src/test/groovy/org/onap/cps/ri/CpsModulePersistenceServiceConcurrencySpec.groovy [deleted file]
integration-test/src/test/groovy/org/onap/cps/integration/base/CpsIntegrationSpecBase.groovy
integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/ModuleSyncWatchdogIntegrationSpec.groovy

index da3afc6..9cef8de 100644 (file)
@@ -44,9 +44,8 @@ class MicroMeterConfigSpec extends Specification {
              objectUnderTest.lockedCmHandles(simpleMeterRegistry)
              objectUnderTest.deletingCmHandles(simpleMeterRegistry)
         then: 'each state has the correct value when queried'
-            def states = ["ADVISED", "READY", "LOCKED", "DELETING"]
-            states.each { state ->
-                def gaugeValue = simpleMeterRegistry.get("cmHandlesByState").tag("state",state).gauge().value()
+            ['ADVISED', 'READY', 'LOCKED', 'DELETING'].each { state ->
+                def gaugeValue = simpleMeterRegistry.get('cmHandlesByState').tag('state',state).gauge().value()
                 assert gaugeValue == 1
             }
     }
index 9534cf3..041daa0 100644 (file)
@@ -35,8 +35,6 @@ import lombok.extern.slf4j.Slf4j;
 import org.onap.cps.api.CpsAnchorService;
 import org.onap.cps.api.CpsDataService;
 import org.onap.cps.api.CpsModuleService;
-import org.onap.cps.api.exceptions.AlreadyDefinedException;
-import org.onap.cps.api.exceptions.DuplicatedYangResourceException;
 import org.onap.cps.api.model.ModuleReference;
 import org.onap.cps.ncmp.impl.inventory.models.YangModelCmHandle;
 import org.onap.cps.utils.ContentType;
@@ -101,20 +99,14 @@ public class ModuleSyncService {
     private void syncAndCreateSchemaSet(final YangModelCmHandle yangModelCmHandle, final String schemaSetName) {
         if (isNewSchemaSet(schemaSetName)) {
             final ModuleDelta moduleDelta = getModuleDelta(yangModelCmHandle);
-            try {
-                log.info("Creating Schema Set {} for CM Handle {}", schemaSetName, yangModelCmHandle.getId());
-                cpsModuleService.createSchemaSetFromModules(
-                        NFP_OPERATIONAL_DATASTORE_DATASPACE_NAME,
-                        schemaSetName,
-                        moduleDelta.newModuleNameToContentMap,
-                        moduleDelta.allModuleReferences
-                );
-                log.info("Successfully created Schema Set {} for CM Handle {}",
-                    schemaSetName, yangModelCmHandle.getId());
-            } catch (final AlreadyDefinedException | DuplicatedYangResourceException exception) {
-                log.warn("Schema Set {} already exists, no need to (re)create it for {}",
-                    schemaSetName, yangModelCmHandle.getId());
-            }
+            log.info("Creating Schema Set {} for CM Handle {}", schemaSetName, yangModelCmHandle.getId());
+            cpsModuleService.createSchemaSetFromModules(
+                    NFP_OPERATIONAL_DATASTORE_DATASPACE_NAME,
+                    schemaSetName,
+                    moduleDelta.newModuleNameToContentMap,
+                    moduleDelta.allModuleReferences
+            );
+            log.info("Successfully created Schema Set {} for CM Handle {}", schemaSetName, yangModelCmHandle.getId());
         }
     }
 
index f8adfe5..7881375 100644 (file)
@@ -90,20 +90,21 @@ class ModuleSyncServiceSpec extends Specification {
             'without' | ''
     }
 
-    def 'Attempt Sync models for a cm handle with existing schema set (#exception).'() {
+    def 'Attempt Sync models for a cm handle with existing schema set (#originalException).'() {
         given: 'a cm handle to be synced'
             def yangModelCmHandle = createAdvisedCmHandle('existing tag')
         and: 'dmi returns no new yang resources'
             mockDmiModelOperations.getNewYangResourcesFromDmi(*_) >> [:]
         and: 'already defined exception occurs when creating schema (existing)'
-            mockCpsModuleService.createSchemaSetFromModules(*_) >> { throw exception  }
+            mockCpsModuleService.createSchemaSetFromModules(*_) >> { throw originalException  }
         when: 'module sync is triggered'
             objectUnderTest.syncAndCreateSchemaSetAndAnchor(yangModelCmHandle)
-        then: 'no exception is thrown up'
-            noExceptionThrown()
+        then: 'same exception is thrown up'
+            def thrownException = thrown(Exception)
+            assert thrownException == originalException
         where: 'following exceptions occur'
-            exception << [ AlreadyDefinedException.forSchemaSet('', '', null),
-                           new DuplicatedYangResourceException('', '', null) ]
+            originalException << [AlreadyDefinedException.forSchemaSet('', '', null),
+                                  new DuplicatedYangResourceException('', '', null) ]
     }
 
     def 'Model upgrade without using Module Set Tags (legacy) where the modules are in database.'() {
index cf9fb02..64c9539 100755 (executable)
@@ -71,8 +71,6 @@ import org.opendaylight.yangtools.yang.parser.api.YangSyntaxErrorException;
 import org.opendaylight.yangtools.yang.parser.rfc7950.repo.YangModelDependencyInfo;
 import org.springframework.dao.DataIntegrityViolationException;
 import org.springframework.retry.RetryContext;
-import org.springframework.retry.annotation.Backoff;
-import org.springframework.retry.annotation.Retryable;
 import org.springframework.retry.support.RetrySynchronizationManager;
 import org.springframework.stereotype.Component;
 
@@ -154,10 +152,6 @@ public class CpsModulePersistenceServiceImpl implements CpsModulePersistenceServ
 
     @Override
     @Transactional
-    // A retry is made to store the schema set if it fails because of duplicated yang resource exception that
-    // can occur in case of specific concurrent requests.
-    @Retryable(retryFor = DuplicatedYangResourceException.class, maxAttempts = 5, backoff =
-        @Backoff(random = true, delay = 200, maxDelay = 2000, multiplier = 2))
     public void storeSchemaSet(final String dataspaceName, final String schemaSetName,
         final Map<String, String> moduleReferenceNameToContentMap) {
         final DataspaceEntity dataspaceEntity = dataspaceRepository.getByName(dataspaceName);
@@ -189,12 +183,8 @@ public class CpsModulePersistenceServiceImpl implements CpsModulePersistenceServ
 
     @Override
     @Transactional
-    // A retry is made to store the schema set if it fails because of duplicated yang resource exception that
-    // can occur in case of specific concurrent requests.
-    @Retryable(retryFor = DuplicatedYangResourceException.class, maxAttempts = 5, backoff =
-        @Backoff(random = true, delay = 200, maxDelay = 2000, multiplier = 2))
     @Timed(value = "cps.module.persistence.schemaset.store",
-        description = "Time taken to store a schemaset (list of module references")
+        description = "Time taken to store a schemaset (list of module references)")
     public void storeSchemaSetFromModules(final String dataspaceName, final String schemaSetName,
                                           final Map<String, String> newModuleNameToContentMap,
                                           final Collection<ModuleReference> allModuleReferences) {
diff --git a/cps-ri/src/test/groovy/org/onap/cps/ri/CpsModulePersistenceServiceConcurrencySpec.groovy b/cps-ri/src/test/groovy/org/onap/cps/ri/CpsModulePersistenceServiceConcurrencySpec.groovy
deleted file mode 100644 (file)
index 28a615b..0000000
+++ /dev/null
@@ -1,145 +0,0 @@
-/*
- *  ============LICENSE_START=======================================================
- *  Copyright (C) 2022 Bell Canada.
- *  Modifications Copyright (C) 2021-2023 Nordix Foundation.
- *  ================================================================================
- *  Licensed under the Apache License, Version 2.0 (the 'License');
- *  you may not use this file except in compliance with the License.
- *  You may obtain a copy of the License at
- *
- *        http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an 'AS IS' BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- *  SPDX-License-Identifier: Apache-2.0
- *  ============LICENSE_END=========================================================
- */
-package org.onap.cps.ri
-
-import org.hibernate.exception.ConstraintViolationException
-import org.onap.cps.ri.models.DataspaceEntity
-import org.onap.cps.ri.models.SchemaSetEntity
-import org.onap.cps.ri.repository.DataspaceRepository
-import org.onap.cps.ri.repository.ModuleReferenceRepository
-import org.onap.cps.ri.repository.SchemaSetRepository
-import org.onap.cps.ri.repository.YangResourceRepository
-import org.onap.cps.spi.CpsAdminPersistenceService
-import org.onap.cps.spi.CpsModulePersistenceService
-import org.onap.cps.api.exceptions.DuplicatedYangResourceException
-import org.onap.cps.api.model.ModuleReference
-import org.spockframework.spring.SpringBean
-import org.springframework.beans.factory.annotation.Autowired
-import org.springframework.boot.test.context.SpringBootTest
-import org.springframework.dao.DataIntegrityViolationException
-import org.springframework.retry.annotation.EnableRetry
-import spock.lang.Specification
-
-import java.sql.SQLException
-
-@SpringBootTest(classes=[CpsModulePersistenceServiceImpl])
-@EnableRetry
-class CpsModulePersistenceServiceConcurrencySpec extends Specification {
-
-    @Autowired
-    CpsModulePersistenceService objectUnderTest
-
-    @SpringBean
-    DataspaceRepository dataspaceRepository = Mock()
-
-    @SpringBean
-    YangResourceRepository yangResourceRepository = Mock()
-
-    @SpringBean
-    SchemaSetRepository schemaSetRepository = Mock()
-
-    @SpringBean
-    CpsAdminPersistenceService cpsAdminPersistenceService = Mock()
-
-    @SpringBean
-    ModuleReferenceRepository moduleReferenceRepository = Mock()
-
-    def NEW_RESOURCE_NAME = 'some new resource'
-    def NEW_RESOURCE_CONTENT = 'module stores {\n' +
-            '    yang-version 1.1;\n' +
-            '    namespace "org:onap:ccsdk:sample";\n' +
-            '}'
-
-    def newYangResourcesNameToContentMap = [(NEW_RESOURCE_NAME):NEW_RESOURCE_CONTENT]
-
-    def yangResourceChecksum = 'b13faef573ed1374139d02c40d8ce09c80ea1dc70e63e464c1ed61568d48d539'
-
-    def yangResourceChecksumDbConstraint = 'yang_resource_checksum_key'
-
-    def sqlExceptionMessage = String.format('(checksum)=(%s)', yangResourceChecksum)
-
-    def checksumIntegrityException = new DataIntegrityViolationException("checksum integrity exception",
-                 new ConstraintViolationException('', new SQLException(sqlExceptionMessage), yangResourceChecksumDbConstraint))
-
-    def 'Store new schema set, maximum retries.'() {
-        given: 'no pre-existing schemaset in database'
-            dataspaceRepository.getByName(_) >> new DataspaceEntity()
-            yangResourceRepository.findAllByChecksumIn(_) >> Collections.emptyList()
-        when: 'a new schemaset is stored'
-            objectUnderTest.storeSchemaSet('some dataspace', 'some new schema set', newYangResourcesNameToContentMap)
-        then: 'a duplicated yang resource exception is thrown '
-            thrown(DuplicatedYangResourceException)
-        and: 'the system will attempt to save the data 5 times (because checksum integrity exception is thrown each time)'
-            5 * yangResourceRepository.saveAll(_) >> { throw checksumIntegrityException }
-    }
-
-    def 'Store new schema set, succeed on third attempt.'() {
-        given: 'no pre-existing schemaset in database'
-            dataspaceRepository.getByName(_) >> new DataspaceEntity()
-            yangResourceRepository.findAllByChecksumIn(_) >> Collections.emptyList()
-        when: 'a new schemaset is stored'
-            objectUnderTest.storeSchemaSet('some dataspace', 'some new schema set', newYangResourcesNameToContentMap)
-        then: 'no exception is thrown '
-            noExceptionThrown()
-        and: 'the system will attempt to save the data 2 times with checksum integrity exception but then succeed'
-            2 * yangResourceRepository.saveAll(_) >> { throw checksumIntegrityException }
-            1 * yangResourceRepository.saveAll(_) >> []
-    }
-
-    def 'Store schema set using modules, maximum retries.'() {
-        given: 'map of new modules, a list of existing modules, module reference'
-            def mapOfNewModules = [newModule1: 'module newmodule { yang-version 1.1; revision "2021-10-12" { } }']
-            def moduleReferenceForExistingModule = new ModuleReference("test","2021-10-12")
-            def listOfExistingModulesModuleReference = [moduleReferenceForExistingModule]
-        and: 'no pre-existing schemaset in database'
-            dataspaceRepository.getByName(_) >> new DataspaceEntity()
-            yangResourceRepository.findAllByChecksumIn(_) >> Collections.emptyList()
-        when: 'a new schemaset is stored from a module'
-            objectUnderTest.storeSchemaSetFromModules('some dataspace', 'some new schema set' , mapOfNewModules, listOfExistingModulesModuleReference)
-        then: 'a duplicated yang resource exception is thrown '
-            thrown(DuplicatedYangResourceException)
-        and: 'the system will attempt to save the data 5 times (because checksum integrity exception is thrown each time)'
-            5 * yangResourceRepository.saveAll(_) >> { throw checksumIntegrityException }
-    }
-
-    def 'Store schema set using modules, succeed on third attempt.'() {
-        given: 'map of new modules, a list of existing modules, module reference'
-            def mapOfNewModules = [newModule1: 'module newmodule { yang-version 1.1; revision "2021-10-12" { } }']
-            def moduleReferenceForExistingModule = new ModuleReference("test","2021-10-12")
-            def listOfExistingModulesModuleReference = [moduleReferenceForExistingModule]
-        and: 'no pre-existing schemaset in database'
-            def dataspaceEntity = new DataspaceEntity()
-            dataspaceRepository.getByName(_) >> new DataspaceEntity()
-            yangResourceRepository.findAllByChecksumIn(_) >> Collections.emptyList()
-            yangResourceRepository.getResourceIdsByModuleReferences(_) >> []
-        and: 'can retrieve schemaset details after storing it'
-            def schemaSetEntity = new SchemaSetEntity()
-            schemaSetRepository.getByDataspaceAndName(dataspaceEntity, 'new schema set') >> schemaSetEntity
-        when: 'a new schemaset is stored from a module'
-            objectUnderTest.storeSchemaSetFromModules('some dataspace', 'new schema set' , mapOfNewModules, listOfExistingModulesModuleReference)
-        then: 'no exception is thrown '
-            noExceptionThrown()
-        and: 'the system will attempt to save the data 2 times with checksum integrity exception but then succeed'
-            2 * yangResourceRepository.saveAll(_) >> { throw checksumIntegrityException }
-            1 * yangResourceRepository.saveAll(_) >> []
-    }
-
-}
index f3cca80..2402c1b 100644 (file)
@@ -158,6 +158,8 @@ abstract class CpsIntegrationSpecBase extends Specification {
     static initialized = false
     def now = OffsetDateTime.now()
 
+    enum ModuleNameStrategy { UNIQUE, OVERLAPPING }
+
     def setup() {
         if (!initialized) {
             cpsDataspaceService.createDataspace(GENERAL_TEST_DATASPACE)
@@ -265,9 +267,14 @@ abstract class CpsIntegrationSpecBase extends Specification {
     }
 
     def registerSequenceOfCmHandlesWithManyModuleReferencesButDoNotWaitForReady(dmiPlugin, moduleSetTag, numberOfCmHandles, offset) {
+        registerSequenceOfCmHandlesWithManyModuleReferencesButDoNotWaitForReady(dmiPlugin, moduleSetTag, numberOfCmHandles, offset, ModuleNameStrategy.UNIQUE)
+    }
+
+    def registerSequenceOfCmHandlesWithManyModuleReferencesButDoNotWaitForReady(dmiPlugin, moduleSetTag, numberOfCmHandles, offset, ModuleNameStrategy moduleNameStrategy ) {
         def cmHandles = []
         def id = offset
-        def moduleReferences = (1..200).collect {  "${moduleSetTag}Module${it}" }
+        def modulePrefix = moduleNameStrategy.OVERLAPPING.equals(moduleNameStrategy) ? 'same' : moduleSetTag
+        def moduleReferences = (1..200).collect {  "${modulePrefix}Module${it}" }
         (1..numberOfCmHandles).each {
             def ncmpServiceCmHandle = new NcmpServiceCmHandle(cmHandleId: "ch-${id}", moduleSetTag: moduleSetTag, alternateId: NO_ALTERNATE_ID)
             cmHandles.add(ncmpServiceCmHandle)
index 43db9b2..d1353b8 100644 (file)
 
 package org.onap.cps.integration.functional.ncmp
 
+import com.hazelcast.map.IMap
 import io.micrometer.core.instrument.MeterRegistry
-import spock.lang.Ignore
-
-import java.util.concurrent.Executors
-import java.util.concurrent.TimeUnit
 import org.onap.cps.integration.base.CpsIntegrationSpecBase
 import org.onap.cps.ncmp.impl.inventory.sync.ModuleSyncWatchdog
 import org.springframework.beans.factory.annotation.Autowired
 import org.springframework.util.StopWatch
+import spock.lang.Ignore
 import spock.util.concurrent.PollingConditions
 
+import java.util.concurrent.Executors
+import java.util.concurrent.TimeUnit
+
 class ModuleSyncWatchdogIntegrationSpec extends CpsIntegrationSpecBase {
 
     ModuleSyncWatchdog objectUnderTest
@@ -38,11 +39,15 @@ class ModuleSyncWatchdogIntegrationSpec extends CpsIntegrationSpecBase {
     @Autowired
     MeterRegistry meterRegistry
 
+    @Autowired
+    IMap<String, Integer> cmHandlesByState
+
     def executorService = Executors.newFixedThreadPool(2)
     def PARALLEL_SYNC_SAMPLE_SIZE = 100
 
     def setup() {
         objectUnderTest = moduleSyncWatchdog
+        clearCmHandleStateGauge()
     }
 
     def cleanup() {
@@ -64,11 +69,10 @@ class ModuleSyncWatchdogIntegrationSpec extends CpsIntegrationSpecBase {
             deregisterSequenceOfCmHandles(DMI1_URL, PARALLEL_SYNC_SAMPLE_SIZE, 1)
     }
 
-    @Ignore
     /** this test has intermittent failures, due to timeouts.
      *  Ignored but left here as it might be valuable to further optimization investigations.
      **/
-
+    @Ignore
     def 'CPS-2478 Highlight (and improve) module sync inefficiencies.'() {
         given: 'register 250 cm handles with module set tag cps-2478-A'
             def numberOfTags = 2
@@ -131,6 +135,26 @@ class ModuleSyncWatchdogIntegrationSpec extends CpsIntegrationSpecBase {
             deregisterSequenceOfCmHandles(DMI1_URL, PARALLEL_SYNC_SAMPLE_SIZE, 1)
     }
 
+    def 'Schema sets with overlapping modules processed at the same time (DB constraint violation).'() {
+        given: 'register one batch (100) cm handles of tag A (with overlapping module names)'
+            registerSequenceOfCmHandlesWithManyModuleReferencesButDoNotWaitForReady(DMI1_URL, 'tagA', 100, 1, ModuleNameStrategy.OVERLAPPING)
+        and: 'register another batch cm handles of tag B (with overlapping module names)'
+            registerSequenceOfCmHandlesWithManyModuleReferencesButDoNotWaitForReady(DMI1_URL, 'tagB', 100, 101, ModuleNameStrategy.OVERLAPPING)
+        and: 'populate the work queue with both batches'
+            objectUnderTest.populateWorkQueueIfNeeded()
+        when: 'advised cm handles are processed on 2 threads (exactly one batch for each)'
+            objectUnderTest.moduleSyncAdvisedCmHandles()
+            executorService.execute(moduleSyncAdvisedCmHandles)
+        then: 'wait till all cm handles have been processed'
+            new PollingConditions().within(10, () -> {
+                assert getNumberOfProcessedCmHandles() == 200
+            })
+        then: 'at least 1 cm handle is in state LOCKED'
+            assert cmHandlesByState.get('lockedCmHandlesCount') >= 1
+        cleanup: 'remove all test cm handles'
+            deregisterSequenceOfCmHandles(DMI1_URL, 200, 1)
+    }
+
     def 'Populate module sync work queue on two parallel threads with a slight difference in start time.'() {
         // This test proved that the issue in CPS-2403 did not arise if the the queue was populated and given time to be distributed
         given: 'the queue is empty at the start'
@@ -169,4 +193,21 @@ class ModuleSyncWatchdogIntegrationSpec extends CpsIntegrationSpecBase {
         }
     }
 
+    def moduleSyncAdvisedCmHandles = () -> {
+        try {
+            objectUnderTest.moduleSyncAdvisedCmHandles()
+        } catch (InterruptedException e) {
+            e.printStackTrace()
+        }
+    }
+
+    def clearCmHandleStateGauge() {
+        cmHandlesByState.keySet().each { cmHandlesByState.put(it, 0)}
+    }
+
+    def getNumberOfProcessedCmHandles() {
+        return cmHandlesByState.get('readyCmHandlesCount') + cmHandlesByState.get('lockedCmHandlesCount')
+    }
+
+
 }