# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import time
import threading
from datetime import datetime

import pytest

from aria.orchestrator import (
    events,
    workflow,
    operation,
)
from aria.modeling import models
from aria.orchestrator.workflows import (
    api,
    exceptions,
)
from aria.orchestrator.workflows.core import engine, graph_compiler
from aria.orchestrator.workflows.executor import thread

from tests import mock, storage
# Shared mutable state written by the mock tasks and signal handlers below;
# the autouse ``globals_cleanup`` fixture clears it between tests.
global_test_holder = {}
class BaseTest(object):
    """Shared scaffolding for engine tests.

    Provides helpers to build and synchronously run workflows, plus autouse
    fixtures that record ARIA workflow/task signals into ``global_test_holder``
    and onto the per-test ``workflow_context``.
    """

    @classmethod
    def _execute(cls, workflow_func, workflow_context, executor):
        # Compile the workflow graph and run it to completion on the caller's
        # thread; assertions are made afterwards on context/holder state.
        eng = cls._engine(workflow_func=workflow_func,
                          workflow_context=workflow_context,
                          executor=executor)
        eng.execute(ctx=workflow_context)

    @staticmethod
    def _engine(workflow_func, workflow_context, executor):
        # Build the API-level task graph, compile it into the execution model,
        # and return an engine that maps the executor's type to the instance.
        graph = workflow_func(ctx=workflow_context)
        graph_compiler.GraphCompiler(workflow_context, executor.__class__).compile(graph)

        return engine.Engine(executors={executor.__class__: executor})

    @staticmethod
    def _create_interface(ctx, func, arguments=None):
        """Register ``func`` as the ``create`` operation of the standard
        lifecycle interface on the mock service's dependency node."""
        node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
        interface_name = 'aria.interfaces.lifecycle'
        operation_kwargs = dict(function='{name}.{func.__name__}'.format(
            name=__name__, func=func))
        if arguments:
            # the operation has to declare the arguments before those may be passed
            operation_kwargs['arguments'] = arguments
        operation_name = 'create'
        interface = mock.models.create_interface(node.service, interface_name, operation_name,
                                                 operation_kwargs=operation_kwargs)
        node.interfaces[interface.name] = interface
        ctx.model.node.update(node)

        return node, interface_name, operation_name

    def _op(self, node,
            operation_name,
            arguments=None,
            max_attempts=None,
            retry_interval=None,
            ignore_failure=None):
        # Build an API-level operation task against the lifecycle interface
        # registered by ``_create_interface``.
        return api.task.OperationTask(
            node,
            interface_name='aria.interfaces.lifecycle',
            operation_name=operation_name,
            arguments=arguments,
            max_attempts=max_attempts,
            retry_interval=retry_interval,
            ignore_failure=ignore_failure,
        )

    @pytest.fixture(autouse=True)
    def globals_cleanup(self):
        # Ensure state recorded by one test never leaks into the next.
        try:
            yield
        finally:
            global_test_holder.clear()

    @pytest.fixture(autouse=True)
    def signals_registration(self):
        def sent_task_handler(ctx, *args, **kwargs):
            # Stub tasks are graph bookkeeping only; count real tasks.
            if ctx.task._stub_type is None:
                calls = global_test_holder.setdefault('sent_task_signal_calls', 0)
                global_test_holder['sent_task_signal_calls'] = calls + 1

        def start_workflow_handler(workflow_context, *args, **kwargs):
            workflow_context.states.append('start')

        def success_workflow_handler(workflow_context, *args, **kwargs):
            workflow_context.states.append('success')

        def failure_workflow_handler(workflow_context, exception, *args, **kwargs):
            workflow_context.states.append('failure')
            workflow_context.exception = exception

        def cancel_workflow_handler(workflow_context, *args, **kwargs):
            workflow_context.states.append('cancel')

        events.start_workflow_signal.connect(start_workflow_handler)
        events.on_success_workflow_signal.connect(success_workflow_handler)
        events.on_failure_workflow_signal.connect(failure_workflow_handler)
        events.on_cancelled_workflow_signal.connect(cancel_workflow_handler)
        events.sent_task_signal.connect(sent_task_handler)
        try:
            yield
        finally:
            # Disconnect so handlers don't accumulate across tests.
            events.start_workflow_signal.disconnect(start_workflow_handler)
            events.on_success_workflow_signal.disconnect(success_workflow_handler)
            events.on_failure_workflow_signal.disconnect(failure_workflow_handler)
            events.on_cancelled_workflow_signal.disconnect(cancel_workflow_handler)
            events.sent_task_signal.disconnect(sent_task_handler)

    @pytest.fixture
    def executor(self):
        # A fresh thread-based executor per test, closed on teardown.
        result = thread.ThreadExecutor()
        try:
            yield result
        finally:
            result.close()

    @pytest.fixture
    def workflow_context(self, tmpdir):
        # Simple mock context backed by an on-disk SQLite store; ``states``
        # and ``exception`` are filled in by the signal handlers above.
        workflow_context = mock.context.simple(str(tmpdir))
        workflow_context.states = []
        workflow_context.exception = None
        yield workflow_context
        storage.release_sqlite_storage(workflow_context.model)
