re base code
[sdc.git] / utils / DmaapPublisher / src / main / java / org / openecomp / sdc / dmaap / DmaapPublishTool.java
1 package org.openecomp.sdc.dmaap;
2
3 import com.att.nsa.mr.client.MRBatchingPublisher;
4 import com.att.nsa.mr.client.MRClientFactory;
5 import com.att.nsa.mr.client.MRPublisher.message;
6 import org.apache.commons.lang3.StringUtils;
7 import org.slf4j.Logger;
8 import org.slf4j.LoggerFactory;
9 import org.yaml.snakeyaml.Yaml;
10
11 import java.util.concurrent.TimeUnit;
12
13 public class DmaapPublishTool {
14
15     private static final Logger logger = LoggerFactory.getLogger(DmaapPublishTool.class);   
16     final private TopicConfig topicConfig;
17
18     public DmaapPublishTool(String yamlPath) throws FileNotFoundException {
19         topicConfig = loadTopicConfig(yamlPath);
20         System.out.println("yaml file loaded.");
21     }
22     public DmaapPublishTool(String yamlPath , String notifications) throws FileNotFoundException {
23         topicConfig = loadTopicConfig(yamlPath);
24         if (StringUtils.isNotBlank(notifications) )
25             topicConfig.add( notifications );
26         System.out.println("yaml file loaded.");
27     }
28
29     public void addNotifications(Collection<String> notification){
30         topicConfig.addAll( notification );
31     }
32
33     //safe stream doesn't throw null pointer exception
34     public <T> Collection<T> safe(Collection<T> obj){
35         return Optional.ofNullable(obj).orElse(Collections.emptySet());
36     }
37     public <T> List<T> safe(List<T> obj){
38         return Optional.ofNullable(obj).orElse(Collections.emptyList());
39     }
40
41     public void publish(String path) throws IOException, InterruptedException {
42         MRBatchingPublisher pub = createPublisher( topicConfig, path );
43         System.out.println( "pending message count -> "+pub.getPendingMessageCount() );
44         List<String> list = this.topicConfig.getIncomingTopicMessages();
45         for(String msg : safe(list) ){
46                 publishOne( pub , msg );
47         }
48         closePublisher(pub);
49     }
50
51     private MRBatchingPublisher createPublisher(TopicConfig topicConfig,String path) throws IOException {
52         MRBatchingPublisher publisher = MRClientFactory.createBatchingPublisher(Objects.requireNonNull(Util.toPath(path,topicConfig.getPublisherPropertiesFilePath())));
53         System.out.println("publisher created.");
54         return publisher;
55     }
56
57     private TopicConfig loadTopicConfig(String yamlPath) throws FileNotFoundException {
58         File yamlFile = new File(Objects.requireNonNull(yamlPath));
59         InputStream input = new FileInputStream(yamlFile);
60         Yaml yamlHelper = new Yaml();
61         return yamlHelper.loadAs(input, TopicConfig.class);
62     }
63
64     private void publishOne(MRBatchingPublisher pub, String msg) throws IOException, InterruptedException {
65         System.out.println("sending:    " + msg);
66         pub.send(msg);
67         System.out.println("message sent.");
68     }
69
70     private void closePublisher(MRBatchingPublisher pub) throws IOException, InterruptedException {
71         System.out.println("closing publisher...");
72         // close the publisher to make sure everything's sent before exiting. The batching
73         // publisher interface allows the app to get the set of unsent messages. It could
74         // write them to disk, for example, to try to send them later.
75         final List<message> stuck = pub.close(20, TimeUnit.SECONDS);
76         if(!stuck.isEmpty())
77         {
78             final String errMsg = stuck.size() + " messages unsent";
79             logger.error(errMsg);
80             System.err.println(errMsg);
81         }
82         else
83         {
84             final String successMsg = "Clean exit; all messages sent.";
85             logger.info(successMsg);
86             System.out.println(successMsg);
87         }
88     }
89 }