1 # Licensed to the Apache Software Foundation (ASF) under one or more
2 # contributor license agreements. See the NOTICE file distributed with
3 # this work for additional information regarding copyright ownership.
4 # The ASF licenses this file to You under the Apache License, Version 2.0
5 # (the "License"); you may not use this file except in compliance with
6 # the License. You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 from datetime import (
22 from aria.orchestrator.context import workflow as workflow_context
23 from aria.orchestrator.workflows import (
27 from aria.modeling import models
29 from tests import mock, storage
# Interface and operation names shared by the setup code and the tests in this module.
# The NODE_* pair is passed as interface_name/operation_name when a node operation
# task is built; the RELATIONSHIP_* pair plays the same role for relationship tasks.
31 NODE_INTERFACE_NAME = 'Standard'
32 NODE_OPERATION_NAME = 'create'
33 RELATIONSHIP_INTERFACE_NAME = 'Configure'
34 RELATIONSHIP_OPERATION_NAME = 'pre_configure'
# Setup for the `ctx` argument consumed by the tests (presumably a pytest fixture
# taking `tmpdir`; the decorator and def line are not visible here -- confirm).
# Create a simple mock context, handing it the tmpdir path as a string.
39 context = mock.context.simple(str(tmpdir))
# Take the first relationship in the model and register on it an interface built from
# the RELATIONSHIP_* names, whose operation is created with function='test'.
41 relationship = context.model.relationship.list()[0]
42 interface = mock.models.create_interface(
43 relationship.source_node.service,
44 RELATIONSHIP_INTERFACE_NAME,
45 RELATIONSHIP_OPERATION_NAME,
46 operation_kwargs=dict(function='test')
48 relationship.interfaces[interface.name] = interface
# Persist the modified relationship back into the model storage.
49 context.model.relationship.update(relationship)
# Do the same for the dependency node: build an interface, attach it by name, persist.
# NOTE(review): the positional arguments of this create_interface call are not visible
# here -- presumably the node's service plus the NODE_* names, mirroring the
# relationship call above; confirm.
51 node = context.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
52 interface = mock.models.create_interface(
56 operation_kwargs=dict(function='test')
58 node.interfaces[interface.name] = interface
59 context.model.node.update(node)
# Teardown: hand the context's model to storage.release_sqlite_storage.
# NOTE(review): presumably the context is yielded to the test before this line runs;
# that statement is not visible here -- confirm.
62 storage.release_sqlite_storage(context.model)
# Tests covering the conversion of API-level operation tasks (api.task.OperationTask)
# into model-level tasks via models.Task.from_api_task.
65 class TestOperationTask(object):
# Helper: with `ctx` pushed as the current workflow context, build an API operation
# task using the NODE_* interface/operation names and convert it into a model task.
# Returns the pair (api_task, model_task).
# NOTE(review): the actor argument of OperationTask (presumably `node`) is not visible
# in the call below -- confirm.
67 def _create_node_operation_task(self, ctx, node):
68 with workflow_context.current.push(ctx):
69 api_task = api.task.OperationTask(
71 interface_name=NODE_INTERFACE_NAME,
72 operation_name=NODE_OPERATION_NAME)
# The second argument to from_api_task is None; its meaning is not shown in this file
# (presumably an executor -- TODO confirm).
73 model_task = models.Task.from_api_task(api_task, None)
74 return api_task, model_task
# Helper: same flow as the node helper, but using the RELATIONSHIP_* names.
# Returns the pair (api_task, core_task), where core_task is the converted model task.
# NOTE(review): the actor argument (presumably `relationship`) is not visible in the
# call below -- confirm.
76 def _create_relationship_operation_task(self, ctx, relationship):
77 with workflow_context.current.push(ctx):
78 api_task = api.task.OperationTask(
80 interface_name=RELATIONSHIP_INTERFACE_NAME,
81 operation_name=RELATIONSHIP_OPERATION_NAME)
82 core_task = models.Task.from_api_task(api_task, None)
83 return api_task, core_task
# A model task built for a node operation must mirror the API task (name, function,
# actor, arguments) and carry the plugin configured on the operation.
85 def test_node_operation_task_creation(self, ctx):
# Two plugins are stored but only storage_plugin is attached to the operation
# (presumably so the final assertion shows the task picks the operation's own plugin
# rather than just any stored plugin -- confirm).
86 storage_plugin = mock.models.create_plugin('p1', '0.1')
87 storage_plugin_other = mock.models.create_plugin('p0', '0.0')
88 ctx.model.plugin.put(storage_plugin)
89 ctx.model.plugin.put(storage_plugin_other)
# Build an interface whose operation uses storage_plugin, store it on the dependency
# node under the interface's name, and persist the node.
# NOTE(review): the positional arguments of create_interface are not visible here -- confirm.
90 node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
91 interface = mock.models.create_interface(
95 operation_kwargs=dict(plugin=storage_plugin, function='test')
97 node.interfaces[interface.name] = interface
98 ctx.model.node.update(node)
99 api_task, model_task = self._create_node_operation_task(ctx, node)
100 assert model_task.name == api_task.name
101 assert model_task.function == api_task.function
102 assert model_task.actor == api_task.actor == node
103 assert model_task.arguments == api_task.arguments
104 assert model_task.plugin == storage_plugin
# A model task built for a relationship operation must have that relationship as actor.
106 def test_relationship_operation_task_creation(self, ctx):
107 relationship = ctx.model.relationship.list()[0]
# The relationship is written back without any visible modification in between.
108 ctx.model.relationship.update(relationship)
# NOTE(review): the arguments of the helper call are not visible here
# (presumably ctx and relationship) -- confirm.
109 _, model_task = self._create_relationship_operation_task(
111 assert model_task.actor == relationship
# Skipped test: each direct assignment to status, started_at, ended_at, attempts_count
# and due_at on the task is expected to raise exceptions.TaskException.
113 @pytest.mark.skip("Currently not supported for model tasks")
114 def test_operation_task_edit_locked_attribute(self, ctx):
115 node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
117 _, core_task = self._create_node_operation_task(ctx, node)
118 now = datetime.utcnow()
119 with pytest.raises(exceptions.TaskException):
120 core_task.status = core_task.STARTED
121 with pytest.raises(exceptions.TaskException):
122 core_task.started_at = now
123 with pytest.raises(exceptions.TaskException):
124 core_task.ended_at = now
125 with pytest.raises(exceptions.TaskException):
126 core_task.attempts_count = 2
127 with pytest.raises(exceptions.TaskException):
128 core_task.due_at = now
# Skipped test: the same five attributes are assigned while core_task._update() is
# active; the assertions first expect the old values (!=) and then the new ones (==).
# NOTE(review): presumably the != assertions sit inside the _update() block and the ==
# assertions follow its exit; the nesting is not recoverable from this view -- confirm.
130 @pytest.mark.skip("Currently not supported for model tasks")
131 def test_operation_task_edit_attributes(self, ctx):
132 node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
134 _, core_task = self._create_node_operation_task(ctx, node)
# A timestamp three seconds ahead of the current UTC time, used as the new value.
135 future_time = datetime.utcnow() + timedelta(seconds=3)
137 with core_task._update():
138 core_task.status = core_task.STARTED
139 core_task.started_at = future_time
140 core_task.ended_at = future_time
141 core_task.attempts_count = 2
142 core_task.due_at = future_time
143 assert core_task.status != core_task.STARTED
144 assert core_task.started_at != future_time
145 assert core_task.ended_at != future_time
146 assert core_task.attempts_count != 2
147 assert core_task.due_at != future_time
149 assert core_task.status == core_task.STARTED
150 assert core_task.started_at == future_time
151 assert core_task.ended_at == future_time
152 assert core_task.attempts_count == 2
153 assert core_task.due_at == future_time