Sonar Critical Fix
[dcaegen2/analytics/tca.git] / dcae-analytics-cdap-plugins / src / test / java / org / openecomp / dcae / apod / analytics / cdap / plugins / sparkcompute / tca / SimpleTCAPluginTest.java
1 /*\r
2  * ===============================LICENSE_START======================================\r
3  *  dcae-analytics\r
4  * ================================================================================\r
5  *    Copyright © 2017 AT&T Intellectual Property. All rights reserved.\r
6  * ================================================================================\r
7  *  Licensed under the Apache License, Version 2.0 (the "License");\r
8  *  you may not use this file except in compliance with the License.\r
9  *   You may obtain a copy of the License at\r
10  *\r
11  *          http://www.apache.org/licenses/LICENSE-2.0\r
12  *\r
13  *  Unless required by applicable law or agreed to in writing, software\r
14  *  distributed under the License is distributed on an "AS IS" BASIS,\r
15  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
16  *  See the License for the specific language governing permissions and\r
17  *  limitations under the License.\r
18  *  ============================LICENSE_END===========================================\r
19  */\r
20 \r
21 package org.openecomp.dcae.apod.analytics.cdap.plugins.sparkcompute.tca;\r
22 \r
23 import co.cask.cdap.api.data.format.StructuredRecord;\r
24 import co.cask.cdap.api.data.schema.Schema;\r
25 import co.cask.cdap.etl.api.PipelineConfigurer;\r
26 import co.cask.cdap.etl.api.StageConfigurer;\r
27 import co.cask.cdap.etl.api.batch.SparkExecutionPluginContext;\r
28 import org.apache.spark.api.java.JavaRDD;\r
29 import org.apache.spark.api.java.JavaSparkContext;\r
30 import org.junit.Before;\r
31 import org.junit.Test;\r
32 import org.mockito.Mockito;\r
33 import org.openecomp.dcae.apod.analytics.cdap.common.persistance.tca.TCACalculatorMessageType;\r
34 import org.openecomp.dcae.apod.analytics.cdap.plugins.BaseAnalyticsCDAPPluginsUnitTest;\r
35 import org.openecomp.dcae.apod.analytics.cdap.plugins.domain.config.tca.TestSimpleTCAPluginConfig;\r
36 \r
37 import java.util.LinkedList;\r
38 import java.util.List;\r
39 \r
40 import static org.hamcrest.CoreMatchers.is;\r
41 import static org.junit.Assert.assertNotNull;\r
42 import static org.junit.Assert.assertThat;\r
43 import static org.junit.Assert.assertTrue;\r
44 import static org.mockito.Mockito.mock;\r
45 import static org.mockito.Mockito.times;\r
46 import static org.mockito.Mockito.verify;\r
47 import static org.mockito.Mockito.when;\r
48 \r
49 /**\r
50  * @author Rajiv Singla . Creation Date: 2/17/2017.\r
51  */\r
52 public class SimpleTCAPluginTest extends BaseAnalyticsCDAPPluginsUnitTest {\r
53 \r
54     private SimpleTCAPlugin simpleTCAPlugin;\r
55 \r
56     @Before\r
57     public void before() {\r
58         final TestSimpleTCAPluginConfig testSimpleTCAPluginConfig = getTestSimpleTCAPluginConfig();\r
59         Schema outputSchema = Schema.recordOf(\r
60                 "TestSimpleTCAPluginInputSchema",\r
61                 Schema.Field.of("message", Schema.of(Schema.Type.STRING)),\r
62                 Schema.Field.of("alert", Schema.nullableOf(Schema.of(Schema.Type.STRING))),\r
63                 Schema.Field.of("tcaMessageType", Schema.of(Schema.Type.STRING))\r
64         );\r
65         testSimpleTCAPluginConfig.setSchema(outputSchema.toString());\r
66         simpleTCAPlugin = new SimpleTCAPlugin(testSimpleTCAPluginConfig);\r
67     }\r
68 \r
69     @Test\r
70     public void testConfigurePipeline() throws Exception {\r
71         final PipelineConfigurer pipelineConfigurer = mock(PipelineConfigurer.class);\r
72         final StageConfigurer stageConfigurer = mock(StageConfigurer.class);\r
73         when(pipelineConfigurer.getStageConfigurer()).thenReturn(stageConfigurer);\r
74         when(stageConfigurer.getInputSchema()).thenReturn(getSimpleTCAPluginInputSchema());\r
75         simpleTCAPlugin.configurePipeline(pipelineConfigurer);\r
76         verify(stageConfigurer, times(1)).getInputSchema();\r
77     }\r
78 \r
79     @Test\r
80     public void testTransform() throws Exception {\r
81 \r
82         JavaSparkContext javaSparkContext = new JavaSparkContext("local", "test");\r
83 \r
84         Schema sourceSchema = Schema.recordOf("CEFMessageSourceSchema",\r
85                 Schema.Field.of("message", Schema.of(Schema.Type.STRING))\r
86         );\r
87 \r
88         // Inapplicable Message Structured Record\r
89         final StructuredRecord inapplicableSR =\r
90                 StructuredRecord.builder(sourceSchema).set("message", "test").build();\r
91         // compliant\r
92         final StructuredRecord compliantSR =\r
93                 StructuredRecord.builder(sourceSchema).set("message",\r
94                         fromStream(CEF_MESSAGE_JSON_FILE_LOCATION)).build();\r
95         // non compliant\r
96         final String nonCompliantCEF = fromStream(CEF_NON_COMPLIANT_MESSAGE_JSON_FILE_LOCATION);\r
97         final StructuredRecord nonCompliantSR =\r
98                 StructuredRecord.builder(sourceSchema).set("message", nonCompliantCEF).build();\r
99 \r
100         final List<StructuredRecord> records = new LinkedList<>();\r
101         records.add(inapplicableSR);\r
102         records.add(compliantSR);\r
103         records.add(nonCompliantSR);\r
104 \r
105         final JavaRDD<StructuredRecord> input =\r
106                 javaSparkContext.parallelize(records);\r
107         final SparkExecutionPluginContext context = Mockito.mock(SparkExecutionPluginContext.class);\r
108         final MockStageMetrics stageMetrics = Mockito.mock(MockStageMetrics.class);\r
109         when(context.getMetrics()).thenReturn(stageMetrics);\r
110         final List<StructuredRecord> outputRecord = simpleTCAPlugin.transform(context, input).collect();\r
111         assertNotNull(outputRecord);\r
112         assertThat(outputRecord.size(), is(3));\r
113 \r
114         assertTrue(outputRecord.get(0).get("tcaMessageType").equals(TCACalculatorMessageType.INAPPLICABLE.toString()));\r
115         assertTrue(outputRecord.get(1).get("tcaMessageType").equals(TCACalculatorMessageType.COMPLIANT.toString()));\r
116         assertTrue(outputRecord.get(2).get("tcaMessageType").equals(TCACalculatorMessageType.NON_COMPLIANT.toString()));\r
117     }\r
118 \r
119 }\r