vFW and vDNS support added to azure-plugin
[multicloud/azure.git] / azure / aria / aria-extension-cloudify / src / aria / tests / orchestrator / workflows / executor / test_process_executor.py
1 # Licensed to the Apache Software Foundation (ASF) under one or more
2 # contributor license agreements.  See the NOTICE file distributed with
3 # this work for additional information regarding copyright ownership.
4 # The ASF licenses this file to You under the Apache License, Version 2.0
5 # (the "License"); you may not use this file except in compliance with
6 # the License.  You may obtain a copy of the License at
7 #
8 #     http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15
16 import os
17 import sys
18 import time
19 import Queue
20 import subprocess
21
22 import pytest
23 import psutil
24 import retrying
25
26 import aria
27
28 from aria import operation
29 from aria.modeling import models
30 from aria.orchestrator import events
31 from aria.utils.plugin import create as create_plugin
32 from aria.orchestrator.workflows.executor import process
33
34 import tests.storage
35 import tests.resources
36 from tests.helpers import FilesystemDataHolder
37 from tests.fixtures import (  # pylint: disable=unused-import
38     plugins_dir,
39     plugin_manager,
40 )
41 from . import MockContext
42
43
class TestProcessExecutor(object):
    """Tests for ``process.ProcessExecutor``: plugin execution, closing and termination."""

    def test_plugin_execution(self, executor, mock_plugin, model, queue):
        """Execute an operation of the installed mock plugin and check the reported error."""
        ctx = MockContext(
            model,
            task_kwargs=dict(function='mock_plugin1.operation', plugin_fk=mock_plugin.id)
        )

        executor.execute(ctx)
        error = queue.get(timeout=60)
        # tests/resources/plugins/mock-plugin1 is the plugin installed
        # during this test's setup. The module mock_plugin1 contains a single
        # operation named "operation" which calls an entry point defined in the plugin's
        # setup.py. This entry point simply prints 'mock-plugin-output' to stdout.
        # The "operation" operation that called this subprocess then raises a RuntimeError
        # with that subprocess output as the error message.
        # This is what we assert here. This test checks that both the PYTHONPATH (operation)
        # and PATH (entry point) are properly updated in the subprocess in which the task is
        # running.
        assert isinstance(error, RuntimeError)
        assert error.message == 'mock-plugin-output'

    def test_closed(self, executor, model):
        """Executing a task on an already closed executor must raise a RuntimeError."""
        executor.close()
        with pytest.raises(RuntimeError) as exc_info:
            executor.execute(MockContext(model, task_kwargs=dict(function='some.function')))
        assert 'closed' in exc_info.value.message

    def test_process_termination(self, executor, model, fs_test_holder, tmpdir):
        """Terminating a task must take down the task process and the process it spawned."""
        # A script that never returns; the task starts it as an extra subprocess.
        freeze_script_path = str(tmpdir.join('freeze_script'))
        with open(freeze_script_path, 'w+b') as f:
            f.write(
                '''import time
while True:
    time.sleep(5)
                '''
            )
        holder_path_argument = models.Argument.wrap('holder_path', fs_test_holder._path)
        script_path_argument = models.Argument.wrap('freezing_script_path',
                                                    freeze_script_path)

        model.argument.put(holder_path_argument)
        model.argument.put(script_path_argument)
        ctx = MockContext(
            model,
            task_kwargs=dict(
                function='{0}.{1}'.format(__name__, freezing_task.__name__),
                arguments=dict(holder_path=holder_path_argument,
                               freezing_script_path=script_path_argument)),
        )

        executor.execute(ctx)

        # Poll the filesystem holder (every 500ms, for up to 60s) until the task has
        # published the pid of the subprocess it started.
        @retrying.retry(retry_on_result=lambda r: r is False, stop_max_delay=60000, wait_fixed=500)
        def wait_for_extra_process_id():
            return fs_test_holder.get('subproc', False)

        task_pid = executor._tasks[ctx.task.id].proc.pid
        extra_process_pid = wait_for_extra_process_id()

        # Both processes must be alive before termination is requested.
        assert set([task_pid, extra_process_pid]).issubset(set(psutil.pids()))
        executor.terminate(ctx.task.id)

        # Give a chance to the processes to terminate
        time.sleep(2)

        # All processes should be either zombies or non existent. A process may vanish
        # between an existence check and the status query, so ask for the status directly
        # and treat NoSuchProcess as "already gone" instead of checking the pid list first.
        for pid in (task_pid, extra_process_pid):
            try:
                status = psutil.Process(pid).status()
            except psutil.NoSuchProcess:
                continue
            assert status == psutil.STATUS_ZOMBIE
