/*
 * ===============================LICENSE_START======================================
 * ================================================================================
 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
 * ================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * ============================LICENSE_END===========================================
 */
21 package org.openecomp.dcae.apod.analytics.cdap.plugins.it;
import co.cask.cdap.api.data.format.StructuredRecord;
import co.cask.cdap.api.data.schema.Schema;
import co.cask.cdap.api.dataset.table.Table;
import co.cask.cdap.api.plugin.PluginClass;
import co.cask.cdap.api.plugin.PluginPropertyField;
import co.cask.cdap.common.utils.Tasks;
import co.cask.cdap.datapipeline.DataPipelineApp;
import co.cask.cdap.datapipeline.SmartWorkflow;
import co.cask.cdap.etl.api.batch.SparkCompute;
import co.cask.cdap.etl.mock.batch.MockSink;
import co.cask.cdap.etl.mock.batch.MockSource;
import co.cask.cdap.etl.mock.test.HydratorTestBase;
import co.cask.cdap.etl.proto.v2.ETLBatchConfig;
import co.cask.cdap.etl.proto.v2.ETLPlugin;
import co.cask.cdap.etl.proto.v2.ETLStage;
import co.cask.cdap.proto.artifact.AppRequest;
import co.cask.cdap.proto.artifact.ArtifactSummary;
import co.cask.cdap.proto.id.ApplicationId;
import co.cask.cdap.proto.id.ArtifactId;
import co.cask.cdap.proto.id.NamespaceId;
import co.cask.cdap.test.ApplicationManager;
import co.cask.cdap.test.DataSetManager;
import co.cask.cdap.test.WorkflowManager;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.openecomp.dcae.apod.analytics.cdap.common.persistance.tca.TCACalculatorMessageType;
import org.openecomp.dcae.apod.analytics.cdap.common.validation.CDAPAppSettingsValidator;
import org.openecomp.dcae.apod.analytics.cdap.plugins.domain.config.tca.SimpleTCAPluginConfig;
import org.openecomp.dcae.apod.analytics.cdap.plugins.sparkcompute.tca.SimpleTCAPlugin;
import org.openecomp.dcae.apod.analytics.common.validation.DCAEValidator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.URI;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
/**
 * Integration Test which uses the CDAP Hydrator Test Base to test the Simple TCA Plugin.
 *
 * @author Rajiv Singla . Creation Date: 2/17/2017.
 */
