98f3a72a9abdd67648559399399e439aff4cdc89
[dcaegen2/collectors/datafile.git] /
1 /*
2  * ============LICENSE_START======================================================================
3  * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
4  * ===============================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
6  * in compliance with the License. You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software distributed under the License
11  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
12  * or implied. See the License for the specific language governing permissions and limitations under
13  * the License.
14  * ============LICENSE_END========================================================================
15  */
16
17 package org.onap.dcaegen2.collectors.datafile.service;
18
19 import com.google.gson.JsonArray;
20 import com.google.gson.JsonElement;
21 import com.google.gson.JsonObject;
22 import com.google.gson.JsonParser;
23
24 import java.util.ArrayList;
25 import java.util.List;
26 import java.util.Optional;
27 import java.util.stream.StreamSupport;
28
29 import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapEmptyResponseException;
30 import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapNotFoundException;
31 import org.springframework.util.StringUtils;
32
33 import reactor.core.publisher.Mono;
34
35 /**
36  * Parses the fileReady event and creates an array of FileData containing the information.
37  *
38  * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 5/8/18
39  * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
40  */
41 public class DmaapConsumerJsonParser {
42
43     private static final String EVENT = "event";
44     private static final String NOTIFICATION_FIELDS = "notificationFields";
45     private static final String CHANGE_IDENTIFIER = "changeIdentifier";
46     private static final String CHANGE_TYPE = "changeType";
47     private static final String NOTIFICATION_FIELDS_VERSION = "notificationFieldsVersion";
48
49     private static final String LOCATION = "location";
50     private static final String COMPRESSION = "compression";
51     private static final String FILE_FORMAT_TYPE = "fileFormatType";
52     private static final String FILE_FORMAT_VERSION = "fileFormatVersion";
53
54     /**
55      * Extract info from string and create @see
56      * {@link org.onap.dcaegen2.collectors.datafile.service.FileData}.
57      *
58      * @param monoMessage - results from DMaaP
59      * @return reactive Mono with an array of FileData
60      */
61     public Mono<List<FileData>> getJsonObject(Mono<String> monoMessage) {
62         return monoMessage.flatMap(this::getJsonParserMessage).flatMap(this::createJsonConsumerModel);
63     }
64
65     private Mono<JsonElement> getJsonParserMessage(String message) {
66         return StringUtils.isEmpty(message) ? Mono.error(new DmaapEmptyResponseException())
67                 : Mono.fromSupplier(() -> new JsonParser().parse(message));
68     }
69
70     private Mono<List<FileData>> createJsonConsumerModel(JsonElement jsonElement) {
71         return jsonElement.isJsonObject() ? create(Mono.fromSupplier(jsonElement::getAsJsonObject))
72                 : getFileDataFromJsonArray(jsonElement);
73     }
74
75     private Mono<List<FileData>> getFileDataFromJsonArray(JsonElement jsonElement) {
76         return create(Mono.fromCallable(() -> StreamSupport.stream(jsonElement.getAsJsonArray().spliterator(), false)
77                 .findFirst().flatMap(this::getJsonObjectFromAnArray).orElseThrow(DmaapEmptyResponseException::new)));
78     }
79
80     public Optional<JsonObject> getJsonObjectFromAnArray(JsonElement element) {
81         return Optional.of(new JsonParser().parse(element.getAsString()).getAsJsonObject());
82     }
83
84     private Mono<List<FileData>> create(Mono<JsonObject> jsonObject) {
85         return jsonObject.flatMap(monoJsonP -> !containsHeader(monoJsonP)
86                 ? Mono.error(new DmaapNotFoundException("Incorrect JsonObject - missing header"))
87                 : transform(monoJsonP));
88     }
89
90     private Mono<List<FileData>> transform(JsonObject jsonObject) {
91         if (containsHeader(jsonObject, EVENT, NOTIFICATION_FIELDS)) {
92             JsonObject notificationFields = jsonObject.getAsJsonObject(EVENT).getAsJsonObject(NOTIFICATION_FIELDS);
93             String changeIdentifier = getValueFromJson(notificationFields, CHANGE_IDENTIFIER);
94             String changeType = getValueFromJson(notificationFields, CHANGE_TYPE);
95             String notificationFieldsVersion = getValueFromJson(notificationFields, NOTIFICATION_FIELDS_VERSION);
96             JsonArray arrayOfAdditionalFields = notificationFields.getAsJsonArray("arrayOfAdditionalFields");
97
98             if (isNotificationFieldsHeaderNotEmpty(changeIdentifier, changeType, notificationFieldsVersion)
99                     && arrayOfAdditionalFields != null) {
100                 Mono<List<FileData>> res =
101                         getFileDataFromJson(changeIdentifier, changeType, arrayOfAdditionalFields);
102                 return res;
103             }
104
105             if (!isNotificationFieldsHeaderNotEmpty(changeIdentifier, changeType, notificationFieldsVersion)) {
106                 return Mono.error(
107                         new DmaapNotFoundException("FileReady event header is missing information. " + jsonObject));
108             } else if (arrayOfAdditionalFields != null) {
109                 return Mono.error(new DmaapNotFoundException(
110                         "FileReady event arrayOfAdditionalFields is missing. " + jsonObject));
111             }
112             return Mono.error(
113                     new DmaapNotFoundException("FileReady event does not contain correct information. " + jsonObject));
114         }
115         return Mono.error(
116                 new DmaapNotFoundException("FileReady event has incorrect JsonObject - missing header. " + jsonObject));
117
118     }
119
120     private Mono<List<FileData>> getFileDataFromJson(String changeIdentifier, String changeType,
121             JsonArray arrayOfAdditionalFields) {
122         List<FileData> res = new ArrayList<>();
123         for (int i = 0; i < arrayOfAdditionalFields.size(); i++) {
124             if (arrayOfAdditionalFields.get(i) != null) {
125                 JsonObject fileInfo = (JsonObject) arrayOfAdditionalFields.get(i);
126                 String fileFormatType = getValueFromJson(fileInfo, FILE_FORMAT_TYPE);
127                 String fileFormatVersion = getValueFromJson(fileInfo, FILE_FORMAT_VERSION);
128                 String location = getValueFromJson(fileInfo, LOCATION);
129                 String compression = getValueFromJson(fileInfo, COMPRESSION);
130                 if (isFileFormatFieldsNotEmpty(fileFormatVersion, fileFormatType)
131                         && isLocationAndCompressionNotEmpty(location, compression)) {
132                     res.add(ImmutableFileData.builder().changeIdentifier(changeIdentifier).changeType(changeType)
133                             .location(location).compression(compression).fileFormatType(fileFormatType)
134                             .fileFormatVersion(fileFormatVersion).build());
135                 } else {
136                     return Mono.error(new DmaapNotFoundException(
137                             "FileReady event does not contain correct file format information. " + fileInfo));
138                 }
139             }
140         }
141         return Mono.just(res);
142     }
143
144     private String getValueFromJson(JsonObject jsonObject, String jsonKey) {
145         return jsonObject.has(jsonKey) ? jsonObject.get(jsonKey).getAsString() : "";
146     }
147
148     private boolean isNotificationFieldsHeaderNotEmpty(String changeIdentifier, String changeType,
149             String notificationFieldsVersion) {
150         return ((changeIdentifier != null && !changeIdentifier.isEmpty())
151                 && (changeType != null && !changeType.isEmpty())
152                 && (notificationFieldsVersion != null && !notificationFieldsVersion.isEmpty()));
153     }
154
155     private boolean isFileFormatFieldsNotEmpty(String fileFormatVersion, String fileFormatType) {
156         return ((fileFormatVersion != null && !fileFormatVersion.isEmpty())
157                 && (fileFormatType != null && !fileFormatType.isEmpty()));
158     }
159
160     private boolean isLocationAndCompressionNotEmpty(String location, String compression) {
161         return (location != null && !location.isEmpty()) && (compression != null && !compression.isEmpty());
162     }
163
164     private boolean containsHeader(JsonObject jsonObject) {
165         return jsonObject.has(EVENT) && jsonObject.getAsJsonObject(EVENT).has(NOTIFICATION_FIELDS);
166     }
167
168     private boolean containsHeader(JsonObject jsonObject, String topHeader, String header) {
169         return jsonObject.has(topHeader) && jsonObject.getAsJsonObject(topHeader).has(header);
170     }
171 }