Removing code smells
[dmaap/datarouter.git] / datarouter-prov / src / main / java / org / onap / dmaap / datarouter / provisioning / utils / ThrottleFilter.java
index ba2ca7e..da06f6b 100644 (file)
@@ -7,9 +7,9 @@
  * * Licensed under the Apache License, Version 2.0 (the "License");\r
  * * you may not use this file except in compliance with the License.\r
  * * You may obtain a copy of the License at\r
- * * \r
+ * *\r
  *  *      http://www.apache.org/licenses/LICENSE-2.0\r
- * * \r
+ * *\r
  *  * Unless required by applicable law or agreed to in writing, software\r
  * * distributed under the License is distributed on an "AS IS" BASIS,\r
  * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
@@ -24,6 +24,8 @@
 \r
 package org.onap.dmaap.datarouter.provisioning.utils;\r
 \r
+import com.att.eelf.configuration.EELFLogger;\r
+import com.att.eelf.configuration.EELFManager;\r
 import java.io.IOException;\r
 import java.io.InputStream;\r
 import java.util.ArrayList;\r
@@ -32,8 +34,6 @@ import java.util.List;
 import java.util.Map;\r
 import java.util.Timer;\r
 import java.util.TimerTask;\r
-import java.util.Vector;\r
-\r
 import javax.servlet.Filter;\r
 import javax.servlet.FilterChain;\r
 import javax.servlet.FilterConfig;\r
@@ -42,11 +42,9 @@ import javax.servlet.ServletRequest;
 import javax.servlet.ServletResponse;\r
 import javax.servlet.http.HttpServletRequest;\r
 import javax.servlet.http.HttpServletResponse;\r
-\r
-import org.apache.log4j.Logger;\r
 import org.eclipse.jetty.continuation.Continuation;\r
 import org.eclipse.jetty.continuation.ContinuationSupport;\r
-import org.eclipse.jetty.server.AbstractHttpConnection;\r
+import org.eclipse.jetty.server.HttpConnection;\r
 import org.eclipse.jetty.server.Request;\r
 import org.onap.dmaap.datarouter.provisioning.beans.Parameters;\r
 \r
@@ -58,9 +56,9 @@ import org.onap.dmaap.datarouter.provisioning.beans.Parameters;
  * <tr><td>(no value)</td><td>filter disabled</td></tr>\r
  * <tr><td>off</td><td>filter disabled</td></tr>\r
  * <tr><td>N[,M[,action]]</td><td>set N, M, and action (used in the algorithm below).\r
- *     Action is <i>drop</i> or <i>throttle</i>.\r
- *     If M is missing, it defaults to 5 minutes.\r
- *     If the action is missing, it defaults to <i>drop</i>.\r
+ * Action is <i>drop</i> or <i>throttle</i>.\r
+ * If M is missing, it defaults to 5 minutes.\r
+ * If the action is missing, it defaults to <i>drop</i>.\r
  * </td></tr>\r
  * </table>\r
  * <p>\r
@@ -68,19 +66,19 @@ import org.onap.dmaap.datarouter.provisioning.beans.Parameters;
  * <ol>\r
  * <li>the filter is enabled, and</li>\r
  * <li>N /publish requests come to the provisioning server in M minutes\r
- *   <ol>\r
- *   <li>from the same IP address</li>\r
- *   <li>for the same feed</li>\r
- *   <li>lacking the <i>Expect: 100-continue</i> header</li>\r
- *   </ol>\r
+ * <ol>\r
+ * <li>from the same IP address</li>\r
+ * <li>for the same feed</li>\r
+ * <li>lacking the <i>Expect: 100-continue</i> header</li>\r
+ * </ol>\r
  * </li>\r
  * </ol>\r
  * The action that can be performed (if triggered) are:\r
  * <ol>\r
  * <li><i>drop</i> - the connection is dropped immediately.</li>\r
  * <li><i>throttle</i> - [not supported] the connection is put into a low priority queue with all other throttled connections.\r