79 public class SimpleTCAPluginCDAPIT extends HydratorTestBase {
81 private static final Logger LOG = LoggerFactory.getLogger(SimpleTCAPluginCDAPIT.class);
83 private static final String CDAP_PLUGIN_VERSION = "3.0-SNAPSHOT";
84 private static final String CDAP_PLUGIN_ARTIFACT_NAME = "dcae-analytics-cdap-plugins";
86 protected static final ArtifactId DATAPIPELINE_ARTIFACT_ID = NamespaceId.DEFAULT.artifact("data-pipeline",
88 protected static final ArtifactSummary DATAPIPELINE_ARTIFACT = new ArtifactSummary("data-pipeline", "4.0.0");
90 private static Schema sourceSchema = Schema.recordOf("CEFMessageSourceSchema",
91 Schema.Field.of("message", Schema.of(Schema.Type.STRING))
94 final Schema outputSchema = Schema.recordOf(
96 Schema.Field.of("message", Schema.of(Schema.Type.STRING)),
97 Schema.Field.of("alert", Schema.nullableOf(Schema.of(Schema.Type.STRING))),
98 Schema.Field.of("tcaMessageType", Schema.of(Schema.Type.STRING))
102 public static void setupTest() throws Exception {
104 setupBatchArtifacts(DATAPIPELINE_ARTIFACT_ID, DataPipelineApp.class);
107 // Enable the below code if you want to run the test in Intelli IDEA editor
108 // addPluginArtifact(NamespaceId.DEFAULT.artifact("spark-plugins", "1.0.0"), DATAPIPELINE_ARTIFACT_ID,
109 // SimpleTCAPlugin.class, SimpleTCAPluginConfig.class);
111 // Enable the below code if you want to run the test via command line
112 ArtifactId dcaeAnalyticsCdapPluginsArtifact = NamespaceId.DEFAULT.artifact(
113 CDAP_PLUGIN_ARTIFACT_NAME, CDAP_PLUGIN_VERSION);
115 addPluginArtifact(dcaeAnalyticsCdapPluginsArtifact, DATAPIPELINE_ARTIFACT_ID,
116 ImmutableSet.of(getSimpleTCAPluginClass()), SimpleTCAPlugin.class, SimpleTCAPluginConfig.class,
117 CDAPAppSettingsValidator.class, DCAEValidator.class);
120 private static PluginClass getSimpleTCAPluginClass() {
121 final HashMap<String, PluginPropertyField> properties = new HashMap<>();
122 properties.put("vesMessageFieldName", new PluginPropertyField("vesMessageFieldName", "",
123 "string", false, false));
124 properties.put("referenceName", new PluginPropertyField("referenceName", "",
125 "string", false, false));
126 properties.put("policyJson", new PluginPropertyField("policyJson", "", "string", false, false));
127 properties.put("alertFieldName", new PluginPropertyField("alertFieldName", "", "string", false, false));
128 properties.put("messageTypeFieldName", new PluginPropertyField(
129 "messageTypeFieldName", "", "string", false, false));
130 properties.put("enableAlertCEFFormat", new PluginPropertyField(
131 "enableAlertCEFFormat", "", "string", false, false));
132 properties.put("schema", new PluginPropertyField(
133 "schema", "", "string", false, false));
135 return new PluginClass("sparkcompute", "SimpleTCAPlugin", "", SimpleTCAPlugin.class.getName(),
136 "pluginConfig", properties);
141 public static void cleanup() {
145 public void testTransform() throws Exception {
147 LOG.info("Starting Test Transform");
149 final String policyString = getFileContentAsString("/data/json/policy/tca_policy.json");
150 final String cefMessage = getFileContentAsString("/data/json/cef/cef_message.json");
152 final Map<String, String> tcaProperties = new ImmutableMap.Builder<String, String>()
153 .put("vesMessageFieldName", "message")
154 .put("referenceName", "SimpleTcaPlugin")
155 .put("policyJson", policyString)
156 .put("alertFieldName", "alert")
157 .put("messageTypeFieldName", "tcaMessageType")
158 .put("enableAlertCEFFormat", "true")
159 .put("schema", outputSchema.toString())
162 final ETLPlugin mockSourcePlugin = MockSource.getPlugin("messages", sourceSchema);
163 final ETLPlugin tcaPlugin =
164 new ETLPlugin("SimpleTCAPlugin", SparkCompute.PLUGIN_TYPE, tcaProperties, null);
165 final ETLPlugin mockSink = MockSink.getPlugin("tcaOutput");
167 final ETLBatchConfig etlBatchConfig = ETLBatchConfig.builder("* * * * *")
168 .addStage(new ETLStage("source", mockSourcePlugin))
169 .addStage(new ETLStage("simpleTCAPlugin", tcaPlugin))
170 .addStage(new ETLStage("sink", mockSink))
171 .addConnection("source", "simpleTCAPlugin")
172 .addConnection("simpleTCAPlugin", "sink")
175 AppRequest<ETLBatchConfig> appRequest = new AppRequest<>(DATAPIPELINE_ARTIFACT, etlBatchConfig);
176 ApplicationId appId = NamespaceId.DEFAULT.app("TestSimpleTCAPlugin");
177 ApplicationManager appManager = deployApplication(appId.toId(), appRequest);
179 List<StructuredRecord> sourceMessages = new ArrayList<>();
180 StructuredRecord.Builder builder = StructuredRecord.builder(sourceSchema);
181 builder.set("message", cefMessage);
182 sourceMessages.add(builder.build());
184 // write records to source
185 DataSetManager<Table> inputManager = getDataset(NamespaceId.DEFAULT.dataset("messages"));
186 MockSource.writeInput(inputManager, sourceMessages);
188 // manually trigger the pipeline
189 WorkflowManager workflowManager = appManager.getWorkflowManager(SmartWorkflow.NAME);
190 workflowManager.start();
191 workflowManager.waitForFinish(5, TimeUnit.MINUTES);
193 final DataSetManager<Table> outputManager = getDataset("tcaOutput");
196 TCACalculatorMessageType.COMPLIANT.name(),
197 new Callable<String>() {
199 public String call() throws Exception {
200 outputManager.flush();
201 List<String> tcaOutputMessageType = new LinkedList<>();
202 for (StructuredRecord outputRecord : MockSink.readOutput(outputManager)) {
203 tcaOutputMessageType.add(outputRecord.get("tcaMessageType").toString());
204 final List<Schema.Field> fields = outputRecord.getSchema().getFields();
205 LOG.debug("====>> Printing output Structured Record Contents: {}", outputRecord);
206 for (Schema.Field field : fields) {
207 LOG.debug("Field Name: {} - Field Type: {} ---> Field Value: {}",
208 field.getName(), field.getSchema().getType(),
209 outputRecord.get(field.getName()));
213 return tcaOutputMessageType.get(0);
221 private static final String getFileContentAsString(final String fileLocation) throws Exception {
222 final URI tcaPolicyURI =
223 SimpleTCAPluginCDAPIT.class.getResource(fileLocation).toURI();
224 List<String> lines = Files.readAllLines(Paths.get(tcaPolicyURI), Charset.defaultCharset());
225 return Joiner.on("").join(lines);