636dc6e3960254235a976d6361c2847ced46d091
[policy/common.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.common.endpoints.event.comm.bus.internal;
22
23 import com.att.nsa.cambria.client.CambriaClientBuilders;
24 import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder;
25 import com.att.nsa.cambria.client.CambriaConsumer;
26 import com.att.nsa.mr.client.MRClientFactory;
27 import com.att.nsa.mr.client.impl.MRConsumerImpl;
28 import com.att.nsa.mr.client.response.MRConsumerResponse;
29 import com.att.nsa.mr.test.clients.ProtocolTypeConstants;
30
31 import java.io.IOException;
32 import java.net.MalformedURLException;
33 import java.security.GeneralSecurityException;
34 import java.util.ArrayList;
35 import java.util.List;
36 import java.util.Map;
37 import java.util.Properties;
38
39 import org.onap.policy.common.endpoints.event.comm.bus.DmaapTopicSinkFactory;
40 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
41 import org.slf4j.Logger;
42 import org.slf4j.LoggerFactory;
43
44 /**
45  * Wrapper around libraries to consume from message bus
46  *
47  */
48 public interface BusConsumer {
49
50     /**
51      * fetch messages
52      *
53      * @return list of messages
54      * @throws Exception when error encountered by underlying libraries
55      */
56     public Iterable<String> fetch() throws InterruptedException, IOException;
57
58     /**
59      * close underlying library consumer
60      */
61     public void close();
62
63     /**
64      * BusConsumer that supports server-side filtering.
65      */
66     public interface FilterableBusConsumer extends BusConsumer {
67
68         /**
69          * Sets the server-side filter.
70          * 
71          * @param filter new filter value, or {@code null}
72          * @throws IllegalArgumentException if the consumer cannot be built with the new filter
73          */
74         public void setFilter(String filter);
75     }
76
77     /**
78      * Cambria based consumer
79      */
80     public static class CambriaConsumerWrapper implements FilterableBusConsumer {
81
82         /**
83          * logger
84          */
85         private static Logger logger = LoggerFactory.getLogger(CambriaConsumerWrapper.class);
86
87         /**
88          * Used to build the consumer.
89          */
90         private final ConsumerBuilder builder;
91
92         /**
93          * Locked while updating {@link #consumer} and {@link #newConsumer}.
94          */
95         private final Object consLocker = new Object();
96
97         /**
98          * Cambria client
99          */
100         private CambriaConsumer consumer;
101
102         /**
103          * Cambria client to use for next fetch
104          */
105         private CambriaConsumer newConsumer = null;
106
107         /**
108          * fetch timeout
109          */
110         protected int fetchTimeout;
111
112         /**
113          * close condition
114          */
115         protected Object closeCondition = new Object();
116
117         /**
118          * Cambria Consumer Wrapper
119          *
120          * @param servers messaging bus hosts
121          * @param topic topic
122          * @param apiKey API Key
123          * @param apiSecret API Secret
124          * @param consumerGroup Consumer Group
125          * @param consumerInstance Consumer Instance
126          * @param fetchTimeout Fetch Timeout
127          * @param fetchLimit Fetch Limit
128          * @throws GeneralSecurityException
129          * @throws MalformedURLException
130          */
131         public CambriaConsumerWrapper(List<String> servers, String topic, String apiKey, String apiSecret,
132                 String consumerGroup, String consumerInstance, int fetchTimeout, int fetchLimit, boolean useHttps,
133                 boolean useSelfSignedCerts) {
134             this(servers, topic, apiKey, apiSecret, null, null, consumerGroup, consumerInstance, fetchTimeout,
135                     fetchLimit, useHttps, useSelfSignedCerts);
136         }
137
138         public CambriaConsumerWrapper(List<String> servers, String topic, String apiKey, String apiSecret,
139                 String username, String password, String consumerGroup, String consumerInstance, int fetchTimeout,
140                 int fetchLimit, boolean useHttps, boolean useSelfSignedCerts) {
141
142             this.fetchTimeout = fetchTimeout;
143
144             this.builder = new CambriaClientBuilders.ConsumerBuilder();
145
146             builder.knownAs(consumerGroup, consumerInstance).usingHosts(servers).onTopic(topic)
147                     .waitAtServer(fetchTimeout).receivingAtMost(fetchLimit);
148
149             // Set read timeout to fetch timeout + 30 seconds (TBD: this should be configurable)
150             builder.withSocketTimeout(fetchTimeout + 30000);
151
152             if (useHttps) {
153                 builder.usingHttps();
154
155                 if (useSelfSignedCerts) {
156                     builder.allowSelfSignedCertificates();
157                 }
158             }
159
160             if (apiKey != null && !apiKey.isEmpty() && apiSecret != null && !apiSecret.isEmpty()) {
161                 builder.authenticatedBy(apiKey, apiSecret);
162             }
163
164             if (username != null && !username.isEmpty() && password != null && !password.isEmpty()) {
165                 builder.authenticatedByHttp(username, password);
166             }
167
168             try {
169                 this.consumer = builder.build();
170             } catch (MalformedURLException | GeneralSecurityException e) {
171                 throw new IllegalArgumentException(e);
172             }
173         }
174
175         @Override
176         public Iterable<String> fetch() throws IOException, InterruptedException {
177             try {
178                 return getCurrentConsumer().fetch();
179             } catch (final IOException e) {
180                 logger.error("{}: cannot fetch because of {} - backoff for {} ms.", this, e.getMessage(),
181                         this.fetchTimeout);
182                 synchronized (this.closeCondition) {
183                     this.closeCondition.wait(this.fetchTimeout);
184                 }
185
186                 throw e;
187             }
188         }
189
190         @Override
191         public void close() {
192             synchronized (closeCondition) {
193                 closeCondition.notifyAll();
194             }
195
196             getCurrentConsumer().close();
197         }
198
199         private CambriaConsumer getCurrentConsumer() {
200             CambriaConsumer old = null;
201             CambriaConsumer ret;
202
203             synchronized (consLocker) {
204                 if (this.newConsumer != null) {
205                     // replace old consumer with new consumer
206                     old = this.consumer;
207                     this.consumer = this.newConsumer;
208                     this.newConsumer = null;
209                 }
210
211                 ret = this.consumer;
212             }
213
214             if (old != null) {
215                 old.close();
216             }
217
218             return ret;
219         }
220
221         @Override
222         public void setFilter(String filter) {
223             logger.info("{}: setting DMAAP server-side filter: {}", this, filter);
224             builder.withServerSideFilter(filter);
225
226             try {
227                 CambriaConsumer previous;
228                 synchronized (consLocker) {
229                     previous = this.newConsumer;
230                     this.newConsumer = builder.build();
231                 }
232
233                 if (previous != null) {
234                     // there was already a new consumer - close it
235                     previous.close();
236                 }
237
238             } catch (MalformedURLException | GeneralSecurityException e) {
239                 /*
240                  * Since an exception occurred, "consumer" still has its old value, thus it should
241                  * not be closed at this point.
242                  */
243                 throw new IllegalArgumentException(e);
244             }
245         }
246
247         @Override
248         public String toString() {
249             return "CambriaConsumerWrapper [fetchTimeout=" + fetchTimeout + "]";
250         }
251     }
252
253     /**
254      * MR based consumer
255      */
256     public abstract class DmaapConsumerWrapper implements BusConsumer {
257
258         /**
259          * logger
260          */
261         private static Logger logger = LoggerFactory.getLogger(DmaapConsumerWrapper.class);
262
263         /**
264          * Name of the "protocol" property.
265          */
266         protected static final String PROTOCOL_PROP = "Protocol";
267
268         /**
269          * fetch timeout
270          */
271         protected int fetchTimeout;
272
273         /**
274          * close condition
275          */
276         protected Object closeCondition = new Object();
277
278         /**
279          * MR Consumer
280          */
281         protected MRConsumerImpl consumer;
282
283         /**
284          * MR Consumer Wrapper
285          *
286          * @param servers messaging bus hosts
287          * @param topic topic
288          * @param apiKey API Key
289          * @param apiSecret API Secret
290          * @param username AAF Login
291          * @param password AAF Password
292          * @param consumerGroup Consumer Group
293          * @param consumerInstance Consumer Instance
294          * @param fetchTimeout Fetch Timeout
295          * @param fetchLimit Fetch Limit
296          * @throws MalformedURLException
297          */
298         public DmaapConsumerWrapper(List<String> servers, String topic, String apiKey, String apiSecret,
299                 String username, String password, String consumerGroup, String consumerInstance, int fetchTimeout,
300                 int fetchLimit) throws MalformedURLException {
301
302             this.fetchTimeout = fetchTimeout;
303
304             if (topic == null || topic.isEmpty()) {
305                 throw new IllegalArgumentException("No topic for DMaaP");
306             }
307
308             this.consumer = new MRConsumerImpl(servers, topic, consumerGroup, consumerInstance, fetchTimeout,
309                     fetchLimit, null, apiKey, apiSecret);
310
311             this.consumer.setUsername(username);
312             this.consumer.setPassword(password);
313         }
314
315         @Override
316         public Iterable<String> fetch() throws InterruptedException, IOException {
317             final MRConsumerResponse response = this.consumer.fetchWithReturnConsumerResponse();
318             if (response == null) {
319                 logger.warn("{}: DMaaP NULL response received", this);
320
321                 synchronized (closeCondition) {
322                     closeCondition.wait(fetchTimeout);
323                 }
324                 return new ArrayList<>();
325             } else {
326                 logger.debug("DMaaP consumer received {} : {}" + response.getResponseCode(),
327                         response.getResponseMessage());
328
329                 if (response.getResponseCode() == null || !"200".equals(response.getResponseCode())) {
330
331                     logger.error("DMaaP consumer received: {} : {}", response.getResponseCode(),
332                             response.getResponseMessage());
333
334                     synchronized (closeCondition) {
335                         closeCondition.wait(fetchTimeout);
336                     }
337
338                     /* fall through */
339                 }
340             }
341
342             if (response.getActualMessages() == null) {
343                 return new ArrayList<>();
344             } else {
345                 return response.getActualMessages();
346             }
347         }
348
349         @Override
350         public void close() {
351             synchronized (closeCondition) {
352                 closeCondition.notifyAll();
353             }
354
355             this.consumer.close();
356         }
357
358         @Override
359         public String toString() {
360             return "DmaapConsumerWrapper [" + "consumer.getAuthDate()=" + consumer.getAuthDate()
361                     + ", consumer.getAuthKey()=" + consumer.getAuthKey() + ", consumer.getHost()=" + consumer.getHost()
362                     + ", consumer.getProtocolFlag()=" + consumer.getProtocolFlag() + ", consumer.getUsername()="
363                     + consumer.getUsername() + "]";
364         }
365     }
366
367     /**
368      * MR based consumer
369      */
370     public static class DmaapAafConsumerWrapper extends DmaapConsumerWrapper {
371
372         private static Logger logger = LoggerFactory.getLogger(DmaapAafConsumerWrapper.class);
373
374         private final Properties props;
375
376         /**
377          * MR Consumer Wrapper
378          *
379          * @param servers messaging bus hosts
380          * @param topic topic
381          * @param apiKey API Key
382          * @param apiSecret API Secret
383          * @param aafLogin AAF Login
384          * @param aafPassword AAF Password
385          * @param consumerGroup Consumer Group
386          * @param consumerInstance Consumer Instance
387          * @param fetchTimeout Fetch Timeout
388          * @param fetchLimit Fetch Limit
389          * @throws MalformedURLException
390          */
391         public DmaapAafConsumerWrapper(List<String> servers, String topic, String apiKey, String apiSecret,
392                 String aafLogin, String aafPassword, String consumerGroup, String consumerInstance, int fetchTimeout,
393                 int fetchLimit, boolean useHttps) throws MalformedURLException {
394
395             super(servers, topic, apiKey, apiSecret, aafLogin, aafPassword, consumerGroup, consumerInstance,
396                     fetchTimeout, fetchLimit);
397
398             // super constructor sets servers = {""} if empty to avoid errors when using DME2
399             if ((servers.size() == 1 && ("".equals(servers.get(0)))) || (servers == null) || (servers.isEmpty())) {
400                 throw new IllegalArgumentException("Must provide at least one host for HTTP AAF");
401             }
402
403             this.consumer.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue());
404
405             props = new Properties();
406
407             if (useHttps) {
408                 props.setProperty(PROTOCOL_PROP, "https");
409                 this.consumer.setHost(servers.get(0) + ":3905");
410
411             } else {
412                 props.setProperty(PROTOCOL_PROP, "http");
413                 this.consumer.setHost(servers.get(0) + ":3904");
414             }
415
416             this.consumer.setProps(props);
417             logger.info("{}: CREATION", this);
418         }
419
420         @Override
421         public String toString() {
422             final MRConsumerImpl consumer = this.consumer;
423
424             return "DmaapConsumerWrapper [" + "consumer.getAuthDate()=" + consumer.getAuthDate()
425                     + ", consumer.getAuthKey()=" + consumer.getAuthKey() + ", consumer.getHost()=" + consumer.getHost()
426                     + ", consumer.getProtocolFlag()=" + consumer.getProtocolFlag() + ", consumer.getUsername()="
427                     + consumer.getUsername() + "]";
428         }
429     }
430
431     public static class DmaapDmeConsumerWrapper extends DmaapConsumerWrapper {
432
433         private static Logger logger = LoggerFactory.getLogger(DmaapDmeConsumerWrapper.class);
434
435         private final Properties props;
436
437         public DmaapDmeConsumerWrapper(List<String> servers, String topic, String apiKey, String apiSecret,
438                 String dme2Login, String dme2Password, String consumerGroup, String consumerInstance, int fetchTimeout,
439                 int fetchLimit, String environment, String aftEnvironment, String dme2Partner, String latitude,
440                 String longitude, Map<String, String> additionalProps, boolean useHttps) throws MalformedURLException {
441
442
443
444             super(servers, topic, apiKey, apiSecret, dme2Login, dme2Password, consumerGroup, consumerInstance,
445                     fetchTimeout, fetchLimit);
446
447
448             final String dme2RouteOffer = additionalProps.get(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY);
449
450             if (environment == null || environment.isEmpty()) {
451                 throw parmException(topic, PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX);
452             }
453             if (aftEnvironment == null || aftEnvironment.isEmpty()) {
454                 throw parmException(topic, PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX);
455             }
456             if (latitude == null || latitude.isEmpty()) {
457                 throw parmException(topic, PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX);
458             }
459             if (longitude == null || longitude.isEmpty()) {
460                 throw parmException(topic, PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX);
461             }
462
463             if ((dme2Partner == null || dme2Partner.isEmpty())
464                     && (dme2RouteOffer == null || dme2RouteOffer.isEmpty())) {
465                 throw new IllegalArgumentException(
466                         "Must provide at least " + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
467                                 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX + " or "
468                                 + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
469                                 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX + " for DME2");
470             }
471
472             final String serviceName = servers.get(0);
473
474             this.consumer.setProtocolFlag(ProtocolTypeConstants.DME2.getValue());
475
476             this.consumer.setUsername(dme2Login);
477             this.consumer.setPassword(dme2Password);
478
479             props = new Properties();
480
481             props.setProperty(DmaapTopicSinkFactory.DME2_SERVICE_NAME_PROPERTY, serviceName);
482
483             props.setProperty("username", dme2Login);
484             props.setProperty("password", dme2Password);
485
486             /* These are required, no defaults */
487             props.setProperty("topic", topic);
488
489             props.setProperty("Environment", environment);
490             props.setProperty("AFT_ENVIRONMENT", aftEnvironment);
491
492             if (dme2Partner != null) {
493                 props.setProperty("Partner", dme2Partner);
494             }
495             if (dme2RouteOffer != null) {
496                 props.setProperty(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer);
497             }
498
499             props.setProperty("Latitude", latitude);
500             props.setProperty("Longitude", longitude);
501
502             /* These are optional, will default to these values if not set in additionalProps */
503             props.setProperty("AFT_DME2_EP_READ_TIMEOUT_MS", "50000");
504             props.setProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", "240000");
505             props.setProperty("AFT_DME2_EP_CONN_TIMEOUT", "15000");
506             props.setProperty("Version", "1.0");
507             props.setProperty("SubContextPath", "/");
508             props.setProperty("sessionstickinessrequired", "no");
509
510             /* These should not change */
511             props.setProperty("TransportType", "DME2");
512             props.setProperty("MethodType", "GET");
513
514             if (useHttps) {
515                 props.setProperty(PROTOCOL_PROP, "https");
516
517             } else {
518                 props.setProperty(PROTOCOL_PROP, "http");
519             }
520
521             props.setProperty("contenttype", "application/json");
522
523             if (additionalProps != null) {
524                 for (Map.Entry<String, String> entry : additionalProps.entrySet()) {
525                     props.put(entry.getKey(), entry.getValue());
526                 }
527             }
528
529             MRClientFactory.prop = props;
530             this.consumer.setProps(props);
531
532             logger.info("{}: CREATION", this);
533         }
534
535         private IllegalArgumentException parmException(String topic, String propnm) {
536             return new IllegalArgumentException("Missing " + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
537                     + topic + propnm + " property for DME2 in DMaaP");
538
539         }
540     }
541 }
542
543