- *   These are then processed at a slower rate.  Note: this option does not work correctly, and is disabled.\r
- *   The only action that is supported is <i>drop</i>.\r
+ * These are then processed at a slower rate.  Note: this option does not work correctly, and is disabled.\r
+ * The only action that is supported is <i>drop</i>.\r
  * </li>\r
  * </ol>\r
  *\r
@@ -88,228 +86,237 @@ import org.onap.dmaap.datarouter.provisioning.beans.Parameters;
  * @version $Id: ThrottleFilter.java,v 1.2 2014/03/12 19:45:41 eby Exp $\r
  */\r
 public class ThrottleFilter extends TimerTask implements Filter {\r
-       public  static final int    DEFAULT_N       = 10;\r
-       public  static final int    DEFAULT_M       = 5;\r
-       public  static final String THROTTLE_MARKER = "org.onap.dmaap.datarouter.provisioning.THROTTLE_MARKER";\r
-       private static final String JETTY_REQUEST   = "org.eclipse.jetty.server.Request";\r
-       private static final long   ONE_MINUTE      = 60000L;\r
-       private static final int    ACTION_DROP     = 0;\r
-       private static final int    ACTION_THROTTLE = 1;\r
+    private static final int DEFAULT_N = 10;\r
+    private static final int DEFAULT_M = 5;\r
+    private static final String THROTTLE_MARKER = "org.onap.dmaap.datarouter.provisioning.THROTTLE_MARKER";\r
+    private static final String JETTY_REQUEST = "org.eclipse.jetty.server.Request";\r
+    private static final long ONE_MINUTE = 60000L;\r
+    private static final int ACTION_DROP = 0;\r
+    private static final int ACTION_THROTTLE = 1;\r
+\r
+    // Configuration\r
+    private static boolean enabled = false;        // enabled or not\r
+    private static int numRequests = 0;            // number of requests in M minutes\r
+    private static int samplingPeriod = 0;            // sampling period\r
+    private static int action = ACTION_DROP;    // action to take (throttle or drop)\r
+\r
+    private static EELFLogger logger = EELFManager.getInstance().getLogger("InternalLog");\r
+    private static Map<String, Counter> map = new HashMap<>();\r
+    private static final Timer rolex = new Timer();\r
+\r
+    @Override\r
+    public void init(FilterConfig arg0) throws ServletException {\r
+        configure();\r
+        rolex.scheduleAtFixedRate(this, 5 * 60000L, 5 * 60000L);    // Run once every 5 minutes to clean map\r
+    }\r
+\r
+    /**\r
+     * Configure the throttle.  This should be called from BaseServlet.provisioningParametersChanged(), to make sure it stays up to date.\r
+     */\r
+    public static void configure() {\r
+        Parameters p = Parameters.getParameter(Parameters.THROTTLE_FILTER);\r
+        if (p != null) {\r
+            try {\r
+                Class.forName(JETTY_REQUEST);\r
+                String v = p.getValue();\r
+                if (v != null && !"off".equals(v)) {\r
+                    String[] pp = v.split(",");\r
+                    if (pp != null) {\r
+                        numRequests = (pp.length > 0) ? getInt(pp[0], DEFAULT_N) : DEFAULT_N;\r
+                        samplingPeriod = (pp.length > 1) ? getInt(pp[1], DEFAULT_M) : DEFAULT_M;\r
+                        action = (pp.length > 2 && pp[2] != null && "throttle".equalsIgnoreCase(pp[2])) ? ACTION_THROTTLE : ACTION_DROP;\r
+                        enabled = true;\r
+                        // ACTION_THROTTLE is not currently working, so is not supported\r
+                        if (action == ACTION_THROTTLE) {\r
+                            action = ACTION_DROP;\r
+                            logger.info("Throttling is not currently supported; action changed to DROP");\r
+                        }\r
+                        logger.info("ThrottleFilter is ENABLED for /publish requests; N=" + numRequests + ", M=" + samplingPeriod\r
+                            + ", Action=" + action);\r
+                        return;\r
+                    }\r
+                }\r
+            } catch (ClassNotFoundException e) {\r
+                logger.warn("Class " + JETTY_REQUEST + " is not available; this filter requires Jetty.", e);\r
+            }\r
+        }\r
+        logger.info("ThrottleFilter is DISABLED for /publish requests.");\r
+        enabled = false;\r
+        map.clear();\r
+    }\r
+\r
+    private static int getInt(String s, int deflt) {\r
+        try {\r
+            return Integer.parseInt(s);\r
+        } catch (NumberFormatException x) {\r
+            return deflt;\r
+        }\r
+    }\r
+\r
+    @Override\r
+    public void destroy() {\r
+        rolex.cancel();\r
+        map.clear();\r
+    }\r
+\r
+    @Override\r
+    public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)\r
+            throws IOException, ServletException {\r
+        if (enabled && action == ACTION_THROTTLE) {\r
+            throttleFilter((HttpServletRequest) request, (HttpServletResponse) response, chain);\r
+        } else if (enabled) {\r
+            dropFilter((HttpServletRequest) request, (HttpServletResponse) response, chain);\r
+        } else {\r
+            chain.doFilter(request, response);\r
+        }\r
+    }\r
+\r
+    public void dropFilter(HttpServletRequest request, HttpServletResponse response, FilterChain chain)\r
+            throws IOException, ServletException {\r
+        int rate = getRequestRate(request);\r
+        if (rate >= numRequests) {\r
+            // drop request - only works under Jetty\r
+            String m = String.format("Dropping connection: %s %d bad connections in %d minutes", getConnectionId(request), rate,\r
+                samplingPeriod);\r
+            logger.info(m);\r
+            Request baseRequest = (request instanceof Request)\r
+                    ? (Request) request\r
+                    : HttpConnection.getCurrentConnection().getHttpChannel().getRequest();\r
+            baseRequest.getHttpChannel().getEndPoint().close();\r
+        } else {\r
+            chain.doFilter(request, response);\r
+        }\r
+    }\r
+\r
+    private void throttleFilter(HttpServletRequest request, HttpServletResponse response, FilterChain chain)\r
+            throws IOException, ServletException {\r
+        // throttle request\r
+        String id = getConnectionId(request);\r
+        int rate = getRequestRate(request);\r
+        Object results = request.getAttribute(THROTTLE_MARKER);\r
+        if (rate >= numRequests && results == null) {\r
+            String m = String.format("Throttling connection: %s %d bad connections in %d minutes",\r
+                getConnectionId(request), rate, samplingPeriod);\r
+            logger.info(m);\r
+            Continuation continuation = ContinuationSupport.getContinuation(request);\r
+            continuation.suspend();\r
+            register(id, continuation);\r
+            continuation.undispatch();\r
+        } else {\r
+            chain.doFilter(request, response);\r
+            @SuppressWarnings("resource")\r
+            InputStream is = request.getInputStream();\r
+            byte[] b = new byte[4096];\r
+            int n = is.read(b);\r
+            while (n > 0) {\r
+                n = is.read(b);\r
+            }\r
+            resume(id);\r
+        }\r
+    }\r
+\r
+    private Map<String, List<Continuation>> suspendedRequests = new HashMap<>();\r
 \r
