1 # Licensed to the Apache Software Foundation (ASF) under one or more
2 # contributor license agreements. See the NOTICE file distributed with
3 # this work for additional information regarding copyright ownership.
4 # The ASF licenses this file to You under the Apache License, Version 2.0
5 # (the "License"); you may not use this file except in compliance with
6 # the License. You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
"""
Provides the tasks to be entered into the task graph.
"""
20 from ... import context
21 from ....modeling import models
22 from ....modeling import utils as modeling_utils
23 from ....utils.uuid import generate_uuid
24 from .. import exceptions
class BaseTask(object):
    """
    Base class for tasks.

    Stores the workflow context the task is bound to and an ID generated at construction time.
    """

    def __init__(self, ctx=None, **kwargs):
        """
        :param ctx: workflow context to bind to this task; when ``None``, the current workflow
         context is fetched via ``context.workflow.current.get()``
        :param kwargs: accepted so subclasses can forward extra keywords; not used here
        """
        # Only fall back to the ambient workflow context when no context was supplied;
        # an explicitly supplied ``ctx`` must not be overwritten.
        if ctx is not None:
            self._workflow_context = ctx
        else:
            self._workflow_context = context.workflow.current.get()
        self._id = generate_uuid(variant='uuid')

    @property
    def id(self):
        """
        ID generated for this task when it was constructed.
        """
        return self._id

    @property
    def workflow_context(self):
        """
        Context of the current workflow.
        """
        return self._workflow_context
class OperationTask(BaseTask):
    """
    Executes an operation.

    :ivar name: formatted name (includes actor type, actor name, and interface/operation names)
    :vartype name: basestring
    :ivar actor: node or relationship
    :vartype actor: :class:`~aria.modeling.models.Node` or
     :class:`~aria.modeling.models.Relationship`
    :ivar interface_name: interface name on actor
    :vartype interface_name: basestring
    :ivar operation_name: operation name on interface
    :vartype operation_name: basestring
    :ivar plugin: plugin (or None for default plugin)
    :vartype plugin: :class:`~aria.modeling.models.Plugin`
    :ivar function: path to Python function
    :vartype function: basestring
    :ivar arguments: arguments to send to Python function
    :vartype arguments: {:obj:`basestring`: :class:`~aria.modeling.models.Argument`}
    :ivar ignore_failure: whether to ignore failures
    :vartype ignore_failure: bool
    :ivar max_attempts: maximum number of attempts allowed in case of failure
    :vartype max_attempts: int
    :ivar retry_interval: interval between retries (in seconds)
    :vartype retry_interval: float
    """

    NAME_FORMAT = '{interface}:{operation}@{type}:{name}'

    def __init__(self,
                 actor,
                 interface_name,
                 operation_name,
                 arguments=None,
                 ignore_failure=None,
                 max_attempts=None,
                 retry_interval=None):
        """
        :param actor: node or relationship
        :type actor: :class:`~aria.modeling.models.Node` or
         :class:`~aria.modeling.models.Relationship`
        :param interface_name: interface name on actor
        :type interface_name: basestring
        :param operation_name: operation name on interface
        :type operation_name: basestring
        :param arguments: override argument values
        :type arguments: {:obj:`basestring`: object}
        :param ignore_failure: override whether to ignore failures
        :type ignore_failure: bool
        :param max_attempts: override maximum number of attempts allowed in case of failure
        :type max_attempts: int
        :param retry_interval: override interval between retries (in seconds)
        :type retry_interval: float
        :raises ~aria.orchestrator.workflows.exceptions.OperationNotFoundException: if
         ``interface_name`` and ``operation_name`` do not refer to an operation on the actor
        :raises ~aria.orchestrator.workflows.exceptions.TaskCreationException: if the (unwrapped)
         actor is neither a node nor a relationship
        """

        # Creating OperationTask directly should raise an error when there is no
        # interface/operation.
        if not has_operation(actor, interface_name, operation_name):
            raise exceptions.OperationNotFoundException(
                'Could not find operation "{operation_name}" on interface '
                '"{interface_name}" for {actor_type} "{actor.name}"'.format(
                    operation_name=operation_name,
                    interface_name=interface_name,
                    actor_type=type(actor).__name__.lower(),
                    actor=actor)
            )

        super(OperationTask, self).__init__()

        self.name = OperationTask.NAME_FORMAT.format(type=type(actor).__name__.lower(),
                                                     name=actor.name,
                                                     interface=interface_name,
                                                     operation=operation_name)
        self.actor = actor
        self.interface_name = interface_name
        self.operation_name = operation_name

        # ``ignore_failure`` is a bool, so only ``None`` means "use the workflow default".
        self.ignore_failure = \
            self.workflow_context._task_ignore_failure if ignore_failure is None else ignore_failure
        # NOTE: ``or`` makes any falsy override (``None`` but also ``0``) fall back to the
        # workflow context's default.
        self.max_attempts = max_attempts or self.workflow_context._task_max_attempts
        self.retry_interval = retry_interval or self.workflow_context._task_retry_interval

        operation = self.actor.interfaces[self.interface_name].operations[self.operation_name]
        self.plugin = operation.plugin
        self.function = operation.function
        self.arguments = modeling_utils.merge_parameter_values(arguments, operation.arguments)

        if hasattr(actor, '_wrapped'):
            # Unwrap instrumented model
            actor = actor._wrapped

        # Pick the operation context class that matches the kind of actor.
        if isinstance(actor, models.Node):
            self._context_cls = context.operation.NodeOperationContext
        elif isinstance(actor, models.Relationship):
            self._context_cls = context.operation.RelationshipOperationContext
        else:
            raise exceptions.TaskCreationException('Could not create valid context for '
                                                   '{actor.__class__}'.format(actor=actor))
