3c606debfd36650062e80c408b7ea3616817f135
[dcaegen2/collectors/datafile.git] /
1 /*-
2  * ============LICENSE_START========================================================================
3  * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
4  * ==================================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  * ============LICENSE_END=========================================================
17  */
18
19 package org.onap.dcaegen2.collectors.datafile.service;
20
21 import com.google.gson.JsonArray;
22 import com.google.gson.JsonElement;
23 import com.google.gson.JsonObject;
24 import com.google.gson.JsonParser;
25
26 import java.net.URI;
27 import java.util.ArrayList;
28 import java.util.List;
29 import java.util.Optional;
30 import java.util.stream.StreamSupport;
31
32 import org.onap.dcaegen2.collectors.datafile.ftp.Scheme;
33 import org.onap.dcaegen2.collectors.datafile.model.FileData;
34 import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
35 import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData;
36 import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileReadyMessage;
37 import org.onap.dcaegen2.collectors.datafile.model.ImmutableMessageMetaData;
38 import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
39 import org.slf4j.Logger;
40 import org.slf4j.LoggerFactory;
41 import org.springframework.util.StringUtils;
42
43 import reactor.core.publisher.Flux;
44 import reactor.core.publisher.Mono;
45
46 /**
47  * Parses the fileReady event and creates a Flux of FileReadyMessage containing the information.
48  *
49  * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
50  */
51 public class JsonMessageParser {
52     private static final Logger logger = LoggerFactory.getLogger(JsonMessageParser.class);
53
54     private static final String COMMON_EVENT_HEADER = "commonEventHeader";
55     private static final String EVENT_NAME = "eventName";
56     private static final String LAST_EPOCH_MICROSEC = "lastEpochMicrosec";
57     private static final String SOURCE_NAME = "sourceName";
58     private static final String START_EPOCH_MICROSEC = "startEpochMicrosec";
59     private static final String TIME_ZONE_OFFSET = "timeZoneOffset";
60
61     private static final String EVENT = "event";
62     private static final String NOTIFICATION_FIELDS = "notificationFields";
63     private static final String CHANGE_IDENTIFIER = "changeIdentifier";
64     private static final String CHANGE_TYPE = "changeType";
65     private static final String NOTIFICATION_FIELDS_VERSION = "notificationFieldsVersion";
66
67     private static final String ARRAY_OF_NAMED_HASH_MAP = "arrayOfNamedHashMap";
68     private static final String NAME = "name";
69     private static final String HASH_MAP = "hashMap";
70     private static final String LOCATION = "location";
71     private static final String COMPRESSION = "compression";
72     private static final String FILE_FORMAT_TYPE = "fileFormatType";
73     private static final String FILE_FORMAT_VERSION = "fileFormatVersion";
74
75     private static final String FILE_READY_CHANGE_TYPE = "FileReady";
76     private static final String FILE_READY_CHANGE_IDENTIFIER = "PM_MEAS_FILES";
77
78     /**
79      * The data types available in the event name.
80      */
81     private enum EventNameDataType {
82         PRODUCT_NAME(1), VENDOR_NAME(2);
83
84         private int index;
85
86         EventNameDataType(int index) {
87             this.index = index;
88         }
89     }
90
91     public Flux<FileReadyMessage> getMessagesFromJson(Mono<String> rawMessage) {
92         return rawMessage.flatMapMany(this::getJsonParserMessage).flatMap(this::createMessageData);
93     }
94
95     public Optional<JsonObject> getJsonObjectFromAnArray(JsonElement element) {
96         JsonParser jsonParser = new JsonParser();
97         return element.isJsonPrimitive() ? Optional.of(jsonParser.parse(element.getAsString()).getAsJsonObject())
98                 : element.isJsonObject() ? Optional.of((JsonObject) element)
99                         : Optional.of(jsonParser.parse(element.toString()).getAsJsonObject());
100     }
101
102     private Flux<FileReadyMessage> getMessagesFromJsonArray(JsonElement jsonElement) {
103         return createMessages(
104                 Flux.defer(() -> Flux.fromStream(StreamSupport.stream(jsonElement.getAsJsonArray().spliterator(), false)
105                         .map(jsonElementFromArray -> getJsonObjectFromAnArray(jsonElementFromArray)
106                                 .orElseGet(JsonObject::new)))));
107     }
108
109     /**
110      * Extract info from string and create a Flux of {@link FileReadyMessage}.
111      *
112      * @param rawMessage - results from DMaaP
113      * @return reactive Flux of FileReadyMessages
114      */
115     private Flux<FileReadyMessage> createMessageData(JsonElement jsonElement) {
116         return jsonElement.isJsonObject() ? createMessages(Flux.just(jsonElement.getAsJsonObject()))
117                 : getMessagesFromJsonArray(jsonElement);
118     }
119
120     private Mono<JsonElement> getJsonParserMessage(String message) {
121         logger.trace("original message from message router: {}", message);
122         return StringUtils.isEmpty(message) ? Mono.empty() : Mono.fromSupplier(() -> new JsonParser().parse(message));
123     }
124
125     private Flux<FileReadyMessage> createMessages(Flux<JsonObject> jsonObject) {
126         return jsonObject.flatMap(monoJsonP -> !containsNotificationFields(monoJsonP)
127                 ? logErrorAndReturnEmptyMessageFlux("Incorrect JsonObject - missing header. " + jsonObject)
128                 : transformMessages(monoJsonP));
129     }
130
131     private Flux<FileReadyMessage> transformMessages(JsonObject message) {
132         Optional<MessageMetaData> optionalMessageMetaData = getMessageMetaData(message);
133         if (optionalMessageMetaData.isPresent()) {
134             JsonObject notificationFields = message.getAsJsonObject(EVENT).getAsJsonObject(NOTIFICATION_FIELDS);
135             JsonArray arrayOfNamedHashMap = notificationFields.getAsJsonArray(ARRAY_OF_NAMED_HASH_MAP);
136             if (arrayOfNamedHashMap != null) {
137                 List<FileData> allFileDataFromJson = getAllFileDataFromJson(arrayOfNamedHashMap);
138                 if (!allFileDataFromJson.isEmpty()) {
139                     MessageMetaData messageMetaData = optionalMessageMetaData.get();
140                     // @formatter:off
141                     return Flux.just(ImmutableFileReadyMessage.builder()
142                             .pnfName(messageMetaData.sourceName())
143                             .messageMetaData(messageMetaData)
144                             .files(allFileDataFromJson)
145                             .build());
146                     // @formatter:on
147                 } else {
148                     return Flux.empty();
149                 }
150             }
151
152             logger.error("Unable to collect file from xNF. Missing arrayOfNamedHashMap in message. {}", message);
153             return Flux.empty();
154         }
155         logger.error("Unable to collect file from xNF. FileReady event has incorrect JsonObject. {}", message);
156         return Flux.empty();
157     }
158
159     private Optional<MessageMetaData> getMessageMetaData(JsonObject message) {
160         List<String> missingValues = new ArrayList<>();
161         JsonObject commonEventHeader = message.getAsJsonObject(EVENT).getAsJsonObject(COMMON_EVENT_HEADER);
162         String eventName = getValueFromJson(commonEventHeader, EVENT_NAME, missingValues);
163
164         JsonObject notificationFields = message.getAsJsonObject(EVENT).getAsJsonObject(NOTIFICATION_FIELDS);
165         String changeIdentifier = getValueFromJson(notificationFields, CHANGE_IDENTIFIER, missingValues);
166         String changeType = getValueFromJson(notificationFields, CHANGE_TYPE, missingValues);
167
168         // Just to check that it is in the message. Might be needed in the future if there is a new
169         // version.
170         getValueFromJson(notificationFields, NOTIFICATION_FIELDS_VERSION, missingValues);
171
172         // @formatter:off
173         MessageMetaData messageMetaData = ImmutableMessageMetaData.builder()
174                 .productName(getDataFromEventName(EventNameDataType.PRODUCT_NAME, eventName, missingValues))
175                 .vendorName(getDataFromEventName(EventNameDataType.VENDOR_NAME, eventName, missingValues))
176                 .lastEpochMicrosec(getValueFromJson(commonEventHeader, LAST_EPOCH_MICROSEC, missingValues))
177                 .sourceName(getValueFromJson(commonEventHeader, SOURCE_NAME, missingValues))
178                 .startEpochMicrosec(getValueFromJson(commonEventHeader, START_EPOCH_MICROSEC, missingValues))
179                 .timeZoneOffset(getValueFromJson(commonEventHeader, TIME_ZONE_OFFSET, missingValues))
180                 .changeIdentifier(changeIdentifier)
181                 .changeType(changeType)
182                 .build();
183         // @formatter:on
184         if (missingValues.isEmpty() && isChangeIdentifierCorrect(changeIdentifier) && isChangeTypeCorrect(changeType)) {
185             return Optional.of(messageMetaData);
186         } else {
187             String errorMessage = "Unable to collect file from xNF.";
188             if (!missingValues.isEmpty()) {
189                 errorMessage += " Missing data: " + missingValues;
190             }
191             if (!isChangeIdentifierCorrect(changeIdentifier) || !isChangeTypeCorrect(changeType)) {
192                 errorMessage += " Change identifier or change type is wrong.";
193             }
194             errorMessage += " Message: {}";
195             logger.error(errorMessage, message);
196             return Optional.empty();
197         }
198     }
199
200     private boolean isChangeTypeCorrect(String changeType) {
201         return FILE_READY_CHANGE_TYPE.equals(changeType);
202     }
203
204     private boolean isChangeIdentifierCorrect(String changeIdentifier) {
205         return FILE_READY_CHANGE_IDENTIFIER.equals(changeIdentifier);
206     }
207
208     private List<FileData> getAllFileDataFromJson(JsonArray arrayOfAdditionalFields) {
209         List<FileData> res = new ArrayList<>();
210         for (int i = 0; i < arrayOfAdditionalFields.size(); i++) {
211             JsonObject fileInfo = (JsonObject) arrayOfAdditionalFields.get(i);
212             Optional<FileData> fileData = getFileDataFromJson(fileInfo);
213
214             if (fileData.isPresent()) {
215                 res.add(fileData.get());
216             }
217         }
218         return res;
219     }
220
221     private Optional<FileData> getFileDataFromJson(JsonObject fileInfo) {
222         logger.trace("starting to getFileDataFromJson!");
223
224         List<String> missingValues = new ArrayList<>();
225         JsonObject data = fileInfo.getAsJsonObject(HASH_MAP);
226
227         String location = getValueFromJson(data, LOCATION, missingValues);
228         Scheme scheme;
229         try {
230             scheme = Scheme.getSchemeFromString(URI.create(location).getScheme());
231         } catch (Exception e) {
232             logger.error("Unable to collect file from xNF.", e);
233             return Optional.empty();
234         }
235         // @formatter:off
236         FileData fileData = ImmutableFileData.builder()
237                 .name(getValueFromJson(fileInfo, NAME, missingValues))
238                 .fileFormatType(getValueFromJson(data, FILE_FORMAT_TYPE, missingValues))
239                 .fileFormatVersion(getValueFromJson(data, FILE_FORMAT_VERSION, missingValues))
240                 .location(location)
241                 .scheme(scheme)
242                 .compression(getValueFromJson(data, COMPRESSION, missingValues))
243                 .build();
244         // @formatter:on
245         if (missingValues.isEmpty()) {
246             return Optional.of(fileData);
247         }
248         logger.error("Unable to collect file from xNF. File information wrong. Missing data: {} Data: {}",
249                 missingValues, fileInfo);
250         return Optional.empty();
251     }
252
253     /**
254      * Gets data from the event name, defined as:
255      * {DomainAbbreviation}_{productName}-{vendorName}_{Description}, example:
256      * Noti_RnNode-Ericsson_FileReady
257      *
258      * @param dataType The type of data to get, {@link DmaapConsumerJsonParser.EventNameDataType}.
259      * @param eventName The event name to get the data from.
260      * @param missingValues List of missing values. The dataType will be added if missing.
261      * @return String of data from event name
262      */
263     private String getDataFromEventName(EventNameDataType dataType, String eventName, List<String> missingValues) {
264         String[] eventArray = eventName.split("_|-");
265         if (eventArray.length >= 4) {
266             return eventArray[dataType.index];
267         } else {
268             missingValues.add(dataType.toString());
269             logger.error("Can not get {} from eventName, eventName is not in correct format: {}", dataType, eventName);
270         }
271         return "";
272     }
273
274     private String getValueFromJson(JsonObject jsonObject, String jsonKey, List<String> missingValues) {
275         if (jsonObject.has(jsonKey)) {
276             return jsonObject.get(jsonKey).getAsString();
277         } else {
278             missingValues.add(jsonKey);
279             return "";
280         }
281     }
282
283     private boolean containsNotificationFields(JsonObject jsonObject) {
284         return jsonObject.has(EVENT) && jsonObject.getAsJsonObject(EVENT).has(NOTIFICATION_FIELDS);
285     }
286
287     private Flux<FileReadyMessage> logErrorAndReturnEmptyMessageFlux(String errorMessage) {
288         logger.error(errorMessage);
289         return Flux.empty();
290     }
291 }