#1: Used async version of web client for batch read operation
[cps.git] / cps-ncmp-service / src / main / java / org / onap / cps / ncmp / api / impl / operations / DmiDataOperations.java
index 9788555..786160a 100644 (file)
@@ -32,7 +32,6 @@ import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
 import org.onap.cps.ncmp.api.NcmpResponseStatus;
 import org.onap.cps.ncmp.api.impl.client.DmiRestClient;
 import org.onap.cps.ncmp.api.impl.config.DmiProperties;
@@ -51,13 +50,14 @@ import org.springframework.stereotype.Service;
 import org.springframework.util.LinkedMultiValueMap;
 import org.springframework.util.MultiValueMap;
 import org.springframework.web.util.UriComponentsBuilder;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
 
 /**
  * Operations class for DMI data.
  */
 @RequiredArgsConstructor
 @Service
-@Slf4j
 public class DmiDataOperations {
 
     private final InventoryPersistence inventoryPersistence;
@@ -231,28 +231,46 @@ public class DmiDataOperations {
                                                                          groupsOutPerDmiServiceName,
                                                                  final String authorization) {
 
-        groupsOutPerDmiServiceName.forEach((dmiServiceName, dmiDataOperationRequestBodies) -> {
-            final String dmiUrl = DmiServiceUrlBuilder.newInstance()
+        Flux.fromIterable(groupsOutPerDmiServiceName.entrySet())
+                .flatMap(dmiDataOperationsByDmiServiceName -> {
+                    final String dmiServiceName = dmiDataOperationsByDmiServiceName.getKey();
+                    final String dmiUrl = buildDmiServiceUrl(dmiServiceName, requestId, topicParamInQuery);
+                    final List<DmiDataOperation> dmiDataOperationRequestBodies
+                            = dmiDataOperationsByDmiServiceName.getValue();
+                    return sendDataOperationRequestToDmiService(dmiUrl, dmiDataOperationRequestBodies, authorization);
+                })
+                .subscribe();
+    }
+
+    private String buildDmiServiceUrl(final String dmiServiceName, final String requestId,
+                                      final String topicParamInQuery) {
+        return DmiServiceUrlBuilder.newInstance()
                 .pathSegment("data")
                 .queryParameter("requestId", requestId)
                 .queryParameter("topic", topicParamInQuery)
                 .build(dmiServiceName, dmiProperties.getDmiBasePath());
-            sendDataOperationRequestToDmiService(dmiUrl, dmiDataOperationRequestBodies, authorization);
-        });
     }
 
-    private void sendDataOperationRequestToDmiService(final String dmiUrl,
-                                                      final List<DmiDataOperation> dmiDataOperationRequestBodies,
-                                                      final String authorization) {
+    private Mono<Void> sendDataOperationRequestToDmiService(final String dmiUrl,
+                                                            final List<DmiDataOperation> dmiDataOperationRequestBodies,
+                                                            final String authorization) {
+        final String dmiDataOperationRequestAsJsonString
+                = createDmiDataOperationRequestAsJsonString(dmiDataOperationRequestBodies);
+        return dmiRestClient.postOperationWithJsonDataAsync(DATA, dmiUrl, dmiDataOperationRequestAsJsonString,
+                        READ, authorization)
+                .then()
+                .onErrorResume(DmiClientRequestException.class, dmiClientRequestException -> {
+                    handleTaskCompletionException(dmiClientRequestException, dmiUrl, dmiDataOperationRequestBodies);
+                    return Mono.empty();
+                });
+    }
+
+    private String createDmiDataOperationRequestAsJsonString(
+            final List<DmiDataOperation> dmiDataOperationRequestBodies) {
         final DmiDataOperationRequest dmiDataOperationRequest = DmiDataOperationRequest.builder()
-                .operations(dmiDataOperationRequestBodies).build();
-        final String dmiDataOperationRequestAsJsonString = jsonObjectMapper.asJsonString(dmiDataOperationRequest);
-        try {
-            dmiRestClient.postOperationWithJsonData(DATA, dmiUrl, dmiDataOperationRequestAsJsonString, READ,
-                    authorization);
-        } catch (final DmiClientRequestException e) {
-            handleTaskCompletionException(e, dmiUrl, dmiDataOperationRequestBodies);
-        }
+                .operations(dmiDataOperationRequestBodies)
+                .build();
+        return jsonObjectMapper.asJsonString(dmiDataOperationRequest);
     }
 
     private void handleTaskCompletionException(final DmiClientRequestException dmiClientRequestException,
@@ -275,4 +293,4 @@ public class DmiDataOperations {
         ResourceDataOperationRequestUtils.publishErrorMessageToClientTopic(topicName, requestId,
                 cmHandleIdsPerResponseCodesPerOperation);
     }
-}
+}
\ No newline at end of file