2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017-2021 AT&T Intellectual Property. All rights reserved.
6 * Modifications Copyright (C) 2018 Samsung Electronics Co., Ltd.
7 * Modifications Copyright (C) 2020 Bell Canada. All rights reserved.
8 * Copyright (C) 2022 Nordix Foundation.
9 * ================================================================================
10 * Licensed under the Apache License, Version 2.0 (the "License");
11 * you may not use this file except in compliance with the License.
12 * You may obtain a copy of the License at
14 * http://www.apache.org/licenses/LICENSE-2.0
16 * Unless required by applicable law or agreed to in writing, software
17 * distributed under the License is distributed on an "AS IS" BASIS,
18 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19 * See the License for the specific language governing permissions and
20 * limitations under the License.
21 * ============LICENSE_END=========================================================
24 package org.onap.policy.common.endpoints.event.comm.bus.internal;
26 import com.att.nsa.cambria.client.CambriaClientBuilders;
27 import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder;
28 import com.att.nsa.cambria.client.CambriaConsumer;
29 import java.io.IOException;
30 import java.net.MalformedURLException;
31 import java.security.GeneralSecurityException;
32 import java.time.Duration;
33 import java.util.ArrayList;
34 import java.util.Arrays;
35 import java.util.Collections;
36 import java.util.List;
38 import java.util.Properties;
39 import java.util.concurrent.CountDownLatch;
40 import java.util.concurrent.TimeUnit;
42 import org.apache.commons.lang3.StringUtils;
43 import org.apache.kafka.clients.consumer.ConsumerConfig;
44 import org.apache.kafka.clients.consumer.ConsumerRecord;
45 import org.apache.kafka.clients.consumer.ConsumerRecords;
46 import org.apache.kafka.clients.consumer.KafkaConsumer;
47 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
48 import org.apache.kafka.common.TopicPartition;
49 import org.onap.dmaap.mr.client.MRClientFactory;
50 import org.onap.dmaap.mr.client.impl.MRConsumerImpl;
51 import org.onap.dmaap.mr.client.impl.MRConsumerImpl.MRConsumerImplBuilder;
52 import org.onap.dmaap.mr.client.response.MRConsumerResponse;
53 import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants;
54 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
55 import org.slf4j.Logger;
56 import org.slf4j.LoggerFactory;
59 * Wrapper around libraries to consume from message bus.
61 public interface BusConsumer {
66 * @return list of messages
67 * @throws IOException when error encountered by underlying libraries
69 public Iterable<String> fetch() throws IOException;
72 * close underlying library consumer.
77 * Consumer that handles fetch() failures by sleeping.
79 public abstract static class FetchingBusConsumer implements BusConsumer {
80 private static Logger logger = LoggerFactory.getLogger(FetchingBusConsumer.class);
85 protected int fetchTimeout;
88 * Time to sleep on a fetch failure.
91 private final int sleepTime;
94 * Counted down when {@link #close()} is invoked.
96 private final CountDownLatch closeCondition = new CountDownLatch(1);
100 * Constructs the object.
102 * @param busTopicParams parameters for the bus topic
104 protected FetchingBusConsumer(BusTopicParams busTopicParams) {
105 this.fetchTimeout = busTopicParams.getFetchTimeout();
107 if (this.fetchTimeout <= 0) {
108 this.sleepTime = PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH;
110 // don't sleep too long, even if fetch timeout is large
111 this.sleepTime = Math.min(this.fetchTimeout, PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH);
116 * Causes the thread to sleep; invoked after fetch() fails. If the consumer is closed,
117 * or the thread is interrupted, then this will return immediately.
119 protected void sleepAfterFetchFailure() {
121 logger.info("{}: backoff for {}ms", this, sleepTime);
122 if (this.closeCondition.await(this.sleepTime, TimeUnit.MILLISECONDS)) {
123 logger.info("{}: closed while handling fetch error", this);
126 } catch (InterruptedException e) {
127 logger.warn("{}: interrupted while handling fetch error", this, e);
128 Thread.currentThread().interrupt();
133 public void close() {
134 this.closeCondition.countDown();
139 * Cambria based consumer.
141 public static class CambriaConsumerWrapper extends FetchingBusConsumer {
146 private static Logger logger = LoggerFactory.getLogger(CambriaConsumerWrapper.class);
149 * Used to build the consumer.
151 private final ConsumerBuilder builder;
156 private final CambriaConsumer consumer;
159 * Cambria Consumer Wrapper.
160 * BusTopicParam object contains the following parameters
161 * servers messaging bus hosts.
164 * apiSecret API Secret
165 * consumerGroup Consumer Group
166 * consumerInstance Consumer Instance
167 * fetchTimeout Fetch Timeout
168 * fetchLimit Fetch Limit
170 * @param busTopicParams - The parameters for the bus topic
171 * @throws GeneralSecurityException - Security exception
172 * @throws MalformedURLException - Malformed URL exception
174 public CambriaConsumerWrapper(BusTopicParams busTopicParams) {
175 super(busTopicParams);
177 this.builder = new CambriaClientBuilders.ConsumerBuilder();
179 builder.knownAs(busTopicParams.getConsumerGroup(), busTopicParams.getConsumerInstance())
180 .usingHosts(busTopicParams.getServers()).onTopic(busTopicParams.getTopic())
181 .waitAtServer(fetchTimeout).receivingAtMost(busTopicParams.getFetchLimit());
183 // Set read timeout to fetch timeout + 30 seconds (TBD: this should be configurable)
184 builder.withSocketTimeout(fetchTimeout + 30000);
186 if (busTopicParams.isUseHttps()) {
187 builder.usingHttps();
189 if (busTopicParams.isAllowSelfSignedCerts()) {
190 builder.allowSelfSignedCertificates();
194 if (busTopicParams.isApiKeyValid() && busTopicParams.isApiSecretValid()) {
195 builder.authenticatedBy(busTopicParams.getApiKey(), busTopicParams.getApiSecret());
198 if (busTopicParams.isUserNameValid() && busTopicParams.isPasswordValid()) {
199 builder.authenticatedByHttp(busTopicParams.getUserName(), busTopicParams.getPassword());
203 this.consumer = builder.build();
204 } catch (MalformedURLException | GeneralSecurityException e) {
205 throw new IllegalArgumentException(e);
210 public Iterable<String> fetch() throws IOException {
212 return this.consumer.fetch();
213 } catch (final IOException e) { //NOSONAR
214 logger.error("{}: cannot fetch because of {}", this, e.getMessage());
215 sleepAfterFetchFailure();
221 public void close() {
223 this.consumer.close();
227 public String toString() {
228 return "CambriaConsumerWrapper [fetchTimeout=" + fetchTimeout + "]";
233 * Kafka based consumer.
235 public static class KafkaConsumerWrapper extends FetchingBusConsumer {
240 private static Logger logger = LoggerFactory.getLogger(KafkaConsumerWrapper.class);
242 private static final String KEY_DESERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer";
247 protected KafkaConsumer<String, String> consumer;
248 protected Properties kafkaProps;
251 * Kafka Consumer Wrapper.
252 * BusTopicParam object contains the following parameters
253 * servers messaging bus hosts.
256 * @param busTopicParams - The parameters for the bus topic
257 * @throws GeneralSecurityException - Security exception
258 * @throws MalformedURLException - Malformed URL exception
260 public KafkaConsumerWrapper(BusTopicParams busTopicParams) throws MalformedURLException {
261 super(busTopicParams);
263 if (busTopicParams.isTopicInvalid()) {
264 throw new IllegalArgumentException("No topic for Kafka");
267 //Setup Properties for consumer
268 kafkaProps = new Properties();
269 kafkaProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
270 busTopicParams.getServers().get(0));
272 if (busTopicParams.isAdditionalPropsValid()) {
273 for (Map.Entry<String, String> entry : busTopicParams.getAdditionalProps().entrySet()) {
274 kafkaProps.put(entry.getKey(), entry.getValue());
278 if (kafkaProps.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG) == null) {
279 kafkaProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KEY_DESERIALIZER);
281 if (kafkaProps.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG) == null) {
282 kafkaProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KEY_DESERIALIZER);
284 if (kafkaProps.get(ConsumerConfig.GROUP_ID_CONFIG) == null) {
285 kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, busTopicParams.getConsumerGroup());
287 consumer = new KafkaConsumer<>(kafkaProps);
288 //Subscribe to the topic
289 consumer.subscribe(Arrays.asList(busTopicParams.getTopic()));
293 public Iterable<String> fetch() throws IOException {
294 ConsumerRecords<String, String> records = this.consumer.poll(Duration.ofMillis(fetchTimeout));
295 if (records == null || records.count() <= 0) {
296 return Collections.emptyList();
298 List<String> messages = new ArrayList<>(records.count());
300 for (TopicPartition partition : records.partitions()) {
301 List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
302 for (ConsumerRecord<String, String> record : partitionRecords) {
303 messages.add(record.value());
305 long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
306 consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
308 } catch (Exception e) {
309 logger.error("{}: cannot fetch because of {}", this, e.getMessage());
310 sleepAfterFetchFailure();
317 public void close() {
319 this.consumer.close();
320 logger.info("Kafka Consumer exited {}", this);
324 public String toString() {
325 return "KafkaConsumerWrapper [fetchTimeout=" + fetchTimeout + "]";
332 public abstract class DmaapConsumerWrapper extends FetchingBusConsumer {
337 private static Logger logger = LoggerFactory.getLogger(DmaapConsumerWrapper.class);
340 * Name of the "protocol" property.
342 protected static final String PROTOCOL_PROP = "Protocol";
347 protected MRConsumerImpl consumer;
350 * MR Consumer Wrapper.
352 * <p>servers messaging bus hosts
355 * apiSecret API Secret
357 * password AAF Password
358 * consumerGroup Consumer Group
359 * consumerInstance Consumer Instance
360 * fetchTimeout Fetch Timeout
361 * fetchLimit Fetch Limit
363 * @param busTopicParams contains above listed attributes
364 * @throws MalformedURLException URL should be valid
366 protected DmaapConsumerWrapper(BusTopicParams busTopicParams) throws MalformedURLException {
367 super(busTopicParams);
369 if (busTopicParams.isTopicInvalid()) {
370 throw new IllegalArgumentException("No topic for DMaaP");
373 this.consumer = new MRConsumerImplBuilder()
374 .setHostPart(busTopicParams.getServers())
375 .setTopic(busTopicParams.getTopic())
376 .setConsumerGroup(busTopicParams.getConsumerGroup())
377 .setConsumerId(busTopicParams.getConsumerInstance())
378 .setTimeoutMs(busTopicParams.getFetchTimeout())
379 .setLimit(busTopicParams.getFetchLimit())
380 .setApiKey(busTopicParams.getApiKey())
381 .setApiSecret(busTopicParams.getApiSecret())
382 .createMRConsumerImpl();
384 this.consumer.setUsername(busTopicParams.getUserName());
385 this.consumer.setPassword(busTopicParams.getPassword());
389 public Iterable<String> fetch() throws IOException {
390 final MRConsumerResponse response = this.consumer.fetchWithReturnConsumerResponse();
391 if (response == null) {
392 logger.warn("{}: DMaaP NULL response received", this);
394 sleepAfterFetchFailure();
395 return new ArrayList<>();
397 logger.debug("DMaaP consumer received {} : {}", response.getResponseCode(),
398 response.getResponseMessage());
400 if (!"200".equals(response.getResponseCode())) {
402 logger.error("DMaaP consumer received: {} : {}", response.getResponseCode(),
403 response.getResponseMessage());
405 sleepAfterFetchFailure();
411 if (response.getActualMessages() == null) {
412 return new ArrayList<>();
414 return response.getActualMessages();
419 public void close() {
421 this.consumer.close();
425 public String toString() {
426 return "DmaapConsumerWrapper [" + "consumer.getAuthDate()=" + consumer.getAuthDate()
427 + ", consumer.getAuthKey()=" + consumer.getAuthKey() + ", consumer.getHost()=" + consumer.getHost()
428 + ", consumer.getProtocolFlag()=" + consumer.getProtocolFlag() + ", consumer.getUsername()="
429 + consumer.getUsername() + "]";
436 public static class DmaapAafConsumerWrapper extends DmaapConsumerWrapper {
438 private static Logger logger = LoggerFactory.getLogger(DmaapAafConsumerWrapper.class);
440 private final Properties props;
443 * BusTopicParams contain the following parameters.
444 * MR Consumer Wrapper.
446 * <p>servers messaging bus hosts
449 * apiSecret API Secret
451 * aafPassword AAF Password
452 * consumerGroup Consumer Group
453 * consumerInstance Consumer Instance
454 * fetchTimeout Fetch Timeout
455 * fetchLimit Fetch Limit
457 * @param busTopicParams contains above listed params
458 * @throws MalformedURLException URL should be valid
460 public DmaapAafConsumerWrapper(BusTopicParams busTopicParams) throws MalformedURLException {
462 super(busTopicParams);
464 // super constructor sets servers = {""} if empty to avoid errors when using DME2
465 if (busTopicParams.isServersInvalid()) {
466 throw new IllegalArgumentException("Must provide at least one host for HTTP AAF");
469 this.consumer.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue());
471 props = new Properties();
473 if (busTopicParams.isUseHttps()) {
474 props.setProperty(PROTOCOL_PROP, "https");
475 this.consumer.setHost(busTopicParams.getServers().get(0) + ":3905");
478 props.setProperty(PROTOCOL_PROP, "http");
479 this.consumer.setHost(busTopicParams.getServers().get(0) + ":3904");
482 this.consumer.setProps(props);
483 logger.info("{}: CREATION", this);
487 public String toString() {
488 final MRConsumerImpl consumer = this.consumer;
490 return "DmaapConsumerWrapper [" + "consumer.getAuthDate()=" + consumer.getAuthDate()
491 + ", consumer.getAuthKey()=" + consumer.getAuthKey() + ", consumer.getHost()=" + consumer.getHost()
492 + ", consumer.getProtocolFlag()=" + consumer.getProtocolFlag() + ", consumer.getUsername()="
493 + consumer.getUsername() + "]";
497 public static class DmaapDmeConsumerWrapper extends DmaapConsumerWrapper {
499 private static Logger logger = LoggerFactory.getLogger(DmaapDmeConsumerWrapper.class);
501 private final Properties props;
506 * @param busTopicParams topic paramters
508 * @throws MalformedURLException must provide a valid URL
510 public DmaapDmeConsumerWrapper(BusTopicParams busTopicParams) throws MalformedURLException {
513 super(busTopicParams);
516 final String dme2RouteOffer = (busTopicParams.isAdditionalPropsValid()
517 ? busTopicParams.getAdditionalProps().get(
518 PolicyEndPointProperties.DME2_ROUTE_OFFER_PROPERTY)
521 if (busTopicParams.isEnvironmentInvalid()) {
522 throw parmException(busTopicParams.getTopic(),
523 PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX);
525 if (busTopicParams.isAftEnvironmentInvalid()) {
526 throw parmException(busTopicParams.getTopic(),
527 PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX);
529 if (busTopicParams.isLatitudeInvalid()) {
530 throw parmException(busTopicParams.getTopic(),
531 PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX);
533 if (busTopicParams.isLongitudeInvalid()) {
534 throw parmException(busTopicParams.getTopic(),
535 PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX);
538 if ((busTopicParams.isPartnerInvalid())
539 && StringUtils.isBlank(dme2RouteOffer)) {
540 throw new IllegalArgumentException(
541 "Must provide at least " + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
542 + "." + busTopicParams.getTopic()
543 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX + " or "
544 + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
545 + busTopicParams.getTopic()
546 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX + " for DME2");
549 final String serviceName = busTopicParams.getServers().get(0);
551 this.consumer.setProtocolFlag(ProtocolTypeConstants.DME2.getValue());
553 this.consumer.setUsername(busTopicParams.getUserName());
554 this.consumer.setPassword(busTopicParams.getPassword());
556 props = new Properties();
558 props.setProperty(PolicyEndPointProperties.DME2_SERVICE_NAME_PROPERTY, serviceName);
560 props.setProperty("username", busTopicParams.getUserName());
561 props.setProperty("password", busTopicParams.getPassword());
563 /* These are required, no defaults */
564 props.setProperty("topic", busTopicParams.getTopic());
566 props.setProperty("Environment", busTopicParams.getEnvironment());
567 props.setProperty("AFT_ENVIRONMENT", busTopicParams.getAftEnvironment());
569 if (busTopicParams.getPartner() != null) {
570 props.setProperty("Partner", busTopicParams.getPartner());
572 if (dme2RouteOffer != null) {
573 props.setProperty(PolicyEndPointProperties.DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer);
576 props.setProperty("Latitude", busTopicParams.getLatitude());
577 props.setProperty("Longitude", busTopicParams.getLongitude());
579 /* These are optional, will default to these values if not set in additionalProps */
580 props.setProperty("AFT_DME2_EP_READ_TIMEOUT_MS", "50000");
581 props.setProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", "240000");
582 props.setProperty("AFT_DME2_EP_CONN_TIMEOUT", "15000");
583 props.setProperty("Version", "1.0");
584 props.setProperty("SubContextPath", "/");
585 props.setProperty("sessionstickinessrequired", "no");
587 /* These should not change */
588 props.setProperty("TransportType", "DME2");
589 props.setProperty("MethodType", "GET");
591 if (busTopicParams.isUseHttps()) {
592 props.setProperty(PROTOCOL_PROP, "https");
595 props.setProperty(PROTOCOL_PROP, "http");
598 props.setProperty("contenttype", "application/json");
600 if (busTopicParams.isAdditionalPropsValid()) {
601 for (Map.Entry<String, String> entry : busTopicParams.getAdditionalProps().entrySet()) {
602 props.put(entry.getKey(), entry.getValue());
606 MRClientFactory.prop = props;
607 this.consumer.setProps(props);
609 logger.info("{}: CREATION", this);
612 private IllegalArgumentException parmException(String topic, String propnm) {
613 return new IllegalArgumentException("Missing " + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
614 + topic + propnm + " property for DME2 in DMaaP");