1 # Licensed to the Apache Software Foundation (ASF) under one or more
2 # contributor license agreements. See the NOTICE file distributed with
3 # this work for additional information regarding copyright ownership.
4 # The ASF licenses this file to You under the Apache License, Version 2.0
5 # (the "License"); you may not use this file except in compliance with
6 # the License. You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
20 from __future__ import absolute_import # so we can import standard 'threading'
24 import multiprocessing
25 from threading import (Thread, Lock)
26 from Queue import (Queue, Full, Empty)
28 from .exceptions import print_exception
class ExecutorException(Exception):
    """
    Raised by the executor when a task cannot be submitted or the pool cannot be closed.
    """
class DaemonThread(Thread):
    """
    A thread that is always flagged as a daemon and that swallows uncaught exceptions raised by
    its target (except ``SystemExit``).
    """

    def __init__(self, *args, **kwargs):
        """
        Takes the same arguments as ``threading.Thread`` and marks the thread as a daemon.
        """
        super(DaemonThread, self).__init__(*args, **kwargs)
        self.daemon = True

    def run(self):
        """
        We're overriding ``Thread.run`` in order to avoid annoying (but harmless) error messages
        during shutdown. The problem is that CPython nullifies the global state _before_ shutting
        down daemon threads, so that exceptions might happen, and then ``Thread.__bootstrap_inner``
        prints them out.

        Our solution is to swallow these exceptions here.

        The side effect is that uncaught exceptions in our own thread code will _not_ be printed out
        as usual, so it's our responsibility to catch them in our code.
        """

        try:
            super(DaemonThread, self).run()
        except SystemExit:
            # This exception should be bubbled up; a bare ``raise`` keeps its original traceback
            raise
        except BaseException:
            # Exceptions might occur in daemon threads during interpreter shutdown
            pass
62 # https://gist.github.com/tliron/81dd915166b0bfc64be08b4f8e22c835
class FixedThreadPoolExecutor(object):
    """
    Executes tasks in a fixed thread pool.

    Makes sure to gather all returned results and thrown exceptions in one place, in order of task
    submission.

    Example::

        def add(arg1, arg2):
            return arg1 + arg2

        executor = FixedThreadPoolExecutor(10)
        try:
            for value in range(100):
                executor.submit(add, value, value)
            executor.drain()
        finally:
            executor.close()
        executor.raise_first()
        print(executor.returns)

    You can also use it with the Python ``with`` keyword, in which case you don't need to call
    ``close`` explicitly::

        with FixedThreadPoolExecutor(10) as executor:
            for value in range(100):
                executor.submit(add, value, value)
            executor.drain()
            executor.raise_first()
            print(executor.returns)
    """

    _CYANIDE = object()  # Special task marker used to kill worker threads.

    def __init__(self,
                 size=None,
                 timeout=None,
                 print_exceptions=False):
        """
        :param size: number of threads in the pool; if ``None`` will use an optimal number for the
         platform
        :param timeout: timeout in seconds for all blocking operations (``None`` means no timeout)
        :param print_exceptions: set to ``True`` in order to print exceptions from tasks
        """
        if not size:
            try:
                size = multiprocessing.cpu_count() * 2 + 1
            except NotImplementedError:
                # CPU count is unavailable: use what the formula yields for a single CPU
                size = 3

        self.size = size
        self.timeout = timeout
        self.print_exceptions = print_exceptions

        self._tasks = Queue()
        self._returns = {}  # task id -> returned value
        self._exceptions = {}  # task id -> raised exception
        self._id_creator = itertools.count()  # task ids increase in order of submission
        self._lock = Lock()  # for console output

        self._workers = []
        for index in range(size):
            worker = DaemonThread(
                name='%s%d' % (self.__class__.__name__, index),
                target=self._thread_worker)
            worker.start()
            self._workers.append(worker)

    def submit(self, func, *args, **kwargs):
        """
        Submit a task for execution.

        The task will be called ASAP on the next available worker thread in the pool.

        :param func: callable to execute; it is invoked as ``func(*args, **kwargs)``
        :raises ExecutorException: if cannot be submitted
        """

        try:
            # The builtin ``next`` is used (rather than the ``.next()`` method) for portability
            self._tasks.put((next(self._id_creator), func, args, kwargs), timeout=self.timeout)
        except Full:
            raise ExecutorException('cannot submit task: queue is full')

    def close(self):
        """
        Blocks until all current tasks finish execution and all worker threads are dead.

        You cannot submit tasks anymore after calling this.

        This is called automatically upon exit if you are using the ``with`` keyword.

        :raises ExecutorException: if a kill marker cannot be queued
        """

        self.drain()

        # A worker exits after consuming a single kill marker, so one marker per live worker is
        # enough; there is no need to keep feeding the queue while waiting
        for worker in self._workers:
            if worker.is_alive():
                try:
                    self._tasks.put(self._CYANIDE, timeout=self.timeout)
                except Full:
                    raise ExecutorException(
                        'cannot close executor: a thread seems to be hanging')

        for worker in self._workers:
            # ``join`` returns silently on timeout, hence the re-check
            while worker.is_alive():
                worker.join(self.timeout)

        # Release the dead threads; an empty list keeps ``is_alive`` and ``close`` usable
        self._workers = []

    def drain(self):
        """
        Blocks until all current tasks finish execution, but leaves the worker threads alive.
        """

        self._tasks.join()  # oddly, the API does not support a timeout parameter

    @property
    def is_alive(self):
        """
        True if any of the worker threads are alive.
        """

        return any(worker.is_alive() for worker in self._workers)

    @property
    def returns(self):
        """
        The returned values from all tasks, in order of submission.
        """

        return [self._returns[k] for k in sorted(self._returns)]

    @property
    def exceptions(self):
        """
        The raised exceptions from all tasks, in order of submission.
        """

        return [self._exceptions[k] for k in sorted(self._exceptions)]

    def raise_first(self):
        """
        If exceptions were thrown by any task, then the first one will be raised.

        This is rather arbitrary: proper handling would involve iterating all the exceptions.
        However, if you want to use the "raise" mechanism, you are limited to raising only one of
        them.
        """

        exceptions = self.exceptions
        if exceptions:
            raise exceptions[0]

    def _thread_worker(self):
        """
        Worker thread body: keeps executing tasks until told to stop.
        """
        while True:
            if not self._execute_next_task():
                break

    def _execute_next_task(self):
        """
        Fetches and executes a single task from the queue.

        :returns: ``False`` if the kill marker was fetched (the worker should exit), ``True``
         otherwise
        """
        try:
            task = self._tasks.get(timeout=self.timeout)
        except Empty:
            # Happens if timeout is reached
            return True
        if task is self._CYANIDE:
            # Time to die :( ... but still acknowledge the marker, so that the queue's count of
            # unfinished tasks stays balanced and a later ``drain`` does not block
            self._tasks.task_done()
            return False
        self._execute_task(*task)
        return True

    def _execute_task(self, task_id, func, args, kwargs):
        """
        Calls the task, storing either its returned value or its raised exception under its ID.
        """
        try:
            self._returns[task_id] = func(*args, **kwargs)
        except Exception as e:
            self._exceptions[task_id] = e
            if self.print_exceptions:
                with self._lock:
                    print_exception(e)
        finally:
            # Must always happen, even for exceptions that are not caught above, otherwise
            # ``drain`` would wait forever for this task
            self._tasks.task_done()

    def __enter__(self):
        return self

    def __exit__(self, the_type, value, traceback):
        self.close()
        return False
