2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017-2021 AT&T Intellectual Property. All rights reserved.
6 * Modifications Copyright (C) 2018 Samsung Electronics Co., Ltd.
7 * Modifications Copyright (C) 2020 Bell Canada. All rights reserved.
8 * ================================================================================
9 * Licensed under the Apache License, Version 2.0 (the "License");
10 * you may not use this file except in compliance with the License.
11 * You may obtain a copy of the License at
13 * http://www.apache.org/licenses/LICENSE-2.0
15 * Unless required by applicable law or agreed to in writing, software
16 * distributed under the License is distributed on an "AS IS" BASIS,
17 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18 * See the License for the specific language governing permissions and
19 * limitations under the License.
20 * ============LICENSE_END=========================================================
23 package org.onap.policy.common.endpoints.event.comm.bus.internal;
25 import com.att.nsa.cambria.client.CambriaClientBuilders;
26 import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder;
27 import com.att.nsa.cambria.client.CambriaConsumer;
28 import java.io.IOException;
29 import java.net.MalformedURLException;
30 import java.security.GeneralSecurityException;
31 import java.util.ArrayList;
33 import java.util.Properties;
34 import java.util.concurrent.CountDownLatch;
35 import java.util.concurrent.TimeUnit;
37 import org.apache.commons.lang3.StringUtils;
38 import org.onap.dmaap.mr.client.MRClientFactory;
39 import org.onap.dmaap.mr.client.impl.MRConsumerImpl;
40 import org.onap.dmaap.mr.client.impl.MRConsumerImpl.MRConsumerImplBuilder;
41 import org.onap.dmaap.mr.client.response.MRConsumerResponse;
42 import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants;
43 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
44 import org.slf4j.Logger;
45 import org.slf4j.LoggerFactory;
48 * Wrapper around libraries to consume from message bus.
50 public interface BusConsumer {
55 * @return list of messages
56 * @throws IOException when error encountered by underlying libraries
58 public Iterable<String> fetch() throws IOException;
61 * close underlying library consumer.
66 * Consumer that handles fetch() failures by sleeping.
68 public abstract static class FetchingBusConsumer implements BusConsumer {
69 private static Logger logger = LoggerFactory.getLogger(FetchingBusConsumer.class);
74 protected int fetchTimeout;
77 * Time to sleep on a fetch failure.
80 private final int sleepTime;
83 * Counted down when {@link #close()} is invoked.
85 private final CountDownLatch closeCondition = new CountDownLatch(1);
89 * Constructs the object.
91 * @param busTopicParams parameters for the bus topic
93 protected FetchingBusConsumer(BusTopicParams busTopicParams) {
94 this.fetchTimeout = busTopicParams.getFetchTimeout();
96 if (this.fetchTimeout <= 0) {
97 this.sleepTime = PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH;
99 // don't sleep too long, even if fetch timeout is large
100 this.sleepTime = Math.min(this.fetchTimeout, PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH);
105 * Causes the thread to sleep; invoked after fetch() fails. If the consumer is closed,
106 * or the thread is interrupted, then this will return immediately.
108 protected void sleepAfterFetchFailure() {
110 logger.info("{}: backoff for {}ms", this, sleepTime);
111 if (this.closeCondition.await(this.sleepTime, TimeUnit.MILLISECONDS)) {
112 logger.info("{}: closed while handling fetch error", this);
115 } catch (InterruptedException e) {
116 logger.warn("{}: interrupted while handling fetch error", this, e);
117 Thread.currentThread().interrupt();
122 public void close() {
123 this.closeCondition.countDown();
128 * Cambria based consumer.
130 public static class CambriaConsumerWrapper extends FetchingBusConsumer {
135 private static Logger logger = LoggerFactory.getLogger(CambriaConsumerWrapper.class);
138 * Used to build the consumer.
140 private final ConsumerBuilder builder;
145 private final CambriaConsumer consumer;
148 * Cambria Consumer Wrapper.
149 * BusTopicParam object contains the following parameters
150 * servers messaging bus hosts.
153 * apiSecret API Secret
154 * consumerGroup Consumer Group
155 * consumerInstance Consumer Instance
156 * fetchTimeout Fetch Timeout
157 * fetchLimit Fetch Limit
159 * @param busTopicParams - The parameters for the bus topic
160 * @throws GeneralSecurityException - Security exception
161 * @throws MalformedURLException - Malformed URL exception
163 public CambriaConsumerWrapper(BusTopicParams busTopicParams) {
164 super(busTopicParams);
166 this.builder = new CambriaClientBuilders.ConsumerBuilder();
168 builder.knownAs(busTopicParams.getConsumerGroup(), busTopicParams.getConsumerInstance())
169 .usingHosts(busTopicParams.getServers()).onTopic(busTopicParams.getTopic())
170 .waitAtServer(fetchTimeout).receivingAtMost(busTopicParams.getFetchLimit());
172 // Set read timeout to fetch timeout + 30 seconds (TBD: this should be configurable)
173 builder.withSocketTimeout(fetchTimeout + 30000);
175 if (busTopicParams.isUseHttps()) {
176 builder.usingHttps();
178 if (busTopicParams.isAllowSelfSignedCerts()) {
179 builder.allowSelfSignedCertificates();
183 if (busTopicParams.isApiKeyValid() && busTopicParams.isApiSecretValid()) {
184 builder.authenticatedBy(busTopicParams.getApiKey(), busTopicParams.getApiSecret());
187 if (busTopicParams.isUserNameValid() && busTopicParams.isPasswordValid()) {
188 builder.authenticatedByHttp(busTopicParams.getUserName(), busTopicParams.getPassword());
192 this.consumer = builder.build();
193 } catch (MalformedURLException | GeneralSecurityException e) {
194 throw new IllegalArgumentException(e);
199 public Iterable<String> fetch() throws IOException {
201 return this.consumer.fetch();
202 } catch (final IOException e) { //NOSONAR
203 logger.error("{}: cannot fetch because of {}", this, e.getMessage());
204 sleepAfterFetchFailure();
210 public void close() {
212 this.consumer.close();
216 public String toString() {
217 return "CambriaConsumerWrapper [fetchTimeout=" + fetchTimeout + "]";
224 public abstract class DmaapConsumerWrapper extends FetchingBusConsumer {
229 private static Logger logger = LoggerFactory.getLogger(DmaapConsumerWrapper.class);
232 * Name of the "protocol" property.
234 protected static final String PROTOCOL_PROP = "Protocol";
239 protected MRConsumerImpl consumer;
242 * MR Consumer Wrapper.
244 * <p>servers messaging bus hosts
247 * apiSecret API Secret
249 * password AAF Password
250 * consumerGroup Consumer Group
251 * consumerInstance Consumer Instance
252 * fetchTimeout Fetch Timeout
253 * fetchLimit Fetch Limit
255 * @param busTopicParams contains above listed attributes
256 * @throws MalformedURLException URL should be valid
258 protected DmaapConsumerWrapper(BusTopicParams busTopicParams) throws MalformedURLException {
259 super(busTopicParams);
261 if (busTopicParams.isTopicInvalid()) {
262 throw new IllegalArgumentException("No topic for DMaaP");
265 this.consumer = new MRConsumerImplBuilder()
266 .setHostPart(busTopicParams.getServers())
267 .setTopic(busTopicParams.getTopic())
268 .setConsumerGroup(busTopicParams.getConsumerGroup())
269 .setConsumerId(busTopicParams.getConsumerInstance())
270 .setTimeoutMs(busTopicParams.getFetchTimeout())
271 .setLimit(busTopicParams.getFetchLimit())
272 .setApiKey(busTopicParams.getApiKey())
273 .setApiSecret(busTopicParams.getApiSecret())
274 .createMRConsumerImpl();
276 this.consumer.setUsername(busTopicParams.getUserName());
277 this.consumer.setPassword(busTopicParams.getPassword());
281 public Iterable<String> fetch() throws IOException {
282 final MRConsumerResponse response = this.consumer.fetchWithReturnConsumerResponse();
283 if (response == null) {
284 logger.warn("{}: DMaaP NULL response received", this);
286 sleepAfterFetchFailure();
287 return new ArrayList<>();
289 logger.debug("DMaaP consumer received {} : {}", response.getResponseCode(),
290 response.getResponseMessage());
292 if (!"200".equals(response.getResponseCode())) {
294 logger.error("DMaaP consumer received: {} : {}", response.getResponseCode(),
295 response.getResponseMessage());
297 sleepAfterFetchFailure();
303 if (response.getActualMessages() == null) {
304 return new ArrayList<>();
306 return response.getActualMessages();
311 public void close() {
313 this.consumer.close();
317 public String toString() {
318 return "DmaapConsumerWrapper [" + "consumer.getAuthDate()=" + consumer.getAuthDate()
319 + ", consumer.getAuthKey()=" + consumer.getAuthKey() + ", consumer.getHost()=" + consumer.getHost()
320 + ", consumer.getProtocolFlag()=" + consumer.getProtocolFlag() + ", consumer.getUsername()="
321 + consumer.getUsername() + "]";
328 public static class DmaapAafConsumerWrapper extends DmaapConsumerWrapper {
330 private static Logger logger = LoggerFactory.getLogger(DmaapAafConsumerWrapper.class);
332 private final Properties props;
335 * BusTopicParams contain the following parameters.
336 * MR Consumer Wrapper.
338 * <p>servers messaging bus hosts
341 * apiSecret API Secret
343 * aafPassword AAF Password
344 * consumerGroup Consumer Group
345 * consumerInstance Consumer Instance
346 * fetchTimeout Fetch Timeout
347 * fetchLimit Fetch Limit
349 * @param busTopicParams contains above listed params
350 * @throws MalformedURLException URL should be valid
352 public DmaapAafConsumerWrapper(BusTopicParams busTopicParams) throws MalformedURLException {
354 super(busTopicParams);
356 // super constructor sets servers = {""} if empty to avoid errors when using DME2
357 if (busTopicParams.isServersInvalid()) {
358 throw new IllegalArgumentException("Must provide at least one host for HTTP AAF");
361 this.consumer.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue());
363 props = new Properties();
365 if (busTopicParams.isUseHttps()) {
366 props.setProperty(PROTOCOL_PROP, "https");
367 this.consumer.setHost(busTopicParams.getServers().get(0) + ":3905");
370 props.setProperty(PROTOCOL_PROP, "http");
371 this.consumer.setHost(busTopicParams.getServers().get(0) + ":3904");
374 this.consumer.setProps(props);
375 logger.info("{}: CREATION", this);
379 public String toString() {
380 final MRConsumerImpl consumer = this.consumer;
382 return "DmaapConsumerWrapper [" + "consumer.getAuthDate()=" + consumer.getAuthDate()
383 + ", consumer.getAuthKey()=" + consumer.getAuthKey() + ", consumer.getHost()=" + consumer.getHost()
384 + ", consumer.getProtocolFlag()=" + consumer.getProtocolFlag() + ", consumer.getUsername()="
385 + consumer.getUsername() + "]";
389 public static class DmaapDmeConsumerWrapper extends DmaapConsumerWrapper {
391 private static Logger logger = LoggerFactory.getLogger(DmaapDmeConsumerWrapper.class);
393 private final Properties props;
398 * @param busTopicParams topic paramters
400 * @throws MalformedURLException must provide a valid URL
402 public DmaapDmeConsumerWrapper(BusTopicParams busTopicParams) throws MalformedURLException {
405 super(busTopicParams);
408 final String dme2RouteOffer = (busTopicParams.isAdditionalPropsValid()
409 ? busTopicParams.getAdditionalProps().get(
410 PolicyEndPointProperties.DME2_ROUTE_OFFER_PROPERTY)
413 if (busTopicParams.isEnvironmentInvalid()) {
414 throw parmException(busTopicParams.getTopic(),
415 PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX);
417 if (busTopicParams.isAftEnvironmentInvalid()) {
418 throw parmException(busTopicParams.getTopic(),
419 PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX);
421 if (busTopicParams.isLatitudeInvalid()) {
422 throw parmException(busTopicParams.getTopic(),
423 PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX);
425 if (busTopicParams.isLongitudeInvalid()) {
426 throw parmException(busTopicParams.getTopic(),
427 PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX);
430 if ((busTopicParams.isPartnerInvalid())
431 && StringUtils.isBlank(dme2RouteOffer)) {
432 throw new IllegalArgumentException(
433 "Must provide at least " + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
434 + "." + busTopicParams.getTopic()
435 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX + " or "
436 + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
437 + busTopicParams.getTopic()
438 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX + " for DME2");
441 final String serviceName = busTopicParams.getServers().get(0);
443 this.consumer.setProtocolFlag(ProtocolTypeConstants.DME2.getValue());
445 this.consumer.setUsername(busTopicParams.getUserName());
446 this.consumer.setPassword(busTopicParams.getPassword());
448 props = new Properties();
450 props.setProperty(PolicyEndPointProperties.DME2_SERVICE_NAME_PROPERTY, serviceName);
452 props.setProperty("username", busTopicParams.getUserName());
453 props.setProperty("password", busTopicParams.getPassword());
455 /* These are required, no defaults */
456 props.setProperty("topic", busTopicParams.getTopic());
458 props.setProperty("Environment", busTopicParams.getEnvironment());
459 props.setProperty("AFT_ENVIRONMENT", busTopicParams.getAftEnvironment());
461 if (busTopicParams.getPartner() != null) {
462 props.setProperty("Partner", busTopicParams.getPartner());
464 if (dme2RouteOffer != null) {
465 props.setProperty(PolicyEndPointProperties.DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer);
468 props.setProperty("Latitude", busTopicParams.getLatitude());
469 props.setProperty("Longitude", busTopicParams.getLongitude());
471 /* These are optional, will default to these values if not set in additionalProps */
472 props.setProperty("AFT_DME2_EP_READ_TIMEOUT_MS", "50000");
473 props.setProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", "240000");
474 props.setProperty("AFT_DME2_EP_CONN_TIMEOUT", "15000");
475 props.setProperty("Version", "1.0");
476 props.setProperty("SubContextPath", "/");
477 props.setProperty("sessionstickinessrequired", "no");
479 /* These should not change */
480 props.setProperty("TransportType", "DME2");
481 props.setProperty("MethodType", "GET");
483 if (busTopicParams.isUseHttps()) {
484 props.setProperty(PROTOCOL_PROP, "https");
487 props.setProperty(PROTOCOL_PROP, "http");
490 props.setProperty("contenttype", "application/json");
492 if (busTopicParams.isAdditionalPropsValid()) {
493 for (Map.Entry<String, String> entry : busTopicParams.getAdditionalProps().entrySet()) {
494 props.put(entry.getKey(), entry.getValue());
498 MRClientFactory.prop = props;
499 this.consumer.setProps(props);
501 logger.info("{}: CREATION", this);
504 private IllegalArgumentException parmException(String topic, String propnm) {
505 return new IllegalArgumentException("Missing " + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
506 + topic + propnm + " property for DME2 in DMaaP");