1 # Licensed to the Apache Software Foundation (ASF) under one or more
2 # contributor license agreements. See the NOTICE file distributed with
3 # this work for additional information regarding copyright ownership.
4 # The ASF licenses this file to You under the Apache License, Version 2.0
5 # (the "License"); you may not use this file except in compliance with
6 # the License. You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
19 from aria.orchestrator import context
20 from aria.orchestrator.workflows import api
22 from tests import mock, storage
28 Create the following graph in storage:
29 dependency_node <------ dependent_node
32 simple_context = mock.context.simple(str(tmpdir), inmemory=False)
33 simple_context.model.execution.put(mock.models.create_execution(simple_context.service))
35 storage.release_sqlite_storage(simple_context.model)
class TestOperationTask(object):
    """Tests for building ``api.task.OperationTask`` objects for nodes and relationships."""

    # NOTE(review): the reviewed excerpt of this class was missing several
    # lines: the positional arguments of ``create_interface``, the ``function``
    # operation kwarg, the retry-setting assignments, the actor handed to
    # ``OperationTask`` and the ``type`` field given to ``NAME_FORMAT``. They
    # are reconstructed below from what the assertions require -- confirm them
    # against the original file before merging.

    @staticmethod
    def _create_plugin(ctx, name):
        """Create a mock plugin called ``name`` and persist it.

        The plugin is stored through the plugin model API in every test; some
        tests previously pushed it through the node model API instead.
        """
        plugin = mock.models.create_plugin(name, '0.1')
        ctx.model.plugin.update(plugin)
        return plugin

    @staticmethod
    def _create_interface(ctx, interface_name, operation_name, plugin, arguments=None):
        """Create a mock interface whose single operation runs ``'op_path'`` with ``plugin``.

        ``arguments`` is only forwarded to the operation when given, so tests
        that check the default (empty) task arguments can simply omit it.
        """
        operation_kwargs = dict(plugin=plugin, function='op_path')
        if arguments is not None:
            operation_kwargs['arguments'] = arguments
        return mock.models.create_interface(
            ctx.service,
            interface_name,
            operation_name,
            operation_kwargs=operation_kwargs)

    def test_node_operation_task_creation(self, ctx):
        """A task created for a node exposes the values it was created with."""
        interface_name = 'test_interface'
        operation_name = 'create'
        arguments = {'test_input': True}
        max_attempts = 10
        retry_interval = 10
        ignore_failure = True

        plugin = self._create_plugin(ctx, 'test_plugin')
        interface = self._create_interface(
            ctx, interface_name, operation_name, plugin, arguments=arguments)

        node = ctx.model.node.get_by_name(mock.models.DEPENDENT_NODE_NAME)
        node.interfaces[interface_name] = interface
        ctx.model.node.update(node)

        with context.workflow.current.push(ctx):
            api_task = api.task.OperationTask(
                node,
                interface_name=interface_name,
                operation_name=operation_name,
                arguments=arguments,
                max_attempts=max_attempts,
                retry_interval=retry_interval,
                ignore_failure=ignore_failure)

        assert api_task.name == api.task.OperationTask.NAME_FORMAT.format(
            type='node',
            name=node.name,
            interface=interface_name,
            operation=operation_name
        )
        assert api_task.function == 'op_path'
        assert api_task.actor == node
        assert api_task.arguments['test_input'].value is True
        assert api_task.retry_interval == retry_interval
        assert api_task.max_attempts == max_attempts
        assert api_task.ignore_failure == ignore_failure
        assert api_task.plugin.name == 'test_plugin'

    def test_source_relationship_operation_task_creation(self, ctx):
        """A task created for a relationship exposes the values it was created with."""
        interface_name = 'test_interface'
        operation_name = 'preconfigure'
        arguments = {'test_input': True}
        max_attempts = 10
        retry_interval = 10

        plugin = self._create_plugin(ctx, 'test_plugin')
        interface = self._create_interface(
            ctx, interface_name, operation_name, plugin, arguments=arguments)

        relationship = ctx.model.relationship.list()[0]
        relationship.interfaces[interface.name] = interface

        with context.workflow.current.push(ctx):
            api_task = api.task.OperationTask(
                relationship,
                interface_name=interface_name,
                operation_name=operation_name,
                arguments=arguments,
                max_attempts=max_attempts,
                retry_interval=retry_interval)

        assert api_task.name == api.task.OperationTask.NAME_FORMAT.format(
            type='relationship',
            name=relationship.name,
            interface=interface_name,
            operation=operation_name
        )
        assert api_task.function == 'op_path'
        assert api_task.actor == relationship
        assert api_task.arguments['test_input'].value is True
        assert api_task.retry_interval == retry_interval
        assert api_task.max_attempts == max_attempts
        assert api_task.plugin.name == 'test_plugin'

    def test_target_relationship_operation_task_creation(self, ctx):
        """Same checks as the source-relationship test, kept as a separate test case."""
        interface_name = 'test_interface'
        operation_name = 'preconfigure'
        arguments = {'test_input': True}
        max_attempts = 10
        retry_interval = 10

        plugin = self._create_plugin(ctx, 'test_plugin')
        interface = self._create_interface(
            ctx, interface_name, operation_name, plugin, arguments=arguments)

        relationship = ctx.model.relationship.list()[0]
        relationship.interfaces[interface.name] = interface

        with context.workflow.current.push(ctx):
            api_task = api.task.OperationTask(
                relationship,
                interface_name=interface_name,
                operation_name=operation_name,
                arguments=arguments,
                max_attempts=max_attempts,
                retry_interval=retry_interval)

        assert api_task.name == api.task.OperationTask.NAME_FORMAT.format(
            type='relationship',
            name=relationship.name,
            interface=interface_name,
            operation=operation_name
        )
        assert api_task.function == 'op_path'
        assert api_task.actor == relationship
        assert api_task.arguments['test_input'].value is True
        assert api_task.retry_interval == retry_interval
        assert api_task.max_attempts == max_attempts
        assert api_task.plugin.name == 'test_plugin'

    def test_operation_task_default_values(self, ctx):
        """Settings that are not passed fall back to the workflow context's ``_task_*`` values."""
        interface_name = 'test_interface'
        operation_name = 'create'

        plugin = self._create_plugin(ctx, 'package')

        dependency_node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
        # No operation arguments here: the task is expected to report ``{}``.
        interface = self._create_interface(ctx, interface_name, operation_name, plugin)
        dependency_node.interfaces[interface_name] = interface

        with context.workflow.current.push(ctx):
            task = api.task.OperationTask(
                dependency_node,
                interface_name=interface_name,
                operation_name=operation_name)

        assert task.arguments == {}
        assert task.retry_interval == ctx._task_retry_interval
        assert task.max_attempts == ctx._task_max_attempts
        assert task.ignore_failure == ctx._task_ignore_failure
        assert task.plugin is plugin
class TestWorkflowTask(object):
    """Tests for wrapping a sub-workflow function in ``api.task.WorkflowTask``."""

    def test_workflow_task_creation(self, ctx):
        """The task's ``graph`` is the stub class and its attributes are readable on the task."""
        # NOTE(review): the initialisation of the kwargs store and the return
        # statement of ``sub_workflow`` were not visible in the reviewed
        # excerpt; both are reconstructed from how the names are used below --
        # confirm against the original file.
        received_kwargs = {}

        # Stand-in for a task graph: a bare class carrying one marker attribute.
        graph_stub = type('mock_class', (object,), {'test_attribute': True})

        def sub_workflow(**kwargs):
            # Record whatever the task passes in, then hand back the stub.
            received_kwargs.update(kwargs)
            return graph_stub

        with context.workflow.current.push(ctx):
            task = api.task.WorkflowTask(sub_workflow, kwarg='workflow_kwarg')
            assert task.graph is graph_stub
            assert task.test_attribute is True