# Source: dcaegen2/platform/plugins.git — k8s/k8sclient/k8sclient.py (R10 tag)
1 # ============LICENSE_START=======================================================
2 # org.onap.dcae
3 # ================================================================================
4 # Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved.
5 # Copyright (c) 2020 Pantheon.tech. All rights reserved.
6 # Copyright (c) 2020-2021 Nokia. All rights reserved.
7 # Copyright (c) 2020 J. F. Lucas.  All rights reserved.
8 # ================================================================================
9 # Licensed under the Apache License, Version 2.0 (the "License");
10 # you may not use this file except in compliance with the License.
11 # You may obtain a copy of the License at
12 #
13 #      http://www.apache.org/licenses/LICENSE-2.0
14 #
15 # Unless required by applicable law or agreed to in writing, software
16 # distributed under the License is distributed on an "AS IS" BASIS,
17 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18 # See the License for the specific language governing permissions and
19 # limitations under the License.
20 # ============LICENSE_END=========================================================
21 #
22 from distutils import util
23 import os
24 import re
25 import uuid
26 import base64
27
28 from binascii import hexlify
29 from kubernetes import config, client, stream
30 from .sans_parser import SansParser
31
# Default values for readiness probe (seconds)
PROBE_DEFAULT_PERIOD = 15
PROBE_DEFAULT_TIMEOUT = 1

# Location of k8s cluster config file ("kubeconfig")
K8S_CONFIG_PATH = "/opt/onap/kube/kubeconfig"

# Regular expression for interval/timeout specification:
# a decimal integer optionally followed by a unit ("s"econds, "m"inutes, "h"ours)
INTERVAL_SPEC = re.compile("^([0-9]+)(s|m|h)?$")
# Conversion factors to seconds (None = no unit supplied, treated as seconds)
FACTORS = {None: 1, "s": 1, "m": 60, "h": 3600}

# Regular expression for port mapping
# group 1: container port
# group 2: / + protocol
# group 3: protocol
# group 4: host port
PORTS = re.compile("^([0-9]+)(/(udp|UDP|tcp|TCP))?:([0-9]+)$")

# Constants for external_cert
# Mount point of the projected volume holding keystore/truststore secrets
MOUNT_PATH = "/etc/onap/oom/certservice/certs/"
DEFAULT_CERT_TYPE = "p12"
55
56 def _create_deployment_name(component_name):
57     return "dep-{0}".format(component_name)[:63]
58
59
60 def _create_service_name(component_name):
61     return "{0}".format(component_name)[:63]
62
63
64 def _create_exposed_service_name(component_name):
65     return ("x{0}".format(component_name))[:63]
66
67
68 def _create_exposed_v6_service_name(component_name):
69     return ("x{0}-ipv6".format(component_name))[:63]
70
71
def _configure_api(location=None):
    """
    Configure the kubernetes client: from the kubeconfig file if one is present,
    otherwise from the in-cluster service-account credentials.
    'location' selects the kubeconfig context (file-based configuration only).
    """
    if os.path.exists(K8S_CONFIG_PATH):
        config.load_kube_config(config_file=K8S_CONFIG_PATH, context=location, persist_config=False)
        return

    # No kubeconfig: maybe we're running in a k8s container and we can use info
    # provided by k8s.  We would like to call config.load_incluster_config(),
    # but that reads the kubernetes host and port from os.environ, and from the
    # plugin those aren't visible.  So we drive the InClusterConfigLoader class
    # directly, where we can set the environment to what we like.
    # This is probably brittle!  Maybe there's a better alternative.
    in_cluster_env = {
        config.incluster_config.SERVICE_HOST_ENV_NAME: "kubernetes.default.svc.cluster.local",
        config.incluster_config.SERVICE_PORT_ENV_NAME: "443"
    }
    loader = config.incluster_config.InClusterConfigLoader(
        token_filename=config.incluster_config.SERVICE_TOKEN_FILENAME,
        cert_filename=config.incluster_config.SERVICE_CERT_FILENAME,
        environ=in_cluster_env
    )
    loader.load_and_set()
93
94
def _parse_interval(t):
    """
    Parse an interval specification into a number of seconds.

    t can be
       - a simple integer quantity, interpreted as seconds
       - a string representation of a decimal integer, interpreted as seconds
       - a string consisting of a decimal integer followed by a unit,
         with "s" for seconds, "m" for minutes, and "h" for hours

    Kept for compatibility with the Docker plugin, where health-check intervals
    were specified as strings with a number and a unit.  See INTERVAL_SPEC above
    for the accepted regular expression.

    Raises ValueError on a specification that doesn't match.
    """
    parsed = INTERVAL_SPEC.match(str(t))
    if not parsed:
        raise ValueError("Bad interval specification: {0}".format(t))
    return int(parsed.group(1)) * FACTORS[parsed.group(2)]
114
115
def _create_probe(hc, port):
    """
    Create a Kubernetes probe from the health-check dictionary hc.

    hc['type'] selects the probe action: 'http'/'https' -> HTTP GET against
    hc['endpoint'] on 'port'; 'script'/'docker' -> exec of hc['script'].
    Any other type yields None.  Interval/timeout default to the module-level
    PROBE_DEFAULT_* values.
    """
    probe_type = hc['type']
    # Timing settings are shared by both probe flavors
    timing = dict(
        failure_threshold=1,
        initial_delay_seconds=5,
        period_seconds=_parse_interval(hc.get('interval', PROBE_DEFAULT_PERIOD)),
        timeout_seconds=_parse_interval(hc.get('timeout', PROBE_DEFAULT_TIMEOUT))
    )
    if probe_type in ('http', 'https'):
        return client.V1Probe(
            http_get=client.V1HTTPGetAction(
                path=hc['endpoint'],
                port=port,
                scheme=probe_type.upper()
            ),
            **timing
        )
    if probe_type in ('script', 'docker'):
        return client.V1Probe(
            _exec=client.V1ExecAction(command=hc['script'].split()),
            **timing
        )
    return None
145
146
def _create_resources(resources=None):
    """Convert a dict with optional "limits"/"requests" keys into V1ResourceRequirements (None -> None)."""
    if resources is None:
        return None
    return client.V1ResourceRequirements(
        limits=resources.get("limits"),
        requests=resources.get("requests")
    )
