629f3ef962d865544b5f145d0922e6280e382fe8
[dcaegen2/collectors/datafile.git] /
1 /*
2  * ============LICENSE_START======================================================================
3  * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
4  * ===============================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
6  * in compliance with the License. You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software distributed under the License
11  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
12  * or implied. See the License for the specific language governing permissions and limitations under
13  * the License.
14  * ============LICENSE_END========================================================================
15  */
16
17 package org.onap.dcaegen2.collectors.datafile.service;
18
19 import com.google.gson.JsonArray;
20 import com.google.gson.JsonElement;
21 import com.google.gson.JsonObject;
22 import com.google.gson.JsonParser;
23
24 import java.util.ArrayList;
25 import java.util.List;
26 import java.util.Optional;
27 import java.util.stream.StreamSupport;
28
29 import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapEmptyResponseException;
30 import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapNotFoundException;
31 import org.onap.dcaegen2.collectors.datafile.model.FileData;
32 import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData;
33 import org.slf4j.Logger;
34 import org.slf4j.LoggerFactory;
35 import org.springframework.util.StringUtils;
36
37 import reactor.core.publisher.Flux;
38 import reactor.core.publisher.Mono;
39
40 /**
41  * Parses the fileReady event and creates an array of FileData containing the information.
42  *
43  * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 5/8/18
44  * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
45  */
46 public class DmaapConsumerJsonParser {
47     private static final Logger logger = LoggerFactory.getLogger(DmaapConsumerJsonParser.class);
48
49     private static final String COMMON_EVENT_HEADER = "commonEventHeader";
50     private static final String EVENT_NAME = "eventName";
51     private static final String LAST_EPOCH_MICROSEC = "lastEpochMicrosec";
52     private static final String SOURCE_NAME = "sourceName";
53     private static final String START_EPOCH_MICROSEC = "startEpochMicrosec";
54     private static final String TIME_ZONE_OFFSET = "timeZoneOffset";
55
56     private static final String EVENT = "event";
57     private static final String NOTIFICATION_FIELDS = "notificationFields";
58     private static final String CHANGE_IDENTIFIER = "changeIdentifier";
59     private static final String CHANGE_TYPE = "changeType";
60     private static final String NOTIFICATION_FIELDS_VERSION = "notificationFieldsVersion";
61
62     private static final String ARRAY_OF_NAMED_HASH_MAP = "arrayOfNamedHashMap";
63     private static final String NAME = "name";
64     private static final String HASH_MAP = "hashMap";
65     private static final String LOCATION = "location";
66     private static final String COMPRESSION = "compression";
67     private static final String FILE_FORMAT_TYPE = "fileFormatType";
68     private static final String FILE_FORMAT_VERSION = "fileFormatVersion";
69
70     private static final String FILE_READY_CHANGE_TYPE = "FileReady";
71     private static final String FILE_READY_CHANGE_IDENTIFIER = "PM_MEAS_FILES";
72
73     /**
74      * Extract info from string and create @see {@link FileData}.
75      *
76      * @param rawMessage - results from DMaaP
77      * @return reactive Mono with an array of FileData
78      */
79     public Flux<FileData> getJsonObject(Mono<String> rawMessage) {
80         return rawMessage.flatMap(this::getJsonParserMessage).flatMapMany(this::createJsonConsumerModel);
81     }
82
83     private Mono<JsonElement> getJsonParserMessage(String message) {
84         logger.trace("original message from message router: {}", message);
85         return StringUtils.isEmpty(message) ? Mono.error(new DmaapEmptyResponseException())
86                 : Mono.fromSupplier(() -> new JsonParser().parse(message));
87     }
88
89     private Flux<FileData> createJsonConsumerModel(JsonElement jsonElement) {
90         return jsonElement.isJsonObject() ? create(Mono.fromSupplier(jsonElement::getAsJsonObject))
91                 : getFileDataFromJsonArray(jsonElement);
92     }
93
94     private Flux<FileData> getFileDataFromJsonArray(JsonElement jsonElement) {
95         return create(Mono.fromCallable(() -> StreamSupport.stream(jsonElement.getAsJsonArray().spliterator(), false)
96                 .findFirst().flatMap(this::getJsonObjectFromAnArray).orElseThrow(DmaapEmptyResponseException::new)));
97     }
98
99     public Optional<JsonObject> getJsonObjectFromAnArray(JsonElement element) {
100         logger.trace("starting to getJsonObjectFromAnArray!");
101
102         return Optional.of(new JsonParser().parse(element.getAsString()).getAsJsonObject());
103     }
104
105     private Flux<FileData> create(Mono<JsonObject> jsonObject) {
106         return jsonObject.flatMapMany(monoJsonP -> !containsHeader(monoJsonP)
107                 ? Flux.error(new DmaapNotFoundException("Incorrect JsonObject - missing header"))
108                 : transform(monoJsonP));
109     }
110
111     private Flux<FileData> transform(JsonObject jsonObject) {
112         if (containsHeader(jsonObject, EVENT, NOTIFICATION_FIELDS)) {
113             JsonObject commonEventHeader = jsonObject.getAsJsonObject(EVENT).getAsJsonObject(COMMON_EVENT_HEADER);
114             String eventName = getValueFromJson(commonEventHeader, EVENT_NAME);
115             String productName = getProductNameFromEventName(eventName);
116             String vendorName = getVendorNameFromEventName(eventName);
117             String lastEpochMicrosec = getValueFromJson(commonEventHeader, LAST_EPOCH_MICROSEC);
118             String sourceName = getValueFromJson(commonEventHeader, SOURCE_NAME);
119             String startEpochMicrosec = getValueFromJson(commonEventHeader, START_EPOCH_MICROSEC);
120             String timeZoneOffset = getValueFromJson(commonEventHeader, TIME_ZONE_OFFSET);
121
122             JsonObject notificationFields = jsonObject.getAsJsonObject(EVENT).getAsJsonObject(NOTIFICATION_FIELDS);
123             String changeIdentifier = getValueFromJson(notificationFields, CHANGE_IDENTIFIER);
124             String changeType = getValueFromJson(notificationFields, CHANGE_TYPE);
125             String notificationFieldsVersion = getValueFromJson(notificationFields, NOTIFICATION_FIELDS_VERSION);
126             JsonArray arrayOfNamedHashMap = notificationFields.getAsJsonArray(ARRAY_OF_NAMED_HASH_MAP);
127             if (isNotificationFieldsHeaderNotEmpty(changeIdentifier, changeType, notificationFieldsVersion)
128                     && arrayOfNamedHashMap != null && isChangeIdentifierCorrect(changeIdentifier)
129                     && isChangeTypeCorrect(changeType)) {
130                 return getAllFileDataFromJson(productName, vendorName, lastEpochMicrosec, sourceName,
131                         startEpochMicrosec, timeZoneOffset, changeIdentifier, changeType, arrayOfNamedHashMap);
132             }
133
134             return handleJsonError(changeIdentifier, changeType, notificationFieldsVersion, arrayOfNamedHashMap,
135                     jsonObject);
136         }
137         return Flux.error(
138                 new DmaapNotFoundException("FileReady event has incorrect JsonObject - missing header. " + jsonObject));
139     }
140
141     private boolean isChangeTypeCorrect(String changeType) {
142         return FILE_READY_CHANGE_TYPE.equals(changeType);
143     }
144
145     private boolean isChangeIdentifierCorrect(String changeIdentifier) {
146         return FILE_READY_CHANGE_IDENTIFIER.equals(changeIdentifier);
147     }
148
149     private Flux<FileData> getAllFileDataFromJson(String productName, String vendorName, String lastEpochMicrosec,
150             String sourceName, String startEpochMicrosec, String timeZoneOffset, String changeIdentifier,
151             String changeType, JsonArray arrayOfAdditionalFields) {
152         List<FileData> res = new ArrayList<>();
153         for (int i = 0; i < arrayOfAdditionalFields.size(); i++) {
154             if (arrayOfAdditionalFields.get(i) != null) {
155                 JsonObject fileInfo = (JsonObject) arrayOfAdditionalFields.get(i);
156                 FileData fileData = getFileDataFromJson(productName, vendorName, lastEpochMicrosec, sourceName,
157                         startEpochMicrosec, timeZoneOffset, fileInfo, changeIdentifier, changeType);
158
159                 if (fileData != null) {
160                     res.add(fileData);
161                 } else {
162                     logger.error("Unable to collect file from xNF. File information wrong. Data: {}", fileInfo);
163                 }
164             }
165         }
166         return Flux.fromIterable(res);
167     }
168
169     private FileData getFileDataFromJson(String productName, String vendorName, String lastEpochMicrosec,
170             String sourceName, String startEpochMicrosec, String timeZoneOffset, JsonObject fileInfo,
171             String changeIdentifier, String changeType) {
172         logger.trace("starting to getFileDataFromJson!");
173
174         FileData fileData = null;
175
176         String name = getValueFromJson(fileInfo, NAME);
177         JsonObject data = fileInfo.getAsJsonObject(HASH_MAP);
178         String fileFormatType = getValueFromJson(data, FILE_FORMAT_TYPE);
179         String fileFormatVersion = getValueFromJson(data, FILE_FORMAT_VERSION);
180         String location = getValueFromJson(data, LOCATION);
181         String compression = getValueFromJson(data, COMPRESSION);
182
183         if (isFileFormatFieldsNotEmpty(fileFormatVersion, fileFormatType)
184                 && isNameAndLocationAndCompressionNotEmpty(name, location, compression)) {
185             // @formatter:off
186             fileData = ImmutableFileData.builder()
187                     .productName(productName)
188                     .vendorName(vendorName)
189                     .lastEpochMicrosec(lastEpochMicrosec)
190                     .sourceName(sourceName)
191                     .startEpochMicrosec(startEpochMicrosec)
192                     .timeZoneOffset(timeZoneOffset)
193                     .name(name)
194                     .changeIdentifier(changeIdentifier)
195                     .changeType(changeType)
196                     .location(location)
197                     .compression(compression)
198                     .fileFormatType(fileFormatType)
199                     .fileFormatVersion(fileFormatVersion)
200                     .build();
201             // @formatter:on
202         }
203         return fileData;
204     }
205
206     /**
207      * @param eventName
208      * @return String of vendorName eventName is defined as:
209      *         {DomainAbbreviation}_{productName}-{vendorName}_{Description}, example:
210      *         Noti_RnNode-Ericsson_FileReady
211      */
212     private String getVendorNameFromEventName(String eventName) {
213         String[] eventArray = eventName.split("_|-");
214         if (eventArray.length >= 4) {
215             return eventArray[2];
216         } else {
217             logger.trace("Can not get vendorName from eventName, eventName is not in correct format: " + eventName);
218         }
219         return "";
220     }
221
222     /**
223      * @param eventName
224      * @return String of productName
225      */
226     private String getProductNameFromEventName(String eventName) {
227         String[] eventArray = eventName.split("_|-");
228         if (eventArray.length >= 4) {
229             return eventArray[1];
230         } else {
231             logger.trace("Can not get productName from eventName, eventName is not in correct format: " + eventName);
232         }
233         return "";
234     }
235
236     private String getValueFromJson(JsonObject jsonObject, String jsonKey) {
237         return jsonObject.has(jsonKey) ? jsonObject.get(jsonKey).getAsString() : "";
238     }
239
240     private boolean isNotificationFieldsHeaderNotEmpty(String changeIdentifier, String changeType,
241             String notificationFieldsVersion) {
242         return isStringIsNotNullAndNotEmpty(changeIdentifier) && isStringIsNotNullAndNotEmpty(changeType)
243                 && isStringIsNotNullAndNotEmpty(notificationFieldsVersion);
244     }
245
246     private boolean isFileFormatFieldsNotEmpty(String fileFormatVersion, String fileFormatType) {
247         return isStringIsNotNullAndNotEmpty(fileFormatVersion) && isStringIsNotNullAndNotEmpty(fileFormatType);
248     }
249
250     private boolean isNameAndLocationAndCompressionNotEmpty(String name, String location, String compression) {
251         return isStringIsNotNullAndNotEmpty(name) && isStringIsNotNullAndNotEmpty(location)
252                 && isStringIsNotNullAndNotEmpty(compression);
253     }
254
255     private boolean containsHeader(JsonObject jsonObject) {
256         return jsonObject.has(EVENT) && jsonObject.getAsJsonObject(EVENT).has(NOTIFICATION_FIELDS);
257     }
258
259     private boolean containsHeader(JsonObject jsonObject, String topHeader, String header) {
260         return jsonObject.has(topHeader) && jsonObject.getAsJsonObject(topHeader).has(header);
261     }
262
263     private boolean isStringIsNotNullAndNotEmpty(String string) {
264         return string != null && !string.isEmpty();
265     }
266
267     private Flux<FileData> handleJsonError(String changeIdentifier, String changeType, String notificationFieldsVersion,
268             JsonArray arrayOfNamedHashMap, JsonObject jsonObject) {
269         String errorMessage = "FileReady event information is incomplete or incorrect!\n";
270         if (!isNotificationFieldsHeaderNotEmpty(changeIdentifier, changeType, notificationFieldsVersion)) {
271             errorMessage += "header is missing.\n";
272         }
273         if (arrayOfNamedHashMap == null) {
274             errorMessage += "arrayOfNamedHashMap is missing.\n";
275         }
276         if (!isChangeIdentifierCorrect(changeIdentifier)) {
277             errorMessage += "changeIdentifier is incorrect.\n";
278         }
279         if (!isChangeTypeCorrect(changeType)) {
280             errorMessage += "changeType is incorrect.\n";
281         }
282         return Flux.error(new DmaapNotFoundException(errorMessage + jsonObject));
283     }
284 }