class TestEngine(BaseTest):
    """End-to-end engine runs over empty, single-task, multi-task and
    sub-workflow graphs, asserting emitted signals and final execution state."""

    def test_empty_graph_execution(self, workflow_context, executor):
        @workflow
        def mock_workflow(**_):
            pass
        self._execute(workflow_func=mock_workflow,
                      workflow_context=workflow_context,
                      executor=executor)
        assert workflow_context.states == ['start', 'success']
        assert workflow_context.exception is None
        # No operation tasks in the graph, so no sent-task signals at all.
        assert 'sent_task_signal_calls' not in global_test_holder
        execution = workflow_context.execution
        assert execution.started_at <= execution.ended_at <= datetime.utcnow()
        assert execution.error is None
        assert execution.status == models.Execution.SUCCEEDED

    def test_single_task_successful_execution(self, workflow_context, executor):
        node, _, operation_name = self._create_interface(workflow_context, mock_success_task)

        @workflow
        def mock_workflow(ctx, graph):
            graph.add_tasks(self._op(node, operation_name))
        self._execute(
            workflow_func=mock_workflow,
            workflow_context=workflow_context,
            executor=executor)
        assert workflow_context.states == ['start', 'success']
        assert workflow_context.exception is None
        assert global_test_holder.get('sent_task_signal_calls') == 1

    def test_single_task_failed_execution(self, workflow_context, executor):
        node, _, operation_name = self._create_interface(workflow_context, mock_failed_task)

        @workflow
        def mock_workflow(ctx, graph):
            graph.add_tasks(self._op(node, operation_name))
        with pytest.raises(exceptions.ExecutorException):
            self._execute(
                workflow_func=mock_workflow,
                workflow_context=workflow_context,
                executor=executor)
        assert workflow_context.states == ['start', 'failure']
        assert isinstance(workflow_context.exception, exceptions.ExecutorException)
        assert global_test_holder.get('sent_task_signal_calls') == 1
        execution = workflow_context.execution
        assert execution.started_at <= execution.ended_at <= datetime.utcnow()
        assert execution.error is not None
        assert execution.status == models.Execution.FAILED

    def test_two_tasks_execution_order(self, workflow_context, executor):
        node, _, operation_name = self._create_interface(
            workflow_context, mock_ordered_task, {'counter': 1})

        @workflow
        def mock_workflow(ctx, graph):
            op1 = self._op(node, operation_name, arguments={'counter': 1})
            op2 = self._op(node, operation_name, arguments={'counter': 2})
            graph.sequence(op1, op2)
        self._execute(
            workflow_func=mock_workflow,
            workflow_context=workflow_context,
            executor=executor)
        assert workflow_context.states == ['start', 'success']
        assert workflow_context.exception is None
        # The sequence edge must force op1 before op2.
        assert global_test_holder.get('invocations') == [1, 2]
        assert global_test_holder.get('sent_task_signal_calls') == 2

    def test_stub_and_subworkflow_execution(self, workflow_context, executor):
        node, _, operation_name = self._create_interface(
            workflow_context, mock_ordered_task, {'counter': 1})

        @workflow
        def sub_workflow(ctx, graph):
            op1 = self._op(node, operation_name, arguments={'counter': 1})
            op2 = api.task.StubTask()
            op3 = self._op(node, operation_name, arguments={'counter': 2})
            graph.sequence(op1, op2, op3)

        @workflow
        def mock_workflow(ctx, graph):
            graph.add_tasks(api.task.WorkflowTask(sub_workflow, ctx=ctx))
        self._execute(workflow_func=mock_workflow,
                      workflow_context=workflow_context,
                      executor=executor)
        assert workflow_context.states == ['start', 'success']
        assert workflow_context.exception is None
        # The stub contributes no invocation and no sent-task signal.
        assert global_test_holder.get('invocations') == [1, 2]
        assert global_test_holder.get('sent_task_signal_calls') == 2
