1 # Licensed to the Apache Software Foundation (ASF) under one or more
2 # contributor license agreements. See the NOTICE file distributed with
3 # this work for additional information regarding copyright ownership.
4 # The ASF licenses this file to You under the Apache License, Version 2.0
5 # (the "License"); you may not use this file except in compliance with
6 # the License. You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
19 Built-in heal workflow.
22 from aria import workflow
24 from .workflows import (install_node, uninstall_node)
25 from ..api import task
29 def heal(ctx, graph, node_id):
31 Built-in heal workflow..
33 :param ctx: workflow context
34 :param graph: graph which will describe the workflow.
35 :param node_id: ID of the node to heal
38 failing_node = ctx.model.node.get(node_id)
39 host_node = ctx.model.node.get(failing_node.host.id)
40 failed_node_subgraph = _get_contained_subgraph(ctx, host_node)
41 failed_node_ids = list(n.id for n in failed_node_subgraph)
43 targeted_nodes = [node for node in ctx.nodes
44 if node.id not in failed_node_ids]
46 uninstall_subgraph = task.WorkflowTask(
48 failing_nodes=failed_node_subgraph,
49 targeted_nodes=targeted_nodes
52 install_subgraph = task.WorkflowTask(
54 failing_nodes=failed_node_subgraph,
55 targeted_nodes=targeted_nodes)
57 graph.sequence(uninstall_subgraph, install_subgraph)
60 @workflow(suffix_template='{failing_nodes}')
61 def heal_uninstall(ctx, graph, failing_nodes, targeted_nodes):
63 Uninstall phase of the heal mechanism.
65 :param ctx: workflow context
66 :param graph: task graph to edit
67 :param failing_nodes: failing nodes to heal
68 :param targeted_nodes: targets of the relationships where the failing node are
70 node_sub_workflows = {}
72 # Create install stub workflow for each unaffected node
73 for node in targeted_nodes:
74 node_stub = task.StubTask()
75 node_sub_workflows[node.id] = node_stub
76 graph.add_tasks(node_stub)
78 # create install sub workflow for every node
79 for node in failing_nodes:
80 node_sub_workflow = task.WorkflowTask(uninstall_node,
82 node_sub_workflows[node.id] = node_sub_workflow
83 graph.add_tasks(node_sub_workflow)
85 # create dependencies between the node sub workflow
86 for node in failing_nodes:
87 node_sub_workflow = node_sub_workflows[node.id]
88 for relationship in reversed(node.outbound_relationships):
90 node_sub_workflows[relationship.target_node.id],
93 # Add operations for intact nodes depending on a node belonging to nodes
94 for node in targeted_nodes:
95 node_sub_workflow = node_sub_workflows[node.id]
97 for relationship in reversed(node.outbound_relationships):
100 ctx.model.node.get(relationship.target_node.id)
101 target_node_subgraph = node_sub_workflows[target_node.id]
102 graph.add_dependency(target_node_subgraph, node_sub_workflow)
104 if target_node in failing_nodes:
105 dependency = task.create_relationship_tasks(
106 relationship=relationship,
107 operation_name='aria.interfaces.relationship_lifecycle.unlink')
108 graph.add_tasks(*dependency)
109 graph.add_dependency(node_sub_workflow, dependency)
112 @workflow(suffix_template='{failing_nodes}')
113 def heal_install(ctx, graph, failing_nodes, targeted_nodes):
115 Install phase of the heal mechanism.
117 :param ctx: workflow context
118 :param graph: task graph to edit.
119 :param failing_nodes: failing nodes to heal
120 :param targeted_nodes: targets of the relationships where the failing node are
122 node_sub_workflows = {}
124 # Create install sub workflow for each unaffected
125 for node in targeted_nodes:
126 node_stub = task.StubTask()
127 node_sub_workflows[node.id] = node_stub
128 graph.add_tasks(node_stub)
130 # create install sub workflow for every node
131 for node in failing_nodes:
132 node_sub_workflow = task.WorkflowTask(install_node,
134 node_sub_workflows[node.id] = node_sub_workflow
135 graph.add_tasks(node_sub_workflow)
137 # create dependencies between the node sub workflow
138 for node in failing_nodes:
139 node_sub_workflow = node_sub_workflows[node.id]
140 if node.outbound_relationships:
142 [node_sub_workflows[relationship.target_node.id]
143 for relationship in node.outbound_relationships]
144 graph.add_dependency(node_sub_workflow, dependencies)
146 # Add operations for intact nodes depending on a node
148 for node in targeted_nodes:
149 node_sub_workflow = node_sub_workflows[node.id]
151 for relationship in node.outbound_relationships:
152 target_node = ctx.model.node.get(
153 relationship.target_node.id)
154 target_node_subworkflow = node_sub_workflows[target_node.id]
155 graph.add_dependency(node_sub_workflow, target_node_subworkflow)
157 if target_node in failing_nodes:
158 dependent = task.create_relationship_tasks(
159 relationship=relationship,
160 operation_name='aria.interfaces.relationship_lifecycle.establish')
161 graph.add_tasks(*dependent)
162 graph.add_dependency(dependent, node_sub_workflow)
165 def _get_contained_subgraph(context, host_node):
166 contained_instances = [node
167 for node in context.nodes
168 if node.host_fk == host_node.id and
169 node.host_fk != node.id]
172 if not contained_instances:
175 result.extend(contained_instances)
176 for node in contained_instances:
177 result.extend(_get_contained_subgraph(context, node))