X-Git-Url: https://gerrit.onap.org/r/gitweb?a=blobdiff_plain;f=cps-ncmp-service%2Fsrc%2Fmain%2Fjava%2Forg%2Fonap%2Fcps%2Fncmp%2Fapi%2Fimpl%2Foperations%2FDmiDataOperations.java;h=786160a9641677a4b957ddba206e4da54fff8d00;hb=04dbe3800a0a9f9809cff2da59a31904a26f17ce;hp=978855569a841fe2a86a5a4fc41bf06e2d90612a;hpb=d3e64201a957ca4a1538ea0962c3e5218a5d34e8;p=cps.git diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/operations/DmiDataOperations.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/operations/DmiDataOperations.java index 978855569..786160a96 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/operations/DmiDataOperations.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/operations/DmiDataOperations.java @@ -32,7 +32,6 @@ import java.util.Map; import java.util.Set; import java.util.stream.Collectors; import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; import org.onap.cps.ncmp.api.NcmpResponseStatus; import org.onap.cps.ncmp.api.impl.client.DmiRestClient; import org.onap.cps.ncmp.api.impl.config.DmiProperties; @@ -51,13 +50,14 @@ import org.springframework.stereotype.Service; import org.springframework.util.LinkedMultiValueMap; import org.springframework.util.MultiValueMap; import org.springframework.web.util.UriComponentsBuilder; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; /** * Operations class for DMI data. */ @RequiredArgsConstructor @Service -@Slf4j public class DmiDataOperations { private final InventoryPersistence inventoryPersistence; @@ -231,28 +231,46 @@ public class DmiDataOperations { groupsOutPerDmiServiceName, final String authorization) { - groupsOutPerDmiServiceName.forEach((dmiServiceName, dmiDataOperationRequestBodies) -> { - final String dmiUrl = DmiServiceUrlBuilder.newInstance() + Flux.fromIterable(groupsOutPerDmiServiceName.entrySet()) + .flatMap(dmiDataOperationsByDmiServiceName -> { + final String dmiServiceName = dmiDataOperationsByDmiServiceName.getKey(); + final String dmiUrl = buildDmiServiceUrl(dmiServiceName, requestId, topicParamInQuery); + final List dmiDataOperationRequestBodies + = dmiDataOperationsByDmiServiceName.getValue(); + return sendDataOperationRequestToDmiService(dmiUrl, dmiDataOperationRequestBodies, authorization); + }) + .subscribe(); + } + + private String buildDmiServiceUrl(final String dmiServiceName, final String requestId, + final String topicParamInQuery) { + return DmiServiceUrlBuilder.newInstance() .pathSegment("data") .queryParameter("requestId", requestId) .queryParameter("topic", topicParamInQuery) .build(dmiServiceName, dmiProperties.getDmiBasePath()); - sendDataOperationRequestToDmiService(dmiUrl, dmiDataOperationRequestBodies, authorization); - }); } - private void sendDataOperationRequestToDmiService(final String dmiUrl, - final List dmiDataOperationRequestBodies, - final String authorization) { + private Mono sendDataOperationRequestToDmiService(final String dmiUrl, + final List dmiDataOperationRequestBodies, + final String authorization) { + final String dmiDataOperationRequestAsJsonString + = createDmiDataOperationRequestAsJsonString(dmiDataOperationRequestBodies); + return dmiRestClient.postOperationWithJsonDataAsync(DATA, dmiUrl, dmiDataOperationRequestAsJsonString, + READ, authorization) + .then() + .onErrorResume(DmiClientRequestException.class, dmiClientRequestException -> { + handleTaskCompletionException(dmiClientRequestException, dmiUrl, dmiDataOperationRequestBodies); + return Mono.empty(); + }); + } + + private String createDmiDataOperationRequestAsJsonString( + final List dmiDataOperationRequestBodies) { final DmiDataOperationRequest dmiDataOperationRequest = DmiDataOperationRequest.builder() - .operations(dmiDataOperationRequestBodies).build(); - final String dmiDataOperationRequestAsJsonString = jsonObjectMapper.asJsonString(dmiDataOperationRequest); - try { - dmiRestClient.postOperationWithJsonData(DATA, dmiUrl, dmiDataOperationRequestAsJsonString, READ, - authorization); - } catch (final DmiClientRequestException e) { - handleTaskCompletionException(e, dmiUrl, dmiDataOperationRequestBodies); - } + .operations(dmiDataOperationRequestBodies) + .build(); + return jsonObjectMapper.asJsonString(dmiDataOperationRequest); } private void handleTaskCompletionException(final DmiClientRequestException dmiClientRequestException, @@ -275,4 +293,4 @@ public class DmiDataOperations { ResourceDataOperationRequestUtils.publishErrorMessageToClientTopic(topicName, requestId, cmHandleIdsPerResponseCodesPerOperation); } -} +} \ No newline at end of file