/*
 * ===============================LICENSE_START======================================
 * ================================================================================
 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
 * ================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *          http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * ============================LICENSE_END===========================================
 */
21 package org.openecomp.dcae.apod.analytics.cdap.plugins.it;
\r
23 import co.cask.cdap.api.data.format.StructuredRecord;
\r
24 import co.cask.cdap.api.data.schema.Schema;
\r
25 import co.cask.cdap.api.dataset.table.Table;
\r
26 import co.cask.cdap.api.plugin.PluginClass;
\r
27 import co.cask.cdap.api.plugin.PluginPropertyField;
\r
28 import co.cask.cdap.common.utils.Tasks;
\r
29 import co.cask.cdap.datapipeline.DataPipelineApp;
\r
30 import co.cask.cdap.datapipeline.SmartWorkflow;
\r
31 import co.cask.cdap.etl.api.batch.SparkCompute;
\r
32 import co.cask.cdap.etl.mock.batch.MockSink;
\r
33 import co.cask.cdap.etl.mock.batch.MockSource;
\r
34 import co.cask.cdap.etl.mock.test.HydratorTestBase;
\r
35 import co.cask.cdap.etl.proto.v2.ETLBatchConfig;
\r
36 import co.cask.cdap.etl.proto.v2.ETLPlugin;
\r
37 import co.cask.cdap.etl.proto.v2.ETLStage;
\r
38 import co.cask.cdap.proto.artifact.AppRequest;
\r
39 import co.cask.cdap.proto.artifact.ArtifactSummary;
\r
40 import co.cask.cdap.proto.id.ApplicationId;
\r
41 import co.cask.cdap.proto.id.ArtifactId;
\r
42 import co.cask.cdap.proto.id.NamespaceId;
\r
43 import co.cask.cdap.test.ApplicationManager;
\r
44 import co.cask.cdap.test.DataSetManager;
\r
45 import co.cask.cdap.test.WorkflowManager;
\r
46 import com.google.common.base.Joiner;
\r
47 import com.google.common.collect.ImmutableMap;
\r
48 import com.google.common.collect.ImmutableSet;
\r
49 import com.google.common.collect.Sets;
\r
50 import org.junit.AfterClass;
\r
51 import org.junit.BeforeClass;
\r
52 import org.junit.Test;
\r
53 import org.openecomp.dcae.apod.analytics.cdap.common.persistance.tca.TCACalculatorMessageType;
\r
54 import org.openecomp.dcae.apod.analytics.cdap.common.validation.CDAPAppSettingsValidator;
\r
55 import org.openecomp.dcae.apod.analytics.cdap.plugins.domain.config.tca.SimpleTCAPluginConfig;
\r
56 import org.openecomp.dcae.apod.analytics.cdap.plugins.sparkcompute.tca.SimpleTCAPlugin;
\r
57 import org.openecomp.dcae.apod.analytics.common.validation.DCAEValidator;
\r
58 import org.slf4j.Logger;
\r
59 import org.slf4j.LoggerFactory;
\r
61 import java.net.URI;
\r
62 import java.nio.charset.Charset;
\r
63 import java.nio.file.Files;
\r
64 import java.nio.file.Paths;
\r
65 import java.util.ArrayList;
\r
66 import java.util.Collections;
\r
67 import java.util.HashMap;
\r
68 import java.util.LinkedList;
\r
69 import java.util.List;
\r
70 import java.util.Map;
\r
71 import java.util.concurrent.Callable;
\r
72 import java.util.concurrent.TimeUnit;
\r
/**
 * Integration Test which uses the CDAP Hydrator Test Base to test the Simple TCA Plugin.
 *
 * @author Rajiv Singla . Creation Date: 2/17/2017.
 */
