5e02ecdde172ae89947be71720dac300ae3879d5
[dcaegen2/collectors/datafile.git] /
1 /*-
2  * ============LICENSE_START========================================================================
3  * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
4  * ==================================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  * ============LICENSE_END=========================================================
17  */
18
19 package org.onap.dcaegen2.collectors.datafile.service;
20
21 import com.google.gson.JsonArray;
22 import com.google.gson.JsonElement;
23 import com.google.gson.JsonObject;
24 import com.google.gson.JsonParser;
25 import java.net.URI;
26 import java.util.ArrayList;
27 import java.util.List;
28 import java.util.Optional;
29 import java.util.stream.StreamSupport;
30 import org.onap.dcaegen2.collectors.datafile.ftp.Scheme;
31 import org.onap.dcaegen2.collectors.datafile.model.FileData;
32 import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
33 import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData;
34 import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileReadyMessage;
35 import org.onap.dcaegen2.collectors.datafile.model.ImmutableMessageMetaData;
36 import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
37 import org.slf4j.Logger;
38 import org.slf4j.LoggerFactory;
39 import org.springframework.util.StringUtils;
40 import reactor.core.publisher.Flux;
41 import reactor.core.publisher.Mono;
42
43 /**
44  * Parses the fileReady event and creates a Flux of FileReadyMessage containing the information.
45  *
46  * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
47  */
48 public class JsonMessageParser {
49     private static final Logger logger = LoggerFactory.getLogger(JsonMessageParser.class);
50
51     public static final String ERROR_MSG_VES_EVENT_PARSING = "VES event parsing. ";
52
53     private static final String COMMON_EVENT_HEADER = "commonEventHeader";
54     private static final String EVENT_NAME = "eventName";
55     private static final String LAST_EPOCH_MICROSEC = "lastEpochMicrosec";
56     private static final String SOURCE_NAME = "sourceName";
57     private static final String START_EPOCH_MICROSEC = "startEpochMicrosec";
58     private static final String TIME_ZONE_OFFSET = "timeZoneOffset";
59
60     private static final String EVENT = "event";
61     private static final String NOTIFICATION_FIELDS = "notificationFields";
62     private static final String CHANGE_IDENTIFIER = "changeIdentifier";
63     private static final String CHANGE_TYPE = "changeType";
64     private static final String NOTIFICATION_FIELDS_VERSION = "notificationFieldsVersion";
65
66     private static final String ARRAY_OF_NAMED_HASH_MAP = "arrayOfNamedHashMap";
67     private static final String NAME = "name";
68     private static final String HASH_MAP = "hashMap";
69     private static final String LOCATION = "location";
70     private static final String COMPRESSION = "compression";
71     private static final String FILE_FORMAT_TYPE = "fileFormatType";
72     private static final String FILE_FORMAT_VERSION = "fileFormatVersion";
73
74     private static final String FILE_READY_CHANGE_TYPE = "FileReady";
75
76     /**
77      * The data types available in the event name.
78      */
79     private enum EventNameDataType {
80         PRODUCT_NAME(1), VENDOR_NAME(2);
81
82         private int index;
83
84         EventNameDataType(int index) {
85             this.index = index;
86         }
87     }
88
89     /**
90      * Parses the Json message and returns a stream of messages.
91      *
92      * @param rawMessage the Json message to parse.
93      * @return a <code>Flux</code> containing messages.
94      */
95     public Flux<FileReadyMessage> getMessagesFromJson(Mono<String> rawMessage) {
96         return rawMessage.flatMapMany(JsonMessageParser::getJsonParserMessage).flatMap(this::createMessageData);
97     }
98
99     Optional<JsonObject> getJsonObjectFromAnArray(JsonElement element) {
100         JsonParser jsonParser = new JsonParser();
101         if (element.isJsonPrimitive()) {
102             return Optional.of(jsonParser.parse(element.getAsString()).getAsJsonObject());
103         } else if (element.isJsonObject()) {
104             return Optional.of((JsonObject) element);
105         } else {
106             return Optional.of(jsonParser.parse(element.toString()).getAsJsonObject());
107         }
108     }
109
110     private Flux<FileReadyMessage> getMessagesFromJsonArray(JsonElement jsonElement) {
111         return createMessages(Flux.fromStream(StreamSupport.stream(jsonElement.getAsJsonArray().spliterator(), false)
112                 .map(jsonElementFromArray -> getJsonObjectFromAnArray(jsonElementFromArray)
113                         .orElseGet(JsonObject::new))));
114     }
115
116     /**
117      * Extract info from string and create a Flux of {@link FileReadyMessage}.
118      *
119      * @param rawMessage - results from DMaaP
120      * @return reactive Flux of FileReadyMessages
121      */
122     private Flux<FileReadyMessage> createMessageData(JsonElement jsonElement) {
123         return jsonElement.isJsonObject() ? createMessages(Flux.just(jsonElement.getAsJsonObject()))
124                 : getMessagesFromJsonArray(jsonElement);
125     }
126
127     private static Mono<JsonElement> getJsonParserMessage(String message) {
128         return StringUtils.isEmpty(message) ? Mono.empty() : Mono.fromSupplier(() -> new JsonParser().parse(message));
129     }
130
131     private static Flux<FileReadyMessage> createMessages(Flux<JsonObject> jsonObject) {
132         return jsonObject.flatMap(monoJsonP -> containsNotificationFields(monoJsonP) ? transformMessages(monoJsonP)
133                 : logErrorAndReturnEmptyMessageFlux("Incorrect JsonObject - missing header. " + jsonObject));
134     }
135
136
137     private static Mono<FileReadyMessage> transformMessages(JsonObject message) {
138         Optional<MessageMetaData> optionalMessageMetaData = getMessageMetaData(message);
139         if (optionalMessageMetaData.isPresent()) {
140             MessageMetaData messageMetaData = optionalMessageMetaData.get();
141             JsonObject notificationFields = message.getAsJsonObject(EVENT).getAsJsonObject(NOTIFICATION_FIELDS);
142             JsonArray arrayOfNamedHashMap = notificationFields.getAsJsonArray(ARRAY_OF_NAMED_HASH_MAP);
143             if (arrayOfNamedHashMap != null) {
144                 List<FileData> allFileDataFromJson = getAllFileDataFromJson(arrayOfNamedHashMap, messageMetaData);
145                 if (!allFileDataFromJson.isEmpty()) {
146                     return Mono.just(ImmutableFileReadyMessage.builder() //
147                             .files(allFileDataFromJson) //
148                             .build());
149                 } else {
150                     return Mono.empty();
151                 }
152             }
153
154             logger.error(ERROR_MSG_VES_EVENT_PARSING + "Missing arrayOfNamedHashMap in message. {}", message);
155             return Mono.empty();
156         }
157         logger.error(ERROR_MSG_VES_EVENT_PARSING + "FileReady event has incorrect JsonObject. {}", message);
158         return Mono.empty();
159     }
160
161
162     private static Optional<MessageMetaData> getMessageMetaData(JsonObject message) {
163         List<String> missingValues = new ArrayList<>();
164         JsonObject commonEventHeader = message.getAsJsonObject(EVENT).getAsJsonObject(COMMON_EVENT_HEADER);
165         String eventName = getValueFromJson(commonEventHeader, EVENT_NAME, missingValues);
166
167         JsonObject notificationFields = message.getAsJsonObject(EVENT).getAsJsonObject(NOTIFICATION_FIELDS);
168         String changeIdentifier = getValueFromJson(notificationFields, CHANGE_IDENTIFIER, missingValues);
169         String changeType = getValueFromJson(notificationFields, CHANGE_TYPE, missingValues);
170
171         // Just to check that it is in the message. Might be needed in the future if there is a new
172         // version.
173         getValueFromJson(notificationFields, NOTIFICATION_FIELDS_VERSION, missingValues);
174
175         MessageMetaData messageMetaData = ImmutableMessageMetaData.builder() //
176                 .productName(getDataFromEventName(EventNameDataType.PRODUCT_NAME, eventName, missingValues)) //
177                 .vendorName(getDataFromEventName(EventNameDataType.VENDOR_NAME, eventName, missingValues)) //
178                 .lastEpochMicrosec(getValueFromJson(commonEventHeader, LAST_EPOCH_MICROSEC, missingValues)) //
179                 .sourceName(getValueFromJson(commonEventHeader, SOURCE_NAME, missingValues)) //
180                 .startEpochMicrosec(getValueFromJson(commonEventHeader, START_EPOCH_MICROSEC, missingValues)) //
181                 .timeZoneOffset(getValueFromJson(commonEventHeader, TIME_ZONE_OFFSET, missingValues)) //
182                 .changeIdentifier(changeIdentifier) //
183                 .changeType(changeType) //
184                 .build();
185         if (missingValues.isEmpty() && isChangeTypeCorrect(changeType)) {
186             return Optional.of(messageMetaData);
187         } else {
188             String errorMessage = ERROR_MSG_VES_EVENT_PARSING;
189             if (!missingValues.isEmpty()) {
190                 errorMessage += "Missing data: " + missingValues + ".";
191             }
192             if (!isChangeTypeCorrect(changeType)) {
193                 errorMessage += " Change type is wrong: " + changeType + " Expected: " + FILE_READY_CHANGE_TYPE;
194             }
195             errorMessage += " Message: {}";
196             logger.error(errorMessage, message);
197             return Optional.empty();
198         }
199     }
200
201     private static boolean isChangeTypeCorrect(String changeType) {
202         return FILE_READY_CHANGE_TYPE.equals(changeType);
203     }
204
205     private static List<FileData> getAllFileDataFromJson(JsonArray arrayOfAdditionalFields,
206             MessageMetaData messageMetaData) {
207         List<FileData> res = new ArrayList<>();
208         for (int i = 0; i < arrayOfAdditionalFields.size(); i++) {
209             JsonObject fileInfo = (JsonObject) arrayOfAdditionalFields.get(i);
210             Optional<FileData> fileData = getFileDataFromJson(fileInfo, messageMetaData);
211
212             if (fileData.isPresent()) {
213                 res.add(fileData.get());
214             }
215         }
216         return res;
217     }
218
219     private static Optional<FileData> getFileDataFromJson(JsonObject fileInfo, MessageMetaData messageMetaData) {
220         logger.trace("starting to getFileDataFromJson!");
221
222         List<String> missingValues = new ArrayList<>();
223         JsonObject data = fileInfo.getAsJsonObject(HASH_MAP);
224
225         String location = getValueFromJson(data, LOCATION, missingValues);
226         if (StringUtils.isEmpty(location)) {
227             logger.error(ERROR_MSG_VES_EVENT_PARSING + "File information wrong. Missing location. Data: {} {}",
228                     messageMetaData, fileInfo);
229             return Optional.empty();
230         }
231         Scheme scheme;
232         try {
233             scheme = Scheme.getSchemeFromString(URI.create(location).getScheme());
234         } catch (Exception e) {
235             logger.error(ERROR_MSG_VES_EVENT_PARSING + "{}. Location: {} Data: {}", e.getMessage(), location,
236                     messageMetaData, e);
237             return Optional.empty();
238         }
239         FileData fileData = ImmutableFileData.builder() //
240                 .name(getValueFromJson(fileInfo, NAME, missingValues)) //
241                 .fileFormatType(getValueFromJson(data, FILE_FORMAT_TYPE, missingValues)) //
242                 .fileFormatVersion(getValueFromJson(data, FILE_FORMAT_VERSION, missingValues)) //
243                 .location(location) //
244                 .scheme(scheme) //
245                 .compression(getValueFromJson(data, COMPRESSION, missingValues)) //
246                 .messageMetaData(messageMetaData) //
247                 .build();
248         if (missingValues.isEmpty()) {
249             return Optional.of(fileData);
250         }
251         logger.error(ERROR_MSG_VES_EVENT_PARSING + "File information wrong. Missing data: {} Data: {}", missingValues,
252                 fileInfo);
253         return Optional.empty();
254     }
255
256     /**
257      * Gets data from the event name. Defined as: {DomainAbbreviation}_{productName}-{vendorName}_{Description},
258      * example: Noti_RnNode-Ericsson_FileReady
259      *
260      * @param dataType The type of data to get, {@link DmaapConsumerJsonParser.EventNameDataType}.
261      * @param eventName The event name to get the data from.
262      * @param missingValues List of missing values. The dataType will be added if missing.
263      * @return String of data from event name
264      */
265     private static String getDataFromEventName(EventNameDataType dataType, String eventName,
266             List<String> missingValues) {
267         String[] eventArray = eventName.split("_|-");
268         if (eventArray.length >= 4) {
269             return eventArray[dataType.index];
270         } else {
271             missingValues.add(dataType.toString());
272             logger.error(
273                     ERROR_MSG_VES_EVENT_PARSING
274                             + "Can not get {} from eventName, eventName is not in correct format: {}",
275                     dataType, eventName);
276         }
277         return "";
278     }
279
280     private static String getValueFromJson(JsonObject jsonObject, String jsonKey, List<String> missingValues) {
281         if (jsonObject.has(jsonKey)) {
282             return jsonObject.get(jsonKey).getAsString();
283         } else {
284             missingValues.add(jsonKey);
285             return "";
286         }
287     }
288
289     private static boolean containsNotificationFields(JsonObject jsonObject) {
290         return jsonObject.has(EVENT) && jsonObject.getAsJsonObject(EVENT).has(NOTIFICATION_FIELDS);
291     }
292
293     private static Flux<FileReadyMessage> logErrorAndReturnEmptyMessageFlux(String errorMessage) {
294         logger.error(errorMessage);
295         return Flux.empty();
296     }
297 }