2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017-2021 AT&T Intellectual Property. All rights reserved.
6 * Modifications Copyright (C) 2018 Samsung Electronics Co., Ltd.
7 * Modifications Copyright (C) 2020 Bell Canada. All rights reserved.
8 * Copyright (C) 2022 Nordix Foundation.
9 * ================================================================================
10 * Licensed under the Apache License, Version 2.0 (the "License");
11 * you may not use this file except in compliance with the License.
12 * You may obtain a copy of the License at
14 * http://www.apache.org/licenses/LICENSE-2.0
16 * Unless required by applicable law or agreed to in writing, software
17 * distributed under the License is distributed on an "AS IS" BASIS,
18 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19 * See the License for the specific language governing permissions and
20 * limitations under the License.
21 * ============LICENSE_END=========================================================
24 package org.onap.policy.common.endpoints.event.comm.bus.internal;
26 import com.att.nsa.apiClient.http.HttpClient.ConnectionType;
27 import com.att.nsa.cambria.client.CambriaBatchingPublisher;
28 import com.att.nsa.cambria.client.CambriaClientBuilders;
29 import java.net.MalformedURLException;
30 import java.security.GeneralSecurityException;
31 import java.util.ArrayList;
32 import java.util.List;
34 import java.util.Properties;
35 import java.util.UUID;
36 import java.util.concurrent.TimeUnit;
37 import org.apache.commons.lang3.StringUtils;
38 import org.apache.kafka.clients.consumer.ConsumerConfig;
39 import org.apache.kafka.clients.producer.KafkaProducer;
40 import org.apache.kafka.clients.producer.Producer;
41 import org.apache.kafka.clients.producer.ProducerConfig;
42 import org.apache.kafka.clients.producer.ProducerRecord;
43 import org.onap.dmaap.mr.client.impl.MRSimplerBatchPublisher;
44 import org.onap.dmaap.mr.client.response.MRPublisherResponse;
45 import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants;
46 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
47 import org.onap.policy.common.gson.annotation.GsonJsonIgnore;
48 import org.slf4j.Logger;
49 import org.slf4j.LoggerFactory;
51 public interface BusPublisher {
56 * @param partitionId id
57 * @param message the message
58 * @return true if success, false otherwise
59 * @throws IllegalArgumentException if no message provided
61 public boolean send(String partitionId, String message);
     * Closes the publisher.
69 * Cambria based library publisher.
71 public static class CambriaPublisherWrapper implements BusPublisher {
73 private static Logger logger = LoggerFactory.getLogger(CambriaPublisherWrapper.class);
76 * The actual Cambria publisher.
79 protected CambriaBatchingPublisher publisher;
84 * @param busTopicParams topic parameters
86 public CambriaPublisherWrapper(BusTopicParams busTopicParams) {
88 var builder = new CambriaClientBuilders.PublisherBuilder();
90 builder.usingHosts(busTopicParams.getServers()).onTopic(busTopicParams.getTopic());
92 // Set read timeout to 30 seconds (TBD: this should be configurable)
93 builder.withSocketTimeout(30000);
95 if (busTopicParams.isUseHttps()) {
96 if (busTopicParams.isAllowSelfSignedCerts()) {
97 builder.withConnectionType(ConnectionType.HTTPS_NO_VALIDATION);
99 builder.withConnectionType(ConnectionType.HTTPS);
104 if (busTopicParams.isApiKeyValid() && busTopicParams.isApiSecretValid()) {
105 builder.authenticatedBy(busTopicParams.getApiKey(), busTopicParams.getApiSecret());
108 if (busTopicParams.isUserNameValid() && busTopicParams.isPasswordValid()) {
109 builder.authenticatedByHttp(busTopicParams.getUserName(), busTopicParams.getPassword());
113 this.publisher = builder.build();
114 } catch (MalformedURLException | GeneralSecurityException e) {
115 throw new IllegalArgumentException(e);
120 public boolean send(String partitionId, String message) {
121 if (message == null) {
122 throw new IllegalArgumentException("No message provided");
126 this.publisher.send(partitionId, message);
127 } catch (Exception e) {
128 logger.warn("{}: SEND of {} cannot be performed because of {}", this, message, e.getMessage(), e);
135 public void close() {
136 logger.info("{}: CLOSE", this);
139 this.publisher.close();
140 } catch (Exception e) {
141 logger.warn("{}: CLOSE FAILED because of {}", this, e.getMessage(), e);
147 public String toString() {
148 return "CambriaPublisherWrapper []";
154 * Kafka based library publisher.
156 public static class KafkaPublisherWrapper implements BusPublisher {
158 private static Logger logger = LoggerFactory.getLogger(KafkaPublisherWrapper.class);
159 private static final String KEY_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer";
161 private String topic;
166 private Producer<String, String> producer;
167 protected Properties kafkaProps;
170 * Kafka Publisher Wrapper.
172 * @param busTopicParams topic parameters
174 protected KafkaPublisherWrapper(BusTopicParams busTopicParams) {
176 if (busTopicParams.isTopicInvalid()) {
177 throw new IllegalArgumentException("No topic for Kafka");
180 this.topic = busTopicParams.getTopic();
182 //Setup Properties for consumer
183 kafkaProps = new Properties();
184 kafkaProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, busTopicParams.getServers().get(0));
185 if (busTopicParams.isAdditionalPropsValid()) {
186 for (Map.Entry<String, String> entry : busTopicParams.getAdditionalProps().entrySet()) {
187 kafkaProps.put(entry.getKey(), entry.getValue());
190 if (kafkaProps.get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG) == null) {
191 kafkaProps.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KEY_SERIALIZER);
193 if (kafkaProps.get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG) == null) {
194 kafkaProps.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KEY_SERIALIZER);
197 producer = new KafkaProducer<>(kafkaProps);
201 public boolean send(String partitionId, String message) {
202 if (message == null) {
203 throw new IllegalArgumentException("No message provided");
208 ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic,
209 UUID.randomUUID().toString(), message);
211 this.producer.send(record);
213 } catch (Exception e) {
214 logger.warn("{}: SEND of {} cannot be performed because of {}", this, message, e.getMessage(), e);
221 public void close() {
222 logger.info("{}: CLOSE", this);
225 this.producer.close();
226 } catch (Exception e) {
227 logger.warn("{}: CLOSE FAILED because of {}", this, e.getMessage(), e);
233 public String toString() {
234 return "KafkaPublisherWrapper []";
240 * DmaapClient library wrapper.
242 public abstract class DmaapPublisherWrapper implements BusPublisher {
244 private static Logger logger = LoggerFactory.getLogger(DmaapPublisherWrapper.class);
247 * MR based Publisher.
249 protected MRSimplerBatchPublisher publisher;
250 protected Properties props;
253 * MR Publisher Wrapper.
255 * @param servers messaging bus hosts
257 * @param username AAF or DME2 Login
258 * @param password AAF or DME2 Password
260 protected DmaapPublisherWrapper(ProtocolTypeConstants protocol, List<String> servers, String topic,
261 String username, String password, boolean useHttps) {
264 if (StringUtils.isBlank(topic)) {
265 throw new IllegalArgumentException("No topic for DMaaP");
269 configureProtocol(topic, protocol, servers, useHttps);
271 this.publisher.logTo(LoggerFactory.getLogger(MRSimplerBatchPublisher.class.getName()));
273 this.publisher.setUsername(username);
274 this.publisher.setPassword(password);
276 props = new Properties();
278 props.setProperty("Protocol", (useHttps ? "https" : "http"));
279 props.setProperty("contenttype", "application/json");
280 props.setProperty("username", username);
281 props.setProperty("password", password);
283 props.setProperty("topic", topic);
285 this.publisher.setProps(props);
287 if (protocol == ProtocolTypeConstants.AAF_AUTH) {
288 this.publisher.setHost(servers.get(0));
291 logger.info("{}: CREATION: using protocol {}", this, protocol.getValue());
294 private void configureProtocol(String topic, ProtocolTypeConstants protocol, List<String> servers,
297 if (protocol == ProtocolTypeConstants.AAF_AUTH) {
298 if (servers == null || servers.isEmpty()) {
299 throw new IllegalArgumentException("No DMaaP servers or DME2 partner provided");
302 ArrayList<String> dmaapServers = new ArrayList<>();
303 String port = useHttps ? ":3905" : ":3904";
304 for (String server : servers) {
305 dmaapServers.add(server + port);
309 this.publisher = new MRSimplerBatchPublisher.Builder().againstUrls(dmaapServers).onTopic(topic).build();
311 this.publisher.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue());
313 } else if (protocol == ProtocolTypeConstants.DME2) {
314 ArrayList<String> dmaapServers = new ArrayList<>();
315 dmaapServers.add("0.0.0.0:3904");
317 this.publisher = new MRSimplerBatchPublisher.Builder().againstUrls(dmaapServers).onTopic(topic).build();
319 this.publisher.setProtocolFlag(ProtocolTypeConstants.DME2.getValue());
322 throw new IllegalArgumentException("Invalid DMaaP protocol " + protocol);
327 public void close() {
328 logger.info("{}: CLOSE", this);
331 this.publisher.close(1, TimeUnit.SECONDS);
333 } catch (InterruptedException e) {
334 logger.warn("{}: CLOSE FAILED", this, e);
335 Thread.currentThread().interrupt();
337 } catch (Exception e) {
338 logger.warn("{}: CLOSE FAILED", this, e);
343 public boolean send(String partitionId, String message) {
344 if (message == null) {
345 throw new IllegalArgumentException("No message provided");
348 this.publisher.setPubResponse(new MRPublisherResponse());
349 this.publisher.send(partitionId, message);
350 MRPublisherResponse response = this.publisher.sendBatchWithResponse();
351 if (response != null) {
352 logger.debug("DMaaP publisher received {} : {}", response.getResponseCode(),
353 response.getResponseMessage());
360 public String toString() {
361 return "DmaapPublisherWrapper [" + "publisher.getAuthDate()=" + publisher.getAuthDate()
362 + ", publisher.getAuthKey()=" + publisher.getAuthKey() + ", publisher.getHost()="
363 + publisher.getHost() + ", publisher.getProtocolFlag()=" + publisher.getProtocolFlag()
364 + ", publisher.getUsername()=" + publisher.getUsername() + "]";
369 * DmaapClient library wrapper.
371 public static class DmaapAafPublisherWrapper extends DmaapPublisherWrapper {
373 * MR based Publisher.
375 public DmaapAafPublisherWrapper(List<String> servers, String topic, String aafLogin, String aafPassword,
378 super(ProtocolTypeConstants.AAF_AUTH, servers, topic, aafLogin, aafPassword, useHttps);
382 public static class DmaapDmePublisherWrapper extends DmaapPublisherWrapper {
387 * @param busTopicParams topic parameters
389 public DmaapDmePublisherWrapper(BusTopicParams busTopicParams) {
391 super(ProtocolTypeConstants.DME2, busTopicParams.getServers(), busTopicParams.getTopic(),
392 busTopicParams.getUserName(), busTopicParams.getPassword(), busTopicParams.isUseHttps());
394 String dme2RouteOffer = busTopicParams.isAdditionalPropsValid()
395 ? busTopicParams.getAdditionalProps().get(
396 PolicyEndPointProperties.DME2_ROUTE_OFFER_PROPERTY)
399 validateParams(busTopicParams, dme2RouteOffer);
401 String serviceName = busTopicParams.getServers().get(0);
403 /* These are required, no defaults */
404 props.setProperty("Environment", busTopicParams.getEnvironment());
405 props.setProperty("AFT_ENVIRONMENT", busTopicParams.getAftEnvironment());
407 props.setProperty(PolicyEndPointProperties.DME2_SERVICE_NAME_PROPERTY, serviceName);
409 if (busTopicParams.getPartner() != null) {
410 props.setProperty("Partner", busTopicParams.getPartner());
412 if (dme2RouteOffer != null) {
413 props.setProperty(PolicyEndPointProperties.DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer);
416 props.setProperty("Latitude", busTopicParams.getLatitude());
417 props.setProperty("Longitude", busTopicParams.getLongitude());
419 // ServiceName also a default, found in additionalProps
421 /* These are optional, will default to these values if not set in optionalProps */
422 props.setProperty("AFT_DME2_EP_READ_TIMEOUT_MS", "50000");
423 props.setProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", "240000");
424 props.setProperty("AFT_DME2_EP_CONN_TIMEOUT", "15000");
425 props.setProperty("Version", "1.0");
426 props.setProperty("SubContextPath", "/");
427 props.setProperty("sessionstickinessrequired", "no");
429 /* These should not change */
430 props.setProperty("TransportType", "DME2");
431 props.setProperty("MethodType", "POST");
433 if (busTopicParams.isAdditionalPropsValid()) {
434 addAdditionalProps(busTopicParams);
437 this.publisher.setProps(props);
440 private void validateParams(BusTopicParams busTopicParams, String dme2RouteOffer) {
441 if (busTopicParams.isEnvironmentInvalid()) {
442 throw parmException(busTopicParams.getTopic(),
443 PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX);
445 if (busTopicParams.isAftEnvironmentInvalid()) {
446 throw parmException(busTopicParams.getTopic(),
447 PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX);
449 if (busTopicParams.isLatitudeInvalid()) {
450 throw parmException(busTopicParams.getTopic(),
451 PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX);
453 if (busTopicParams.isLongitudeInvalid()) {
454 throw parmException(busTopicParams.getTopic(),
455 PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX);
458 if ((busTopicParams.isPartnerInvalid())
459 && StringUtils.isBlank(dme2RouteOffer)) {
460 throw new IllegalArgumentException(
461 "Must provide at least " + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
462 + busTopicParams.getTopic()
463 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX + " or "
464 + PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + busTopicParams.getTopic()
465 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX + " for DME2");
469 private void addAdditionalProps(BusTopicParams busTopicParams) {
470 for (Map.Entry<String, String> entry : busTopicParams.getAdditionalProps().entrySet()) {
471 String key = entry.getKey();
472 String value = entry.getValue();
475 props.setProperty(key, value);
480 private IllegalArgumentException parmException(String topic, String propnm) {
481 return new IllegalArgumentException("Missing " + PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
482 + topic + propnm + " property for DME2 in DMaaP");