1 # ============LICENSE_START=======================================================
3 # ================================================================================
4 # Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved.
5 # Copyright (c) 2020 Pantheon.tech. All rights reserved.
6 # Copyright (c) 2020 Nokia. All rights reserved.
7 # Copyright (c) 2020 J. F. Lucas. All rights reserved.
8 # ================================================================================
9 # Licensed under the Apache License, Version 2.0 (the "License");
10 # you may not use this file except in compliance with the License.
11 # You may obtain a copy of the License at
13 # http://www.apache.org/licenses/LICENSE-2.0
15 # Unless required by applicable law or agreed to in writing, software
16 # distributed under the License is distributed on an "AS IS" BASIS,
17 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18 # See the License for the specific language governing permissions and
19 # limitations under the License.
20 # ============LICENSE_END=========================================================
import os
import re
import uuid

from kubernetes import config, client, stream
# Default values (seconds) for readiness probe
PROBE_DEFAULT_PERIOD = 15
PROBE_DEFAULT_TIMEOUT = 1

# Location of k8s cluster config file ("kubeconfig")
K8S_CONFIG_PATH="/opt/onap/kube/kubeconfig"

# Regular expression for interval/timeout specification
INTERVAL_SPEC = re.compile("^([0-9]+)(s|m|h)?$")
# Conversion factors to seconds
FACTORS = {None: 1, "s": 1, "m": 60, "h": 3600}

# Regular expression for port mapping
#   group 1: container port
#   group 2: / + protocol
#   group 3: protocol (without the "/"), if any
#   group 4: host port
PORTS = re.compile("^([0-9]+)(/(udp|UDP|tcp|TCP))?:([0-9]+)$")

