/*
 * ============LICENSE_START=======================================================
 * ================================================================================
 * Copyright (C) 2017-2021 AT&T Intellectual Property. All rights reserved.
 * Modifications Copyright (C) 2018 Samsung Electronics Co., Ltd.
 * Modifications Copyright (C) 2020,2023 Bell Canada. All rights reserved.
 * Modifications Copyright (C) 2022-2023 Nordix Foundation.
 * ================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * ============LICENSE_END=========================================================
 */
24 package org.onap.policy.common.endpoints.event.comm.bus.internal;
26 import com.att.nsa.cambria.client.CambriaClientBuilders;
27 import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder;
28 import com.att.nsa.cambria.client.CambriaConsumer;
29 import java.io.IOException;
30 import java.net.MalformedURLException;
31 import java.security.GeneralSecurityException;
32 import java.time.Duration;
33 import java.util.ArrayList;
34 import java.util.Collections;
35 import java.util.List;
37 import java.util.Properties;
38 import java.util.concurrent.CountDownLatch;
39 import java.util.concurrent.TimeUnit;
41 import org.apache.commons.lang3.StringUtils;
42 import org.apache.kafka.clients.consumer.ConsumerConfig;
43 import org.apache.kafka.clients.consumer.ConsumerRecord;
44 import org.apache.kafka.clients.consumer.ConsumerRecords;
45 import org.apache.kafka.clients.consumer.KafkaConsumer;
46 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
47 import org.apache.kafka.common.TopicPartition;
48 import org.jetbrains.annotations.NotNull;
49 import org.onap.dmaap.mr.client.MRClientFactory;
50 import org.onap.dmaap.mr.client.impl.MRConsumerImpl;
51 import org.onap.dmaap.mr.client.impl.MRConsumerImpl.MRConsumerImplBuilder;
52 import org.onap.dmaap.mr.client.response.MRConsumerResponse;
53 import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants;
54 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
55 import org.slf4j.Logger;
56 import org.slf4j.LoggerFactory;
/**
 * Wrapper around libraries to consume from message bus.
 */
