2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2024 Nordix Foundation
4 * ================================================================================
5 * Licensed under the Apache License, Version 2.0 (the 'License');
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an 'AS IS' BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
17 * SPDX-License-Identifier: Apache-2.0
18 * ============LICENSE_END=========================================================
21 package org.onap.cps.integration.functional.ncmp
23 import io.micrometer.core.instrument.MeterRegistry
24 import org.onap.cps.integration.base.CpsIntegrationSpecBase
25 import org.onap.cps.ncmp.impl.inventory.sync.ModuleSyncWatchdog
26 import org.springframework.beans.factory.annotation.Autowired
27 import spock.util.concurrent.PollingConditions
29 import java.util.concurrent.Executors
30 import java.util.concurrent.TimeUnit
/**
 * Spock integration spec for the NCMP {@link ModuleSyncWatchdog}.
 * The features call moduleSyncAdvisedCmHandles() and populateWorkQueueIfNeeded() directly on the
 * watchdog and then inspect the module sync work queue and a set of Micrometer timers.
 */
32 class ModuleSyncWatchdogIntegrationSpec extends CpsIntegrationSpecBase {
// Watchdog instance exercised by every feature; it is assigned from moduleSyncWatchdog before use.
34 ModuleSyncWatchdog objectUnderTest
// Registry from which the CPS-2478 feature looks up its timers by name.
37 MeterRegistry meterRegistry
// Pool of two threads; the parallel-population features submit their populate-queue closure to it.
39 def executorService = Executors.newFixedThreadPool(2)
// Number of cm handles registered by the queue-population features (and deregistered afterwards).
40 def PARALLEL_SYNC_SAMPLE_SIZE = 100
// Point the spec at the watchdog instance that the features exercise.
43 objectUnderTest = moduleSyncWatchdog
// NOTE(review): the statements below look like per-feature tear-down — confirm against the enclosing fixture method.
// Deregister the sample-size batch of cm handles; the last argument (1) mirrors the one used when registering them.
48 deregisterSequenceOfCmHandles(DMI1_URL, PARALLEL_SYNC_SAMPLE_SIZE, 1)
// Empty the work queue so that entries from one feature cannot leak into the next.
49 moduleSyncWorkQueue.clear()
// Stop the executor, interrupting any populate task that is still running.
51 executorService.shutdownNow()
// Feature: after cm handles are registered, and without the test invoking the watchdog itself,
// the module sync work queue is expected to stay empty.
55 def 'Watchdog is disabled for test.'() {
// Register PARALLEL_SYNC_SAMPLE_SIZE cm handles with no module set tag; per the helper's name it does not wait for READY.
57 registerSequenceOfCmHandlesWithManyModuleReferencesButDoNotWaitForReady(DMI1_URL, NO_MODULE_SET_TAG, PARALLEL_SYNC_SAMPLE_SIZE, 1)
58 when: 'wait a while but less then the initial delay of 10 minutes'
60 then: 'the work queue remains empty'
// Nothing in this feature calls populateWorkQueueIfNeeded(), so any entry here would have come from a scheduled run.
61 assert moduleSyncWorkQueue.isEmpty()
// Feature: registers two batches of cm handles under two different module set tags, drives the
// module sync manually and then checks, via Micrometer timers, how often the database and DMI were hit.
64 def 'CPS-2478 Highlight module sync inefficiencies.'() {
65 given: 'register 250 cm handles with module set tag cps-2478-A'
67 def cmHandlesPerTag = 250
// Total used by the cleanup block to deregister everything this feature registered.
68 def totalCmHandles = numberOfTags * cmHandlesPerTag
70 registerSequenceOfCmHandlesWithManyModuleReferencesButDoNotWaitForReady(DMI1_URL, 'cps-2478-A', cmHandlesPerTag, offset)
71 and: 'register anther 250 cm handles with module set tag cps-2478-B'
// Advance the offset past the first batch so that the second batch does not reuse its sequence positions.
72 offset += cmHandlesPerTag
73 registerSequenceOfCmHandlesWithManyModuleReferencesButDoNotWaitForReady(DMI1_URL, 'cps-2478-B', cmHandlesPerTag, offset)
74 and: 'clear any previous instrumentation'
76 when: 'sync all advised cm handles'
77 objectUnderTest.moduleSyncAdvisedCmHandles()
79 then: 'retry until all schema sets are stored in db (1 schema set for each cm handle)'
80 def dbSchemaSetStorageTimer = meterRegistry.get('cps.module.persistence.schemaset.store').timer()
// Poll for up to 10 seconds, re-triggering the sync on every attempt, until the condition holds.
81 new PollingConditions().within(10, () -> {
82 objectUnderTest.moduleSyncAdvisedCmHandles()
// 500 = the two batches of 250 cm handles registered above.
84 assert dbSchemaSetStorageTimer.count() >= 500
86 then: 'wait till at least 5 batches of state updates are done (often more because of retries of locked cm handles)'
87 def dbStateUpdateTimer = meterRegistry.get('cps.ncmp.cmhandle.state.update.batch').timer()
// Poll again (up to 10 seconds) for the batch state updates; this time the sync is not re-triggered.
88 new PollingConditions().within(10, () -> {
89 assert dbStateUpdateTimer.count() >= 5
91 and: 'the db has been queried for tags exactly 2 times.'
92 def dbModuleQueriesTimer = meterRegistry.get('cps.module.service.module.reference.query.by.attribute').timer()
// NOTE(review): 2 presumably corresponds to one query per module set tag (cps-2478-A and cps-2478-B) — confirm.
93 assert dbModuleQueriesTimer.count() == 2
94 and: 'exactly 2 calls to DMI to get module references'
95 def dmiModuleRetrievalTimer = meterRegistry.get('cps.ncmp.inventory.module.references.from.dmi').timer()
96 assert dmiModuleRetrievalTimer.count() == 2
97 and: 'log the relevant instrumentation'
// Print count and total time for each of the four timers inspected above.
98 logInstrumentation(dbModuleQueriesTimer, 'query module references')
99 logInstrumentation(dmiModuleRetrievalTimer, 'get modules from DMI ')
100 logInstrumentation(dbSchemaSetStorageTimer, 'store schema sets ')
101 logInstrumentation(dbStateUpdateTimer, 'batch state updates ')
102 cleanup: 'remove all cm handles'
// Deregister both batches in one call, starting from the same position (1) as the first batch.
103 deregisterSequenceOfCmHandles(DMI1_URL, totalCmHandles, 1)
// Feature: populates the work queue from the test thread and from an executor thread at (nearly)
// the same moment, then checks that the queue holds exactly one entry per registered cm handle.
106 def 'Populate module sync work queue simultaneously on two parallel threads (CPS-2403).'() {
107 // This test failed before bug https://lf-onap.atlassian.net/browse/CPS-2403 was fixed
108 given: 'the queue is empty at the start'
109 registerSequenceOfCmHandlesWithManyModuleReferencesButDoNotWaitForReady(DMI1_URL, NO_MODULE_SET_TAG, PARALLEL_SYNC_SAMPLE_SIZE, 1)
// Precondition: registration alone must not have put anything on the queue.
110 assert moduleSyncWorkQueue.isEmpty()
111 when: 'attempt to populate the queue on the main (test) and another parallel thread at the same time'
// First population runs synchronously on the test thread ...
112 objectUnderTest.populateWorkQueueIfNeeded()
// ... and the second is handed to the executor so that it runs on a different thread.
113 executorService.execute(populateQueueWithoutDelay)
114 and: 'wait a little (to give all threads time to complete their task)'
116 then: 'the queue size is exactly the sample size'
// A size above the sample size would mean that both threads queued the same cm handles.
117 assert moduleSyncWorkQueue.size() == PARALLEL_SYNC_SAMPLE_SIZE
// Feature: same scenario as the CPS-2403 feature, except that the executor thread runs the
// populateQueueWithDelay task instead of the one without a delay.
120 def 'Populate module sync work queue on two parallel threads with a slight difference in start time.'() {
121 // This test proved that the issue in CPS-2403 did not arise if the queue was populated and given time to be distributed
122 given: 'the queue is empty at the start'
123 registerSequenceOfCmHandlesWithManyModuleReferencesButDoNotWaitForReady(DMI1_URL, NO_MODULE_SET_TAG, PARALLEL_SYNC_SAMPLE_SIZE, 1)
// Precondition: registration alone must not have put anything on the queue.
124 assert moduleSyncWorkQueue.isEmpty()
125 when: 'attempt to populate the queue on the main (test) and another parallel thread a little later'
// Populate on the test thread first ...
126 objectUnderTest.populateWorkQueueIfNeeded()
// ... then submit the delayed variant to the executor.
127 executorService.execute(populateQueueWithDelay)
128 and: 'wait a little (to give all threads time to complete their task)'
130 then: 'the queue size is exactly the sample size'
// The second population must not have added any duplicate entries.
131 assert moduleSyncWorkQueue.size() == PARALLEL_SYNC_SAMPLE_SIZE
/**
 * Prints a one-line summary of a timer to standard out, prefixed with '*** CPS-2478'.
 *
 * @param timer       object providing count() and totalTime(TimeUnit); the callers pass Micrometer timers
 * @param description label identifying the measured operation in the printed line
 */
134 def logInstrumentation(timer, description) {
// Output shape: '*** CPS-2478, <description> : <count> times, total <total time in ms> ms'
135 System.out.println('*** CPS-2478, ' + description + ' : ' + timer.count()+ ' times, total ' + timer.totalTime(TimeUnit.MILLISECONDS) + ' ms')
// Task submitted to executorService by the CPS-2403 feature: asks the watchdog to populate the
// work queue from whichever thread runs it. An InterruptedException is caught rather than propagated.
139 def populateQueueWithoutDelay = () -> {
141 objectUnderTest.populateWorkQueueIfNeeded()
142 } catch (InterruptedException e) {
// Task submitted to executorService by the 'slight difference in start time' feature: asks the
// watchdog to populate the work queue from whichever thread runs it.
// NOTE(review): the name implies a pause before populating — confirm its length against the feature's wait.
147 def populateQueueWithDelay = () -> {
150 objectUnderTest.populateWorkQueueIfNeeded()
// An InterruptedException (e.g. raised when the executor is shut down) is caught here rather than propagated.
151 } catch (InterruptedException e) {