79 public class SimpleTCAPluginCDAPIT extends HydratorTestBase {
\r
81 private static final Logger LOG = LoggerFactory.getLogger(SimpleTCAPluginCDAPIT.class);
\r
83 private static final String CDAP_PLUGIN_VERSION = "3.0-SNAPSHOT";
\r
84 private static final String CDAP_PLUGIN_ARTIFACT_NAME = "dcae-analytics-cdap-plugins";
\r
86 protected static final ArtifactId DATAPIPELINE_ARTIFACT_ID = NamespaceId.DEFAULT.artifact("data-pipeline",
\r
88 protected static final ArtifactSummary DATAPIPELINE_ARTIFACT = new ArtifactSummary("data-pipeline", "4.0.0");
\r
90 private static Schema sourceSchema = Schema.recordOf("CEFMessageSourceSchema",
\r
91 Schema.Field.of("message", Schema.of(Schema.Type.STRING))
\r
94 final Schema outputSchema = Schema.recordOf(
\r
96 Schema.Field.of("message", Schema.of(Schema.Type.STRING)),
\r
97 Schema.Field.of("alert", Schema.nullableOf(Schema.of(Schema.Type.STRING))),
\r
98 Schema.Field.of("tcaMessageType", Schema.of(Schema.Type.STRING))
\r
102 public static void setupTest() throws Exception {
\r
104 setupBatchArtifacts(DATAPIPELINE_ARTIFACT_ID, DataPipelineApp.class);
\r
107 // Enable the below code if you want to run the test in Intelli IDEA editor
\r
108 // addPluginArtifact(NamespaceId.DEFAULT.artifact("spark-plugins", "1.0.0"), DATAPIPELINE_ARTIFACT_ID,
\r
109 // SimpleTCAPlugin.class, SimpleTCAPluginConfig.class);
\r
111 // Enable the below code if you want to run the test via command line
\r
112 ArtifactId dcaeAnalyticsCdapPluginsArtifact = NamespaceId.DEFAULT.artifact(
\r
113 CDAP_PLUGIN_ARTIFACT_NAME, CDAP_PLUGIN_VERSION);
\r
115 addPluginArtifact(dcaeAnalyticsCdapPluginsArtifact, DATAPIPELINE_ARTIFACT_ID,
\r
116 ImmutableSet.of(getSimpleTCAPluginClass()), SimpleTCAPlugin.class, SimpleTCAPluginConfig.class,
\r
117 CDAPAppSettingsValidator.class, DCAEValidator.class);
\r
120 private static PluginClass getSimpleTCAPluginClass() {
\r
121 final HashMap<String, PluginPropertyField> properties = new HashMap<>();
\r
122 properties.put("vesMessageFieldName", new PluginPropertyField("vesMessageFieldName", "",
\r
123 "string", false, false));
\r
124 properties.put("referenceName", new PluginPropertyField("referenceName", "",
\r
125 "string", false, false));
\r
126 properties.put("policyJson", new PluginPropertyField("policyJson", "", "string", false, false));
\r
127 properties.put("alertFieldName", new PluginPropertyField("alertFieldName", "", "string", false, false));
\r
128 properties.put("messageTypeFieldName", new PluginPropertyField(
\r
129 "messageTypeFieldName", "", "string", false, false));
\r
130 properties.put("enableAlertCEFFormat", new PluginPropertyField(
\r
131 "enableAlertCEFFormat", "", "string", false, false));
\r
132 properties.put("schema", new PluginPropertyField(
\r
133 "schema", "", "string", false, false));
\r
135 return new PluginClass("sparkcompute", "SimpleTCAPlugin", "", SimpleTCAPlugin.class.getName(),
\r
136 "pluginConfig", properties);
\r
141 public static void cleanup() {
\r
145 @SuppressWarnings("deprecation")
\r
146 public void testTransform() throws Exception {
\r
148 LOG.info("Starting Test Transform");
\r
150 final String policyString = getFileContentAsString("/data/json/policy/tca_policy.json");
\r
151 final String cefMessage = getFileContentAsString("/data/json/cef/cef_message.json");
\r
153 final Map<String, String> tcaProperties = new ImmutableMap.Builder<String, String>()
\r
154 .put("vesMessageFieldName", "message")
\r
155 .put("referenceName", "SimpleTcaPlugin")
\r
156 .put("policyJson", policyString)
\r
157 .put("alertFieldName", "alert")
\r
158 .put("messageTypeFieldName", "tcaMessageType")
\r
159 .put("enableAlertCEFFormat", "true")
\r
160 .put("schema", outputSchema.toString())
\r
163 final ETLPlugin mockSourcePlugin = MockSource.getPlugin("messages", sourceSchema);
\r
164 final ETLPlugin tcaPlugin =
\r
165 new ETLPlugin("SimpleTCAPlugin", SparkCompute.PLUGIN_TYPE, tcaProperties, null);
\r
166 final ETLPlugin mockSink = MockSink.getPlugin("tcaOutput");
\r
168 final ETLBatchConfig etlBatchConfig = ETLBatchConfig.builder("* * * * *")
\r
169 .addStage(new ETLStage("source", mockSourcePlugin))
\r
170 .addStage(new ETLStage("simpleTCAPlugin", tcaPlugin))
\r
171 .addStage(new ETLStage("sink", mockSink))
\r
172 .addConnection("source", "simpleTCAPlugin")
\r
173 .addConnection("simpleTCAPlugin", "sink")
\r
176 AppRequest<ETLBatchConfig> appRequest = new AppRequest<>(DATAPIPELINE_ARTIFACT, etlBatchConfig);
\r
177 ApplicationId appId = NamespaceId.DEFAULT.app("TestSimpleTCAPlugin");
\r
178 ApplicationManager appManager = deployApplication(appId.toId(), appRequest);
\r
180 List<StructuredRecord> sourceMessages = new ArrayList<>();
\r
181 StructuredRecord.Builder builder = StructuredRecord.builder(sourceSchema);
\r
182 builder.set("message", cefMessage);
\r
183 sourceMessages.add(builder.build());
\r
185 // write records to source
\r
186 DataSetManager<Table> inputManager = getDataset(NamespaceId.DEFAULT.dataset("messages"));
\r
187 MockSource.writeInput(inputManager, sourceMessages);
\r
189 // manually trigger the pipeline
\r
190 WorkflowManager workflowManager = appManager.getWorkflowManager(SmartWorkflow.NAME);
\r
191 workflowManager.start();
\r
192 workflowManager.waitForFinish(5, TimeUnit.MINUTES);
\r
194 final DataSetManager<Table> outputManager = getDataset("tcaOutput");
\r
197 TCACalculatorMessageType.COMPLIANT.name(),
\r
198 new Callable<String>() {
\r
200 public String call() throws Exception {
\r
201 outputManager.flush();
\r
202 List<String> tcaOutputMessageType = new LinkedList<>();
\r
203 for (StructuredRecord outputRecord : MockSink.readOutput(outputManager)) {
\r
204 tcaOutputMessageType.add(outputRecord.get("tcaMessageType").toString());
\r
205 final List<Schema.Field> fields = outputRecord.getSchema().getFields();
\r
206 LOG.debug("====>> Printing output Structured Record Contents: {}", outputRecord);
\r
207 for (Schema.Field field : fields) {
\r
208 LOG.debug("Field Name: {} - Field Type: {} ---> Field Value: {}",
\r
209 field.getName(), field.getSchema().getType(),
\r
210 outputRecord.get(field.getName()));
\r
214 return tcaOutputMessageType.get(0);
\r
222 private static String getFileContentAsString(final String fileLocation) throws Exception {
\r
223 final URI tcaPolicyURI =
\r
224 SimpleTCAPluginCDAPIT.class.getResource(fileLocation).toURI();
\r
225 List<String> lines = Files.readAllLines(Paths.get(tcaPolicyURI), Charset.defaultCharset());
\r
226 return Joiner.on("").join(lines);
\r