2 * ===============================LICENSE_START======================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============================LICENSE_END===========================================
21 package org.openecomp.dcae.apod.analytics.cdap.plugins.sparkcompute.tca;
23 import co.cask.cdap.api.data.format.StructuredRecord;
24 import co.cask.cdap.api.data.schema.Schema;
25 import co.cask.cdap.etl.api.PipelineConfigurer;
26 import co.cask.cdap.etl.api.StageConfigurer;
27 import co.cask.cdap.etl.api.batch.SparkExecutionPluginContext;
28 import org.apache.spark.api.java.JavaRDD;
29 import org.apache.spark.api.java.JavaSparkContext;
30 import org.junit.Before;
31 import org.junit.Test;
32 import org.mockito.Mockito;
33 import org.openecomp.dcae.apod.analytics.cdap.common.persistance.tca.TCACalculatorMessageType;
34 import org.openecomp.dcae.apod.analytics.cdap.plugins.BaseAnalyticsCDAPPluginsUnitTest;
35 import org.openecomp.dcae.apod.analytics.cdap.plugins.domain.config.tca.TestSimpleTCAPluginConfig;
37 import java.util.LinkedList;
38 import java.util.List;
40 import static org.hamcrest.CoreMatchers.is;
41 import static org.junit.Assert.assertNotNull;
42 import static org.junit.Assert.assertThat;
43 import static org.junit.Assert.assertTrue;
44 import static org.mockito.Mockito.mock;
45 import static org.mockito.Mockito.times;
46 import static org.mockito.Mockito.verify;
47 import static org.mockito.Mockito.when;
50 * @author Rajiv Singla . Creation Date: 2/17/2017.
52 public class SimpleTCAPluginTest extends BaseAnalyticsCDAPPluginsUnitTest {
54 private SimpleTCAPlugin simpleTCAPlugin;
57 public void before() {
58 final TestSimpleTCAPluginConfig testSimpleTCAPluginConfig = getTestSimpleTCAPluginConfig();
59 Schema outputSchema = Schema.recordOf(
60 "TestSimpleTCAPluginInputSchema",
61 Schema.Field.of("message", Schema.of(Schema.Type.STRING)),
62 Schema.Field.of("alert", Schema.nullableOf(Schema.of(Schema.Type.STRING))),
63 Schema.Field.of("tcaMessageType", Schema.of(Schema.Type.STRING))
65 testSimpleTCAPluginConfig.setSchema(outputSchema.toString());
66 simpleTCAPlugin = new SimpleTCAPlugin(testSimpleTCAPluginConfig);
70 public void testConfigurePipeline() throws Exception {
71 final PipelineConfigurer pipelineConfigurer = mock(PipelineConfigurer.class);
72 final StageConfigurer stageConfigurer = mock(StageConfigurer.class);
73 when(pipelineConfigurer.getStageConfigurer()).thenReturn(stageConfigurer);
74 when(stageConfigurer.getInputSchema()).thenReturn(getSimpleTCAPluginInputSchema());
75 simpleTCAPlugin.configurePipeline(pipelineConfigurer);
76 verify(stageConfigurer, times(1)).getInputSchema();
80 public void testTransform() throws Exception {
82 JavaSparkContext javaSparkContext = new JavaSparkContext("local", "test");
84 Schema sourceSchema = Schema.recordOf("CEFMessageSourceSchema",
85 Schema.Field.of("message", Schema.of(Schema.Type.STRING))
88 // Inapplicable Message Structured Record
89 final StructuredRecord inapplicableSR =
90 StructuredRecord.builder(sourceSchema).set("message", "test").build();
92 final StructuredRecord compliantSR =
93 StructuredRecord.builder(sourceSchema).set("message",
94 fromStream(CEF_MESSAGE_JSON_FILE_LOCATION)).build();
96 final String nonCompliantCEF = fromStream(CEF_NON_COMPLIANT_MESSAGE_JSON_FILE_LOCATION);
97 final StructuredRecord nonCompliantSR =
98 StructuredRecord.builder(sourceSchema).set("message", nonCompliantCEF).build();
100 final List<StructuredRecord> records = new LinkedList<>();
101 records.add(inapplicableSR);
102 records.add(compliantSR);
103 records.add(nonCompliantSR);
105 final JavaRDD<StructuredRecord> input =
106 javaSparkContext.parallelize(records);
107 final SparkExecutionPluginContext context = Mockito.mock(SparkExecutionPluginContext.class);
108 final MockStageMetrics stageMetrics = Mockito.mock(MockStageMetrics.class);
109 when(context.getMetrics()).thenReturn(stageMetrics);
110 final List<StructuredRecord> outputRecord = simpleTCAPlugin.transform(context, input).collect();
111 assertNotNull(outputRecord);
112 assertThat(outputRecord.size(), is(3));
114 assertTrue(outputRecord.get(0).get("tcaMessageType").equals(TCACalculatorMessageType.INAPPLICABLE.toString()));
115 assertTrue(outputRecord.get(1).get("tcaMessageType").equals(TCACalculatorMessageType.COMPLIANT.toString()));
116 assertTrue(outputRecord.get(2).get("tcaMessageType").equals(TCACalculatorMessageType.NON_COMPLIANT.toString()));