1 package org.openecomp.sdc.be.components.distribution.engine;
3 import com.att.nsa.mr.client.MRConsumer;
4 import com.google.gson.GsonBuilder;
5 import org.junit.After;
7 import org.junit.runner.RunWith;
8 import org.openecomp.sdc.be.config.ConfigurationManager;
9 import org.openecomp.sdc.be.config.DmaapConsumerConfiguration;
10 import org.openecomp.sdc.common.api.ConfigurationSource;
11 import org.openecomp.sdc.common.impl.ExternalConfiguration;
12 import org.openecomp.sdc.common.impl.FSConfigurationSource;
13 import org.springframework.beans.factory.annotation.Autowired;
14 import org.springframework.test.context.ContextConfiguration;
15 import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
17 import java.util.concurrent.ExecutorService;
18 import java.util.stream.IntStream;
20 @RunWith(SpringJUnit4ClassRunner.class)
21 @ContextConfiguration("classpath:application-context-test.xml")
22 public class Dev2DevDmaapConsumerTest {
24 private ExecutorFactory executorFactory;
26 private DmaapClientFactory dmaapClientFactory;
28 static ExecutorService notificationExecutor;
30 static ConfigurationSource configurationSource = new FSConfigurationSource(ExternalConfiguration.getChangeListener(), "src/test/resources/config/catalog-be");
31 static ConfigurationManager configurationManager = new ConfigurationManager(configurationSource);
34 public void runConsumer() throws Exception{
35 boolean isRunConsumer = false ; //change this to true if you wish to run consumer,default should be false
39 System.out.println( "CONSUMER TEST is disabled!!!! ");
45 public void consumeDmaapTopic() throws Exception {
46 Thread.UncaughtExceptionHandler handler = new Thread.UncaughtExceptionHandler() {
48 public void uncaughtException(Thread t, Throwable e) {
49 System.out.println("uncaughtException -> ");
53 DmaapConsumerConfiguration dmaapConsumerParams = configurationManager.getConfiguration().getDmaapConsumerConfiguration();
54 String topic = dmaapConsumerParams.getTopic();
55 System.out.println(String.format( "Starting to consume topic %s for DMAAP consumer with the next parameters %s. ", topic, dmaapConsumerParams) );
56 MRConsumer consumer = dmaapClientFactory.create( dmaapConsumerParams );
57 notificationExecutor = executorFactory.create(topic + "Consumer", handler);
59 IntStream.range(0,LIMIT).forEach( i -> {
60 System.out.println("Trying to fetch messages from topic: "+ topic);
62 Iterable<String> messages = consumer.fetch();
63 if (messages != null) {
64 for (String msg : messages) {
65 System.out.println(String.format( "The DMAAP message %s received. The topic is %s.", msg, topic) );
71 System.out.println("The exception occured upon fetching DMAAP message "+ e);
78 private void handleMessage(String msg){
80 DmaapNotificationDataImpl notificationData = new GsonBuilder().create().fromJson(msg,DmaapNotificationDataImpl.class);
81 System.out.println( "successfully parsed notification for environemnt "+notificationData.getOperationalEnvironmentId());
83 System.out.println( "failed to parse notification");
88 if (notificationExecutor!=null && !notificationExecutor.isTerminated())
89 notificationExecutor.shutdown();