2 * ============LICENSE_START======================================================================
3 * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
4 * ===============================================================================================
5 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
6 * in compliance with the License. You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software distributed under the License
11 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
12 * or implied. See the License for the specific language governing permissions and limitations under
14 * ============LICENSE_END========================================================================
17 package org.onap.dcaegen2.collectors.datafile.service;
19 import com.google.gson.JsonArray;
20 import com.google.gson.JsonElement;
21 import com.google.gson.JsonObject;
22 import com.google.gson.JsonParser;
24 import java.util.ArrayList;
25 import java.util.List;
26 import java.util.Optional;
27 import java.util.stream.StreamSupport;
29 import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapEmptyResponseException;
30 import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapNotFoundException;
31 import org.onap.dcaegen2.collectors.datafile.model.FileData;
32 import org.onap.dcaegen2.collectors.datafile.model.FileMetaData;
33 import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData;
34 import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileMetaData;
35 import org.slf4j.Logger;
36 import org.slf4j.LoggerFactory;
37 import org.springframework.util.StringUtils;
39 import reactor.core.publisher.Flux;
40 import reactor.core.publisher.Mono;
43 * Parses the fileReady event and creates an array of FileData containing the information.
45 * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 5/8/18
46 * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
48 public class DmaapConsumerJsonParser {
49 private static final Logger logger = LoggerFactory.getLogger(DmaapConsumerJsonParser.class);
51 private static final String COMMON_EVENT_HEADER = "commonEventHeader";
52 private static final String EVENT_NAME = "eventName";
53 private static final String LAST_EPOCH_MICROSEC = "lastEpochMicrosec";
54 private static final String SOURCE_NAME = "sourceName";
55 private static final String START_EPOCH_MICROSEC = "startEpochMicrosec";
56 private static final String TIME_ZONE_OFFSET = "timeZoneOffset";
58 private static final String EVENT = "event";
59 private static final String NOTIFICATION_FIELDS = "notificationFields";
60 private static final String CHANGE_IDENTIFIER = "changeIdentifier";
61 private static final String CHANGE_TYPE = "changeType";
62 private static final String NOTIFICATION_FIELDS_VERSION = "notificationFieldsVersion";
64 private static final String ARRAY_OF_NAMED_HASH_MAP = "arrayOfNamedHashMap";
65 private static final String NAME = "name";
66 private static final String HASH_MAP = "hashMap";
67 private static final String LOCATION = "location";
68 private static final String COMPRESSION = "compression";
69 private static final String FILE_FORMAT_TYPE = "fileFormatType";
70 private static final String FILE_FORMAT_VERSION = "fileFormatVersion";
72 private static final String FILE_READY_CHANGE_TYPE = "FileReady";
73 private static final String FILE_READY_CHANGE_IDENTIFIER = "PM_MEAS_FILES";
76 * The data types available in the event name.
78 private enum EventNameDataType {
79 PRODUCT_NAME(1), VENDOR_NAME(2);
83 EventNameDataType(int index) {
89 * Extract info from string and create a {@link FileData}.
91 * @param rawMessage - results from DMaaP
92 * @return reactive Mono with an array of FileData
94 public Flux<FileData> getJsonObject(Mono<String> rawMessage) {
95 return rawMessage.flatMap(this::getJsonParserMessage).flatMapMany(this::createJsonConsumerModel);
98 private Mono<JsonElement> getJsonParserMessage(String message) {
99 logger.trace("original message from message router: {}", message);
100 return StringUtils.isEmpty(message) ? Mono.error(new DmaapEmptyResponseException())
101 : Mono.fromSupplier(() -> new JsonParser().parse(message));
104 private Flux<FileData> createJsonConsumerModel(JsonElement jsonElement) {
105 return jsonElement.isJsonObject() ? create(Mono.fromSupplier(jsonElement::getAsJsonObject))
106 : getFileDataFromJsonArray(jsonElement);
109 private Flux<FileData> getFileDataFromJsonArray(JsonElement jsonElement) {
110 return create(Mono.fromCallable(() -> StreamSupport.stream(jsonElement.getAsJsonArray().spliterator(), false)
111 .findFirst().flatMap(this::getJsonObjectFromAnArray).orElseThrow(DmaapEmptyResponseException::new)));
114 public Optional<JsonObject> getJsonObjectFromAnArray(JsonElement element) {
115 logger.trace("starting to getJsonObjectFromAnArray!");
117 return Optional.of(new JsonParser().parse(element.getAsString()).getAsJsonObject());
120 private Flux<FileData> create(Mono<JsonObject> jsonObject) {
121 return jsonObject.flatMapMany(monoJsonP -> !containsNotificationFields(monoJsonP)
122 ? Flux.error(new DmaapNotFoundException("Incorrect JsonObject - missing header. " + jsonObject))
123 : transform(monoJsonP));
126 private Flux<FileData> transform(JsonObject message) {
127 Optional<FileMetaData> fileMetaData = getFileMetaData(message);
128 if (fileMetaData.isPresent()) {
129 JsonObject notificationFields = message.getAsJsonObject(EVENT).getAsJsonObject(NOTIFICATION_FIELDS);
130 JsonArray arrayOfNamedHashMap = notificationFields.getAsJsonArray(ARRAY_OF_NAMED_HASH_MAP);
131 if (arrayOfNamedHashMap != null) {
132 return getAllFileDataFromJson(fileMetaData.get(), arrayOfNamedHashMap);
135 return Flux.error(new DmaapNotFoundException(
136 "Unable to collect file from xNF. Missing arrayOfNamedHashMap in message. " + message));
138 return Flux.error(new DmaapNotFoundException(
139 "Unable to collect file from xNF. FileReady event has incorrect JsonObject"));
142 private Optional<FileMetaData> getFileMetaData(JsonObject message) {
143 List<String> missingValues = new ArrayList<>();
144 JsonObject commonEventHeader = message.getAsJsonObject(EVENT).getAsJsonObject(COMMON_EVENT_HEADER);
145 String eventName = getValueFromJson(commonEventHeader, EVENT_NAME, missingValues);
147 JsonObject notificationFields = message.getAsJsonObject(EVENT).getAsJsonObject(NOTIFICATION_FIELDS);
148 String changeIdentifier = getValueFromJson(notificationFields, CHANGE_IDENTIFIER, missingValues);
149 String changeType = getValueFromJson(notificationFields, CHANGE_TYPE, missingValues);
151 // Just to check that it is in the message. Might be needed in the future if there is a new
153 getValueFromJson(notificationFields, NOTIFICATION_FIELDS_VERSION, missingValues);
156 FileMetaData fileMetaData = ImmutableFileMetaData.builder()
157 .productName(getDataFromEventName(EventNameDataType.PRODUCT_NAME, eventName, missingValues))
158 .vendorName(getDataFromEventName(EventNameDataType.VENDOR_NAME, eventName, missingValues))
159 .lastEpochMicrosec(getValueFromJson(commonEventHeader, LAST_EPOCH_MICROSEC, missingValues))
160 .sourceName(getValueFromJson(commonEventHeader, SOURCE_NAME, missingValues))
161 .startEpochMicrosec(getValueFromJson(commonEventHeader, START_EPOCH_MICROSEC, missingValues))
162 .timeZoneOffset(getValueFromJson(commonEventHeader, TIME_ZONE_OFFSET, missingValues))
163 .changeIdentifier(changeIdentifier)
164 .changeType(changeType)
167 if (missingValues.isEmpty() && isChangeIdentifierCorrect(changeIdentifier) && isChangeTypeCorrect(changeType)) {
168 return Optional.of(fileMetaData);
170 String errorMessage = "Unable to collect file from xNF.";
171 if (!missingValues.isEmpty()) {
172 errorMessage += " Missing data: " + missingValues;
174 if (!isChangeIdentifierCorrect(changeIdentifier) || !isChangeTypeCorrect(changeType)) {
175 errorMessage += " Change identifier or change type is wrong.";
177 errorMessage += " Message: {}";
178 logger.error(errorMessage, message);
179 return Optional.empty();
183 private boolean isChangeTypeCorrect(String changeType) {
184 return FILE_READY_CHANGE_TYPE.equals(changeType);
187 private boolean isChangeIdentifierCorrect(String changeIdentifier) {
188 return FILE_READY_CHANGE_IDENTIFIER.equals(changeIdentifier);
191 private Flux<FileData> getAllFileDataFromJson(FileMetaData fileMetaData, JsonArray arrayOfAdditionalFields) {
192 List<FileData> res = new ArrayList<>();
193 for (int i = 0; i < arrayOfAdditionalFields.size(); i++) {
194 if (arrayOfAdditionalFields.get(i) != null) {
195 JsonObject fileInfo = (JsonObject) arrayOfAdditionalFields.get(i);
196 Optional<FileData> fileData = getFileDataFromJson(fileMetaData, fileInfo);
198 if (fileData.isPresent()) {
199 res.add(fileData.get());
203 return Flux.fromIterable(res);
206 private Optional<FileData> getFileDataFromJson(FileMetaData fileMetaData, JsonObject fileInfo) {
207 logger.trace("starting to getFileDataFromJson!");
209 List<String> missingValues = new ArrayList<>();
210 JsonObject data = fileInfo.getAsJsonObject(HASH_MAP);
213 FileData fileData = ImmutableFileData.builder()
214 .fileMetaData(fileMetaData)
215 .name(getValueFromJson(fileInfo, NAME, missingValues))
216 .fileFormatType(getValueFromJson(data, FILE_FORMAT_TYPE, missingValues))
217 .fileFormatVersion(getValueFromJson(data, FILE_FORMAT_VERSION, missingValues))
218 .location(getValueFromJson(data, LOCATION, missingValues))
219 .compression(getValueFromJson(data, COMPRESSION, missingValues))
222 if (missingValues.isEmpty()) {
223 return Optional.of(fileData);
225 logger.error("Unable to collect file from xNF. File information wrong. Missing data: {} Data: {}",
226 missingValues, fileInfo);
227 return Optional.empty();
231 * Gets data from the event name, defined as:
232 * {DomainAbbreviation}_{productName}-{vendorName}_{Description}, example:
233 * Noti_RnNode-Ericsson_FileReady
235 * @param dataType The type of data to get, {@link DmaapConsumerJsonParser.EventNameDataType}.
236 * @param eventName The event name to get the data from.
237 * @param missingValues List of missing values. The dataType will be added if missing.
238 * @return String of data from event name
240 private String getDataFromEventName(EventNameDataType dataType, String eventName, List<String> missingValues) {
241 String[] eventArray = eventName.split("_|-");
242 if (eventArray.length >= 4) {
243 return eventArray[dataType.index];
245 missingValues.add(dataType.toString());
246 logger.error("Can not get {} from eventName, eventName is not in correct format: {}", dataType, eventName);
251 private String getValueFromJson(JsonObject jsonObject, String jsonKey, List<String> missingValues) {
252 if (jsonObject.has(jsonKey)) {
253 return jsonObject.get(jsonKey).getAsString();
255 missingValues.add(jsonKey);
260 private boolean containsNotificationFields(JsonObject jsonObject) {
261 return jsonObject.has(EVENT) && jsonObject.getAsJsonObject(EVENT).has(NOTIFICATION_FIELDS);