2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Copyright (C) 2017 Amdocs
8 * =============================================================================
9 * Licensed under the Apache License, Version 2.0 (the "License");
10 * you may not use this file except in compliance with the License.
11 * You may obtain a copy of the License at
13 * http://www.apache.org/licenses/LICENSE-2.0
15 * Unless required by applicable law or agreed to in writing, software
16 * distributed under the License is distributed on an "AS IS" BASIS,
17 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18 * See the License for the specific language governing permissions and
19 * limitations under the License.
21 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
22 * ============LICENSE_END=========================================================
25 package org.onap.appc.adapter.messaging.dmaap.http;
28 import java.util.ArrayList;
29 import java.util.Collection;
30 import java.util.List;
32 import com.att.eelf.configuration.EELFLogger;
33 import com.att.eelf.configuration.EELFManager;
35 import org.apache.http.HttpEntity;
36 import org.apache.http.NameValuePair;
37 import org.apache.http.client.methods.CloseableHttpResponse;
38 import org.apache.http.client.methods.HttpGet;
39 import org.apache.http.client.utils.URIBuilder;
40 import org.apache.http.message.BasicNameValuePair;
41 import org.apache.http.util.EntityUtils;
42 import org.json.JSONArray;
43 import org.onap.appc.adapter.message.Consumer;
45 public class HttpDmaapConsumerImpl extends CommonHttpClient implements Consumer {
47 private final EELFLogger logger = EELFManager.getInstance().getLogger(HttpDmaapConsumerImpl.class);
50 private static final int DEFAULT_TIMEOUT_MS = 15000;
51 private static final int DEFAULT_LIMIT = 1000;
52 private static final String URL_TEMPLATE = "%s/events/%s/%s/%s";
54 private List<String> urls;
55 private String filter;
58 public HttpDmaapConsumerImpl(Collection<String> hosts, String topicName, String consumerName, String consumerId,
60 this(hosts, topicName, consumerName, consumerId, filter, null, null);
63 public HttpDmaapConsumerImpl(Collection<String> hosts, String topicName, String consumerName, String consumerId,
64 String filter, String user, String password) {
65 urls = new ArrayList<>();
66 for (String host : hosts) {
67 urls.add(String.format(URL_TEMPLATE, formatHostString(host), topicName, consumerName, consumerId));
70 updateCredentials(user, password);
74 public void updateCredentials(String user, String pass) {
75 logger.debug(String.format("Setting auth to %s for %s", user, this.toString()));
76 this.setBasicAuth(user, pass);
80 public List<String> fetch(int waitMs, int limit) {
81 logger.debug(String.format("Fetching up to %d records with %dms wait on %s", limit, waitMs, this.toString()));
82 List<String> out = new ArrayList<>();
84 List<NameValuePair> urlParams = new ArrayList<>();
85 urlParams.add(new BasicNameValuePair("timeout", String.valueOf(waitMs)));
86 urlParams.add(new BasicNameValuePair("limit", String.valueOf(limit)));
88 urlParams.add(new BasicNameValuePair("filter", filter));
91 URIBuilder builder = new URIBuilder(urls.get(0));
92 builder.setParameters(urlParams);
94 URI uri = builder.build();
95 logger.info(String.format("GET %s", uri));
96 HttpGet request = getReq(uri, waitMs);
97 CloseableHttpResponse response = getClient().execute(request);
99 int httpStatus = response.getStatusLine().getStatusCode();
100 HttpEntity entity = response.getEntity();
101 String body = (entity != null) ? EntityUtils.toString(entity) : null;
103 logger.debug(String.format("Request to %s completed with status %d and a body size of %s", uri, httpStatus,
104 body != null ? body.length() : "null"));
107 if (httpStatus == 200 && body != null) {
108 JSONArray json = new JSONArray(body);
109 logger.info(String.format("Got %d messages from DMaaP", json.length()));
110 for (int i = 0; i < json.length(); i++) {
111 out.add(json.getString(i));
114 logger.error(String.format("Did not get 200 from DMaaP. Got %d - %s", httpStatus, body));
117 } catch (Exception e) {
118 if (urls.size() > 1) {
119 String failedUrl = urls.remove(0);
121 logger.debug(String.format("Moving host %s to the end of the pool. New primary host is %s", failedUrl,
124 logger.error(String.format("Got exception while querying DMaaP. Message: %s", e.getMessage()), e);
132 public List<String> fetch() {
133 return fetch(DEFAULT_TIMEOUT_MS, DEFAULT_LIMIT);
137 public String toString() {
138 String hostStr = (urls == null || urls.isEmpty()) ? "N/A" : urls.get(0);
139 return String.format("Consumer listening to [%s]", hostStr);
143 public void useHttps(boolean yes) {
146 private void sleep(int ms) {
147 logger.info(String.format("Sleeping for %ds after failed request", ms / 1000));
150 } catch (InterruptedException e1) {
151 logger.error("Interrupted while sleeping", e1);
152 Thread.currentThread().interrupt();
157 public void close() {