2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017-2021 AT&T Intellectual Property. All rights reserved.
6 * Modifications Copyright (C) 2018 Samsung Electronics Co., Ltd.
7 * Modifications Copyright (C) 2020,2023 Bell Canada. All rights reserved.
8 * Modifications Copyright (C) 2022-2024 Nordix Foundation.
9 * ================================================================================
10 * Licensed under the Apache License, Version 2.0 (the "License");
11 * you may not use this file except in compliance with the License.
12 * You may obtain a copy of the License at
14 * http://www.apache.org/licenses/LICENSE-2.0
16 * Unless required by applicable law or agreed to in writing, software
17 * distributed under the License is distributed on an "AS IS" BASIS,
18 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19 * See the License for the specific language governing permissions and
20 * limitations under the License.
21 * ============LICENSE_END=========================================================
24 package org.onap.policy.common.endpoints.event.comm.bus.internal;
26 import com.att.nsa.apiClient.http.HttpClient.ConnectionType;
27 import com.att.nsa.cambria.client.CambriaBatchingPublisher;
28 import com.att.nsa.cambria.client.CambriaClientBuilders;
29 import io.opentelemetry.instrumentation.kafkaclients.v2_6.TracingProducerInterceptor;
30 import java.net.MalformedURLException;
31 import java.security.GeneralSecurityException;
32 import java.util.Properties;
33 import java.util.UUID;
34 import org.apache.kafka.clients.producer.KafkaProducer;
35 import org.apache.kafka.clients.producer.Producer;
36 import org.apache.kafka.clients.producer.ProducerConfig;
37 import org.apache.kafka.clients.producer.ProducerRecord;
38 import org.onap.policy.common.gson.annotation.GsonJsonIgnore;
39 import org.slf4j.Logger;
40 import org.slf4j.LoggerFactory;
// Contract for publishing messages to a message bus. The Cambria and Kafka wrappers
// declared further down in this listing both implement it.
42 public interface BusPublisher {
// Text of the IllegalArgumentException the send() implementations throw for a null message.
44 String NO_MESSAGE_PROVIDED = "No message provided";
// SLF4J format string logged at INFO by the close() implementations; {} receives the publisher.
45 String LOG_CLOSE = "{}: CLOSE";
// SLF4J format string for a failed close.
// NOTE(review): not referenced in the visible lines - both close() implementations spell out
// their own "CLOSE FAILED because of" text; confirm whether this constant is used elsewhere.
46 String LOG_CLOSE_FAILED = "{}: CLOSE FAILED";
// Publishes one message. Both implementations in this listing reject a null message with
// IllegalArgumentException(NO_MESSAGE_PROVIDED) and log a warning, instead of propagating,
// when the underlying client throws.
// NOTE(review): only the Cambria implementation forwards partitionId; the Kafka one never
// reads it and keys each record with a random UUID.
51 * @param partitionId id
52 * @param message the message
53 * @return true if success, false otherwise
54 * @throws IllegalArgumentException if no message provided
56 boolean send(String partitionId, String message);
59 * Closes the publisher.
64 * Cambria based library publisher.
// Adapter that exposes a CambriaBatchingPublisher through the BusPublisher interface.
66 class CambriaPublisherWrapper implements BusPublisher {
// Class-scoped SLF4J logger used by send() and close().
68 private static final Logger logger = LoggerFactory.getLogger(CambriaPublisherWrapper.class);
71 * The actual Cambria publisher.
// Assigned from builder.build() in the constructor; send() and close() delegate to it.
// NOTE(review): this listing omits at least one line directly above the field (possibly an
// annotation) - check the full source before editing the declaration.
74 protected CambriaBatchingPublisher publisher;
// Configures and builds the Cambria publisher from the supplied topic parameters.
// A MalformedURLException or GeneralSecurityException from build() is rethrown as an
// IllegalArgumentException that wraps the original exception.
79 * @param busTopicParams topic parameters
81 public CambriaPublisherWrapper(BusTopicParams busTopicParams) {
83 var builder = new CambriaClientBuilders.PublisherBuilder();
// Target hosts and topic come straight from the parameters.
85 builder.usingHosts(busTopicParams.getServers()).onTopic(busTopicParams.getTopic());
87 // Set read timeout to 30 seconds (TBD: this should be configurable)
88 builder.withSocketTimeout(30000);
// A connection type is set only when HTTPS is requested. HTTPS_NO_VALIDATION is selected when
// self-signed certificates are allowed; the HTTPS line after it is presumably the else branch
// (the line separating the two calls is not shown in this listing).
90 if (busTopicParams.isUseHttps()) {
91 if (busTopicParams.isAllowSelfSignedCerts()) {
92 builder.withConnectionType(ConnectionType.HTTPS_NO_VALIDATION);
94 builder.withConnectionType(ConnectionType.HTTPS);
// API key/secret authentication, applied only when both values pass the params' validity checks.
98 if (busTopicParams.isApiKeyValid() && busTopicParams.isApiSecretValid()) {
99 builder.authenticatedBy(busTopicParams.getApiKey(), busTopicParams.getApiSecret());
// HTTP user name/password authentication, applied only when both values are valid.
// This check appears to be independent of the API key check above - TODO confirm against the
// full source, since the closing braces are not shown here.
102 if (busTopicParams.isUserNameValid() && busTopicParams.isPasswordValid()) {
103 builder.authenticatedByHttp(busTopicParams.getUserName(), busTopicParams.getPassword());
107 this.publisher = builder.build();
108 } catch (MalformedURLException | GeneralSecurityException e) {
// Report the configuration problem as an unchecked exception while keeping the cause.
109 throw new IllegalArgumentException(e);
// Forwards partitionId and message to CambriaBatchingPublisher.send.
// A null message is rejected with IllegalArgumentException(NO_MESSAGE_PROVIDED).
114 public boolean send(String partitionId, String message) {
115 if (message == null) {
116 throw new IllegalArgumentException(NO_MESSAGE_PROVIDED);
120 this.publisher.send(partitionId, message);
// Any exception from the client is logged at WARN rather than propagated. The log entry
// contains the full message payload; e is passed as the trailing argument after the three
// placeholder values.
121 } catch (Exception e) {
122 logger.warn("{}: SEND of {} cannot be performed because of {}", this, message, e.getMessage(), e);
// NOTE(review): the return statements are not visible in this listing; per the interface doc
// the method reports true on success and false otherwise.
// Closes the wrapped Cambria publisher. The attempt is logged at INFO first; an exception
// thrown by publisher.close() is logged at WARN and not rethrown.
129 public void close() {
130 logger.info(LOG_CLOSE, this);
133 this.publisher.close();
134 } catch (Exception e) {
// Uses its own format string rather than the LOG_CLOSE_FAILED constant.
135 logger.warn("{}: CLOSE FAILED because of {}", this, e.getMessage(), e);
// Returns a fixed label with no instance state. This is the text that fills the leading {}
// placeholder when this object is passed to the logger in send() and close().
140 public String toString() {
141 return "CambriaPublisherWrapper []";
147 * Kafka based library publisher.
// Adapter that exposes a Kafka Producer<String, String> through the BusPublisher interface.
149 class KafkaPublisherWrapper implements BusPublisher {
// Class-scoped SLF4J logger used by send() and close().
151 private static final Logger logger = LoggerFactory.getLogger(KafkaPublisherWrapper.class);
// Fully-qualified StringSerializer class name. Despite the KEY_ prefix, the constructor uses
// it as the default for both the key and the value serializer.
152 private static final String KEY_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer";
// Topic every record is published to; copied from the topic parameters in the constructor.
154 private final String topic;
// Underlying Kafka producer, created once in the constructor from kafkaProps.
159 private final Producer<String, String> producer;
// Producer configuration assembled in the constructor.
160 protected Properties kafkaProps;
163 * Kafka Publisher Wrapper.
165 * @param busTopicParams topic parameters
// Throws IllegalArgumentException("No topic for Kafka") when the parameters report an invalid
// topic; otherwise assembles the producer configuration and creates the KafkaProducer.
167 protected KafkaPublisherWrapper(BusTopicParams busTopicParams) {
169 if (busTopicParams.isTopicInvalid()) {
170 throw new IllegalArgumentException("No topic for Kafka");
173 this.topic = busTopicParams.getTopic();
175 // Set up the properties for the producer
176 kafkaProps = new Properties();
// NOTE(review): only the first entry of getServers() becomes the bootstrap server value; any
// further entries are ignored, and no emptiness check on the server list is visible before
// get(0) - confirm both are intended.
177 kafkaProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, busTopicParams.getServers().get(0));
// Additional properties are merged after the bootstrap server, so they can override it.
178 if (busTopicParams.isAdditionalPropsValid()) {
179 kafkaProps.putAll(busTopicParams.getAdditionalProps());
// Default both serializers to StringSerializer unless a value is already present.
181 if (kafkaProps.get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG) == null) {
182 kafkaProps.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KEY_SERIALIZER);
184 if (kafkaProps.get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG) == null) {
185 kafkaProps.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KEY_SERIALIZER);
// When tracing is enabled the OpenTelemetry producer interceptor is installed.
// NOTE(review): setProperty replaces any interceptor list that arrived through the additional
// properties - confirm that overwriting is acceptable.
188 if (busTopicParams.isAllowTracing()) {
189 kafkaProps.setProperty(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
190 TracingProducerInterceptor.class.getName());
193 producer = new KafkaProducer<>(kafkaProps);
// Publishes message to the configured topic as a single ProducerRecord.
// A null message is rejected with IllegalArgumentException(NO_MESSAGE_PROVIDED).
// NOTE(review): partitionId is never read here; each record is keyed with a fresh random UUID.
197 public boolean send(String partitionId, String message) {
198 if (message == null) {
199 throw new IllegalArgumentException(NO_MESSAGE_PROVIDED);
204 ProducerRecord<String, String> producerRecord =
205 new ProducerRecord<>(topic, UUID.randomUUID().toString(), message);
// NOTE(review): Producer.send is asynchronous and its returned Future is discarded, so delivery
// failures reported through that Future are presumably not seen by the catch below. This
// listing also omits a line between this call and the catch (possibly a flush) - check the
// full source.
207 this.producer.send(producerRecord);
209 } catch (Exception e) {
// Failures thrown synchronously are logged at WARN, including the full message payload.
210 logger.warn("{}: SEND of {} cannot be performed because of {}", this, message, e.getMessage(), e);
// NOTE(review): the return statements are not visible in this listing; per the interface doc
// the method reports true on success and false otherwise.
// Closes the wrapped Kafka producer. The attempt is logged at INFO first; an exception thrown
// by producer.close() is logged at WARN and not rethrown.
217 public void close() {
218 logger.info(LOG_CLOSE, this);
221 this.producer.close();
222 } catch (Exception e) {
// Uses its own format string rather than the LOG_CLOSE_FAILED constant.
223 logger.warn("{}: CLOSE FAILED because of {}", this, e.getMessage(), e);
// Returns a fixed label with no instance state. This is the text that fills the leading {}
// placeholder when this object is passed to the logger in send() and close().
228 public String toString() {
229 return "KafkaPublisherWrapper []";