/* * ============LICENSE_START======================================================= * Copyright (C) 2022 Nordix Foundation * Modifications Copyright (C) 2022 Bell Canada * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * * SPDX-License-Identifier: Apache-2.0 * ============LICENSE_END========================================================= */ package org.onap.cps.ncmp.api.inventory.sync; import java.util.Collection; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.onap.cps.ncmp.api.impl.yangmodels.YangModelCmHandle; import org.onap.cps.spi.model.DataNode; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; @Slf4j @RequiredArgsConstructor @Component public class ModuleSyncWatchdog { private final SyncUtils syncUtils; private final BlockingQueue moduleSyncWorkQueue; private final Map moduleSyncStartedOnCmHandles; private final ModuleSyncTasks moduleSyncTasks; private static final int MODULE_SYNC_BATCH_SIZE = 100; private static final long PREVENT_CPU_BURN_WAIT_TIME_MILLIS = 10; private static final String VALUE_FOR_HAZELCAST_IN_PROGRESS_MAP = "Started"; /** * Execute Cm Handle poll which changes the cm handle state from 'ADVISED' to 'READY'. */ @Scheduled(fixedDelayString = "${timers.advised-modules-sync.sleep-time-ms:5000}") public void moduleSyncAdvisedCmHandles() { populateWorkQueueIfNeeded(); while (!moduleSyncWorkQueue.isEmpty()) { final Collection nextBatch = prepareNextBatch(); moduleSyncTasks.performModuleSync(nextBatch); preventBusyWait(); } } /** * Find any failed (locked) cm handles and change state back to 'ADVISED'. */ @Scheduled(fixedDelayString = "${timers.locked-modules-sync.sleep-time-ms:300000}") public void resetPreviouslyFailedCmHandles() { final List failedCmHandles = syncUtils.getModuleSyncFailedCmHandles(); moduleSyncTasks.resetFailedCmHandles(failedCmHandles); } private void preventBusyWait() { // This method isn't really needed until CPS-1200 Performance Improvement: Watchdog Parallel execution // but leaving here to minimize impacts on this class for that Jira try { TimeUnit.MILLISECONDS.sleep(PREVENT_CPU_BURN_WAIT_TIME_MILLIS); } catch (final InterruptedException e) { Thread.currentThread().interrupt(); } } private void populateWorkQueueIfNeeded() { if (moduleSyncWorkQueue.isEmpty()) { final List advisedCmHandles = syncUtils.getAdvisedCmHandles(); for (final DataNode advisedCmHandle : advisedCmHandles) { if (!moduleSyncWorkQueue.offer(advisedCmHandle)) { log.warn("Unable to add cm handle {} to the work queue", advisedCmHandle.getLeaves().get("id")); } } } } private Collection prepareNextBatch() { final Collection nextBatchCandidates = new HashSet<>(MODULE_SYNC_BATCH_SIZE); final Collection nextBatch = new HashSet<>(MODULE_SYNC_BATCH_SIZE); moduleSyncWorkQueue.drainTo(nextBatchCandidates, MODULE_SYNC_BATCH_SIZE); log.debug("nextBatchCandidates size : {}", nextBatchCandidates.size()); for (final DataNode batchCandidate : nextBatchCandidates) { final String cmHandleId = String.valueOf(batchCandidate.getLeaves().get("id")); final boolean alreadyAddedToInProgressMap = VALUE_FOR_HAZELCAST_IN_PROGRESS_MAP .equals(moduleSyncStartedOnCmHandles.putIfAbsent(cmHandleId, VALUE_FOR_HAZELCAST_IN_PROGRESS_MAP)); if (alreadyAddedToInProgressMap) { log.debug("module sync for {} already in progress by other instance", cmHandleId); } else { nextBatch.add(batchCandidate); } } log.debug("nextBatch size : {}", nextBatch.size()); return nextBatch; } }