Sync Integ to Master
[sdc.git] / utils / DmaapPublisher / src / main / java / org / openecomp / sdc / dmaap / DmaapPublishTool.java
1 package org.openecomp.sdc.dmaap;
2
3 import java.io.File;
4 import java.io.FileInputStream;
5 import java.io.FileNotFoundException;
6 import java.io.IOException;
7 import java.io.InputStream;
8 import java.util.*;
9 import java.util.concurrent.TimeUnit;
10 import java.util.stream.Stream;
11
12 import org.apache.commons.lang3.StringUtils;
13 import org.slf4j.Logger;
14 import org.slf4j.LoggerFactory;
15 import org.yaml.snakeyaml.Yaml;
16
17 import com.att.nsa.mr.client.MRBatchingPublisher;
18 import com.att.nsa.mr.client.MRClientFactory;
19 import com.att.nsa.mr.client.MRPublisher.message;
20
21 public class DmaapPublishTool {
22
23     private static final Logger logger = LoggerFactory.getLogger(DmaapPublishTool.class);   
24     final private TopicConfig topicConfig;
25
26     public DmaapPublishTool(String yamlPath) throws FileNotFoundException {
27         topicConfig = loadTopicConfig(yamlPath);
28         System.out.println("yaml file loaded.");
29     }
30     public DmaapPublishTool(String yamlPath , String notifications) throws FileNotFoundException {
31         topicConfig = loadTopicConfig(yamlPath);
32         if (StringUtils.isNotBlank(notifications) )
33             topicConfig.add( notifications );
34         System.out.println("yaml file loaded.");
35     }
36
37     public void addNotifications(Collection<String> notification){
38         topicConfig.addAll( notification );
39     }
40
41     //safe stream doesn't throw null pointer exception
42     public <T> Collection<T> safe(Collection<T> obj){
43         return Optional.ofNullable(obj).orElse(Collections.emptySet());
44     }
45     public <T> List<T> safe(List<T> obj){
46         return Optional.ofNullable(obj).orElse(Collections.emptyList());
47     }
48
49     public void publish(String path) throws IOException, InterruptedException {
50         MRBatchingPublisher pub = createPublisher( topicConfig, path );
51         System.out.println( "pending message count -> "+pub.getPendingMessageCount() );
52         List<String> list = this.topicConfig.getIncomingTopicMessages();
53         for(String msg : safe(list) ){
54                 publishOne( pub , msg );
55         }
56         closePublisher(pub);
57     }
58
59     private MRBatchingPublisher createPublisher(TopicConfig topicConfig,String path) throws IOException {
60         MRBatchingPublisher publisher = MRClientFactory.createBatchingPublisher(Objects.requireNonNull(Util.toPath(path,topicConfig.getPublisherPropertiesFilePath())));
61         System.out.println("publisher created.");
62         return publisher;
63     }
64
65     private TopicConfig loadTopicConfig(String yamlPath) throws FileNotFoundException {
66         File yamlFile = new File(Objects.requireNonNull(yamlPath));
67         InputStream input = new FileInputStream(yamlFile);
68         Yaml yamlHelper = new Yaml();
69         return yamlHelper.loadAs(input, TopicConfig.class);
70     }
71
72     private void publishOne(MRBatchingPublisher pub, String msg) throws IOException, InterruptedException {
73         System.out.println("sending:    " + msg);
74         pub.send(msg);
75         System.out.println("message sent.");
76     }
77
78     private void closePublisher(MRBatchingPublisher pub) throws IOException, InterruptedException {
79         System.out.println("closing publisher...");
80         // close the publisher to make sure everything's sent before exiting. The batching
81         // publisher interface allows the app to get the set of unsent messages. It could
82         // write them to disk, for example, to try to send them later.
83         final List<message> stuck = pub.close(20, TimeUnit.SECONDS);
84         if(!stuck.isEmpty())
85         {
86             final String errMsg = stuck.size() + " messages unsent";
87             logger.error(errMsg);
88             System.err.println(errMsg);
89         }
90         else
91         {
92             final String successMsg = "Clean exit; all messages sent.";
93             logger.info(successMsg);
94             System.out.println(successMsg);
95         }
96     }
97 }