Correct dynamic properties grpc json serializing
[ccsdk/cds.git] / ms / command-executor / src / main / python / command_executor_handler.py
1 #
2 # Copyright (C) 2019 - 2020 Bell Canada.
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 #      http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 #
16 from builtins import Exception, open, dict
17 from subprocess import CalledProcessError, PIPE, TimeoutExpired
18 from google.protobuf.json_format import MessageToJson
19 import tempfile
20 import logging
21 import os
22 import sys
23 import re
24 import subprocess
25 import virtualenv
26 import venv
27 import utils
28 import proto.CommandExecutor_pb2 as CommandExecutor_pb2
29 from zipfile import ZipFile
30 import io
31 import time
32 import prometheus_client as prometheus
33
34 REQUIREMENTS_TXT = "requirements.txt"
35
36
37 class CommandExecutorHandler():
38     BLUEPRINTS_DEPLOY_DIR = '/opt/app/onap/blueprints/deploy/'
39     TOSCA_META_FILE = 'TOSCA-Metadata/TOSCA.meta'
40     PROMETHEUS_METRICS_UPLOAD_CBA_LABEL = 'upload_cba'
41     PROMETHEUS_METRICS_PREP_ENV_LABEL = 'prepare_env'
42     PROMETHEUS_METRICS_EXEC_COMMAND_LABEL = 'execute_command'
43
44     def __init__(self, request):
45         self.request = request
46         self.logger = logging.getLogger(self.__class__.__name__)
47         self.blueprint_name = utils.get_blueprint_name(request)
48         self.blueprint_version = utils.get_blueprint_version(request)
49         self.uuid = utils.get_blueprint_uuid(request)
50         self.request_id = utils.get_blueprint_requestid(request)
51         self.sub_request_id = utils.get_blueprint_subRequestId(request)
52         self.blueprint_name_version_uuid = utils.blueprint_name_version_uuid(request)
53         self.execution_timeout = utils.get_blueprint_timeout(request)
54         # onap/blueprints/deploy will be ephemeral now
55         self.blueprint_dir = self.BLUEPRINTS_DEPLOY_DIR + self.blueprint_name_version_uuid
56         self.blueprint_tosca_meta_file = self.blueprint_dir + '/' + self.TOSCA_META_FILE
57         self.extra = utils.getExtraLogData(request)
58         self.installed = self.blueprint_dir + '/.installed'
59         self.prometheus_histogram = self.get_prometheus_histogram()
60         self.prometheus_counter = self.get_prometheus_counter()
61         self.start_prometheus_server()
62
63     def get_prometheus_histogram(self):
64         histogram = getattr(prometheus.REGISTRY, '_command_executor_histogram', None)
65         if not histogram:
66             histogram = prometheus.Histogram('cds_ce_execution_duration_seconds',
67                              'How many times CE actions (upload, prepare env and execute) got executed and how long it took to complete for each CBA python script.',
68                              ['step', 'blueprint_name', 'blueprint_version', 'script_name'])
69             prometheus.REGISTRY._command_executor_histogram = histogram
70         return histogram
71
72     def get_prometheus_counter(self):
73         counter = getattr(prometheus.REGISTRY, '_command_executor_counter', None)
74         if not counter:
75             counter = prometheus.Counter('cds_ce_execution_error_total',
76                               'How many times CE actions (upload, prepare env and execute) got executed and failed for each CBA python script',
77                               ['step', 'blueprint_name', 'blueprint_version', 'script_name'])
78             prometheus.REGISTRY._command_executor_counter = counter
79         return counter
80
81     def start_prometheus_server(self):
82         self.logger.info("PROMETHEUS_METRICS_ENABLED: {}".format(os.environ.get('PROMETHEUS_METRICS_ENABLED')), extra=self.extra)
83         if (os.environ.get('PROMETHEUS_METRICS_ENABLED')):
84            if not "PROMETHEUS_PORT" in os.environ:
85               err_msg = "ERROR: failed to start prometheus server, PROMETHEUS_PORT env variable is not found."
86               self.logger.error(err_msg, extra=self.extra)
87               return utils.build_ret_data(False, results_log=[], error=err_msg)
88
89            server_started = getattr(prometheus.REGISTRY, '_command_executor_prometheus_server_started', None)
90            if not server_started:
91                self.logger.info("PROMETHEUS_PORT: {}".format(os.environ.get('PROMETHEUS_PORT')), extra=self.extra)
92                prometheus.start_http_server(int(os.environ.get('PROMETHEUS_PORT')))
93                prometheus.REGISTRY._command_executor_prometheus_server_started = True
94
95     def is_installed(self):
96         return os.path.exists(self.installed)
97
98     def blueprint_dir_exists(self):
99         return os.path.exists(self.blueprint_dir)
100
101     # used to validate if the blueprint actually had a chance of getting uploaded
102     def blueprint_tosca_meta_file_exists(self):
103         return os.path.exists(self.blueprint_tosca_meta_file)
104
105     def err_exit(self, msg):
106         self.logger.error(msg, extra=self.extra)
107         return utils.build_ret_data(False, error=msg)
108     
109     def is_valid_archive_type(self, archiveType):
110         return archiveType=="CBA_ZIP" or archiveType=="CBA_GZIP"
111
112     # Handle uploading blueprint request
113     # accept UploadBlueprintInput (CommandExecutor.proto) struct
114     # create dir blueprintName/BlueprintVersion/BlueprintUUID, and extract binData as either ZIP file or GZIP
115     # based on archiveType field...
116     def uploadBlueprint(self, request):
117         start_time = time.time()
118         archive_type = request.archiveType
119         compressed_cba_stream = io.BytesIO(request.binData)
120         self.logger.info("uploadBlueprint request {}".format(request), extra=self.extra)
121         if not self.is_valid_archive_type(archive_type):
122             self.prometheus_counter.labels(self.PROMETHEUS_METRICS_UPLOAD_CBA_LABEL, self.blueprint_name, self.blueprint_version, None).inc()
123             return utils.build_grpc_blueprint_upload_response(self.request_id, self.sub_request_id, False, ["Archive type {} is not valid.".format(archive_type)])
124         
125         # create the BP dir self.blueprint_dir
126         try:
127             os.makedirs(name=self.blueprint_dir, mode=0o755, exist_ok=True)
128         except OSError as ex:
129             self.prometheus_counter.labels(self.PROMETHEUS_METRICS_UPLOAD_CBA_LABEL, self.blueprint_name, self.blueprint_version, None).inc()
130             err_msg = "Failed to create blueprint dir: {} exception message: {}".format(self.blueprint_dir, ex.strerror)
131             self.logger.error(err_msg, extra=self.extra)
132             return utils.build_grpc_blueprint_upload_response(self.request_id, self.sub_request_id, False, [err_msg])
133         if archive_type=="CBA_ZIP":
134             self.logger.info("Extracting ZIP data to dir {}".format(self.blueprint_dir), extra=self.extra)
135             try:
136                 with ZipFile(compressed_cba_stream,'r') as zipfile:
137                     zipfile.extractall(self.blueprint_dir)                    
138                 self.logger.info("Done extracting ZIP data to dir {}".format(self.blueprint_dir), extra=self.extra)
139             except (IOError, zipfile.error) as e:
140                 self.prometheus_counter.labels(self.PROMETHEUS_METRICS_UPLOAD_CBA_LABEL, self.blueprint_name, self.blueprint_version, None).inc()
141                 err_msg = "Error extracting ZIP data to dir {} exception: {}".format(self.blueprint_dir, e)
142                 self.logger.error(err_msg, extra=self.extra)
143                 return utils.build_grpc_blueprint_upload_response(self.request_id, self.sub_request_id, False, [err_msg])
144         # TODO with an actual test gzip cba...
145         elif archive_type=="CBA_GZIP":
146             self.prometheus_counter.labels(self.PROMETHEUS_METRICS_UPLOAD_CBA_LABEL, self.blueprint_name, self.blueprint_version, None).inc()
147             self.logger.error("CBA_GZIP TODO", extra=self.extra)
148             return utils.build_grpc_blueprint_upload_response(self.request_id, self.sub_request_id, False, ["Error extracting GZIP data to {} GZIP todo!".format(self.blueprint_dir)])
149         # Finally, everything is ok!
150         self.prometheus_histogram.labels(self.PROMETHEUS_METRICS_UPLOAD_CBA_LABEL, self.blueprint_name, self.blueprint_version, None).observe(time.time() - start_time)
151         return utils.build_grpc_blueprint_upload_response(self.request_id, self.sub_request_id, True, [])
152
153     def prepare_env(self, request):
154         results_log = []
155         start_time = time.time()
156
157         self.logger.info("prepare_env request {}".format(request), extra=self.extra)
158         # validate that the blueprint name in the request exists, if not, notify the caller
159         if not self.blueprint_dir_exists():
160             self.prometheus_counter.labels(self.PROMETHEUS_METRICS_PREP_ENV_LABEL, self.blueprint_name, self.blueprint_version, None).inc()
161             err_msg = "CBA directory {} not found on cmd-exec. CBA will be uploaded by BP proc.".format(self.blueprint_name_version_uuid)
162             self.logger.info(err_msg, extra=self.extra)
163             return utils.build_ret_data(False, results_log=results_log, error=err_msg, reupload_cba=True)
164         if not self.blueprint_tosca_meta_file_exists():
165             self.prometheus_counter.labels(self.PROMETHEUS_METRICS_PREP_ENV_LABEL, self.blueprint_name, self.blueprint_version, None).inc()
166             err_msg = "CBA directory {} exists on cmd-exec, but TOSCA meta file is not found!!! Returning (null) as UUID. CBA will be uploaded by BP proc.".format(self.blueprint_name_version_uuid)
167             self.logger.info(err_msg, extra=self.extra)
168             return utils.build_ret_data(False, results_log=results_log, error=err_msg, reupload_cba=True)
169         self.logger.info("CBA directory {} exists on cmd-exec.".format(self.blueprint_name_version_uuid), extra=self.extra)
170
171         if not self.is_installed():
172             create_venv_status = self.create_venv()
173             if not create_venv_status[utils.CDS_IS_SUCCESSFUL_KEY]:
174                 self.prometheus_counter.labels(self.PROMETHEUS_METRICS_PREP_ENV_LABEL, self.blueprint_name, self.blueprint_version, None).inc()
175                 return self.err_exit("ERROR: failed to prepare environment for request {} due to error in creating virtual Python env. Original error {}".format(self.blueprint_name_version_uuid, create_venv_status[utils.ERR_MSG_KEY]))
176
177             activate_venv_status = self.activate_venv()
178             if not activate_venv_status[utils.CDS_IS_SUCCESSFUL_KEY]:
179                 self.prometheus_counter.labels(self.PROMETHEUS_METRICS_PREP_ENV_LABEL, self.blueprint_name, self.blueprint_version, None).inc()
180                 return self.err_exit("ERROR: failed to prepare environment for request {} due Python venv_activation. Original error {}".format(self.blueprint_name_version_uuid, activate_venv_status[utils.ERR_MSG_KEY]))
181
182             try:
183                 with open(self.installed, "w+") as f:
184                     if not self.install_packages(request, CommandExecutor_pb2.pip, f, results_log):
185                         self.prometheus_counter.labels(self.PROMETHEUS_METRICS_PREP_ENV_LABEL, self.blueprint_name, self.blueprint_version, None).inc()
186                         err_msg = "ERROR: failed to prepare environment for request {} during pip package install.".format(self.blueprint_name_version_uuid)
187                         return utils.build_ret_data(False, results_log=results_log, error=err_msg)
188                     f.write("\r\n") # TODO: is \r needed?
189                     results_log.append("\n")
190                     if not self.install_packages(request, CommandExecutor_pb2.ansible_galaxy, f, results_log):
191                         self.prometheus_counter.labels(self.PROMETHEUS_METRICS_PREP_ENV_LABEL, self.blueprint_name, self.blueprint_version, None).inc()
192                         err_msg = "ERROR: failed to prepare environment for request {} during Ansible install.".format(self.blueprint_name_version_uuid)
193                         return utils.build_ret_data(False, results_log=results_log, error=err_msg)
194             except Exception as ex:
195                 self.prometheus_counter.labels(self.PROMETHEUS_METRICS_PREP_ENV_LABEL, self.blueprint_name, self.blueprint_version, None).inc()
196                 err_msg = "ERROR: failed to prepare environment for request {} during installing packages. Exception: {}".format(self.blueprint_name_version_uuid, ex)
197                 self.logger.error(err_msg, extra=self.extra)
198                 return utils.build_ret_data(False, error=err_msg)
199         else:
200             try:
201                 with open(self.installed, "r") as f:
202                     results_log.append(f.read())
203             except Exception as ex:
204                 self.prometheus_counter.labels(self.PROMETHEUS_METRICS_PREP_ENV_LABEL, self.blueprint_name, self.blueprint_version, None).inc()
205                 err_msg="ERROR: failed to prepare environment during reading 'installed' file {}. Exception: {}".format(self.installed, ex)
206                 return utils.build_ret_data(False, error=err_msg)
207
208         # deactivate_venv(blueprint_id)
209         self.prometheus_histogram.labels(self.PROMETHEUS_METRICS_PREP_ENV_LABEL, self.blueprint_name, self.blueprint_version, None).observe(time.time() - start_time)
210         return utils.build_ret_data(True, results_log=results_log)
211
212     def execute_command(self, request):
213         start_time = time.time()
214         # STDOUT/STDERR output of the process
215         results_log = []
216         # encoded payload returned by the process
217         result = {}
218
219         self.logger.info("execute_command request {}".format(request), extra=self.extra)
220         # workaround for when packages are not specified, we may not want to go through the install step
221         # can just call create_venv from here.
222         if not self.is_installed():
223             self.create_venv()
224         try:
225             if not self.is_installed():
226                 create_venv_status = self.create_venv
227                 if not create_venv_status[utils.CDS_IS_SUCCESSFUL_KEY]:
228                     self.prometheus_counter.labels(self.PROMETHEUS_METRICS_EXEC_COMMAND_LABEL, self.blueprint_name, self.blueprint_version, request.command).inc()
229                     err_msg = "{} - Failed to execute command during venv creation. Original error: {}".format(self.blueprint_name_version_uuid, create_venv_status[utils.ERR_MSG_KEY])
230                     return utils.build_ret_data(False, error=err_msg)
231             activate_response = self.activate_venv()
232             if not activate_response[utils.CDS_IS_SUCCESSFUL_KEY]:
233                 orig_error = activate_response[utils.ERR_MSG_KEY]
234                 err_msg = "{} - Failed to execute command during environment activation. Original error: {}".format(self.blueprint_name_version_uuid, orig_error)
235                 return utils.build_ret_data(False, error=err_msg)
236             # touch blueprint dir to indicate this CBA was used recently
237             os.utime(self.blueprint_dir)
238
239             cmd = "cd " + self.blueprint_dir
240
241             ### if properties are defined we add them to the command
242             properties = ""
243             if request.properties is not None and len(request.properties) > 0:
244                 properties = " " + re.escape(MessageToJson(request.properties)).replace('"','\\"')
245
246             ### TODO: replace with os.environ['VIRTUAL_ENV']?
247             if "ansible-playbook" in request.command:
248                 cmd = cmd + "; " + request.command + " -e 'ansible_python_interpreter=" + self.blueprint_dir + "/bin/python'"
249             else:
250                 cmd = cmd + "; " + request.command + properties
251
252             ### extract the original header request into sys-env variables
253             ### OriginatorID
254             originator_id = request.originatorId
255             ### CorrelationID
256             correlation_id = request.correlationId
257             request_id_map = {'CDS_REQUEST_ID':self.request_id, 'CDS_SUBREQUEST_ID':self.sub_request_id, 'CDS_ORIGINATOR_ID': originator_id, 'CDS_CORRELATION_ID': correlation_id}
258             updated_env =  { **os.environ, **request_id_map }
259             self.logger.info("Running blueprint {} with timeout: {}".format(self.blueprint_name_version_uuid, self.execution_timeout), extra=self.extra)
260
261             with tempfile.TemporaryFile(mode="w+") as tmp:
262                 try:
263                     completed_subprocess = subprocess.run(cmd, stdout=tmp, stderr=subprocess.STDOUT, shell=True,
264                                                 env=updated_env, timeout=self.execution_timeout)
265                 except TimeoutExpired:
266                     self.prometheus_counter.labels(self.PROMETHEUS_METRICS_EXEC_COMMAND_LABEL, self.blueprint_name, self.blueprint_version, request.command).inc()
267                     timeout_err_msg = "Running command {} failed due to timeout of {} seconds.".format(self.blueprint_name_version_uuid, self.execution_timeout)
268                     self.logger.error(timeout_err_msg, extra=self.extra)
269                     utils.parse_cmd_exec_output(tmp, self.logger, result, results_log, self.extra)
270                     return utils.build_ret_data(False, results_log=results_log, error=timeout_err_msg)
271
272                 utils.parse_cmd_exec_output(tmp, self.logger, result, results_log, self.extra)
273                 rc = completed_subprocess.returncode
274         except Exception as e:
275             self.prometheus_counter.labels(self.PROMETHEUS_METRICS_EXEC_COMMAND_LABEL, self.blueprint_name, self.blueprint_version, request.command).inc()
276             err_msg = "{} - Failed to execute command. Error: {}".format(self.blueprint_name_version_uuid, e)
277             result.update(utils.build_ret_data(False, results_log=results_log, error=err_msg))
278             return result
279
280         # deactivate_venv(blueprint_id)
281         #Since return code is only used to check if it's zero (success), we can just return success flag instead.
282         is_execution_successful = rc == 0
283         result.update(utils.build_ret_data(is_execution_successful, results_log=results_log))
284         self.prometheus_histogram.labels(self.PROMETHEUS_METRICS_EXEC_COMMAND_LABEL, self.blueprint_name, self.blueprint_version, request.command).observe(time.time() - start_time)
285
286         return result
287
288     def install_packages(self, request, type, f, results):
289         success = self.install_python_packages('UTILITY', results)
290         if not success:
291             self.logger.error("Error installing 'UTILITY (cds_utils) package to CBA python environment!!!", extra=self.extra)
292             return False
293
294         for package in request.packages:
295             if package.type == type:
296                 f.write("Installed %s packages:\r\n" % CommandExecutor_pb2.PackageType.Name(type))
297                 for p in package.package:
298                     f.write("   %s\r\n" % p)
299                     if package.type == CommandExecutor_pb2.pip:
300                         success = self.install_python_packages(p, results)
301                     else:
302                         success = self.install_ansible_packages(p, results)
303                     if not success:
304                         f.close()
305                         os.remove(self.installed)
306                         return False
307         return True
308
309     def install_python_packages(self, package, results):
310         self.logger.info(
311             "{} - Install Python package({}) in Python Virtual Environment".format(self.blueprint_name_version_uuid, package), extra=self.extra)
312
313         if REQUIREMENTS_TXT == package:
314             command = ["pip", "install", "--user", "-r", self.blueprint_dir + "/Environments/" + REQUIREMENTS_TXT]
315         elif package == 'UTILITY':
316             py_ver_maj = sys.version_info.major
317             py_ver_min = sys.version_info.minor
318             command = ["cp", "-r", "./cds_utils", "{}/lib/python{}.{}/site-packages/".format(self.blueprint_dir, py_ver_maj,py_ver_min)]
319         else:
320             command = ["pip", "install", "--user", package]
321
322         env = dict(os.environ)
323         if "https_proxy" in os.environ:
324             env['https_proxy'] = os.environ['https_proxy']
325             self.logger.info("Using https_proxy: {}".format(env['https_proxy']), extra=self.extra)
326         try:
327             results.append(subprocess.run(command, check=True, stdout=PIPE, stderr=PIPE, env=env).stdout.decode())
328             results.append("\n")
329             self.logger.info("install_python_packages {} succeeded".format(package), extra=self.extra)
330             return True
331         except CalledProcessError as e:
332             results.append(e.stderr.decode())
333             self.logger.error("install_python_packages {} failed".format(package), extra=self.extra)
334             return False
335
336     def install_ansible_packages(self, package, results):
337         self.logger.info(
338             "{} - Install Ansible Role package({}) in Python Virtual Environment".format(self.blueprint_name_version_uuid, package), extra=self.extra)
339         command = ["ansible-galaxy", "install", package, "-p", self.blueprint_dir + "/Scripts/ansible/roles"]
340
341         env = dict(os.environ)
342         if "http_proxy" in os.environ:
343             # ansible galaxy uses https_proxy environment variable, but requires it to be set with http proxy value.
344             env['https_proxy'] = os.environ['http_proxy']
345         try:
346             results.append(subprocess.run(command, check=True, stdout=PIPE, stderr=PIPE, env=env).stdout.decode())
347             results.append("\n")
348             return True
349         except CalledProcessError as e:
350             results.append(e.stderr.decode())
351             return False
352
353     # Returns a map with 'status' and 'err_msg'.
354     # 'status' True indicates success.
355     # 'err_msg' indicates an error occurred. The presence of err_msg may not be fatal,
356     # status should be set to False for fatal errors.
357     def create_venv(self):
358         self.logger.info("{} - Create Python Virtual Environment".format(self.blueprint_name_version_uuid), extra=self.extra)
359         try:
360             bin_dir = self.blueprint_dir + "/bin"
361             # venv doesn't populate the activate_this.py script, hence we use from virtualenv
362             venv.create(self.blueprint_dir, with_pip=True, system_site_packages=True)
363             virtualenv.writefile(os.path.join(bin_dir, "activate_this.py"), virtualenv.ACTIVATE_THIS)
364             self.logger.info("{} - Creation of Python Virtual Environment finished.".format(self.blueprint_name_version_uuid), extra=self.extra)
365             return utils.build_ret_data(True)
366         except Exception as err:
367             err_msg = "{} - Failed to provision Python Virtual Environment. Error: {}".format(self.blueprint_name_version_uuid, err)
368             self.logger.info(err_msg, extra=self.extra)
369             return utils.build_ret_data(False, error=err_msg)
370
371     # Returns a map with cds_is_successful and err_msg. Status is True on success. The presence of err_msg doesn't necessarily indicate a fatal condition;
372     # the 'status' should be set to False to indicate error.
373     def activate_venv(self):
374         self.logger.info("{} - Activate Python Virtual Environment".format(self.blueprint_name_version_uuid), extra=self.extra)
375
376         # Fix: The python generated activate_this.py script concatenates the env bin dir to PATH on every call
377         #      eventually this process PATH variable was so big (128Kb) that no child process could be spawn
378         #      This script will remove all duplicates; while keeping the order of the PATH folders
379         fixpathenvvar = "os.environ['PATH']=os.pathsep.join(list(dict.fromkeys(os.environ['PATH'].split(':'))))"
380
381         path = "%s/bin/activate_this.py" % self.blueprint_dir
382         try:
383             with open(path) as activate_this_script:
384                 exec (activate_this_script.read(), {'__file__': path})
385             exec (fixpathenvvar)
386             self.logger.info("Running with PATH : {}".format(os.environ['PATH']), extra=self.extra)
387             return utils.build_ret_data(True)
388         except Exception as err:
389             err_msg ="{} - Failed to activate Python Virtual Environment. Error: {}".format(self.blueprint_name_version_uuid, err)
390             self.logger.info( err_msg, extra=self.extra)
391             return utils.build_ret_data(False, error=err_msg)
392
393     def deactivate_venv(self):
394         self.logger.info("{} - Deactivate Python Virtual Environment".format(self.blueprint_name_version_uuid), extra=self.extra)
395         command = ["deactivate"]
396         try:
397             subprocess.run(command, check=True)
398         except Exception as err:
399             self.logger.info(
400                 "{} - Failed to deactivate Python Virtual Environment. Error: {}".format(self.blueprint_name_version_uuid, err), extra=self.extra)
401
402