abf793d6d97f59e104a655bf0a8e5adda0252609
[policy/common.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved.
6  * Modifications Copyright (C) 2018 Samsung Electronics Co., Ltd.
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.policy.common.endpoints.event.comm.bus.internal;
23
24 import com.att.nsa.cambria.client.CambriaClientBuilders;
25 import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder;
26 import com.att.nsa.cambria.client.CambriaConsumer;
27
28 import java.io.IOException;
29 import java.net.MalformedURLException;
30 import java.security.GeneralSecurityException;
31 import java.util.ArrayList;
32 import java.util.Map;
33 import java.util.Properties;
34 import java.util.concurrent.CountDownLatch;
35 import java.util.concurrent.TimeUnit;
36 import org.apache.commons.lang3.StringUtils;
37 import org.onap.dmaap.mr.client.MRClientFactory;
38 import org.onap.dmaap.mr.client.impl.MRConsumerImpl;
39 import org.onap.dmaap.mr.client.response.MRConsumerResponse;
40 import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants;
41 import org.onap.policy.common.endpoints.event.comm.bus.DmaapTopicSinkFactory;
42 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
43 import org.slf4j.Logger;
44 import org.slf4j.LoggerFactory;
45
46 /**
47  * Wrapper around libraries to consume from message bus.
48  */
49 public interface BusConsumer {
50
51     /**
52      * fetch messages.
53      *
54      * @return list of messages
55      * @throws Exception when error encountered by underlying libraries
56      */
57     public Iterable<String> fetch() throws InterruptedException, IOException;
58
59     /**
60      * close underlying library consumer.
61      */
62     public void close();
63
64     /**
65      * BusConsumer that supports server-side filtering.
66      */
67     public interface FilterableBusConsumer extends BusConsumer {
68
69         /**
70          * Sets the server-side filter.
71          *
72          * @param filter new filter value, or {@code null}
73          * @throws IllegalArgumentException if the consumer cannot be built with the new filter
74          */
75         public void setFilter(String filter);
76     }
77
78     /**
79      * Cambria based consumer.
80      */
81     public static class CambriaConsumerWrapper implements FilterableBusConsumer {
82
83         /**
84          * logger.
85          */
86         private static Logger logger = LoggerFactory.getLogger(CambriaConsumerWrapper.class);
87
88         /**
89          * Used to build the consumer.
90          */
91         private final ConsumerBuilder builder;
92
93         /**
94          * Locked while updating {@link #consumer} and {@link #newConsumer}.
95          */
96         private final Object consLocker = new Object();
97
98         /**
99          * Cambria client.
100          */
101         private CambriaConsumer consumer;
102
103         /**
104          * Cambria client to use for next fetch.
105          */
106         private CambriaConsumer newConsumer = null;
107
108         /**
109          * fetch timeout.
110          */
111         protected int fetchTimeout;
112
113         /**
114          * close condition.
115          */
116         protected CountDownLatch closeCondition = new CountDownLatch(1);
117
118         /**
119          * Cambria Consumer Wrapper.
120          * BusTopicParam object contains the following parameters
121          * servers messaging bus hosts.
122          * topic topic
123          * apiKey API Key
124          * apiSecret API Secret
125          * consumerGroup Consumer Group
126          * consumerInstance Consumer Instance
127          * fetchTimeout Fetch Timeout
128          * fetchLimit Fetch Limit
129          *
130          * @param busTopicParams - The parameters for the bus topic
131          * @throws GeneralSecurityException - Security exception
132          * @throws MalformedURLException - Malformed URL exception
133          */
134         public CambriaConsumerWrapper(BusTopicParams busTopicParams) {
135
136             this.fetchTimeout = busTopicParams.getFetchTimeout();
137
138             this.builder = new CambriaClientBuilders.ConsumerBuilder();
139
140             builder.knownAs(busTopicParams.getConsumerGroup(), busTopicParams.getConsumerInstance())
141                     .usingHosts(busTopicParams.getServers()).onTopic(busTopicParams.getTopic())
142                     .waitAtServer(fetchTimeout).receivingAtMost(busTopicParams.getFetchLimit());
143
144             // Set read timeout to fetch timeout + 30 seconds (TBD: this should be configurable)
145             builder.withSocketTimeout(fetchTimeout + 30000);
146
147             if (busTopicParams.isUseHttps()) {
148                 builder.usingHttps();
149
150                 if (busTopicParams.isAllowSelfSignedCerts()) {
151                     builder.allowSelfSignedCertificates();
152                 }
153             }
154
155             if (busTopicParams.isApiKeyValid() && busTopicParams.isApiSecretValid()) {
156                 builder.authenticatedBy(busTopicParams.getApiKey(), busTopicParams.getApiSecret());
157             }
158
159             if (busTopicParams.isUserNameValid() && busTopicParams.isPasswordValid()) {
160                 builder.authenticatedByHttp(busTopicParams.getUserName(), busTopicParams.getPassword());
161             }
162
163             try {
164                 this.consumer = builder.build();
165             } catch (MalformedURLException | GeneralSecurityException e) {
166                 throw new IllegalArgumentException(e);
167             }
168         }
169
170         @Override
171         public Iterable<String> fetch() throws IOException, InterruptedException {
172             try {
173                 return getCurrentConsumer().fetch();
174             } catch (final IOException e) {
175                 logger.error("{}: cannot fetch because of {} - backoff for {} ms.", this, e.getMessage(),
176                         this.fetchTimeout, e);
177
178                 this.closeCondition.await(this.fetchTimeout, TimeUnit.MILLISECONDS);
179
180                 throw e;
181             }
182         }
183
184         @Override
185         public void close() {
186             this.closeCondition.countDown();
187             getCurrentConsumer().close();
188         }
189
190         private CambriaConsumer getCurrentConsumer() {
191             CambriaConsumer old = null;
192             CambriaConsumer ret;
193
194             synchronized (consLocker) {
195                 if (this.newConsumer != null) {
196                     // replace old consumer with new consumer
197                     old = this.consumer;
198                     this.consumer = this.newConsumer;
199                     this.newConsumer = null;
200                 }
201
202                 ret = this.consumer;
203             }
204
205             if (old != null) {
206                 old.close();
207             }
208
209             return ret;
210         }
211
212         @Override
213         public void setFilter(String filter) {
214             logger.info("{}: setting DMAAP server-side filter: {}", this, filter);
215             builder.withServerSideFilter(filter);
216
217             try {
218                 CambriaConsumer previous;
219                 synchronized (consLocker) {
220                     previous = this.newConsumer;
221                     this.newConsumer = builder.build();
222                 }
223
224                 if (previous != null) {
225                     // there was already a new consumer - close it
226                     previous.close();
227                 }
228
229             } catch (MalformedURLException | GeneralSecurityException e) {
230                 /*
231                  * Since an exception occurred, "consumer" still has its old value, thus it should
232                  * not be closed at this point.
233                  */
234                 throw new IllegalArgumentException(e);
235             }
236         }
237
238         @Override
239         public String toString() {
240             return "CambriaConsumerWrapper [fetchTimeout=" + fetchTimeout + "]";
241         }
242     }
243
244     /**
245      * MR based consumer.
246      */
247     public abstract class DmaapConsumerWrapper implements BusConsumer {
248
249         /**
250          * logger.
251          */
252         private static Logger logger = LoggerFactory.getLogger(DmaapConsumerWrapper.class);
253
254         /**
255          * Name of the "protocol" property.
256          */
257         protected static final String PROTOCOL_PROP = "Protocol";
258
259         /**
260          * fetch timeout.
261          */
262         protected int fetchTimeout;
263
264         /**
265          * close condition.
266          */
267         protected CountDownLatch closeCondition = new CountDownLatch(1);
268
269         /**
270          * MR Consumer.
271          */
272         protected MRConsumerImpl consumer;
273
274         /**
275          * MR Consumer Wrapper.
276          *
277          * <p>servers          messaging bus hosts
278          * topic            topic
279          * apiKey           API Key
280          * apiSecret        API Secret
281          * username         AAF Login
282          * password         AAF Password
283          * consumerGroup    Consumer Group
284          * consumerInstance Consumer Instance
285          * fetchTimeout     Fetch Timeout
286          * fetchLimit       Fetch Limit
287          *
288          * @param busTopicParams contains above listed attributes
289          * @throws MalformedURLException URL should be valid
290          */
291         public DmaapConsumerWrapper(BusTopicParams busTopicParams) throws MalformedURLException {
292
293             this.fetchTimeout = busTopicParams.getFetchTimeout();
294
295             if (busTopicParams.isTopicInvalid()) {
296                 throw new IllegalArgumentException("No topic for DMaaP");
297             }
298
299             this.consumer = new MRConsumerImpl(busTopicParams.getServers(), busTopicParams.getTopic(),
300                     busTopicParams.getConsumerGroup(), busTopicParams.getConsumerInstance(),
301                     busTopicParams.getFetchTimeout(), busTopicParams.getFetchLimit(), null,
302                     busTopicParams.getApiKey(), busTopicParams.getApiSecret());
303
304             this.consumer.setUsername(busTopicParams.getUserName());
305             this.consumer.setPassword(busTopicParams.getPassword());
306         }
307
308         @Override
309         public Iterable<String> fetch() throws InterruptedException, IOException {
310             final MRConsumerResponse response = this.consumer.fetchWithReturnConsumerResponse();
311             if (response == null) {
312                 logger.warn("{}: DMaaP NULL response received", this);
313
314                 closeCondition.await(fetchTimeout, TimeUnit.MILLISECONDS);
315                 return new ArrayList<>();
316             } else {
317                 logger.debug("DMaaP consumer received {} : {}", response.getResponseCode(),
318                         response.getResponseMessage());
319
320                 if (!"200".equals(response.getResponseCode())) {
321
322                     logger.error("DMaaP consumer received: {} : {}", response.getResponseCode(),
323                             response.getResponseMessage());
324
325                     closeCondition.await(fetchTimeout, TimeUnit.MILLISECONDS);
326
327                     /* fall through */
328                 }
329             }
330
331             if (response.getActualMessages() == null) {
332                 return new ArrayList<>();
333             } else {
334                 return response.getActualMessages();
335             }
336         }
337
338         @Override
339         public void close() {
340             this.closeCondition.countDown();
341             this.consumer.close();
342         }
343
344         @Override
345         public String toString() {
346             return "DmaapConsumerWrapper [" + "consumer.getAuthDate()=" + consumer.getAuthDate()
347                     + ", consumer.getAuthKey()=" + consumer.getAuthKey() + ", consumer.getHost()=" + consumer.getHost()
348                     + ", consumer.getProtocolFlag()=" + consumer.getProtocolFlag() + ", consumer.getUsername()="
349                     + consumer.getUsername() + "]";
350         }
351     }
352
353     /**
354      * MR based consumer.
355      */
356     public static class DmaapAafConsumerWrapper extends DmaapConsumerWrapper {
357
358         private static Logger logger = LoggerFactory.getLogger(DmaapAafConsumerWrapper.class);
359
360         private final Properties props;
361
362         /**
363          * BusTopicParams contain the following parameters.
364          * MR Consumer Wrapper.
365          *
366          * <p>servers messaging bus hosts
367          * topic topic
368          * apiKey API Key
369          * apiSecret API Secret
370          * aafLogin AAF Login
371          * aafPassword AAF Password
372          * consumerGroup Consumer Group
373          * consumerInstance Consumer Instance
374          * fetchTimeout Fetch Timeout
375          * fetchLimit Fetch Limit
376          *
377          * @param busTopicParams contains above listed params
378          * @throws MalformedURLException URL should be valid
379          */
380         public DmaapAafConsumerWrapper(BusTopicParams busTopicParams) throws MalformedURLException {
381
382             super(busTopicParams);
383
384             // super constructor sets servers = {""} if empty to avoid errors when using DME2
385             if (busTopicParams.isServersInvalid()) {
386                 throw new IllegalArgumentException("Must provide at least one host for HTTP AAF");
387             }
388
389             this.consumer.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue());
390
391             props = new Properties();
392
393             if (busTopicParams.isUseHttps()) {
394                 props.setProperty(PROTOCOL_PROP, "https");
395                 this.consumer.setHost(busTopicParams.getServers().get(0) + ":3905");
396
397             } else {
398                 props.setProperty(PROTOCOL_PROP, "http");
399                 this.consumer.setHost(busTopicParams.getServers().get(0) + ":3904");
400             }
401
402             this.consumer.setProps(props);
403             logger.info("{}: CREATION", this);
404         }
405
406         @Override
407         public String toString() {
408             final MRConsumerImpl consumer = this.consumer;
409
410             return "DmaapConsumerWrapper [" + "consumer.getAuthDate()=" + consumer.getAuthDate()
411                     + ", consumer.getAuthKey()=" + consumer.getAuthKey() + ", consumer.getHost()=" + consumer.getHost()
412                     + ", consumer.getProtocolFlag()=" + consumer.getProtocolFlag() + ", consumer.getUsername()="
413                     + consumer.getUsername() + "]";
414         }
415     }
416
417     public static class DmaapDmeConsumerWrapper extends DmaapConsumerWrapper {
418
419         private static Logger logger = LoggerFactory.getLogger(DmaapDmeConsumerWrapper.class);
420
421         private final Properties props;
422
423         /**
424          * Constructor.
425          *
426          * @param busTopicParams topic paramters
427          *
428          * @throws MalformedURLException must provide a valid URL
429          */
430         public DmaapDmeConsumerWrapper(BusTopicParams busTopicParams) throws MalformedURLException {
431
432
433             super(busTopicParams);
434
435
436             final String dme2RouteOffer = (busTopicParams.isAdditionalPropsValid()
437                             ? busTopicParams.getAdditionalProps().get(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY)
438                             : null);
439
440             if (busTopicParams.isEnvironmentInvalid()) {
441                 throw parmException(busTopicParams.getTopic(),
442                         PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX);
443             }
444             if (busTopicParams.isAftEnvironmentInvalid()) {
445                 throw parmException(busTopicParams.getTopic(),
446                         PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX);
447             }
448             if (busTopicParams.isLatitudeInvalid()) {
449                 throw parmException(busTopicParams.getTopic(),
450                         PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX);
451             }
452             if (busTopicParams.isLongitudeInvalid()) {
453                 throw parmException(busTopicParams.getTopic(),
454                         PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX);
455             }
456
457             if ((busTopicParams.isPartnerInvalid())
458                     && StringUtils.isBlank(dme2RouteOffer)) {
459                 throw new IllegalArgumentException(
460                         "Must provide at least " + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
461                                 + "." + busTopicParams.getTopic()
462                                 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX + " or "
463                                 + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
464                                 + busTopicParams.getTopic()
465                                 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX + " for DME2");
466             }
467
468             final String serviceName = busTopicParams.getServers().get(0);
469
470             this.consumer.setProtocolFlag(ProtocolTypeConstants.DME2.getValue());
471
472             this.consumer.setUsername(busTopicParams.getUserName());
473             this.consumer.setPassword(busTopicParams.getPassword());
474
475             props = new Properties();
476
477             props.setProperty(DmaapTopicSinkFactory.DME2_SERVICE_NAME_PROPERTY, serviceName);
478
479             props.setProperty("username", busTopicParams.getUserName());
480             props.setProperty("password", busTopicParams.getPassword());
481
482             /* These are required, no defaults */
483             props.setProperty("topic", busTopicParams.getTopic());
484
485             props.setProperty("Environment", busTopicParams.getEnvironment());
486             props.setProperty("AFT_ENVIRONMENT", busTopicParams.getAftEnvironment());
487
488             if (busTopicParams.getPartner() != null) {
489                 props.setProperty("Partner", busTopicParams.getPartner());
490             }
491             if (dme2RouteOffer != null) {
492                 props.setProperty(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer);
493             }
494
495             props.setProperty("Latitude", busTopicParams.getLatitude());
496             props.setProperty("Longitude", busTopicParams.getLongitude());
497
498             /* These are optional, will default to these values if not set in additionalProps */
499             props.setProperty("AFT_DME2_EP_READ_TIMEOUT_MS", "50000");
500             props.setProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", "240000");
501             props.setProperty("AFT_DME2_EP_CONN_TIMEOUT", "15000");
502             props.setProperty("Version", "1.0");
503             props.setProperty("SubContextPath", "/");
504             props.setProperty("sessionstickinessrequired", "no");
505
506             /* These should not change */
507             props.setProperty("TransportType", "DME2");
508             props.setProperty("MethodType", "GET");
509
510             if (busTopicParams.isUseHttps()) {
511                 props.setProperty(PROTOCOL_PROP, "https");
512
513             } else {
514                 props.setProperty(PROTOCOL_PROP, "http");
515             }
516
517             props.setProperty("contenttype", "application/json");
518
519             if (busTopicParams.isAdditionalPropsValid()) {
520                 for (Map.Entry<String, String> entry : busTopicParams.getAdditionalProps().entrySet()) {
521                     props.put(entry.getKey(), entry.getValue());
522                 }
523             }
524
525             MRClientFactory.prop = props;
526             this.consumer.setProps(props);
527
528             logger.info("{}: CREATION", this);
529         }
530
531         private IllegalArgumentException parmException(String topic, String propnm) {
532             return new IllegalArgumentException("Missing " + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
533                     + topic + propnm + " property for DME2 in DMaaP");
534
535         }
536     }
537 }
538
539