1 /*******************************************************************************
\r
2 * ============LICENSE_START==================================================
\r
4 * * ===========================================================================
\r
5 * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
\r
6 * * ===========================================================================
\r
7 * * Licensed under the Apache License, Version 2.0 (the "License");
\r
8 * * you may not use this file except in compliance with the License.
\r
9 * * You may obtain a copy of the License at
\r
11 * * http://www.apache.org/licenses/LICENSE-2.0
\r
13 * * Unless required by applicable law or agreed to in writing, software
\r
14 * * distributed under the License is distributed on an "AS IS" BASIS,
\r
15 * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
\r
16 * * See the License for the specific language governing permissions and
\r
17 * * limitations under the License.
\r
18 * * ============LICENSE_END====================================================
\r
20 * * ECOMP is a trademark and service mark of AT&T Intellectual Property.
\r
22 ******************************************************************************/
\r
25 package org.onap.dmaap.datarouter.provisioning.utils;
\r
27 import java.io.IOException;
\r
28 import java.io.InputStream;
\r
29 import java.util.ArrayList;
\r
30 import java.util.HashMap;
\r
31 import java.util.List;
\r
32 import java.util.Map;
\r
33 import java.util.Timer;
\r
34 import java.util.TimerTask;
\r
35 import java.util.Vector;
\r
37 import javax.servlet.Filter;
\r
38 import javax.servlet.FilterChain;
\r
39 import javax.servlet.FilterConfig;
\r
40 import javax.servlet.ServletException;
\r
41 import javax.servlet.ServletRequest;
\r
42 import javax.servlet.ServletResponse;
\r
43 import javax.servlet.http.HttpServletRequest;
\r
44 import javax.servlet.http.HttpServletResponse;
\r
46 import org.apache.log4j.Logger;
\r
47 import org.eclipse.jetty.continuation.Continuation;
\r
48 import org.eclipse.jetty.continuation.ContinuationSupport;
\r
49 import org.eclipse.jetty.server.AbstractHttpConnection;
\r
50 import org.eclipse.jetty.server.Request;
\r
51 import org.onap.dmaap.datarouter.provisioning.beans.Parameters;
\r
/**
 * This filter checks /publish requests to the provisioning server to allow ill-behaved publishers to be throttled.
 * It is configured via the provisioning parameter THROTTLE_FILTER.
 * The THROTTLE_FILTER provisioning parameter can have these values:
 * <table>
 * <tr><td>(no value)</td><td>filter disabled</td></tr>
 * <tr><td>off</td><td>filter disabled</td></tr>
 * <tr><td>N[,M[,action]]</td><td>set N, M, and action (used in the algorithm below).
 *     Action is <i>drop</i> or <i>throttle</i>.
 *     If M is missing, it defaults to 5 minutes.
 *     If the action is missing, it defaults to <i>drop</i>.
 * </td></tr>
 * </table>
 * <p>
 * The <i>action</i> is triggered iff:
 * <ol>
 * <li>the filter is enabled, and</li>
 * <li>N /publish requests come to the provisioning server in M minutes
 *   <ol>
 *   <li>from the same IP address</li>
 *   <li>for the same feed</li>
 *   <li>lacking the <i>Expect: 100-continue</i> header</li>
 *   </ol>
 * </li>
 * </ol>
 * The actions that can be performed (if triggered) are:
 * <ol>
 * <li><i>drop</i> - the connection is dropped immediately.</li>
 * <li><i>throttle</i> - [not supported] the connection is put into a low priority queue with all other throttled connections.
 *   These are then processed at a slower rate. Note: this option does not work correctly, and is disabled.
 *   The only action that is supported is <i>drop</i>.
 * </li>
 * </ol>
 *
 * @author Robert Eby
 * @version $Id: ThrottleFilter.java,v 1.2 2014/03/12 19:45:41 eby Exp $
 */
