aaf0cfb59a8ba4333e417d4a683390c8dd877f6c
[policy/common.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * Modifications Copyright (C) 2018 Samsung Electronics Co., Ltd.
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  * 
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  * 
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.policy.common.endpoints.event.comm.bus.internal;
23
24 import com.att.nsa.apiClient.http.HttpClient.ConnectionType;
25 import com.att.nsa.cambria.client.CambriaBatchingPublisher;
26 import com.att.nsa.cambria.client.CambriaClientBuilders;
27 import com.att.nsa.cambria.client.CambriaClientBuilders.PublisherBuilder;
28 import com.att.nsa.mr.client.impl.MRSimplerBatchPublisher;
29 import com.att.nsa.mr.client.response.MRPublisherResponse;
30 import com.att.nsa.mr.test.clients.ProtocolTypeConstants;
31 import com.fasterxml.jackson.annotation.JsonIgnore;
32
33 import java.net.MalformedURLException;
34 import java.security.GeneralSecurityException;
35 import java.util.ArrayList;
36 import java.util.List;
37 import java.util.Map;
38 import java.util.Properties;
39 import java.util.concurrent.TimeUnit;
40
41 import org.apache.commons.lang3.StringUtils;
42 import org.onap.policy.common.endpoints.event.comm.bus.DmaapTopicSinkFactory;
43 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
44 import org.slf4j.Logger;
45 import org.slf4j.LoggerFactory;
46
47 public interface BusPublisher {
48
49     /**
50      * sends a message.
51      * 
52      * @param partitionId id
53      * @param message the message
54      * @return true if success, false otherwise
55      * @throws IllegalArgumentException if no message provided
56      */
57     public boolean send(String partitionId, String message);
58
59     /**
60      * closes the publisher.
61      */
62     public void close();
63
64     /**
65      * Cambria based library publisher.
66      */
67     public static class CambriaPublisherWrapper implements BusPublisher {
68
69         private static Logger logger = LoggerFactory.getLogger(CambriaPublisherWrapper.class);
70
71         /**
72          * The actual Cambria publisher.
73          */
74         @JsonIgnore
75         protected volatile CambriaBatchingPublisher publisher;
76
77         /**
78          * Constructor.
79          * 
80          * @param busTopicParams topic parameters
81          */
82         public CambriaPublisherWrapper(BusTopicParams busTopicParams) {
83
84             PublisherBuilder builder = new CambriaClientBuilders.PublisherBuilder();
85
86             builder.usingHosts(busTopicParams.getServers()).onTopic(busTopicParams.getTopic());
87
88             // Set read timeout to 30 seconds (TBD: this should be configurable)
89             builder.withSocketTimeout(30000);
90
91             if (busTopicParams.isUseHttps()) {
92                 if (busTopicParams.isAllowSelfSignedCerts()) {
93                     builder.withConnectionType(ConnectionType.HTTPS_NO_VALIDATION);
94                 } else {
95                     builder.withConnectionType(ConnectionType.HTTPS);
96                 }
97             }
98
99
100             if (busTopicParams.isApiKeyValid() && busTopicParams.isApiSecretValid()) {
101                 builder.authenticatedBy(busTopicParams.getApiKey(), busTopicParams.getApiSecret());
102             }
103
104             if (busTopicParams.isUserNameValid() && busTopicParams.isPasswordValid()) {
105                 builder.authenticatedByHttp(busTopicParams.getUserName(), busTopicParams.getPassword());
106             }
107
108             try {
109                 this.publisher = builder.build();
110             } catch (MalformedURLException | GeneralSecurityException e) {
111                 throw new IllegalArgumentException(e);
112             }
113         }
114
115         /**
116          * {@inheritDoc}
117          */
118         @Override
119         public boolean send(String partitionId, String message) {
120             if (message == null) {
121                 throw new IllegalArgumentException("No message provided");
122             }
123
124             try {
125                 this.publisher.send(partitionId, message);
126             } catch (Exception e) {
127                 logger.warn("{}: SEND of {} cannot be performed because of {}", this, message, e.getMessage(), e);
128                 return false;
129             }
130             return true;
131         }
132
133         /**
134          * {@inheritDoc}
135          */
136         @Override
137         public void close() {
138             logger.info("{}: CLOSE", this);
139
140             try {
141                 this.publisher.close();
142             } catch (Exception e) {
143                 logger.warn("{}: CLOSE FAILED because of {}", this, e.getMessage(), e);
144             }
145         }
146
147
148         @Override
149         public String toString() {
150             return "CambriaPublisherWrapper []";
151         }
152
153     }
154
155     /**
156      * DmaapClient library wrapper.
157      */
158     public abstract class DmaapPublisherWrapper implements BusPublisher {
159
160         private static Logger logger = LoggerFactory.getLogger(DmaapPublisherWrapper.class);
161
162         /**
163          * MR based Publisher.
164          */
165         protected MRSimplerBatchPublisher publisher;
166         protected Properties props;
167
168         /**
169          * MR Publisher Wrapper.
170          *
171          * @param servers messaging bus hosts
172          * @param topic topic
173          * @param username AAF or DME2 Login
174          * @param password AAF or DME2 Password
175          */
176         public DmaapPublisherWrapper(ProtocolTypeConstants protocol, List<String> servers, String topic,
177                 String username, String password, boolean useHttps) {
178
179
180             if (StringUtils.isBlank(topic)) {
181                 throw new IllegalArgumentException("No topic for DMaaP");
182             }
183
184
185             if (protocol == ProtocolTypeConstants.AAF_AUTH) {
186                 if (servers == null || servers.isEmpty()) {
187                     throw new IllegalArgumentException("No DMaaP servers or DME2 partner provided");
188                 }
189
190                 ArrayList<String> dmaapServers = new ArrayList<>();
191                 if (useHttps) {
192                     for (String server : servers) {
193                         dmaapServers.add(server + ":3905");
194                     }
195
196                 } else {
197                     for (String server : servers) {
198                         dmaapServers.add(server + ":3904");
199                     }
200                 }
201
202
203                 this.publisher = new MRSimplerBatchPublisher.Builder().againstUrls(dmaapServers).onTopic(topic).build();
204
205                 this.publisher.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue());
206             } else if (protocol == ProtocolTypeConstants.DME2) {
207                 ArrayList<String> dmaapServers = new ArrayList<>();
208                 dmaapServers.add("0.0.0.0:3904");
209
210                 this.publisher = new MRSimplerBatchPublisher.Builder().againstUrls(dmaapServers).onTopic(topic).build();
211
212                 this.publisher.setProtocolFlag(ProtocolTypeConstants.DME2.getValue());
213             }
214
215             this.publisher.logTo(LoggerFactory.getLogger(MRSimplerBatchPublisher.class.getName()));
216
217             this.publisher.setUsername(username);
218             this.publisher.setPassword(password);
219
220             props = new Properties();
221
222             if (useHttps) {
223                 props.setProperty("Protocol", "https");
224             } else {
225                 props.setProperty("Protocol", "http");
226             }
227
228             props.setProperty("contenttype", "application/json");
229             props.setProperty("username", username);
230             props.setProperty("password", password);
231
232             props.setProperty("topic", topic);
233
234             this.publisher.setProps(props);
235
236             if (protocol == ProtocolTypeConstants.AAF_AUTH) {
237                 this.publisher.setHost(servers.get(0));
238             }
239
240             logger.info("{}: CREATION: using protocol {}", this, protocol.getValue());
241         }
242
243         /**
244          * {@inheritDoc}
245          */
246         @Override
247         public void close() {
248             logger.info("{}: CLOSE", this);
249
250             try {
251                 this.publisher.close(1, TimeUnit.SECONDS);
252             } catch (Exception e) {
253                 logger.warn("{}: CLOSE FAILED because of {}", this, e.getMessage(), e);
254             }
255         }
256
257         /**
258          * {@inheritDoc}
259          */
260         @Override
261         public boolean send(String partitionId, String message) {
262             if (message == null) {
263                 throw new IllegalArgumentException("No message provided");
264             }
265
266             this.publisher.setPubResponse(new MRPublisherResponse());
267             this.publisher.send(partitionId, message);
268             MRPublisherResponse response = this.publisher.sendBatchWithResponse();
269             if (response != null) {
270                 logger.debug("DMaaP publisher received {} : {}", response.getResponseCode(),
271                         response.getResponseMessage());
272             }
273
274             return true;
275         }
276
277         @Override
278         public String toString() {
279             return "DmaapPublisherWrapper [" + "publisher.getAuthDate()=" + publisher.getAuthDate()
280                     + ", publisher.getAuthKey()=" + publisher.getAuthKey() + ", publisher.getHost()="
281                     + publisher.getHost() + ", publisher.getProtocolFlag()=" + publisher.getProtocolFlag()
282                     + ", publisher.getUsername()=" + publisher.getUsername() + "]";
283         }
284     }
285
286     /**
287      * DmaapClient library wrapper.
288      */
289     public static class DmaapAafPublisherWrapper extends DmaapPublisherWrapper {
290         /**
291          * MR based Publisher.
292          */
293         public DmaapAafPublisherWrapper(List<String> servers, String topic, String aafLogin, String aafPassword,
294                 boolean useHttps) {
295
296             super(ProtocolTypeConstants.AAF_AUTH, servers, topic, aafLogin, aafPassword, useHttps);
297         }
298     }
299
300     public static class DmaapDmePublisherWrapper extends DmaapPublisherWrapper {
301         
302         /**
303          * Constructor.
304          * 
305          * @param busTopicParams topic parameters
306          */
307         public DmaapDmePublisherWrapper(BusTopicParams busTopicParams) {
308
309             super(ProtocolTypeConstants.DME2, busTopicParams.getServers(),busTopicParams.getTopic(),
310                     busTopicParams.getUserName(),busTopicParams.getPassword(),busTopicParams.isUseHttps());
311             String dme2RouteOffer = null;
312             if (busTopicParams.isAdditionalPropsValid()) {
313                 dme2RouteOffer = busTopicParams.getAdditionalProps().get(
314                         DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY);
315             }
316
317             if (busTopicParams.isEnvironmentInvalid()) {
318                 throw parmException(busTopicParams.getTopic(),
319                         PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX);
320             }
321             if (busTopicParams.isAftEnvironmentInvalid()) {
322                 throw parmException(busTopicParams.getTopic(),
323                         PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX);
324             }
325             if (busTopicParams.isLatitudeInvalid()) {
326                 throw parmException(busTopicParams.getTopic(),
327                         PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX);
328             }
329             if (busTopicParams.isLongitudeInvalid()) {
330                 throw parmException(busTopicParams.getTopic(),
331                         PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX);
332             }
333
334             if ((busTopicParams.isPartnerInvalid())
335                     && StringUtils.isBlank(dme2RouteOffer)) {
336                 throw new IllegalArgumentException(
337                         "Must provide at least " + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
338                                 + busTopicParams.getTopic()
339                                 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX + " or "
340                                 + PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + busTopicParams.getTopic()
341                                 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX + " for DME2");
342             }
343
344             String serviceName = busTopicParams.getServers().get(0);
345
346             /* These are required, no defaults */
347             props.setProperty("Environment", busTopicParams.getEnvironment());
348             props.setProperty("AFT_ENVIRONMENT", busTopicParams.getAftEnvironment());
349
350             props.setProperty(DmaapTopicSinkFactory.DME2_SERVICE_NAME_PROPERTY, serviceName);
351
352             if (busTopicParams.getPartner() != null) {
353                 props.setProperty("Partner", busTopicParams.getPartner());
354             }
355             if (dme2RouteOffer != null) {
356                 props.setProperty(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer);
357             }
358
359             props.setProperty("Latitude", busTopicParams.getLatitude());
360             props.setProperty("Longitude", busTopicParams.getLongitude());
361
362             // ServiceName also a default, found in additionalProps
363
364             /* These are optional, will default to these values if not set in optionalProps */
365             props.setProperty("AFT_DME2_EP_READ_TIMEOUT_MS", "50000");
366             props.setProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", "240000");
367             props.setProperty("AFT_DME2_EP_CONN_TIMEOUT", "15000");
368             props.setProperty("Version", "1.0");
369             props.setProperty("SubContextPath", "/");
370             props.setProperty("sessionstickinessrequired", "no");
371
372             /* These should not change */
373             props.setProperty("TransportType", "DME2");
374             props.setProperty("MethodType", "POST");
375
376             for (Map.Entry<String, String> entry : busTopicParams.getAdditionalProps().entrySet()) {
377                 String key = entry.getKey();
378                 String value = entry.getValue();
379
380                 if (value != null) {
381                     props.setProperty(key, value);
382                 }
383             }
384
385             this.publisher.setProps(props);
386         }
387
388         private IllegalArgumentException parmException(String topic, String propnm) {
389             return new IllegalArgumentException("Missing " + PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
390                     + topic + propnm + " property for DME2 in DMaaP");
391
392         }
393     }
394 }