Sonar major issue
[dcaegen2/analytics/tca.git] / dcae-analytics-cdap-plugins / src / main / java / org / onap / dcae / apod / analytics / cdap / plugins / utils / CDAPPluginUtils.java
1 /*
2  * ===============================LICENSE_START======================================
3  *  dcae-analytics
4  * ================================================================================
5  *    Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *   You may obtain a copy of the License at
10  *
11  *          http://www.apache.org/licenses/LICENSE-2.0
12  *
13  *  Unless required by applicable law or agreed to in writing, software
14  *  distributed under the License is distributed on an "AS IS" BASIS,
15  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  *  See the License for the specific language governing permissions and
17  *  limitations under the License.
18  *  ============================LICENSE_END===========================================
19  */
20
21 package org.onap.dcae.apod.analytics.cdap.plugins.utils;
22
23 import co.cask.cdap.api.data.format.StructuredRecord;
24 import co.cask.cdap.api.data.schema.Schema;
25 import co.cask.cdap.etl.api.PipelineConfigurer;
26 import com.google.common.base.Function;
27 import com.google.common.base.Splitter;
28 import com.google.common.collect.Lists;
29 import com.google.common.collect.Maps;
30 import org.apache.commons.lang3.StringUtils;
31 import org.onap.dcae.apod.analytics.cdap.common.exception.CDAPSettingsException;
32 import org.onap.dcae.apod.analytics.cdap.plugins.domain.schema.dmaap.DMaaPSourceOutputSchema;
33 import org.onap.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException;
34 import org.onap.dcae.apod.analytics.model.util.AnalyticsModelJsonUtils;
35 import org.slf4j.Logger;
36 import org.slf4j.LoggerFactory;
37
38 import java.io.IOException;
39 import java.util.Arrays;
40 import java.util.LinkedList;
41 import java.util.List;
42 import java.util.Map;
43
44 import javax.annotation.Nonnull;
45 import javax.annotation.Nullable;
46
47 /**
48  * @author Rajiv Singla . Creation Date: 1/26/2017.
49  */
50 public abstract class CDAPPluginUtils extends AnalyticsModelJsonUtils {
51
52     private static final Logger LOG = LoggerFactory.getLogger(CDAPPluginUtils.class);
53
54     public static final Function<Schema, Schema.Type> SCHEMA_TO_TYPE_FUNCTION = new Function<Schema, Schema.Type>() {
55         @Override
56         public Schema.Type apply(Schema schema) {
57             return schema.getType();
58         }
59     };
60
61
62
63     private CDAPPluginUtils() {
64         // private constructor
65     }
66
67     /**
68      * Validates if CDAP Schema contains expected fields
69      *
70      * @param schema schema that need to be validated
71      * @param expectedFields fields that are expected to be in the schema
72      */
73
74     public static void validateSchemaContainsFields(@Nullable final Schema schema, final String... expectedFields) {
75
76         LOG.debug("Validating schema:{} contains expected fields:{}", schema, Arrays.toString(expectedFields));
77
78         if (schema == null) {
79             // If input schema is null then no validation possible
80             LOG.warn("Input Schema is null. No validation possible");
81         } else {
82             // Check if expected fields are indeed present in the schema
83             for (String expectedField : expectedFields) {
84                 final Schema.Field schemaField = schema.getField(expectedField);
85                 if (schemaField == null) {
86                     final String errorMessage = String.format(
87                             "Unable to find expected field: %s, in schema: %s", expectedField, schema);
88                     throw new CDAPSettingsException(errorMessage, LOG, new IllegalArgumentException(errorMessage));
89                 }
90             }
91             LOG.debug("Successfully validated schema:{}, contains expected fields:{}", schema,
92                     Arrays.toString(expectedFields));
93         }
94     }
95
96
97     /**
98      * Creates a new Structured Record containing DMaaP MR fetched message
99      *
100      * @param message DMaaP MR fetch message
101      *
102      * @return Structured record containing DMaaP MR Message
103      */
104     public static StructuredRecord createDMaaPMRResponseStructuredRecord(final String message) {
105         StructuredRecord.Builder recordBuilder = StructuredRecord.builder(DMaaPSourceOutputSchema.getSchema());
106         recordBuilder
107                 .set(DMaaPSourceOutputSchema.TIMESTAMP.getSchemaColumnName(), System.nanoTime())
108                 .set(DMaaPSourceOutputSchema.RESPONSE_CODE.getSchemaColumnName(), 200)
109                 .set(DMaaPSourceOutputSchema.RESPONSE_MESSAGE.getSchemaColumnName(), "OK")
110                 .set(DMaaPSourceOutputSchema.FETCHED_MESSAGE.getSchemaColumnName(), message);
111         return recordBuilder.build();
112     }
113
114
115     /**
116      * Creates output StructuredRecord Builder which has copied values from input StructuredRecord
117      *
118      * @param outputSchema output Schema
119      * @param inputStructuredRecord input Structured Record
120      *
121      * @return output Structured Record builder with pre populated values from input structured record
122      */
123     public static StructuredRecord.Builder createOutputStructuredRecordBuilder(
124             @Nonnull final Schema outputSchema,
125             @Nonnull final StructuredRecord inputStructuredRecord) {
126
127         // Get input structured Record Schema
128         final Schema inputSchema = inputStructuredRecord.getSchema();
129         // Create new instance of output Structured Record Builder from output Schema
130         final StructuredRecord.Builder outputStructuredRecordBuilder = StructuredRecord.builder(outputSchema);
131
132         // iterate over input fields and if output schema has field with same name copy the value to out record builder
133         for (Schema.Field inputField : inputSchema.getFields()) {
134             final String inputFieldName = inputField.getName();
135             if (outputSchema.getField(inputFieldName) != null) {
136                 outputStructuredRecordBuilder.set(inputFieldName, inputStructuredRecord.get(inputFieldName));
137             }
138         }
139
140         return outputStructuredRecordBuilder;
141     }
142
143
144     /**
145      * Adds Field value to StructuredRecord Builder if schema contains that field Name
146      *
147      * @param structuredRecordBuilder structured record builder
148      * @param structuredRecordSchema schema for structured record builder
149      * @param fieldName field name
150      * @param fieldValue field value
151      *
152      * @return structured record builder with populated field name and value if schema contains field name
153      */
154     public static StructuredRecord.Builder addFieldValueToStructuredRecordBuilder(
155             @Nonnull final StructuredRecord.Builder structuredRecordBuilder,
156             @Nonnull final Schema structuredRecordSchema,
157             @Nonnull final String fieldName,
158             final Object fieldValue) {
159
160         // check if schema contains field Name
161         if (structuredRecordSchema.getField(fieldName) != null) {
162             structuredRecordBuilder.set(fieldName, fieldValue);
163         } else {
164             LOG.info("Unable to populate value for field Name: {} with field value: {}. " +
165                             "Schema Fields: {} does not contain field name: {}",
166                     fieldName, fieldValue, structuredRecordSchema.getFields(), fieldName);
167         }
168
169         return structuredRecordBuilder;
170     }
171
172
173     /**
174      * Validates that given schema String has fieldName of expected type. If field does not exist in given schema
175      * then validation will pass with warning. If field does exist in given schema then this validation will return
176      * true if field type is same as expected type else false
177      *
178      * @param schemaString CDAP Plugin output or input schema string
179      * @param fieldName field name
180      * @param expectedFieldType expected schema field type
181      *
182      * @return true if field type matches expected field type else false. If field does not exist in
183      * give schema validation will pass but will generate a warning message
184      */
185     public static boolean validateSchemaFieldType(@Nonnull final String schemaString,
186                                                   @Nonnull final String fieldName,
187                                                   @Nonnull final Schema.Type expectedFieldType) {
188
189         try {
190             // parse given schema String
191             final Schema outputSchema = Schema.parseJson(schemaString);
192             final Schema.Field schemaField = outputSchema.getField(fieldName);
193
194             // if given schema does contain field then validated fieldName type
195             if (schemaField != null) {
196
197                 final List<Schema> schemas = new LinkedList<>();
198
199                 // if it is a union type then grab all union schemas
200                 if (outputSchema.getField(fieldName).getSchema().getType() == Schema.Type.UNION) {
201                     final List<Schema> unionFieldSchemas =
202                             outputSchema.getField(fieldName).getSchema().getUnionSchemas();
203                     schemas.addAll(unionFieldSchemas);
204                 } else {
205                     // if not union type the just get the field schema
206                     final Schema fieldSchema = outputSchema.getField(fieldName).getSchema();
207                     schemas.add(fieldSchema);
208                 }
209
210                 // get all schema types
211                 final List<Schema.Type> fieldTypes =
212                         Lists.transform(schemas, CDAPPluginUtils.SCHEMA_TO_TYPE_FUNCTION);
213
214                 // if all schema types does not contain expected field type then return false
215                 if (!fieldTypes.contains(expectedFieldType)) {
216                     LOG.error("Validation failed for fieldName: {} is NOT of expected Type: {} in schema: {}",
217                             fieldName, expectedFieldType, outputSchema);
218                     return false;
219                 }
220
221                 // field type validation passed
222                 LOG.debug("Successfully validated fieldName: {} is of expected Type: {}",
223                         fieldName, expectedFieldType);
224
225                 return true;
226
227             } else {
228
229                 // if field does not exist then the validation will pass but will generate warning message
230                 LOG.warn("Validation of field type not possible. Field name: {} does not exist in schema: {}",
231                         fieldName, outputSchema);
232                 return true;
233             }
234
235         } catch (IOException e) {
236             final String errorMessage =
237                     String.format("Unable to parse schema: %s for field type validation. " +
238                                     "Field Name: %s, Expected Field Type: %s Exception: %s",
239                             schemaString, fieldName, expectedFieldType, e);
240             throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);
241         }
242
243     }
244
245
246     /**
247      * Parses provided schema String as Schema object and set it as output Schema format
248      *
249      * @param pipelineConfigurer plugin pipeline configurer
250      * @param schemaString schema String to be set as output schema
251      */
252     public static void setOutputSchema(final PipelineConfigurer pipelineConfigurer, final String schemaString) {
253         try {
254             final Schema outputSchema = Schema.parseJson(schemaString);
255             pipelineConfigurer.getStageConfigurer().setOutputSchema(outputSchema);
256         } catch (IOException e) {
257             final String errorMessage = String.format(
258                     "Schema specified is not a valid JSON. Schema String: %s, Exception: %s", schemaString, e);
259             throw new CDAPSettingsException(errorMessage, LOG, new IllegalArgumentException(errorMessage));
260         }
261     }
262
263
264     /**
265      * Parses incoming plugin config mapping to key value map. If any of the key value map is blank an Illegal Argument
266      * exception will be thrown
267      *
268      * @param mappingFieldString field Mapping String
269      *
270      * @return map containing mapping key values
271      */
272     public static Map<String, String> extractFieldMappings(final String mappingFieldString) {
273         final Map<String, String> fieldMappings = Maps.newHashMap();
274         if (StringUtils.isNotBlank(mappingFieldString)) {
275             final Splitter commaSplitter = Splitter.on(",");
276             for (String fieldMapping : commaSplitter.split(mappingFieldString)) {
277                 final String[] keyValueMappings = fieldMapping.split(":");
278                 if (keyValueMappings.length != 2 ||
279                         StringUtils.isBlank(keyValueMappings[0]) ||
280                         StringUtils.isBlank(keyValueMappings[1])) {
281                     final String errorMessage = "Field Mapping key or value is Blank. All field mappings must " +
282                             "be present in mappings: " + mappingFieldString;
283                     throw new DCAEAnalyticsRuntimeException(
284                             errorMessage, LOG, new IllegalArgumentException(errorMessage));
285                 }
286                 fieldMappings.put(keyValueMappings[0].trim(), keyValueMappings[1].trim());
287             }
288         }
289         return fieldMappings;
290     }
291
292
293
294
295 }