1 /*******************************************************************************
\r
2 * ============LICENSE_START==================================================
\r
4 * * ===========================================================================
\r
5 * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
\r
6 * * ===========================================================================
\r
7 * * Licensed under the Apache License, Version 2.0 (the "License");
\r
8 * * you may not use this file except in compliance with the License.
\r
9 * * You may obtain a copy of the License at
\r
11 * * http://www.apache.org/licenses/LICENSE-2.0
\r
13 * * Unless required by applicable law or agreed to in writing, software
\r
14 * * distributed under the License is distributed on an "AS IS" BASIS,
\r
15 * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
\r
16 * * See the License for the specific language governing permissions and
\r
17 * * limitations under the License.
\r
18 * * ============LICENSE_END====================================================
\r
20 * * ECOMP is a trademark and service mark of AT&T Intellectual Property.
\r
22 ******************************************************************************/
\r
25 package org.onap.dmaap.datarouter.provisioning.utils;
\r
27 import java.io.IOException;
\r
28 import java.io.InputStream;
\r
29 import java.util.ArrayList;
\r
30 import java.util.HashMap;
\r
31 import java.util.List;
\r
32 import java.util.Map;
\r
33 import java.util.Timer;
\r
34 import java.util.TimerTask;
\r
35 import java.util.Vector;
\r
37 import javax.servlet.Filter;
\r
38 import javax.servlet.FilterChain;
\r
39 import javax.servlet.FilterConfig;
\r
40 import javax.servlet.ServletException;
\r
41 import javax.servlet.ServletRequest;
\r
42 import javax.servlet.ServletResponse;
\r
43 import javax.servlet.http.HttpServletRequest;
\r
44 import javax.servlet.http.HttpServletResponse;
\r
46 import org.apache.log4j.Logger;
\r
47 import org.eclipse.jetty.continuation.Continuation;
\r
48 import org.eclipse.jetty.continuation.ContinuationSupport;
\r
49 import org.eclipse.jetty.server.*;
\r
50 import org.onap.dmaap.datarouter.provisioning.beans.Parameters;
\r
53 * This filter checks /publish requests to the provisioning server to allow ill-behaved publishers to be throttled.
\r
54 * It is configured via the provisioning parameter THROTTLE_FILTER.
\r
55 * The THROTTLE_FILTER provisioning parameter can have these values:
\r
57 * <tr><td>(no value)</td><td>filter disabled</td></tr>
\r
58 * <tr><td>off</td><td>filter disabled</td></tr>
\r
59 * <tr><td>N[,M[,action]]</td><td>set N, M, and action (used in the algorithm below).
\r
60 * Action is <i>drop</i> or <i>throttle</i>.
\r
61 * If M is missing, it defaults to 5 minutes.
\r
62 * If the action is missing, it defaults to <i>drop</i>.
\r
66 * The <i>action</i> is triggered iff:
\r
68 * <li>the filter is enabled, and</li>
\r
69 * <li>N /publish requests come to the provisioning server in M minutes
\r
71 * <li>from the same IP address</li>
\r
72 * <li>for the same feed</li>
\r
73 * <li>lacking the <i>Expect: 100-continue</i> header</li>
\r
77 * The action that can be performed (if triggered) are:
\r
79 * <li><i>drop</i> - the connection is dropped immediately.</li>
\r
80 * <li><i>throttle</i> - [not supported] the connection is put into a low priority queue with all other throttled connections.
\r
81 * These are then processed at a slower rate. Note: this option does not work correctly, and is disabled.
\r
82 * The only action that is supported is <i>drop</i>.
\r
86 * @author Robert Eby
\r
87 * @version $Id: ThrottleFilter.java,v 1.2 2014/03/12 19:45:41 eby Exp $
\r
89 public class ThrottleFilter extends TimerTask implements Filter {
\r
90 public static final int DEFAULT_N = 10;
\r
91 public static final int DEFAULT_M = 5;
\r
92 public static final String THROTTLE_MARKER = "org.onap.dmaap.datarouter.provisioning.THROTTLE_MARKER";
\r
93 private static final String JETTY_REQUEST = "org.eclipse.jetty.server.Request";
\r
94 private static final long ONE_MINUTE = 60000L;
\r
95 private static final int ACTION_DROP = 0;
\r
96 private static final int ACTION_THROTTLE = 1;
\r
99 private static boolean enabled = false; // enabled or not
\r
100 private static int n_requests = 0; // number of requests in M minutes
\r
101 private static int m_minutes = 0; // sampling period
\r
102 private static int action = ACTION_DROP; // action to take (throttle or drop)
\r
104 private static Logger logger = Logger.getLogger("org.onap.dmaap.datarouter.provisioning.internal");
\r
105 private static Map<String, Counter> map = new HashMap<String, Counter>();
\r
106 private static final Timer rolex = new Timer();
\r
109 public void init(FilterConfig arg0) throws ServletException {
\r
111 rolex.scheduleAtFixedRate(this, 5 * 60000L, 5 * 60000L); // Run once every 5 minutes to clean map
\r
115 * Configure the throttle. This should be called from BaseServlet.provisioningParametersChanged(), to make sure it stays up to date.
\r
117 public static void configure() {
\r
118 Parameters p = Parameters.getParameter(Parameters.THROTTLE_FILTER);
\r
121 Class.forName(JETTY_REQUEST);
\r
122 String v = p.getValue();
\r
123 if (v != null && !v.equals("off")) {
\r
124 String[] pp = v.split(",");
\r
126 n_requests = (pp.length > 0) ? getInt(pp[0], DEFAULT_N) : DEFAULT_N;
\r
127 m_minutes = (pp.length > 1) ? getInt(pp[1], DEFAULT_M) : DEFAULT_M;
\r
128 action = (pp.length > 2 && pp[2] != null && pp[2].equalsIgnoreCase("throttle")) ? ACTION_THROTTLE : ACTION_DROP;
\r
130 // ACTION_THROTTLE is not currently working, so is not supported
\r
131 if (action == ACTION_THROTTLE) {
\r
132 action = ACTION_DROP;
\r
133 logger.info("Throttling is not currently supported; action changed to DROP");
\r
135 logger.info("ThrottleFilter is ENABLED for /publish requests; N=" + n_requests + ", M=" + m_minutes + ", Action=" + action);
\r
139 } catch (ClassNotFoundException e) {
\r
140 logger.warn("Class " + JETTY_REQUEST + " is not available; this filter requires Jetty.");
\r
143 logger.info("ThrottleFilter is DISABLED for /publish requests.");
\r
148 private static int getInt(String s, int deflt) {
\r
150 return Integer.parseInt(s);
\r
151 } catch (NumberFormatException x) {
\r
157 public void destroy() {
\r
163 public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)
\r
164 throws IOException, ServletException {
\r
165 if (enabled && action == ACTION_THROTTLE) {
\r
166 throttleFilter((HttpServletRequest) request, (HttpServletResponse) response, chain);
\r
167 } else if (enabled) {
\r
168 dropFilter((HttpServletRequest) request, (HttpServletResponse) response, chain);
\r
170 chain.doFilter(request, response);
\r
174 public void dropFilter(HttpServletRequest request, HttpServletResponse response, FilterChain chain)
\r
175 throws IOException, ServletException {
\r
176 int rate = getRequestRate(request);
\r
177 if (rate >= n_requests) {
\r
178 // drop request - only works under Jetty
\r
179 String m = String.format("Dropping connection: %s %d bad connections in %d minutes", getConnectionId(request), rate, m_minutes);
\r
181 Request base_request = (request instanceof Request)
\r
182 ? (Request) request
\r
183 : HttpConnection.getCurrentConnection().getHttpChannel().getRequest();
\r
184 base_request.getHttpChannel().getEndPoint().close();
\r
186 chain.doFilter(request, response);
\r
190 public void throttleFilter(HttpServletRequest request, HttpServletResponse response, FilterChain chain)
\r
191 throws IOException, ServletException {
\r
192 // throttle request
\r
193 String id = getConnectionId(request);
\r
194 int rate = getRequestRate(request);
\r
195 Object results = request.getAttribute(THROTTLE_MARKER);
\r
196 if (rate >= n_requests && results == null) {
\r
197 String m = String.format("Throttling connection: %s %d bad connections in %d minutes", getConnectionId(request), rate, m_minutes);
\r
199 Continuation continuation = ContinuationSupport.getContinuation(request);
\r
200 continuation.suspend();
\r
201 register(id, continuation);
\r
202 continuation.undispatch();
\r
204 chain.doFilter(request, response);
\r
205 @SuppressWarnings("resource")
\r
206 InputStream is = request.getInputStream();
\r
207 byte[] b = new byte[4096];
\r
208 int n = is.read(b);
\r
216 private Map<String, List<Continuation>> suspended_requests = new HashMap<String, List<Continuation>>();
\r
218 private void register(String id, Continuation continuation) {
\r
219 synchronized (suspended_requests) {
\r
220 List<Continuation> list = suspended_requests.get(id);
\r
221 if (list == null) {
\r
222 list = new ArrayList<Continuation>();
\r
223 suspended_requests.put(id, list);
\r
225 list.add(continuation);
\r
229 private void resume(String id) {
\r
230 synchronized (suspended_requests) {
\r
231 List<Continuation> list = suspended_requests.get(id);
\r
232 if (list != null) {
\r
233 // when the waited for event happens
\r
234 Continuation continuation = list.remove(0);
\r
235 continuation.setAttribute(ThrottleFilter.THROTTLE_MARKER, new Object());
\r
236 continuation.resume();
\r
242 * Return a count of number of requests in the last M minutes, iff this is a "bad" request.
\r
243 * If the request has been resumed (if it contains the THROTTLE_MARKER) it is considered good.
\r
245 * @param request the request
\r
246 * @return number of requests in the last M minutes, 0 means it is a "good" request
\r
248 private int getRequestRate(HttpServletRequest request) {
\r
249 String expecthdr = request.getHeader("Expect");
\r
250 if (expecthdr != null && expecthdr.equalsIgnoreCase("100-continue"))
\r
253 String key = getConnectionId(request);
\r
254 synchronized (map) {
\r
255 Counter cnt = map.get(key);
\r
257 cnt = new Counter();
\r
260 int n = cnt.getRequestRate();
\r
265 public class Counter {
\r
266 private List<Long> times = new Vector<Long>(); // a record of request times
\r
268 public int prune() {
\r
270 long n = System.currentTimeMillis() - (m_minutes * ONE_MINUTE);
\r
271 long t = times.get(0);
\r
276 } catch (IndexOutOfBoundsException e) {
\r
279 return times.size();
\r
282 public int getRequestRate() {
\r
283 times.add(System.currentTimeMillis());
\r
289 * Identify a connection by endpoint IP address, and feed ID.
\r
291 private String getConnectionId(HttpServletRequest req) {
\r
292 return req.getRemoteAddr() + "/" + getFeedId(req);
\r
295 private int getFeedId(HttpServletRequest req) {
\r
296 String path = req.getPathInfo();
\r
297 if (path == null || path.length() < 2)
\r
299 path = path.substring(1);
\r
300 int ix = path.indexOf('/');
\r
301 if (ix < 0 || ix == path.length() - 1)
\r
304 int feedid = Integer.parseInt(path.substring(0, ix));
\r
306 } catch (NumberFormatException e) {
\r
312 public void run() {
\r
313 // Once every 5 minutes, go through the map, and remove empty entrys
\r
314 for (Object s : map.keySet().toArray()) {
\r
315 synchronized (map) {
\r
316 Counter c = map.get(s);
\r
317 if (c.prune() <= 0)
\r