233434f11fa02ab5a9cad4f9fbac0bfe51146f06
[policy/common.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2017-2020 AT&T Intellectual Property. All rights reserved.
6  * Modifications Copyright (C) 2018 Samsung Electronics Co., Ltd.
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.policy.common.endpoints.event.comm.bus.internal;
23
24 import com.att.nsa.cambria.client.CambriaClientBuilders;
25 import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder;
26 import com.att.nsa.cambria.client.CambriaConsumer;
27 import java.io.IOException;
28 import java.net.MalformedURLException;
29 import java.security.GeneralSecurityException;
30 import java.util.ArrayList;
31 import java.util.Map;
32 import java.util.Properties;
33 import java.util.concurrent.CountDownLatch;
34 import java.util.concurrent.TimeUnit;
35 import org.apache.commons.lang3.StringUtils;
36 import org.onap.dmaap.mr.client.MRClientFactory;
37 import org.onap.dmaap.mr.client.impl.MRConsumerImpl;
38 import org.onap.dmaap.mr.client.impl.MRConsumerImpl.MRConsumerImplBuilder;
39 import org.onap.dmaap.mr.client.response.MRConsumerResponse;
40 import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants;
41 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
42 import org.slf4j.Logger;
43 import org.slf4j.LoggerFactory;
44
45 /**
46  * Wrapper around libraries to consume from message bus.
47  */
48 public interface BusConsumer {
49
50     /**
51      * fetch messages.
52      *
53      * @return list of messages
54      * @throws IOException when error encountered by underlying libraries
55      */
56     public Iterable<String> fetch() throws IOException;
57
58     /**
59      * close underlying library consumer.
60      */
61     public void close();
62
63     /**
64      * BusConsumer that supports server-side filtering.
65      */
66     public interface FilterableBusConsumer extends BusConsumer {
67
68         /**
69          * Sets the server-side filter.
70          *
71          * @param filter new filter value, or {@code null}
72          * @throws IllegalArgumentException if the consumer cannot be built with the new filter
73          */
74         public void setFilter(String filter);
75     }
76
77     /**
78      * Cambria based consumer.
79      */
80     public static class CambriaConsumerWrapper implements FilterableBusConsumer {
81
82         /**
83          * logger.
84          */
85         private static Logger logger = LoggerFactory.getLogger(CambriaConsumerWrapper.class);
86
87         /**
88          * Used to build the consumer.
89          */
90         private final ConsumerBuilder builder;
91
92         /**
93          * Locked while updating {@link #consumer} and {@link #newConsumer}.
94          */
95         private final Object consLocker = new Object();
96
97         /**
98          * Cambria client.
99          */
100         private CambriaConsumer consumer;
101
102         /**
103          * Cambria client to use for next fetch.
104          */
105         private CambriaConsumer newConsumer = null;
106
107         /**
108          * fetch timeout.
109          */
110         protected int fetchTimeout;
111
112         /**
113          * close condition.
114          */
115         protected CountDownLatch closeCondition = new CountDownLatch(1);
116
117         /**
118          * Cambria Consumer Wrapper.
119          * BusTopicParam object contains the following parameters
120          * servers messaging bus hosts.
121          * topic topic
122          * apiKey API Key
123          * apiSecret API Secret
124          * consumerGroup Consumer Group
125          * consumerInstance Consumer Instance
126          * fetchTimeout Fetch Timeout
127          * fetchLimit Fetch Limit
128          *
129          * @param busTopicParams - The parameters for the bus topic
130          * @throws GeneralSecurityException - Security exception
131          * @throws MalformedURLException - Malformed URL exception
132          */
133         public CambriaConsumerWrapper(BusTopicParams busTopicParams) {
134
135             this.fetchTimeout = busTopicParams.getFetchTimeout();
136
137             this.builder = new CambriaClientBuilders.ConsumerBuilder();
138
139             builder.knownAs(busTopicParams.getConsumerGroup(), busTopicParams.getConsumerInstance())
140                     .usingHosts(busTopicParams.getServers()).onTopic(busTopicParams.getTopic())
141                     .waitAtServer(fetchTimeout).receivingAtMost(busTopicParams.getFetchLimit());
142
143             // Set read timeout to fetch timeout + 30 seconds (TBD: this should be configurable)
144             builder.withSocketTimeout(fetchTimeout + 30000);
145
146             if (busTopicParams.isUseHttps()) {
147                 builder.usingHttps();
148
149                 if (busTopicParams.isAllowSelfSignedCerts()) {
150                     builder.allowSelfSignedCertificates();
151                 }
152             }
153
154             if (busTopicParams.isApiKeyValid() && busTopicParams.isApiSecretValid()) {
155                 builder.authenticatedBy(busTopicParams.getApiKey(), busTopicParams.getApiSecret());
156             }
157
158             if (busTopicParams.isUserNameValid() && busTopicParams.isPasswordValid()) {
159                 builder.authenticatedByHttp(busTopicParams.getUserName(), busTopicParams.getPassword());
160             }
161
162             try {
163                 this.consumer = builder.build();
164             } catch (MalformedURLException | GeneralSecurityException e) {
165                 throw new IllegalArgumentException(e);
166             }
167         }
168
169         @Override
170         public Iterable<String> fetch() throws IOException {
171             try {
172                 return getCurrentConsumer().fetch();
173             } catch (final IOException e) { //NOSONAR
174                 logger.error("{}: cannot fetch because of {} - backoff for {} ms.", this, e.getMessage(),
175                         this.fetchTimeout);
176                 sleepAfterFetchFailure();
177                 throw e;
178             }
179         }
180
181         private void sleepAfterFetchFailure() {
182             try {
183                 this.closeCondition.await(this.fetchTimeout, TimeUnit.MILLISECONDS); //NOSONAR
184
185             } catch (InterruptedException e) {
186                 logger.warn("{}: interrupted while handling fetch error", this, e);
187                 Thread.currentThread().interrupt();
188             }
189         }
190
191         @Override
192         public void close() {
193             this.closeCondition.countDown();
194             getCurrentConsumer().close();
195         }
196
197         private CambriaConsumer getCurrentConsumer() {
198             CambriaConsumer old = null;
199             CambriaConsumer ret;
200
201             synchronized (consLocker) {
202                 if (this.newConsumer != null) {
203                     // replace old consumer with new consumer
204                     old = this.consumer;
205                     this.consumer = this.newConsumer;
206                     this.newConsumer = null;
207                 }
208
209                 ret = this.consumer;
210             }
211
212             if (old != null) {
213                 old.close();
214             }
215
216             return ret;
217         }
218
219         @Override
220         public void setFilter(String filter) {
221             logger.info("{}: setting DMAAP server-side filter: {}", this, filter);
222             builder.withServerSideFilter(filter);
223
224             try {
225                 CambriaConsumer previous;
226                 synchronized (consLocker) {
227                     previous = this.newConsumer;
228                     this.newConsumer = builder.build();
229                 }
230
231                 if (previous != null) {
232                     // there was already a new consumer - close it
233                     previous.close();
234                 }
235
236             } catch (MalformedURLException | GeneralSecurityException e) {
237                 /*
238                  * Since an exception occurred, "consumer" still has its old value, thus it should
239                  * not be closed at this point.
240                  */
241                 throw new IllegalArgumentException(e);
242             }
243         }
244
245         @Override
246         public String toString() {
247             return "CambriaConsumerWrapper [fetchTimeout=" + fetchTimeout + "]";
248         }
249     }
250
251     /**
252      * MR based consumer.
253      */
254     public abstract class DmaapConsumerWrapper implements BusConsumer {
255
256         /**
257          * logger.
258          */
259         private static Logger logger = LoggerFactory.getLogger(DmaapConsumerWrapper.class);
260
261         /**
262          * Name of the "protocol" property.
263          */
264         protected static final String PROTOCOL_PROP = "Protocol";
265
266         /**
267          * fetch timeout.
268          */
269         protected int fetchTimeout;
270
271         /**
272          * close condition.
273          */
274         protected CountDownLatch closeCondition = new CountDownLatch(1);
275
276         /**
277          * MR Consumer.
278          */
279         protected MRConsumerImpl consumer;
280
281         /**
282          * MR Consumer Wrapper.
283          *
284          * <p>servers          messaging bus hosts
285          * topic            topic
286          * apiKey           API Key
287          * apiSecret        API Secret
288          * username         AAF Login
289          * password         AAF Password
290          * consumerGroup    Consumer Group
291          * consumerInstance Consumer Instance
292          * fetchTimeout     Fetch Timeout
293          * fetchLimit       Fetch Limit
294          *
295          * @param busTopicParams contains above listed attributes
296          * @throws MalformedURLException URL should be valid
297          */
298         public DmaapConsumerWrapper(BusTopicParams busTopicParams) throws MalformedURLException {
299
300             this.fetchTimeout = busTopicParams.getFetchTimeout();
301
302             if (busTopicParams.isTopicInvalid()) {
303                 throw new IllegalArgumentException("No topic for DMaaP");
304             }
305
306             this.consumer = new MRConsumerImplBuilder()
307                             .setHostPart(busTopicParams.getServers())
308                             .setTopic(busTopicParams.getTopic())
309                             .setConsumerGroup(busTopicParams.getConsumerGroup())
310                             .setConsumerId(busTopicParams.getConsumerInstance())
311                             .setTimeoutMs(busTopicParams.getFetchTimeout())
312                             .setLimit(busTopicParams.getFetchLimit())
313                             .setApiKey(busTopicParams.getApiKey())
314                             .setApiSecret(busTopicParams.getApiSecret())
315                             .createMRConsumerImpl();
316
317             this.consumer.setUsername(busTopicParams.getUserName());
318             this.consumer.setPassword(busTopicParams.getPassword());
319         }
320
321         @Override
322         public Iterable<String> fetch() throws IOException {
323             final MRConsumerResponse response = this.consumer.fetchWithReturnConsumerResponse();
324             if (response == null) {
325                 logger.warn("{}: DMaaP NULL response received", this);
326
327                 sleepAfterFetchFailure();
328                 return new ArrayList<>();
329             } else {
330                 logger.debug("DMaaP consumer received {} : {}", response.getResponseCode(),
331                         response.getResponseMessage());
332
333                 if (!"200".equals(response.getResponseCode())) {
334
335                     logger.error("DMaaP consumer received: {} : {}", response.getResponseCode(),
336                             response.getResponseMessage());
337
338                     sleepAfterFetchFailure();
339
340                     /* fall through */
341                 }
342             }
343
344             if (response.getActualMessages() == null) {
345                 return new ArrayList<>();
346             } else {
347                 return response.getActualMessages();
348             }
349         }
350
351         private void sleepAfterFetchFailure() {
352             try {
353                 this.closeCondition.await(this.fetchTimeout, TimeUnit.MILLISECONDS); //NOSONAR
354
355             } catch (InterruptedException e) {
356                 logger.warn("{}: interrupted while handling fetch error", this, e);
357                 Thread.currentThread().interrupt();
358             }
359         }
360
361         @Override
362         public void close() {
363             this.closeCondition.countDown();
364             this.consumer.close();
365         }
366
367         @Override
368         public String toString() {
369             return "DmaapConsumerWrapper [" + "consumer.getAuthDate()=" + consumer.getAuthDate()
370                     + ", consumer.getAuthKey()=" + consumer.getAuthKey() + ", consumer.getHost()=" + consumer.getHost()
371                     + ", consumer.getProtocolFlag()=" + consumer.getProtocolFlag() + ", consumer.getUsername()="
372                     + consumer.getUsername() + "]";
373         }
374     }
375
376     /**
377      * MR based consumer.
378      */
379     public static class DmaapAafConsumerWrapper extends DmaapConsumerWrapper {
380
381         private static Logger logger = LoggerFactory.getLogger(DmaapAafConsumerWrapper.class);
382
383         private final Properties props;
384
385         /**
386          * BusTopicParams contain the following parameters.
387          * MR Consumer Wrapper.
388          *
389          * <p>servers messaging bus hosts
390          * topic topic
391          * apiKey API Key
392          * apiSecret API Secret
393          * aafLogin AAF Login
394          * aafPassword AAF Password
395          * consumerGroup Consumer Group
396          * consumerInstance Consumer Instance
397          * fetchTimeout Fetch Timeout
398          * fetchLimit Fetch Limit
399          *
400          * @param busTopicParams contains above listed params
401          * @throws MalformedURLException URL should be valid
402          */
403         public DmaapAafConsumerWrapper(BusTopicParams busTopicParams) throws MalformedURLException {
404
405             super(busTopicParams);
406
407             // super constructor sets servers = {""} if empty to avoid errors when using DME2
408             if (busTopicParams.isServersInvalid()) {
409                 throw new IllegalArgumentException("Must provide at least one host for HTTP AAF");
410             }
411
412             this.consumer.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue());
413
414             props = new Properties();
415
416             if (busTopicParams.isUseHttps()) {
417                 props.setProperty(PROTOCOL_PROP, "https");
418                 this.consumer.setHost(busTopicParams.getServers().get(0) + ":3905");
419
420             } else {
421                 props.setProperty(PROTOCOL_PROP, "http");
422                 this.consumer.setHost(busTopicParams.getServers().get(0) + ":3904");
423             }
424
425             this.consumer.setProps(props);
426             logger.info("{}: CREATION", this);
427         }
428
429         @Override
430         public String toString() {
431             final MRConsumerImpl consumer = this.consumer;
432
433             return "DmaapConsumerWrapper [" + "consumer.getAuthDate()=" + consumer.getAuthDate()
434                     + ", consumer.getAuthKey()=" + consumer.getAuthKey() + ", consumer.getHost()=" + consumer.getHost()
435                     + ", consumer.getProtocolFlag()=" + consumer.getProtocolFlag() + ", consumer.getUsername()="
436                     + consumer.getUsername() + "]";
437         }
438     }
439
440     public static class DmaapDmeConsumerWrapper extends DmaapConsumerWrapper {
441
442         private static Logger logger = LoggerFactory.getLogger(DmaapDmeConsumerWrapper.class);
443
444         private final Properties props;
445
446         /**
447          * Constructor.
448          *
449          * @param busTopicParams topic paramters
450          *
451          * @throws MalformedURLException must provide a valid URL
452          */
453         public DmaapDmeConsumerWrapper(BusTopicParams busTopicParams) throws MalformedURLException {
454
455
456             super(busTopicParams);
457
458
459             final String dme2RouteOffer = (busTopicParams.isAdditionalPropsValid()
460                             ? busTopicParams.getAdditionalProps().get(
461                                             PolicyEndPointProperties.DME2_ROUTE_OFFER_PROPERTY)
462                             : null);
463
464             if (busTopicParams.isEnvironmentInvalid()) {
465                 throw parmException(busTopicParams.getTopic(),
466                         PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX);
467             }
468             if (busTopicParams.isAftEnvironmentInvalid()) {
469                 throw parmException(busTopicParams.getTopic(),
470                         PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX);
471             }
472             if (busTopicParams.isLatitudeInvalid()) {
473                 throw parmException(busTopicParams.getTopic(),
474                         PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX);
475             }
476             if (busTopicParams.isLongitudeInvalid()) {
477                 throw parmException(busTopicParams.getTopic(),
478                         PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX);
479             }
480
481             if ((busTopicParams.isPartnerInvalid())
482                     && StringUtils.isBlank(dme2RouteOffer)) {
483                 throw new IllegalArgumentException(
484                         "Must provide at least " + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
485                                 + "." + busTopicParams.getTopic()
486                                 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX + " or "
487                                 + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
488                                 + busTopicParams.getTopic()
489                                 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX + " for DME2");
490             }
491
492             final String serviceName = busTopicParams.getServers().get(0);
493
494             this.consumer.setProtocolFlag(ProtocolTypeConstants.DME2.getValue());
495
496             this.consumer.setUsername(busTopicParams.getUserName());
497             this.consumer.setPassword(busTopicParams.getPassword());
498
499             props = new Properties();
500
501             props.setProperty(PolicyEndPointProperties.DME2_SERVICE_NAME_PROPERTY, serviceName);
502
503             props.setProperty("username", busTopicParams.getUserName());
504             props.setProperty("password", busTopicParams.getPassword());
505
506             /* These are required, no defaults */
507             props.setProperty("topic", busTopicParams.getTopic());
508
509             props.setProperty("Environment", busTopicParams.getEnvironment());
510             props.setProperty("AFT_ENVIRONMENT", busTopicParams.getAftEnvironment());
511
512             if (busTopicParams.getPartner() != null) {
513                 props.setProperty("Partner", busTopicParams.getPartner());
514             }
515             if (dme2RouteOffer != null) {
516                 props.setProperty(PolicyEndPointProperties.DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer);
517             }
518
519             props.setProperty("Latitude", busTopicParams.getLatitude());
520             props.setProperty("Longitude", busTopicParams.getLongitude());
521
522             /* These are optional, will default to these values if not set in additionalProps */
523             props.setProperty("AFT_DME2_EP_READ_TIMEOUT_MS", "50000");
524             props.setProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", "240000");
525             props.setProperty("AFT_DME2_EP_CONN_TIMEOUT", "15000");
526             props.setProperty("Version", "1.0");
527             props.setProperty("SubContextPath", "/");
528             props.setProperty("sessionstickinessrequired", "no");
529
530             /* These should not change */
531             props.setProperty("TransportType", "DME2");
532             props.setProperty("MethodType", "GET");
533
534             if (busTopicParams.isUseHttps()) {
535                 props.setProperty(PROTOCOL_PROP, "https");
536
537             } else {
538                 props.setProperty(PROTOCOL_PROP, "http");
539             }
540
541             props.setProperty("contenttype", "application/json");
542
543             if (busTopicParams.isAdditionalPropsValid()) {
544                 for (Map.Entry<String, String> entry : busTopicParams.getAdditionalProps().entrySet()) {
545                     props.put(entry.getKey(), entry.getValue());
546                 }
547             }
548
549             MRClientFactory.prop = props;
550             this.consumer.setProps(props);
551
552             logger.info("{}: CREATION", this);
553         }
554
555         private IllegalArgumentException parmException(String topic, String propnm) {
556             return new IllegalArgumentException("Missing " + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
557                     + topic + propnm + " property for DME2 in DMaaP");
558
559         }
560     }
561 }
562
563