-       // Configuration\r
-       private static boolean enabled = false;         // enabled or not\r
-       private static int n_requests = 0;                      // number of requests in M minutes\r
-       private static int m_minutes = 0;                       // sampling period\r
-       private static int action = ACTION_DROP;        // action to take (throttle or drop)\r
+    private void register(String id, Continuation continuation) {\r
+        synchronized (suspendedRequests) {\r
+            List<Continuation> list = suspendedRequests.get(id);\r
+            if (list == null) {\r
+                list = new ArrayList<>();\r
+                suspendedRequests.put(id, list);\r
+            }\r
+            list.add(continuation);\r
+        }\r
+    }\r
 \r
-       private static Logger logger = Logger.getLogger("org.onap.dmaap.datarouter.provisioning.internal");\r
-       private static Map<String, Counter> map = new HashMap<String, Counter>();\r
-       private static final Timer rolex = new Timer();\r
+    private void resume(String id) {\r
+        synchronized (suspendedRequests) {\r
+            List<Continuation> list = suspendedRequests.get(id);\r
+            if (list != null) {\r
+                // when the waited for event happens\r
+                Continuation continuation = list.remove(0);\r
+                continuation.setAttribute(ThrottleFilter.THROTTLE_MARKER, new Object());\r
+                continuation.resume();\r
+            }\r
+        }\r
+    }\r
 \r
