b66b4ba5999c0125219bbc04d56c52e2b769e43e
[policy/common.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved.
6  * Modifications Copyright (C) 2018 Samsung Electronics Co., Ltd.
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.policy.common.endpoints.event.comm.bus.internal;
23
24 import com.att.nsa.cambria.client.CambriaClientBuilders;
25 import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder;
26 import com.att.nsa.cambria.client.CambriaConsumer;
27 import java.io.IOException;
28 import java.net.MalformedURLException;
29 import java.security.GeneralSecurityException;
30 import java.util.ArrayList;
31 import java.util.Map;
32 import java.util.Properties;
33 import java.util.concurrent.CountDownLatch;
34 import java.util.concurrent.TimeUnit;
35 import org.apache.commons.lang3.StringUtils;
36 import org.onap.dmaap.mr.client.MRClientFactory;
37 import org.onap.dmaap.mr.client.impl.MRConsumerImpl;
38 import org.onap.dmaap.mr.client.response.MRConsumerResponse;
39 import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants;
40 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
41 import org.slf4j.Logger;
42 import org.slf4j.LoggerFactory;
43
44 /**
45  * Wrapper around libraries to consume from message bus.
46  */
47 public interface BusConsumer {
48
49     /**
50      * fetch messages.
51      *
52      * @return list of messages
53      * @throws IOException when error encountered by underlying libraries
54      */
55     public Iterable<String> fetch() throws IOException;
56
57     /**
58      * close underlying library consumer.
59      */
60     public void close();
61
62     /**
63      * BusConsumer that supports server-side filtering.
64      */
65     public interface FilterableBusConsumer extends BusConsumer {
66
67         /**
68          * Sets the server-side filter.
69          *
70          * @param filter new filter value, or {@code null}
71          * @throws IllegalArgumentException if the consumer cannot be built with the new filter
72          */
73         public void setFilter(String filter);
74     }
75
76     /**
77      * Cambria based consumer.
78      */
79     public static class CambriaConsumerWrapper implements FilterableBusConsumer {
80
81         /**
82          * logger.
83          */
84         private static Logger logger = LoggerFactory.getLogger(CambriaConsumerWrapper.class);
85
86         /**
87          * Used to build the consumer.
88          */
89         private final ConsumerBuilder builder;
90
91         /**
92          * Locked while updating {@link #consumer} and {@link #newConsumer}.
93          */
94         private final Object consLocker = new Object();
95
96         /**
97          * Cambria client.
98          */
99         private CambriaConsumer consumer;
100
101         /**
102          * Cambria client to use for next fetch.
103          */
104         private CambriaConsumer newConsumer = null;
105
106         /**
107          * fetch timeout.
108          */
109         protected int fetchTimeout;
110
111         /**
112          * close condition.
113          */
114         protected CountDownLatch closeCondition = new CountDownLatch(1);
115
116         /**
117          * Cambria Consumer Wrapper.
118          * BusTopicParam object contains the following parameters
119          * servers messaging bus hosts.
120          * topic topic
121          * apiKey API Key
122          * apiSecret API Secret
123          * consumerGroup Consumer Group
124          * consumerInstance Consumer Instance
125          * fetchTimeout Fetch Timeout
126          * fetchLimit Fetch Limit
127          *
128          * @param busTopicParams - The parameters for the bus topic
129          * @throws GeneralSecurityException - Security exception
130          * @throws MalformedURLException - Malformed URL exception
131          */
132         public CambriaConsumerWrapper(BusTopicParams busTopicParams) {
133
134             this.fetchTimeout = busTopicParams.getFetchTimeout();
135
136             this.builder = new CambriaClientBuilders.ConsumerBuilder();
137
138             builder.knownAs(busTopicParams.getConsumerGroup(), busTopicParams.getConsumerInstance())
139                     .usingHosts(busTopicParams.getServers()).onTopic(busTopicParams.getTopic())
140                     .waitAtServer(fetchTimeout).receivingAtMost(busTopicParams.getFetchLimit());
141
142             // Set read timeout to fetch timeout + 30 seconds (TBD: this should be configurable)
143             builder.withSocketTimeout(fetchTimeout + 30000);
144
145             if (busTopicParams.isUseHttps()) {
146                 builder.usingHttps();
147
148                 if (busTopicParams.isAllowSelfSignedCerts()) {
149                     builder.allowSelfSignedCertificates();
150                 }
151             }
152
153             if (busTopicParams.isApiKeyValid() && busTopicParams.isApiSecretValid()) {
154                 builder.authenticatedBy(busTopicParams.getApiKey(), busTopicParams.getApiSecret());
155             }
156
157             if (busTopicParams.isUserNameValid() && busTopicParams.isPasswordValid()) {
158                 builder.authenticatedByHttp(busTopicParams.getUserName(), busTopicParams.getPassword());
159             }
160
161             try {
162                 this.consumer = builder.build();
163             } catch (MalformedURLException | GeneralSecurityException e) {
164                 throw new IllegalArgumentException(e);
165             }
166         }
167
168         @Override
169         public Iterable<String> fetch() throws IOException {
170             try {
171                 return getCurrentConsumer().fetch();
172             } catch (final IOException e) {
173                 logger.error("{}: cannot fetch because of {} - backoff for {} ms.", this, e.getMessage(),
174                         this.fetchTimeout);
175                 sleepAfterFetchFailure();
176                 throw e;
177             }
178         }
179
180         private void sleepAfterFetchFailure() {
181             try {
182                 this.closeCondition.await(this.fetchTimeout, TimeUnit.MILLISECONDS);
183
184             } catch (InterruptedException e) {
185                 logger.warn("{}: interrupted while handling fetch error", this, e);
186                 Thread.currentThread().interrupt();
187             }
188         }
189
190         @Override
191         public void close() {
192             this.closeCondition.countDown();
193             getCurrentConsumer().close();
194         }
195
196         private CambriaConsumer getCurrentConsumer() {
197             CambriaConsumer old = null;
198             CambriaConsumer ret;
199
200             synchronized (consLocker) {
201                 if (this.newConsumer != null) {
202                     // replace old consumer with new consumer
203                     old = this.consumer;
204                     this.consumer = this.newConsumer;
205                     this.newConsumer = null;
206                 }
207
208                 ret = this.consumer;
209             }
210
211             if (old != null) {
212                 old.close();
213             }
214
215             return ret;
216         }
217
218         @Override
219         public void setFilter(String filter) {
220             logger.info("{}: setting DMAAP server-side filter: {}", this, filter);
221             builder.withServerSideFilter(filter);
222
223             try {
224                 CambriaConsumer previous;
225                 synchronized (consLocker) {
226                     previous = this.newConsumer;
227                     this.newConsumer = builder.build();
228                 }
229
230                 if (previous != null) {
231                     // there was already a new consumer - close it
232                     previous.close();
233                 }
234
235             } catch (MalformedURLException | GeneralSecurityException e) {
236                 /*
237                  * Since an exception occurred, "consumer" still has its old value, thus it should
238                  * not be closed at this point.
239                  */
240                 throw new IllegalArgumentException(e);
241             }
242         }
243
244         @Override
245         public String toString() {
246             return "CambriaConsumerWrapper [fetchTimeout=" + fetchTimeout + "]";
247         }
248     }
249
250     /**
251      * MR based consumer.
252      */
253     public abstract class DmaapConsumerWrapper implements BusConsumer {
254
255         /**
256          * logger.
257          */
258         private static Logger logger = LoggerFactory.getLogger(DmaapConsumerWrapper.class);
259
260         /**
261          * Name of the "protocol" property.
262          */
263         protected static final String PROTOCOL_PROP = "Protocol";
264
265         /**
266          * fetch timeout.
267          */
268         protected int fetchTimeout;
269
270         /**
271          * close condition.
272          */
273         protected CountDownLatch closeCondition = new CountDownLatch(1);
274
275         /**
276          * MR Consumer.
277          */
278         protected MRConsumerImpl consumer;
279
280         /**
281          * MR Consumer Wrapper.
282          *
283          * <p>servers          messaging bus hosts
284          * topic            topic
285          * apiKey           API Key
286          * apiSecret        API Secret
287          * username         AAF Login
288          * password         AAF Password
289          * consumerGroup    Consumer Group
290          * consumerInstance Consumer Instance
291          * fetchTimeout     Fetch Timeout
292          * fetchLimit       Fetch Limit
293          *
294          * @param busTopicParams contains above listed attributes
295          * @throws MalformedURLException URL should be valid
296          */
297         public DmaapConsumerWrapper(BusTopicParams busTopicParams) throws MalformedURLException {
298
299             this.fetchTimeout = busTopicParams.getFetchTimeout();
300
301             if (busTopicParams.isTopicInvalid()) {
302                 throw new IllegalArgumentException("No topic for DMaaP");
303             }
304
305             this.consumer = new MRConsumerImpl(busTopicParams.getServers(), busTopicParams.getTopic(),
306                     busTopicParams.getConsumerGroup(), busTopicParams.getConsumerInstance(),
307                     busTopicParams.getFetchTimeout(), busTopicParams.getFetchLimit(), null,
308                     busTopicParams.getApiKey(), busTopicParams.getApiSecret());
309
310             this.consumer.setUsername(busTopicParams.getUserName());
311             this.consumer.setPassword(busTopicParams.getPassword());
312         }
313
314         @Override
315         public Iterable<String> fetch() throws IOException {
316             final MRConsumerResponse response = this.consumer.fetchWithReturnConsumerResponse();
317             if (response == null) {
318                 logger.warn("{}: DMaaP NULL response received", this);
319
320                 sleepAfterFetchFailure();
321                 return new ArrayList<>();
322             } else {
323                 logger.debug("DMaaP consumer received {} : {}", response.getResponseCode(),
324                         response.getResponseMessage());
325
326                 if (!"200".equals(response.getResponseCode())) {
327
328                     logger.error("DMaaP consumer received: {} : {}", response.getResponseCode(),
329                             response.getResponseMessage());
330
331                     sleepAfterFetchFailure();
332
333                     /* fall through */
334                 }
335             }
336
337             if (response.getActualMessages() == null) {
338                 return new ArrayList<>();
339             } else {
340                 return response.getActualMessages();
341             }
342         }
343
344         private void sleepAfterFetchFailure() {
345             try {
346                 this.closeCondition.await(this.fetchTimeout, TimeUnit.MILLISECONDS);
347
348             } catch (InterruptedException e) {
349                 logger.warn("{}: interrupted while handling fetch error", this, e);
350                 Thread.currentThread().interrupt();
351             }
352         }
353
354         @Override
355         public void close() {
356             this.closeCondition.countDown();
357             this.consumer.close();
358         }
359
360         @Override
361         public String toString() {
362             return "DmaapConsumerWrapper [" + "consumer.getAuthDate()=" + consumer.getAuthDate()
363                     + ", consumer.getAuthKey()=" + consumer.getAuthKey() + ", consumer.getHost()=" + consumer.getHost()
364                     + ", consumer.getProtocolFlag()=" + consumer.getProtocolFlag() + ", consumer.getUsername()="
365                     + consumer.getUsername() + "]";
366         }
367     }
368
369     /**
370      * MR based consumer.
371      */
372     public static class DmaapAafConsumerWrapper extends DmaapConsumerWrapper {
373
374         private static Logger logger = LoggerFactory.getLogger(DmaapAafConsumerWrapper.class);
375
376         private final Properties props;
377
378         /**
379          * BusTopicParams contain the following parameters.
380          * MR Consumer Wrapper.
381          *
382          * <p>servers messaging bus hosts
383          * topic topic
384          * apiKey API Key
385          * apiSecret API Secret
386          * aafLogin AAF Login
387          * aafPassword AAF Password
388          * consumerGroup Consumer Group
389          * consumerInstance Consumer Instance
390          * fetchTimeout Fetch Timeout
391          * fetchLimit Fetch Limit
392          *
393          * @param busTopicParams contains above listed params
394          * @throws MalformedURLException URL should be valid
395          */
396         public DmaapAafConsumerWrapper(BusTopicParams busTopicParams) throws MalformedURLException {
397
398             super(busTopicParams);
399
400             // super constructor sets servers = {""} if empty to avoid errors when using DME2
401             if (busTopicParams.isServersInvalid()) {
402                 throw new IllegalArgumentException("Must provide at least one host for HTTP AAF");
403             }
404
405             this.consumer.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue());
406
407             props = new Properties();
408
409             if (busTopicParams.isUseHttps()) {
410                 props.setProperty(PROTOCOL_PROP, "https");
411                 this.consumer.setHost(busTopicParams.getServers().get(0) + ":3905");
412
413             } else {
414                 props.setProperty(PROTOCOL_PROP, "http");
415                 this.consumer.setHost(busTopicParams.getServers().get(0) + ":3904");
416             }
417
418             this.consumer.setProps(props);
419             logger.info("{}: CREATION", this);
420         }
421
422         @Override
423         public String toString() {
424             final MRConsumerImpl consumer = this.consumer;
425
426             return "DmaapConsumerWrapper [" + "consumer.getAuthDate()=" + consumer.getAuthDate()
427                     + ", consumer.getAuthKey()=" + consumer.getAuthKey() + ", consumer.getHost()=" + consumer.getHost()
428                     + ", consumer.getProtocolFlag()=" + consumer.getProtocolFlag() + ", consumer.getUsername()="
429                     + consumer.getUsername() + "]";
430         }
431     }
432
433     public static class DmaapDmeConsumerWrapper extends DmaapConsumerWrapper {
434
435         private static Logger logger = LoggerFactory.getLogger(DmaapDmeConsumerWrapper.class);
436
437         private final Properties props;
438
439         /**
440          * Constructor.
441          *
442          * @param busTopicParams topic paramters
443          *
444          * @throws MalformedURLException must provide a valid URL
445          */
446         public DmaapDmeConsumerWrapper(BusTopicParams busTopicParams) throws MalformedURLException {
447
448
449             super(busTopicParams);
450
451
452             final String dme2RouteOffer = (busTopicParams.isAdditionalPropsValid()
453                             ? busTopicParams.getAdditionalProps().get(
454                                             PolicyEndPointProperties.DME2_ROUTE_OFFER_PROPERTY)
455                             : null);
456
457             if (busTopicParams.isEnvironmentInvalid()) {
458                 throw parmException(busTopicParams.getTopic(),
459                         PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX);
460             }
461             if (busTopicParams.isAftEnvironmentInvalid()) {
462                 throw parmException(busTopicParams.getTopic(),
463                         PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX);
464             }
465             if (busTopicParams.isLatitudeInvalid()) {
466                 throw parmException(busTopicParams.getTopic(),
467                         PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX);
468             }
469             if (busTopicParams.isLongitudeInvalid()) {
470                 throw parmException(busTopicParams.getTopic(),
471                         PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX);
472             }
473
474             if ((busTopicParams.isPartnerInvalid())
475                     && StringUtils.isBlank(dme2RouteOffer)) {
476                 throw new IllegalArgumentException(
477                         "Must provide at least " + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
478                                 + "." + busTopicParams.getTopic()
479                                 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX + " or "
480                                 + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
481                                 + busTopicParams.getTopic()
482                                 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX + " for DME2");
483             }
484
485             final String serviceName = busTopicParams.getServers().get(0);
486
487             this.consumer.setProtocolFlag(ProtocolTypeConstants.DME2.getValue());
488
489             this.consumer.setUsername(busTopicParams.getUserName());
490             this.consumer.setPassword(busTopicParams.getPassword());
491
492             props = new Properties();
493
494             props.setProperty(PolicyEndPointProperties.DME2_SERVICE_NAME_PROPERTY, serviceName);
495
496             props.setProperty("username", busTopicParams.getUserName());
497             props.setProperty("password", busTopicParams.getPassword());
498
499             /* These are required, no defaults */
500             props.setProperty("topic", busTopicParams.getTopic());
501
502             props.setProperty("Environment", busTopicParams.getEnvironment());
503             props.setProperty("AFT_ENVIRONMENT", busTopicParams.getAftEnvironment());
504
505             if (busTopicParams.getPartner() != null) {
506                 props.setProperty("Partner", busTopicParams.getPartner());
507             }
508             if (dme2RouteOffer != null) {
509                 props.setProperty(PolicyEndPointProperties.DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer);
510             }
511
512             props.setProperty("Latitude", busTopicParams.getLatitude());
513             props.setProperty("Longitude", busTopicParams.getLongitude());
514
515             /* These are optional, will default to these values if not set in additionalProps */
516             props.setProperty("AFT_DME2_EP_READ_TIMEOUT_MS", "50000");
517             props.setProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", "240000");
518             props.setProperty("AFT_DME2_EP_CONN_TIMEOUT", "15000");
519             props.setProperty("Version", "1.0");
520             props.setProperty("SubContextPath", "/");
521             props.setProperty("sessionstickinessrequired", "no");
522
523             /* These should not change */
524             props.setProperty("TransportType", "DME2");
525             props.setProperty("MethodType", "GET");
526
527             if (busTopicParams.isUseHttps()) {
528                 props.setProperty(PROTOCOL_PROP, "https");
529
530             } else {
531                 props.setProperty(PROTOCOL_PROP, "http");
532             }
533
534             props.setProperty("contenttype", "application/json");
535
536             if (busTopicParams.isAdditionalPropsValid()) {
537                 for (Map.Entry<String, String> entry : busTopicParams.getAdditionalProps().entrySet()) {
538                     props.put(entry.getKey(), entry.getValue());
539                 }
540             }
541
542             MRClientFactory.prop = props;
543             this.consumer.setProps(props);
544
545             logger.info("{}: CREATION", this);
546         }
547
548         private IllegalArgumentException parmException(String topic, String propnm) {
549             return new IllegalArgumentException("Missing " + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
550                     + topic + propnm + " property for DME2 in DMaaP");
551
552         }
553     }
554 }
555
556