1 # Licensed to the Apache Software Foundation (ASF) under one or more
2 # contributor license agreements. See the NOTICE file distributed with
3 # this work for additional information regarding copyright ownership.
4 # The ASF licenses this file to You under the Apache License, Version 2.0
5 # (the "License"); you may not use this file except in compliance with
6 # the License. You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
17 Workflow and operation decorators.
20 from functools import partial, wraps
22 from ..utils.validation import validate_function_arguments
23 from ..utils.uuid import generate_uuid
26 from .workflows.api import task_graph
# Argument names the ``workflow`` decorator fills in on behalf of the decorated function.
WORKFLOW_DECORATOR_RESERVED_ARGUMENTS = set(['ctx', 'graph'])
# Argument names the ``operation`` decorator reads or fills in for the decorated function.
OPERATION_DECORATOR_RESERVED_ARGUMENTS = set(['ctx', 'toolbelt'])
def workflow(func=None, suffix_template=''):
    """
    Workflow decorator.

    May be applied bare (``@workflow``) or with keyword arguments
    (``@workflow(suffix_template='...')``). In the latter form ``func`` is ``None`` on the
    first call, and a partial of this decorator is returned to receive the function.

    The wrapped function is called with the caller's keyword arguments plus two defaults that
    are only set when the caller did not supply them:

    * ``ctx``: the context passed to the wrapper
    * ``graph``: a new ``task_graph.TaskGraph`` named via ``_generate_name``

    :param func: function to decorate, or ``None`` when called with arguments only
    :param suffix_template: ``str.format`` template for the graph name suffix; it is formatted
     with ``ctx`` and the workflow parameters
    :return: the wrapping function (or a partial of this decorator when ``func`` is ``None``);
     calling the wrapper returns the ``graph`` entry of the workflow parameters
    """
    if func is None:
        # Called as ``@workflow(...)``: wait for the function to arrive.
        return partial(workflow, suffix_template=suffix_template)

    @wraps(func)
    def _wrapper(ctx, **workflow_parameters):
        # The name is computed before the defaults are injected, so the template only sees the
        # parameters the caller actually provided (plus ``ctx``).
        workflow_name = _generate_name(
            func_name=func.__name__,
            suffix_template=suffix_template,
            ctx=ctx,
            **workflow_parameters)

        workflow_parameters.setdefault('ctx', ctx)
        workflow_parameters.setdefault('graph', task_graph.TaskGraph(workflow_name))
        validate_function_arguments(func, workflow_parameters)
        with ctx.model.instrument(*ctx.INSTRUMENTATION_FIELDS):
            with context.workflow.current.push(ctx):
                func(**workflow_parameters)
        # The function's own return value is discarded; the graph is the result.
        return workflow_parameters['graph']

    return _wrapper
def operation(func=None, toolbelt=False, suffix_template='', logging_handlers=None):
    """
    Operation decorator.

    May be applied bare (``@operation``) or with keyword arguments
    (``@operation(toolbelt=True)``). In the latter form ``func`` is ``None`` on the first call,
    and a partial of this decorator carrying all the options is returned.

    :param func: function to decorate, or ``None`` when called with arguments only
    :param toolbelt: when true, a ``toolbelt`` keyword argument built from ``ctx`` via
     ``context.toolbelt`` is supplied to the function unless the caller already passed one
    :param suffix_template: accepted and forwarded through the partial; not otherwise used by
     this decorator
    :param logging_handlers: accepted and forwarded through the partial; not otherwise used by
     this decorator
    :return: the wrapping function (or a partial of this decorator when ``func`` is ``None``);
     calling the wrapper returns whatever the decorated function returns
    :raises KeyError: from the wrapper, if it is called without a ``ctx`` keyword argument
    """
    if func is None:
        # Called as ``@operation(...)``: forward every option so none is lost on the second call.
        return partial(operation,
                       suffix_template=suffix_template,
                       toolbelt=toolbelt,
                       logging_handlers=logging_handlers)

    @wraps(func)
    def _wrapper(**func_kwargs):
        ctx = func_kwargs['ctx']
        if toolbelt:
            # Only build a toolbelt when the decorated function asked for one.
            operation_toolbelt = context.toolbelt(ctx)
            func_kwargs.setdefault('toolbelt', operation_toolbelt)
        validate_function_arguments(func, func_kwargs)
        with ctx.model.instrument(*ctx.INSTRUMENTATION_FIELDS):
            return func(**func_kwargs)

    return _wrapper
82 def _generate_name(func_name, ctx, suffix_template, **custom_kwargs):
83 return '{func_name}.{suffix}'.format(
85 suffix=suffix_template.format(ctx=ctx, **custom_kwargs) or generate_uuid(variant='uuid'))