vFW and vDNS support added to azure-plugin
[multicloud/azure.git] / azure / aria / aria-extension-cloudify / src / aria / aria / orchestrator / workflows / core / events_handler.py
1 # Licensed to the Apache Software Foundation (ASF) under one or more
2 # contributor license agreements.  See the NOTICE file distributed with
3 # this work for additional information regarding copyright ownership.
4 # The ASF licenses this file to You under the Apache License, Version 2.0
5 # (the "License"); you may not use this file except in compliance with
6 # the License.  You may obtain a copy of the License at
7 #
8 #     http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15
16 """
17 Workflow event handling.
18 """
19
20 from datetime import (
21     datetime,
22     timedelta,
23 )
24
25 from ... import events
26 from ... import exceptions
27
28
@events.sent_task_signal.connect
def _task_sent(ctx, *args, **kwargs):
    """Mark the task as dispatched by moving it into the SENT state."""
    with ctx.persist_changes:
        task = ctx.task
        task.status = task.SENT
33
34
@events.start_task_signal.connect
def _task_started(ctx, *args, **kwargs):
    """Record the start timestamp, move the task to STARTED and push the
    owning node into its transitional lifecycle state."""
    with ctx.persist_changes:
        task = ctx.task
        task.started_at = datetime.utcnow()
        task.status = task.STARTED
        _update_node_state_if_necessary(ctx, is_transitional=True)
41
42
@events.on_failure_task_signal.connect
def _task_failed(ctx, exception, *args, **kwargs):
    """Either schedule a retry of the failed task or close it as FAILED.

    A retry is scheduled only when the failure was not an explicit abort,
    the retry budget is not exhausted (or is infinite), and the task is not
    flagged with ``ignore_failure``. An ``ignore_failure`` task is marked
    FAILED here and will not be retried; the engine also consults
    ``ignore_failure`` so such a failure does not fail the whole workflow.
    """
    with ctx.persist_changes:
        task = ctx.task
        aborted = isinstance(exception, exceptions.TaskAbortException)
        attempts_left = (task.attempts_count < task.max_attempts
                         or task.max_attempts == task.INFINITE_RETRIES)
        if aborted or not attempts_left or task.ignore_failure:
            # No retry: stamp the end time and mark the task failed.
            task.ended_at = datetime.utcnow()
            task.status = task.FAILED
        else:
            # A TaskRetryException may carry its own interval; otherwise
            # fall back to the task's configured retry interval.
            if (isinstance(exception, exceptions.TaskRetryException)
                    and exception.retry_interval is not None):
                interval = exception.retry_interval
            else:
                interval = task.retry_interval
            task.status = task.RETRYING
            task.attempts_count += 1
            task.due_at = datetime.utcnow() + timedelta(seconds=interval)
67
68
@events.on_success_task_signal.connect
def _task_succeeded(ctx, *args, **kwargs):
    """Close the task as SUCCESS and update the owning node's state."""
    with ctx.persist_changes:
        task = ctx.task
        task.ended_at = datetime.utcnow()
        task.status = task.SUCCESS
        task.attempts_count += 1
        _update_node_state_if_necessary(ctx)
77
78
@events.start_workflow_signal.connect
def _workflow_started(workflow_context, *args, **kwargs):
    """Move the execution to STARTED, unless a cancel is already underway."""
    with workflow_context.persist_changes:
        execution = workflow_context.execution
        # A cancel request may have arrived before the workflow got going;
        # in that case leave the cancelling/cancelled status untouched.
        if execution.status not in (execution.CANCELLING, execution.CANCELLED):
            execution.started_at = datetime.utcnow()
            execution.status = execution.STARTED
88
89
@events.on_failure_workflow_signal.connect
def _workflow_failed(workflow_context, exception, *args, **kwargs):
    """Record the failure cause and close the execution as FAILED."""
    with workflow_context.persist_changes:
        execution = workflow_context.execution
        execution.error = str(exception)
        execution.ended_at = datetime.utcnow()
        execution.status = execution.FAILED
97
98
@events.on_success_workflow_signal.connect
def _workflow_succeeded(workflow_context, *args, **kwargs):
    """Close the execution as SUCCEEDED with the current end timestamp."""
    with workflow_context.persist_changes:
        execution = workflow_context.execution
        execution.ended_at = datetime.utcnow()
        execution.status = execution.SUCCEEDED
105
106
@events.on_cancelled_workflow_signal.connect
def _workflow_cancelled(workflow_context, *args, **kwargs):
    """Close the execution as CANCELLED, unless it has already ended."""
    with workflow_context.persist_changes:
        execution = workflow_context.execution
        status = execution.status
        # _workflow_cancelling may have routed through here already.
        if status == execution.CANCELLED:
            return
        if status in (execution.SUCCEEDED, execution.FAILED):
            # Too late to cancel -- just log that the request lost the race.
            _log_tried_to_cancel_execution_but_it_already_ended(workflow_context, status)
        else:
            execution.status = execution.CANCELLED
            execution.ended_at = datetime.utcnow()
120
121
@events.on_resume_workflow_signal.connect
def _workflow_resume(workflow_context, retry_failed=False, *args, **kwargs):
    """Requeue the execution by resetting its unfinished tasks to PENDING.

    With ``retry_failed`` set, failed tasks (except those marked
    ``ignore_failure``) also get a fresh retry budget and are requeued.
    """
    with workflow_context.persist_changes:
        execution = workflow_context.execution
        execution.status = execution.PENDING
        for task in execution.tasks:
            if not task.has_ended():
                # Anything still in flight goes back into the queue.
                task.status = task.PENDING
            elif (retry_failed and task.status == task.FAILED
                  and not task.ignore_failure):
                # Failed tasks restart with a zeroed attempt counter.
                task.attempts_count = 0
                task.status = task.PENDING
137
138
139
@events.on_cancelling_workflow_signal.connect
def _workflow_cancelling(workflow_context, *args, **kwargs):
    """Move the execution into CANCELLING (or straight to CANCELLED)."""
    with workflow_context.persist_changes:
        execution = workflow_context.execution
        status = execution.status
        # Nothing has started yet, so the execution can be cancelled outright.
        if status == execution.PENDING:
            return _workflow_cancelled(workflow_context=workflow_context)
        if status in (execution.SUCCEEDED, execution.FAILED):
            # Already finished -- only log the late cancel request.
            _log_tried_to_cancel_execution_but_it_already_ended(workflow_context, status)
        else:
            execution.status = execution.CANCELLING
151
152
153 def _update_node_state_if_necessary(ctx, is_transitional=False):
154     # TODO: this is not the right way to check! the interface name is arbitrary
155     # and also will *never* be the type name
156     node = ctx.task.node if ctx.task is not None else None
157     if (node is not None) and \
158         (ctx.task.interface_name in ('Standard', 'tosca.interfaces.node.lifecycle.Standard',
159                                      'tosca:Standard')):
160         state = node.determine_state(op_name=ctx.task.operation_name,
161                                      is_transitional=is_transitional)
162         if state:
163             node.state = state
164             ctx.model.node.update(node)
165
166
167 def _log_tried_to_cancel_execution_but_it_already_ended(workflow_context, status):
168     workflow_context.logger.info(
169         "'{workflow_name}' workflow execution {status} before the cancel request"
170         "was fully processed".format(workflow_name=workflow_context.workflow_name, status=status))