/*
 * ============LICENSE_START=======================================================
 * ================================================================================
 * Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved.
 * Modifications Copyright (C) 2018 Samsung Electronics Co., Ltd.
 * ================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * ============LICENSE_END=========================================================
 */
22 package org.onap.policy.common.endpoints.event.comm.bus.internal;
import com.att.nsa.apiClient.http.HttpClient.ConnectionType;
import com.att.nsa.cambria.client.CambriaBatchingPublisher;
import com.att.nsa.cambria.client.CambriaClientBuilders;
import com.att.nsa.cambria.client.CambriaClientBuilders.PublisherBuilder;
import com.fasterxml.jackson.annotation.JsonIgnore;

import java.net.MalformedURLException;
import java.security.GeneralSecurityException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

import org.apache.commons.lang3.StringUtils;
import org.onap.dmaap.mr.client.impl.MRSimplerBatchPublisher;
import org.onap.dmaap.mr.client.response.MRPublisherResponse;
import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants;
import org.onap.policy.common.endpoints.event.comm.bus.DmaapTopicSinkFactory;
import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
import org.onap.policy.common.gson.annotation.GsonJsonIgnore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
48 public interface BusPublisher {
53 * @param partitionId id
54 * @param message the message
55 * @return true if success, false otherwise
56 * @throws IllegalArgumentException if no message provided
58 public boolean send(String partitionId, String message);
61 * closes the publisher.
66 * Cambria based library publisher.
68 public static class CambriaPublisherWrapper implements BusPublisher {
70 private static Logger logger = LoggerFactory.getLogger(CambriaPublisherWrapper.class);
73 * The actual Cambria publisher.
77 protected CambriaBatchingPublisher publisher;
82 * @param busTopicParams topic parameters
84 public CambriaPublisherWrapper(BusTopicParams busTopicParams) {
86 PublisherBuilder builder = new CambriaClientBuilders.PublisherBuilder();
88 builder.usingHosts(busTopicParams.getServers()).onTopic(busTopicParams.getTopic());
90 // Set read timeout to 30 seconds (TBD: this should be configurable)
91 builder.withSocketTimeout(30000);
93 if (busTopicParams.isUseHttps()) {
94 if (busTopicParams.isAllowSelfSignedCerts()) {
95 builder.withConnectionType(ConnectionType.HTTPS_NO_VALIDATION);
97 builder.withConnectionType(ConnectionType.HTTPS);
102 if (busTopicParams.isApiKeyValid() && busTopicParams.isApiSecretValid()) {
103 builder.authenticatedBy(busTopicParams.getApiKey(), busTopicParams.getApiSecret());
106 if (busTopicParams.isUserNameValid() && busTopicParams.isPasswordValid()) {
107 builder.authenticatedByHttp(busTopicParams.getUserName(), busTopicParams.getPassword());
111 this.publisher = builder.build();
112 } catch (MalformedURLException | GeneralSecurityException e) {
113 throw new IllegalArgumentException(e);
118 public boolean send(String partitionId, String message) {
119 if (message == null) {
120 throw new IllegalArgumentException("No message provided");
124 this.publisher.send(partitionId, message);
125 } catch (Exception e) {
126 logger.warn("{}: SEND of {} cannot be performed because of {}", this, message, e.getMessage(), e);
133 public void close() {
134 logger.info("{}: CLOSE", this);
137 this.publisher.close();
138 } catch (Exception e) {
139 logger.warn("{}: CLOSE FAILED because of {}", this, e.getMessage(), e);
145 public String toString() {
146 return "CambriaPublisherWrapper []";
152 * DmaapClient library wrapper.
154 public abstract class DmaapPublisherWrapper implements BusPublisher {
156 private static Logger logger = LoggerFactory.getLogger(DmaapPublisherWrapper.class);
159 * MR based Publisher.
161 protected MRSimplerBatchPublisher publisher;
162 protected Properties props;
165 * MR Publisher Wrapper.
167 * @param servers messaging bus hosts
169 * @param username AAF or DME2 Login
170 * @param password AAF or DME2 Password
172 public DmaapPublisherWrapper(ProtocolTypeConstants protocol, List<String> servers, String topic,
173 String username, String password, boolean useHttps) {
176 if (StringUtils.isBlank(topic)) {
177 throw new IllegalArgumentException("No topic for DMaaP");
181 if (protocol == ProtocolTypeConstants.AAF_AUTH) {
182 if (servers == null || servers.isEmpty()) {
183 throw new IllegalArgumentException("No DMaaP servers or DME2 partner provided");
186 ArrayList<String> dmaapServers = new ArrayList<>();
188 for (String server : servers) {
189 dmaapServers.add(server + ":3905");
193 for (String server : servers) {
194 dmaapServers.add(server + ":3904");
199 this.publisher = new MRSimplerBatchPublisher.Builder().againstUrls(dmaapServers).onTopic(topic).build();
201 this.publisher.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue());
202 } else if (protocol == ProtocolTypeConstants.DME2) {
203 ArrayList<String> dmaapServers = new ArrayList<>();
204 dmaapServers.add("0.0.0.0:3904");
206 this.publisher = new MRSimplerBatchPublisher.Builder().againstUrls(dmaapServers).onTopic(topic).build();
208 this.publisher.setProtocolFlag(ProtocolTypeConstants.DME2.getValue());
210 throw new IllegalArgumentException("Invalid DMaaP protocol " + protocol);
213 this.publisher.logTo(LoggerFactory.getLogger(MRSimplerBatchPublisher.class.getName()));
215 this.publisher.setUsername(username);
216 this.publisher.setPassword(password);
218 props = new Properties();
221 props.setProperty("Protocol", "https");
223 props.setProperty("Protocol", "http");
226 props.setProperty("contenttype", "application/json");
227 props.setProperty("username", username);
228 props.setProperty("password", password);
230 props.setProperty("topic", topic);
232 this.publisher.setProps(props);
234 if (protocol == ProtocolTypeConstants.AAF_AUTH) {
235 this.publisher.setHost(servers.get(0));
238 logger.info("{}: CREATION: using protocol {}", this, protocol.getValue());
242 public void close() {
243 logger.info("{}: CLOSE", this);
246 this.publisher.close(1, TimeUnit.SECONDS);
247 } catch (Exception e) {
248 logger.warn("{}: CLOSE FAILED because of {}", this, e.getMessage(), e);
253 public boolean send(String partitionId, String message) {
254 if (message == null) {
255 throw new IllegalArgumentException("No message provided");
258 this.publisher.setPubResponse(new MRPublisherResponse());
259 this.publisher.send(partitionId, message);
260 MRPublisherResponse response = this.publisher.sendBatchWithResponse();
261 if (response != null) {
262 logger.debug("DMaaP publisher received {} : {}", response.getResponseCode(),
263 response.getResponseMessage());
270 public String toString() {
271 return "DmaapPublisherWrapper [" + "publisher.getAuthDate()=" + publisher.getAuthDate()
272 + ", publisher.getAuthKey()=" + publisher.getAuthKey() + ", publisher.getHost()="
273 + publisher.getHost() + ", publisher.getProtocolFlag()=" + publisher.getProtocolFlag()
274 + ", publisher.getUsername()=" + publisher.getUsername() + "]";
279 * DmaapClient library wrapper.
281 public static class DmaapAafPublisherWrapper extends DmaapPublisherWrapper {
283 * MR based Publisher.
285 public DmaapAafPublisherWrapper(List<String> servers, String topic, String aafLogin, String aafPassword,
288 super(ProtocolTypeConstants.AAF_AUTH, servers, topic, aafLogin, aafPassword, useHttps);
292 public static class DmaapDmePublisherWrapper extends DmaapPublisherWrapper {
297 * @param busTopicParams topic parameters
299 public DmaapDmePublisherWrapper(BusTopicParams busTopicParams) {
301 super(ProtocolTypeConstants.DME2, busTopicParams.getServers(),busTopicParams.getTopic(),
302 busTopicParams.getUserName(),busTopicParams.getPassword(),busTopicParams.isUseHttps());
303 String dme2RouteOffer = null;
304 if (busTopicParams.isAdditionalPropsValid()) {
305 dme2RouteOffer = busTopicParams.getAdditionalProps().get(
306 DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY);
309 if (busTopicParams.isEnvironmentInvalid()) {
310 throw parmException(busTopicParams.getTopic(),
311 PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX);
313 if (busTopicParams.isAftEnvironmentInvalid()) {
314 throw parmException(busTopicParams.getTopic(),
315 PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX);
317 if (busTopicParams.isLatitudeInvalid()) {
318 throw parmException(busTopicParams.getTopic(),
319 PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX);
321 if (busTopicParams.isLongitudeInvalid()) {
322 throw parmException(busTopicParams.getTopic(),
323 PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX);
326 if ((busTopicParams.isPartnerInvalid())
327 && StringUtils.isBlank(dme2RouteOffer)) {
328 throw new IllegalArgumentException(
329 "Must provide at least " + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
330 + busTopicParams.getTopic()
331 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX + " or "
332 + PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + busTopicParams.getTopic()
333 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX + " for DME2");
336 String serviceName = busTopicParams.getServers().get(0);
338 /* These are required, no defaults */
339 props.setProperty("Environment", busTopicParams.getEnvironment());
340 props.setProperty("AFT_ENVIRONMENT", busTopicParams.getAftEnvironment());
342 props.setProperty(DmaapTopicSinkFactory.DME2_SERVICE_NAME_PROPERTY, serviceName);
344 if (busTopicParams.getPartner() != null) {
345 props.setProperty("Partner", busTopicParams.getPartner());
347 if (dme2RouteOffer != null) {
348 props.setProperty(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer);
351 props.setProperty("Latitude", busTopicParams.getLatitude());
352 props.setProperty("Longitude", busTopicParams.getLongitude());
354 // ServiceName also a default, found in additionalProps
356 /* These are optional, will default to these values if not set in optionalProps */
357 props.setProperty("AFT_DME2_EP_READ_TIMEOUT_MS", "50000");
358 props.setProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", "240000");
359 props.setProperty("AFT_DME2_EP_CONN_TIMEOUT", "15000");
360 props.setProperty("Version", "1.0");
361 props.setProperty("SubContextPath", "/");
362 props.setProperty("sessionstickinessrequired", "no");
364 /* These should not change */
365 props.setProperty("TransportType", "DME2");
366 props.setProperty("MethodType", "POST");
368 if (busTopicParams.isAdditionalPropsValid()) {
369 for (Map.Entry<String, String> entry : busTopicParams.getAdditionalProps().entrySet()) {
370 String key = entry.getKey();
371 String value = entry.getValue();
374 props.setProperty(key, value);
379 this.publisher.setProps(props);
382 private IllegalArgumentException parmException(String topic, String propnm) {
383 return new IllegalArgumentException("Missing " + PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
384 + topic + propnm + " property for DME2 in DMaaP");