# ================================================================================
# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved.
# Copyright (c) 2020 Pantheon.tech. All rights reserved.
-# Copyright (c) 2020 Nokia. All rights reserved.
+# Copyright (c) 2020-2021 Nokia. All rights reserved.
# Copyright (c) 2020 J. F. Lucas. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
PROBE_DEFAULT_TIMEOUT = 1
# Location of k8s cluster config file ("kubeconfig")
-K8S_CONFIG_PATH="/opt/onap/kube/kubeconfig"
+K8S_CONFIG_PATH = "/opt/onap/kube/kubeconfig"
# Regular expression for interval/timeout specification
INTERVAL_SPEC = re.compile("^([0-9]+)(s|m|h)?$")
TRUSTSTORE_PATH = MOUNT_PATH + "truststore.jks"
DEFAULT_CERT_TYPE = "p12"
+
def _create_deployment_name(component_name):
return "dep-{0}".format(component_name)[:63]
+
def _create_service_name(component_name):
return "{0}".format(component_name)[:63]
+
def _create_exposed_service_name(component_name):
return ("x{0}".format(component_name))[:63]
+
def _create_exposed_v6_service_name(component_name):
return ("x{0}-ipv6".format(component_name))[:63]
+
def _configure_api(location=None):
# Look for a kubernetes config file
if os.path.exists(K8S_CONFIG_PATH):
# where we can set the environment to what we like.
# This is probably brittle! Maybe there's a better alternative.
localenv = {
- config.incluster_config.SERVICE_HOST_ENV_NAME : "kubernetes.default.svc.cluster.local",
- config.incluster_config.SERVICE_PORT_ENV_NAME : "443"
+ config.incluster_config.SERVICE_HOST_ENV_NAME: "kubernetes.default.svc.cluster.local",
+ config.incluster_config.SERVICE_PORT_ENV_NAME: "443"
}
config.incluster_config.InClusterConfigLoader(
token_filename=config.incluster_config.SERVICE_TOKEN_FILENAME,
environ=localenv
).load_and_set()
+
def _parse_interval(t):
"""
Parse an interval specification
raise ValueError("Bad interval specification: {0}".format(t))
return time
+
def _create_probe(hc, port):
- ''' Create a Kubernetes probe based on info in the health check dictionary hc '''
+ """ Create a Kubernetes probe based on info in the health check dictionary hc """
probe_type = hc['type']
probe = None
period = _parse_interval(hc.get('interval', PROBE_DEFAULT_PERIOD))
timeout = _parse_interval(hc.get('timeout', PROBE_DEFAULT_TIMEOUT))
if probe_type in ['http', 'https']:
probe = client.V1Probe(
- failure_threshold = 1,
- initial_delay_seconds = 5,
- period_seconds = period,
- timeout_seconds = timeout,
- http_get = client.V1HTTPGetAction(
- path = hc['endpoint'],
- port = port,
- scheme = probe_type.upper()
- )
+ failure_threshold=1,
+ initial_delay_seconds=5,
+ period_seconds=period,
+ timeout_seconds=timeout,
+ http_get=client.V1HTTPGetAction(
+ path=hc['endpoint'],
+ port=port,
+ scheme=probe_type.upper()
+ )
)
elif probe_type in ['script', 'docker']:
probe = client.V1Probe(
- failure_threshold = 1,
- initial_delay_seconds = 5,
- period_seconds = period,
- timeout_seconds = timeout,
- _exec = client.V1ExecAction(
- command = hc['script'].split( )
- )
+ failure_threshold=1,
+ initial_delay_seconds=5,
+ period_seconds=period,
+ timeout_seconds=timeout,
+ _exec=client.V1ExecAction(
+ command=hc['script'].split()
+ )
)
return probe
+
def _create_resources(resources=None):
if resources is not None:
resources_obj = client.V1ResourceRequirements(
- limits = resources.get("limits"),
- requests = resources.get("requests")
+ limits=resources.get("limits"),
+ requests=resources.get("requests")
)
return resources_obj
else:
return None
+
def _create_container_object(name, image, always_pull, **kwargs):
# Set up environment variables
# Copy any passed in environment variables
env = kwargs.get('env') or {}
env_vars = [client.V1EnvVar(name=k, value=env[k]) for k in env]
# Add POD_IP with the IP address of the pod running the container
- pod_ip = client.V1EnvVarSource(field_ref = client.V1ObjectFieldSelector(field_path="status.podIP"))
- env_vars.append(client.V1EnvVar(name="POD_IP",value_from=pod_ip))
+ pod_ip = client.V1EnvVarSource(field_ref=client.V1ObjectFieldSelector(field_path="status.podIP"))
+ env_vars.append(client.V1EnvVar(name="POD_IP", value_from=pod_ip))
# If a health check is specified, create a readiness/liveness probe
# (For an HTTP-based check, we assume it's at the first container port)
liveness_probe=live_probe
)
+
def _create_deployment_object(component_name,
containers,
init_containers,
replicas,
volumes,
- labels={},
- pull_secrets=[]):
-
+ labels=None,
+ pull_secrets=None):
+ if labels is None:
+ labels = {}
+ if pull_secrets is None:
+ pull_secrets = []
deployment_name = _create_deployment_name(component_name)
# Label the pod with the deployment name, so we can find it easily
- labels.update({"k8sdeployment" : deployment_name})
+ labels.update({"k8sdeployment": deployment_name})
# pull_secrets is a list of the names of the k8s secrets containing docker registry credentials
# See https://kubernetes.io/docs/concepts/containers/images/#specifying-imagepullsecrets-on-a-pod
)
return service
+
def parse_ports(port_list):
- '''
+ """
Parse the port list into a list of container ports (needed to create the container)
and to a set of port mappings to set up k8s services.
- '''
+ """
container_ports = []
port_map = {}
for p in port_list:
volume_mounts = []
for v in volume_list:
vname = str(uuid.uuid4())
- vhost = v['host']['path']
vcontainer = v['container']['bind']
vro = (v['container'].get('mode') == 'ro')
- volumes.append(client.V1Volume(name=vname, host_path=client.V1HostPathVolumeSource(path=vhost)))
+ if 'host' in v:
+ vhost = v['host']['path']
+ volumes.append(client.V1Volume(name=vname, host_path=client.V1HostPathVolumeSource(path=vhost)))
+ if 'config_volume' in v:
+ vconfig_volume = v['config_volume']['name']
+ volumes.append(client.V1Volume(name=vname, config_map=client.V1ConfigMapVolumeSource(default_mode="0644",
+ name=vconfig_volume,
+ optional=True)))
volume_mounts.append(client.V1VolumeMount(name=vname, mount_path=vcontainer, read_only=vro))
return volumes, volume_mounts
+
def _add_elk_logging_sidecar(containers, volumes, volume_mounts, component_name, log_info, filebeat):
if not log_info or not filebeat:
return
volumes.append(
client.V1Volume(name="filebeat-conf", config_map=client.V1ConfigMapVolumeSource(name=filebeat["config_map"])))
sidecar_volume_mounts.append(
- client.V1VolumeMount(name="filebeat-conf", mount_path=filebeat["config_path"], sub_path=filebeat["config_subpath"]))
+ client.V1VolumeMount(name="filebeat-conf", mount_path=filebeat["config_path"],
+ sub_path=filebeat["config_subpath"]))
# Finally create the container for the sidecar
- containers.append(_create_container_object("filebeat", filebeat["image"], False, volume_mounts=sidecar_volume_mounts))
+ containers.append(
+ _create_container_object("filebeat", filebeat["image"], False, volume_mounts=sidecar_volume_mounts))
+
def _add_tls_init_container(ctx, init_containers, volumes, volume_mounts, tls_info, tls_config):
- # Adds an InitContainer to the pod to set up TLS certificate information. For components that act as a
- # server(tls_info["use_tls"] is True), the InitContainer will populate a directory with server and CA certificate
- # materials in various formats. For other components (tls_info["use_tls"] is False, or tls_info is not specified),
- # the InitContainer will populate a directory with CA certificate materials in PEM and JKS formats.
- # In either case, the certificate directory is mounted onto the component container filesystem at the location
- # specified by tls_info["component_cert_dir"], if present, otherwise at the configured default mount point
- # (tls_config["component_cert_dir"]).
+ # Adds an InitContainer to the pod to set up TLS certificate information. For components that act as a server(
+ # tls_info["use_tls"] is True), the InitContainer will populate a directory with server and CA certificate
+ # materials in various formats. For other components (tls_info["use_tls"] is False, or tls_info is not
+ # specified), the InitContainer will populate a directory with CA certificate materials in PEM and JKS formats.
+ # In either case, the certificate directory is mounted onto the component container filesystem at the location
+ # specified by tls_info["component_cert_dir"], if present, otherwise at the configured default mount point (
+ # tls_config["component_cert_dir"]).
docker_image = tls_config["image"]
ctx.logger.info("Creating init container: TLS \n * [" + docker_image + "]")
init_volume_mounts = [client.V1VolumeMount(name="tls-info", mount_path=tls_config["cert_path"])]
# Create the init container
- init_containers.append(_create_container_object("init-tls", docker_image, False, volume_mounts=init_volume_mounts, env=env))
+ init_containers.append(
+ _create_container_object("init-tls", docker_image, False, volume_mounts=init_volume_mounts, env=env))
+
def _add_external_tls_init_container(ctx, init_containers, volumes, external_cert, external_tls_config):
# Adds an InitContainer to the pod which will generate external TLS certificates.
env = {}
output_path = external_cert.get("external_cert_directory")
if not output_path.endswith('/'):
- output_path += '/'
+ output_path += '/'
env["REQUEST_URL"] = external_tls_config.get("request_url")
env["REQUEST_TIMEOUT"] = external_tls_config.get("timeout")
# Create the volumes and volume mounts
sec = client.V1SecretVolumeSource(secret_name=external_tls_config.get("cert_secret_name"))
volumes.append(client.V1Volume(name="tls-volume", secret=sec))
- init_volume_mounts = [client.V1VolumeMount(name="tls-info", mount_path=external_cert.get("external_cert_directory")),
- client.V1VolumeMount(name="tls-volume", mount_path=MOUNT_PATH)]
+ init_volume_mounts = [
+ client.V1VolumeMount(name="tls-info", mount_path=external_cert.get("external_cert_directory")),
+ client.V1VolumeMount(name="tls-volume", mount_path=MOUNT_PATH)]
# Create the init container
- init_containers.append(_create_container_object("cert-service-client", docker_image, False, volume_mounts=init_volume_mounts, env=env))
+ init_containers.append(
+ _create_container_object("cert-service-client", docker_image, False, volume_mounts=init_volume_mounts, env=env))
-def _add_cert_post_processor_init_container(ctx, init_containers, tls_info, tls_config, external_cert, cert_post_processor_config):
+def _add_cert_post_processor_init_container(ctx, init_containers, tls_info, tls_config, external_cert,
+ cert_post_processor_config):
# Adds an InitContainer to the pod to merge TLS and external TLS truststore into single file.
docker_image = cert_post_processor_config["image_tag"]
ctx.logger.info("Creating init container: cert post processor \n * [" + docker_image + "]")
if output_type != 'pem':
ext_truststore_pass = ext_cert_dir + "truststore.pass"
- env = {}
- env["TRUSTSTORES_PATHS"] = tls_cert_file_path + ":" + ext_truststore_path
- env["TRUSTSTORES_PASSWORDS_PATHS"] = tls_cert_file_pass + ":" + ext_truststore_pass
- env["KEYSTORE_SOURCE_PATHS"] = _get_keystore_source_paths(output_type, ext_cert_dir)
- env["KEYSTORE_DESTINATION_PATHS"] = _get_keystore_destination_paths(output_type, tls_cert_dir)
+ env = {"TRUSTSTORES_PATHS": tls_cert_file_path + ":" + ext_truststore_path,
+ "TRUSTSTORES_PASSWORDS_PATHS": tls_cert_file_pass + ":" + ext_truststore_pass,
+ "KEYSTORE_SOURCE_PATHS": _get_keystore_source_paths(output_type, ext_cert_dir),
+ "KEYSTORE_DESTINATION_PATHS": _get_keystore_destination_paths(output_type, tls_cert_dir)}
ctx.logger.info("TRUSTSTORES_PATHS: " + env["TRUSTSTORES_PATHS"])
ctx.logger.info("TRUSTSTORES_PASSWORDS_PATHS: " + env["TRUSTSTORES_PASSWORDS_PATHS"])
init_volume_mounts = [client.V1VolumeMount(name="tls-info", mount_path=tls_cert_dir)]
# Create the init container
- init_containers.append(_create_container_object("cert-post-processor", docker_image, False, volume_mounts=init_volume_mounts, env=env))
+ init_containers.append(
+ _create_container_object("cert-post-processor", docker_image, False, volume_mounts=init_volume_mounts, env=env))
def _get_file_extension(output_type):
'jks': 'jks',
}[output_type]
+
def _get_keystore_source_paths(output_type, ext_cert_dir):
source_paths_template = {
'p12': "{0}keystore.p12:{0}keystore.pass",
}[output_type]
return source_paths_template.format(ext_cert_dir)
+
def _get_keystore_destination_paths(output_type, tls_cert_dir):
destination_paths_template = {
'p12': "{0}cert.p12:{0}p12.pass",
exposed_ports.append(client.V1ServicePort(port=cport, protocol=proto, node_port=hport, name=name))
return service_ports, exposed_ports, exposed_ports_ipv6
+
def _service_exists(location, namespace, component_name):
exists = False
try:
return exists
+
def _patch_deployment(location, namespace, deployment, modify):
'''
Gets the current spec for 'deployment' in 'namespace'
# Patch the deploy with updated spec
client.AppsV1Api().patch_namespaced_deployment(deployment, namespace, spec)
+
def _execute_command_in_pod(location, namespace, pod_name, command):
'''
Execute the command (specified by an argv-style list in the "command" parameter) in
_configure_api(location)
try:
output = stream.stream(client.CoreV1Api().connect_get_namespaced_pod_exec,
- name=pod_name,
- namespace=namespace,
- command=command,
- stdout=True,
- stderr=True,
- stdin=False,
- tty=False)
+ name=pod_name,
+ namespace=namespace,
+ command=command,
+ stdout=True,
+ stderr=True,
+ stdin=False,
+ tty=False)
except client.rest.ApiException as e:
# If the exception indicates the pod wasn't found, it's not a fatal error.
# It existed when we enumerated the pods for the deployment but no longer exists.
else:
raise e
- return {"pod" : pod_name, "output" : output}
+ return {"pod": pod_name, "output": output}
+
def deploy(ctx, namespace, component_name, image, replicas, always_pull, k8sconfig, **kwargs):
- '''
+ """
This will create a k8s Deployment and, if needed, one or two k8s Services.
(We are being opinionated in our use of k8s... this code decides what k8s abstractions and features to use.
We're not exposing k8s to the component developer and the blueprint author.
- path: the full path to the script to be executed in the container for "script" and "docker" types
- k8s_location: name of the Kubernetes location (cluster) where the component is to be deployed
- '''
+ """
deployment_ok = False
cip_service_created = False
deployment_description = {
"namespace": namespace,
- "location" : kwargs.get("k8s_location"),
+ "location": kwargs.get("k8s_location"),
"deployment": '',
"services": []
}
init_containers = []
# Set up the ELK logging sidecar container, if needed
- _add_elk_logging_sidecar(containers, volumes, volume_mounts, component_name, kwargs.get("log_info"), k8sconfig.get("filebeat"))
+ _add_elk_logging_sidecar(containers, volumes, volume_mounts, component_name, kwargs.get("log_info"),
+ k8sconfig.get("filebeat"))
# Set up TLS information
- _add_tls_init_container(ctx, init_containers, volumes, volume_mounts, kwargs.get("tls_info") or {}, k8sconfig.get("tls"))
+ _add_tls_init_container(ctx, init_containers, volumes, volume_mounts, kwargs.get("tls_info") or {},
+ k8sconfig.get("tls"))
# Set up external TLS information
external_cert = kwargs.get("external_cert")
if external_cert and external_cert.get("use_external_tls"):
- _add_external_tls_init_container(ctx, init_containers, volumes, external_cert, k8sconfig.get("external_cert"))
- _add_cert_post_processor_init_container(ctx, init_containers, kwargs.get("tls_info") or {}, k8sconfig.get("tls"), external_cert, k8sconfig.get("cert_post_processor"))
+ _add_external_tls_init_container(ctx, init_containers, volumes, external_cert,
+ k8sconfig.get("external_cert"))
+ _add_cert_post_processor_init_container(ctx, init_containers, kwargs.get("tls_info") or {},
+ k8sconfig.get("tls"), external_cert,
+ k8sconfig.get("cert_post_processor"))
# Create the container for the component
# Make it the first container in the pod
# Build the k8s Deployment object
labels = kwargs.get("labels", {})
labels["app"] = component_name
- dep = _create_deployment_object(component_name, containers, init_containers, replicas, volumes, labels, pull_secrets=k8sconfig["image_pull_secrets"])
+ dep = _create_deployment_object(component_name, containers, init_containers, replicas, volumes, labels,
+ pull_secrets=k8sconfig["image_pull_secrets"])
# Have k8s deploy it
k8s_apps_v1_api_client.create_namespaced_deployment(namespace, dep)
core.delete_namespaced_service(_create_service_name(component_name), namespace)
# If the deployment was created but not the service, delete the deployment
if deployment_ok:
- client.AppsV1Api().delete_namespaced_deployment(_create_deployment_name(component_name), namespace, body=client.V1DeleteOptions())
+ client.AppsV1Api().delete_namespaced_deployment(_create_deployment_name(component_name), namespace,
+ body=client.V1DeleteOptions())
raise e
return dep, deployment_description
+
def undeploy(deployment_description):
_configure_api(deployment_description["location"])
options = client.V1DeleteOptions(propagation_policy="Foreground")
client.AppsV1Api().delete_namespaced_deployment(deployment_description["deployment"], namespace, body=options)
+
def is_available(location, namespace, component_name):
_configure_api(location)
- dep_status = client.AppsV1Api().read_namespaced_deployment_status(_create_deployment_name(component_name), namespace)
- # Check if the number of available replicas is equal to the number requested and that the replicas match the current spec
- # This check can be used to verify completion of an initial deployment, a scale operation, or an update operation
+ dep_status = client.AppsV1Api().read_namespaced_deployment_status(_create_deployment_name(component_name),
+ namespace)
+ # Check if the number of available replicas is equal to the number requested and that the replicas match the
+ # current spec This check can be used to verify completion of an initial deployment, a scale operation,
+ # or an update operation
return dep_status.status.available_replicas == dep_status.spec.replicas and dep_status.status.updated_replicas == dep_status.spec.replicas
+
def scale(deployment_description, replicas):
- ''' Trigger a scaling operation by updating the replica count for the Deployment '''
+ """ Trigger a scaling operation by updating the replica count for the Deployment """
def update_replica_count(spec):
spec.spec.replicas = replicas
return spec
- _patch_deployment(deployment_description["location"], deployment_description["namespace"], deployment_description["deployment"], update_replica_count)
+ _patch_deployment(deployment_description["location"], deployment_description["namespace"],
+ deployment_description["deployment"], update_replica_count)
-def upgrade(deployment_description, image, container_index = 0):
- ''' Trigger a rolling upgrade by sending a new image name/tag to k8s '''
+
+def upgrade(deployment_description, image, container_index=0):
+ """ Trigger a rolling upgrade by sending a new image name/tag to k8s """
def update_image(spec):
spec.spec.template.spec.containers[container_index].image = image
return spec
- _patch_deployment(deployment_description["location"], deployment_description["namespace"], deployment_description["deployment"], update_image)
+ _patch_deployment(deployment_description["location"], deployment_description["namespace"],
+ deployment_description["deployment"], update_image)
+
def rollback(deployment_description, rollback_to=0):
- '''
+ """
Undo upgrade by rolling back to a previous revision of the deployment.
By default, go back one revision.
rollback_to can be used to supply a specific revision number.
Returns the image for the app container and the replica count from the rolled-back deployment
- '''
+ """
'''
2018-07-13
Currently this does not work due to a bug in the create_namespaced_deployment_rollback() method.
client.ExtensionsV1beta1Api().create_namespaced_deployment_rollback(
deployment,
namespace,
- client.AppsV1beta1DeploymentRollback(name=deployment, rollback_to=client.AppsV1beta1RollbackConfig(revision=rollback_to)))
+ client.AppsV1beta1DeploymentRollback(name=deployment,
+ rollback_to=client.AppsV1beta1RollbackConfig(revision=rollback_to)))
# Read back the spec for the rolled-back deployment
spec = client.AppsV1Api().read_namespaced_deployment(deployment, namespace)
return spec.spec.template.spec.containers[0].image, spec.spec.replicas
+
def execute_command_in_deployment(deployment_description, command):
- '''
+ """
Enumerates the pods in the k8s deployment identified by "deployment_description",
then executes the command (represented as an argv-style list) in "command" in
container 0 (the main application container) each of those pods.
particular k8s deployment. The deployment code above sets a label ("k8sdeployment") on
the pod that has the k8s deployment name. To list the pods, the code below queries for
pods with the label carrying the deployment name.
- '''
+ """
location = deployment_description["location"]
_configure_api(location)
deployment = deployment_description["deployment"]
# Get names of all the running pods belonging to the deployment
pod_names = [pod.metadata.name for pod in client.CoreV1Api().list_namespaced_pod(
- namespace = namespace,
- label_selector = "k8sdeployment={0}".format(deployment),
- field_selector = "status.phase=Running"
+ namespace=namespace,
+ label_selector="k8sdeployment={0}".format(deployment),
+ field_selector="status.phase=Running"
).items]
# Execute command in the running pods