1 # ============LICENSE_START=======================================================
3 # ================================================================================
4 # Copyright (c) 2019 AT&T Intellectual Property. All rights reserved.
5 # ================================================================================
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
17 # ============LICENSE_END=========================================================
19 # ECOMP is a trademark and service mark of AT&T Intellectual Property.
import os
import re
import uuid

from kubernetes import config, client, stream
# Default values for readiness probe
# (used when the health check dict omits "interval"/"timeout"; both in seconds)
PROBE_DEFAULT_PERIOD = 15
PROBE_DEFAULT_TIMEOUT = 1

# Regular expression for interval/timeout specification
# group 1: decimal count; group 2: optional unit (s=seconds, m=minutes, h=hours)
INTERVAL_SPEC = re.compile("^([0-9]+)(s|m|h)?$")
# Conversion factors to seconds, keyed by the unit captured by INTERVAL_SPEC
# (None means no unit was given, interpreted as seconds)
FACTORS = {None: 1, "s": 1, "m": 60, "h": 3600}

# Regular expression for port mapping
# group 1: container port
# group 2: / + protocol
# group 3: protocol (tcp/udp, either case)
# group 4: host port
PORTS = re.compile("^([0-9]+)(/(udp|UDP|tcp|TCP))?:([0-9]+)$")
42 def _create_deployment_name(component_name):
43 return "dep-{0}".format(component_name)
45 def _create_service_name(component_name):
46 return "{0}".format(component_name)
48 def _create_exposed_service_name(component_name):
49 return ("x{0}".format(component_name))[:63]
# Configure the kubernetes client library at import time.
# Look for a kubernetes config file in ~/.kube/config
kubepath = os.path.join(os.environ["HOME"], '.kube/config')
if os.path.exists(kubepath):
    config.load_kube_config(kubepath)
else:
    # Maybe we're running in a k8s container and we can use info provided by k8s
    # We would like to use:
    # config.load_incluster_config()
    # but this looks into os.environ for kubernetes host and port, and from
    # the plugin those aren't visible. So we use the InClusterConfigLoader class,
    # where we can set the environment to what we like.
    # This is probably brittle! Maybe there's a better alternative.
    localenv = {
        config.incluster_config.SERVICE_HOST_ENV_NAME : "kubernetes.default.svc.cluster.local",
        config.incluster_config.SERVICE_PORT_ENV_NAME : "443"
    }
    config.incluster_config.InClusterConfigLoader(
        token_filename=config.incluster_config.SERVICE_TOKEN_FILENAME,
        cert_filename=config.incluster_config.SERVICE_CERT_FILENAME,
        environ=localenv
    ).load_and_set()
def _parse_interval(t):
    '''
    Parse an interval specification and return the interval in seconds.

    The specification can be:
    - a simple integer quantity, interpreted as seconds
    - a string representation of a decimal integer, interpreted as seconds
    - a string consisting of a represention of an decimal integer followed by a unit,
      with "s" representing seconds, "m" representing minutes,
      and "h" representing hours
    Used for compatibility with the Docker plugin, where time intervals
    for health checks were specified as strings with a number and a unit.
    See 'INTERVAL_SPEC' above for the regular expression that's accepted.

    Raises ValueError if the specification doesn't match that form.
    '''
    m = INTERVAL_SPEC.match(str(t))
    if m:
        # group(1) is the count, group(2) the optional unit (None => seconds)
        return int(m.group(1)) * FACTORS[m.group(2)]
    else:
        raise ValueError("Bad interval specification: {0}".format(t))