156
157
def _create_container_object(name, image, always_pull, **kwargs):
    """
    Build a V1Container for a pod.

    Args:
        name: container name
        image: Docker image reference
        always_pull: if True, imagePullPolicy is "Always", else "IfNotPresent"

    Keyword args (all optional):
        env: dict of environment variable name -> value
        env_from_secret: dict whose values are dicts with "env_name",
            "secret_name", "secret_key"; each becomes an env var sourced
            from a k8s secret
        readiness / liveness: health-check dicts (see _create_probe)
        resources: dict with "limits"/"requests" (see _create_resources)
        container_ports: list of (port, protocol) tuples
        volume_mounts: list of V1VolumeMount
    """
    # Copy any passed-in environment variables
    env = kwargs.get('env') or {}
    env_vars = [client.V1EnvVar(name=k, value=env[k]) for k in env]

    # Add POD_IP with the IP address of the pod running the container
    pod_ip = client.V1EnvVarSource(field_ref=client.V1ObjectFieldSelector(field_path="status.podIP"))
    env_vars.append(client.V1EnvVar(name="POD_IP", value_from=pod_ip))

    # Add envs from Secret
    # (loop variable renamed from "env" -- the original shadowed the env dict above)
    if 'env_from_secret' in kwargs:
        for secret_env in kwargs.get('env_from_secret').values():
            secret_key_selector = client.V1SecretKeySelector(key=secret_env["secret_key"],
                                                             name=secret_env["secret_name"])
            env_var_source = client.V1EnvVarSource(secret_key_ref=secret_key_selector)
            env_vars.append(client.V1EnvVar(name=secret_env["env_name"], value_from=env_var_source))

    # If a health check is specified, create a readiness/liveness probe
    # (For an HTTP-based check, we assume it's at the first container port)
    readiness = kwargs.get('readiness')
    liveness = kwargs.get('liveness')
    resources = kwargs.get('resources')
    container_ports = kwargs.get('container_ports') or []

    hc_port = container_ports[0][0] if container_ports else None
    probe = _create_probe(readiness, hc_port) if readiness else None
    live_probe = _create_probe(liveness, hc_port) if liveness else None
    resources_obj = _create_resources(resources) if resources else None
    port_objs = [client.V1ContainerPort(container_port=port, protocol=proto)
                 for port, proto in container_ports]

    # Define container for pod
    return client.V1Container(
        name=name,
        image=image,
        image_pull_policy='Always' if always_pull else 'IfNotPresent',
        env=env_vars,
        ports=port_objs,
        volume_mounts=kwargs.get('volume_mounts') or [],
        resources=resources_obj,
        readiness_probe=probe,
        liveness_probe=live_probe
    )
201
202
def _create_deployment_object(component_name,
                              containers,
                              init_containers,
                              replicas,
                              volumes,
                              labels=None,
                              pull_secrets=None):
    """
    Build a V1Deployment for a component.

    NOTE(review): the passed-in 'labels' dict is updated in place with the
    "k8sdeployment" label; callers may rely on reading that back — confirm
    before changing this to a copy.
    """
    if labels is None:
        labels = {}
    if pull_secrets is None:
        pull_secrets = []
    deployment_name = _create_deployment_name(component_name)

    # Label the pod with the deployment name, so we can find it easily
    labels["k8sdeployment"] = deployment_name

    # pull_secrets lists the names of k8s secrets containing docker registry credentials
    # See https://kubernetes.io/docs/concepts/containers/images/#specifying-imagepullsecrets-on-a-pod
    image_pull_secrets = [client.V1LocalObjectReference(name=secret) for secret in pull_secrets]

    # Pod template
    pod_template = client.V1PodTemplateSpec(
        metadata=client.V1ObjectMeta(labels=labels),
        spec=client.V1PodSpec(hostname=component_name,
                              containers=containers,
                              init_containers=init_containers,
                              volumes=volumes,
                              image_pull_secrets=image_pull_secrets)
    )

    # Deployment spec
    deployment_spec = client.V1DeploymentSpec(
        replicas=replicas,
        selector=client.V1LabelSelector(match_labels=labels),
        template=pod_template
    )

    return client.V1Deployment(
        api_version="apps/v1",
        kind="Deployment",
        metadata=client.V1ObjectMeta(name=deployment_name, labels=labels),
        spec=deployment_spec
    )
251
252
def _create_service_object(service_name, component_name, service_ports, annotations, labels, service_type, ip_family):
    """Build a V1Service selecting the component's pods; annotations are attached only if truthy."""
    spec = client.V1ServiceSpec(
        ports=service_ports,
        selector={"app": component_name},
        type=service_type,
        ip_family=ip_family
    )
    meta_kwargs = {"name": _create_service_name(service_name), "labels": labels}
    if annotations:
        meta_kwargs["annotations"] = annotations

    return client.V1Service(
        kind="Service",
        api_version="v1",
        metadata=client.V1ObjectMeta(**meta_kwargs),
        spec=spec
    )
272
273
def create_secret_with_password(namespace, secret_prefix, password_key, password_length):
    """
    Creates K8s secret object with a generated password.

    Args:
        namespace: k8s namespace in which to create the secret
        secret_prefix: value for metadata.generateName (k8s appends a random suffix)
        password_key: key under which the password is stored in the secret data
        password_length: number of random bytes used; the password is their hex
            encoding, i.e. 2*password_length characters

    Returns: secret name (as generated by k8s) and data key.

    Example usage:
         create_secret_with_password('onap', 'dcae-keystore-password-', 'password', 128)
    """
    password = _generate_password(password_length)
    password_base64 = _encode_base64(password)

    metadata = {'generateName': secret_prefix, 'namespace': namespace}
    key = password_key
    data = {key: password_base64}

    response = _create_k8s_secret(namespace, metadata, data, 'Opaque')
    secret_name = response.metadata.name
    return secret_name, key
292
293
294 def _generate_password(length):
295     rand = os.urandom(length)
296     password = hexlify(rand)
297     return password.decode("ascii");
298
299
300 def _encode_base64(value):
301     value_bytes = value.encode("ascii")
302     base64_encoded_bytes = base64.b64encode(value_bytes)
303     encoded_value = base64_encoded_bytes.decode("ascii")
304     return encoded_value
305
306
def _create_k8s_secret(namespace, metadata, data, secret_type):
    """
    Create a k8s Secret in 'namespace' and return the API response.

    'data' values must already be base64-encoded; 'metadata' is a dict
    (e.g. {'generateName': ..., 'namespace': ...}).
    """
    # Use keyword arguments: relying on the positional parameter order of the
    # generated V1Secret model is fragile across client versions.
    body = client.V1Secret(api_version='v1',
                           data=data,
                           kind='Secret',
                           metadata=metadata,
                           type=secret_type)
    return client.CoreV1Api().create_namespaced_secret(namespace, body)
