1 # Licensed to the Apache Software Foundation (ASF) under one or more
2 # contributor license agreements. See the NOTICE file distributed with
3 # this work for additional information regarding copyright ownership.
4 # The ASF licenses this file to You under the Apache License, Version 2.0
5 # (the "License"); you may not use this file except in compliance with
6 # the License. You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
18 from aria import workflow, operation
19 from aria.modeling import models
20 from aria.orchestrator import context
21 from aria.orchestrator.workflows import api
22 from aria.orchestrator.workflows.executor import thread
# Builds a mock workflow context from ``mock.context.simple`` using pytest's
# ``tmpdir`` (converted to ``str``), then hands the context's ``model`` to
# ``storage.release_sqlite_storage``.
# NOTE(review): the test functions in this file take a ``workflow_context``
# parameter, so this is presumably a pytest fixture whose decorator and
# yield/return line are not visible here -- confirm.
36 def workflow_context(tmpdir):
# NOTE: this local name shadows the ``context`` module imported from
# ``aria.orchestrator`` for the duration of this function.
37 context = mock.context.simple(str(tmpdir))
39 storage.release_sqlite_storage(context.model)
# Instantiates ``thread.ThreadExecutor`` (from aria.orchestrator.workflows.executor).
# NOTE(review): the enclosing definition is not visible here; presumably this is
# the body of the ``executor`` fixture that the tests below request -- confirm,
# and confirm how/where ``result`` is handed out and cleaned up.
44 result = thread.ThreadExecutor()
# Creates a ``helpers.FilesystemDataHolder`` whose path is the 'dataholder'
# entry inside pytest's ``tmpdir``.
# NOTE(review): the tests read ``dataholder.path`` and call ``dataholder.get``,
# so ``holder`` is presumably returned (or yielded) on a line not visible
# here -- confirm.
52 def dataholder(tmpdir):
# ``tmpdir.join`` result is converted to ``str`` before being given to the holder.
53 dataholder_path = str(tmpdir.join('dataholder'))
54 holder = helpers.FilesystemDataHolder(dataholder_path)
def _get_elements(workflow_context):
    """
    Set host links on the mock templates/nodes and return the elements the tests use.

    The following links are written to the model storage:

    * the dependency node template is its own host;
    * the dependency node's ``host_fk`` is its own ``id``;
    * the dependent node template is hosted on the dependency node template;
    * the dependent node's ``host`` is set to itself.

    :param workflow_context: context whose ``model`` storage holds the mock elements
    :return: 5-tuple ``(dependency_node_template, dependency_node,
             dependent_node_template, dependent_node, relationship)`` where
             ``relationship`` is the first entry of ``model.relationship.list()``
    """
    model = workflow_context.model

    dependency_node_template = model.node_template.get_by_name(
        mock.models.DEPENDENCY_NODE_TEMPLATE_NAME)
    dependency_node_template.host = dependency_node_template
    # Templates are persisted through the node_template API (the one they were
    # loaded from), matching how the dependent template is updated below.
    model.node_template.update(dependency_node_template)

    dependency_node = model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
    dependency_node.host_fk = dependency_node.id
    model.node.update(dependency_node)

    dependent_node_template = model.node_template.get_by_name(
        mock.models.DEPENDENT_NODE_TEMPLATE_NAME)
    dependent_node_template.host = dependency_node_template
    model.node_template.update(dependent_node_template)

    dependent_node = model.node.get_by_name(mock.models.DEPENDENT_NODE_NAME)
    dependent_node.host = dependent_node
    model.node.update(dependent_node)

    relationship = model.relationship.list()[0]
    return (dependency_node_template, dependency_node, dependent_node_template, dependent_node,
            relationship)
