Use Flux stream processing for CM-handle searches 93/140593/3
authordanielhanrahan <daniel.hanrahan@est.tech>
Sun, 9 Feb 2025 21:53:34 +0000 (21:53 +0000)
committerdanielhanrahan <daniel.hanrahan@est.tech>
Wed, 26 Mar 2025 12:00:54 +0000 (12:00 +0000)
This greatly reduces memory consumption to fetch CM-handles in NCMP
by fetching in batches in a Flux. Full CM-handle search operations
now consume much less memory than before. The lower memory usage and
database pressure improves overall performance.

Issue-ID: CPS-2712
Signed-off-by: danielhanrahan <daniel.hanrahan@est.tech>
Change-Id: I7f653fadeadbf9612e0847f9451654b01a1a5604

cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/controller/NetworkCmProxyController.java
cps-ncmp-rest/src/test/groovy/org/onap/cps/ncmp/rest/controller/NetworkCmProxyControllerSpec.groovy
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/NetworkCmProxyInventoryFacade.java
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/NetworkCmProxyInventoryFacadeImpl.java
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/ParameterizedCmHandleQueryService.java
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/ParameterizedCmHandleQueryServiceImpl.java
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/NetworkCmProxyInventoryFacadeSpec.groovy
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/ParameterizedCmHandleQueryServiceSpec.groovy

