#1: Used async version of web client for batch read operation 52/138152/7
authorsourabh_sourabh <sourabh.sourabh@est.tech>
Fri, 7 Jun 2024 18:10:45 +0000 (19:10 +0100)
committersourabh_sourabh <sourabh.sourabh@est.tech>
Thu, 13 Jun 2024 09:34:42 +0000 (10:34 +0100)
  - Exposed async version of post method into dmi rest client.
  - Code change is done to use async web client for batch data
   operation.
  - Use of CpsNcmpTaskExecutor code is removed.

Issue-ID: CPS-2174
Change-Id: I7840fd8c6d9debe42e50c860f9cf39d64274df72
Signed-off-by: sourabh_sourabh <sourabh.sourabh@est.tech>
cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/controller/handlers/NcmpPassthroughResourceRequestHandler.java
cps-ncmp-rest/src/test/groovy/org/onap/cps/ncmp/rest/controller/NetworkCmProxyControllerSpec.groovy
cps-ncmp-rest/src/test/groovy/org/onap/cps/ncmp/rest/controller/handlers/NcmpDatastoreRequestHandlerSpec.groovy
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/NetworkCmProxyDataServiceImpl.java
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/client/DmiRestClient.java
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/DmiWebClientConfiguration.java
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/operations/DmiDataOperations.java
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/data/operation/ResourceDataOperationRequestUtils.java
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/operations/DmiDataOperationsSpec.groovy
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/utils/data/operation/ResourceDataOperationRequestUtilsSpec.groovy

index be5b93c..d716877 100644 (file)
@@ -25,13 +25,11 @@ import static org.onap.cps.ncmp.api.impl.operations.OperationType.READ;
 
 import java.util.Map;
 import java.util.UUID;
-import java.util.function.BiConsumer;
 import java.util.function.Supplier;
 import org.onap.cps.ncmp.api.NetworkCmProxyDataService;
 import org.onap.cps.ncmp.api.impl.exception.InvalidDatastoreException;
 import org.onap.cps.ncmp.api.impl.operations.DatastoreType;
 import org.onap.cps.ncmp.api.impl.operations.OperationType;
-import org.onap.cps.ncmp.api.impl.utils.data.operation.ResourceDataOperationRequestUtils;
 import org.onap.cps.ncmp.api.models.CmResourceAddress;
 import org.onap.cps.ncmp.api.models.DataOperationRequest;
 import org.onap.cps.ncmp.rest.exceptions.OperationNotSupportedException;
