Add option to use services with ipv6
[dcaegen2/platform/plugins.git] / k8s / k8sclient / k8sclient.py
1 # ============LICENSE_START=======================================================
2 # org.onap.dcae
3 # ================================================================================
4 # Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved.
5 # Copyright (c) 2020 Pantheon.tech. All rights reserved.
6 # Copyright (c) 2020 Nokia. All rights reserved.
7 # Copyright (c) 2020 J. F. Lucas.  All rights reserved.
8 # ================================================================================
9 # Licensed under the Apache License, Version 2.0 (the "License");
10 # you may not use this file except in compliance with the License.
11 # You may obtain a copy of the License at
12 #
13 #      http://www.apache.org/licenses/LICENSE-2.0
14 #
15 # Unless required by applicable law or agreed to in writing, software
16 # distributed under the License is distributed on an "AS IS" BASIS,
17 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18 # See the License for the specific language governing permissions and
19 # limitations under the License.
20 # ============LICENSE_END=========================================================
21 #
22 import os
23 import re
24 import uuid
25
26 from kubernetes import config, client, stream
27
28 # Default values for readiness probe
29 PROBE_DEFAULT_PERIOD = 15
30 PROBE_DEFAULT_TIMEOUT = 1
31
32 # Location of k8s cluster config file ("kubeconfig")
33 K8S_CONFIG_PATH="/opt/onap/kube/kubeconfig"
34
35 # Regular expression for interval/timeout specification
36 INTERVAL_SPEC = re.compile("^([0-9]+)(s|m|h)?$")
37 # Conversion factors to seconds
38 FACTORS = {None: 1, "s": 1, "m": 60, "h": 3600}
39
40 # Regular expression for port mapping
41 # group 1: container port
42 # group 2: / + protocol
43 # group 3: protocol
44 # group 4: host port
45 PORTS = re.compile("^([0-9]+)(/(udp|UDP|tcp|TCP))?:([0-9]+)$")
46
47 # Constants for external_cert
48 MOUNT_PATH = "/etc/onap/oom/certservice/certs/"
49 KEYSTORE_PATH = MOUNT_PATH + "certServiceClient-keystore.jks"
50 TRUSTSTORE_PATH = MOUNT_PATH + "truststore.jks"
51 DEFAULT_CERT_TYPE = "p12"
52
53 def _create_deployment_name(component_name):
54     return "dep-{0}".format(component_name)[:63]
55
56 def _create_service_name(component_name):
57     return "{0}".format(component_name)[:63]
58
59 def _create_exposed_service_name(component_name):
60     return ("x{0}".format(component_name))[:63]
61
62 def _create_exposed_v6_service_name(component_name):
63     return ("x{0}-ipv6".format(component_name))[:63]
64
65 def _configure_api(location=None):
66     # Look for a kubernetes config file
67     if os.path.exists(K8S_CONFIG_PATH):
68         config.load_kube_config(config_file=K8S_CONFIG_PATH, context=location, persist_config=False)
69     else:
70         # Maybe we're running in a k8s container and we can use info provided by k8s
71         # We would like to use:
72         # config.load_incluster_config()
73         # but this looks into os.environ for kubernetes host and port, and from
74         # the plugin those aren't visible.   So we use the InClusterConfigLoader class,
75         # where we can set the environment to what we like.
76         # This is probably brittle!  Maybe there's a better alternative.
77         localenv = {
78             config.incluster_config.SERVICE_HOST_ENV_NAME : "kubernetes.default.svc.cluster.local",
79             config.incluster_config.SERVICE_PORT_ENV_NAME : "443"
80         }
81         config.incluster_config.InClusterConfigLoader(
82             token_filename=config.incluster_config.SERVICE_TOKEN_FILENAME,
83             cert_filename=config.incluster_config.SERVICE_CERT_FILENAME,
84             environ=localenv
85         ).load_and_set()
86
87 def _parse_interval(t):
88     """
89     Parse an interval specification
90     t can be
91        - a simple integer quantity, interpreted as seconds
92        - a string representation of a decimal integer, interpreted as seconds
93        - a string consisting of a represention of an decimal integer followed by a unit,
94          with "s" representing seconds, "m" representing minutes,
95          and "h" representing hours
96     Used for compatibility with the Docker plugin, where time intervals
97     for health checks were specified as strings with a number and a unit.
98     See 'intervalspec' above for the regular expression that's accepted.
99     """
100     m = INTERVAL_SPEC.match(str(t))
101     if m:
102         time = int(m.group(1)) * FACTORS[m.group(2)]
103     else:
104         raise ValueError("Bad interval specification: {0}".format(t))
105     return time
106
107 def _create_probe(hc, port):
108     ''' Create a Kubernetes probe based on info in the health check dictionary hc '''
109     probe_type = hc['type']
110     probe = None
111     period = _parse_interval(hc.get('interval', PROBE_DEFAULT_PERIOD))
112     timeout = _parse_interval(hc.get('timeout', PROBE_DEFAULT_TIMEOUT))
113     if probe_type in ['http', 'https']:
114         probe = client.V1Probe(
115           failure_threshold = 1,
116           initial_delay_seconds = 5,
117           period_seconds = period,
118           timeout_seconds = timeout,
119           http_get = client.V1HTTPGetAction(
120               path = hc['endpoint'],
121               port = port,
122               scheme = probe_type.upper()
123           )
124         )
125     elif probe_type in ['script', 'docker']:
126         probe = client.V1Probe(
127           failure_threshold = 1,
128           initial_delay_seconds = 5,
129           period_seconds = period,
130           timeout_seconds = timeout,
131           _exec = client.V1ExecAction(
132               command = hc['script'].split( )
133           )
134         )
135     return probe
136
137 def _create_resources(resources=None):
138     if resources is not None:
139         resources_obj = client.V1ResourceRequirements(
140           limits = resources.get("limits"),
141           requests = resources.get("requests")
142         )
143         return resources_obj
144     else:
145         return None
146
147 def _create_container_object(name, image, always_pull, **kwargs):
148     # Set up environment variables
149     # Copy any passed in environment variables
150     env = kwargs.get('env') or {}
151     env_vars = [client.V1EnvVar(name=k, value=env[k]) for k in env]
152     # Add POD_IP with the IP address of the pod running the container
153     pod_ip = client.V1EnvVarSource(field_ref = client.V1ObjectFieldSelector(field_path="status.podIP"))
154     env_vars.append(client.V1EnvVar(name="POD_IP",value_from=pod_ip))
155
156     # If a health check is specified, create a readiness/liveness probe
157     # (For an HTTP-based check, we assume it's at the first container port)
158     readiness = kwargs.get('readiness')
159     liveness = kwargs.get('liveness')
160     resources = kwargs.get('resources')
161     container_ports = kwargs.get('container_ports') or []
162
163     hc_port = container_ports[0][0] if container_ports else None
164     probe = _create_probe(readiness, hc_port) if readiness else None
165     live_probe = _create_probe(liveness, hc_port) if liveness else None
166     resources_obj = _create_resources(resources) if resources else None
167     port_objs = [client.V1ContainerPort(container_port=port, protocol=proto)
168                  for port, proto in container_ports]
169
170     # Define container for pod
171     return client.V1Container(
172         name=name,
173         image=image,
174         image_pull_policy='Always' if always_pull else 'IfNotPresent',
175         env=env_vars,
176         ports=port_objs,
177         volume_mounts=kwargs.get('volume_mounts') or [],
178         resources=resources_obj,
179         readiness_probe=probe,
180         liveness_probe=live_probe
181     )
182
183 def _create_deployment_object(component_name,
184                               containers,
185                               init_containers,
186                               replicas,
187                               volumes,
188                               labels={},
189                               pull_secrets=[]):
190
191     deployment_name = _create_deployment_name(component_name)
192
193     # Label the pod with the deployment name, so we can find it easily
194     labels.update({"k8sdeployment" : deployment_name})
195
196     # pull_secrets is a list of the names of the k8s secrets containing docker registry credentials
197     # See https://kubernetes.io/docs/concepts/containers/images/#specifying-imagepullsecrets-on-a-pod
198     ips = []
199     for secret in pull_secrets:
200         ips.append(client.V1LocalObjectReference(name=secret))
201
202     # Define pod template
203     template = client.V1PodTemplateSpec(
204         metadata=client.V1ObjectMeta(labels=labels),
205         spec=client.V1PodSpec(hostname=component_name,
206                               containers=containers,
207                               init_containers=init_containers,
208                               volumes=volumes,
209                               image_pull_secrets=ips)
210     )
211
212     # Define deployment spec
213     spec = client.V1DeploymentSpec(
214         replicas=replicas,
215         selector=client.V1LabelSelector(match_labels=labels),
216         template=template
217     )
218
219     # Create deployment object
220     deployment = client.V1Deployment(
221         api_version="apps/v1",
222         kind="Deployment",
223         metadata=client.V1ObjectMeta(name=deployment_name, labels=labels),
224         spec=spec
225     )
226
227     return deployment
228
229
230 def _create_service_object(service_name, component_name, service_ports, annotations, labels, service_type, ip_family):
231     service_spec = client.V1ServiceSpec(
232         ports=service_ports,
233         selector={"app": component_name},
234         type=service_type,
235         ip_family=ip_family
236     )
237     if annotations:
238         metadata = client.V1ObjectMeta(name=_create_service_name(service_name), labels=labels, annotations=annotations)
239     else:
240         metadata = client.V1ObjectMeta(name=_create_service_name(service_name), labels=labels)
241
242     service = client.V1Service(
243         kind="Service",
244         api_version="v1",
245         metadata=metadata,
246         spec=service_spec
247     )
248     return service
249
250 def parse_ports(port_list):
251     '''
252     Parse the port list into a list of container ports (needed to create the container)
253     and to a set of port mappings to set up k8s services.
254     '''
255     container_ports = []
256     port_map = {}
257     for p in port_list:
258         ipv6 = False
259         if type(p) is dict:
260             ipv6 = "ipv6" in p and p['ipv6']
261             p = "".join(str(v) for v in p['concat'])
262         m = PORTS.match(p.strip())
263         if m:
264             cport = int(m.group(1))
265             hport = int(m.group(4))
266             if m.group(3):
267                 proto = (m.group(3)).upper()
268             else:
269                 proto = "TCP"
270             port = (cport, proto)
271             if port not in container_ports:
272                 container_ports.append(port)
273             port_map[(cport, proto, ipv6)] = hport
274         else:
275             raise ValueError("Bad port specification: {0}".format(p))
276
277     return container_ports, port_map
278
279
280 def _parse_volumes(volume_list):
281     volumes = []
282     volume_mounts = []
283     for v in volume_list:
284         vname = str(uuid.uuid4())
285         vhost = v['host']['path']
286         vcontainer = v['container']['bind']
287         vro = (v['container'].get('mode') == 'ro')
288         volumes.append(client.V1Volume(name=vname, host_path=client.V1HostPathVolumeSource(path=vhost)))
289         volume_mounts.append(client.V1VolumeMount(name=vname, mount_path=vcontainer, read_only=vro))
290
291     return volumes, volume_mounts
292
293 def _add_elk_logging_sidecar(containers, volumes, volume_mounts, component_name, log_info, filebeat):
294     if not log_info or not filebeat:
295         return
296     log_dir = log_info.get("log_directory")
297     if not log_dir:
298         return
299     sidecar_volume_mounts = []
300
301     # Create the volume for component log files and volume mounts for the component and sidecar containers
302     volumes.append(client.V1Volume(name="component-log", empty_dir=client.V1EmptyDirVolumeSource()))
303     volume_mounts.append(client.V1VolumeMount(name="component-log", mount_path=log_dir))
304     sc_path = log_info.get("alternate_fb_path") or "{0}/{1}".format(filebeat["log_path"], component_name)
305     sidecar_volume_mounts.append(client.V1VolumeMount(name="component-log", mount_path=sc_path))
306
307     # Create the volume for sidecar data and the volume mount for it
308     volumes.append(client.V1Volume(name="filebeat-data", empty_dir=client.V1EmptyDirVolumeSource()))
309     sidecar_volume_mounts.append(client.V1VolumeMount(name="filebeat-data", mount_path=filebeat["data_path"]))
310
311     # Create the volume for the sidecar configuration data and the volume mount for it
312     # The configuration data is in a k8s ConfigMap that should be created when DCAE is installed.
313     volumes.append(
314         client.V1Volume(name="filebeat-conf", config_map=client.V1ConfigMapVolumeSource(name=filebeat["config_map"])))
315     sidecar_volume_mounts.append(
316         client.V1VolumeMount(name="filebeat-conf", mount_path=filebeat["config_path"], sub_path=filebeat["config_subpath"]))
317
318     # Finally create the container for the sidecar
319     containers.append(_create_container_object("filebeat", filebeat["image"], False, volume_mounts=sidecar_volume_mounts))
320
321 def _add_tls_init_container(ctx, init_containers, volumes, volume_mounts, tls_info, tls_config):
322     #   Adds an InitContainer to the pod to set up TLS certificate information.  For components that act as a
323     #   server(tls_info["use_tls"] is True), the InitContainer will populate a directory with server and CA certificate
324     #   materials in various formats.   For other components (tls_info["use_tls"] is False, or tls_info is not specified),
325     #   the InitContainer will populate a directory with CA certificate materials in PEM and JKS formats.
326     #   In either case, the certificate directory is mounted onto the component container filesystem at the location
327     #   specified by tls_info["component_cert_dir"], if present, otherwise at the configured default mount point
328     #   (tls_config["component_cert_dir"]).
329     docker_image = tls_config["image"]
330     ctx.logger.info("Creating init container: TLS \n  * [" + docker_image + "]")
331
332     cert_directory = tls_info.get("cert_directory") or tls_config.get("component_cert_dir")
333     env = {}
334     env["TLS_SERVER"] = "true" if tls_info.get("use_tls") else "false"
335
336     # Create the certificate volume and volume mounts
337     volumes.append(client.V1Volume(name="tls-info", empty_dir=client.V1EmptyDirVolumeSource()))
338     volume_mounts.append(client.V1VolumeMount(name="tls-info", mount_path=cert_directory))
339     init_volume_mounts = [client.V1VolumeMount(name="tls-info", mount_path=tls_config["cert_path"])]
340
341     # Create the init container
342     init_containers.append(_create_container_object("init-tls", docker_image, False, volume_mounts=init_volume_mounts, env=env))
343
344 def _add_external_tls_init_container(ctx, init_containers, volumes, external_cert, external_tls_config):
345     # Adds an InitContainer to the pod which will generate external TLS certificates.
346     docker_image = external_tls_config["image_tag"]
347     ctx.logger.info("Creating init container: external TLS \n  * [" + docker_image + "]")
348
349     env = {}
350     output_path = external_cert.get("external_cert_directory")
351     if not output_path.endswith('/'):
352        output_path += '/'
353
354     env["REQUEST_URL"] = external_tls_config.get("request_url")
355     env["REQUEST_TIMEOUT"] = external_tls_config.get("timeout")
356     env["OUTPUT_PATH"] = output_path + "external"
357     env["OUTPUT_TYPE"] = external_cert.get("cert_type")
358     env["CA_NAME"] = external_cert.get("ca_name")
359     env["COMMON_NAME"] = external_cert.get("external_certificate_parameters").get("common_name")
360     env["ORGANIZATION"] = external_tls_config.get("organization")
361     env["ORGANIZATION_UNIT"] = external_tls_config.get("organizational_unit")
362     env["LOCATION"] = external_tls_config.get("location")
363     env["STATE"] = external_tls_config.get("state")
364     env["COUNTRY"] = external_tls_config.get("country")
365     env["SANS"] = external_cert.get("external_certificate_parameters").get("sans")
366     env["KEYSTORE_PATH"] = KEYSTORE_PATH
367     env["KEYSTORE_PASSWORD"] = external_tls_config.get("keystore_password")
368     env["TRUSTSTORE_PATH"] = TRUSTSTORE_PATH
369     env["TRUSTSTORE_PASSWORD"] = external_tls_config.get("truststore_password")
370
371     # Create the volumes and volume mounts
372     sec = client.V1SecretVolumeSource(secret_name=external_tls_config.get("cert_secret_name"))
373     volumes.append(client.V1Volume(name="tls-volume", secret=sec))
374     init_volume_mounts = [client.V1VolumeMount(name="tls-info", mount_path=external_cert.get("external_cert_directory")),
375                           client.V1VolumeMount(name="tls-volume", mount_path=MOUNT_PATH)]
376
377     # Create the init container
378     init_containers.append(_create_container_object("cert-service-client", docker_image, False, volume_mounts=init_volume_mounts, env=env))
379
380
381 def _add_cert_post_processor_init_container(ctx, init_containers, tls_info, tls_config, external_cert, cert_post_processor_config):
382     # Adds an InitContainer to the pod to merge TLS and external TLS truststore into single file.
383     docker_image = cert_post_processor_config["image_tag"]
384     ctx.logger.info("Creating init container: cert post processor \n  * [" + docker_image + "]")
385
386     tls_cert_dir = tls_info.get("cert_directory") or tls_config.get("component_cert_dir")
387     if not tls_cert_dir.endswith('/'):
388         tls_cert_dir += '/'
389
390     tls_cert_file_path = tls_cert_dir + "trust.jks"
391     tls_cert_file_pass = tls_cert_dir + "trust.pass"
392
393     ext_cert_dir = tls_cert_dir + "external/"
394
395     output_type = (external_cert.get("cert_type") or DEFAULT_CERT_TYPE).lower()
396     ext_truststore_path = ext_cert_dir + "truststore." + _get_file_extension(output_type)
397     ext_truststore_pass = ''
398     if output_type != 'pem':
399         ext_truststore_pass = ext_cert_dir + "truststore.pass"
400
401     env = {}
402     env["TRUSTSTORES_PATHS"] = tls_cert_file_path + ":" + ext_truststore_path
403     env["TRUSTSTORES_PASSWORDS_PATHS"] = tls_cert_file_pass + ":" + ext_truststore_pass
404     env["KEYSTORE_SOURCE_PATHS"] = _get_keystore_source_paths(output_type, ext_cert_dir)
405     env["KEYSTORE_DESTINATION_PATHS"] = _get_keystore_destination_paths(output_type, tls_cert_dir)
406
407     ctx.logger.info("TRUSTSTORES_PATHS:            " + env["TRUSTSTORES_PATHS"])
408     ctx.logger.info("TRUSTSTORES_PASSWORDS_PATHS:  " + env["TRUSTSTORES_PASSWORDS_PATHS"])
409     ctx.logger.info("KEYSTORE_SOURCE_PATHS:        " + env["KEYSTORE_SOURCE_PATHS"])
410     ctx.logger.info("KEYSTORE_DESTINATION_PATHS:   " + env["KEYSTORE_DESTINATION_PATHS"])
411
412     # Create the volumes and volume mounts
413     init_volume_mounts = [client.V1VolumeMount(name="tls-info", mount_path=tls_cert_dir)]
414
415     # Create the init container
416     init_containers.append(_create_container_object("cert-post-processor", docker_image, False, volume_mounts=init_volume_mounts, env=env))
417
418
419 def _get_file_extension(output_type):
420     return {
421         'p12': 'p12',
422         'pem': 'pem',
423         'jks': 'jks',
424     }[output_type]
425
426 def _get_keystore_source_paths(output_type, ext_cert_dir):
427     source_paths_template = {
428         'p12': "{0}keystore.p12:{0}keystore.pass",
429         'jks': "{0}keystore.jks:{0}keystore.pass",
430         'pem': "{0}keystore.pem:{0}key.pem",
431     }[output_type]
432     return source_paths_template.format(ext_cert_dir)
433
434 def _get_keystore_destination_paths(output_type, tls_cert_dir):
435     destination_paths_template = {
436         'p12': "{0}cert.p12:{0}p12.pass",
437         'jks': "{0}cert.jks:{0}jks.pass",
438         'pem': "{0}cert.pem:{0}key.pem",
439     }[output_type]
440     return destination_paths_template.format(tls_cert_dir)
441
442
443 def _process_port_map(port_map):
444     service_ports = []  # Ports exposed internally on the k8s network
445     exposed_ports = []  # Ports to be mapped to ports on the k8s nodes via NodePort
446     exposed_ports_ipv6 = []
447     for (cport, proto, ipv6), hport in port_map.items():
448         name = "xport-{0}-{1}".format(proto[0].lower(), cport)
449         cport = int(cport)
450         hport = int(hport)
451         port = client.V1ServicePort(port=cport, protocol=proto, name=name[1:])
452         if port not in service_ports:
453             service_ports.append(port)
454         if hport != 0:
455             if ipv6:
456                 exposed_ports_ipv6.append(client.V1ServicePort(port=cport, protocol=proto, node_port=hport, name=name))
457             else:
458                 exposed_ports.append(client.V1ServicePort(port=cport, protocol=proto, node_port=hport, name=name))
459     return service_ports, exposed_ports, exposed_ports_ipv6
460
461 def _service_exists(location, namespace, component_name):
462     exists = False
463     try:
464         _configure_api(location)
465         client.CoreV1Api().read_namespaced_service(_create_service_name(component_name), namespace)
466         exists = True
467     except client.rest.ApiException:
468         pass
469
470     return exists
471
472 def _patch_deployment(location, namespace, deployment, modify):
473     '''
474     Gets the current spec for 'deployment' in 'namespace'
475     in the k8s cluster at 'location',
476     uses the 'modify' function to change the spec,
477     then sends the updated spec to k8s.
478     '''
479     _configure_api(location)
480
481     # Get deployment spec
482     spec = client.AppsV1Api().read_namespaced_deployment(deployment, namespace)
483
484     # Apply changes to spec
485     spec = modify(spec)
486
487     # Patch the deploy with updated spec
488     client.AppsV1Api().patch_namespaced_deployment(deployment, namespace, spec)
489
490 def _execute_command_in_pod(location, namespace, pod_name, command):
491     '''
492     Execute the command (specified by an argv-style list in  the "command" parameter) in
493     the specified pod in the specified namespace at the specified location.
494     For now at least, we use this only to
495     run a notification script in a pod after a configuration change.
496
497     The straightforward way to do this is with the V1 Core API function "connect_get_namespaced_pod_exec".
498     Unfortunately due to a bug/limitation in the Python client library, we can't call it directly.
499     We have to make the API call through a Websocket connection using the kubernetes.stream wrapper API.
500     I'm following the example code at https://github.com/kubernetes-client/python/blob/master/examples/exec.py.
501     There are several issues tracking this, in various states.  It isn't clear that there will ever
502     be a fix.
503         - https://github.com/kubernetes-client/python/issues/58
504         - https://github.com/kubernetes-client/python/issues/409
505         - https://github.com/kubernetes-client/python/issues/526
506
507     The main consequence of the workaround using "stream" is that the caller does not get an indication
508     of the exit code returned by the command when it completes execution.   It turns out that the
509     original implementation of notification in the Docker plugin did not use this result, so we can
510     still match the original notification functionality.
511
512     The "stream" approach returns a string containing any output sent by the command to stdout or stderr.
513     We'll return that so it can logged.
514     '''
515     _configure_api(location)
516     try:
517         output = stream.stream(client.CoreV1Api().connect_get_namespaced_pod_exec,
518                              name=pod_name,
519                              namespace=namespace,
520                              command=command,
521                              stdout=True,
522                              stderr=True,
523                             stdin=False,
524                             tty=False)
525     except client.rest.ApiException as e:
526         # If the exception indicates the pod wasn't found,  it's not a fatal error.
527         # It existed when we enumerated the pods for the deployment but no longer exists.
528         # Unfortunately, the only way to distinguish a pod not found from any other error
529         # is by looking at the reason text.
530         # (The ApiException's "status" field should contain the HTTP status code, which would
531         # be 404 if the pod isn't found, but empirical testing reveals that "status" is set
532         # to zero.)
533         if "404 not found" in e.reason.lower():
534             output = "Pod not found"
535         else:
536             raise e
537
538     return {"pod" : pod_name, "output" : output}
539
540 def deploy(ctx, namespace, component_name, image, replicas, always_pull, k8sconfig, **kwargs):
541     '''
542     This will create a k8s Deployment and, if needed, one or two k8s Services.
543     (We are being opinionated in our use of k8s... this code decides what k8s abstractions and features to use.
544     We're not exposing k8s to the component developer and the blueprint author.
545     This is a conscious choice.  We want to use k8s in a controlled, consistent way, and we want to hide
546     the details from the component developer and the blueprint author.)
547
548     namespace:  the Kubernetes namespace into which the component is deployed
549     component_name:  the component name, used to derive names of Kubernetes entities
550     image: the docker image for the component being deployed
551     replica: the number of instances of the component to be deployed
552     always_pull: boolean flag, indicating that Kubernetes should always pull a new copy of
553        the Docker image for the component, even if it is already present on the Kubernetes node.
554     k8sconfig contains:
555         - image_pull_secrets: a list of names of image pull secrets that can be used for retrieving images.
556           (DON'T PANIC:  these are just the names of secrets held in the Kubernetes secret store.)
557         - filebeat: a dictionary of filebeat sidecar parameters:
558             "log_path" : mount point for log volume in filebeat container
559             "data_path" : mount point for data volume in filebeat container
560             "config_path" : mount point for config volume in filebeat container
561             "config_subpath" :  subpath for config data in filebeat container
562             "config_map" : ConfigMap holding the filebeat configuration
563             "image": Docker image to use for filebeat
564         - tls: a dictionary of TLS-related information:
565             "cert_path": mount point for certificate volume in init container
566             "image": Docker image to use for TLS init container
567             "component_cert_dir" : default mount point for certs
568         - cert_post_processor: a dictionary of cert_post_processor information:
569             "image_tag": docker image to use for cert-post-processor init container
570     kwargs may have:
571         - volumes:  array of volume objects, where a volume object is:
572             {"host":{"path": "/path/on/host"}, "container":{"bind":"/path/on/container","mode":"rw_or_ro"}
573         - ports: array of strings in the form "container_port:host_port"
574         - env: map of name-value pairs ( {name0: value0, name1: value1...}
575         - log_info: an object with info for setting up ELK logging, with the form:
576             {"log_directory": "/path/to/container/log/directory", "alternate_fb_path" : "/alternate/sidecar/log/path"}
577         - tls_info: an object with info for setting up TLS (HTTPS), with the form:
578             {"use_tls": true, "cert_directory": "/path/to/container/cert/directory" }
579         - external_cert: an object with information for setting up the init container for external certificates creation, with the form:
580             {"external_cert":
581                 "external_cert_directory": "/path/to/directory_where_certs_should_be_placed",
582                 "use_external_tls": true or false,
583                 "ca_name": "ca-name-value",
584                 "cert_type": "P12" or "JKS" or "PEM",
585                 "external_certificate_parameters":
586                     "common_name": "common-name-value",
587                     "sans": "sans-value"}
588         - labels: dict with label-name/label-value pairs, e.g. {"cfydeployment" : "lsdfkladflksdfsjkl", "cfynode":"mycomponent"}
589             These label will be set on all the pods deployed as a result of this deploy() invocation.
590         - resources: dict with optional "limits" and "requests" resource requirements, each a dict containing:
591             - cpu:    number CPU usage, like 0.5
592             - memory: string memory requirement, like "2Gi"
593         - readiness: dict with health check info; if present, used to create a readiness probe for the main container.  Includes:
594             - type: check is done by making http(s) request to an endpoint ("http", "https") or by exec'ing a script in the container ("script", "docker")
595             - interval: period (in seconds) between probes
596             - timeout:  time (in seconds) to allow a probe to complete
597             - endpoint: the path portion of the URL that points to the readiness endpoint for "http" and "https" types
598             - path: the full path to the script to be executed in the container for "script" and "docker" types
599         - liveness: dict with health check info; if present, used to create a liveness probe for the main container.  Includes:
600             - type: check is done by making http(s) request to an endpoint ("http", "https") or by exec'ing a script in the container ("script", "docker")
601             - interval: period (in seconds) between probes
602             - timeout:  time (in seconds) to allow a probe to complete
603             - endpoint: the path portion of the URL that points to the liveness endpoint for "http" and "https" types
604             - path: the full path to the script to be executed in the container for "script" and "docker" types
605         - k8s_location: name of the Kubernetes location (cluster) where the component is to be deployed
606
607     '''
608
609     deployment_ok = False
610     cip_service_created = False
611     deployment_description = {
612         "namespace": namespace,
613         "location" : kwargs.get("k8s_location"),
614         "deployment": '',
615         "services": []
616     }
617
618     try:
619
620         # Get API handles
621         _configure_api(kwargs.get("k8s_location"))
622         core = client.CoreV1Api()
623         k8s_apps_v1_api_client = client.AppsV1Api()
624
625         # Parse the port mapping
626         container_ports, port_map = parse_ports(kwargs.get("ports", []))
627
628         # Parse the volumes list into volumes and volume_mounts for the deployment
629         volumes, volume_mounts = _parse_volumes(kwargs.get("volumes", []))
630
631         # Initialize the list of containers that will be part of the pod
632         containers = []
633         init_containers = []
634
635         # Set up the ELK logging sidecar container, if needed
636         _add_elk_logging_sidecar(containers, volumes, volume_mounts, component_name, kwargs.get("log_info"), k8sconfig.get("filebeat"))
637
638         # Set up TLS information
639         _add_tls_init_container(ctx, init_containers, volumes, volume_mounts, kwargs.get("tls_info") or {}, k8sconfig.get("tls"))
640
641         # Set up external TLS information
642         external_cert = kwargs.get("external_cert")
643         if external_cert and external_cert.get("use_external_tls"):
644             _add_external_tls_init_container(ctx, init_containers, volumes, external_cert, k8sconfig.get("external_cert"))
645             _add_cert_post_processor_init_container(ctx, init_containers, kwargs.get("tls_info") or {}, k8sconfig.get("tls"), external_cert, k8sconfig.get("cert_post_processor"))
646
647         # Create the container for the component
648         # Make it the first container in the pod
649         container_args = {key: kwargs.get(key) for key in ("env", "readiness", "liveness", "resources")}
650         container_args['container_ports'] = container_ports
651         container_args['volume_mounts'] = volume_mounts
652         containers.insert(0, _create_container_object(component_name, image, always_pull, **container_args))
653
654         # Build the k8s Deployment object
655         labels = kwargs.get("labels", {})
656         labels["app"] = component_name
657         dep = _create_deployment_object(component_name, containers, init_containers, replicas, volumes, labels, pull_secrets=k8sconfig["image_pull_secrets"])
658
659         # Have k8s deploy it
660         k8s_apps_v1_api_client.create_namespaced_deployment(namespace, dep)
661         deployment_ok = True
662         deployment_description["deployment"] = _create_deployment_name(component_name)
663
664         # Create service(s), if a port mapping is specified
665         if port_map:
666             service_ports, exposed_ports, exposed_ports_ipv6 = _process_port_map(port_map)
667
668             # Create a ClusterIP service for access via the k8s network
669             service = _create_service_object(_create_service_name(component_name), component_name, service_ports, None,
670                                              labels, "ClusterIP", "IPv4")
671             core.create_namespaced_service(namespace, service)
672             cip_service_created = True
673             deployment_description["services"].append(_create_service_name(component_name))
674
675             # If there are ports to be exposed on the k8s nodes, create a "NodePort" service
676             if exposed_ports:
677                 exposed_service = \
678                     _create_service_object(_create_exposed_service_name(component_name), component_name, exposed_ports,
679                                            '', labels, "NodePort", "IPv4")
680                 core.create_namespaced_service(namespace, exposed_service)
681                 deployment_description["services"].append(_create_exposed_service_name(component_name))
682
683             if exposed_ports_ipv6:
684                 exposed_service_ipv6 = \
685                     _create_service_object(_create_exposed_v6_service_name(component_name), component_name,
686                                            exposed_ports_ipv6, '', labels, "NodePort", "IPv6")
687                 core.create_namespaced_service(namespace, exposed_service_ipv6)
688                 deployment_description["services"].append(_create_exposed_v6_service_name(component_name))
689
690     except Exception as e:
691         # If the ClusterIP service was created, delete the service:
692         if cip_service_created:
693             core.delete_namespaced_service(_create_service_name(component_name), namespace)
694         # If the deployment was created but not the service, delete the deployment
695         if deployment_ok:
696             client.AppsV1Api().delete_namespaced_deployment(_create_deployment_name(component_name), namespace, body=client.V1DeleteOptions())
697         raise e
698
699     return dep, deployment_description
700
701 def undeploy(deployment_description):
702     _configure_api(deployment_description["location"])
703
704     namespace = deployment_description["namespace"]
705
706     # remove any services associated with the component
707     for service in deployment_description["services"]:
708         client.CoreV1Api().delete_namespaced_service(service, namespace)
709
710     # Have k8s delete the underlying pods and replicaset when deleting the deployment.
711     options = client.V1DeleteOptions(propagation_policy="Foreground")
712     client.AppsV1Api().delete_namespaced_deployment(deployment_description["deployment"], namespace, body=options)
713
714 def is_available(location, namespace, component_name):
715     _configure_api(location)
716     dep_status = client.AppsV1Api().read_namespaced_deployment_status(_create_deployment_name(component_name), namespace)
717     # Check if the number of available replicas is equal to the number requested and that the replicas match the current spec
718     # This check can be used to verify completion of an initial deployment, a scale operation, or an update operation
719     return dep_status.status.available_replicas == dep_status.spec.replicas and dep_status.status.updated_replicas == dep_status.spec.replicas
720
721 def scale(deployment_description, replicas):
722     ''' Trigger a scaling operation by updating the replica count for the Deployment '''
723
724     def update_replica_count(spec):
725         spec.spec.replicas = replicas
726         return spec
727
728     _patch_deployment(deployment_description["location"], deployment_description["namespace"], deployment_description["deployment"], update_replica_count)
729
730 def upgrade(deployment_description, image, container_index = 0):
731     ''' Trigger a rolling upgrade by sending a new image name/tag to k8s '''
732
733     def update_image(spec):
734         spec.spec.template.spec.containers[container_index].image = image
735         return spec
736
737     _patch_deployment(deployment_description["location"], deployment_description["namespace"], deployment_description["deployment"], update_image)
738
739 def rollback(deployment_description, rollback_to=0):
740     '''
741     Undo upgrade by rolling back to a previous revision of the deployment.
742     By default, go back one revision.
743     rollback_to can be used to supply a specific revision number.
744     Returns the image for the app container and the replica count from the rolled-back deployment
745     '''
746     '''
747     2018-07-13
748     Currently this does not work due to a bug in the create_namespaced_deployment_rollback() method.
749     The k8s python client code throws an exception while processing the response from the API.
750     See:
751        - https://github.com/kubernetes-client/python/issues/491
752        - https://github.com/kubernetes/kubernetes/pull/63837
753     The fix has been merged into the master branch but is not in the latest release.
754     '''
755     _configure_api(deployment_description["location"])
756     deployment = deployment_description["deployment"]
757     namespace = deployment_description["namespace"]
758
759     # Initiate the rollback
760     client.ExtensionsV1beta1Api().create_namespaced_deployment_rollback(
761         deployment,
762         namespace,
763         client.AppsV1beta1DeploymentRollback(name=deployment, rollback_to=client.AppsV1beta1RollbackConfig(revision=rollback_to)))
764
765     # Read back the spec for the rolled-back deployment
766     spec = client.AppsV1Api().read_namespaced_deployment(deployment, namespace)
767     return spec.spec.template.spec.containers[0].image, spec.spec.replicas
768
769 def execute_command_in_deployment(deployment_description, command):
770     '''
771     Enumerates the pods in the k8s deployment identified by "deployment_description",
772     then executes the command (represented as an argv-style list) in "command" in
773     container 0 (the main application container) each of those pods.
774
775     Note that the sets of pods associated with a deployment can change over time.  The
776     enumeration is a snapshot at one point in time.  The command will not be executed in
777     pods that are created after the initial enumeration.   If a pod disappears after the
778     initial enumeration and before the command is executed, the attempt to execute the
779     command will fail.  This is not treated as a fatal error.
780
781     This approach is reasonable for the one current use case for "execute_command":  running a
782     script to notify a container that its configuration has changed as a result of a
783     policy change.  In this use case, the new configuration information is stored into
784     the configuration store (Consul), the pods are enumerated, and the command is executed.
785     If a pod disappears after the enumeration, the fact that the command cannot be run
786     doesn't matter--a nonexistent pod doesn't need to be reconfigured.  Similarly, a pod that
787     comes up after the enumeration will get its initial configuration from the updated version
788     in Consul.
789
790     The optimal solution here would be for k8s to provide an API call to execute a command in
791     all of the pods for a deployment.   Unfortunately, k8s does not provide such a call--the
792     only call provided by k8s operates at the pod level, not the deployment level.
793
794     Another interesting k8s factoid: there's no direct way to list the pods belong to a
795     particular k8s deployment.   The deployment code above sets a label ("k8sdeployment") on
796     the pod that has the k8s deployment name.  To list the pods, the code below queries for
797     pods with the label carrying the deployment name.
798     '''
799     location = deployment_description["location"]
800     _configure_api(location)
801     deployment = deployment_description["deployment"]
802     namespace = deployment_description["namespace"]
803
804     # Get names of all the running pods belonging to the deployment
805     pod_names = [pod.metadata.name for pod in client.CoreV1Api().list_namespaced_pod(
806         namespace = namespace,
807         label_selector = "k8sdeployment={0}".format(deployment),
808         field_selector = "status.phase=Running"
809     ).items]
810
811     # Execute command in the running pods
812     return [_execute_command_in_pod(location, namespace, pod_name, command)
813             for pod_name in pod_names]