1 # ============LICENSE_START=======================================================
3 # ================================================================================
4 # Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved.
5 # Copyright (c) 2020 Pantheon.tech. All rights reserved.
6 # ================================================================================
7 # Licensed under the Apache License, Version 2.0 (the "License");
8 # you may not use this file except in compliance with the License.
9 # You may obtain a copy of the License at
11 # http://www.apache.org/licenses/LICENSE-2.0
13 # Unless required by applicable law or agreed to in writing, software
14 # distributed under the License is distributed on an "AS IS" BASIS,
15 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 # ============LICENSE_END=========================================================
20 # ECOMP is a trademark and service mark of AT&T Intellectual Property.
25 from kubernetes import config, client, stream
# Default values for readiness probe
PROBE_DEFAULT_PERIOD = 15   # seconds between probe attempts
PROBE_DEFAULT_TIMEOUT = 1   # seconds to wait for a probe to complete

# Location of k8s cluster config file ("kubeconfig")
K8S_CONFIG_PATH="/opt/onap/kube/kubeconfig"

# Regular expression for interval/timeout specification
# Matches a decimal integer with an optional unit suffix: "s", "m", or "h"
INTERVAL_SPEC = re.compile("^([0-9]+)(s|m|h)?$")
# Conversion factors to seconds
FACTORS = {None: 1, "s": 1, "m": 60, "h": 3600}