def _create_probe(hc, port, use_tls=False):
    ''' Create a Kubernetes probe based on info in the health check dictionary hc.

    hc: dict with "type" ("http"/"https"/"script"/"docker") and, depending on
        the type, "endpoint" (URL path) or "script" (command to exec),
        plus optional "interval" and "timeout" interval specs.
    port: container port to probe for http(s) checks.
    use_tls: accepted for interface compatibility; not consulted here --
        the scheme comes from the probe type itself.

    Returns a client.V1Probe, or None if the type is unrecognized.
    '''
    probe_type = hc['type']
    probe = None
    period = _parse_interval(hc.get('interval', PROBE_DEFAULT_PERIOD))
    timeout = _parse_interval(hc.get('timeout', PROBE_DEFAULT_TIMEOUT))
    if probe_type in ['http', 'https']:
        probe = client.V1Probe(
            failure_threshold = 1,
            initial_delay_seconds = 5,
            period_seconds = period,
            timeout_seconds = timeout,
            http_get = client.V1HTTPGetAction(
                path = hc['endpoint'],
                port = port,
                scheme = probe_type.upper()
            )
        )
    elif probe_type in ['script', 'docker']:
        probe = client.V1Probe(
            failure_threshold = 1,
            initial_delay_seconds = 5,
            period_seconds = period,
            timeout_seconds = timeout,
            # "_exec" because "exec" is a reserved word in the client model
            _exec = client.V1ExecAction(
                command = [hc['script']]
            )
        )
    return probe
def _create_resources(resources=None):
    ''' Build a k8s resource-requirements object.

    resources: optional dict with "limits" and/or "requests" keys.
    Returns a client.V1ResourceRequirements, or None when no resources
    were specified.
    '''
    if resources is not None:
        return client.V1ResourceRequirements(
            limits = resources.get("limits"),
            requests = resources.get("requests")
        )
    return None
def _create_container_object(name, image, always_pull, use_tls=False, env=None, container_ports=None, volume_mounts=None, resources=None, readiness=None):
    ''' Build a k8s V1Container for a pod.

    name: container name.
    image: Docker image reference.
    always_pull: if True, set pull policy "Always", else "IfNotPresent".
    use_tls: passed through to the readiness probe builder.
    env: dict of environment variable name/value pairs.
    container_ports: list of (port, protocol) tuples to expose.
    volume_mounts: list of client.V1VolumeMount objects.
    resources: optional resources dict (see _create_resources).
    readiness: optional health check dict (see _create_probe).
    '''
    # Avoid the shared-mutable-default pitfall the original {} / [] defaults had.
    env = {} if env is None else env
    container_ports = [] if container_ports is None else container_ports
    volume_mounts = [] if volume_mounts is None else volume_mounts

    # Set up environment variables
    # Copy any passed in environment variables
    env_vars = [client.V1EnvVar(name=k, value=env[k]) for k in env.keys()]
    # Add POD_IP with the IP address of the pod running the container
    pod_ip = client.V1EnvVarSource(field_ref = client.V1ObjectFieldSelector(field_path="status.podIP"))
    env_vars.append(client.V1EnvVar(name="POD_IP", value_from=pod_ip))

    # If a health check is specified, create a readiness probe
    # (For an HTTP-based check, we assume it's at the first container port)
    probe = None
    if readiness:
        hc_port = None
        if len(container_ports) > 0:
            (hc_port, proto) = container_ports[0]
        probe = _create_probe(readiness, hc_port, use_tls)

    resources_obj = _create_resources(resources)

    # Define container for pod
    return client.V1Container(
        name=name,
        image=image,
        image_pull_policy='Always' if always_pull else 'IfNotPresent',
        env=env_vars,
        ports=[client.V1ContainerPort(container_port=p, protocol=proto) for (p, proto) in container_ports],
        volume_mounts = volume_mounts,
        resources = resources_obj,
        readiness_probe = probe
    )
