67adf3b4abeed4aa050783f867f16ace9bb4a8f0
[policy/common.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved.
6  * Modifications Copyright (C) 2018 Samsung Electronics Co., Ltd.
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.policy.common.endpoints.event.comm.bus.internal;
23
24 import com.att.nsa.apiClient.http.HttpClient.ConnectionType;
25 import com.att.nsa.cambria.client.CambriaBatchingPublisher;
26 import com.att.nsa.cambria.client.CambriaClientBuilders;
27 import com.att.nsa.cambria.client.CambriaClientBuilders.PublisherBuilder;
28 import com.fasterxml.jackson.annotation.JsonIgnore;
29
30 import java.net.MalformedURLException;
31 import java.security.GeneralSecurityException;
32 import java.util.ArrayList;
33 import java.util.List;
34 import java.util.Map;
35 import java.util.Properties;
36 import java.util.concurrent.TimeUnit;
37
38 import org.apache.commons.lang3.StringUtils;
39 import org.onap.dmaap.mr.client.impl.MRSimplerBatchPublisher;
40 import org.onap.dmaap.mr.client.response.MRPublisherResponse;
41 import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants;
42 import org.onap.policy.common.endpoints.event.comm.bus.DmaapTopicSinkFactory;
43 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
44 import org.onap.policy.common.gson.annotation.GsonJsonIgnore;
45 import org.slf4j.Logger;
46 import org.slf4j.LoggerFactory;
47
48 public interface BusPublisher {
49
50     /**
51      * sends a message.
52      *
53      * @param partitionId id
54      * @param message the message
55      * @return true if success, false otherwise
56      * @throws IllegalArgumentException if no message provided
57      */
58     public boolean send(String partitionId, String message);
59
60     /**
61      * closes the publisher.
62      */
63     public void close();
64
65     /**
66      * Cambria based library publisher.
67      */
68     public static class CambriaPublisherWrapper implements BusPublisher {
69
70         private static Logger logger = LoggerFactory.getLogger(CambriaPublisherWrapper.class);
71
72         /**
73          * The actual Cambria publisher.
74          */
75         @JsonIgnore
76         @GsonJsonIgnore
77         protected CambriaBatchingPublisher publisher;
78
79         /**
80          * Constructor.
81          *
82          * @param busTopicParams topic parameters
83          */
84         public CambriaPublisherWrapper(BusTopicParams busTopicParams) {
85
86             PublisherBuilder builder = new CambriaClientBuilders.PublisherBuilder();
87
88             builder.usingHosts(busTopicParams.getServers()).onTopic(busTopicParams.getTopic());
89
90             // Set read timeout to 30 seconds (TBD: this should be configurable)
91             builder.withSocketTimeout(30000);
92
93             if (busTopicParams.isUseHttps()) {
94                 if (busTopicParams.isAllowSelfSignedCerts()) {
95                     builder.withConnectionType(ConnectionType.HTTPS_NO_VALIDATION);
96                 } else {
97                     builder.withConnectionType(ConnectionType.HTTPS);
98                 }
99             }
100
101
102             if (busTopicParams.isApiKeyValid() && busTopicParams.isApiSecretValid()) {
103                 builder.authenticatedBy(busTopicParams.getApiKey(), busTopicParams.getApiSecret());
104             }
105
106             if (busTopicParams.isUserNameValid() && busTopicParams.isPasswordValid()) {
107                 builder.authenticatedByHttp(busTopicParams.getUserName(), busTopicParams.getPassword());
108             }
109
110             try {
111                 this.publisher = builder.build();
112             } catch (MalformedURLException | GeneralSecurityException e) {
113                 throw new IllegalArgumentException(e);
114             }
115         }
116
117         @Override
118         public boolean send(String partitionId, String message) {
119             if (message == null) {
120                 throw new IllegalArgumentException("No message provided");
121             }
122
123             try {
124                 this.publisher.send(partitionId, message);
125             } catch (Exception e) {
126                 logger.warn("{}: SEND of {} cannot be performed because of {}", this, message, e.getMessage(), e);
127                 return false;
128             }
129             return true;
130         }
131
132         @Override
133         public void close() {
134             logger.info("{}: CLOSE", this);
135
136             try {
137                 this.publisher.close();
138             } catch (Exception e) {
139                 logger.warn("{}: CLOSE FAILED because of {}", this, e.getMessage(), e);
140             }
141         }
142
143
144         @Override
145         public String toString() {
146             return "CambriaPublisherWrapper []";
147         }
148
149     }
150
151     /**
152      * DmaapClient library wrapper.
153      */
154     public abstract class DmaapPublisherWrapper implements BusPublisher {
155
156         private static Logger logger = LoggerFactory.getLogger(DmaapPublisherWrapper.class);
157
158         /**
159          * MR based Publisher.
160          */
161         protected MRSimplerBatchPublisher publisher;
162         protected Properties props;
163
164         /**
165          * MR Publisher Wrapper.
166          *
167          * @param servers messaging bus hosts
168          * @param topic topic
169          * @param username AAF or DME2 Login
170          * @param password AAF or DME2 Password
171          */
172         public DmaapPublisherWrapper(ProtocolTypeConstants protocol, List<String> servers, String topic,
173                 String username, String password, boolean useHttps) {
174
175
176             if (StringUtils.isBlank(topic)) {
177                 throw new IllegalArgumentException("No topic for DMaaP");
178             }
179
180
181             configureProtocol(topic, protocol, servers, useHttps);
182
183             this.publisher.logTo(LoggerFactory.getLogger(MRSimplerBatchPublisher.class.getName()));
184
185             this.publisher.setUsername(username);
186             this.publisher.setPassword(password);
187
188             props = new Properties();
189
190             props.setProperty("Protocol", (useHttps ? "https" : "http"));
191             props.setProperty("contenttype", "application/json");
192             props.setProperty("username", username);
193             props.setProperty("password", password);
194
195             props.setProperty("topic", topic);
196
197             this.publisher.setProps(props);
198
199             if (protocol == ProtocolTypeConstants.AAF_AUTH) {
200                 this.publisher.setHost(servers.get(0));
201             }
202
203             logger.info("{}: CREATION: using protocol {}", this, protocol.getValue());
204         }
205
206         private void configureProtocol(String topic, ProtocolTypeConstants protocol, List<String> servers,
207                         boolean useHttps) {
208
209             if (protocol == ProtocolTypeConstants.AAF_AUTH) {
210                 if (servers == null || servers.isEmpty()) {
211                     throw new IllegalArgumentException("No DMaaP servers or DME2 partner provided");
212                 }
213
214                 ArrayList<String> dmaapServers = new ArrayList<>();
215                 String port = useHttps ? ":3905" : ":3904";
216                 for (String server : servers) {
217                     dmaapServers.add(server + port);
218                 }
219
220
221                 this.publisher = new MRSimplerBatchPublisher.Builder().againstUrls(dmaapServers).onTopic(topic).build();
222
223                 this.publisher.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue());
224
225             } else if (protocol == ProtocolTypeConstants.DME2) {
226                 ArrayList<String> dmaapServers = new ArrayList<>();
227                 dmaapServers.add("0.0.0.0:3904");
228
229                 this.publisher = new MRSimplerBatchPublisher.Builder().againstUrls(dmaapServers).onTopic(topic).build();
230
231                 this.publisher.setProtocolFlag(ProtocolTypeConstants.DME2.getValue());
232
233             } else {
234                 throw new IllegalArgumentException("Invalid DMaaP protocol " + protocol);
235             }
236         }
237
238         @Override
239         public void close() {
240             logger.info("{}: CLOSE", this);
241
242             try {
243                 this.publisher.close(1, TimeUnit.SECONDS);
244             } catch (Exception e) {
245                 logger.warn("{}: CLOSE FAILED because of {}", this, e.getMessage(), e);
246             }
247         }
248
249         @Override
250         public boolean send(String partitionId, String message) {
251             if (message == null) {
252                 throw new IllegalArgumentException("No message provided");
253             }
254
255             this.publisher.setPubResponse(new MRPublisherResponse());
256             this.publisher.send(partitionId, message);
257             MRPublisherResponse response = this.publisher.sendBatchWithResponse();
258             if (response != null) {
259                 logger.debug("DMaaP publisher received {} : {}", response.getResponseCode(),
260                         response.getResponseMessage());
261             }
262
263             return true;
264         }
265
266         @Override
267         public String toString() {
268             return "DmaapPublisherWrapper [" + "publisher.getAuthDate()=" + publisher.getAuthDate()
269                     + ", publisher.getAuthKey()=" + publisher.getAuthKey() + ", publisher.getHost()="
270                     + publisher.getHost() + ", publisher.getProtocolFlag()=" + publisher.getProtocolFlag()
271                     + ", publisher.getUsername()=" + publisher.getUsername() + "]";
272         }
273     }
274
275     /**
276      * DmaapClient library wrapper.
277      */
278     public static class DmaapAafPublisherWrapper extends DmaapPublisherWrapper {
279         /**
280          * MR based Publisher.
281          */
282         public DmaapAafPublisherWrapper(List<String> servers, String topic, String aafLogin, String aafPassword,
283                 boolean useHttps) {
284
285             super(ProtocolTypeConstants.AAF_AUTH, servers, topic, aafLogin, aafPassword, useHttps);
286         }
287     }
288
289     public static class DmaapDmePublisherWrapper extends DmaapPublisherWrapper {
290
291         /**
292          * Constructor.
293          *
294          * @param busTopicParams topic parameters
295          */
296         public DmaapDmePublisherWrapper(BusTopicParams busTopicParams) {
297
298             super(ProtocolTypeConstants.DME2, busTopicParams.getServers(),busTopicParams.getTopic(),
299                     busTopicParams.getUserName(),busTopicParams.getPassword(),busTopicParams.isUseHttps());
300
301             String dme2RouteOffer = busTopicParams.isAdditionalPropsValid()
302                             ? busTopicParams.getAdditionalProps().get(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY)
303                             : null;
304
305             validateParams(busTopicParams, dme2RouteOffer);
306
307             String serviceName = busTopicParams.getServers().get(0);
308
309             /* These are required, no defaults */
310             props.setProperty("Environment", busTopicParams.getEnvironment());
311             props.setProperty("AFT_ENVIRONMENT", busTopicParams.getAftEnvironment());
312
313             props.setProperty(DmaapTopicSinkFactory.DME2_SERVICE_NAME_PROPERTY, serviceName);
314
315             if (busTopicParams.getPartner() != null) {
316                 props.setProperty("Partner", busTopicParams.getPartner());
317             }
318             if (dme2RouteOffer != null) {
319                 props.setProperty(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer);
320             }
321
322             props.setProperty("Latitude", busTopicParams.getLatitude());
323             props.setProperty("Longitude", busTopicParams.getLongitude());
324
325             // ServiceName also a default, found in additionalProps
326
327             /* These are optional, will default to these values if not set in optionalProps */
328             props.setProperty("AFT_DME2_EP_READ_TIMEOUT_MS", "50000");
329             props.setProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", "240000");
330             props.setProperty("AFT_DME2_EP_CONN_TIMEOUT", "15000");
331             props.setProperty("Version", "1.0");
332             props.setProperty("SubContextPath", "/");
333             props.setProperty("sessionstickinessrequired", "no");
334
335             /* These should not change */
336             props.setProperty("TransportType", "DME2");
337             props.setProperty("MethodType", "POST");
338
339             if (busTopicParams.isAdditionalPropsValid()) {
340                 addAdditionalProps(busTopicParams);
341             }
342
343             this.publisher.setProps(props);
344         }
345
346         private void validateParams(BusTopicParams busTopicParams, String dme2RouteOffer) {
347             if (busTopicParams.isEnvironmentInvalid()) {
348                 throw parmException(busTopicParams.getTopic(),
349                         PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX);
350             }
351             if (busTopicParams.isAftEnvironmentInvalid()) {
352                 throw parmException(busTopicParams.getTopic(),
353                         PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX);
354             }
355             if (busTopicParams.isLatitudeInvalid()) {
356                 throw parmException(busTopicParams.getTopic(),
357                         PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX);
358             }
359             if (busTopicParams.isLongitudeInvalid()) {
360                 throw parmException(busTopicParams.getTopic(),
361                         PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX);
362             }
363
364             if ((busTopicParams.isPartnerInvalid())
365                     && StringUtils.isBlank(dme2RouteOffer)) {
366                 throw new IllegalArgumentException(
367                         "Must provide at least " + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
368                                 + busTopicParams.getTopic()
369                                 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX + " or "
370                                 + PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + busTopicParams.getTopic()
371                                 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX + " for DME2");
372             }
373         }
374
375         private void addAdditionalProps(BusTopicParams busTopicParams) {
376             for (Map.Entry<String, String> entry : busTopicParams.getAdditionalProps().entrySet()) {
377                 String key = entry.getKey();
378                 String value = entry.getValue();
379
380                 if (value != null) {
381                     props.setProperty(key, value);
382                 }
383             }
384         }
385
386         private IllegalArgumentException parmException(String topic, String propnm) {
387             return new IllegalArgumentException("Missing " + PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
388                     + topic + propnm + " property for DME2 in DMaaP");
389
390         }
391     }
392 }