vFW and vDNS support added to azure-plugin
[multicloud/azure.git] / azure / aria / aria-extension-cloudify / src / aria / tests / orchestrator / workflows / api / test_task.py
diff --git a/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/api/test_task.py b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/api/test_task.py
new file mode 100644 (file)
index 0000000..9d91b6b
--- /dev/null
@@ -0,0 +1,223 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import pytest
+
+from aria.orchestrator import context
+from aria.orchestrator.workflows import api
+
+from tests import mock, storage
+
+
+@pytest.fixture
+def ctx(tmpdir):
+    """
+    Create the following graph in storage:
+    dependency_node <------ dependent_node
+    :return:
+    """
+    simple_context = mock.context.simple(str(tmpdir), inmemory=False)
+    simple_context.model.execution.put(mock.models.create_execution(simple_context.service))
+    yield simple_context
+    storage.release_sqlite_storage(simple_context.model)
+
+
+class TestOperationTask(object):
+
+    def test_node_operation_task_creation(self, ctx):
+        interface_name = 'test_interface'
+        operation_name = 'create'
+
+        plugin = mock.models.create_plugin('test_plugin', '0.1')
+        ctx.model.node.update(plugin)
+
+        arguments = {'test_input': True}
+
+        interface = mock.models.create_interface(
+            ctx.service,
+            interface_name,
+            operation_name,
+            operation_kwargs=dict(plugin=plugin,
+                                  function='op_path',
+                                  arguments=arguments),)
+
+        node = ctx.model.node.get_by_name(mock.models.DEPENDENT_NODE_NAME)
+        node.interfaces[interface_name] = interface
+        ctx.model.node.update(node)
+        max_attempts = 10
+        retry_interval = 10
+        ignore_failure = True
+
+        with context.workflow.current.push(ctx):
+            api_task = api.task.OperationTask(
+                node,
+                interface_name=interface_name,
+                operation_name=operation_name,
+                arguments=arguments,
+                max_attempts=max_attempts,
+                retry_interval=retry_interval,
+                ignore_failure=ignore_failure)
+
+        assert api_task.name == api.task.OperationTask.NAME_FORMAT.format(
+            type='node',
+            name=node.name,
+            interface=interface_name,
+            operation=operation_name
+        )
+        assert api_task.function == 'op_path'
+        assert api_task.actor == node
+        assert api_task.arguments['test_input'].value is True
+        assert api_task.retry_interval == retry_interval
+        assert api_task.max_attempts == max_attempts
+        assert api_task.ignore_failure == ignore_failure
+        assert api_task.plugin.name == 'test_plugin'
+
+    def test_source_relationship_operation_task_creation(self, ctx):
+        interface_name = 'test_interface'
+        operation_name = 'preconfigure'
+
+        plugin = mock.models.create_plugin('test_plugin', '0.1')
+        ctx.model.plugin.update(plugin)
+
+        arguments = {'test_input': True}
+
+        interface = mock.models.create_interface(
+            ctx.service,
+            interface_name,
+            operation_name,
+            operation_kwargs=dict(plugin=plugin,
+                                  function='op_path',
+                                  arguments=arguments)
+        )
+
+        relationship = ctx.model.relationship.list()[0]
+        relationship.interfaces[interface.name] = interface
+        max_attempts = 10
+        retry_interval = 10
+
+        with context.workflow.current.push(ctx):
+            api_task = api.task.OperationTask(
+                relationship,
+                interface_name=interface_name,
+                operation_name=operation_name,
+                arguments=arguments,
+                max_attempts=max_attempts,
+                retry_interval=retry_interval)
+
+        assert api_task.name == api.task.OperationTask.NAME_FORMAT.format(
+            type='relationship',
+            name=relationship.name,
+            interface=interface_name,
+            operation=operation_name
+        )
+        assert api_task.function == 'op_path'
+        assert api_task.actor == relationship
+        assert api_task.arguments['test_input'].value is True
+        assert api_task.retry_interval == retry_interval
+        assert api_task.max_attempts == max_attempts
+        assert api_task.plugin.name == 'test_plugin'
+
+    def test_target_relationship_operation_task_creation(self, ctx):
+        interface_name = 'test_interface'
+        operation_name = 'preconfigure'
+
+        plugin = mock.models.create_plugin('test_plugin', '0.1')
+        ctx.model.node.update(plugin)
+
+        arguments = {'test_input': True}
+
+        interface = mock.models.create_interface(
+            ctx.service,
+            interface_name,
+            operation_name,
+            operation_kwargs=dict(plugin=plugin,
+                                  function='op_path',
+                                  arguments=arguments)
+        )
+
+        relationship = ctx.model.relationship.list()[0]
+        relationship.interfaces[interface.name] = interface
+        max_attempts = 10
+        retry_interval = 10
+
+        with context.workflow.current.push(ctx):
+            api_task = api.task.OperationTask(
+                relationship,
+                interface_name=interface_name,
+                operation_name=operation_name,
+                arguments=arguments,
+                max_attempts=max_attempts,
+                retry_interval=retry_interval)
+
+        assert api_task.name == api.task.OperationTask.NAME_FORMAT.format(
+            type='relationship',
+            name=relationship.name,
+            interface=interface_name,
+            operation=operation_name
+        )
+        assert api_task.function == 'op_path'
+        assert api_task.actor == relationship
+        assert api_task.arguments['test_input'].value is True
+        assert api_task.retry_interval == retry_interval
+        assert api_task.max_attempts == max_attempts
+        assert api_task.plugin.name == 'test_plugin'
+
+    def test_operation_task_default_values(self, ctx):
+        interface_name = 'test_interface'
+        operation_name = 'create'
+
+        plugin = mock.models.create_plugin('package', '0.1')
+        ctx.model.node.update(plugin)
+
+        dependency_node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
+
+        interface = mock.models.create_interface(
+            ctx.service,
+            interface_name,
+            operation_name,
+            operation_kwargs=dict(plugin=plugin,
+                                  function='op_path'))
+        dependency_node.interfaces[interface_name] = interface
+
+        with context.workflow.current.push(ctx):
+            task = api.task.OperationTask(
+                dependency_node,
+                interface_name=interface_name,
+                operation_name=operation_name)
+
+        assert task.arguments == {}
+        assert task.retry_interval == ctx._task_retry_interval
+        assert task.max_attempts == ctx._task_max_attempts
+        assert task.ignore_failure == ctx._task_ignore_failure
+        assert task.plugin is plugin
+
+
+class TestWorkflowTask(object):
+
+    def test_workflow_task_creation(self, ctx):
+
+        workspace = {}
+
+        mock_class = type('mock_class', (object,), {'test_attribute': True})
+
+        def sub_workflow(**kwargs):
+            workspace.update(kwargs)
+            return mock_class
+
+        with context.workflow.current.push(ctx):
+            workflow_task = api.task.WorkflowTask(sub_workflow, kwarg='workflow_kwarg')
+            assert workflow_task.graph is mock_class
+            assert workflow_task.test_attribute is True