vFW and vDNS support added to azure-plugin
[multicloud/azure.git] / azure / aria / aria-extension-cloudify / src / aria / aria / orchestrator / workflows / api / task_graph.py
1 # Licensed to the Apache Software Foundation (ASF) under one or more
2 # contributor license agreements.  See the NOTICE file distributed with
3 # this work for additional information regarding copyright ownership.
4 # The ASF licenses this file to You under the Apache License, Version 2.0
5 # (the "License"); you may not use this file except in compliance with
6 # the License.  You may obtain a copy of the License at
7 #
8 #     http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15
16 """
17 Task graph.
18 """
19
20 from collections import Iterable
21
22 from networkx import DiGraph, topological_sort
23
24 from ....utils.uuid import generate_uuid
25 from . import task as api_task
26
27
28 class TaskNotInGraphError(Exception):
29     """
30     An error representing a scenario where a given task is not in the graph as expected.
31     """
32     pass
33
34
35 def _filter_out_empty_tasks(func=None):
36     if func is None:
37         return lambda f: _filter_out_empty_tasks(func=f)
38
39     def _wrapper(task, *tasks, **kwargs):
40         return func(*(t for t in (task,) + tuple(tasks) if t), **kwargs)
41     return _wrapper
42
43
44 class TaskGraph(object):
45     """
46     Task graph builder.
47     """
48
49     def __init__(self, name):
50         self.name = name
51         self._id = generate_uuid(variant='uuid')
52         self._graph = DiGraph()
53
54     def __repr__(self):
55         return '{name}(id={self._id}, name={self.name}, graph={self._graph!r})'.format(
56             name=self.__class__.__name__, self=self)
57
58     @property
59     def id(self):
60         """
61         ID of the graph
62         """
63         return self._id
64
65     # graph traversal methods
66
67     @property
68     def tasks(self):
69         """
70         Iterator over tasks in the graph.
71         """
72         for _, data in self._graph.nodes_iter(data=True):
73             yield data['task']
74
75     def topological_order(self, reverse=False):
76         """
77         Topological sort of the graph.
78
79         :param reverse: whether to reverse the sort
80         :return: list which represents the topological sort
81         """
82         for task_id in topological_sort(self._graph, reverse=reverse):
83             yield self.get_task(task_id)
84
85     def get_dependencies(self, dependent_task):
86         """
87         Iterates over the task's dependencies.
88
89         :param dependent_task: task whose dependencies are requested
90         :raises ~aria.orchestrator.workflows.api.task_graph.TaskNotInGraphError: if
91          ``dependent_task`` is not in the graph
92         """
93         if not self.has_tasks(dependent_task):
94             raise TaskNotInGraphError('Task id: {0}'.format(dependent_task.id))
95         for _, dependency_id in self._graph.out_edges_iter(dependent_task.id):
96             yield self.get_task(dependency_id)
97
98     def get_dependents(self, dependency_task):
99         """
100         Iterates over the task's dependents.
101
102         :param dependency_task: task whose dependents are requested
103         :raises ~aria.orchestrator.workflows.api.task_graph.TaskNotInGraphError: if
104          ``dependency_task`` is not in the graph
105         """
106         if not self.has_tasks(dependency_task):
107             raise TaskNotInGraphError('Task id: {0}'.format(dependency_task.id))
108         for dependent_id, _ in self._graph.in_edges_iter(dependency_task.id):
109             yield self.get_task(dependent_id)
110
111     # task methods
112
113     def get_task(self, task_id):
114         """
115         Get a task instance that's been inserted to the graph by the task's ID.
116
117         :param basestring task_id: task ID
118         :raises ~aria.orchestrator.workflows.api.task_graph.TaskNotInGraphError: if no task found in
119          the graph with the given ID
120         """
121         if not self._graph.has_node(task_id):
122             raise TaskNotInGraphError('Task id: {0}'.format(task_id))
123         data = self._graph.node[task_id]
124         return data['task']
125
126     @_filter_out_empty_tasks
127     def add_tasks(self, *tasks):
128         """
129         Adds a task to the graph.
130
131         :param task: task
132         :return: list of added tasks
133         :rtype: list
134         """
135         assert all([isinstance(task, (api_task.BaseTask, Iterable)) for task in tasks])
136         return_tasks = []
137
138         for task in tasks:
139             if isinstance(task, Iterable):
140                 return_tasks += self.add_tasks(*task)
141             elif not self.has_tasks(task):
142                 self._graph.add_node(task.id, task=task)
143                 return_tasks.append(task)
144
145         return return_tasks
146
147     @_filter_out_empty_tasks
148     def remove_tasks(self, *tasks):
149         """
150         Removes the provided task from the graph.
151
152         :param task: task
153         :return: list of removed tasks
154         :rtype: list
155         """
156         return_tasks = []
157
158         for task in tasks:
159             if isinstance(task, Iterable):
160                 return_tasks += self.remove_tasks(*task)
161             elif self.has_tasks(task):
162                 self._graph.remove_node(task.id)
163                 return_tasks.append(task)
164
165         return return_tasks
166
167     @_filter_out_empty_tasks
168     def has_tasks(self, *tasks):
169         """
170         Checks whether a task is in the graph.
171
172         :param task: task
173         :return: ``True`` if all tasks are in the graph, otherwise ``False``
174         :rtype: list
175         """
176         assert all(isinstance(t, (api_task.BaseTask, Iterable)) for t in tasks)
177         return_value = True
178
179         for task in tasks:
180             if isinstance(task, Iterable):
181                 return_value &= self.has_tasks(*task)
182             else:
183                 return_value &= self._graph.has_node(task.id)
184
185         return return_value
186
187     def add_dependency(self, dependent, dependency):
188         """
189         Adds a dependency for one item (task, sequence or parallel) on another.
190
191         The dependent will only be executed after the dependency terminates. If either of the items
192         is either a sequence or a parallel, multiple dependencies may be added.
193
194         :param dependent: dependent (task, sequence or parallel)
195         :param dependency: dependency (task, sequence or parallel)
196         :return: ``True`` if the dependency between the two hadn't already existed, otherwise
197          ``False``
198         :rtype: bool
199         :raises ~aria.orchestrator.workflows.api.task_graph.TaskNotInGraphError: if either the
200          dependent or dependency are tasks which are not in the graph
201         """
202         if not (self.has_tasks(dependent) and self.has_tasks(dependency)):
203             raise TaskNotInGraphError()
204
205         if self.has_dependency(dependent, dependency):
206             return
207
208         if isinstance(dependent, Iterable):
209             for dependent_task in dependent:
210                 self.add_dependency(dependent_task, dependency)
211         else:
212             if isinstance(dependency, Iterable):
213                 for dependency_task in dependency:
214                     self.add_dependency(dependent, dependency_task)
215             else:
216                 self._graph.add_edge(dependent.id, dependency.id)
217
218     def has_dependency(self, dependent, dependency):
219         """
220         Checks whether one item (task, sequence or parallel) depends on another.
221
222         Note that if either of the items is either a sequence or a parallel, and some of the
223         dependencies exist in the graph but not all of them, this method will return ``False``.
224
225         :param dependent: dependent (task, sequence or parallel)
226         :param dependency: dependency (task, sequence or parallel)
227         :return: ``True`` if the dependency between the two exists, otherwise ``False``
228         :rtype: bool
229         :raises ~aria.orchestrator.workflows.api.task_graph.TaskNotInGraphError: if either the
230          dependent or dependency are tasks which are not in the graph
231         """
232         if not (dependent and dependency):
233             return False
234         elif not (self.has_tasks(dependent) and self.has_tasks(dependency)):
235             raise TaskNotInGraphError()
236
237         return_value = True
238
239         if isinstance(dependent, Iterable):
240             for dependent_task in dependent:
241                 return_value &= self.has_dependency(dependent_task, dependency)
242         else:
243             if isinstance(dependency, Iterable):
244                 for dependency_task in dependency:
245                     return_value &= self.has_dependency(dependent, dependency_task)
246             else:
247                 return_value &= self._graph.has_edge(dependent.id, dependency.id)
248
249         return return_value
250
251     def remove_dependency(self, dependent, dependency):
252         """
253         Removes a dependency for one item (task, sequence or parallel) on another.
254
255         Note that if either of the items is either a sequence or a parallel, and some of the
256         dependencies exist in the graph but not all of them, this method will not remove any of the
257         dependencies and return ``False``.
258
259         :param dependent: dependent (task, sequence or parallel)
260         :param dependency: dependency (task, sequence or parallel)
261         :return: ``False`` if the dependency between the two hadn't existed, otherwise ``True``
262         :rtype: bool
263         :raises ~aria.orchestrator.workflows.api.task_graph.TaskNotInGraphError: if either the
264          dependent or dependency are tasks which are not in the graph
265         """
266         if not (self.has_tasks(dependent) and self.has_tasks(dependency)):
267             raise TaskNotInGraphError()
268
269         if not self.has_dependency(dependent, dependency):
270             return
271
272         if isinstance(dependent, Iterable):
273             for dependent_task in dependent:
274                 self.remove_dependency(dependent_task, dependency)
275         elif isinstance(dependency, Iterable):
276             for dependency_task in dependency:
277                 self.remove_dependency(dependent, dependency_task)
278         else:
279             self._graph.remove_edge(dependent.id, dependency.id)
280
281     @_filter_out_empty_tasks
282     def sequence(self, *tasks):
283         """
284         Creates and inserts a sequence into the graph, effectively each task i depends on i-1.
285
286         :param tasks: iterable of dependencies
287         :return: provided tasks
288         """
289         if tasks:
290             self.add_tasks(*tasks)
291
292             for i in xrange(1, len(tasks)):
293                 self.add_dependency(tasks[i], tasks[i-1])
294
295         return tasks