Merge "vFW and vDNS support added to azure-plugin"
[multicloud/azure.git] / azure / aria / aria-extension-cloudify / src / aria / tests / orchestrator / workflows / executor / test_process_executor_extension.py
1 # Licensed to the Apache Software Foundation (ASF) under one or more
2 # contributor license agreements.  See the NOTICE file distributed with
3 # this work for additional information regarding copyright ownership.
4 # The ASF licenses this file to You under the Apache License, Version 2.0
5 # (the "License"); you may not use this file except in compliance with
6 # the License.  You may obtain a copy of the License at
7 #
8 #     http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15
16 import pytest
17
18 from aria import extension
19 from aria.orchestrator.workflows import api
20 from aria.orchestrator.workflows.core import engine, graph_compiler
21 from aria.orchestrator.workflows.executor import process
22 from aria.orchestrator import workflow, operation
23
24 import tests
25 from tests import mock
26 from tests import storage
27
28
def test_decorate_extension(context, executor):
    """Run a one-task workflow and check the arguments seen by wrapper and operation.

    An interface whose operation points at ``_mock_operation`` is attached to
    the dependency node, a workflow with a single ``OperationTask`` on that
    node is compiled and executed with the given executor, and the node's
    ``out`` attribute is then expected to hold the operation arguments under
    both ``wrapper_arguments`` and ``function_arguments``.
    """
    interface_name = 'test_interface'
    operation_name = 'operation'
    arguments = {'arg1': 1, 'arg2': 2}

    def get_node(ctx):
        # Always re-fetch the node through the given context's model storage.
        return ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)

    dependency_node = get_node(context)

    # Dotted path of the operation function, built from this module's name.
    function_path = '{0}.{1}'.format(__name__, _mock_operation.__name__)
    interface = mock.models.create_interface(
        context.service,
        interface_name,
        operation_name,
        operation_kwargs={'function': function_path, 'arguments': arguments}
    )
    dependency_node.interfaces[interface.name] = interface
    context.model.node.update(dependency_node)

    @workflow
    def mock_workflow(ctx, graph):
        graph.add_tasks(
            api.task.OperationTask(
                get_node(ctx),
                interface_name=interface_name,
                operation_name=operation_name,
                arguments=arguments)
        )
        return graph

    executor_cls = executor.__class__
    task_graph = mock_workflow(ctx=context)  # pylint: disable=no-value-for-parameter
    graph_compiler.GraphCompiler(context, executor_cls).compile(task_graph)
    engine.Engine({executor_cls: executor}).execute(context)

    # Both the extension's wrapper and the operation itself write into 'out'.
    recorded = get_node(context).attributes.get('out').value
    assert recorded['wrapper_arguments'] == arguments
    assert recorded['function_arguments'] == arguments
66
67
@extension.process_executor
class MockProcessExecutorExtension(object):
    """Mock extension registered through ``extension.process_executor``.

    Its ``decorate`` hook wraps an operation function so that the keyword
    arguments passed to the operation are recorded on the node before the
    operation itself runs.
    """

    def decorate(self):
        """Return a decorator that records the call arguments, then calls the function.

        :return: a decorator taking the operation function and returning a
                 wrapper with the signature ``wrapper(ctx, **operation_arguments)``
        """
        # Imported locally so this block does not rely on a module-level import.
        import functools

        def decorator(function):
            # Preserve the wrapped operation's __name__/__doc__/__module__.
            @functools.wraps(function)
            def wrapper(ctx, **operation_arguments):
                # The assignment is made while the node attributes collection is
                # instrumented by the context's model storage.
                # NOTE(review): presumably this is what makes the change visible
                # to the test after execution - confirm against ctx.model.instrument.
                with ctx.model.instrument(ctx.model.node.model_cls.attributes):
                    ctx.node.attributes['out'] = {'wrapper_arguments': operation_arguments}
                # Propagate the operation's return value instead of dropping it.
                return function(ctx=ctx, **operation_arguments)
            return wrapper
        return decorator
79
80
@operation
def _mock_operation(ctx, **operation_arguments):
    """Record the received keyword arguments under ``out['function_arguments']``.

    The ``out`` attribute must already exist on ``ctx.node``; this function
    only adds a key to it.
    """
    recorded = ctx.node.attributes['out']
    recorded['function_arguments'] = operation_arguments
84
85
@pytest.fixture
def executor():
    """Yield a ``ProcessExecutor`` with ``tests.ROOT_DIR`` on its python path.

    The executor is closed once the fixture is finalized, even if an
    exception is raised into the generator.
    """
    process_executor = process.ProcessExecutor(python_path=[tests.ROOT_DIR])
    try:
        yield process_executor
    finally:
        process_executor.close()
93
94
@pytest.fixture
def context(tmpdir):
    """Yield a simple mock context created under pytest's ``tmpdir``.

    After the test has used the context, its model's sqlite storage is released.
    """
    workflow_context = mock.context.simple(str(tmpdir))
    yield workflow_context
    storage.release_sqlite_storage(workflow_context.model)