d1353b83a3c47594835067873aa2fdaaad473f07
[cps.git] /
1 /*
2  *  ============LICENSE_START=======================================================
3  *  Copyright (C) 2024-2025 Nordix Foundation
4  *  ================================================================================
5  *  Licensed under the Apache License, Version 2.0 (the 'License');
6  *  you may not use this file except in compliance with the License.
7  *  You may obtain a copy of the License at
8  *
9  *        http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *  Unless required by applicable law or agreed to in writing, software
12  *  distributed under the License is distributed on an 'AS IS' BASIS,
13  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *  See the License for the specific language governing permissions and
15  *  limitations under the License.
16  *
17  *  SPDX-License-Identifier: Apache-2.0
18  *  ============LICENSE_END=========================================================
19  */
20
21 package org.onap.cps.integration.functional.ncmp
22
23 import com.hazelcast.map.IMap
24 import io.micrometer.core.instrument.MeterRegistry
25 import org.onap.cps.integration.base.CpsIntegrationSpecBase
26 import org.onap.cps.ncmp.impl.inventory.sync.ModuleSyncWatchdog
27 import org.springframework.beans.factory.annotation.Autowired
28 import org.springframework.util.StopWatch
29 import spock.lang.Ignore
30 import spock.util.concurrent.PollingConditions
31
32 import java.util.concurrent.Executors
33 import java.util.concurrent.TimeUnit
34
35 class ModuleSyncWatchdogIntegrationSpec extends CpsIntegrationSpecBase {
36
37     ModuleSyncWatchdog objectUnderTest
38
39     @Autowired
40     MeterRegistry meterRegistry
41
42     @Autowired
43     IMap<String, Integer> cmHandlesByState
44
45     def executorService = Executors.newFixedThreadPool(2)
46     def PARALLEL_SYNC_SAMPLE_SIZE = 100
47
48     def setup() {
49         objectUnderTest = moduleSyncWatchdog
50         clearCmHandleStateGauge()
51     }
52
53     def cleanup() {
54         try {
55             moduleSyncWorkQueue.clear()
56         } finally {
57             executorService.shutdownNow()
58         }
59     }
60
61     def 'Watchdog is disabled for test.'() {
62         given: 'some cm handles are registered'
63             registerSequenceOfCmHandlesWithManyModuleReferencesButDoNotWaitForReady(DMI1_URL, NO_MODULE_SET_TAG, PARALLEL_SYNC_SAMPLE_SIZE, 1)
64         when: 'wait a while but less then the initial delay of 10 minutes'
65             Thread.sleep(3000)
66         then: 'the work queue remains empty'
67             assert moduleSyncWorkQueue.isEmpty()
68         cleanup: 'remove advised cm handles'
69             deregisterSequenceOfCmHandles(DMI1_URL, PARALLEL_SYNC_SAMPLE_SIZE, 1)
70     }
71
72     /** this test has intermittent failures, due to timeouts.
73      *  Ignored but left here as it might be valuable to further optimization investigations.
74      **/
75     @Ignore
76     def 'CPS-2478 Highlight (and improve) module sync inefficiencies.'() {
77         given: 'register 250 cm handles with module set tag cps-2478-A'
78             def numberOfTags = 2
79             def cmHandlesPerTag = 250
80             def totalCmHandles = numberOfTags * cmHandlesPerTag
81             def offset = 1
82             def minimumBatches = totalCmHandles / 100
83             registerSequenceOfCmHandlesWithManyModuleReferencesButDoNotWaitForReady(DMI1_URL, 'cps-2478-A', cmHandlesPerTag, offset)
84         and: 'register anther 250 cm handles with module set tag cps-2478-B'
85             offset += cmHandlesPerTag
86             registerSequenceOfCmHandlesWithManyModuleReferencesButDoNotWaitForReady(DMI1_URL, 'cps-2478-B', cmHandlesPerTag, offset)
87         and: 'clear any previous instrumentation'
88             meterRegistry.clear()
89         when: 'sync all advised cm handles'
90             objectUnderTest.moduleSyncAdvisedCmHandles()
91             Thread.sleep(100)
92         then: 'retry until both schema sets are stored in db (1 schema set for each module set tag)'
93             def dbSchemaSetStorageTimer = meterRegistry.get('cps.module.persistence.schemaset.store').timer()
94             new PollingConditions().within(10, () -> {
95                 objectUnderTest.moduleSyncAdvisedCmHandles()
96                 Thread.sleep(100)
97                 assert dbSchemaSetStorageTimer.count() == 2
98             })
99         then: 'wait till at least 5 batches of state updates are done (often more because of retries of locked cm handles)'
100             def dbStateUpdateTimer = meterRegistry.get('cps.ncmp.cmhandle.state.update.batch').timer()
101             new PollingConditions().within(10, () -> {
102                 assert dbStateUpdateTimer.count() >= minimumBatches
103             })
104         and: 'one call to DMI per module set tag to get module references (may be more due to parallel processing of batches)'
105             def dmiModuleRetrievalTimer = meterRegistry.get('cps.ncmp.inventory.module.references.from.dmi').timer()
106             assert dmiModuleRetrievalTimer.count() >= numberOfTags && dmiModuleRetrievalTimer.count() <= minimumBatches
107
108         and: 'log the relevant instrumentation'
109             logInstrumentation(dmiModuleRetrievalTimer, 'get modules from DMI   ')
110             logInstrumentation(dbSchemaSetStorageTimer, 'store schema sets      ')
111             logInstrumentation(dbStateUpdateTimer,      'batch state updates    ')
112         cleanup: 'remove all test cm handles'
113             // To properly measure performance the sample-size should be increased to 20,000 cm handles or higher (10,000 per tag)
114             def stopWatch = new StopWatch()
115             stopWatch.start()
116             deregisterSequenceOfCmHandles(DMI1_URL, totalCmHandles, 1)
117             cpsModuleService.deleteAllUnusedYangModuleData()
118             stopWatch.stop()
119             println "*** CPS-2478, Deletion of $totalCmHandles cm handles took ${stopWatch.getTotalTimeMillis()} milliseconds"
120     }
121
122     def 'Populate module sync work queue simultaneously on two parallel threads (CPS-2403).'() {
123         // This test failed before bug https://lf-onap.atlassian.net/browse/CPS-2403 was fixed
124         given: 'the queue is empty at the start'
125             registerSequenceOfCmHandlesWithManyModuleReferencesButDoNotWaitForReady(DMI1_URL, NO_MODULE_SET_TAG, PARALLEL_SYNC_SAMPLE_SIZE, 1)
126             assert moduleSyncWorkQueue.isEmpty()
127         when: 'attempt to populate the queue on the main (test) and another parallel thread at the same time'
128             objectUnderTest.populateWorkQueueIfNeeded()
129             executorService.execute(populateQueueWithoutDelay)
130         and: 'wait a little (to give all threads time to complete their task)'
131             Thread.sleep(50)
132         then: 'the queue size is exactly the sample size'
133             assert moduleSyncWorkQueue.size() == PARALLEL_SYNC_SAMPLE_SIZE
134         cleanup: 'remove all test cm handles'
135             deregisterSequenceOfCmHandles(DMI1_URL, PARALLEL_SYNC_SAMPLE_SIZE, 1)
136     }
137
138     def 'Schema sets with overlapping modules processed at the same time (DB constraint violation).'() {
139         given: 'register one batch (100) cm handles of tag A (with overlapping module names)'
140             registerSequenceOfCmHandlesWithManyModuleReferencesButDoNotWaitForReady(DMI1_URL, 'tagA', 100, 1, ModuleNameStrategy.OVERLAPPING)
141         and: 'register another batch cm handles of tag B (with overlapping module names)'
142             registerSequenceOfCmHandlesWithManyModuleReferencesButDoNotWaitForReady(DMI1_URL, 'tagB', 100, 101, ModuleNameStrategy.OVERLAPPING)
143         and: 'populate the work queue with both batches'
144             objectUnderTest.populateWorkQueueIfNeeded()
145         when: 'advised cm handles are processed on 2 threads (exactly one batch for each)'
146             objectUnderTest.moduleSyncAdvisedCmHandles()
147             executorService.execute(moduleSyncAdvisedCmHandles)
148         then: 'wait till all cm handles have been processed'
149             new PollingConditions().within(10, () -> {
150                 assert getNumberOfProcessedCmHandles() == 200
151             })
152         then: 'at least 1 cm handle is in state LOCKED'
153             assert cmHandlesByState.get('lockedCmHandlesCount') >= 1
154         cleanup: 'remove all test cm handles'
155             deregisterSequenceOfCmHandles(DMI1_URL, 200, 1)
156     }
157
158     def 'Populate module sync work queue on two parallel threads with a slight difference in start time.'() {
159         // This test proved that the issue in CPS-2403 did not arise if the the queue was populated and given time to be distributed
160         given: 'the queue is empty at the start'
161             registerSequenceOfCmHandlesWithManyModuleReferencesButDoNotWaitForReady(DMI1_URL, NO_MODULE_SET_TAG, PARALLEL_SYNC_SAMPLE_SIZE, 1)
162             assert moduleSyncWorkQueue.isEmpty()
163         when: 'attempt to populate the queue on the main (test) and another parallel thread a little later'
164             objectUnderTest.populateWorkQueueIfNeeded()
165             executorService.execute(populateQueueWithDelay)
166         and: 'wait a little (to give all threads time to complete their task)'
167             Thread.sleep(50)
168         then: 'the queue size is exactly the sample size'
169             assert moduleSyncWorkQueue.size() == PARALLEL_SYNC_SAMPLE_SIZE
170         cleanup: 'remove all test cm handles'
171             deregisterSequenceOfCmHandles(DMI1_URL, PARALLEL_SYNC_SAMPLE_SIZE, 1)
172     }
173
174     def logInstrumentation(timer, description) {
175         println "*** CPS-2478, $description : Invoked ${timer.count()} times, Total Time: ${timer.totalTime(TimeUnit.MILLISECONDS)} ms, Mean Time: ${timer.mean(TimeUnit.MILLISECONDS)} ms"
176         return true
177     }
178
179     def populateQueueWithoutDelay = () -> {
180         try {
181             objectUnderTest.populateWorkQueueIfNeeded()
182         } catch (InterruptedException e) {
183             e.printStackTrace()
184         }
185     }
186
187     def populateQueueWithDelay = () -> {
188         try {
189             Thread.sleep(10)
190             objectUnderTest.populateWorkQueueIfNeeded()
191         } catch (InterruptedException e) {
192             e.printStackTrace()
193         }
194     }
195
196     def moduleSyncAdvisedCmHandles = () -> {
197         try {
198             objectUnderTest.moduleSyncAdvisedCmHandles()
199         } catch (InterruptedException e) {
200             e.printStackTrace()
201         }
202     }
203
204     def clearCmHandleStateGauge() {
205         cmHandlesByState.keySet().each { cmHandlesByState.put(it, 0)}
206     }
207
208     def getNumberOfProcessedCmHandles() {
209         return cmHandlesByState.get('readyCmHandlesCount') + cmHandlesByState.get('lockedCmHandlesCount')
210     }
211
212
213 }