package org.onap.cps.ncmp.api.inventory.sync;
import java.util.List;
+import java.util.concurrent.ConcurrentMap;
import java.util.function.Consumer;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.onap.cps.ncmp.api.inventory.DataStoreSyncState;
import org.onap.cps.ncmp.api.inventory.InventoryPersistence;
import org.onap.cps.ncmp.api.inventory.LockReasonCategory;
+import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
private final ModuleSyncService moduleSyncService;
+ @Value("${data-sync.cache.enabled:false}")
+ private boolean isGlobalDataSyncCacheEnabled;
+
+ private final ConcurrentMap<String, Boolean> moduleSyncSemaphoreMap;
+
/**
* Execute Cm Handle poll which changes the cm handle state from 'ADVISED' to 'READY'.
*/
@Scheduled(fixedDelayString = "${timers.advised-modules-sync.sleep-time-ms:30000}")
public void executeAdvisedCmHandlePoll() {
- YangModelCmHandle advisedCmHandle = syncUtils.getAnAdvisedCmHandle();
- while (advisedCmHandle != null) {
+ syncUtils.getAdvisedCmHandles().forEach(advisedCmHandle -> {
final String cmHandleId = advisedCmHandle.getId();
- final CompositeState compositeState = inventoryPersistence.getCmHandleState(cmHandleId);
- try {
- moduleSyncService.deleteSchemaSetIfExists(advisedCmHandle);
- moduleSyncService.syncAndCreateSchemaSetAndAnchor(advisedCmHandle);
- setCompositeStateToReadyWithInitialDataStoreSyncState().accept(compositeState);
- } catch (final Exception e) {
- setCompositeStateToLocked().accept(compositeState);
- syncUtils.updateLockReasonDetailsAndAttempts(compositeState,
- LockReasonCategory.LOCKED_MODULE_SYNC_FAILED, e.getMessage());
+ if (hasPushedIntoSemaphoreMap(cmHandleId)) {
+ log.debug("executing module sync on {}", cmHandleId);
+ final CompositeState compositeState = inventoryPersistence.getCmHandleState(cmHandleId);
+ try {
+ moduleSyncService.deleteSchemaSetIfExists(advisedCmHandle);
+ moduleSyncService.syncAndCreateSchemaSetAndAnchor(advisedCmHandle);
+ setCompositeStateToReadyWithInitialDataStoreSyncState().accept(compositeState);
+ updateModuleSyncSemaphoreMap(cmHandleId);
+ } catch (final Exception e) {
+ setCompositeStateToLocked().accept(compositeState);
+ syncUtils.updateLockReasonDetailsAndAttempts(compositeState,
+ LockReasonCategory.LOCKED_MODULE_SYNC_FAILED, e.getMessage());
+ }
+ inventoryPersistence.saveCmHandleState(cmHandleId, compositeState);
+ log.debug("{} is now in {} state", cmHandleId, compositeState.getCmHandleState().name());
+ } else {
+ log.debug("{} already processed by another instance", cmHandleId);
}
- inventoryPersistence.saveCmHandleState(cmHandleId, compositeState);
- log.debug("{} is now in {} state", cmHandleId, compositeState.getCmHandleState().name());
- advisedCmHandle = syncUtils.getAnAdvisedCmHandle();
- }
+ });
log.debug("No Cm-Handles currently found in an ADVISED state");
}
final boolean isReadyForRetry = syncUtils.isReadyForRetry(compositeState);
if (isReadyForRetry) {
setCompositeStateToAdvisedAndRetainOldLockReasonDetails(compositeState);
- log.debug("Locked cm handle {} is being resynced", lockedCmHandle.getId());
+ log.debug("Locked cm handle {} is being re-synced", lockedCmHandle.getId());
inventoryPersistence.saveCmHandleState(lockedCmHandle.getId(), compositeState);
}
}
private Consumer<CompositeState> setCompositeStateToReadyWithInitialDataStoreSyncState() {
return compositeState -> {
+ compositeState.setDataSyncEnabled(isGlobalDataSyncCacheEnabled);
compositeState.setCmHandleState(CmHandleState.READY);
- final CompositeState.Operational operational = CompositeState.Operational.builder()
- .dataStoreSyncState(DataStoreSyncState.UNSYNCHRONIZED)
- .lastSyncTime(CompositeState.nowInSyncTimeFormat())
- .build();
+ final CompositeState.Operational operational = getDataStoreSyncState(compositeState.getDataSyncEnabled());
final CompositeState.DataStores dataStores = CompositeState.DataStores.builder()
.operationalDataStore(operational)
.build();
.details(oldLockReasonDetails).build();
compositeState.setLockReason(lockReason);
}
+
+ private CompositeState.Operational getDataStoreSyncState(final boolean dataSyncEnabled) {
+ final DataStoreSyncState dataStoreSyncState = dataSyncEnabled
+ ? DataStoreSyncState.UNSYNCHRONIZED : DataStoreSyncState.NONE_REQUESTED;
+ return CompositeState.Operational.builder().dataStoreSyncState(dataStoreSyncState).build();
+ }
+
+ private void updateModuleSyncSemaphoreMap(final String cmHandleId) {
+ moduleSyncSemaphoreMap.replace(cmHandleId, true);
+ }
+
+ private boolean hasPushedIntoSemaphoreMap(final String cmHandleId) {
+ return moduleSyncSemaphoreMap.putIfAbsent(cmHandleId, false) == null;
+ }
}