+ private static final int DEFAULT_N = 10;\r
+ private static final int DEFAULT_M = 5;\r
+ private static final String THROTTLE_MARKER = "org.onap.dmaap.datarouter.provisioning.THROTTLE_MARKER";\r
+ private static final String JETTY_REQUEST = "org.eclipse.jetty.server.Request";\r
+ private static final long ONE_MINUTE = 60000L;\r
+ private static final int ACTION_DROP = 0;\r
+ private static final int ACTION_THROTTLE = 1;\r
+\r
+ // Configuration\r
+ private static boolean enabled = false; // enabled or not\r
+ private static int numRequests = 0; // number of requests in M minutes\r
+ private static int samplingPeriod = 0; // sampling period\r
+ private static int action = ACTION_DROP; // action to take (throttle or drop)\r
+\r
+ private static EELFLogger logger = EELFManager.getInstance().getLogger("InternalLog");\r
+ private static Map<String, Counter> map = new HashMap<>();\r
+ private static final Timer rolex = new Timer();\r
+\r
+ @Override\r
+ public void init(FilterConfig arg0) throws ServletException {\r
+ configure();\r
+ rolex.scheduleAtFixedRate(this, 5 * 60000L, 5 * 60000L); // Run once every 5 minutes to clean map\r
+ }\r
+\r
+ /**\r
+ * Configure the throttle. This should be called from BaseServlet.provisioningParametersChanged(), to make sure it stays up to date.\r
+ */\r
+ public static void configure() {\r
+ Parameters p = Parameters.getParameter(Parameters.THROTTLE_FILTER);\r
+ if (p != null) {\r
+ try {\r
+ Class.forName(JETTY_REQUEST);\r
+ String v = p.getValue();\r
+ if (v != null && !"off".equals(v)) {\r
+ String[] pp = v.split(",");\r
+ if (pp != null) {\r
+ numRequests = (pp.length > 0) ? getInt(pp[0], DEFAULT_N) : DEFAULT_N;\r
+ samplingPeriod = (pp.length > 1) ? getInt(pp[1], DEFAULT_M) : DEFAULT_M;\r
+ action = (pp.length > 2 && pp[2] != null && "throttle".equalsIgnoreCase(pp[2])) ? ACTION_THROTTLE : ACTION_DROP;\r
+ enabled = true;\r
+ // ACTION_THROTTLE is not currently working, so is not supported\r
+ if (action == ACTION_THROTTLE) {\r
+ action = ACTION_DROP;\r
+ logger.info("Throttling is not currently supported; action changed to DROP");\r
+ }\r
+ logger.info("ThrottleFilter is ENABLED for /publish requests; N=" + numRequests + ", M=" + samplingPeriod\r
+ + ", Action=" + action);\r
+ return;\r
+ }\r
+ }\r
+ } catch (ClassNotFoundException e) {\r
+ logger.warn("Class " + JETTY_REQUEST + " is not available; this filter requires Jetty.", e);\r
+ }\r
+ }\r
+ logger.info("ThrottleFilter is DISABLED for /publish requests.");\r
+ enabled = false;\r
+ map.clear();\r
+ }\r
+\r
+ private static int getInt(String s, int deflt) {\r
+ try {\r
+ return Integer.parseInt(s);\r
+ } catch (NumberFormatException x) {\r
+ return deflt;\r
+ }\r
+ }\r
+\r
+ @Override\r
+ public void destroy() {\r
+ rolex.cancel();\r
+ map.clear();\r
+ }\r
+\r
+ @Override\r
+ public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)\r
+ throws IOException, ServletException {\r
+ if (enabled && action == ACTION_THROTTLE) {\r
+ throttleFilter((HttpServletRequest) request, (HttpServletResponse) response, chain);\r
+ } else if (enabled) {\r
+ dropFilter((HttpServletRequest) request, (HttpServletResponse) response, chain);\r
+ } else {\r
+ chain.doFilter(request, response);\r
+ }\r
+ }\r
+\r
+ public void dropFilter(HttpServletRequest request, HttpServletResponse response, FilterChain chain)\r
+ throws IOException, ServletException {\r
+ int rate = getRequestRate(request);\r
+ if (rate >= numRequests) {\r
+ // drop request - only works under Jetty\r
+ String m = String.format("Dropping connection: %s %d bad connections in %d minutes", getConnectionId(request), rate,\r
+ samplingPeriod);\r
+ logger.info(m);\r
+ Request baseRequest = (request instanceof Request)\r
+ ? (Request) request\r
+ : HttpConnection.getCurrentConnection().getHttpChannel().getRequest();\r
+ baseRequest.getHttpChannel().getEndPoint().close();\r
+ } else {\r
+ chain.doFilter(request, response);\r
+ }\r
+ }\r
+\r
+ private void throttleFilter(HttpServletRequest request, HttpServletResponse response, FilterChain chain)\r
+ throws IOException, ServletException {\r
+ // throttle request\r
+ String id = getConnectionId(request);\r
+ int rate = getRequestRate(request);\r
+ Object results = request.getAttribute(THROTTLE_MARKER);\r
+ if (rate >= numRequests && results == null) {\r
+ String m = String.format("Throttling connection: %s %d bad connections in %d minutes",\r
+ getConnectionId(request), rate, samplingPeriod);\r
+ logger.info(m);\r
+ Continuation continuation = ContinuationSupport.getContinuation(request);\r
+ continuation.suspend();\r
+ register(id, continuation);\r
+ continuation.undispatch();\r
+ } else {\r
+ chain.doFilter(request, response);\r
+ @SuppressWarnings("resource")\r
+ InputStream is = request.getInputStream();\r
+ byte[] b = new byte[4096];\r
+ int n = is.read(b);\r
+ while (n > 0) {\r
+ n = is.read(b);\r
+ }\r
+ resume(id);\r
+ }\r
+ }\r
+\r
+ private Map<String, List<Continuation>> suspendedRequests = new HashMap<>();\r