Merge "Add support to request certificates from CMPv2 server in DCAE cloudify blueprints"
[dcaegen2/platform/plugins.git] / k8s / k8sclient / k8sclient.py
1 # ============LICENSE_START=======================================================
2 # org.onap.dcae
3 # ================================================================================
4 # Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved.
5 # Copyright (c) 2020 Pantheon.tech. All rights reserved.
6 # Copyright (c) 2020 Nokia. All rights reserved.
7 # ================================================================================
8 # Licensed under the Apache License, Version 2.0 (the "License");
9 # you may not use this file except in compliance with the License.
10 # You may obtain a copy of the License at
11 #
12 #      http://www.apache.org/licenses/LICENSE-2.0
13 #
14 # Unless required by applicable law or agreed to in writing, software
15 # distributed under the License is distributed on an "AS IS" BASIS,
16 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 # See the License for the specific language governing permissions and
18 # limitations under the License.
19 # ============LICENSE_END=========================================================
20 #
21 import os
22 import re
23 import uuid
24 from kubernetes import config, client, stream
25
26 # Default values for readiness probe
27 PROBE_DEFAULT_PERIOD = 15
28 PROBE_DEFAULT_TIMEOUT = 1
29
30 # Location of k8s cluster config file ("kubeconfig")
31 K8S_CONFIG_PATH="/opt/onap/kube/kubeconfig"
32
33 # Regular expression for interval/timeout specification
34 INTERVAL_SPEC = re.compile("^([0-9]+)(s|m|h)?$")
35 # Conversion factors to seconds
36 FACTORS = {None: 1, "s": 1, "m": 60, "h": 3600}
37
38 # Regular expression for port mapping
39 # group 1: container port
40 # group 2: / + protocol
41 # group 3: protocol
42 # group 4: host port
43 PORTS = re.compile("^([0-9]+)(/(udp|UDP|tcp|TCP))?:([0-9]+)$")
44
45 # Constants for external_cert
46 MOUNT_PATH = "/etc/onap/aaf/certservice/certs/"
47 KEYSTORE_PATH = MOUNT_PATH + "certServiceClient-keystore.jks"
48 TRUSTSTORE_PATH = MOUNT_PATH + "truststore.jks"
49 CERT_SECRET_NAME = "aaf-cert-service-client-tls-secret"
50
51 def _create_deployment_name(component_name):
52     return "dep-{0}".format(component_name)[:63]
53
54 def _create_service_name(component_name):
55     return "{0}".format(component_name)[:63]
56
57 def _create_exposed_service_name(component_name):
58     return ("x{0}".format(component_name))[:63]
59
60 def _configure_api(location=None):
61     # Look for a kubernetes config file
62     if os.path.exists(K8S_CONFIG_PATH):
63         config.load_kube_config(config_file=K8S_CONFIG_PATH, context=location, persist_config=False)
64     else:
65         # Maybe we're running in a k8s container and we can use info provided by k8s
66         # We would like to use:
67         # config.load_incluster_config()
68         # but this looks into os.environ for kubernetes host and port, and from
69         # the plugin those aren't visible.   So we use the InClusterConfigLoader class,
70         # where we can set the environment to what we like.
71         # This is probably brittle!  Maybe there's a better alternative.
72         localenv = {
73             config.incluster_config.SERVICE_HOST_ENV_NAME : "kubernetes.default.svc.cluster.local",
74             config.incluster_config.SERVICE_PORT_ENV_NAME : "443"
75         }
76         config.incluster_config.InClusterConfigLoader(
77             token_filename=config.incluster_config.SERVICE_TOKEN_FILENAME,
78             cert_filename=config.incluster_config.SERVICE_CERT_FILENAME,
79             environ=localenv
80         ).load_and_set()
81
82 def _parse_interval(t):
83     """
84     Parse an interval specification
85     t can be
86        - a simple integer quantity, interpreted as seconds
87        - a string representation of a decimal integer, interpreted as seconds
88        - a string consisting of a represention of an decimal integer followed by a unit,
89          with "s" representing seconds, "m" representing minutes,
90          and "h" representing hours
91     Used for compatibility with the Docker plugin, where time intervals
92     for health checks were specified as strings with a number and a unit.
93     See 'intervalspec' above for the regular expression that's accepted.
94     """
95     m = INTERVAL_SPEC.match(str(t))
96     if m:
97         time = int(m.group(1)) * FACTORS[m.group(2)]
98     else:
99         raise ValueError("Bad interval specification: {0}".format(t))
100     return time
101
102 def _create_probe(hc, port):
103     ''' Create a Kubernetes probe based on info in the health check dictionary hc '''
104     probe_type = hc['type']
105     probe = None
106     period = _parse_interval(hc.get('interval', PROBE_DEFAULT_PERIOD))
107     timeout = _parse_interval(hc.get('timeout', PROBE_DEFAULT_TIMEOUT))
108     if probe_type in ['http', 'https']:
109         probe = client.V1Probe(
110           failure_threshold = 1,
111           initial_delay_seconds = 5,
112           period_seconds = period,
113           timeout_seconds = timeout,
114           http_get = client.V1HTTPGetAction(
115               path = hc['endpoint'],
116               port = port,
117               scheme = probe_type.upper()
118           )
119         )
120     elif probe_type in ['script', 'docker']:
121         probe = client.V1Probe(
122           failure_threshold = 1,
123           initial_delay_seconds = 5,
124           period_seconds = period,
125           timeout_seconds = timeout,
126           _exec = client.V1ExecAction(
127               command = hc['script'].split( )
128           )
129         )
130     return probe
131
132 def _create_resources(resources=None):
133     if resources is not None:
134         resources_obj = client.V1ResourceRequirements(
135           limits = resources.get("limits"),
136           requests = resources.get("requests")
137         )
138         return resources_obj
139     else:
140         return None
141
142 def _create_container_object(name, image, always_pull, **kwargs):
143     # Set up environment variables
144     # Copy any passed in environment variables
145     env = kwargs.get('env') or {}
146     env_vars = [client.V1EnvVar(name=k, value=env[k]) for k in env]
147     # Add POD_IP with the IP address of the pod running the container
148     pod_ip = client.V1EnvVarSource(field_ref = client.V1ObjectFieldSelector(field_path="status.podIP"))
149     env_vars.append(client.V1EnvVar(name="POD_IP",value_from=pod_ip))
150
151     # If a health check is specified, create a readiness/liveness probe
152     # (For an HTTP-based check, we assume it's at the first container port)
153     readiness = kwargs.get('readiness')
154     liveness = kwargs.get('liveness')
155     resources = kwargs.get('resources')
156     container_ports = kwargs.get('container_ports') or []
157
158     hc_port = container_ports[0][0] if container_ports else None
159     probe = _create_probe(readiness, hc_port) if readiness else None
160     live_probe = _create_probe(liveness, hc_port) if liveness else None
161     resources_obj = _create_resources(resources) if resources else None
162     port_objs = [client.V1ContainerPort(container_port=port, protocol=proto)
163                  for port, proto in container_ports]
164
165     # Define container for pod
166     return client.V1Container(
167         name=name,
168         image=image,
169         image_pull_policy='Always' if always_pull else 'IfNotPresent',
170         env=env_vars,
171         ports=port_objs,
172         volume_mounts=kwargs.get('volume_mounts') or [],
173         resources=resources_obj,
174         readiness_probe=probe,
175         liveness_probe=live_probe
176     )
177
178 def _create_deployment_object(component_name,
179                               containers,
180                               init_containers,
181                               replicas,
182                               volumes,
183                               labels={},
184                               pull_secrets=[]):
185
186     deployment_name = _create_deployment_name(component_name)
187
188     # Label the pod with the deployment name, so we can find it easily
189     labels.update({"k8sdeployment" : deployment_name})
190
191     # pull_secrets is a list of the names of the k8s secrets containing docker registry credentials
192     # See https://kubernetes.io/docs/concepts/containers/images/#specifying-imagepullsecrets-on-a-pod
193     ips = []
194     for secret in pull_secrets:
195         ips.append(client.V1LocalObjectReference(name=secret))
196
197     # Define pod template
198     template = client.V1PodTemplateSpec(
199         metadata=client.V1ObjectMeta(labels=labels),
200         spec=client.V1PodSpec(hostname=component_name,
201                               containers=containers,
202                               init_containers=init_containers,
203                               volumes=volumes,
204                               image_pull_secrets=ips)
205     )
206
207     # Define deployment spec
208     spec = client.ExtensionsV1beta1DeploymentSpec(
209         replicas=replicas,
210         template=template
211     )
212
213     # Create deployment object
214     deployment = client.ExtensionsV1beta1Deployment(
215         kind="Deployment",
216         metadata=client.V1ObjectMeta(name=deployment_name),
217         spec=spec
218     )
219
220     return deployment
221
222 def _create_service_object(service_name, component_name, service_ports, annotations, labels, service_type):
223     service_spec = client.V1ServiceSpec(
224         ports=service_ports,
225         selector={"app" : component_name},
226         type=service_type
227     )
228     if annotations:
229         metadata = client.V1ObjectMeta(name=_create_service_name(service_name), labels=labels, annotations=annotations)
230     else:
231         metadata = client.V1ObjectMeta(name=_create_service_name(service_name), labels=labels)
232
233     service = client.V1Service(
234         kind="Service",
235         api_version="v1",
236         metadata=metadata,
237         spec=service_spec
238     )
239     return service
240
241 def parse_ports(port_list):
242     '''
243     Parse the port list into a list of container ports (needed to create the container)
244     and to a set of port mappings to set up k8s services.
245     '''
246     container_ports = []
247     port_map = {}
248     for p in port_list:
249         m = PORTS.match(p.strip())
250         if m:
251             cport = int(m.group(1))
252             hport = int (m.group(4))
253             if m.group(3):
254                 proto = (m.group(3)).upper()
255             else:
256                 proto = "TCP"
257             container_ports.append((cport, proto))
258             port_map[(cport, proto)] = hport
259         else:
260             raise ValueError("Bad port specification: {0}".format(p))
261
262     return container_ports, port_map
263
264 def _parse_volumes(volume_list):
265     volumes = []
266     volume_mounts = []
267     for v in volume_list:
268         vname = str(uuid.uuid4())
269         vhost = v['host']['path']
270         vcontainer = v['container']['bind']
271         vro = (v['container'].get('mode') == 'ro')
272         volumes.append(client.V1Volume(name=vname, host_path=client.V1HostPathVolumeSource(path=vhost)))
273         volume_mounts.append(client.V1VolumeMount(name=vname, mount_path=vcontainer, read_only=vro))
274
275     return volumes, volume_mounts
276
277 def _add_elk_logging_sidecar(containers, volumes, volume_mounts, component_name, log_info, filebeat):
278     if not log_info or not filebeat:
279         return
280     log_dir = log_info.get("log_directory")
281     if not log_dir:
282         return
283     sidecar_volume_mounts = []
284
285     # Create the volume for component log files and volume mounts for the component and sidecar containers
286     volumes.append(client.V1Volume(name="component-log", empty_dir=client.V1EmptyDirVolumeSource()))
287     volume_mounts.append(client.V1VolumeMount(name="component-log", mount_path=log_dir))
288     sc_path = log_info.get("alternate_fb_path") or "{0}/{1}".format(filebeat["log_path"], component_name)
289     sidecar_volume_mounts.append(client.V1VolumeMount(name="component-log", mount_path=sc_path))
290
291     # Create the volume for sidecar data and the volume mount for it
292     volumes.append(client.V1Volume(name="filebeat-data", empty_dir=client.V1EmptyDirVolumeSource()))
293     sidecar_volume_mounts.append(client.V1VolumeMount(name="filebeat-data", mount_path=filebeat["data_path"]))
294
295     # Create the volume for the sidecar configuration data and the volume mount for it
296     # The configuration data is in a k8s ConfigMap that should be created when DCAE is installed.
297     volumes.append(
298         client.V1Volume(name="filebeat-conf", config_map=client.V1ConfigMapVolumeSource(name=filebeat["config_map"])))
299     sidecar_volume_mounts.append(
300         client.V1VolumeMount(name="filebeat-conf", mount_path=filebeat["config_path"], sub_path=filebeat["config_subpath"]))
301
302     # Finally create the container for the sidecar
303     containers.append(_create_container_object("filebeat", filebeat["image"], False, volume_mounts=sidecar_volume_mounts))
304
305 def _add_tls_init_container(init_containers, volumes, volume_mounts, tls_info, tls_config):
306     #   Adds an InitContainer to the pod to set up TLS certificate information.  For components that act as a
307     #   server(tls_info["use_tls"] is True), the InitContainer will populate a directory with server and CA certificate
308     #   materials in various formats.   For other components (tls_info["use_tls"] is False, or tls_info is not specified),
309     #   the InitContainer will populate a directory with CA certificate materials in PEM and JKS formats.
310     #   In either case, the certificate directory is mounted onto the component container filesystem at the location
311     #   specified by tls_info["component_cert_dir"], if present, otherwise at the configured default mount point
312     #   (tls_config["component_cert_dir"]).
313
314     cert_directory = tls_info.get("cert_directory") or tls_config.get("component_cert_dir")
315     env = {}
316     env["TLS_SERVER"] = "true" if tls_info.get("use_tls") else "false"
317
318     # Create the certificate volume and volume mounts
319     volumes.append(client.V1Volume(name="tls-info", empty_dir=client.V1EmptyDirVolumeSource()))
320     volume_mounts.append(client.V1VolumeMount(name="tls-info", mount_path=cert_directory))
321     init_volume_mounts = [client.V1VolumeMount(name="tls-info", mount_path=tls_config["cert_path"])]
322
323     # Create the init container
324     init_containers.append(_create_container_object("init-tls", tls_config["image"], False, volume_mounts=init_volume_mounts, env=env))
325
326 def _add_external_tls_init_container(init_containers, volumes, external_cert, external_tls_config):
327     env = {}
328     output_path = external_cert.get("external_cert_directory")
329     if not output_path.endswith('/'):
330        output_path += '/'
331
332     env["REQUEST_URL"] = external_tls_config.get("request_url")
333     env["REQUEST_TIMEOUT"] = external_tls_config.get("timeout")
334     env["OUTPUT_PATH"] = output_path + "external"
335     env["OUTPUT_TYPE"] = external_cert.get("cert_type")
336     env["CA_NAME"] = external_cert.get("ca_name")
337     env["COMMON_NAME"] = external_cert.get("external_certificate_parameters").get("common_name")
338     env["ORGANIZATION"] = external_tls_config.get("organization")
339     env["ORGANIZATION_UNIT"] = external_tls_config.get("organizational_unit")
340     env["LOCATION"] = external_tls_config.get("location")
341     env["STATE"] = external_tls_config.get("state")
342     env["COUNTRY"] = external_tls_config.get("country")
343     env["SANS"] = external_cert.get("external_certificate_parameters").get("sans")
344     env["KEYSTORE_PATH"] = KEYSTORE_PATH
345     env["KEYSTORE_PASSWORD"] = external_tls_config.get("keystore_password")
346     env["TRUSTSTORE_PATH"] = TRUSTSTORE_PATH
347     env["TRUSTSTORE_PASSWORD"] = external_tls_config.get("truststore_password")
348
349     # Create the volumes and volume mounts
350     sec = client.V1SecretVolumeSource(secret_name=CERT_SECRET_NAME)
351     volumes.append(client.V1Volume(name="tls-volume", secret=sec))
352     init_volume_mounts = [client.V1VolumeMount(name="tls-info", mount_path=external_cert.get("external_cert_directory")),
353                           client.V1VolumeMount(name="tls-volume", mount_path=MOUNT_PATH)]
354
355     # Create the init container
356     init_containers.append(_create_container_object("cert-service-client", external_tls_config["image_tag"], False, volume_mounts=init_volume_mounts, env=env))
357
358 def _process_port_map(port_map):
359     service_ports = []      # Ports exposed internally on the k8s network
360     exposed_ports = []      # Ports to be mapped to ports on the k8s nodes via NodePort
361     for (cport, proto), hport in port_map.items():
362         name = "xport-{0}-{1}".format(proto[0].lower(), cport)
363         cport = int(cport)
364         hport = int(hport)
365         service_ports.append(client.V1ServicePort(port=cport, protocol=proto, name=name[1:]))
366         if hport != 0:
367             exposed_ports.append(client.V1ServicePort(port=cport, protocol=proto, node_port=hport, name=name))
368     return service_ports, exposed_ports
369
370 def _service_exists(location, namespace, component_name):
371     exists = False
372     try:
373         _configure_api(location)
374         client.CoreV1Api().read_namespaced_service(_create_service_name(component_name), namespace)
375         exists = True
376     except client.rest.ApiException:
377         pass
378
379     return exists
380
381 def _patch_deployment(location, namespace, deployment, modify):
382     '''
383     Gets the current spec for 'deployment' in 'namespace'
384     in the k8s cluster at 'location',
385     uses the 'modify' function to change the spec,
386     then sends the updated spec to k8s.
387     '''
388     _configure_api(location)
389
390     # Get deployment spec
391     spec = client.ExtensionsV1beta1Api().read_namespaced_deployment(deployment, namespace)
392
393     # Apply changes to spec
394     spec = modify(spec)
395
396     # Patch the deploy with updated spec
397     client.ExtensionsV1beta1Api().patch_namespaced_deployment(deployment, namespace, spec)
398
399 def _execute_command_in_pod(location, namespace, pod_name, command):
400     '''
401     Execute the command (specified by an argv-style list in  the "command" parameter) in
402     the specified pod in the specified namespace at the specified location.
403     For now at least, we use this only to
404     run a notification script in a pod after a configuration change.
405
406     The straightforward way to do this is with the V1 Core API function "connect_get_namespaced_pod_exec".
407     Unfortunately due to a bug/limitation in the Python client library, we can't call it directly.
408     We have to make the API call through a Websocket connection using the kubernetes.stream wrapper API.
409     I'm following the example code at https://github.com/kubernetes-client/python/blob/master/examples/exec.py.
410     There are several issues tracking this, in various states.  It isn't clear that there will ever
411     be a fix.
412         - https://github.com/kubernetes-client/python/issues/58
413         - https://github.com/kubernetes-client/python/issues/409
414         - https://github.com/kubernetes-client/python/issues/526
415
416     The main consequence of the workaround using "stream" is that the caller does not get an indication
417     of the exit code returned by the command when it completes execution.   It turns out that the
418     original implementation of notification in the Docker plugin did not use this result, so we can
419     still match the original notification functionality.
420
421     The "stream" approach returns a string containing any output sent by the command to stdout or stderr.
422     We'll return that so it can logged.
423     '''
424     _configure_api(location)
425     try:
426         output = stream.stream(client.CoreV1Api().connect_get_namespaced_pod_exec,
427                              name=pod_name,
428                              namespace=namespace,
429                              command=command,
430                              stdout=True,
431                              stderr=True,
432                             stdin=False,
433                             tty=False)
434     except client.rest.ApiException as e:
435         # If the exception indicates the pod wasn't found,  it's not a fatal error.
436         # It existed when we enumerated the pods for the deployment but no longer exists.
437         # Unfortunately, the only way to distinguish a pod not found from any other error
438         # is by looking at the reason text.
439         # (The ApiException's "status" field should contain the HTTP status code, which would
440         # be 404 if the pod isn't found, but empirical testing reveals that "status" is set
441         # to zero.)
442         if "404 not found" in e.reason.lower():
443             output = "Pod not found"
444         else:
445             raise e
446
447     return {"pod" : pod_name, "output" : output}
448
449 def deploy(namespace, component_name, image, replicas, always_pull, k8sconfig, **kwargs):
450     '''
451     This will create a k8s Deployment and, if needed, one or two k8s Services.
452     (We are being opinionated in our use of k8s... this code decides what k8s abstractions and features to use.
453     We're not exposing k8s to the component developer and the blueprint author.
454     This is a conscious choice.  We want to use k8s in a controlled, consistent way, and we want to hide
455     the details from the component developer and the blueprint author.)
456
457     namespace:  the Kubernetes namespace into which the component is deployed
458     component_name:  the component name, used to derive names of Kubernetes entities
459     image: the docker image for the component being deployed
460     replica: the number of instances of the component to be deployed
461     always_pull: boolean flag, indicating that Kubernetes should always pull a new copy of
462        the Docker image for the component, even if it is already present on the Kubernetes node.
463     k8sconfig contains:
464         - image_pull_secrets: a list of names of image pull secrets that can be used for retrieving images.
465           (DON'T PANIC:  these are just the names of secrets held in the Kubernetes secret store.)
466         - filebeat: a dictionary of filebeat sidecar parameters:
467             "log_path" : mount point for log volume in filebeat container
468             "data_path" : mount point for data volume in filebeat container
469             "config_path" : mount point for config volume in filebeat container
470             "config_subpath" :  subpath for config data in filebeat container
471             "config_map" : ConfigMap holding the filebeat configuration
472             "image": Docker image to use for filebeat
473         - tls: a dictionary of TLS-related information:
474             "cert_path": mount point for certificate volume in init container
475             "image": Docker image to use for TLS init container
476             "component_cert_dir" : default mount point for certs
477     kwargs may have:
478         - volumes:  array of volume objects, where a volume object is:
479             {"host":{"path": "/path/on/host"}, "container":{"bind":"/path/on/container","mode":"rw_or_ro"}
480         - ports: array of strings in the form "container_port:host_port"
481         - env: map of name-value pairs ( {name0: value0, name1: value1...}
482         - log_info: an object with info for setting up ELK logging, with the form:
483             {"log_directory": "/path/to/container/log/directory", "alternate_fb_path" : "/alternate/sidecar/log/path"}
484         - tls_info: an object with info for setting up TLS (HTTPS), with the form:
485             {"use_tls": true, "cert_directory": "/path/to/container/cert/directory" }
486         - external_cert: an object with information for setting up the init container for external certificates creation, with the form:
487             {"external_cert":
488                 "external_cert_directory": "/path/to/directory_where_certs_should_be_placed",
489                 "use_external_tls": true or false,
490                 "ca_name": "ca-name-value",
491                 "cert_type": "P12" or "JKS" or "PEM",
492                 "external_certificate_parameters":
493                     "common_name": "common-name-value",
494                     "sans": "sans-value"}
495         - labels: dict with label-name/label-value pairs, e.g. {"cfydeployment" : "lsdfkladflksdfsjkl", "cfynode":"mycomponent"}
496             These label will be set on all the pods deployed as a result of this deploy() invocation.
497         - resources: dict with optional "limits" and "requests" resource requirements, each a dict containing:
498             - cpu:    number CPU usage, like 0.5
499             - memory: string memory requirement, like "2Gi"
500         - readiness: dict with health check info; if present, used to create a readiness probe for the main container.  Includes:
501             - type: check is done by making http(s) request to an endpoint ("http", "https") or by exec'ing a script in the container ("script", "docker")
502             - interval: period (in seconds) between probes
503             - timeout:  time (in seconds) to allow a probe to complete
504             - endpoint: the path portion of the URL that points to the readiness endpoint for "http" and "https" types
505             - path: the full path to the script to be executed in the container for "script" and "docker" types
506         - liveness: dict with health check info; if present, used to create a liveness probe for the main container.  Includes:
507             - type: check is done by making http(s) request to an endpoint ("http", "https") or by exec'ing a script in the container ("script", "docker")
508             - interval: period (in seconds) between probes
509             - timeout:  time (in seconds) to allow a probe to complete
510             - endpoint: the path portion of the URL that points to the liveness endpoint for "http" and "https" types
511             - path: the full path to the script to be executed in the container for "script" and "docker" types
512         - k8s_location: name of the Kubernetes location (cluster) where the component is to be deployed
513
514     '''
515
516     deployment_ok = False
517     cip_service_created = False
518     deployment_description = {
519         "namespace": namespace,
520         "location" : kwargs.get("k8s_location"),
521         "deployment": '',
522         "services": []
523     }
524
525     try:
526
527         # Get API handles
528         _configure_api(kwargs.get("k8s_location"))
529         core = client.CoreV1Api()
530         ext = client.ExtensionsV1beta1Api()
531
532         # Parse the port mapping
533         container_ports, port_map = parse_ports(kwargs.get("ports", []))
534
535         # Parse the volumes list into volumes and volume_mounts for the deployment
536         volumes, volume_mounts = _parse_volumes(kwargs.get("volumes", []))
537
538         # Initialize the list of containers that will be part of the pod
539         containers = []
540         init_containers = []
541
542         # Set up the ELK logging sidecar container, if needed
543         _add_elk_logging_sidecar(containers, volumes, volume_mounts, component_name, kwargs.get("log_info"), k8sconfig.get("filebeat"))
544
545         # Set up TLS information
546         _add_tls_init_container(init_containers, volumes, volume_mounts, kwargs.get("tls_info") or {}, k8sconfig.get("tls"))
547
548         # Set up external TLS information
549         external_cert = kwargs.get("external_cert")
550         if external_cert and external_cert.get("use_external_tls"):
551             _add_external_tls_init_container(init_containers, volumes, external_cert, k8sconfig.get("external_cert"))
552
553         # Create the container for the component
554         # Make it the first container in the pod
555         container_args = {key: kwargs.get(key) for key in ("env", "readiness", "liveness", "resources")}
556         container_args['container_ports'] = container_ports
557         container_args['volume_mounts'] = volume_mounts
558         containers.insert(0, _create_container_object(component_name, image, always_pull, **container_args))
559
560         # Build the k8s Deployment object
561         labels = kwargs.get("labels", {})
562         labels["app"] = component_name
563         dep = _create_deployment_object(component_name, containers, init_containers, replicas, volumes, labels, pull_secrets=k8sconfig["image_pull_secrets"])
564
565         # Have k8s deploy it
566         ext.create_namespaced_deployment(namespace, dep)
567         deployment_ok = True
568         deployment_description["deployment"] = _create_deployment_name(component_name)
569
570         # Create service(s), if a port mapping is specified
571         if port_map:
572             service_ports, exposed_ports = _process_port_map(port_map)
573
574             # Create a ClusterIP service for access via the k8s network
575             service = _create_service_object(_create_service_name(component_name), component_name, service_ports, None, labels, "ClusterIP")
576             core.create_namespaced_service(namespace, service)
577             cip_service_created = True
578             deployment_description["services"].append(_create_service_name(component_name))
579
580             # If there are ports to be exposed on the k8s nodes, create a "NodePort" service
581             if exposed_ports:
582                 exposed_service = \
583                     _create_service_object(_create_exposed_service_name(component_name), component_name, exposed_ports, '', labels, "NodePort")
584                 core.create_namespaced_service(namespace, exposed_service)
585                 deployment_description["services"].append(_create_exposed_service_name(component_name))
586
587     except Exception as e:
588         # If the ClusterIP service was created, delete the service:
589         if cip_service_created:
590             core.delete_namespaced_service(_create_service_name(component_name), namespace)
591         # If the deployment was created but not the service, delete the deployment
592         if deployment_ok:
593             client.ExtensionsV1beta1Api().delete_namespaced_deployment(_create_deployment_name(component_name), namespace, body=client.V1DeleteOptions())
594         raise e
595
596     return dep, deployment_description
597
598 def undeploy(deployment_description):
599     _configure_api(deployment_description["location"])
600
601     namespace = deployment_description["namespace"]
602
603     # remove any services associated with the component
604     for service in deployment_description["services"]:
605         client.CoreV1Api().delete_namespaced_service(service, namespace)
606
607     # Have k8s delete the underlying pods and replicaset when deleting the deployment.
608     options = client.V1DeleteOptions(propagation_policy="Foreground")
609     client.ExtensionsV1beta1Api().delete_namespaced_deployment(deployment_description["deployment"], namespace, body=options)
610
611 def is_available(location, namespace, component_name):
612     _configure_api(location)
613     dep_status = client.AppsV1beta1Api().read_namespaced_deployment_status(_create_deployment_name(component_name), namespace)
614     # Check if the number of available replicas is equal to the number requested and that the replicas match the current spec
615     # This check can be used to verify completion of an initial deployment, a scale operation, or an update operation
616     return dep_status.status.available_replicas == dep_status.spec.replicas and dep_status.status.updated_replicas == dep_status.spec.replicas
617
618 def scale(deployment_description, replicas):
619     ''' Trigger a scaling operation by updating the replica count for the Deployment '''
620
621     def update_replica_count(spec):
622         spec.spec.replicas = replicas
623         return spec
624
625     _patch_deployment(deployment_description["location"], deployment_description["namespace"], deployment_description["deployment"], update_replica_count)
626
627 def upgrade(deployment_description, image, container_index = 0):
628     ''' Trigger a rolling upgrade by sending a new image name/tag to k8s '''
629
630     def update_image(spec):
631         spec.spec.template.spec.containers[container_index].image = image
632         return spec
633
634     _patch_deployment(deployment_description["location"], deployment_description["namespace"], deployment_description["deployment"], update_image)
635
636 def rollback(deployment_description, rollback_to=0):
637     '''
638     Undo upgrade by rolling back to a previous revision of the deployment.
639     By default, go back one revision.
640     rollback_to can be used to supply a specific revision number.
641     Returns the image for the app container and the replica count from the rolled-back deployment
642     '''
643     '''
644     2018-07-13
645     Currently this does not work due to a bug in the create_namespaced_deployment_rollback() method.
646     The k8s python client code throws an exception while processing the response from the API.
647     See:
648        - https://github.com/kubernetes-client/python/issues/491
649        - https://github.com/kubernetes/kubernetes/pull/63837
650     The fix has been merged into the master branch but is not in the latest release.
651     '''
652     _configure_api(deployment_description["location"])
653     deployment = deployment_description["deployment"]
654     namespace = deployment_description["namespace"]
655
656     # Initiate the rollback
657     client.ExtensionsV1beta1Api().create_namespaced_deployment_rollback(
658         deployment,
659         namespace,
660         client.AppsV1beta1DeploymentRollback(name=deployment, rollback_to=client.AppsV1beta1RollbackConfig(revision=rollback_to)))
661
662     # Read back the spec for the rolled-back deployment
663     spec = client.ExtensionsV1beta1Api().read_namespaced_deployment(deployment, namespace)
664     return spec.spec.template.spec.containers[0].image, spec.spec.replicas
665
666 def execute_command_in_deployment(deployment_description, command):
667     '''
668     Enumerates the pods in the k8s deployment identified by "deployment_description",
669     then executes the command (represented as an argv-style list) in "command" in
670     container 0 (the main application container) each of those pods.
671
672     Note that the sets of pods associated with a deployment can change over time.  The
673     enumeration is a snapshot at one point in time.  The command will not be executed in
674     pods that are created after the initial enumeration.   If a pod disappears after the
675     initial enumeration and before the command is executed, the attempt to execute the
676     command will fail.  This is not treated as a fatal error.
677
678     This approach is reasonable for the one current use case for "execute_command":  running a
679     script to notify a container that its configuration has changed as a result of a
680     policy change.  In this use case, the new configuration information is stored into
681     the configuration store (Consul), the pods are enumerated, and the command is executed.
682     If a pod disappears after the enumeration, the fact that the command cannot be run
683     doesn't matter--a nonexistent pod doesn't need to be reconfigured.  Similarly, a pod that
684     comes up after the enumeration will get its initial configuration from the updated version
685     in Consul.
686
687     The optimal solution here would be for k8s to provide an API call to execute a command in
688     all of the pods for a deployment.   Unfortunately, k8s does not provide such a call--the
689     only call provided by k8s operates at the pod level, not the deployment level.
690
691     Another interesting k8s factoid: there's no direct way to list the pods belong to a
692     particular k8s deployment.   The deployment code above sets a label ("k8sdeployment") on
693     the pod that has the k8s deployment name.  To list the pods, the code below queries for
694     pods with the label carrying the deployment name.
695     '''
696     location = deployment_description["location"]
697     _configure_api(location)
698     deployment = deployment_description["deployment"]
699     namespace = deployment_description["namespace"]
700
701     # Get names of all the running pods belonging to the deployment
702     pod_names = [pod.metadata.name for pod in client.CoreV1Api().list_namespaced_pod(
703         namespace = namespace,
704         label_selector = "k8sdeployment={0}".format(deployment),
705         field_selector = "status.phase=Running"
706     ).items]
707
708     # Execute command in the running pods
709     return [_execute_command_in_pod(location, namespace, pod_name, command)
710             for pod_name in pod_names]