314
315
def parse_ports(port_list):
    """
    Parse the port list into a list of (container_port, protocol) tuples (needed
    to create the container) and a dict mapping (container_port, protocol, ipv6)
    to host port, used to set up k8s services.

    Raises ValueError for an entry that doesn't match the PORTS pattern.
    """
    container_ports = []
    port_map = {}
    for entry in port_list:
        ipv6 = False
        if type(entry) is dict:
            # Dict form: optional "ipv6" flag plus a "concat" list of fragments
            ipv6 = "ipv6" in entry and entry['ipv6']
            entry = "".join(str(v) for v in entry['concat'])
        m = PORTS.match(entry.strip())
        if not m:
            raise ValueError("Bad port specification: {0}".format(entry))
        cport = int(m.group(1))
        hport = int(m.group(4))
        proto = (m.group(3)).upper() if m.group(3) else "TCP"
        if (cport, proto) not in container_ports:
            container_ports.append((cport, proto))
        port_map[(cport, proto, ipv6)] = hport

    return container_ports, port_map
344
345
def _parse_volumes(volume_list):
    """
    Convert the plugin's volume specs into k8s V1Volume / V1VolumeMount lists.
    Each entry gets a fresh UUID volume name; host-path and config-map volumes
    are supported, and mode 'ro' makes the mount read-only.
    """
    volumes = []
    volume_mounts = []
    for spec in volume_list:
        volume_name = str(uuid.uuid4())
        mount_path = spec['container']['bind']
        read_only = spec['container'].get('mode') == 'ro'
        if 'host' in spec and 'path' in spec['host']:
            volumes.append(client.V1Volume(
                name=volume_name,
                host_path=client.V1HostPathVolumeSource(path=spec['host']['path'])))
        if 'config_volume' in spec and 'name' in spec['config_volume']:
            volumes.append(client.V1Volume(
                name=volume_name,
                config_map=client.V1ConfigMapVolumeSource(default_mode=0o0644,
                                                          name=spec['config_volume']['name'],
                                                          optional=True)))
        volume_mounts.append(client.V1VolumeMount(name=volume_name, mount_path=mount_path, read_only=read_only))

    return volumes, volume_mounts
364
365
def _add_elk_logging_sidecar(containers, volumes, volume_mounts, component_name, log_info, filebeat):
    """
    Add a filebeat logging sidecar for the component, mutating 'containers',
    'volumes' and 'volume_mounts' in place.  No-op unless both log_info and
    filebeat are provided and log_info contains a "log_directory".
    """
    if not (log_info and filebeat):
        return
    log_dir = log_info.get("log_directory")
    if not log_dir:
        return

    sidecar_mounts = []

    # Shared emptyDir for component log files: mounted at log_dir in the
    # component container and at the filebeat-side path in the sidecar.
    volumes.append(client.V1Volume(name="component-log", empty_dir=client.V1EmptyDirVolumeSource()))
    volume_mounts.append(client.V1VolumeMount(name="component-log", mount_path=log_dir))
    sidecar_log_path = log_info.get("alternate_fb_path") or "{0}/{1}".format(filebeat["log_path"], component_name)
    sidecar_mounts.append(client.V1VolumeMount(name="component-log", mount_path=sidecar_log_path))

    # emptyDir for the sidecar's own state data
    volumes.append(client.V1Volume(name="filebeat-data", empty_dir=client.V1EmptyDirVolumeSource()))
    sidecar_mounts.append(client.V1VolumeMount(name="filebeat-data", mount_path=filebeat["data_path"]))

    # Sidecar configuration comes from a k8s ConfigMap that should be created
    # when DCAE is installed.
    volumes.append(
        client.V1Volume(name="filebeat-conf", config_map=client.V1ConfigMapVolumeSource(name=filebeat["config_map"])))
    sidecar_mounts.append(
        client.V1VolumeMount(name="filebeat-conf", mount_path=filebeat["config_path"],
                             sub_path=filebeat["config_subpath"]))

    # Finally the sidecar container itself
    containers.append(
        _create_container_object("filebeat", filebeat["image"], False, volume_mounts=sidecar_mounts))
395
396
def _add_tls_init_container(ctx, init_containers, volumes, volume_mounts, tls_info, tls_config):
    """
    Add an init container that sets up TLS certificate material.

    For components that act as a server (tls_info["use_tls"] truthy), the init
    container populates a directory with server and CA certificate materials in
    various formats.  Otherwise it populates the directory with CA certificate
    materials in PEM and JKS formats only.  In either case the directory is
    mounted onto the component container at tls_info["component_cert_dir"] if
    present, else at the configured default tls_config["component_cert_dir"].
    """
    docker_image = tls_config["image"]
    ctx.logger.info("Creating init container: TLS \n  * [" + docker_image + "]")

    cert_directory = tls_info.get("cert_directory") or tls_config.get("component_cert_dir")
    env = {"TLS_SERVER": "true" if tls_info.get("use_tls") else "false"}

    # Shared emptyDir: written by the init container at tls_config["cert_path"],
    # read by the component container at cert_directory.
    volumes.append(client.V1Volume(name="tls-info", empty_dir=client.V1EmptyDirVolumeSource()))
    volume_mounts.append(client.V1VolumeMount(name="tls-info", mount_path=cert_directory))
    init_mounts = [client.V1VolumeMount(name="tls-info", mount_path=tls_config["cert_path"])]

    init_containers.append(
        _create_container_object("init-tls", docker_image, False, volume_mounts=init_mounts, env=env))
420
421
def _add_external_tls_init_container(ctx, init_containers, volumes, external_cert, external_tls_config):
    """Add an init container (CertService client) that generates external TLS certificates."""
    docker_image = external_tls_config["image_tag"]
    ctx.logger.info("Creating init container: external TLS \n  * [" + docker_image + "]")

    output_path = external_cert.get("external_cert_directory")
    if not output_path.endswith('/'):
        output_path += '/'

    keystore_secret_key = external_tls_config.get("keystore_secret_key")
    truststore_secret_key = external_tls_config.get("truststore_secret_key")
    cert_params = external_cert.get("external_certificate_parameters")

    env = {
        "REQUEST_URL": external_tls_config.get("request_url"),
        "REQUEST_TIMEOUT": external_tls_config.get("timeout"),
        "OUTPUT_PATH": output_path + "external",
        "OUTPUT_TYPE": external_cert.get("cert_type"),
        "CA_NAME": external_cert.get("ca_name"),
        "COMMON_NAME": cert_params.get("common_name"),
        "ORGANIZATION": external_tls_config.get("organization"),
        "ORGANIZATION_UNIT": external_tls_config.get("organizational_unit"),
        "LOCATION": external_tls_config.get("location"),
        "STATE": external_tls_config.get("state"),
        "COUNTRY": external_tls_config.get("country"),
        "SANS": cert_params.get("sans"),
        "KEYSTORE_PATH": MOUNT_PATH + keystore_secret_key,
        "TRUSTSTORE_PATH": MOUNT_PATH + truststore_secret_key,
    }
    env_from_secret = {
        "KEYSTORE_PASSWORD": {
            "env_name": "KEYSTORE_PASSWORD",
            "secret_name": external_tls_config.get("keystore_password_secret_name"),
            "secret_key": external_tls_config.get("keystore_password_secret_key")},
        "TRUSTSTORE_PASSWORD": {
            "env_name": "TRUSTSTORE_PASSWORD",
            "secret_name": external_tls_config.get("truststore_password_secret_name"),
            "secret_key": external_tls_config.get("truststore_password_secret_key")},
    }

    # Projected volume exposing the keystore/truststore secret entries
    projected_volume = _create_projected_tls_volume(external_tls_config.get("cert_secret_name"),
                                                    keystore_secret_key,
                                                    truststore_secret_key)
    volumes.append(client.V1Volume(name="tls-volume", projected=projected_volume))
    init_mounts = [
        client.V1VolumeMount(name="tls-info", mount_path=external_cert.get("external_cert_directory")),
        client.V1VolumeMount(name="tls-volume", mount_path=MOUNT_PATH)]

    init_containers.append(
        _create_container_object("cert-service-client", docker_image, False,
                                 volume_mounts=init_mounts, env=env, env_from_secret=env_from_secret))
