vFW and vDNS support added to azure-plugin
[multicloud/azure.git] / azure / aria / aria-extension-cloudify / src / aria / tests / orchestrator / context / test_serialize.py
1 # Licensed to the Apache Software Foundation (ASF) under one or more
2 # contributor license agreements.  See the NOTICE file distributed with
3 # this work for additional information regarding copyright ownership.
4 # The ASF licenses this file to You under the Apache License, Version 2.0
5 # (the "License"); you may not use this file except in compliance with
6 # the License.  You may obtain a copy of the License at
7 #
8 #     http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15
16 import pytest
17
18 from aria.orchestrator.workflows import api
19 from aria.orchestrator.workflows.core import engine, graph_compiler
20 from aria.orchestrator.workflows.executor import process
21 from aria.orchestrator import workflow, operation
22 import tests
23 from tests import mock
24 from tests import storage
25
26 TEST_FILE_CONTENT = 'CONTENT'
27 TEST_FILE_ENTRY_ID = 'entry'
28 TEST_FILE_NAME = 'test_file'
29
30
31 def test_serialize_operation_context(context, executor, tmpdir):
32     test_file = tmpdir.join(TEST_FILE_NAME)
33     test_file.write(TEST_FILE_CONTENT)
34     resource = context.resource
35     resource.service_template.upload(TEST_FILE_ENTRY_ID, str(test_file))
36
37     node = context.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
38     plugin = mock.models.create_plugin()
39     context.model.plugin.put(plugin)
40     interface = mock.models.create_interface(
41         node.service,
42         'test',
43         'op',
44         operation_kwargs=dict(function=_operation_mapping(),
45                               plugin=plugin)
46     )
47     node.interfaces[interface.name] = interface
48     context.model.node.update(node)
49
50     graph = _mock_workflow(ctx=context)  # pylint: disable=no-value-for-parameter
51     graph_compiler.GraphCompiler(context, executor.__class__).compile(graph)
52     eng = engine.Engine({executor.__class__: executor})
53     eng.execute(context)
54
55
56 @workflow
57 def _mock_workflow(ctx, graph):
58     node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
59     graph.add_tasks(api.task.OperationTask(node, interface_name='test', operation_name='op'))
60     return graph
61
62
63 @operation
64 def _mock_operation(ctx):
65     # We test several things in this operation
66     # ctx.task, ctx.node, etc... tell us that the model storage was properly re-created
67     # a correct ctx.task.function tells us we kept the correct task_id
68     assert ctx.task.function == _operation_mapping()
69     # a correct ctx.node.name tells us we kept the correct actor_id
70     assert ctx.node.name == mock.models.DEPENDENCY_NODE_NAME
71     # a correct ctx.name tells us we kept the correct name
72     assert ctx.name is not None
73     assert ctx.name == ctx.task.name
74     # a correct ctx.deployment.name tells us we kept the correct deployment_id
75     assert ctx.service.name == mock.models.SERVICE_NAME
76     # Here we test that the resource storage was properly re-created
77     test_file_content = ctx.resource.service_template.read(TEST_FILE_ENTRY_ID, TEST_FILE_NAME)
78     assert test_file_content == TEST_FILE_CONTENT
79     # a non empty plugin workdir tells us that we kept the correct base_workdir
80     assert ctx.plugin_workdir is not None
81
82
83 def _operation_mapping():
84     return '{name}.{func.__name__}'.format(name=__name__, func=_mock_operation)
85
86
87 @pytest.fixture
88 def executor():
89     result = process.ProcessExecutor(python_path=[tests.ROOT_DIR])
90     try:
91         yield result
92     finally:
93         result.close()
94
95
96 @pytest.fixture
97 def context(tmpdir):
98     result = mock.context.simple(
99         str(tmpdir),
100         context_kwargs=dict(workdir=str(tmpdir.join('workdir')))
101     )
102
103     yield result
104     storage.release_sqlite_storage(result.model)