PRH DMaaP objects batching 25/68825/7
authorwasala <przemyslaw.wasala@nokia.com>
Tue, 25 Sep 2018 10:24:48 +0000 (12:24 +0200)
committerwasala <przemyslaw.wasala@nokia.com>
Mon, 8 Oct 2018 05:50:46 +0000 (07:50 +0200)
*Getting collection of object
in one request
*Refator the workflow
in the old implementation

Change-Id: I4fdbf4bd8ae70cd78dbf5c3c441ba01c28e6ce4f
Issue-ID: DCAEGEN2-834
Signed-off-by: wasala <przemyslaw.wasala@nokia.com>
pom.xml
prh-aai-client/src/main/java/org/onap/dcaegen2/services/prh/service/producer/AaiProducerReactiveHttpClient.java
prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java
prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java
prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java
prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java
prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParserTest.java
prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImplTest.java

diff --git a/pom.xml b/pom.xml
index f563de5..1749eaf 100644 (file)
--- a/pom.xml
+++ b/pom.xml
@@ -32,7 +32,7 @@
 
   <groupId>org.onap.dcaegen2.services</groupId>
   <artifactId>prh</artifactId>
-  <version>1.0.0-SNAPSHOT</version>
+  <version>1.1.0-SNAPSHOT</version>
 
   <name>dcaegen2-services-prh</name>
   <description>PNF Registration Handler</description>
index 665d65a..43f6ce9 100644 (file)
@@ -27,7 +27,6 @@ import static org.onap.dcaegen2.services.prh.model.logging.MdcVariables.X_ONAP_R
 
 import java.net.URI;
 import java.util.UUID;
-
 import org.onap.dcaegen2.services.prh.config.AaiClientConfiguration;
 import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
 import org.slf4j.MDC;
@@ -37,8 +36,6 @@ import org.springframework.web.util.DefaultUriBuilderFactory;
 import reactor.core.publisher.Mono;
 
 
-
-
 public class AaiProducerReactiveHttpClient {
 
     private WebClient webClient;
@@ -90,4 +87,4 @@ public class AaiProducerReactiveHttpClient {
         return new DefaultUriBuilderFactory().builder().scheme(aaiProtocol).host(aaiHost).port(aaiHostPortNumber)
             .path(aaiBasePath + aaiPnfPath + "/" + pnfName).build();
     }
-}
+}
\ No newline at end of file
index 1d121b3..aed9974 100644 (file)
@@ -30,6 +30,7 @@ import org.onap.dcaegen2.services.prh.exceptions.DmaapNotFoundException;
 import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
 import org.onap.dcaegen2.services.prh.model.ImmutableConsumerDmaapModel;
 import org.springframework.util.StringUtils;
+import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 /**
@@ -51,9 +52,9 @@ public class DmaapConsumerJsonParser {
      * @param monoMessage - results from DMaaP
      * @return reactive DMaaPModel
      */
