2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.common.endpoints.event.comm.bus.internal;
23 import com.att.nsa.apiClient.http.HttpClient.ConnectionType;
24 import com.att.nsa.cambria.client.CambriaBatchingPublisher;
25 import com.att.nsa.cambria.client.CambriaClientBuilders;
26 import com.att.nsa.cambria.client.CambriaClientBuilders.PublisherBuilder;
27 import com.att.nsa.mr.client.impl.MRSimplerBatchPublisher;
28 import com.att.nsa.mr.client.response.MRPublisherResponse;
29 import com.att.nsa.mr.test.clients.ProtocolTypeConstants;
30 import com.fasterxml.jackson.annotation.JsonIgnore;
32 import java.net.MalformedURLException;
33 import java.security.GeneralSecurityException;
34 import java.util.ArrayList;
35 import java.util.List;
37 import java.util.Properties;
38 import java.util.concurrent.TimeUnit;
40 import org.onap.policy.common.endpoints.event.comm.bus.DmaapTopicSinkFactory;
41 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
42 import org.slf4j.Logger;
43 import org.slf4j.LoggerFactory;
45 public interface BusPublisher {
51 * @param message the message
52 * @return true if success, false otherwise
53 * @throws IllegalArgumentException if no message provided
55 public boolean send(String partitionId, String message);
58 * closes the publisher
63 * Cambria based library publisher
65 public static class CambriaPublisherWrapper implements BusPublisher {
67 private static Logger logger = LoggerFactory.getLogger(CambriaPublisherWrapper.class);
70 * The actual Cambria publisher
73 protected volatile CambriaBatchingPublisher publisher;
75 public CambriaPublisherWrapper(List<String> servers, String topic, String apiKey, String apiSecret,
77 this(servers, topic, apiKey, apiSecret, null, null, useHttps, false);
80 public CambriaPublisherWrapper(List<String> servers, String topic, String apiKey, String apiSecret,
81 String username, String password, boolean useHttps, boolean selfSignedCerts) {
83 PublisherBuilder builder = new CambriaClientBuilders.PublisherBuilder();
85 builder.usingHosts(servers).onTopic(topic);
87 // Set read timeout to 30 seconds (TBD: this should be configurable)
88 builder.withSocketTimeout(30000);
91 if (selfSignedCerts) {
92 builder.withConnectionType(ConnectionType.HTTPS_NO_VALIDATION);
94 builder.withConnectionType(ConnectionType.HTTPS);
99 if (apiKey != null && !apiKey.isEmpty() && apiSecret != null && !apiSecret.isEmpty()) {
100 builder.authenticatedBy(apiKey, apiSecret);
103 if (username != null && !username.isEmpty() && password != null && !password.isEmpty()) {
104 builder.authenticatedByHttp(username, password);
108 this.publisher = builder.build();
109 } catch (MalformedURLException | GeneralSecurityException e) {
110 throw new IllegalArgumentException(e);
118 public boolean send(String partitionId, String message) {
119 if (message == null) {
120 throw new IllegalArgumentException("No message provided");
124 this.publisher.send(partitionId, message);
125 } catch (Exception e) {
126 logger.warn("{}: SEND of {} cannot be performed because of {}", this, message, e.getMessage(), e);
136 public void close() {
137 logger.info("{}: CLOSE", this);
140 this.publisher.close();
141 } catch (Exception e) {
142 logger.warn("{}: CLOSE FAILED because of {}", this, e.getMessage(), e);
148 public String toString() {
149 return "CambriaPublisherWrapper []";
155 * DmaapClient library wrapper
157 public abstract class DmaapPublisherWrapper implements BusPublisher {
159 private static Logger logger = LoggerFactory.getLogger(DmaapPublisherWrapper.class);
164 protected MRSimplerBatchPublisher publisher;
165 protected Properties props;
168 * MR Publisher Wrapper
170 * @param servers messaging bus hosts
172 * @param username AAF or DME2 Login
173 * @param password AAF or DME2 Password
175 public DmaapPublisherWrapper(ProtocolTypeConstants protocol, List<String> servers, String topic,
176 String username, String password, boolean useHttps) {
179 if (topic == null || topic.isEmpty()) {
180 throw new IllegalArgumentException("No topic for DMaaP");
184 if (protocol == ProtocolTypeConstants.AAF_AUTH) {
185 if (servers == null || servers.isEmpty()) {
186 throw new IllegalArgumentException("No DMaaP servers or DME2 partner provided");
189 ArrayList<String> dmaapServers = new ArrayList<>();
191 for (String server : servers) {
192 dmaapServers.add(server + ":3905");
196 for (String server : servers) {
197 dmaapServers.add(server + ":3904");
202 this.publisher = new MRSimplerBatchPublisher.Builder().againstUrls(dmaapServers).onTopic(topic).build();
204 this.publisher.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue());
205 } else if (protocol == ProtocolTypeConstants.DME2) {
206 ArrayList<String> dmaapServers = new ArrayList<>();
207 dmaapServers.add("0.0.0.0:3904");
209 this.publisher = new MRSimplerBatchPublisher.Builder().againstUrls(dmaapServers).onTopic(topic).build();
211 this.publisher.setProtocolFlag(ProtocolTypeConstants.DME2.getValue());
214 this.publisher.logTo(LoggerFactory.getLogger(MRSimplerBatchPublisher.class.getName()));
216 this.publisher.setUsername(username);
217 this.publisher.setPassword(password);
219 props = new Properties();
222 props.setProperty("Protocol", "https");
224 props.setProperty("Protocol", "http");
227 props.setProperty("contenttype", "application/json");
228 props.setProperty("username", username);
229 props.setProperty("password", password);
231 props.setProperty("topic", topic);
233 this.publisher.setProps(props);
235 if (protocol == ProtocolTypeConstants.AAF_AUTH) {
236 this.publisher.setHost(servers.get(0));
239 logger.info("{}: CREATION: using protocol {}", this, protocol.getValue());
246 public void close() {
247 logger.info("{}: CLOSE", this);
250 this.publisher.close(1, TimeUnit.SECONDS);
251 } catch (Exception e) {
252 logger.warn("{}: CLOSE FAILED because of {}", this, e.getMessage(), e);
260 public boolean send(String partitionId, String message) {
261 if (message == null) {
262 throw new IllegalArgumentException("No message provided");
265 this.publisher.setPubResponse(new MRPublisherResponse());
266 this.publisher.send(partitionId, message);
267 MRPublisherResponse response = this.publisher.sendBatchWithResponse();
268 if (response != null) {
269 logger.debug("DMaaP publisher received {} : {}", response.getResponseCode(),
270 response.getResponseMessage());
277 public String toString() {
278 return "DmaapPublisherWrapper [" + "publisher.getAuthDate()=" + publisher.getAuthDate()
279 + ", publisher.getAuthKey()=" + publisher.getAuthKey() + ", publisher.getHost()="
280 + publisher.getHost() + ", publisher.getProtocolFlag()=" + publisher.getProtocolFlag()
281 + ", publisher.getUsername()=" + publisher.getUsername() + "]";
286 * DmaapClient library wrapper
288 public static class DmaapAafPublisherWrapper extends DmaapPublisherWrapper {
292 public DmaapAafPublisherWrapper(List<String> servers, String topic, String aafLogin, String aafPassword,
295 super(ProtocolTypeConstants.AAF_AUTH, servers, topic, aafLogin, aafPassword, useHttps);
299 public static class DmaapDmePublisherWrapper extends DmaapPublisherWrapper {
300 public DmaapDmePublisherWrapper(List<String> servers, String topic, String username, String password,
301 String environment, String aftEnvironment, String dme2Partner, String latitude, String longitude,
302 Map<String, String> additionalProps, boolean useHttps) {
304 super(ProtocolTypeConstants.DME2, servers, topic, username, password, useHttps);
308 String dme2RouteOffer = additionalProps.get(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY);
310 if (environment == null || environment.isEmpty()) {
311 throw parmException(topic, PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX);
313 if (aftEnvironment == null || aftEnvironment.isEmpty()) {
314 throw parmException(topic, PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX);
316 if (latitude == null || latitude.isEmpty()) {
317 throw parmException(topic, PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX);
319 if (longitude == null || longitude.isEmpty()) {
320 throw parmException(topic, PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX);
323 if ((dme2Partner == null || dme2Partner.isEmpty())
324 && (dme2RouteOffer == null || dme2RouteOffer.isEmpty())) {
325 throw new IllegalArgumentException(
326 "Must provide at least " + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
327 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX + " or "
328 + PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic
329 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX + " for DME2");
332 String serviceName = servers.get(0);
334 /* These are required, no defaults */
335 props.setProperty("Environment", environment);
336 props.setProperty("AFT_ENVIRONMENT", aftEnvironment);
338 props.setProperty(DmaapTopicSinkFactory.DME2_SERVICE_NAME_PROPERTY, serviceName);
340 if (dme2Partner != null) {
341 props.setProperty("Partner", dme2Partner);
343 if (dme2RouteOffer != null) {
344 props.setProperty(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer);
347 props.setProperty("Latitude", latitude);
348 props.setProperty("Longitude", longitude);
350 // ServiceName also a default, found in additionalProps
352 /* These are optional, will default to these values if not set in optionalProps */
353 props.setProperty("AFT_DME2_EP_READ_TIMEOUT_MS", "50000");
354 props.setProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", "240000");
355 props.setProperty("AFT_DME2_EP_CONN_TIMEOUT", "15000");
356 props.setProperty("Version", "1.0");
357 props.setProperty("SubContextPath", "/");
358 props.setProperty("sessionstickinessrequired", "no");
360 /* These should not change */
361 props.setProperty("TransportType", "DME2");
362 props.setProperty("MethodType", "POST");
364 for (Map.Entry<String, String> entry : additionalProps.entrySet()) {
365 String key = entry.getKey();
366 String value = entry.getValue();
369 props.setProperty(key, value);
373 this.publisher.setProps(props);
376 private IllegalArgumentException parmException(String topic, String propnm) {
377 return new IllegalArgumentException("Missing " + PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
378 + topic + propnm + " property for DME2 in DMaaP");