2 # Copyright (c) 2017 GigaSpaces Technologies Ltd. All rights reserved.
4 # Licensed under the Apache License, Version 2.0 (the "License"); you may
5 # not use this file except in compliance with the License. You may obtain
6 # a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
17 from functools import wraps
18 from contextlib import contextmanager
20 from aria import extension as aria_extension
22 from .context_adapter import CloudifyContextAdapter
@aria_extension.process_executor
class CloudifyExecutorExtension(object):
    """
    Process executor extension that lets Cloudify-based plugins run with a Cloudify-style context.

    Operations whose plugin ships the ``cloudify_plugins_common`` wheel are called with a
    ``CloudifyContextAdapter`` instead of the plain operation context; every other operation is
    called unchanged.
    """

    def decorate(self):
        """
        Return the decorator that wraps an operation function.

        NOTE(review): ``decorate`` is assumed to be the hook name expected by
        ``aria_extension.process_executor`` -- confirm against ``aria.extension``.
        """
        def decorator(function):
            @wraps(function)
            def wrapper(ctx, **operation_inputs):
                # We assume that any Cloudify-based plugin would use the plugins-common, thus two
                # different paths are created
                is_cloudify_dependent = ctx.task.plugin and any(
                    'cloudify_plugins_common' in w for w in ctx.task.plugin.wheels)

                if is_cloudify_dependent:
                    # Imported lazily: these modules are only needed (and only assumed to be
                    # importable) when the plugin depends on Cloudify
                    from cloudify import context
                    from cloudify.exceptions import (NonRecoverableError, RecoverableError)

                    with ctx.model.instrument(*ctx.INSTRUMENTATION_FIELDS):
                        # The combined class has to be created dynamically, because
                        # ``context.CloudifyContext`` is only available inside this branch.
                        # NOTE(review): the adapter is assumed to be constructed from the
                        # operation context -- confirm against ``context_adapter``.
                        ctx_adapter = type('_CloudifyContextAdapter',
                                           (CloudifyContextAdapter, context.CloudifyContext),
                                           {})(ctx)

                        exception = None
                        with _push_cfy_ctx(ctx_adapter, operation_inputs):
                            try:
                                function(ctx=ctx_adapter, **operation_inputs)
                            except NonRecoverableError as e:
                                ctx.task.abort(str(e))
                            except RecoverableError as e:
                                ctx.task.retry(str(e), retry_interval=e.retry_after)
                            except BaseException as e:
                                # Keep exception and raise it outside of "with", because
                                # contextmanager does not allow raising exceptions
                                exception = e
                        if exception is not None:
                            raise exception
                else:
                    # Not a Cloudify plugin: call the operation with the original context
                    function(ctx=ctx, **operation_inputs)
            return wrapper
        return decorator
@contextmanager
def _push_cfy_ctx(ctx, params):
    """
    Make ``ctx`` and ``params`` the current Cloudify context for the duration of a ``with`` block.

    Uses ``state.current_ctx.push`` when it is available; otherwise sets the context manually and
    restores the previous context (or ``None``) on exit.

    :param ctx: context object to install as the current Cloudify context
    :param params: operation parameters to install alongside ``ctx``
    :return: context manager yielding the currently installed context
    """
    from cloudify import state

    # Detect the API up front rather than catching AttributeError around the ``yield``: an
    # AttributeError raised by the body of the caller's ``with`` block must not be mistaken for a
    # missing ``push`` (that would make this generator yield a second time)
    push = getattr(state.current_ctx, 'push', None)

    if push is not None:
        # Support for Cloudify > 4.0
        with push(ctx, params) as current_ctx:
            yield current_ctx
        return

    # Support for Cloudify < 4.0
    # NOTE(review): the getters are expected to raise RuntimeError when no context has been set
    # in the current thread -- confirm against ``cloudify.state``
    try:
        original_ctx = state.current_ctx.get_ctx()
    except RuntimeError:
        original_ctx = None
    try:
        original_params = state.current_ctx.get_parameters()
    except RuntimeError:
        original_params = None

    state.current_ctx.set(ctx, params)
    try:
        yield state.current_ctx.get_ctx()
    finally:
        # Always put the previous context back, even if the body raised
        state.current_ctx.set(original_ctx, original_params)