1 # Licensed to the Apache Software Foundation (ASF) under one or more
2 # contributor license agreements. See the NOTICE file distributed with
3 # this work for additional information regarding copyright ownership.
4 # The ASF licenses this file to You under the Apache License, Version 2.0
5 # (the "License"); you may not use this file except in compliance with
6 # the License. You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
17 Execution plugin utilities.
26 from . import constants
27 from . import exceptions
31 return os.name == 'nt'
34 def download_script(ctx, script_path):
35 split = script_path.split('://')
37 suffix = script_path.split('/')[-1]
38 file_descriptor, dest_script_path = tempfile.mkstemp(suffix='-{0}'.format(suffix))
39 os.close(file_descriptor)
41 if schema in ('http', 'https'):
42 response = requests.get(script_path)
43 if response.status_code == 404:
44 ctx.task.abort('Failed to download script: {0} (status code: {1})'
45 .format(script_path, response.status_code))
46 content = response.text
47 with open(dest_script_path, 'wb') as f:
50 ctx.download_resource(destination=dest_script_path, path=script_path)
52 os.remove(dest_script_path)
54 return dest_script_path
57 def create_process_config(script_path, process, operation_kwargs, quote_json_env_vars=False):
59 Updates a process with its environment variables, and returns it.
61 Gets a dict representing a process and a dict representing the environment variables. Converts
62 each environment variable to a format of::
64 <string representing the name of the variable>:
65 <json formatted string representing the value of the variable>.
67 Finally, updates the process with the newly formatted environment variables, and returns the
70 :param process: dict representing a process
72 :param operation_kwargs: dict representing environment variables that should exist in the
73 process's running environment.
74 :type operation_kwargs: dict
75 :return: process updated with its environment variables
78 process = process or {}
79 env_vars = operation_kwargs.copy()
82 env_vars.update(process.get('env', {}))
83 for k, v in env_vars.items():
84 if isinstance(v, (dict, list, tuple, bool, int, float)):
86 if quote_json_env_vars:
89 # These <k,v> environment variables will subsequently
90 # be used in a subprocess.Popen() call, as the `env` parameter.
91 # In some Windows Python versions, if an environment variable
92 # name is not of type str (e.g. unicode), the Popen call will
95 # The Windows shell removes all double quotes - escape them
96 # to still be able to pass JSON in env vars to the shell.
97 v = v.replace('"', '\\"')
100 process['env'] = env_vars
101 args = process.get('args')
102 command = script_path
103 command_prefix = process.get('command_prefix')
105 command = '{0} {1}'.format(command_prefix, command)
107 command = ' '.join([command] + [str(a) for a in args])
108 process['command'] = command
116 def _validate_legal_action():
117 if ctx._error is not None:
118 ctx._error = RuntimeError(constants.ILLEGAL_CTX_OPERATION_MESSAGE)
121 def abort_operation(message=None):
122 _validate_legal_action()
123 ctx._error = exceptions.ScriptException(message=message, retry=False)
125 task.abort = abort_operation
127 def retry_operation(message=None, retry_interval=None):
128 _validate_legal_action()
129 ctx._error = exceptions.ScriptException(message=message,
131 retry_interval=retry_interval)
133 task.retry = retry_operation
136 def check_error(ctx, error_check_func=None, reraise=False):
138 # this happens when a script calls task.abort/task.retry more than once
139 if isinstance(_error, RuntimeError):
140 ctx.task.abort(str(_error))
141 # ScriptException is populated by the ctx proxy server when task.abort or task.retry
143 elif isinstance(_error, exceptions.ScriptException):
145 ctx.task.retry(_error.message, _error.retry_interval)
147 ctx.task.abort(_error.message)
148 # local and ssh operations may pass an additional logic check for errors here
151 # if this function is called from within an ``except`` clause, a re-raise may be required
153 raise # pylint: disable=misplaced-bare-raise