Make DFC handle multiple messages from MR DMaaP 35/74135/1
authorelinuxhenrik <henrik.b.andersson@est.tech>
Mon, 3 Dec 2018 11:50:45 +0000 (12:50 +0100)
committerelinuxhenrik <henrik.b.andersson@est.tech>
Mon, 3 Dec 2018 11:55:53 +0000 (12:55 +0100)
MessageRouter might send multiple messages in the same respons when DFC polls. This was not handled by DFC.

Change-Id: I3bd1f62d68351d7149b94c9d6331777d485a7d53
Issue-ID: DCAEGEN2-1001
Signed-off-by: elinuxhenrik <henrik.b.andersson@est.tech>
datafile-app-server/pom.xml
datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java
datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParserTest.java
datafile-commons/pom.xml
datafile-dmaap-client/pom.xml
pom.xml
version.properties

index 2426745..7e39ef0 100644 (file)
@@ -24,7 +24,7 @@
   <parent>
     <groupId>org.onap.dcaegen2.collectors</groupId>
     <artifactId>datafile</artifactId>
-    <version>1.0.4-SNAPSHOT</version>
+    <version>1.0.5-SNAPSHOT</version>
   </parent>
 
   <groupId>org.onap.dcaegen2.collectors.datafile</groupId>
index cfd06db..78c5e98 100644 (file)
@@ -26,8 +26,6 @@ import java.util.List;
 import java.util.Optional;
 import java.util.stream.StreamSupport;
 
-import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapEmptyResponseException;
-import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapNotFoundException;
 import org.onap.dcaegen2.collectors.datafile.model.FileData;
 import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData;
 import org.slf4j.Logger;
