2 * ===============================LICENSE_START======================================
\r
4 * ================================================================================
\r
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
\r
6 * ================================================================================
\r
7 * Licensed under the Apache License, Version 2.0 (the "License");
\r
8 * you may not use this file except in compliance with the License.
\r
9 * You may obtain a copy of the License at
\r
11 * http://www.apache.org/licenses/LICENSE-2.0
\r
13 * Unless required by applicable law or agreed to in writing, software
\r
14 * distributed under the License is distributed on an "AS IS" BASIS,
\r
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
\r
16 * See the License for the specific language governing permissions and
\r
17 * limitations under the License.
\r
18 * ============================LICENSE_END===========================================
\r
21 package org.openecomp.dcae.apod.analytics.cdap.plugins.utils;
\r
23 import co.cask.cdap.api.data.format.StructuredRecord;
\r
24 import co.cask.cdap.api.data.schema.Schema;
\r
25 import co.cask.cdap.etl.api.PipelineConfigurer;
\r
26 import com.google.common.base.Function;
\r
27 import com.google.common.base.Splitter;
\r
28 import com.google.common.collect.Lists;
\r
29 import com.google.common.collect.Maps;
\r
30 import org.apache.commons.lang3.StringUtils;
\r
31 import org.openecomp.dcae.apod.analytics.cdap.common.exception.CDAPSettingsException;
\r
32 import org.openecomp.dcae.apod.analytics.cdap.plugins.domain.schema.dmaap.DMaaPSourceOutputSchema;
\r
33 import org.openecomp.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException;
\r
34 import org.openecomp.dcae.apod.analytics.model.util.AnalyticsModelJsonUtils;
\r
35 import org.slf4j.Logger;
\r
36 import org.slf4j.LoggerFactory;
\r
38 import java.io.IOException;
\r
39 import java.util.Arrays;
\r
40 import java.util.LinkedList;
\r
41 import java.util.List;
\r
42 import java.util.Map;
\r
44 import javax.annotation.Nonnull;
\r
45 import javax.annotation.Nullable;
\r
48 * @author Rajiv Singla . Creation Date: 1/26/2017.
\r
50 public abstract class CDAPPluginUtils extends AnalyticsModelJsonUtils {
\r
52 private static final Logger LOG = LoggerFactory.getLogger(CDAPPluginUtils.class);
\r
54 public static final Function<Schema, Schema.Type> SCHEMA_TO_TYPE_FUNCTION = new Function<Schema, Schema.Type>() {
\r
56 public Schema.Type apply(@Nonnull Schema schema) {
\r
57 return schema.getType();
\r
63 private CDAPPluginUtils() {
\r
64 // private constructor
\r
68 * Validates if CDAP Schema contains expected fields
\r
70 * @param schema schema that need to be validated
\r
71 * @param expectedFields fields that are expected to be in the schema
\r
74 public static void validateSchemaContainsFields(@Nullable final Schema schema, final String... expectedFields) {
\r
76 LOG.debug("Validating schema:{} contains expected fields:{}", schema, Arrays.toString(expectedFields));
\r
78 if (schema == null) {
\r
79 // If input schema is null then no validation possible
\r
80 LOG.warn("Input Schema is null. No validation possible");
\r
82 // Check if expected fields are indeed present in the schema
\r
83 for (String expectedField : expectedFields) {
\r
84 final Schema.Field schemaField = schema.getField(expectedField);
\r
85 if (schemaField == null) {
\r
86 final String errorMessage = String.format(
\r
87 "Unable to find expected field: %s, in schema: %s", expectedField, schema);
\r
88 throw new CDAPSettingsException(errorMessage, LOG, new IllegalArgumentException(errorMessage));
\r
91 LOG.debug("Successfully validated schema:{}, contains expected fields:{}", schema,
\r
92 Arrays.toString(expectedFields));
\r
98 * Creates a new Structured Record containing DMaaP MR fetched message
\r
100 * @param message DMaaP MR fetch message
\r
102 * @return Structured record containing DMaaP MR Message
\r
104 public static StructuredRecord createDMaaPMRResponseStructuredRecord(final String message) {
\r
105 StructuredRecord.Builder recordBuilder = StructuredRecord.builder(DMaaPSourceOutputSchema.getSchema());
\r
107 .set(DMaaPSourceOutputSchema.TIMESTAMP.getSchemaColumnName(), System.nanoTime())
\r
108 .set(DMaaPSourceOutputSchema.RESPONSE_CODE.getSchemaColumnName(), 200)
\r
109 .set(DMaaPSourceOutputSchema.RESPONSE_MESSAGE.getSchemaColumnName(), "OK")
\r
110 .set(DMaaPSourceOutputSchema.FETCHED_MESSAGE.getSchemaColumnName(), message);
\r
111 return recordBuilder.build();
\r
116 * Creates output StructuredRecord Builder which has copied values from input StructuredRecord
\r
118 * @param outputSchema output Schema
\r
119 * @param inputStructuredRecord input Structured Record
\r
121 * @return output Structured Record builder with pre populated values from input structured record
\r
123 public static StructuredRecord.Builder createOutputStructuredRecordBuilder(
\r
124 @Nonnull final Schema outputSchema,
\r
125 @Nonnull final StructuredRecord inputStructuredRecord) {
\r
127 // Get input structured Record Schema
\r
128 final Schema inputSchema = inputStructuredRecord.getSchema();
\r
129 // Create new instance of output Structured Record Builder from output Schema
\r
130 final StructuredRecord.Builder outputStructuredRecordBuilder = StructuredRecord.builder(outputSchema);
\r
132 // iterate over input fields and if output schema has field with same name copy the value to out record builder
\r
133 for (Schema.Field inputField : inputSchema.getFields()) {
\r
134 final String inputFieldName = inputField.getName();
\r
135 if (outputSchema.getField(inputFieldName) != null) {
\r
136 outputStructuredRecordBuilder.set(inputFieldName, inputStructuredRecord.get(inputFieldName));
\r
140 return outputStructuredRecordBuilder;
\r
145 * Adds Field value to StructuredRecord Builder if schema contains that field Name
\r
147 * @param structuredRecordBuilder structured record builder
\r
148 * @param structuredRecordSchema schema for structured record builder
\r
149 * @param fieldName field name
\r
150 * @param fieldValue field value
\r
152 * @return structured record builder with populated field name and value if schema contains field name
\r
154 public static StructuredRecord.Builder addFieldValueToStructuredRecordBuilder(
\r
155 @Nonnull final StructuredRecord.Builder structuredRecordBuilder,
\r
156 @Nonnull final Schema structuredRecordSchema,
\r
157 @Nonnull final String fieldName,
\r
158 final Object fieldValue) {
\r
160 // check if schema contains field Name
\r
161 if (structuredRecordSchema.getField(fieldName) != null) {
\r
162 structuredRecordBuilder.set(fieldName, fieldValue);
\r
164 LOG.info("Unable to populate value for field Name: {} with field value: {}. " +
\r
165 "Schema Fields: {} does not contain field name: {}",
\r
166 fieldName, fieldValue, structuredRecordSchema.getFields(), fieldName);
\r
169 return structuredRecordBuilder;
\r
174 * Validates that given schema String has fieldName of expected type. If field does not exist in given schema
\r
175 * then validation will pass with warning. If field does exist in given schema then this validation will return
\r
176 * true if field type is same as expected type else false
\r
178 * @param schemaString CDAP Plugin output or input schema string
\r
179 * @param fieldName field name
\r
180 * @param expectedFieldType expected schema field type
\r
182 * @return true if field type matches expected field type else false. If field does not exist in
\r
183 * give schema validation will pass but will generate a warning message
\r
185 public static boolean validateSchemaFieldType(@Nonnull final String schemaString,
\r
186 @Nonnull final String fieldName,
\r
187 @Nonnull final Schema.Type expectedFieldType) {
\r
190 // parse given schema String
\r
191 final Schema outputSchema = Schema.parseJson(schemaString);
\r
192 final Schema.Field schemaField = outputSchema.getField(fieldName);
\r
194 // if given schema does contain field then validated fieldName type
\r
195 if (schemaField != null) {
\r
197 final List<Schema> schemas = new LinkedList<>();
\r
199 // if it is a union type then grab all union schemas
\r
200 if (outputSchema.getField(fieldName).getSchema().getType() == Schema.Type.UNION) {
\r
201 final List<Schema> unionFieldSchemas =
\r
202 outputSchema.getField(fieldName).getSchema().getUnionSchemas();
\r
203 schemas.addAll(unionFieldSchemas);
\r
205 // if not union type the just get the field schema
\r
206 final Schema fieldSchema = outputSchema.getField(fieldName).getSchema();
\r
207 schemas.add(fieldSchema);
\r
210 // get all schema types
\r
211 final List<Schema.Type> fieldTypes =
\r
212 Lists.transform(schemas, CDAPPluginUtils.SCHEMA_TO_TYPE_FUNCTION);
\r
214 // if all schema types does not contain expected field type then return false
\r
215 if (!fieldTypes.contains(expectedFieldType)) {
\r
216 LOG.error("Validation failed for fieldName: {} is NOT of expected Type: {} in schema: {}",
\r
217 fieldName, expectedFieldType, outputSchema);
\r
221 // field type validation passed
\r
222 LOG.debug("Successfully validated fieldName: {} is of expected Type: {}",
\r
223 fieldName, expectedFieldType);
\r
229 // if field does not exist then the validation will pass but will generate warning message
\r
230 LOG.warn("Validation of field type not possible. Field name: {} does not exist in schema: {}",
\r
231 fieldName, outputSchema);
\r
235 } catch (IOException e) {
\r
236 final String errorMessage =
\r
237 String.format("Unable to parse schema: %s for field type validation. " +
\r
238 "Field Name: %s, Expected Field Type: %s Exception: %s",
\r
239 schemaString, fieldName, expectedFieldType, e);
\r
240 throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);
\r
247 * Parses provided schema String as Schema object and set it as output Schema format
\r
249 * @param pipelineConfigurer plugin pipeline configurer
\r
250 * @param schemaString schema String to be set as output schema
\r
252 public static void setOutputSchema(final PipelineConfigurer pipelineConfigurer, final String schemaString) {
\r
254 final Schema outputSchema = Schema.parseJson(schemaString);
\r
255 pipelineConfigurer.getStageConfigurer().setOutputSchema(outputSchema);
\r
256 } catch (IOException e) {
\r
257 final String errorMessage = String.format(
\r
258 "Schema specified is not a valid JSON. Schema String: %s, Exception: %s", schemaString, e);
\r
259 throw new CDAPSettingsException(errorMessage, LOG, new IllegalArgumentException(errorMessage));
\r
265 * Parses incoming plugin config mapping to key value map. If any of the key value map is blank an Illegal Argument
\r
266 * exception will be thrown
\r
268 * @param mappingFieldString field Mapping String
\r
270 * @return map containing mapping key values
\r
272 public static Map<String, String> extractFieldMappings(final String mappingFieldString) {
\r
273 final Map<String, String> fieldMappings = Maps.newHashMap();
\r
274 if (StringUtils.isNotBlank(mappingFieldString)) {
\r
275 final Splitter commaSplitter = Splitter.on(",");
\r
276 for (String fieldMapping : commaSplitter.split(mappingFieldString)) {
\r
277 final String[] keyValueMappings = fieldMapping.split(":");
\r
278 if (keyValueMappings.length != 2 ||
\r
279 StringUtils.isBlank(keyValueMappings[0]) ||
\r
280 StringUtils.isBlank(keyValueMappings[1])) {
\r
281 final String errorMessage = "Field Mapping key or value is Blank. All field mappings must " +
\r
282 "be present in mappings: " + mappingFieldString;
\r
283 throw new DCAEAnalyticsRuntimeException(
\r
284 errorMessage, LOG, new IllegalArgumentException(errorMessage));
\r
286 fieldMappings.put(keyValueMappings[0].trim(), keyValueMappings[1].trim());
\r
289 return fieldMappings;
\r