2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved.
6 * Modifications Copyright (C) 2018 Samsung Electronics Co., Ltd.
7 * ================================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END=========================================================
22 package org.onap.policy.common.endpoints.event.comm.bus.internal;
24 import com.att.nsa.cambria.client.CambriaClientBuilders;
25 import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder;
26 import com.att.nsa.cambria.client.CambriaConsumer;
28 import java.io.IOException;
29 import java.net.MalformedURLException;
30 import java.security.GeneralSecurityException;
31 import java.util.ArrayList;
33 import java.util.Properties;
34 import java.util.concurrent.CountDownLatch;
35 import java.util.concurrent.TimeUnit;
36 import org.apache.commons.lang3.StringUtils;
37 import org.onap.dmaap.mr.client.MRClientFactory;
38 import org.onap.dmaap.mr.client.impl.MRConsumerImpl;
39 import org.onap.dmaap.mr.client.response.MRConsumerResponse;
40 import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants;
41 import org.onap.policy.common.endpoints.event.comm.bus.DmaapTopicSinkFactory;
42 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
43 import org.slf4j.Logger;
44 import org.slf4j.LoggerFactory;
47 * Wrapper around libraries to consume from message bus.
49 public interface BusConsumer {
54 * @return list of messages
55 * @throws Exception when error encountered by underlying libraries
57 public Iterable<String> fetch() throws InterruptedException, IOException;
60 * close underlying library consumer.
65 * BusConsumer that supports server-side filtering.
67 public interface FilterableBusConsumer extends BusConsumer {
70 * Sets the server-side filter.
72 * @param filter new filter value, or {@code null}
73 * @throws IllegalArgumentException if the consumer cannot be built with the new filter
75 public void setFilter(String filter);
79 * Cambria based consumer.
81 public static class CambriaConsumerWrapper implements FilterableBusConsumer {
86 private static Logger logger = LoggerFactory.getLogger(CambriaConsumerWrapper.class);
89 * Used to build the consumer.
91 private final ConsumerBuilder builder;
94 * Locked while updating {@link #consumer} and {@link #newConsumer}.
96 private final Object consLocker = new Object();
101 private CambriaConsumer consumer;
104 * Cambria client to use for next fetch.
106 private CambriaConsumer newConsumer = null;
111 protected int fetchTimeout;
116 protected CountDownLatch closeCondition = new CountDownLatch(1);
119 * Cambria Consumer Wrapper.
120 * BusTopicParam object contains the following parameters
121 * servers messaging bus hosts.
124 * apiSecret API Secret
125 * consumerGroup Consumer Group
126 * consumerInstance Consumer Instance
127 * fetchTimeout Fetch Timeout
128 * fetchLimit Fetch Limit
130 * @param busTopicParams - The parameters for the bus topic
131 * @throws GeneralSecurityException - Security exception
132 * @throws MalformedURLException - Malformed URL exception
134 public CambriaConsumerWrapper(BusTopicParams busTopicParams) {
136 this.fetchTimeout = busTopicParams.getFetchTimeout();
138 this.builder = new CambriaClientBuilders.ConsumerBuilder();
140 builder.knownAs(busTopicParams.getConsumerGroup(), busTopicParams.getConsumerInstance())
141 .usingHosts(busTopicParams.getServers()).onTopic(busTopicParams.getTopic())
142 .waitAtServer(fetchTimeout).receivingAtMost(busTopicParams.getFetchLimit());
144 // Set read timeout to fetch timeout + 30 seconds (TBD: this should be configurable)
145 builder.withSocketTimeout(fetchTimeout + 30000);
147 if (busTopicParams.isUseHttps()) {
148 builder.usingHttps();
150 if (busTopicParams.isAllowSelfSignedCerts()) {
151 builder.allowSelfSignedCertificates();
155 if (busTopicParams.isApiKeyValid() && busTopicParams.isApiSecretValid()) {
156 builder.authenticatedBy(busTopicParams.getApiKey(), busTopicParams.getApiSecret());
159 if (busTopicParams.isUserNameValid() && busTopicParams.isPasswordValid()) {
160 builder.authenticatedByHttp(busTopicParams.getUserName(), busTopicParams.getPassword());
164 this.consumer = builder.build();
165 } catch (MalformedURLException | GeneralSecurityException e) {
166 throw new IllegalArgumentException(e);
171 public Iterable<String> fetch() throws IOException, InterruptedException {
173 return getCurrentConsumer().fetch();
174 } catch (final IOException e) {
175 logger.error("{}: cannot fetch because of {} - backoff for {} ms.", this, e.getMessage(),
176 this.fetchTimeout, e);
178 this.closeCondition.await(this.fetchTimeout, TimeUnit.MILLISECONDS);
185 public void close() {
186 this.closeCondition.countDown();
187 getCurrentConsumer().close();
190 private CambriaConsumer getCurrentConsumer() {
191 CambriaConsumer old = null;
194 synchronized (consLocker) {
195 if (this.newConsumer != null) {
196 // replace old consumer with new consumer
198 this.consumer = this.newConsumer;
199 this.newConsumer = null;
213 public void setFilter(String filter) {
214 logger.info("{}: setting DMAAP server-side filter: {}", this, filter);
215 builder.withServerSideFilter(filter);
218 CambriaConsumer previous;
219 synchronized (consLocker) {
220 previous = this.newConsumer;
221 this.newConsumer = builder.build();
224 if (previous != null) {
225 // there was already a new consumer - close it
229 } catch (MalformedURLException | GeneralSecurityException e) {
231 * Since an exception occurred, "consumer" still has its old value, thus it should
232 * not be closed at this point.
234 throw new IllegalArgumentException(e);
239 public String toString() {
240 return "CambriaConsumerWrapper [fetchTimeout=" + fetchTimeout + "]";
247 public abstract class DmaapConsumerWrapper implements BusConsumer {
252 private static Logger logger = LoggerFactory.getLogger(DmaapConsumerWrapper.class);
255 * Name of the "protocol" property.
257 protected static final String PROTOCOL_PROP = "Protocol";
262 protected int fetchTimeout;
267 protected CountDownLatch closeCondition = new CountDownLatch(1);
272 protected MRConsumerImpl consumer;
275 * MR Consumer Wrapper.
277 * <p>servers messaging bus hosts
280 * apiSecret API Secret
282 * password AAF Password
283 * consumerGroup Consumer Group
284 * consumerInstance Consumer Instance
285 * fetchTimeout Fetch Timeout
286 * fetchLimit Fetch Limit
288 * @param busTopicParams contains above listed attributes
289 * @throws MalformedURLException URL should be valid
291 public DmaapConsumerWrapper(BusTopicParams busTopicParams) throws MalformedURLException {
293 this.fetchTimeout = busTopicParams.getFetchTimeout();
295 if (busTopicParams.isTopicInvalid()) {
296 throw new IllegalArgumentException("No topic for DMaaP");
299 this.consumer = new MRConsumerImpl(busTopicParams.getServers(), busTopicParams.getTopic(),
300 busTopicParams.getConsumerGroup(), busTopicParams.getConsumerInstance(),
301 busTopicParams.getFetchTimeout(), busTopicParams.getFetchLimit(), null,
302 busTopicParams.getApiKey(), busTopicParams.getApiSecret());
304 this.consumer.setUsername(busTopicParams.getUserName());
305 this.consumer.setPassword(busTopicParams.getPassword());
309 public Iterable<String> fetch() throws InterruptedException, IOException {
310 final MRConsumerResponse response = this.consumer.fetchWithReturnConsumerResponse();
311 if (response == null) {
312 logger.warn("{}: DMaaP NULL response received", this);
314 closeCondition.await(fetchTimeout, TimeUnit.MILLISECONDS);
315 return new ArrayList<>();
317 logger.debug("DMaaP consumer received {} : {}", response.getResponseCode(),
318 response.getResponseMessage());
320 if (!"200".equals(response.getResponseCode())) {
322 logger.error("DMaaP consumer received: {} : {}", response.getResponseCode(),
323 response.getResponseMessage());
325 closeCondition.await(fetchTimeout, TimeUnit.MILLISECONDS);
331 if (response.getActualMessages() == null) {
332 return new ArrayList<>();
334 return response.getActualMessages();
339 public void close() {
340 this.closeCondition.countDown();
341 this.consumer.close();
345 public String toString() {
346 return "DmaapConsumerWrapper [" + "consumer.getAuthDate()=" + consumer.getAuthDate()
347 + ", consumer.getAuthKey()=" + consumer.getAuthKey() + ", consumer.getHost()=" + consumer.getHost()
348 + ", consumer.getProtocolFlag()=" + consumer.getProtocolFlag() + ", consumer.getUsername()="
349 + consumer.getUsername() + "]";
356 public static class DmaapAafConsumerWrapper extends DmaapConsumerWrapper {
358 private static Logger logger = LoggerFactory.getLogger(DmaapAafConsumerWrapper.class);
360 private final Properties props;
363 * BusTopicParams contain the following parameters.
364 * MR Consumer Wrapper.
366 * <p>servers messaging bus hosts
369 * apiSecret API Secret
371 * aafPassword AAF Password
372 * consumerGroup Consumer Group
373 * consumerInstance Consumer Instance
374 * fetchTimeout Fetch Timeout
375 * fetchLimit Fetch Limit
377 * @param busTopicParams contains above listed params
378 * @throws MalformedURLException URL should be valid
380 public DmaapAafConsumerWrapper(BusTopicParams busTopicParams) throws MalformedURLException {
382 super(busTopicParams);
384 // super constructor sets servers = {""} if empty to avoid errors when using DME2
385 if (busTopicParams.isServersInvalid()) {
386 throw new IllegalArgumentException("Must provide at least one host for HTTP AAF");
389 this.consumer.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue());
391 props = new Properties();
393 if (busTopicParams.isUseHttps()) {
394 props.setProperty(PROTOCOL_PROP, "https");
395 this.consumer.setHost(busTopicParams.getServers().get(0) + ":3905");
398 props.setProperty(PROTOCOL_PROP, "http");
399 this.consumer.setHost(busTopicParams.getServers().get(0) + ":3904");
402 this.consumer.setProps(props);
403 logger.info("{}: CREATION", this);
407 public String toString() {
408 final MRConsumerImpl consumer = this.consumer;
410 return "DmaapConsumerWrapper [" + "consumer.getAuthDate()=" + consumer.getAuthDate()
411 + ", consumer.getAuthKey()=" + consumer.getAuthKey() + ", consumer.getHost()=" + consumer.getHost()
412 + ", consumer.getProtocolFlag()=" + consumer.getProtocolFlag() + ", consumer.getUsername()="
413 + consumer.getUsername() + "]";
417 public static class DmaapDmeConsumerWrapper extends DmaapConsumerWrapper {
419 private static Logger logger = LoggerFactory.getLogger(DmaapDmeConsumerWrapper.class);
421 private final Properties props;
426 * @param busTopicParams topic paramters
428 * @throws MalformedURLException must provide a valid URL
430 public DmaapDmeConsumerWrapper(BusTopicParams busTopicParams) throws MalformedURLException {
433 super(busTopicParams);
436 final String dme2RouteOffer = (busTopicParams.isAdditionalPropsValid()
437 ? busTopicParams.getAdditionalProps().get(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY)
440 if (busTopicParams.isEnvironmentInvalid()) {
441 throw parmException(busTopicParams.getTopic(),
442 PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX);
444 if (busTopicParams.isAftEnvironmentInvalid()) {
445 throw parmException(busTopicParams.getTopic(),
446 PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX);
448 if (busTopicParams.isLatitudeInvalid()) {
449 throw parmException(busTopicParams.getTopic(),
450 PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX);
452 if (busTopicParams.isLongitudeInvalid()) {
453 throw parmException(busTopicParams.getTopic(),
454 PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX);
457 if ((busTopicParams.isPartnerInvalid())
458 && StringUtils.isBlank(dme2RouteOffer)) {
459 throw new IllegalArgumentException(
460 "Must provide at least " + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
461 + "." + busTopicParams.getTopic()
462 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX + " or "
463 + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
464 + busTopicParams.getTopic()
465 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX + " for DME2");
468 final String serviceName = busTopicParams.getServers().get(0);
470 this.consumer.setProtocolFlag(ProtocolTypeConstants.DME2.getValue());
472 this.consumer.setUsername(busTopicParams.getUserName());
473 this.consumer.setPassword(busTopicParams.getPassword());
475 props = new Properties();
477 props.setProperty(DmaapTopicSinkFactory.DME2_SERVICE_NAME_PROPERTY, serviceName);
479 props.setProperty("username", busTopicParams.getUserName());
480 props.setProperty("password", busTopicParams.getPassword());
482 /* These are required, no defaults */
483 props.setProperty("topic", busTopicParams.getTopic());
485 props.setProperty("Environment", busTopicParams.getEnvironment());
486 props.setProperty("AFT_ENVIRONMENT", busTopicParams.getAftEnvironment());
488 if (busTopicParams.getPartner() != null) {
489 props.setProperty("Partner", busTopicParams.getPartner());
491 if (dme2RouteOffer != null) {
492 props.setProperty(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer);
495 props.setProperty("Latitude", busTopicParams.getLatitude());
496 props.setProperty("Longitude", busTopicParams.getLongitude());
498 /* These are optional, will default to these values if not set in additionalProps */
499 props.setProperty("AFT_DME2_EP_READ_TIMEOUT_MS", "50000");
500 props.setProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", "240000");
501 props.setProperty("AFT_DME2_EP_CONN_TIMEOUT", "15000");
502 props.setProperty("Version", "1.0");
503 props.setProperty("SubContextPath", "/");
504 props.setProperty("sessionstickinessrequired", "no");
506 /* These should not change */
507 props.setProperty("TransportType", "DME2");
508 props.setProperty("MethodType", "GET");
510 if (busTopicParams.isUseHttps()) {
511 props.setProperty(PROTOCOL_PROP, "https");
514 props.setProperty(PROTOCOL_PROP, "http");
517 props.setProperty("contenttype", "application/json");
519 if (busTopicParams.isAdditionalPropsValid()) {
520 for (Map.Entry<String, String> entry : busTopicParams.getAdditionalProps().entrySet()) {
521 props.put(entry.getKey(), entry.getValue());
525 MRClientFactory.prop = props;
526 this.consumer.setProps(props);
528 logger.info("{}: CREATION", this);
531 private IllegalArgumentException parmException(String topic, String propnm) {
532 return new IllegalArgumentException("Missing " + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
533 + topic + propnm + " property for DME2 in DMaaP");