e0df709581ad066a34e97be6031e8038f83d4925
[policy/common.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2017-2021 AT&T Intellectual Property. All rights reserved.
6  * Modifications Copyright (C) 2018 Samsung Electronics Co., Ltd.
7  * Modifications Copyright (C) 2020 Bell Canada. All rights reserved.
8  * Copyright (C) 2022 Nordix Foundation.
9  * ================================================================================
10  * Licensed under the Apache License, Version 2.0 (the "License");
11  * you may not use this file except in compliance with the License.
12  * You may obtain a copy of the License at
13  *
14  *      http://www.apache.org/licenses/LICENSE-2.0
15  *
16  * Unless required by applicable law or agreed to in writing, software
17  * distributed under the License is distributed on an "AS IS" BASIS,
18  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19  * See the License for the specific language governing permissions and
20  * limitations under the License.
21  * ============LICENSE_END=========================================================
22  */
23
24 package org.onap.policy.common.endpoints.event.comm.bus.internal;
25
26 import com.att.nsa.apiClient.http.HttpClient.ConnectionType;
27 import com.att.nsa.cambria.client.CambriaBatchingPublisher;
28 import com.att.nsa.cambria.client.CambriaClientBuilders;
29 import java.net.MalformedURLException;
30 import java.security.GeneralSecurityException;
31 import java.util.ArrayList;
32 import java.util.List;
33 import java.util.Map;
34 import java.util.Properties;
35 import java.util.Random;
36 import java.util.concurrent.TimeUnit;
37 import org.apache.commons.lang3.StringUtils;
38 import org.apache.kafka.clients.producer.KafkaProducer;
39 import org.apache.kafka.clients.producer.ProducerConfig;
40 import org.apache.kafka.clients.producer.ProducerRecord;
41 import org.apache.kafka.common.record.CompressionType;
42 import org.onap.dmaap.mr.client.impl.MRSimplerBatchPublisher;
43 import org.onap.dmaap.mr.client.response.MRPublisherResponse;
44 import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants;
45 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
46 import org.onap.policy.common.gson.annotation.GsonJsonIgnore;
47 import org.slf4j.Logger;
48 import org.slf4j.LoggerFactory;
49
50 public interface BusPublisher {
51
52     /**
53      * sends a message.
54      *
55      * @param partitionId id
56      * @param message the message
57      * @return true if success, false otherwise
58      * @throws IllegalArgumentException if no message provided
59      */
60     public boolean send(String partitionId, String message);
61
62     /**
63      * closes the publisher.
64      */
65     public void close();
66
67     /**
68      * Cambria based library publisher.
69      */
70     public static class CambriaPublisherWrapper implements BusPublisher {
71
72         private static Logger logger = LoggerFactory.getLogger(CambriaPublisherWrapper.class);
73
74         /**
75          * The actual Cambria publisher.
76          */
77         @GsonJsonIgnore
78         protected CambriaBatchingPublisher publisher;
79
80         /**
81          * Constructor.
82          *
83          * @param busTopicParams topic parameters
84          */
85         public CambriaPublisherWrapper(BusTopicParams busTopicParams) {
86
87             var builder = new CambriaClientBuilders.PublisherBuilder();
88
89             builder.usingHosts(busTopicParams.getServers()).onTopic(busTopicParams.getTopic());
90
91             // Set read timeout to 30 seconds (TBD: this should be configurable)
92             builder.withSocketTimeout(30000);
93
94             if (busTopicParams.isUseHttps()) {
95                 if (busTopicParams.isAllowSelfSignedCerts()) {
96                     builder.withConnectionType(ConnectionType.HTTPS_NO_VALIDATION);
97                 } else {
98                     builder.withConnectionType(ConnectionType.HTTPS);
99                 }
100             }
101
102
103             if (busTopicParams.isApiKeyValid() && busTopicParams.isApiSecretValid()) {
104                 builder.authenticatedBy(busTopicParams.getApiKey(), busTopicParams.getApiSecret());
105             }
106
107             if (busTopicParams.isUserNameValid() && busTopicParams.isPasswordValid()) {
108                 builder.authenticatedByHttp(busTopicParams.getUserName(), busTopicParams.getPassword());
109             }
110
111             try {
112                 this.publisher = builder.build();
113             } catch (MalformedURLException | GeneralSecurityException e) {
114                 throw new IllegalArgumentException(e);
115             }
116         }
117
118         @Override
119         public boolean send(String partitionId, String message) {
120             if (message == null) {
121                 throw new IllegalArgumentException("No message provided");
122             }
123
124             try {
125                 this.publisher.send(partitionId, message);
126             } catch (Exception e) {
127                 logger.warn("{}: SEND of {} cannot be performed because of {}", this, message, e.getMessage(), e);
128                 return false;
129             }
130             return true;
131         }
132
133         @Override
134         public void close() {
135             logger.info("{}: CLOSE", this);
136
137             try {
138                 this.publisher.close();
139             } catch (Exception e) {
140                 logger.warn("{}: CLOSE FAILED because of {}", this, e.getMessage(), e);
141             }
142         }
143
144
145         @Override
146         public String toString() {
147             return "CambriaPublisherWrapper []";
148         }
149
150     }
151
152     /**
153      * Kafka based library publisher.
154      */
155     public static class KafkaPublisherWrapper implements BusPublisher {
156
157         private static Logger logger = LoggerFactory.getLogger(KafkaPublisherWrapper.class);
158
159         /**
160          * The actual Kafka publisher.
161          */
162         private final KafkaProducer producer;
163
164         /**
165          * Constructor.
166          *
167          * @param busTopicParams topic parameters
168          */
169         public KafkaPublisherWrapper(BusTopicParams busTopicParams) {
170             // TODO Setting of topic parameters is not implemented yet.
171             //Setup Properties for Kafka Producer
172             Properties kafkaProps = new Properties();
173             this.producer = new KafkaProducer(kafkaProps);
174         }
175
176         @Override
177         public boolean send(String partitionId, String message) {
178             if (message == null) {
179                 throw new IllegalArgumentException("No message provided");
180             }
181             // TODO Sending messages is not implemented yet
182             return true;
183         }
184
185         @Override
186         public void close() {
187             logger.info("{}: CLOSE", this);
188
189             try (this.producer) {
190                 this.producer.close();
191             } catch (Exception e) {
192                 logger.warn("{}: CLOSE FAILED because of {}", this, e.getMessage(), e);
193             }
194         }
195
196
197         @Override
198         public String toString() {
199             return "KafkaPublisherWrapper []";
200         }
201
202     }
203
204     /**
205      * DmaapClient library wrapper.
206      */
207     public abstract class DmaapPublisherWrapper implements BusPublisher {
208
209         private static Logger logger = LoggerFactory.getLogger(DmaapPublisherWrapper.class);
210
211         /**
212          * MR based Publisher.
213          */
214         protected MRSimplerBatchPublisher publisher;
215         protected Properties props;
216
217         /**
218          * MR Publisher Wrapper.
219          *
220          * @param servers messaging bus hosts
221          * @param topic topic
222          * @param username AAF or DME2 Login
223          * @param password AAF or DME2 Password
224          */
225         protected DmaapPublisherWrapper(ProtocolTypeConstants protocol, List<String> servers, String topic,
226                 String username, String password, boolean useHttps) {
227
228
229             if (StringUtils.isBlank(topic)) {
230                 throw new IllegalArgumentException("No topic for DMaaP");
231             }
232
233
234             configureProtocol(topic, protocol, servers, useHttps);
235
236             this.publisher.logTo(LoggerFactory.getLogger(MRSimplerBatchPublisher.class.getName()));
237
238             this.publisher.setUsername(username);
239             this.publisher.setPassword(password);
240
241             props = new Properties();
242
243             props.setProperty("Protocol", (useHttps ? "https" : "http"));
244             props.setProperty("contenttype", "application/json");
245             props.setProperty("username", username);
246             props.setProperty("password", password);
247
248             props.setProperty("topic", topic);
249
250             this.publisher.setProps(props);
251
252             if (protocol == ProtocolTypeConstants.AAF_AUTH) {
253                 this.publisher.setHost(servers.get(0));
254             }
255
256             logger.info("{}: CREATION: using protocol {}", this, protocol.getValue());
257         }
258
259         private void configureProtocol(String topic, ProtocolTypeConstants protocol, List<String> servers,
260                         boolean useHttps) {
261
262             if (protocol == ProtocolTypeConstants.AAF_AUTH) {
263                 if (servers == null || servers.isEmpty()) {
264                     throw new IllegalArgumentException("No DMaaP servers or DME2 partner provided");
265                 }
266
267                 ArrayList<String> dmaapServers = new ArrayList<>();
268                 String port = useHttps ? ":3905" : ":3904";
269                 for (String server : servers) {
270                     dmaapServers.add(server + port);
271                 }
272
273
274                 this.publisher = new MRSimplerBatchPublisher.Builder().againstUrls(dmaapServers).onTopic(topic).build();
275
276                 this.publisher.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue());
277
278             } else if (protocol == ProtocolTypeConstants.DME2) {
279                 ArrayList<String> dmaapServers = new ArrayList<>();
280                 dmaapServers.add("0.0.0.0:3904");
281
282                 this.publisher = new MRSimplerBatchPublisher.Builder().againstUrls(dmaapServers).onTopic(topic).build();
283
284                 this.publisher.setProtocolFlag(ProtocolTypeConstants.DME2.getValue());
285
286             } else {
287                 throw new IllegalArgumentException("Invalid DMaaP protocol " + protocol);
288             }
289         }
290
291         @Override
292         public void close() {
293             logger.info("{}: CLOSE", this);
294
295             try {
296                 this.publisher.close(1, TimeUnit.SECONDS);
297
298             } catch (InterruptedException e) {
299                 logger.warn("{}: CLOSE FAILED", this, e);
300                 Thread.currentThread().interrupt();
301
302             } catch (Exception e) {
303                 logger.warn("{}: CLOSE FAILED", this, e);
304             }
305         }
306
307         @Override
308         public boolean send(String partitionId, String message) {
309             if (message == null) {
310                 throw new IllegalArgumentException("No message provided");
311             }
312
313             this.publisher.setPubResponse(new MRPublisherResponse());
314             this.publisher.send(partitionId, message);
315             MRPublisherResponse response = this.publisher.sendBatchWithResponse();
316             if (response != null) {
317                 logger.debug("DMaaP publisher received {} : {}", response.getResponseCode(),
318                         response.getResponseMessage());
319             }
320
321             return true;
322         }
323
324         @Override
325         public String toString() {
326             return "DmaapPublisherWrapper [" + "publisher.getAuthDate()=" + publisher.getAuthDate()
327                     + ", publisher.getAuthKey()=" + publisher.getAuthKey() + ", publisher.getHost()="
328                     + publisher.getHost() + ", publisher.getProtocolFlag()=" + publisher.getProtocolFlag()
329                     + ", publisher.getUsername()=" + publisher.getUsername() + "]";
330         }
331     }
332
333     /**
334      * DmaapClient library wrapper.
335      */
336     public static class DmaapAafPublisherWrapper extends DmaapPublisherWrapper {
337         /**
338          * MR based Publisher.
339          */
340         public DmaapAafPublisherWrapper(List<String> servers, String topic, String aafLogin, String aafPassword,
341                 boolean useHttps) {
342
343             super(ProtocolTypeConstants.AAF_AUTH, servers, topic, aafLogin, aafPassword, useHttps);
344         }
345     }
346
347     public static class DmaapDmePublisherWrapper extends DmaapPublisherWrapper {
348
349         /**
350          * Constructor.
351          *
352          * @param busTopicParams topic parameters
353          */
354         public DmaapDmePublisherWrapper(BusTopicParams busTopicParams) {
355
356             super(ProtocolTypeConstants.DME2, busTopicParams.getServers(), busTopicParams.getTopic(),
357                     busTopicParams.getUserName(), busTopicParams.getPassword(), busTopicParams.isUseHttps());
358
359             String dme2RouteOffer = busTopicParams.isAdditionalPropsValid()
360                             ? busTopicParams.getAdditionalProps().get(
361                                             PolicyEndPointProperties.DME2_ROUTE_OFFER_PROPERTY)
362                             : null;
363
364             validateParams(busTopicParams, dme2RouteOffer);
365
366             String serviceName = busTopicParams.getServers().get(0);
367
368             /* These are required, no defaults */
369             props.setProperty("Environment", busTopicParams.getEnvironment());
370             props.setProperty("AFT_ENVIRONMENT", busTopicParams.getAftEnvironment());
371
372             props.setProperty(PolicyEndPointProperties.DME2_SERVICE_NAME_PROPERTY, serviceName);
373
374             if (busTopicParams.getPartner() != null) {
375                 props.setProperty("Partner", busTopicParams.getPartner());
376             }
377             if (dme2RouteOffer != null) {
378                 props.setProperty(PolicyEndPointProperties.DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer);
379             }
380
381             props.setProperty("Latitude", busTopicParams.getLatitude());
382             props.setProperty("Longitude", busTopicParams.getLongitude());
383
384             // ServiceName also a default, found in additionalProps
385
386             /* These are optional, will default to these values if not set in optionalProps */
387             props.setProperty("AFT_DME2_EP_READ_TIMEOUT_MS", "50000");
388             props.setProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", "240000");
389             props.setProperty("AFT_DME2_EP_CONN_TIMEOUT", "15000");
390             props.setProperty("Version", "1.0");
391             props.setProperty("SubContextPath", "/");
392             props.setProperty("sessionstickinessrequired", "no");
393
394             /* These should not change */
395             props.setProperty("TransportType", "DME2");
396             props.setProperty("MethodType", "POST");
397
398             if (busTopicParams.isAdditionalPropsValid()) {
399                 addAdditionalProps(busTopicParams);
400             }
401
402             this.publisher.setProps(props);
403         }
404
405         private void validateParams(BusTopicParams busTopicParams, String dme2RouteOffer) {
406             if (busTopicParams.isEnvironmentInvalid()) {
407                 throw parmException(busTopicParams.getTopic(),
408                         PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX);
409             }
410             if (busTopicParams.isAftEnvironmentInvalid()) {
411                 throw parmException(busTopicParams.getTopic(),
412                         PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX);
413             }
414             if (busTopicParams.isLatitudeInvalid()) {
415                 throw parmException(busTopicParams.getTopic(),
416                         PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX);
417             }
418             if (busTopicParams.isLongitudeInvalid()) {
419                 throw parmException(busTopicParams.getTopic(),
420                         PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX);
421             }
422
423             if ((busTopicParams.isPartnerInvalid())
424                     && StringUtils.isBlank(dme2RouteOffer)) {
425                 throw new IllegalArgumentException(
426                         "Must provide at least " + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
427                                 + busTopicParams.getTopic()
428                                 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX + " or "
429                                 + PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + busTopicParams.getTopic()
430                                 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX + " for DME2");
431             }
432         }
433
434         private void addAdditionalProps(BusTopicParams busTopicParams) {
435             for (Map.Entry<String, String> entry : busTopicParams.getAdditionalProps().entrySet()) {
436                 String key = entry.getKey();
437                 String value = entry.getValue();
438
439                 if (value != null) {
440                     props.setProperty(key, value);
441                 }
442             }
443         }
444
445         private IllegalArgumentException parmException(String topic, String propnm) {
446             return new IllegalArgumentException("Missing " + PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
447                     + topic + propnm + " property for DME2 in DMaaP");
448
449         }
450     }
451 }