TCA: Replace any openecomp reference by onap
[dcaegen2/analytics/tca.git] / dcae-analytics-cdap-plugins / src / test / java / org / onap / dcae / apod / analytics / cdap / plugins / BaseAnalyticsCDAPPluginsUnitTest.java
-/*\r
- * ===============================LICENSE_START======================================\r
- *  dcae-analytics\r
- * ================================================================================\r
- *    Copyright © 2017 AT&T Intellectual Property. All rights reserved.\r
- * ================================================================================\r
- *  Licensed under the Apache License, Version 2.0 (the "License");\r
- *  you may not use this file except in compliance with the License.\r
- *   You may obtain a copy of the License at\r
- *\r
- *          http://www.apache.org/licenses/LICENSE-2.0\r
- *\r
- *  Unless required by applicable law or agreed to in writing, software\r
- *  distributed under the License is distributed on an "AS IS" BASIS,\r
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
- *  See the License for the specific language governing permissions and\r
- *  limitations under the License.\r
- *  ============================LICENSE_END===========================================\r
- */\r
-\r
-package org.openecomp.dcae.apod.analytics.cdap.plugins;\r
-\r
-import co.cask.cdap.api.data.schema.Schema;\r
-import co.cask.cdap.etl.api.StageMetrics;\r
-import com.fasterxml.jackson.databind.ObjectMapper;\r
-import com.google.common.base.Suppliers;\r
-import org.apache.hadoop.conf.Configuration;\r
-import org.openecomp.dcae.apod.analytics.cdap.common.CDAPPluginConstants;\r
-import org.openecomp.dcae.apod.analytics.cdap.plugins.domain.config.dmaap.TestDMaaPMRSinkPluginConfig;\r
-import org.openecomp.dcae.apod.analytics.cdap.plugins.domain.config.dmaap.TestDMaaPMRSourcePluginConfig;\r
-import org.openecomp.dcae.apod.analytics.cdap.plugins.domain.config.filter.TestJsonPathFilterPluginConfig;\r
-import org.openecomp.dcae.apod.analytics.cdap.plugins.domain.config.tca.TestSimpleTCAPluginConfig;\r
-import org.openecomp.dcae.apod.analytics.model.util.json.AnalyticsModelObjectMapperSupplier;\r
-import org.openecomp.dcae.apod.analytics.test.BaseDCAEAnalyticsUnitTest;\r
-\r
-import java.io.IOException;\r
-import java.io.Serializable;\r
-import java.util.LinkedHashMap;\r
-import java.util.Map;\r
-\r
-/**\r
- * @author Rajiv Singla . Creation Date: 1/23/2017.\r
- */\r
-public abstract class BaseAnalyticsCDAPPluginsUnitTest extends BaseDCAEAnalyticsUnitTest {\r
-\r
-    protected static final ObjectMapper ANALYTICS_MODEL_OBJECT_MAPPER =\r
-            Suppliers.memoize(new AnalyticsModelObjectMapperSupplier()).get();\r
-\r
-    protected static final String TCA_POLICY_JSON_FILE_LOCATION = "data/json/policy/tca_policy.json";\r
-    protected static final String CEF_MESSAGE_JSON_FILE_LOCATION = "data/json/cef/cef_message.json";\r
-    protected static final String CEF_NON_COMPLIANT_MESSAGE_JSON_FILE_LOCATION =\r
-            "data/json/cef/cef_message_with_threshold_violation.json";\r
-\r
-\r
-    protected static final String DMAAP_MR_SOURCE_PLUGIN_REFERENCE_NAME = "testDMaaPMRSource";\r
-    protected static final String DMAAP_MR_SOURCE_PLUGIN_HOST_NAME = "dcae-msrt-mtl1-ftl.homer.com";\r
-    protected static final Integer DMAAP_MR_SOURCE_PLUGIN_PORT_NUMBER = 3905;\r
-    protected static final String DMAAP_MR_SOURCE_PLUGIN_TOPIC_NAME = "com.dcae.dmaap.FTL.DcaeTestVESSub";\r
-    protected static final Integer DMAAP_MR_SOURCE_PLUGIN_POLLING_INTERVAL = 1000;\r
-    protected static final String DMAAP_MR_SOURCE_PLUGIN_PROTOCOL = "https";\r
-    protected static final String DMAAP_MR_SOURCE_PLUGIN_USERNAME = "username";\r
-    protected static final String DMAAP_MR_SOURCE_PLUGIN_PASSWORD = "password";\r
-    protected static final String DMAAP_MR_SOURCE_PLUGIN_CONTENT_TYPE = "application/json";\r
-    protected static final String DMAAP_MR_SOURCE_PLUGIN_CONSUMER_GROUP = "G1";\r
-    protected static final String DMAAP_MR_SOURCE_PLUGIN_CONSUMER_ID = "C1";\r
-    protected static final Integer DMAAP_MR_SOURCE_PLUGIN_MESSAGE_LIMIT = 100;\r
-    protected static final Integer DMAAP_MR_SOURCE_PLUGIN_TIMEOUT = 10000;\r
-\r
-\r
-    protected static final String DMAAP_MR_SINK_PLUGIN_REFERENCE_NAME = "testDMaaPMRSINK";\r
-    protected static final String DMAAP_MR_SINK_PLUGIN_HOST_NAME = "dcae-msrt-mtl1-ftl.homer.com";\r
-    protected static final Integer DMAAP_MR_SINK_PLUGIN_PORT_NUMBER = 3905;\r
-    protected static final String DMAAP_MR_SINK_PLUGIN_TOPIC_NAME = "com.dcae.dmaap.FTL.DcaeTestVESPub";\r
-    protected static final String DMAAP_MR_SINK_PLUGIN_PROTOCOL = "https";\r
-    protected static final String DMAAP_MR_SINK_PLUGIN_USERNAME = "username";\r
-    protected static final String DMAAP_MR_SINK_PLUGIN_PASSWORD = "password";\r
-    protected static final String DMAAP_MR_SINK_PLUGIN_CONTENT_TYPE = "application/json";\r
-    protected static final String DMAAP_MR_SINK_MESSAGE_COLUMN_NAME = "message";\r
-    protected static final Integer DMAAP_MR_SINK_PLUGIN_MAX_BATCH_SIZE = 10;\r
-    protected static final Integer DMAAP_MR_SINK_PLUGIN_MAX_RECOVERY_QUEUE_SIZE = 100;\r
-\r
-    protected static final String VES_MESSAGE_FIELD_NAME = "message";\r
-    protected static final String TCA_PLUGIN_ALERT_FIELD_NAME = "alert";\r
-    protected static final String TCA_PLUGIN_MESSAGE_TYPE_FIELD_NAME = "tcaMessageType";\r
-\r
-\r
-    protected static final String JSON_PATH_FILTER_PLUGIN_REFERENCE_NAME = "JsonPathFilter";\r
-    protected static final String JSON_PATH_FILTER_PLUGIN_INCOMING_JSON_FIELD_NAME = "message";\r
-    protected static final String JSON_PATH_FILTER_PLUGIN_OUTPUT_SCHEMA_FILED_NAME = "filterMatched";\r
-    protected static final String JSON_PATH_FILTER_PLUGIN_JSON_FILTER_MAPPINGS =\r
-            "$.event.commonEventHeader.domain:measurementsForVfScaling," +\r
-                    "$.event.commonEventHeader.eventName:vLoadBalancer;vFirewall";\r
-    protected static final String JSON_PATH_FILTER_PLUGIN_JSON_FILTER_OUTPUT_SCHEMA =\r
-            "{\"type\":\"record\"," +\r
-                    "\"name\":\"etlSchemaBody\",\"fields\":" +\r
-                    "[" +\r
-                    "{\"name\":\"ts\",\"type\":\"long\"}," +\r
-                    "{\"name\":\"filterMatched\",\"type\":[\"boolean\",\"null\"]}," +\r
-                    "{\"name\":\"responseCode\",\"type\":\"int\"}," +\r
-                    "{\"name\":\"responseMessage\",\"type\":\"string\"}," +\r
-                    "{\"name\":\"message\",\"type\":\"string\"}" +\r
-                    "]" +\r
-                    "}";\r
-\r
-    protected static class MockStageMetrics implements StageMetrics, Serializable {\r
-\r
-        @Override\r
-        public void count(String metricName, int delta) {\r
-            LOG.debug("Mocking metric count, MetricName: {}, Delta: {}", metricName, delta);\r
-        }\r
-\r
-        @Override\r
-        public void gauge(String metricName, long value) {\r
-            LOG.debug("Mocking metric guage, MetricName: {}, Value: {}", metricName, value);\r
-        }\r
-\r
-        @Override\r
-        public void pipelineCount(String metricName, int delta) {\r
-            LOG.debug("Mocking metric pipelineCount, MetricName: {}, Delta: {}", metricName, delta);\r
-        }\r
-\r
-        @Override\r
-        public void pipelineGauge(String metricName, long value) {\r
-            LOG.debug("Mocking metric guage, pipelineGauge: {}, Value: {}", metricName, value);\r
-        }\r
-    }\r
-\r
-    protected static TestDMaaPMRSourcePluginConfig getTestDMaaPMRSourcePluginConfig() {\r
-        final TestDMaaPMRSourcePluginConfig sourcePluginConfig = new TestDMaaPMRSourcePluginConfig();\r
-        sourcePluginConfig.setReferenceName(DMAAP_MR_SOURCE_PLUGIN_REFERENCE_NAME);\r
-        sourcePluginConfig.setHostName(DMAAP_MR_SOURCE_PLUGIN_HOST_NAME);\r
-        sourcePluginConfig.setPortNumber(DMAAP_MR_SOURCE_PLUGIN_PORT_NUMBER);\r
-        sourcePluginConfig.setTopicName(DMAAP_MR_SOURCE_PLUGIN_TOPIC_NAME);\r
-        sourcePluginConfig.setPollingInterval(DMAAP_MR_SOURCE_PLUGIN_POLLING_INTERVAL);\r
-        sourcePluginConfig.setProtocol(DMAAP_MR_SOURCE_PLUGIN_PROTOCOL);\r
-        sourcePluginConfig.setUserName(DMAAP_MR_SOURCE_PLUGIN_USERNAME);\r
-        sourcePluginConfig.setUserPassword(DMAAP_MR_SOURCE_PLUGIN_PASSWORD);\r
-        sourcePluginConfig.setContentType(DMAAP_MR_SOURCE_PLUGIN_CONTENT_TYPE);\r
-        sourcePluginConfig.setConsumerGroup(DMAAP_MR_SOURCE_PLUGIN_CONSUMER_GROUP);\r
-        sourcePluginConfig.setConsumerId(DMAAP_MR_SOURCE_PLUGIN_CONSUMER_ID);\r
-        sourcePluginConfig.setMessageLimit(DMAAP_MR_SOURCE_PLUGIN_MESSAGE_LIMIT);\r
-        sourcePluginConfig.setTimeoutMS(DMAAP_MR_SOURCE_PLUGIN_TIMEOUT);\r
-        return sourcePluginConfig;\r
-    }\r
-\r
-    protected static TestDMaaPMRSinkPluginConfig getTestDMaaPMRSinkPluginConfig() {\r
-        final TestDMaaPMRSinkPluginConfig sinkPluginConfig = new TestDMaaPMRSinkPluginConfig();\r
-        sinkPluginConfig.setReferenceName(DMAAP_MR_SINK_PLUGIN_REFERENCE_NAME);\r
-        sinkPluginConfig.setHostName(DMAAP_MR_SINK_PLUGIN_HOST_NAME);\r
-        sinkPluginConfig.setPortNumber(DMAAP_MR_SINK_PLUGIN_PORT_NUMBER);\r
-        sinkPluginConfig.setTopicName(DMAAP_MR_SINK_PLUGIN_TOPIC_NAME);\r
-        sinkPluginConfig.setProtocol(DMAAP_MR_SINK_PLUGIN_PROTOCOL);\r
-        sinkPluginConfig.setUserName(DMAAP_MR_SINK_PLUGIN_USERNAME);\r
-        sinkPluginConfig.setUserPassword(DMAAP_MR_SINK_PLUGIN_PASSWORD);\r
-        sinkPluginConfig.setContentType(DMAAP_MR_SINK_PLUGIN_CONTENT_TYPE);\r
-        sinkPluginConfig.setMessageColumnName(DMAAP_MR_SINK_MESSAGE_COLUMN_NAME);\r
-        sinkPluginConfig.setMaxBatchSize(DMAAP_MR_SINK_PLUGIN_MAX_BATCH_SIZE);\r
-        sinkPluginConfig.setMaxRecoveryQueueSize(DMAAP_MR_SINK_PLUGIN_MAX_RECOVERY_QUEUE_SIZE);\r
-        return sinkPluginConfig;\r
-    }\r
-\r
-\r
-    protected static Configuration getTestConfiguration() {\r
-        final Configuration configuration = new Configuration();\r
-        final Map<String, String> sinkConfigurationMap = createSinkConfigurationMap();\r
-        for (Map.Entry<String, String> property : sinkConfigurationMap.entrySet()) {\r
-            configuration.set(property.getKey(), property.getValue());\r
-        }\r
-        return configuration;\r
-    }\r
-\r
-    protected static Map<String, String> createSinkConfigurationMap() {\r
-\r
-        Map<String, String> sinkConfig = new LinkedHashMap<>();\r
-        sinkConfig.put(CDAPPluginConstants.DMaaPMRSinkHadoopConfigFields.HOST_NAME, DMAAP_MR_SINK_PLUGIN_HOST_NAME);\r
-        sinkConfig.put(CDAPPluginConstants.DMaaPMRSinkHadoopConfigFields.TOPIC_NAME, DMAAP_MR_SINK_PLUGIN_TOPIC_NAME);\r
-        sinkConfig.put(CDAPPluginConstants.DMaaPMRSinkHadoopConfigFields.PORT_NUMBER,\r
-                DMAAP_MR_SINK_PLUGIN_PORT_NUMBER.toString());\r
-        sinkConfig.put(CDAPPluginConstants.DMaaPMRSinkHadoopConfigFields.PROTOCOL, DMAAP_MR_SINK_PLUGIN_PROTOCOL);\r
-        sinkConfig.put(CDAPPluginConstants.DMaaPMRSinkHadoopConfigFields.USER_NAME, DMAAP_MR_SINK_PLUGIN_USERNAME);\r
-        sinkConfig.put(CDAPPluginConstants.DMaaPMRSinkHadoopConfigFields.USER_PASS, DMAAP_MR_SINK_PLUGIN_PASSWORD);\r
-        sinkConfig.put(CDAPPluginConstants.DMaaPMRSinkHadoopConfigFields.CONTENT_TYPE,\r
-                DMAAP_MR_SINK_PLUGIN_CONTENT_TYPE);\r
-        sinkConfig.put(CDAPPluginConstants.DMaaPMRSinkHadoopConfigFields.MAX_BATCH_SIZE,\r
-                DMAAP_MR_SINK_PLUGIN_MAX_BATCH_SIZE.toString());\r
-        sinkConfig.put(CDAPPluginConstants.DMaaPMRSinkHadoopConfigFields.MAX_RECOVER_QUEUE_SIZE,\r
-                DMAAP_MR_SINK_PLUGIN_MAX_RECOVERY_QUEUE_SIZE.toString());\r
-        return sinkConfig;\r
-    }\r
-\r
-    protected static Schema getDMaaPMRSinkTestSchema() {\r
-        return Schema.recordOf(\r
-                "DMaaPMRSinkTestSchema",\r
-                Schema.Field.of("message", Schema.of(Schema.Type.STRING)),\r
-                Schema.Field.of("field1", Schema.of(Schema.Type.STRING))\r
-        );\r
-    }\r
-\r
-\r
-    protected static TestSimpleTCAPluginConfig getTestSimpleTCAPluginConfig() {\r
-        final String policyJson;\r
-        try {\r
-            policyJson = fromStream(TCA_POLICY_JSON_FILE_LOCATION);\r
-        } catch (IOException e) {\r
-            throw new RuntimeException("Error while parsing policy", e);\r
-        }\r
-        return new TestSimpleTCAPluginConfig(VES_MESSAGE_FIELD_NAME, policyJson, TCA_PLUGIN_ALERT_FIELD_NAME,\r
-                TCA_PLUGIN_MESSAGE_TYPE_FIELD_NAME, getSimpleTCAPluginInputSchema().toString(), false);\r
-    }\r
-\r
-    protected static Schema getSimpleTCAPluginInputSchema() {\r
-        return Schema.recordOf(\r
-                "TestSimpleTCAPluginInputSchema",\r
-                Schema.Field.of("message", Schema.of(Schema.Type.STRING)),\r
-                Schema.Field.of("inputField1", Schema.nullableOf(Schema.of(Schema.Type.STRING))),\r
-                Schema.Field.of("inputField2", Schema.nullableOf(Schema.of(Schema.Type.STRING)))\r
-        );\r
-    }\r
-\r
-    protected static Schema getJsonFilterPluginInputSchema() {\r
-        return Schema.recordOf(\r
-                "TestJsonFilterInputSchema",\r
-                Schema.Field.of("ts", Schema.of(Schema.Type.LONG)),\r
-                Schema.Field.of("responseCode", Schema.of(Schema.Type.INT)),\r
-                Schema.Field.of("responseMessage", Schema.of(Schema.Type.STRING)),\r
-                Schema.Field.of("message", Schema.of(Schema.Type.STRING))\r
-        );\r
-    }\r
-\r
-    protected static TestJsonPathFilterPluginConfig getJsonPathFilterPluginConfig() {\r
-        return new TestJsonPathFilterPluginConfig(JSON_PATH_FILTER_PLUGIN_REFERENCE_NAME,\r
-                JSON_PATH_FILTER_PLUGIN_INCOMING_JSON_FIELD_NAME,\r
-                JSON_PATH_FILTER_PLUGIN_OUTPUT_SCHEMA_FILED_NAME,\r
-                JSON_PATH_FILTER_PLUGIN_JSON_FILTER_MAPPINGS,\r
-                JSON_PATH_FILTER_PLUGIN_JSON_FILTER_OUTPUT_SCHEMA);\r
-    }\r
-\r
-}\r
+/*
+ * ===============================LICENSE_START======================================
+ *  dcae-analytics
+ * ================================================================================
+ *    Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *          http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *  ============================LICENSE_END===========================================
+ */
+
+package org.onap.dcae.apod.analytics.cdap.plugins;
+
+import co.cask.cdap.api.data.schema.Schema;
+import co.cask.cdap.etl.api.StageMetrics;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Suppliers;
+import org.apache.hadoop.conf.Configuration;
+import org.onap.dcae.apod.analytics.cdap.common.CDAPPluginConstants;
+import org.onap.dcae.apod.analytics.cdap.plugins.domain.config.dmaap.TestDMaaPMRSinkPluginConfig;
+import org.onap.dcae.apod.analytics.cdap.plugins.domain.config.dmaap.TestDMaaPMRSourcePluginConfig;
+import org.onap.dcae.apod.analytics.cdap.plugins.domain.config.filter.TestJsonPathFilterPluginConfig;
+import org.onap.dcae.apod.analytics.cdap.plugins.domain.config.tca.TestSimpleTCAPluginConfig;
+import org.onap.dcae.apod.analytics.model.util.json.AnalyticsModelObjectMapperSupplier;
+import org.onap.dcae.apod.analytics.test.BaseDCAEAnalyticsUnitTest;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+/**
+ * @author Rajiv Singla . Creation Date: 1/23/2017.
+ */
+public abstract class BaseAnalyticsCDAPPluginsUnitTest extends BaseDCAEAnalyticsUnitTest {
+
+    protected static final ObjectMapper ANALYTICS_MODEL_OBJECT_MAPPER =
+            Suppliers.memoize(new AnalyticsModelObjectMapperSupplier()).get();
+
+    protected static final String TCA_POLICY_JSON_FILE_LOCATION = "data/json/policy/tca_policy.json";
+    protected static final String CEF_MESSAGE_JSON_FILE_LOCATION = "data/json/cef/cef_message.json";
+    protected static final String CEF_NON_COMPLIANT_MESSAGE_JSON_FILE_LOCATION =
+            "data/json/cef/cef_message_with_threshold_violation.json";
+
+
+    protected static final String DMAAP_MR_SOURCE_PLUGIN_REFERENCE_NAME = "testDMaaPMRSource";
+    protected static final String DMAAP_MR_SOURCE_PLUGIN_HOST_NAME = "dcae-msrt-mtl1-ftl.homer.com";
+    protected static final Integer DMAAP_MR_SOURCE_PLUGIN_PORT_NUMBER = 3905;
+    protected static final String DMAAP_MR_SOURCE_PLUGIN_TOPIC_NAME = "com.dcae.dmaap.FTL.DcaeTestVESSub";
+    protected static final Integer DMAAP_MR_SOURCE_PLUGIN_POLLING_INTERVAL = 1000;
+    protected static final String DMAAP_MR_SOURCE_PLUGIN_PROTOCOL = "https";
+    protected static final String DMAAP_MR_SOURCE_PLUGIN_USERNAME = "username";
+    protected static final String DMAAP_MR_SOURCE_PLUGIN_PASSWORD = "password";
+    protected static final String DMAAP_MR_SOURCE_PLUGIN_CONTENT_TYPE = "application/json";
+    protected static final String DMAAP_MR_SOURCE_PLUGIN_CONSUMER_GROUP = "G1";
+    protected static final String DMAAP_MR_SOURCE_PLUGIN_CONSUMER_ID = "C1";
+    protected static final Integer DMAAP_MR_SOURCE_PLUGIN_MESSAGE_LIMIT = 100;
+    protected static final Integer DMAAP_MR_SOURCE_PLUGIN_TIMEOUT = 10000;
+
+
+    protected static final String DMAAP_MR_SINK_PLUGIN_REFERENCE_NAME = "testDMaaPMRSINK";
+    protected static final String DMAAP_MR_SINK_PLUGIN_HOST_NAME = "dcae-msrt-mtl1-ftl.homer.com";
+    protected static final Integer DMAAP_MR_SINK_PLUGIN_PORT_NUMBER = 3905;
+    protected static final String DMAAP_MR_SINK_PLUGIN_TOPIC_NAME = "com.dcae.dmaap.FTL.DcaeTestVESPub";
+    protected static final String DMAAP_MR_SINK_PLUGIN_PROTOCOL = "https";
+    protected static final String DMAAP_MR_SINK_PLUGIN_USERNAME = "username";
+    protected static final String DMAAP_MR_SINK_PLUGIN_PASSWORD = "password";
+    protected static final String DMAAP_MR_SINK_PLUGIN_CONTENT_TYPE = "application/json";
+    protected static final String DMAAP_MR_SINK_MESSAGE_COLUMN_NAME = "message";
+    protected static final Integer DMAAP_MR_SINK_PLUGIN_MAX_BATCH_SIZE = 10;
+    protected static final Integer DMAAP_MR_SINK_PLUGIN_MAX_RECOVERY_QUEUE_SIZE = 100;
+
+    protected static final String VES_MESSAGE_FIELD_NAME = "message";
+    protected static final String TCA_PLUGIN_ALERT_FIELD_NAME = "alert";
+    protected static final String TCA_PLUGIN_MESSAGE_TYPE_FIELD_NAME = "tcaMessageType";
+
+
+    protected static final String JSON_PATH_FILTER_PLUGIN_REFERENCE_NAME = "JsonPathFilter";
+    protected static final String JSON_PATH_FILTER_PLUGIN_INCOMING_JSON_FIELD_NAME = "message";
+    protected static final String JSON_PATH_FILTER_PLUGIN_OUTPUT_SCHEMA_FILED_NAME = "filterMatched";
+    protected static final String JSON_PATH_FILTER_PLUGIN_JSON_FILTER_MAPPINGS =
+            "$.event.commonEventHeader.domain:measurementsForVfScaling," +
+                    "$.event.commonEventHeader.eventName:vLoadBalancer;vFirewall";
+    protected static final String JSON_PATH_FILTER_PLUGIN_JSON_FILTER_OUTPUT_SCHEMA =
+            "{\"type\":\"record\"," +
+                    "\"name\":\"etlSchemaBody\",\"fields\":" +
+                    "[" +
+                    "{\"name\":\"ts\",\"type\":\"long\"}," +
+                    "{\"name\":\"filterMatched\",\"type\":[\"boolean\",\"null\"]}," +
+                    "{\"name\":\"responseCode\",\"type\":\"int\"}," +
+                    "{\"name\":\"responseMessage\",\"type\":\"string\"}," +
+                    "{\"name\":\"message\",\"type\":\"string\"}" +
+                    "]" +
+                    "}";
+
+    protected static class MockStageMetrics implements StageMetrics, Serializable {
+
+        @Override
+        public void count(String metricName, int delta) {
+            LOG.debug("Mocking metric count, MetricName: {}, Delta: {}", metricName, delta);
+        }
+
+        @Override
+        public void gauge(String metricName, long value) {
+            LOG.debug("Mocking metric guage, MetricName: {}, Value: {}", metricName, value);
+        }
+
+        @Override
+        public void pipelineCount(String metricName, int delta) {
+            LOG.debug("Mocking metric pipelineCount, MetricName: {}, Delta: {}", metricName, delta);
+        }
+
+        @Override
+        public void pipelineGauge(String metricName, long value) {
+            LOG.debug("Mocking metric guage, pipelineGauge: {}, Value: {}", metricName, value);
+        }
+    }
+
+    protected static TestDMaaPMRSourcePluginConfig getTestDMaaPMRSourcePluginConfig() {
+        final TestDMaaPMRSourcePluginConfig sourcePluginConfig = new TestDMaaPMRSourcePluginConfig();
+        sourcePluginConfig.setReferenceName(DMAAP_MR_SOURCE_PLUGIN_REFERENCE_NAME);
+        sourcePluginConfig.setHostName(DMAAP_MR_SOURCE_PLUGIN_HOST_NAME);
+        sourcePluginConfig.setPortNumber(DMAAP_MR_SOURCE_PLUGIN_PORT_NUMBER);
+        sourcePluginConfig.setTopicName(DMAAP_MR_SOURCE_PLUGIN_TOPIC_NAME);
+        sourcePluginConfig.setPollingInterval(DMAAP_MR_SOURCE_PLUGIN_POLLING_INTERVAL);
+        sourcePluginConfig.setProtocol(DMAAP_MR_SOURCE_PLUGIN_PROTOCOL);
+        sourcePluginConfig.setUserName(DMAAP_MR_SOURCE_PLUGIN_USERNAME);
+        sourcePluginConfig.setUserPassword(DMAAP_MR_SOURCE_PLUGIN_PASSWORD);
+        sourcePluginConfig.setContentType(DMAAP_MR_SOURCE_PLUGIN_CONTENT_TYPE);
+        sourcePluginConfig.setConsumerGroup(DMAAP_MR_SOURCE_PLUGIN_CONSUMER_GROUP);
+        sourcePluginConfig.setConsumerId(DMAAP_MR_SOURCE_PLUGIN_CONSUMER_ID);
+        sourcePluginConfig.setMessageLimit(DMAAP_MR_SOURCE_PLUGIN_MESSAGE_LIMIT);
+        sourcePluginConfig.setTimeoutMS(DMAAP_MR_SOURCE_PLUGIN_TIMEOUT);
+        return sourcePluginConfig;
+    }
+
+    protected static TestDMaaPMRSinkPluginConfig getTestDMaaPMRSinkPluginConfig() {
+        final TestDMaaPMRSinkPluginConfig sinkPluginConfig = new TestDMaaPMRSinkPluginConfig();
+        sinkPluginConfig.setReferenceName(DMAAP_MR_SINK_PLUGIN_REFERENCE_NAME);
+        sinkPluginConfig.setHostName(DMAAP_MR_SINK_PLUGIN_HOST_NAME);
+        sinkPluginConfig.setPortNumber(DMAAP_MR_SINK_PLUGIN_PORT_NUMBER);
+        sinkPluginConfig.setTopicName(DMAAP_MR_SINK_PLUGIN_TOPIC_NAME);
+        sinkPluginConfig.setProtocol(DMAAP_MR_SINK_PLUGIN_PROTOCOL);
+        sinkPluginConfig.setUserName(DMAAP_MR_SINK_PLUGIN_USERNAME);
+        sinkPluginConfig.setUserPassword(DMAAP_MR_SINK_PLUGIN_PASSWORD);
+        sinkPluginConfig.setContentType(DMAAP_MR_SINK_PLUGIN_CONTENT_TYPE);
+        sinkPluginConfig.setMessageColumnName(DMAAP_MR_SINK_MESSAGE_COLUMN_NAME);
+        sinkPluginConfig.setMaxBatchSize(DMAAP_MR_SINK_PLUGIN_MAX_BATCH_SIZE);
+        sinkPluginConfig.setMaxRecoveryQueueSize(DMAAP_MR_SINK_PLUGIN_MAX_RECOVERY_QUEUE_SIZE);
+        return sinkPluginConfig;
+    }
+
+
+    protected static Configuration getTestConfiguration() {
+        final Configuration configuration = new Configuration();
+        final Map<String, String> sinkConfigurationMap = createSinkConfigurationMap();
+        for (Map.Entry<String, String> property : sinkConfigurationMap.entrySet()) {
+            configuration.set(property.getKey(), property.getValue());
+        }
+        return configuration;
+    }
+
+    protected static Map<String, String> createSinkConfigurationMap() {
+
+        Map<String, String> sinkConfig = new LinkedHashMap<>();
+        sinkConfig.put(CDAPPluginConstants.DMaaPMRSinkHadoopConfigFields.HOST_NAME, DMAAP_MR_SINK_PLUGIN_HOST_NAME);
+        sinkConfig.put(CDAPPluginConstants.DMaaPMRSinkHadoopConfigFields.TOPIC_NAME, DMAAP_MR_SINK_PLUGIN_TOPIC_NAME);
+        sinkConfig.put(CDAPPluginConstants.DMaaPMRSinkHadoopConfigFields.PORT_NUMBER,
+                DMAAP_MR_SINK_PLUGIN_PORT_NUMBER.toString());
+        sinkConfig.put(CDAPPluginConstants.DMaaPMRSinkHadoopConfigFields.PROTOCOL, DMAAP_MR_SINK_PLUGIN_PROTOCOL);
+        sinkConfig.put(CDAPPluginConstants.DMaaPMRSinkHadoopConfigFields.USER_NAME, DMAAP_MR_SINK_PLUGIN_USERNAME);
+        sinkConfig.put(CDAPPluginConstants.DMaaPMRSinkHadoopConfigFields.USER_PASS, DMAAP_MR_SINK_PLUGIN_PASSWORD);
+        sinkConfig.put(CDAPPluginConstants.DMaaPMRSinkHadoopConfigFields.CONTENT_TYPE,
+                DMAAP_MR_SINK_PLUGIN_CONTENT_TYPE);
+        sinkConfig.put(CDAPPluginConstants.DMaaPMRSinkHadoopConfigFields.MAX_BATCH_SIZE,
+                DMAAP_MR_SINK_PLUGIN_MAX_BATCH_SIZE.toString());
+        sinkConfig.put(CDAPPluginConstants.DMaaPMRSinkHadoopConfigFields.MAX_RECOVER_QUEUE_SIZE,
+                DMAAP_MR_SINK_PLUGIN_MAX_RECOVERY_QUEUE_SIZE.toString());
+        return sinkConfig;
+    }
+
+    protected static Schema getDMaaPMRSinkTestSchema() {
+        return Schema.recordOf(
+                "DMaaPMRSinkTestSchema",
+                Schema.Field.of("message", Schema.of(Schema.Type.STRING)),
+                Schema.Field.of("field1", Schema.of(Schema.Type.STRING))
+        );
+    }
+
+
+    protected static TestSimpleTCAPluginConfig getTestSimpleTCAPluginConfig() {
+        final String policyJson;
+        try {
+            policyJson = fromStream(TCA_POLICY_JSON_FILE_LOCATION);
+        } catch (IOException e) {
+            throw new RuntimeException("Error while parsing policy", e);
+        }
+        return new TestSimpleTCAPluginConfig(VES_MESSAGE_FIELD_NAME, policyJson, TCA_PLUGIN_ALERT_FIELD_NAME,
+                TCA_PLUGIN_MESSAGE_TYPE_FIELD_NAME, getSimpleTCAPluginInputSchema().toString(), false);
+    }
+
+    protected static Schema getSimpleTCAPluginInputSchema() {
+        return Schema.recordOf(
+                "TestSimpleTCAPluginInputSchema",
+                Schema.Field.of("message", Schema.of(Schema.Type.STRING)),
+                Schema.Field.of("inputField1", Schema.nullableOf(Schema.of(Schema.Type.STRING))),
+                Schema.Field.of("inputField2", Schema.nullableOf(Schema.of(Schema.Type.STRING)))
+        );
+    }
+
+    protected static Schema getJsonFilterPluginInputSchema() {
+        return Schema.recordOf(
+                "TestJsonFilterInputSchema",
+                Schema.Field.of("ts", Schema.of(Schema.Type.LONG)),
+                Schema.Field.of("responseCode", Schema.of(Schema.Type.INT)),
+                Schema.Field.of("responseMessage", Schema.of(Schema.Type.STRING)),
+                Schema.Field.of("message", Schema.of(Schema.Type.STRING))
+        );
+    }
+
+    protected static TestJsonPathFilterPluginConfig getJsonPathFilterPluginConfig() {
+        return new TestJsonPathFilterPluginConfig(JSON_PATH_FILTER_PLUGIN_REFERENCE_NAME,
+                JSON_PATH_FILTER_PLUGIN_INCOMING_JSON_FIELD_NAME,
+                JSON_PATH_FILTER_PLUGIN_OUTPUT_SCHEMA_FILED_NAME,
+                JSON_PATH_FILTER_PLUGIN_JSON_FILTER_MAPPINGS,
+                JSON_PATH_FILTER_PLUGIN_JSON_FILTER_OUTPUT_SCHEMA);
+    }
+
+}