2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.common.endpoints.event.comm.bus.internal;
23 import com.att.nsa.cambria.client.CambriaClientBuilders;
24 import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder;
25 import com.att.nsa.cambria.client.CambriaConsumer;
26 import com.att.nsa.mr.client.MRClientFactory;
27 import com.att.nsa.mr.client.impl.MRConsumerImpl;
28 import com.att.nsa.mr.client.response.MRConsumerResponse;
29 import com.att.nsa.mr.test.clients.ProtocolTypeConstants;
31 import java.io.IOException;
32 import java.net.MalformedURLException;
33 import java.security.GeneralSecurityException;
34 import java.util.ArrayList;
35 import java.util.List;
37 import java.util.Properties;
39 import org.onap.policy.common.endpoints.event.comm.bus.DmaapTopicSinkFactory;
40 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
41 import org.slf4j.Logger;
42 import org.slf4j.LoggerFactory;
45 * Wrapper around libraries to consume from message bus
48 public interface BusConsumer {
53 * @return list of messages
54 * @throws Exception when error encountered by underlying libraries
56 public Iterable<String> fetch() throws InterruptedException, IOException;
59 * close underlying library consumer
64 * BusConsumer that supports server-side filtering.
66 public interface FilterableBusConsumer extends BusConsumer {
69 * Sets the server-side filter.
71 * @param filter new filter value, or {@code null}
72 * @throws IllegalArgumentException if the consumer cannot be built with the new filter
74 public void setFilter(String filter);
78 * Cambria based consumer
80 public static class CambriaConsumerWrapper implements FilterableBusConsumer {
85 private static Logger logger = LoggerFactory.getLogger(CambriaConsumerWrapper.class);
88 * Used to build the consumer.
90 private final ConsumerBuilder builder;
93 * Locked while updating {@link #consumer} and {@link #newConsumer}.
95 private final Object consLocker = new Object();
100 private CambriaConsumer consumer;
103 * Cambria client to use for next fetch
105 private CambriaConsumer newConsumer = null;
110 protected int fetchTimeout;
115 protected Object closeCondition = new Object();
/**
 * Cambria Consumer Wrapper without HTTP credentials: delegates to the full constructor,
 * passing {@code null} for both the username and the password.
 *
 * @param servers messaging bus hosts
 * @param topic topic to consume from
 * @param apiKey API Key
 * @param apiSecret API Secret
 * @param consumerGroup Consumer Group
 * @param consumerInstance Consumer Instance
 * @param fetchTimeout Fetch Timeout
 * @param fetchLimit Fetch Limit
 * @param useHttps {@code true} to use HTTPS
 * @param useSelfSignedCerts {@code true} to allow self-signed certificates (only applied with HTTPS)
 * @throws IllegalArgumentException if the underlying consumer cannot be built
 */
public CambriaConsumerWrapper(List<String> servers, String topic, String apiKey, String apiSecret,
        String consumerGroup, String consumerInstance, int fetchTimeout, int fetchLimit, boolean useHttps,
        boolean useSelfSignedCerts) {
    this(servers, topic, apiKey, apiSecret, null, null, consumerGroup, consumerInstance, fetchTimeout,
            fetchLimit, useHttps, useSelfSignedCerts);
}
138 public CambriaConsumerWrapper(List<String> servers, String topic, String apiKey, String apiSecret,
139 String username, String password, String consumerGroup, String consumerInstance, int fetchTimeout,
140 int fetchLimit, boolean useHttps, boolean useSelfSignedCerts) {
142 this.fetchTimeout = fetchTimeout;
144 this.builder = new CambriaClientBuilders.ConsumerBuilder();
146 builder.knownAs(consumerGroup, consumerInstance).usingHosts(servers).onTopic(topic)
147 .waitAtServer(fetchTimeout).receivingAtMost(fetchLimit);
149 // Set read timeout to fetch timeout + 30 seconds (TBD: this should be configurable)
150 builder.withSocketTimeout(fetchTimeout + 30000);
153 builder.usingHttps();
155 if (useSelfSignedCerts) {
156 builder.allowSelfSignedCertificates();
160 if (apiKey != null && !apiKey.isEmpty() && apiSecret != null && !apiSecret.isEmpty()) {
161 builder.authenticatedBy(apiKey, apiSecret);
164 if (username != null && !username.isEmpty() && password != null && !password.isEmpty()) {
165 builder.authenticatedByHttp(username, password);
169 this.consumer = builder.build();
170 } catch (MalformedURLException | GeneralSecurityException e) {
171 throw new IllegalArgumentException(e);
176 public Iterable<String> fetch() throws IOException, InterruptedException {
178 return getCurrentConsumer().fetch();
179 } catch (final IOException e) {
180 logger.error("{}: cannot fetch because of {} - backoff for {} ms.", this, e.getMessage(),
182 synchronized (this.closeCondition) {
183 this.closeCondition.wait(this.fetchTimeout);
191 public void close() {
192 synchronized (closeCondition) {
193 closeCondition.notifyAll();
196 getCurrentConsumer().close();
199 private CambriaConsumer getCurrentConsumer() {
200 CambriaConsumer old = null;
203 synchronized (consLocker) {
204 if (this.newConsumer != null) {
205 // replace old consumer with new consumer
207 this.consumer = this.newConsumer;
208 this.newConsumer = null;
222 public void setFilter(String filter) {
223 logger.info("{}: setting DMAAP server-side filter: {}", this, filter);
224 builder.withServerSideFilter(filter);
227 CambriaConsumer previous;
228 synchronized (consLocker) {
229 previous = this.newConsumer;
230 this.newConsumer = builder.build();
233 if (previous != null) {
234 // there was already a new consumer - close it
238 } catch (MalformedURLException | GeneralSecurityException e) {
240 * Since an exception occurred, "consumer" still has its old value, thus it should
241 * not be closed at this point.
243 throw new IllegalArgumentException(e);
248 public String toString() {
249 return "CambriaConsumerWrapper [fetchTimeout=" + fetchTimeout + "]";
256 public abstract class DmaapConsumerWrapper implements BusConsumer {
261 private static Logger logger = LoggerFactory.getLogger(DmaapConsumerWrapper.class);
264 * Name of the "protocol" property.
266 protected static final String PROTOCOL_PROP = "Protocol";
271 protected int fetchTimeout;
276 protected Object closeCondition = new Object();
281 protected MRConsumerImpl consumer;
284 * MR Consumer Wrapper
286 * @param servers messaging bus hosts
288 * @param apiKey API Key
289 * @param apiSecret API Secret
290 * @param username AAF Login
291 * @param password AAF Password
292 * @param consumerGroup Consumer Group
293 * @param consumerInstance Consumer Instance
294 * @param fetchTimeout Fetch Timeout
295 * @param fetchLimit Fetch Limit
296 * @throws MalformedURLException
298 public DmaapConsumerWrapper(List<String> servers, String topic, String apiKey, String apiSecret,
299 String username, String password, String consumerGroup, String consumerInstance, int fetchTimeout,
300 int fetchLimit) throws MalformedURLException {
302 this.fetchTimeout = fetchTimeout;
304 if (topic == null || topic.isEmpty()) {
305 throw new IllegalArgumentException("No topic for DMaaP");
308 this.consumer = new MRConsumerImpl(servers, topic, consumerGroup, consumerInstance, fetchTimeout,
309 fetchLimit, null, apiKey, apiSecret);
311 this.consumer.setUsername(username);
312 this.consumer.setPassword(password);
316 public Iterable<String> fetch() throws InterruptedException, IOException {
317 final MRConsumerResponse response = this.consumer.fetchWithReturnConsumerResponse();
318 if (response == null) {
319 logger.warn("{}: DMaaP NULL response received", this);
321 synchronized (closeCondition) {
322 closeCondition.wait(fetchTimeout);
324 return new ArrayList<>();
326 logger.debug("DMaaP consumer received {} : {}" + response.getResponseCode(),
327 response.getResponseMessage());
329 if (response.getResponseCode() == null || !"200".equals(response.getResponseCode())) {
331 logger.error("DMaaP consumer received: {} : {}", response.getResponseCode(),
332 response.getResponseMessage());
334 synchronized (closeCondition) {
335 closeCondition.wait(fetchTimeout);
342 if (response.getActualMessages() == null) {
343 return new ArrayList<>();
345 return response.getActualMessages();
350 public void close() {
351 synchronized (closeCondition) {
352 closeCondition.notifyAll();
355 this.consumer.close();
359 public String toString() {
360 return "DmaapConsumerWrapper [" + "consumer.getAuthDate()=" + consumer.getAuthDate()
361 + ", consumer.getAuthKey()=" + consumer.getAuthKey() + ", consumer.getHost()=" + consumer.getHost()
362 + ", consumer.getProtocolFlag()=" + consumer.getProtocolFlag() + ", consumer.getUsername()="
363 + consumer.getUsername() + "]";
370 public static class DmaapAafConsumerWrapper extends DmaapConsumerWrapper {
372 private static Logger logger = LoggerFactory.getLogger(DmaapAafConsumerWrapper.class);
374 private final Properties props;
377 * MR Consumer Wrapper
379 * @param servers messaging bus hosts
381 * @param apiKey API Key
382 * @param apiSecret API Secret
383 * @param aafLogin AAF Login
384 * @param aafPassword AAF Password
385 * @param consumerGroup Consumer Group
386 * @param consumerInstance Consumer Instance
387 * @param fetchTimeout Fetch Timeout
388 * @param fetchLimit Fetch Limit
389 * @throws MalformedURLException
391 public DmaapAafConsumerWrapper(List<String> servers, String topic, String apiKey, String apiSecret,
392 String aafLogin, String aafPassword, String consumerGroup, String consumerInstance, int fetchTimeout,
393 int fetchLimit, boolean useHttps) throws MalformedURLException {
395 super(servers, topic, apiKey, apiSecret, aafLogin, aafPassword, consumerGroup, consumerInstance,
396 fetchTimeout, fetchLimit);
398 // super constructor sets servers = {""} if empty to avoid errors when using DME2
399 if ((servers.size() == 1 && ("".equals(servers.get(0)))) || (servers == null) || (servers.isEmpty())) {
400 throw new IllegalArgumentException("Must provide at least one host for HTTP AAF");
403 this.consumer.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue());
405 props = new Properties();
408 props.setProperty(PROTOCOL_PROP, "https");
409 this.consumer.setHost(servers.get(0) + ":3905");
412 props.setProperty(PROTOCOL_PROP, "http");
413 this.consumer.setHost(servers.get(0) + ":3904");
416 this.consumer.setProps(props);
417 logger.info("{}: CREATION", this);
421 public String toString() {
422 final MRConsumerImpl consumer = this.consumer;
424 return "DmaapConsumerWrapper [" + "consumer.getAuthDate()=" + consumer.getAuthDate()
425 + ", consumer.getAuthKey()=" + consumer.getAuthKey() + ", consumer.getHost()=" + consumer.getHost()
426 + ", consumer.getProtocolFlag()=" + consumer.getProtocolFlag() + ", consumer.getUsername()="
427 + consumer.getUsername() + "]";
431 public static class DmaapDmeConsumerWrapper extends DmaapConsumerWrapper {
433 private static Logger logger = LoggerFactory.getLogger(DmaapDmeConsumerWrapper.class);
435 private final Properties props;
/**
 * MR Consumer Wrapper using DME2.
 *
 * <p>The first entry of {@code servers} is used as the DME2 service name. The environment,
 * AFT environment, latitude and longitude are mandatory, and at least one of the DME2
 * partner or the route offer (looked up in {@code additionalProps}) must be supplied.
 * Entries of {@code additionalProps} are applied last and override the defaults set here.
 *
 * @param servers messaging bus hosts; the first entry is the DME2 service name
 * @param topic topic to consume from
 * @param apiKey API Key
 * @param apiSecret API Secret
 * @param dme2Login DME2 Login
 * @param dme2Password DME2 Password
 * @param consumerGroup Consumer Group
 * @param consumerInstance Consumer Instance
 * @param fetchTimeout Fetch Timeout
 * @param fetchLimit Fetch Limit
 * @param environment DME2 environment
 * @param aftEnvironment AFT environment
 * @param dme2Partner DME2 partner, may be {@code null} if a route offer is provided
 * @param latitude latitude
 * @param longitude longitude
 * @param additionalProps additional properties, may be {@code null}
 * @param useHttps {@code true} to use HTTPS, {@code false} to use HTTP
 * @throws IllegalArgumentException if a mandatory value is missing
 * @throws MalformedURLException declared by the super constructor
 */
public DmaapDmeConsumerWrapper(List<String> servers, String topic, String apiKey, String apiSecret,
        String dme2Login, String dme2Password, String consumerGroup, String consumerInstance, int fetchTimeout,
        int fetchLimit, String environment, String aftEnvironment, String dme2Partner, String latitude,
        String longitude, Map<String, String> additionalProps, boolean useHttps) throws MalformedURLException {

    super(servers, topic, apiKey, apiSecret, dme2Login, dme2Password, consumerGroup, consumerInstance,
            fetchTimeout, fetchLimit);

    // additionalProps is optional (it is null-checked again before being copied below), so
    // the route offer lookup must tolerate null instead of throwing NullPointerException.
    final String dme2RouteOffer = (additionalProps == null) ? null
            : additionalProps.get(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY);

    if (environment == null || environment.isEmpty()) {
        throw parmException(topic, PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX);
    }
    if (aftEnvironment == null || aftEnvironment.isEmpty()) {
        throw parmException(topic, PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX);
    }
    if (latitude == null || latitude.isEmpty()) {
        throw parmException(topic, PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX);
    }
    if (longitude == null || longitude.isEmpty()) {
        throw parmException(topic, PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX);
    }

    if ((dme2Partner == null || dme2Partner.isEmpty())
            && (dme2RouteOffer == null || dme2RouteOffer.isEmpty())) {
        throw new IllegalArgumentException(
                "Must provide at least " + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
                        + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX + " or "
                        + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
                        + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX + " for DME2");
    }

    final String serviceName = servers.get(0);

    this.consumer.setProtocolFlag(ProtocolTypeConstants.DME2.getValue());

    this.consumer.setUsername(dme2Login);
    this.consumer.setPassword(dme2Password);

    props = new Properties();

    props.setProperty(DmaapTopicSinkFactory.DME2_SERVICE_NAME_PROPERTY, serviceName);

    props.setProperty("username", dme2Login);
    props.setProperty("password", dme2Password);

    /* These are required, no defaults */
    props.setProperty("topic", topic);

    props.setProperty("Environment", environment);
    props.setProperty("AFT_ENVIRONMENT", aftEnvironment);

    if (dme2Partner != null) {
        props.setProperty("Partner", dme2Partner);
    }
    if (dme2RouteOffer != null) {
        props.setProperty(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer);
    }

    props.setProperty("Latitude", latitude);
    props.setProperty("Longitude", longitude);

    /* These are optional, will default to these values if not set in additionalProps */
    props.setProperty("AFT_DME2_EP_READ_TIMEOUT_MS", "50000");
    props.setProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", "240000");
    props.setProperty("AFT_DME2_EP_CONN_TIMEOUT", "15000");
    props.setProperty("Version", "1.0");
    props.setProperty("SubContextPath", "/");
    props.setProperty("sessionstickinessrequired", "no");

    /* These should not change */
    props.setProperty("TransportType", "DME2");
    props.setProperty("MethodType", "GET");

    if (useHttps) {
        props.setProperty(PROTOCOL_PROP, "https");
    } else {
        props.setProperty(PROTOCOL_PROP, "http");
    }

    props.setProperty("contenttype", "application/json");

    // caller-supplied properties are applied last so that they override the defaults above
    if (additionalProps != null) {
        for (Map.Entry<String, String> entry : additionalProps.entrySet()) {
            props.put(entry.getKey(), entry.getValue());
        }
    }

    MRClientFactory.prop = props;
    this.consumer.setProps(props);

    logger.info("{}: CREATION", this);
}
535 private IllegalArgumentException parmException(String topic, String propnm) {
536 return new IllegalArgumentException("Missing " + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
537 + topic + propnm + " property for DME2 in DMaaP");