708865fa40a2b68ce240dc68fe486143d8ad1991
[dcaegen2/collectors/datafile.git] /
1 /*-
2  * ============LICENSE_START========================================================================
3  * Copyright (C) 2018, 2020 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
4  * ==================================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  * ============LICENSE_END=========================================================
17  */
18
19 package org.onap.dcaegen2.collectors.datafile.service;
20
21 import com.google.gson.JsonArray;
22 import com.google.gson.JsonElement;
23 import com.google.gson.JsonObject;
24 import com.google.gson.JsonParser;
25
26 import java.net.URI;
27 import java.util.ArrayList;
28 import java.util.List;
29 import java.util.Optional;
30 import java.util.stream.StreamSupport;
31
32 import org.onap.dcaegen2.collectors.datafile.ftp.Scheme;
33 import org.onap.dcaegen2.collectors.datafile.model.FileData;
34 import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
35 import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData;
36 import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileReadyMessage;
37 import org.onap.dcaegen2.collectors.datafile.model.ImmutableMessageMetaData;
38 import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
39 import org.slf4j.Logger;
40 import org.slf4j.LoggerFactory;
41 import org.springframework.util.StringUtils;
42
43 import reactor.core.publisher.Flux;
44 import reactor.core.publisher.Mono;
45
46 /**
47  * Parses the fileReady event and creates a Flux of FileReadyMessage containing the information.
48  *
49  * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
50  */
51 public class JsonMessageParser {
52     private static final Logger logger = LoggerFactory.getLogger(JsonMessageParser.class);
53
54     public static final String ERROR_MSG_VES_EVENT_PARSING = "VES event parsing. ";
55
56     private static final String COMMON_EVENT_HEADER = "commonEventHeader";
57     private static final String EVENT_NAME = "eventName";
58     private static final String LAST_EPOCH_MICROSEC = "lastEpochMicrosec";
59     private static final String SOURCE_NAME = "sourceName";
60     private static final String START_EPOCH_MICROSEC = "startEpochMicrosec";
61     private static final String TIME_ZONE_OFFSET = "timeZoneOffset";
62
63     private static final String EVENT = "event";
64     private static final String NOTIFICATION_FIELDS = "notificationFields";
65     private static final String CHANGE_IDENTIFIER = "changeIdentifier";
66     private static final String CHANGE_TYPE = "changeType";
67     private static final String NOTIFICATION_FIELDS_VERSION = "notificationFieldsVersion";
68
69     private static final String ARRAY_OF_NAMED_HASH_MAP = "arrayOfNamedHashMap";
70     private static final String NAME = "name";
71     private static final String HASH_MAP = "hashMap";
72     private static final String LOCATION = "location";
73     private static final String COMPRESSION = "compression";
74     private static final String FILE_FORMAT_TYPE = "fileFormatType";
75     private static final String FILE_FORMAT_VERSION = "fileFormatVersion";
76
77     private static final String FILE_READY_CHANGE_TYPE = "FileReady";
78
79     /**
80      * The data types available in the event name.
81      */
82     private enum EventNameDataType {
83         PRODUCT_NAME(1), VENDOR_NAME(2);
84
85         private int index;
86
87         EventNameDataType(int index) {
88             this.index = index;
89         }
90     }
91
92     /**
93      * Parses the Json message and returns a stream of messages.
94      *
95      * @param rawMessage the Json message to parse.
96      * @return a <code>Flux</code> containing messages.
97      */
98
99     public Flux<FileReadyMessage> getMessagesFromJson(Flux<JsonElement> rawMessage) {
100         return rawMessage.flatMap(this::createMessageData);
101     }
102
103     Optional<JsonObject> getJsonObjectFromAnArray(JsonElement element) {
104         if (element.isJsonPrimitive()) {
105             return Optional.of(JsonParser.parseString(element.getAsString()).getAsJsonObject());
106         } else if (element.isJsonObject()) {
107             return Optional.of((JsonObject) element);
108         } else {
109             return Optional.of(JsonParser.parseString(element.toString()).getAsJsonObject());
110         }
111     }
112
113     private Flux<FileReadyMessage> getMessagesFromJsonArray(JsonElement jsonElement) {
114         return createMessages(Flux.fromStream(StreamSupport.stream(jsonElement.getAsJsonArray().spliterator(), false)
115             .map(jsonElementFromArray -> getJsonObjectFromAnArray(jsonElementFromArray).orElseGet(JsonObject::new))));
116     }
117
118     /**
119      * Extract info from jsonElement and create a Flux of {@link FileReadyMessage}.
120      *
121      * @param jsonElement - result from DMaaP
122      * @return reactive Flux of FileReadyMessages
123      */
124     private Flux<FileReadyMessage> createMessageData(JsonElement jsonElement) {
125         return jsonElement.isJsonObject() ? createMessages(Flux.just(jsonElement.getAsJsonObject()))
126             : getMessagesFromJsonArray(jsonElement);
127     }
128
129     private static Flux<FileReadyMessage> createMessages(Flux<JsonObject> jsonObject) {
130         return jsonObject.flatMap(monoJsonP -> containsNotificationFields(monoJsonP) ? transformMessages(monoJsonP)
131             : logErrorAndReturnEmptyMessageFlux("Incorrect JsonObject - missing header. " + jsonObject));
132     }
133
134     private static Mono<FileReadyMessage> transformMessages(JsonObject message) {
135         Optional<MessageMetaData> optionalMessageMetaData = getMessageMetaData(message);
136         if (optionalMessageMetaData.isPresent()) {
137             MessageMetaData messageMetaData = optionalMessageMetaData.get();
138             JsonObject notificationFields = message.getAsJsonObject(EVENT).getAsJsonObject(NOTIFICATION_FIELDS);
139             JsonArray arrayOfNamedHashMap = notificationFields.getAsJsonArray(ARRAY_OF_NAMED_HASH_MAP);
140             if (arrayOfNamedHashMap != null) {
141                 List<FileData> allFileDataFromJson = getAllFileDataFromJson(arrayOfNamedHashMap, messageMetaData);
142                 if (!allFileDataFromJson.isEmpty()) {
143                     return Mono.just(ImmutableFileReadyMessage.builder() //
144                         .files(allFileDataFromJson) //
145                         .build());
146                 } else {
147                     return Mono.empty();
148                 }
149             }
150
151             logger.error(ERROR_MSG_VES_EVENT_PARSING + "Missing arrayOfNamedHashMap in message. {}", message);
152             return Mono.empty();
153         }
154         logger.error(ERROR_MSG_VES_EVENT_PARSING + "FileReady event has incorrect JsonObject. {}", message);
155         return Mono.empty();
156     }
157
158     private static Optional<MessageMetaData> getMessageMetaData(JsonObject message) {
159         List<String> missingValues = new ArrayList<>();
160         JsonObject commonEventHeader = message.getAsJsonObject(EVENT).getAsJsonObject(COMMON_EVENT_HEADER);
161         String eventName = getValueFromJson(commonEventHeader, EVENT_NAME, missingValues);
162
163         JsonObject notificationFields = message.getAsJsonObject(EVENT).getAsJsonObject(NOTIFICATION_FIELDS);
164         String changeIdentifier = getValueFromJson(notificationFields, CHANGE_IDENTIFIER, missingValues);
165         String changeType = getValueFromJson(notificationFields, CHANGE_TYPE, missingValues);
166
167         // Just to check that it is in the message. Might be needed in the future if there is a new
168         // version.
169         getValueFromJson(notificationFields, NOTIFICATION_FIELDS_VERSION, missingValues);
170
171         MessageMetaData messageMetaData = ImmutableMessageMetaData.builder() //
172             .productName(getDataFromEventName(EventNameDataType.PRODUCT_NAME, eventName, missingValues)) //
173             .vendorName(getDataFromEventName(EventNameDataType.VENDOR_NAME, eventName, missingValues)) //
174             .lastEpochMicrosec(getValueFromJson(commonEventHeader, LAST_EPOCH_MICROSEC, missingValues)) //
175             .sourceName(getValueFromJson(commonEventHeader, SOURCE_NAME, missingValues)) //
176             .startEpochMicrosec(getValueFromJson(commonEventHeader, START_EPOCH_MICROSEC, missingValues)) //
177             .timeZoneOffset(getValueFromJson(commonEventHeader, TIME_ZONE_OFFSET, missingValues)) //
178             .changeIdentifier(changeIdentifier) //
179             .changeType(changeType) //
180             .build();
181         if (missingValues.isEmpty() && isChangeTypeCorrect(changeType)) {
182             return Optional.of(messageMetaData);
183         } else {
184             String errorMessage = ERROR_MSG_VES_EVENT_PARSING;
185             if (!missingValues.isEmpty()) {
186                 errorMessage += "Missing data: " + missingValues + ".";
187             }
188             if (!isChangeTypeCorrect(changeType)) {
189                 errorMessage += " Change type is wrong: " + changeType + " Expected: " + FILE_READY_CHANGE_TYPE;
190             }
191             errorMessage += " Message: {}";
192             logger.error(errorMessage, message);
193             return Optional.empty();
194         }
195     }
196
197     private static boolean isChangeTypeCorrect(String changeType) {
198         return FILE_READY_CHANGE_TYPE.equals(changeType);
199     }
200
201     private static List<FileData> getAllFileDataFromJson(JsonArray arrayOfAdditionalFields,
202         MessageMetaData messageMetaData) {
203         List<FileData> res = new ArrayList<>();
204         for (int i = 0; i < arrayOfAdditionalFields.size(); i++) {
205             JsonObject fileInfo = (JsonObject) arrayOfAdditionalFields.get(i);
206             Optional<FileData> fileData = getFileDataFromJson(fileInfo, messageMetaData);
207
208             if (fileData.isPresent()) {
209                 res.add(fileData.get());
210             }
211         }
212         return res;
213     }
214
215     private static Optional<FileData> getFileDataFromJson(JsonObject fileInfo, MessageMetaData messageMetaData) {
216         logger.trace("starting to getFileDataFromJson!");
217
218         List<String> missingValues = new ArrayList<>();
219         JsonObject data = fileInfo.getAsJsonObject(HASH_MAP);
220
221         String location = getValueFromJson(data, LOCATION, missingValues);
222         if (StringUtils.isEmpty(location)) {
223             logger.error(ERROR_MSG_VES_EVENT_PARSING + "File information wrong. Missing location. Data: {} {}",
224                 messageMetaData, fileInfo);
225             return Optional.empty();
226         }
227         Scheme scheme;
228         try {
229             scheme = Scheme.getSchemeFromString(URI.create(location).getScheme());
230         } catch (Exception e) {
231             logger.error(ERROR_MSG_VES_EVENT_PARSING + "{}. Location: {} Data: {}", e.getMessage(), location,
232                 messageMetaData, e);
233             return Optional.empty();
234         }
235         FileData fileData = ImmutableFileData.builder() //
236             .name(getValueFromJson(fileInfo, NAME, missingValues)) //
237             .fileFormatType(getValueFromJson(data, FILE_FORMAT_TYPE, missingValues)) //
238             .fileFormatVersion(getValueFromJson(data, FILE_FORMAT_VERSION, missingValues)) //
239             .location(location) //
240             .scheme(scheme) //
241             .compression(getValueFromJson(data, COMPRESSION, missingValues)) //
242             .messageMetaData(messageMetaData) //
243             .build();
244         if (missingValues.isEmpty()) {
245             return Optional.of(fileData);
246         }
247         logger.error(ERROR_MSG_VES_EVENT_PARSING + "File information wrong. Missing data: {} Data: {}", missingValues,
248             fileInfo);
249         return Optional.empty();
250     }
251
252     /**
253      * Gets data from the event name. Defined as: {DomainAbbreviation}_{productName}-{vendorName}_{Description},
254      * example: Noti_RnNode-Ericsson_FileReady
255      *
256      * @param dataType The type of data to get, {@link DmaapConsumerJsonParser.EventNameDataType}.
257      * @param eventName The event name to get the data from.
258      * @param missingValues List of missing values. The dataType will be added if missing.
259      * @return String of data from event name
260      */
261     private static String getDataFromEventName(EventNameDataType dataType, String eventName,
262         List<String> missingValues) {
263         String[] eventArray = eventName.split("_|-");
264         if (eventArray.length >= 4) {
265             return eventArray[dataType.index];
266         } else {
267             missingValues.add(dataType.toString());
268             logger.error(
269                 ERROR_MSG_VES_EVENT_PARSING + "Can not get {} from eventName, eventName is not in correct format: {}",
270                 dataType, eventName);
271         }
272         return "";
273     }
274
275     private static String getValueFromJson(JsonObject jsonObject, String jsonKey, List<String> missingValues) {
276         if (jsonObject.has(jsonKey)) {
277             return jsonObject.get(jsonKey).getAsString();
278         } else {
279             missingValues.add(jsonKey);
280             return "";
281         }
282     }
283
284     private static boolean containsNotificationFields(JsonObject jsonObject) {
285         return jsonObject.has(EVENT) && jsonObject.getAsJsonObject(EVENT).has(NOTIFICATION_FIELDS);
286     }
287
288     private static Flux<FileReadyMessage> logErrorAndReturnEmptyMessageFlux(String errorMessage) {
289         logger.error(errorMessage);
290         return Flux.empty();
291     }
292 }