private static final String FILE_FORMAT_TYPE = "fileFormatType";
private static final String FILE_FORMAT_VERSION = "fileFormatVersion";
+ private static final String FILE_READY_CHANGE_TYPE = "FileReady";
+ private static final String FILE_READY_CHANGE_IDENTIFIER = "PM_MEAS_FILES";
+
/**
* Extract info from string and create @see {@link FileData}.
*
private Mono<JsonElement> getJsonParserMessage(String message) {
logger.trace("original message from message router: {}", message);
return StringUtils.isEmpty(message) ? Mono.error(new DmaapEmptyResponseException())
- : Mono.fromSupplier(() -> new JsonParser().parse(message));
+ : Mono.fromSupplier(() -> new JsonParser().parse(message));
}
private Flux<FileData> createJsonConsumerModel(JsonElement jsonElement) {
return jsonElement.isJsonObject() ? create(Mono.fromSupplier(jsonElement::getAsJsonObject))
- : getFileDataFromJsonArray(jsonElement);
+ : getFileDataFromJsonArray(jsonElement);
}
private Flux<FileData> getFileDataFromJsonArray(JsonElement jsonElement) {
return create(Mono.fromCallable(() -> StreamSupport.stream(jsonElement.getAsJsonArray().spliterator(), false)
- .findFirst().flatMap(this::getJsonObjectFromAnArray).orElseThrow(DmaapEmptyResponseException::new)));
+ .findFirst().flatMap(this::getJsonObjectFromAnArray).orElseThrow(DmaapEmptyResponseException::new)));
}
public Optional<JsonObject> getJsonObjectFromAnArray(JsonElement element) {
private Flux<FileData> create(Mono<JsonObject> jsonObject) {
return jsonObject.flatMapMany(monoJsonP -> !containsHeader(monoJsonP)
- ? Flux.error(new DmaapNotFoundException("Incorrect JsonObject - missing header"))
- : transform(monoJsonP));
+ ? Flux.error(new DmaapNotFoundException("Incorrect JsonObject - missing header"))
+ : transform(monoJsonP));
}
private Flux<FileData> transform(JsonObject jsonObject) {
String notificationFieldsVersion = getValueFromJson(notificationFields, NOTIFICATION_FIELDS_VERSION);
JsonArray arrayOfNamedHashMap = notificationFields.getAsJsonArray(ARRAY_OF_NAMED_HASH_MAP);
if (isNotificationFieldsHeaderNotEmpty(changeIdentifier, changeType, notificationFieldsVersion)
- && arrayOfNamedHashMap != null) {
+ && arrayOfNamedHashMap != null && isChangeIdentifierCorrect(changeIdentifier)
+ && isChangeTypeCorrect(changeType)) {
return getAllFileDataFromJson(changeIdentifier, changeType, arrayOfNamedHashMap);
}
- if (!isNotificationFieldsHeaderNotEmpty(changeIdentifier, changeType, notificationFieldsVersion)) {
- return Flux.error(
- new DmaapNotFoundException("FileReady event header is missing information. " + jsonObject));
- } else if (arrayOfNamedHashMap != null) {
- return Flux.error(
- new DmaapNotFoundException("FileReady event arrayOfNamedHashMap is missing. " + jsonObject));
- }
- return Flux.error(
- new DmaapNotFoundException("FileReady event does not contain correct information. " + jsonObject));
+ return handleJsonError(changeIdentifier, changeType, notificationFieldsVersion, arrayOfNamedHashMap,
+ jsonObject);
}
return Flux.error(
- new DmaapNotFoundException("FileReady event has incorrect JsonObject - missing header. " + jsonObject));
+ new DmaapNotFoundException("FileReady event has incorrect JsonObject - missing header. " + jsonObject));
+ }
+
+ private boolean isChangeTypeCorrect(String changeType) {
+ return FILE_READY_CHANGE_TYPE.equals(changeType);
+ }
+
+ private boolean isChangeIdentifierCorrect(String changeIdentifier) {
+ return FILE_READY_CHANGE_IDENTIFIER.equals(changeIdentifier);
}
private Flux<FileData> getAllFileDataFromJson(String changeIdentifier, String changeType,
- JsonArray arrayOfAdditionalFields) {
+ JsonArray arrayOfAdditionalFields) {
List<FileData> res = new ArrayList<>();
for (int i = 0; i < arrayOfAdditionalFields.size(); i++) {
if (arrayOfAdditionalFields.get(i) != null) {
String compression = getValueFromJson(data, COMPRESSION);
if (isFileFormatFieldsNotEmpty(fileFormatVersion, fileFormatType)
- && isNameAndLocationAndCompressionNotEmpty(name, location, compression)) {
+ && isNameAndLocationAndCompressionNotEmpty(name, location, compression)) {
fileData = ImmutableFileData.builder().name(name).changeIdentifier(changeIdentifier).changeType(changeType)
- .location(location).compression(compression).fileFormatType(fileFormatType)
- .fileFormatVersion(fileFormatVersion).build();
+ .location(location).compression(compression).fileFormatType(fileFormatType)
+ .fileFormatVersion(fileFormatVersion).build();
}
return fileData;
}
}
private boolean isNotificationFieldsHeaderNotEmpty(String changeIdentifier, String changeType,
- String notificationFieldsVersion) {
+ String notificationFieldsVersion) {
return isStringIsNotNullAndNotEmpty(changeIdentifier) && isStringIsNotNullAndNotEmpty(changeType)
- && isStringIsNotNullAndNotEmpty(notificationFieldsVersion);
+ && isStringIsNotNullAndNotEmpty(notificationFieldsVersion);
}
private boolean isFileFormatFieldsNotEmpty(String fileFormatVersion, String fileFormatType) {
}
private boolean isNameAndLocationAndCompressionNotEmpty(String name, String location, String compression) {
- return isStringIsNotNullAndNotEmpty(name) && isStringIsNotNullAndNotEmpty(location) &&
- isStringIsNotNullAndNotEmpty(compression);
+ return isStringIsNotNullAndNotEmpty(name) && isStringIsNotNullAndNotEmpty(location)
+ && isStringIsNotNullAndNotEmpty(compression);
}
private boolean containsHeader(JsonObject jsonObject) {
private boolean isStringIsNotNullAndNotEmpty(String string) {
return string != null && !string.isEmpty();
}
+
+ private Flux<FileData> handleJsonError(String changeIdentifier, String changeType, String notificationFieldsVersion,
+ JsonArray arrayOfNamedHashMap, JsonObject jsonObject) {
+ String errorMessage = "FileReady event information is incomplete or incorrect!\n";
+ if (!isNotificationFieldsHeaderNotEmpty(changeIdentifier, changeType, notificationFieldsVersion)) {
+ errorMessage += "header is missing.\n";
+ }
+ if (arrayOfNamedHashMap == null) {
+ errorMessage += "arrayOfNamedHashMap is missing.\n";
+ }
+ if (!isChangeIdentifierCorrect(changeIdentifier)) {
+ errorMessage += "changeIdentifier is incorrect.\n";
+ }
+ if (!isChangeTypeCorrect(changeType)) {
+ errorMessage += "changeType is incorrect.\n";
+ }
+ return Flux.error(new DmaapNotFoundException(errorMessage + jsonObject));
+ }
}
private static final String FILE_FORMAT_TYPE = "org.3GPP.32.435#measCollec";
private static final String FILE_FORMAT_VERSION = "V10";
private static final String CHANGE_IDENTIFIER = "PM_MEAS_FILES";
+ private static final String INCORRECT_CHANGE_IDENTIFIER = "INCORRECT_PM_MEAS_FILES";
private static final String CHANGE_TYPE = "FileReady";
+ private static final String INCORRECT_CHANGE_TYPE = "IncorrectFileReady";
private static final String NOTIFICATION_FIELDS_VERSION = "1.0";
@Test
}
@Test
- void whenPassingCorrectJsonWihoutName_noFileData() {
+ void whenPassingCorrectJsonWithoutName_noFileData() {
AdditionalField additionalField =
new JsonMessage.AdditionalFieldBuilder().location(LOCATION).compression(GZIP_COMPRESSION)
.fileFormatType(FILE_FORMAT_TYPE).fileFormatVersion(FILE_FORMAT_VERSION).build();
}
@Test
- void whenPassingCorrectJsonWihoutLocation_noFileData() {
+ void whenPassingCorrectJsonWithoutLocation_noFileData() {
AdditionalField additionalField =
new JsonMessage.AdditionalFieldBuilder().name(PM_FILE_NAME).compression(GZIP_COMPRESSION)
.fileFormatType(FILE_FORMAT_TYPE).fileFormatVersion(FILE_FORMAT_VERSION).build();
}
@Test
- void whenPassingCorrectJsonWihoutCompression_noFileData() {
+ void whenPassingCorrectJsonWithoutCompression_noFileData() {
AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder().name(PM_FILE_NAME).location(LOCATION)
.fileFormatType(FILE_FORMAT_TYPE).fileFormatVersion(FILE_FORMAT_VERSION).build();
JsonMessage message = new JsonMessage.JsonMessageBuilder().changeIdentifier(CHANGE_IDENTIFIER)
}
@Test
- void whenPassingCorrectJsonWihoutFileFormatType_noFileData() {
+ void whenPassingCorrectJsonWithoutFileFormatType_noFileData() {
AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder().name(PM_FILE_NAME).location(LOCATION)
.compression(GZIP_COMPRESSION).fileFormatVersion(FILE_FORMAT_VERSION).build();
JsonMessage message = new JsonMessage.JsonMessageBuilder().changeIdentifier(CHANGE_IDENTIFIER)
}
@Test
- void whenPassingOneCorrectJsonWihoutFileFormatVersionAndOneCorrect_oneFileData() {
+ void whenPassingOneCorrectJsonWithoutFileFormatVersionAndOneCorrect_oneFileData() {
AdditionalField additionalFaultyField = new JsonMessage.AdditionalFieldBuilder().name(PM_FILE_NAME)
.location(LOCATION).compression(GZIP_COMPRESSION).fileFormatType(FILE_FORMAT_TYPE).build();
AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder().name(PM_FILE_NAME).location(LOCATION)
StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(incorrectMessageString)))
.expectSubscription().expectError(DmaapNotFoundException.class).verify();
}
+
+ @Test
+ void whenPassingCorrectJsonWithIncorrectChangeType_validationThrowingAnException() {
+ AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder().name(PM_FILE_NAME).location(LOCATION)
+ .compression(GZIP_COMPRESSION).fileFormatVersion(FILE_FORMAT_VERSION).build();
+ JsonMessage message = new JsonMessage.JsonMessageBuilder().changeIdentifier(CHANGE_IDENTIFIER)
+ .changeType(INCORRECT_CHANGE_TYPE).notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
+ .addAdditionalField(additionalField).build();
+
+ String messageString = message.toString();
+ String parsedString = message.getParsed();
+ DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser());
+ JsonElement jsonElement = new JsonParser().parse(parsedString);
+ Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(dmaapConsumerJsonParser)
+ .getJsonObjectFromAnArray(jsonElement);
+
+ StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription()
+ .expectNextCount(0).expectError(DmaapNotFoundException.class).verify();
+ }
+
+ @Test
+ void whenPassingCorrectJsonWithIncorrectChangeIdentifier_validationThrowingAnException() {
+ AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder().name(PM_FILE_NAME).location(LOCATION)
+ .compression(GZIP_COMPRESSION).fileFormatVersion(FILE_FORMAT_VERSION).build();
+ JsonMessage message = new JsonMessage.JsonMessageBuilder().changeIdentifier(INCORRECT_CHANGE_IDENTIFIER)
+ .changeType(CHANGE_TYPE).notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
+ .addAdditionalField(additionalField).build();
+
+ String messageString = message.toString();
+ String parsedString = message.getParsed();
+ DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser());
+ JsonElement jsonElement = new JsonParser().parse(parsedString);
+ Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(dmaapConsumerJsonParser)
+ .getJsonObjectFromAnArray(jsonElement);
+
+ StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription()
+ .expectNextCount(0).expectError(DmaapNotFoundException.class).verify();
+ }
}