Initial TCA commit into DCAEGEN2
[dcaegen2/analytics/tca.git] / dcae-analytics-it / src / test / java / org / openecomp / dcae / apod / analytics / it / plugins / DMaaPMRSourcePluginIT.java
1 /*
2  * ===============================LICENSE_START======================================
3  *  dcae-analytics
4  * ================================================================================
5  *    Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *   You may obtain a copy of the License at
10  *
11  *          http://www.apache.org/licenses/LICENSE-2.0
12  *
13  *  Unless required by applicable law or agreed to in writing, software
14  *  distributed under the License is distributed on an "AS IS" BASIS,
15  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  *  See the License for the specific language governing permissions and
17  *  limitations under the License.
18  *  ============================LICENSE_END===========================================
19  */
20
21 package org.openecomp.dcae.apod.analytics.it.plugins;
22
23 import co.cask.cdap.api.data.format.StructuredRecord;
24 import co.cask.cdap.api.dataset.table.Table;
25 import co.cask.cdap.api.plugin.PluginClass;
26 import co.cask.cdap.api.plugin.PluginPropertyField;
27 import co.cask.cdap.common.utils.Tasks;
28 import co.cask.cdap.datastreams.DataStreamsApp;
29 import co.cask.cdap.datastreams.DataStreamsSparkLauncher;
30 import co.cask.cdap.etl.api.streaming.StreamingSource;
31 import co.cask.cdap.etl.mock.batch.MockSink;
32 import co.cask.cdap.etl.proto.v2.DataStreamsConfig;
33 import co.cask.cdap.etl.proto.v2.ETLPlugin;
34 import co.cask.cdap.etl.proto.v2.ETLStage;
35 import co.cask.cdap.proto.artifact.AppRequest;
36 import co.cask.cdap.proto.artifact.ArtifactSummary;
37 import co.cask.cdap.proto.id.ApplicationId;
38 import co.cask.cdap.proto.id.ArtifactId;
39 import co.cask.cdap.proto.id.NamespaceId;
40 import co.cask.cdap.test.ApplicationManager;
41 import co.cask.cdap.test.DataSetManager;
42 import co.cask.cdap.test.SparkManager;
43 import com.google.common.base.Joiner;
44 import com.google.common.collect.ImmutableSet;
45 import org.junit.AfterClass;
46 import org.junit.Assert;
47 import org.junit.BeforeClass;
48 import org.junit.Test;
49 import org.openecomp.dcae.apod.analytics.cdap.plugins.domain.config.dmaap.DMaaPMRSourcePluginConfig;
50 import org.openecomp.dcae.apod.analytics.cdap.plugins.streaming.dmaap.DMaaPMRReceiver;
51 import org.openecomp.dcae.apod.analytics.cdap.plugins.streaming.dmaap.DMaaPMRSource;
52 import org.openecomp.dcae.apod.analytics.it.dmaap.DMaaPMRCreator;
53 import org.openecomp.dcae.apod.analytics.it.module.AnalyticsITInjectorSource;
54 import org.slf4j.Logger;
55 import org.slf4j.LoggerFactory;
56
57 import java.util.HashMap;
58 import java.util.LinkedList;
59 import java.util.List;
60 import java.util.Map;
61 import java.util.UUID;
62 import java.util.concurrent.Callable;
63 import java.util.concurrent.TimeUnit;
64
65 /**
66  * Performs integration testing on the DMaaP MR source plugin, where 2 sample messages are posted and verified.
67  * <p/>
68  * @author Manjesh Gowda. Creation Date: 2/3/2017.
69  */
70 public class DMaaPMRSourcePluginIT extends BaseAnalyticsPluginsIT {
71
72     private static final Logger LOG = LoggerFactory.getLogger(DMaaPMRSourcePluginIT.class);
73     protected static final ArtifactId DATASTREAMS_ARTIFACT_ID = NamespaceId.DEFAULT.artifact("data-streams", "3.2.0");
74     protected static final ArtifactSummary DATASTREAMS_ARTIFACT = new ArtifactSummary("data-streams", "3.2.0");
75
76     /**
77      * Streaming artifacts are added to the hydrator pipeline. Important. Make sure you explicitly add all the custom
78      * class that you have written in the plugin artifact, if not you will get incompatible type error
79      *
80      * @throws Exception
81      */
82     @BeforeClass
83     public static void setupTest() throws Exception {
84         setupStreamingArtifacts(DATASTREAMS_ARTIFACT_ID, DataStreamsApp.class);
85
86 //        Set<ArtifactRange> parents = ImmutableSet.of(
87 //                new ArtifactRange(NamespaceId.DEFAULT.toId(), DATASTREAMS_ARTIFACT_ID.getArtifact(),
88 //                        new ArtifactVersion(DATASTREAMS_ARTIFACT_ID.getVersion()), true,
89 //                        new ArtifactVersion(DATASTREAMS_ARTIFACT_ID.getVersion()), true)
90 //        );
91
92         ArtifactId dcaeAnalyticsCdapPluginsArtifact = NamespaceId.DEFAULT.artifact(
93                 "dcae-analytics-cdap-plugins", "2.0-SNAPSHOT");
94         addPluginArtifact(dcaeAnalyticsCdapPluginsArtifact, DATASTREAMS_ARTIFACT_ID, ImmutableSet.of(getPluginClass()),
95                 DMaaPMRSource.class, DMaaPMRSourcePluginConfig.class, DMaaPMRReceiver.class);
96
97 //        addPluginArtifact(NamespaceId.DEFAULT.artifact("spark-plugins", "1.0.0"), parents,
98 //                DMaaPMRSource.class, DMaaPMRReceiver.class, DMaaPMRSourcePluginConfig.class);
99     }
100
101     private static PluginClass getPluginClass() {
102         HashMap<String, PluginPropertyField> properties = new HashMap<>();
103         properties.put("referenceName", new PluginPropertyField("referenceName", "", "string", false, false));
104         properties.put("hostName", new PluginPropertyField("hostName", "", "string", false, false));
105         properties.put("topicName", new PluginPropertyField("topicName", "", "string", false, false));
106         properties.put("protocol", new PluginPropertyField("protocol", "", "string", false, false));
107         properties.put("userName", new PluginPropertyField("userName", "", "string", false, false));
108         properties.put("userPassword", new PluginPropertyField("userPassword", "", "string", false, false));
109         properties.put("contentType", new PluginPropertyField("contentType", "", "string", false, false));
110         properties.put("consumerId", new PluginPropertyField("consumerId", "", "string", false, false));
111         properties.put("consumerGroup", new PluginPropertyField("consumerGroup", "", "string", false, false));
112
113         properties.put("portNumber", new PluginPropertyField("portNumber", "", "long", false, false));
114         properties.put("timeoutMS", new PluginPropertyField("timeoutMS", "", "long", false, false));
115         properties.put("messageLimit", new PluginPropertyField("messageLimit", "", "long", false, false));
116         properties.put("pollingInterval", new PluginPropertyField("pollingInterval", "", "long", false, false));
117
118         return new PluginClass("streamingsource", "DMaaPMRSource", "", DMaaPMRSource.class.getName(),
119                 "pluginConfig", properties);
120     }
121
122     @AfterClass
123     public static void cleanup() {
124     }
125
126     /**
127      * Build a pipeline with a mock-sink. After that publish coupe of messages to the subscriber topic, and verify in
128      * the mock sink
129      *
130      * @throws Exception
131      */
132     @Test
133     public void testDMaaPMRSourcePlugin() throws Exception {
134         AnalyticsITInjectorSource analyticsITInjectorSource = new AnalyticsITInjectorSource();
135
136         final DMaaPMRCreator dMaaPMRCreator = analyticsITInjectorSource.getInjector().getInstance(DMaaPMRCreator.class);
137         Map<String, String> dmaapSourceProperties = dMaaPMRCreator.getDMaaPMRSubscriberConfig();
138         dmaapSourceProperties.put("consumerId", UUID.randomUUID().toString().replace("-", ""));
139         dmaapSourceProperties.put("consumerGroup", UUID.randomUUID().toString().replace("-", ""));
140         final String subscriberTopicName = dmaapSourceProperties.get("topicName");
141
142         DataStreamsConfig dmaaPMRSourcePipeline = DataStreamsConfig.builder()
143                 .addStage(new ETLStage("source", new ETLPlugin(
144                         "DMaaPMRSource", StreamingSource.PLUGIN_TYPE, dmaapSourceProperties, null)))
145                 .addStage(new ETLStage("sink", MockSink.getPlugin("dmaapOutput")))
146                 .addConnection("source", "sink")
147                 .setBatchInterval("20s")
148                 .build();
149
150         AppRequest<DataStreamsConfig> appRequest = new AppRequest<>(DATASTREAMS_ARTIFACT, dmaaPMRSourcePipeline);
151         ApplicationId appId = NamespaceId.DEFAULT.app("DMaaPMRSourceIntegrationTestingApp");
152         ApplicationManager appManager = deployApplication(appId.toId(), appRequest);
153
154         SparkManager sparkManager = appManager.getSparkManager(DataStreamsSparkLauncher.NAME);
155         sparkManager.start();
156         sparkManager.waitForStatus(true, 1, 100);
157
158         final DataSetManager<Table> outputManager = getDataset("dmaapOutput");
159         final List<String> dmaapContents = new LinkedList<>();
160
161         // Publish message to source
162
163         new Thread() {
164             @Override
165             public void run() {
166                 try {
167                     TimeUnit.MILLISECONDS.sleep(30000);
168                     dMaaPMRCreator.getDMaaPMRPublisherWithTopicName(subscriberTopicName).publish(getTwoSampleMessage());
169                 } catch (InterruptedException e) {
170                     e.printStackTrace();
171                 }
172             }
173         }.start();
174
175         try {
176             Tasks.waitFor(true, new Callable<Boolean>() {
177                         boolean initialized = false;
178
179                         @Override
180                         public Boolean call() throws Exception {
181                             try {
182                                 outputManager.flush();
183                                 for (StructuredRecord record : MockSink.readOutput(outputManager)) {
184                                     dmaapContents.add((String) record.get("message"));
185                                 }
186                                 return dmaapContents.size() >= 2;
187                             } catch (Exception e) {
188                                 e.printStackTrace();
189                                 return false;
190                             }
191                         }
192                     },
193                     90, TimeUnit.SECONDS);
194         } catch (Exception e) {
195             e.printStackTrace();
196         }
197         sparkManager.stop();
198
199         Assert.assertTrue(dmaapContents.size() == 2);
200         String allMessages = Joiner.on(",").join(dmaapContents);
201         Assert.assertTrue(allMessages.contains("Message 1"));
202         Assert.assertTrue(allMessages.contains("Message 2"));
203     }
204 }