index d7b38d1..6215427 100755 (executable)
@@ -1,7 +1,7 @@
 /*
  *  ============LICENSE_START=======================================================
  *  Copyright (C) 2021 Pantheon.tech
- *  Modifications Copyright (C) 2021-2025 Nordix Foundation
+ *  Modifications Copyright (C) 2021-2025 OpenInfra Foundation Europe
  *  Modifications Copyright (C) 2021 highstreet technologies GmbH
  *  Modifications Copyright (C) 2021-2022 Bell Canada
  *  ================================================================================
@@ -262,10 +262,9 @@ public class NetworkCmProxyController implements NetworkCmProxyApi {
             final CmHandleQueryParameters cmHandleQueryParameters) {
         final CmHandleQueryApiParameters cmHandleQueryApiParameters =
                 deprecationHelper.mapOldConditionProperties(cmHandleQueryParameters);
-        final Collection<NcmpServiceCmHandle> cmHandles = networkCmProxyInventoryFacade
-                .executeCmHandleSearch(cmHandleQueryApiParameters);
         final List<RestOutputCmHandle> restOutputCmHandles =
-                cmHandles.stream().map(this::toRestOutputCmHandle).collect(Collectors.toList());
+                networkCmProxyInventoryFacade.executeCmHandleSearch(cmHandleQueryApiParameters)
+                        .map(this::toRestOutputCmHandle).collectList().block();
         return ResponseEntity.ok(restOutputCmHandles);
     }
 
index c3aca5a..94c113c 100644 (file)
@@ -2,7 +2,7 @@
  *  ============LICENSE_START=======================================================
  *  Copyright (C) 2021 Pantheon.tech
  *  Modifications Copyright (C) 2021 highstreet technologies GmbH
- *  Modifications Copyright (C) 2021-2024 Nordix Foundation
+ *  Modifications Copyright (C) 2021-2025 OpenInfra Foundation Europe
  *  Modifications Copyright (C) 2021-2022 Bell Canada.
  *  ================================================================================
  *  Licensed under the Apache License, Version 2.0 (the "License");
@@ -59,6 +59,7 @@ import org.springframework.http.HttpStatus
 import org.springframework.http.MediaType
 import org.springframework.http.ResponseEntity
 import org.springframework.test.web.servlet.MockMvc
+import reactor.core.publisher.Flux
 import reactor.core.publisher.Mono
 import spock.lang.Shared
 import spock.lang.Specification
@@ -275,7 +276,7 @@ class NetworkCmProxyControllerSpec extends Specification {
             cmHandle2.alternateId = 'someAlternateId'
             cmHandle2.moduleSetTag = 'someModuleSetTag'
             cmHandle2.dataProducerIdentifier = 'someDataProducerIdentifier'
-            mockNetworkCmProxyInventoryFacade.executeCmHandleSearch(_) >> [cmHandle1, cmHandle2]
+            mockNetworkCmProxyInventoryFacade.executeCmHandleSearch(_) >> Flux.fromIterable([cmHandle1, cmHandle2])
         when: 'the searches api is invoked'
             def response = mvc.perform(post(searchesEndpoint).contentType(MediaType.APPLICATION_JSON).content(jsonString)).andReturn().response
         then: 'response status returns OK'
@@ -352,7 +353,7 @@ class NetworkCmProxyControllerSpec extends Specification {
             cmHandle2.cmHandleId = 'ch-2'
             cmHandle2.publicProperties = [color: 'green']
             cmHandle2.currentTrustLevel = TrustLevel.NONE
-            mockNetworkCmProxyInventoryFacade.executeCmHandleSearch(_) >> [cmHandle1, cmHandle2]
+            mockNetworkCmProxyInventoryFacade.executeCmHandleSearch(_) >> Flux.fromIterable([cmHandle1, cmHandle2])
         when: 'the searches api is invoked'
             def response = mvc.perform(post(searchesEndpoint).contentType(MediaType.APPLICATION_JSON).content(jsonString)).andReturn().response
         then: 'an empty cm handle identifier is returned'
index 9bfb775..876a5e7 100644 (file)
@@ -1,6 +1,6 @@
 /*
  *  ============LICENSE_START=======================================================
- *  Copyright (C) 2024 Nordix Foundation
+ *  Copyright (C) 2024-2025 OpenInfra Foundation Europe. All rights reserved.
  *  ================================================================================
  *  Licensed under the Apache License, Version 2.0 (the "License");
  *  you may not use this file except in compliance with the License.
@@ -30,6 +30,7 @@ import org.onap.cps.ncmp.api.inventory.models.CompositeState;
 import org.onap.cps.ncmp.api.inventory.models.DmiPluginRegistration;
 import org.onap.cps.ncmp.api.inventory.models.DmiPluginRegistrationResponse;
 import org.onap.cps.ncmp.api.inventory.models.NcmpServiceCmHandle;
+import reactor.core.publisher.Flux;
 
 public interface NetworkCmProxyInventoryFacade {
 
@@ -96,9 +97,9 @@ public interface NetworkCmProxyInventoryFacade {
      * Retrieve cm handles with details for the given query parameters.
      *
      * @param cmHandleQueryApiParameters cm handle query parameters
-     * @return cm handles with details
+     * @return cm handle objects as a reactive stream (flux)
      */
