Sonar Critical Fix
[dcaegen2/analytics/tca.git] / dcae-analytics-cdap-plugins / src / main / java / org / openecomp / dcae / apod / analytics / cdap / plugins / utils / CDAPPluginUtils.java
1 /*\r
2  * ===============================LICENSE_START======================================\r
3  *  dcae-analytics\r
4  * ================================================================================\r
5  *    Copyright © 2017 AT&T Intellectual Property. All rights reserved.\r
6  * ================================================================================\r
7  *  Licensed under the Apache License, Version 2.0 (the "License");\r
8  *  you may not use this file except in compliance with the License.\r
9  *   You may obtain a copy of the License at\r
10  *\r
11  *          http://www.apache.org/licenses/LICENSE-2.0\r
12  *\r
13  *  Unless required by applicable law or agreed to in writing, software\r
14  *  distributed under the License is distributed on an "AS IS" BASIS,\r
15  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
16  *  See the License for the specific language governing permissions and\r
17  *  limitations under the License.\r
18  *  ============================LICENSE_END===========================================\r
19  */\r
20 \r
21 package org.openecomp.dcae.apod.analytics.cdap.plugins.utils;\r
22 \r
23 import co.cask.cdap.api.data.format.StructuredRecord;\r
24 import co.cask.cdap.api.data.schema.Schema;\r
25 import co.cask.cdap.etl.api.PipelineConfigurer;\r
26 import com.google.common.base.Function;\r
27 import com.google.common.base.Splitter;\r
28 import com.google.common.collect.Lists;\r
29 import com.google.common.collect.Maps;\r
30 import org.apache.commons.lang3.StringUtils;\r
31 import org.openecomp.dcae.apod.analytics.cdap.common.exception.CDAPSettingsException;\r
32 import org.openecomp.dcae.apod.analytics.cdap.plugins.domain.schema.dmaap.DMaaPSourceOutputSchema;\r
33 import org.openecomp.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException;\r
34 import org.openecomp.dcae.apod.analytics.model.util.AnalyticsModelJsonUtils;\r
35 import org.slf4j.Logger;\r
36 import org.slf4j.LoggerFactory;\r
37 \r
38 import java.io.IOException;\r
39 import java.util.Arrays;\r
40 import java.util.LinkedList;\r
41 import java.util.List;\r
42 import java.util.Map;\r
43 \r
44 import javax.annotation.Nonnull;\r
45 import javax.annotation.Nullable;\r
46 \r
47 /**\r
48  * @author Rajiv Singla . Creation Date: 1/26/2017.\r
49  */\r
50 public abstract class CDAPPluginUtils extends AnalyticsModelJsonUtils {\r
51 \r
52     private static final Logger LOG = LoggerFactory.getLogger(CDAPPluginUtils.class);\r
53 \r
54     public static final Function<Schema, Schema.Type> SCHEMA_TO_TYPE_FUNCTION = new Function<Schema, Schema.Type>() {\r
55         @Override\r
56         public Schema.Type apply(@Nonnull Schema schema) {\r
57             return schema.getType();\r
58         }\r
59     };\r
60 \r
61 \r
62 \r
63     private CDAPPluginUtils() {\r
64         // private constructor\r
65     }\r
66 \r
67     /**\r
68      * Validates if CDAP Schema contains expected fields\r
69      *\r
70      * @param schema schema that need to be validated\r
71      * @param expectedFields fields that are expected to be in the schema\r
72      */\r
73 \r
74     public static void validateSchemaContainsFields(@Nullable final Schema schema, final String... expectedFields) {\r
75 \r
76         LOG.debug("Validating schema:{} contains expected fields:{}", schema, Arrays.toString(expectedFields));\r
77 \r
78         if (schema == null) {\r
79             // If input schema is null then no validation possible\r
80             LOG.warn("Input Schema is null. No validation possible");\r
81         } else {\r
82             // Check if expected fields are indeed present in the schema\r
83             for (String expectedField : expectedFields) {\r
84                 final Schema.Field schemaField = schema.getField(expectedField);\r
85                 if (schemaField == null) {\r
86                     final String errorMessage = String.format(\r
87                             "Unable to find expected field: %s, in schema: %s", expectedField, schema);\r
88                     throw new CDAPSettingsException(errorMessage, LOG, new IllegalArgumentException(errorMessage));\r
89                 }\r
90             }\r
91             LOG.debug("Successfully validated schema:{}, contains expected fields:{}", schema,\r
92                     Arrays.toString(expectedFields));\r
93         }\r
94     }\r
95 \r
96 \r
97     /**\r
98      * Creates a new Structured Record containing DMaaP MR fetched message\r
99      *\r
100      * @param message DMaaP MR fetch message\r
101      *\r
102      * @return Structured record containing DMaaP MR Message\r
103      */\r
104     public static StructuredRecord createDMaaPMRResponseStructuredRecord(final String message) {\r
105         StructuredRecord.Builder recordBuilder = StructuredRecord.builder(DMaaPSourceOutputSchema.getSchema());\r
106         recordBuilder\r
107                 .set(DMaaPSourceOutputSchema.TIMESTAMP.getSchemaColumnName(), System.nanoTime())\r
108                 .set(DMaaPSourceOutputSchema.RESPONSE_CODE.getSchemaColumnName(), 200)\r
109                 .set(DMaaPSourceOutputSchema.RESPONSE_MESSAGE.getSchemaColumnName(), "OK")\r
110                 .set(DMaaPSourceOutputSchema.FETCHED_MESSAGE.getSchemaColumnName(), message);\r
111         return recordBuilder.build();\r
112     }\r
113 \r
114 \r
115     /**\r
116      * Creates output StructuredRecord Builder which has copied values from input StructuredRecord\r
117      *\r
118      * @param outputSchema output Schema\r
119      * @param inputStructuredRecord input Structured Record\r
120      *\r
121      * @return output Structured Record builder with pre populated values from input structured record\r
122      */\r
123     public static StructuredRecord.Builder createOutputStructuredRecordBuilder(\r
124             @Nonnull final Schema outputSchema,\r
125             @Nonnull final StructuredRecord inputStructuredRecord) {\r
126 \r
127         // Get input structured Record Schema\r
128         final Schema inputSchema = inputStructuredRecord.getSchema();\r
129         // Create new instance of output Structured Record Builder from output Schema\r
130         final StructuredRecord.Builder outputStructuredRecordBuilder = StructuredRecord.builder(outputSchema);\r
131 \r
132         // iterate over input fields and if output schema has field with same name copy the value to out record builder\r
133         for (Schema.Field inputField : inputSchema.getFields()) {\r
134             final String inputFieldName = inputField.getName();\r
135             if (outputSchema.getField(inputFieldName) != null) {\r
136                 outputStructuredRecordBuilder.set(inputFieldName, inputStructuredRecord.get(inputFieldName));\r
137             }\r
138         }\r
139 \r
140         return outputStructuredRecordBuilder;\r
141     }\r
142 \r
143 \r
144     /**\r
145      * Adds Field value to StructuredRecord Builder if schema contains that field Name\r
146      *\r
147      * @param structuredRecordBuilder structured record builder\r
148      * @param structuredRecordSchema schema for structured record builder\r
149      * @param fieldName field name\r
150      * @param fieldValue field value\r
151      *\r
152      * @return structured record builder with populated field name and value if schema contains field name\r
153      */\r
154     public static StructuredRecord.Builder addFieldValueToStructuredRecordBuilder(\r
155             @Nonnull final StructuredRecord.Builder structuredRecordBuilder,\r
156             @Nonnull final Schema structuredRecordSchema,\r
157             @Nonnull final String fieldName,\r
158             final Object fieldValue) {\r
159 \r
160         // check if schema contains field Name\r
161         if (structuredRecordSchema.getField(fieldName) != null) {\r
162             structuredRecordBuilder.set(fieldName, fieldValue);\r
163         } else {\r
164             LOG.info("Unable to populate value for field Name: {} with field value: {}. " +\r
165                             "Schema Fields: {} does not contain field name: {}",\r
166                     fieldName, fieldValue, structuredRecordSchema.getFields(), fieldName);\r
167         }\r
168 \r
169         return structuredRecordBuilder;\r
170     }\r
171 \r
172 \r
173     /**\r
174      * Validates that given schema String has fieldName of expected type. If field does not exist in given schema\r
175      * then validation will pass with warning. If field does exist in given schema then this validation will return\r
176      * true if field type is same as expected type else false\r
177      *\r
178      * @param schemaString CDAP Plugin output or input schema string\r
179      * @param fieldName field name\r
180      * @param expectedFieldType expected schema field type\r
181      *\r
182      * @return true if field type matches expected field type else false. If field does not exist in\r
183      * give schema validation will pass but will generate a warning message\r
184      */\r
185     public static boolean validateSchemaFieldType(@Nonnull final String schemaString,\r
186                                                   @Nonnull final String fieldName,\r
187                                                   @Nonnull final Schema.Type expectedFieldType) {\r
188 \r
189         try {\r
190             // parse given schema String\r
191             final Schema outputSchema = Schema.parseJson(schemaString);\r
192             final Schema.Field schemaField = outputSchema.getField(fieldName);\r
193 \r
194             // if given schema does contain field then validated fieldName type\r
195             if (schemaField != null) {\r
196 \r
197                 final List<Schema> schemas = new LinkedList<>();\r
198 \r
199                 // if it is a union type then grab all union schemas\r
200                 if (outputSchema.getField(fieldName).getSchema().getType() == Schema.Type.UNION) {\r
201                     final List<Schema> unionFieldSchemas =\r
202                             outputSchema.getField(fieldName).getSchema().getUnionSchemas();\r
203                     schemas.addAll(unionFieldSchemas);\r
204                 } else {\r
205                     // if not union type the just get the field schema\r
206                     final Schema fieldSchema = outputSchema.getField(fieldName).getSchema();\r
207                     schemas.add(fieldSchema);\r
208                 }\r
209 \r
210                 // get all schema types\r
211                 final List<Schema.Type> fieldTypes =\r
212                         Lists.transform(schemas, CDAPPluginUtils.SCHEMA_TO_TYPE_FUNCTION);\r
213 \r
214                 // if all schema types does not contain expected field type then return false\r
215                 if (!fieldTypes.contains(expectedFieldType)) {\r
216                     LOG.error("Validation failed for fieldName: {} is NOT of expected Type: {} in schema: {}",\r
217                             fieldName, expectedFieldType, outputSchema);\r
218                     return false;\r
219                 }\r
220 \r
221                 // field type validation passed\r
222                 LOG.debug("Successfully validated fieldName: {} is of expected Type: {}",\r
223                         fieldName, expectedFieldType);\r
224 \r
225                 return true;\r
226 \r
227             } else {\r
228 \r
229                 // if field does not exist then the validation will pass but will generate warning message\r
230                 LOG.warn("Validation of field type not possible. Field name: {} does not exist in schema: {}",\r
231                         fieldName, outputSchema);\r
232                 return true;\r
233             }\r
234 \r
235         } catch (IOException e) {\r
236             final String errorMessage =\r
237                     String.format("Unable to parse schema: %s for field type validation. " +\r
238                                     "Field Name: %s, Expected Field Type: %s Exception: %s",\r
239                             schemaString, fieldName, expectedFieldType, e);\r
240             throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);\r
241         }\r
242 \r
243     }\r
244 \r
245 \r
246     /**\r
247      * Parses provided schema String as Schema object and set it as output Schema format\r
248      *\r
249      * @param pipelineConfigurer plugin pipeline configurer\r
250      * @param schemaString schema String to be set as output schema\r
251      */\r
252     public static void setOutputSchema(final PipelineConfigurer pipelineConfigurer, final String schemaString) {\r
253         try {\r
254             final Schema outputSchema = Schema.parseJson(schemaString);\r
255             pipelineConfigurer.getStageConfigurer().setOutputSchema(outputSchema);\r
256         } catch (IOException e) {\r
257             final String errorMessage = String.format(\r
258                     "Schema specified is not a valid JSON. Schema String: %s, Exception: %s", schemaString, e);\r
259             throw new CDAPSettingsException(errorMessage, LOG, new IllegalArgumentException(errorMessage));\r
260         }\r
261     }\r
262 \r
263 \r
264     /**\r
265      * Parses incoming plugin config mapping to key value map. If any of the key value map is blank an Illegal Argument\r
266      * exception will be thrown\r
267      *\r
268      * @param mappingFieldString field Mapping String\r
269      *\r
270      * @return map containing mapping key values\r
271      */\r
272     public static Map<String, String> extractFieldMappings(final String mappingFieldString) {\r
273         final Map<String, String> fieldMappings = Maps.newHashMap();\r
274         if (StringUtils.isNotBlank(mappingFieldString)) {\r
275             final Splitter commaSplitter = Splitter.on(",");\r
276             for (String fieldMapping : commaSplitter.split(mappingFieldString)) {\r
277                 final String[] keyValueMappings = fieldMapping.split(":");\r
278                 if (keyValueMappings.length != 2 ||\r
279                         StringUtils.isBlank(keyValueMappings[0]) ||\r
280                         StringUtils.isBlank(keyValueMappings[1])) {\r
281                     final String errorMessage = "Field Mapping key or value is Blank. All field mappings must " +\r
282                             "be present in mappings: " + mappingFieldString;\r
283                     throw new DCAEAnalyticsRuntimeException(\r
284                             errorMessage, LOG, new IllegalArgumentException(errorMessage));\r
285                 }\r
286                 fieldMappings.put(keyValueMappings[0].trim(), keyValueMappings[1].trim());\r
287             }\r
288         }\r
289         return fieldMappings;\r
290     }\r
291 \r
292 \r
293 \r
294 \r
295 }\r