Cleaned in code in reactive tasks 83/59083/2
authorwasala <przemyslaw.wasala@nokia.com>
Mon, 25 Jun 2018 10:29:39 +0000 (12:29 +0200)
committerwasala <przemyslaw.wasala@nokia.com>
Mon, 6 Aug 2018 12:22:18 +0000 (14:22 +0200)
*Formated code
*Added handling exceptions
*Deleted unused code

Change-Id: I3e95bcb8ba7cdf85f6a1daaec7cadc86080e0b10
Issue-ID: DCAEGEN2-557
Signed-off-by: wasala <przemyslaw.wasala@nokia.com>
prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java
prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AAIConsumerTask.java
prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AAIProducerTask.java
prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java
prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java
prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java
prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java

index a62321c..ee42ce4 100644 (file)
@@ -46,8 +46,7 @@ public class DmaapConsumerJsonParser {
     private static final String PNF_SERIAL_NUMBER = "pnfSerialNumber";
 
 
-    public Optional<ConsumerDmaapModel> getJsonObject(String message)
-        throws PrhTaskException {
+    public Optional<ConsumerDmaapModel> getJsonObject(String message) throws PrhTaskException {
         JsonElement jsonElement = new JsonParser().parse(message);
         Optional<ConsumerDmaapModel> consumerDmaapModel;
         if (jsonElement.isJsonObject()) {
index df8330f..1bb2850 100644 (file)
@@ -31,5 +31,5 @@ public abstract class AAIConsumerTask {
 
     abstract AAIConsumerClient resolveClient();
 
-    abstract protected String execute(ConsumerDmaapModel consumerDmaapModel) throws AAINotFoundException;
+    protected abstract String execute(ConsumerDmaapModel consumerDmaapModel) throws AAINotFoundException;
 }
index abd0464..4a763ef 100644 (file)
@@ -27,11 +27,11 @@ import org.onap.dcaegen2.services.prh.service.AAIProducerClient;
 /**
  * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18
  */
-public abstract class AAIProducerTask/*<R, S, C> extends Task<R, S, C> */ {
+public abstract class AAIProducerTask {
 
     abstract ConsumerDmaapModel publish(ConsumerDmaapModel message) throws AAINotFoundException;
 
     abstract AAIProducerClient resolveClient();
 
-    abstract protected ConsumerDmaapModel execute(ConsumerDmaapModel consumerDmaapModel) throws PrhTaskException;
+    protected abstract ConsumerDmaapModel execute(ConsumerDmaapModel consumerDmaapModel) throws PrhTaskException;
 }
index 56b678a..1be3b28 100644 (file)
@@ -26,7 +26,7 @@ import org.onap.dcaegen2.services.prh.service.consumer.ExtendedDmaapConsumerHttp
 /**
  * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18
  */
-abstract class DmaapConsumerTask /*<R, S, C> extends Task<R, S, C>*/ {
+abstract class DmaapConsumerTask {
 
     abstract ConsumerDmaapModel consume(String message) throws PrhTaskException;
 
@@ -34,5 +34,5 @@ abstract class DmaapConsumerTask /*<R, S, C> extends Task<R, S, C>*/ {
 
     abstract void initConfigs();
 
-    abstract protected ConsumerDmaapModel execute(String object) throws PrhTaskException;
+    protected abstract ConsumerDmaapModel execute(String object) throws PrhTaskException;
 }
index e72939c..3944d41 100644 (file)
@@ -58,10 +58,9 @@ public class DmaapConsumerTaskImpl extends DmaapConsumerTask {
 
     @Override
     ConsumerDmaapModel consume(String message) throws PrhTaskException {
-        logger.info("Consumed model from DmaaP: {}", message);
+        logger.info("Consumed model from DMaaP: {}", message);
         return dmaapConsumerJsonParser.getJsonObject(message)
-            .orElseThrow(() -> new DmaapNotFoundException("Null response from JSONObject in single reqeust"));
-
+            .orElseThrow(() -> new DmaapNotFoundException("Null response from JSON Object in single request"));
     }
 
     @Override
@@ -69,7 +68,7 @@ public class DmaapConsumerTaskImpl extends DmaapConsumerTask {
         extendedDmaapConsumerHttpClient = resolveClient();
         logger.trace("Method called with arg {}", object);
         return consume((extendedDmaapConsumerHttpClient.getHttpConsumerResponse().orElseThrow(() ->
-            new PrhTaskException("DmaapConsumerTask has returned null"))));
+            new PrhTaskException("DMaaPConsumerTask has returned null"))));
     }
 
     @Override
index bd9a874..3520d13 100644 (file)
@@ -19,7 +19,7 @@
  */
 package org.onap.dcaegen2.services.prh.tasks;
 
-import org.onap.dcaegen2.services.prh.exceptions.DmaapNotFoundException;
+import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException;
 import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
 import org.onap.dcaegen2.services.prh.service.producer.ExtendedDmaapProducerHttpClientImpl;
 
@@ -28,9 +28,9 @@ import org.onap.dcaegen2.services.prh.service.producer.ExtendedDmaapProducerHttp
  */
 abstract class DmaapPublisherTask {
 
-    abstract Integer publish(ConsumerDmaapModel consumerDmaapModel) throws DmaapNotFoundException;
+    abstract Integer publish(ConsumerDmaapModel consumerDmaapModel) throws PrhTaskException;
 
     abstract ExtendedDmaapProducerHttpClientImpl resolveClient();
 
-    abstract protected Integer execute(ConsumerDmaapModel consumerDmaapModel) throws DmaapNotFoundException;
+    protected abstract Integer execute(ConsumerDmaapModel consumerDmaapModel) throws PrhTaskException;
 }
index addeaae..cf096b7 100644 (file)
  */
 package org.onap.dcaegen2.services.prh.tasks;
 
+import java.util.concurrent.Callable;
+import org.onap.dcaegen2.services.prh.exceptions.DmaapEmptyResponseException;
+import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException;
+import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
-import reactor.core.Disposable;
 import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
 
 /**
  * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18
@@ -49,22 +53,52 @@ public class ScheduledTasks {
     public void scheduleMainPrhEventTask() {
         logger.trace("Execution of tasks was registered");
 
-        Mono.fromSupplier(() -> Mono.fromCallable(() ->
+        Mono<Integer> dmaapProducerResponse = Mono.fromCallable(consumeFromDMaaPMessage())
+            .doOnError(DmaapEmptyResponseException.class, error -> logger.warn("Nothing to consume from DMaaP"))
+            .flatMap(this::publishToAAIConfiguration)
+            .flatMap(this::publishToDMaaPConfiguration)
+            .subscribeOn(Schedulers.elastic());
+
+        dmaapProducerResponse.subscribe(this::onSuccess, this::onError, this::onComplete);
+    }
+
+    private void onComplete() {
+        logger.info("PRH tasks have been completed");
+    }
+
+    private void onSuccess(Integer responseCode) {
+        logger.info("Prh consumed tasks. HTTP Response code {}", responseCode);
+    }
+
+    private void onError(Throwable throwable) {
+        if (!(throwable instanceof DmaapEmptyResponseException)) {
+            logger.warn("Chain of tasks have been aborted due to errors in PRH workflow", throwable);
+        }
+    }
+
+    private Callable<ConsumerDmaapModel> consumeFromDMaaPMessage() {
+        return () ->
         {
             dmaapConsumerTask.initConfigs();
             return dmaapConsumerTask.execute("");
-        }).subscribe(consumerDmaapModel -> Mono
-                .fromCallable(() -> aaiProducerTask.execute(consumerDmaapModel))
-                .subscribe(
-                    aaiConsumerDmaapModel -> Mono.fromCallable(() -> dmaapProducerTask.execute(aaiConsumerDmaapModel))
-                        .subscribe(resp -> logger.info("Message was published to DmaaP, response code: {}", resp),
-                            error -> logger.warn("Error has been thrown in DmaapProduerTask: {}", error),
-                            () -> logger.info("Completed DmaapPublisher task"))),
-            errorResponse -> logger
-                .warn("Error has been thrown in AAIProducerTask: {}", errorResponse)
-            , () -> logger.info("Completed AAIProducer task")))
-            .subscribe(Disposable::dispose, tasksError -> logger
-                    .warn("Chain of tasks have been aborted, because some errors occur in PRH workflow ", tasksError)
-                , () -> logger.info("PRH tasks was consumed properly")).dispose();
+        };
+    }
+
+    private Mono<ConsumerDmaapModel> publishToAAIConfiguration(ConsumerDmaapModel dmaapModel) {
+        try {
+            return Mono.just(aaiProducerTask.execute(dmaapModel));
+        } catch (PrhTaskException e) {
+            logger.warn("Exception in A&AIProducer task ", e);
+            return Mono.error(e);
+        }
+    }
+
+    private Mono<Integer> publishToDMaaPConfiguration(ConsumerDmaapModel aaiModel) {
+        try {
+            return Mono.just(dmaapProducerTask.execute(aaiModel));
+        } catch (PrhTaskException e) {
+            logger.warn("Exception in DMaaPProducer task ", e);
+            return Mono.error(e);
+        }
     }
 }