import os
import re
import uuid
+
from kubernetes import config, client, stream
# Default values for readiness probe
# Finally create the container for the sidecar
containers.append(_create_container_object("filebeat", filebeat["image"], False, volume_mounts=sidecar_volume_mounts))
-def _add_tls_init_container(init_containers, volumes, volume_mounts, tls_info, tls_config):
+def _add_tls_init_container(ctx, init_containers, volumes, volume_mounts, tls_info, tls_config):
# Adds an InitContainer to the pod to set up TLS certificate information. For components that act as a
# server(tls_info["use_tls"] is True), the InitContainer will populate a directory with server and CA certificate
# materials in various formats. For other components (tls_info["use_tls"] is False, or tls_info is not specified),
# In either case, the certificate directory is mounted onto the component container filesystem at the location
# specified by tls_info["component_cert_dir"], if present, otherwise at the configured default mount point
# (tls_config["component_cert_dir"]).
+ docker_image = tls_config["image"]
+ ctx.logger.info("Creating init container: TLS \n * [" + docker_image + "]")
cert_directory = tls_info.get("cert_directory") or tls_config.get("component_cert_dir")
env = {}
init_volume_mounts = [client.V1VolumeMount(name="tls-info", mount_path=tls_config["cert_path"])]
# Create the init container
- init_containers.append(_create_container_object("init-tls", tls_config["image"], False, volume_mounts=init_volume_mounts, env=env))
+ init_containers.append(_create_container_object("init-tls", docker_image, False, volume_mounts=init_volume_mounts, env=env))
+
+def _add_external_tls_init_container(ctx, init_containers, volumes, external_cert, external_tls_config):
+ # Adds an InitContainer to the pod which will generate external TLS certificates.
+ docker_image = external_tls_config["image_tag"]
+ ctx.logger.info("Creating init container: external TLS \n * [" + docker_image + "]")
-def _add_external_tls_init_container(init_containers, volumes, external_cert, external_tls_config):
env = {}
output_path = external_cert.get("external_cert_directory")
if not output_path.endswith('/'):
client.V1VolumeMount(name="tls-volume", mount_path=MOUNT_PATH)]
# Create the init container
- init_containers.append(_create_container_object("cert-service-client", external_tls_config["image_tag"], False, volume_mounts=init_volume_mounts, env=env))
+ init_containers.append(_create_container_object("cert-service-client", docker_image, False, volume_mounts=init_volume_mounts, env=env))
+
+
+def _add_truststore_merger_init_container(ctx, init_containers, tls_info, tls_config, external_cert, truststore_merger_config):
+ # Adds an InitContainer to the pod to merge TLS and external TLS truststore into single file.
+ docker_image = truststore_merger_config["image_tag"]
+ ctx.logger.info("Creating init container: truststore merger \n * [" + docker_image + "]")
+
+ tls_cert_dir = tls_info.get("cert_directory") or tls_config.get("component_cert_dir")
+ if not tls_cert_dir.endswith('/'):
+ tls_cert_dir += '/'
+
+ tls_cert_file_path = tls_cert_dir + "trust.jks"
+ tls_cert_file_pass = tls_cert_dir + "trust.pass"
+
+ ext_cert_dir = tls_cert_dir + "external/"
+
+ output_type = (external_cert.get("cert_type") or 'p12').lower()
+ ext_truststore_path = ext_cert_dir + "truststore." + _get_file_extension(output_type)
+ ext_truststore_pass = ''
+ if output_type != 'pem':
+ ext_truststore_pass = ext_cert_dir + "truststore.pass"
+
+ env = {}
+ env["TRUSTSTORES_PATHS"] = tls_cert_file_path + ":" + ext_truststore_path
+ env["TRUSTSTORES_PASSWORDS_PATHS"] = tls_cert_file_pass + ":" + ext_truststore_pass
+
+ ctx.logger.info("TRUSTSTORES_PATHS: " + env["TRUSTSTORES_PATHS"])
+ ctx.logger.info("TRUSTSTORES_PASSWORDS_PATHS: " + env["TRUSTSTORES_PASSWORDS_PATHS"])
+
+ # Create the volumes and volume mounts
+ init_volume_mounts = [client.V1VolumeMount(name="tls-info", mount_path=tls_cert_dir)]
+
+ # Create the init container
+ init_containers.append(_create_container_object("truststore-merger", docker_image, False, volume_mounts=init_volume_mounts, env=env))
+
+def _get_file_extension(output_type):
+ return {
+ 'p12': 'p12',
+ 'pem': 'pem',
+ 'jks': 'jks',
+ }[output_type]
def _process_port_map(port_map):
service_ports = [] # Ports exposed internally on the k8s network
return {"pod" : pod_name, "output" : output}
-def deploy(namespace, component_name, image, replicas, always_pull, k8sconfig, **kwargs):
+def deploy(ctx, namespace, component_name, image, replicas, always_pull, k8sconfig, **kwargs):
'''
This will create a k8s Deployment and, if needed, one or two k8s Services.
(We are being opinionated in our use of k8s... this code decides what k8s abstractions and features to use.
"cert_path": mount point for certificate volume in init container
"image": Docker image to use for TLS init container
"component_cert_dir" : default mount point for certs
+ - truststore-merger: a dictionary of trustore-merger information:
+ "image_tag": docker image to use for truststore-merger init container
kwargs may have:
- volumes: array of volume objects, where a volume object is:
{"host":{"path": "/path/on/host"}, "container":{"bind":"/path/on/container","mode":"rw_or_ro"}
_add_elk_logging_sidecar(containers, volumes, volume_mounts, component_name, kwargs.get("log_info"), k8sconfig.get("filebeat"))
# Set up TLS information
- _add_tls_init_container(init_containers, volumes, volume_mounts, kwargs.get("tls_info") or {}, k8sconfig.get("tls"))
+ _add_tls_init_container(ctx, init_containers, volumes, volume_mounts, kwargs.get("tls_info") or {}, k8sconfig.get("tls"))
# Set up external TLS information
external_cert = kwargs.get("external_cert")
if external_cert and external_cert.get("use_external_tls"):
- _add_external_tls_init_container(init_containers, volumes, external_cert, k8sconfig.get("external_cert"))
+ _add_external_tls_init_container(ctx, init_containers, volumes, external_cert, k8sconfig.get("external_cert"))
+ _add_truststore_merger_init_container(ctx, init_containers, kwargs.get("tls_info") or {}, k8sconfig.get("tls"), external_cert, k8sconfig.get("truststore_merger"))
# Create the container for the component
# Make it the first container in the pod
"keystore_password" : "secret1",
"truststore_password" : "secret2"
},
+ "truststore_merger": {
+ "image_tag": "repo/oom-truststore-merger:1.2.3"
+ },
"cbs": {
"base_url": "https://config-binding-service:10443/service_component_all/test-component"
}
for k in expected_envs:
assert (k in envs and expected_envs[k] == envs[k])
+def verify_truststore_merger(dep):
+ cert_container = dep.spec.template.spec.init_containers[2]
+ print(cert_container)
+ assert cert_container.image == "repo/oom-truststore-merger:1.2.3"
+ assert cert_container.name == "truststore-merger"
+ assert len(cert_container.volume_mounts) == 1
+ assert cert_container.volume_mounts[0].name == "tls-info"
+ assert cert_container.volume_mounts[0].mount_path == "/opt/dcae/cacert/"
+
+ expected_envs = {
+ "TRUSTSTORES_PATHS": "/opt/dcae/cacert/trust.jks:/opt/dcae/cacert/external/truststore.p12",
+ "TRUSTSTORES_PASSWORDS_PATHS": "/opt/dcae/cacert/trust.pass:/opt/dcae/cacert/external/truststore.pass",
+ }
+
+ envs = {k.name: k.value for k in cert_container.env}
+ for k in expected_envs:
+ assert (k in envs and expected_envs[k] == envs[k])
+
+
def do_deploy(tls_info=None):
''' Common deployment operations '''
import k8sclient.k8sclient
if tls_info:
kwargs["tls_info"] = tls_info
- dep, deployment_description = k8sclient.k8sclient.deploy("k8stest", "testcomponent", "example.com/testcomponent:1.4.3", 1, False, k8s_test_config, **kwargs)
+ dep, deployment_description = k8sclient.k8sclient.deploy(k8s_ctx(), "k8stest", "testcomponent", "example.com/testcomponent:1.4.3", 1, False, k8s_test_config, **kwargs)
# Make sure all of the basic k8s parameters are correct
verify_common(dep, deployment_description)
kwargs['resources'] = _set_resources()
kwargs["external_cert"] = ext_tls_info
- dep, deployment_description = k8sclient.k8sclient.deploy("k8stest", "testcomponent", "example.com/testcomponent:1.4.3", 1, False, k8s_test_config, **kwargs)
+ dep, deployment_description = k8sclient.k8sclient.deploy(k8s_ctx(), "k8stest", "testcomponent", "example.com/testcomponent:1.4.3", 1, False, k8s_test_config, **kwargs)
# Make sure all of the basic k8s parameters are correct
verify_common(dep, deployment_description)
return dep, deployment_description
+
+class k8s_logger:
+ def info(self, text):
+ print(text)
+
+class k8s_ctx:
+ logger = k8s_logger()
+
+