def8f841cac3c4d72e9e6288c4cf040a0688e9e3
[policy/common.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2017-2021 AT&T Intellectual Property. All rights reserved.
6  * Modifications Copyright (C) 2018 Samsung Electronics Co., Ltd.
7  * Modifications Copyright (C) 2020,2023 Bell Canada. All rights reserved.
8  * Modifications Copyright (C) 2022-2023 Nordix Foundation.
9  * ================================================================================
10  * Licensed under the Apache License, Version 2.0 (the "License");
11  * you may not use this file except in compliance with the License.
12  * You may obtain a copy of the License at
13  *
14  *      http://www.apache.org/licenses/LICENSE-2.0
15  *
16  * Unless required by applicable law or agreed to in writing, software
17  * distributed under the License is distributed on an "AS IS" BASIS,
18  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19  * See the License for the specific language governing permissions and
20  * limitations under the License.
21  * ============LICENSE_END=========================================================
22  */
23
24 package org.onap.policy.common.endpoints.event.comm.bus.internal;
25
26 import com.att.nsa.apiClient.http.HttpClient.ConnectionType;
27 import com.att.nsa.cambria.client.CambriaBatchingPublisher;
28 import com.att.nsa.cambria.client.CambriaClientBuilders;
29 import io.opentelemetry.instrumentation.kafkaclients.v2_6.TracingProducerInterceptor;
30 import java.net.MalformedURLException;
31 import java.security.GeneralSecurityException;
32 import java.util.ArrayList;
33 import java.util.List;
34 import java.util.Map;
35 import java.util.Properties;
36 import java.util.UUID;
37 import java.util.concurrent.TimeUnit;
38 import org.apache.commons.lang3.StringUtils;
39 import org.apache.kafka.clients.producer.KafkaProducer;
40 import org.apache.kafka.clients.producer.Producer;
41 import org.apache.kafka.clients.producer.ProducerConfig;
42 import org.apache.kafka.clients.producer.ProducerRecord;
43 import org.onap.dmaap.mr.client.impl.MRSimplerBatchPublisher;
44 import org.onap.dmaap.mr.client.response.MRPublisherResponse;
45 import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants;
46 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
47 import org.onap.policy.common.gson.annotation.GsonJsonIgnore;
48 import org.slf4j.Logger;
49 import org.slf4j.LoggerFactory;
50
51 public interface BusPublisher {
52
53     String NO_MESSAGE_PROVIDED = "No message provided"; // exception text used by send() when the message is null
54     String LOG_CLOSE = "{}: CLOSE"; // log format written when close() is entered
55     String LOG_CLOSE_FAILED = "{}: CLOSE FAILED"; // log format written when closing the client fails
56
57     /**
58      * Sends a message through the wrapped bus client.
59      *
60      * @param partitionId partition id handed to the wrapped client (an implementation may ignore it)
61      * @param message the message to publish, must not be {@code null}
62      * @return true if success, false otherwise
63      * @throws IllegalArgumentException if no message provided
64      */
65     boolean send(String partitionId, String message);
66
67     /**
68      * Closes the publisher and the client it wraps.
69      */
70     void close();
71
72     /**
73      * Cambria based library publisher.
74      */
75     class CambriaPublisherWrapper implements BusPublisher {
76
77         private static final Logger logger = LoggerFactory.getLogger(CambriaPublisherWrapper.class);
78
79         /**
80          * The actual Cambria publisher.
81          */
82         @GsonJsonIgnore
83         protected CambriaBatchingPublisher publisher;
84
85         /**
86          * Constructor.
87          *
88          * @param busTopicParams topic parameters
89          */
90         public CambriaPublisherWrapper(BusTopicParams busTopicParams) {
91
92             var builder = new CambriaClientBuilders.PublisherBuilder();
93
94             builder.usingHosts(busTopicParams.getServers()).onTopic(busTopicParams.getTopic());
95
96             // Set read timeout to 30 seconds (TBD: this should be configurable)
97             builder.withSocketTimeout(30000);
98
99             if (busTopicParams.isUseHttps()) {
100                 if (busTopicParams.isAllowSelfSignedCerts()) {
101                     builder.withConnectionType(ConnectionType.HTTPS_NO_VALIDATION);
102                 } else {
103                     builder.withConnectionType(ConnectionType.HTTPS);
104                 }
105             }
106
107             if (busTopicParams.isApiKeyValid() && busTopicParams.isApiSecretValid()) {
108                 builder.authenticatedBy(busTopicParams.getApiKey(), busTopicParams.getApiSecret());
109             }
110
111             if (busTopicParams.isUserNameValid() && busTopicParams.isPasswordValid()) {
112                 builder.authenticatedByHttp(busTopicParams.getUserName(), busTopicParams.getPassword());
113             }
114
115             try {
116                 this.publisher = builder.build();
117             } catch (MalformedURLException | GeneralSecurityException e) {
118                 throw new IllegalArgumentException(e);
119             }
120         }
121
122         @Override
123         public boolean send(String partitionId, String message) {
124             if (message == null) {
125                 throw new IllegalArgumentException(NO_MESSAGE_PROVIDED);
126             }
127
128             try {
129                 this.publisher.send(partitionId, message);
130             } catch (Exception e) {
131                 logger.warn("{}: SEND of {} cannot be performed because of {}", this, message, e.getMessage(), e);
132                 return false;
133             }
134             return true;
135         }
136
137         @Override
138         public void close() {
139             logger.info(LOG_CLOSE, this);
140
141             try {
142                 this.publisher.close();
143             } catch (Exception e) {
144                 logger.warn("{}: CLOSE FAILED because of {}", this, e.getMessage(), e);
145             }
146         }
147
148         @Override
149         public String toString() {
150             return "CambriaPublisherWrapper []";
151         }
152
153     }
154
155     /**
156      * Kafka based library publisher.
157      */
158     class KafkaPublisherWrapper implements BusPublisher {
159
160         private static final Logger logger = LoggerFactory.getLogger(KafkaPublisherWrapper.class);
161         private static final String KEY_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer";
162
163         private final String topic;
164
165         /**
166          * Kafka publisher.
167          */
168         private final Producer<String, String> producer;
169         protected Properties kafkaProps;
170
171         /**
172          * Kafka Publisher Wrapper.
173          *
174          * @param busTopicParams topic parameters
175          */
176         protected KafkaPublisherWrapper(BusTopicParams busTopicParams) {
177
178             if (busTopicParams.isTopicInvalid()) {
179                 throw new IllegalArgumentException("No topic for Kafka");
180             }
181
182             this.topic = busTopicParams.getTopic();
183
184             // Setup Properties for consumer
185             kafkaProps = new Properties();
186             kafkaProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, busTopicParams.getServers().get(0));
187             if (busTopicParams.isAdditionalPropsValid()) {
188                 kafkaProps.putAll(busTopicParams.getAdditionalProps());
189             }
190             if (kafkaProps.get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG) == null) {
191                 kafkaProps.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KEY_SERIALIZER);
192             }
193             if (kafkaProps.get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG) == null) {
194                 kafkaProps.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KEY_SERIALIZER);
195             }
196
197             if (busTopicParams.isAllowTracing()) {
198                 kafkaProps.setProperty(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
199                         TracingProducerInterceptor.class.getName());
200             }
201
202             producer = new KafkaProducer<>(kafkaProps);
203         }
204
205         @Override
206         public boolean send(String partitionId, String message) {
207             if (message == null) {
208                 throw new IllegalArgumentException(NO_MESSAGE_PROVIDED);
209             }
210
211             try {
212                 // Create the record
213                 ProducerRecord<String, String> producerRecord =
214                         new ProducerRecord<>(topic, UUID.randomUUID().toString(), message);
215
216                 this.producer.send(producerRecord);
217                 producer.flush();
218             } catch (Exception e) {
219                 logger.warn("{}: SEND of {} cannot be performed because of {}", this, message, e.getMessage(), e);
220                 return false;
221             }
222             return true;
223         }
224
225         @Override
226         public void close() {
227             logger.info(LOG_CLOSE, this);
228
229             try {
230                 this.producer.close();
231             } catch (Exception e) {
232                 logger.warn("{}: CLOSE FAILED because of {}", this, e.getMessage(), e);
233             }
234         }
235
236         @Override
237         public String toString() {
238             return "KafkaPublisherWrapper []";
239         }
240
241     }
242
243     /**
244      * DmaapClient library wrapper.
245      */
246     abstract class DmaapPublisherWrapper implements BusPublisher {
247
248         private static final Logger logger = LoggerFactory.getLogger(DmaapPublisherWrapper.class);
249
250         /**
251          * MR based Publisher.
252          */
253         protected MRSimplerBatchPublisher publisher;
254         protected Properties props;
255
256         /**
257          * MR Publisher Wrapper.
258          *
259          * @param servers messaging bus hosts
260          * @param topic topic
261          * @param username AAF or DME2 Login
262          * @param password AAF or DME2 Password
263          */
264         protected DmaapPublisherWrapper(ProtocolTypeConstants protocol, List<String> servers, String topic,
265                 String username, String password, boolean useHttps) {
266
267             if (StringUtils.isBlank(topic)) {
268                 throw new IllegalArgumentException("No topic for DMaaP");
269             }
270
271             configureProtocol(topic, protocol, servers, useHttps);
272
273             this.publisher.logTo(LoggerFactory.getLogger(MRSimplerBatchPublisher.class.getName()));
274
275             this.publisher.setUsername(username);
276             this.publisher.setPassword(password);
277
278             props = new Properties();
279
280             props.setProperty("Protocol", (useHttps ? "https" : "http"));
281             props.setProperty("contenttype", "application/json");
282             props.setProperty("username", username);
283             props.setProperty("password", password);
284
285             props.setProperty("topic", topic);
286
287             this.publisher.setProps(props);
288
289             if (protocol == ProtocolTypeConstants.AAF_AUTH) {
290                 this.publisher.setHost(servers.get(0));
291             }
292
293             logger.info("{}: CREATION: using protocol {}", this, protocol.getValue());
294         }
295
296         private void configureProtocol(String topic, ProtocolTypeConstants protocol, List<String> servers,
297                 boolean useHttps) {
298
299             if (protocol == ProtocolTypeConstants.AAF_AUTH) {
300                 if (servers == null || servers.isEmpty()) {
301                     throw new IllegalArgumentException("No DMaaP servers or DME2 partner provided");
302                 }
303
304                 ArrayList<String> dmaapServers = new ArrayList<>();
305                 String port = useHttps ? ":3905" : ":3904";
306                 for (String server : servers) {
307                     dmaapServers.add(server + port);
308                 }
309
310                 this.publisher = new MRSimplerBatchPublisher.Builder().againstUrls(dmaapServers).onTopic(topic).build();
311
312                 this.publisher.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue());
313
314             } else if (protocol == ProtocolTypeConstants.DME2) {
315                 ArrayList<String> dmaapServers = new ArrayList<>();
316                 dmaapServers.add("0.0.0.0:3904");
317
318                 this.publisher = new MRSimplerBatchPublisher.Builder().againstUrls(dmaapServers).onTopic(topic).build();
319
320                 this.publisher.setProtocolFlag(ProtocolTypeConstants.DME2.getValue());
321
322             } else {
323                 throw new IllegalArgumentException("Invalid DMaaP protocol " + protocol);
324             }
325         }
326
327         @Override
328         public void close() {
329             logger.info(LOG_CLOSE, this);
330
331             try {
332                 this.publisher.close(1, TimeUnit.SECONDS);
333
334             } catch (InterruptedException e) {
335                 logger.warn(LOG_CLOSE_FAILED, this, e);
336                 Thread.currentThread().interrupt();
337
338             } catch (Exception e) {
339                 logger.warn(LOG_CLOSE_FAILED, this, e);
340             }
341         }
342
343         @Override
344         public boolean send(String partitionId, String message) {
345             if (message == null) {
346                 throw new IllegalArgumentException(NO_MESSAGE_PROVIDED);
347             }
348
349             this.publisher.setPubResponse(new MRPublisherResponse());
350             this.publisher.send(partitionId, message);
351             MRPublisherResponse response = this.publisher.sendBatchWithResponse();
352             if (response != null) {
353                 logger.debug("DMaaP publisher received {} : {}", response.getResponseCode(),
354                         response.getResponseMessage());
355             }
356
357             return true;
358         }
359
360         @Override
361         public String toString() {
362             return "DmaapPublisherWrapper [" + "publisher.getAuthDate()=" + publisher.getAuthDate()
363                     + ", publisher.getAuthKey()=" + publisher.getAuthKey() + ", publisher.getHost()="
364                     + publisher.getHost() + ", publisher.getProtocolFlag()=" + publisher.getProtocolFlag()
365                     + ", publisher.getUsername()=" + publisher.getUsername() + "]";
366         }
367     }
368
369     /**
370      * DmaapClient library wrapper.
371      */
372     class DmaapAafPublisherWrapper extends DmaapPublisherWrapper {
373         /**
374          * MR based Publisher.
375          */
376         public DmaapAafPublisherWrapper(List<String> servers, String topic, String aafLogin, String aafPassword,
377                 boolean useHttps) {
378
379             super(ProtocolTypeConstants.AAF_AUTH, servers, topic, aafLogin, aafPassword, useHttps);
380         }
381     }
382
383     class DmaapDmePublisherWrapper extends DmaapPublisherWrapper {
384
385         /**
386          * Constructor.
387          *
388          * @param busTopicParams topic parameters
389          */
390         public DmaapDmePublisherWrapper(BusTopicParams busTopicParams) {
391
392             super(ProtocolTypeConstants.DME2, busTopicParams.getServers(), busTopicParams.getTopic(),
393                     busTopicParams.getUserName(), busTopicParams.getPassword(), busTopicParams.isUseHttps());
394
395             String dme2RouteOffer = busTopicParams.isAdditionalPropsValid()
396                     ? busTopicParams.getAdditionalProps().get(PolicyEndPointProperties.DME2_ROUTE_OFFER_PROPERTY)
397                     : null;
398
399             validateParams(busTopicParams, dme2RouteOffer);
400
401             String serviceName = busTopicParams.getServers().get(0);
402
403             /* These are required, no defaults */
404             props.setProperty(PolicyEndPointProperties.DME2_SERVICE_NAME_PROPERTY, serviceName);
405
406             BusHelper.setCommonProperties(busTopicParams, dme2RouteOffer, props);
407
408             props.setProperty("MethodType", "POST");
409
410             if (busTopicParams.isAdditionalPropsValid()) {
411                 addAdditionalProps(busTopicParams);
412             }
413
414             this.publisher.setProps(props);
415         }
416
417         private void validateParams(BusTopicParams busTopicParams, String dme2RouteOffer) {
418             BusHelper.validateBusTopicParams(busTopicParams, PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS);
419
420             if ((busTopicParams.isPartnerInvalid()) && StringUtils.isBlank(dme2RouteOffer)) {
421                 throw new IllegalArgumentException("Must provide at least "
422                         + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + busTopicParams.getTopic()
423                         + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX + " or "
424                         + PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + busTopicParams.getTopic()
425                         + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX + " for DME2");
426             }
427         }
428
429         private void addAdditionalProps(BusTopicParams busTopicParams) {
430             for (Map.Entry<String, String> entry : busTopicParams.getAdditionalProps().entrySet()) {
431                 String key = entry.getKey();
432                 String value = entry.getValue();
433
434                 if (value != null) {
435                     props.setProperty(key, value);
436                 }
437             }
438         }
439     }
440 }