class TestCancel(BaseTest):
    """Cancellation of started and still-pending executions."""

    def test_cancel_started_execution(self, workflow_context, executor):
        number_of_tasks = 100
        node, _, operation_name = self._create_interface(
            workflow_context, mock_sleep_task, {'seconds': 0.1})

        @workflow
        def mock_workflow(ctx, graph):
            operations = (
                self._op(node, operation_name, arguments=dict(seconds=0.1))
                for _ in range(number_of_tasks)
            )
            return graph.sequence(*operations)

        eng = self._engine(workflow_func=mock_workflow,
                           workflow_context=workflow_context,
                           executor=executor)
        # Run the engine in the background so we can cancel it mid-flight.
        t = threading.Thread(target=eng.execute, kwargs=dict(ctx=workflow_context))
        t.daemon = True
        t.start()
        time.sleep(10)
        eng.cancel_execution(workflow_context)
        t.join(timeout=60)  # we need to give this a *lot* of time because Travis can be *very* slow
        assert not t.is_alive()  # if join is timed out it will not raise an exception
        assert workflow_context.states == ['start', 'cancel']
        assert workflow_context.exception is None
        invocations = global_test_holder.get('invocations', [])
        # Some tasks ran, but the cancel prevented the rest.
        assert 0 < len(invocations) < number_of_tasks
        execution = workflow_context.execution
        assert execution.started_at <= execution.ended_at <= datetime.utcnow()
        assert execution.error is None
        assert execution.status == models.Execution.CANCELLED

    def test_cancel_pending_execution(self, workflow_context, executor):
        @workflow
        def mock_workflow(graph, **_):
            return graph
        eng = self._engine(workflow_func=mock_workflow,
                           workflow_context=workflow_context,
                           executor=executor)
        # Cancelling before execute() starts must still mark the execution.
        eng.cancel_execution(workflow_context)
        execution = workflow_context.execution
        assert execution.status == models.Execution.CANCELLED
