1 # Licensed to the Apache Software Foundation (ASF) under one or more
2 # contributor license agreements. See the NOTICE file distributed with
3 # this work for additional information regarding copyright ownership.
4 # The ASF licenses this file to You under the Apache License, Version 2.0
5 # (the "License"); you may not use this file except in compliance with
6 # the License. You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
"""
Local execution of operations.
"""
25 from . import ctx_proxy
26 from . import exceptions
28 from . import constants
29 from . import environment_globals
30 from . import python_script_scope
def run_script(ctx, script_path, process, **kwargs):
    """
    Run an operation script on the local machine.

    :param ctx: operation context; used to abort the task and handed to the runner
    :param script_path: script location, resolved with ``common.download_script``
    :param process: process configuration dict; a falsy value is treated as ``{}``
    :param kwargs: operation inputs, forwarded to the runner as ``operation_kwargs``
    :return: whatever the selected runner function returns
    """
    if not script_path:
        # NOTE(review): presumably ``abort`` raises and never returns - confirm.
        ctx.task.abort('Missing script_path')
    process = process or {}
    script_path = common.download_script(ctx, script_path)
    script_func = _get_run_script_func(script_path, process)
    return script_func(
        ctx=ctx,
        script_path=script_path,
        process=process,
        operation_kwargs=kwargs)
def _get_run_script_func(script_path, process):
    """
    Select the function that will run the script.

    Scripts treated as Python are evaluated in-process by ``_eval_script_func``;
    everything else is run as a child process by ``_execute_func``.

    :param script_path: path of the script to run
    :param process: process configuration dict; for PowerShell scripts a default
     ``command_prefix`` is added to it in place
    :return: the runner function
    """
    if _treat_script_as_python_script(script_path, process):
        return _eval_script_func
    if _treat_script_as_powershell_script(script_path):
        # ``setdefault`` keeps an explicitly configured ``command_prefix``.
        process.setdefault('command_prefix', constants.DEFAULT_POWERSHELL_EXECUTABLE)
    return _execute_func
def _treat_script_as_python_script(script_path, process):
    """
    Tell whether the script should be evaluated as Python code.

    An ``eval_python`` entry in ``process`` that is exactly ``True`` or ``False``
    decides the matter; any other value (including a missing entry) defers to the
    script's file extension, compared case-insensitively.

    :param script_path: path of the script
    :param process: process configuration dict
    :return: ``True`` if the script is to be evaluated as Python, else ``False``
    """
    override = process.get('eval_python')
    extension = os.path.splitext(script_path)[1].lower()
    if override is True:
        return True
    if override is False:
        return False
    return extension == constants.PYTHON_SCRIPT_FILE_EXTENSION
def _treat_script_as_powershell_script(script_path):
    """
    Tell whether the script's file extension marks it as a PowerShell script.

    The comparison is case-insensitive.

    :param script_path: path of the script
    :return: ``True`` if the extension equals
     ``constants.POWERSHELL_SCRIPT_FILE_EXTENSION``
    """
    _, extension = os.path.splitext(script_path)
    return extension.lower() == constants.POWERSHELL_SCRIPT_FILE_EXTENSION
def _eval_script_func(script_path, ctx, operation_kwargs, **_):
    """
    Evaluate a Python script inside the current interpreter.

    The script is run with ``execfile`` while ``python_script_scope`` is active,
    using the globals produced by ``environment_globals.create_initial_globals``.

    :param script_path: path of the Python script to evaluate
    :param ctx: operation context, given to the scope as ``operation_ctx``
    :param operation_kwargs: operation inputs, given to the scope as ``operation_inputs``
    :param _: any further keyword arguments are accepted and ignored
    """
    scope = python_script_scope(operation_ctx=ctx, operation_inputs=operation_kwargs)
    with scope:
        script_globals = environment_globals.create_initial_globals(script_path)
        execfile(script_path, script_globals)
