2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.common.endpoints.event.comm.bus.internal.impl;
23 import com.att.nsa.cambria.client.CambriaClientBuilders;
24 import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder;
25 import com.att.nsa.cambria.client.CambriaConsumer;
27 import java.io.IOException;
28 import java.net.MalformedURLException;
29 import java.security.GeneralSecurityException;
30 import java.util.List;
32 import org.onap.policy.common.endpoints.event.comm.bus.internal.FilterableBusConsumer;
33 import org.slf4j.Logger;
34 import org.slf4j.LoggerFactory;
/**
 * Cambria based consumer.
 */
39 public class CambriaConsumerWrapper implements FilterableBusConsumer {
44 private static Logger logger = LoggerFactory.getLogger(CambriaConsumerWrapper.class);
47 * Used to build the consumer.
49 private final ConsumerBuilder builder;
52 * Locked while updating {@link #consumer} and {@link #newConsumer}.
54 private final Object consLocker = new Object();
59 private CambriaConsumer consumer;
62 * Cambria client to use for next fetch
64 private CambriaConsumer newConsumer = null;
69 protected int fetchTimeout;
74 protected Object closeCondition = new Object();
/**
 * Cambria Consumer Wrapper.
 *
 * <p>Convenience constructor that delegates to the full constructor, passing {@code null} for
 * both the HTTP user name and the HTTP password.
 *
 * @param servers messaging bus hosts
 * @param topic topic
 * @param apiKey API Key
 * @param apiSecret API Secret
 * @param consumerGroup Consumer Group
 * @param consumerInstance Consumer Instance
 * @param fetchTimeout Fetch Timeout
 * @param fetchLimit Fetch Limit
 * @param useHttps whether the client should use HTTPS
 * @param useSelfSignedCerts whether self-signed certificates are allowed
 * @throws IllegalArgumentException if the underlying consumer cannot be built
 */
public CambriaConsumerWrapper(List<String> servers, String topic, String apiKey, String apiSecret,
        String consumerGroup, String consumerInstance, int fetchTimeout, int fetchLimit, boolean useHttps,
        boolean useSelfSignedCerts) {
    this(servers, topic, apiKey, apiSecret, null, null, consumerGroup, consumerInstance, fetchTimeout, fetchLimit,
            useHttps, useSelfSignedCerts);
}
97 public CambriaConsumerWrapper(List<String> servers, String topic, String apiKey, String apiSecret, String username,
98 String password, String consumerGroup, String consumerInstance, int fetchTimeout, int fetchLimit,
99 boolean useHttps, boolean useSelfSignedCerts) {
101 this.fetchTimeout = fetchTimeout;
103 this.builder = new CambriaClientBuilders.ConsumerBuilder();
105 builder.knownAs(consumerGroup, consumerInstance).usingHosts(servers).onTopic(topic).waitAtServer(fetchTimeout)
106 .receivingAtMost(fetchLimit);
108 // Set read timeout to fetch timeout + 30 seconds (TBD: this should be configurable)
109 builder.withSocketTimeout(fetchTimeout + 30000);
112 builder.usingHttps();
114 if (useSelfSignedCerts) {
115 builder.allowSelfSignedCertificates();
119 if (apiKey != null && !apiKey.isEmpty() && apiSecret != null && !apiSecret.isEmpty()) {
120 builder.authenticatedBy(apiKey, apiSecret);
123 if (username != null && !username.isEmpty() && password != null && !password.isEmpty()) {
124 builder.authenticatedByHttp(username, password);
128 this.consumer = builder.build();
129 } catch (MalformedURLException | GeneralSecurityException e) {
130 throw new IllegalArgumentException(e);
135 public Iterable<String> fetch() throws IOException, InterruptedException {
137 return getCurrentConsumer().fetch();
138 } catch (final IOException e) {
139 logger.error("{}: cannot fetch because of {} - backoff for {} ms.", this, e.getMessage(),
141 synchronized (this.closeCondition) {
142 this.closeCondition.wait(this.fetchTimeout);
150 public void close() {
151 synchronized (closeCondition) {
152 closeCondition.notifyAll();
155 getCurrentConsumer().close();
158 private CambriaConsumer getCurrentConsumer() {
159 CambriaConsumer old = null;
162 synchronized (consLocker) {
163 if (this.newConsumer != null) {
164 // replace old consumer with new consumer
166 this.consumer = this.newConsumer;
167 this.newConsumer = null;
181 public void setFilter(String filter) {
182 logger.info("{}: setting DMAAP server-side filter: {}", this, filter);
183 builder.withServerSideFilter(filter);
186 CambriaConsumer previous;
187 synchronized (consLocker) {
188 previous = this.newConsumer;
189 this.newConsumer = builder.build();
192 if (previous != null) {
193 // there was already a new consumer - close it
197 } catch (MalformedURLException | GeneralSecurityException e) {
199 * Since an exception occurred, "consumer" still has its old value, thus it should not
200 * be closed at this point.
202 throw new IllegalArgumentException(e);
207 public String toString() {
208 return "CambriaConsumerWrapper [fetchTimeout=" + fetchTimeout + "]";