8bf805bf43cbac7b857376e1f28532b40ddc02e5
[policy/common.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2017-2021 AT&T Intellectual Property. All rights reserved.
6  * Modifications Copyright (C) 2018 Samsung Electronics Co., Ltd.
7  * Modifications Copyright (C) 2020 Bell Canada. All rights reserved.
8  * ================================================================================
9  * Licensed under the Apache License, Version 2.0 (the "License");
10  * you may not use this file except in compliance with the License.
11  * You may obtain a copy of the License at
12  *
13  *      http://www.apache.org/licenses/LICENSE-2.0
14  *
15  * Unless required by applicable law or agreed to in writing, software
16  * distributed under the License is distributed on an "AS IS" BASIS,
17  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18  * See the License for the specific language governing permissions and
19  * limitations under the License.
20  * ============LICENSE_END=========================================================
21  */
22
23 package org.onap.policy.common.endpoints.event.comm.bus.internal;
24
25 import com.att.nsa.apiClient.http.HttpClient.ConnectionType;
26 import com.att.nsa.cambria.client.CambriaBatchingPublisher;
27 import com.att.nsa.cambria.client.CambriaClientBuilders;
28 import java.net.MalformedURLException;
29 import java.security.GeneralSecurityException;
30 import java.util.ArrayList;
31 import java.util.List;
32 import java.util.Map;
33 import java.util.Properties;
34 import java.util.concurrent.TimeUnit;
35 import org.apache.commons.lang3.StringUtils;
36 import org.onap.dmaap.mr.client.impl.MRSimplerBatchPublisher;
37 import org.onap.dmaap.mr.client.response.MRPublisherResponse;
38 import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants;
39 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
40 import org.onap.policy.common.gson.annotation.GsonJsonIgnore;
41 import org.slf4j.Logger;
42 import org.slf4j.LoggerFactory;
43
44 public interface BusPublisher {
45
46     /**
47      * sends a message.
48      *
49      * @param partitionId id
50      * @param message the message
51      * @return true if success, false otherwise
52      * @throws IllegalArgumentException if no message provided
53      */
54     public boolean send(String partitionId, String message);
55
56     /**
57      * closes the publisher.
58      */
59     public void close();
60
61     /**
62      * Cambria based library publisher.
63      */
64     public static class CambriaPublisherWrapper implements BusPublisher {
65
66         private static Logger logger = LoggerFactory.getLogger(CambriaPublisherWrapper.class);
67
68         /**
69          * The actual Cambria publisher.
70          */
71         @GsonJsonIgnore
72         protected CambriaBatchingPublisher publisher;
73
74         /**
75          * Constructor.
76          *
77          * @param busTopicParams topic parameters
78          */
79         public CambriaPublisherWrapper(BusTopicParams busTopicParams) {
80
81             var builder = new CambriaClientBuilders.PublisherBuilder();
82
83             builder.usingHosts(busTopicParams.getServers()).onTopic(busTopicParams.getTopic());
84
85             // Set read timeout to 30 seconds (TBD: this should be configurable)
86             builder.withSocketTimeout(30000);
87
88             if (busTopicParams.isUseHttps()) {
89                 if (busTopicParams.isAllowSelfSignedCerts()) {
90                     builder.withConnectionType(ConnectionType.HTTPS_NO_VALIDATION);
91                 } else {
92                     builder.withConnectionType(ConnectionType.HTTPS);
93                 }
94             }
95
96
97             if (busTopicParams.isApiKeyValid() && busTopicParams.isApiSecretValid()) {
98                 builder.authenticatedBy(busTopicParams.getApiKey(), busTopicParams.getApiSecret());
99             }
100
101             if (busTopicParams.isUserNameValid() && busTopicParams.isPasswordValid()) {
102                 builder.authenticatedByHttp(busTopicParams.getUserName(), busTopicParams.getPassword());
103             }
104
105             try {
106                 this.publisher = builder.build();
107             } catch (MalformedURLException | GeneralSecurityException e) {
108                 throw new IllegalArgumentException(e);
109             }
110         }
111
112         @Override
113         public boolean send(String partitionId, String message) {
114             if (message == null) {
115                 throw new IllegalArgumentException("No message provided");
116             }
117
118             try {
119                 this.publisher.send(partitionId, message);
120             } catch (Exception e) {
121                 logger.warn("{}: SEND of {} cannot be performed because of {}", this, message, e.getMessage(), e);
122                 return false;
123             }
124             return true;
125         }
126
127         @Override
128         public void close() {
129             logger.info("{}: CLOSE", this);
130
131             try {
132                 this.publisher.close();
133             } catch (Exception e) {
134                 logger.warn("{}: CLOSE FAILED because of {}", this, e.getMessage(), e);
135             }
136         }
137
138
139         @Override
140         public String toString() {
141             return "CambriaPublisherWrapper []";
142         }
143
144     }
145
146     /**
147      * DmaapClient library wrapper.
148      */
149     public abstract class DmaapPublisherWrapper implements BusPublisher {
150
151         private static Logger logger = LoggerFactory.getLogger(DmaapPublisherWrapper.class);
152
153         /**
154          * MR based Publisher.
155          */
156         protected MRSimplerBatchPublisher publisher;
157         protected Properties props;
158
159         /**
160          * MR Publisher Wrapper.
161          *
162          * @param servers messaging bus hosts
163          * @param topic topic
164          * @param username AAF or DME2 Login
165          * @param password AAF or DME2 Password
166          */
167         protected DmaapPublisherWrapper(ProtocolTypeConstants protocol, List<String> servers, String topic,
168                 String username, String password, boolean useHttps) {
169
170
171             if (StringUtils.isBlank(topic)) {
172                 throw new IllegalArgumentException("No topic for DMaaP");
173             }
174
175
176             configureProtocol(topic, protocol, servers, useHttps);
177
178             this.publisher.logTo(LoggerFactory.getLogger(MRSimplerBatchPublisher.class.getName()));
179
180             this.publisher.setUsername(username);
181             this.publisher.setPassword(password);
182
183             props = new Properties();
184
185             props.setProperty("Protocol", (useHttps ? "https" : "http"));
186             props.setProperty("contenttype", "application/json");
187             props.setProperty("username", username);
188             props.setProperty("password", password);
189
190             props.setProperty("topic", topic);
191
192             this.publisher.setProps(props);
193
194             if (protocol == ProtocolTypeConstants.AAF_AUTH) {
195                 this.publisher.setHost(servers.get(0));
196             }
197
198             logger.info("{}: CREATION: using protocol {}", this, protocol.getValue());
199         }
200
201         private void configureProtocol(String topic, ProtocolTypeConstants protocol, List<String> servers,
202                         boolean useHttps) {
203
204             if (protocol == ProtocolTypeConstants.AAF_AUTH) {
205                 if (servers == null || servers.isEmpty()) {
206                     throw new IllegalArgumentException("No DMaaP servers or DME2 partner provided");
207                 }
208
209                 ArrayList<String> dmaapServers = new ArrayList<>();
210                 String port = useHttps ? ":3905" : ":3904";
211                 for (String server : servers) {
212                     dmaapServers.add(server + port);
213                 }
214
215
216                 this.publisher = new MRSimplerBatchPublisher.Builder().againstUrls(dmaapServers).onTopic(topic).build();
217
218                 this.publisher.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue());
219
220             } else if (protocol == ProtocolTypeConstants.DME2) {
221                 ArrayList<String> dmaapServers = new ArrayList<>();
222                 dmaapServers.add("0.0.0.0:3904");
223
224                 this.publisher = new MRSimplerBatchPublisher.Builder().againstUrls(dmaapServers).onTopic(topic).build();
225
226                 this.publisher.setProtocolFlag(ProtocolTypeConstants.DME2.getValue());
227
228             } else {
229                 throw new IllegalArgumentException("Invalid DMaaP protocol " + protocol);
230             }
231         }
232
233         @Override
234         public void close() {
235             logger.info("{}: CLOSE", this);
236
237             try {
238                 this.publisher.close(1, TimeUnit.SECONDS);
239
240             } catch (InterruptedException e) {
241                 logger.warn("{}: CLOSE FAILED", this, e);
242                 Thread.currentThread().interrupt();
243
244             } catch (Exception e) {
245                 logger.warn("{}: CLOSE FAILED", this, e);
246             }
247         }
248
249         @Override
250         public boolean send(String partitionId, String message) {
251             if (message == null) {
252                 throw new IllegalArgumentException("No message provided");
253             }
254
255             this.publisher.setPubResponse(new MRPublisherResponse());
256             this.publisher.send(partitionId, message);
257             MRPublisherResponse response = this.publisher.sendBatchWithResponse();
258             if (response != null) {
259                 logger.debug("DMaaP publisher received {} : {}", response.getResponseCode(),
260                         response.getResponseMessage());
261             }
262
263             return true;
264         }
265
266         @Override
267         public String toString() {
268             return "DmaapPublisherWrapper [" + "publisher.getAuthDate()=" + publisher.getAuthDate()
269                     + ", publisher.getAuthKey()=" + publisher.getAuthKey() + ", publisher.getHost()="
270                     + publisher.getHost() + ", publisher.getProtocolFlag()=" + publisher.getProtocolFlag()
271                     + ", publisher.getUsername()=" + publisher.getUsername() + "]";
272         }
273     }
274
275     /**
276      * DmaapClient library wrapper.
277      */
278     public static class DmaapAafPublisherWrapper extends DmaapPublisherWrapper {
279         /**
280          * MR based Publisher.
281          */
282         public DmaapAafPublisherWrapper(List<String> servers, String topic, String aafLogin, String aafPassword,
283                 boolean useHttps) {
284
285             super(ProtocolTypeConstants.AAF_AUTH, servers, topic, aafLogin, aafPassword, useHttps);
286         }
287     }
288
289     public static class DmaapDmePublisherWrapper extends DmaapPublisherWrapper {
290
291         /**
292          * Constructor.
293          *
294          * @param busTopicParams topic parameters
295          */
296         public DmaapDmePublisherWrapper(BusTopicParams busTopicParams) {
297
298             super(ProtocolTypeConstants.DME2, busTopicParams.getServers(), busTopicParams.getTopic(),
299                     busTopicParams.getUserName(), busTopicParams.getPassword(), busTopicParams.isUseHttps());
300
301             String dme2RouteOffer = busTopicParams.isAdditionalPropsValid()
302                             ? busTopicParams.getAdditionalProps().get(
303                                             PolicyEndPointProperties.DME2_ROUTE_OFFER_PROPERTY)
304                             : null;
305
306             validateParams(busTopicParams, dme2RouteOffer);
307
308             String serviceName = busTopicParams.getServers().get(0);
309
310             /* These are required, no defaults */
311             props.setProperty("Environment", busTopicParams.getEnvironment());
312             props.setProperty("AFT_ENVIRONMENT", busTopicParams.getAftEnvironment());
313
314             props.setProperty(PolicyEndPointProperties.DME2_SERVICE_NAME_PROPERTY, serviceName);
315
316             if (busTopicParams.getPartner() != null) {
317                 props.setProperty("Partner", busTopicParams.getPartner());
318             }
319             if (dme2RouteOffer != null) {
320                 props.setProperty(PolicyEndPointProperties.DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer);
321             }
322
323             props.setProperty("Latitude", busTopicParams.getLatitude());
324             props.setProperty("Longitude", busTopicParams.getLongitude());
325
326             // ServiceName also a default, found in additionalProps
327
328             /* These are optional, will default to these values if not set in optionalProps */
329             props.setProperty("AFT_DME2_EP_READ_TIMEOUT_MS", "50000");
330             props.setProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", "240000");
331             props.setProperty("AFT_DME2_EP_CONN_TIMEOUT", "15000");
332             props.setProperty("Version", "1.0");
333             props.setProperty("SubContextPath", "/");
334             props.setProperty("sessionstickinessrequired", "no");
335
336             /* These should not change */
337             props.setProperty("TransportType", "DME2");
338             props.setProperty("MethodType", "POST");
339
340             if (busTopicParams.isAdditionalPropsValid()) {
341                 addAdditionalProps(busTopicParams);
342             }
343
344             this.publisher.setProps(props);
345         }
346
347         private void validateParams(BusTopicParams busTopicParams, String dme2RouteOffer) {
348             if (busTopicParams.isEnvironmentInvalid()) {
349                 throw parmException(busTopicParams.getTopic(),
350                         PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX);
351             }
352             if (busTopicParams.isAftEnvironmentInvalid()) {
353                 throw parmException(busTopicParams.getTopic(),
354                         PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX);
355             }
356             if (busTopicParams.isLatitudeInvalid()) {
357                 throw parmException(busTopicParams.getTopic(),
358                         PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX);
359             }
360             if (busTopicParams.isLongitudeInvalid()) {
361                 throw parmException(busTopicParams.getTopic(),
362                         PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX);
363             }
364
365             if ((busTopicParams.isPartnerInvalid())
366                     && StringUtils.isBlank(dme2RouteOffer)) {
367                 throw new IllegalArgumentException(
368                         "Must provide at least " + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
369                                 + busTopicParams.getTopic()
370                                 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX + " or "
371                                 + PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + busTopicParams.getTopic()
372                                 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX + " for DME2");
373             }
374         }
375
376         private void addAdditionalProps(BusTopicParams busTopicParams) {
377             for (Map.Entry<String, String> entry : busTopicParams.getAdditionalProps().entrySet()) {
378                 String key = entry.getKey();
379                 String value = entry.getValue();
380
381                 if (value != null) {
382                     props.setProperty(key, value);
383                 }
384             }
385         }
386
387         private IllegalArgumentException parmException(String topic, String propnm) {
388             return new IllegalArgumentException("Missing " + PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
389                     + topic + propnm + " property for DME2 in DMaaP");
390
391         }
392     }
393 }