1 /*******************************************************************************
\r
2 * ============LICENSE_START==================================================
\r
4 * * ===========================================================================
\r
5 * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
\r
6 * * ===========================================================================
\r
7 * * Licensed under the Apache License, Version 2.0 (the "License");
\r
8 * * you may not use this file except in compliance with the License.
\r
9 * * You may obtain a copy of the License at
\r
11 * * http://www.apache.org/licenses/LICENSE-2.0
\r
13 * * Unless required by applicable law or agreed to in writing, software
\r
14 * * distributed under the License is distributed on an "AS IS" BASIS,
\r
15 * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
\r
16 * * See the License for the specific language governing permissions and
\r
17 * * limitations under the License.
\r
18 * * ============LICENSE_END====================================================
\r
20 * * ECOMP is a trademark and service mark of AT&T Intellectual Property.
\r
22 ******************************************************************************/
\r
25 package org.onap.dmaap.datarouter.provisioning.utils;
\r
27 import com.att.eelf.configuration.EELFLogger;
\r
28 import com.att.eelf.configuration.EELFManager;
\r
29 import java.io.IOException;
\r
30 import java.io.InputStream;
\r
31 import java.util.ArrayList;
\r
32 import java.util.HashMap;
\r
33 import java.util.List;
\r
34 import java.util.Map;
\r
35 import java.util.Timer;
\r
36 import java.util.TimerTask;
\r
37 import javax.servlet.Filter;
\r
38 import javax.servlet.FilterChain;
\r
39 import javax.servlet.FilterConfig;
\r
40 import javax.servlet.ServletException;
\r
41 import javax.servlet.ServletRequest;
\r
42 import javax.servlet.ServletResponse;
\r
43 import javax.servlet.http.HttpServletRequest;
\r
44 import javax.servlet.http.HttpServletResponse;
\r
45 import org.eclipse.jetty.continuation.Continuation;
\r
46 import org.eclipse.jetty.continuation.ContinuationSupport;
\r
47 import org.eclipse.jetty.server.HttpConnection;
\r
48 import org.eclipse.jetty.server.Request;
\r
49 import org.onap.dmaap.datarouter.provisioning.beans.Parameters;
\r
/**
 * This filter checks /publish requests to the provisioning server to allow ill-behaved publishers to be throttled.
 * It is configured via the provisioning parameter THROTTLE_FILTER.
 * The THROTTLE_FILTER provisioning parameter can have these values:
 * <table>
 * <tr><td>(no value)</td><td>filter disabled</td></tr>
 * <tr><td>off</td><td>filter disabled</td></tr>
 * <tr><td>N[,M[,action]]</td><td>set N, M, and action (used in the algorithm below).
 * Action is <i>drop</i> or <i>throttle</i>.
 * If M is missing, it defaults to 5 minutes.
 * If the action is missing, it defaults to <i>drop</i>.
 * </td></tr>
 * </table>
 * <p>
 * The <i>action</i> is triggered iff:
 * <ol>
 * <li>the filter is enabled, and</li>
 * <li>N /publish requests come to the provisioning server in M minutes
 * <ol>
 * <li>from the same IP address</li>
 * <li>for the same feed</li>
 * <li>lacking the <i>Expect: 100-continue</i> header</li>
 * </ol>
 * </li>
 * </ol>
 * The actions that can be performed (if triggered) are:
 * <ol>
 * <li><i>drop</i> - the connection is dropped immediately.</li>
 * <li><i>throttle</i> - [not supported] the connection is put into a low priority queue with all other throttled connections.
 * These are then processed at a slower rate.  Note: this option does not work correctly, and is disabled.
 * The only action that is supported is <i>drop</i>.
 * </li>
 * </ol>
 *
 * @author Robert Eby
 * @version $Id: ThrottleFilter.java,v 1.2 2014/03/12 19:45:41 eby Exp $
 */
