2 # Copyright (c) 2017 GigaSpaces Technologies Ltd. All rights reserved.
4 # Licensed under the Apache License, Version 2.0 (the "License"); you may
5 # not use this file except in compliance with the License. You may obtain
6 # a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 # License for the specific language governing permissions and limitations
import contextlib
import copy
import datetime
import os

import pytest

from aria import (workflow, operation)
from aria.modeling import models
from aria.orchestrator import events
from aria.orchestrator.workflows import api
from aria.orchestrator.workflows.exceptions import ExecutorException
from aria.orchestrator.workflows.executor import process
from aria.orchestrator.workflows.core import (engine, graph_compiler)
from aria.orchestrator.exceptions import (TaskAbortException, TaskRetryException)
from aria.utils import type as type_

import tests
from tests import (mock, storage, conftest)
from tests.orchestrator.workflows.helpers import events_collector

from adapters import context_adapter
@pytest.fixture(autouse=True)
def cleanup_logger(request):
    # Detach any logging handlers installed during the test run
    # (delegates to the shared helper in tests.conftest).
    conftest.logging_handler_cleanup(request)
class TestCloudifyContextAdapter(object):

    def test_node_instance_operation(self, executor, workflow_context):
        """A node-scoped operation exposes template and instance data via the adapter."""
        template = self._get_node_template(workflow_context)
        type_name = 'aria.plugin.nodes.App'
        template.type = models.Type(variant='variant', name=type_name)
        prop = models.Property.wrap('hello', 'world')
        instance = self._get_node(workflow_context)
        attr = models.Attribute.wrap('hello2', 'world2')
        instance.attributes[attr.name] = attr
        instance.properties[prop.name] = prop
        workflow_context.model.node.update(instance)
        workflow_context.model.node_template.update(template)

        out = self._run(executor, workflow_context, _test_node_instance_operation)

        # Re-fetch after the run to compare against persisted state.
        template = self._get_node_template(workflow_context)
        instance = self._get_node(workflow_context)
        assert out['type'] == context_adapter.NODE_INSTANCE
        assert out['node']['id'] == template.id
        assert out['node']['name'] == template.name
        assert out['node']['properties'] == {prop.name: prop.value}
        assert out['node']['type'] == type_name
        # The adapter maps the 'aria.' prefix to 'cloudify.' in the hierarchy.
        assert out['node']['type_hierarchy'] == ['cloudify.plugin.nodes.App']
        assert out['instance']['id'] == instance.id
        assert out['instance']['runtime_properties'] == {attr.name: attr.value}
        # source/target are not available in a node-scoped operation.
        assert not out['source']
        assert not out['target']
77 def test_node_instance_relationships(self, executor, workflow_context):
78 relationship_node_template = self._get_dependency_node_template(workflow_context)
79 relationship_node_instance = self._get_dependency_node(workflow_context)
80 relationship = relationship_node_instance.inbound_relationships[0]
81 relationship_type = models.Type(variant='variant', name='test.relationships.Relationship')
82 relationship.type = relationship_type
83 workflow_context.model.relationship.update(relationship)
85 out = self._run(executor, workflow_context, _test_node_instance_relationships)
87 assert len(out['instance']['relationships']) == 1
88 relationship = out['instance']['relationships'][0]
89 assert relationship['type'] == relationship_type.name
90 assert relationship['type_hierarchy'] == [relationship_type.name]
91 assert relationship['target']['node']['id'] == relationship_node_template.id
92 assert relationship['target']['instance']['id'] == relationship_node_instance.id
    def test_source_operation(self, executor, workflow_context):
        # Relationship operation executed on the source end of the relationship.
        self._test_relationship_operation(executor, workflow_context, operation_end='source')

    def test_target_operation(self, executor, workflow_context):
        # Relationship operation executed on the target end of the relationship.
        self._test_relationship_operation(executor, workflow_context, operation_end='target')