-       @Override\r
-       public void init(FilterConfig arg0) throws ServletException {\r
-               configure();\r
-               rolex.scheduleAtFixedRate(this, 5*60000L, 5*60000L);    // Run once every 5 minutes to clean map\r
-       }\r
+    /**\r
+     * Return a count of number of requests in the last M minutes, iff this is a "bad" request.\r
+     * If the request has been resumed (if it contains the THROTTLE_MARKER) it is considered good.\r
+     *\r
+     * @param request the request\r
+     * @return number of requests in the last M minutes, 0 means it is a "good" request\r
+     */\r
+    private int getRequestRate(HttpServletRequest request) {\r
+        String expecthdr = request.getHeader("Expect");\r
+        if (expecthdr != null && "100-continue".equalsIgnoreCase(expecthdr))\r
+            return 0;\r
 \r
-       /**\r
-        * Configure the throttle.  This should be called from BaseServlet.provisioningParametersChanged(), to make sure it stays up to date.\r
-        */\r
-       public static void configure() {\r
-               Parameters p = Parameters.getParameter(Parameters.THROTTLE_FILTER);\r
-               if (p != null) {\r
-                       try {\r
-                               Class.forName(JETTY_REQUEST);\r
-                               String v = p.getValue();\r
-                               if (v != null && !v.equals("off")) {\r
-                                       String[] pp = v.split(",");\r
-                                       if (pp != null) {\r
-                                               n_requests = (pp.length > 0) ? getInt(pp[0], DEFAULT_N) : DEFAULT_N;\r
-                                               m_minutes  = (pp.length > 1) ? getInt(pp[1], DEFAULT_M) : DEFAULT_M;\r
-                                               action     = (pp.length > 2 && pp[2] != null && pp[2].equalsIgnoreCase("throttle")) ? ACTION_THROTTLE : ACTION_DROP;\r
-                                               enabled    = true;\r
-                                               // ACTION_THROTTLE is not currently working, so is not supported\r
-                                               if (action == ACTION_THROTTLE) {\r
-                                                       action = ACTION_DROP;\r
-                                                       logger.info("Throttling is not currently supported; action changed to DROP");\r
-                                               }\r
-                                               logger.info("ThrottleFilter is ENABLED for /publish requests; N="+n_requests+", M="+m_minutes+", Action="+action);\r
-                                               return;\r
-                                       }\r
-                               }\r
-                       } catch (ClassNotFoundException e) {\r
-                               logger.warn("Class "+JETTY_REQUEST+" is not available; this filter requires Jetty.");\r
-                       }\r
-               }\r
-               logger.info("ThrottleFilter is DISABLED for /publish requests.");\r
-               enabled = false;\r
-               map.clear();\r
-       }\r
-       private static int getInt(String s, int deflt) {\r
-               try {\r
-                       return Integer.parseInt(s);\r
-               } catch (NumberFormatException x) {\r
-                       return deflt;\r
-               }\r
-       }\r
-       @Override\r
-       public void destroy() {\r
-               rolex.cancel();\r
-               map.clear();\r
-       }\r
+        String key = getConnectionId(request);\r
+        synchronized (map) {\r
+            Counter cnt = map.get(key);\r
+            if (cnt == null) {\r
+                cnt = new Counter();\r
+                map.put(key, cnt);\r
+            }\r
+            return cnt.getRequestRate();\r
+        }\r
+    }\r
 \r
