79e374a2a0c26fc46824b3d72b86118769c611c0
[policy/common.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2017-2021 AT&T Intellectual Property. All rights reserved.
6  * Modifications Copyright (C) 2018 Samsung Electronics Co., Ltd.
7  * Modifications Copyright (C) 2020,2023 Bell Canada. All rights reserved.
8  * Modifications Copyright (C) 2022-2023 Nordix Foundation.
9  * ================================================================================
10  * Licensed under the Apache License, Version 2.0 (the "License");
11  * you may not use this file except in compliance with the License.
12  * You may obtain a copy of the License at
13  *
14  *      http://www.apache.org/licenses/LICENSE-2.0
15  *
16  * Unless required by applicable law or agreed to in writing, software
17  * distributed under the License is distributed on an "AS IS" BASIS,
18  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19  * See the License for the specific language governing permissions and
20  * limitations under the License.
21  * ============LICENSE_END=========================================================
22  */
23
24 package org.onap.policy.common.endpoints.event.comm.bus.internal;
25
26 import com.att.nsa.cambria.client.CambriaClientBuilders;
27 import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder;
28 import com.att.nsa.cambria.client.CambriaConsumer;
29 import java.io.IOException;
30 import java.net.MalformedURLException;
31 import java.security.GeneralSecurityException;
32 import java.time.Duration;
33 import java.util.ArrayList;
34 import java.util.Collections;
35 import java.util.List;
36 import java.util.Map;
37 import java.util.Properties;
38 import java.util.concurrent.CountDownLatch;
39 import java.util.concurrent.TimeUnit;
40 import lombok.Getter;
41 import org.apache.commons.lang3.StringUtils;
42 import org.apache.kafka.clients.consumer.ConsumerConfig;
43 import org.apache.kafka.clients.consumer.ConsumerRecord;
44 import org.apache.kafka.clients.consumer.ConsumerRecords;
45 import org.apache.kafka.clients.consumer.KafkaConsumer;
46 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
47 import org.apache.kafka.common.TopicPartition;
48 import org.jetbrains.annotations.NotNull;
49 import org.onap.dmaap.mr.client.MRClientFactory;
50 import org.onap.dmaap.mr.client.impl.MRConsumerImpl;
51 import org.onap.dmaap.mr.client.impl.MRConsumerImpl.MRConsumerImplBuilder;
52 import org.onap.dmaap.mr.client.response.MRConsumerResponse;
53 import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants;
54 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
55 import org.slf4j.Logger;
56 import org.slf4j.LoggerFactory;
57
58 /**
59  * Wrapper around libraries to consume from message bus.
60  */
61 public interface BusConsumer {
62
63     /**
64      * fetch messages.
65      *
66      * @return list of messages
67      * @throws IOException when error encountered by underlying libraries
68      */
69     public Iterable<String> fetch() throws IOException;
70
71     /**
72      * close underlying library consumer.
73      */
74     public void close();
75
76     /**
77      * Consumer that handles fetch() failures by sleeping.
78      */
79     abstract class FetchingBusConsumer implements BusConsumer {
80         private static final Logger logger = LoggerFactory.getLogger(FetchingBusConsumer.class);
81
82         /**
83          * Fetch timeout.
84          */
85         protected int fetchTimeout;
86
87         /**
88          * Time to sleep on a fetch failure.
89          */
90         @Getter
91         private final int sleepTime;
92
93         /**
94          * Counted down when {@link #close()} is invoked.
95          */
96         private final CountDownLatch closeCondition = new CountDownLatch(1);
97
98
99         /**
100          * Constructs the object.
101          *
102          * @param busTopicParams parameters for the bus topic
103          */
104         protected FetchingBusConsumer(BusTopicParams busTopicParams) {
105             this.fetchTimeout = busTopicParams.getFetchTimeout();
106
107             if (this.fetchTimeout <= 0) {
108                 this.sleepTime = PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH;
109             } else {
110                 // don't sleep too long, even if fetch timeout is large
111                 this.sleepTime = Math.min(this.fetchTimeout, PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH);
112             }
113         }
114
115         /**
116          * Causes the thread to sleep; invoked after fetch() fails.  If the consumer is closed,
117          * or the thread is interrupted, then this will return immediately.
118          */
119         protected void sleepAfterFetchFailure() {
120             try {
121                 logger.info("{}: backoff for {}ms", this, sleepTime);
122                 if (this.closeCondition.await(this.sleepTime, TimeUnit.MILLISECONDS)) {
123                     logger.info("{}: closed while handling fetch error", this);
124                 }
125
126             } catch (InterruptedException e) {
127                 logger.warn("{}: interrupted while handling fetch error", this, e);
128                 Thread.currentThread().interrupt();
129             }
130         }
131
132         @Override
133         public void close() {
134             this.closeCondition.countDown();
135         }
136     }
137
138     /**
139      * Cambria based consumer.
140      */
141     public static class CambriaConsumerWrapper extends FetchingBusConsumer {
142
143         /**
144          * logger.
145          */
146         private static Logger logger = LoggerFactory.getLogger(CambriaConsumerWrapper.class);
147
148         /**
149          * Used to build the consumer.
150          */
151         private final ConsumerBuilder builder;
152
153         /**
154          * Cambria client.
155          */
156         private final CambriaConsumer consumer;
157
158         /**
159          * Cambria Consumer Wrapper.
160          * BusTopicParam object contains the following parameters
161          * servers - messaging bus hosts.
162          * topic - topic for messages
163          * apiKey - API Key
164          * apiSecret - API Secret
165          * consumerGroup - Consumer Group
166          * consumerInstance - Consumer Instance
167          * fetchTimeout - Fetch Timeout
168          * fetchLimit - Fetch Limit
169          *
170          * @param busTopicParams - The parameters for the bus topic
171          */
172         public CambriaConsumerWrapper(BusTopicParams busTopicParams) {
173             super(busTopicParams);
174
175             this.builder = new CambriaClientBuilders.ConsumerBuilder();
176
177             builder.knownAs(busTopicParams.getConsumerGroup(), busTopicParams.getConsumerInstance())
178                 .usingHosts(busTopicParams.getServers()).onTopic(busTopicParams.getTopic())
179                 .waitAtServer(fetchTimeout).receivingAtMost(busTopicParams.getFetchLimit());
180
181             // Set read timeout to fetch timeout + 30 seconds (TBD: this should be configurable)
182             builder.withSocketTimeout(fetchTimeout + 30000);
183
184             if (busTopicParams.isUseHttps()) {
185                 builder.usingHttps();
186
187                 if (busTopicParams.isAllowSelfSignedCerts()) {
188                     builder.allowSelfSignedCertificates();
189                 }
190             }
191
192             if (busTopicParams.isApiKeyValid() && busTopicParams.isApiSecretValid()) {
193                 builder.authenticatedBy(busTopicParams.getApiKey(), busTopicParams.getApiSecret());
194             }
195
196             if (busTopicParams.isUserNameValid() && busTopicParams.isPasswordValid()) {
197                 builder.authenticatedByHttp(busTopicParams.getUserName(), busTopicParams.getPassword());
198             }
199
200             try {
201                 this.consumer = builder.build();
202             } catch (MalformedURLException | GeneralSecurityException e) {
203                 throw new IllegalArgumentException(e);
204             }
205         }
206
207         @Override
208         public Iterable<String> fetch() throws IOException {
209             try {
210                 return this.consumer.fetch();
211             } catch (final IOException e) { //NOSONAR
212                 logger.error("{}: cannot fetch because of {}", this, e.getMessage());
213                 sleepAfterFetchFailure();
214                 throw e;
215             }
216         }
217
218         @Override
219         public void close() {
220             super.close();
221             this.consumer.close();
222         }
223
224         @Override
225         public String toString() {
226             return "CambriaConsumerWrapper [fetchTimeout=" + fetchTimeout + "]";
227         }
228     }
229
230     /**
231      * Kafka based consumer.
232      */
233     class KafkaConsumerWrapper extends FetchingBusConsumer {
234
235         /**
236          * logger.
237          */
238         private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerWrapper.class);
239
240         private static final String KEY_DESERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer";
241
242         /**
243          * Kafka consumer.
244          */
245         protected KafkaConsumer<String, String> consumer;
246         protected Properties kafkaProps;
247
248         /**
249          * Kafka Consumer Wrapper.
250          * BusTopicParam - object contains the following parameters
251          * servers - messaging bus hosts.
252          * topic - topic
253          *
254          * @param busTopicParams - The parameters for the bus topic
255          */
256         public KafkaConsumerWrapper(BusTopicParams busTopicParams) {
257             super(busTopicParams);
258
259             if (busTopicParams.isTopicInvalid()) {
260                 throw new IllegalArgumentException("No topic for Kafka");
261             }
262
263             //Setup Properties for consumer
264             kafkaProps = new Properties();
265             kafkaProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
266                 busTopicParams.getServers().get(0));
267
268             if (busTopicParams.isAdditionalPropsValid()) {
269                 kafkaProps.putAll(busTopicParams.getAdditionalProps());
270             }
271
272             if (kafkaProps.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG) == null) {
273                 kafkaProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KEY_DESERIALIZER);
274             }
275             if (kafkaProps.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG) == null) {
276                 kafkaProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KEY_DESERIALIZER);
277             }
278             if (kafkaProps.get(ConsumerConfig.GROUP_ID_CONFIG) == null) {
279                 kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, busTopicParams.getConsumerGroup());
280             }
281             consumer = new KafkaConsumer<>(kafkaProps);
282             //Subscribe to the topic
283             consumer.subscribe(List.of(busTopicParams.getTopic()));
284         }
285
286         @Override
287         public Iterable<String> fetch() {
288             ConsumerRecords<String, String> records = this.consumer.poll(Duration.ofMillis(fetchTimeout));
289             if (records == null || records.count() <= 0) {
290                 return Collections.emptyList();
291             }
292             List<String> messages = new ArrayList<>(records.count());
293             try {
294                 for (TopicPartition partition : records.partitions()) {
295                     List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
296                     for (ConsumerRecord<String, String> partitionRecord : partitionRecords) {
297                         messages.add(partitionRecord.value());
298                     }
299                     long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
300                     consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
301                 }
302             } catch (Exception e) {
303                 logger.error("{}: cannot fetch, throwing exception after sleep...", this);
304                 sleepAfterFetchFailure();
305                 throw e;
306             }
307             return messages;
308         }
309
310         @Override
311         public void close() {
312             super.close();
313             this.consumer.close();
314             logger.info("Kafka Consumer exited {}", this);
315         }
316
317         @Override
318         public String toString() {
319             return "KafkaConsumerWrapper [fetchTimeout=" + fetchTimeout + "]";
320         }
321     }
322
323     /**
324      * MR based consumer.
325      */
326     public abstract class DmaapConsumerWrapper extends FetchingBusConsumer {
327
328         /**
329          * logger.
330          */
331         private static final Logger logger = LoggerFactory.getLogger(DmaapConsumerWrapper.class);
332
333         /**
334          * Name of the "protocol" property.
335          */
336         protected static final String PROTOCOL_PROP = "Protocol";
337
338         /**
339          * MR Consumer.
340          */
341         protected MRConsumerImpl consumer;
342
343         /**
344          * MR Consumer Wrapper.
345          *
346          * <p>servers - messaging bus hosts
347          * topic - topic
348          * apiKey - API Key
349          * apiSecret - API Secret
350          * username - AAF Login
351          * password - AAF Password
352          * consumerGroup - Consumer Group
353          * consumerInstance - Consumer Instance
354          * fetchTimeout - Fetch Timeout
355          * fetchLimit - Fetch Limit
356          *
357          * @param busTopicParams contains above listed attributes
358          * @throws MalformedURLException URL should be valid
359          */
360         protected DmaapConsumerWrapper(BusTopicParams busTopicParams) throws MalformedURLException {
361             super(busTopicParams);
362
363             if (busTopicParams.isTopicInvalid()) {
364                 throw new IllegalArgumentException("No topic for DMaaP");
365             }
366
367             this.consumer = new MRConsumerImplBuilder()
368                 .setHostPart(busTopicParams.getServers())
369                 .setTopic(busTopicParams.getTopic())
370                 .setConsumerGroup(busTopicParams.getConsumerGroup())
371                 .setConsumerId(busTopicParams.getConsumerInstance())
372                 .setTimeoutMs(busTopicParams.getFetchTimeout())
373                 .setLimit(busTopicParams.getFetchLimit())
374                 .setApiKey(busTopicParams.getApiKey())
375                 .setApiSecret(busTopicParams.getApiSecret())
376                 .createMRConsumerImpl();
377
378             this.consumer.setUsername(busTopicParams.getUserName());
379             this.consumer.setPassword(busTopicParams.getPassword());
380         }
381
382         @Override
383         public Iterable<String> fetch() {
384             final MRConsumerResponse response = this.consumer.fetchWithReturnConsumerResponse();
385             if (response == null) {
386                 logger.warn("{}: DMaaP NULL response received", this);
387
388                 sleepAfterFetchFailure();
389                 return new ArrayList<>();
390             } else {
391                 logger.debug("DMaaP consumer received {} : {}", response.getResponseCode(),
392                     response.getResponseMessage());
393
394                 if (!"200".equals(response.getResponseCode())) {
395
396                     logger.error("DMaaP consumer received: {} : {}", response.getResponseCode(),
397                         response.getResponseMessage());
398
399                     sleepAfterFetchFailure();
400
401                     /* fall through */
402                 }
403             }
404
405             if (response.getActualMessages() == null) {
406                 return new ArrayList<>();
407             } else {
408                 return response.getActualMessages();
409             }
410         }
411
412         @Override
413         public void close() {
414             super.close();
415             this.consumer.close();
416         }
417
418         @Override
419         public String toString() {
420             return "DmaapConsumerWrapper [" + "consumer.getAuthDate()=" + consumer.getAuthDate()
421                 + ", consumer.getAuthKey()=" + consumer.getAuthKey() + ", consumer.getHost()=" + consumer.getHost()
422                 + ", consumer.getProtocolFlag()=" + consumer.getProtocolFlag() + ", consumer.getUsername()="
423                 + consumer.getUsername() + "]";
424         }
425     }
426
427     /**
428      * MR based consumer.
429      */
430     class DmaapAafConsumerWrapper extends DmaapConsumerWrapper {
431
432         private static final Logger logger = LoggerFactory.getLogger(DmaapAafConsumerWrapper.class);
433
434         /**
435          * BusTopicParams contain the following parameters.
436          * MR Consumer Wrapper.
437          *
438          * <p>servers messaging bus hosts
439          * topic - topic
440          * apiKey - API Key
441          * apiSecret - API Secret
442          * aafLogin - AAF Login
443          * aafPassword - AAF Password
444          * consumerGroup - Consumer Group
445          * consumerInstance - Consumer Instance
446          * fetchTimeout - Fetch Timeout
447          * fetchLimit - Fetch Limit
448          *
449          * @param busTopicParams contains above listed params
450          * @throws MalformedURLException URL should be valid
451          */
452         public DmaapAafConsumerWrapper(BusTopicParams busTopicParams) throws MalformedURLException {
453
454             super(busTopicParams);
455
456             // super constructor sets servers = {""} if empty to avoid errors when using DME2
457             if (busTopicParams.isServersInvalid()) {
458                 throw new IllegalArgumentException("Must provide at least one host for HTTP AAF");
459             }
460
461             this.consumer.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue());
462
463             Properties props = new Properties();
464
465             if (busTopicParams.isUseHttps()) {
466                 props.setProperty(PROTOCOL_PROP, "https");
467                 this.consumer.setHost(busTopicParams.getServers().get(0) + ":3905");
468
469             } else {
470                 props.setProperty(PROTOCOL_PROP, "http");
471                 this.consumer.setHost(busTopicParams.getServers().get(0) + ":3904");
472             }
473
474             this.consumer.setProps(props);
475             logger.info("{}: CREATION", this);
476         }
477
478         @Override
479         public String toString() {
480             final MRConsumerImpl consumer = this.consumer;
481
482             return "DmaapConsumerWrapper [" + "consumer.getAuthDate()=" + consumer.getAuthDate()
483                 + ", consumer.getAuthKey()=" + consumer.getAuthKey() + ", consumer.getHost()=" + consumer.getHost()
484                 + ", consumer.getProtocolFlag()=" + consumer.getProtocolFlag() + ", consumer.getUsername()="
485                 + consumer.getUsername() + "]";
486         }
487     }
488
489     class DmaapDmeConsumerWrapper extends DmaapConsumerWrapper {
490
491         private static final Logger logger = LoggerFactory.getLogger(DmaapDmeConsumerWrapper.class);
492
493         /**
494          * Constructor.
495          *
496          * @param busTopicParams topic parameters
497          * @throws MalformedURLException must provide a valid URL
498          */
499         public DmaapDmeConsumerWrapper(BusTopicParams busTopicParams) throws MalformedURLException {
500
501
502             super(busTopicParams);
503
504
505             final String dme2RouteOffer = (busTopicParams.isAdditionalPropsValid()
506                 ? busTopicParams.getAdditionalProps().get(
507                 PolicyEndPointProperties.DME2_ROUTE_OFFER_PROPERTY)
508                 : null);
509
510             BusHelper.validateBusTopicParams(busTopicParams, PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS);
511
512             if ((busTopicParams.isPartnerInvalid())
513                 && StringUtils.isBlank(dme2RouteOffer)) {
514                 throw new IllegalArgumentException(
515                     "Must provide at least " + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
516                         + "." + busTopicParams.getTopic()
517                         + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX + " or "
518                         + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
519                         + busTopicParams.getTopic()
520                         + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX + " for DME2");
521             }
522
523             final String serviceName = busTopicParams.getServers().get(0);
524
525             this.consumer.setProtocolFlag(ProtocolTypeConstants.DME2.getValue());
526
527             this.consumer.setUsername(busTopicParams.getUserName());
528             this.consumer.setPassword(busTopicParams.getPassword());
529
530             Properties props = getProperties(busTopicParams, serviceName, dme2RouteOffer);
531
532             MRClientFactory.prop = props;
533             this.consumer.setProps(props);
534
535             logger.info("{}: CREATION", this);
536         }
537
538         @NotNull
539         private static Properties getProperties(BusTopicParams busTopicParams, String serviceName,
540                                                 String dme2RouteOffer) {
541             Properties props = new Properties();
542
543             props.setProperty(PolicyEndPointProperties.DME2_SERVICE_NAME_PROPERTY, serviceName);
544
545             props.setProperty("username", busTopicParams.getUserName());
546             props.setProperty("password", busTopicParams.getPassword());
547
548             /* These are required, no defaults */
549             props.setProperty("topic", busTopicParams.getTopic());
550
551             BusHelper.setCommonProperties(busTopicParams, dme2RouteOffer, props);
552
553             props.setProperty("MethodType", "GET");
554
555             if (busTopicParams.isUseHttps()) {
556                 props.setProperty(PROTOCOL_PROP, "https");
557
558             } else {
559                 props.setProperty(PROTOCOL_PROP, "http");
560             }
561
562             props.setProperty("contenttype", "application/json");
563
564             if (busTopicParams.isAdditionalPropsValid()) {
565                 props.putAll(busTopicParams.getAdditionalProps());
566             }
567             return props;
568         }
569     }
570 }
571
572