1 package org.onap.policy.utils;
3 import java.util.ArrayList;
5 import java.util.Properties;
6 import java.util.concurrent.TimeUnit;
8 import org.onap.policy.common.logging.eelf.PolicyLogger;
10 import com.att.nsa.mr.client.impl.MRSimplerBatchPublisher;
11 import com.att.nsa.mr.test.clients.ProtocolTypeConstants;
13 public interface BusPublisher {
19 * @param message the message
20 * @return true if success, false otherwise
21 * @throws IllegalArgumentException if no message provided
23 public boolean send(String partitionId, String message) throws IllegalArgumentException;
26 * closes the publisher
31 * DmaapClient library wrapper
33 public static class DmaapPublisherWrapper implements BusPublisher {
37 protected MRSimplerBatchPublisher publisher;
39 public DmaapPublisherWrapper(List<String> servers, String topic,
43 ArrayList<String> dmaapServers = new ArrayList<>();
44 for (String server: servers) {
45 dmaapServers.add(server + ":3904");
49 new MRSimplerBatchPublisher.Builder().
50 againstUrls(dmaapServers).
54 this.publisher.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue());
56 this.publisher.setUsername(aafLogin);
57 this.publisher.setPassword(aafPassword);
59 Properties props = new Properties();
60 props.setProperty("Protocol", "http");
61 props.setProperty("contenttype", "application/json");
63 this.publisher.setProps(props);
65 this.publisher.setHost(servers.get(0));
67 if (PolicyLogger.isInfoEnabled())
68 PolicyLogger.info(DmaapPublisherWrapper.class.getName(),
77 if (PolicyLogger.isInfoEnabled())
78 PolicyLogger.info(DmaapPublisherWrapper.class.getName(),
82 this.publisher.close(1, TimeUnit.SECONDS);
83 } catch (Exception e) {
84 PolicyLogger.warn(DmaapPublisherWrapper.class.getName(),
85 "CLOSE: " + this + " because of " +
94 public boolean send(String partitionId, String message)
95 throws IllegalArgumentException {
97 throw new IllegalArgumentException("No message provided");
99 this.publisher.send(partitionId, message);
105 public String toString() {
106 StringBuilder builder = new StringBuilder();
107 builder.append("DmaapPublisherWrapper [").
108 append("publisher.getAuthDate()=").append(publisher.getAuthDate()).
109 append(", publisher.getAuthKey()=").append(publisher.getAuthKey()).
110 append(", publisher.getHost()=").append(publisher.getHost()).
111 append(", publisher.getProtocolFlag()=").append(publisher.getProtocolFlag()).
112 append(", publisher.getUsername()=").append(publisher.getUsername()).
113 append(", publisher.getPendingMessageCount()=").append(publisher.getPendingMessageCount()).
115 return builder.toString();