@@ -67,76 +65,70 @@ public class DmaapConsumerJsonParser {
      * @return reactive Mono with an array of FileData
      */
     public Flux<FileData> getJsonObject(Mono<String> rawMessage) {
-        return rawMessage.flatMap(this::getJsonParserMessage).flatMapMany(this::createJsonConsumerModel);
+        return rawMessage.flatMapMany(this::getJsonParserMessage).flatMap(this::createJsonConsumerModel);
     }
 
     private Mono<JsonElement> getJsonParserMessage(String message) {
         logger.trace("original message from message router: {}", message);
-        return StringUtils.isEmpty(message) ? Mono.error(new DmaapEmptyResponseException())
-            : Mono.fromSupplier(() -> new JsonParser().parse(message));
+        return StringUtils.isEmpty(message) ? Mono.empty() : Mono.fromSupplier(() -> new JsonParser().parse(message));
     }
 
     private Flux<FileData> createJsonConsumerModel(JsonElement jsonElement) {
-        return jsonElement.isJsonObject() ? create(Mono.fromSupplier(jsonElement::getAsJsonObject))
-            : getFileDataFromJsonArray(jsonElement);
+        return jsonElement.isJsonObject() ? create(Flux.defer(() -> Flux.just(jsonElement.getAsJsonObject())))
+                : getFileDataFromJsonArray(jsonElement);
     }
 
     private Flux<FileData> getFileDataFromJsonArray(JsonElement jsonElement) {
-        return create(Mono.fromCallable(() -> StreamSupport.stream(jsonElement.getAsJsonArray().spliterator(), false)
-            .findFirst().flatMap(this::getJsonObjectFromAnArray).orElseThrow(DmaapEmptyResponseException::new)));
+        return create(
+                Flux.defer(() -> Flux.fromStream(StreamSupport.stream(jsonElement.getAsJsonArray().spliterator(), false)
+                        .map(jsonElementFromArray -> getJsonObjectFromAnArray(jsonElementFromArray)
+                                .orElseGet(JsonObject::new)))));
     }
 
     public Optional<JsonObject> getJsonObjectFromAnArray(JsonElement element) {
-        logger.trace("starting to getJsonObjectFromAnArray!");
-
-        return Optional.of(new JsonParser().parse(element.getAsString()).getAsJsonObject());
+        JsonParser jsonParser = new JsonParser();
+        return element.isJsonPrimitive() ? Optional.of(jsonParser.parse(element.getAsString()).getAsJsonObject())
+                : element.isJsonObject() ? Optional.of((JsonObject) element)
+                        : Optional.of(jsonParser.parse(element.toString()).getAsJsonObject());
     }
 
-    private Flux<FileData> create(Mono<JsonObject> jsonObject) {
-        return jsonObject.flatMapMany(monoJsonP -> !containsHeader(monoJsonP)
-            ? Flux.error(new DmaapNotFoundException("Incorrect JsonObject - missing header"))
-            : transform(monoJsonP));
+    private Flux<FileData> create(Flux<JsonObject> jsonObject) {
+        return jsonObject.flatMap(monoJsonP -> !containsNotificationFields(monoJsonP)
+                ? logErrorAndReturnEmptyFlux("Incorrect JsonObject - missing header. " + jsonObject)
+                : transform(monoJsonP));
     }
 
-    private Flux<FileData> transform(JsonObject jsonObject) {
-        if (containsHeader(jsonObject, EVENT, NOTIFICATION_FIELDS)) {
-            JsonObject notificationFields = jsonObject.getAsJsonObject(EVENT).getAsJsonObject(NOTIFICATION_FIELDS);
-            String changeIdentifier = getValueFromJson(notificationFields, CHANGE_IDENTIFIER);
-            String changeType = getValueFromJson(notificationFields, CHANGE_TYPE);
-            String notificationFieldsVersion = getValueFromJson(notificationFields, NOTIFICATION_FIELDS_VERSION);
-            JsonArray arrayOfNamedHashMap = notificationFields.getAsJsonArray(ARRAY_OF_NAMED_HASH_MAP);
-            if (isNotificationFieldsHeaderNotEmpty(changeIdentifier, changeType, notificationFieldsVersion)
+    private Flux<FileData> transform(JsonObject message) {
+        JsonObject notificationFields = message.getAsJsonObject(EVENT).getAsJsonObject(NOTIFICATION_FIELDS);
+        String changeIdentifier = getValueFromJson(notificationFields, CHANGE_IDENTIFIER);
+        String changeType = getValueFromJson(notificationFields, CHANGE_TYPE);
+        String notificationFieldsVersion = getValueFromJson(notificationFields, NOTIFICATION_FIELDS_VERSION);
+        JsonArray arrayOfNamedHashMap = notificationFields.getAsJsonArray(ARRAY_OF_NAMED_HASH_MAP);
+
+        if (isNotificationFieldsHeaderNotEmpty(changeIdentifier, changeType, notificationFieldsVersion)
                 && arrayOfNamedHashMap != null) {
-                return getAllFileDataFromJson(changeIdentifier, changeType, arrayOfNamedHashMap);
-            }
+            return getAllFileDataFromJson(changeIdentifier, changeType, arrayOfNamedHashMap);
+        }
 
-            if (!isNotificationFieldsHeaderNotEmpty(changeIdentifier, changeType, notificationFieldsVersion)) {
-                return Flux.error(
-                    new DmaapNotFoundException("FileReady event header is missing information. " + jsonObject));
-            } else if (arrayOfNamedHashMap != null) {
-                return Flux.error(
-                    new DmaapNotFoundException("FileReady event arrayOfNamedHashMap is missing. " + jsonObject));
-            }
-            return Flux.error(
-                new DmaapNotFoundException("FileReady event does not contain correct information. " + jsonObject));
+        if (!isNotificationFieldsHeaderNotEmpty(changeIdentifier, changeType, notificationFieldsVersion)) {
+            logger.error("FileReady event header is missing information. {}", message);
+        } else if (arrayOfNamedHashMap != null) {
+            logger.error("FileReady event arrayOfNamedHashMap is missing. {}", message);
         }
-        return Flux.error(
-            new DmaapNotFoundException("FileReady event has incorrect JsonObject - missing header. " + jsonObject));
+        return Flux.empty();
     }
 
     private Flux<FileData> getAllFileDataFromJson(String changeIdentifier, String changeType,
-        JsonArray arrayOfAdditionalFields) {
+            JsonArray arrayOfAdditionalFields) {
         List<FileData> res = new ArrayList<>();
         for (int i = 0; i < arrayOfAdditionalFields.size(); i++) {
-            if (arrayOfAdditionalFields.get(i) != null) {
-                JsonObject fileInfo = (JsonObject) arrayOfAdditionalFields.get(i);
-                FileData fileData = getFileDataFromJson(fileInfo, changeIdentifier, changeType);
-
-                if (fileData != null) {
-                    res.add(fileData);
-                } else {
-                    logger.error("Unable to collect file from xNF. File information wrong. Data: {}", fileInfo);
-                }
+            JsonObject fileInfo = (JsonObject) arrayOfAdditionalFields.get(i);
+            FileData fileData = getFileDataFromJson(fileInfo, changeIdentifier, changeType);
+
+            if (fileData != null) {
+                res.add(fileData);
+            } else {
+                logger.error("Unable to collect file from xNF. File information wrong. Data: {}", fileInfo);
             }
         }
         return Flux.fromIterable(res);
@@ -155,10 +147,18 @@ public class DmaapConsumerJsonParser {
         String compression = getValueFromJson(data, COMPRESSION);
 
         if (isFileFormatFieldsNotEmpty(fileFormatVersion, fileFormatType)
-            && isNameAndLocationAndCompressionNotEmpty(name, location, compression)) {
-            fileData = ImmutableFileData.builder().name(name).changeIdentifier(changeIdentifier).changeType(changeType)
-                .location(location).compression(compression).fileFormatType(fileFormatType)
-                .fileFormatVersion(fileFormatVersion).build();
+                && isNameAndLocationAndCompressionNotEmpty(name, location, compression)) {
+            // @formatter:off
+            fileData = ImmutableFileData.builder()
+                    .name(name)
+                    .changeIdentifier(changeIdentifier)
+                    .changeType(changeType)
+                    .location(location)
+                    .compression(compression)
+                    .fileFormatType(fileFormatType)
+                    .fileFormatVersion(fileFormatVersion)
+                    .build();
+            // @formatter:on
         }
         return fileData;
     }
@@ -168,9 +168,9 @@ public class DmaapConsumerJsonParser {
     }
 
     private boolean isNotificationFieldsHeaderNotEmpty(String changeIdentifier, String changeType,
-        String notificationFieldsVersion) {
+            String notificationFieldsVersion) {
         return isStringIsNotNullAndNotEmpty(changeIdentifier) && isStringIsNotNullAndNotEmpty(changeType)
-            && isStringIsNotNullAndNotEmpty(notificationFieldsVersion);
+                && isStringIsNotNullAndNotEmpty(notificationFieldsVersion);
     }
 
     private boolean isFileFormatFieldsNotEmpty(String fileFormatVersion, String fileFormatType) {
@@ -178,19 +178,20 @@ public class DmaapConsumerJsonParser {
     }
 
     private boolean isNameAndLocationAndCompressionNotEmpty(String name, String location, String compression) {
-        return isStringIsNotNullAndNotEmpty(name) && isStringIsNotNullAndNotEmpty(location) &&
-                isStringIsNotNullAndNotEmpty(compression);
+        return isStringIsNotNullAndNotEmpty(name) && isStringIsNotNullAndNotEmpty(location)
+                && isStringIsNotNullAndNotEmpty(compression);
     }
 
-    private boolean containsHeader(JsonObject jsonObject) {
+    private boolean containsNotificationFields(JsonObject jsonObject) {
         return jsonObject.has(EVENT) && jsonObject.getAsJsonObject(EVENT).has(NOTIFICATION_FIELDS);
     }
 
-    private boolean containsHeader(JsonObject jsonObject, String topHeader, String header) {
-        return jsonObject.has(topHeader) && jsonObject.getAsJsonObject(topHeader).has(header);
-    }
-
     private boolean isStringIsNotNullAndNotEmpty(String string) {
         return string != null && !string.isEmpty();
     }
+
+    private Flux<FileData> logErrorAndReturnEmptyFlux(String errorMessage) {
+        logger.error(errorMessage);
+        return Flux.empty();
+    }
 }
index b5457b8..0c48300 100644 (file)
@@ -49,17 +49,32 @@ class DmaapConsumerJsonParserTest {
     private static final String NOTIFICATION_FIELDS_VERSION = "1.0";
 
     @Test
-    void whenPassingCorrectJson_validationNotThrowingAnException() throws DmaapNotFoundException {
-        AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder().name(PM_FILE_NAME).location(LOCATION)
-                .compression(GZIP_COMPRESSION).fileFormatType(FILE_FORMAT_TYPE).fileFormatVersion(FILE_FORMAT_VERSION)
+    void whenPassingCorrectJson_oneFileData() throws DmaapNotFoundException {
+        // @formatter:off
+        AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder()
+                .name(PM_FILE_NAME)
+                .location(LOCATION)
+                .compression(GZIP_COMPRESSION)
+                .fileFormatType(FILE_FORMAT_TYPE)
+                .fileFormatVersion(FILE_FORMAT_VERSION)
+                .build();
+        JsonMessage message = new JsonMessage.JsonMessageBuilder()
+                .changeIdentifier(CHANGE_IDENTIFIER)
+                .changeType(CHANGE_TYPE)
+                .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
+                .addAdditionalField(additionalField)
                 .build();
-        JsonMessage message = new JsonMessage.JsonMessageBuilder().changeIdentifier(CHANGE_IDENTIFIER)
-                .changeType(CHANGE_TYPE).notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
-                .addAdditionalField(additionalField).build();
 
-        FileData expectedFileData = ImmutableFileData.builder().changeIdentifier(CHANGE_IDENTIFIER)
-                .changeType(CHANGE_TYPE).name(PM_FILE_NAME).location(LOCATION).compression(GZIP_COMPRESSION)
-                .fileFormatType(FILE_FORMAT_TYPE).fileFormatVersion(FILE_FORMAT_VERSION).build();
+        FileData expectedFileData = ImmutableFileData.builder()
+                .changeIdentifier(CHANGE_IDENTIFIER)
+                .changeType(CHANGE_TYPE)
+                .name(PM_FILE_NAME)
+                .location(LOCATION)
+                .compression(GZIP_COMPRESSION)
+                .fileFormatType(FILE_FORMAT_TYPE)
+                .fileFormatVersion(FILE_FORMAT_VERSION)
+                .build();
+        // @formatter:on
 
         String messageString = message.toString();
         String parsedString = message.getParsed();
@@ -72,14 +87,92 @@ class DmaapConsumerJsonParserTest {
                 .expectNext(expectedFileData).verifyComplete();
     }
 
+    @Test
+    void whenPassingCorrectJsonWithTwoEvents_twoFileData() throws DmaapNotFoundException {
+        // @formatter:off
+        AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder()
+                .name(PM_FILE_NAME)
+                .location(LOCATION)
+                .compression(GZIP_COMPRESSION)
+                .fileFormatType(FILE_FORMAT_TYPE)
+                .fileFormatVersion(FILE_FORMAT_VERSION)
+                .build();
+        JsonMessage message = new JsonMessage.JsonMessageBuilder()
+                .changeIdentifier(CHANGE_IDENTIFIER)
+                .changeType(CHANGE_TYPE)
+                .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
+                .addAdditionalField(additionalField)
+                .build();
+
+        FileData expectedFileData = ImmutableFileData.builder()
+                .changeIdentifier(CHANGE_IDENTIFIER)
+                .changeType(CHANGE_TYPE)
+                .name(PM_FILE_NAME)
+                .location(LOCATION)
+                .compression(GZIP_COMPRESSION)
+                .fileFormatType(FILE_FORMAT_TYPE)
+                .fileFormatVersion(FILE_FORMAT_VERSION)
+                .build();
+        // @formatter:on
+        String parsedString = message.getParsed();
+        String messageString = "[" + parsedString + "," + parsedString + "]";
+        DmaapConsumerJsonParser dmaapConsumerJsonParser = new DmaapConsumerJsonParser();
+
+        StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription()
+                .expectNext(expectedFileData).expectNext(expectedFileData).verifyComplete();
+    }
+
+    @Test
+    void whenPassingCorrectJsonWithTwoEventsFirstNoHeader_oneFileDatan() throws DmaapNotFoundException {
+        // @formatter:off
+        AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder()
+                .name(PM_FILE_NAME)
+                .location(LOCATION)
+                .compression(GZIP_COMPRESSION)
+                .fileFormatType(FILE_FORMAT_TYPE)
+                .fileFormatVersion(FILE_FORMAT_VERSION)
+                .build();
+        JsonMessage message = new JsonMessage.JsonMessageBuilder()
+                .changeIdentifier(CHANGE_IDENTIFIER)
+                .changeType(CHANGE_TYPE)
+                .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
+                .addAdditionalField(additionalField)
+                .build();
+
+        FileData expectedFileData = ImmutableFileData.builder()
+                .changeIdentifier(CHANGE_IDENTIFIER)
+                .changeType(CHANGE_TYPE)
+                .name(PM_FILE_NAME)
+                .location(LOCATION)
+                .compression(GZIP_COMPRESSION)
+                .fileFormatType(FILE_FORMAT_TYPE)
+                .fileFormatVersion(FILE_FORMAT_VERSION)
+                .build();
+        // @formatter:on
+        String parsedString = message.getParsed();
+        String messageString = "[{\"event\":{}}," + parsedString + "]";
+        DmaapConsumerJsonParser dmaapConsumerJsonParser = new DmaapConsumerJsonParser();
+
+        StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription()
+                .expectNext(expectedFileData).verifyComplete();
+    }
+
     @Test
     void whenPassingCorrectJsonWihoutName_noFileData() {
-        AdditionalField additionalField =
-                new JsonMessage.AdditionalFieldBuilder().location(LOCATION).compression(GZIP_COMPRESSION)
-                        .fileFormatType(FILE_FORMAT_TYPE).fileFormatVersion(FILE_FORMAT_VERSION).build();
-        JsonMessage message = new JsonMessage.JsonMessageBuilder().changeIdentifier(CHANGE_IDENTIFIER)
-                .changeType(CHANGE_TYPE).notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
-                .addAdditionalField(additionalField).build();
+        // @formatter:off
+        AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder()
+                .location(LOCATION)
+                .compression(GZIP_COMPRESSION)
+                .fileFormatType(FILE_FORMAT_TYPE)
+                .fileFormatVersion(FILE_FORMAT_VERSION)
+                .build();
+        JsonMessage message = new JsonMessage.JsonMessageBuilder()
+                .changeIdentifier(CHANGE_IDENTIFIER)
+                .changeType(CHANGE_TYPE)
+                .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
+                .addAdditionalField(additionalField)
+                .build();
+        // @formatter:on
 
         String messageString = message.toString();
         String parsedString = message.getParsed();
@@ -94,12 +187,20 @@ class DmaapConsumerJsonParserTest {
 
     @Test
     void whenPassingCorrectJsonWihoutLocation_noFileData() {
-        AdditionalField additionalField =
-                new JsonMessage.AdditionalFieldBuilder().name(PM_FILE_NAME).compression(GZIP_COMPRESSION)
-                        .fileFormatType(FILE_FORMAT_TYPE).fileFormatVersion(FILE_FORMAT_VERSION).build();
-        JsonMessage message = new JsonMessage.JsonMessageBuilder().changeIdentifier(CHANGE_IDENTIFIER)
-                .changeType(CHANGE_TYPE).notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
-                .addAdditionalField(additionalField).build();
+        // @formatter:off
+        AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder()
+                .name(PM_FILE_NAME)
+                .compression(GZIP_COMPRESSION)
+                .fileFormatType(FILE_FORMAT_TYPE)
+                .fileFormatVersion(FILE_FORMAT_VERSION)
+                .build();
+        JsonMessage message = new JsonMessage.JsonMessageBuilder()
+                .changeIdentifier(CHANGE_IDENTIFIER)
+                .changeType(CHANGE_TYPE)
+                .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
+                .addAdditionalField(additionalField)
+                .build();
+        // @formatter:on
 
         String messageString = message.toString();
         String parsedString = message.getParsed();
@@ -114,11 +215,20 @@ class DmaapConsumerJsonParserTest {
 
     @Test
     void whenPassingCorrectJsonWihoutCompression_noFileData() {
-        AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder().name(PM_FILE_NAME).location(LOCATION)
-                .fileFormatType(FILE_FORMAT_TYPE).fileFormatVersion(FILE_FORMAT_VERSION).build();
-        JsonMessage message = new JsonMessage.JsonMessageBuilder().changeIdentifier(CHANGE_IDENTIFIER)
-                .changeType(CHANGE_TYPE).notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
-                .addAdditionalField(additionalField).build();
+        // @formatter:off
+        AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder()
+                .name(PM_FILE_NAME)
+                .location(LOCATION)
+                .fileFormatType(FILE_FORMAT_TYPE)
+                .fileFormatVersion(FILE_FORMAT_VERSION)
+                .build();
+        JsonMessage message = new JsonMessage.JsonMessageBuilder()
+                .changeIdentifier(CHANGE_IDENTIFIER)
+                .changeType(CHANGE_TYPE)
+                .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
+                .addAdditionalField(additionalField)
+                .build();
+        // @formatter:on
 
         String messageString = message.toString();
         String parsedString = message.getParsed();
@@ -133,11 +243,20 @@ class DmaapConsumerJsonParserTest {
 
     @Test
     void whenPassingCorrectJsonWihoutFileFormatType_noFileData() {
-        AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder().name(PM_FILE_NAME).location(LOCATION)
-                .compression(GZIP_COMPRESSION).fileFormatVersion(FILE_FORMAT_VERSION).build();
-        JsonMessage message = new JsonMessage.JsonMessageBuilder().changeIdentifier(CHANGE_IDENTIFIER)
-                .changeType(CHANGE_TYPE).notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
-                .addAdditionalField(additionalField).build();
+        // @formatter:off
+        AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder()
+                .name(PM_FILE_NAME)
+                .location(LOCATION)
+                .compression(GZIP_COMPRESSION)
+                .fileFormatVersion(FILE_FORMAT_VERSION)
+                .build();
+        JsonMessage message = new JsonMessage.JsonMessageBuilder()
+                .changeIdentifier(CHANGE_IDENTIFIER)
+                .changeType(CHANGE_TYPE)
+                .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
+                .addAdditionalField(additionalField)
+                .build();
+        // @formatter:on
 
         String messageString = message.toString();
         String parsedString = message.getParsed();
@@ -152,18 +271,38 @@ class DmaapConsumerJsonParserTest {
 
     @Test
     void whenPassingOneCorrectJsonWihoutFileFormatVersionAndOneCorrect_oneFileData() {
-        AdditionalField additionalFaultyField = new JsonMessage.AdditionalFieldBuilder().name(PM_FILE_NAME)
-                .location(LOCATION).compression(GZIP_COMPRESSION).fileFormatType(FILE_FORMAT_TYPE).build();
-        AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder().name(PM_FILE_NAME).location(LOCATION)
-                .compression(GZIP_COMPRESSION).fileFormatType(FILE_FORMAT_TYPE).fileFormatVersion(FILE_FORMAT_VERSION)
+        // @formatter:off
+        AdditionalField additionalFaultyField = new JsonMessage.AdditionalFieldBuilder()
+                .name(PM_FILE_NAME)
+                .location(LOCATION)
+                .compression(GZIP_COMPRESSION)
+                .fileFormatType(FILE_FORMAT_TYPE)
+                .build();
+        AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder()
+                .name(PM_FILE_NAME)
+                .location(LOCATION)
+                .compression(GZIP_COMPRESSION)
+                .fileFormatType(FILE_FORMAT_TYPE)
+                .fileFormatVersion(FILE_FORMAT_VERSION)
+                .build();
+        JsonMessage message = new JsonMessage.JsonMessageBuilder()
+                .changeIdentifier(CHANGE_IDENTIFIER)
+                .changeType(CHANGE_TYPE)
+                .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
+                .addAdditionalField(additionalFaultyField)
+                .addAdditionalField(additionalField)
                 .build();
-        JsonMessage message = new JsonMessage.JsonMessageBuilder().changeIdentifier(CHANGE_IDENTIFIER)
-                .changeType(CHANGE_TYPE).notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
-                .addAdditionalField(additionalFaultyField).addAdditionalField(additionalField).build();
 
-        FileData expectedFileData = ImmutableFileData.builder().changeIdentifier(CHANGE_IDENTIFIER)
-                .changeType(CHANGE_TYPE).name(PM_FILE_NAME).location(LOCATION).compression(GZIP_COMPRESSION)
-                .fileFormatType(FILE_FORMAT_TYPE).fileFormatVersion(FILE_FORMAT_VERSION).build();
+        FileData expectedFileData = ImmutableFileData.builder()
+                .changeIdentifier(CHANGE_IDENTIFIER)
+                .changeType(CHANGE_TYPE)
+                .name(PM_FILE_NAME)
+                .location(LOCATION)
+                .compression(GZIP_COMPRESSION)
+                .fileFormatType(FILE_FORMAT_TYPE)
+                .fileFormatVersion(FILE_FORMAT_VERSION)
+                .build();
+        // @formatter:on
 
         String messageString = message.toString();
         String parsedString = message.getParsed();
@@ -177,9 +316,14 @@ class DmaapConsumerJsonParserTest {
     }
 
     @Test
-    void whenPassingJsonWithoutMandatoryHeaderInformation_validationThrowingAnException() {
-        JsonMessage message = new JsonMessage.JsonMessageBuilder().changeIdentifier("PM_MEAS_FILES_INVALID")
-                .changeType("FileReady_INVALID").notificationFieldsVersion("1.0_INVALID").build();
+    void whenPassingJsonWithoutMandatoryHeaderInformation_noFileData() {
+        // @formatter:off
+        JsonMessage message = new JsonMessage.JsonMessageBuilder()
+                .changeIdentifier("PM_MEAS_FILES_INVALID")
+                .changeType("FileReady_INVALID")
+                .notificationFieldsVersion("1.0_INVALID")
+                .build();
+        // @formatter:on
 
         String incorrectMessageString = message.toString();
         String parsedString = message.getParsed();
@@ -189,11 +333,11 @@ class DmaapConsumerJsonParserTest {
                 .getJsonObjectFromAnArray(jsonElement);
 
         StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(incorrectMessageString)))
-                .expectSubscription().expectError(DmaapNotFoundException.class).verify();
+                .expectSubscription().verifyComplete();
     }
 
     @Test
-    void whenPassingJsonWithNullJsonElement_validationThrowingAnException() {
+    void whenPassingJsonWithNullJsonElement_noFileData() {
         JsonMessage message = new JsonMessage.JsonMessageBuilder().build();
 
         String incorrectMessageString = message.toString();
@@ -205,6 +349,6 @@ class DmaapConsumerJsonParserTest {
                 .getJsonObjectFromAnArray(jsonElement);
 
         StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(incorrectMessageString)))
-                .expectSubscription().expectError(DmaapNotFoundException.class).verify();
+                .expectSubscription().verifyComplete();
     }
 }
index 8c6e2e6..080c5f9 100644 (file)
@@ -24,7 +24,7 @@
   <parent>
     <groupId>org.onap.dcaegen2.collectors</groupId>
     <artifactId>datafile</artifactId>
-    <version>1.0.4-SNAPSHOT</version>
+    <version>1.0.5-SNAPSHOT</version>
   </parent>
 
   <groupId>org.onap.dcaegen2.collectors.datafile</groupId>
index 52394ad..562eb29 100644 (file)
@@ -24,7 +24,7 @@
   <parent>
     <groupId>org.onap.dcaegen2.collectors</groupId>
     <artifactId>datafile</artifactId>
-    <version>1.0.4-SNAPSHOT</version>
+    <version>1.0.5-SNAPSHOT</version>
   </parent>
 
   <groupId>org.onap.dcaegen2.collectors.datafile</groupId>
diff --git a/pom.xml b/pom.xml
index 5bd941f..7842994 100644 (file)
--- a/pom.xml
+++ b/pom.xml
@@ -30,7 +30,7 @@
 
   <groupId>org.onap.dcaegen2.collectors</groupId>
   <artifactId>datafile</artifactId>
-  <version>1.0.4-SNAPSHOT</version>
+  <version>1.0.5-SNAPSHOT</version>
 
   <name>dcaegen2-collectors.datafile</name>
   <description>datafile collector</description>
index d489444..65b11d1 100644 (file)
@@ -1,6 +1,6 @@
 major=1\r
 minor=0\r
-patch=4\r
+patch=5\r
 base_version=${major}.${minor}.${patch}\r
 release_version=${base_version}\r
 snapshot_version=${base_version}-SNAPSHOT\r