-       @Override\r
-       public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)\r
-               throws IOException, ServletException\r
-       {\r
-               if (enabled && action == ACTION_THROTTLE) {\r
-                       throttleFilter((HttpServletRequest) request, (HttpServletResponse) response, chain);\r
-               } else if (enabled) {\r
-                       dropFilter((HttpServletRequest) request, (HttpServletResponse) response, chain);\r
-               } else {\r
-                       chain.doFilter(request, response);\r
-               }\r
-       }\r
-       public void dropFilter(HttpServletRequest request, HttpServletResponse response, FilterChain chain)\r
-               throws IOException, ServletException\r
-       {\r
-               int rate = getRequestRate((HttpServletRequest) request);\r
-               if (rate >= n_requests) {\r
-                       // drop request - only works under Jetty\r
-                       String m = String.format("Dropping connection: %s %d bad connections in %d minutes", getConnectionId((HttpServletRequest) request), rate, m_minutes);\r
-                       logger.info(m);\r
-                       Request base_request = (request instanceof Request)\r
-                               ? (Request) request\r
-                               : AbstractHttpConnection.getCurrentConnection().getRequest();\r
-                       base_request.getConnection().getEndPoint().close();\r
-               } else {\r
-                       chain.doFilter(request, response);\r
-               }\r
-       }\r
-       public void throttleFilter(HttpServletRequest request, HttpServletResponse response, FilterChain chain)\r
-               throws IOException, ServletException\r
-       {\r
-               // throttle request\r
-               String id = getConnectionId((HttpServletRequest) request);\r
-               int rate = getRequestRate((HttpServletRequest) request);\r
-               Object results = request.getAttribute(THROTTLE_MARKER);\r
-               if (rate >= n_requests && results == null) {\r
-                       String m = String.format("Throttling connection: %s %d bad connections in %d minutes", getConnectionId((HttpServletRequest) request), rate, m_minutes);\r
-                       logger.info(m);\r
-                       Continuation continuation = ContinuationSupport.getContinuation(request);\r
-                       continuation.suspend();\r
-                       register(id, continuation);\r
-                       continuation.undispatch();\r
-               } else {\r
-                       chain.doFilter(request, response);\r
-                       @SuppressWarnings("resource")\r
-                       InputStream is = request.getInputStream();\r
-                       byte[] b = new byte[4096];\r
-                       int n = is.read(b);\r
-                       while (n > 0) {\r
-                               n = is.read(b);\r
-                       }\r
-                       resume(id);\r
-               }\r
-       }\r
-       private Map<String, List<Continuation>> suspended_requests = new HashMap<String, List<Continuation>>();\r
-       private void register(String id, Continuation continuation) {\r
-               synchronized (suspended_requests) {\r
-                       List<Continuation> list = suspended_requests.get(id);\r
-                       if (list == null) {\r
-                               list = new ArrayList<Continuation>();\r
-                               suspended_requests.put(id,  list);\r
-                       }\r
-                       list.add(continuation);\r
-               }\r
-       }\r
-       private void resume(String id) {\r
-               synchronized (suspended_requests) {\r
-                       List<Continuation> list = suspended_requests.get(id);\r
-                       if (list != null) {\r
-                               // when the waited for event happens\r
-                               Continuation continuation = list.remove(0);\r
-                               continuation.setAttribute(ThrottleFilter.THROTTLE_MARKER, new Object());\r
-                               continuation.resume();\r
-                       }\r
-               }\r
-       }\r
+    public class Counter {\r
+        private List<Long> times = new ArrayList<>();    // a record of request times\r
 \r
-       /**\r
-        * Return a count of number of requests in the last M minutes, iff this is a "bad" request.\r
-        * If the request has been resumed (if it contains the THROTTLE_MARKER) it is considered good.\r
-        * @param request the request\r
-        * @return number of requests in the last M minutes, 0 means it is a "good" request\r
-        */\r
-       private int getRequestRate(HttpServletRequest request) {\r
-               String expecthdr = request.getHeader("Expect");\r
-               if (expecthdr != null && expecthdr.equalsIgnoreCase("100-continue"))\r
-                       return 0;\r
+        public int prune() {\r
+            try {\r
+                long n = System.currentTimeMillis() - (samplingPeriod * ONE_MINUTE);\r
+                long t = times.get(0);\r
+                while (t < n) {\r
+                    times.remove(0);\r
+                    t = times.get(0);\r
+                }\r
+            } catch (IndexOutOfBoundsException e) {\r
+                logger.trace("Exception: " + e.getMessage(), e);\r
+            }\r
+            return times.size();\r
+        }\r
 \r
-               String key = getConnectionId(request);\r
-               synchronized (map) {\r
-                       Counter cnt = map.get(key);\r
-                       if (cnt == null) {\r
-                               cnt = new Counter();\r
-                               map.put(key, cnt);\r
-                       }\r
-                       int n = cnt.getRequestRate();\r
-                       return n;\r
-               }\r
-       }\r
+        public int getRequestRate() {\r
+            times.add(System.currentTimeMillis());\r
+            return prune();\r
+        }\r
+    }\r
 \r