# Constants for external_cert (cert-service client init container)
MOUNT_PATH = "/etc/onap/oom/certservice/certs/"
KEYSTORE_PATH = MOUNT_PATH + "certServiceClient-keystore.jks"
TRUSTSTORE_PATH = MOUNT_PATH + "truststore.jks"
DEFAULT_CERT_TYPE = "p12"
53 def _create_deployment_name(component_name):
54 return "dep-{0}".format(component_name)[:63]
56 def _create_service_name(component_name):
57 return "{0}".format(component_name)[:63]
59 def _create_exposed_service_name(component_name):
60 return ("x{0}".format(component_name))[:63]
62 def _create_exposed_v6_service_name(component_name):
63 return ("x{0}-ipv6".format(component_name))[:63]
def _configure_api(location=None):
    '''
    Set up the kubernetes client configuration.
    location: name of the kubeconfig context to use (None -> default context).
    Prefers the on-disk kubeconfig at K8S_CONFIG_PATH; otherwise falls back to
    in-cluster configuration.
    '''
    # Look for a kubernetes config file
    if os.path.exists(K8S_CONFIG_PATH):
        config.load_kube_config(config_file=K8S_CONFIG_PATH, context=location, persist_config=False)
    # Maybe we're running in a k8s container and we can use info provided by k8s
    # We would like to use:
    # config.load_incluster_config()
    # but this looks into os.environ for kubernetes host and port, and from
    # the plugin those aren't visible. So we use the InClusterConfigLoader class,
    # where we can set the environment to what we like.
    # This is probably brittle! Maybe there's a better alternative.
    # NOTE(review): this copy of the file appears to have lines elided here -- the
    # "else:" branch, the dict literal wrapping the two environment entries below
    # (e.g. "localenv = {" ... "}"), and the trailing "environment=..." argument
    # plus ".load_and_set(...)" call. Confirm against the original source.
        config.incluster_config.SERVICE_HOST_ENV_NAME : "kubernetes.default.svc.cluster.local",
        config.incluster_config.SERVICE_PORT_ENV_NAME : "443"
    config.incluster_config.InClusterConfigLoader(
        token_filename=config.incluster_config.SERVICE_TOKEN_FILENAME,
        cert_filename=config.incluster_config.SERVICE_CERT_FILENAME,
def _parse_interval(t):
    '''
    Parse an interval specification, which can be:
    - a simple integer quantity, interpreted as seconds
    - a string representation of a decimal integer, interpreted as seconds
    - a string consisting of a represention of an decimal integer followed by a unit,
      with "s" representing seconds, "m" representing minutes,
      and "h" representing hours
    Used for compatibility with the Docker plugin, where time intervals
    for health checks were specified as strings with a number and a unit.
    See INTERVAL_SPEC above for the regular expression that's accepted.
    Raises ValueError if the specification doesn't match the expected form.
    '''
    m = INTERVAL_SPEC.match(str(t))
    if m:
        # FACTORS maps the unit suffix (or None, for "no unit") to seconds
        time = int(m.group(1)) * FACTORS[m.group(2)]
    else:
        raise ValueError("Bad interval specification: {0}".format(t))
    return time
def _create_probe(hc, port):
    ''' Create a Kubernetes probe based on info in the health check dictionary hc.
    hc['type'] selects an HTTP(S) GET probe ("http"/"https") or an exec probe
    ("script"/"docker"); hc may carry 'interval'/'timeout' specs (see _parse_interval).
    port: container port used for HTTP-based checks.
    '''
    probe_type = hc['type']
    period = _parse_interval(hc.get('interval', PROBE_DEFAULT_PERIOD))
    timeout = _parse_interval(hc.get('timeout', PROBE_DEFAULT_TIMEOUT))
    if probe_type in ['http', 'https']:
        probe = client.V1Probe(
            failure_threshold = 1,
            initial_delay_seconds = 5,
            period_seconds = period,
            timeout_seconds = timeout,
            http_get = client.V1HTTPGetAction(
                path = hc['endpoint'],
                # NOTE(review): the "port = port," argument and the closing
                # parentheses appear to be elided in this copy.
                scheme = probe_type.upper()
    elif probe_type in ['script', 'docker']:
        probe = client.V1Probe(
            failure_threshold = 1,
            initial_delay_seconds = 5,
            period_seconds = period,
            timeout_seconds = timeout,
            _exec = client.V1ExecAction(
                # NOTE(review): the closing parentheses and the final
                # "return probe" appear to be elided in this copy.
                command = hc['script'].split( )
137 def _create_resources(resources=None):
138 if resources is not None:
139 resources_obj = client.V1ResourceRequirements(
140 limits = resources.get("limits"),
141 requests = resources.get("requests")
def _create_container_object(name, image, always_pull, **kwargs):
    ''' Build a k8s V1Container for the component (also used for sidecar/init containers).
    kwargs: env (dict of name->value), readiness/liveness (health check dicts),
    resources (dict), container_ports (list of (port, protocol) tuples),
    volume_mounts (list of V1VolumeMount).
    '''
    # Set up environment variables
    # Copy any passed in environment variables
    env = kwargs.get('env') or {}
    env_vars = [client.V1EnvVar(name=k, value=env[k]) for k in env]
    # Add POD_IP with the IP address of the pod running the container
    pod_ip = client.V1EnvVarSource(field_ref = client.V1ObjectFieldSelector(field_path="status.podIP"))
    env_vars.append(client.V1EnvVar(name="POD_IP",value_from=pod_ip))

    # If a health check is specified, create a readiness/liveness probe
    # (For an HTTP-based check, we assume it's at the first container port)
    readiness = kwargs.get('readiness')
    liveness = kwargs.get('liveness')
    resources = kwargs.get('resources')
    container_ports = kwargs.get('container_ports') or []

    hc_port = container_ports[0][0] if container_ports else None
    probe = _create_probe(readiness, hc_port) if readiness else None
    live_probe = _create_probe(liveness, hc_port) if liveness else None
    resources_obj = _create_resources(resources) if resources else None
    port_objs = [client.V1ContainerPort(container_port=port, protocol=proto)
                 for port, proto in container_ports]

    # Define container for pod
    return client.V1Container(
        # NOTE(review): the "name=name," and "image=image," arguments appear to be
        # elided from this copy, as do "env=env_vars," / "ports=port_objs," below
        # and the closing parenthesis. Confirm against the original source.
        image_pull_policy='Always' if always_pull else 'IfNotPresent',
        volume_mounts=kwargs.get('volume_mounts') or [],
        resources=resources_obj,
        readiness_probe=probe,
        liveness_probe=live_probe
def _create_deployment_object(component_name,
    # NOTE(review): the remainder of the parameter list (presumably containers,
    # init_containers, replicas, volumes, labels, pull_secrets) appears to be
    # elided from this copy, along with several statements below (e.g. the
    # "ips = []" initializer, parts of the pod spec, replica count, template
    # argument, and the final "return deployment"). Code reproduced as-is.

    deployment_name = _create_deployment_name(component_name)

    # Label the pod with the deployment name, so we can find it easily
    labels.update({"k8sdeployment" : deployment_name})

    # pull_secrets is a list of the names of the k8s secrets containing docker registry credentials
    # See https://kubernetes.io/docs/concepts/containers/images/#specifying-imagepullsecrets-on-a-pod
    for secret in pull_secrets:
        ips.append(client.V1LocalObjectReference(name=secret))

    # Define pod template
    template = client.V1PodTemplateSpec(
        metadata=client.V1ObjectMeta(labels=labels),
        spec=client.V1PodSpec(hostname=component_name,
                              containers=containers,
                              init_containers=init_containers,
                              image_pull_secrets=ips)

    # Define deployment spec
    spec = client.V1DeploymentSpec(
        selector=client.V1LabelSelector(match_labels=labels),

    # Create deployment object
    deployment = client.V1Deployment(
        api_version="apps/v1",
        metadata=client.V1ObjectMeta(name=deployment_name, labels=labels),
def _create_service_object(service_name, component_name, service_ports, annotations, labels, service_type, ip_family):
    ''' Build a k8s V1Service that selects the component's pods via the "app" label. '''
    # NOTE(review): lines appear to be elided from this copy: the remaining
    # V1ServiceSpec arguments (ports, type, ip family), the "if annotations:" /
    # "else:" guard selecting between the two metadata constructions below, and
    # the remaining V1Service arguments plus "return service".
    service_spec = client.V1ServiceSpec(
        selector={"app": component_name},
    metadata = client.V1ObjectMeta(name=_create_service_name(service_name), labels=labels, annotations=annotations)
    metadata = client.V1ObjectMeta(name=_create_service_name(service_name), labels=labels)
    service = client.V1Service(
def parse_ports(port_list):
    '''
    Parse the port list into a list of container ports (needed to create the container)
    and to a set of port mappings to set up k8s services.
    (NOTE(review): the rest of this docstring and several statements appear to be
    elided from this copy -- including the "container_ports"/"port_map" result
    initializers, the "for p in port_list:" loop header, the dict-vs-string
    handling of each entry, the "if m:" guard after the regex match, and the
    protocol default when no protocol is given. Code reproduced as-is.)
    '''
    ipv6 = "ipv6" in p and p['ipv6']
    p = "".join(str(v) for v in p['concat'])
    m = PORTS.match(p.strip())
    cport = int(m.group(1))
    hport = int(m.group(4))
    proto = (m.group(3)).upper()
    port = (cport, proto)
    if port not in container_ports:
        container_ports.append(port)
    port_map[(cport, proto, ipv6)] = hport
    raise ValueError("Bad port specification: {0}".format(p))
    return container_ports, port_map
280 def _parse_volumes(volume_list):
283 for v in volume_list:
284 vname = str(uuid.uuid4())
285 vhost = v['host']['path']
286 vcontainer = v['container']['bind']
287 vro = (v['container'].get('mode') == 'ro')
288 volumes.append(client.V1Volume(name=vname, host_path=client.V1HostPathVolumeSource(path=vhost)))
289 volume_mounts.append(client.V1VolumeMount(name=vname, mount_path=vcontainer, read_only=vro))
291 return volumes, volume_mounts
def _add_elk_logging_sidecar(containers, volumes, volume_mounts, component_name, log_info, filebeat):
    ''' Add a filebeat logging sidecar container and the volumes it shares with the component. '''
    # NOTE(review): a few lines appear to be elided from this copy: the body of the
    # early-exit guard below (presumably "return"), a guard on "log_dir", and the
    # "volumes.append(" opening for the "filebeat-conf" V1Volume. Code reproduced as-is.
    if not log_info or not filebeat:
    log_dir = log_info.get("log_directory")
    sidecar_volume_mounts = []

    # Create the volume for component log files and volume mounts for the component and sidecar containers
    volumes.append(client.V1Volume(name="component-log", empty_dir=client.V1EmptyDirVolumeSource()))
    volume_mounts.append(client.V1VolumeMount(name="component-log", mount_path=log_dir))
    sc_path = log_info.get("alternate_fb_path") or "{0}/{1}".format(filebeat["log_path"], component_name)
    sidecar_volume_mounts.append(client.V1VolumeMount(name="component-log", mount_path=sc_path))

    # Create the volume for sidecar data and the volume mount for it
    volumes.append(client.V1Volume(name="filebeat-data", empty_dir=client.V1EmptyDirVolumeSource()))
    sidecar_volume_mounts.append(client.V1VolumeMount(name="filebeat-data", mount_path=filebeat["data_path"]))

    # Create the volume for the sidecar configuration data and the volume mount for it
    # The configuration data is in a k8s ConfigMap that should be created when DCAE is installed.
        client.V1Volume(name="filebeat-conf", config_map=client.V1ConfigMapVolumeSource(name=filebeat["config_map"])))
    sidecar_volume_mounts.append(
        client.V1VolumeMount(name="filebeat-conf", mount_path=filebeat["config_path"], sub_path=filebeat["config_subpath"]))

    # Finally create the container for the sidecar
    containers.append(_create_container_object("filebeat", filebeat["image"], False, volume_mounts=sidecar_volume_mounts))
def _add_tls_init_container(ctx, init_containers, volumes, volume_mounts, tls_info, tls_config):
    # Adds an InitContainer to the pod to set up TLS certificate information. For components that act as a
    # server(tls_info["use_tls"] is True), the InitContainer will populate a directory with server and CA certificate
    # materials in various formats. For other components (tls_info["use_tls"] is False, or tls_info is not specified),
    # the InitContainer will populate a directory with CA certificate materials in PEM and JKS formats.
    # In either case, the certificate directory is mounted onto the component container filesystem at the location
    # specified by tls_info["component_cert_dir"], if present, otherwise at the configured default mount point
    # (tls_config["component_cert_dir"]).
    docker_image = tls_config["image"]
    ctx.logger.info("Creating init container: TLS \n * [" + docker_image + "]")
    # NOTE(review): an "env = {}" initializer appears to be elided from this copy
    # just before the TLS_SERVER assignment below -- confirm against the original.
    cert_directory = tls_info.get("cert_directory") or tls_config.get("component_cert_dir")
    env["TLS_SERVER"] = "true" if tls_info.get("use_tls") else "false"

    # Create the certificate volume and volume mounts
    volumes.append(client.V1Volume(name="tls-info", empty_dir=client.V1EmptyDirVolumeSource()))
    volume_mounts.append(client.V1VolumeMount(name="tls-info", mount_path=cert_directory))
    init_volume_mounts = [client.V1VolumeMount(name="tls-info", mount_path=tls_config["cert_path"])]

    # Create the init container
    init_containers.append(_create_container_object("init-tls", docker_image, False, volume_mounts=init_volume_mounts, env=env))
def _add_external_tls_init_container(ctx, init_containers, volumes, external_cert, external_tls_config):
    # Adds an InitContainer to the pod which will generate external TLS certificates.
    docker_image = external_tls_config["image_tag"]
    ctx.logger.info("Creating init container: external TLS \n * [" + docker_image + "]")
    # NOTE(review): an "env = {}" initializer and the body of the endswith guard
    # below (presumably appending a trailing "/") appear to be elided from this
    # copy -- confirm against the original source.
    output_path = external_cert.get("external_cert_directory")
    if not output_path.endswith('/'):
    env["REQUEST_URL"] = external_tls_config.get("request_url")
    env["REQUEST_TIMEOUT"] = external_tls_config.get("timeout")
    env["OUTPUT_PATH"] = output_path + "external"
    env["OUTPUT_TYPE"] = external_cert.get("cert_type")
    env["CA_NAME"] = external_cert.get("ca_name")
    env["COMMON_NAME"] = external_cert.get("external_certificate_parameters").get("common_name")
    env["ORGANIZATION"] = external_tls_config.get("organization")
    env["ORGANIZATION_UNIT"] = external_tls_config.get("organizational_unit")
    env["LOCATION"] = external_tls_config.get("location")
    env["STATE"] = external_tls_config.get("state")
    env["COUNTRY"] = external_tls_config.get("country")
    env["SANS"] = external_cert.get("external_certificate_parameters").get("sans")
    env["KEYSTORE_PATH"] = KEYSTORE_PATH
    env["KEYSTORE_PASSWORD"] = external_tls_config.get("keystore_password")
    env["TRUSTSTORE_PATH"] = TRUSTSTORE_PATH
    env["TRUSTSTORE_PASSWORD"] = external_tls_config.get("truststore_password")

    # Create the volumes and volume mounts
    sec = client.V1SecretVolumeSource(secret_name=external_tls_config.get("cert_secret_name"))
    volumes.append(client.V1Volume(name="tls-volume", secret=sec))
    init_volume_mounts = [client.V1VolumeMount(name="tls-info", mount_path=external_cert.get("external_cert_directory")),
                          client.V1VolumeMount(name="tls-volume", mount_path=MOUNT_PATH)]

    # Create the init container
    init_containers.append(_create_container_object("cert-service-client", docker_image, False, volume_mounts=init_volume_mounts, env=env))
def _add_cert_post_processor_init_container(ctx, init_containers, tls_info, tls_config, external_cert, cert_post_processor_config):
    # Adds an InitContainer to the pod to merge TLS and external TLS truststore into single file.
    docker_image = cert_post_processor_config["image_tag"]
    ctx.logger.info("Creating init container: cert post processor \n * [" + docker_image + "]")
    # NOTE(review): the body of the endswith guard below (presumably appending a
    # trailing "/") and an "env = {}" initializer before the TRUSTSTORES_PATHS
    # assignment appear to be elided from this copy -- confirm against the original.
    tls_cert_dir = tls_info.get("cert_directory") or tls_config.get("component_cert_dir")
    if not tls_cert_dir.endswith('/'):

    tls_cert_file_path = tls_cert_dir + "trust.jks"
    tls_cert_file_pass = tls_cert_dir + "trust.pass"

    ext_cert_dir = tls_cert_dir + "external/"

    output_type = (external_cert.get("cert_type") or DEFAULT_CERT_TYPE).lower()
    ext_truststore_path = ext_cert_dir + "truststore." + _get_file_extension(output_type)
    ext_truststore_pass = ''
    if output_type != 'pem':
        ext_truststore_pass = ext_cert_dir + "truststore.pass"

    env["TRUSTSTORES_PATHS"] = tls_cert_file_path + ":" + ext_truststore_path
    env["TRUSTSTORES_PASSWORDS_PATHS"] = tls_cert_file_pass + ":" + ext_truststore_pass
    env["KEYSTORE_SOURCE_PATHS"] = _get_keystore_source_paths(output_type, ext_cert_dir)
    env["KEYSTORE_DESTINATION_PATHS"] = _get_keystore_destination_paths(output_type, tls_cert_dir)

    ctx.logger.info("TRUSTSTORES_PATHS: " + env["TRUSTSTORES_PATHS"])
    ctx.logger.info("TRUSTSTORES_PASSWORDS_PATHS: " + env["TRUSTSTORES_PASSWORDS_PATHS"])
    ctx.logger.info("KEYSTORE_SOURCE_PATHS: " + env["KEYSTORE_SOURCE_PATHS"])
    ctx.logger.info("KEYSTORE_DESTINATION_PATHS: " + env["KEYSTORE_DESTINATION_PATHS"])

    # Create the volumes and volume mounts
    init_volume_mounts = [client.V1VolumeMount(name="tls-info", mount_path=tls_cert_dir)]

    # Create the init container
    init_containers.append(_create_container_object("cert-post-processor", docker_image, False, volume_mounts=init_volume_mounts, env=env))
419 def _get_file_extension(output_type):
426 def _get_keystore_source_paths(output_type, ext_cert_dir):
427 source_paths_template = {
428 'p12': "{0}keystore.p12:{0}keystore.pass",
429 'jks': "{0}keystore.jks:{0}keystore.pass",
430 'pem': "{0}keystore.pem:{0}key.pem",
432 return source_paths_template.format(ext_cert_dir)
434 def _get_keystore_destination_paths(output_type, tls_cert_dir):
435 destination_paths_template = {
436 'p12': "{0}cert.p12:{0}p12.pass",
437 'jks': "{0}cert.jks:{0}jks.pass",
438 'pem': "{0}cert.pem:{0}key.pem",
440 return destination_paths_template.format(tls_cert_dir)
def _process_port_map(port_map):
    ''' Turn the parsed port map into lists of k8s V1ServicePort objects:
    (service_ports, exposed_ports, exposed_ports_ipv6). '''
    service_ports = []          # Ports exposed internally on the k8s network
    exposed_ports = []          # Ports to be mapped to ports on the k8s nodes via NodePort
    exposed_ports_ipv6 = []
    for (cport, proto, ipv6), hport in port_map.items():
        name = "xport-{0}-{1}".format(proto[0].lower(), cport)
        # NOTE(review): lines appear to be elided from this copy -- presumably a
        # prefix adjustment to "name" (the slice below strips a leading char), a
        # guard on "hport", and the "if ipv6:" / "else:" choice between the two
        # exposed-port appends. Code reproduced as-is.
        port = client.V1ServicePort(port=cport, protocol=proto, name=name[1:])
        if port not in service_ports:
            service_ports.append(port)
        exposed_ports_ipv6.append(client.V1ServicePort(port=cport, protocol=proto, node_port=hport, name=name))
        exposed_ports.append(client.V1ServicePort(port=cport, protocol=proto, node_port=hport, name=name))
    return service_ports, exposed_ports, exposed_ports_ipv6
def _service_exists(location, namespace, component_name):
    ''' Return True if the component's ClusterIP service exists in 'namespace'
    in the k8s cluster at 'location'; any API error is treated as "not found". '''
    exists = False
    try:
        _configure_api(location)
        client.CoreV1Api().read_namespaced_service(_create_service_name(component_name), namespace)
        exists = True
    except client.rest.ApiException:
        # Typically a 404: the service is not there
        pass
    return exists
def _patch_deployment(location, namespace, deployment, modify):
    '''
    Gets the current spec for 'deployment' in 'namespace'
    in the k8s cluster at 'location',
    uses the 'modify' function to change the spec,
    then sends the updated spec to k8s.
    'modify' must return the updated spec object.
    '''
    _configure_api(location)

    # Get deployment spec
    spec = client.AppsV1Api().read_namespaced_deployment(deployment, namespace)

    # Apply changes to spec
    spec = modify(spec)

    # Patch the deploy with updated spec
    client.AppsV1Api().patch_namespaced_deployment(deployment, namespace, spec)
def _execute_command_in_pod(location, namespace, pod_name, command):
    '''
    Execute the command (specified by an argv-style list in the "command" parameter) in
    the specified pod in the specified namespace at the specified location.
    For now at least, we use this only to
    run a notification script in a pod after a configuration change.

    The straightforward way to do this is with the V1 Core API function "connect_get_namespaced_pod_exec".
    Unfortunately due to a bug/limitation in the Python client library, we can't call it directly.
    We have to make the API call through a Websocket connection using the kubernetes.stream wrapper API.
    I'm following the example code at https://github.com/kubernetes-client/python/blob/master/examples/exec.py.
    There are several issues tracking this, in various states. It isn't clear that there will ever
    be a fix.
    - https://github.com/kubernetes-client/python/issues/58
    - https://github.com/kubernetes-client/python/issues/409
    - https://github.com/kubernetes-client/python/issues/526

    The main consequence of the workaround using "stream" is that the caller does not get an indication
    of the exit code returned by the command when it completes execution. It turns out that the
    original implementation of notification in the Docker plugin did not use this result, so we can
    still match the original notification functionality.

    The "stream" approach returns a string containing any output sent by the command to stdout or stderr.
    We'll return that so it can logged.
    '''
    _configure_api(location)
    # NOTE(review): lines appear to be elided from this copy: the "try:" wrapping
    # the call below, and the remaining arguments to stream.stream() (command,
    # pod name, namespace, stdin/stdout/stderr/tty flags). Code reproduced as-is.
    output = stream.stream(client.CoreV1Api().connect_get_namespaced_pod_exec,
    except client.rest.ApiException as e:
        # If the exception indicates the pod wasn't found, it's not a fatal error.
        # It existed when we enumerated the pods for the deployment but no longer exists.
        # Unfortunately, the only way to distinguish a pod not found from any other error
        # is by looking at the reason text.
        # (The ApiException's "status" field should contain the HTTP status code, which would
        # be 404 if the pod isn't found, but empirical testing reveals that "status" is set
        # NOTE(review): the rest of this comment and the "else:" branch that
        # re-raises other errors appear to be elided from this copy.
        if "404 not found" in e.reason.lower():
            output = "Pod not found"

    return {"pod" : pod_name, "output" : output}
def deploy(ctx, namespace, component_name, image, replicas, always_pull, k8sconfig, **kwargs):
    '''
    This will create a k8s Deployment and, if needed, one or two k8s Services.
    (We are being opinionated in our use of k8s... this code decides what k8s abstractions and features to use.
    We're not exposing k8s to the component developer and the blueprint author.
    This is a conscious choice. We want to use k8s in a controlled, consistent way, and we want to hide
    the details from the component developer and the blueprint author.)

    namespace: the Kubernetes namespace into which the component is deployed
    component_name: the component name, used to derive names of Kubernetes entities
    image: the docker image for the component being deployed
    replica: the number of instances of the component to be deployed
    always_pull: boolean flag, indicating that Kubernetes should always pull a new copy of
    the Docker image for the component, even if it is already present on the Kubernetes node.
    - image_pull_secrets: a list of names of image pull secrets that can be used for retrieving images.
    (DON'T PANIC: these are just the names of secrets held in the Kubernetes secret store.)
    - filebeat: a dictionary of filebeat sidecar parameters:
    "log_path" : mount point for log volume in filebeat container
    "data_path" : mount point for data volume in filebeat container
    "config_path" : mount point for config volume in filebeat container
    "config_subpath" : subpath for config data in filebeat container
    "config_map" : ConfigMap holding the filebeat configuration
    "image": Docker image to use for filebeat
    - tls: a dictionary of TLS-related information:
    "cert_path": mount point for certificate volume in init container
    "image": Docker image to use for TLS init container
    "component_cert_dir" : default mount point for certs
    - cert_post_processor: a dictionary of cert_post_processor information:
    "image_tag": docker image to use for cert-post-processor init container
    - volumes:  array of volume objects, where a volume object is:
    {"host":{"path": "/path/on/host"}, "container":{"bind":"/path/on/container","mode":"rw_or_ro"}
    - ports: array of strings in the form "container_port:host_port"
    - env: map of name-value pairs ( {name0: value0, name1: value1...}
    - log_info: an object with info for setting up ELK logging, with the form:
    {"log_directory": "/path/to/container/log/directory", "alternate_fb_path" : "/alternate/sidecar/log/path"}
    - tls_info: an object with info for setting up TLS (HTTPS), with the form:
    {"use_tls": true, "cert_directory": "/path/to/container/cert/directory" }
    - external_cert: an object with information for setting up the init container for external certificates creation, with the form:
    "external_cert_directory": "/path/to/directory_where_certs_should_be_placed",
    "use_external_tls": true or false,
    "ca_name": "ca-name-value",
    "cert_type": "P12" or "JKS" or "PEM",
    "external_certificate_parameters":
    "common_name": "common-name-value",
    "sans": "sans-value"}
    - labels: dict with label-name/label-value pairs, e.g. {"cfydeployment" : "lsdfkladflksdfsjkl", "cfynode":"mycomponent"}
    These label will be set on all the pods deployed as a result of this deploy() invocation.
    - resources: dict with optional "limits" and "requests" resource requirements, each a dict containing:
    - cpu: number CPU usage, like 0.5
    - memory: string memory requirement, like "2Gi"
    - readiness: dict with health check info; if present, used to create a readiness probe for the main container. Includes:
    - type: check is done by making http(s) request to an endpoint ("http", "https") or by exec'ing a script in the container ("script", "docker")
    - interval: period (in seconds) between probes
    - timeout: time (in seconds) to allow a probe to complete
    - endpoint: the path portion of the URL that points to the readiness endpoint for "http" and "https" types
    - path: the full path to the script to be executed in the container for "script" and "docker" types
    - liveness: dict with health check info; if present, used to create a liveness probe for the main container. Includes:
    - type: check is done by making http(s) request to an endpoint ("http", "https") or by exec'ing a script in the container ("script", "docker")
    - interval: period (in seconds) between probes
    - timeout: time (in seconds) to allow a probe to complete
    - endpoint: the path portion of the URL that points to the liveness endpoint for "http" and "https" types
    - path: the full path to the script to be executed in the container for "script" and "docker" types
    - k8s_location: name of the Kubernetes location (cluster) where the component is to be deployed
    '''
    deployment_ok = False
    cip_service_created = False
    deployment_description = {
        "namespace": namespace,
        "location" : kwargs.get("k8s_location"),
    # NOTE(review): lines appear to be elided from this copy of this function,
    # including: the "deployment" and "services" keys and closing brace of
    # deployment_description, the enclosing "try:", the "containers = []" /
    # "init_containers = []" initializers, the "deployment_ok = True" flag, the
    # "if exposed_ports:" guard and "exposed_service =" assignment, and the
    # "if deployment_ok:" / "raise e" lines in the except branch.
    # Code reproduced as-is.

    _configure_api(kwargs.get("k8s_location"))
    core = client.CoreV1Api()
    k8s_apps_v1_api_client = client.AppsV1Api()

    # Parse the port mapping
    container_ports, port_map = parse_ports(kwargs.get("ports", []))

    # Parse the volumes list into volumes and volume_mounts for the deployment
    volumes, volume_mounts = _parse_volumes(kwargs.get("volumes", []))

    # Initialize the list of containers that will be part of the pod

    # Set up the ELK logging sidecar container, if needed
    _add_elk_logging_sidecar(containers, volumes, volume_mounts, component_name, kwargs.get("log_info"), k8sconfig.get("filebeat"))

    # Set up TLS information
    _add_tls_init_container(ctx, init_containers, volumes, volume_mounts, kwargs.get("tls_info") or {}, k8sconfig.get("tls"))

    # Set up external TLS information
    external_cert = kwargs.get("external_cert")
    if external_cert and external_cert.get("use_external_tls"):
        _add_external_tls_init_container(ctx, init_containers, volumes, external_cert, k8sconfig.get("external_cert"))
        _add_cert_post_processor_init_container(ctx, init_containers, kwargs.get("tls_info") or {}, k8sconfig.get("tls"), external_cert, k8sconfig.get("cert_post_processor"))

    # Create the container for the component
    # Make it the first container in the pod
    container_args = {key: kwargs.get(key) for key in ("env", "readiness", "liveness", "resources")}
    container_args['container_ports'] = container_ports
    container_args['volume_mounts'] = volume_mounts
    containers.insert(0, _create_container_object(component_name, image, always_pull, **container_args))

    # Build the k8s Deployment object
    labels = kwargs.get("labels", {})
    labels["app"] = component_name
    dep = _create_deployment_object(component_name, containers, init_containers, replicas, volumes, labels, pull_secrets=k8sconfig["image_pull_secrets"])

    k8s_apps_v1_api_client.create_namespaced_deployment(namespace, dep)
    deployment_description["deployment"] = _create_deployment_name(component_name)

    # Create service(s), if a port mapping is specified
    service_ports, exposed_ports, exposed_ports_ipv6 = _process_port_map(port_map)

    # Create a ClusterIP service for access via the k8s network
    service = _create_service_object(_create_service_name(component_name), component_name, service_ports, None,
                                     labels, "ClusterIP", "IPv4")
    core.create_namespaced_service(namespace, service)
    cip_service_created = True
    deployment_description["services"].append(_create_service_name(component_name))

    # If there are ports to be exposed on the k8s nodes, create a "NodePort" service
        _create_service_object(_create_exposed_service_name(component_name), component_name, exposed_ports,
                               '', labels, "NodePort", "IPv4")
    core.create_namespaced_service(namespace, exposed_service)
    deployment_description["services"].append(_create_exposed_service_name(component_name))

    if exposed_ports_ipv6:
        exposed_service_ipv6 = \
            _create_service_object(_create_exposed_v6_service_name(component_name), component_name,
                                   exposed_ports_ipv6, '', labels, "NodePort", "IPv6")
        core.create_namespaced_service(namespace, exposed_service_ipv6)
        deployment_description["services"].append(_create_exposed_v6_service_name(component_name))

    except Exception as e:
        # If the ClusterIP service was created, delete the service:
        if cip_service_created:
            core.delete_namespaced_service(_create_service_name(component_name), namespace)
        # If the deployment was created but not the service, delete the deployment
            client.AppsV1Api().delete_namespaced_deployment(_create_deployment_name(component_name), namespace, body=client.V1DeleteOptions())

    return dep, deployment_description
def undeploy(deployment_description):
    ''' Delete the k8s Services and Deployment created by a previous deploy() call.
    deployment_description: the dict returned by deploy(), carrying "location",
    "namespace", "services" (list of service names) and "deployment" (deployment name).
    '''
    _configure_api(deployment_description["location"])
    namespace = deployment_description["namespace"]

    # remove any services associated with the component
    for service_name in deployment_description["services"]:
        client.CoreV1Api().delete_namespaced_service(service_name, namespace)

    # Have k8s delete the underlying pods and replicaset when deleting the deployment.
    delete_options = client.V1DeleteOptions(propagation_policy="Foreground")
    client.AppsV1Api().delete_namespaced_deployment(
        deployment_description["deployment"], namespace, body=delete_options)
def is_available(location, namespace, component_name):
    ''' Return True when the component's Deployment has all requested replicas available and updated. '''
    _configure_api(location)
    dep = client.AppsV1Api().read_namespaced_deployment_status(
        _create_deployment_name(component_name), namespace)
    # Check if the number of available replicas is equal to the number requested
    # and that the replicas match the current spec.
    # This check can be used to verify completion of an initial deployment,
    # a scale operation, or an update operation.
    wanted = dep.spec.replicas
    return dep.status.available_replicas == wanted and dep.status.updated_replicas == wanted
def scale(deployment_description, replicas):
    ''' Trigger a scaling operation by updating the replica count for the Deployment '''

    def update_replica_count(spec):
        # Mutate the deployment spec and return it, since _patch_deployment
        # sends the callback's return value back to k8s.
        spec.spec.replicas = replicas
        return spec

    _patch_deployment(deployment_description["location"], deployment_description["namespace"], deployment_description["deployment"], update_replica_count)
def upgrade(deployment_description, image, container_index = 0):
    ''' Trigger a rolling upgrade by sending a new image name/tag to k8s '''

    def update_image(spec):
        # Point the selected container (default: the main app container at index 0)
        # at the new image, and return the spec for _patch_deployment to send to k8s.
        spec.spec.template.spec.containers[container_index].image = image
        return spec

    _patch_deployment(deployment_description["location"], deployment_description["namespace"], deployment_description["deployment"], update_image)
def rollback(deployment_description, rollback_to=0):
    '''
    Undo upgrade by rolling back to a previous revision of the deployment.
    By default, go back one revision.
    rollback_to can be used to supply a specific revision number.
    Returns the image for the app container and the replica count from the rolled-back deployment

    Currently this does not work due to a bug in the create_namespaced_deployment_rollback() method.
    The k8s python client code throws an exception while processing the response from the API.
    See:
    - https://github.com/kubernetes-client/python/issues/491
    - https://github.com/kubernetes/kubernetes/pull/63837
    The fix has been merged into the master branch but is not in the latest release.
    '''
    _configure_api(deployment_description["location"])
    deployment = deployment_description["deployment"]
    namespace = deployment_description["namespace"]

    # Initiate the rollback
    client.ExtensionsV1beta1Api().create_namespaced_deployment_rollback(
        # NOTE(review): the positional arguments (presumably the deployment name
        # and namespace) appear to be elided from this copy of the call.
        client.AppsV1beta1DeploymentRollback(name=deployment, rollback_to=client.AppsV1beta1RollbackConfig(revision=rollback_to)))

    # Read back the spec for the rolled-back deployment
    spec = client.AppsV1Api().read_namespaced_deployment(deployment, namespace)
    return spec.spec.template.spec.containers[0].image, spec.spec.replicas
def execute_command_in_deployment(deployment_description, command):
    '''
    Enumerates the pods in the k8s deployment identified by "deployment_description",
    then executes the command (represented as an argv-style list) in "command" in
    container 0 (the main application container) each of those pods.

    Note that the sets of pods associated with a deployment can change over time. The
    enumeration is a snapshot at one point in time. The command will not be executed in
    pods that are created after the initial enumeration. If a pod disappears after the
    initial enumeration and before the command is executed, the attempt to execute the
    command will fail. This is not treated as a fatal error.

    This approach is reasonable for the one current use case for "execute_command": running a
    script to notify a container that its configuration has changed as a result of a
    policy change. In this use case, the new configuration information is stored into
    the configuration store (Consul), the pods are enumerated, and the command is executed.
    If a pod disappears after the enumeration, the fact that the command cannot be run
    doesn't matter--a nonexistent pod doesn't need to be reconfigured. Similarly, a pod that
    comes up after the enumeration will get its initial configuration from the updated version
    of the configuration store.

    The optimal solution here would be for k8s to provide an API call to execute a command in
    all of the pods for a deployment. Unfortunately, k8s does not provide such a call--the
    only call provided by k8s operates at the pod level, not the deployment level.

    Another interesting k8s factoid: there's no direct way to list the pods belong to a
    particular k8s deployment. The deployment code above sets a label ("k8sdeployment") on
    the pod that has the k8s deployment name. To list the pods, the code below queries for
    pods with the label carrying the deployment name.
    '''
    location = deployment_description["location"]
    _configure_api(location)
    deployment = deployment_description["deployment"]
    namespace = deployment_description["namespace"]

    # Get names of all the running pods belonging to the deployment
    pod_names = [pod.metadata.name for pod in client.CoreV1Api().list_namespaced_pod(
        namespace = namespace,
        label_selector = "k8sdeployment={0}".format(deployment),
        field_selector = "status.phase=Running"
        # NOTE(review): the closing ").items]" of this comprehension appears to
        # be elided from this copy.

    # Execute command in the running pods
    return [_execute_command_in_pod(location, namespace, pod_name, command)
            for pod_name in pod_names]