1 # ============LICENSE_START=======================================================
3 # ================================================================================
4 # Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved.
5 # Copyright (c) 2020 Pantheon.tech. All rights reserved.
6 # ================================================================================
7 # Licensed under the Apache License, Version 2.0 (the "License");
8 # you may not use this file except in compliance with the License.
9 # You may obtain a copy of the License at
11 # http://www.apache.org/licenses/LICENSE-2.0
13 # Unless required by applicable law or agreed to in writing, software
14 # distributed under the License is distributed on an "AS IS" BASIS,
15 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 # ============LICENSE_END=========================================================
23 from kubernetes import config, client, stream
# Default values for readiness probe
PROBE_DEFAULT_PERIOD = 15
PROBE_DEFAULT_TIMEOUT = 1

# Location of k8s cluster config file ("kubeconfig")
K8S_CONFIG_PATH="/opt/onap/kube/kubeconfig"

# Regular expression for interval/timeout specification
# Accepts a decimal integer with an optional unit suffix (s/m/h).
INTERVAL_SPEC = re.compile("^([0-9]+)(s|m|h)?$")
# Conversion factors to seconds
FACTORS = {None: 1, "s": 1, "m": 60, "h": 3600}

# Regular expression for port mapping
# group 1: container port
# group 2: / + protocol
# group 3: protocol
# group 4: host port
PORTS = re.compile("^([0-9]+)(/(udp|UDP|tcp|TCP))?:([0-9]+)$")
44 def _create_deployment_name(component_name):
45 return "dep-{0}".format(component_name)[:63]
47 def _create_service_name(component_name):
48 return "{0}".format(component_name)[:63]
50 def _create_exposed_service_name(component_name):
51 return ("x{0}".format(component_name))[:63]
def _configure_api(location=None):
    '''
    Configure the kubernetes client library for the target cluster.

    location: kube context name to select when loading a kubeconfig file;
    defaults to the current context when None.
    '''
    # Look for a kubernetes config file
    if os.path.exists(K8S_CONFIG_PATH):
        config.load_kube_config(config_file=K8S_CONFIG_PATH, context=location, persist_config=False)
    # Maybe we're running in a k8s container and we can use info provided by k8s
    # We would like to use:
    # config.load_incluster_config()
    # but this looks into os.environ for kubernetes host and port, and from
    # the plugin those aren't visible. So we use the InClusterConfigLoader class,
    # where we can set the environment to what we like.
    # This is probably brittle! Maybe there's a better alternative.
    # NOTE(review): the "else:" branch and the dict literal wrapping these two
    # entries appear to be truncated in this excerpt.
    config.incluster_config.SERVICE_HOST_ENV_NAME : "kubernetes.default.svc.cluster.local",
    config.incluster_config.SERVICE_PORT_ENV_NAME : "443"
    config.incluster_config.InClusterConfigLoader(
        token_filename=config.incluster_config.SERVICE_TOKEN_FILENAME,
        cert_filename=config.incluster_config.SERVICE_CERT_FILENAME,
    # NOTE(review): the remaining loader arguments and the ".load_and_set()"
    # call appear to be missing from this excerpt.
def _parse_interval(t):
    '''
    Parse an interval specification
    - a simple integer quantity, interpreted as seconds
    - a string representation of a decimal integer, interpreted as seconds
    - a string consisting of a representation of a decimal integer followed by a unit,
      with "s" representing seconds, "m" representing minutes,
      and "h" representing hours
    Used for compatibility with the Docker plugin, where time intervals
    for health checks were specified as strings with a number and a unit.
    See 'intervalspec' above for the regular expression that's accepted.

    Raises ValueError for input that does not match the expected form.
    '''
    m = INTERVAL_SPEC.match(str(t))
    # NOTE(review): the match guard (e.g. "if m:") and the success return of
    # 'time' appear to be missing from this excerpt; as shown, the value is
    # computed and then the ValueError is raised unconditionally.
    time = int(m.group(1)) * FACTORS[m.group(2)]
    raise ValueError("Bad interval specification: {0}".format(t))
def _create_probe(hc, port):
    ''' Create a Kubernetes probe based on info in the health check dictionary hc '''
    probe_type = hc['type']
    # Intervals accept the same forms as _parse_interval (int seconds or
    # strings like "15s"/"1m"); defaults come from the module constants.
    period = _parse_interval(hc.get('interval', PROBE_DEFAULT_PERIOD))
    timeout = _parse_interval(hc.get('timeout', PROBE_DEFAULT_TIMEOUT))
    if probe_type in ['http', 'https']:
        # HTTP(S) GET against hc['endpoint']; scheme is the upper-cased type
        probe = client.V1Probe(
          failure_threshold = 1,
          initial_delay_seconds = 5,
          period_seconds = period,
          timeout_seconds = timeout,
          http_get = client.V1HTTPGetAction(
              path = hc['endpoint'],
              scheme = probe_type.upper()
    elif probe_type in ['script', 'docker']:
        # Exec-style probe: run hc['script'] (split on whitespace) in the container
        probe = client.V1Probe(
          failure_threshold = 1,
          initial_delay_seconds = 5,
          period_seconds = period,
          timeout_seconds = timeout,
          _exec = client.V1ExecAction(
              command = hc['script'].split( )
    # NOTE(review): closing parentheses for both V1Probe constructions and the
    # final "return probe" appear to be missing from this excerpt.
def _create_resources(resources=None):
    # Build a V1ResourceRequirements from an optional dict that may carry
    # "limits" and/or "requests" keys (passed straight through to k8s).
    if resources is not None:
        resources_obj = client.V1ResourceRequirements(
          limits = resources.get("limits"),
          requests = resources.get("requests")
    # NOTE(review): the closing paren and the return statement(s) appear to be
    # missing from this excerpt.
def _create_container_object(name, image, always_pull, **kwargs):
    '''
    Build a k8s V1Container.

    name: container name
    image: Docker image reference
    always_pull: if True, image_pull_policy is "Always", else "IfNotPresent"
    Recognized kwargs: env (dict of name->value), readiness (dict),
    liveness (dict), resources (dict), container_ports (list of
    (port, protocol) pairs), volume_mounts (list of V1VolumeMount).
    '''
    # Set up environment variables
    # Copy any passed in environment variables
    env = kwargs.get('env') or {}
    env_vars = [client.V1EnvVar(name=k, value=env[k]) for k in env]
    # Add POD_IP with the IP address of the pod running the container
    pod_ip = client.V1EnvVarSource(field_ref = client.V1ObjectFieldSelector(field_path="status.podIP"))
    env_vars.append(client.V1EnvVar(name="POD_IP",value_from=pod_ip))

    # If a health check is specified, create a readiness/liveness probe
    # (For an HTTP-based check, we assume it's at the first container port)
    readiness = kwargs.get('readiness')
    liveness = kwargs.get('liveness')
    resources = kwargs.get('resources')
    container_ports = kwargs.get('container_ports') or []

    hc_port = container_ports[0][0] if container_ports else None
    probe = _create_probe(readiness, hc_port) if readiness else None
    live_probe = _create_probe(liveness, hc_port) if liveness else None
    resources_obj = _create_resources(resources) if resources else None
    port_objs = [client.V1ContainerPort(container_port=port, protocol=proto)
                 for port, proto in container_ports]

    # Define container for pod
    return client.V1Container(
        # NOTE(review): arguments such as name=name, image=image, env=env_vars
        # and ports=port_objs appear to be missing from this excerpt.
        image_pull_policy='Always' if always_pull else 'IfNotPresent',
        volume_mounts=kwargs.get('volume_mounts') or [],
        resources=resources_obj,
        readiness_probe=probe,
        liveness_probe=live_probe
def _create_deployment_object(component_name,
    # NOTE(review): the remainder of the signature (containers, init_containers,
    # replicas, volumes, labels, pull_secrets, ...) appears to be truncated in
    # this excerpt.
    deployment_name = _create_deployment_name(component_name)

    # Label the pod with the deployment name, so we can find it easily
    labels.update({"k8sdeployment" : deployment_name})

    # pull_secrets is a list of the names of the k8s secrets containing docker registry credentials
    # See https://kubernetes.io/docs/concepts/containers/images/#specifying-imagepullsecrets-on-a-pod
    # NOTE(review): the initialization of 'ips' appears to be missing from this excerpt.
    for secret in pull_secrets:
        ips.append(client.V1LocalObjectReference(name=secret))

    # Define pod template
    template = client.V1PodTemplateSpec(
        metadata=client.V1ObjectMeta(labels=labels),
        spec=client.V1PodSpec(hostname=component_name,
                              containers=containers,
                              init_containers=init_containers,
                              image_pull_secrets=ips)

    # Define deployment spec
    spec = client.ExtensionsV1beta1DeploymentSpec(
        # NOTE(review): spec arguments (replicas, template, ...) appear to be
        # missing from this excerpt.

    # Create deployment object
    deployment = client.ExtensionsV1beta1Deployment(
        metadata=client.V1ObjectMeta(name=deployment_name),
        # NOTE(review): remaining constructor arguments and the return of
        # 'deployment' appear to be missing from this excerpt.
def _create_service_object(service_name, component_name, service_ports, annotations, labels, service_type):
    '''
    Build a k8s V1Service of the given type ("ClusterIP" or "NodePort"),
    selecting pods by the component's "app" label.
    '''
    service_spec = client.V1ServiceSpec(
        selector={"app" : component_name},
    # NOTE(review): remaining spec arguments (ports, type) and the if/else
    # choosing between the two metadata forms below appear to be truncated:
    # the first form is used when annotations are supplied, the second when not.
    metadata = client.V1ObjectMeta(name=_create_service_name(service_name), labels=labels, annotations=annotations)
    metadata = client.V1ObjectMeta(name=_create_service_name(service_name), labels=labels)
    service = client.V1Service(
    # NOTE(review): the V1Service constructor arguments and return appear to be
    # missing from this excerpt.
def parse_ports(port_list):
    '''
    Parse the port list into a list of container ports (needed to create the container)
    and to a set of port mappings to set up k8s services.

    Each entry must match the PORTS regular expression,
    i.e. "container_port[/protocol]:host_port"; otherwise ValueError is raised.
    '''
    # NOTE(review): the accumulator initializations and the "for p in port_list:"
    # loop header (with its match guard) appear to be missing from this excerpt.
    m = PORTS.match(p.strip())
    cport = int(m.group(1))
    hport = int (m.group(4))
    # Protocol defaults are handled elsewhere; here the matched protocol is
    # normalized to upper case ("TCP"/"UDP").
    proto = (m.group(3)).upper()
    container_ports.append((cport, proto))
    port_map[(cport, proto)] = hport
    raise ValueError("Bad port specification: {0}".format(p))

    return container_ports, port_map
def _parse_volumes(volume_list):
    '''
    Convert Docker-style volume dicts
    ({"host": {"path": ...}, "container": {"bind": ..., "mode": ...}})
    into parallel lists of k8s V1Volume and V1VolumeMount objects.
    '''
    # NOTE(review): the initializations of 'volumes' and 'volume_mounts' appear
    # to be missing from this excerpt.
    for v in volume_list:
        vname = str(uuid.uuid4())                 # unique name linking volume to its mount
        vhost = v['host']['path']
        vcontainer = v['container']['bind']
        vro = (v['container'].get('mode') == 'ro')  # read-only iff mode is exactly 'ro'
        volumes.append(client.V1Volume(name=vname, host_path=client.V1HostPathVolumeSource(path=vhost)))
        volume_mounts.append(client.V1VolumeMount(name=vname, mount_path=vcontainer, read_only=vro))

    return volumes, volume_mounts
def _add_elk_logging_sidecar(containers, volumes, volume_mounts, component_name, log_info, filebeat):
    '''
    Add a filebeat sidecar container that ships the component's logs to ELK.
    Mutates 'containers', 'volumes', and 'volume_mounts' in place.
    No-op unless both log_info and filebeat configuration are supplied.
    '''
    if not log_info or not filebeat:
        # NOTE(review): the early "return" appears to be missing from this excerpt.
    log_dir = log_info.get("log_directory")
    # NOTE(review): a guard on 'log_dir' appears to be missing from this excerpt.
    sidecar_volume_mounts = []

    # Create the volume for component log files and volume mounts for the component and sidecar containers
    volumes.append(client.V1Volume(name="component-log", empty_dir=client.V1EmptyDirVolumeSource()))
    volume_mounts.append(client.V1VolumeMount(name="component-log", mount_path=log_dir))
    # Sidecar sees the same log volume, either at an explicit alternate path or
    # at the conventional "<log_path>/<component_name>" location.
    sc_path = log_info.get("alternate_fb_path") or "{0}/{1}".format(filebeat["log_path"], component_name)
    sidecar_volume_mounts.append(client.V1VolumeMount(name="component-log", mount_path=sc_path))

    # Create the volume for sidecar data and the volume mount for it
    volumes.append(client.V1Volume(name="filebeat-data", empty_dir=client.V1EmptyDirVolumeSource()))
    sidecar_volume_mounts.append(client.V1VolumeMount(name="filebeat-data", mount_path=filebeat["data_path"]))

    # Create the volume for the sidecar configuration data and the volume mount for it
    # The configuration data is in a k8s ConfigMap that should be created when DCAE is installed.
    # NOTE(review): the "volumes.append(" call wrapping the next line appears to
    # be missing from this excerpt.
        client.V1Volume(name="filebeat-conf", config_map=client.V1ConfigMapVolumeSource(name=filebeat["config_map"])))
    sidecar_volume_mounts.append(
        client.V1VolumeMount(name="filebeat-conf", mount_path=filebeat["config_path"], sub_path=filebeat["config_subpath"]))

    # Finally create the container for the sidecar
    containers.append(_create_container_object("filebeat", filebeat["image"], False, volume_mounts=sidecar_volume_mounts))
def _add_tls_init_container(init_containers, volumes, volume_mounts, tls_info, tls_config):
    # Adds an InitContainer to the pod to set up TLS certificate information. For components that act as a
    # server(tls_info["use_tls"] is True), the InitContainer will populate a directory with server and CA certificate
    # materials in various formats. For other components (tls_info["use_tls"] is False, or tls_info is not specified),
    # the InitContainer will populate a directory with CA certificate materials in PEM and JKS formats.
    # In either case, the certificate directory is mounted onto the component container filesystem at the location
    # specified by tls_info["component_cert_dir"], if present, otherwise at the configured default mount point
    # (tls_config["component_cert_dir"]).
    cert_directory = tls_info.get("cert_directory") or tls_config.get("component_cert_dir")
    # NOTE(review): the initialization of 'env' appears to be missing from this excerpt.
    # TLS_SERVER tells the init container whether to generate server certs.
    env["TLS_SERVER"] = "true" if tls_info.get("use_tls") else "false"

    # Create the certificate volume and volume mounts
    volumes.append(client.V1Volume(name="tls-info", empty_dir=client.V1EmptyDirVolumeSource()))
    volume_mounts.append(client.V1VolumeMount(name="tls-info", mount_path=cert_directory))
    init_volume_mounts = [client.V1VolumeMount(name="tls-info", mount_path=tls_config["cert_path"])]

    # Create the init container
    init_containers.append(_create_container_object("init-tls", tls_config["image"], False, volume_mounts=init_volume_mounts, env=env))
def _process_port_map(port_map):
    '''
    Split a {(container_port, protocol): host_port} map into the port lists
    needed for the ClusterIP and NodePort k8s services.
    '''
    service_ports = [] # Ports exposed internally on the k8s network
    exposed_ports = [] # Ports to be mapped to ports on the k8s nodes via NodePort
    for (cport, proto), hport in port_map.items():
        # NodePort service port name is "xport-<p>-<cport>"; the ClusterIP
        # service uses the same name without the leading "x" (name[1:]).
        name = "xport-{0}-{1}".format(proto[0].lower(), cport)
        service_ports.append(client.V1ServicePort(port=cport, protocol=proto, name=name[1:]))
        # NOTE(review): a conditional around the NodePort entry (lines between
        # the two appends) appears to be missing from this excerpt.
        exposed_ports.append(client.V1ServicePort(port=cport, protocol=proto, node_port=hport, name=name))
    return service_ports, exposed_ports
def _service_exists(location, namespace, component_name):
    # Probe for the component's ClusterIP service; an ApiException from the
    # read indicates the service does not exist.
    # NOTE(review): the "try:" wrapper and the success/failure bookkeeping
    # (and the return) appear to be missing from this excerpt.
    _configure_api(location)
    client.CoreV1Api().read_namespaced_service(_create_service_name(component_name), namespace)
    except client.rest.ApiException:
def _patch_deployment(location, namespace, deployment, modify):
    '''
    Gets the current spec for 'deployment' in 'namespace'
    in the k8s cluster at 'location',
    uses the 'modify' function to change the spec,
    then sends the updated spec to k8s.
    '''
    _configure_api(location)

    # Get deployment spec
    spec = client.ExtensionsV1beta1Api().read_namespaced_deployment(deployment, namespace)

    # Apply changes to spec
    # NOTE(review): the call applying 'modify' to 'spec' appears to be missing
    # from this excerpt.

    # Patch the deploy with updated spec
    client.ExtensionsV1beta1Api().patch_namespaced_deployment(deployment, namespace, spec)
def _execute_command_in_pod(location, namespace, pod_name, command):
    '''
    Execute the command (specified by an argv-style list in the "command" parameter) in
    the specified pod in the specified namespace at the specified location.
    For now at least, we use this only to
    run a notification script in a pod after a configuration change.

    The straightforward way to do this is with the V1 Core API function "connect_get_namespaced_pod_exec".
    Unfortunately due to a bug/limitation in the Python client library, we can't call it directly.
    We have to make the API call through a Websocket connection using the kubernetes.stream wrapper API.
    I'm following the example code at https://github.com/kubernetes-client/python/blob/master/examples/exec.py.
    There are several issues tracking this, in various states. It isn't clear that there will ever
    - https://github.com/kubernetes-client/python/issues/58
    - https://github.com/kubernetes-client/python/issues/409
    - https://github.com/kubernetes-client/python/issues/526

    The main consequence of the workaround using "stream" is that the caller does not get an indication
    of the exit code returned by the command when it completes execution. It turns out that the
    original implementation of notification in the Docker plugin did not use this result, so we can
    still match the original notification functionality.

    The "stream" approach returns a string containing any output sent by the command to stdout or stderr.
    We'll return that so it can logged.
    '''
    _configure_api(location)
    # NOTE(review): the "try:" wrapper around the exec call appears to be
    # missing from this excerpt.
    output = stream.stream(client.CoreV1Api().connect_get_namespaced_pod_exec,
        # NOTE(review): the remaining stream() arguments (pod name, namespace,
        # command, stdout/stderr flags, ...) appear to be missing here.
    except client.rest.ApiException as e:
        # If the exception indicates the pod wasn't found, it's not a fatal error.
        # It existed when we enumerated the pods for the deployment but no longer exists.
        # Unfortunately, the only way to distinguish a pod not found from any other error
        # is by looking at the reason text.
        # (The ApiException's "status" field should contain the HTTP status code, which would
        # be 404 if the pod isn't found, but empirical testing reveals that "status" is set
        if "404 not found" in e.reason.lower():
            output = "Pod not found"
        # NOTE(review): the re-raise path for other ApiExceptions appears to be
        # missing from this excerpt.

    return {"pod" : pod_name, "output" : output}
def deploy(namespace, component_name, image, replicas, always_pull, k8sconfig, **kwargs):
    '''
    This will create a k8s Deployment and, if needed, one or two k8s Services.
    (We are being opinionated in our use of k8s... this code decides what k8s abstractions and features to use.
    We're not exposing k8s to the component developer and the blueprint author.
    This is a conscious choice. We want to use k8s in a controlled, consistent way, and we want to hide
    the details from the component developer and the blueprint author.)

    namespace: the Kubernetes namespace into which the component is deployed
    component_name: the component name, used to derive names of Kubernetes entities
    image: the docker image for the component being deployed
    replica: the number of instances of the component to be deployed
    always_pull: boolean flag, indicating that Kubernetes should always pull a new copy of
       the Docker image for the component, even if it is already present on the Kubernetes node.

    - image_pull_secrets: a list of names of image pull secrets that can be used for retrieving images.
      (DON'T PANIC: these are just the names of secrets held in the Kubernetes secret store.)
    - filebeat: a dictionary of filebeat sidecar parameters:
      "log_path" : mount point for log volume in filebeat container
      "data_path" : mount point for data volume in filebeat container
      "config_path" : mount point for config volume in filebeat container
      "config_subpath" : subpath for config data in filebeat container
      "config_map" : ConfigMap holding the filebeat configuration
      "image": Docker image to use for filebeat
    - tls: a dictionary of TLS-related information:
      "cert_path": mount point for certificate volume in init container
      "image": Docker image to use for TLS init container
      "component_cert_dir" : default mount point for certs
    - volumes: array of volume objects, where a volume object is:
      {"host":{"path": "/path/on/host"}, "container":{"bind":"/path/on/container","mode":"rw_or_ro"}
    - ports: array of strings in the form "container_port:host_port"
    - env: map of name-value pairs ( {name0: value0, name1: value1...}
    - log_info: an object with info for setting up ELK logging, with the form:
      {"log_directory": "/path/to/container/log/directory", "alternate_fb_path" : "/alternate/sidecar/log/path"}
    - tls_info: an object with info for setting up TLS (HTTPS), with the form:
      {"use_tls": true, "cert_directory": "/path/to/container/cert/directory" }
    - labels: dict with label-name/label-value pairs, e.g. {"cfydeployment" : "lsdfkladflksdfsjkl", "cfynode":"mycomponent"}
      These label will be set on all the pods deployed as a result of this deploy() invocation.
    - resources: dict with optional "limits" and "requests" resource requirements, each a dict containing:
      - cpu: number CPU usage, like 0.5
      - memory: string memory requirement, like "2Gi"
    - readiness: dict with health check info; if present, used to create a readiness probe for the main container. Includes:
      - type: check is done by making http(s) request to an endpoint ("http", "https") or by exec'ing a script in the container ("script", "docker")
      - interval: period (in seconds) between probes
      - timeout: time (in seconds) to allow a probe to complete
      - endpoint: the path portion of the URL that points to the readiness endpoint for "http" and "https" types
      - path: the full path to the script to be executed in the container for "script" and "docker" types
    - liveness: dict with health check info; if present, used to create a liveness probe for the main container. Includes:
      - type: check is done by making http(s) request to an endpoint ("http", "https") or by exec'ing a script in the container ("script", "docker")
      - interval: period (in seconds) between probes
      - timeout: time (in seconds) to allow a probe to complete
      - endpoint: the path portion of the URL that points to the liveness endpoint for "http" and "https" types
      - path: the full path to the script to be executed in the container for "script" and "docker" types
    - k8s_location: name of the Kubernetes location (cluster) where the component is to be deployed
    '''
    deployment_ok = False
    cip_service_created = False
    deployment_description = {
        "namespace": namespace,
        "location" : kwargs.get("k8s_location"),
        # NOTE(review): the remaining initial keys (e.g. "deployment",
        # "services") and the dict's closing brace appear to be missing from
        # this excerpt.

    _configure_api(kwargs.get("k8s_location"))
    core = client.CoreV1Api()
    ext = client.ExtensionsV1beta1Api()

    # Parse the port mapping
    container_ports, port_map = parse_ports(kwargs.get("ports", []))

    # Parse the volumes list into volumes and volume_mounts for the deployment
    volumes, volume_mounts = _parse_volumes(kwargs.get("volumes", []))

    # Initialize the list of containers that will be part of the pod
    # NOTE(review): the initializations of 'containers' and 'init_containers'
    # appear to be missing from this excerpt.

    # Set up the ELK logging sidecar container, if needed
    _add_elk_logging_sidecar(containers, volumes, volume_mounts, component_name, kwargs.get("log_info"), k8sconfig.get("filebeat"))

    # Set up TLS information
    _add_tls_init_container(init_containers, volumes, volume_mounts, kwargs.get("tls_info") or {}, k8sconfig.get("tls"))

    # Create the container for the component
    # Make it the first container in the pod
    container_args = {key: kwargs.get(key) for key in ("env", "readiness", "liveness", "resources")}
    container_args['container_ports'] = container_ports
    container_args['volume_mounts'] = volume_mounts
    containers.insert(0, _create_container_object(component_name, image, always_pull, **container_args))

    # Build the k8s Deployment object
    labels = kwargs.get("labels", {})
    labels["app"] = component_name
    dep = _create_deployment_object(component_name, containers, init_containers, replicas, volumes, labels, pull_secrets=k8sconfig["image_pull_secrets"])

    # NOTE(review): the "try:" that the later "except" pairs with appears to be
    # missing from this excerpt; the create/rollback structure below assumes it.
    ext.create_namespaced_deployment(namespace, dep)
    deployment_description["deployment"] = _create_deployment_name(component_name)

    # Create service(s), if a port mapping is specified
    service_ports, exposed_ports = _process_port_map(port_map)

    # Create a ClusterIP service for access via the k8s network
    service = _create_service_object(_create_service_name(component_name), component_name, service_ports, None, labels, "ClusterIP")
    core.create_namespaced_service(namespace, service)
    cip_service_created = True
    deployment_description["services"].append(_create_service_name(component_name))

    # If there are ports to be exposed on the k8s nodes, create a "NodePort" service
    # NOTE(review): the guard on 'exposed_ports' and the "exposed_service ="
    # assignment appear to be missing from this excerpt.
    _create_service_object(_create_exposed_service_name(component_name), component_name, exposed_ports, '', labels, "NodePort")
    core.create_namespaced_service(namespace, exposed_service)
    deployment_description["services"].append(_create_exposed_service_name(component_name))

    except Exception as e:
        # If the ClusterIP service was created, delete the service:
        if cip_service_created:
            core.delete_namespaced_service(_create_service_name(component_name), namespace)
        # If the deployment was created but not the service, delete the deployment
        client.ExtensionsV1beta1Api().delete_namespaced_deployment(_create_deployment_name(component_name), namespace, body=client.V1DeleteOptions())
        # NOTE(review): the re-raise of 'e' appears to be missing from this excerpt.

    return dep, deployment_description
def undeploy(deployment_description):
    ''' Remove a deployed component: delete its services, then its Deployment. '''
    _configure_api(deployment_description["location"])

    ns = deployment_description["namespace"]

    # remove any services associated with the component
    for svc_name in deployment_description["services"]:
        client.CoreV1Api().delete_namespaced_service(svc_name, ns)

    # Have k8s delete the underlying pods and replicaset when deleting the deployment.
    delete_opts = client.V1DeleteOptions(propagation_policy="Foreground")
    client.ExtensionsV1beta1Api().delete_namespaced_deployment(
        deployment_description["deployment"], ns, body=delete_opts)
def is_available(location, namespace, component_name):
    ''' Report whether the component's Deployment is fully available and up to date. '''
    _configure_api(location)
    dep_name = _create_deployment_name(component_name)
    dep_status = client.AppsV1beta1Api().read_namespaced_deployment_status(dep_name, namespace)
    # Check if the number of available replicas is equal to the number requested and that the replicas match the current spec
    # This check can be used to verify completion of an initial deployment, a scale operation, or an update operation
    requested = dep_status.spec.replicas
    available_ok = dep_status.status.available_replicas == requested
    updated_ok = dep_status.status.updated_replicas == requested
    return available_ok and updated_ok
def scale(deployment_description, replicas):
    ''' Trigger a scaling operation by updating the replica count for the Deployment '''

    def _set_replica_count(spec):
        # Called by _patch_deployment with the current deployment spec.
        spec.spec.replicas = replicas

    _patch_deployment(deployment_description["location"],
                      deployment_description["namespace"],
                      deployment_description["deployment"],
                      _set_replica_count)
def upgrade(deployment_description, image, container_index = 0):
    ''' Trigger a rolling upgrade by sending a new image name/tag to k8s '''

    def _set_image(spec):
        # Called by _patch_deployment; swaps the image on the target container.
        spec.spec.template.spec.containers[container_index].image = image

    _patch_deployment(deployment_description["location"],
                      deployment_description["namespace"],
                      deployment_description["deployment"],
                      _set_image)
def rollback(deployment_description, rollback_to=0):
    '''
    Undo upgrade by rolling back to a previous revision of the deployment.
    By default, go back one revision.
    rollback_to can be used to supply a specific revision number.
    Returns the image for the app container and the replica count from the rolled-back deployment

    Currently this does not work due to a bug in the create_namespaced_deployment_rollback() method.
    The k8s python client code throws an exception while processing the response from the API.
    - https://github.com/kubernetes-client/python/issues/491
    - https://github.com/kubernetes/kubernetes/pull/63837
    The fix has been merged into the master branch but is not in the latest release.
    '''
    _configure_api(deployment_description["location"])
    deployment = deployment_description["deployment"]
    namespace = deployment_description["namespace"]

    # Initiate the rollback
    client.ExtensionsV1beta1Api().create_namespaced_deployment_rollback(
        # NOTE(review): the deployment-name and namespace arguments appear to
        # be missing from this excerpt.
        client.AppsV1beta1DeploymentRollback(name=deployment, rollback_to=client.AppsV1beta1RollbackConfig(revision=rollback_to)))

    # Read back the spec for the rolled-back deployment
    spec = client.ExtensionsV1beta1Api().read_namespaced_deployment(deployment, namespace)
    return spec.spec.template.spec.containers[0].image, spec.spec.replicas
def execute_command_in_deployment(deployment_description, command):
    '''
    Enumerates the pods in the k8s deployment identified by "deployment_description",
    then executes the command (represented as an argv-style list) in "command" in
    container 0 (the main application container) each of those pods.

    Note that the sets of pods associated with a deployment can change over time. The
    enumeration is a snapshot at one point in time. The command will not be executed in
    pods that are created after the initial enumeration. If a pod disappears after the
    initial enumeration and before the command is executed, the attempt to execute the
    command will fail. This is not treated as a fatal error.

    This approach is reasonable for the one current use case for "execute_command": running a
    script to notify a container that its configuration has changed as a result of a
    policy change. In this use case, the new configuration information is stored into
    the configuration store (Consul), the pods are enumerated, and the command is executed.
    If a pod disappears after the enumeration, the fact that the command cannot be run
    doesn't matter--a nonexistent pod doesn't need to be reconfigured. Similarly, a pod that
    comes up after the enumeration will get its initial configuration from the updated version

    The optimal solution here would be for k8s to provide an API call to execute a command in
    all of the pods for a deployment. Unfortunately, k8s does not provide such a call--the
    only call provided by k8s operates at the pod level, not the deployment level.

    Another interesting k8s factoid: there's no direct way to list the pods belong to a
    particular k8s deployment. The deployment code above sets a label ("k8sdeployment") on
    the pod that has the k8s deployment name. To list the pods, the code below queries for
    pods with the label carrying the deployment name.
    '''
    location = deployment_description["location"]
    _configure_api(location)
    deployment = deployment_description["deployment"]
    namespace = deployment_description["namespace"]

    # Get names of all the running pods belonging to the deployment
    pod_names = [pod.metadata.name for pod in client.CoreV1Api().list_namespaced_pod(
        namespace = namespace,
        label_selector = "k8sdeployment={0}".format(deployment),
        field_selector = "status.phase=Running"
        # NOTE(review): the closing ").items]" of this comprehension appears to
        # be missing from this excerpt.

    # Execute command in the running pods
    return [_execute_command_in_pod(location, namespace, pod_name, command)
            for pod_name in pod_names]