Make DFC handle multiple messages from MR 36/74136/1
authorelinuxhenrik <henrik.b.andersson@est.tech>
Fri, 30 Nov 2018 09:06:54 +0000 (10:06 +0100)
committerelinuxhenrik <henrik.b.andersson@est.tech>
Mon, 3 Dec 2018 11:59:46 +0000 (12:59 +0100)
DMaaP MessageRouter might send multiple messages in the same respons when
DFC polls. This was not handled by DFC.

Change-Id: I78c2fc7f4512a07fadf61c2cf1f6399d466fc873
Issue-ID: DCAEGEN2-1001
Signed-off-by: elinuxhenrik <henrik.b.andersson@est.tech>
datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java
datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParserTest.java

index e828776..46c6e94 100644 (file)
@@ -26,8 +26,6 @@ import java.util.List;
 import java.util.Optional;
 import java.util.stream.StreamSupport;
 
-import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapEmptyResponseException;
-import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapNotFoundException;
 import org.onap.dcaegen2.collectors.datafile.model.FileData;
 import org.onap.dcaegen2.collectors.datafile.model.FileMetaData;
 import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData;
@@ -92,35 +90,38 @@ public class DmaapConsumerJsonParser {
      * @return reactive Mono with an array of FileData
      */
     public Flux<FileData> getJsonObject(Mono<String> rawMessage) {
-        return rawMessage.flatMap(this::getJsonParserMessage).flatMapMany(this::createJsonConsumerModel);
+        return rawMessage.flatMapMany(this::getJsonParserMessage).flatMap(this::createJsonConsumerModel);
     }
 
     private Mono<JsonElement> getJsonParserMessage(String message) {
         logger.trace("original message from message router: {}", message);
-        return StringUtils.isEmpty(message) ? Mono.error(new DmaapEmptyResponseException())
-                : Mono.fromSupplier(() -> new JsonParser().parse(message));
+        return StringUtils.isEmpty(message) ? Mono.empty() : Mono.fromSupplier(() -> new JsonParser().parse(message));
     }
 
     private Flux<FileData> createJsonConsumerModel(JsonElement jsonElement) {
-        return jsonElement.isJsonObject() ? create(Mono.fromSupplier(jsonElement::getAsJsonObject))
+        return jsonElement.isJsonObject() ? create(Flux.defer(() -> Flux.just(jsonElement.getAsJsonObject())))
                 : getFileDataFromJsonArray(jsonElement);
     }
 
     private Flux<FileData> getFileDataFromJsonArray(JsonElement jsonElement) {
-        return create(Mono.fromCallable(() -> StreamSupport.stream(jsonElement.getAsJsonArray().spliterator(), false)
-                .findFirst().flatMap(this::getJsonObjectFromAnArray).orElseThrow(DmaapEmptyResponseException::new)));
+        return create(
+                Flux.defer(() -> Flux.fromStream(StreamSupport.stream(jsonElement.getAsJsonArray().spliterator(), false)
+                        .map(jsonElementFromArray -> getJsonObjectFromAnArray(jsonElementFromArray)
+                                .orElseGet(JsonObject::new)))));
     }
 
     public Optional<JsonObject> getJsonObjectFromAnArray(JsonElement element) {
-        logger.trace("starting to getJsonObjectFromAnArray!");
-
-        return Optional.of(new JsonParser().parse(element.getAsString()).getAsJsonObject());
+        JsonParser jsonParser = new JsonParser();
+        return element.isJsonPrimitive() ? Optional.of(jsonParser.parse(element.getAsString()).getAsJsonObject())
+                : element.isJsonObject() ? Optional.of((JsonObject) element)
+                        : Optional.of(jsonParser.parse(element.toString()).getAsJsonObject());
     }
 
-    private Flux<FileData> create(Mono<JsonObject> jsonObject) {
-        return jsonObject.flatMapMany(monoJsonP -> !containsNotificationFields(monoJsonP)
-                ? Flux.error(new DmaapNotFoundException("Incorrect JsonObject - missing header. " + jsonObject))
-                : transform(monoJsonP));
+    private Flux<FileData> create(Flux<JsonObject> jsonObject) {
+        return jsonObject
+                .flatMap(monoJsonP -> !containsNotificationFields(monoJsonP)
+                        ? logErrorAndReturnEmptyFlux("Incorrect JsonObject - missing header. " + jsonObject)
+                        : transform(monoJsonP));
     }
 
     private Flux<FileData> transform(JsonObject message) {
@@ -132,11 +133,11 @@ public class DmaapConsumerJsonParser {
                 return getAllFileDataFromJson(fileMetaData.get(), arrayOfNamedHashMap);
             }
 
-            return Flux.error(new DmaapNotFoundException(
-                    "Unable to collect file from xNF. Missing arrayOfNamedHashMap in message. " + message));
+            logger.error("Unable to collect file from xNF. Missing arrayOfNamedHashMap in message. {}", message);
+            return Flux.empty();
         }
-        return Flux.error(new DmaapNotFoundException(
-                "Unable to collect file from xNF. FileReady event has incorrect JsonObject"));
+        logger.error("Unable to collect file from xNF. FileReady event has incorrect JsonObject. {}", message);
+        return Flux.empty();
     }
 
     private Optional<FileMetaData> getFileMetaData(JsonObject message) {
@@ -191,13 +192,11 @@ public class DmaapConsumerJsonParser {
     private Flux<FileData> getAllFileDataFromJson(FileMetaData fileMetaData, JsonArray arrayOfAdditionalFields) {
         List<FileData> res = new ArrayList<>();
         for (int i = 0; i < arrayOfAdditionalFields.size(); i++) {
-            if (arrayOfAdditionalFields.get(i) != null) {
-                JsonObject fileInfo = (JsonObject) arrayOfAdditionalFields.get(i);
-                Optional<FileData> fileData = getFileDataFromJson(fileMetaData, fileInfo);
+            JsonObject fileInfo = (JsonObject) arrayOfAdditionalFields.get(i);
+            Optional<FileData> fileData = getFileDataFromJson(fileMetaData, fileInfo);
 
-                if (fileData.isPresent()) {
-                    res.add(fileData.get());
-                }
+            if (fileData.isPresent()) {
+                res.add(fileData.get());
             }
         }
         return Flux.fromIterable(res);
@@ -260,4 +259,9 @@ public class DmaapConsumerJsonParser {
     private boolean containsNotificationFields(JsonObject jsonObject) {
         return jsonObject.has(EVENT) && jsonObject.getAsJsonObject(EVENT).has(NOTIFICATION_FIELDS);
     }
+
+    private Flux<FileData> logErrorAndReturnEmptyFlux(String errorMessage) {
+        logger.error(errorMessage);
+        return Flux.empty();
+    }
 }
index a9bc546..0ae9ece 100644 (file)
@@ -60,7 +60,7 @@ class DmaapConsumerJsonParserTest {
     private static final String NOTIFICATION_FIELDS_VERSION = "1.0";
 
     @Test
-    void whenPassingCorrectJson_validationNotThrowingAnException() throws DmaapNotFoundException {
+    void whenPassingCorrectJson_oneFileData() throws DmaapNotFoundException {
         // @formatter:off
         AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder()
                 .name(PM_FILE_NAME)
@@ -108,7 +108,98 @@ class DmaapConsumerJsonParserTest {
     }
 
     @Test
-    void whenPassingCorrectJsonWithFaultyEventName_validationThrowingAnException() {
+    void whenPassingCorrectJsonWithTwoEvents_twoFileData() throws DmaapNotFoundException {
+        // @formatter:off
+        AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder()
+                .name(PM_FILE_NAME)
+                .location(LOCATION)
+                .compression(GZIP_COMPRESSION)
+                .fileFormatType(FILE_FORMAT_TYPE)
+                .fileFormatVersion(FILE_FORMAT_VERSION)
+                .build();
+        JsonMessage message = new JsonMessage.JsonMessageBuilder()
+                .eventName(NR_RADIO_ERICSSON_EVENT_NAME)
+                .changeIdentifier(CHANGE_IDENTIFIER)
+                .changeType(CHANGE_TYPE)
+                .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
+                .addAdditionalField(additionalField)
+                .build();
+
+        FileMetaData fileMetaData = ImmutableFileMetaData.builder()
+                .productName(PRODUCT_NAME)
+                .vendorName(VENDOR_NAME)
+                .lastEpochMicrosec(LAST_EPOCH_MICROSEC)
+                .sourceName(SOURCE_NAME)
+                .startEpochMicrosec(START_EPOCH_MICROSEC)
+                .timeZoneOffset(TIME_ZONE_OFFSET)
+                .changeIdentifier(CHANGE_IDENTIFIER)
+                .changeType(CHANGE_TYPE)
+                .build();
+        FileData expectedFileData = ImmutableFileData.builder()
+                .fileMetaData(fileMetaData)
+                .name(PM_FILE_NAME)
+                .location(LOCATION)
+                .compression(GZIP_COMPRESSION)
+                .fileFormatType(FILE_FORMAT_TYPE)
+                .fileFormatVersion(FILE_FORMAT_VERSION)
+                .build();
+        // @formatter:on
+        String parsedString = message.getParsed();
+        String messageString = "[" + parsedString + "," + parsedString + "]";
+        DmaapConsumerJsonParser dmaapConsumerJsonParser = new DmaapConsumerJsonParser();
+
+        StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription()
+                .expectNext(expectedFileData).expectNext(expectedFileData).verifyComplete();
+    }
+
+    @Test
+    void whenPassingCorrectJsonWithTwoEventsFirstNoHeader_oneFileDatan()
+            throws DmaapNotFoundException {
+        // @formatter:off
+        AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder()
+                .name(PM_FILE_NAME)
+                .location(LOCATION)
+                .compression(GZIP_COMPRESSION)
+                .fileFormatType(FILE_FORMAT_TYPE)
+                .fileFormatVersion(FILE_FORMAT_VERSION)
+                .build();
+        JsonMessage message = new JsonMessage.JsonMessageBuilder()
+                .eventName(NR_RADIO_ERICSSON_EVENT_NAME)
+                .changeIdentifier(CHANGE_IDENTIFIER)
+                .changeType(CHANGE_TYPE)
+                .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
+                .addAdditionalField(additionalField)
+                .build();
+
+        FileMetaData fileMetaData = ImmutableFileMetaData.builder()
+                .productName(PRODUCT_NAME)
+                .vendorName(VENDOR_NAME)
+                .lastEpochMicrosec(LAST_EPOCH_MICROSEC)
+                .sourceName(SOURCE_NAME)
+                .startEpochMicrosec(START_EPOCH_MICROSEC)
+                .timeZoneOffset(TIME_ZONE_OFFSET)
+                .changeIdentifier(CHANGE_IDENTIFIER)
+                .changeType(CHANGE_TYPE)
+                .build();
+        FileData expectedFileData = ImmutableFileData.builder()
+                .fileMetaData(fileMetaData)
+                .name(PM_FILE_NAME)
+                .location(LOCATION)
+                .compression(GZIP_COMPRESSION)
+                .fileFormatType(FILE_FORMAT_TYPE)
+                .fileFormatVersion(FILE_FORMAT_VERSION)
+                .build();
+        // @formatter:on
+        String parsedString = message.getParsed();
+        String messageString = "[{\"event\":{}}," + parsedString + "]";
+        DmaapConsumerJsonParser dmaapConsumerJsonParser = new DmaapConsumerJsonParser();
+
+        StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription()
+                .expectNext(expectedFileData).verifyComplete();
+    }
+
+    @Test
+    void whenPassingCorrectJsonWithFaultyEventName_noFileData() {
         // @formatter:off
         AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder()
                 .location(LOCATION)
@@ -132,7 +223,7 @@ class DmaapConsumerJsonParserTest {
                 .getJsonObjectFromAnArray(jsonElement);
 
         StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription()
-                .expectError(DmaapNotFoundException.class).verify();
+                .expectComplete().verify();
     }
 
     @Test
@@ -163,6 +254,27 @@ class DmaapConsumerJsonParserTest {
                 .expectNextCount(0).verifyComplete();
     }
 
+    @Test
+    void whenPassingCorrectJsonWithoutAdditionalFields_noFileData() {
+        // @formatter:off
+        JsonMessage message = new JsonMessage.JsonMessageBuilder()
+                .eventName(NR_RADIO_ERICSSON_EVENT_NAME)
+                .changeIdentifier(CHANGE_IDENTIFIER)
+                .changeType(CHANGE_TYPE)
+                .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
+                .build();
+        // @formatter:on
+        String messageString = message.toString();
+        String parsedString = message.getParsed();
+        DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser());
+        JsonElement jsonElement = new JsonParser().parse(parsedString);
+        Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(dmaapConsumerJsonParser)
+        .getJsonObjectFromAnArray(jsonElement);
+
+        StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription()
+        .expectNextCount(0).verifyComplete();
+    }
+
     @Test
     void whenPassingCorrectJsonWithoutLocation_noFileData() {
         // @formatter:off
@@ -303,7 +415,7 @@ class DmaapConsumerJsonParserTest {
     }
 
     @Test
-    void whenPassingJsonWithoutMandatoryHeaderInformation_validationThrowingAnException() {
+    void whenPassingJsonWithoutMandatoryHeaderInformation_noFileData() {
         // @formatter:off
         JsonMessage message = new JsonMessage.JsonMessageBuilder()
                 .eventName(NR_RADIO_ERICSSON_EVENT_NAME)
@@ -320,27 +432,23 @@ class DmaapConsumerJsonParserTest {
                 .getJsonObjectFromAnArray(jsonElement);
 
         StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(incorrectMessageString)))
-                .expectSubscription().expectError(DmaapNotFoundException.class).verify();
+                .expectSubscription().expectComplete().verify();
     }
 
     @Test
-    void whenPassingJsonWithNullJsonElement_validationThrowingAnException() {
-        JsonMessage message = new JsonMessage.JsonMessageBuilder().build();
-
-        String incorrectMessageString = message.toString();
-        String parsedString = message.getParsed();
+    void whenPassingJsonWithNullJsonElement_noFileData() {
         DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser());
-        JsonElement jsonElement = new JsonParser().parse(parsedString);
+        JsonElement jsonElement = new JsonParser().parse("{}");
 
         Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(dmaapConsumerJsonParser)
                 .getJsonObjectFromAnArray(jsonElement);
 
-        StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(incorrectMessageString)))
-                .expectSubscription().expectError(DmaapNotFoundException.class).verify();
+        StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just("[{}]"))).expectSubscription()
+                .expectComplete().verify();
     }
 
     @Test
-    void whenPassingCorrectJsonWithIncorrectChangeType_validationThrowingAnException() {
+    void whenPassingCorrectJsonWithIncorrectChangeType_noFileData() {
         // @formatter:off
         AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder()
                 .name(PM_FILE_NAME)
@@ -364,11 +472,11 @@ class DmaapConsumerJsonParserTest {
                 .getJsonObjectFromAnArray(jsonElement);
 
         StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription()
-                .expectNextCount(0).expectError(DmaapNotFoundException.class).verify();
+                .expectNextCount(0).expectComplete().verify();
     }
 
     @Test
-    void whenPassingCorrectJsonWithIncorrectChangeIdentifier_validationThrowingAnException() {
+    void whenPassingCorrectJsonWithIncorrectChangeIdentifier_noFileData() {
         // @formatter:off
         AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder()
                 .name(PM_FILE_NAME)
@@ -392,6 +500,6 @@ class DmaapConsumerJsonParserTest {
                 .getJsonObjectFromAnArray(jsonElement);
 
         StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription()
-                .expectNextCount(0).expectError(DmaapNotFoundException.class).verify();
+                .expectComplete().verify();
     }
 }