471
472
def _create_projected_tls_volume(secret_name, keystore_secret_key, truststore_secret_key):
    """Build a projected volume source exposing the keystore and truststore entries of a secret."""
    items = [client.V1KeyToPath(key=key, path=key)
             for key in (keystore_secret_key, truststore_secret_key)]
    secret_projection = client.V1SecretProjection(name=secret_name, items=items)
    return client.V1ProjectedVolumeSource(
        sources=[client.V1VolumeProjection(secret=secret_projection)])
481
482
def _add_cert_post_processor_init_container(ctx, init_containers, tls_info, tls_config, external_cert,
                                            cert_post_processor_config, isCertManagerIntegration):
    """
    Add an init container that merges the TLS and external TLS truststores into a
    single file and copies external keystore files to their final locations in
    the component cert directory.  Mutates init_containers in place.
    """
    docker_image = cert_post_processor_config["image_tag"]
    ctx.logger.info("Creating init container: cert post processor \n  * [" + docker_image + "]")

    # Component cert directory, normalized to end with "/"
    tls_cert_dir = tls_info.get("cert_directory") or tls_config.get("component_cert_dir")
    if not tls_cert_dir.endswith('/'):
        tls_cert_dir += '/'

    tls_cert_file_path = tls_cert_dir + "trust.jks"
    tls_cert_file_pass = tls_cert_dir + "trust.pass"

    # External certs live in an "external" subdirectory of the cert directory
    ext_cert_dir = tls_cert_dir + "external/"

    output_type = (external_cert.get("cert_type") or DEFAULT_CERT_TYPE).lower()
    ext_truststore_path = ext_cert_dir + "truststore." + _get_file_extension(output_type)
    ext_truststore_pass = ''
    # PEM truststores carry no password file
    if output_type != 'pem':
        ext_truststore_pass = ext_cert_dir + "truststore.pass"

    env = {"TRUSTSTORES_PATHS": tls_cert_file_path + ":" + ext_truststore_path,
           "TRUSTSTORES_PASSWORDS_PATHS": tls_cert_file_pass + ":" + ext_truststore_pass,
           "KEYSTORE_SOURCE_PATHS": _get_keystore_source_paths(output_type, ext_cert_dir),
           "KEYSTORE_DESTINATION_PATHS": _get_keystore_destination_paths(output_type, tls_cert_dir)}

    ctx.logger.info("TRUSTSTORES_PATHS:            " + env["TRUSTSTORES_PATHS"])
    ctx.logger.info("TRUSTSTORES_PASSWORDS_PATHS:  " + env["TRUSTSTORES_PASSWORDS_PATHS"])
    ctx.logger.info("KEYSTORE_SOURCE_PATHS:        " + env["KEYSTORE_SOURCE_PATHS"])
    ctx.logger.info("KEYSTORE_DESTINATION_PATHS:   " + env["KEYSTORE_DESTINATION_PATHS"])

    # Create the volumes and volume mounts
    init_volume_mounts = [client.V1VolumeMount(name="tls-info", mount_path=tls_cert_dir)]
    # With cert-manager integration the external certs arrive on a separate volume
    if isCertManagerIntegration:
        init_volume_mounts.append(client.V1VolumeMount(
            name="certmanager-certs-volume", mount_path=ext_cert_dir))
    # Create the init container
    init_containers.append(
        _create_container_object("cert-post-processor", docker_image, False, volume_mounts=init_volume_mounts, env=env))
522
523
524 def _get_file_extension(output_type):
525     return {
526         'p12': 'p12',
527         'pem': 'pem',
528         'jks': 'jks',
529     }[output_type]
530
531
532 def _get_keystore_source_paths(output_type, ext_cert_dir):
533     source_paths_template = {
534         'p12': "{0}keystore.p12:{0}keystore.pass",
535         'jks': "{0}keystore.jks:{0}keystore.pass",
536         'pem': "{0}keystore.pem:{0}key.pem",
537     }[output_type]
538     return source_paths_template.format(ext_cert_dir)
539
540
541 def _get_keystore_destination_paths(output_type, tls_cert_dir):
542     destination_paths_template = {
543         'p12': "{0}cert.p12:{0}p12.pass",
544         'jks': "{0}cert.jks:{0}jks.pass",
545         'pem': "{0}cert.pem:{0}key.pem",
546     }[output_type]
547     return destination_paths_template.format(tls_cert_dir)
548
549
def _process_port_map(port_map):
    """
    Turn the port map from parse_ports() into k8s service port lists.
    Returns (service_ports, exposed_ports, exposed_ports_ipv6): internal service
    ports plus NodePort entries (host port != 0), split by IPv6 flag.
    """
    service_ports = []       # Ports exposed internally on the k8s network
    exposed_ports = []       # Ports mapped to k8s node ports via NodePort
    exposed_ports_ipv6 = []
    for (cport, proto, ipv6), hport in port_map.items():
        # "xport-..." names the exposed port; the internal port drops the "x"
        name = "xport-{0}-{1}".format(proto[0].lower(), cport)
        cport = int(cport)
        hport = int(hport)
        internal_port = client.V1ServicePort(port=cport, protocol=proto, name=name[1:])
        if internal_port not in service_ports:
            service_ports.append(internal_port)
        if hport == 0:
            continue
        bucket = exposed_ports_ipv6 if ipv6 else exposed_ports
        bucket.append(client.V1ServicePort(port=cport, protocol=proto, node_port=hport, name=name))
    return service_ports, exposed_ports, exposed_ports_ipv6
