1 # Licensed to the Apache Software Foundation (ASF) under one or more
2 # contributor license agreements. See the NOTICE file distributed with
3 # this work for additional information regarding copyright ownership.
4 # The ASF licenses this file to You under the Apache License, Version 2.0
5 # (the "License"); you may not use this file except in compliance with
6 # the License. You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
28 from aria import operation
29 from aria.modeling import models
30 from aria.orchestrator import events
31 from aria.utils.plugin import create as create_plugin
32 from aria.orchestrator.workflows.executor import process
35 import tests.resources
36 from tests.helpers import FilesystemDataHolder
37 from tests.fixtures import ( # pylint: disable=unused-import
41 from . import MockContext
44 class TestProcessExecutor(object):
46 def test_plugin_execution(self, executor, mock_plugin, model, queue):
49 task_kwargs=dict(function='mock_plugin1.operation', plugin_fk=mock_plugin.id)
53 error = queue.get(timeout=60)
54 # tests/resources/plugins/mock-plugin1 is the plugin installed
55 # during this test's setup. The module mock_plugin1 contains a single
56 # operation named "operation" which calls an entry point defined in the plugin's
57 # setup.py. This entry point simply prints 'mock-plugin-output' to stdout.
58 # The "operation" operation that called this subprocess, then raises a RuntimeError
59 # with that subprocess output as the error message.
60 # This is what we assert here. This test checks that both the PYTHONPATH (operation)
61 # and PATH (entry point) are properly updated in the subprocess in which the task is
63 assert isinstance(error, RuntimeError)
64 assert error.message == 'mock-plugin-output'
66 def test_closed(self, executor, model):
68 with pytest.raises(RuntimeError) as exc_info:
69 executor.execute(MockContext(model, task_kwargs=dict(function='some.function')))
70 assert 'closed' in exc_info.value.message
72 def test_process_termination(self, executor, model, fs_test_holder, tmpdir):
73 freeze_script_path = str(tmpdir.join('freeze_script'))
74 with open(freeze_script_path, 'w+b') as f:
81 holder_path_argument = models.Argument.wrap('holder_path', fs_test_holder._path)
82 script_path_argument = models.Argument.wrap('freezing_script_path',
83 str(tmpdir.join('freeze_script')))
85 model.argument.put(holder_path_argument)
86 model.argument.put(script_path_argument)
90 function='{0}.{1}'.format(__name__, freezing_task.__name__),
91 arguments=dict(holder_path=holder_path_argument,
92 freezing_script_path=script_path_argument)),
97 @retrying.retry(retry_on_result=lambda r: r is False, stop_max_delay=60000, wait_fixed=500)
98 def wait_for_extra_process_id():
99 return fs_test_holder.get('subproc', False)
101 task_pid = executor._tasks[ctx.task.id].proc.pid
102 extra_process_pid = wait_for_extra_process_id()
104 assert set([task_pid, extra_process_pid]).issubset(set(psutil.pids()))
105 executor.terminate(ctx.task.id)
107 # Give a chance to the processes to terminate
110 # all processes should be either zombies or non existent
111 pids = [task_pid, extra_process_pid]
113 if pid in psutil.pids():
114 assert psutil.Process(pid).status() == psutil.STATUS_ZOMBIE
116 # making the test more readable
117 assert pid not in psutil.pids()
122 _queue = Queue.Queue()
124 def handler(_, exception=None, **kwargs):
125 _queue.put(exception)
127 events.on_success_task_signal.connect(handler)
128 events.on_failure_task_signal.connect(handler)
132 events.on_success_task_signal.disconnect(handler)
133 events.on_failure_task_signal.disconnect(handler)
def fs_test_holder(tmpdir):
    """
    Open a ``FilesystemDataHolder`` on a ``dataholder`` entry inside ``tmpdir``.

    :param tmpdir: directory object exposing ``join``; the joined path is converted with ``str``
    """
    dataholder_path = str(tmpdir.join('dataholder'))
    holder = FilesystemDataHolder(dataholder_path)
def executor(plugin_manager):
    """
    Create a ``ProcessExecutor`` for the tests.

    The executor is built with the supplied ``plugin_manager`` and with ``tests.ROOT_DIR`` as
    the only ``python_path`` entry.

    :param plugin_manager: plugin manager handed to ``ProcessExecutor``
    """
    result = process.ProcessExecutor(plugin_manager=plugin_manager, python_path=[tests.ROOT_DIR])
def mock_plugin(plugin_manager, tmpdir):
    """
    Package the ``mock-plugin1`` test resource and install it.

    The plugin sources located at ``<tests.resources.DIR>/plugins/mock-plugin1`` are handed to
    ``create_plugin`` with ``tmpdir`` as the destination directory; the value that call returns
    is then passed to ``plugin_manager.install`` as its ``source``.

    :param plugin_manager: object exposing ``install(source=...)``
    :param tmpdir: destination directory for the created plugin (converted with ``str``)
    :return: whatever ``plugin_manager.install`` returns
    """
    plugin_sources = os.path.join(tests.resources.DIR, 'plugins', 'mock-plugin1')
    packaged_plugin = create_plugin(source=plugin_sources, destination_dir=str(tmpdir))
    installed_plugin = plugin_manager.install(source=packaged_plugin)
    return installed_plugin
161 _storage = aria.application_model_storage(aria.storage.sql_mapi.SQLAlchemyModelAPI,
162 initiator_kwargs=dict(base_dir=str(tmpdir)))
164 tests.storage.release_sqlite_storage(_storage)
def freezing_task(holder_path, freezing_script_path, **_):
    """
    Launch the freezing script in a child Python process and publish the child's PID.

    The PID is stored under the ``'subproc'`` key of a ``FilesystemDataHolder`` opened on
    ``holder_path``.

    :param holder_path: path handed to ``FilesystemDataHolder``
    :param freezing_script_path: path of the script to run with ``sys.executable``
    :param _: extra keyword arguments, ignored
    """
    holder = FilesystemDataHolder(holder_path)
    # The command is an argument list, so it is run without ``shell=True``: on POSIX, a list
    # combined with ``shell=True`` executes only the first item through the shell and hands the
    # remaining items to the shell itself, so the script would never have been run.
    child = subprocess.Popen([sys.executable, freezing_script_path])
    holder['subproc'] = child.pid