e73d96a533bd8992971eb85ddea965381f25a8b2
[dcaegen2/platform/plugins.git] / k8s / k8sclient / k8sclient.py
1 # ============LICENSE_START=======================================================
2 # org.onap.dcae
3 # ================================================================================
4 # Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved.
5 # Copyright (c) 2020 Pantheon.tech. All rights reserved.
6 # ================================================================================
7 # Licensed under the Apache License, Version 2.0 (the "License");
8 # you may not use this file except in compliance with the License.
9 # You may obtain a copy of the License at
10 #
11 #      http://www.apache.org/licenses/LICENSE-2.0
12 #
13 # Unless required by applicable law or agreed to in writing, software
14 # distributed under the License is distributed on an "AS IS" BASIS,
15 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 # ============LICENSE_END=========================================================
19 #
20 # ECOMP is a trademark and service mark of AT&T Intellectual Property.
21 import os
22 import re
23 import uuid
24 from kubernetes import config, client, stream
25
26 # Default values for readiness probe
27 PROBE_DEFAULT_PERIOD = 15
28 PROBE_DEFAULT_TIMEOUT = 1
29
30 # Location of k8s cluster config file ("kubeconfig")
31 K8S_CONFIG_PATH="/opt/onap/kube/kubeconfig"
32
33 # Regular expression for interval/timeout specification
34 INTERVAL_SPEC = re.compile("^([0-9]+)(s|m|h)?$")
35 # Conversion factors to seconds
36 FACTORS = {None: 1, "s": 1, "m": 60, "h": 3600}
37
38 # Regular expression for port mapping
39 # group 1: container port
40 # group 2: / + protocol
41 # group 3: protocol
42 # group 4: host port
43 PORTS = re.compile("^([0-9]+)(/(udp|UDP|tcp|TCP))?:([0-9]+)$")
44
45 def _create_deployment_name(component_name):
46     return "dep-{0}".format(component_name)[:63]
47
48 def _create_service_name(component_name):
49     return "{0}".format(component_name)[:63]
50
51 def _create_exposed_service_name(component_name):
52     return ("x{0}".format(component_name))[:63]
53
54 def _configure_api(location=None):
55     # Look for a kubernetes config file
56     if os.path.exists(K8S_CONFIG_PATH):
57         config.load_kube_config(config_file=K8S_CONFIG_PATH, context=location, persist_config=False)
58     else:
59         # Maybe we're running in a k8s container and we can use info provided by k8s
60         # We would like to use:
61         # config.load_incluster_config()
62         # but this looks into os.environ for kubernetes host and port, and from
63         # the plugin those aren't visible.   So we use the InClusterConfigLoader class,
64         # where we can set the environment to what we like.
65         # This is probably brittle!  Maybe there's a better alternative.
66         localenv = {
67             config.incluster_config.SERVICE_HOST_ENV_NAME : "kubernetes.default.svc.cluster.local",
68             config.incluster_config.SERVICE_PORT_ENV_NAME : "443"
69         }
70         config.incluster_config.InClusterConfigLoader(
71             token_filename=config.incluster_config.SERVICE_TOKEN_FILENAME,
72             cert_filename=config.incluster_config.SERVICE_CERT_FILENAME,
73             environ=localenv
74         ).load_and_set()
75
76 def _parse_interval(t):
77     """
78     Parse an interval specification
79     t can be
80        - a simple integer quantity, interpreted as seconds
81        - a string representation of a decimal integer, interpreted as seconds
82        - a string consisting of a represention of an decimal integer followed by a unit,
83          with "s" representing seconds, "m" representing minutes,
84          and "h" representing hours
85     Used for compatibility with the Docker plugin, where time intervals
86     for health checks were specified as strings with a number and a unit.
87     See 'intervalspec' above for the regular expression that's accepted.
88     """
89     m = INTERVAL_SPEC.match(str(t))
90     if m:
91         time = int(m.group(1)) * FACTORS[m.group(2)]
92     else:
93         raise ValueError("Bad interval specification: {0}".format(t))
94     return time
95
96 def _create_probe(hc, port):
97     ''' Create a Kubernetes probe based on info in the health check dictionary hc '''
98     probe_type = hc['type']
99     probe = None
100     period = _parse_interval(hc.get('interval', PROBE_DEFAULT_PERIOD))
101     timeout = _parse_interval(hc.get('timeout', PROBE_DEFAULT_TIMEOUT))
102     if probe_type in ['http', 'https']:
103         probe = client.V1Probe(
104           failure_threshold = 1,
105           initial_delay_seconds = 5,
106           period_seconds = period,
107           timeout_seconds = timeout,
108           http_get = client.V1HTTPGetAction(
109               path = hc['endpoint'],
110               port = port,
111               scheme = probe_type.upper()
112           )
113         )
114     elif probe_type in ['script', 'docker']:
115         probe = client.V1Probe(
116           failure_threshold = 1,
117           initial_delay_seconds = 5,
118           period_seconds = period,
119           timeout_seconds = timeout,
120           _exec = client.V1ExecAction(
121               command = hc['script'].split( )
122           )
123         )
124     return probe
125
126 def _create_resources(resources=None):
127     if resources is not None:
128         resources_obj = client.V1ResourceRequirements(
129           limits = resources.get("limits"),
130           requests = resources.get("requests")
131         )
132         return resources_obj
133     else:
134         return None
135
136 def _create_container_object(name, image, always_pull, **kwargs):
137     # Set up environment variables
138     # Copy any passed in environment variables
139     env = kwargs.get('env') or {}
140     env_vars = [client.V1EnvVar(name=k, value=env[k]) for k in env]
141     # Add POD_IP with the IP address of the pod running the container
142     pod_ip = client.V1EnvVarSource(field_ref = client.V1ObjectFieldSelector(field_path="status.podIP"))
143     env_vars.append(client.V1EnvVar(name="POD_IP",value_from=pod_ip))
144
145     # If a health check is specified, create a readiness/liveness probe
146     # (For an HTTP-based check, we assume it's at the first container port)
147     readiness = kwargs.get('readiness')
148     liveness = kwargs.get('liveness')
149     resources = kwargs.get('resources')
150     container_ports = kwargs.get('container_ports') or []
151
152     hc_port = container_ports[0][0] if container_ports else None
153     probe = _create_probe(readiness, hc_port) if readiness else None
154     live_probe = _create_probe(liveness, hc_port) if liveness else None
155     resources_obj = _create_resources(resources) if resources else None
156     port_objs = [client.V1ContainerPort(container_port=port, protocol=proto)
157                  for port, proto in container_ports]
158
159     # Define container for pod
160     return client.V1Container(
161         name=name,
162         image=image,
163         image_pull_policy='Always' if always_pull else 'IfNotPresent',
164         env=env_vars,
165         ports=port_objs,
166         volume_mounts=kwargs.get('volume_mounts') or [],
167         resources=resources_obj,
168         readiness_probe=probe,
169         liveness_probe=live_probe
170     )
171
172 def _create_deployment_object(component_name,
173                               containers,
174                               init_containers,
175                               replicas,
176                               volumes,
177                               labels={},
178                               pull_secrets=[]):
179
180     deployment_name = _create_deployment_name(component_name)
181
182     # Label the pod with the deployment name, so we can find it easily
183     labels.update({"k8sdeployment" : deployment_name})
184
185     # pull_secrets is a list of the names of the k8s secrets containing docker registry credentials
186     # See https://kubernetes.io/docs/concepts/containers/images/#specifying-imagepullsecrets-on-a-pod
187     ips = []
188     for secret in pull_secrets:
189         ips.append(client.V1LocalObjectReference(name=secret))
190
191     # Define pod template
192     template = client.V1PodTemplateSpec(
193         metadata=client.V1ObjectMeta(labels=labels),
194         spec=client.V1PodSpec(hostname=component_name,
195                               containers=containers,
196                               init_containers=init_containers,
197                               volumes=volumes,
198                               image_pull_secrets=ips)
199     )
200
201     # Define deployment spec
202     spec = client.ExtensionsV1beta1DeploymentSpec(
203         replicas=replicas,
204         template=template
205     )
206
207     # Create deployment object
208     deployment = client.ExtensionsV1beta1Deployment(
209         kind="Deployment",
210         metadata=client.V1ObjectMeta(name=deployment_name),
211         spec=spec
212     )
213
214     return deployment
215
216 def _create_service_object(service_name, component_name, service_ports, annotations, labels, service_type):
217     service_spec = client.V1ServiceSpec(
218         ports=service_ports,
219         selector={"app" : component_name},
220         type=service_type
221     )
222     if annotations:
223         metadata = client.V1ObjectMeta(name=_create_service_name(service_name), labels=labels, annotations=annotations)
224     else:
225         metadata = client.V1ObjectMeta(name=_create_service_name(service_name), labels=labels)
226
227     service = client.V1Service(
228         kind="Service",
229         api_version="v1",
230         metadata=metadata,
231         spec=service_spec
232     )
233     return service
234
235 def parse_ports(port_list):
236     '''
237     Parse the port list into a list of container ports (needed to create the container)
238     and to a set of port mappings to set up k8s services.
239     '''
240     container_ports = []
241     port_map = {}
242     for p in port_list:
243         m = PORTS.match(p.strip())
244         if m:
245             cport = int(m.group(1))
246             hport = int (m.group(4))
247             if m.group(3):
248                 proto = (m.group(3)).upper()
249             else:
250                 proto = "TCP"
251             container_ports.append((cport, proto))
252             port_map[(cport, proto)] = hport
253         else:
254             raise ValueError("Bad port specification: {0}".format(p))
255
256     return container_ports, port_map
257
258 def _parse_volumes(volume_list):
259     volumes = []
260     volume_mounts = []
261     for v in volume_list:
262         vname = str(uuid.uuid4())
263         vhost = v['host']['path']
264         vcontainer = v['container']['bind']
265         vro = (v['container'].get('mode') == 'ro')
266         volumes.append(client.V1Volume(name=vname, host_path=client.V1HostPathVolumeSource(path=vhost)))
267         volume_mounts.append(client.V1VolumeMount(name=vname, mount_path=vcontainer, read_only=vro))
268
269     return volumes, volume_mounts
270
271 def _add_elk_logging_sidecar(containers, volumes, volume_mounts, component_name, log_info, filebeat):
272     if not log_info or not filebeat:
273         return
274     log_dir = log_info.get("log_directory")
275     if not log_dir:
276         return
277     sidecar_volume_mounts = []
278
279     # Create the volume for component log files and volume mounts for the component and sidecar containers
280     volumes.append(client.V1Volume(name="component-log", empty_dir=client.V1EmptyDirVolumeSource()))
281     volume_mounts.append(client.V1VolumeMount(name="component-log", mount_path=log_dir))
282     sc_path = log_info.get("alternate_fb_path") or "{0}/{1}".format(filebeat["log_path"], component_name)
283     sidecar_volume_mounts.append(client.V1VolumeMount(name="component-log", mount_path=sc_path))
284
285     # Create the volume for sidecar data and the volume mount for it
286     volumes.append(client.V1Volume(name="filebeat-data", empty_dir=client.V1EmptyDirVolumeSource()))
287     sidecar_volume_mounts.append(client.V1VolumeMount(name="filebeat-data", mount_path=filebeat["data_path"]))
288
289     # Create the volume for the sidecar configuration data and the volume mount for it
290     # The configuration data is in a k8s ConfigMap that should be created when DCAE is installed.
291     volumes.append(
292         client.V1Volume(name="filebeat-conf", config_map=client.V1ConfigMapVolumeSource(name=filebeat["config_map"])))
293     sidecar_volume_mounts.append(
294         client.V1VolumeMount(name="filebeat-conf", mount_path=filebeat["config_path"], sub_path=filebeat["config_subpath"]))
295
296     # Finally create the container for the sidecar
297     containers.append(_create_container_object("filebeat", filebeat["image"], False, volume_mounts=sidecar_volume_mounts))
298
299 def _add_tls_init_container(init_containers, volumes, volume_mounts, tls_info, tls_config):
300     #   Adds an InitContainer to the pod to set up TLS certificate information.  For components that act as a
301     #   server(tls_info["use_tls"] is True), the InitContainer will populate a directory with server and CA certificate
302     #   materials in various formats.   For other components (tls_info["use_tls"] is False, or tls_info is not specified),
303     #   the InitContainer will populate a directory with CA certificate materials in PEM and JKS formats.
304     #   In either case, the certificate directory is mounted onto the component container filesystem at the location
305     #   specified by tls_info["component_cert_dir"], if present, otherwise at the configured default mount point
306     #   (tls_config["component_cert_dir"]).
307
308     cert_directory = tls_info.get("cert_directory") or tls_config.get("component_cert_dir")
309     env = {}
310     env["TLS_SERVER"] = "true" if tls_info.get("use_tls") else "false"
311
312     # Create the certificate volume and volume mounts
313     volumes.append(client.V1Volume(name="tls-info", empty_dir=client.V1EmptyDirVolumeSource()))
314     volume_mounts.append(client.V1VolumeMount(name="tls-info", mount_path=cert_directory))
315     init_volume_mounts = [client.V1VolumeMount(name="tls-info", mount_path=tls_config["cert_path"])]
316
317     # Create the init container
318     init_containers.append(_create_container_object("init-tls", tls_config["image"], False, volume_mounts=init_volume_mounts, env=env))
319
320 def _process_port_map(port_map):
321     service_ports = []      # Ports exposed internally on the k8s network
322     exposed_ports = []      # Ports to be mapped to ports on the k8s nodes via NodePort
323     for (cport, proto), hport in port_map.items():
324         name = "xport-{0}-{1}".format(proto[0].lower(), cport)
325         cport = int(cport)
326         hport = int(hport)
327         service_ports.append(client.V1ServicePort(port=cport, protocol=proto, name=name[1:]))
328         if hport != 0:
329             exposed_ports.append(client.V1ServicePort(port=cport, protocol=proto, node_port=hport, name=name))
330     return service_ports, exposed_ports
331
332 def _service_exists(location, namespace, component_name):
333     exists = False
334     try:
335         _configure_api(location)
336         client.CoreV1Api().read_namespaced_service(_create_service_name(component_name), namespace)
337         exists = True
338     except client.rest.ApiException:
339         pass
340
341     return exists
342
343 def _patch_deployment(location, namespace, deployment, modify):
344     '''
345     Gets the current spec for 'deployment' in 'namespace'
346     in the k8s cluster at 'location',
347     uses the 'modify' function to change the spec,
348     then sends the updated spec to k8s.
349     '''
350     _configure_api(location)
351
352     # Get deployment spec
353     spec = client.ExtensionsV1beta1Api().read_namespaced_deployment(deployment, namespace)
354
355     # Apply changes to spec
356     spec = modify(spec)
357
358     # Patch the deploy with updated spec
359     client.ExtensionsV1beta1Api().patch_namespaced_deployment(deployment, namespace, spec)
360
361 def _execute_command_in_pod(location, namespace, pod_name, command):
362     '''
363     Execute the command (specified by an argv-style list in  the "command" parameter) in
364     the specified pod in the specified namespace at the specified location.
365     For now at least, we use this only to
366     run a notification script in a pod after a configuration change.
367
368     The straightforward way to do this is with the V1 Core API function "connect_get_namespaced_pod_exec".
369     Unfortunately due to a bug/limitation in the Python client library, we can't call it directly.
370     We have to make the API call through a Websocket connection using the kubernetes.stream wrapper API.
371     I'm following the example code at https://github.com/kubernetes-client/python/blob/master/examples/exec.py.
372     There are several issues tracking this, in various states.  It isn't clear that there will ever
373     be a fix.
374         - https://github.com/kubernetes-client/python/issues/58
375         - https://github.com/kubernetes-client/python/issues/409
376         - https://github.com/kubernetes-client/python/issues/526
377
378     The main consequence of the workaround using "stream" is that the caller does not get an indication
379     of the exit code returned by the command when it completes execution.   It turns out that the
380     original implementation of notification in the Docker plugin did not use this result, so we can
381     still match the original notification functionality.
382
383     The "stream" approach returns a string containing any output sent by the command to stdout or stderr.
384     We'll return that so it can logged.
385     '''
386     _configure_api(location)
387     try:
388         output = stream.stream(client.CoreV1Api().connect_get_namespaced_pod_exec,
389                              name=pod_name,
390                              namespace=namespace,
391                              command=command,
392                              stdout=True,
393                              stderr=True,
394                             stdin=False,
395                             tty=False)
396     except client.rest.ApiException as e:
397         # If the exception indicates the pod wasn't found,  it's not a fatal error.
398         # It existed when we enumerated the pods for the deployment but no longer exists.
399         # Unfortunately, the only way to distinguish a pod not found from any other error
400         # is by looking at the reason text.
401         # (The ApiException's "status" field should contain the HTTP status code, which would
402         # be 404 if the pod isn't found, but empirical testing reveals that "status" is set
403         # to zero.)
404         if "404 not found" in e.reason.lower():
405             output = "Pod not found"
406         else:
407             raise e
408
409     return {"pod" : pod_name, "output" : output}
410
411 def deploy(namespace, component_name, image, replicas, always_pull, k8sconfig, **kwargs):
412     '''
413     This will create a k8s Deployment and, if needed, one or two k8s Services.
414     (We are being opinionated in our use of k8s... this code decides what k8s abstractions and features to use.
415     We're not exposing k8s to the component developer and the blueprint author.
416     This is a conscious choice.  We want to use k8s in a controlled, consistent way, and we want to hide
417     the details from the component developer and the blueprint author.)
418
419     namespace:  the Kubernetes namespace into which the component is deployed
420     component_name:  the component name, used to derive names of Kubernetes entities
421     image: the docker image for the component being deployed
422     replica: the number of instances of the component to be deployed
423     always_pull: boolean flag, indicating that Kubernetes should always pull a new copy of
424        the Docker image for the component, even if it is already present on the Kubernetes node.
425     k8sconfig contains:
426         - image_pull_secrets: a list of names of image pull secrets that can be used for retrieving images.
427           (DON'T PANIC:  these are just the names of secrets held in the Kubernetes secret store.)
428         - filebeat: a dictionary of filebeat sidecar parameters:
429             "log_path" : mount point for log volume in filebeat container
430             "data_path" : mount point for data volume in filebeat container
431             "config_path" : mount point for config volume in filebeat container
432             "config_subpath" :  subpath for config data in filebeat container
433             "config_map" : ConfigMap holding the filebeat configuration
434             "image": Docker image to use for filebeat
435         - tls: a dictionary of TLS-related information:
436             "cert_path": mount point for certificate volume in init container
437             "image": Docker image to use for TLS init container
438             "component_cert_dir" : default mount point for certs
439     kwargs may have:
440         - volumes:  array of volume objects, where a volume object is:
441             {"host":{"path": "/path/on/host"}, "container":{"bind":"/path/on/container","mode":"rw_or_ro"}
442         - ports: array of strings in the form "container_port:host_port"
443         - env: map of name-value pairs ( {name0: value0, name1: value1...}
444         - log_info: an object with info for setting up ELK logging, with the form:
445             {"log_directory": "/path/to/container/log/directory", "alternate_fb_path" : "/alternate/sidecar/log/path"}
446         - tls_info: an object with info for setting up TLS (HTTPS), with the form:
447             {"use_tls": true, "cert_directory": "/path/to/container/cert/directory" }
448         - labels: dict with label-name/label-value pairs, e.g. {"cfydeployment" : "lsdfkladflksdfsjkl", "cfynode":"mycomponent"}
449             These label will be set on all the pods deployed as a result of this deploy() invocation.
450         - resources: dict with optional "limits" and "requests" resource requirements, each a dict containing:
451             - cpu:    number CPU usage, like 0.5
452             - memory: string memory requirement, like "2Gi"
453         - readiness: dict with health check info; if present, used to create a readiness probe for the main container.  Includes:
454             - type: check is done by making http(s) request to an endpoint ("http", "https") or by exec'ing a script in the container ("script", "docker")
455             - interval: period (in seconds) between probes
456             - timeout:  time (in seconds) to allow a probe to complete
457             - endpoint: the path portion of the URL that points to the readiness endpoint for "http" and "https" types
458             - path: the full path to the script to be executed in the container for "script" and "docker" types
459         - liveness: dict with health check info; if present, used to create a liveness probe for the main container.  Includes:
460             - type: check is done by making http(s) request to an endpoint ("http", "https") or by exec'ing a script in the container ("script", "docker")
461             - interval: period (in seconds) between probes
462             - timeout:  time (in seconds) to allow a probe to complete
463             - endpoint: the path portion of the URL that points to the liveness endpoint for "http" and "https" types
464             - path: the full path to the script to be executed in the container for "script" and "docker" types
465         - k8s_location: name of the Kubernetes location (cluster) where the component is to be deployed
466
467     '''
468
469     deployment_ok = False
470     cip_service_created = False
471     deployment_description = {
472         "namespace": namespace,
473         "location" : kwargs.get("k8s_location"),
474         "deployment": '',
475         "services": []
476     }
477
478     try:
479
480         # Get API handles
481         _configure_api(kwargs.get("k8s_location"))
482         core = client.CoreV1Api()
483         ext = client.ExtensionsV1beta1Api()
484
485         # Parse the port mapping
486         container_ports, port_map = parse_ports(kwargs.get("ports", []))
487
488         # Parse the volumes list into volumes and volume_mounts for the deployment
489         volumes, volume_mounts = _parse_volumes(kwargs.get("volumes", []))
490
491         # Initialize the list of containers that will be part of the pod
492         containers = []
493         init_containers = []
494
495         # Set up the ELK logging sidecar container, if needed
496         _add_elk_logging_sidecar(containers, volumes, volume_mounts, component_name, kwargs.get("log_info"), k8sconfig.get("filebeat"))
497
498         # Set up TLS information
499         _add_tls_init_container(init_containers, volumes, volume_mounts, kwargs.get("tls_info") or {}, k8sconfig.get("tls"))
500
501         # Create the container for the component
502         # Make it the first container in the pod
503         container_args = {key: kwargs.get(key) for key in ("env", "readiness", "liveness", "resources")}
504         container_args['container_ports'] = container_ports
505         container_args['volume_mounts'] = volume_mounts
506         containers.insert(0, _create_container_object(component_name, image, always_pull, **container_args))
507
508         # Build the k8s Deployment object
509         labels = kwargs.get("labels", {})
510         labels["app"] = component_name
511         dep = _create_deployment_object(component_name, containers, init_containers, replicas, volumes, labels, pull_secrets=k8sconfig["image_pull_secrets"])
512
513         # Have k8s deploy it
514         ext.create_namespaced_deployment(namespace, dep)
515         deployment_ok = True
516         deployment_description["deployment"] = _create_deployment_name(component_name)
517
518         # Create service(s), if a port mapping is specified
519         if port_map:
520             service_ports, exposed_ports = _process_port_map(port_map)
521
522             # Create a ClusterIP service for access via the k8s network
523             service = _create_service_object(_create_service_name(component_name), component_name, service_ports, None, labels, "ClusterIP")
524             core.create_namespaced_service(namespace, service)
525             cip_service_created = True
526             deployment_description["services"].append(_create_service_name(component_name))
527
528             # If there are ports to be exposed on the k8s nodes, create a "NodePort" service
529             if exposed_ports:
530                 exposed_service = \
531                     _create_service_object(_create_exposed_service_name(component_name), component_name, exposed_ports, '', labels, "NodePort")
532                 core.create_namespaced_service(namespace, exposed_service)
533                 deployment_description["services"].append(_create_exposed_service_name(component_name))
534
535     except Exception as e:
536         # If the ClusterIP service was created, delete the service:
537         if cip_service_created:
538             core.delete_namespaced_service(_create_service_name(component_name), namespace)
539         # If the deployment was created but not the service, delete the deployment
540         if deployment_ok:
541             client.ExtensionsV1beta1Api().delete_namespaced_deployment(_create_deployment_name(component_name), namespace, body=client.V1DeleteOptions())
542         raise e
543
544     return dep, deployment_description
545
546 def undeploy(deployment_description):
547     _configure_api(deployment_description["location"])
548
549     namespace = deployment_description["namespace"]
550
551     # remove any services associated with the component
552     for service in deployment_description["services"]:
553         client.CoreV1Api().delete_namespaced_service(service, namespace)
554
555     # Have k8s delete the underlying pods and replicaset when deleting the deployment.
556     options = client.V1DeleteOptions(propagation_policy="Foreground")
557     client.ExtensionsV1beta1Api().delete_namespaced_deployment(deployment_description["deployment"], namespace, body=options)
558
559 def is_available(location, namespace, component_name):
560     _configure_api(location)
561     dep_status = client.AppsV1beta1Api().read_namespaced_deployment_status(_create_deployment_name(component_name), namespace)
562     # Check if the number of available replicas is equal to the number requested and that the replicas match the current spec
563     # This check can be used to verify completion of an initial deployment, a scale operation, or an update operation
564     return dep_status.status.available_replicas == dep_status.spec.replicas and dep_status.status.updated_replicas == dep_status.spec.replicas
565
566 def scale(deployment_description, replicas):
567     ''' Trigger a scaling operation by updating the replica count for the Deployment '''
568
569     def update_replica_count(spec):
570         spec.spec.replicas = replicas
571         return spec
572
573     _patch_deployment(deployment_description["location"], deployment_description["namespace"], deployment_description["deployment"], update_replica_count)
574
575 def upgrade(deployment_description, image, container_index = 0):
576     ''' Trigger a rolling upgrade by sending a new image name/tag to k8s '''
577
578     def update_image(spec):
579         spec.spec.template.spec.containers[container_index].image = image
580         return spec
581
582     _patch_deployment(deployment_description["location"], deployment_description["namespace"], deployment_description["deployment"], update_image)
583
584 def rollback(deployment_description, rollback_to=0):
585     '''
586     Undo upgrade by rolling back to a previous revision of the deployment.
587     By default, go back one revision.
588     rollback_to can be used to supply a specific revision number.
589     Returns the image for the app container and the replica count from the rolled-back deployment
590     '''
591     '''
592     2018-07-13
593     Currently this does not work due to a bug in the create_namespaced_deployment_rollback() method.
594     The k8s python client code throws an exception while processing the response from the API.
595     See:
596        - https://github.com/kubernetes-client/python/issues/491
597        - https://github.com/kubernetes/kubernetes/pull/63837
598     The fix has been merged into the master branch but is not in the latest release.
599     '''
600     _configure_api(deployment_description["location"])
601     deployment = deployment_description["deployment"]
602     namespace = deployment_description["namespace"]
603
604     # Initiate the rollback
605     client.ExtensionsV1beta1Api().create_namespaced_deployment_rollback(
606         deployment,
607         namespace,
608         client.AppsV1beta1DeploymentRollback(name=deployment, rollback_to=client.AppsV1beta1RollbackConfig(revision=rollback_to)))
609
610     # Read back the spec for the rolled-back deployment
611     spec = client.ExtensionsV1beta1Api().read_namespaced_deployment(deployment, namespace)
612     return spec.spec.template.spec.containers[0].image, spec.spec.replicas
613
614 def execute_command_in_deployment(deployment_description, command):
615     '''
616     Enumerates the pods in the k8s deployment identified by "deployment_description",
617     then executes the command (represented as an argv-style list) in "command" in
618     container 0 (the main application container) each of those pods.
619
620     Note that the sets of pods associated with a deployment can change over time.  The
621     enumeration is a snapshot at one point in time.  The command will not be executed in
622     pods that are created after the initial enumeration.   If a pod disappears after the
623     initial enumeration and before the command is executed, the attempt to execute the
624     command will fail.  This is not treated as a fatal error.
625
626     This approach is reasonable for the one current use case for "execute_command":  running a
627     script to notify a container that its configuration has changed as a result of a
628     policy change.  In this use case, the new configuration information is stored into
629     the configuration store (Consul), the pods are enumerated, and the command is executed.
630     If a pod disappears after the enumeration, the fact that the command cannot be run
631     doesn't matter--a nonexistent pod doesn't need to be reconfigured.  Similarly, a pod that
632     comes up after the enumeration will get its initial configuration from the updated version
633     in Consul.
634
635     The optimal solution here would be for k8s to provide an API call to execute a command in
636     all of the pods for a deployment.   Unfortunately, k8s does not provide such a call--the
637     only call provided by k8s operates at the pod level, not the deployment level.
638
639     Another interesting k8s factoid: there's no direct way to list the pods belong to a
640     particular k8s deployment.   The deployment code above sets a label ("k8sdeployment") on
641     the pod that has the k8s deployment name.  To list the pods, the code below queries for
642     pods with the label carrying the deployment name.
643     '''
644     location = deployment_description["location"]
645     _configure_api(location)
646     deployment = deployment_description["deployment"]
647     namespace = deployment_description["namespace"]
648
649     # Get names of all the running pods belonging to the deployment
650     pod_names = [pod.metadata.name for pod in client.CoreV1Api().list_namespaced_pod(
651         namespace = namespace,
652         label_selector = "k8sdeployment={0}".format(deployment),
653         field_selector = "status.phase=Running"
654     ).items]
655
656     # Execute command in the running pods
657     return [_execute_command_in_pod(location, namespace, pod_name, command)
658             for pod_name in pod_names]