def _execute_func(script_path, ctx, process, operation_kwargs):
    """
    Run a script as a child process and report its outcome.

    The script file is made executable, the final process configuration is built
    with ``common.create_process_config``, and the command is started with its
    stdout and stderr captured. A ``CtxProxy`` is kept open while the child runs;
    its socket URL is exported to the child through the environment.

    :param script_path: path of the script to execute
    :param ctx: operation context (logging, proxying, error checking)
    :param process: process configuration dict
    :param operation_kwargs: operation inputs
    :return: the result of ``common.check_error``
    """
    # ``0o755`` is the octal spelling accepted by both Python 2.6+ and Python 3;
    # the legacy ``0755`` form is a syntax error on Python 3.
    os.chmod(script_path, 0o755)
    # NOTE(review): ``process=process`` is restored here on the assumption that the
    # incoming configuration is what gets completed - confirm against
    # ``common.create_process_config``.
    process = common.create_process_config(
        script_path=script_path,
        process=process,
        operation_kwargs=operation_kwargs)
    command = process['command']
    # The child gets the current environment plus the configured overrides.
    env = os.environ.copy()
    env.update(process['env'])
    ctx.logger.info('Executing: {0}'.format(command))
    with ctx_proxy.server.CtxProxy(ctx, common.patch_ctx) as proxy:
        env[ctx_proxy.client.CTX_SOCKET_URL] = proxy.socket_url
        # NOTE(review): ``command`` appears to be a single command-line string, hence
        # ``shell=True``; ``bufsize=1`` requests line buffering - confirm both.
        running_process = subprocess.Popen(
            command,
            shell=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            env=env,
            cwd=process.get('cwd'),
            bufsize=1,
            close_fds=not common.is_windows())
        # Both pipes are drained on background threads so that a full pipe buffer
        # cannot block the child while this thread is waiting for it to exit.
        stdout_consumer = _OutputConsumer(running_process.stdout)
        stderr_consumer = _OutputConsumer(running_process.stderr)
        # Wait inside the ``with`` block: the proxy must outlive the child.
        exit_code = running_process.wait()
    stdout_consumer.join()
    stderr_consumer.join()
    ctx.logger.info('Execution done (exit_code={0}): {1}'.format(exit_code, command))

    def error_check_func():
        # Only a non-zero exit code counts as a failure.
        if exit_code:
            raise exceptions.ProcessException(
                command=command,
                exit_code=exit_code,
                stdout=stdout_consumer.read_output(),
                stderr=stderr_consumer.read_output())
    return common.check_error(ctx, error_check_func=error_check_func)
class _OutputConsumer(object):
    """
    Drains a byte stream on a background thread and keeps what was read in memory.

    Reading starts as soon as the object is created. Call ``join`` to wait until
    the stream is exhausted, then ``read_output`` to get the collected data.
    """

    def __init__(self, out):
        """
        :param out: readable byte stream offering ``readline`` and ``close``
        """
        # Imported here so the class does not depend on a module-level import;
        # ``io.BytesIO`` matches the ``b''`` sentinel used when reading.
        import io
        self._out = out
        self._buffer = io.BytesIO()
        self._consumer = threading.Thread(target=self._consume_output)
        # A daemon thread cannot keep the interpreter alive on its own.
        self._consumer.daemon = True
        self._consumer.start()

    def _consume_output(self):
        """
        Copy the stream into the buffer line by line until end of stream.
        """
        try:
            # ``readline`` returns ``b''`` only at end of stream.
            for line in iter(self._out.readline, b''):
                self._buffer.write(line)
        finally:
            # Release the stream even if reading fails part-way.
            self._out.close()

    def read_output(self):
        """
        :return: everything consumed from the stream so far
        """
        return self._buffer.getvalue()

    def join(self):
        """
        Block until the consumer thread has finished reading the stream.
        """
        self._consumer.join()