118
119
@pytest.fixture
def queue():
    """
    Yield a queue that collects the ``exception`` value of every task success/failure signal.

    Each received signal puts one item on the queue: the ``exception`` keyword argument the
    signal was sent with, or ``None`` when it was sent without one. The handler is
    disconnected from both signals on teardown.
    """
    collected = Queue.Queue()

    def record_outcome(_sender, exception=None, **_kwargs):
        collected.put(exception)

    task_signals = (events.on_success_task_signal, events.on_failure_task_signal)
    for task_signal in task_signals:
        task_signal.connect(record_outcome)
    try:
        yield collected
    finally:
        for task_signal in task_signals:
            task_signal.disconnect(record_outcome)
134
135
@pytest.fixture
def fs_test_holder(tmpdir):
    """Return a ``FilesystemDataHolder`` whose path is ``<tmpdir>/dataholder``."""
    return FilesystemDataHolder(str(tmpdir.join('dataholder')))
141
142
@pytest.fixture
def executor(plugin_manager):
    """
    Yield a ``ProcessExecutor`` bound to the test plugin manager.

    The executor is created with ``tests.ROOT_DIR`` as its only ``python_path`` entry and is
    closed on teardown.
    """
    process_executor = process.ProcessExecutor(
        plugin_manager=plugin_manager,
        python_path=[tests.ROOT_DIR],
    )
    try:
        yield process_executor
    finally:
        process_executor.close()
150
151
@pytest.fixture
def mock_plugin(plugin_manager, tmpdir):
    """
    Create the ``mock-plugin1`` test plugin in ``tmpdir`` and install it.

    Returns whatever ``plugin_manager.install`` returns for the created plugin.
    """
    plugin_source = os.path.join(tests.resources.DIR, 'plugins', 'mock-plugin1')
    created_plugin_path = create_plugin(source=plugin_source, destination_dir=str(tmpdir))
    installed_plugin = plugin_manager.install(source=created_plugin_path)
    return installed_plugin
157
158
@pytest.fixture
def model(tmpdir):
    """
    Yield an application model storage using ``SQLAlchemyModelAPI`` with ``tmpdir`` as base dir.

    The storage is handed to ``tests.storage.release_sqlite_storage`` after the test.
    """
    initiator_kwargs = dict(base_dir=str(tmpdir))
    model_storage = aria.application_model_storage(
        aria.storage.sql_mapi.SQLAlchemyModelAPI,
        initiator_kwargs=initiator_kwargs,
    )
    yield model_storage
    tests.storage.release_sqlite_storage(model_storage)
165
166
@operation
def freezing_task(holder_path, freezing_script_path, **_):
    """
    Operation that starts a never-ending subprocess and then blocks forever itself.

    The pid of the started subprocess is stored under the ``'subproc'`` key of the
    ``FilesystemDataHolder`` located at ``holder_path``.

    :param holder_path: path of the filesystem data holder to publish the pid in
    :param freezing_script_path: path of the Python script to run in the subprocess
    """
    holder = FilesystemDataHolder(holder_path)
    # Run the script directly, without a shell: combining an argument list with shell=True
    # makes POSIX use only the first item as the command (the remaining items become
    # arguments of the shell itself), so the freezing script would never be executed.
    holder['subproc'] = subprocess.Popen([sys.executable, freezing_script_path]).pid
    while True:
        time.sleep(5)