Added oparent to sdc main
[sdc.git] / utils / DmaapPublisher / src / main / java / org / openecomp / sdc / dmaap / DmaapPublishTool.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * SDC
4  * ================================================================================
5  * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.openecomp.sdc.dmaap;
22
23 import com.att.nsa.mr.client.MRBatchingPublisher;
24 import com.att.nsa.mr.client.MRClientFactory;
25 import com.att.nsa.mr.client.MRPublisher.message;
26 import org.apache.commons.lang3.StringUtils;
27 import org.slf4j.Logger;
28 import org.slf4j.LoggerFactory;
29 import org.yaml.snakeyaml.Yaml;
30
31 import java.util.concurrent.TimeUnit;
32
33 public class DmaapPublishTool {
34
35     private static final Logger logger = LoggerFactory.getLogger(DmaapPublishTool.class);   
36     final private TopicConfig topicConfig;
37
38     public DmaapPublishTool(String yamlPath) throws FileNotFoundException {
39         topicConfig = loadTopicConfig(yamlPath);
40         System.out.println("yaml file loaded.");
41     }
42     public DmaapPublishTool(String yamlPath , String notifications) throws FileNotFoundException {
43         topicConfig = loadTopicConfig(yamlPath);
44         if (StringUtils.isNotBlank(notifications) )
45             topicConfig.add( notifications );
46         System.out.println("yaml file loaded.");
47     }
48
49     public void addNotifications(Collection<String> notification){
50         topicConfig.addAll( notification );
51     }
52
53     //safe stream doesn't throw null pointer exception
54     public <T> Collection<T> safe(Collection<T> obj){
55         return Optional.ofNullable(obj).orElse(Collections.emptySet());
56     }
57     public <T> List<T> safe(List<T> obj){
58         return Optional.ofNullable(obj).orElse(Collections.emptyList());
59     }
60
61     public void publish(String path) throws IOException, InterruptedException {
62         MRBatchingPublisher pub = createPublisher( topicConfig, path );
63         System.out.println( "pending message count -> "+pub.getPendingMessageCount() );
64         List<String> list = this.topicConfig.getIncomingTopicMessages();
65         for(String msg : safe(list) ){
66                 publishOne( pub , msg );
67         }
68         closePublisher(pub);
69     }
70
71     private MRBatchingPublisher createPublisher(TopicConfig topicConfig,String path) throws IOException {
72         MRBatchingPublisher publisher = MRClientFactory.createBatchingPublisher(Objects.requireNonNull(Util.toPath(path,topicConfig.getPublisherPropertiesFilePath())));
73         System.out.println("publisher created.");
74         return publisher;
75     }
76
77     private TopicConfig loadTopicConfig(String yamlPath) throws FileNotFoundException {
78         File yamlFile = new File(Objects.requireNonNull(yamlPath));
79         InputStream input = new FileInputStream(yamlFile);
80         Yaml yamlHelper = new Yaml();
81         return yamlHelper.loadAs(input, TopicConfig.class);
82     }
83
84     private void publishOne(MRBatchingPublisher pub, String msg) throws IOException, InterruptedException {
85         System.out.println("sending:    " + msg);
86         pub.send(msg);
87         System.out.println("message sent.");
88     }
89
90     private void closePublisher(MRBatchingPublisher pub) throws IOException, InterruptedException {
91         System.out.println("closing publisher...");
92         // close the publisher to make sure everything's sent before exiting. The batching
93         // publisher interface allows the app to get the set of unsent messages. It could
94         // write them to disk, for example, to try to send them later.
95         final List<message> stuck = pub.close(20, TimeUnit.SECONDS);
96         if(!stuck.isEmpty())
97         {
98             final String errMsg = stuck.size() + " messages unsent";
99             logger.error(errMsg);
100             System.err.println(errMsg);
101         }
102         else
103         {
104             final String successMsg = "Clean exit; all messages sent.";
105             logger.info(successMsg);
106             System.out.println(successMsg);
107         }
108     }
109 }