# Runs the ``host_ip`` operation on the dependency node's 'Standard'/'create'
# interface operation and asserts that the value stored under 'host_ip' in the
# data holder equals the node's 'ip' attribute value.
84 def test_host_ip(workflow_context, executor, dataholder):
86 interface_name = 'Standard'
87 operation_name = 'create'
# Only the dependency node (second element) is needed from the helper's 5-tuple.
88 _, dependency_node, _, _, _ = _get_elements(workflow_context)
# 'holder_path' is the keyword the ``host_ip`` operation declares; any other key
# is absorbed by its ``**_``.
# NOTE(review): 'putput' looks like a typo for 'output' -- confirm before renaming.
89 arguments = {'putput': True, 'holder_path': dataholder.path}
# Interface whose operation points at this module's ``host_ip`` function.
90 interface = mock.models.create_interface(
91 dependency_node.service,
92 interface_name=interface_name,
93 operation_name=operation_name,
94 operation_kwargs=dict(function=op_path(host_ip, module_path=__name__), arguments=arguments)
96 dependency_node.interfaces[interface.name] = interface
# This is the 'ip' attribute the final assertion compares against.
97 dependency_node.attributes['ip'] = models.Attribute.wrap('ip', '1.1.1.1')
# Persist the interface and attribute changes on the node.
99 workflow_context.model.node.update(dependency_node)
# Workflow function passed to ``execute`` below.
# NOTE(review): its decorator and the call wrapping ``OperationTask`` (including
# the task's first argument) are not visible here.
102 def basic_workflow(graph, **_):
104 api.task.OperationTask(
106 interface_name=interface_name,
107 operation_name=operation_name,
112 execute(workflow_func=basic_workflow, workflow_context=workflow_context, executor=executor)
# ``host_ip`` stored ``toolbelt.host_ip`` under the 'host_ip' key.
114 assert dataholder.get('host_ip') == dependency_node.attributes.get('ip').value
# Runs ``relationship_operation`` on the relationship's 'Configure'/'post_configure'
# interface operation and asserts that the value stored in the data holder under
# the formatted task name equals the relationship's source node name.
117 def test_relationship_tool_belt(workflow_context, executor, dataholder):
118 interface_name = 'Configure'
119 operation_name = 'post_configure'
# Only the relationship (fifth element) is needed from the helper's 5-tuple.
120 _, _, _, _, relationship = _get_elements(workflow_context)
# 'holder_path' is the keyword ``relationship_operation`` declares; any other key
# is absorbed by its ``**_``.
# NOTE(review): 'putput' looks like a typo for 'output' -- confirm before renaming.
121 arguments = {'putput': True, 'holder_path': dataholder.path}
# Interface whose operation points at this module's ``relationship_operation``.
# NOTE(review): the remaining ``operation_kwargs`` entries and the closing
# parentheses are not visible here.
122 interface = mock.models.create_interface(
123 relationship.source_node.service,
124 interface_name=interface_name,
125 operation_name=operation_name,
126 operation_kwargs=dict(function=op_path(relationship_operation, module_path=__name__),
129 relationship.interfaces[interface.name] = interface
# Persist the interface change on the relationship.
130 workflow_context.model.relationship.update(relationship)
# Workflow function passed to ``execute`` below.
# NOTE(review): its decorator and the call wrapping ``OperationTask`` (including
# the task's first argument) are not visible here.
133 def basic_workflow(graph, **_):
135 api.task.OperationTask(
137 interface_name=interface_name,
138 operation_name=operation_name,
143 execute(workflow_func=basic_workflow, workflow_context=workflow_context, executor=executor)
# The lookup key is built from ``OperationTask.NAME_FORMAT``; the operation stores
# its value under ``ctx.name``, so the two are presumably identical -- confirm.
145 assert dataholder.get(api.task.OperationTask.NAME_FORMAT.format(
147 name=relationship.name,
148 interface=interface_name,
149 operation=operation_name)) == relationship.source_node.name
def test_wrong_model_toolbelt():
    """``context.toolbelt`` must raise ``RuntimeError`` when it is handed ``None``."""
    unsupported_context = None
    with pytest.raises(RuntimeError):
        context.toolbelt(unsupported_context)
@operation(toolbelt=True)
def host_ip(toolbelt, holder_path, **_):
    """
    Store ``toolbelt.host_ip`` under the ``'host_ip'`` key of the data holder.

    :param toolbelt: toolbelt injected because of ``@operation(toolbelt=True)``
    :param holder_path: path given to ``helpers.FilesystemDataHolder``
    """
    # Read the value first, then open the holder: same evaluation order as a
    # single subscript-assignment statement.
    resolved_ip = toolbelt.host_ip
    holder = helpers.FilesystemDataHolder(holder_path)
    holder['host_ip'] = resolved_ip
@operation(toolbelt=True)
def relationship_operation(ctx, toolbelt, holder_path, **_):
    """
    Store the source node's name in the data holder, keyed by ``ctx.name``.

    The name is read through the toolbelt's ``_op_context``.

    :param ctx: operation context; its ``name`` is used as the storage key
    :param toolbelt: toolbelt injected because of ``@operation(toolbelt=True)``
    :param holder_path: path given to ``helpers.FilesystemDataHolder``
    """
    # Read the value first, then open the holder, then resolve the key: same
    # evaluation order as a single subscript-assignment statement.
    source_node_name = toolbelt._op_context.source_node.name
    holder = helpers.FilesystemDataHolder(holder_path)
    holder[ctx.name] = source_node_name