2 * ============LICENSE_START======================================================================
3 * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
4 * ===============================================================================================
5 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
6 * in compliance with the License. You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software distributed under the License
11 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
12 * or implied. See the License for the specific language governing permissions and limitations under
14 * ============LICENSE_END========================================================================
17 package org.onap.dcaegen2.collectors.datafile.service;
19 import com.google.gson.JsonArray;
20 import com.google.gson.JsonElement;
21 import com.google.gson.JsonObject;
22 import com.google.gson.JsonParser;
24 import java.util.ArrayList;
25 import java.util.List;
26 import java.util.Optional;
27 import java.util.stream.StreamSupport;
29 import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapEmptyResponseException;
30 import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapNotFoundException;
31 import org.onap.dcaegen2.collectors.datafile.model.FileData;
32 import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData;
33 import org.slf4j.Logger;
34 import org.slf4j.LoggerFactory;
35 import org.springframework.util.StringUtils;
37 import reactor.core.publisher.Flux;
38 import reactor.core.publisher.Mono;
41 * Parses the fileReady event and creates an array of FileData containing the information.
43 * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 5/8/18
44 * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
46 public class DmaapConsumerJsonParser {
47 private static final Logger logger = LoggerFactory.getLogger(DmaapConsumerJsonParser.class);
49 private static final String EVENT = "event";
50 private static final String NOTIFICATION_FIELDS = "notificationFields";
51 private static final String CHANGE_IDENTIFIER = "changeIdentifier";
52 private static final String CHANGE_TYPE = "changeType";
53 private static final String NOTIFICATION_FIELDS_VERSION = "notificationFieldsVersion";
55 private static final String ARRAY_OF_NAMED_HASH_MAP = "arrayOfNamedHashMap";
56 private static final String NAME = "name";
57 private static final String HASH_MAP = "hashMap";
58 private static final String LOCATION = "location";
59 private static final String COMPRESSION = "compression";
60 private static final String FILE_FORMAT_TYPE = "fileFormatType";
61 private static final String FILE_FORMAT_VERSION = "fileFormatVersion";
64 * Extract info from string and create @see {@link FileData}.
66 * @param rawMessage - results from DMaaP
67 * @return reactive Mono with an array of FileData
69 public Flux<FileData> getJsonObject(Mono<String> rawMessage) {
70 return rawMessage.flatMap(this::getJsonParserMessage).flatMapMany(this::createJsonConsumerModel);
73 private Mono<JsonElement> getJsonParserMessage(String message) {
74 logger.trace("original message from message router: {}", message);
75 return StringUtils.isEmpty(message) ? Mono.error(new DmaapEmptyResponseException())
76 : Mono.fromSupplier(() -> new JsonParser().parse(message));
79 private Flux<FileData> createJsonConsumerModel(JsonElement jsonElement) {
80 return jsonElement.isJsonObject() ? create(Mono.fromSupplier(jsonElement::getAsJsonObject))
81 : getFileDataFromJsonArray(jsonElement);
84 private Flux<FileData> getFileDataFromJsonArray(JsonElement jsonElement) {
85 return create(Mono.fromCallable(() -> StreamSupport.stream(jsonElement.getAsJsonArray().spliterator(), false)
86 .findFirst().flatMap(this::getJsonObjectFromAnArray).orElseThrow(DmaapEmptyResponseException::new)));
89 public Optional<JsonObject> getJsonObjectFromAnArray(JsonElement element) {
90 logger.trace("starting to getJsonObjectFromAnArray!");
92 return Optional.of(new JsonParser().parse(element.getAsString()).getAsJsonObject());
95 private Flux<FileData> create(Mono<JsonObject> jsonObject) {
96 return jsonObject.flatMapMany(monoJsonP -> !containsHeader(monoJsonP)
97 ? Flux.error(new DmaapNotFoundException("Incorrect JsonObject - missing header"))
98 : transform(monoJsonP));
101 private Flux<FileData> transform(JsonObject jsonObject) {
102 if (containsHeader(jsonObject, EVENT, NOTIFICATION_FIELDS)) {
103 JsonObject notificationFields = jsonObject.getAsJsonObject(EVENT).getAsJsonObject(NOTIFICATION_FIELDS);
104 String changeIdentifier = getValueFromJson(notificationFields, CHANGE_IDENTIFIER);
105 String changeType = getValueFromJson(notificationFields, CHANGE_TYPE);
106 String notificationFieldsVersion = getValueFromJson(notificationFields, NOTIFICATION_FIELDS_VERSION);
107 JsonArray arrayOfNamedHashMap = notificationFields.getAsJsonArray(ARRAY_OF_NAMED_HASH_MAP);
108 if (isNotificationFieldsHeaderNotEmpty(changeIdentifier, changeType, notificationFieldsVersion)
109 && arrayOfNamedHashMap != null) {
110 return getAllFileDataFromJson(changeIdentifier, changeType, arrayOfNamedHashMap);
113 if (!isNotificationFieldsHeaderNotEmpty(changeIdentifier, changeType, notificationFieldsVersion)) {
115 new DmaapNotFoundException("FileReady event header is missing information. " + jsonObject));
116 } else if (arrayOfNamedHashMap != null) {
118 new DmaapNotFoundException("FileReady event arrayOfNamedHashMap is missing. " + jsonObject));
121 new DmaapNotFoundException("FileReady event does not contain correct information. " + jsonObject));
124 new DmaapNotFoundException("FileReady event has incorrect JsonObject - missing header. " + jsonObject));
127 private Flux<FileData> getAllFileDataFromJson(String changeIdentifier, String changeType,
128 JsonArray arrayOfAdditionalFields) {
129 List<FileData> res = new ArrayList<>();
130 for (int i = 0; i < arrayOfAdditionalFields.size(); i++) {
131 if (arrayOfAdditionalFields.get(i) != null) {
132 JsonObject fileInfo = (JsonObject) arrayOfAdditionalFields.get(i);
133 FileData fileData = getFileDataFromJson(fileInfo, changeIdentifier, changeType);
135 if (fileData != null) {
138 logger.error("Unable to collect file from xNF. File information wrong. Data: {}", fileInfo);
142 return Flux.fromIterable(res);
145 private FileData getFileDataFromJson(JsonObject fileInfo, String changeIdentifier, String changeType) {
146 logger.trace("starting to getFileDataFromJson!");
148 FileData fileData = null;
150 String name = getValueFromJson(fileInfo, NAME);
151 JsonObject data = fileInfo.getAsJsonObject(HASH_MAP);
152 String fileFormatType = getValueFromJson(data, FILE_FORMAT_TYPE);
153 String fileFormatVersion = getValueFromJson(data, FILE_FORMAT_VERSION);
154 String location = getValueFromJson(data, LOCATION);
155 String compression = getValueFromJson(data, COMPRESSION);
157 if (isFileFormatFieldsNotEmpty(fileFormatVersion, fileFormatType)
158 && isNameAndLocationAndCompressionNotEmpty(name, location, compression)) {
159 fileData = ImmutableFileData.builder().name(name).changeIdentifier(changeIdentifier).changeType(changeType)
160 .location(location).compression(compression).fileFormatType(fileFormatType)
161 .fileFormatVersion(fileFormatVersion).build();
166 private String getValueFromJson(JsonObject jsonObject, String jsonKey) {
167 return jsonObject.has(jsonKey) ? jsonObject.get(jsonKey).getAsString() : "";
170 private boolean isNotificationFieldsHeaderNotEmpty(String changeIdentifier, String changeType,
171 String notificationFieldsVersion) {
172 return isStringIsNotNullAndNotEmpty(changeIdentifier) && isStringIsNotNullAndNotEmpty(changeType)
173 && isStringIsNotNullAndNotEmpty(notificationFieldsVersion);
176 private boolean isFileFormatFieldsNotEmpty(String fileFormatVersion, String fileFormatType) {
177 return isStringIsNotNullAndNotEmpty(fileFormatVersion) && isStringIsNotNullAndNotEmpty(fileFormatType);
180 private boolean isNameAndLocationAndCompressionNotEmpty(String name, String location, String compression) {
181 return isStringIsNotNullAndNotEmpty(name) && isStringIsNotNullAndNotEmpty(location) &&
182 isStringIsNotNullAndNotEmpty(compression);
185 private boolean containsHeader(JsonObject jsonObject) {
186 return jsonObject.has(EVENT) && jsonObject.getAsJsonObject(EVENT).has(NOTIFICATION_FIELDS);
189 private boolean containsHeader(JsonObject jsonObject, String topHeader, String header) {
190 return jsonObject.has(topHeader) && jsonObject.getAsJsonObject(topHeader).has(header);
193 private boolean isStringIsNotNullAndNotEmpty(String string) {
194 return string != null && !string.isEmpty();