fe9bab23045e04adb4d00ed41069f76c7aa8e0d8
[policy/common.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2017-2021 AT&T Intellectual Property. All rights reserved.
6  * Modifications Copyright (C) 2018 Samsung Electronics Co., Ltd.
7  * Modifications Copyright (C) 2020 Bell Canada. All rights reserved.
8  * Copyright (C) 2022 Nordix Foundation.
9  * ================================================================================
10  * Licensed under the Apache License, Version 2.0 (the "License");
11  * you may not use this file except in compliance with the License.
12  * You may obtain a copy of the License at
13  *
14  *      http://www.apache.org/licenses/LICENSE-2.0
15  *
16  * Unless required by applicable law or agreed to in writing, software
17  * distributed under the License is distributed on an "AS IS" BASIS,
18  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19  * See the License for the specific language governing permissions and
20  * limitations under the License.
21  * ============LICENSE_END=========================================================
22  */
23
24 package org.onap.policy.common.endpoints.event.comm.bus.internal;
25
26 import com.att.nsa.apiClient.http.HttpClient.ConnectionType;
27 import com.att.nsa.cambria.client.CambriaBatchingPublisher;
28 import com.att.nsa.cambria.client.CambriaClientBuilders;
29 import java.net.MalformedURLException;
30 import java.security.GeneralSecurityException;
31 import java.util.ArrayList;
32 import java.util.List;
33 import java.util.Map;
34 import java.util.Properties;
35 import java.util.UUID;
36 import java.util.concurrent.TimeUnit;
37 import org.apache.commons.lang3.StringUtils;
38 import org.apache.kafka.clients.consumer.ConsumerConfig;
39 import org.apache.kafka.clients.producer.KafkaProducer;
40 import org.apache.kafka.clients.producer.Producer;
41 import org.apache.kafka.clients.producer.ProducerConfig;
42 import org.apache.kafka.clients.producer.ProducerRecord;
43 import org.onap.dmaap.mr.client.impl.MRSimplerBatchPublisher;
44 import org.onap.dmaap.mr.client.response.MRPublisherResponse;
45 import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants;
46 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
47 import org.onap.policy.common.gson.annotation.GsonJsonIgnore;
48 import org.slf4j.Logger;
49 import org.slf4j.LoggerFactory;
50
51 public interface BusPublisher {
52
53     /**
54      * sends a message.
55      *
56      * @param partitionId id
57      * @param message the message
58      * @return true if success, false otherwise
59      * @throws IllegalArgumentException if no message provided
60      */
61     public boolean send(String partitionId, String message);
62
63     /**
64      * closes the publisher.
65      */
66     public void close();
67
68     /**
69      * Cambria based library publisher.
70      */
71     public static class CambriaPublisherWrapper implements BusPublisher {
72
73         private static Logger logger = LoggerFactory.getLogger(CambriaPublisherWrapper.class);
74
75         /**
76          * The actual Cambria publisher.
77          */
78         @GsonJsonIgnore
79         protected CambriaBatchingPublisher publisher;
80
81         /**
82          * Constructor.
83          *
84          * @param busTopicParams topic parameters
85          */
86         public CambriaPublisherWrapper(BusTopicParams busTopicParams) {
87
88             var builder = new CambriaClientBuilders.PublisherBuilder();
89
90             builder.usingHosts(busTopicParams.getServers()).onTopic(busTopicParams.getTopic());
91
92             // Set read timeout to 30 seconds (TBD: this should be configurable)
93             builder.withSocketTimeout(30000);
94
95             if (busTopicParams.isUseHttps()) {
96                 if (busTopicParams.isAllowSelfSignedCerts()) {
97                     builder.withConnectionType(ConnectionType.HTTPS_NO_VALIDATION);
98                 } else {
99                     builder.withConnectionType(ConnectionType.HTTPS);
100                 }
101             }
102
103
104             if (busTopicParams.isApiKeyValid() && busTopicParams.isApiSecretValid()) {
105                 builder.authenticatedBy(busTopicParams.getApiKey(), busTopicParams.getApiSecret());
106             }
107
108             if (busTopicParams.isUserNameValid() && busTopicParams.isPasswordValid()) {
109                 builder.authenticatedByHttp(busTopicParams.getUserName(), busTopicParams.getPassword());
110             }
111
112             try {
113                 this.publisher = builder.build();
114             } catch (MalformedURLException | GeneralSecurityException e) {
115                 throw new IllegalArgumentException(e);
116             }
117         }
118
119         @Override
120         public boolean send(String partitionId, String message) {
121             if (message == null) {
122                 throw new IllegalArgumentException("No message provided");
123             }
124
125             try {
126                 this.publisher.send(partitionId, message);
127             } catch (Exception e) {
128                 logger.warn("{}: SEND of {} cannot be performed because of {}", this, message, e.getMessage(), e);
129                 return false;
130             }
131             return true;
132         }
133
134         @Override
135         public void close() {
136             logger.info("{}: CLOSE", this);
137
138             try {
139                 this.publisher.close();
140             } catch (Exception e) {
141                 logger.warn("{}: CLOSE FAILED because of {}", this, e.getMessage(), e);
142             }
143         }
144
145
146         @Override
147         public String toString() {
148             return "CambriaPublisherWrapper []";
149         }
150
151     }
152
153     /**
154      * Kafka based library publisher.
155      */
156     public static class KafkaPublisherWrapper implements BusPublisher {
157
158         private static Logger logger = LoggerFactory.getLogger(KafkaPublisherWrapper.class);
159         private static final String KEY_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer";
160
161         private String topic;
162
163         /**
164          * Kafka publisher.
165          */
166         private Producer<String, String> producer;
167         protected Properties kafkaProps;
168
169         /**
170          * Kafka Publisher Wrapper.
171          *
172          * @param busTopicParams topic parameters
173          */
174         protected KafkaPublisherWrapper(BusTopicParams busTopicParams) {
175
176             if (busTopicParams.isTopicInvalid()) {
177                 throw new IllegalArgumentException("No topic for Kafka");
178             }
179
180             this.topic = busTopicParams.getTopic();
181
182             //Setup Properties for consumer
183             kafkaProps = new Properties();
184             kafkaProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, busTopicParams.getServers().get(0));
185             if (busTopicParams.isAdditionalPropsValid()) {
186                 for (Map.Entry<String, String> entry : busTopicParams.getAdditionalProps().entrySet()) {
187                     kafkaProps.put(entry.getKey(), entry.getValue());
188                 }
189             }
190             if (kafkaProps.get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG) == null) {
191                 kafkaProps.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KEY_SERIALIZER);
192             }
193             if (kafkaProps.get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG) == null) {
194                 kafkaProps.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KEY_SERIALIZER);
195             }
196
197             producer = new KafkaProducer<>(kafkaProps);
198         }
199
200         @Override
201         public boolean send(String partitionId, String message) {
202             if (message == null) {
203                 throw new IllegalArgumentException("No message provided");
204             }
205
206             try {
207                 //Create the record
208                 ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic,
209                     UUID.randomUUID().toString(), message);
210
211                 this.producer.send(record);
212                 producer.flush();
213             } catch (Exception e) {
214                 logger.warn("{}: SEND of {} cannot be performed because of {}", this, message, e.getMessage(), e);
215                 return false;
216             }
217             return true;
218         }
219
220         @Override
221         public void close() {
222             logger.info("{}: CLOSE", this);
223
224             try {
225                 this.producer.close();
226             } catch (Exception e) {
227                 logger.warn("{}: CLOSE FAILED because of {}", this, e.getMessage(), e);
228             }
229         }
230
231
232         @Override
233         public String toString() {
234             return "KafkaPublisherWrapper []";
235         }
236
237     }
238
239     /**
240      * DmaapClient library wrapper.
241      */
242     public abstract class DmaapPublisherWrapper implements BusPublisher {
243
244         private static Logger logger = LoggerFactory.getLogger(DmaapPublisherWrapper.class);
245
246         /**
247          * MR based Publisher.
248          */
249         protected MRSimplerBatchPublisher publisher;
250         protected Properties props;
251
252         /**
253          * MR Publisher Wrapper.
254          *
255          * @param servers messaging bus hosts
256          * @param topic topic
257          * @param username AAF or DME2 Login
258          * @param password AAF or DME2 Password
259          */
260         protected DmaapPublisherWrapper(ProtocolTypeConstants protocol, List<String> servers, String topic,
261                 String username, String password, boolean useHttps) {
262
263
264             if (StringUtils.isBlank(topic)) {
265                 throw new IllegalArgumentException("No topic for DMaaP");
266             }
267
268
269             configureProtocol(topic, protocol, servers, useHttps);
270
271             this.publisher.logTo(LoggerFactory.getLogger(MRSimplerBatchPublisher.class.getName()));
272
273             this.publisher.setUsername(username);
274             this.publisher.setPassword(password);
275
276             props = new Properties();
277
278             props.setProperty("Protocol", (useHttps ? "https" : "http"));
279             props.setProperty("contenttype", "application/json");
280             props.setProperty("username", username);
281             props.setProperty("password", password);
282
283             props.setProperty("topic", topic);
284
285             this.publisher.setProps(props);
286
287             if (protocol == ProtocolTypeConstants.AAF_AUTH) {
288                 this.publisher.setHost(servers.get(0));
289             }
290
291             logger.info("{}: CREATION: using protocol {}", this, protocol.getValue());
292         }
293
294         private void configureProtocol(String topic, ProtocolTypeConstants protocol, List<String> servers,
295                         boolean useHttps) {
296
297             if (protocol == ProtocolTypeConstants.AAF_AUTH) {
298                 if (servers == null || servers.isEmpty()) {
299                     throw new IllegalArgumentException("No DMaaP servers or DME2 partner provided");
300                 }
301
302                 ArrayList<String> dmaapServers = new ArrayList<>();
303                 String port = useHttps ? ":3905" : ":3904";
304                 for (String server : servers) {
305                     dmaapServers.add(server + port);
306                 }
307
308
309                 this.publisher = new MRSimplerBatchPublisher.Builder().againstUrls(dmaapServers).onTopic(topic).build();
310
311                 this.publisher.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue());
312
313             } else if (protocol == ProtocolTypeConstants.DME2) {
314                 ArrayList<String> dmaapServers = new ArrayList<>();
315                 dmaapServers.add("0.0.0.0:3904");
316
317                 this.publisher = new MRSimplerBatchPublisher.Builder().againstUrls(dmaapServers).onTopic(topic).build();
318
319                 this.publisher.setProtocolFlag(ProtocolTypeConstants.DME2.getValue());
320
321             } else {
322                 throw new IllegalArgumentException("Invalid DMaaP protocol " + protocol);
323             }
324         }
325
326         @Override
327         public void close() {
328             logger.info("{}: CLOSE", this);
329
330             try {
331                 this.publisher.close(1, TimeUnit.SECONDS);
332
333             } catch (InterruptedException e) {
334                 logger.warn("{}: CLOSE FAILED", this, e);
335                 Thread.currentThread().interrupt();
336
337             } catch (Exception e) {
338                 logger.warn("{}: CLOSE FAILED", this, e);
339             }
340         }
341
342         @Override
343         public boolean send(String partitionId, String message) {
344             if (message == null) {
345                 throw new IllegalArgumentException("No message provided");
346             }
347
348             this.publisher.setPubResponse(new MRPublisherResponse());
349             this.publisher.send(partitionId, message);
350             MRPublisherResponse response = this.publisher.sendBatchWithResponse();
351             if (response != null) {
352                 logger.debug("DMaaP publisher received {} : {}", response.getResponseCode(),
353                         response.getResponseMessage());
354             }
355
356             return true;
357         }
358
359         @Override
360         public String toString() {
361             return "DmaapPublisherWrapper [" + "publisher.getAuthDate()=" + publisher.getAuthDate()
362                     + ", publisher.getAuthKey()=" + publisher.getAuthKey() + ", publisher.getHost()="
363                     + publisher.getHost() + ", publisher.getProtocolFlag()=" + publisher.getProtocolFlag()
364                     + ", publisher.getUsername()=" + publisher.getUsername() + "]";
365         }
366     }
367
368     /**
369      * DmaapClient library wrapper.
370      */
371     public static class DmaapAafPublisherWrapper extends DmaapPublisherWrapper {
372         /**
373          * MR based Publisher.
374          */
375         public DmaapAafPublisherWrapper(List<String> servers, String topic, String aafLogin, String aafPassword,
376                 boolean useHttps) {
377
378             super(ProtocolTypeConstants.AAF_AUTH, servers, topic, aafLogin, aafPassword, useHttps);
379         }
380     }
381
382     public static class DmaapDmePublisherWrapper extends DmaapPublisherWrapper {
383
384         /**
385          * Constructor.
386          *
387          * @param busTopicParams topic parameters
388          */
389         public DmaapDmePublisherWrapper(BusTopicParams busTopicParams) {
390
391             super(ProtocolTypeConstants.DME2, busTopicParams.getServers(), busTopicParams.getTopic(),
392                     busTopicParams.getUserName(), busTopicParams.getPassword(), busTopicParams.isUseHttps());
393
394             String dme2RouteOffer = busTopicParams.isAdditionalPropsValid()
395                             ? busTopicParams.getAdditionalProps().get(
396                                             PolicyEndPointProperties.DME2_ROUTE_OFFER_PROPERTY)
397                             : null;
398
399             validateParams(busTopicParams, dme2RouteOffer);
400
401             String serviceName = busTopicParams.getServers().get(0);
402
403             /* These are required, no defaults */
404             props.setProperty("Environment", busTopicParams.getEnvironment());
405             props.setProperty("AFT_ENVIRONMENT", busTopicParams.getAftEnvironment());
406
407             props.setProperty(PolicyEndPointProperties.DME2_SERVICE_NAME_PROPERTY, serviceName);
408
409             if (busTopicParams.getPartner() != null) {
410                 props.setProperty("Partner", busTopicParams.getPartner());
411             }
412             if (dme2RouteOffer != null) {
413                 props.setProperty(PolicyEndPointProperties.DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer);
414             }
415
416             props.setProperty("Latitude", busTopicParams.getLatitude());
417             props.setProperty("Longitude", busTopicParams.getLongitude());
418
419             // ServiceName also a default, found in additionalProps
420
421             /* These are optional, will default to these values if not set in optionalProps */
422             props.setProperty("AFT_DME2_EP_READ_TIMEOUT_MS", "50000");
423             props.setProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", "240000");
424             props.setProperty("AFT_DME2_EP_CONN_TIMEOUT", "15000");
425             props.setProperty("Version", "1.0");
426             props.setProperty("SubContextPath", "/");
427             props.setProperty("sessionstickinessrequired", "no");
428
429             /* These should not change */
430             props.setProperty("TransportType", "DME2");
431             props.setProperty("MethodType", "POST");
432
433             if (busTopicParams.isAdditionalPropsValid()) {
434                 addAdditionalProps(busTopicParams);
435             }
436
437             this.publisher.setProps(props);
438         }
439
440         private void validateParams(BusTopicParams busTopicParams, String dme2RouteOffer) {
441             if (busTopicParams.isEnvironmentInvalid()) {
442                 throw parmException(busTopicParams.getTopic(),
443                         PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX);
444             }
445             if (busTopicParams.isAftEnvironmentInvalid()) {
446                 throw parmException(busTopicParams.getTopic(),
447                         PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX);
448             }
449             if (busTopicParams.isLatitudeInvalid()) {
450                 throw parmException(busTopicParams.getTopic(),
451                         PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX);
452             }
453             if (busTopicParams.isLongitudeInvalid()) {
454                 throw parmException(busTopicParams.getTopic(),
455                         PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX);
456             }
457
458             if ((busTopicParams.isPartnerInvalid())
459                     && StringUtils.isBlank(dme2RouteOffer)) {
460                 throw new IllegalArgumentException(
461                         "Must provide at least " + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
462                                 + busTopicParams.getTopic()
463                                 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX + " or "
464                                 + PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + busTopicParams.getTopic()
465                                 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX + " for DME2");
466             }
467         }
468
469         private void addAdditionalProps(BusTopicParams busTopicParams) {
470             for (Map.Entry<String, String> entry : busTopicParams.getAdditionalProps().entrySet()) {
471                 String key = entry.getKey();
472                 String value = entry.getValue();
473
474                 if (value != null) {
475                     props.setProperty(key, value);
476                 }
477             }
478         }
479
480         private IllegalArgumentException parmException(String topic, String propnm) {
481             return new IllegalArgumentException("Missing " + PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
482                     + topic + propnm + " property for DME2 in DMaaP");
483
484         }
485     }
486 }