Added reactive tasks flow control 35/58735/2
authorwasala <przemyslaw.wasala@nokia.com>
Fri, 22 Jun 2018 18:11:20 +0000 (20:11 +0200)
committerwasala <przemyslaw.wasala@nokia.com>
Mon, 6 Aug 2018 06:51:56 +0000 (08:51 +0200)
Change-Id: I9cb2bede66e9e446912f2e6a815c7b56b80813b9
Issue-ID: DCAEGEN2-557
Signed-off-by: wasala <przemyslaw.wasala@nokia.com>
12 files changed:
prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AAIConsumerTask.java
prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AAIConsumerTaskImpl.java
prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AAIProducerTask.java
prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AAIProducerTaskImpl.java
prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java
prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java
prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java
prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java
prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java
prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/Task.java [deleted file]
prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskSpy.java
prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/DmaapProducerTaskSpy.java

index 784bc5a..df8330f 100644 (file)
@@ -25,9 +25,11 @@ import org.onap.dcaegen2.services.prh.exceptions.AAINotFoundException;
 import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
 import org.onap.dcaegen2.services.prh.service.AAIConsumerClient;
 
-public abstract class AAIConsumerTask<R, S, C> extends Task<R, S, C> {
+public abstract class AAIConsumerTask {
 
     abstract Optional<String> consume(ConsumerDmaapModel message) throws AAINotFoundException;
 
     abstract AAIConsumerClient resolveClient();
+
+    abstract protected String execute(ConsumerDmaapModel consumerDmaapModel) throws AAINotFoundException;
 }
index 4c35b2e..c545a1b 100644 (file)
@@ -25,7 +25,6 @@ import org.onap.dcaegen2.services.prh.config.AAIClientConfiguration;
 import org.onap.dcaegen2.services.prh.configuration.AppConfig;
 import org.onap.dcaegen2.services.prh.configuration.Config;
 import org.onap.dcaegen2.services.prh.exceptions.AAINotFoundException;
-import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException;
 import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
 import org.onap.dcaegen2.services.prh.service.AAIConsumerClient;
 import org.slf4j.Logger;
@@ -34,8 +33,7 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
 @Component
-public class AAIConsumerTaskImpl extends
-    AAIConsumerTask<ConsumerDmaapModel, String, AAIClientConfiguration> {
+public class AAIConsumerTaskImpl extends AAIConsumerTask {
 
     private final Logger logger = LoggerFactory.getLogger(this.getClass());
 
@@ -58,14 +56,6 @@ public class AAIConsumerTaskImpl extends
         }
     }
 
-    @Override
-    protected void receiveRequest(ConsumerDmaapModel body) throws PrhTaskException {
-        String response = execute(body);
-        if (taskProcess != null && response != null && !response.isEmpty()) {
-            taskProcess.receiveRequest(response);
-        }
-    }
-
     @Override
     public String execute(ConsumerDmaapModel consumerDmaapModel) throws AAINotFoundException {
         consumerDmaapModel = Optional.ofNullable(consumerDmaapModel)
@@ -75,7 +65,6 @@ public class AAIConsumerTaskImpl extends
         return consume(consumerDmaapModel).orElseThrow(() -> new AAINotFoundException("Null response code"));
     }
 
-    @Override
     protected AAIClientConfiguration resolveConfiguration() {
         return prhAppConfig.getAAIClientConfiguration();
     }
index c7bde03..abd0464 100644 (file)
 package org.onap.dcaegen2.services.prh.tasks;
 
 import org.onap.dcaegen2.services.prh.exceptions.AAINotFoundException;
+import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException;
 import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
 import org.onap.dcaegen2.services.prh.service.AAIProducerClient;
 
 /**
  * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18
  */
-public abstract class AAIProducerTask<R, S, C> extends Task<R, S, C> {
+public abstract class AAIProducerTask/*<R, S, C> extends Task<R, S, C> */ {
 
     abstract ConsumerDmaapModel publish(ConsumerDmaapModel message) throws AAINotFoundException;
 
     abstract AAIProducerClient resolveClient();
+
+    abstract protected ConsumerDmaapModel execute(ConsumerDmaapModel consumerDmaapModel) throws PrhTaskException;
 }
index b637bb2..005d08d 100644 (file)
  */
 package org.onap.dcaegen2.services.prh.tasks;
 
+import java.net.URISyntaxException;
+import java.util.Optional;
 import org.onap.dcaegen2.services.prh.config.AAIClientConfiguration;
 import org.onap.dcaegen2.services.prh.configuration.AppConfig;
 import org.onap.dcaegen2.services.prh.configuration.Config;
 import org.onap.dcaegen2.services.prh.exceptions.AAINotFoundException;
-import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException;
 import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
 import org.onap.dcaegen2.services.prh.service.AAIProducerClient;
 import org.onap.dcaegen2.services.prh.service.HttpUtils;
@@ -32,15 +33,12 @@ import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
-import java.net.URISyntaxException;
-import java.util.Optional;
-
 /**
  * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18
  */
 @Component
 public class AAIProducerTaskImpl extends
-    AAIProducerTask<ConsumerDmaapModel, ConsumerDmaapModel, AAIClientConfiguration> {
+    AAIProducerTask {
 
     private final Logger logger = LoggerFactory.getLogger(this.getClass());
 
@@ -65,14 +63,6 @@ public class AAIProducerTaskImpl extends
         }
     }
 
-    @Override
-    protected void receiveRequest(ConsumerDmaapModel body) throws PrhTaskException {
-        ConsumerDmaapModel response = execute(body);
-        if (taskProcess != null && response != null) {
-            taskProcess.receiveRequest(response);
-        }
-    }
-
     @Override
     public ConsumerDmaapModel execute(ConsumerDmaapModel consumerDmaapModel) throws AAINotFoundException {
         consumerDmaapModel = Optional.ofNullable(consumerDmaapModel)
index 3e36bcd..56b678a 100644 (file)
@@ -26,11 +26,13 @@ import org.onap.dcaegen2.services.prh.service.consumer.ExtendedDmaapConsumerHttp
 /**
  * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18
  */
-abstract class DmaapConsumerTask<R, S, C> extends Task<R, S, C> {
+abstract class DmaapConsumerTask /*<R, S, C> extends Task<R, S, C>*/ {
 
     abstract ConsumerDmaapModel consume(String message) throws PrhTaskException;
 
     abstract ExtendedDmaapConsumerHttpClientImpl resolveClient();
 
     abstract void initConfigs();
+
+    abstract protected ConsumerDmaapModel execute(String object) throws PrhTaskException;
 }
index 43eb9ea..e72939c 100644 (file)
@@ -23,7 +23,6 @@ import java.util.Optional;
 import org.onap.dcaegen2.services.prh.config.DmaapConsumerConfiguration;
 import org.onap.dcaegen2.services.prh.configuration.AppConfig;
 import org.onap.dcaegen2.services.prh.configuration.Config;
-import org.onap.dcaegen2.services.prh.exceptions.DmaapEmptyResponseException;
 import org.onap.dcaegen2.services.prh.exceptions.DmaapNotFoundException;
 import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException;
 import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
@@ -38,8 +37,7 @@ import org.springframework.stereotype.Component;
  * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18
  */
 @Component
-public class DmaapConsumerTaskImpl extends
-    DmaapConsumerTask<String, ConsumerDmaapModel, DmaapConsumerConfiguration> {
+public class DmaapConsumerTaskImpl extends DmaapConsumerTask {
 
     private final Logger logger = LoggerFactory.getLogger(this.getClass());
     private final Config prhAppConfig;
@@ -66,20 +64,6 @@ public class DmaapConsumerTaskImpl extends
 
     }
 
-    @Override
-    protected void receiveRequest(String body) throws PrhTaskException {
-        try {
-            ConsumerDmaapModel response = execute(body);
-            if (taskProcess != null && response != null) {
-                taskProcess.receiveRequest(response);
-            }
-        } catch (DmaapEmptyResponseException e) {
-            logger.warn("Nothing to consume from DmaaP {} topic.",
-                resolveConfiguration().dmaapTopicName());
-        }
-
-    }
-
     @Override
     public ConsumerDmaapModel execute(String object) throws PrhTaskException {
         extendedDmaapConsumerHttpClient = resolveClient();
@@ -93,7 +77,6 @@ public class DmaapConsumerTaskImpl extends
         prhAppConfig.initFileStreamReader();
     }
 
-    @Override
     protected DmaapConsumerConfiguration resolveConfiguration() {
         return prhAppConfig.getDmaapConsumerConfiguration();
     }
index ba8e6e4..bd9a874 100644 (file)
@@ -26,9 +26,11 @@ import org.onap.dcaegen2.services.prh.service.producer.ExtendedDmaapProducerHttp
 /**
  * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18
  */
-abstract class DmaapPublisherTask<R, S, C> extends Task<R, S, C> {
+abstract class DmaapPublisherTask {
 
     abstract Integer publish(ConsumerDmaapModel consumerDmaapModel) throws DmaapNotFoundException;
 
     abstract ExtendedDmaapProducerHttpClientImpl resolveClient();
+
+    abstract protected Integer execute(ConsumerDmaapModel consumerDmaapModel) throws DmaapNotFoundException;
 }
index 1a52292..7cbeb3b 100644 (file)
@@ -24,7 +24,6 @@ import org.onap.dcaegen2.services.prh.config.DmaapPublisherConfiguration;
 import org.onap.dcaegen2.services.prh.configuration.AppConfig;
 import org.onap.dcaegen2.services.prh.configuration.Config;
 import org.onap.dcaegen2.services.prh.exceptions.DmaapNotFoundException;
-import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException;
 import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
 import org.onap.dcaegen2.services.prh.service.producer.ExtendedDmaapProducerHttpClientImpl;
 import org.slf4j.Logger;
@@ -37,8 +36,7 @@ import org.springframework.stereotype.Component;
  * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18
  */
 @Component
-public class DmaapPublisherTaskImpl extends
-    DmaapPublisherTask<ConsumerDmaapModel, Integer, DmaapPublisherConfiguration> {
+public class DmaapPublisherTaskImpl extends DmaapPublisherTask {
 
     private final Logger logger = LoggerFactory.getLogger(this.getClass());
     private final Config prhAppConfig;
@@ -58,14 +56,6 @@ public class DmaapPublisherTaskImpl extends
             .orElseThrow(() -> new DmaapNotFoundException("Incorrect response from Dmaap"));
     }
 
-    @Override
-    protected void receiveRequest(ConsumerDmaapModel body) throws PrhTaskException {
-            Integer response = execute(body);
-            if (taskProcess != null && response != null) {
-                taskProcess.receiveRequest(response);
-            }
-    }
-
     @Override
     public Integer execute(ConsumerDmaapModel consumerDmaapModel) throws DmaapNotFoundException {
         consumerDmaapModel = Optional.ofNullable(consumerDmaapModel)
@@ -75,7 +65,6 @@ public class DmaapPublisherTaskImpl extends
         return publish(consumerDmaapModel);
     }
 
-    @Override
     protected DmaapPublisherConfiguration resolveConfiguration() {
         return prhAppConfig.getDmaapPublisherConfiguration();
     }
index f776710..addeaae 100644 (file)
  */
 package org.onap.dcaegen2.services.prh.tasks;
 
-import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
+import reactor.core.Disposable;
+import reactor.core.publisher.Mono;
 
 /**
  * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18
@@ -46,19 +47,24 @@ public class ScheduledTasks {
     }
 
     public void scheduleMainPrhEventTask() {
-        logger.trace("Execution of task was registered");
-        setTaskExecutionFlow();
-        try {
-            dmaapConsumerTask.initConfigs();
-            dmaapConsumerTask.receiveRequest("");
-        } catch (PrhTaskException e) {
-            logger
-                .warn("Chain of tasks have been aborted, because some errors occur in prh workflow ", e);
-        }
-    }
+        logger.trace("Execution of tasks was registered");
 
-    private void setTaskExecutionFlow() {
-        dmaapConsumerTask.setNext(aaiProducerTask);
-        aaiProducerTask.setNext(dmaapProducerTask);
+        Mono.fromSupplier(() -> Mono.fromCallable(() ->
+        {
+            dmaapConsumerTask.initConfigs();
+            return dmaapConsumerTask.execute("");
+        }).subscribe(consumerDmaapModel -> Mono
+                .fromCallable(() -> aaiProducerTask.execute(consumerDmaapModel))
+                .subscribe(
+                    aaiConsumerDmaapModel -> Mono.fromCallable(() -> dmaapProducerTask.execute(aaiConsumerDmaapModel))
+                        .subscribe(resp -> logger.info("Message was published to DmaaP, response code: {}", resp),
+                            error -> logger.warn("Error has been thrown in DmaapProduerTask: {}", error),
+                            () -> logger.info("Completed DmaapPublisher task"))),
+            errorResponse -> logger
+                .warn("Error has been thrown in AAIProducerTask: {}", errorResponse)
+            , () -> logger.info("Completed AAIProducer task")))
+            .subscribe(Disposable::dispose, tasksError -> logger
+                    .warn("Chain of tasks have been aborted, because some errors occur in PRH workflow ", tasksError)
+                , () -> logger.info("PRH tasks was consumed properly")).dispose();
     }
 }
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/Task.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/Task.java
deleted file mode 100644 (file)
index e2b11fd..0000000
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * PNF-REGISTRATION-HANDLER
- * ================================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcaegen2.services.prh.tasks;
-
-import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException;
-
-/**
- * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18
- */
-
-
-public abstract class Task<R, S, C> {
-
-    Task taskProcess;
-
-    abstract protected void receiveRequest(R body) throws PrhTaskException;
-
-    abstract protected S execute(R object) throws PrhTaskException;
-
-    abstract protected C resolveConfiguration();
-
-    void setNext(Task task) {
-        this.taskProcess = task;
-    }
-}
index 60e1bd5..5736afe 100644 (file)
@@ -41,7 +41,7 @@ public class DmaapConsumerTaskSpy {
 
     @Bean
     @Primary
-    public Task registerSimpleDmaapConsumerTask() {
+    public DmaapConsumerTask registerSimpleDmaapConsumerTask() {
         AppConfig appConfig = spy(AppConfig.class);
         doReturn(mock(DmaapConsumerConfiguration.class)).when(appConfig).getDmaapConsumerConfiguration();
         DmaapConsumerTaskImpl dmaapConsumerTask = spy(new DmaapConsumerTaskImpl(appConfig));
index b2b97cf..0105660 100644 (file)
@@ -39,7 +39,7 @@ public class DmaapProducerTaskSpy {
 
     @Bean
     @Primary
-    public Task registerSimpleDmaapPublisherTask() {
+    public DmaapPublisherTask registerSimpleDmaapPublisherTask() {
         AppConfig appConfig = spy(AppConfig.class);
         doReturn(mock(DmaapPublisherConfiguration.class)).when(appConfig).getDmaapPublisherConfiguration();
         DmaapPublisherTaskImpl dmaapPublisherTask = spy(new DmaapPublisherTaskImpl(appConfig));