-/*
- * ===============================LICENSE_START======================================
- * dcae-analytics
- * ================================================================================
- * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============================LICENSE_END===========================================
- */
-
-package org.openecomp.dcae.apod.analytics.cdap.plugins.utils;
-
-import co.cask.cdap.api.data.format.StructuredRecord;
-import co.cask.cdap.api.data.schema.Schema;
-import co.cask.cdap.etl.api.PipelineConfigurer;
-import com.google.common.base.Function;
-import com.google.common.base.Splitter;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import org.apache.commons.lang3.StringUtils;
-import org.openecomp.dcae.apod.analytics.cdap.common.exception.CDAPSettingsException;
-import org.openecomp.dcae.apod.analytics.cdap.plugins.domain.schema.dmaap.DMaaPSourceOutputSchema;
-import org.openecomp.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException;
-import org.openecomp.dcae.apod.analytics.model.util.AnalyticsModelJsonUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
-import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
-
-/**
- * @author Rajiv Singla . Creation Date: 1/26/2017.
- */
-public abstract class CDAPPluginUtils extends AnalyticsModelJsonUtils {
-
- private static final Logger LOG = LoggerFactory.getLogger(CDAPPluginUtils.class);
-
- public static final Function<Schema, Schema.Type> SCHEMA_TO_TYPE_FUNCTION = new Function<Schema, Schema.Type>() {
- @Override
- public Schema.Type apply(@Nonnull Schema schema) {
- return schema.getType();
- }
- };
-
-
-
- private CDAPPluginUtils() {
- // private constructor
- }
-
- /**
- * Validates if CDAP Schema contains expected fields
- *
- * @param schema schema that need to be validated
- * @param expectedFields fields that are expected to be in the schema
- */
-
- public static void validateSchemaContainsFields(@Nullable final Schema schema, final String... expectedFields) {
-
- LOG.debug("Validating schema:{} contains expected fields:{}", schema, Arrays.toString(expectedFields));
-
- if (schema == null) {
- // If input schema is null then no validation possible
- LOG.warn("Input Schema is null. No validation possible");
- } else {
- // Check if expected fields are indeed present in the schema
- for (String expectedField : expectedFields) {
- final Schema.Field schemaField = schema.getField(expectedField);
- if (schemaField == null) {
- final String errorMessage = String.format(
- "Unable to find expected field: %s, in schema: %s", expectedField, schema);
- throw new CDAPSettingsException(errorMessage, LOG, new IllegalArgumentException(errorMessage));
- }
- }
- LOG.debug("Successfully validated schema:{}, contains expected fields:{}", schema,
- Arrays.toString(expectedFields));
- }
- }
-
-
- /**
- * Creates a new Structured Record containing DMaaP MR fetched message
- *
- * @param message DMaaP MR fetch message
- *
- * @return Structured record containing DMaaP MR Message
- */
- public static StructuredRecord createDMaaPMRResponseStructuredRecord(final String message) {
- StructuredRecord.Builder recordBuilder = StructuredRecord.builder(DMaaPSourceOutputSchema.getSchema());
- recordBuilder
- .set(DMaaPSourceOutputSchema.TIMESTAMP.getSchemaColumnName(), System.nanoTime())
- .set(DMaaPSourceOutputSchema.RESPONSE_CODE.getSchemaColumnName(), 200)
- .set(DMaaPSourceOutputSchema.RESPONSE_MESSAGE.getSchemaColumnName(), "OK")
- .set(DMaaPSourceOutputSchema.FETCHED_MESSAGE.getSchemaColumnName(), message);
- return recordBuilder.build();
- }
-
-
- /**
- * Creates output {@link StructuredRecord.Builder} which has copied values from input {@link StructuredRecord}
- *
- * @param outputSchema output Schema
- * @param inputStructuredRecord input Structured Record
- *
- * @return output Structured Record builder with pre populated values from input structured record
- */
- public static StructuredRecord.Builder createOutputStructuredRecordBuilder(
- @Nonnull final Schema outputSchema,
- @Nonnull final StructuredRecord inputStructuredRecord) {
-
- // Get input structured Record Schema
- final Schema inputSchema = inputStructuredRecord.getSchema();
- // Create new instance of output Structured Record Builder from output Schema
- final StructuredRecord.Builder outputStructuredRecordBuilder = StructuredRecord.builder(outputSchema);
-
- // iterate over input fields and if output schema has field with same name copy the value to out record builder
- for (Schema.Field inputField : inputSchema.getFields()) {
- final String inputFieldName = inputField.getName();
- if (outputSchema.getField(inputFieldName) != null) {
- outputStructuredRecordBuilder.set(inputFieldName, inputStructuredRecord.get(inputFieldName));
- }
- }
-
- return outputStructuredRecordBuilder;
- }
-
-
- /**
- * Adds Field value to {@link StructuredRecord.Builder} if schema contains that field Name
- *
- * @param structuredRecordBuilder structured record builder
- * @param structuredRecordSchema schema for structured record builder
- * @param fieldName field name
- * @param fieldValue field value
- *
- * @return structured record builder with populated field name and value if schema contains field name
- */
- public static StructuredRecord.Builder addFieldValueToStructuredRecordBuilder(
- @Nonnull final StructuredRecord.Builder structuredRecordBuilder,
- @Nonnull final Schema structuredRecordSchema,
- @Nonnull final String fieldName,
- final Object fieldValue) {
-
- // check if schema contains field Name
- if (structuredRecordSchema.getField(fieldName) != null) {
- structuredRecordBuilder.set(fieldName, fieldValue);
- } else {
- LOG.info("Unable to populate value for field Name: {} with field value: {}. " +
- "Schema Fields: {} does not contain field name: {}",
- fieldName, fieldValue, structuredRecordSchema.getFields(), fieldName);
- }
-
- return structuredRecordBuilder;
- }
-
-
- /**
- * Validates that given schema String has fieldName of expected type. If field does not exist in given schema
- * then validation will pass with warning. If field does exist in given schema then this validation will return
- * true if field type is same as expected type else false
- *
- * @param schemaString CDAP Plugin output or input schema string
- * @param fieldName field name
- * @param expectedFieldType expected schema field type
- *
- * @return true if field type matches expected field type else false. If field does not exist in
- * give schema validation will pass but will generate a warning message
- */
- public static boolean validateSchemaFieldType(@Nonnull final String schemaString,
- @Nonnull final String fieldName,
- @Nonnull final Schema.Type expectedFieldType) {
-
- try {
- // parse given schema String
- final Schema outputSchema = Schema.parseJson(schemaString);
- final Schema.Field schemaField = outputSchema.getField(fieldName);
-
- // if given schema does contain field then validated fieldName type
- if (schemaField != null) {
-
- final List<Schema> schemas = new LinkedList<>();
-
- // if it is a union type then grab all union schemas
- if (outputSchema.getField(fieldName).getSchema().getType() == Schema.Type.UNION) {
- final List<Schema> unionFieldSchemas =
- outputSchema.getField(fieldName).getSchema().getUnionSchemas();
- schemas.addAll(unionFieldSchemas);
- } else {
- // if not union type the just get the field schema
- final Schema fieldSchema = outputSchema.getField(fieldName).getSchema();
- schemas.add(fieldSchema);
- }
-
- // get all schema types
- final List<Schema.Type> fieldTypes =
- Lists.transform(schemas, CDAPPluginUtils.SCHEMA_TO_TYPE_FUNCTION);
-
- // if all schema types does not contain expected field type then return false
- if (!fieldTypes.contains(expectedFieldType)) {
- LOG.error("Validation failed for fieldName: {} is NOT of expected Type: {} in schema: {}",
- fieldName, expectedFieldType, outputSchema);
- return false;
- }
-
- // field type validation passed
- LOG.debug("Successfully validated fieldName: {} is of expected Type: {}",
- fieldName, expectedFieldType);
-
- return true;
-
- } else {
-
- // if field does not exist then the validation will pass but will generate warning message
- LOG.warn("Validation of field type not possible. Field name: {} does not exist in schema: {}",
- fieldName, outputSchema);
- return true;
- }
-
- } catch (IOException e) {
- final String errorMessage =
- String.format("Unable to parse schema: %s for field type validation. " +
- "Field Name: %s, Expected Field Type: %s Exception: %s",
- schemaString, fieldName, expectedFieldType, e);
- throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);
- }
-
- }
-
-
- /**
- * Parses provided schema String as Schema object and set it as output Schema format
- *
- * @param pipelineConfigurer plugin pipeline configurer
- * @param schemaString schema String to be set as output schema
- */
- public static void setOutputSchema(final PipelineConfigurer pipelineConfigurer, final String schemaString) {
- try {
- final Schema outputSchema = Schema.parseJson(schemaString);
- pipelineConfigurer.getStageConfigurer().setOutputSchema(outputSchema);
- } catch (IOException e) {
- final String errorMessage = String.format(
- "Schema specified is not a valid JSON. Schema String: %s, Exception: %s", schemaString, e);
- throw new CDAPSettingsException(errorMessage, LOG, new IllegalArgumentException(errorMessage));
- }
- }
-
-
- /**
- * Parses incoming plugin config mapping to key value map. If any of the key value map is blank an Illegal Argument
- * exception will be thrown
- *
- * @param mappingFieldString field Mapping String
- *
- * @return map containing mapping key values
- */
- public static Map<String, String> extractFieldMappings(final String mappingFieldString) {
- final Map<String, String> fieldMappings = Maps.newHashMap();
- if (StringUtils.isNotBlank(mappingFieldString)) {
- final Splitter commaSplitter = Splitter.on(",");
- for (String fieldMapping : commaSplitter.split(mappingFieldString)) {
- final String[] keyValueMappings = fieldMapping.split(":");
- if (keyValueMappings.length != 2 ||
- StringUtils.isBlank(keyValueMappings[0]) ||
- StringUtils.isBlank(keyValueMappings[1])) {
- final String errorMessage = "Field Mapping key or value is Blank. All field mappings must " +
- "be present in mappings: " + mappingFieldString;
- throw new DCAEAnalyticsRuntimeException(
- errorMessage, LOG, new IllegalArgumentException(errorMessage));
- }
- fieldMappings.put(keyValueMappings[0].trim(), keyValueMappings[1].trim());
- }
- }
- return fieldMappings;
- }
-
-
-
-
-}
+/*\r
+ * ===============================LICENSE_START======================================\r
+ * dcae-analytics\r
+ * ================================================================================\r
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.\r
+ * ================================================================================\r
+ * Licensed under the Apache License, Version 2.0 (the "License");\r
+ * you may not use this file except in compliance with the License.\r
+ * You may obtain a copy of the License at\r
+ *\r
+ * http://www.apache.org/licenses/LICENSE-2.0\r
+ *\r
+ * Unless required by applicable law or agreed to in writing, software\r
+ * distributed under the License is distributed on an "AS IS" BASIS,\r
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+ * See the License for the specific language governing permissions and\r
+ * limitations under the License.\r
+ * ============================LICENSE_END===========================================\r
+ */\r
+\r
+package org.openecomp.dcae.apod.analytics.cdap.plugins.utils;\r
+\r
+import co.cask.cdap.api.data.format.StructuredRecord;\r
+import co.cask.cdap.api.data.schema.Schema;\r
+import co.cask.cdap.etl.api.PipelineConfigurer;\r
+import com.google.common.base.Function;\r
+import com.google.common.base.Splitter;\r
+import com.google.common.collect.Lists;\r
+import com.google.common.collect.Maps;\r
+import org.apache.commons.lang3.StringUtils;\r
+import org.openecomp.dcae.apod.analytics.cdap.common.exception.CDAPSettingsException;\r
+import org.openecomp.dcae.apod.analytics.cdap.plugins.domain.schema.dmaap.DMaaPSourceOutputSchema;\r
+import org.openecomp.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException;\r
+import org.openecomp.dcae.apod.analytics.model.util.AnalyticsModelJsonUtils;\r
+import org.slf4j.Logger;\r
+import org.slf4j.LoggerFactory;\r
+\r
+import java.io.IOException;\r
+import java.util.Arrays;\r
+import java.util.LinkedList;\r
+import java.util.List;\r
+import java.util.Map;\r
+\r
+import javax.annotation.Nonnull;\r
+import javax.annotation.Nullable;\r
+\r
+/**\r
+ * @author Rajiv Singla . Creation Date: 1/26/2017.\r
+ */\r
+public abstract class CDAPPluginUtils extends AnalyticsModelJsonUtils {\r
+\r
+ private static final Logger LOG = LoggerFactory.getLogger(CDAPPluginUtils.class);\r
+\r
+ public static final Function<Schema, Schema.Type> SCHEMA_TO_TYPE_FUNCTION = new Function<Schema, Schema.Type>() {\r
+ @Override\r
+ public Schema.Type apply(@Nonnull Schema schema) {\r
+ return schema.getType();\r
+ }\r
+ };\r
+\r
+\r
+\r
+ private CDAPPluginUtils() {\r
+ // private constructor\r
+ }\r
+\r
+ /**\r
+ * Validates if CDAP Schema contains expected fields\r
+ *\r
+ * @param schema schema that need to be validated\r
+ * @param expectedFields fields that are expected to be in the schema\r
+ */\r
+\r
+ public static void validateSchemaContainsFields(@Nullable final Schema schema, final String... expectedFields) {\r
+\r
+ LOG.debug("Validating schema:{} contains expected fields:{}", schema, Arrays.toString(expectedFields));\r
+\r
+ if (schema == null) {\r
+ // If input schema is null then no validation possible\r
+ LOG.warn("Input Schema is null. No validation possible");\r
+ } else {\r
+ // Check if expected fields are indeed present in the schema\r
+ for (String expectedField : expectedFields) {\r
+ final Schema.Field schemaField = schema.getField(expectedField);\r
+ if (schemaField == null) {\r
+ final String errorMessage = String.format(\r
+ "Unable to find expected field: %s, in schema: %s", expectedField, schema);\r
+ throw new CDAPSettingsException(errorMessage, LOG, new IllegalArgumentException(errorMessage));\r
+ }\r
+ }\r
+ LOG.debug("Successfully validated schema:{}, contains expected fields:{}", schema,\r
+ Arrays.toString(expectedFields));\r
+ }\r
+ }\r
+\r
+\r
+ /**\r
+ * Creates a new Structured Record containing DMaaP MR fetched message\r
+ *\r
+ * @param message DMaaP MR fetch message\r
+ *\r
+ * @return Structured record containing DMaaP MR Message\r
+ */\r
+ public static StructuredRecord createDMaaPMRResponseStructuredRecord(final String message) {\r
+ StructuredRecord.Builder recordBuilder = StructuredRecord.builder(DMaaPSourceOutputSchema.getSchema());\r
+ recordBuilder\r
+ .set(DMaaPSourceOutputSchema.TIMESTAMP.getSchemaColumnName(), System.nanoTime())\r
+ .set(DMaaPSourceOutputSchema.RESPONSE_CODE.getSchemaColumnName(), 200)\r
+ .set(DMaaPSourceOutputSchema.RESPONSE_MESSAGE.getSchemaColumnName(), "OK")\r
+ .set(DMaaPSourceOutputSchema.FETCHED_MESSAGE.getSchemaColumnName(), message);\r
+ return recordBuilder.build();\r
+ }\r
+\r
+\r
+ /**\r
+ * Creates output StructuredRecord Builder which has copied values from input StructuredRecord\r
+ *\r
+ * @param outputSchema output Schema\r
+ * @param inputStructuredRecord input Structured Record\r
+ *\r
+ * @return output Structured Record builder with pre populated values from input structured record\r
+ */\r
+ public static StructuredRecord.Builder createOutputStructuredRecordBuilder(\r
+ @Nonnull final Schema outputSchema,\r
+ @Nonnull final StructuredRecord inputStructuredRecord) {\r
+\r
+ // Get input structured Record Schema\r
+ final Schema inputSchema = inputStructuredRecord.getSchema();\r
+ // Create new instance of output Structured Record Builder from output Schema\r
+ final StructuredRecord.Builder outputStructuredRecordBuilder = StructuredRecord.builder(outputSchema);\r
+\r
+ // iterate over input fields and if output schema has field with same name copy the value to out record builder\r
+ for (Schema.Field inputField : inputSchema.getFields()) {\r
+ final String inputFieldName = inputField.getName();\r
+ if (outputSchema.getField(inputFieldName) != null) {\r
+ outputStructuredRecordBuilder.set(inputFieldName, inputStructuredRecord.get(inputFieldName));\r
+ }\r
+ }\r
+\r
+ return outputStructuredRecordBuilder;\r
+ }\r
+\r
+\r
+ /**\r
+ * Adds Field value to StructuredRecord Builder if schema contains that field Name\r
+ *\r
+ * @param structuredRecordBuilder structured record builder\r
+ * @param structuredRecordSchema schema for structured record builder\r
+ * @param fieldName field name\r
+ * @param fieldValue field value\r
+ *\r
+ * @return structured record builder with populated field name and value if schema contains field name\r
+ */\r
+ public static StructuredRecord.Builder addFieldValueToStructuredRecordBuilder(\r
+ @Nonnull final StructuredRecord.Builder structuredRecordBuilder,\r
+ @Nonnull final Schema structuredRecordSchema,\r
+ @Nonnull final String fieldName,\r
+ final Object fieldValue) {\r
+\r
+ // check if schema contains field Name\r
+ if (structuredRecordSchema.getField(fieldName) != null) {\r
+ structuredRecordBuilder.set(fieldName, fieldValue);\r
+ } else {\r
+ LOG.info("Unable to populate value for field Name: {} with field value: {}. " +\r
+ "Schema Fields: {} does not contain field name: {}",\r
+ fieldName, fieldValue, structuredRecordSchema.getFields(), fieldName);\r
+ }\r
+\r
+ return structuredRecordBuilder;\r
+ }\r
+\r
+\r
+ /**\r
+ * Validates that given schema String has fieldName of expected type. If field does not exist in given schema\r
+ * then validation will pass with warning. If field does exist in given schema then this validation will return\r
+ * true if field type is same as expected type else false\r
+ *\r
+ * @param schemaString CDAP Plugin output or input schema string\r
+ * @param fieldName field name\r
+ * @param expectedFieldType expected schema field type\r
+ *\r
+ * @return true if field type matches expected field type else false. If field does not exist in\r
+ * give schema validation will pass but will generate a warning message\r
+ */\r
+ public static boolean validateSchemaFieldType(@Nonnull final String schemaString,\r
+ @Nonnull final String fieldName,\r
+ @Nonnull final Schema.Type expectedFieldType) {\r
+\r
+ try {\r
+ // parse given schema String\r
+ final Schema outputSchema = Schema.parseJson(schemaString);\r
+ final Schema.Field schemaField = outputSchema.getField(fieldName);\r
+\r
+ // if given schema does contain field then validated fieldName type\r
+ if (schemaField != null) {\r
+\r
+ final List<Schema> schemas = new LinkedList<>();\r
+\r
+ // if it is a union type then grab all union schemas\r
+ if (outputSchema.getField(fieldName).getSchema().getType() == Schema.Type.UNION) {\r
+ final List<Schema> unionFieldSchemas =\r
+ outputSchema.getField(fieldName).getSchema().getUnionSchemas();\r
+ schemas.addAll(unionFieldSchemas);\r
+ } else {\r
+ // if not union type the just get the field schema\r
+ final Schema fieldSchema = outputSchema.getField(fieldName).getSchema();\r
+ schemas.add(fieldSchema);\r
+ }\r
+\r
+ // get all schema types\r
+ final List<Schema.Type> fieldTypes =\r
+ Lists.transform(schemas, CDAPPluginUtils.SCHEMA_TO_TYPE_FUNCTION);\r
+\r
+ // if all schema types does not contain expected field type then return false\r
+ if (!fieldTypes.contains(expectedFieldType)) {\r
+ LOG.error("Validation failed for fieldName: {} is NOT of expected Type: {} in schema: {}",\r
+ fieldName, expectedFieldType, outputSchema);\r
+ return false;\r
+ }\r
+\r
+ // field type validation passed\r
+ LOG.debug("Successfully validated fieldName: {} is of expected Type: {}",\r
+ fieldName, expectedFieldType);\r
+\r
+ return true;\r
+\r
+ } else {\r
+\r
+ // if field does not exist then the validation will pass but will generate warning message\r
+ LOG.warn("Validation of field type not possible. Field name: {} does not exist in schema: {}",\r
+ fieldName, outputSchema);\r
+ return true;\r
+ }\r
+\r
+ } catch (IOException e) {\r
+ final String errorMessage =\r
+ String.format("Unable to parse schema: %s for field type validation. " +\r
+ "Field Name: %s, Expected Field Type: %s Exception: %s",\r
+ schemaString, fieldName, expectedFieldType, e);\r
+ throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);\r
+ }\r
+\r
+ }\r
+\r
+\r
+ /**\r
+ * Parses provided schema String as Schema object and set it as output Schema format\r
+ *\r
+ * @param pipelineConfigurer plugin pipeline configurer\r
+ * @param schemaString schema String to be set as output schema\r
+ */\r
+ public static void setOutputSchema(final PipelineConfigurer pipelineConfigurer, final String schemaString) {\r
+ try {\r
+ final Schema outputSchema = Schema.parseJson(schemaString);\r
+ pipelineConfigurer.getStageConfigurer().setOutputSchema(outputSchema);\r
+ } catch (IOException e) {\r
+ final String errorMessage = String.format(\r
+ "Schema specified is not a valid JSON. Schema String: %s, Exception: %s", schemaString, e);\r
+ throw new CDAPSettingsException(errorMessage, LOG, new IllegalArgumentException(errorMessage));\r
+ }\r
+ }\r
+\r
+\r
+ /**\r
+ * Parses incoming plugin config mapping to key value map. If any of the key value map is blank an Illegal Argument\r
+ * exception will be thrown\r
+ *\r
+ * @param mappingFieldString field Mapping String\r
+ *\r
+ * @return map containing mapping key values\r
+ */\r
+ public static Map<String, String> extractFieldMappings(final String mappingFieldString) {\r
+ final Map<String, String> fieldMappings = Maps.newHashMap();\r
+ if (StringUtils.isNotBlank(mappingFieldString)) {\r
+ final Splitter commaSplitter = Splitter.on(",");\r
+ for (String fieldMapping : commaSplitter.split(mappingFieldString)) {\r
+ final String[] keyValueMappings = fieldMapping.split(":");\r
+ if (keyValueMappings.length != 2 ||\r
+ StringUtils.isBlank(keyValueMappings[0]) ||\r
+ StringUtils.isBlank(keyValueMappings[1])) {\r
+ final String errorMessage = "Field Mapping key or value is Blank. All field mappings must " +\r
+ "be present in mappings: " + mappingFieldString;\r
+ throw new DCAEAnalyticsRuntimeException(\r
+ errorMessage, LOG, new IllegalArgumentException(errorMessage));\r
+ }\r
+ fieldMappings.put(keyValueMappings[0].trim(), keyValueMappings[1].trim());\r
+ }\r
+ }\r
+ return fieldMappings;\r
+ }\r
+\r
+\r
+\r
+\r
+}\r