1 # Licensed to the Apache Software Foundation (ASF) under one or more
2 # contributor license agreements. See the NOTICE file distributed with
3 # this work for additional information regarding copyright ownership.
4 # The ASF licenses this file to You under the Apache License, Version 2.0
5 # (the "License"); you may not use this file except in compliance with
6 # the License. You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
18 from threading import Thread, Event
19 from datetime import datetime
24 from aria.modeling import exceptions as modeling_exceptions
25 from aria.modeling import models
26 from aria.orchestrator import exceptions
27 from aria.orchestrator import events
28 from aria.orchestrator.workflow_runner import WorkflowRunner
29 from aria.orchestrator.workflows.executor.process import ProcessExecutor
30 from aria.orchestrator.workflows import api
31 from aria.orchestrator.workflows.core import engine, graph_compiler
32 from aria.orchestrator.workflows.executor import thread
33 from aria.orchestrator import (
43 from ..fixtures import ( # pylint: disable=unused-import
47 resource_storage as resource
51 'is_resumed': Event(),
53 'execution_cancelled': Event(),
54 'execution_failed': Event(),
class TimeoutError(BaseException):
    """Raised by the resume tests when a custom_events Event is not set within the wait window."""
class FailingTask(BaseException):
    """Raised deliberately by the mock operations below to simulate a failing task."""
def test_undeclared_workflow(request):
    # a workflow name the service never declared must be rejected up-front,
    # at runner-creation time, with UndeclaredWorkflowError
    with pytest.raises(exceptions.UndeclaredWorkflowError):
        _create_workflow_runner(request, 'undeclared_workflow')
def test_missing_workflow_implementation(service, request):
    # validating a proper error is raised when the workflow code path does not exist
    # NOTE(review): the Operation construction appears truncated in this view
    # (e.g. name=/service= kwargs are not visible) -- confirm against full source
    workflow = models.Operation(
        function='nonexistent.workflow.implementation')
    service.workflows['test_workflow'] = workflow

    # resolving the bogus function path must fail at runner creation
    with pytest.raises(exceptions.WorkflowImplementationNotFoundError):
        _create_workflow_runner(request, 'test_workflow')
def test_builtin_workflow_instantiation(request):
    # a built-in workflow ('install') must instantiate cleanly -- i.e. neither an
    # undeclared-workflow nor a missing-implementation error is raised
    runner = _create_workflow_runner(request, 'install')
    created_tasks = list(runner.execution.tasks)
    assert len(created_tasks) == 18  # 18 tasks are expected for the 2-node topology
def test_custom_workflow_instantiation(request):
    # a custom (mock) workflow registered on the service must instantiate cleanly --
    # i.e. neither an undeclared-workflow nor a missing-implementation error is raised
    workflow_name = _setup_mock_workflow_in_service(request)
    runner = _create_workflow_runner(request, workflow_name)
    created_tasks = list(runner.execution.tasks)
    # the mock workflow yields only the start-workflow and end-workflow tasks
    assert len(created_tasks) == 2
def test_existing_active_executions(request, service, model):
    # an execution already in STARTED state must block creation of a new runner
    # NOTE(review): the Execution construction appears truncated in this view
    # (e.g. a service= kwarg is not visible) -- confirm against full source
    existing_active_execution = models.Execution(
        status=models.Execution.STARTED,
        workflow_name='uninstall')
    model.execution.put(existing_active_execution)
    with pytest.raises(exceptions.ActiveExecutionsError):
        _create_workflow_runner(request, 'install')
def test_existing_executions_but_no_active_ones(request, service, model):
    # a terminated (SUCCEEDED) execution must NOT block creation of a new runner
    # NOTE(review): the Execution construction appears truncated in this view
    # (e.g. a service= kwarg is not visible) -- confirm against full source
    existing_terminated_execution = models.Execution(
        status=models.Execution.SUCCEEDED,
        workflow_name='uninstall')
    model.execution.put(existing_terminated_execution)
    # no active executions exist, so no error should be raised
    _create_workflow_runner(request, 'install')
def test_default_executor(request):
    # validates the ProcessExecutor is used by the workflow runner by default
    mock_workflow = _setup_mock_workflow_in_service(request)

    with mock.patch('aria.orchestrator.workflow_runner.engine.Engine') as mock_engine_cls:
        _create_workflow_runner(request, mock_workflow)
        _, engine_kwargs = mock_engine_cls.call_args
        # wrap in list(...): dict.values() is an indexable list only on Python 2;
        # on Python 3 it is a non-indexable view, so `.values()[0]` would raise
        assert isinstance(list(engine_kwargs.get('executors').values())[0], ProcessExecutor)
def test_custom_executor(request):
    # an explicitly-supplied executor must be handed to the engine instead of the default
    mock_workflow = _setup_mock_workflow_in_service(request)

    custom_executor = mock.MagicMock()
    with mock.patch('aria.orchestrator.workflow_runner.engine.Engine') as mock_engine_cls:
        _create_workflow_runner(request, mock_workflow, executor=custom_executor)
        _, engine_kwargs = mock_engine_cls.call_args
        # wrap in list(...): keeps the [0] indexing valid on Python 3 (where
        # dict.values() is a view) as well as Python 2
        assert list(engine_kwargs.get('executors').values())[0] == custom_executor
def test_task_configuration_parameters(request):
    # task_max_attempts / task_retry_interval given to the runner must land on
    # the workflow context that Engine.execute receives
    mock_workflow = _setup_mock_workflow_in_service(request)

    task_max_attempts = 5
    task_retry_interval = 7
    # NOTE(review): the continuation target of 'as \' (presumably
    # 'mock_engine_execute:') is not visible in this view -- confirm against
    # the full source
    with mock.patch('aria.orchestrator.workflow_runner.engine.Engine.execute') as \
        _create_workflow_runner(request, mock_workflow, task_max_attempts=task_max_attempts,
                                task_retry_interval=task_retry_interval).execute()
        _, engine_kwargs = mock_engine_execute.call_args
        # the context stores the knobs as private attributes
        assert engine_kwargs['ctx']._task_max_attempts == task_max_attempts
        assert engine_kwargs['ctx']._task_retry_interval == task_retry_interval
def test_execute(request, service):
    # executing the runner must invoke Engine.execute with a context bound to
    # the right service and execution
    mock_workflow = _setup_mock_workflow_in_service(request)

    mock_engine = mock.MagicMock()
    with mock.patch('aria.orchestrator.workflow_runner.engine.Engine.execute',
                    return_value=mock_engine) as mock_engine_execute:
        workflow_runner = _create_workflow_runner(request, mock_workflow)
        workflow_runner.execute()

        _, engine_kwargs = mock_engine_execute.call_args
        assert engine_kwargs['ctx'].service.id == service.id
        assert engine_kwargs['ctx'].execution.workflow_name == 'test_workflow'

        # NOTE(review): this call appears truncated in this view (closing
        # arguments are not visible) -- confirm against the full source
        mock_engine_execute.assert_called_once_with(ctx=workflow_runner._workflow_context,
def test_cancel_execution(request):
    # cancelling through the runner must delegate to Engine.cancel_execution,
    # passing the runner's own workflow context
    workflow_name = _setup_mock_workflow_in_service(request)

    engine_mock = mock.MagicMock()
    with mock.patch('aria.orchestrator.workflow_runner.engine.Engine', return_value=engine_mock):
        runner = _create_workflow_runner(request, workflow_name)
        runner.cancel()
        engine_mock.cancel_execution.assert_called_once_with(ctx=runner._workflow_context)
def test_execution_model_creation(request, service, model):
    # instantiating a runner must persist a fresh Execution in storage, linked
    # to the service and carrying the workflow name, a creation time and empty inputs
    workflow_name = _setup_mock_workflow_in_service(request)

    with mock.patch('aria.orchestrator.workflow_runner.engine.Engine'):
        runner = _create_workflow_runner(request, workflow_name)

        execution = runner.execution
        assert model.execution.get(execution.id) == execution
        assert execution.service.id == service.id
        assert execution.workflow_name == workflow_name
        assert execution.created_at <= datetime.utcnow()
        assert execution.inputs == dict()
def test_execution_inputs_override_workflow_inputs(request):
    # user-supplied execution inputs must override same-named declared workflow
    # inputs, while unspecified ones keep their declared defaults
    wf_inputs = {'input1': 'value1', 'input2': 'value2', 'input3': 5}
    # NOTE(review): this call appears truncated in this view (the 'request'
    # argument line is not visible); iteritems() is Python-2-only
    mock_workflow = _setup_mock_workflow_in_service(
        inputs=dict((name, models.Input.wrap(name, val)) for name, val
                    in wf_inputs.iteritems()))

    with mock.patch('aria.orchestrator.workflow_runner.engine.Engine'):
        workflow_runner = _create_workflow_runner(
            request, mock_workflow, inputs={'input2': 'overriding-value2', 'input3': 7})

        assert len(workflow_runner.execution.inputs) == 3
        # did not override input1 - expecting the default value from the workflow inputs
        assert workflow_runner.execution.inputs['input1'].value == 'value1'
        # overrode input2 with a string of the declared type
        assert workflow_runner.execution.inputs['input2'].value == 'overriding-value2'
        # overrode input of integer type
        assert workflow_runner.execution.inputs['input3'].value == 7
def test_execution_inputs_undeclared_inputs(request):
    # supplying an input the workflow never declared must be rejected
    workflow_name = _setup_mock_workflow_in_service(request)

    with pytest.raises(modeling_exceptions.UndeclaredInputsException):
        _create_workflow_runner(request, workflow_name, inputs={'undeclared_input': 'value'})
def test_execution_inputs_missing_required_inputs(request):
    # a declared input with no default (value=None) is required -- omitting it
    # at execution time must be rejected
    workflow_name = _setup_mock_workflow_in_service(
        request, inputs={'required_input': models.Input.wrap('required_input', value=None)})

    with pytest.raises(modeling_exceptions.MissingRequiredInputsException):
        _create_workflow_runner(request, workflow_name, inputs={})
def test_execution_inputs_wrong_type_inputs(request):
    # overriding a string-typed declared input with an int must be rejected
    workflow_name = _setup_mock_workflow_in_service(
        request, inputs={'input': models.Input.wrap('input', 'value')})

    with pytest.raises(modeling_exceptions.ParametersOfWrongTypeException):
        _create_workflow_runner(request, workflow_name, inputs={'input': 5})
def test_execution_inputs_builtin_workflow_with_inputs(request):
    # built-in workflows declare no inputs, so passing any input must be rejected
    with pytest.raises(modeling_exceptions.UndeclaredInputsException):
        _create_workflow_runner(request, 'install', inputs={'undeclared_input': 'value'})
def test_workflow_function_parameters(request, tmpdir):
    # validating the workflow function is passed with the
    # merged execution inputs, in dict form

    # the workflow function parameters will be written to this file
    output_path = str(tmpdir.join('output'))
    wf_inputs = {'output_path': output_path, 'input1': 'value1', 'input2': 'value2', 'input3': 5}

    # items() instead of iteritems(): identical here and works on Python 3 too
    mock_workflow = _setup_mock_workflow_in_service(
        request, inputs=dict((name, models.Input.wrap(name, val)) for name, val
                             in wf_inputs.items()))

    _create_workflow_runner(request, mock_workflow,
                            inputs={'input2': 'overriding-value2', 'input3': 7})

    with open(output_path) as f:
        wf_call_kwargs = json.load(f)
    # 3 merged inputs reach the workflow function (output_path is consumed separately)
    assert len(wf_call_kwargs) == 3
    assert wf_call_kwargs.get('input1') == 'value1'
    assert wf_call_kwargs.get('input2') == 'overriding-value2'
    assert wf_call_kwargs.get('input3') == 7
270 # sets up a service in the storage
271 service_id = tests_mock.topology.create_simple_topology_two_nodes(model)
272 service = model.service.get(service_id)
def _setup_mock_workflow_in_service(request, inputs=None):
    # sets up a mock workflow as part of the service, including uploading
    # the workflow code to the service's dir on the resource storage
    service = request.getfixturevalue('service')
    resource = request.getfixturevalue('resource')

    source = tests_mock.workflow.__file__
    resource.service_template.upload(str(service.service_template.id), source)
    mock_workflow_name = 'test_workflow'
    # NOTE(review): the initialization of 'arguments' (and a guarding
    # 'if inputs:') is not visible in this view -- confirm against the full
    # source; itervalues() is Python-2-only
    for input in inputs.itervalues():
        arguments[input.name] = input.as_argument()
    # NOTE(review): this Operation construction appears truncated here
    # (e.g. service=/arguments= kwargs are not visible)
    workflow = models.Operation(
        name=mock_workflow_name,
        function='workflow.mock_workflow',
    service.workflows[mock_workflow_name] = workflow
    return mock_workflow_name
def _create_workflow_runner(request, workflow_name, inputs=None, executor=None,
                            task_max_attempts=None, task_retry_interval=None):
    # helper method for instantiating a workflow runner
    service_id = request.getfixturevalue('service').id
    model = request.getfixturevalue('model')
    resource = request.getfixturevalue('resource')
    plugin_manager = request.getfixturevalue('plugin_manager')

    # task configuration parameters can't be set to None, therefore only
    # passing those if they've been set by the test
    task_configuration_kwargs = dict()
    if task_max_attempts is not None:
        task_configuration_kwargs['task_max_attempts'] = task_max_attempts
    if task_retry_interval is not None:
        task_configuration_kwargs['task_retry_interval'] = task_retry_interval

    # NOTE(review): several WorkflowRunner kwargs (presumably inputs=,
    # executor=, model_storage=model) are not visible in this view -- confirm
    # against the full source
    return WorkflowRunner(
        workflow_name=workflow_name,
        service_id=service_id,
        resource_storage=resource,
        plugin_manager=plugin_manager,
        **task_configuration_kwargs)
326 class TestResumableWorkflows(object):
    def _create_initial_workflow_runner(
            self, workflow_context, workflow, executor, inputs=None):
        # registers the given workflow function on the service under the name
        # 'custom_workflow' and builds a WorkflowRunner for it
        service = workflow_context.service
        # NOTE(review): this create_operation call appears truncated in this
        # view (leading arguments and the closing of the kwargs dict are not
        # visible) -- confirm against the full source
        service.workflows['custom_workflow'] = tests_mock.models.create_operation(
            'function': '{0}.{1}'.format(__name__, workflow.__name__),
            'inputs': dict((k, models.Input.wrap(k, v)) for k, v in (inputs or {}).items())
        workflow_context.model.service.update(service)

        # NOTE(review): this construction is truncated here (trailing kwargs
        # such as executor= and the 'return wf_runner' are not visible)
        wf_runner = WorkflowRunner(
            service_id=workflow_context.service.id,
            model_storage=workflow_context.model,
            resource_storage=workflow_context.resource,
            workflow_name='custom_workflow',
    def _wait_for_active_and_cancel(workflow_runner):
        # blocks until the mock operation signals it is running, then cancels
        # the execution and waits for the cancellation to complete (60s each)
        # NOTE(review): no 'self' parameter -- presumably @staticmethod; the
        # decorator line is not visible in this view
        if custom_events['is_active'].wait(60) is False:
            raise TimeoutError("is_active wasn't set to True")
        workflow_runner.cancel()
        if custom_events['execution_cancelled'].wait(60) is False:
            raise TimeoutError("Execution did not end")
    def test_resume_workflow(self, workflow_context, thread_executor):
        # scenario: two parallel tasks of which only the first can pass; cancel
        # mid-run, then resume the same execution with a new runner and verify
        # the retrying task completes
        node = workflow_context.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
        node.attributes['invocations'] = models.Attribute.wrap('invocations', 0)
        self._create_interface(workflow_context, node, mock_pass_first_task_only)

        wf_runner = self._create_initial_workflow_runner(
            workflow_context, mock_parallel_tasks_workflow, thread_executor,
            inputs={'number_of_tasks': 2})

        wf_thread = Thread(target=wf_runner.execute)
        wf_thread.daemon = True
        # NOTE(review): the wf_thread.start() call is not visible in this view

        # Wait for the execution to start
        self._wait_for_active_and_cancel(wf_runner)
        node = workflow_context.model.node.refresh(node)

        # after cancellation: one task succeeded, the other was mid-retry
        tasks = workflow_context.model.task.list(filters={'_stub_type': None})
        assert any(task.status == task.SUCCESS for task in tasks)
        assert any(task.status == task.RETRYING for task in tasks)
        custom_events['is_resumed'].set()
        assert any(task.status == task.RETRYING for task in tasks)

        # Create a new workflow runner, with an existing execution id. This would cause
        # the old execution to restart.
        # NOTE(review): some kwargs of this call are not visible in this view
        new_wf_runner = WorkflowRunner(
            service_id=wf_runner.service.id,
            model_storage=workflow_context.model,
            resource_storage=workflow_context.resource,
            execution_id=wf_runner.execution.id,
            executor=thread_executor)

        new_wf_runner.execute()

        # Wait for it to finish and assert changes.
        node = workflow_context.model.node.refresh(node)
        assert all(task.status == task.SUCCESS for task in tasks)
        assert node.attributes['invocations'].value == 3
        assert wf_runner.execution.status == wf_runner.execution.SUCCEEDED
    def test_resume_started_task(self, workflow_context, thread_executor):
        # scenario: a single task gets stuck (spins until resumed); cancel while
        # it is STARTED, then resume on a fresh executor and verify it succeeds
        node = workflow_context.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
        node.attributes['invocations'] = models.Attribute.wrap('invocations', 0)
        self._create_interface(workflow_context, node, mock_stuck_task)

        wf_runner = self._create_initial_workflow_runner(
            workflow_context, mock_parallel_tasks_workflow, thread_executor,
            inputs={'number_of_tasks': 1})

        wf_thread = Thread(target=wf_runner.execute)
        wf_thread.daemon = True
        # NOTE(review): the wf_thread.start() call is not visible in this view

        self._wait_for_active_and_cancel(wf_runner)
        node = workflow_context.model.node.refresh(node)
        task = workflow_context.model.task.list(filters={'_stub_type': None})[0]
        assert node.attributes['invocations'].value == 1
        # the stuck task was still STARTED when the execution got cancelled
        assert task.status == task.STARTED
        assert wf_runner.execution.status in (wf_runner.execution.CANCELLED,
                                              wf_runner.execution.CANCELLING)
        custom_events['is_resumed'].set()

        new_thread_executor = thread.ThreadExecutor()
        # NOTE(review): some kwargs of this call are not visible in this view
        new_wf_runner = WorkflowRunner(
            service_id=wf_runner.service.id,
            model_storage=workflow_context.model,
            resource_storage=workflow_context.resource,
            execution_id=wf_runner.execution.id,
            executor=new_thread_executor)

        new_wf_runner.execute()

        new_thread_executor.close()

        # Wait for it to finish and assert changes.
        node = workflow_context.model.node.refresh(node)
        assert node.attributes['invocations'].value == 2
        assert task.status == task.SUCCESS
        assert wf_runner.execution.status == wf_runner.execution.SUCCEEDED
    def test_resume_failed_task(self, workflow_context, thread_executor):
        # scenario: the task fails until just before max_attempts; cancel, then
        # resume and verify the task eventually succeeds
        node = workflow_context.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
        node.attributes['invocations'] = models.Attribute.wrap('invocations', 0)
        self._create_interface(workflow_context, node, mock_failed_before_resuming)

        # NOTE(review): this call appears truncated in this view (trailing
        # arguments, e.g. the executor, are not visible)
        wf_runner = self._create_initial_workflow_runner(workflow_context,
                                                         mock_parallel_tasks_workflow,
        wf_thread = Thread(target=wf_runner.execute)
        wf_thread.setDaemon(True)
        # NOTE(review): the wf_thread.start() call is not visible in this view

        self._wait_for_active_and_cancel(wf_runner)
        node = workflow_context.model.node.refresh(node)

        task = workflow_context.model.task.list(filters={'_stub_type': None})[0]
        assert node.attributes['invocations'].value == 2
        assert task.status == task.STARTED
        assert wf_runner.execution.status in (wf_runner.execution.CANCELLED,
                                              wf_runner.execution.CANCELLING)

        custom_events['is_resumed'].set()
        assert node.attributes['invocations'].value == 2

        # Create a new workflow runner, with an existing execution id. This would cause
        # the old execution to restart.
        new_thread_executor = thread.ThreadExecutor()
        # NOTE(review): some kwargs of this call are not visible in this view
        new_wf_runner = WorkflowRunner(
            service_id=wf_runner.service.id,
            model_storage=workflow_context.model,
            resource_storage=workflow_context.resource,
            execution_id=wf_runner.execution.id,
            executor=new_thread_executor)

        new_wf_runner.execute()

        new_thread_executor.close()

        # Wait for it to finish and assert changes.
        node = workflow_context.model.node.refresh(node)
        assert node.attributes['invocations'].value == task.max_attempts - 1
        assert task.status == task.SUCCESS
        assert wf_runner.execution.status == wf_runner.execution.SUCCEEDED
    def test_resume_failed_task_and_successful_task(self, workflow_context, thread_executor):
        # scenario: two parallel tasks, one fails after exhausting its 2 attempts;
        # resume with retry_failed_tasks=True and verify both end up SUCCESS
        node = workflow_context.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
        node.attributes['invocations'] = models.Attribute.wrap('invocations', 0)
        self._create_interface(workflow_context, node, mock_pass_first_task_only)

        # NOTE(review): this call appears truncated in this view (the
        # workflow_context / executor argument lines are not visible)
        wf_runner = self._create_initial_workflow_runner(
            mock_parallel_tasks_workflow,
            inputs={'retry_interval': 1, 'max_attempts': 2, 'number_of_tasks': 2}
        wf_thread = Thread(target=wf_runner.execute)
        wf_thread.setDaemon(True)
        # NOTE(review): the wf_thread.start() call is not visible in this view

        if custom_events['execution_failed'].wait(60) is False:
            raise TimeoutError("Execution did not end")

        tasks = workflow_context.model.task.list(filters={'_stub_type': None})
        node = workflow_context.model.node.refresh(node)
        assert node.attributes['invocations'].value == 3
        failed_task = [t for t in tasks if t.status == t.FAILED][0]

        # one task exhausted both of its attempts and failed
        assert any(task.status == task.FAILED for task in tasks)
        assert failed_task.attempts_count == 2
        # the other task succeeded on its first attempt
        assert any(task.status == task.SUCCESS for task in tasks)
        # NOTE(review): 'in' performs a substring test here since FAILED is a
        # string -- '==' is presumably intended; confirm against the full source
        assert wf_runner.execution.status in wf_runner.execution.FAILED

        custom_events['is_resumed'].set()
        new_thread_executor = thread.ThreadExecutor()
        # NOTE(review): some kwargs of this call are not visible in this view
        new_wf_runner = WorkflowRunner(
            service_id=wf_runner.service.id,
            retry_failed_tasks=True,
            model_storage=workflow_context.model,
            resource_storage=workflow_context.resource,
            execution_id=wf_runner.execution.id,
            executor=new_thread_executor)

        new_wf_runner.execute()

        new_thread_executor.close()

        # Wait for it to finish and assert changes.
        node = workflow_context.model.node.refresh(node)
        assert failed_task.attempts_count == 1
        assert node.attributes['invocations'].value == 4
        assert all(task.status == task.SUCCESS for task in tasks)
        assert wf_runner.execution.status == wf_runner.execution.SUCCEEDED
    def test_two_sequential_task_first_task_failed(self, workflow_context, thread_executor):
        # scenario: two sequential tasks where the first fails (max_attempts=1),
        # leaving the second PENDING; resuming runs the pending task
        node = workflow_context.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
        node.attributes['invocations'] = models.Attribute.wrap('invocations', 0)
        self._create_interface(workflow_context, node, mock_fail_first_task_only)

        # NOTE(review): this call appears truncated in this view (the
        # workflow_context / executor argument lines are not visible)
        wf_runner = self._create_initial_workflow_runner(
            mock_sequential_tasks_workflow,
            inputs={'retry_interval': 1, 'max_attempts': 1, 'number_of_tasks': 2}
        wf_thread = Thread(target=wf_runner.execute)
        wf_thread.setDaemon(True)
        # NOTE(review): the wf_thread.start() call is not visible in this view

        if custom_events['execution_failed'].wait(60) is False:
            raise TimeoutError("Execution did not end")

        tasks = workflow_context.model.task.list(filters={'_stub_type': None})
        node = workflow_context.model.node.refresh(node)
        # only the first (failing) task ran; the second never started
        assert node.attributes['invocations'].value == 1
        assert any(t.status == t.FAILED for t in tasks)
        assert any(t.status == t.PENDING for t in tasks)

        custom_events['is_resumed'].set()
        new_thread_executor = thread.ThreadExecutor()
        # NOTE(review): some kwargs of this call are not visible in this view
        new_wf_runner = WorkflowRunner(
            service_id=wf_runner.service.id,
            model_storage=workflow_context.model,
            resource_storage=workflow_context.resource,
            execution_id=wf_runner.execution.id,
            executor=new_thread_executor)

        new_wf_runner.execute()

        new_thread_executor.close()

        # Wait for it to finish and assert changes.
        node = workflow_context.model.node.refresh(node)
        # the pending task ran (invocations == 2); the failed one stays FAILED
        # since retry_failed_tasks was not requested
        assert node.attributes['invocations'].value == 2
        assert any(t.status == t.SUCCESS for t in tasks)
        assert any(t.status == t.FAILED for t in tasks)
        assert wf_runner.execution.status == wf_runner.execution.SUCCEEDED
    def thread_executor():
        # pytest fixture (the decorator line is not visible in this view)
        # providing a ThreadExecutor; the yield/teardown (close) lines are
        # not visible here either -- confirm against the full source
        result = thread.ThreadExecutor()
    def workflow_context(tmpdir):
        # pytest fixture (the decorator line is not visible in this view):
        # builds a simple mock workflow context backed by sqlite under tmpdir,
        # releasing the sqlite storage on teardown
        workflow_context = tests_mock.context.simple(str(tmpdir))
        yield workflow_context
        storage.release_sqlite_storage(workflow_context.model)
    def _create_interface(ctx, node, func, arguments=None):
        # attaches a lifecycle 'create' operation, implemented by func in this
        # module, to the given node; returns the identifiers needed to call it
        # NOTE(review): no 'self' -- presumably @staticmethod; the guarding
        # 'if arguments:' line also appears missing from this view
        interface_name = 'aria.interfaces.lifecycle'
        operation_kwargs = dict(function='{name}.{func.__name__}'.format(
            name=__name__, func=func))
        # the operation has to declare the arguments before those may be passed
        operation_kwargs['arguments'] = arguments
        operation_name = 'create'
        interface = tests_mock.models.create_interface(node.service, interface_name, operation_name,
                                                       operation_kwargs=operation_kwargs)
        node.interfaces[interface.name] = interface
        ctx.model.node.update(node)

        return node, interface_name, operation_name
627 def _engine(workflow_func, workflow_context, executor):
628 graph = workflow_func(ctx=workflow_context)
629 execution = workflow_context.execution
630 graph_compiler.GraphCompiler(workflow_context, executor.__class__).compile(graph)
631 workflow_context.execution = execution
633 return engine.Engine(executors={executor.__class__: executor})
    @pytest.fixture(autouse=True)
    def register_to_events(self):
        # auto-used fixture wiring the orchestrator's cancel/failure signals to
        # the module-level custom_events Events for the duration of each test
        def execution_cancelled(*args, **kwargs):
            custom_events['execution_cancelled'].set()

        def execution_failed(*args, **kwargs):
            custom_events['execution_failed'].set()

        events.on_cancelled_workflow_signal.connect(execution_cancelled)
        events.on_failure_workflow_signal.connect(execution_failed)
        # NOTE(review): the 'yield' separating setup from teardown is not
        # visible in this view -- confirm against the full source
        events.on_cancelled_workflow_signal.disconnect(execution_cancelled)
        events.on_failure_workflow_signal.disconnect(execution_failed)
        # NOTE(review): the loop body (presumably event.clear()) is not visible
        for event in custom_events.values():
def mock_sequential_tasks_workflow(ctx, graph,
                                   retry_interval=1, max_attempts=10, number_of_tasks=1):
    # chains number_of_tasks identical lifecycle operations to run one after another
    dependency_node = ctx.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
    graph.sequence(*_create_tasks(dependency_node, retry_interval, max_attempts, number_of_tasks))
def mock_parallel_tasks_workflow(ctx, graph,
                                 retry_interval=1, max_attempts=10, number_of_tasks=1):
    # adds number_of_tasks identical lifecycle operations with no ordering between them
    dependency_node = ctx.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
    graph.add_tasks(*_create_tasks(dependency_node, retry_interval, max_attempts, number_of_tasks))
def _create_tasks(node, retry_interval, max_attempts, number_of_tasks):
    # builds number_of_tasks identical OperationTasks targeting the node's
    # lifecycle interface
    # NOTE(review): the enclosing 'return [' and the operation-name argument
    # of OperationTask are not visible in this view -- confirm against the
    # full source; xrange is Python-2-only
        api.task.OperationTask(node,
                               'aria.interfaces.lifecycle',
                               retry_interval=retry_interval,
                               max_attempts=max_attempts)
        for _ in xrange(number_of_tasks)
def mock_failed_before_resuming(ctx):
    # NOTE(review): presumably an @operation-decorated mock; the docstring
    # delimiters around the two description lines below are not visible in
    # this view -- confirm against the full source
    The task should run atmost ctx.task.max_attempts - 1 times, and only then pass.
    overall, the number of invocations should be ctx.task.max_attempts - 1
    ctx.node.attributes['invocations'] += 1

    if ctx.node.attributes['invocations'] == 2:
        custom_events['is_active'].set()
        # unfreeze the thread only when all of the invocations are done
        # NOTE(review): the while-loop body (likely a sleep) is not visible here
        while ctx.node.attributes['invocations'] < ctx.task.max_attempts - 1:

    elif ctx.node.attributes['invocations'] == ctx.task.max_attempts - 1:
        # pass only just before the end.
        # NOTE(review): this branch's body and the 'else:' presumably guarding
        # the raise below are not visible in this view
    raise FailingTask("stop this task")
def mock_stuck_task(ctx):
    # mock operation that spins until the test sets 'is_resumed', raising
    # 'is_active' on its first pass so the test knows the task is running
    ctx.node.attributes['invocations'] += 1
    while not custom_events['is_resumed'].isSet():
        if not custom_events['is_active'].isSet():
            custom_events['is_active'].set()
        # NOTE(review): a sleep inside this loop is not visible in this view --
        # as shown the loop would busy-wait; confirm against the full source
def mock_pass_first_task_only(ctx):
    # the first invocation passes silently; every later invocation signals
    # 'is_active' and fails unless the test has already flagged a resume
    ctx.node.attributes['invocations'] += 1

    if ctx.node.attributes['invocations'] != 1:
        custom_events['is_active'].set()
        # is_set() is the modern alias of isSet() -- identical behavior
        if not custom_events['is_resumed'].is_set():
            # if resume was called, increase by one. o/w fail the execution - second task should
            # fail as long it was not a part of resuming the workflow
            raise FailingTask("wasn't resumed yet")
def mock_fail_first_task_only(ctx):
    # fails only on the very first invocation, and only while the workflow has
    # not yet been resumed; all subsequent invocations succeed
    ctx.node.attributes['invocations'] += 1

    # is_set() is the modern alias of isSet(); both operands are side-effect free
    first_call = ctx.node.attributes['invocations'] == 1
    if first_call and not custom_events['is_resumed'].is_set():
        raise FailingTask("First task should fail")