2 * ============LICENSE_START========================================================================
3 * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
4 * ==================================================================================================
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 * ============LICENSE_END=========================================================
19 package org.onap.dcaegen2.collectors.datafile.service;
21 import com.google.gson.JsonArray;
22 import com.google.gson.JsonElement;
23 import com.google.gson.JsonObject;
24 import com.google.gson.JsonParser;
27 import java.util.ArrayList;
28 import java.util.List;
29 import java.util.Optional;
30 import java.util.stream.StreamSupport;
32 import org.onap.dcaegen2.collectors.datafile.ftp.Scheme;
33 import org.onap.dcaegen2.collectors.datafile.model.FileData;
34 import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
35 import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData;
36 import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileReadyMessage;
37 import org.onap.dcaegen2.collectors.datafile.model.ImmutableMessageMetaData;
38 import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
39 import org.slf4j.Logger;
40 import org.slf4j.LoggerFactory;
41 import org.springframework.util.StringUtils;
43 import reactor.core.publisher.Flux;
44 import reactor.core.publisher.Mono;
/**
 * Parses the fileReady event and creates a Flux of FileReadyMessage containing the information.
 *
 * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
 */
51 public class JsonMessageParser {
52 private static final Logger logger = LoggerFactory.getLogger(JsonMessageParser.class);
54 public static final String ERROR_MSG_VES_EVENT_PARSING = "VES event parsing. ";
56 private static final String COMMON_EVENT_HEADER = "commonEventHeader";
57 private static final String EVENT_NAME = "eventName";
58 private static final String LAST_EPOCH_MICROSEC = "lastEpochMicrosec";
59 private static final String SOURCE_NAME = "sourceName";
60 private static final String START_EPOCH_MICROSEC = "startEpochMicrosec";
61 private static final String TIME_ZONE_OFFSET = "timeZoneOffset";
63 private static final String EVENT = "event";
64 private static final String NOTIFICATION_FIELDS = "notificationFields";
65 private static final String CHANGE_IDENTIFIER = "changeIdentifier";
66 private static final String CHANGE_TYPE = "changeType";
67 private static final String NOTIFICATION_FIELDS_VERSION = "notificationFieldsVersion";
69 private static final String ARRAY_OF_NAMED_HASH_MAP = "arrayOfNamedHashMap";
70 private static final String NAME = "name";
71 private static final String HASH_MAP = "hashMap";
72 private static final String LOCATION = "location";
73 private static final String COMPRESSION = "compression";
74 private static final String FILE_FORMAT_TYPE = "fileFormatType";
75 private static final String FILE_FORMAT_VERSION = "fileFormatVersion";
77 private static final String FILE_READY_CHANGE_TYPE = "FileReady";
80 * The data types available in the event name.
82 private enum EventNameDataType {
83 PRODUCT_NAME(1), VENDOR_NAME(2);
87 EventNameDataType(int index) {
93 * Parses the Json message and returns a stream of messages.
95 * @param rawMessage the Json message to parse.
96 * @return a <code>Flux</code> containing messages.
98 public Flux<FileReadyMessage> getMessagesFromJson(Mono<String> rawMessage) {
99 return rawMessage.flatMapMany(JsonMessageParser::getJsonParserMessage).flatMap(this::createMessageData);
102 Optional<JsonObject> getJsonObjectFromAnArray(JsonElement element) {
103 JsonParser jsonParser = new JsonParser();
104 if (element.isJsonPrimitive()) {
105 return Optional.of(jsonParser.parse(element.getAsString()).getAsJsonObject());
106 } else if (element.isJsonObject()) {
107 return Optional.of((JsonObject) element);
109 return Optional.of(jsonParser.parse(element.toString()).getAsJsonObject());
113 private Flux<FileReadyMessage> getMessagesFromJsonArray(JsonElement jsonElement) {
114 return createMessages(Flux.fromStream(StreamSupport.stream(jsonElement.getAsJsonArray().spliterator(), false)
115 .map(jsonElementFromArray -> getJsonObjectFromAnArray(jsonElementFromArray).orElseGet(JsonObject::new))));
119 * Extract info from string and create a Flux of {@link FileReadyMessage}.
121 * @param rawMessage - results from DMaaP
122 * @return reactive Flux of FileReadyMessages
124 private Flux<FileReadyMessage> createMessageData(JsonElement jsonElement) {
125 return jsonElement.isJsonObject() ? createMessages(Flux.just(jsonElement.getAsJsonObject()))
126 : getMessagesFromJsonArray(jsonElement);
129 private static Mono<JsonElement> getJsonParserMessage(String message) {
130 return StringUtils.isEmpty(message) ? Mono.empty() : Mono.fromSupplier(() -> new JsonParser().parse(message));
133 private static Flux<FileReadyMessage> createMessages(Flux<JsonObject> jsonObject) {
134 return jsonObject.flatMap(monoJsonP -> containsNotificationFields(monoJsonP) ? transformMessages(monoJsonP)
135 : logErrorAndReturnEmptyMessageFlux("Incorrect JsonObject - missing header. " + jsonObject));
138 private static Mono<FileReadyMessage> transformMessages(JsonObject message) {
139 Optional<MessageMetaData> optionalMessageMetaData = getMessageMetaData(message);
140 if (optionalMessageMetaData.isPresent()) {
141 MessageMetaData messageMetaData = optionalMessageMetaData.get();
142 JsonObject notificationFields = message.getAsJsonObject(EVENT).getAsJsonObject(NOTIFICATION_FIELDS);
143 JsonArray arrayOfNamedHashMap = notificationFields.getAsJsonArray(ARRAY_OF_NAMED_HASH_MAP);
144 if (arrayOfNamedHashMap != null) {
145 List<FileData> allFileDataFromJson = getAllFileDataFromJson(arrayOfNamedHashMap, messageMetaData);
146 if (!allFileDataFromJson.isEmpty()) {
147 return Mono.just(ImmutableFileReadyMessage.builder() //
148 .files(allFileDataFromJson) //
155 logger.error(ERROR_MSG_VES_EVENT_PARSING + "Missing arrayOfNamedHashMap in message. {}", message);
158 logger.error(ERROR_MSG_VES_EVENT_PARSING + "FileReady event has incorrect JsonObject. {}", message);
162 private static Optional<MessageMetaData> getMessageMetaData(JsonObject message) {
163 List<String> missingValues = new ArrayList<>();
164 JsonObject commonEventHeader = message.getAsJsonObject(EVENT).getAsJsonObject(COMMON_EVENT_HEADER);
165 String eventName = getValueFromJson(commonEventHeader, EVENT_NAME, missingValues);
167 JsonObject notificationFields = message.getAsJsonObject(EVENT).getAsJsonObject(NOTIFICATION_FIELDS);
168 String changeIdentifier = getValueFromJson(notificationFields, CHANGE_IDENTIFIER, missingValues);
169 String changeType = getValueFromJson(notificationFields, CHANGE_TYPE, missingValues);
171 // Just to check that it is in the message. Might be needed in the future if there is a new
173 getValueFromJson(notificationFields, NOTIFICATION_FIELDS_VERSION, missingValues);
175 MessageMetaData messageMetaData = ImmutableMessageMetaData.builder() //
176 .productName(getDataFromEventName(EventNameDataType.PRODUCT_NAME, eventName, missingValues)) //
177 .vendorName(getDataFromEventName(EventNameDataType.VENDOR_NAME, eventName, missingValues)) //
178 .lastEpochMicrosec(getValueFromJson(commonEventHeader, LAST_EPOCH_MICROSEC, missingValues)) //
179 .sourceName(getValueFromJson(commonEventHeader, SOURCE_NAME, missingValues)) //
180 .startEpochMicrosec(getValueFromJson(commonEventHeader, START_EPOCH_MICROSEC, missingValues)) //
181 .timeZoneOffset(getValueFromJson(commonEventHeader, TIME_ZONE_OFFSET, missingValues)) //
182 .changeIdentifier(changeIdentifier) //
183 .changeType(changeType) //
185 if (missingValues.isEmpty() && isChangeTypeCorrect(changeType)) {
186 return Optional.of(messageMetaData);
188 String errorMessage = ERROR_MSG_VES_EVENT_PARSING;
189 if (!missingValues.isEmpty()) {
190 errorMessage += "Missing data: " + missingValues + ".";
192 if (!isChangeTypeCorrect(changeType)) {
193 errorMessage += " Change type is wrong: " + changeType + " Expected: " + FILE_READY_CHANGE_TYPE;
195 errorMessage += " Message: {}";
196 logger.error(errorMessage, message);
197 return Optional.empty();
201 private static boolean isChangeTypeCorrect(String changeType) {
202 return FILE_READY_CHANGE_TYPE.equals(changeType);
205 private static List<FileData> getAllFileDataFromJson(JsonArray arrayOfAdditionalFields,
206 MessageMetaData messageMetaData) {
207 List<FileData> res = new ArrayList<>();
208 for (int i = 0; i < arrayOfAdditionalFields.size(); i++) {
209 JsonObject fileInfo = (JsonObject) arrayOfAdditionalFields.get(i);
210 Optional<FileData> fileData = getFileDataFromJson(fileInfo, messageMetaData);
212 if (fileData.isPresent()) {
213 res.add(fileData.get());
219 private static Optional<FileData> getFileDataFromJson(JsonObject fileInfo, MessageMetaData messageMetaData) {
220 logger.trace("starting to getFileDataFromJson!");
222 List<String> missingValues = new ArrayList<>();
223 JsonObject data = fileInfo.getAsJsonObject(HASH_MAP);
225 String location = getValueFromJson(data, LOCATION, missingValues);
226 if (StringUtils.isEmpty(location)) {
227 logger.error(ERROR_MSG_VES_EVENT_PARSING + "File information wrong. Missing location. Data: {} {}",
228 messageMetaData, fileInfo);
229 return Optional.empty();
233 scheme = Scheme.getSchemeFromString(URI.create(location).getScheme());
234 } catch (Exception e) {
235 logger.error(ERROR_MSG_VES_EVENT_PARSING + "{}. Location: {} Data: {}", e.getMessage(), location,
237 return Optional.empty();
239 FileData fileData = ImmutableFileData.builder() //
240 .name(getValueFromJson(fileInfo, NAME, missingValues)) //
241 .fileFormatType(getValueFromJson(data, FILE_FORMAT_TYPE, missingValues)) //
242 .fileFormatVersion(getValueFromJson(data, FILE_FORMAT_VERSION, missingValues)) //
243 .location(location) //
245 .compression(getValueFromJson(data, COMPRESSION, missingValues)) //
246 .messageMetaData(messageMetaData) //
248 if (missingValues.isEmpty()) {
249 return Optional.of(fileData);
251 logger.error(ERROR_MSG_VES_EVENT_PARSING + "File information wrong. Missing data: {} Data: {}", missingValues,
253 return Optional.empty();
257 * Gets data from the event name. Defined as: {DomainAbbreviation}_{productName}-{vendorName}_{Description},
258 * example: Noti_RnNode-Ericsson_FileReady
260 * @param dataType The type of data to get, {@link DmaapConsumerJsonParser.EventNameDataType}.
261 * @param eventName The event name to get the data from.
262 * @param missingValues List of missing values. The dataType will be added if missing.
263 * @return String of data from event name
265 private static String getDataFromEventName(EventNameDataType dataType, String eventName,
266 List<String> missingValues) {
267 String[] eventArray = eventName.split("_|-");
268 if (eventArray.length >= 4) {
269 return eventArray[dataType.index];
271 missingValues.add(dataType.toString());
273 ERROR_MSG_VES_EVENT_PARSING + "Can not get {} from eventName, eventName is not in correct format: {}",
274 dataType, eventName);
279 private static String getValueFromJson(JsonObject jsonObject, String jsonKey, List<String> missingValues) {
280 if (jsonObject.has(jsonKey)) {
281 return jsonObject.get(jsonKey).getAsString();
283 missingValues.add(jsonKey);
288 private static boolean containsNotificationFields(JsonObject jsonObject) {
289 return jsonObject.has(EVENT) && jsonObject.getAsJsonObject(EVENT).has(NOTIFICATION_FIELDS);
292 private static Flux<FileReadyMessage> logErrorAndReturnEmptyMessageFlux(String errorMessage) {
293 logger.error(errorMessage);