vFW and vDNS support added to azure-plugin
[multicloud/azure.git] / azure / aria / aria-extension-cloudify / src / aria / tests / orchestrator / execution_plugin / test_local.py
diff --git a/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/execution_plugin/test_local.py b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/execution_plugin/test_local.py
new file mode 100644 (file)
index 0000000..7f33318
--- /dev/null
@@ -0,0 +1,598 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+import os
+
+import pytest
+
+from aria import workflow
+from aria.orchestrator import events
+from aria.orchestrator.workflows import api
+from aria.orchestrator.workflows.exceptions import ExecutorException
+from aria.orchestrator.exceptions import TaskAbortException, TaskRetryException
+from aria.orchestrator.execution_plugin import operations
+from aria.orchestrator.execution_plugin.exceptions import ProcessException
+from aria.orchestrator.execution_plugin import local
+from aria.orchestrator.execution_plugin import constants
+from aria.orchestrator.workflows.executor import process
+from aria.orchestrator.workflows.core import engine, graph_compiler
+
+from tests import mock
+from tests import storage
+from tests.orchestrator.workflows.helpers import events_collector
+
+IS_WINDOWS = os.name == 'nt'
+
+
+class TestLocalRunScript(object):
+
+    def test_script_path_parameter(self, executor, workflow_context, tmpdir):
+        script_path = self._create_script(
+            tmpdir,
+            linux_script='''#! /bin/bash -e
+            ctx node attributes map key = value
+            ''',
+            windows_script='''
+            ctx node attributes map key = value
+        ''')
+        props = self._run(
+            executor, workflow_context,
+            script_path=script_path)
+        assert props['map'].value['key'] == 'value'
+
+    def test_process_env(self, executor, workflow_context, tmpdir):
+        script_path = self._create_script(
+            tmpdir,
+            linux_script='''#! /bin/bash -e
+            ctx node attributes map key1 = "$key1"
+            ctx node attributes map key2 = "$key2"
+            ''',
+            windows_script='''
+            ctx node attributes map key1 = %key1%
+            ctx node attributes map key2 = %key2%
+        ''')
+        props = self._run(
+            executor, workflow_context,
+            script_path=script_path,
+            process={
+                'env': {
+                    'key1': 'value1',
+                    'key2': 'value2'
+                }
+            })
+        p_map = props['map'].value
+        assert p_map['key1'] == 'value1'
+        assert p_map['key2'] == 'value2'
+
+    def test_process_cwd(self, executor, workflow_context, tmpdir):
+        script_path = self._create_script(
+            tmpdir,
+            linux_script='''#! /bin/bash -e
+            ctx node attributes map cwd = "$PWD"
+            ''',
+            windows_script='''
+            ctx node attributes map cwd = %CD%
+            ''')
+        tmpdir = str(tmpdir)
+        props = self._run(
+            executor, workflow_context,
+            script_path=script_path,
+            process={
+                'cwd': tmpdir
+            })
+        p_map = props['map'].value
+        assert p_map['cwd'] == tmpdir
+
+    def test_process_command_prefix(self, executor, workflow_context, tmpdir):
+        use_ctx = 'ctx node attributes map key = value'
+        python_script = ['import subprocess',
+                         'subprocess.Popen("{0}".split(' ')).communicate()[0]'.format(use_ctx)]
+        python_script = '\n'.join(python_script)
+        script_path = self._create_script(
+            tmpdir,
+            linux_script=python_script,
+            windows_script=python_script,
+            windows_suffix='',
+            linux_suffix='')
+        props = self._run(
+            executor, workflow_context,
+            script_path=script_path,
+            process={
+                'env': {'TEST_KEY': 'value'},
+                'command_prefix': 'python'
+            })
+        p_map = props['map'].value
+        assert p_map['key'] == 'value'
+
+    def test_process_args(self, executor, workflow_context, tmpdir):
+        script_path = self._create_script(
+            tmpdir,
+            linux_script='''#! /bin/bash -e
+            ctx node attributes map arg1 = "$1"
+            ctx node attributes map arg2 = "$2"
+            ''',
+            windows_script='''
+            ctx node attributes map arg1 = %1
+            ctx node attributes map arg2 = %2
+            ''')
+        props = self._run(
+            executor, workflow_context,
+            script_path=script_path,
+            process={
+                'args': ['"arg with spaces"', 'arg2']
+            })
+        assert props['map'].value['arg1'] == 'arg with spaces'
+        assert props['map'].value['arg2'] == 'arg2'
+
+    def test_no_script_path(self, executor, workflow_context):
+        exception = self._run_and_get_task_exception(
+            executor, workflow_context,
+            script_path=None)
+        assert isinstance(exception, TaskAbortException)
+        assert 'script_path' in exception.message
+
+    def test_script_error(self, executor, workflow_context, tmpdir):
+        script_path = self._create_script(
+            tmpdir,
+            linux_script='''#! /bin/bash -e
+            echo 123123
+            command_that_does_not_exist [ ]
+            ''',
+            windows_script='''
+            @echo off
+            echo 123123
+            command_that_does_not_exist [ ]
+            ''')
+        exception = self._run_and_get_task_exception(
+            executor, workflow_context,
+            script_path=script_path)
+        assert isinstance(exception, ProcessException)
+        assert os.path.basename(script_path) in exception.command
+        assert exception.exit_code == 1 if IS_WINDOWS else 127
+        assert exception.stdout.strip() == '123123'
+        assert 'command_that_does_not_exist' in exception.stderr
+
+    def test_script_error_from_bad_ctx_request(self, executor, workflow_context, tmpdir):
+        script_path = self._create_script(
+            tmpdir,
+            linux_script='''#! /bin/bash -e
+            ctx property_that_does_not_exist
+            ''',
+            windows_script='''
+            ctx property_that_does_not_exist
+            ''')
+        exception = self._run_and_get_task_exception(
+            executor, workflow_context,
+            script_path=script_path)
+        assert isinstance(exception, ProcessException)
+        assert os.path.basename(script_path) in exception.command
+        assert exception.exit_code == 1
+        assert 'RequestError' in exception.stderr
+        assert 'property_that_does_not_exist' in exception.stderr
+
+    def test_python_script(self, executor, workflow_context, tmpdir):
+        script = '''
+from aria.orchestrator.execution_plugin import ctx, inputs
+if __name__ == '__main__':
+    ctx.node.attributes['key'] = inputs['key']
+'''
+        suffix = '.py'
+        script_path = self._create_script(
+            tmpdir,
+            linux_script=script,
+            windows_script=script,
+            linux_suffix=suffix,
+            windows_suffix=suffix)
+        props = self._run(
+            executor, workflow_context,
+            script_path=script_path,
+            arguments={'key': 'value'})
+        assert props['key'].value == 'value'
+
+    @pytest.mark.parametrize(
+        'value', ['string-value', [1, 2, 3], 999, 3.14, False,
+                  {'complex1': {'complex2': {'key': 'value'}, 'list': [1, 2, 3]}}])
+    def test_inputs_as_environment_variables(self, executor, workflow_context, tmpdir, value):
+        script_path = self._create_script(
+            tmpdir,
+            linux_script='''#! /bin/bash -e
+            ctx node attributes key = "${input_as_env_var}"
+            ''',
+            windows_script='''
+            ctx node attributes key = "%input_as_env_var%"
+        ''')
+        props = self._run(
+            executor, workflow_context,
+            script_path=script_path,
+            env_var=value)
+        value = props['key'].value
+        expected = value if isinstance(value, basestring) else json.loads(value)
+        assert expected == value
+
+    @pytest.mark.parametrize('value', ['override', {'key': 'value'}])
+    def test_explicit_env_variables_inputs_override(
+            self, executor, workflow_context, tmpdir, value):
+        script_path = self._create_script(
+            tmpdir,
+            linux_script='''#! /bin/bash -e
+            ctx node attributes key = "${input_as_env_var}"
+            ''',
+            windows_script='''
+            ctx node attributes key = "%input_as_env_var%"
+        ''')
+
+        props = self._run(
+            executor, workflow_context,
+            script_path=script_path,
+            env_var='test-value',
+            process={
+                'env': {
+                    'input_as_env_var': value
+                }
+            })
+        value = props['key'].value
+        expected = value if isinstance(value, basestring) else json.loads(value)
+        assert expected == value
+
+    def test_get_nonexistent_runtime_property(self, executor, workflow_context, tmpdir):
+        script_path = self._create_script(
+            tmpdir,
+            linux_script='''#! /bin/bash -e
+            ctx node attributes nonexistent
+            ''',
+            windows_script='''
+            ctx node attributes nonexistent
+        ''')
+        exception = self._run_and_get_task_exception(
+            executor, workflow_context,
+            script_path=script_path)
+        assert isinstance(exception, ProcessException)
+        assert os.path.basename(script_path) in exception.command
+        assert 'RequestError' in exception.stderr
+        assert 'nonexistent' in exception.stderr
+
+    def test_get_nonexistent_runtime_property_json(self, executor, workflow_context, tmpdir):
+        script_path = self._create_script(
+            tmpdir,
+            linux_script='''#! /bin/bash -e
+            ctx -j node attributes nonexistent
+            ''',
+            windows_script='''
+            ctx -j node attributes nonexistent
+            ''')
+        exception = self._run_and_get_task_exception(
+            executor, workflow_context,
+            script_path=script_path)
+        assert isinstance(exception, ProcessException)
+        assert os.path.basename(script_path) in exception.command
+        assert 'RequestError' in exception.stderr
+        assert 'nonexistent' in exception.stderr
+
+    def test_abort(self, executor, workflow_context, tmpdir):
+        script_path = self._create_script(
+            tmpdir,
+            linux_script='''#! /bin/bash -e
+            ctx task abort [ abort-message ]
+            ''',
+            windows_script='''
+            ctx task abort [ abort-message ]
+            ''')
+        exception = self._run_and_get_task_exception(
+            executor, workflow_context,
+            script_path=script_path)
+        assert isinstance(exception, TaskAbortException)
+        assert exception.message == 'abort-message'
+
+    def test_retry(self, executor, workflow_context, tmpdir):
+        script_path = self._create_script(
+            tmpdir,
+            linux_script='''#! /bin/bash -e
+            ctx task retry [ retry-message ]
+            ''',
+            windows_script='''
+            ctx task retry [ retry-message ]
+            ''')
+        exception = self._run_and_get_task_exception(
+            executor, workflow_context,
+            script_path=script_path)
+        assert isinstance(exception, TaskRetryException)
+        assert exception.message == 'retry-message'
+
+    def test_retry_with_interval(self, executor, workflow_context, tmpdir):
+        script_path = self._create_script(
+            tmpdir,
+            linux_script='''#! /bin/bash -e
+            ctx task retry [ retry-message @100 ]
+            ''',
+            windows_script='''
+            ctx task retry [ retry-message @100 ]
+            ''')
+        exception = self._run_and_get_task_exception(
+            executor, workflow_context,
+            script_path=script_path)
+        assert isinstance(exception, TaskRetryException)
+        assert exception.message == 'retry-message'
+        assert exception.retry_interval == 100
+
+    def test_crash_abort_after_retry(self, executor, workflow_context, tmpdir):
+        script_path = self._create_script(
+            tmpdir,
+            linux_script='''#! /bin/bash
+            ctx task retry [ retry-message ]
+            ctx task abort [ should-raise-a-runtime-error ]
+            ''',
+            windows_script='''
+            ctx task retry [ retry-message ]
+            ctx task abort [ should-raise-a-runtime-error ]
+        ''')
+        exception = self._run_and_get_task_exception(
+            executor, workflow_context,
+            script_path=script_path)
+        assert isinstance(exception, TaskAbortException)
+        assert exception.message == constants.ILLEGAL_CTX_OPERATION_MESSAGE
+
+    def test_crash_retry_after_abort(self, executor, workflow_context, tmpdir):
+        script_path = self._create_script(
+            tmpdir,
+            linux_script='''#! /bin/bash
+            ctx task abort [ abort-message ]
+            ctx task retry [ should-raise-a-runtime-error ]
+            ''',
+            windows_script='''
+            ctx task abort [ abort-message ]
+            ctx task retry [ should-raise-a-runtime-error ]
+            ''')
+        exception = self._run_and_get_task_exception(
+            executor, workflow_context,
+            script_path=script_path)
+        assert isinstance(exception, TaskAbortException)
+        assert exception.message == constants.ILLEGAL_CTX_OPERATION_MESSAGE
+
+    def test_crash_abort_after_abort(self, executor, workflow_context, tmpdir):
+        script_path = self._create_script(
+            tmpdir,
+            linux_script='''#! /bin/bash
+            ctx task abort [ abort-message ]
+            ctx task abort [ should-raise-a-runtime-error ]
+            ''',
+            windows_script='''
+            ctx task abort [ abort-message ]
+            ctx task abort [ should-raise-a-runtime-error ]
+            ''')
+        exception = self._run_and_get_task_exception(
+            executor, workflow_context,
+            script_path=script_path)
+        assert isinstance(exception, TaskAbortException)
+        assert exception.message == constants.ILLEGAL_CTX_OPERATION_MESSAGE
+
+    def test_crash_retry_after_retry(self, executor, workflow_context, tmpdir):
+        script_path = self._create_script(
+            tmpdir,
+            linux_script='''#! /bin/bash
+            ctx task retry [ retry-message ]
+            ctx task retry [ should-raise-a-runtime-error ]
+            ''',
+            windows_script='''
+            ctx task retry [ retry-message ]
+            ctx task retry [ should-raise-a-runtime-error ]
+            ''')
+        exception = self._run_and_get_task_exception(
+            executor, workflow_context,
+            script_path=script_path)
+        assert isinstance(exception, TaskAbortException)
+        assert exception.message == constants.ILLEGAL_CTX_OPERATION_MESSAGE
+
+    def test_retry_returns_a_nonzero_exit_code(self, executor, workflow_context, tmpdir):
+        log_path = tmpdir.join('temp.log')
+        message = 'message'
+        script_path = self._create_script(
+            tmpdir,
+            linux_script='''#! /bin/bash -e
+            ctx task retry [ "{0}" ] 2> {1}
+            echo should-not-run > {1}
+            '''.format(message, log_path),
+            windows_script='''
+            ctx task retry [ "{0}" ] 2> {1}
+            if %errorlevel% neq 0 exit /b %errorlevel%
+            echo should-not-run > {1}
+            '''.format(message, log_path))
+        with pytest.raises(ExecutorException):
+            self._run(
+                executor, workflow_context,
+                script_path=script_path)
+        assert log_path.read().strip() == message
+
+    def test_abort_returns_a_nonzero_exit_code(self, executor, workflow_context, tmpdir):
+        log_path = tmpdir.join('temp.log')
+        message = 'message'
+        script_path = self._create_script(
+            tmpdir,
+            linux_script='''#! /bin/bash -e
+            ctx task abort [ "{0}" ] 2> {1}
+            echo should-not-run > {1}
+            '''.format(message, log_path),
+            windows_script='''
+            ctx task abort [ "{0}" ] 2> {1}
+            if %errorlevel% neq 0 exit /b %errorlevel%
+            echo should-not-run > {1}
+            '''.format(message, log_path))
+        with pytest.raises(ExecutorException):
+            self._run(
+                executor, workflow_context,
+                script_path=script_path)
+        assert log_path.read().strip() == message
+
+    def _create_script(self,
+                       tmpdir,
+                       linux_script,
+                       windows_script,
+                       windows_suffix='.bat',
+                       linux_suffix=''):
+        suffix = windows_suffix if IS_WINDOWS else linux_suffix
+        script = windows_script if IS_WINDOWS else linux_script
+        script_path = tmpdir.join('script{0}'.format(suffix))
+        script_path.write(script)
+        return str(script_path)
+
+    def _run_and_get_task_exception(self, *args, **kwargs):
+        signal = events.on_failure_task_signal
+        with events_collector(signal) as collected:
+            with pytest.raises(ExecutorException):
+                self._run(*args, **kwargs)
+        return collected[signal][0]['kwargs']['exception']
+
+    def _run(self,
+             executor,
+             workflow_context,
+             script_path,
+             process=None,
+             env_var='value',
+             arguments=None):
+        local_script_path = script_path
+        script_path = os.path.basename(local_script_path) if local_script_path else ''
+        arguments = arguments or {}
+        process = process or {}
+        if script_path:
+            workflow_context.resource.service.upload(
+                entry_id=str(workflow_context.service.id),
+                source=local_script_path,
+                path=script_path)
+
+        arguments.update({
+            'script_path': script_path,
+            'process': process,
+            'input_as_env_var': env_var
+        })
+
+        node = workflow_context.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
+        interface = mock.models.create_interface(
+            node.service,
+            'test',
+            'op',
+            operation_kwargs=dict(
+                function='{0}.{1}'.format(
+                    operations.__name__,
+                    operations.run_script_locally.__name__),
+                arguments=arguments)
+        )
+        node.interfaces[interface.name] = interface
+        workflow_context.model.node.update(node)
+
+        @workflow
+        def mock_workflow(ctx, graph):
+            graph.add_tasks(api.task.OperationTask(
+                node,
+                interface_name='test',
+                operation_name='op',
+                arguments=arguments))
+            return graph
+        tasks_graph = mock_workflow(ctx=workflow_context)  # pylint: disable=no-value-for-parameter
+        graph_compiler.GraphCompiler(workflow_context, executor.__class__).compile(tasks_graph)
+        eng = engine.Engine({executor.__class__: executor})
+        eng.execute(workflow_context)
+        return workflow_context.model.node.get_by_name(
+            mock.models.DEPENDENCY_NODE_NAME).attributes
+
+    @pytest.fixture
+    def executor(self):
+        result = process.ProcessExecutor()
+        try:
+            yield result
+        finally:
+            result.close()
+
+    @pytest.fixture
+    def workflow_context(self, tmpdir):
+        workflow_context = mock.context.simple(str(tmpdir), inmemory=False)
+        workflow_context.states = []
+        workflow_context.exception = None
+        yield workflow_context
+        storage.release_sqlite_storage(workflow_context.model)
+
+
+class BaseTestConfiguration(object):
+
+    @pytest.fixture(autouse=True)
+    def mock_execute(self, mocker):
+        def eval_func(**_):
+            self.called = 'eval'
+
+        def execute_func(process, **_):
+            self.process = process
+            self.called = 'execute'
+        self.process = {}
+        self.called = None
+        mocker.patch.object(local, '_execute_func', execute_func)
+        mocker.patch.object(local, '_eval_script_func', eval_func)
+
+    class Ctx(object):
+        @staticmethod
+        def download_resource(destination, *args, **kwargs):
+            return destination
+
+    def _run(self, script_path, process=None):
+        local.run_script(
+            script_path=script_path,
+            process=process,
+            ctx=self.Ctx)
+
+
+class TestPowerShellConfiguration(BaseTestConfiguration):
+
+    def test_implicit_powershell_call_with_ps1_extension(self):
+        self._run(script_path='script_path.ps1')
+        assert self.process['command_prefix'] == 'powershell'
+
+    def test_command_prefix_is_overridden_for_ps1_extension(self):
+        self._run(script_path='script_path.ps1',
+                  process={'command_prefix': 'bash'})
+        assert self.process['command_prefix'] == 'bash'
+
+    def test_explicit_powershell_call(self):
+        self._run(script_path='script_path.ps1',
+                  process={'command_prefix': 'powershell'})
+        assert self.process['command_prefix'] == 'powershell'
+
+
+class TestEvalPythonConfiguration(BaseTestConfiguration):
+
+    def test_explicit_eval_without_py_extension(self):
+        self._run(script_path='script_path',
+                  process={'eval_python': True})
+        assert self.called == 'eval'
+
+    def test_explicit_eval_with_py_extension(self):
+        self._run(script_path='script_path.py',
+                  process={'eval_python': True})
+        assert self.called == 'eval'
+
+    def test_implicit_eval(self):
+        self._run(script_path='script_path.py')
+        assert self.called == 'eval'
+
+    def test_explicit_execute_without_py_extension(self):
+        self._run(script_path='script_path',
+                  process={'eval_python': False})
+        assert self.called == 'execute'
+
+    def test_explicit_execute_with_py_extension(self):
+        self._run(script_path='script_path.py',
+                  process={'eval_python': False})
+        assert self.called == 'execute'
+
+    def test_implicit_execute(self):
+        self._run(script_path='script_path')
+        assert self.called == 'execute'