#2: Used async version of web client for single cm handle read operation 57/138157/7
authorsourabh_sourabh <sourabh.sourabh@est.tech>
Tue, 11 Jun 2024 07:56:54 +0000 (08:56 +0100)
committersourabh_sourabh <sourabh.sourabh@est.tech>
Thu, 13 Jun 2024 14:13:05 +0000 (15:13 +0100)
 - Code change is done to use async web client for single cm handle
   read data operation.
 - CpsNcmpTaskExecutor class and groovy test code is removed.
 - Use of supplier code is removed from cm handle Query service.

Issue-ID:CPS-2259
Change-Id: I2b7a36e8a151a9224937afd876fceb30be6a24ce
Signed-off-by: sourabh_sourabh <sourabh.sourabh@est.tech>
16 files changed:
cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/controller/NetworkCmProxyController.java
cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/controller/handlers/NcmpCachedResourceRequestHandler.java
cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/controller/handlers/NcmpDatastoreRequestHandler.java
cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/controller/handlers/NcmpPassthroughResourceRequestHandler.java
cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/executor/CpsNcmpTaskExecutor.java [deleted file]
cps-ncmp-rest/src/test/groovy/org/onap/cps/ncmp/rest/controller/NetworkCmProxyControllerSpec.groovy
cps-ncmp-rest/src/test/groovy/org/onap/cps/ncmp/rest/controller/handlers/NcmpDatastoreRequestHandlerSpec.groovy
cps-ncmp-rest/src/test/groovy/org/onap/cps/ncmp/rest/exceptions/NetworkCmProxyRestExceptionHandlerSpec.groovy
cps-ncmp-rest/src/test/groovy/org/onap/cps/ncmp/rest/executor/CpsNcmpTaskExecutorSpec.groovy [deleted file]
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/NetworkCmProxyDataService.java
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/NetworkCmProxyQueryService.java
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/NetworkCmProxyDataServiceImpl.java
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/NetworkCmProxyQueryServiceImpl.java
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/operations/DmiDataOperations.java
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/NetworkCmProxyDataServiceImplSpec.groovy
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/operations/DmiDataOperationsSpec.groovy

index e6d6faf..80e1c44 100644 (file)
 
 package org.onap.cps.ncmp.rest.controller.handlers;
 
-import java.util.function.Supplier;
+import java.util.Collection;
 import org.onap.cps.ncmp.api.NetworkCmProxyDataService;
 import org.onap.cps.ncmp.api.NetworkCmProxyQueryService;
 import org.onap.cps.ncmp.api.models.CmResourceAddress;
-import org.onap.cps.ncmp.rest.executor.CpsNcmpTaskExecutor;
 import org.onap.cps.spi.FetchDescendantsOption;
+import org.onap.cps.spi.model.DataNode;
 import org.springframework.http.ResponseEntity;
 import org.springframework.stereotype.Component;