567
568
def _service_exists(location, namespace, component_name):
    """Return True if a k8s service for the component exists in 'namespace' at 'location'."""
    try:
        _configure_api(location)
        client.CoreV1Api().read_namespaced_service(_create_service_name(component_name), namespace)
        return True
    except client.rest.ApiException:
        # Any API error (typically 404) is treated as "service does not exist"
        return False
579
580
def _patch_deployment(location, namespace, deployment, modify):
    '''
    Gets the current spec for 'deployment' in 'namespace'
    in the k8s cluster at 'location',
    uses the 'modify' function to change the spec,
    then sends the updated spec to k8s.
    '''
    _configure_api(location)
    apps_api = client.AppsV1Api()

    # Read current spec, apply the caller's change, then patch it back
    current_spec = apps_api.read_namespaced_deployment(deployment, namespace)
    updated_spec = modify(current_spec)
    apps_api.patch_namespaced_deployment(deployment, namespace, updated_spec)
598
599
def _execute_command_in_pod(location, namespace, pod_name, command):
    '''
    Execute the command (specified by an argv-style list in  the "command" parameter) in
    the specified pod in the specified namespace at the specified location.
    For now at least, we use this only to
    run a notification script in a pod after a configuration change.

    The straightforward way to do this is with the V1 Core API function "connect_get_namespaced_pod_exec".
    Unfortunately due to a bug/limitation in the Python client library, we can't call it directly.
    We have to make the API call through a Websocket connection using the kubernetes.stream wrapper API.
    I'm following the example code at https://github.com/kubernetes-client/python/blob/master/examples/exec.py.
    There are several issues tracking this, in various states.  It isn't clear that there will ever
    be a fix.
        - https://github.com/kubernetes-client/python/issues/58
        - https://github.com/kubernetes-client/python/issues/409
        - https://github.com/kubernetes-client/python/issues/526

    The main consequence of the workaround using "stream" is that the caller does not get an indication
    of the exit code returned by the command when it completes execution.   It turns out that the
    original implementation of notification in the Docker plugin did not use this result, so we can
    still match the original notification functionality.

    The "stream" approach returns a string containing any output sent by the command to stdout or stderr.
    We'll return that so it can logged.

    Returns a dict: {"pod": pod_name, "output": <command output or "Pod not found">}.
    Re-raises any ApiException other than "pod not found".
    '''
    _configure_api(location)
    try:
        # Websocket-based exec; stdout/stderr are captured, no stdin/tty
        output = stream.stream(client.CoreV1Api().connect_get_namespaced_pod_exec,
                               name=pod_name,
                               namespace=namespace,
                               command=command,
                               stdout=True,
                               stderr=True,
                               stdin=False,
                               tty=False)
    except client.rest.ApiException as e:
        # If the exception indicates the pod wasn't found,  it's not a fatal error.
        # It existed when we enumerated the pods for the deployment but no longer exists.
        # Unfortunately, the only way to distinguish a pod not found from any other error
        # is by looking at the reason text.
        # (The ApiException's "status" field should contain the HTTP status code, which would
        # be 404 if the pod isn't found, but empirical testing reveals that "status" is set
        # to zero.)
        if "404 not found" in e.reason.lower():
            output = "Pod not found"
        else:
            raise e

    return {"pod": pod_name, "output": output}
649
650
651 def _create_certificate_subject(external_tls_config):
652     """
653     Map parameters to custom resource subject
654     """
655     organization = external_tls_config.get("organization")
656     organization_unit = external_tls_config.get("organizational_unit")
657     country = external_tls_config.get("country")
658     location = external_tls_config.get("location")
659     state = external_tls_config.get("state")
660     subject = {
661         "organizations": [organization],
662         "countries": [country],
663         "localities": [location],
664         "provinces": [state],
665         "organizationalUnits": [organization_unit]
666     }
667     return subject
668
669
670 def _create_keystores_object(type, password_secret):
671     """
672     Create keystore property (JKS and PKC12 certificate) for custom resource
673     """
674     return {type: {
675         "create": True,
676         "passwordSecretRef": {
677             "name": password_secret,
678             "key": "password"
679         }}}
680
681
682 def _get_keystores_object_type(output_type):
683     """
684     Map config type to custom resource cert type
685     """
686     return {
687         'p12': 'pkcs12',
688         'jks': 'jks',
689     }[output_type]
690
691
def _create_projected_volume_with_password(cert_type, cert_secret_name, password_secret_name, password_secret_key):
    """
    Create projected-volume sources for password-protected certificates.
    A secret containing the passwords must be provided; its single key is
    projected twice, as keystore.pass and truststore.pass.
    """
    extension = _get_file_extension(cert_type)
    keystore_file = "keystore." + extension
    truststore_file = "truststore." + extension
    store_items = [client.V1KeyToPath(key=keystore_file, path=keystore_file),
                   client.V1KeyToPath(key=truststore_file, path=truststore_file)]
    password_items = [client.V1KeyToPath(key=password_secret_key, path="keystore.pass"),
                      client.V1KeyToPath(key=password_secret_key, path="truststore.pass")]

    cert_projection = client.V1VolumeProjection(
        secret=client.V1SecretProjection(name=cert_secret_name, items=store_items))
    password_projection = client.V1VolumeProjection(
        secret=client.V1SecretProjection(name=password_secret_name, items=password_items))

    return [cert_projection, password_projection]
710
711
def _create_pem_projected_volume(cert_secret_name):
    """
    Build the projected-volume source exposing a PEM certificate secret
    as keystore.pem / truststore.pem / key.pem files.
    """
    key_to_file = {
        "tls.crt": "keystore.pem",
        "ca.crt": "truststore.pem",
        "tls.key": "key.pem",
    }
    pem_items = [client.V1KeyToPath(key=secret_key, path=file_name)
                 for secret_key, file_name in key_to_file.items()]
    pem_projection = client.V1SecretProjection(name=cert_secret_name, items=pem_items)
    return [client.V1VolumeProjection(secret=pem_projection)]
