Change kubeconfig file location
[dcaegen2/platform/plugins.git] / k8s / k8sclient / k8sclient.py
1 # ============LICENSE_START=======================================================
2 # org.onap.dcae
3 # ================================================================================
4 # Copyright (c) 2019 AT&T Intellectual Property. All rights reserved.
5 # ================================================================================
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 #      http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
17 # ============LICENSE_END=========================================================
18 #
19 # ECOMP is a trademark and service mark of AT&T Intellectual Property.
20 import os
21 import re
22 import uuid
23 from msb import msb
24 from kubernetes import config, client, stream
25
26 # Default values for readiness probe
27 PROBE_DEFAULT_PERIOD = 15
28 PROBE_DEFAULT_TIMEOUT = 1
29
30 # Location of k8s cluster config file ("kubeconfig")
31 K8S_CONFIG_PATH="/opt/onap/kube/kubeconfig"
32
33 # Regular expression for interval/timeout specification
34 INTERVAL_SPEC = re.compile("^([0-9]+)(s|m|h)?$")
35 # Conversion factors to seconds
36 FACTORS = {None: 1, "s": 1, "m": 60, "h": 3600}
37
38 # Regular expression for port mapping
39 # group 1: container port
40 # group 2: / + protocol
41 # group 3: protocol
42 # group 4: host port
43 PORTS = re.compile("^([0-9]+)(/(udp|UDP|tcp|TCP))?:([0-9]+)$")
44
45 def _create_deployment_name(component_name):
46     return "dep-{0}".format(component_name)
47
48 def _create_service_name(component_name):
49     return "{0}".format(component_name)
50
51 def _create_exposed_service_name(component_name):
52     return ("x{0}".format(component_name))[:63]
53
54 def _configure_api(location=None):
55     # Look for a kubernetes config file
56     if os.path.exists(K8S_CONFIG_PATH):
57         config.load_kube_config(config_file=K8S_CONFIG_PATH, context=location, persist_config=False)
58     else:
59         # Maybe we're running in a k8s container and we can use info provided by k8s
60         # We would like to use:
61         # config.load_incluster_config()
62         # but this looks into os.environ for kubernetes host and port, and from
63         # the plugin those aren't visible.   So we use the InClusterConfigLoader class,
64         # where we can set the environment to what we like.
65         # This is probably brittle!  Maybe there's a better alternative.
66         localenv = {
67             config.incluster_config.SERVICE_HOST_ENV_NAME : "kubernetes.default.svc.cluster.local",
68             config.incluster_config.SERVICE_PORT_ENV_NAME : "443"
69         }
70         config.incluster_config.InClusterConfigLoader(
71             token_filename=config.incluster_config.SERVICE_TOKEN_FILENAME,
72             cert_filename=config.incluster_config.SERVICE_CERT_FILENAME,
73             environ=localenv
74         ).load_and_set()
75
76 def _parse_interval(t):
77     """
78     Parse an interval specification
79     t can be
80        - a simple integer quantity, interpreted as seconds
81        - a string representation of a decimal integer, interpreted as seconds
82        - a string consisting of a represention of an decimal integer followed by a unit,
83          with "s" representing seconds, "m" representing minutes,
84          and "h" representing hours
85     Used for compatibility with the Docker plugin, where time intervals
86     for health checks were specified as strings with a number and a unit.
87     See 'intervalspec' above for the regular expression that's accepted.
88     """
89     m = INTERVAL_SPEC.match(str(t))
90     if m:
91         time = int(m.group(1)) * FACTORS[m.group(2)]
92     else:
93         raise ValueError("Bad interval specification: {0}".format(t))
94     return time
95
96 def _create_probe(hc, port, use_tls=False):
97     ''' Create a Kubernetes probe based on info in the health check dictionary hc '''
98     probe_type = hc['type']
99     probe = None
100     period = _parse_interval(hc.get('interval', PROBE_DEFAULT_PERIOD))
101     timeout = _parse_interval(hc.get('timeout', PROBE_DEFAULT_TIMEOUT))
102     if probe_type in ['http', 'https']:
103         probe = client.V1Probe(
104           failure_threshold = 1,
105           initial_delay_seconds = 5,
106           period_seconds = period,
107           timeout_seconds = timeout,
108           http_get = client.V1HTTPGetAction(
109               path = hc['endpoint'],
110               port = port,
111               scheme = probe_type.upper()
112           )
113         )
114     elif probe_type in ['script', 'docker']:
115         probe = client.V1Probe(
116           failure_threshold = 1,
117           initial_delay_seconds = 5,
118           period_seconds = period,
119           timeout_seconds = timeout,
120           _exec = client.V1ExecAction(
121               command = hc['script'].split( )
122           )
123         )
124     return probe
125
126 def _create_resources(resources=None):
127     if resources is not None:
128         resources_obj = client.V1ResourceRequirements(
129           limits = resources.get("limits"),
130           requests = resources.get("requests")
131         )
132         return resources_obj
133     else:
134         return None
135
136 def _create_container_object(name, image, always_pull, use_tls=False, env={}, container_ports=[], volume_mounts = [], resources = None, readiness = None, liveness = None):
137     # Set up environment variables
138     # Copy any passed in environment variables
139     env_vars = [client.V1EnvVar(name=k, value=env[k]) for k in env.keys()]
140     # Add POD_IP with the IP address of the pod running the container
141     pod_ip = client.V1EnvVarSource(field_ref = client.V1ObjectFieldSelector(field_path="status.podIP"))
142     env_vars.append(client.V1EnvVar(name="POD_IP",value_from=pod_ip))
143
144     # If a health check is specified, create a readiness/liveness probe
145     # (For an HTTP-based check, we assume it's at the first container port)
146     probe = None
147     live_probe = None
148
149     if readiness:
150         hc_port = None
151         if len(container_ports) > 0:
152             (hc_port, proto) = container_ports[0]
153         probe = _create_probe(readiness, hc_port, use_tls)
154     if liveness:
155         hc_port = None
156         if len(container_ports) > 0:
157             (hc_port, proto) = container_ports[0]
158         live_probe = _create_probe(liveness, hc_port, use_tls)
159
160     if resources:
161         resources_obj = _create_resources(resources)
162     else:
163         resources_obj = None
164     # Define container for pod
165     return client.V1Container(
166         name=name,
167         image=image,
168         image_pull_policy='Always' if always_pull else 'IfNotPresent',
169         env=env_vars,
170         ports=[client.V1ContainerPort(container_port=p, protocol=proto) for (p, proto) in container_ports],
171         volume_mounts = volume_mounts,
172         resources = resources_obj,
173         readiness_probe = probe,
174         liveness_probe = live_probe
175     )
176
177 def _create_deployment_object(component_name,
178                               containers,
179                               init_containers,
180                               replicas,
181                               volumes,
182                               labels={},
183                               pull_secrets=[]):
184
185     deployment_name = _create_deployment_name(component_name)
186
187     # Label the pod with the deployment name, so we can find it easily
188     labels.update({"k8sdeployment" : deployment_name})
189
190     # pull_secrets is a list of the names of the k8s secrets containing docker registry credentials
191     # See https://kubernetes.io/docs/concepts/containers/images/#specifying-imagepullsecrets-on-a-pod
192     ips = []
193     for secret in pull_secrets:
194         ips.append(client.V1LocalObjectReference(name=secret))
195
196     # Define pod template
197     template = client.V1PodTemplateSpec(
198         metadata=client.V1ObjectMeta(labels=labels),
199         spec=client.V1PodSpec(hostname=component_name,
200                               containers=containers,
201                               init_containers=init_containers,
202                               volumes=volumes,
203                               image_pull_secrets=ips)
204     )
205
206     # Define deployment spec
207     spec = client.ExtensionsV1beta1DeploymentSpec(
208         replicas=replicas,
209         template=template
210     )
211
212     # Create deployment object
213     deployment = client.ExtensionsV1beta1Deployment(
214         kind="Deployment",
215         metadata=client.V1ObjectMeta(name=deployment_name),
216         spec=spec
217     )
218
219     return deployment
220
221 def _create_service_object(service_name, component_name, service_ports, annotations, labels, service_type):
222     service_spec = client.V1ServiceSpec(
223         ports=service_ports,
224         selector={"app" : component_name},
225         type=service_type
226     )
227     if annotations:
228         metadata = client.V1ObjectMeta(name=_create_service_name(service_name), labels=labels, annotations=annotations)
229     else:
230         metadata = client.V1ObjectMeta(name=_create_service_name(service_name), labels=labels)
231
232     service = client.V1Service(
233         kind="Service",
234         api_version="v1",
235         metadata=metadata,
236         spec=service_spec
237     )
238     return service
239
240 def _parse_ports(port_list):
241     '''
242     Parse the port list into a list of container ports (needed to create the container)
243     and to a set of port mappings to set up k8s services.
244     '''
245     container_ports = []
246     port_map = {}
247     for p in port_list:
248         m = PORTS.match(p.strip())
249         if m:
250             cport = int(m.group(1))
251             hport = int (m.group(4))
252             if m.group(3):
253                 proto = (m.group(3)).upper()
254             else:
255                 proto = "TCP"
256             container_ports.append((cport, proto))
257             port_map[(cport, proto)] = hport
258         else:
259             raise ValueError("Bad port specification: {0}".format(p))
260
261     return container_ports, port_map
262
263 def _parse_volumes(volume_list):
264     volumes = []
265     volume_mounts = []
266     for v in volume_list:
267         vname = str(uuid.uuid4())
268         vhost = v['host']['path']
269         vcontainer = v['container']['bind']
270         vro = (v['container'].get('mode') == 'ro')
271         volumes.append(client.V1Volume(name=vname, host_path=client.V1HostPathVolumeSource(path=vhost)))
272         volume_mounts.append(client.V1VolumeMount(name=vname, mount_path=vcontainer, read_only=vro))
273
274     return volumes, volume_mounts
275
276 def _service_exists(location, namespace, component_name):
277     exists = False
278     try:
279         _configure_api(location)
280         client.CoreV1Api().read_namespaced_service(_create_service_name(component_name), namespace)
281         exists = True
282     except client.rest.ApiException:
283         pass
284
285     return exists
286
287 def _patch_deployment(location, namespace, deployment, modify):
288     '''
289     Gets the current spec for 'deployment' in 'namespace'
290     in the k8s cluster at 'location',
291     uses the 'modify' function to change the spec,
292     then sends the updated spec to k8s.
293     '''
294     _configure_api(location)
295
296     # Get deployment spec
297     spec = client.ExtensionsV1beta1Api().read_namespaced_deployment(deployment, namespace)
298
299     # Apply changes to spec
300     spec = modify(spec)
301
302     # Patch the deploy with updated spec
303     client.ExtensionsV1beta1Api().patch_namespaced_deployment(deployment, namespace, spec)
304
305 def _execute_command_in_pod(location, namespace, pod_name, command):
306     '''
307     Execute the command (specified by an argv-style list in  the "command" parameter) in
308     the specified pod in the specified namespace at the specified location.
309     For now at least, we use this only to
310     run a notification script in a pod after a configuration change.
311
312     The straightforward way to do this is with the V1 Core API function "connect_get_namespaced_pod_exec".
313     Unfortunately due to a bug/limitation in the Python client library, we can't call it directly.
314     We have to make the API call through a Websocket connection using the kubernetes.stream wrapper API.
315     I'm following the example code at https://github.com/kubernetes-client/python/blob/master/examples/exec.py.
316     There are several issues tracking this, in various states.  It isn't clear that there will ever
317     be a fix.
318         - https://github.com/kubernetes-client/python/issues/58
319         - https://github.com/kubernetes-client/python/issues/409
320         - https://github.com/kubernetes-client/python/issues/526
321
322     The main consequence of the workaround using "stream" is that the caller does not get an indication
323     of the exit code returned by the command when it completes execution.   It turns out that the
324     original implementation of notification in the Docker plugin did not use this result, so we can
325     still match the original notification functionality.
326
327     The "stream" approach returns a string containing any output sent by the command to stdout or stderr.
328     We'll return that so it can logged.
329     '''
330     _configure_api(location)
331     try:
332         output = stream.stream(client.CoreV1Api().connect_get_namespaced_pod_exec,
333                              name=pod_name,
334                              namespace=namespace,
335                              command=command,
336                              stdout=True,
337                              stderr=True,
338                             stdin=False,
339                             tty=False)
340     except client.rest.ApiException as e:
341         # If the exception indicates the pod wasn't found,  it's not a fatal error.
342         # It existed when we enumerated the pods for the deployment but no longer exists.
343         # Unfortunately, the only way to distinguish a pod not found from any other error
344         # is by looking at the reason text.
345         # (The ApiException's "status" field should contain the HTTP status code, which would
346         # be 404 if the pod isn't found, but empirical testing reveals that "status" is set
347         # to zero.)
348         if "404 not found" in e.reason.lower():
349             output = "Pod not found"
350         else:
351             raise e
352
353     return {"pod" : pod_name, "output" : output}
354
355 def deploy(namespace, component_name, image, replicas, always_pull, k8sconfig, resources, **kwargs):
356     '''
357     This will create a k8s Deployment and, if needed, one or two k8s Services.
358     (We are being opinionated in our use of k8s... this code decides what k8s abstractions and features to use.
359     We're not exposing k8s to the component developer and the blueprint author.
360     This is a conscious choice.  We want to use k8s in a controlled, consistent way, and we want to hide
361     the details from the component developer and the blueprint author.)
362
363     namespace:  the Kubernetes namespace into which the component is deployed
364     component_name:  the component name, used to derive names of Kubernetes entities
365     image: the docker image for the component being deployed
366     replica: the number of instances of the component to be deployed
367     always_pull: boolean flag, indicating that Kubernetes should always pull a new copy of
368        the Docker image for the component, even if it is already present on the Kubernetes node.
369     k8sconfig contains:
370         - image_pull_secrets: a list of names of image pull secrets that can be used for retrieving images.
371           (DON'T PANIC:  these are just the names of secrets held in the Kubernetes secret store.)
372         - filebeat: a dictionary of filebeat sidecar parameters:
373             "log_path" : mount point for log volume in filebeat container
374             "data_path" : mount point for data volume in filebeat container
375             "config_path" : mount point for config volume in filebeat container
376             "config_subpath" :  subpath for config data in filebeat container
377             "config_map" : ConfigMap holding the filebeat configuration
378             "image": Docker image to use for filebeat
379         - tls: a dictionary of TLS init container parameters:
380             "cert_path": mount point for certificate volume in init container
381             "image": Docker image to use for TLS init container
382     kwargs may have:
383         - volumes:  array of volume objects, where a volume object is:
384             {"host":{"path": "/path/on/host"}, "container":{"bind":"/path/on/container","mode":"rw_or_ro"}
385         - ports: array of strings in the form "container_port:host_port"
386         - env: map of name-value pairs ( {name0: value0, name1: value1...}
387         - msb_list: array of msb objects, where an msb object is as described in msb/msb.py.
388         - log_info: an object with info for setting up ELK logging, with the form:
389             {"log_directory": "/path/to/container/log/directory", "alternate_fb_path" : "/alternate/sidecar/log/path"}
390         - tls_info: an object with info for setting up TLS (HTTPS), with the form:
391             {"use_tls": true, "cert_directory": "/path/to/container/cert/directory" }
392         - labels: dict with label-name/label-value pairs, e.g. {"cfydeployment" : "lsdfkladflksdfsjkl", "cfynode":"mycomponent"}
393             These label will be set on all the pods deployed as a result of this deploy() invocation.
394         - readiness: dict with health check info; if present, used to create a readiness probe for the main container.  Includes:
395             - type: check is done by making http(s) request to an endpoint ("http", "https") or by exec'ing a script in the container ("script", "docker")
396             - interval: period (in seconds) between probes
397             - timeout:  time (in seconds) to allow a probe to complete
398             - endpoint: the path portion of the URL that points to the readiness endpoint for "http" and "https" types
399             - path: the full path to the script to be executed in the container for "script" and "docker" types
400         - liveness: dict with health check info; if present, used to create a liveness probe for the main container.  Includes:
401             - type: check is done by making http(s) request to an endpoint ("http", "https") or by exec'ing a script in the container ("script", "docker")
402             - interval: period (in seconds) between probes
403             - timeout:  time (in seconds) to allow a probe to complete
404             - endpoint: the path portion of the URL that points to the liveness endpoint for "http" and "https" types
405             - path: the full path to the script to be executed in the container for "script" and "docker" types
406         - k8s_location: name of the Kubernetes location (cluster) where the component is to be deployed
407
408
409     '''
410
411     deployment_ok = False
412     cip_service_created = False
413     deployment_description = {
414         "namespace": namespace,
415         "location" : kwargs.get("k8s_location"),
416         "deployment": '',
417         "services": []
418     }
419
420     try:
421
422         # Get API handles
423         _configure_api(kwargs.get("k8s_location"))
424         core = client.CoreV1Api()
425         ext = client.ExtensionsV1beta1Api()
426
427         # Parse the port mapping
428         container_ports, port_map = _parse_ports(kwargs.get("ports", []))
429
430         # Parse the volumes list into volumes and volume_mounts for the deployment
431         volumes, volume_mounts = _parse_volumes(kwargs.get("volumes",[]))
432
433         # Initialize the list of containers that will be part of the pod
434         containers = []
435         init_containers = []
436
437         # Set up the ELK logging sidecar container, if needed
438         log_info = kwargs.get("log_info")
439         if log_info and "log_directory" in log_info:
440             log_dir = log_info["log_directory"]
441             fb = k8sconfig["filebeat"]
442             sidecar_volume_mounts = []
443
444             # Create the volume for component log files and volume mounts for the component and sidecar containers
445             volumes.append(client.V1Volume(name="component-log", empty_dir=client.V1EmptyDirVolumeSource()))
446             volume_mounts.append(client.V1VolumeMount(name="component-log", mount_path=log_dir))
447             sc_path = log_info["alternate_fb_path"] if "alternate_fb_path" in log_info  \
448                 else "{0}/{1}".format(fb["log_path"], component_name)
449             sidecar_volume_mounts.append(client.V1VolumeMount(name="component-log", mount_path=sc_path))
450
451             # Create the volume for sidecar data and the volume mount for it
452             volumes.append(client.V1Volume(name="filebeat-data", empty_dir=client.V1EmptyDirVolumeSource()))
453             sidecar_volume_mounts.append(client.V1VolumeMount(name="filebeat-data", mount_path=fb["data_path"]))
454
455             # Create the container for the sidecar
456             containers.append(_create_container_object("filebeat", fb["image"], False, False, {}, [], sidecar_volume_mounts))
457
458             # Create the volume for the sidecar configuration data and the volume mount for it
459             # The configuration data is in a k8s ConfigMap that should be created when DCAE is installed.
460             volumes.append(
461                 client.V1Volume(name="filebeat-conf", config_map=client.V1ConfigMapVolumeSource(name=fb["config_map"])))
462             sidecar_volume_mounts.append(
463                 client.V1VolumeMount(name="filebeat-conf", mount_path=fb["config_path"], sub_path=fb["config_subpath"]))
464
465         # Set up the TLS init container, if needed
466         tls_info = kwargs.get("tls_info")
467         use_tls = False
468         if tls_info and "use_tls" in tls_info and tls_info["use_tls"]:
469             if "cert_directory" in tls_info and len(tls_info["cert_directory"]) > 0:
470                 use_tls = True
471                 tls_config = k8sconfig["tls"]
472
473                 # Create the certificate volume and volume mounts
474                 volumes.append(client.V1Volume(name="tls-info", empty_dir=client.V1EmptyDirVolumeSource()))
475                 volume_mounts.append(client.V1VolumeMount(name="tls-info", mount_path=tls_info["cert_directory"]))
476                 init_volume_mounts = [client.V1VolumeMount(name="tls-info", mount_path=tls_config["cert_path"])]
477
478                 # Create the init container
479                 init_containers.append(_create_container_object("init-tls", tls_config["image"], False, False, {}, [], init_volume_mounts))
480
481         # Create the container for the component
482         # Make it the first container in the pod
483         containers.insert(0, _create_container_object(component_name, image, always_pull, use_tls, kwargs.get("env", {}), container_ports, volume_mounts, resources, kwargs["readiness"], kwargs.get("liveness")))
484
485         # Build the k8s Deployment object
486         labels = kwargs.get("labels", {})
487         labels.update({"app": component_name})
488         dep = _create_deployment_object(component_name, containers, init_containers, replicas, volumes, labels, pull_secrets=k8sconfig["image_pull_secrets"])
489
490         # Have k8s deploy it
491         ext.create_namespaced_deployment(namespace, dep)
492         deployment_ok = True
493         deployment_description["deployment"] = _create_deployment_name(component_name)
494
495         # Create service(s), if a port mapping is specified
496         if port_map:
497             service_ports = []      # Ports exposed internally on the k8s network
498             exposed_ports = []      # Ports to be mapped to ports on the k8s nodes via NodePort
499             for (cport, proto), hport in port_map.iteritems():
500                 service_ports.append(client.V1ServicePort(port=int(cport),protocol=proto,name="port-{0}-{1}".format(proto[0].lower(), cport)))
501                 if int(hport) != 0:
502                     exposed_ports.append(client.V1ServicePort(port=int(cport),protocol=proto,node_port=int(hport),name="xport-{0}-{1}".format(proto[0].lower(),cport)))
503
504             # If there are ports to be exposed via MSB, set up the annotation for the service
505             msb_list = kwargs.get("msb_list")
506             annotations = msb.create_msb_annotation(msb_list) if msb_list else ''
507
508             # Create a ClusterIP service for access via the k8s network
509             service = _create_service_object(_create_service_name(component_name), component_name, service_ports, annotations, labels, "ClusterIP")
510             core.create_namespaced_service(namespace, service)
511             cip_service_created = True
512             deployment_description["services"].append(_create_service_name(component_name))
513
514             # If there are ports to be exposed on the k8s nodes, create a "NodePort" service
515             if len(exposed_ports) > 0:
516                 exposed_service = \
517                     _create_service_object(_create_exposed_service_name(component_name), component_name, exposed_ports, '', labels, "NodePort")
518                 core.create_namespaced_service(namespace, exposed_service)
519                 deployment_description["services"].append(_create_exposed_service_name(component_name))
520
521     except Exception as e:
522         # If the ClusterIP service was created, delete the service:
523         if cip_service_created:
524             core.delete_namespaced_service(_create_service_name(component_name), namespace)
525         # If the deployment was created but not the service, delete the deployment
526         if deployment_ok:
527             client.ExtensionsV1beta1Api().delete_namespaced_deployment(_create_deployment_name(component_name), namespace, client.V1DeleteOptions())
528         raise e
529
530     return dep, deployment_description
531
532 def undeploy(deployment_description):
533     _configure_api(deployment_description["location"])
534
535     namespace = deployment_description["namespace"]
536
537     # remove any services associated with the component
538     for service in deployment_description["services"]:
539         client.CoreV1Api().delete_namespaced_service(service, namespace)
540
541     # Have k8s delete the underlying pods and replicaset when deleting the deployment.
542     options = client.V1DeleteOptions(propagation_policy="Foreground")
543     client.ExtensionsV1beta1Api().delete_namespaced_deployment(deployment_description["deployment"], namespace, options)
544
545 def is_available(location, namespace, component_name):
546     _configure_api(location)
547     dep_status = client.AppsV1beta1Api().read_namespaced_deployment_status(_create_deployment_name(component_name), namespace)
548     # Check if the number of available replicas is equal to the number requested and that the replicas match the current spec
549     # This check can be used to verify completion of an initial deployment, a scale operation, or an update operation
550     return dep_status.status.available_replicas == dep_status.spec.replicas and dep_status.status.updated_replicas == dep_status.spec.replicas
551
552 def scale(deployment_description, replicas):
553     ''' Trigger a scaling operation by updating the replica count for the Deployment '''
554
555     def update_replica_count(spec):
556         spec.spec.replicas = replicas
557         return spec
558
559     _patch_deployment(deployment_description["location"], deployment_description["namespace"], deployment_description["deployment"], update_replica_count)
560
561 def upgrade(deployment_description, image, container_index = 0):
562     ''' Trigger a rolling upgrade by sending a new image name/tag to k8s '''
563
564     def update_image(spec):
565         spec.spec.template.spec.containers[container_index].image = image
566         return spec
567
568     _patch_deployment(deployment_description["location"], deployment_description["namespace"], deployment_description["deployment"], update_image)
569
570 def rollback(deployment_description, rollback_to=0):
571     '''
572     Undo upgrade by rolling back to a previous revision of the deployment.
573     By default, go back one revision.
574     rollback_to can be used to supply a specific revision number.
575     Returns the image for the app container and the replica count from the rolled-back deployment
576     '''
577     '''
578     2018-07-13
579     Currently this does not work due to a bug in the create_namespaced_deployment_rollback() method.
580     The k8s python client code throws an exception while processing the response from the API.
581     See:
582        - https://github.com/kubernetes-client/python/issues/491
583        - https://github.com/kubernetes/kubernetes/pull/63837
584     The fix has been merged into the master branch but is not in the latest release.
585     '''
586     _configure_api(deployment_description["location"])
587     deployment = deployment_description["deployment"]
588     namespace = deployment_description["namespace"]
589
590     # Initiate the rollback
591     client.ExtensionsV1beta1Api().create_namespaced_deployment_rollback(
592         deployment,
593         namespace,
594         client.AppsV1beta1DeploymentRollback(name=deployment, rollback_to=client.AppsV1beta1RollbackConfig(revision=rollback_to)))
595
596     # Read back the spec for the rolled-back deployment
597     spec = client.ExtensionsV1beta1Api().read_namespaced_deployment(deployment, namespace)
598     return spec.spec.template.spec.containers[0].image, spec.spec.replicas
599
600 def execute_command_in_deployment(deployment_description, command):
601     '''
602     Enumerates the pods in the k8s deployment identified by "deployment_description",
603     then executes the command (represented as an argv-style list) in "command" in
604     container 0 (the main application container) each of those pods.
605
606     Note that the sets of pods associated with a deployment can change over time.  The
607     enumeration is a snapshot at one point in time.  The command will not be executed in
608     pods that are created after the initial enumeration.   If a pod disappears after the
609     initial enumeration and before the command is executed, the attempt to execute the
610     command will fail.  This is not treated as a fatal error.
611
612     This approach is reasonable for the one current use case for "execute_command":  running a
613     script to notify a container that its configuration has changed as a result of a
614     policy change.  In this use case, the new configuration information is stored into
615     the configuration store (Consul), the pods are enumerated, and the command is executed.
616     If a pod disappears after the enumeration, the fact that the command cannot be run
617     doesn't matter--a nonexistent pod doesn't need to be reconfigured.  Similarly, a pod that
618     comes up after the enumeration will get its initial configuration from the updated version
619     in Consul.
620
621     The optimal solution here would be for k8s to provide an API call to execute a command in
622     all of the pods for a deployment.   Unfortunately, k8s does not provide such a call--the
623     only call provided by k8s operates at the pod level, not the deployment level.
624
625     Another interesting k8s factoid: there's no direct way to list the pods belong to a
626     particular k8s deployment.   The deployment code above sets a label ("k8sdeployment") on
627     the pod that has the k8s deployment name.  To list the pods, the code below queries for
628     pods with the label carrying the deployment name.
629     '''
630     location = deployment_description["location"]
631     _configure_api(location)
632     deployment = deployment_description["deployment"]
633     namespace = deployment_description["namespace"]
634
635     # Get names of all the running pods belonging to the deployment
636     pod_names = [pod.metadata.name for pod in client.CoreV1Api().list_namespaced_pod(
637         namespace = namespace,
638         label_selector = "k8sdeployment={0}".format(deployment),
639         field_selector = "status.phase=Running"
640     ).items]
641
642     def do_execute(pod_name):
643         return _execute_command_in_pod(location, namespace, pod_name, command)
644
645     # Execute command in the running pods
646     return map(do_execute, pod_names)