\r
90 public class ThrottleFilter extends TimerTask implements Filter {
\r
91 public static final int DEFAULT_N = 10;
\r
92 public static final int DEFAULT_M = 5;
\r
93 public static final String THROTTLE_MARKER = "org.onap.dmaap.datarouter.provisioning.THROTTLE_MARKER";
\r
94 private static final String JETTY_REQUEST = "org.eclipse.jetty.server.Request";
\r
95 private static final long ONE_MINUTE = 60000L;
\r
96 private static final int ACTION_DROP = 0;
\r
97 private static final int ACTION_THROTTLE = 1;
\r
100 private static boolean enabled = false; // enabled or not
\r
101 private static int n_requests = 0; // number of requests in M minutes
\r
102 private static int m_minutes = 0; // sampling period
\r
103 private static int action = ACTION_DROP; // action to take (throttle or drop)
\r
105 private static Logger logger = Logger.getLogger("org.onap.dmaap.datarouter.provisioning.internal");
\r
106 private static Map<String, Counter> map = new HashMap<String, Counter>();
\r
107 private static final Timer rolex = new Timer();
\r
110 public void init(FilterConfig arg0) throws ServletException {
\r
112 rolex.scheduleAtFixedRate(this, 5*60000L, 5*60000L); // Run once every 5 minutes to clean map
\r
116 * Configure the throttle. This should be called from BaseServlet.provisioningParametersChanged(), to make sure it stays up to date.
\r
118 public static void configure() {
\r
119 Parameters p = Parameters.getParameter(Parameters.THROTTLE_FILTER);
\r
122 Class.forName(JETTY_REQUEST);
\r
123 String v = p.getValue();
\r
124 if (v != null && !v.equals("off")) {
\r
125 String[] pp = v.split(",");
\r
127 n_requests = (pp.length > 0) ? getInt(pp[0], DEFAULT_N) : DEFAULT_N;
\r
128 m_minutes = (pp.length > 1) ? getInt(pp[1], DEFAULT_M) : DEFAULT_M;
\r
129 action = (pp.length > 2 && pp[2] != null && pp[2].equalsIgnoreCase("throttle")) ? ACTION_THROTTLE : ACTION_DROP;
\r
131 // ACTION_THROTTLE is not currently working, so is not supported
\r
132 if (action == ACTION_THROTTLE) {
\r
133 action = ACTION_DROP;
\r
134 logger.info("Throttling is not currently supported; action changed to DROP");
\r
136 logger.info("ThrottleFilter is ENABLED for /publish requests; N="+n_requests+", M="+m_minutes+", Action="+action);
\r
140 } catch (ClassNotFoundException e) {
\r
141 logger.warn("Class "+JETTY_REQUEST+" is not available; this filter requires Jetty.");
\r
144 logger.info("ThrottleFilter is DISABLED for /publish requests.");
\r
148 private static int getInt(String s, int deflt) {
\r
150 return Integer.parseInt(s);
\r
151 } catch (NumberFormatException x) {
\r
156 public void destroy() {
\r
162 public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)
\r
163 throws IOException, ServletException
\r
165 if (enabled && action == ACTION_THROTTLE) {
\r
166 throttleFilter((HttpServletRequest) request, (HttpServletResponse) response, chain);
\r
167 } else if (enabled) {
\r
168 dropFilter((HttpServletRequest) request, (HttpServletResponse) response, chain);
\r
170 chain.doFilter(request, response);
\r
173 public void dropFilter(HttpServletRequest request, HttpServletResponse response, FilterChain chain)
\r
174 throws IOException, ServletException
\r
176 int rate = getRequestRate((HttpServletRequest) request);
\r
177 if (rate >= n_requests) {
\r
178 // drop request - only works under Jetty
\r
179 String m = String.format("Dropping connection: %s %d bad connections in %d minutes", getConnectionId((HttpServletRequest) request), rate, m_minutes);
\r
181 Request base_request = (request instanceof Request)
\r
182 ? (Request) request
\r
183 : AbstractHttpConnection.getCurrentConnection().getRequest();
\r
184 base_request.getConnection().getEndPoint().close();
\r
186 chain.doFilter(request, response);
\r
189 public void throttleFilter(HttpServletRequest request, HttpServletResponse response, FilterChain chain)
\r
190 throws IOException, ServletException
\r
192 // throttle request
\r
193 String id = getConnectionId((HttpServletRequest) request);
\r
194 int rate = getRequestRate((HttpServletRequest) request);
\r
195 Object results = request.getAttribute(THROTTLE_MARKER);
\r
196 if (rate >= n_requests && results == null) {
\r
197 String m = String.format("Throttling connection: %s %d bad connections in %d minutes", getConnectionId((HttpServletRequest) request), rate, m_minutes);
\r
199 Continuation continuation = ContinuationSupport.getContinuation(request);
\r
200 continuation.suspend();
\r
201 register(id, continuation);
\r
202 continuation.undispatch();
\r
204 chain.doFilter(request, response);
\r
205 @SuppressWarnings("resource")
\r
206 InputStream is = request.getInputStream();
\r
207 byte[] b = new byte[4096];
\r
208 int n = is.read(b);
\r
215 private Map<String, List<Continuation>> suspended_requests = new HashMap<String, List<Continuation>>();
\r
216 private void register(String id, Continuation continuation) {
\r
217 synchronized (suspended_requests) {
\r
218 List<Continuation> list = suspended_requests.get(id);
\r
219 if (list == null) {
\r
220 list = new ArrayList<Continuation>();
\r
221 suspended_requests.put(id, list);
\r
223 list.add(continuation);
\r
226 private void resume(String id) {
\r
227 synchronized (suspended_requests) {
\r
228 List<Continuation> list = suspended_requests.get(id);
\r
229 if (list != null) {
\r
230 // when the waited for event happens
\r
231 Continuation continuation = list.remove(0);
\r
232 continuation.setAttribute(ThrottleFilter.THROTTLE_MARKER, new Object());
\r
233 continuation.resume();
\r
239 * Return a count of number of requests in the last M minutes, iff this is a "bad" request.
\r
240 * If the request has been resumed (if it contains the THROTTLE_MARKER) it is considered good.
\r
241 * @param request the request
\r
242 * @return number of requests in the last M minutes, 0 means it is a "good" request
\r
244 private int getRequestRate(HttpServletRequest request) {
\r
245 String expecthdr = request.getHeader("Expect");
\r
246 if (expecthdr != null && expecthdr.equalsIgnoreCase("100-continue"))
\r
249 String key = getConnectionId(request);
\r
250 synchronized (map) {
\r
251 Counter cnt = map.get(key);
\r
253 cnt = new Counter();
\r
256 int n = cnt.getRequestRate();
\r
261 public class Counter {
\r
262 private List<Long> times = new Vector<Long>(); // a record of request times
\r
263 public int prune() {
\r
265 long n = System.currentTimeMillis() - (m_minutes * ONE_MINUTE);
\r
266 long t = times.get(0);
\r
271 } catch (IndexOutOfBoundsException e) {
\r
274 return times.size();
\r
276 public int getRequestRate() {
\r
277 times.add(System.currentTimeMillis());
\r
283 * Identify a connection by endpoint IP address, and feed ID.
\r
285 private String getConnectionId(HttpServletRequest req) {
\r
286 return req.getRemoteAddr() + "/" + getFeedId(req);
\r
288 private int getFeedId(HttpServletRequest req) {
\r
289 String path = req.getPathInfo();
\r
290 if (path == null || path.length() < 2)
\r
292 path = path.substring(1);
\r
293 int ix = path.indexOf('/');
\r
294 if (ix < 0 || ix == path.length()-1)
\r
297 int feedid = Integer.parseInt(path.substring(0, ix));
\r
299 } catch (NumberFormatException e) {
\r
305 public void run() {
\r
306 // Once every 5 minutes, go through the map, and remove empty entrys
\r
307 for (Object s : map.keySet().toArray()) {
\r
308 synchronized (map) {
\r
309 Counter c = map.get(s);
\r
310 if (c.prune() <= 0)
\r