vFW and vDNS support added to azure-plugin
[multicloud/azure.git] / azure / aria / aria-extension-cloudify / src / aria / tests / orchestrator / workflows / executor / test_process_executor_tracked_changes.py
1 # Licensed to the Apache Software Foundation (ASF) under one or more
2 # contributor license agreements.  See the NOTICE file distributed with
3 # this work for additional information regarding copyright ownership.
4 # The ASF licenses this file to You under the Apache License, Version 2.0
5 # (the "License"); you may not use this file except in compliance with
6 # the License.  You may obtain a copy of the License at
7 #
8 #     http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15
16 import copy
17
18 import pytest
19
20 from aria.orchestrator.workflows import api
21 from aria.orchestrator.workflows.core import engine, graph_compiler
22 from aria.orchestrator.workflows.executor import process
23 from aria.orchestrator import workflow, operation
24 from aria.orchestrator.workflows import exceptions
25
26 import tests
27 from tests import mock
28 from tests import storage
29
30
# Attribute values that _update_attributes writes onto the operation's node and
# that _assert_tracked_changes_are_applied later looks for on the stored node.
_TEST_ATTRIBUTES = {
    'some': 'values',
    'that': 'are',
    'most': 'likely',
    'only': 'set',
    'here': 'yo',
}
34
35
def test_track_changes_of_successful_operation(context, executor):
    """Attribute changes made by an operation that succeeds reach the stored node."""
    _run_workflow(op_func=_mock_success_operation, executor=executor, context=context)
    _assert_tracked_changes_are_applied(context)
39
40
def test_track_changes_of_failed_operation(context, executor):
    """Attribute changes made before an operation raises still reach the stored node."""
    workflow_kwargs = dict(context=context, executor=executor, op_func=_mock_fail_operation)
    # The operation's RuntimeError is expected to surface as an ExecutorException.
    with pytest.raises(exceptions.ExecutorException):
        _run_workflow(**workflow_kwargs)
    _assert_tracked_changes_are_applied(context)
45
46
def _assert_tracked_changes_are_applied(context):
    """Assert that the stored dependency node holds every ``_TEST_ATTRIBUTES`` entry.

    :param context: workflow context whose model storage is queried for the node
    """
    stored_node = context.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
    for name, expected in _TEST_ATTRIBUTES.items():
        # Each stored attribute exposes its payload through ``.value``.
        assert stored_node.attributes[name].value == expected
51
52
def _update_attributes(context):
    """Replace all attributes of the context's node with ``_TEST_ATTRIBUTES``.

    :param context: operation context exposing ``node.attributes``
    """
    node_attributes = context.node.attributes
    node_attributes.clear()
    node_attributes.update(_TEST_ATTRIBUTES)
56
57
def test_refresh_state_of_tracked_attributes(context, executor):
    """Check the snapshots recorded by ``_mock_refreshing_operation``.

    The snapshot taken after the refresh must equal the one taken after the
    change, and the changed snapshot must differ from the initial one.
    """
    snapshots = _run_workflow(
        context=context, executor=executor, op_func=_mock_refreshing_operation)
    assert snapshots['after_refresh'] == snapshots['after_change']
    assert snapshots['initial'] != snapshots['after_change']
62
63
def test_apply_tracked_changes_during_an_operation(context, executor):
    """Compare the snapshots recorded by ``_mock_updating_operation`` with expectations.

    The expected states are built by layering the ``committed`` and then the
    ``changed_but_refreshed`` arguments on top of the node's attributes as
    looked up before the workflow runs.
    """
    committed = {'some': 'new', 'properties': 'right here'}
    changed_but_refreshed = {'some': 'newer', 'properties': 'right there'}
    op_arguments = {'committed': committed, 'changed_but_refreshed': changed_but_refreshed}

    # Must be looked up before the workflow is executed.
    expected_initial = context.model.node.get_by_name(
        mock.models.DEPENDENCY_NODE_NAME).attributes
    snapshots = _run_workflow(context=context,
                              executor=executor,
                              op_func=_mock_updating_operation,
                              arguments=op_arguments)

    expected_after_update = expected_initial.copy()
    expected_after_update.update(committed)  # pylint: disable=no-member
    expected_after_change = expected_after_update.copy()
    expected_after_change.update(changed_but_refreshed)  # pylint: disable=no-member

    assert snapshots['initial'] == expected_initial
    assert snapshots['after_update'] == expected_after_update
    assert snapshots['after_change'] == expected_after_change
    # The refresh is expected to leave the changed state in place.
    assert snapshots['after_refresh'] == expected_after_change