def _create_deployment_object(component_name,
                              containers,
                              init_containers,
                              replicas,
                              volumes,
                              labels=None,
                              pull_secrets=None):
    ''' Build (but do not submit) the k8s Deployment object for a component.

    component_name: component name, used to derive the deployment name.
    containers: list of V1Container objects for the pod.
    init_containers: list of V1Container objects run as init containers.
    replicas: desired replica count.
    volumes: list of V1Volume objects for the pod.
    labels: labels to attach to the pods (mutated: "k8sdeployment" is added --
        callers that reuse the dict afterward see the extra label).
    pull_secrets: names of k8s secrets holding docker registry credentials.
    '''
    labels = {} if labels is None else labels
    pull_secrets = [] if pull_secrets is None else pull_secrets

    deployment_name = _create_deployment_name(component_name)

    # Label the pod with the deployment name, so we can find it easily
    labels.update({"k8sdeployment" : deployment_name})

    # pull_secrets is a list of the names of the k8s secrets containing docker registry credentials
    # See https://kubernetes.io/docs/concepts/containers/images/#specifying-imagepullsecrets-on-a-pod
    ips = [client.V1LocalObjectReference(name=secret) for secret in pull_secrets]

    # Define pod template
    template = client.V1PodTemplateSpec(
        metadata=client.V1ObjectMeta(labels=labels),
        spec=client.V1PodSpec(hostname=component_name,
                              containers=containers,
                              init_containers=init_containers,
                              volumes=volumes,
                              image_pull_secrets=ips)
    )

    # Define deployment spec
    spec = client.ExtensionsV1beta1DeploymentSpec(
        replicas=replicas,
        template=template
    )

    # Create deployment object
    deployment = client.ExtensionsV1beta1Deployment(
        kind="Deployment",
        metadata=client.V1ObjectMeta(name=deployment_name),
        spec=spec
    )
    return deployment
def _create_service_object(service_name, component_name, service_ports, annotations, labels, service_type):
    ''' Build (but do not submit) a k8s Service object.

    service_name: name for the service object.
    component_name: component name; used as the "app" selector so the service
        targets the component's pods.
    service_ports: list of client.V1ServicePort objects.
    annotations: annotations dict (e.g., for MSB), or falsy for none.
    labels: labels to attach to the service.
    service_type: k8s service type ("ClusterIP" or "NodePort").
    '''
    service_spec = client.V1ServiceSpec(
        ports=service_ports,
        selector={"app" : component_name},
        type=service_type
    )
    if annotations:
        metadata = client.V1ObjectMeta(name=_create_service_name(service_name), labels=labels, annotations=annotations)
    else:
        metadata = client.V1ObjectMeta(name=_create_service_name(service_name), labels=labels)
    service = client.V1Service(
        metadata=metadata,
        spec=service_spec
    )
    return service
def _parse_ports(port_list):
    '''
    Parse the port list into a list of container ports (needed to create the container)
    and to a set of port mappings to set up k8s services.

    port_list: list of "container_port[/protocol]:host_port" strings
        (see the PORTS regular expression above).
    Returns (container_ports, port_map) where container_ports is a list of
    (port, protocol) tuples and port_map maps (port, protocol) -> host port.
    Raises ValueError for any entry that doesn't match the expected form.
    '''
    container_ports = []
    port_map = {}
    for p in port_list:
        m = PORTS.match(p.strip())
        if m:
            cport = int(m.group(1))
            hport = int(m.group(4))
            # Protocol defaults to TCP when no "/proto" suffix is given
            proto = (m.group(3)).upper() if m.group(3) else "TCP"
            container_ports.append((cport, proto))
            port_map[(cport, proto)] = hport
        else:
            raise ValueError("Bad port specification: {0}".format(p))
    return container_ports, port_map
def _parse_volumes(volume_list):
    ''' Translate Docker-style volume specs into k8s volumes and volume mounts.

    volume_list: list of dicts shaped like
        {"host": {"path": ...}, "container": {"bind": ..., "mode": "rw"/"ro"}}.
    Returns (volumes, volume_mounts) as lists of client.V1Volume and
    client.V1VolumeMount, paired by a generated unique name.
    '''
    volumes = []
    volume_mounts = []
    for v in volume_list:
        # Random name ties each volume to its mount
        vname = str(uuid.uuid4())
        vhost = v['host']['path']
        vcontainer = v['container']['bind']
        vro = (v['container']['mode'] == 'ro')
        volumes.append(client.V1Volume(name=vname, host_path=client.V1HostPathVolumeSource(path=vhost)))
        volume_mounts.append(client.V1VolumeMount(name=vname, mount_path=vcontainer, read_only=vro))
    return volumes, volume_mounts
