1 # Licensed to the Apache Software Foundation (ASF) under one or more
2 # contributor license agreements. See the NOTICE file distributed with
3 # this work for additional information regarding copyright ownership.
4 # The ASF licenses this file to You under the Apache License, Version 2.0
5 # (the "License"); you may not use this file except in compliance with
6 # the License. You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
17 Built-in operation execution Workflow.
20 from ... import workflow
21 from ..api import task
25 def execute_operation(
31 run_by_dependency_order,
37 Built-in operation execution Workflow.
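:param ctx: workflow context
:param graph: graph which will describe the workflow
:param interface_name: name of the interface the operation belongs to
:param operation_name: name of the operation to execute
:param operation_kwargs: passed as ``arguments`` to the task created for each filtered node
:param run_by_dependency_order: if true, task dependencies are added so that each node's
    tasks run after those of its relationship targets
:param node_template_ids: node template IDs to filter nodes by; empty means no filtering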
51 # filtering node instances
52 filtered_nodes = list(_filter_nodes(
54 node_template_ids=node_template_ids,
56 type_names=type_names))
58 if run_by_dependency_order:
59 filtered_node_ids = set(node_instance.id for node_instance in filtered_nodes)
60 for node in ctx.nodes:
61 if node.id not in filtered_node_ids:
62 subgraphs[node.id] = ctx.task_graph(
63 name='execute_operation_stub_{0}'.format(node.id))
65 # registering actual tasks to sequences
66 for node in filtered_nodes:
70 interface_name=interface_name,
71 operation_name=operation_name,
72 arguments=operation_kwargs
76 for _, node_sub_workflow in subgraphs.items():
77 graph.add_tasks(node_sub_workflow)
79 # adding tasks dependencies if required
80 if run_by_dependency_order:
81 for node in ctx.nodes:
82 for relationship in node.relationships:
84 source_task=subgraphs[node.id], after=[subgraphs[relationship.target_id]])
87 def _filter_nodes(context, node_template_ids=(), node_ids=(), type_names=()):
def _is_node_template_by_id(node_template_id):
    """
    Tell whether a node template ID passes the node template filter.

    ``node_template_ids`` comes from the enclosing scope; when it is empty no filtering is
    applied and every ID is accepted.

    :param node_template_id: ID of the node template to check
    :return: ``True`` if the ID is accepted by the filter
    """
    if node_template_ids:
        return node_template_id in node_template_ids
    return True
def _is_node_by_id(node_id):
    """
    Tell whether a node ID passes the node filter.

    ``node_ids`` comes from the enclosing scope; when it is empty no filtering is applied
    and every ID is accepted.

    :param node_id: ID of the node to check
    :return: ``True`` if the ID is accepted by the filter
    """
    return True if not node_ids else node_id in node_ids
def _is_node_by_type(node_type):
    """
    Tell whether a node type passes the type name filter.

    ``type_names`` comes from the enclosing scope. This follows the same convention as the
    sibling ID filters: an empty ``type_names`` disables filtering, otherwise only types whose
    name is listed are accepted. The previous ``not node_type.name in type_names`` was inverted
    and rejected exactly the types that were asked for.

    :param node_type: type object exposing a ``name`` attribute
    :return: ``True`` if the type is accepted by the filter
    """
    return not type_names or node_type.name in type_names
97 for node in context.nodes:
98 if all((_is_node_template_by_id(node.node_template.id),
99 _is_node_by_id(node.id),
100 _is_node_by_type(node.node_template.type))):