2  * ===============================LICENSE_START======================================
 
   4  * ================================================================================
 
   5  *    Copyright © 2017 AT&T Intellectual Property. All rights reserved.
 
   6  * ================================================================================
 
   7  *  Licensed under the Apache License, Version 2.0 (the "License");
 
   8  *  you may not use this file except in compliance with the License.
 
   9  *   You may obtain a copy of the License at
 
  11  *          http://www.apache.org/licenses/LICENSE-2.0
 
  13  *  Unless required by applicable law or agreed to in writing, software
 
  14  *  distributed under the License is distributed on an "AS IS" BASIS,
 
  15  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
  16  *  See the License for the specific language governing permissions and
 
  17  *  limitations under the License.
 
  18  *  ============================LICENSE_END===========================================
 
  21 package org.openecomp.dcae.apod.analytics.cdap.plugins.utils;
 
  23 import co.cask.cdap.api.data.format.StructuredRecord;
 
  24 import co.cask.cdap.api.data.schema.Schema;
 
  25 import co.cask.cdap.etl.api.PipelineConfigurer;
 
  26 import com.google.common.base.Function;
 
  27 import com.google.common.base.Splitter;
 
  28 import com.google.common.collect.Lists;
 
  29 import com.google.common.collect.Maps;
 
  30 import org.apache.commons.lang3.StringUtils;
 
  31 import org.openecomp.dcae.apod.analytics.cdap.common.exception.CDAPSettingsException;
 
  32 import org.openecomp.dcae.apod.analytics.cdap.plugins.domain.schema.dmaap.DMaaPSourceOutputSchema;
 
  33 import org.openecomp.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException;
 
  34 import org.openecomp.dcae.apod.analytics.model.util.AnalyticsModelJsonUtils;
 
  35 import org.slf4j.Logger;
 
  36 import org.slf4j.LoggerFactory;
 
  38 import java.io.IOException;
 
  39 import java.util.Arrays;
 
  40 import java.util.LinkedList;
 
  41 import java.util.List;
 
  44 import javax.annotation.Nonnull;
 
  45 import javax.annotation.Nullable;
 
  48  * @author Rajiv Singla . Creation Date: 1/26/2017.
 
  50 public abstract class CDAPPluginUtils extends AnalyticsModelJsonUtils {
 
  52     private static final Logger LOG = LoggerFactory.getLogger(CDAPPluginUtils.class);
 
  54     public static final Function<Schema, Schema.Type> SCHEMA_TO_TYPE_FUNCTION = new Function<Schema, Schema.Type>() {
 
  56         public Schema.Type apply(@Nonnull Schema schema) {
 
  57             return schema.getType();
 
  63     private CDAPPluginUtils() {
 
  64         // private constructor
 
  68      * Validates if CDAP Schema contains expected fields
 
  70      * @param schema schema that need to be validated
 
  71      * @param expectedFields fields that are expected to be in the schema
 
  74     public static void validateSchemaContainsFields(@Nullable final Schema schema, final String... expectedFields) {
 
  76         LOG.debug("Validating schema:{} contains expected fields:{}", schema, Arrays.toString(expectedFields));
 
  79             // If input schema is null then no validation possible
 
  80             LOG.warn("Input Schema is null. No validation possible");
 
  82             // Check if expected fields are indeed present in the schema
 
  83             for (String expectedField : expectedFields) {
 
  84                 final Schema.Field schemaField = schema.getField(expectedField);
 
  85                 if (schemaField == null) {
 
  86                     final String errorMessage = String.format(
 
  87                             "Unable to find expected field: %s, in schema: %s", expectedField, schema);
 
  88                     throw new CDAPSettingsException(errorMessage, LOG, new IllegalArgumentException(errorMessage));
 
  91             LOG.debug("Successfully validated schema:{}, contains expected fields:{}", schema,
 
  92                     Arrays.toString(expectedFields));
 
  98      * Creates a new Structured Record containing DMaaP MR fetched message
 
 100      * @param message DMaaP MR fetch message
 
 102      * @return Structured record containing DMaaP MR Message
 
 104     public static StructuredRecord createDMaaPMRResponseStructuredRecord(final String message) {
 
 105         StructuredRecord.Builder recordBuilder = StructuredRecord.builder(DMaaPSourceOutputSchema.getSchema());
 
 107                 .set(DMaaPSourceOutputSchema.TIMESTAMP.getSchemaColumnName(), System.nanoTime())
 
 108                 .set(DMaaPSourceOutputSchema.RESPONSE_CODE.getSchemaColumnName(), 200)
 
 109                 .set(DMaaPSourceOutputSchema.RESPONSE_MESSAGE.getSchemaColumnName(), "OK")
 
 110                 .set(DMaaPSourceOutputSchema.FETCHED_MESSAGE.getSchemaColumnName(), message);
 
 111         return recordBuilder.build();
 
 116      * Creates output {@link StructuredRecord.Builder} which has copied values from input {@link StructuredRecord}
 
 118      * @param outputSchema output Schema
 
 119      * @param inputStructuredRecord input Structured Record
 
 121      * @return output Structured Record builder with pre populated values from input structured record
 
 123     public static StructuredRecord.Builder createOutputStructuredRecordBuilder(
 
 124             @Nonnull final Schema outputSchema,
 
 125             @Nonnull final StructuredRecord inputStructuredRecord) {
 
 127         // Get input structured Record Schema
 
 128         final Schema inputSchema = inputStructuredRecord.getSchema();
 
 129         // Create new instance of output Structured Record Builder from output Schema
 
 130         final StructuredRecord.Builder outputStructuredRecordBuilder = StructuredRecord.builder(outputSchema);
 
 132         // iterate over input fields and if output schema has field with same name copy the value to out record builder
 
 133         for (Schema.Field inputField : inputSchema.getFields()) {
 
 134             final String inputFieldName = inputField.getName();
 
 135             if (outputSchema.getField(inputFieldName) != null) {
 
 136                 outputStructuredRecordBuilder.set(inputFieldName, inputStructuredRecord.get(inputFieldName));
 
 140         return outputStructuredRecordBuilder;
 
 145      * Adds Field value to {@link StructuredRecord.Builder} if schema contains that field Name
 
 147      * @param structuredRecordBuilder structured record builder
 
 148      * @param structuredRecordSchema schema for structured record builder
 
 149      * @param fieldName field name
 
 150      * @param fieldValue field value
 
 152      * @return structured record builder with populated field name and value if schema contains field name
 
 154     public static StructuredRecord.Builder addFieldValueToStructuredRecordBuilder(
 
 155             @Nonnull final StructuredRecord.Builder structuredRecordBuilder,
 
 156             @Nonnull final Schema structuredRecordSchema,
 
 157             @Nonnull final String fieldName,
 
 158             final Object fieldValue) {
 
 160         // check if schema contains field Name
 
 161         if (structuredRecordSchema.getField(fieldName) != null) {
 
 162             structuredRecordBuilder.set(fieldName, fieldValue);
 
 164             LOG.info("Unable to populate value for field Name: {} with field value: {}. " +
 
 165                             "Schema Fields: {} does not contain field name: {}",
 
 166                     fieldName, fieldValue, structuredRecordSchema.getFields(), fieldName);
 
 169         return structuredRecordBuilder;
 
 174      * Validates that given schema String has fieldName of expected type. If field does not exist in given schema
 
 175      * then validation will pass with warning. If field does exist in given schema then this validation will return
 
 176      * true if field type is same as expected type else false
 
 178      * @param schemaString CDAP Plugin output or input schema string
 
 179      * @param fieldName field name
 
 180      * @param expectedFieldType expected schema field type
 
 182      * @return true if field type matches expected field type else false. If field does not exist in
 
 183      * give schema validation will pass but will generate a warning message
 
 185     public static boolean validateSchemaFieldType(@Nonnull final String schemaString,
 
 186                                                   @Nonnull final String fieldName,
 
 187                                                   @Nonnull final Schema.Type expectedFieldType) {
 
 190             // parse given schema String
 
 191             final Schema outputSchema = Schema.parseJson(schemaString);
 
 192             final Schema.Field schemaField = outputSchema.getField(fieldName);
 
 194             // if given schema does contain field then validated fieldName type
 
 195             if (schemaField != null) {
 
 197                 final List<Schema> schemas = new LinkedList<>();
 
 199                 // if it is a union type then grab all union schemas
 
 200                 if (outputSchema.getField(fieldName).getSchema().getType() == Schema.Type.UNION) {
 
 201                     final List<Schema> unionFieldSchemas =
 
 202                             outputSchema.getField(fieldName).getSchema().getUnionSchemas();
 
 203                     schemas.addAll(unionFieldSchemas);
 
 205                     // if not union type the just get the field schema
 
 206                     final Schema fieldSchema = outputSchema.getField(fieldName).getSchema();
 
 207                     schemas.add(fieldSchema);
 
 210                 // get all schema types
 
 211                 final List<Schema.Type> fieldTypes =
 
 212                         Lists.transform(schemas, CDAPPluginUtils.SCHEMA_TO_TYPE_FUNCTION);
 
 214                 // if all schema types does not contain expected field type then return false
 
 215                 if (!fieldTypes.contains(expectedFieldType)) {
 
 216                     LOG.error("Validation failed for fieldName: {} is NOT of expected Type: {} in schema: {}",
 
 217                             fieldName, expectedFieldType, outputSchema);
 
 221                 // field type validation passed
 
 222                 LOG.debug("Successfully validated fieldName: {} is of expected Type: {}",
 
 223                         fieldName, expectedFieldType);
 
 229                 // if field does not exist then the validation will pass but will generate warning message
 
 230                 LOG.warn("Validation of field type not possible. Field name: {} does not exist in schema: {}",
 
 231                         fieldName, outputSchema);
 
 235         } catch (IOException e) {
 
 236             final String errorMessage =
 
 237                     String.format("Unable to parse schema: %s for field type validation. " +
 
 238                                     "Field Name: %s, Expected Field Type: %s Exception: %s",
 
 239                             schemaString, fieldName, expectedFieldType, e);
 
 240             throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);
 
 247      * Parses provided schema String as Schema object and set it as output Schema format
 
 249      * @param pipelineConfigurer plugin pipeline configurer
 
 250      * @param schemaString schema String to be set as output schema
 
 252     public static void setOutputSchema(final PipelineConfigurer pipelineConfigurer, final String schemaString) {
 
 254             final Schema outputSchema = Schema.parseJson(schemaString);
 
 255             pipelineConfigurer.getStageConfigurer().setOutputSchema(outputSchema);
 
 256         } catch (IOException e) {
 
 257             final String errorMessage = String.format(
 
 258                     "Schema specified is not a valid JSON. Schema String: %s, Exception: %s", schemaString, e);
 
 259             throw new CDAPSettingsException(errorMessage, LOG, new IllegalArgumentException(errorMessage));
 
 265      * Parses incoming plugin config mapping to key value map. If any of the key value map is blank an Illegal Argument
 
 266      * exception will be thrown
 
 268      * @param mappingFieldString field Mapping String
 
 270      * @return map containing mapping key values
 
 272     public static Map<String, String> extractFieldMappings(final String mappingFieldString) {
 
 273         final Map<String, String> fieldMappings = Maps.newHashMap();
 
 274         if (StringUtils.isNotBlank(mappingFieldString)) {
 
 275             final Splitter commaSplitter = Splitter.on(",");
 
 276             for (String fieldMapping : commaSplitter.split(mappingFieldString)) {
 
 277                 final String[] keyValueMappings = fieldMapping.split(":");
 
 278                 if (keyValueMappings.length != 2 ||
 
 279                         StringUtils.isBlank(keyValueMappings[0]) ||
 
 280                         StringUtils.isBlank(keyValueMappings[1])) {
 
 281                     final String errorMessage = "Field Mapping key or value is Blank. All field mappings must " +
 
 282                             "be present in mappings: " + mappingFieldString;
 
 283                     throw new DCAEAnalyticsRuntimeException(
 
 284                             errorMessage, LOG, new IllegalArgumentException(errorMessage));
 
 286                 fieldMappings.put(keyValueMappings[0].trim(), keyValueMappings[1].trim());
 
 289         return fieldMappings;