vFW and vDNS support added to azure-plugin
[multicloud/azure.git] / azure / aria / aria-extension-cloudify / src / aria / tests / orchestrator / context / test_toolbelt.py
1 # Licensed to the Apache Software Foundation (ASF) under one or more
2 # contributor license agreements.  See the NOTICE file distributed with
3 # this work for additional information regarding copyright ownership.
4 # The ASF licenses this file to You under the Apache License, Version 2.0
5 # (the "License"); you may not use this file except in compliance with
6 # the License.  You may obtain a copy of the License at
7 #
8 #     http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15
16 import pytest
17
18 from aria import workflow, operation
19 from aria.modeling import models
20 from aria.orchestrator import context
21 from aria.orchestrator.workflows import api
22 from aria.orchestrator.workflows.executor import thread
23
24 from tests import (
25     mock,
26     storage,
27     helpers
28 )
29 from . import (
30     op_path,
31     execute,
32 )
33
34
@pytest.fixture
def workflow_context(tmpdir):
    """Yield the context built by ``mock.context.simple`` for ``tmpdir``.

    Once the test is over, the sqlite storage behind the context's model is
    released through ``storage.release_sqlite_storage``.
    """
    # Named ``ctx`` so the module-level ``context`` import stays visible.
    ctx = mock.context.simple(str(tmpdir))
    yield ctx
    storage.release_sqlite_storage(ctx.model)
40
41
@pytest.fixture
def executor():
    """Yield a ``thread.ThreadExecutor`` and close it when the test is done."""
    thread_executor = thread.ThreadExecutor()
    try:
        yield thread_executor
    finally:
        # Always close the executor, whatever happened while it was in use.
        thread_executor.close()
49
50
@pytest.fixture
def dataholder(tmpdir):
    """Return a ``helpers.FilesystemDataHolder`` located at ``tmpdir/dataholder``."""
    return helpers.FilesystemDataHolder(str(tmpdir.join('dataholder')))
56
57
def _get_elements(workflow_context):
    """Fetch the mock dependency/dependent models and assign their hosts.

    The following changes are made and persisted through the context's model
    storage before the elements are returned:

    * the dependency node template becomes its own host;
    * the dependency node's ``host_fk`` is set to its own id;
    * the dependent node template is hosted on the dependency node template;
    * the dependent node becomes its own host.

    :param workflow_context: context whose ``model`` holds the mock models
    :return: tuple ``(dependency_node_template, dependency_node,
             dependent_node_template, dependent_node, relationship)`` where
             ``relationship`` is the first entry of ``model.relationship.list()``
    """
    model = workflow_context.model

    dependency_node_template = model.node_template.get_by_name(
        mock.models.DEPENDENCY_NODE_TEMPLATE_NAME)
    dependency_node_template.host = dependency_node_template
    # A template is persisted through the ``node_template`` handler (not the
    # ``node`` handler), the same way the dependent template is stored below.
    model.node_template.update(dependency_node_template)

    dependency_node = model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
    dependency_node.host_fk = dependency_node.id
    model.node.update(dependency_node)

    dependent_node_template = model.node_template.get_by_name(
        mock.models.DEPENDENT_NODE_TEMPLATE_NAME)
    dependent_node_template.host = dependency_node_template
    model.node_template.update(dependent_node_template)

    dependent_node = model.node.get_by_name(mock.models.DEPENDENT_NODE_NAME)
    dependent_node.host = dependent_node
    model.node.update(dependent_node)

    relationship = model.relationship.list()[0]
    return (
        dependency_node_template,
        dependency_node,
        dependent_node_template,
        dependent_node,
        relationship,
    )
