2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017-2020 AT&T Intellectual Property. All rights reserved.
6 * Modifications Copyright (C) 2018 Samsung Electronics Co., Ltd.
7 * Modifications Copyright (C) 2020 Bell Canada. All rights reserved.
8 * ================================================================================
9 * Licensed under the Apache License, Version 2.0 (the "License");
10 * you may not use this file except in compliance with the License.
11 * You may obtain a copy of the License at
13 * http://www.apache.org/licenses/LICENSE-2.0
15 * Unless required by applicable law or agreed to in writing, software
16 * distributed under the License is distributed on an "AS IS" BASIS,
17 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18 * See the License for the specific language governing permissions and
19 * limitations under the License.
20 * ============LICENSE_END=========================================================
23 package org.onap.policy.common.endpoints.event.comm.bus.internal;
25 import com.att.nsa.cambria.client.CambriaClientBuilders;
26 import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder;
27 import com.att.nsa.cambria.client.CambriaConsumer;
28 import java.io.IOException;
29 import java.net.MalformedURLException;
30 import java.security.GeneralSecurityException;
31 import java.util.ArrayList;
33 import java.util.Properties;
34 import java.util.concurrent.CountDownLatch;
35 import java.util.concurrent.TimeUnit;
36 import org.apache.commons.lang3.StringUtils;
37 import org.onap.dmaap.mr.client.MRClientFactory;
38 import org.onap.dmaap.mr.client.impl.MRConsumerImpl;
39 import org.onap.dmaap.mr.client.impl.MRConsumerImpl.MRConsumerImplBuilder;
40 import org.onap.dmaap.mr.client.response.MRConsumerResponse;
41 import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants;
42 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
43 import org.slf4j.Logger;
44 import org.slf4j.LoggerFactory;
47 * Wrapper around libraries to consume from message bus.
49 public interface BusConsumer {
54 * @return list of messages
55 * @throws IOException when error encountered by underlying libraries
57 public Iterable<String> fetch() throws IOException;
60 * close underlying library consumer.
65 * Cambria based consumer.
67 public static class CambriaConsumerWrapper implements BusConsumer {
72 private static Logger logger = LoggerFactory.getLogger(CambriaConsumerWrapper.class);
75 * Used to build the consumer.
77 private final ConsumerBuilder builder;
82 private final CambriaConsumer consumer;
87 protected int fetchTimeout;
92 protected CountDownLatch closeCondition = new CountDownLatch(1);
95 * Cambria Consumer Wrapper.
96 * BusTopicParam object contains the following parameters
97 * servers messaging bus hosts.
100 * apiSecret API Secret
101 * consumerGroup Consumer Group
102 * consumerInstance Consumer Instance
103 * fetchTimeout Fetch Timeout
104 * fetchLimit Fetch Limit
106 * @param busTopicParams - The parameters for the bus topic
107 * @throws GeneralSecurityException - Security exception
108 * @throws MalformedURLException - Malformed URL exception
110 public CambriaConsumerWrapper(BusTopicParams busTopicParams) {
112 this.fetchTimeout = busTopicParams.getFetchTimeout();
114 this.builder = new CambriaClientBuilders.ConsumerBuilder();
116 builder.knownAs(busTopicParams.getConsumerGroup(), busTopicParams.getConsumerInstance())
117 .usingHosts(busTopicParams.getServers()).onTopic(busTopicParams.getTopic())
118 .waitAtServer(fetchTimeout).receivingAtMost(busTopicParams.getFetchLimit());
120 // Set read timeout to fetch timeout + 30 seconds (TBD: this should be configurable)
121 builder.withSocketTimeout(fetchTimeout + 30000);
123 if (busTopicParams.isUseHttps()) {
124 builder.usingHttps();
126 if (busTopicParams.isAllowSelfSignedCerts()) {
127 builder.allowSelfSignedCertificates();
131 if (busTopicParams.isApiKeyValid() && busTopicParams.isApiSecretValid()) {
132 builder.authenticatedBy(busTopicParams.getApiKey(), busTopicParams.getApiSecret());
135 if (busTopicParams.isUserNameValid() && busTopicParams.isPasswordValid()) {
136 builder.authenticatedByHttp(busTopicParams.getUserName(), busTopicParams.getPassword());
140 this.consumer = builder.build();
141 } catch (MalformedURLException | GeneralSecurityException e) {
142 throw new IllegalArgumentException(e);
147 public Iterable<String> fetch() throws IOException {
149 return this.consumer.fetch();
150 } catch (final IOException e) { //NOSONAR
151 logger.error("{}: cannot fetch because of {} - backoff for {} ms.", this, e.getMessage(),
153 sleepAfterFetchFailure();
158 private void sleepAfterFetchFailure() {
160 this.closeCondition.await(this.fetchTimeout, TimeUnit.MILLISECONDS); //NOSONAR
162 } catch (InterruptedException e) {
163 logger.warn("{}: interrupted while handling fetch error", this, e);
164 Thread.currentThread().interrupt();
169 public void close() {
170 this.closeCondition.countDown();
171 this.consumer.close();
175 public String toString() {
176 return "CambriaConsumerWrapper [fetchTimeout=" + fetchTimeout + "]";
183 public abstract class DmaapConsumerWrapper implements BusConsumer {
188 private static Logger logger = LoggerFactory.getLogger(DmaapConsumerWrapper.class);
191 * Name of the "protocol" property.
193 protected static final String PROTOCOL_PROP = "Protocol";
198 protected int fetchTimeout;
203 protected CountDownLatch closeCondition = new CountDownLatch(1);
208 protected MRConsumerImpl consumer;
211 * MR Consumer Wrapper.
213 * <p>servers messaging bus hosts
216 * apiSecret API Secret
218 * password AAF Password
219 * consumerGroup Consumer Group
220 * consumerInstance Consumer Instance
221 * fetchTimeout Fetch Timeout
222 * fetchLimit Fetch Limit
224 * @param busTopicParams contains above listed attributes
225 * @throws MalformedURLException URL should be valid
227 protected DmaapConsumerWrapper(BusTopicParams busTopicParams) throws MalformedURLException {
229 this.fetchTimeout = busTopicParams.getFetchTimeout();
231 if (busTopicParams.isTopicInvalid()) {
232 throw new IllegalArgumentException("No topic for DMaaP");
235 this.consumer = new MRConsumerImplBuilder()
236 .setHostPart(busTopicParams.getServers())
237 .setTopic(busTopicParams.getTopic())
238 .setConsumerGroup(busTopicParams.getConsumerGroup())
239 .setConsumerId(busTopicParams.getConsumerInstance())
240 .setTimeoutMs(busTopicParams.getFetchTimeout())
241 .setLimit(busTopicParams.getFetchLimit())
242 .setApiKey(busTopicParams.getApiKey())
243 .setApiSecret(busTopicParams.getApiSecret())
244 .createMRConsumerImpl();
246 this.consumer.setUsername(busTopicParams.getUserName());
247 this.consumer.setPassword(busTopicParams.getPassword());
251 public Iterable<String> fetch() throws IOException {
252 final MRConsumerResponse response = this.consumer.fetchWithReturnConsumerResponse();
253 if (response == null) {
254 logger.warn("{}: DMaaP NULL response received", this);
256 sleepAfterFetchFailure();
257 return new ArrayList<>();
259 logger.debug("DMaaP consumer received {} : {}", response.getResponseCode(),
260 response.getResponseMessage());
262 if (!"200".equals(response.getResponseCode())) {
264 logger.error("DMaaP consumer received: {} : {}", response.getResponseCode(),
265 response.getResponseMessage());
267 sleepAfterFetchFailure();
273 if (response.getActualMessages() == null) {
274 return new ArrayList<>();
276 return response.getActualMessages();
280 private void sleepAfterFetchFailure() {
282 this.closeCondition.await(this.fetchTimeout, TimeUnit.MILLISECONDS); //NOSONAR
284 } catch (InterruptedException e) {
285 logger.warn("{}: interrupted while handling fetch error", this, e);
286 Thread.currentThread().interrupt();
291 public void close() {
292 this.closeCondition.countDown();
293 this.consumer.close();
297 public String toString() {
298 return "DmaapConsumerWrapper [" + "consumer.getAuthDate()=" + consumer.getAuthDate()
299 + ", consumer.getAuthKey()=" + consumer.getAuthKey() + ", consumer.getHost()=" + consumer.getHost()
300 + ", consumer.getProtocolFlag()=" + consumer.getProtocolFlag() + ", consumer.getUsername()="
301 + consumer.getUsername() + "]";
308 public static class DmaapAafConsumerWrapper extends DmaapConsumerWrapper {
310 private static Logger logger = LoggerFactory.getLogger(DmaapAafConsumerWrapper.class);
312 private final Properties props;
315 * BusTopicParams contain the following parameters.
316 * MR Consumer Wrapper.
318 * <p>servers messaging bus hosts
321 * apiSecret API Secret
323 * aafPassword AAF Password
324 * consumerGroup Consumer Group
325 * consumerInstance Consumer Instance
326 * fetchTimeout Fetch Timeout
327 * fetchLimit Fetch Limit
329 * @param busTopicParams contains above listed params
330 * @throws MalformedURLException URL should be valid
332 public DmaapAafConsumerWrapper(BusTopicParams busTopicParams) throws MalformedURLException {
334 super(busTopicParams);
336 // super constructor sets servers = {""} if empty to avoid errors when using DME2
337 if (busTopicParams.isServersInvalid()) {
338 throw new IllegalArgumentException("Must provide at least one host for HTTP AAF");
341 this.consumer.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue());
343 props = new Properties();
345 if (busTopicParams.isUseHttps()) {
346 props.setProperty(PROTOCOL_PROP, "https");
347 this.consumer.setHost(busTopicParams.getServers().get(0) + ":3905");
350 props.setProperty(PROTOCOL_PROP, "http");
351 this.consumer.setHost(busTopicParams.getServers().get(0) + ":3904");
354 this.consumer.setProps(props);
355 logger.info("{}: CREATION", this);
359 public String toString() {
360 final MRConsumerImpl consumer = this.consumer;
362 return "DmaapConsumerWrapper [" + "consumer.getAuthDate()=" + consumer.getAuthDate()
363 + ", consumer.getAuthKey()=" + consumer.getAuthKey() + ", consumer.getHost()=" + consumer.getHost()
364 + ", consumer.getProtocolFlag()=" + consumer.getProtocolFlag() + ", consumer.getUsername()="
365 + consumer.getUsername() + "]";
369 public static class DmaapDmeConsumerWrapper extends DmaapConsumerWrapper {
371 private static Logger logger = LoggerFactory.getLogger(DmaapDmeConsumerWrapper.class);
373 private final Properties props;
378 * @param busTopicParams topic paramters
380 * @throws MalformedURLException must provide a valid URL
382 public DmaapDmeConsumerWrapper(BusTopicParams busTopicParams) throws MalformedURLException {
385 super(busTopicParams);
388 final String dme2RouteOffer = (busTopicParams.isAdditionalPropsValid()
389 ? busTopicParams.getAdditionalProps().get(
390 PolicyEndPointProperties.DME2_ROUTE_OFFER_PROPERTY)
393 if (busTopicParams.isEnvironmentInvalid()) {
394 throw parmException(busTopicParams.getTopic(),
395 PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX);
397 if (busTopicParams.isAftEnvironmentInvalid()) {
398 throw parmException(busTopicParams.getTopic(),
399 PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX);
401 if (busTopicParams.isLatitudeInvalid()) {
402 throw parmException(busTopicParams.getTopic(),
403 PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX);
405 if (busTopicParams.isLongitudeInvalid()) {
406 throw parmException(busTopicParams.getTopic(),
407 PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX);
410 if ((busTopicParams.isPartnerInvalid())
411 && StringUtils.isBlank(dme2RouteOffer)) {
412 throw new IllegalArgumentException(
413 "Must provide at least " + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
414 + "." + busTopicParams.getTopic()
415 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX + " or "
416 + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
417 + busTopicParams.getTopic()
418 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX + " for DME2");
421 final String serviceName = busTopicParams.getServers().get(0);
423 this.consumer.setProtocolFlag(ProtocolTypeConstants.DME2.getValue());
425 this.consumer.setUsername(busTopicParams.getUserName());
426 this.consumer.setPassword(busTopicParams.getPassword());
428 props = new Properties();
430 props.setProperty(PolicyEndPointProperties.DME2_SERVICE_NAME_PROPERTY, serviceName);
432 props.setProperty("username", busTopicParams.getUserName());
433 props.setProperty("password", busTopicParams.getPassword());
435 /* These are required, no defaults */
436 props.setProperty("topic", busTopicParams.getTopic());
438 props.setProperty("Environment", busTopicParams.getEnvironment());
439 props.setProperty("AFT_ENVIRONMENT", busTopicParams.getAftEnvironment());
441 if (busTopicParams.getPartner() != null) {
442 props.setProperty("Partner", busTopicParams.getPartner());
444 if (dme2RouteOffer != null) {
445 props.setProperty(PolicyEndPointProperties.DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer);
448 props.setProperty("Latitude", busTopicParams.getLatitude());
449 props.setProperty("Longitude", busTopicParams.getLongitude());
451 /* These are optional, will default to these values if not set in additionalProps */
452 props.setProperty("AFT_DME2_EP_READ_TIMEOUT_MS", "50000");
453 props.setProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", "240000");
454 props.setProperty("AFT_DME2_EP_CONN_TIMEOUT", "15000");
455 props.setProperty("Version", "1.0");
456 props.setProperty("SubContextPath", "/");
457 props.setProperty("sessionstickinessrequired", "no");
459 /* These should not change */
460 props.setProperty("TransportType", "DME2");
461 props.setProperty("MethodType", "GET");
463 if (busTopicParams.isUseHttps()) {
464 props.setProperty(PROTOCOL_PROP, "https");
467 props.setProperty(PROTOCOL_PROP, "http");
470 props.setProperty("contenttype", "application/json");
472 if (busTopicParams.isAdditionalPropsValid()) {
473 for (Map.Entry<String, String> entry : busTopicParams.getAdditionalProps().entrySet()) {
474 props.put(entry.getKey(), entry.getValue());
478 MRClientFactory.prop = props;
479 this.consumer.setProps(props);
481 logger.info("{}: CREATION", this);
484 private IllegalArgumentException parmException(String topic, String propnm) {
485 return new IllegalArgumentException("Missing " + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
486 + topic + propnm + " property for DME2 in DMaaP");