8d88b0d9b3ca9edf43ff56c5b676435cdbd57356
[policy/common.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2017-2021 AT&T Intellectual Property. All rights reserved.
6  * Modifications Copyright (C) 2018 Samsung Electronics Co., Ltd.
7  * Modifications Copyright (C) 2020 Bell Canada. All rights reserved.
8  * Copyright (C) 2022 Nordix Foundation.
9  * ================================================================================
10  * Licensed under the Apache License, Version 2.0 (the "License");
11  * you may not use this file except in compliance with the License.
12  * You may obtain a copy of the License at
13  *
14  *      http://www.apache.org/licenses/LICENSE-2.0
15  *
16  * Unless required by applicable law or agreed to in writing, software
17  * distributed under the License is distributed on an "AS IS" BASIS,
18  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19  * See the License for the specific language governing permissions and
20  * limitations under the License.
21  * ============LICENSE_END=========================================================
22  */
23
24 package org.onap.policy.common.endpoints.event.comm.bus.internal;
25
26 import com.att.nsa.cambria.client.CambriaClientBuilders;
27 import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder;
28 import com.att.nsa.cambria.client.CambriaConsumer;
29 import java.io.IOException;
30 import java.net.MalformedURLException;
31 import java.security.GeneralSecurityException;
32 import java.time.Duration;
33 import java.util.ArrayList;
34 import java.util.Arrays;
35 import java.util.Map;
36 import java.util.Properties;
37 import java.util.concurrent.CountDownLatch;
38 import java.util.concurrent.TimeUnit;
39 import lombok.Getter;
40 import org.apache.commons.lang3.StringUtils;
41 import org.apache.kafka.clients.consumer.ConsumerConfig;
42 import org.apache.kafka.clients.consumer.ConsumerRecord;
43 import org.apache.kafka.clients.consumer.ConsumerRecords;
44 import org.apache.kafka.clients.consumer.KafkaConsumer;
45 import org.onap.dmaap.mr.client.MRClientFactory;
46 import org.onap.dmaap.mr.client.impl.MRConsumerImpl;
47 import org.onap.dmaap.mr.client.impl.MRConsumerImpl.MRConsumerImplBuilder;
48 import org.onap.dmaap.mr.client.response.MRConsumerResponse;
49 import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants;
50 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
51 import org.slf4j.Logger;
52 import org.slf4j.LoggerFactory;
53
54 /**
55  * Wrapper around libraries to consume from message bus.
56  */
57 public interface BusConsumer {
58
59     /**
60      * fetch messages.
61      *
62      * @return list of messages
63      * @throws IOException when error encountered by underlying libraries
64      */
65     public Iterable<String> fetch() throws IOException;
66
67     /**
68      * close underlying library consumer.
69      */
70     public void close();
71
72     /**
73      * Consumer that handles fetch() failures by sleeping.
74      */
75     public abstract static class FetchingBusConsumer implements BusConsumer {
76         private static Logger logger = LoggerFactory.getLogger(FetchingBusConsumer.class);
77
78         /**
79          * Fetch timeout.
80          */
81         protected int fetchTimeout;
82
83         /**
84          * Time to sleep on a fetch failure.
85          */
86         @Getter
87         private final int sleepTime;
88
89         /**
90          * Counted down when {@link #close()} is invoked.
91          */
92         private final CountDownLatch closeCondition = new CountDownLatch(1);
93
94
95         /**
96          * Constructs the object.
97          *
98          * @param busTopicParams parameters for the bus topic
99          */
100         protected FetchingBusConsumer(BusTopicParams busTopicParams) {
101             this.fetchTimeout = busTopicParams.getFetchTimeout();
102
103             if (this.fetchTimeout <= 0) {
104                 this.sleepTime = PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH;
105             } else {
106                 // don't sleep too long, even if fetch timeout is large
107                 this.sleepTime = Math.min(this.fetchTimeout, PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH);
108             }
109         }
110
111         /**
112          * Causes the thread to sleep; invoked after fetch() fails.  If the consumer is closed,
113          * or the thread is interrupted, then this will return immediately.
114          */
115         protected void sleepAfterFetchFailure() {
116             try {
117                 logger.info("{}: backoff for {}ms", this, sleepTime);
118                 if (this.closeCondition.await(this.sleepTime, TimeUnit.MILLISECONDS)) {
119                     logger.info("{}: closed while handling fetch error", this);
120                 }
121
122             } catch (InterruptedException e) {
123                 logger.warn("{}: interrupted while handling fetch error", this, e);
124                 Thread.currentThread().interrupt();
125             }
126         }
127
128         @Override
129         public void close() {
130             this.closeCondition.countDown();
131         }
132     }
133
134     /**
135      * Cambria based consumer.
136      */
137     public static class CambriaConsumerWrapper extends FetchingBusConsumer {
138
139         /**
140          * logger.
141          */
142         private static Logger logger = LoggerFactory.getLogger(CambriaConsumerWrapper.class);
143
144         /**
145          * Used to build the consumer.
146          */
147         private final ConsumerBuilder builder;
148
149         /**
150          * Cambria client.
151          */
152         private final CambriaConsumer consumer;
153
154         /**
155          * Cambria Consumer Wrapper.
156          * BusTopicParam object contains the following parameters
157          * servers messaging bus hosts.
158          * topic topic
159          * apiKey API Key
160          * apiSecret API Secret
161          * consumerGroup Consumer Group
162          * consumerInstance Consumer Instance
163          * fetchTimeout Fetch Timeout
164          * fetchLimit Fetch Limit
165          *
166          * @param busTopicParams - The parameters for the bus topic
167          * @throws GeneralSecurityException - Security exception
168          * @throws MalformedURLException - Malformed URL exception
169          */
170         public CambriaConsumerWrapper(BusTopicParams busTopicParams) {
171             super(busTopicParams);
172
173             this.builder = new CambriaClientBuilders.ConsumerBuilder();
174
175             builder.knownAs(busTopicParams.getConsumerGroup(), busTopicParams.getConsumerInstance())
176                     .usingHosts(busTopicParams.getServers()).onTopic(busTopicParams.getTopic())
177                     .waitAtServer(fetchTimeout).receivingAtMost(busTopicParams.getFetchLimit());
178
179             // Set read timeout to fetch timeout + 30 seconds (TBD: this should be configurable)
180             builder.withSocketTimeout(fetchTimeout + 30000);
181
182             if (busTopicParams.isUseHttps()) {
183                 builder.usingHttps();
184
185                 if (busTopicParams.isAllowSelfSignedCerts()) {
186                     builder.allowSelfSignedCertificates();
187                 }
188             }
189
190             if (busTopicParams.isApiKeyValid() && busTopicParams.isApiSecretValid()) {
191                 builder.authenticatedBy(busTopicParams.getApiKey(), busTopicParams.getApiSecret());
192             }
193
194             if (busTopicParams.isUserNameValid() && busTopicParams.isPasswordValid()) {
195                 builder.authenticatedByHttp(busTopicParams.getUserName(), busTopicParams.getPassword());
196             }
197
198             try {
199                 this.consumer = builder.build();
200             } catch (MalformedURLException | GeneralSecurityException e) {
201                 throw new IllegalArgumentException(e);
202             }
203         }
204
205         @Override
206         public Iterable<String> fetch() throws IOException {
207             try {
208                 return this.consumer.fetch();
209             } catch (final IOException e) { //NOSONAR
210                 logger.error("{}: cannot fetch because of {}", this, e.getMessage());
211                 sleepAfterFetchFailure();
212                 throw e;
213             }
214         }
215
216         @Override
217         public void close() {
218             super.close();
219             this.consumer.close();
220         }
221
222         @Override
223         public String toString() {
224             return "CambriaConsumerWrapper [fetchTimeout=" + fetchTimeout + "]";
225         }
226     }
227
228     /**
229      * Kafka based consumer.
230      */
231     public static class KafkaConsumerWrapper extends FetchingBusConsumer {
232
233         /**
234          * logger.
235          */
236         private static Logger logger = LoggerFactory.getLogger(KafkaConsumerWrapper.class);
237
238         /**
239          * Kafka consumer.
240          */
241         private KafkaConsumer<String, String> consumer;
242
243         /**
244          * Kafka Consumer Wrapper.
245          * BusTopicParam object contains the following parameters
246          * servers messaging bus hosts.
247          * topic topic
248          *
249          * @param busTopicParams - The parameters for the bus topic
250          * @throws GeneralSecurityException - Security exception
251          * @throws MalformedURLException - Malformed URL exception
252          */
253         public KafkaConsumerWrapper(BusTopicParams busTopicParams) {
254             super(busTopicParams);
255         }
256
257         @Override
258         public Iterable<String> fetch() throws IOException {
259             // TODO: Not implemented yet
260             return new ArrayList<>();
261         }
262
263         @Override
264         public void close() {
265             super.close();
266             this.consumer.close();
267         }
268
269         @Override
270         public String toString() {
271             return "KafkaConsumerWrapper [fetchTimeout=" + fetchTimeout + "]";
272         }
273     }
274
275     /**
276      * MR based consumer.
277      */
278     public abstract class DmaapConsumerWrapper extends FetchingBusConsumer {
279
280         /**
281          * logger.
282          */
283         private static Logger logger = LoggerFactory.getLogger(DmaapConsumerWrapper.class);
284
285         /**
286          * Name of the "protocol" property.
287          */
288         protected static final String PROTOCOL_PROP = "Protocol";
289
290         /**
291          * MR Consumer.
292          */
293         protected MRConsumerImpl consumer;
294
295         /**
296          * MR Consumer Wrapper.
297          *
298          * <p>servers          messaging bus hosts
299          * topic            topic
300          * apiKey           API Key
301          * apiSecret        API Secret
302          * username         AAF Login
303          * password         AAF Password
304          * consumerGroup    Consumer Group
305          * consumerInstance Consumer Instance
306          * fetchTimeout     Fetch Timeout
307          * fetchLimit       Fetch Limit
308          *
309          * @param busTopicParams contains above listed attributes
310          * @throws MalformedURLException URL should be valid
311          */
312         protected DmaapConsumerWrapper(BusTopicParams busTopicParams) throws MalformedURLException {
313             super(busTopicParams);
314
315             if (busTopicParams.isTopicInvalid()) {
316                 throw new IllegalArgumentException("No topic for DMaaP");
317             }
318
319             this.consumer = new MRConsumerImplBuilder()
320                             .setHostPart(busTopicParams.getServers())
321                             .setTopic(busTopicParams.getTopic())
322                             .setConsumerGroup(busTopicParams.getConsumerGroup())
323                             .setConsumerId(busTopicParams.getConsumerInstance())
324                             .setTimeoutMs(busTopicParams.getFetchTimeout())
325                             .setLimit(busTopicParams.getFetchLimit())
326                             .setApiKey(busTopicParams.getApiKey())
327                             .setApiSecret(busTopicParams.getApiSecret())
328                             .createMRConsumerImpl();
329
330             this.consumer.setUsername(busTopicParams.getUserName());
331             this.consumer.setPassword(busTopicParams.getPassword());
332         }
333
334         @Override
335         public Iterable<String> fetch() throws IOException {
336             final MRConsumerResponse response = this.consumer.fetchWithReturnConsumerResponse();
337             if (response == null) {
338                 logger.warn("{}: DMaaP NULL response received", this);
339
340                 sleepAfterFetchFailure();
341                 return new ArrayList<>();
342             } else {
343                 logger.debug("DMaaP consumer received {} : {}", response.getResponseCode(),
344                         response.getResponseMessage());
345
346                 if (!"200".equals(response.getResponseCode())) {
347
348                     logger.error("DMaaP consumer received: {} : {}", response.getResponseCode(),
349                             response.getResponseMessage());
350
351                     sleepAfterFetchFailure();
352
353                     /* fall through */
354                 }
355             }
356
357             if (response.getActualMessages() == null) {
358                 return new ArrayList<>();
359             } else {
360                 return response.getActualMessages();
361             }
362         }
363
364         @Override
365         public void close() {
366             super.close();
367             this.consumer.close();
368         }
369
370         @Override
371         public String toString() {
372             return "DmaapConsumerWrapper [" + "consumer.getAuthDate()=" + consumer.getAuthDate()
373                     + ", consumer.getAuthKey()=" + consumer.getAuthKey() + ", consumer.getHost()=" + consumer.getHost()
374                     + ", consumer.getProtocolFlag()=" + consumer.getProtocolFlag() + ", consumer.getUsername()="
375                     + consumer.getUsername() + "]";
376         }
377     }
378
379     /**
380      * MR based consumer.
381      */
382     public static class DmaapAafConsumerWrapper extends DmaapConsumerWrapper {
383
384         private static Logger logger = LoggerFactory.getLogger(DmaapAafConsumerWrapper.class);
385
386         private final Properties props;
387
388         /**
389          * BusTopicParams contain the following parameters.
390          * MR Consumer Wrapper.
391          *
392          * <p>servers messaging bus hosts
393          * topic topic
394          * apiKey API Key
395          * apiSecret API Secret
396          * aafLogin AAF Login
397          * aafPassword AAF Password
398          * consumerGroup Consumer Group
399          * consumerInstance Consumer Instance
400          * fetchTimeout Fetch Timeout
401          * fetchLimit Fetch Limit
402          *
403          * @param busTopicParams contains above listed params
404          * @throws MalformedURLException URL should be valid
405          */
406         public DmaapAafConsumerWrapper(BusTopicParams busTopicParams) throws MalformedURLException {
407
408             super(busTopicParams);
409
410             // super constructor sets servers = {""} if empty to avoid errors when using DME2
411             if (busTopicParams.isServersInvalid()) {
412                 throw new IllegalArgumentException("Must provide at least one host for HTTP AAF");
413             }
414
415             this.consumer.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue());
416
417             props = new Properties();
418
419             if (busTopicParams.isUseHttps()) {
420                 props.setProperty(PROTOCOL_PROP, "https");
421                 this.consumer.setHost(busTopicParams.getServers().get(0) + ":3905");
422
423             } else {
424                 props.setProperty(PROTOCOL_PROP, "http");
425                 this.consumer.setHost(busTopicParams.getServers().get(0) + ":3904");
426             }
427
428             this.consumer.setProps(props);
429             logger.info("{}: CREATION", this);
430         }
431
432         @Override
433         public String toString() {
434             final MRConsumerImpl consumer = this.consumer;
435
436             return "DmaapConsumerWrapper [" + "consumer.getAuthDate()=" + consumer.getAuthDate()
437                     + ", consumer.getAuthKey()=" + consumer.getAuthKey() + ", consumer.getHost()=" + consumer.getHost()
438                     + ", consumer.getProtocolFlag()=" + consumer.getProtocolFlag() + ", consumer.getUsername()="
439                     + consumer.getUsername() + "]";
440         }
441     }
442
443     public static class DmaapDmeConsumerWrapper extends DmaapConsumerWrapper {
444
445         private static Logger logger = LoggerFactory.getLogger(DmaapDmeConsumerWrapper.class);
446
447         private final Properties props;
448
449         /**
450          * Constructor.
451          *
452          * @param busTopicParams topic paramters
453          *
454          * @throws MalformedURLException must provide a valid URL
455          */
456         public DmaapDmeConsumerWrapper(BusTopicParams busTopicParams) throws MalformedURLException {
457
458
459             super(busTopicParams);
460
461
462             final String dme2RouteOffer = (busTopicParams.isAdditionalPropsValid()
463                             ? busTopicParams.getAdditionalProps().get(
464                                             PolicyEndPointProperties.DME2_ROUTE_OFFER_PROPERTY)
465                             : null);
466
467             if (busTopicParams.isEnvironmentInvalid()) {
468                 throw parmException(busTopicParams.getTopic(),
469                         PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX);
470             }
471             if (busTopicParams.isAftEnvironmentInvalid()) {
472                 throw parmException(busTopicParams.getTopic(),
473                         PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX);
474             }
475             if (busTopicParams.isLatitudeInvalid()) {
476                 throw parmException(busTopicParams.getTopic(),
477                         PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX);
478             }
479             if (busTopicParams.isLongitudeInvalid()) {
480                 throw parmException(busTopicParams.getTopic(),
481                         PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX);
482             }
483
484             if ((busTopicParams.isPartnerInvalid())
485                     && StringUtils.isBlank(dme2RouteOffer)) {
486                 throw new IllegalArgumentException(
487                         "Must provide at least " + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
488                                 + "." + busTopicParams.getTopic()
489                                 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX + " or "
490                                 + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
491                                 + busTopicParams.getTopic()
492                                 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX + " for DME2");
493             }
494
495             final String serviceName = busTopicParams.getServers().get(0);
496
497             this.consumer.setProtocolFlag(ProtocolTypeConstants.DME2.getValue());
498
499             this.consumer.setUsername(busTopicParams.getUserName());
500             this.consumer.setPassword(busTopicParams.getPassword());
501
502             props = new Properties();
503
504             props.setProperty(PolicyEndPointProperties.DME2_SERVICE_NAME_PROPERTY, serviceName);
505
506             props.setProperty("username", busTopicParams.getUserName());
507             props.setProperty("password", busTopicParams.getPassword());
508
509             /* These are required, no defaults */
510             props.setProperty("topic", busTopicParams.getTopic());
511
512             props.setProperty("Environment", busTopicParams.getEnvironment());
513             props.setProperty("AFT_ENVIRONMENT", busTopicParams.getAftEnvironment());
514
515             if (busTopicParams.getPartner() != null) {
516                 props.setProperty("Partner", busTopicParams.getPartner());
517             }
518             if (dme2RouteOffer != null) {
519                 props.setProperty(PolicyEndPointProperties.DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer);
520             }
521
522             props.setProperty("Latitude", busTopicParams.getLatitude());
523             props.setProperty("Longitude", busTopicParams.getLongitude());
524
525             /* These are optional, will default to these values if not set in additionalProps */
526             props.setProperty("AFT_DME2_EP_READ_TIMEOUT_MS", "50000");
527             props.setProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", "240000");
528             props.setProperty("AFT_DME2_EP_CONN_TIMEOUT", "15000");
529             props.setProperty("Version", "1.0");
530             props.setProperty("SubContextPath", "/");
531             props.setProperty("sessionstickinessrequired", "no");
532
533             /* These should not change */
534             props.setProperty("TransportType", "DME2");
535             props.setProperty("MethodType", "GET");
536
537             if (busTopicParams.isUseHttps()) {
538                 props.setProperty(PROTOCOL_PROP, "https");
539
540             } else {
541                 props.setProperty(PROTOCOL_PROP, "http");
542             }
543
544             props.setProperty("contenttype", "application/json");
545
546             if (busTopicParams.isAdditionalPropsValid()) {
547                 for (Map.Entry<String, String> entry : busTopicParams.getAdditionalProps().entrySet()) {
548                     props.put(entry.getKey(), entry.getValue());
549                 }
550             }
551
552             MRClientFactory.prop = props;
553             this.consumer.setProps(props);
554
555             logger.info("{}: CREATION", this);
556         }
557
558         private IllegalArgumentException parmException(String topic, String propnm) {
559             return new IllegalArgumentException("Missing " + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
560                     + topic + propnm + " property for DME2 in DMaaP");
561
562         }
563     }
564 }
565
566