6c6a9183ac97b12d61665045ca97c0c12d7f67d9
[policy/common.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2017-2021 AT&T Intellectual Property. All rights reserved.
6  * Modifications Copyright (C) 2018 Samsung Electronics Co., Ltd.
7  * Modifications Copyright (C) 2020 Bell Canada. All rights reserved.
8  * Modifications Copyright (C) 2022-2023 Nordix Foundation.
9  * ================================================================================
10  * Licensed under the Apache License, Version 2.0 (the "License");
11  * you may not use this file except in compliance with the License.
12  * You may obtain a copy of the License at
13  *
14  *      http://www.apache.org/licenses/LICENSE-2.0
15  *
16  * Unless required by applicable law or agreed to in writing, software
17  * distributed under the License is distributed on an "AS IS" BASIS,
18  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19  * See the License for the specific language governing permissions and
20  * limitations under the License.
21  * ============LICENSE_END=========================================================
22  */
23
24 package org.onap.policy.common.endpoints.event.comm.bus.internal;
25
26 import com.att.nsa.apiClient.http.HttpClient.ConnectionType;
27 import com.att.nsa.cambria.client.CambriaBatchingPublisher;
28 import com.att.nsa.cambria.client.CambriaClientBuilders;
29 import java.net.MalformedURLException;
30 import java.security.GeneralSecurityException;
31 import java.util.ArrayList;
32 import java.util.List;
33 import java.util.Map;
34 import java.util.Properties;
35 import java.util.UUID;
36 import java.util.concurrent.TimeUnit;
37 import org.apache.commons.lang3.StringUtils;
38 import org.apache.kafka.clients.producer.KafkaProducer;
39 import org.apache.kafka.clients.producer.Producer;
40 import org.apache.kafka.clients.producer.ProducerConfig;
41 import org.apache.kafka.clients.producer.ProducerRecord;
42 import org.onap.dmaap.mr.client.impl.MRSimplerBatchPublisher;
43 import org.onap.dmaap.mr.client.response.MRPublisherResponse;
44 import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants;
45 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
46 import org.onap.policy.common.gson.annotation.GsonJsonIgnore;
47 import org.slf4j.Logger;
48 import org.slf4j.LoggerFactory;
49
50 public interface BusPublisher {
51
52     /**
53      * sends a message.
54      *
55      * @param partitionId id
56      * @param message the message
57      * @return true if success, false otherwise
58      * @throws IllegalArgumentException if no message provided
59      */
60     public boolean send(String partitionId, String message);
61
62     /**
63      * closes the publisher.
64      */
65     public void close();
66
67     /**
68      * Cambria based library publisher.
69      */
70     public static class CambriaPublisherWrapper implements BusPublisher {
71
72         private static Logger logger = LoggerFactory.getLogger(CambriaPublisherWrapper.class);
73
74         /**
75          * The actual Cambria publisher.
76          */
77         @GsonJsonIgnore
78         protected CambriaBatchingPublisher publisher;
79
80         /**
81          * Constructor.
82          *
83          * @param busTopicParams topic parameters
84          */
85         public CambriaPublisherWrapper(BusTopicParams busTopicParams) {
86
87             var builder = new CambriaClientBuilders.PublisherBuilder();
88
89             builder.usingHosts(busTopicParams.getServers()).onTopic(busTopicParams.getTopic());
90
91             // Set read timeout to 30 seconds (TBD: this should be configurable)
92             builder.withSocketTimeout(30000);
93
94             if (busTopicParams.isUseHttps()) {
95                 if (busTopicParams.isAllowSelfSignedCerts()) {
96                     builder.withConnectionType(ConnectionType.HTTPS_NO_VALIDATION);
97                 } else {
98                     builder.withConnectionType(ConnectionType.HTTPS);
99                 }
100             }
101
102
103             if (busTopicParams.isApiKeyValid() && busTopicParams.isApiSecretValid()) {
104                 builder.authenticatedBy(busTopicParams.getApiKey(), busTopicParams.getApiSecret());
105             }
106
107             if (busTopicParams.isUserNameValid() && busTopicParams.isPasswordValid()) {
108                 builder.authenticatedByHttp(busTopicParams.getUserName(), busTopicParams.getPassword());
109             }
110
111             try {
112                 this.publisher = builder.build();
113             } catch (MalformedURLException | GeneralSecurityException e) {
114                 throw new IllegalArgumentException(e);
115             }
116         }
117
118         @Override
119         public boolean send(String partitionId, String message) {
120             if (message == null) {
121                 throw new IllegalArgumentException("No message provided");
122             }
123
124             try {
125                 this.publisher.send(partitionId, message);
126             } catch (Exception e) {
127                 logger.warn("{}: SEND of {} cannot be performed because of {}", this, message, e.getMessage(), e);
128                 return false;
129             }
130             return true;
131         }
132
133         @Override
134         public void close() {
135             logger.info("{}: CLOSE", this);
136
137             try {
138                 this.publisher.close();
139             } catch (Exception e) {
140                 logger.warn("{}: CLOSE FAILED because of {}", this, e.getMessage(), e);
141             }
142         }
143
144
145         @Override
146         public String toString() {
147             return "CambriaPublisherWrapper []";
148         }
149
150     }
151
152     /**
153      * Kafka based library publisher.
154      */
155     public static class KafkaPublisherWrapper implements BusPublisher {
156
157         private static Logger logger = LoggerFactory.getLogger(KafkaPublisherWrapper.class);
158         private static final String KEY_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer";
159
160         private String topic;
161
162         /**
163          * Kafka publisher.
164          */
165         private Producer<String, String> producer;
166         protected Properties kafkaProps;
167
168         /**
169          * Kafka Publisher Wrapper.
170          *
171          * @param busTopicParams topic parameters
172          */
173         protected KafkaPublisherWrapper(BusTopicParams busTopicParams) {
174
175             if (busTopicParams.isTopicInvalid()) {
176                 throw new IllegalArgumentException("No topic for Kafka");
177             }
178
179             this.topic = busTopicParams.getTopic();
180
181             //Setup Properties for consumer
182             kafkaProps = new Properties();
183             kafkaProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, busTopicParams.getServers().get(0));
184             if (busTopicParams.isAdditionalPropsValid()) {
185                 for (Map.Entry<String, String> entry : busTopicParams.getAdditionalProps().entrySet()) {
186                     kafkaProps.put(entry.getKey(), entry.getValue());
187                 }
188             }
189             if (kafkaProps.get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG) == null) {
190                 kafkaProps.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KEY_SERIALIZER);
191             }
192             if (kafkaProps.get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG) == null) {
193                 kafkaProps.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KEY_SERIALIZER);
194             }
195
196             producer = new KafkaProducer<>(kafkaProps);
197         }
198
199         @Override
200         public boolean send(String partitionId, String message) {
201             if (message == null) {
202                 throw new IllegalArgumentException("No message provided");
203             }
204
205             try {
206                 //Create the record
207                 ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic,
208                     UUID.randomUUID().toString(), message);
209
210                 this.producer.send(record);
211                 producer.flush();
212             } catch (Exception e) {
213                 logger.warn("{}: SEND of {} cannot be performed because of {}", this, message, e.getMessage(), e);
214                 return false;
215             }
216             return true;
217         }
218
219         @Override
220         public void close() {
221             logger.info("{}: CLOSE", this);
222
223             try {
224                 this.producer.close();
225             } catch (Exception e) {
226                 logger.warn("{}: CLOSE FAILED because of {}", this, e.getMessage(), e);
227             }
228         }
229
230
231         @Override
232         public String toString() {
233             return "KafkaPublisherWrapper []";
234         }
235
236     }
237
238     /**
239      * DmaapClient library wrapper.
240      */
241     public abstract class DmaapPublisherWrapper implements BusPublisher {
242
243         private static Logger logger = LoggerFactory.getLogger(DmaapPublisherWrapper.class);
244
245         /**
246          * MR based Publisher.
247          */
248         protected MRSimplerBatchPublisher publisher;
249         protected Properties props;
250
251         /**
252          * MR Publisher Wrapper.
253          *
254          * @param servers messaging bus hosts
255          * @param topic topic
256          * @param username AAF or DME2 Login
257          * @param password AAF or DME2 Password
258          */
259         protected DmaapPublisherWrapper(ProtocolTypeConstants protocol, List<String> servers, String topic,
260                 String username, String password, boolean useHttps) {
261
262
263             if (StringUtils.isBlank(topic)) {
264                 throw new IllegalArgumentException("No topic for DMaaP");
265             }
266
267
268             configureProtocol(topic, protocol, servers, useHttps);
269
270             this.publisher.logTo(LoggerFactory.getLogger(MRSimplerBatchPublisher.class.getName()));
271
272             this.publisher.setUsername(username);
273             this.publisher.setPassword(password);
274
275             props = new Properties();
276
277             props.setProperty("Protocol", (useHttps ? "https" : "http"));
278             props.setProperty("contenttype", "application/json");
279             props.setProperty("username", username);
280             props.setProperty("password", password);
281
282             props.setProperty("topic", topic);
283
284             this.publisher.setProps(props);
285
286             if (protocol == ProtocolTypeConstants.AAF_AUTH) {
287                 this.publisher.setHost(servers.get(0));
288             }
289
290             logger.info("{}: CREATION: using protocol {}", this, protocol.getValue());
291         }
292
293         private void configureProtocol(String topic, ProtocolTypeConstants protocol, List<String> servers,
294                         boolean useHttps) {
295
296             if (protocol == ProtocolTypeConstants.AAF_AUTH) {
297                 if (servers == null || servers.isEmpty()) {
298                     throw new IllegalArgumentException("No DMaaP servers or DME2 partner provided");
299                 }
300
301                 ArrayList<String> dmaapServers = new ArrayList<>();
302                 String port = useHttps ? ":3905" : ":3904";
303                 for (String server : servers) {
304                     dmaapServers.add(server + port);
305                 }
306
307
308                 this.publisher = new MRSimplerBatchPublisher.Builder().againstUrls(dmaapServers).onTopic(topic).build();
309
310                 this.publisher.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue());
311
312             } else if (protocol == ProtocolTypeConstants.DME2) {
313                 ArrayList<String> dmaapServers = new ArrayList<>();
314                 dmaapServers.add("0.0.0.0:3904");
315
316                 this.publisher = new MRSimplerBatchPublisher.Builder().againstUrls(dmaapServers).onTopic(topic).build();
317
318                 this.publisher.setProtocolFlag(ProtocolTypeConstants.DME2.getValue());
319
320             } else {
321                 throw new IllegalArgumentException("Invalid DMaaP protocol " + protocol);
322             }
323         }
324
325         @Override
326         public void close() {
327             logger.info("{}: CLOSE", this);
328
329             try {
330                 this.publisher.close(1, TimeUnit.SECONDS);
331
332             } catch (InterruptedException e) {
333                 logger.warn("{}: CLOSE FAILED", this, e);
334                 Thread.currentThread().interrupt();
335
336             } catch (Exception e) {
337                 logger.warn("{}: CLOSE FAILED", this, e);
338             }
339         }
340
341         @Override
342         public boolean send(String partitionId, String message) {
343             if (message == null) {
344                 throw new IllegalArgumentException("No message provided");
345             }
346
347             this.publisher.setPubResponse(new MRPublisherResponse());
348             this.publisher.send(partitionId, message);
349             MRPublisherResponse response = this.publisher.sendBatchWithResponse();
350             if (response != null) {
351                 logger.debug("DMaaP publisher received {} : {}", response.getResponseCode(),
352                         response.getResponseMessage());
353             }
354
355             return true;
356         }
357
358         @Override
359         public String toString() {
360             return "DmaapPublisherWrapper [" + "publisher.getAuthDate()=" + publisher.getAuthDate()
361                     + ", publisher.getAuthKey()=" + publisher.getAuthKey() + ", publisher.getHost()="
362                     + publisher.getHost() + ", publisher.getProtocolFlag()=" + publisher.getProtocolFlag()
363                     + ", publisher.getUsername()=" + publisher.getUsername() + "]";
364         }
365     }
366
367     /**
368      * DmaapClient library wrapper.
369      */
370     public static class DmaapAafPublisherWrapper extends DmaapPublisherWrapper {
371         /**
372          * MR based Publisher.
373          */
374         public DmaapAafPublisherWrapper(List<String> servers, String topic, String aafLogin, String aafPassword,
375                 boolean useHttps) {
376
377             super(ProtocolTypeConstants.AAF_AUTH, servers, topic, aafLogin, aafPassword, useHttps);
378         }
379     }
380
381     public static class DmaapDmePublisherWrapper extends DmaapPublisherWrapper {
382
383         /**
384          * Constructor.
385          *
386          * @param busTopicParams topic parameters
387          */
388         public DmaapDmePublisherWrapper(BusTopicParams busTopicParams) {
389
390             super(ProtocolTypeConstants.DME2, busTopicParams.getServers(), busTopicParams.getTopic(),
391                     busTopicParams.getUserName(), busTopicParams.getPassword(), busTopicParams.isUseHttps());
392
393             String dme2RouteOffer = busTopicParams.isAdditionalPropsValid()
394                             ? busTopicParams.getAdditionalProps().get(
395                                             PolicyEndPointProperties.DME2_ROUTE_OFFER_PROPERTY)
396                             : null;
397
398             validateParams(busTopicParams, dme2RouteOffer);
399
400             String serviceName = busTopicParams.getServers().get(0);
401
402             /* These are required, no defaults */
403             props.setProperty("Environment", busTopicParams.getEnvironment());
404             props.setProperty("AFT_ENVIRONMENT", busTopicParams.getAftEnvironment());
405
406             props.setProperty(PolicyEndPointProperties.DME2_SERVICE_NAME_PROPERTY, serviceName);
407
408             if (busTopicParams.getPartner() != null) {
409                 props.setProperty("Partner", busTopicParams.getPartner());
410             }
411             if (dme2RouteOffer != null) {
412                 props.setProperty(PolicyEndPointProperties.DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer);
413             }
414
415             props.setProperty("Latitude", busTopicParams.getLatitude());
416             props.setProperty("Longitude", busTopicParams.getLongitude());
417
418             // ServiceName also a default, found in additionalProps
419
420             /* These are optional, will default to these values if not set in optionalProps */
421             props.setProperty("AFT_DME2_EP_READ_TIMEOUT_MS", "50000");
422             props.setProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", "240000");
423             props.setProperty("AFT_DME2_EP_CONN_TIMEOUT", "15000");
424             props.setProperty("Version", "1.0");
425             props.setProperty("SubContextPath", "/");
426             props.setProperty("sessionstickinessrequired", "no");
427
428             /* These should not change */
429             props.setProperty("TransportType", "DME2");
430             props.setProperty("MethodType", "POST");
431
432             if (busTopicParams.isAdditionalPropsValid()) {
433                 addAdditionalProps(busTopicParams);
434             }
435
436             this.publisher.setProps(props);
437         }
438
439         private void validateParams(BusTopicParams busTopicParams, String dme2RouteOffer) {
440             if (busTopicParams.isEnvironmentInvalid()) {
441                 throw parmException(busTopicParams.getTopic(),
442                         PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX);
443             }
444             if (busTopicParams.isAftEnvironmentInvalid()) {
445                 throw parmException(busTopicParams.getTopic(),
446                         PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX);
447             }
448             if (busTopicParams.isLatitudeInvalid()) {
449                 throw parmException(busTopicParams.getTopic(),
450                         PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX);
451             }
452             if (busTopicParams.isLongitudeInvalid()) {
453                 throw parmException(busTopicParams.getTopic(),
454                         PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX);
455             }
456
457             if ((busTopicParams.isPartnerInvalid())
458                     && StringUtils.isBlank(dme2RouteOffer)) {
459                 throw new IllegalArgumentException(
460                         "Must provide at least " + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
461                                 + busTopicParams.getTopic()
462                                 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX + " or "
463                                 + PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + busTopicParams.getTopic()
464                                 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX + " for DME2");
465             }
466         }
467
468         private void addAdditionalProps(BusTopicParams busTopicParams) {
469             for (Map.Entry<String, String> entry : busTopicParams.getAdditionalProps().entrySet()) {
470                 String key = entry.getKey();
471                 String value = entry.getValue();
472
473                 if (value != null) {
474                     props.setProperty(key, value);
475                 }
476             }
477         }
478
479         private IllegalArgumentException parmException(String topic, String propnm) {
480             return new IllegalArgumentException("Missing " + PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
481                     + topic + propnm + " property for DME2 in DMaaP");
482
483         }
484     }
485 }