1 # Licensed to the Apache Software Foundation (ASF) under one or more
2 # contributor license agreements. See the NOTICE file distributed with
3 # this work for additional information regarding copyright ownership.
4 # The ASF licenses this file to You under the Apache License, Version 2.0
5 # (the "License"); you may not use this file except in compliance with
6 # the License. You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
21 from aria.orchestrator.workflows.executor import process, thread
27 from aria.orchestrator import context
28 from aria.orchestrator.workflows import api
44 context = mock.context.simple(
46 context_kwargs=dict(workdir=str(tmpdir.join('workdir')))
49 storage.release_sqlite_storage(context.model)
53 def thread_executor():
54 result = thread.ThreadExecutor()
62 def dataholder(tmpdir):
63 dataholder_path = str(tmpdir.join('dataholder'))
64 holder = helpers.FilesystemDataHolder(dataholder_path)
68 def test_node_operation_task_execution(ctx, thread_executor, dataholder):
69 interface_name = 'Standard'
70 operation_name = 'create'
72 arguments = {'putput': True, 'holder_path': dataholder.path}
73 node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
74 interface = mock.models.create_interface(
78 operation_kwargs=dict(function=op_path(basic_node_operation, module_path=__name__),
81 node.interfaces[interface.name] = interface
82 ctx.model.node.update(node)
85 def basic_workflow(graph, **_):
87 api.task.OperationTask(
89 interface_name=interface_name,
90 operation_name=operation_name,
95 execute(workflow_func=basic_workflow, workflow_context=ctx, executor=thread_executor)
97 assert dataholder['ctx_name'] == context.operation.NodeOperationContext.__name__
99 # Task bases assertions
100 assert dataholder['actor_name'] == node.name
101 assert dataholder['task_name'] == api.task.OperationTask.NAME_FORMAT.format(
104 interface=interface_name,
105 operation=operation_name
107 operations = interface.operations
108 assert len(operations) == 1
109 assert dataholder['function'] == operations.values()[0].function # pylint: disable=no-member
110 assert dataholder['arguments']['putput'] is True
112 # Context based attributes (sugaring)
113 assert dataholder['template_name'] == node.node_template.name
114 assert dataholder['node_name'] == node.name
117 def test_relationship_operation_task_execution(ctx, thread_executor, dataholder):
118 interface_name = 'Configure'
119 operation_name = 'post_configure'
121 arguments = {'putput': True, 'holder_path': dataholder.path}
122 relationship = ctx.model.relationship.list()[0]
123 interface = mock.models.create_interface(
124 relationship.source_node.service,
127 operation_kwargs=dict(function=op_path(basic_relationship_operation, module_path=__name__),
128 arguments=arguments),
131 relationship.interfaces[interface.name] = interface
132 ctx.model.relationship.update(relationship)
135 def basic_workflow(graph, **_):
137 api.task.OperationTask(
139 interface_name=interface_name,
140 operation_name=operation_name,
145 execute(workflow_func=basic_workflow, workflow_context=ctx, executor=thread_executor)
147 assert dataholder['ctx_name'] == context.operation.RelationshipOperationContext.__name__
149 # Task bases assertions
150 assert dataholder['actor_name'] == relationship.name
151 assert interface_name in dataholder['task_name']
152 operations = interface.operations
153 assert dataholder['function'] == operations.values()[0].function # pylint: disable=no-member
154 assert dataholder['arguments']['putput'] is True
156 # Context based attributes (sugaring)
157 dependency_node_template = ctx.model.node_template.get_by_name(
158 mock.models.DEPENDENCY_NODE_TEMPLATE_NAME)
159 dependency_node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
160 dependent_node_template = ctx.model.node_template.get_by_name(
161 mock.models.DEPENDENT_NODE_TEMPLATE_NAME)
162 dependent_node = ctx.model.node.get_by_name(mock.models.DEPENDENT_NODE_NAME)
164 assert dataholder['target_node_template_name'] == dependency_node_template.name
165 assert dataholder['target_node_name'] == dependency_node.name
166 assert dataholder['relationship_name'] == relationship.name
167 assert dataholder['source_node_template_name'] == dependent_node_template.name
168 assert dataholder['source_node_name'] == dependent_node.name
171 def test_invalid_task_operation_id(ctx, thread_executor, dataholder):
173 Checks that the right id is used. The task created with id == 1, thus running the task on
174 node with id == 2. will check that indeed the node uses the correct id.
176 :param thread_executor:
179 interface_name = 'Standard'
180 operation_name = 'create'
182 other_node, node = ctx.model.node.list()
183 assert other_node.id == 1
186 interface = mock.models.create_interface(
188 interface_name=interface_name,
189 operation_name=operation_name,
190 operation_kwargs=dict(function=op_path(get_node_id, module_path=__name__),
191 arguments={'holder_path': dataholder.path})
193 node.interfaces[interface.name] = interface
194 ctx.model.node.update(node)
197 def basic_workflow(graph, **_):
199 api.task.OperationTask(
201 interface_name=interface_name,
202 operation_name=operation_name,
206 execute(workflow_func=basic_workflow, workflow_context=ctx, executor=thread_executor)
208 op_node_id = dataholder[api.task.OperationTask.NAME_FORMAT.format(
211 interface=interface_name,
212 operation=operation_name
214 assert op_node_id == node.id
215 assert op_node_id != other_node.id
218 def test_plugin_workdir(ctx, thread_executor, tmpdir):
219 interface_name = 'Standard'
220 operation_name = 'create'
222 plugin = mock.models.create_plugin()
223 ctx.model.plugin.put(plugin)
224 node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
225 filename = 'test_file'
226 content = 'file content'
227 arguments = {'filename': filename, 'content': content}
228 interface = mock.models.create_interface(
232 operation_kwargs=dict(
233 function='{0}.{1}'.format(__name__, _test_plugin_workdir.__name__),
237 node.interfaces[interface.name] = interface
238 ctx.model.node.update(node)
241 def basic_workflow(graph, **_):
242 graph.add_tasks(api.task.OperationTask(
244 interface_name=interface_name,
245 operation_name=operation_name,
246 arguments=arguments))
248 execute(workflow_func=basic_workflow, workflow_context=ctx, executor=thread_executor)
249 expected_file = tmpdir.join('workdir', 'plugins', str(ctx.service.id),
252 assert expected_file.read() == content
255 @pytest.fixture(params=[
256 (thread.ThreadExecutor, {}),
257 (process.ProcessExecutor, {'python_path': [tests.ROOT_DIR]}),
259 def executor(request):
260 ex_cls, kwargs = request.param
261 ex = ex_cls(**kwargs)
268 def test_node_operation_logging(ctx, executor):
269 interface_name, operation_name = mock.operations.NODE_OPERATIONS_INSTALL[0]
271 node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
274 'op_start': 'op_start',
277 interface = mock.models.create_interface(
281 operation_kwargs=dict(
282 function=op_path(logged_operation, module_path=__name__),
285 node.interfaces[interface.name] = interface
286 ctx.model.node.update(node)
289 def basic_workflow(graph, **_):
291 api.task.OperationTask(
293 interface_name=interface_name,
294 operation_name=operation_name,
298 execute(workflow_func=basic_workflow, workflow_context=ctx, executor=executor)
299 _assert_loggins(ctx, arguments)
302 def test_relationship_operation_logging(ctx, executor):
303 interface_name, operation_name = mock.operations.RELATIONSHIP_OPERATIONS_INSTALL[0]
305 relationship = ctx.model.relationship.list()[0]
307 'op_start': 'op_start',
310 interface = mock.models.create_interface(
311 relationship.source_node.service,
314 operation_kwargs=dict(function=op_path(logged_operation, module_path=__name__),
317 relationship.interfaces[interface.name] = interface
318 ctx.model.relationship.update(relationship)
321 def basic_workflow(graph, **_):
323 api.task.OperationTask(
325 interface_name=interface_name,
326 operation_name=operation_name,
331 execute(workflow_func=basic_workflow, workflow_context=ctx, executor=executor)
332 _assert_loggins(ctx, arguments)
335 def test_attribute_consumption(ctx, executor, dataholder):
336 # region Updating node operation
337 node_int_name, node_op_name = mock.operations.NODE_OPERATIONS_INSTALL[0]
339 source_node = ctx.model.node.get_by_name(mock.models.DEPENDENT_NODE_NAME)
341 arguments = {'dict_': {'key': 'value'},
342 'set_test_dict': {'key2': 'value2'}}
343 interface = mock.models.create_interface(
347 operation_kwargs=dict(
348 function=op_path(attribute_altering_operation, module_path=__name__),
351 source_node.interfaces[interface.name] = interface
352 ctx.model.node.update(source_node)
355 # region updating relationship operation
356 rel_int_name, rel_op_name = mock.operations.RELATIONSHIP_OPERATIONS_INSTALL[2]
358 relationship = ctx.model.relationship.list()[0]
359 interface = mock.models.create_interface(
360 relationship.source_node.service,
363 operation_kwargs=dict(
364 function=op_path(attribute_consuming_operation, module_path=__name__),
365 arguments={'holder_path': dataholder.path}
368 relationship.interfaces[interface.name] = interface
369 ctx.model.relationship.update(relationship)
373 def basic_workflow(graph, **_):
375 api.task.OperationTask(
377 interface_name=node_int_name,
378 operation_name=node_op_name,
381 api.task.OperationTask(
383 interface_name=rel_int_name,
384 operation_name=rel_op_name,
388 execute(workflow_func=basic_workflow, workflow_context=ctx, executor=executor)
389 target_node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
391 assert len(source_node.attributes) == len(target_node.attributes) == 2
392 assert source_node.attributes['key'] != target_node.attributes['key']
393 assert source_node.attributes['key'].value == \
394 target_node.attributes['key'].value == \
395 dataholder['key'] == 'value'
397 assert source_node.attributes['key2'] != target_node.attributes['key2']
398 assert source_node.attributes['key2'].value == \
399 target_node.attributes['key2'].value == \
400 dataholder['key2'] == 'value2'
403 def _assert_loggins(ctx, arguments):
404 # The logs should contain the following: Workflow Start, Operation Start, custom operation
405 # log string (op_start), custom operation log string (op_end), Operation End, Workflow End.
407 executions = ctx.model.execution.list()
408 assert len(executions) == 1
409 execution = executions[0]
411 tasks = ctx.model.task.list(filters={'_stub_type': None})
412 assert len(tasks) == 1
414 assert len(task.logs) == 4
416 logs = ctx.model.log.list()
417 assert len(logs) == len(execution.logs) == 6
418 assert set(logs) == set(execution.logs)
420 assert all(l.execution == execution for l in logs)
421 assert all(l in logs and l.task == task for l in task.logs)
423 op_start_log = [l for l in logs if arguments['op_start'] in l.msg and l.level.lower() == 'info']
424 assert len(op_start_log) == 1
425 op_start_log = op_start_log[0]
427 op_end_log = [l for l in logs if arguments['op_end'] in l.msg and l.level.lower() == 'debug']
428 assert len(op_end_log) == 1
429 op_end_log = op_end_log[0]
431 assert op_start_log.created_at < op_end_log.created_at
435 def logged_operation(ctx, **_):
436 ctx.logger.info(ctx.task.arguments['op_start'].value)
437 # enables to check the relation between the created_at field properly
439 ctx.logger.debug(ctx.task.arguments['op_end'].value)
443 def basic_node_operation(ctx, holder_path, **_):
444 holder = helpers.FilesystemDataHolder(holder_path)
446 operation_common(ctx, holder)
447 holder['template_name'] = ctx.node_template.name
448 holder['node_name'] = ctx.node.name
452 def basic_relationship_operation(ctx, holder_path, **_):
453 holder = helpers.FilesystemDataHolder(holder_path)
455 operation_common(ctx, holder)
456 holder['target_node_template_name'] = ctx.target_node_template.name
457 holder['target_node_name'] = ctx.target_node.name
458 holder['relationship_name'] = ctx.relationship.name
459 holder['source_node_template_name'] = ctx.source_node_template.name
460 holder['source_node_name'] = ctx.source_node.name
463 def operation_common(ctx, holder):
464 holder['ctx_name'] = ctx.__class__.__name__
466 holder['actor_name'] = ctx.task.actor.name
467 holder['task_name'] = ctx.task.name
468 holder['function'] = ctx.task.function
469 holder['arguments'] = dict(i.unwrapped for i in ctx.task.arguments.itervalues())
473 def get_node_id(ctx, holder_path, **_):
474 helpers.FilesystemDataHolder(holder_path)[ctx.name] = ctx.node.id
478 def _test_plugin_workdir(ctx, filename, content):
479 with open(os.path.join(ctx.plugin_workdir, filename), 'w') as f:
484 def attribute_altering_operation(ctx, dict_, set_test_dict, **_):
485 ctx.node.attributes.update(dict_)
487 for key, value in set_test_dict.items():
488 ctx.node.attributes[key] = value
492 def attribute_consuming_operation(ctx, holder_path, **_):
493 holder = helpers.FilesystemDataHolder(holder_path)
494 ctx.target_node.attributes.update(ctx.source_node.attributes)
495 holder.update(**ctx.target_node.attributes)
497 ctx.target_node.attributes['key2'] = ctx.source_node.attributes['key2']
498 holder['key2'] = ctx.target_node.attributes['key2']