e2adde0d0259efbd628af53ed647fda85ce5f707
[policy/common.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2017-2021 AT&T Intellectual Property. All rights reserved.
6  * Modifications Copyright (C) 2018 Samsung Electronics Co., Ltd.
7  * Modifications Copyright (C) 2020,2023 Bell Canada. All rights reserved.
8  * Modifications Copyright (C) 2022-2024 Nordix Foundation.
9  * ================================================================================
10  * Licensed under the Apache License, Version 2.0 (the "License");
11  * you may not use this file except in compliance with the License.
12  * You may obtain a copy of the License at
13  *
14  *      http://www.apache.org/licenses/LICENSE-2.0
15  *
16  * Unless required by applicable law or agreed to in writing, software
17  * distributed under the License is distributed on an "AS IS" BASIS,
18  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19  * See the License for the specific language governing permissions and
20  * limitations under the License.
21  * ============LICENSE_END=========================================================
22  */
23
24 package org.onap.policy.common.endpoints.event.comm.bus.internal;
25
26 import com.att.nsa.apiClient.http.HttpClient.ConnectionType;
27 import com.att.nsa.cambria.client.CambriaBatchingPublisher;
28 import com.att.nsa.cambria.client.CambriaClientBuilders;
29 import io.opentelemetry.instrumentation.kafkaclients.v2_6.TracingProducerInterceptor;
30 import java.net.MalformedURLException;
31 import java.security.GeneralSecurityException;
32 import java.util.Properties;
33 import java.util.UUID;
34 import org.apache.kafka.clients.producer.KafkaProducer;
35 import org.apache.kafka.clients.producer.Producer;
36 import org.apache.kafka.clients.producer.ProducerConfig;
37 import org.apache.kafka.clients.producer.ProducerRecord;
38 import org.onap.policy.common.gson.annotation.GsonJsonIgnore;
39 import org.slf4j.Logger;
40 import org.slf4j.LoggerFactory;
41
42 public interface BusPublisher {
43
44     String NO_MESSAGE_PROVIDED = "No message provided";
45     String LOG_CLOSE = "{}: CLOSE";
46     String LOG_CLOSE_FAILED = "{}: CLOSE FAILED";
47
48     /**
49      * sends a message.
50      *
51      * @param partitionId id
52      * @param message the message
53      * @return true if success, false otherwise
54      * @throws IllegalArgumentException if no message provided
55      */
56     boolean send(String partitionId, String message);
57
58     /**
59      * closes the publisher.
60      */
61     void close();
62
63     /**
64      * Cambria based library publisher.
65      */
66     class CambriaPublisherWrapper implements BusPublisher {
67
68         private static final Logger logger = LoggerFactory.getLogger(CambriaPublisherWrapper.class);
69
70         /**
71          * The actual Cambria publisher.
72          */
73         @GsonJsonIgnore
74         protected CambriaBatchingPublisher publisher;
75
76         /**
77          * Constructor.
78          *
79          * @param busTopicParams topic parameters
80          */
81         public CambriaPublisherWrapper(BusTopicParams busTopicParams) {
82
83             var builder = new CambriaClientBuilders.PublisherBuilder();
84
85             builder.usingHosts(busTopicParams.getServers()).onTopic(busTopicParams.getTopic());
86
87             // Set read timeout to 30 seconds (TBD: this should be configurable)
88             builder.withSocketTimeout(30000);
89
90             if (busTopicParams.isUseHttps()) {
91                 if (busTopicParams.isAllowSelfSignedCerts()) {
92                     builder.withConnectionType(ConnectionType.HTTPS_NO_VALIDATION);
93                 } else {
94                     builder.withConnectionType(ConnectionType.HTTPS);
95                 }
96             }
97
98             if (busTopicParams.isApiKeyValid() && busTopicParams.isApiSecretValid()) {
99                 builder.authenticatedBy(busTopicParams.getApiKey(), busTopicParams.getApiSecret());
100             }
101
102             if (busTopicParams.isUserNameValid() && busTopicParams.isPasswordValid()) {
103                 builder.authenticatedByHttp(busTopicParams.getUserName(), busTopicParams.getPassword());
104             }
105
106             try {
107                 this.publisher = builder.build();
108             } catch (MalformedURLException | GeneralSecurityException e) {
109                 throw new IllegalArgumentException(e);
110             }
111         }
112
113         @Override
114         public boolean send(String partitionId, String message) {
115             if (message == null) {
116                 throw new IllegalArgumentException(NO_MESSAGE_PROVIDED);
117             }
118
119             try {
120                 this.publisher.send(partitionId, message);
121             } catch (Exception e) {
122                 logger.warn("{}: SEND of {} cannot be performed because of {}", this, message, e.getMessage(), e);
123                 return false;
124             }
125             return true;
126         }
127
128         @Override
129         public void close() {
130             logger.info(LOG_CLOSE, this);
131
132             try {
133                 this.publisher.close();
134             } catch (Exception e) {
135                 logger.warn("{}: CLOSE FAILED because of {}", this, e.getMessage(), e);
136             }
137         }
138
139         @Override
140         public String toString() {
141             return "CambriaPublisherWrapper []";
142         }
143
144     }
145
146     /**
147      * Kafka based library publisher.
148      */
149     class KafkaPublisherWrapper implements BusPublisher {
150
151         private static final Logger logger = LoggerFactory.getLogger(KafkaPublisherWrapper.class);
152         private static final String KEY_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer";
153
154         private final String topic;
155
156         /**
157          * Kafka publisher.
158          */
159         private final Producer<String, String> producer;
160         protected Properties kafkaProps;
161
162         /**
163          * Kafka Publisher Wrapper.
164          *
165          * @param busTopicParams topic parameters
166          */
167         protected KafkaPublisherWrapper(BusTopicParams busTopicParams) {
168
169             if (busTopicParams.isTopicInvalid()) {
170                 throw new IllegalArgumentException("No topic for Kafka");
171             }
172
173             this.topic = busTopicParams.getTopic();
174
175             // Setup Properties for consumer
176             kafkaProps = new Properties();
177             kafkaProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, busTopicParams.getServers().get(0));
178             if (busTopicParams.isAdditionalPropsValid()) {
179                 kafkaProps.putAll(busTopicParams.getAdditionalProps());
180             }
181             if (kafkaProps.get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG) == null) {
182                 kafkaProps.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KEY_SERIALIZER);
183             }
184             if (kafkaProps.get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG) == null) {
185                 kafkaProps.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KEY_SERIALIZER);
186             }
187
188             if (busTopicParams.isAllowTracing()) {
189                 kafkaProps.setProperty(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
190                         TracingProducerInterceptor.class.getName());
191             }
192
193             producer = new KafkaProducer<>(kafkaProps);
194         }
195
196         @Override
197         public boolean send(String partitionId, String message) {
198             if (message == null) {
199                 throw new IllegalArgumentException(NO_MESSAGE_PROVIDED);
200             }
201
202             try {
203                 // Create the record
204                 ProducerRecord<String, String> producerRecord =
205                         new ProducerRecord<>(topic, UUID.randomUUID().toString(), message);
206
207                 this.producer.send(producerRecord);
208                 producer.flush();
209             } catch (Exception e) {
210                 logger.warn("{}: SEND of {} cannot be performed because of {}", this, message, e.getMessage(), e);
211                 return false;
212             }
213             return true;
214         }
215
216         @Override
217         public void close() {
218             logger.info(LOG_CLOSE, this);
219
220             try {
221                 this.producer.close();
222             } catch (Exception e) {
223                 logger.warn("{}: CLOSE FAILED because of {}", this, e.getMessage(), e);
224             }
225         }
226
227         @Override
228         public String toString() {
229             return "KafkaPublisherWrapper []";
230         }
231
232     }
233 }