d6fa06457565fc938e966887e49facba6cb5e105
[policy/common.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2017-2021 AT&T Intellectual Property. All rights reserved.
6  * Modifications Copyright (C) 2018 Samsung Electronics Co., Ltd.
7  * Modifications Copyright (C) 2020,2023 Bell Canada. All rights reserved.
8  * Modifications Copyright (C) 2022-2024 Nordix Foundation.
9  * ================================================================================
10  * Licensed under the Apache License, Version 2.0 (the "License");
11  * you may not use this file except in compliance with the License.
12  * You may obtain a copy of the License at
13  *
14  *      http://www.apache.org/licenses/LICENSE-2.0
15  *
16  * Unless required by applicable law or agreed to in writing, software
17  * distributed under the License is distributed on an "AS IS" BASIS,
18  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19  * See the License for the specific language governing permissions and
20  * limitations under the License.
21  * ============LICENSE_END=========================================================
22  */
23
24 package org.onap.policy.common.endpoints.event.comm.bus.internal;
25
26 import com.att.nsa.cambria.client.CambriaClientBuilders;
27 import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder;
28 import com.att.nsa.cambria.client.CambriaConsumer;
29 import io.opentelemetry.api.trace.Span;
30 import io.opentelemetry.api.trace.SpanContext;
31 import io.opentelemetry.api.trace.TraceFlags;
32 import io.opentelemetry.api.trace.TraceState;
33 import io.opentelemetry.context.Context;
34 import io.opentelemetry.instrumentation.kafkaclients.v2_6.TracingConsumerInterceptor;
35 import java.io.IOException;
36 import java.net.MalformedURLException;
37 import java.nio.charset.StandardCharsets;
38 import java.security.GeneralSecurityException;
39 import java.time.Duration;
40 import java.util.ArrayList;
41 import java.util.Collections;
42 import java.util.List;
43 import java.util.Properties;
44 import java.util.concurrent.CountDownLatch;
45 import java.util.concurrent.TimeUnit;
46 import lombok.Data;
47 import lombok.Getter;
48 import lombok.NoArgsConstructor;
49 import org.apache.commons.lang3.StringUtils;
50 import org.apache.kafka.clients.consumer.ConsumerConfig;
51 import org.apache.kafka.clients.consumer.ConsumerRecord;
52 import org.apache.kafka.clients.consumer.ConsumerRecords;
53 import org.apache.kafka.clients.consumer.KafkaConsumer;
54 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
55 import org.apache.kafka.common.TopicPartition;
56 import org.apache.kafka.common.header.Headers;
57 import org.jetbrains.annotations.NotNull;
58 import org.onap.dmaap.mr.client.MRClientFactory;
59 import org.onap.dmaap.mr.client.impl.MRConsumerImpl;
60 import org.onap.dmaap.mr.client.impl.MRConsumerImpl.MRConsumerImplBuilder;
61 import org.onap.dmaap.mr.client.response.MRConsumerResponse;
62 import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants;
63 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
64 import org.slf4j.Logger;
65 import org.slf4j.LoggerFactory;
66
67 /**
68  * Wrapper around libraries to consume from message bus.
69  */
70 public interface BusConsumer {
71
72     /**
73      * fetch messages.
74      *
75      * @return list of messages
76      * @throws IOException when error encountered by underlying libraries
77      */
78     public Iterable<String> fetch() throws IOException;
79
80     /**
81      * close underlying library consumer.
82      */
83     public void close();
84
85     /**
86      * Consumer that handles fetch() failures by sleeping.
87      */
88     abstract class FetchingBusConsumer implements BusConsumer {
89         private static final Logger logger = LoggerFactory.getLogger(FetchingBusConsumer.class);
90
91         /**
92          * Fetch timeout.
93          */
94         protected int fetchTimeout;
95
96         /**
97          * Time to sleep on a fetch failure.
98          */
99         @Getter
100         private final int sleepTime;
101
102         /**
103          * Counted down when {@link #close()} is invoked.
104          */
105         private final CountDownLatch closeCondition = new CountDownLatch(1);
106
107
108         /**
109          * Constructs the object.
110          *
111          * @param busTopicParams parameters for the bus topic
112          */
113         protected FetchingBusConsumer(BusTopicParams busTopicParams) {
114             this.fetchTimeout = busTopicParams.getFetchTimeout();
115
116             if (this.fetchTimeout <= 0) {
117                 this.sleepTime = PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH;
118             } else {
119                 // don't sleep too long, even if fetch timeout is large
120                 this.sleepTime = Math.min(this.fetchTimeout, PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH);
121             }
122         }
123
124         /**
125          * Causes the thread to sleep; invoked after fetch() fails.  If the consumer is closed,
126          * or the thread is interrupted, then this will return immediately.
127          */
128         protected void sleepAfterFetchFailure() {
129             try {
130                 logger.info("{}: backoff for {}ms", this, sleepTime);
131                 if (this.closeCondition.await(this.sleepTime, TimeUnit.MILLISECONDS)) {
132                     logger.info("{}: closed while handling fetch error", this);
133                 }
134
135             } catch (InterruptedException e) {
136                 logger.warn("{}: interrupted while handling fetch error", this, e);
137                 Thread.currentThread().interrupt();
138             }
139         }
140
141         @Override
142         public void close() {
143             this.closeCondition.countDown();
144         }
145     }
146
147     /**
148      * Cambria based consumer.
149      */
150     public static class CambriaConsumerWrapper extends FetchingBusConsumer {
151
152         /**
153          * logger.
154          */
155         private static Logger logger = LoggerFactory.getLogger(CambriaConsumerWrapper.class);
156
157         /**
158          * Used to build the consumer.
159          */
160         private final ConsumerBuilder builder;
161
162         /**
163          * Cambria client.
164          */
165         private final CambriaConsumer consumer;
166
167         /**
168          * Cambria Consumer Wrapper.
169          * BusTopicParam object contains the following parameters
170          * servers - messaging bus hosts.
171          * topic - topic for messages
172          * apiKey - API Key
173          * apiSecret - API Secret
174          * consumerGroup - Consumer Group
175          * consumerInstance - Consumer Instance
176          * fetchTimeout - Fetch Timeout
177          * fetchLimit - Fetch Limit
178          *
179          * @param busTopicParams - The parameters for the bus topic
180          */
181         public CambriaConsumerWrapper(BusTopicParams busTopicParams) {
182             super(busTopicParams);
183
184             this.builder = new CambriaClientBuilders.ConsumerBuilder();
185
186             builder.knownAs(busTopicParams.getConsumerGroup(), busTopicParams.getConsumerInstance())
187                 .usingHosts(busTopicParams.getServers()).onTopic(busTopicParams.getTopic())
188                 .waitAtServer(fetchTimeout).receivingAtMost(busTopicParams.getFetchLimit());
189
190             // Set read timeout to fetch timeout + 30 seconds (TBD: this should be configurable)
191             builder.withSocketTimeout(fetchTimeout + 30000);
192
193             if (busTopicParams.isUseHttps()) {
194                 builder.usingHttps();
195
196                 if (busTopicParams.isAllowSelfSignedCerts()) {
197                     builder.allowSelfSignedCertificates();
198                 }
199             }
200
201             if (busTopicParams.isApiKeyValid() && busTopicParams.isApiSecretValid()) {
202                 builder.authenticatedBy(busTopicParams.getApiKey(), busTopicParams.getApiSecret());
203             }
204
205             if (busTopicParams.isUserNameValid() && busTopicParams.isPasswordValid()) {
206                 builder.authenticatedByHttp(busTopicParams.getUserName(), busTopicParams.getPassword());
207             }
208
209             try {
210                 this.consumer = builder.build();
211             } catch (MalformedURLException | GeneralSecurityException e) {
212                 throw new IllegalArgumentException(e);
213             }
214         }
215
216         @Override
217         public Iterable<String> fetch() throws IOException {
218             try {
219                 return this.consumer.fetch();
220             } catch (final IOException e) { //NOSONAR
221                 logger.error("{}: cannot fetch because of {}", this, e.getMessage());
222                 sleepAfterFetchFailure();
223                 throw e;
224             }
225         }
226
227         @Override
228         public void close() {
229             super.close();
230             this.consumer.close();
231         }
232
233         @Override
234         public String toString() {
235             return "CambriaConsumerWrapper [fetchTimeout=" + fetchTimeout + "]";
236         }
237     }
238
239     /**
240      * Kafka based consumer.
241      */
242     class KafkaConsumerWrapper extends FetchingBusConsumer {
243
244         /**
245          * logger.
246          */
247         private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerWrapper.class);
248
249         private static final String KEY_DESERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer";
250
251         /**
252          * Kafka consumer.
253          */
254         protected KafkaConsumer<String, String> consumer;
255         protected Properties kafkaProps;
256
257         protected boolean allowTracing;
258
259         /**
260          * Kafka Consumer Wrapper.
261          * BusTopicParam - object contains the following parameters
262          * servers - messaging bus hosts.
263          * topic - topic
264          *
265          * @param busTopicParams - The parameters for the bus topic
266          */
267         public KafkaConsumerWrapper(BusTopicParams busTopicParams) {
268             super(busTopicParams);
269
270             if (busTopicParams.isTopicInvalid()) {
271                 throw new IllegalArgumentException("No topic for Kafka");
272             }
273
274             //Setup Properties for consumer
275             kafkaProps = new Properties();
276             kafkaProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
277                 busTopicParams.getServers().get(0));
278
279             if (busTopicParams.isAdditionalPropsValid()) {
280                 kafkaProps.putAll(busTopicParams.getAdditionalProps());
281             }
282
283             if (kafkaProps.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG) == null) {
284                 kafkaProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KEY_DESERIALIZER);
285             }
286             if (kafkaProps.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG) == null) {
287                 kafkaProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KEY_DESERIALIZER);
288             }
289             if (kafkaProps.get(ConsumerConfig.GROUP_ID_CONFIG) == null) {
290                 kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, busTopicParams.getConsumerGroup());
291             }
292             if (busTopicParams.isAllowTracing()) {
293                 this.allowTracing = true;
294                 kafkaProps.setProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
295                         TracingConsumerInterceptor.class.getName());
296             }
297
298             consumer = new KafkaConsumer<>(kafkaProps);
299             //Subscribe to the topic
300             consumer.subscribe(List.of(busTopicParams.getTopic()));
301         }
302
303         @Override
304         public Iterable<String> fetch() {
305             ConsumerRecords<String, String> records = this.consumer.poll(Duration.ofMillis(fetchTimeout));
306             if (records == null || records.count() <= 0) {
307                 return Collections.emptyList();
308             }
309             List<String> messages = new ArrayList<>(records.count());
310             try {
311                 if (allowTracing) {
312                     createParentTraceContext(records);
313                 }
314
315                 for (TopicPartition partition : records.partitions()) {
316                     List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
317                     for (ConsumerRecord<String, String> partitionRecord : partitionRecords) {
318                         messages.add(partitionRecord.value());
319                     }
320                     long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
321                     consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
322                 }
323             } catch (Exception e) {
324                 logger.error("{}: cannot fetch, throwing exception after sleep...", this);
325                 sleepAfterFetchFailure();
326                 throw e;
327             }
328             return messages;
329         }
330
331         private void createParentTraceContext(ConsumerRecords<String, String> records) {
332             TraceParentInfo traceParentInfo = new TraceParentInfo();
333             for (ConsumerRecord<String, String> consumerRecord : records) {
334
335                 Headers consumerRecordHeaders = consumerRecord.headers();
336                 traceParentInfo = processTraceParentHeader(consumerRecordHeaders);
337             }
338
339             SpanContext spanContext = SpanContext.createFromRemoteParent(
340                     traceParentInfo.getTraceId(), traceParentInfo.getSpanId(),
341                     TraceFlags.getSampled(), TraceState.builder().build());
342
343             Context.current().with(Span.wrap(spanContext)).makeCurrent();
344         }
345
346         private TraceParentInfo processTraceParentHeader(Headers headers) {
347             TraceParentInfo traceParentInfo = new TraceParentInfo();
348             if (headers.lastHeader("traceparent") != null) {
349                 traceParentInfo.setParentTraceId(new String(headers.lastHeader(
350                         "traceparent").value(), StandardCharsets.UTF_8));
351
352                 String[] parts = traceParentInfo.getParentTraceId().split("-");
353                 traceParentInfo.setTraceId(parts[1]);
354                 traceParentInfo.setSpanId(parts[2]);
355             }
356
357             return traceParentInfo;
358         }
359
360         @Data
361         @NoArgsConstructor
362         private static class TraceParentInfo {
363             private String parentTraceId;
364             private String traceId;
365             private String spanId;
366         }
367
368         @Override
369         public void close() {
370             super.close();
371             this.consumer.close();
372             logger.info("Kafka Consumer exited {}", this);
373         }
374
375         @Override
376         public String toString() {
377             return "KafkaConsumerWrapper [fetchTimeout=" + fetchTimeout + "]";
378         }
379     }
380
381     /**
382      * MR based consumer.
383      */
384     public abstract class DmaapConsumerWrapper extends FetchingBusConsumer {
385
386         /**
387          * logger.
388          */
389         private static final Logger logger = LoggerFactory.getLogger(DmaapConsumerWrapper.class);
390
391         /**
392          * Name of the "protocol" property.
393          */
394         protected static final String PROTOCOL_PROP = "Protocol";
395
396         /**
397          * MR Consumer.
398          */
399         protected MRConsumerImpl consumer;
400
401         /**
402          * MR Consumer Wrapper.
403          *
404          * <p>servers - messaging bus hosts
405          * topic - topic
406          * apiKey - API Key
407          * apiSecret - API Secret
408          * username - AAF Login
409          * password - AAF Password
410          * consumerGroup - Consumer Group
411          * consumerInstance - Consumer Instance
412          * fetchTimeout - Fetch Timeout
413          * fetchLimit - Fetch Limit
414          *
415          * @param busTopicParams contains above listed attributes
416          * @throws MalformedURLException URL should be valid
417          */
418         protected DmaapConsumerWrapper(BusTopicParams busTopicParams) throws MalformedURLException {
419             super(busTopicParams);
420
421             if (busTopicParams.isTopicInvalid()) {
422                 throw new IllegalArgumentException("No topic for DMaaP");
423             }
424
425             this.consumer = new MRConsumerImplBuilder()
426                 .setHostPart(busTopicParams.getServers())
427                 .setTopic(busTopicParams.getTopic())
428                 .setConsumerGroup(busTopicParams.getConsumerGroup())
429                 .setConsumerId(busTopicParams.getConsumerInstance())
430                 .setTimeoutMs(busTopicParams.getFetchTimeout())
431                 .setLimit(busTopicParams.getFetchLimit())
432                 .setApiKey(busTopicParams.getApiKey())
433                 .setApiSecret(busTopicParams.getApiSecret())
434                 .createMRConsumerImpl();
435
436             this.consumer.setUsername(busTopicParams.getUserName());
437             this.consumer.setPassword(busTopicParams.getPassword());
438         }
439
440         @Override
441         public Iterable<String> fetch() {
442             final MRConsumerResponse response = this.consumer.fetchWithReturnConsumerResponse();
443             if (response == null) {
444                 logger.warn("{}: DMaaP NULL response received", this);
445
446                 sleepAfterFetchFailure();
447                 return new ArrayList<>();
448             } else {
449                 logger.debug("DMaaP consumer received {} : {}", response.getResponseCode(),
450                     response.getResponseMessage());
451
452                 if (!"200".equals(response.getResponseCode())) {
453
454                     logger.error("DMaaP consumer received: {} : {}", response.getResponseCode(),
455                         response.getResponseMessage());
456
457                     sleepAfterFetchFailure();
458
459                     /* fall through */
460                 }
461             }
462
463             if (response.getActualMessages() == null) {
464                 return new ArrayList<>();
465             } else {
466                 return response.getActualMessages();
467             }
468         }
469
470         @Override
471         public void close() {
472             super.close();
473             this.consumer.close();
474         }
475
476         @Override
477         public String toString() {
478             return "DmaapConsumerWrapper [" + "consumer.getAuthDate()=" + consumer.getAuthDate()
479                 + ", consumer.getAuthKey()=" + consumer.getAuthKey() + ", consumer.getHost()=" + consumer.getHost()
480                 + ", consumer.getProtocolFlag()=" + consumer.getProtocolFlag() + ", consumer.getUsername()="
481                 + consumer.getUsername() + "]";
482         }
483     }
484
485     /**
486      * MR based consumer.
487      */
488     class DmaapAafConsumerWrapper extends DmaapConsumerWrapper {
489
490         private static final Logger logger = LoggerFactory.getLogger(DmaapAafConsumerWrapper.class);
491
492         /**
493          * BusTopicParams contain the following parameters.
494          * MR Consumer Wrapper.
495          *
496          * <p>servers messaging bus hosts
497          * topic - topic
498          * apiKey - API Key
499          * apiSecret - API Secret
500          * aafLogin - AAF Login
501          * aafPassword - AAF Password
502          * consumerGroup - Consumer Group
503          * consumerInstance - Consumer Instance
504          * fetchTimeout - Fetch Timeout
505          * fetchLimit - Fetch Limit
506          *
507          * @param busTopicParams contains above listed params
508          * @throws MalformedURLException URL should be valid
509          */
510         public DmaapAafConsumerWrapper(BusTopicParams busTopicParams) throws MalformedURLException {
511
512             super(busTopicParams);
513
514             // super constructor sets servers = {""} if empty to avoid errors when using DME2
515             if (busTopicParams.isServersInvalid()) {
516                 throw new IllegalArgumentException("Must provide at least one host for HTTP AAF");
517             }
518
519             this.consumer.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue());
520
521             Properties props = new Properties();
522
523             if (busTopicParams.isUseHttps()) {
524                 props.setProperty(PROTOCOL_PROP, "https");
525                 this.consumer.setHost(busTopicParams.getServers().get(0) + ":3905");
526
527             } else {
528                 props.setProperty(PROTOCOL_PROP, "http");
529                 this.consumer.setHost(busTopicParams.getServers().get(0) + ":3904");
530             }
531
532             this.consumer.setProps(props);
533             logger.info("{}: CREATION", this);
534         }
535
536         @Override
537         public String toString() {
538             final MRConsumerImpl consumer = this.consumer;
539
540             return "DmaapConsumerWrapper [" + "consumer.getAuthDate()=" + consumer.getAuthDate()
541                 + ", consumer.getAuthKey()=" + consumer.getAuthKey() + ", consumer.getHost()=" + consumer.getHost()
542                 + ", consumer.getProtocolFlag()=" + consumer.getProtocolFlag() + ", consumer.getUsername()="
543                 + consumer.getUsername() + "]";
544         }
545     }
546
547     class DmaapDmeConsumerWrapper extends DmaapConsumerWrapper {
548
549         private static final Logger logger = LoggerFactory.getLogger(DmaapDmeConsumerWrapper.class);
550
551         /**
552          * Constructor.
553          *
554          * @param busTopicParams topic parameters
555          * @throws MalformedURLException must provide a valid URL
556          */
557         public DmaapDmeConsumerWrapper(BusTopicParams busTopicParams) throws MalformedURLException {
558
559
560             super(busTopicParams);
561
562
563             final String dme2RouteOffer = (busTopicParams.isAdditionalPropsValid()
564                 ? busTopicParams.getAdditionalProps().get(
565                 PolicyEndPointProperties.DME2_ROUTE_OFFER_PROPERTY)
566                 : null);
567
568             BusHelper.validateBusTopicParams(busTopicParams, PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS);
569
570             if ((busTopicParams.isPartnerInvalid())
571                 && StringUtils.isBlank(dme2RouteOffer)) {
572                 throw new IllegalArgumentException(
573                     "Must provide at least " + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
574                         + "." + busTopicParams.getTopic()
575                         + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX + " or "
576                         + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
577                         + busTopicParams.getTopic()
578                         + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX + " for DME2");
579             }
580
581             final String serviceName = busTopicParams.getServers().get(0);
582
583             this.consumer.setProtocolFlag(ProtocolTypeConstants.DME2.getValue());
584
585             this.consumer.setUsername(busTopicParams.getUserName());
586             this.consumer.setPassword(busTopicParams.getPassword());
587
588             Properties props = getProperties(busTopicParams, serviceName, dme2RouteOffer);
589
590             MRClientFactory.prop = props;
591             this.consumer.setProps(props);
592
593             logger.info("{}: CREATION", this);
594         }
595
596         @NotNull
597         private static Properties getProperties(BusTopicParams busTopicParams, String serviceName,
598                                                 String dme2RouteOffer) {
599             Properties props = new Properties();
600
601             props.setProperty(PolicyEndPointProperties.DME2_SERVICE_NAME_PROPERTY, serviceName);
602
603             props.setProperty("username", busTopicParams.getUserName());
604             props.setProperty("password", busTopicParams.getPassword());
605
606             /* These are required, no defaults */
607             props.setProperty("topic", busTopicParams.getTopic());
608
609             BusHelper.setCommonProperties(busTopicParams, dme2RouteOffer, props);
610
611             props.setProperty("MethodType", "GET");
612
613             if (busTopicParams.isUseHttps()) {
614                 props.setProperty(PROTOCOL_PROP, "https");
615
616             } else {
617                 props.setProperty(PROTOCOL_PROP, "http");
618             }
619
620             props.setProperty("contenttype", "application/json");
621
622             if (busTopicParams.isAdditionalPropsValid()) {
623                 props.putAll(busTopicParams.getAdditionalProps());
624             }
625             return props;
626         }
627     }
628 }
629
630