0c058adb09ce992869ffc83afa0a4f1212e892f5
[policy/common.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2017-2020 AT&T Intellectual Property. All rights reserved.
6  * Modifications Copyright (C) 2018 Samsung Electronics Co., Ltd.
7  * Modifications Copyright (C) 2020 Bell Canada. All rights reserved.
8  * ================================================================================
9  * Licensed under the Apache License, Version 2.0 (the "License");
10  * you may not use this file except in compliance with the License.
11  * You may obtain a copy of the License at
12  *
13  *      http://www.apache.org/licenses/LICENSE-2.0
14  *
15  * Unless required by applicable law or agreed to in writing, software
16  * distributed under the License is distributed on an "AS IS" BASIS,
17  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18  * See the License for the specific language governing permissions and
19  * limitations under the License.
20  * ============LICENSE_END=========================================================
21  */
22
23 package org.onap.policy.common.endpoints.event.comm.bus.internal;
24
25 import com.att.nsa.apiClient.http.HttpClient.ConnectionType;
26 import com.att.nsa.cambria.client.CambriaBatchingPublisher;
27 import com.att.nsa.cambria.client.CambriaClientBuilders;
28 import com.att.nsa.cambria.client.CambriaClientBuilders.PublisherBuilder;
29 import java.net.MalformedURLException;
30 import java.security.GeneralSecurityException;
31 import java.util.ArrayList;
32 import java.util.List;
33 import java.util.Map;
34 import java.util.Properties;
35 import java.util.concurrent.TimeUnit;
36 import org.apache.commons.lang3.StringUtils;
37 import org.onap.dmaap.mr.client.impl.MRSimplerBatchPublisher;
38 import org.onap.dmaap.mr.client.response.MRPublisherResponse;
39 import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants;
40 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
41 import org.onap.policy.common.gson.annotation.GsonJsonIgnore;
42 import org.slf4j.Logger;
43 import org.slf4j.LoggerFactory;
44
45 public interface BusPublisher {
46
47     /**
48      * sends a message.
49      *
50      * @param partitionId id
51      * @param message the message
52      * @return true if success, false otherwise
53      * @throws IllegalArgumentException if no message provided
54      */
55     public boolean send(String partitionId, String message);
56
57     /**
58      * closes the publisher.
59      */
60     public void close();
61
62     /**
63      * Cambria based library publisher.
64      */
65     public static class CambriaPublisherWrapper implements BusPublisher {
66
67         private static Logger logger = LoggerFactory.getLogger(CambriaPublisherWrapper.class);
68
69         /**
70          * The actual Cambria publisher.
71          */
72         @GsonJsonIgnore
73         protected CambriaBatchingPublisher publisher;
74
75         /**
76          * Constructor.
77          *
78          * @param busTopicParams topic parameters
79          */
80         public CambriaPublisherWrapper(BusTopicParams busTopicParams) {
81
82             PublisherBuilder builder = new CambriaClientBuilders.PublisherBuilder();
83
84             builder.usingHosts(busTopicParams.getServers()).onTopic(busTopicParams.getTopic());
85
86             // Set read timeout to 30 seconds (TBD: this should be configurable)
87             builder.withSocketTimeout(30000);
88
89             if (busTopicParams.isUseHttps()) {
90                 if (busTopicParams.isAllowSelfSignedCerts()) {
91                     builder.withConnectionType(ConnectionType.HTTPS_NO_VALIDATION);
92                 } else {
93                     builder.withConnectionType(ConnectionType.HTTPS);
94                 }
95             }
96
97
98             if (busTopicParams.isApiKeyValid() && busTopicParams.isApiSecretValid()) {
99                 builder.authenticatedBy(busTopicParams.getApiKey(), busTopicParams.getApiSecret());
100             }
101
102             if (busTopicParams.isUserNameValid() && busTopicParams.isPasswordValid()) {
103                 builder.authenticatedByHttp(busTopicParams.getUserName(), busTopicParams.getPassword());
104             }
105
106             try {
107                 this.publisher = builder.build();
108             } catch (MalformedURLException | GeneralSecurityException e) {
109                 throw new IllegalArgumentException(e);
110             }
111         }
112
113         @Override
114         public boolean send(String partitionId, String message) {
115             if (message == null) {
116                 throw new IllegalArgumentException("No message provided");
117             }
118
119             try {
120                 this.publisher.send(partitionId, message);
121             } catch (Exception e) {
122                 logger.warn("{}: SEND of {} cannot be performed because of {}", this, message, e.getMessage(), e);
123                 return false;
124             }
125             return true;
126         }
127
128         @Override
129         public void close() {
130             logger.info("{}: CLOSE", this);
131
132             try {
133                 this.publisher.close();
134             } catch (Exception e) {
135                 logger.warn("{}: CLOSE FAILED because of {}", this, e.getMessage(), e);
136             }
137         }
138
139
140         @Override
141         public String toString() {
142             return "CambriaPublisherWrapper []";
143         }
144
145     }
146
147     /**
148      * DmaapClient library wrapper.
149      */
150     public abstract class DmaapPublisherWrapper implements BusPublisher {
151
152         private static Logger logger = LoggerFactory.getLogger(DmaapPublisherWrapper.class);
153
154         /**
155          * MR based Publisher.
156          */
157         protected MRSimplerBatchPublisher publisher;
158         protected Properties props;
159
160         /**
161          * MR Publisher Wrapper.
162          *
163          * @param servers messaging bus hosts
164          * @param topic topic
165          * @param username AAF or DME2 Login
166          * @param password AAF or DME2 Password
167          */
168         protected DmaapPublisherWrapper(ProtocolTypeConstants protocol, List<String> servers, String topic,
169                 String username, String password, boolean useHttps) {
170
171
172             if (StringUtils.isBlank(topic)) {
173                 throw new IllegalArgumentException("No topic for DMaaP");
174             }
175
176
177             configureProtocol(topic, protocol, servers, useHttps);
178
179             this.publisher.logTo(LoggerFactory.getLogger(MRSimplerBatchPublisher.class.getName()));
180
181             this.publisher.setUsername(username);
182             this.publisher.setPassword(password);
183
184             props = new Properties();
185
186             props.setProperty("Protocol", (useHttps ? "https" : "http"));
187             props.setProperty("contenttype", "application/json");
188             props.setProperty("username", username);
189             props.setProperty("password", password);
190
191             props.setProperty("topic", topic);
192
193             this.publisher.setProps(props);
194
195             if (protocol == ProtocolTypeConstants.AAF_AUTH) {
196                 this.publisher.setHost(servers.get(0));
197             }
198
199             logger.info("{}: CREATION: using protocol {}", this, protocol.getValue());
200         }
201
202         private void configureProtocol(String topic, ProtocolTypeConstants protocol, List<String> servers,
203                         boolean useHttps) {
204
205             if (protocol == ProtocolTypeConstants.AAF_AUTH) {
206                 if (servers == null || servers.isEmpty()) {
207                     throw new IllegalArgumentException("No DMaaP servers or DME2 partner provided");
208                 }
209
210                 ArrayList<String> dmaapServers = new ArrayList<>();
211                 String port = useHttps ? ":3905" : ":3904";
212                 for (String server : servers) {
213                     dmaapServers.add(server + port);
214                 }
215
216
217                 this.publisher = new MRSimplerBatchPublisher.Builder().againstUrls(dmaapServers).onTopic(topic).build();
218
219                 this.publisher.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue());
220
221             } else if (protocol == ProtocolTypeConstants.DME2) {
222                 ArrayList<String> dmaapServers = new ArrayList<>();
223                 dmaapServers.add("0.0.0.0:3904");
224
225                 this.publisher = new MRSimplerBatchPublisher.Builder().againstUrls(dmaapServers).onTopic(topic).build();
226
227                 this.publisher.setProtocolFlag(ProtocolTypeConstants.DME2.getValue());
228
229             } else {
230                 throw new IllegalArgumentException("Invalid DMaaP protocol " + protocol);
231             }
232         }
233
234         @Override
235         public void close() {
236             logger.info("{}: CLOSE", this);
237
238             try {
239                 this.publisher.close(1, TimeUnit.SECONDS);
240             } catch (Exception e) {
241                 logger.warn("{}: CLOSE FAILED because of {}", this, e.getMessage(), e);
242             }
243         }
244
245         @Override
246         public boolean send(String partitionId, String message) {
247             if (message == null) {
248                 throw new IllegalArgumentException("No message provided");
249             }
250
251             this.publisher.setPubResponse(new MRPublisherResponse());
252             this.publisher.send(partitionId, message);
253             MRPublisherResponse response = this.publisher.sendBatchWithResponse();
254             if (response != null) {
255                 logger.debug("DMaaP publisher received {} : {}", response.getResponseCode(),
256                         response.getResponseMessage());
257             }
258
259             return true;
260         }
261
262         @Override
263         public String toString() {
264             return "DmaapPublisherWrapper [" + "publisher.getAuthDate()=" + publisher.getAuthDate()
265                     + ", publisher.getAuthKey()=" + publisher.getAuthKey() + ", publisher.getHost()="
266                     + publisher.getHost() + ", publisher.getProtocolFlag()=" + publisher.getProtocolFlag()
267                     + ", publisher.getUsername()=" + publisher.getUsername() + "]";
268         }
269     }
270
271     /**
272      * DmaapClient library wrapper.
273      */
274     public static class DmaapAafPublisherWrapper extends DmaapPublisherWrapper {
275         /**
276          * MR based Publisher.
277          */
278         public DmaapAafPublisherWrapper(List<String> servers, String topic, String aafLogin, String aafPassword,
279                 boolean useHttps) {
280
281             super(ProtocolTypeConstants.AAF_AUTH, servers, topic, aafLogin, aafPassword, useHttps);
282         }
283     }
284
285     public static class DmaapDmePublisherWrapper extends DmaapPublisherWrapper {
286
287         /**
288          * Constructor.
289          *
290          * @param busTopicParams topic parameters
291          */
292         public DmaapDmePublisherWrapper(BusTopicParams busTopicParams) {
293
294             super(ProtocolTypeConstants.DME2, busTopicParams.getServers(), busTopicParams.getTopic(),
295                     busTopicParams.getUserName(), busTopicParams.getPassword(), busTopicParams.isUseHttps());
296
297             String dme2RouteOffer = busTopicParams.isAdditionalPropsValid()
298                             ? busTopicParams.getAdditionalProps().get(
299                                             PolicyEndPointProperties.DME2_ROUTE_OFFER_PROPERTY)
300                             : null;
301
302             validateParams(busTopicParams, dme2RouteOffer);
303
304             String serviceName = busTopicParams.getServers().get(0);
305
306             /* These are required, no defaults */
307             props.setProperty("Environment", busTopicParams.getEnvironment());
308             props.setProperty("AFT_ENVIRONMENT", busTopicParams.getAftEnvironment());
309
310             props.setProperty(PolicyEndPointProperties.DME2_SERVICE_NAME_PROPERTY, serviceName);
311
312             if (busTopicParams.getPartner() != null) {
313                 props.setProperty("Partner", busTopicParams.getPartner());
314             }
315             if (dme2RouteOffer != null) {
316                 props.setProperty(PolicyEndPointProperties.DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer);
317             }
318
319             props.setProperty("Latitude", busTopicParams.getLatitude());
320             props.setProperty("Longitude", busTopicParams.getLongitude());
321
322             // ServiceName also a default, found in additionalProps
323
324             /* These are optional, will default to these values if not set in optionalProps */
325             props.setProperty("AFT_DME2_EP_READ_TIMEOUT_MS", "50000");
326             props.setProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", "240000");
327             props.setProperty("AFT_DME2_EP_CONN_TIMEOUT", "15000");
328             props.setProperty("Version", "1.0");
329             props.setProperty("SubContextPath", "/");
330             props.setProperty("sessionstickinessrequired", "no");
331
332             /* These should not change */
333             props.setProperty("TransportType", "DME2");
334             props.setProperty("MethodType", "POST");
335
336             if (busTopicParams.isAdditionalPropsValid()) {
337                 addAdditionalProps(busTopicParams);
338             }
339
340             this.publisher.setProps(props);
341         }
342
343         private void validateParams(BusTopicParams busTopicParams, String dme2RouteOffer) {
344             if (busTopicParams.isEnvironmentInvalid()) {
345                 throw parmException(busTopicParams.getTopic(),
346                         PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX);
347             }
348             if (busTopicParams.isAftEnvironmentInvalid()) {
349                 throw parmException(busTopicParams.getTopic(),
350                         PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX);
351             }
352             if (busTopicParams.isLatitudeInvalid()) {
353                 throw parmException(busTopicParams.getTopic(),
354                         PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX);
355             }
356             if (busTopicParams.isLongitudeInvalid()) {
357                 throw parmException(busTopicParams.getTopic(),
358                         PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX);
359             }
360
361             if ((busTopicParams.isPartnerInvalid())
362                     && StringUtils.isBlank(dme2RouteOffer)) {
363                 throw new IllegalArgumentException(
364                         "Must provide at least " + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
365                                 + busTopicParams.getTopic()
366                                 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX + " or "
367                                 + PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + busTopicParams.getTopic()
368                                 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX + " for DME2");
369             }
370         }
371
372         private void addAdditionalProps(BusTopicParams busTopicParams) {
373             for (Map.Entry<String, String> entry : busTopicParams.getAdditionalProps().entrySet()) {
374                 String key = entry.getKey();
375                 String value = entry.getValue();
376
377                 if (value != null) {
378                     props.setProperty(key, value);
379                 }
380             }
381         }
382
383         private IllegalArgumentException parmException(String topic, String propnm) {
384             return new IllegalArgumentException("Missing " + PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
385                     + topic + propnm + " property for DME2 in DMaaP");
386
387         }
388     }
389 }