\r
88 public class ThrottleFilter extends TimerTask implements Filter {
\r
89 private static final int DEFAULT_N = 10;
\r
90 private static final int DEFAULT_M = 5;
\r
91 private static final String THROTTLE_MARKER = "org.onap.dmaap.datarouter.provisioning.THROTTLE_MARKER";
\r
92 private static final String JETTY_REQUEST = "org.eclipse.jetty.server.Request";
\r
93 private static final long ONE_MINUTE = 60000L;
\r
94 private static final int ACTION_DROP = 0;
\r
95 private static final int ACTION_THROTTLE = 1;
\r
98 private static boolean enabled = false; // enabled or not
\r
99 private static int numRequests = 0; // number of requests in M minutes
\r
100 private static int samplingPeriod = 0; // sampling period
\r
101 private static int action = ACTION_DROP; // action to take (throttle or drop)
\r
103 private static EELFLogger logger = EELFManager.getInstance().getLogger("InternalLog");
\r
104 private static Map<String, Counter> map = new HashMap<>();
\r
105 private static final Timer rolex = new Timer();
\r
108 public void init(FilterConfig arg0) throws ServletException {
\r
110 rolex.scheduleAtFixedRate(this, 5 * 60000L, 5 * 60000L); // Run once every 5 minutes to clean map
\r
114 * Configure the throttle. This should be called from BaseServlet.provisioningParametersChanged(), to make sure it stays up to date.
\r
116 public static void configure() {
\r
117 Parameters p = Parameters.getParameter(Parameters.THROTTLE_FILTER);
\r
120 Class.forName(JETTY_REQUEST);
\r
121 String v = p.getValue();
\r
122 if (v != null && !"off".equals(v)) {
\r
123 String[] pp = v.split(",");
\r
125 numRequests = (pp.length > 0) ? getInt(pp[0], DEFAULT_N) : DEFAULT_N;
\r
126 samplingPeriod = (pp.length > 1) ? getInt(pp[1], DEFAULT_M) : DEFAULT_M;
\r
127 action = (pp.length > 2 && pp[2] != null && "throttle".equalsIgnoreCase(pp[2])) ? ACTION_THROTTLE : ACTION_DROP;
\r
129 // ACTION_THROTTLE is not currently working, so is not supported
\r
130 if (action == ACTION_THROTTLE) {
\r
131 action = ACTION_DROP;
\r
132 logger.info("Throttling is not currently supported; action changed to DROP");
\r
134 logger.info("ThrottleFilter is ENABLED for /publish requests; N=" + numRequests + ", M=" + samplingPeriod
\r
135 + ", Action=" + action);
\r
139 } catch (ClassNotFoundException e) {
\r
140 logger.warn("Class " + JETTY_REQUEST + " is not available; this filter requires Jetty.", e);
\r
143 logger.info("ThrottleFilter is DISABLED for /publish requests.");
\r
148 private static int getInt(String s, int deflt) {
\r
150 return Integer.parseInt(s);
\r
151 } catch (NumberFormatException x) {
\r
157 public void destroy() {
\r
163 public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)
\r
164 throws IOException, ServletException {
\r
165 if (enabled && action == ACTION_THROTTLE) {
\r
166 throttleFilter((HttpServletRequest) request, (HttpServletResponse) response, chain);
\r
167 } else if (enabled) {
\r
168 dropFilter((HttpServletRequest) request, (HttpServletResponse) response, chain);
\r
170 chain.doFilter(request, response);
\r
174 public void dropFilter(HttpServletRequest request, HttpServletResponse response, FilterChain chain)
\r
175 throws IOException, ServletException {
\r
176 int rate = getRequestRate(request);
\r
177 if (rate >= numRequests) {
\r
178 // drop request - only works under Jetty
\r
179 String m = String.format("Dropping connection: %s %d bad connections in %d minutes", getConnectionId(request), rate,
\r
182 Request baseRequest = (request instanceof Request)
\r
183 ? (Request) request
\r
184 : HttpConnection.getCurrentConnection().getHttpChannel().getRequest();
\r
185 baseRequest.getHttpChannel().getEndPoint().close();
\r
187 chain.doFilter(request, response);
\r
191 private void throttleFilter(HttpServletRequest request, HttpServletResponse response, FilterChain chain)
\r
192 throws IOException, ServletException {
\r
193 // throttle request
\r
194 String id = getConnectionId(request);
\r
195 int rate = getRequestRate(request);
\r
196 Object results = request.getAttribute(THROTTLE_MARKER);
\r
197 if (rate >= numRequests && results == null) {
\r
198 String m = String.format("Throttling connection: %s %d bad connections in %d minutes",
\r
199 getConnectionId(request), rate, samplingPeriod);
\r
201 Continuation continuation = ContinuationSupport.getContinuation(request);
\r
202 continuation.suspend();
\r
203 register(id, continuation);
\r
204 continuation.undispatch();
\r
206 chain.doFilter(request, response);
\r
207 @SuppressWarnings("resource")
\r
208 InputStream is = request.getInputStream();
\r
209 byte[] b = new byte[4096];
\r
210 int n = is.read(b);
\r
218 private Map<String, List<Continuation>> suspendedRequests = new HashMap<>();
\r
220 private void register(String id, Continuation continuation) {
\r
221 synchronized (suspendedRequests) {
\r
222 List<Continuation> list = suspendedRequests.get(id);
\r
223 if (list == null) {
\r
224 list = new ArrayList<>();
\r
225 suspendedRequests.put(id, list);
\r
227 list.add(continuation);
\r
231 private void resume(String id) {
\r
232 synchronized (suspendedRequests) {
\r
233 List<Continuation> list = suspendedRequests.get(id);
\r
234 if (list != null) {
\r
235 // when the waited for event happens
\r
236 Continuation continuation = list.remove(0);
\r
237 continuation.setAttribute(ThrottleFilter.THROTTLE_MARKER, new Object());
\r
238 continuation.resume();
\r
244 * Return a count of number of requests in the last M minutes, iff this is a "bad" request.
\r
245 * If the request has been resumed (if it contains the THROTTLE_MARKER) it is considered good.
\r
247 * @param request the request
\r
248 * @return number of requests in the last M minutes, 0 means it is a "good" request
\r
250 private int getRequestRate(HttpServletRequest request) {
\r
251 String expecthdr = request.getHeader("Expect");
\r
252 if (expecthdr != null && "100-continue".equalsIgnoreCase(expecthdr))
\r
255 String key = getConnectionId(request);
\r
256 synchronized (map) {
\r
257 Counter cnt = map.get(key);
\r
259 cnt = new Counter();
\r
262 return cnt.getRequestRate();
\r
266 public class Counter {
\r
267 private List<Long> times = new ArrayList<>(); // a record of request times
\r
269 public int prune() {
\r
271 long n = System.currentTimeMillis() - (samplingPeriod * ONE_MINUTE);
\r
272 long t = times.get(0);
\r
277 } catch (IndexOutOfBoundsException e) {
\r
278 logger.trace("Exception: " + e.getMessage(), e);
\r
280 return times.size();
\r
283 public int getRequestRate() {
\r
284 times.add(System.currentTimeMillis());
\r
290 * Identify a connection by endpoint IP address, and feed ID.
\r
292 private String getConnectionId(HttpServletRequest req) {
\r
293 return req.getRemoteAddr() + "/" + getFeedId(req);
\r
296 private int getFeedId(HttpServletRequest req) {
\r
297 String path = req.getPathInfo();
\r
298 if (path == null || path.length() < 2)
\r
300 path = path.substring(1);
\r
301 int ix = path.indexOf('/');
\r
302 if (ix < 0 || ix == path.length() - 1)
\r
305 return Integer.parseInt(path.substring(0, ix));
\r
306 } catch (NumberFormatException e) {
\r
312 public void run() {
\r
313 // Once every 5 minutes, go through the map, and remove empty entries
\r
314 for (Object s : map.keySet().toArray()) {
\r
315 synchronized (map) {
\r
316 Counter c = map.get(s);
\r
317 if (c.prune() <= 0)
\r