vFW and vDNS support added to azure-plugin
multicloud/azure.git: azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/test_workflow_runner.py
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

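"""
Unit tests for ``aria.orchestrator.workflow_runner.WorkflowRunner``.
"""
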
import json
import time
from threading import Thread, Event
from datetime import datetime

import mock
import pytest

from aria.modeling import exceptions as modeling_exceptions
from aria.modeling import models
from aria.orchestrator import exceptions
from aria.orchestrator import events
from aria.orchestrator.workflow_runner import WorkflowRunner
from aria.orchestrator.workflows.executor.process import ProcessExecutor
from aria.orchestrator.workflows import api
from aria.orchestrator.workflows.core import engine, graph_compiler
from aria.orchestrator.workflows.executor import thread
from aria.orchestrator import (
    workflow,
    operation,
)

from tests import (
    mock as tests_mock,
    storage
)

from ..fixtures import (  # pylint: disable=unused-import
    plugins_dir,
    plugin_manager,
    fs_model as model,
    resource_storage as resource
)

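# Events used to synchronize the resumable-workflow tests with the mock workflows
# and operations defined at the bottom of this module.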
custom_events = {
    'is_resumed': Event(),
    'is_active': Event(),
    'execution_cancelled': Event(),
    'execution_failed': Event(),
}


class TimeoutError(BaseException):
    pass


class FailingTask(BaseException):
    pass


def test_undeclared_workflow(request):
    # validating a proper error is raised when the workflow is not declared in the service
    with pytest.raises(exceptions.UndeclaredWorkflowError):
        _create_workflow_runner(request, 'undeclared_workflow')


def test_missing_workflow_implementation(service, request):
    # validating a proper error is raised when the workflow code path does not exist
    workflow = models.Operation(
        name='test_workflow',
        service=service,
        function='nonexistent.workflow.implementation')
    service.workflows['test_workflow'] = workflow

    with pytest.raises(exceptions.WorkflowImplementationNotFoundError):
        _create_workflow_runner(request, 'test_workflow')


def test_builtin_workflow_instantiation(request):
    # validates the workflow runner instantiates properly when provided with a builtin workflow
    # (expecting no errors to be raised on undeclared workflow or missing workflow implementation)
    workflow_runner = _create_workflow_runner(request, 'install')
    tasks = list(workflow_runner.execution.tasks)
    assert len(tasks) == 18  # expecting 18 tasks for 2 node topology


def test_custom_workflow_instantiation(request):
    # validates the workflow runner instantiates properly when provided with a custom workflow
    # (expecting no errors to be raised on undeclared workflow or missing workflow implementation)
    mock_workflow = _setup_mock_workflow_in_service(request)
    workflow_runner = _create_workflow_runner(request, mock_workflow)
    tasks = list(workflow_runner.execution.tasks)
    assert len(tasks) == 2  # mock workflow creates only start workflow and end workflow task


def test_existing_active_executions(request, service, model):
    existing_active_execution = models.Execution(
        service=service,
        status=models.Execution.STARTED,
        workflow_name='uninstall')
    model.execution.put(existing_active_execution)
    with pytest.raises(exceptions.ActiveExecutionsError):
        _create_workflow_runner(request, 'install')


def test_existing_executions_but_no_active_ones(request, service, model):
    existing_terminated_execution = models.Execution(
        service=service,
        status=models.Execution.SUCCEEDED,
        workflow_name='uninstall')
    model.execution.put(existing_terminated_execution)
    # no active executions exist, so no error should be raised
    _create_workflow_runner(request, 'install')


def test_default_executor(request):
    # validates the ProcessExecutor is used by the workflow runner by default
    mock_workflow = _setup_mock_workflow_in_service(request)

    with mock.patch('aria.orchestrator.workflow_runner.engine.Engine') as mock_engine_cls:
        _create_workflow_runner(request, mock_workflow)
        _, engine_kwargs = mock_engine_cls.call_args
        assert isinstance(engine_kwargs.get('executors').values()[0], ProcessExecutor)


def test_custom_executor(request):
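    # validates that an explicitly provided executor is passed through to the engine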
    mock_workflow = _setup_mock_workflow_in_service(request)

    custom_executor = mock.MagicMock()
    with mock.patch('aria.orchestrator.workflow_runner.engine.Engine') as mock_engine_cls:
        _create_workflow_runner(request, mock_workflow, executor=custom_executor)
        _, engine_kwargs = mock_engine_cls.call_args
        assert engine_kwargs.get('executors').values()[0] == custom_executor


def test_task_configuration_parameters(request):
    mock_workflow = _setup_mock_workflow_in_service(request)

    task_max_attempts = 5
    task_retry_interval = 7
    with mock.patch('aria.orchestrator.workflow_runner.engine.Engine.execute') as \
            mock_engine_execute:
        _create_workflow_runner(request, mock_workflow, task_max_attempts=task_max_attempts,
                                task_retry_interval=task_retry_interval).execute()
        _, engine_kwargs = mock_engine_execute.call_args
        assert engine_kwargs['ctx']._task_max_attempts == task_max_attempts
        assert engine_kwargs['ctx']._task_retry_interval == task_retry_interval


def test_execute(request, service):
    mock_workflow = _setup_mock_workflow_in_service(request)

    mock_engine = mock.MagicMock()
    with mock.patch('aria.orchestrator.workflow_runner.engine.Engine.execute',
                    return_value=mock_engine) as mock_engine_execute:
        workflow_runner = _create_workflow_runner(request, mock_workflow)
        workflow_runner.execute()

        _, engine_kwargs = mock_engine_execute.call_args
        assert engine_kwargs['ctx'].service.id == service.id
        assert engine_kwargs['ctx'].execution.workflow_name == 'test_workflow'

        mock_engine_execute.assert_called_once_with(ctx=workflow_runner._workflow_context,
                                                    resuming=False,
                                                    retry_failed=False)


def test_cancel_execution(request):
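    # validates that cancelling the workflow runner delegates to the engine's cancel_execution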
    mock_workflow = _setup_mock_workflow_in_service(request)

    mock_engine = mock.MagicMock()
    with mock.patch('aria.orchestrator.workflow_runner.engine.Engine', return_value=mock_engine):
        workflow_runner = _create_workflow_runner(request, mock_workflow)
        workflow_runner.cancel()
        mock_engine.cancel_execution.assert_called_once_with(ctx=workflow_runner._workflow_context)


def test_execution_model_creation(request, service, model):
    mock_workflow = _setup_mock_workflow_in_service(request)

    with mock.patch('aria.orchestrator.workflow_runner.engine.Engine'):
        workflow_runner = _create_workflow_runner(request, mock_workflow)

        assert model.execution.get(workflow_runner.execution.id) == workflow_runner.execution
        assert workflow_runner.execution.service.id == service.id
        assert workflow_runner.execution.workflow_name == mock_workflow
        assert workflow_runner.execution.created_at <= datetime.utcnow()
        assert workflow_runner.execution.inputs == dict()


def test_execution_inputs_override_workflow_inputs(request):
    wf_inputs = {'input1': 'value1', 'input2': 'value2', 'input3': 5}
    mock_workflow = _setup_mock_workflow_in_service(
        request,
        inputs=dict((name, models.Input.wrap(name, val)) for name, val
                    in wf_inputs.iteritems()))

    with mock.patch('aria.orchestrator.workflow_runner.engine.Engine'):
        workflow_runner = _create_workflow_runner(
            request, mock_workflow, inputs={'input2': 'overriding-value2', 'input3': 7})

        assert len(workflow_runner.execution.inputs) == 3
        # did not override input1 - expecting the default value from the workflow inputs
        assert workflow_runner.execution.inputs['input1'].value == 'value1'
        # overrode input2
        assert workflow_runner.execution.inputs['input2'].value == 'overriding-value2'
        # overrode input of integer type
        assert workflow_runner.execution.inputs['input3'].value == 7


def test_execution_inputs_undeclared_inputs(request):
    mock_workflow = _setup_mock_workflow_in_service(request)

    with pytest.raises(modeling_exceptions.UndeclaredInputsException):
        _create_workflow_runner(request, mock_workflow, inputs={'undeclared_input': 'value'})


def test_execution_inputs_missing_required_inputs(request):
    mock_workflow = _setup_mock_workflow_in_service(
        request, inputs={'required_input': models.Input.wrap('required_input', value=None)})

    with pytest.raises(modeling_exceptions.MissingRequiredInputsException):
        _create_workflow_runner(request, mock_workflow, inputs={})


def test_execution_inputs_wrong_type_inputs(request):
    mock_workflow = _setup_mock_workflow_in_service(
        request, inputs={'input': models.Input.wrap('input', 'value')})

    with pytest.raises(modeling_exceptions.ParametersOfWrongTypeException):
        _create_workflow_runner(request, mock_workflow, inputs={'input': 5})


def test_execution_inputs_builtin_workflow_with_inputs(request):
    # built-in workflows don't have inputs
    with pytest.raises(modeling_exceptions.UndeclaredInputsException):
        _create_workflow_runner(request, 'install', inputs={'undeclared_input': 'value'})


def test_workflow_function_parameters(request, tmpdir):
    # validates that the workflow function receives the merged execution inputs as a dict

    # the workflow function parameters will be written to this file
    output_path = str(tmpdir.join('output'))
    wf_inputs = {'output_path': output_path, 'input1': 'value1', 'input2': 'value2', 'input3': 5}

    mock_workflow = _setup_mock_workflow_in_service(
        request, inputs=dict((name, models.Input.wrap(name, val)) for name, val
                             in wf_inputs.iteritems()))

    _create_workflow_runner(request, mock_workflow,
                            inputs={'input2': 'overriding-value2', 'input3': 7})

    with open(output_path) as f:
        wf_call_kwargs = json.load(f)
    assert len(wf_call_kwargs) == 3
    assert wf_call_kwargs.get('input1') == 'value1'
    assert wf_call_kwargs.get('input2') == 'overriding-value2'
    assert wf_call_kwargs.get('input3') == 7


@pytest.fixture
def service(model):
    # sets up a service in the storage
    service_id = tests_mock.topology.create_simple_topology_two_nodes(model)
    service = model.service.get(service_id)
    return service


def _setup_mock_workflow_in_service(request, inputs=None):
    # sets up a mock workflow as part of the service, including uploading
    # the workflow code to the service's dir on the resource storage
    service = request.getfixturevalue('service')
    resource = request.getfixturevalue('resource')

    source = tests_mock.workflow.__file__
    resource.service_template.upload(str(service.service_template.id), source)
    mock_workflow_name = 'test_workflow'
    arguments = {}
    if inputs:
        for input in inputs.itervalues():
            arguments[input.name] = input.as_argument()
    workflow = models.Operation(
        name=mock_workflow_name,
        service=service,
        function='workflow.mock_workflow',
        inputs=inputs or {},
        arguments=arguments)
    service.workflows[mock_workflow_name] = workflow
    return mock_workflow_name


def _create_workflow_runner(request, workflow_name, inputs=None, executor=None,
                            task_max_attempts=None, task_retry_interval=None):
    # helper method for instantiating a workflow runner
    service_id = request.getfixturevalue('service').id
    model = request.getfixturevalue('model')
    resource = request.getfixturevalue('resource')
    plugin_manager = request.getfixturevalue('plugin_manager')

    # task configuration parameters can't be set to None, therefore only
    # passing those if they've been set by the test
    task_configuration_kwargs = dict()
    if task_max_attempts is not None:
        task_configuration_kwargs['task_max_attempts'] = task_max_attempts
    if task_retry_interval is not None:
        task_configuration_kwargs['task_retry_interval'] = task_retry_interval

    return WorkflowRunner(
        workflow_name=workflow_name,
        service_id=service_id,
        inputs=inputs or {},
        executor=executor,
        model_storage=model,
        resource_storage=resource,
        plugin_manager=plugin_manager,
        **task_configuration_kwargs)


class TestResumableWorkflows(object):
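    """Tests for resuming executions that were cancelled or that failed mid-run."""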

    def _create_initial_workflow_runner(
            self, workflow_context, workflow, executor, inputs=None):

        service = workflow_context.service
        service.workflows['custom_workflow'] = tests_mock.models.create_operation(
            'custom_workflow',
            operation_kwargs={
                'function': '{0}.{1}'.format(__name__, workflow.__name__),
                'inputs': dict((k, models.Input.wrap(k, v)) for k, v in (inputs or {}).items())
            }
        )
        workflow_context.model.service.update(service)

        wf_runner = WorkflowRunner(
            service_id=workflow_context.service.id,
            inputs=inputs or {},
            model_storage=workflow_context.model,
            resource_storage=workflow_context.resource,
            plugin_manager=None,
            workflow_name='custom_workflow',
            executor=executor)
        return wf_runner

    @staticmethod
    def _wait_for_active_and_cancel(workflow_runner):
        if custom_events['is_active'].wait(60) is False:
            raise TimeoutError("is_active wasn't set to True")
        workflow_runner.cancel()
        if custom_events['execution_cancelled'].wait(60) is False:
            raise TimeoutError("Execution did not end")

    def test_resume_workflow(self, workflow_context, thread_executor):
        node = workflow_context.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
        node.attributes['invocations'] = models.Attribute.wrap('invocations', 0)
        self._create_interface(workflow_context, node, mock_pass_first_task_only)

        wf_runner = self._create_initial_workflow_runner(
            workflow_context, mock_parallel_tasks_workflow, thread_executor,
            inputs={'number_of_tasks': 2})

        wf_thread = Thread(target=wf_runner.execute)
        wf_thread.daemon = True
        wf_thread.start()

        # Wait for the execution to start
        self._wait_for_active_and_cancel(wf_runner)
        node = workflow_context.model.node.refresh(node)

        tasks = workflow_context.model.task.list(filters={'_stub_type': None})
        assert any(task.status == task.SUCCESS for task in tasks)
        assert any(task.status == task.RETRYING for task in tasks)
        custom_events['is_resumed'].set()
        # setting the resume event alone must not change the tasks' statuses
        assert any(task.status == task.RETRYING for task in tasks)

        # Create a new workflow runner with the existing execution id. This causes
        # the old execution to be resumed.
        new_wf_runner = WorkflowRunner(
            service_id=wf_runner.service.id,
            inputs={},
            model_storage=workflow_context.model,
            resource_storage=workflow_context.resource,
            plugin_manager=None,
            execution_id=wf_runner.execution.id,
            executor=thread_executor)

        new_wf_runner.execute()

        # Wait for it to finish and assert changes.
        node = workflow_context.model.node.refresh(node)
        assert all(task.status == task.SUCCESS for task in tasks)
        assert node.attributes['invocations'].value == 3
        assert wf_runner.execution.status == wf_runner.execution.SUCCEEDED

    def test_resume_started_task(self, workflow_context, thread_executor):
        node = workflow_context.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
        node.attributes['invocations'] = models.Attribute.wrap('invocations', 0)
        self._create_interface(workflow_context, node, mock_stuck_task)

        wf_runner = self._create_initial_workflow_runner(
            workflow_context, mock_parallel_tasks_workflow, thread_executor,
            inputs={'number_of_tasks': 1})

        wf_thread = Thread(target=wf_runner.execute)
        wf_thread.daemon = True
        wf_thread.start()

        self._wait_for_active_and_cancel(wf_runner)
        node = workflow_context.model.node.refresh(node)
        task = workflow_context.model.task.list(filters={'_stub_type': None})[0]
        assert node.attributes['invocations'].value == 1
        assert task.status == task.STARTED
        assert wf_runner.execution.status in (wf_runner.execution.CANCELLED,
                                              wf_runner.execution.CANCELLING)
        custom_events['is_resumed'].set()

        new_thread_executor = thread.ThreadExecutor()
        try:
            new_wf_runner = WorkflowRunner(
                service_id=wf_runner.service.id,
                inputs={},
                model_storage=workflow_context.model,
                resource_storage=workflow_context.resource,
                plugin_manager=None,
                execution_id=wf_runner.execution.id,
                executor=new_thread_executor)

            new_wf_runner.execute()
        finally:
            new_thread_executor.close()

        # Wait for it to finish and assert changes.
        node = workflow_context.model.node.refresh(node)
        assert node.attributes['invocations'].value == 2
        assert task.status == task.SUCCESS
        assert wf_runner.execution.status == wf_runner.execution.SUCCEEDED

    def test_resume_failed_task(self, workflow_context, thread_executor):
        node = workflow_context.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
        node.attributes['invocations'] = models.Attribute.wrap('invocations', 0)
        self._create_interface(workflow_context, node, mock_failed_before_resuming)

        wf_runner = self._create_initial_workflow_runner(workflow_context,
                                                         mock_parallel_tasks_workflow,
                                                         thread_executor)
        wf_thread = Thread(target=wf_runner.execute)
        wf_thread.setDaemon(True)
        wf_thread.start()

        self._wait_for_active_and_cancel(wf_runner)
        node = workflow_context.model.node.refresh(node)

        task = workflow_context.model.task.list(filters={'_stub_type': None})[0]
        assert node.attributes['invocations'].value == 2
        assert task.status == task.STARTED
        assert wf_runner.execution.status in (wf_runner.execution.CANCELLED,
                                              wf_runner.execution.CANCELLING)

        custom_events['is_resumed'].set()
        assert node.attributes['invocations'].value == 2

        # Create a new workflow runner with the existing execution id. This causes
        # the old execution to be resumed.
        new_thread_executor = thread.ThreadExecutor()
        try:
            new_wf_runner = WorkflowRunner(
                service_id=wf_runner.service.id,
                inputs={},
                model_storage=workflow_context.model,
                resource_storage=workflow_context.resource,
                plugin_manager=None,
                execution_id=wf_runner.execution.id,
                executor=new_thread_executor)

            new_wf_runner.execute()
        finally:
            new_thread_executor.close()

        # Wait for it to finish and assert changes.
        node = workflow_context.model.node.refresh(node)
        assert node.attributes['invocations'].value == task.max_attempts - 1
        assert task.status == task.SUCCESS
        assert wf_runner.execution.status == wf_runner.execution.SUCCEEDED

    def test_resume_failed_task_and_successful_task(self, workflow_context, thread_executor):
        node = workflow_context.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
        node.attributes['invocations'] = models.Attribute.wrap('invocations', 0)
        self._create_interface(workflow_context, node, mock_pass_first_task_only)

        wf_runner = self._create_initial_workflow_runner(
            workflow_context,
            mock_parallel_tasks_workflow,
            thread_executor,
            inputs={'retry_interval': 1, 'max_attempts': 2, 'number_of_tasks': 2}
        )
        wf_thread = Thread(target=wf_runner.execute)
        wf_thread.setDaemon(True)
        wf_thread.start()

        if custom_events['execution_failed'].wait(60) is False:
            raise TimeoutError("Execution did not end")

        tasks = workflow_context.model.task.list(filters={'_stub_type': None})
        node = workflow_context.model.node.refresh(node)
        assert node.attributes['invocations'].value == 3
        failed_task = [t for t in tasks if t.status == t.FAILED][0]

        # the second task fails after exhausting its two attempts
        assert any(task.status == task.FAILED for task in tasks)
        assert failed_task.attempts_count == 2
        # the first task passes
        assert any(task.status == task.SUCCESS for task in tasks)
        assert wf_runner.execution.status == wf_runner.execution.FAILED

        custom_events['is_resumed'].set()
        new_thread_executor = thread.ThreadExecutor()
        try:
            new_wf_runner = WorkflowRunner(
                service_id=wf_runner.service.id,
                retry_failed_tasks=True,
                inputs={},
                model_storage=workflow_context.model,
                resource_storage=workflow_context.resource,
                plugin_manager=None,
                execution_id=wf_runner.execution.id,
                executor=new_thread_executor)

            new_wf_runner.execute()
        finally:
            new_thread_executor.close()

        # Wait for it to finish and assert changes.
        node = workflow_context.model.node.refresh(node)
        assert failed_task.attempts_count == 1
        assert node.attributes['invocations'].value == 4
        assert all(task.status == task.SUCCESS for task in tasks)
        assert wf_runner.execution.status == wf_runner.execution.SUCCEEDED

    def test_two_sequential_task_first_task_failed(self, workflow_context, thread_executor):
        node = workflow_context.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
        node.attributes['invocations'] = models.Attribute.wrap('invocations', 0)
        self._create_interface(workflow_context, node, mock_fail_first_task_only)

        wf_runner = self._create_initial_workflow_runner(
            workflow_context,
            mock_sequential_tasks_workflow,
            thread_executor,
            inputs={'retry_interval': 1, 'max_attempts': 1, 'number_of_tasks': 2}
        )
        wf_thread = Thread(target=wf_runner.execute)
        wf_thread.setDaemon(True)
        wf_thread.start()

        if custom_events['execution_failed'].wait(60) is False:
            raise TimeoutError("Execution did not end")

        tasks = workflow_context.model.task.list(filters={'_stub_type': None})
        node = workflow_context.model.node.refresh(node)
        assert node.attributes['invocations'].value == 1
        assert any(t.status == t.FAILED for t in tasks)
        assert any(t.status == t.PENDING for t in tasks)

        custom_events['is_resumed'].set()
        new_thread_executor = thread.ThreadExecutor()
        try:
            new_wf_runner = WorkflowRunner(
                service_id=wf_runner.service.id,
                inputs={},
                model_storage=workflow_context.model,
                resource_storage=workflow_context.resource,
                plugin_manager=None,
                execution_id=wf_runner.execution.id,
                executor=new_thread_executor)

            new_wf_runner.execute()
        finally:
            new_thread_executor.close()

        # Wait for it to finish and assert changes.
        node = workflow_context.model.node.refresh(node)
        assert node.attributes['invocations'].value == 2
        assert any(t.status == t.SUCCESS for t in tasks)
        assert any(t.status == t.FAILED for t in tasks)
        assert wf_runner.execution.status == wf_runner.execution.SUCCEEDED

    @staticmethod
    @pytest.fixture
    def thread_executor():
        result = thread.ThreadExecutor()
        try:
            yield result
        finally:
            result.close()

    @staticmethod
    @pytest.fixture
    def workflow_context(tmpdir):
        workflow_context = tests_mock.context.simple(str(tmpdir))
        yield workflow_context
        storage.release_sqlite_storage(workflow_context.model)

    @staticmethod
    def _create_interface(ctx, node, func, arguments=None):
        interface_name = 'aria.interfaces.lifecycle'
        operation_kwargs = dict(function='{name}.{func.__name__}'.format(
            name=__name__, func=func))
        if arguments:
            # the operation has to declare the arguments before those may be passed
            operation_kwargs['arguments'] = arguments
        operation_name = 'create'
        interface = tests_mock.models.create_interface(node.service, interface_name, operation_name,
                                                       operation_kwargs=operation_kwargs)
        node.interfaces[interface.name] = interface
        ctx.model.node.update(node)

        return node, interface_name, operation_name

    @staticmethod
    def _engine(workflow_func, workflow_context, executor):
        graph = workflow_func(ctx=workflow_context)
        execution = workflow_context.execution
        graph_compiler.GraphCompiler(workflow_context, executor.__class__).compile(graph)
        workflow_context.execution = execution

        return engine.Engine(executors={executor.__class__: executor})

    @pytest.fixture(autouse=True)
    def register_to_events(self):
        def execution_cancelled(*args, **kwargs):
            custom_events['execution_cancelled'].set()

        def execution_failed(*args, **kwargs):
            custom_events['execution_failed'].set()

        events.on_cancelled_workflow_signal.connect(execution_cancelled)
        events.on_failure_workflow_signal.connect(execution_failed)
        yield
        events.on_cancelled_workflow_signal.disconnect(execution_cancelled)
        events.on_failure_workflow_signal.disconnect(execution_failed)
        for event in custom_events.values():
            event.clear()


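# Mock workflows used by TestResumableWorkflows; each schedules one or more
# 'create' operation tasks on the dependency node.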
@workflow
def mock_sequential_tasks_workflow(ctx, graph,
                                   retry_interval=1, max_attempts=10, number_of_tasks=1):
    node = ctx.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
    graph.sequence(*_create_tasks(node, retry_interval, max_attempts, number_of_tasks))


@workflow
def mock_parallel_tasks_workflow(ctx, graph,
                                 retry_interval=1, max_attempts=10, number_of_tasks=1):
    node = ctx.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
    graph.add_tasks(*_create_tasks(node, retry_interval, max_attempts, number_of_tasks))


def _create_tasks(node, retry_interval, max_attempts, number_of_tasks):
    return [
        api.task.OperationTask(node,
                               'aria.interfaces.lifecycle',
                               'create',
                               retry_interval=retry_interval,
                               max_attempts=max_attempts)
        for _ in xrange(number_of_tasks)
    ]


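# Mock operations used by TestResumableWorkflows; they pass, fail or hang depending
# on the synchronization events defined at the top of this module.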
@operation
def mock_failed_before_resuming(ctx):
    """
    The task should run at most ctx.task.max_attempts - 1 times, and only then pass.
    Overall, the number of invocations should be ctx.task.max_attempts - 1.
    """
    ctx.node.attributes['invocations'] += 1

    if ctx.node.attributes['invocations'] == 2:
        custom_events['is_active'].set()
        # unfreeze the thread only when all of the invocations are done
        while ctx.node.attributes['invocations'] < ctx.task.max_attempts - 1:
            time.sleep(5)

    elif ctx.node.attributes['invocations'] == ctx.task.max_attempts - 1:
        # pass only just before the end
        return
    else:
        # fail otherwise
        raise FailingTask("stop this task")


@operation
def mock_stuck_task(ctx):
    ctx.node.attributes['invocations'] += 1
    while not custom_events['is_resumed'].isSet():
        if not custom_events['is_active'].isSet():
            custom_events['is_active'].set()
        time.sleep(5)


@operation
def mock_pass_first_task_only(ctx):
    ctx.node.attributes['invocations'] += 1

    if ctx.node.attributes['invocations'] != 1:
        custom_events['is_active'].set()
        if not custom_events['is_resumed'].isSet():
            # only the first task should pass; every subsequent task fails until the
            # workflow is resumed
            raise FailingTask("wasn't resumed yet")


@operation
def mock_fail_first_task_only(ctx):
    ctx.node.attributes['invocations'] += 1

    if not custom_events['is_resumed'].isSet() and ctx.node.attributes['invocations'] == 1:
        raise FailingTask("First task should fail")