2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2024-2025 Nordix Foundation
4 * ================================================================================
5 * Licensed under the Apache License, Version 2.0 (the 'License');
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an 'AS IS' BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
17 * SPDX-License-Identifier: Apache-2.0
18 * ============LICENSE_END=========================================================
21 package org.onap.cps.integration.functional.ncmp
23 import com.hazelcast.map.IMap
24 import io.micrometer.core.instrument.MeterRegistry
25 import org.onap.cps.integration.base.CpsIntegrationSpecBase
26 import org.onap.cps.ncmp.impl.inventory.sync.ModuleSyncWatchdog
27 import org.springframework.beans.factory.annotation.Autowired
28 import org.springframework.util.StopWatch
29 import spock.lang.Ignore
30 import spock.util.concurrent.PollingConditions
32 import java.util.concurrent.Executors
33 import java.util.concurrent.TimeUnit
// Integration spec for ModuleSyncWatchdog. Helper methods and shared fixtures used below
// (cm handle registration, moduleSyncWorkQueue, DMI1_URL, ...) are not declared in this spec.
35 class ModuleSyncWatchdogIntegrationSpec extends CpsIntegrationSpecBase {
// The watchdog instance exercised by every test; assigned from moduleSyncWatchdog before use.
37 ModuleSyncWatchdog objectUnderTest
// Source of the Micrometer timers inspected by the CPS-2478 test.
// NOTE(review): presumably injected by Spring (annotation lines are not visible in this view) - confirm.
40 MeterRegistry meterRegistry
// Hazelcast map of counters per cm handle state; the keys read in this spec are
// readyCmHandlesCount and lockedCmHandlesCount.
43 IMap<String, Integer> cmHandlesByState
// Two-thread pool used to run a second task in parallel with the main (test) thread.
45 def executorService = Executors.newFixedThreadPool(2)
// Number of cm handles registered by the tests that check how the work queue gets populated.
46 def PARALLEL_SYNC_SAMPLE_SIZE = 100
// Per-test initialisation statements (the enclosing method header is not visible in this view).
// Use the watchdog available as moduleSyncWatchdog (declared outside this spec) as the object under test.
49 objectUnderTest = moduleSyncWatchdog
// Reset every existing cm handle state counter to zero, presumably so counts from earlier tests are not carried over.
50 clearCmHandleStateGauge()
// Per-test teardown statements (the enclosing method header is not visible in this view).
// Empty the module sync work queue (moduleSyncWorkQueue is declared outside this spec).
55 moduleSyncWorkQueue.clear()
// Stop the parallel worker pool; shutdownNow also attempts to stop tasks that are still running.
57 executorService.shutdownNow()
// Checks that registering cm handles, without explicitly invoking the watchdog,
// leaves the module sync work queue empty.
61 def 'Watchdog is disabled for test.'() {
62 given: 'some cm handles are registered'
// Registers PARALLEL_SYNC_SAMPLE_SIZE cm handles; the last argument (1) matches the one used on deregistration below.
63 registerSequenceOfCmHandlesWithManyModuleReferencesButDoNotWaitForReady(DMI1_URL, NO_MODULE_SET_TAG, PARALLEL_SYNC_SAMPLE_SIZE, 1)
// NOTE(review): the statement implementing the wait for this when-block is not visible in this view.
64 when: 'wait a while but less then the initial delay of 10 minutes'
66 then: 'the work queue remains empty'
67 assert moduleSyncWorkQueue.isEmpty()
68 cleanup: 'remove advised cm handles'
// Deregister the same sequence of cm handles that was registered in the given-block.
69 deregisterSequenceOfCmHandles(DMI1_URL, PARALLEL_SYNC_SAMPLE_SIZE, 1)
72 /** This test has intermittent failures due to timeouts.
73 * It is ignored but kept here because it may be valuable for further optimization investigations.
// Registers two groups of cm handles with different module set tags, runs the module sync and then
// inspects Micrometer timers for schema set storage, batch state updates and DMI module retrieval.
// NOTE(review): numberOfTags and offset are used below but their declarations are not visible in this view.
76 def 'CPS-2478 Highlight (and improve) module sync inefficiencies.'() {
77 given: 'register 250 cm handles with module set tag cps-2478-A'
79 def cmHandlesPerTag = 250
80 def totalCmHandles = numberOfTags * cmHandlesPerTag
// Lower bound on the number of state update batches; assumes a batch holds 100 cm handles - TODO confirm.
82 def minimumBatches = totalCmHandles / 100
83 registerSequenceOfCmHandlesWithManyModuleReferencesButDoNotWaitForReady(DMI1_URL, 'cps-2478-A', cmHandlesPerTag, offset)
84 and: 'register anther 250 cm handles with module set tag cps-2478-B'
// Move the offset past the first group so the second group does not reuse the same sequence positions.
85 offset += cmHandlesPerTag
86 registerSequenceOfCmHandlesWithManyModuleReferencesButDoNotWaitForReady(DMI1_URL, 'cps-2478-B', cmHandlesPerTag, offset)
// NOTE(review): the statement that clears the instrumentation is not visible in this view.
87 and: 'clear any previous instrumentation'
89 when: 'sync all advised cm handles'
90 objectUnderTest.moduleSyncAdvisedCmHandles()
92 then: 'retry until both schema sets are stored in db (1 schema set for each module set tag)'
93 def dbSchemaSetStorageTimer = meterRegistry.get('cps.module.persistence.schemaset.store').timer()
// Poll for up to 10 seconds, triggering the sync again on every attempt.
94 new PollingConditions().within(10, () -> {
95 objectUnderTest.moduleSyncAdvisedCmHandles()
97 assert dbSchemaSetStorageTimer.count() == 2
99 then: 'wait till at least 5 batches of state updates are done (often more because of retries of locked cm handles)'
100 def dbStateUpdateTimer = meterRegistry.get('cps.ncmp.cmhandle.state.update.batch').timer()
// Poll for up to 10 seconds until enough state update batches have been recorded.
101 new PollingConditions().within(10, () -> {
102 assert dbStateUpdateTimer.count() >= minimumBatches
104 and: 'one call to DMI per module set tag to get module references (may be more due to parallel processing of batches)'
105 def dmiModuleRetrievalTimer = meterRegistry.get('cps.ncmp.inventory.module.references.from.dmi').timer()
// Accept any count from one call per tag up to one call per batch.
106 assert dmiModuleRetrievalTimer.count() >= numberOfTags && dmiModuleRetrievalTimer.count() <= minimumBatches
108 and: 'log the relevant instrumentation'
109 logInstrumentation(dmiModuleRetrievalTimer, 'get modules from DMI ')
110 logInstrumentation(dbSchemaSetStorageTimer, 'store schema sets ')
111 logInstrumentation(dbStateUpdateTimer, 'batch state updates ')
112 cleanup: 'remove all test cm handles'
113 // To properly measure performance the sample-size should be increased to 20,000 cm handles or higher (10,000 per tag)
// Stopwatch whose total time is reported below; its start and stop calls are not visible in this view.
114 def stopWatch = new StopWatch()
116 deregisterSequenceOfCmHandles(DMI1_URL, totalCmHandles, 1)
// Also remove yang module data that is no longer used after the deregistration.
117 cpsModuleService.deleteAllUnusedYangModuleData()
119 println "*** CPS-2478, Deletion of $totalCmHandles cm handles took ${stopWatch.getTotalTimeMillis()} milliseconds"
// Populates the work queue from the test thread and from a pool thread at (nearly) the same time and
// checks that the queue ends up with exactly one entry per registered cm handle.
122 def 'Populate module sync work queue simultaneously on two parallel threads (CPS-2403).'() {
123 // This test failed before bug https://lf-onap.atlassian.net/browse/CPS-2403 was fixed
124 given: 'the queue is empty at the start'
125 registerSequenceOfCmHandlesWithManyModuleReferencesButDoNotWaitForReady(DMI1_URL, NO_MODULE_SET_TAG, PARALLEL_SYNC_SAMPLE_SIZE, 1)
126 assert moduleSyncWorkQueue.isEmpty()
127 when: 'attempt to populate the queue on the main (test) and another parallel thread at the same time'
// First population runs on the test thread, the second is handed to the two-thread executor.
128 objectUnderTest.populateWorkQueueIfNeeded()
129 executorService.execute(populateQueueWithoutDelay)
// NOTE(review): the statement implementing this wait is not visible in this view.
130 and: 'wait a little (to give all threads time to complete their task)'
132 then: 'the queue size is exactly the sample size'
// A larger size would mean the same cm handles were queued more than once.
133 assert moduleSyncWorkQueue.size() == PARALLEL_SYNC_SAMPLE_SIZE
134 cleanup: 'remove all test cm handles'
135 deregisterSequenceOfCmHandles(DMI1_URL, PARALLEL_SYNC_SAMPLE_SIZE, 1)
// Processes two batches of cm handles with overlapping module names on two threads at once and
// checks that all 200 are processed, with at least one ending up counted as locked.
138 def 'Schema sets with overlapping modules processed at the same time (DB constraint violation).'() {
139 given: 'register one batch (100) cm handles of tag A (with overlapping module names)'
// 100 cm handles starting at sequence position 1.
140 registerSequenceOfCmHandlesWithManyModuleReferencesButDoNotWaitForReady(DMI1_URL, 'tagA', 100, 1, ModuleNameStrategy.OVERLAPPING)
141 and: 'register another batch cm handles of tag B (with overlapping module names)'
// Another 100 cm handles starting at sequence position 101, directly after the first batch.
142 registerSequenceOfCmHandlesWithManyModuleReferencesButDoNotWaitForReady(DMI1_URL, 'tagB', 100, 101, ModuleNameStrategy.OVERLAPPING)
143 and: 'populate the work queue with both batches'
144 objectUnderTest.populateWorkQueueIfNeeded()
145 when: 'advised cm handles are processed on 2 threads (exactly one batch for each)'
// One sync runs on the test thread, the other on the executor via the closure of the same name.
146 objectUnderTest.moduleSyncAdvisedCmHandles()
147 executorService.execute(moduleSyncAdvisedCmHandles)
148 then: 'wait till all cm handles have been processed'
// Poll for up to 10 seconds until the ready and locked counters together reach 200.
149 new PollingConditions().within(10, () -> {
150 assert getNumberOfProcessedCmHandles() == 200
152 then: 'at least 1 cm handle is in state LOCKED'
153 assert cmHandlesByState.get('lockedCmHandlesCount') >= 1
154 cleanup: 'remove all test cm handles'
// Deregister both batches in one call (200 cm handles from sequence position 1).
155 deregisterSequenceOfCmHandles(DMI1_URL, 200, 1)
// Same scenario as the simultaneous population test, except that the second population is
// submitted through the closure populateQueueWithDelay.
158 def 'Populate module sync work queue on two parallel threads with a slight difference in start time.'() {
159 // This test proved that the issue in CPS-2403 did not arise if the queue was populated and given time to be distributed
160 given: 'the queue is empty at the start'
161 registerSequenceOfCmHandlesWithManyModuleReferencesButDoNotWaitForReady(DMI1_URL, NO_MODULE_SET_TAG, PARALLEL_SYNC_SAMPLE_SIZE, 1)
162 assert moduleSyncWorkQueue.isEmpty()
163 when: 'attempt to populate the queue on the main (test) and another parallel thread a little later'
// First population runs on the test thread, the second is handed to the two-thread executor.
164 objectUnderTest.populateWorkQueueIfNeeded()
165 executorService.execute(populateQueueWithDelay)
// NOTE(review): the statement implementing this wait is not visible in this view.
166 and: 'wait a little (to give all threads time to complete their task)'
168 then: 'the queue size is exactly the sample size'
// A larger size would mean the same cm handles were queued more than once.
169 assert moduleSyncWorkQueue.size() == PARALLEL_SYNC_SAMPLE_SIZE
170 cleanup: 'remove all test cm handles'
171 deregisterSequenceOfCmHandles(DMI1_URL, PARALLEL_SYNC_SAMPLE_SIZE, 1)
// Prints a one-line summary of a timer to standard output: invocation count, total time and mean time in milliseconds.
// timer: object offering count(), totalTime(TimeUnit) and mean(TimeUnit); callers in this spec pass Micrometer timers.
// description: label placed in front of the figures.
174 def logInstrumentation(timer, description) {
175 println "*** CPS-2478, $description : Invoked ${timer.count()} times, Total Time: ${timer.totalTime(TimeUnit.MILLISECONDS)} ms, Mean Time: ${timer.mean(TimeUnit.MILLISECONDS)} ms"
// Task handed to the executor by the simultaneous population test: asks the watchdog to populate the work queue.
// NOTE(review): the opening of the try block and the body of the catch handler are not visible in this view.
179 def populateQueueWithoutDelay = () -> {
181 objectUnderTest.populateWorkQueueIfNeeded()
182 } catch (InterruptedException e) {
// Task handed to the executor by the slightly-later population test: asks the watchdog to populate the work queue.
// NOTE(review): the lines before the call (presumably the delay and the try opening) and the catch body
// are not visible in this view - confirm against the full file.
187 def populateQueueWithDelay = () -> {
190 objectUnderTest.populateWorkQueueIfNeeded()
191 } catch (InterruptedException e) {
// Task handed to the executor by the overlapping schema sets test: runs the module sync of advised cm handles
// on a pool thread while the test thread runs the same operation.
// NOTE(review): the opening of the try block and the body of the catch handler are not visible in this view.
196 def moduleSyncAdvisedCmHandles = () -> {
198 objectUnderTest.moduleSyncAdvisedCmHandles()
199 } catch (InterruptedException e) {
// Sets the counter of every key currently present in cmHandlesByState to 0.
// Keys that are not yet in the map are not created.
204 def clearCmHandleStateGauge() {
205 cmHandlesByState.keySet().each { cmHandlesByState.put(it, 0)}
// Returns the sum of the ready and locked cm handle counters held in cmHandlesByState.
// NOTE(review): a map lookup yields null for an absent key, which would make the addition fail;
// presumably both keys are always present by the time this is called - confirm.
208 def getNumberOfProcessedCmHandles() {
209 return cmHandlesByState.get('readyCmHandlesCount') + cmHandlesByState.get('lockedCmHandlesCount')