2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017-2020 AT&T Intellectual Property. All rights reserved.
6 * Modifications Copyright (C) 2018 Samsung Electronics Co., Ltd.
7 * ================================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END=========================================================
22 package org.onap.policy.common.endpoints.event.comm.bus.internal;
24 import com.att.nsa.cambria.client.CambriaClientBuilders;
25 import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder;
26 import com.att.nsa.cambria.client.CambriaConsumer;
27 import java.io.IOException;
28 import java.net.MalformedURLException;
29 import java.security.GeneralSecurityException;
30 import java.util.ArrayList;
32 import java.util.Properties;
33 import java.util.concurrent.CountDownLatch;
34 import java.util.concurrent.TimeUnit;
35 import org.apache.commons.lang3.StringUtils;
36 import org.onap.dmaap.mr.client.MRClientFactory;
37 import org.onap.dmaap.mr.client.impl.MRConsumerImpl;
38 import org.onap.dmaap.mr.client.response.MRConsumerResponse;
39 import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants;
40 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
41 import org.slf4j.Logger;
42 import org.slf4j.LoggerFactory;
45 * Wrapper around libraries to consume from message bus.
47 public interface BusConsumer {
52 * @return list of messages
53 * @throws IOException when error encountered by underlying libraries
55 public Iterable<String> fetch() throws IOException;
58 * close underlying library consumer.
63 * BusConsumer that supports server-side filtering.
65 public interface FilterableBusConsumer extends BusConsumer {
68 * Sets the server-side filter.
70 * @param filter new filter value, or {@code null}
71 * @throws IllegalArgumentException if the consumer cannot be built with the new filter
73 public void setFilter(String filter);
77 * Cambria based consumer.
79 public static class CambriaConsumerWrapper implements FilterableBusConsumer {
84 private static Logger logger = LoggerFactory.getLogger(CambriaConsumerWrapper.class);
87 * Used to build the consumer.
89 private final ConsumerBuilder builder;
92 * Locked while updating {@link #consumer} and {@link #newConsumer}.
94 private final Object consLocker = new Object();
99 private CambriaConsumer consumer;
102 * Cambria client to use for next fetch.
104 private CambriaConsumer newConsumer = null;
109 protected int fetchTimeout;
114 protected CountDownLatch closeCondition = new CountDownLatch(1);
117 * Cambria Consumer Wrapper.
118 * BusTopicParam object contains the following parameters
119 * servers messaging bus hosts.
122 * apiSecret API Secret
123 * consumerGroup Consumer Group
124 * consumerInstance Consumer Instance
125 * fetchTimeout Fetch Timeout
126 * fetchLimit Fetch Limit
128 * @param busTopicParams - The parameters for the bus topic
129 * @throws GeneralSecurityException - Security exception
130 * @throws MalformedURLException - Malformed URL exception
132 public CambriaConsumerWrapper(BusTopicParams busTopicParams) {
134 this.fetchTimeout = busTopicParams.getFetchTimeout();
136 this.builder = new CambriaClientBuilders.ConsumerBuilder();
138 builder.knownAs(busTopicParams.getConsumerGroup(), busTopicParams.getConsumerInstance())
139 .usingHosts(busTopicParams.getServers()).onTopic(busTopicParams.getTopic())
140 .waitAtServer(fetchTimeout).receivingAtMost(busTopicParams.getFetchLimit());
142 // Set read timeout to fetch timeout + 30 seconds (TBD: this should be configurable)
143 builder.withSocketTimeout(fetchTimeout + 30000);
145 if (busTopicParams.isUseHttps()) {
146 builder.usingHttps();
148 if (busTopicParams.isAllowSelfSignedCerts()) {
149 builder.allowSelfSignedCertificates();
153 if (busTopicParams.isApiKeyValid() && busTopicParams.isApiSecretValid()) {
154 builder.authenticatedBy(busTopicParams.getApiKey(), busTopicParams.getApiSecret());
157 if (busTopicParams.isUserNameValid() && busTopicParams.isPasswordValid()) {
158 builder.authenticatedByHttp(busTopicParams.getUserName(), busTopicParams.getPassword());
162 this.consumer = builder.build();
163 } catch (MalformedURLException | GeneralSecurityException e) {
164 throw new IllegalArgumentException(e);
169 public Iterable<String> fetch() throws IOException {
171 return getCurrentConsumer().fetch();
172 } catch (final IOException e) { //NOSONAR
173 logger.error("{}: cannot fetch because of {} - backoff for {} ms.", this, e.getMessage(),
175 sleepAfterFetchFailure();
180 private void sleepAfterFetchFailure() {
182 this.closeCondition.await(this.fetchTimeout, TimeUnit.MILLISECONDS); //NOSONAR
184 } catch (InterruptedException e) {
185 logger.warn("{}: interrupted while handling fetch error", this, e);
186 Thread.currentThread().interrupt();
191 public void close() {
192 this.closeCondition.countDown();
193 getCurrentConsumer().close();
196 private CambriaConsumer getCurrentConsumer() {
197 CambriaConsumer old = null;
200 synchronized (consLocker) {
201 if (this.newConsumer != null) {
202 // replace old consumer with new consumer
204 this.consumer = this.newConsumer;
205 this.newConsumer = null;
219 public void setFilter(String filter) {
220 logger.info("{}: setting DMAAP server-side filter: {}", this, filter);
221 builder.withServerSideFilter(filter);
224 CambriaConsumer previous;
225 synchronized (consLocker) {
226 previous = this.newConsumer;
227 this.newConsumer = builder.build();
230 if (previous != null) {
231 // there was already a new consumer - close it
235 } catch (MalformedURLException | GeneralSecurityException e) {
237 * Since an exception occurred, "consumer" still has its old value, thus it should
238 * not be closed at this point.
240 throw new IllegalArgumentException(e);
245 public String toString() {
246 return "CambriaConsumerWrapper [fetchTimeout=" + fetchTimeout + "]";
253 public abstract class DmaapConsumerWrapper implements BusConsumer {
258 private static Logger logger = LoggerFactory.getLogger(DmaapConsumerWrapper.class);
261 * Name of the "protocol" property.
263 protected static final String PROTOCOL_PROP = "Protocol";
268 protected int fetchTimeout;
273 protected CountDownLatch closeCondition = new CountDownLatch(1);
278 protected MRConsumerImpl consumer;
281 * MR Consumer Wrapper.
283 * <p>servers messaging bus hosts
286 * apiSecret API Secret
288 * password AAF Password
289 * consumerGroup Consumer Group
290 * consumerInstance Consumer Instance
291 * fetchTimeout Fetch Timeout
292 * fetchLimit Fetch Limit
294 * @param busTopicParams contains above listed attributes
295 * @throws MalformedURLException URL should be valid
297 public DmaapConsumerWrapper(BusTopicParams busTopicParams) throws MalformedURLException {
299 this.fetchTimeout = busTopicParams.getFetchTimeout();
301 if (busTopicParams.isTopicInvalid()) {
302 throw new IllegalArgumentException("No topic for DMaaP");
305 this.consumer = new MRConsumerImpl(busTopicParams.getServers(), busTopicParams.getTopic(),
306 busTopicParams.getConsumerGroup(), busTopicParams.getConsumerInstance(),
307 busTopicParams.getFetchTimeout(), busTopicParams.getFetchLimit(), null,
308 busTopicParams.getApiKey(), busTopicParams.getApiSecret());
310 this.consumer.setUsername(busTopicParams.getUserName());
311 this.consumer.setPassword(busTopicParams.getPassword());
315 public Iterable<String> fetch() throws IOException {
316 final MRConsumerResponse response = this.consumer.fetchWithReturnConsumerResponse();
317 if (response == null) {
318 logger.warn("{}: DMaaP NULL response received", this);
320 sleepAfterFetchFailure();
321 return new ArrayList<>();
323 logger.debug("DMaaP consumer received {} : {}", response.getResponseCode(),
324 response.getResponseMessage());
326 if (!"200".equals(response.getResponseCode())) {
328 logger.error("DMaaP consumer received: {} : {}", response.getResponseCode(),
329 response.getResponseMessage());
331 sleepAfterFetchFailure();
337 if (response.getActualMessages() == null) {
338 return new ArrayList<>();
340 return response.getActualMessages();
344 private void sleepAfterFetchFailure() {
346 this.closeCondition.await(this.fetchTimeout, TimeUnit.MILLISECONDS); //NOSONAR
348 } catch (InterruptedException e) {
349 logger.warn("{}: interrupted while handling fetch error", this, e);
350 Thread.currentThread().interrupt();
355 public void close() {
356 this.closeCondition.countDown();
357 this.consumer.close();
361 public String toString() {
362 return "DmaapConsumerWrapper [" + "consumer.getAuthDate()=" + consumer.getAuthDate()
363 + ", consumer.getAuthKey()=" + consumer.getAuthKey() + ", consumer.getHost()=" + consumer.getHost()
364 + ", consumer.getProtocolFlag()=" + consumer.getProtocolFlag() + ", consumer.getUsername()="
365 + consumer.getUsername() + "]";
372 public static class DmaapAafConsumerWrapper extends DmaapConsumerWrapper {
374 private static Logger logger = LoggerFactory.getLogger(DmaapAafConsumerWrapper.class);
376 private final Properties props;
379 * BusTopicParams contain the following parameters.
380 * MR Consumer Wrapper.
382 * <p>servers messaging bus hosts
385 * apiSecret API Secret
387 * aafPassword AAF Password
388 * consumerGroup Consumer Group
389 * consumerInstance Consumer Instance
390 * fetchTimeout Fetch Timeout
391 * fetchLimit Fetch Limit
393 * @param busTopicParams contains above listed params
394 * @throws MalformedURLException URL should be valid
396 public DmaapAafConsumerWrapper(BusTopicParams busTopicParams) throws MalformedURLException {
398 super(busTopicParams);
400 // super constructor sets servers = {""} if empty to avoid errors when using DME2
401 if (busTopicParams.isServersInvalid()) {
402 throw new IllegalArgumentException("Must provide at least one host for HTTP AAF");
405 this.consumer.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue());
407 props = new Properties();
409 if (busTopicParams.isUseHttps()) {
410 props.setProperty(PROTOCOL_PROP, "https");
411 this.consumer.setHost(busTopicParams.getServers().get(0) + ":3905");
414 props.setProperty(PROTOCOL_PROP, "http");
415 this.consumer.setHost(busTopicParams.getServers().get(0) + ":3904");
418 this.consumer.setProps(props);
419 logger.info("{}: CREATION", this);
423 public String toString() {
424 final MRConsumerImpl consumer = this.consumer;
426 return "DmaapConsumerWrapper [" + "consumer.getAuthDate()=" + consumer.getAuthDate()
427 + ", consumer.getAuthKey()=" + consumer.getAuthKey() + ", consumer.getHost()=" + consumer.getHost()
428 + ", consumer.getProtocolFlag()=" + consumer.getProtocolFlag() + ", consumer.getUsername()="
429 + consumer.getUsername() + "]";
433 public static class DmaapDmeConsumerWrapper extends DmaapConsumerWrapper {
435 private static Logger logger = LoggerFactory.getLogger(DmaapDmeConsumerWrapper.class);
437 private final Properties props;
442 * @param busTopicParams topic paramters
444 * @throws MalformedURLException must provide a valid URL
446 public DmaapDmeConsumerWrapper(BusTopicParams busTopicParams) throws MalformedURLException {
449 super(busTopicParams);
452 final String dme2RouteOffer = (busTopicParams.isAdditionalPropsValid()
453 ? busTopicParams.getAdditionalProps().get(
454 PolicyEndPointProperties.DME2_ROUTE_OFFER_PROPERTY)
457 if (busTopicParams.isEnvironmentInvalid()) {
458 throw parmException(busTopicParams.getTopic(),
459 PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX);
461 if (busTopicParams.isAftEnvironmentInvalid()) {
462 throw parmException(busTopicParams.getTopic(),
463 PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX);
465 if (busTopicParams.isLatitudeInvalid()) {
466 throw parmException(busTopicParams.getTopic(),
467 PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX);
469 if (busTopicParams.isLongitudeInvalid()) {
470 throw parmException(busTopicParams.getTopic(),
471 PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX);
474 if ((busTopicParams.isPartnerInvalid())
475 && StringUtils.isBlank(dme2RouteOffer)) {
476 throw new IllegalArgumentException(
477 "Must provide at least " + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
478 + "." + busTopicParams.getTopic()
479 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX + " or "
480 + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
481 + busTopicParams.getTopic()
482 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX + " for DME2");
485 final String serviceName = busTopicParams.getServers().get(0);
487 this.consumer.setProtocolFlag(ProtocolTypeConstants.DME2.getValue());
489 this.consumer.setUsername(busTopicParams.getUserName());
490 this.consumer.setPassword(busTopicParams.getPassword());
492 props = new Properties();
494 props.setProperty(PolicyEndPointProperties.DME2_SERVICE_NAME_PROPERTY, serviceName);
496 props.setProperty("username", busTopicParams.getUserName());
497 props.setProperty("password", busTopicParams.getPassword());
499 /* These are required, no defaults */
500 props.setProperty("topic", busTopicParams.getTopic());
502 props.setProperty("Environment", busTopicParams.getEnvironment());
503 props.setProperty("AFT_ENVIRONMENT", busTopicParams.getAftEnvironment());
505 if (busTopicParams.getPartner() != null) {
506 props.setProperty("Partner", busTopicParams.getPartner());
508 if (dme2RouteOffer != null) {
509 props.setProperty(PolicyEndPointProperties.DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer);
512 props.setProperty("Latitude", busTopicParams.getLatitude());
513 props.setProperty("Longitude", busTopicParams.getLongitude());
515 /* These are optional, will default to these values if not set in additionalProps */
516 props.setProperty("AFT_DME2_EP_READ_TIMEOUT_MS", "50000");
517 props.setProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", "240000");
518 props.setProperty("AFT_DME2_EP_CONN_TIMEOUT", "15000");
519 props.setProperty("Version", "1.0");
520 props.setProperty("SubContextPath", "/");
521 props.setProperty("sessionstickinessrequired", "no");
523 /* These should not change */
524 props.setProperty("TransportType", "DME2");
525 props.setProperty("MethodType", "GET");
527 if (busTopicParams.isUseHttps()) {
528 props.setProperty(PROTOCOL_PROP, "https");
531 props.setProperty(PROTOCOL_PROP, "http");
534 props.setProperty("contenttype", "application/json");
536 if (busTopicParams.isAdditionalPropsValid()) {
537 for (Map.Entry<String, String> entry : busTopicParams.getAdditionalProps().entrySet()) {
538 props.put(entry.getKey(), entry.getValue());
542 MRClientFactory.prop = props;
543 this.consumer.setProps(props);
545 logger.info("{}: CREATION", this);
548 private IllegalArgumentException parmException(String topic, String propnm) {
549 return new IllegalArgumentException("Missing " + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
550 + topic + propnm + " property for DME2 in DMaaP");