/*
* ============LICENSE_START=======================================================
- * Copyright (C) 2022 Nordix Foundation
+ * Copyright (C) 2022-2023 Nordix Foundation
* Modifications Copyright (C) 2022 Bell Canada
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
package org.onap.cps.ncmp.api.inventory.sync;
+import com.hazelcast.map.IMap;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
-import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
+import org.onap.cps.ncmp.api.impl.config.embeddedcache.SynchronizationCacheConfig;
import org.onap.cps.ncmp.api.impl.yangmodels.YangModelCmHandle;
import org.onap.cps.ncmp.api.inventory.sync.executor.AsyncTaskExecutor;
import org.onap.cps.spi.model.DataNode;
private final SyncUtils syncUtils;
private final BlockingQueue<DataNode> moduleSyncWorkQueue;
- private final Map<String, Object> moduleSyncStartedOnCmHandles;
+ private final IMap<String, Object> moduleSyncStartedOnCmHandles;
private final ModuleSyncTasks moduleSyncTasks;
private final AsyncTaskExecutor asyncTaskExecutor;
private static final int MODULE_SYNC_BATCH_SIZE = 100;
nextBatch.size(), batchCounter.get());
asyncTaskExecutor.executeTask(() ->
moduleSyncTasks.performModuleSync(nextBatch, batchCounter),
- ASYNC_TASK_TIMEOUT_IN_MILLISECONDS
- );
+ ASYNC_TASK_TIMEOUT_IN_MILLISECONDS);
batchCounter.getAndIncrement();
} else {
preventBusyWait();
public void resetPreviouslyFailedCmHandles() {
log.info("Processing module sync retry-watchdog waking up.");
final List<YangModelCmHandle> failedCmHandles = syncUtils.getModuleSyncFailedCmHandles();
+ log.info("Retrying {} cmHandles", failedCmHandles.size());
moduleSyncTasks.resetFailedCmHandles(failedCmHandles);
}
private void preventBusyWait() {
try {
+ log.info("Busy waiting now");
TimeUnit.MILLISECONDS.sleep(PREVENT_CPU_BURN_WAIT_TIME_MILLIS);
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
log.warn("Unable to add cm handle {} to the work queue", advisedCmHandle.getLeaves().get("id"));
}
}
+ log.info("Work Queue Size : {}", moduleSyncWorkQueue.size());
}
}
log.debug("nextBatchCandidates size : {}", nextBatchCandidates.size());
for (final DataNode batchCandidate : nextBatchCandidates) {
final String cmHandleId = String.valueOf(batchCandidate.getLeaves().get("id"));
- final boolean alreadyAddedToInProgressMap = VALUE_FOR_HAZELCAST_IN_PROGRESS_MAP
- .equals(moduleSyncStartedOnCmHandles.putIfAbsent(cmHandleId, VALUE_FOR_HAZELCAST_IN_PROGRESS_MAP));
+ final boolean alreadyAddedToInProgressMap = VALUE_FOR_HAZELCAST_IN_PROGRESS_MAP.equals(
+ moduleSyncStartedOnCmHandles.putIfAbsent(cmHandleId, VALUE_FOR_HAZELCAST_IN_PROGRESS_MAP,
+ SynchronizationCacheConfig.MODULE_SYNC_STARTED_TTL_SECS, TimeUnit.SECONDS));
if (alreadyAddedToInProgressMap) {
log.debug("module sync for {} already in progress by other instance", cmHandleId);
} else {