1c2d6eeb0bf7af250c8751d48e23ebb81733ea89
[policy/common.git] /
/*
 * ============LICENSE_START=======================================================
 * policy-endpoints
 * ================================================================================
 * Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved.
 * Modifications Copyright (C) 2018 Samsung Electronics Co., Ltd.
 * ================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * ============LICENSE_END=========================================================
 */
21
22 package org.onap.policy.common.endpoints.event.comm.bus.internal;
23
24 import com.att.nsa.apiClient.http.HttpClient.ConnectionType;
25 import com.att.nsa.cambria.client.CambriaBatchingPublisher;
26 import com.att.nsa.cambria.client.CambriaClientBuilders;
27 import com.att.nsa.cambria.client.CambriaClientBuilders.PublisherBuilder;
28 import com.fasterxml.jackson.annotation.JsonIgnore;
29
30 import java.net.MalformedURLException;
31 import java.security.GeneralSecurityException;
32 import java.util.ArrayList;
33 import java.util.List;
34 import java.util.Map;
35 import java.util.Properties;
36 import java.util.concurrent.TimeUnit;
37
38 import org.apache.commons.lang3.StringUtils;
39 import org.onap.dmaap.mr.client.impl.MRSimplerBatchPublisher;
40 import org.onap.dmaap.mr.client.response.MRPublisherResponse;
41 import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants;
42 import org.onap.policy.common.endpoints.event.comm.bus.DmaapTopicSinkFactory;
43 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
44 import org.onap.policy.common.gson.annotation.GsonJsonIgnore;
45 import org.slf4j.Logger;
46 import org.slf4j.LoggerFactory;
47
48 public interface BusPublisher {
49
50     /**
51      * sends a message.
52      * 
53      * @param partitionId id
54      * @param message the message
55      * @return true if success, false otherwise
56      * @throws IllegalArgumentException if no message provided
57      */
58     public boolean send(String partitionId, String message);
59
60     /**
61      * closes the publisher.
62      */
63     public void close();
64
65     /**
66      * Cambria based library publisher.
67      */
68     public static class CambriaPublisherWrapper implements BusPublisher {
69
70         private static Logger logger = LoggerFactory.getLogger(CambriaPublisherWrapper.class);
71
72         /**
73          * The actual Cambria publisher.
74          */
75         @JsonIgnore
76         @GsonJsonIgnore
77         protected volatile CambriaBatchingPublisher publisher;
78
79         /**
80          * Constructor.
81          * 
82          * @param busTopicParams topic parameters
83          */
84         public CambriaPublisherWrapper(BusTopicParams busTopicParams) {
85
86             PublisherBuilder builder = new CambriaClientBuilders.PublisherBuilder();
87
88             builder.usingHosts(busTopicParams.getServers()).onTopic(busTopicParams.getTopic());
89
90             // Set read timeout to 30 seconds (TBD: this should be configurable)
91             builder.withSocketTimeout(30000);
92
93             if (busTopicParams.isUseHttps()) {
94                 if (busTopicParams.isAllowSelfSignedCerts()) {
95                     builder.withConnectionType(ConnectionType.HTTPS_NO_VALIDATION);
96                 } else {
97                     builder.withConnectionType(ConnectionType.HTTPS);
98                 }
99             }
100
101
102             if (busTopicParams.isApiKeyValid() && busTopicParams.isApiSecretValid()) {
103                 builder.authenticatedBy(busTopicParams.getApiKey(), busTopicParams.getApiSecret());
104             }
105
106             if (busTopicParams.isUserNameValid() && busTopicParams.isPasswordValid()) {
107                 builder.authenticatedByHttp(busTopicParams.getUserName(), busTopicParams.getPassword());
108             }
109
110             try {
111                 this.publisher = builder.build();
112             } catch (MalformedURLException | GeneralSecurityException e) {
113                 throw new IllegalArgumentException(e);
114             }
115         }
116
117         @Override
118         public boolean send(String partitionId, String message) {
119             if (message == null) {
120                 throw new IllegalArgumentException("No message provided");
121             }
122
123             try {
124                 this.publisher.send(partitionId, message);
125             } catch (Exception e) {
126                 logger.warn("{}: SEND of {} cannot be performed because of {}", this, message, e.getMessage(), e);
127                 return false;
128             }
129             return true;
130         }
131
132         @Override
133         public void close() {
134             logger.info("{}: CLOSE", this);
135
136             try {
137                 this.publisher.close();
138             } catch (Exception e) {
139                 logger.warn("{}: CLOSE FAILED because of {}", this, e.getMessage(), e);
140             }
141         }
142
143
144         @Override
145         public String toString() {
146             return "CambriaPublisherWrapper []";
147         }
148
149     }
150
151     /**
152      * DmaapClient library wrapper.
153      */
154     public abstract class DmaapPublisherWrapper implements BusPublisher {
155
156         private static Logger logger = LoggerFactory.getLogger(DmaapPublisherWrapper.class);
157
158         /**
159          * MR based Publisher.
160          */
161         protected MRSimplerBatchPublisher publisher;
162         protected Properties props;
163
164         /**
165          * MR Publisher Wrapper.
166          *
167          * @param servers messaging bus hosts
168          * @param topic topic
169          * @param username AAF or DME2 Login
170          * @param password AAF or DME2 Password
171          */
172         public DmaapPublisherWrapper(ProtocolTypeConstants protocol, List<String> servers, String topic,
173                 String username, String password, boolean useHttps) {
174
175
176             if (StringUtils.isBlank(topic)) {
177                 throw new IllegalArgumentException("No topic for DMaaP");
178             }
179
180
181             if (protocol == ProtocolTypeConstants.AAF_AUTH) {
182                 if (servers == null || servers.isEmpty()) {
183                     throw new IllegalArgumentException("No DMaaP servers or DME2 partner provided");
184                 }
185
186                 ArrayList<String> dmaapServers = new ArrayList<>();
187                 if (useHttps) {
188                     for (String server : servers) {
189                         dmaapServers.add(server + ":3905");
190                     }
191
192                 } else {
193                     for (String server : servers) {
194                         dmaapServers.add(server + ":3904");
195                     }
196                 }
197
198
199                 this.publisher = new MRSimplerBatchPublisher.Builder().againstUrls(dmaapServers).onTopic(topic).build();
200
201                 this.publisher.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue());
202             } else if (protocol == ProtocolTypeConstants.DME2) {
203                 ArrayList<String> dmaapServers = new ArrayList<>();
204                 dmaapServers.add("0.0.0.0:3904");
205
206                 this.publisher = new MRSimplerBatchPublisher.Builder().againstUrls(dmaapServers).onTopic(topic).build();
207
208                 this.publisher.setProtocolFlag(ProtocolTypeConstants.DME2.getValue());
209             } else {
210                 throw new IllegalArgumentException("Invalid DMaaP protocol " + protocol);
211             }
212
213             this.publisher.logTo(LoggerFactory.getLogger(MRSimplerBatchPublisher.class.getName()));
214
215             this.publisher.setUsername(username);
216             this.publisher.setPassword(password);
217
218             props = new Properties();
219
220             if (useHttps) {
221                 props.setProperty("Protocol", "https");
222             } else {
223                 props.setProperty("Protocol", "http");
224             }
225
226             props.setProperty("contenttype", "application/json");
227             props.setProperty("username", username);
228             props.setProperty("password", password);
229
230             props.setProperty("topic", topic);
231
232             this.publisher.setProps(props);
233
234             if (protocol == ProtocolTypeConstants.AAF_AUTH) {
235                 this.publisher.setHost(servers.get(0));
236             }
237
238             logger.info("{}: CREATION: using protocol {}", this, protocol.getValue());
239         }
240
241         @Override
242         public void close() {
243             logger.info("{}: CLOSE", this);
244
245             try {
246                 this.publisher.close(1, TimeUnit.SECONDS);
247             } catch (Exception e) {
248                 logger.warn("{}: CLOSE FAILED because of {}", this, e.getMessage(), e);
249             }
250         }
251
252         @Override
253         public boolean send(String partitionId, String message) {
254             if (message == null) {
255                 throw new IllegalArgumentException("No message provided");
256             }
257
258             this.publisher.setPubResponse(new MRPublisherResponse());
259             this.publisher.send(partitionId, message);
260             MRPublisherResponse response = this.publisher.sendBatchWithResponse();
261             if (response != null) {
262                 logger.debug("DMaaP publisher received {} : {}", response.getResponseCode(),
263                         response.getResponseMessage());
264             }
265
266             return true;
267         }
268
269         @Override
270         public String toString() {
271             return "DmaapPublisherWrapper [" + "publisher.getAuthDate()=" + publisher.getAuthDate()
272                     + ", publisher.getAuthKey()=" + publisher.getAuthKey() + ", publisher.getHost()="
273                     + publisher.getHost() + ", publisher.getProtocolFlag()=" + publisher.getProtocolFlag()
274                     + ", publisher.getUsername()=" + publisher.getUsername() + "]";
275         }
276     }
277
278     /**
279      * DmaapClient library wrapper.
280      */
281     public static class DmaapAafPublisherWrapper extends DmaapPublisherWrapper {
282         /**
283          * MR based Publisher.
284          */
285         public DmaapAafPublisherWrapper(List<String> servers, String topic, String aafLogin, String aafPassword,
286                 boolean useHttps) {
287
288             super(ProtocolTypeConstants.AAF_AUTH, servers, topic, aafLogin, aafPassword, useHttps);
289         }
290     }
291
292     public static class DmaapDmePublisherWrapper extends DmaapPublisherWrapper {
293         
294         /**
295          * Constructor.
296          * 
297          * @param busTopicParams topic parameters
298          */
299         public DmaapDmePublisherWrapper(BusTopicParams busTopicParams) {
300
301             super(ProtocolTypeConstants.DME2, busTopicParams.getServers(),busTopicParams.getTopic(),
302                     busTopicParams.getUserName(),busTopicParams.getPassword(),busTopicParams.isUseHttps());
303             String dme2RouteOffer = null;
304             if (busTopicParams.isAdditionalPropsValid()) {
305                 dme2RouteOffer = busTopicParams.getAdditionalProps().get(
306                         DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY);
307             }
308
309             if (busTopicParams.isEnvironmentInvalid()) {
310                 throw parmException(busTopicParams.getTopic(),
311                         PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX);
312             }
313             if (busTopicParams.isAftEnvironmentInvalid()) {
314                 throw parmException(busTopicParams.getTopic(),
315                         PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX);
316             }
317             if (busTopicParams.isLatitudeInvalid()) {
318                 throw parmException(busTopicParams.getTopic(),
319                         PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX);
320             }
321             if (busTopicParams.isLongitudeInvalid()) {
322                 throw parmException(busTopicParams.getTopic(),
323                         PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX);
324             }
325
326             if ((busTopicParams.isPartnerInvalid())
327                     && StringUtils.isBlank(dme2RouteOffer)) {
328                 throw new IllegalArgumentException(
329                         "Must provide at least " + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
330                                 + busTopicParams.getTopic()
331                                 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX + " or "
332                                 + PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + busTopicParams.getTopic()
333                                 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX + " for DME2");
334             }
335
336             String serviceName = busTopicParams.getServers().get(0);
337
338             /* These are required, no defaults */
339             props.setProperty("Environment", busTopicParams.getEnvironment());
340             props.setProperty("AFT_ENVIRONMENT", busTopicParams.getAftEnvironment());
341
342             props.setProperty(DmaapTopicSinkFactory.DME2_SERVICE_NAME_PROPERTY, serviceName);
343
344             if (busTopicParams.getPartner() != null) {
345                 props.setProperty("Partner", busTopicParams.getPartner());
346             }
347             if (dme2RouteOffer != null) {
348                 props.setProperty(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer);
349             }
350
351             props.setProperty("Latitude", busTopicParams.getLatitude());
352             props.setProperty("Longitude", busTopicParams.getLongitude());
353
354             // ServiceName also a default, found in additionalProps
355
356             /* These are optional, will default to these values if not set in optionalProps */
357             props.setProperty("AFT_DME2_EP_READ_TIMEOUT_MS", "50000");
358             props.setProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", "240000");
359             props.setProperty("AFT_DME2_EP_CONN_TIMEOUT", "15000");
360             props.setProperty("Version", "1.0");
361             props.setProperty("SubContextPath", "/");
362             props.setProperty("sessionstickinessrequired", "no");
363
364             /* These should not change */
365             props.setProperty("TransportType", "DME2");
366             props.setProperty("MethodType", "POST");
367
368             if (busTopicParams.isAdditionalPropsValid()) {
369                 for (Map.Entry<String, String> entry : busTopicParams.getAdditionalProps().entrySet()) {
370                     String key = entry.getKey();
371                     String value = entry.getValue();
372
373                     if (value != null) {
374                         props.setProperty(key, value);
375                     }
376                 }
377             }
378
379             this.publisher.setProps(props);
380         }
381
382         private IllegalArgumentException parmException(String topic, String propnm) {
383             return new IllegalArgumentException("Missing " + PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
384                     + topic + propnm + " property for DME2 in DMaaP");
385
386         }
387     }
388 }