Sonar Critical Fix
[dcaegen2/analytics/tca.git] / dcae-analytics-cdap-plugins / src / test / java / org / openecomp / dcae / apod / analytics / cdap / plugins / it / SimpleTCAPluginCDAPIT.java
1 /*\r
2  * ===============================LICENSE_START======================================\r
3  *  dcae-analytics\r
4  * ================================================================================\r
5  *    Copyright © 2017 AT&T Intellectual Property. All rights reserved.\r
6  * ================================================================================\r
7  *  Licensed under the Apache License, Version 2.0 (the "License");\r
8  *  you may not use this file except in compliance with the License.\r
9  *   You may obtain a copy of the License at\r
10  *\r
11  *          http://www.apache.org/licenses/LICENSE-2.0\r
12  *\r
13  *  Unless required by applicable law or agreed to in writing, software\r
14  *  distributed under the License is distributed on an "AS IS" BASIS,\r
15  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
16  *  See the License for the specific language governing permissions and\r
17  *  limitations under the License.\r
18  *  ============================LICENSE_END===========================================\r
19  */\r
20 \r
21 package org.openecomp.dcae.apod.analytics.cdap.plugins.it;\r
22 \r
23 import co.cask.cdap.api.data.format.StructuredRecord;\r
24 import co.cask.cdap.api.data.schema.Schema;\r
25 import co.cask.cdap.api.dataset.table.Table;\r
26 import co.cask.cdap.api.plugin.PluginClass;\r
27 import co.cask.cdap.api.plugin.PluginPropertyField;\r
28 import co.cask.cdap.common.utils.Tasks;\r
29 import co.cask.cdap.datapipeline.DataPipelineApp;\r
30 import co.cask.cdap.datapipeline.SmartWorkflow;\r
31 import co.cask.cdap.etl.api.batch.SparkCompute;\r
32 import co.cask.cdap.etl.mock.batch.MockSink;\r
33 import co.cask.cdap.etl.mock.batch.MockSource;\r
34 import co.cask.cdap.etl.mock.test.HydratorTestBase;\r
35 import co.cask.cdap.etl.proto.v2.ETLBatchConfig;\r
36 import co.cask.cdap.etl.proto.v2.ETLPlugin;\r
37 import co.cask.cdap.etl.proto.v2.ETLStage;\r
38 import co.cask.cdap.proto.artifact.AppRequest;\r
39 import co.cask.cdap.proto.artifact.ArtifactSummary;\r
40 import co.cask.cdap.proto.id.ApplicationId;\r
41 import co.cask.cdap.proto.id.ArtifactId;\r
42 import co.cask.cdap.proto.id.NamespaceId;\r
43 import co.cask.cdap.test.ApplicationManager;\r
44 import co.cask.cdap.test.DataSetManager;\r
45 import co.cask.cdap.test.WorkflowManager;\r
46 import com.google.common.base.Joiner;\r
47 import com.google.common.collect.ImmutableMap;\r
48 import com.google.common.collect.ImmutableSet;\r
49 import com.google.common.collect.Sets;\r
50 import org.junit.AfterClass;\r
51 import org.junit.BeforeClass;\r
52 import org.junit.Test;\r
53 import org.openecomp.dcae.apod.analytics.cdap.common.persistance.tca.TCACalculatorMessageType;\r
54 import org.openecomp.dcae.apod.analytics.cdap.common.validation.CDAPAppSettingsValidator;\r
55 import org.openecomp.dcae.apod.analytics.cdap.plugins.domain.config.tca.SimpleTCAPluginConfig;\r
56 import org.openecomp.dcae.apod.analytics.cdap.plugins.sparkcompute.tca.SimpleTCAPlugin;\r
57 import org.openecomp.dcae.apod.analytics.common.validation.DCAEValidator;\r
58 import org.slf4j.Logger;\r
59 import org.slf4j.LoggerFactory;\r
60 \r
61 import java.net.URI;\r
62 import java.nio.charset.Charset;\r
63 import java.nio.file.Files;\r
64 import java.nio.file.Paths;\r
65 import java.util.ArrayList;\r
66 import java.util.Collections;\r
67 import java.util.HashMap;\r
68 import java.util.LinkedList;\r
69 import java.util.List;\r
70 import java.util.Map;\r
71 import java.util.concurrent.Callable;\r
72 import java.util.concurrent.TimeUnit;\r
73 \r
74 /**\r
75  * Integration Test which used CDAP Hydrator Test Base to Test Simple TCA Plugin\r
76  *\r
77  * @author Rajiv Singla . Creation Date: 2/17/2017.\r
78  */\r
79 public class SimpleTCAPluginCDAPIT extends HydratorTestBase {\r
80 \r
81     private static final Logger LOG = LoggerFactory.getLogger(SimpleTCAPluginCDAPIT.class);\r
82 \r
83     private static final String CDAP_PLUGIN_VERSION = "3.0-SNAPSHOT";\r
84     private static final String CDAP_PLUGIN_ARTIFACT_NAME = "dcae-analytics-cdap-plugins";\r
85 \r
86     protected static final ArtifactId DATAPIPELINE_ARTIFACT_ID = NamespaceId.DEFAULT.artifact("data-pipeline",\r
87             "4.0.0");\r
88     protected static final ArtifactSummary DATAPIPELINE_ARTIFACT = new ArtifactSummary("data-pipeline", "4.0.0");\r
89 \r
90     private static Schema sourceSchema = Schema.recordOf("CEFMessageSourceSchema",\r
91             Schema.Field.of("message", Schema.of(Schema.Type.STRING))\r
92     );\r
93 \r
94     final Schema outputSchema = Schema.recordOf(\r
95             "outputSchema",\r
96             Schema.Field.of("message", Schema.of(Schema.Type.STRING)),\r
97             Schema.Field.of("alert", Schema.nullableOf(Schema.of(Schema.Type.STRING))),\r
98             Schema.Field.of("tcaMessageType", Schema.of(Schema.Type.STRING))\r
99     );\r
100 \r
101     @BeforeClass\r
102     public static void setupTest() throws Exception {\r
103 \r
104         setupBatchArtifacts(DATAPIPELINE_ARTIFACT_ID, DataPipelineApp.class);\r
105 \r
106 \r
107         // Enable the below code if you want to run the test in Intelli IDEA editor\r
108         // addPluginArtifact(NamespaceId.DEFAULT.artifact("spark-plugins", "1.0.0"), DATAPIPELINE_ARTIFACT_ID,\r
109         //        SimpleTCAPlugin.class, SimpleTCAPluginConfig.class);\r
110 \r
111         // Enable the below code if you want to run the test via command line\r
112         ArtifactId dcaeAnalyticsCdapPluginsArtifact = NamespaceId.DEFAULT.artifact(\r
113                 CDAP_PLUGIN_ARTIFACT_NAME, CDAP_PLUGIN_VERSION);\r
114 \r
115         addPluginArtifact(dcaeAnalyticsCdapPluginsArtifact, DATAPIPELINE_ARTIFACT_ID,\r
116                 ImmutableSet.of(getSimpleTCAPluginClass()), SimpleTCAPlugin.class, SimpleTCAPluginConfig.class,\r
117                 CDAPAppSettingsValidator.class, DCAEValidator.class);\r
118     }\r
119 \r
120     private static PluginClass getSimpleTCAPluginClass() {\r
121         final HashMap<String, PluginPropertyField> properties = new HashMap<>();\r
122         properties.put("vesMessageFieldName", new PluginPropertyField("vesMessageFieldName", "",\r
123                 "string", false, false));\r
124         properties.put("referenceName", new PluginPropertyField("referenceName", "",\r
125                 "string", false, false));\r
126         properties.put("policyJson", new PluginPropertyField("policyJson", "", "string", false, false));\r
127         properties.put("alertFieldName", new PluginPropertyField("alertFieldName", "", "string", false, false));\r
128         properties.put("messageTypeFieldName", new PluginPropertyField(\r
129                 "messageTypeFieldName", "", "string", false, false));\r
130         properties.put("enableAlertCEFFormat", new PluginPropertyField(\r
131                 "enableAlertCEFFormat", "", "string", false, false));\r
132         properties.put("schema", new PluginPropertyField(\r
133                 "schema", "", "string", false, false));\r
134 \r
135         return new PluginClass("sparkcompute", "SimpleTCAPlugin", "", SimpleTCAPlugin.class.getName(),\r
136                 "pluginConfig", properties);\r
137     }\r
138 \r
139 \r
140     @AfterClass\r
141     public static void cleanup() {\r
142     }\r
143 \r
144     @Test\r
145     @SuppressWarnings("deprecation")\r
146     public void testTransform() throws Exception {\r
147 \r
148         LOG.info("Starting Test Transform");\r
149 \r
150         final String policyString = getFileContentAsString("/data/json/policy/tca_policy.json");\r
151         final String cefMessage = getFileContentAsString("/data/json/cef/cef_message.json");\r
152 \r
153         final Map<String, String> tcaProperties = new ImmutableMap.Builder<String, String>()\r
154                 .put("vesMessageFieldName", "message")\r
155                 .put("referenceName", "SimpleTcaPlugin")\r
156                 .put("policyJson", policyString)\r
157                 .put("alertFieldName", "alert")\r
158                 .put("messageTypeFieldName", "tcaMessageType")\r
159                 .put("enableAlertCEFFormat", "true")\r
160                 .put("schema", outputSchema.toString())\r
161                 .build();\r
162 \r
163         final ETLPlugin mockSourcePlugin = MockSource.getPlugin("messages", sourceSchema);\r
164         final ETLPlugin tcaPlugin =\r
165                 new ETLPlugin("SimpleTCAPlugin", SparkCompute.PLUGIN_TYPE, tcaProperties, null);\r
166         final ETLPlugin mockSink = MockSink.getPlugin("tcaOutput");\r
167 \r
168         final ETLBatchConfig etlBatchConfig = ETLBatchConfig.builder("* * * * *")\r
169                 .addStage(new ETLStage("source", mockSourcePlugin))\r
170                 .addStage(new ETLStage("simpleTCAPlugin", tcaPlugin))\r
171                 .addStage(new ETLStage("sink", mockSink))\r
172                 .addConnection("source", "simpleTCAPlugin")\r
173                 .addConnection("simpleTCAPlugin", "sink")\r
174                 .build();\r
175 \r
176         AppRequest<ETLBatchConfig> appRequest = new AppRequest<>(DATAPIPELINE_ARTIFACT, etlBatchConfig);\r
177         ApplicationId appId = NamespaceId.DEFAULT.app("TestSimpleTCAPlugin");\r
178         ApplicationManager appManager = deployApplication(appId.toId(), appRequest);\r
179 \r
180         List<StructuredRecord> sourceMessages = new ArrayList<>();\r
181         StructuredRecord.Builder builder = StructuredRecord.builder(sourceSchema);\r
182         builder.set("message", cefMessage);\r
183         sourceMessages.add(builder.build());\r
184 \r
185         // write records to source\r
186         DataSetManager<Table> inputManager = getDataset(NamespaceId.DEFAULT.dataset("messages"));\r
187         MockSource.writeInput(inputManager, sourceMessages);\r
188 \r
189         // manually trigger the pipeline\r
190         WorkflowManager workflowManager = appManager.getWorkflowManager(SmartWorkflow.NAME);\r
191         workflowManager.start();\r
192         workflowManager.waitForFinish(5, TimeUnit.MINUTES);\r
193 \r
194         final DataSetManager<Table> outputManager = getDataset("tcaOutput");\r
195 \r
196         Tasks.waitFor(\r
197                 TCACalculatorMessageType.COMPLIANT.name(),\r
198                 new Callable<String>() {\r
199                     @Override\r
200                     public String call() throws Exception {\r
201                         outputManager.flush();\r
202                         List<String> tcaOutputMessageType = new LinkedList<>();\r
203                         for (StructuredRecord outputRecord : MockSink.readOutput(outputManager)) {\r
204                             tcaOutputMessageType.add(outputRecord.get("tcaMessageType").toString());\r
205                             final List<Schema.Field> fields = outputRecord.getSchema().getFields();\r
206                             LOG.debug("====>> Printing output Structured Record Contents: {}", outputRecord);\r
207                             for (Schema.Field field : fields) {\r
208                                 LOG.debug("Field Name: {} - Field Type: {}  ---> Field Value: {}",\r
209                                         field.getName(), field.getSchema().getType(),\r
210                                         outputRecord.get(field.getName()));\r
211                             }\r
212 \r
213                         }\r
214                         return tcaOutputMessageType.get(0);\r
215                     }\r
216                 },\r
217                 4,\r
218                 TimeUnit.MINUTES);\r
219 \r
220     }\r
221 \r
222     private static String getFileContentAsString(final String fileLocation) throws Exception {\r
223         final URI tcaPolicyURI =\r
224                 SimpleTCAPluginCDAPIT.class.getResource(fileLocation).toURI();\r
225         List<String> lines = Files.readAllLines(Paths.get(tcaPolicyURI), Charset.defaultCharset());\r
226         return Joiner.on("").join(lines);\r
227     }\r
228 \r
229 }\r