Initial TCA commit into DCAEGEN2
[dcaegen2/analytics/tca.git] / dcae-analytics-cdap-plugins / src / test / java / org / openecomp / dcae / apod / analytics / cdap / plugins / it / SimpleTCAPluginCDAPIT.java
1 /*
2  * ===============================LICENSE_START======================================
3  *  dcae-analytics
4  * ================================================================================
5  *    Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *   You may obtain a copy of the License at
10  *
11  *          http://www.apache.org/licenses/LICENSE-2.0
12  *
13  *  Unless required by applicable law or agreed to in writing, software
14  *  distributed under the License is distributed on an "AS IS" BASIS,
15  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  *  See the License for the specific language governing permissions and
17  *  limitations under the License.
18  *  ============================LICENSE_END===========================================
19  */
20
21 package org.openecomp.dcae.apod.analytics.cdap.plugins.it;
22
23 import co.cask.cdap.api.data.format.StructuredRecord;
24 import co.cask.cdap.api.data.schema.Schema;
25 import co.cask.cdap.api.dataset.table.Table;
26 import co.cask.cdap.api.plugin.PluginClass;
27 import co.cask.cdap.api.plugin.PluginPropertyField;
28 import co.cask.cdap.common.utils.Tasks;
29 import co.cask.cdap.datapipeline.DataPipelineApp;
30 import co.cask.cdap.datapipeline.SmartWorkflow;
31 import co.cask.cdap.etl.api.batch.SparkCompute;
32 import co.cask.cdap.etl.mock.batch.MockSink;
33 import co.cask.cdap.etl.mock.batch.MockSource;
34 import co.cask.cdap.etl.mock.test.HydratorTestBase;
35 import co.cask.cdap.etl.proto.v2.ETLBatchConfig;
36 import co.cask.cdap.etl.proto.v2.ETLPlugin;
37 import co.cask.cdap.etl.proto.v2.ETLStage;
38 import co.cask.cdap.proto.artifact.AppRequest;
39 import co.cask.cdap.proto.artifact.ArtifactSummary;
40 import co.cask.cdap.proto.id.ApplicationId;
41 import co.cask.cdap.proto.id.ArtifactId;
42 import co.cask.cdap.proto.id.NamespaceId;
43 import co.cask.cdap.test.ApplicationManager;
44 import co.cask.cdap.test.DataSetManager;
45 import co.cask.cdap.test.WorkflowManager;
46 import com.google.common.base.Joiner;
47 import com.google.common.collect.ImmutableMap;
48 import com.google.common.collect.ImmutableSet;
49 import com.google.common.collect.Sets;
50 import org.junit.AfterClass;
51 import org.junit.BeforeClass;
52 import org.junit.Test;
53 import org.openecomp.dcae.apod.analytics.cdap.common.persistance.tca.TCACalculatorMessageType;
54 import org.openecomp.dcae.apod.analytics.cdap.common.validation.CDAPAppSettingsValidator;
55 import org.openecomp.dcae.apod.analytics.cdap.plugins.domain.config.tca.SimpleTCAPluginConfig;
56 import org.openecomp.dcae.apod.analytics.cdap.plugins.sparkcompute.tca.SimpleTCAPlugin;
57 import org.openecomp.dcae.apod.analytics.common.validation.DCAEValidator;
58 import org.slf4j.Logger;
59 import org.slf4j.LoggerFactory;
60
61 import java.net.URI;
62 import java.nio.charset.Charset;
63 import java.nio.file.Files;
64 import java.nio.file.Paths;
65 import java.util.ArrayList;
66 import java.util.Collections;
67 import java.util.HashMap;
68 import java.util.LinkedList;
69 import java.util.List;
70 import java.util.Map;
71 import java.util.concurrent.Callable;
72 import java.util.concurrent.TimeUnit;
73
74 /**
75  * Integration Test which used CDAP Hydrator Test Base to Test Simple TCA Plugin
76  *
77  * @author Rajiv Singla . Creation Date: 2/17/2017.
78  */
79 public class SimpleTCAPluginCDAPIT extends HydratorTestBase {
80
81     private static final Logger LOG = LoggerFactory.getLogger(SimpleTCAPluginCDAPIT.class);
82
83     private static final String CDAP_PLUGIN_VERSION = "3.0-SNAPSHOT";
84     private static final String CDAP_PLUGIN_ARTIFACT_NAME = "dcae-analytics-cdap-plugins";
85
86     protected static final ArtifactId DATAPIPELINE_ARTIFACT_ID = NamespaceId.DEFAULT.artifact("data-pipeline",
87             "4.0.0");
88     protected static final ArtifactSummary DATAPIPELINE_ARTIFACT = new ArtifactSummary("data-pipeline", "4.0.0");
89
90     private static Schema sourceSchema = Schema.recordOf("CEFMessageSourceSchema",
91             Schema.Field.of("message", Schema.of(Schema.Type.STRING))
92     );
93
94     final Schema outputSchema = Schema.recordOf(
95             "outputSchema",
96             Schema.Field.of("message", Schema.of(Schema.Type.STRING)),
97             Schema.Field.of("alert", Schema.nullableOf(Schema.of(Schema.Type.STRING))),
98             Schema.Field.of("tcaMessageType", Schema.of(Schema.Type.STRING))
99     );
100
101     @BeforeClass
102     public static void setupTest() throws Exception {
103
104         setupBatchArtifacts(DATAPIPELINE_ARTIFACT_ID, DataPipelineApp.class);
105
106
107         // Enable the below code if you want to run the test in Intelli IDEA editor
108         // addPluginArtifact(NamespaceId.DEFAULT.artifact("spark-plugins", "1.0.0"), DATAPIPELINE_ARTIFACT_ID,
109         //        SimpleTCAPlugin.class, SimpleTCAPluginConfig.class);
110
111         // Enable the below code if you want to run the test via command line
112         ArtifactId dcaeAnalyticsCdapPluginsArtifact = NamespaceId.DEFAULT.artifact(
113                 CDAP_PLUGIN_ARTIFACT_NAME, CDAP_PLUGIN_VERSION);
114
115         addPluginArtifact(dcaeAnalyticsCdapPluginsArtifact, DATAPIPELINE_ARTIFACT_ID,
116                 ImmutableSet.of(getSimpleTCAPluginClass()), SimpleTCAPlugin.class, SimpleTCAPluginConfig.class,
117                 CDAPAppSettingsValidator.class, DCAEValidator.class);
118     }
119
120     private static PluginClass getSimpleTCAPluginClass() {
121         final HashMap<String, PluginPropertyField> properties = new HashMap<>();
122         properties.put("vesMessageFieldName", new PluginPropertyField("vesMessageFieldName", "",
123                 "string", false, false));
124         properties.put("referenceName", new PluginPropertyField("referenceName", "",
125                 "string", false, false));
126         properties.put("policyJson", new PluginPropertyField("policyJson", "", "string", false, false));
127         properties.put("alertFieldName", new PluginPropertyField("alertFieldName", "", "string", false, false));
128         properties.put("messageTypeFieldName", new PluginPropertyField(
129                 "messageTypeFieldName", "", "string", false, false));
130         properties.put("enableAlertCEFFormat", new PluginPropertyField(
131                 "enableAlertCEFFormat", "", "string", false, false));
132         properties.put("schema", new PluginPropertyField(
133                 "schema", "", "string", false, false));
134
135         return new PluginClass("sparkcompute", "SimpleTCAPlugin", "", SimpleTCAPlugin.class.getName(),
136                 "pluginConfig", properties);
137     }
138
139
140     @AfterClass
141     public static void cleanup() {
142     }
143
144     @Test
145     public void testTransform() throws Exception {
146
147         LOG.info("Starting Test Transform");
148
149         final String policyString = getFileContentAsString("/data/json/policy/tca_policy.json");
150         final String cefMessage = getFileContentAsString("/data/json/cef/cef_message.json");
151
152         final Map<String, String> tcaProperties = new ImmutableMap.Builder<String, String>()
153                 .put("vesMessageFieldName", "message")
154                 .put("referenceName", "SimpleTcaPlugin")
155                 .put("policyJson", policyString)
156                 .put("alertFieldName", "alert")
157                 .put("messageTypeFieldName", "tcaMessageType")
158                 .put("enableAlertCEFFormat", "true")
159                 .put("schema", outputSchema.toString())
160                 .build();
161
162         final ETLPlugin mockSourcePlugin = MockSource.getPlugin("messages", sourceSchema);
163         final ETLPlugin tcaPlugin =
164                 new ETLPlugin("SimpleTCAPlugin", SparkCompute.PLUGIN_TYPE, tcaProperties, null);
165         final ETLPlugin mockSink = MockSink.getPlugin("tcaOutput");
166
167         final ETLBatchConfig etlBatchConfig = ETLBatchConfig.builder("* * * * *")
168                 .addStage(new ETLStage("source", mockSourcePlugin))
169                 .addStage(new ETLStage("simpleTCAPlugin", tcaPlugin))
170                 .addStage(new ETLStage("sink", mockSink))
171                 .addConnection("source", "simpleTCAPlugin")
172                 .addConnection("simpleTCAPlugin", "sink")
173                 .build();
174
175         AppRequest<ETLBatchConfig> appRequest = new AppRequest<>(DATAPIPELINE_ARTIFACT, etlBatchConfig);
176         ApplicationId appId = NamespaceId.DEFAULT.app("TestSimpleTCAPlugin");
177         ApplicationManager appManager = deployApplication(appId.toId(), appRequest);
178
179         List<StructuredRecord> sourceMessages = new ArrayList<>();
180         StructuredRecord.Builder builder = StructuredRecord.builder(sourceSchema);
181         builder.set("message", cefMessage);
182         sourceMessages.add(builder.build());
183
184         // write records to source
185         DataSetManager<Table> inputManager = getDataset(NamespaceId.DEFAULT.dataset("messages"));
186         MockSource.writeInput(inputManager, sourceMessages);
187
188         // manually trigger the pipeline
189         WorkflowManager workflowManager = appManager.getWorkflowManager(SmartWorkflow.NAME);
190         workflowManager.start();
191         workflowManager.waitForFinish(5, TimeUnit.MINUTES);
192
193         final DataSetManager<Table> outputManager = getDataset("tcaOutput");
194
195         Tasks.waitFor(
196                 TCACalculatorMessageType.COMPLIANT.name(),
197                 new Callable<String>() {
198                     @Override
199                     public String call() throws Exception {
200                         outputManager.flush();
201                         List<String> tcaOutputMessageType = new LinkedList<>();
202                         for (StructuredRecord outputRecord : MockSink.readOutput(outputManager)) {
203                             tcaOutputMessageType.add(outputRecord.get("tcaMessageType").toString());
204                             final List<Schema.Field> fields = outputRecord.getSchema().getFields();
205                             LOG.debug("====>> Printing output Structured Record Contents: {}", outputRecord);
206                             for (Schema.Field field : fields) {
207                                 LOG.debug("Field Name: {} - Field Type: {}  ---> Field Value: {}",
208                                         field.getName(), field.getSchema().getType(),
209                                         outputRecord.get(field.getName()));
210                             }
211
212                         }
213                         return tcaOutputMessageType.get(0);
214                     }
215                 },
216                 4,
217                 TimeUnit.MINUTES);
218
219     }
220
221     private static final String getFileContentAsString(final String fileLocation) throws Exception {
222         final URI tcaPolicyURI =
223                 SimpleTCAPluginCDAPIT.class.getResource(fileLocation).toURI();
224         List<String> lines = Files.readAllLines(Paths.get(tcaPolicyURI), Charset.defaultCharset());
225         return Joiner.on("").join(lines);
226     }
227
228 }