import java.util.Optional;
import java.util.stream.StreamSupport;
-import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapEmptyResponseException;
-import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapNotFoundException;
import org.onap.dcaegen2.collectors.datafile.model.FileData;
import org.onap.dcaegen2.collectors.datafile.model.FileMetaData;
import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData;
* @return reactive Mono with an array of FileData
*/
public Flux<FileData> getJsonObject(Mono<String> rawMessage) {
- return rawMessage.flatMap(this::getJsonParserMessage).flatMapMany(this::createJsonConsumerModel);
+ return rawMessage.flatMapMany(this::getJsonParserMessage).flatMap(this::createJsonConsumerModel);
}
private Mono<JsonElement> getJsonParserMessage(String message) {
logger.trace("original message from message router: {}", message);
- return StringUtils.isEmpty(message) ? Mono.error(new DmaapEmptyResponseException())
- : Mono.fromSupplier(() -> new JsonParser().parse(message));
+ return StringUtils.isEmpty(message) ? Mono.empty() : Mono.fromSupplier(() -> new JsonParser().parse(message));
}
private Flux<FileData> createJsonConsumerModel(JsonElement jsonElement) {
- return jsonElement.isJsonObject() ? create(Mono.fromSupplier(jsonElement::getAsJsonObject))
+ return jsonElement.isJsonObject() ? create(Flux.defer(() -> Flux.just(jsonElement.getAsJsonObject())))
: getFileDataFromJsonArray(jsonElement);
}
private Flux<FileData> getFileDataFromJsonArray(JsonElement jsonElement) {
- return create(Mono.fromCallable(() -> StreamSupport.stream(jsonElement.getAsJsonArray().spliterator(), false)
- .findFirst().flatMap(this::getJsonObjectFromAnArray).orElseThrow(DmaapEmptyResponseException::new)));
+ return create(
+ Flux.defer(() -> Flux.fromStream(StreamSupport.stream(jsonElement.getAsJsonArray().spliterator(), false)
+ .map(jsonElementFromArray -> getJsonObjectFromAnArray(jsonElementFromArray)
+ .orElseGet(JsonObject::new)))));
}
public Optional<JsonObject> getJsonObjectFromAnArray(JsonElement element) {
- logger.trace("starting to getJsonObjectFromAnArray!");
-
- return Optional.of(new JsonParser().parse(element.getAsString()).getAsJsonObject());
+ JsonParser jsonParser = new JsonParser();
+ return element.isJsonPrimitive() ? Optional.of(jsonParser.parse(element.getAsString()).getAsJsonObject())
+ : element.isJsonObject() ? Optional.of((JsonObject) element)
+ : Optional.of(jsonParser.parse(element.toString()).getAsJsonObject());
}
- private Flux<FileData> create(Mono<JsonObject> jsonObject) {
- return jsonObject.flatMapMany(monoJsonP -> !containsNotificationFields(monoJsonP)
- ? Flux.error(new DmaapNotFoundException("Incorrect JsonObject - missing header. " + jsonObject))
- : transform(monoJsonP));
+ private Flux<FileData> create(Flux<JsonObject> jsonObject) {
+ return jsonObject
+ .flatMap(monoJsonP -> !containsNotificationFields(monoJsonP)
+ ? logErrorAndReturnEmptyFlux("Incorrect JsonObject - missing header. " + jsonObject)
+ : transform(monoJsonP));
}
private Flux<FileData> transform(JsonObject message) {
return getAllFileDataFromJson(fileMetaData.get(), arrayOfNamedHashMap);
}
- return Flux.error(new DmaapNotFoundException(
- "Unable to collect file from xNF. Missing arrayOfNamedHashMap in message. " + message));
+ logger.error("Unable to collect file from xNF. Missing arrayOfNamedHashMap in message. {}", message);
+ return Flux.empty();
}
- return Flux.error(new DmaapNotFoundException(
- "Unable to collect file from xNF. FileReady event has incorrect JsonObject"));
+ logger.error("Unable to collect file from xNF. FileReady event has incorrect JsonObject. {}", message);
+ return Flux.empty();
}
private Optional<FileMetaData> getFileMetaData(JsonObject message) {
private Flux<FileData> getAllFileDataFromJson(FileMetaData fileMetaData, JsonArray arrayOfAdditionalFields) {
List<FileData> res = new ArrayList<>();
for (int i = 0; i < arrayOfAdditionalFields.size(); i++) {
- if (arrayOfAdditionalFields.get(i) != null) {
- JsonObject fileInfo = (JsonObject) arrayOfAdditionalFields.get(i);
- Optional<FileData> fileData = getFileDataFromJson(fileMetaData, fileInfo);
+ JsonObject fileInfo = (JsonObject) arrayOfAdditionalFields.get(i);
+ Optional<FileData> fileData = getFileDataFromJson(fileMetaData, fileInfo);
- if (fileData.isPresent()) {
- res.add(fileData.get());
- }
+ if (fileData.isPresent()) {
+ res.add(fileData.get());
}
}
return Flux.fromIterable(res);
private boolean containsNotificationFields(JsonObject jsonObject) {
return jsonObject.has(EVENT) && jsonObject.getAsJsonObject(EVENT).has(NOTIFICATION_FIELDS);
}
+
+ private Flux<FileData> logErrorAndReturnEmptyFlux(String errorMessage) {
+ logger.error(errorMessage);
+ return Flux.empty();
+ }
}
private static final String NOTIFICATION_FIELDS_VERSION = "1.0";
@Test
- void whenPassingCorrectJson_validationNotThrowingAnException() throws DmaapNotFoundException {
+ void whenPassingCorrectJson_oneFileData() throws DmaapNotFoundException {
// @formatter:off
AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder()
.name(PM_FILE_NAME)
}
@Test
- void whenPassingCorrectJsonWithFaultyEventName_validationThrowingAnException() {
+ void whenPassingCorrectJsonWithTwoEvents_twoFileData() throws DmaapNotFoundException {
+ // @formatter:off
+ AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder()
+ .name(PM_FILE_NAME)
+ .location(LOCATION)
+ .compression(GZIP_COMPRESSION)
+ .fileFormatType(FILE_FORMAT_TYPE)
+ .fileFormatVersion(FILE_FORMAT_VERSION)
+ .build();
+ JsonMessage message = new JsonMessage.JsonMessageBuilder()
+ .eventName(NR_RADIO_ERICSSON_EVENT_NAME)
+ .changeIdentifier(CHANGE_IDENTIFIER)
+ .changeType(CHANGE_TYPE)
+ .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
+ .addAdditionalField(additionalField)
+ .build();
+
+ FileMetaData fileMetaData = ImmutableFileMetaData.builder()
+ .productName(PRODUCT_NAME)
+ .vendorName(VENDOR_NAME)
+ .lastEpochMicrosec(LAST_EPOCH_MICROSEC)
+ .sourceName(SOURCE_NAME)
+ .startEpochMicrosec(START_EPOCH_MICROSEC)
+ .timeZoneOffset(TIME_ZONE_OFFSET)
+ .changeIdentifier(CHANGE_IDENTIFIER)
+ .changeType(CHANGE_TYPE)
+ .build();
+ FileData expectedFileData = ImmutableFileData.builder()
+ .fileMetaData(fileMetaData)
+ .name(PM_FILE_NAME)
+ .location(LOCATION)
+ .compression(GZIP_COMPRESSION)
+ .fileFormatType(FILE_FORMAT_TYPE)
+ .fileFormatVersion(FILE_FORMAT_VERSION)
+ .build();
+ // @formatter:on
+ String parsedString = message.getParsed();
+ String messageString = "[" + parsedString + "," + parsedString + "]";
+ DmaapConsumerJsonParser dmaapConsumerJsonParser = new DmaapConsumerJsonParser();
+
+ StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription()
+ .expectNext(expectedFileData).expectNext(expectedFileData).verifyComplete();
+ }
+
+ @Test
+ void whenPassingCorrectJsonWithTwoEventsFirstNoHeader_oneFileDatan()
+ throws DmaapNotFoundException {
+ // @formatter:off
+ AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder()
+ .name(PM_FILE_NAME)
+ .location(LOCATION)
+ .compression(GZIP_COMPRESSION)
+ .fileFormatType(FILE_FORMAT_TYPE)
+ .fileFormatVersion(FILE_FORMAT_VERSION)
+ .build();
+ JsonMessage message = new JsonMessage.JsonMessageBuilder()
+ .eventName(NR_RADIO_ERICSSON_EVENT_NAME)
+ .changeIdentifier(CHANGE_IDENTIFIER)
+ .changeType(CHANGE_TYPE)
+ .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
+ .addAdditionalField(additionalField)
+ .build();
+
+ FileMetaData fileMetaData = ImmutableFileMetaData.builder()
+ .productName(PRODUCT_NAME)
+ .vendorName(VENDOR_NAME)
+ .lastEpochMicrosec(LAST_EPOCH_MICROSEC)
+ .sourceName(SOURCE_NAME)
+ .startEpochMicrosec(START_EPOCH_MICROSEC)
+ .timeZoneOffset(TIME_ZONE_OFFSET)
+ .changeIdentifier(CHANGE_IDENTIFIER)
+ .changeType(CHANGE_TYPE)
+ .build();
+ FileData expectedFileData = ImmutableFileData.builder()
+ .fileMetaData(fileMetaData)
+ .name(PM_FILE_NAME)
+ .location(LOCATION)
+ .compression(GZIP_COMPRESSION)
+ .fileFormatType(FILE_FORMAT_TYPE)
+ .fileFormatVersion(FILE_FORMAT_VERSION)
+ .build();
+ // @formatter:on
+ String parsedString = message.getParsed();
+ String messageString = "[{\"event\":{}}," + parsedString + "]";
+ DmaapConsumerJsonParser dmaapConsumerJsonParser = new DmaapConsumerJsonParser();
+
+ StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription()
+ .expectNext(expectedFileData).verifyComplete();
+ }
+
+ @Test
+ void whenPassingCorrectJsonWithFaultyEventName_noFileData() {
// @formatter:off
AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder()
.location(LOCATION)
.getJsonObjectFromAnArray(jsonElement);
StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription()
- .expectError(DmaapNotFoundException.class).verify();
+ .expectComplete().verify();
}
@Test
.expectNextCount(0).verifyComplete();
}
+ @Test
+ void whenPassingCorrectJsonWithoutAdditionalFields_noFileData() {
+ // @formatter:off
+ JsonMessage message = new JsonMessage.JsonMessageBuilder()
+ .eventName(NR_RADIO_ERICSSON_EVENT_NAME)
+ .changeIdentifier(CHANGE_IDENTIFIER)
+ .changeType(CHANGE_TYPE)
+ .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
+ .build();
+ // @formatter:on
+ String messageString = message.toString();
+ String parsedString = message.getParsed();
+ DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser());
+ JsonElement jsonElement = new JsonParser().parse(parsedString);
+ Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(dmaapConsumerJsonParser)
+ .getJsonObjectFromAnArray(jsonElement);
+
+ StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription()
+ .expectNextCount(0).verifyComplete();
+ }
+
@Test
void whenPassingCorrectJsonWithoutLocation_noFileData() {
// @formatter:off
}
@Test
- void whenPassingJsonWithoutMandatoryHeaderInformation_validationThrowingAnException() {
+ void whenPassingJsonWithoutMandatoryHeaderInformation_noFileData() {
// @formatter:off
JsonMessage message = new JsonMessage.JsonMessageBuilder()
.eventName(NR_RADIO_ERICSSON_EVENT_NAME)
.getJsonObjectFromAnArray(jsonElement);
StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(incorrectMessageString)))
- .expectSubscription().expectError(DmaapNotFoundException.class).verify();
+ .expectSubscription().expectComplete().verify();
}
@Test
- void whenPassingJsonWithNullJsonElement_validationThrowingAnException() {
- JsonMessage message = new JsonMessage.JsonMessageBuilder().build();
-
- String incorrectMessageString = message.toString();
- String parsedString = message.getParsed();
+ void whenPassingJsonWithNullJsonElement_noFileData() {
DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser());
- JsonElement jsonElement = new JsonParser().parse(parsedString);
+ JsonElement jsonElement = new JsonParser().parse("{}");
Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(dmaapConsumerJsonParser)
.getJsonObjectFromAnArray(jsonElement);
- StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(incorrectMessageString)))
- .expectSubscription().expectError(DmaapNotFoundException.class).verify();
+ StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just("[{}]"))).expectSubscription()
+ .expectComplete().verify();
}
@Test
- void whenPassingCorrectJsonWithIncorrectChangeType_validationThrowingAnException() {
+ void whenPassingCorrectJsonWithIncorrectChangeType_noFileData() {
// @formatter:off
AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder()
.name(PM_FILE_NAME)
.getJsonObjectFromAnArray(jsonElement);
StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription()
- .expectNextCount(0).expectError(DmaapNotFoundException.class).verify();
+ .expectNextCount(0).expectComplete().verify();
}
@Test
- void whenPassingCorrectJsonWithIncorrectChangeIdentifier_validationThrowingAnException() {
+ void whenPassingCorrectJsonWithIncorrectChangeIdentifier_noFileData() {
// @formatter:off
AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder()
.name(PM_FILE_NAME)
.getJsonObjectFromAnArray(jsonElement);
StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription()
- .expectNextCount(0).expectError(DmaapNotFoundException.class).verify();
+ .expectComplete().verify();
}
}