2  * ============LICENSE_START=======================================================
 
   4  * ================================================================================
 
   5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
 
   6  * Copyright (C) 2017 Amdocs
 
   7  * ================================================================================
 
   8  * Licensed under the Apache License, Version 2.0 (the "License");
 
   9  * you may not use this file except in compliance with the License.
 
  10  * You may obtain a copy of the License at
 
  12  *      http://www.apache.org/licenses/LICENSE-2.0
 
  14  * Unless required by applicable law or agreed to in writing, software
 
  15  * distributed under the License is distributed on an "AS IS" BASIS,
 
  16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
  17  * See the License for the specific language governing permissions and
 
  18  * limitations under the License.
 
  19  * ============LICENSE_END=========================================================
 
  20  * ECOMP is a trademark and service mark of AT&T Intellectual Property.
 
  23 package org.openecomp.appc.adapter.messaging.dmaap.http;
 
  26 import java.util.ArrayList;
 
  27 import java.util.Collection;
 
  28 import java.util.List;
 
  30 import com.att.eelf.configuration.EELFLogger;
 
  31 import com.att.eelf.configuration.EELFManager;
 
  33 import org.apache.http.HttpEntity;
 
  34 import org.apache.http.NameValuePair;
 
  35 import org.apache.http.client.methods.CloseableHttpResponse;
 
  36 import org.apache.http.client.methods.HttpGet;
 
  37 import org.apache.http.client.utils.URIBuilder;
 
  38 import org.apache.http.message.BasicNameValuePair;
 
  39 import org.apache.http.util.EntityUtils;
 
  40 import org.json.JSONArray;
 
  41 import org.openecomp.appc.adapter.message.Consumer;
 
  43 public class HttpDmaapConsumerImpl extends CommonHttpClient implements Consumer {
 
  45     private static final EELFLogger LOG = EELFManager.getInstance().getLogger(HttpDmaapConsumerImpl.class);
 
  48     private static final int DEFAULT_TIMEOUT_MS = 15000;
 
  49     private static final int DEFAULT_LIMIT = 1000;
 
  50     private static final String HTTPS_PORT = ":3905";
 
  51     private static final String URL_TEMPLATE = "%s/events/%s/%s/%s";
 
  53     private List<String> urls;
 
  54     private String filter;
 
  56     private boolean useHttps = false;
 
  58     public HttpDmaapConsumerImpl(Collection<String> hosts, String topicName, String consumerName, String consumerId) {
 
  59         this(hosts, topicName, consumerName, consumerId, null);
 
  62     public HttpDmaapConsumerImpl(Collection<String> hosts, String topicName, String consumerName, String consumerId,
 
  64         this(hosts, topicName, consumerName, consumerId, filter, null, null);
 
  67     public HttpDmaapConsumerImpl(Collection<String> hosts, String topicName, String consumerName, String consumerId,
 
  68                                  String filter, String user, String password) {
 
  69         urls = new ArrayList<String>();
 
  70         for (String host : hosts) {
 
  71             urls.add(String.format(URL_TEMPLATE, formatHostString(host), topicName, consumerName, consumerId));
 
  74         updateCredentials(user, password);
 
  78     public void updateCredentials(String user, String pass) {
 
  79         LOG.debug(String.format("Setting auth to %s for %s", user, this.toString()));
 
  80         this.setBasicAuth(user, pass);
 
  84     public List<String> fetch(int waitMs, int limit) {
 
  85         LOG.debug(String.format("Fetching up to %d records with %dms wait on %s", limit, waitMs, this.toString()));
 
  86         List<String> out = new ArrayList<String>();
 
  88             List<NameValuePair> urlParams = new ArrayList<NameValuePair>();
 
  89             urlParams.add(new BasicNameValuePair("timeout", String.valueOf(waitMs)));
 
  90             urlParams.add(new BasicNameValuePair("limit", String.valueOf(limit)));
 
  92                 urlParams.add(new BasicNameValuePair("filter", filter));
 
  95             URIBuilder builder = new URIBuilder(urls.get(0));
 
  96             builder.setParameters(urlParams);
 
  98             URI uri = builder.build();
 
  99             LOG.info(String.format("GET %s", uri));
 
 100             HttpGet request = getReq(uri, waitMs);
 
 101             CloseableHttpResponse response = getClient().execute(request);
 
 103             int httpStatus = response.getStatusLine().getStatusCode();
 
 104             HttpEntity entity = response.getEntity();
 
 105             String body = (entity != null) ? EntityUtils.toString(entity) : null;
 
 107             LOG.debug(String.format("Request to %s completed with status %d and a body size of %s", uri, httpStatus,
 
 108                 (body != null ? body.length() : "null")));
 
 111             if (httpStatus == 200 && body != null) {
 
 112                 JSONArray json = new JSONArray(body);
 
 113                 LOG.info(String.format("Got %d messages from DMaaP", json.length()));
 
 114                 for (int i = 0; i < json.length(); i++) {
 
 115                     out.add(json.getString(i));
 
 118                 LOG.error(String.format("Did not get 200 from DMaaP. Got %d - %s", httpStatus, body));
 
 121         } catch (Exception e) {
 
 122             if (urls.size() > 1) {
 
 123                 String failedUrl = urls.remove(0);
 
 125                 LOG.debug(String.format("Moving host %s to the end of the pool. New primary host is %s", failedUrl,
 
 128             LOG.error(String.format("Got exception while querying DMaaP. Message: %s", e.getMessage()), e);
 
 136     public List<String> fetch() {
 
 137         return fetch(DEFAULT_TIMEOUT_MS, DEFAULT_LIMIT);
 
 141     public String toString() {
 
 142         String hostStr = (urls == null || urls.isEmpty()) ? "N/A" : urls.get(0);
 
 143         return String.format("Consumer listening to [%s]", hostStr);
 
 147     public void useHttps(boolean yes) {
 
 151     private void sleep(int ms) {
 
 152         LOG.info(String.format("Sleeping for %ds after failed request", ms / 1000));
 
 155         } catch (InterruptedException e1) {
 
 156             LOG.error("Interrupted while sleeping");
 
 161         public void close() {