Update project structure to org.onap
[dmaap/datarouter.git] / datarouter-prov / src / main / java / org / onap / dmaap / datarouter / provisioning / utils / ThrottleFilter.java
diff --git a/datarouter-prov/src/main/java/org/onap/dmaap/datarouter/provisioning/utils/ThrottleFilter.java b/datarouter-prov/src/main/java/org/onap/dmaap/datarouter/provisioning/utils/ThrottleFilter.java
new file mode 100644 (file)
index 0000000..ba2ca7e
--- /dev/null
@@ -0,0 +1,315 @@
+/*******************************************************************************\r
+ * ============LICENSE_START==================================================\r
+ * * org.onap.dmaap\r
+ * * ===========================================================================\r
+ * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.\r
+ * * ===========================================================================\r
+ * * Licensed under the Apache License, Version 2.0 (the "License");\r
+ * * you may not use this file except in compliance with the License.\r
+ * * You may obtain a copy of the License at\r
+ * * \r
+ *  *      http://www.apache.org/licenses/LICENSE-2.0\r
+ * * \r
+ *  * Unless required by applicable law or agreed to in writing, software\r
+ * * distributed under the License is distributed on an "AS IS" BASIS,\r
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+ * * See the License for the specific language governing permissions and\r
+ * * limitations under the License.\r
+ * * ============LICENSE_END====================================================\r
+ * *\r
+ * * ECOMP is a trademark and service mark of AT&T Intellectual Property.\r
+ * *\r
+ ******************************************************************************/\r
+\r
+\r
+package org.onap.dmaap.datarouter.provisioning.utils;\r
+\r
+import java.io.IOException;\r
+import java.io.InputStream;\r
+import java.util.ArrayList;\r
+import java.util.HashMap;\r
+import java.util.List;\r
+import java.util.Map;\r
+import java.util.Timer;\r
+import java.util.TimerTask;\r
+import java.util.Vector;\r
+\r
+import javax.servlet.Filter;\r
+import javax.servlet.FilterChain;\r
+import javax.servlet.FilterConfig;\r
+import javax.servlet.ServletException;\r
+import javax.servlet.ServletRequest;\r
+import javax.servlet.ServletResponse;\r
+import javax.servlet.http.HttpServletRequest;\r
+import javax.servlet.http.HttpServletResponse;\r
+\r
+import org.apache.log4j.Logger;\r
+import org.eclipse.jetty.continuation.Continuation;\r
+import org.eclipse.jetty.continuation.ContinuationSupport;\r
+import org.eclipse.jetty.server.AbstractHttpConnection;\r
+import org.eclipse.jetty.server.Request;\r
+import org.onap.dmaap.datarouter.provisioning.beans.Parameters;\r
+\r
+/**\r
+ * This filter checks /publish requests to the provisioning server to allow ill-behaved publishers to be throttled.\r
+ * It is configured via the provisioning parameter THROTTLE_FILTER.\r
+ * The THROTTLE_FILTER provisioning parameter can have these values:\r
+ * <table>\r
+ * <tr><td>(no value)</td><td>filter disabled</td></tr>\r
+ * <tr><td>off</td><td>filter disabled</td></tr>\r
+ * <tr><td>N[,M[,action]]</td><td>set N, M, and action (used in the algorithm below).\r
+ *     Action is <i>drop</i> or <i>throttle</i>.\r
+ *     If M is missing, it defaults to 5 minutes.\r
+ *     If the action is missing, it defaults to <i>drop</i>.\r
+ * </td></tr>\r
+ * </table>\r
+ * <p>\r
+ * The <i>action</i> is triggered iff:\r
+ * <ol>\r
+ * <li>the filter is enabled, and</li>\r
+ * <li>N /publish requests come to the provisioning server in M minutes\r
+ *   <ol>\r
+ *   <li>from the same IP address</li>\r
+ *   <li>for the same feed</li>\r
+ *   <li>lacking the <i>Expect: 100-continue</i> header</li>\r
+ *   </ol>\r
+ * </li>\r
+ * </ol>\r
+ * The action that can be performed (if triggered) are:\r
+ * <ol>\r
+ * <li><i>drop</i> - the connection is dropped immediately.</li>\r
+ * <li><i>throttle</i> - [not supported] the connection is put into a low priority queue with all other throttled connections.\r
+ *   These are then processed at a slower rate.  Note: this option does not work correctly, and is disabled.\r
+ *   The only action that is supported is <i>drop</i>.\r
+ * </li>\r
+ * </ol>\r
+ *\r
+ * @author Robert Eby\r
+ * @version $Id: ThrottleFilter.java,v 1.2 2014/03/12 19:45:41 eby Exp $\r
+ */\r
+public class ThrottleFilter extends TimerTask implements Filter {\r
+       public  static final int    DEFAULT_N       = 10;\r
+       public  static final int    DEFAULT_M       = 5;\r
+       public  static final String THROTTLE_MARKER = "org.onap.dmaap.datarouter.provisioning.THROTTLE_MARKER";\r
+       private static final String JETTY_REQUEST   = "org.eclipse.jetty.server.Request";\r
+       private static final long   ONE_MINUTE      = 60000L;\r
+       private static final int    ACTION_DROP     = 0;\r
+       private static final int    ACTION_THROTTLE = 1;\r
+\r
+       // Configuration\r
+       private static boolean enabled = false;         // enabled or not\r
+       private static int n_requests = 0;                      // number of requests in M minutes\r
+       private static int m_minutes = 0;                       // sampling period\r
+       private static int action = ACTION_DROP;        // action to take (throttle or drop)\r
+\r
+       private static Logger logger = Logger.getLogger("org.onap.dmaap.datarouter.provisioning.internal");\r
+       private static Map<String, Counter> map = new HashMap<String, Counter>();\r
+       private static final Timer rolex = new Timer();\r
+\r
+       @Override\r
+       public void init(FilterConfig arg0) throws ServletException {\r
+               configure();\r
+               rolex.scheduleAtFixedRate(this, 5*60000L, 5*60000L);    // Run once every 5 minutes to clean map\r
+       }\r
+\r
+       /**\r
+        * Configure the throttle.  This should be called from BaseServlet.provisioningParametersChanged(), to make sure it stays up to date.\r
+        */\r
+       public static void configure() {\r
+               Parameters p = Parameters.getParameter(Parameters.THROTTLE_FILTER);\r
+               if (p != null) {\r
+                       try {\r
+                               Class.forName(JETTY_REQUEST);\r
+                               String v = p.getValue();\r
+                               if (v != null && !v.equals("off")) {\r
+                                       String[] pp = v.split(",");\r
+                                       if (pp != null) {\r
+                                               n_requests = (pp.length > 0) ? getInt(pp[0], DEFAULT_N) : DEFAULT_N;\r
+                                               m_minutes  = (pp.length > 1) ? getInt(pp[1], DEFAULT_M) : DEFAULT_M;\r
+                                               action     = (pp.length > 2 && pp[2] != null && pp[2].equalsIgnoreCase("throttle")) ? ACTION_THROTTLE : ACTION_DROP;\r
+                                               enabled    = true;\r
+                                               // ACTION_THROTTLE is not currently working, so is not supported\r
+                                               if (action == ACTION_THROTTLE) {\r
+                                                       action = ACTION_DROP;\r
+                                                       logger.info("Throttling is not currently supported; action changed to DROP");\r
+                                               }\r
+                                               logger.info("ThrottleFilter is ENABLED for /publish requests; N="+n_requests+", M="+m_minutes+", Action="+action);\r
+                                               return;\r
+                                       }\r
+                               }\r
+                       } catch (ClassNotFoundException e) {\r
+                               logger.warn("Class "+JETTY_REQUEST+" is not available; this filter requires Jetty.");\r
+                       }\r
+               }\r
+               logger.info("ThrottleFilter is DISABLED for /publish requests.");\r
+               enabled = false;\r
+               map.clear();\r
+       }\r
+       private static int getInt(String s, int deflt) {\r
+               try {\r
+                       return Integer.parseInt(s);\r
+               } catch (NumberFormatException x) {\r
+                       return deflt;\r
+               }\r
+       }\r
+       @Override\r
+       public void destroy() {\r
+               rolex.cancel();\r
+               map.clear();\r
+       }\r
+\r
+       @Override\r
+       public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)\r
+               throws IOException, ServletException\r
+       {\r
+               if (enabled && action == ACTION_THROTTLE) {\r
+                       throttleFilter((HttpServletRequest) request, (HttpServletResponse) response, chain);\r
+               } else if (enabled) {\r
+                       dropFilter((HttpServletRequest) request, (HttpServletResponse) response, chain);\r
+               } else {\r
+                       chain.doFilter(request, response);\r
+               }\r
+       }\r
+       public void dropFilter(HttpServletRequest request, HttpServletResponse response, FilterChain chain)\r
+               throws IOException, ServletException\r
+       {\r
+               int rate = getRequestRate((HttpServletRequest) request);\r
+               if (rate >= n_requests) {\r
+                       // drop request - only works under Jetty\r
+                       String m = String.format("Dropping connection: %s %d bad connections in %d minutes", getConnectionId((HttpServletRequest) request), rate, m_minutes);\r
+                       logger.info(m);\r
+                       Request base_request = (request instanceof Request)\r
+                               ? (Request) request\r
+                               : AbstractHttpConnection.getCurrentConnection().getRequest();\r
+                       base_request.getConnection().getEndPoint().close();\r
+               } else {\r
+                       chain.doFilter(request, response);\r
+               }\r
+       }\r
+       public void throttleFilter(HttpServletRequest request, HttpServletResponse response, FilterChain chain)\r
+               throws IOException, ServletException\r
+       {\r
+               // throttle request\r
+               String id = getConnectionId((HttpServletRequest) request);\r
+               int rate = getRequestRate((HttpServletRequest) request);\r
+               Object results = request.getAttribute(THROTTLE_MARKER);\r
+               if (rate >= n_requests && results == null) {\r
+                       String m = String.format("Throttling connection: %s %d bad connections in %d minutes", getConnectionId((HttpServletRequest) request), rate, m_minutes);\r
+                       logger.info(m);\r
+                       Continuation continuation = ContinuationSupport.getContinuation(request);\r
+                       continuation.suspend();\r
+                       register(id, continuation);\r
+                       continuation.undispatch();\r
+               } else {\r
+                       chain.doFilter(request, response);\r
+                       @SuppressWarnings("resource")\r
+                       InputStream is = request.getInputStream();\r
+                       byte[] b = new byte[4096];\r
+                       int n = is.read(b);\r
+                       while (n > 0) {\r
+                               n = is.read(b);\r
+                       }\r
+                       resume(id);\r
+               }\r
+       }\r
+       private Map<String, List<Continuation>> suspended_requests = new HashMap<String, List<Continuation>>();\r
+       private void register(String id, Continuation continuation) {\r
+               synchronized (suspended_requests) {\r
+                       List<Continuation> list = suspended_requests.get(id);\r
+                       if (list == null) {\r
+                               list = new ArrayList<Continuation>();\r
+                               suspended_requests.put(id,  list);\r
+                       }\r
+                       list.add(continuation);\r
+               }\r
+       }\r
+       private void resume(String id) {\r
+               synchronized (suspended_requests) {\r
+                       List<Continuation> list = suspended_requests.get(id);\r
+                       if (list != null) {\r
+                               // when the waited for event happens\r
+                               Continuation continuation = list.remove(0);\r
+                               continuation.setAttribute(ThrottleFilter.THROTTLE_MARKER, new Object());\r
+                               continuation.resume();\r
+                       }\r
+               }\r
+       }\r
+\r
+       /**\r
+        * Return a count of number of requests in the last M minutes, iff this is a "bad" request.\r
+        * If the request has been resumed (if it contains the THROTTLE_MARKER) it is considered good.\r
+        * @param request the request\r
+        * @return number of requests in the last M minutes, 0 means it is a "good" request\r
+        */\r
+       private int getRequestRate(HttpServletRequest request) {\r
+               String expecthdr = request.getHeader("Expect");\r
+               if (expecthdr != null && expecthdr.equalsIgnoreCase("100-continue"))\r
+                       return 0;\r
+\r
+               String key = getConnectionId(request);\r
+               synchronized (map) {\r
+                       Counter cnt = map.get(key);\r
+                       if (cnt == null) {\r
+                               cnt = new Counter();\r
+                               map.put(key, cnt);\r
+                       }\r
+                       int n = cnt.getRequestRate();\r
+                       return n;\r
+               }\r
+       }\r
+\r
+       public class Counter {\r
+               private List<Long> times = new Vector<Long>();  // a record of request times\r
+               public int prune() {\r
+                       try {\r
+                               long n = System.currentTimeMillis() - (m_minutes * ONE_MINUTE);\r
+                               long t = times.get(0);\r
+                               while (t < n) {\r
+                                       times.remove(0);\r
+                                       t = times.get(0);\r
+                               }\r
+                       } catch (IndexOutOfBoundsException e) {\r
+                               // ignore\r
+                       }\r
+                       return times.size();\r
+               }\r
+               public int getRequestRate() {\r
+                       times.add(System.currentTimeMillis());\r
+                       return prune();\r
+               }\r
+       }\r
+\r
+       /**\r
+        *  Identify a connection by endpoint IP address, and feed ID.\r
+        */\r
+       private String getConnectionId(HttpServletRequest req) {\r
+               return req.getRemoteAddr() + "/" + getFeedId(req);\r
+       }\r
+       private int getFeedId(HttpServletRequest req) {\r
+               String path = req.getPathInfo();\r
+               if (path == null || path.length() < 2)\r
+                       return -1;\r
+               path = path.substring(1);\r
+               int ix = path.indexOf('/');\r
+               if (ix < 0 || ix == path.length()-1)\r
+                       return -2;\r
+               try {\r
+                       int feedid = Integer.parseInt(path.substring(0, ix));\r
+                       return feedid;\r
+               } catch (NumberFormatException e) {\r
+                       return -1;\r
+               }\r
+       }\r
+\r
+       @Override\r
+       public void run() {\r
+               // Once every 5 minutes, go through the map, and remove empty entrys\r
+               for (Object s : map.keySet().toArray()) {\r
+                       synchronized (map) {\r
+                               Counter c = map.get(s);\r
+                               if (c.prune() <= 0)\r
+                                       map.remove(s);\r
+                       }\r
+               }\r
+       }\r
+}\r