Performance Improvement: Watchdog Parallel execution with configuration 34/130534/3
authorsourabh_sourabh <sourabh.sourabh@est.tech>
Wed, 31 Aug 2022 10:58:09 +0000 (11:58 +0100)
committerToine Siebelink <toine.siebelink@est.tech>
Thu, 1 Sep 2022 08:18:17 +0000 (08:18 +0000)
- Introduced AsyncSyncExecutor to get task and execute it with
  configured number of parallel threads.
- Number of parallel thread can be configured from application.yml.
- AsyncTaskExecutorSpec is added
- Fixed existing grovvy test now async task would be submitted.

Issue-ID: CPS-1200
Change-Id: I58c0368b945c90e619c2acfc7458ba58de047484
Signed-off-by: ToineSiebelink <toine.siebelink@est.tech>
Signed-off-by: sourabh_sourabh <sourabh.sourabh@est.tech>
cps-application/src/main/resources/application.yml
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncTasks.java
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdog.java
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/executor/AsyncTaskExecutor.java [new file with mode: 0644]
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncTasksSpec.groovy
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdogSpec.groovy
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/executor/AsyncTaskExecutorSpec.groovy [new file with mode: 0644]
cps-ncmp-service/src/test/resources/application.yml

index f7fa735..9b6f41e 100644 (file)
@@ -172,3 +172,7 @@ timers:
         sleep-time-ms: 300000\r
     cm-handle-data-sync:\r
         sleep-time-ms: 30000\r