class TestRetries(BaseTest):
    """Retry semantics — ``max_attempts``, retry intervals and
    ``ignore_failure`` — verified through the invocation records the mock
    tasks write into ``global_test_holder``."""

    def test_two_max_attempts_and_success_on_retry(self, workflow_context, executor):
        node, _, operation_name = self._create_interface(
            workflow_context, mock_conditional_failure_task, {'failure_count': 1})

        @workflow
        def mock_workflow(ctx, graph):
            op = self._op(node, operation_name,
                          arguments={'failure_count': 1},
                          max_attempts=2)
            graph.add_tasks(op)
        self._execute(
            workflow_func=mock_workflow,
            workflow_context=workflow_context,
            executor=executor)
        assert workflow_context.states == ['start', 'success']
        assert workflow_context.exception is None
        # One failure + one successful retry.
        assert len(global_test_holder.get('invocations', [])) == 2
        assert global_test_holder.get('sent_task_signal_calls') == 2

    def test_two_max_attempts_and_failure_on_retry(self, workflow_context, executor):
        node, _, operation_name = self._create_interface(
            workflow_context, mock_conditional_failure_task, {'failure_count': 1})

        @workflow
        def mock_workflow(ctx, graph):
            op = self._op(node, operation_name,
                          arguments={'failure_count': 2},
                          max_attempts=2)
            graph.add_tasks(op)
        with pytest.raises(exceptions.ExecutorException):
            self._execute(
                workflow_func=mock_workflow,
                workflow_context=workflow_context,
                executor=executor)
        assert workflow_context.states == ['start', 'failure']
        assert isinstance(workflow_context.exception, exceptions.ExecutorException)
        # Both allowed attempts failed.
        assert len(global_test_holder.get('invocations', [])) == 2
        assert global_test_holder.get('sent_task_signal_calls') == 2

    def test_three_max_attempts_and_success_on_first_retry(self, workflow_context, executor):
        node, _, operation_name = self._create_interface(
            workflow_context, mock_conditional_failure_task, {'failure_count': 1})

        @workflow
        def mock_workflow(ctx, graph):
            op = self._op(node, operation_name,
                          arguments={'failure_count': 1},
                          max_attempts=3)
            graph.add_tasks(op)
        self._execute(
            workflow_func=mock_workflow,
            workflow_context=workflow_context,
            executor=executor)
        assert workflow_context.states == ['start', 'success']
        assert workflow_context.exception is None
        # Succeeded on the first retry, so the third attempt never ran.
        assert len(global_test_holder.get('invocations', [])) == 2
        assert global_test_holder.get('sent_task_signal_calls') == 2

    def test_three_max_attempts_and_success_on_second_retry(self, workflow_context, executor):
        node, _, operation_name = self._create_interface(
            workflow_context, mock_conditional_failure_task, {'failure_count': 1})

        @workflow
        def mock_workflow(ctx, graph):
            op = self._op(node, operation_name,
                          arguments={'failure_count': 2},
                          max_attempts=3)
            graph.add_tasks(op)
        self._execute(
            workflow_func=mock_workflow,
            workflow_context=workflow_context,
            executor=executor)
        assert workflow_context.states == ['start', 'success']
        assert workflow_context.exception is None
        assert len(global_test_holder.get('invocations', [])) == 3
        assert global_test_holder.get('sent_task_signal_calls') == 3

    def test_infinite_retries(self, workflow_context, executor):
        node, _, operation_name = self._create_interface(
            workflow_context, mock_conditional_failure_task, {'failure_count': 1})

        @workflow
        def mock_workflow(ctx, graph):
            op = self._op(node, operation_name,
                          arguments={'failure_count': 1},
                          # NOTE(review): -1 is taken to mean "retry forever";
                          # confirm against models.Task's infinite-retries value.
                          max_attempts=-1)
            graph.add_tasks(op)
        self._execute(
            workflow_func=mock_workflow,
            workflow_context=workflow_context,
            executor=executor)
        assert workflow_context.states == ['start', 'success']
        assert workflow_context.exception is None
        assert len(global_test_holder.get('invocations', [])) == 2
        assert global_test_holder.get('sent_task_signal_calls') == 2

    def test_retry_interval_float(self, workflow_context, executor):
        self._test_retry_interval(retry_interval=0.3,
                                  workflow_context=workflow_context,
                                  executor=executor)

    def test_retry_interval_int(self, workflow_context, executor):
        self._test_retry_interval(retry_interval=1,
                                  workflow_context=workflow_context,
                                  executor=executor)

    def _test_retry_interval(self, retry_interval, workflow_context, executor):
        # Shared body for the int/float interval tests: one failure, one retry,
        # and the gap between the two invocations must honour the interval.
        node, _, operation_name = self._create_interface(
            workflow_context, mock_conditional_failure_task, {'failure_count': 1})

        @workflow
        def mock_workflow(ctx, graph):
            op = self._op(node, operation_name,
                          arguments={'failure_count': 1},
                          max_attempts=2,
                          retry_interval=retry_interval)
            graph.add_tasks(op)
        self._execute(
            workflow_func=mock_workflow,
            workflow_context=workflow_context,
            executor=executor)
        assert workflow_context.states == ['start', 'success']
        assert workflow_context.exception is None
        invocations = global_test_holder.get('invocations', [])
        assert len(invocations) == 2
        invocation1, invocation2 = invocations
        assert invocation2 - invocation1 >= retry_interval
        assert global_test_holder.get('sent_task_signal_calls') == 2

    def test_ignore_failure(self, workflow_context, executor):
        node, _, operation_name = self._create_interface(
            workflow_context, mock_conditional_failure_task, {'failure_count': 1})

        @workflow
        def mock_workflow(ctx, graph):
            op = self._op(node, operation_name,
                          ignore_failure=True,
                          arguments={'failure_count': 100},
                          max_attempts=100)
            graph.add_tasks(op)
        self._execute(
            workflow_func=mock_workflow,
            workflow_context=workflow_context,
            executor=executor)
        # The failure is ignored: workflow succeeds after a single attempt.
        assert workflow_context.states == ['start', 'success']
        assert workflow_context.exception is None
        invocations = global_test_holder.get('invocations', [])
        assert len(invocations) == 1
        assert global_test_holder.get('sent_task_signal_calls') == 1
