1 # Licensed to the Apache Software Foundation (ASF) under one or more
2 # contributor license agreements. See the NOTICE file distributed with
3 # this work for additional information regarding copyright ownership.
4 # The ASF licenses this file to You under the Apache License, Version 2.0
5 # (the "License"); you may not use this file except in compliance with
6 # the License. You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
25 from aria.utils import imports, exceptions
27 from .base import BaseExecutor
30 class ThreadExecutor(BaseExecutor):
It is easier to write tests with this executor than with the full-blown sub-process executor.
36 Note: This executor is incapable of running plugin operations.
def __init__(self, pool_size=1, close_timeout=5, *args, **kwargs):
    """
    Set up the executor state and start the worker threads.

    :param pool_size: number of worker threads to create and start
    :param close_timeout: timeout, in seconds, handed to ``Thread.join`` for each worker thread;
     ``None`` means wait without a limit
    :param args: extra positional arguments forwarded to the base class initializer
    :param kwargs: extra keyword arguments forwarded to the base class initializer
    """
    super(ThreadExecutor, self).__init__(*args, **kwargs)
    # The worker loop polls this flag, so it must exist before any thread is started.
    self._stopped = False
    self._close_timeout = close_timeout
    self._queue = Queue.Queue()
    self._pool = []
    for i in range(pool_size):
        name = 'ThreadExecutor-{index}'.format(index=i+1)
        thread = threading.Thread(target=self._processor, name=name)
        # Daemon threads: a worker that is still busy after a timed-out join must not be able
        # to keep the interpreter alive at exit.
        thread.daemon = True
        thread.start()
        self._pool.append(thread)
def _execute(self, ctx):
    """
    Queue a context for execution by one of the worker threads.

    The worker loop takes items off ``self._queue`` and passes each one to the task function as
    ``ctx``; this method only enqueues and returns immediately.

    :param ctx: the context object to hand to a worker thread
    """
    self._queue.put(ctx)

def close(self):
    """
    Ask the worker threads to stop and wait for each of them.

    Every thread in the pool is joined with ``close_timeout`` as the timeout, so a thread that
    does not finish in time is left running and the next one is joined.
    """
    # NOTE(review): presumably this overrides a ``close`` hook on BaseExecutor - confirm.
    # The worker loop checks this flag between queue polls and exits once it is set.
    self._stopped = True
    for thread in self._pool:
        # Thread.join treats timeout=None as "block until the thread ends", so a separate
        # branch for the no-timeout case is unnecessary.
        thread.join(self._close_timeout)
64 while not self._stopped:
66 ctx = self._queue.get(timeout=1)
67 self._task_started(ctx)
69 task_func = imports.load_attribute(ctx.task.function)
70 arguments = dict(arg.unwrapped for arg in ctx.task.arguments.itervalues())
71 task_func(ctx=ctx, **arguments)
72 self._task_succeeded(ctx)
73 except BaseException as e:
74 self._task_failed(ctx,
76 traceback=exceptions.get_exception_as_string(*sys.exc_info()))
78 except BaseException as e: