Remove multithreading from module sync watchdog 34/140134/1
authordanielhanrahan <daniel.hanrahan@est.tech>
Wed, 29 Jan 2025 18:35:15 +0000 (18:35 +0000)
committerdanielhanrahan <daniel.hanrahan@est.tech>
Tue, 4 Feb 2025 10:24:58 +0000 (10:24 +0000)
After introduction of module set tag improvements, there is no need
to multithreading in module sync. Performance impact is minimal.

Issue-ID: CPS-2165
Signed-off-by: danielhanrahan <daniel.hanrahan@est.tech>
Change-Id: I1557fc8348d39da3654a1b92944c6ad49fa8670d

cps-application/src/main/resources/application.yml
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/AsyncTaskExecutor.java [deleted file]
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncTasks.java
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncWatchdog.java
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/utils/Sleeper.java [deleted file]
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/AsyncTaskExecutorSpec.groovy [deleted file]
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncTasksSpec.groovy
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncWatchdogSpec.groovy
cps-ncmp-service/src/test/resources/application.yml
integration-test/src/test/resources/application.yml

index 6b9c694..c10a26f 100644 (file)
@@ -247,10 +247,6 @@ ncmp:
         trust-level:
             dmi-availability-watchdog-ms: 30000
 
-    modules-sync-watchdog:
-        async-executor:
-            parallelism-level: 10
-
     model-loader:
         maximum-attempt-count: 20
 
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/AsyncTaskExecutor.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/AsyncTaskExecutor.java
deleted file mode 100644 (file)
index 80bc4ab..0000000
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- *  ============LICENSE_START=======================================================
- *  Copyright (C) 2022-2024 Nordix Foundation
- *  ================================================================================
- *  Licensed under the Apache License, Version 2.0 (the "License");
- *  you may not use this file except in compliance with the License.
- *  You may obtain a copy of the License at
- *
- *        http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- *  SPDX-License-Identifier: Apache-2.0
- *  ============LICENSE_END=========================================================
- */
-
-package org.onap.cps.ncmp.impl.inventory.sync;
-
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
-
-import jakarta.annotation.PostConstruct;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeoutException;
-import java.util.function.Supplier;
-import lombok.Getter;
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.stereotype.Service;
-
-@Slf4j
-@Service
-public class AsyncTaskExecutor {
-
-    @Value("${ncmp.modules-sync-watchdog.async-executor.parallelism-level:10}")
-    @Getter
-    private int asyncTaskParallelismLevel;
-    private ExecutorService executorService;
-    private static final int DEFAULT_PARALLELISM_LEVEL = 10;
-
-    /**
-     *  Set up executor service with thread-pool size as per configuration parameter.
-     *  If modules-sync-watchdog.async-executor.parallelism-level not set a default of 10 threads will be applied.
-     */
-    @PostConstruct
-    public void setupThreadPool() {
-        executorService = Executors.newWorkStealingPool(
-                asyncTaskParallelismLevel == 0 ? DEFAULT_PARALLELISM_LEVEL : asyncTaskParallelismLevel);
-    }
-
-    /**
-     * Execute supplied task asynchronously.
-     *
-     * @param taskSupplier    functional method is get() task need to executed asynchronously
-     * @param timeOutInMillis the task timeout value in milliseconds
-     */
-    public void executeTask(final Supplier<Object> taskSupplier, final long timeOutInMillis) {
-        CompletableFuture.supplyAsync(taskSupplier::get, executorService)
-                .orTimeout(timeOutInMillis, MILLISECONDS)
-                .whenCompleteAsync(this::handleTaskCompletion);
-    }
-
-    private void handleTaskCompletion(final Object response, final Throwable throwable) {
-        if (throwable != null) {
-            if (throwable instanceof TimeoutException) {
-                log.error("Async task didn't complete within the required time.", throwable);
-            } else {
-                log.error("Watchdog async batch failed.", throwable);
-            }
-        }
-    }
-}
index f039cf3..b727e79 100644 (file)
@@ -24,8 +24,6 @@ import com.hazelcast.map.IMap;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.atomic.AtomicInteger;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.onap.cps.api.exceptions.DataNodeNotFoundException;
@@ -51,12 +49,8 @@ public class ModuleSyncTasks {
      * Perform module sync on a batch of cm handles.
      *
      * @param cmHandleIds                  a batch of cm handle ids to perform module sync on
-     * @param batchCounter                 the number of batches currently being processed, will be decreased when
-     *                                     task is finished or fails
-     * @return completed future to handle post-processing
      */
-    public CompletableFuture<Void> performModuleSync(final Collection<String> cmHandleIds,
-                                                     final AtomicInteger batchCounter) {
+    public void performModuleSync(final Collection<String> cmHandleIds) {
         final Map<YangModelCmHandle, CmHandleState> cmHandleStatePerCmHandle = new HashMap<>(cmHandleIds.size());
         try {
             for (final String cmHandleId : cmHandleIds) {
@@ -74,11 +68,8 @@ public class ModuleSyncTasks {
                 }
             }
         } finally {
-            batchCounter.getAndDecrement();
             lcmEventsCmHandleStateHandler.updateCmHandleStateBatch(cmHandleStatePerCmHandle);
-            log.info("Processing module sync batch finished. {} batch(es) active.", batchCounter.get());
         }
-        return CompletableFuture.completedFuture(null);
     }
 
     /**
index 32e1c49..6eefedb 100644 (file)
@@ -1,6 +1,6 @@
 /*
  *  ============LICENSE_START=======================================================
- *  Copyright (C) 2022-2024 Nordix Foundation
+ *  Copyright (C) 2022-2025 Nordix Foundation
  *  Modifications Copyright (C) 2022 Bell Canada
  *  ================================================================================
  *  Licensed under the Apache License, Version 2.0 (the "License");
@@ -27,13 +27,9 @@ import com.hazelcast.map.IMap;
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.onap.cps.ncmp.impl.inventory.models.YangModelCmHandle;
-import org.onap.cps.ncmp.impl.utils.Sleeper;
 import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.stereotype.Service;
 
@@ -46,16 +42,10 @@ public class ModuleSyncWatchdog {
     private final BlockingQueue<String> moduleSyncWorkQueue;
     private final IMap<String, Object> moduleSyncStartedOnCmHandles;
     private final ModuleSyncTasks moduleSyncTasks;
-    private final AsyncTaskExecutor asyncTaskExecutor;
     private final IMap<String, String> cpsAndNcmpLock;
-    private final Sleeper sleeper;
 
     private static final int MODULE_SYNC_BATCH_SIZE = 100;
-    private static final long PREVENT_CPU_BURN_WAIT_TIME_MILLIS = 10;
     private static final String VALUE_FOR_HAZELCAST_IN_PROGRESS_MAP = "Started";
-    private static final long ASYNC_TASK_TIMEOUT_IN_MILLISECONDS = TimeUnit.MINUTES.toMillis(5);
-    @Getter
-    private AtomicInteger batchCounter = new AtomicInteger(1);
 
     /**
      * Check DB for any cm handles in 'ADVISED' state.
@@ -69,18 +59,11 @@ public class ModuleSyncWatchdog {
         log.debug("Processing module sync watchdog waking up.");
         populateWorkQueueIfNeeded();
         while (!moduleSyncWorkQueue.isEmpty()) {
-            if (batchCounter.get() <= asyncTaskExecutor.getAsyncTaskParallelismLevel()) {
-                final Collection<String> nextBatch = prepareNextBatch();
-                log.info("Processing module sync batch of {}. {} batch(es) active.",
-                    nextBatch.size(), batchCounter.get());
-                if (!nextBatch.isEmpty()) {
-                    asyncTaskExecutor.executeTask(() ->
-                            moduleSyncTasks.performModuleSync(nextBatch, batchCounter),
-                        ASYNC_TASK_TIMEOUT_IN_MILLISECONDS);
-                    batchCounter.getAndIncrement();
-                }
-            } else {
-                preventBusyWait();
+            final Collection<String> nextBatch = prepareNextBatch();
+            if (!nextBatch.isEmpty()) {
+                log.info("Processing module sync batch of {}. 1 batch(es) active.", nextBatch.size());
+                moduleSyncTasks.performModuleSync(nextBatch);
+                log.info("Processing module sync batch finished. 0 batch(es) active.");
             }
         }
     }
@@ -153,13 +136,4 @@ public class ModuleSyncWatchdog {
         log.info("nextBatch size : {}", nextBatch.size());
         return nextBatch;
     }
-
-    private void preventBusyWait() {
-        try {
-            log.debug("Busy waiting now");
-            sleeper.haveALittleRest(PREVENT_CPU_BURN_WAIT_TIME_MILLIS);
-        } catch (final InterruptedException e) {
-            Thread.currentThread().interrupt();
-        }
-    }
 }
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/utils/Sleeper.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/utils/Sleeper.java
deleted file mode 100644 (file)
index 7a02fa0..0000000
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- *  ============LICENSE_START=======================================================
- *  Copyright (C) 2024 Nordix Foundation
- *  ================================================================================
- *  Licensed under the Apache License, Version 2.0 (the "License");
- *  you may not use this file except in compliance with the License.
- *  You may obtain a copy of the License at
- *
- *        http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- *  SPDX-License-Identifier: Apache-2.0
- *  ============LICENSE_END=========================================================
- */
-
-package org.onap.cps.ncmp.impl.utils;
-
-import java.util.concurrent.TimeUnit;
-import org.springframework.stereotype.Service;
-
-/**
- * This class is to extract out sleep functionality so the interrupted exception handling can
- * be covered with a test (e.g. using spy on Sleeper) and help to get to 100% code coverage.
- */
-@Service
-public class Sleeper {
-    public void haveALittleRest(final long timeInMillis) throws InterruptedException {
-        TimeUnit.MILLISECONDS.sleep(timeInMillis);
-    }
-}
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/AsyncTaskExecutorSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/AsyncTaskExecutorSpec.groovy
deleted file mode 100644 (file)
index 751c97a..0000000
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- *  ============LICENSE_START=======================================================
- *  Copyright (C) 2022-2023 Nordix Foundation
- *  ================================================================================
- *  Licensed under the Apache License, Version 2.0 (the "License");
- *  you may not use this file except in compliance with the License.
- *  You may obtain a copy of the License at
- *
- *        http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- *  SPDX-License-Identifier: Apache-2.0
- *  ============LICENSE_END=========================================================
- */
-
-package org.onap.cps.ncmp.impl.inventory.sync
-
-
-import org.springframework.beans.factory.annotation.Autowired
-import org.springframework.boot.test.context.SpringBootTest
-import spock.lang.Specification
-
-import java.util.concurrent.TimeoutException
-import java.util.function.Supplier
-
-@SpringBootTest(classes = AsyncTaskExecutor)
-class AsyncTaskExecutorSpec extends Specification {
-
-    @Autowired
-    AsyncTaskExecutor objectUnderTest
-    def mockTaskSupplier = Mock(Supplier<Object>)
-
-    def 'Parallelism level configuration.'() {
-        expect: 'Parallelism level is configured with the correct value'
-            assert objectUnderTest.getAsyncTaskParallelismLevel() == 3
-    }
-
-    def 'Task completion with #caseDescriptor.'() {
-        when: 'task completion is handled'
-            def irrelevantResponse = null
-            objectUnderTest.handleTaskCompletion(irrelevantResponse, exception);
-        then: 'any exception is swallowed by the task completion (logged)'
-            noExceptionThrown()
-        where: 'following cases are tested'
-            caseDescriptor         | exception
-            'no exception'         | null
-            'time out exception'   | new TimeoutException("time-out")
-            'unexpected exception' | new Exception("some exception")
-    }
-
-    def 'Task execution.'() {
-        when: 'a task is submitted for execution'
-            objectUnderTest.executeTask(() -> mockTaskSupplier, 0)
-        then: 'the task submission is successful'
-            noExceptionThrown()
-    }
-
-}
index 92f4b38..98f3cc0 100644 (file)
@@ -39,8 +39,6 @@ import org.onap.cps.ncmp.impl.inventory.sync.lcm.LcmEventsCmHandleStateHandler
 import org.slf4j.LoggerFactory
 import spock.lang.Specification
 
-import java.util.concurrent.atomic.AtomicInteger
-
 import static org.onap.cps.ncmp.api.inventory.models.LockReasonCategory.MODULE_SYNC_FAILED
 import static org.onap.cps.ncmp.api.inventory.models.LockReasonCategory.MODULE_UPGRADE
 import static org.onap.cps.ncmp.api.inventory.models.LockReasonCategory.MODULE_UPGRADE_FAILED
@@ -70,8 +68,6 @@ class ModuleSyncTasksSpec extends Specification {
             .getOrCreateHazelcastInstance(new Config('hazelcastInstanceName'))
             .getMap('mapInstanceName')
 
-    def batchCount = new AtomicInteger(5)
-
     def objectUnderTest = new ModuleSyncTasks(mockInventoryPersistence, mockSyncUtils, mockModuleSyncService,
             mockLcmEventsCmHandleStateHandler, moduleSyncStartedOnCmHandles)
 
@@ -87,7 +83,7 @@ class ModuleSyncTasksSpec extends Specification {
             mockInventoryPersistence.getYangModelCmHandle('cm-handle-1') >> cmHandle1
             mockInventoryPersistence.getYangModelCmHandle('cm-handle-2') >> cmHandle2
         when: 'module sync poll is executed'
-            objectUnderTest.performModuleSync(['cm-handle-1', 'cm-handle-2'], batchCount)
+            objectUnderTest.performModuleSync(['cm-handle-1', 'cm-handle-2'])
         then: 'module sync service is invoked for each cm handle'
             1 * mockModuleSyncService.syncAndCreateSchemaSetAndAnchor(_) >> { args -> assert args[0].id == 'cm-handle-1' }
             1 * mockModuleSyncService.syncAndCreateSchemaSetAndAnchor(_) >> { args -> assert args[0].id == 'cm-handle-2' }
@@ -95,8 +91,6 @@ class ModuleSyncTasksSpec extends Specification {
             1 * mockLcmEventsCmHandleStateHandler.updateCmHandleStateBatch(_) >> { args ->
                 assertBatch(args, ['cm-handle-1', 'cm-handle-2'], CmHandleState.READY)
             }
-        and: 'batch count is decremented by one'
-            assert batchCount.get() == 4
     }
 
     def 'Handle CM handle failure during #scenario and log MODULE_UPGRADE lock reason'() {
@@ -108,15 +102,13 @@ class ModuleSyncTasksSpec extends Specification {
             mockModuleSyncService.syncAndCreateSchemaSetAndAnchor(_) >> { throw new Exception('some exception') }
             mockModuleSyncService.syncAndUpgradeSchemaSet(_) >> { throw new Exception('some exception') }
         when: 'module sync is executed'
-            objectUnderTest.performModuleSync(['cm-handle'], batchCount)
+            objectUnderTest.performModuleSync(['cm-handle'])
         then: 'lock reason is updated with number of attempts'
             1 * mockSyncUtils.updateLockReasonWithAttempts(_, expectedLockReasonCategory, 'some exception')
         and: 'the state handler is called to update the state to LOCKED'
             1 * mockLcmEventsCmHandleStateHandler.updateCmHandleStateBatch(_) >> { args ->
                 assertBatch(args, ['cm-handle'], CmHandleState.LOCKED)
             }
-        and: 'batch count is decremented by one'
-            assert batchCount.get() == 4
         where:
             scenario         | lockReasonCategory    | lockReasonDetails                              || expectedLockReasonCategory
             'module sync'    | MODULE_SYNC_FAILED    | 'some lock details'                            || MODULE_SYNC_FAILED
@@ -132,7 +124,7 @@ class ModuleSyncTasksSpec extends Specification {
         and: 'a cm handle in advised state'
             mockInventoryPersistence.getYangModelCmHandle('cm-handle-3') >> cmHandleByIdAndState('cm-handle-3', CmHandleState.ADVISED)
         when: 'module sync poll is executed'
-            objectUnderTest.performModuleSync(['cm-handle-1', 'cm-handle-2', 'cm-handle-3'], batchCount)
+            objectUnderTest.performModuleSync(['cm-handle-1', 'cm-handle-2', 'cm-handle-3'])
         then: 'no exception is thrown'
             noExceptionThrown()
         and: 'the deleted cm-handle did not sync'
@@ -176,7 +168,7 @@ class ModuleSyncTasksSpec extends Specification {
         and: 'entry in progress map for other cm handle'
             moduleSyncStartedOnCmHandles.put('other-cm-handle', 'started')
         when: 'module sync poll is executed'
-            objectUnderTest.performModuleSync(['cm-handle-1'], batchCount)
+            objectUnderTest.performModuleSync(['cm-handle-1'])
         then: 'module sync service is invoked for cm handle'
             1 * mockModuleSyncService.syncAndCreateSchemaSetAndAnchor(_) >> { args -> assert args[0].id == 'cm-handle-1' }
         and: 'the entry for other cm handle is still in the progress map'
@@ -201,7 +193,7 @@ class ModuleSyncTasksSpec extends Specification {
             cmHandle.compositeState.setLockReason(CompositeState.LockReason.builder().lockReasonCategory(lockReasonCategory).build())
             mockInventoryPersistence.getYangModelCmHandle('cm-handle') >> cmHandle
         when: 'module sync is executed'
-            objectUnderTest.performModuleSync(['cm-handle'], batchCount)
+            objectUnderTest.performModuleSync(['cm-handle'])
         then: 'the module sync service should attempt to sync and upgrade the CM handle'
             1 * mockModuleSyncService.syncAndUpgradeSchemaSet(_) >> { args ->
                 assert args[0].id == 'cm-handle'
index a9b88c2..68aa6a1 100644 (file)
@@ -1,6 +1,6 @@
 /*
  *  ============LICENSE_START=======================================================
- *  Copyright (C) 2022-2024 Nordix Foundation
+ *  Copyright (C) 2022-2025 Nordix Foundation
  *  Modifications Copyright (C) 2022 Bell Canada
  *  ================================================================================
  *  Licensed under the Apache License, Version 2.0 (the "License");
 package org.onap.cps.ncmp.impl.inventory.sync
 
 import com.hazelcast.map.IMap
+import java.util.concurrent.ArrayBlockingQueue
 import org.onap.cps.ncmp.impl.inventory.models.YangModelCmHandle
-import org.onap.cps.ncmp.impl.utils.Sleeper
-import org.onap.cps.api.model.DataNode
 import spock.lang.Specification
 
-import java.util.concurrent.ArrayBlockingQueue
-import java.util.concurrent.locks.Lock
-
 class ModuleSyncWatchdogSpec extends Specification {
 
     def mockModuleOperationsUtils = Mock(ModuleOperationsUtils)
@@ -42,17 +38,9 @@ class ModuleSyncWatchdogSpec extends Specification {
 
     def mockModuleSyncTasks = Mock(ModuleSyncTasks)
 
-    def spiedAsyncTaskExecutor = Spy(AsyncTaskExecutor)
-
     def mockCpsAndNcmpLock = Mock(IMap<String,String>)
 
-    def spiedSleeper = Spy(Sleeper)
-
-    def objectUnderTest = new ModuleSyncWatchdog(mockModuleOperationsUtils, moduleSyncWorkQueue , mockModuleSyncStartedOnCmHandles, mockModuleSyncTasks, spiedAsyncTaskExecutor, mockCpsAndNcmpLock, spiedSleeper)
-
-    void setup() {
-        spiedAsyncTaskExecutor.setupThreadPool()
-    }
+    def objectUnderTest = new ModuleSyncWatchdog(mockModuleOperationsUtils, moduleSyncWorkQueue , mockModuleSyncStartedOnCmHandles, mockModuleSyncTasks, mockCpsAndNcmpLock)
 
     def 'Module sync advised cm handles with #scenario.'() {
         given: 'module sync utilities returns #numberOfAdvisedCmHandles advised cm handles'
@@ -61,12 +49,10 @@ class ModuleSyncWatchdogSpec extends Specification {
             mockModuleOperationsUtils.getCmHandlesThatFailedModelSyncOrUpgrade() >> []
         and: 'the work queue can be locked'
             mockCpsAndNcmpLock.tryLock('workQueueLock') >> true
-        and: 'the executor has enough available threads'
-            spiedAsyncTaskExecutor.getAsyncTaskParallelismLevel() >> 3
         when: ' module sync is started'
             objectUnderTest.moduleSyncAdvisedCmHandles()
         then: 'it performs #expectedNumberOfTaskExecutions tasks'
-            expectedNumberOfTaskExecutions * spiedAsyncTaskExecutor.executeTask(*_)
+            expectedNumberOfTaskExecutions * mockModuleSyncTasks.performModuleSync(*_)
         and: 'the executing thread is unlocked'
             1 * mockCpsAndNcmpLock.unlock('workQueueLock')
         where: 'the following parameter are used'
@@ -84,12 +70,10 @@ class ModuleSyncWatchdogSpec extends Specification {
             mockModuleOperationsUtils.getAdvisedCmHandleIds() >> createCmHandleIds(1)
         and: 'the work queue can be locked'
             mockCpsAndNcmpLock.tryLock('workQueueLock') >> true
-        and: 'the executor first has no threads but has one thread on the second attempt'
-            spiedAsyncTaskExecutor.getAsyncTaskParallelismLevel() >>> [ 0, 1 ]
         when: ' module sync is started'
             objectUnderTest.moduleSyncAdvisedCmHandles()
         then: 'it performs one task'
-            1 * spiedAsyncTaskExecutor.executeTask(*_)
+            1 * mockModuleSyncTasks.performModuleSync(*_)
     }
 
     def 'Module sync advised cm handle already handled by other thread.'() {
@@ -97,27 +81,21 @@ class ModuleSyncWatchdogSpec extends Specification {
             mockModuleOperationsUtils.getAdvisedCmHandleIds() >> createCmHandleIds(1)
         and: 'the work queue can be locked'
             mockCpsAndNcmpLock.tryLock('workQueueLock') >> true
-        and: 'the executor has a thread available'
-            spiedAsyncTaskExecutor.getAsyncTaskParallelismLevel() >> 1
         and: 'the semaphore cache indicates the cm handle is already being processed'
             mockModuleSyncStartedOnCmHandles.putIfAbsent(*_) >> 'Started'
-        when: ' module sync is started'
+        when: 'module sync is started'
             objectUnderTest.moduleSyncAdvisedCmHandles()
         then: 'it does NOT execute a task to process the (empty) batch'
-            0 * spiedAsyncTaskExecutor.executeTask(*_)
+            0 * mockModuleSyncTasks.performModuleSync(*_)
     }
 
     def 'Module sync with previous cm handle(s) left in work queue.'() {
         given: 'there is still a cm handle in the queue'
             moduleSyncWorkQueue.offer('ch-1')
-        and: 'sync utilities returns many advise cm handles'
-            mockModuleOperationsUtils.getAdvisedCmHandleIds() >> createCmHandleIds(500)
-        and: 'the executor has plenty threads available'
-            spiedAsyncTaskExecutor.getAsyncTaskParallelismLevel() >> 10
-        when: ' module sync is started'
+        when: 'module sync is started'
             objectUnderTest.moduleSyncAdvisedCmHandles()
         then: 'it does executes only one task to process the remaining handle in the queue'
-            1 * spiedAsyncTaskExecutor.executeTask(*_)
+            1 * mockModuleSyncTasks.performModuleSync(*_)
     }
 
     def 'Reset failed cm handles.'() {
@@ -147,15 +125,6 @@ class ModuleSyncWatchdogSpec extends Specification {
             true    || false                   || 1
     }
 
-    def 'Sleeper gets interrupted.'() {
-        given: 'sleeper gets interrupted'
-            spiedSleeper.haveALittleRest(_) >> { throw new InterruptedException() }
-        when: 'the watchdog attempts to sleep to save cpu cycles'
-            objectUnderTest.preventBusyWait()
-        then: 'no exception is thrown'
-            noExceptionThrown()
-    }
-
     def createCmHandleIds(numberOfCmHandles) {
         return (numberOfCmHandles > 0) ? (1..numberOfCmHandles).collect { 'ch-'+it } : []
     }
index 12db639..3276ceb 100644 (file)
@@ -77,10 +77,6 @@ ncmp:
         trust-level:
             dmi-availability-watchdog-ms: 30000
 
-    modules-sync-watchdog:
-        async-executor:
-            parallelism-level: 3
-
     policy-executor:
         enabled: true
         defaultDecision: "some default decision"
index 30598df..8ede492 100644 (file)
@@ -189,10 +189,6 @@ ncmp:
     trust-level:
       dmi-availability-watchdog-ms: 30000
 
-  modules-sync-watchdog:
-    async-executor:
-      parallelism-level: 2
-
   model-loader:
     maximum-attempt-count: 20