2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.common.endpoints.event.comm.bus.internal.impl;
23 import com.att.nsa.mr.client.impl.MRSimplerBatchPublisher;
24 import com.att.nsa.mr.client.response.MRPublisherResponse;
25 import com.att.nsa.mr.test.clients.ProtocolTypeConstants;
27 import java.util.ArrayList;
28 import java.util.List;
29 import java.util.Properties;
30 import java.util.concurrent.TimeUnit;
32 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusPublisher;
33 import org.slf4j.Logger;
34 import org.slf4j.LoggerFactory;
37 * DmaapClient library wrapper
39 public abstract class DmaapPublisherWrapper implements BusPublisher {
41 private static Logger logger = LoggerFactory.getLogger(DmaapPublisherWrapper.class);
46 protected MRSimplerBatchPublisher publisher;
47 protected Properties props;
50 * MR Publisher Wrapper
52 * @param servers messaging bus hosts
54 * @param username AAF or DME2 Login
55 * @param password AAF or DME2 Password
57 public DmaapPublisherWrapper(ProtocolTypeConstants protocol, List<String> servers, String topic, String username,
58 String password, boolean useHttps) {
61 if (topic == null || topic.isEmpty()) {
62 throw new IllegalArgumentException("No topic for DMaaP");
66 if (protocol == ProtocolTypeConstants.AAF_AUTH) {
67 if (servers == null || servers.isEmpty()) {
68 throw new IllegalArgumentException("No DMaaP servers or DME2 partner provided");
71 ArrayList<String> dmaapServers = new ArrayList<>();
73 for (String server : servers) {
74 dmaapServers.add(server + ":3905");
78 for (String server : servers) {
79 dmaapServers.add(server + ":3904");
84 this.publisher = new MRSimplerBatchPublisher.Builder().againstUrls(dmaapServers).onTopic(topic).build();
86 this.publisher.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue());
87 } else if (protocol == ProtocolTypeConstants.DME2) {
88 ArrayList<String> dmaapServers = new ArrayList<>();
89 dmaapServers.add("0.0.0.0:3904");
91 this.publisher = new MRSimplerBatchPublisher.Builder().againstUrls(dmaapServers).onTopic(topic).build();
93 this.publisher.setProtocolFlag(ProtocolTypeConstants.DME2.getValue());
96 this.publisher.logTo(LoggerFactory.getLogger(MRSimplerBatchPublisher.class.getName()));
98 this.publisher.setUsername(username);
99 this.publisher.setPassword(password);
101 props = new Properties();
104 props.setProperty("Protocol", "https");
106 props.setProperty("Protocol", "http");
109 props.setProperty("contenttype", "application/json");
110 props.setProperty("username", username);
111 props.setProperty("password", password);
113 props.setProperty("topic", topic);
115 this.publisher.setProps(props);
117 if (protocol == ProtocolTypeConstants.AAF_AUTH) {
118 this.publisher.setHost(servers.get(0));
121 logger.info("{}: CREATION: using protocol {}", this, protocol.getValue());
128 public void close() {
129 logger.info("{}: CLOSE", this);
132 this.publisher.close(1, TimeUnit.SECONDS);
133 } catch (Exception e) {
134 logger.warn("{}: CLOSE FAILED because of {}", this, e.getMessage(), e);
142 public boolean send(String partitionId, String message) {
143 if (message == null) {
144 throw new IllegalArgumentException("No message provided");
147 this.publisher.setPubResponse(new MRPublisherResponse());
148 this.publisher.send(partitionId, message);
149 MRPublisherResponse response = this.publisher.sendBatchWithResponse();
150 if (response != null) {
151 logger.debug("DMaaP publisher received {} : {}", response.getResponseCode(), response.getResponseMessage());
158 public String toString() {
159 return "DmaapPublisherWrapper [" + "publisher.getAuthDate()=" + publisher.getAuthDate()
160 + ", publisher.getAuthKey()=" + publisher.getAuthKey() + ", publisher.getHost()=" + publisher.getHost()
161 + ", publisher.getProtocolFlag()=" + publisher.getProtocolFlag() + ", publisher.getUsername()="
162 + publisher.getUsername() + "]";