cba456b6a209a4a051cb65c85f2e3e202ae0b954
[dcaegen2/analytics/tca.git] / dcae-analytics-cdap-plugins / src / test / java / org / openecomp / dcae / apod / analytics / cdap / plugins / BaseAnalyticsCDAPPluginsUnitTest.java
1 /*\r
2  * ===============================LICENSE_START======================================\r
3  *  dcae-analytics\r
4  * ================================================================================\r
5  *    Copyright © 2017 AT&T Intellectual Property. All rights reserved.\r
6  * ================================================================================\r
7  *  Licensed under the Apache License, Version 2.0 (the "License");\r
8  *  you may not use this file except in compliance with the License.\r
9  *   You may obtain a copy of the License at\r
10  *\r
11  *          http://www.apache.org/licenses/LICENSE-2.0\r
12  *\r
13  *  Unless required by applicable law or agreed to in writing, software\r
14  *  distributed under the License is distributed on an "AS IS" BASIS,\r
15  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
16  *  See the License for the specific language governing permissions and\r
17  *  limitations under the License.\r
18  *  ============================LICENSE_END===========================================\r
19  */\r
20 \r
21 package org.openecomp.dcae.apod.analytics.cdap.plugins;\r
22 \r
23 import co.cask.cdap.api.data.schema.Schema;\r
24 import co.cask.cdap.etl.api.StageMetrics;\r
25 import com.fasterxml.jackson.databind.ObjectMapper;\r
26 import com.google.common.base.Suppliers;\r
27 import org.apache.hadoop.conf.Configuration;\r
28 import org.openecomp.dcae.apod.analytics.cdap.common.CDAPPluginConstants;\r
29 import org.openecomp.dcae.apod.analytics.cdap.plugins.domain.config.dmaap.TestDMaaPMRSinkPluginConfig;\r
30 import org.openecomp.dcae.apod.analytics.cdap.plugins.domain.config.dmaap.TestDMaaPMRSourcePluginConfig;\r
31 import org.openecomp.dcae.apod.analytics.cdap.plugins.domain.config.filter.TestJsonPathFilterPluginConfig;\r
32 import org.openecomp.dcae.apod.analytics.cdap.plugins.domain.config.tca.TestSimpleTCAPluginConfig;\r
33 import org.openecomp.dcae.apod.analytics.model.util.json.AnalyticsModelObjectMapperSupplier;\r
34 import org.openecomp.dcae.apod.analytics.test.BaseDCAEAnalyticsUnitTest;\r
35 \r
36 import java.io.IOException;\r
37 import java.io.Serializable;\r
38 import java.util.LinkedHashMap;\r
39 import java.util.Map;\r
40 \r
41 /**\r
42  * @author Rajiv Singla . Creation Date: 1/23/2017.\r
43  */\r
44 public abstract class BaseAnalyticsCDAPPluginsUnitTest extends BaseDCAEAnalyticsUnitTest {\r
45 \r
46     protected static final ObjectMapper ANALYTICS_MODEL_OBJECT_MAPPER =\r
47             Suppliers.memoize(new AnalyticsModelObjectMapperSupplier()).get();\r
48 \r
49     protected static final String TCA_POLICY_JSON_FILE_LOCATION = "data/json/policy/tca_policy.json";\r
50     protected static final String CEF_MESSAGE_JSON_FILE_LOCATION = "data/json/cef/cef_message.json";\r
51     protected static final String CEF_NON_COMPLIANT_MESSAGE_JSON_FILE_LOCATION =\r
52             "data/json/cef/cef_message_with_threshold_violation.json";\r
53 \r
54 \r
55     protected static final String DMAAP_MR_SOURCE_PLUGIN_REFERENCE_NAME = "testDMaaPMRSource";\r
56     protected static final String DMAAP_MR_SOURCE_PLUGIN_HOST_NAME = "dcae-msrt-mtl1-ftl.homer.com";\r
57     protected static final Integer DMAAP_MR_SOURCE_PLUGIN_PORT_NUMBER = 3905;\r
58     protected static final String DMAAP_MR_SOURCE_PLUGIN_TOPIC_NAME = "com.dcae.dmaap.FTL.DcaeTestVESSub";\r
59     protected static final Integer DMAAP_MR_SOURCE_PLUGIN_POLLING_INTERVAL = 1000;\r
60     protected static final String DMAAP_MR_SOURCE_PLUGIN_PROTOCOL = "https";\r
61     protected static final String DMAAP_MR_SOURCE_PLUGIN_USERNAME = "username";\r
62     protected static final String DMAAP_MR_SOURCE_PLUGIN_PASSWORD = "password";\r
63     protected static final String DMAAP_MR_SOURCE_PLUGIN_CONTENT_TYPE = "application/json";\r
64     protected static final String DMAAP_MR_SOURCE_PLUGIN_CONSUMER_GROUP = "G1";\r
65     protected static final String DMAAP_MR_SOURCE_PLUGIN_CONSUMER_ID = "C1";\r
66     protected static final Integer DMAAP_MR_SOURCE_PLUGIN_MESSAGE_LIMIT = 100;\r
67     protected static final Integer DMAAP_MR_SOURCE_PLUGIN_TIMEOUT = 10000;\r
68 \r
69 \r
70     protected static final String DMAAP_MR_SINK_PLUGIN_REFERENCE_NAME = "testDMaaPMRSINK";\r
71     protected static final String DMAAP_MR_SINK_PLUGIN_HOST_NAME = "dcae-msrt-mtl1-ftl.homer.com";\r
72     protected static final Integer DMAAP_MR_SINK_PLUGIN_PORT_NUMBER = 3905;\r
73     protected static final String DMAAP_MR_SINK_PLUGIN_TOPIC_NAME = "com.dcae.dmaap.FTL.DcaeTestVESPub";\r
74     protected static final String DMAAP_MR_SINK_PLUGIN_PROTOCOL = "https";\r
75     protected static final String DMAAP_MR_SINK_PLUGIN_USERNAME = "username";\r
76     protected static final String DMAAP_MR_SINK_PLUGIN_PASSWORD = "password";\r
77     protected static final String DMAAP_MR_SINK_PLUGIN_CONTENT_TYPE = "application/json";\r
78     protected static final String DMAAP_MR_SINK_MESSAGE_COLUMN_NAME = "message";\r
79     protected static final Integer DMAAP_MR_SINK_PLUGIN_MAX_BATCH_SIZE = 10;\r
80     protected static final Integer DMAAP_MR_SINK_PLUGIN_MAX_RECOVERY_QUEUE_SIZE = 100;\r
81 \r
82     protected static final String VES_MESSAGE_FIELD_NAME = "message";\r
83     protected static final String TCA_PLUGIN_ALERT_FIELD_NAME = "alert";\r
84     protected static final String TCA_PLUGIN_MESSAGE_TYPE_FIELD_NAME = "tcaMessageType";\r
85 \r
86 \r
87     protected static final String JSON_PATH_FILTER_PLUGIN_REFERENCE_NAME = "JsonPathFilter";\r
88     protected static final String JSON_PATH_FILTER_PLUGIN_INCOMING_JSON_FIELD_NAME = "message";\r
89     protected static final String JSON_PATH_FILTER_PLUGIN_OUTPUT_SCHEMA_FILED_NAME = "filterMatched";\r
90     protected static final String JSON_PATH_FILTER_PLUGIN_JSON_FILTER_MAPPINGS =\r
91             "$.event.commonEventHeader.domain:measurementsForVfScaling," +\r
92                     "$.event.commonEventHeader.eventName:vLoadBalancer;vFirewall";\r
93     protected static final String JSON_PATH_FILTER_PLUGIN_JSON_FILTER_OUTPUT_SCHEMA =\r
94             "{\"type\":\"record\"," +\r
95                     "\"name\":\"etlSchemaBody\",\"fields\":" +\r
96                     "[" +\r
97                     "{\"name\":\"ts\",\"type\":\"long\"}," +\r
98                     "{\"name\":\"filterMatched\",\"type\":[\"boolean\",\"null\"]}," +\r
99                     "{\"name\":\"responseCode\",\"type\":\"int\"}," +\r
100                     "{\"name\":\"responseMessage\",\"type\":\"string\"}," +\r
101                     "{\"name\":\"message\",\"type\":\"string\"}" +\r
102                     "]" +\r
103                     "}";\r
104 \r
105     protected static class MockStageMetrics implements StageMetrics, Serializable {\r
106 \r
107         @Override\r
108         public void count(String metricName, int delta) {\r
109             LOG.debug("Mocking metric count, MetricName: {}, Delta: {}", metricName, delta);\r
110         }\r
111 \r
112         @Override\r
113         public void gauge(String metricName, long value) {\r
114             LOG.debug("Mocking metric guage, MetricName: {}, Value: {}", metricName, value);\r
115         }\r
116 \r
117         @Override\r
118         public void pipelineCount(String metricName, int delta) {\r
119             LOG.debug("Mocking metric pipelineCount, MetricName: {}, Delta: {}", metricName, delta);\r
120         }\r
121 \r
122         @Override\r
123         public void pipelineGauge(String metricName, long value) {\r
124             LOG.debug("Mocking metric guage, pipelineGauge: {}, Value: {}", metricName, value);\r
125         }\r
126     }\r
127 \r
128     protected static TestDMaaPMRSourcePluginConfig getTestDMaaPMRSourcePluginConfig() {\r
129         final TestDMaaPMRSourcePluginConfig sourcePluginConfig = new TestDMaaPMRSourcePluginConfig();\r
130         sourcePluginConfig.setReferenceName(DMAAP_MR_SOURCE_PLUGIN_REFERENCE_NAME);\r
131         sourcePluginConfig.setHostName(DMAAP_MR_SOURCE_PLUGIN_HOST_NAME);\r
132         sourcePluginConfig.setPortNumber(DMAAP_MR_SOURCE_PLUGIN_PORT_NUMBER);\r
133         sourcePluginConfig.setTopicName(DMAAP_MR_SOURCE_PLUGIN_TOPIC_NAME);\r
134         sourcePluginConfig.setPollingInterval(DMAAP_MR_SOURCE_PLUGIN_POLLING_INTERVAL);\r
135         sourcePluginConfig.setProtocol(DMAAP_MR_SOURCE_PLUGIN_PROTOCOL);\r
136         sourcePluginConfig.setUserName(DMAAP_MR_SOURCE_PLUGIN_USERNAME);\r
137         sourcePluginConfig.setUserPassword(DMAAP_MR_SOURCE_PLUGIN_PASSWORD);\r
138         sourcePluginConfig.setContentType(DMAAP_MR_SOURCE_PLUGIN_CONTENT_TYPE);\r
139         sourcePluginConfig.setConsumerGroup(DMAAP_MR_SOURCE_PLUGIN_CONSUMER_GROUP);\r
140         sourcePluginConfig.setConsumerId(DMAAP_MR_SOURCE_PLUGIN_CONSUMER_ID);\r
141         sourcePluginConfig.setMessageLimit(DMAAP_MR_SOURCE_PLUGIN_MESSAGE_LIMIT);\r
142         sourcePluginConfig.setTimeoutMS(DMAAP_MR_SOURCE_PLUGIN_TIMEOUT);\r
143         return sourcePluginConfig;\r
144     }\r
145 \r
146     protected static TestDMaaPMRSinkPluginConfig getTestDMaaPMRSinkPluginConfig() {\r
147         final TestDMaaPMRSinkPluginConfig sinkPluginConfig = new TestDMaaPMRSinkPluginConfig();\r
148         sinkPluginConfig.setReferenceName(DMAAP_MR_SINK_PLUGIN_REFERENCE_NAME);\r
149         sinkPluginConfig.setHostName(DMAAP_MR_SINK_PLUGIN_HOST_NAME);\r
150         sinkPluginConfig.setPortNumber(DMAAP_MR_SINK_PLUGIN_PORT_NUMBER);\r
151         sinkPluginConfig.setTopicName(DMAAP_MR_SINK_PLUGIN_TOPIC_NAME);\r
152         sinkPluginConfig.setProtocol(DMAAP_MR_SINK_PLUGIN_PROTOCOL);\r
153         sinkPluginConfig.setUserName(DMAAP_MR_SINK_PLUGIN_USERNAME);\r
154         sinkPluginConfig.setUserPassword(DMAAP_MR_SINK_PLUGIN_PASSWORD);\r
155         sinkPluginConfig.setContentType(DMAAP_MR_SINK_PLUGIN_CONTENT_TYPE);\r
156         sinkPluginConfig.setMessageColumnName(DMAAP_MR_SINK_MESSAGE_COLUMN_NAME);\r
157         sinkPluginConfig.setMaxBatchSize(DMAAP_MR_SINK_PLUGIN_MAX_BATCH_SIZE);\r
158         sinkPluginConfig.setMaxRecoveryQueueSize(DMAAP_MR_SINK_PLUGIN_MAX_RECOVERY_QUEUE_SIZE);\r
159         return sinkPluginConfig;\r
160     }\r
161 \r
162 \r
163     protected static Configuration getTestConfiguration() {\r
164         final Configuration configuration = new Configuration();\r
165         final Map<String, String> sinkConfigurationMap = createSinkConfigurationMap();\r
166         for (Map.Entry<String, String> property : sinkConfigurationMap.entrySet()) {\r
167             configuration.set(property.getKey(), property.getValue());\r
168         }\r
169         return configuration;\r
170     }\r
171 \r
172     protected static Map<String, String> createSinkConfigurationMap() {\r
173 \r
174         Map<String, String> sinkConfig = new LinkedHashMap<>();\r
175         sinkConfig.put(CDAPPluginConstants.DMaaPMRSinkHadoopConfigFields.HOST_NAME, DMAAP_MR_SINK_PLUGIN_HOST_NAME);\r
176         sinkConfig.put(CDAPPluginConstants.DMaaPMRSinkHadoopConfigFields.TOPIC_NAME, DMAAP_MR_SINK_PLUGIN_TOPIC_NAME);\r
177         sinkConfig.put(CDAPPluginConstants.DMaaPMRSinkHadoopConfigFields.PORT_NUMBER,\r
178                 DMAAP_MR_SINK_PLUGIN_PORT_NUMBER.toString());\r
179         sinkConfig.put(CDAPPluginConstants.DMaaPMRSinkHadoopConfigFields.PROTOCOL, DMAAP_MR_SINK_PLUGIN_PROTOCOL);\r
180         sinkConfig.put(CDAPPluginConstants.DMaaPMRSinkHadoopConfigFields.USER_NAME, DMAAP_MR_SINK_PLUGIN_USERNAME);\r
181         sinkConfig.put(CDAPPluginConstants.DMaaPMRSinkHadoopConfigFields.USER_PASS, DMAAP_MR_SINK_PLUGIN_PASSWORD);\r
182         sinkConfig.put(CDAPPluginConstants.DMaaPMRSinkHadoopConfigFields.CONTENT_TYPE,\r
183                 DMAAP_MR_SINK_PLUGIN_CONTENT_TYPE);\r
184         sinkConfig.put(CDAPPluginConstants.DMaaPMRSinkHadoopConfigFields.MAX_BATCH_SIZE,\r
185                 DMAAP_MR_SINK_PLUGIN_MAX_BATCH_SIZE.toString());\r
186         sinkConfig.put(CDAPPluginConstants.DMaaPMRSinkHadoopConfigFields.MAX_RECOVER_QUEUE_SIZE,\r
187                 DMAAP_MR_SINK_PLUGIN_MAX_RECOVERY_QUEUE_SIZE.toString());\r
188         return sinkConfig;\r
189     }\r
190 \r
191     protected static Schema getDMaaPMRSinkTestSchema() {\r
192         return Schema.recordOf(\r
193                 "DMaaPMRSinkTestSchema",\r
194                 Schema.Field.of("message", Schema.of(Schema.Type.STRING)),\r
195                 Schema.Field.of("field1", Schema.of(Schema.Type.STRING))\r
196         );\r
197     }\r
198 \r
199 \r
200     protected static TestSimpleTCAPluginConfig getTestSimpleTCAPluginConfig() {\r
201         final String policyJson;\r
202         try {\r
203             policyJson = fromStream(TCA_POLICY_JSON_FILE_LOCATION);\r
204         } catch (IOException e) {\r
205             throw new RuntimeException("Error while parsing policy", e);\r
206         }\r
207         return new TestSimpleTCAPluginConfig(VES_MESSAGE_FIELD_NAME, policyJson, TCA_PLUGIN_ALERT_FIELD_NAME,\r
208                 TCA_PLUGIN_MESSAGE_TYPE_FIELD_NAME, getSimpleTCAPluginInputSchema().toString(), false);\r
209     }\r
210 \r
211     protected static Schema getSimpleTCAPluginInputSchema() {\r
212         return Schema.recordOf(\r
213                 "TestSimpleTCAPluginInputSchema",\r
214                 Schema.Field.of("message", Schema.of(Schema.Type.STRING)),\r
215                 Schema.Field.of("inputField1", Schema.nullableOf(Schema.of(Schema.Type.STRING))),\r
216                 Schema.Field.of("inputField2", Schema.nullableOf(Schema.of(Schema.Type.STRING)))\r
217         );\r
218     }\r
219 \r
220     protected static Schema getJsonFilterPluginInputSchema() {\r
221         return Schema.recordOf(\r
222                 "TestJsonFilterInputSchema",\r
223                 Schema.Field.of("ts", Schema.of(Schema.Type.LONG)),\r
224                 Schema.Field.of("responseCode", Schema.of(Schema.Type.INT)),\r
225                 Schema.Field.of("responseMessage", Schema.of(Schema.Type.STRING)),\r
226                 Schema.Field.of("message", Schema.of(Schema.Type.STRING))\r
227         );\r
228     }\r
229 \r
230     protected static TestJsonPathFilterPluginConfig getJsonPathFilterPluginConfig() {\r
231         return new TestJsonPathFilterPluginConfig(JSON_PATH_FILTER_PLUGIN_REFERENCE_NAME,\r
232                 JSON_PATH_FILTER_PLUGIN_INCOMING_JSON_FIELD_NAME,\r
233                 JSON_PATH_FILTER_PLUGIN_OUTPUT_SCHEMA_FILED_NAME,\r
234                 JSON_PATH_FILTER_PLUGIN_JSON_FILTER_MAPPINGS,\r
235                 JSON_PATH_FILTER_PLUGIN_JSON_FILTER_OUTPUT_SCHEMA);\r
236     }\r
237 \r
238 }\r