class TestTaskRetryAndAbort(BaseTest):
    """``ctx.task.retry()`` and ``ctx.task.abort()`` raised from inside an
    operation, including retry-interval overrides supplied by the task."""

    # Marker message passed to the mock tasks; also surfaces in failures.
    message = 'EXPECTED_ERROR'

    def test_task_retry_default_interval(self, workflow_context, executor):
        default_retry_interval = 0.1
        node, _, operation_name = self._create_interface(
            workflow_context, mock_task_retry, {'message': self.message})

        @workflow
        def mock_workflow(ctx, graph):
            op = self._op(node, operation_name,
                          arguments={'message': self.message},
                          retry_interval=default_retry_interval,
                          max_attempts=2)
            graph.add_tasks(op)
        with pytest.raises(exceptions.ExecutorException):
            self._execute(
                workflow_func=mock_workflow,
                workflow_context=workflow_context,
                executor=executor)
        assert workflow_context.states == ['start', 'failure']
        assert isinstance(workflow_context.exception, exceptions.ExecutorException)
        invocations = global_test_holder.get('invocations', [])
        assert len(invocations) == 2
        invocation1, invocation2 = invocations
        # The task's default retry interval must elapse between attempts.
        assert invocation2 - invocation1 >= default_retry_interval
        assert global_test_holder.get('sent_task_signal_calls') == 2

    def test_task_retry_custom_interval(self, workflow_context, executor):
        default_retry_interval = 100
        custom_retry_interval = 0.1
        node, _, operation_name = self._create_interface(
            workflow_context, mock_task_retry, {'message': self.message,
                                                'retry_interval': custom_retry_interval})

        @workflow
        def mock_workflow(ctx, graph):
            op = self._op(node, operation_name,
                          arguments={'message': self.message,
                                     'retry_interval': custom_retry_interval},
                          retry_interval=default_retry_interval,
                          max_attempts=2)
            graph.add_tasks(op)
        execution_start = time.time()
        with pytest.raises(exceptions.ExecutorException):
            self._execute(
                workflow_func=mock_workflow,
                workflow_context=workflow_context,
                executor=executor)
        execution_end = time.time()
        assert workflow_context.states == ['start', 'failure']
        assert isinstance(workflow_context.exception, exceptions.ExecutorException)
        invocations = global_test_holder.get('invocations', [])
        assert len(invocations) == 2
        # The short custom interval passed via retry() must override the
        # long default configured on the task.
        assert (execution_end - execution_start) < default_retry_interval
        assert global_test_holder.get('sent_task_signal_calls') == 2

    def test_task_abort(self, workflow_context, executor):
        node, _, operation_name = self._create_interface(
            workflow_context, mock_task_abort, {'message': self.message})

        @workflow
        def mock_workflow(ctx, graph):
            op = self._op(node, operation_name,
                          arguments={'message': self.message},
                          retry_interval=100,
                          max_attempts=100)
            graph.add_tasks(op)
        with pytest.raises(exceptions.ExecutorException):
            self._execute(
                workflow_func=mock_workflow,
                workflow_context=workflow_context,
                executor=executor)
        assert workflow_context.states == ['start', 'failure']
        assert isinstance(workflow_context.exception, exceptions.ExecutorException)
        invocations = global_test_holder.get('invocations', [])
        # abort() must short-circuit despite the generous retry settings.
        assert len(invocations) == 1
        assert global_test_holder.get('sent_task_signal_calls') == 1
@operation
def mock_success_task(**_):
    """Operation that succeeds immediately without touching any state."""
    pass
@operation
def mock_failed_task(**_):
    """Operation that always fails, driving the failure/retry code paths."""
    raise RuntimeError
@operation
def mock_ordered_task(counter, **_):
    """Record ``counter`` so tests can assert the order in which tasks ran."""
    invocations = global_test_holder.setdefault('invocations', [])
    invocations.append(counter)
@operation
def mock_conditional_failure_task(failure_count, **_):
    """Fail for the first ``failure_count`` invocations, then succeed.

    A timestamp is appended per invocation — even on the raising path, via
    ``finally`` — so tests can both count attempts and measure retry gaps.
    """
    invocations = global_test_holder.setdefault('invocations', [])
    try:
        if len(invocations) < failure_count:
            raise RuntimeError
    finally:
        invocations.append(time.time())
@operation
def mock_sleep_task(seconds, **_):
    """Record an invocation timestamp, then sleep — used by the cancel tests
    to keep the engine busy long enough to be cancelled."""
    _add_invocation_timestamp()
    time.sleep(seconds)
@operation
def mock_task_retry(ctx, message, retry_interval=None, **_):
    """Ask the framework to retry this task, optionally overriding the
    task-level retry interval via ``retry_interval``."""
    _add_invocation_timestamp()
    retry_kwargs = {}
    if retry_interval is not None:
        retry_kwargs['retry_interval'] = retry_interval
    ctx.task.retry(message, **retry_kwargs)
@operation
def mock_task_abort(ctx, message, **_):
    """Abort the task outright — the framework must not retry it."""
    _add_invocation_timestamp()
    ctx.task.abort(message)
def _add_invocation_timestamp():
    # Append the current wall-clock time so tests can count attempts and
    # measure the interval between retries.
    invocations = global_test_holder.setdefault('invocations', [])
    invocations.append(time.time())