43db9b208e10e84862f53fc4890e75cedd23a244
[cps.git] /
1 /*
2  *  ============LICENSE_START=======================================================
3  *  Copyright (C) 2024-2025 Nordix Foundation
4  *  ================================================================================
5  *  Licensed under the Apache License, Version 2.0 (the 'License');
6  *  you may not use this file except in compliance with the License.
7  *  You may obtain a copy of the License at
8  *
9  *        http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *  Unless required by applicable law or agreed to in writing, software
12  *  distributed under the License is distributed on an 'AS IS' BASIS,
13  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *  See the License for the specific language governing permissions and
15  *  limitations under the License.
16  *
17  *  SPDX-License-Identifier: Apache-2.0
18  *  ============LICENSE_END=========================================================
19  */
20
21 package org.onap.cps.integration.functional.ncmp
22
23 import io.micrometer.core.instrument.MeterRegistry
24 import spock.lang.Ignore
25
26 import java.util.concurrent.Executors
27 import java.util.concurrent.TimeUnit
28 import org.onap.cps.integration.base.CpsIntegrationSpecBase
29 import org.onap.cps.ncmp.impl.inventory.sync.ModuleSyncWatchdog
30 import org.springframework.beans.factory.annotation.Autowired
31 import org.springframework.util.StopWatch
32 import spock.util.concurrent.PollingConditions
33
34 class ModuleSyncWatchdogIntegrationSpec extends CpsIntegrationSpecBase {
35
36     ModuleSyncWatchdog objectUnderTest
37
38     @Autowired
39     MeterRegistry meterRegistry
40
41     def executorService = Executors.newFixedThreadPool(2)
42     def PARALLEL_SYNC_SAMPLE_SIZE = 100
43
44     def setup() {
45         objectUnderTest = moduleSyncWatchdog
46     }
47
48     def cleanup() {
49         try {
50             moduleSyncWorkQueue.clear()
51         } finally {
52             executorService.shutdownNow()
53         }
54     }
55
56     def 'Watchdog is disabled for test.'() {
57         given: 'some cm handles are registered'
58             registerSequenceOfCmHandlesWithManyModuleReferencesButDoNotWaitForReady(DMI1_URL, NO_MODULE_SET_TAG, PARALLEL_SYNC_SAMPLE_SIZE, 1)
59         when: 'wait a while but less then the initial delay of 10 minutes'
60             Thread.sleep(3000)
61         then: 'the work queue remains empty'
62             assert moduleSyncWorkQueue.isEmpty()
63         cleanup: 'remove advised cm handles'
64             deregisterSequenceOfCmHandles(DMI1_URL, PARALLEL_SYNC_SAMPLE_SIZE, 1)
65     }
66
67     @Ignore
68     /** this test has intermittent failures, due to timeouts.
69      *  Ignored but left here as it might be valuable to further optimization investigations.
70      **/
71
72     def 'CPS-2478 Highlight (and improve) module sync inefficiencies.'() {
73         given: 'register 250 cm handles with module set tag cps-2478-A'
74             def numberOfTags = 2
75             def cmHandlesPerTag = 250
76             def totalCmHandles = numberOfTags * cmHandlesPerTag
77             def offset = 1
78             def minimumBatches = totalCmHandles / 100
79             registerSequenceOfCmHandlesWithManyModuleReferencesButDoNotWaitForReady(DMI1_URL, 'cps-2478-A', cmHandlesPerTag, offset)
80         and: 'register anther 250 cm handles with module set tag cps-2478-B'
81             offset += cmHandlesPerTag
82             registerSequenceOfCmHandlesWithManyModuleReferencesButDoNotWaitForReady(DMI1_URL, 'cps-2478-B', cmHandlesPerTag, offset)
83         and: 'clear any previous instrumentation'
84             meterRegistry.clear()
85         when: 'sync all advised cm handles'
86             objectUnderTest.moduleSyncAdvisedCmHandles()
87             Thread.sleep(100)
88         then: 'retry until both schema sets are stored in db (1 schema set for each module set tag)'
89             def dbSchemaSetStorageTimer = meterRegistry.get('cps.module.persistence.schemaset.store').timer()
90             new PollingConditions().within(10, () -> {
91                 objectUnderTest.moduleSyncAdvisedCmHandles()
92                 Thread.sleep(100)
93                 assert dbSchemaSetStorageTimer.count() == 2
94             })
95         then: 'wait till at least 5 batches of state updates are done (often more because of retries of locked cm handles)'
96             def dbStateUpdateTimer = meterRegistry.get('cps.ncmp.cmhandle.state.update.batch').timer()
97             new PollingConditions().within(10, () -> {
98                 assert dbStateUpdateTimer.count() >= minimumBatches
99             })
100         and: 'one call to DMI per module set tag to get module references (may be more due to parallel processing of batches)'
101             def dmiModuleRetrievalTimer = meterRegistry.get('cps.ncmp.inventory.module.references.from.dmi').timer()
102             assert dmiModuleRetrievalTimer.count() >= numberOfTags && dmiModuleRetrievalTimer.count() <= minimumBatches
103
104         and: 'log the relevant instrumentation'
105             logInstrumentation(dmiModuleRetrievalTimer, 'get modules from DMI   ')
106             logInstrumentation(dbSchemaSetStorageTimer, 'store schema sets      ')
107             logInstrumentation(dbStateUpdateTimer,      'batch state updates    ')
108         cleanup: 'remove all test cm handles'
109             // To properly measure performance the sample-size should be increased to 20,000 cm handles or higher (10,000 per tag)
110             def stopWatch = new StopWatch()
111             stopWatch.start()
112             deregisterSequenceOfCmHandles(DMI1_URL, totalCmHandles, 1)
113             cpsModuleService.deleteAllUnusedYangModuleData()
114             stopWatch.stop()
115             println "*** CPS-2478, Deletion of $totalCmHandles cm handles took ${stopWatch.getTotalTimeMillis()} milliseconds"
116     }
117
118     def 'Populate module sync work queue simultaneously on two parallel threads (CPS-2403).'() {
119         // This test failed before bug https://lf-onap.atlassian.net/browse/CPS-2403 was fixed
120         given: 'the queue is empty at the start'
121             registerSequenceOfCmHandlesWithManyModuleReferencesButDoNotWaitForReady(DMI1_URL, NO_MODULE_SET_TAG, PARALLEL_SYNC_SAMPLE_SIZE, 1)
122             assert moduleSyncWorkQueue.isEmpty()
123         when: 'attempt to populate the queue on the main (test) and another parallel thread at the same time'
124             objectUnderTest.populateWorkQueueIfNeeded()
125             executorService.execute(populateQueueWithoutDelay)
126         and: 'wait a little (to give all threads time to complete their task)'
127             Thread.sleep(50)
128         then: 'the queue size is exactly the sample size'
129             assert moduleSyncWorkQueue.size() == PARALLEL_SYNC_SAMPLE_SIZE
130         cleanup: 'remove all test cm handles'
131             deregisterSequenceOfCmHandles(DMI1_URL, PARALLEL_SYNC_SAMPLE_SIZE, 1)
132     }
133
134     def 'Populate module sync work queue on two parallel threads with a slight difference in start time.'() {
135         // This test proved that the issue in CPS-2403 did not arise if the the queue was populated and given time to be distributed
136         given: 'the queue is empty at the start'
137             registerSequenceOfCmHandlesWithManyModuleReferencesButDoNotWaitForReady(DMI1_URL, NO_MODULE_SET_TAG, PARALLEL_SYNC_SAMPLE_SIZE, 1)
138             assert moduleSyncWorkQueue.isEmpty()
139         when: 'attempt to populate the queue on the main (test) and another parallel thread a little later'
140             objectUnderTest.populateWorkQueueIfNeeded()
141             executorService.execute(populateQueueWithDelay)
142         and: 'wait a little (to give all threads time to complete their task)'
143             Thread.sleep(50)
144         then: 'the queue size is exactly the sample size'
145             assert moduleSyncWorkQueue.size() == PARALLEL_SYNC_SAMPLE_SIZE
146         cleanup: 'remove all test cm handles'
147             deregisterSequenceOfCmHandles(DMI1_URL, PARALLEL_SYNC_SAMPLE_SIZE, 1)
148     }
149
150     def logInstrumentation(timer, description) {
151         println "*** CPS-2478, $description : Invoked ${timer.count()} times, Total Time: ${timer.totalTime(TimeUnit.MILLISECONDS)} ms, Mean Time: ${timer.mean(TimeUnit.MILLISECONDS)} ms"
152         return true
153     }
154
155     def populateQueueWithoutDelay = () -> {
156         try {
157             objectUnderTest.populateWorkQueueIfNeeded()
158         } catch (InterruptedException e) {
159             e.printStackTrace()
160         }
161     }
162
163     def populateQueueWithDelay = () -> {
164         try {
165             Thread.sleep(10)
166             objectUnderTest.populateWorkQueueIfNeeded()
167         } catch (InterruptedException e) {
168             e.printStackTrace()
169         }
170     }
171
172 }