1 # ============LICENSE_START=======================================================
3 # ================================================================================
4 # Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved.
5 # Copyright (c) 2020 Pantheon.tech. All rights reserved.
6 # ================================================================================
7 # Licensed under the Apache License, Version 2.0 (the "License");
8 # you may not use this file except in compliance with the License.
9 # You may obtain a copy of the License at
11 # http://www.apache.org/licenses/LICENSE-2.0
13 # Unless required by applicable law or agreed to in writing, software
14 # distributed under the License is distributed on an "AS IS" BASIS,
15 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 # ============LICENSE_END=========================================================
20 # ECOMP is a trademark and service mark of AT&T Intellectual Property.
24 from kubernetes import config, client, stream
# Default values for readiness probe
PROBE_DEFAULT_PERIOD = 15  # seconds between successive probes
PROBE_DEFAULT_TIMEOUT = 1  # seconds allowed for a single probe to complete
# Location of k8s cluster config file ("kubeconfig")
K8S_CONFIG_PATH="/opt/onap/kube/kubeconfig"
# Regular expression for interval/timeout specification
INTERVAL_SPEC = re.compile("^([0-9]+)(s|m|h)?$")
# Conversion factors to seconds
FACTORS = {None: 1, "s": 1, "m": 60, "h": 3600}
# Regular expression for port mapping
# group 1: container port
# group 2: / + protocol
# group 3: protocol (udp/tcp, either case); TCP assumed if absent
# group 4: host port
PORTS = re.compile("^([0-9]+)(/(udp|UDP|tcp|TCP))?:([0-9]+)$")
45 def _create_deployment_name(component_name):
46 return "dep-{0}".format(component_name)[:63]
48 def _create_service_name(component_name):
49 return "{0}".format(component_name)[:63]
51 def _create_exposed_service_name(component_name):
52 return ("x{0}".format(component_name))[:63]
def _configure_api(location=None):
    ''' Configure the kubernetes client library for the cluster named by 'location'. '''
    # Look for a kubernetes config file
    if os.path.exists(K8S_CONFIG_PATH):
        # 'location' selects a context within the kubeconfig; don't persist any changes back to it
        config.load_kube_config(config_file=K8S_CONFIG_PATH, context=location, persist_config=False)
    # Maybe we're running in a k8s container and we can use info provided by k8s
    # We would like to use:
    # config.load_incluster_config()
    # but this looks into os.environ for kubernetes host and port, and from
    # the plugin those aren't visible. So we use the InClusterConfigLoader class,
    # where we can set the environment to what we like.
    # This is probably brittle! Maybe there's a better alternative.
    # NOTE(review): the two entries below appear to belong to an environment dict passed
    # to InClusterConfigLoader; its opening, and the loader call's remaining arguments and
    # invocation, are not visible in this excerpt — confirm against the full source.
    config.incluster_config.SERVICE_HOST_ENV_NAME : "kubernetes.default.svc.cluster.local",
    config.incluster_config.SERVICE_PORT_ENV_NAME : "443"
    config.incluster_config.InClusterConfigLoader(
        token_filename=config.incluster_config.SERVICE_TOKEN_FILENAME,
        cert_filename=config.incluster_config.SERVICE_CERT_FILENAME,
def _parse_interval(t):
    '''
    Parse an interval specification, which can be:
    - a simple integer quantity, interpreted as seconds
    - a string representation of a decimal integer, interpreted as seconds
    - a string consisting of a representation of a decimal integer followed by a unit,
      with "s" representing seconds, "m" representing minutes,
      and "h" representing hours
    Used for compatibility with the Docker plugin, where time intervals
    for health checks were specified as strings with a number and a unit.
    See INTERVAL_SPEC above for the regular expression that's accepted.

    Raises ValueError when 't' does not match any accepted form.
    '''
    m = INTERVAL_SPEC.match(str(t))
    if m:
        # group 1 is the quantity, group 2 the (optional) unit
        return int(m.group(1)) * FACTORS[m.group(2)]
    raise ValueError("Bad interval specification: {0}".format(t))
def _create_probe(hc, port):
    ''' Create a Kubernetes probe based on info in the health check dictionary hc '''
    # 'type' selects between an HTTP(S) GET probe and an exec-a-script probe
    probe_type = hc['type']
    # Interval/timeout accept Docker-plugin-style strings like "30s"; defaults are in seconds
    period = _parse_interval(hc.get('interval', PROBE_DEFAULT_PERIOD))
    timeout = _parse_interval(hc.get('timeout', PROBE_DEFAULT_TIMEOUT))
    if probe_type in ['http', 'https']:
        # HTTP(S) GET against the path in hc['endpoint']
        # NOTE(review): the V1HTTPGetAction presumably also receives 'port', and the
        # constructors' closing parentheses are not visible in this excerpt — confirm.
        probe = client.V1Probe(
          failure_threshold = 1,
          initial_delay_seconds = 5,
          period_seconds = period,
          timeout_seconds = timeout,
          http_get = client.V1HTTPGetAction(
             path = hc['endpoint'],
             scheme = probe_type.upper()
    elif probe_type in ['script', 'docker']:
        # Exec probe: run hc['script'], split on whitespace into an argv-style list
        probe = client.V1Probe(
          failure_threshold = 1,
          initial_delay_seconds = 5,
          period_seconds = period,
          timeout_seconds = timeout,
          _exec = client.V1ExecAction(
             command = hc['script'].split( )
126 def _create_resources(resources=None):
127 if resources is not None:
128 resources_obj = client.V1ResourceRequirements(
129 limits = resources.get("limits"),
130 requests = resources.get("requests")
def _create_container_object(name, image, always_pull, **kwargs):
    ''' Build (but do not create) a k8s V1Container for a component or sidecar. '''
    # Set up environment variables
    # Copy any passed in environment variables
    env = kwargs.get('env') or {}
    env_vars = [client.V1EnvVar(name=k, value=env[k]) for k in env]
    # Add POD_IP with the IP address of the pod running the container
    pod_ip = client.V1EnvVarSource(field_ref = client.V1ObjectFieldSelector(field_path="status.podIP"))
    env_vars.append(client.V1EnvVar(name="POD_IP",value_from=pod_ip))
    # If a health check is specified, create a readiness/liveness probe
    # (For an HTTP-based check, we assume it's at the first container port)
    readiness = kwargs.get('readiness')
    liveness = kwargs.get('liveness')
    resources = kwargs.get('resources')
    container_ports = kwargs.get('container_ports') or []
    # container_ports entries are (port, protocol) tuples; first port hosts any HTTP health check
    hc_port = container_ports[0][0] if container_ports else None
    probe = _create_probe(readiness, hc_port) if readiness else None
    live_probe = _create_probe(liveness, hc_port) if liveness else None
    resources_obj = _create_resources(resources) if resources else None
    port_objs = [client.V1ContainerPort(container_port=port, protocol=proto)
                 for port, proto in container_ports]
    # Define container for pod
    # NOTE(review): additional V1Container kwargs (name, image, ports, env, and the
    # closing parenthesis) are not visible in this excerpt — confirm against full source.
    return client.V1Container(
        image_pull_policy='Always' if always_pull else 'IfNotPresent',
        volume_mounts=kwargs.get('volume_mounts') or [],
        resources=resources_obj,
        readiness_probe=probe,
        liveness_probe=live_probe
def _create_deployment_object(component_name,
    # Build (but do not create) the k8s Deployment object for a component.
    # NOTE(review): the remainder of the parameter list (containers, init_containers,
    # replicas, volumes, labels, pull_secrets, ...) is not visible in this excerpt — confirm.
    deployment_name = _create_deployment_name(component_name)
    # Label the pod with the deployment name, so we can find it easily
    labels.update({"k8sdeployment" : deployment_name})
    # pull_secrets is a list of the names of the k8s secrets containing docker registry credentials
    # See https://kubernetes.io/docs/concepts/containers/images/#specifying-imagepullsecrets-on-a-pod
    for secret in pull_secrets:
        ips.append(client.V1LocalObjectReference(name=secret))
    # Define pod template
    template = client.V1PodTemplateSpec(
        metadata=client.V1ObjectMeta(labels=labels),
        spec=client.V1PodSpec(hostname=component_name,
        containers=containers,
        init_containers=init_containers,
        image_pull_secrets=ips)
    # Define deployment spec
    spec = client.ExtensionsV1beta1DeploymentSpec(
    # Create deployment object
    deployment = client.ExtensionsV1beta1Deployment(
        metadata=client.V1ObjectMeta(name=deployment_name),
def _create_service_object(service_name, component_name, service_ports, annotations, labels, service_type):
    # Build (but do not create) a k8s Service of 'service_type' selecting the component's pods via the "app" label.
    service_spec = client.V1ServiceSpec(
        selector={"app" : component_name},
    # NOTE(review): the two metadata assignments below appear to be the branches of a
    # conditional on whether 'annotations' were supplied; the if/else lines are not
    # visible in this excerpt — confirm against the full source.
    metadata = client.V1ObjectMeta(name=_create_service_name(service_name), labels=labels, annotations=annotations)
    metadata = client.V1ObjectMeta(name=_create_service_name(service_name), labels=labels)
    service = client.V1Service(
def parse_ports(port_list):
    '''
    Parse the port list into a list of container ports (needed to create the container)
    and to a set of port mappings to set up k8s services.
    Each entry in 'port_list' is a string of the form "container_port[/protocol]:host_port"
    (see the PORTS regular expression above); protocol defaults to TCP.
    Raises ValueError for a malformed entry.
    '''
    container_ports = []
    port_map = {}
    for p in port_list:
        m = PORTS.match(p.strip())
        if m:
            cport = int(m.group(1))
            hport = int(m.group(4))
            # group 3 is the protocol, when one was specified
            proto = m.group(3).upper() if m.group(3) else "TCP"
            container_ports.append((cport, proto))
            port_map[(cport, proto)] = hport
        else:
            raise ValueError("Bad port specification: {0}".format(p))
    return container_ports, port_map
258 def _parse_volumes(volume_list):
261 for v in volume_list:
262 vname = str(uuid.uuid4())
263 vhost = v['host']['path']
264 vcontainer = v['container']['bind']
265 vro = (v['container'].get('mode') == 'ro')
266 volumes.append(client.V1Volume(name=vname, host_path=client.V1HostPathVolumeSource(path=vhost)))
267 volume_mounts.append(client.V1VolumeMount(name=vname, mount_path=vcontainer, read_only=vro))
269 return volumes, volume_mounts
def _add_elk_logging_sidecar(containers, volumes, volume_mounts, component_name, log_info, filebeat):
    ''' Add a filebeat logging sidecar container plus the volumes/mounts it shares with the component. '''
    # Nothing to do unless both the component's log info and the filebeat config are present
    # NOTE(review): the early-return body of this guard is not visible in this excerpt — confirm.
    if not log_info or not filebeat:
    log_dir = log_info.get("log_directory")
    sidecar_volume_mounts = []
    # Create the volume for component log files and volume mounts for the component and sidecar containers
    volumes.append(client.V1Volume(name="component-log", empty_dir=client.V1EmptyDirVolumeSource()))
    volume_mounts.append(client.V1VolumeMount(name="component-log", mount_path=log_dir))
    # Sidecar reads the component's logs at an alternate path if one is configured
    sc_path = log_info.get("alternate_fb_path") or "{0}/{1}".format(filebeat["log_path"], component_name)
    sidecar_volume_mounts.append(client.V1VolumeMount(name="component-log", mount_path=sc_path))
    # Create the volume for sidecar data and the volume mount for it
    volumes.append(client.V1Volume(name="filebeat-data", empty_dir=client.V1EmptyDirVolumeSource()))
    sidecar_volume_mounts.append(client.V1VolumeMount(name="filebeat-data", mount_path=filebeat["data_path"]))
    # Create the volume for the sidecar configuration data and the volume mount for it
    # The configuration data is in a k8s ConfigMap that should be created when DCAE is installed.
    # NOTE(review): the line below is the continuation of a volumes.append( call whose
    # opening is not visible in this excerpt — confirm.
    client.V1Volume(name="filebeat-conf", config_map=client.V1ConfigMapVolumeSource(name=filebeat["config_map"])))
    sidecar_volume_mounts.append(
        client.V1VolumeMount(name="filebeat-conf", mount_path=filebeat["config_path"], sub_path=filebeat["config_subpath"]))
    # Finally create the container for the sidecar
    containers.append(_create_container_object("filebeat", filebeat["image"], False, volume_mounts=sidecar_volume_mounts))
def _add_tls_init_container(init_containers, volumes, volume_mounts, tls_info, tls_config):
    # Adds an InitContainer to the pod to set up TLS certificate information. For components that act as a
    # server(tls_info["use_tls"] is True), the InitContainer will populate a directory with server and CA certificate
    # materials in various formats. For other components (tls_info["use_tls"] is False, or tls_info is not specified),
    # the InitContainer will populate a directory with CA certificate materials in PEM and JKS formats.
    # In either case, the certificate directory is mounted onto the component container filesystem at the location
    # specified by tls_info["component_cert_dir"], if present, otherwise at the configured default mount point
    # (tls_config["component_cert_dir"]).
    cert_directory = tls_info.get("cert_directory") or tls_config.get("component_cert_dir")
    # NOTE(review): 'env' is expected to be initialized (e.g. env = {}) just above this
    # line; the initialization is not visible in this excerpt — confirm.
    env["TLS_SERVER"] = "true" if tls_info.get("use_tls") else "false"
    # Create the certificate volume and volume mounts
    volumes.append(client.V1Volume(name="tls-info", empty_dir=client.V1EmptyDirVolumeSource()))
    volume_mounts.append(client.V1VolumeMount(name="tls-info", mount_path=cert_directory))
    init_volume_mounts = [client.V1VolumeMount(name="tls-info", mount_path=tls_config["cert_path"])]
    # Create the init container
    init_containers.append(_create_container_object("init-tls", tls_config["image"], False, volume_mounts=init_volume_mounts, env=env))
def _process_port_map(port_map):
    ''' Split the (container_port, protocol) -> host_port map into k8s ServicePort lists. '''
    service_ports = [] # Ports exposed internally on the k8s network
    exposed_ports = [] # Ports to be mapped to ports on the k8s nodes via NodePort
    for (cport, proto), hport in port_map.items():
        # Service port names must be unique; derived from protocol letter and container port
        name = "xport-{0}-{1}".format(proto[0].lower(), cport)
        # NOTE(review): a conditional (likely on whether hport requests node exposure)
        # appears to choose between the two appends below; the if/else lines are not
        # visible in this excerpt — confirm against the full source.
        service_ports.append(client.V1ServicePort(port=cport, protocol=proto, name=name[1:]))
        exposed_ports.append(client.V1ServicePort(port=cport, protocol=proto, node_port=hport, name=name))
    return service_ports, exposed_ports
def _service_exists(location, namespace, component_name):
    ''' Check whether the component's ClusterIP Service exists in 'namespace' at 'location'. '''
    _configure_api(location)
    # EAFP: reading the service raises ApiException when it does not exist
    client.CoreV1Api().read_namespaced_service(_create_service_name(component_name), namespace)
    # NOTE(review): the enclosing try: and the success/failure result handling are not
    # visible in this excerpt — confirm against the full source.
    except client.rest.ApiException:
def _patch_deployment(location, namespace, deployment, modify):
    '''
    Gets the current spec for 'deployment' in 'namespace'
    in the k8s cluster at 'location',
    uses the 'modify' function to change the spec,
    then sends the updated spec to k8s.
    '''
    _configure_api(location)
    # Get deployment spec
    spec = client.ExtensionsV1beta1Api().read_namespaced_deployment(deployment, namespace)
    # Apply changes to spec
    # NOTE(review): the invocation of the 'modify' callback on spec is not visible in
    # this excerpt — confirm against the full source.
    # Patch the deploy with updated spec
    client.ExtensionsV1beta1Api().patch_namespaced_deployment(deployment, namespace, spec)
def _execute_command_in_pod(location, namespace, pod_name, command):
    '''
    Execute the command (specified by an argv-style list in the "command" parameter) in
    the specified pod in the specified namespace at the specified location.
    For now at least, we use this only to
    run a notification script in a pod after a configuration change.

    The straightforward way to do this is with the V1 Core API function "connect_get_namespaced_pod_exec".
    Unfortunately due to a bug/limitation in the Python client library, we can't call it directly.
    We have to make the API call through a Websocket connection using the kubernetes.stream wrapper API.
    I'm following the example code at https://github.com/kubernetes-client/python/blob/master/examples/exec.py.
    There are several issues tracking this, in various states. It isn't clear that there will ever
    - https://github.com/kubernetes-client/python/issues/58
    - https://github.com/kubernetes-client/python/issues/409
    - https://github.com/kubernetes-client/python/issues/526

    The main consequence of the workaround using "stream" is that the caller does not get an indication
    of the exit code returned by the command when it completes execution. It turns out that the
    original implementation of notification in the Docker plugin did not use this result, so we can
    still match the original notification functionality.

    The "stream" approach returns a string containing any output sent by the command to stdout or stderr.
    We'll return that so it can be logged.
    '''
    _configure_api(location)
    # NOTE(review): the enclosing try: and the remaining arguments to stream.stream()
    # (pod name, namespace, command, stdout/stderr flags, ...) are not visible in this
    # excerpt — confirm against the full source.
    output = stream.stream(client.CoreV1Api().connect_get_namespaced_pod_exec,
    except client.rest.ApiException as e:
        # If the exception indicates the pod wasn't found, it's not a fatal error.
        # It existed when we enumerated the pods for the deployment but no longer exists.
        # Unfortunately, the only way to distinguish a pod not found from any other error
        # is by looking at the reason text.
        # (The ApiException's "status" field should contain the HTTP status code, which would
        # be 404 if the pod isn't found, but empirical testing reveals that "status" is set
        if "404 not found" in e.reason.lower():
            output = "Pod not found"
    return {"pod" : pod_name, "output" : output}
def deploy(namespace, component_name, image, replicas, always_pull, k8sconfig, **kwargs):
    '''
    This will create a k8s Deployment and, if needed, one or two k8s Services.
    (We are being opinionated in our use of k8s... this code decides what k8s abstractions and features to use.
    We're not exposing k8s to the component developer and the blueprint author.
    This is a conscious choice. We want to use k8s in a controlled, consistent way, and we want to hide
    the details from the component developer and the blueprint author.)

    namespace: the Kubernetes namespace into which the component is deployed
    component_name: the component name, used to derive names of Kubernetes entities
    image: the docker image for the component being deployed
    replica: the number of instances of the component to be deployed
    always_pull: boolean flag, indicating that Kubernetes should always pull a new copy of
       the Docker image for the component, even if it is already present on the Kubernetes node.
    - image_pull_secrets: a list of names of image pull secrets that can be used for retrieving images.
      (DON'T PANIC: these are just the names of secrets held in the Kubernetes secret store.)
    - filebeat: a dictionary of filebeat sidecar parameters:
      "log_path" : mount point for log volume in filebeat container
      "data_path" : mount point for data volume in filebeat container
      "config_path" : mount point for config volume in filebeat container
      "config_subpath" : subpath for config data in filebeat container
      "config_map" : ConfigMap holding the filebeat configuration
      "image": Docker image to use for filebeat
    - tls: a dictionary of TLS-related information:
      "cert_path": mount point for certificate volume in init container
      "image": Docker image to use for TLS init container
      "component_cert_dir" : default mount point for certs
    - volumes: array of volume objects, where a volume object is:
      {"host":{"path": "/path/on/host"}, "container":{"bind":"/path/on/container","mode":"rw_or_ro"}
    - ports: array of strings in the form "container_port:host_port"
    - env: map of name-value pairs ( {name0: value0, name1: value1...}
    - log_info: an object with info for setting up ELK logging, with the form:
      {"log_directory": "/path/to/container/log/directory", "alternate_fb_path" : "/alternate/sidecar/log/path"}
    - tls_info: an object with info for setting up TLS (HTTPS), with the form:
      {"use_tls": true, "cert_directory": "/path/to/container/cert/directory" }
    - labels: dict with label-name/label-value pairs, e.g. {"cfydeployment" : "lsdfkladflksdfsjkl", "cfynode":"mycomponent"}
      These label will be set on all the pods deployed as a result of this deploy() invocation.
    - resources: dict with optional "limits" and "requests" resource requirements, each a dict containing:
      - cpu: number CPU usage, like 0.5
      - memory: string memory requirement, like "2Gi"
    - readiness: dict with health check info; if present, used to create a readiness probe for the main container. Includes:
      - type: check is done by making http(s) request to an endpoint ("http", "https") or by exec'ing a script in the container ("script", "docker")
      - interval: period (in seconds) between probes
      - timeout: time (in seconds) to allow a probe to complete
      - endpoint: the path portion of the URL that points to the readiness endpoint for "http" and "https" types
      - path: the full path to the script to be executed in the container for "script" and "docker" types
    - liveness: dict with health check info; if present, used to create a liveness probe for the main container. Includes:
      - type: check is done by making http(s) request to an endpoint ("http", "https") or by exec'ing a script in the container ("script", "docker")
      - interval: period (in seconds) between probes
      - timeout: time (in seconds) to allow a probe to complete
      - endpoint: the path portion of the URL that points to the liveness endpoint for "http" and "https" types
      - path: the full path to the script to be executed in the container for "script" and "docker" types
    - k8s_location: name of the Kubernetes location (cluster) where the component is to be deployed
    '''
    # Flags used for cleanup if deployment fails partway through
    deployment_ok = False
    cip_service_created = False
    # Record of what was created; returned to the caller and used later by undeploy()
    deployment_description = {
    "namespace": namespace,
    "location" : kwargs.get("k8s_location"),
    _configure_api(kwargs.get("k8s_location"))
    core = client.CoreV1Api()
    ext = client.ExtensionsV1beta1Api()
    # Parse the port mapping
    container_ports, port_map = parse_ports(kwargs.get("ports", []))
    # Parse the volumes list into volumes and volume_mounts for the deployment
    volumes, volume_mounts = _parse_volumes(kwargs.get("volumes", []))
    # Initialize the list of containers that will be part of the pod
    # NOTE(review): the initializations of 'containers' and 'init_containers' are not
    # visible in this excerpt — confirm against the full source.
    # Set up the ELK logging sidecar container, if needed
    _add_elk_logging_sidecar(containers, volumes, volume_mounts, component_name, kwargs.get("log_info"), k8sconfig.get("filebeat"))
    # Set up TLS information
    _add_tls_init_container(init_containers, volumes, volume_mounts, kwargs.get("tls_info") or {}, k8sconfig.get("tls"))
    # Create the container for the component
    # Make it the first container in the pod
    container_args = {key: kwargs.get(key) for key in ("env", "readiness", "liveness", "resources")}
    container_args['container_ports'] = container_ports
    container_args['volume_mounts'] = volume_mounts
    containers.insert(0, _create_container_object(component_name, image, always_pull, **container_args))
    # Build the k8s Deployment object
    labels = kwargs.get("labels", {})
    labels["app"] = component_name
    dep = _create_deployment_object(component_name, containers, init_containers, replicas, volumes, labels, pull_secrets=k8sconfig["image_pull_secrets"])
    # NOTE(review): the try: that pairs with the except below is not visible in this excerpt — confirm.
    ext.create_namespaced_deployment(namespace, dep)
    deployment_description["deployment"] = _create_deployment_name(component_name)
    # Create service(s), if a port mapping is specified
    service_ports, exposed_ports = _process_port_map(port_map)
    # Create a ClusterIP service for access via the k8s network
    service = _create_service_object(_create_service_name(component_name), component_name, service_ports, None, labels, "ClusterIP")
    core.create_namespaced_service(namespace, service)
    cip_service_created = True
    deployment_description["services"].append(_create_service_name(component_name))
    # If there are ports to be exposed on the k8s nodes, create a "NodePort" service
        _create_service_object(_create_exposed_service_name(component_name), component_name, exposed_ports, '', labels, "NodePort")
    core.create_namespaced_service(namespace, exposed_service)
    deployment_description["services"].append(_create_exposed_service_name(component_name))
    except Exception as e:
        # Clean up any partially-created k8s entities before propagating the failure
        # If the ClusterIP service was created, delete the service:
        if cip_service_created:
            core.delete_namespaced_service(_create_service_name(component_name), namespace)
        # If the deployment was created but not the service, delete the deployment
        client.ExtensionsV1beta1Api().delete_namespaced_deployment(_create_deployment_name(component_name), namespace, body=client.V1DeleteOptions())
    return dep, deployment_description
def undeploy(deployment_description):
    ''' Remove a deployed component: its k8s Services first, then the Deployment itself. '''
    target_namespace = deployment_description["namespace"]
    _configure_api(deployment_description["location"])

    # Delete every service that deploy() recorded for this component
    core_api = client.CoreV1Api()
    for service_name in deployment_description["services"]:
        core_api.delete_namespaced_service(service_name, target_namespace)

    # "Foreground" propagation tells k8s to delete the deployment's pods and replicaset too
    delete_options = client.V1DeleteOptions(propagation_policy="Foreground")
    client.ExtensionsV1beta1Api().delete_namespaced_deployment(
        deployment_description["deployment"], target_namespace, body=delete_options)
def is_available(location, namespace, component_name):
    '''
    Report whether a component's deployment has all requested replicas available and up to date.
    Can be used to verify completion of an initial deployment, a scale operation, or an update operation.
    '''
    _configure_api(location)
    dep_status = client.AppsV1beta1Api().read_namespaced_deployment_status(
        _create_deployment_name(component_name), namespace)
    wanted = dep_status.spec.replicas
    # Both the available count and the updated count must match the requested replica count
    return dep_status.status.available_replicas == wanted and dep_status.status.updated_replicas == wanted
def scale(deployment_description, replicas):
    ''' Trigger a scaling operation by updating the replica count for the Deployment '''

    # Callback handed to _patch_deployment: rewrite the replica count in the fetched spec
    def _set_replicas(spec):
        spec.spec.replicas = replicas

    _patch_deployment(deployment_description["location"], deployment_description["namespace"],
                      deployment_description["deployment"], _set_replicas)
def upgrade(deployment_description, image, container_index = 0):
    ''' Trigger a rolling upgrade by sending a new image name/tag to k8s '''

    # Callback handed to _patch_deployment: point the selected container at the new image
    def _set_image(spec):
        spec.spec.template.spec.containers[container_index].image = image

    _patch_deployment(deployment_description["location"], deployment_description["namespace"],
                      deployment_description["deployment"], _set_image)
def rollback(deployment_description, rollback_to=0):
    '''
    Undo upgrade by rolling back to a previous revision of the deployment.
    By default, go back one revision.
    rollback_to can be used to supply a specific revision number.
    Returns the image for the app container and the replica count from the rolled-back deployment

    Currently this does not work due to a bug in the create_namespaced_deployment_rollback() method.
    The k8s python client code throws an exception while processing the response from the API.
    - https://github.com/kubernetes-client/python/issues/491
    - https://github.com/kubernetes/kubernetes/pull/63837
    The fix has been merged into the master branch but is not in the latest release.
    '''
    _configure_api(deployment_description["location"])
    deployment = deployment_description["deployment"]
    namespace = deployment_description["namespace"]
    # Initiate the rollback
    # NOTE(review): the rollback call's leading arguments (deployment name, namespace)
    # are not visible in this excerpt — confirm against the full source.
    client.ExtensionsV1beta1Api().create_namespaced_deployment_rollback(
        client.AppsV1beta1DeploymentRollback(name=deployment, rollback_to=client.AppsV1beta1RollbackConfig(revision=rollback_to)))
    # Read back the spec for the rolled-back deployment
    spec = client.ExtensionsV1beta1Api().read_namespaced_deployment(deployment, namespace)
    return spec.spec.template.spec.containers[0].image, spec.spec.replicas
def execute_command_in_deployment(deployment_description, command):
    '''
    Enumerates the pods in the k8s deployment identified by "deployment_description",
    then executes the command (represented as an argv-style list) in "command" in
    container 0 (the main application container) each of those pods.

    Note that the sets of pods associated with a deployment can change over time. The
    enumeration is a snapshot at one point in time. The command will not be executed in
    pods that are created after the initial enumeration. If a pod disappears after the
    initial enumeration and before the command is executed, the attempt to execute the
    command will fail. This is not treated as a fatal error.

    This approach is reasonable for the one current use case for "execute_command": running a
    script to notify a container that its configuration has changed as a result of a
    policy change. In this use case, the new configuration information is stored into
    the configuration store (Consul), the pods are enumerated, and the command is executed.
    If a pod disappears after the enumeration, the fact that the command cannot be run
    doesn't matter--a nonexistent pod doesn't need to be reconfigured. Similarly, a pod that
    comes up after the enumeration will get its initial configuration from the updated version

    The optimal solution here would be for k8s to provide an API call to execute a command in
    all of the pods for a deployment. Unfortunately, k8s does not provide such a call--the
    only call provided by k8s operates at the pod level, not the deployment level.

    Another interesting k8s factoid: there's no direct way to list the pods belong to a
    particular k8s deployment. The deployment code above sets a label ("k8sdeployment") on
    the pod that has the k8s deployment name. To list the pods, the code below queries for
    pods with the label carrying the deployment name.
    '''
    location = deployment_description["location"]
    _configure_api(location)
    deployment = deployment_description["deployment"]
    namespace = deployment_description["namespace"]
    # Get names of all the running pods belonging to the deployment
    # NOTE(review): the closing of this call/list comprehension (").items]" or similar)
    # is not visible in this excerpt — confirm against the full source.
    pod_names = [pod.metadata.name for pod in client.CoreV1Api().list_namespaced_pod(
        namespace = namespace,
        label_selector = "k8sdeployment={0}".format(deployment),
        field_selector = "status.phase=Running"
    # Execute command in the running pods
    return [_execute_command_in_pod(location, namespace, pod_name, command)
            for pod_name in pod_names]