# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import threading
from contextlib import contextmanager

from .exceptions import ContextException
from .common import BaseContext
class WorkflowContext(BaseContext):
    """
    Context used during workflow creation and execution.
    """

    # NOTE(review): the reviewed text of this class had lost its indentation and
    # several lines (the ``__init__``, ``__repr__``, ``execution`` getter and
    # ``nodes`` headers, the decorators, the ``filters=`` wrappers and the
    # ``yield`` in ``persist_changes``). Those parts are reconstructed from the
    # surviving statements. Confirm against the upstream file: the defaults of
    # ``parameters`` and ``task_max_attempts``, the ``nodes`` name, the
    # ``filters`` keyword and the ``@property`` decorators.

    def __init__(self,
                 workflow_name,
                 parameters=None,
                 task_max_attempts=1,
                 task_retry_interval=0,
                 task_ignore_failure=False,
                 *args, **kwargs):
        """
        Stores the workflow settings on the context.

        :param workflow_name: name of the workflow; exposed through ``workflow_name``
        :param parameters: workflow parameters; any falsy value is replaced by an empty dict
        :param task_max_attempts: kept as ``_task_max_attempts``
        :param task_retry_interval: kept as ``_task_retry_interval``
        :param task_ignore_failure: kept as ``_task_ignore_failure``

        All remaining positional and keyword arguments are forwarded unchanged to the base
        context's initializer.
        """
        super(WorkflowContext, self).__init__(*args, **kwargs)
        self._workflow_name = workflow_name
        self._parameters = parameters or {}
        self._task_max_attempts = task_max_attempts
        self._task_retry_interval = task_retry_interval
        self._task_ignore_failure = task_ignore_failure
        self._execution_graph = None
        # Not defined in this class; presumably provided by the base context.
        self._register_logger()

    def __repr__(self):
        """
        Returns a string with the class name, service ID (labelled ``deployment_id``), workflow
        name and execution ID.
        """
        return (
            '{name}(deployment_id={self._service_id}, '
            'workflow_name={self._workflow_name}, execution_id={self._execution_id})'.format(
                name=self.__class__.__name__, self=self))

    @property
    def workflow_name(self):
        """
        Name of the workflow, as given at construction.
        """
        return self._workflow_name

    @property
    def execution(self):
        """
        Fetches the execution stored under this context's execution ID.
        """
        return self.model.execution.get(self._execution_id)

    @execution.setter
    def execution(self, value):
        """
        Stores the execution in the storage model API ("MAPI").
        """
        self.model.execution.put(value)

    @property
    def node_templates(self):
        """
        Iterates over the node templates whose service name column matches this context's
        service.
        """
        key = 'service_{0}'.format(self.model.node_template.model_cls.name_column_name())

        return self.model.node_template.iter(
            filters={
                key: getattr(self.service, self.service.name_column_name())
            }
        )

    @property
    def nodes(self):
        """
        Iterates over the nodes whose service name column matches this context's service.
        """
        key = 'service_{0}'.format(self.model.node.model_cls.name_column_name())
        return self.model.node.iter(
            filters={
                key: getattr(self.service, self.service.name_column_name())
            }
        )

    @property
    @contextmanager
    def persist_changes(self):
        """
        Context manager that writes the execution back to storage when the ``with`` body
        finishes.

        The update runs only if the body completes without raising; there is no ``finally``.
        """
        yield
        self._model.execution.update(self.execution)
103 class _CurrentContext(threading.local):
105 Provides a thread-level context, with sugar for the task MAPI.
109 super(_CurrentContext, self).__init__()
110 self._workflow_context = None
112 def _set(self, value):
113 self._workflow_context = value
117 Retrieves the current workflow context.
119 if self._workflow_context is not None:
120 return self._workflow_context
121 raise ContextException("No context was set")
124 def push(self, workflow_context):
126 Switches the current context to the provided context.
128 prev_workflow_context = self._workflow_context
129 self._set(workflow_context)
133 self._set(prev_workflow_context)
# Module-level ``_CurrentContext`` instance shared by everything that imports this module.
current = _CurrentContext()