Update project maturity status
[multicloud/azure.git] / azure / aria / aria-extension-cloudify / src / aria / aria / utils / threading.py
1 # Licensed to the Apache Software Foundation (ASF) under one or more
2 # contributor license agreements.  See the NOTICE file distributed with
3 # this work for additional information regarding copyright ownership.
4 # The ASF licenses this file to You under the Apache License, Version 2.0
5 # (the "License"); you may not use this file except in compliance with
6 # the License.  You may obtain a copy of the License at
7 #
8 #     http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15
16 """
17 Threading utilities.
18 """
19
20 from __future__ import absolute_import  # so we can import standard 'threading'
21
22 import sys
23 import itertools
24 import multiprocessing
25 from threading import (Thread, Lock)
26 from Queue import (Queue, Full, Empty)
27
28 from .exceptions import print_exception
29
30 class ExecutorException(Exception):
31     pass
32
33
34 class DaemonThread(Thread):
35     def __init__(self, *args, **kwargs):
36         super(DaemonThread, self).__init__(*args, **kwargs)
37         self.daemon = True
38
39     def run(self):
40         """
41         We're overriding ``Thread.run`` in order to avoid annoying (but harmless) error messages
42         during shutdown. The problem is that CPython nullifies the global state _before_ shutting
43         down daemon threads, so that exceptions might happen, and then ``Thread.__bootstrap_inner``
44         prints them out.
45
46         Our solution is to swallow these exceptions here.
47
48         The side effect is that uncaught exceptions in our own thread code will _not_ be printed out
49         as usual, so it's our responsibility to catch them in our code.
50         """
51
52         try:
53             super(DaemonThread, self).run()
54         except SystemExit as e:
55             # This exception should be bubbled up
56             raise e
57         except BaseException:
58             # Exceptions might occur in daemon threads during interpreter shutdown
59             pass
60
61
62 # https://gist.github.com/tliron/81dd915166b0bfc64be08b4f8e22c835
63 class FixedThreadPoolExecutor(object):
64     """
65     Executes tasks in a fixed thread pool.
66
67     Makes sure to gather all returned results and thrown exceptions in one place, in order of task
68     submission.
69
70     Example::
71
72         def sum(arg1, arg2):
73             return arg1 + arg2
74
75         executor = FixedThreadPoolExecutor(10)
76         try:
77             for value in range(100):
78                 executor.submit(sum, value, value)
79             executor.drain()
80         except:
81             executor.close()
82         executor.raise_first()
83         print executor.returns
84
85     You can also use it with the Python ``with`` keyword, in which case you don't need to call
86     ``close`` explicitly::
87
88         with FixedThreadPoolExecutor(10) as executor:
89             for value in range(100):
90                 executor.submit(sum, value, value)
91             executor.drain()
92             executor.raise_first()
93             print executor.returns
94     """
95
96     _CYANIDE = object()  # Special task marker used to kill worker threads.
97
98     def __init__(self,
99                  size=None,
100                  timeout=None,
101                  print_exceptions=False):
102         """
103         :param size: number of threads in the pool; if ``None`` will use an optimal number for the
104          platform
105         :param timeout: timeout in seconds for all blocking operations (``None`` means no timeout)
106         :param print_exceptions: set to ``True`` in order to print exceptions from tasks
107         """
108         if not size:
109             try:
110                 size = multiprocessing.cpu_count() * 2 + 1
111             except NotImplementedError:
112                 size = 3
113
114         self.size = size
115         self.timeout = timeout
116         self.print_exceptions = print_exceptions
117
118         self._tasks = Queue()
119         self._returns = {}
120         self._exceptions = {}
121         self._id_creator = itertools.count()
122         self._lock = Lock() # for console output
123
124         self._workers = []
125         for index in range(size):
126             worker = DaemonThread(
127                 name='%s%d' % (self.__class__.__name__, index),
128                 target=self._thread_worker)
129             worker.start()
130             self._workers.append(worker)
131
132     def submit(self, func, *args, **kwargs):
133         """
134         Submit a task for execution.
135
136         The task will be called ASAP on the next available worker thread in the pool.
137
138         :raises ExecutorException: if cannot be submitted
139         """
140
141         try:
142             self._tasks.put((self._id_creator.next(), func, args, kwargs), timeout=self.timeout)
143         except Full:
144             raise ExecutorException('cannot submit task: queue is full')
145
146     def close(self):
147         """
148         Blocks until all current tasks finish execution and all worker threads are dead.
149
150         You cannot submit tasks anymore after calling this.
151
152         This is called automatically upon exit if you are using the ``with`` keyword.
153         """
154
155         self.drain()
156         while self.is_alive:
157             try:
158                 self._tasks.put(self._CYANIDE, timeout=self.timeout)
159             except Full:
160                 raise ExecutorException('cannot close executor: a thread seems to be hanging')
161         self._workers = None
162
163     def drain(self):
164         """
165         Blocks until all current tasks finish execution, but leaves the worker threads alive.
166         """
167
168         self._tasks.join()  # oddly, the API does not support a timeout parameter
169
170     @property
171     def is_alive(self):
172         """
173         True if any of the worker threads are alive.
174         """
175
176         for worker in self._workers:
177             if worker.is_alive():
178                 return True
179         return False
180
181     @property
182     def returns(self):
183         """
184         The returned values from all tasks, in order of submission.
185         """
186
187         return [self._returns[k] for k in sorted(self._returns)]
188
189     @property
190     def exceptions(self):
191         """
192         The raised exceptions from all tasks, in order of submission.
193         """
194
195         return [self._exceptions[k] for k in sorted(self._exceptions)]
196
197     def raise_first(self):
198         """
199         If exceptions were thrown by any task, then the first one will be raised.
200
201         This is rather arbitrary: proper handling would involve iterating all the exceptions.
202         However, if you want to use the "raise" mechanism, you are limited to raising only one of
203         them.
204         """
205
206         exceptions = self.exceptions
207         if exceptions:
208             raise exceptions[0]
209
210     def _thread_worker(self):
211         while True:
212             if not self._execute_next_task():
213                 break
214
215     def _execute_next_task(self):
216         try:
217             task = self._tasks.get(timeout=self.timeout)
218         except Empty:
219             # Happens if timeout is reached
220             return True
221         if task == self._CYANIDE:
222             # Time to die :(
223             return False
224         self._execute_task(*task)
225         return True
226
227     def _execute_task(self, task_id, func, args, kwargs):
228         try:
229             result = func(*args, **kwargs)
230             self._returns[task_id] = result
231         except Exception as e:
232             self._exceptions[task_id] = e
233             if self.print_exceptions:
234                 with self._lock:
235                     print_exception(e)
236         self._tasks.task_done()
237
238     def __enter__(self):
239         return self
240
241     def __exit__(self, the_type, value, traceback):
242         self.close()
243         return False
244
245
246 class LockedList(list):
247     """
248     A list that supports the ``with`` keyword with a built-in lock.
249
250     Though Python lists are thread-safe in that they will not raise exceptions during concurrent
251     access, they do not guarantee atomicity. This class will let you gain atomicity when needed.
252     """
253
254     def __init__(self, *args, **kwargs):
255         super(LockedList, self).__init__(*args, **kwargs)
256         self.lock = Lock()
257
258     def __enter__(self):
259         return self.lock.__enter__()
260
261     def __exit__(self, the_type, value, traceback):
262         return self.lock.__exit__(the_type, value, traceback)
263
264
265 class ExceptionThread(Thread):
266     """
267     A thread from which top level exceptions can be retrieved or re-raised.
268     """
269     def __init__(self, *args, **kwargs):
270         Thread.__init__(self, *args, **kwargs)
271         self.exception = None
272         self.daemon = True
273
274     def run(self):
275         try:
276             super(ExceptionThread, self).run()
277         except BaseException:
278             self.exception = sys.exc_info()
279
280     def is_error(self):
281         return self.exception is not None
282
283     def raise_error_if_exists(self):
284         if self.is_error():
285             type_, value, trace = self.exception
286             raise type_, value, trace