2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017-2021 AT&T Intellectual Property. All rights reserved.
6 * Modifications Copyright (C) 2018 Samsung Electronics Co., Ltd.
7 * Modifications Copyright (C) 2020 Bell Canada. All rights reserved.
8 * ================================================================================
9 * Licensed under the Apache License, Version 2.0 (the "License");
10 * you may not use this file except in compliance with the License.
11 * You may obtain a copy of the License at
13 * http://www.apache.org/licenses/LICENSE-2.0
15 * Unless required by applicable law or agreed to in writing, software
16 * distributed under the License is distributed on an "AS IS" BASIS,
17 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18 * See the License for the specific language governing permissions and
19 * limitations under the License.
20 * ============LICENSE_END=========================================================
23 package org.onap.policy.common.endpoints.event.comm.bus.internal;
25 import com.att.nsa.cambria.client.CambriaClientBuilders;
26 import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder;
27 import com.att.nsa.cambria.client.CambriaConsumer;
28 import java.io.IOException;
29 import java.net.MalformedURLException;
30 import java.security.GeneralSecurityException;
31 import java.util.ArrayList;
33 import java.util.Properties;
34 import java.util.concurrent.CountDownLatch;
35 import java.util.concurrent.TimeUnit;
36 import org.apache.commons.lang3.StringUtils;
37 import org.onap.dmaap.mr.client.MRClientFactory;
38 import org.onap.dmaap.mr.client.impl.MRConsumerImpl;
39 import org.onap.dmaap.mr.client.impl.MRConsumerImpl.MRConsumerImplBuilder;
40 import org.onap.dmaap.mr.client.response.MRConsumerResponse;
41 import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants;
42 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
43 import org.slf4j.Logger;
44 import org.slf4j.LoggerFactory;
47 * Wrapper around libraries to consume from message bus.
49 public interface BusConsumer {
54 * @return list of messages
55 * @throws IOException when error encountered by underlying libraries
57 public Iterable<String> fetch() throws IOException;
60 * close underlying library consumer.
65 * Consumer that handles fetch() failures by sleeping.
67 public abstract static class FetchingBusConsumer implements BusConsumer {
68 private static Logger logger = LoggerFactory.getLogger(FetchingBusConsumer.class);
73 protected int fetchTimeout;
76 * Counted down when {@link #close()} is invoked.
78 private CountDownLatch closeCondition = new CountDownLatch(1);
82 * Constructs the object.
84 * @param busTopicParams parameters for the bus topic
86 protected FetchingBusConsumer(BusTopicParams busTopicParams) {
87 this.fetchTimeout = busTopicParams.getFetchTimeout();
91 * Causes the thread to sleep; invoked after fetch() fails. If the consumer is closed,
92 * or the thread is interrupted, then this will return immediately.
94 protected void sleepAfterFetchFailure() {
96 if (this.closeCondition.await(this.fetchTimeout, TimeUnit.MILLISECONDS)) {
97 logger.info("{}: closed while handling fetch error", this);
100 } catch (InterruptedException e) {
101 logger.warn("{}: interrupted while handling fetch error", this, e);
102 Thread.currentThread().interrupt();
107 public void close() {
108 this.closeCondition.countDown();
113 * Cambria based consumer.
115 public static class CambriaConsumerWrapper extends FetchingBusConsumer {
120 private static Logger logger = LoggerFactory.getLogger(CambriaConsumerWrapper.class);
123 * Used to build the consumer.
125 private final ConsumerBuilder builder;
130 private final CambriaConsumer consumer;
133 * Cambria Consumer Wrapper.
134 * BusTopicParam object contains the following parameters
135 * servers messaging bus hosts.
138 * apiSecret API Secret
139 * consumerGroup Consumer Group
140 * consumerInstance Consumer Instance
141 * fetchTimeout Fetch Timeout
142 * fetchLimit Fetch Limit
144 * @param busTopicParams - The parameters for the bus topic
145 * @throws GeneralSecurityException - Security exception
146 * @throws MalformedURLException - Malformed URL exception
148 public CambriaConsumerWrapper(BusTopicParams busTopicParams) {
149 super(busTopicParams);
151 this.builder = new CambriaClientBuilders.ConsumerBuilder();
153 builder.knownAs(busTopicParams.getConsumerGroup(), busTopicParams.getConsumerInstance())
154 .usingHosts(busTopicParams.getServers()).onTopic(busTopicParams.getTopic())
155 .waitAtServer(fetchTimeout).receivingAtMost(busTopicParams.getFetchLimit());
157 // Set read timeout to fetch timeout + 30 seconds (TBD: this should be configurable)
158 builder.withSocketTimeout(fetchTimeout + 30000);
160 if (busTopicParams.isUseHttps()) {
161 builder.usingHttps();
163 if (busTopicParams.isAllowSelfSignedCerts()) {
164 builder.allowSelfSignedCertificates();
168 if (busTopicParams.isApiKeyValid() && busTopicParams.isApiSecretValid()) {
169 builder.authenticatedBy(busTopicParams.getApiKey(), busTopicParams.getApiSecret());
172 if (busTopicParams.isUserNameValid() && busTopicParams.isPasswordValid()) {
173 builder.authenticatedByHttp(busTopicParams.getUserName(), busTopicParams.getPassword());
177 this.consumer = builder.build();
178 } catch (MalformedURLException | GeneralSecurityException e) {
179 throw new IllegalArgumentException(e);
184 public Iterable<String> fetch() throws IOException {
186 return this.consumer.fetch();
187 } catch (final IOException e) { //NOSONAR
188 logger.error("{}: cannot fetch because of {} - backoff for {} ms.", this, e.getMessage(),
190 sleepAfterFetchFailure();
196 public void close() {
198 this.consumer.close();
202 public String toString() {
203 return "CambriaConsumerWrapper [fetchTimeout=" + fetchTimeout + "]";
210 public abstract class DmaapConsumerWrapper extends FetchingBusConsumer {
215 private static Logger logger = LoggerFactory.getLogger(DmaapConsumerWrapper.class);
218 * Name of the "protocol" property.
220 protected static final String PROTOCOL_PROP = "Protocol";
225 protected MRConsumerImpl consumer;
228 * MR Consumer Wrapper.
230 * <p>servers messaging bus hosts
233 * apiSecret API Secret
235 * password AAF Password
236 * consumerGroup Consumer Group
237 * consumerInstance Consumer Instance
238 * fetchTimeout Fetch Timeout
239 * fetchLimit Fetch Limit
241 * @param busTopicParams contains above listed attributes
242 * @throws MalformedURLException URL should be valid
244 protected DmaapConsumerWrapper(BusTopicParams busTopicParams) throws MalformedURLException {
245 super(busTopicParams);
247 if (busTopicParams.isTopicInvalid()) {
248 throw new IllegalArgumentException("No topic for DMaaP");
251 this.consumer = new MRConsumerImplBuilder()
252 .setHostPart(busTopicParams.getServers())
253 .setTopic(busTopicParams.getTopic())
254 .setConsumerGroup(busTopicParams.getConsumerGroup())
255 .setConsumerId(busTopicParams.getConsumerInstance())
256 .setTimeoutMs(busTopicParams.getFetchTimeout())
257 .setLimit(busTopicParams.getFetchLimit())
258 .setApiKey(busTopicParams.getApiKey())
259 .setApiSecret(busTopicParams.getApiSecret())
260 .createMRConsumerImpl();
262 this.consumer.setUsername(busTopicParams.getUserName());
263 this.consumer.setPassword(busTopicParams.getPassword());
267 public Iterable<String> fetch() throws IOException {
268 final MRConsumerResponse response = this.consumer.fetchWithReturnConsumerResponse();
269 if (response == null) {
270 logger.warn("{}: DMaaP NULL response received", this);
272 sleepAfterFetchFailure();
273 return new ArrayList<>();
275 logger.debug("DMaaP consumer received {} : {}", response.getResponseCode(),
276 response.getResponseMessage());
278 if (!"200".equals(response.getResponseCode())) {
280 logger.error("DMaaP consumer received: {} : {}", response.getResponseCode(),
281 response.getResponseMessage());
283 sleepAfterFetchFailure();
289 if (response.getActualMessages() == null) {
290 return new ArrayList<>();
292 return response.getActualMessages();
297 public void close() {
299 this.consumer.close();
303 public String toString() {
304 return "DmaapConsumerWrapper [" + "consumer.getAuthDate()=" + consumer.getAuthDate()
305 + ", consumer.getAuthKey()=" + consumer.getAuthKey() + ", consumer.getHost()=" + consumer.getHost()
306 + ", consumer.getProtocolFlag()=" + consumer.getProtocolFlag() + ", consumer.getUsername()="
307 + consumer.getUsername() + "]";
314 public static class DmaapAafConsumerWrapper extends DmaapConsumerWrapper {
316 private static Logger logger = LoggerFactory.getLogger(DmaapAafConsumerWrapper.class);
318 private final Properties props;
321 * BusTopicParams contain the following parameters.
322 * MR Consumer Wrapper.
324 * <p>servers messaging bus hosts
327 * apiSecret API Secret
329 * aafPassword AAF Password
330 * consumerGroup Consumer Group
331 * consumerInstance Consumer Instance
332 * fetchTimeout Fetch Timeout
333 * fetchLimit Fetch Limit
335 * @param busTopicParams contains above listed params
336 * @throws MalformedURLException URL should be valid
338 public DmaapAafConsumerWrapper(BusTopicParams busTopicParams) throws MalformedURLException {
340 super(busTopicParams);
342 // super constructor sets servers = {""} if empty to avoid errors when using DME2
343 if (busTopicParams.isServersInvalid()) {
344 throw new IllegalArgumentException("Must provide at least one host for HTTP AAF");
347 this.consumer.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue());
349 props = new Properties();
351 if (busTopicParams.isUseHttps()) {
352 props.setProperty(PROTOCOL_PROP, "https");
353 this.consumer.setHost(busTopicParams.getServers().get(0) + ":3905");
356 props.setProperty(PROTOCOL_PROP, "http");
357 this.consumer.setHost(busTopicParams.getServers().get(0) + ":3904");
360 this.consumer.setProps(props);
361 logger.info("{}: CREATION", this);
365 public String toString() {
366 final MRConsumerImpl consumer = this.consumer;
368 return "DmaapConsumerWrapper [" + "consumer.getAuthDate()=" + consumer.getAuthDate()
369 + ", consumer.getAuthKey()=" + consumer.getAuthKey() + ", consumer.getHost()=" + consumer.getHost()
370 + ", consumer.getProtocolFlag()=" + consumer.getProtocolFlag() + ", consumer.getUsername()="
371 + consumer.getUsername() + "]";
375 public static class DmaapDmeConsumerWrapper extends DmaapConsumerWrapper {
377 private static Logger logger = LoggerFactory.getLogger(DmaapDmeConsumerWrapper.class);
379 private final Properties props;
384 * @param busTopicParams topic paramters
386 * @throws MalformedURLException must provide a valid URL
388 public DmaapDmeConsumerWrapper(BusTopicParams busTopicParams) throws MalformedURLException {
391 super(busTopicParams);
394 final String dme2RouteOffer = (busTopicParams.isAdditionalPropsValid()
395 ? busTopicParams.getAdditionalProps().get(
396 PolicyEndPointProperties.DME2_ROUTE_OFFER_PROPERTY)
399 if (busTopicParams.isEnvironmentInvalid()) {
400 throw parmException(busTopicParams.getTopic(),
401 PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX);
403 if (busTopicParams.isAftEnvironmentInvalid()) {
404 throw parmException(busTopicParams.getTopic(),
405 PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX);
407 if (busTopicParams.isLatitudeInvalid()) {
408 throw parmException(busTopicParams.getTopic(),
409 PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX);
411 if (busTopicParams.isLongitudeInvalid()) {
412 throw parmException(busTopicParams.getTopic(),
413 PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX);
416 if ((busTopicParams.isPartnerInvalid())
417 && StringUtils.isBlank(dme2RouteOffer)) {
418 throw new IllegalArgumentException(
419 "Must provide at least " + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
420 + "." + busTopicParams.getTopic()
421 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX + " or "
422 + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
423 + busTopicParams.getTopic()
424 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX + " for DME2");
427 final String serviceName = busTopicParams.getServers().get(0);
429 this.consumer.setProtocolFlag(ProtocolTypeConstants.DME2.getValue());
431 this.consumer.setUsername(busTopicParams.getUserName());
432 this.consumer.setPassword(busTopicParams.getPassword());
434 props = new Properties();
436 props.setProperty(PolicyEndPointProperties.DME2_SERVICE_NAME_PROPERTY, serviceName);
438 props.setProperty("username", busTopicParams.getUserName());
439 props.setProperty("password", busTopicParams.getPassword());
441 /* These are required, no defaults */
442 props.setProperty("topic", busTopicParams.getTopic());
444 props.setProperty("Environment", busTopicParams.getEnvironment());
445 props.setProperty("AFT_ENVIRONMENT", busTopicParams.getAftEnvironment());
447 if (busTopicParams.getPartner() != null) {
448 props.setProperty("Partner", busTopicParams.getPartner());
450 if (dme2RouteOffer != null) {
451 props.setProperty(PolicyEndPointProperties.DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer);
454 props.setProperty("Latitude", busTopicParams.getLatitude());
455 props.setProperty("Longitude", busTopicParams.getLongitude());
457 /* These are optional, will default to these values if not set in additionalProps */
458 props.setProperty("AFT_DME2_EP_READ_TIMEOUT_MS", "50000");
459 props.setProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", "240000");
460 props.setProperty("AFT_DME2_EP_CONN_TIMEOUT", "15000");
461 props.setProperty("Version", "1.0");
462 props.setProperty("SubContextPath", "/");
463 props.setProperty("sessionstickinessrequired", "no");
465 /* These should not change */
466 props.setProperty("TransportType", "DME2");
467 props.setProperty("MethodType", "GET");
469 if (busTopicParams.isUseHttps()) {
470 props.setProperty(PROTOCOL_PROP, "https");
473 props.setProperty(PROTOCOL_PROP, "http");
476 props.setProperty("contenttype", "application/json");
478 if (busTopicParams.isAdditionalPropsValid()) {
479 for (Map.Entry<String, String> entry : busTopicParams.getAdditionalProps().entrySet()) {
480 props.put(entry.getKey(), entry.getValue());
484 MRClientFactory.prop = props;
485 this.consumer.setProps(props);
487 logger.info("{}: CREATION", this);
490 private IllegalArgumentException parmException(String topic, String propnm) {
491 return new IllegalArgumentException("Missing " + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
492 + topic + propnm + " property for DME2 in DMaaP");