class StubTask(BaseTask):
    """
    Enables creating empty tasks.
    """
class WorkflowTask(BaseTask):
    """
    Executes a complete workflow.
    """

    def __init__(self, workflow_func, **kwargs):
        """
        :param workflow_func: function to run
        :param kwargs: kwargs that would be passed to the workflow_func
        """
        super(WorkflowTask, self).__init__(**kwargs)
        # The sub workflow always receives the context this task ended up bound to.
        kwargs['ctx'] = self.workflow_context
        self._graph = workflow_func(**kwargs)

    @property
    def graph(self):
        """
        Graph constructed by the sub workflow.
        """
        return self._graph

    def __getattr__(self, item):
        """
        Falls back to the sub workflow graph for attributes not found on the task itself.

        :param item: attribute name
        :raises AttributeError: if neither the graph nor the task has the attribute
        """
        if item == '_graph':
            # ``_graph`` is not set yet; reading ``self._graph`` below would re-enter this
            # method forever, so use the regular lookup (which raises ``AttributeError``).
            return super(WorkflowTask, self).__getattribute__(item)
        try:
            return getattr(self._graph, item)
        except AttributeError:
            return super(WorkflowTask, self).__getattribute__(item)
def create_task(actor, interface_name, operation_name, **kwargs):
    """
    Helper function that enables safe creation of :class:`OperationTask`. If the supplied interface
    or operation do not exist, ``None`` is returned.

    :param actor: actor for this task
    :param interface_name: name of the interface
    :param operation_name: name of the operation
    :param kwargs: any additional kwargs to be passed to the OperationTask
    :return: OperationTask or None (if the interface/operation does not exist)
    """
    try:
        return OperationTask(
            actor,
            interface_name=interface_name,
            operation_name=operation_name,
            **kwargs
        )
    except exceptions.OperationNotFoundException:
        return None
def create_relationships_tasks(
        node, interface_name, source_operation_name=None, target_operation_name=None, **kwargs):
    """
    Creates a relationship task (source and target) for all of a node relationships.

    :param node: source node
    :param basestring interface_name: name of the interface
    :param basestring source_operation_name: relationship operation name
    :param basestring target_operation_name: relationship operation name
    :param kwargs: forwarded unchanged to :func:`create_relationship_tasks`
    :return: list with one entry per outbound relationship of ``node``; each entry is the list
     returned by :func:`create_relationship_tasks` for that relationship
    """
    sub_tasks = []
    for relationship in node.outbound_relationships:
        relationship_operations = create_relationship_tasks(
            relationship,
            interface_name,
            source_operation_name=source_operation_name,
            target_operation_name=target_operation_name,
            **kwargs)
        sub_tasks.append(relationship_operations)
    return sub_tasks
def create_relationship_tasks(relationship, interface_name, source_operation_name=None,
                              target_operation_name=None, **kwargs):
    """
    Creates a relationship task (source and target).

    :param relationship: relationship instance itself
    :param interface_name: name of the interface
    :param source_operation_name: operation name for the first task; skipped when falsy
    :param target_operation_name: operation name for the second task; skipped when falsy
    :param kwargs: forwarded unchanged to :func:`create_task`
    :return: list of the tasks that could be created (source first, then target); operations
     that :func:`create_task` could not find are left out
    """
    operations = []
    # Same handling for both names; the tuple order keeps the source task first.
    for operation_name in (source_operation_name, target_operation_name):
        if operation_name:
            operations.append(
                create_task(
                    relationship,
                    interface_name=interface_name,
                    operation_name=operation_name,
                    **kwargs
                )
            )

    return [o for o in operations if o]
def has_operation(actor, interface_name, operation_name):
    """
    Checks whether ``actor`` has the operation ``operation_name`` on interface
    ``interface_name``.

    The result is intended to be used for its truthiness, not as a strict ``bool``.

    :param actor: object exposing an ``interfaces`` mapping
    :param interface_name: name of the interface to look up on the actor
    :param operation_name: name of the operation to look up on the interface
    :return: the operation entry when both lookups succeed; otherwise a falsy value (the falsy
     interface lookup result, or ``False`` when the operation is missing)
    """
    interface = actor.interfaces.get(interface_name, None)
    # ``and`` short-circuits, so a missing interface is returned as-is (``None``).
    return interface and interface.operations.get(operation_name, False)