Bug fix due to failures on CSIT tests 57/70257/1 1.1.0
authorpwielebs <piotr.wielebski@nokia.com>
Thu, 11 Oct 2018 13:11:23 +0000 (15:11 +0200)
committerpwielebs <piotr.wielebski@nokia.com>
Thu, 11 Oct 2018 13:11:23 +0000 (15:11 +0200)
Change-Id: Ib35b9448178f906f47527e49bdd14be75a17e578
Issue-ID: DCAEGEN2-866
Signed-off-by: pwielebs <piotr.wielebski@nokia.com>
prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java
prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImplTest.java

index aed9974..bf3987c 100644 (file)
@@ -23,21 +23,25 @@ package org.onap.dcaegen2.services.prh.service;
 import com.google.gson.JsonElement;
 import com.google.gson.JsonObject;
 import com.google.gson.JsonParser;
-import java.util.Optional;
-import java.util.stream.StreamSupport;
-import org.onap.dcaegen2.services.prh.exceptions.DmaapEmptyResponseException;
 import org.onap.dcaegen2.services.prh.exceptions.DmaapNotFoundException;
 import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
 import org.onap.dcaegen2.services.prh.model.ImmutableConsumerDmaapModel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.springframework.util.StringUtils;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
+import java.util.Optional;
+import java.util.stream.StreamSupport;
+
 /**
  * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 5/8/18
  */
 public class DmaapConsumerJsonParser {
 
+    private static final Logger LOGGER = LoggerFactory.getLogger(DmaapConsumerJsonParser.class);
+
     private static final String EVENT = "event";
     private static final String COMMON_EVENT_HEADER = "commonEventHeader";
     private static final String PNF_REGISTRATION_FIELDS = "pnfRegistrationFields";
@@ -59,7 +63,7 @@ public class DmaapConsumerJsonParser {
     }
 
     private Mono<JsonElement> getJsonParserMessage(String message) {
-        return StringUtils.isEmpty(message) ? Mono.error(new DmaapEmptyResponseException())
+        return StringUtils.isEmpty(message) ? logErrorAndReturnMonoEmpty("DmaaP response is empty")
             : Mono.fromCallable(() -> new JsonParser().parse(message));
     }
 
@@ -84,7 +88,7 @@ public class DmaapConsumerJsonParser {
 
     private Flux<ConsumerDmaapModel> create(Flux<JsonObject> jsonObject) {
         return jsonObject.flatMap(monoJsonP ->
-            !containsHeader(monoJsonP) ? Flux.error(new DmaapNotFoundException("Incorrect JsonObject - missing header"))
+            !containsHeader(monoJsonP) ? logErrorAndReturnMonoEmpty("Incorrect JsonObject - missing header")
                 : transform(monoJsonP))
             .onErrorResume(exception -> exception instanceof DmaapNotFoundException, e -> Mono.empty());
     }
@@ -101,8 +105,8 @@ public class DmaapConsumerJsonParser {
         String pnfOamIpv6Address = getValueFromJson(pnfRegistrationFields, OAM_IPV_6_ADDRESS);
 
         return (StringUtils.isEmpty(pnfSourceName) || !ipPropertiesNotEmpty(pnfOamIpv4Address, pnfOamIpv6Address))
-            ? Mono.error(new DmaapNotFoundException("Incorrect json, consumerDmaapModel can not be created: "
-            + printMessage(pnfSourceName, pnfOamIpv4Address, pnfOamIpv6Address))) :
+            ? logErrorAndReturnMonoEmpty("Incorrect json, consumerDmaapModel can not be created: "
+            + printMessage(pnfSourceName, pnfOamIpv4Address, pnfOamIpv6Address)) :
             Mono.just(ImmutableConsumerDmaapModel.builder()
                 .correlationId(pnfSourceName)
                 .ipv4(pnfOamIpv4Address)
@@ -128,4 +132,9 @@ public class DmaapConsumerJsonParser {
             + "\"" + OAM_IPV_6_ADDRESS + "\": \"%s\""
             + "%n}", sourceName, oamIpv4Address, oamIpv6Address);
     }
+
+    private <T> Mono<T> logErrorAndReturnMonoEmpty(String messageForLogger) {
+        LOGGER.warn(messageForLogger);
+        return Mono.empty();
+    }
 }
index 689a732..c17e254 100644 (file)
@@ -109,25 +109,25 @@ class DmaapConsumerTaskImplTest {
         //given
         prepareMocksForDmaapConsumer(Optional.empty());
 
-        //then
-        StepVerifier.create(dmaapConsumerTask.execute("Sample input")).expectSubscription()
-            .expectError(DmaapEmptyResponseException.class).verify();
+        //when
+        Flux<ConsumerDmaapModel> response = dmaapConsumerTask.execute("Sample input");
 
+        //then
         verify(dMaaPConsumerReactiveHttpClient, times(1)).getDMaaPConsumerResponse();
+        assertEquals(null, response.blockFirst());
     }
 
     @Test
     void whenPassedObjectFits_ReturnsCorrectResponse() {
         //given
         prepareMocksForDmaapConsumer(Optional.of(message));
+
         //when
         Flux<ConsumerDmaapModel> response = dmaapConsumerTask.execute("Sample input");
 
         //then
         verify(dMaaPConsumerReactiveHttpClient, times(1)).getDMaaPConsumerResponse();
         assertEquals(consumerDmaapModel, response.blockFirst());
-
-
     }
 
     private void prepareMocksForDmaapConsumer(Optional<String> message) {