Merge "remove the policy and security issue dependencies"
[dmaap/datarouter.git] / datarouter-prov / src / main / java / org / onap / dmaap / datarouter / provisioning / utils / ThrottleFilter.java
1 /*******************************************************************************\r
2  * ============LICENSE_START==================================================\r
3  * * org.onap.dmaap\r
4  * * ===========================================================================\r
5  * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.\r
6  * * ===========================================================================\r
7  * * Licensed under the Apache License, Version 2.0 (the "License");\r
8  * * you may not use this file except in compliance with the License.\r
9  * * You may obtain a copy of the License at\r
10  * *\r
11  *  *      http://www.apache.org/licenses/LICENSE-2.0\r
12  * *\r
13  *  * Unless required by applicable law or agreed to in writing, software\r
14  * * distributed under the License is distributed on an "AS IS" BASIS,\r
15  * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
16  * * See the License for the specific language governing permissions and\r
17  * * limitations under the License.\r
18  * * ============LICENSE_END====================================================\r
19  * *\r
20  * * ECOMP is a trademark and service mark of AT&T Intellectual Property.\r
21  * *\r
22  ******************************************************************************/\r
23 \r
24 \r
25 package org.onap.dmaap.datarouter.provisioning.utils;\r
26 \r
27 import java.io.IOException;\r
28 import java.io.InputStream;\r
29 import java.util.ArrayList;\r
30 import java.util.HashMap;\r
31 import java.util.List;\r
32 import java.util.Map;\r
33 import java.util.Timer;\r
34 import java.util.TimerTask;\r
35 import java.util.Vector;\r
36 \r
37 import javax.servlet.Filter;\r
38 import javax.servlet.FilterChain;\r
39 import javax.servlet.FilterConfig;\r
40 import javax.servlet.ServletException;\r
41 import javax.servlet.ServletRequest;\r
42 import javax.servlet.ServletResponse;\r
43 import javax.servlet.http.HttpServletRequest;\r
44 import javax.servlet.http.HttpServletResponse;\r
45 \r
46 import org.apache.log4j.Logger;\r
47 import org.eclipse.jetty.continuation.Continuation;\r
48 import org.eclipse.jetty.continuation.ContinuationSupport;\r
49 import org.eclipse.jetty.server.*;\r
50 import org.onap.dmaap.datarouter.provisioning.beans.Parameters;\r
51 \r
52 /**\r
53  * This filter checks /publish requests to the provisioning server to allow ill-behaved publishers to be throttled.\r
54  * It is configured via the provisioning parameter THROTTLE_FILTER.\r
55  * The THROTTLE_FILTER provisioning parameter can have these values:\r
56  * <table>\r
57  * <tr><td>(no value)</td><td>filter disabled</td></tr>\r
58  * <tr><td>off</td><td>filter disabled</td></tr>\r
59  * <tr><td>N[,M[,action]]</td><td>set N, M, and action (used in the algorithm below).\r
60  * Action is <i>drop</i> or <i>throttle</i>.\r
61  * If M is missing, it defaults to 5 minutes.\r
62  * If the action is missing, it defaults to <i>drop</i>.\r
63  * </td></tr>\r
64  * </table>\r
65  * <p>\r
66  * The <i>action</i> is triggered iff:\r
67  * <ol>\r
68  * <li>the filter is enabled, and</li>\r
69  * <li>N /publish requests come to the provisioning server in M minutes\r
70  * <ol>\r
71  * <li>from the same IP address</li>\r
72  * <li>for the same feed</li>\r
73  * <li>lacking the <i>Expect: 100-continue</i> header</li>\r
74  * </ol>\r
75  * </li>\r
76  * </ol>\r
77  * The action that can be performed (if triggered) are:\r
78  * <ol>\r
79  * <li><i>drop</i> - the connection is dropped immediately.</li>\r
80  * <li><i>throttle</i> - [not supported] the connection is put into a low priority queue with all other throttled connections.\r
81  * These are then processed at a slower rate.  Note: this option does not work correctly, and is disabled.\r
82  * The only action that is supported is <i>drop</i>.\r
83  * </li>\r
84  * </ol>\r
85  *\r
86  * @author Robert Eby\r
87  * @version $Id: ThrottleFilter.java,v 1.2 2014/03/12 19:45:41 eby Exp $\r
88  */\r
89 public class ThrottleFilter extends TimerTask implements Filter {\r
90     public static final int DEFAULT_N = 10;\r
91     public static final int DEFAULT_M = 5;\r
92     public static final String THROTTLE_MARKER = "org.onap.dmaap.datarouter.provisioning.THROTTLE_MARKER";\r
93     private static final String JETTY_REQUEST = "org.eclipse.jetty.server.Request";\r
94     private static final long ONE_MINUTE = 60000L;\r
95     private static final int ACTION_DROP = 0;\r
96     private static final int ACTION_THROTTLE = 1;\r
97 \r
98     // Configuration\r
99     private static boolean enabled = false;        // enabled or not\r
100     private static int n_requests = 0;            // number of requests in M minutes\r
101     private static int m_minutes = 0;            // sampling period\r
102     private static int action = ACTION_DROP;    // action to take (throttle or drop)\r
103 \r
104     private static Logger logger = Logger.getLogger("org.onap.dmaap.datarouter.provisioning.internal");\r
105     private static Map<String, Counter> map = new HashMap<String, Counter>();\r
106     private static final Timer rolex = new Timer();\r
107 \r
108     @Override\r
109     public void init(FilterConfig arg0) throws ServletException {\r
110         configure();\r
111         rolex.scheduleAtFixedRate(this, 5 * 60000L, 5 * 60000L);    // Run once every 5 minutes to clean map\r
112     }\r
113 \r
114     /**\r
115      * Configure the throttle.  This should be called from BaseServlet.provisioningParametersChanged(), to make sure it stays up to date.\r
116      */\r
117     public static void configure() {\r
118         Parameters p = Parameters.getParameter(Parameters.THROTTLE_FILTER);\r
119         if (p != null) {\r
120             try {\r
121                 Class.forName(JETTY_REQUEST);\r
122                 String v = p.getValue();\r
123                 if (v != null && !v.equals("off")) {\r
124                     String[] pp = v.split(",");\r
125                     if (pp != null) {\r
126                         n_requests = (pp.length > 0) ? getInt(pp[0], DEFAULT_N) : DEFAULT_N;\r
127                         m_minutes = (pp.length > 1) ? getInt(pp[1], DEFAULT_M) : DEFAULT_M;\r
128                         action = (pp.length > 2 && pp[2] != null && pp[2].equalsIgnoreCase("throttle")) ? ACTION_THROTTLE : ACTION_DROP;\r
129                         enabled = true;\r
130                         // ACTION_THROTTLE is not currently working, so is not supported\r
131                         if (action == ACTION_THROTTLE) {\r
132                             action = ACTION_DROP;\r
133                             logger.info("Throttling is not currently supported; action changed to DROP");\r
134                         }\r
135                         logger.info("ThrottleFilter is ENABLED for /publish requests; N=" + n_requests + ", M=" + m_minutes + ", Action=" + action);\r
136                         return;\r
137                     }\r
138                 }\r
139             } catch (ClassNotFoundException e) {\r
140                 logger.warn("Class " + JETTY_REQUEST + " is not available; this filter requires Jetty.");\r
141             }\r
142         }\r
143         logger.info("ThrottleFilter is DISABLED for /publish requests.");\r
144         enabled = false;\r
145         map.clear();\r
146     }\r
147 \r
148     private static int getInt(String s, int deflt) {\r
149         try {\r
150             return Integer.parseInt(s);\r
151         } catch (NumberFormatException x) {\r
152             return deflt;\r
153         }\r
154     }\r
155 \r
156     @Override\r
157     public void destroy() {\r
158         rolex.cancel();\r
159         map.clear();\r
160     }\r
161 \r
162     @Override\r
163     public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)\r
164             throws IOException, ServletException {\r
165         if (enabled && action == ACTION_THROTTLE) {\r
166             throttleFilter((HttpServletRequest) request, (HttpServletResponse) response, chain);\r
167         } else if (enabled) {\r
168             dropFilter((HttpServletRequest) request, (HttpServletResponse) response, chain);\r
169         } else {\r
170             chain.doFilter(request, response);\r
171         }\r
172     }\r
173 \r
174     public void dropFilter(HttpServletRequest request, HttpServletResponse response, FilterChain chain)\r
175             throws IOException, ServletException {\r
176         int rate = getRequestRate(request);\r
177         if (rate >= n_requests) {\r
178             // drop request - only works under Jetty\r
179             String m = String.format("Dropping connection: %s %d bad connections in %d minutes", getConnectionId(request), rate, m_minutes);\r
180             logger.info(m);\r
181             Request base_request = (request instanceof Request)\r
182                     ? (Request) request\r
183                     : HttpConnection.getCurrentConnection().getHttpChannel().getRequest();\r
184             base_request.getHttpChannel().getEndPoint().close();\r
185         } else {\r
186             chain.doFilter(request, response);\r
187         }\r
188     }\r
189 \r
190     public void throttleFilter(HttpServletRequest request, HttpServletResponse response, FilterChain chain)\r
191             throws IOException, ServletException {\r
192         // throttle request\r
193         String id = getConnectionId(request);\r
194         int rate = getRequestRate(request);\r
195         Object results = request.getAttribute(THROTTLE_MARKER);\r
196         if (rate >= n_requests && results == null) {\r
197             String m = String.format("Throttling connection: %s %d bad connections in %d minutes", getConnectionId(request), rate, m_minutes);\r
198             logger.info(m);\r
199             Continuation continuation = ContinuationSupport.getContinuation(request);\r
200             continuation.suspend();\r
201             register(id, continuation);\r
202             continuation.undispatch();\r
203         } else {\r
204             chain.doFilter(request, response);\r
205             @SuppressWarnings("resource")\r
206             InputStream is = request.getInputStream();\r
207             byte[] b = new byte[4096];\r
208             int n = is.read(b);\r
209             while (n > 0) {\r
210                 n = is.read(b);\r
211             }\r
212             resume(id);\r
213         }\r
214     }\r
215 \r
216     private Map<String, List<Continuation>> suspended_requests = new HashMap<String, List<Continuation>>();\r
217 \r
218     private void register(String id, Continuation continuation) {\r
219         synchronized (suspended_requests) {\r
220             List<Continuation> list = suspended_requests.get(id);\r
221             if (list == null) {\r
222                 list = new ArrayList<Continuation>();\r
223                 suspended_requests.put(id, list);\r
224             }\r
225             list.add(continuation);\r
226         }\r
227     }\r
228 \r
229     private void resume(String id) {\r
230         synchronized (suspended_requests) {\r
231             List<Continuation> list = suspended_requests.get(id);\r
232             if (list != null) {\r
233                 // when the waited for event happens\r
234                 Continuation continuation = list.remove(0);\r
235                 continuation.setAttribute(ThrottleFilter.THROTTLE_MARKER, new Object());\r
236                 continuation.resume();\r
237             }\r
238         }\r
239     }\r
240 \r
241     /**\r
242      * Return a count of number of requests in the last M minutes, iff this is a "bad" request.\r
243      * If the request has been resumed (if it contains the THROTTLE_MARKER) it is considered good.\r
244      *\r
245      * @param request the request\r
246      * @return number of requests in the last M minutes, 0 means it is a "good" request\r
247      */\r
248     private int getRequestRate(HttpServletRequest request) {\r
249         String expecthdr = request.getHeader("Expect");\r
250         if (expecthdr != null && expecthdr.equalsIgnoreCase("100-continue"))\r
251             return 0;\r
252 \r
253         String key = getConnectionId(request);\r
254         synchronized (map) {\r
255             Counter cnt = map.get(key);\r
256             if (cnt == null) {\r
257                 cnt = new Counter();\r
258                 map.put(key, cnt);\r
259             }\r
260             int n = cnt.getRequestRate();\r
261             return n;\r
262         }\r
263     }\r
264 \r
265     public class Counter {\r
266         private List<Long> times = new Vector<Long>();    // a record of request times\r
267 \r
268         public int prune() {\r
269             try {\r
270                 long n = System.currentTimeMillis() - (m_minutes * ONE_MINUTE);\r
271                 long t = times.get(0);\r
272                 while (t < n) {\r
273                     times.remove(0);\r
274                     t = times.get(0);\r
275                 }\r
276             } catch (IndexOutOfBoundsException e) {\r
277                 // ignore\r
278             }\r
279             return times.size();\r
280         }\r
281 \r
282         public int getRequestRate() {\r
283             times.add(System.currentTimeMillis());\r
284             return prune();\r
285         }\r
286     }\r
287 \r
288     /**\r
289      * Identify a connection by endpoint IP address, and feed ID.\r
290      */\r
291     private String getConnectionId(HttpServletRequest req) {\r
292         return req.getRemoteAddr() + "/" + getFeedId(req);\r
293     }\r
294 \r
295     private int getFeedId(HttpServletRequest req) {\r
296         String path = req.getPathInfo();\r
297         if (path == null || path.length() < 2)\r
298             return -1;\r
299         path = path.substring(1);\r
300         int ix = path.indexOf('/');\r
301         if (ix < 0 || ix == path.length() - 1)\r
302             return -2;\r
303         try {\r
304             int feedid = Integer.parseInt(path.substring(0, ix));\r
305             return feedid;\r
306         } catch (NumberFormatException e) {\r
307             return -1;\r
308         }\r
309     }\r
310 \r
311     @Override\r
312     public void run() {\r
313         // Once every 5 minutes, go through the map, and remove empty entrys\r
314         for (Object s : map.keySet().toArray()) {\r
315             synchronized (map) {\r
316                 Counter c = map.get(s);\r
317                 if (c.prune() <= 0)\r
318                     map.remove(s);\r
319             }\r
320         }\r
321     }\r
322 }\r