+import reactor.core.publisher.Mono;
 
 @Component
 public class NcmpCachedResourceRequestHandler extends NcmpDatastoreRequestHandler {
@@ -38,14 +39,11 @@ public class NcmpCachedResourceRequestHandler extends NcmpDatastoreRequestHandle
     /**
      * Constructor.
      *
-     * @param cpsNcmpTaskExecutor        @see org.onap.cps.ncmp.rest.executor.CpsNcmpTaskExecutor
      * @param networkCmProxyDataService  @see org.onap.cps.ncmp.api.NetworkCmProxyDataService
      * @param networkCmProxyQueryService @see org.onap.cps.ncmp.api.NetworkCmProxyQueryService
      */
-    public NcmpCachedResourceRequestHandler(final CpsNcmpTaskExecutor cpsNcmpTaskExecutor,
-                                            final NetworkCmProxyDataService networkCmProxyDataService,
+    public NcmpCachedResourceRequestHandler(final NetworkCmProxyDataService networkCmProxyDataService,
                                             final NetworkCmProxyQueryService networkCmProxyQueryService) {
-        super(cpsNcmpTaskExecutor);
         this.networkCmProxyDataService = networkCmProxyDataService;
         this.networkCmProxyQueryService = networkCmProxyQueryService;
     }
@@ -59,35 +57,30 @@ public class NcmpCachedResourceRequestHandler extends NcmpDatastoreRequestHandle
      * @param includeDescendants whether include descendants
      * @return the response entity
      */
-    public ResponseEntity<Object> executeRequest(final String cmHandleId,
-                                                 final String resourceIdentifier,
+    public ResponseEntity<Object> executeRequest(final String cmHandleId, final String resourceIdentifier,
                                                  final boolean includeDescendants) {
-
-        final Supplier<Object> taskSupplier = getTaskSupplierForQueryRequest(cmHandleId, resourceIdentifier,
-            includeDescendants);
-        return executeTaskSync(taskSupplier);
+        final Collection<DataNode> dataNodes = getTaskSupplierForQueryRequest(cmHandleId, resourceIdentifier,
+                includeDescendants);
+        return ResponseEntity.ok(dataNodes);
     }
 
     @Override
-    protected Supplier<Object> getTaskSupplierForGetRequest(final CmResourceAddress cmResourceAddress,
-                                                  final String optionsParamInQuery,
-                                                  final String topicParamInQuery,
-                                                  final String requestId,
-                                                  final boolean includeDescendants,
-                                                  final String authorization) {
-
+    protected Mono<Object> getResourceDataForCmHandle(final CmResourceAddress cmResourceAddress,
+                                                      final String optionsParamInQuery,
+                                                      final String topicParamInQuery,
+                                                      final String requestId,
+                                                      final boolean includeDescendants,
+                                                      final String authorization) {
         final FetchDescendantsOption fetchDescendantsOption = getFetchDescendantsOption(includeDescendants);
-
-        return () -> networkCmProxyDataService.getResourceDataForCmHandle(cmResourceAddress, fetchDescendantsOption);
+        return Mono.fromSupplier(
+                () -> networkCmProxyDataService.getResourceDataForCmHandle(cmResourceAddress, fetchDescendantsOption));
     }
 
-    private Supplier<Object> getTaskSupplierForQueryRequest(final String cmHandleId,
-                                                            final String resourceIdentifier,
-                                                            final boolean includeDescendants) {
-
+    private Collection<DataNode> getTaskSupplierForQueryRequest(final String cmHandleId,
+                                                                final String resourceIdentifier,
+                                                                final boolean includeDescendants) {
         final FetchDescendantsOption fetchDescendantsOption = getFetchDescendantsOption(includeDescendants);
-
-        return () -> networkCmProxyQueryService.queryResourceDataOperational(cmHandleId, resourceIdentifier,
+        return networkCmProxyQueryService.queryResourceDataOperational(cmHandleId, resourceIdentifier,
             fetchDescendantsOption);
     }
 
@@ -95,6 +88,4 @@ public class NcmpCachedResourceRequestHandler extends NcmpDatastoreRequestHandle
         return includeDescendants ? FetchDescendantsOption.INCLUDE_ALL_DESCENDANTS
             : FetchDescendantsOption.OMIT_DESCENDANTS;
     }
-
-
 }
index 1ae1682..20059e2 100644 (file)
@@ -22,15 +22,14 @@ package org.onap.cps.ncmp.rest.controller.handlers;
 
 import java.util.Map;
 import java.util.UUID;
-import java.util.function.Supplier;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.onap.cps.ncmp.api.models.CmResourceAddress;
-import org.onap.cps.ncmp.rest.executor.CpsNcmpTaskExecutor;
 import org.onap.cps.ncmp.rest.util.TopicValidator;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.http.ResponseEntity;
 import org.springframework.stereotype.Service;
+import reactor.core.publisher.Mono;
 
 @Slf4j
 @Service
@@ -42,12 +41,9 @@ public abstract class NcmpDatastoreRequestHandler {
 
     @Value("${notification.async.executor.time-out-value-in-ms:60000}")
     protected int timeOutInMilliSeconds;
-
     @Value("${notification.enabled:true}")
     protected boolean notificationFeatureEnabled;
 
-    protected final CpsNcmpTaskExecutor cpsNcmpTaskExecutor;
-
     /**
      * Executes synchronous/asynchronous get request for given cm handle.
      *
@@ -66,7 +62,7 @@ public abstract class NcmpDatastoreRequestHandler {
 
         final boolean asyncResponseRequested = topicParamInQuery != null;
         if (asyncResponseRequested && notificationFeatureEnabled) {
-            return executeAsyncTaskAndGetResponseEntity(cmResourceAddress, optionsParamInQuery, topicParamInQuery,
+            return fetchResourceDataAsynchronously(cmResourceAddress, optionsParamInQuery, topicParamInQuery,
                 includeDescendants, authorization);
         }
 
@@ -74,41 +70,36 @@ public abstract class NcmpDatastoreRequestHandler {
             log.warn("Asynchronous request is unavailable as notification feature is currently disabled, "
                     + "will use synchronous operation.");
         }
-        final Supplier<Object> taskSupplier = getTaskSupplierForGetRequest(cmResourceAddress, optionsParamInQuery,
-            NO_TOPIC, NO_REQUEST_ID, includeDescendants, authorization);
-        return executeTaskSync(taskSupplier);
+        final Mono<Object> resourceDataMono = getResourceDataForCmHandle(cmResourceAddress, optionsParamInQuery,
+                NO_TOPIC, NO_REQUEST_ID, includeDescendants, authorization);
+        return fetchResourceDataSynchronously(resourceDataMono);
     }
 
+    private ResponseEntity<Object> fetchResourceDataSynchronously(final Mono<Object> resourceDataMono) {
+        return ResponseEntity.ok(resourceDataMono.block());
+    }
 
-    private ResponseEntity<Object> executeTaskAsync(final String topicParamInQuery,
-                                                      final String requestId,
-                                                      final Supplier<Object> taskSupplier) {
+    private ResponseEntity<Object> fetchResourceDataAsynchronously(final CmResourceAddress cmResourceAddress,
+                                                                   final String optionsParamInQuery,
+                                                                   final String topicParamInQuery,
+                                                                   final boolean includeDescendants,
+                                                                   final String authorization) {
         TopicValidator.validateTopicName(topicParamInQuery);
+        final String requestId = UUID.randomUUID().toString();
+        getResourceDataForCmHandle(cmResourceAddress, optionsParamInQuery, topicParamInQuery, requestId,
+                includeDescendants, authorization)
+                .doOnSuccess(result -> log.debug("Async operation succeeded for request id {}: {}", requestId, result))
+                .doOnError(error ->
+                        log.error("Async operation failed for request id {}: {}", requestId, error.getMessage()))
+                .subscribe();
         log.debug("Received Async request with id {}", requestId);
-        cpsNcmpTaskExecutor.executeTask(taskSupplier, timeOutInMilliSeconds);
         return ResponseEntity.ok(Map.of("requestId", requestId));
     }
 
-    protected ResponseEntity<Object> executeTaskSync(final Supplier<Object> taskSupplier) {
-        return ResponseEntity.ok(taskSupplier.get());
-    }
-
-    private ResponseEntity<Object> executeAsyncTaskAndGetResponseEntity(final CmResourceAddress cmResourceAddress,
-                                                                        final String optionsParamInQuery,
-                                                                        final String topicParamInQuery,
-                                                                        final boolean includeDescendants,
-                                                                        final String authorization) {
-        final String requestId = UUID.randomUUID().toString();
-        final Supplier<Object> taskSupplier = getTaskSupplierForGetRequest(cmResourceAddress,
-            optionsParamInQuery, topicParamInQuery, requestId, includeDescendants, authorization);
-        return executeTaskAsync(topicParamInQuery, requestId, taskSupplier);
-    }
-
-    protected abstract Supplier<Object> getTaskSupplierForGetRequest(final CmResourceAddress cmResourceAddress,
-                                                  final String optionsParamInQuery,
-                                                  final String topicParamInQuery,
-                                                  final String requestId,
-                                                  final boolean includeDescendant,
-                                                  final String authorization);
-
+    protected abstract Mono<Object> getResourceDataForCmHandle(final CmResourceAddress cmResourceAddress,
+                                                               final String optionsParamInQuery,
+                                                               final String topicParamInQuery,
+                                                               final String requestId,
+                                                               final boolean includeDescendant,
+                                                               final String authorization);
 }
index d716877..53e374d 100644 (file)
@@ -25,7 +25,6 @@ import static org.onap.cps.ncmp.api.impl.operations.OperationType.READ;
 
 import java.util.Map;
 import java.util.UUID;
-import java.util.function.Supplier;
 import org.onap.cps.ncmp.api.NetworkCmProxyDataService;
 import org.onap.cps.ncmp.api.impl.exception.InvalidDatastoreException;
 import org.onap.cps.ncmp.api.impl.operations.DatastoreType;
@@ -34,12 +33,12 @@ import org.onap.cps.ncmp.api.models.CmResourceAddress;
 import org.onap.cps.ncmp.api.models.DataOperationRequest;
 import org.onap.cps.ncmp.rest.exceptions.OperationNotSupportedException;
 import org.onap.cps.ncmp.rest.exceptions.PayloadTooLargeException;
-import org.onap.cps.ncmp.rest.executor.CpsNcmpTaskExecutor;
 import org.onap.cps.ncmp.rest.util.TopicValidator;
 import org.springframework.http.ResponseEntity;
-import org.springframework.stereotype.Component;
+import org.springframework.stereotype.Service;
+import reactor.core.publisher.Mono;
 
-@Component
+@Service
 public class NcmpPassthroughResourceRequestHandler extends NcmpDatastoreRequestHandler {
 
     private final NetworkCmProxyDataService networkCmProxyDataService;
@@ -49,12 +48,9 @@ public class NcmpPassthroughResourceRequestHandler extends NcmpDatastoreRequestH
     /**
      * Constructor.
      *
-     * @param cpsNcmpTaskExecutor        @see org.onap.cps.ncmp.rest.executor.CpsNcmpTaskExecutor
      * @param networkCmProxyDataService  @see org.onap.cps.ncmp.api.NetworkCmProxyDataService
      */
-    public NcmpPassthroughResourceRequestHandler(final CpsNcmpTaskExecutor cpsNcmpTaskExecutor,
-                                                 final NetworkCmProxyDataService networkCmProxyDataService) {
-        super(cpsNcmpTaskExecutor);
+    public NcmpPassthroughResourceRequestHandler(final NetworkCmProxyDataService networkCmProxyDataService) {
         this.networkCmProxyDataService = networkCmProxyDataService;
     }
 
@@ -79,15 +75,14 @@ public class NcmpPassthroughResourceRequestHandler extends NcmpDatastoreRequestH
     }
 
     @Override
-    protected Supplier<Object> getTaskSupplierForGetRequest(final CmResourceAddress cmResourceAddress,
-                                                            final String optionsParamInQuery,
-                                                            final String topicParamInQuery,
-                                                            final String requestId,
-                                                            final boolean includeDescendants,
-                                                            final String authorization) {
-
-        return () -> networkCmProxyDataService.getResourceDataForCmHandle(cmResourceAddress, optionsParamInQuery,
-            topicParamInQuery, requestId, authorization);
+    protected Mono<Object> getResourceDataForCmHandle(final CmResourceAddress cmResourceAddress,
+                                                      final String optionsParamInQuery,
+                                                      final String topicParamInQuery,
+                                                      final String requestId,
+                                                      final boolean includeDescendants,
+                                                      final String authorization) {
+        return networkCmProxyDataService.getResourceDataForCmHandle(cmResourceAddress, optionsParamInQuery,
+                topicParamInQuery, requestId, authorization);
     }
 
     private ResponseEntity<Object> getRequestIdAndSendDataOperationRequestToDmiService(
diff --git a/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/executor/CpsNcmpTaskExecutor.java b/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/executor/CpsNcmpTaskExecutor.java
deleted file mode 100644 (file)
index 2601c7a..0000000
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- *  ============LICENSE_START=======================================================
- *  Copyright (C) 2022-2024 Nordix Foundation
- *  ================================================================================
- *  Licensed under the Apache License, Version 2.0 (the "License");
- *  you may not use this file except in compliance with the License.
- *  You may obtain a copy of the License at
- *
- *        http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- *  SPDX-License-Identifier: Apache-2.0
- *  ============LICENSE_END=========================================================
- */
-
-package org.onap.cps.ncmp.rest.executor;
-
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
-
-import java.util.concurrent.CompletableFuture;
-import java.util.function.BiConsumer;
-import java.util.function.Supplier;
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.stereotype.Service;
-
-@Slf4j
-@Service
-public class CpsNcmpTaskExecutor {
-
-    /**
-     * Execute a task asynchronously, and invoke completion handler when done.
-     *
-     * @param taskSupplier functional method is get() task needed to be executed asynchronously
-     * @param taskCompletionHandler the action to perform on task completion or error
-     * @param timeOutInMillis the time-out value in milliseconds
-     */
-    public void executeTaskWithErrorHandling(final Supplier<Object> taskSupplier,
-                                             final BiConsumer<Object, Throwable> taskCompletionHandler,
-                                             final long timeOutInMillis) {
-        CompletableFuture.supplyAsync(taskSupplier)
-                .orTimeout(timeOutInMillis, MILLISECONDS)
-                .whenCompleteAsync(taskCompletionHandler);
-    }
-
-    /**
-     * Execute a task asynchronously.
-     *
-     * @param taskSupplier functional method is get() task needed to be executed asynchronously
-     * @param timeOutInMillis the time-out value in milliseconds
-     */
-    public void executeTask(final Supplier<Object> taskSupplier, final long timeOutInMillis) {
-        executeTaskWithErrorHandling(taskSupplier, (taskResult, throwable) -> handleTaskCompletion(throwable),
-                timeOutInMillis);
-    }
-
-    private void handleTaskCompletion(final Throwable throwable) {
-        if (throwable == null) {
-            log.info("Async task completed successfully.");
-        } else {
-            log.error("Async task failed. caused by : {}", throwable.toString());
-        }
-    }
-}
-
-
-
index c9dbc29..3a5748f 100644 (file)
@@ -59,7 +59,6 @@ import org.onap.cps.ncmp.api.models.NcmpServiceCmHandle
 import org.onap.cps.ncmp.api.models.CmResourceAddress
 import org.onap.cps.ncmp.rest.controller.handlers.NcmpCachedResourceRequestHandler
 import org.onap.cps.ncmp.rest.controller.handlers.NcmpPassthroughResourceRequestHandler
-import org.onap.cps.ncmp.rest.executor.CpsNcmpTaskExecutor
 import org.onap.cps.ncmp.rest.mapper.CmHandleStateMapper
 import org.onap.cps.ncmp.rest.mapper.DataOperationRequestMapper
 import org.onap.cps.ncmp.rest.model.DataOperationDefinition
@@ -82,6 +81,9 @@ import spock.lang.Specification
 import java.time.OffsetDateTime
 import java.time.ZoneOffset
 import java.time.format.DateTimeFormatter
+import groovy.json.JsonSlurper
+import org.springframework.http.ResponseEntity
+import reactor.core.publisher.Mono
 
 @WebMvcTest(NetworkCmProxyController)
 class NetworkCmProxyControllerSpec extends Specification {
@@ -113,17 +115,14 @@ class NetworkCmProxyControllerSpec extends Specification {
     @SpringBean
     Map<String, TrustLevel> trustLevelPerCmHandle = [:]
 
-    @SpringBean
-    CpsNcmpTaskExecutor mockCpsTaskExecutor = Mock()
-
     @SpringBean
     DeprecationHelper stubbedDeprecationHelper = Stub()
 
     @SpringBean
-    NcmpCachedResourceRequestHandler ncmpCachedResourceRequestHandler = new NcmpCachedResourceRequestHandler(mockCpsTaskExecutor, mockNetworkCmProxyDataService, mockNetworkCmProxyQueryService)
+    NcmpCachedResourceRequestHandler ncmpCachedResourceRequestHandler = new NcmpCachedResourceRequestHandler(mockNetworkCmProxyDataService, mockNetworkCmProxyQueryService)
 
     @SpringBean
-    NcmpPassthroughResourceRequestHandler ncmpPassthroughResourceRequestHandler = new NcmpPassthroughResourceRequestHandler(mockCpsTaskExecutor, mockNetworkCmProxyDataService)
+    NcmpPassthroughResourceRequestHandler ncmpPassthroughResourceRequestHandler = new NcmpPassthroughResourceRequestHandler(mockNetworkCmProxyDataService)
 
     @Value('${rest.api.ncmp-base-path}/v1')
     def ncmpBasePathV1
@@ -160,7 +159,7 @@ class NetworkCmProxyControllerSpec extends Specification {
         when: 'get data resource request is performed'
             def response = mvc.perform(get(getUrl).contentType(MediaType.APPLICATION_JSON)).andReturn().response
         then: 'the NCMP data service is called with correct parameters'
-            1 * mockNetworkCmProxyDataService.getResourceDataForCmHandle(expectedCmResourceAddress, '(a=1,b=2)', NO_TOPIC, NO_REQUEST_ID, NO_AUTH_HEADER)
+            1 * mockNetworkCmProxyDataService.getResourceDataForCmHandle(expectedCmResourceAddress, '(a=1,b=2)', NO_TOPIC, NO_REQUEST_ID, NO_AUTH_HEADER) >> Mono.just(new ResponseEntity<Object>(HttpStatus.OK))
         and: 'response status is Ok'
             assert response.status == HttpStatus.OK.value()
     }
@@ -253,13 +252,15 @@ class NetworkCmProxyControllerSpec extends Specification {
             def getUrl = "$ncmpBasePathV1/ch/testCmHandle/data/ds/ncmp-datastore:passthrough-running?resourceIdentifier=$resourceIdentifier&options=(a=1,b=2)"
         and: 'ncmp service returns json object'
             def expectedCmResourceAddress = new CmResourceAddress(PASSTHROUGH_RUNNING.datastoreName, 'testCmHandle', resourceIdentifier)
-            mockNetworkCmProxyDataService.getResourceDataForCmHandle(expectedCmResourceAddress,'(a=1,b=2)', NO_TOPIC, NO_REQUEST_ID, NO_AUTH_HEADER) >> '{valid-json}'
+            1 * mockNetworkCmProxyDataService.getResourceDataForCmHandle(expectedCmResourceAddress, '(a=1,b=2)', NO_TOPIC, NO_REQUEST_ID, NO_AUTH_HEADER)
+                    >> Mono.just(new ResponseEntity<Object>('{valid-json}', HttpStatus.OK))
         when: 'get data resource request is performed'
             def response = mvc.perform(get(getUrl).contentType(MediaType.APPLICATION_JSON)).andReturn().response
         then: 'response status is Ok'
-            response.status == HttpStatus.OK.value()
-        and: 'response contains valid object body'
-            response.getContentAsString() == '{valid-json}'
+            assert response.status == 200
+        and: 'response contains the object returned by the service'
+            def responseAsJsonObject = new JsonSlurper().parseText(response.getContentAsString())
+            assert responseAsJsonObject.body == '{valid-json}'
         where: 'tokens are used in the resource identifier parameter'
             scenario                       | resourceIdentifier
             '/'                            | 'id/with/slashes'
@@ -452,6 +453,8 @@ class NetworkCmProxyControllerSpec extends Specification {
     def 'Get resource data from DMI with valid topic i.e. async request for #scenario'() {
         given: 'resource data url'
             def getUrl = "$ncmpBasePathV1/ch/testCmHandle/data/ds/ncmp-datastore:${datastoreInUrl}?resourceIdentifier=parent/child&options=(a=1,b=2)&topic=my-topic-name"
+        and: 'the NCMP data service is called with correct parameters'
+            1 * mockNetworkCmProxyDataService.getResourceDataForCmHandle(_, '(a=1,b=2)', 'my-topic-name', _, NO_AUTH_HEADER) >> Mono.just(new ResponseEntity<Object>(HttpStatus.OK))
         when: 'get data resource request is performed'
             def response = mvc.perform(get(getUrl).contentType(MediaType.APPLICATION_JSON).accept(MediaType.APPLICATION_JSON_VALUE)).andReturn().response
         then: 'async request id is generated'
index 8835c99..00b0cb0 100644 (file)
@@ -28,16 +28,16 @@ import org.onap.cps.ncmp.api.models.DataOperationRequest
 import org.onap.cps.ncmp.api.models.CmResourceAddress
 import org.onap.cps.ncmp.rest.exceptions.OperationNotSupportedException
 import org.onap.cps.ncmp.rest.exceptions.PayloadTooLargeException
-import org.onap.cps.ncmp.rest.executor.CpsNcmpTaskExecutor
+import org.springframework.http.HttpStatus
+import reactor.core.publisher.Mono
 import spock.lang.Specification
 import spock.util.concurrent.PollingConditions
 
 class NcmpDatastoreRequestHandlerSpec extends Specification {
 
-    def spiedCpsNcmpTaskExecutor = Spy(CpsNcmpTaskExecutor)
     def mockNetworkCmProxyDataService = Mock(NetworkCmProxyDataService)
 
-    def objectUnderTest = new NcmpPassthroughResourceRequestHandler(spiedCpsNcmpTaskExecutor, mockNetworkCmProxyDataService)
+    def objectUnderTest = new NcmpPassthroughResourceRequestHandler(mockNetworkCmProxyDataService)
 
     def NO_AUTH_HEADER = null
 
@@ -48,32 +48,26 @@ class NcmpDatastoreRequestHandlerSpec extends Specification {
     def 'Attempt to execute async get request with #scenario.'() {
         given: 'notification feature is turned on/off'
             objectUnderTest.notificationFeatureEnabled = notificationFeatureEnabled
-        and: 'a flag to track the network service call'
-            def networkServiceMethodCalled = false
         and: 'a CM resource address'
             def cmResourceAddress = new CmResourceAddress('ds', 'ch1', 'resource1')
-        and: 'the (mocked) service will use the flag to indicate if it is called'
-            mockNetworkCmProxyDataService.getResourceDataForCmHandle(cmResourceAddress, 'options', _, _, NO_AUTH_HEADER) >>
-                { networkServiceMethodCalled = true }
+        and: 'the (mocked) service is called with the correct parameters returns OK'
+            1 * mockNetworkCmProxyDataService.getResourceDataForCmHandle(cmResourceAddress, 'options', _, _, NO_AUTH_HEADER) >> Mono.just(HttpStatus.OK)
         when: 'get request is executed with topic = #topic'
-            objectUnderTest.executeRequest(cmResourceAddress, 'options', topic, false, NO_AUTH_HEADER)
-        then: 'the task is executed in an async fashion or not'
-            expectedCalls * spiedCpsNcmpTaskExecutor.executeTask(*_)
-        and: 'the service request is invoked'
-            new PollingConditions().within(1) {
-                assert networkServiceMethodCalled == true
-            }
+            def response= objectUnderTest.executeRequest(cmResourceAddress, 'options', topic, false, NO_AUTH_HEADER)
+        then: 'a successful response with/without request id is returned'
+            assert response.statusCode.value == 200
+            assert response.body instanceof Map == expectedResponseBodyIsMap
         where: 'the following parameters are used'
-            scenario                   | notificationFeatureEnabled | topic   || expectedCalls
-            'feature on, valid topic'  | true                       | 'valid' || 1
-            'feature on, no topic'     | true                       | null    || 0
-            'feature off, valid topic' | false                      | 'valid' || 0
-            'feature off, no topic'    | false                      | null    || 0
+            scenario                   | notificationFeatureEnabled | topic   || expectedCalls | expectedResponseBodyIsMap
+            'feature on, valid topic'  | true                       | 'valid' || 1             | true
+            'feature on, no topic'     | true                       | null    || 0             | false
+            'feature off, valid topic' | false                      | 'valid' || 0             | false
+            'feature off, no topic'    | false                      | null    || 0             | false
     }
 
     def 'Attempt to execute async data operation request with feature #scenario.'() {
         given: 'a extended request handler that supports bulk requests'
-           def objectUnderTest = new NcmpPassthroughResourceRequestHandler(spiedCpsNcmpTaskExecutor, mockNetworkCmProxyDataService)
+           def objectUnderTest = new NcmpPassthroughResourceRequestHandler(mockNetworkCmProxyDataService)
         and: 'notification feature is turned on/off'
             objectUnderTest.notificationFeatureEnabled = notificationFeatureEnabled
         when: 'data operation request is executed'
index ea472cd..33eb48f 100644 (file)
@@ -43,7 +43,6 @@ import org.onap.cps.ncmp.api.impl.exception.ServerNcmpException
 import org.onap.cps.ncmp.rest.controller.NcmpRestInputMapper
 import org.onap.cps.ncmp.rest.controller.handlers.NcmpCachedResourceRequestHandler
 import org.onap.cps.ncmp.rest.controller.handlers.NcmpPassthroughResourceRequestHandler
-import org.onap.cps.ncmp.rest.executor.CpsNcmpTaskExecutor
 import org.onap.cps.ncmp.rest.mapper.CmHandleStateMapper
 import org.onap.cps.ncmp.rest.mapper.DataOperationRequestMapper
 import org.onap.cps.ncmp.rest.util.DeprecationHelper
@@ -82,9 +81,6 @@ class NetworkCmProxyRestExceptionHandlerSpec extends Specification {
     @SpringBean
     DataOperationRequestMapper dataOperationRequestMapper = Mappers.getMapper(DataOperationRequestMapper)
 
-    @SpringBean
-    CpsNcmpTaskExecutor stubbedCpsTaskExecutor = Stub()
-
     @SpringBean
     DeprecationHelper stubbedDeprecationHelper = Stub()
 
diff --git a/cps-ncmp-rest/src/test/groovy/org/onap/cps/ncmp/rest/executor/CpsNcmpTaskExecutorSpec.groovy b/cps-ncmp-rest/src/test/groovy/org/onap/cps/ncmp/rest/executor/CpsNcmpTaskExecutorSpec.groovy
deleted file mode 100644 (file)
index 4c8c40f..0000000
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- *  ============LICENSE_START=======================================================
- *  Copyright (C) 2023-2024 Nordix Foundation
- *  ================================================================================
- *  Licensed under the Apache License, Version 2.0 (the "License");
- *  you may not use this file except in compliance with the License.
- *  You may obtain a copy of the License at
- *
- *        http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- *  SPDX-License-Identifier: Apache-2.0
- *  ============LICENSE_END=========================================================
- */
-
-package org.onap.cps.ncmp.rest.executor
-
-import ch.qos.logback.classic.Level
-import ch.qos.logback.classic.Logger
-import ch.qos.logback.classic.spi.ILoggingEvent
-import ch.qos.logback.core.read.ListAppender
-import org.slf4j.LoggerFactory
-import spock.lang.Specification
-import spock.util.concurrent.PollingConditions
-
-class CpsNcmpTaskExecutorSpec extends Specification {
-
-    def objectUnderTest = new CpsNcmpTaskExecutor()
-    def logger = Spy(ListAppender<ILoggingEvent>)
-    def enoughTime = 100
-    def notEnoughTime = 10
-
-    void setup() {
-        ((Logger) LoggerFactory.getLogger(CpsNcmpTaskExecutor.class)).addAppender(logger)
-        logger.start()
-    }
-
-    void cleanup() {
-        ((Logger) LoggerFactory.getLogger(CpsNcmpTaskExecutor.class)).detachAndStopAllAppenders()
-    }
-
-    def 'Execute successful task.'() {
-        when: 'task is executed'
-            objectUnderTest.executeTask(taskSupplier(), enoughTime)
-        then: 'an event is logged with level INFO'
-            new PollingConditions().within(1) {
-                def loggingEvent = getLoggingEvent()
-                assert loggingEvent.level == Level.INFO
-            }
-        and: 'the log indicates the task completed successfully'
-            assert loggingEvent.formattedMessage == 'Async task completed successfully.'
-    }
-
-    def 'Execute failing task.'() {
-        when: 'task is executed'
-            objectUnderTest.executeTask(taskSupplierForFailingTask(), enoughTime)
-        then: 'an event is logged with level ERROR'
-            new PollingConditions().within(1) {
-                def loggingEvent = getLoggingEvent()
-                assert loggingEvent.level == Level.ERROR
-            }
-        and: 'the original error message is logged'
-            assert loggingEvent.formattedMessage.contains('original exception message')
-    }
-
-    def 'Task times out.'() {
-        when: 'task is executed without enough time to complete'
-            objectUnderTest.executeTask(taskSupplierForLongRunningTask(), notEnoughTime)
-        then: 'an event is logged with level ERROR'
-            new PollingConditions().within(1) {
-                def loggingEvent = getLoggingEvent()
-                assert loggingEvent.level == Level.ERROR
-            }
-        and: 'a timeout error message is logged'
-            assert loggingEvent.formattedMessage.contains('java.util.concurrent.TimeoutException')
-    }
-
-    def taskSupplier() {
-        return () -> 'hello world'
-    }
-
-    def taskSupplierForFailingTask() {
-        return () -> { throw new RuntimeException('original exception message') }
-    }
-
-    def taskSupplierForLongRunningTask() {
-        return () -> { sleep(enoughTime) }
-    }
-
-    def getLoggingEvent() {
-        return logger.list[0]
-    }
-
-}
index 20545d7..73c8d96 100644 (file)
@@ -37,6 +37,7 @@ import org.onap.cps.ncmp.api.models.NcmpServiceCmHandle;
 import org.onap.cps.spi.FetchDescendantsOption;
 import org.onap.cps.spi.model.ModuleDefinition;
 import org.onap.cps.spi.model.ModuleReference;
+import reactor.core.publisher.Mono;
 
 /*
  * Datastore interface for handling CPS data.
@@ -52,20 +53,29 @@ public interface NetworkCmProxyDataService {
     DmiPluginRegistrationResponse updateDmiRegistrationAndSyncModule(DmiPluginRegistration dmiPluginRegistration);
 
     /**
-     * Get resource data for given data store using dmi.
-     *
-     * @param cmResourceAddress   target datastore, cm handle and resource identifier
-     * @param optionsParamInQuery options query
-     * @param topicParamInQuery   topic name for (triggering) async responses
-     * @param requestId           unique requestId for async request
-     * @param authorization       contents of Authorization header, or null if not present
-     * @return {@code Object} resource data
-     */
-    Object getResourceDataForCmHandle(CmResourceAddress cmResourceAddress,
-                                      String optionsParamInQuery,
-                                      String topicParamInQuery,
-                                      String requestId,
-                                      String authorization);
+     * Fetches resource data for a given data store using DMI (Data Management Interface).
+     * This method retrieves data based on the provided CmResourceAddress and additional query parameters.
+     * It supports asynchronous processing and handles authorization if required.
+     *
+     * @param cmResourceAddress   The target data store, including the CM handle and resource identifier.
+     *                            This parameter must not be null.
+     * @param optionsParamInQuery Additional query parameters that may influence the data retrieval process,
+     *                            such as filters or limits. This parameter can be null.
+     * @param topicParamInQuery   The topic name for triggering asynchronous responses. If specified,
+     *                            the response will be sent to this topic. This parameter can be null.
+     * @param requestId           A unique identifier for the request, used for tracking and correlating
+     *                            asynchronous operations. This parameter must not be null.
+     * @param authorization       The contents of the Authorization header. This parameter can be null
+     *                            if authorization is not required.
+     * @return {@code Mono<Object>} A reactive Mono that emits the resource data on successful retrieval
+     *     or an error signal if the operation fails. The Mono represents a single asynchronous
+     *     computation result.
+     */
+    Mono<Object> getResourceDataForCmHandle(CmResourceAddress cmResourceAddress,
+                                            String optionsParamInQuery,
+                                            String topicParamInQuery,
+                                            String requestId,
+                                            String authorization);
 
     /**
      * Get resource data for operational.
index 340806b..39d4972 100644 (file)
@@ -1,6 +1,6 @@
 /*
  *  ============LICENSE_START=======================================================
- *  Copyright (C) 2022 Nordix Foundation
+ *  Copyright (C) 2022-2024 Nordix Foundation
  *  ================================================================================
  *  Licensed under the Apache License, Version 2.0 (the "License");
  *  you may not use this file except in compliance with the License.
@@ -20,7 +20,9 @@
 
 package org.onap.cps.ncmp.api;
 
+import java.util.Collection;
 import org.onap.cps.spi.FetchDescendantsOption;
+import org.onap.cps.spi.model.DataNode;
 
 /*
  * Datastore interface for handling cached CPS data query requests.
@@ -28,14 +30,21 @@ import org.onap.cps.spi.FetchDescendantsOption;
 public interface NetworkCmProxyQueryService {
 
     /**
-     * Get resource data for operational.
+     * Fetches operational resource data based on the provided CM handle identifier and CPS path.
+     * This method retrieves data nodes from the specified path within the context of a given CM handle.
+     * It supports options for fetching descendant nodes.
      *
-     * @param cmHandleId cm handle identifier
-     * @param cpsPath cps path
-     * @Link FetchDescendantsOption fetch descendants option
-     * @return {@code Object} resource data
+     * @param cmHandleId             The CM handle identifier, which uniquely identifies the CM handle.
+     *                               This parameter must not be null.
+     * @param cpsPath                The CPS (Control Plane Service) path specifying the location of the
+     *                               resource data within the CM handle. This parameter must not be null.
+     * @param fetchDescendantsOption The option specifying whether to fetch descendant nodes along with the specified
+     *                               resource data.
+     * @return {@code Collection<DataNode>} A collection of DataNode objects representing the resource data
+     *     retrieved from the specified path. The collection may include descendant nodes based on the
+     *     fetchDescendantsOption.
      */
-    Object queryResourceDataOperational(String cmHandleId,
-                                      String cpsPath,
-                                      FetchDescendantsOption fetchDescendantsOption);
+    Collection<DataNode> queryResourceDataOperational(String cmHandleId,
+                                                      String cpsPath,
+                                                      FetchDescendantsOption fetchDescendantsOption);
 }
index 17b3d7a..7540509 100755 (executable)
@@ -87,8 +87,8 @@ import org.onap.cps.spi.model.ModuleDefinition;
 import org.onap.cps.spi.model.ModuleReference;
 import org.onap.cps.utils.JsonObjectMapper;
 import org.springframework.beans.factory.annotation.Qualifier;
-import org.springframework.http.ResponseEntity;
 import org.springframework.stereotype.Service;
+import reactor.core.publisher.Mono;
 
 @Slf4j
 @Service
@@ -133,17 +133,14 @@ public class NetworkCmProxyDataServiceImpl implements NetworkCmProxyDataService
     }
 
     @Override
-    public Object getResourceDataForCmHandle(final CmResourceAddress cmResourceAddress,
+    public Mono<Object> getResourceDataForCmHandle(final CmResourceAddress cmResourceAddress,
                                              final String optionsParamInQuery,
                                              final String topicParamInQuery,
                                              final String requestId,
                                              final String authorization) {
-        final ResponseEntity<?> responseEntity = dmiDataOperations.getResourceDataFromDmi(cmResourceAddress,
-            optionsParamInQuery,
-            topicParamInQuery,
-            requestId,
-            authorization);
-        return responseEntity.getBody();
+        return dmiDataOperations.getResourceDataFromDmi(cmResourceAddress, optionsParamInQuery, topicParamInQuery,
+                        requestId, authorization)
+                .flatMap(responseEntity -> Mono.justOrEmpty(responseEntity.getBody()));
     }
 
     @Override
index d8353f3..8d3b6ed 100644 (file)
@@ -22,11 +22,13 @@ package org.onap.cps.ncmp.api.impl;
 
 import static org.onap.cps.ncmp.api.impl.ncmppersistence.NcmpPersistence.NFP_OPERATIONAL_DATASTORE_DATASPACE_NAME;
 
+import java.util.Collection;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.onap.cps.api.CpsQueryService;
 import org.onap.cps.ncmp.api.NetworkCmProxyQueryService;
 import org.onap.cps.spi.FetchDescendantsOption;
+import org.onap.cps.spi.model.DataNode;
 import org.springframework.stereotype.Service;
 
 @Slf4j
@@ -37,9 +39,9 @@ public class NetworkCmProxyQueryServiceImpl implements NetworkCmProxyQueryServic
     private final CpsQueryService cpsQueryService;
 
     @Override
-    public Object queryResourceDataOperational(final String cmHandleId,
-                                               final String cpsPath,
-                                               final FetchDescendantsOption fetchDescendantsOption) {
+    public Collection<DataNode> queryResourceDataOperational(final String cmHandleId,
+                                                             final String cpsPath,
+                                                             final FetchDescendantsOption fetchDescendantsOption) {
         return cpsQueryService.queryDataNodes(NFP_OPERATIONAL_DATASTORE_DATASPACE_NAME, cmHandleId, cpsPath,
             fetchDescendantsOption);
     }
index 786160a..3db8455 100644 (file)
@@ -66,20 +66,20 @@ public class DmiDataOperations {
     private final DmiRestClient dmiRestClient;
 
     /**
-     * This method fetches the resource data from operational data store for given cm handle
-     * identifier on given resource using dmi client.
+     * This method fetches the resource data from the operational data store for a given CM handle
+     * identifier on the specified resource using the DMI client.
      *
-     * @param cmResourceAddress   target datastore, cm handle and resource identifier
-     * @param optionsParamInQuery options query
-     * @param topicParamInQuery   topic name for (triggering) async responses
-     * @param requestId           requestId for async responses
-     * @param authorization       contents of Authorization header, or null if not present
-     * @return {@code ResponseEntity} response entity
+     * @param cmResourceAddress   Target datastore, CM handle, and resource identifier.
+     * @param optionsParamInQuery Options query string.
+     * @param topicParamInQuery   Topic name for triggering asynchronous responses.
+     * @param requestId           Request ID for asynchronous responses.
+     * @param authorization       Contents of the Authorization header, or null if not present.
+     * @return {@code Mono<ResponseEntity<Object>>} A reactive type representing the response entity.
      */
     @Timed(value = "cps.ncmp.dmi.get",
             description = "Time taken to fetch the resource data from operational data store for given cm handle "
                     + "identifier on given resource using dmi client")
-    public ResponseEntity<Object> getResourceDataFromDmi(final CmResourceAddress cmResourceAddress,
+    public Mono<ResponseEntity<Object>> getResourceDataFromDmi(final CmResourceAddress cmResourceAddress,
                                                          final String optionsParamInQuery,
                                                          final String topicParamInQuery,
                                                          final String requestId,
@@ -90,7 +90,7 @@ public class DmiDataOperations {
         final String jsonRequestBody = getDmiRequestBody(READ, requestId, null, null, yangModelCmHandle);
         final String dmiUrl = getDmiResourceDataUrl(cmResourceAddress.datastoreName(), yangModelCmHandle,
                 cmResourceAddress.resourceIdentifier(), optionsParamInQuery, topicParamInQuery);
-        return dmiRestClient.postOperationWithJsonData(DATA, dmiUrl, jsonRequestBody, READ, authorization);
+        return dmiRestClient.postOperationWithJsonDataAsync(DATA, dmiUrl, jsonRequestBody, READ, authorization);
     }
 
     /**
index 4d0af6f..d91c79d 100644 (file)
@@ -23,6 +23,8 @@
 
 package org.onap.cps.ncmp.api.impl
 
+import reactor.core.publisher.Mono
+
 import static org.onap.cps.ncmp.api.impl.ncmppersistence.NcmpPersistence.NFP_OPERATIONAL_DATASTORE_DATASPACE_NAME
 import static org.onap.cps.ncmp.api.impl.ncmppersistence.NcmpPersistence.NCMP_DATASPACE_NAME
 import static org.onap.cps.ncmp.api.impl.ncmppersistence.NcmpPersistence.NCMP_DMI_REGISTRY_ANCHOR
@@ -120,16 +122,16 @@ class NetworkCmProxyDataServiceImplSpec extends Specification {
                 >> { new ResponseEntity<>(HttpStatus.CREATED) }
     }
 
-    def 'Get resource data for from DMI.'() {
+    def 'Get resource data from DMI.'() {
         given: 'cpsDataService returns valid data node'
             mockDataNode()
         and: 'some cm resource address'
-            def cmResourceAddress = new CmResourceAddress('some datastore','some CM Handle', 'some resource Id')
+            def cmResourceAddress = new CmResourceAddress('some datastore', 'some CM Handle', 'some resource Id')
         and: 'get resource data from DMI is called'
             mockDmiDataOperations.getResourceDataFromDmi(cmResourceAddress, OPTIONS_PARAM, NO_TOPIC, NO_REQUEST_ID, NO_AUTH_HEADER) >>
-                    new ResponseEntity<>('dmi-response', HttpStatus.OK)
+                    Mono.just(new ResponseEntity<>('dmi-response', HttpStatus.OK))
         when: 'get resource data operational for the given cm resource address is called'
-            def response = objectUnderTest.getResourceDataForCmHandle(cmResourceAddress, OPTIONS_PARAM, NO_TOPIC, NO_REQUEST_ID, NO_AUTH_HEADER)
+            def response = objectUnderTest.getResourceDataForCmHandle(cmResourceAddress, OPTIONS_PARAM, NO_TOPIC, NO_REQUEST_ID, NO_AUTH_HEADER).block()
         then: 'DMI returns a json response'
             assert response == 'dmi-response'
     }
index a861809..b286e9f 100644 (file)
@@ -21,8 +21,6 @@
 
 package org.onap.cps.ncmp.api.impl.operations
 
-import reactor.core.publisher.Mono
-
 import static org.onap.cps.ncmp.api.impl.events.mapper.CloudEventMapper.toTargetEvent
 import static org.onap.cps.ncmp.api.impl.operations.DatastoreType.PASSTHROUGH_OPERATIONAL
 import static org.onap.cps.ncmp.api.impl.operations.DatastoreType.PASSTHROUGH_RUNNING
@@ -50,6 +48,7 @@ import org.springframework.http.HttpStatus
 import org.springframework.http.ResponseEntity
 import org.springframework.test.context.ContextConfiguration
 import spock.lang.Shared
+import reactor.core.publisher.Mono
 
 @SpringBootTest
 @ContextConfiguration(classes = [EventsPublisher, CpsApplicationContext, DmiProperties, DmiDataOperations])
@@ -76,15 +75,16 @@ class DmiDataOperationsSpec extends DmiOperationsBaseSpec {
         given: 'a cm handle for #cmHandleId'
             mockYangModelCmHandleRetrieval(dmiProperties)
         and: 'a positive response from DMI service when it is called with the expected parameters'
-            def responseFromDmi = new ResponseEntity<Object>(HttpStatus.OK)
+            def responseFromDmi = Mono.just(new ResponseEntity<Object>('{some-key:some-value}', HttpStatus.OK))
             def expectedUrl = "${dmiServiceBaseUrl}${expectedDatastoreInUrl}?resourceIdentifier=${resourceIdentifier}${expectedOptionsInUrl}"
-            def expectedJson = '{"operation":"read","cmHandleProperties":'  + expectedProperties + ',"moduleSetTag":""}'
-            mockDmiRestClient.postOperationWithJsonData(DATA, expectedUrl, expectedJson, READ, NO_AUTH_HEADER) >> responseFromDmi
+            def expectedJson = '{"operation":"read","cmHandleProperties":' + expectedProperties + ',"moduleSetTag":""}'
+            mockDmiRestClient.postOperationWithJsonDataAsync(DATA, expectedUrl, expectedJson, READ, NO_AUTH_HEADER) >> responseFromDmi
         when: 'get resource data is invoked'
             def cmResourceAddress = new CmResourceAddress(dataStore.datastoreName, cmHandleId, resourceIdentifier)
-            def result = objectUnderTest.getResourceDataFromDmi(cmResourceAddress, options, NO_TOPIC, NO_REQUEST_ID, NO_AUTH_HEADER)
+            def result = objectUnderTest.getResourceDataFromDmi(cmResourceAddress, options, NO_TOPIC, NO_REQUEST_ID, NO_AUTH_HEADER).block()
         then: 'the result is the response from the DMI service'
-            assert result == responseFromDmi
+            assert result.body == '{some-key:some-value}'
+            assert result.statusCode.'2xxSuccessful'
         where: 'the following parameters are used'
             scenario                               | dmiProperties               | dataStore               | options       || expectedProperties | expectedDatastoreInUrl    | expectedOptionsInUrl
             'without properties'                   | []                          | PASSTHROUGH_OPERATIONAL | OPTIONS_PARAM || '{}'               | 'passthrough-operational' | '&options=(a%3D1,b%3D2)'