vFW and vDNS support added to azure-plugin
[multicloud/azure.git] / azure / aria / aria-extension-cloudify / src / aria / tests / orchestrator / workflows / executor / test_process_executor.py
diff --git a/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/executor/test_process_executor.py b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/executor/test_process_executor.py
new file mode 100644 (file)
index 0000000..e050d18
--- /dev/null
@@ -0,0 +1,172 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import sys
+import time
+import Queue
+import subprocess
+
+import pytest
+import psutil
+import retrying
+
+import aria
+
+from aria import operation
+from aria.modeling import models
+from aria.orchestrator import events
+from aria.utils.plugin import create as create_plugin
+from aria.orchestrator.workflows.executor import process
+
+import tests.storage
+import tests.resources
+from tests.helpers import FilesystemDataHolder
+from tests.fixtures import (  # pylint: disable=unused-import
+    plugins_dir,
+    plugin_manager,
+)
+from . import MockContext
+
+
+class TestProcessExecutor(object):
+
+    def test_plugin_execution(self, executor, mock_plugin, model, queue):
+        ctx = MockContext(
+            model,
+            task_kwargs=dict(function='mock_plugin1.operation', plugin_fk=mock_plugin.id)
+        )
+
+        executor.execute(ctx)
+        error = queue.get(timeout=60)
+        # tests/resources/plugins/mock-plugin1 is the plugin installed
+        # during this tests setup. The module mock_plugin1 contains a single
+        # operation named "operation" which calls an entry point defined in the plugin's
+        # setup.py. This entry points simply prints 'mock-plugin-output' to stdout.
+        # The "operation" operation that called this subprocess, then raises a RuntimeError
+        # with that subprocess output as the error message.
+        # This is what we assert here. This tests checks that both the PYTHONPATH (operation)
+        # and PATH (entry point) are properly updated in the subprocess in which the task is
+        # running.
+        assert isinstance(error, RuntimeError)
+        assert error.message == 'mock-plugin-output'
+
+    def test_closed(self, executor, model):
+        executor.close()
+        with pytest.raises(RuntimeError) as exc_info:
+            executor.execute(MockContext(model, task_kwargs=dict(function='some.function')))
+        assert 'closed' in exc_info.value.message
+
+    def test_process_termination(self, executor, model, fs_test_holder, tmpdir):
+        freeze_script_path = str(tmpdir.join('freeze_script'))
+        with open(freeze_script_path, 'w+b') as f:
+            f.write(
+                '''import time
+while True:
+    time.sleep(5)
+                '''
+            )
+        holder_path_argument = models.Argument.wrap('holder_path', fs_test_holder._path)
+        script_path_argument = models.Argument.wrap('freezing_script_path',
+                                                    str(tmpdir.join('freeze_script')))
+
+        model.argument.put(holder_path_argument)
+        model.argument.put(script_path_argument)
+        ctx = MockContext(
+            model,
+            task_kwargs=dict(
+                function='{0}.{1}'.format(__name__, freezing_task.__name__),
+                arguments=dict(holder_path=holder_path_argument,
+                               freezing_script_path=script_path_argument)),
+        )
+
+        executor.execute(ctx)
+
+        @retrying.retry(retry_on_result=lambda r: r is False, stop_max_delay=60000, wait_fixed=500)
+        def wait_for_extra_process_id():
+            return fs_test_holder.get('subproc', False)
+
+        task_pid = executor._tasks[ctx.task.id].proc.pid
+        extra_process_pid = wait_for_extra_process_id()
+
+        assert set([task_pid, extra_process_pid]).issubset(set(psutil.pids()))
+        executor.terminate(ctx.task.id)
+
+        # Give a chance to the processes to terminate
+        time.sleep(2)
+
+        # all processes should be either zombies or non existent
+        pids = [task_pid, extra_process_pid]
+        for pid in pids:
+            if pid in psutil.pids():
+                assert psutil.Process(pid).status() == psutil.STATUS_ZOMBIE
+            else:
+                # making the test more readable
+                assert pid not in psutil.pids()
+
+
+@pytest.fixture
+def queue():
+    _queue = Queue.Queue()
+
+    def handler(_, exception=None, **kwargs):
+        _queue.put(exception)
+
+    events.on_success_task_signal.connect(handler)
+    events.on_failure_task_signal.connect(handler)
+    try:
+        yield _queue
+    finally:
+        events.on_success_task_signal.disconnect(handler)
+        events.on_failure_task_signal.disconnect(handler)
+
+
+@pytest.fixture
+def fs_test_holder(tmpdir):
+    dataholder_path = str(tmpdir.join('dataholder'))
+    holder = FilesystemDataHolder(dataholder_path)
+    return holder
+
+
+@pytest.fixture
+def executor(plugin_manager):
+    result = process.ProcessExecutor(plugin_manager=plugin_manager, python_path=[tests.ROOT_DIR])
+    try:
+        yield result
+    finally:
+        result.close()
+
+
+@pytest.fixture
+def mock_plugin(plugin_manager, tmpdir):
+    source = os.path.join(tests.resources.DIR, 'plugins', 'mock-plugin1')
+    plugin_path = create_plugin(source=source, destination_dir=str(tmpdir))
+    return plugin_manager.install(source=plugin_path)
+
+
+@pytest.fixture
+def model(tmpdir):
+    _storage = aria.application_model_storage(aria.storage.sql_mapi.SQLAlchemyModelAPI,
+                                              initiator_kwargs=dict(base_dir=str(tmpdir)))
+    yield _storage
+    tests.storage.release_sqlite_storage(_storage)
+
+
+@operation
+def freezing_task(holder_path, freezing_script_path, **_):
+    holder = FilesystemDataHolder(holder_path)
+    holder['subproc'] = subprocess.Popen([sys.executable, freezing_script_path], shell=True).pid
+    while True:
+        time.sleep(5)