721
722
def create_certificate_object(ctx, cert_secret_name, external_cert_data, external_tls_config, cert_name, issuer):
    """
    Assemble the body of a cert-manager Certificate custom resource.

    ctx: plugin context, used for logging
    cert_secret_name: k8s secret where cert-manager will store the issued cert
    external_cert_data: holds "external_certificate_parameters" with the
        common name and the raw SANs string
    external_tls_config: subject fields (organization, country, ...)
    cert_name: name for the Certificate resource
    issuer: name of the CMPv2Issuer to request the certificate from
    """
    cert_params = external_cert_data.get("external_certificate_parameters")
    spec = {
        "secretName": cert_secret_name,
        "commonName": cert_params.get("common_name"),
        "issuerRef": {
            "group": "certmanager.onap.org",
            "kind": "CMPv2Issuer",
            "name": issuer
        },
        "subject": _create_certificate_subject(external_tls_config)
    }
    custom_resource = {
        "apiVersion": "cert-manager.io/v1",
        "kind": "Certificate",
        "metadata": {"name": cert_name},
        "spec": spec
    }

    raw_sans = cert_params.get("sans")
    ctx.logger.info("Read SANS property: " + str(raw_sans))
    sans = SansParser().parse_sans(raw_sans)
    ctx.logger.info("Parsed SANS: " + str(sans))

    # Only add a SAN category to the spec when it is non-empty
    san_fields = (("ips", "ipAddresses"), ("dnss", "dnsNames"),
                  ("emails", "emailAddresses"), ("uris", "uris"))
    for parsed_key, spec_key in san_fields:
        if len(sans[parsed_key]) > 0:
            spec[spec_key] = sans[parsed_key]

    return custom_resource
761
762
def _create_certificate_custom_resource(ctx, external_cert_data, external_tls_config, issuer, namespace, component_name, volumes, volume_mounts, deployment_description):
    """
    Create a cert-manager Certificate custom resource for the component and
    wire the resulting secret into the pod as a projected volume.

    :param ctx: context (used for logging)
    :param external_cert_data: object containing certificate common name,
        SANs, cert_type and the mount directory for certs
    :param external_tls_config: object containing certificate subject fields
    :param issuer: CMPv2Issuer name to request the certificate from
    :param namespace: k8s namespace to create the resource in
    :param component_name: component name, used to derive resource names
    :param volumes: list of deployment volumes; a projected cert volume is
        appended to it
    :param volume_mounts: list of deployment volume mounts; a mount for the
        cert volume is appended to it
    :param deployment_description: dict tracking created resources; the
        certificate and secret names are appended so undeploy() can remove them
    """
    ctx.logger.info("Creating certificate custom resource")
    ctx.logger.info("External cert data: " + str(external_cert_data))

    # DEFAULT_CERT_TYPE is a module-level constant defined earlier in the file
    cert_type = (external_cert_data.get("cert_type") or DEFAULT_CERT_TYPE).lower()

    api = client.CustomObjectsApi()
    cert_secret_name = component_name + "-secret"
    cert_name = component_name + "-cert"
    # Certs are mounted under <external_cert_directory>external/
    # NOTE(review): assumes external_cert_directory ends with "/" — TODO confirm
    cert_dir = external_cert_data.get("external_cert_directory") + "external/"
    custom_resource = create_certificate_object(ctx, cert_secret_name,
                                                external_cert_data,
                                                external_tls_config,
                                                cert_name, issuer)
    # Create the volumes
    if cert_type != 'pem':
        # JKS/PKCS12 stores are password protected: generate a password secret,
        # ask cert-manager to build the keystores with it, and project both the
        # store files and password files into the pod.
        ctx.logger.info("Creating volume with passwords")
        password_secret_name, password_secret_key = create_secret_with_password(namespace, component_name + "-cert-password", "password",  30)
        deployment_description["secrets"].append(password_secret_name)
        custom_resource.get("spec")["keystores"] = _create_keystores_object(_get_keystores_object_type(cert_type), password_secret_name)
        projected_volume_sources = _create_projected_volume_with_password(
            cert_type, cert_secret_name, password_secret_name, password_secret_key)
    else:
        # PEM needs no password; project the raw tls.crt/ca.crt/tls.key files
        ctx.logger.info("Creating PEM volume")
        projected_volume_sources = _create_pem_projected_volume(cert_secret_name)

    # Create the volume mounts
    projected_volume = client.V1ProjectedVolumeSource(sources=projected_volume_sources)
    volumes.append(client.V1Volume(name="certmanager-certs-volume", projected=projected_volume))
    volume_mounts.append(client.V1VolumeMount(name="certmanager-certs-volume", mount_path=cert_dir))

    # Create the certificate custom resource in the cluster
    ctx.logger.info("Certificate CRD: " + str(custom_resource))
    api.create_namespaced_custom_object(
        group="cert-manager.io",
        version="v1",
        namespace=namespace,
        plural="certificates",
        body=custom_resource
    )
    deployment_description["certificates"].append(cert_name)
    deployment_description["secrets"].append(cert_secret_name)
    ctx.logger.info("CRD certificate created")
