[POLICY-73] replace openecomp for policy-engine
[policy/engine.git] / PolicyEngineUtils / src / main / java / org / onap / policy / utils / BusPublisher.java
1 package org.onap.policy.utils;
2
3 import java.util.ArrayList;
4 import java.util.List;
5 import java.util.Properties;
6 import java.util.concurrent.TimeUnit;
7
8 import org.onap.policy.common.logging.eelf.PolicyLogger;
9
10 import com.att.nsa.mr.client.impl.MRSimplerBatchPublisher;
11 import com.att.nsa.mr.test.clients.ProtocolTypeConstants;
12
13 public interface BusPublisher {
14         
15         /**
16          * sends a message
17          * 
18          * @param partition id
19          * @param message the message
20          * @return true if success, false otherwise
21          * @throws IllegalArgumentException if no message provided
22          */
23         public boolean send(String partitionId, String message) throws IllegalArgumentException;
24         
25         /**
26          * closes the publisher
27          */
28         public void close();
29         
30         /**
31          * DmaapClient library wrapper
32          */
33         public static class DmaapPublisherWrapper implements BusPublisher {
34                 /**
35                  * MR based Publisher
36                  */             
37                 protected MRSimplerBatchPublisher publisher;
38                 
39                 public DmaapPublisherWrapper(List<String> servers, String topic,
40                                                      String aafLogin,
41                                                      String aafPassword) {
42                         
43                         ArrayList<String> dmaapServers = new ArrayList<>();
44                         for (String server: servers) {
45                                 dmaapServers.add(server + ":3904");
46                         }
47                                         
48                         this.publisher = 
49                                 new MRSimplerBatchPublisher.Builder().
50                                                         againstUrls(dmaapServers).
51                                                         onTopic(topic).
52                                                         build();
53                         
54                         this.publisher.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue());
55                         
56                         this.publisher.setUsername(aafLogin);
57                         this.publisher.setPassword(aafPassword);  
58                         
59                         Properties props = new Properties();
60                         props.setProperty("Protocol", "http");
61                         props.setProperty("contenttype", "application/json");
62                         
63                         this.publisher.setProps(props);
64                         
65                         this.publisher.setHost(servers.get(0));
66                         
67                         if (PolicyLogger.isInfoEnabled())
68                                 PolicyLogger.info(DmaapPublisherWrapper.class.getName(), 
69                                                           "CREATION: " + this);
70                 }
71
72                 /**
73                  * {@inheritDoc}
74                  */
75                 @Override
76                 public void close() {
77                         if (PolicyLogger.isInfoEnabled())
78                                 PolicyLogger.info(DmaapPublisherWrapper.class.getName(), 
79                                                   "CREATION: " + this);
80                         
81                         try {
82                                 this.publisher.close(1, TimeUnit.SECONDS);
83                         } catch (Exception e) {
84                                 PolicyLogger.warn(DmaapPublisherWrapper.class.getName(), 
85                                                   "CLOSE: " + this + " because of " + 
86                                                                   e.getMessage());
87                         }
88                 }
89                 
90                 /**
91                  * {@inheritDoc}
92                  */
93                 @Override
94                 public boolean send(String partitionId, String message) 
95                                 throws IllegalArgumentException {
96                         if (message == null)
97                                 throw new IllegalArgumentException("No message provided");
98                         
99                         this.publisher.send(partitionId, message);
100                         return true;
101                         
102                 }
103                 
104                 @Override
105                 public String toString() {
106                         StringBuilder builder = new StringBuilder();
107                         builder.append("DmaapPublisherWrapper [").
108                         append("publisher.getAuthDate()=").append(publisher.getAuthDate()).
109                         append(", publisher.getAuthKey()=").append(publisher.getAuthKey()).
110                         append(", publisher.getHost()=").append(publisher.getHost()).
111                         append(", publisher.getProtocolFlag()=").append(publisher.getProtocolFlag()).
112                         append(", publisher.getUsername()=").append(publisher.getUsername()).
113                         append(", publisher.getPendingMessageCount()=").append(publisher.getPendingMessageCount()).
114                         append("]");
115                         return builder.toString();
116                 }
117         }
118
119 }