Merge "vFW and vDNS support added to azure-plugin"
multicloud/azure.git: azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/core/test_engine.py
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import time
import threading
from datetime import datetime

import pytest

from aria.orchestrator import (
    events,
    workflow,
    operation,
)
from aria.modeling import models
from aria.orchestrator.workflows import (
    api,
    exceptions,
)
from aria.orchestrator.workflows.core import engine, graph_compiler
from aria.orchestrator.workflows.executor import thread

from tests import mock, storage

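# Module-level holder that the mock operations and signal handlers below use
# to record invocations and signal calls; it is cleared after every test by
# the globals_cleanup fixture.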
global_test_holder = {}


class BaseTest(object):
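    """Shared helpers and fixtures for driving the workflow engine in tests."""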

    @classmethod
    def _execute(cls, workflow_func, workflow_context, executor):
        eng = cls._engine(workflow_func=workflow_func,
                          workflow_context=workflow_context,
                          executor=executor)
        eng.execute(ctx=workflow_context)
        return eng

    @staticmethod
    def _engine(workflow_func, workflow_context, executor):
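        # Build the API task graph from the workflow function, compile it via
        # the workflow context, and return an engine keyed by executor class.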
        graph = workflow_func(ctx=workflow_context)
        graph_compiler.GraphCompiler(workflow_context, executor.__class__).compile(graph)

        return engine.Engine(executors={executor.__class__: executor})

    @staticmethod
    def _create_interface(ctx, func, arguments=None):
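        # Register an 'aria.interfaces.lifecycle' interface with a single
        # 'create' operation on the dependency node, pointing at the given
        # module-level mock function (optionally declaring its arguments).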
        node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
        interface_name = 'aria.interfaces.lifecycle'
        operation_kwargs = dict(function='{name}.{func.__name__}'.format(
            name=__name__, func=func))
        if arguments:
            # the operation has to declare the arguments before they can be passed
            operation_kwargs['arguments'] = arguments
        operation_name = 'create'
        interface = mock.models.create_interface(node.service, interface_name, operation_name,
                                                 operation_kwargs=operation_kwargs)
        node.interfaces[interface.name] = interface
        ctx.model.node.update(node)

        return node, interface_name, operation_name

    @staticmethod
    def _op(node,
            operation_name,
            arguments=None,
            max_attempts=None,
            retry_interval=None,
            ignore_failure=None):

        return api.task.OperationTask(
            node,
            interface_name='aria.interfaces.lifecycle',
            operation_name=operation_name,
            arguments=arguments,
            max_attempts=max_attempts,
            retry_interval=retry_interval,
            ignore_failure=ignore_failure,
        )

    @pytest.fixture(autouse=True)
    def globals_cleanup(self):
        try:
            yield
        finally:
            global_test_holder.clear()

    @pytest.fixture(autouse=True)
    def signals_registration(self):
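        # Connect signal handlers that record workflow state transitions
        # ('start', 'success', 'failure', 'cancel') on the workflow context and
        # count how many non-stub tasks were sent to the executor.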
        def sent_task_handler(ctx, *args, **kwargs):
            if ctx.task._stub_type is None:
                calls = global_test_holder.setdefault('sent_task_signal_calls', 0)
                global_test_holder['sent_task_signal_calls'] = calls + 1

        def start_workflow_handler(workflow_context, *args, **kwargs):
            workflow_context.states.append('start')

        def success_workflow_handler(workflow_context, *args, **kwargs):
            workflow_context.states.append('success')

        def failure_workflow_handler(workflow_context, exception, *args, **kwargs):
            workflow_context.states.append('failure')
            workflow_context.exception = exception

        def cancel_workflow_handler(workflow_context, *args, **kwargs):
            workflow_context.states.append('cancel')

        events.start_workflow_signal.connect(start_workflow_handler)
        events.on_success_workflow_signal.connect(success_workflow_handler)
        events.on_failure_workflow_signal.connect(failure_workflow_handler)
        events.on_cancelled_workflow_signal.connect(cancel_workflow_handler)
        events.sent_task_signal.connect(sent_task_handler)
        try:
            yield
        finally:
            events.start_workflow_signal.disconnect(start_workflow_handler)
            events.on_success_workflow_signal.disconnect(success_workflow_handler)
            events.on_failure_workflow_signal.disconnect(failure_workflow_handler)
            events.on_cancelled_workflow_signal.disconnect(cancel_workflow_handler)
            events.sent_task_signal.disconnect(sent_task_handler)

    @pytest.fixture
    def executor(self):
        result = thread.ThreadExecutor()
        try:
            yield result
        finally:
            result.close()

    @pytest.fixture
    def workflow_context(self, tmpdir):
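        # A simple mock workflow context backed by temporary SQLite storage,
        # extended with 'states' and 'exception' attributes for the signal
        # handlers above.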
        workflow_context = mock.context.simple(str(tmpdir))
        workflow_context.states = []
        workflow_context.exception = None
        yield workflow_context
        storage.release_sqlite_storage(workflow_context.model)


class TestEngine(BaseTest):
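    """Basic engine behavior: empty graphs, single tasks, ordering and sub-workflows."""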

    def test_empty_graph_execution(self, workflow_context, executor):
        @workflow
        def mock_workflow(**_):
            pass
        self._execute(workflow_func=mock_workflow,
                      workflow_context=workflow_context,
                      executor=executor)
        assert workflow_context.states == ['start', 'success']
        assert workflow_context.exception is None
        assert 'sent_task_signal_calls' not in global_test_holder
        execution = workflow_context.execution
        assert execution.started_at <= execution.ended_at <= datetime.utcnow()
        assert execution.error is None
        assert execution.status == models.Execution.SUCCEEDED

    def test_single_task_successful_execution(self, workflow_context, executor):
        node, _, operation_name = self._create_interface(workflow_context, mock_success_task)

        @workflow
        def mock_workflow(ctx, graph):
            graph.add_tasks(self._op(node, operation_name))
        self._execute(
            workflow_func=mock_workflow,
            workflow_context=workflow_context,
            executor=executor)
        assert workflow_context.states == ['start', 'success']
        assert workflow_context.exception is None
        assert global_test_holder.get('sent_task_signal_calls') == 1

    def test_single_task_failed_execution(self, workflow_context, executor):
        node, _, operation_name = self._create_interface(workflow_context, mock_failed_task)

        @workflow
        def mock_workflow(ctx, graph):
            graph.add_tasks(self._op(node, operation_name))
        with pytest.raises(exceptions.ExecutorException):
            self._execute(
                workflow_func=mock_workflow,
                workflow_context=workflow_context,
                executor=executor)
        assert workflow_context.states == ['start', 'failure']
        assert isinstance(workflow_context.exception, exceptions.ExecutorException)
        assert global_test_holder.get('sent_task_signal_calls') == 1
        execution = workflow_context.execution
        assert execution.started_at <= execution.ended_at <= datetime.utcnow()
        assert execution.error is not None
        assert execution.status == models.Execution.FAILED

    def test_two_tasks_execution_order(self, workflow_context, executor):
        node, _, operation_name = self._create_interface(
            workflow_context, mock_ordered_task, {'counter': 1})

        @workflow
        def mock_workflow(ctx, graph):
            op1 = self._op(node, operation_name, arguments={'counter': 1})
            op2 = self._op(node, operation_name, arguments={'counter': 2})
            graph.sequence(op1, op2)
        self._execute(
            workflow_func=mock_workflow,
            workflow_context=workflow_context,
            executor=executor)
        assert workflow_context.states == ['start', 'success']
        assert workflow_context.exception is None
        assert global_test_holder.get('invocations') == [1, 2]
        assert global_test_holder.get('sent_task_signal_calls') == 2

    def test_stub_and_subworkflow_execution(self, workflow_context, executor):
        node, _, operation_name = self._create_interface(
            workflow_context, mock_ordered_task, {'counter': 1})

        @workflow
        def sub_workflow(ctx, graph):
            op1 = self._op(node, operation_name, arguments={'counter': 1})
            op2 = api.task.StubTask()
            op3 = self._op(node, operation_name, arguments={'counter': 2})
            graph.sequence(op1, op2, op3)

        @workflow
        def mock_workflow(ctx, graph):
            graph.add_tasks(api.task.WorkflowTask(sub_workflow, ctx=ctx))
        self._execute(workflow_func=mock_workflow,
                      workflow_context=workflow_context,
                      executor=executor)
        assert workflow_context.states == ['start', 'success']
        assert workflow_context.exception is None
        assert global_test_holder.get('invocations') == [1, 2]
        assert global_test_holder.get('sent_task_signal_calls') == 2


class TestCancel(BaseTest):
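    """Cancellation of started and pending executions."""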

    def test_cancel_started_execution(self, workflow_context, executor):
        number_of_tasks = 100
        node, _, operation_name = self._create_interface(
            workflow_context, mock_sleep_task, {'seconds': 0.1})

        @workflow
        def mock_workflow(ctx, graph):
            operations = (
                self._op(node, operation_name, arguments=dict(seconds=0.1))
                for _ in range(number_of_tasks)
            )
            return graph.sequence(*operations)

        eng = self._engine(workflow_func=mock_workflow,
                           workflow_context=workflow_context,
                           executor=executor)
        t = threading.Thread(target=eng.execute, kwargs=dict(ctx=workflow_context))
        t.daemon = True
        t.start()
        time.sleep(10)
        eng.cancel_execution(workflow_context)
        t.join(timeout=60)  # give this a *lot* of time, since Travis can be *very* slow
        assert not t.is_alive()  # join() does not raise on timeout, so verify the thread really finished
        assert workflow_context.states == ['start', 'cancel']
        assert workflow_context.exception is None
        invocations = global_test_holder.get('invocations', [])
        assert 0 < len(invocations) < number_of_tasks
        execution = workflow_context.execution
        assert execution.started_at <= execution.ended_at <= datetime.utcnow()
        assert execution.error is None
        assert execution.status == models.Execution.CANCELLED

    def test_cancel_pending_execution(self, workflow_context, executor):
        @workflow
        def mock_workflow(graph, **_):
            return graph
        eng = self._engine(workflow_func=mock_workflow,
                           workflow_context=workflow_context,
                           executor=executor)
        eng.cancel_execution(workflow_context)
        execution = workflow_context.execution
        assert execution.status == models.Execution.CANCELLED


class TestRetries(BaseTest):
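    """Retry behavior driven by max_attempts, retry_interval and ignore_failure."""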

    def test_two_max_attempts_and_success_on_retry(self, workflow_context, executor):
        node, _, operation_name = self._create_interface(
            workflow_context, mock_conditional_failure_task, {'failure_count': 1})

        @workflow
        def mock_workflow(ctx, graph):
            op = self._op(node, operation_name,
                          arguments={'failure_count': 1},
                          max_attempts=2)
            graph.add_tasks(op)
        self._execute(
            workflow_func=mock_workflow,
            workflow_context=workflow_context,
            executor=executor)
        assert workflow_context.states == ['start', 'success']
        assert workflow_context.exception is None
        assert len(global_test_holder.get('invocations', [])) == 2
        assert global_test_holder.get('sent_task_signal_calls') == 2

    def test_two_max_attempts_and_failure_on_retry(self, workflow_context, executor):
        node, _, operation_name = self._create_interface(
            workflow_context, mock_conditional_failure_task, {'failure_count': 1})

        @workflow
        def mock_workflow(ctx, graph):
            op = self._op(node, operation_name,
                          arguments={'failure_count': 2},
                          max_attempts=2)
            graph.add_tasks(op)
        with pytest.raises(exceptions.ExecutorException):
            self._execute(
                workflow_func=mock_workflow,
                workflow_context=workflow_context,
                executor=executor)
        assert workflow_context.states == ['start', 'failure']
        assert isinstance(workflow_context.exception, exceptions.ExecutorException)
        assert len(global_test_holder.get('invocations', [])) == 2
        assert global_test_holder.get('sent_task_signal_calls') == 2

    def test_three_max_attempts_and_success_on_first_retry(self, workflow_context, executor):
        node, _, operation_name = self._create_interface(
            workflow_context, mock_conditional_failure_task, {'failure_count': 1})

        @workflow
        def mock_workflow(ctx, graph):
            op = self._op(node, operation_name,
                          arguments={'failure_count': 1},
                          max_attempts=3)
            graph.add_tasks(op)
        self._execute(
            workflow_func=mock_workflow,
            workflow_context=workflow_context,
            executor=executor)
        assert workflow_context.states == ['start', 'success']
        assert workflow_context.exception is None
        assert len(global_test_holder.get('invocations', [])) == 2
        assert global_test_holder.get('sent_task_signal_calls') == 2

    def test_three_max_attempts_and_success_on_second_retry(self, workflow_context, executor):
        node, _, operation_name = self._create_interface(
            workflow_context, mock_conditional_failure_task, {'failure_count': 1})

        @workflow
        def mock_workflow(ctx, graph):
            op = self._op(node, operation_name,
                          arguments={'failure_count': 2},
                          max_attempts=3)
            graph.add_tasks(op)
        self._execute(
            workflow_func=mock_workflow,
            workflow_context=workflow_context,
            executor=executor)
        assert workflow_context.states == ['start', 'success']
        assert workflow_context.exception is None
        assert len(global_test_holder.get('invocations', [])) == 3
        assert global_test_holder.get('sent_task_signal_calls') == 3

    def test_infinite_retries(self, workflow_context, executor):
        node, _, operation_name = self._create_interface(
            workflow_context, mock_conditional_failure_task, {'failure_count': 1})

        @workflow
        def mock_workflow(ctx, graph):
            op = self._op(node, operation_name,
                          arguments={'failure_count': 1},
                          max_attempts=-1)
            graph.add_tasks(op)
        self._execute(
            workflow_func=mock_workflow,
            workflow_context=workflow_context,
            executor=executor)
        assert workflow_context.states == ['start', 'success']
        assert workflow_context.exception is None
        assert len(global_test_holder.get('invocations', [])) == 2
        assert global_test_holder.get('sent_task_signal_calls') == 2

    def test_retry_interval_float(self, workflow_context, executor):
        self._test_retry_interval(retry_interval=0.3,
                                  workflow_context=workflow_context,
                                  executor=executor)

    def test_retry_interval_int(self, workflow_context, executor):
        self._test_retry_interval(retry_interval=1,
                                  workflow_context=workflow_context,
                                  executor=executor)

    def _test_retry_interval(self, retry_interval, workflow_context, executor):
        node, _, operation_name = self._create_interface(
            workflow_context, mock_conditional_failure_task, {'failure_count': 1})

        @workflow
        def mock_workflow(ctx, graph):
            op = self._op(node, operation_name,
                          arguments={'failure_count': 1},
                          max_attempts=2,
                          retry_interval=retry_interval)
            graph.add_tasks(op)
        self._execute(
            workflow_func=mock_workflow,
            workflow_context=workflow_context,
            executor=executor)
        assert workflow_context.states == ['start', 'success']
        assert workflow_context.exception is None
        invocations = global_test_holder.get('invocations', [])
        assert len(invocations) == 2
        invocation1, invocation2 = invocations
        assert invocation2 - invocation1 >= retry_interval
        assert global_test_holder.get('sent_task_signal_calls') == 2

    def test_ignore_failure(self, workflow_context, executor):
        node, _, operation_name = self._create_interface(
            workflow_context, mock_conditional_failure_task, {'failure_count': 1})

        @workflow
        def mock_workflow(ctx, graph):
            op = self._op(node, operation_name,
                          ignore_failure=True,
                          arguments={'failure_count': 100},
                          max_attempts=100)
            graph.add_tasks(op)
        self._execute(
            workflow_func=mock_workflow,
            workflow_context=workflow_context,
            executor=executor)
        assert workflow_context.states == ['start', 'success']
        assert workflow_context.exception is None
        invocations = global_test_holder.get('invocations', [])
        assert len(invocations) == 1
        assert global_test_holder.get('sent_task_signal_calls') == 1


class TestTaskRetryAndAbort(BaseTest):
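    """Operations that explicitly call ctx.task.retry() or ctx.task.abort()."""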
    message = 'EXPECTED_ERROR'

    def test_task_retry_default_interval(self, workflow_context, executor):
        default_retry_interval = 0.1
        node, _, operation_name = self._create_interface(
            workflow_context, mock_task_retry, {'message': self.message})

        @workflow
        def mock_workflow(ctx, graph):
            op = self._op(node, operation_name,
                          arguments={'message': self.message},
                          retry_interval=default_retry_interval,
                          max_attempts=2)
            graph.add_tasks(op)
        with pytest.raises(exceptions.ExecutorException):
            self._execute(
                workflow_func=mock_workflow,
                workflow_context=workflow_context,
                executor=executor)
        assert workflow_context.states == ['start', 'failure']
        assert isinstance(workflow_context.exception, exceptions.ExecutorException)
        invocations = global_test_holder.get('invocations', [])
        assert len(invocations) == 2
        invocation1, invocation2 = invocations
        assert invocation2 - invocation1 >= default_retry_interval
        assert global_test_holder.get('sent_task_signal_calls') == 2

    def test_task_retry_custom_interval(self, workflow_context, executor):
        default_retry_interval = 100
        custom_retry_interval = 0.1
        node, _, operation_name = self._create_interface(
            workflow_context, mock_task_retry, {'message': self.message,
                                                'retry_interval': custom_retry_interval})

        @workflow
        def mock_workflow(ctx, graph):
            op = self._op(node, operation_name,
                          arguments={'message': self.message,
                                     'retry_interval': custom_retry_interval},
                          retry_interval=default_retry_interval,
                          max_attempts=2)
            graph.add_tasks(op)
        execution_start = time.time()
        with pytest.raises(exceptions.ExecutorException):
            self._execute(
                workflow_func=mock_workflow,
                workflow_context=workflow_context,
                executor=executor)
        execution_end = time.time()
        assert workflow_context.states == ['start', 'failure']
        assert isinstance(workflow_context.exception, exceptions.ExecutorException)
        invocations = global_test_holder.get('invocations', [])
        assert len(invocations) == 2
        assert (execution_end - execution_start) < default_retry_interval
        assert global_test_holder.get('sent_task_signal_calls') == 2

    def test_task_abort(self, workflow_context, executor):
        node, _, operation_name = self._create_interface(
            workflow_context, mock_task_abort, {'message': self.message})

        @workflow
        def mock_workflow(ctx, graph):
            op = self._op(node, operation_name,
                          arguments={'message': self.message},
                          retry_interval=100,
                          max_attempts=100)
            graph.add_tasks(op)
        with pytest.raises(exceptions.ExecutorException):
            self._execute(
                workflow_func=mock_workflow,
                workflow_context=workflow_context,
                executor=executor)
        assert workflow_context.states == ['start', 'failure']
        assert isinstance(workflow_context.exception, exceptions.ExecutorException)
        invocations = global_test_holder.get('invocations', [])
        assert len(invocations) == 1
        assert global_test_holder.get('sent_task_signal_calls') == 1

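# Mock @operation functions used by the tests above. They record their
# invocations in global_test_holder so the tests can assert on execution
# order, retry counts and timing.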
@operation
def mock_success_task(**_):
    pass


@operation
def mock_failed_task(**_):
    raise RuntimeError


@operation
def mock_ordered_task(counter, **_):
    invocations = global_test_holder.setdefault('invocations', [])
    invocations.append(counter)


@operation
def mock_conditional_failure_task(failure_count, **_):
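    # Fail until failure_count invocations have been recorded; a timestamp is
    # appended on every call, whether it fails or succeeds.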
    invocations = global_test_holder.setdefault('invocations', [])
    try:
        if len(invocations) < failure_count:
            raise RuntimeError
    finally:
        invocations.append(time.time())


@operation
def mock_sleep_task(seconds, **_):
    _add_invocation_timestamp()
    time.sleep(seconds)


@operation
def mock_task_retry(ctx, message, retry_interval=None, **_):
    _add_invocation_timestamp()
    retry_kwargs = {}
    if retry_interval is not None:
        retry_kwargs['retry_interval'] = retry_interval
    ctx.task.retry(message, **retry_kwargs)


@operation
def mock_task_abort(ctx, message, **_):
    _add_invocation_timestamp()
    ctx.task.abort(message)


def _add_invocation_timestamp():
    invocations = global_test_holder.setdefault('invocations', [])
    invocations.append(time.time())