@@ -45,11 +43,7 @@ import org.springframework.stereotype.Component;
 public class NcmpPassthroughResourceRequestHandler extends NcmpDatastoreRequestHandler {
 
     private final NetworkCmProxyDataService networkCmProxyDataService;
-
-    private static final Object noReturn = null;
-
     private static final int MAXIMUM_CM_HANDLES_PER_OPERATION = 200;
-
     private static final String PAYLOAD_TOO_LARGE_TEMPLATE = "Operation '%s' affects too many (%d) cm handles";
 
     /**
@@ -101,10 +95,8 @@ public class NcmpPassthroughResourceRequestHandler extends NcmpDatastoreRequestH
             final DataOperationRequest dataOperationRequest,
             final String authorization) {
         final String requestId = UUID.randomUUID().toString();
-        cpsNcmpTaskExecutor.executeTaskWithErrorHandling(
-            getTaskSupplierForDataOperationRequest(topicParamInQuery, dataOperationRequest, requestId, authorization),
-            getTaskCompletionHandlerForDataOperationRequest(topicParamInQuery, dataOperationRequest, requestId),
-            timeOutInMilliSeconds);
+        networkCmProxyDataService.executeDataOperationForCmHandles(topicParamInQuery, dataOperationRequest, requestId,
+                authorization);
         return ResponseEntity.ok(Map.of("requestId", requestId));
     }
 
@@ -114,41 +106,18 @@ public class NcmpPassthroughResourceRequestHandler extends NcmpDatastoreRequestH
         dataOperationRequest.getDataOperationDefinitions().forEach(dataOperationDetail -> {
             if (OperationType.fromOperationName(dataOperationDetail.getOperation()) != READ) {
                 throw new OperationNotSupportedException(
-                    dataOperationDetail.getOperation() + " operation not yet supported");
+                        dataOperationDetail.getOperation() + " operation not yet supported");
             }
             if (DatastoreType.fromDatastoreName(dataOperationDetail.getDatastore()) == OPERATIONAL) {
                 throw new InvalidDatastoreException(dataOperationDetail.getDatastore()
-                    + " datastore is not supported");
+                        + " datastore is not supported");
             }
             if (dataOperationDetail.getCmHandleIds().size() > MAXIMUM_CM_HANDLES_PER_OPERATION) {
                 final String errorMessage = String.format(PAYLOAD_TOO_LARGE_TEMPLATE,
-                    dataOperationDetail.getOperationId(),
-                    dataOperationDetail.getCmHandleIds().size());
+                        dataOperationDetail.getOperationId(),
+                        dataOperationDetail.getCmHandleIds().size());
                 throw new PayloadTooLargeException(errorMessage);
             }
         });
     }
-
-    private Supplier<Object> getTaskSupplierForDataOperationRequest(final String topicParamInQuery,
-                                                                    final DataOperationRequest dataOperationRequest,
-                                                                    final String requestId,
-                                                                    final String authorization) {
-        return () -> {
-            networkCmProxyDataService.executeDataOperationForCmHandles(topicParamInQuery,
-                dataOperationRequest,
-                requestId,
-                authorization);
-            return noReturn;
-        };
-    }
-
-    private static BiConsumer<Object, Throwable> getTaskCompletionHandlerForDataOperationRequest(
-            final String topicParamInQuery,
-            final DataOperationRequest dataOperationRequest,
-            final String requestId) {
-        return (result, throwable) ->
-                ResourceDataOperationRequestUtils.handleAsyncTaskCompletionForDataOperationsRequest(topicParamInQuery,
-                        requestId, dataOperationRequest, throwable);
-    }
-
 }
index 2d7e9b2..c9dbc29 100644 (file)
 
 package org.onap.cps.ncmp.rest.controller
 
+import static org.onap.cps.ncmp.api.impl.inventory.CompositeState.DataStores
+import static org.onap.cps.ncmp.api.impl.inventory.CompositeState.Operational
+import static org.onap.cps.ncmp.api.impl.operations.DatastoreType.OPERATIONAL
+import static org.onap.cps.ncmp.api.impl.operations.DatastoreType.PASSTHROUGH_OPERATIONAL
+import static org.onap.cps.ncmp.api.impl.operations.DatastoreType.PASSTHROUGH_RUNNING
+import static org.onap.cps.ncmp.api.impl.operations.OperationType.CREATE
+import static org.onap.cps.ncmp.api.impl.operations.OperationType.DELETE
+import static org.onap.cps.ncmp.api.impl.operations.OperationType.PATCH
+import static org.onap.cps.ncmp.api.impl.operations.OperationType.UPDATE
+import static org.onap.cps.spi.FetchDescendantsOption.INCLUDE_ALL_DESCENDANTS
+import static org.onap.cps.spi.FetchDescendantsOption.OMIT_DESCENDANTS
+import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.delete
+import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get
+import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.patch
+import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.post
+import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.put
+
 import ch.qos.logback.classic.Level
 import ch.qos.logback.classic.Logger
 import ch.qos.logback.classic.spi.ILoggingEvent
@@ -62,28 +79,10 @@ import org.springframework.http.MediaType
 import org.springframework.test.web.servlet.MockMvc
 import spock.lang.Shared
 import spock.lang.Specification
-
 import java.time.OffsetDateTime
 import java.time.ZoneOffset
 import java.time.format.DateTimeFormatter
 
-import static org.onap.cps.ncmp.api.impl.inventory.CompositeState.DataStores
-import static org.onap.cps.ncmp.api.impl.inventory.CompositeState.Operational
-import static org.onap.cps.ncmp.api.impl.operations.DatastoreType.OPERATIONAL
-import static org.onap.cps.ncmp.api.impl.operations.DatastoreType.PASSTHROUGH_OPERATIONAL
-import static org.onap.cps.ncmp.api.impl.operations.DatastoreType.PASSTHROUGH_RUNNING
-import static org.onap.cps.ncmp.api.impl.operations.OperationType.CREATE
-import static org.onap.cps.ncmp.api.impl.operations.OperationType.DELETE
-import static org.onap.cps.ncmp.api.impl.operations.OperationType.PATCH
-import static org.onap.cps.ncmp.api.impl.operations.OperationType.UPDATE
-import static org.onap.cps.spi.FetchDescendantsOption.INCLUDE_ALL_DESCENDANTS
-import static org.onap.cps.spi.FetchDescendantsOption.OMIT_DESCENDANTS
-import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.delete
-import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get
-import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.patch
-import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.post
-import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.put
-
 @WebMvcTest(NetworkCmProxyController)
 class NetworkCmProxyControllerSpec extends Specification {
 
@@ -197,8 +196,8 @@ class NetworkCmProxyControllerSpec extends Specification {
             assert response.status == HttpStatus.OK.value()
         and: 'async request id is generated'
             assert response.contentAsString.contains('requestId')
-        then: 'the request is handled asynchronously'
-            1 * mockCpsTaskExecutor.executeTaskWithErrorHandling(*_)
+        then: 'the request for (async) data operation invoked once'
+            1 * mockNetworkCmProxyDataService.executeDataOperationForCmHandles('my-topic-name', _, _, null)
         where: 'the following data stores are used'
             datastore << [PASSTHROUGH_RUNNING, PASSTHROUGH_OPERATIONAL]
     }
index 641715d..8835c99 100644 (file)
@@ -79,7 +79,7 @@ class NcmpDatastoreRequestHandlerSpec extends Specification {
         when: 'data operation request is executed'
             objectUnderTest.executeRequest('someTopic', new DataOperationRequest(), NO_AUTH_HEADER)
         then: 'the task is executed in an async fashion or not'
-            expectedCalls * spiedCpsNcmpTaskExecutor.executeTaskWithErrorHandling(*_)
+            expectedCalls * mockNetworkCmProxyDataService.executeDataOperationForCmHandles('someTopic', _, _, null)
         where: 'the following parameters are used'
             scenario | notificationFeatureEnabled || expectedCalls
             'on'     | true                       || 1
@@ -99,10 +99,11 @@ class NcmpDatastoreRequestHandlerSpec extends Specification {
                 networkServiceMethodCalled = true
             }
         when: 'data operation request is executed'
-            objectUnderTest.executeRequest('myTopic', dataOperationRequest, NO_AUTH_HEADER)
-        then: 'the task is executed in an async fashion'
-            1 * spiedCpsNcmpTaskExecutor.executeTaskWithErrorHandling(*_)
-        and: 'the network service is invoked'
+            def response = objectUnderTest.executeRequest('myTopic', dataOperationRequest, NO_AUTH_HEADER)
+        and: 'a successful response with request id is returned'
+            assert response.statusCode.value == 200
+            assert response.body.requestId != null
+        then: 'the network service is invoked'
             new PollingConditions().within(1) {
                 assert networkServiceMethodCalled == true
             }
index 6aa0976..17b3d7a 100755 (executable)
@@ -160,8 +160,7 @@ public class NetworkCmProxyDataServiceImpl implements NetworkCmProxyDataService
                                                  final DataOperationRequest dataOperationRequest,
                                                  final String requestId,
                                                  final String authorization) {
-        dmiDataOperations.requestResourceDataFromDmi(topicParamInQuery, dataOperationRequest, requestId,
-                authorization);
+        dmiDataOperations.requestResourceDataFromDmi(topicParamInQuery, dataOperationRequest, requestId, authorization);
     }
 
     @Override
index 7878c5d..5811cf9 100644 (file)
@@ -50,6 +50,7 @@ import org.springframework.web.reactive.function.BodyInserters;
 import org.springframework.web.reactive.function.client.WebClient;
 import org.springframework.web.reactive.function.client.WebClientRequestException;
 import org.springframework.web.reactive.function.client.WebClientResponseException;
+import reactor.core.publisher.Mono;
 
 @Component
 @RequiredArgsConstructor
@@ -85,22 +86,40 @@ public class DmiRestClient {
                                                             final String requestBodyAsJsonString,
                                                             final OperationType operationType,
                                                             final String authorization) {
-        final WebClient webClient = requiredDmiService.equals(RequiredDmiService.DATA)
-                ? dataServicesWebClient : modelServicesWebClient;
         try {
-            return webClient.post()
-                    .uri(toUri(dmiUrl))
-                    .headers(httpHeaders -> configureHttpHeaders(httpHeaders, authorization))
-                    .body(BodyInserters.fromValue(requestBodyAsJsonString))
-                    .retrieve()
-                    .toEntity(Object.class)
-                    .onErrorMap(httpError -> handleDmiClientException(httpError, operationType.getOperationName()))
-                    .block();
+            return postOperationWithJsonDataAsync(requiredDmiService, dmiUrl, requestBodyAsJsonString, operationType,
+                    authorization).block();
         } catch (final HttpServerErrorException e) {
             throw handleDmiClientException(e, operationType.getOperationName());
         }
     }
 
+    /**
+     * Asynchronously performs an HTTP POST operation with the given JSON data.
+     *
+     * @param requiredDmiService      The service object required for retrieving or configuring the WebClient.
+     * @param dmiUrl                  The URL to which the POST request is sent.
+     * @param requestBodyAsJsonString The JSON string that will be sent as the request body.
+     * @param operationType           An enumeration or object that holds information about the type of operation
+     *                                being performed.
+     * @param authorization           The authorization token to be added to the request headers.
+     * @return A Mono emitting the response entity containing the server's response.
+     */
+    public Mono<ResponseEntity<Object>> postOperationWithJsonDataAsync(final RequiredDmiService requiredDmiService,
+                                                                       final String dmiUrl,
+                                                                       final String requestBodyAsJsonString,
+                                                                       final OperationType operationType,
+                                                                       final String authorization) {
+        final WebClient webClient = getWebClient(requiredDmiService);
+        return webClient.post()
+                .uri(toUri(dmiUrl))
+                .headers(httpHeaders -> configureHttpHeaders(httpHeaders, authorization))
+                .body(BodyInserters.fromValue(requestBodyAsJsonString))
+                .retrieve()
+                .toEntity(Object.class)
+                .onErrorMap(throwable -> handleDmiClientException(throwable, operationType.getOperationName()));
+    }
+
     /**
      * Get DMI plugin health status.
      *
@@ -123,6 +142,10 @@ public class DmiRestClient {
         }
     }
 
+    private WebClient getWebClient(final RequiredDmiService requiredDmiService) {
+        return requiredDmiService.equals(RequiredDmiService.DATA) ? dataServicesWebClient : modelServicesWebClient;
+    }
+
     private void configureHttpHeaders(final HttpHeaders httpHeaders, final String authorization) {
         if (dmiProperties.isDmiBasicAuthEnabled()) {
             httpHeaders.setBasicAuth(dmiProperties.getAuthUsername(), dmiProperties.getAuthPassword());
index 2c0b702..08885a9 100644 (file)
@@ -106,13 +106,11 @@ public class DmiWebClientConfiguration {
         final ConnectionProvider dmiWebClientConnectionProvider = ConnectionProvider.create(connectionProviderName,
                 maximumConnectionsTotal);
 
-        final HttpClient httpClient = HttpClient.create(dmiWebClientConnectionProvider)
+        return HttpClient.create(dmiWebClientConnectionProvider)
                 .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectionTimeoutInSeconds * 1000)
                 .doOnConnected(connection -> connection.addHandlerLast(new ReadTimeoutHandler(readTimeoutInSeconds,
                         TimeUnit.SECONDS)).addHandlerLast(new WriteTimeoutHandler(writeTimeoutInSeconds,
                         TimeUnit.SECONDS)));
-        httpClient.warmup().block();
-        return httpClient;
     }
 
     private static WebClient buildAndGetWebClient(final HttpClient httpClient,
index 9788555..786160a 100644 (file)
@@ -32,7 +32,6 @@ import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
 import org.onap.cps.ncmp.api.NcmpResponseStatus;
 import org.onap.cps.ncmp.api.impl.client.DmiRestClient;
 import org.onap.cps.ncmp.api.impl.config.DmiProperties;
@@ -51,13 +50,14 @@ import org.springframework.stereotype.Service;
 import org.springframework.util.LinkedMultiValueMap;
 import org.springframework.util.MultiValueMap;
 import org.springframework.web.util.UriComponentsBuilder;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
 
 /**
  * Operations class for DMI data.
  */
 @RequiredArgsConstructor
 @Service
-@Slf4j
 public class DmiDataOperations {
 
     private final InventoryPersistence inventoryPersistence;
@@ -231,28 +231,46 @@ public class DmiDataOperations {
                                                                          groupsOutPerDmiServiceName,
                                                                  final String authorization) {
 
-        groupsOutPerDmiServiceName.forEach((dmiServiceName, dmiDataOperationRequestBodies) -> {
-            final String dmiUrl = DmiServiceUrlBuilder.newInstance()
+        Flux.fromIterable(groupsOutPerDmiServiceName.entrySet())
+                .flatMap(dmiDataOperationsByDmiServiceName -> {
+                    final String dmiServiceName = dmiDataOperationsByDmiServiceName.getKey();
+                    final String dmiUrl = buildDmiServiceUrl(dmiServiceName, requestId, topicParamInQuery);
+                    final List<DmiDataOperation> dmiDataOperationRequestBodies
+                            = dmiDataOperationsByDmiServiceName.getValue();
+                    return sendDataOperationRequestToDmiService(dmiUrl, dmiDataOperationRequestBodies, authorization);
+                })
+                .subscribe();
+    }
+
+    private String buildDmiServiceUrl(final String dmiServiceName, final String requestId,
+                                      final String topicParamInQuery) {
+        return DmiServiceUrlBuilder.newInstance()
                 .pathSegment("data")
                 .queryParameter("requestId", requestId)
                 .queryParameter("topic", topicParamInQuery)
                 .build(dmiServiceName, dmiProperties.getDmiBasePath());
-            sendDataOperationRequestToDmiService(dmiUrl, dmiDataOperationRequestBodies, authorization);
-        });
     }
 
-    private void sendDataOperationRequestToDmiService(final String dmiUrl,
-                                                      final List<DmiDataOperation> dmiDataOperationRequestBodies,
-                                                      final String authorization) {
+    private Mono<Void> sendDataOperationRequestToDmiService(final String dmiUrl,
+                                                            final List<DmiDataOperation> dmiDataOperationRequestBodies,
+                                                            final String authorization) {
+        final String dmiDataOperationRequestAsJsonString
+                = createDmiDataOperationRequestAsJsonString(dmiDataOperationRequestBodies);
+        return dmiRestClient.postOperationWithJsonDataAsync(DATA, dmiUrl, dmiDataOperationRequestAsJsonString,
+                        READ, authorization)
+                .then()
+                .onErrorResume(DmiClientRequestException.class, dmiClientRequestException -> {
+                    handleTaskCompletionException(dmiClientRequestException, dmiUrl, dmiDataOperationRequestBodies);
+                    return Mono.empty();
+                });
+    }
+
+    private String createDmiDataOperationRequestAsJsonString(
+            final List<DmiDataOperation> dmiDataOperationRequestBodies) {
         final DmiDataOperationRequest dmiDataOperationRequest = DmiDataOperationRequest.builder()
-                .operations(dmiDataOperationRequestBodies).build();
-        final String dmiDataOperationRequestAsJsonString = jsonObjectMapper.asJsonString(dmiDataOperationRequest);
-        try {
-            dmiRestClient.postOperationWithJsonData(DATA, dmiUrl, dmiDataOperationRequestAsJsonString, READ,
-                    authorization);
-        } catch (final DmiClientRequestException e) {
-            handleTaskCompletionException(e, dmiUrl, dmiDataOperationRequestBodies);
-        }
+                .operations(dmiDataOperationRequestBodies)
+                .build();
+        return jsonObjectMapper.asJsonString(dmiDataOperationRequest);
     }
 
     private void handleTaskCompletionException(final DmiClientRequestException dmiClientRequestException,
@@ -275,4 +293,4 @@ public class DmiDataOperations {
         ResourceDataOperationRequestUtils.publishErrorMessageToClientTopic(topicName, requestId,
                 cmHandleIdsPerResponseCodesPerOperation);
     }
-}
+}
\ No newline at end of file
index dc4108c..407fcf0 100644 (file)
@@ -31,7 +31,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.TimeoutException;
 import java.util.stream.Collectors;
 import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
@@ -49,8 +48,8 @@ import org.onap.cps.ncmp.api.models.DataOperationRequest;
 import org.springframework.util.LinkedMultiValueMap;
 import org.springframework.util.MultiValueMap;
 
-@Slf4j
 @NoArgsConstructor(access = AccessLevel.PRIVATE)
+@Slf4j
 public class ResourceDataOperationRequestUtils {
 
     private static final String UNKNOWN_SERVICE_NAME = null;
@@ -125,58 +124,6 @@ public class ResourceDataOperationRequestUtils {
         return moduleSetTagPerCmHandle;
     }
 
-    /**
-     * Handles the async task completion for an entire data, publishing errors to client topic on task failure.
-     *
-     * @param topicParamInQuery      client given topic
-     * @param requestId              unique identifier per request
-     * @param dataOperationRequest   incoming data operation request details
-     * @param throwable              error cause, or null if task completed with no exception
-     */
-    public static void handleAsyncTaskCompletionForDataOperationsRequest(
-            final String topicParamInQuery,
-            final String requestId,
-            final DataOperationRequest dataOperationRequest,
-            final Throwable throwable) {
-        if (throwable == null) {
-            log.info("Data operations request {} completed.", requestId);
-        } else if (throwable instanceof TimeoutException) {
-            log.error("Data operations request {} timed out.", requestId);
-            ResourceDataOperationRequestUtils.publishErrorMessageToClientTopicForEntireOperation(topicParamInQuery,
-                    requestId, dataOperationRequest, NcmpResponseStatus.DMI_SERVICE_NOT_RESPONDING);
-        } else {
-            log.error("Data operations request {} failed.", requestId, throwable);
-            ResourceDataOperationRequestUtils.publishErrorMessageToClientTopicForEntireOperation(topicParamInQuery,
-                    requestId, dataOperationRequest, NcmpResponseStatus.UNKNOWN_ERROR);
-        }
-    }
-
-    /**
-     * Creates data operation cloud event for when the entire data operation fails and publishes it to client topic.
-     *
-     * @param topicParamInQuery      client given topic
-     * @param requestId              unique identifier per request
-     * @param dataOperationRequestIn incoming data operation request details
-     * @param ncmpResponseStatus     response code to be sent for all cm handle ids in all operations
-     */
-    private static void publishErrorMessageToClientTopicForEntireOperation(
-            final String topicParamInQuery,
-            final String requestId,
-            final DataOperationRequest dataOperationRequestIn,
-            final NcmpResponseStatus ncmpResponseStatus) {
-
-        final MultiValueMap<DmiDataOperation, Map<NcmpResponseStatus, List<String>>>
-                cmHandleIdsPerResponseCodesPerOperation = new LinkedMultiValueMap<>();
-
-        for (final DataOperationDefinition dataOperationDefinitionIn :
-                dataOperationRequestIn.getDataOperationDefinitions()) {
-            cmHandleIdsPerResponseCodesPerOperation.add(
-                    DmiDataOperation.buildDmiDataOperationRequestBodyWithoutCmHandles(dataOperationDefinitionIn),
-                    Map.of(ncmpResponseStatus, dataOperationDefinitionIn.getCmHandleIds()));
-        }
-        publishErrorMessageToClientTopic(topicParamInQuery, requestId, cmHandleIdsPerResponseCodesPerOperation);
-    }
-
     /**
      * Creates data operation cloud event and publish it to client topic.
      *
@@ -193,6 +140,8 @@ public class ResourceDataOperationRequestUtils {
             final CloudEvent dataOperationCloudEvent = DataOperationEventCreator.createDataOperationEvent(clientTopic,
                     requestId, cmHandleIdsPerResponseCodesPerOperation);
             final EventsPublisher<CloudEvent> eventsPublisher = CpsApplicationContext.getCpsBean(EventsPublisher.class);
+            log.warn("publishing error message to client topic: {} ,requestId: {}, data operation cloud event id: {}",
+                    clientTopic, requestId, dataOperationCloudEvent.getId());
             eventsPublisher.publishCloudEvent(clientTopic, requestId, dataOperationCloudEvent);
         }
     }
index ad3f85c..a861809 100644 (file)
@@ -21,6 +21,8 @@
 
 package org.onap.cps.ncmp.api.impl.operations
 
+import reactor.core.publisher.Mono
+
 import static org.onap.cps.ncmp.api.impl.events.mapper.CloudEventMapper.toTargetEvent
 import static org.onap.cps.ncmp.api.impl.operations.DatastoreType.PASSTHROUGH_OPERATIONAL
 import static org.onap.cps.ncmp.api.impl.operations.DatastoreType.PASSTHROUGH_RUNNING
@@ -100,14 +102,14 @@ class DmiDataOperationsSpec extends DmiOperationsBaseSpec {
             def dataOperationRequest = spiedJsonObjectMapper.convertJsonString(dataOperationBatchRequestJsonData, DataOperationRequest.class)
             dataOperationRequest.dataOperationDefinitions[0].cmHandleIds = [cmHandleId]
         and: 'a positive response from DMI service when it is called with valid request parameters'
-            def responseFromDmi = new ResponseEntity<Object>(HttpStatus.ACCEPTED)
+            def responseFromDmi = Mono.just(new ResponseEntity<Object>(HttpStatus.ACCEPTED))
             def expectedDmiBatchResourceDataUrl = "someServiceName/dmi/v1/data?requestId=requestId&topic=my-topic-name"
             def expectedBatchRequestAsJson = '{"operations":[{"operation":"read","operationId":"operational-14","datastore":"ncmp-datastore:passthrough-operational","options":"some option","resourceIdentifier":"some resource identifier","cmHandles":[{"id":"some-cm-handle","moduleSetTag":"","cmHandleProperties":{"prop1":"val1"}}]}]}'
-            mockDmiRestClient.postOperationWithJsonData(DATA, expectedDmiBatchResourceDataUrl, _, READ, NO_AUTH_HEADER) >> responseFromDmi
-        when: 'get resource data for group of cm handles are invoked'
+            mockDmiRestClient.postOperationWithJsonDataAsync(DATA, expectedDmiBatchResourceDataUrl, _, READ, NO_AUTH_HEADER) >> responseFromDmi
+        when: 'get resource data for group of cm handles is invoked'
             objectUnderTest.requestResourceDataFromDmi('my-topic-name', dataOperationRequest, 'requestId', NO_AUTH_HEADER)
-        then: 'the post operation was called and ncmp generated dmi request body json args'
-            1 * mockDmiRestClient.postOperationWithJsonData(DATA, expectedDmiBatchResourceDataUrl, expectedBatchRequestAsJson, READ, NO_AUTH_HEADER)
+        then: 'the post operation was called with the expected URL and JSON request body'
+            1 * mockDmiRestClient.postOperationWithJsonDataAsync(DATA, expectedDmiBatchResourceDataUrl, expectedBatchRequestAsJson, READ, NO_AUTH_HEADER)
     }
 
     def 'Execute (async) data operation from DMI service with Exception.'() {
@@ -116,12 +118,12 @@ class DmiDataOperationsSpec extends DmiOperationsBaseSpec {
             def dataOperationBatchRequestJsonData = TestUtils.getResourceFileContent('dataOperationRequest.json')
             def dataOperationRequest = spiedJsonObjectMapper.convertJsonString(dataOperationBatchRequestJsonData, DataOperationRequest.class)
             dataOperationRequest.dataOperationDefinitions[0].cmHandleIds = [cmHandleId]
-        and: 'the published cloud will be captured'
+        and: 'the published cloud event will be captured'
             def actualDataOperationCloudEvent = null
             eventsPublisher.publishCloudEvent('my-topic-name', 'my-request-id', _) >> { args -> actualDataOperationCloudEvent = args[2] }
-        and: 'a positive response from DMI service when it is called with valid request parameters'
-            mockDmiRestClient.postOperationWithJsonData(*_) >> { throw new DmiClientRequestException(123,'','', UNKNOWN_ERROR) }
-        when: 'attempt tp get resource data for group of cm handles are invoked'
+        and: 'a DMI client request exception is thrown when DMI service is called'
+            mockDmiRestClient.postOperationWithJsonDataAsync(*_) >> { Mono.error(new DmiClientRequestException(123, '', '', UNKNOWN_ERROR)) }
+        when: 'attempt to get resource data for group of cm handles is invoked'
             objectUnderTest.requestResourceDataFromDmi('my-topic-name', dataOperationRequest, 'my-request-id', NO_AUTH_HEADER)
         then: 'the event contains the expected error details'
             def eventDataValue = extractDataValue(actualDataOperationCloudEvent)
index 9028b9e..6530685 100644 (file)
 
 package org.onap.cps.ncmp.api.impl.utils.data.operation
 
+import static org.onap.cps.ncmp.api.impl.events.mapper.CloudEventMapper.toTargetEvent
+import static org.onap.cps.ncmp.api.impl.inventory.CmHandleState.ADVISED
+import static org.onap.cps.ncmp.api.impl.inventory.CmHandleState.READY
+
 import com.fasterxml.jackson.databind.ObjectMapper
 import io.cloudevents.CloudEvent
 import io.cloudevents.kafka.CloudEventDeserializer
 import io.cloudevents.kafka.impl.KafkaHeaders
 import org.apache.kafka.clients.consumer.KafkaConsumer
 import org.onap.cps.events.EventsPublisher
-import org.onap.cps.ncmp.api.NcmpResponseStatus
+import org.onap.cps.ncmp.api.impl.operations.DmiDataOperation
+import org.onap.cps.ncmp.api.impl.operations.OperationType
 import org.onap.cps.ncmp.api.impl.utils.context.CpsApplicationContext
 import org.onap.cps.ncmp.api.impl.yangmodels.YangModelCmHandle
-import org.onap.cps.ncmp.api.impl.inventory.CmHandleState
 import org.onap.cps.ncmp.api.impl.inventory.CompositeStateBuilder
 import org.onap.cps.ncmp.api.kafka.MessagingBaseSpec
 import org.onap.cps.ncmp.api.models.DataOperationRequest
@@ -37,15 +41,11 @@ import org.onap.cps.ncmp.events.async1_0_0.DataOperationEvent
 import org.onap.cps.ncmp.utils.TestUtils
 import org.onap.cps.utils.JsonObjectMapper
 import org.spockframework.spring.SpringBean
-import org.springframework.beans.factory.annotation.Autowired
 import org.springframework.test.context.ContextConfiguration
-
+import org.springframework.util.LinkedMultiValueMap
 import java.time.Duration
-import java.util.concurrent.TimeoutException
-
-import static org.onap.cps.ncmp.api.impl.events.mapper.CloudEventMapper.toTargetEvent
 
-@ContextConfiguration(classes = [EventsPublisher, CpsApplicationContext, ObjectMapper])
+@ContextConfiguration(classes = [EventsPublisher, CpsApplicationContext])
 class ResourceDataOperationRequestUtilsSpec extends MessagingBaseSpec {
 
     def static clientTopic = 'my-topic-name'
@@ -57,9 +57,6 @@ class ResourceDataOperationRequestUtilsSpec extends MessagingBaseSpec {
     @SpringBean
     EventsPublisher eventPublisher = new EventsPublisher<CloudEvent>(legacyEventKafkaTemplate, cloudEventKafkaTemplate)
 
-    @Autowired
-    ObjectMapper objectMapper
-
     def 'Process per data operation request with #serviceName.'() {
         given: 'data operation request with 3 operations'
             def dataOperationRequestJsonData = TestUtils.getResourceFileContent('dataOperationRequest.json')
@@ -135,34 +132,10 @@ class ResourceDataOperationRequestUtilsSpec extends MessagingBaseSpec {
             jsonObjectMapper.asJsonString(dataOperationResponseEvent.data.responses) == dataOperationResponseEventJson
     }
 
-    def 'Publish error response for entire data operations request when async task fails'() {
-        given: 'consumer subscribing to client topic'
-            def cloudEventKafkaConsumer = new KafkaConsumer<>(eventConsumerConfigProperties(consumerGroupId, CloudEventDeserializer))
-            cloudEventKafkaConsumer.subscribe([clientTopic])
-        and: 'data operation request having non-ready and non-existing cm handle ids'
-            def dataOperationRequestJsonData = TestUtils.getResourceFileContent('dataOperationRequest.json')
-            def dataOperationRequest = jsonObjectMapper.convertJsonString(dataOperationRequestJsonData, DataOperationRequest.class)
-        when: 'an error occurs for the entire data operations request'
-            ResourceDataOperationRequestUtils.handleAsyncTaskCompletionForDataOperationsRequest(clientTopic, 'request-id', dataOperationRequest, exceptionThrown)
-        and: 'subscribed client specified topic is polled and first record is selected'
-            def consumerRecordOut = cloudEventKafkaConsumer.poll(Duration.ofMillis(1500)).last()
-            def dataOperationResponseEvent = toTargetEvent(consumerRecordOut.value(), DataOperationEvent.class)
-        then: 'data operation response event response size is 3'
-            dataOperationResponseEvent.data.responses.size() == 3
-        and: 'all 3 have the expected error code'
-            dataOperationResponseEvent.data.responses.each {
-                assert it.statusCode == errorReportedToClientTopic.code
-            }
-        where:
-            scenario             | exceptionThrown        | consumerGroupId || errorReportedToClientTopic
-            'task timed out'     | new TimeoutException() | 'test-2'        || NcmpResponseStatus.DMI_SERVICE_NOT_RESPONDING
-            'unspecified error'  | new RuntimeException() | 'test-3'        || NcmpResponseStatus.UNKNOWN_ERROR
-    }
-
     static def getYangModelCmHandles() {
         def dmiProperties = [new YangModelCmHandle.Property('prop', 'some DMI property')]
-        def readyState = new CompositeStateBuilder().withCmHandleState(CmHandleState.READY).withLastUpdatedTimeNow().build()
-        def advisedState = new CompositeStateBuilder().withCmHandleState(CmHandleState.ADVISED).withLastUpdatedTimeNow().build()
+        def readyState = new CompositeStateBuilder().withCmHandleState(READY).withLastUpdatedTimeNow().build()
+        def advisedState = new CompositeStateBuilder().withCmHandleState(ADVISED).withLastUpdatedTimeNow().build()
         return [new YangModelCmHandle(id: 'ch1-dmi1', dmiServiceName: 'dmi1', dmiProperties: dmiProperties, compositeState: readyState),
                 new YangModelCmHandle(id: 'ch2-dmi1', dmiServiceName: 'dmi1', dmiProperties: dmiProperties, compositeState: readyState),
                 new YangModelCmHandle(id: 'ch6-dmi1', dmiServiceName: 'dmi1', dmiProperties: dmiProperties, compositeState: readyState),
@@ -176,7 +149,16 @@ class ResourceDataOperationRequestUtilsSpec extends MessagingBaseSpec {
 
     static def getYangModelCmHandlesForOneCmHandle() {
         def dmiProperties = [new YangModelCmHandle.Property('prop', 'some DMI property')]
-        def readyState = new CompositeStateBuilder().withCmHandleState(CmHandleState.READY).withLastUpdatedTimeNow().build()
+        def readyState = new CompositeStateBuilder().withCmHandleState(READY).withLastUpdatedTimeNow().build()
         return [new YangModelCmHandle(id: 'ch1-dmi1', dmiServiceName: 'dmi1', moduleSetTag: 'module-set-tag1', dmiProperties: dmiProperties, compositeState: readyState)]
     }
+
+    def mockAndPopulateErrorMap(errorReportedToClientTopic) {
+        def dmiDataOperation = DmiDataOperation.builder().operation(OperationType.fromOperationName('read'))
+                .operationId('some-op-id').datastore('ncmp-datastore:passthrough-operational')
+                .options('some-option').resourceIdentifier('some-resource-identifier').build()
+        def cmHandleIdsPerResponseCodesPerOperation = new LinkedMultiValueMap<>()
+        cmHandleIdsPerResponseCodesPerOperation.add(dmiDataOperation, Map.of(errorReportedToClientTopic, ['some-cm-handle-id']))
+        return cmHandleIdsPerResponseCodesPerOperation
+    }
 }