2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017-2021 AT&T Intellectual Property. All rights reserved.
6 * Modifications Copyright (C) 2018 Samsung Electronics Co., Ltd.
7 * Modifications Copyright (C) 2020 Bell Canada. All rights reserved.
8 * Copyright (C) 2022 Nordix Foundation.
9 * ================================================================================
10 * Licensed under the Apache License, Version 2.0 (the "License");
11 * you may not use this file except in compliance with the License.
12 * You may obtain a copy of the License at
14 * http://www.apache.org/licenses/LICENSE-2.0
16 * Unless required by applicable law or agreed to in writing, software
17 * distributed under the License is distributed on an "AS IS" BASIS,
18 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19 * See the License for the specific language governing permissions and
20 * limitations under the License.
21 * ============LICENSE_END=========================================================
24 package org.onap.policy.common.endpoints.event.comm.bus.internal;
26 import com.att.nsa.cambria.client.CambriaClientBuilders;
27 import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder;
28 import com.att.nsa.cambria.client.CambriaConsumer;
29 import java.io.IOException;
30 import java.net.MalformedURLException;
31 import java.security.GeneralSecurityException;
32 import java.time.Duration;
33 import java.util.ArrayList;
34 import java.util.Arrays;
36 import java.util.Properties;
37 import java.util.concurrent.CountDownLatch;
38 import java.util.concurrent.TimeUnit;
40 import org.apache.commons.lang3.StringUtils;
41 import org.apache.kafka.clients.consumer.ConsumerConfig;
42 import org.apache.kafka.clients.consumer.ConsumerRecord;
43 import org.apache.kafka.clients.consumer.ConsumerRecords;
44 import org.apache.kafka.clients.consumer.KafkaConsumer;
45 import org.onap.dmaap.mr.client.MRClientFactory;
46 import org.onap.dmaap.mr.client.impl.MRConsumerImpl;
47 import org.onap.dmaap.mr.client.impl.MRConsumerImpl.MRConsumerImplBuilder;
48 import org.onap.dmaap.mr.client.response.MRConsumerResponse;
49 import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants;
50 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
51 import org.slf4j.Logger;
52 import org.slf4j.LoggerFactory;
55 * Wrapper around libraries to consume from message bus.
57 public interface BusConsumer {
62 * @return list of messages
63 * @throws IOException when error encountered by underlying libraries
// Contract method of the interface: returns the fetched messages as an iterable of
// strings and may fail with an IOException raised by the underlying client library.
65 public Iterable<String> fetch() throws IOException;
68 * close underlying library consumer.
73 * Consumer that handles fetch() failures by sleeping.
75 public abstract static class FetchingBusConsumer implements BusConsumer {
// Class logger. NOTE(review): declared static but not final.
76 private static Logger logger = LoggerFactory.getLogger(FetchingBusConsumer.class);
// Fetch timeout copied from the topic parameters by the constructor; protected and
// non-final, and read directly by subclasses (e.g. when configuring the Cambria builder).
81 protected int fetchTimeout;
// The value below is used in milliseconds: it is passed to
// CountDownLatch.await(..., TimeUnit.MILLISECONDS) in sleepAfterFetchFailure().
84 * Time to sleep on a fetch failure.
87 private final int sleepTime;
// Single-count latch: sleepAfterFetchFailure() waits on it and close() counts it down.
90 * Counted down when {@link #close()} is invoked.
92 private final CountDownLatch closeCondition = new CountDownLatch(1);
96 * Constructs the object.
98 * @param busTopicParams parameters for the bus topic
// Stores the fetch timeout from the topic parameters and derives the backoff interval
// (sleepTime) used after a failed fetch.
100 protected FetchingBusConsumer(BusTopicParams busTopicParams) {
101 this.fetchTimeout = busTopicParams.getFetchTimeout();
// A non-positive fetch timeout falls back to DEFAULT_TIMEOUT_MS_FETCH as the backoff.
103 if (this.fetchTimeout <= 0) {
104 this.sleepTime = PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH;
// NOTE(review): the line separating the two assignments is missing from this listing;
// presumably "} else {", since sleepTime is final and assigned on both paths - confirm.
106 // don't sleep too long, even if fetch timeout is large
107 this.sleepTime = Math.min(this.fetchTimeout, PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH);
112 * Causes the thread to sleep; invoked after fetch() fails. If the consumer is closed,
113 * or the thread is interrupted, then this will return immediately.
// Backs off after a failed fetch by waiting on the close latch for at most sleepTime
// milliseconds, so that a concurrent close() ends the wait early.
115 protected void sleepAfterFetchFailure() {
117 logger.info("{}: backoff for {}ms", this, sleepTime);
// await() returns true only when the latch reached zero before the timeout elapsed,
// which here means close() was invoked during the backoff.
118 if (this.closeCondition.await(this.sleepTime, TimeUnit.MILLISECONDS)) {
119 logger.info("{}: closed while handling fetch error", this);
122 } catch (InterruptedException e) {
123 logger.warn("{}: interrupted while handling fetch error", this, e);
// Re-assert the interrupt flag so code further up the stack can observe it.
124 Thread.currentThread().interrupt();
// Counts the close latch down, which releases a thread currently waiting in
// sleepAfterFetchFailure().
129 public void close() {
130 this.closeCondition.countDown();
135 * Cambria based consumer.
137 public static class CambriaConsumerWrapper extends FetchingBusConsumer {
// Class logger. NOTE(review): declared static but not final.
142 private static Logger logger = LoggerFactory.getLogger(CambriaConsumerWrapper.class);
145 * Used to build the consumer.
147 private final ConsumerBuilder builder;
// Consumer obtained from builder.build() in the constructor; fetch() and close()
// delegate to it.
152 private final CambriaConsumer consumer;
155 * Cambria Consumer Wrapper.
156 * The BusTopicParams object contains the following parameters:
157 * servers messaging bus hosts.
160 * apiSecret API Secret
161 * consumerGroup Consumer Group
162 * consumerInstance Consumer Instance
163 * fetchTimeout Fetch Timeout
164 * fetchLimit Fetch Limit
166 * @param busTopicParams - The parameters for the bus topic
167 * @throws IllegalArgumentException if the consumer cannot be built; it wraps the
168 *         underlying MalformedURLException or GeneralSecurityException
// Configures a Cambria ConsumerBuilder from the topic parameters and builds the consumer.
170 public CambriaConsumerWrapper(BusTopicParams busTopicParams) {
171 super(busTopicParams);
173 this.builder = new CambriaClientBuilders.ConsumerBuilder();
// Consumer group/instance, hosts, topic, server-side wait (the inherited fetchTimeout)
// and the maximum number of messages per fetch.
175 builder.knownAs(busTopicParams.getConsumerGroup(), busTopicParams.getConsumerInstance())
176 .usingHosts(busTopicParams.getServers()).onTopic(busTopicParams.getTopic())
177 .waitAtServer(fetchTimeout).receivingAtMost(busTopicParams.getFetchLimit());
179 // Set read timeout to fetch timeout + 30 seconds (TBD: this should be configurable)
180 builder.withSocketTimeout(fetchTimeout + 30000);
// HTTPS and self-signed-certificate acceptance are switched on from the parameters.
182 if (busTopicParams.isUseHttps()) {
183 builder.usingHttps();
185 if (busTopicParams.isAllowSelfSignedCerts()) {
186 builder.allowSelfSignedCertificates();
// API key/secret are passed to the builder only when both are reported valid.
190 if (busTopicParams.isApiKeyValid() && busTopicParams.isApiSecretValid()) {
191 builder.authenticatedBy(busTopicParams.getApiKey(), busTopicParams.getApiSecret());
// User name/password are passed to authenticatedByHttp only when both are reported valid.
194 if (busTopicParams.isUserNameValid() && busTopicParams.isPasswordValid()) {
195 builder.authenticatedByHttp(busTopicParams.getUserName(), busTopicParams.getPassword());
// Checked build failures are rethrown unchecked, keeping the original exception as cause.
199 this.consumer = builder.build();
200 } catch (MalformedURLException | GeneralSecurityException e) {
201 throw new IllegalArgumentException(e);
// Delegates to the Cambria consumer; on IOException it logs the failure and backs off.
206 public Iterable<String> fetch() throws IOException {
208 return this.consumer.fetch();
209 } catch (final IOException e) { //NOSONAR
// Only the exception message is logged here, not the stack trace.
210 logger.error("{}: cannot fetch because of {}", this, e.getMessage());
211 sleepAfterFetchFailure();
// NOTE(review): the lines following the backoff are missing from this listing;
// presumably the exception is rethrown, as the method declares IOException - confirm.
// Closes the underlying Cambria consumer.
// NOTE(review): no super.close() call is visible in this listing; if it is absent in the
// full file, this override never counts down the latch in FetchingBusConsumer - confirm.
217 public void close() {
219 this.consumer.close();
// Short description that contains only the fetch timeout.
223 public String toString() {
224 return "CambriaConsumerWrapper [fetchTimeout=" + fetchTimeout + "]";
229 * Kafka based consumer.
231 public static class KafkaConsumerWrapper extends FetchingBusConsumer {
// Class logger. NOTE(review): declared static but not final.
236 private static Logger logger = LoggerFactory.getLogger(KafkaConsumerWrapper.class);
// NOTE(review): non-final, and no line visible in this listing assigns it - confirm
// where the Kafka client is created before close() dereferences it.
241 private KafkaConsumer<String, String> consumer;
244 * Kafka Consumer Wrapper.
245 * The BusTopicParams object contains the following parameters:
246 * servers messaging bus hosts.
249 * @param busTopicParams - The parameters for the bus topic
250 * @throws GeneralSecurityException - Security exception
251 * @throws MalformedURLException - Malformed URL exception
// Delegates to the superclass constructor; no Kafka client is created in the lines
// visible in this listing.
// NOTE(review): the signature declares no checked exceptions, although the preceding
// Javadoc carries @throws tags for GeneralSecurityException and MalformedURLException.
253 public KafkaConsumerWrapper(BusTopicParams busTopicParams) {
254 super(busTopicParams);
// Placeholder implementation: always returns a new, empty list and never touches the
// Kafka consumer.
258 public Iterable<String> fetch() throws IOException {
259 // TODO: Not implemented yet
260 return new ArrayList<>();
// Closes the Kafka consumer.
// NOTE(review): no visible line assigns the consumer field; if that also holds in the
// full file, this call throws NullPointerException - confirm and guard if necessary.
264 public void close() {
266 this.consumer.close();
// Short description that contains only the fetch timeout.
270 public String toString() {
271 return "KafkaConsumerWrapper [fetchTimeout=" + fetchTimeout + "]";
278 public abstract class DmaapConsumerWrapper extends FetchingBusConsumer {
// Class logger. NOTE(review): declared static but not final.
283 private static Logger logger = LoggerFactory.getLogger(DmaapConsumerWrapper.class);
286 * Name of the "protocol" property.
288 protected static final String PROTOCOL_PROP = "Protocol";
// MR consumer created by the constructor; protected and non-final, and the subclasses
// call further setters on it (protocol flag, host, props).
293 protected MRConsumerImpl consumer;
296 * MR Consumer Wrapper.
298 * <p>servers messaging bus hosts
301 * apiSecret API Secret
303 * password AAF Password
304 * consumerGroup Consumer Group
305 * consumerInstance Consumer Instance
306 * fetchTimeout Fetch Timeout
307 * fetchLimit Fetch Limit
309 * @param busTopicParams contains above listed attributes
310 * @throws MalformedURLException URL should be valid
// Rejects a missing topic, then builds the MR consumer from the topic parameters.
312 protected DmaapConsumerWrapper(BusTopicParams busTopicParams) throws MalformedURLException {
313 super(busTopicParams);
// The topic check runs before the builder below is used.
315 if (busTopicParams.isTopicInvalid()) {
316 throw new IllegalArgumentException("No topic for DMaaP");
// The timeout is read from the parameters again here rather than from the inherited
// fetchTimeout field.
319 this.consumer = new MRConsumerImplBuilder()
320 .setHostPart(busTopicParams.getServers())
321 .setTopic(busTopicParams.getTopic())
322 .setConsumerGroup(busTopicParams.getConsumerGroup())
323 .setConsumerId(busTopicParams.getConsumerInstance())
324 .setTimeoutMs(busTopicParams.getFetchTimeout())
325 .setLimit(busTopicParams.getFetchLimit())
326 .setApiKey(busTopicParams.getApiKey())
327 .setApiSecret(busTopicParams.getApiSecret())
328 .createMRConsumerImpl();
// User name and password are applied through setters after the consumer is created.
330 this.consumer.setUsername(busTopicParams.getUserName());
331 this.consumer.setPassword(busTopicParams.getPassword());
// Fetches one response from the MR consumer and returns its messages. A null response
// yields an empty list after a backoff.
335 public Iterable<String> fetch() throws IOException {
336 final MRConsumerResponse response = this.consumer.fetchWithReturnConsumerResponse();
337 if (response == null) {
338 logger.warn("{}: DMaaP NULL response received", this);
340 sleepAfterFetchFailure();
341 return new ArrayList<>();
343 logger.debug("DMaaP consumer received {} : {}", response.getResponseCode(),
344 response.getResponseMessage());
// The response code is compared as a string; any value other than "200" is logged as an
// error and triggers a backoff.
346 if (!"200".equals(response.getResponseCode())) {
348 logger.error("DMaaP consumer received: {} : {}", response.getResponseCode(),
349 response.getResponseMessage());
351 sleepAfterFetchFailure();
// NOTE(review): several lines are missing from this listing at this point, so it cannot
// be determined here whether a non-200 response returns early or continues below.
// A null message collection is replaced by an empty list on this path.
357 if (response.getActualMessages() == null) {
358 return new ArrayList<>();
360 return response.getActualMessages();
// Closes the underlying MR consumer.
// NOTE(review): no super.close() call is visible in this listing; if it is absent in the
// full file, this override never counts down the latch in FetchingBusConsumer - confirm.
365 public void close() {
367 this.consumer.close();
// Describes the wrapper through the consumer's auth date, auth key, host, protocol flag
// and user name.
// NOTE(review): the text includes consumer.getAuthKey(), and subclasses log `this` when
// they are created - confirm the key is acceptable in log output.
371 public String toString() {
372 return "DmaapConsumerWrapper [" + "consumer.getAuthDate()=" + consumer.getAuthDate()
373 + ", consumer.getAuthKey()=" + consumer.getAuthKey() + ", consumer.getHost()=" + consumer.getHost()
374 + ", consumer.getProtocolFlag()=" + consumer.getProtocolFlag() + ", consumer.getUsername()="
375 + consumer.getUsername() + "]";
382 public static class DmaapAafConsumerWrapper extends DmaapConsumerWrapper {
// Class logger. NOTE(review): declared static but not final.
384 private static Logger logger = LoggerFactory.getLogger(DmaapAafConsumerWrapper.class);
// Property set created in the constructor and handed to the consumer via setProps().
386 private final Properties props;
389 * BusTopicParams contain the following parameters.
390 * MR Consumer Wrapper.
392 * <p>servers messaging bus hosts
395 * apiSecret API Secret
397 * aafPassword AAF Password
398 * consumerGroup Consumer Group
399 * consumerInstance Consumer Instance
400 * fetchTimeout Fetch Timeout
401 * fetchLimit Fetch Limit
403 * @param busTopicParams contains above listed params
404 * @throws MalformedURLException URL should be valid
// Builds an AAF-authenticated consumer on top of the base DMaaP consumer and points it
// at the first configured server.
406 public DmaapAafConsumerWrapper(BusTopicParams busTopicParams) throws MalformedURLException {
408 super(busTopicParams);
410 // super constructor sets servers = {""} if empty to avoid errors when using DME2
411 if (busTopicParams.isServersInvalid()) {
412 throw new IllegalArgumentException("Must provide at least one host for HTTP AAF");
415 this.consumer.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue());
417 props = new Properties();
// Only servers.get(0) is used. The "https" protocol value is paired with port 3905 and
// the "http" value with port 3904.
// NOTE(review): the separator between the two pairs is missing from this listing;
// presumably "} else {" - confirm.
419 if (busTopicParams.isUseHttps()) {
420 props.setProperty(PROTOCOL_PROP, "https");
421 this.consumer.setHost(busTopicParams.getServers().get(0) + ":3905");
424 props.setProperty(PROTOCOL_PROP, "http");
425 this.consumer.setHost(busTopicParams.getServers().get(0) + ":3904");
428 this.consumer.setProps(props);
// Logging `this` calls toString(), which reads the consumer configured above.
429 logger.info("{}: CREATION", this);
// Produces the same text as DmaapConsumerWrapper.toString(), including the
// "DmaapConsumerWrapper [" prefix rather than this class's own name.
433 public String toString() {
// Local alias that shadows the inherited field of the same name.
434 final MRConsumerImpl consumer = this.consumer;
436 return "DmaapConsumerWrapper [" + "consumer.getAuthDate()=" + consumer.getAuthDate()
437 + ", consumer.getAuthKey()=" + consumer.getAuthKey() + ", consumer.getHost()=" + consumer.getHost()
438 + ", consumer.getProtocolFlag()=" + consumer.getProtocolFlag() + ", consumer.getUsername()="
439 + consumer.getUsername() + "]";
443 public static class DmaapDmeConsumerWrapper extends DmaapConsumerWrapper {
// Class logger. NOTE(review): declared static but not final.
445 private static Logger logger = LoggerFactory.getLogger(DmaapDmeConsumerWrapper.class);
// Property set created in the constructor; it is handed to the consumer via setProps()
// and also stored in MRClientFactory.prop.
447 private final Properties props;
452 * @param busTopicParams topic parameters
454 * @throws MalformedURLException must provide a valid URL
// Builds a DME2 consumer: checks the DME2-specific parameters, then assembles the
// property set that is handed to the MR consumer.
456 public DmaapDmeConsumerWrapper(BusTopicParams busTopicParams) throws MalformedURLException {
459 super(busTopicParams);
// The route offer is looked up in the additional properties when those are valid.
// NOTE(review): the alternative branch of this conditional is missing from this listing.
462 final String dme2RouteOffer = (busTopicParams.isAdditionalPropsValid()
463 ? busTopicParams.getAdditionalProps().get(
464 PolicyEndPointProperties.DME2_ROUTE_OFFER_PROPERTY)
// Each of the four settings below is mandatory; a missing one is reported with the
// corresponding property suffix through parmException().
467 if (busTopicParams.isEnvironmentInvalid()) {
468 throw parmException(busTopicParams.getTopic(),
469 PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX);
471 if (busTopicParams.isAftEnvironmentInvalid()) {
472 throw parmException(busTopicParams.getTopic(),
473 PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX);
475 if (busTopicParams.isLatitudeInvalid()) {
476 throw parmException(busTopicParams.getTopic(),
477 PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX);
479 if (busTopicParams.isLongitudeInvalid()) {
480 throw parmException(busTopicParams.getTopic(),
481 PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX);
// At least one of partner or route offer must be supplied.
484 if ((busTopicParams.isPartnerInvalid())
485 && StringUtils.isBlank(dme2RouteOffer)) {
486 throw new IllegalArgumentException(
487 "Must provide at least " + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
488 + "." + busTopicParams.getTopic()
489 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX + " or "
490 + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
491 + busTopicParams.getTopic()
492 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX + " for DME2");
// The first configured server entry is used as the DME2 service name.
495 final String serviceName = busTopicParams.getServers().get(0);
497 this.consumer.setProtocolFlag(ProtocolTypeConstants.DME2.getValue());
// NOTE(review): the DmaapConsumerWrapper constructor already sets user name and password
// on the consumer, so these two calls repeat that assignment.
499 this.consumer.setUsername(busTopicParams.getUserName());
500 this.consumer.setPassword(busTopicParams.getPassword());
502 props = new Properties();
504 props.setProperty(PolicyEndPointProperties.DME2_SERVICE_NAME_PROPERTY, serviceName);
// NOTE(review): Properties.setProperty rejects null values with NullPointerException;
// confirm user name and password are guaranteed non-null for DME2 topics.
506 props.setProperty("username", busTopicParams.getUserName());
507 props.setProperty("password", busTopicParams.getPassword());
509 /* These are required, no defaults */
510 props.setProperty("topic", busTopicParams.getTopic());
512 props.setProperty("Environment", busTopicParams.getEnvironment());
513 props.setProperty("AFT_ENVIRONMENT", busTopicParams.getAftEnvironment());
// Partner and route offer are each written only when non-null.
515 if (busTopicParams.getPartner() != null) {
516 props.setProperty("Partner", busTopicParams.getPartner());
518 if (dme2RouteOffer != null) {
519 props.setProperty(PolicyEndPointProperties.DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer);
522 props.setProperty("Latitude", busTopicParams.getLatitude());
523 props.setProperty("Longitude", busTopicParams.getLongitude());
525 /* These are optional, will default to these values if not set in additionalProps */
526 props.setProperty("AFT_DME2_EP_READ_TIMEOUT_MS", "50000");
527 props.setProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", "240000");
528 props.setProperty("AFT_DME2_EP_CONN_TIMEOUT", "15000");
529 props.setProperty("Version", "1.0");
530 props.setProperty("SubContextPath", "/");
531 props.setProperty("sessionstickinessrequired", "no");
533 /* These should not change */
534 props.setProperty("TransportType", "DME2");
535 props.setProperty("MethodType", "GET");
// NOTE(review): the separator between the https and http assignments is missing from
// this listing; presumably "} else {" - confirm.
537 if (busTopicParams.isUseHttps()) {
538 props.setProperty(PROTOCOL_PROP, "https");
541 props.setProperty(PROTOCOL_PROP, "http");
544 props.setProperty("contenttype", "application/json");
// Additional properties are copied last, so an entry with the same key replaces any
// value set above.
// NOTE(review): java.util.Map is not among the imports visible in this listing - confirm
// it is imported.
546 if (busTopicParams.isAdditionalPropsValid()) {
547 for (Map.Entry<String, String> entry : busTopicParams.getAdditionalProps().entrySet()) {
548 props.put(entry.getKey(), entry.getValue());
// Assigns the property set to a field accessed through the MRClientFactory class name,
// i.e. state that is not specific to this wrapper instance.
// NOTE(review): every new DME2 wrapper overwrites it - confirm this is intended.
552 MRClientFactory.prop = props;
553 this.consumer.setProps(props);
555 logger.info("{}: CREATION", this);
// Creates (but does not throw) the exception reported for a missing DME2 property. The
// property name in the message is PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic + propnm.
558 private IllegalArgumentException parmException(String topic, String propnm) {
559 return new IllegalArgumentException("Missing " + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
560 + topic + propnm + " property for DME2 in DMaaP");