def _service_exists(namespace, component_name):
    ''' Return True if the component's ClusterIP service exists in the namespace.

    Best-effort check: any ApiException from the read (including errors other
    than 404) is treated as "service does not exist".
    '''
    exists = False
    try:
        # A successful read means the service exists; a missing service raises
        client.CoreV1Api().read_namespaced_service(_create_service_name(component_name), namespace)
        exists = True
    except client.rest.ApiException:
        pass
    return exists
def _patch_deployment(namespace, deployment, modify):
    '''
    Gets the current spec for 'deployment' in 'namespace',
    uses the 'modify' function to change the spec,
    then sends the updated spec to k8s.

    modify: callable taking the deployment object and mutating it in place.
    '''
    # Get deployment spec
    spec = client.ExtensionsV1beta1Api().read_namespaced_deployment(deployment, namespace)

    # Apply changes to spec
    modify(spec)

    # Patch the deploy with updated spec
    client.ExtensionsV1beta1Api().patch_namespaced_deployment(deployment, namespace, spec)
def _execute_command_in_pod(namespace, pod_name, command):
    '''
    Execute the command (specified by an argv-style list in the "command" parameter) in
    the specified pod in the specified namespace. For now at least, we use this only to
    run a notification script in a pod after a configuration change.

    The straightforward way to do this is with the V1 Core API function "connect_get_namespaced_pod_exec".
    Unfortunately due to a bug/limitation in the Python client library, we can't call it directly.
    We have to make the API call through a Websocket connection using the kubernetes.stream wrapper API.
    I'm following the example code at https://github.com/kubernetes-client/python/blob/master/examples/exec.py.
    There are several issues tracking this, in various states. It isn't clear that there will ever be a fix.
        - https://github.com/kubernetes-client/python/issues/58
        - https://github.com/kubernetes-client/python/issues/409
        - https://github.com/kubernetes-client/python/issues/526

    The main consequence of the workaround using "stream" is that the caller does not get an indication
    of the exit code returned by the command when it completes execution. It turns out that the
    original implementation of notification in the Docker plugin did not use this result, so we can
    still match the original notification functionality.

    The "stream" approach returns a string containing any output sent by the command to stdout or stderr.
    We'll return that so it can be logged.

    Returns {"pod": pod_name, "output": <command output or "Pod not found">}.
    '''
    try:
        output = stream.stream(client.CoreV1Api().connect_get_namespaced_pod_exec,
                               name=pod_name,
                               namespace=namespace,
                               command=command,
                               stdin=False,
                               stdout=True,
                               stderr=True,
                               tty=False)
    except client.rest.ApiException as e:
        # If the exception indicates the pod wasn't found, it's not a fatal error.
        # It existed when we enumerated the pods for the deployment but no longer exists.
        # Unfortunately, the only way to distinguish a pod not found from any other error
        # is by looking at the reason text.
        # (The ApiException's "status" field should contain the HTTP status code, which would
        # be 404 if the pod isn't found, but empirical testing reveals that "status" is set
        # to other values.)
        if "404 not found" in e.reason.lower():
            output = "Pod not found"
        else:
            raise e

    return {"pod" : pod_name, "output" : output}