-    public Mono<ConsumerDmaapModel> getJsonObject(Mono<String> monoMessage) {
+    public Flux<ConsumerDmaapModel> getJsonObject(Mono<String> monoMessage) {
         return monoMessage
-            .flatMap(this::getJsonParserMessage)
+            .flatMapMany(this::getJsonParserMessage)
             .flatMap(this::createJsonConsumerModel);
     }
 
@@ -62,27 +63,30 @@ public class DmaapConsumerJsonParser {
             : Mono.fromCallable(() -> new JsonParser().parse(message));
     }
 
-    private Mono<ConsumerDmaapModel> createJsonConsumerModel(JsonElement jsonElement) {
+    private Flux<ConsumerDmaapModel> createJsonConsumerModel(JsonElement jsonElement) {
         return jsonElement.isJsonObject()
-            ? create(Mono.fromCallable(jsonElement::getAsJsonObject))
+            ? create(Flux.defer(() -> Flux.just(jsonElement.getAsJsonObject())))
             : getConsumerDmaapModelFromJsonArray(jsonElement);
     }
 
-    private Mono<ConsumerDmaapModel> getConsumerDmaapModelFromJsonArray(JsonElement jsonElement) {
+    private Flux<ConsumerDmaapModel> getConsumerDmaapModelFromJsonArray(JsonElement jsonElement) {
         return create(
-            Mono.fromCallable(() -> StreamSupport.stream(jsonElement.getAsJsonArray().spliterator(), false).findFirst()
-                .flatMap(this::getJsonObjectFromAnArray)
-                .orElseThrow(DmaapEmptyResponseException::new)));
+            Flux.defer(() -> Flux.fromStream(StreamSupport.stream(jsonElement.getAsJsonArray().spliterator(), false)
+                .map(jsonElementFromArray -> getJsonObjectFromAnArray(jsonElementFromArray)
+                    .orElseGet(JsonObject::new)))));
     }
 
     public Optional<JsonObject> getJsonObjectFromAnArray(JsonElement element) {
-        return Optional.of(new JsonParser().parse(element.getAsString()).getAsJsonObject());
+        JsonParser jsonParser = new JsonParser();
+        return element.isJsonPrimitive() ? Optional.of(jsonParser.parse(element.getAsString()).getAsJsonObject())
+            : Optional.of(jsonParser.parse(element.toString()).getAsJsonObject());
     }
 
-    private Mono<ConsumerDmaapModel> create(Mono<JsonObject> jsonObject) {
+    private Flux<ConsumerDmaapModel> create(Flux<JsonObject> jsonObject) {
         return jsonObject.flatMap(monoJsonP ->
-            !containsHeader(monoJsonP) ? Mono.error(new DmaapNotFoundException("Incorrect JsonObject - missing header"))
-                : transform(monoJsonP));
+            !containsHeader(monoJsonP) ? Flux.error(new DmaapNotFoundException("Incorrect JsonObject - missing header"))
+                : transform(monoJsonP))
+            .onErrorResume(exception -> exception instanceof DmaapNotFoundException, e -> Mono.empty());
     }
 
     private Mono<ConsumerDmaapModel> transform(JsonObject responseFromDmaap) {
index a6baf4a..4cde225 100644 (file)
@@ -25,6 +25,7 @@ import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
 import org.onap.dcaegen2.services.prh.service.DMaaPReactiveWebClient;
 import org.onap.dcaegen2.services.prh.service.consumer.DMaaPConsumerReactiveHttpClient;
 import org.springframework.web.reactive.function.client.WebClient;
+import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 /**
@@ -32,7 +33,7 @@ import reactor.core.publisher.Mono;
  */
 abstract class DmaapConsumerTask {
 
-    abstract Mono<ConsumerDmaapModel> consume(Mono<String> message);
+    abstract Flux<ConsumerDmaapModel> consume(Mono<String> message);
 
     abstract DMaaPConsumerReactiveHttpClient resolveClient();
 
@@ -40,7 +41,7 @@ abstract class DmaapConsumerTask {
 
     protected abstract DmaapConsumerConfiguration resolveConfiguration();
 
-    protected abstract Mono<ConsumerDmaapModel> execute(String object);
+    protected abstract Flux<ConsumerDmaapModel> execute(String object);
 
     WebClient buildWebClient() {
         return new DMaaPReactiveWebClient().build();
index 341a229..3a5f213 100644 (file)
@@ -30,6 +30,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
+import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 /**
@@ -54,12 +55,12 @@ public class DmaapConsumerTaskImpl extends DmaapConsumerTask {
     }
 
     @Override
-    Mono<ConsumerDmaapModel> consume(Mono<String> message) {
+    Flux<ConsumerDmaapModel> consume(Mono<String> message) {
         return dmaapConsumerJsonParser.getJsonObject(message);
     }
 
     @Override
-    public Mono<ConsumerDmaapModel> execute(String object) {
+    public Flux<ConsumerDmaapModel> execute(String object) {
         DMaaPConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient = resolveClient();
         LOGGER.debug("Method called with arg {}", object);
         return consume(dmaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse());
index de7837e..0876742 100644 (file)
@@ -26,6 +26,7 @@ import static org.onap.dcaegen2.services.prh.model.logging.MdcVariables.RESPONSE
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
+import java.util.function.Predicate;
 import javax.net.ssl.SSLException;
 import org.onap.dcaegen2.services.prh.exceptions.DmaapEmptyResponseException;
 import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException;
@@ -39,6 +40,7 @@ import org.slf4j.MarkerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.http.ResponseEntity;
 import org.springframework.stereotype.Component;
+import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 /**
@@ -83,7 +85,13 @@ public class ScheduledTasks {
                     logger.warn("Nothing to consume from DMaaP")
                 )
                 .flatMap(this::publishToAaiConfiguration)
+                .doOnError(exception ->
+                    logger.warn("AAIProducerTask exception has been registered: ", exception))
+                .onErrorResume(resumePrhPredicate(), exception -> Mono.empty())
                 .flatMap(this::publishToDmaapConfiguration)
+                .doOnError(exception ->
+                    logger.warn("DMaaPProducerTask exception has been registered: ", exception))
+                .onErrorResume(resumePrhPredicate(), exception -> Mono.empty())
                 .doOnTerminate(mainCountDownLatch::countDown)
                 .subscribe(this::onSuccess, this::onError, this::onComplete);
 
@@ -113,8 +121,8 @@ public class ScheduledTasks {
     }
 
 
-    private Mono<ConsumerDmaapModel> consumeFromDMaaPMessage() {
-        return Mono.defer(() -> {
+    private Flux<ConsumerDmaapModel> consumeFromDMaaPMessage() {
+        return Flux.defer(() -> {
             MdcVariables.setMdcContextMap(mdcContextMap);
             MDC.put(INSTANCE_UUID, UUID.randomUUID().toString());
             logger.info(INVOKE, "Init configs");
@@ -138,4 +146,8 @@ public class ScheduledTasks {
             return Mono.error(e);
         }
     }
+
+    private Predicate<Throwable> resumePrhPredicate() {
+        return exception -> exception instanceof PrhTaskException;
+    }
 }
index 225d46e..01ce741 100644 (file)
@@ -28,7 +28,6 @@ import java.util.Optional;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 import org.mockito.Mockito;
-import org.onap.dcaegen2.services.prh.exceptions.DmaapNotFoundException;
 import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
 import org.onap.dcaegen2.services.prh.model.ImmutableConsumerDmaapModel;
 import reactor.core.publisher.Mono;
@@ -43,36 +42,36 @@ class DmaapConsumerJsonParserTest {
     void whenPassingCorrectJson_validationNotThrowingAnException() {
         //given
         String message = "[{\"event\": {"
-                + "\"commonEventHeader\": { \"sourceName\":\"NOKQTFCOC540002E\"},"
-                + "\"pnfRegistrationFields\": {"
-                + " \"unitType\": \"AirScale\","
-                + " \"serialNumber\": \"QTFCOC540002E\","
-                + " \"pnfRegistrationFieldsVersion\": \"2.0\","
-                + " \"manufactureDate\": \"1535014037024\","
-                + " \"modelNumber\": \"7BEA\",\n"
-                + " \"lastServiceDate\": \"1535014037024\","
-                + " \"unitFamily\": \"BBU\","
-                + " \"vendorName\": \"Nokia\","
-                + " \"oamV4IpAddress\": \"10.16.123.234\","
-                + " \"softwareVersion\": \"v4.5.0.1\","
-                + " \"oamV6IpAddress\": \"0:0:0:0:0:FFFF:0A10:7BEA\""
-                + "}}}]";
+            + "\"commonEventHeader\": { \"sourceName\":\"NOKQTFCOC540002E\"},"
+            + "\"pnfRegistrationFields\": {"
+            + " \"unitType\": \"AirScale\","
+            + " \"serialNumber\": \"QTFCOC540002E\","
+            + " \"pnfRegistrationFieldsVersion\": \"2.0\","
+            + " \"manufactureDate\": \"1535014037024\","
+            + " \"modelNumber\": \"7BEA\",\n"
+            + " \"lastServiceDate\": \"1535014037024\","
+            + " \"unitFamily\": \"BBU\","
+            + " \"vendorName\": \"Nokia\","
+            + " \"oamV4IpAddress\": \"10.16.123.234\","
+            + " \"softwareVersion\": \"v4.5.0.1\","
+            + " \"oamV6IpAddress\": \"0:0:0:0:0:FFFF:0A10:7BEA\""
+            + "}}}]";
 
         String parsed = "{\"event\": {"
-                + "\"commonEventHeader\": { \"sourceName\":\"NOKQTFCOC540002E\"},"
-                + "\"pnfRegistrationFields\": {"
-                + " \"unitType\": \"AirScale\","
-                + " \"serialNumber\": \"QTFCOC540002E\","
-                + " \"pnfRegistrationFieldsVersion\": \"2.0\","
-                + " \"manufactureDate\": \"1535014037024\","
-                + " \"modelNumber\": \"7BEA\",\n"
-                + " \"lastServiceDate\": \"1535014037024\","
-                + " \"unitFamily\": \"BBU\","
-                + " \"vendorName\": \"Nokia\","
-                + " \"oamV4IpAddress\": \"10.16.123.234\","
-                + " \"softwareVersion\": \"v4.5.0.1\","
-                + " \"oamV6IpAddress\": \"0:0:0:0:0:FFFF:0A10:7BEA\""
-                + "}}}";
+            + "\"commonEventHeader\": { \"sourceName\":\"NOKQTFCOC540002E\"},"
+            + "\"pnfRegistrationFields\": {"
+            + " \"unitType\": \"AirScale\","
+            + " \"serialNumber\": \"QTFCOC540002E\","
+            + " \"pnfRegistrationFieldsVersion\": \"2.0\","
+            + " \"manufactureDate\": \"1535014037024\","
+            + " \"modelNumber\": \"7BEA\",\n"
+            + " \"lastServiceDate\": \"1535014037024\","
+            + " \"unitFamily\": \"BBU\","
+            + " \"vendorName\": \"Nokia\","
+            + " \"oamV4IpAddress\": \"10.16.123.234\","
+            + " \"softwareVersion\": \"v4.5.0.1\","
+            + " \"oamV6IpAddress\": \"0:0:0:0:0:FFFF:0A10:7BEA\""
+            + "}}}";
 
         ConsumerDmaapModel expectedObject = ImmutableConsumerDmaapModel.builder().ipv4("10.16.123.234")
             .ipv6("0:0:0:0:0:FFFF:0A10:7BEA")
@@ -83,7 +82,7 @@ class DmaapConsumerJsonParserTest {
         Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject()))
             .when(dmaapConsumerJsonParser).getJsonObjectFromAnArray(jsonElement);
         ConsumerDmaapModel consumerDmaapModel = dmaapConsumerJsonParser
-            .getJsonObject(Mono.just((message))).block();
+            .getJsonObject(Mono.just((message))).blockFirst();
         //then
         Assertions.assertNotNull(consumerDmaapModel);
         Assertions.assertEquals(expectedObject, consumerDmaapModel);
@@ -93,34 +92,34 @@ class DmaapConsumerJsonParserTest {
     void whenPassingCorrectJsonWithoutIpv4_validationNotThrowingAnException() {
         //given
         String message = "[{\"event\": {"
-                + "\"commonEventHeader\": { \"sourceName\":\"NOKQTFCOC540002E\"},"
-                + "\"pnfRegistrationFields\": {"
-                + " \"unitType\": \"AirScale\","
-                + " \"serialNumber\": \"QTFCOC540002E\","
-                + " \"pnfRegistrationFieldsVersion\": \"2.0\","
-                + " \"manufactureDate\": \"1535014037024\","
-                + " \"modelNumber\": \"7BEA\",\n"
-                + " \"lastServiceDate\": \"1535014037024\","
-                + " \"unitFamily\": \"BBU\","
-                + " \"vendorName\": \"Nokia\","
-                + " \"softwareVersion\": \"v4.5.0.1\","
-                + " \"oamV6IpAddress\": \"0:0:0:0:0:FFFF:0A10:7BEA\""
-                + "}}}]";
+            + "\"commonEventHeader\": { \"sourceName\":\"NOKQTFCOC540002E\"},"
+            + "\"pnfRegistrationFields\": {"
+            + " \"unitType\": \"AirScale\","
+            + " \"serialNumber\": \"QTFCOC540002E\","
+            + " \"pnfRegistrationFieldsVersion\": \"2.0\","
+            + " \"manufactureDate\": \"1535014037024\","
+            + " \"modelNumber\": \"7BEA\",\n"
+            + " \"lastServiceDate\": \"1535014037024\","
+            + " \"unitFamily\": \"BBU\","
+            + " \"vendorName\": \"Nokia\","
+            + " \"softwareVersion\": \"v4.5.0.1\","
+            + " \"oamV6IpAddress\": \"0:0:0:0:0:FFFF:0A10:7BEA\""
+            + "}}}]";
 
         String parsed = "{\"event\": {"
-                + "\"commonEventHeader\": { \"sourceName\":\"NOKQTFCOC540002E\"},"
-                + "\"pnfRegistrationFields\": {"
-                + " \"unitType\": \"AirScale\","
-                + " \"serialNumber\": \"QTFCOC540002E\","
-                + " \"pnfRegistrationFieldsVersion\": \"2.0\","
-                + " \"manufactureDate\": \"1535014037024\","
-                + " \"modelNumber\": \"7BEA\",\n"
-                + " \"lastServiceDate\": \"1535014037024\","
-                + " \"unitFamily\": \"BBU\","
-                + " \"vendorName\": \"Nokia\","
-                + " \"softwareVersion\": \"v4.5.0.1\","
-                + " \"oamV6IpAddress\": \"0:0:0:0:0:FFFF:0A10:7BEA\""
-                + "}}}";
+            + "\"commonEventHeader\": { \"sourceName\":\"NOKQTFCOC540002E\"},"
+            + "\"pnfRegistrationFields\": {"
+            + " \"unitType\": \"AirScale\","
+            + " \"serialNumber\": \"QTFCOC540002E\","
+            + " \"pnfRegistrationFieldsVersion\": \"2.0\","
+            + " \"manufactureDate\": \"1535014037024\","
+            + " \"modelNumber\": \"7BEA\",\n"
+            + " \"lastServiceDate\": \"1535014037024\","
+            + " \"unitFamily\": \"BBU\","
+            + " \"vendorName\": \"Nokia\","
+            + " \"softwareVersion\": \"v4.5.0.1\","
+            + " \"oamV6IpAddress\": \"0:0:0:0:0:FFFF:0A10:7BEA\""
+            + "}}}";
 
         //when
         DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser());
@@ -129,7 +128,7 @@ class DmaapConsumerJsonParserTest {
             .when(dmaapConsumerJsonParser).getJsonObjectFromAnArray(jsonElement);
         dmaapConsumerJsonParser.getJsonObject(Mono.just((message)));
         ConsumerDmaapModel consumerDmaapModel = dmaapConsumerJsonParser.getJsonObject(Mono.just((message)))
-            .block();
+            .blockFirst();
         //then
         ConsumerDmaapModel expectedObject = ImmutableConsumerDmaapModel.builder().ipv4("")
             .ipv6("0:0:0:0:0:FFFF:0A10:7BEA")
@@ -142,34 +141,34 @@ class DmaapConsumerJsonParserTest {
     void whenPassingCorrectJsonWithoutIpv6_validationNotThrowingAnException() {
         //given
         String message = "[{\"event\": {"
-                + "\"commonEventHeader\": { \"sourceName\":\"NOKQTFCOC540002E\"},"
-                + "\"pnfRegistrationFields\": {"
-                + " \"unitType\": \"AirScale\","
-                + " \"serialNumber\": \"QTFCOC540002E\","
-                + " \"pnfRegistrationFieldsVersion\": \"2.0\","
-                + " \"manufactureDate\": \"1535014037024\","
-                + " \"modelNumber\": \"7BEA\",\n"
-                + " \"lastServiceDate\": \"1535014037024\","
-                + " \"unitFamily\": \"BBU\","
-                + " \"vendorName\": \"Nokia\","
-                + " \"oamV4IpAddress\": \"10.16.123.234\","
-                + " \"softwareVersion\": \"v4.5.0.1\""
-                + "}}}]";
+            + "\"commonEventHeader\": { \"sourceName\":\"NOKQTFCOC540002E\"},"
+            + "\"pnfRegistrationFields\": {"
+            + " \"unitType\": \"AirScale\","
+            + " \"serialNumber\": \"QTFCOC540002E\","
+            + " \"pnfRegistrationFieldsVersion\": \"2.0\","
+            + " \"manufactureDate\": \"1535014037024\","
+            + " \"modelNumber\": \"7BEA\",\n"
+            + " \"lastServiceDate\": \"1535014037024\","
+            + " \"unitFamily\": \"BBU\","
+            + " \"vendorName\": \"Nokia\","
+            + " \"oamV4IpAddress\": \"10.16.123.234\","
+            + " \"softwareVersion\": \"v4.5.0.1\""
+            + "}}}]";
 
         String parsed = "{\"event\": {"
-                + "\"commonEventHeader\": { \"sourceName\":\"NOKQTFCOC540002E\"},"
-                + "\"pnfRegistrationFields\": {"
-                + " \"unitType\": \"AirScale\","
-                + " \"serialNumber\": \"QTFCOC540002E\","
-                + " \"pnfRegistrationFieldsVersion\": \"2.0\","
-                + " \"manufactureDate\": \"1535014037024\","
-                + " \"modelNumber\": \"7BEA\",\n"
-                + " \"lastServiceDate\": \"1535014037024\","
-                + " \"unitFamily\": \"BBU\","
-                + " \"vendorName\": \"Nokia\","
-                + " \"oamV4IpAddress\": \"10.16.123.234\","
-                + " \"softwareVersion\": \"v4.5.0.1\""
-                + "}}}";
+            + "\"commonEventHeader\": { \"sourceName\":\"NOKQTFCOC540002E\"},"
+            + "\"pnfRegistrationFields\": {"
+            + " \"unitType\": \"AirScale\","
+            + " \"serialNumber\": \"QTFCOC540002E\","
+            + " \"pnfRegistrationFieldsVersion\": \"2.0\","
+            + " \"manufactureDate\": \"1535014037024\","
+            + " \"modelNumber\": \"7BEA\",\n"
+            + " \"lastServiceDate\": \"1535014037024\","
+            + " \"unitFamily\": \"BBU\","
+            + " \"vendorName\": \"Nokia\","
+            + " \"oamV4IpAddress\": \"10.16.123.234\","
+            + " \"softwareVersion\": \"v4.5.0.1\""
+            + "}}}";
 
         ConsumerDmaapModel expectedObject = ImmutableConsumerDmaapModel.builder().ipv4("10.16.123.234").ipv6("")
             .correlationId("NOKQTFCOC540002E").build();
@@ -179,55 +178,112 @@ class DmaapConsumerJsonParserTest {
         Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject()))
             .when(dmaapConsumerJsonParser).getJsonObjectFromAnArray(jsonElement);
         ConsumerDmaapModel consumerDmaapModel = dmaapConsumerJsonParser.getJsonObject(Mono.just((message)))
-            .block();
+            .blockFirst();
         //then
         Assertions.assertNotNull(consumerDmaapModel);
         Assertions.assertEquals(expectedObject, consumerDmaapModel);
     }
 
     @Test
-    void whenPassingCorrectJsonWithoutIpv4andIpv6_validationThrowingAnException() {
+    void whenPassingCorrectJsonWithoutIpv4andIpv6_validationAddingAnException() {
         //given
         String message = "[{\"event\": {"
-                + "\"commonEventHeader\": { \"sourceName\":\"NOKQTFCOC540002E\"},"
-                + "\"pnfRegistrationFields\": {"
-                + " \"unitType\": \"AirScale\","
-                + " \"serialNumber\": \"QTFCOC540002E\","
-                + " \"pnfRegistrationFieldsVersion\": \"2.0\","
-                + " \"manufactureDate\": \"1535014037024\","
-                + " \"modelNumber\": \"7BEA\",\n"
-                + " \"lastServiceDate\": \"1535014037024\","
-                + " \"unitFamily\": \"BBU\","
-                + " \"vendorName\": \"Nokia\","
-                + " \"softwareVersion\": \"v4.5.0.1\""
-                + "}}}]";
+            + "\"commonEventHeader\": { \"sourceName\":\"NOKQTFCOC540002E\"},"
+            + "\"pnfRegistrationFields\": {"
+            + " \"unitType\": \"AirScale\","
+            + " \"serialNumber\": \"QTFCOC540002E\","
+            + " \"pnfRegistrationFieldsVersion\": \"2.0\","
+            + " \"manufactureDate\": \"1535014037024\","
+            + " \"modelNumber\": \"7BEA\",\n"
+            + " \"lastServiceDate\": \"1535014037024\","
+            + " \"unitFamily\": \"BBU\","
+            + " \"vendorName\": \"Nokia\","
+            + " \"softwareVersion\": \"v4.5.0.1\""
+            + "}}}]";
 
         String parsed = "{\"event\": {"
-                + "\"commonEventHeader\": { \"sourceName\":\"NOKQTFCOC540002E\"},"
-                + "\"pnfRegistrationFields\": {"
-                + " \"unitType\": \"AirScale\","
-                + " \"serialNumber\": \"QTFCOC540002E\","
-                + " \"pnfRegistrationFieldsVersion\": \"2.0\","
-                + " \"manufactureDate\": \"1535014037024\","
-                + " \"modelNumber\": \"7BEA\",\n"
-                + " \"lastServiceDate\": \"1535014037024\","
-                + " \"unitFamily\": \"BBU\","
-                + " \"vendorName\": \"Nokia\","
-                + " \"softwareVersion\": \"v4.5.0.1\""
-                + "}}}";
+            + "\"commonEventHeader\": { \"sourceName\":\"NOKQTFCOC540002E\"},"
+            + "\"pnfRegistrationFields\": {"
+            + " \"unitType\": \"AirScale\","
+            + " \"serialNumber\": \"QTFCOC540002E\","
+            + " \"pnfRegistrationFieldsVersion\": \"2.0\","
+            + " \"manufactureDate\": \"1535014037024\","
+            + " \"modelNumber\": \"7BEA\",\n"
+            + " \"lastServiceDate\": \"1535014037024\","
+            + " \"unitFamily\": \"BBU\","
+            + " \"vendorName\": \"Nokia\","
+            + " \"softwareVersion\": \"v4.5.0.1\""
+            + "}}}";
 
         DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser());
         JsonElement jsonElement = new JsonParser().parse(parsed);
         Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject()))
             .when(dmaapConsumerJsonParser).getJsonObjectFromAnArray(jsonElement);
         StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(message)))
-            .expectSubscription().expectError(DmaapNotFoundException.class).verify();
+            .expectSubscription().thenRequest(1).verifyComplete();
 
     }
 
     @Test
-    void whenPassingJsonWithoutMandatoryHeaderInformation_validationThrowingAnException() {
+    void whenPassingJsonWithoutMandatoryHeaderInformation_validationAddingAnException() {
         String parsed = "{\"event\": {"
+            + "\"commonEventHeader\": {},"
+            + "\"pnfRegistrationFields\": {"
+            + " \"unitType\": \"AirScale\","
+            + " \"serialNumber\": \"QTFCOC540002E\","
+            + " \"pnfRegistrationFieldsVersion\": \"2.0\","
+            + " \"manufactureDate\": \"1535014037024\","
+            + " \"modelNumber\": \"7BEA\",\n"
+            + " \"lastServiceDate\": \"1535014037024\","
+            + " \"unitFamily\": \"BBU\","
+            + " \"vendorName\": \"Nokia\","
+            + " \"softwareVersion\": \"v4.5.0.1\""
+            + "}}}";
+
+        DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser());
+        JsonElement jsonElement = new JsonParser().parse(parsed);
+        Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject()))
+            .when(dmaapConsumerJsonParser).getJsonObjectFromAnArray(jsonElement);
+        String incorrectMessage = "[{\"event\": {"
+            + "\"commonEventHeader\": {},"
+            + "\"pnfRegistrationFields\": {"
+            + " \"unitType\": \"AirScale\","
+            + " \"serialNumber\": \"QTFCOC540002E\","
+            + " \"pnfRegistrationFieldsVersion\": \"2.0\","
+            + " \"manufactureDate\": \"1535014037024\","
+            + " \"modelNumber\": \"7BEA\",\n"
+            + " \"lastServiceDate\": \"1535014037024\","
+            + " \"unitFamily\": \"BBU\","
+            + " \"vendorName\": \"Nokia\","
+            + " \"softwareVersion\": \"v4.5.0.1\""
+            + "}}}]";
+        StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(incorrectMessage)))
+            .expectSubscription().thenRequest(1).verifyComplete();
+    }
+
+    @Test
+    void whenPassingJsonWithoutSourceName_validationAddingAnException() {
+        String parsed = "{\"event\": {"
+            + "\"commonEventHeader\": {},"
+            + "\"pnfRegistrationFields\": {"
+            + " \"unitType\": \"AirScale\","
+            + " \"serialNumber\": \"QTFCOC540002E\","
+            + " \"pnfRegistrationFieldsVersion\": \"2.0\","
+            + " \"manufactureDate\": \"1535014037024\","
+            + " \"modelNumber\": \"7BEA\",\n"
+            + " \"lastServiceDate\": \"1535014037024\","
+            + " \"unitFamily\": \"BBU\","
+            + " \"vendorName\": \"Nokia\","
+            + " \"softwareVersion\": \"v4.5.0.1\","
+            + " \"oamV6IpAddress\": \"0:0:0:0:0:FFFF:0A10:7BEA\""
+            + "}}}";
+
+        DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser());
+        JsonElement jsonElement = new JsonParser().parse(parsed);
+        Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject()))
+            .when(dmaapConsumerJsonParser).getJsonObjectFromAnArray(jsonElement);
+        String jsonWithoutSourceName =
+            "[{\"event\": {"
                 + "\"commonEventHeader\": {},"
                 + "\"pnfRegistrationFields\": {"
                 + " \"unitType\": \"AirScale\","
@@ -238,15 +294,41 @@ class DmaapConsumerJsonParserTest {
                 + " \"lastServiceDate\": \"1535014037024\","
                 + " \"unitFamily\": \"BBU\","
                 + " \"vendorName\": \"Nokia\","
-                + " \"softwareVersion\": \"v4.5.0.1\""
+                + " \"softwareVersion\": \"v4.5.0.1\","
+                + " \"oamV6IpAddress\": \"0:0:0:0:0:FFFF:0A10:7BEA\""
+                + "}}}]";
+        StepVerifier
+            .create(dmaapConsumerJsonParser.getJsonObject(Mono.just(jsonWithoutSourceName)))
+            .expectSubscription().thenRequest(1)
+            .verifyComplete();
+    }
+
+    @Test
+    void whenPassingJsonWithoutIpInformation_validationAddingAnException() {
+        String parsed =
+            "{\"event\": {"
+                + "\"commonEventHeader\": {\"sourceName\": \"NOKQTFCOC540002E\"},"
+                + "\"pnfRegistrationFields\": {"
+                + " \"unitType\": \"AirScale\","
+                + " \"serialNumber\": \"QTFCOC540002E\","
+                + " \"pnfRegistrationFieldsVersion\": \"2.0\","
+                + " \"manufactureDate\": \"1535014037024\","
+                + " \"modelNumber\": \"7BEA\",\n"
+                + " \"lastServiceDate\": \"1535014037024\","
+                + " \"unitFamily\": \"BBU\","
+                + " \"vendorName\": \"Nokia\","
+                + " \"softwareVersion\": \"v4.5.0.1\","
+                + " \"oamV4IpAddress\": \"\","
+                + " \"oamV6IpAddress\": \"\""
                 + "}}}";
 
         DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser());
         JsonElement jsonElement = new JsonParser().parse(parsed);
         Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject()))
             .when(dmaapConsumerJsonParser).getJsonObjectFromAnArray(jsonElement);
-        String incorrectMessage = "[{\"event\": {"
-                + "\"commonEventHeader\": {},"
+        String jsonWithoutIpInformation =
+            "[{\"event\": {"
+                + "\"commonEventHeader\": {\"sourceName\": \"NOKQTFCOC540002E\"},"
                 + "\"pnfRegistrationFields\": {"
                 + " \"unitType\": \"AirScale\","
                 + " \"serialNumber\": \"QTFCOC540002E\","
@@ -256,16 +338,19 @@ class DmaapConsumerJsonParserTest {
                 + " \"lastServiceDate\": \"1535014037024\","
                 + " \"unitFamily\": \"BBU\","
                 + " \"vendorName\": \"Nokia\","
-                + " \"softwareVersion\": \"v4.5.0.1\""
+                + " \"softwareVersion\": \"v4.5.0.1\","
+                + " \"oamV4IpAddress\": \"\","
+                + " \"oamV6IpAddress\": \"\""
                 + "}}}]";
-        StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(incorrectMessage)))
-            .expectSubscription().expectError(DmaapNotFoundException.class).verify();
+        StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(jsonWithoutIpInformation)))
+            .expectSubscription().thenRequest(1).verifyComplete();
     }
 
     @Test
-    void whenPassingJsonWithoutSourceName_validationThrowingAnException() {
-        String parsed = "{\"event\": {"
-                + "\"commonEventHeader\": {},"
+    void whenPassingJsonWithoutSourceNameValue_validationAddingAnException() {
+        String parsed =
+            "{\"event\": {"
+                + "\"commonEventHeader\": {\"sourceName\": \"\"},"
                 + "\"pnfRegistrationFields\": {"
                 + " \"unitType\": \"AirScale\","
                 + " \"serialNumber\": \"QTFCOC540002E\","
@@ -276,116 +361,77 @@ class DmaapConsumerJsonParserTest {
                 + " \"unitFamily\": \"BBU\","
                 + " \"vendorName\": \"Nokia\","
                 + " \"softwareVersion\": \"v4.5.0.1\","
+                + " \"oamV4IpAddress\": \"10.16.123.234\","
                 + " \"oamV6IpAddress\": \"0:0:0:0:0:FFFF:0A10:7BEA\""
                 + "}}}";
 
-        DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser());
-        JsonElement jsonElement = new JsonParser().parse(parsed);
-        Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject()))
-            .when(dmaapConsumerJsonParser).getJsonObjectFromAnArray(jsonElement);
-        String jsonWithoutSourceName =
-                "[{\"event\": {"
-                        + "\"commonEventHeader\": {},"
-                        + "\"pnfRegistrationFields\": {"
-                        + " \"unitType\": \"AirScale\","
-                        + " \"serialNumber\": \"QTFCOC540002E\","
-                        + " \"pnfRegistrationFieldsVersion\": \"2.0\","
-                        + " \"manufactureDate\": \"1535014037024\","
-                        + " \"modelNumber\": \"7BEA\",\n"
-                        + " \"lastServiceDate\": \"1535014037024\","
-                        + " \"unitFamily\": \"BBU\","
-                        + " \"vendorName\": \"Nokia\","
-                        + " \"softwareVersion\": \"v4.5.0.1\","
-                        + " \"oamV6IpAddress\": \"0:0:0:0:0:FFFF:0A10:7BEA\""
-                        + "}}}]";
-        StepVerifier
-            .create(dmaapConsumerJsonParser.getJsonObject(Mono.just(jsonWithoutSourceName)))
-            .expectSubscription().expectError(DmaapNotFoundException.class).verify();
-    }
-
-    @Test
-    void whenPassingJsonWithoutIpInformation_validationThrowingAnException() {
-        String parsed =
-                "{\"event\": {"
-                        + "\"commonEventHeader\": {\"sourceName\": \"NOKQTFCOC540002E\"},"
-                        + "\"pnfRegistrationFields\": {"
-                        + " \"unitType\": \"AirScale\","
-                        + " \"serialNumber\": \"QTFCOC540002E\","
-                        + " \"pnfRegistrationFieldsVersion\": \"2.0\","
-                        + " \"manufactureDate\": \"1535014037024\","
-                        + " \"modelNumber\": \"7BEA\",\n"
-                        + " \"lastServiceDate\": \"1535014037024\","
-                        + " \"unitFamily\": \"BBU\","
-                        + " \"vendorName\": \"Nokia\","
-                        + " \"softwareVersion\": \"v4.5.0.1\","
-                        + " \"oamV4IpAddress\": \"\","
-                        + " \"oamV6IpAddress\": \"\""
-                        + "}}}";
-
         DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser());
         JsonElement jsonElement = new JsonParser().parse(parsed);
         Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject()))
             .when(dmaapConsumerJsonParser).getJsonObjectFromAnArray(jsonElement);
         String jsonWithoutIpInformation =
-                "[{\"event\": {"
-                        + "\"commonEventHeader\": {\"sourceName\": \"NOKQTFCOC540002E\"},"
-                        + "\"pnfRegistrationFields\": {"
-                        + " \"unitType\": \"AirScale\","
-                        + " \"serialNumber\": \"QTFCOC540002E\","
-                        + " \"pnfRegistrationFieldsVersion\": \"2.0\","
-                        + " \"manufactureDate\": \"1535014037024\","
-                        + " \"modelNumber\": \"7BEA\",\n"
-                        + " \"lastServiceDate\": \"1535014037024\","
-                        + " \"unitFamily\": \"BBU\","
-                        + " \"vendorName\": \"Nokia\","
-                        + " \"softwareVersion\": \"v4.5.0.1\","
-                        + " \"oamV4IpAddress\": \"\","
-                        + " \"oamV6IpAddress\": \"\""
-                        + "}}}]";
+            "[{\"event\": {"
+                + "\"commonEventHeader\": {\"sourceName\": \"\"},"
+                + "\"pnfRegistrationFields\": {"
+                + " \"unitType\": \"AirScale\","
+                + " \"serialNumber\": \"QTFCOC540002E\","
+                + " \"pnfRegistrationFieldsVersion\": \"2.0\","
+                + " \"manufactureDate\": \"1535014037024\","
+                + " \"modelNumber\": \"7BEA\",\n"
+                + " \"lastServiceDate\": \"1535014037024\","
+                + " \"unitFamily\": \"BBU\","
+                + " \"vendorName\": \"Nokia\","
+                + " \"softwareVersion\": \"v4.5.0.1\","
+                + " \"oamV4IpAddress\": \"10.16.123.234\","
+                + " \"oamV6IpAddress\": \"0:0:0:0:0:FFFF:0A10:7BEA\""
+                + "}}}]";
         StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(jsonWithoutIpInformation)))
-            .expectSubscription().expectError(DmaapNotFoundException.class).verify();
+            .expectSubscription().thenRequest(1).verifyComplete();
     }
 
     @Test
-    void whenPassingJsonWithoutSourceNameValue_validationThrowingAnException() {
-        String parsed =
-                "{\"event\": {"
-                        + "\"commonEventHeader\": {\"sourceName\": \"\"},"
-                        + "\"pnfRegistrationFields\": {"
-                        + " \"unitType\": \"AirScale\","
-                        + " \"serialNumber\": \"QTFCOC540002E\","
-                        + " \"pnfRegistrationFieldsVersion\": \"2.0\","
-                        + " \"manufactureDate\": \"1535014037024\","
-                        + " \"modelNumber\": \"7BEA\",\n"
-                        + " \"lastServiceDate\": \"1535014037024\","
-                        + " \"unitFamily\": \"BBU\","
-                        + " \"vendorName\": \"Nokia\","
-                        + " \"softwareVersion\": \"v4.5.0.1\","
-                        + " \"oamV4IpAddress\": \"10.16.123.234\","
-                        + " \"oamV6IpAddress\": \"0:0:0:0:0:FFFF:0A10:7BEA\""
-                        + "}}}";
+    void whenPassingCorrectJsoArraynWithoutIpv4_validationNotThrowingAnException() {
+        //given
+        String message = "[{\"event\": {"
+            + "\"commonEventHeader\": { \"sourceName\":\"NOKQTFCOC540002E\"},"
+            + "\"pnfRegistrationFields\": {"
+            + " \"unitType\": \"AirScale\","
+            + " \"serialNumber\": \"QTFCOC540002E\","
+            + " \"pnfRegistrationFieldsVersion\": \"2.0\","
+            + " \"manufactureDate\": \"1535014037024\","
+            + " \"modelNumber\": \"7BEA\",\n"
+            + " \"lastServiceDate\": \"1535014037024\","
+            + " \"unitFamily\": \"BBU\","
+            + " \"vendorName\": \"Nokia\","
+            + " \"oamV4IpAddress\": \"10.16.123.234\","
+            + " \"softwareVersion\": \"v4.5.0.1\","
+            + " \"oamV6IpAddress\": \"0:0:0:0:0:FFFF:0A10:7BEA\""
+            + "}}},"
+            + "{\"event\": {"
+            + "\"commonEventHeader\": { \"sourceName\":\"NOKQTFCOC540002E\"},"
+            + "\"pnfRegistrationFields\": {"
+            + " \"unitType\": \"AirScale\","
+            + " \"serialNumber\": \"QTFCOC540002E\","
+            + " \"pnfRegistrationFieldsVersion\": \"2.0\","
+            + " \"manufactureDate\": \"1535014037024\","
+            + " \"modelNumber\": \"7BEA\",\n"
+            + " \"lastServiceDate\": \"1535014037024\","
+            + " \"unitFamily\": \"BBU\","
+            + " \"vendorName\": \"Nokia\","
+            + " \"oamV4IpAddress\": \"10.16.123.234\","
+            + " \"softwareVersion\": \"v4.5.0.1\","
+            + " \"oamV6IpAddress\": \"0:0:0:0:0:FFFF:0A10:7BEA\""
+            + "}}}"
+            + "]";
 
-        DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser());
-        JsonElement jsonElement = new JsonParser().parse(parsed);
-        Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject()))
-                .when(dmaapConsumerJsonParser).getJsonObjectFromAnArray(jsonElement);
-        String jsonWithoutIpInformation =
-                "[{\"event\": {"
-                        + "\"commonEventHeader\": {\"sourceName\": \"\"},"
-                        + "\"pnfRegistrationFields\": {"
-                        + " \"unitType\": \"AirScale\","
-                        + " \"serialNumber\": \"QTFCOC540002E\","
-                        + " \"pnfRegistrationFieldsVersion\": \"2.0\","
-                        + " \"manufactureDate\": \"1535014037024\","
-                        + " \"modelNumber\": \"7BEA\",\n"
-                        + " \"lastServiceDate\": \"1535014037024\","
-                        + " \"unitFamily\": \"BBU\","
-                        + " \"vendorName\": \"Nokia\","
-                        + " \"softwareVersion\": \"v4.5.0.1\","
-                        + " \"oamV4IpAddress\": \"10.16.123.234\","
-                        + " \"oamV6IpAddress\": \"0:0:0:0:0:FFFF:0A10:7BEA\""
-                        + "}}}]";
-        StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(jsonWithoutIpInformation)))
-                .expectSubscription().expectError(DmaapNotFoundException.class).verify();
+        ConsumerDmaapModel expectedObject = ImmutableConsumerDmaapModel.builder().ipv4("10.16.123.234")
+            .ipv6("0:0:0:0:0:FFFF:0A10:7BEA")
+            .correlationId("NOKQTFCOC540002E").build();
+        //when
+        DmaapConsumerJsonParser dmaapConsumerJsonParser = new DmaapConsumerJsonParser();
+
+        //then
+        StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(message)))
+            .expectSubscription().expectNext(expectedObject).expectNext(expectedObject).verifyComplete();
     }
 }
index c128fb9..689a732 100644 (file)
@@ -42,6 +42,7 @@ import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
 import org.onap.dcaegen2.services.prh.model.ImmutableConsumerDmaapModel;
 import org.onap.dcaegen2.services.prh.service.DmaapConsumerJsonParser;
 import org.onap.dcaegen2.services.prh.service.consumer.DMaaPConsumerReactiveHttpClient;
+import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.test.StepVerifier;
 
@@ -71,36 +72,36 @@ class DmaapConsumerTaskImplTest {
         appConfig = mock(AppConfig.class);
 
         message = "[{\"event\": {"
-                + "\"commonEventHeader\": { \"sourceName\":\"NOKQTFCOC540002E\"},"
-                + "\"pnfRegistrationFields\": {"
-                    + " \"unitType\": \"AirScale\","
-                    + " \"serialNumber\": \"QTFCOC540002E\","
-                    + " \"pnfRegistrationFieldsVersion\": \"2.0\","
-                    + " \"manufactureDate\": \"1535014037024\","
-                    + " \"modelNumber\": \"7BEA\",\n"
-                    + " \"lastServiceDate\": \"1535014037024\","
-                    + " \"unitFamily\": \"BBU\","
-                    + " \"vendorName\": \"Nokia\","
-                    + " \"oamV4IpAddress\": \"10.16.123.234\","
-                    + " \"softwareVersion\": \"v4.5.0.1\","
-                    + " \"oamV6IpAddress\": \"0:0:0:0:0:FFFF:0A10:7BEA\""
-                + "}}}]";
+            + "\"commonEventHeader\": { \"sourceName\":\"NOKQTFCOC540002E\"},"
+            + "\"pnfRegistrationFields\": {"
+            + " \"unitType\": \"AirScale\","
+            + " \"serialNumber\": \"QTFCOC540002E\","
+            + " \"pnfRegistrationFieldsVersion\": \"2.0\","
+            + " \"manufactureDate\": \"1535014037024\","
+            + " \"modelNumber\": \"7BEA\",\n"
+            + " \"lastServiceDate\": \"1535014037024\","
+            + " \"unitFamily\": \"BBU\","
+            + " \"vendorName\": \"Nokia\","
+            + " \"oamV4IpAddress\": \"10.16.123.234\","
+            + " \"softwareVersion\": \"v4.5.0.1\","
+            + " \"oamV6IpAddress\": \"0:0:0:0:0:FFFF:0A10:7BEA\""
+            + "}}}]";
 
         parsed = "{\"event\": {"
-                + "\"commonEventHeader\": { \"sourceName\":\"NOKQTFCOC540002E\"},"
-                + "\"pnfRegistrationFields\": {"
-                + " \"unitType\": \"AirScale\","
-                + " \"serialNumber\": \"QTFCOC540002E\","
-                + " \"pnfRegistrationFieldsVersion\": \"2.0\","
-                + " \"manufactureDate\": \"1535014037024\","
-                + " \"modelNumber\": \"7BEA\",\n"
-                + " \"lastServiceDate\": \"1535014037024\","
-                + " \"unitFamily\": \"BBU\","
-                + " \"vendorName\": \"Nokia\","
-                + " \"oamV4IpAddress\": \"10.16.123.234\","
-                + " \"softwareVersion\": \"v4.5.0.1\","
-                + " \"oamV6IpAddress\": \"0:0:0:0:0:FFFF:0A10:7BEA\""
-                + "}}}";
+            + "\"commonEventHeader\": { \"sourceName\":\"NOKQTFCOC540002E\"},"
+            + "\"pnfRegistrationFields\": {"
+            + " \"unitType\": \"AirScale\","
+            + " \"serialNumber\": \"QTFCOC540002E\","
+            + " \"pnfRegistrationFieldsVersion\": \"2.0\","
+            + " \"manufactureDate\": \"1535014037024\","
+            + " \"modelNumber\": \"7BEA\",\n"
+            + " \"lastServiceDate\": \"1535014037024\","
+            + " \"unitFamily\": \"BBU\","
+            + " \"vendorName\": \"Nokia\","
+            + " \"oamV4IpAddress\": \"10.16.123.234\","
+            + " \"softwareVersion\": \"v4.5.0.1\","
+            + " \"oamV6IpAddress\": \"0:0:0:0:0:FFFF:0A10:7BEA\""
+            + "}}}";
     }
 
     @Test
@@ -120,11 +121,11 @@ class DmaapConsumerTaskImplTest {
         //given
         prepareMocksForDmaapConsumer(Optional.of(message));
         //when
-        Mono<ConsumerDmaapModel> response = dmaapConsumerTask.execute("Sample input");
+        Flux<ConsumerDmaapModel> response = dmaapConsumerTask.execute("Sample input");
 
         //then
         verify(dMaaPConsumerReactiveHttpClient, times(1)).getDMaaPConsumerResponse();
-        assertEquals(consumerDmaapModel, response.block());
+        assertEquals(consumerDmaapModel, response.blockFirst());
 
 
     }