class LockedList(list):
    """
    A list that supports the ``with`` keyword with a built-in lock.

    Though Python lists are thread-safe in that they will not raise exceptions during concurrent
    access, they do not guarantee atomicity. This class will let you gain atomicity when needed.

    The lock is available as the ``lock`` attribute. Note that the ``with`` statement yields
    whatever the lock's own ``__enter__`` returns, not the list.
    """

    def __init__(self, *args, **kwargs):
        super(LockedList, self).__init__(*args, **kwargs)
        self.lock = Lock()

    def __enter__(self):
        guard = self.lock
        return guard.__enter__()

    def __exit__(self, the_type, value, traceback):
        guard = self.lock
        return guard.__exit__(the_type, value, traceback)
class ExceptionThread(Thread):
    """
    A thread from which top level exceptions can be retrieved or re-raised.
    """

    def __init__(self, *args, **kwargs):
        """
        Takes the same arguments as ``threading.Thread`` and marks the thread as a daemon.
        """
        super(ExceptionThread, self).__init__(*args, **kwargs)
        # ``sys.exc_info()`` triple of the exception that ended ``run``; ``None`` if there was none
        self.exception = None
        self.daemon = True

    def run(self):
        """
        Runs the thread's target, capturing any exception it raises instead of propagating it.
        """
        try:
            super(ExceptionThread, self).run()
        except BaseException:
            self.exception = sys.exc_info()

    def is_error(self):
        """
        True if an exception was captured by ``run``.
        """
        return self.exception is not None

    def raise_error_if_exists(self):
        """
        Re-raises the captured exception, with its original traceback, if there is one.
        """
        if not self.is_error():
            return
        type_, value, trace = self.exception
        if sys.version_info[0] >= 3:
            raise value.with_traceback(trace)
        # The three-expression ``raise`` is a syntax error on Python 3, so it is kept out of the
        # module's own source and only compiled when actually running on Python 2
        exec('raise type_, value, trace', {}, {'type_': type_, 'value': value, 'trace': trace})