/*-
 * ============LICENSE_START=======================================================
 * ================================================================================
 * Copyright (C) 2017-2021 AT&T Intellectual Property. All rights reserved.
 * Modifications Copyright (C) 2018 Samsung Electronics Co., Ltd.
 * Modifications Copyright (C) 2020,2023 Bell Canada. All rights reserved.
 * Modifications Copyright (C) 2022-2023 Nordix Foundation.
 * ================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * ============LICENSE_END=========================================================
 */
24 package org.onap.policy.common.endpoints.event.comm.bus.internal;
26 import com.att.nsa.apiClient.http.HttpClient.ConnectionType;
27 import com.att.nsa.cambria.client.CambriaBatchingPublisher;
28 import com.att.nsa.cambria.client.CambriaClientBuilders;
29 import io.opentelemetry.instrumentation.kafkaclients.v2_6.TracingProducerInterceptor;
30 import java.net.MalformedURLException;
31 import java.security.GeneralSecurityException;
32 import java.util.ArrayList;
33 import java.util.List;
35 import java.util.Properties;
36 import java.util.UUID;
37 import java.util.concurrent.TimeUnit;
38 import org.apache.commons.lang3.StringUtils;
39 import org.apache.kafka.clients.producer.KafkaProducer;
40 import org.apache.kafka.clients.producer.Producer;
41 import org.apache.kafka.clients.producer.ProducerConfig;
42 import org.apache.kafka.clients.producer.ProducerRecord;
43 import org.onap.dmaap.mr.client.impl.MRSimplerBatchPublisher;
44 import org.onap.dmaap.mr.client.response.MRPublisherResponse;
45 import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants;
46 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
47 import org.onap.policy.common.gson.annotation.GsonJsonIgnore;
48 import org.slf4j.Logger;
49 import org.slf4j.LoggerFactory;
51 public interface BusPublisher {
/** Exception text used by the publishers in this file when {@code send} receives a {@code null} message. */
String NO_MESSAGE_PROVIDED = "No message provided";
/** Log format written when a publisher starts closing; the placeholder is the publisher itself. */
String LOG_CLOSE = "{}: CLOSE";
/** Log format written when closing a publisher fails; the placeholder is the publisher itself. */
String LOG_CLOSE_FAILED = "{}: CLOSE FAILED";
/**
 * Sends a message.
 *
 * @param partitionId id
 * @param message the message
 * @return true if success, false otherwise
 * @throws IllegalArgumentException if no message provided
 */
boolean send(String partitionId, String message);
68 * closes the publisher.
73 * Cambria based library publisher.
75 class CambriaPublisherWrapper implements BusPublisher {
77 private static final Logger logger = LoggerFactory.getLogger(CambriaPublisherWrapper.class);
80 * The actual Cambria publisher.
83 protected CambriaBatchingPublisher publisher;
88 * @param busTopicParams topic parameters
90 public CambriaPublisherWrapper(BusTopicParams busTopicParams) {
92 var builder = new CambriaClientBuilders.PublisherBuilder();
94 builder.usingHosts(busTopicParams.getServers()).onTopic(busTopicParams.getTopic());
96 // Set read timeout to 30 seconds (TBD: this should be configurable)
97 builder.withSocketTimeout(30000);
99 if (busTopicParams.isUseHttps()) {
100 if (busTopicParams.isAllowSelfSignedCerts()) {
101 builder.withConnectionType(ConnectionType.HTTPS_NO_VALIDATION);
103 builder.withConnectionType(ConnectionType.HTTPS);
107 if (busTopicParams.isApiKeyValid() && busTopicParams.isApiSecretValid()) {
108 builder.authenticatedBy(busTopicParams.getApiKey(), busTopicParams.getApiSecret());
111 if (busTopicParams.isUserNameValid() && busTopicParams.isPasswordValid()) {
112 builder.authenticatedByHttp(busTopicParams.getUserName(), busTopicParams.getPassword());
116 this.publisher = builder.build();
117 } catch (MalformedURLException | GeneralSecurityException e) {
118 throw new IllegalArgumentException(e);
123 public boolean send(String partitionId, String message) {
124 if (message == null) {
125 throw new IllegalArgumentException(NO_MESSAGE_PROVIDED);
129 this.publisher.send(partitionId, message);
130 } catch (Exception e) {
131 logger.warn("{}: SEND of {} cannot be performed because of {}", this, message, e.getMessage(), e);
138 public void close() {
139 logger.info(LOG_CLOSE, this);
142 this.publisher.close();
143 } catch (Exception e) {
144 logger.warn("{}: CLOSE FAILED because of {}", this, e.getMessage(), e);
149 public String toString() {
150 return "CambriaPublisherWrapper []";
156 * Kafka based library publisher.
158 class KafkaPublisherWrapper implements BusPublisher {
160 private static final Logger logger = LoggerFactory.getLogger(KafkaPublisherWrapper.class);
161 private static final String KEY_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer";
163 private final String topic;
168 private final Producer<String, String> producer;
169 protected Properties kafkaProps;
172 * Kafka Publisher Wrapper.
174 * @param busTopicParams topic parameters
176 protected KafkaPublisherWrapper(BusTopicParams busTopicParams) {
178 if (busTopicParams.isTopicInvalid()) {
179 throw new IllegalArgumentException("No topic for Kafka");
182 this.topic = busTopicParams.getTopic();
184 // Setup Properties for consumer
185 kafkaProps = new Properties();
186 kafkaProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, busTopicParams.getServers().get(0));
187 if (busTopicParams.isAdditionalPropsValid()) {
188 kafkaProps.putAll(busTopicParams.getAdditionalProps());
190 if (kafkaProps.get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG) == null) {
191 kafkaProps.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KEY_SERIALIZER);
193 if (kafkaProps.get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG) == null) {
194 kafkaProps.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KEY_SERIALIZER);
197 if (busTopicParams.isAllowTracing()) {
198 kafkaProps.setProperty(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
199 TracingProducerInterceptor.class.getName());
202 producer = new KafkaProducer<>(kafkaProps);
206 public boolean send(String partitionId, String message) {
207 if (message == null) {
208 throw new IllegalArgumentException(NO_MESSAGE_PROVIDED);
213 ProducerRecord<String, String> producerRecord =
214 new ProducerRecord<>(topic, UUID.randomUUID().toString(), message);
216 this.producer.send(producerRecord);
218 } catch (Exception e) {
219 logger.warn("{}: SEND of {} cannot be performed because of {}", this, message, e.getMessage(), e);
226 public void close() {
227 logger.info(LOG_CLOSE, this);
230 this.producer.close();
231 } catch (Exception e) {
232 logger.warn("{}: CLOSE FAILED because of {}", this, e.getMessage(), e);
237 public String toString() {
238 return "KafkaPublisherWrapper []";
244 * DmaapClient library wrapper.
246 abstract class DmaapPublisherWrapper implements BusPublisher {
248 private static final Logger logger = LoggerFactory.getLogger(DmaapPublisherWrapper.class);
251 * MR based Publisher.
253 protected MRSimplerBatchPublisher publisher;
254 protected Properties props;
257 * MR Publisher Wrapper.
259 * @param servers messaging bus hosts
261 * @param username AAF or DME2 Login
262 * @param password AAF or DME2 Password
264 protected DmaapPublisherWrapper(ProtocolTypeConstants protocol, List<String> servers, String topic,
265 String username, String password, boolean useHttps) {
267 if (StringUtils.isBlank(topic)) {
268 throw new IllegalArgumentException("No topic for DMaaP");
271 configureProtocol(topic, protocol, servers, useHttps);
273 this.publisher.logTo(LoggerFactory.getLogger(MRSimplerBatchPublisher.class.getName()));
275 this.publisher.setUsername(username);
276 this.publisher.setPassword(password);
278 props = new Properties();
280 props.setProperty("Protocol", (useHttps ? "https" : "http"));
281 props.setProperty("contenttype", "application/json");
282 props.setProperty("username", username);
283 props.setProperty("password", password);
285 props.setProperty("topic", topic);
287 this.publisher.setProps(props);
289 if (protocol == ProtocolTypeConstants.AAF_AUTH) {
290 this.publisher.setHost(servers.get(0));
293 logger.info("{}: CREATION: using protocol {}", this, protocol.getValue());
296 private void configureProtocol(String topic, ProtocolTypeConstants protocol, List<String> servers,
299 if (protocol == ProtocolTypeConstants.AAF_AUTH) {
300 if (servers == null || servers.isEmpty()) {
301 throw new IllegalArgumentException("No DMaaP servers or DME2 partner provided");
304 ArrayList<String> dmaapServers = new ArrayList<>();
305 String port = useHttps ? ":3905" : ":3904";
306 for (String server : servers) {
307 dmaapServers.add(server + port);
310 this.publisher = new MRSimplerBatchPublisher.Builder().againstUrls(dmaapServers).onTopic(topic).build();
312 this.publisher.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue());
314 } else if (protocol == ProtocolTypeConstants.DME2) {
315 ArrayList<String> dmaapServers = new ArrayList<>();
316 dmaapServers.add("0.0.0.0:3904");
318 this.publisher = new MRSimplerBatchPublisher.Builder().againstUrls(dmaapServers).onTopic(topic).build();
320 this.publisher.setProtocolFlag(ProtocolTypeConstants.DME2.getValue());
323 throw new IllegalArgumentException("Invalid DMaaP protocol " + protocol);
328 public void close() {
329 logger.info(LOG_CLOSE, this);
332 this.publisher.close(1, TimeUnit.SECONDS);
334 } catch (InterruptedException e) {
335 logger.warn(LOG_CLOSE_FAILED, this, e);
336 Thread.currentThread().interrupt();
338 } catch (Exception e) {
339 logger.warn(LOG_CLOSE_FAILED, this, e);
344 public boolean send(String partitionId, String message) {
345 if (message == null) {
346 throw new IllegalArgumentException(NO_MESSAGE_PROVIDED);
349 this.publisher.setPubResponse(new MRPublisherResponse());
350 this.publisher.send(partitionId, message);
351 MRPublisherResponse response = this.publisher.sendBatchWithResponse();
352 if (response != null) {
353 logger.debug("DMaaP publisher received {} : {}", response.getResponseCode(),
354 response.getResponseMessage());
361 public String toString() {
362 return "DmaapPublisherWrapper [" + "publisher.getAuthDate()=" + publisher.getAuthDate()
363 + ", publisher.getAuthKey()=" + publisher.getAuthKey() + ", publisher.getHost()="
364 + publisher.getHost() + ", publisher.getProtocolFlag()=" + publisher.getProtocolFlag()
365 + ", publisher.getUsername()=" + publisher.getUsername() + "]";
370 * DmaapClient library wrapper.
372 class DmaapAafPublisherWrapper extends DmaapPublisherWrapper {
374 * MR based Publisher.
376 public DmaapAafPublisherWrapper(List<String> servers, String topic, String aafLogin, String aafPassword,
379 super(ProtocolTypeConstants.AAF_AUTH, servers, topic, aafLogin, aafPassword, useHttps);
383 class DmaapDmePublisherWrapper extends DmaapPublisherWrapper {
388 * @param busTopicParams topic parameters
390 public DmaapDmePublisherWrapper(BusTopicParams busTopicParams) {
392 super(ProtocolTypeConstants.DME2, busTopicParams.getServers(), busTopicParams.getTopic(),
393 busTopicParams.getUserName(), busTopicParams.getPassword(), busTopicParams.isUseHttps());
395 String dme2RouteOffer = busTopicParams.isAdditionalPropsValid()
396 ? busTopicParams.getAdditionalProps().get(PolicyEndPointProperties.DME2_ROUTE_OFFER_PROPERTY)
399 validateParams(busTopicParams, dme2RouteOffer);
401 String serviceName = busTopicParams.getServers().get(0);
403 /* These are required, no defaults */
404 props.setProperty(PolicyEndPointProperties.DME2_SERVICE_NAME_PROPERTY, serviceName);
406 BusHelper.setCommonProperties(busTopicParams, dme2RouteOffer, props);
408 props.setProperty("MethodType", "POST");
410 if (busTopicParams.isAdditionalPropsValid()) {
411 addAdditionalProps(busTopicParams);
414 this.publisher.setProps(props);
417 private void validateParams(BusTopicParams busTopicParams, String dme2RouteOffer) {
418 BusHelper.validateBusTopicParams(busTopicParams, PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS);
420 if ((busTopicParams.isPartnerInvalid()) && StringUtils.isBlank(dme2RouteOffer)) {
421 throw new IllegalArgumentException("Must provide at least "
422 + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + busTopicParams.getTopic()
423 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX + " or "
424 + PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + busTopicParams.getTopic()
425 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX + " for DME2");
429 private void addAdditionalProps(BusTopicParams busTopicParams) {
430 for (Map.Entry<String, String> entry : busTopicParams.getAdditionalProps().entrySet()) {
431 String key = entry.getKey();
432 String value = entry.getValue();
435 props.setProperty(key, value);