Change nexus values to properties
[appc.git] / appc-adapters / appc-dmaap-adapter / appc-dmaap-adapter-bundle / src / main / java / org / openecomp / appc / adapter / dmaap / DmaapConsumer.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * openECOMP : APP-C
4  * ================================================================================
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights
6  *                                              reserved.
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  * 
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  * 
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.openecomp.appc.adapter.dmaap;
23
24 import java.net.URI;
25 import java.util.ArrayList;
26 import java.util.Collection;
27 import java.util.List;
28
29
30 import com.att.eelf.configuration.EELFLogger;
31 import com.att.eelf.configuration.EELFManager;
32
33 import org.apache.http.HttpEntity;
34 import org.apache.http.NameValuePair;
35 import org.apache.http.client.methods.CloseableHttpResponse;
36 import org.apache.http.client.methods.HttpGet;
37 import org.apache.http.client.utils.URIBuilder;
38 import org.apache.http.message.BasicNameValuePair;
39 import org.apache.http.util.EntityUtils;
40 import org.json.JSONArray;
41
42 public class DmaapConsumer extends CommonHttpClient implements Consumer {
43
44     private static final EELFLogger LOG = EELFManager.getInstance().getLogger(DmaapConsumer.class);
45
46     // Default values
47     private static final int DEFAULT_TIMEOUT_MS = 15000;
48     private static final int DEFAULT_LIMIT = 1000;
49     private static final String HTTPS_PORT = ":3905";
50     private static final String URL_TEMPLATE = "%s/events/%s/%s/%s";
51
52     private List<String> urls;
53     private String filter;
54
55     private boolean useHttps = false;
56
57     public DmaapConsumer(Collection<String> hosts, String topicName, String consumerName, String consumerId) {
58         this(hosts, topicName, consumerName, consumerId, null);
59     }
60
61     public DmaapConsumer(Collection<String> hosts, String topicName, String consumerName, String consumerId,
62                         String filter) {
63         this(hosts, topicName, consumerName, consumerId, filter, null, null);
64     }
65
66     public DmaapConsumer(Collection<String> hosts, String topicName, String consumerName, String consumerId,
67                         String filter, String user, String password) {
68         urls = new ArrayList<String>();
69         for (String host : hosts) {
70             urls.add(String.format(URL_TEMPLATE, formatHostString(host), topicName, consumerName, consumerId));
71         }
72         this.filter = filter;
73         updateCredentials(user, password);
74     }
75
76     @Override
77     public void updateCredentials(String user, String pass) {
78         LOG.debug(String.format("Setting auth to %s for %s", user, this.toString()));
79         this.setBasicAuth(user, pass);
80     }
81
82     @Override
83     public List<String> fetch(int waitMs, int limit) {
84         LOG.debug(String.format("Fetching up to %d records with %dms wait on %s", limit, waitMs, this.toString()));
85         List<String> out = new ArrayList<String>();
86         try {
87             List<NameValuePair> urlParams = new ArrayList<NameValuePair>();
88             urlParams.add(new BasicNameValuePair("timeout", String.valueOf(waitMs)));
89             urlParams.add(new BasicNameValuePair("limit", String.valueOf(limit)));
90             if (filter != null) {
91                 urlParams.add(new BasicNameValuePair("filter", filter));
92             }
93
94             URIBuilder builder = new URIBuilder(urls.get(0));
95             builder.setParameters(urlParams);
96
97             URI uri = builder.build();
98             LOG.info(String.format("GET %s", uri));
99             HttpGet request = getReq(uri, waitMs);
100             CloseableHttpResponse response = getClient().execute(request);
101
102             int httpStatus = response.getStatusLine().getStatusCode();
103             HttpEntity entity = response.getEntity();
104             String body = (entity != null) ? EntityUtils.toString(entity) : null;
105
106             LOG.debug(String.format("Request to %s completed with status %d and a body size of %s", uri, httpStatus,
107                 (body != null ? body.length() : "null")));
108
109             response.close();
110             if (httpStatus == 200 && body != null) {
111                 JSONArray json = new JSONArray(body);
112                 LOG.info(String.format("Got %d messages from DMaaP", json.length()));
113                 for (int i = 0; i < json.length(); i++) {
114                     out.add(json.getString(i));
115                 }
116             } else {
117                 LOG.error(String.format("Did not get 200 from DMaaP. Got %d - %s", httpStatus, body));
118                 sleep(waitMs);
119             }
120         } catch (Exception e) {
121             if (urls.size() > 1) {
122                 String failedUrl = urls.remove(0);
123                 urls.add(failedUrl);
124                 LOG.debug(String.format("Moving host %s to the end of the pool. New primary host is %s", failedUrl,
125                     urls.get(0)));
126             }
127             LOG.error(String.format("Got exception while querying DMaaP. Message: %s", e.getMessage()), e);
128             sleep(waitMs);
129         }
130
131         return out;
132     }
133
134     @Override
135     public List<String> fetch() {
136         return fetch(DEFAULT_TIMEOUT_MS, DEFAULT_LIMIT);
137     }
138
139     @Override
140     public String toString() {
141         String hostStr = (urls == null && !urls.isEmpty()) ? "N/A" : urls.get(0);
142         return String.format("Consumer listening to [%s]", hostStr);
143     }
144
145     @Override
146     public void useHttps(boolean yes) {
147         useHttps = yes;
148     }
149
150     private void sleep(int ms) {
151         LOG.info(String.format("Sleeping for %ds after failed request", ms / 1000));
152         try {
153             Thread.sleep(ms);
154         } catch (InterruptedException e1) {
155             LOG.error("Interrupted while sleeping");
156         }
157     }
158     
159     public void close(){
160         //not used yet
161     }
162
163 }