1e2c82b13aaf9ba4c3320239afb060b98096b076
[policy/common.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2017-2021 AT&T Intellectual Property. All rights reserved.
6  * Modifications Copyright (C) 2018 Samsung Electronics Co., Ltd.
7  * Modifications Copyright (C) 2020 Bell Canada. All rights reserved.
8  * ================================================================================
9  * Licensed under the Apache License, Version 2.0 (the "License");
10  * you may not use this file except in compliance with the License.
11  * You may obtain a copy of the License at
12  *
13  *      http://www.apache.org/licenses/LICENSE-2.0
14  *
15  * Unless required by applicable law or agreed to in writing, software
16  * distributed under the License is distributed on an "AS IS" BASIS,
17  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18  * See the License for the specific language governing permissions and
19  * limitations under the License.
20  * ============LICENSE_END=========================================================
21  */
22
23 package org.onap.policy.common.endpoints.event.comm.bus.internal;
24
25 import com.att.nsa.cambria.client.CambriaClientBuilders;
26 import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder;
27 import com.att.nsa.cambria.client.CambriaConsumer;
28 import java.io.IOException;
29 import java.net.MalformedURLException;
30 import java.security.GeneralSecurityException;
31 import java.util.ArrayList;
32 import java.util.Map;
33 import java.util.Properties;
34 import java.util.concurrent.CountDownLatch;
35 import java.util.concurrent.TimeUnit;
36 import org.apache.commons.lang3.StringUtils;
37 import org.onap.dmaap.mr.client.MRClientFactory;
38 import org.onap.dmaap.mr.client.impl.MRConsumerImpl;
39 import org.onap.dmaap.mr.client.impl.MRConsumerImpl.MRConsumerImplBuilder;
40 import org.onap.dmaap.mr.client.response.MRConsumerResponse;
41 import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants;
42 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
43 import org.slf4j.Logger;
44 import org.slf4j.LoggerFactory;
45
46 /**
47  * Wrapper around libraries to consume from message bus.
48  */
49 public interface BusConsumer {
50
51     /**
52      * fetch messages.
53      *
54      * @return list of messages
55      * @throws IOException when error encountered by underlying libraries
56      */
57     public Iterable<String> fetch() throws IOException;
58
59     /**
60      * close underlying library consumer.
61      */
62     public void close();
63
64     /**
65      * Consumer that handles fetch() failures by sleeping.
66      */
67     public abstract static class FetchingBusConsumer implements BusConsumer {
68         private static Logger logger = LoggerFactory.getLogger(FetchingBusConsumer.class);
69
70         /**
71          * Fetch timeout.
72          */
73         protected int fetchTimeout;
74
75         /**
76          * Counted down when {@link #close()} is invoked.
77          */
78         private CountDownLatch closeCondition = new CountDownLatch(1);
79
80
81         /**
82          * Constructs the object.
83          *
84          * @param busTopicParams parameters for the bus topic
85          */
86         protected FetchingBusConsumer(BusTopicParams busTopicParams) {
87             this.fetchTimeout = busTopicParams.getFetchTimeout();
88         }
89
90         /**
91          * Causes the thread to sleep; invoked after fetch() fails.  If the consumer is closed,
92          * or the thread is interrupted, then this will return immediately.
93          */
94         protected void sleepAfterFetchFailure() {
95             try {
96                 if (this.closeCondition.await(this.fetchTimeout, TimeUnit.MILLISECONDS)) {
97                     logger.info("{}: closed while handling fetch error", this);
98                 }
99
100             } catch (InterruptedException e) {
101                 logger.warn("{}: interrupted while handling fetch error", this, e);
102                 Thread.currentThread().interrupt();
103             }
104         }
105
106         @Override
107         public void close() {
108             this.closeCondition.countDown();
109         }
110     }
111
112     /**
113      * Cambria based consumer.
114      */
115     public static class CambriaConsumerWrapper extends FetchingBusConsumer {
116
117         /**
118          * logger.
119          */
120         private static Logger logger = LoggerFactory.getLogger(CambriaConsumerWrapper.class);
121
122         /**
123          * Used to build the consumer.
124          */
125         private final ConsumerBuilder builder;
126
127         /**
128          * Cambria client.
129          */
130         private final CambriaConsumer consumer;
131
132         /**
133          * Cambria Consumer Wrapper.
134          * BusTopicParam object contains the following parameters
135          * servers messaging bus hosts.
136          * topic topic
137          * apiKey API Key
138          * apiSecret API Secret
139          * consumerGroup Consumer Group
140          * consumerInstance Consumer Instance
141          * fetchTimeout Fetch Timeout
142          * fetchLimit Fetch Limit
143          *
144          * @param busTopicParams - The parameters for the bus topic
145          * @throws GeneralSecurityException - Security exception
146          * @throws MalformedURLException - Malformed URL exception
147          */
148         public CambriaConsumerWrapper(BusTopicParams busTopicParams) {
149             super(busTopicParams);
150
151             this.builder = new CambriaClientBuilders.ConsumerBuilder();
152
153             builder.knownAs(busTopicParams.getConsumerGroup(), busTopicParams.getConsumerInstance())
154                     .usingHosts(busTopicParams.getServers()).onTopic(busTopicParams.getTopic())
155                     .waitAtServer(fetchTimeout).receivingAtMost(busTopicParams.getFetchLimit());
156
157             // Set read timeout to fetch timeout + 30 seconds (TBD: this should be configurable)
158             builder.withSocketTimeout(fetchTimeout + 30000);
159
160             if (busTopicParams.isUseHttps()) {
161                 builder.usingHttps();
162
163                 if (busTopicParams.isAllowSelfSignedCerts()) {
164                     builder.allowSelfSignedCertificates();
165                 }
166             }
167
168             if (busTopicParams.isApiKeyValid() && busTopicParams.isApiSecretValid()) {
169                 builder.authenticatedBy(busTopicParams.getApiKey(), busTopicParams.getApiSecret());
170             }
171
172             if (busTopicParams.isUserNameValid() && busTopicParams.isPasswordValid()) {
173                 builder.authenticatedByHttp(busTopicParams.getUserName(), busTopicParams.getPassword());
174             }
175
176             try {
177                 this.consumer = builder.build();
178             } catch (MalformedURLException | GeneralSecurityException e) {
179                 throw new IllegalArgumentException(e);
180             }
181         }
182
183         @Override
184         public Iterable<String> fetch() throws IOException {
185             try {
186                 return this.consumer.fetch();
187             } catch (final IOException e) { //NOSONAR
188                 logger.error("{}: cannot fetch because of {} - backoff for {} ms.", this, e.getMessage(),
189                         this.fetchTimeout);
190                 sleepAfterFetchFailure();
191                 throw e;
192             }
193         }
194
195         @Override
196         public void close() {
197             super.close();
198             this.consumer.close();
199         }
200
201         @Override
202         public String toString() {
203             return "CambriaConsumerWrapper [fetchTimeout=" + fetchTimeout + "]";
204         }
205     }
206
207     /**
208      * MR based consumer.
209      */
210     public abstract class DmaapConsumerWrapper extends FetchingBusConsumer {
211
212         /**
213          * logger.
214          */
215         private static Logger logger = LoggerFactory.getLogger(DmaapConsumerWrapper.class);
216
217         /**
218          * Name of the "protocol" property.
219          */
220         protected static final String PROTOCOL_PROP = "Protocol";
221
222         /**
223          * MR Consumer.
224          */
225         protected MRConsumerImpl consumer;
226
227         /**
228          * MR Consumer Wrapper.
229          *
230          * <p>servers          messaging bus hosts
231          * topic            topic
232          * apiKey           API Key
233          * apiSecret        API Secret
234          * username         AAF Login
235          * password         AAF Password
236          * consumerGroup    Consumer Group
237          * consumerInstance Consumer Instance
238          * fetchTimeout     Fetch Timeout
239          * fetchLimit       Fetch Limit
240          *
241          * @param busTopicParams contains above listed attributes
242          * @throws MalformedURLException URL should be valid
243          */
244         protected DmaapConsumerWrapper(BusTopicParams busTopicParams) throws MalformedURLException {
245             super(busTopicParams);
246
247             if (busTopicParams.isTopicInvalid()) {
248                 throw new IllegalArgumentException("No topic for DMaaP");
249             }
250
251             this.consumer = new MRConsumerImplBuilder()
252                             .setHostPart(busTopicParams.getServers())
253                             .setTopic(busTopicParams.getTopic())
254                             .setConsumerGroup(busTopicParams.getConsumerGroup())
255                             .setConsumerId(busTopicParams.getConsumerInstance())
256                             .setTimeoutMs(busTopicParams.getFetchTimeout())
257                             .setLimit(busTopicParams.getFetchLimit())
258                             .setApiKey(busTopicParams.getApiKey())
259                             .setApiSecret(busTopicParams.getApiSecret())
260                             .createMRConsumerImpl();
261
262             this.consumer.setUsername(busTopicParams.getUserName());
263             this.consumer.setPassword(busTopicParams.getPassword());
264         }
265
266         @Override
267         public Iterable<String> fetch() throws IOException {
268             final MRConsumerResponse response = this.consumer.fetchWithReturnConsumerResponse();
269             if (response == null) {
270                 logger.warn("{}: DMaaP NULL response received", this);
271
272                 sleepAfterFetchFailure();
273                 return new ArrayList<>();
274             } else {
275                 logger.debug("DMaaP consumer received {} : {}", response.getResponseCode(),
276                         response.getResponseMessage());
277
278                 if (!"200".equals(response.getResponseCode())) {
279
280                     logger.error("DMaaP consumer received: {} : {}", response.getResponseCode(),
281                             response.getResponseMessage());
282
283                     sleepAfterFetchFailure();
284
285                     /* fall through */
286                 }
287             }
288
289             if (response.getActualMessages() == null) {
290                 return new ArrayList<>();
291             } else {
292                 return response.getActualMessages();
293             }
294         }
295
296         @Override
297         public void close() {
298             super.close();
299             this.consumer.close();
300         }
301
302         @Override
303         public String toString() {
304             return "DmaapConsumerWrapper [" + "consumer.getAuthDate()=" + consumer.getAuthDate()
305                     + ", consumer.getAuthKey()=" + consumer.getAuthKey() + ", consumer.getHost()=" + consumer.getHost()
306                     + ", consumer.getProtocolFlag()=" + consumer.getProtocolFlag() + ", consumer.getUsername()="
307                     + consumer.getUsername() + "]";
308         }
309     }
310
311     /**
312      * MR based consumer.
313      */
314     public static class DmaapAafConsumerWrapper extends DmaapConsumerWrapper {
315
316         private static Logger logger = LoggerFactory.getLogger(DmaapAafConsumerWrapper.class);
317
318         private final Properties props;
319
320         /**
321          * BusTopicParams contain the following parameters.
322          * MR Consumer Wrapper.
323          *
324          * <p>servers messaging bus hosts
325          * topic topic
326          * apiKey API Key
327          * apiSecret API Secret
328          * aafLogin AAF Login
329          * aafPassword AAF Password
330          * consumerGroup Consumer Group
331          * consumerInstance Consumer Instance
332          * fetchTimeout Fetch Timeout
333          * fetchLimit Fetch Limit
334          *
335          * @param busTopicParams contains above listed params
336          * @throws MalformedURLException URL should be valid
337          */
338         public DmaapAafConsumerWrapper(BusTopicParams busTopicParams) throws MalformedURLException {
339
340             super(busTopicParams);
341
342             // super constructor sets servers = {""} if empty to avoid errors when using DME2
343             if (busTopicParams.isServersInvalid()) {
344                 throw new IllegalArgumentException("Must provide at least one host for HTTP AAF");
345             }
346
347             this.consumer.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue());
348
349             props = new Properties();
350
351             if (busTopicParams.isUseHttps()) {
352                 props.setProperty(PROTOCOL_PROP, "https");
353                 this.consumer.setHost(busTopicParams.getServers().get(0) + ":3905");
354
355             } else {
356                 props.setProperty(PROTOCOL_PROP, "http");
357                 this.consumer.setHost(busTopicParams.getServers().get(0) + ":3904");
358             }
359
360             this.consumer.setProps(props);
361             logger.info("{}: CREATION", this);
362         }
363
364         @Override
365         public String toString() {
366             final MRConsumerImpl consumer = this.consumer;
367
368             return "DmaapConsumerWrapper [" + "consumer.getAuthDate()=" + consumer.getAuthDate()
369                     + ", consumer.getAuthKey()=" + consumer.getAuthKey() + ", consumer.getHost()=" + consumer.getHost()
370                     + ", consumer.getProtocolFlag()=" + consumer.getProtocolFlag() + ", consumer.getUsername()="
371                     + consumer.getUsername() + "]";
372         }
373     }
374
375     public static class DmaapDmeConsumerWrapper extends DmaapConsumerWrapper {
376
377         private static Logger logger = LoggerFactory.getLogger(DmaapDmeConsumerWrapper.class);
378
379         private final Properties props;
380
381         /**
382          * Constructor.
383          *
384          * @param busTopicParams topic paramters
385          *
386          * @throws MalformedURLException must provide a valid URL
387          */
388         public DmaapDmeConsumerWrapper(BusTopicParams busTopicParams) throws MalformedURLException {
389
390
391             super(busTopicParams);
392
393
394             final String dme2RouteOffer = (busTopicParams.isAdditionalPropsValid()
395                             ? busTopicParams.getAdditionalProps().get(
396                                             PolicyEndPointProperties.DME2_ROUTE_OFFER_PROPERTY)
397                             : null);
398
399             if (busTopicParams.isEnvironmentInvalid()) {
400                 throw parmException(busTopicParams.getTopic(),
401                         PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX);
402             }
403             if (busTopicParams.isAftEnvironmentInvalid()) {
404                 throw parmException(busTopicParams.getTopic(),
405                         PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX);
406             }
407             if (busTopicParams.isLatitudeInvalid()) {
408                 throw parmException(busTopicParams.getTopic(),
409                         PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX);
410             }
411             if (busTopicParams.isLongitudeInvalid()) {
412                 throw parmException(busTopicParams.getTopic(),
413                         PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX);
414             }
415
416             if ((busTopicParams.isPartnerInvalid())
417                     && StringUtils.isBlank(dme2RouteOffer)) {
418                 throw new IllegalArgumentException(
419                         "Must provide at least " + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
420                                 + "." + busTopicParams.getTopic()
421                                 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX + " or "
422                                 + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
423                                 + busTopicParams.getTopic()
424                                 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX + " for DME2");
425             }
426
427             final String serviceName = busTopicParams.getServers().get(0);
428
429             this.consumer.setProtocolFlag(ProtocolTypeConstants.DME2.getValue());
430
431             this.consumer.setUsername(busTopicParams.getUserName());
432             this.consumer.setPassword(busTopicParams.getPassword());
433
434             props = new Properties();
435
436             props.setProperty(PolicyEndPointProperties.DME2_SERVICE_NAME_PROPERTY, serviceName);
437
438             props.setProperty("username", busTopicParams.getUserName());
439             props.setProperty("password", busTopicParams.getPassword());
440
441             /* These are required, no defaults */
442             props.setProperty("topic", busTopicParams.getTopic());
443
444             props.setProperty("Environment", busTopicParams.getEnvironment());
445             props.setProperty("AFT_ENVIRONMENT", busTopicParams.getAftEnvironment());
446
447             if (busTopicParams.getPartner() != null) {
448                 props.setProperty("Partner", busTopicParams.getPartner());
449             }
450             if (dme2RouteOffer != null) {
451                 props.setProperty(PolicyEndPointProperties.DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer);
452             }
453
454             props.setProperty("Latitude", busTopicParams.getLatitude());
455             props.setProperty("Longitude", busTopicParams.getLongitude());
456
457             /* These are optional, will default to these values if not set in additionalProps */
458             props.setProperty("AFT_DME2_EP_READ_TIMEOUT_MS", "50000");
459             props.setProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", "240000");
460             props.setProperty("AFT_DME2_EP_CONN_TIMEOUT", "15000");
461             props.setProperty("Version", "1.0");
462             props.setProperty("SubContextPath", "/");
463             props.setProperty("sessionstickinessrequired", "no");
464
465             /* These should not change */
466             props.setProperty("TransportType", "DME2");
467             props.setProperty("MethodType", "GET");
468
469             if (busTopicParams.isUseHttps()) {
470                 props.setProperty(PROTOCOL_PROP, "https");
471
472             } else {
473                 props.setProperty(PROTOCOL_PROP, "http");
474             }
475
476             props.setProperty("contenttype", "application/json");
477
478             if (busTopicParams.isAdditionalPropsValid()) {
479                 for (Map.Entry<String, String> entry : busTopicParams.getAdditionalProps().entrySet()) {
480                     props.put(entry.getKey(), entry.getValue());
481                 }
482             }
483
484             MRClientFactory.prop = props;
485             this.consumer.setProps(props);
486
487             logger.info("{}: CREATION", this);
488         }
489
490         private IllegalArgumentException parmException(String topic, String propnm) {
491             return new IllegalArgumentException("Missing " + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
492                     + topic + propnm + " property for DME2 in DMaaP");
493
494         }
495     }
496 }
497
498