Sync Integ to Master
[sdc.git] / catalog-be / src / test / java / org / openecomp / sdc / be / components / distribution / engine / Dev2DevDmaapConsumerTest.java
1 package org.openecomp.sdc.be.components.distribution.engine;
2
3 import com.att.nsa.mr.client.MRConsumer;
4 import com.google.gson.GsonBuilder;
5 import org.junit.After;
6 import org.junit.Test;
7 import org.junit.runner.RunWith;
8 import org.openecomp.sdc.be.config.ConfigurationManager;
9 import org.openecomp.sdc.be.config.DmaapConsumerConfiguration;
10 import org.openecomp.sdc.common.api.ConfigurationSource;
11 import org.openecomp.sdc.common.impl.ExternalConfiguration;
12 import org.openecomp.sdc.common.impl.FSConfigurationSource;
13 import org.springframework.beans.factory.annotation.Autowired;
14 import org.springframework.test.context.ContextConfiguration;
15 import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
16
17 import java.util.concurrent.ExecutorService;
18 import java.util.stream.IntStream;
19
20 @RunWith(SpringJUnit4ClassRunner.class)
21 @ContextConfiguration("classpath:application-context-test.xml")
22 public class Dev2DevDmaapConsumerTest {
23     @Autowired
24     private ExecutorFactory executorFactory;
25     @Autowired
26     private DmaapClientFactory dmaapClientFactory;
27
28     static ExecutorService notificationExecutor;
29
30     static ConfigurationSource configurationSource = new FSConfigurationSource(ExternalConfiguration.getChangeListener(), "src/test/resources/config/catalog-be");
31     static ConfigurationManager configurationManager = new ConfigurationManager(configurationSource);
32
33     @Test
34     public void runConsumer() throws Exception{
35         boolean isRunConsumer = false ;  //change this to true if you wish to run consumer,default should be false
36         if ( isRunConsumer ){
37             consumeDmaapTopic();
38         }else{
39             System.out.println( "CONSUMER TEST is disabled!!!! ");
40         }
41         assert true;
42     }
43     //@Ignore
44     //@Test
45     public void consumeDmaapTopic() throws Exception {
46         Thread.UncaughtExceptionHandler handler = new Thread.UncaughtExceptionHandler() {
47             @Override
48             public void uncaughtException(Thread t, Throwable e) {
49                 System.out.println("uncaughtException -> ");
50             }
51         };
52
53         DmaapConsumerConfiguration dmaapConsumerParams = configurationManager.getConfiguration().getDmaapConsumerConfiguration();
54         String topic = dmaapConsumerParams.getTopic();
55         System.out.println(String.format( "Starting to consume topic %s for DMAAP consumer with the next parameters %s. ", topic, dmaapConsumerParams) );
56         MRConsumer consumer = dmaapClientFactory.create( dmaapConsumerParams );
57         notificationExecutor = executorFactory.create(topic + "Consumer", handler);
58         final int LIMIT = 2;
59         IntStream.range(0,LIMIT).forEach( i -> {
60             System.out.println("Trying to fetch messages from topic: "+ topic);
61             try {
62                 Iterable<String> messages = consumer.fetch();
63                 if (messages != null) {
64                     for (String msg : messages) {
65                         System.out.println(String.format( "The DMAAP message %s received. The topic is %s.", msg, topic) );
66                         handleMessage(msg);
67                     }
68                 }
69             }
70             catch (Exception e) {
71                 System.out.println("The exception occured upon fetching DMAAP message "+ e);
72             }
73         }
74         );
75
76
77     }
78     private void handleMessage(String msg){
79         try{
80             DmaapNotificationDataImpl notificationData = new GsonBuilder().create().fromJson(msg,DmaapNotificationDataImpl.class);
81             System.out.println( "successfully parsed notification for environemnt "+notificationData.getOperationalEnvironmentId());
82         }catch (Exception e){
83             System.out.println( "failed to parse notification");
84         }
85     }
86     @After
87     public void after(){
88         if (notificationExecutor!=null && !notificationExecutor.isTerminated())
89             notificationExecutor.shutdown();
90     }
91 }