# Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """ Workflow and operation decorators. """ from functools import partial, wraps from ..utils.validation import validate_function_arguments from ..utils.uuid import generate_uuid from . import context from .workflows.api import task_graph WORKFLOW_DECORATOR_RESERVED_ARGUMENTS = set(('ctx', 'graph')) OPERATION_DECORATOR_RESERVED_ARGUMENTS = set(('ctx', 'toolbelt')) def workflow(func=None, suffix_template=''): """ Workflow decorator. """ if func is None: return partial(workflow, suffix_template=suffix_template) @wraps(func) def _wrapper(ctx, **workflow_parameters): workflow_name = _generate_name( func_name=func.__name__, suffix_template=suffix_template, ctx=ctx, **workflow_parameters) workflow_parameters.setdefault('ctx', ctx) workflow_parameters.setdefault('graph', task_graph.TaskGraph(workflow_name)) validate_function_arguments(func, workflow_parameters) with ctx.model.instrument(*ctx.INSTRUMENTATION_FIELDS): with context.workflow.current.push(ctx): func(**workflow_parameters) return workflow_parameters['graph'] return _wrapper def operation(func=None, toolbelt=False, suffix_template='', logging_handlers=None): """ Operation decorator. """ if func is None: return partial(operation, suffix_template=suffix_template, toolbelt=toolbelt, logging_handlers=logging_handlers) @wraps(func) def _wrapper(**func_kwargs): ctx = func_kwargs['ctx'] if toolbelt: operation_toolbelt = context.toolbelt(ctx) func_kwargs.setdefault('toolbelt', operation_toolbelt) validate_function_arguments(func, func_kwargs) with ctx.model.instrument(*ctx.INSTRUMENTATION_FIELDS): return func(**func_kwargs) return _wrapper def _generate_name(func_name, ctx, suffix_template, **custom_kwargs): return '{func_name}.{suffix}'.format( func_name=func_name, suffix=suffix_template.format(ctx=ctx, **custom_kwargs) or generate_uuid(variant='uuid'))