vFW and vDNS support added to azure-plugin
[multicloud/azure.git] / azure / aria / aria-extension-cloudify / src / aria / aria / orchestrator / workflows / api / task.py
1 # Licensed to the Apache Software Foundation (ASF) under one or more
2 # contributor license agreements.  See the NOTICE file distributed with
3 # this work for additional information regarding copyright ownership.
4 # The ASF licenses this file to You under the Apache License, Version 2.0
5 # (the "License"); you may not use this file except in compliance with
6 # the License.  You may obtain a copy of the License at
7 #
8 #     http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15
16 """
17 Provides the tasks to be entered into the task graph
18 """
19
20 from ... import context
21 from ....modeling import models
22 from ....modeling import utils as modeling_utils
23 from ....utils.uuid import generate_uuid
24 from .. import exceptions
25
26
class BaseTask(object):
    """
    Common ancestor of all API tasks.

    Every instance is bound to a workflow context and receives a generated ID.
    """

    def __init__(self, ctx=None, **kwargs):
        """
        :param ctx: workflow context to bind to; if ``None``, the current workflow context is
         looked up instead
        :param kwargs: accepted but not used here, so that subclasses may forward their own
         keyword arguments
        """
        # An explicitly supplied context always wins over the ambient one.
        self._workflow_context = context.workflow.current.get() if ctx is None else ctx
        self._id = generate_uuid(variant='uuid')

    @property
    def id(self):
        """
        UUID4 ID.
        """
        task_id = self._id
        return task_id

    @property
    def workflow_context(self):
        """
        Context of the current workflow.
        """
        bound_context = self._workflow_context
        return bound_context
52
53
class OperationTask(BaseTask):
    """
    Executes an operation.

    :ivar name: formatted name (includes actor type, actor name, and interface/operation names)
    :vartype name: basestring
    :ivar actor: node or relationship
    :vartype actor: :class:`~aria.modeling.models.Node` or
     :class:`~aria.modeling.models.Relationship`
    :ivar interface_name: interface name on actor
    :vartype interface_name: basestring
    :ivar operation_name: operation name on interface
    :vartype operation_name: basestring
    :ivar plugin: plugin (or None for default plugin)
    :vartype plugin: :class:`~aria.modeling.models.Plugin`
    :ivar function: path to Python function
    :vartype function: basestring
    :ivar arguments: arguments to send to Python function
    :vartype arguments: {:obj:`basestring`: :class:`~aria.modeling.models.Argument`}
    :ivar ignore_failure: whether to ignore failures
    :vartype ignore_failure: bool
    :ivar max_attempts: maximum number of attempts allowed in case of failure
    :vartype max_attempts: int
    :ivar retry_interval: interval between retries (in seconds)
    :vartype retry_interval: float
    """

    NAME_FORMAT = '{interface}:{operation}@{type}:{name}'

    def __init__(self,
                 actor,
                 interface_name,
                 operation_name,
                 arguments=None,
                 ignore_failure=None,
                 max_attempts=None,
                 retry_interval=None):
        """
        :param actor: node or relationship
        :type actor: :class:`~aria.modeling.models.Node` or
         :class:`~aria.modeling.models.Relationship`
        :param interface_name: interface name on actor
        :type interface_name: basestring
        :param operation_name: operation name on interface
        :type operation_name: basestring
        :param arguments: override argument values
        :type arguments: {:obj:`basestring`: object}
        :param ignore_failure: override whether to ignore failures (``None`` uses the workflow
         context default)
        :type ignore_failure: bool
        :param max_attempts: override maximum number of attempts allowed in case of failure
         (a falsy value uses the workflow context default)
        :type max_attempts: int
        :param retry_interval: override interval between retries, in seconds (``None`` uses the
         workflow context default; an explicit ``0`` is honored)
        :type retry_interval: float
        :raises ~aria.orchestrator.workflows.exceptions.OperationNotFoundException: if
         ``interface_name`` and ``operation_name`` do not refer to an operation on the actor
        :raises ~aria.orchestrator.workflows.exceptions.TaskCreationException: if the actor is
         neither a node nor a relationship
        """

        # Creating OperationTask directly should raise an error when there is no
        # interface/operation.
        if not has_operation(actor, interface_name, operation_name):
            raise exceptions.OperationNotFoundException(
                'Could not find operation "{operation_name}" on interface '
                '"{interface_name}" for {actor_type} "{actor.name}"'.format(
                    operation_name=operation_name,
                    interface_name=interface_name,
                    actor_type=type(actor).__name__.lower(),
                    actor=actor)
            )

        super(OperationTask, self).__init__()

        self.name = OperationTask.NAME_FORMAT.format(type=type(actor).__name__.lower(),
                                                     name=actor.name,
                                                     interface=interface_name,
                                                     operation=operation_name)
        self.actor = actor
        self.interface_name = interface_name
        self.operation_name = operation_name

        # ``ignore_failure`` and ``retry_interval`` have meaningful falsy values (``False`` and
        # ``0``), so only ``None`` selects the workflow-level default for them.
        self.ignore_failure = \
            self.workflow_context._task_ignore_failure if ignore_failure is None else ignore_failure
        self.retry_interval = \
            self.workflow_context._task_retry_interval if retry_interval is None else retry_interval
        # NOTE(review): any falsy ``max_attempts`` (including 0) still falls back to the workflow
        # default, exactly as before -- confirm whether 0 should be rejected explicitly instead.
        self.max_attempts = max_attempts or self.workflow_context._task_max_attempts

        # The existence of the interface/operation was verified above.
        operation = self.actor.interfaces[self.interface_name].operations[self.operation_name]
        self.plugin = operation.plugin
        self.function = operation.function
        self.arguments = modeling_utils.merge_parameter_values(arguments, operation.arguments)

        actor = self.actor
        if hasattr(actor, '_wrapped'):
            # Unwrap instrumented model
            actor = actor._wrapped

        # Pick the operation context class matching the (unwrapped) actor type.
        if isinstance(actor, models.Node):
            self._context_cls = context.operation.NodeOperationContext
        elif isinstance(actor, models.Relationship):
            self._context_cls = context.operation.RelationshipOperationContext
        else:
            raise exceptions.TaskCreationException('Could not create valid context for '
                                                   '{actor.__class__}'.format(actor=actor))

    def __repr__(self):
        return self.name