-    Collection<NcmpServiceCmHandle> executeCmHandleSearch(final CmHandleQueryApiParameters cmHandleQueryApiParameters);
+    Flux<NcmpServiceCmHandle> executeCmHandleSearch(final CmHandleQueryApiParameters cmHandleQueryApiParameters);
 
     /**
      * Retrieve cm handle ids for the given query parameters.
index 118c2bb..7130afd 100644 (file)
@@ -1,7 +1,7 @@
 /*
  *  ============LICENSE_START=======================================================
  *  Copyright (C) 2021 highstreet technologies GmbH
- *  Modifications Copyright (C) 2021-2024 Nordix Foundation
+ *  Modifications Copyright (C) 2021-2025 OpenInfra Foundation Europe
  *  Modifications Copyright (C) 2021 Pantheon.tech
  *  Modifications Copyright (C) 2021-2022 Bell Canada
  *  Modifications Copyright (C) 2023 TechMahindra Ltd.
@@ -52,6 +52,7 @@ import org.onap.cps.ncmp.impl.utils.AlternateIdMatcher;
 import org.onap.cps.ncmp.impl.utils.YangDataConverter;
 import org.onap.cps.utils.JsonObjectMapper;
 import org.springframework.stereotype.Service;
+import reactor.core.publisher.Flux;
 
 @Service
 @RequiredArgsConstructor
@@ -118,15 +119,12 @@ public class NetworkCmProxyInventoryFacadeImpl implements NetworkCmProxyInventor
     }
 
     @Override
-    public Collection<NcmpServiceCmHandle> executeCmHandleSearch(
+    public Flux<NcmpServiceCmHandle> executeCmHandleSearch(
             final CmHandleQueryApiParameters cmHandleQueryApiParameters) {
-        final CmHandleQueryServiceParameters cmHandleQueryServiceParameters = jsonObjectMapper.convertToValueType(
-                cmHandleQueryApiParameters, CmHandleQueryServiceParameters.class);
+        final CmHandleQueryServiceParameters cmHandleQueryServiceParameters =
+                jsonObjectMapper.convertToValueType(cmHandleQueryApiParameters, CmHandleQueryServiceParameters.class);
         validateCmHandleQueryParameters(cmHandleQueryServiceParameters, CmHandleQueryConditions.ALL_CONDITION_NAMES);
-        final Collection<NcmpServiceCmHandle> ncmpServiceCmHandles =
-                parameterizedCmHandleQueryService.queryCmHandles(cmHandleQueryServiceParameters);
-        trustLevelManager.applyEffectiveTrustLevels(ncmpServiceCmHandles);
-        return ncmpServiceCmHandles;
+        return parameterizedCmHandleQueryService.queryCmHandles(cmHandleQueryServiceParameters);
     }
 
     @Override
index fc88844..5a105b3 100644 (file)
@@ -1,6 +1,6 @@
 /*
  *  ============LICENSE_START=======================================================
- *  Copyright (C) 2022-2024 Nordix Foundation
+ *  Copyright (C) 2022-2025 OpenInfra Foundation Europe. All rights reserved.
  *  ================================================================================
  *  Licensed under the Apache License, Version 2.0 (the "License");
  *  you may not use this file except in compliance with the License.
@@ -23,6 +23,7 @@ package org.onap.cps.ncmp.impl.inventory;
 import java.util.Collection;
 import org.onap.cps.ncmp.api.inventory.models.CmHandleQueryServiceParameters;
 import org.onap.cps.ncmp.api.inventory.models.NcmpServiceCmHandle;
+import reactor.core.publisher.Flux;
 
 public interface ParameterizedCmHandleQueryService {
     /**
@@ -31,6 +32,7 @@ public interface ParameterizedCmHandleQueryService {
      *      public properties
      *      modules
      *      cps-path
+     *      trust level
      *
      * @param cmHandleQueryServiceParameters the cm handle query parameters
      * @param outputAlternateId Boolean for cm handle reference type either
@@ -62,19 +64,12 @@ public interface ParameterizedCmHandleQueryService {
      *      public properties
      *      modules
      *      cps-path
+     *      trust level
      *
      * @param cmHandleQueryServiceParameters the cm handle query parameters
-     * @return collection of cm handles
+     * @return cm handle objects as a reactive stream (flux)
      */
