X-Git-Url: https://gerrit.onap.org/r/gitweb?a=blobdiff_plain;f=cps-ncmp-service%2Fsrc%2Fmain%2Fjava%2Forg%2Fonap%2Fcps%2Fncmp%2Fapi%2Finventory%2Fsync%2FModuleSyncWatchdog.java;h=f629b71d266e252570ac9357cdff3224c5cd961e;hb=5ced56253a01fadf5376bc5e1202a7ac7e136794;hp=6ec44197d22c3559255b77dc15956a51cfa42db2;hpb=d925a54f81d014ef279288469b6e64032c031929;p=cps.git diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdog.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdog.java index 6ec44197d..f629b71d2 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdog.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdog.java @@ -1,5 +1,5 @@ /* - * ============LICENSE_START======================================================= + * ============LICENSE_START======================================================= * Copyright (C) 2022 Nordix Foundation * Modifications Copyright (C) 2022 Bell Canada * ================================================================================ @@ -21,99 +21,114 @@ package org.onap.cps.ncmp.api.inventory.sync; +import com.hazelcast.map.IMap; +import java.util.Collection; +import java.util.HashSet; import java.util.List; -import java.util.function.Consumer; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import lombok.Getter; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.onap.cps.ncmp.api.impl.config.embeddedcache.SynchronizationCacheConfig; import org.onap.cps.ncmp.api.impl.yangmodels.YangModelCmHandle; -import org.onap.cps.ncmp.api.inventory.CmHandleState; -import org.onap.cps.ncmp.api.inventory.CompositeState; -import org.onap.cps.ncmp.api.inventory.DataStoreSyncState; -import org.onap.cps.ncmp.api.inventory.InventoryPersistence; -import org.onap.cps.ncmp.api.inventory.LockReasonCategory; +import org.onap.cps.ncmp.api.inventory.sync.executor.AsyncTaskExecutor; +import org.onap.cps.spi.model.DataNode; import org.springframework.scheduling.annotation.Scheduled; -import org.springframework.stereotype.Component; +import org.springframework.stereotype.Service; @Slf4j @RequiredArgsConstructor -@Component +@Service public class ModuleSyncWatchdog { - private final InventoryPersistence inventoryPersistence; - private final SyncUtils syncUtils; - - private final ModuleSyncService moduleSyncService; + private final BlockingQueue moduleSyncWorkQueue; + private final IMap moduleSyncStartedOnCmHandles; + private final ModuleSyncTasks moduleSyncTasks; + private final AsyncTaskExecutor asyncTaskExecutor; + private static final int MODULE_SYNC_BATCH_SIZE = 100; + private static final long PREVENT_CPU_BURN_WAIT_TIME_MILLIS = 10; + private static final String VALUE_FOR_HAZELCAST_IN_PROGRESS_MAP = "Started"; + private static final long ASYNC_TASK_TIMEOUT_IN_MILLISECONDS = TimeUnit.MINUTES.toMillis(5); + @Getter + private AtomicInteger batchCounter = new AtomicInteger(1); /** - * Execute Cm Handle poll which changes the cm handle state from 'ADVISED' to 'READY'. + * Check DB for any cm handles in 'ADVISED' state. + * Queue and create batches to process them asynchronously. + * This method will only finish when there are no more 'ADVISED' cm handles in the DB. + * This method wil be triggered on a configurable interval */ - @Scheduled(fixedDelayString = "${timers.advised-modules-sync.sleep-time-ms:30000}") - public void executeAdvisedCmHandlePoll() { - YangModelCmHandle advisedCmHandle = syncUtils.getAnAdvisedCmHandle(); - while (advisedCmHandle != null) { - final String cmHandleId = advisedCmHandle.getId(); - final CompositeState compositeState = inventoryPersistence.getCmHandleState(cmHandleId); - try { - moduleSyncService.deleteSchemaSetIfExists(advisedCmHandle); - moduleSyncService.syncAndCreateSchemaSetAndAnchor(advisedCmHandle); - setCompositeStateToReadyWithInitialDataStoreSyncState().accept(compositeState); - } catch (final Exception e) { - setCompositeStateToLocked().accept(compositeState); - syncUtils.updateLockReasonDetailsAndAttempts(compositeState, - LockReasonCategory.LOCKED_MISBEHAVING, e.getMessage()); + @Scheduled(fixedDelayString = "${ncmp.timers.advised-modules-sync.sleep-time-ms:5000}") + public void moduleSyncAdvisedCmHandles() { + log.info("Processing module sync watchdog waking up."); + populateWorkQueueIfNeeded(); + final int asyncTaskParallelismLevel = asyncTaskExecutor.getAsyncTaskParallelismLevel(); + while (!moduleSyncWorkQueue.isEmpty()) { + if (batchCounter.get() <= asyncTaskParallelismLevel) { + final Collection nextBatch = prepareNextBatch(); + log.debug("Processing module sync batch of {}. {} batch(es) active.", + nextBatch.size(), batchCounter.get()); + asyncTaskExecutor.executeTask(() -> + moduleSyncTasks.performModuleSync(nextBatch, batchCounter), + ASYNC_TASK_TIMEOUT_IN_MILLISECONDS); + batchCounter.getAndIncrement(); + } else { + preventBusyWait(); } - inventoryPersistence.saveCmHandleState(cmHandleId, compositeState); - log.debug("{} is now in {} state", cmHandleId, compositeState.getCmHandleState().name()); - advisedCmHandle = syncUtils.getAnAdvisedCmHandle(); } - log.debug("No Cm-Handles currently found in an ADVISED state"); } /** - * Execute Cm Handle poll which changes the cm handle state from 'LOCKED' to 'ADVISED'. + * Find any failed (locked) cm handles and change state back to 'ADVISED'. */ - @Scheduled(fixedDelayString = "${timers.locked-modules-sync.sleep-time-ms:300000}") - public void executeLockedCmHandlePoll() { - final List lockedMisbehavingCmHandles = syncUtils.getLockedMisbehavingYangModelCmHandles(); - for (final YangModelCmHandle moduleSyncFailedCmHandle : lockedMisbehavingCmHandles) { - final CompositeState compositeState = moduleSyncFailedCmHandle.getCompositeState(); - final boolean isReadyForRetry = syncUtils.isReadyForRetry(compositeState); - if (isReadyForRetry) { - setCompositeStateToAdvisedAndRetainOldLockReasonDetails(compositeState); - log.debug("Locked misbehaving cm handle {} is being recycled", moduleSyncFailedCmHandle.getId()); - inventoryPersistence.saveCmHandleState(moduleSyncFailedCmHandle.getId(), compositeState); - } - } + @Scheduled(fixedDelayString = "${ncmp.timers.locked-modules-sync.sleep-time-ms:300000}") + public void resetPreviouslyFailedCmHandles() { + log.info("Processing module sync retry-watchdog waking up."); + final List failedCmHandles = syncUtils.getModuleSyncFailedCmHandles(); + moduleSyncTasks.resetFailedCmHandles(failedCmHandles); } - private Consumer setCompositeStateToLocked() { - return compositeState -> { - compositeState.setCmHandleState(CmHandleState.LOCKED); - compositeState.setLastUpdateTimeNow(); - }; + private void preventBusyWait() { + try { + TimeUnit.MILLISECONDS.sleep(PREVENT_CPU_BURN_WAIT_TIME_MILLIS); + } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); + } } - private Consumer setCompositeStateToReadyWithInitialDataStoreSyncState() { - return compositeState -> { - compositeState.setCmHandleState(CmHandleState.READY); - final CompositeState.Operational operational = CompositeState.Operational.builder() - .dataStoreSyncState(DataStoreSyncState.UNSYNCHRONIZED) - .lastSyncTime(CompositeState.nowInSyncTimeFormat()) - .build(); - final CompositeState.DataStores dataStores = CompositeState.DataStores.builder() - .operationalDataStore(operational) - .build(); - compositeState.setDataStores(dataStores); - }; + private void populateWorkQueueIfNeeded() { + if (moduleSyncWorkQueue.isEmpty()) { + final List advisedCmHandles = syncUtils.getAdvisedCmHandles(); + log.info("Processing module sync fetched {} advised cm handles from DB", advisedCmHandles.size()); + for (final DataNode advisedCmHandle : advisedCmHandles) { + if (!moduleSyncWorkQueue.offer(advisedCmHandle)) { + log.warn("Unable to add cm handle {} to the work queue", advisedCmHandle.getLeaves().get("id")); + } + } + } } - private void setCompositeStateToAdvisedAndRetainOldLockReasonDetails(final CompositeState compositeState) { - compositeState.setCmHandleState(CmHandleState.ADVISED); - compositeState.setLastUpdateTimeNow(); - final String oldLockReasonDetails = compositeState.getLockReason().getDetails(); - final CompositeState.LockReason lockReason = CompositeState.LockReason.builder() - .details(oldLockReasonDetails).build(); - compositeState.setLockReason(lockReason); + private Collection prepareNextBatch() { + final Collection nextBatchCandidates = new HashSet<>(MODULE_SYNC_BATCH_SIZE); + final Collection nextBatch = new HashSet<>(MODULE_SYNC_BATCH_SIZE); + moduleSyncWorkQueue.drainTo(nextBatchCandidates, MODULE_SYNC_BATCH_SIZE); + log.debug("nextBatchCandidates size : {}", nextBatchCandidates.size()); + for (final DataNode batchCandidate : nextBatchCandidates) { + final String cmHandleId = String.valueOf(batchCandidate.getLeaves().get("id")); + final boolean alreadyAddedToInProgressMap = VALUE_FOR_HAZELCAST_IN_PROGRESS_MAP.equals( + moduleSyncStartedOnCmHandles.putIfAbsent(cmHandleId, VALUE_FOR_HAZELCAST_IN_PROGRESS_MAP, + SynchronizationCacheConfig.MODULE_SYNC_STARTED_TTL_SECS, TimeUnit.SECONDS)); + if (alreadyAddedToInProgressMap) { + log.debug("module sync for {} already in progress by other instance", cmHandleId); + } else { + nextBatch.add(batchCandidate); + } + } + log.debug("nextBatch size : {}", nextBatch.size()); + return nextBatch; } + }