46c6e9426388113c5d365b8810cd4a029c6720c1
[dcaegen2/collectors/datafile.git] /
1 /*
2  * ============LICENSE_START======================================================================
3  * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
4  * ===============================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
6  * in compliance with the License. You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software distributed under the License
11  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
12  * or implied. See the License for the specific language governing permissions and limitations under
13  * the License.
14  * ============LICENSE_END========================================================================
15  */
16
17 package org.onap.dcaegen2.collectors.datafile.service;
18
19 import com.google.gson.JsonArray;
20 import com.google.gson.JsonElement;
21 import com.google.gson.JsonObject;
22 import com.google.gson.JsonParser;
23
24 import java.util.ArrayList;
25 import java.util.List;
26 import java.util.Optional;
27 import java.util.stream.StreamSupport;
28
29 import org.onap.dcaegen2.collectors.datafile.model.FileData;
30 import org.onap.dcaegen2.collectors.datafile.model.FileMetaData;
31 import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData;
32 import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileMetaData;
33 import org.slf4j.Logger;
34 import org.slf4j.LoggerFactory;
35 import org.springframework.util.StringUtils;
36
37 import reactor.core.publisher.Flux;
38 import reactor.core.publisher.Mono;
39
40 /**
41  * Parses the fileReady event and creates an array of FileData containing the information.
42  *
43  * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 5/8/18
44  * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
45  */
46 public class DmaapConsumerJsonParser {
47     private static final Logger logger = LoggerFactory.getLogger(DmaapConsumerJsonParser.class);
48
49     private static final String COMMON_EVENT_HEADER = "commonEventHeader";
50     private static final String EVENT_NAME = "eventName";
51     private static final String LAST_EPOCH_MICROSEC = "lastEpochMicrosec";
52     private static final String SOURCE_NAME = "sourceName";
53     private static final String START_EPOCH_MICROSEC = "startEpochMicrosec";
54     private static final String TIME_ZONE_OFFSET = "timeZoneOffset";
55
56     private static final String EVENT = "event";
57     private static final String NOTIFICATION_FIELDS = "notificationFields";
58     private static final String CHANGE_IDENTIFIER = "changeIdentifier";
59     private static final String CHANGE_TYPE = "changeType";
60     private static final String NOTIFICATION_FIELDS_VERSION = "notificationFieldsVersion";
61
62     private static final String ARRAY_OF_NAMED_HASH_MAP = "arrayOfNamedHashMap";
63     private static final String NAME = "name";
64     private static final String HASH_MAP = "hashMap";
65     private static final String LOCATION = "location";
66     private static final String COMPRESSION = "compression";
67     private static final String FILE_FORMAT_TYPE = "fileFormatType";
68     private static final String FILE_FORMAT_VERSION = "fileFormatVersion";
69
70     private static final String FILE_READY_CHANGE_TYPE = "FileReady";
71     private static final String FILE_READY_CHANGE_IDENTIFIER = "PM_MEAS_FILES";
72
73     /**
74      * The data types available in the event name.
75      */
76     private enum EventNameDataType {
77         PRODUCT_NAME(1), VENDOR_NAME(2);
78
79         private int index;
80
81         EventNameDataType(int index) {
82             this.index = index;
83         }
84     }
85
86     /**
87      * Extract info from string and create a {@link FileData}.
88      *
89      * @param rawMessage - results from DMaaP
90      * @return reactive Mono with an array of FileData
91      */
92     public Flux<FileData> getJsonObject(Mono<String> rawMessage) {
93         return rawMessage.flatMapMany(this::getJsonParserMessage).flatMap(this::createJsonConsumerModel);
94     }
95
96     private Mono<JsonElement> getJsonParserMessage(String message) {
97         logger.trace("original message from message router: {}", message);
98         return StringUtils.isEmpty(message) ? Mono.empty() : Mono.fromSupplier(() -> new JsonParser().parse(message));
99     }
100
101     private Flux<FileData> createJsonConsumerModel(JsonElement jsonElement) {
102         return jsonElement.isJsonObject() ? create(Flux.defer(() -> Flux.just(jsonElement.getAsJsonObject())))
103                 : getFileDataFromJsonArray(jsonElement);
104     }
105
106     private Flux<FileData> getFileDataFromJsonArray(JsonElement jsonElement) {
107         return create(
108                 Flux.defer(() -> Flux.fromStream(StreamSupport.stream(jsonElement.getAsJsonArray().spliterator(), false)
109                         .map(jsonElementFromArray -> getJsonObjectFromAnArray(jsonElementFromArray)
110                                 .orElseGet(JsonObject::new)))));
111     }
112
113     public Optional<JsonObject> getJsonObjectFromAnArray(JsonElement element) {
114         JsonParser jsonParser = new JsonParser();
115         return element.isJsonPrimitive() ? Optional.of(jsonParser.parse(element.getAsString()).getAsJsonObject())
116                 : element.isJsonObject() ? Optional.of((JsonObject) element)
117                         : Optional.of(jsonParser.parse(element.toString()).getAsJsonObject());
118     }
119
120     private Flux<FileData> create(Flux<JsonObject> jsonObject) {
121         return jsonObject
122                 .flatMap(monoJsonP -> !containsNotificationFields(monoJsonP)
123                         ? logErrorAndReturnEmptyFlux("Incorrect JsonObject - missing header. " + jsonObject)
124                         : transform(monoJsonP));
125     }
126
127     private Flux<FileData> transform(JsonObject message) {
128         Optional<FileMetaData> fileMetaData = getFileMetaData(message);
129         if (fileMetaData.isPresent()) {
130             JsonObject notificationFields = message.getAsJsonObject(EVENT).getAsJsonObject(NOTIFICATION_FIELDS);
131             JsonArray arrayOfNamedHashMap = notificationFields.getAsJsonArray(ARRAY_OF_NAMED_HASH_MAP);
132             if (arrayOfNamedHashMap != null) {
133                 return getAllFileDataFromJson(fileMetaData.get(), arrayOfNamedHashMap);
134             }
135
136             logger.error("Unable to collect file from xNF. Missing arrayOfNamedHashMap in message. {}", message);
137             return Flux.empty();
138         }
139         logger.error("Unable to collect file from xNF. FileReady event has incorrect JsonObject. {}", message);
140         return Flux.empty();
141     }
142
143     private Optional<FileMetaData> getFileMetaData(JsonObject message) {
144         List<String> missingValues = new ArrayList<>();
145         JsonObject commonEventHeader = message.getAsJsonObject(EVENT).getAsJsonObject(COMMON_EVENT_HEADER);
146         String eventName = getValueFromJson(commonEventHeader, EVENT_NAME, missingValues);
147
148         JsonObject notificationFields = message.getAsJsonObject(EVENT).getAsJsonObject(NOTIFICATION_FIELDS);
149         String changeIdentifier = getValueFromJson(notificationFields, CHANGE_IDENTIFIER, missingValues);
150         String changeType = getValueFromJson(notificationFields, CHANGE_TYPE, missingValues);
151
152         // Just to check that it is in the message. Might be needed in the future if there is a new
153         // version.
154         getValueFromJson(notificationFields, NOTIFICATION_FIELDS_VERSION, missingValues);
155
156         // @formatter:off
157         FileMetaData fileMetaData = ImmutableFileMetaData.builder()
158                 .productName(getDataFromEventName(EventNameDataType.PRODUCT_NAME, eventName, missingValues))
159                 .vendorName(getDataFromEventName(EventNameDataType.VENDOR_NAME, eventName, missingValues))
160                 .lastEpochMicrosec(getValueFromJson(commonEventHeader, LAST_EPOCH_MICROSEC, missingValues))
161                 .sourceName(getValueFromJson(commonEventHeader, SOURCE_NAME, missingValues))
162                 .startEpochMicrosec(getValueFromJson(commonEventHeader, START_EPOCH_MICROSEC, missingValues))
163                 .timeZoneOffset(getValueFromJson(commonEventHeader, TIME_ZONE_OFFSET, missingValues))
164                 .changeIdentifier(changeIdentifier)
165                 .changeType(changeType)
166                 .build();
167         // @formatter:on
168         if (missingValues.isEmpty() && isChangeIdentifierCorrect(changeIdentifier) && isChangeTypeCorrect(changeType)) {
169             return Optional.of(fileMetaData);
170         } else {
171             String errorMessage = "Unable to collect file from xNF.";
172             if (!missingValues.isEmpty()) {
173                 errorMessage += " Missing data: " + missingValues;
174             }
175             if (!isChangeIdentifierCorrect(changeIdentifier) || !isChangeTypeCorrect(changeType)) {
176                 errorMessage += " Change identifier or change type is wrong.";
177             }
178             errorMessage += " Message: {}";
179             logger.error(errorMessage, message);
180             return Optional.empty();
181         }
182     }
183
184     private boolean isChangeTypeCorrect(String changeType) {
185         return FILE_READY_CHANGE_TYPE.equals(changeType);
186     }
187
188     private boolean isChangeIdentifierCorrect(String changeIdentifier) {
189         return FILE_READY_CHANGE_IDENTIFIER.equals(changeIdentifier);
190     }
191
192     private Flux<FileData> getAllFileDataFromJson(FileMetaData fileMetaData, JsonArray arrayOfAdditionalFields) {
193         List<FileData> res = new ArrayList<>();
194         for (int i = 0; i < arrayOfAdditionalFields.size(); i++) {
195             JsonObject fileInfo = (JsonObject) arrayOfAdditionalFields.get(i);
196             Optional<FileData> fileData = getFileDataFromJson(fileMetaData, fileInfo);
197
198             if (fileData.isPresent()) {
199                 res.add(fileData.get());
200             }
201         }
202         return Flux.fromIterable(res);
203     }
204
205     private Optional<FileData> getFileDataFromJson(FileMetaData fileMetaData, JsonObject fileInfo) {
206         logger.trace("starting to getFileDataFromJson!");
207
208         List<String> missingValues = new ArrayList<>();
209         JsonObject data = fileInfo.getAsJsonObject(HASH_MAP);
210
211         // @formatter:off
212         FileData fileData = ImmutableFileData.builder()
213                 .fileMetaData(fileMetaData)
214                 .name(getValueFromJson(fileInfo, NAME, missingValues))
215                 .fileFormatType(getValueFromJson(data, FILE_FORMAT_TYPE, missingValues))
216                 .fileFormatVersion(getValueFromJson(data, FILE_FORMAT_VERSION, missingValues))
217                 .location(getValueFromJson(data, LOCATION, missingValues))
218                 .compression(getValueFromJson(data, COMPRESSION, missingValues))
219                 .build();
220         // @formatter:on
221         if (missingValues.isEmpty()) {
222             return Optional.of(fileData);
223         }
224         logger.error("Unable to collect file from xNF. File information wrong. Missing data: {} Data: {}",
225                 missingValues, fileInfo);
226         return Optional.empty();
227     }
228
229     /**
230      * Gets data from the event name, defined as:
231      * {DomainAbbreviation}_{productName}-{vendorName}_{Description}, example:
232      * Noti_RnNode-Ericsson_FileReady
233      *
234      * @param dataType The type of data to get, {@link DmaapConsumerJsonParser.EventNameDataType}.
235      * @param eventName The event name to get the data from.
236      * @param missingValues List of missing values. The dataType will be added if missing.
237      * @return String of data from event name
238      */
239     private String getDataFromEventName(EventNameDataType dataType, String eventName, List<String> missingValues) {
240         String[] eventArray = eventName.split("_|-");
241         if (eventArray.length >= 4) {
242             return eventArray[dataType.index];
243         } else {
244             missingValues.add(dataType.toString());
245             logger.error("Can not get {} from eventName, eventName is not in correct format: {}", dataType, eventName);
246         }
247         return "";
248     }
249
250     private String getValueFromJson(JsonObject jsonObject, String jsonKey, List<String> missingValues) {
251         if (jsonObject.has(jsonKey)) {
252             return jsonObject.get(jsonKey).getAsString();
253         } else {
254             missingValues.add(jsonKey);
255             return "";
256         }
257     }
258
259     private boolean containsNotificationFields(JsonObject jsonObject) {
260         return jsonObject.has(EVENT) && jsonObject.getAsJsonObject(EVENT).has(NOTIFICATION_FIELDS);
261     }
262
263     private Flux<FileData> logErrorAndReturnEmptyFlux(String errorMessage) {
264         logger.error(errorMessage);
265         return Flux.empty();
266     }
267 }