-       public class Counter {\r
-               private List<Long> times = new Vector<Long>();  // a record of request times\r
-               public int prune() {\r
-                       try {\r
-                               long n = System.currentTimeMillis() - (m_minutes * ONE_MINUTE);\r
-                               long t = times.get(0);\r
-                               while (t < n) {\r
-                                       times.remove(0);\r
-                                       t = times.get(0);\r
-                               }\r
-                       } catch (IndexOutOfBoundsException e) {\r
-                               // ignore\r
-                       }\r
-                       return times.size();\r
-               }\r
-               public int getRequestRate() {\r
-                       times.add(System.currentTimeMillis());\r
-                       return prune();\r
-               }\r
-       }\r
+    /**\r
+     * Identify a connection by endpoint IP address, and feed ID.\r
+     */\r
+    private String getConnectionId(HttpServletRequest req) {\r
+        return req.getRemoteAddr() + "/" + getFeedId(req);\r
+    }\r
 \r
-       /**\r
-        *  Identify a connection by endpoint IP address, and feed ID.\r
-        */\r
-       private String getConnectionId(HttpServletRequest req) {\r
-               return req.getRemoteAddr() + "/" + getFeedId(req);\r
-       }\r
-       private int getFeedId(HttpServletRequest req) {\r
-               String path = req.getPathInfo();\r
-               if (path == null || path.length() < 2)\r
-                       return -1;\r
-               path = path.substring(1);\r
-               int ix = path.indexOf('/');\r
-               if (ix < 0 || ix == path.length()-1)\r
-                       return -2;\r
-               try {\r
-                       int feedid = Integer.parseInt(path.substring(0, ix));\r
-                       return feedid;\r
-               } catch (NumberFormatException e) {\r
-                       return -1;\r
-               }\r
-       }\r
+    private int getFeedId(HttpServletRequest req) {\r
+        String path = req.getPathInfo();\r
+        if (path == null || path.length() < 2)\r
+            return -1;\r
+        path = path.substring(1);\r
+        int ix = path.indexOf('/');\r
+        if (ix < 0 || ix == path.length() - 1)\r
+            return -2;\r
+        try {\r
+            return Integer.parseInt(path.substring(0, ix));\r
+        } catch (NumberFormatException e) {\r
+            return -1;\r
+        }\r
+    }\r
 \r
-       @Override\r
-       public void run() {\r
-               // Once every 5 minutes, go through the map, and remove empty entrys\r
-               for (Object s : map.keySet().toArray()) {\r
-                       synchronized (map) {\r
-                               Counter c = map.get(s);\r
-                               if (c.prune() <= 0)\r
-                                       map.remove(s);\r
-                       }\r
-               }\r
-       }\r
+    @Override\r
+    public void run() {\r
+        // Once every 5 minutes, go through the map, and remove empty entrys\r
+        for (Object s : map.keySet().toArray()) {\r
+            synchronized (map) {\r
+                Counter c = map.get(s);\r
+                if (c.prune() <= 0)\r
+                    map.remove(s);\r
+            }\r
+        }\r
+    }\r
 }\r