Upgrade k8s to use configMap
[dcaegen2/platform/plugins.git] / k8s / k8sclient / k8sclient.py
1 # ============LICENSE_START=======================================================
2 # org.onap.dcae
3 # ================================================================================
4 # Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved.
5 # Copyright (c) 2020 Pantheon.tech. All rights reserved.
6 # Copyright (c) 2020-2021 Nokia. All rights reserved.
7 # Copyright (c) 2020 J. F. Lucas.  All rights reserved.
8 # ================================================================================
9 # Licensed under the Apache License, Version 2.0 (the "License");
10 # you may not use this file except in compliance with the License.
11 # You may obtain a copy of the License at
12 #
13 #      http://www.apache.org/licenses/LICENSE-2.0
14 #
15 # Unless required by applicable law or agreed to in writing, software
16 # distributed under the License is distributed on an "AS IS" BASIS,
17 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18 # See the License for the specific language governing permissions and
19 # limitations under the License.
20 # ============LICENSE_END=========================================================
21 #
22 import os
23 import re
24 import uuid
25
26 from kubernetes import config, client, stream
27
28 # Default values for readiness probe
29 PROBE_DEFAULT_PERIOD = 15
30 PROBE_DEFAULT_TIMEOUT = 1
31
32 # Location of k8s cluster config file ("kubeconfig")
33 K8S_CONFIG_PATH = "/opt/onap/kube/kubeconfig"
34
35 # Regular expression for interval/timeout specification
36 INTERVAL_SPEC = re.compile("^([0-9]+)(s|m|h)?$")
37 # Conversion factors to seconds
38 FACTORS = {None: 1, "s": 1, "m": 60, "h": 3600}
39
40 # Regular expression for port mapping
41 # group 1: container port
42 # group 2: / + protocol
43 # group 3: protocol
44 # group 4: host port
45 PORTS = re.compile("^([0-9]+)(/(udp|UDP|tcp|TCP))?:([0-9]+)$")
46
47 # Constants for external_cert
48 MOUNT_PATH = "/etc/onap/oom/certservice/certs/"
49 KEYSTORE_PATH = MOUNT_PATH + "certServiceClient-keystore.jks"
50 TRUSTSTORE_PATH = MOUNT_PATH + "truststore.jks"
51 DEFAULT_CERT_TYPE = "p12"
52
53
54 def _create_deployment_name(component_name):
55     return "dep-{0}".format(component_name)[:63]
56
57
58 def _create_service_name(component_name):
59     return "{0}".format(component_name)[:63]
60
61
62 def _create_exposed_service_name(component_name):
63     return ("x{0}".format(component_name))[:63]
64
65
66 def _create_exposed_v6_service_name(component_name):
67     return ("x{0}-ipv6".format(component_name))[:63]
68
69
70 def _configure_api(location=None):
71     # Look for a kubernetes config file
72     if os.path.exists(K8S_CONFIG_PATH):
73         config.load_kube_config(config_file=K8S_CONFIG_PATH, context=location, persist_config=False)
74     else:
75         # Maybe we're running in a k8s container and we can use info provided by k8s
76         # We would like to use:
77         # config.load_incluster_config()
78         # but this looks into os.environ for kubernetes host and port, and from
79         # the plugin those aren't visible.   So we use the InClusterConfigLoader class,
80         # where we can set the environment to what we like.
81         # This is probably brittle!  Maybe there's a better alternative.
82         localenv = {
83             config.incluster_config.SERVICE_HOST_ENV_NAME: "kubernetes.default.svc.cluster.local",
84             config.incluster_config.SERVICE_PORT_ENV_NAME: "443"
85         }
86         config.incluster_config.InClusterConfigLoader(
87             token_filename=config.incluster_config.SERVICE_TOKEN_FILENAME,
88             cert_filename=config.incluster_config.SERVICE_CERT_FILENAME,
89             environ=localenv
90         ).load_and_set()
91
92
93 def _parse_interval(t):
94     """
95     Parse an interval specification
96     t can be
97        - a simple integer quantity, interpreted as seconds
98        - a string representation of a decimal integer, interpreted as seconds
99        - a string consisting of a represention of an decimal integer followed by a unit,
100          with "s" representing seconds, "m" representing minutes,
101          and "h" representing hours
102     Used for compatibility with the Docker plugin, where time intervals
103     for health checks were specified as strings with a number and a unit.
104     See 'intervalspec' above for the regular expression that's accepted.
105     """
106     m = INTERVAL_SPEC.match(str(t))
107     if m:
108         time = int(m.group(1)) * FACTORS[m.group(2)]
109     else:
110         raise ValueError("Bad interval specification: {0}".format(t))
111     return time
112
113
114 def _create_probe(hc, port):
115     """ Create a Kubernetes probe based on info in the health check dictionary hc """
116     probe_type = hc['type']
117     probe = None
118     period = _parse_interval(hc.get('interval', PROBE_DEFAULT_PERIOD))
119     timeout = _parse_interval(hc.get('timeout', PROBE_DEFAULT_TIMEOUT))
120     if probe_type in ['http', 'https']:
121         probe = client.V1Probe(
122             failure_threshold=1,
123             initial_delay_seconds=5,
124             period_seconds=period,
125             timeout_seconds=timeout,
126             http_get=client.V1HTTPGetAction(
127                 path=hc['endpoint'],
128                 port=port,
129                 scheme=probe_type.upper()
130             )
131         )
132     elif probe_type in ['script', 'docker']:
133         probe = client.V1Probe(
134             failure_threshold=1,
135             initial_delay_seconds=5,
136             period_seconds=period,
137             timeout_seconds=timeout,
138             _exec=client.V1ExecAction(
139                 command=hc['script'].split()
140             )
141         )
142     return probe
143
144
145 def _create_resources(resources=None):
146     if resources is not None:
147         resources_obj = client.V1ResourceRequirements(
148             limits=resources.get("limits"),
149             requests=resources.get("requests")
150         )
151         return resources_obj
152     else:
153         return None
154
155
156 def _create_container_object(name, image, always_pull, **kwargs):
157     # Set up environment variables
158     # Copy any passed in environment variables
159     env = kwargs.get('env') or {}
160     env_vars = [client.V1EnvVar(name=k, value=env[k]) for k in env]
161     # Add POD_IP with the IP address of the pod running the container
162     pod_ip = client.V1EnvVarSource(field_ref=client.V1ObjectFieldSelector(field_path="status.podIP"))
163     env_vars.append(client.V1EnvVar(name="POD_IP", value_from=pod_ip))
164
165     # If a health check is specified, create a readiness/liveness probe
166     # (For an HTTP-based check, we assume it's at the first container port)
167     readiness = kwargs.get('readiness')
168     liveness = kwargs.get('liveness')
169     resources = kwargs.get('resources')
170     container_ports = kwargs.get('container_ports') or []
171
172     hc_port = container_ports[0][0] if container_ports else None
173     probe = _create_probe(readiness, hc_port) if readiness else None
174     live_probe = _create_probe(liveness, hc_port) if liveness else None
175     resources_obj = _create_resources(resources) if resources else None
176     port_objs = [client.V1ContainerPort(container_port=port, protocol=proto)
177                  for port, proto in container_ports]
178
179     # Define container for pod
180     return client.V1Container(
181         name=name,
182         image=image,
183         image_pull_policy='Always' if always_pull else 'IfNotPresent',
184         env=env_vars,
185         ports=port_objs,
186         volume_mounts=kwargs.get('volume_mounts') or [],
187         resources=resources_obj,
188         readiness_probe=probe,
189         liveness_probe=live_probe
190     )
191
192
193 def _create_deployment_object(component_name,
194                               containers,
195                               init_containers,
196                               replicas,
197                               volumes,
198                               labels=None,
199                               pull_secrets=None):
200     if labels is None:
201         labels = {}
202     if pull_secrets is None:
203         pull_secrets = []
204     deployment_name = _create_deployment_name(component_name)
205
206     # Label the pod with the deployment name, so we can find it easily
207     labels.update({"k8sdeployment": deployment_name})
208
209     # pull_secrets is a list of the names of the k8s secrets containing docker registry credentials
210     # See https://kubernetes.io/docs/concepts/containers/images/#specifying-imagepullsecrets-on-a-pod
211     ips = []
212     for secret in pull_secrets:
213         ips.append(client.V1LocalObjectReference(name=secret))
214
215     # Define pod template
216     template = client.V1PodTemplateSpec(
217         metadata=client.V1ObjectMeta(labels=labels),
218         spec=client.V1PodSpec(hostname=component_name,
219                               containers=containers,
220                               init_containers=init_containers,
221                               volumes=volumes,
222                               image_pull_secrets=ips)
223     )
224
225     # Define deployment spec
226     spec = client.V1DeploymentSpec(
227         replicas=replicas,
228         selector=client.V1LabelSelector(match_labels=labels),
229         template=template
230     )
231
232     # Create deployment object
233     deployment = client.V1Deployment(
234         api_version="apps/v1",
235         kind="Deployment",
236         metadata=client.V1ObjectMeta(name=deployment_name, labels=labels),
237         spec=spec
238     )
239
240     return deployment
241
242
243 def _create_service_object(service_name, component_name, service_ports, annotations, labels, service_type, ip_family):
244     service_spec = client.V1ServiceSpec(
245         ports=service_ports,
246         selector={"app": component_name},
247         type=service_type,
248         ip_family=ip_family
249     )
250     if annotations:
251         metadata = client.V1ObjectMeta(name=_create_service_name(service_name), labels=labels, annotations=annotations)
252     else:
253         metadata = client.V1ObjectMeta(name=_create_service_name(service_name), labels=labels)
254
255     service = client.V1Service(
256         kind="Service",
257         api_version="v1",
258         metadata=metadata,
259         spec=service_spec
260     )
261     return service
262
263
264 def parse_ports(port_list):
265     """
266     Parse the port list into a list of container ports (needed to create the container)
267     and to a set of port mappings to set up k8s services.
268     """
269     container_ports = []
270     port_map = {}
271     for p in port_list:
272         ipv6 = False
273         if type(p) is dict:
274             ipv6 = "ipv6" in p and p['ipv6']
275             p = "".join(str(v) for v in p['concat'])
276         m = PORTS.match(p.strip())
277         if m:
278             cport = int(m.group(1))
279             hport = int(m.group(4))
280             if m.group(3):
281                 proto = (m.group(3)).upper()
282             else:
283                 proto = "TCP"
284             port = (cport, proto)
285             if port not in container_ports:
286                 container_ports.append(port)
287             port_map[(cport, proto, ipv6)] = hport
288         else:
289             raise ValueError("Bad port specification: {0}".format(p))
290
291     return container_ports, port_map
292
293
294 def _parse_volumes(volume_list):
295     volumes = []
296     volume_mounts = []
297     for v in volume_list:
298         vname = str(uuid.uuid4())
299         vcontainer = v['container']['bind']
300         vro = (v['container'].get('mode') == 'ro')
301         if 'host' in v:
302             vhost = v['host']['path']
303             volumes.append(client.V1Volume(name=vname, host_path=client.V1HostPathVolumeSource(path=vhost)))
304         if 'config_volume' in v:
305             vconfig_volume = v['config_volume']['name']
306             volumes.append(client.V1Volume(name=vname, config_map=client.V1ConfigMapVolumeSource(default_mode="0644",
307                                                                                                  name=vconfig_volume,
308                                                                                                  optional=True)))
309         volume_mounts.append(client.V1VolumeMount(name=vname, mount_path=vcontainer, read_only=vro))
310
311     return volumes, volume_mounts
312
313
314 def _add_elk_logging_sidecar(containers, volumes, volume_mounts, component_name, log_info, filebeat):
315     if not log_info or not filebeat:
316         return
317     log_dir = log_info.get("log_directory")
318     if not log_dir:
319         return
320     sidecar_volume_mounts = []
321
322     # Create the volume for component log files and volume mounts for the component and sidecar containers
323     volumes.append(client.V1Volume(name="component-log", empty_dir=client.V1EmptyDirVolumeSource()))
324     volume_mounts.append(client.V1VolumeMount(name="component-log", mount_path=log_dir))
325     sc_path = log_info.get("alternate_fb_path") or "{0}/{1}".format(filebeat["log_path"], component_name)
326     sidecar_volume_mounts.append(client.V1VolumeMount(name="component-log", mount_path=sc_path))
327
328     # Create the volume for sidecar data and the volume mount for it
329     volumes.append(client.V1Volume(name="filebeat-data", empty_dir=client.V1EmptyDirVolumeSource()))
330     sidecar_volume_mounts.append(client.V1VolumeMount(name="filebeat-data", mount_path=filebeat["data_path"]))
331
332     # Create the volume for the sidecar configuration data and the volume mount for it
333     # The configuration data is in a k8s ConfigMap that should be created when DCAE is installed.
334     volumes.append(
335         client.V1Volume(name="filebeat-conf", config_map=client.V1ConfigMapVolumeSource(name=filebeat["config_map"])))
336     sidecar_volume_mounts.append(
337         client.V1VolumeMount(name="filebeat-conf", mount_path=filebeat["config_path"],
338                              sub_path=filebeat["config_subpath"]))
339
340     # Finally create the container for the sidecar
341     containers.append(
342         _create_container_object("filebeat", filebeat["image"], False, volume_mounts=sidecar_volume_mounts))
343
344
345 def _add_tls_init_container(ctx, init_containers, volumes, volume_mounts, tls_info, tls_config):
346     # Adds an InitContainer to the pod to set up TLS certificate information.  For components that act as a server(
347     # tls_info["use_tls"] is True), the InitContainer will populate a directory with server and CA certificate
348     # materials in various formats.   For other components (tls_info["use_tls"] is False, or tls_info is not
349     # specified), the InitContainer will populate a directory with CA certificate materials in PEM and JKS formats.
350     # In either case, the certificate directory is mounted onto the component container filesystem at the location
351     # specified by tls_info["component_cert_dir"], if present, otherwise at the configured default mount point (
352     # tls_config["component_cert_dir"]).
353     docker_image = tls_config["image"]
354     ctx.logger.info("Creating init container: TLS \n  * [" + docker_image + "]")
355
356     cert_directory = tls_info.get("cert_directory") or tls_config.get("component_cert_dir")
357     env = {}
358     env["TLS_SERVER"] = "true" if tls_info.get("use_tls") else "false"
359
360     # Create the certificate volume and volume mounts
361     volumes.append(client.V1Volume(name="tls-info", empty_dir=client.V1EmptyDirVolumeSource()))
362     volume_mounts.append(client.V1VolumeMount(name="tls-info", mount_path=cert_directory))
363     init_volume_mounts = [client.V1VolumeMount(name="tls-info", mount_path=tls_config["cert_path"])]
364
365     # Create the init container
366     init_containers.append(
367         _create_container_object("init-tls", docker_image, False, volume_mounts=init_volume_mounts, env=env))
368
369
370 def _add_external_tls_init_container(ctx, init_containers, volumes, external_cert, external_tls_config):
371     # Adds an InitContainer to the pod which will generate external TLS certificates.
372     docker_image = external_tls_config["image_tag"]
373     ctx.logger.info("Creating init container: external TLS \n  * [" + docker_image + "]")
374
375     env = {}
376     output_path = external_cert.get("external_cert_directory")
377     if not output_path.endswith('/'):
378         output_path += '/'
379
380     env["REQUEST_URL"] = external_tls_config.get("request_url")
381     env["REQUEST_TIMEOUT"] = external_tls_config.get("timeout")
382     env["OUTPUT_PATH"] = output_path + "external"
383     env["OUTPUT_TYPE"] = external_cert.get("cert_type")
384     env["CA_NAME"] = external_cert.get("ca_name")
385     env["COMMON_NAME"] = external_cert.get("external_certificate_parameters").get("common_name")
386     env["ORGANIZATION"] = external_tls_config.get("organization")
387     env["ORGANIZATION_UNIT"] = external_tls_config.get("organizational_unit")
388     env["LOCATION"] = external_tls_config.get("location")
389     env["STATE"] = external_tls_config.get("state")
390     env["COUNTRY"] = external_tls_config.get("country")
391     env["SANS"] = external_cert.get("external_certificate_parameters").get("sans")
392     env["KEYSTORE_PATH"] = KEYSTORE_PATH
393     env["KEYSTORE_PASSWORD"] = external_tls_config.get("keystore_password")
394     env["TRUSTSTORE_PATH"] = TRUSTSTORE_PATH
395     env["TRUSTSTORE_PASSWORD"] = external_tls_config.get("truststore_password")
396
397     # Create the volumes and volume mounts
398     sec = client.V1SecretVolumeSource(secret_name=external_tls_config.get("cert_secret_name"))
399     volumes.append(client.V1Volume(name="tls-volume", secret=sec))
400     init_volume_mounts = [
401         client.V1VolumeMount(name="tls-info", mount_path=external_cert.get("external_cert_directory")),
402         client.V1VolumeMount(name="tls-volume", mount_path=MOUNT_PATH)]
403
404     # Create the init container
405     init_containers.append(
406         _create_container_object("cert-service-client", docker_image, False, volume_mounts=init_volume_mounts, env=env))
407
408
409 def _add_cert_post_processor_init_container(ctx, init_containers, tls_info, tls_config, external_cert,
410                                             cert_post_processor_config):
411     # Adds an InitContainer to the pod to merge TLS and external TLS truststore into single file.
412     docker_image = cert_post_processor_config["image_tag"]
413     ctx.logger.info("Creating init container: cert post processor \n  * [" + docker_image + "]")
414
415     tls_cert_dir = tls_info.get("cert_directory") or tls_config.get("component_cert_dir")
416     if not tls_cert_dir.endswith('/'):
417         tls_cert_dir += '/'
418
419     tls_cert_file_path = tls_cert_dir + "trust.jks"
420     tls_cert_file_pass = tls_cert_dir + "trust.pass"
421
422     ext_cert_dir = tls_cert_dir + "external/"
423
424     output_type = (external_cert.get("cert_type") or DEFAULT_CERT_TYPE).lower()
425     ext_truststore_path = ext_cert_dir + "truststore." + _get_file_extension(output_type)
426     ext_truststore_pass = ''
427     if output_type != 'pem':
428         ext_truststore_pass = ext_cert_dir + "truststore.pass"
429
430     env = {"TRUSTSTORES_PATHS": tls_cert_file_path + ":" + ext_truststore_path,
431            "TRUSTSTORES_PASSWORDS_PATHS": tls_cert_file_pass + ":" + ext_truststore_pass,
432            "KEYSTORE_SOURCE_PATHS": _get_keystore_source_paths(output_type, ext_cert_dir),
433            "KEYSTORE_DESTINATION_PATHS": _get_keystore_destination_paths(output_type, tls_cert_dir)}
434
435     ctx.logger.info("TRUSTSTORES_PATHS:            " + env["TRUSTSTORES_PATHS"])
436     ctx.logger.info("TRUSTSTORES_PASSWORDS_PATHS:  " + env["TRUSTSTORES_PASSWORDS_PATHS"])
437     ctx.logger.info("KEYSTORE_SOURCE_PATHS:        " + env["KEYSTORE_SOURCE_PATHS"])
438     ctx.logger.info("KEYSTORE_DESTINATION_PATHS:   " + env["KEYSTORE_DESTINATION_PATHS"])
439
440     # Create the volumes and volume mounts
441     init_volume_mounts = [client.V1VolumeMount(name="tls-info", mount_path=tls_cert_dir)]
442
443     # Create the init container
444     init_containers.append(
445         _create_container_object("cert-post-processor", docker_image, False, volume_mounts=init_volume_mounts, env=env))
446
447
448 def _get_file_extension(output_type):
449     return {
450         'p12': 'p12',
451         'pem': 'pem',
452         'jks': 'jks',
453     }[output_type]
454
455
456 def _get_keystore_source_paths(output_type, ext_cert_dir):
457     source_paths_template = {
458         'p12': "{0}keystore.p12:{0}keystore.pass",
459         'jks': "{0}keystore.jks:{0}keystore.pass",
460         'pem': "{0}keystore.pem:{0}key.pem",
461     }[output_type]
462     return source_paths_template.format(ext_cert_dir)
463
464
465 def _get_keystore_destination_paths(output_type, tls_cert_dir):
466     destination_paths_template = {
467         'p12': "{0}cert.p12:{0}p12.pass",
468         'jks': "{0}cert.jks:{0}jks.pass",
469         'pem': "{0}cert.pem:{0}key.pem",
470     }[output_type]
471     return destination_paths_template.format(tls_cert_dir)
472
473
474 def _process_port_map(port_map):
475     service_ports = []  # Ports exposed internally on the k8s network
476     exposed_ports = []  # Ports to be mapped to ports on the k8s nodes via NodePort
477     exposed_ports_ipv6 = []
478     for (cport, proto, ipv6), hport in port_map.items():
479         name = "xport-{0}-{1}".format(proto[0].lower(), cport)
480         cport = int(cport)
481         hport = int(hport)
482         port = client.V1ServicePort(port=cport, protocol=proto, name=name[1:])
483         if port not in service_ports:
484             service_ports.append(port)
485         if hport != 0:
486             if ipv6:
487                 exposed_ports_ipv6.append(client.V1ServicePort(port=cport, protocol=proto, node_port=hport, name=name))
488             else:
489                 exposed_ports.append(client.V1ServicePort(port=cport, protocol=proto, node_port=hport, name=name))
490     return service_ports, exposed_ports, exposed_ports_ipv6
491
492
493 def _service_exists(location, namespace, component_name):
494     exists = False
495     try:
496         _configure_api(location)
497         client.CoreV1Api().read_namespaced_service(_create_service_name(component_name), namespace)
498         exists = True
499     except client.rest.ApiException:
500         pass
501
502     return exists
503
504
505 def _patch_deployment(location, namespace, deployment, modify):
506     '''
507     Gets the current spec for 'deployment' in 'namespace'
508     in the k8s cluster at 'location',
509     uses the 'modify' function to change the spec,
510     then sends the updated spec to k8s.
511     '''
512     _configure_api(location)
513
514     # Get deployment spec
515     spec = client.AppsV1Api().read_namespaced_deployment(deployment, namespace)
516
517     # Apply changes to spec
518     spec = modify(spec)
519
520     # Patch the deploy with updated spec
521     client.AppsV1Api().patch_namespaced_deployment(deployment, namespace, spec)
522
523
524 def _execute_command_in_pod(location, namespace, pod_name, command):
525     '''
526     Execute the command (specified by an argv-style list in  the "command" parameter) in
527     the specified pod in the specified namespace at the specified location.
528     For now at least, we use this only to
529     run a notification script in a pod after a configuration change.
530
531     The straightforward way to do this is with the V1 Core API function "connect_get_namespaced_pod_exec".
532     Unfortunately due to a bug/limitation in the Python client library, we can't call it directly.
533     We have to make the API call through a Websocket connection using the kubernetes.stream wrapper API.
534     I'm following the example code at https://github.com/kubernetes-client/python/blob/master/examples/exec.py.
535     There are several issues tracking this, in various states.  It isn't clear that there will ever
536     be a fix.
537         - https://github.com/kubernetes-client/python/issues/58
538         - https://github.com/kubernetes-client/python/issues/409
539         - https://github.com/kubernetes-client/python/issues/526
540
541     The main consequence of the workaround using "stream" is that the caller does not get an indication
542     of the exit code returned by the command when it completes execution.   It turns out that the
543     original implementation of notification in the Docker plugin did not use this result, so we can
544     still match the original notification functionality.
545
546     The "stream" approach returns a string containing any output sent by the command to stdout or stderr.
547     We'll return that so it can logged.
548     '''
549     _configure_api(location)
550     try:
551         output = stream.stream(client.CoreV1Api().connect_get_namespaced_pod_exec,
552                                name=pod_name,
553                                namespace=namespace,
554                                command=command,
555                                stdout=True,
556                                stderr=True,
557                                stdin=False,
558                                tty=False)
559     except client.rest.ApiException as e:
560         # If the exception indicates the pod wasn't found,  it's not a fatal error.
561         # It existed when we enumerated the pods for the deployment but no longer exists.
562         # Unfortunately, the only way to distinguish a pod not found from any other error
563         # is by looking at the reason text.
564         # (The ApiException's "status" field should contain the HTTP status code, which would
565         # be 404 if the pod isn't found, but empirical testing reveals that "status" is set
566         # to zero.)
567         if "404 not found" in e.reason.lower():
568             output = "Pod not found"
569         else:
570             raise e
571
572     return {"pod": pod_name, "output": output}
573
574
575 def deploy(ctx, namespace, component_name, image, replicas, always_pull, k8sconfig, **kwargs):
576     """
577     This will create a k8s Deployment and, if needed, one or two k8s Services.
578     (We are being opinionated in our use of k8s... this code decides what k8s abstractions and features to use.
579     We're not exposing k8s to the component developer and the blueprint author.
580     This is a conscious choice.  We want to use k8s in a controlled, consistent way, and we want to hide
581     the details from the component developer and the blueprint author.)
582
583     namespace:  the Kubernetes namespace into which the component is deployed
584     component_name:  the component name, used to derive names of Kubernetes entities
585     image: the docker image for the component being deployed
586     replica: the number of instances of the component to be deployed
587     always_pull: boolean flag, indicating that Kubernetes should always pull a new copy of
588        the Docker image for the component, even if it is already present on the Kubernetes node.
589     k8sconfig contains:
590         - image_pull_secrets: a list of names of image pull secrets that can be used for retrieving images.
591           (DON'T PANIC:  these are just the names of secrets held in the Kubernetes secret store.)
592         - filebeat: a dictionary of filebeat sidecar parameters:
593             "log_path" : mount point for log volume in filebeat container
594             "data_path" : mount point for data volume in filebeat container
595             "config_path" : mount point for config volume in filebeat container
596             "config_subpath" :  subpath for config data in filebeat container
597             "config_map" : ConfigMap holding the filebeat configuration
598             "image": Docker image to use for filebeat
599         - tls: a dictionary of TLS-related information:
600             "cert_path": mount point for certificate volume in init container
601             "image": Docker image to use for TLS init container
602             "component_cert_dir" : default mount point for certs
603         - cert_post_processor: a dictionary of cert_post_processor information:
604             "image_tag": docker image to use for cert-post-processor init container
605     kwargs may have:
606         - volumes:  array of volume objects, where a volume object is:
607             {"host":{"path": "/path/on/host"}, "container":{"bind":"/path/on/container","mode":"rw_or_ro"}
608         - ports: array of strings in the form "container_port:host_port"
609         - env: map of name-value pairs ( {name0: value0, name1: value1...}
610         - log_info: an object with info for setting up ELK logging, with the form:
611             {"log_directory": "/path/to/container/log/directory", "alternate_fb_path" : "/alternate/sidecar/log/path"}
612         - tls_info: an object with info for setting up TLS (HTTPS), with the form:
613             {"use_tls": true, "cert_directory": "/path/to/container/cert/directory" }
614         - external_cert: an object with information for setting up the init container for external certificates creation, with the form:
615             {"external_cert":
616                 "external_cert_directory": "/path/to/directory_where_certs_should_be_placed",
617                 "use_external_tls": true or false,
618                 "ca_name": "ca-name-value",
619                 "cert_type": "P12" or "JKS" or "PEM",
620                 "external_certificate_parameters":
621                     "common_name": "common-name-value",
622                     "sans": "sans-value"}
623         - labels: dict with label-name/label-value pairs, e.g. {"cfydeployment" : "lsdfkladflksdfsjkl", "cfynode":"mycomponent"}
624             These label will be set on all the pods deployed as a result of this deploy() invocation.
625         - resources: dict with optional "limits" and "requests" resource requirements, each a dict containing:
626             - cpu:    number CPU usage, like 0.5
627             - memory: string memory requirement, like "2Gi"
628         - readiness: dict with health check info; if present, used to create a readiness probe for the main container.  Includes:
629             - type: check is done by making http(s) request to an endpoint ("http", "https") or by exec'ing a script in the container ("script", "docker")
630             - interval: period (in seconds) between probes
631             - timeout:  time (in seconds) to allow a probe to complete
632             - endpoint: the path portion of the URL that points to the readiness endpoint for "http" and "https" types
633             - path: the full path to the script to be executed in the container for "script" and "docker" types
634         - liveness: dict with health check info; if present, used to create a liveness probe for the main container.  Includes:
635             - type: check is done by making http(s) request to an endpoint ("http", "https") or by exec'ing a script in the container ("script", "docker")
636             - interval: period (in seconds) between probes
637             - timeout:  time (in seconds) to allow a probe to complete
638             - endpoint: the path portion of the URL that points to the liveness endpoint for "http" and "https" types
639             - path: the full path to the script to be executed in the container for "script" and "docker" types
640         - k8s_location: name of the Kubernetes location (cluster) where the component is to be deployed
641
642     """
643
644     deployment_ok = False
645     cip_service_created = False
646     deployment_description = {
647         "namespace": namespace,
648         "location": kwargs.get("k8s_location"),
649         "deployment": '',
650         "services": []
651     }
652
653     try:
654
655         # Get API handles
656         _configure_api(kwargs.get("k8s_location"))
657         core = client.CoreV1Api()
658         k8s_apps_v1_api_client = client.AppsV1Api()
659
660         # Parse the port mapping
661         container_ports, port_map = parse_ports(kwargs.get("ports", []))
662
663         # Parse the volumes list into volumes and volume_mounts for the deployment
664         volumes, volume_mounts = _parse_volumes(kwargs.get("volumes", []))
665
666         # Initialize the list of containers that will be part of the pod
667         containers = []
668         init_containers = []
669
670         # Set up the ELK logging sidecar container, if needed
671         _add_elk_logging_sidecar(containers, volumes, volume_mounts, component_name, kwargs.get("log_info"),
672                                  k8sconfig.get("filebeat"))
673
674         # Set up TLS information
675         _add_tls_init_container(ctx, init_containers, volumes, volume_mounts, kwargs.get("tls_info") or {},
676                                 k8sconfig.get("tls"))
677
678         # Set up external TLS information
679         external_cert = kwargs.get("external_cert")
680         if external_cert and external_cert.get("use_external_tls"):
681             _add_external_tls_init_container(ctx, init_containers, volumes, external_cert,
682                                              k8sconfig.get("external_cert"))
683             _add_cert_post_processor_init_container(ctx, init_containers, kwargs.get("tls_info") or {},
684                                                     k8sconfig.get("tls"), external_cert,
685                                                     k8sconfig.get("cert_post_processor"))
686
687         # Create the container for the component
688         # Make it the first container in the pod
689         container_args = {key: kwargs.get(key) for key in ("env", "readiness", "liveness", "resources")}
690         container_args['container_ports'] = container_ports
691         container_args['volume_mounts'] = volume_mounts
692         containers.insert(0, _create_container_object(component_name, image, always_pull, **container_args))
693
694         # Build the k8s Deployment object
695         labels = kwargs.get("labels", {})
696         labels["app"] = component_name
697         dep = _create_deployment_object(component_name, containers, init_containers, replicas, volumes, labels,
698                                         pull_secrets=k8sconfig["image_pull_secrets"])
699
700         # Have k8s deploy it
701         k8s_apps_v1_api_client.create_namespaced_deployment(namespace, dep)
702         deployment_ok = True
703         deployment_description["deployment"] = _create_deployment_name(component_name)
704
705         # Create service(s), if a port mapping is specified
706         if port_map:
707             service_ports, exposed_ports, exposed_ports_ipv6 = _process_port_map(port_map)
708
709             # Create a ClusterIP service for access via the k8s network
710             service = _create_service_object(_create_service_name(component_name), component_name, service_ports, None,
711                                              labels, "ClusterIP", "IPv4")
712             core.create_namespaced_service(namespace, service)
713             cip_service_created = True
714             deployment_description["services"].append(_create_service_name(component_name))
715
716             # If there are ports to be exposed on the k8s nodes, create a "NodePort" service
717             if exposed_ports:
718                 exposed_service = \
719                     _create_service_object(_create_exposed_service_name(component_name), component_name, exposed_ports,
720                                            '', labels, "NodePort", "IPv4")
721                 core.create_namespaced_service(namespace, exposed_service)
722                 deployment_description["services"].append(_create_exposed_service_name(component_name))
723
724             if exposed_ports_ipv6:
725                 exposed_service_ipv6 = \
726                     _create_service_object(_create_exposed_v6_service_name(component_name), component_name,
727                                            exposed_ports_ipv6, '', labels, "NodePort", "IPv6")
728                 core.create_namespaced_service(namespace, exposed_service_ipv6)
729                 deployment_description["services"].append(_create_exposed_v6_service_name(component_name))
730
731     except Exception as e:
732         # If the ClusterIP service was created, delete the service:
733         if cip_service_created:
734             core.delete_namespaced_service(_create_service_name(component_name), namespace)
735         # If the deployment was created but not the service, delete the deployment
736         if deployment_ok:
737             client.AppsV1Api().delete_namespaced_deployment(_create_deployment_name(component_name), namespace,
738                                                             body=client.V1DeleteOptions())
739         raise e
740
741     return dep, deployment_description
742
743
744 def undeploy(deployment_description):
745     _configure_api(deployment_description["location"])
746
747     namespace = deployment_description["namespace"]
748
749     # remove any services associated with the component
750     for service in deployment_description["services"]:
751         client.CoreV1Api().delete_namespaced_service(service, namespace)
752
753     # Have k8s delete the underlying pods and replicaset when deleting the deployment.
754     options = client.V1DeleteOptions(propagation_policy="Foreground")
755     client.AppsV1Api().delete_namespaced_deployment(deployment_description["deployment"], namespace, body=options)
756
757
758 def is_available(location, namespace, component_name):
759     _configure_api(location)
760     dep_status = client.AppsV1Api().read_namespaced_deployment_status(_create_deployment_name(component_name),
761                                                                       namespace)
762     # Check if the number of available replicas is equal to the number requested and that the replicas match the
763     # current spec This check can be used to verify completion of an initial deployment, a scale operation,
764     # or an update operation
765     return dep_status.status.available_replicas == dep_status.spec.replicas and dep_status.status.updated_replicas == dep_status.spec.replicas
766
767
768 def scale(deployment_description, replicas):
769     """ Trigger a scaling operation by updating the replica count for the Deployment """
770
771     def update_replica_count(spec):
772         spec.spec.replicas = replicas
773         return spec
774
775     _patch_deployment(deployment_description["location"], deployment_description["namespace"],
776                       deployment_description["deployment"], update_replica_count)
777
778
779 def upgrade(deployment_description, image, container_index=0):
780     """ Trigger a rolling upgrade by sending a new image name/tag to k8s """
781
782     def update_image(spec):
783         spec.spec.template.spec.containers[container_index].image = image
784         return spec
785
786     _patch_deployment(deployment_description["location"], deployment_description["namespace"],
787                       deployment_description["deployment"], update_image)
788
789
790 def rollback(deployment_description, rollback_to=0):
791     """
792     Undo upgrade by rolling back to a previous revision of the deployment.
793     By default, go back one revision.
794     rollback_to can be used to supply a specific revision number.
795     Returns the image for the app container and the replica count from the rolled-back deployment
796     """
797     '''
798     2018-07-13
799     Currently this does not work due to a bug in the create_namespaced_deployment_rollback() method.
800     The k8s python client code throws an exception while processing the response from the API.
801     See:
802        - https://github.com/kubernetes-client/python/issues/491
803        - https://github.com/kubernetes/kubernetes/pull/63837
804     The fix has been merged into the master branch but is not in the latest release.
805     '''
806     _configure_api(deployment_description["location"])
807     deployment = deployment_description["deployment"]
808     namespace = deployment_description["namespace"]
809
810     # Initiate the rollback
811     client.ExtensionsV1beta1Api().create_namespaced_deployment_rollback(
812         deployment,
813         namespace,
814         client.AppsV1beta1DeploymentRollback(name=deployment,
815                                              rollback_to=client.AppsV1beta1RollbackConfig(revision=rollback_to)))
816
817     # Read back the spec for the rolled-back deployment
818     spec = client.AppsV1Api().read_namespaced_deployment(deployment, namespace)
819     return spec.spec.template.spec.containers[0].image, spec.spec.replicas
820
821
822 def execute_command_in_deployment(deployment_description, command):
823     """
824     Enumerates the pods in the k8s deployment identified by "deployment_description",
825     then executes the command (represented as an argv-style list) in "command" in
826     container 0 (the main application container) each of those pods.
827
828     Note that the sets of pods associated with a deployment can change over time.  The
829     enumeration is a snapshot at one point in time.  The command will not be executed in
830     pods that are created after the initial enumeration.   If a pod disappears after the
831     initial enumeration and before the command is executed, the attempt to execute the
832     command will fail.  This is not treated as a fatal error.
833
834     This approach is reasonable for the one current use case for "execute_command":  running a
835     script to notify a container that its configuration has changed as a result of a
836     policy change.  In this use case, the new configuration information is stored into
837     the configuration store (Consul), the pods are enumerated, and the command is executed.
838     If a pod disappears after the enumeration, the fact that the command cannot be run
839     doesn't matter--a nonexistent pod doesn't need to be reconfigured.  Similarly, a pod that
840     comes up after the enumeration will get its initial configuration from the updated version
841     in Consul.
842
843     The optimal solution here would be for k8s to provide an API call to execute a command in
844     all of the pods for a deployment.   Unfortunately, k8s does not provide such a call--the
845     only call provided by k8s operates at the pod level, not the deployment level.
846
847     Another interesting k8s factoid: there's no direct way to list the pods belong to a
848     particular k8s deployment.   The deployment code above sets a label ("k8sdeployment") on
849     the pod that has the k8s deployment name.  To list the pods, the code below queries for
850     pods with the label carrying the deployment name.
851     """
852     location = deployment_description["location"]
853     _configure_api(location)
854     deployment = deployment_description["deployment"]
855     namespace = deployment_description["namespace"]
856
857     # Get names of all the running pods belonging to the deployment
858     pod_names = [pod.metadata.name for pod in client.CoreV1Api().list_namespaced_pod(
859         namespace=namespace,
860         label_selector="k8sdeployment={0}".format(deployment),
861         field_selector="status.phase=Running"
862     ).items]
863
864     # Execute command in the running pods
865     return [_execute_command_in_pod(location, namespace, pod_name, command)
866             for pod_name in pod_names]