def deploy(namespace, component_name, image, replicas, always_pull, k8sconfig, resources, **kwargs):
    '''
    Deploy a component onto Kubernetes.

    This will create a k8s Deployment and, if needed, one or two k8s Services.
    (We are being opinionated in our use of k8s... this code decides what k8s abstractions and features to use.
    We're not exposing k8s to the component developer and the blueprint author.
    This is a conscious choice. We want to use k8s in a controlled, consistent way, and we want to hide
    the details from the component developer and the blueprint author.)

    namespace: the Kubernetes namespace into which the component is deployed
    component_name: the component name, used to derive names of Kubernetes entities
    image: the docker image for the component being deployed
    replicas: the number of instances of the component to be deployed
    always_pull: boolean flag, indicating that Kubernetes should always pull a new copy of
       the Docker image for the component, even if it is already present on the Kubernetes node.
    k8sconfig: dict of plugin configuration ("filebeat", "tls", "image_pull_secrets", ...)
    resources: dict with optional "limits"/"requests" resource requirements

    kwargs -- all optional:
    - volumes: array of volume objects:
      {"host":{"path": "/path/on/host"}, "container":{"bind":"/path/on/container","mode":"rw_or_ro"}}
    - ports: array of strings in the form "container_port:host_port"
    - env: map of name-value pairs ( {name0: value0, name1: value1...} )
    - msb_list: array of msb objects, where an msb object is as described in msb/msb.py
    - log_info: {"log_directory": "...", "alternate_fb_path": "..."} for ELK sidecar setup
    - tls_info: {"use_tls": true, "cert_directory": "..."} for the TLS init container
    - labels: dict of label-name/label-value pairs set on all pods deployed here
    - readiness: health check dict (type/interval/timeout/endpoint/path) for a readiness probe

    Returns (deployment_object, deployment_description) where deployment_description
    records the namespace, deployment name, and service names created, so that
    undeploy()/scale()/etc. can operate on them later.
    On failure, anything already created is deleted and the exception is re-raised.
    '''
    deployment_ok = False
    cip_service_created = False
    deployment_description = {
        "namespace": namespace,
        "deployment": '',
        "services": []
    }

    try:
        # Get API handles
        core = client.CoreV1Api()
        ext = client.ExtensionsV1beta1Api()

        # Parse the port mapping
        container_ports, port_map = _parse_ports(kwargs.get("ports", []))

        # Parse the volumes list into volumes and volume_mounts for the deployment
        volumes, volume_mounts = _parse_volumes(kwargs.get("volumes", []))

        # Initialize the lists of containers that will be part of the pod
        containers = []
        init_containers = []

        # Set up the ELK logging sidecar container, if needed
        log_info = kwargs.get("log_info")
        if log_info and "log_directory" in log_info:
            log_dir = log_info["log_directory"]
            fb = k8sconfig["filebeat"]
            sidecar_volume_mounts = []

            # Create the volume for component log files and volume mounts for the component and sidecar containers
            volumes.append(client.V1Volume(name="component-log", empty_dir=client.V1EmptyDirVolumeSource()))
            volume_mounts.append(client.V1VolumeMount(name="component-log", mount_path=log_dir))
            sc_path = log_info["alternate_fb_path"] if "alternate_fb_path" in log_info \
                else "{0}/{1}".format(fb["log_path"], component_name)
            sidecar_volume_mounts.append(client.V1VolumeMount(name="component-log", mount_path=sc_path))

            # Create the volume for sidecar data and the volume mount for it
            volumes.append(client.V1Volume(name="filebeat-data", empty_dir=client.V1EmptyDirVolumeSource()))
            sidecar_volume_mounts.append(client.V1VolumeMount(name="filebeat-data", mount_path=fb["data_path"]))

            # Create the container for the sidecar
            containers.append(_create_container_object("filebeat", fb["image"], False, False, {}, [], sidecar_volume_mounts))

            # Create the volume for the sidecar configuration data and the volume mount for it
            # The configuration data is in a k8s ConfigMap that should be created when DCAE is installed.
            # (Appending to sidecar_volume_mounts after creating the container works because
            # the container holds a reference to the same list.)
            volumes.append(
                client.V1Volume(name="filebeat-conf", config_map=client.V1ConfigMapVolumeSource(name=fb["config_map"])))
            sidecar_volume_mounts.append(
                client.V1VolumeMount(name="filebeat-conf", mount_path=fb["config_path"], sub_path=fb["config_subpath"]))

        # Set up the TLS init container, if needed
        tls_info = kwargs.get("tls_info")
        use_tls = False
        if tls_info and "use_tls" in tls_info and tls_info["use_tls"]:
            if "cert_directory" in tls_info and len(tls_info["cert_directory"]) > 0:
                use_tls = True
                tls_config = k8sconfig["tls"]

                # Create the certificate volume and volume mounts
                volumes.append(client.V1Volume(name="tls-info", empty_dir=client.V1EmptyDirVolumeSource()))
                volume_mounts.append(client.V1VolumeMount(name="tls-info", mount_path=tls_info["cert_directory"]))
                init_volume_mounts = [client.V1VolumeMount(name="tls-info", mount_path=tls_config["cert_path"])]

                # Create the init container
                init_containers.append(_create_container_object("init-tls", tls_config["image"], False, False, {}, [], init_volume_mounts))

        # Create the container for the component
        # Make it the first container in the pod
        containers.insert(0, _create_container_object(component_name, image, always_pull, use_tls, kwargs.get("env", {}), container_ports, volume_mounts, resources, kwargs.get("readiness")))

        # Build the k8s Deployment object
        labels = kwargs.get("labels", {})
        labels.update({"app": component_name})
        dep = _create_deployment_object(component_name, containers, init_containers, replicas, volumes, labels, pull_secrets=k8sconfig["image_pull_secrets"])

        # Have k8s deploy it
        ext.create_namespaced_deployment(namespace, dep)
        deployment_ok = True
        deployment_description["deployment"] = _create_deployment_name(component_name)

        # Create service(s), if a port mapping is specified
        if len(port_map) > 0:
            service_ports = []   # Ports exposed internally on the k8s network
            exposed_ports = []   # Ports to be mapped to ports on the k8s nodes via NodePort
            # .items() (not the py2-only iteritems()) keeps this py2/py3 compatible
            for (cport, proto), hport in port_map.items():
                service_ports.append(client.V1ServicePort(port=int(cport), protocol=proto, name="port-{0}-{1}".format(proto[0].lower(), cport)))
                # A host port of 0 means "don't expose on the node"
                if int(hport) != 0:
                    exposed_ports.append(client.V1ServicePort(port=int(cport), protocol=proto, node_port=int(hport), name="xport-{0}-{1}".format(proto[0].lower(), cport)))

            # If there are ports to be exposed via MSB, set up the annotation for the service
            msb_list = kwargs.get("msb_list")
            annotations = msb.create_msb_annotation(msb_list) if msb_list else ''

            # Create a ClusterIP service for access via the k8s network
            service = _create_service_object(_create_service_name(component_name), component_name, service_ports, annotations, labels, "ClusterIP")
            core.create_namespaced_service(namespace, service)
            cip_service_created = True
            deployment_description["services"].append(_create_service_name(component_name))

            # If there are ports to be exposed on the k8s nodes, create a "NodePort" service
            if len(exposed_ports) > 0:
                exposed_service = \
                    _create_service_object(_create_exposed_service_name(component_name), component_name, exposed_ports, '', labels, "NodePort")
                core.create_namespaced_service(namespace, exposed_service)
                deployment_description["services"].append(_create_exposed_service_name(component_name))

    except Exception as e:
        # Undo whatever was created before the failure, then re-raise
        # If the ClusterIP service was created, delete the service:
        if cip_service_created:
            core.delete_namespaced_service(_create_service_name(component_name), namespace)
        # If the deployment was created but not the service, delete the deployment
        if deployment_ok:
            client.ExtensionsV1beta1Api().delete_namespaced_deployment(_create_deployment_name(component_name), namespace, client.V1DeleteOptions())
        raise e

    return dep, deployment_description
