2 * ===============================LICENSE_START======================================
\r
4 * ================================================================================
\r
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
\r
6 * ================================================================================
\r
7 * Licensed under the Apache License, Version 2.0 (the "License");
\r
8 * you may not use this file except in compliance with the License.
\r
9 * You may obtain a copy of the License at
\r
11 * http://www.apache.org/licenses/LICENSE-2.0
\r
13 * Unless required by applicable law or agreed to in writing, software
\r
14 * distributed under the License is distributed on an "AS IS" BASIS,
\r
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
\r
16 * See the License for the specific language governing permissions and
\r
17 * limitations under the License.
\r
18 * ============================LICENSE_END===========================================
\r
21 package org.openecomp.dcae.apod.analytics.cdap.plugins;
\r
23 import co.cask.cdap.api.data.schema.Schema;
\r
24 import co.cask.cdap.etl.api.StageMetrics;
\r
25 import com.fasterxml.jackson.databind.ObjectMapper;
\r
26 import com.google.common.base.Suppliers;
\r
27 import org.apache.hadoop.conf.Configuration;
\r
28 import org.openecomp.dcae.apod.analytics.cdap.common.CDAPPluginConstants;
\r
29 import org.openecomp.dcae.apod.analytics.cdap.plugins.domain.config.dmaap.TestDMaaPMRSinkPluginConfig;
\r
30 import org.openecomp.dcae.apod.analytics.cdap.plugins.domain.config.dmaap.TestDMaaPMRSourcePluginConfig;
\r
31 import org.openecomp.dcae.apod.analytics.cdap.plugins.domain.config.filter.TestJsonPathFilterPluginConfig;
\r
32 import org.openecomp.dcae.apod.analytics.cdap.plugins.domain.config.tca.TestSimpleTCAPluginConfig;
\r
33 import org.openecomp.dcae.apod.analytics.model.util.json.AnalyticsModelObjectMapperSupplier;
\r
34 import org.openecomp.dcae.apod.analytics.test.BaseDCAEAnalyticsUnitTest;
\r
36 import java.io.IOException;
\r
37 import java.io.Serializable;
\r
38 import java.util.LinkedHashMap;
\r
39 import java.util.Map;
\r
42 * @author Rajiv Singla . Creation Date: 1/23/2017.
\r
44 public abstract class BaseAnalyticsCDAPPluginsUnitTest extends BaseDCAEAnalyticsUnitTest {
\r
46 protected static final ObjectMapper ANALYTICS_MODEL_OBJECT_MAPPER =
\r
47 Suppliers.memoize(new AnalyticsModelObjectMapperSupplier()).get();
\r
49 protected static final String TCA_POLICY_JSON_FILE_LOCATION = "data/json/policy/tca_policy.json";
\r
50 protected static final String CEF_MESSAGE_JSON_FILE_LOCATION = "data/json/cef/cef_message.json";
\r
51 protected static final String CEF_NON_COMPLIANT_MESSAGE_JSON_FILE_LOCATION =
\r
52 "data/json/cef/cef_message_with_threshold_violation.json";
\r
55 protected static final String DMAAP_MR_SOURCE_PLUGIN_REFERENCE_NAME = "testDMaaPMRSource";
\r
56 protected static final String DMAAP_MR_SOURCE_PLUGIN_HOST_NAME = "dcae-msrt-mtl1-ftl.homer.com";
\r
57 protected static final Integer DMAAP_MR_SOURCE_PLUGIN_PORT_NUMBER = 3905;
\r
58 protected static final String DMAAP_MR_SOURCE_PLUGIN_TOPIC_NAME = "com.dcae.dmaap.FTL.DcaeTestVESSub";
\r
59 protected static final Integer DMAAP_MR_SOURCE_PLUGIN_POLLING_INTERVAL = 1000;
\r
60 protected static final String DMAAP_MR_SOURCE_PLUGIN_PROTOCOL = "https";
\r
61 protected static final String DMAAP_MR_SOURCE_PLUGIN_USERNAME = "username";
\r
62 protected static final String DMAAP_MR_SOURCE_PLUGIN_PASSWORD = "password";
\r
63 protected static final String DMAAP_MR_SOURCE_PLUGIN_CONTENT_TYPE = "application/json";
\r
64 protected static final String DMAAP_MR_SOURCE_PLUGIN_CONSUMER_GROUP = "G1";
\r
65 protected static final String DMAAP_MR_SOURCE_PLUGIN_CONSUMER_ID = "C1";
\r
66 protected static final Integer DMAAP_MR_SOURCE_PLUGIN_MESSAGE_LIMIT = 100;
\r
67 protected static final Integer DMAAP_MR_SOURCE_PLUGIN_TIMEOUT = 10000;
\r
70 protected static final String DMAAP_MR_SINK_PLUGIN_REFERENCE_NAME = "testDMaaPMRSINK";
\r
71 protected static final String DMAAP_MR_SINK_PLUGIN_HOST_NAME = "dcae-msrt-mtl1-ftl.homer.com";
\r
72 protected static final Integer DMAAP_MR_SINK_PLUGIN_PORT_NUMBER = 3905;
\r
73 protected static final String DMAAP_MR_SINK_PLUGIN_TOPIC_NAME = "com.dcae.dmaap.FTL.DcaeTestVESPub";
\r
74 protected static final String DMAAP_MR_SINK_PLUGIN_PROTOCOL = "https";
\r
75 protected static final String DMAAP_MR_SINK_PLUGIN_USERNAME = "username";
\r
76 protected static final String DMAAP_MR_SINK_PLUGIN_PASSWORD = "password";
\r
77 protected static final String DMAAP_MR_SINK_PLUGIN_CONTENT_TYPE = "application/json";
\r
78 protected static final String DMAAP_MR_SINK_MESSAGE_COLUMN_NAME = "message";
\r
79 protected static final Integer DMAAP_MR_SINK_PLUGIN_MAX_BATCH_SIZE = 10;
\r
80 protected static final Integer DMAAP_MR_SINK_PLUGIN_MAX_RECOVERY_QUEUE_SIZE = 100;
\r
82 protected static final String VES_MESSAGE_FIELD_NAME = "message";
\r
83 protected static final String TCA_PLUGIN_ALERT_FIELD_NAME = "alert";
\r
84 protected static final String TCA_PLUGIN_MESSAGE_TYPE_FIELD_NAME = "tcaMessageType";
\r
87 protected static final String JSON_PATH_FILTER_PLUGIN_REFERENCE_NAME = "JsonPathFilter";
\r
88 protected static final String JSON_PATH_FILTER_PLUGIN_INCOMING_JSON_FIELD_NAME = "message";
\r
89 protected static final String JSON_PATH_FILTER_PLUGIN_OUTPUT_SCHEMA_FILED_NAME = "filterMatched";
\r
90 protected static final String JSON_PATH_FILTER_PLUGIN_JSON_FILTER_MAPPINGS =
\r
91 "$.event.commonEventHeader.domain:measurementsForVfScaling," +
\r
92 "$.event.commonEventHeader.eventName:vLoadBalancer;vFirewall";
\r
93 protected static final String JSON_PATH_FILTER_PLUGIN_JSON_FILTER_OUTPUT_SCHEMA =
\r
94 "{\"type\":\"record\"," +
\r
95 "\"name\":\"etlSchemaBody\",\"fields\":" +
\r
97 "{\"name\":\"ts\",\"type\":\"long\"}," +
\r
98 "{\"name\":\"filterMatched\",\"type\":[\"boolean\",\"null\"]}," +
\r
99 "{\"name\":\"responseCode\",\"type\":\"int\"}," +
\r
100 "{\"name\":\"responseMessage\",\"type\":\"string\"}," +
\r
101 "{\"name\":\"message\",\"type\":\"string\"}" +
\r
105 protected static class MockStageMetrics implements StageMetrics, Serializable {
\r
108 public void count(String metricName, int delta) {
\r
109 LOG.debug("Mocking metric count, MetricName: {}, Delta: {}", metricName, delta);
\r
113 public void gauge(String metricName, long value) {
\r
114 LOG.debug("Mocking metric guage, MetricName: {}, Value: {}", metricName, value);
\r
118 public void pipelineCount(String metricName, int delta) {
\r
119 LOG.debug("Mocking metric pipelineCount, MetricName: {}, Delta: {}", metricName, delta);
\r
123 public void pipelineGauge(String metricName, long value) {
\r
124 LOG.debug("Mocking metric guage, pipelineGauge: {}, Value: {}", metricName, value);
\r
128 protected static TestDMaaPMRSourcePluginConfig getTestDMaaPMRSourcePluginConfig() {
\r
129 final TestDMaaPMRSourcePluginConfig sourcePluginConfig = new TestDMaaPMRSourcePluginConfig();
\r
130 sourcePluginConfig.setReferenceName(DMAAP_MR_SOURCE_PLUGIN_REFERENCE_NAME);
\r
131 sourcePluginConfig.setHostName(DMAAP_MR_SOURCE_PLUGIN_HOST_NAME);
\r
132 sourcePluginConfig.setPortNumber(DMAAP_MR_SOURCE_PLUGIN_PORT_NUMBER);
\r
133 sourcePluginConfig.setTopicName(DMAAP_MR_SOURCE_PLUGIN_TOPIC_NAME);
\r
134 sourcePluginConfig.setPollingInterval(DMAAP_MR_SOURCE_PLUGIN_POLLING_INTERVAL);
\r
135 sourcePluginConfig.setProtocol(DMAAP_MR_SOURCE_PLUGIN_PROTOCOL);
\r
136 sourcePluginConfig.setUserName(DMAAP_MR_SOURCE_PLUGIN_USERNAME);
\r
137 sourcePluginConfig.setUserPassword(DMAAP_MR_SOURCE_PLUGIN_PASSWORD);
\r
138 sourcePluginConfig.setContentType(DMAAP_MR_SOURCE_PLUGIN_CONTENT_TYPE);
\r
139 sourcePluginConfig.setConsumerGroup(DMAAP_MR_SOURCE_PLUGIN_CONSUMER_GROUP);
\r
140 sourcePluginConfig.setConsumerId(DMAAP_MR_SOURCE_PLUGIN_CONSUMER_ID);
\r
141 sourcePluginConfig.setMessageLimit(DMAAP_MR_SOURCE_PLUGIN_MESSAGE_LIMIT);
\r
142 sourcePluginConfig.setTimeoutMS(DMAAP_MR_SOURCE_PLUGIN_TIMEOUT);
\r
143 return sourcePluginConfig;
\r
146 protected static TestDMaaPMRSinkPluginConfig getTestDMaaPMRSinkPluginConfig() {
\r
147 final TestDMaaPMRSinkPluginConfig sinkPluginConfig = new TestDMaaPMRSinkPluginConfig();
\r
148 sinkPluginConfig.setReferenceName(DMAAP_MR_SINK_PLUGIN_REFERENCE_NAME);
\r
149 sinkPluginConfig.setHostName(DMAAP_MR_SINK_PLUGIN_HOST_NAME);
\r
150 sinkPluginConfig.setPortNumber(DMAAP_MR_SINK_PLUGIN_PORT_NUMBER);
\r
151 sinkPluginConfig.setTopicName(DMAAP_MR_SINK_PLUGIN_TOPIC_NAME);
\r
152 sinkPluginConfig.setProtocol(DMAAP_MR_SINK_PLUGIN_PROTOCOL);
\r
153 sinkPluginConfig.setUserName(DMAAP_MR_SINK_PLUGIN_USERNAME);
\r
154 sinkPluginConfig.setUserPassword(DMAAP_MR_SINK_PLUGIN_PASSWORD);
\r
155 sinkPluginConfig.setContentType(DMAAP_MR_SINK_PLUGIN_CONTENT_TYPE);
\r
156 sinkPluginConfig.setMessageColumnName(DMAAP_MR_SINK_MESSAGE_COLUMN_NAME);
\r
157 sinkPluginConfig.setMaxBatchSize(DMAAP_MR_SINK_PLUGIN_MAX_BATCH_SIZE);
\r
158 sinkPluginConfig.setMaxRecoveryQueueSize(DMAAP_MR_SINK_PLUGIN_MAX_RECOVERY_QUEUE_SIZE);
\r
159 return sinkPluginConfig;
\r
163 protected static Configuration getTestConfiguration() {
\r
164 final Configuration configuration = new Configuration();
\r
165 final Map<String, String> sinkConfigurationMap = createSinkConfigurationMap();
\r
166 for (Map.Entry<String, String> property : sinkConfigurationMap.entrySet()) {
\r
167 configuration.set(property.getKey(), property.getValue());
\r
169 return configuration;
\r
172 protected static Map<String, String> createSinkConfigurationMap() {
\r
174 Map<String, String> sinkConfig = new LinkedHashMap<>();
\r
175 sinkConfig.put(CDAPPluginConstants.DMaaPMRSinkHadoopConfigFields.HOST_NAME, DMAAP_MR_SINK_PLUGIN_HOST_NAME);
\r
176 sinkConfig.put(CDAPPluginConstants.DMaaPMRSinkHadoopConfigFields.TOPIC_NAME, DMAAP_MR_SINK_PLUGIN_TOPIC_NAME);
\r
177 sinkConfig.put(CDAPPluginConstants.DMaaPMRSinkHadoopConfigFields.PORT_NUMBER,
\r
178 DMAAP_MR_SINK_PLUGIN_PORT_NUMBER.toString());
\r
179 sinkConfig.put(CDAPPluginConstants.DMaaPMRSinkHadoopConfigFields.PROTOCOL, DMAAP_MR_SINK_PLUGIN_PROTOCOL);
\r
180 sinkConfig.put(CDAPPluginConstants.DMaaPMRSinkHadoopConfigFields.USER_NAME, DMAAP_MR_SINK_PLUGIN_USERNAME);
\r
181 sinkConfig.put(CDAPPluginConstants.DMaaPMRSinkHadoopConfigFields.USER_PASS, DMAAP_MR_SINK_PLUGIN_PASSWORD);
\r
182 sinkConfig.put(CDAPPluginConstants.DMaaPMRSinkHadoopConfigFields.CONTENT_TYPE,
\r
183 DMAAP_MR_SINK_PLUGIN_CONTENT_TYPE);
\r
184 sinkConfig.put(CDAPPluginConstants.DMaaPMRSinkHadoopConfigFields.MAX_BATCH_SIZE,
\r
185 DMAAP_MR_SINK_PLUGIN_MAX_BATCH_SIZE.toString());
\r
186 sinkConfig.put(CDAPPluginConstants.DMaaPMRSinkHadoopConfigFields.MAX_RECOVER_QUEUE_SIZE,
\r
187 DMAAP_MR_SINK_PLUGIN_MAX_RECOVERY_QUEUE_SIZE.toString());
\r
191 protected static Schema getDMaaPMRSinkTestSchema() {
\r
192 return Schema.recordOf(
\r
193 "DMaaPMRSinkTestSchema",
\r
194 Schema.Field.of("message", Schema.of(Schema.Type.STRING)),
\r
195 Schema.Field.of("field1", Schema.of(Schema.Type.STRING))
\r
200 protected static TestSimpleTCAPluginConfig getTestSimpleTCAPluginConfig() {
\r
201 final String policyJson;
\r
203 policyJson = fromStream(TCA_POLICY_JSON_FILE_LOCATION);
\r
204 } catch (IOException e) {
\r
205 throw new RuntimeException("Error while parsing policy", e);
\r
207 return new TestSimpleTCAPluginConfig(VES_MESSAGE_FIELD_NAME, policyJson, TCA_PLUGIN_ALERT_FIELD_NAME,
\r
208 TCA_PLUGIN_MESSAGE_TYPE_FIELD_NAME, getSimpleTCAPluginInputSchema().toString(), false);
\r
211 protected static Schema getSimpleTCAPluginInputSchema() {
\r
212 return Schema.recordOf(
\r
213 "TestSimpleTCAPluginInputSchema",
\r
214 Schema.Field.of("message", Schema.of(Schema.Type.STRING)),
\r
215 Schema.Field.of("inputField1", Schema.nullableOf(Schema.of(Schema.Type.STRING))),
\r
216 Schema.Field.of("inputField2", Schema.nullableOf(Schema.of(Schema.Type.STRING)))
\r
220 protected static Schema getJsonFilterPluginInputSchema() {
\r
221 return Schema.recordOf(
\r
222 "TestJsonFilterInputSchema",
\r
223 Schema.Field.of("ts", Schema.of(Schema.Type.LONG)),
\r
224 Schema.Field.of("responseCode", Schema.of(Schema.Type.INT)),
\r
225 Schema.Field.of("responseMessage", Schema.of(Schema.Type.STRING)),
\r
226 Schema.Field.of("message", Schema.of(Schema.Type.STRING))
\r
230 protected static TestJsonPathFilterPluginConfig getJsonPathFilterPluginConfig() {
\r
231 return new TestJsonPathFilterPluginConfig(JSON_PATH_FILTER_PLUGIN_REFERENCE_NAME,
\r
232 JSON_PATH_FILTER_PLUGIN_INCOMING_JSON_FIELD_NAME,
\r
233 JSON_PATH_FILTER_PLUGIN_OUTPUT_SCHEMA_FILED_NAME,
\r
234 JSON_PATH_FILTER_PLUGIN_JSON_FILTER_MAPPINGS,
\r
235 JSON_PATH_FILTER_PLUGIN_JSON_FILTER_OUTPUT_SCHEMA);
\r