820
821
def deploy(ctx, namespace, component_name, image, replicas, always_pull, k8sconfig, **kwargs):
    """
    This will create a k8s Deployment and, if needed, one or two k8s Services.
    (We are being opinionated in our use of k8s... this code decides what k8s abstractions and features to use.
    We're not exposing k8s to the component developer and the blueprint author.
    This is a conscious choice.  We want to use k8s in a controlled, consistent way, and we want to hide
    the details from the component developer and the blueprint author.)

    namespace:  the Kubernetes namespace into which the component is deployed
    component_name:  the component name, used to derive names of Kubernetes entities
    image: the docker image for the component being deployed
    replicas: the number of instances of the component to be deployed
    always_pull: boolean flag, indicating that Kubernetes should always pull a new copy of
       the Docker image for the component, even if it is already present on the Kubernetes node.
    k8sconfig contains:
        - image_pull_secrets: a list of names of image pull secrets that can be used for retrieving images.
          (DON'T PANIC:  these are just the names of secrets held in the Kubernetes secret store.)
        - filebeat: a dictionary of filebeat sidecar parameters:
            "log_path" : mount point for log volume in filebeat container
            "data_path" : mount point for data volume in filebeat container
            "config_path" : mount point for config volume in filebeat container
            "config_subpath" :  subpath for config data in filebeat container
            "config_map" : ConfigMap holding the filebeat configuration
            "image": Docker image to use for filebeat
        - tls: a dictionary of TLS-related information:
            "cert_path": mount point for certificate volume in init container
            "image": Docker image to use for TLS init container
            "component_cert_dir" : default mount point for certs
        - cert_post_processor: a dictionary of cert_post_processor information:
            "image_tag": docker image to use for cert-post-processor init container
        - cmpv2_issuer: a dictionary with CMPv2 issuer information:
            "enabled": "true"/"false" string switching CMPv2 integration on
            "name": name of the CMPv2Issuer custom resource
    kwargs may have:
        - volumes:  array of volume objects, where a volume object is:
            {"host":{"path": "/path/on/host"}, "container":{"bind":"/path/on/container","mode":"rw_or_ro"}
        - ports: array of strings in the form "container_port:host_port"
        - env: map of name-value pairs ( {name0: value0, name1: value1...}
        - log_info: an object with info for setting up ELK logging, with the form:
            {"log_directory": "/path/to/container/log/directory", "alternate_fb_path" : "/alternate/sidecar/log/path"}
        - tls_info: an object with info for setting up TLS (HTTPS), with the form:
            {"use_tls": true, "cert_directory": "/path/to/container/cert/directory" }
        - external_cert: an object with information for setting up the init container for external certificates creation, with the form:
            {"external_cert":
                "external_cert_directory": "/path/to/directory_where_certs_should_be_placed",
                "use_external_tls": true or false,
                "ca_name": "ca-name-value",
                "cert_type": "P12" or "JKS" or "PEM",
                "external_certificate_parameters":
                    "common_name": "common-name-value",
                    "sans": "sans-value"}
        - labels: dict with label-name/label-value pairs, e.g. {"cfydeployment" : "lsdfkladflksdfsjkl", "cfynode":"mycomponent"}
            These label will be set on all the pods deployed as a result of this deploy() invocation.
        - resources: dict with optional "limits" and "requests" resource requirements, each a dict containing:
            - cpu:    number CPU usage, like 0.5
            - memory: string memory requirement, like "2Gi"
        - readiness: dict with health check info; if present, used to create a readiness probe for the main container.  Includes:
            - type: check is done by making http(s) request to an endpoint ("http", "https") or by exec'ing a script in the container ("script", "docker")
            - interval: period (in seconds) between probes
            - timeout:  time (in seconds) to allow a probe to complete
            - endpoint: the path portion of the URL that points to the readiness endpoint for "http" and "https" types
            - path: the full path to the script to be executed in the container for "script" and "docker" types
        - liveness: dict with health check info; if present, used to create a liveness probe for the main container.  Includes:
            - type: check is done by making http(s) request to an endpoint ("http", "https") or by exec'ing a script in the container ("script", "docker")
            - interval: period (in seconds) between probes
            - timeout:  time (in seconds) to allow a probe to complete
            - endpoint: the path portion of the URL that points to the liveness endpoint for "http" and "https" types
            - path: the full path to the script to be executed in the container for "script" and "docker" types
        - k8s_location: name of the Kubernetes location (cluster) where the component is to be deployed

    Returns (dep, deployment_description):
        dep -- the V1Deployment object that was sent to Kubernetes
        deployment_description -- dict recording what was created (namespace,
        location, deployment name, services, certificates, secrets); consumed
        later by undeploy(), scale(), upgrade(), etc.

    On failure, any already-created ClusterIP service and Deployment are
    deleted before the exception is re-raised.
    """

    # Track what has been created so the except-block can clean up
    deployment_ok = False
    cip_service_created = False
    deployment_description = {
        "namespace": namespace,
        "location": kwargs.get("k8s_location"),
        "deployment": '',
        "services": [],
        "certificates": [],
        "secrets": []
    }

    try:

        # Get API handles
        _configure_api(kwargs.get("k8s_location"))
        core = client.CoreV1Api()
        k8s_apps_v1_api_client = client.AppsV1Api()

        # Parse the port mapping
        container_ports, port_map = parse_ports(kwargs.get("ports", []))

        # Parse the volumes list into volumes and volume_mounts for the deployment
        volumes, volume_mounts = _parse_volumes(kwargs.get("volumes", []))

        # Initialize the list of containers that will be part of the pod
        containers = []
        init_containers = []

        # Set up the ELK logging sidecar container, if needed
        _add_elk_logging_sidecar(containers, volumes, volume_mounts, component_name, kwargs.get("log_info"),
                                 k8sconfig.get("filebeat"))

        # Set up TLS information
        _add_tls_init_container(ctx, init_containers, volumes, volume_mounts, kwargs.get("tls_info") or {},
                                k8sconfig.get("tls"))

        # Set up external TLS information
        external_cert = kwargs.get("external_cert")
        cmpv2_issuer_config = k8sconfig.get("cmpv2_issuer")
        ctx.logger.info("CMPv2 Issuer properties: " + str(cmpv2_issuer_config))

        # NOTE(review): util.strtobool expects a string ("true"/"false"); this
        # raises if "enabled" is configured as a real boolean — confirm the
        # config always stores it as a string. distutils is deprecated (PEP 632).
        cmpv2_integration_enabled = bool(util.strtobool(cmpv2_issuer_config.get("enabled")))
        ctx.logger.info("CMPv2 integration enabled: " + str(cmpv2_integration_enabled))


        # When external TLS is requested, either create a cert-manager
        # Certificate resource (CMPv2 integration) or fall back to the
        # external-TLS init container; in both cases run the post-processor.
        if external_cert and external_cert.get("use_external_tls"):
            if cmpv2_integration_enabled:
                _create_certificate_custom_resource(ctx, external_cert,
                                                   k8sconfig.get("external_cert"),
                                                   cmpv2_issuer_config.get("name"),
                                                   namespace,
                                                   component_name, volumes,
                                                   volume_mounts, deployment_description)
            else:
                _add_external_tls_init_container(ctx, init_containers, volumes, external_cert,
                                                 k8sconfig.get("external_cert"))
            _add_cert_post_processor_init_container(ctx, init_containers, kwargs.get("tls_info") or {},
                                                        k8sconfig.get("tls"), external_cert,
                                                        k8sconfig.get(
                                                            "cert_post_processor"),cmpv2_integration_enabled)

        # Create the container for the component
        # Make it the first container in the pod
        container_args = {key: kwargs.get(key) for key in ("env", "readiness", "liveness", "resources")}
        container_args['container_ports'] = container_ports
        container_args['volume_mounts'] = volume_mounts
        containers.insert(0, _create_container_object(component_name, image, always_pull, **container_args))

        # Build the k8s Deployment object
        labels = kwargs.get("labels", {})
        labels["app"] = component_name
        dep = _create_deployment_object(component_name, containers, init_containers, replicas, volumes, labels,
                                        pull_secrets=k8sconfig["image_pull_secrets"])

        # Have k8s deploy it
        k8s_apps_v1_api_client.create_namespaced_deployment(namespace, dep)
        deployment_ok = True
        deployment_description["deployment"] = _create_deployment_name(component_name)

        # Create service(s), if a port mapping is specified
        if port_map:
            service_ports, exposed_ports, exposed_ports_ipv6 = _process_port_map(port_map)

            # Create a ClusterIP service for access via the k8s network
            service = _create_service_object(_create_service_name(component_name), component_name, service_ports, None,
                                             labels, "ClusterIP", "IPv4")
            core.create_namespaced_service(namespace, service)
            cip_service_created = True
            deployment_description["services"].append(_create_service_name(component_name))

            # If there are ports to be exposed on the k8s nodes, create a "NodePort" service
            if exposed_ports:
                exposed_service = \
                    _create_service_object(_create_exposed_service_name(component_name), component_name, exposed_ports,
                                           '', labels, "NodePort", "IPv4")
                core.create_namespaced_service(namespace, exposed_service)
                deployment_description["services"].append(_create_exposed_service_name(component_name))

            if exposed_ports_ipv6:
                exposed_service_ipv6 = \
                    _create_service_object(_create_exposed_v6_service_name(component_name), component_name,
                                           exposed_ports_ipv6, '', labels, "NodePort", "IPv6")
                core.create_namespaced_service(namespace, exposed_service_ipv6)
                deployment_description["services"].append(_create_exposed_v6_service_name(component_name))

    except Exception as e:
        # Best-effort rollback of partial deployment before re-raising.
        # If the ClusterIP service was created, delete the service:
        if cip_service_created:
            core.delete_namespaced_service(_create_service_name(component_name), namespace)
        # If the deployment was created but not the service, delete the deployment
        if deployment_ok:
            client.AppsV1Api().delete_namespaced_deployment(_create_deployment_name(component_name), namespace,
                                                            body=client.V1DeleteOptions())
        raise e

    return dep, deployment_description
