2  *  ============LICENSE_START=======================================================
 
   3  *  Copyright (C) 2022-2023 Nordix Foundation
 
   4  *  Modifications Copyright (C) 2022 Bell Canada
 
   5  *  ================================================================================
 
   6  *  Licensed under the Apache License, Version 2.0 (the "License");
 
   7  *  you may not use this file except in compliance with the License.
 
   8  *  You may obtain a copy of the License at
 
  10  *        http://www.apache.org/licenses/LICENSE-2.0
 
  12  *  Unless required by applicable law or agreed to in writing, software
 
  13  *  distributed under the License is distributed on an "AS IS" BASIS,
 
  14  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
  15  *  See the License for the specific language governing permissions and
 
  16  *  limitations under the License.
 
  18  *  SPDX-License-Identifier: Apache-2.0
 
  19  *  ============LICENSE_END=========================================================
 
  22 package org.onap.cps.ncmp.impl.inventory.sync
 
  24 import com.hazelcast.map.IMap
 
  25 import org.onap.cps.ncmp.impl.inventory.models.YangModelCmHandle
 
  26 import org.onap.cps.spi.model.DataNode
 
  27 import spock.lang.Specification
 
  29 import java.util.concurrent.ArrayBlockingQueue
 
  31 class ModuleSyncWatchdogSpec extends Specification {
 
  33     def mockSyncUtils = Mock(ModuleOperationsUtils)
 
  35     def static testQueueCapacity = 50 + 2 * ModuleSyncWatchdog.MODULE_SYNC_BATCH_SIZE
 
  37     def moduleSyncWorkQueue = new ArrayBlockingQueue(testQueueCapacity)
 
  39     def mockModuleSyncStartedOnCmHandles = Mock(IMap<String, Object>)
 
  41     def mockModuleSyncTasks = Mock(ModuleSyncTasks)
 
  43     def spiedAsyncTaskExecutor = Spy(AsyncTaskExecutor)
 
  45     def objectUnderTest = new ModuleSyncWatchdog(mockSyncUtils, moduleSyncWorkQueue , mockModuleSyncStartedOnCmHandles, mockModuleSyncTasks, spiedAsyncTaskExecutor)
 
  48         spiedAsyncTaskExecutor.setupThreadPool()
 
  51     def 'Module sync advised cm handles with #scenario.'() {
 
  52         given: 'sync utilities returns #numberOfAdvisedCmHandles advised cm handles'
 
  53             mockSyncUtils.getAdvisedCmHandles() >> createDataNodes(numberOfAdvisedCmHandles)
 
  54         and: 'the executor has enough available threads'
 
  55             spiedAsyncTaskExecutor.getAsyncTaskParallelismLevel() >> 3
 
  56         when: ' module sync is started'
 
  57             objectUnderTest.moduleSyncAdvisedCmHandles()
 
  58         then: 'it performs #expectedNumberOfTaskExecutions tasks'
 
  59             expectedNumberOfTaskExecutions * spiedAsyncTaskExecutor.executeTask(*_)
 
  60         where: 'the following parameter are used'
 
  61             scenario              | numberOfAdvisedCmHandles                                          || expectedNumberOfTaskExecutions
 
  62             'less then 1 batch'   | 1                                                                 || 1
 
  63             'exactly 1 batch'     | ModuleSyncWatchdog.MODULE_SYNC_BATCH_SIZE                         || 1
 
  64             '2 batches'           | 2 * ModuleSyncWatchdog.MODULE_SYNC_BATCH_SIZE                     || 2
 
  65             'queue capacity'      | testQueueCapacity                                                 || 3
 
  66             'over queue capacity' | testQueueCapacity + 2 * ModuleSyncWatchdog.MODULE_SYNC_BATCH_SIZE || 3
 
  69     def 'Module sync advised cm handles starts with no available threads.'() {
 
  70         given: 'sync utilities returns a advise cm handles'
 
  71             mockSyncUtils.getAdvisedCmHandles() >> createDataNodes(1)
 
  72         and: 'the executor first has no threads but has one thread on the second attempt'
 
  73             spiedAsyncTaskExecutor.getAsyncTaskParallelismLevel() >>> [ 0, 1 ]
 
  74         when: ' module sync is started'
 
  75             objectUnderTest.moduleSyncAdvisedCmHandles()
 
  76         then: 'it performs one task'
 
  77             1 * spiedAsyncTaskExecutor.executeTask(*_)
 
  80     def 'Module sync advised cm handles already handled.'() {
 
  81         given: 'sync utilities returns a advise cm handles'
 
  82             mockSyncUtils.getAdvisedCmHandles() >> createDataNodes(1)
 
  83         and: 'the executor has a thread available'
 
  84             spiedAsyncTaskExecutor.getAsyncTaskParallelismLevel() >> 1
 
  85         and: 'the semaphore cache indicates the cm handle is already being processed'
 
  86             mockModuleSyncStartedOnCmHandles.putIfAbsent(*_) >> 'Started'
 
  87         when: ' module sync is started'
 
  88             objectUnderTest.moduleSyncAdvisedCmHandles()
 
  89         then: 'it does NOT execute a task to process the (empty) batch'
 
  90             0 * spiedAsyncTaskExecutor.executeTask(*_)
 
  93     def 'Module sync with previous cm handle(s) left in work queue.'() {
 
  94         given: 'there is still a cm handle in the queue'
 
  95             moduleSyncWorkQueue.offer(new DataNode())
 
  96         and: 'sync utilities returns many advise cm handles'
 
  97             mockSyncUtils.getAdvisedCmHandles() >> createDataNodes(500)
 
  98         and: 'the executor has plenty threads available'
 
  99             spiedAsyncTaskExecutor.getAsyncTaskParallelismLevel() >> 10
 
 100         when: ' module sync is started'
 
 101             objectUnderTest.moduleSyncAdvisedCmHandles()
 
 102         then: 'it does executes only one task to process the remaining handle in the queue'
 
 103             1 * spiedAsyncTaskExecutor.executeTask(*_)
 
 106     def 'Reset failed cm handles.'() {
 
 107         given: 'sync utilities returns failed cm handles'
 
 108             def failedCmHandles = [new YangModelCmHandle()]
 
 109             mockSyncUtils.getCmHandlesThatFailedModelSyncOrUpgrade() >> failedCmHandles
 
 110         when: 'reset failed cm handles is started'
 
 111             objectUnderTest.resetPreviouslyFailedCmHandles()
 
 112         then: 'it is delegated to the module sync task (service)'
 
 113             1 * mockModuleSyncTasks.resetFailedCmHandles(failedCmHandles)
 
 116     def createDataNodes(numberOfDataNodes) {
 
 118         (1..numberOfDataNodes).each {dataNodes.add(new DataNode())}