def undeploy(deployment_description):
    ''' Remove the Deployment and Services recorded in 'deployment_description'
    (the dict returned by deploy()).
    '''
    namespace = deployment_description["namespace"]

    # remove any services associated with the component
    for service in deployment_description["services"]:
        client.CoreV1Api().delete_namespaced_service(service, namespace)

    # Have k8s delete the underlying pods and replicaset when deleting the deployment.
    options = client.V1DeleteOptions(propagation_policy="Foreground")
    client.ExtensionsV1beta1Api().delete_namespaced_deployment(deployment_description["deployment"], namespace, options)
def is_available(namespace, component_name):
    ''' Check whether a component's deployment has reached its requested state.

    True when the number of available replicas equals the number requested
    and all replicas match the current spec -- usable to verify completion
    of an initial deployment, a scale operation, or an update operation.
    '''
    dep = client.AppsV1beta1Api().read_namespaced_deployment_status(_create_deployment_name(component_name), namespace)
    desired = dep.spec.replicas
    return desired == dep.status.available_replicas and desired == dep.status.updated_replicas
def scale(deployment_description, replicas):
    ''' Trigger a scaling operation by updating the replica count for the Deployment '''

    def _set_replicas(spec):
        # Modifier handed to _patch_deployment: overwrite the replica count
        spec.spec.replicas = replicas

    _patch_deployment(deployment_description["namespace"], deployment_description["deployment"], _set_replicas)
