X-Git-Url: https://gerrit.onap.org/r/gitweb?p=dmaap%2Fdatarouter.git;a=blobdiff_plain;f=datarouter-prov%2Fsrc%2Fmain%2Fjava%2Forg%2Fonap%2Fdmaap%2Fdatarouter%2Fprovisioning%2Futils%2FThrottleFilter.java;h=da06f6bbc8b54a986bbc0cc19036fa18bcad8d6e;hp=ba2ca7e7073315f220ee5d7b62f85a0ea586d0a0;hb=6c78b3e6a0a67c73f931337356a172cc69cee0e8;hpb=e4b20cc6f7c31f48ddd0de5bcd054b09a35cd510 diff --git a/datarouter-prov/src/main/java/org/onap/dmaap/datarouter/provisioning/utils/ThrottleFilter.java b/datarouter-prov/src/main/java/org/onap/dmaap/datarouter/provisioning/utils/ThrottleFilter.java index ba2ca7e7..da06f6bb 100644 --- a/datarouter-prov/src/main/java/org/onap/dmaap/datarouter/provisioning/utils/ThrottleFilter.java +++ b/datarouter-prov/src/main/java/org/onap/dmaap/datarouter/provisioning/utils/ThrottleFilter.java @@ -7,9 +7,9 @@ * * Licensed under the Apache License, Version 2.0 (the "License"); * * you may not use this file except in compliance with the License. * * You may obtain a copy of the License at - * * + * * * * http://www.apache.org/licenses/LICENSE-2.0 - * * + * * * * Unless required by applicable law or agreed to in writing, software * * distributed under the License is distributed on an "AS IS" BASIS, * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -24,6 +24,8 @@ package org.onap.dmaap.datarouter.provisioning.utils; +import com.att.eelf.configuration.EELFLogger; +import com.att.eelf.configuration.EELFManager; import java.io.IOException; import java.io.InputStream; import java.util.ArrayList; @@ -32,8 +34,6 @@ import java.util.List; import java.util.Map; import java.util.Timer; import java.util.TimerTask; -import java.util.Vector; - import javax.servlet.Filter; import javax.servlet.FilterChain; import javax.servlet.FilterConfig; @@ -42,11 +42,9 @@ import javax.servlet.ServletRequest; import javax.servlet.ServletResponse; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; - -import org.apache.log4j.Logger; import org.eclipse.jetty.continuation.Continuation; import org.eclipse.jetty.continuation.ContinuationSupport; -import org.eclipse.jetty.server.AbstractHttpConnection; +import org.eclipse.jetty.server.HttpConnection; import org.eclipse.jetty.server.Request; import org.onap.dmaap.datarouter.provisioning.beans.Parameters; @@ -58,9 +56,9 @@ import org.onap.dmaap.datarouter.provisioning.beans.Parameters; * (no value)filter disabled * offfilter disabled * N[,M[,action]]set N, M, and action (used in the algorithm below). - * Action is drop or throttle. - * If M is missing, it defaults to 5 minutes. - * If the action is missing, it defaults to drop. + * Action is drop or throttle. + * If M is missing, it defaults to 5 minutes. + * If the action is missing, it defaults to drop. * * *

@@ -68,19 +66,19 @@ import org.onap.dmaap.datarouter.provisioning.beans.Parameters; *

    *
  1. the filter is enabled, and
  2. *
  3. N /publish requests come to the provisioning server in M minutes - *
      - *
    1. from the same IP address
    2. - *
    3. for the same feed
    4. - *
    5. lacking the Expect: 100-continue header
    6. - *
    + *
      + *
    1. from the same IP address
    2. + *
    3. for the same feed
    4. + *
    5. lacking the Expect: 100-continue header
    6. + *
    *
  4. *
* The action that can be performed (if triggered) are: *
    *
  1. drop - the connection is dropped immediately.
  2. *
  3. throttle - [not supported] the connection is put into a low priority queue with all other throttled connections. - * These are then processed at a slower rate. Note: this option does not work correctly, and is disabled. - * The only action that is supported is drop. + * These are then processed at a slower rate. Note: this option does not work correctly, and is disabled. + * The only action that is supported is drop. *
  4. *
