92f7bc6fa30954a4effc56c470dfd95e11881863
[policy/common.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2017-2021 AT&T Intellectual Property. All rights reserved.
6  * Modifications Copyright (C) 2018 Samsung Electronics Co., Ltd.
7  * Modifications Copyright (C) 2020,2023 Bell Canada. All rights reserved.
8  * Modifications Copyright (C) 2022-2023 Nordix Foundation.
9  * ================================================================================
10  * Licensed under the Apache License, Version 2.0 (the "License");
11  * you may not use this file except in compliance with the License.
12  * You may obtain a copy of the License at
13  *
14  *      http://www.apache.org/licenses/LICENSE-2.0
15  *
16  * Unless required by applicable law or agreed to in writing, software
17  * distributed under the License is distributed on an "AS IS" BASIS,
18  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19  * See the License for the specific language governing permissions and
20  * limitations under the License.
21  * ============LICENSE_END=========================================================
22  */
23
24 package org.onap.policy.common.endpoints.event.comm.bus.internal;
25
26 import com.att.nsa.apiClient.http.HttpClient.ConnectionType;
27 import com.att.nsa.cambria.client.CambriaBatchingPublisher;
28 import com.att.nsa.cambria.client.CambriaClientBuilders;
29 import java.net.MalformedURLException;
30 import java.security.GeneralSecurityException;
31 import java.util.ArrayList;
32 import java.util.List;
33 import java.util.Map;
34 import java.util.Properties;
35 import java.util.UUID;
36 import java.util.concurrent.TimeUnit;
37 import org.apache.commons.lang3.StringUtils;
38 import org.apache.kafka.clients.producer.KafkaProducer;
39 import org.apache.kafka.clients.producer.Producer;
40 import org.apache.kafka.clients.producer.ProducerConfig;
41 import org.apache.kafka.clients.producer.ProducerRecord;
42 import org.onap.dmaap.mr.client.impl.MRSimplerBatchPublisher;
43 import org.onap.dmaap.mr.client.response.MRPublisherResponse;
44 import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants;
45 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
46 import org.onap.policy.common.gson.annotation.GsonJsonIgnore;
47 import org.slf4j.Logger;
48 import org.slf4j.LoggerFactory;
49
50 public interface BusPublisher {
51
52     public static final String NO_MESSAGE_PROVIDED = "No message provided";
53
54     /**
55      * sends a message.
56      *
57      * @param partitionId id
58      * @param message the message
59      * @return true if success, false otherwise
60      * @throws IllegalArgumentException if no message provided
61      */
62     public boolean send(String partitionId, String message);
63
64     /**
65      * closes the publisher.
66      */
67     public void close();
68
69     /**
70      * Cambria based library publisher.
71      */
72     public static class CambriaPublisherWrapper implements BusPublisher {
73
74         private static Logger logger = LoggerFactory.getLogger(CambriaPublisherWrapper.class);
75
76         /**
77          * The actual Cambria publisher.
78          */
79         @GsonJsonIgnore
80         protected CambriaBatchingPublisher publisher;
81
82         /**
83          * Constructor.
84          *
85          * @param busTopicParams topic parameters
86          */
87         public CambriaPublisherWrapper(BusTopicParams busTopicParams) {
88
89             var builder = new CambriaClientBuilders.PublisherBuilder();
90
91             builder.usingHosts(busTopicParams.getServers()).onTopic(busTopicParams.getTopic());
92
93             // Set read timeout to 30 seconds (TBD: this should be configurable)
94             builder.withSocketTimeout(30000);
95
96             if (busTopicParams.isUseHttps()) {
97                 if (busTopicParams.isAllowSelfSignedCerts()) {
98                     builder.withConnectionType(ConnectionType.HTTPS_NO_VALIDATION);
99                 } else {
100                     builder.withConnectionType(ConnectionType.HTTPS);
101                 }
102             }
103
104             if (busTopicParams.isApiKeyValid() && busTopicParams.isApiSecretValid()) {
105                 builder.authenticatedBy(busTopicParams.getApiKey(), busTopicParams.getApiSecret());
106             }
107
108             if (busTopicParams.isUserNameValid() && busTopicParams.isPasswordValid()) {
109                 builder.authenticatedByHttp(busTopicParams.getUserName(), busTopicParams.getPassword());
110             }
111
112             try {
113                 this.publisher = builder.build();
114             } catch (MalformedURLException | GeneralSecurityException e) {
115                 throw new IllegalArgumentException(e);
116             }
117         }
118
119         @Override
120         public boolean send(String partitionId, String message) {
121             if (message == null) {
122                 throw new IllegalArgumentException(NO_MESSAGE_PROVIDED);
123             }
124
125             try {
126                 this.publisher.send(partitionId, message);
127             } catch (Exception e) {
128                 logger.warn("{}: SEND of {} cannot be performed because of {}", this, message, e.getMessage(), e);
129                 return false;
130             }
131             return true;
132         }
133
134         @Override
135         public void close() {
136             logger.info("{}: CLOSE", this);
137
138             try {
139                 this.publisher.close();
140             } catch (Exception e) {
141                 logger.warn("{}: CLOSE FAILED because of {}", this, e.getMessage(), e);
142             }
143         }
144
145         @Override
146         public String toString() {
147             return "CambriaPublisherWrapper []";
148         }
149
150     }
151
152     /**
153      * Kafka based library publisher.
154      */
155     public static class KafkaPublisherWrapper implements BusPublisher {
156
157         private static Logger logger = LoggerFactory.getLogger(KafkaPublisherWrapper.class);
158         private static final String KEY_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer";
159
160         private String topic;
161
162         /**
163          * Kafka publisher.
164          */
165         private Producer<String, String> producer;
166         protected Properties kafkaProps;
167
168         /**
169          * Kafka Publisher Wrapper.
170          *
171          * @param busTopicParams topic parameters
172          */
173         protected KafkaPublisherWrapper(BusTopicParams busTopicParams) {
174
175             if (busTopicParams.isTopicInvalid()) {
176                 throw new IllegalArgumentException("No topic for Kafka");
177             }
178
179             this.topic = busTopicParams.getTopic();
180
181             // Setup Properties for consumer
182             kafkaProps = new Properties();
183             kafkaProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, busTopicParams.getServers().get(0));
184             if (busTopicParams.isAdditionalPropsValid()) {
185                 for (Map.Entry<String, String> entry : busTopicParams.getAdditionalProps().entrySet()) {
186                     kafkaProps.put(entry.getKey(), entry.getValue());
187                 }
188             }
189             if (kafkaProps.get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG) == null) {
190                 kafkaProps.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KEY_SERIALIZER);
191             }
192             if (kafkaProps.get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG) == null) {
193                 kafkaProps.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KEY_SERIALIZER);
194             }
195
196             producer = new KafkaProducer<>(kafkaProps);
197         }
198
199         @Override
200         public boolean send(String partitionId, String message) {
201             if (message == null) {
202                 throw new IllegalArgumentException(NO_MESSAGE_PROVIDED);
203             }
204
205             try {
206                 // Create the record
207                 ProducerRecord<String, String> producerRecord =
208                         new ProducerRecord<>(topic, UUID.randomUUID().toString(), message);
209
210                 this.producer.send(producerRecord);
211                 producer.flush();
212             } catch (Exception e) {
213                 logger.warn("{}: SEND of {} cannot be performed because of {}", this, message, e.getMessage(), e);
214                 return false;
215             }
216             return true;
217         }
218
219         @Override
220         public void close() {
221             logger.info("{}: CLOSE", this);
222
223             try {
224                 this.producer.close();
225             } catch (Exception e) {
226                 logger.warn("{}: CLOSE FAILED because of {}", this, e.getMessage(), e);
227             }
228         }
229
230         @Override
231         public String toString() {
232             return "KafkaPublisherWrapper []";
233         }
234
235     }
236
237     /**
238      * DmaapClient library wrapper.
239      */
240     public abstract class DmaapPublisherWrapper implements BusPublisher {
241
242         private static Logger logger = LoggerFactory.getLogger(DmaapPublisherWrapper.class);
243
244         /**
245          * MR based Publisher.
246          */
247         protected MRSimplerBatchPublisher publisher;
248         protected Properties props;
249
250         /**
251          * MR Publisher Wrapper.
252          *
253          * @param servers messaging bus hosts
254          * @param topic topic
255          * @param username AAF or DME2 Login
256          * @param password AAF or DME2 Password
257          */
258         protected DmaapPublisherWrapper(ProtocolTypeConstants protocol, List<String> servers, String topic,
259                 String username, String password, boolean useHttps) {
260
261             if (StringUtils.isBlank(topic)) {
262                 throw new IllegalArgumentException("No topic for DMaaP");
263             }
264
265             configureProtocol(topic, protocol, servers, useHttps);
266
267             this.publisher.logTo(LoggerFactory.getLogger(MRSimplerBatchPublisher.class.getName()));
268
269             this.publisher.setUsername(username);
270             this.publisher.setPassword(password);
271
272             props = new Properties();
273
274             props.setProperty("Protocol", (useHttps ? "https" : "http"));
275             props.setProperty("contenttype", "application/json");
276             props.setProperty("username", username);
277             props.setProperty("password", password);
278
279             props.setProperty("topic", topic);
280
281             this.publisher.setProps(props);
282
283             if (protocol == ProtocolTypeConstants.AAF_AUTH) {
284                 this.publisher.setHost(servers.get(0));
285             }
286
287             logger.info("{}: CREATION: using protocol {}", this, protocol.getValue());
288         }
289
290         private void configureProtocol(String topic, ProtocolTypeConstants protocol, List<String> servers,
291                 boolean useHttps) {
292
293             if (protocol == ProtocolTypeConstants.AAF_AUTH) {
294                 if (servers == null || servers.isEmpty()) {
295                     throw new IllegalArgumentException("No DMaaP servers or DME2 partner provided");
296                 }
297
298                 ArrayList<String> dmaapServers = new ArrayList<>();
299                 String port = useHttps ? ":3905" : ":3904";
300                 for (String server : servers) {
301                     dmaapServers.add(server + port);
302                 }
303
304                 this.publisher = new MRSimplerBatchPublisher.Builder().againstUrls(dmaapServers).onTopic(topic).build();
305
306                 this.publisher.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue());
307
308             } else if (protocol == ProtocolTypeConstants.DME2) {
309                 ArrayList<String> dmaapServers = new ArrayList<>();
310                 dmaapServers.add("0.0.0.0:3904");
311
312                 this.publisher = new MRSimplerBatchPublisher.Builder().againstUrls(dmaapServers).onTopic(topic).build();
313
314                 this.publisher.setProtocolFlag(ProtocolTypeConstants.DME2.getValue());
315
316             } else {
317                 throw new IllegalArgumentException("Invalid DMaaP protocol " + protocol);
318             }
319         }
320
321         @Override
322         public void close() {
323             logger.info("{}: CLOSE", this);
324
325             try {
326                 this.publisher.close(1, TimeUnit.SECONDS);
327
328             } catch (InterruptedException e) {
329                 logger.warn("{}: CLOSE FAILED", this, e);
330                 Thread.currentThread().interrupt();
331
332             } catch (Exception e) {
333                 logger.warn("{}: CLOSE FAILED", this, e);
334             }
335         }
336
337         @Override
338         public boolean send(String partitionId, String message) {
339             if (message == null) {
340                 throw new IllegalArgumentException(NO_MESSAGE_PROVIDED);
341             }
342
343             this.publisher.setPubResponse(new MRPublisherResponse());
344             this.publisher.send(partitionId, message);
345             MRPublisherResponse response = this.publisher.sendBatchWithResponse();
346             if (response != null) {
347                 logger.debug("DMaaP publisher received {} : {}", response.getResponseCode(),
348                         response.getResponseMessage());
349             }
350
351             return true;
352         }
353
354         @Override
355         public String toString() {
356             return "DmaapPublisherWrapper [" + "publisher.getAuthDate()=" + publisher.getAuthDate()
357                     + ", publisher.getAuthKey()=" + publisher.getAuthKey() + ", publisher.getHost()="
358                     + publisher.getHost() + ", publisher.getProtocolFlag()=" + publisher.getProtocolFlag()
359                     + ", publisher.getUsername()=" + publisher.getUsername() + "]";
360         }
361     }
362
363     /**
364      * DmaapClient library wrapper.
365      */
366     public static class DmaapAafPublisherWrapper extends DmaapPublisherWrapper {
367         /**
368          * MR based Publisher.
369          */
370         public DmaapAafPublisherWrapper(List<String> servers, String topic, String aafLogin, String aafPassword,
371                 boolean useHttps) {
372
373             super(ProtocolTypeConstants.AAF_AUTH, servers, topic, aafLogin, aafPassword, useHttps);
374         }
375     }
376
377     public static class DmaapDmePublisherWrapper extends DmaapPublisherWrapper {
378
379         /**
380          * Constructor.
381          *
382          * @param busTopicParams topic parameters
383          */
384         public DmaapDmePublisherWrapper(BusTopicParams busTopicParams) {
385
386             super(ProtocolTypeConstants.DME2, busTopicParams.getServers(), busTopicParams.getTopic(),
387                     busTopicParams.getUserName(), busTopicParams.getPassword(), busTopicParams.isUseHttps());
388
389             String dme2RouteOffer = busTopicParams.isAdditionalPropsValid()
390                     ? busTopicParams.getAdditionalProps().get(PolicyEndPointProperties.DME2_ROUTE_OFFER_PROPERTY)
391                     : null;
392
393             validateParams(busTopicParams, dme2RouteOffer);
394
395             String serviceName = busTopicParams.getServers().get(0);
396
397             /* These are required, no defaults */
398             props.setProperty("Environment", busTopicParams.getEnvironment());
399             props.setProperty("AFT_ENVIRONMENT", busTopicParams.getAftEnvironment());
400
401             props.setProperty(PolicyEndPointProperties.DME2_SERVICE_NAME_PROPERTY, serviceName);
402
403             if (busTopicParams.getPartner() != null) {
404                 props.setProperty("Partner", busTopicParams.getPartner());
405             }
406             if (dme2RouteOffer != null) {
407                 props.setProperty(PolicyEndPointProperties.DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer);
408             }
409
410             props.setProperty("Latitude", busTopicParams.getLatitude());
411             props.setProperty("Longitude", busTopicParams.getLongitude());
412
413             // ServiceName also a default, found in additionalProps
414
415             /* These are optional, will default to these values if not set in optionalProps */
416             props.setProperty("AFT_DME2_EP_READ_TIMEOUT_MS", "50000");
417             props.setProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", "240000");
418             props.setProperty("AFT_DME2_EP_CONN_TIMEOUT", "15000");
419             props.setProperty("Version", "1.0");
420             props.setProperty("SubContextPath", "/");
421             props.setProperty("sessionstickinessrequired", "no");
422
423             /* These should not change */
424             props.setProperty("TransportType", "DME2");
425             props.setProperty("MethodType", "POST");
426
427             if (busTopicParams.isAdditionalPropsValid()) {
428                 addAdditionalProps(busTopicParams);
429             }
430
431             this.publisher.setProps(props);
432         }
433
434         private void validateParams(BusTopicParams busTopicParams, String dme2RouteOffer) {
435             if (busTopicParams.isEnvironmentInvalid()) {
436                 throw parmException(busTopicParams.getTopic(),
437                         PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX);
438             }
439             if (busTopicParams.isAftEnvironmentInvalid()) {
440                 throw parmException(busTopicParams.getTopic(),
441                         PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX);
442             }
443             if (busTopicParams.isLatitudeInvalid()) {
444                 throw parmException(busTopicParams.getTopic(),
445                         PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX);
446             }
447             if (busTopicParams.isLongitudeInvalid()) {
448                 throw parmException(busTopicParams.getTopic(),
449                         PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX);
450             }
451
452             if ((busTopicParams.isPartnerInvalid()) && StringUtils.isBlank(dme2RouteOffer)) {
453                 throw new IllegalArgumentException("Must provide at least "
454                         + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + busTopicParams.getTopic()
455                         + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX + " or "
456                         + PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + busTopicParams.getTopic()
457                         + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX + " for DME2");
458             }
459         }
460
461         private void addAdditionalProps(BusTopicParams busTopicParams) {
462             for (Map.Entry<String, String> entry : busTopicParams.getAdditionalProps().entrySet()) {
463                 String key = entry.getKey();
464                 String value = entry.getValue();
465
466                 if (value != null) {
467                     props.setProperty(key, value);
468                 }
469             }
470         }
471
472         private IllegalArgumentException parmException(String topic, String propnm) {
473             return new IllegalArgumentException("Missing " + PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
474                     + topic + propnm + " property for DME2 in DMaaP");
475
476         }
477     }
478 }