def upgrade(deployment_description, image, container_index = 0):
    ''' Trigger a rolling upgrade by sending a new image name/tag to k8s '''

    def _set_image(spec):
        # Modifier handed to _patch_deployment: repoint the target container's image
        spec.spec.template.spec.containers[container_index].image = image

    _patch_deployment(deployment_description["namespace"], deployment_description["deployment"], _set_image)
def rollback(deployment_description, rollback_to=0):
    '''
    Undo upgrade by rolling back to a previous revision of the deployment.
    By default, go back one revision.
    rollback_to can be used to supply a specific revision number.
    Returns the image for the app container and the replica count from the rolled-back deployment.

    NOTE(review): at the time this was written, this did not work due to a bug in the
    create_namespaced_deployment_rollback() method -- the k8s python client code throws
    an exception while processing the response from the API. See:
        - https://github.com/kubernetes-client/python/issues/491
        - https://github.com/kubernetes/kubernetes/pull/63837
    The fix has been merged into the master branch but is not in the latest release.
    '''
    deployment = deployment_description["deployment"]
    namespace = deployment_description["namespace"]

    # Initiate the rollback
    client.ExtensionsV1beta1Api().create_namespaced_deployment_rollback(
        deployment,
        namespace,
        client.AppsV1beta1DeploymentRollback(name=deployment, rollback_to=client.AppsV1beta1RollbackConfig(revision=rollback_to)))

    # Read back the spec for the rolled-back deployment
    spec = client.ExtensionsV1beta1Api().read_namespaced_deployment(deployment, namespace)
    return spec.spec.template.spec.containers[0].image, spec.spec.replicas
def execute_command_in_deployment(deployment_description, command):
    '''
    Enumerates the pods in the k8s deployment identified by "deployment_description",
    then executes the command (represented as an argv-style list) in "command" in
    container 0 (the main application container) of each of those pods.

    Note that the set of pods associated with a deployment can change over time. The
    enumeration is a snapshot at one point in time. The command will not be executed in
    pods that are created after the initial enumeration. If a pod disappears after the
    initial enumeration and before the command is executed, the attempt to execute the
    command will fail. This is not treated as a fatal error.

    This approach is reasonable for the one current use case for "execute_command": running a
    script to notify a container that its configuration has changed as a result of a
    policy change. In this use case, the new configuration information is stored into
    the configuration store (Consul), the pods are enumerated, and the command is executed.
    If a pod disappears after the enumeration, the fact that the command cannot be run
    doesn't matter--a nonexistent pod doesn't need to be reconfigured. Similarly, a pod that
    comes up after the enumeration will get its initial configuration from the updated version.

    The optimal solution here would be for k8s to provide an API call to execute a command in
    all of the pods for a deployment. Unfortunately, k8s does not provide such a call--the
    only call provided by k8s operates at the pod level, not the deployment level.

    Another interesting k8s factoid: there's no direct way to list the pods belonging to a
    particular k8s deployment. The deployment code above sets a label ("k8sdeployment") on
    the pod that has the k8s deployment name. To list the pods, the code below queries for
    pods with the label carrying the deployment name.

    Returns a list of {"pod": ..., "output": ...} dicts, one per running pod.
    '''
    deployment = deployment_description["deployment"]
    namespace = deployment_description["namespace"]

    # Get names of all the running pods belonging to the deployment
    pod_names = [pod.metadata.name for pod in client.CoreV1Api().list_namespaced_pod(
        namespace = namespace,
        label_selector = "k8sdeployment={0}".format(deployment),
        field_selector = "status.phase=Running"
    ).items]

    # Execute the command in each running pod.
    # (A list comprehension instead of map(): under Python 3 map() returns a lazy
    # iterator, which would silently defer--or skip--the side-effecting calls.)
    return [_execute_command_in_pod(namespace, name, command) for name in pod_names]