2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017-2020 AT&T Intellectual Property. All rights reserved.
6 * Modifications Copyright (C) 2018 Samsung Electronics Co., Ltd.
7 * ================================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END=========================================================
22 package org.onap.policy.common.endpoints.event.comm.bus.internal;
24 import com.att.nsa.cambria.client.CambriaClientBuilders;
25 import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder;
26 import com.att.nsa.cambria.client.CambriaConsumer;
27 import java.io.IOException;
28 import java.net.MalformedURLException;
29 import java.security.GeneralSecurityException;
30 import java.util.ArrayList;
32 import java.util.Properties;
33 import java.util.concurrent.CountDownLatch;
34 import java.util.concurrent.TimeUnit;
35 import org.apache.commons.lang3.StringUtils;
36 import org.onap.dmaap.mr.client.MRClientFactory;
37 import org.onap.dmaap.mr.client.impl.MRConsumerImpl;
38 import org.onap.dmaap.mr.client.impl.MRConsumerImpl.MRConsumerImplBuilder;
39 import org.onap.dmaap.mr.client.response.MRConsumerResponse;
40 import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants;
41 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
42 import org.slf4j.Logger;
43 import org.slf4j.LoggerFactory;
46 * Wrapper around libraries to consume from message bus.
48 public interface BusConsumer {
53 * @return list of messages
54 * @throws IOException when error encountered by underlying libraries
56 public Iterable<String> fetch() throws IOException;
59 * close underlying library consumer.
64 * Cambria based consumer.
66 public static class CambriaConsumerWrapper implements BusConsumer {
71 private static Logger logger = LoggerFactory.getLogger(CambriaConsumerWrapper.class);
74 * Used to build the consumer.
76 private final ConsumerBuilder builder;
81 private final CambriaConsumer consumer;
86 protected int fetchTimeout;
91 protected CountDownLatch closeCondition = new CountDownLatch(1);
94 * Cambria Consumer Wrapper.
95 * BusTopicParam object contains the following parameters
96 * servers messaging bus hosts.
99 * apiSecret API Secret
100 * consumerGroup Consumer Group
101 * consumerInstance Consumer Instance
102 * fetchTimeout Fetch Timeout
103 * fetchLimit Fetch Limit
105 * @param busTopicParams - The parameters for the bus topic
106 * @throws GeneralSecurityException - Security exception
107 * @throws MalformedURLException - Malformed URL exception
109 public CambriaConsumerWrapper(BusTopicParams busTopicParams) {
111 this.fetchTimeout = busTopicParams.getFetchTimeout();
113 this.builder = new CambriaClientBuilders.ConsumerBuilder();
115 builder.knownAs(busTopicParams.getConsumerGroup(), busTopicParams.getConsumerInstance())
116 .usingHosts(busTopicParams.getServers()).onTopic(busTopicParams.getTopic())
117 .waitAtServer(fetchTimeout).receivingAtMost(busTopicParams.getFetchLimit());
119 // Set read timeout to fetch timeout + 30 seconds (TBD: this should be configurable)
120 builder.withSocketTimeout(fetchTimeout + 30000);
122 if (busTopicParams.isUseHttps()) {
123 builder.usingHttps();
125 if (busTopicParams.isAllowSelfSignedCerts()) {
126 builder.allowSelfSignedCertificates();
130 if (busTopicParams.isApiKeyValid() && busTopicParams.isApiSecretValid()) {
131 builder.authenticatedBy(busTopicParams.getApiKey(), busTopicParams.getApiSecret());
134 if (busTopicParams.isUserNameValid() && busTopicParams.isPasswordValid()) {
135 builder.authenticatedByHttp(busTopicParams.getUserName(), busTopicParams.getPassword());
139 this.consumer = builder.build();
140 } catch (MalformedURLException | GeneralSecurityException e) {
141 throw new IllegalArgumentException(e);
146 public Iterable<String> fetch() throws IOException {
148 return this.consumer.fetch();
149 } catch (final IOException e) { //NOSONAR
150 logger.error("{}: cannot fetch because of {} - backoff for {} ms.", this, e.getMessage(),
152 sleepAfterFetchFailure();
157 private void sleepAfterFetchFailure() {
159 this.closeCondition.await(this.fetchTimeout, TimeUnit.MILLISECONDS); //NOSONAR
161 } catch (InterruptedException e) {
162 logger.warn("{}: interrupted while handling fetch error", this, e);
163 Thread.currentThread().interrupt();
168 public void close() {
169 this.closeCondition.countDown();
170 this.consumer.close();
174 public String toString() {
175 return "CambriaConsumerWrapper [fetchTimeout=" + fetchTimeout + "]";
182 public abstract class DmaapConsumerWrapper implements BusConsumer {
187 private static Logger logger = LoggerFactory.getLogger(DmaapConsumerWrapper.class);
190 * Name of the "protocol" property.
192 protected static final String PROTOCOL_PROP = "Protocol";
197 protected int fetchTimeout;
202 protected CountDownLatch closeCondition = new CountDownLatch(1);
207 protected MRConsumerImpl consumer;
210 * MR Consumer Wrapper.
212 * <p>servers messaging bus hosts
215 * apiSecret API Secret
217 * password AAF Password
218 * consumerGroup Consumer Group
219 * consumerInstance Consumer Instance
220 * fetchTimeout Fetch Timeout
221 * fetchLimit Fetch Limit
223 * @param busTopicParams contains above listed attributes
224 * @throws MalformedURLException URL should be valid
226 public DmaapConsumerWrapper(BusTopicParams busTopicParams) throws MalformedURLException {
228 this.fetchTimeout = busTopicParams.getFetchTimeout();
230 if (busTopicParams.isTopicInvalid()) {
231 throw new IllegalArgumentException("No topic for DMaaP");
234 this.consumer = new MRConsumerImplBuilder()
235 .setHostPart(busTopicParams.getServers())
236 .setTopic(busTopicParams.getTopic())
237 .setConsumerGroup(busTopicParams.getConsumerGroup())
238 .setConsumerId(busTopicParams.getConsumerInstance())
239 .setTimeoutMs(busTopicParams.getFetchTimeout())
240 .setLimit(busTopicParams.getFetchLimit())
241 .setApiKey(busTopicParams.getApiKey())
242 .setApiSecret(busTopicParams.getApiSecret())
243 .createMRConsumerImpl();
245 this.consumer.setUsername(busTopicParams.getUserName());
246 this.consumer.setPassword(busTopicParams.getPassword());
250 public Iterable<String> fetch() throws IOException {
251 final MRConsumerResponse response = this.consumer.fetchWithReturnConsumerResponse();
252 if (response == null) {
253 logger.warn("{}: DMaaP NULL response received", this);
255 sleepAfterFetchFailure();
256 return new ArrayList<>();
258 logger.debug("DMaaP consumer received {} : {}", response.getResponseCode(),
259 response.getResponseMessage());
261 if (!"200".equals(response.getResponseCode())) {
263 logger.error("DMaaP consumer received: {} : {}", response.getResponseCode(),
264 response.getResponseMessage());
266 sleepAfterFetchFailure();
272 if (response.getActualMessages() == null) {
273 return new ArrayList<>();
275 return response.getActualMessages();
279 private void sleepAfterFetchFailure() {
281 this.closeCondition.await(this.fetchTimeout, TimeUnit.MILLISECONDS); //NOSONAR
283 } catch (InterruptedException e) {
284 logger.warn("{}: interrupted while handling fetch error", this, e);
285 Thread.currentThread().interrupt();
290 public void close() {
291 this.closeCondition.countDown();
292 this.consumer.close();
296 public String toString() {
297 return "DmaapConsumerWrapper [" + "consumer.getAuthDate()=" + consumer.getAuthDate()
298 + ", consumer.getAuthKey()=" + consumer.getAuthKey() + ", consumer.getHost()=" + consumer.getHost()
299 + ", consumer.getProtocolFlag()=" + consumer.getProtocolFlag() + ", consumer.getUsername()="
300 + consumer.getUsername() + "]";
307 public static class DmaapAafConsumerWrapper extends DmaapConsumerWrapper {
309 private static Logger logger = LoggerFactory.getLogger(DmaapAafConsumerWrapper.class);
311 private final Properties props;
314 * BusTopicParams contain the following parameters.
315 * MR Consumer Wrapper.
317 * <p>servers messaging bus hosts
320 * apiSecret API Secret
322 * aafPassword AAF Password
323 * consumerGroup Consumer Group
324 * consumerInstance Consumer Instance
325 * fetchTimeout Fetch Timeout
326 * fetchLimit Fetch Limit
328 * @param busTopicParams contains above listed params
329 * @throws MalformedURLException URL should be valid
331 public DmaapAafConsumerWrapper(BusTopicParams busTopicParams) throws MalformedURLException {
333 super(busTopicParams);
335 // super constructor sets servers = {""} if empty to avoid errors when using DME2
336 if (busTopicParams.isServersInvalid()) {
337 throw new IllegalArgumentException("Must provide at least one host for HTTP AAF");
340 this.consumer.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue());
342 props = new Properties();
344 if (busTopicParams.isUseHttps()) {
345 props.setProperty(PROTOCOL_PROP, "https");
346 this.consumer.setHost(busTopicParams.getServers().get(0) + ":3905");
349 props.setProperty(PROTOCOL_PROP, "http");
350 this.consumer.setHost(busTopicParams.getServers().get(0) + ":3904");
353 this.consumer.setProps(props);
354 logger.info("{}: CREATION", this);
358 public String toString() {
359 final MRConsumerImpl consumer = this.consumer;
361 return "DmaapConsumerWrapper [" + "consumer.getAuthDate()=" + consumer.getAuthDate()
362 + ", consumer.getAuthKey()=" + consumer.getAuthKey() + ", consumer.getHost()=" + consumer.getHost()
363 + ", consumer.getProtocolFlag()=" + consumer.getProtocolFlag() + ", consumer.getUsername()="
364 + consumer.getUsername() + "]";
368 public static class DmaapDmeConsumerWrapper extends DmaapConsumerWrapper {
370 private static Logger logger = LoggerFactory.getLogger(DmaapDmeConsumerWrapper.class);
372 private final Properties props;
377 * @param busTopicParams topic paramters
379 * @throws MalformedURLException must provide a valid URL
381 public DmaapDmeConsumerWrapper(BusTopicParams busTopicParams) throws MalformedURLException {
384 super(busTopicParams);
387 final String dme2RouteOffer = (busTopicParams.isAdditionalPropsValid()
388 ? busTopicParams.getAdditionalProps().get(
389 PolicyEndPointProperties.DME2_ROUTE_OFFER_PROPERTY)
392 if (busTopicParams.isEnvironmentInvalid()) {
393 throw parmException(busTopicParams.getTopic(),
394 PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX);
396 if (busTopicParams.isAftEnvironmentInvalid()) {
397 throw parmException(busTopicParams.getTopic(),
398 PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX);
400 if (busTopicParams.isLatitudeInvalid()) {
401 throw parmException(busTopicParams.getTopic(),
402 PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX);
404 if (busTopicParams.isLongitudeInvalid()) {
405 throw parmException(busTopicParams.getTopic(),
406 PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX);
409 if ((busTopicParams.isPartnerInvalid())
410 && StringUtils.isBlank(dme2RouteOffer)) {
411 throw new IllegalArgumentException(
412 "Must provide at least " + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
413 + "." + busTopicParams.getTopic()
414 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX + " or "
415 + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
416 + busTopicParams.getTopic()
417 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX + " for DME2");
420 final String serviceName = busTopicParams.getServers().get(0);
422 this.consumer.setProtocolFlag(ProtocolTypeConstants.DME2.getValue());
424 this.consumer.setUsername(busTopicParams.getUserName());
425 this.consumer.setPassword(busTopicParams.getPassword());
427 props = new Properties();
429 props.setProperty(PolicyEndPointProperties.DME2_SERVICE_NAME_PROPERTY, serviceName);
431 props.setProperty("username", busTopicParams.getUserName());
432 props.setProperty("password", busTopicParams.getPassword());
434 /* These are required, no defaults */
435 props.setProperty("topic", busTopicParams.getTopic());
437 props.setProperty("Environment", busTopicParams.getEnvironment());
438 props.setProperty("AFT_ENVIRONMENT", busTopicParams.getAftEnvironment());
440 if (busTopicParams.getPartner() != null) {
441 props.setProperty("Partner", busTopicParams.getPartner());
443 if (dme2RouteOffer != null) {
444 props.setProperty(PolicyEndPointProperties.DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer);
447 props.setProperty("Latitude", busTopicParams.getLatitude());
448 props.setProperty("Longitude", busTopicParams.getLongitude());
450 /* These are optional, will default to these values if not set in additionalProps */
451 props.setProperty("AFT_DME2_EP_READ_TIMEOUT_MS", "50000");
452 props.setProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", "240000");
453 props.setProperty("AFT_DME2_EP_CONN_TIMEOUT", "15000");
454 props.setProperty("Version", "1.0");
455 props.setProperty("SubContextPath", "/");
456 props.setProperty("sessionstickinessrequired", "no");
458 /* These should not change */
459 props.setProperty("TransportType", "DME2");
460 props.setProperty("MethodType", "GET");
462 if (busTopicParams.isUseHttps()) {
463 props.setProperty(PROTOCOL_PROP, "https");
466 props.setProperty(PROTOCOL_PROP, "http");
469 props.setProperty("contenttype", "application/json");
471 if (busTopicParams.isAdditionalPropsValid()) {
472 for (Map.Entry<String, String> entry : busTopicParams.getAdditionalProps().entrySet()) {
473 props.put(entry.getKey(), entry.getValue());
477 MRClientFactory.prop = props;
478 this.consumer.setProps(props);
480 logger.info("{}: CREATION", this);
483 private IllegalArgumentException parmException(String topic, String propnm) {
484 return new IllegalArgumentException("Missing " + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
485 + topic + propnm + " property for DME2 in DMaaP");