Initial TCA commit into DCAEGEN2
[dcaegen2/analytics/tca.git] / dcae-analytics-cdap-plugins / src / test / java / org / openecomp / dcae / apod / analytics / cdap / plugins / BaseAnalyticsCDAPPluginsUnitTest.java
1 /*
2  * ===============================LICENSE_START======================================
3  *  dcae-analytics
4  * ================================================================================
5  *    Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *   You may obtain a copy of the License at
10  *
11  *          http://www.apache.org/licenses/LICENSE-2.0
12  *
13  *  Unless required by applicable law or agreed to in writing, software
14  *  distributed under the License is distributed on an "AS IS" BASIS,
15  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  *  See the License for the specific language governing permissions and
17  *  limitations under the License.
18  *  ============================LICENSE_END===========================================
19  */
20
21 package org.openecomp.dcae.apod.analytics.cdap.plugins;
22
23 import co.cask.cdap.api.data.schema.Schema;
24 import co.cask.cdap.etl.api.StageMetrics;
25 import com.fasterxml.jackson.databind.ObjectMapper;
26 import com.google.common.base.Suppliers;
27 import org.apache.hadoop.conf.Configuration;
28 import org.openecomp.dcae.apod.analytics.cdap.common.CDAPPluginConstants;
29 import org.openecomp.dcae.apod.analytics.cdap.plugins.domain.config.dmaap.TestDMaaPMRSinkPluginConfig;
30 import org.openecomp.dcae.apod.analytics.cdap.plugins.domain.config.dmaap.TestDMaaPMRSourcePluginConfig;
31 import org.openecomp.dcae.apod.analytics.cdap.plugins.domain.config.filter.TestJsonPathFilterPluginConfig;
32 import org.openecomp.dcae.apod.analytics.cdap.plugins.domain.config.tca.TestSimpleTCAPluginConfig;
33 import org.openecomp.dcae.apod.analytics.model.util.json.AnalyticsModelObjectMapperSupplier;
34 import org.openecomp.dcae.apod.analytics.test.BaseDCAEAnalyticsUnitTest;
35
36 import java.io.IOException;
37 import java.io.Serializable;
38 import java.util.LinkedHashMap;
39 import java.util.Map;
40
41 /**
42  * @author Rajiv Singla . Creation Date: 1/23/2017.
43  */
44 public abstract class BaseAnalyticsCDAPPluginsUnitTest extends BaseDCAEAnalyticsUnitTest {
45
46     protected static final ObjectMapper ANALYTICS_MODEL_OBJECT_MAPPER =
47             Suppliers.memoize(new AnalyticsModelObjectMapperSupplier()).get();
48
49     protected static final String TCA_POLICY_JSON_FILE_LOCATION = "data/json/policy/tca_policy.json";
50     protected static final String CEF_MESSAGE_JSON_FILE_LOCATION = "data/json/cef/cef_message.json";
51     protected static final String CEF_NON_COMPLIANT_MESSAGE_JSON_FILE_LOCATION =
52             "data/json/cef/non_compliant_cef_message.json";
53
54
55     protected static final String DMAAP_MR_SOURCE_PLUGIN_REFERENCE_NAME = "testDMaaPMRSource";
56     protected static final String DMAAP_MR_SOURCE_PLUGIN_HOST_NAME = "dcae-msrt-mtl1-ftl.homer.com";
57     protected static final Integer DMAAP_MR_SOURCE_PLUGIN_PORT_NUMBER = 3905;
58     protected static final String DMAAP_MR_SOURCE_PLUGIN_TOPIC_NAME = "com.dcae.dmaap.FTL.DcaeTestVESSub";
59     protected static final Integer DMAAP_MR_SOURCE_PLUGIN_POLLING_INTERVAL = 1000;
60     protected static final String DMAAP_MR_SOURCE_PLUGIN_PROTOCOL = "https";
61     protected static final String DMAAP_MR_SOURCE_PLUGIN_USERNAME = "username";
62     protected static final String DMAAP_MR_SOURCE_PLUGIN_PASSWORD = "password";
63     protected static final String DMAAP_MR_SOURCE_PLUGIN_CONTENT_TYPE = "application/json";
64     protected static final String DMAAP_MR_SOURCE_PLUGIN_CONSUMER_GROUP = "G1";
65     protected static final String DMAAP_MR_SOURCE_PLUGIN_CONSUMER_ID = "C1";
66     protected static final Integer DMAAP_MR_SOURCE_PLUGIN_MESSAGE_LIMIT = 100;
67     protected static final Integer DMAAP_MR_SOURCE_PLUGIN_TIMEOUT = 10000;
68
69
70     protected static final String DMAAP_MR_SINK_PLUGIN_REFERENCE_NAME = "testDMaaPMRSINK";
71     protected static final String DMAAP_MR_SINK_PLUGIN_HOST_NAME = "dcae-msrt-mtl1-ftl.homer.com";
72     protected static final Integer DMAAP_MR_SINK_PLUGIN_PORT_NUMBER = 3905;
73     protected static final String DMAAP_MR_SINK_PLUGIN_TOPIC_NAME = "com.dcae.dmaap.FTL.DcaeTestVESPub";
74     protected static final String DMAAP_MR_SINK_PLUGIN_PROTOCOL = "https";
75     protected static final String DMAAP_MR_SINK_PLUGIN_USERNAME = "username";
76     protected static final String DMAAP_MR_SINK_PLUGIN_PASSWORD = "password";
77     protected static final String DMAAP_MR_SINK_PLUGIN_CONTENT_TYPE = "application/json";
78     protected static final String DMAAP_MR_SINK_MESSAGE_COLUMN_NAME = "message";
79     protected static final Integer DMAAP_MR_SINK_PLUGIN_MAX_BATCH_SIZE = 10;
80     protected static final Integer DMAAP_MR_SINK_PLUGIN_MAX_RECOVERY_QUEUE_SIZE = 100;
81
82     protected static final String VES_MESSAGE_FIELD_NAME = "message";
83     protected static final String TCA_PLUGIN_ALERT_FIELD_NAME = "alert";
84     protected static final String TCA_PLUGIN_MESSAGE_TYPE_FIELD_NAME = "tcaMessageType";
85
86
87     protected static final String JSON_PATH_FILTER_PLUGIN_REFERENCE_NAME = "JsonPathFilter";
88     protected static final String JSON_PATH_FILTER_PLUGIN_INCOMING_JSON_FIELD_NAME = "message";
89     protected static final String JSON_PATH_FILTER_PLUGIN_OUTPUT_SCHEMA_FILED_NAME = "filterMatched";
90     protected static final String JSON_PATH_FILTER_PLUGIN_JSON_FILTER_MAPPINGS =
91             "$.event.commonEventHeader.domain:measurementsForVfScaling," +
92                     "$.event.commonEventHeader.functionalRole:vLoadBalancer;vFirewall";
93     protected static final String JSON_PATH_FILTER_PLUGIN_JSON_FILTER_OUTPUT_SCHEMA =
94             "{\"type\":\"record\"," +
95                     "\"name\":\"etlSchemaBody\",\"fields\":" +
96                     "[" +
97                     "{\"name\":\"ts\",\"type\":\"long\"}," +
98                     "{\"name\":\"filterMatched\",\"type\":[\"boolean\",\"null\"]}," +
99                     "{\"name\":\"responseCode\",\"type\":\"int\"}," +
100                     "{\"name\":\"responseMessage\",\"type\":\"string\"}," +
101                     "{\"name\":\"message\",\"type\":\"string\"}" +
102                     "]" +
103                     "}";
104
105     protected static class MockStageMetrics implements StageMetrics, Serializable {
106
107         @Override
108         public void count(String metricName, int delta) {
109             LOG.debug("Mocking metric count, MetricName: {}, Delta: {}", metricName, delta);
110         }
111
112         @Override
113         public void gauge(String metricName, long value) {
114             LOG.debug("Mocking metric guage, MetricName: {}, Value: {}", metricName, value);
115         }
116
117         @Override
118         public void pipelineCount(String metricName, int delta) {
119             LOG.debug("Mocking metric pipelineCount, MetricName: {}, Delta: {}", metricName, delta);
120         }
121
122         @Override
123         public void pipelineGauge(String metricName, long value) {
124             LOG.debug("Mocking metric guage, pipelineGauge: {}, Value: {}", metricName, value);
125         }
126     }
127
128     protected static TestDMaaPMRSourcePluginConfig getTestDMaaPMRSourcePluginConfig() {
129         final TestDMaaPMRSourcePluginConfig sourcePluginConfig = new TestDMaaPMRSourcePluginConfig();
130         sourcePluginConfig.setReferenceName(DMAAP_MR_SOURCE_PLUGIN_REFERENCE_NAME);
131         sourcePluginConfig.setHostName(DMAAP_MR_SOURCE_PLUGIN_HOST_NAME);
132         sourcePluginConfig.setPortNumber(DMAAP_MR_SOURCE_PLUGIN_PORT_NUMBER);
133         sourcePluginConfig.setTopicName(DMAAP_MR_SOURCE_PLUGIN_TOPIC_NAME);
134         sourcePluginConfig.setPollingInterval(DMAAP_MR_SOURCE_PLUGIN_POLLING_INTERVAL);
135         sourcePluginConfig.setProtocol(DMAAP_MR_SOURCE_PLUGIN_PROTOCOL);
136         sourcePluginConfig.setUserName(DMAAP_MR_SOURCE_PLUGIN_USERNAME);
137         sourcePluginConfig.setUserPassword(DMAAP_MR_SOURCE_PLUGIN_PASSWORD);
138         sourcePluginConfig.setContentType(DMAAP_MR_SOURCE_PLUGIN_CONTENT_TYPE);
139         sourcePluginConfig.setConsumerGroup(DMAAP_MR_SOURCE_PLUGIN_CONSUMER_GROUP);
140         sourcePluginConfig.setConsumerId(DMAAP_MR_SOURCE_PLUGIN_CONSUMER_ID);
141         sourcePluginConfig.setMessageLimit(DMAAP_MR_SOURCE_PLUGIN_MESSAGE_LIMIT);
142         sourcePluginConfig.setTimeoutMS(DMAAP_MR_SOURCE_PLUGIN_TIMEOUT);
143         return sourcePluginConfig;
144     }
145
146     protected static TestDMaaPMRSinkPluginConfig getTestDMaaPMRSinkPluginConfig() {
147         final TestDMaaPMRSinkPluginConfig sinkPluginConfig = new TestDMaaPMRSinkPluginConfig();
148         sinkPluginConfig.setReferenceName(DMAAP_MR_SINK_PLUGIN_REFERENCE_NAME);
149         sinkPluginConfig.setHostName(DMAAP_MR_SINK_PLUGIN_HOST_NAME);
150         sinkPluginConfig.setPortNumber(DMAAP_MR_SINK_PLUGIN_PORT_NUMBER);
151         sinkPluginConfig.setTopicName(DMAAP_MR_SINK_PLUGIN_TOPIC_NAME);
152         sinkPluginConfig.setProtocol(DMAAP_MR_SINK_PLUGIN_PROTOCOL);
153         sinkPluginConfig.setUserName(DMAAP_MR_SINK_PLUGIN_USERNAME);
154         sinkPluginConfig.setUserPassword(DMAAP_MR_SINK_PLUGIN_PASSWORD);
155         sinkPluginConfig.setContentType(DMAAP_MR_SINK_PLUGIN_CONTENT_TYPE);
156         sinkPluginConfig.setMessageColumnName(DMAAP_MR_SINK_MESSAGE_COLUMN_NAME);
157         sinkPluginConfig.setMaxBatchSize(DMAAP_MR_SINK_PLUGIN_MAX_BATCH_SIZE);
158         sinkPluginConfig.setMaxRecoveryQueueSize(DMAAP_MR_SINK_PLUGIN_MAX_RECOVERY_QUEUE_SIZE);
159         return sinkPluginConfig;
160     }
161
162
163     protected static Configuration getTestConfiguration() {
164         final Configuration configuration = new Configuration();
165         final Map<String, String> sinkConfigurationMap = createSinkConfigurationMap();
166         for (Map.Entry<String, String> property : sinkConfigurationMap.entrySet()) {
167             configuration.set(property.getKey(), property.getValue());
168         }
169         return configuration;
170     }
171
172     protected static Map<String, String> createSinkConfigurationMap() {
173
174         Map<String, String> sinkConfig = new LinkedHashMap<>();
175         sinkConfig.put(CDAPPluginConstants.DMaaPMRSinkHadoopConfigFields.HOST_NAME, DMAAP_MR_SINK_PLUGIN_HOST_NAME);
176         sinkConfig.put(CDAPPluginConstants.DMaaPMRSinkHadoopConfigFields.TOPIC_NAME, DMAAP_MR_SINK_PLUGIN_TOPIC_NAME);
177         sinkConfig.put(CDAPPluginConstants.DMaaPMRSinkHadoopConfigFields.PORT_NUMBER,
178                 DMAAP_MR_SINK_PLUGIN_PORT_NUMBER.toString());
179         sinkConfig.put(CDAPPluginConstants.DMaaPMRSinkHadoopConfigFields.PROTOCOL, DMAAP_MR_SINK_PLUGIN_PROTOCOL);
180         sinkConfig.put(CDAPPluginConstants.DMaaPMRSinkHadoopConfigFields.USER_NAME, DMAAP_MR_SINK_PLUGIN_USERNAME);
181         sinkConfig.put(CDAPPluginConstants.DMaaPMRSinkHadoopConfigFields.USER_PASS, DMAAP_MR_SINK_PLUGIN_PASSWORD);
182         sinkConfig.put(CDAPPluginConstants.DMaaPMRSinkHadoopConfigFields.CONTENT_TYPE,
183                 DMAAP_MR_SINK_PLUGIN_CONTENT_TYPE);
184         sinkConfig.put(CDAPPluginConstants.DMaaPMRSinkHadoopConfigFields.MAX_BATCH_SIZE,
185                 DMAAP_MR_SINK_PLUGIN_MAX_BATCH_SIZE.toString());
186         sinkConfig.put(CDAPPluginConstants.DMaaPMRSinkHadoopConfigFields.MAX_RECOVER_QUEUE_SIZE,
187                 DMAAP_MR_SINK_PLUGIN_MAX_RECOVERY_QUEUE_SIZE.toString());
188         return sinkConfig;
189     }
190
191     protected static Schema getDMaaPMRSinkTestSchema() {
192         return Schema.recordOf(
193                 "DMaaPMRSinkTestSchema",
194                 Schema.Field.of("message", Schema.of(Schema.Type.STRING)),
195                 Schema.Field.of("field1", Schema.of(Schema.Type.STRING))
196         );
197     }
198
199
200     protected static TestSimpleTCAPluginConfig getTestSimpleTCAPluginConfig() {
201         final String policyJson;
202         try {
203             policyJson = fromStream(TCA_POLICY_JSON_FILE_LOCATION);
204         } catch (IOException e) {
205             throw new RuntimeException("Error while parsing policy", e);
206         }
207         return new TestSimpleTCAPluginConfig(VES_MESSAGE_FIELD_NAME, policyJson, TCA_PLUGIN_ALERT_FIELD_NAME,
208                 TCA_PLUGIN_MESSAGE_TYPE_FIELD_NAME, getSimpleTCAPluginInputSchema().toString(), false);
209     }
210
211     protected static Schema getSimpleTCAPluginInputSchema() {
212         return Schema.recordOf(
213                 "TestSimpleTCAPluginInputSchema",
214                 Schema.Field.of("message", Schema.of(Schema.Type.STRING)),
215                 Schema.Field.of("inputField1", Schema.nullableOf(Schema.of(Schema.Type.STRING))),
216                 Schema.Field.of("inputField2", Schema.nullableOf(Schema.of(Schema.Type.STRING)))
217         );
218     }
219
220     protected static Schema getJsonFilterPluginInputSchema() {
221         return Schema.recordOf(
222                 "TestJsonFilterInputSchema",
223                 Schema.Field.of("ts", Schema.of(Schema.Type.LONG)),
224                 Schema.Field.of("responseCode", Schema.of(Schema.Type.INT)),
225                 Schema.Field.of("responseMessage", Schema.of(Schema.Type.STRING)),
226                 Schema.Field.of("message", Schema.of(Schema.Type.STRING))
227         );
228     }
229
230     protected static TestJsonPathFilterPluginConfig getJsonPathFilterPluginConfig() {
231         return new TestJsonPathFilterPluginConfig(JSON_PATH_FILTER_PLUGIN_REFERENCE_NAME,
232                 JSON_PATH_FILTER_PLUGIN_INCOMING_JSON_FIELD_NAME,
233                 JSON_PATH_FILTER_PLUGIN_OUTPUT_SCHEMA_FILED_NAME,
234                 JSON_PATH_FILTER_PLUGIN_JSON_FILTER_MAPPINGS,
235                 JSON_PATH_FILTER_PLUGIN_JSON_FILTER_OUTPUT_SCHEMA);
236     }
237
238 }