e20fb5986281f78934e9df800e0cb32419f980f6
[policy/common.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * Modifications Copyright (C) 2018 Samsung Electronics Co., Ltd.
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.policy.common.endpoints.event.comm.bus.internal;
23
24 import com.att.nsa.cambria.client.CambriaClientBuilders;
25 import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder;
26 import com.att.nsa.cambria.client.CambriaConsumer;
27
28 import java.io.IOException;
29 import java.net.MalformedURLException;
30 import java.security.GeneralSecurityException;
31 import java.util.ArrayList;
32 import java.util.Map;
33 import java.util.Properties;
34
35 import org.apache.commons.lang3.StringUtils;
36 import org.onap.dmaap.mr.client.MRClientFactory;
37 import org.onap.dmaap.mr.client.impl.MRConsumerImpl;
38 import org.onap.dmaap.mr.client.response.MRConsumerResponse;
39 import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants;
40 import org.onap.policy.common.endpoints.event.comm.bus.DmaapTopicSinkFactory;
41 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
42 import org.slf4j.Logger;
43 import org.slf4j.LoggerFactory;
44
45 /**
46  * Wrapper around libraries to consume from message bus.
47  */
48 public interface BusConsumer {
49
50     /**
51      * fetch messages.
52      *
53      * @return list of messages
54      * @throws Exception when error encountered by underlying libraries
55      */
56     public Iterable<String> fetch() throws InterruptedException, IOException;
57
58     /**
59      * close underlying library consumer.
60      */
61     public void close();
62
63     /**
64      * BusConsumer that supports server-side filtering.
65      */
66     public interface FilterableBusConsumer extends BusConsumer {
67
68         /**
69          * Sets the server-side filter.
70          *
71          * @param filter new filter value, or {@code null}
72          * @throws IllegalArgumentException if the consumer cannot be built with the new filter
73          */
74         public void setFilter(String filter);
75     }
76
77     /**
78      * Cambria based consumer.
79      */
80     public static class CambriaConsumerWrapper implements FilterableBusConsumer {
81
82         /**
83          * logger.
84          */
85         private static Logger logger = LoggerFactory.getLogger(CambriaConsumerWrapper.class);
86
87         /**
88          * Used to build the consumer.
89          */
90         private final ConsumerBuilder builder;
91
92         /**
93          * Locked while updating {@link #consumer} and {@link #newConsumer}.
94          */
95         private final Object consLocker = new Object();
96
97         /**
98          * Cambria client.
99          */
100         private CambriaConsumer consumer;
101
102         /**
103          * Cambria client to use for next fetch.
104          */
105         private CambriaConsumer newConsumer = null;
106
107         /**
108          * fetch timeout.
109          */
110         protected int fetchTimeout;
111
112         /**
113          * close condition.
114          */
115         protected Object closeCondition = new Object();
116
117         /**
118          * Cambria Consumer Wrapper.
119          * BusTopicParam object contains the following parameters
120          * servers messaging bus hosts.
121          * topic topic
122          * apiKey API Key
123          * apiSecret API Secret
124          * consumerGroup Consumer Group
125          * consumerInstance Consumer Instance
126          * fetchTimeout Fetch Timeout
127          * fetchLimit Fetch Limit
128          *
129          * @param busTopicParams - The parameters for the bus topic
130          * @throws GeneralSecurityException - Security exception
131          * @throws MalformedURLException - Malformed URL exception
132          */
133         public CambriaConsumerWrapper(BusTopicParams busTopicParams) {
134
135             this.fetchTimeout = busTopicParams.getFetchTimeout();
136
137             this.builder = new CambriaClientBuilders.ConsumerBuilder();
138
139             builder.knownAs(busTopicParams.getConsumerGroup(), busTopicParams.getConsumerInstance())
140                     .usingHosts(busTopicParams.getServers()).onTopic(busTopicParams.getTopic())
141                     .waitAtServer(fetchTimeout).receivingAtMost(busTopicParams.getFetchLimit());
142
143             // Set read timeout to fetch timeout + 30 seconds (TBD: this should be configurable)
144             builder.withSocketTimeout(fetchTimeout + 30000);
145
146             if (busTopicParams.isUseHttps()) {
147                 builder.usingHttps();
148
149                 if (busTopicParams.isAllowSelfSignedCerts()) {
150                     builder.allowSelfSignedCertificates();
151                 }
152             }
153
154             if (busTopicParams.isApiKeyValid() && busTopicParams.isApiSecretValid()) {
155                 builder.authenticatedBy(busTopicParams.getApiKey(), busTopicParams.getApiSecret());
156             }
157
158             if (busTopicParams.isUserNameValid() && busTopicParams.isPasswordValid()) {
159                 builder.authenticatedByHttp(busTopicParams.getUserName(), busTopicParams.getPassword());
160             }
161
162             try {
163                 this.consumer = builder.build();
164             } catch (MalformedURLException | GeneralSecurityException e) {
165                 throw new IllegalArgumentException(e);
166             }
167         }
168
169         @Override
170         public Iterable<String> fetch() throws IOException, InterruptedException {
171             try {
172                 return getCurrentConsumer().fetch();
173             } catch (final IOException e) {
174                 logger.error("{}: cannot fetch because of {} - backoff for {} ms.", this, e.getMessage(),
175                         this.fetchTimeout);
176                 synchronized (this.closeCondition) {
177                     this.closeCondition.wait(this.fetchTimeout);
178                 }
179
180                 throw e;
181             }
182         }
183
184         @Override
185         public void close() {
186             synchronized (closeCondition) {
187                 closeCondition.notifyAll();
188             }
189
190             getCurrentConsumer().close();
191         }
192
193         private CambriaConsumer getCurrentConsumer() {
194             CambriaConsumer old = null;
195             CambriaConsumer ret;
196
197             synchronized (consLocker) {
198                 if (this.newConsumer != null) {
199                     // replace old consumer with new consumer
200                     old = this.consumer;
201                     this.consumer = this.newConsumer;
202                     this.newConsumer = null;
203                 }
204
205                 ret = this.consumer;
206             }
207
208             if (old != null) {
209                 old.close();
210             }
211
212             return ret;
213         }
214
215         @Override
216         public void setFilter(String filter) {
217             logger.info("{}: setting DMAAP server-side filter: {}", this, filter);
218             builder.withServerSideFilter(filter);
219
220             try {
221                 CambriaConsumer previous;
222                 synchronized (consLocker) {
223                     previous = this.newConsumer;
224                     this.newConsumer = builder.build();
225                 }
226
227                 if (previous != null) {
228                     // there was already a new consumer - close it
229                     previous.close();
230                 }
231
232             } catch (MalformedURLException | GeneralSecurityException e) {
233                 /*
234                  * Since an exception occurred, "consumer" still has its old value, thus it should
235                  * not be closed at this point.
236                  */
237                 throw new IllegalArgumentException(e);
238             }
239         }
240
241         @Override
242         public String toString() {
243             return "CambriaConsumerWrapper [fetchTimeout=" + fetchTimeout + "]";
244         }
245     }
246
247     /**
248      * MR based consumer.
249      */
250     public abstract class DmaapConsumerWrapper implements BusConsumer {
251
252         /**
253          * logger.
254          */
255         private static Logger logger = LoggerFactory.getLogger(DmaapConsumerWrapper.class);
256
257         /**
258          * Name of the "protocol" property.
259          */
260         protected static final String PROTOCOL_PROP = "Protocol";
261
262         /**
263          * fetch timeout.
264          */
265         protected int fetchTimeout;
266
267         /**
268          * close condition.
269          */
270         protected Object closeCondition = new Object();
271
272         /**
273          * MR Consumer.
274          */
275         protected MRConsumerImpl consumer;
276
277         /**
278          * MR Consumer Wrapper.
279          * 
280          * <p>servers          messaging bus hosts
281          * topic            topic
282          * apiKey           API Key
283          * apiSecret        API Secret
284          * username         AAF Login
285          * password         AAF Password
286          * consumerGroup    Consumer Group
287          * consumerInstance Consumer Instance
288          * fetchTimeout     Fetch Timeout
289          * fetchLimit       Fetch Limit
290          *
291          * @param busTopicParams contains above listed attributes
292          * @throws MalformedURLException URL should be valid
293          */
294         public DmaapConsumerWrapper(BusTopicParams busTopicParams) throws MalformedURLException {
295
296             this.fetchTimeout = busTopicParams.getFetchTimeout();
297
298             if (busTopicParams.isTopicInvalid()) {
299                 throw new IllegalArgumentException("No topic for DMaaP");
300             }
301
302             this.consumer = new MRConsumerImpl(busTopicParams.getServers(), busTopicParams.getTopic(),
303                     busTopicParams.getConsumerGroup(), busTopicParams.getConsumerInstance(),
304                     busTopicParams.getFetchTimeout(), busTopicParams.getFetchLimit(), null,
305                     busTopicParams.getApiKey(), busTopicParams.getApiSecret());
306
307             this.consumer.setUsername(busTopicParams.getUserName());
308             this.consumer.setPassword(busTopicParams.getPassword());
309         }
310
311         @Override
312         public Iterable<String> fetch() throws InterruptedException, IOException {
313             final MRConsumerResponse response = this.consumer.fetchWithReturnConsumerResponse();
314             if (response == null) {
315                 logger.warn("{}: DMaaP NULL response received", this);
316
317                 synchronized (closeCondition) {
318                     closeCondition.wait(fetchTimeout);
319                 }
320                 return new ArrayList<>();
321             } else {
322                 logger.debug("DMaaP consumer received {} : {}" + response.getResponseCode(),
323                         response.getResponseMessage());
324
325                 if (!"200".equals(response.getResponseCode())) {
326
327                     logger.error("DMaaP consumer received: {} : {}", response.getResponseCode(),
328                             response.getResponseMessage());
329
330                     synchronized (closeCondition) {
331                         closeCondition.wait(fetchTimeout);
332                     }
333
334                     /* fall through */
335                 }
336             }
337
338             if (response.getActualMessages() == null) {
339                 return new ArrayList<>();
340             } else {
341                 return response.getActualMessages();
342             }
343         }
344
345         @Override
346         public void close() {
347             synchronized (closeCondition) {
348                 closeCondition.notifyAll();
349             }
350
351             this.consumer.close();
352         }
353
354         @Override
355         public String toString() {
356             return "DmaapConsumerWrapper [" + "consumer.getAuthDate()=" + consumer.getAuthDate()
357                     + ", consumer.getAuthKey()=" + consumer.getAuthKey() + ", consumer.getHost()=" + consumer.getHost()
358                     + ", consumer.getProtocolFlag()=" + consumer.getProtocolFlag() + ", consumer.getUsername()="
359                     + consumer.getUsername() + "]";
360         }
361     }
362
363     /**
364      * MR based consumer.
365      */
366     public static class DmaapAafConsumerWrapper extends DmaapConsumerWrapper {
367
368         private static Logger logger = LoggerFactory.getLogger(DmaapAafConsumerWrapper.class);
369
370         private final Properties props;
371
372         /**
373          * BusTopicParams contain the following parameters.
374          * MR Consumer Wrapper.
375          * 
376          * <p>servers messaging bus hosts
377          * topic topic
378          * apiKey API Key
379          * apiSecret API Secret
380          * aafLogin AAF Login
381          * aafPassword AAF Password
382          * consumerGroup Consumer Group
383          * consumerInstance Consumer Instance
384          * fetchTimeout Fetch Timeout
385          * fetchLimit Fetch Limit
386          *
387          * @param busTopicParams contains above listed params
388          * @throws MalformedURLException URL should be valid
389          */
390         public DmaapAafConsumerWrapper(BusTopicParams busTopicParams) throws MalformedURLException {
391
392             super(busTopicParams);
393
394             // super constructor sets servers = {""} if empty to avoid errors when using DME2
395             if (busTopicParams.isServersInvalid()) {
396                 throw new IllegalArgumentException("Must provide at least one host for HTTP AAF");
397             }
398
399             this.consumer.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue());
400
401             props = new Properties();
402
403             if (busTopicParams.isUseHttps()) {
404                 props.setProperty(PROTOCOL_PROP, "https");
405                 this.consumer.setHost(busTopicParams.getServers().get(0) + ":3905");
406
407             } else {
408                 props.setProperty(PROTOCOL_PROP, "http");
409                 this.consumer.setHost(busTopicParams.getServers().get(0) + ":3904");
410             }
411
412             this.consumer.setProps(props);
413             logger.info("{}: CREATION", this);
414         }
415
416         @Override
417         public String toString() {
418             final MRConsumerImpl consumer = this.consumer;
419
420             return "DmaapConsumerWrapper [" + "consumer.getAuthDate()=" + consumer.getAuthDate()
421                     + ", consumer.getAuthKey()=" + consumer.getAuthKey() + ", consumer.getHost()=" + consumer.getHost()
422                     + ", consumer.getProtocolFlag()=" + consumer.getProtocolFlag() + ", consumer.getUsername()="
423                     + consumer.getUsername() + "]";
424         }
425     }
426
427     public static class DmaapDmeConsumerWrapper extends DmaapConsumerWrapper {
428
429         private static Logger logger = LoggerFactory.getLogger(DmaapDmeConsumerWrapper.class);
430
431         private final Properties props;
432
433         /**
434          * Constructor.
435          * 
436          * @param busTopicParams topic paramters
437          * 
438          * @throws MalformedURLException must provide a valid URL
439          */
440         public DmaapDmeConsumerWrapper(BusTopicParams busTopicParams) throws MalformedURLException {
441
442
443             super(busTopicParams);
444
445
446             final String dme2RouteOffer = (busTopicParams.isAdditionalPropsValid()
447                             ? busTopicParams.getAdditionalProps().get(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY)
448                             : null);
449
450             if (busTopicParams.isEnvironmentInvalid()) {
451                 throw parmException(busTopicParams.getTopic(),
452                         PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX);
453             }
454             if (busTopicParams.isAftEnvironmentInvalid()) {
455                 throw parmException(busTopicParams.getTopic(),
456                         PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX);
457             }
458             if (busTopicParams.isLatitudeInvalid()) {
459                 throw parmException(busTopicParams.getTopic(),
460                         PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX);
461             }
462             if (busTopicParams.isLongitudeInvalid()) {
463                 throw parmException(busTopicParams.getTopic(),
464                         PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX);
465             }
466
467             if ((busTopicParams.isPartnerInvalid())
468                     && StringUtils.isBlank(dme2RouteOffer)) {
469                 throw new IllegalArgumentException(
470                         "Must provide at least " + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
471                                 + "." + busTopicParams.getTopic()
472                                 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX + " or "
473                                 + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
474                                 + busTopicParams.getTopic()
475                                 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX + " for DME2");
476             }
477
478             final String serviceName = busTopicParams.getServers().get(0);
479
480             this.consumer.setProtocolFlag(ProtocolTypeConstants.DME2.getValue());
481
482             this.consumer.setUsername(busTopicParams.getUserName());
483             this.consumer.setPassword(busTopicParams.getPassword());
484
485             props = new Properties();
486
487             props.setProperty(DmaapTopicSinkFactory.DME2_SERVICE_NAME_PROPERTY, serviceName);
488
489             props.setProperty("username", busTopicParams.getUserName());
490             props.setProperty("password", busTopicParams.getPassword());
491
492             /* These are required, no defaults */
493             props.setProperty("topic", busTopicParams.getTopic());
494
495             props.setProperty("Environment", busTopicParams.getEnvironment());
496             props.setProperty("AFT_ENVIRONMENT", busTopicParams.getAftEnvironment());
497
498             if (busTopicParams.getPartner() != null) {
499                 props.setProperty("Partner", busTopicParams.getPartner());
500             }
501             if (dme2RouteOffer != null) {
502                 props.setProperty(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer);
503             }
504
505             props.setProperty("Latitude", busTopicParams.getLatitude());
506             props.setProperty("Longitude", busTopicParams.getLongitude());
507
508             /* These are optional, will default to these values if not set in additionalProps */
509             props.setProperty("AFT_DME2_EP_READ_TIMEOUT_MS", "50000");
510             props.setProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", "240000");
511             props.setProperty("AFT_DME2_EP_CONN_TIMEOUT", "15000");
512             props.setProperty("Version", "1.0");
513             props.setProperty("SubContextPath", "/");
514             props.setProperty("sessionstickinessrequired", "no");
515
516             /* These should not change */
517             props.setProperty("TransportType", "DME2");
518             props.setProperty("MethodType", "GET");
519
520             if (busTopicParams.isUseHttps()) {
521                 props.setProperty(PROTOCOL_PROP, "https");
522
523             } else {
524                 props.setProperty(PROTOCOL_PROP, "http");
525             }
526
527             props.setProperty("contenttype", "application/json");
528
529             if (busTopicParams.isAdditionalPropsValid()) {
530                 for (Map.Entry<String, String> entry : busTopicParams.getAdditionalProps().entrySet()) {
531                     props.put(entry.getKey(), entry.getValue());
532                 }
533             }
534
535             MRClientFactory.prop = props;
536             this.consumer.setProps(props);
537
538             logger.info("{}: CREATION", this);
539         }
540
541         private IllegalArgumentException parmException(String topic, String propnm) {
542             return new IllegalArgumentException("Missing " + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
543                     + topic + propnm + " property for DME2 in DMaaP");
544
545         }
546     }
547 }
548
549