Refactoring Consolidation Service
[sdc.git] / catalog-be / src / test / java / org / openecomp / sdc / be / components / distribution / engine / Dev2DevDmaapConsumerTest.java
1 package org.openecomp.sdc.be.components.distribution.engine;
2
3 import java.util.concurrent.ExecutorService;
4 import java.util.stream.IntStream;
5
6 import org.junit.After;
7 import org.junit.Test;
8 import org.junit.runner.RunWith;
9 import org.openecomp.sdc.be.config.ConfigurationManager;
10 import org.openecomp.sdc.be.config.DmaapConsumerConfiguration;
11 import org.openecomp.sdc.common.api.ConfigurationSource;
12 import org.openecomp.sdc.common.impl.ExternalConfiguration;
13 import org.openecomp.sdc.common.impl.FSConfigurationSource;
14 import org.springframework.beans.factory.annotation.Autowired;
15 import org.springframework.test.context.ContextConfiguration;
16 import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
17
18 import com.att.nsa.mr.client.MRConsumer;
19 import com.google.gson.GsonBuilder;
20
21 @RunWith(SpringJUnit4ClassRunner.class)
22 @ContextConfiguration("classpath:application-context-test.xml")
23 public class Dev2DevDmaapConsumerTest {
24     @Autowired
25     private ExecutorFactory executorFactory;
26     @Autowired
27     private DmaapClientFactory dmaapClientFactory;
28
29     static ExecutorService notificationExecutor;
30
31     static ConfigurationSource configurationSource = new FSConfigurationSource(ExternalConfiguration.getChangeListener(), "src/test/resources/config/catalog-be");
32     static ConfigurationManager configurationManager = new ConfigurationManager(configurationSource);
33
34     @Test
35     public void runConsumer() throws Exception{
36         boolean isRunConsumer = false ;  //change this to true if you wish to run consumer,default should be false
37         if ( isRunConsumer ){
38             consumeDmaapTopic();
39         }else{
40             System.out.println( "CONSUMER TEST is disabled!!!! ");
41         }
42         assert true;
43     }
44     //@Ignore
45     //@Test
46     public void consumeDmaapTopic() throws Exception {
47         Thread.UncaughtExceptionHandler handler = new Thread.UncaughtExceptionHandler() {
48             @Override
49             public void uncaughtException(Thread t, Throwable e) {
50                 System.out.println("uncaughtException -> ");
51             }
52         };
53
54         DmaapConsumerConfiguration dmaapConsumerParams = configurationManager.getConfiguration().getDmaapConsumerConfiguration();
55         String topic = dmaapConsumerParams.getTopic();
56         System.out.println(String.format( "Starting to consume topic %s for DMAAP consumer with the next parameters %s. ", topic, dmaapConsumerParams) );
57         MRConsumer consumer = dmaapClientFactory.create( dmaapConsumerParams );
58         notificationExecutor = executorFactory.create(topic + "Consumer", handler);
59         final int LIMIT = 2;
60         IntStream.range(0,LIMIT).forEach( i -> {
61             System.out.println("Trying to fetch messages from topic: "+ topic);
62             try {
63                 Iterable<String> messages = consumer.fetch();
64                 if (messages != null) {
65                     for (String msg : messages) {
66                         System.out.println(String.format( "The DMAAP message %s received. The topic is %s.", msg, topic) );
67                         handleMessage(msg);
68                     }
69                 }
70             }
71             catch (Exception e) {
72                 System.out.println("The exception occured upon fetching DMAAP message "+ e);
73             }
74         }
75         );
76
77
78     }
79     private void handleMessage(String msg){
80         try{
81             DmaapNotificationDataImpl notificationData = new GsonBuilder().create().fromJson(msg,DmaapNotificationDataImpl.class);
82             System.out.println( "successfully parsed notification for environemnt "+notificationData.getOperationalEnvironmentId());
83         }catch (Exception e){
84             System.out.println( "failed to parse notification");
85         }
86     }
87     @After
88     public void after(){
89         if (notificationExecutor!=null && !notificationExecutor.isTerminated())
90             notificationExecutor.shutdown();
91     }
92 }