vFW and vDNS support added to azure-plugin
[multicloud/azure.git] / azure / aria / aria-extension-cloudify / src / aria / tests / orchestrator / context / test_operation.py
1 # Licensed to the Apache Software Foundation (ASF) under one or more
2 # contributor license agreements.  See the NOTICE file distributed with
3 # this work for additional information regarding copyright ownership.
4 # The ASF licenses this file to You under the Apache License, Version 2.0
5 # (the "License"); you may not use this file except in compliance with
6 # the License.  You may obtain a copy of the License at
7 #
8 #     http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15
16 import os
17 import time
18
19 import pytest
20
21 from aria.orchestrator.workflows.executor import process, thread
22
23 from aria import (
24     workflow,
25     operation,
26 )
27 from aria.orchestrator import context
28 from aria.orchestrator.workflows import api
29
30 import tests
31 from tests import (
32     mock,
33     storage,
34     helpers
35 )
36 from . import (
37     op_path,
38     execute,
39 )
40
41
42 @pytest.fixture
43 def ctx(tmpdir):
44     context = mock.context.simple(
45         str(tmpdir),
46         context_kwargs=dict(workdir=str(tmpdir.join('workdir')))
47     )
48     yield context
49     storage.release_sqlite_storage(context.model)
50
51
52 @pytest.fixture
53 def thread_executor():
54     result = thread.ThreadExecutor()
55     try:
56         yield result
57     finally:
58         result.close()
59
60
61 @pytest.fixture
62 def dataholder(tmpdir):
63     dataholder_path = str(tmpdir.join('dataholder'))
64     holder = helpers.FilesystemDataHolder(dataholder_path)
65     return holder
66
67
68 def test_node_operation_task_execution(ctx, thread_executor, dataholder):
69     interface_name = 'Standard'
70     operation_name = 'create'
71
72     arguments = {'putput': True, 'holder_path': dataholder.path}
73     node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
74     interface = mock.models.create_interface(
75         node.service,
76         interface_name,
77         operation_name,
78         operation_kwargs=dict(function=op_path(basic_node_operation, module_path=__name__),
79                               arguments=arguments)
80     )
81     node.interfaces[interface.name] = interface
82     ctx.model.node.update(node)
83
84     @workflow
85     def basic_workflow(graph, **_):
86         graph.add_tasks(
87             api.task.OperationTask(
88                 node,
89                 interface_name=interface_name,
90                 operation_name=operation_name,
91                 arguments=arguments
92             )
93         )
94
95     execute(workflow_func=basic_workflow, workflow_context=ctx, executor=thread_executor)
96
97     assert dataholder['ctx_name'] == context.operation.NodeOperationContext.__name__
98
99     # Task bases assertions
100     assert dataholder['actor_name'] == node.name
101     assert dataholder['task_name'] == api.task.OperationTask.NAME_FORMAT.format(
102         type='node',
103         name=node.name,
104         interface=interface_name,
105         operation=operation_name
106     )
107     operations = interface.operations
108     assert len(operations) == 1
109     assert dataholder['function'] == operations.values()[0].function             # pylint: disable=no-member
110     assert dataholder['arguments']['putput'] is True
111
112     # Context based attributes (sugaring)
113     assert dataholder['template_name'] == node.node_template.name
114     assert dataholder['node_name'] == node.name
115
116
117 def test_relationship_operation_task_execution(ctx, thread_executor, dataholder):
118     interface_name = 'Configure'
119     operation_name = 'post_configure'
120
121     arguments = {'putput': True, 'holder_path': dataholder.path}
122     relationship = ctx.model.relationship.list()[0]
123     interface = mock.models.create_interface(
124         relationship.source_node.service,
125         interface_name,
126         operation_name,
127         operation_kwargs=dict(function=op_path(basic_relationship_operation, module_path=__name__),
128                               arguments=arguments),
129     )
130
131     relationship.interfaces[interface.name] = interface
132     ctx.model.relationship.update(relationship)
133
134     @workflow
135     def basic_workflow(graph, **_):
136         graph.add_tasks(
137             api.task.OperationTask(
138                 relationship,
139                 interface_name=interface_name,
140                 operation_name=operation_name,
141                 arguments=arguments
142             )
143         )
144
145     execute(workflow_func=basic_workflow, workflow_context=ctx, executor=thread_executor)
146
147     assert dataholder['ctx_name'] == context.operation.RelationshipOperationContext.__name__
148
149     # Task bases assertions
150     assert dataholder['actor_name'] == relationship.name
151     assert interface_name in dataholder['task_name']
152     operations = interface.operations
153     assert dataholder['function'] == operations.values()[0].function           # pylint: disable=no-member
154     assert dataholder['arguments']['putput'] is True
155
156     # Context based attributes (sugaring)
157     dependency_node_template = ctx.model.node_template.get_by_name(
158         mock.models.DEPENDENCY_NODE_TEMPLATE_NAME)
159     dependency_node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
160     dependent_node_template = ctx.model.node_template.get_by_name(
161         mock.models.DEPENDENT_NODE_TEMPLATE_NAME)
162     dependent_node = ctx.model.node.get_by_name(mock.models.DEPENDENT_NODE_NAME)
163
164     assert dataholder['target_node_template_name'] == dependency_node_template.name
165     assert dataholder['target_node_name'] == dependency_node.name
166     assert dataholder['relationship_name'] == relationship.name
167     assert dataholder['source_node_template_name'] == dependent_node_template.name
168     assert dataholder['source_node_name'] == dependent_node.name
169
170
171 def test_invalid_task_operation_id(ctx, thread_executor, dataholder):
172     """
173     Checks that the right id is used. The task created with id == 1, thus running the task on
174     node with id == 2. will check that indeed the node uses the correct id.
175     :param ctx:
176     :param thread_executor:
177     :return:
178     """
179     interface_name = 'Standard'
180     operation_name = 'create'
181
182     other_node, node = ctx.model.node.list()
183     assert other_node.id == 1
184     assert node.id == 2
185
186     interface = mock.models.create_interface(
187         node.service,
188         interface_name=interface_name,
189         operation_name=operation_name,
190         operation_kwargs=dict(function=op_path(get_node_id, module_path=__name__),
191                               arguments={'holder_path': dataholder.path})
192     )
193     node.interfaces[interface.name] = interface
194     ctx.model.node.update(node)
195
196     @workflow
197     def basic_workflow(graph, **_):
198         graph.add_tasks(
199             api.task.OperationTask(
200                 node,
201                 interface_name=interface_name,
202                 operation_name=operation_name,
203             )
204         )
205
206     execute(workflow_func=basic_workflow, workflow_context=ctx, executor=thread_executor)
207
208     op_node_id = dataholder[api.task.OperationTask.NAME_FORMAT.format(
209         type='node',
210         name=node.name,
211         interface=interface_name,
212         operation=operation_name
213     )]
214     assert op_node_id == node.id
215     assert op_node_id != other_node.id
216
217
218 def test_plugin_workdir(ctx, thread_executor, tmpdir):
219     interface_name = 'Standard'
220     operation_name = 'create'
221
222     plugin = mock.models.create_plugin()
223     ctx.model.plugin.put(plugin)
224     node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
225     filename = 'test_file'
226     content = 'file content'
227     arguments = {'filename': filename, 'content': content}
228     interface = mock.models.create_interface(
229         node.service,
230         interface_name,
231         operation_name,
232         operation_kwargs=dict(
233             function='{0}.{1}'.format(__name__, _test_plugin_workdir.__name__),
234             plugin=plugin,
235             arguments=arguments)
236     )
237     node.interfaces[interface.name] = interface
238     ctx.model.node.update(node)
239
240     @workflow
241     def basic_workflow(graph, **_):
242         graph.add_tasks(api.task.OperationTask(
243             node,
244             interface_name=interface_name,
245             operation_name=operation_name,
246             arguments=arguments))
247
248     execute(workflow_func=basic_workflow, workflow_context=ctx, executor=thread_executor)
249     expected_file = tmpdir.join('workdir', 'plugins', str(ctx.service.id),
250                                 plugin.name,
251                                 filename)
252     assert expected_file.read() == content
253
254
255 @pytest.fixture(params=[
256     (thread.ThreadExecutor, {}),
257     (process.ProcessExecutor, {'python_path': [tests.ROOT_DIR]}),
258 ])
259 def executor(request):
260     ex_cls, kwargs = request.param
261     ex = ex_cls(**kwargs)
262     try:
263         yield ex
264     finally:
265         ex.close()
266
267
268 def test_node_operation_logging(ctx, executor):
269     interface_name, operation_name = mock.operations.NODE_OPERATIONS_INSTALL[0]
270
271     node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
272
273     arguments = {
274         'op_start': 'op_start',
275         'op_end': 'op_end',
276     }
277     interface = mock.models.create_interface(
278         node.service,
279         interface_name,
280         operation_name,
281         operation_kwargs=dict(
282             function=op_path(logged_operation, module_path=__name__),
283             arguments=arguments)
284     )
285     node.interfaces[interface.name] = interface
286     ctx.model.node.update(node)
287
288     @workflow
289     def basic_workflow(graph, **_):
290         graph.add_tasks(
291             api.task.OperationTask(
292                 node,
293                 interface_name=interface_name,
294                 operation_name=operation_name,
295                 arguments=arguments
296             )
297         )
298     execute(workflow_func=basic_workflow, workflow_context=ctx, executor=executor)
299     _assert_loggins(ctx, arguments)
300
301
302 def test_relationship_operation_logging(ctx, executor):
303     interface_name, operation_name = mock.operations.RELATIONSHIP_OPERATIONS_INSTALL[0]
304
305     relationship = ctx.model.relationship.list()[0]
306     arguments = {
307         'op_start': 'op_start',
308         'op_end': 'op_end',
309     }
310     interface = mock.models.create_interface(
311         relationship.source_node.service,
312         interface_name,
313         operation_name,
314         operation_kwargs=dict(function=op_path(logged_operation, module_path=__name__),
315                               arguments=arguments)
316     )
317     relationship.interfaces[interface.name] = interface
318     ctx.model.relationship.update(relationship)
319
320     @workflow
321     def basic_workflow(graph, **_):
322         graph.add_tasks(
323             api.task.OperationTask(
324                 relationship,
325                 interface_name=interface_name,
326                 operation_name=operation_name,
327                 arguments=arguments
328             )
329         )
330
331     execute(workflow_func=basic_workflow, workflow_context=ctx, executor=executor)
332     _assert_loggins(ctx, arguments)
333
334
335 def test_attribute_consumption(ctx, executor, dataholder):
336     # region Updating node operation
337     node_int_name, node_op_name = mock.operations.NODE_OPERATIONS_INSTALL[0]
338
339     source_node = ctx.model.node.get_by_name(mock.models.DEPENDENT_NODE_NAME)
340
341     arguments = {'dict_': {'key': 'value'},
342                  'set_test_dict': {'key2': 'value2'}}
343     interface = mock.models.create_interface(
344         source_node.service,
345         node_int_name,
346         node_op_name,
347         operation_kwargs=dict(
348             function=op_path(attribute_altering_operation, module_path=__name__),
349             arguments=arguments)
350     )
351     source_node.interfaces[interface.name] = interface
352     ctx.model.node.update(source_node)
353     # endregion
354
355     # region updating relationship operation
356     rel_int_name, rel_op_name = mock.operations.RELATIONSHIP_OPERATIONS_INSTALL[2]
357
358     relationship = ctx.model.relationship.list()[0]
359     interface = mock.models.create_interface(
360         relationship.source_node.service,
361         rel_int_name,
362         rel_op_name,
363         operation_kwargs=dict(
364             function=op_path(attribute_consuming_operation, module_path=__name__),
365             arguments={'holder_path': dataholder.path}
366         )
367     )
368     relationship.interfaces[interface.name] = interface
369     ctx.model.relationship.update(relationship)
370     # endregion
371
372     @workflow
373     def basic_workflow(graph, **_):
374         graph.sequence(
375             api.task.OperationTask(
376                 source_node,
377                 interface_name=node_int_name,
378                 operation_name=node_op_name,
379                 arguments=arguments
380             ),
381             api.task.OperationTask(
382                 relationship,
383                 interface_name=rel_int_name,
384                 operation_name=rel_op_name,
385             )
386         )
387
388     execute(workflow_func=basic_workflow, workflow_context=ctx, executor=executor)
389     target_node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
390
391     assert len(source_node.attributes) == len(target_node.attributes) == 2
392     assert source_node.attributes['key'] != target_node.attributes['key']
393     assert source_node.attributes['key'].value == \
394            target_node.attributes['key'].value == \
395            dataholder['key'] == 'value'
396
397     assert source_node.attributes['key2'] != target_node.attributes['key2']
398     assert source_node.attributes['key2'].value == \
399            target_node.attributes['key2'].value == \
400            dataholder['key2'] == 'value2'
401
402
403 def _assert_loggins(ctx, arguments):
404     # The logs should contain the following: Workflow Start, Operation Start, custom operation
405     # log string (op_start), custom operation log string (op_end), Operation End, Workflow End.
406
407     executions = ctx.model.execution.list()
408     assert len(executions) == 1
409     execution = executions[0]
410
411     tasks = ctx.model.task.list(filters={'_stub_type': None})
412     assert len(tasks) == 1
413     task = tasks[0]
414     assert len(task.logs) == 4
415
416     logs = ctx.model.log.list()
417     assert len(logs) == len(execution.logs) == 6
418     assert set(logs) == set(execution.logs)
419
420     assert all(l.execution == execution for l in logs)
421     assert all(l in logs and l.task == task for l in task.logs)
422
423     op_start_log = [l for l in logs if arguments['op_start'] in l.msg and l.level.lower() == 'info']
424     assert len(op_start_log) == 1
425     op_start_log = op_start_log[0]
426
427     op_end_log = [l for l in logs if arguments['op_end'] in l.msg and l.level.lower() == 'debug']
428     assert len(op_end_log) == 1
429     op_end_log = op_end_log[0]
430
431     assert op_start_log.created_at < op_end_log.created_at
432
433
434 @operation
435 def logged_operation(ctx, **_):
436     ctx.logger.info(ctx.task.arguments['op_start'].value)
437     # enables to check the relation between the created_at field properly
438     time.sleep(1)
439     ctx.logger.debug(ctx.task.arguments['op_end'].value)
440
441
442 @operation
443 def basic_node_operation(ctx, holder_path, **_):
444     holder = helpers.FilesystemDataHolder(holder_path)
445
446     operation_common(ctx, holder)
447     holder['template_name'] = ctx.node_template.name
448     holder['node_name'] = ctx.node.name
449
450
451 @operation
452 def basic_relationship_operation(ctx, holder_path, **_):
453     holder = helpers.FilesystemDataHolder(holder_path)
454
455     operation_common(ctx, holder)
456     holder['target_node_template_name'] = ctx.target_node_template.name
457     holder['target_node_name'] = ctx.target_node.name
458     holder['relationship_name'] = ctx.relationship.name
459     holder['source_node_template_name'] = ctx.source_node_template.name
460     holder['source_node_name'] = ctx.source_node.name
461
462
463 def operation_common(ctx, holder):
464     holder['ctx_name'] = ctx.__class__.__name__
465
466     holder['actor_name'] = ctx.task.actor.name
467     holder['task_name'] = ctx.task.name
468     holder['function'] = ctx.task.function
469     holder['arguments'] = dict(i.unwrapped for i in ctx.task.arguments.itervalues())
470
471
472 @operation
473 def get_node_id(ctx, holder_path, **_):
474     helpers.FilesystemDataHolder(holder_path)[ctx.name] = ctx.node.id
475
476
477 @operation
478 def _test_plugin_workdir(ctx, filename, content):
479     with open(os.path.join(ctx.plugin_workdir, filename), 'w') as f:
480         f.write(content)
481
482
483 @operation
484 def attribute_altering_operation(ctx, dict_, set_test_dict, **_):
485     ctx.node.attributes.update(dict_)
486
487     for key, value in set_test_dict.items():
488         ctx.node.attributes[key] = value
489
490
491 @operation
492 def attribute_consuming_operation(ctx, holder_path, **_):
493     holder = helpers.FilesystemDataHolder(holder_path)
494     ctx.target_node.attributes.update(ctx.source_node.attributes)
495     holder.update(**ctx.target_node.attributes)
496
497     ctx.target_node.attributes['key2'] = ctx.source_node.attributes['key2']
498     holder['key2'] = ctx.target_node.attributes['key2']