azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/executor/test_executor.py
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

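"""
Tests for the thread-based and process-based task executors (the Celery-based
executor is referenced in this module but currently commented out).
"""
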
import pytest
import retrying

try:
    import celery as _celery
    app = _celery.Celery()
    app.conf.update(CELERY_RESULT_BACKEND='amqp://')
except ImportError:
    _celery = None
    app = None

import aria
from aria.modeling import models
from aria.orchestrator import events
from aria.orchestrator.workflows.executor import (
    thread,
    process,
    # celery
)

import tests
from . import MockContext

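# Returns the '<module name>.<function name>' dotted path under which the
# executors look up the task function to run.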
def _get_function(func):
    return '{module}.{func.__name__}'.format(module=__name__, func=func)

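# Runs a successful task, a failing task and a task that fails with its input
# value through the given executor, then polls for up to 10 seconds until the
# signal handlers (registered by the autouse fixture below) have recorded the
# expected state transitions and exceptions.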
def execute_and_assert(executor, storage=None):
    expected_value = 'value'
    successful_task = MockContext(
        storage, task_kwargs=dict(function=_get_function(mock_successful_task))
    )
    failing_task = MockContext(
        storage, task_kwargs=dict(function=_get_function(mock_failing_task))
    )
    task_with_inputs = MockContext(
        storage,
        task_kwargs=dict(function=_get_function(mock_task_with_input),
                         arguments={'input': models.Argument.wrap('input', 'value')})
    )

    for task in [successful_task, failing_task, task_with_inputs]:
        executor.execute(task)

    @retrying.retry(stop_max_delay=10000, wait_fixed=100)
    def assertion():
        assert successful_task.states == ['start', 'success']
        assert failing_task.states == ['start', 'failure']
        assert task_with_inputs.states == ['start', 'failure']
        assert isinstance(failing_task.exception, MockException)
        assert isinstance(task_with_inputs.exception, MockException)
        assert task_with_inputs.exception.message == expected_value
    assertion()

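# One test per executor type; both run the same scenario via execute_and_assert.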
def test_thread_execute(thread_executor):
    execute_and_assert(thread_executor)


def test_process_execute(process_executor, storage):
    execute_and_assert(process_executor, storage)

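# Task functions that the executors run. When Celery is importable they are also
# registered as Celery tasks, so the (currently disabled) CeleryExecutor could
# execute them as well.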
def mock_successful_task(**_):
    pass


def mock_failing_task(**_):
    raise MockException


def mock_task_with_input(input, **_):
    raise MockException(input)

if app:
    mock_successful_task = app.task(mock_successful_task)
    mock_failing_task = app.task(mock_failing_task)
    mock_task_with_input = app.task(mock_task_with_input)


class MockException(Exception):
    pass

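# Temporary SQLite-backed model storage; only the process executor test uses it.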
@pytest.fixture
def storage(tmpdir):
    _storage = aria.application_model_storage(aria.storage.sql_mapi.SQLAlchemyModelAPI,
                                              initiator_kwargs=dict(base_dir=str(tmpdir)))
    yield _storage
    tests.storage.release_sqlite_storage(_storage)

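# Parametrized to exercise both a single-threaded and a two-thread pool; the
# CeleryExecutor entry remains commented out.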
@pytest.fixture(params=[
    (thread.ThreadExecutor, {'pool_size': 1}),
    (thread.ThreadExecutor, {'pool_size': 2}),
    # (celery.CeleryExecutor, {'app': app})
])
def thread_executor(request):
    executor_cls, executor_kwargs = request.param
    result = executor_cls(**executor_kwargs)
    yield result
    result.close()

@pytest.fixture
def process_executor():
    # The executor subprocess needs to be able to import the tests package, so we
    # explicitly add the project root directory to its python path, as if the
    # project had been installed in editable mode.
    result = process.ProcessExecutor(python_path=tests.ROOT_DIR)
    yield result
    result.close()

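# Records every task state transition (and any raised exception) on the task
# object itself, so execute_and_assert can inspect them.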
@pytest.fixture(autouse=True)
def register_signals():
    def start_handler(task, *args, **kwargs):
        task.states.append('start')

    def success_handler(task, *args, **kwargs):
        task.states.append('success')

    def failure_handler(task, exception, *args, **kwargs):
        task.states.append('failure')
        task.exception = exception

    events.start_task_signal.connect(start_handler)
    events.on_success_task_signal.connect(success_handler)
    events.on_failure_task_signal.connect(failure_handler)
    yield
    events.start_task_signal.disconnect(start_handler)
    events.on_success_task_signal.disconnect(success_handler)
    events.on_failure_task_signal.disconnect(failure_handler)