X-Git-Url: https://gerrit.onap.org/r/gitweb?a=blobdiff_plain;f=datarouter-prov%2Fsrc%2Fmain%2Fjava%2Forg%2Fonap%2Fdmaap%2Fdatarouter%2Fprovisioning%2Futils%2FThrottleFilter.java;h=e117d36873450f58f7bd2ba2b27154c68395643e;hb=bda6aeaa60607ab4fe5af508156019d7bd5c0ce4;hp=7f8d7a8c87ed3f2407b70d2615a6ff61d70e770d;hpb=4261823d84c2b911b68cdf4cb4dc3be429ebe285;p=dmaap%2Fdatarouter.git
diff --git a/datarouter-prov/src/main/java/org/onap/dmaap/datarouter/provisioning/utils/ThrottleFilter.java b/datarouter-prov/src/main/java/org/onap/dmaap/datarouter/provisioning/utils/ThrottleFilter.java
index 7f8d7a8c..e117d368 100644
--- a/datarouter-prov/src/main/java/org/onap/dmaap/datarouter/provisioning/utils/ThrottleFilter.java
+++ b/datarouter-prov/src/main/java/org/onap/dmaap/datarouter/provisioning/utils/ThrottleFilter.java
@@ -24,6 +24,8 @@
package org.onap.dmaap.datarouter.provisioning.utils;
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
@@ -32,8 +34,6 @@ import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
-import java.util.Vector;
-
import javax.servlet.Filter;
import javax.servlet.FilterChain;
import javax.servlet.FilterConfig;
@@ -42,11 +42,9 @@ import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
-
-import org.apache.log4j.Logger;
import org.eclipse.jetty.continuation.Continuation;
import org.eclipse.jetty.continuation.ContinuationSupport;
-import org.eclipse.jetty.server.AbstractHttpConnection;
+import org.eclipse.jetty.server.HttpConnection;
import org.eclipse.jetty.server.Request;
import org.onap.dmaap.datarouter.provisioning.beans.Parameters;
@@ -63,8 +61,8 @@ import org.onap.dmaap.datarouter.provisioning.beans.Parameters;
* If the action is missing, it defaults to drop.
*
*
- *
- * The action is triggered iff:
+ *
+ *
* The action is triggered iff:
*
* - the filter is enabled, and
* - N /publish requests come to the provisioning server in M minutes
@@ -78,7 +76,8 @@ import org.onap.dmaap.datarouter.provisioning.beans.Parameters;
* The action that can be performed (if triggered) are:
*
* - drop - the connection is dropped immediately.
- * - throttle - [not supported] the connection is put into a low priority queue with all other throttled connections.
+ *
- throttle - [not supported] the connection is put into a low priority queue
+ * with all other throttled connections.
* These are then processed at a slower rate. Note: this option does not work correctly, and is disabled.
* The only action that is supported is drop.
*
@@ -87,10 +86,11 @@ import org.onap.dmaap.datarouter.provisioning.beans.Parameters;
* @author Robert Eby
* @version $Id: ThrottleFilter.java,v 1.2 2014/03/12 19:45:41 eby Exp $
*/
+
public class ThrottleFilter extends TimerTask implements Filter {
- public static final int DEFAULT_N = 10;
- public static final int DEFAULT_M = 5;
- public static final String THROTTLE_MARKER = "org.onap.dmaap.datarouter.provisioning.THROTTLE_MARKER";
+ private static final int DEFAULT_N = 10;
+ private static final int DEFAULT_M = 5;
+ private static final String THROTTLE_MARKER = "org.onap.dmaap.datarouter.provisioning.THROTTLE_MARKER";
private static final String JETTY_REQUEST = "org.eclipse.jetty.server.Request";
private static final long ONE_MINUTE = 60000L;
private static final int ACTION_DROP = 0;
@@ -98,12 +98,13 @@ public class ThrottleFilter extends TimerTask implements Filter {
// Configuration
private static boolean enabled = false; // enabled or not
- private static int n_requests = 0; // number of requests in M minutes
- private static int m_minutes = 0; // sampling period
+ private static int numRequests = 0; // number of requests in M minutes
+ private static int samplingPeriod = 0; // sampling period
private static int action = ACTION_DROP; // action to take (throttle or drop)
- private static Logger logger = Logger.getLogger("org.onap.dmaap.datarouter.provisioning.internal");
- private static Map map = new HashMap();
+ private static final EELFLogger logger = EELFManager.getInstance().getLogger("InternalLog");
+ private static Map map = new HashMap<>();
+ private final Map> suspendedRequests = new HashMap<>();
private static final Timer rolex = new Timer();
@Override
@@ -113,32 +114,36 @@ public class ThrottleFilter extends TimerTask implements Filter {
}
/**
- * Configure the throttle. This should be called from BaseServlet.provisioningParametersChanged(), to make sure it stays up to date.
+ * Configure the throttle. This should be called from BaseServlet.provisioningParametersChanged(),
+ * to make sure it stays up to date.
*/
public static void configure() {
- Parameters p = Parameters.getParameter(Parameters.THROTTLE_FILTER);
- if (p != null) {
+ Parameters param = Parameters.getParameter(Parameters.THROTTLE_FILTER);
+ if (param != null) {
try {
Class.forName(JETTY_REQUEST);
- String v = p.getValue();
- if (v != null && !v.equals("off")) {
- String[] pp = v.split(",");
+ String val = param.getValue();
+ if (val != null && !"off".equals(val)) {
+ String[] pp = val.split(",");
if (pp != null) {
- n_requests = (pp.length > 0) ? getInt(pp[0], DEFAULT_N) : DEFAULT_N;
- m_minutes = (pp.length > 1) ? getInt(pp[1], DEFAULT_M) : DEFAULT_M;
- action = (pp.length > 2 && pp[2] != null && pp[2].equalsIgnoreCase("throttle")) ? ACTION_THROTTLE : ACTION_DROP;
+ numRequests = (pp.length > 0) ? getInt(pp[0], DEFAULT_N) : DEFAULT_N;
+ samplingPeriod = (pp.length > 1) ? getInt(pp[1], DEFAULT_M) : DEFAULT_M;
+ action = (pp.length > 2 && pp[2] != null
+ && "throttle".equalsIgnoreCase(pp[2])) ? ACTION_THROTTLE : ACTION_DROP;
enabled = true;
// ACTION_THROTTLE is not currently working, so is not supported
if (action == ACTION_THROTTLE) {
action = ACTION_DROP;
logger.info("Throttling is not currently supported; action changed to DROP");
}
- logger.info("ThrottleFilter is ENABLED for /publish requests; N=" + n_requests + ", M=" + m_minutes + ", Action=" + action);
+ logger.info("ThrottleFilter is ENABLED for /publish requests; N="
+ + numRequests + ", M=" + samplingPeriod
+ + ", Action=" + action);
return;
}
}
} catch (ClassNotFoundException e) {
- logger.warn("Class " + JETTY_REQUEST + " is not available; this filter requires Jetty.");
+ logger.warn("Class " + JETTY_REQUEST + " is not available; this filter requires Jetty.", e);
}
}
logger.info("ThrottleFilter is DISABLED for /publish requests.");
@@ -146,9 +151,9 @@ public class ThrottleFilter extends TimerTask implements Filter {
map.clear();
}
- private static int getInt(String s, int deflt) {
+ private static int getInt(String str, int deflt) {
try {
- return Integer.parseInt(s);
+ return Integer.parseInt(str);
} catch (NumberFormatException x) {
return deflt;
}
@@ -172,32 +177,43 @@ public class ThrottleFilter extends TimerTask implements Filter {
}
}
+ /**
+ * Method to drop filter chain.
+ * @param request HttpServletRequest
+ * @param response HttpServletResponse
+ * @param chain FilterChain
+ * @throws IOException input/output exception
+ * @throws ServletException servle exception
+ */
public void dropFilter(HttpServletRequest request, HttpServletResponse response, FilterChain chain)
throws IOException, ServletException {
- int rate = getRequestRate((HttpServletRequest) request);
- if (rate >= n_requests) {
+ int rate = getRequestRate(request);
+ if (rate >= numRequests) {
// drop request - only works under Jetty
- String m = String.format("Dropping connection: %s %d bad connections in %d minutes", getConnectionId((HttpServletRequest) request), rate, m_minutes);
- logger.info(m);
- Request base_request = (request instanceof Request)
+ String str = String.format("Dropping connection: %s %d bad connections in %d minutes",
+ getConnectionId(request), rate,
+ samplingPeriod);
+ logger.info(str);
+ Request baseRequest = (request instanceof Request)
? (Request) request
- : AbstractHttpConnection.getCurrentConnection().getRequest();
- base_request.getConnection().getEndPoint().close();
+ : HttpConnection.getCurrentConnection().getHttpChannel().getRequest();
+ baseRequest.getHttpChannel().getEndPoint().close();
} else {
chain.doFilter(request, response);
}
}
- public void throttleFilter(HttpServletRequest request, HttpServletResponse response, FilterChain chain)
+ private void throttleFilter(HttpServletRequest request, HttpServletResponse response, FilterChain chain)
throws IOException, ServletException {
// throttle request
- String id = getConnectionId((HttpServletRequest) request);
- int rate = getRequestRate((HttpServletRequest) request);
+ String id = getConnectionId(request);
+ int rate = getRequestRate(request);
Object results = request.getAttribute(THROTTLE_MARKER);
- if (rate >= n_requests && results == null) {
- String m = String.format("Throttling connection: %s %d bad connections in %d minutes", getConnectionId((HttpServletRequest) request), rate, m_minutes);
- logger.info(m);
- Continuation continuation = ContinuationSupport.getContinuation(request);
+ if (rate >= numRequests && results == null) {
+ String str = String.format("Throttling connection: %s %d bad connections in %d minutes",
+ getConnectionId(request), rate, samplingPeriod);
+ logger.info(str);
+ Continuation continuation = ContinuationSupport.getContinuation((javax.servlet.ServletRequest) request);
continuation.suspend();
register(id, continuation);
continuation.undispatch();
@@ -205,31 +221,25 @@ public class ThrottleFilter extends TimerTask implements Filter {
chain.doFilter(request, response);
@SuppressWarnings("resource")
InputStream is = request.getInputStream();
- byte[] b = new byte[4096];
- int n = is.read(b);
- while (n > 0) {
- n = is.read(b);
+ byte[] bite = new byte[4096];
+ int num = is.read(bite);
+ while (num > 0) {
+ num = is.read(bite);
}
resume(id);
}
}
- private Map> suspended_requests = new HashMap>();
-
private void register(String id, Continuation continuation) {
- synchronized (suspended_requests) {
- List list = suspended_requests.get(id);
- if (list == null) {
- list = new ArrayList();
- suspended_requests.put(id, list);
- }
+ synchronized (suspendedRequests) {
+ List list = suspendedRequests.computeIfAbsent(id, k -> new ArrayList<>());
list.add(continuation);
}
}
private void resume(String id) {
- synchronized (suspended_requests) {
- List list = suspended_requests.get(id);
+ synchronized (suspendedRequests) {
+ List list = suspendedRequests.get(id);
if (list != null) {
// when the waited for event happens
Continuation continuation = list.remove(0);
@@ -248,8 +258,9 @@ public class ThrottleFilter extends TimerTask implements Filter {
*/
private int getRequestRate(HttpServletRequest request) {
String expecthdr = request.getHeader("Expect");
- if (expecthdr != null && expecthdr.equalsIgnoreCase("100-continue"))
+ if (expecthdr != null && "100-continue".equalsIgnoreCase(expecthdr)) {
return 0;
+ }
String key = getConnectionId(request);
synchronized (map) {
@@ -258,24 +269,27 @@ public class ThrottleFilter extends TimerTask implements Filter {
cnt = new Counter();
map.put(key, cnt);
}
- int n = cnt.getRequestRate();
- return n;
+ return cnt.getRequestRate();
}
}
- public class Counter {
- private List times = new Vector(); // a record of request times
+ public static class Counter {
+ private final List times = new ArrayList<>(); // a record of request times
+ /**
+ * Method to prune request rate.
+ * @return times
+ */
public int prune() {
try {
- long n = System.currentTimeMillis() - (m_minutes * ONE_MINUTE);
- long t = times.get(0);
- while (t < n) {
+ long num = System.currentTimeMillis() - (samplingPeriod * ONE_MINUTE);
+ long time = times.get(0);
+ while (time < num) {
times.remove(0);
- t = times.get(0);
+ time = times.get(0);
}
} catch (IndexOutOfBoundsException e) {
- // ignore
+ logger.trace("Exception: " + e.getMessage(), e);
}
return times.size();
}
@@ -295,15 +309,16 @@ public class ThrottleFilter extends TimerTask implements Filter {
private int getFeedId(HttpServletRequest req) {
String path = req.getPathInfo();
- if (path == null || path.length() < 2)
+ if (path == null || path.length() < 2) {
return -1;
+ }
path = path.substring(1);
int ix = path.indexOf('/');
- if (ix < 0 || ix == path.length() - 1)
+ if (ix < 0 || ix == path.length() - 1) {
return -2;
+ }
try {
- int feedid = Integer.parseInt(path.substring(0, ix));
- return feedid;
+ return Integer.parseInt(path.substring(0, ix));
} catch (NumberFormatException e) {
return -1;
}
@@ -314,9 +329,10 @@ public class ThrottleFilter extends TimerTask implements Filter {
// Once every 5 minutes, go through the map, and remove empty entrys
for (Object s : map.keySet().toArray()) {
synchronized (map) {
- Counter c = map.get(s);
- if (c.prune() <= 0)
+ Counter counter = map.get(s);
+ if (counter.prune() <= 0) {
map.remove(s);
+ }
}
}
}