6d34d32b2906f67793aa00dfca0ecc0492b38707
[policy/common.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * Modified Copyright (C) 2018 Samsung Electronics Co., Ltd.
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.policy.common.endpoints.event.comm.bus.internal;
23
24 import com.att.nsa.cambria.client.CambriaClientBuilders;
25 import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder;
26 import com.att.nsa.cambria.client.CambriaConsumer;
27 import com.att.nsa.mr.client.MRClientFactory;
28 import com.att.nsa.mr.client.impl.MRConsumerImpl;
29 import com.att.nsa.mr.client.response.MRConsumerResponse;
30 import com.att.nsa.mr.test.clients.ProtocolTypeConstants;
31
32 import java.io.IOException;
33 import java.net.MalformedURLException;
34 import java.security.GeneralSecurityException;
35 import java.util.ArrayList;
36 import java.util.Map;
37 import java.util.Properties;
38
39 import org.onap.policy.common.endpoints.event.comm.bus.DmaapTopicSinkFactory;
40 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
41 import org.slf4j.Logger;
42 import org.slf4j.LoggerFactory;
43
44 /**
45  * Wrapper around libraries to consume from message bus
46  */
47 public interface BusConsumer {
48
49     /**
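     * Fetches messages from the message bus.
     *
     * @return the messages that were fetched
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws IOException when an I/O error is encountered by the underlying libraries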
54      */
55     public Iterable<String> fetch() throws InterruptedException, IOException;
56
57     /**
58      * close underlying library consumer
59      */
60     public void close();
61
62     /**
63      * BusConsumer that supports server-side filtering.
64      */
65     public interface FilterableBusConsumer extends BusConsumer {
66
67         /**
68          * Sets the server-side filter.
69          *
70          * @param filter new filter value, or {@code null}
71          * @throws IllegalArgumentException if the consumer cannot be built with the new filter
72          */
73         public void setFilter(String filter);
74     }
75
76     /**
77      * Cambria based consumer
78      */
79     public static class CambriaConsumerWrapper implements FilterableBusConsumer {
80
81         /**
82          * logger
83          */
84         private static Logger logger = LoggerFactory.getLogger(CambriaConsumerWrapper.class);
85
86         /**
87          * Used to build the consumer.
88          */
89         private final ConsumerBuilder builder;
90
91         /**
92          * Locked while updating {@link #consumer} and {@link #newConsumer}.
93          */
94         private final Object consLocker = new Object();
95
96         /**
97          * Cambria client
98          */
99         private CambriaConsumer consumer;
100
101         /**
102          * Cambria client to use for next fetch
103          */
104         private CambriaConsumer newConsumer = null;
105
106         /**
107          * fetch timeout
108          */
109         protected int fetchTimeout;
110
111         /**
112          * close condition
113          */
114         protected Object closeCondition = new Object();
115
116         /**
117          * Cambria Consumer Wrapper
118          * BusTopicParam object contains the following parameters
119          * servers messaging bus hosts
120          * topic topic
121          * apiKey API Key
122          * apiSecret API Secret
123          * consumerGroup Consumer Group
124          * consumerInstance Consumer Instance
125          * fetchTimeout Fetch Timeout
126          * fetchLimit Fetch Limit
127          *
128          * @param busTopicParams
129          * @throws GeneralSecurityException
130          * @throws MalformedURLException
131          */
132         public CambriaConsumerWrapper(BusTopicParams busTopicParams) {
133
134             this.fetchTimeout = busTopicParams.getFetchTimeout();
135
136             this.builder = new CambriaClientBuilders.ConsumerBuilder();
137
138             builder.knownAs(busTopicParams.getConsumerGroup(), busTopicParams.getConsumerInstance())
139                     .usingHosts(busTopicParams.getServers()).onTopic(busTopicParams.getTopic())
140                     .waitAtServer(fetchTimeout).receivingAtMost(busTopicParams.getFetchLimit());
141
142             // Set read timeout to fetch timeout + 30 seconds (TBD: this should be configurable)
143             builder.withSocketTimeout(fetchTimeout + 30000);
144
145             if (busTopicParams.isUseHttps()) {
146                 builder.usingHttps();
147
148                 if (busTopicParams.isAllowSelfSignedCerts()) {
149                     builder.allowSelfSignedCertificates();
150                 }
151             }
152
153             if (busTopicParams.isApiKeyValid() && busTopicParams.isApiSecretValid()) {
154                 builder.authenticatedBy(busTopicParams.getApiKey(), busTopicParams.getApiSecret());
155             }
156
157             if (busTopicParams.isUserNameValid() && busTopicParams.isPasswordValid()) {
158                 builder.authenticatedByHttp(busTopicParams.getUserName(), busTopicParams.getPassword());
159             }
160
161             try {
162                 this.consumer = builder.build();
163             } catch (MalformedURLException | GeneralSecurityException e) {
164                 throw new IllegalArgumentException(e);
165             }
166         }
167
168         @Override
169         public Iterable<String> fetch() throws IOException, InterruptedException {
170             try {
171                 return getCurrentConsumer().fetch();
172             } catch (final IOException e) {
173                 logger.error("{}: cannot fetch because of {} - backoff for {} ms.", this, e.getMessage(),
174                         this.fetchTimeout);
175                 synchronized (this.closeCondition) {
176                     this.closeCondition.wait(this.fetchTimeout);
177                 }
178
179                 throw e;
180             }
181         }
182
183         @Override
184         public void close() {
185             synchronized (closeCondition) {
186                 closeCondition.notifyAll();
187             }
188
189             getCurrentConsumer().close();
190         }
191
192         private CambriaConsumer getCurrentConsumer() {
193             CambriaConsumer old = null;
194             CambriaConsumer ret;
195
196             synchronized (consLocker) {
197                 if (this.newConsumer != null) {
198                     // replace old consumer with new consumer
199                     old = this.consumer;
200                     this.consumer = this.newConsumer;
201                     this.newConsumer = null;
202                 }
203
204                 ret = this.consumer;
205             }
206
207             if (old != null) {
208                 old.close();
209             }
210
211             return ret;
212         }
213
214         @Override
215         public void setFilter(String filter) {
216             logger.info("{}: setting DMAAP server-side filter: {}", this, filter);
217             builder.withServerSideFilter(filter);
218
219             try {
220                 CambriaConsumer previous;
221                 synchronized (consLocker) {
222                     previous = this.newConsumer;
223                     this.newConsumer = builder.build();
224                 }
225
226                 if (previous != null) {
227                     // there was already a new consumer - close it
228                     previous.close();
229                 }
230
231             } catch (MalformedURLException | GeneralSecurityException e) {
232                 /*
233                  * Since an exception occurred, "consumer" still has its old value, thus it should
234                  * not be closed at this point.
235                  */
236                 throw new IllegalArgumentException(e);
237             }
238         }
239
240         @Override
241         public String toString() {
242             return "CambriaConsumerWrapper [fetchTimeout=" + fetchTimeout + "]";
243         }
244     }
245
246     /**
247      * MR based consumer
248      */
249     public abstract class DmaapConsumerWrapper implements BusConsumer {
250
251         /**
252          * logger
253          */
254         private static Logger logger = LoggerFactory.getLogger(DmaapConsumerWrapper.class);
255
256         /**
257          * Name of the "protocol" property.
258          */
259         protected static final String PROTOCOL_PROP = "Protocol";
260
261         /**
262          * fetch timeout
263          */
264         protected int fetchTimeout;
265
266         /**
267          * close condition
268          */
269         protected Object closeCondition = new Object();
270
271         /**
272          * MR Consumer
273          */
274         protected MRConsumerImpl consumer;
275
276         /**
277          * MR Consumer Wrapper
278          * <p>
279          * servers          messaging bus hosts
280          * topic            topic
281          * apiKey           API Key
282          * apiSecret        API Secret
283          * username         AAF Login
284          * password         AAF Password
285          * consumerGroup    Consumer Group
286          * consumerInstance Consumer Instance
287          * fetchTimeout     Fetch Timeout
288          * fetchLimit       Fetch Limit
289          *
290          * @param busTopicParams contains above listed attributes
291          * @throws MalformedURLException
292          */
293         public DmaapConsumerWrapper(BusTopicParams busTopicParams) throws MalformedURLException {
294
295             this.fetchTimeout = busTopicParams.getFetchTimeout();
296
297             if (busTopicParams.isTopicNullOrEmpty()) {
298                 throw new IllegalArgumentException("No topic for DMaaP");
299             }
300
301             this.consumer = new MRConsumerImpl(busTopicParams.getServers(), busTopicParams.getTopic(),
302                     busTopicParams.getConsumerGroup(), busTopicParams.getConsumerInstance(),
303                     busTopicParams.getFetchTimeout(), busTopicParams.getFetchLimit(), null,
304                     busTopicParams.getApiKey(), busTopicParams.getApiSecret());
305
306             this.consumer.setUsername(busTopicParams.getUserName());
307             this.consumer.setPassword(busTopicParams.getPassword());
308         }
309
310         @Override
311         public Iterable<String> fetch() throws InterruptedException, IOException {
312             final MRConsumerResponse response = this.consumer.fetchWithReturnConsumerResponse();
313             if (response == null) {
314                 logger.warn("{}: DMaaP NULL response received", this);
315
316                 synchronized (closeCondition) {
317                     closeCondition.wait(fetchTimeout);
318                 }
319                 return new ArrayList<>();
320             } else {
321                 logger.debug("DMaaP consumer received {} : {}" + response.getResponseCode(),
322                         response.getResponseMessage());
323
324                 if (response.getResponseCode() == null || !"200".equals(response.getResponseCode())) {
325
326                     logger.error("DMaaP consumer received: {} : {}", response.getResponseCode(),
327                             response.getResponseMessage());
328
329                     synchronized (closeCondition) {
330                         closeCondition.wait(fetchTimeout);
331                     }
332
333                     /* fall through */
334                 }
335             }
336
337             if (response.getActualMessages() == null) {
338                 return new ArrayList<>();
339             } else {
340                 return response.getActualMessages();
341             }
342         }
343
344         @Override
345         public void close() {
346             synchronized (closeCondition) {
347                 closeCondition.notifyAll();
348             }
349
350             this.consumer.close();
351         }
352
353         @Override
354         public String toString() {
355             return "DmaapConsumerWrapper [" + "consumer.getAuthDate()=" + consumer.getAuthDate()
356                     + ", consumer.getAuthKey()=" + consumer.getAuthKey() + ", consumer.getHost()=" + consumer.getHost()
357                     + ", consumer.getProtocolFlag()=" + consumer.getProtocolFlag() + ", consumer.getUsername()="
358                     + consumer.getUsername() + "]";
359         }
360     }
361
362     /**
363      * MR based consumer
364      */
365     public static class DmaapAafConsumerWrapper extends DmaapConsumerWrapper {
366
367         private static Logger logger = LoggerFactory.getLogger(DmaapAafConsumerWrapper.class);
368
369         private final Properties props;
370
371         /**
372          * BusTopicParams contain the following parameters
373          * MR Consumer Wrapper
374          * <p>
375          * servers messaging bus hosts
376          * topic topic
377          * apiKey API Key
378          * apiSecret API Secret
379          * aafLogin AAF Login
380          * aafPassword AAF Password
381          * consumerGroup Consumer Group
382          * consumerInstance Consumer Instance
383          * fetchTimeout Fetch Timeout
384          * fetchLimit Fetch Limit
385          *
386          * @param busTopicParams contains above listed params
387          * @throws MalformedURLException
388          */
389         public DmaapAafConsumerWrapper(BusTopicParams busTopicParams) throws MalformedURLException {
390
391             super(busTopicParams);
392
393             // super constructor sets servers = {""} if empty to avoid errors when using DME2
394             if (busTopicParams.isServersNullOrEmpty()) {
395                 throw new IllegalArgumentException("Must provide at least one host for HTTP AAF");
396             }
397
398             this.consumer.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue());
399
400             props = new Properties();
401
402             if (busTopicParams.isUseHttps()) {
403                 props.setProperty(PROTOCOL_PROP, "https");
404                 this.consumer.setHost(busTopicParams.getServers().get(0) + ":3905");
405
406             } else {
407                 props.setProperty(PROTOCOL_PROP, "http");
408                 this.consumer.setHost(busTopicParams.getServers().get(0) + ":3904");
409             }
410
411             this.consumer.setProps(props);
412             logger.info("{}: CREATION", this);
413         }
414
415         @Override
416         public String toString() {
417             final MRConsumerImpl consumer = this.consumer;
418
419             return "DmaapConsumerWrapper [" + "consumer.getAuthDate()=" + consumer.getAuthDate()
420                     + ", consumer.getAuthKey()=" + consumer.getAuthKey() + ", consumer.getHost()=" + consumer.getHost()
421                     + ", consumer.getProtocolFlag()=" + consumer.getProtocolFlag() + ", consumer.getUsername()="
422                     + consumer.getUsername() + "]";
423         }
424     }
425
426     public static class DmaapDmeConsumerWrapper extends DmaapConsumerWrapper {
427
428         private static Logger logger = LoggerFactory.getLogger(DmaapDmeConsumerWrapper.class);
429
430         private final Properties props;
431
432         public DmaapDmeConsumerWrapper(BusTopicParams busTopicParams) throws MalformedURLException {
433
434
435             super(busTopicParams);
436
437
438             final String dme2RouteOffer = busTopicParams.getAdditionalProps()
439                     .get(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY);
440
441             if (busTopicParams.isEnvironmentNullOrEmpty()) {
442                 throw parmException(busTopicParams.getTopic(),
443                         PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX);
444             }
445             if (busTopicParams.isAftEnvironmentNullOrEmpty()) {
446                 throw parmException(busTopicParams.getTopic(),
447                         PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX);
448             }
449             if (busTopicParams.isLatitudeNullOrEmpty()) {
450                 throw parmException(busTopicParams.getTopic(),
451                         PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX);
452             }
453             if (busTopicParams.isLongitudeNullOrEmpty()) {
454                 throw parmException(busTopicParams.getTopic(),
455                         PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX);
456             }
457
458             if ((busTopicParams.isPartnerNullOrEmpty())
459                     && (dme2RouteOffer == null || dme2RouteOffer.isEmpty())) {
460                 throw new IllegalArgumentException(
461                         "Must provide at least " + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
462                                 + "." + busTopicParams.getTopic()
463                                 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX + " or "
464                                 + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
465                                 + busTopicParams.getTopic()
466                                 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX + " for DME2");
467             }
468
469             final String serviceName = busTopicParams.getServers().get(0);
470
471             this.consumer.setProtocolFlag(ProtocolTypeConstants.DME2.getValue());
472
473             this.consumer.setUsername(busTopicParams.getUserName());
474             this.consumer.setPassword(busTopicParams.getPassword());
475
476             props = new Properties();
477
478             props.setProperty(DmaapTopicSinkFactory.DME2_SERVICE_NAME_PROPERTY, serviceName);
479
480             props.setProperty("username", busTopicParams.getUserName());
481             props.setProperty("password", busTopicParams.getPassword());
482
483             /* These are required, no defaults */
484             props.setProperty("topic", busTopicParams.getTopic());
485
486             props.setProperty("Environment", busTopicParams.getEnvironment());
487             props.setProperty("AFT_ENVIRONMENT", busTopicParams.getAftEnvironment());
488
489             if (busTopicParams.getPartner() != null) {
490                 props.setProperty("Partner", busTopicParams.getPartner());
491             }
492             if (dme2RouteOffer != null) {
493                 props.setProperty(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer);
494             }
495
496             props.setProperty("Latitude", busTopicParams.getLatitude());
497             props.setProperty("Longitude", busTopicParams.getLongitude());
498
499             /* These are optional, will default to these values if not set in additionalProps */
500             props.setProperty("AFT_DME2_EP_READ_TIMEOUT_MS", "50000");
501             props.setProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", "240000");
502             props.setProperty("AFT_DME2_EP_CONN_TIMEOUT", "15000");
503             props.setProperty("Version", "1.0");
504             props.setProperty("SubContextPath", "/");
505             props.setProperty("sessionstickinessrequired", "no");
506
507             /* These should not change */
508             props.setProperty("TransportType", "DME2");
509             props.setProperty("MethodType", "GET");
510
511             if (busTopicParams.isUseHttps()) {
512                 props.setProperty(PROTOCOL_PROP, "https");
513
514             } else {
515                 props.setProperty(PROTOCOL_PROP, "http");
516             }
517
518             props.setProperty("contenttype", "application/json");
519
520             if (busTopicParams.isAdditionalPropsValid()) {
521                 for (Map.Entry<String, String> entry : busTopicParams.getAdditionalProps().entrySet()) {
522                     props.put(entry.getKey(), entry.getValue());
523                 }
524             }
525
526             MRClientFactory.prop = props;
527             this.consumer.setProps(props);
528
529             logger.info("{}: CREATION", this);
530         }
531
532         private IllegalArgumentException parmException(String topic, String propnm) {
533             return new IllegalArgumentException("Missing " + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
534                     + topic + propnm + " property for DME2 in DMaaP");
535
536         }
537     }
538 }
539
540