\r
import org.springframework.boot.SpringApplication;\r
import org.springframework.boot.autoconfigure.SpringBootApplication;\r
-import org.springframework.retry.annotation.EnableRetry;\r
\r
-@EnableRetry\r
@SpringBootApplication\r
public class Application {\r
public static void main(final String[] args) {\r
import org.onap.cps.api.CpsAnchorService;
import org.onap.cps.api.CpsDataService;
import org.onap.cps.api.CpsModuleService;
+import org.onap.cps.api.exceptions.AlreadyDefinedException;
+import org.onap.cps.api.exceptions.DuplicatedYangResourceException;
import org.onap.cps.api.model.ModuleReference;
import org.onap.cps.ncmp.impl.inventory.models.YangModelCmHandle;
import org.onap.cps.utils.ContentType;
final String moduleSetTag = yangModelCmHandle.getModuleSetTag();
final String schemaSetName = getSchemaSetName(cmHandleId, moduleSetTag);
syncAndCreateSchemaSet(yangModelCmHandle, schemaSetName);
- cpsAnchorService.createAnchor(NFP_OPERATIONAL_DATASTORE_DATASPACE_NAME, schemaSetName, cmHandleId);
+ try {
+ cpsAnchorService.createAnchor(NFP_OPERATIONAL_DATASTORE_DATASPACE_NAME, schemaSetName, cmHandleId);
+ } catch (final AlreadyDefinedException alreadyDefinedException) {
+ log.warn("Ignoring (anchor) already exist exception for {}. Exception details: ",
+ yangModelCmHandle.getId(), alreadyDefinedException);
+ }
}
/**
private void syncAndCreateSchemaSet(final YangModelCmHandle yangModelCmHandle, final String schemaSetName) {
if (isNewSchemaSet(schemaSetName)) {
final ModuleDelta moduleDelta = getModuleDelta(yangModelCmHandle);
- log.info("Creating Schema Set {} for CM Handle {}", schemaSetName, yangModelCmHandle.getId());
- cpsModuleService.createSchemaSetFromModules(
+ try {
+ log.info("Creating Schema Set {} for CM Handle {}", schemaSetName, yangModelCmHandle.getId());
+ cpsModuleService.createSchemaSetFromModules(
NFP_OPERATIONAL_DATASTORE_DATASPACE_NAME,
schemaSetName,
moduleDelta.newModuleNameToContentMap,
moduleDelta.allModuleReferences
- );
- log.info("Successfully created Schema Set {} for CM Handle {}", schemaSetName, yangModelCmHandle.getId());
+ );
+ log.info("Successfully created Schema Set {} for CM Handle {}", schemaSetName,
+ yangModelCmHandle.getId());
+ } catch (final AlreadyDefinedException alreadyDefinedException) {
+ log.warn("Ignoring already exist (schema set) exception for {}. Exception details: ",
+ yangModelCmHandle.getId(), alreadyDefinedException);
+ } catch (final DuplicatedYangResourceException duplicatedYangResourceException) {
+ log.warn("Duplicate Yang Resource {} creation for {}. "
+ + "CM Handle will be LOCKED (for retry). Exception details: ",
+ duplicatedYangResourceException.getName(),
+ yangModelCmHandle.getId(),
+ duplicatedYangResourceException);
+ throw duplicatedYangResourceException;
+ }
}
}
compositeState.setLockReason(null);
return CmHandleState.READY;
} catch (final Exception e) {
- log.warn("Processing of {} failed,reason : {}.", yangModelCmHandle.getId(), e.getMessage());
+ log.warn("Processing of {} failed,reason : ", yangModelCmHandle.getId(), e);
final LockReasonCategory lockReasonCategory = inUpgrade
? LockReasonCategory.MODULE_UPGRADE_FAILED
: LockReasonCategory.MODULE_SYNC_FAILED;
'without' | ''
}
- def 'Attempt Sync models for a cm handle with existing schema set (#originalException).'() {
+ def 'Sync models for a cm handle with already defined exception upon schema set creation.'() {
given: 'a cm handle to be synced'
def yangModelCmHandle = createAdvisedCmHandle('existing tag')
and: 'dmi returns no new yang resources'
mockDmiModelOperations.getNewYangResourcesFromDmi(*_) >> [:]
and: 'already defined exception occurs when creating schema (existing)'
+ mockCpsModuleService.createSchemaSetFromModules(*_) >> { throw AlreadyDefinedException.forSchemaSet('', '', null) }
+ when: 'module sync is triggered'
+ objectUnderTest.syncAndCreateSchemaSetAndAnchor(yangModelCmHandle)
+ then: 'the exception is ignored'
+ noExceptionThrown()
+ }
+
+ def 'Sync models for a cm handle with already defined exception upon anchor set creation.'() {
+ given: 'a cm handle to be synced'
+ def yangModelCmHandle = createAdvisedCmHandle('existing tag')
+ and: 'dmi returns no new yang resources'
+ mockDmiModelOperations.getNewYangResourcesFromDmi(*_) >> [:]
+ and: 'already defined exception occurs when creating schema (existing)'
+ mockCpsAnchorService.createAnchor(*_) >> { throw AlreadyDefinedException.forAnchor('', '', null) }
+ when: 'module sync is triggered'
+ objectUnderTest.syncAndCreateSchemaSetAndAnchor(yangModelCmHandle)
+ then: 'the exception is ignored'
+ noExceptionThrown()
+ }
+
+ def 'Attempt Sync models for a cm handle with duplicate yang resources exception).'() {
+ given: 'a cm handle to be synced'
+ def yangModelCmHandle = createAdvisedCmHandle('existing tag')
+ and: 'dmi returns no new yang resources'
+ mockDmiModelOperations.getNewYangResourcesFromDmi(*_) >> [:]
+ and: 'duplicate yang resource exception occurs when creating schema'
+ def originalException = new DuplicatedYangResourceException('', '', null)
mockCpsModuleService.createSchemaSetFromModules(*_) >> { throw originalException }
when: 'module sync is triggered'
objectUnderTest.syncAndCreateSchemaSetAndAnchor(yangModelCmHandle)
then: 'same exception is thrown up'
def thrownException = thrown(Exception)
assert thrownException == originalException
- where: 'following exceptions occur'
- originalException << [AlreadyDefinedException.forSchemaSet('', '', null),
- new DuplicatedYangResourceException('', '', null) ]
}
def 'Model upgrade without using Module Set Tags (legacy) where the modules are in database.'() {
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jetty</artifactId>
</dependency>
- <dependency>
- <groupId>org.springframework.retry</groupId>
- <artifactId>spring-retry</artifactId>
- </dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-aspects</artifactId>
<groupId>org.springframework.boot</groupId>\r
<artifactId>spring-boot-starter-validation</artifactId>\r
</dependency>\r
- <dependency>\r
- <groupId>org.springframework.retry</groupId>\r
- <artifactId>spring-retry</artifactId>\r
- </dependency>\r
<dependency>\r
<groupId>org.springframework</groupId>\r
<artifactId>spring-aspects</artifactId>\r
import org.opendaylight.yangtools.yang.parser.api.YangSyntaxErrorException;
import org.opendaylight.yangtools.yang.parser.rfc7950.repo.YangModelDependencyInfo;
import org.springframework.dao.DataIntegrityViolationException;
-import org.springframework.retry.RetryContext;
-import org.springframework.retry.support.RetrySynchronizationManager;
import org.springframework.stereotype.Component;
@Slf4j
yangResourceRepository.saveAll(newYangResourceEntities);
} catch (final DataIntegrityViolationException dataIntegrityViolationException) {
// Throw a CPS duplicated Yang resource exception if the cause of the error is a yang checksum
- // database constraint violation.
- // If it is not, then throw the original exception
+ // database constraint violation. If it is not, then throw the original exception
final Optional<DuplicatedYangResourceException> convertedException =
convertToDuplicatedYangResourceException(
dataIntegrityViolationException, newYangResourceEntities);
- convertedException.ifPresent(
- e -> {
- final RetryContext retryContext = RetrySynchronizationManager.getContext();
- int retryCount = retryContext == null ? 0 : retryContext.getRetryCount();
- log.warn("Cannot persist duplicated yang resource. System will attempt this method "
- + "up to 5 times. Current retry count : {}", ++retryCount, e);
- });
throw convertedException.isPresent() ? convertedException.get() : dataIntegrityViolationException;
}
}
deregisterSequenceOfCmHandles(DMI1_URL, PARALLEL_SYNC_SAMPLE_SIZE, 1)
}
- /** this test has intermittent failures, due to timeouts.
+ /** this test has intermittent failures, due to race conditions
* Ignored but left here as it might be valuable to further optimization investigations.
**/
@Ignore
def cmHandlesPerTag = 250
def totalCmHandles = numberOfTags * cmHandlesPerTag
def offset = 1
- def minimumBatches = totalCmHandles / 100
registerSequenceOfCmHandlesWithManyModuleReferencesButDoNotWaitForReady(DMI1_URL, 'cps-2478-A', cmHandlesPerTag, offset)
and: 'register anther 250 cm handles with module set tag cps-2478-B'
offset += cmHandlesPerTag
when: 'sync all advised cm handles'
objectUnderTest.moduleSyncAdvisedCmHandles()
Thread.sleep(100)
- then: 'retry until both schema sets are stored in db (1 schema set for each module set tag)'
- def dbSchemaSetStorageTimer = meterRegistry.get('cps.module.persistence.schemaset.store').timer()
- new PollingConditions().within(10, () -> {
- objectUnderTest.moduleSyncAdvisedCmHandles()
- Thread.sleep(100)
- assert dbSchemaSetStorageTimer.count() == 2
- })
- then: 'wait till at least 5 batches of state updates are done (often more because of retries of locked cm handles)'
- def dbStateUpdateTimer = meterRegistry.get('cps.ncmp.cmhandle.state.update.batch').timer()
+ then: 'Keep processing until there are no more LOCKED or ADVISED cm handles'
new PollingConditions().within(10, () -> {
- assert dbStateUpdateTimer.count() >= minimumBatches
+ def advised = cmHandlesByState.get('advisedCmHandlesCount')
+ def locked = cmHandlesByState.get('lockedCmHandlesCount')
+ if ( locked > 0 | advised > 0 ) {
+ println "CPS-2576 Need to retry ${locked} LOCKED / ${advised} ADVISED cm Handles"
+ objectUnderTest.moduleSyncAdvisedCmHandles()
+ Thread.sleep(100)
+ }
+ assert cmHandlesByState.get('lockedCmHandlesCount') + cmHandlesByState.get('advisedCmHandlesCount') == 0
})
- and: 'one call to DMI per module set tag to get module references (may be more due to parallel processing of batches)'
- def dmiModuleRetrievalTimer = meterRegistry.get('cps.ncmp.inventory.module.references.from.dmi').timer()
- assert dmiModuleRetrievalTimer.count() >= numberOfTags && dmiModuleRetrievalTimer.count() <= minimumBatches
-
and: 'log the relevant instrumentation'
+ def dmiModuleRetrievalTimer = meterRegistry.get('cps.ncmp.inventory.module.references.from.dmi').timer()
+ def dbSchemaSetStorageTimer = meterRegistry.get('cps.module.persistence.schemaset.store').timer()
+ def dbStateUpdateTimer = meterRegistry.get('cps.ncmp.cmhandle.state.update.batch').timer()
logInstrumentation(dmiModuleRetrievalTimer, 'get modules from DMI ')
logInstrumentation(dbSchemaSetStorageTimer, 'store schema sets ')
logInstrumentation(dbStateUpdateTimer, 'batch state updates ')
deregisterSequenceOfCmHandles(DMI1_URL, PARALLEL_SYNC_SAMPLE_SIZE, 1)
}
+ /** this test has intermittent failures, due to race conditions
+ * Ignored but left here as it might be valuable to further optimization investigations.
+ **/
+ @Ignore
def 'Schema sets with overlapping modules processed at the same time (DB constraint violation).'() {
given: 'register one batch (100) cm handles of tag A (with overlapping module names)'
registerSequenceOfCmHandlesWithManyModuleReferencesButDoNotWaitForReady(DMI1_URL, 'tagA', 100, 1, ModuleNameStrategy.OVERLAPPING)