vFW and vDNS support added to azure-plugin
[multicloud/azure.git] / azure / aria / aria-extension-cloudify / src / aria / tests / orchestrator / context / test_workflow.py
1 # Licensed to the Apache Software Foundation (ASF) under one or more
2 # contributor license agreements.  See the NOTICE file distributed with
3 # this work for additional information regarding copyright ownership.
4 # The ASF licenses this file to You under the Apache License, Version 2.0
5 # (the "License"); you may not use this file except in compliance with
6 # the License.  You may obtain a copy of the License at
7 #
8 #     http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15
16 from datetime import datetime
17
18 import pytest
19
20 from aria import application_model_storage, workflow
21 from aria.orchestrator import context
22 from aria.storage import sql_mapi
23 from aria.orchestrator.workflows.executor import thread, process
24
25 from tests import storage as test_storage, ROOT_DIR
26 from ... import mock
27 from . import execute
28
29
30 class TestWorkflowContext(object):
31
32     def test_execution_creation_on_workflow_context_creation(self, storage):
33         ctx = self._create_ctx(storage)
34         execution = storage.execution.get(ctx.execution.id)             # pylint: disable=no-member
35         assert execution.service == storage.service.get_by_name(
36             mock.models.SERVICE_NAME)
37         assert execution.workflow_name == mock.models.WORKFLOW_NAME
38         assert execution.service_template == storage.service_template.get_by_name(
39             mock.models.SERVICE_TEMPLATE_NAME)
40         assert execution.status == storage.execution.model_cls.PENDING
41         assert execution.inputs == {}
42         assert execution.created_at <= datetime.utcnow()
43
44     def test_subsequent_workflow_context_creation_do_not_fail(self, storage):
45         self._create_ctx(storage)
46         self._create_ctx(storage)
47
48     @staticmethod
49     def _create_ctx(storage):
50         """
51
52         :param storage:
53         :return WorkflowContext:
54         """
55         service = storage.service.get_by_name(mock.models.SERVICE_NAME)
56         return context.workflow.WorkflowContext(
57             name='simple_context',
58             model_storage=storage,
59             resource_storage=None,
60             service_id=service,
61             execution_id=storage.execution.list(filters=dict(service=service))[0].id,
62             workflow_name=mock.models.WORKFLOW_NAME,
63             task_max_attempts=mock.models.TASK_MAX_ATTEMPTS,
64             task_retry_interval=mock.models.TASK_RETRY_INTERVAL
65         )
66
67     @pytest.fixture
68     def storage(self):
69         workflow_storage = application_model_storage(
70             sql_mapi.SQLAlchemyModelAPI, initiator=test_storage.init_inmemory_model_storage)
71         workflow_storage.service_template.put(mock.models.create_service_template())
72         service_template = workflow_storage.service_template.get_by_name(
73             mock.models.SERVICE_TEMPLATE_NAME)
74         service = mock.models.create_service(service_template)
75         workflow_storage.service.put(service)
76         workflow_storage.execution.put(mock.models.create_execution(service))
77         yield workflow_storage
78         test_storage.release_sqlite_storage(workflow_storage)
79
80
81 @pytest.fixture
82 def ctx(tmpdir):
83     context = mock.context.simple(
84         str(tmpdir),
85         context_kwargs=dict(workdir=str(tmpdir.join('workdir')))
86     )
87     yield context
88     test_storage.release_sqlite_storage(context.model)
89
90
91 @pytest.fixture(params=[
92     (thread.ThreadExecutor, {}),
93     (process.ProcessExecutor, {'python_path': [ROOT_DIR]}),
94 ])
95 def executor(request):
96     executor_cls, executor_kwargs = request.param
97     result = executor_cls(**executor_kwargs)
98     try:
99         yield result
100     finally:
101         result.close()
102
103
104 def test_attribute_consumption(ctx, executor):
105
106     node = ctx.model.node.get_by_name(mock.models.DEPENDENT_NODE_NAME)
107     node.attributes['key'] = ctx.model.attribute.model_cls.wrap('key', 'value')
108     node.attributes['key2'] = ctx.model.attribute.model_cls.wrap('key2', 'value_to_change')
109     ctx.model.node.update(node)
110
111     assert node.attributes['key'].value == 'value'
112     assert node.attributes['key2'].value == 'value_to_change'
113
114     @workflow
115     def basic_workflow(ctx, **_):
116         node = ctx.model.node.get_by_name(mock.models.DEPENDENT_NODE_NAME)
117         node.attributes['new_key'] = 'new_value'
118         node.attributes['key2'] = 'changed_value'
119
120     execute(workflow_func=basic_workflow, workflow_context=ctx, executor=executor)
121     node = ctx.model.node.get_by_name(mock.models.DEPENDENT_NODE_NAME)
122
123     assert len(node.attributes) == 3
124     assert node.attributes['key'].value == 'value'
125     assert node.attributes['new_key'].value == 'new_value'
126     assert node.attributes['key2'].value == 'changed_value'