.flatMapMany(this::getConsumerDmaapModelFromJsonArray);
}
- private Flux<ConsumerDmaapModel> getConsumerDmaapModelFromJsonArray(JsonArray jsonElement) {
+ private Flux<ConsumerDmaapModel> getConsumerDmaapModelFromJsonArray(JsonArray jsonArray) {
+ if(jsonArray.size() == 0) {
+ LOGGER.debug("Nothing to consume from DMaaP");
+ return Flux.empty();
+ }
return create(
- Flux.defer(() -> Flux.fromStream(StreamSupport.stream(jsonElement.spliterator(), false)
+ Flux.defer(() -> Flux.fromStream(StreamSupport.stream(jsonArray.spliterator(), false)
.map(jsonElementFromArray -> getJsonObjectFromAnArray(jsonElementFromArray)
.orElseGet(JsonObject::new)))));
}
import org.onap.dcaegen2.services.sdk.rest.services.aai.client.config.AaiClientConfiguration;
import org.onap.dcaegen2.services.sdk.rest.services.aai.client.service.AaiHttpClientFactory;
import org.onap.dcaegen2.services.sdk.rest.services.aai.client.service.http.patch.AaiHttpPatchClient;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.CloudHttpClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@Override
AaiHttpPatchClient resolveClient() {
return new AaiHttpPatchClient(resolveConfiguration(),
- new AaiJsonBodyBuilderImpl()).createAaiHttpClient(new AaiHttpClientFactory(resolveConfiguration()).build());
+ new AaiJsonBodyBuilderImpl(), new CloudHttpClient());
}
@Override