bd8332299788ed730c7aa9685748fb71b6b89fde
[dcaegen2/platform/plugins.git] / k8s / k8sclient / k8sclient.py
1 # ============LICENSE_START=======================================================
2 # org.onap.dcae
3 # ================================================================================
4 # Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved.
5 # Copyright (c) 2020 Pantheon.tech. All rights reserved.
6 # ================================================================================
7 # Licensed under the Apache License, Version 2.0 (the "License");
8 # you may not use this file except in compliance with the License.
9 # You may obtain a copy of the License at
10 #
11 #      http://www.apache.org/licenses/LICENSE-2.0
12 #
13 # Unless required by applicable law or agreed to in writing, software
14 # distributed under the License is distributed on an "AS IS" BASIS,
15 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 # ============LICENSE_END=========================================================
19 #
20 # ECOMP is a trademark and service mark of AT&T Intellectual Property.
21 import os
22 import re
23 import uuid
24 from msb import msb
25 from kubernetes import config, client, stream
26
27 # Default values for readiness probe
28 PROBE_DEFAULT_PERIOD = 15
29 PROBE_DEFAULT_TIMEOUT = 1
30
31 # Location of k8s cluster config file ("kubeconfig")
32 K8S_CONFIG_PATH="/opt/onap/kube/kubeconfig"
33
34 # Regular expression for interval/timeout specification
35 INTERVAL_SPEC = re.compile("^([0-9]+)(s|m|h)?$")
36 # Conversion factors to seconds
37 FACTORS = {None: 1, "s": 1, "m": 60, "h": 3600}
38
39 # Regular expression for port mapping
40 # group 1: container port
41 # group 2: / + protocol
42 # group 3: protocol
43 # group 4: host port
44 PORTS = re.compile("^([0-9]+)(/(udp|UDP|tcp|TCP))?:([0-9]+)$")
45
46 def _create_deployment_name(component_name):
47     return "dep-{0}".format(component_name)[:63]
48
49 def _create_service_name(component_name):
50     return "{0}".format(component_name)[:63]
51
52 def _create_exposed_service_name(component_name):
53     return ("x{0}".format(component_name))[:63]
54
55 def _configure_api(location=None):
56     # Look for a kubernetes config file
57     if os.path.exists(K8S_CONFIG_PATH):
58         config.load_kube_config(config_file=K8S_CONFIG_PATH, context=location, persist_config=False)
59     else:
60         # Maybe we're running in a k8s container and we can use info provided by k8s
61         # We would like to use:
62         # config.load_incluster_config()
63         # but this looks into os.environ for kubernetes host and port, and from
64         # the plugin those aren't visible.   So we use the InClusterConfigLoader class,
65         # where we can set the environment to what we like.
66         # This is probably brittle!  Maybe there's a better alternative.
67         localenv = {
68             config.incluster_config.SERVICE_HOST_ENV_NAME : "kubernetes.default.svc.cluster.local",
69             config.incluster_config.SERVICE_PORT_ENV_NAME : "443"
70         }
71         config.incluster_config.InClusterConfigLoader(
72             token_filename=config.incluster_config.SERVICE_TOKEN_FILENAME,
73             cert_filename=config.incluster_config.SERVICE_CERT_FILENAME,
74             environ=localenv
75         ).load_and_set()
76
77 def _parse_interval(t):
78     """
79     Parse an interval specification
80     t can be
81        - a simple integer quantity, interpreted as seconds
82        - a string representation of a decimal integer, interpreted as seconds
83        - a string consisting of a represention of an decimal integer followed by a unit,
84          with "s" representing seconds, "m" representing minutes,
85          and "h" representing hours
86     Used for compatibility with the Docker plugin, where time intervals
87     for health checks were specified as strings with a number and a unit.
88     See 'intervalspec' above for the regular expression that's accepted.
89     """
90     m = INTERVAL_SPEC.match(str(t))
91     if m:
92         time = int(m.group(1)) * FACTORS[m.group(2)]
93     else:
94         raise ValueError("Bad interval specification: {0}".format(t))
95     return time
96
97 def _create_probe(hc, port):
98     ''' Create a Kubernetes probe based on info in the health check dictionary hc '''
99     probe_type = hc['type']
100     probe = None
101     period = _parse_interval(hc.get('interval', PROBE_DEFAULT_PERIOD))
102     timeout = _parse_interval(hc.get('timeout', PROBE_DEFAULT_TIMEOUT))
103     if probe_type in ['http', 'https']:
104         probe = client.V1Probe(
105           failure_threshold = 1,
106           initial_delay_seconds = 5,
107           period_seconds = period,
108           timeout_seconds = timeout,
109           http_get = client.V1HTTPGetAction(
110               path = hc['endpoint'],
111               port = port,
112               scheme = probe_type.upper()
113           )
114         )
115     elif probe_type in ['script', 'docker']:
116         probe = client.V1Probe(
117           failure_threshold = 1,
118           initial_delay_seconds = 5,
119           period_seconds = period,
120           timeout_seconds = timeout,
121           _exec = client.V1ExecAction(
122               command = hc['script'].split( )
123           )
124         )
125     return probe
126
127 def _create_resources(resources=None):
128     if resources is not None:
129         resources_obj = client.V1ResourceRequirements(
130           limits = resources.get("limits"),
131           requests = resources.get("requests")
132         )
133         return resources_obj
134     else:
135         return None
136
137 def _create_container_object(name, image, always_pull, **kwargs):
138     # Set up environment variables
139     # Copy any passed in environment variables
140     env = kwargs.get('env') or {}
141     env_vars = [client.V1EnvVar(name=k, value=env[k]) for k in env]
142     # Add POD_IP with the IP address of the pod running the container
143     pod_ip = client.V1EnvVarSource(field_ref = client.V1ObjectFieldSelector(field_path="status.podIP"))
144     env_vars.append(client.V1EnvVar(name="POD_IP",value_from=pod_ip))
145
146     # If a health check is specified, create a readiness/liveness probe
147     # (For an HTTP-based check, we assume it's at the first container port)
148     readiness = kwargs.get('readiness')
149     liveness = kwargs.get('liveness')
150     resources = kwargs.get('resources')
151     container_ports = kwargs.get('container_ports') or []
152
153     hc_port = container_ports[0][0] if container_ports else None
154     probe = _create_probe(readiness, hc_port) if readiness else None
155     live_probe = _create_probe(liveness, hc_port) if liveness else None
156     resources_obj = _create_resources(resources) if resources else None
157     port_objs = [client.V1ContainerPort(container_port=port, protocol=proto)
158                  for port, proto in container_ports]
159
160     # Define container for pod
161     return client.V1Container(
162         name=name,
163         image=image,
164         image_pull_policy='Always' if always_pull else 'IfNotPresent',
165         env=env_vars,
166         ports=port_objs,
167         volume_mounts=kwargs.get('volume_mounts') or [],
168         resources=resources_obj,
169         readiness_probe=probe,
170         liveness_probe=live_probe
171     )
172
173 def _create_deployment_object(component_name,
174                               containers,
175                               init_containers,
176                               replicas,
177                               volumes,
178                               labels={},
179                               pull_secrets=[]):
180
181     deployment_name = _create_deployment_name(component_name)
182
183     # Label the pod with the deployment name, so we can find it easily
184     labels.update({"k8sdeployment" : deployment_name})
185
186     # pull_secrets is a list of the names of the k8s secrets containing docker registry credentials
187     # See https://kubernetes.io/docs/concepts/containers/images/#specifying-imagepullsecrets-on-a-pod
188     ips = []
189     for secret in pull_secrets:
190         ips.append(client.V1LocalObjectReference(name=secret))
191
192     # Define pod template
193     template = client.V1PodTemplateSpec(
194         metadata=client.V1ObjectMeta(labels=labels),
195         spec=client.V1PodSpec(hostname=component_name,
196                               containers=containers,
197                               init_containers=init_containers,
198                               volumes=volumes,
199                               image_pull_secrets=ips)
200     )
201
202     # Define deployment spec
203     spec = client.ExtensionsV1beta1DeploymentSpec(
204         replicas=replicas,
205         template=template
206     )
207
208     # Create deployment object
209     deployment = client.ExtensionsV1beta1Deployment(
210         kind="Deployment",
211         metadata=client.V1ObjectMeta(name=deployment_name),
212         spec=spec
213     )
214
215     return deployment
216
217 def _create_service_object(service_name, component_name, service_ports, annotations, labels, service_type):
218     service_spec = client.V1ServiceSpec(
219         ports=service_ports,
220         selector={"app" : component_name},
221         type=service_type
222     )
223     if annotations:
224         metadata = client.V1ObjectMeta(name=_create_service_name(service_name), labels=labels, annotations=annotations)
225     else:
226         metadata = client.V1ObjectMeta(name=_create_service_name(service_name), labels=labels)
227
228     service = client.V1Service(
229         kind="Service",
230         api_version="v1",
231         metadata=metadata,
232         spec=service_spec
233     )
234     return service
235
236 def parse_ports(port_list):
237     '''
238     Parse the port list into a list of container ports (needed to create the container)
239     and to a set of port mappings to set up k8s services.
240     '''
241     container_ports = []
242     port_map = {}
243     for p in port_list:
244         m = PORTS.match(p.strip())
245         if m:
246             cport = int(m.group(1))
247             hport = int (m.group(4))
248             if m.group(3):
249                 proto = (m.group(3)).upper()
250             else:
251                 proto = "TCP"
252             container_ports.append((cport, proto))
253             port_map[(cport, proto)] = hport
254         else:
255             raise ValueError("Bad port specification: {0}".format(p))
256
257     return container_ports, port_map
258
259 def _parse_volumes(volume_list):
260     volumes = []
261     volume_mounts = []
262     for v in volume_list:
263         vname = str(uuid.uuid4())
264         vhost = v['host']['path']
265         vcontainer = v['container']['bind']
266         vro = (v['container'].get('mode') == 'ro')
267         volumes.append(client.V1Volume(name=vname, host_path=client.V1HostPathVolumeSource(path=vhost)))
268         volume_mounts.append(client.V1VolumeMount(name=vname, mount_path=vcontainer, read_only=vro))
269
270     return volumes, volume_mounts
271
272 def _add_elk_logging_sidecar(containers, volumes, volume_mounts, component_name, log_info, filebeat):
273     if not log_info or not filebeat:
274         return
275     log_dir = log_info.get("log_directory")
276     if not log_dir:
277         return
278     sidecar_volume_mounts = []
279
280     # Create the volume for component log files and volume mounts for the component and sidecar containers
281     volumes.append(client.V1Volume(name="component-log", empty_dir=client.V1EmptyDirVolumeSource()))
282     volume_mounts.append(client.V1VolumeMount(name="component-log", mount_path=log_dir))
283     sc_path = log_info.get("alternate_fb_path") or "{0}/{1}".format(filebeat["log_path"], component_name)
284     sidecar_volume_mounts.append(client.V1VolumeMount(name="component-log", mount_path=sc_path))
285
286     # Create the volume for sidecar data and the volume mount for it
287     volumes.append(client.V1Volume(name="filebeat-data", empty_dir=client.V1EmptyDirVolumeSource()))
288     sidecar_volume_mounts.append(client.V1VolumeMount(name="filebeat-data", mount_path=filebeat["data_path"]))
289
290     # Create the volume for the sidecar configuration data and the volume mount for it
291     # The configuration data is in a k8s ConfigMap that should be created when DCAE is installed.
292     volumes.append(
293         client.V1Volume(name="filebeat-conf", config_map=client.V1ConfigMapVolumeSource(name=filebeat["config_map"])))
294     sidecar_volume_mounts.append(
295         client.V1VolumeMount(name="filebeat-conf", mount_path=filebeat["config_path"], sub_path=filebeat["config_subpath"]))
296
297     # Finally create the container for the sidecar
298     containers.append(_create_container_object("filebeat", filebeat["image"], False, volume_mounts=sidecar_volume_mounts))
299
300 def _add_tls_init_container(init_containers, volumes, volume_mounts, tls_info, tls_config):
301     #   Adds an InitContainer to the pod to set up TLS certificate information.  For components that act as a
302     #   server(tls_info["use_tls"] is True), the InitContainer will populate a directory with server and CA certificate
303     #   materials in various formats.   For other components (tls_info["use_tls"] is False, or tls_info is not specified),
304     #   the InitContainer will populate a directory with CA certificate materials in PEM and JKS formats.
305     #   In either case, the certificate directory is mounted onto the component container filesystem at the location
306     #   specified by tls_info["component_cert_dir"], if present, otherwise at the configured default mount point
307     #   (tls_config["component_cert_dir"]).
308
309     cert_directory = tls_info.get("cert_directory") or tls_config.get("component_cert_dir")
310     env = {}
311     env["TLS_SERVER"] = "true" if tls_info.get("use_tls") else "false"
312
313     # Create the certificate volume and volume mounts
314     volumes.append(client.V1Volume(name="tls-info", empty_dir=client.V1EmptyDirVolumeSource()))
315     volume_mounts.append(client.V1VolumeMount(name="tls-info", mount_path=cert_directory))
316     init_volume_mounts = [client.V1VolumeMount(name="tls-info", mount_path=tls_config["cert_path"])]
317
318     # Create the init container
319     init_containers.append(_create_container_object("init-tls", tls_config["image"], False, volume_mounts=init_volume_mounts, env=env))
320
321 def _process_port_map(port_map):
322     service_ports = []      # Ports exposed internally on the k8s network
323     exposed_ports = []      # Ports to be mapped to ports on the k8s nodes via NodePort
324     for (cport, proto), hport in port_map.items():
325         name = "xport-{0}-{1}".format(proto[0].lower(), cport)
326         cport = int(cport)
327         hport = int(hport)
328         service_ports.append(client.V1ServicePort(port=cport, protocol=proto, name=name[1:]))
329         if hport != 0:
330             exposed_ports.append(client.V1ServicePort(port=cport, protocol=proto, node_port=hport, name=name))
331     return service_ports, exposed_ports
332
333 def _service_exists(location, namespace, component_name):
334     exists = False
335     try:
336         _configure_api(location)
337         client.CoreV1Api().read_namespaced_service(_create_service_name(component_name), namespace)
338         exists = True
339     except client.rest.ApiException:
340         pass
341
342     return exists
343
344 def _patch_deployment(location, namespace, deployment, modify):
345     '''
346     Gets the current spec for 'deployment' in 'namespace'
347     in the k8s cluster at 'location',
348     uses the 'modify' function to change the spec,
349     then sends the updated spec to k8s.
350     '''
351     _configure_api(location)
352
353     # Get deployment spec
354     spec = client.ExtensionsV1beta1Api().read_namespaced_deployment(deployment, namespace)
355
356     # Apply changes to spec
357     spec = modify(spec)
358
359     # Patch the deploy with updated spec
360     client.ExtensionsV1beta1Api().patch_namespaced_deployment(deployment, namespace, spec)
361
362 def _execute_command_in_pod(location, namespace, pod_name, command):
363     '''
364     Execute the command (specified by an argv-style list in  the "command" parameter) in
365     the specified pod in the specified namespace at the specified location.
366     For now at least, we use this only to
367     run a notification script in a pod after a configuration change.
368
369     The straightforward way to do this is with the V1 Core API function "connect_get_namespaced_pod_exec".
370     Unfortunately due to a bug/limitation in the Python client library, we can't call it directly.
371     We have to make the API call through a Websocket connection using the kubernetes.stream wrapper API.
372     I'm following the example code at https://github.com/kubernetes-client/python/blob/master/examples/exec.py.
373     There are several issues tracking this, in various states.  It isn't clear that there will ever
374     be a fix.
375         - https://github.com/kubernetes-client/python/issues/58
376         - https://github.com/kubernetes-client/python/issues/409
377         - https://github.com/kubernetes-client/python/issues/526
378
379     The main consequence of the workaround using "stream" is that the caller does not get an indication
380     of the exit code returned by the command when it completes execution.   It turns out that the
381     original implementation of notification in the Docker plugin did not use this result, so we can
382     still match the original notification functionality.
383
384     The "stream" approach returns a string containing any output sent by the command to stdout or stderr.
385     We'll return that so it can logged.
386     '''
387     _configure_api(location)
388     try:
389         output = stream.stream(client.CoreV1Api().connect_get_namespaced_pod_exec,
390                              name=pod_name,
391                              namespace=namespace,
392                              command=command,
393                              stdout=True,
394                              stderr=True,
395                             stdin=False,
396                             tty=False)
397     except client.rest.ApiException as e:
398         # If the exception indicates the pod wasn't found,  it's not a fatal error.
399         # It existed when we enumerated the pods for the deployment but no longer exists.
400         # Unfortunately, the only way to distinguish a pod not found from any other error
401         # is by looking at the reason text.
402         # (The ApiException's "status" field should contain the HTTP status code, which would
403         # be 404 if the pod isn't found, but empirical testing reveals that "status" is set
404         # to zero.)
405         if "404 not found" in e.reason.lower():
406             output = "Pod not found"
407         else:
408             raise e
409
410     return {"pod" : pod_name, "output" : output}
411
412 def deploy(namespace, component_name, image, replicas, always_pull, k8sconfig, **kwargs):
413     '''
414     This will create a k8s Deployment and, if needed, one or two k8s Services.
415     (We are being opinionated in our use of k8s... this code decides what k8s abstractions and features to use.
416     We're not exposing k8s to the component developer and the blueprint author.
417     This is a conscious choice.  We want to use k8s in a controlled, consistent way, and we want to hide
418     the details from the component developer and the blueprint author.)
419
420     namespace:  the Kubernetes namespace into which the component is deployed
421     component_name:  the component name, used to derive names of Kubernetes entities
422     image: the docker image for the component being deployed
423     replica: the number of instances of the component to be deployed
424     always_pull: boolean flag, indicating that Kubernetes should always pull a new copy of
425        the Docker image for the component, even if it is already present on the Kubernetes node.
426     k8sconfig contains:
427         - image_pull_secrets: a list of names of image pull secrets that can be used for retrieving images.
428           (DON'T PANIC:  these are just the names of secrets held in the Kubernetes secret store.)
429         - filebeat: a dictionary of filebeat sidecar parameters:
430             "log_path" : mount point for log volume in filebeat container
431             "data_path" : mount point for data volume in filebeat container
432             "config_path" : mount point for config volume in filebeat container
433             "config_subpath" :  subpath for config data in filebeat container
434             "config_map" : ConfigMap holding the filebeat configuration
435             "image": Docker image to use for filebeat
436         - tls: a dictionary of TLS-related information:
437             "cert_path": mount point for certificate volume in init container
438             "image": Docker image to use for TLS init container
439             "component_cert_dir" : default mount point for certs
440     kwargs may have:
441         - volumes:  array of volume objects, where a volume object is:
442             {"host":{"path": "/path/on/host"}, "container":{"bind":"/path/on/container","mode":"rw_or_ro"}
443         - ports: array of strings in the form "container_port:host_port"
444         - env: map of name-value pairs ( {name0: value0, name1: value1...}
445         - msb_list: array of msb objects, where an msb object is as described in msb/msb.py.
446         - log_info: an object with info for setting up ELK logging, with the form:
447             {"log_directory": "/path/to/container/log/directory", "alternate_fb_path" : "/alternate/sidecar/log/path"}
448         - tls_info: an object with info for setting up TLS (HTTPS), with the form:
449             {"use_tls": true, "cert_directory": "/path/to/container/cert/directory" }
450         - labels: dict with label-name/label-value pairs, e.g. {"cfydeployment" : "lsdfkladflksdfsjkl", "cfynode":"mycomponent"}
451             These label will be set on all the pods deployed as a result of this deploy() invocation.
452         - resources: dict with optional "limits" and "requests" resource requirements, each a dict containing:
453             - cpu:    number CPU usage, like 0.5
454             - memory: string memory requirement, like "2Gi"
455         - readiness: dict with health check info; if present, used to create a readiness probe for the main container.  Includes:
456             - type: check is done by making http(s) request to an endpoint ("http", "https") or by exec'ing a script in the container ("script", "docker")
457             - interval: period (in seconds) between probes
458             - timeout:  time (in seconds) to allow a probe to complete
459             - endpoint: the path portion of the URL that points to the readiness endpoint for "http" and "https" types
460             - path: the full path to the script to be executed in the container for "script" and "docker" types
461         - liveness: dict with health check info; if present, used to create a liveness probe for the main container.  Includes:
462             - type: check is done by making http(s) request to an endpoint ("http", "https") or by exec'ing a script in the container ("script", "docker")
463             - interval: period (in seconds) between probes
464             - timeout:  time (in seconds) to allow a probe to complete
465             - endpoint: the path portion of the URL that points to the liveness endpoint for "http" and "https" types
466             - path: the full path to the script to be executed in the container for "script" and "docker" types
467         - k8s_location: name of the Kubernetes location (cluster) where the component is to be deployed
468
469     '''
470
471     deployment_ok = False
472     cip_service_created = False
473     deployment_description = {
474         "namespace": namespace,
475         "location" : kwargs.get("k8s_location"),
476         "deployment": '',
477         "services": []
478     }
479
480     try:
481
482         # Get API handles
483         _configure_api(kwargs.get("k8s_location"))
484         core = client.CoreV1Api()
485         ext = client.ExtensionsV1beta1Api()
486
487         # Parse the port mapping
488         container_ports, port_map = parse_ports(kwargs.get("ports", []))
489
490         # Parse the volumes list into volumes and volume_mounts for the deployment
491         volumes, volume_mounts = _parse_volumes(kwargs.get("volumes", []))
492
493         # Initialize the list of containers that will be part of the pod
494         containers = []
495         init_containers = []
496
497         # Set up the ELK logging sidecar container, if needed
498         _add_elk_logging_sidecar(containers, volumes, volume_mounts, component_name, kwargs.get("log_info"), k8sconfig.get("filebeat"))
499
500         # Set up TLS information
501         _add_tls_init_container(init_containers, volumes, volume_mounts, kwargs.get("tls_info") or {}, k8sconfig.get("tls"))
502
503         # Create the container for the component
504         # Make it the first container in the pod
505         container_args = {key: kwargs.get(key) for key in ("env", "readiness", "liveness", "resources")}
506         container_args['container_ports'] = container_ports
507         container_args['volume_mounts'] = volume_mounts
508         containers.insert(0, _create_container_object(component_name, image, always_pull, **container_args))
509
510         # Build the k8s Deployment object
511         labels = kwargs.get("labels", {})
512         labels["app"] = component_name
513         dep = _create_deployment_object(component_name, containers, init_containers, replicas, volumes, labels, pull_secrets=k8sconfig["image_pull_secrets"])
514
515         # Have k8s deploy it
516         ext.create_namespaced_deployment(namespace, dep)
517         deployment_ok = True
518         deployment_description["deployment"] = _create_deployment_name(component_name)
519
520         # Create service(s), if a port mapping is specified
521         if port_map:
522             service_ports, exposed_ports = _process_port_map(port_map)
523
524             # If there are ports to be exposed via MSB, set up the annotation for the service
525             msb_list = kwargs.get("msb_list")
526             annotations = msb.create_msb_annotation(msb_list) if msb_list else ''
527
528             # Create a ClusterIP service for access via the k8s network
529             service = _create_service_object(_create_service_name(component_name), component_name, service_ports, annotations, labels, "ClusterIP")
530             core.create_namespaced_service(namespace, service)
531             cip_service_created = True
532             deployment_description["services"].append(_create_service_name(component_name))
533
534             # If there are ports to be exposed on the k8s nodes, create a "NodePort" service
535             if exposed_ports:
536                 exposed_service = \
537                     _create_service_object(_create_exposed_service_name(component_name), component_name, exposed_ports, '', labels, "NodePort")
538                 core.create_namespaced_service(namespace, exposed_service)
539                 deployment_description["services"].append(_create_exposed_service_name(component_name))
540
541     except Exception as e:
542         # If the ClusterIP service was created, delete the service:
543         if cip_service_created:
544             core.delete_namespaced_service(_create_service_name(component_name), namespace)
545         # If the deployment was created but not the service, delete the deployment
546         if deployment_ok:
547             client.ExtensionsV1beta1Api().delete_namespaced_deployment(_create_deployment_name(component_name), namespace, body=client.V1DeleteOptions())
548         raise e
549
550     return dep, deployment_description
551
552 def undeploy(deployment_description):
553     _configure_api(deployment_description["location"])
554
555     namespace = deployment_description["namespace"]
556
557     # remove any services associated with the component
558     for service in deployment_description["services"]:
559         client.CoreV1Api().delete_namespaced_service(service, namespace)
560
561     # Have k8s delete the underlying pods and replicaset when deleting the deployment.
562     options = client.V1DeleteOptions(propagation_policy="Foreground")
563     client.ExtensionsV1beta1Api().delete_namespaced_deployment(deployment_description["deployment"], namespace, body=options)
564
565 def is_available(location, namespace, component_name):
566     _configure_api(location)
567     dep_status = client.AppsV1beta1Api().read_namespaced_deployment_status(_create_deployment_name(component_name), namespace)
568     # Check if the number of available replicas is equal to the number requested and that the replicas match the current spec
569     # This check can be used to verify completion of an initial deployment, a scale operation, or an update operation
570     return dep_status.status.available_replicas == dep_status.spec.replicas and dep_status.status.updated_replicas == dep_status.spec.replicas
571
572 def scale(deployment_description, replicas):
573     ''' Trigger a scaling operation by updating the replica count for the Deployment '''
574
575     def update_replica_count(spec):
576         spec.spec.replicas = replicas
577         return spec
578
579     _patch_deployment(deployment_description["location"], deployment_description["namespace"], deployment_description["deployment"], update_replica_count)
580
581 def upgrade(deployment_description, image, container_index = 0):
582     ''' Trigger a rolling upgrade by sending a new image name/tag to k8s '''
583
584     def update_image(spec):
585         spec.spec.template.spec.containers[container_index].image = image
586         return spec
587
588     _patch_deployment(deployment_description["location"], deployment_description["namespace"], deployment_description["deployment"], update_image)
589
590 def rollback(deployment_description, rollback_to=0):
591     '''
592     Undo upgrade by rolling back to a previous revision of the deployment.
593     By default, go back one revision.
594     rollback_to can be used to supply a specific revision number.
595     Returns the image for the app container and the replica count from the rolled-back deployment
596     '''
597     '''
598     2018-07-13
599     Currently this does not work due to a bug in the create_namespaced_deployment_rollback() method.
600     The k8s python client code throws an exception while processing the response from the API.
601     See:
602        - https://github.com/kubernetes-client/python/issues/491
603        - https://github.com/kubernetes/kubernetes/pull/63837
604     The fix has been merged into the master branch but is not in the latest release.
605     '''
606     _configure_api(deployment_description["location"])
607     deployment = deployment_description["deployment"]
608     namespace = deployment_description["namespace"]
609
610     # Initiate the rollback
611     client.ExtensionsV1beta1Api().create_namespaced_deployment_rollback(
612         deployment,
613         namespace,
614         client.AppsV1beta1DeploymentRollback(name=deployment, rollback_to=client.AppsV1beta1RollbackConfig(revision=rollback_to)))
615
616     # Read back the spec for the rolled-back deployment
617     spec = client.ExtensionsV1beta1Api().read_namespaced_deployment(deployment, namespace)
618     return spec.spec.template.spec.containers[0].image, spec.spec.replicas
619
620 def execute_command_in_deployment(deployment_description, command):
621     '''
622     Enumerates the pods in the k8s deployment identified by "deployment_description",
623     then executes the command (represented as an argv-style list) in "command" in
624     container 0 (the main application container) each of those pods.
625
626     Note that the sets of pods associated with a deployment can change over time.  The
627     enumeration is a snapshot at one point in time.  The command will not be executed in
628     pods that are created after the initial enumeration.   If a pod disappears after the
629     initial enumeration and before the command is executed, the attempt to execute the
630     command will fail.  This is not treated as a fatal error.
631
632     This approach is reasonable for the one current use case for "execute_command":  running a
633     script to notify a container that its configuration has changed as a result of a
634     policy change.  In this use case, the new configuration information is stored into
635     the configuration store (Consul), the pods are enumerated, and the command is executed.
636     If a pod disappears after the enumeration, the fact that the command cannot be run
637     doesn't matter--a nonexistent pod doesn't need to be reconfigured.  Similarly, a pod that
638     comes up after the enumeration will get its initial configuration from the updated version
639     in Consul.
640
641     The optimal solution here would be for k8s to provide an API call to execute a command in
642     all of the pods for a deployment.   Unfortunately, k8s does not provide such a call--the
643     only call provided by k8s operates at the pod level, not the deployment level.
644
645     Another interesting k8s factoid: there's no direct way to list the pods belong to a
646     particular k8s deployment.   The deployment code above sets a label ("k8sdeployment") on
647     the pod that has the k8s deployment name.  To list the pods, the code below queries for
648     pods with the label carrying the deployment name.
649     '''
650     location = deployment_description["location"]
651     _configure_api(location)
652     deployment = deployment_description["deployment"]
653     namespace = deployment_description["namespace"]
654
655     # Get names of all the running pods belonging to the deployment
656     pod_names = [pod.metadata.name for pod in client.CoreV1Api().list_namespaced_pod(
657         namespace = namespace,
658         label_selector = "k8sdeployment={0}".format(deployment),
659         field_selector = "status.phase=Running"
660     ).items]
661
662     # Execute command in the running pods
663     return [_execute_command_in_pod(location, namespace, pod_name, command)
664             for pod_name in pod_names]