1 /*******************************************************************************
\r
2 * ============LICENSE_START==================================================
\r
4 * * ===========================================================================
\r
5 * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
\r
6 * * ===========================================================================
\r
7 * * Licensed under the Apache License, Version 2.0 (the "License");
\r
8 * * you may not use this file except in compliance with the License.
\r
9 * * You may obtain a copy of the License at
\r
11 * * http://www.apache.org/licenses/LICENSE-2.0
\r
13 * * Unless required by applicable law or agreed to in writing, software
\r
14 * * distributed under the License is distributed on an "AS IS" BASIS,
\r
15 * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
\r
16 * * See the License for the specific language governing permissions and
\r
17 * * limitations under the License.
\r
18 * * ============LICENSE_END====================================================
\r
20 * * ECOMP is a trademark and service mark of AT&T Intellectual Property.
\r
22 ******************************************************************************/
\r
25 package org.onap.dmaap.datarouter.provisioning.utils;
\r
27 import com.att.eelf.configuration.EELFLogger;
\r
28 import com.att.eelf.configuration.EELFManager;
\r
29 import java.io.IOException;
\r
30 import java.io.InputStream;
\r
31 import java.util.ArrayList;
\r
32 import java.util.HashMap;
\r
33 import java.util.List;
\r
34 import java.util.Map;
\r
35 import java.util.Timer;
\r
36 import java.util.TimerTask;
\r
37 import javax.servlet.Filter;
\r
38 import javax.servlet.FilterChain;
\r
39 import javax.servlet.FilterConfig;
\r
40 import javax.servlet.ServletException;
\r
41 import javax.servlet.ServletRequest;
\r
42 import javax.servlet.ServletResponse;
\r
43 import javax.servlet.http.HttpServletRequest;
\r
44 import javax.servlet.http.HttpServletResponse;
\r
45 import org.eclipse.jetty.continuation.Continuation;
\r
46 import org.eclipse.jetty.continuation.ContinuationSupport;
\r
47 import org.eclipse.jetty.server.HttpConnection;
\r
48 import org.eclipse.jetty.server.Request;
\r
49 import org.onap.dmaap.datarouter.provisioning.beans.Parameters;
\r
52 * This filter checks /publish requests to the provisioning server to allow ill-behaved publishers to be throttled.
\r
53 * It is configured via the provisioning parameter THROTTLE_FILTER.
\r
54 * The THROTTLE_FILTER provisioning parameter can have these values:
\r
56 * <tr><td>(no value)</td><td>filter disabled</td></tr>
\r
57 * <tr><td>off</td><td>filter disabled</td></tr>
\r
58 * <tr><td>N[,M[,action]]</td><td>set N, M, and action (used in the algorithm below).
\r
59 * Action is <i>drop</i> or <i>throttle</i>.
\r
60 * If M is missing, it defaults to 5 minutes.
\r
61 * If the action is missing, it defaults to <i>drop</i>.
\r
65 * <p>* The <i>action</i> is triggered iff:
\r
67 * <li>the filter is enabled, and</li>
\r
68 * <li>N /publish requests come to the provisioning server in M minutes
\r
70 * <li>from the same IP address</li>
\r
71 * <li>for the same feed</li>
\r
72 * <li>lacking the <i>Expect: 100-continue</i> header</li>
\r
76 * The action that can be performed (if triggered) are:
\r
78 * <li><i>drop</i> - the connection is dropped immediately.</li>
\r
79 * <li><i>throttle</i> - [not supported] the connection is put into a low priority queue
\r
80 * with all other throttled connections.
\r
81 * These are then processed at a slower rate. Note: this option does not work correctly, and is disabled.
\r
82 * The only action that is supported is <i>drop</i>.
\r
86 * @author Robert Eby
\r
87 * @version $Id: ThrottleFilter.java,v 1.2 2014/03/12 19:45:41 eby Exp $
\r
90 public class ThrottleFilter extends TimerTask implements Filter {
\r
91 private static final int DEFAULT_N = 10;
\r
92 private static final int DEFAULT_M = 5;
\r
93 private static final String THROTTLE_MARKER = "org.onap.dmaap.datarouter.provisioning.THROTTLE_MARKER";
\r
94 private static final String JETTY_REQUEST = "org.eclipse.jetty.server.Request";
\r
95 private static final long ONE_MINUTE = 60000L;
\r
96 private static final int ACTION_DROP = 0;
\r
97 private static final int ACTION_THROTTLE = 1;
\r
100 private static boolean enabled = false; // enabled or not
\r
101 private static int numRequests = 0; // number of requests in M minutes
\r
102 private static int samplingPeriod = 0; // sampling period
\r
103 private static int action = ACTION_DROP; // action to take (throttle or drop)
\r
105 private static EELFLogger logger = EELFManager.getInstance().getLogger("InternalLog");
\r
106 private static Map<String, Counter> map = new HashMap<>();
\r
107 private static final Timer rolex = new Timer();
\r
110 public void init(FilterConfig arg0) throws ServletException {
\r
112 rolex.scheduleAtFixedRate(this, 5 * 60000L, 5 * 60000L); // Run once every 5 minutes to clean map
\r
116 * Configure the throttle. This should be called from BaseServlet.provisioningParametersChanged(),
\r
117 * to make sure it stays up to date.
\r
119 public static void configure() {
\r
120 Parameters param = Parameters.getParameter(Parameters.THROTTLE_FILTER);
\r
121 if (param != null) {
\r
123 Class.forName(JETTY_REQUEST);
\r
124 String val = param.getValue();
\r
125 if (val != null && !"off".equals(val)) {
\r
126 String[] pp = val.split(",");
\r
128 numRequests = (pp.length > 0) ? getInt(pp[0], DEFAULT_N) : DEFAULT_N;
\r
129 samplingPeriod = (pp.length > 1) ? getInt(pp[1], DEFAULT_M) : DEFAULT_M;
\r
130 action = (pp.length > 2 && pp[2] != null
\r
131 && "throttle".equalsIgnoreCase(pp[2])) ? ACTION_THROTTLE : ACTION_DROP;
\r
133 // ACTION_THROTTLE is not currently working, so is not supported
\r
134 if (action == ACTION_THROTTLE) {
\r
135 action = ACTION_DROP;
\r
136 logger.info("Throttling is not currently supported; action changed to DROP");
\r
138 logger.info("ThrottleFilter is ENABLED for /publish requests; N="
\r
139 + numRequests + ", M=" + samplingPeriod
\r
140 + ", Action=" + action);
\r
144 } catch (ClassNotFoundException e) {
\r
145 logger.warn("Class " + JETTY_REQUEST + " is not available; this filter requires Jetty.", e);
\r
148 logger.info("ThrottleFilter is DISABLED for /publish requests.");
\r
153 private static int getInt(String str, int deflt) {
\r
155 return Integer.parseInt(str);
\r
156 } catch (NumberFormatException x) {
\r
162 public void destroy() {
\r
168 public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)
\r
169 throws IOException, ServletException {
\r
170 if (enabled && action == ACTION_THROTTLE) {
\r
171 throttleFilter((HttpServletRequest) request, (HttpServletResponse) response, chain);
\r
172 } else if (enabled) {
\r
173 dropFilter((HttpServletRequest) request, (HttpServletResponse) response, chain);
\r
175 chain.doFilter(request, response);
\r
180 * Method to drop filter chain.
\r
181 * @param request HttpServletRequest
\r
182 * @param response HttpServletResponse
\r
183 * @param chain FilterChain
\r
184 * @throws IOException input/output exception
\r
185 * @throws ServletException servle exception
\r
187 public void dropFilter(HttpServletRequest request, HttpServletResponse response, FilterChain chain)
\r
188 throws IOException, ServletException {
\r
189 int rate = getRequestRate(request);
\r
190 if (rate >= numRequests) {
\r
191 // drop request - only works under Jetty
\r
192 String str = String.format("Dropping connection: %s %d bad connections in %d minutes",
\r
193 getConnectionId(request), rate,
\r
196 Request baseRequest = (request instanceof Request)
\r
197 ? (Request) request
\r
198 : HttpConnection.getCurrentConnection().getHttpChannel().getRequest();
\r
199 baseRequest.getHttpChannel().getEndPoint().close();
\r
201 chain.doFilter(request, response);
\r
205 private void throttleFilter(HttpServletRequest request, HttpServletResponse response, FilterChain chain)
\r
206 throws IOException, ServletException {
\r
207 // throttle request
\r
208 String id = getConnectionId(request);
\r
209 int rate = getRequestRate(request);
\r
210 Object results = request.getAttribute(THROTTLE_MARKER);
\r
211 if (rate >= numRequests && results == null) {
\r
212 String str = String.format("Throttling connection: %s %d bad connections in %d minutes",
\r
213 getConnectionId(request), rate, samplingPeriod);
\r
215 Continuation continuation = ContinuationSupport.getContinuation(request);
\r
216 continuation.suspend();
\r
217 register(id, continuation);
\r
218 continuation.undispatch();
\r
220 chain.doFilter(request, response);
\r
221 @SuppressWarnings("resource")
\r
222 InputStream is = request.getInputStream();
\r
223 byte[] bite = new byte[4096];
\r
224 int num = is.read(bite);
\r
226 num = is.read(bite);
\r
232 private Map<String, List<Continuation>> suspendedRequests = new HashMap<>();
\r
234 private void register(String id, Continuation continuation) {
\r
235 synchronized (suspendedRequests) {
\r
236 List<Continuation> list = suspendedRequests.get(id);
\r
237 if (list == null) {
\r
238 list = new ArrayList<>();
\r
239 suspendedRequests.put(id, list);
\r
241 list.add(continuation);
\r
245 private void resume(String id) {
\r
246 synchronized (suspendedRequests) {
\r
247 List<Continuation> list = suspendedRequests.get(id);
\r
248 if (list != null) {
\r
249 // when the waited for event happens
\r
250 Continuation continuation = list.remove(0);
\r
251 continuation.setAttribute(ThrottleFilter.THROTTLE_MARKER, new Object());
\r
252 continuation.resume();
\r
258 * Return a count of number of requests in the last M minutes, iff this is a "bad" request.
\r
259 * If the request has been resumed (if it contains the THROTTLE_MARKER) it is considered good.
\r
261 * @param request the request
\r
262 * @return number of requests in the last M minutes, 0 means it is a "good" request
\r
264 private int getRequestRate(HttpServletRequest request) {
\r
265 String expecthdr = request.getHeader("Expect");
\r
266 if (expecthdr != null && "100-continue".equalsIgnoreCase(expecthdr)) {
\r
270 String key = getConnectionId(request);
\r
271 synchronized (map) {
\r
272 Counter cnt = map.get(key);
\r
274 cnt = new Counter();
\r
277 return cnt.getRequestRate();
\r
281 public class Counter {
\r
282 private List<Long> times = new ArrayList<>(); // a record of request times
\r
285 * Method to prune request rate.
\r
288 public int prune() {
\r
290 long num = System.currentTimeMillis() - (samplingPeriod * ONE_MINUTE);
\r
291 long time = times.get(0);
\r
292 while (time < num) {
\r
294 time = times.get(0);
\r
296 } catch (IndexOutOfBoundsException e) {
\r
297 logger.trace("Exception: " + e.getMessage(), e);
\r
299 return times.size();
\r
302 public int getRequestRate() {
\r
303 times.add(System.currentTimeMillis());
\r
309 * Identify a connection by endpoint IP address, and feed ID.
\r
311 private String getConnectionId(HttpServletRequest req) {
\r
312 return req.getRemoteAddr() + "/" + getFeedId(req);
\r
315 private int getFeedId(HttpServletRequest req) {
\r
316 String path = req.getPathInfo();
\r
317 if (path == null || path.length() < 2) {
\r
320 path = path.substring(1);
\r
321 int ix = path.indexOf('/');
\r
322 if (ix < 0 || ix == path.length() - 1) {
\r
326 return Integer.parseInt(path.substring(0, ix));
\r
327 } catch (NumberFormatException e) {
\r
333 public void run() {
\r
334 // Once every 5 minutes, go through the map, and remove empty entrys
\r
335 for (Object s : map.keySet().toArray()) {
\r
336 synchronized (map) {
\r
337 Counter counter = map.get(s);
\r
338 if (counter.prune() <= 0) {
\r