2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.common.endpoints.event.comm.bus.internal.impl;
23 import com.att.nsa.mr.client.impl.MRConsumerImpl;
24 import com.att.nsa.mr.client.response.MRConsumerResponse;
26 import java.io.IOException;
27 import java.net.MalformedURLException;
28 import java.util.ArrayList;
29 import java.util.List;
31 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer;
32 import org.slf4j.Logger;
33 import org.slf4j.LoggerFactory;
38 public abstract class DmaapConsumerWrapper implements BusConsumer {
43 private static Logger logger = LoggerFactory.getLogger(DmaapConsumerWrapper.class);
46 * Name of the "protocol" property.
48 protected static final String PROTOCOL_PROP = "Protocol";
53 protected int fetchTimeout;
58 protected Object closeCondition = new Object();
63 protected MRConsumerImpl consumer;
68 * @param servers messaging bus hosts
70 * @param apiKey API Key
71 * @param apiSecret API Secret
72 * @param username AAF Login
73 * @param password AAF Password
74 * @param consumerGroup Consumer Group
75 * @param consumerInstance Consumer Instance
76 * @param fetchTimeout Fetch Timeout
77 * @param fetchLimit Fetch Limit
78 * @throws MalformedURLException
80 public DmaapConsumerWrapper(List<String> servers, String topic, String apiKey, String apiSecret, String username,
81 String password, String consumerGroup, String consumerInstance, int fetchTimeout, int fetchLimit)
82 throws MalformedURLException {
84 this.fetchTimeout = fetchTimeout;
86 if (topic == null || topic.isEmpty()) {
87 throw new IllegalArgumentException("No topic for DMaaP");
90 this.consumer = new MRConsumerImpl(servers, topic, consumerGroup, consumerInstance, fetchTimeout, fetchLimit,
91 null, apiKey, apiSecret);
93 this.consumer.setUsername(username);
94 this.consumer.setPassword(password);
98 public Iterable<String> fetch() throws InterruptedException, IOException {
99 final MRConsumerResponse response = this.consumer.fetchWithReturnConsumerResponse();
100 if (response == null) {
101 logger.warn("{}: DMaaP NULL response received", this);
103 synchronized (closeCondition) {
104 closeCondition.wait(fetchTimeout);
106 return new ArrayList<>();
108 logger.debug("DMaaP consumer received {} : {}" + response.getResponseCode(), response.getResponseMessage());
110 if (response.getResponseCode() == null || !"200".equals(response.getResponseCode())) {
112 logger.error("DMaaP consumer received: {} : {}", response.getResponseCode(),
113 response.getResponseMessage());
115 synchronized (closeCondition) {
116 closeCondition.wait(fetchTimeout);
123 if (response.getActualMessages() == null) {
124 return new ArrayList<>();
126 return response.getActualMessages();
131 public void close() {
132 synchronized (closeCondition) {
133 closeCondition.notifyAll();
136 this.consumer.close();
140 public String toString() {
141 return "DmaapConsumerWrapper [" + "consumer.getAuthDate()=" + consumer.getAuthDate()
142 + ", consumer.getAuthKey()=" + consumer.getAuthKey() + ", consumer.getHost()=" + consumer.getHost()
143 + ", consumer.getProtocolFlag()=" + consumer.getProtocolFlag() + ", consumer.getUsername()="
144 + consumer.getUsername() + "]";