/*
- * ============LICENSE_START=======================================================
+ * ============LICENSE_START=======================================================
* Copyright (C) 2022 Nordix Foundation
* Modifications Copyright (C) 2022 Bell Canada
* ================================================================================
package org.onap.cps.ncmp.api.inventory.sync;
import java.util.List;
+import java.util.concurrent.ConcurrentMap;
import java.util.function.Consumer;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
private final ModuleSyncService moduleSyncService;
+ private final ConcurrentMap<String, Boolean> moduleSyncSemaphoreMap;
+
/**
* Execute Cm Handle poll which changes the cm handle state from 'ADVISED' to 'READY'.
*/
@Scheduled(fixedDelayString = "${timers.advised-modules-sync.sleep-time-ms:30000}")
public void executeAdvisedCmHandlePoll() {
- YangModelCmHandle advisedCmHandle = syncUtils.getAnAdvisedCmHandle();
- while (advisedCmHandle != null) {
+ syncUtils.getAdvisedCmHandles().forEach(advisedCmHandle -> {
final String cmHandleId = advisedCmHandle.getId();
- final CompositeState compositeState = inventoryPersistence.getCmHandleState(cmHandleId);
- try {
- moduleSyncService.deleteSchemaSetIfExists(advisedCmHandle);
- moduleSyncService.syncAndCreateSchemaSetAndAnchor(advisedCmHandle);
- setCompositeStateToReadyWithInitialDataStoreSyncState().accept(compositeState);
- } catch (final Exception e) {
- setCompositeStateToLocked().accept(compositeState);
- syncUtils.updateLockReasonDetailsAndAttempts(compositeState,
- LockReasonCategory.LOCKED_MISBEHAVING, e.getMessage());
+ if (hasPushedIntoSemaphoreMap(cmHandleId)) {
+ log.debug("executing module sync on {}", cmHandleId);
+ final CompositeState compositeState = inventoryPersistence.getCmHandleState(cmHandleId);
+ try {
+ moduleSyncService.deleteSchemaSetIfExists(advisedCmHandle);
+ moduleSyncService.syncAndCreateSchemaSetAndAnchor(advisedCmHandle);
+ setCompositeStateToReadyWithInitialDataStoreSyncState().accept(compositeState);
+ updateModuleSyncSemaphoreMap(cmHandleId);
+ } catch (final Exception e) {
+ setCompositeStateToLocked().accept(compositeState);
+ syncUtils.updateLockReasonDetailsAndAttempts(compositeState,
+ LockReasonCategory.LOCKED_MODULE_SYNC_FAILED, e.getMessage());
+ }
+ inventoryPersistence.saveCmHandleState(cmHandleId, compositeState);
+ log.debug("{} is now in {} state", cmHandleId, compositeState.getCmHandleState().name());
+ } else {
+ log.debug("{} already processed by another instance", cmHandleId);
}
- inventoryPersistence.saveCmHandleState(cmHandleId, compositeState);
- log.debug("{} is now in {} state", cmHandleId, compositeState.getCmHandleState().name());
- advisedCmHandle = syncUtils.getAnAdvisedCmHandle();
- }
+ });
log.debug("No Cm-Handles currently found in an ADVISED state");
}
*/
@Scheduled(fixedDelayString = "${timers.locked-modules-sync.sleep-time-ms:300000}")
public void executeLockedCmHandlePoll() {
- final List<YangModelCmHandle> lockedMisbehavingCmHandles = syncUtils.getLockedMisbehavingYangModelCmHandles();
- for (final YangModelCmHandle moduleSyncFailedCmHandle : lockedMisbehavingCmHandles) {
- final CompositeState compositeState = moduleSyncFailedCmHandle.getCompositeState();
+ final List<YangModelCmHandle> lockedCmHandles = syncUtils.getModuleSyncFailedCmHandles();
+ for (final YangModelCmHandle lockedCmHandle : lockedCmHandles) {
+ final CompositeState compositeState = lockedCmHandle.getCompositeState();
final boolean isReadyForRetry = syncUtils.isReadyForRetry(compositeState);
if (isReadyForRetry) {
setCompositeStateToAdvisedAndRetainOldLockReasonDetails(compositeState);
- log.debug("Locked misbehaving cm handle {} is being recycled", moduleSyncFailedCmHandle.getId());
- inventoryPersistence.saveCmHandleState(moduleSyncFailedCmHandle.getId(), compositeState);
+ log.debug("Locked cm handle {} is being re-synced", lockedCmHandle.getId());
+ inventoryPersistence.saveCmHandleState(lockedCmHandle.getId(), compositeState);
}
}
}
private Consumer<CompositeState> setCompositeStateToReadyWithInitialDataStoreSyncState() {
return compositeState -> {
+ compositeState.setDataSyncEnabled(false);
compositeState.setCmHandleState(CmHandleState.READY);
- final CompositeState.Operational operational = CompositeState.Operational.builder()
- .dataStoreSyncState(DataStoreSyncState.UNSYNCHRONIZED)
- .lastSyncTime(CompositeState.nowInSyncTimeFormat())
- .build();
+ final CompositeState.Operational operational = getDataStoreSyncState();
final CompositeState.DataStores dataStores = CompositeState.DataStores.builder()
.operationalDataStore(operational)
.build();
.details(oldLockReasonDetails).build();
compositeState.setLockReason(lockReason);
}
+
+ private CompositeState.Operational getDataStoreSyncState() {
+ final DataStoreSyncState dataStoreSyncState = DataStoreSyncState.NONE_REQUESTED;
+ return CompositeState.Operational.builder().dataStoreSyncState(dataStoreSyncState).build();
+ }
+
+ private void updateModuleSyncSemaphoreMap(final String cmHandleId) {
+ moduleSyncSemaphoreMap.replace(cmHandleId, true);
+ }
+
+ private boolean hasPushedIntoSemaphoreMap(final String cmHandleId) {
+ return moduleSyncSemaphoreMap.putIfAbsent(cmHandleId, false) == null;
+ }
}