1 # Licensed to the Apache Software Foundation (ASF) under one or more
2 # contributor license agreements. See the NOTICE file distributed with
3 # this work for additional information regarding copyright ownership.
4 # The ASF licenses this file to You under the Apache License, Version 2.0
5 # (the "License"); you may not use this file except in compliance with
6 # the License. You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
16 from datetime import datetime
20 from aria import application_model_storage, workflow
21 from aria.orchestrator import context
22 from aria.storage import sql_mapi
23 from aria.orchestrator.workflows.executor import thread, process
25 from tests import storage as test_storage, ROOT_DIR
class TestWorkflowContext(object):
    """Tests for the execution record reachable through a ``WorkflowContext``."""

    def test_execution_creation_on_workflow_context_creation(self, storage):
        """
        The execution referenced by a new context must match the stored service, workflow
        name and service template, be ``PENDING``, carry no inputs and have a creation
        time that is not in the future.
        """
        ctx = self._create_ctx(storage)
        execution = storage.execution.get(ctx.execution.id)  # pylint: disable=no-member
        assert execution.service == storage.service.get_by_name(
            mock.models.SERVICE_NAME)
        assert execution.workflow_name == mock.models.WORKFLOW_NAME
        assert execution.service_template == storage.service_template.get_by_name(
            mock.models.SERVICE_TEMPLATE_NAME)
        assert execution.status == storage.execution.model_cls.PENDING
        assert execution.inputs == {}
        assert execution.created_at <= datetime.utcnow()

    def test_subsequent_workflow_context_creation_do_not_fail(self, storage):
        """Building two contexts in a row over the same storage must not raise."""
        self._create_ctx(storage)
        self._create_ctx(storage)

    # Declared static: the helper takes only ``storage`` yet is invoked as
    # ``self._create_ctx(storage)``, which would otherwise pass ``self`` as ``storage``.
    @staticmethod
    def _create_ctx(storage):
        """
        Build a ``WorkflowContext`` over the given model storage.

        The context is bound to the first execution that the storage lists for the mock
        service.

        :param storage: model storage that already holds the mock service and an execution
        :return WorkflowContext:
        """
        service = storage.service.get_by_name(mock.models.SERVICE_NAME)
        return context.workflow.WorkflowContext(
            name='simple_context',
            model_storage=storage,
            resource_storage=None,
            # NOTE(review): the original line numbering skips one line at this point, so a
            # keyword argument may have been lost in extraction -- confirm against the
            # ``WorkflowContext`` signature.
            execution_id=storage.execution.list(filters=dict(service=service))[0].id,
            workflow_name=mock.models.WORKFLOW_NAME,
            task_max_attempts=mock.models.TASK_MAX_ATTEMPTS,
            task_retry_interval=mock.models.TASK_RETRY_INTERVAL
        )
# Generator-style setup/teardown body: creates a model storage initialised through
# ``test_storage.init_inmemory_model_storage``, stores one service template, one service
# built from it and one execution of that service, then yields the storage.
# NOTE(review): the ``def`` line (and any decorator) that owns these statements is not
# visible in this excerpt; presumably this is the ``storage`` fixture requested by the
# ``TestWorkflowContext`` tests -- confirm.
workflow_storage = application_model_storage(
    sql_mapi.SQLAlchemyModelAPI, initiator=test_storage.init_inmemory_model_storage)
workflow_storage.service_template.put(mock.models.create_service_template())
# The template is read back by name and the fetched object is used to build the service.
service_template = workflow_storage.service_template.get_by_name(
    mock.models.SERVICE_TEMPLATE_NAME)
service = mock.models.create_service(service_template)
workflow_storage.service.put(service)
workflow_storage.execution.put(mock.models.create_execution(service))
yield workflow_storage
# Executed once the generator is resumed after the yield.
test_storage.release_sqlite_storage(workflow_storage)
# NOTE(review): incomplete fragment. The owning ``def`` line, part of the arguments and the
# closing parenthesis of the ``simple(...)`` call, and whatever sits between the call and
# the release below are not visible in this excerpt. ``tmpdir`` is presumably a parameter
# of the missing header (pytest's ``tmpdir`` fixture?) -- confirm.
context = mock.context.simple(
    # The working directory handed to the context is the ``workdir`` entry under ``tmpdir``.
    context_kwargs=dict(workdir=str(tmpdir.join('workdir')))
# Passes the context's model storage to the sqlite release helper.
test_storage.release_sqlite_storage(context.model)
# Parametrized fixture: every parameter is an ``(executor class, constructor kwargs)``
# pair -- one entry for ``thread.ThreadExecutor`` and one for ``process.ProcessExecutor``.
# NOTE(review): the line closing the ``params=[`` list and everything after the constructor
# call (how ``result`` reaches the test, and any teardown) are not visible in this excerpt.
@pytest.fixture(params=[
    (thread.ThreadExecutor, {}),
    # The process executor is constructed with ``ROOT_DIR`` as its ``python_path`` entry.
    (process.ProcessExecutor, {'python_path': [ROOT_DIR]}),
def executor(request):
    """Instantiate the executor class selected by the current fixture parameter."""
    executor_cls, executor_kwargs = request.param
    result = executor_cls(**executor_kwargs)
def test_attribute_consumption(ctx, executor):
    """
    Plain values assigned to a node's attributes inside a workflow function must be
    readable, after execution, through the ``.value`` of the node's attributes.

    The dependent node is seeded with two wrapped attributes; the workflow function then
    adds a third attribute and overwrites one of the seeded ones using plain strings.
    """
    dependent = ctx.model.node.get_by_name(mock.models.DEPENDENT_NODE_NAME)
    wrap = ctx.model.attribute.model_cls.wrap
    dependent.attributes['key'] = wrap('key', 'value')
    dependent.attributes['key2'] = wrap('key2', 'value_to_change')
    ctx.model.node.update(dependent)

    # Sanity check of the seeded state before anything is executed.
    assert dependent.attributes['key'].value == 'value'
    assert dependent.attributes['key2'].value == 'value_to_change'

    # NOTE(review): ``workflow`` is imported at the top of the file, but no decorator is
    # visible on this function in the excerpt -- confirm whether ``@workflow`` belongs here.
    def basic_workflow(ctx, **_):
        target = ctx.model.node.get_by_name(mock.models.DEPENDENT_NODE_NAME)
        target.attributes['new_key'] = 'new_value'
        target.attributes['key2'] = 'changed_value'

    execute(workflow_func=basic_workflow, workflow_context=ctx, executor=executor)

    # Fetch the node again so the assertions see the state after execution.
    dependent = ctx.model.node.get_by_name(mock.models.DEPENDENT_NODE_NAME)
    assert len(dependent.attributes) == 3
    expected_values = (
        ('key', 'value'),
        ('new_key', 'new_value'),
        ('key2', 'changed_value'),
    )
    for attribute_name, attribute_value in expected_values:
        assert dependent.attributes[attribute_name].value == attribute_value