61 public interface BusConsumer {
66 * @return list of messages
67 * @throws IOException when error encountered by underlying libraries
69 public Iterable<String> fetch() throws IOException;
72 * close underlying library consumer.
77 * Consumer that handles fetch() failures by sleeping.
79 abstract class FetchingBusConsumer implements BusConsumer {
80 private static final Logger logger = LoggerFactory.getLogger(FetchingBusConsumer.class);
85 protected int fetchTimeout;
88 * Time to sleep on a fetch failure.
91 private final int sleepTime;
94 * Counted down when {@link #close()} is invoked.
96 private final CountDownLatch closeCondition = new CountDownLatch(1);
100 * Constructs the object.
102 * @param busTopicParams parameters for the bus topic
104 protected FetchingBusConsumer(BusTopicParams busTopicParams) {
105 this.fetchTimeout = busTopicParams.getFetchTimeout();
107 if (this.fetchTimeout <= 0) {
108 this.sleepTime = PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH;
110 // don't sleep too long, even if fetch timeout is large
111 this.sleepTime = Math.min(this.fetchTimeout, PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH);
116 * Causes the thread to sleep; invoked after fetch() fails. If the consumer is closed,
117 * or the thread is interrupted, then this will return immediately.
119 protected void sleepAfterFetchFailure() {
121 logger.info("{}: backoff for {}ms", this, sleepTime);
122 if (this.closeCondition.await(this.sleepTime, TimeUnit.MILLISECONDS)) {
123 logger.info("{}: closed while handling fetch error", this);
126 } catch (InterruptedException e) {
127 logger.warn("{}: interrupted while handling fetch error", this, e);
128 Thread.currentThread().interrupt();
133 public void close() {
134 this.closeCondition.countDown();
139 * Cambria based consumer.
141 public static class CambriaConsumerWrapper extends FetchingBusConsumer {
146 private static Logger logger = LoggerFactory.getLogger(CambriaConsumerWrapper.class);
149 * Used to build the consumer.
151 private final ConsumerBuilder builder;
156 private final CambriaConsumer consumer;
159 * Cambria Consumer Wrapper.
160 * BusTopicParam object contains the following parameters
161 * servers - messaging bus hosts.
162 * topic - topic for messages
164 * apiSecret - API Secret
165 * consumerGroup - Consumer Group
166 * consumerInstance - Consumer Instance
167 * fetchTimeout - Fetch Timeout
168 * fetchLimit - Fetch Limit
170 * @param busTopicParams - The parameters for the bus topic
172 public CambriaConsumerWrapper(BusTopicParams busTopicParams) {
173 super(busTopicParams);
175 this.builder = new CambriaClientBuilders.ConsumerBuilder();
177 builder.knownAs(busTopicParams.getConsumerGroup(), busTopicParams.getConsumerInstance())
178 .usingHosts(busTopicParams.getServers()).onTopic(busTopicParams.getTopic())
179 .waitAtServer(fetchTimeout).receivingAtMost(busTopicParams.getFetchLimit());
181 // Set read timeout to fetch timeout + 30 seconds (TBD: this should be configurable)
182 builder.withSocketTimeout(fetchTimeout + 30000);
184 if (busTopicParams.isUseHttps()) {
185 builder.usingHttps();
187 if (busTopicParams.isAllowSelfSignedCerts()) {
188 builder.allowSelfSignedCertificates();
192 if (busTopicParams.isApiKeyValid() && busTopicParams.isApiSecretValid()) {
193 builder.authenticatedBy(busTopicParams.getApiKey(), busTopicParams.getApiSecret());
196 if (busTopicParams.isUserNameValid() && busTopicParams.isPasswordValid()) {
197 builder.authenticatedByHttp(busTopicParams.getUserName(), busTopicParams.getPassword());
201 this.consumer = builder.build();
202 } catch (MalformedURLException | GeneralSecurityException e) {
203 throw new IllegalArgumentException(e);
208 public Iterable<String> fetch() throws IOException {
210 return this.consumer.fetch();
211 } catch (final IOException e) { //NOSONAR
212 logger.error("{}: cannot fetch because of {}", this, e.getMessage());
213 sleepAfterFetchFailure();
219 public void close() {
221 this.consumer.close();
225 public String toString() {
226 return "CambriaConsumerWrapper [fetchTimeout=" + fetchTimeout + "]";
231 * Kafka based consumer.
233 class KafkaConsumerWrapper extends FetchingBusConsumer {
238 private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerWrapper.class);
240 private static final String KEY_DESERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer";
245 protected KafkaConsumer<String, String> consumer;
246 protected Properties kafkaProps;
249 * Kafka Consumer Wrapper.
250 * BusTopicParam - object contains the following parameters
251 * servers - messaging bus hosts.
254 * @param busTopicParams - The parameters for the bus topic
256 public KafkaConsumerWrapper(BusTopicParams busTopicParams) {
257 super(busTopicParams);
259 if (busTopicParams.isTopicInvalid()) {
260 throw new IllegalArgumentException("No topic for Kafka");
263 //Setup Properties for consumer
264 kafkaProps = new Properties();
265 kafkaProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
266 busTopicParams.getServers().get(0));
268 if (busTopicParams.isAdditionalPropsValid()) {
269 kafkaProps.putAll(busTopicParams.getAdditionalProps());
272 if (kafkaProps.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG) == null) {
273 kafkaProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KEY_DESERIALIZER);
275 if (kafkaProps.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG) == null) {
276 kafkaProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KEY_DESERIALIZER);
278 if (kafkaProps.get(ConsumerConfig.GROUP_ID_CONFIG) == null) {
279 kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, busTopicParams.getConsumerGroup());
281 consumer = new KafkaConsumer<>(kafkaProps);
282 //Subscribe to the topic
283 consumer.subscribe(List.of(busTopicParams.getTopic()));
287 public Iterable<String> fetch() {
288 ConsumerRecords<String, String> records = this.consumer.poll(Duration.ofMillis(fetchTimeout));
289 if (records == null || records.count() <= 0) {
290 return Collections.emptyList();
292 List<String> messages = new ArrayList<>(records.count());
294 for (TopicPartition partition : records.partitions()) {
295 List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
296 for (ConsumerRecord<String, String> partitionRecord : partitionRecords) {
297 messages.add(partitionRecord.value());
299 long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
300 consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
302 } catch (Exception e) {
303 logger.error("{}: cannot fetch, throwing exception after sleep...", this);
304 sleepAfterFetchFailure();
311 public void close() {
313 this.consumer.close();
314 logger.info("Kafka Consumer exited {}", this);
318 public String toString() {
319 return "KafkaConsumerWrapper [fetchTimeout=" + fetchTimeout + "]";
326 public abstract class DmaapConsumerWrapper extends FetchingBusConsumer {
331 private static final Logger logger = LoggerFactory.getLogger(DmaapConsumerWrapper.class);
334 * Name of the "protocol" property.
336 protected static final String PROTOCOL_PROP = "Protocol";
341 protected MRConsumerImpl consumer;
344 * MR Consumer Wrapper.
346 * <p>servers - messaging bus hosts
349 * apiSecret - API Secret
350 * username - AAF Login
351 * password - AAF Password
352 * consumerGroup - Consumer Group
353 * consumerInstance - Consumer Instance
354 * fetchTimeout - Fetch Timeout
355 * fetchLimit - Fetch Limit
357 * @param busTopicParams contains above listed attributes
358 * @throws MalformedURLException URL should be valid
360 protected DmaapConsumerWrapper(BusTopicParams busTopicParams) throws MalformedURLException {
361 super(busTopicParams);
363 if (busTopicParams.isTopicInvalid()) {
364 throw new IllegalArgumentException("No topic for DMaaP");
367 this.consumer = new MRConsumerImplBuilder()
368 .setHostPart(busTopicParams.getServers())
369 .setTopic(busTopicParams.getTopic())
370 .setConsumerGroup(busTopicParams.getConsumerGroup())
371 .setConsumerId(busTopicParams.getConsumerInstance())
372 .setTimeoutMs(busTopicParams.getFetchTimeout())
373 .setLimit(busTopicParams.getFetchLimit())
374 .setApiKey(busTopicParams.getApiKey())
375 .setApiSecret(busTopicParams.getApiSecret())
376 .createMRConsumerImpl();
378 this.consumer.setUsername(busTopicParams.getUserName());
379 this.consumer.setPassword(busTopicParams.getPassword());
383 public Iterable<String> fetch() {
384 final MRConsumerResponse response = this.consumer.fetchWithReturnConsumerResponse();
385 if (response == null) {
386 logger.warn("{}: DMaaP NULL response received", this);
388 sleepAfterFetchFailure();
389 return new ArrayList<>();
391 logger.debug("DMaaP consumer received {} : {}", response.getResponseCode(),
392 response.getResponseMessage());
394 if (!"200".equals(response.getResponseCode())) {
396 logger.error("DMaaP consumer received: {} : {}", response.getResponseCode(),
397 response.getResponseMessage());
399 sleepAfterFetchFailure();
405 if (response.getActualMessages() == null) {
406 return new ArrayList<>();
408 return response.getActualMessages();
413 public void close() {
415 this.consumer.close();
419 public String toString() {
420 return "DmaapConsumerWrapper [" + "consumer.getAuthDate()=" + consumer.getAuthDate()
421 + ", consumer.getAuthKey()=" + consumer.getAuthKey() + ", consumer.getHost()=" + consumer.getHost()
422 + ", consumer.getProtocolFlag()=" + consumer.getProtocolFlag() + ", consumer.getUsername()="
423 + consumer.getUsername() + "]";
430 class DmaapAafConsumerWrapper extends DmaapConsumerWrapper {
432 private static final Logger logger = LoggerFactory.getLogger(DmaapAafConsumerWrapper.class);
435 * BusTopicParams contain the following parameters.
436 * MR Consumer Wrapper.
438 * <p>servers messaging bus hosts
441 * apiSecret - API Secret
442 * aafLogin - AAF Login
443 * aafPassword - AAF Password
444 * consumerGroup - Consumer Group
445 * consumerInstance - Consumer Instance
446 * fetchTimeout - Fetch Timeout
447 * fetchLimit - Fetch Limit
449 * @param busTopicParams contains above listed params
450 * @throws MalformedURLException URL should be valid
452 public DmaapAafConsumerWrapper(BusTopicParams busTopicParams) throws MalformedURLException {
454 super(busTopicParams);
456 // super constructor sets servers = {""} if empty to avoid errors when using DME2
457 if (busTopicParams.isServersInvalid()) {
458 throw new IllegalArgumentException("Must provide at least one host for HTTP AAF");
461 this.consumer.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue());
463 Properties props = new Properties();
465 if (busTopicParams.isUseHttps()) {
466 props.setProperty(PROTOCOL_PROP, "https");
467 this.consumer.setHost(busTopicParams.getServers().get(0) + ":3905");
470 props.setProperty(PROTOCOL_PROP, "http");
471 this.consumer.setHost(busTopicParams.getServers().get(0) + ":3904");
474 this.consumer.setProps(props);
475 logger.info("{}: CREATION", this);
479 public String toString() {
480 final MRConsumerImpl consumer = this.consumer;
482 return "DmaapConsumerWrapper [" + "consumer.getAuthDate()=" + consumer.getAuthDate()
483 + ", consumer.getAuthKey()=" + consumer.getAuthKey() + ", consumer.getHost()=" + consumer.getHost()
484 + ", consumer.getProtocolFlag()=" + consumer.getProtocolFlag() + ", consumer.getUsername()="
485 + consumer.getUsername() + "]";
489 class DmaapDmeConsumerWrapper extends DmaapConsumerWrapper {
491 private static final Logger logger = LoggerFactory.getLogger(DmaapDmeConsumerWrapper.class);
/**
 * Constructor.
 *
 * <p>Requires either a valid partner or a non-blank DME2 route offer (read from the
 * additional properties); otherwise construction fails.
 *
 * @param busTopicParams topic parameters
 * @throws MalformedURLException must provide a valid URL
 * @throws IllegalArgumentException if neither a partner nor a route offer is provided
 */
public DmaapDmeConsumerWrapper(BusTopicParams busTopicParams) throws MalformedURLException {

    super(busTopicParams);

    // the route offer is only available when additional properties were supplied
    final String dme2RouteOffer = (busTopicParams.isAdditionalPropsValid()
                    ? busTopicParams.getAdditionalProps().get(
                                    PolicyEndPointProperties.DME2_ROUTE_OFFER_PROPERTY)
                    : null);

    BusHelper.validateBusTopicParams(busTopicParams, PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS);

    if ((busTopicParams.isPartnerInvalid())
                    && StringUtils.isBlank(dme2RouteOffer)) {
        throw new IllegalArgumentException(
                        "Must provide at least " + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
                                        + "." + busTopicParams.getTopic()
                                        + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX + " or "
                                        + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
                                        + busTopicParams.getTopic()
                                        + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX + " for DME2");
    }

    // the first server entry is used as the DME2 service name
    final String serviceName = busTopicParams.getServers().get(0);

    this.consumer.setProtocolFlag(ProtocolTypeConstants.DME2.getValue());

    this.consumer.setUsername(busTopicParams.getUserName());
    this.consumer.setPassword(busTopicParams.getPassword());

    Properties props = getProperties(busTopicParams, serviceName, dme2RouteOffer);

    // the same properties are published on the static MRClientFactory field and on this consumer
    MRClientFactory.prop = props;
    this.consumer.setProps(props);

    logger.info("{}: CREATION", this);
}
539 private static Properties getProperties(BusTopicParams busTopicParams, String serviceName,
540 String dme2RouteOffer) {
541 Properties props = new Properties();
543 props.setProperty(PolicyEndPointProperties.DME2_SERVICE_NAME_PROPERTY, serviceName);
545 props.setProperty("username", busTopicParams.getUserName());
546 props.setProperty("password", busTopicParams.getPassword());
548 /* These are required, no defaults */
549 props.setProperty("topic", busTopicParams.getTopic());
551 BusHelper.setCommonProperties(busTopicParams, dme2RouteOffer, props);
553 props.setProperty("MethodType", "GET");
555 if (busTopicParams.isUseHttps()) {
556 props.setProperty(PROTOCOL_PROP, "https");
559 props.setProperty(PROTOCOL_PROP, "http");
562 props.setProperty("contenttype", "application/json");
564 if (busTopicParams.isAdditionalPropsValid()) {
565 props.putAll(busTopicParams.getAdditionalProps());