82
83
def test_host_ip(workflow_context, executor, dataholder):
    """Run an operation that records ``toolbelt.host_ip`` and compare it to the node's ip.

    The ``host_ip`` operation is attached to the dependency node's
    ``Standard.create`` interface operation; after the workflow has been
    executed, the value written to the data holder must equal the node's
    ``ip`` attribute.
    """
    interface_name = 'Standard'
    operation_name = 'create'
    dependency_node = _get_elements(workflow_context)[1]
    arguments = {'putput': True, 'holder_path': dataholder.path}
    operation_kwargs = {
        'function': op_path(host_ip, module_path=__name__),
        'arguments': arguments,
    }
    interface = mock.models.create_interface(
        dependency_node.service,
        interface_name=interface_name,
        operation_name=operation_name,
        operation_kwargs=operation_kwargs
    )
    dependency_node.interfaces[interface.name] = interface
    dependency_node.attributes['ip'] = models.Attribute.wrap('ip', '1.1.1.1')
    workflow_context.model.node.update(dependency_node)

    @workflow
    def basic_workflow(graph, **_):
        # Single-task workflow: just the operation under test.
        task = api.task.OperationTask(
            dependency_node,
            interface_name=interface_name,
            operation_name=operation_name,
            arguments=arguments
        )
        graph.add_tasks(task)

    execute(workflow_func=basic_workflow, workflow_context=workflow_context, executor=executor)

    recorded_ip = dataholder.get('host_ip')
    expected_ip = dependency_node.attributes.get('ip').value
    assert recorded_ip == expected_ip
115
116
def test_relationship_tool_belt(workflow_context, executor, dataholder):
    """Run a relationship operation and check it recorded the source node's name.

    ``relationship_operation`` is attached to the relationship's
    ``Configure.post_configure`` interface operation; after execution the
    data holder entry keyed by the task name must equal the name of the
    relationship's source node.
    """
    interface_name = 'Configure'
    operation_name = 'post_configure'
    relationship = _get_elements(workflow_context)[4]
    arguments = {'putput': True, 'holder_path': dataholder.path}
    operation_kwargs = {
        'function': op_path(relationship_operation, module_path=__name__),
        'arguments': arguments,
    }
    interface = mock.models.create_interface(
        relationship.source_node.service,
        interface_name=interface_name,
        operation_name=operation_name,
        operation_kwargs=operation_kwargs
    )
    relationship.interfaces[interface.name] = interface
    workflow_context.model.relationship.update(relationship)

    @workflow
    def basic_workflow(graph, **_):
        # Single-task workflow: just the relationship operation under test.
        task = api.task.OperationTask(
            relationship,
            interface_name=interface_name,
            operation_name=operation_name,
            arguments=arguments
        )
        graph.add_tasks(task)

    execute(workflow_func=basic_workflow, workflow_context=workflow_context, executor=executor)

    # The operation stores its result under ``ctx.name``; rebuild that key here.
    task_name = api.task.OperationTask.NAME_FORMAT.format(
        type='relationship',
        name=relationship.name,
        interface=interface_name,
        operation=operation_name
    )
    assert dataholder.get(task_name) == relationship.source_node.name
150
151
def test_wrong_model_toolbelt():
    """``context.toolbelt`` must raise ``RuntimeError`` when handed ``None``."""
    unsupported_context = None
    with pytest.raises(RuntimeError):
        context.toolbelt(unsupported_context)
155
156
@operation(toolbelt=True)
def host_ip(toolbelt, holder_path, **_):
    """Write ``toolbelt.host_ip`` to the data holder at ``holder_path``.

    The value is stored under the ``'host_ip'`` key.
    """
    # Read the ip first, then open the holder, mirroring assignment order.
    resolved_ip = toolbelt.host_ip
    holder = helpers.FilesystemDataHolder(holder_path)
    holder['host_ip'] = resolved_ip
160
161
@operation(toolbelt=True)
def relationship_operation(ctx, toolbelt, holder_path, **_):
    """Record the source node's name in the data holder at ``holder_path``.

    The name is read from ``toolbelt._op_context.source_node`` and stored
    under the key ``ctx.name``.
    """
    # Resolve the value before opening the holder, mirroring assignment order.
    source_node_name = toolbelt._op_context.source_node.name
    holder = helpers.FilesystemDataHolder(holder_path)
    holder[ctx.name] = source_node_name