TCA: Support for VES/A&AI enrichment
[dcaegen2/analytics/tca.git] / dcae-analytics-cdap-plugins / src / test / java / org / openecomp / dcae / apod / analytics / cdap / plugins / it / SimpleTCAPluginCDAPIT.java
index 73c2533..f43a3df 100644 (file)
-/*
- * ===============================LICENSE_START======================================
- *  dcae-analytics
- * ================================================================================
- *    Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- *  Licensed under the Apache License, Version 2.0 (the "License");
- *  you may not use this file except in compliance with the License.
- *   You may obtain a copy of the License at
- *
- *          http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *  ============================LICENSE_END===========================================
- */
-
-package org.openecomp.dcae.apod.analytics.cdap.plugins.it;
-
-import co.cask.cdap.api.data.format.StructuredRecord;
-import co.cask.cdap.api.data.schema.Schema;
-import co.cask.cdap.api.dataset.table.Table;
-import co.cask.cdap.api.plugin.PluginClass;
-import co.cask.cdap.api.plugin.PluginPropertyField;
-import co.cask.cdap.common.utils.Tasks;
-import co.cask.cdap.datapipeline.DataPipelineApp;
-import co.cask.cdap.datapipeline.SmartWorkflow;
-import co.cask.cdap.etl.api.batch.SparkCompute;
-import co.cask.cdap.etl.mock.batch.MockSink;
-import co.cask.cdap.etl.mock.batch.MockSource;
-import co.cask.cdap.etl.mock.test.HydratorTestBase;
-import co.cask.cdap.etl.proto.v2.ETLBatchConfig;
-import co.cask.cdap.etl.proto.v2.ETLPlugin;
-import co.cask.cdap.etl.proto.v2.ETLStage;
-import co.cask.cdap.proto.artifact.AppRequest;
-import co.cask.cdap.proto.artifact.ArtifactSummary;
-import co.cask.cdap.proto.id.ApplicationId;
-import co.cask.cdap.proto.id.ArtifactId;
-import co.cask.cdap.proto.id.NamespaceId;
-import co.cask.cdap.test.ApplicationManager;
-import co.cask.cdap.test.DataSetManager;
-import co.cask.cdap.test.WorkflowManager;
-import com.google.common.base.Joiner;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Sets;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.openecomp.dcae.apod.analytics.cdap.common.persistance.tca.TCACalculatorMessageType;
-import org.openecomp.dcae.apod.analytics.cdap.common.validation.CDAPAppSettingsValidator;
-import org.openecomp.dcae.apod.analytics.cdap.plugins.domain.config.tca.SimpleTCAPluginConfig;
-import org.openecomp.dcae.apod.analytics.cdap.plugins.sparkcompute.tca.SimpleTCAPlugin;
-import org.openecomp.dcae.apod.analytics.common.validation.DCAEValidator;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.net.URI;
-import java.nio.charset.Charset;
-import java.nio.file.Files;
-import java.nio.file.Paths;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.Callable;
-import java.util.concurrent.TimeUnit;
-
-/**
- * Integration Test which used CDAP Hydrator Test Base to Test Simple TCA Plugin
- *
- * @author Rajiv Singla . Creation Date: 2/17/2017.
- */
-public class SimpleTCAPluginCDAPIT extends HydratorTestBase {
-
-    private static final Logger LOG = LoggerFactory.getLogger(SimpleTCAPluginCDAPIT.class);
-
-    private static final String CDAP_PLUGIN_VERSION = "3.0-SNAPSHOT";
-    private static final String CDAP_PLUGIN_ARTIFACT_NAME = "dcae-analytics-cdap-plugins";
-
-    protected static final ArtifactId DATAPIPELINE_ARTIFACT_ID = NamespaceId.DEFAULT.artifact("data-pipeline",
-            "4.0.0");
-    protected static final ArtifactSummary DATAPIPELINE_ARTIFACT = new ArtifactSummary("data-pipeline", "4.0.0");
-
-    private static Schema sourceSchema = Schema.recordOf("CEFMessageSourceSchema",
-            Schema.Field.of("message", Schema.of(Schema.Type.STRING))
-    );
-
-    final Schema outputSchema = Schema.recordOf(
-            "outputSchema",
-            Schema.Field.of("message", Schema.of(Schema.Type.STRING)),
-            Schema.Field.of("alert", Schema.nullableOf(Schema.of(Schema.Type.STRING))),
-            Schema.Field.of("tcaMessageType", Schema.of(Schema.Type.STRING))
-    );
-
-    @BeforeClass
-    public static void setupTest() throws Exception {
-
-        setupBatchArtifacts(DATAPIPELINE_ARTIFACT_ID, DataPipelineApp.class);
-
-
-        // Enable the below code if you want to run the test in Intelli IDEA editor
-        // addPluginArtifact(NamespaceId.DEFAULT.artifact("spark-plugins", "1.0.0"), DATAPIPELINE_ARTIFACT_ID,
-        //        SimpleTCAPlugin.class, SimpleTCAPluginConfig.class);
-
-        // Enable the below code if you want to run the test via command line
-        ArtifactId dcaeAnalyticsCdapPluginsArtifact = NamespaceId.DEFAULT.artifact(
-                CDAP_PLUGIN_ARTIFACT_NAME, CDAP_PLUGIN_VERSION);
-
-        addPluginArtifact(dcaeAnalyticsCdapPluginsArtifact, DATAPIPELINE_ARTIFACT_ID,
-                ImmutableSet.of(getSimpleTCAPluginClass()), SimpleTCAPlugin.class, SimpleTCAPluginConfig.class,
-                CDAPAppSettingsValidator.class, DCAEValidator.class);
-    }
-
-    private static PluginClass getSimpleTCAPluginClass() {
-        final HashMap<String, PluginPropertyField> properties = new HashMap<>();
-        properties.put("vesMessageFieldName", new PluginPropertyField("vesMessageFieldName", "",
-                "string", false, false));
-        properties.put("referenceName", new PluginPropertyField("referenceName", "",
-                "string", false, false));
-        properties.put("policyJson", new PluginPropertyField("policyJson", "", "string", false, false));
-        properties.put("alertFieldName", new PluginPropertyField("alertFieldName", "", "string", false, false));
-        properties.put("messageTypeFieldName", new PluginPropertyField(
-                "messageTypeFieldName", "", "string", false, false));
-        properties.put("enableAlertCEFFormat", new PluginPropertyField(
-                "enableAlertCEFFormat", "", "string", false, false));
-        properties.put("schema", new PluginPropertyField(
-                "schema", "", "string", false, false));
-
-        return new PluginClass("sparkcompute", "SimpleTCAPlugin", "", SimpleTCAPlugin.class.getName(),
-                "pluginConfig", properties);
-    }
-
-
-    @AfterClass
-    public static void cleanup() {
-    }
-
-    @Test
-    public void testTransform() throws Exception {
-
-        LOG.info("Starting Test Transform");
-
-        final String policyString = getFileContentAsString("/data/json/policy/tca_policy.json");
-        final String cefMessage = getFileContentAsString("/data/json/cef/cef_message.json");
-
-        final Map<String, String> tcaProperties = new ImmutableMap.Builder<String, String>()
-                .put("vesMessageFieldName", "message")
-                .put("referenceName", "SimpleTcaPlugin")
-                .put("policyJson", policyString)
-                .put("alertFieldName", "alert")
-                .put("messageTypeFieldName", "tcaMessageType")
-                .put("enableAlertCEFFormat", "true")
-                .put("schema", outputSchema.toString())
-                .build();
-
-        final ETLPlugin mockSourcePlugin = MockSource.getPlugin("messages", sourceSchema);
-        final ETLPlugin tcaPlugin =
-                new ETLPlugin("SimpleTCAPlugin", SparkCompute.PLUGIN_TYPE, tcaProperties, null);
-        final ETLPlugin mockSink = MockSink.getPlugin("tcaOutput");
-
-        final ETLBatchConfig etlBatchConfig = ETLBatchConfig.builder("* * * * *")
-                .addStage(new ETLStage("source", mockSourcePlugin))
-                .addStage(new ETLStage("simpleTCAPlugin", tcaPlugin))
-                .addStage(new ETLStage("sink", mockSink))
-                .addConnection("source", "simpleTCAPlugin")
-                .addConnection("simpleTCAPlugin", "sink")
-                .build();
-
-        AppRequest<ETLBatchConfig> appRequest = new AppRequest<>(DATAPIPELINE_ARTIFACT, etlBatchConfig);
-        ApplicationId appId = NamespaceId.DEFAULT.app("TestSimpleTCAPlugin");
-        ApplicationManager appManager = deployApplication(appId.toId(), appRequest);
-
-        List<StructuredRecord> sourceMessages = new ArrayList<>();
-        StructuredRecord.Builder builder = StructuredRecord.builder(sourceSchema);
-        builder.set("message", cefMessage);
-        sourceMessages.add(builder.build());
-
-        // write records to source
-        DataSetManager<Table> inputManager = getDataset(NamespaceId.DEFAULT.dataset("messages"));
-        MockSource.writeInput(inputManager, sourceMessages);
-
-        // manually trigger the pipeline
-        WorkflowManager workflowManager = appManager.getWorkflowManager(SmartWorkflow.NAME);
-        workflowManager.start();
-        workflowManager.waitForFinish(5, TimeUnit.MINUTES);
-
-        final DataSetManager<Table> outputManager = getDataset("tcaOutput");
-
-        Tasks.waitFor(
-                TCACalculatorMessageType.COMPLIANT.name(),
-                new Callable<String>() {
-                    @Override
-                    public String call() throws Exception {
-                        outputManager.flush();
-                        List<String> tcaOutputMessageType = new LinkedList<>();
-                        for (StructuredRecord outputRecord : MockSink.readOutput(outputManager)) {
-                            tcaOutputMessageType.add(outputRecord.get("tcaMessageType").toString());
-                            final List<Schema.Field> fields = outputRecord.getSchema().getFields();
-                            LOG.debug("====>> Printing output Structured Record Contents: {}", outputRecord);
-                            for (Schema.Field field : fields) {
-                                LOG.debug("Field Name: {} - Field Type: {}  ---> Field Value: {}",
-                                        field.getName(), field.getSchema().getType(),
-                                        outputRecord.get(field.getName()));
-                            }
-
-                        }
-                        return tcaOutputMessageType.get(0);
-                    }
-                },
-                4,
-                TimeUnit.MINUTES);
-
-    }
-
-    private static final String getFileContentAsString(final String fileLocation) throws Exception {
-        final URI tcaPolicyURI =
-                SimpleTCAPluginCDAPIT.class.getResource(fileLocation).toURI();
-        List<String> lines = Files.readAllLines(Paths.get(tcaPolicyURI), Charset.defaultCharset());
-        return Joiner.on("").join(lines);
-    }
-
-}
+/*\r
+ * ===============================LICENSE_START======================================\r
+ *  dcae-analytics\r
+ * ================================================================================\r
+ *    Copyright © 2017 AT&T Intellectual Property. All rights reserved.\r
+ * ================================================================================\r
+ *  Licensed under the Apache License, Version 2.0 (the "License");\r
+ *  you may not use this file except in compliance with the License.\r
+ *   You may obtain a copy of the License at\r
+ *\r
+ *          http://www.apache.org/licenses/LICENSE-2.0\r
+ *\r
+ *  Unless required by applicable law or agreed to in writing, software\r
+ *  distributed under the License is distributed on an "AS IS" BASIS,\r
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+ *  See the License for the specific language governing permissions and\r
+ *  limitations under the License.\r
+ *  ============================LICENSE_END===========================================\r
+ */\r
+\r
+package org.openecomp.dcae.apod.analytics.cdap.plugins.it;\r
+\r
+import co.cask.cdap.api.data.format.StructuredRecord;\r
+import co.cask.cdap.api.data.schema.Schema;\r
+import co.cask.cdap.api.dataset.table.Table;\r
+import co.cask.cdap.api.plugin.PluginClass;\r
+import co.cask.cdap.api.plugin.PluginPropertyField;\r
+import co.cask.cdap.common.utils.Tasks;\r
+import co.cask.cdap.datapipeline.DataPipelineApp;\r
+import co.cask.cdap.datapipeline.SmartWorkflow;\r
+import co.cask.cdap.etl.api.batch.SparkCompute;\r
+import co.cask.cdap.etl.mock.batch.MockSink;\r
+import co.cask.cdap.etl.mock.batch.MockSource;\r
+import co.cask.cdap.etl.mock.test.HydratorTestBase;\r
+import co.cask.cdap.etl.proto.v2.ETLBatchConfig;\r
+import co.cask.cdap.etl.proto.v2.ETLPlugin;\r
+import co.cask.cdap.etl.proto.v2.ETLStage;\r
+import co.cask.cdap.proto.artifact.AppRequest;\r
+import co.cask.cdap.proto.artifact.ArtifactSummary;\r
+import co.cask.cdap.proto.id.ApplicationId;\r
+import co.cask.cdap.proto.id.ArtifactId;\r
+import co.cask.cdap.proto.id.NamespaceId;\r
+import co.cask.cdap.test.ApplicationManager;\r
+import co.cask.cdap.test.DataSetManager;\r
+import co.cask.cdap.test.WorkflowManager;\r
+import com.google.common.base.Joiner;\r
+import com.google.common.collect.ImmutableMap;\r
+import com.google.common.collect.ImmutableSet;\r
+import com.google.common.collect.Sets;\r
+import org.junit.AfterClass;\r
+import org.junit.BeforeClass;\r
+import org.junit.Test;\r
+import org.openecomp.dcae.apod.analytics.cdap.common.persistance.tca.TCACalculatorMessageType;\r
+import org.openecomp.dcae.apod.analytics.cdap.common.validation.CDAPAppSettingsValidator;\r
+import org.openecomp.dcae.apod.analytics.cdap.plugins.domain.config.tca.SimpleTCAPluginConfig;\r
+import org.openecomp.dcae.apod.analytics.cdap.plugins.sparkcompute.tca.SimpleTCAPlugin;\r
+import org.openecomp.dcae.apod.analytics.common.validation.DCAEValidator;\r
+import org.slf4j.Logger;\r
+import org.slf4j.LoggerFactory;\r
+\r
+import java.net.URI;\r
+import java.nio.charset.Charset;\r
+import java.nio.file.Files;\r
+import java.nio.file.Paths;\r
+import java.util.ArrayList;\r
+import java.util.Collections;\r
+import java.util.HashMap;\r
+import java.util.LinkedList;\r
+import java.util.List;\r
+import java.util.Map;\r
+import java.util.concurrent.Callable;\r
+import java.util.concurrent.TimeUnit;\r
+\r
+/**\r
+ * Integration Test which used CDAP Hydrator Test Base to Test Simple TCA Plugin\r
+ *\r
+ * @author Rajiv Singla . Creation Date: 2/17/2017.\r
+ */\r
+public class SimpleTCAPluginCDAPIT extends HydratorTestBase {\r
+\r
+    private static final Logger LOG = LoggerFactory.getLogger(SimpleTCAPluginCDAPIT.class);\r
+\r
+    private static final String CDAP_PLUGIN_VERSION = "3.0-SNAPSHOT";\r
+    private static final String CDAP_PLUGIN_ARTIFACT_NAME = "dcae-analytics-cdap-plugins";\r
+\r
+    protected static final ArtifactId DATAPIPELINE_ARTIFACT_ID = NamespaceId.DEFAULT.artifact("data-pipeline",\r
+            "4.0.0");\r
+    protected static final ArtifactSummary DATAPIPELINE_ARTIFACT = new ArtifactSummary("data-pipeline", "4.0.0");\r
+\r
+    private static Schema sourceSchema = Schema.recordOf("CEFMessageSourceSchema",\r
+            Schema.Field.of("message", Schema.of(Schema.Type.STRING))\r
+    );\r
+\r
+    final Schema outputSchema = Schema.recordOf(\r
+            "outputSchema",\r
+            Schema.Field.of("message", Schema.of(Schema.Type.STRING)),\r
+            Schema.Field.of("alert", Schema.nullableOf(Schema.of(Schema.Type.STRING))),\r
+            Schema.Field.of("tcaMessageType", Schema.of(Schema.Type.STRING))\r
+    );\r
+\r
+    @BeforeClass\r
+    public static void setupTest() throws Exception {\r
+\r
+        setupBatchArtifacts(DATAPIPELINE_ARTIFACT_ID, DataPipelineApp.class);\r
+\r
+\r
+        // Enable the below code if you want to run the test in Intelli IDEA editor\r
+        // addPluginArtifact(NamespaceId.DEFAULT.artifact("spark-plugins", "1.0.0"), DATAPIPELINE_ARTIFACT_ID,\r
+        //        SimpleTCAPlugin.class, SimpleTCAPluginConfig.class);\r
+\r
+        // Enable the below code if you want to run the test via command line\r
+        ArtifactId dcaeAnalyticsCdapPluginsArtifact = NamespaceId.DEFAULT.artifact(\r
+                CDAP_PLUGIN_ARTIFACT_NAME, CDAP_PLUGIN_VERSION);\r
+\r
+        addPluginArtifact(dcaeAnalyticsCdapPluginsArtifact, DATAPIPELINE_ARTIFACT_ID,\r
+                ImmutableSet.of(getSimpleTCAPluginClass()), SimpleTCAPlugin.class, SimpleTCAPluginConfig.class,\r
+                CDAPAppSettingsValidator.class, DCAEValidator.class);\r
+    }\r
+\r
+    private static PluginClass getSimpleTCAPluginClass() {\r
+        final HashMap<String, PluginPropertyField> properties = new HashMap<>();\r
+        properties.put("vesMessageFieldName", new PluginPropertyField("vesMessageFieldName", "",\r
+                "string", false, false));\r
+        properties.put("referenceName", new PluginPropertyField("referenceName", "",\r
+                "string", false, false));\r
+        properties.put("policyJson", new PluginPropertyField("policyJson", "", "string", false, false));\r
+        properties.put("alertFieldName", new PluginPropertyField("alertFieldName", "", "string", false, false));\r
+        properties.put("messageTypeFieldName", new PluginPropertyField(\r
+                "messageTypeFieldName", "", "string", false, false));\r
+        properties.put("enableAlertCEFFormat", new PluginPropertyField(\r
+                "enableAlertCEFFormat", "", "string", false, false));\r
+        properties.put("schema", new PluginPropertyField(\r
+                "schema", "", "string", false, false));\r
+\r
+        return new PluginClass("sparkcompute", "SimpleTCAPlugin", "", SimpleTCAPlugin.class.getName(),\r
+                "pluginConfig", properties);\r
+    }\r
+\r
+\r
+    @AfterClass\r
+    public static void cleanup() {\r
+    }\r
+\r
+    @Test\r
+    @SuppressWarnings("deprecation")\r
+    public void testTransform() throws Exception {\r
+\r
+        LOG.info("Starting Test Transform");\r
+\r
+        final String policyString = getFileContentAsString("/data/json/policy/tca_policy.json");\r
+        final String cefMessage = getFileContentAsString("/data/json/cef/cef_message.json");\r
+\r
+        final Map<String, String> tcaProperties = new ImmutableMap.Builder<String, String>()\r
+                .put("vesMessageFieldName", "message")\r
+                .put("referenceName", "SimpleTcaPlugin")\r
+                .put("policyJson", policyString)\r
+                .put("alertFieldName", "alert")\r
+                .put("messageTypeFieldName", "tcaMessageType")\r
+                .put("enableAlertCEFFormat", "true")\r
+                .put("schema", outputSchema.toString())\r
+                .build();\r
+\r
+        final ETLPlugin mockSourcePlugin = MockSource.getPlugin("messages", sourceSchema);\r
+        final ETLPlugin tcaPlugin =\r
+                new ETLPlugin("SimpleTCAPlugin", SparkCompute.PLUGIN_TYPE, tcaProperties, null);\r
+        final ETLPlugin mockSink = MockSink.getPlugin("tcaOutput");\r
+\r
+        final ETLBatchConfig etlBatchConfig = ETLBatchConfig.builder("* * * * *")\r
+                .addStage(new ETLStage("source", mockSourcePlugin))\r
+                .addStage(new ETLStage("simpleTCAPlugin", tcaPlugin))\r
+                .addStage(new ETLStage("sink", mockSink))\r
+                .addConnection("source", "simpleTCAPlugin")\r
+                .addConnection("simpleTCAPlugin", "sink")\r
+                .build();\r
+\r
+        AppRequest<ETLBatchConfig> appRequest = new AppRequest<>(DATAPIPELINE_ARTIFACT, etlBatchConfig);\r
+        ApplicationId appId = NamespaceId.DEFAULT.app("TestSimpleTCAPlugin");\r
+        ApplicationManager appManager = deployApplication(appId.toId(), appRequest);\r
+\r
+        List<StructuredRecord> sourceMessages = new ArrayList<>();\r
+        StructuredRecord.Builder builder = StructuredRecord.builder(sourceSchema);\r
+        builder.set("message", cefMessage);\r
+        sourceMessages.add(builder.build());\r
+\r
+        // write records to source\r
+        DataSetManager<Table> inputManager = getDataset(NamespaceId.DEFAULT.dataset("messages"));\r
+        MockSource.writeInput(inputManager, sourceMessages);\r
+\r
+        // manually trigger the pipeline\r
+        WorkflowManager workflowManager = appManager.getWorkflowManager(SmartWorkflow.NAME);\r
+        workflowManager.start();\r
+        workflowManager.waitForFinish(5, TimeUnit.MINUTES);\r
+\r
+        final DataSetManager<Table> outputManager = getDataset("tcaOutput");\r
+\r
+        Tasks.waitFor(\r
+                TCACalculatorMessageType.COMPLIANT.name(),\r
+                new Callable<String>() {\r
+                    @Override\r
+                    public String call() throws Exception {\r
+                        outputManager.flush();\r
+                        List<String> tcaOutputMessageType = new LinkedList<>();\r
+                        for (StructuredRecord outputRecord : MockSink.readOutput(outputManager)) {\r
+                            tcaOutputMessageType.add(outputRecord.get("tcaMessageType").toString());\r
+                            final List<Schema.Field> fields = outputRecord.getSchema().getFields();\r
+                            LOG.debug("====>> Printing output Structured Record Contents: {}", outputRecord);\r
+                            for (Schema.Field field : fields) {\r
+                                LOG.debug("Field Name: {} - Field Type: {}  ---> Field Value: {}",\r
+                                        field.getName(), field.getSchema().getType(),\r
+                                        outputRecord.get(field.getName()));\r
+                            }\r
+\r
+                        }\r
+                        return tcaOutputMessageType.get(0);\r
+                    }\r
+                },\r
+                4,\r
+                TimeUnit.MINUTES);\r
+\r
+    }\r
+\r
+    private static String getFileContentAsString(final String fileLocation) throws Exception {\r
+        final URI tcaPolicyURI =\r
+                SimpleTCAPluginCDAPIT.class.getResource(fileLocation).toURI();\r
+        List<String> lines = Files.readAllLines(Paths.get(tcaPolicyURI), Charset.defaultCharset());\r
+        return Joiner.on("").join(lines);\r
+    }\r
+\r
+}\r