import java.util.Set;
import java.util.stream.Collectors;
import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
import org.onap.cps.ncmp.api.NcmpResponseStatus;
import org.onap.cps.ncmp.api.impl.client.DmiRestClient;
import org.onap.cps.ncmp.api.impl.config.DmiProperties;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
import org.springframework.web.util.UriComponentsBuilder;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
/**
* Operations class for DMI data.
*/
@RequiredArgsConstructor
@Service
-@Slf4j
public class DmiDataOperations {
private final InventoryPersistence inventoryPersistence;
groupsOutPerDmiServiceName,
final String authorization) {
- groupsOutPerDmiServiceName.forEach((dmiServiceName, dmiDataOperationRequestBodies) -> {
- final String dmiUrl = DmiServiceUrlBuilder.newInstance()
+ Flux.fromIterable(groupsOutPerDmiServiceName.entrySet())
+ .flatMap(dmiDataOperationsByDmiServiceName -> {
+ final String dmiServiceName = dmiDataOperationsByDmiServiceName.getKey();
+ final String dmiUrl = buildDmiServiceUrl(dmiServiceName, requestId, topicParamInQuery);
+ final List<DmiDataOperation> dmiDataOperationRequestBodies
+ = dmiDataOperationsByDmiServiceName.getValue();
+ return sendDataOperationRequestToDmiService(dmiUrl, dmiDataOperationRequestBodies, authorization);
+ })
+ .subscribe();
+ }
+
+ private String buildDmiServiceUrl(final String dmiServiceName, final String requestId,
+ final String topicParamInQuery) {
+ return DmiServiceUrlBuilder.newInstance()
.pathSegment("data")
.queryParameter("requestId", requestId)
.queryParameter("topic", topicParamInQuery)
.build(dmiServiceName, dmiProperties.getDmiBasePath());
- sendDataOperationRequestToDmiService(dmiUrl, dmiDataOperationRequestBodies, authorization);
- });
}
- private void sendDataOperationRequestToDmiService(final String dmiUrl,
- final List<DmiDataOperation> dmiDataOperationRequestBodies,
- final String authorization) {
+ private Mono<Void> sendDataOperationRequestToDmiService(final String dmiUrl,
+ final List<DmiDataOperation> dmiDataOperationRequestBodies,
+ final String authorization) {
+ final String dmiDataOperationRequestAsJsonString
+ = createDmiDataOperationRequestAsJsonString(dmiDataOperationRequestBodies);
+ return dmiRestClient.postOperationWithJsonDataAsync(DATA, dmiUrl, dmiDataOperationRequestAsJsonString,
+ READ, authorization)
+ .then()
+ .onErrorResume(DmiClientRequestException.class, dmiClientRequestException -> {
+ handleTaskCompletionException(dmiClientRequestException, dmiUrl, dmiDataOperationRequestBodies);
+ return Mono.empty();
+ });
+ }
+
+ private String createDmiDataOperationRequestAsJsonString(
+ final List<DmiDataOperation> dmiDataOperationRequestBodies) {
final DmiDataOperationRequest dmiDataOperationRequest = DmiDataOperationRequest.builder()
- .operations(dmiDataOperationRequestBodies).build();
- final String dmiDataOperationRequestAsJsonString = jsonObjectMapper.asJsonString(dmiDataOperationRequest);
- try {
- dmiRestClient.postOperationWithJsonData(DATA, dmiUrl, dmiDataOperationRequestAsJsonString, READ,
- authorization);
- } catch (final DmiClientRequestException e) {
- handleTaskCompletionException(e, dmiUrl, dmiDataOperationRequestBodies);
- }
+ .operations(dmiDataOperationRequestBodies)
+ .build();
+ return jsonObjectMapper.asJsonString(dmiDataOperationRequest);
}
private void handleTaskCompletionException(final DmiClientRequestException dmiClientRequestException,
ResourceDataOperationRequestUtils.publishErrorMessageToClientTopic(topicName, requestId,
cmHandleIdsPerResponseCodesPerOperation);
}
-}
+}
\ No newline at end of file