vFW and vDNS support added to azure-plugin
[multicloud/azure.git] / azure / aria / aria-extension-cloudify / src / aria / tests / orchestrator / context / test_serialize.py
diff --git a/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/context/test_serialize.py b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/context/test_serialize.py
new file mode 100644 (file)
index 0000000..091e23c
--- /dev/null
@@ -0,0 +1,104 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import pytest
+
+from aria.orchestrator.workflows import api
+from aria.orchestrator.workflows.core import engine, graph_compiler
+from aria.orchestrator.workflows.executor import process
+from aria.orchestrator import workflow, operation
+import tests
+from tests import mock
+from tests import storage
+
+TEST_FILE_CONTENT = 'CONTENT'
+TEST_FILE_ENTRY_ID = 'entry'
+TEST_FILE_NAME = 'test_file'
+
+
+def test_serialize_operation_context(context, executor, tmpdir):
+    test_file = tmpdir.join(TEST_FILE_NAME)
+    test_file.write(TEST_FILE_CONTENT)
+    resource = context.resource
+    resource.service_template.upload(TEST_FILE_ENTRY_ID, str(test_file))
+
+    node = context.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
+    plugin = mock.models.create_plugin()
+    context.model.plugin.put(plugin)
+    interface = mock.models.create_interface(
+        node.service,
+        'test',
+        'op',
+        operation_kwargs=dict(function=_operation_mapping(),
+                              plugin=plugin)
+    )
+    node.interfaces[interface.name] = interface
+    context.model.node.update(node)
+
+    graph = _mock_workflow(ctx=context)  # pylint: disable=no-value-for-parameter
+    graph_compiler.GraphCompiler(context, executor.__class__).compile(graph)
+    eng = engine.Engine({executor.__class__: executor})
+    eng.execute(context)
+
+
+@workflow
+def _mock_workflow(ctx, graph):
+    node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
+    graph.add_tasks(api.task.OperationTask(node, interface_name='test', operation_name='op'))
+    return graph
+
+
+@operation
+def _mock_operation(ctx):
+    # We test several things in this operation
+    # ctx.task, ctx.node, etc... tell us that the model storage was properly re-created
+    # a correct ctx.task.function tells us we kept the correct task_id
+    assert ctx.task.function == _operation_mapping()
+    # a correct ctx.node.name tells us we kept the correct actor_id
+    assert ctx.node.name == mock.models.DEPENDENCY_NODE_NAME
+    # a correct ctx.name tells us we kept the correct name
+    assert ctx.name is not None
+    assert ctx.name == ctx.task.name
+    # a correct ctx.deployment.name tells us we kept the correct deployment_id
+    assert ctx.service.name == mock.models.SERVICE_NAME
+    # Here we test that the resource storage was properly re-created
+    test_file_content = ctx.resource.service_template.read(TEST_FILE_ENTRY_ID, TEST_FILE_NAME)
+    assert test_file_content == TEST_FILE_CONTENT
+    # a non empty plugin workdir tells us that we kept the correct base_workdir
+    assert ctx.plugin_workdir is not None
+
+
+def _operation_mapping():
+    return '{name}.{func.__name__}'.format(name=__name__, func=_mock_operation)
+
+
+@pytest.fixture
+def executor():
+    result = process.ProcessExecutor(python_path=[tests.ROOT_DIR])
+    try:
+        yield result
+    finally:
+        result.close()
+
+
+@pytest.fixture
+def context(tmpdir):
+    result = mock.context.simple(
+        str(tmpdir),
+        context_kwargs=dict(workdir=str(tmpdir.join('workdir')))
+    )
+
+    yield result
+    storage.release_sqlite_storage(result.model)