1b57e48ea3e125ec3c3dd50c14fabbe3fa382ca5
[policy/common.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2017-2021 AT&T Intellectual Property. All rights reserved.
6  * Modifications Copyright (C) 2018 Samsung Electronics Co., Ltd.
7  * Modifications Copyright (C) 2020,2023 Bell Canada. All rights reserved.
8  * Modifications Copyright (C) 2022-2024 Nordix Foundation.
9  * ================================================================================
10  * Licensed under the Apache License, Version 2.0 (the "License");
11  * you may not use this file except in compliance with the License.
12  * You may obtain a copy of the License at
13  *
14  *      http://www.apache.org/licenses/LICENSE-2.0
15  *
16  * Unless required by applicable law or agreed to in writing, software
17  * distributed under the License is distributed on an "AS IS" BASIS,
18  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19  * See the License for the specific language governing permissions and
20  * limitations under the License.
21  * ============LICENSE_END=========================================================
22  */
23
24 package org.onap.policy.common.endpoints.event.comm.bus.internal;
25
26 import io.opentelemetry.instrumentation.kafkaclients.v2_6.TracingProducerInterceptor;
27 import java.util.Properties;
28 import java.util.UUID;
29 import org.apache.kafka.clients.producer.KafkaProducer;
30 import org.apache.kafka.clients.producer.Producer;
31 import org.apache.kafka.clients.producer.ProducerConfig;
32 import org.apache.kafka.clients.producer.ProducerRecord;
33 import org.slf4j.Logger;
34 import org.slf4j.LoggerFactory;
35
36 public interface BusPublisher {
37
38     String NO_MESSAGE_PROVIDED = "No message provided";
39     String LOG_CLOSE = "{}: CLOSE";
40     String LOG_CLOSE_FAILED = "{}: CLOSE FAILED";
41
42     /**
43      * sends a message.
44      *
45      * @param partitionId id
46      * @param message the message
47      * @return true if success, false otherwise
48      * @throws IllegalArgumentException if no message provided
49      */
50     boolean send(String partitionId, String message);
51
52     /**
53      * closes the publisher.
54      */
55     void close();
56
57     /**
58      * Kafka based library publisher.
59      */
60     class KafkaPublisherWrapper implements BusPublisher {
61
62         private static final Logger logger = LoggerFactory.getLogger(KafkaPublisherWrapper.class);
63         private static final String KEY_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer";
64
65         private final String topic;
66
67         /**
68          * Kafka publisher.
69          */
70         private final Producer<String, String> producer;
71         protected Properties kafkaProps;
72
73         /**
74          * Kafka Publisher Wrapper.
75          *
76          * @param busTopicParams topic parameters
77          */
78         protected KafkaPublisherWrapper(BusTopicParams busTopicParams) {
79
80             if (busTopicParams.isTopicInvalid()) {
81                 throw new IllegalArgumentException("No topic for Kafka");
82             }
83
84             this.topic = busTopicParams.getTopic();
85
86             // Setup Properties for consumer
87             kafkaProps = new Properties();
88             kafkaProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, busTopicParams.getServers().get(0));
89             if (busTopicParams.isAdditionalPropsValid()) {
90                 kafkaProps.putAll(busTopicParams.getAdditionalProps());
91             }
92             if (kafkaProps.get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG) == null) {
93                 kafkaProps.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KEY_SERIALIZER);
94             }
95             if (kafkaProps.get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG) == null) {
96                 kafkaProps.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KEY_SERIALIZER);
97             }
98
99             if (busTopicParams.isAllowTracing()) {
100                 kafkaProps.setProperty(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
101                         TracingProducerInterceptor.class.getName());
102             }
103
104             producer = new KafkaProducer<>(kafkaProps);
105         }
106
107         @Override
108         public boolean send(String partitionId, String message) {
109             if (message == null) {
110                 throw new IllegalArgumentException(NO_MESSAGE_PROVIDED);
111             }
112
113             try {
114                 // Create the record
115                 ProducerRecord<String, String> producerRecord =
116                         new ProducerRecord<>(topic, UUID.randomUUID().toString(), message);
117
118                 this.producer.send(producerRecord);
119                 producer.flush();
120             } catch (Exception e) {
121                 logger.warn("{}: SEND of {} cannot be performed because of {}", this, message, e.getMessage(), e);
122                 return false;
123             }
124             return true;
125         }
126
127         @Override
128         public void close() {
129             logger.info(LOG_CLOSE, this);
130
131             try {
132                 this.producer.close();
133             } catch (Exception e) {
134                 logger.warn("{}: CLOSE FAILED because of {}", this, e.getMessage(), e);
135             }
136         }
137
138         @Override
139         public String toString() {
140             return "KafkaPublisherWrapper []";
141         }
142
143     }
144 }