* @@ -88,228 +86,237 @@ import org.onap.dmaap.datarouter.provisioning.beans.Parameters; * @version $Id: ThrottleFilter.java,v 1.2 2014/03/12 19:45:41 eby Exp $ */ public class ThrottleFilter extends TimerTask implements Filter { - public static final int DEFAULT_N = 10; - public static final int DEFAULT_M = 5; - public static final String THROTTLE_MARKER = "org.onap.dmaap.datarouter.provisioning.THROTTLE_MARKER"; - private static final String JETTY_REQUEST = "org.eclipse.jetty.server.Request"; - private static final long ONE_MINUTE = 60000L; - private static final int ACTION_DROP = 0; - private static final int ACTION_THROTTLE = 1; + private static final int DEFAULT_N = 10; + private static final int DEFAULT_M = 5; + private static final String THROTTLE_MARKER = "org.onap.dmaap.datarouter.provisioning.THROTTLE_MARKER"; + private static final String JETTY_REQUEST = "org.eclipse.jetty.server.Request"; + private static final long ONE_MINUTE = 60000L; + private static final int ACTION_DROP = 0; + private static final int ACTION_THROTTLE = 1; + + // Configuration + private static boolean enabled = false; // enabled or not + private static int numRequests = 0; // number of requests in M minutes + private static int samplingPeriod = 0; // sampling period + private static int action = ACTION_DROP; // action to take (throttle or drop) + + private static EELFLogger logger = EELFManager.getInstance().getLogger("InternalLog"); + private static Map map = new HashMap<>(); + private static final Timer rolex = new Timer(); + + @Override + public void init(FilterConfig arg0) throws ServletException { + configure(); + rolex.scheduleAtFixedRate(this, 5 * 60000L, 5 * 60000L); // Run once every 5 minutes to clean map + } + + /** + * Configure the throttle. This should be called from BaseServlet.provisioningParametersChanged(), to make sure it stays up to date. + */ + public static void configure() { + Parameters p = Parameters.getParameter(Parameters.THROTTLE_FILTER); + if (p != null) { + try { + Class.forName(JETTY_REQUEST); + String v = p.getValue(); + if (v != null && !"off".equals(v)) { + String[] pp = v.split(","); + if (pp != null) { + numRequests = (pp.length > 0) ? getInt(pp[0], DEFAULT_N) : DEFAULT_N; + samplingPeriod = (pp.length > 1) ? getInt(pp[1], DEFAULT_M) : DEFAULT_M; + action = (pp.length > 2 && pp[2] != null && "throttle".equalsIgnoreCase(pp[2])) ? ACTION_THROTTLE : ACTION_DROP; + enabled = true; + // ACTION_THROTTLE is not currently working, so is not supported + if (action == ACTION_THROTTLE) { + action = ACTION_DROP; + logger.info("Throttling is not currently supported; action changed to DROP"); + } + logger.info("ThrottleFilter is ENABLED for /publish requests; N=" + numRequests + ", M=" + samplingPeriod + + ", Action=" + action); + return; + } + } + } catch (ClassNotFoundException e) { + logger.warn("Class " + JETTY_REQUEST + " is not available; this filter requires Jetty.", e); + } + } + logger.info("ThrottleFilter is DISABLED for /publish requests."); + enabled = false; + map.clear(); + } + + private static int getInt(String s, int deflt) { + try { + return Integer.parseInt(s); + } catch (NumberFormatException x) { + return deflt; + } + } + + @Override + public void destroy() { + rolex.cancel(); + map.clear(); + } + + @Override + public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) + throws IOException, ServletException { + if (enabled && action == ACTION_THROTTLE) { + throttleFilter((HttpServletRequest) request, (HttpServletResponse) response, chain); + } else if (enabled) { + dropFilter((HttpServletRequest) request, (HttpServletResponse) response, chain); + } else { + chain.doFilter(request, response); + } + } + + public void dropFilter(HttpServletRequest request, HttpServletResponse response, FilterChain chain) + throws IOException, ServletException { + int rate = getRequestRate(request); + if (rate >= numRequests) { + // drop request - only works under Jetty + String m = String.format("Dropping connection: %s %d bad connections in %d minutes", getConnectionId(request), rate, + samplingPeriod); + logger.info(m); + Request baseRequest = (request instanceof Request) + ? (Request) request + : HttpConnection.getCurrentConnection().getHttpChannel().getRequest(); + baseRequest.getHttpChannel().getEndPoint().close(); + } else { + chain.doFilter(request, response); + } + } + + private void throttleFilter(HttpServletRequest request, HttpServletResponse response, FilterChain chain) + throws IOException, ServletException { + // throttle request + String id = getConnectionId(request); + int rate = getRequestRate(request); + Object results = request.getAttribute(THROTTLE_MARKER); + if (rate >= numRequests && results == null) { + String m = String.format("Throttling connection: %s %d bad connections in %d minutes", + getConnectionId(request), rate, samplingPeriod); + logger.info(m); + Continuation continuation = ContinuationSupport.getContinuation(request); + continuation.suspend(); + register(id, continuation); + continuation.undispatch(); + } else { + chain.doFilter(request, response); + @SuppressWarnings("resource") + InputStream is = request.getInputStream(); + byte[] b = new byte[4096]; + int n = is.read(b); + while (n > 0) { + n = is.read(b); + } + resume(id); + } + } + + private Map> suspendedRequests = new HashMap<>(); - // Configuration - private static boolean enabled = false; // enabled or not - private static int n_requests = 0; // number of requests in M minutes - private static int m_minutes = 0; // sampling period - private static int action = ACTION_DROP; // action to take (throttle or drop) + private void register(String id, Continuation continuation) { + synchronized (suspendedRequests) { + List list = suspendedRequests.get(id); + if (list == null) { + list = new ArrayList<>(); + suspendedRequests.put(id, list); + } + list.add(continuation); + } + } - private static Logger logger = Logger.getLogger("org.onap.dmaap.datarouter.provisioning.internal"); - private static Map map = new HashMap(); - private static final Timer rolex = new Timer(); + private void resume(String id) { + synchronized (suspendedRequests) { + List list = suspendedRequests.get(id); + if (list != null) { + // when the waited for event happens + Continuation continuation = list.remove(0); + continuation.setAttribute(ThrottleFilter.THROTTLE_MARKER, new Object()); + continuation.resume(); + } + } + } - @Override - public void init(FilterConfig arg0) throws ServletException { - configure(); - rolex.scheduleAtFixedRate(this, 5*60000L, 5*60000L); // Run once every 5 minutes to clean map - } + /** + * Return a count of number of requests in the last M minutes, iff this is a "bad" request. + * If the request has been resumed (if it contains the THROTTLE_MARKER) it is considered good. + * + * @param request the request + * @return number of requests in the last M minutes, 0 means it is a "good" request + */ + private int getRequestRate(HttpServletRequest request) { + String expecthdr = request.getHeader("Expect"); + if (expecthdr != null && "100-continue".equalsIgnoreCase(expecthdr)) + return 0; - /** - * Configure the throttle. This should be called from BaseServlet.provisioningParametersChanged(), to make sure it stays up to date. - */ - public static void configure() { - Parameters p = Parameters.getParameter(Parameters.THROTTLE_FILTER); - if (p != null) { - try { - Class.forName(JETTY_REQUEST); - String v = p.getValue(); - if (v != null && !v.equals("off")) { - String[] pp = v.split(","); - if (pp != null) { - n_requests = (pp.length > 0) ? getInt(pp[0], DEFAULT_N) : DEFAULT_N; - m_minutes = (pp.length > 1) ? getInt(pp[1], DEFAULT_M) : DEFAULT_M; - action = (pp.length > 2 && pp[2] != null && pp[2].equalsIgnoreCase("throttle")) ? ACTION_THROTTLE : ACTION_DROP; - enabled = true; - // ACTION_THROTTLE is not currently working, so is not supported - if (action == ACTION_THROTTLE) { - action = ACTION_DROP; - logger.info("Throttling is not currently supported; action changed to DROP"); - } - logger.info("ThrottleFilter is ENABLED for /publish requests; N="+n_requests+", M="+m_minutes+", Action="+action); - return; - } - } - } catch (ClassNotFoundException e) { - logger.warn("Class "+JETTY_REQUEST+" is not available; this filter requires Jetty."); - } - } - logger.info("ThrottleFilter is DISABLED for /publish requests."); - enabled = false; - map.clear(); - } - private static int getInt(String s, int deflt) { - try { - return Integer.parseInt(s); - } catch (NumberFormatException x) { - return deflt; - } - } - @Override - public void destroy() { - rolex.cancel(); - map.clear(); - } + String key = getConnectionId(request); + synchronized (map) { + Counter cnt = map.get(key); + if (cnt == null) { + cnt = new Counter(); + map.put(key, cnt); + } + return cnt.getRequestRate(); + } + } - @Override - public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) - throws IOException, ServletException - { - if (enabled && action == ACTION_THROTTLE) { - throttleFilter((HttpServletRequest) request, (HttpServletResponse) response, chain); - } else if (enabled) { - dropFilter((HttpServletRequest) request, (HttpServletResponse) response, chain); - } else { - chain.doFilter(request, response); - } - } - public void dropFilter(HttpServletRequest request, HttpServletResponse response, FilterChain chain) - throws IOException, ServletException - { - int rate = getRequestRate((HttpServletRequest) request); - if (rate >= n_requests) { - // drop request - only works under Jetty - String m = String.format("Dropping connection: %s %d bad connections in %d minutes", getConnectionId((HttpServletRequest) request), rate, m_minutes); - logger.info(m); - Request base_request = (request instanceof Request) - ? (Request) request - : AbstractHttpConnection.getCurrentConnection().getRequest(); - base_request.getConnection().getEndPoint().close(); - } else { - chain.doFilter(request, response); - } - } - public void throttleFilter(HttpServletRequest request, HttpServletResponse response, FilterChain chain) - throws IOException, ServletException - { - // throttle request - String id = getConnectionId((HttpServletRequest) request); - int rate = getRequestRate((HttpServletRequest) request); - Object results = request.getAttribute(THROTTLE_MARKER); - if (rate >= n_requests && results == null) { - String m = String.format("Throttling connection: %s %d bad connections in %d minutes", getConnectionId((HttpServletRequest) request), rate, m_minutes); - logger.info(m); - Continuation continuation = ContinuationSupport.getContinuation(request); - continuation.suspend(); - register(id, continuation); - continuation.undispatch(); - } else { - chain.doFilter(request, response); - @SuppressWarnings("resource") - InputStream is = request.getInputStream(); - byte[] b = new byte[4096]; - int n = is.read(b); - while (n > 0) { - n = is.read(b); - } - resume(id); - } - } - private Map> suspended_requests = new HashMap>(); - private void register(String id, Continuation continuation) { - synchronized (suspended_requests) { - List list = suspended_requests.get(id); - if (list == null) { - list = new ArrayList(); - suspended_requests.put(id, list); - } - list.add(continuation); - } - } - private void resume(String id) { - synchronized (suspended_requests) { - List list = suspended_requests.get(id); - if (list != null) { - // when the waited for event happens - Continuation continuation = list.remove(0); - continuation.setAttribute(ThrottleFilter.THROTTLE_MARKER, new Object()); - continuation.resume(); - } - } - } + public class Counter { + private List times = new ArrayList<>(); // a record of request times - /** - * Return a count of number of requests in the last M minutes, iff this is a "bad" request. - * If the request has been resumed (if it contains the THROTTLE_MARKER) it is considered good. - * @param request the request - * @return number of requests in the last M minutes, 0 means it is a "good" request - */ - private int getRequestRate(HttpServletRequest request) { - String expecthdr = request.getHeader("Expect"); - if (expecthdr != null && expecthdr.equalsIgnoreCase("100-continue")) - return 0; + public int prune() { + try { + long n = System.currentTimeMillis() - (samplingPeriod * ONE_MINUTE); + long t = times.get(0); + while (t < n) { + times.remove(0); + t = times.get(0); + } + } catch (IndexOutOfBoundsException e) { + logger.trace("Exception: " + e.getMessage(), e); + } + return times.size(); + } - String key = getConnectionId(request); - synchronized (map) { - Counter cnt = map.get(key); - if (cnt == null) { - cnt = new Counter(); - map.put(key, cnt); - } - int n = cnt.getRequestRate(); - return n; - } - } + public int getRequestRate() { + times.add(System.currentTimeMillis()); + return prune(); + } + } - public class Counter { - private List times = new Vector(); // a record of request times - public int prune() { - try { - long n = System.currentTimeMillis() - (m_minutes * ONE_MINUTE); - long t = times.get(0); - while (t < n) { - times.remove(0); - t = times.get(0); - } - } catch (IndexOutOfBoundsException e) { - // ignore - } - return times.size(); - } - public int getRequestRate() { - times.add(System.currentTimeMillis()); - return prune(); - } - } + /** + * Identify a connection by endpoint IP address, and feed ID. + */ + private String getConnectionId(HttpServletRequest req) { + return req.getRemoteAddr() + "/" + getFeedId(req); + } - /** - * Identify a connection by endpoint IP address, and feed ID. - */ - private String getConnectionId(HttpServletRequest req) { - return req.getRemoteAddr() + "/" + getFeedId(req); - } - private int getFeedId(HttpServletRequest req) { - String path = req.getPathInfo(); - if (path == null || path.length() < 2) - return -1; - path = path.substring(1); - int ix = path.indexOf('/'); - if (ix < 0 || ix == path.length()-1) - return -2; - try { - int feedid = Integer.parseInt(path.substring(0, ix)); - return feedid; - } catch (NumberFormatException e) { - return -1; - } - } + private int getFeedId(HttpServletRequest req) { + String path = req.getPathInfo(); + if (path == null || path.length() < 2) + return -1; + path = path.substring(1); + int ix = path.indexOf('/'); + if (ix < 0 || ix == path.length() - 1) + return -2; + try { + return Integer.parseInt(path.substring(0, ix)); + } catch (NumberFormatException e) { + return -1; + } + } - @Override - public void run() { - // Once every 5 minutes, go through the map, and remove empty entrys - for (Object s : map.keySet().toArray()) { - synchronized (map) { - Counter c = map.get(s); - if (c.prune() <= 0) - map.remove(s); - } - } - } + @Override + public void run() { + // Once every 5 minutes, go through the map, and remove empty entrys + for (Object s : map.keySet().toArray()) { + synchronized (map) { + Counter c = map.get(s); + if (c.prune() <= 0) + map.remove(s); + } + } + } }