1 package org.openecomp.sdc.be.components.distribution.engine;
3 import java.util.concurrent.ExecutorService;
4 import java.util.stream.IntStream;
6 import org.junit.After;
8 import org.junit.runner.RunWith;
9 import org.openecomp.sdc.be.config.ConfigurationManager;
10 import org.openecomp.sdc.be.config.DmaapConsumerConfiguration;
11 import org.openecomp.sdc.common.api.ConfigurationSource;
12 import org.openecomp.sdc.common.impl.ExternalConfiguration;
13 import org.openecomp.sdc.common.impl.FSConfigurationSource;
14 import org.springframework.beans.factory.annotation.Autowired;
15 import org.springframework.test.context.ContextConfiguration;
16 import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
18 import com.att.nsa.mr.client.MRConsumer;
19 import com.google.gson.GsonBuilder;
// Manual developer test that consumes messages from a DMAAP topic and prints them to stdout.
// Runs under the Spring JUnit4 runner with the context loaded from application-context-test.xml.
// NOTE(review): this listing appears to be missing lines (annotations, braces, try/catch, the
// definition of LIMIT) — confirm against the complete file before changing any code here.
21 @RunWith(SpringJUnit4ClassRunner.class)
22 @ContextConfiguration("classpath:application-context-test.xml")
23 public class Dev2DevDmaapConsumerTest {
// Factory used below to create the executor that is stored in notificationExecutor.
// NOTE(review): presumably injected by Spring (@Autowired is imported) — annotation not visible here.
25 private ExecutorFactory executorFactory;
// Factory used below to create the MRConsumer from the DMAAP consumer configuration.
// NOTE(review): presumably injected by Spring as well — annotation not visible here.
27 private DmaapClientFactory dmaapClientFactory;
// Executor created in consumeDmaapTopic() and shut down by the cleanup code at the end of the class.
29 static ExecutorService notificationExecutor;
// Configuration is read from the test resources directory of the catalog-be module.
31 static ConfigurationSource configurationSource = new FSConfigurationSource(ExternalConfiguration.getChangeListener(), "src/test/resources/config/catalog-be");
// Configuration manager built on the file-system source above; queried for the DMAAP consumer settings.
32 static ConfigurationManager configurationManager = new ConfigurationManager(configurationSource);
// Entry point guarded by a hard-coded switch that is off by default.
// NOTE(review): the branching around the flag is not visible in this listing; presumably the
// consumer is run only when the flag is true and the "disabled" message is printed otherwise.
35 public void runConsumer() throws Exception{
36 boolean isRunConsumer = false ; //change this to true if you wish to run consumer,default should be false
40 System.out.println( "CONSUMER TEST is disabled!!!! ");
// Reads the DMAAP consumer configuration, creates a consumer and an executor for the configured
// topic, then fetches from the topic repeatedly and prints every message received.
46 public void consumeDmaapTopic() throws Exception {
// Handler passed to the executor factory below; it reports uncaught exceptions on stdout.
47 Thread.UncaughtExceptionHandler handler = new Thread.UncaughtExceptionHandler() {
49 public void uncaughtException(Thread t, Throwable e) {
50 System.out.println("uncaughtException -> ");
// Consumer parameters, including the topic name, come from the loaded configuration.
54 DmaapConsumerConfiguration dmaapConsumerParams = configurationManager.getConfiguration().getDmaapConsumerConfiguration();
55 String topic = dmaapConsumerParams.getTopic();
56 System.out.println(String.format( "Starting to consume topic %s for DMAAP consumer with the next parameters %s. ", topic, dmaapConsumerParams) );
57 MRConsumer consumer = dmaapClientFactory.create( dmaapConsumerParams );
// The executor is named after the topic and stored in the static field so the cleanup code can stop it.
58 notificationExecutor = executorFactory.create(topic + "Consumer", handler);
// One fetch attempt per index in [0, LIMIT).
// NOTE(review): LIMIT is not defined in the visible lines — confirm its value and where it is declared.
60 IntStream.range(0,LIMIT).forEach( i -> {
61 System.out.println("Trying to fetch messages from topic: "+ topic);
// A null result from fetch() is tolerated; otherwise every message is printed together with the topic.
63 Iterable<String> messages = consumer.fetch();
64 if (messages != null) {
65 for (String msg : messages) {
66 System.out.println(String.format( "The DMAAP message %s received. The topic is %s.", msg, topic) );
// NOTE(review): presumably inside a catch block around the fetch (the try/catch lines are not
// visible here); the exception is only printed.
72 System.out.println("The exception occured upon fetching DMAAP message "+ e);
// Parses a message as JSON into DmaapNotificationDataImpl using Gson and prints the
// operational environment id of the parsed notification.
79 private void handleMessage(String msg){
81 DmaapNotificationDataImpl notificationData = new GsonBuilder().create().fromJson(msg,DmaapNotificationDataImpl.class);
82 System.out.println( "successfully parsed notification for environemnt "+notificationData.getOperationalEnvironmentId());
// NOTE(review): presumably the failure path of a try/catch around the parsing — not visible here.
84 System.out.println( "failed to parse notification");
// Cleanup: requests shutdown of the executor if one was created and it has not already terminated.
// NOTE(review): the enclosing method header is not visible; presumably an @After method
// (org.junit.After is imported) — confirm.
89 if (notificationExecutor!=null && !notificationExecutor.isTerminated())
90 notificationExecutor.shutdown();