Merge "CM Data Subscriptions PoC/Performance test fixes"
[cps.git] / integration-test / src / test / groovy / org / onap / cps / integration / performance / ncmp / CmDataSubscriptionsPerfTest.groovy
1 /*
2  *  ============LICENSE_START=======================================================
3  *  Copyright (C) 2023 Nordix Foundation
4  *  ================================================================================
5  *  Licensed under the Apache License, Version 2.0 (the 'License');
6  *  you may not use this file except in compliance with the License.
7  *  You may obtain a copy of the License at
8  *
9  *        http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *  Unless required by applicable law or agreed to in writing, software
12  *  distributed under the License is distributed on an 'AS IS' BASIS,
13  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *  See the License for the specific language governing permissions and
15  *  limitations under the License.
16  *
17  *  SPDX-License-Identifier: Apache-2.0
18  *  ============LICENSE_END=========================================================
19  */
20
21 package org.onap.cps.integration.performance.ncmp
22
23 import org.onap.cps.api.CpsQueryService
24 import org.onap.cps.integration.performance.base.NcmpPerfTestBase
25 import org.onap.cps.spi.model.DataNode
26
27 import static org.onap.cps.spi.FetchDescendantsOption.INCLUDE_ALL_DESCENDANTS
28
29 class CmDataSubscriptionsPerfTest extends NcmpPerfTestBase {
30
31     def datastore1cmHandlePlaceHolder = '{"datastores":{"datastore":[{"name":"ds-1","cm-handles":{"cm-handle":[]}}]}}'
32     def xPathForDataStore1CmHandles = '/datastores/datastore[@name="ds-1"]/cm-handles'
33
34     CpsQueryService objectUnderTest
35
36     def setup() { objectUnderTest = cpsQueryService }
37
38     def totalNumberOfEntries = numberOfFiltersPerCmHandle * numberOfCmHandlesPerCmDataSubscription
39
40     def random = new Random()
41
42     def 'Find many subscribers in large dataset.'() {
43         when: 'all filters are queried'
44             stopWatch.start()
45             def cpsPath = '//filter'
46             def result = objectUnderTest.queryDataNodes(NCMP_PERFORMANCE_TEST_DATASPACE, CM_DATA_SUBSCRIPTIONS_ANCHOR, cpsPath, INCLUDE_ALL_DESCENDANTS)
47         then: 'got all filter entries'
48             result.size() == totalNumberOfEntries
49         then: 'find a random subscriptions by iteration (worst case: whole subscription matches previous entries)'
50             def matches = querySubscriptionsByIteration(result, -1)
51             stopWatch.stop()
52             matches.size() == numberOfFiltersPerCmHandle * numberOfCmHandlesPerCmDataSubscription
53         and: 'query all subscribers within 1 second'
54             def durationInMillis = stopWatch.getTotalTimeMillis()
55             recordAndAssertPerformance("Query all subscribers", 1_000, durationInMillis)
56     }
57
58     def 'Worst case subscription update (200x10 matching entries).'() {
59         given: 'all filters are queried'
60             def cpsPath = '//filter'
61             def result = objectUnderTest.queryDataNodes(NCMP_PERFORMANCE_TEST_DATASPACE, CM_DATA_SUBSCRIPTIONS_ANCHOR, cpsPath, INCLUDE_ALL_DESCENDANTS)
62         and: 'there are the expected number of subscribers per subscription'
63             assert result.collect {it.leaves.subscribers.size()}.sum() == totalNumberOfEntries * numberOfCmDataSubscribers
64         and: 'find all entries for an existing subscriptions'
65             def matches = querySubscriptionsByIteration(result, 1)
66         when: 'update all subscriptions found'
67             stopWatch.start()
68             HashMap<String, List<String>> filterEntriesPerPath = [:]
69             matches.each { dataNode, subscribersAsArray ->
70                 def updatedSubscribers = createLeafList('subscribers', 1 + numberOfCmDataSubscribers, subscriberIdPrefix)
71                 def filterEntry = '{"xpath":"' + dataNode.leaves.xpath + '", ' + updatedSubscribers + ' }'
72                 def parentPath = dataNode.xpath.toString().substring(0, dataNode.xpath.toString().indexOf('/filter[@xpath='))
73                 filterEntriesPerPath.putIfAbsent(parentPath, new ArrayList<String>())
74                 filterEntriesPerPath.get(parentPath).add(filterEntry)
75             }
76             HashMap<String, String> jsonPerPath = [:]
77             filterEntriesPerPath.each { parentPath, filterEntries ->
78                 jsonPerPath.put(parentPath, '{"filter": [' + filterEntries.join(',') + ']}')
79             }
80
81             // NOTE Below fails as updateDataNodesAndDescendants can't handle JSON lists!
82             // cpsDataService.updateDataNodesAndDescendants(NCMP_PERFORMANCE_TEST_DATASPACE, CM_DATA_SUBSCRIPTIONS_ANCHOR, jsonPerPath, now)
83
84             // So update for each CM-handle instead:
85             jsonPerPath.each { parentPath, json ->
86                 // Around 8.5 seconds for long strings, 4.8 with short strings
87                 // cpsDataService.updateDataNodeAndDescendants(NCMP_PERFORMANCE_TEST_DATASPACE, CM_DATA_SUBSCRIPTIONS_ANCHOR, parentPath, json, now)
88                 // Around 6.5 seconds for long strings, 3.3 seconds with short strings
89                 cpsDataService.updateNodeLeaves(NCMP_PERFORMANCE_TEST_DATASPACE, CM_DATA_SUBSCRIPTIONS_ANCHOR, parentPath, json, now)
90             }
91
92             stopWatch.stop()
93             def durationInMillis = stopWatch.getTotalTimeMillis()
94         then: 'a subscriber has been added to each filter entry'
95             def resultAfter = objectUnderTest.queryDataNodes(NCMP_PERFORMANCE_TEST_DATASPACE, CM_DATA_SUBSCRIPTIONS_ANCHOR, cpsPath, INCLUDE_ALL_DESCENDANTS)
96             assert resultAfter.collect {it.leaves.subscribers.size()}.sum() == totalNumberOfEntries * (1 + numberOfCmDataSubscribers)
97         and: 'update matching subscription within 8 seconds'
98             recordAndAssertPerformance("Update matching subscription", 8_000, durationInMillis)
99     }
100
101     def 'Worst case new subscription (200x10 new entries).'() {
102         given: 'a new subscription with non-matching data'
103             def subscribers = createLeafList('subscribers',1, subscriberIdPrefix)
104             def filters = '"filters":' + createJsonArray('filter',numberOfFiltersPerCmHandle,'xpath','other_' + xpathPrefix,subscribers)
105             def cmHandles = createJsonArray('cm-handle',numberOfCmHandlesPerCmDataSubscription,'id','other' + cmHandlePrefix, filters)
106         when: 'Insert a new subscription'
107             stopWatch.start()
108             cpsDataService.saveData(NCMP_PERFORMANCE_TEST_DATASPACE, CM_DATA_SUBSCRIPTIONS_ANCHOR, xPathForDataStore1CmHandles, cmHandles, now)
109             stopWatch.stop()
110             def durationInMillis = stopWatch.getTotalTimeMillis()
111         then: 'insert new subscription with 1 second'
112             recordAndAssertPerformance("Insert new subscription", 1_000, durationInMillis)
113     }
114
115     def querySubscriptionsByIteration(Collection<DataNode> allSubscriptionsAsDataNodes, targetSubscriptionSequenceNumber) {
116         def matches = [:]
117         allSubscriptionsAsDataNodes.each {
118             String[] subscribersAsArray = it.leaves.get('subscribers')
119             Set<String> subscribersAsSet = new HashSet<>(Arrays.asList(subscribersAsArray))
120             def targetSubscriptionId = subscriberIdPrefix + '-' + ( targetSubscriptionSequenceNumber > 0 ? targetSubscriptionSequenceNumber
121                                                                                                      : 1 + random.nextInt(numberOfCmDataSubscribers) )
122             if (subscribersAsSet.contains(targetSubscriptionId)) {
123                 matches.put(it, subscribersAsArray)
124             }
125         }
126         return matches
127     }
128
129 }