100 def _test_relationship_operation(self, executor, workflow_context, operation_end):
102 executor, workflow_context, _test_relationship_operation, operation_end=operation_end)
104 source_node = self._get_node_template(workflow_context)
105 source_node_instance = self._get_node(workflow_context)
106 target_node = self._get_dependency_node_template(workflow_context)
107 target_node_instance = self._get_dependency_node(workflow_context)
108 assert out['type'] == context_adapter.RELATIONSHIP_INSTANCE
109 assert out['source']['node']['id'] == source_node.id
110 assert out['source']['instance']['id'] == source_node_instance.id
111 assert out['target']['node']['id'] == target_node.id
112 assert out['target']['instance']['id'] == target_node_instance.id
113 assert not out['node']
114 assert not out['instance']
116 def test_host_ip(self, executor, workflow_context):
117 node = self._get_node_template(workflow_context)
118 node.type_hierarchy = ['aria.nodes.Compute']
119 node_instance = self._get_node(workflow_context)
120 node_instance.host_fk = node_instance.id
121 node_instance_ip = '120.120.120.120'
122 node_instance.attributes['ip'] = models.Attribute.wrap('ip', node_instance_ip)
123 workflow_context.model.node_template.update(node)
124 workflow_context.model.node.update(node_instance)
126 out = self._run(executor, workflow_context, _test_host_ip)
128 assert out['instance']['host_ip'] == node_instance_ip
130 def test_get_and_download_resource_and_render(self, tmpdir, executor, workflow_context):
131 resource_path = 'resource'
133 content = '{{ctx.service.name}}-{{variable}}'
134 rendered = '{0}-{1}'.format(workflow_context.service.name, variable)
135 source = tmpdir.join(resource_path)
136 source.write(content)
137 workflow_context.resource.service.upload(
138 entry_id=str(workflow_context.service.id),
142 out = self._run(executor, workflow_context, _test_get_and_download_resource_and_render,
143 inputs={'resource': resource_path,
144 'variable': variable})
146 assert out['get_resource'] == content
147 assert out['get_resource_and_render'] == rendered
148 with open(out['download_resource'], 'rb') as f:
149 assert f.read() == content
150 with open(out['download_resource_and_render'], 'rb') as f:
151 assert f.read() == rendered
153 os.remove(out['download_resource'])
154 os.remove(out['download_resource_and_render'])
156 def test_retry(self, executor, workflow_context):
157 message = 'retry-message'
158 retry_interval = 0.01
160 exception = self._run_and_get_task_exceptions(
161 executor, workflow_context, _test_retry,
162 inputs={'message': message, 'retry_interval': retry_interval},
166 assert isinstance(exception, TaskRetryException)
167 assert exception.message == message
168 assert exception.retry_interval == retry_interval
170 out = self._get_node(workflow_context).attributes['out'].value
171 assert out['operation']['retry_number'] == 1
172 assert out['operation']['max_retries'] == 1
174 def test_logger_and_send_event(self, executor, workflow_context):
175 # TODO: add assertions of output once process executor output can be captured
176 message = 'logger-message'
177 event = 'event-message'
178 self._run(executor, workflow_context, _test_logger_and_send_event,
179 inputs={'message': message, 'event': event})
181 def test_plugin(self, executor, workflow_context, tmpdir):
182 plugin = self._put_plugin(workflow_context)
183 out = self._run(executor, workflow_context, _test_plugin, plugin=plugin)
185 expected_workdir = tmpdir.join(
186 'workdir', 'plugins', str(workflow_context.service.id), plugin.name)
187 assert out['plugin']['name'] == plugin.name
188 assert out['plugin']['package_name'] == plugin.package_name
189 assert out['plugin']['package_version'] == plugin.package_version
190 assert out['plugin']['workdir'] == str(expected_workdir)
192 def test_importable_ctx_and_inputs(self, executor, workflow_context):
193 test_inputs = {'input1': 1, 'input2': 2}
194 plugin = self._put_plugin(workflow_context, mock_cfy_plugin=True)
196 out = self._run(executor, workflow_context, _test_importable_ctx_and_inputs,
198 skip_common_assert=True,
200 assert out['inputs'] == test_inputs
202 def test_non_recoverable_error(self, executor, workflow_context):
203 message = 'NON_RECOVERABLE_MESSAGE'
204 plugin = self._put_plugin(workflow_context, mock_cfy_plugin=True)
206 exception = self._run_and_get_task_exceptions(
207 executor, workflow_context, _test_non_recoverable_error,
208 inputs={'message': message},
209 skip_common_assert=True,
212 assert isinstance(exception, TaskAbortException)
213 assert exception.message == message
215 def test_recoverable_error(self, executor, workflow_context):
216 message = 'RECOVERABLE_MESSAGE'
217 plugin = self._put_plugin(workflow_context, mock_cfy_plugin=True)
219 retry_interval = 0.01
220 exception = self._run_and_get_task_exceptions(
221 executor, workflow_context, _test_recoverable_error,
222 inputs={'message': message, 'retry_interval': retry_interval},
223 skip_common_assert=True,
226 assert isinstance(exception, TaskRetryException)
227 assert message in exception.message
228 assert exception.retry_interval == retry_interval
    def _test_common(self, out, workflow_context):
        """Assert the adapter/native value pairs recorded by the module-level
        ``_test_common`` helper agree with the workflow context.

        Entries like ``out['task_id']`` hold ``(adapter value, ARIA value)``
        pairs; both sides must match.
        """
        assert out['execution_id'] == workflow_context.execution.id
        assert out['workflow_id'] == workflow_context.execution.workflow_name
        # These Cloudify-specific fields have no ARIA counterpart and are None/empty.
        assert out['rest_token'] is None
        assert out['task_id'][0] == out['task_id'][1]
        assert out['task_name'][0] == out['task_name'][1]
        assert out['task_target'] is None
        assert out['task_queue'] is None
        assert out['provider_context'] == {}
        assert out['blueprint']['id'] == workflow_context.service_template.id
        assert out['deployment']['id'] == workflow_context.service.id
        assert out['operation']['name'][0] == out['operation']['name'][1]
        assert out['operation']['retry_number'][0] == out['operation']['retry_number'][1]
        # Off-by-one: presumably Cloudify's max_retries excludes the first attempt
        # while ARIA's max_attempts includes it — TODO confirm against the adapter.
        assert out['operation']['max_retries'][0] == out['operation']['max_retries'][1] - 1
        assert out['bootstrap_context']['resources_prefix'] == ''
        assert out['bootstrap_context']['broker_config'] == {}
        assert out['bootstrap_context']['cloudify_agent']['any'] is None
        assert out['agent']['init_script'] is None
255 skip_common_assert=False,
258 interface_name = 'test'
259 operation_name = 'op'
260 op_dict = {'function': '{0}.{1}'.format(__name__, func.__name__),
262 'arguments': inputs or {}}
263 node = self._get_node(workflow_context)
266 actor = relationship = node.outbound_relationships[0]
267 relationship.interfaces[interface_name] = mock.models.create_interface(
268 relationship.source_node.service,
271 operation_kwargs=op_dict
273 workflow_context.model.relationship.update(relationship)
277 node.interfaces[interface_name] = mock.models.create_interface(
281 operation_kwargs=op_dict
283 workflow_context.model.node.update(node)
287 actor.interfaces[interface_name].operations[operation_name].inputs
288 for input_name, input in inputs.iteritems():
289 operation_inputs[input_name] = models.Input(name=input_name,
290 type_name=type_.full_type_name(input))
293 def mock_workflow(graph, **kwargs):
294 task = api.task.OperationTask(
298 arguments=inputs or {},
299 max_attempts=max_attempts
301 graph.add_tasks(task)
303 tasks_graph = mock_workflow(ctx=workflow_context)
304 graph_compiler.GraphCompiler(workflow_context, executor.__class__).compile(tasks_graph)
305 eng = engine.Engine(executors={executor.__class__: executor})
306 eng.execute(workflow_context)
307 out = self._get_node(workflow_context).attributes['out'].value
308 if not skip_common_assert:
309 self._test_common(out, workflow_context)
    def _get_dependency_node_template(self, workflow_context):
        # Template of the dependency (relationship-target) node in the mock topology.
        return workflow_context.model.node_template.get_by_name(
            mock.models.DEPENDENCY_NODE_TEMPLATE_NAME)

    def _get_dependency_node(self, workflow_context):
        # Instance of the dependency (relationship-target) node.
        return workflow_context.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)

    def _get_node_template(self, workflow_context):
        # Template of the dependent (source) node the operations are attached to.
        return workflow_context.model.node_template.get_by_name(
            mock.models.DEPENDENT_NODE_TEMPLATE_NAME)

    def _get_node(self, workflow_context):
        # Instance of the dependent (source) node.
        return workflow_context.model.node.get_by_name(mock.models.DEPENDENT_NODE_NAME)
326 def _run_and_get_task_exceptions(self, *args, **kwargs):
327 signal = events.on_failure_task_signal
328 with events_collector(signal) as collected:
329 with pytest.raises(ExecutorException):
330 self._run(*args, **kwargs)
331 return [event['kwargs']['exception'] for event in collected[signal]]
335 result = process.ProcessExecutor(python_path=[tests.ROOT_DIR])
340 def workflow_context(self, tmpdir):
341 result = mock.context.simple(
343 context_kwargs=dict(workdir=str(tmpdir.join('workdir')))
346 storage.release_sqlite_storage(result.model)
348 def _put_plugin(self, workflow_context, mock_cfy_plugin=False):
350 archive_name = 'ARCHIVE'
351 package_name = 'PACKAGE'
352 package_version = '0.1.1'
354 plugin = models.Plugin(
356 archive_name=archive_name,
357 package_name=package_name,
358 package_version=package_version,
359 uploaded_at=datetime.datetime.now(),
360 wheels=['cloudify_plugins_common'] if mock_cfy_plugin else []
363 workflow_context.model.plugin.put(plugin)
@operation
def _test_node_instance_operation(ctx):
    """Record node/instance data exposed by the adapter in a node-scoped operation.

    NOTE(review): the dict-building and try/except scaffolding lines were lost
    from the file; restored to match the assertions in
    TestCloudifyContextAdapter.test_node_instance_operation.
    """
    with _adapter(ctx) as (adapter, out):
        node = adapter.node
        instance = adapter.instance
        out.update({
            'node': {
                'id': node.id,
                'name': node.name,
                'properties': copy.deepcopy(node.properties),
                'type': node.type,
                'type_hierarchy': node.type_hierarchy
            },
            'instance': {
                'id': instance.id,
                'runtime_properties': copy.deepcopy(instance.runtime_properties)
            }
        })
        # source/target must be inaccessible in a node-scoped operation.
        try:
            assert adapter.source
            out['source'] = True
        except TaskAbortException:
            out['source'] = False
        try:
            assert adapter.target
            out['target'] = True
        except TaskAbortException:
            out['target'] = False
def _test_node_instance_relationships(ctx):
    """Record each relationship's type, hierarchy and target as seen by the adapter."""
    with _adapter(ctx) as (adapter, out):
        collected = []
        for rel in adapter.instance.relationships:
            collected.append({
                'type': rel.type,
                'type_hierarchy': [t.name for t in rel.type_hierarchy],
                'target': {'node': {'id': rel.target.node.id},
                           'instance': {'id': rel.target.instance.id}}})
        out['instance'] = {'relationships': collected}
@operation
def _test_relationship_operation(ctx):
    """Record source/target data exposed in a relationship-scoped operation.

    NOTE(review): the ``out.update({`` opener and the try/except scaffolding were
    lost from the file; restored to match _test_relationship_operation's assertions.
    """
    with _adapter(ctx) as (adapter, out):
        out.update({
            'source': {'node': {'id': adapter.source.node.id},
                       'instance': {'id': adapter.source.instance.id}},
            'target': {'node': {'id': adapter.target.node.id},
                       'instance': {'id': adapter.target.instance.id}}
        })
        # node/instance must be inaccessible in a relationship-scoped operation.
        try:
            assert adapter.node
            out['node'] = True
        except TaskAbortException:
            out['node'] = False
        try:
            assert adapter.instance
            out['instance'] = True
        except TaskAbortException:
            out['instance'] = False
def _test_host_ip(ctx):
    """Record the host IP resolved for the current node instance."""
    with _adapter(ctx) as (adapter, out):
        resolved_ip = adapter.instance.host_ip
        out['instance'] = {'host_ip': resolved_ip}
@operation
def _test_get_and_download_resource_and_render(ctx, resource, variable):
    """Record raw and rendered results of the adapter's resource APIs.

    NOTE(review): the ``out.update({`` opener and the closing braces were lost
    from the file; restored.
    """
    with _adapter(ctx) as (adapter, out):
        out.update({
            'get_resource': adapter.get_resource(resource),
            'get_resource_and_render': adapter.get_resource_and_render(
                resource, template_variables={'variable': variable}
            ),
            'download_resource': adapter.download_resource(resource),
            'download_resource_and_render': adapter.download_resource_and_render(
                resource, template_variables={'variable': variable}
            )
        })
def _test_retry(ctx, message, retry_interval):
    """Record the retry counters, then request a retry through the adapter."""
    with _adapter(ctx) as (adapter, out):
        operation_ctx = adapter.operation
        out['operation'] = {'retry_number': operation_ctx.retry_number,
                            'max_retries': operation_ctx.max_retries}
        operation_ctx.retry(message, retry_after=retry_interval)
def _test_logger_and_send_event(ctx, message, event):
    """Exercise the adapter's logger and send_event (output is not asserted yet)."""
    with _adapter(ctx) as (adapter, _unused):
        adapter.logger.info(message)
        adapter.send_event(event)
@operation
def _test_plugin(ctx):
    """Record the plugin metadata exposed by the adapter.

    NOTE(review): the ``out['plugin'] = {`` opener and the ``'name'`` entry were
    lost from the file; restored to match test_plugin's assertions.
    """
    with _adapter(ctx) as (adapter, out):
        plugin = adapter.plugin
        out['plugin'] = {
            'name': plugin.name,
            'package_name': plugin.package_name,
            'package_version': plugin.package_version,
            'workdir': plugin.workdir
        }
def _test_importable_ctx_and_inputs(**_):
    """Verify the Cloudify-style ctx/ctx_parameters imports work in-process."""
    from cloudify import ctx
    from cloudify.state import ctx_parameters
    collected_inputs = dict(ctx_parameters)
    ctx.instance.runtime_properties['out'] = {'inputs': collected_inputs}
def _test_non_recoverable_error(message, **_):
    """Raise Cloudify's NonRecoverableError; the test asserts it surfaces as
    ARIA's TaskAbortException."""
    from cloudify import exceptions
    raise exceptions.NonRecoverableError(message)
def _test_recoverable_error(message, retry_interval, **_):
    """Raise Cloudify's RecoverableError; the test asserts it surfaces as
    ARIA's TaskRetryException with the given interval."""
    from cloudify import exceptions
    raise exceptions.RecoverableError(message, retry_interval)
def _test_common(out, ctx, adapter):
    """Collect (adapter value, native ARIA value) pairs for the common context
    fields; they are asserted by TestCloudifyContextAdapter._test_common.

    NOTE(review): the ``out.update({`` opener and several nesting braces were
    lost from the file; restored around the surviving entries.
    """
    op = adapter.operation
    bootstrap_context = adapter.bootstrap_context
    out.update({
        'type': adapter.type,
        'execution_id': adapter.execution_id,
        'workflow_id': adapter.workflow_id,
        'rest_token': adapter.rest_token,
        'task_id': (adapter.task_id, ctx.task.id),
        'task_name': (adapter.task_name, ctx.task.function),
        'task_target': adapter.task_target,
        'task_queue': adapter.task_queue,
        'provider_context': adapter.provider_context,
        'blueprint': {'id': adapter.blueprint.id},
        'deployment': {'id': adapter.deployment.id},
        'operation': {
            'name': [op.name, ctx.name.split('@')[0].replace(':', '.')],
            'retry_number': [op.retry_number, ctx.task.attempts_count - 1],
            'max_retries': [op.max_retries, ctx.task.max_attempts]
        },
        'bootstrap_context': {
            'broker_config': bootstrap_context.broker_config('arg1', 'arg2', arg3='arg3'),
            # All attribute access of cloudify_agent returns none
            'cloudify_agent': {'any': bootstrap_context.cloudify_agent.any},
            'resources_prefix': bootstrap_context.resources_prefix
        },
        'agent': {
            'init_script': adapter.agent.init_script('arg1', 'arg2', arg3='arg3')
        }
    })
@contextlib.contextmanager
def _adapter(ctx):
    """Yield (adapter, out) and persist ``out`` into the actor's runtime
    properties on exit, so the test process can read the results.

    NOTE(review): the ``def`` line, ``out`` initialization and try/finally
    scaffolding were lost from the file; restored.
    """
    out = {}
    adapter = context_adapter.CloudifyContextAdapter(ctx)
    _test_common(out, ctx, adapter)
    try:
        yield adapter, out
    finally:
        # Node-scoped operations expose .instance; relationship-scoped ones
        # abort on .instance access, so fall back to the source instance.
        try:
            instance = adapter.instance
        except TaskAbortException:
            instance = adapter.source.instance
        instance.runtime_properties['out'] = out