2  *  ============LICENSE_START=======================================================
 
   3  *  Copyright (C) 2023-2025 OpenInfra Foundation Europe. All rights reserved.
 
   4  *  Modifications Copyright (C) 2024 TechMahindra Ltd.
 
   5  *  ================================================================================
 
   6  *  Licensed under the Apache License, Version 2.0 (the 'License');
 
   7  *  you may not use this file except in compliance with the License.
 
   8  *  You may obtain a copy of the License at
 
  10  *        http://www.apache.org/licenses/LICENSE-2.0
 
  12  *  Unless required by applicable law or agreed to in writing, software
 
  13  *  distributed under the License is distributed on an 'AS IS' BASIS,
 
  14  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
  15  *  See the License for the specific language governing permissions and
 
  16  *  limitations under the License.
 
  18  *  SPDX-License-Identifier: Apache-2.0
 
  19  *  ============LICENSE_END=========================================================
 
  22 package org.onap.cps.integration.performance.ncmp
 
  24 import org.onap.cps.api.CpsQueryService
 
  25 import org.onap.cps.integration.performance.base.NcmpPerfTestBase
 
  26 import org.onap.cps.api.model.DataNode
 
  27 import org.onap.cps.utils.ContentType
 
  29 import static org.onap.cps.api.parameters.FetchDescendantsOption.INCLUDE_ALL_DESCENDANTS
 
  31 class CmDataSubscriptionsPerfTest extends NcmpPerfTestBase {
 
  33     def datastore1cmHandlePlaceHolder = '{"datastores":{"datastore":[{"name":"ds-1","cm-handles":{"cm-handle":[]}}]}}'
 
  34     def xPathForDataStore1CmHandles = '/datastores/datastore[@name="ds-1"]/cm-handles'
 
  36     CpsQueryService objectUnderTest
 
  38     def setup() { objectUnderTest = cpsQueryService }
 
  40     def totalNumberOfEntries = numberOfFiltersPerCmHandle * numberOfCmHandlesPerCmDataSubscription
 
  42     def random = new Random()
 
  44     def 'Find many subscribers in large dataset.'() {
 
  45         when: 'all filters are queried'
 
  47             def cpsPath = '//filter'
 
  48             def result = objectUnderTest.queryDataNodes(NCMP_PERFORMANCE_TEST_DATASPACE, CM_DATA_SUBSCRIPTIONS_ANCHOR, cpsPath, INCLUDE_ALL_DESCENDANTS)
 
  49         then: 'got all filter entries'
 
  50             result.size() == totalNumberOfEntries
 
  51         then: 'find a random subscriptions by iteration (worst case: whole subscription matches previous entries)'
 
  52             def matches = querySubscriptionsByIteration(result, -1)
 
  54             matches.size() == numberOfFiltersPerCmHandle * numberOfCmHandlesPerCmDataSubscription
 
  55         and: 'query all subscribers within 1 second'
 
  56             def durationInSeconds = resourceMeter.getTotalTimeInSeconds()
 
  57             recordAndAssertResourceUsage("Query all subscribers", 2.56, durationInSeconds, 300, resourceMeter.getTotalMemoryUsageInMB())
 
  60     def 'Worst case subscription update (200x10 matching entries).'() {
 
  61         given: 'all filters are queried'
 
  62             def cpsPath = '//filter'
 
  63             def result = objectUnderTest.queryDataNodes(NCMP_PERFORMANCE_TEST_DATASPACE, CM_DATA_SUBSCRIPTIONS_ANCHOR, cpsPath, INCLUDE_ALL_DESCENDANTS)
 
  64         and: 'there are the expected number of subscribers per subscription'
 
  65             assert result.collect {it.leaves.subscribers.size()}.sum() == totalNumberOfEntries * numberOfCmDataSubscribers
 
  66         and: 'find all entries for an existing subscriptions'
 
  67             def matches = querySubscriptionsByIteration(result, 1)
 
  68         when: 'update all subscriptions found'
 
  70             HashMap<String, List<String>> filterEntriesPerPath = [:]
 
  71             matches.each { dataNode, subscribersAsArray ->
 
  72                 def updatedSubscribers = createLeafList('subscribers', 1 + numberOfCmDataSubscribers, subscriberIdPrefix)
 
  73                 def filterEntry = '{"xpath":"' + dataNode.leaves.xpath + '", ' + updatedSubscribers + ' }'
 
  74                 def parentPath = dataNode.xpath.toString().substring(0, dataNode.xpath.toString().indexOf('/filter[@xpath='))
 
  75                 filterEntriesPerPath.putIfAbsent(parentPath, new ArrayList<String>())
 
  76                 filterEntriesPerPath.get(parentPath).add(filterEntry)
 
  78             HashMap<String, String> jsonPerPath = [:]
 
  79             filterEntriesPerPath.each { parentPath, filterEntries ->
 
  80                 jsonPerPath.put(parentPath, '{"filter": [' + filterEntries.join(',') + ']}')
 
  83             // NOTE Below fails as updateDataNodesAndDescendants can't handle JSON lists!
 
  84             // cpsDataService.updateDataNodesAndDescendants(NCMP_PERFORMANCE_TEST_DATASPACE, CM_DATA_SUBSCRIPTIONS_ANCHOR, jsonPerPath, now)
 
  86             // So update for each CM-handle instead:
 
  87             jsonPerPath.each { parentPath, json ->
 
  88                 // Around 8.5 seconds for long strings, 4.8 with short strings
 
  89                 // cpsDataService.updateDataNodeAndDescendants(NCMP_PERFORMANCE_TEST_DATASPACE, CM_DATA_SUBSCRIPTIONS_ANCHOR, parentPath, json, now)
 
  90                 // Around 6.5 seconds for long strings, 3.3 seconds with short strings
 
  91                 cpsDataService.updateNodeLeaves(NCMP_PERFORMANCE_TEST_DATASPACE, CM_DATA_SUBSCRIPTIONS_ANCHOR, parentPath, json, now, ContentType.JSON)
 
  95             def durationInSeconds = resourceMeter.getTotalTimeInSeconds()
 
  96         then: 'a subscriber has been added to each filter entry'
 
  97             def resultAfter = objectUnderTest.queryDataNodes(NCMP_PERFORMANCE_TEST_DATASPACE, CM_DATA_SUBSCRIPTIONS_ANCHOR, cpsPath, INCLUDE_ALL_DESCENDANTS)
 
  98             assert resultAfter.collect {it.leaves.subscribers.size()}.sum() == totalNumberOfEntries * (1 + numberOfCmDataSubscribers)
 
  99         and: 'update matching subscription within 15 seconds'
 
 100             recordAndAssertResourceUsage("Update matching subscription", 6.2, durationInSeconds, 1000, resourceMeter.getTotalMemoryUsageInMB())
 
 103     def 'Worst case new subscription (200x10 new entries).'() {
 
 104         given: 'a new subscription with non-matching data'
 
 105             def subscribers = createLeafList('subscribers',1, subscriberIdPrefix)
 
 106             def filters = '"filters":' + createJsonArray('filter',numberOfFiltersPerCmHandle,'xpath','other_' + xpathPrefix,subscribers)
 
 107             def cmHandles = createJsonArray('cm-handle',numberOfCmHandlesPerCmDataSubscription,'id','other' + cmHandlePrefix, filters)
 
 108         when: 'Insert a new subscription'
 
 109             resourceMeter.start()
 
 110             cpsDataService.saveData(NCMP_PERFORMANCE_TEST_DATASPACE, CM_DATA_SUBSCRIPTIONS_ANCHOR, xPathForDataStore1CmHandles, cmHandles, now)
 
 112             def durationInSeconds = resourceMeter.getTotalTimeInSeconds()
 
 113         then: 'insert new subscription with 1 second'
 
 114             recordAndAssertResourceUsage("Insert new subscription", 1.28, durationInSeconds, 100, resourceMeter.getTotalMemoryUsageInMB())
 
 117     def querySubscriptionsByIteration(Collection<DataNode> allSubscriptionsAsDataNodes, targetSubscriptionSequenceNumber) {
 
 119         allSubscriptionsAsDataNodes.each {
 
 120             String[] subscribersAsArray = it.leaves.get('subscribers')
 
 121             Set<String> subscribersAsSet = new HashSet<>(Arrays.asList(subscribersAsArray))
 
 122             def targetSubscriptionId = subscriberIdPrefix + '-' + ( targetSubscriptionSequenceNumber > 0 ? targetSubscriptionSequenceNumber
 
 123                     : 1 + random.nextInt(numberOfCmDataSubscribers) )
 
 124             if (subscribersAsSet.contains(targetSubscriptionId)) {
 
 125                 matches.put(it, subscribersAsArray)