f0f10671a82a5f9647e844bafac08101dae579f7
[dmaap/datarouter.git] / datarouter-prov / src / main / java / org / onap / dmaap / datarouter / provisioning / utils / ThrottleFilter.java
1 /*******************************************************************************\r
2  * ============LICENSE_START==================================================\r
3  * * org.onap.dmaap\r
4  * * ===========================================================================\r
5  * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.\r
6  * * ===========================================================================\r
7  * * Licensed under the Apache License, Version 2.0 (the "License");\r
8  * * you may not use this file except in compliance with the License.\r
9  * * You may obtain a copy of the License at\r
10  * *\r
11  *  *      http://www.apache.org/licenses/LICENSE-2.0\r
12  * *\r
13  *  * Unless required by applicable law or agreed to in writing, software\r
14  * * distributed under the License is distributed on an "AS IS" BASIS,\r
15  * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
16  * * See the License for the specific language governing permissions and\r
17  * * limitations under the License.\r
18  * * ============LICENSE_END====================================================\r
19  * *\r
20  * * ECOMP is a trademark and service mark of AT&T Intellectual Property.\r
21  * *\r
22  ******************************************************************************/\r
23 \r
24 \r
25 package org.onap.dmaap.datarouter.provisioning.utils;\r
26 \r
27 import com.att.eelf.configuration.EELFLogger;\r
28 import com.att.eelf.configuration.EELFManager;\r
29 import java.io.IOException;\r
30 import java.io.InputStream;\r
31 import java.util.ArrayList;\r
32 import java.util.HashMap;\r
33 import java.util.List;\r
34 import java.util.Map;\r
35 import java.util.Timer;\r
36 import java.util.TimerTask;\r
37 import javax.servlet.Filter;\r
38 import javax.servlet.FilterChain;\r
39 import javax.servlet.FilterConfig;\r
40 import javax.servlet.ServletException;\r
41 import javax.servlet.ServletRequest;\r
42 import javax.servlet.ServletResponse;\r
43 import javax.servlet.http.HttpServletRequest;\r
44 import javax.servlet.http.HttpServletResponse;\r
45 import org.eclipse.jetty.continuation.Continuation;\r
46 import org.eclipse.jetty.continuation.ContinuationSupport;\r
47 import org.eclipse.jetty.server.HttpConnection;\r
48 import org.eclipse.jetty.server.Request;\r
49 import org.onap.dmaap.datarouter.provisioning.beans.Parameters;\r
50 \r
51 /**\r
52  * This filter checks /publish requests to the provisioning server to allow ill-behaved publishers to be throttled.\r
53  * It is configured via the provisioning parameter THROTTLE_FILTER.\r
54  * The THROTTLE_FILTER provisioning parameter can have these values:\r
55  * <table>\r
56  * <tr><td>(no value)</td><td>filter disabled</td></tr>\r
57  * <tr><td>off</td><td>filter disabled</td></tr>\r
58  * <tr><td>N[,M[,action]]</td><td>set N, M, and action (used in the algorithm below).\r
59  * Action is <i>drop</i> or <i>throttle</i>.\r
60  * If M is missing, it defaults to 5 minutes.\r
61  * If the action is missing, it defaults to <i>drop</i>.\r
62  * </td></tr>\r
63  * </table>\r
64  *\r
65  * <p>* The <i>action</i> is triggered iff:\r
66  * <ol>\r
67  * <li>the filter is enabled, and</li>\r
68  * <li>N /publish requests come to the provisioning server in M minutes\r
69  * <ol>\r
70  * <li>from the same IP address</li>\r
71  * <li>for the same feed</li>\r
72  * <li>lacking the <i>Expect: 100-continue</i> header</li>\r
73  * </ol>\r
74  * </li>\r
75  * </ol>\r
76  * The action that can be performed (if triggered) are:\r
77  * <ol>\r
78  * <li><i>drop</i> - the connection is dropped immediately.</li>\r
79  * <li><i>throttle</i> - [not supported] the connection is put into a low priority queue\r
80  * with all other throttled connections.\r
81  * These are then processed at a slower rate.  Note: this option does not work correctly, and is disabled.\r
82  * The only action that is supported is <i>drop</i>.\r
83  * </li>\r
84  * </ol>\r
85  *\r
86  * @author Robert Eby\r
87  * @version $Id: ThrottleFilter.java,v 1.2 2014/03/12 19:45:41 eby Exp $\r
88  */\r
89 \r
90 public class ThrottleFilter extends TimerTask implements Filter {\r
91     private static final int DEFAULT_N = 10;\r
92     private static final int DEFAULT_M = 5;\r
93     private static final String THROTTLE_MARKER = "org.onap.dmaap.datarouter.provisioning.THROTTLE_MARKER";\r
94     private static final String JETTY_REQUEST = "org.eclipse.jetty.server.Request";\r
95     private static final long ONE_MINUTE = 60000L;\r
96     private static final int ACTION_DROP = 0;\r
97     private static final int ACTION_THROTTLE = 1;\r
98 \r
99     // Configuration\r
100     private static boolean enabled = false;        // enabled or not\r
101     private static int numRequests = 0;            // number of requests in M minutes\r
102     private static int samplingPeriod = 0;            // sampling period\r
103     private static int action = ACTION_DROP;    // action to take (throttle or drop)\r
104 \r
105     private static EELFLogger logger = EELFManager.getInstance().getLogger("InternalLog");\r
106     private static Map<String, Counter> map = new HashMap<>();\r
107     private Map<String, List<Continuation>> suspendedRequests = new HashMap<>();\r
108     private static final Timer rolex = new Timer();\r
109 \r
110     @Override\r
111     public void init(FilterConfig arg0) throws ServletException {\r
112         configure();\r
113         rolex.scheduleAtFixedRate(this, 5 * 60000L, 5 * 60000L);    // Run once every 5 minutes to clean map\r
114     }\r
115 \r
116     /**\r
117      * Configure the throttle.  This should be called from BaseServlet.provisioningParametersChanged(),\r
118      * to make sure it stays up to date.\r
119      */\r
120     public static void configure() {\r
121         Parameters param = Parameters.getParameter(Parameters.THROTTLE_FILTER);\r
122         if (param != null) {\r
123             try {\r
124                 Class.forName(JETTY_REQUEST);\r
125                 String val = param.getValue();\r
126                 if (val != null && !"off".equals(val)) {\r
127                     String[] pp = val.split(",");\r
128                     if (pp != null) {\r
129                         numRequests = (pp.length > 0) ? getInt(pp[0], DEFAULT_N) : DEFAULT_N;\r
130                         samplingPeriod = (pp.length > 1) ? getInt(pp[1], DEFAULT_M) : DEFAULT_M;\r
131                         action = (pp.length > 2 && pp[2] != null\r
132                                           && "throttle".equalsIgnoreCase(pp[2])) ? ACTION_THROTTLE : ACTION_DROP;\r
133                         enabled = true;\r
134                         // ACTION_THROTTLE is not currently working, so is not supported\r
135                         if (action == ACTION_THROTTLE) {\r
136                             action = ACTION_DROP;\r
137                             logger.info("Throttling is not currently supported; action changed to DROP");\r
138                         }\r
139                         logger.info("ThrottleFilter is ENABLED for /publish requests; N="\r
140                                             + numRequests + ", M=" + samplingPeriod\r
141                             + ", Action=" + action);\r
142                         return;\r
143                     }\r
144                 }\r
145             } catch (ClassNotFoundException e) {\r
146                 logger.warn("Class " + JETTY_REQUEST + " is not available; this filter requires Jetty.", e);\r
147             }\r
148         }\r
149         logger.info("ThrottleFilter is DISABLED for /publish requests.");\r
150         enabled = false;\r
151         map.clear();\r
152     }\r
153 \r
154     private static int getInt(String str, int deflt) {\r
155         try {\r
156             return Integer.parseInt(str);\r
157         } catch (NumberFormatException x) {\r
158             return deflt;\r
159         }\r
160     }\r
161 \r
162     @Override\r
163     public void destroy() {\r
164         rolex.cancel();\r
165         map.clear();\r
166     }\r
167 \r
168     @Override\r
169     public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)\r
170             throws IOException, ServletException {\r
171         if (enabled && action == ACTION_THROTTLE) {\r
172             throttleFilter((HttpServletRequest) request, (HttpServletResponse) response, chain);\r
173         } else if (enabled) {\r
174             dropFilter((HttpServletRequest) request, (HttpServletResponse) response, chain);\r
175         } else {\r
176             chain.doFilter(request, response);\r
177         }\r
178     }\r
179 \r
180     /**\r
181      * Method to drop filter chain.\r
182      * @param request HttpServletRequest\r
183      * @param response HttpServletResponse\r
184      * @param chain FilterChain\r
185      * @throws IOException input/output exception\r
186      * @throws ServletException servle exception\r
187      */\r
188     public void dropFilter(HttpServletRequest request, HttpServletResponse response, FilterChain chain)\r
189             throws IOException, ServletException {\r
190         int rate = getRequestRate(request);\r
191         if (rate >= numRequests) {\r
192             // drop request - only works under Jetty\r
193             String str = String.format("Dropping connection: %s %d bad connections in %d minutes",\r
194                     getConnectionId(request), rate,\r
195                 samplingPeriod);\r
196             logger.info(str);\r
197             Request baseRequest = (request instanceof Request)\r
198                     ? (Request) request\r
199                     : HttpConnection.getCurrentConnection().getHttpChannel().getRequest();\r
200             baseRequest.getHttpChannel().getEndPoint().close();\r
201         } else {\r
202             chain.doFilter(request, response);\r
203         }\r
204     }\r
205 \r
206     private void throttleFilter(HttpServletRequest request, HttpServletResponse response, FilterChain chain)\r
207             throws IOException, ServletException {\r
208         // throttle request\r
209         String id = getConnectionId(request);\r
210         int rate = getRequestRate(request);\r
211         Object results = request.getAttribute(THROTTLE_MARKER);\r
212         if (rate >= numRequests && results == null) {\r
213             String str = String.format("Throttling connection: %s %d bad connections in %d minutes",\r
214                 getConnectionId(request), rate, samplingPeriod);\r
215             logger.info(str);\r
216             Continuation continuation = ContinuationSupport.getContinuation(request);\r
217             continuation.suspend();\r
218             register(id, continuation);\r
219             continuation.undispatch();\r
220         } else {\r
221             chain.doFilter(request, response);\r
222             @SuppressWarnings("resource")\r
223             InputStream is = request.getInputStream();\r
224             byte[] bite = new byte[4096];\r
225             int num = is.read(bite);\r
226             while (num > 0) {\r
227                 num = is.read(bite);\r
228             }\r
229             resume(id);\r
230         }\r
231     }\r
232 \r
233     private void register(String id, Continuation continuation) {\r
234         synchronized (suspendedRequests) {\r
235             List<Continuation> list = suspendedRequests.get(id);\r
236             if (list == null) {\r
237                 list = new ArrayList<>();\r
238                 suspendedRequests.put(id, list);\r
239             }\r
240             list.add(continuation);\r
241         }\r
242     }\r
243 \r
244     private void resume(String id) {\r
245         synchronized (suspendedRequests) {\r
246             List<Continuation> list = suspendedRequests.get(id);\r
247             if (list != null) {\r
248                 // when the waited for event happens\r
249                 Continuation continuation = list.remove(0);\r
250                 continuation.setAttribute(ThrottleFilter.THROTTLE_MARKER, new Object());\r
251                 continuation.resume();\r
252             }\r
253         }\r
254     }\r
255 \r
256     /**\r
257      * Return a count of number of requests in the last M minutes, iff this is a "bad" request.\r
258      * If the request has been resumed (if it contains the THROTTLE_MARKER) it is considered good.\r
259      *\r
260      * @param request the request\r
261      * @return number of requests in the last M minutes, 0 means it is a "good" request\r
262      */\r
263     private int getRequestRate(HttpServletRequest request) {\r
264         String expecthdr = request.getHeader("Expect");\r
265         if (expecthdr != null && "100-continue".equalsIgnoreCase(expecthdr)) {\r
266             return 0;\r
267         }\r
268 \r
269         String key = getConnectionId(request);\r
270         synchronized (map) {\r
271             Counter cnt = map.get(key);\r
272             if (cnt == null) {\r
273                 cnt = new Counter();\r
274                 map.put(key, cnt);\r
275             }\r
276             return cnt.getRequestRate();\r
277         }\r
278     }\r
279 \r
280     public class Counter {\r
281         private List<Long> times = new ArrayList<>();    // a record of request times\r
282 \r
283         /**\r
284          * Method to prune request rate.\r
285          * @return times\r
286          */\r
287         public int prune() {\r
288             try {\r
289                 long num = System.currentTimeMillis() - (samplingPeriod * ONE_MINUTE);\r
290                 long time = times.get(0);\r
291                 while (time < num) {\r
292                     times.remove(0);\r
293                     time = times.get(0);\r
294                 }\r
295             } catch (IndexOutOfBoundsException e) {\r
296                 logger.trace("Exception: " + e.getMessage(), e);\r
297             }\r
298             return times.size();\r
299         }\r
300 \r
301         public int getRequestRate() {\r
302             times.add(System.currentTimeMillis());\r
303             return prune();\r
304         }\r
305     }\r
306 \r
307     /**\r
308      * Identify a connection by endpoint IP address, and feed ID.\r
309      */\r
310     private String getConnectionId(HttpServletRequest req) {\r
311         return req.getRemoteAddr() + "/" + getFeedId(req);\r
312     }\r
313 \r
314     private int getFeedId(HttpServletRequest req) {\r
315         String path = req.getPathInfo();\r
316         if (path == null || path.length() < 2) {\r
317             return -1;\r
318         }\r
319         path = path.substring(1);\r
320         int ix = path.indexOf('/');\r
321         if (ix < 0 || ix == path.length() - 1) {\r
322             return -2;\r
323         }\r
324         try {\r
325             return Integer.parseInt(path.substring(0, ix));\r
326         } catch (NumberFormatException e) {\r
327             return -1;\r
328         }\r
329     }\r
330 \r
331     @Override\r
332     public void run() {\r
333         // Once every 5 minutes, go through the map, and remove empty entrys\r
334         for (Object s : map.keySet().toArray()) {\r
335             synchronized (map) {\r
336                 Counter counter = map.get(s);\r
337                 if (counter.prune() <= 0) {\r
338                     map.remove(s);\r
339                 }\r
340             }\r
341         }\r
342     }\r
343 }\r