1 # Licensed to the Apache Software Foundation (ASF) under one or more
2 # contributor license agreements. See the NOTICE file distributed with
3 # this work for additional information regarding copyright ownership.
4 # The ASF licenses this file to You under the Apache License, Version 2.0
5 # (the "License"); you may not use this file except in compliance with
6 # the License. You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
21 from aria import workflow
22 from aria.orchestrator import events
23 from aria.orchestrator.workflows import api
24 from aria.orchestrator.workflows.exceptions import ExecutorException
25 from aria.orchestrator.exceptions import TaskAbortException, TaskRetryException
26 from aria.orchestrator.execution_plugin import operations
27 from aria.orchestrator.execution_plugin.exceptions import ProcessException
28 from aria.orchestrator.execution_plugin import local
29 from aria.orchestrator.execution_plugin import constants
30 from aria.orchestrator.workflows.executor import process
31 from aria.orchestrator.workflows.core import engine, graph_compiler
33 from tests import mock
34 from tests import storage
35 from tests.orchestrator.workflows.helpers import events_collector
37 IS_WINDOWS = os.name == 'nt'
40 class TestLocalRunScript(object):
42 def test_script_path_parameter(self, executor, workflow_context, tmpdir):
43 script_path = self._create_script(
45 linux_script='''#! /bin/bash -e
46 ctx node attributes map key = value
49 ctx node attributes map key = value
52 executor, workflow_context,
53 script_path=script_path)
54 assert props['map'].value['key'] == 'value'
56 def test_process_env(self, executor, workflow_context, tmpdir):
57 script_path = self._create_script(
59 linux_script='''#! /bin/bash -e
60 ctx node attributes map key1 = "$key1"
61 ctx node attributes map key2 = "$key2"
64 ctx node attributes map key1 = %key1%
65 ctx node attributes map key2 = %key2%
68 executor, workflow_context,
69 script_path=script_path,
76 p_map = props['map'].value
77 assert p_map['key1'] == 'value1'
78 assert p_map['key2'] == 'value2'
80 def test_process_cwd(self, executor, workflow_context, tmpdir):
81 script_path = self._create_script(
83 linux_script='''#! /bin/bash -e
84 ctx node attributes map cwd = "$PWD"
87 ctx node attributes map cwd = %CD%
91 executor, workflow_context,
92 script_path=script_path,
96 p_map = props['map'].value
97 assert p_map['cwd'] == tmpdir
99 def test_process_command_prefix(self, executor, workflow_context, tmpdir):
100 use_ctx = 'ctx node attributes map key = value'
101 python_script = ['import subprocess',
102 'subprocess.Popen("{0}".split(' ')).communicate()[0]'.format(use_ctx)]
103 python_script = '\n'.join(python_script)
104 script_path = self._create_script(
106 linux_script=python_script,
107 windows_script=python_script,
111 executor, workflow_context,
112 script_path=script_path,
114 'env': {'TEST_KEY': 'value'},
115 'command_prefix': 'python'
117 p_map = props['map'].value
118 assert p_map['key'] == 'value'
120 def test_process_args(self, executor, workflow_context, tmpdir):
121 script_path = self._create_script(
123 linux_script='''#! /bin/bash -e
124 ctx node attributes map arg1 = "$1"
125 ctx node attributes map arg2 = "$2"
128 ctx node attributes map arg1 = %1
129 ctx node attributes map arg2 = %2
132 executor, workflow_context,
133 script_path=script_path,
135 'args': ['"arg with spaces"', 'arg2']
137 assert props['map'].value['arg1'] == 'arg with spaces'
138 assert props['map'].value['arg2'] == 'arg2'
140 def test_no_script_path(self, executor, workflow_context):
141 exception = self._run_and_get_task_exception(
142 executor, workflow_context,
144 assert isinstance(exception, TaskAbortException)
145 assert 'script_path' in exception.message
147 def test_script_error(self, executor, workflow_context, tmpdir):
148 script_path = self._create_script(
150 linux_script='''#! /bin/bash -e
152 command_that_does_not_exist [ ]
157 command_that_does_not_exist [ ]
159 exception = self._run_and_get_task_exception(
160 executor, workflow_context,
161 script_path=script_path)
162 assert isinstance(exception, ProcessException)
163 assert os.path.basename(script_path) in exception.command
164 assert exception.exit_code == 1 if IS_WINDOWS else 127
165 assert exception.stdout.strip() == '123123'
166 assert 'command_that_does_not_exist' in exception.stderr
168 def test_script_error_from_bad_ctx_request(self, executor, workflow_context, tmpdir):
169 script_path = self._create_script(
171 linux_script='''#! /bin/bash -e
172 ctx property_that_does_not_exist
175 ctx property_that_does_not_exist
177 exception = self._run_and_get_task_exception(
178 executor, workflow_context,
179 script_path=script_path)
180 assert isinstance(exception, ProcessException)
181 assert os.path.basename(script_path) in exception.command
182 assert exception.exit_code == 1
183 assert 'RequestError' in exception.stderr
184 assert 'property_that_does_not_exist' in exception.stderr
186 def test_python_script(self, executor, workflow_context, tmpdir):
188 from aria.orchestrator.execution_plugin import ctx, inputs
189 if __name__ == '__main__':
190 ctx.node.attributes['key'] = inputs['key']
193 script_path = self._create_script(
196 windows_script=script,
198 windows_suffix=suffix)
200 executor, workflow_context,
201 script_path=script_path,
202 arguments={'key': 'value'})
203 assert props['key'].value == 'value'
205 @pytest.mark.parametrize(
206 'value', ['string-value', [1, 2, 3], 999, 3.14, False,
207 {'complex1': {'complex2': {'key': 'value'}, 'list': [1, 2, 3]}}])
208 def test_inputs_as_environment_variables(self, executor, workflow_context, tmpdir, value):
209 script_path = self._create_script(
211 linux_script='''#! /bin/bash -e
212 ctx node attributes key = "${input_as_env_var}"
215 ctx node attributes key = "%input_as_env_var%"
218 executor, workflow_context,
219 script_path=script_path,
221 value = props['key'].value
222 expected = value if isinstance(value, basestring) else json.loads(value)
223 assert expected == value
225 @pytest.mark.parametrize('value', ['override', {'key': 'value'}])
226 def test_explicit_env_variables_inputs_override(
227 self, executor, workflow_context, tmpdir, value):
228 script_path = self._create_script(
230 linux_script='''#! /bin/bash -e
231 ctx node attributes key = "${input_as_env_var}"
234 ctx node attributes key = "%input_as_env_var%"
238 executor, workflow_context,
239 script_path=script_path,
240 env_var='test-value',
243 'input_as_env_var': value
246 value = props['key'].value
247 expected = value if isinstance(value, basestring) else json.loads(value)
248 assert expected == value
250 def test_get_nonexistent_runtime_property(self, executor, workflow_context, tmpdir):
251 script_path = self._create_script(
253 linux_script='''#! /bin/bash -e
254 ctx node attributes nonexistent
257 ctx node attributes nonexistent
259 exception = self._run_and_get_task_exception(
260 executor, workflow_context,
261 script_path=script_path)
262 assert isinstance(exception, ProcessException)
263 assert os.path.basename(script_path) in exception.command
264 assert 'RequestError' in exception.stderr
265 assert 'nonexistent' in exception.stderr
267 def test_get_nonexistent_runtime_property_json(self, executor, workflow_context, tmpdir):
268 script_path = self._create_script(
270 linux_script='''#! /bin/bash -e
271 ctx -j node attributes nonexistent
274 ctx -j node attributes nonexistent
276 exception = self._run_and_get_task_exception(
277 executor, workflow_context,
278 script_path=script_path)
279 assert isinstance(exception, ProcessException)
280 assert os.path.basename(script_path) in exception.command
281 assert 'RequestError' in exception.stderr
282 assert 'nonexistent' in exception.stderr
284 def test_abort(self, executor, workflow_context, tmpdir):
285 script_path = self._create_script(
287 linux_script='''#! /bin/bash -e
288 ctx task abort [ abort-message ]
291 ctx task abort [ abort-message ]
293 exception = self._run_and_get_task_exception(
294 executor, workflow_context,
295 script_path=script_path)
296 assert isinstance(exception, TaskAbortException)
297 assert exception.message == 'abort-message'
299 def test_retry(self, executor, workflow_context, tmpdir):
300 script_path = self._create_script(
302 linux_script='''#! /bin/bash -e
303 ctx task retry [ retry-message ]
306 ctx task retry [ retry-message ]
308 exception = self._run_and_get_task_exception(
309 executor, workflow_context,
310 script_path=script_path)
311 assert isinstance(exception, TaskRetryException)
312 assert exception.message == 'retry-message'
314 def test_retry_with_interval(self, executor, workflow_context, tmpdir):
315 script_path = self._create_script(
317 linux_script='''#! /bin/bash -e
318 ctx task retry [ retry-message @100 ]
321 ctx task retry [ retry-message @100 ]
323 exception = self._run_and_get_task_exception(
324 executor, workflow_context,
325 script_path=script_path)
326 assert isinstance(exception, TaskRetryException)
327 assert exception.message == 'retry-message'
328 assert exception.retry_interval == 100
330 def test_crash_abort_after_retry(self, executor, workflow_context, tmpdir):
331 script_path = self._create_script(
333 linux_script='''#! /bin/bash
334 ctx task retry [ retry-message ]
335 ctx task abort [ should-raise-a-runtime-error ]
338 ctx task retry [ retry-message ]
339 ctx task abort [ should-raise-a-runtime-error ]
341 exception = self._run_and_get_task_exception(
342 executor, workflow_context,
343 script_path=script_path)
344 assert isinstance(exception, TaskAbortException)
345 assert exception.message == constants.ILLEGAL_CTX_OPERATION_MESSAGE
347 def test_crash_retry_after_abort(self, executor, workflow_context, tmpdir):
348 script_path = self._create_script(
350 linux_script='''#! /bin/bash
351 ctx task abort [ abort-message ]
352 ctx task retry [ should-raise-a-runtime-error ]
355 ctx task abort [ abort-message ]
356 ctx task retry [ should-raise-a-runtime-error ]
358 exception = self._run_and_get_task_exception(
359 executor, workflow_context,
360 script_path=script_path)
361 assert isinstance(exception, TaskAbortException)
362 assert exception.message == constants.ILLEGAL_CTX_OPERATION_MESSAGE
364 def test_crash_abort_after_abort(self, executor, workflow_context, tmpdir):
365 script_path = self._create_script(
367 linux_script='''#! /bin/bash
368 ctx task abort [ abort-message ]
369 ctx task abort [ should-raise-a-runtime-error ]
372 ctx task abort [ abort-message ]
373 ctx task abort [ should-raise-a-runtime-error ]
375 exception = self._run_and_get_task_exception(
376 executor, workflow_context,
377 script_path=script_path)
378 assert isinstance(exception, TaskAbortException)
379 assert exception.message == constants.ILLEGAL_CTX_OPERATION_MESSAGE
381 def test_crash_retry_after_retry(self, executor, workflow_context, tmpdir):
382 script_path = self._create_script(
384 linux_script='''#! /bin/bash
385 ctx task retry [ retry-message ]
386 ctx task retry [ should-raise-a-runtime-error ]
389 ctx task retry [ retry-message ]
390 ctx task retry [ should-raise-a-runtime-error ]
392 exception = self._run_and_get_task_exception(
393 executor, workflow_context,
394 script_path=script_path)
395 assert isinstance(exception, TaskAbortException)
396 assert exception.message == constants.ILLEGAL_CTX_OPERATION_MESSAGE
398 def test_retry_returns_a_nonzero_exit_code(self, executor, workflow_context, tmpdir):
399 log_path = tmpdir.join('temp.log')
401 script_path = self._create_script(
403 linux_script='''#! /bin/bash -e
404 ctx task retry [ "{0}" ] 2> {1}
405 echo should-not-run > {1}
406 '''.format(message, log_path),
408 ctx task retry [ "{0}" ] 2> {1}
409 if %errorlevel% neq 0 exit /b %errorlevel%
410 echo should-not-run > {1}
411 '''.format(message, log_path))
412 with pytest.raises(ExecutorException):
414 executor, workflow_context,
415 script_path=script_path)
416 assert log_path.read().strip() == message
418 def test_abort_returns_a_nonzero_exit_code(self, executor, workflow_context, tmpdir):
419 log_path = tmpdir.join('temp.log')
421 script_path = self._create_script(
423 linux_script='''#! /bin/bash -e
424 ctx task abort [ "{0}" ] 2> {1}
425 echo should-not-run > {1}
426 '''.format(message, log_path),
428 ctx task abort [ "{0}" ] 2> {1}
429 if %errorlevel% neq 0 exit /b %errorlevel%
430 echo should-not-run > {1}
431 '''.format(message, log_path))
432 with pytest.raises(ExecutorException):
434 executor, workflow_context,
435 script_path=script_path)
436 assert log_path.read().strip() == message
438 def _create_script(self,
442 windows_suffix='.bat',
444 suffix = windows_suffix if IS_WINDOWS else linux_suffix
445 script = windows_script if IS_WINDOWS else linux_script
446 script_path = tmpdir.join('script{0}'.format(suffix))
447 script_path.write(script)
448 return str(script_path)
450 def _run_and_get_task_exception(self, *args, **kwargs):
451 signal = events.on_failure_task_signal
452 with events_collector(signal) as collected:
453 with pytest.raises(ExecutorException):
454 self._run(*args, **kwargs)
455 return collected[signal][0]['kwargs']['exception']
464 local_script_path = script_path
465 script_path = os.path.basename(local_script_path) if local_script_path else ''
466 arguments = arguments or {}
467 process = process or {}
469 workflow_context.resource.service.upload(
470 entry_id=str(workflow_context.service.id),
471 source=local_script_path,
475 'script_path': script_path,
477 'input_as_env_var': env_var
480 node = workflow_context.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
481 interface = mock.models.create_interface(
485 operation_kwargs=dict(
486 function='{0}.{1}'.format(
488 operations.run_script_locally.__name__),
491 node.interfaces[interface.name] = interface
492 workflow_context.model.node.update(node)
495 def mock_workflow(ctx, graph):
496 graph.add_tasks(api.task.OperationTask(
498 interface_name='test',
500 arguments=arguments))
502 tasks_graph = mock_workflow(ctx=workflow_context) # pylint: disable=no-value-for-parameter
503 graph_compiler.GraphCompiler(workflow_context, executor.__class__).compile(tasks_graph)
504 eng = engine.Engine({executor.__class__: executor})
505 eng.execute(workflow_context)
506 return workflow_context.model.node.get_by_name(
507 mock.models.DEPENDENCY_NODE_NAME).attributes
511 result = process.ProcessExecutor()
518 def workflow_context(self, tmpdir):
519 workflow_context = mock.context.simple(str(tmpdir), inmemory=False)
520 workflow_context.states = []
521 workflow_context.exception = None
522 yield workflow_context
523 storage.release_sqlite_storage(workflow_context.model)
526 class BaseTestConfiguration(object):
528 @pytest.fixture(autouse=True)
529 def mock_execute(self, mocker):
533 def execute_func(process, **_):
534 self.process = process
535 self.called = 'execute'
538 mocker.patch.object(local, '_execute_func', execute_func)
539 mocker.patch.object(local, '_eval_script_func', eval_func)
543 def download_resource(destination, *args, **kwargs):
546 def _run(self, script_path, process=None):
548 script_path=script_path,
553 class TestPowerShellConfiguration(BaseTestConfiguration):
555 def test_implicit_powershell_call_with_ps1_extension(self):
556 self._run(script_path='script_path.ps1')
557 assert self.process['command_prefix'] == 'powershell'
559 def test_command_prefix_is_overridden_for_ps1_extension(self):
560 self._run(script_path='script_path.ps1',
561 process={'command_prefix': 'bash'})
562 assert self.process['command_prefix'] == 'bash'
564 def test_explicit_powershell_call(self):
565 self._run(script_path='script_path.ps1',
566 process={'command_prefix': 'powershell'})
567 assert self.process['command_prefix'] == 'powershell'
570 class TestEvalPythonConfiguration(BaseTestConfiguration):
572 def test_explicit_eval_without_py_extension(self):
573 self._run(script_path='script_path',
574 process={'eval_python': True})
575 assert self.called == 'eval'
577 def test_explicit_eval_with_py_extension(self):
578 self._run(script_path='script_path.py',
579 process={'eval_python': True})
580 assert self.called == 'eval'
582 def test_implicit_eval(self):
583 self._run(script_path='script_path.py')
584 assert self.called == 'eval'
586 def test_explicit_execute_without_py_extension(self):
587 self._run(script_path='script_path',
588 process={'eval_python': False})
589 assert self.called == 'execute'
591 def test_explicit_execute_with_py_extension(self):
592 self._run(script_path='script_path.py',
593 process={'eval_python': False})
594 assert self.called == 'execute'
596 def test_implicit_execute(self):
597 self._run(script_path='script_path')
598 assert self.called == 'execute'