* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
- *
+ *
* ECOMP is a trademark and service mark of AT&T Intellectual Property.
* ============LICENSE_END=========================================================
*/
package org.onap.appc.adapter.messaging.dmaap.http;
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
+import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
-
-import com.att.eelf.configuration.EELFLogger;
-import com.att.eelf.configuration.EELFManager;
-
import org.apache.http.HttpEntity;
import org.apache.http.NameValuePair;
import org.apache.http.client.methods.CloseableHttpResponse;
// Default values
private static final int DEFAULT_TIMEOUT_MS = 15000;
private static final int DEFAULT_LIMIT = 1000;
- private static final String HTTPS_PORT = ":3905";
private static final String URL_TEMPLATE = "%s/events/%s/%s/%s";
private List<String> urls;
private String filter;
- private boolean useHttps = false;
-
- public HttpDmaapConsumerImpl(Collection<String> hosts, String topicName, String consumerName, String consumerId) {
- this(hosts, topicName, consumerName, consumerId, null);
- }
-
public HttpDmaapConsumerImpl(Collection<String> hosts, String topicName, String consumerName, String consumerId,
String filter) {
+
this(hosts, topicName, consumerName, consumerId, filter, null, null);
}
public HttpDmaapConsumerImpl(Collection<String> hosts, String topicName, String consumerName, String consumerId,
String filter, String user, String password) {
- urls = new ArrayList<String>();
+
+ urls = new ArrayList<>();
for (String host : hosts) {
urls.add(String.format(URL_TEMPLATE, formatHostString(host), topicName, consumerName, consumerId));
}
@Override
public List<String> fetch(int waitMs, int limit) {
LOG.debug(String.format("Fetching up to %d records with %dms wait on %s", limit, waitMs, this.toString()));
- List<String> out = new ArrayList<String>();
+ List<String> out = new ArrayList<>();
try {
- List<NameValuePair> urlParams = new ArrayList<NameValuePair>();
+ List<NameValuePair> urlParams = new ArrayList<>();
urlParams.add(new BasicNameValuePair("timeout", String.valueOf(waitMs)));
urlParams.add(new BasicNameValuePair("limit", String.valueOf(limit)));
if (filter != null) {
int httpStatus = response.getStatusLine().getStatusCode();
HttpEntity entity = response.getEntity();
- String body = (entity != null) ? EntityUtils.toString(entity) : null;
+ String body = (entity != null) ? entityToString(entity) : null;
LOG.debug(String.format("Request to %s completed with status %d and a body size of %s", uri, httpStatus,
- (body != null ? body.length() : "null")));
+ body != null ? body.length() : "null"));
response.close();
if (httpStatus == 200 && body != null) {
LOG.error(String.format("Got exception while querying DMaaP. Message: %s", e.getMessage()), e);
sleep(waitMs);
}
-
return out;
}
return String.format("Consumer listening to [%s]", hostStr);
}
- @Override
- public void useHttps(boolean yes) {
- useHttps = yes;
+ String entityToString(HttpEntity entity) throws IOException {
+ return EntityUtils.toString(entity);
}
private void sleep(int ms) {
try {
Thread.sleep(ms);
} catch (InterruptedException e1) {
- LOG.error("Interrupted while sleeping");
+ LOG.error("Interrupted while sleeping", e1);
+ Thread.currentThread().interrupt();
}
}
- @Override
- public void close() {
- // Nothing to do
- }
-
}