+\r
+modules-sync-watchdog:\r
+    async-executor:\r
+        parallelism-level: 10
\ No newline at end of file
index 5e26650..597e2ba 100644 (file)
@@ -25,6 +25,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.onap.cps.ncmp.api.impl.event.lcm.LcmEventsCmHandleStateHandler;
@@ -52,28 +53,35 @@ public class ModuleSyncTasks {
      * Perform module sync on a batch of cm handles.
      *
      * @param cmHandlesAsDataNodes a batch of Data nodes representing cm handles to perform module sync on
+     * @param batchCounter the number of batches currently being processed, will be decreased when task is finished
+     *                     or fails
      * @return completed future to handle post-processing
      */
-    public CompletableFuture<Void> performModuleSync(final Collection<DataNode> cmHandlesAsDataNodes) {
-        final Map<YangModelCmHandle, CmHandleState> cmHandelStatePerCmHandle = new HashMap<>();
-        for (final DataNode cmHandleAsDataNode : cmHandlesAsDataNodes) {
-            final String cmHandleId = String.valueOf(cmHandleAsDataNode.getLeaves().get("id"));
-            final YangModelCmHandle yangModelCmHandle =
-                YangDataConverter.convertCmHandleToYangModel(cmHandleAsDataNode, cmHandleId);
-            final CompositeState compositeState = inventoryPersistence.getCmHandleState(cmHandleId);
-            try {
-                moduleSyncService.deleteSchemaSetIfExists(cmHandleId);
-                moduleSyncService.syncAndCreateSchemaSetAndAnchor(yangModelCmHandle);
-                cmHandelStatePerCmHandle.put(yangModelCmHandle, CmHandleState.READY);
-            } catch (final Exception e) {
-                syncUtils.updateLockReasonDetailsAndAttempts(compositeState,
-                    LockReasonCategory.LOCKED_MODULE_SYNC_FAILED, e.getMessage());
-                setCmHandleStateLocked(yangModelCmHandle, compositeState.getLockReason());
-                cmHandelStatePerCmHandle.put(yangModelCmHandle, CmHandleState.LOCKED);
+    public CompletableFuture<Void> performModuleSync(final Collection<DataNode> cmHandlesAsDataNodes,
+                                                     final AtomicInteger batchCounter) {
+        try {
+            final Map<YangModelCmHandle, CmHandleState> cmHandelStatePerCmHandle = new HashMap<>();
+            for (final DataNode cmHandleAsDataNode : cmHandlesAsDataNodes) {
+                final String cmHandleId = String.valueOf(cmHandleAsDataNode.getLeaves().get("id"));
+                final YangModelCmHandle yangModelCmHandle =
+                        YangDataConverter.convertCmHandleToYangModel(cmHandleAsDataNode, cmHandleId);
+                final CompositeState compositeState = inventoryPersistence.getCmHandleState(cmHandleId);
+                try {
+                    moduleSyncService.deleteSchemaSetIfExists(cmHandleId);
+                    moduleSyncService.syncAndCreateSchemaSetAndAnchor(yangModelCmHandle);
+                    cmHandelStatePerCmHandle.put(yangModelCmHandle, CmHandleState.READY);
+                } catch (final Exception e) {
+                    syncUtils.updateLockReasonDetailsAndAttempts(compositeState,
+                            LockReasonCategory.LOCKED_MODULE_SYNC_FAILED, e.getMessage());
+                    setCmHandleStateLocked(yangModelCmHandle, compositeState.getLockReason());
+                    cmHandelStatePerCmHandle.put(yangModelCmHandle, CmHandleState.LOCKED);
+                }
+                log.debug("{} is now in {} state", cmHandleId, compositeState.getCmHandleState().name());
             }
-            log.debug("{} is now in {} state", cmHandleId, compositeState.getCmHandleState().name());
+            updateCmHandlesStateBatch(cmHandelStatePerCmHandle);
+        } finally {
+            batchCounter.getAndDecrement();
         }
-        updateCmHandlesStateBatch(cmHandelStatePerCmHandle);
         return COMPLETED_FUTURE;
     }
 
index 8074fe6..73954c3 100644 (file)
@@ -27,36 +27,50 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.onap.cps.ncmp.api.impl.yangmodels.YangModelCmHandle;
+import org.onap.cps.ncmp.api.inventory.sync.executor.AsyncTaskExecutor;
 import org.onap.cps.spi.model.DataNode;
 import org.springframework.scheduling.annotation.Scheduled;
-import org.springframework.stereotype.Component;
+import org.springframework.stereotype.Service;
 
 @Slf4j
 @RequiredArgsConstructor
-@Component
+@Service
 public class ModuleSyncWatchdog {
 
     private final SyncUtils syncUtils;
     private final BlockingQueue<DataNode> moduleSyncWorkQueue;
     private final Map<String, Object> moduleSyncStartedOnCmHandles;
     private final ModuleSyncTasks moduleSyncTasks;
-
+    private final AsyncTaskExecutor asyncTaskExecutor;
     private static final int MODULE_SYNC_BATCH_SIZE = 100;
     private static final long PREVENT_CPU_BURN_WAIT_TIME_MILLIS = 10;
     private static final String VALUE_FOR_HAZELCAST_IN_PROGRESS_MAP = "Started";
+    private static final long ASYNC_TASK_TIMEOUT_IN_MILLISECONDS = TimeUnit.MINUTES.toMillis(5);
+    @Getter
+    private AtomicInteger batchCounter = new AtomicInteger(1);
 
     /**
-     * Execute Cm Handle poll which changes the cm handle state from 'ADVISED' to 'READY'.
+     * Check DB for any cm handles in 'ADVISED' state.
+     * Queue and create batches to process them asynchronously.
+     * This method will only finish when there are no more 'ADVISED' cm handles in the DB.
+     * This method wil be triggered on a configurable interval
      */
     @Scheduled(fixedDelayString = "${timers.advised-modules-sync.sleep-time-ms:5000}")
     public void moduleSyncAdvisedCmHandles() {
         populateWorkQueueIfNeeded();
-        while (!moduleSyncWorkQueue.isEmpty()) {
+        final int asyncTaskParallelismLevel = asyncTaskExecutor.getAsyncTaskParallelismLevel();
+        while (!moduleSyncWorkQueue.isEmpty() && batchCounter.get() <= asyncTaskParallelismLevel) {
+            batchCounter.getAndIncrement();
             final Collection<DataNode> nextBatch = prepareNextBatch();
-            moduleSyncTasks.performModuleSync(nextBatch);
+            asyncTaskExecutor.executeTask(() ->
+                            moduleSyncTasks.performModuleSync(nextBatch, batchCounter),
+                    ASYNC_TASK_TIMEOUT_IN_MILLISECONDS
+            );
             preventBusyWait();
         }
     }
@@ -71,8 +85,6 @@ public class ModuleSyncWatchdog {
     }
 
     private void preventBusyWait() {
-        // This method isn't really needed until CPS-1200 Performance Improvement: Watchdog Parallel execution
-        // but leaving here to minimize impacts on this class for that Jira
         try {
             TimeUnit.MILLISECONDS.sleep(PREVENT_CPU_BURN_WAIT_TIME_MILLIS);
         } catch (final InterruptedException e) {
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/executor/AsyncTaskExecutor.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/executor/AsyncTaskExecutor.java
new file mode 100644 (file)
index 0000000..7b4d2cf
--- /dev/null
@@ -0,0 +1,77 @@
+/*
+ *  ============LICENSE_START=======================================================
+ *  Copyright (C) 2022 Nordix Foundation
+ *  ================================================================================
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ *  SPDX-License-Identifier: Apache-2.0
+ *  ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.api.inventory.sync.executor;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Supplier;
+import javax.annotation.PostConstruct;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Service;
+
+@Slf4j
+@Service
+public class AsyncTaskExecutor {
+
+    @Value("${modules-sync-watchdog.async-executor.parallelism-level:10}")
+    @Getter
+    private int asyncTaskParallelismLevel;
+    private ExecutorService executorService;
+    private static final int DEFAULT_PARALLELISM_LEVEL = 10;
+
+    /**
+     *  Set up executor service with thread-pool size as per configuration parameter.
+     *  If modules-sync-watchdog.async-executor.parallelism-level not set a default of 10 threads will be applied.
+     */
+    @PostConstruct
+    public void setupThreadPool() {
+        executorService = Executors.newWorkStealingPool(
+                asyncTaskParallelismLevel == 0 ? DEFAULT_PARALLELISM_LEVEL : asyncTaskParallelismLevel);
+    }
+
+    /**
+     * Execute supplied task asynchronously.
+     *
+     * @param taskSupplier    functional method is get() task need to executed asynchronously
+     * @param timeOutInMillis the task timeout value in milliseconds
+     */
+    public void executeTask(final Supplier<Object> taskSupplier, final long timeOutInMillis) {
+        CompletableFuture.supplyAsync(taskSupplier::get, executorService)
+                .orTimeout(timeOutInMillis, MILLISECONDS)
+                .whenCompleteAsync(this::handleTaskCompletion);
+    }
+
+    private void handleTaskCompletion(final Object response, final Throwable throwable) {
+        if (throwable != null) {
+            if (throwable instanceof TimeoutException) {
+                log.warn("Async task didn't completed within the required time.");
+            } else {
+                log.debug("Watchdog async batch failed. caused by : {}", throwable.getMessage());
+            }
+        }
+    }
+}
index 291ba96..a233996 100644 (file)
@@ -30,6 +30,7 @@ import org.onap.cps.ncmp.api.inventory.InventoryPersistence
 import org.onap.cps.ncmp.api.inventory.LockReasonCategory
 import org.onap.cps.spi.model.DataNode
 import spock.lang.Specification
+import java.util.concurrent.atomic.AtomicInteger
 
 class ModuleSyncTasksSpec extends Specification {
 
@@ -41,6 +42,8 @@ class ModuleSyncTasksSpec extends Specification {
 
     def mockLcmEventsCmHandleStateHandler = Mock(LcmEventsCmHandleStateHandler)
 
+    def batchCount = new AtomicInteger(5)
+
     def objectUnderTest = new ModuleSyncTasks(mockInventoryPersistence, mockSyncUtils, mockModuleSyncService, mockLcmEventsCmHandleStateHandler)
 
     def 'Module Sync ADVISED cm handles.'() {
@@ -50,15 +53,17 @@ class ModuleSyncTasksSpec extends Specification {
         and: 'the inventory persistence cm handle returns a ADVISED state for the any handle'
             mockInventoryPersistence.getCmHandleState(_) >> new CompositeState(cmHandleState: CmHandleState.ADVISED)
         when: 'module sync poll is executed'
-            objectUnderTest.performModuleSync([cmHandle1, cmHandle2])
+            objectUnderTest.performModuleSync([cmHandle1, cmHandle2], batchCount)
         then: 'module sync service deletes schemas set of each cm handle if it already exists'
             1 * mockModuleSyncService.deleteSchemaSetIfExists('cm-handle-1')
             1 * mockModuleSyncService.deleteSchemaSetIfExists('cm-handle-2')
         and: 'module sync service is invoked for each cm handle'
-            1 * mockModuleSyncService.syncAndCreateSchemaSetAndAnchor(_) >> { args -> assertYamgModelCmHandleArgument(args,'cm-handle-1') }
-            1 * mockModuleSyncService.syncAndCreateSchemaSetAndAnchor(_) >> { args -> assertYamgModelCmHandleArgument(args,'cm-handle-2') }
+            1 * mockModuleSyncService.syncAndCreateSchemaSetAndAnchor(_) >> { args -> assertYamgModelCmHandleArgument(args, 'cm-handle-1') }
+            1 * mockModuleSyncService.syncAndCreateSchemaSetAndAnchor(_) >> { args -> assertYamgModelCmHandleArgument(args, 'cm-handle-2') }
         and: 'the state handler is called for the both cm handles'
             2 * mockLcmEventsCmHandleStateHandler.updateCmHandleState(_, CmHandleState.READY)
+        and: 'batch count is decremented by one'
+            assert batchCount.get() == 4
     }
 
     def 'Module Sync ADVISED cm handle with failure during sync.'() {
@@ -70,11 +75,13 @@ class ModuleSyncTasksSpec extends Specification {
         and: 'module sync service attempts to sync the cm handle and throws an exception'
             1 * mockModuleSyncService.syncAndCreateSchemaSetAndAnchor(*_) >> { throw new Exception('some exception') }
         when: 'module sync is executed'
-            objectUnderTest.performModuleSync([cmHandle])
+            objectUnderTest.performModuleSync([cmHandle], batchCount)
         then: 'update lock reason, details and attempts is invoked'
-            1 * mockSyncUtils.updateLockReasonDetailsAndAttempts(cmHandleState, LockReasonCategory.LOCKED_MODULE_SYNC_FAILED ,'some exception')
+            1 * mockSyncUtils.updateLockReasonDetailsAndAttempts(cmHandleState, LockReasonCategory.LOCKED_MODULE_SYNC_FAILED'some exception')
         and: 'the state handler is called to update the state to LOCKED'
             1 * mockLcmEventsCmHandleStateHandler.updateCmHandleState(_, CmHandleState.LOCKED)
+        and: 'batch count is decremented by one'
+            assert batchCount.get() == 4
     }
 
     def 'Reset failed CM Handles #scenario.'() {
@@ -90,14 +97,14 @@ class ModuleSyncTasksSpec extends Specification {
         then: 'updated to state "ADVISED" from "READY" is called as often as there are cm handles ready for retry'
             expectedNumberOfInvocationsToSaveCmHandleState * mockLcmEventsCmHandleStateHandler.updateCmHandleState(_, CmHandleState.ADVISED)
         where:
-            scenario                        | isReadyForRetry         || expectedNumberOfInvocationsToSaveCmHandleState
-            'retry locked cm handle once'   | [true, false]           || 1
-            'retry locked cm handle twice'  | [true, true]            || 2
-            'do not retry locked cm handle' | [false, false]          || 0
+            scenario                        | isReadyForRetry || expectedNumberOfInvocationsToSaveCmHandleState
+            'retry locked cm handle once'   | [true, false]   || 1
+            'retry locked cm handle twice'  | [true, true]    || 2
+            'do not retry locked cm handle' | [false, false]  || 0
     }
 
     def advisedCmHandleAsDataNode(cmHandleId) {
-        return new DataNode(anchorName:cmHandleId, leaves:['id':cmHandleId, 'cm-handle-state':'ADVISED'])
+        return new DataNode(anchorName: cmHandleId, leaves: ['id': cmHandleId, 'cm-handle-state': 'ADVISED'])
     }
 
     def assertYamgModelCmHandleArgument(args, expectedCmHandleId) {
index 43f492d..e5240c0 100644 (file)
@@ -22,7 +22,7 @@
 package org.onap.cps.ncmp.api.inventory.sync
 
 import org.onap.cps.ncmp.api.impl.yangmodels.YangModelCmHandle
-
+import org.onap.cps.ncmp.api.inventory.sync.executor.AsyncTaskExecutor
 import java.util.concurrent.ArrayBlockingQueue
 import java.util.concurrent.BlockingQueue
 import org.onap.cps.spi.model.DataNode
@@ -34,28 +34,38 @@ class ModuleSyncWatchdogSpec extends Specification {
 
     def static testQueueCapacity = 50 + 2 * ModuleSyncWatchdog.MODULE_SYNC_BATCH_SIZE
 
-    BlockingQueue<DataNode> moduleSyncWorkQueue = new ArrayBlockingQueue(testQueueCapacity)
+    def moduleSyncWorkQueue = new ArrayBlockingQueue(testQueueCapacity)
 
     def moduleSyncStartedOnCmHandles = [:]
 
     def mockModuleSyncTasks = Mock(ModuleSyncTasks)
 
-    def objectUnderTest = new ModuleSyncWatchdog(mockSyncUtils, moduleSyncWorkQueue , moduleSyncStartedOnCmHandles, mockModuleSyncTasks)
+    def spiedAsyncTaskExecutor = Spy(AsyncTaskExecutor)
+
+    def objectUnderTest = new ModuleSyncWatchdog(mockSyncUtils, moduleSyncWorkQueue , moduleSyncStartedOnCmHandles,
+            mockModuleSyncTasks, spiedAsyncTaskExecutor)
+
+    void setup() {
+        spiedAsyncTaskExecutor.setupThreadPool();
+    }
 
-    def 'Module sync #scenario , #numberOfAdvisedCmHandles advised cm handles.'() {
+    def 'Module sync advised cm handles with #scenario.'() {
         given: 'sync utilities returns #numberOfAdvisedCmHandles advised cm handles'
             mockSyncUtils.getAdvisedCmHandles() >> createDataNodes(numberOfAdvisedCmHandles)
+        and: 'the executor has #parallelismLevel available threads'
+            spiedAsyncTaskExecutor.getAsyncTaskParallelismLevel() >> parallelismLevel
         when: ' module sync is started'
             objectUnderTest.moduleSyncAdvisedCmHandles()
         then: 'it performs #expectedNumberOfTaskExecutions tasks'
-            expectedNumberOfTaskExecutions * mockModuleSyncTasks.performModuleSync(_)
-        where:
-            scenario              |  numberOfAdvisedCmHandles                                         || expectedNumberOfTaskExecutions
-            'less then 1 batch'   | 1                                                                 || 1
-            'exactly 1 batch'     | ModuleSyncWatchdog.MODULE_SYNC_BATCH_SIZE                         || 1
-            '2 batches'           | 2 * ModuleSyncWatchdog.MODULE_SYNC_BATCH_SIZE                     || 2
-            'queue capacity'      | testQueueCapacity                                                 || 3
-            'over queue capacity' | testQueueCapacity + 2 * ModuleSyncWatchdog.MODULE_SYNC_BATCH_SIZE || 3
+            expectedNumberOfTaskExecutions * spiedAsyncTaskExecutor.executeTask(*_)
+        where: ' the following parameter are used'
+            scenario              | parallelismLevel | numberOfAdvisedCmHandles                                          || expectedNumberOfTaskExecutions
+            'less then 1 batch'   | 9                | 1                                                                 || 1
+            'exactly 1 batch'     | 9                | ModuleSyncWatchdog.MODULE_SYNC_BATCH_SIZE                         || 1
+            '2 batches'           | 9                | 2 * ModuleSyncWatchdog.MODULE_SYNC_BATCH_SIZE                     || 2
+            'queue capacity'      | 9                | testQueueCapacity                                                 || 3
+            'over queue capacity' | 9                | testQueueCapacity + 2 * ModuleSyncWatchdog.MODULE_SYNC_BATCH_SIZE || 3
+            'not enough threads'  | 2                | testQueueCapacity                                                 || 2
     }
 
     def 'Reset failed cm handles.'() {
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/executor/AsyncTaskExecutorSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/executor/AsyncTaskExecutorSpec.groovy
new file mode 100644 (file)
index 0000000..ba1820e
--- /dev/null
@@ -0,0 +1,61 @@
+/*
+ *  ============LICENSE_START=======================================================
+ *  Copyright (C) 2022 Nordix Foundation
+ *  ================================================================================
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ *  SPDX-License-Identifier: Apache-2.0
+ *  ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.api.inventory.sync.executor
+
+import org.springframework.beans.factory.annotation.Autowired
+import org.springframework.boot.test.context.SpringBootTest
+import spock.lang.Specification
+import java.util.concurrent.TimeoutException
+import java.util.function.Supplier
+
+@SpringBootTest(classes = AsyncTaskExecutor)
+class AsyncTaskExecutorSpec extends Specification {
+
+    @Autowired
+    AsyncTaskExecutor objectUnderTest
+    def mockTaskSupplier = Mock(Supplier<Object>)
+
+    def 'Parallelism level configuration.'() {
+        expect: 'Parallelism level is configured with the correct value'
+            assert objectUnderTest.getAsyncTaskParallelismLevel() == 3
+    }
+
+    def 'Task completion with #caseDescriptor.'() {
+        when: 'task completion is handled'
+            def irrelevantResponse = null
+            objectUnderTest.handleTaskCompletion(irrelevantResponse, exception);
+        then: 'any exception is swallowed by the task completion (logged)'
+            noExceptionThrown()
+        where: 'following cases are tested'
+            caseDescriptor         | exception
+            'no exception'         | null
+            'time out exception'   | new TimeoutException("time-out")
+            'unexpected exception' | new Exception("some exception")
+    }
+
+    def 'Task execution.'() {
+        when: 'a task is submitted for execution'
+            objectUnderTest.executeTask(() -> mockTaskSupplier, 0)
+        then: 'the task submission is successful'
+            noExceptionThrown()
+    }
+
+}
index c23926e..03d70c2 100644 (file)
@@ -23,3 +23,6 @@ dmi:
     api:
         base-path: dmi
 
+modules-sync-watchdog:
+    async-executor:
+        parallelism-level: 3
\ No newline at end of file