157
158
class StubTask(BaseTask):
    """
    Empty task: adds nothing on top of :class:`BaseTask`.
    """
163
164
class WorkflowTask(BaseTask):
    """
    Executes a complete workflow.

    Attributes that are not found on the task itself are looked up on the graph returned by the
    workflow function.
    """

    def __init__(self, workflow_func, **kwargs):
        """
        :param workflow_func: function to run
        :param kwargs: kwargs that would be passed to the workflow_func
        """
        super(WorkflowTask, self).__init__(**kwargs)
        # The workflow function always receives the context this task is bound to.
        kwargs['ctx'] = self.workflow_context
        self._graph = workflow_func(**kwargs)

    @property
    def graph(self):
        """
        Graph constructed by the sub workflow.
        """
        return self._graph

    def __getattr__(self, item):
        # ``__getattr__`` only runs after normal lookup has failed. If ``_graph`` itself is the
        # missing attribute (the workflow function raised, or the instance was created without
        # ``__init__``), delegating to ``self._graph`` would re-enter this method forever.
        if item == '_graph':
            raise AttributeError(item)
        try:
            return getattr(self._graph, item)
        except AttributeError:
            # Re-run the regular lookup so the error names this object rather than the graph.
            return super(WorkflowTask, self).__getattribute__(item)
191
192
def create_task(actor, interface_name, operation_name, **kwargs):
    """
    Safely builds an :class:`OperationTask`: a missing interface or operation yields ``None``
    instead of an exception.

    :param actor: actor for this task
    :param interface_name: name of the interface
    :param operation_name: name of the operation
    :param kwargs: any additional kwargs to be passed to the OperationTask
    :return: OperationTask or None (if the interface/operation does not exist)
    """
    try:
        task = OperationTask(actor,
                             interface_name=interface_name,
                             operation_name=operation_name,
                             **kwargs)
    except exceptions.OperationNotFoundException:
        # Only the "operation not found" case is swallowed; other errors propagate.
        return None
    return task
213
214
def create_relationships_tasks(
        node, interface_name, source_operation_name=None, target_operation_name=None, **kwargs):
    """
    Creates the relationship tasks (source and target) for each outbound relationship of a node.

    :param node: source node
    :param basestring interface_name: name of the interface
    :param basestring source_operation_name: relationship operation name (source side)
    :param basestring target_operation_name: relationship operation name (target side)
    :param kwargs: any additional kwargs to be passed on to the created tasks
    :return: one list of tasks per outbound relationship, in relationship order
    """
    return [
        create_relationship_tasks(
            outbound,
            interface_name,
            source_operation_name=source_operation_name,
            target_operation_name=target_operation_name,
            **kwargs)
        for outbound in node.outbound_relationships
    ]
236
237
def create_relationship_tasks(relationship, interface_name, source_operation_name=None,
                              target_operation_name=None, **kwargs):
    """
    Creates the tasks (source and target) for a single relationship.

    An operation name that is empty/``None`` is skipped, and so is any operation for which no
    task could be created.

    :param relationship: relationship instance itself
    :param interface_name: name of the interface
    :param source_operation_name: relationship operation name (source side)
    :param target_operation_name: relationship operation name (target side)
    :param kwargs: any additional kwargs to be passed on to the created tasks
    :return: list of the created tasks, source first
    """
    created = []
    for requested_name in (source_operation_name, target_operation_name):
        if not requested_name:
            continue
        task = create_task(
            relationship,
            interface_name=interface_name,
            operation_name=requested_name,
            **kwargs
        )
        # ``create_task`` gives back ``None`` when the operation does not exist.
        if task:
            created.append(task)
    return created
268
269
def has_operation(actor, interface_name, operation_name):
    """
    Checks whether an actor declares the given operation on the given interface.

    :param actor: object exposing an ``interfaces`` mapping (interface name to interface)
    :param interface_name: name of the interface
    :param operation_name: name of the operation
    :return: ``True`` if both the interface and the operation exist, otherwise ``False``
    :rtype: bool
    """
    interface = actor.interfaces.get(interface_name)
    if interface is None:
        return False
    # Compare against ``None`` instead of relying on truthiness, so the answer is always a real
    # boolean and does not depend on how the interface/operation objects evaluate in a boolean
    # context.
    return interface.operations.get(operation_name) is not None