1007
1008
def undeploy(deployment_description):
    """
    Remove everything recorded in "deployment_description" by deploy():
    services, secrets, cert-manager Certificate custom resources, and finally
    the Deployment itself.
    """
    _configure_api(deployment_description["location"])
    namespace = deployment_description["namespace"]

    core_api = client.CoreV1Api()

    # Remove any services associated with the component
    for service_name in deployment_description["services"]:
        core_api.delete_namespaced_service(service_name, namespace)

    # Remove secrets created for the component (e.g. cert/password secrets)
    for secret_name in deployment_description["secrets"]:
        core_api.delete_namespaced_secret(secret_name, namespace)

    # Remove cert-manager Certificate custom resources
    custom_objects_api = client.CustomObjectsApi()
    for cert_name in deployment_description["certificates"]:
        custom_objects_api.delete_namespaced_custom_object(
            group="cert-manager.io",
            version="v1",
            name=cert_name,
            namespace=namespace,
            plural="certificates"
        )

    # "Foreground" propagation makes k8s delete the underlying pods and
    # replicaset along with the deployment.
    delete_options = client.V1DeleteOptions(propagation_policy="Foreground")
    client.AppsV1Api().delete_namespaced_deployment(
        deployment_description["deployment"], namespace, body=delete_options)
1033
1034
def is_available(location, namespace, component_name):
    """
    Report whether the component's deployment is fully available: every
    requested replica is both available and running the current spec.
    This check can be used to verify completion of an initial deployment,
    a scale operation, or an update operation.
    """
    _configure_api(location)
    deployment = client.AppsV1Api().read_namespaced_deployment_status(
        _create_deployment_name(component_name), namespace)
    requested = deployment.spec.replicas
    status = deployment.status
    return status.available_replicas == requested and status.updated_replicas == requested
1043
1044
def scale(deployment_description, replicas):
    """Trigger a scaling operation by patching the Deployment's replica count."""

    def set_replica_count(spec):
        # _patch_deployment reads the current spec, applies this mutation,
        # and pushes the result back to k8s.
        spec.spec.replicas = replicas
        return spec

    _patch_deployment(deployment_description["location"],
                      deployment_description["namespace"],
                      deployment_description["deployment"],
                      set_replica_count)
1054
1055
def upgrade(deployment_description, image, container_index=0):
    """
    Trigger a rolling upgrade by patching a new image name/tag onto one
    container of the Deployment (by default the main application container).
    """

    def set_image(spec):
        spec.spec.template.spec.containers[container_index].image = image
        return spec

    _patch_deployment(deployment_description["location"],
                      deployment_description["namespace"],
                      deployment_description["deployment"],
                      set_image)
1065
1066
def rollback(deployment_description, rollback_to=0):
    """
    Undo upgrade by rolling back to a previous revision of the deployment.
    By default, go back one revision.
    rollback_to can be used to supply a specific revision number.
    Returns the image for the app container and the replica count from the rolled-back deployment

    NOTE(review): this relies on ExtensionsV1beta1Api / AppsV1beta1
    rollback objects, which were removed from recent Kubernetes API versions
    and python clients — confirm against the kubernetes client version this
    project pins before relying on this function.
    """
    '''
    2018-07-13
    Currently this does not work due to a bug in the create_namespaced_deployment_rollback() method.
    The k8s python client code throws an exception while processing the response from the API.
    See:
       - https://github.com/kubernetes-client/python/issues/491
       - https://github.com/kubernetes/kubernetes/pull/63837
    The fix has been merged into the master branch but is not in the latest release.
    '''
    _configure_api(deployment_description["location"])
    deployment = deployment_description["deployment"]
    namespace = deployment_description["namespace"]

    # Initiate the rollback
    client.ExtensionsV1beta1Api().create_namespaced_deployment_rollback(
        deployment,
        namespace,
        client.AppsV1beta1DeploymentRollback(name=deployment,
                                             rollback_to=client.AppsV1beta1RollbackConfig(revision=rollback_to)))

    # Read back the spec for the rolled-back deployment so the caller can see
    # which image/replica count is now in effect
    spec = client.AppsV1Api().read_namespaced_deployment(deployment, namespace)
    return spec.spec.template.spec.containers[0].image, spec.spec.replicas
1097
1098
def execute_command_in_deployment(deployment_description, command):
    """
    Run "command" (an argv-style list) in container 0 (the main application
    container) of every currently-running pod of the deployment identified by
    "deployment_description".

    Pods are found via the "k8sdeployment" label set at deploy time, because
    k8s offers neither a direct deployment->pods listing nor a
    deployment-level exec API. The pod list is a point-in-time snapshot:
    pods created after the enumeration are not reached, and a pod that
    disappears before its exec reports a failure rather than aborting the
    whole run — this is not treated as a fatal error.

    That best-effort model fits the one current use case: notifying
    containers that their configuration changed after a policy update. The
    new configuration is stored in the configuration store (Consul) first, so
    pods that come up later get it anyway, and pods that are gone no longer
    need to be reconfigured.

    Returns a list with one result per pod, as produced by
    _execute_command_in_pod.
    """
    location = deployment_description["location"]
    _configure_api(location)
    deployment = deployment_description["deployment"]
    namespace = deployment_description["namespace"]

    # Snapshot of the running pods belonging to this deployment
    pod_list = client.CoreV1Api().list_namespaced_pod(
        namespace=namespace,
        label_selector="k8sdeployment={0}".format(deployment),
        field_selector="status.phase=Running"
    )

    # Execute the command in each pod and collect the per-pod results
    results = []
    for pod in pod_list.items:
        results.append(_execute_command_in_pod(location, namespace, pod.metadata.name, command))
    return results
1144
1145
1146
1147