X-Git-Url: https://gerrit.onap.org/r/gitweb?a=blobdiff_plain;f=azure%2Faria%2Faria-extension-cloudify%2Fsrc%2Faria%2Faria%2Forchestrator%2Fexecution_plugin%2Flocal.py;fp=azure%2Faria%2Faria-extension-cloudify%2Fsrc%2Faria%2Faria%2Forchestrator%2Fexecution_plugin%2Flocal.py;h=04b9ecdcb8e6e89ba627e930493dbbc1255cd08f;hb=7409dfb144cf2a06210400134d822a1393462b1f;hp=0000000000000000000000000000000000000000;hpb=9e65649dfff8f00dc0a0ef6b10d020ae0e2255ba;p=multicloud%2Fazure.git diff --git a/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/execution_plugin/local.py b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/execution_plugin/local.py new file mode 100644 index 0000000..04b9ecd --- /dev/null +++ b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/execution_plugin/local.py @@ -0,0 +1,128 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Local execution of operations. +""" + +import os +import subprocess +import threading +import StringIO + +from . import ctx_proxy +from . import exceptions +from . import common +from . import constants +from . import environment_globals +from . import python_script_scope + + +def run_script(ctx, script_path, process, **kwargs): + if not script_path: + ctx.task.abort('Missing script_path') + process = process or {} + script_path = common.download_script(ctx, script_path) + script_func = _get_run_script_func(script_path, process) + return script_func( + ctx=ctx, + script_path=script_path, + process=process, + operation_kwargs=kwargs) + + +def _get_run_script_func(script_path, process): + if _treat_script_as_python_script(script_path, process): + return _eval_script_func + else: + if _treat_script_as_powershell_script(script_path): + process.setdefault('command_prefix', constants.DEFAULT_POWERSHELL_EXECUTABLE) + return _execute_func + + +def _treat_script_as_python_script(script_path, process): + eval_python = process.get('eval_python') + script_extension = os.path.splitext(script_path)[1].lower() + return (eval_python is True or (script_extension == constants.PYTHON_SCRIPT_FILE_EXTENSION and + eval_python is not False)) + + +def _treat_script_as_powershell_script(script_path): + script_extension = os.path.splitext(script_path)[1].lower() + return script_extension == constants.POWERSHELL_SCRIPT_FILE_EXTENSION + + +def _eval_script_func(script_path, ctx, operation_kwargs, **_): + with python_script_scope(operation_ctx=ctx, operation_inputs=operation_kwargs): + execfile(script_path, environment_globals.create_initial_globals(script_path)) + + +def _execute_func(script_path, ctx, process, operation_kwargs): + os.chmod(script_path, 0755) + process = common.create_process_config( + script_path=script_path, + process=process, + operation_kwargs=operation_kwargs) + command = process['command'] + env = os.environ.copy() + env.update(process['env']) + ctx.logger.info('Executing: {0}'.format(command)) + with ctx_proxy.server.CtxProxy(ctx, common.patch_ctx) as proxy: + env[ctx_proxy.client.CTX_SOCKET_URL] = proxy.socket_url + running_process = subprocess.Popen( + command, + shell=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + env=env, + cwd=process.get('cwd'), + bufsize=1, + close_fds=not common.is_windows()) + stdout_consumer = _OutputConsumer(running_process.stdout) + stderr_consumer = _OutputConsumer(running_process.stderr) + exit_code = running_process.wait() + stdout_consumer.join() + stderr_consumer.join() + ctx.logger.info('Execution done (exit_code={0}): {1}'.format(exit_code, command)) + + def error_check_func(): + if exit_code: + raise exceptions.ProcessException( + command=command, + exit_code=exit_code, + stdout=stdout_consumer.read_output(), + stderr=stderr_consumer.read_output()) + return common.check_error(ctx, error_check_func=error_check_func) + + +class _OutputConsumer(object): + + def __init__(self, out): + self._out = out + self._buffer = StringIO.StringIO() + self._consumer = threading.Thread(target=self._consume_output) + self._consumer.daemon = True + self._consumer.start() + + def _consume_output(self): + for line in iter(self._out.readline, b''): + self._buffer.write(line) + self._out.close() + + def read_output(self): + return self._buffer.getvalue() + + def join(self): + self._consumer.join()