TCA: Support for VES/A&AI enrichment
[dcaegen2/analytics/tca.git] / dcae-analytics-cdap-plugins / src / main / java / org / openecomp / dcae / apod / analytics / cdap / plugins / utils / CDAPPluginUtils.java
index af191c5..3ae1560 100644 (file)
-/*
- * ===============================LICENSE_START======================================
- *  dcae-analytics
- * ================================================================================
- *    Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- *  Licensed under the Apache License, Version 2.0 (the "License");
- *  you may not use this file except in compliance with the License.
- *   You may obtain a copy of the License at
- *
- *          http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *  ============================LICENSE_END===========================================
- */
-
-package org.openecomp.dcae.apod.analytics.cdap.plugins.utils;
-
-import co.cask.cdap.api.data.format.StructuredRecord;
-import co.cask.cdap.api.data.schema.Schema;
-import co.cask.cdap.etl.api.PipelineConfigurer;
-import com.google.common.base.Function;
-import com.google.common.base.Splitter;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import org.apache.commons.lang3.StringUtils;
-import org.openecomp.dcae.apod.analytics.cdap.common.exception.CDAPSettingsException;
-import org.openecomp.dcae.apod.analytics.cdap.plugins.domain.schema.dmaap.DMaaPSourceOutputSchema;
-import org.openecomp.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException;
-import org.openecomp.dcae.apod.analytics.model.util.AnalyticsModelJsonUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
-import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
-
-/**
- * @author Rajiv Singla . Creation Date: 1/26/2017.
- */
-public abstract class CDAPPluginUtils extends AnalyticsModelJsonUtils {
-
-    private static final Logger LOG = LoggerFactory.getLogger(CDAPPluginUtils.class);
-
-    public static final Function<Schema, Schema.Type> SCHEMA_TO_TYPE_FUNCTION = new Function<Schema, Schema.Type>() {
-        @Override
-        public Schema.Type apply(@Nonnull Schema schema) {
-            return schema.getType();
-        }
-    };
-
-
-
-    private CDAPPluginUtils() {
-        // private constructor
-    }
-
-    /**
-     * Validates if CDAP Schema contains expected fields
-     *
-     * @param schema schema that need to be validated
-     * @param expectedFields fields that are expected to be in the schema
-     */
-
-    public static void validateSchemaContainsFields(@Nullable final Schema schema, final String... expectedFields) {
-
-        LOG.debug("Validating schema:{} contains expected fields:{}", schema, Arrays.toString(expectedFields));
-
-        if (schema == null) {
-            // If input schema is null then no validation possible
-            LOG.warn("Input Schema is null. No validation possible");
-        } else {
-            // Check if expected fields are indeed present in the schema
-            for (String expectedField : expectedFields) {
-                final Schema.Field schemaField = schema.getField(expectedField);
-                if (schemaField == null) {
-                    final String errorMessage = String.format(
-                            "Unable to find expected field: %s, in schema: %s", expectedField, schema);
-                    throw new CDAPSettingsException(errorMessage, LOG, new IllegalArgumentException(errorMessage));
-                }
-            }
-            LOG.debug("Successfully validated schema:{}, contains expected fields:{}", schema,
-                    Arrays.toString(expectedFields));
-        }
-    }
-
-
-    /**
-     * Creates a new Structured Record containing DMaaP MR fetched message
-     *
-     * @param message DMaaP MR fetch message
-     *
-     * @return Structured record containing DMaaP MR Message
-     */
-    public static StructuredRecord createDMaaPMRResponseStructuredRecord(final String message) {
-        StructuredRecord.Builder recordBuilder = StructuredRecord.builder(DMaaPSourceOutputSchema.getSchema());
-        recordBuilder
-                .set(DMaaPSourceOutputSchema.TIMESTAMP.getSchemaColumnName(), System.nanoTime())
-                .set(DMaaPSourceOutputSchema.RESPONSE_CODE.getSchemaColumnName(), 200)
-                .set(DMaaPSourceOutputSchema.RESPONSE_MESSAGE.getSchemaColumnName(), "OK")
-                .set(DMaaPSourceOutputSchema.FETCHED_MESSAGE.getSchemaColumnName(), message);
-        return recordBuilder.build();
-    }
-
-
-    /**
-     * Creates output {@link StructuredRecord.Builder} which has copied values from input {@link StructuredRecord}
-     *
-     * @param outputSchema output Schema
-     * @param inputStructuredRecord input Structured Record
-     *
-     * @return output Structured Record builder with pre populated values from input structured record
-     */
-    public static StructuredRecord.Builder createOutputStructuredRecordBuilder(
-            @Nonnull final Schema outputSchema,
-            @Nonnull final StructuredRecord inputStructuredRecord) {
-
-        // Get input structured Record Schema
-        final Schema inputSchema = inputStructuredRecord.getSchema();
-        // Create new instance of output Structured Record Builder from output Schema
-        final StructuredRecord.Builder outputStructuredRecordBuilder = StructuredRecord.builder(outputSchema);
-
-        // iterate over input fields and if output schema has field with same name copy the value to out record builder
-        for (Schema.Field inputField : inputSchema.getFields()) {
-            final String inputFieldName = inputField.getName();
-            if (outputSchema.getField(inputFieldName) != null) {
-                outputStructuredRecordBuilder.set(inputFieldName, inputStructuredRecord.get(inputFieldName));
-            }
-        }
-
-        return outputStructuredRecordBuilder;
-    }
-
-
-    /**
-     * Adds Field value to {@link StructuredRecord.Builder} if schema contains that field Name
-     *
-     * @param structuredRecordBuilder structured record builder
-     * @param structuredRecordSchema schema for structured record builder
-     * @param fieldName field name
-     * @param fieldValue field value
-     *
-     * @return structured record builder with populated field name and value if schema contains field name
-     */
-    public static StructuredRecord.Builder addFieldValueToStructuredRecordBuilder(
-            @Nonnull final StructuredRecord.Builder structuredRecordBuilder,
-            @Nonnull final Schema structuredRecordSchema,
-            @Nonnull final String fieldName,
-            final Object fieldValue) {
-
-        // check if schema contains field Name
-        if (structuredRecordSchema.getField(fieldName) != null) {
-            structuredRecordBuilder.set(fieldName, fieldValue);
-        } else {
-            LOG.info("Unable to populate value for field Name: {} with field value: {}. " +
-                            "Schema Fields: {} does not contain field name: {}",
-                    fieldName, fieldValue, structuredRecordSchema.getFields(), fieldName);
-        }
-
-        return structuredRecordBuilder;
-    }
-
-
-    /**
-     * Validates that given schema String has fieldName of expected type. If field does not exist in given schema
-     * then validation will pass with warning. If field does exist in given schema then this validation will return
-     * true if field type is same as expected type else false
-     *
-     * @param schemaString CDAP Plugin output or input schema string
-     * @param fieldName field name
-     * @param expectedFieldType expected schema field type
-     *
-     * @return true if field type matches expected field type else false. If field does not exist in
-     * give schema validation will pass but will generate a warning message
-     */
-    public static boolean validateSchemaFieldType(@Nonnull final String schemaString,
-                                                  @Nonnull final String fieldName,
-                                                  @Nonnull final Schema.Type expectedFieldType) {
-
-        try {
-            // parse given schema String
-            final Schema outputSchema = Schema.parseJson(schemaString);
-            final Schema.Field schemaField = outputSchema.getField(fieldName);
-
-            // if given schema does contain field then validated fieldName type
-            if (schemaField != null) {
-
-                final List<Schema> schemas = new LinkedList<>();
-
-                // if it is a union type then grab all union schemas
-                if (outputSchema.getField(fieldName).getSchema().getType() == Schema.Type.UNION) {
-                    final List<Schema> unionFieldSchemas =
-                            outputSchema.getField(fieldName).getSchema().getUnionSchemas();
-                    schemas.addAll(unionFieldSchemas);
-                } else {
-                    // if not union type the just get the field schema
-                    final Schema fieldSchema = outputSchema.getField(fieldName).getSchema();
-                    schemas.add(fieldSchema);
-                }
-
-                // get all schema types
-                final List<Schema.Type> fieldTypes =
-                        Lists.transform(schemas, CDAPPluginUtils.SCHEMA_TO_TYPE_FUNCTION);
-
-                // if all schema types does not contain expected field type then return false
-                if (!fieldTypes.contains(expectedFieldType)) {
-                    LOG.error("Validation failed for fieldName: {} is NOT of expected Type: {} in schema: {}",
-                            fieldName, expectedFieldType, outputSchema);
-                    return false;
-                }
-
-                // field type validation passed
-                LOG.debug("Successfully validated fieldName: {} is of expected Type: {}",
-                        fieldName, expectedFieldType);
-
-                return true;
-
-            } else {
-
-                // if field does not exist then the validation will pass but will generate warning message
-                LOG.warn("Validation of field type not possible. Field name: {} does not exist in schema: {}",
-                        fieldName, outputSchema);
-                return true;
-            }
-
-        } catch (IOException e) {
-            final String errorMessage =
-                    String.format("Unable to parse schema: %s for field type validation. " +
-                                    "Field Name: %s, Expected Field Type: %s Exception: %s",
-                            schemaString, fieldName, expectedFieldType, e);
-            throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);
-        }
-
-    }
-
-
-    /**
-     * Parses provided schema String as Schema object and set it as output Schema format
-     *
-     * @param pipelineConfigurer plugin pipeline configurer
-     * @param schemaString schema String to be set as output schema
-     */
-    public static void setOutputSchema(final PipelineConfigurer pipelineConfigurer, final String schemaString) {
-        try {
-            final Schema outputSchema = Schema.parseJson(schemaString);
-            pipelineConfigurer.getStageConfigurer().setOutputSchema(outputSchema);
-        } catch (IOException e) {
-            final String errorMessage = String.format(
-                    "Schema specified is not a valid JSON. Schema String: %s, Exception: %s", schemaString, e);
-            throw new CDAPSettingsException(errorMessage, LOG, new IllegalArgumentException(errorMessage));
-        }
-    }
-
-
-    /**
-     * Parses incoming plugin config mapping to key value map. If any of the key value map is blank an Illegal Argument
-     * exception will be thrown
-     *
-     * @param mappingFieldString field Mapping String
-     *
-     * @return map containing mapping key values
-     */
-    public static Map<String, String> extractFieldMappings(final String mappingFieldString) {
-        final Map<String, String> fieldMappings = Maps.newHashMap();
-        if (StringUtils.isNotBlank(mappingFieldString)) {
-            final Splitter commaSplitter = Splitter.on(",");
-            for (String fieldMapping : commaSplitter.split(mappingFieldString)) {
-                final String[] keyValueMappings = fieldMapping.split(":");
-                if (keyValueMappings.length != 2 ||
-                        StringUtils.isBlank(keyValueMappings[0]) ||
-                        StringUtils.isBlank(keyValueMappings[1])) {
-                    final String errorMessage = "Field Mapping key or value is Blank. All field mappings must " +
-                            "be present in mappings: " + mappingFieldString;
-                    throw new DCAEAnalyticsRuntimeException(
-                            errorMessage, LOG, new IllegalArgumentException(errorMessage));
-                }
-                fieldMappings.put(keyValueMappings[0].trim(), keyValueMappings[1].trim());
-            }
-        }
-        return fieldMappings;
-    }
-
-
-
-
-}
+/*\r
+ * ===============================LICENSE_START======================================\r
+ *  dcae-analytics\r
+ * ================================================================================\r
+ *    Copyright © 2017 AT&T Intellectual Property. All rights reserved.\r
+ * ================================================================================\r
+ *  Licensed under the Apache License, Version 2.0 (the "License");\r
+ *  you may not use this file except in compliance with the License.\r
+ *   You may obtain a copy of the License at\r
+ *\r
+ *          http://www.apache.org/licenses/LICENSE-2.0\r
+ *\r
+ *  Unless required by applicable law or agreed to in writing, software\r
+ *  distributed under the License is distributed on an "AS IS" BASIS,\r
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+ *  See the License for the specific language governing permissions and\r
+ *  limitations under the License.\r
+ *  ============================LICENSE_END===========================================\r
+ */\r
+\r
+package org.openecomp.dcae.apod.analytics.cdap.plugins.utils;\r
+\r
+import co.cask.cdap.api.data.format.StructuredRecord;\r
+import co.cask.cdap.api.data.schema.Schema;\r
+import co.cask.cdap.etl.api.PipelineConfigurer;\r
+import com.google.common.base.Function;\r
+import com.google.common.base.Splitter;\r
+import com.google.common.collect.Lists;\r
+import com.google.common.collect.Maps;\r
+import org.apache.commons.lang3.StringUtils;\r
+import org.openecomp.dcae.apod.analytics.cdap.common.exception.CDAPSettingsException;\r
+import org.openecomp.dcae.apod.analytics.cdap.plugins.domain.schema.dmaap.DMaaPSourceOutputSchema;\r
+import org.openecomp.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException;\r
+import org.openecomp.dcae.apod.analytics.model.util.AnalyticsModelJsonUtils;\r
+import org.slf4j.Logger;\r
+import org.slf4j.LoggerFactory;\r
+\r
+import java.io.IOException;\r
+import java.util.Arrays;\r
+import java.util.LinkedList;\r
+import java.util.List;\r
+import java.util.Map;\r
+\r
+import javax.annotation.Nonnull;\r
+import javax.annotation.Nullable;\r
+\r
+/**\r
+ * @author Rajiv Singla . Creation Date: 1/26/2017.\r
+ */\r
+public abstract class CDAPPluginUtils extends AnalyticsModelJsonUtils {\r
+\r
+    private static final Logger LOG = LoggerFactory.getLogger(CDAPPluginUtils.class);\r
+\r
+    public static final Function<Schema, Schema.Type> SCHEMA_TO_TYPE_FUNCTION = new Function<Schema, Schema.Type>() {\r
+        @Override\r
+        public Schema.Type apply(@Nonnull Schema schema) {\r
+            return schema.getType();\r
+        }\r
+    };\r
+\r
+\r
+\r
+    private CDAPPluginUtils() {\r
+        // private constructor\r
+    }\r
+\r
+    /**\r
+     * Validates if CDAP Schema contains expected fields\r
+     *\r
+     * @param schema schema that need to be validated\r
+     * @param expectedFields fields that are expected to be in the schema\r
+     */\r
+\r
+    public static void validateSchemaContainsFields(@Nullable final Schema schema, final String... expectedFields) {\r
+\r
+        LOG.debug("Validating schema:{} contains expected fields:{}", schema, Arrays.toString(expectedFields));\r
+\r
+        if (schema == null) {\r
+            // If input schema is null then no validation possible\r
+            LOG.warn("Input Schema is null. No validation possible");\r
+        } else {\r
+            // Check if expected fields are indeed present in the schema\r
+            for (String expectedField : expectedFields) {\r
+                final Schema.Field schemaField = schema.getField(expectedField);\r
+                if (schemaField == null) {\r
+                    final String errorMessage = String.format(\r
+                            "Unable to find expected field: %s, in schema: %s", expectedField, schema);\r
+                    throw new CDAPSettingsException(errorMessage, LOG, new IllegalArgumentException(errorMessage));\r
+                }\r
+            }\r
+            LOG.debug("Successfully validated schema:{}, contains expected fields:{}", schema,\r
+                    Arrays.toString(expectedFields));\r
+        }\r
+    }\r
+\r
+\r
+    /**\r
+     * Creates a new Structured Record containing DMaaP MR fetched message\r
+     *\r
+     * @param message DMaaP MR fetch message\r
+     *\r
+     * @return Structured record containing DMaaP MR Message\r
+     */\r
+    public static StructuredRecord createDMaaPMRResponseStructuredRecord(final String message) {\r
+        StructuredRecord.Builder recordBuilder = StructuredRecord.builder(DMaaPSourceOutputSchema.getSchema());\r
+        recordBuilder\r
+                .set(DMaaPSourceOutputSchema.TIMESTAMP.getSchemaColumnName(), System.nanoTime())\r
+                .set(DMaaPSourceOutputSchema.RESPONSE_CODE.getSchemaColumnName(), 200)\r
+                .set(DMaaPSourceOutputSchema.RESPONSE_MESSAGE.getSchemaColumnName(), "OK")\r
+                .set(DMaaPSourceOutputSchema.FETCHED_MESSAGE.getSchemaColumnName(), message);\r
+        return recordBuilder.build();\r
+    }\r
+\r
+\r
+    /**\r
+     * Creates output StructuredRecord Builder which has copied values from input StructuredRecord\r
+     *\r
+     * @param outputSchema output Schema\r
+     * @param inputStructuredRecord input Structured Record\r
+     *\r
+     * @return output Structured Record builder with pre populated values from input structured record\r
+     */\r
+    public static StructuredRecord.Builder createOutputStructuredRecordBuilder(\r
+            @Nonnull final Schema outputSchema,\r
+            @Nonnull final StructuredRecord inputStructuredRecord) {\r
+\r
+        // Get input structured Record Schema\r
+        final Schema inputSchema = inputStructuredRecord.getSchema();\r
+        // Create new instance of output Structured Record Builder from output Schema\r
+        final StructuredRecord.Builder outputStructuredRecordBuilder = StructuredRecord.builder(outputSchema);\r
+\r
+        // iterate over input fields and if output schema has field with same name copy the value to out record builder\r
+        for (Schema.Field inputField : inputSchema.getFields()) {\r
+            final String inputFieldName = inputField.getName();\r
+            if (outputSchema.getField(inputFieldName) != null) {\r
+                outputStructuredRecordBuilder.set(inputFieldName, inputStructuredRecord.get(inputFieldName));\r
+            }\r
+        }\r
+\r
+        return outputStructuredRecordBuilder;\r
+    }\r
+\r
+\r
+    /**\r
+     * Adds Field value to StructuredRecord Builder if schema contains that field Name\r
+     *\r
+     * @param structuredRecordBuilder structured record builder\r
+     * @param structuredRecordSchema schema for structured record builder\r
+     * @param fieldName field name\r
+     * @param fieldValue field value\r
+     *\r
+     * @return structured record builder with populated field name and value if schema contains field name\r
+     */\r
+    public static StructuredRecord.Builder addFieldValueToStructuredRecordBuilder(\r
+            @Nonnull final StructuredRecord.Builder structuredRecordBuilder,\r
+            @Nonnull final Schema structuredRecordSchema,\r
+            @Nonnull final String fieldName,\r
+            final Object fieldValue) {\r
+\r
+        // check if schema contains field Name\r
+        if (structuredRecordSchema.getField(fieldName) != null) {\r
+            structuredRecordBuilder.set(fieldName, fieldValue);\r
+        } else {\r
+            LOG.info("Unable to populate value for field Name: {} with field value: {}. " +\r
+                            "Schema Fields: {} does not contain field name: {}",\r
+                    fieldName, fieldValue, structuredRecordSchema.getFields(), fieldName);\r
+        }\r
+\r
+        return structuredRecordBuilder;\r
+    }\r
+\r
+\r
+    /**\r
+     * Validates that given schema String has fieldName of expected type. If field does not exist in given schema\r
+     * then validation will pass with warning. If field does exist in given schema then this validation will return\r
+     * true if field type is same as expected type else false\r
+     *\r
+     * @param schemaString CDAP Plugin output or input schema string\r
+     * @param fieldName field name\r
+     * @param expectedFieldType expected schema field type\r
+     *\r
+     * @return true if field type matches expected field type else false. If field does not exist in\r
+     * give schema validation will pass but will generate a warning message\r
+     */\r
+    public static boolean validateSchemaFieldType(@Nonnull final String schemaString,\r
+                                                  @Nonnull final String fieldName,\r
+                                                  @Nonnull final Schema.Type expectedFieldType) {\r
+\r
+        try {\r
+            // parse given schema String\r
+            final Schema outputSchema = Schema.parseJson(schemaString);\r
+            final Schema.Field schemaField = outputSchema.getField(fieldName);\r
+\r
+            // if given schema does contain field then validated fieldName type\r
+            if (schemaField != null) {\r
+\r
+                final List<Schema> schemas = new LinkedList<>();\r
+\r
+                // if it is a union type then grab all union schemas\r
+                if (outputSchema.getField(fieldName).getSchema().getType() == Schema.Type.UNION) {\r
+                    final List<Schema> unionFieldSchemas =\r
+                            outputSchema.getField(fieldName).getSchema().getUnionSchemas();\r
+                    schemas.addAll(unionFieldSchemas);\r
+                } else {\r
+                    // if not union type the just get the field schema\r
+                    final Schema fieldSchema = outputSchema.getField(fieldName).getSchema();\r
+                    schemas.add(fieldSchema);\r
+                }\r
+\r
+                // get all schema types\r
+                final List<Schema.Type> fieldTypes =\r
+                        Lists.transform(schemas, CDAPPluginUtils.SCHEMA_TO_TYPE_FUNCTION);\r
+\r
+                // if all schema types does not contain expected field type then return false\r
+                if (!fieldTypes.contains(expectedFieldType)) {\r
+                    LOG.error("Validation failed for fieldName: {} is NOT of expected Type: {} in schema: {}",\r
+                            fieldName, expectedFieldType, outputSchema);\r
+                    return false;\r
+                }\r
+\r
+                // field type validation passed\r
+                LOG.debug("Successfully validated fieldName: {} is of expected Type: {}",\r
+                        fieldName, expectedFieldType);\r
+\r
+                return true;\r
+\r
+            } else {\r
+\r
+                // if field does not exist then the validation will pass but will generate warning message\r
+                LOG.warn("Validation of field type not possible. Field name: {} does not exist in schema: {}",\r
+                        fieldName, outputSchema);\r
+                return true;\r
+            }\r
+\r
+        } catch (IOException e) {\r
+            final String errorMessage =\r
+                    String.format("Unable to parse schema: %s for field type validation. " +\r
+                                    "Field Name: %s, Expected Field Type: %s Exception: %s",\r
+                            schemaString, fieldName, expectedFieldType, e);\r
+            throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);\r
+        }\r
+\r
+    }\r
+\r
+\r
+    /**\r
+     * Parses provided schema String as Schema object and set it as output Schema format\r
+     *\r
+     * @param pipelineConfigurer plugin pipeline configurer\r
+     * @param schemaString schema String to be set as output schema\r
+     */\r
+    public static void setOutputSchema(final PipelineConfigurer pipelineConfigurer, final String schemaString) {\r
+        try {\r
+            final Schema outputSchema = Schema.parseJson(schemaString);\r
+            pipelineConfigurer.getStageConfigurer().setOutputSchema(outputSchema);\r
+        } catch (IOException e) {\r
+            final String errorMessage = String.format(\r
+                    "Schema specified is not a valid JSON. Schema String: %s, Exception: %s", schemaString, e);\r
+            throw new CDAPSettingsException(errorMessage, LOG, new IllegalArgumentException(errorMessage));\r
+        }\r
+    }\r
+\r
+\r
+    /**\r
+     * Parses incoming plugin config mapping to key value map. If any of the key value map is blank an Illegal Argument\r
+     * exception will be thrown\r
+     *\r
+     * @param mappingFieldString field Mapping String\r
+     *\r
+     * @return map containing mapping key values\r
+     */\r
+    public static Map<String, String> extractFieldMappings(final String mappingFieldString) {\r
+        final Map<String, String> fieldMappings = Maps.newHashMap();\r
+        if (StringUtils.isNotBlank(mappingFieldString)) {\r
+            final Splitter commaSplitter = Splitter.on(",");\r
+            for (String fieldMapping : commaSplitter.split(mappingFieldString)) {\r
+                final String[] keyValueMappings = fieldMapping.split(":");\r
+                if (keyValueMappings.length != 2 ||\r
+                        StringUtils.isBlank(keyValueMappings[0]) ||\r
+                        StringUtils.isBlank(keyValueMappings[1])) {\r
+                    final String errorMessage = "Field Mapping key or value is Blank. All field mappings must " +\r
+                            "be present in mappings: " + mappingFieldString;\r
+                    throw new DCAEAnalyticsRuntimeException(\r
+                            errorMessage, LOG, new IllegalArgumentException(errorMessage));\r
+                }\r
+                fieldMappings.put(keyValueMappings[0].trim(), keyValueMappings[1].trim());\r
+            }\r
+        }\r
+        return fieldMappings;\r
+    }\r
+\r
+\r
+\r
+\r
+}\r