-    Collection<NcmpServiceCmHandle> queryCmHandles(CmHandleQueryServiceParameters cmHandleQueryServiceParameters);
-
-    /**
-     * Get all cm handle objects.
-     * Note: it is similar to all the queries above but simply no conditions and hence not 'parameterized'
-     *
-     * @return collection of cm handles
-     */
-    Collection<NcmpServiceCmHandle> getAllCmHandles();
+    Flux<NcmpServiceCmHandle> queryCmHandles(CmHandleQueryServiceParameters cmHandleQueryServiceParameters);
 
     /**
      * Retrieves all {@code NcmpServiceCmHandle} instances without their associated properties.
index 9ce0e04..49b7726 100644 (file)
@@ -1,6 +1,6 @@
 /*
  *  ============LICENSE_START=======================================================
- *  Copyright (C) 2022-2025 Nordix Foundation
+ *  Copyright (C) 2022-2025 OpenInfra Foundation Europe. All rights reserved.
  *  ================================================================================
  *  Licensed under the Apache License, Version 2.0 (the "License");
  *  you may not use this file except in compliance with the License.
@@ -49,16 +49,20 @@ import org.onap.cps.ncmp.api.inventory.models.NcmpServiceCmHandle;
 import org.onap.cps.ncmp.impl.inventory.models.InventoryQueryConditions;
 import org.onap.cps.ncmp.impl.inventory.models.PropertyType;
 import org.onap.cps.ncmp.impl.inventory.models.YangModelCmHandle;
+import org.onap.cps.ncmp.impl.inventory.trustlevel.TrustLevelManager;
 import org.onap.cps.ncmp.impl.utils.YangDataConverter;
 import org.springframework.stereotype.Service;
+import reactor.core.publisher.Flux;
 
 @Service
 @RequiredArgsConstructor
 public class ParameterizedCmHandleQueryServiceImpl implements ParameterizedCmHandleQueryService {
 
+    private static final int FLUX_BUFFER_SIZE = 1000;
     private static final Collection<String> NO_QUERY_TO_EXECUTE = null;
     private final CmHandleQueryService cmHandleQueryService;
     private final InventoryPersistence inventoryPersistence;
+    private final TrustLevelManager trustLevelManager;
 
     @Override
     public Collection<String> queryCmHandleReferenceIds(
@@ -83,23 +87,11 @@ public class ParameterizedCmHandleQueryServiceImpl implements ParameterizedCmHan
     }
 
     @Override
-    public Collection<NcmpServiceCmHandle> queryCmHandles(
-            final CmHandleQueryServiceParameters cmHandleQueryServiceParameters) {
-
-        if (cmHandleQueryServiceParameters.getCmHandleQueryParameters().isEmpty()) {
-            return getAllCmHandles();
-        }
-
-        final Collection<String> cmHandleIds = queryCmHandleReferenceIds(cmHandleQueryServiceParameters, false);
-
+    public Flux<NcmpServiceCmHandle> queryCmHandles(final CmHandleQueryServiceParameters queryParameters) {
+        final Collection<String> cmHandleIds = queryCmHandleReferenceIds(queryParameters, false);
         return getNcmpServiceCmHandles(cmHandleIds);
     }
 
-    @Override
-    public Collection<NcmpServiceCmHandle> getAllCmHandles() {
-        return toNcmpServiceCmHandles(inventoryPersistence.getDataNode(NCMP_DMI_REGISTRY_PARENT));
-    }
-
     @Override
     public Collection<NcmpServiceCmHandle> getAllCmHandlesWithoutProperties() {
         return toNcmpServiceCmHandles(inventoryPersistence.getDataNode(NCMP_DMI_REGISTRY_PARENT, DIRECT_CHILDREN_ONLY));
@@ -236,7 +228,14 @@ public class ParameterizedCmHandleQueryServiceImpl implements ParameterizedCmHan
         return cmHandleQueryService.getAllCmHandleReferences(outputAlternateId);
     }
 
-    private Collection<NcmpServiceCmHandle> getNcmpServiceCmHandles(final Collection<String> cmHandleIds) {
+    private Flux<NcmpServiceCmHandle> getNcmpServiceCmHandles(final Collection<String> cmHandleIds) {
+        return Flux.fromIterable(cmHandleIds)
+                .buffer(FLUX_BUFFER_SIZE)
+                .map(this::getNcmpServiceCmHandleBatch)
+                .flatMap(Flux::fromIterable);
+    }
+
+    private Collection<NcmpServiceCmHandle> getNcmpServiceCmHandleBatch(final Collection<String> cmHandleIds) {
         final Collection<YangModelCmHandle> yangModelcmHandles
                 = inventoryPersistence.getYangModelCmHandles(cmHandleIds);
 
@@ -245,6 +244,7 @@ public class ParameterizedCmHandleQueryServiceImpl implements ParameterizedCmHan
         yangModelcmHandles.forEach(yangModelcmHandle ->
                 ncmpServiceCmHandles.add(YangDataConverter.toNcmpServiceCmHandle(yangModelcmHandle))
         );
+        trustLevelManager.applyEffectiveTrustLevels(ncmpServiceCmHandles);
         return ncmpServiceCmHandles;
     }
 
index eff8082..29cd92d 100644 (file)
@@ -1,6 +1,6 @@
 /*
  *  ============LICENSE_START=======================================================
- *  Copyright (C) 2021-2024 Nordix Foundation
+ *  Copyright (C) 2021-2025 OpenInfra Foundation Europe. All rights reserved.
  *  Modifications Copyright (C) 2021 Pantheon.tech
  *  Modifications Copyright (C) 2021-2022 Bell Canada
  *  Modifications Copyright (C) 2023 TechMahindra Ltd.
@@ -40,6 +40,7 @@ import org.onap.cps.ncmp.impl.inventory.models.YangModelCmHandle
 import org.onap.cps.ncmp.impl.inventory.trustlevel.TrustLevelManager
 import org.onap.cps.ncmp.impl.utils.AlternateIdMatcher
 import org.onap.cps.utils.JsonObjectMapper
+import reactor.core.publisher.Flux
 import spock.lang.Specification
 
 class NetworkCmProxyInventoryFacadeSpec extends Specification {
@@ -249,11 +250,10 @@ class NetworkCmProxyInventoryFacadeSpec extends Specification {
         and: 'query cm handle method returns two cm handles'
             mockParameterizedCmHandleQueryService.queryCmHandles(
                 spiedJsonObjectMapper.convertToValueType(cmHandleQueryApiParameters, CmHandleQueryServiceParameters.class))
-                >> [new NcmpServiceCmHandle(cmHandleId: 'ch-0'), new NcmpServiceCmHandle(cmHandleId: 'ch-1')]
-        and: 'a trust level for cm handles'
-            1 * mockTrustLevelManager.applyEffectiveTrustLevels(_) >> { args -> args[0].forEach{it.currentTrustLevel = TrustLevel.COMPLETE } }
+                >> Flux.fromIterable([new NcmpServiceCmHandle(cmHandleId: 'ch-0', currentTrustLevel: TrustLevel.COMPLETE),
+                                      new NcmpServiceCmHandle(cmHandleId: 'ch-1', currentTrustLevel: TrustLevel.COMPLETE)])
         when: 'execute cm handle search is called'
-            def result = objectUnderTest.executeCmHandleSearch(cmHandleQueryApiParameters)
+            def result = objectUnderTest.executeCmHandleSearch(cmHandleQueryApiParameters).collectList().block()
         then: 'result consists of the two cm handles returned by the CPS Data Service'
             assert result.size() == 2
             assert result[0].cmHandleId == 'ch-0'
index 8bb4551..b2e08af 100644 (file)
@@ -1,6 +1,6 @@
 /*
  *  ============LICENSE_START=======================================================
- *  Copyright (C) 2022-2025 Nordix Foundation
+ *  Copyright (C) 2022-2025 OpenInfra Foundation Europe. All rights reserved.
  *  ================================================================================
  *  Licensed under the Apache License, Version 2.0 (the "License");
  *  you may not use this file except in compliance with the License.
@@ -29,6 +29,7 @@ import org.onap.cps.api.exceptions.DataInUseException
 import org.onap.cps.api.exceptions.DataValidationException
 import org.onap.cps.api.model.ConditionProperties
 import org.onap.cps.api.model.DataNode
+import org.onap.cps.ncmp.impl.inventory.trustlevel.TrustLevelManager
 import spock.lang.Specification
 
 import static org.onap.cps.ncmp.impl.inventory.NcmpPersistence.NCMP_DMI_REGISTRY_PARENT
@@ -38,11 +39,12 @@ class ParameterizedCmHandleQueryServiceSpec extends Specification {
     def cmHandleQueries = Mock(CmHandleQueryService)
     def partiallyMockedCmHandleQueries = Spy(CmHandleQueryService)
     def mockInventoryPersistence = Mock(InventoryPersistence)
+    def mockTrustLevelManager = Mock(TrustLevelManager)
 
     def dmiRegistry = new DataNode(xpath: NCMP_DMI_REGISTRY_PARENT, childDataNodes: createDataNodeList(['PNFDemo1', 'PNFDemo2', 'PNFDemo3', 'PNFDemo4']))
 
-    def objectUnderTest = new ParameterizedCmHandleQueryServiceImpl(cmHandleQueries, mockInventoryPersistence)
-    def objectUnderTestWithPartiallyMockedQueries = new ParameterizedCmHandleQueryServiceImpl(partiallyMockedCmHandleQueries, mockInventoryPersistence)
+    def objectUnderTest = new ParameterizedCmHandleQueryServiceImpl(cmHandleQueries, mockInventoryPersistence, mockTrustLevelManager)
+    def objectUnderTestWithPartiallyMockedQueries = new ParameterizedCmHandleQueryServiceImpl(partiallyMockedCmHandleQueries, mockInventoryPersistence, mockTrustLevelManager)
 
     def 'Query cm handle ids with cpsPath.'() {
         given: 'a cmHandleWithCpsPath condition property'
@@ -141,7 +143,7 @@ class ParameterizedCmHandleQueryServiceSpec extends Specification {
             def conditionProperties = createConditionProperties('hasAllModules', [['moduleName': 'some-module-name']])
             cmHandleQueryParameters.setCmHandleQueryParameters([conditionProperties])
         when: 'the query is executed for cm handle ids'
-            def result = objectUnderTest.queryCmHandles(cmHandleQueryParameters)
+            def result = objectUnderTest.queryCmHandles(cmHandleQueryParameters).collectList().block()
         then: 'the inventory service is called with the correct module names'
             1 * mockInventoryPersistence.getCmHandleReferencesWithGivenModules(['some-module-name'], false) >> ['ch1']
         and: 'the inventory service is called with teh correct if and returns a yang model cm handle'
@@ -171,10 +173,12 @@ class ParameterizedCmHandleQueryServiceSpec extends Specification {
     def 'Query cm handle details when the query is empty.'() {
         given: 'We use an empty query'
             def cmHandleQueryParameters = new CmHandleQueryServiceParameters()
-        and: 'the inventory persistence returns the dmi registry datanode with just ids'
-            mockInventoryPersistence.getDataNode(NCMP_DMI_REGISTRY_PARENT) >> [dmiRegistry]
+        and: 'the inventory persistence returns the cm handle ids of all cm handles'
+            cmHandleQueries.getAllCmHandleReferences(false) >> getCmHandleReferencesForDmiRegistry(false)
+        and: 'the inventory persistence returns the cm handle details when requested'
+            mockInventoryPersistence.getYangModelCmHandles(_) >> dmiRegistry.childDataNodes.collect { new YangModelCmHandle(id: it.leaves.get("id").toString(), dmiProperties: [], publicProperties: []) }
         when: 'the query is executed for both cm handle details'
-            def result = objectUnderTest.queryCmHandles(cmHandleQueryParameters)
+            def result = objectUnderTest.queryCmHandles(cmHandleQueryParameters).collectList().block()
         then: 'the correct cm handles are returned'
             assert result.size() == 4
             assert result.cmHandleId.containsAll('PNFDemo1', 'PNFDemo2', 'PNFDemo3', 'PNFDemo4')