move plugins from from ccsdk to dcaegen2
[dcaegen2/platform/plugins.git] / k8s / k8sclient / k8sclient.py
1 # ============LICENSE_START=======================================================
2 # org.onap.dcae
3 # ================================================================================
4 # Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved.
5 # Copyright (c) 2020 Pantheon.tech. All rights reserved.
6 # ================================================================================
7 # Licensed under the Apache License, Version 2.0 (the "License");
8 # you may not use this file except in compliance with the License.
9 # You may obtain a copy of the License at
10 #
11 #      http://www.apache.org/licenses/LICENSE-2.0
12 #
13 # Unless required by applicable law or agreed to in writing, software
14 # distributed under the License is distributed on an "AS IS" BASIS,
15 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 # ============LICENSE_END=========================================================
19 #
20 import os
21 import re
22 import uuid
23 from kubernetes import config, client, stream
24
25 # Default values for readiness probe
26 PROBE_DEFAULT_PERIOD = 15
27 PROBE_DEFAULT_TIMEOUT = 1
28
29 # Location of k8s cluster config file ("kubeconfig")
30 K8S_CONFIG_PATH="/opt/onap/kube/kubeconfig"
31
32 # Regular expression for interval/timeout specification
33 INTERVAL_SPEC = re.compile("^([0-9]+)(s|m|h)?$")
34 # Conversion factors to seconds
35 FACTORS = {None: 1, "s": 1, "m": 60, "h": 3600}
36
37 # Regular expression for port mapping
38 # group 1: container port
39 # group 2: / + protocol
40 # group 3: protocol
41 # group 4: host port
42 PORTS = re.compile("^([0-9]+)(/(udp|UDP|tcp|TCP))?:([0-9]+)$")
43
44 def _create_deployment_name(component_name):
45     return "dep-{0}".format(component_name)[:63]
46
47 def _create_service_name(component_name):
48     return "{0}".format(component_name)[:63]
49
50 def _create_exposed_service_name(component_name):
51     return ("x{0}".format(component_name))[:63]
52
53 def _configure_api(location=None):
54     # Look for a kubernetes config file
55     if os.path.exists(K8S_CONFIG_PATH):
56         config.load_kube_config(config_file=K8S_CONFIG_PATH, context=location, persist_config=False)
57     else:
58         # Maybe we're running in a k8s container and we can use info provided by k8s
59         # We would like to use:
60         # config.load_incluster_config()
61         # but this looks into os.environ for kubernetes host and port, and from
62         # the plugin those aren't visible.   So we use the InClusterConfigLoader class,
63         # where we can set the environment to what we like.
64         # This is probably brittle!  Maybe there's a better alternative.
65         localenv = {
66             config.incluster_config.SERVICE_HOST_ENV_NAME : "kubernetes.default.svc.cluster.local",
67             config.incluster_config.SERVICE_PORT_ENV_NAME : "443"
68         }
69         config.incluster_config.InClusterConfigLoader(
70             token_filename=config.incluster_config.SERVICE_TOKEN_FILENAME,
71             cert_filename=config.incluster_config.SERVICE_CERT_FILENAME,
72             environ=localenv
73         ).load_and_set()
74
75 def _parse_interval(t):
76     """
77     Parse an interval specification
78     t can be
79        - a simple integer quantity, interpreted as seconds
80        - a string representation of a decimal integer, interpreted as seconds
81        - a string consisting of a represention of an decimal integer followed by a unit,
82          with "s" representing seconds, "m" representing minutes,
83          and "h" representing hours
84     Used for compatibility with the Docker plugin, where time intervals
85     for health checks were specified as strings with a number and a unit.
86     See 'intervalspec' above for the regular expression that's accepted.
87     """
88     m = INTERVAL_SPEC.match(str(t))
89     if m:
90         time = int(m.group(1)) * FACTORS[m.group(2)]
91     else:
92         raise ValueError("Bad interval specification: {0}".format(t))
93     return time
94
95 def _create_probe(hc, port):
96     ''' Create a Kubernetes probe based on info in the health check dictionary hc '''
97     probe_type = hc['type']
98     probe = None
99     period = _parse_interval(hc.get('interval', PROBE_DEFAULT_PERIOD))
100     timeout = _parse_interval(hc.get('timeout', PROBE_DEFAULT_TIMEOUT))
101     if probe_type in ['http', 'https']:
102         probe = client.V1Probe(
103           failure_threshold = 1,
104           initial_delay_seconds = 5,
105           period_seconds = period,
106           timeout_seconds = timeout,
107           http_get = client.V1HTTPGetAction(
108               path = hc['endpoint'],
109               port = port,
110               scheme = probe_type.upper()
111           )
112         )
113     elif probe_type in ['script', 'docker']:
114         probe = client.V1Probe(
115           failure_threshold = 1,
116           initial_delay_seconds = 5,
117           period_seconds = period,
118           timeout_seconds = timeout,
119           _exec = client.V1ExecAction(
120               command = hc['script'].split( )
121           )
122         )
123     return probe
124
125 def _create_resources(resources=None):
126     if resources is not None:
127         resources_obj = client.V1ResourceRequirements(
128           limits = resources.get("limits"),
129           requests = resources.get("requests")
130         )
131         return resources_obj
132     else:
133         return None
134
135 def _create_container_object(name, image, always_pull, **kwargs):
136     # Set up environment variables
137     # Copy any passed in environment variables
138     env = kwargs.get('env') or {}
139     env_vars = [client.V1EnvVar(name=k, value=env[k]) for k in env]
140     # Add POD_IP with the IP address of the pod running the container
141     pod_ip = client.V1EnvVarSource(field_ref = client.V1ObjectFieldSelector(field_path="status.podIP"))
142     env_vars.append(client.V1EnvVar(name="POD_IP",value_from=pod_ip))
143
144     # If a health check is specified, create a readiness/liveness probe
145     # (For an HTTP-based check, we assume it's at the first container port)
146     readiness = kwargs.get('readiness')
147     liveness = kwargs.get('liveness')
148     resources = kwargs.get('resources')
149     container_ports = kwargs.get('container_ports') or []
150
151     hc_port = container_ports[0][0] if container_ports else None
152     probe = _create_probe(readiness, hc_port) if readiness else None
153     live_probe = _create_probe(liveness, hc_port) if liveness else None
154     resources_obj = _create_resources(resources) if resources else None
155     port_objs = [client.V1ContainerPort(container_port=port, protocol=proto)
156                  for port, proto in container_ports]
157
158     # Define container for pod
159     return client.V1Container(
160         name=name,
161         image=image,
162         image_pull_policy='Always' if always_pull else 'IfNotPresent',
163         env=env_vars,
164         ports=port_objs,
165         volume_mounts=kwargs.get('volume_mounts') or [],
166         resources=resources_obj,
167         readiness_probe=probe,
168         liveness_probe=live_probe
169     )
170
171 def _create_deployment_object(component_name,
172                               containers,
173                               init_containers,
174                               replicas,
175                               volumes,
176                               labels={},
177                               pull_secrets=[]):
178
179     deployment_name = _create_deployment_name(component_name)
180
181     # Label the pod with the deployment name, so we can find it easily
182     labels.update({"k8sdeployment" : deployment_name})
183
184     # pull_secrets is a list of the names of the k8s secrets containing docker registry credentials
185     # See https://kubernetes.io/docs/concepts/containers/images/#specifying-imagepullsecrets-on-a-pod
186     ips = []
187     for secret in pull_secrets:
188         ips.append(client.V1LocalObjectReference(name=secret))
189
190     # Define pod template
191     template = client.V1PodTemplateSpec(
192         metadata=client.V1ObjectMeta(labels=labels),
193         spec=client.V1PodSpec(hostname=component_name,
194                               containers=containers,
195                               init_containers=init_containers,
196                               volumes=volumes,
197                               image_pull_secrets=ips)
198     )
199
200     # Define deployment spec
201     spec = client.ExtensionsV1beta1DeploymentSpec(
202         replicas=replicas,
203         template=template
204     )
205
206     # Create deployment object
207     deployment = client.ExtensionsV1beta1Deployment(
208         kind="Deployment",
209         metadata=client.V1ObjectMeta(name=deployment_name),
210         spec=spec
211     )
212
213     return deployment
214
215 def _create_service_object(service_name, component_name, service_ports, annotations, labels, service_type):
216     service_spec = client.V1ServiceSpec(
217         ports=service_ports,
218         selector={"app" : component_name},
219         type=service_type
220     )
221     if annotations:
222         metadata = client.V1ObjectMeta(name=_create_service_name(service_name), labels=labels, annotations=annotations)
223     else:
224         metadata = client.V1ObjectMeta(name=_create_service_name(service_name), labels=labels)
225
226     service = client.V1Service(
227         kind="Service",
228         api_version="v1",
229         metadata=metadata,
230         spec=service_spec
231     )
232     return service
233
234 def parse_ports(port_list):
235     '''
236     Parse the port list into a list of container ports (needed to create the container)
237     and to a set of port mappings to set up k8s services.
238     '''
239     container_ports = []
240     port_map = {}
241     for p in port_list:
242         m = PORTS.match(p.strip())
243         if m:
244             cport = int(m.group(1))
245             hport = int (m.group(4))
246             if m.group(3):
247                 proto = (m.group(3)).upper()
248             else:
249                 proto = "TCP"
250             container_ports.append((cport, proto))
251             port_map[(cport, proto)] = hport
252         else:
253             raise ValueError("Bad port specification: {0}".format(p))
254
255     return container_ports, port_map
256
257 def _parse_volumes(volume_list):
258     volumes = []
259     volume_mounts = []
260     for v in volume_list:
261         vname = str(uuid.uuid4())
262         vhost = v['host']['path']
263         vcontainer = v['container']['bind']
264         vro = (v['container'].get('mode') == 'ro')
265         volumes.append(client.V1Volume(name=vname, host_path=client.V1HostPathVolumeSource(path=vhost)))
266         volume_mounts.append(client.V1VolumeMount(name=vname, mount_path=vcontainer, read_only=vro))
267
268     return volumes, volume_mounts
269
270 def _add_elk_logging_sidecar(containers, volumes, volume_mounts, component_name, log_info, filebeat):
271     if not log_info or not filebeat:
272         return
273     log_dir = log_info.get("log_directory")
274     if not log_dir:
275         return
276     sidecar_volume_mounts = []
277
278     # Create the volume for component log files and volume mounts for the component and sidecar containers
279     volumes.append(client.V1Volume(name="component-log", empty_dir=client.V1EmptyDirVolumeSource()))
280     volume_mounts.append(client.V1VolumeMount(name="component-log", mount_path=log_dir))
281     sc_path = log_info.get("alternate_fb_path") or "{0}/{1}".format(filebeat["log_path"], component_name)
282     sidecar_volume_mounts.append(client.V1VolumeMount(name="component-log", mount_path=sc_path))
283
284     # Create the volume for sidecar data and the volume mount for it
285     volumes.append(client.V1Volume(name="filebeat-data", empty_dir=client.V1EmptyDirVolumeSource()))
286     sidecar_volume_mounts.append(client.V1VolumeMount(name="filebeat-data", mount_path=filebeat["data_path"]))
287
288     # Create the volume for the sidecar configuration data and the volume mount for it
289     # The configuration data is in a k8s ConfigMap that should be created when DCAE is installed.
290     volumes.append(
291         client.V1Volume(name="filebeat-conf", config_map=client.V1ConfigMapVolumeSource(name=filebeat["config_map"])))
292     sidecar_volume_mounts.append(
293         client.V1VolumeMount(name="filebeat-conf", mount_path=filebeat["config_path"], sub_path=filebeat["config_subpath"]))
294
295     # Finally create the container for the sidecar
296     containers.append(_create_container_object("filebeat", filebeat["image"], False, volume_mounts=sidecar_volume_mounts))
297
298 def _add_tls_init_container(init_containers, volumes, volume_mounts, tls_info, tls_config):
299     #   Adds an InitContainer to the pod to set up TLS certificate information.  For components that act as a
300     #   server(tls_info["use_tls"] is True), the InitContainer will populate a directory with server and CA certificate
301     #   materials in various formats.   For other components (tls_info["use_tls"] is False, or tls_info is not specified),
302     #   the InitContainer will populate a directory with CA certificate materials in PEM and JKS formats.
303     #   In either case, the certificate directory is mounted onto the component container filesystem at the location
304     #   specified by tls_info["component_cert_dir"], if present, otherwise at the configured default mount point
305     #   (tls_config["component_cert_dir"]).
306
307     cert_directory = tls_info.get("cert_directory") or tls_config.get("component_cert_dir")
308     env = {}
309     env["TLS_SERVER"] = "true" if tls_info.get("use_tls") else "false"
310
311     # Create the certificate volume and volume mounts
312     volumes.append(client.V1Volume(name="tls-info", empty_dir=client.V1EmptyDirVolumeSource()))
313     volume_mounts.append(client.V1VolumeMount(name="tls-info", mount_path=cert_directory))
314     init_volume_mounts = [client.V1VolumeMount(name="tls-info", mount_path=tls_config["cert_path"])]
315
316     # Create the init container
317     init_containers.append(_create_container_object("init-tls", tls_config["image"], False, volume_mounts=init_volume_mounts, env=env))
318
319 def _process_port_map(port_map):
320     service_ports = []      # Ports exposed internally on the k8s network
321     exposed_ports = []      # Ports to be mapped to ports on the k8s nodes via NodePort
322     for (cport, proto), hport in port_map.items():
323         name = "xport-{0}-{1}".format(proto[0].lower(), cport)
324         cport = int(cport)
325         hport = int(hport)
326         service_ports.append(client.V1ServicePort(port=cport, protocol=proto, name=name[1:]))
327         if hport != 0:
328             exposed_ports.append(client.V1ServicePort(port=cport, protocol=proto, node_port=hport, name=name))
329     return service_ports, exposed_ports
330
331 def _service_exists(location, namespace, component_name):
332     exists = False
333     try:
334         _configure_api(location)
335         client.CoreV1Api().read_namespaced_service(_create_service_name(component_name), namespace)
336         exists = True
337     except client.rest.ApiException:
338         pass
339
340     return exists
341
342 def _patch_deployment(location, namespace, deployment, modify):
343     '''
344     Gets the current spec for 'deployment' in 'namespace'
345     in the k8s cluster at 'location',
346     uses the 'modify' function to change the spec,
347     then sends the updated spec to k8s.
348     '''
349     _configure_api(location)
350
351     # Get deployment spec
352     spec = client.ExtensionsV1beta1Api().read_namespaced_deployment(deployment, namespace)
353
354     # Apply changes to spec
355     spec = modify(spec)
356
357     # Patch the deploy with updated spec
358     client.ExtensionsV1beta1Api().patch_namespaced_deployment(deployment, namespace, spec)
359
360 def _execute_command_in_pod(location, namespace, pod_name, command):
361     '''
362     Execute the command (specified by an argv-style list in  the "command" parameter) in
363     the specified pod in the specified namespace at the specified location.
364     For now at least, we use this only to
365     run a notification script in a pod after a configuration change.
366
367     The straightforward way to do this is with the V1 Core API function "connect_get_namespaced_pod_exec".
368     Unfortunately due to a bug/limitation in the Python client library, we can't call it directly.
369     We have to make the API call through a Websocket connection using the kubernetes.stream wrapper API.
370     I'm following the example code at https://github.com/kubernetes-client/python/blob/master/examples/exec.py.
371     There are several issues tracking this, in various states.  It isn't clear that there will ever
372     be a fix.
373         - https://github.com/kubernetes-client/python/issues/58
374         - https://github.com/kubernetes-client/python/issues/409
375         - https://github.com/kubernetes-client/python/issues/526
376
377     The main consequence of the workaround using "stream" is that the caller does not get an indication
378     of the exit code returned by the command when it completes execution.   It turns out that the
379     original implementation of notification in the Docker plugin did not use this result, so we can
380     still match the original notification functionality.
381
382     The "stream" approach returns a string containing any output sent by the command to stdout or stderr.
383     We'll return that so it can logged.
384     '''
385     _configure_api(location)
386     try:
387         output = stream.stream(client.CoreV1Api().connect_get_namespaced_pod_exec,
388                              name=pod_name,
389                              namespace=namespace,
390                              command=command,
391                              stdout=True,
392                              stderr=True,
393                             stdin=False,
394                             tty=False)
395     except client.rest.ApiException as e:
396         # If the exception indicates the pod wasn't found,  it's not a fatal error.
397         # It existed when we enumerated the pods for the deployment but no longer exists.
398         # Unfortunately, the only way to distinguish a pod not found from any other error
399         # is by looking at the reason text.
400         # (The ApiException's "status" field should contain the HTTP status code, which would
401         # be 404 if the pod isn't found, but empirical testing reveals that "status" is set
402         # to zero.)
403         if "404 not found" in e.reason.lower():
404             output = "Pod not found"
405         else:
406             raise e
407
408     return {"pod" : pod_name, "output" : output}
409
410 def deploy(namespace, component_name, image, replicas, always_pull, k8sconfig, **kwargs):
411     '''
412     This will create a k8s Deployment and, if needed, one or two k8s Services.
413     (We are being opinionated in our use of k8s... this code decides what k8s abstractions and features to use.
414     We're not exposing k8s to the component developer and the blueprint author.
415     This is a conscious choice.  We want to use k8s in a controlled, consistent way, and we want to hide
416     the details from the component developer and the blueprint author.)
417
418     namespace:  the Kubernetes namespace into which the component is deployed
419     component_name:  the component name, used to derive names of Kubernetes entities
420     image: the docker image for the component being deployed
421     replica: the number of instances of the component to be deployed
422     always_pull: boolean flag, indicating that Kubernetes should always pull a new copy of
423        the Docker image for the component, even if it is already present on the Kubernetes node.
424     k8sconfig contains:
425         - image_pull_secrets: a list of names of image pull secrets that can be used for retrieving images.
426           (DON'T PANIC:  these are just the names of secrets held in the Kubernetes secret store.)
427         - filebeat: a dictionary of filebeat sidecar parameters:
428             "log_path" : mount point for log volume in filebeat container
429             "data_path" : mount point for data volume in filebeat container
430             "config_path" : mount point for config volume in filebeat container
431             "config_subpath" :  subpath for config data in filebeat container
432             "config_map" : ConfigMap holding the filebeat configuration
433             "image": Docker image to use for filebeat
434         - tls: a dictionary of TLS-related information:
435             "cert_path": mount point for certificate volume in init container
436             "image": Docker image to use for TLS init container
437             "component_cert_dir" : default mount point for certs
438     kwargs may have:
439         - volumes:  array of volume objects, where a volume object is:
440             {"host":{"path": "/path/on/host"}, "container":{"bind":"/path/on/container","mode":"rw_or_ro"}
441         - ports: array of strings in the form "container_port:host_port"
442         - env: map of name-value pairs ( {name0: value0, name1: value1...}
443         - log_info: an object with info for setting up ELK logging, with the form:
444             {"log_directory": "/path/to/container/log/directory", "alternate_fb_path" : "/alternate/sidecar/log/path"}
445         - tls_info: an object with info for setting up TLS (HTTPS), with the form:
446             {"use_tls": true, "cert_directory": "/path/to/container/cert/directory" }
447         - labels: dict with label-name/label-value pairs, e.g. {"cfydeployment" : "lsdfkladflksdfsjkl", "cfynode":"mycomponent"}
448             These label will be set on all the pods deployed as a result of this deploy() invocation.
449         - resources: dict with optional "limits" and "requests" resource requirements, each a dict containing:
450             - cpu:    number CPU usage, like 0.5
451             - memory: string memory requirement, like "2Gi"
452         - readiness: dict with health check info; if present, used to create a readiness probe for the main container.  Includes:
453             - type: check is done by making http(s) request to an endpoint ("http", "https") or by exec'ing a script in the container ("script", "docker")
454             - interval: period (in seconds) between probes
455             - timeout:  time (in seconds) to allow a probe to complete
456             - endpoint: the path portion of the URL that points to the readiness endpoint for "http" and "https" types
457             - path: the full path to the script to be executed in the container for "script" and "docker" types
458         - liveness: dict with health check info; if present, used to create a liveness probe for the main container.  Includes:
459             - type: check is done by making http(s) request to an endpoint ("http", "https") or by exec'ing a script in the container ("script", "docker")
460             - interval: period (in seconds) between probes
461             - timeout:  time (in seconds) to allow a probe to complete
462             - endpoint: the path portion of the URL that points to the liveness endpoint for "http" and "https" types
463             - path: the full path to the script to be executed in the container for "script" and "docker" types
464         - k8s_location: name of the Kubernetes location (cluster) where the component is to be deployed
465
466     '''
467
468     deployment_ok = False
469     cip_service_created = False
470     deployment_description = {
471         "namespace": namespace,
472         "location" : kwargs.get("k8s_location"),
473         "deployment": '',
474         "services": []
475     }
476
477     try:
478
479         # Get API handles
480         _configure_api(kwargs.get("k8s_location"))
481         core = client.CoreV1Api()
482         ext = client.ExtensionsV1beta1Api()
483
484         # Parse the port mapping
485         container_ports, port_map = parse_ports(kwargs.get("ports", []))
486
487         # Parse the volumes list into volumes and volume_mounts for the deployment
488         volumes, volume_mounts = _parse_volumes(kwargs.get("volumes", []))
489
490         # Initialize the list of containers that will be part of the pod
491         containers = []
492         init_containers = []
493
494         # Set up the ELK logging sidecar container, if needed
495         _add_elk_logging_sidecar(containers, volumes, volume_mounts, component_name, kwargs.get("log_info"), k8sconfig.get("filebeat"))
496
497         # Set up TLS information
498         _add_tls_init_container(init_containers, volumes, volume_mounts, kwargs.get("tls_info") or {}, k8sconfig.get("tls"))
499
500         # Create the container for the component
501         # Make it the first container in the pod
502         container_args = {key: kwargs.get(key) for key in ("env", "readiness", "liveness", "resources")}
503         container_args['container_ports'] = container_ports
504         container_args['volume_mounts'] = volume_mounts
505         containers.insert(0, _create_container_object(component_name, image, always_pull, **container_args))
506
507         # Build the k8s Deployment object
508         labels = kwargs.get("labels", {})
509         labels["app"] = component_name
510         dep = _create_deployment_object(component_name, containers, init_containers, replicas, volumes, labels, pull_secrets=k8sconfig["image_pull_secrets"])
511
512         # Have k8s deploy it
513         ext.create_namespaced_deployment(namespace, dep)
514         deployment_ok = True
515         deployment_description["deployment"] = _create_deployment_name(component_name)
516
517         # Create service(s), if a port mapping is specified
518         if port_map:
519             service_ports, exposed_ports = _process_port_map(port_map)
520
521             # Create a ClusterIP service for access via the k8s network
522             service = _create_service_object(_create_service_name(component_name), component_name, service_ports, None, labels, "ClusterIP")
523             core.create_namespaced_service(namespace, service)
524             cip_service_created = True
525             deployment_description["services"].append(_create_service_name(component_name))
526
527             # If there are ports to be exposed on the k8s nodes, create a "NodePort" service
528             if exposed_ports:
529                 exposed_service = \
530                     _create_service_object(_create_exposed_service_name(component_name), component_name, exposed_ports, '', labels, "NodePort")
531                 core.create_namespaced_service(namespace, exposed_service)
532                 deployment_description["services"].append(_create_exposed_service_name(component_name))
533
534     except Exception as e:
535         # If the ClusterIP service was created, delete the service:
536         if cip_service_created:
537             core.delete_namespaced_service(_create_service_name(component_name), namespace)
538         # If the deployment was created but not the service, delete the deployment
539         if deployment_ok:
540             client.ExtensionsV1beta1Api().delete_namespaced_deployment(_create_deployment_name(component_name), namespace, body=client.V1DeleteOptions())
541         raise e
542
543     return dep, deployment_description
544
545 def undeploy(deployment_description):
546     _configure_api(deployment_description["location"])
547
548     namespace = deployment_description["namespace"]
549
550     # remove any services associated with the component
551     for service in deployment_description["services"]:
552         client.CoreV1Api().delete_namespaced_service(service, namespace)
553
554     # Have k8s delete the underlying pods and replicaset when deleting the deployment.
555     options = client.V1DeleteOptions(propagation_policy="Foreground")
556     client.ExtensionsV1beta1Api().delete_namespaced_deployment(deployment_description["deployment"], namespace, body=options)
557
558 def is_available(location, namespace, component_name):
559     _configure_api(location)
560     dep_status = client.AppsV1beta1Api().read_namespaced_deployment_status(_create_deployment_name(component_name), namespace)
561     # Check if the number of available replicas is equal to the number requested and that the replicas match the current spec
562     # This check can be used to verify completion of an initial deployment, a scale operation, or an update operation
563     return dep_status.status.available_replicas == dep_status.spec.replicas and dep_status.status.updated_replicas == dep_status.spec.replicas
564
565 def scale(deployment_description, replicas):
566     ''' Trigger a scaling operation by updating the replica count for the Deployment '''
567
568     def update_replica_count(spec):
569         spec.spec.replicas = replicas
570         return spec
571
572     _patch_deployment(deployment_description["location"], deployment_description["namespace"], deployment_description["deployment"], update_replica_count)
573
574 def upgrade(deployment_description, image, container_index = 0):
575     ''' Trigger a rolling upgrade by sending a new image name/tag to k8s '''
576
577     def update_image(spec):
578         spec.spec.template.spec.containers[container_index].image = image
579         return spec
580
581     _patch_deployment(deployment_description["location"], deployment_description["namespace"], deployment_description["deployment"], update_image)
582
583 def rollback(deployment_description, rollback_to=0):
584     '''
585     Undo upgrade by rolling back to a previous revision of the deployment.
586     By default, go back one revision.
587     rollback_to can be used to supply a specific revision number.
588     Returns the image for the app container and the replica count from the rolled-back deployment
589     '''
590     '''
591     2018-07-13
592     Currently this does not work due to a bug in the create_namespaced_deployment_rollback() method.
593     The k8s python client code throws an exception while processing the response from the API.
594     See:
595        - https://github.com/kubernetes-client/python/issues/491
596        - https://github.com/kubernetes/kubernetes/pull/63837
597     The fix has been merged into the master branch but is not in the latest release.
598     '''
599     _configure_api(deployment_description["location"])
600     deployment = deployment_description["deployment"]
601     namespace = deployment_description["namespace"]
602
603     # Initiate the rollback
604     client.ExtensionsV1beta1Api().create_namespaced_deployment_rollback(
605         deployment,
606         namespace,
607         client.AppsV1beta1DeploymentRollback(name=deployment, rollback_to=client.AppsV1beta1RollbackConfig(revision=rollback_to)))
608
609     # Read back the spec for the rolled-back deployment
610     spec = client.ExtensionsV1beta1Api().read_namespaced_deployment(deployment, namespace)
611     return spec.spec.template.spec.containers[0].image, spec.spec.replicas
612
613 def execute_command_in_deployment(deployment_description, command):
614     '''
615     Enumerates the pods in the k8s deployment identified by "deployment_description",
616     then executes the command (represented as an argv-style list) in "command" in
617     container 0 (the main application container) each of those pods.
618
619     Note that the sets of pods associated with a deployment can change over time.  The
620     enumeration is a snapshot at one point in time.  The command will not be executed in
621     pods that are created after the initial enumeration.   If a pod disappears after the
622     initial enumeration and before the command is executed, the attempt to execute the
623     command will fail.  This is not treated as a fatal error.
624
625     This approach is reasonable for the one current use case for "execute_command":  running a
626     script to notify a container that its configuration has changed as a result of a
627     policy change.  In this use case, the new configuration information is stored into
628     the configuration store (Consul), the pods are enumerated, and the command is executed.
629     If a pod disappears after the enumeration, the fact that the command cannot be run
630     doesn't matter--a nonexistent pod doesn't need to be reconfigured.  Similarly, a pod that
631     comes up after the enumeration will get its initial configuration from the updated version
632     in Consul.
633
634     The optimal solution here would be for k8s to provide an API call to execute a command in
635     all of the pods for a deployment.   Unfortunately, k8s does not provide such a call--the
636     only call provided by k8s operates at the pod level, not the deployment level.
637
638     Another interesting k8s factoid: there's no direct way to list the pods belong to a
639     particular k8s deployment.   The deployment code above sets a label ("k8sdeployment") on
640     the pod that has the k8s deployment name.  To list the pods, the code below queries for
641     pods with the label carrying the deployment name.
642     '''
643     location = deployment_description["location"]
644     _configure_api(location)
645     deployment = deployment_description["deployment"]
646     namespace = deployment_description["namespace"]
647
648     # Get names of all the running pods belonging to the deployment
649     pod_names = [pod.metadata.name for pod in client.CoreV1Api().list_namespaced_pod(
650         namespace = namespace,
651         label_selector = "k8sdeployment={0}".format(deployment),
652         field_selector = "status.phase=Running"
653     ).items]
654
655     # Execute command in the running pods
656     return [_execute_command_in_pod(location, namespace, pod_name, command)
657             for pod_name in pod_names]