e4afd3ae7865e6dcb5a4fef8688575838e0eff12
[dcaegen2/collectors/datafile.git] /
1 /*
2  * ============LICENSE_START======================================================================
3  * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
4  * ===============================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
6  * in compliance with the License. You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software distributed under the License
11  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
12  * or implied. See the License for the specific language governing permissions and limitations under
13  * the License.
14  * ============LICENSE_END========================================================================
15  */
16
17 package org.onap.dcaegen2.collectors.datafile.service;
18
19 import com.google.gson.JsonArray;
20 import com.google.gson.JsonElement;
21 import com.google.gson.JsonObject;
22 import com.google.gson.JsonParser;
23
24 import java.util.ArrayList;
25 import java.util.List;
26 import java.util.Optional;
27 import java.util.stream.StreamSupport;
28
29 import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapEmptyResponseException;
30 import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapNotFoundException;
31 import org.slf4j.Logger;
32 import org.slf4j.LoggerFactory;
33 import org.onap.dcaegen2.collectors.datafile.model.FileData;
34 import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData;
35 import org.springframework.util.StringUtils;
36
37 import reactor.core.publisher.Mono;
38
39 /**
40  * Parses the fileReady event and creates an array of FileData containing the information.
41  *
42  * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 5/8/18
43  * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
44  */
45 public class DmaapConsumerJsonParser {
46
47     private static final Logger logger = LoggerFactory.getLogger(DmaapConsumerJsonParser.class);
48
49     private static final String EVENT = "event";
50     private static final String NOTIFICATION_FIELDS = "notificationFields";
51     private static final String CHANGE_IDENTIFIER = "changeIdentifier";
52     private static final String CHANGE_TYPE = "changeType";
53     private static final String NOTIFICATION_FIELDS_VERSION = "notificationFieldsVersion";
54
55     private static final String ARRAY_OF_NAMED_HASH_MAP = "arrayOfNamedHashMap";
56     private static final String NAME = "name";
57     private static final String HASH_MAP = "hashMap";
58     private static final String LOCATION = "location";
59     private static final String COMPRESSION = "compression";
60     private static final String FILE_FORMAT_TYPE = "fileFormatType";
61     private static final String FILE_FORMAT_VERSION = "fileFormatVersion";
62
63     /**
64      * Extract info from string and create @see {@link FileData}.
65      *
66      * @param rawMessage - results from DMaaP
67      * @return reactive Mono with an array of FileData
68      */
69     public Mono<List<FileData>> getJsonObject(Mono<String> rawMessage) {
70         return rawMessage.flatMap(this::getJsonParserMessage).flatMap(this::createJsonConsumerModel);
71     }
72
73     private Mono<JsonElement> getJsonParserMessage(String message) {
74         return StringUtils.isEmpty(message) ? Mono.error(new DmaapEmptyResponseException())
75             : Mono.fromSupplier(() -> new JsonParser().parse(message));
76     }
77
78     private Mono<List<FileData>> createJsonConsumerModel(JsonElement jsonElement) {
79         return jsonElement.isJsonObject() ? create(Mono.fromSupplier(jsonElement::getAsJsonObject))
80             : getFileDataFromJsonArray(jsonElement);
81     }
82
83     private Mono<List<FileData>> getFileDataFromJsonArray(JsonElement jsonElement) {
84         return create(Mono.fromCallable(() -> StreamSupport.stream(jsonElement.getAsJsonArray().spliterator(), false)
85             .findFirst().flatMap(this::getJsonObjectFromAnArray).orElseThrow(DmaapEmptyResponseException::new)));
86     }
87
88     public Optional<JsonObject> getJsonObjectFromAnArray(JsonElement element) {
89         return Optional.of(new JsonParser().parse(element.getAsString()).getAsJsonObject());
90     }
91
92     private Mono<List<FileData>> create(Mono<JsonObject> jsonObject) {
93         return jsonObject.flatMap(monoJsonP -> !containsHeader(monoJsonP)
94             ? Mono.error(new DmaapNotFoundException("Incorrect JsonObject - missing header"))
95             : transform(monoJsonP));
96     }
97
98     private Mono<List<FileData>> transform(JsonObject jsonObject) {
99         if (containsHeader(jsonObject, EVENT, NOTIFICATION_FIELDS)) {
100             JsonObject notificationFields = jsonObject.getAsJsonObject(EVENT).getAsJsonObject(NOTIFICATION_FIELDS);
101             String changeIdentifier = getValueFromJson(notificationFields, CHANGE_IDENTIFIER);
102             String changeType = getValueFromJson(notificationFields, CHANGE_TYPE);
103             String notificationFieldsVersion = getValueFromJson(notificationFields, NOTIFICATION_FIELDS_VERSION);
104             JsonArray arrayOfNamedHashMap = notificationFields.getAsJsonArray(ARRAY_OF_NAMED_HASH_MAP);
105
106             if (isNotificationFieldsHeaderNotEmpty(changeIdentifier, changeType, notificationFieldsVersion)
107                 && arrayOfNamedHashMap != null) {
108                 Mono<List<FileData>> res = getAllFileDataFromJson(changeIdentifier, changeType, arrayOfNamedHashMap);
109                 return res;
110             }
111
112             if (!isNotificationFieldsHeaderNotEmpty(changeIdentifier, changeType, notificationFieldsVersion)) {
113                 return Mono.error(
114                     new DmaapNotFoundException("FileReady event header is missing information. " + jsonObject));
115             } else if (arrayOfNamedHashMap != null) {
116                 return Mono.error(
117                     new DmaapNotFoundException("FileReady event arrayOfNamedHashMap is missing. " + jsonObject));
118             }
119             return Mono.error(
120                 new DmaapNotFoundException("FileReady event does not contain correct information. " + jsonObject));
121         }
122         return Mono.error(
123             new DmaapNotFoundException("FileReady event has incorrect JsonObject - missing header. " + jsonObject));
124
125     }
126
127     private Mono<List<FileData>> getAllFileDataFromJson(String changeIdentifier, String changeType,
128         JsonArray arrayOfAdditionalFields) {
129         List<FileData> res = new ArrayList<>();
130         for (int i = 0; i < arrayOfAdditionalFields.size(); i++) {
131             if (arrayOfAdditionalFields.get(i) != null) {
132                 JsonObject fileInfo = (JsonObject) arrayOfAdditionalFields.get(i);
133                 FileData fileData = getFileDataFromJson(fileInfo, changeIdentifier, changeType);
134
135                 if (fileData != null) {
136                     res.add(fileData);
137                 } else {
138                     logger.error("Unable to collect file from xNF. File information wrong. " + fileInfo);
139                 }
140             }
141         }
142         return Mono.just(res);
143     }
144
145     private FileData getFileDataFromJson(JsonObject fileInfo, String changeIdentifier, String changeType) {
146         FileData fileData = null;
147
148         String name = getValueFromJson(fileInfo, NAME);
149         JsonObject data = fileInfo.getAsJsonObject(HASH_MAP);
150         String fileFormatType = getValueFromJson(data, FILE_FORMAT_TYPE);
151         String fileFormatVersion = getValueFromJson(data, FILE_FORMAT_VERSION);
152         String location = getValueFromJson(data, LOCATION);
153         String compression = getValueFromJson(data, COMPRESSION);
154
155         if (isFileFormatFieldsNotEmpty(fileFormatVersion, fileFormatType)
156             && isNameAndLocationAndCompressionNotEmpty(name, location, compression)) {
157             fileData = ImmutableFileData.builder().changeIdentifier(changeIdentifier).changeType(changeType)
158                 .location(location).compression(compression).fileFormatType(fileFormatType)
159                 .fileFormatVersion(fileFormatVersion).build();
160         }
161         return fileData;
162     }
163
164     private String getValueFromJson(JsonObject jsonObject, String jsonKey) {
165         return jsonObject.has(jsonKey) ? jsonObject.get(jsonKey).getAsString() : "";
166     }
167
168     private boolean isNotificationFieldsHeaderNotEmpty(String changeIdentifier, String changeType,
169         String notificationFieldsVersion) {
170         return ((changeIdentifier != null && !changeIdentifier.isEmpty())
171             && (changeType != null && !changeType.isEmpty())
172             && (notificationFieldsVersion != null && !notificationFieldsVersion.isEmpty()));
173     }
174
175     private boolean isFileFormatFieldsNotEmpty(String fileFormatVersion, String fileFormatType) {
176         return ((fileFormatVersion != null && !fileFormatVersion.isEmpty())
177             && (fileFormatType != null && !fileFormatType.isEmpty()));
178     }
179
180     private boolean isNameAndLocationAndCompressionNotEmpty(String name, String location, String compression) {
181         return (name != null && !name.isEmpty()) && (location != null && !location.isEmpty())
182             && (compression != null && !compression.isEmpty());
183     }
184
185     private boolean containsHeader(JsonObject jsonObject) {
186         return jsonObject.has(EVENT) && jsonObject.getAsJsonObject(EVENT).has(NOTIFICATION_FIELDS);
187     }
188
189     private boolean containsHeader(JsonObject jsonObject, String topHeader, String header) {
190         return jsonObject.has(topHeader) && jsonObject.getAsJsonObject(topHeader).has(header);
191     }
192 }