2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017-2021 AT&T Intellectual Property. All rights reserved.
6 * Modifications Copyright (C) 2018 Samsung Electronics Co., Ltd.
7 * Modifications Copyright (C) 2020 Bell Canada. All rights reserved.
8 * ================================================================================
9 * Licensed under the Apache License, Version 2.0 (the "License");
10 * you may not use this file except in compliance with the License.
11 * You may obtain a copy of the License at
13 * http://www.apache.org/licenses/LICENSE-2.0
15 * Unless required by applicable law or agreed to in writing, software
16 * distributed under the License is distributed on an "AS IS" BASIS,
17 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18 * See the License for the specific language governing permissions and
19 * limitations under the License.
20 * ============LICENSE_END=========================================================
23 package org.onap.policy.common.endpoints.event.comm.bus.internal;
25 import com.att.nsa.apiClient.http.HttpClient.ConnectionType;
26 import com.att.nsa.cambria.client.CambriaBatchingPublisher;
27 import com.att.nsa.cambria.client.CambriaClientBuilders;
28 import java.net.MalformedURLException;
29 import java.security.GeneralSecurityException;
30 import java.util.ArrayList;
31 import java.util.List;
33 import java.util.Properties;
34 import java.util.concurrent.TimeUnit;
35 import org.apache.commons.lang3.StringUtils;
36 import org.onap.dmaap.mr.client.impl.MRSimplerBatchPublisher;
37 import org.onap.dmaap.mr.client.response.MRPublisherResponse;
38 import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants;
39 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
40 import org.onap.policy.common.gson.annotation.GsonJsonIgnore;
41 import org.slf4j.Logger;
42 import org.slf4j.LoggerFactory;
44 public interface BusPublisher {
49 * @param partitionId id
50 * @param message the message
51 * @return true if success, false otherwise
52 * @throws IllegalArgumentException if no message provided
54 public boolean send(String partitionId, String message);
57 * closes the publisher.
62 * Cambria based library publisher.
64 public static class CambriaPublisherWrapper implements BusPublisher {
66 private static Logger logger = LoggerFactory.getLogger(CambriaPublisherWrapper.class);
69 * The actual Cambria publisher.
72 protected CambriaBatchingPublisher publisher;
77 * @param busTopicParams topic parameters
79 public CambriaPublisherWrapper(BusTopicParams busTopicParams) {
81 var builder = new CambriaClientBuilders.PublisherBuilder();
83 builder.usingHosts(busTopicParams.getServers()).onTopic(busTopicParams.getTopic());
85 // Set read timeout to 30 seconds (TBD: this should be configurable)
86 builder.withSocketTimeout(30000);
88 if (busTopicParams.isUseHttps()) {
89 if (busTopicParams.isAllowSelfSignedCerts()) {
90 builder.withConnectionType(ConnectionType.HTTPS_NO_VALIDATION);
92 builder.withConnectionType(ConnectionType.HTTPS);
97 if (busTopicParams.isApiKeyValid() && busTopicParams.isApiSecretValid()) {
98 builder.authenticatedBy(busTopicParams.getApiKey(), busTopicParams.getApiSecret());
101 if (busTopicParams.isUserNameValid() && busTopicParams.isPasswordValid()) {
102 builder.authenticatedByHttp(busTopicParams.getUserName(), busTopicParams.getPassword());
106 this.publisher = builder.build();
107 } catch (MalformedURLException | GeneralSecurityException e) {
108 throw new IllegalArgumentException(e);
113 public boolean send(String partitionId, String message) {
114 if (message == null) {
115 throw new IllegalArgumentException("No message provided");
119 this.publisher.send(partitionId, message);
120 } catch (Exception e) {
121 logger.warn("{}: SEND of {} cannot be performed because of {}", this, message, e.getMessage(), e);
128 public void close() {
129 logger.info("{}: CLOSE", this);
132 this.publisher.close();
133 } catch (Exception e) {
134 logger.warn("{}: CLOSE FAILED because of {}", this, e.getMessage(), e);
140 public String toString() {
141 return "CambriaPublisherWrapper []";
147 * DmaapClient library wrapper.
149 public abstract class DmaapPublisherWrapper implements BusPublisher {
151 private static Logger logger = LoggerFactory.getLogger(DmaapPublisherWrapper.class);
154 * MR based Publisher.
156 protected MRSimplerBatchPublisher publisher;
157 protected Properties props;
160 * MR Publisher Wrapper.
162 * @param servers messaging bus hosts
164 * @param username AAF or DME2 Login
165 * @param password AAF or DME2 Password
167 protected DmaapPublisherWrapper(ProtocolTypeConstants protocol, List<String> servers, String topic,
168 String username, String password, boolean useHttps) {
171 if (StringUtils.isBlank(topic)) {
172 throw new IllegalArgumentException("No topic for DMaaP");
176 configureProtocol(topic, protocol, servers, useHttps);
178 this.publisher.logTo(LoggerFactory.getLogger(MRSimplerBatchPublisher.class.getName()));
180 this.publisher.setUsername(username);
181 this.publisher.setPassword(password);
183 props = new Properties();
185 props.setProperty("Protocol", (useHttps ? "https" : "http"));
186 props.setProperty("contenttype", "application/json");
187 props.setProperty("username", username);
188 props.setProperty("password", password);
190 props.setProperty("topic", topic);
192 this.publisher.setProps(props);
194 if (protocol == ProtocolTypeConstants.AAF_AUTH) {
195 this.publisher.setHost(servers.get(0));
198 logger.info("{}: CREATION: using protocol {}", this, protocol.getValue());
201 private void configureProtocol(String topic, ProtocolTypeConstants protocol, List<String> servers,
204 if (protocol == ProtocolTypeConstants.AAF_AUTH) {
205 if (servers == null || servers.isEmpty()) {
206 throw new IllegalArgumentException("No DMaaP servers or DME2 partner provided");
209 ArrayList<String> dmaapServers = new ArrayList<>();
210 String port = useHttps ? ":3905" : ":3904";
211 for (String server : servers) {
212 dmaapServers.add(server + port);
216 this.publisher = new MRSimplerBatchPublisher.Builder().againstUrls(dmaapServers).onTopic(topic).build();
218 this.publisher.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue());
220 } else if (protocol == ProtocolTypeConstants.DME2) {
221 ArrayList<String> dmaapServers = new ArrayList<>();
222 dmaapServers.add("0.0.0.0:3904");
224 this.publisher = new MRSimplerBatchPublisher.Builder().againstUrls(dmaapServers).onTopic(topic).build();
226 this.publisher.setProtocolFlag(ProtocolTypeConstants.DME2.getValue());
229 throw new IllegalArgumentException("Invalid DMaaP protocol " + protocol);
234 public void close() {
235 logger.info("{}: CLOSE", this);
238 this.publisher.close(1, TimeUnit.SECONDS);
240 } catch (InterruptedException e) {
241 logger.warn("{}: CLOSE FAILED", this, e);
242 Thread.currentThread().interrupt();
244 } catch (Exception e) {
245 logger.warn("{}: CLOSE FAILED", this, e);
250 public boolean send(String partitionId, String message) {
251 if (message == null) {
252 throw new IllegalArgumentException("No message provided");
255 this.publisher.setPubResponse(new MRPublisherResponse());
256 this.publisher.send(partitionId, message);
257 MRPublisherResponse response = this.publisher.sendBatchWithResponse();
258 if (response != null) {
259 logger.debug("DMaaP publisher received {} : {}", response.getResponseCode(),
260 response.getResponseMessage());
267 public String toString() {
268 return "DmaapPublisherWrapper [" + "publisher.getAuthDate()=" + publisher.getAuthDate()
269 + ", publisher.getAuthKey()=" + publisher.getAuthKey() + ", publisher.getHost()="
270 + publisher.getHost() + ", publisher.getProtocolFlag()=" + publisher.getProtocolFlag()
271 + ", publisher.getUsername()=" + publisher.getUsername() + "]";
276 * DmaapClient library wrapper.
278 public static class DmaapAafPublisherWrapper extends DmaapPublisherWrapper {
280 * MR based Publisher.
282 public DmaapAafPublisherWrapper(List<String> servers, String topic, String aafLogin, String aafPassword,
285 super(ProtocolTypeConstants.AAF_AUTH, servers, topic, aafLogin, aafPassword, useHttps);
289 public static class DmaapDmePublisherWrapper extends DmaapPublisherWrapper {
294 * @param busTopicParams topic parameters
296 public DmaapDmePublisherWrapper(BusTopicParams busTopicParams) {
298 super(ProtocolTypeConstants.DME2, busTopicParams.getServers(), busTopicParams.getTopic(),
299 busTopicParams.getUserName(), busTopicParams.getPassword(), busTopicParams.isUseHttps());
301 String dme2RouteOffer = busTopicParams.isAdditionalPropsValid()
302 ? busTopicParams.getAdditionalProps().get(
303 PolicyEndPointProperties.DME2_ROUTE_OFFER_PROPERTY)
306 validateParams(busTopicParams, dme2RouteOffer);
308 String serviceName = busTopicParams.getServers().get(0);
310 /* These are required, no defaults */
311 props.setProperty("Environment", busTopicParams.getEnvironment());
312 props.setProperty("AFT_ENVIRONMENT", busTopicParams.getAftEnvironment());
314 props.setProperty(PolicyEndPointProperties.DME2_SERVICE_NAME_PROPERTY, serviceName);
316 if (busTopicParams.getPartner() != null) {
317 props.setProperty("Partner", busTopicParams.getPartner());
319 if (dme2RouteOffer != null) {
320 props.setProperty(PolicyEndPointProperties.DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer);
323 props.setProperty("Latitude", busTopicParams.getLatitude());
324 props.setProperty("Longitude", busTopicParams.getLongitude());
326 // ServiceName also a default, found in additionalProps
328 /* These are optional, will default to these values if not set in optionalProps */
329 props.setProperty("AFT_DME2_EP_READ_TIMEOUT_MS", "50000");
330 props.setProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", "240000");
331 props.setProperty("AFT_DME2_EP_CONN_TIMEOUT", "15000");
332 props.setProperty("Version", "1.0");
333 props.setProperty("SubContextPath", "/");
334 props.setProperty("sessionstickinessrequired", "no");
336 /* These should not change */
337 props.setProperty("TransportType", "DME2");
338 props.setProperty("MethodType", "POST");
340 if (busTopicParams.isAdditionalPropsValid()) {
341 addAdditionalProps(busTopicParams);
344 this.publisher.setProps(props);
347 private void validateParams(BusTopicParams busTopicParams, String dme2RouteOffer) {
348 if (busTopicParams.isEnvironmentInvalid()) {
349 throw parmException(busTopicParams.getTopic(),
350 PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX);
352 if (busTopicParams.isAftEnvironmentInvalid()) {
353 throw parmException(busTopicParams.getTopic(),
354 PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX);
356 if (busTopicParams.isLatitudeInvalid()) {
357 throw parmException(busTopicParams.getTopic(),
358 PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX);
360 if (busTopicParams.isLongitudeInvalid()) {
361 throw parmException(busTopicParams.getTopic(),
362 PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX);
365 if ((busTopicParams.isPartnerInvalid())
366 && StringUtils.isBlank(dme2RouteOffer)) {
367 throw new IllegalArgumentException(
368 "Must provide at least " + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
369 + busTopicParams.getTopic()
370 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX + " or "
371 + PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + busTopicParams.getTopic()
372 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX + " for DME2");
376 private void addAdditionalProps(BusTopicParams busTopicParams) {
377 for (Map.Entry<String, String> entry : busTopicParams.getAdditionalProps().entrySet()) {
378 String key = entry.getKey();
379 String value = entry.getValue();
382 props.setProperty(key, value);
387 private IllegalArgumentException parmException(String topic, String propnm) {
388 return new IllegalArgumentException("Missing " + PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
389 + topic + propnm + " property for DME2 in DMaaP");