2d1d825765543f0bce57c34cdd58ec81e670e488
[policy/common.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * Modified Copyright (C) 2018 Samsung Electronics Co., Ltd.
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  * 
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  * 
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.policy.common.endpoints.event.comm.bus.internal;
23
24 import com.att.nsa.apiClient.http.HttpClient.ConnectionType;
25 import com.att.nsa.cambria.client.CambriaBatchingPublisher;
26 import com.att.nsa.cambria.client.CambriaClientBuilders;
27 import com.att.nsa.cambria.client.CambriaClientBuilders.PublisherBuilder;
28 import com.att.nsa.mr.client.impl.MRSimplerBatchPublisher;
29 import com.att.nsa.mr.client.response.MRPublisherResponse;
30 import com.att.nsa.mr.test.clients.ProtocolTypeConstants;
31 import com.fasterxml.jackson.annotation.JsonIgnore;
32
33 import java.net.MalformedURLException;
34 import java.security.GeneralSecurityException;
35 import java.util.ArrayList;
36 import java.util.List;
37 import java.util.Map;
38 import java.util.Properties;
39 import java.util.concurrent.TimeUnit;
40
41 import org.onap.policy.common.endpoints.event.comm.bus.DmaapTopicSinkFactory;
42 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
43 import org.slf4j.Logger;
44 import org.slf4j.LoggerFactory;
45
46 public interface BusPublisher {
47
48     /**
49      * sends a message.
50      * 
51      * @param partitionId id
52      * @param message the message
53      * @return true if success, false otherwise
54      * @throws IllegalArgumentException if no message provided
55      */
56     public boolean send(String partitionId, String message);
57
58     /**
59      * closes the publisher.
60      */
61     public void close();
62
63     /**
64      * Cambria based library publisher.
65      */
66     public static class CambriaPublisherWrapper implements BusPublisher {
67
68         private static Logger logger = LoggerFactory.getLogger(CambriaPublisherWrapper.class);
69
70         /**
71          * The actual Cambria publisher.
72          */
73         @JsonIgnore
74         protected volatile CambriaBatchingPublisher publisher;
75
76         /**
77          * Constructor.
78          * 
79          * @param busTopicParams topic parameters
80          */
81         public CambriaPublisherWrapper(BusTopicParams busTopicParams) {
82
83             PublisherBuilder builder = new CambriaClientBuilders.PublisherBuilder();
84
85             builder.usingHosts(busTopicParams.getServers()).onTopic(busTopicParams.getTopic());
86
87             // Set read timeout to 30 seconds (TBD: this should be configurable)
88             builder.withSocketTimeout(30000);
89
90             if (busTopicParams.isUseHttps()) {
91                 if (busTopicParams.isAllowSelfSignedCerts()) {
92                     builder.withConnectionType(ConnectionType.HTTPS_NO_VALIDATION);
93                 } else {
94                     builder.withConnectionType(ConnectionType.HTTPS);
95                 }
96             }
97
98
99             if (busTopicParams.isApiKeyValid() && busTopicParams.isApiSecretValid()) {
100                 builder.authenticatedBy(busTopicParams.getApiKey(), busTopicParams.getApiSecret());
101             }
102
103             if (busTopicParams.isUserNameValid() && busTopicParams.isPasswordValid()) {
104                 builder.authenticatedByHttp(busTopicParams.getUserName(), busTopicParams.getPassword());
105             }
106
107             try {
108                 this.publisher = builder.build();
109             } catch (MalformedURLException | GeneralSecurityException e) {
110                 throw new IllegalArgumentException(e);
111             }
112         }
113
114         /**
115          * {@inheritDoc}
116          */
117         @Override
118         public boolean send(String partitionId, String message) {
119             if (message == null) {
120                 throw new IllegalArgumentException("No message provided");
121             }
122
123             try {
124                 this.publisher.send(partitionId, message);
125             } catch (Exception e) {
126                 logger.warn("{}: SEND of {} cannot be performed because of {}", this, message, e.getMessage(), e);
127                 return false;
128             }
129             return true;
130         }
131
132         /**
133          * {@inheritDoc}
134          */
135         @Override
136         public void close() {
137             logger.info("{}: CLOSE", this);
138
139             try {
140                 this.publisher.close();
141             } catch (Exception e) {
142                 logger.warn("{}: CLOSE FAILED because of {}", this, e.getMessage(), e);
143             }
144         }
145
146
147         @Override
148         public String toString() {
149             return "CambriaPublisherWrapper []";
150         }
151
152     }
153
154     /**
155      * DmaapClient library wrapper.
156      */
157     public abstract class DmaapPublisherWrapper implements BusPublisher {
158
159         private static Logger logger = LoggerFactory.getLogger(DmaapPublisherWrapper.class);
160
161         /**
162          * MR based Publisher.
163          */
164         protected MRSimplerBatchPublisher publisher;
165         protected Properties props;
166
167         /**
168          * MR Publisher Wrapper.
169          *
170          * @param servers messaging bus hosts
171          * @param topic topic
172          * @param username AAF or DME2 Login
173          * @param password AAF or DME2 Password
174          */
175         public DmaapPublisherWrapper(ProtocolTypeConstants protocol, List<String> servers, String topic,
176                 String username, String password, boolean useHttps) {
177
178
179             if (topic == null || topic.isEmpty()) {
180                 throw new IllegalArgumentException("No topic for DMaaP");
181             }
182
183
184             if (protocol == ProtocolTypeConstants.AAF_AUTH) {
185                 if (servers == null || servers.isEmpty()) {
186                     throw new IllegalArgumentException("No DMaaP servers or DME2 partner provided");
187                 }
188
189                 ArrayList<String> dmaapServers = new ArrayList<>();
190                 if (useHttps) {
191                     for (String server : servers) {
192                         dmaapServers.add(server + ":3905");
193                     }
194
195                 } else {
196                     for (String server : servers) {
197                         dmaapServers.add(server + ":3904");
198                     }
199                 }
200
201
202                 this.publisher = new MRSimplerBatchPublisher.Builder().againstUrls(dmaapServers).onTopic(topic).build();
203
204                 this.publisher.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue());
205             } else if (protocol == ProtocolTypeConstants.DME2) {
206                 ArrayList<String> dmaapServers = new ArrayList<>();
207                 dmaapServers.add("0.0.0.0:3904");
208
209                 this.publisher = new MRSimplerBatchPublisher.Builder().againstUrls(dmaapServers).onTopic(topic).build();
210
211                 this.publisher.setProtocolFlag(ProtocolTypeConstants.DME2.getValue());
212             }
213
214             this.publisher.logTo(LoggerFactory.getLogger(MRSimplerBatchPublisher.class.getName()));
215
216             this.publisher.setUsername(username);
217             this.publisher.setPassword(password);
218
219             props = new Properties();
220
221             if (useHttps) {
222                 props.setProperty("Protocol", "https");
223             } else {
224                 props.setProperty("Protocol", "http");
225             }
226
227             props.setProperty("contenttype", "application/json");
228             props.setProperty("username", username);
229             props.setProperty("password", password);
230
231             props.setProperty("topic", topic);
232
233             this.publisher.setProps(props);
234
235             if (protocol == ProtocolTypeConstants.AAF_AUTH) {
236                 this.publisher.setHost(servers.get(0));
237             }
238
239             logger.info("{}: CREATION: using protocol {}", this, protocol.getValue());
240         }
241
242         /**
243          * {@inheritDoc}
244          */
245         @Override
246         public void close() {
247             logger.info("{}: CLOSE", this);
248
249             try {
250                 this.publisher.close(1, TimeUnit.SECONDS);
251             } catch (Exception e) {
252                 logger.warn("{}: CLOSE FAILED because of {}", this, e.getMessage(), e);
253             }
254         }
255
256         /**
257          * {@inheritDoc}
258          */
259         @Override
260         public boolean send(String partitionId, String message) {
261             if (message == null) {
262                 throw new IllegalArgumentException("No message provided");
263             }
264
265             this.publisher.setPubResponse(new MRPublisherResponse());
266             this.publisher.send(partitionId, message);
267             MRPublisherResponse response = this.publisher.sendBatchWithResponse();
268             if (response != null) {
269                 logger.debug("DMaaP publisher received {} : {}", response.getResponseCode(),
270                         response.getResponseMessage());
271             }
272
273             return true;
274         }
275
276         @Override
277         public String toString() {
278             return "DmaapPublisherWrapper [" + "publisher.getAuthDate()=" + publisher.getAuthDate()
279                     + ", publisher.getAuthKey()=" + publisher.getAuthKey() + ", publisher.getHost()="
280                     + publisher.getHost() + ", publisher.getProtocolFlag()=" + publisher.getProtocolFlag()
281                     + ", publisher.getUsername()=" + publisher.getUsername() + "]";
282         }
283     }
284
285     /**
286      * DmaapClient library wrapper.
287      */
288     public static class DmaapAafPublisherWrapper extends DmaapPublisherWrapper {
289         /**
290          * MR based Publisher.
291          */
292         public DmaapAafPublisherWrapper(List<String> servers, String topic, String aafLogin, String aafPassword,
293                 boolean useHttps) {
294
295             super(ProtocolTypeConstants.AAF_AUTH, servers, topic, aafLogin, aafPassword, useHttps);
296         }
297     }
298
299     public static class DmaapDmePublisherWrapper extends DmaapPublisherWrapper {
300         
301         /**
302          * Constructor.
303          * 
304          * @param busTopicParams topic parameters
305          */
306         public DmaapDmePublisherWrapper(BusTopicParams busTopicParams) {
307
308             super(ProtocolTypeConstants.DME2, busTopicParams.getServers(),busTopicParams.getTopic(),
309                     busTopicParams.getUserName(),busTopicParams.getPassword(),busTopicParams.isUseHttps());
310             String dme2RouteOffer = null;
311             if (busTopicParams.isAdditionalPropsValid()) {
312                 dme2RouteOffer = busTopicParams.getAdditionalProps().get(
313                         DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY);
314             }
315
316             if (busTopicParams.isEnvironmentNullOrEmpty()) {
317                 throw parmException(busTopicParams.getTopic(),
318                         PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX);
319             }
320             if (busTopicParams.isAftEnvironmentNullOrEmpty()) {
321                 throw parmException(busTopicParams.getTopic(),
322                         PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX);
323             }
324             if (busTopicParams.isLatitudeNullOrEmpty()) {
325                 throw parmException(busTopicParams.getTopic(),
326                         PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX);
327             }
328             if (busTopicParams.isLongitudeNullOrEmpty()) {
329                 throw parmException(busTopicParams.getTopic(),
330                         PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX);
331             }
332
333             if ((busTopicParams.isPartnerNullOrEmpty())
334                     && (dme2RouteOffer == null || dme2RouteOffer.trim().isEmpty())) {
335                 throw new IllegalArgumentException(
336                         "Must provide at least " + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
337                                 + busTopicParams.getTopic()
338                                 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX + " or "
339                                 + PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + busTopicParams.getTopic()
340                                 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX + " for DME2");
341             }
342
343             String serviceName = busTopicParams.getServers().get(0);
344
345             /* These are required, no defaults */
346             props.setProperty("Environment", busTopicParams.getEnvironment());
347             props.setProperty("AFT_ENVIRONMENT", busTopicParams.getAftEnvironment());
348
349             props.setProperty(DmaapTopicSinkFactory.DME2_SERVICE_NAME_PROPERTY, serviceName);
350
351             if (busTopicParams.getPartner() != null) {
352                 props.setProperty("Partner", busTopicParams.getPartner());
353             }
354             if (dme2RouteOffer != null) {
355                 props.setProperty(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer);
356             }
357
358             props.setProperty("Latitude", busTopicParams.getLatitude());
359             props.setProperty("Longitude", busTopicParams.getLongitude());
360
361             // ServiceName also a default, found in additionalProps
362
363             /* These are optional, will default to these values if not set in optionalProps */
364             props.setProperty("AFT_DME2_EP_READ_TIMEOUT_MS", "50000");
365             props.setProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", "240000");
366             props.setProperty("AFT_DME2_EP_CONN_TIMEOUT", "15000");
367             props.setProperty("Version", "1.0");
368             props.setProperty("SubContextPath", "/");
369             props.setProperty("sessionstickinessrequired", "no");
370
371             /* These should not change */
372             props.setProperty("TransportType", "DME2");
373             props.setProperty("MethodType", "POST");
374
375             for (Map.Entry<String, String> entry : busTopicParams.getAdditionalProps().entrySet()) {
376                 String key = entry.getKey();
377                 String value = entry.getValue();
378
379                 if (value != null) {
380                     props.setProperty(key, value);
381                 }
382             }
383
384             this.publisher.setProps(props);
385         }
386
387         private IllegalArgumentException parmException(String topic, String propnm) {
388             return new IllegalArgumentException("Missing " + PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
389                     + topic + propnm + " property for DME2 in DMaaP");
390
391         }
392     }
393 }