@Override
ConsumerDmaapModel consume(String message) throws PrhTaskException {
- logger.info("Consumed model from DmaaP: {}", message);
+ logger.info("Consumed model from DMaaP: {}", message);
return dmaapConsumerJsonParser.getJsonObject(message)
- .orElseThrow(() -> new DmaapNotFoundException("Null response from JSONObject in single reqeust"));
-
+ .orElseThrow(() -> new DmaapNotFoundException("Null response from JSON Object in single request"));
}
@Override
extendedDmaapConsumerHttpClient = resolveClient();
logger.trace("Method called with arg {}", object);
return consume((extendedDmaapConsumerHttpClient.getHttpConsumerResponse().orElseThrow(() ->
- new PrhTaskException("DmaapConsumerTask has returned null"))));
+ new PrhTaskException("DMaaPConsumerTask has returned null"))));
}
@Override
*/
package org.onap.dcaegen2.services.prh.tasks;
+import java.util.concurrent.Callable;
+import org.onap.dcaegen2.services.prh.exceptions.DmaapEmptyResponseException;
+import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException;
+import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
-import reactor.core.Disposable;
import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18
public void scheduleMainPrhEventTask() {
logger.trace("Execution of tasks was registered");
- Mono.fromSupplier(() -> Mono.fromCallable(() ->
+ Mono<Integer> dmaapProducerResponse = Mono.fromCallable(consumeFromDMaaPMessage())
+ .doOnError(DmaapEmptyResponseException.class, error -> logger.warn("Nothing to consume from DMaaP"))
+ .flatMap(this::publishToAAIConfiguration)
+ .flatMap(this::publishToDMaaPConfiguration)
+ .subscribeOn(Schedulers.elastic());
+
+ dmaapProducerResponse.subscribe(this::onSuccess, this::onError, this::onComplete);
+ }
+
+ private void onComplete() {
+ logger.info("PRH tasks have been completed");
+ }
+
+ private void onSuccess(Integer responseCode) {
+ logger.info("Prh consumed tasks. HTTP Response code {}", responseCode);
+ }
+
+ private void onError(Throwable throwable) {
+ if (!(throwable instanceof DmaapEmptyResponseException)) {
+ logger.warn("Chain of tasks have been aborted due to errors in PRH workflow", throwable);
+ }
+ }
+
+ private Callable<ConsumerDmaapModel> consumeFromDMaaPMessage() {
+ return () ->
{
dmaapConsumerTask.initConfigs();
return dmaapConsumerTask.execute("");
- }).subscribe(consumerDmaapModel -> Mono
- .fromCallable(() -> aaiProducerTask.execute(consumerDmaapModel))
- .subscribe(
- aaiConsumerDmaapModel -> Mono.fromCallable(() -> dmaapProducerTask.execute(aaiConsumerDmaapModel))
- .subscribe(resp -> logger.info("Message was published to DmaaP, response code: {}", resp),
- error -> logger.warn("Error has been thrown in DmaapProduerTask: {}", error),
- () -> logger.info("Completed DmaapPublisher task"))),
- errorResponse -> logger
- .warn("Error has been thrown in AAIProducerTask: {}", errorResponse)
- , () -> logger.info("Completed AAIProducer task")))
- .subscribe(Disposable::dispose, tasksError -> logger
- .warn("Chain of tasks have been aborted, because some errors occur in PRH workflow ", tasksError)
- , () -> logger.info("PRH tasks was consumed properly")).dispose();
+ };
+ }
+
+ private Mono<ConsumerDmaapModel> publishToAAIConfiguration(ConsumerDmaapModel dmaapModel) {
+ try {
+ return Mono.just(aaiProducerTask.execute(dmaapModel));
+ } catch (PrhTaskException e) {
+ logger.warn("Exception in A&AIProducer task ", e);
+ return Mono.error(e);
+ }
+ }
+
+ private Mono<Integer> publishToDMaaPConfiguration(ConsumerDmaapModel aaiModel) {
+ try {
+ return Mono.just(dmaapProducerTask.execute(aaiModel));
+ } catch (PrhTaskException e) {
+ logger.warn("Exception in DMaaPProducer task ", e);
+ return Mono.error(e);
+ }
}
}