20fa546eba44ec03292f152dfc7e204c3a9cbf6f
[cps.git] /
1 /*
2  *  ============LICENSE_START=======================================================
3  *  Copyright (C) 2024 Nordix Foundation
4  *  ================================================================================
5  *  Licensed under the Apache License, Version 2.0 (the 'License');
6  *  you may not use this file except in compliance with the License.
7  *  You may obtain a copy of the License at
8  *
9  *        http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *  Unless required by applicable law or agreed to in writing, software
12  *  distributed under the License is distributed on an 'AS IS' BASIS,
13  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *  See the License for the specific language governing permissions and
15  *  limitations under the License.
16  *
17  *  SPDX-License-Identifier: Apache-2.0
18  *  ============LICENSE_END=========================================================
19  */
20
21 package org.onap.cps.integration.functional.ncmp
22
23 import io.micrometer.core.instrument.MeterRegistry
24 import org.onap.cps.integration.base.CpsIntegrationSpecBase
25 import org.onap.cps.ncmp.impl.inventory.sync.ModuleSyncWatchdog
26 import org.springframework.beans.factory.annotation.Autowired
27 import spock.util.concurrent.PollingConditions
28
29 import java.util.concurrent.Executors
30 import java.util.concurrent.TimeUnit
31
32 class ModuleSyncWatchdogIntegrationSpec extends CpsIntegrationSpecBase {
33
34     ModuleSyncWatchdog objectUnderTest
35
36     @Autowired
37     MeterRegistry meterRegistry
38
39     def executorService = Executors.newFixedThreadPool(2)
40     def PARALLEL_SYNC_SAMPLE_SIZE = 100
41
42     def setup() {
43         objectUnderTest = moduleSyncWatchdog
44     }
45
46     def cleanup() {
47         try {
48             deregisterSequenceOfCmHandles(DMI1_URL, PARALLEL_SYNC_SAMPLE_SIZE, 1)
49             moduleSyncWorkQueue.clear()
50         } finally {
51             executorService.shutdownNow()
52         }
53     }
54
55     def 'Watchdog is disabled for test.'() {
56         given:
57             registerSequenceOfCmHandlesWithManyModuleReferencesButDoNotWaitForReady(DMI1_URL, NO_MODULE_SET_TAG, PARALLEL_SYNC_SAMPLE_SIZE, 1)
58         when: 'wait a while but less then the initial delay of 10 minutes'
59             Thread.sleep(3000)
60         then: 'the work queue remains empty'
61             assert moduleSyncWorkQueue.isEmpty()
62     }
63
64     def 'CPS-2478 Highlight module sync inefficiencies.'() {
65         given: 'register 250 cm handles with module set tag cps-2478-A'
66             def numberOfTags = 2
67             def cmHandlesPerTag = 250
68             def totalCmHandles = numberOfTags * cmHandlesPerTag
69             def offset = 1
70             registerSequenceOfCmHandlesWithManyModuleReferencesButDoNotWaitForReady(DMI1_URL, 'cps-2478-A', cmHandlesPerTag, offset)
71         and: 'register anther 250 cm handles with module set tag cps-2478-B'
72             offset += cmHandlesPerTag
73             registerSequenceOfCmHandlesWithManyModuleReferencesButDoNotWaitForReady(DMI1_URL, 'cps-2478-B', cmHandlesPerTag, offset)
74         and: 'clear any previous instrumentation'
75             meterRegistry.clear()
76         when: 'sync all advised cm handles'
77             objectUnderTest.moduleSyncAdvisedCmHandles()
78             Thread.sleep(100)
79         then: 'retry until all schema sets are stored in db (1 schema set  for each cm handle)'
80             def dbSchemaSetStorageTimer = meterRegistry.get('cps.module.persistence.schemaset.store').timer()
81             new PollingConditions().within(10, () -> {
82                 objectUnderTest.moduleSyncAdvisedCmHandles()
83                 Thread.sleep(100)
84                 assert dbSchemaSetStorageTimer.count() >= 500
85             })
86         then: 'wait till at least 5 batches of state updates are done (often more because of retries of locked cm handles)'
87             def dbStateUpdateTimer = meterRegistry.get('cps.ncmp.cmhandle.state.update.batch').timer()
88             new PollingConditions().within(10, () -> {
89                 assert dbStateUpdateTimer.count() >= 5
90             })
91         and: 'the db has been queried for tags exactly 2 times.'
92             def dbModuleQueriesTimer = meterRegistry.get('cps.module.service.module.reference.query.by.attribute').timer()
93             assert dbModuleQueriesTimer.count() == 2
94         and: 'exactly 2 calls to DMI to get module references'
95             def dmiModuleRetrievalTimer = meterRegistry.get('cps.ncmp.inventory.module.references.from.dmi').timer()
96             assert dmiModuleRetrievalTimer.count() == 2
97         and: 'log the relevant instrumentation'
98             logInstrumentation(dbModuleQueriesTimer,    'query module references')
99             logInstrumentation(dmiModuleRetrievalTimer, 'get modules from DMI   ')
100             logInstrumentation(dbSchemaSetStorageTimer, 'store schema sets      ')
101             logInstrumentation(dbStateUpdateTimer,      'batch state updates    ')
102         cleanup: 'remove all cm handles'
103             deregisterSequenceOfCmHandles(DMI1_URL, totalCmHandles, 1)
104     }
105
106     def 'Populate module sync work queue simultaneously on two parallel threads (CPS-2403).'() {
107         // This test failed before bug https://lf-onap.atlassian.net/browse/CPS-2403 was fixed
108         given: 'the queue is empty at the start'
109             registerSequenceOfCmHandlesWithManyModuleReferencesButDoNotWaitForReady(DMI1_URL, NO_MODULE_SET_TAG, PARALLEL_SYNC_SAMPLE_SIZE, 1)
110             assert moduleSyncWorkQueue.isEmpty()
111         when: 'attempt to populate the queue on the main (test) and another parallel thread at the same time'
112             objectUnderTest.populateWorkQueueIfNeeded()
113             executorService.execute(populateQueueWithoutDelay)
114         and: 'wait a little (to give all threads time to complete their task)'
115             Thread.sleep(50)
116         then: 'the queue size is exactly the sample size'
117             assert moduleSyncWorkQueue.size() == PARALLEL_SYNC_SAMPLE_SIZE
118     }
119
120     def 'Populate module sync work queue on two parallel threads with a slight difference in start time.'() {
121         // This test proved that the issue in CPS-2403 did not arise if the the queue was populated and given time to be distributed
122         given: 'the queue is empty at the start'
123             registerSequenceOfCmHandlesWithManyModuleReferencesButDoNotWaitForReady(DMI1_URL, NO_MODULE_SET_TAG, PARALLEL_SYNC_SAMPLE_SIZE, 1)
124             assert moduleSyncWorkQueue.isEmpty()
125         when: 'attempt to populate the queue on the main (test) and another parallel thread a little later'
126             objectUnderTest.populateWorkQueueIfNeeded()
127             executorService.execute(populateQueueWithDelay)
128         and: 'wait a little (to give all threads time to complete their task)'
129             Thread.sleep(50)
130         then: 'the queue size is exactly the sample size'
131             assert moduleSyncWorkQueue.size() == PARALLEL_SYNC_SAMPLE_SIZE
132     }
133
134     def logInstrumentation(timer, description) {
135         println "*** CPS-2478, $description : Invoked ${timer.count()} times, Total Time: ${timer.totalTime(TimeUnit.MILLISECONDS)} ms, Mean Time: ${timer.mean(TimeUnit.MILLISECONDS)} ms"
136         return true
137     }
138
139     def populateQueueWithoutDelay = () -> {
140         try {
141             objectUnderTest.populateWorkQueueIfNeeded()
142         } catch (InterruptedException e) {
143             e.printStackTrace()
144         }
145     }
146
147     def populateQueueWithDelay = () -> {
148         try {
149             Thread.sleep(10)
150             objectUnderTest.populateWorkQueueIfNeeded()
151         } catch (InterruptedException e) {
152             e.printStackTrace()
153         }
154     }
155
156 }