1 package org.openecomp.sdc.dmaap;
3 import com.att.nsa.mr.client.MRBatchingPublisher;
4 import com.att.nsa.mr.client.MRClientFactory;
5 import com.att.nsa.mr.client.MRPublisher.message;
6 import org.apache.commons.lang3.StringUtils;
7 import org.slf4j.Logger;
8 import org.slf4j.LoggerFactory;
9 import org.yaml.snakeyaml.Yaml;
11 import java.util.concurrent.TimeUnit;
/**
 * Tool that loads a {@code TopicConfig} from a YAML file and pushes the messages it
 * lists through an {@code MRBatchingPublisher}. Progress is reported on stdout/stderr.
 */
13 public class DmaapPublishTool {
// Class-wide SLF4J logger.
15 private static final Logger logger = LoggerFactory.getLogger(DmaapPublishTool.class);
// Topic configuration; assigned exactly once, by whichever constructor is used.
16 final private TopicConfig topicConfig;
/**
 * Builds a tool whose topic configuration is read from the given YAML file.
 *
 * @param yamlPath location of the YAML file holding the topic configuration
 * @throws FileNotFoundException if the YAML file cannot be opened for reading
 */
public DmaapPublishTool(String yamlPath) throws FileNotFoundException {
    this.topicConfig = loadTopicConfig(yamlPath);
    System.out.println("yaml file loaded.");
}
22 public DmaapPublishTool(String yamlPath , String notifications) throws FileNotFoundException {
23 topicConfig = loadTopicConfig(yamlPath);
24 if (StringUtils.isNotBlank(notifications) )
25 topicConfig.add( notifications );
26 System.out.println("yaml file loaded.");
29 public void addNotifications(Collection<String> notification){
30 topicConfig.addAll( notification );
33 //safe stream doesn't throw null pointer exception
34 public <T> Collection<T> safe(Collection<T> obj){
35 return Optional.ofNullable(obj).orElse(Collections.emptySet());
37 public <T> List<T> safe(List<T> obj){
38 return Optional.ofNullable(obj).orElse(Collections.emptyList());
41 public void publish(String path) throws IOException, InterruptedException {
42 MRBatchingPublisher pub = createPublisher( topicConfig, path );
43 System.out.println( "pending message count -> "+pub.getPendingMessageCount() );
44 List<String> list = this.topicConfig.getIncomingTopicMessages();
45 for(String msg : safe(list) ){
46 publishOne( pub , msg );
51 private MRBatchingPublisher createPublisher(TopicConfig topicConfig,String path) throws IOException {
52 MRBatchingPublisher publisher = MRClientFactory.createBatchingPublisher(Objects.requireNonNull(Util.toPath(path,topicConfig.getPublisherPropertiesFilePath())));
53 System.out.println("publisher created.");
57 private TopicConfig loadTopicConfig(String yamlPath) throws FileNotFoundException {
58 File yamlFile = new File(Objects.requireNonNull(yamlPath));
59 InputStream input = new FileInputStream(yamlFile);
60 Yaml yamlHelper = new Yaml();
61 return yamlHelper.loadAs(input, TopicConfig.class);
/**
 * Handles one message for the given publisher, reporting progress on stdout:
 * "sending: " followed by the message beforehand, and "message sent." afterwards.
 *
 * @param pub publisher the message is intended for
 * @param msg message text
 * @throws IOException          declared by the signature
 * @throws InterruptedException declared by the signature
 */
64 private void publishOne(MRBatchingPublisher pub, String msg) throws IOException, InterruptedException {
65 System.out.println("sending: " + msg);
// NOTE(review): no call on pub is visible between the two prints in this view;
// presumably the actual send belongs here - confirm against the complete file.
67 System.out.println("message sent.");
/**
 * Closes the given publisher with a 20-second limit and reports the outcome.
 *
 * <p>Two outcomes are reported: an "N messages unsent" line on stderr, or a
 * "Clean exit; all messages sent." line logged at info level and printed to stdout.
 *
 * @param pub publisher to close
 * @throws IOException          declared by the signature
 * @throws InterruptedException declared by the signature
 */
70 private void closePublisher(MRBatchingPublisher pub) throws IOException, InterruptedException {
71 System.out.println("closing publisher...");
72 // close the publisher to make sure everything's sent before exiting. The batching
73 // publisher interface allows the app to get the set of unsent messages. It could
74 // write them to disk, for example, to try to send them later.
// The list returned by close(...) is treated below as the messages that were not sent.
75 final List<message> stuck = pub.close(20, TimeUnit.SECONDS);
// NOTE(review): the condition choosing between the error path and the success path is
// not visible in this view - presumably it tests whether stuck is empty; confirm.
78 final String errMsg = stuck.size() + " messages unsent";
80 System.err.println(errMsg);
84 final String successMsg = "Clean exit; all messages sent.";
85 logger.info(successMsg);
86 System.out.println(successMsg);