c2b590a0ca4bf05fdb2e8eff9aa408ba324041d0
[policy/common.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved.
6  * Modifications Copyright (C) 2018 Samsung Electronics Co., Ltd.
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.policy.common.endpoints.event.comm.bus.internal;
23
24 import com.att.nsa.apiClient.http.HttpClient.ConnectionType;
25 import com.att.nsa.cambria.client.CambriaBatchingPublisher;
26 import com.att.nsa.cambria.client.CambriaClientBuilders;
27 import com.att.nsa.cambria.client.CambriaClientBuilders.PublisherBuilder;
28 import com.fasterxml.jackson.annotation.JsonIgnore;
29 import java.net.MalformedURLException;
30 import java.security.GeneralSecurityException;
31 import java.util.ArrayList;
32 import java.util.List;
33 import java.util.Map;
34 import java.util.Properties;
35 import java.util.concurrent.TimeUnit;
36 import org.apache.commons.lang3.StringUtils;
37 import org.onap.dmaap.mr.client.impl.MRSimplerBatchPublisher;
38 import org.onap.dmaap.mr.client.response.MRPublisherResponse;
39 import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants;
40 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
41 import org.onap.policy.common.gson.annotation.GsonJsonIgnore;
42 import org.slf4j.Logger;
43 import org.slf4j.LoggerFactory;
44
45 public interface BusPublisher {
46
47     /**
48      * sends a message.
49      *
50      * @param partitionId id
51      * @param message the message
52      * @return true if success, false otherwise
53      * @throws IllegalArgumentException if no message provided
54      */
55     public boolean send(String partitionId, String message);
56
57     /**
58      * closes the publisher.
59      */
60     public void close();
61
62     /**
63      * Cambria based library publisher.
64      */
65     public static class CambriaPublisherWrapper implements BusPublisher {
66
67         private static Logger logger = LoggerFactory.getLogger(CambriaPublisherWrapper.class);
68
69         /**
70          * The actual Cambria publisher.
71          */
72         @JsonIgnore
73         @GsonJsonIgnore
74         protected CambriaBatchingPublisher publisher;
75
76         /**
77          * Constructor.
78          *
79          * @param busTopicParams topic parameters
80          */
81         public CambriaPublisherWrapper(BusTopicParams busTopicParams) {
82
83             PublisherBuilder builder = new CambriaClientBuilders.PublisherBuilder();
84
85             builder.usingHosts(busTopicParams.getServers()).onTopic(busTopicParams.getTopic());
86
87             // Set read timeout to 30 seconds (TBD: this should be configurable)
88             builder.withSocketTimeout(30000);
89
90             if (busTopicParams.isUseHttps()) {
91                 if (busTopicParams.isAllowSelfSignedCerts()) {
92                     builder.withConnectionType(ConnectionType.HTTPS_NO_VALIDATION);
93                 } else {
94                     builder.withConnectionType(ConnectionType.HTTPS);
95                 }
96             }
97
98
99             if (busTopicParams.isApiKeyValid() && busTopicParams.isApiSecretValid()) {
100                 builder.authenticatedBy(busTopicParams.getApiKey(), busTopicParams.getApiSecret());
101             }
102
103             if (busTopicParams.isUserNameValid() && busTopicParams.isPasswordValid()) {
104                 builder.authenticatedByHttp(busTopicParams.getUserName(), busTopicParams.getPassword());
105             }
106
107             try {
108                 this.publisher = builder.build();
109             } catch (MalformedURLException | GeneralSecurityException e) {
110                 throw new IllegalArgumentException(e);
111             }
112         }
113
114         @Override
115         public boolean send(String partitionId, String message) {
116             if (message == null) {
117                 throw new IllegalArgumentException("No message provided");
118             }
119
120             try {
121                 this.publisher.send(partitionId, message);
122             } catch (Exception e) {
123                 logger.warn("{}: SEND of {} cannot be performed because of {}", this, message, e.getMessage(), e);
124                 return false;
125             }
126             return true;
127         }
128
129         @Override
130         public void close() {
131             logger.info("{}: CLOSE", this);
132
133             try {
134                 this.publisher.close();
135             } catch (Exception e) {
136                 logger.warn("{}: CLOSE FAILED because of {}", this, e.getMessage(), e);
137             }
138         }
139
140
141         @Override
142         public String toString() {
143             return "CambriaPublisherWrapper []";
144         }
145
146     }
147
148     /**
149      * DmaapClient library wrapper.
150      */
151     public abstract class DmaapPublisherWrapper implements BusPublisher {
152
153         private static Logger logger = LoggerFactory.getLogger(DmaapPublisherWrapper.class);
154
155         /**
156          * MR based Publisher.
157          */
158         protected MRSimplerBatchPublisher publisher;
159         protected Properties props;
160
161         /**
162          * MR Publisher Wrapper.
163          *
164          * @param servers messaging bus hosts
165          * @param topic topic
166          * @param username AAF or DME2 Login
167          * @param password AAF or DME2 Password
168          */
169         public DmaapPublisherWrapper(ProtocolTypeConstants protocol, List<String> servers, String topic,
170                 String username, String password, boolean useHttps) {
171
172
173             if (StringUtils.isBlank(topic)) {
174                 throw new IllegalArgumentException("No topic for DMaaP");
175             }
176
177
178             configureProtocol(topic, protocol, servers, useHttps);
179
180             this.publisher.logTo(LoggerFactory.getLogger(MRSimplerBatchPublisher.class.getName()));
181
182             this.publisher.setUsername(username);
183             this.publisher.setPassword(password);
184
185             props = new Properties();
186
187             props.setProperty("Protocol", (useHttps ? "https" : "http"));
188             props.setProperty("contenttype", "application/json");
189             props.setProperty("username", username);
190             props.setProperty("password", password);
191
192             props.setProperty("topic", topic);
193
194             this.publisher.setProps(props);
195
196             if (protocol == ProtocolTypeConstants.AAF_AUTH) {
197                 this.publisher.setHost(servers.get(0));
198             }
199
200             logger.info("{}: CREATION: using protocol {}", this, protocol.getValue());
201         }
202
203         private void configureProtocol(String topic, ProtocolTypeConstants protocol, List<String> servers,
204                         boolean useHttps) {
205
206             if (protocol == ProtocolTypeConstants.AAF_AUTH) {
207                 if (servers == null || servers.isEmpty()) {
208                     throw new IllegalArgumentException("No DMaaP servers or DME2 partner provided");
209                 }
210
211                 ArrayList<String> dmaapServers = new ArrayList<>();
212                 String port = useHttps ? ":3905" : ":3904";
213                 for (String server : servers) {
214                     dmaapServers.add(server + port);
215                 }
216
217
218                 this.publisher = new MRSimplerBatchPublisher.Builder().againstUrls(dmaapServers).onTopic(topic).build();
219
220                 this.publisher.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue());
221
222             } else if (protocol == ProtocolTypeConstants.DME2) {
223                 ArrayList<String> dmaapServers = new ArrayList<>();
224                 dmaapServers.add("0.0.0.0:3904");
225
226                 this.publisher = new MRSimplerBatchPublisher.Builder().againstUrls(dmaapServers).onTopic(topic).build();
227
228                 this.publisher.setProtocolFlag(ProtocolTypeConstants.DME2.getValue());
229
230             } else {
231                 throw new IllegalArgumentException("Invalid DMaaP protocol " + protocol);
232             }
233         }
234
235         @Override
236         public void close() {
237             logger.info("{}: CLOSE", this);
238
239             try {
240                 this.publisher.close(1, TimeUnit.SECONDS);
241             } catch (Exception e) {
242                 logger.warn("{}: CLOSE FAILED because of {}", this, e.getMessage(), e);
243             }
244         }
245
246         @Override
247         public boolean send(String partitionId, String message) {
248             if (message == null) {
249                 throw new IllegalArgumentException("No message provided");
250             }
251
252             this.publisher.setPubResponse(new MRPublisherResponse());
253             this.publisher.send(partitionId, message);
254             MRPublisherResponse response = this.publisher.sendBatchWithResponse();
255             if (response != null) {
256                 logger.debug("DMaaP publisher received {} : {}", response.getResponseCode(),
257                         response.getResponseMessage());
258             }
259
260             return true;
261         }
262
263         @Override
264         public String toString() {
265             return "DmaapPublisherWrapper [" + "publisher.getAuthDate()=" + publisher.getAuthDate()
266                     + ", publisher.getAuthKey()=" + publisher.getAuthKey() + ", publisher.getHost()="
267                     + publisher.getHost() + ", publisher.getProtocolFlag()=" + publisher.getProtocolFlag()
268                     + ", publisher.getUsername()=" + publisher.getUsername() + "]";
269         }
270     }
271
272     /**
273      * DmaapClient library wrapper.
274      */
275     public static class DmaapAafPublisherWrapper extends DmaapPublisherWrapper {
276         /**
277          * MR based Publisher.
278          */
279         public DmaapAafPublisherWrapper(List<String> servers, String topic, String aafLogin, String aafPassword,
280                 boolean useHttps) {
281
282             super(ProtocolTypeConstants.AAF_AUTH, servers, topic, aafLogin, aafPassword, useHttps);
283         }
284     }
285
286     public static class DmaapDmePublisherWrapper extends DmaapPublisherWrapper {
287
288         /**
289          * Constructor.
290          *
291          * @param busTopicParams topic parameters
292          */
293         public DmaapDmePublisherWrapper(BusTopicParams busTopicParams) {
294
295             super(ProtocolTypeConstants.DME2, busTopicParams.getServers(), busTopicParams.getTopic(),
296                     busTopicParams.getUserName(), busTopicParams.getPassword(), busTopicParams.isUseHttps());
297
298             String dme2RouteOffer = busTopicParams.isAdditionalPropsValid()
299                             ? busTopicParams.getAdditionalProps().get(
300                                             PolicyEndPointProperties.DME2_ROUTE_OFFER_PROPERTY)
301                             : null;
302
303             validateParams(busTopicParams, dme2RouteOffer);
304
305             String serviceName = busTopicParams.getServers().get(0);
306
307             /* These are required, no defaults */
308             props.setProperty("Environment", busTopicParams.getEnvironment());
309             props.setProperty("AFT_ENVIRONMENT", busTopicParams.getAftEnvironment());
310
311             props.setProperty(PolicyEndPointProperties.DME2_SERVICE_NAME_PROPERTY, serviceName);
312
313             if (busTopicParams.getPartner() != null) {
314                 props.setProperty("Partner", busTopicParams.getPartner());
315             }
316             if (dme2RouteOffer != null) {
317                 props.setProperty(PolicyEndPointProperties.DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer);
318             }
319
320             props.setProperty("Latitude", busTopicParams.getLatitude());
321             props.setProperty("Longitude", busTopicParams.getLongitude());
322
323             // ServiceName also a default, found in additionalProps
324
325             /* These are optional, will default to these values if not set in optionalProps */
326             props.setProperty("AFT_DME2_EP_READ_TIMEOUT_MS", "50000");
327             props.setProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", "240000");
328             props.setProperty("AFT_DME2_EP_CONN_TIMEOUT", "15000");
329             props.setProperty("Version", "1.0");
330             props.setProperty("SubContextPath", "/");
331             props.setProperty("sessionstickinessrequired", "no");
332
333             /* These should not change */
334             props.setProperty("TransportType", "DME2");
335             props.setProperty("MethodType", "POST");
336
337             if (busTopicParams.isAdditionalPropsValid()) {
338                 addAdditionalProps(busTopicParams);
339             }
340
341             this.publisher.setProps(props);
342         }
343
344         private void validateParams(BusTopicParams busTopicParams, String dme2RouteOffer) {
345             if (busTopicParams.isEnvironmentInvalid()) {
346                 throw parmException(busTopicParams.getTopic(),
347                         PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX);
348             }
349             if (busTopicParams.isAftEnvironmentInvalid()) {
350                 throw parmException(busTopicParams.getTopic(),
351                         PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX);
352             }
353             if (busTopicParams.isLatitudeInvalid()) {
354                 throw parmException(busTopicParams.getTopic(),
355                         PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX);
356             }
357             if (busTopicParams.isLongitudeInvalid()) {
358                 throw parmException(busTopicParams.getTopic(),
359                         PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX);
360             }
361
362             if ((busTopicParams.isPartnerInvalid())
363                     && StringUtils.isBlank(dme2RouteOffer)) {
364                 throw new IllegalArgumentException(
365                         "Must provide at least " + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
366                                 + busTopicParams.getTopic()
367                                 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX + " or "
368                                 + PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + busTopicParams.getTopic()
369                                 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX + " for DME2");
370             }
371         }
372
373         private void addAdditionalProps(BusTopicParams busTopicParams) {
374             for (Map.Entry<String, String> entry : busTopicParams.getAdditionalProps().entrySet()) {
375                 String key = entry.getKey();
376                 String value = entry.getValue();
377
378                 if (value != null) {
379                     props.setProperty(key, value);
380                 }
381             }
382         }
383
384         private IllegalArgumentException parmException(String topic, String propnm) {
385             return new IllegalArgumentException("Missing " + PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
386                     + topic + propnm + " property for DME2 in DMaaP");
387
388         }
389     }
390 }