348100ab13ae8bedfd6e355a6a3c403231532812
[policy/common.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * Modified Copyright (C) 2018 Samsung Electronics Co., Ltd.
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  * 
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  * 
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.policy.common.endpoints.event.comm.bus.internal;
23
24 import com.att.nsa.apiClient.http.HttpClient.ConnectionType;
25 import com.att.nsa.cambria.client.CambriaBatchingPublisher;
26 import com.att.nsa.cambria.client.CambriaClientBuilders;
27 import com.att.nsa.cambria.client.CambriaClientBuilders.PublisherBuilder;
28 import com.att.nsa.mr.client.impl.MRSimplerBatchPublisher;
29 import com.att.nsa.mr.client.response.MRPublisherResponse;
30 import com.att.nsa.mr.test.clients.ProtocolTypeConstants;
31 import com.fasterxml.jackson.annotation.JsonIgnore;
32
33 import java.net.MalformedURLException;
34 import java.security.GeneralSecurityException;
35 import java.util.ArrayList;
36 import java.util.List;
37 import java.util.Map;
38 import java.util.Properties;
39 import java.util.concurrent.TimeUnit;
40
41 import org.onap.policy.common.endpoints.event.comm.bus.DmaapTopicSinkFactory;
42 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
43 import org.slf4j.Logger;
44 import org.slf4j.LoggerFactory;
45
46 public interface BusPublisher {
47
48     /**
49      * sends a message
50      * 
51      * @param partitionId id
52      * @param message the message
53      * @return true if success, false otherwise
54      * @throws IllegalArgumentException if no message provided
55      */
56     public boolean send(String partitionId, String message);
57
58     /**
59      * closes the publisher
60      */
61     public void close();
62
63     /**
64      * Cambria based library publisher
65      */
66     public static class CambriaPublisherWrapper implements BusPublisher {
67
68         private static Logger logger = LoggerFactory.getLogger(CambriaPublisherWrapper.class);
69
70         /**
71          * The actual Cambria publisher
72          */
73         @JsonIgnore
74         protected volatile CambriaBatchingPublisher publisher;
75
76         public CambriaPublisherWrapper(BusTopicParams busTopicParams) {
77
78             PublisherBuilder builder = new CambriaClientBuilders.PublisherBuilder();
79
80             builder.usingHosts(busTopicParams.getServers()).onTopic(busTopicParams.getTopic());
81
82             // Set read timeout to 30 seconds (TBD: this should be configurable)
83             builder.withSocketTimeout(30000);
84
85             if (busTopicParams.isUseHttps()) {
86                 if (busTopicParams.isAllowSelfSignedCerts()) {
87                     builder.withConnectionType(ConnectionType.HTTPS_NO_VALIDATION);
88                 } else {
89                     builder.withConnectionType(ConnectionType.HTTPS);
90                 }
91             }
92
93
94             if (busTopicParams.isApiKeyValid() && busTopicParams.isApiSecretValid()) {
95                 builder.authenticatedBy(busTopicParams.getApiKey(), busTopicParams.getApiSecret());
96             }
97
98             if (busTopicParams.isUserNameValid() && busTopicParams.isPasswordValid()) {
99                 builder.authenticatedByHttp(busTopicParams.getUserName(), busTopicParams.getPassword());
100             }
101
102             try {
103                 this.publisher = builder.build();
104             } catch (MalformedURLException | GeneralSecurityException e) {
105                 throw new IllegalArgumentException(e);
106             }
107         }
108
109         /**
110          * {@inheritDoc}
111          */
112         @Override
113         public boolean send(String partitionId, String message) {
114             if (message == null) {
115                 throw new IllegalArgumentException("No message provided");
116             }
117
118             try {
119                 this.publisher.send(partitionId, message);
120             } catch (Exception e) {
121                 logger.warn("{}: SEND of {} cannot be performed because of {}", this, message, e.getMessage(), e);
122                 return false;
123             }
124             return true;
125         }
126
127         /**
128          * {@inheritDoc}
129          */
130         @Override
131         public void close() {
132             logger.info("{}: CLOSE", this);
133
134             try {
135                 this.publisher.close();
136             } catch (Exception e) {
137                 logger.warn("{}: CLOSE FAILED because of {}", this, e.getMessage(), e);
138             }
139         }
140
141
142         @Override
143         public String toString() {
144             return "CambriaPublisherWrapper []";
145         }
146
147     }
148
149     /**
150      * DmaapClient library wrapper
151      */
152     public abstract class DmaapPublisherWrapper implements BusPublisher {
153
154         private static Logger logger = LoggerFactory.getLogger(DmaapPublisherWrapper.class);
155
156         /**
157          * MR based Publisher
158          */
159         protected MRSimplerBatchPublisher publisher;
160         protected Properties props;
161
162         /**
163          * MR Publisher Wrapper
164          *
165          * @param servers messaging bus hosts
166          * @param topic topic
167          * @param username AAF or DME2 Login
168          * @param password AAF or DME2 Password
169          */
170         public DmaapPublisherWrapper(ProtocolTypeConstants protocol, List<String> servers, String topic,
171                 String username, String password, boolean useHttps) {
172
173
174             if (topic == null || topic.isEmpty()) {
175                 throw new IllegalArgumentException("No topic for DMaaP");
176             }
177
178
179             if (protocol == ProtocolTypeConstants.AAF_AUTH) {
180                 if (servers == null || servers.isEmpty()) {
181                     throw new IllegalArgumentException("No DMaaP servers or DME2 partner provided");
182                 }
183
184                 ArrayList<String> dmaapServers = new ArrayList<>();
185                 if (useHttps) {
186                     for (String server : servers) {
187                         dmaapServers.add(server + ":3905");
188                     }
189
190                 } else {
191                     for (String server : servers) {
192                         dmaapServers.add(server + ":3904");
193                     }
194                 }
195
196
197                 this.publisher = new MRSimplerBatchPublisher.Builder().againstUrls(dmaapServers).onTopic(topic).build();
198
199                 this.publisher.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue());
200             } else if (protocol == ProtocolTypeConstants.DME2) {
201                 ArrayList<String> dmaapServers = new ArrayList<>();
202                 dmaapServers.add("0.0.0.0:3904");
203
204                 this.publisher = new MRSimplerBatchPublisher.Builder().againstUrls(dmaapServers).onTopic(topic).build();
205
206                 this.publisher.setProtocolFlag(ProtocolTypeConstants.DME2.getValue());
207             }
208
209             this.publisher.logTo(LoggerFactory.getLogger(MRSimplerBatchPublisher.class.getName()));
210
211             this.publisher.setUsername(username);
212             this.publisher.setPassword(password);
213
214             props = new Properties();
215
216             if (useHttps) {
217                 props.setProperty("Protocol", "https");
218             } else {
219                 props.setProperty("Protocol", "http");
220             }
221
222             props.setProperty("contenttype", "application/json");
223             props.setProperty("username", username);
224             props.setProperty("password", password);
225
226             props.setProperty("topic", topic);
227
228             this.publisher.setProps(props);
229
230             if (protocol == ProtocolTypeConstants.AAF_AUTH) {
231                 this.publisher.setHost(servers.get(0));
232             }
233
234             logger.info("{}: CREATION: using protocol {}", this, protocol.getValue());
235         }
236
237         /**
238          * {@inheritDoc}
239          */
240         @Override
241         public void close() {
242             logger.info("{}: CLOSE", this);
243
244             try {
245                 this.publisher.close(1, TimeUnit.SECONDS);
246             } catch (Exception e) {
247                 logger.warn("{}: CLOSE FAILED because of {}", this, e.getMessage(), e);
248             }
249         }
250
251         /**
252          * {@inheritDoc}
253          */
254         @Override
255         public boolean send(String partitionId, String message) {
256             if (message == null) {
257                 throw new IllegalArgumentException("No message provided");
258             }
259
260             this.publisher.setPubResponse(new MRPublisherResponse());
261             this.publisher.send(partitionId, message);
262             MRPublisherResponse response = this.publisher.sendBatchWithResponse();
263             if (response != null) {
264                 logger.debug("DMaaP publisher received {} : {}", response.getResponseCode(),
265                         response.getResponseMessage());
266             }
267
268             return true;
269         }
270
271         @Override
272         public String toString() {
273             return "DmaapPublisherWrapper [" + "publisher.getAuthDate()=" + publisher.getAuthDate()
274                     + ", publisher.getAuthKey()=" + publisher.getAuthKey() + ", publisher.getHost()="
275                     + publisher.getHost() + ", publisher.getProtocolFlag()=" + publisher.getProtocolFlag()
276                     + ", publisher.getUsername()=" + publisher.getUsername() + "]";
277         }
278     }
279
280     /**
281      * DmaapClient library wrapper
282      */
283     public static class DmaapAafPublisherWrapper extends DmaapPublisherWrapper {
284         /**
285          * MR based Publisher
286          */
287         public DmaapAafPublisherWrapper(List<String> servers, String topic, String aafLogin, String aafPassword,
288                 boolean useHttps) {
289
290             super(ProtocolTypeConstants.AAF_AUTH, servers, topic, aafLogin, aafPassword, useHttps);
291         }
292     }
293
294     public static class DmaapDmePublisherWrapper extends DmaapPublisherWrapper {
295         public DmaapDmePublisherWrapper(BusTopicParams busTopicParams) {
296
297             super(ProtocolTypeConstants.DME2, busTopicParams.getServers(),busTopicParams.getTopic(),
298                     busTopicParams.getUserName(),busTopicParams.getPassword(),busTopicParams.isUseHttps());
299             String dme2RouteOffer = null;
300             if (busTopicParams.isAdditionalPropsValid()) {
301                 dme2RouteOffer = busTopicParams.getAdditionalProps().get(
302                         DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY);
303             }
304
305             if (busTopicParams.isEnvironmentNullOrEmpty()) {
306                 throw parmException(busTopicParams.getTopic(),
307                         PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX);
308             }
309             if (busTopicParams.isAftEnvironmentNullOrEmpty()) {
310                 throw parmException(busTopicParams.getTopic(),
311                         PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX);
312             }
313             if (busTopicParams.isLatitudeNullOrEmpty()) {
314                 throw parmException(busTopicParams.getTopic(),
315                         PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX);
316             }
317             if (busTopicParams.isLongitudeNullOrEmpty()) {
318                 throw parmException(busTopicParams.getTopic(),
319                         PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX);
320             }
321
322             if ((busTopicParams.isPartnerNullOrEmpty())
323                     && (dme2RouteOffer == null || dme2RouteOffer.trim().isEmpty())) {
324                 throw new IllegalArgumentException(
325                         "Must provide at least " + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
326                                 + busTopicParams.getTopic()
327                                 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX + " or "
328                                 + PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + busTopicParams.getTopic()
329                                 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX + " for DME2");
330             }
331
332             String serviceName = busTopicParams.getServers().get(0);
333
334             /* These are required, no defaults */
335             props.setProperty("Environment", busTopicParams.getEnvironment());
336             props.setProperty("AFT_ENVIRONMENT", busTopicParams.getAftEnvironment());
337
338             props.setProperty(DmaapTopicSinkFactory.DME2_SERVICE_NAME_PROPERTY, serviceName);
339
340             if (busTopicParams.getPartner() != null) {
341                 props.setProperty("Partner", busTopicParams.getPartner());
342             }
343             if (dme2RouteOffer != null) {
344                 props.setProperty(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer);
345             }
346
347             props.setProperty("Latitude", busTopicParams.getLatitude());
348             props.setProperty("Longitude", busTopicParams.getLongitude());
349
350             // ServiceName also a default, found in additionalProps
351
352             /* These are optional, will default to these values if not set in optionalProps */
353             props.setProperty("AFT_DME2_EP_READ_TIMEOUT_MS", "50000");
354             props.setProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", "240000");
355             props.setProperty("AFT_DME2_EP_CONN_TIMEOUT", "15000");
356             props.setProperty("Version", "1.0");
357             props.setProperty("SubContextPath", "/");
358             props.setProperty("sessionstickinessrequired", "no");
359
360             /* These should not change */
361             props.setProperty("TransportType", "DME2");
362             props.setProperty("MethodType", "POST");
363
364             for (Map.Entry<String, String> entry : busTopicParams.getAdditionalProps().entrySet()) {
365                 String key = entry.getKey();
366                 String value = entry.getValue();
367
368                 if (value != null) {
369                     props.setProperty(key, value);
370                 }
371             }
372
373             this.publisher.setProps(props);
374         }
375
376         private IllegalArgumentException parmException(String topic, String propnm) {
377             return new IllegalArgumentException("Missing " + PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
378                     + topic + propnm + " property for DME2 in DMaaP");
379
380         }
381     }
382 }