vFW and vDNS support added to azure-plugin
[multicloud/azure.git] azure/aria/aria-extension-cloudify/aria_extension_tests/adapters/test_context_adapter.py
#
# Copyright (c) 2017 GigaSpaces Technologies Ltd. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#

import os
import copy
import datetime
import contextlib

import pytest

from aria import (workflow, operation)
from aria.modeling import models
from aria.orchestrator import events
from aria.orchestrator.workflows import api
from aria.orchestrator.workflows.exceptions import ExecutorException
from aria.orchestrator.workflows.executor import process
from aria.orchestrator.workflows.core import (engine, graph_compiler)
from aria.orchestrator.exceptions import (TaskAbortException, TaskRetryException)
from aria.utils import type as type_

import tests
from tests import (mock, storage, conftest)
from tests.orchestrator.workflows.helpers import events_collector

from adapters import context_adapter


@pytest.fixture(autouse=True)
def cleanup_logger(request):
    conftest.logging_handler_cleanup(request)


class TestCloudifyContextAdapter(object):
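    """Tests for CloudifyContextAdapter, which exposes an ARIA operation context
    through a Cloudify-style ``ctx`` API."""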

    def test_node_instance_operation(self, executor, workflow_context):
        node_template = self._get_node_template(workflow_context)
        node_type = 'aria.plugin.nodes.App'
        node_instance_property = models.Property.wrap('hello', 'world')
        node_template.type = models.Type(variant='variant', name=node_type)
        node = self._get_node(workflow_context)
        node_instance_attribute = models.Attribute.wrap('hello2', 'world2')
        node.attributes[node_instance_attribute.name] = node_instance_attribute
        node.properties[node_instance_property.name] = node_instance_property
        workflow_context.model.node.update(node)
        workflow_context.model.node_template.update(node_template)

        out = self._run(executor, workflow_context, _test_node_instance_operation)

        node_template = self._get_node_template(workflow_context)
        node = self._get_node(workflow_context)
        assert out['type'] == context_adapter.NODE_INSTANCE
        assert out['node']['id'] == node_template.id
        assert out['node']['name'] == node_template.name
        assert out['node']['properties'] == \
               {node_instance_property.name: node_instance_property.value}
        assert out['node']['type'] == node_type
        assert out['node']['type_hierarchy'] == ['cloudify.plugin.nodes.App']
        assert out['instance']['id'] == node.id
        assert out['instance']['runtime_properties'] == \
               {node_instance_attribute.name: node_instance_attribute.value}
        assert not out['source']
        assert not out['target']

    def test_node_instance_relationships(self, executor, workflow_context):
        relationship_node_template = self._get_dependency_node_template(workflow_context)
        relationship_node_instance = self._get_dependency_node(workflow_context)
        relationship = relationship_node_instance.inbound_relationships[0]
        relationship_type = models.Type(variant='variant', name='test.relationships.Relationship')
        relationship.type = relationship_type
        workflow_context.model.relationship.update(relationship)

        out = self._run(executor, workflow_context, _test_node_instance_relationships)

        assert len(out['instance']['relationships']) == 1
        relationship = out['instance']['relationships'][0]
        assert relationship['type'] == relationship_type.name
        assert relationship['type_hierarchy'] == [relationship_type.name]
        assert relationship['target']['node']['id'] == relationship_node_template.id
        assert relationship['target']['instance']['id'] == relationship_node_instance.id

    def test_source_operation(self, executor, workflow_context):
        self._test_relationship_operation(executor, workflow_context, operation_end='source')

    def test_target_operation(self, executor, workflow_context):
        self._test_relationship_operation(executor, workflow_context, operation_end='target')

    def _test_relationship_operation(self, executor, workflow_context, operation_end):
        out = self._run(
            executor, workflow_context, _test_relationship_operation, operation_end=operation_end)

        source_node = self._get_node_template(workflow_context)
        source_node_instance = self._get_node(workflow_context)
        target_node = self._get_dependency_node_template(workflow_context)
        target_node_instance = self._get_dependency_node(workflow_context)
        assert out['type'] == context_adapter.RELATIONSHIP_INSTANCE
        assert out['source']['node']['id'] == source_node.id
        assert out['source']['instance']['id'] == source_node_instance.id
        assert out['target']['node']['id'] == target_node.id
        assert out['target']['instance']['id'] == target_node_instance.id
        assert not out['node']
        assert not out['instance']

    def test_host_ip(self, executor, workflow_context):
        node = self._get_node_template(workflow_context)
        node.type_hierarchy = ['aria.nodes.Compute']
        node_instance = self._get_node(workflow_context)
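        # Make the node its own host so that host_ip resolves from this node's 'ip' attribute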
        node_instance.host_fk = node_instance.id
        node_instance_ip = '120.120.120.120'
        node_instance.attributes['ip'] = models.Attribute.wrap('ip', node_instance_ip)
        workflow_context.model.node_template.update(node)
        workflow_context.model.node.update(node_instance)

        out = self._run(executor, workflow_context, _test_host_ip)

        assert out['instance']['host_ip'] == node_instance_ip

    def test_get_and_download_resource_and_render(self, tmpdir, executor, workflow_context):
        resource_path = 'resource'
        variable = 'VALUE'
        content = '{{ctx.service.name}}-{{variable}}'
        rendered = '{0}-{1}'.format(workflow_context.service.name, variable)
        source = tmpdir.join(resource_path)
        source.write(content)
        workflow_context.resource.service.upload(
            entry_id=str(workflow_context.service.id),
            source=str(source),
            path=resource_path)

        out = self._run(executor, workflow_context, _test_get_and_download_resource_and_render,
                        inputs={'resource': resource_path,
                                'variable': variable})

        assert out['get_resource'] == content
        assert out['get_resource_and_render'] == rendered
        with open(out['download_resource'], 'rb') as f:
            assert f.read() == content
        with open(out['download_resource_and_render'], 'rb') as f:
            assert f.read() == rendered

        os.remove(out['download_resource'])
        os.remove(out['download_resource_and_render'])

    def test_retry(self, executor, workflow_context):
        message = 'retry-message'
        retry_interval = 0.01

        exception = self._run_and_get_task_exceptions(
            executor, workflow_context, _test_retry,
            inputs={'message': message, 'retry_interval': retry_interval},
            max_attempts=2
        )[-1]

        assert isinstance(exception, TaskRetryException)
        assert exception.message == message
        assert exception.retry_interval == retry_interval

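        # Cloudify-style counters: the last attempt (of max_attempts=2) reports
        # retry_number 1, and max_retries does not count the initial attempt.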
        out = self._get_node(workflow_context).attributes['out'].value
        assert out['operation']['retry_number'] == 1
        assert out['operation']['max_retries'] == 1

    def test_logger_and_send_event(self, executor, workflow_context):
        # TODO: add assertions on the output once the process executor's output can be captured
        message = 'logger-message'
        event = 'event-message'
        self._run(executor, workflow_context, _test_logger_and_send_event,
                  inputs={'message': message, 'event': event})

    def test_plugin(self, executor, workflow_context, tmpdir):
        plugin = self._put_plugin(workflow_context)
        out = self._run(executor, workflow_context, _test_plugin, plugin=plugin)

        expected_workdir = tmpdir.join(
            'workdir', 'plugins', str(workflow_context.service.id), plugin.name)
        assert out['plugin']['name'] == plugin.name
        assert out['plugin']['package_name'] == plugin.package_name
        assert out['plugin']['package_version'] == plugin.package_version
        assert out['plugin']['workdir'] == str(expected_workdir)

    def test_importable_ctx_and_inputs(self, executor, workflow_context):
        test_inputs = {'input1': 1, 'input2': 2}
        plugin = self._put_plugin(workflow_context, mock_cfy_plugin=True)

        out = self._run(executor, workflow_context, _test_importable_ctx_and_inputs,
                        inputs=test_inputs,
                        skip_common_assert=True,
                        plugin=plugin)
        assert out['inputs'] == test_inputs

    def test_non_recoverable_error(self, executor, workflow_context):
        message = 'NON_RECOVERABLE_MESSAGE'
        plugin = self._put_plugin(workflow_context, mock_cfy_plugin=True)

        exception = self._run_and_get_task_exceptions(
            executor, workflow_context, _test_non_recoverable_error,
            inputs={'message': message},
            skip_common_assert=True,
            plugin=plugin
        )[0]
        assert isinstance(exception, TaskAbortException)
        assert exception.message == message

    def test_recoverable_error(self, executor, workflow_context):
        message = 'RECOVERABLE_MESSAGE'
        plugin = self._put_plugin(workflow_context, mock_cfy_plugin=True)

        retry_interval = 0.01
        exception = self._run_and_get_task_exceptions(
            executor, workflow_context, _test_recoverable_error,
            inputs={'message': message, 'retry_interval': retry_interval},
            skip_common_assert=True,
            plugin=plugin
        )[0]
        assert isinstance(exception, TaskRetryException)
        assert message in exception.message
        assert exception.retry_interval == retry_interval

    def _test_common(self, out, workflow_context):
        assert out['execution_id'] == workflow_context.execution.id
        assert out['workflow_id'] == workflow_context.execution.workflow_name
        assert out['rest_token'] is None
        assert out['task_id'][0] == out['task_id'][1]
        assert out['task_name'][0] == out['task_name'][1]
        assert out['task_target'] is None
        assert out['task_queue'] is None
        assert out['provider_context'] == {}
        assert out['blueprint']['id'] == workflow_context.service_template.id
        assert out['deployment']['id'] == workflow_context.service.id
        assert out['operation']['name'][0] == out['operation']['name'][1]
        assert out['operation']['retry_number'][0] == out['operation']['retry_number'][1]
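        # The adapter exposes Cloudify-style max_retries, which does not count the
        # initial attempt, hence the off-by-one against ARIA's max_attempts below.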
        assert out['operation']['max_retries'][0] == out['operation']['max_retries'][1] - 1
        assert out['bootstrap_context']['resources_prefix'] == ''
        assert out['bootstrap_context']['broker_config'] == {}
        assert out['bootstrap_context']['cloudify_agent']['any'] is None
        assert out['agent']['init_script'] is None

    def _run(self,
             executor,
             workflow_context,
             func,
             inputs=None,
             max_attempts=None,
             skip_common_assert=False,
             operation_end=None,
             plugin=None):
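        # Attach ``func`` as a test operation on the dependent node (or, when
        # ``operation_end`` is given, on its outbound relationship), execute it via a
        # single-task workflow, and return the ``out`` dict the operation stored in
        # the node's attributes.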
        interface_name = 'test'
        operation_name = 'op'
        op_dict = {'function': '{0}.{1}'.format(__name__, func.__name__),
                   'plugin': plugin,
                   'arguments': inputs or {}}
        node = self._get_node(workflow_context)

        if operation_end:
            actor = relationship = node.outbound_relationships[0]
            relationship.interfaces[interface_name] = mock.models.create_interface(
                relationship.source_node.service,
                interface_name,
                operation_name,
                operation_kwargs=op_dict
            )
            workflow_context.model.relationship.update(relationship)

        else:
            actor = node
            node.interfaces[interface_name] = mock.models.create_interface(
                node.service,
                interface_name,
                operation_name,
                operation_kwargs=op_dict
            )
            workflow_context.model.node.update(node)

        if inputs:
            operation_inputs = \
                actor.interfaces[interface_name].operations[operation_name].inputs
            for input_name, input_value in inputs.iteritems():
                operation_inputs[input_name] = models.Input(
                    name=input_name, type_name=type_.full_type_name(input_value))

        @workflow
        def mock_workflow(graph, **kwargs):
            task = api.task.OperationTask(
                actor,
                interface_name,
                operation_name,
                arguments=inputs or {},
                max_attempts=max_attempts
            )
            graph.add_tasks(task)

        tasks_graph = mock_workflow(ctx=workflow_context)
        graph_compiler.GraphCompiler(workflow_context, executor.__class__).compile(tasks_graph)
        eng = engine.Engine(executors={executor.__class__: executor})
        eng.execute(workflow_context)
        out = self._get_node(workflow_context).attributes['out'].value
        if not skip_common_assert:
            self._test_common(out, workflow_context)
        return out

    def _get_dependency_node_template(self, workflow_context):
        return workflow_context.model.node_template.get_by_name(
            mock.models.DEPENDENCY_NODE_TEMPLATE_NAME)

    def _get_dependency_node(self, workflow_context):
        return workflow_context.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)

    def _get_node_template(self, workflow_context):
        return workflow_context.model.node_template.get_by_name(
            mock.models.DEPENDENT_NODE_TEMPLATE_NAME)

    def _get_node(self, workflow_context):
        return workflow_context.model.node.get_by_name(mock.models.DEPENDENT_NODE_NAME)

    def _run_and_get_task_exceptions(self, *args, **kwargs):
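        # Run a workflow that is expected to fail, collecting the exceptions raised by
        # its tasks via the task-failure signal.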
        signal = events.on_failure_task_signal
        with events_collector(signal) as collected:
            with pytest.raises(ExecutorException):
                self._run(*args, **kwargs)
        return [event['kwargs']['exception'] for event in collected[signal]]

    @pytest.fixture
    def executor(self):
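        # Execute operations with the process executor (a separate Python process),
        # as real plugin operations would be.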
        result = process.ProcessExecutor(python_path=[tests.ROOT_DIR])
        yield result
        result.close()

    @pytest.fixture
    def workflow_context(self, tmpdir):
        result = mock.context.simple(
            str(tmpdir),
            context_kwargs=dict(workdir=str(tmpdir.join('workdir')))
        )
        yield result
        storage.release_sqlite_storage(result.model)

    def _put_plugin(self, workflow_context, mock_cfy_plugin=False):
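        # Register a plugin in model storage; when mock_cfy_plugin is set,
        # 'cloudify_plugins_common' is listed among its wheels (used by the tests
        # that import the ``cloudify`` package).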
        name = 'PLUGIN'
        archive_name = 'ARCHIVE'
        package_name = 'PACKAGE'
        package_version = '0.1.1'

        plugin = models.Plugin(
            name=name,
            archive_name=archive_name,
            package_name=package_name,
            package_version=package_version,
            uploaded_at=datetime.datetime.now(),
            wheels=['cloudify_plugins_common'] if mock_cfy_plugin else []
        )

        workflow_context.model.plugin.put(plugin)

        return plugin


@operation
def _test_node_instance_operation(ctx):
    with _adapter(ctx) as (adapter, out):
        node = adapter.node
        instance = adapter.instance
        out.update({
            'node': {
                'id': node.id,
                'name': node.name,
                'properties': copy.deepcopy(node.properties),
                'type': node.type,
                'type_hierarchy': node.type_hierarchy
            },
            'instance': {
                'id': instance.id,
                'runtime_properties': copy.deepcopy(instance.runtime_properties)
            }
        })
        try:
            assert adapter.source
            out['source'] = True
        except TaskAbortException:
            out['source'] = False
        try:
            assert adapter.target
            out['target'] = True
        except TaskAbortException:
            out['target'] = False


@operation
def _test_node_instance_relationships(ctx):
    with _adapter(ctx) as (adapter, out):
        relationships = [{'type': r.type,
                          'type_hierarchy': [t.name for t in r.type_hierarchy],
                          'target': {'node': {'id': r.target.node.id},
                                     'instance': {'id': r.target.instance.id}}}
                         for r in adapter.instance.relationships]
        out['instance'] = {'relationships': relationships}


@operation
def _test_relationship_operation(ctx):
    with _adapter(ctx) as (adapter, out):
        out.update({
            'source': {'node': {'id': adapter.source.node.id},
                       'instance': {'id': adapter.source.instance.id}},
            'target': {'node': {'id': adapter.target.node.id},
                       'instance': {'id': adapter.target.instance.id}}
        })
        try:
            assert adapter.node
            out['node'] = True
        except TaskAbortException:
            out['node'] = False
        try:
            assert adapter.instance
            out['instance'] = True
        except TaskAbortException:
            out['instance'] = False


@operation
def _test_host_ip(ctx):
    with _adapter(ctx) as (adapter, out):
        out['instance'] = {'host_ip': adapter.instance.host_ip}


@operation
def _test_get_and_download_resource_and_render(ctx, resource, variable):
    with _adapter(ctx) as (adapter, out):
        out.update({
            'get_resource': adapter.get_resource(resource),
            'get_resource_and_render': adapter.get_resource_and_render(
                resource, template_variables={'variable': variable}
            ),
            'download_resource': adapter.download_resource(resource),
            'download_resource_and_render': adapter.download_resource_and_render(
                resource, template_variables={'variable': variable}
            )
        })


@operation
def _test_retry(ctx, message, retry_interval):
    with _adapter(ctx) as (adapter, out):
        op = adapter.operation
        out['operation'] = {'retry_number': op.retry_number, 'max_retries': op.max_retries}
        op.retry(message, retry_after=retry_interval)


@operation
def _test_logger_and_send_event(ctx, message, event):
    with _adapter(ctx) as (adapter, _):
        adapter.logger.info(message)
        adapter.send_event(event)


@operation
def _test_plugin(ctx):
    with _adapter(ctx) as (adapter, out):
        plugin = adapter.plugin
        out['plugin'] = {
            'name': plugin.name,
            'package_name': plugin.package_name,
            'package_version': plugin.package_version,
            'workdir': plugin.workdir
        }


@operation
def _test_importable_ctx_and_inputs(**_):
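    # Use the importable ``cloudify.ctx`` proxy and ``ctx_parameters`` rather than an
    # explicitly passed adapter.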
    from cloudify import ctx
    from cloudify.state import ctx_parameters
    ctx.instance.runtime_properties['out'] = {'inputs': dict(ctx_parameters)}


@operation
def _test_non_recoverable_error(message, **_):
    from cloudify.exceptions import NonRecoverableError
    raise NonRecoverableError(message)


@operation
def _test_recoverable_error(message, retry_interval, **_):
    from cloudify.exceptions import RecoverableError
    raise RecoverableError(message, retry_interval)


def _test_common(out, ctx, adapter):
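    # Capture adapter attributes (paired with their ARIA counterparts where applicable)
    # so TestCloudifyContextAdapter._test_common can compare them after the workflow runs.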
    op = adapter.operation
    bootstrap_context = adapter.bootstrap_context
    out.update({
        'type': adapter.type,
        'execution_id': adapter.execution_id,
        'workflow_id': adapter.workflow_id,
        'rest_token': adapter.rest_token,
        'task_id': (adapter.task_id, ctx.task.id),
        'task_name': (adapter.task_name, ctx.task.function),
        'task_target': adapter.task_target,
        'task_queue': adapter.task_queue,
        'provider_context': adapter.provider_context,
        'blueprint': {'id': adapter.blueprint.id},
        'deployment': {'id': adapter.deployment.id},
        'operation': {
            'name': [op.name, ctx.name.split('@')[0].replace(':', '.')],
            'retry_number': [op.retry_number, ctx.task.attempts_count - 1],
            'max_retries': [op.max_retries, ctx.task.max_attempts]
        },
        'bootstrap_context': {
            'broker_config': bootstrap_context.broker_config('arg1', 'arg2', arg3='arg3'),
            # Any attribute access on cloudify_agent returns None
            'cloudify_agent': {'any': bootstrap_context.cloudify_agent.any},
            'resources_prefix': bootstrap_context.resources_prefix
        },
        'agent': {
            'init_script': adapter.agent.init_script('arg1', 'arg2', arg3='arg3')
        }
    })


@contextlib.contextmanager
def _adapter(ctx):
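    # Wrap the ARIA ctx in the Cloudify context adapter, record the common fields, and
    # persist whatever the operation put into ``out`` as the 'out' runtime property on exit.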
    out = {}
    adapter = context_adapter.CloudifyContextAdapter(ctx)
    _test_common(out, ctx, adapter)
    try:
        yield adapter, out
    finally:
        try:
            instance = adapter.instance
        except TaskAbortException:
            instance = adapter.source.instance
        instance.runtime_properties['out'] = out