83
84
def _run_workflow(context, executor, op_func, arguments=None):
    """Run ``op_func`` as a single-task workflow on the dependency node.

    An interface holding one operation mapped to ``op_func`` is attached to the
    dependency node, a workflow consisting of that one operation task is
    compiled and executed with ``executor``, and the node is then read back.

    :param context: workflow context providing the model storage and service
    :param executor: executor instance used to run the task
    :param op_func: operation function defined in this module
    :param arguments: optional dict of operation arguments (defaults to empty)
    :return: value of the node's ``'out'`` attribute, or ``None`` when the
        attribute is missing or falsy
    """
    interface_name, operation_name = 'test_interface', 'operation'
    node = context.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
    wf_arguments = arguments or {}

    operation_kwargs = {
        'function': _operation_mapping(op_func),
        'arguments': wf_arguments,
    }
    interface = mock.models.create_interface(
        context.service, interface_name, operation_name, operation_kwargs=operation_kwargs)
    node.interfaces[interface.name] = interface
    context.model.node.update(node)

    @workflow
    def mock_workflow(ctx, graph):
        graph.add_tasks(
            api.task.OperationTask(
                node,
                interface_name=interface_name,
                operation_name=operation_name,
                arguments=wf_arguments))
        return graph

    task_graph = mock_workflow(ctx=context)  # pylint: disable=no-value-for-parameter
    executor_cls = executor.__class__
    graph_compiler.GraphCompiler(context, executor_cls).compile(task_graph)
    engine.Engine({executor_cls: executor}).execute(context)

    # Look the node up again so the result reflects what is in storage after the run.
    stored_node = context.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
    out = stored_node.attributes.get('out')
    if not out:
        return None
    return out.value
115
116
@operation
def _mock_success_operation(ctx):
    """Operation that overwrites the node's attributes and returns normally."""
    _update_attributes(context=ctx)
120
121
@operation
def _mock_fail_operation(ctx):
    """Operation that overwrites the node's attributes and then raises ``RuntimeError``."""
    _update_attributes(context=ctx)
    raise RuntimeError()
126
127
@operation
def _mock_refreshing_operation(ctx):
    """Record attribute snapshots around an uncommitted change followed by a refresh.

    Deep copies of the node's attributes are taken initially, after an
    in-place update, and after ``ctx.model.node.refresh``; the three snapshots
    are then stored on the node under the ``'out'`` attribute.
    """
    def snapshot():
        return copy.deepcopy(ctx.node.attributes)

    recorded = {'initial': snapshot()}
    ctx.node.attributes.update({'some': 'new', 'properties': 'right here'})
    recorded['after_change'] = snapshot()
    ctx.model.node.refresh(ctx.node)
    recorded['after_refresh'] = snapshot()
    ctx.node.attributes['out'] = recorded
136
137
@operation
def _mock_updating_operation(ctx, committed, changed_but_refreshed):
    """Record attribute snapshots around a committed change and an uncommitted one.

    :param ctx: operation context
    :param committed: mapping applied to the attributes and then passed through
        ``ctx.model.node.update``
    :param changed_but_refreshed: mapping applied to the attributes right before
        ``ctx.model.node.refresh`` is called

    Deep copies of the attributes are taken at each stage and stored together
    on the node under the ``'out'`` attribute.
    """
    def snapshot():
        return copy.deepcopy(ctx.node.attributes)

    recorded = {'initial': snapshot()}

    ctx.node.attributes.update(committed)
    ctx.model.node.update(ctx.node)
    recorded['after_update'] = snapshot()

    ctx.node.attributes.update(changed_but_refreshed)
    recorded['after_change'] = snapshot()

    ctx.model.node.refresh(ctx.node)
    recorded['after_refresh'] = snapshot()

    ctx.node.attributes['out'] = recorded
149
150
def _operation_mapping(func):
    """Return the dotted ``<this module's name>.<function name>`` path for ``func``."""
    return '.'.join((__name__, func.__name__))
153
154
@pytest.fixture
def executor():
    """Provide a ``ProcessExecutor`` and close it once the test is done with it."""
    process_executor = process.ProcessExecutor(python_path=[tests.ROOT_DIR])
    try:
        yield process_executor
    finally:
        process_executor.close()
162
163
@pytest.fixture
def context(tmpdir):
    """Provide a mock context created from ``tmpdir`` and release its storage afterwards.

    :param tmpdir: pytest's per-test temporary directory, passed on as a string
    """
    result = mock.context.simple(str(tmpdir))
    try:
        yield result
    finally:
        # Mirrors the ``executor`` fixture: the sqlite storage is released even
        # if the generator is closed instead of being resumed normally.
        storage.release_sqlite_storage(result.model)