* * Licensed under the Apache License, Version 2.0 (the "License");\r
* * you may not use this file except in compliance with the License.\r
* * You may obtain a copy of the License at\r
- * * \r
+ * *\r
* * http://www.apache.org/licenses/LICENSE-2.0\r
- * * \r
+ * *\r
* * Unless required by applicable law or agreed to in writing, software\r
* * distributed under the License is distributed on an "AS IS" BASIS,\r
* * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
import javax.servlet.http.HttpServletRequest;\r
import javax.servlet.http.HttpServletResponse;\r
\r
-import org.apache.log4j.Logger;\r
+import com.att.eelf.configuration.EELFLogger;\r
+import com.att.eelf.configuration.EELFManager;\r
import org.eclipse.jetty.continuation.Continuation;\r
import org.eclipse.jetty.continuation.ContinuationSupport;\r
-import org.eclipse.jetty.server.AbstractHttpConnection;\r
-import org.eclipse.jetty.server.Request;\r
+import org.eclipse.jetty.server.*;\r
import org.onap.dmaap.datarouter.provisioning.beans.Parameters;\r
\r
/**\r
* <tr><td>(no value)</td><td>filter disabled</td></tr>\r
* <tr><td>off</td><td>filter disabled</td></tr>\r
* <tr><td>N[,M[,action]]</td><td>set N, M, and action (used in the algorithm below).\r
- * Action is <i>drop</i> or <i>throttle</i>.\r
- * If M is missing, it defaults to 5 minutes.\r
- * If the action is missing, it defaults to <i>drop</i>.\r
+ * Action is <i>drop</i> or <i>throttle</i>.\r
+ * If M is missing, it defaults to 5 minutes.\r
+ * If the action is missing, it defaults to <i>drop</i>.\r
* </td></tr>\r
* </table>\r
* <p>\r
* <ol>\r
* <li>the filter is enabled, and</li>\r
* <li>N /publish requests come to the provisioning server in M minutes\r
- * <ol>\r
- * <li>from the same IP address</li>\r
- * <li>for the same feed</li>\r
- * <li>lacking the <i>Expect: 100-continue</i> header</li>\r
- * </ol>\r
+ * <ol>\r
+ * <li>from the same IP address</li>\r
+ * <li>for the same feed</li>\r
+ * <li>lacking the <i>Expect: 100-continue</i> header</li>\r
+ * </ol>\r
* </li>\r
* </ol>\r
* The action that can be performed (if triggered) are:\r
* <ol>\r
* <li><i>drop</i> - the connection is dropped immediately.</li>\r
* <li><i>throttle</i> - [not supported] the connection is put into a low priority queue with all other throttled connections.\r
- * These are then processed at a slower rate. Note: this option does not work correctly, and is disabled.\r
- * The only action that is supported is <i>drop</i>.\r
+ * These are then processed at a slower rate. Note: this option does not work correctly, and is disabled.\r
+ * The only action that is supported is <i>drop</i>.\r
* </li>\r
* </ol>\r
*\r
* @version $Id: ThrottleFilter.java,v 1.2 2014/03/12 19:45:41 eby Exp $\r
*/\r
public class ThrottleFilter extends TimerTask implements Filter {\r
- public static final int DEFAULT_N = 10;\r
- public static final int DEFAULT_M = 5;\r
- public static final String THROTTLE_MARKER = "org.onap.dmaap.datarouter.provisioning.THROTTLE_MARKER";\r
- private static final String JETTY_REQUEST = "org.eclipse.jetty.server.Request";\r
- private static final long ONE_MINUTE = 60000L;\r
- private static final int ACTION_DROP = 0;\r
- private static final int ACTION_THROTTLE = 1;\r
+ public static final int DEFAULT_N = 10;\r
+ public static final int DEFAULT_M = 5;\r
+ public static final String THROTTLE_MARKER = "org.onap.dmaap.datarouter.provisioning.THROTTLE_MARKER";\r
+ private static final String JETTY_REQUEST = "org.eclipse.jetty.server.Request";\r
+ private static final long ONE_MINUTE = 60000L;\r
+ private static final int ACTION_DROP = 0;\r
+ private static final int ACTION_THROTTLE = 1;\r
+\r
+ // Configuration\r
+ private static boolean enabled = false; // enabled or not\r
+ private static int n_requests = 0; // number of requests in M minutes\r
+ private static int m_minutes = 0; // sampling period\r
+ private static int action = ACTION_DROP; // action to take (throttle or drop)\r
+\r
+ private static EELFLogger logger = EELFManager.getInstance().getLogger("InternalLog");\r
+ private static Map<String, Counter> map = new HashMap<String, Counter>();\r
+ private static final Timer rolex = new Timer();\r
+\r
+ @Override\r
+ public void init(FilterConfig arg0) throws ServletException {\r
+ configure();\r
+ rolex.scheduleAtFixedRate(this, 5 * 60000L, 5 * 60000L); // Run once every 5 minutes to clean map\r
+ }\r
+\r
+ /**\r
+ * Configure the throttle. This should be called from BaseServlet.provisioningParametersChanged(), to make sure it stays up to date.\r
+ */\r
+ public static void configure() {\r
+ Parameters p = Parameters.getParameter(Parameters.THROTTLE_FILTER);\r
+ if (p != null) {\r
+ try {\r
+ Class.forName(JETTY_REQUEST);\r
+ String v = p.getValue();\r
+ if (v != null && !v.equals("off")) {\r
+ String[] pp = v.split(",");\r
+ if (pp != null) {\r
+ n_requests = (pp.length > 0) ? getInt(pp[0], DEFAULT_N) : DEFAULT_N;\r
+ m_minutes = (pp.length > 1) ? getInt(pp[1], DEFAULT_M) : DEFAULT_M;\r
+ action = (pp.length > 2 && pp[2] != null && pp[2].equalsIgnoreCase("throttle")) ? ACTION_THROTTLE : ACTION_DROP;\r
+ enabled = true;\r
+ // ACTION_THROTTLE is not currently working, so is not supported\r
+ if (action == ACTION_THROTTLE) {\r
+ action = ACTION_DROP;\r
+ logger.info("Throttling is not currently supported; action changed to DROP");\r
+ }\r
+ logger.info("ThrottleFilter is ENABLED for /publish requests; N=" + n_requests + ", M=" + m_minutes + ", Action=" + action);\r
+ return;\r
+ }\r
+ }\r
+ } catch (ClassNotFoundException e) {\r
+ logger.warn("Class " + JETTY_REQUEST + " is not available; this filter requires Jetty.", e);\r
+ }\r
+ }\r
+ logger.info("ThrottleFilter is DISABLED for /publish requests.");\r
+ enabled = false;\r
+ map.clear();\r
+ }\r
+\r
+ private static int getInt(String s, int deflt) {\r
+ try {\r
+ return Integer.parseInt(s);\r
+ } catch (NumberFormatException x) {\r
+ return deflt;\r
+ }\r
+ }\r
+\r
+ @Override\r
+ public void destroy() {\r
+ rolex.cancel();\r
+ map.clear();\r
+ }\r
+\r
+ @Override\r
+ public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)\r
+ throws IOException, ServletException {\r
+ if (enabled && action == ACTION_THROTTLE) {\r
+ throttleFilter((HttpServletRequest) request, (HttpServletResponse) response, chain);\r
+ } else if (enabled) {\r
+ dropFilter((HttpServletRequest) request, (HttpServletResponse) response, chain);\r
+ } else {\r
+ chain.doFilter(request, response);\r
+ }\r
+ }\r
+\r
+ public void dropFilter(HttpServletRequest request, HttpServletResponse response, FilterChain chain)\r
+ throws IOException, ServletException {\r
+ int rate = getRequestRate(request);\r
+ if (rate >= n_requests) {\r
+ // drop request - only works under Jetty\r
+ String m = String.format("Dropping connection: %s %d bad connections in %d minutes", getConnectionId(request), rate, m_minutes);\r
+ logger.info(m);\r
+ Request base_request = (request instanceof Request)\r
+ ? (Request) request\r
+ : HttpConnection.getCurrentConnection().getHttpChannel().getRequest();\r
+ base_request.getHttpChannel().getEndPoint().close();\r
+ } else {\r
+ chain.doFilter(request, response);\r
+ }\r
+ }\r
+\r
+ public void throttleFilter(HttpServletRequest request, HttpServletResponse response, FilterChain chain)\r
+ throws IOException, ServletException {\r
+ // throttle request\r
+ String id = getConnectionId(request);\r
+ int rate = getRequestRate(request);\r
+ Object results = request.getAttribute(THROTTLE_MARKER);\r
+ if (rate >= n_requests && results == null) {\r
+ String m = String.format("Throttling connection: %s %d bad connections in %d minutes", getConnectionId(request), rate, m_minutes);\r
+ logger.info(m);\r
+ Continuation continuation = ContinuationSupport.getContinuation(request);\r
+ continuation.suspend();\r
+ register(id, continuation);\r
+ continuation.undispatch();\r
+ } else {\r
+ chain.doFilter(request, response);\r
+ @SuppressWarnings("resource")\r
+ InputStream is = request.getInputStream();\r
+ byte[] b = new byte[4096];\r
+ int n = is.read(b);\r
+ while (n > 0) {\r
+ n = is.read(b);\r
+ }\r
+ resume(id);\r
+ }\r
+ }\r
+\r
+ private Map<String, List<Continuation>> suspended_requests = new HashMap<String, List<Continuation>>();\r
\r
- // Configuration\r
- private static boolean enabled = false; // enabled or not\r
- private static int n_requests = 0; // number of requests in M minutes\r
- private static int m_minutes = 0; // sampling period\r
- private static int action = ACTION_DROP; // action to take (throttle or drop)\r
+ private void register(String id, Continuation continuation) {\r
+ synchronized (suspended_requests) {\r
+ List<Continuation> list = suspended_requests.get(id);\r
+ if (list == null) {\r
+ list = new ArrayList<Continuation>();\r
+ suspended_requests.put(id, list);\r
+ }\r
+ list.add(continuation);\r
+ }\r
+ }\r
\r
- private static Logger logger = Logger.getLogger("org.onap.dmaap.datarouter.provisioning.internal");\r
- private static Map<String, Counter> map = new HashMap<String, Counter>();\r
- private static final Timer rolex = new Timer();\r
+ private void resume(String id) {\r
+ synchronized (suspended_requests) {\r
+ List<Continuation> list = suspended_requests.get(id);\r
+ if (list != null) {\r
+ // when the waited for event happens\r
+ Continuation continuation = list.remove(0);\r
+ continuation.setAttribute(ThrottleFilter.THROTTLE_MARKER, new Object());\r
+ continuation.resume();\r
+ }\r
+ }\r
+ }\r
\r
- @Override\r
- public void init(FilterConfig arg0) throws ServletException {\r
- configure();\r
- rolex.scheduleAtFixedRate(this, 5*60000L, 5*60000L); // Run once every 5 minutes to clean map\r
- }\r
+ /**\r
+ * Return a count of number of requests in the last M minutes, iff this is a "bad" request.\r
+ * If the request has been resumed (if it contains the THROTTLE_MARKER) it is considered good.\r
+ *\r
+ * @param request the request\r
+ * @return number of requests in the last M minutes, 0 means it is a "good" request\r
+ */\r
+ private int getRequestRate(HttpServletRequest request) {\r
+ String expecthdr = request.getHeader("Expect");\r
+ if (expecthdr != null && expecthdr.equalsIgnoreCase("100-continue"))\r
+ return 0;\r
\r
- /**\r
- * Configure the throttle. This should be called from BaseServlet.provisioningParametersChanged(), to make sure it stays up to date.\r
- */\r
- public static void configure() {\r
- Parameters p = Parameters.getParameter(Parameters.THROTTLE_FILTER);\r
- if (p != null) {\r
- try {\r
- Class.forName(JETTY_REQUEST);\r
- String v = p.getValue();\r
- if (v != null && !v.equals("off")) {\r
- String[] pp = v.split(",");\r
- if (pp != null) {\r
- n_requests = (pp.length > 0) ? getInt(pp[0], DEFAULT_N) : DEFAULT_N;\r
- m_minutes = (pp.length > 1) ? getInt(pp[1], DEFAULT_M) : DEFAULT_M;\r
- action = (pp.length > 2 && pp[2] != null && pp[2].equalsIgnoreCase("throttle")) ? ACTION_THROTTLE : ACTION_DROP;\r
- enabled = true;\r
- // ACTION_THROTTLE is not currently working, so is not supported\r
- if (action == ACTION_THROTTLE) {\r
- action = ACTION_DROP;\r
- logger.info("Throttling is not currently supported; action changed to DROP");\r
- }\r
- logger.info("ThrottleFilter is ENABLED for /publish requests; N="+n_requests+", M="+m_minutes+", Action="+action);\r
- return;\r
- }\r
- }\r
- } catch (ClassNotFoundException e) {\r
- logger.warn("Class "+JETTY_REQUEST+" is not available; this filter requires Jetty.");\r
- }\r
- }\r
- logger.info("ThrottleFilter is DISABLED for /publish requests.");\r
- enabled = false;\r
- map.clear();\r
- }\r
- private static int getInt(String s, int deflt) {\r
- try {\r
- return Integer.parseInt(s);\r
- } catch (NumberFormatException x) {\r
- return deflt;\r
- }\r
- }\r
- @Override\r
- public void destroy() {\r
- rolex.cancel();\r
- map.clear();\r
- }\r
+ String key = getConnectionId(request);\r
+ synchronized (map) {\r
+ Counter cnt = map.get(key);\r
+ if (cnt == null) {\r
+ cnt = new Counter();\r
+ map.put(key, cnt);\r
+ }\r
+ int n = cnt.getRequestRate();\r
+ return n;\r
+ }\r
+ }\r
\r
- @Override\r
- public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)\r
- throws IOException, ServletException\r
- {\r
- if (enabled && action == ACTION_THROTTLE) {\r
- throttleFilter((HttpServletRequest) request, (HttpServletResponse) response, chain);\r
- } else if (enabled) {\r
- dropFilter((HttpServletRequest) request, (HttpServletResponse) response, chain);\r
- } else {\r
- chain.doFilter(request, response);\r
- }\r
- }\r
- public void dropFilter(HttpServletRequest request, HttpServletResponse response, FilterChain chain)\r
- throws IOException, ServletException\r
- {\r
- int rate = getRequestRate((HttpServletRequest) request);\r
- if (rate >= n_requests) {\r
- // drop request - only works under Jetty\r
- String m = String.format("Dropping connection: %s %d bad connections in %d minutes", getConnectionId((HttpServletRequest) request), rate, m_minutes);\r
- logger.info(m);\r
- Request base_request = (request instanceof Request)\r
- ? (Request) request\r
- : AbstractHttpConnection.getCurrentConnection().getRequest();\r
- base_request.getConnection().getEndPoint().close();\r
- } else {\r
- chain.doFilter(request, response);\r
- }\r
- }\r
- public void throttleFilter(HttpServletRequest request, HttpServletResponse response, FilterChain chain)\r
- throws IOException, ServletException\r
- {\r
- // throttle request\r
- String id = getConnectionId((HttpServletRequest) request);\r
- int rate = getRequestRate((HttpServletRequest) request);\r
- Object results = request.getAttribute(THROTTLE_MARKER);\r
- if (rate >= n_requests && results == null) {\r
- String m = String.format("Throttling connection: %s %d bad connections in %d minutes", getConnectionId((HttpServletRequest) request), rate, m_minutes);\r
- logger.info(m);\r
- Continuation continuation = ContinuationSupport.getContinuation(request);\r
- continuation.suspend();\r
- register(id, continuation);\r
- continuation.undispatch();\r
- } else {\r
- chain.doFilter(request, response);\r
- @SuppressWarnings("resource")\r
- InputStream is = request.getInputStream();\r
- byte[] b = new byte[4096];\r
- int n = is.read(b);\r
- while (n > 0) {\r
- n = is.read(b);\r
- }\r
- resume(id);\r
- }\r
- }\r
- private Map<String, List<Continuation>> suspended_requests = new HashMap<String, List<Continuation>>();\r
- private void register(String id, Continuation continuation) {\r
- synchronized (suspended_requests) {\r
- List<Continuation> list = suspended_requests.get(id);\r
- if (list == null) {\r
- list = new ArrayList<Continuation>();\r
- suspended_requests.put(id, list);\r
- }\r
- list.add(continuation);\r
- }\r
- }\r
- private void resume(String id) {\r
- synchronized (suspended_requests) {\r
- List<Continuation> list = suspended_requests.get(id);\r
- if (list != null) {\r
- // when the waited for event happens\r
- Continuation continuation = list.remove(0);\r
- continuation.setAttribute(ThrottleFilter.THROTTLE_MARKER, new Object());\r
- continuation.resume();\r
- }\r
- }\r
- }\r
+ public class Counter {\r
+ private List<Long> times = new Vector<Long>(); // a record of request times\r
\r
- /**\r
- * Return a count of number of requests in the last M minutes, iff this is a "bad" request.\r
- * If the request has been resumed (if it contains the THROTTLE_MARKER) it is considered good.\r
- * @param request the request\r
- * @return number of requests in the last M minutes, 0 means it is a "good" request\r
- */\r
- private int getRequestRate(HttpServletRequest request) {\r
- String expecthdr = request.getHeader("Expect");\r
- if (expecthdr != null && expecthdr.equalsIgnoreCase("100-continue"))\r
- return 0;\r
+ public int prune() {\r
+ try {\r
+ long n = System.currentTimeMillis() - (m_minutes * ONE_MINUTE);\r
+ long t = times.get(0);\r
+ while (t < n) {\r
+ times.remove(0);\r
+ t = times.get(0);\r
+ }\r
+ } catch (IndexOutOfBoundsException e) {\r
+ logger.trace("Exception: " + e.getMessage(), e);\r
+ }\r
+ return times.size();\r
+ }\r
\r
- String key = getConnectionId(request);\r
- synchronized (map) {\r
- Counter cnt = map.get(key);\r
- if (cnt == null) {\r
- cnt = new Counter();\r
- map.put(key, cnt);\r
- }\r
- int n = cnt.getRequestRate();\r
- return n;\r
- }\r
- }\r
+ public int getRequestRate() {\r
+ times.add(System.currentTimeMillis());\r
+ return prune();\r
+ }\r
+ }\r
\r
- public class Counter {\r
- private List<Long> times = new Vector<Long>(); // a record of request times\r
- public int prune() {\r
- try {\r
- long n = System.currentTimeMillis() - (m_minutes * ONE_MINUTE);\r
- long t = times.get(0);\r
- while (t < n) {\r
- times.remove(0);\r
- t = times.get(0);\r
- }\r
- } catch (IndexOutOfBoundsException e) {\r
- // ignore\r
- }\r
- return times.size();\r
- }\r
- public int getRequestRate() {\r
- times.add(System.currentTimeMillis());\r
- return prune();\r
- }\r
- }\r
+ /**\r
+ * Identify a connection by endpoint IP address, and feed ID.\r
+ */\r
+ private String getConnectionId(HttpServletRequest req) {\r
+ return req.getRemoteAddr() + "/" + getFeedId(req);\r
+ }\r
\r
- /**\r
- * Identify a connection by endpoint IP address, and feed ID.\r
- */\r
- private String getConnectionId(HttpServletRequest req) {\r
- return req.getRemoteAddr() + "/" + getFeedId(req);\r
- }\r
- private int getFeedId(HttpServletRequest req) {\r
- String path = req.getPathInfo();\r
- if (path == null || path.length() < 2)\r
- return -1;\r
- path = path.substring(1);\r
- int ix = path.indexOf('/');\r
- if (ix < 0 || ix == path.length()-1)\r
- return -2;\r
- try {\r
- int feedid = Integer.parseInt(path.substring(0, ix));\r
- return feedid;\r
- } catch (NumberFormatException e) {\r
- return -1;\r
- }\r
- }\r
+ private int getFeedId(HttpServletRequest req) {\r
+ String path = req.getPathInfo();\r
+ if (path == null || path.length() < 2)\r
+ return -1;\r
+ path = path.substring(1);\r
+ int ix = path.indexOf('/');\r
+ if (ix < 0 || ix == path.length() - 1)\r
+ return -2;\r
+ try {\r
+ int feedid = Integer.parseInt(path.substring(0, ix));\r
+ return feedid;\r
+ } catch (NumberFormatException e) {\r
+ return -1;\r
+ }\r
+ }\r
\r
- @Override\r
- public void run() {\r
- // Once every 5 minutes, go through the map, and remove empty entrys\r
- for (Object s : map.keySet().toArray()) {\r
- synchronized (map) {\r
- Counter c = map.get(s);\r
- if (c.prune() <= 0)\r
- map.remove(s);\r
- }\r
- }\r
- }\r
+ @Override\r
+ public void run() {\r
+ // Once every 5 minutes, go through the map, and remove empty entrys\r
+ for (Object s : map.keySet().toArray()) {\r
+ synchronized (map) {\r
+ Counter c = map.get(s);\r
+ if (c.prune() <= 0)\r
+ map.remove(s);\r
+ }\r
+ }\r
+ }\r
}\r