vFW and vDNS support added to azure-plugin
[multicloud/azure.git] / azure / aria / aria-extension-cloudify / src / aria / tests / orchestrator / workflows / api / test_task.py
1 # Licensed to the Apache Software Foundation (ASF) under one or more
2 # contributor license agreements.  See the NOTICE file distributed with
3 # this work for additional information regarding copyright ownership.
4 # The ASF licenses this file to You under the Apache License, Version 2.0
5 # (the "License"); you may not use this file except in compliance with
6 # the License.  You may obtain a copy of the License at
7 #
8 #     http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15
16
17 import pytest
18
19 from aria.orchestrator import context
20 from aria.orchestrator.workflows import api
21
22 from tests import mock, storage
23
24
@pytest.fixture
def ctx(tmpdir):
    """
    Provide a simple mock workflow context (created with ``inmemory=False``) that already
    has one execution stored for its service.

    The mock service contains the following graph:
    dependency_node <------ dependent_node

    After the requesting test finishes, the context's model storage is handed to
    ``storage.release_sqlite_storage``.
    """
    workflow_context = mock.context.simple(str(tmpdir), inmemory=False)
    execution = mock.models.create_execution(workflow_context.service)
    workflow_context.model.execution.put(execution)
    yield workflow_context
    storage.release_sqlite_storage(workflow_context.model)
36
37
class TestOperationTask(object):
    """
    Tests for creating ``api.task.OperationTask`` objects for node and relationship actors.
    """

    @staticmethod
    def _create_plugin_and_interface(ctx, plugin_name, interface_name, operation_name,
                                     arguments=None):
        """
        Store a mock plugin and build a mock interface whose operation refers to it.

        The plugin is always persisted through the plugin model API so that every test in
        this class prepares its storage the same way.

        :param ctx: workflow context supplied by the ``ctx`` fixture
        :param plugin_name: name given to the mock plugin (its version is always ``'0.1'``)
        :param interface_name: interface name passed to ``mock.models.create_interface``
        :param operation_name: operation name passed to ``mock.models.create_interface``
        :param arguments: optional operation arguments; when ``None`` the ``arguments`` key
                          is left out of the operation kwargs entirely
        :return: ``(plugin, interface)`` tuple
        """
        plugin = mock.models.create_plugin(plugin_name, '0.1')
        ctx.model.plugin.update(plugin)

        operation_kwargs = dict(plugin=plugin, function='op_path')
        if arguments is not None:
            operation_kwargs['arguments'] = arguments

        interface = mock.models.create_interface(
            ctx.service,
            interface_name,
            operation_name,
            operation_kwargs=operation_kwargs)
        return plugin, interface

    def _assert_relationship_operation_task(self, ctx):
        """
        Build an operation task for the first stored relationship and verify its attributes.

        Shared by the source and target relationship tests, which both pass the
        relationship itself as the task's actor.

        :param ctx: workflow context supplied by the ``ctx`` fixture
        """
        interface_name = 'test_interface'
        operation_name = 'preconfigure'
        arguments = {'test_input': True}

        _, interface = self._create_plugin_and_interface(
            ctx, 'test_plugin', interface_name, operation_name, arguments=arguments)

        relationship = ctx.model.relationship.list()[0]
        relationship.interfaces[interface.name] = interface
        max_attempts = 10
        retry_interval = 10

        with context.workflow.current.push(ctx):
            api_task = api.task.OperationTask(
                relationship,
                interface_name=interface_name,
                operation_name=operation_name,
                arguments=arguments,
                max_attempts=max_attempts,
                retry_interval=retry_interval)

        assert api_task.name == api.task.OperationTask.NAME_FORMAT.format(
            type='relationship',
            name=relationship.name,
            interface=interface_name,
            operation=operation_name
        )
        assert api_task.function == 'op_path'
        assert api_task.actor == relationship
        assert api_task.arguments['test_input'].value is True
        assert api_task.retry_interval == retry_interval
        assert api_task.max_attempts == max_attempts
        assert api_task.plugin.name == 'test_plugin'

    def test_node_operation_task_creation(self, ctx):
        """An operation task built for a node carries every explicitly supplied value."""
        interface_name = 'test_interface'
        operation_name = 'create'
        arguments = {'test_input': True}

        _, interface = self._create_plugin_and_interface(
            ctx, 'test_plugin', interface_name, operation_name, arguments=arguments)

        node = ctx.model.node.get_by_name(mock.models.DEPENDENT_NODE_NAME)
        node.interfaces[interface_name] = interface
        ctx.model.node.update(node)
        max_attempts = 10
        retry_interval = 10
        ignore_failure = True

        with context.workflow.current.push(ctx):
            api_task = api.task.OperationTask(
                node,
                interface_name=interface_name,
                operation_name=operation_name,
                arguments=arguments,
                max_attempts=max_attempts,
                retry_interval=retry_interval,
                ignore_failure=ignore_failure)

        assert api_task.name == api.task.OperationTask.NAME_FORMAT.format(
            type='node',
            name=node.name,
            interface=interface_name,
            operation=operation_name
        )
        assert api_task.function == 'op_path'
        assert api_task.actor == node
        assert api_task.arguments['test_input'].value is True
        assert api_task.retry_interval == retry_interval
        assert api_task.max_attempts == max_attempts
        assert api_task.ignore_failure == ignore_failure
        assert api_task.plugin.name == 'test_plugin'

    def test_source_relationship_operation_task_creation(self, ctx):
        """An operation task built for a relationship carries every supplied value."""
        self._assert_relationship_operation_task(ctx)

    def test_target_relationship_operation_task_creation(self, ctx):
        """Same check as the source relationship test; kept under its own test name."""
        self._assert_relationship_operation_task(ctx)

    def test_operation_task_default_values(self, ctx):
        """Values not passed to the task fall back to the workflow context's defaults."""
        interface_name = 'test_interface'
        operation_name = 'create'

        plugin, interface = self._create_plugin_and_interface(
            ctx, 'package', interface_name, operation_name)

        dependency_node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
        dependency_node.interfaces[interface_name] = interface

        with context.workflow.current.push(ctx):
            task = api.task.OperationTask(
                dependency_node,
                interface_name=interface_name,
                operation_name=operation_name)

        assert task.arguments == {}
        assert task.retry_interval == ctx._task_retry_interval
        assert task.max_attempts == ctx._task_max_attempts
        assert task.ignore_failure == ctx._task_ignore_failure
        assert task.plugin is plugin
206
207
class TestWorkflowTask(object):
    """
    Tests for creating ``api.task.WorkflowTask`` objects from a workflow function.
    """

    def test_workflow_task_creation(self, ctx):
        """
        The value returned by the wrapped workflow function becomes the task's graph, and
        the graph's attributes are readable through the task.
        """
        received_kwargs = {}
        graph_stub = type('mock_class', (object,), {'test_attribute': True})

        def sub_workflow(**kwargs):
            # Keep whatever the task passes in, then hand back the stand-in graph.
            received_kwargs.update(kwargs)
            return graph_stub

        with context.workflow.current.push(ctx):
            task = api.task.WorkflowTask(sub_workflow, kwarg='workflow_kwarg')
            assert task.graph is graph_stub
            assert task.test_attribute is True