29885f999dbf48ca172f214920c18b24e549fce9
[dcaegen2/collectors/datafile.git] /
1 /*
2  * ============LICENSE_START======================================================================
3  * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
4  * ===============================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
6  * in compliance with the License. You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software distributed under the License
11  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
12  * or implied. See the License for the specific language governing permissions and limitations under
13  * the License.
14  * ============LICENSE_END========================================================================
15  */
16
17 package org.onap.dcaegen2.collectors.datafile.service;
18
19 import com.google.gson.JsonArray;
20 import com.google.gson.JsonElement;
21 import com.google.gson.JsonObject;
22 import com.google.gson.JsonParser;
23
24 import java.util.ArrayList;
25 import java.util.List;
26 import java.util.Optional;
27 import java.util.stream.StreamSupport;
28
29 import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapEmptyResponseException;
30 import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapNotFoundException;
31 import org.onap.dcaegen2.collectors.datafile.model.FileData;
32 import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData;
33 import org.slf4j.Logger;
34 import org.slf4j.LoggerFactory;
35 import org.springframework.util.StringUtils;
36
37 import reactor.core.publisher.Flux;
38 import reactor.core.publisher.Mono;
39
40 /**
41  * Parses the fileReady event and creates an array of FileData containing the information.
42  *
43  * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 5/8/18
44  * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
45  */
46 public class DmaapConsumerJsonParser {
47     private static final Logger logger = LoggerFactory.getLogger(DmaapConsumerJsonParser.class);
48
49     private static final String EVENT = "event";
50     private static final String NOTIFICATION_FIELDS = "notificationFields";
51     private static final String CHANGE_IDENTIFIER = "changeIdentifier";
52     private static final String CHANGE_TYPE = "changeType";
53     private static final String NOTIFICATION_FIELDS_VERSION = "notificationFieldsVersion";
54
55     private static final String ARRAY_OF_NAMED_HASH_MAP = "arrayOfNamedHashMap";
56     private static final String NAME = "name";
57     private static final String HASH_MAP = "hashMap";
58     private static final String LOCATION = "location";
59     private static final String COMPRESSION = "compression";
60     private static final String FILE_FORMAT_TYPE = "fileFormatType";
61     private static final String FILE_FORMAT_VERSION = "fileFormatVersion";
62
63     private static final String FILE_READY_CHANGE_TYPE = "FileReady";
64     private static final String FILE_READY_CHANGE_IDENTIFIER = "PM_MEAS_FILES";
65
66     /**
67      * Extract info from string and create @see {@link FileData}.
68      *
69      * @param rawMessage - results from DMaaP
70      * @return reactive Mono with an array of FileData
71      */
72     public Flux<FileData> getJsonObject(Mono<String> rawMessage) {
73         return rawMessage.flatMap(this::getJsonParserMessage).flatMapMany(this::createJsonConsumerModel);
74     }
75
76     private Mono<JsonElement> getJsonParserMessage(String message) {
77         logger.trace("original message from message router: {}", message);
78         return StringUtils.isEmpty(message) ? Mono.error(new DmaapEmptyResponseException())
79                 : Mono.fromSupplier(() -> new JsonParser().parse(message));
80     }
81
82     private Flux<FileData> createJsonConsumerModel(JsonElement jsonElement) {
83         return jsonElement.isJsonObject() ? create(Mono.fromSupplier(jsonElement::getAsJsonObject))
84                 : getFileDataFromJsonArray(jsonElement);
85     }
86
87     private Flux<FileData> getFileDataFromJsonArray(JsonElement jsonElement) {
88         return create(Mono.fromCallable(() -> StreamSupport.stream(jsonElement.getAsJsonArray().spliterator(), false)
89                 .findFirst().flatMap(this::getJsonObjectFromAnArray).orElseThrow(DmaapEmptyResponseException::new)));
90     }
91
92     public Optional<JsonObject> getJsonObjectFromAnArray(JsonElement element) {
93         logger.trace("starting to getJsonObjectFromAnArray!");
94
95         return Optional.of(new JsonParser().parse(element.getAsString()).getAsJsonObject());
96     }
97
98     private Flux<FileData> create(Mono<JsonObject> jsonObject) {
99         return jsonObject.flatMapMany(monoJsonP -> !containsHeader(monoJsonP)
100                 ? Flux.error(new DmaapNotFoundException("Incorrect JsonObject - missing header"))
101                 : transform(monoJsonP));
102     }
103
104     private Flux<FileData> transform(JsonObject jsonObject) {
105         if (containsHeader(jsonObject, EVENT, NOTIFICATION_FIELDS)) {
106             JsonObject notificationFields = jsonObject.getAsJsonObject(EVENT).getAsJsonObject(NOTIFICATION_FIELDS);
107             String changeIdentifier = getValueFromJson(notificationFields, CHANGE_IDENTIFIER);
108             String changeType = getValueFromJson(notificationFields, CHANGE_TYPE);
109             String notificationFieldsVersion = getValueFromJson(notificationFields, NOTIFICATION_FIELDS_VERSION);
110             JsonArray arrayOfNamedHashMap = notificationFields.getAsJsonArray(ARRAY_OF_NAMED_HASH_MAP);
111             if (isNotificationFieldsHeaderNotEmpty(changeIdentifier, changeType, notificationFieldsVersion)
112                     && arrayOfNamedHashMap != null && isChangeIdentifierCorrect(changeIdentifier)
113                     && isChangeTypeCorrect(changeType)) {
114                 return getAllFileDataFromJson(changeIdentifier, changeType, arrayOfNamedHashMap);
115             }
116
117             return handleJsonError(changeIdentifier, changeType, notificationFieldsVersion, arrayOfNamedHashMap,
118                     jsonObject);
119         }
120         return Flux.error(
121                 new DmaapNotFoundException("FileReady event has incorrect JsonObject - missing header. " + jsonObject));
122     }
123
124     private boolean isChangeTypeCorrect(String changeType) {
125         return FILE_READY_CHANGE_TYPE.equals(changeType);
126     }
127
128     private boolean isChangeIdentifierCorrect(String changeIdentifier) {
129         return FILE_READY_CHANGE_IDENTIFIER.equals(changeIdentifier);
130     }
131
132     private Flux<FileData> getAllFileDataFromJson(String changeIdentifier, String changeType,
133             JsonArray arrayOfAdditionalFields) {
134         List<FileData> res = new ArrayList<>();
135         for (int i = 0; i < arrayOfAdditionalFields.size(); i++) {
136             if (arrayOfAdditionalFields.get(i) != null) {
137                 JsonObject fileInfo = (JsonObject) arrayOfAdditionalFields.get(i);
138                 FileData fileData = getFileDataFromJson(fileInfo, changeIdentifier, changeType);
139
140                 if (fileData != null) {
141                     res.add(fileData);
142                 } else {
143                     logger.error("Unable to collect file from xNF. File information wrong. Data: {}", fileInfo);
144                 }
145             }
146         }
147         return Flux.fromIterable(res);
148     }
149
150     private FileData getFileDataFromJson(JsonObject fileInfo, String changeIdentifier, String changeType) {
151         logger.trace("starting to getFileDataFromJson!");
152
153         FileData fileData = null;
154
155         String name = getValueFromJson(fileInfo, NAME);
156         JsonObject data = fileInfo.getAsJsonObject(HASH_MAP);
157         String fileFormatType = getValueFromJson(data, FILE_FORMAT_TYPE);
158         String fileFormatVersion = getValueFromJson(data, FILE_FORMAT_VERSION);
159         String location = getValueFromJson(data, LOCATION);
160         String compression = getValueFromJson(data, COMPRESSION);
161
162         if (isFileFormatFieldsNotEmpty(fileFormatVersion, fileFormatType)
163                 && isNameAndLocationAndCompressionNotEmpty(name, location, compression)) {
164             fileData = ImmutableFileData.builder().name(name).changeIdentifier(changeIdentifier).changeType(changeType)
165                     .location(location).compression(compression).fileFormatType(fileFormatType)
166                     .fileFormatVersion(fileFormatVersion).build();
167         }
168         return fileData;
169     }
170
171     private String getValueFromJson(JsonObject jsonObject, String jsonKey) {
172         return jsonObject.has(jsonKey) ? jsonObject.get(jsonKey).getAsString() : "";
173     }
174
175     private boolean isNotificationFieldsHeaderNotEmpty(String changeIdentifier, String changeType,
176             String notificationFieldsVersion) {
177         return isStringIsNotNullAndNotEmpty(changeIdentifier) && isStringIsNotNullAndNotEmpty(changeType)
178                 && isStringIsNotNullAndNotEmpty(notificationFieldsVersion);
179     }
180
181     private boolean isFileFormatFieldsNotEmpty(String fileFormatVersion, String fileFormatType) {
182         return isStringIsNotNullAndNotEmpty(fileFormatVersion) && isStringIsNotNullAndNotEmpty(fileFormatType);
183     }
184
185     private boolean isNameAndLocationAndCompressionNotEmpty(String name, String location, String compression) {
186         return isStringIsNotNullAndNotEmpty(name) && isStringIsNotNullAndNotEmpty(location)
187                 && isStringIsNotNullAndNotEmpty(compression);
188     }
189
190     private boolean containsHeader(JsonObject jsonObject) {
191         return jsonObject.has(EVENT) && jsonObject.getAsJsonObject(EVENT).has(NOTIFICATION_FIELDS);
192     }
193
194     private boolean containsHeader(JsonObject jsonObject, String topHeader, String header) {
195         return jsonObject.has(topHeader) && jsonObject.getAsJsonObject(topHeader).has(header);
196     }
197
198     private boolean isStringIsNotNullAndNotEmpty(String string) {
199         return string != null && !string.isEmpty();
200     }
201
202     private Flux<FileData> handleJsonError(String changeIdentifier, String changeType, String notificationFieldsVersion,
203             JsonArray arrayOfNamedHashMap, JsonObject jsonObject) {
204         String errorMessage = "FileReady event information is incomplete or incorrect!\n";
205         if (!isNotificationFieldsHeaderNotEmpty(changeIdentifier, changeType, notificationFieldsVersion)) {
206             errorMessage += "header is missing.\n";
207         }
208         if (arrayOfNamedHashMap == null) {
209             errorMessage += "arrayOfNamedHashMap is missing.\n";
210         }
211         if (!isChangeIdentifierCorrect(changeIdentifier)) {
212             errorMessage += "changeIdentifier is incorrect.\n";
213         }
214         if (!isChangeTypeCorrect(changeType)) {
215             errorMessage += "changeType is incorrect.\n";
216         }
217         return Flux.error(new DmaapNotFoundException(errorMessage + jsonObject));
218     }
219 }