# Regular expression for port mapping
# group 1: container port
# group 2: / + protocol
# group 3: protocol (tcp or udp, either case)
# group 4: host port
PORTS = re.compile("^([0-9]+)(/(udp|UDP|tcp|TCP))?:([0-9]+)$")
46 def _create_deployment_name(component_name):
47 return "dep-{0}".format(component_name)[:63]
49 def _create_service_name(component_name):
50 return "{0}".format(component_name)[:63]
52 def _create_exposed_service_name(component_name):
53 return ("x{0}".format(component_name))[:63]
def _configure_api(location=None):
    # Set up the kubernetes client configuration for the cluster ("location")
    # that subsequent client.* API calls will target.
    # Look for a kubernetes config file
    if os.path.exists(K8S_CONFIG_PATH):
        # "location" selects a context within the kubeconfig; persist_config=False
        # keeps the client library from writing changes back to the file
        config.load_kube_config(config_file=K8S_CONFIG_PATH, context=location, persist_config=False)
    # Maybe we're running in a k8s container and we can use info provided by k8s
    # We would like to use:
    # config.load_incluster_config()
    # but this looks into os.environ for kubernetes host and port, and from
    # the plugin those aren't visible. So we use the InClusterConfigLoader class,
    # where we can set the environment to what we like.
    # This is probably brittle! Maybe there's a better alternative.
    # NOTE(review): this chunk appears truncated -- the "else" branch and the
    # construct wrapping the host/port entries below (presumably an environ-style
    # dict handed to InClusterConfigLoader) are missing; confirm against the full file.
        config.incluster_config.SERVICE_HOST_ENV_NAME : "kubernetes.default.svc.cluster.local",
        config.incluster_config.SERVICE_PORT_ENV_NAME : "443"
    config.incluster_config.InClusterConfigLoader(
        token_filename=config.incluster_config.SERVICE_TOKEN_FILENAME,
        cert_filename=config.incluster_config.SERVICE_CERT_FILENAME,
def _parse_interval(t):
    '''
    Parse an interval specification, returning a number of seconds.
    Accepts:
    - a simple integer quantity, interpreted as seconds
    - a string representation of a decimal integer, interpreted as seconds
    - a string consisting of a representation of a decimal integer followed by a unit,
      with "s" representing seconds, "m" representing minutes,
      and "h" representing hours
    Used for compatibility with the Docker plugin, where time intervals
    for health checks were specified as strings with a number and a unit.
    See 'INTERVAL_SPEC' above for the regular expression that's accepted.
    Raises ValueError if the specification doesn't match the expected form.
    '''
    m = INTERVAL_SPEC.match(str(t))
    # Fix: only raise for input that does NOT match; as previously written the
    # ValueError was raised unconditionally and the computed value was never returned.
    if not m:
        raise ValueError("Bad interval specification: {0}".format(t))
    # group 1 is the quantity, group 2 the optional unit (None means seconds)
    return int(m.group(1)) * FACTORS[m.group(2)]
def _create_probe(hc, port):
    ''' Create a Kubernetes probe based on info in the health check dictionary hc '''
    # hc["type"] selects the kind of check:
    #   "http"/"https"    -> HTTP GET against hc["endpoint"]
    #   "script"/"docker" -> exec hc["script"] inside the container
    probe_type = hc['type']
    # interval/timeout accept Docker-style strings ("30s", "2m") or plain integers
    period = _parse_interval(hc.get('interval', PROBE_DEFAULT_PERIOD))
    timeout = _parse_interval(hc.get('timeout', PROBE_DEFAULT_TIMEOUT))
    if probe_type in ['http', 'https']:
        probe = client.V1Probe(
          failure_threshold = 1,
          initial_delay_seconds = 5,
          period_seconds = period,
          timeout_seconds = timeout,
          http_get = client.V1HTTPGetAction(
             path = hc['endpoint'],
             # NOTE(review): this chunk appears truncated -- the port argument for
             # the HTTP GET action (presumably using the "port" parameter) and the
             # closing parentheses of this V1Probe construction are missing.
             scheme = probe_type.upper()
    elif probe_type in ['script', 'docker']:
        probe = client.V1Probe(
          failure_threshold = 1,
          initial_delay_seconds = 5,
          period_seconds = period,
          timeout_seconds = timeout,
          # "_exec" is the python client's field name for the "exec" probe action
          _exec = client.V1ExecAction(
             # split the script string into an argv-style list on whitespace
             command = hc['script'].split( )
             # NOTE(review): closing parentheses and the final return of the probe
             # are not visible here -- presumably truncated.
127 def _create_resources(resources=None):
128 if resources is not None:
129 resources_obj = client.V1ResourceRequirements(
130 limits = resources.get("limits"),
131 requests = resources.get("requests")
def _create_container_object(name, image, always_pull, **kwargs):
    '''
    Build a k8s V1Container for a pod.
    name/image: container name and Docker image.
    always_pull: True -> image_pull_policy "Always", else "IfNotPresent".
    Recognized kwargs: env (dict of name->value), readiness and liveness
    (health-check dicts passed to _create_probe), resources (limits/requests
    dict), container_ports (list of (port, protocol) pairs), volume_mounts
    (list of V1VolumeMount).
    '''
    # Set up environment variables
    # Copy any passed in environment variables
    env = kwargs.get('env') or {}
    env_vars = [client.V1EnvVar(name=k, value=env[k]) for k in env]
    # Add POD_IP with the IP address of the pod running the container
    pod_ip = client.V1EnvVarSource(field_ref = client.V1ObjectFieldSelector(field_path="status.podIP"))
    env_vars.append(client.V1EnvVar(name="POD_IP",value_from=pod_ip))
    # If a health check is specified, create a readiness/liveness probe
    # (For an HTTP-based check, we assume it's at the first container port)
    readiness = kwargs.get('readiness')
    liveness = kwargs.get('liveness')
    resources = kwargs.get('resources')
    container_ports = kwargs.get('container_ports') or []
    # each container_ports entry is (port, protocol); element 0 is the port number
    hc_port = container_ports[0][0] if container_ports else None
    probe = _create_probe(readiness, hc_port) if readiness else None
    live_probe = _create_probe(liveness, hc_port) if liveness else None
    resources_obj = _create_resources(resources) if resources else None
    port_objs = [client.V1ContainerPort(container_port=port, protocol=proto)
                 for port, proto in container_ports]
    # Define container for pod
    return client.V1Container(
        # NOTE(review): this chunk appears truncated -- arguments that presumably
        # set name, image, env (env_vars) and ports (port_objs) are missing here.
        image_pull_policy='Always' if always_pull else 'IfNotPresent',
        volume_mounts=kwargs.get('volume_mounts') or [],
        resources=resources_obj,
        readiness_probe=probe,
        liveness_probe=live_probe
def _create_deployment_object(component_name,
    # NOTE(review): this chunk appears truncated -- the rest of the parameter list
    # (containers, init_containers, replicas, volumes, labels, pull_secrets, judging
    # by the call site in deploy() below) is missing, as are the initialization of
    # "ips", several argument lists, and the final return.
    deployment_name = _create_deployment_name(component_name)
    # Label the pod with the deployment name, so we can find it easily
    labels.update({"k8sdeployment" : deployment_name})
    # pull_secrets is a list of the names of the k8s secrets containing docker registry credentials
    # See https://kubernetes.io/docs/concepts/containers/images/#specifying-imagepullsecrets-on-a-pod
    for secret in pull_secrets:
        ips.append(client.V1LocalObjectReference(name=secret))
    # Define pod template
    template = client.V1PodTemplateSpec(
        metadata=client.V1ObjectMeta(labels=labels),
        spec=client.V1PodSpec(hostname=component_name,
                              containers=containers,
                              init_containers=init_containers,
                              image_pull_secrets=ips)
    # Define deployment spec
    spec = client.ExtensionsV1beta1DeploymentSpec(
    # Create deployment object
    deployment = client.ExtensionsV1beta1Deployment(
        metadata=client.V1ObjectMeta(name=deployment_name),
def _create_service_object(service_name, component_name, service_ports, annotations, labels, service_type):
    # Build a k8s V1Service that selects the component's pods via the "app" label.
    # service_type is the k8s service type string (e.g. "ClusterIP" or "NodePort",
    # as used by the callers in deploy()).
    service_spec = client.V1ServiceSpec(
        # route traffic to pods labeled with this component's name
        selector={"app" : component_name},
    # NOTE(review): this chunk appears truncated -- the remaining V1ServiceSpec
    # arguments and the conditional that chooses between the two metadata
    # constructions below (with annotations vs. without) are missing, as is the
    # body/return of the V1Service construction.
        metadata = client.V1ObjectMeta(name=_create_service_name(service_name), labels=labels, annotations=annotations)
        metadata = client.V1ObjectMeta(name=_create_service_name(service_name), labels=labels)
    service = client.V1Service(
def parse_ports(port_list):
    Parse the port list into a list of container ports (needed to create the container)
    and to a set of port mappings to set up k8s services.
    # NOTE(review): this chunk appears truncated -- the docstring delimiters, the
    # initialization of container_ports/port_map, the "for p in port_list:" loop
    # header, and the "if m:" / protocol-defaulting logic are not visible here.
        # Match "container_port[/protocol]:host_port" (see the PORTS regex above)
        m = PORTS.match(p.strip())
            cport = int(m.group(1))
            hport = int (m.group(4))
            # group 3 is the protocol; upper-cased for k8s ("TCP"/"UDP")
            proto = (m.group(3)).upper()
            container_ports.append((cport, proto))
            port_map[(cport, proto)] = hport
        # a specification that doesn't match the expected form is an error
        raise ValueError("Bad port specification: {0}".format(p))
    return container_ports, port_map
259 def _parse_volumes(volume_list):
262 for v in volume_list:
263 vname = str(uuid.uuid4())
264 vhost = v['host']['path']
265 vcontainer = v['container']['bind']
266 vro = (v['container'].get('mode') == 'ro')
267 volumes.append(client.V1Volume(name=vname, host_path=client.V1HostPathVolumeSource(path=vhost)))
268 volume_mounts.append(client.V1VolumeMount(name=vname, mount_path=vcontainer, read_only=vro))
270 return volumes, volume_mounts
def _add_elk_logging_sidecar(containers, volumes, volume_mounts, component_name, log_info, filebeat):
    # Append a filebeat sidecar container (and its supporting volumes/mounts)
    # so that the component's log files are shipped to ELK.  Mutates the
    # containers/volumes/volume_mounts lists in place.
    if not log_info or not filebeat:
    # NOTE(review): this chunk appears truncated -- the early return for the guard
    # above is not visible here.
    log_dir = log_info.get("log_directory")
    sidecar_volume_mounts = []

    # Create the volume for component log files and volume mounts for the component and sidecar containers
    volumes.append(client.V1Volume(name="component-log", empty_dir=client.V1EmptyDirVolumeSource()))
    volume_mounts.append(client.V1VolumeMount(name="component-log", mount_path=log_dir))
    sc_path = log_info.get("alternate_fb_path") or "{0}/{1}".format(filebeat["log_path"], component_name)
    sidecar_volume_mounts.append(client.V1VolumeMount(name="component-log", mount_path=sc_path))

    # Create the volume for sidecar data and the volume mount for it
    volumes.append(client.V1Volume(name="filebeat-data", empty_dir=client.V1EmptyDirVolumeSource()))
    sidecar_volume_mounts.append(client.V1VolumeMount(name="filebeat-data", mount_path=filebeat["data_path"]))

    # Create the volume for the sidecar configuration data and the volume mount for it
    # The configuration data is in a k8s ConfigMap that should be created when DCAE is installed.
    # NOTE(review): the "volumes.append(" prefix for the next line appears to have
    # been truncated out of this chunk.
        client.V1Volume(name="filebeat-conf", config_map=client.V1ConfigMapVolumeSource(name=filebeat["config_map"])))
    sidecar_volume_mounts.append(
        client.V1VolumeMount(name="filebeat-conf", mount_path=filebeat["config_path"], sub_path=filebeat["config_subpath"]))

    # Finally create the container for the sidecar
    containers.append(_create_container_object("filebeat", filebeat["image"], False, volume_mounts=sidecar_volume_mounts))
def _add_tls_init_container(init_containers, volumes, volume_mounts, tls_info, tls_config):
    # Adds an InitContainer to the pod to set up TLS certificate information. For components that act as a
    # server(tls_info["use_tls"] is True), the InitContainer will populate a directory with server and CA certificate
    # materials in various formats. For other components (tls_info["use_tls"] is False, or tls_info is not specified),
    # the InitContainer will populate a directory with CA certificate materials in PEM and JKS formats.
    # In either case, the certificate directory is mounted onto the component container filesystem at the location
    # specified by tls_info["component_cert_dir"], if present, otherwise at the configured default mount point
    # (tls_config["component_cert_dir"]).
    cert_directory = tls_info.get("cert_directory") or tls_config.get("component_cert_dir")
    # Fix: initialize env before assigning into it -- as previously written the
    # subscript assignment below raised NameError because env was never created.
    env = {}
    env["TLS_SERVER"] = "true" if tls_info.get("use_tls") else "false"

    # Create the certificate volume and volume mounts
    volumes.append(client.V1Volume(name="tls-info", empty_dir=client.V1EmptyDirVolumeSource()))
    volume_mounts.append(client.V1VolumeMount(name="tls-info", mount_path=cert_directory))
    init_volume_mounts = [client.V1VolumeMount(name="tls-info", mount_path=tls_config["cert_path"])]

    # Create the init container
    init_containers.append(_create_container_object("init-tls", tls_config["image"], False, volume_mounts=init_volume_mounts, env=env))
def _process_port_map(port_map):
    # Convert the {(container_port, protocol): host_port} map produced by
    # parse_ports() into two lists of k8s V1ServicePort objects: one for the
    # internal ClusterIP service and one for the NodePort service.
    service_ports = [] # Ports exposed internally on the k8s network
    exposed_ports = [] # Ports to be mapped to ports on the k8s nodes via NodePort
    for (cport, proto), hport in port_map.items():
        # name is "xport-<t|u>-<port>"; the internal entry strips the leading "x" (name[1:])
        name = "xport-{0}-{1}".format(proto[0].lower(), cport)
        # NOTE(review): this chunk appears truncated -- the conditional that routes
        # a mapping to service_ports vs. exposed_ports (presumably based on the
        # host port value) is not visible here.
            service_ports.append(client.V1ServicePort(port=cport, protocol=proto, name=name[1:]))
            exposed_ports.append(client.V1ServicePort(port=cport, protocol=proto, node_port=hport, name=name))
    return service_ports, exposed_ports
def _service_exists(location, namespace, component_name):
    # Determine whether the k8s Service for a component exists in the namespace,
    # by attempting to read it and treating an ApiException as "does not exist".
    # NOTE(review): this chunk appears truncated -- the "try:" that pairs with the
    # "except" below and the result-flag handling / return statement are missing.
        _configure_api(location)
        client.CoreV1Api().read_namespaced_service(_create_service_name(component_name), namespace)
    except client.rest.ApiException:
def _patch_deployment(location, namespace, deployment, modify):
    '''
    Gets the current spec for 'deployment' in 'namespace'
    in the k8s cluster at 'location',
    uses the 'modify' function to change the spec,
    then sends the updated spec to k8s.
    '''
    _configure_api(location)

    # Get deployment spec
    spec = client.ExtensionsV1beta1Api().read_namespaced_deployment(deployment, namespace)

    # Apply changes to spec
    # Fix: actually invoke the caller-supplied callback -- as previously written
    # nothing modified the spec, so the patch below was a no-op.
    modify(spec)

    # Patch the deploy with updated spec
    client.ExtensionsV1beta1Api().patch_namespaced_deployment(deployment, namespace, spec)
def _execute_command_in_pod(location, namespace, pod_name, command):
    Execute the command (specified by an argv-style list in the "command" parameter) in
    the specified pod in the specified namespace at the specified location.
    For now at least, we use this only to
    run a notification script in a pod after a configuration change.
    The straightforward way to do this is with the V1 Core API function "connect_get_namespaced_pod_exec".
    Unfortunately due to a bug/limitation in the Python client library, we can't call it directly.
    We have to make the API call through a Websocket connection using the kubernetes.stream wrapper API.
    I'm following the example code at https://github.com/kubernetes-client/python/blob/master/examples/exec.py.
    There are several issues tracking this, in various states. It isn't clear that there will ever
    - https://github.com/kubernetes-client/python/issues/58
    - https://github.com/kubernetes-client/python/issues/409
    - https://github.com/kubernetes-client/python/issues/526
    The main consequence of the workaround using "stream" is that the caller does not get an indication
    of the exit code returned by the command when it completes execution. It turns out that the
    original implementation of notification in the Docker plugin did not use this result, so we can
    still match the original notification functionality.
    The "stream" approach returns a string containing any output sent by the command to stdout or stderr.
    We'll return that so it can logged.
    # NOTE(review): this chunk appears truncated -- the docstring delimiters, the
    # "try:" that pairs with the "except" below, the remaining stream.stream()
    # keyword arguments, and the re-raise for non-404 API errors are not visible.
    _configure_api(location)
        output = stream.stream(client.CoreV1Api().connect_get_namespaced_pod_exec,
    except client.rest.ApiException as e:
        # If the exception indicates the pod wasn't found, it's not a fatal error.
        # It existed when we enumerated the pods for the deployment but no longer exists.
        # Unfortunately, the only way to distinguish a pod not found from any other error
        # is by looking at the reason text.
        # (The ApiException's "status" field should contain the HTTP status code, which would
        # be 404 if the pod isn't found, but empirical testing reveals that "status" is set
        if "404 not found" in e.reason.lower():
            output = "Pod not found"

    return {"pod" : pod_name, "output" : output}
def deploy(namespace, component_name, image, replicas, always_pull, k8sconfig, **kwargs):
    # NOTE(review): this chunk appears truncated -- the docstring delimiters, the
    # "try:" that pairs with the "except" below, the initialization of the
    # containers/init_containers lists and of deployment_description["services"],
    # and the success/failure bookkeeping around deployment_ok are not visible.
    This will create a k8s Deployment and, if needed, one or two k8s Services.
    (We are being opinionated in our use of k8s... this code decides what k8s abstractions and features to use.
    We're not exposing k8s to the component developer and the blueprint author.
    This is a conscious choice. We want to use k8s in a controlled, consistent way, and we want to hide
    the details from the component developer and the blueprint author.)
    namespace: the Kubernetes namespace into which the component is deployed
    component_name: the component name, used to derive names of Kubernetes entities
    image: the docker image for the component being deployed
    replica: the number of instances of the component to be deployed
    always_pull: boolean flag, indicating that Kubernetes should always pull a new copy of
    the Docker image for the component, even if it is already present on the Kubernetes node.
    - image_pull_secrets: a list of names of image pull secrets that can be used for retrieving images.
    (DON'T PANIC: these are just the names of secrets held in the Kubernetes secret store.)
    - filebeat: a dictionary of filebeat sidecar parameters:
    "log_path" : mount point for log volume in filebeat container
    "data_path" : mount point for data volume in filebeat container
    "config_path" : mount point for config volume in filebeat container
    "config_subpath" : subpath for config data in filebeat container
    "config_map" : ConfigMap holding the filebeat configuration
    "image": Docker image to use for filebeat
    - tls: a dictionary of TLS-related information:
    "cert_path": mount point for certificate volume in init container
    "image": Docker image to use for TLS init container
    "component_cert_dir" : default mount point for certs
    - volumes: array of volume objects, where a volume object is:
    {"host":{"path": "/path/on/host"}, "container":{"bind":"/path/on/container","mode":"rw_or_ro"}
    - ports: array of strings in the form "container_port:host_port"
    - env: map of name-value pairs ( {name0: value0, name1: value1...}
    - msb_list: array of msb objects, where an msb object is as described in msb/msb.py.
    - log_info: an object with info for setting up ELK logging, with the form:
    {"log_directory": "/path/to/container/log/directory", "alternate_fb_path" : "/alternate/sidecar/log/path"}
    - tls_info: an object with info for setting up TLS (HTTPS), with the form:
    {"use_tls": true, "cert_directory": "/path/to/container/cert/directory" }
    - labels: dict with label-name/label-value pairs, e.g. {"cfydeployment" : "lsdfkladflksdfsjkl", "cfynode":"mycomponent"}
    These label will be set on all the pods deployed as a result of this deploy() invocation.
    - resources: dict with optional "limits" and "requests" resource requirements, each a dict containing:
    - cpu: number CPU usage, like 0.5
    - memory: string memory requirement, like "2Gi"
    - readiness: dict with health check info; if present, used to create a readiness probe for the main container. Includes:
    - type: check is done by making http(s) request to an endpoint ("http", "https") or by exec'ing a script in the container ("script", "docker")
    - interval: period (in seconds) between probes
    - timeout: time (in seconds) to allow a probe to complete
    - endpoint: the path portion of the URL that points to the readiness endpoint for "http" and "https" types
    - path: the full path to the script to be executed in the container for "script" and "docker" types
    - liveness: dict with health check info; if present, used to create a liveness probe for the main container. Includes:
    - type: check is done by making http(s) request to an endpoint ("http", "https") or by exec'ing a script in the container ("script", "docker")
    - interval: period (in seconds) between probes
    - timeout: time (in seconds) to allow a probe to complete
    - endpoint: the path portion of the URL that points to the liveness endpoint for "http" and "https" types
    - path: the full path to the script to be executed in the container for "script" and "docker" types
    - k8s_location: name of the Kubernetes location (cluster) where the component is to be deployed
    deployment_ok = False
    cip_service_created = False
    deployment_description = {
        "namespace": namespace,
        "location" : kwargs.get("k8s_location"),
    _configure_api(kwargs.get("k8s_location"))
    core = client.CoreV1Api()
    ext = client.ExtensionsV1beta1Api()

    # Parse the port mapping
    container_ports, port_map = parse_ports(kwargs.get("ports", []))

    # Parse the volumes list into volumes and volume_mounts for the deployment
    volumes, volume_mounts = _parse_volumes(kwargs.get("volumes", []))

    # Initialize the list of containers that will be part of the pod
    # Set up the ELK logging sidecar container, if needed
    _add_elk_logging_sidecar(containers, volumes, volume_mounts, component_name, kwargs.get("log_info"), k8sconfig.get("filebeat"))

    # Set up TLS information
    _add_tls_init_container(init_containers, volumes, volume_mounts, kwargs.get("tls_info") or {}, k8sconfig.get("tls"))

    # Create the container for the component
    # Make it the first container in the pod
    container_args = {key: kwargs.get(key) for key in ("env", "readiness", "liveness", "resources")}
    container_args['container_ports'] = container_ports
    container_args['volume_mounts'] = volume_mounts
    containers.insert(0, _create_container_object(component_name, image, always_pull, **container_args))

    # Build the k8s Deployment object
    labels = kwargs.get("labels", {})
    labels["app"] = component_name
    dep = _create_deployment_object(component_name, containers, init_containers, replicas, volumes, labels, pull_secrets=k8sconfig["image_pull_secrets"])
    ext.create_namespaced_deployment(namespace, dep)
    deployment_description["deployment"] = _create_deployment_name(component_name)

    # Create service(s), if a port mapping is specified
    service_ports, exposed_ports = _process_port_map(port_map)

    # If there are ports to be exposed via MSB, set up the annotation for the service
    msb_list = kwargs.get("msb_list")
    annotations = msb.create_msb_annotation(msb_list) if msb_list else ''

    # Create a ClusterIP service for access via the k8s network
    service = _create_service_object(_create_service_name(component_name), component_name, service_ports, annotations, labels, "ClusterIP")
    core.create_namespaced_service(namespace, service)
    cip_service_created = True
    deployment_description["services"].append(_create_service_name(component_name))

    # If there are ports to be exposed on the k8s nodes, create a "NodePort" service
    # NOTE(review): the "exposed_service =" assignment prefix for the next line
    # appears to have been truncated out of this chunk.
        _create_service_object(_create_exposed_service_name(component_name), component_name, exposed_ports, '', labels, "NodePort")
    core.create_namespaced_service(namespace, exposed_service)
    deployment_description["services"].append(_create_exposed_service_name(component_name))

    except Exception as e:
        # If the ClusterIP service was created, delete the service:
        if cip_service_created:
            core.delete_namespaced_service(_create_service_name(component_name), namespace)
        # If the deployment was created but not the service, delete the deployment
            client.ExtensionsV1beta1Api().delete_namespaced_deployment(_create_deployment_name(component_name), namespace, body=client.V1DeleteOptions())

    return dep, deployment_description
def undeploy(deployment_description):
    ''' Remove the k8s services and deployment that deploy() created for a component. '''
    _configure_api(deployment_description["location"])

    ns = deployment_description["namespace"]

    # Delete every service that deploy() recorded for this component
    for svc in deployment_description["services"]:
        client.CoreV1Api().delete_namespaced_service(svc, ns)

    # "Foreground" propagation makes k8s delete the underlying pods and replicaset
    # along with the deployment itself.
    delete_opts = client.V1DeleteOptions(propagation_policy="Foreground")
    client.ExtensionsV1beta1Api().delete_namespaced_deployment(deployment_description["deployment"], ns, body=delete_opts)
def is_available(location, namespace, component_name):
    '''
    Report whether a component's deployment is fully available: the number of
    available replicas equals the requested count and all replicas match the
    current spec.  Usable to verify completion of an initial deployment, a
    scale operation, or an update operation.
    '''
    _configure_api(location)
    status = client.AppsV1beta1Api().read_namespaced_deployment_status(_create_deployment_name(component_name), namespace)
    requested = status.spec.replicas
    return status.status.available_replicas == requested and status.status.updated_replicas == requested
def scale(deployment_description, replicas):
    ''' Trigger a scaling operation by updating the replica count for the Deployment '''

    # Callback applied to the deployment spec by _patch_deployment
    def set_replica_count(spec):
        spec.spec.replicas = replicas

    _patch_deployment(deployment_description["location"],
                      deployment_description["namespace"],
                      deployment_description["deployment"],
                      set_replica_count)
def upgrade(deployment_description, image, container_index = 0):
    ''' Trigger a rolling upgrade by sending a new image name/tag to k8s '''

    # Callback applied to the deployment spec by _patch_deployment
    def set_image(spec):
        spec.spec.template.spec.containers[container_index].image = image

    _patch_deployment(deployment_description["location"],
                      deployment_description["namespace"],
                      deployment_description["deployment"],
                      set_image)
def rollback(deployment_description, rollback_to=0):
    Undo upgrade by rolling back to a previous revision of the deployment.
    By default, go back one revision.
    rollback_to can be used to supply a specific revision number.
    Returns the image for the app container and the replica count from the rolled-back deployment
    Currently this does not work due to a bug in the create_namespaced_deployment_rollback() method.
    The k8s python client code throws an exception while processing the response from the API.
    - https://github.com/kubernetes-client/python/issues/491
    - https://github.com/kubernetes/kubernetes/pull/63837
    The fix has been merged into the master branch but is not in the latest release.
    # NOTE(review): this chunk appears truncated -- the docstring delimiters and the
    # first arguments to create_namespaced_deployment_rollback() (presumably the
    # deployment name and namespace) are not visible here.
    _configure_api(deployment_description["location"])
    deployment = deployment_description["deployment"]
    namespace = deployment_description["namespace"]

    # Initiate the rollback
    client.ExtensionsV1beta1Api().create_namespaced_deployment_rollback(
        client.AppsV1beta1DeploymentRollback(name=deployment, rollback_to=client.AppsV1beta1RollbackConfig(revision=rollback_to)))

    # Read back the spec for the rolled-back deployment
    spec = client.ExtensionsV1beta1Api().read_namespaced_deployment(deployment, namespace)
    return spec.spec.template.spec.containers[0].image, spec.spec.replicas
def execute_command_in_deployment(deployment_description, command):
    Enumerates the pods in the k8s deployment identified by "deployment_description",
    then executes the command (represented as an argv-style list) in "command" in
    container 0 (the main application container) each of those pods.
    Note that the sets of pods associated with a deployment can change over time. The
    enumeration is a snapshot at one point in time. The command will not be executed in
    pods that are created after the initial enumeration. If a pod disappears after the
    initial enumeration and before the command is executed, the attempt to execute the
    command will fail. This is not treated as a fatal error.
    This approach is reasonable for the one current use case for "execute_command": running a
    script to notify a container that its configuration has changed as a result of a
    policy change. In this use case, the new configuration information is stored into
    the configuration store (Consul), the pods are enumerated, and the command is executed.
    If a pod disappears after the enumeration, the fact that the command cannot be run
    doesn't matter--a nonexistent pod doesn't need to be reconfigured. Similarly, a pod that
    comes up after the enumeration will get its initial configuration from the updated version
    The optimal solution here would be for k8s to provide an API call to execute a command in
    all of the pods for a deployment. Unfortunately, k8s does not provide such a call--the
    only call provided by k8s operates at the pod level, not the deployment level.
    Another interesting k8s factoid: there's no direct way to list the pods belong to a
    particular k8s deployment. The deployment code above sets a label ("k8sdeployment") on
    the pod that has the k8s deployment name. To list the pods, the code below queries for
    pods with the label carrying the deployment name.
    # NOTE(review): this chunk appears truncated -- the docstring delimiters and the
    # close of the list_namespaced_pod() call (presumably ").items]") are missing.
    location = deployment_description["location"]
    _configure_api(location)
    deployment = deployment_description["deployment"]
    namespace = deployment_description["namespace"]

    # Get names of all the running pods belonging to the deployment
    pod_names = [pod.metadata.name for pod in client.CoreV1Api().list_namespaced_pod(
        namespace = namespace,
        label_selector = "k8sdeployment={0}".format(deployment),
        field_selector = "status.phase=Running"

    # Execute command in the running pods
    return [_execute_command_in_pod(location, namespace, pod_name, command)
            for pod_name in pod_names]