2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017-2020 AT&T Intellectual Property. All rights reserved.
6 * Modifications Copyright (C) 2018 Samsung Electronics Co., Ltd.
7 * ================================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END=========================================================
22 package org.onap.policy.common.endpoints.event.comm.bus.internal;
24 import com.att.nsa.cambria.client.CambriaClientBuilders;
25 import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder;
26 import com.att.nsa.cambria.client.CambriaConsumer;
27 import java.io.IOException;
28 import java.net.MalformedURLException;
29 import java.security.GeneralSecurityException;
30 import java.util.ArrayList;
32 import java.util.Properties;
33 import java.util.concurrent.CountDownLatch;
34 import java.util.concurrent.TimeUnit;
35 import org.apache.commons.lang3.StringUtils;
36 import org.onap.dmaap.mr.client.MRClientFactory;
37 import org.onap.dmaap.mr.client.impl.MRConsumerImpl;
38 import org.onap.dmaap.mr.client.impl.MRConsumerImpl.MRConsumerImplBuilder;
39 import org.onap.dmaap.mr.client.response.MRConsumerResponse;
40 import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants;
41 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
42 import org.slf4j.Logger;
43 import org.slf4j.LoggerFactory;
46 * Wrapper around libraries to consume from message bus.
48 public interface BusConsumer {
53 * @return list of messages
54 * @throws IOException when error encountered by underlying libraries
56 public Iterable<String> fetch() throws IOException;
59 * close underlying library consumer.
64 * BusConsumer that supports server-side filtering.
66 public interface FilterableBusConsumer extends BusConsumer {
69 * Sets the server-side filter.
71 * @param filter new filter value, or {@code null}
72 * @throws IllegalArgumentException if the consumer cannot be built with the new filter
74 public void setFilter(String filter);
78 * Cambria based consumer.
80 public static class CambriaConsumerWrapper implements FilterableBusConsumer {
85 private static Logger logger = LoggerFactory.getLogger(CambriaConsumerWrapper.class);
88 * Used to build the consumer.
90 private final ConsumerBuilder builder;
93 * Locked while updating {@link #consumer} and {@link #newConsumer}.
95 private final Object consLocker = new Object();
100 private CambriaConsumer consumer;
103 * Cambria client to use for next fetch.
105 private CambriaConsumer newConsumer = null;
110 protected int fetchTimeout;
115 protected CountDownLatch closeCondition = new CountDownLatch(1);
118 * Cambria Consumer Wrapper.
119 * BusTopicParam object contains the following parameters
120 * servers messaging bus hosts.
123 * apiSecret API Secret
124 * consumerGroup Consumer Group
125 * consumerInstance Consumer Instance
126 * fetchTimeout Fetch Timeout
127 * fetchLimit Fetch Limit
129 * @param busTopicParams - The parameters for the bus topic
130 * @throws GeneralSecurityException - Security exception
131 * @throws MalformedURLException - Malformed URL exception
133 public CambriaConsumerWrapper(BusTopicParams busTopicParams) {
135 this.fetchTimeout = busTopicParams.getFetchTimeout();
137 this.builder = new CambriaClientBuilders.ConsumerBuilder();
139 builder.knownAs(busTopicParams.getConsumerGroup(), busTopicParams.getConsumerInstance())
140 .usingHosts(busTopicParams.getServers()).onTopic(busTopicParams.getTopic())
141 .waitAtServer(fetchTimeout).receivingAtMost(busTopicParams.getFetchLimit());
143 // Set read timeout to fetch timeout + 30 seconds (TBD: this should be configurable)
144 builder.withSocketTimeout(fetchTimeout + 30000);
146 if (busTopicParams.isUseHttps()) {
147 builder.usingHttps();
149 if (busTopicParams.isAllowSelfSignedCerts()) {
150 builder.allowSelfSignedCertificates();
154 if (busTopicParams.isApiKeyValid() && busTopicParams.isApiSecretValid()) {
155 builder.authenticatedBy(busTopicParams.getApiKey(), busTopicParams.getApiSecret());
158 if (busTopicParams.isUserNameValid() && busTopicParams.isPasswordValid()) {
159 builder.authenticatedByHttp(busTopicParams.getUserName(), busTopicParams.getPassword());
163 this.consumer = builder.build();
164 } catch (MalformedURLException | GeneralSecurityException e) {
165 throw new IllegalArgumentException(e);
170 public Iterable<String> fetch() throws IOException {
172 return getCurrentConsumer().fetch();
173 } catch (final IOException e) { //NOSONAR
174 logger.error("{}: cannot fetch because of {} - backoff for {} ms.", this, e.getMessage(),
176 sleepAfterFetchFailure();
181 private void sleepAfterFetchFailure() {
183 this.closeCondition.await(this.fetchTimeout, TimeUnit.MILLISECONDS); //NOSONAR
185 } catch (InterruptedException e) {
186 logger.warn("{}: interrupted while handling fetch error", this, e);
187 Thread.currentThread().interrupt();
192 public void close() {
193 this.closeCondition.countDown();
194 getCurrentConsumer().close();
197 private CambriaConsumer getCurrentConsumer() {
198 CambriaConsumer old = null;
201 synchronized (consLocker) {
202 if (this.newConsumer != null) {
203 // replace old consumer with new consumer
205 this.consumer = this.newConsumer;
206 this.newConsumer = null;
220 public void setFilter(String filter) {
221 logger.info("{}: setting DMAAP server-side filter: {}", this, filter);
222 builder.withServerSideFilter(filter);
225 CambriaConsumer previous;
226 synchronized (consLocker) {
227 previous = this.newConsumer;
228 this.newConsumer = builder.build();
231 if (previous != null) {
232 // there was already a new consumer - close it
236 } catch (MalformedURLException | GeneralSecurityException e) {
238 * Since an exception occurred, "consumer" still has its old value, thus it should
239 * not be closed at this point.
241 throw new IllegalArgumentException(e);
246 public String toString() {
247 return "CambriaConsumerWrapper [fetchTimeout=" + fetchTimeout + "]";
254 public abstract class DmaapConsumerWrapper implements BusConsumer {
259 private static Logger logger = LoggerFactory.getLogger(DmaapConsumerWrapper.class);
262 * Name of the "protocol" property.
264 protected static final String PROTOCOL_PROP = "Protocol";
269 protected int fetchTimeout;
274 protected CountDownLatch closeCondition = new CountDownLatch(1);
279 protected MRConsumerImpl consumer;
282 * MR Consumer Wrapper.
284 * <p>servers messaging bus hosts
287 * apiSecret API Secret
289 * password AAF Password
290 * consumerGroup Consumer Group
291 * consumerInstance Consumer Instance
292 * fetchTimeout Fetch Timeout
293 * fetchLimit Fetch Limit
295 * @param busTopicParams contains above listed attributes
296 * @throws MalformedURLException URL should be valid
298 public DmaapConsumerWrapper(BusTopicParams busTopicParams) throws MalformedURLException {
300 this.fetchTimeout = busTopicParams.getFetchTimeout();
302 if (busTopicParams.isTopicInvalid()) {
303 throw new IllegalArgumentException("No topic for DMaaP");
306 this.consumer = new MRConsumerImplBuilder()
307 .setHostPart(busTopicParams.getServers())
308 .setTopic(busTopicParams.getTopic())
309 .setConsumerGroup(busTopicParams.getConsumerGroup())
310 .setConsumerId(busTopicParams.getConsumerInstance())
311 .setTimeoutMs(busTopicParams.getFetchTimeout())
312 .setLimit(busTopicParams.getFetchLimit())
313 .setApiKey(busTopicParams.getApiKey())
314 .setApiSecret(busTopicParams.getApiSecret())
315 .createMRConsumerImpl();
317 this.consumer.setUsername(busTopicParams.getUserName());
318 this.consumer.setPassword(busTopicParams.getPassword());
322 public Iterable<String> fetch() throws IOException {
323 final MRConsumerResponse response = this.consumer.fetchWithReturnConsumerResponse();
324 if (response == null) {
325 logger.warn("{}: DMaaP NULL response received", this);
327 sleepAfterFetchFailure();
328 return new ArrayList<>();
330 logger.debug("DMaaP consumer received {} : {}", response.getResponseCode(),
331 response.getResponseMessage());
333 if (!"200".equals(response.getResponseCode())) {
335 logger.error("DMaaP consumer received: {} : {}", response.getResponseCode(),
336 response.getResponseMessage());
338 sleepAfterFetchFailure();
344 if (response.getActualMessages() == null) {
345 return new ArrayList<>();
347 return response.getActualMessages();
351 private void sleepAfterFetchFailure() {
353 this.closeCondition.await(this.fetchTimeout, TimeUnit.MILLISECONDS); //NOSONAR
355 } catch (InterruptedException e) {
356 logger.warn("{}: interrupted while handling fetch error", this, e);
357 Thread.currentThread().interrupt();
362 public void close() {
363 this.closeCondition.countDown();
364 this.consumer.close();
368 public String toString() {
369 return "DmaapConsumerWrapper [" + "consumer.getAuthDate()=" + consumer.getAuthDate()
370 + ", consumer.getAuthKey()=" + consumer.getAuthKey() + ", consumer.getHost()=" + consumer.getHost()
371 + ", consumer.getProtocolFlag()=" + consumer.getProtocolFlag() + ", consumer.getUsername()="
372 + consumer.getUsername() + "]";
379 public static class DmaapAafConsumerWrapper extends DmaapConsumerWrapper {
381 private static Logger logger = LoggerFactory.getLogger(DmaapAafConsumerWrapper.class);
383 private final Properties props;
386 * BusTopicParams contain the following parameters.
387 * MR Consumer Wrapper.
389 * <p>servers messaging bus hosts
392 * apiSecret API Secret
394 * aafPassword AAF Password
395 * consumerGroup Consumer Group
396 * consumerInstance Consumer Instance
397 * fetchTimeout Fetch Timeout
398 * fetchLimit Fetch Limit
400 * @param busTopicParams contains above listed params
401 * @throws MalformedURLException URL should be valid
403 public DmaapAafConsumerWrapper(BusTopicParams busTopicParams) throws MalformedURLException {
405 super(busTopicParams);
407 // super constructor sets servers = {""} if empty to avoid errors when using DME2
408 if (busTopicParams.isServersInvalid()) {
409 throw new IllegalArgumentException("Must provide at least one host for HTTP AAF");
412 this.consumer.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue());
414 props = new Properties();
416 if (busTopicParams.isUseHttps()) {
417 props.setProperty(PROTOCOL_PROP, "https");
418 this.consumer.setHost(busTopicParams.getServers().get(0) + ":3905");
421 props.setProperty(PROTOCOL_PROP, "http");
422 this.consumer.setHost(busTopicParams.getServers().get(0) + ":3904");
425 this.consumer.setProps(props);
426 logger.info("{}: CREATION", this);
430 public String toString() {
431 final MRConsumerImpl consumer = this.consumer;
433 return "DmaapConsumerWrapper [" + "consumer.getAuthDate()=" + consumer.getAuthDate()
434 + ", consumer.getAuthKey()=" + consumer.getAuthKey() + ", consumer.getHost()=" + consumer.getHost()
435 + ", consumer.getProtocolFlag()=" + consumer.getProtocolFlag() + ", consumer.getUsername()="
436 + consumer.getUsername() + "]";
440 public static class DmaapDmeConsumerWrapper extends DmaapConsumerWrapper {
442 private static Logger logger = LoggerFactory.getLogger(DmaapDmeConsumerWrapper.class);
444 private final Properties props;
449 * @param busTopicParams topic paramters
451 * @throws MalformedURLException must provide a valid URL
453 public DmaapDmeConsumerWrapper(BusTopicParams busTopicParams) throws MalformedURLException {
456 super(busTopicParams);
459 final String dme2RouteOffer = (busTopicParams.isAdditionalPropsValid()
460 ? busTopicParams.getAdditionalProps().get(
461 PolicyEndPointProperties.DME2_ROUTE_OFFER_PROPERTY)
464 if (busTopicParams.isEnvironmentInvalid()) {
465 throw parmException(busTopicParams.getTopic(),
466 PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX);
468 if (busTopicParams.isAftEnvironmentInvalid()) {
469 throw parmException(busTopicParams.getTopic(),
470 PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX);
472 if (busTopicParams.isLatitudeInvalid()) {
473 throw parmException(busTopicParams.getTopic(),
474 PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX);
476 if (busTopicParams.isLongitudeInvalid()) {
477 throw parmException(busTopicParams.getTopic(),
478 PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX);
481 if ((busTopicParams.isPartnerInvalid())
482 && StringUtils.isBlank(dme2RouteOffer)) {
483 throw new IllegalArgumentException(
484 "Must provide at least " + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
485 + "." + busTopicParams.getTopic()
486 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX + " or "
487 + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
488 + busTopicParams.getTopic()
489 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX + " for DME2");
492 final String serviceName = busTopicParams.getServers().get(0);
494 this.consumer.setProtocolFlag(ProtocolTypeConstants.DME2.getValue());
496 this.consumer.setUsername(busTopicParams.getUserName());
497 this.consumer.setPassword(busTopicParams.getPassword());
499 props = new Properties();
501 props.setProperty(PolicyEndPointProperties.DME2_SERVICE_NAME_PROPERTY, serviceName);
503 props.setProperty("username", busTopicParams.getUserName());
504 props.setProperty("password", busTopicParams.getPassword());
506 /* These are required, no defaults */
507 props.setProperty("topic", busTopicParams.getTopic());
509 props.setProperty("Environment", busTopicParams.getEnvironment());
510 props.setProperty("AFT_ENVIRONMENT", busTopicParams.getAftEnvironment());
512 if (busTopicParams.getPartner() != null) {
513 props.setProperty("Partner", busTopicParams.getPartner());
515 if (dme2RouteOffer != null) {
516 props.setProperty(PolicyEndPointProperties.DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer);
519 props.setProperty("Latitude", busTopicParams.getLatitude());
520 props.setProperty("Longitude", busTopicParams.getLongitude());
522 /* These are optional, will default to these values if not set in additionalProps */
523 props.setProperty("AFT_DME2_EP_READ_TIMEOUT_MS", "50000");
524 props.setProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", "240000");
525 props.setProperty("AFT_DME2_EP_CONN_TIMEOUT", "15000");
526 props.setProperty("Version", "1.0");
527 props.setProperty("SubContextPath", "/");
528 props.setProperty("sessionstickinessrequired", "no");
530 /* These should not change */
531 props.setProperty("TransportType", "DME2");
532 props.setProperty("MethodType", "GET");
534 if (busTopicParams.isUseHttps()) {
535 props.setProperty(PROTOCOL_PROP, "https");
538 props.setProperty(PROTOCOL_PROP, "http");
541 props.setProperty("contenttype", "application/json");
543 if (busTopicParams.isAdditionalPropsValid()) {
544 for (Map.Entry<String, String> entry : busTopicParams.getAdditionalProps().entrySet()) {
545 props.put(entry.getKey(), entry.getValue());
549 MRClientFactory.prop = props;
550 this.consumer.setProps(props);
552 logger.info("{}: CREATION", this);
555 private IllegalArgumentException parmException(String topic, String propnm) {
556 return new IllegalArgumentException("Missing " + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
557 + topic + propnm + " property for DME2 in DMaaP");