Merge "vFW and vDNS support added to azure-plugin"
diff --git a/azure/aria/aria-extension-cloudify/aria_extension_tests/adapters/test_context_adapter.py b/azure/aria/aria-extension-cloudify/aria_extension_tests/adapters/test_context_adapter.py
new file mode 100644
index 0000000..267f211
--- /dev/null
+++ b/azure/aria/aria-extension-cloudify/aria_extension_tests/adapters/test_context_adapter.py
@@ -0,0 +1,541 @@
+#
+# Copyright (c) 2017 GigaSpaces Technologies Ltd. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+
+import os
+import copy
+import datetime
+import contextlib
+
+import pytest
+
+from aria import (workflow, operation)
+from aria.modeling import models
+from aria.orchestrator import events
+from aria.orchestrator.workflows import api
+from aria.orchestrator.workflows.exceptions import ExecutorException
+from aria.orchestrator.workflows.executor import process
+from aria.orchestrator.workflows.core import (engine, graph_compiler)
+from aria.orchestrator.exceptions import (TaskAbortException, TaskRetryException)
+from aria.utils import type as type_
+
+import tests
+from tests import (mock, storage, conftest)
+from tests.orchestrator.workflows.helpers import events_collector
+
+from adapters import context_adapter
+
+
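+# Automatically release logging handlers opened during each test
+# (delegates to tests.conftest).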
+@pytest.fixture(autouse=True)
+def cleanup_logger(request):
+    conftest.logging_handler_cleanup(request)
+
+
+class TestCloudifyContextAdapter(object):
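+    """Exercises the Cloudify-style context adapter around ARIA operation contexts."""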
+
+    def test_node_instance_operation(self, executor, workflow_context):
+        node_template = self._get_node_template(workflow_context)
+        node_type = 'aria.plugin.nodes.App'
+        node_instance_property = models.Property.wrap('hello', 'world')
+        node_template.type = models.Type(variant='variant', name=node_type)
+        node = self._get_node(workflow_context)
+        node_instance_attribute = models.Attribute.wrap('hello2', 'world2')
+        node.attributes[node_instance_attribute.name] = node_instance_attribute
+        node.properties[node_instance_property.name] = node_instance_property
+        workflow_context.model.node.update(node)
+        workflow_context.model.node_template.update(node_template)
+
+        out = self._run(executor, workflow_context, _test_node_instance_operation)
+
+        node_template = self._get_node_template(workflow_context)
+        node = self._get_node(workflow_context)
+        assert out['type'] == context_adapter.NODE_INSTANCE
+        assert out['node']['id'] == node_template.id
+        assert out['node']['name'] == node_template.name
+        assert out['node']['properties'] == \
+               {node_instance_property.name: node_instance_property.value}
+        assert out['node']['type'] == node_type
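+        # The adapter is expected to translate the 'aria' prefix in the type
+        # hierarchy to 'cloudify'.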
+        assert out['node']['type_hierarchy'] == ['cloudify.plugin.nodes.App']
+        assert out['instance']['id'] == node.id
+        assert out['instance']['runtime_properties'] == \
+               {node_instance_attribute.name: node_instance_attribute.value}
+        assert not out['source']
+        assert not out['target']
+
+    def test_node_instance_relationships(self, executor, workflow_context):
+        relationship_node_template = self._get_dependency_node_template(workflow_context)
+        relationship_node_instance = self._get_dependency_node(workflow_context)
+        relationship = relationship_node_instance.inbound_relationships[0]
+        relationship_type = models.Type(variant='variant', name='test.relationships.Relationship')
+        relationship.type = relationship_type
+        workflow_context.model.relationship.update(relationship)
+
+        out = self._run(executor, workflow_context, _test_node_instance_relationships)
+
+        assert len(out['instance']['relationships']) == 1
+        relationship = out['instance']['relationships'][0]
+        assert relationship['type'] == relationship_type.name
+        assert relationship['type_hierarchy'] == [relationship_type.name]
+        assert relationship['target']['node']['id'] == relationship_node_template.id
+        assert relationship['target']['instance']['id'] == relationship_node_instance.id
+
+    def test_source_operation(self, executor, workflow_context):
+        self._test_relationship_operation(executor, workflow_context, operation_end='source')
+
+    def test_target_operation(self, executor, workflow_context):
+        self._test_relationship_operation(executor, workflow_context, operation_end='target')
+
+    def _test_relationship_operation(self, executor, workflow_context, operation_end):
+        out = self._run(
+            executor, workflow_context, _test_relationship_operation, operation_end=operation_end)
+
+        source_node = self._get_node_template(workflow_context)
+        source_node_instance = self._get_node(workflow_context)
+        target_node = self._get_dependency_node_template(workflow_context)
+        target_node_instance = self._get_dependency_node(workflow_context)
+        assert out['type'] == context_adapter.RELATIONSHIP_INSTANCE
+        assert out['source']['node']['id'] == source_node.id
+        assert out['source']['instance']['id'] == source_node_instance.id
+        assert out['target']['node']['id'] == target_node.id
+        assert out['target']['instance']['id'] == target_node_instance.id
+        assert not out['node']
+        assert not out['instance']
+
+    def test_host_ip(self, executor, workflow_context):
+        node = self._get_node_template(workflow_context)
+        node.type_hierarchy = ['aria.nodes.Compute']
+        node_instance = self._get_node(workflow_context)
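+        # Make the node its own host so host_ip resolves from its 'ip' attribute.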
+        node_instance.host_fk = node_instance.id
+        node_instance_ip = '120.120.120.120'
+        node_instance.attributes['ip'] = models.Attribute.wrap('ip', node_instance_ip)
+        workflow_context.model.node_template.update(node)
+        workflow_context.model.node.update(node_instance)
+
+        out = self._run(executor, workflow_context, _test_host_ip)
+
+        assert out['instance']['host_ip'] == node_instance_ip
+
+    def test_get_and_download_resource_and_render(self, tmpdir, executor, workflow_context):
+        resource_path = 'resource'
+        variable = 'VALUE'
+        content = '{{ctx.service.name}}-{{variable}}'
+        rendered = '{0}-{1}'.format(workflow_context.service.name, variable)
+        source = tmpdir.join(resource_path)
+        source.write(content)
+        workflow_context.resource.service.upload(
+            entry_id=str(workflow_context.service.id),
+            source=str(source),
+            path=resource_path)
+
+        out = self._run(executor, workflow_context, _test_get_and_download_resource_and_render,
+                        inputs={'resource': resource_path,
+                                'variable': variable})
+
+        assert out['get_resource'] == content
+        assert out['get_resource_and_render'] == rendered
+        with open(out['download_resource'], 'rb') as f:
+            assert f.read() == content
+        with open(out['download_resource_and_render'], 'rb') as f:
+            assert f.read() == rendered
+
+        os.remove(out['download_resource'])
+        os.remove(out['download_resource_and_render'])
+
+    def test_retry(self, executor, workflow_context):
+        message = 'retry-message'
+        retry_interval = 0.01
+
+        exception = self._run_and_get_task_exceptions(
+            executor, workflow_context, _test_retry,
+            inputs={'message': message, 'retry_interval': retry_interval},
+            max_attempts=2
+        )[-1]
+
+        assert isinstance(exception, TaskRetryException)
+        assert exception.message == message
+        assert exception.retry_interval == retry_interval
+
+        out = self._get_node(workflow_context).attributes['out'].value
+        assert out['operation']['retry_number'] == 1
+        assert out['operation']['max_retries'] == 1
+
+    def test_logger_and_send_event(self, executor, workflow_context):
+        # TODO: add assertions of output once process executor output can be captured
+        message = 'logger-message'
+        event = 'event-message'
+        self._run(executor, workflow_context, _test_logger_and_send_event,
+                  inputs={'message': message, 'event': event})
+
+    def test_plugin(self, executor, workflow_context, tmpdir):
+        plugin = self._put_plugin(workflow_context)
+        out = self._run(executor, workflow_context, _test_plugin, plugin=plugin)
+
+        expected_workdir = tmpdir.join(
+            'workdir', 'plugins', str(workflow_context.service.id), plugin.name)
+        assert out['plugin']['name'] == plugin.name
+        assert out['plugin']['package_name'] == plugin.package_name
+        assert out['plugin']['package_version'] == plugin.package_version
+        assert out['plugin']['workdir'] == str(expected_workdir)
+
+    def test_importable_ctx_and_inputs(self, executor, workflow_context):
+        test_inputs = {'input1': 1, 'input2': 2}
+        plugin = self._put_plugin(workflow_context, mock_cfy_plugin=True)
+
+        out = self._run(executor, workflow_context, _test_importable_ctx_and_inputs,
+                        inputs=test_inputs,
+                        skip_common_assert=True,
+                        plugin=plugin)
+        assert out['inputs'] == test_inputs
+
+    def test_non_recoverable_error(self, executor, workflow_context):
+        message = 'NON_RECOVERABLE_MESSAGE'
+        plugin = self._put_plugin(workflow_context, mock_cfy_plugin=True)
+
+        exception = self._run_and_get_task_exceptions(
+            executor, workflow_context, _test_non_recoverable_error,
+            inputs={'message': message},
+            skip_common_assert=True,
+            plugin=plugin
+        )[0]
+        assert isinstance(exception, TaskAbortException)
+        assert exception.message == message
+
+    def test_recoverable_error(self, executor, workflow_context):
+        message = 'RECOVERABLE_MESSAGE'
+        plugin = self._put_plugin(workflow_context, mock_cfy_plugin=True)
+
+        retry_interval = 0.01
+        exception = self._run_and_get_task_exceptions(
+            executor, workflow_context, _test_recoverable_error,
+            inputs={'message': message, 'retry_interval': retry_interval},
+            skip_common_assert=True,
+            plugin=plugin
+        )[0]
+        assert isinstance(exception, TaskRetryException)
+        assert message in exception.message
+        assert exception.retry_interval == retry_interval
+
+    def _test_common(self, out, workflow_context):
+        assert out['execution_id'] == workflow_context.execution.id
+        assert out['workflow_id'] == workflow_context.execution.workflow_name
+        assert out['rest_token'] is None
+        assert out['task_id'][0] == out['task_id'][1]
+        assert out['task_name'][0] == out['task_name'][1]
+        assert out['task_target'] is None
+        assert out['task_queue'] is None
+        assert out['provider_context'] == {}
+        assert out['blueprint']['id'] == workflow_context.service_template.id
+        assert out['deployment']['id'] == workflow_context.service.id
+        assert out['operation']['name'][0] == out['operation']['name'][1]
+        assert out['operation']['retry_number'][0] == out['operation']['retry_number'][1]
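+        # Cloudify counts retries while ARIA counts attempts, so max_retries
+        # is expected to equal max_attempts - 1.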
+        assert out['operation']['max_retries'][0] == out['operation']['max_retries'][1] - 1
+        assert out['bootstrap_context']['resources_prefix'] == ''
+        assert out['bootstrap_context']['broker_config'] == {}
+        assert out['bootstrap_context']['cloudify_agent']['any'] is None
+        assert out['agent']['init_script'] is None
+
+    def _run(self,
+             executor,
+             workflow_context,
+             func,
+             inputs=None,
+             max_attempts=None,
+             skip_common_assert=False,
+             operation_end=None,
+             plugin=None):
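+        # Attach ``func`` as a single operation on the node (or on its outbound
+        # relationship when ``operation_end`` is given), run it in a one-task
+        # workflow, and return the 'out' attribute written by the operation.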
+        interface_name = 'test'
+        operation_name = 'op'
+        op_dict = {'function': '{0}.{1}'.format(__name__, func.__name__),
+                   'plugin': plugin,
+                   'arguments': inputs or {}}
+        node = self._get_node(workflow_context)
+
+        if operation_end:
+            actor = relationship = node.outbound_relationships[0]
+            relationship.interfaces[interface_name] = mock.models.create_interface(
+                relationship.source_node.service,
+                interface_name,
+                operation_name,
+                operation_kwargs=op_dict
+            )
+            workflow_context.model.relationship.update(relationship)
+
+        else:
+            actor = node
+            node.interfaces[interface_name] = mock.models.create_interface(
+                node.service,
+                interface_name,
+                operation_name,
+                operation_kwargs=op_dict
+            )
+            workflow_context.model.node.update(node)
+
+        if inputs:
+            operation_inputs = \
+                actor.interfaces[interface_name].operations[operation_name].inputs
+            # Also declare each argument as a typed input on the interface
+            # operation model.
+            for input_name, input_value in inputs.iteritems():
+                operation_inputs[input_name] = models.Input(
+                    name=input_name, type_name=type_.full_type_name(input_value))
+
+        @workflow
+        def mock_workflow(graph, **kwargs):
+            task = api.task.OperationTask(
+                actor,
+                interface_name,
+                operation_name,
+                arguments=inputs or {},
+                max_attempts=max_attempts
+            )
+            graph.add_tasks(task)
+
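+        # Compile the one-task graph and execute it with the given executor.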
+        tasks_graph = mock_workflow(ctx=workflow_context)
+        graph_compiler.GraphCompiler(workflow_context, executor.__class__).compile(tasks_graph)
+        eng = engine.Engine(executors={executor.__class__: executor})
+        eng.execute(workflow_context)
+        out = self._get_node(workflow_context).attributes['out'].value
+        if not skip_common_assert:
+            self._test_common(out, workflow_context)
+        return out
+
+    def _get_dependency_node_template(self, workflow_context):
+        return workflow_context.model.node_template.get_by_name(
+            mock.models.DEPENDENCY_NODE_TEMPLATE_NAME)
+
+    def _get_dependency_node(self, workflow_context):
+        return workflow_context.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
+
+    def _get_node_template(self, workflow_context):
+        return workflow_context.model.node_template.get_by_name(
+            mock.models.DEPENDENT_NODE_TEMPLATE_NAME)
+
+    def _get_node(self, workflow_context):
+        return workflow_context.model.node.get_by_name(mock.models.DEPENDENT_NODE_NAME)
+
+    def _run_and_get_task_exceptions(self, *args, **kwargs):
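+        # Run an operation that is expected to fail and return the exceptions
+        # collected from the on-failure task signal, in order of occurrence.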
+        signal = events.on_failure_task_signal
+        with events_collector(signal) as collected:
+            with pytest.raises(ExecutorException):
+                self._run(*args, **kwargs)
+        return [event['kwargs']['exception'] for event in collected[signal]]
+
+    @pytest.fixture
+    def executor(self):
+        result = process.ProcessExecutor(python_path=[tests.ROOT_DIR])
+        yield result
+        result.close()
+
+    @pytest.fixture
+    def workflow_context(self, tmpdir):
+        result = mock.context.simple(
+            str(tmpdir),
+            context_kwargs=dict(workdir=str(tmpdir.join('workdir')))
+        )
+        yield result
+        storage.release_sqlite_storage(result.model)
+
+    def _put_plugin(self, workflow_context, mock_cfy_plugin=False):
+        name = 'PLUGIN'
+        archive_name = 'ARCHIVE'
+        package_name = 'PACKAGE'
+        package_version = '0.1.1'
+
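+        # mock_cfy_plugin marks this as a Cloudify plugin by listing
+        # 'cloudify_plugins_common' among its wheels.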
+        plugin = models.Plugin(
+            name=name,
+            archive_name=archive_name,
+            package_name=package_name,
+            package_version=package_version,
+            uploaded_at=datetime.datetime.now(),
+            wheels=['cloudify_plugins_common'] if mock_cfy_plugin else []
+        )
+
+        workflow_context.model.plugin.put(plugin)
+
+        return plugin
+
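+# The functions below are the operation implementations. They run in a child
+# process spawned by the ProcessExecutor and are referenced by their importable
+# '<module>.<function>' name in the operation definitions built by _run().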
+
+@operation
+def _test_node_instance_operation(ctx):
+    with _adapter(ctx) as (adapter, out):
+        node = adapter.node
+        instance = adapter.instance
+        out.update({
+            'node': {
+                'id': node.id,
+                'name': node.name,
+                'properties': copy.deepcopy(node.properties),
+                'type': node.type,
+                'type_hierarchy': node.type_hierarchy
+            },
+            'instance': {
+                'id': instance.id,
+                'runtime_properties': copy.deepcopy(instance.runtime_properties)
+            }
+        })
+        try:
+            assert adapter.source
+            out['source'] = True
+        except TaskAbortException:
+            out['source'] = False
+        try:
+            assert adapter.target
+            out['target'] = True
+        except TaskAbortException:
+            out['target'] = False
+
+
+@operation
+def _test_node_instance_relationships(ctx):
+    with _adapter(ctx) as (adapter, out):
+        relationships = [{'type': r.type,
+                          'type_hierarchy': [t.name for t in r.type_hierarchy],
+                          'target': {'node': {'id': r.target.node.id},
+                                     'instance': {'id': r.target.instance.id}}}
+                         for r in adapter.instance.relationships]
+        out['instance'] = {'relationships': relationships}
+
+
+@operation
+def _test_relationship_operation(ctx):
+    with _adapter(ctx) as (adapter, out):
+        out.update({
+            'source': {'node': {'id': adapter.source.node.id},
+                       'instance': {'id': adapter.source.instance.id}},
+            'target': {'node': {'id': adapter.target.node.id},
+                       'instance': {'id': adapter.target.instance.id}}
+        })
+        try:
+            assert adapter.node
+            out['node'] = True
+        except TaskAbortException:
+            out['node'] = False
+        try:
+            assert adapter.instance
+            out['instance'] = True
+        except TaskAbortException:
+            out['instance'] = False
+
+
+@operation
+def _test_host_ip(ctx):
+    with _adapter(ctx) as (adapter, out):
+        out['instance'] = {'host_ip': adapter.instance.host_ip}
+
+
+@operation
+def _test_get_and_download_resource_and_render(ctx, resource, variable):
+    with _adapter(ctx) as (adapter, out):
+        out.update({
+            'get_resource': adapter.get_resource(resource),
+            'get_resource_and_render': adapter.get_resource_and_render(
+                resource, template_variables={'variable': variable}
+            ),
+            'download_resource': adapter.download_resource(resource),
+            'download_resource_and_render': adapter.download_resource_and_render(
+                resource, template_variables={'variable': variable}
+            )
+        })
+
+
+@operation
+def _test_retry(ctx, message, retry_interval):
+    with _adapter(ctx) as (adapter, out):
+        op = adapter.operation
+        out['operation'] = {'retry_number': op.retry_number, 'max_retries': op.max_retries}
+        op.retry(message, retry_after=retry_interval)
+
+
+@operation
+def _test_logger_and_send_event(ctx, message, event):
+    with _adapter(ctx) as (adapter, _):
+        adapter.logger.info(message)
+        adapter.send_event(event)
+
+
+@operation
+def _test_plugin(ctx):
+    with _adapter(ctx) as (adapter, out):
+        plugin = adapter.plugin
+        out['plugin'] = {
+            'name': plugin.name,
+            'package_name': plugin.package_name,
+            'package_version': plugin.package_version,
+            'workdir': plugin.workdir
+        }
+
+
+@operation
+def _test_importable_ctx_and_inputs(**_):
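+    # 'cloudify' is importable here because the plugin under test was created
+    # with 'cloudify_plugins_common' in its wheels (see _put_plugin).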
+    from cloudify import ctx
+    from cloudify.state import ctx_parameters
+    ctx.instance.runtime_properties['out'] = {'inputs': dict(ctx_parameters)}
+
+
+@operation
+def _test_non_recoverable_error(message, **_):
+    from cloudify.exceptions import NonRecoverableError
+    raise NonRecoverableError(message)
+
+
+@operation
+def _test_recoverable_error(message, retry_interval, **_):
+    from cloudify.exceptions import RecoverableError
+    raise RecoverableError(message, retry_interval)
+
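+# Record both the adapter's view and the raw ARIA ctx view of each value;
+# TestCloudifyContextAdapter._test_common later asserts the pairs match.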
+
+def _test_common(out, ctx, adapter):
+    op = adapter.operation
+    bootstrap_context = adapter.bootstrap_context
+    out.update({
+        'type': adapter.type,
+        'execution_id': adapter.execution_id,
+        'workflow_id': adapter.workflow_id,
+        'rest_token': adapter.rest_token,
+        'task_id': (adapter.task_id, ctx.task.id),
+        'task_name': (adapter.task_name, ctx.task.function),
+        'task_target': adapter.task_target,
+        'task_queue': adapter.task_queue,
+        'provider_context': adapter.provider_context,
+        'blueprint': {'id': adapter.blueprint.id},
+        'deployment': {'id': adapter.deployment.id},
+        'operation': {
+            'name': [op.name, ctx.name.split('@')[0].replace(':', '.')],
+            'retry_number': [op.retry_number, ctx.task.attempts_count - 1],
+            'max_retries': [op.max_retries, ctx.task.max_attempts]
+        },
+        'bootstrap_context': {
+            'broker_config': bootstrap_context.broker_config('arg1', 'arg2', arg3='arg3'),
+            # Any attribute access on cloudify_agent returns None
+            'cloudify_agent': {'any': bootstrap_context.cloudify_agent.any},
+            'resources_prefix': bootstrap_context.resources_prefix
+        },
+        'agent': {
+            'init_script': adapter.agent.init_script('arg1', 'arg2', arg3='arg3')
+        }
+    })
+
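+# Wrap ctx in the adapter, give the operation an 'out' dict to fill, and
+# persist it to runtime properties so the test process can read it back.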
+
+@contextlib.contextmanager
+def _adapter(ctx):
+    out = {}
+    adapter = context_adapter.CloudifyContextAdapter(ctx)
+    _test_common(out, ctx, adapter)
+    try:
+        yield adapter, out
+    finally:
+        try:
+            instance = adapter.instance
+        except TaskAbortException:
+            instance = adapter.source.instance
+        instance.runtime_properties['out'] = out