code changes for platform hardening appc adapters
[appc.git] / appc-adapters / appc-dmaap-adapter / appc-dmaap-adapter-bundle / src / main / java / org / onap / appc / adapter / messaging / dmaap / http / HttpDmaapConsumerImpl.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP : APPC
4  * ================================================================================
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Copyright (C) 2017 Amdocs
8  * =============================================================================
9  * Licensed under the Apache License, Version 2.0 (the "License");
10  * you may not use this file except in compliance with the License.
11  * You may obtain a copy of the License at
12  * 
13  *      http://www.apache.org/licenses/LICENSE-2.0
14  * 
15  * Unless required by applicable law or agreed to in writing, software
16  * distributed under the License is distributed on an "AS IS" BASIS,
17  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18  * See the License for the specific language governing permissions and
19  * limitations under the License.
20  * 
21  * ECOMP is a trademark and service mark of AT&T Intellectual Property.
22  * ============LICENSE_END=========================================================
23  */
24
25 package org.onap.appc.adapter.messaging.dmaap.http;
26
27 import java.net.URI;
28 import java.util.ArrayList;
29 import java.util.Collection;
30 import java.util.List;
31
32 import com.att.eelf.configuration.EELFLogger;
33 import com.att.eelf.configuration.EELFManager;
34
35 import org.apache.http.HttpEntity;
36 import org.apache.http.NameValuePair;
37 import org.apache.http.client.methods.CloseableHttpResponse;
38 import org.apache.http.client.methods.HttpGet;
39 import org.apache.http.client.utils.URIBuilder;
40 import org.apache.http.message.BasicNameValuePair;
41 import org.apache.http.util.EntityUtils;
42 import org.json.JSONArray;
43 import org.onap.appc.adapter.message.Consumer;
44
45 public class HttpDmaapConsumerImpl extends CommonHttpClient implements Consumer {
46
47     private final EELFLogger LOG = EELFManager.getInstance().getLogger(HttpDmaapConsumerImpl.class);
48
49     // Default values
50     private static final int DEFAULT_TIMEOUT_MS = 15000;
51     private static final int DEFAULT_LIMIT = 1000;
52     private static final String URL_TEMPLATE = "%s/events/%s/%s/%s";
53
54     private List<String> urls;
55     private String filter;
56
57     private boolean useHttps = false;
58
59
60     public HttpDmaapConsumerImpl(Collection<String> hosts, String topicName, String consumerName, String consumerId,
61                                  String filter) {
62         this(hosts, topicName, consumerName, consumerId, filter, null, null);
63     }
64
65     public HttpDmaapConsumerImpl(Collection<String> hosts, String topicName, String consumerName, String consumerId,
66                                  String filter, String user, String password) {
67         urls = new ArrayList<String>();
68         for (String host : hosts) {
69             urls.add(String.format(URL_TEMPLATE, formatHostString(host), topicName, consumerName, consumerId));
70         }
71         this.filter = filter;
72         updateCredentials(user, password);
73     }
74
75     @Override
76     public void updateCredentials(String user, String pass) {
77         LOG.debug(String.format("Setting auth to %s for %s", user, this.toString()));
78         this.setBasicAuth(user, pass);
79     }
80
81     @Override
82     public List<String> fetch(int waitMs, int limit) {
83         LOG.debug(String.format("Fetching up to %d records with %dms wait on %s", limit, waitMs, this.toString()));
84         List<String> out = new ArrayList<>();
85         try {
86             List<NameValuePair> urlParams = new ArrayList<>();
87             urlParams.add(new BasicNameValuePair("timeout", String.valueOf(waitMs)));
88             urlParams.add(new BasicNameValuePair("limit", String.valueOf(limit)));
89             if (filter != null) {
90                 urlParams.add(new BasicNameValuePair("filter", filter));
91             }
92
93             URIBuilder builder = new URIBuilder(urls.get(0));
94             builder.setParameters(urlParams);
95
96             URI uri = builder.build();
97             LOG.info(String.format("GET %s", uri));
98             HttpGet request = getReq(uri, waitMs);
99             CloseableHttpResponse response = getClient().execute(request);
100
101             int httpStatus = response.getStatusLine().getStatusCode();
102             HttpEntity entity = response.getEntity();
103             String body = (entity != null) ? EntityUtils.toString(entity) : null;
104
105             LOG.debug(String.format("Request to %s completed with status %d and a body size of %s", uri, httpStatus,
106                 (body != null ? body.length() : "null")));
107
108             response.close();
109             if (httpStatus == 200 && body != null) {
110                 JSONArray json = new JSONArray(body);
111                 LOG.info(String.format("Got %d messages from DMaaP", json.length()));
112                 for (int i = 0; i < json.length(); i++) {
113                     out.add(json.getString(i));
114                 }
115             } else {
116                 LOG.error(String.format("Did not get 200 from DMaaP. Got %d - %s", httpStatus, body));
117                 sleep(waitMs);
118             }
119         } catch (Exception e) {
120             if (urls.size() > 1) {
121                 String failedUrl = urls.remove(0);
122                 urls.add(failedUrl);
123                 LOG.debug(String.format("Moving host %s to the end of the pool. New primary host is %s", failedUrl,
124                     urls.get(0)));
125             }
126             LOG.error(String.format("Got exception while querying DMaaP. Message: %s", e.getMessage()), e);
127             sleep(waitMs);
128         }
129
130         return out;
131     }
132
133     @Override
134     public List<String> fetch() {
135         return fetch(DEFAULT_TIMEOUT_MS, DEFAULT_LIMIT);
136     }
137
138     @Override
139     public String toString() {
140         String hostStr = (urls == null && !urls.isEmpty()) ? "N/A" : urls.get(0);
141         return String.format("Consumer listening to [%s]", hostStr);
142     }
143
144     @Override
145     public void useHttps(boolean yes) {
146         useHttps = yes;
147     }
148
149     private void sleep(int ms) {
150         LOG.info(String.format("Sleeping for %ds after failed request", ms / 1000));
151         try {
152             Thread.sleep(ms);
153         } catch (InterruptedException e1) {
154             LOG.error("Interrupted while sleeping");
155         }
156     }
157
158     @Override
159     public void close() {
160         // Nothing to do        
161     }
162
163 }