username: ${DB_USERNAME}\r
password: ${DB_PASSWORD}\r
driverClassName: org.postgresql.Driver\r
- initialization-mode: always\r
hikari:\r
minimumIdle: 5\r
maximumPoolSize: 80\r
- idleTimeout: 120000\r
- connectionTimeout: 300000\r
- leakDetectionThreshold: 300000\r
+ idleTimeout: 60000\r
+ connectionTimeout: 120000\r
+ leakDetectionThreshold: 2000\r
pool-name: CpsDatabasePool\r
\r
cache:\r
default-property-inclusion: NON_NULL\r
serialization:\r
FAIL_ON_EMPTY_BEANS: false\r
+ sql:\r
+ init:\r
+ mode: ALWAYS\r
app:\r
ncmp:\r
async-m2m:\r
} catch (final HttpStatusCodeException httpStatusCodeException) {
final String exceptionMessage = "Unable to " + operation.toString() + " resource data.";
throw new HttpClientRequestException(exceptionMessage, httpStatusCodeException.getResponseBodyAsString(),
- httpStatusCodeException.getRawStatusCode());
+ httpStatusCodeException.getRawStatusCode());
}
}
moduleSyncService.syncAndCreateSchemaSetAndAnchor(yangModelCmHandle);
cmHandelStatePerCmHandle.put(yangModelCmHandle, CmHandleState.READY);
} catch (final Exception e) {
+ log.warn("Processing module sync batch failed.");
syncUtils.updateLockReasonDetailsAndAttempts(compositeState,
LockReasonCategory.LOCKED_MODULE_SYNC_FAILED, e.getMessage());
setCmHandleStateLocked(yangModelCmHandle, compositeState.getLockReason());
lcmEventsCmHandleStateHandler.updateCmHandleStateBatch(cmHandelStatePerCmHandle);
} finally {
batchCounter.getAndDecrement();
+ log.info("Processing module sync batch finished. {} batch(es) active.", batchCounter.get());
}
return COMPLETED_FUTURE;
}
*/
@Scheduled(fixedDelayString = "${timers.advised-modules-sync.sleep-time-ms:5000}")
public void moduleSyncAdvisedCmHandles() {
+ log.info("Processing module sync watchdog waking up.");
populateWorkQueueIfNeeded();
final int asyncTaskParallelismLevel = asyncTaskExecutor.getAsyncTaskParallelismLevel();
- while (!moduleSyncWorkQueue.isEmpty() && batchCounter.get() <= asyncTaskParallelismLevel) {
- batchCounter.getAndIncrement();
- final Collection<DataNode> nextBatch = prepareNextBatch();
- asyncTaskExecutor.executeTask(() ->
- moduleSyncTasks.performModuleSync(nextBatch, batchCounter),
- ASYNC_TASK_TIMEOUT_IN_MILLISECONDS
- );
- preventBusyWait();
+ while (!moduleSyncWorkQueue.isEmpty()) {
+ if (batchCounter.get() <= asyncTaskParallelismLevel) {
+ final Collection<DataNode> nextBatch = prepareNextBatch();
+ log.debug("Processing module sync batch of {}. {} batch(es) active.",
+ nextBatch.size(), batchCounter.get());
+ asyncTaskExecutor.executeTask(() ->
+ moduleSyncTasks.performModuleSync(nextBatch, batchCounter),
+ ASYNC_TASK_TIMEOUT_IN_MILLISECONDS
+ );
+ batchCounter.getAndIncrement();
+ } else {
+ preventBusyWait();
+ }
}
}
*/
@Scheduled(fixedDelayString = "${timers.locked-modules-sync.sleep-time-ms:300000}")
public void resetPreviouslyFailedCmHandles() {
+ log.info("Processing module sync retry-watchdog waking up.");
final List<YangModelCmHandle> failedCmHandles = syncUtils.getModuleSyncFailedCmHandles();
moduleSyncTasks.resetFailedCmHandles(failedCmHandles);
}
private void populateWorkQueueIfNeeded() {
if (moduleSyncWorkQueue.isEmpty()) {
final List<DataNode> advisedCmHandles = syncUtils.getAdvisedCmHandles();
+ log.info("Processing module sync fetched {} advised cm handles from DB", advisedCmHandles.size());
for (final DataNode advisedCmHandle : advisedCmHandles) {
if (!moduleSyncWorkQueue.offer(advisedCmHandle)) {
log.warn("Unable to add cm handle {} to the work queue", advisedCmHandle.getLeaves().get("id"));
def 'Module sync advised cm handles with #scenario.'() {
given: 'sync utilities returns #numberOfAdvisedCmHandles advised cm handles'
mockSyncUtils.getAdvisedCmHandles() >> createDataNodes(numberOfAdvisedCmHandles)
- and: 'the executor has #parallelismLevel available threads'
- spiedAsyncTaskExecutor.getAsyncTaskParallelismLevel() >> parallelismLevel
+ and: 'the executor has enough available threads'
+ spiedAsyncTaskExecutor.getAsyncTaskParallelismLevel() >> 3
when: ' module sync is started'
objectUnderTest.moduleSyncAdvisedCmHandles()
then: 'it performs #expectedNumberOfTaskExecutions tasks'
expectedNumberOfTaskExecutions * spiedAsyncTaskExecutor.executeTask(*_)
where: ' the following parameter are used'
- scenario | parallelismLevel | numberOfAdvisedCmHandles || expectedNumberOfTaskExecutions
- 'less then 1 batch' | 9 | 1 || 1
- 'exactly 1 batch' | 9 | ModuleSyncWatchdog.MODULE_SYNC_BATCH_SIZE || 1
- '2 batches' | 9 | 2 * ModuleSyncWatchdog.MODULE_SYNC_BATCH_SIZE || 2
- 'queue capacity' | 9 | testQueueCapacity || 3
- 'over queue capacity' | 9 | testQueueCapacity + 2 * ModuleSyncWatchdog.MODULE_SYNC_BATCH_SIZE || 3
- 'not enough threads' | 2 | testQueueCapacity || 2
+ scenario | numberOfAdvisedCmHandles || expectedNumberOfTaskExecutions
+ 'less then 1 batch' | 1 || 1
+ 'exactly 1 batch' | ModuleSyncWatchdog.MODULE_SYNC_BATCH_SIZE || 1
+ '2 batches' | 2 * ModuleSyncWatchdog.MODULE_SYNC_BATCH_SIZE || 2
+ 'queue capacity' | testQueueCapacity || 3
+ 'over queue capacity' | testQueueCapacity + 2 * ModuleSyncWatchdog.MODULE_SYNC_BATCH_SIZE || 3
}
def 'Reset failed cm handles.'() {
package org.onap.cps.spi.repository;
-import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.persistence.Query;
import javax.transaction.Transactional;
import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
import org.onap.cps.cpspath.parser.CpsPathPrefixType;
import org.onap.cps.cpspath.parser.CpsPathQuery;
import org.onap.cps.spi.entities.FragmentEntity;
import org.onap.cps.utils.JsonObjectMapper;
@RequiredArgsConstructor
+@Slf4j
public class FragmentRepositoryCpsPathQueryImpl implements FragmentRepositoryCpsPathQuery {
public static final String SIMILAR_TO_ABSOLUTE_PATH_PREFIX = "%/";
addTextFunctionCondition(cpsPathQuery, sqlStringBuilder, queryParameters);
final Query query = entityManager.createNativeQuery(sqlStringBuilder.toString(), FragmentEntity.class);
setQueryParameters(query, queryParameters);
- return getFragmentEntitiesAsStream(query);
- }
-
- private List<FragmentEntity> getFragmentEntitiesAsStream(final Query query) {
- final List<FragmentEntity> fragmentEntities = new ArrayList<>();
- query.getResultStream().forEach(fragmentEntity -> {
- fragmentEntities.add((FragmentEntity) fragmentEntity);
- entityManager.detach(fragmentEntity);
- });
-
+ final List<FragmentEntity> fragmentEntities = query.getResultList();
+ log.debug("Fetched {} fragment entities by anchor and cps path.", fragmentEntities.size());
return fragmentEntities;
}