1 # Licensed to the Apache Software Foundation (ASF) under one or more
2 # contributor license agreements. See the NOTICE file distributed with
3 # this work for additional information regarding copyright ownership.
4 # The ASF licenses this file to You under the Apache License, Version 2.0
5 # (the "License"); you may not use this file except in compliance with
6 # the License. You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
"""
Workflow event handling.
"""
from datetime import (
    datetime,
    timedelta
)

from ... import events
from ... import exceptions
@events.sent_task_signal.connect
def _task_sent(ctx, *args, **kwargs):
    """Mark the task as SENT once the executor has dispatched it."""
    with ctx.persist_changes:
        task = ctx.task
        task.status = task.SENT
@events.start_task_signal.connect
def _task_started(ctx, *args, **kwargs):
    """Record the task's start time, move it to STARTED and derive the
    node's transitional state (e.g. while a lifecycle operation runs)."""
    with ctx.persist_changes:
        task = ctx.task
        task.started_at = datetime.utcnow()
        task.status = task.STARTED
        _update_node_state_if_necessary(ctx, is_transitional=True)
@events.on_failure_task_signal.connect
def _task_failed(ctx, exception, *args, **kwargs):
    """Handle a task failure: schedule a retry when allowed, otherwise mark
    the task as FAILED.

    A retry is attempted only when the failure was not an explicit abort,
    the task still has attempts left (or retries are infinite), and the task
    is not flagged to ignore failures.

    :param ctx: operation context whose task failed
    :param exception: the exception that caused the failure; a
           ``TaskRetryException`` may carry its own retry interval
    """
    with ctx.persist_changes:
        should_retry = all([
            not isinstance(exception, exceptions.TaskAbortException),
            ctx.task.attempts_count < ctx.task.max_attempts or
            ctx.task.max_attempts == ctx.task.INFINITE_RETRIES,
            # ignore_failure check here means the task will not be retried and it will be marked
            # as failed. The engine will also look at ignore_failure so it won't fail the
            # execution
            not ctx.task.ignore_failure
        ])

        if should_retry:
            retry_interval = None
            if isinstance(exception, exceptions.TaskRetryException):
                retry_interval = exception.retry_interval
            if retry_interval is None:
                retry_interval = ctx.task.retry_interval
            ctx.task.status = ctx.task.RETRYING
            ctx.task.attempts_count += 1
            ctx.task.due_at = datetime.utcnow() + timedelta(seconds=retry_interval)
        else:
            ctx.task.ended_at = datetime.utcnow()
            ctx.task.status = ctx.task.FAILED
@events.on_success_task_signal.connect
def _task_succeeded(ctx, *args, **kwargs):
    """Mark the task as SUCCESS, stamp its end time, count the attempt and
    update the node state for completed lifecycle operations."""
    with ctx.persist_changes:
        task = ctx.task
        task.ended_at = datetime.utcnow()
        task.status = task.SUCCESS
        task.attempts_count += 1

        _update_node_state_if_necessary(ctx)
@events.start_workflow_signal.connect
def _workflow_started(workflow_context, *args, **kwargs):
    """Move the execution to STARTED and stamp its start time, unless a
    cancel request already arrived before the workflow got going."""
    with workflow_context.persist_changes:
        execution = workflow_context.execution
        # the execution may already be in the process of cancelling
        if execution.status in (execution.CANCELLING, execution.CANCELLED):
            return
        execution.status = execution.STARTED
        execution.started_at = datetime.utcnow()
@events.on_failure_workflow_signal.connect
def _workflow_failed(workflow_context, exception, *args, **kwargs):
    """Record a workflow failure: store the error text, move the execution
    to FAILED and stamp its end time."""
    with workflow_context.persist_changes:
        failed_execution = workflow_context.execution
        failed_execution.error = str(exception)
        failed_execution.status = failed_execution.FAILED
        failed_execution.ended_at = datetime.utcnow()
@events.on_success_workflow_signal.connect
def _workflow_succeeded(workflow_context, *args, **kwargs):
    """Move the execution to SUCCEEDED and stamp its end time."""
    with workflow_context.persist_changes:
        finished_execution = workflow_context.execution
        finished_execution.status = finished_execution.SUCCEEDED
        finished_execution.ended_at = datetime.utcnow()
@events.on_cancelled_workflow_signal.connect
def _workflow_cancelled(workflow_context, *args, **kwargs):
    """Finalize a cancellation: move the execution to CANCELLED, unless it
    was already cancelled or had already finished."""
    with workflow_context.persist_changes:
        execution = workflow_context.execution
        # _workflow_cancelling function may have called this function already
        if execution.status == execution.CANCELLED:
            return
        # the execution may have already been finished
        elif execution.status in (execution.SUCCEEDED, execution.FAILED):
            _log_tried_to_cancel_execution_but_it_already_ended(workflow_context, execution.status)
        else:
            execution.status = execution.CANCELLED
            execution.ended_at = datetime.utcnow()
@events.on_resume_workflow_signal.connect
def _workflow_resume(workflow_context, retry_failed=False, *args, **kwargs):
    """Reset the execution to PENDING so it can be resumed.

    Every task that has not ended is put back to PENDING. When
    ``retry_failed`` is True, tasks that FAILED (except those flagged
    ``ignore_failure``) also have their attempt counter reset so they are
    retried from scratch; without the flag, failed tasks stay failed.
    """
    with workflow_context.persist_changes:
        execution = workflow_context.execution
        execution.status = execution.PENDING
        # Any non ended task would be put back to pending state
        for task in execution.tasks:
            if not task.has_ended():
                task.status = task.PENDING

        if retry_failed:
            for task in execution.tasks:
                if task.status == task.FAILED and not task.ignore_failure:
                    task.attempts_count = 0
                    task.status = task.PENDING
@events.on_cancelling_workflow_signal.connect
def _workflow_cancelling(workflow_context, *args, **kwargs):
    """Move the execution to CANCELLING — or straight to CANCELLED when it
    never started — unless it already finished."""
    with workflow_context.persist_changes:
        execution = workflow_context.execution
        if execution.status == execution.PENDING:
            return _workflow_cancelled(workflow_context=workflow_context)
        # the execution may have already been finished
        elif execution.status in (execution.SUCCEEDED, execution.FAILED):
            _log_tried_to_cancel_execution_but_it_already_ended(workflow_context, execution.status)
        else:
            execution.status = execution.CANCELLING
153 def _update_node_state_if_necessary(ctx, is_transitional=False):
154 # TODO: this is not the right way to check! the interface name is arbitrary
155 # and also will *never* be the type name
156 node = ctx.task.node if ctx.task is not None else None
157 if (node is not None) and \
158 (ctx.task.interface_name in ('Standard', 'tosca.interfaces.node.lifecycle.Standard',
160 state = node.determine_state(op_name=ctx.task.operation_name,
161 is_transitional=is_transitional)
164 ctx.model.node.update(node)
167 def _log_tried_to_cancel_execution_but_it_already_ended(workflow_context, status):
168 workflow_context.logger.info(
169 "'{workflow_name}' workflow execution {status} before the cancel request"
170 "was fully processed".format(workflow_name=workflow_context.workflow_name, status=status))