806b41e71d3f58f23d0431e26010113e0f9ad6f9
[dcaegen2/platform/plugins.git] / k8s / k8sclient / k8sclient.py
1 # ============LICENSE_START=======================================================
2 # org.onap.dcae
3 # ================================================================================
4 # Copyright (c) 2018 AT&T Intellectual Property. All rights reserved.
5 # ================================================================================
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 #      http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
17 # ============LICENSE_END=========================================================
18 #
19 # ECOMP is a trademark and service mark of AT&T Intellectual Property.
20 import os
21 import re
22 import uuid
23 from msb import msb
24 from kubernetes import config, client, stream
25
26 # Default values for readiness probe
27 PROBE_DEFAULT_PERIOD = 15
28 PROBE_DEFAULT_TIMEOUT = 1
29
30 # Regular expression for interval/timeout specification
31 INTERVAL_SPEC = re.compile("^([0-9]+)(s|m|h)?$")
32 # Conversion factors to seconds
33 FACTORS = {None: 1, "s": 1, "m": 60, "h": 3600}
34
35 # Regular expression for port mapping
36 # group 1: container port
37 # group 2: / + protocol
38 # group 3: protocol
39 # group 4: host port
40 PORTS = re.compile("^([0-9]+)(/(udp|UDP|tcp|TCP))?:([0-9]+)$")
41    
42 def _create_deployment_name(component_name):
43     return "dep-{0}".format(component_name)
44
45 def _create_service_name(component_name):
46     return "{0}".format(component_name)
47
48 def _create_exposed_service_name(component_name):
49     return ("x{0}".format(component_name))[:63]
50
51 def _configure_api():
52     # Look for a kubernetes config file in ~/.kube/config
53     kubepath = os.path.join(os.environ["HOME"], '.kube/config')
54     if os.path.exists(kubepath):
55         config.load_kube_config(kubepath)
56     else:
57         # Maybe we're running in a k8s container and we can use info provided by k8s
58         # We would like to use:
59         # config.load_incluster_config()
60         # but this looks into os.environ for kubernetes host and port, and from
61         # the plugin those aren't visible.   So we use the InClusterConfigLoader class,
62         # where we can set the environment to what we like.
63         # This is probably brittle!  Maybe there's a better alternative.
64         localenv = {
65             config.incluster_config.SERVICE_HOST_ENV_NAME : "kubernetes.default.svc.cluster.local",
66             config.incluster_config.SERVICE_PORT_ENV_NAME : "443"
67         }
68         config.incluster_config.InClusterConfigLoader(
69             token_filename=config.incluster_config.SERVICE_TOKEN_FILENAME,
70             cert_filename=config.incluster_config.SERVICE_CERT_FILENAME,
71             environ=localenv
72         ).load_and_set()
73
74 def _parse_interval(t):
75     """
76     Parse an interval specification
77     t can be
78        - a simple integer quantity, interpreted as seconds
79        - a string representation of a decimal integer, interpreted as seconds
80        - a string consisting of a represention of an decimal integer followed by a unit,
81          with "s" representing seconds, "m" representing minutes,
82          and "h" representing hours
83     Used for compatibility with the Docker plugin, where time intervals
84     for health checks were specified as strings with a number and a unit.
85     See 'intervalspec' above for the regular expression that's accepted.
86     """
87     m = INTERVAL_SPEC.match(str(t))
88     if m:
89         time = int(m.group(1)) * FACTORS[m.group(2)]
90     else:
91         raise ValueError("Bad interval specification: {0}".format(t))
92     return time
93
94 def _create_probe(hc, port, use_tls=False):
95     ''' Create a Kubernetes probe based on info in the health check dictionary hc '''
96     probe_type = hc['type']
97     probe = None
98     period = _parse_interval(hc.get('interval', PROBE_DEFAULT_PERIOD))
99     timeout = _parse_interval(hc.get('timeout', PROBE_DEFAULT_TIMEOUT))
100     if probe_type in ['http', 'https']:
101         probe = client.V1Probe(
102           failure_threshold = 1,
103           initial_delay_seconds = 5,
104           period_seconds = period,
105           timeout_seconds = timeout,
106           http_get = client.V1HTTPGetAction(
107               path = hc['endpoint'],
108               port = port,
109               scheme = probe_type.upper()
110           )
111         )
112     elif probe_type in ['script', 'docker']:
113         probe = client.V1Probe(
114           failure_threshold = 1,
115           initial_delay_seconds = 5,
116           period_seconds = period,
117           timeout_seconds = timeout,
118           _exec = client.V1ExecAction(
119               command = [hc['script']]
120           )
121         )
122     return probe
123
124 def _create_container_object(name, image, always_pull, use_tls=False, env={}, container_ports=[], volume_mounts = [], readiness = None):
125     # Set up environment variables
126     # Copy any passed in environment variables
127     env_vars = [client.V1EnvVar(name=k, value=env[k]) for k in env.keys()]
128     # Add POD_IP with the IP address of the pod running the container
129     pod_ip = client.V1EnvVarSource(field_ref = client.V1ObjectFieldSelector(field_path="status.podIP"))
130     env_vars.append(client.V1EnvVar(name="POD_IP",value_from=pod_ip))
131
132     # If a health check is specified, create a readiness probe
133     # (For an HTTP-based check, we assume it's at the first container port)
134     probe = None
135
136     if readiness:
137         hc_port = None
138         if len(container_ports) > 0:
139             (hc_port, proto) = container_ports[0]
140         probe = _create_probe(readiness, hc_port, use_tls)
141
142     # Define container for pod
143     return client.V1Container(
144         name=name,
145         image=image,
146         image_pull_policy='Always' if always_pull else 'IfNotPresent',
147         env=env_vars,
148         ports=[client.V1ContainerPort(container_port=p, protocol=proto) for (p, proto) in container_ports],
149         volume_mounts = volume_mounts,
150         readiness_probe = probe
151     )
152
153 def _create_deployment_object(component_name,
154                               containers,
155                               init_containers,
156                               replicas,
157                               volumes,
158                               labels={},
159                               pull_secrets=[]):
160
161     deployment_name = _create_deployment_name(component_name)
162
163     # Label the pod with the deployment name, so we can find it easily
164     labels.update({"k8sdeployment" : deployment_name})
165
166     # pull_secrets is a list of the names of the k8s secrets containing docker registry credentials
167     # See https://kubernetes.io/docs/concepts/containers/images/#specifying-imagepullsecrets-on-a-pod
168     ips = []
169     for secret in pull_secrets:
170         ips.append(client.V1LocalObjectReference(name=secret))
171
172     # Define pod template
173     template = client.V1PodTemplateSpec(
174         metadata=client.V1ObjectMeta(labels=labels),
175         spec=client.V1PodSpec(hostname=component_name,
176                               containers=containers,
177                               init_containers=init_containers,
178                               volumes=volumes,
179                               image_pull_secrets=ips)
180     )
181
182     # Define deployment spec
183     spec = client.ExtensionsV1beta1DeploymentSpec(
184         replicas=replicas,
185         template=template
186     )
187
188     # Create deployment object
189     deployment = client.ExtensionsV1beta1Deployment(
190         kind="Deployment",
191         metadata=client.V1ObjectMeta(name=deployment_name),
192         spec=spec
193     )
194
195     return deployment
196
197 def _create_service_object(service_name, component_name, service_ports, annotations, labels, service_type):
198     service_spec = client.V1ServiceSpec(
199         ports=service_ports,
200         selector={"app" : component_name},
201         type=service_type
202     )
203     if annotations:
204         metadata = client.V1ObjectMeta(name=_create_service_name(service_name), labels=labels, annotations=annotations)
205     else:
206         metadata = client.V1ObjectMeta(name=_create_service_name(service_name), labels=labels)
207
208     service = client.V1Service(
209         kind="Service",
210         api_version="v1",
211         metadata=metadata,
212         spec=service_spec
213     )
214     return service
215
216 def _parse_ports(port_list):
217     '''
218     Parse the port list into a list of container ports (needed to create the container)
219     and to a set of port mappings to set up k8s services.
220     '''
221     container_ports = []
222     port_map = {}
223     for p in port_list:
224         m = PORTS.match(p.strip())
225         if m:
226             cport = int(m.group(1))
227             hport = int (m.group(4))
228             if m.group(3):
229                 proto = (m.group(3)).upper()
230             else:
231                 proto = "TCP"
232             container_ports.append((cport, proto))
233             port_map[(cport, proto)] = hport
234         else:
235             raise ValueError("Bad port specification: {0}".format(p))
236
237     return container_ports, port_map
238
239 def _parse_volumes(volume_list):
240     volumes = []
241     volume_mounts = []
242     for v in volume_list:
243         vname = str(uuid.uuid4())
244         vhost = v['host']['path']
245         vcontainer = v['container']['bind']
246         vro = (v['container']['mode'] == 'ro')
247         volumes.append(client.V1Volume(name=vname, host_path=client.V1HostPathVolumeSource(path=vhost)))
248         volume_mounts.append(client.V1VolumeMount(name=vname, mount_path=vcontainer, read_only=vro))
249
250     return volumes, volume_mounts
251
252 def _service_exists(namespace, component_name):
253     exists = False
254     try:
255         _configure_api()
256         client.CoreV1Api().read_namespaced_service(_create_service_name(component_name), namespace)
257         exists = True
258     except client.rest.ApiException:
259         pass
260
261     return exists
262
263 def _patch_deployment(namespace, deployment, modify):
264     '''
265     Gets the current spec for 'deployment' in 'namespace',
266     uses the 'modify' function to change the spec,
267     then sends the updated spec to k8s.
268     '''
269     _configure_api()
270
271     # Get deployment spec
272     spec = client.ExtensionsV1beta1Api().read_namespaced_deployment(deployment, namespace)
273
274     # Apply changes to spec
275     spec = modify(spec)
276
277     # Patch the deploy with updated spec
278     client.ExtensionsV1beta1Api().patch_namespaced_deployment(deployment, namespace, spec)
279
280 def _execute_command_in_pod(namespace, pod_name, command):
281     '''
282     Execute the command (specified by an argv-style list in  the "command" parameter) in
283     the specified pod in the specified namespace.  For now at least, we use this only to
284     run a notification script in a pod after a configuration change.
285
286     The straightforward way to do this is with the V1 Core API function "connect_get_namespaced_pod_exec".
287     Unfortunately due to a bug/limitation in the Python client library, we can't call it directly.
288     We have to make the API call through a Websocket connection using the kubernetes.stream wrapper API.
289     I'm following the example code at https://github.com/kubernetes-client/python/blob/master/examples/exec.py.
290     There are several issues tracking this, in various states.  It isn't clear that there will ever
291     be a fix.
292         - https://github.com/kubernetes-client/python/issues/58
293         - https://github.com/kubernetes-client/python/issues/409
294         - https://github.com/kubernetes-client/python/issues/526
295
296     The main consequence of the workaround using "stream" is that the caller does not get an indication
297     of the exit code returned by the command when it completes execution.   It turns out that the
298     original implementation of notification in the Docker plugin did not use this result, so we can
299     still match the original notification functionality.
300
301     The "stream" approach returns a string containing any output sent by the command to stdout or stderr.
302     We'll return that so it can logged.
303     '''
304     _configure_api()
305     try:
306         output = stream.stream(client.CoreV1Api().connect_get_namespaced_pod_exec,
307                              name=pod_name,
308                              namespace=namespace,
309                              command=command,
310                              stdout=True,
311                              stderr=True,
312                             stdin=False,
313                             tty=False)
314     except client.rest.ApiException as e:
315         # If the exception indicates the pod wasn't found,  it's not a fatal error.
316         # It existed when we enumerated the pods for the deployment but no longer exists.
317         # Unfortunately, the only way to distinguish a pod not found from any other error
318         # is by looking at the reason text.
319         # (The ApiException's "status" field should contain the HTTP status code, which would
320         # be 404 if the pod isn't found, but empirical testing reveals that "status" is set
321         # to zero.)
322         if "404 not found" in e.reason.lower():
323             output = "Pod not found"
324         else:
325             raise e
326
327     return {"pod" : pod_name, "output" : output}
328
329 def deploy(namespace, component_name, image, replicas, always_pull, k8sconfig, **kwargs):
330     '''
331     This will create a k8s Deployment and, if needed, one or two k8s Services.
332     (We are being opinionated in our use of k8s... this code decides what k8s abstractions and features to use.
333     We're not exposing k8s to the component developer and the blueprint author.
334     This is a conscious choice.  We want to use k8s in a controlled, consistent way, and we want to hide
335     the details from the component developer and the blueprint author.)
336
337     namespace:  the Kubernetes namespace into which the component is deployed
338     component_name:  the component name, used to derive names of Kubernetes entities
339     image: the docker image for the component being deployed
340     replica: the number of instances of the component to be deployed
341     always_pull: boolean flag, indicating that Kubernetes should always pull a new copy of
342        the Docker image for the component, even if it is already present on the Kubernetes node.
343     k8sconfig contains:
344         - image_pull_secrets: a list of names of image pull secrets that can be used for retrieving images.
345           (DON'T PANIC:  these are just the names of secrets held in the Kubernetes secret store.)
346         - filebeat: a dictionary of filebeat sidecar parameters:
347             "log_path" : mount point for log volume in filebeat container
348             "data_path" : mount point for data volume in filebeat container
349             "config_path" : mount point for config volume in filebeat container
350             "config_subpath" :  subpath for config data in filebeat container
351             "config_map" : ConfigMap holding the filebeat configuration
352             "image": Docker image to use for filebeat
353         - tls: a dictionary of TLS init container parameters:
354             "cert_path": mount point for certificate volume in init container
355             "image": Docker image to use for TLS init container
356     kwargs may have:
357         - volumes:  array of volume objects, where a volume object is:
358             {"host":{"path": "/path/on/host"}, "container":{"bind":"/path/on/container","mode":"rw_or_ro"}
359         - ports: array of strings in the form "container_port:host_port"
360         - env: map of name-value pairs ( {name0: value0, name1: value1...}
361         - msb_list: array of msb objects, where an msb object is as described in msb/msb.py.
362         - log_info: an object with info for setting up ELK logging, with the form:
363             {"log_directory": "/path/to/container/log/directory", "alternate_fb_path" : "/alternate/sidecar/log/path"}
364         - tls_info: an object with info for setting up TLS (HTTPS), with the form:
365             {"use_tls": true, "cert_directory": "/path/to/container/cert/directory" }
366         - labels: dict with label-name/label-value pairs, e.g. {"cfydeployment" : "lsdfkladflksdfsjkl", "cfynode":"mycomponent"}
367             These label will be set on all the pods deployed as a result of this deploy() invocation.
368         - readiness: dict with health check info; if present, used to create a readiness probe for the main container.  Includes:
369             - type: check is done by making http(s) request to an endpoint ("http", "https") or by exec'ing a script in the container ("script", "docker")
370             - interval: period (in seconds) between probes
371             - timeout:  time (in seconds) to allow a probe to complete
372             - endpoint: the path portion of the URL that points to the readiness endpoint for "http" and "https" types
373             - path: the full path to the script to be executed in the container for "script" and "docker" types
374
375     '''
376
377     deployment_ok = False
378     cip_service_created = False
379     deployment_description = {
380         "namespace": namespace,
381         "deployment": '',
382         "services": []
383     }
384
385     try:
386         _configure_api()
387
388         # Get API handles
389         core = client.CoreV1Api()
390         ext = client.ExtensionsV1beta1Api()
391
392         # Parse the port mapping
393         container_ports, port_map = _parse_ports(kwargs.get("ports", []))
394
395         # Parse the volumes list into volumes and volume_mounts for the deployment
396         volumes, volume_mounts = _parse_volumes(kwargs.get("volumes",[]))
397
398         # Initialize the list of containers that will be part of the pod
399         containers = []
400         init_containers = []
401
402         # Set up the ELK logging sidecar container, if needed
403         log_info = kwargs.get("log_info")
404         if log_info and "log_directory" in log_info:
405             log_dir = log_info["log_directory"]
406             fb = k8sconfig["filebeat"]
407             sidecar_volume_mounts = []
408
409             # Create the volume for component log files and volume mounts for the component and sidecar containers
410             volumes.append(client.V1Volume(name="component-log", empty_dir=client.V1EmptyDirVolumeSource()))
411             volume_mounts.append(client.V1VolumeMount(name="component-log", mount_path=log_dir))
412             sc_path = log_info["alternate_fb_path"] if "alternate_fb_path" in log_info  \
413                 else "{0}/{1}".format(fb["log_path"], component_name)
414             sidecar_volume_mounts.append(client.V1VolumeMount(name="component-log", mount_path=sc_path))
415
416             # Create the volume for sidecar data and the volume mount for it
417             volumes.append(client.V1Volume(name="filebeat-data", empty_dir=client.V1EmptyDirVolumeSource()))
418             sidecar_volume_mounts.append(client.V1VolumeMount(name="filebeat-data", mount_path=fb["data_path"]))
419
420             # Create the container for the sidecar
421             containers.append(_create_container_object("filebeat", fb["image"], False, False, {}, [], sidecar_volume_mounts))
422
423             # Create the volume for the sidecar configuration data and the volume mount for it
424             # The configuration data is in a k8s ConfigMap that should be created when DCAE is installed.
425             volumes.append(
426                 client.V1Volume(name="filebeat-conf", config_map=client.V1ConfigMapVolumeSource(name=fb["config_map"])))
427             sidecar_volume_mounts.append(
428                 client.V1VolumeMount(name="filebeat-conf", mount_path=fb["config_path"], sub_path=fb["config_subpath"]))
429
430         # Set up the TLS init container, if needed
431         tls_info = kwargs.get("tls_info")
432         use_tls = False
433         if tls_info and "use_tls" in tls_info and tls_info["use_tls"]:
434             if "cert_directory" in tls_info and len(tls_info["cert_directory"]) > 0:
435                 use_tls = True
436                 tls_config = k8sconfig["tls"]
437
438                 # Create the certificate volume and volume mounts
439                 volumes.append(client.V1Volume(name="tls-info", empty_dir=client.V1EmptyDirVolumeSource()))
440                 volume_mounts.append(client.V1VolumeMount(name="tls-info", mount_path=tls_info["cert_directory"]))
441                 init_volume_mounts = [client.V1VolumeMount(name="tls-info", mount_path=tls_config["cert_path"])]
442
443                 # Create the init container
444                 init_containers.append(_create_container_object("init-tls", tls_config["image"], False, False, {}, [], init_volume_mounts))
445
446         # Create the container for the component
447         # Make it the first container in the pod
448         containers.insert(0, _create_container_object(component_name, image, always_pull, use_tls, kwargs.get("env", {}), container_ports, volume_mounts, kwargs["readiness"]))
449
450         # Build the k8s Deployment object
451         labels = kwargs.get("labels", {})
452         labels.update({"app": component_name})
453         dep = _create_deployment_object(component_name, containers, init_containers, replicas, volumes, labels, pull_secrets=k8sconfig["image_pull_secrets"])
454
455         # Have k8s deploy it
456         ext.create_namespaced_deployment(namespace, dep)
457         deployment_ok = True
458         deployment_description["deployment"] = _create_deployment_name(component_name)
459
460         # Create service(s), if a port mapping is specified
461         if port_map:
462             service_ports = []      # Ports exposed internally on the k8s network
463             exposed_ports = []      # Ports to be mapped to ports on the k8s nodes via NodePort
464             for (cport, proto), hport in port_map.iteritems():
465                 service_ports.append(client.V1ServicePort(port=int(cport),protocol=proto,name="port-{0}-{1}".format(proto[0].lower(), cport)))
466                 if int(hport) != 0:
467                     exposed_ports.append(client.V1ServicePort(port=int(cport),protocol=proto,node_port=int(hport),name="xport-{0}-{1}".format(proto[0].lower(),cport)))
468
469             # If there are ports to be exposed via MSB, set up the annotation for the service
470             msb_list = kwargs.get("msb_list")
471             annotations = msb.create_msb_annotation(msb_list) if msb_list else ''
472
473             # Create a ClusterIP service for access via the k8s network
474             service = _create_service_object(_create_service_name(component_name), component_name, service_ports, annotations, labels, "ClusterIP")
475             core.create_namespaced_service(namespace, service)
476             cip_service_created = True
477             deployment_description["services"].append(_create_service_name(component_name))
478
479             # If there are ports to be exposed on the k8s nodes, create a "NodePort" service
480             if len(exposed_ports) > 0:
481                 exposed_service = \
482                     _create_service_object(_create_exposed_service_name(component_name), component_name, exposed_ports, '', labels, "NodePort")
483                 core.create_namespaced_service(namespace, exposed_service)
484                 deployment_description["services"].append(_create_exposed_service_name(component_name))
485
486     except Exception as e:
487         # If the ClusterIP service was created, delete the service:
488         if cip_service_created:
489             core.delete_namespaced_service(_create_service_name(component_name), namespace)
490         # If the deployment was created but not the service, delete the deployment
491         if deployment_ok:
492             client.ExtensionsV1beta1Api().delete_namespaced_deployment(_create_deployment_name(component_name), namespace, client.V1DeleteOptions())
493         raise e
494
495     return dep, deployment_description
496
497 def undeploy(deployment_description):
498     _configure_api()
499
500     namespace = deployment_description["namespace"]
501
502     # remove any services associated with the component
503     for service in deployment_description["services"]:
504         client.CoreV1Api().delete_namespaced_service(service, namespace)
505
506     # Have k8s delete the underlying pods and replicaset when deleting the deployment.
507     options = client.V1DeleteOptions(propagation_policy="Foreground")
508     client.ExtensionsV1beta1Api().delete_namespaced_deployment(deployment_description["deployment"], namespace, options)
509
510 def is_available(namespace, component_name):
511     _configure_api()
512     dep_status = client.AppsV1beta1Api().read_namespaced_deployment_status(_create_deployment_name(component_name), namespace)
513     # Check if the number of available replicas is equal to the number requested and that the replicas match the current spec
514     # This check can be used to verify completion of an initial deployment, a scale operation, or an update operation
515     return dep_status.status.available_replicas == dep_status.spec.replicas and dep_status.status.updated_replicas == dep_status.spec.replicas
516
517 def scale(deployment_description, replicas):
518     ''' Trigger a scaling operation by updating the replica count for the Deployment '''
519
520     def update_replica_count(spec):
521         spec.spec.replicas = replicas
522         return spec
523
524     _patch_deployment(deployment_description["namespace"], deployment_description["deployment"], update_replica_count)
525
526 def upgrade(deployment_description, image, container_index = 0):
527     ''' Trigger a rolling upgrade by sending a new image name/tag to k8s '''
528
529     def update_image(spec):
530         spec.spec.template.spec.containers[container_index].image = image
531         return spec
532
533     _patch_deployment(deployment_description["namespace"], deployment_description["deployment"], update_image)
534
535 def rollback(deployment_description, rollback_to=0):
536     '''
537     Undo upgrade by rolling back to a previous revision of the deployment.
538     By default, go back one revision.
539     rollback_to can be used to supply a specific revision number.
540     Returns the image for the app container and the replica count from the rolled-back deployment
541     '''
542     '''
543     2018-07-13
544     Currently this does not work due to a bug in the create_namespaced_deployment_rollback() method.
545     The k8s python client code throws an exception while processing the response from the API.
546     See:
547        - https://github.com/kubernetes-client/python/issues/491
548        - https://github.com/kubernetes/kubernetes/pull/63837
549     The fix has been merged into the master branch but is not in the latest release.
550     '''
551     _configure_api()
552     deployment = deployment_description["deployment"]
553     namespace = deployment_description["namespace"]
554
555     # Initiate the rollback
556     client.ExtensionsV1beta1Api().create_namespaced_deployment_rollback(
557         deployment,
558         namespace,
559         client.AppsV1beta1DeploymentRollback(name=deployment, rollback_to=client.AppsV1beta1RollbackConfig(revision=rollback_to)))
560
561     # Read back the spec for the rolled-back deployment
562     spec = client.ExtensionsV1beta1Api().read_namespaced_deployment(deployment, namespace)
563     return spec.spec.template.spec.containers[0].image, spec.spec.replicas
564
565 def execute_command_in_deployment(deployment_description, command):
566     '''
567     Enumerates the pods in the k8s deployment identified by "deployment_description",
568     then executes the command (represented as an argv-style list) in "command" in
569     container 0 (the main application container) each of those pods.
570
571     Note that the sets of pods associated with a deployment can change over time.  The
572     enumeration is a snapshot at one point in time.  The command will not be executed in
573     pods that are created after the initial enumeration.   If a pod disappears after the
574     initial enumeration and before the command is executed, the attempt to execute the
575     command will fail.  This is not treated as a fatal error.
576
577     This approach is reasonable for the one current use case for "execute_command":  running a
578     script to notify a container that its configuration has changed as a result of a
579     policy change.  In this use case, the new configuration information is stored into
580     the configuration store (Consul), the pods are enumerated, and the command is executed.
581     If a pod disappears after the enumeration, the fact that the command cannot be run
582     doesn't matter--a nonexistent pod doesn't need to be reconfigured.  Similarly, a pod that
583     comes up after the enumeration will get its initial configuration from the updated version
584     in Consul.
585
586     The optimal solution here would be for k8s to provide an API call to execute a command in
587     all of the pods for a deployment.   Unfortunately, k8s does not provide such a call--the
588     only call provided by k8s operates at the pod level, not the deployment level.
589
590     Another interesting k8s factoid: there's no direct way to list the pods belong to a
591     particular k8s deployment.   The deployment code above sets a label ("k8sdeployment") on
592     the pod that has the k8s deployment name.  To list the pods, the code below queries for
593     pods with the label carrying the deployment name.
594     '''
595
596     _configure_api()
597     deployment = deployment_description["deployment"]
598     namespace = deployment_description["namespace"]
599
600     # Get names of all the running pods belonging to the deployment
601     pod_names = [pod.metadata.name for pod in client.CoreV1Api().list_namespaced_pod(
602         namespace = namespace,
603         label_selector = "k8sdeployment={0}".format(deployment),
604         field_selector = "status.phase=Running"
605     ).items]
606
607     def do_execute(pod_name):
608         return _execute_command_in_pod(namespace, pod_name, command)
609
610     # Execute command in the running pods
611     return map(do_execute, pod_names)