1 # ============LICENSE_START=======================================================
3 # ================================================================================
4 # Copyright (c) 2018 AT&T Intellectual Property. All rights reserved.
5 # ================================================================================
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
17 # ============LICENSE_END=========================================================
19 # ECOMP is a trademark and service mark of AT&T Intellectual Property.
import os
import uuid

from kubernetes import config, client, stream
# Default values for readiness probe
PROBE_DEFAULT_PERIOD = 15   # seconds between successive probes (used as period_seconds)
PROBE_DEFAULT_TIMEOUT = 1   # seconds allowed for a single probe to complete (used as timeout_seconds)
29 def _create_deployment_name(component_name):
30 return "dep-{0}".format(component_name)
32 def _create_service_name(component_name):
33 return "{0}".format(component_name)
35 def _create_exposed_service_name(component_name):
36 return ("x{0}".format(component_name))[:63]
# Configure the kubernetes client library at module load time.
# Look for a kubernetes config file in ~/.kube/config
kubepath = os.path.join(os.environ["HOME"], '.kube/config')
if os.path.exists(kubepath):
    config.load_kube_config(kubepath)
else:
    # Maybe we're running in a k8s container and we can use info provided by k8s
    # We would like to use:
    # config.load_incluster_config()
    # but this looks into os.environ for kubernetes host and port, and from
    # the plugin those aren't visible. So we use the InClusterConfigLoader class,
    # where we can set the environment to what we like.
    # This is probably brittle! Maybe there's a better alternative.
    localenv = {
        config.incluster_config.SERVICE_HOST_ENV_NAME : "kubernetes.default.svc.cluster.local",
        config.incluster_config.SERVICE_PORT_ENV_NAME : "443"
    }
    config.incluster_config.InClusterConfigLoader(
        token_filename=config.incluster_config.SERVICE_TOKEN_FILENAME,
        cert_filename=config.incluster_config.SERVICE_CERT_FILENAME,
        environ=localenv
    ).load_and_set()
def _create_probe(hc, port):
    '''
    Create a Kubernetes probe based on info in the health check dictionary hc.

    hc["type"] selects the probe mechanism:
      - "http"/"https": probe by issuing a GET to hc["endpoint"] on the given port
      - "script"/"docker": probe by exec'ing hc["script"] inside the container
    hc may also supply "interval" (period between probes, seconds) and
    "timeout" (time allowed for a probe, seconds); defaults are used otherwise.

    Returns a V1Probe, or None if hc["type"] is not one of the recognized types.
    '''
    probe_type = hc['type']
    probe = None
    period = hc.get('interval', PROBE_DEFAULT_PERIOD)
    timeout = hc.get('timeout', PROBE_DEFAULT_TIMEOUT)
    if probe_type in ['http', 'https']:
        probe = client.V1Probe(
            failure_threshold = 1,
            initial_delay_seconds = 5,
            period_seconds = period,
            timeout_seconds = timeout,
            http_get = client.V1HTTPGetAction(
                path = hc['endpoint'],
                port = port,
                scheme = probe_type.upper()
            )
        )
    elif probe_type in ['script', 'docker']:
        probe = client.V1Probe(
            failure_threshold = 1,
            initial_delay_seconds = 5,
            period_seconds = period,
            timeout_seconds = timeout,
            _exec = client.V1ExecAction(
                command = [hc['script']]
            )
        )
    return probe
def _create_container_object(name, image, always_pull, env=None, container_ports=None, volume_mounts=None, readiness=None):
    '''
    Build a V1Container object for one container in a pod.

    name: the container name
    image: the Docker image to run
    always_pull: if True, image pull policy is "Always", otherwise "IfNotPresent"
    env: dict of environment variable name/value pairs for the container
    container_ports: list of container ports (ints) to expose
    volume_mounts: list of V1VolumeMount objects for the container
    readiness: health check dict (see _create_probe); if present, a readiness probe is created
    '''
    # None defaults instead of mutable {} / [] defaults, which would be shared
    # across calls and could be accidentally mutated.  Passing explicit {} / []
    # still behaves exactly as before.
    env = env if env is not None else {}
    container_ports = container_ports if container_ports is not None else []
    volume_mounts = volume_mounts if volume_mounts is not None else []

    # Set up environment variables
    # Copy any passed in environment variables
    env_vars = [client.V1EnvVar(name=k, value=env[k]) for k in env.keys()]
    # Add POD_IP with the IP address of the pod running the container
    pod_ip = client.V1EnvVarSource(field_ref = client.V1ObjectFieldSelector(field_path="status.podIP"))
    env_vars.append(client.V1EnvVar(name="POD_IP", value_from=pod_ip))

    # If a health check is specified, create a readiness probe
    # (For an HTTP-based check, we assume it's at the first container port)
    probe = None
    if readiness:
        hc_port = None
        if len(container_ports) > 0:
            hc_port = container_ports[0]
        probe = _create_probe(readiness, hc_port)

    # Define container for pod
    return client.V1Container(
        name=name,
        image=image,
        image_pull_policy='Always' if always_pull else 'IfNotPresent',
        env=env_vars,
        ports=[client.V1ContainerPort(container_port=p) for p in container_ports],
        volume_mounts = volume_mounts,
        readiness_probe = probe
    )
def _create_deployment_object(component_name,
                              containers,
                              replicas,
                              volumes,
                              labels,
                              pull_secrets=None):
    '''
    Build (but do not submit) the k8s Deployment object for a component.

    component_name: used to derive the deployment name
    containers: list of V1Container objects for the pod
    replicas: number of pod replicas to request
    volumes: list of V1Volume objects for the pod
    labels: dict of labels to set on the pods (mutated: the deployment name is added)
    pull_secrets: list of names of k8s secrets containing docker registry credentials
    '''
    # Avoid a mutable [] default; explicit [] from callers behaves the same.
    pull_secrets = pull_secrets if pull_secrets is not None else []

    deployment_name = _create_deployment_name(component_name)

    # Label the pod with the deployment name, so we can find it easily
    labels.update({"k8sdeployment" : deployment_name})

    # pull_secrets is a list of the names of the k8s secrets containing docker registry credentials
    # See https://kubernetes.io/docs/concepts/containers/images/#specifying-imagepullsecrets-on-a-pod
    ips = []
    for secret in pull_secrets:
        ips.append(client.V1LocalObjectReference(name=secret))

    # Define pod template
    template = client.V1PodTemplateSpec(
        metadata=client.V1ObjectMeta(labels=labels),
        spec=client.V1PodSpec(hostname=component_name,
                              containers=containers,
                              volumes=volumes,
                              image_pull_secrets=ips)
    )

    # Define deployment spec
    spec = client.ExtensionsV1beta1DeploymentSpec(
        replicas=replicas,
        template=template
    )

    # Create deployment object
    deployment = client.ExtensionsV1beta1Deployment(
        kind="Deployment",
        metadata=client.V1ObjectMeta(name=deployment_name),
        spec=spec
    )

    return deployment
def _create_service_object(service_name, component_name, service_ports, annotations, labels, service_type):
    '''
    Build (but do not submit) a k8s Service object.

    service_name: name for the service (mapped through _create_service_name)
    component_name: used as the pod selector ("app" label)
    service_ports: list of V1ServicePort objects
    annotations: annotation dict to attach to the service metadata, or a falsy
        value ('' / None) to omit annotations entirely
    labels: labels to attach to the service metadata
    service_type: k8s service type, e.g. "ClusterIP" or "NodePort"
    '''
    service_spec = client.V1ServiceSpec(
        ports=service_ports,
        selector={"app" : component_name},
        type=service_type
    )
    if annotations:
        metadata = client.V1ObjectMeta(name=_create_service_name(service_name), labels=labels, annotations=annotations)
    else:
        metadata = client.V1ObjectMeta(name=_create_service_name(service_name), labels=labels)

    service = client.V1Service(
        kind="Service",
        api_version="v1",
        metadata=metadata,
        spec=service_spec
    )
    return service
181 def _parse_ports(port_list):
186 [container, host] = (p.strip()).split(":",2)
187 cport = int(container)
188 container_ports.append(cport)
190 port_map[container] = hport
192 pass # if something doesn't parse, we just ignore it
194 return container_ports, port_map
def _parse_volumes(volume_list):
    '''
    Convert a list of volume specs of the form
      {"host":{"path": "/path/on/host"}, "container":{"bind":"/path/on/container","mode":"rw_or_ro"}}
    into parallel lists of V1Volume and V1VolumeMount objects.
    Each volume gets a random unique name so volume and mount can be correlated.
    '''
    volumes = []
    volume_mounts = []
    for v in volume_list:
        vname = str(uuid.uuid4())
        vhost = v['host']['path']
        vcontainer = v['container']['bind']
        # Mount read-only exactly when the mode is "ro"
        vro = (v['container']['mode'] == 'ro')
        volumes.append(client.V1Volume(name=vname, host_path=client.V1HostPathVolumeSource(path=vhost)))
        volume_mounts.append(client.V1VolumeMount(name=vname, mount_path=vcontainer, read_only=vro))

    return volumes, volume_mounts
def _service_exists(namespace, component_name):
    '''
    Check whether the ClusterIP service for a component exists in the namespace.
    Returns True if the service can be read, False otherwise.
    '''
    exists = False
    try:
        client.CoreV1Api().read_namespaced_service(_create_service_name(component_name), namespace)
        exists = True
    except client.rest.ApiException:
        # The read raises ApiException when the service isn't found;
        # treat any API error as "does not exist".
        pass

    return exists
def _patch_deployment(namespace, deployment, modify):
    '''
    Gets the current spec for 'deployment' in 'namespace',
    uses the 'modify' function to change the spec,
    then sends the updated spec to k8s.

    modify: a callable taking the deployment object and mutating it in place.
    '''
    # Get deployment spec
    spec = client.ExtensionsV1beta1Api().read_namespaced_deployment(deployment, namespace)

    # Apply changes to spec
    modify(spec)

    # Patch the deploy with updated spec
    client.ExtensionsV1beta1Api().patch_namespaced_deployment(deployment, namespace, spec)
def _execute_command_in_pod(namespace, pod_name, command):
    '''
    Execute the command (specified by an argv-style list in the "command" parameter) in
    the specified pod in the specified namespace.  For now at least, we use this only to
    run a notification script in a pod after a configuration change.

    The straightforward way to do this is with the V1 Core API function "connect_get_namespaced_pod_exec".
    Unfortunately due to a bug/limitation in the Python client library, we can't call it directly.
    We have to make the API call through a Websocket connection using the kubernetes.stream wrapper API.
    I'm following the example code at https://github.com/kubernetes-client/python/blob/master/examples/exec.py.
    There are several issues tracking this, in various states.  It isn't clear that there will ever
    be a fix.
        - https://github.com/kubernetes-client/python/issues/58
        - https://github.com/kubernetes-client/python/issues/409
        - https://github.com/kubernetes-client/python/issues/526

    The main consequence of the workaround using "stream" is that the caller does not get an indication
    of the exit code returned by the command when it completes execution.   It turns out that the
    original implementation of notification in the Docker plugin did not use this result, so we can
    still match the original notification functionality.

    The "stream" approach returns a string containing any output sent by the command to stdout or stderr.
    We'll return that so it can be logged.

    Returns {"pod": pod_name, "output": <command output or "Pod not found">}.
    Re-raises any ApiException other than "pod not found".
    '''
    try:
        output = stream.stream(client.CoreV1Api().connect_get_namespaced_pod_exec,
                               name=pod_name,
                               namespace=namespace,
                               command=command,
                               stdout=True,
                               stderr=True,
                               stdin=False,
                               tty=False)
    except client.rest.ApiException as e:
        # If the exception indicates the pod wasn't found, it's not a fatal error.
        # It existed when we enumerated the pods for the deployment but no longer exists.
        # Unfortunately, the only way to distinguish a pod not found from any other error
        # is by looking at the reason text.
        # (The ApiException's "status" field should contain the HTTP status code, which would
        # be 404 if the pod isn't found, but empirical testing reveals that "status" is set
        # to other values, so we can't rely on it.)
        if "404 not found" in e.reason.lower():
            output = "Pod not found"
        else:
            raise e

    return {"pod" : pod_name, "output" : output}
def deploy(namespace, component_name, image, replicas, always_pull, k8sconfig, **kwargs):
    '''
    This will create a k8s Deployment and, if needed, one or two k8s Services.
    (We are being opinionated in our use of k8s... this code decides what k8s abstractions and features to use.
    We're not exposing k8s to the component developer and the blueprint author.
    This is a conscious choice.  We want to use k8s in a controlled, consistent way, and we want to hide
    the details from the component developer and the blueprint author.)

    namespace: the Kubernetes namespace into which the component is deployed
    component_name: the component name, used to derive names of Kubernetes entities
    image: the docker image for the component being deployed
    replica: the number of instances of the component to be deployed
    always_pull: boolean flag, indicating that Kubernetes should always pull a new copy of
       the Docker image for the component, even if it is already present on the Kubernetes node.
    k8sconfig contains:
        - image_pull_secrets: a list of names of image pull secrets that can be used for retrieving images.
          (DON'T PANIC: these are just the names of secrets held in the Kubernetes secret store.)
        - filebeat: a dictionary of filebeat sidecar parameters:
            "log_path" : mount point for log volume in filebeat container
            "data_path" : mount point for data volume in filebeat container
            "config_path" : mount point for config volume in filebeat container
            "config_subpath" : subpath for config data in filebeat container
            "config_map" : ConfigMap holding the filebeat configuration
            "image": Docker image to use for filebeat
    kwargs may have:
        - volumes: array of volume objects, where a volume object is:
            {"host":{"path": "/path/on/host"}, "container":{"bind":"/path/on/container","mode":"rw_or_ro"}
        - ports: array of strings in the form "container_port:host_port"
        - env: map of name-value pairs ( {name0: value0, name1: value1...}
        - msb_list: array of msb objects, where an msb object is as described in msb/msb.py.
        - log_info: an object with info for setting up ELK logging, with the form:
            {"log_directory": "/path/to/container/log/directory", "alternate_fb_path" : "/alternate/sidecar/log/path"}
        - labels: dict with label-name/label-value pairs, e.g. {"cfydeployment" : "lsdfkladflksdfsjkl", "cfynode":"mycomponent"}
            These label will be set on all the pods deployed as a result of this deploy() invocation.
        - readiness: dict with health check info; if present, used to create a readiness probe for the main container.  Includes:
            - type: check is done by making http(s) request to an endpoint ("http", "https") or by exec'ing a script in the container ("script", "docker")
            - interval: period (in seconds) between probes
            - timeout: time (in seconds) to allow a probe to complete
            - endpoint: the path portion of the URL that points to the readiness endpoint for "http" and "https" types
            - path: the full path to the script to be executed in the container for "script" and "docker" types
    '''

    deployment_ok = False
    cip_service_created = False
    deployment_description = {
        "namespace": namespace,
        "deployment": '',
        "services": []
    }

    try:
        # Get API handles
        core = client.CoreV1Api()
        ext = client.ExtensionsV1beta1Api()

        # Parse the port mapping into [container_port,...] and [{"host_port" : "container_port"},...]
        container_ports, port_map = _parse_ports(kwargs.get("ports", []))

        # Parse the volumes list into volumes and volume_mounts for the deployment
        volumes, volume_mounts = _parse_volumes(kwargs.get("volumes", []))

        # Initialize the list of containers that will be part of the pod
        containers = []

        # Set up the ELK logging sidecar container, if needed
        log_info = kwargs.get("log_info")
        if log_info and "log_directory" in log_info:
            log_dir = log_info["log_directory"]
            fb = k8sconfig["filebeat"]
            sidecar_volume_mounts = []

            # Create the volume for component log files and volume mounts for the component and sidecar containers
            volumes.append(client.V1Volume(name="component-log", empty_dir=client.V1EmptyDirVolumeSource()))
            volume_mounts.append(client.V1VolumeMount(name="component-log", mount_path=log_dir))
            sc_path = log_info["alternate_fb_path"] if "alternate_fb_path" in log_info \
                else "{0}/{1}".format(fb["log_path"], component_name)
            sidecar_volume_mounts.append(client.V1VolumeMount(name="component-log", mount_path=sc_path))

            # Create the volume for sidecar data and the volume mount for it
            volumes.append(client.V1Volume(name="filebeat-data", empty_dir=client.V1EmptyDirVolumeSource()))
            sidecar_volume_mounts.append(client.V1VolumeMount(name="filebeat-data", mount_path=fb["data_path"]))

            # Create the container for the sidecar
            containers.append(_create_container_object("filebeat", fb["image"], False, {}, [], sidecar_volume_mounts))

            # Create the volume for the sidecar configuration data and the volume mount for it
            # The configuration data is in a k8s ConfigMap that should be created when DCAE is installed.
            volumes.append(
                client.V1Volume(name="filebeat-conf", config_map=client.V1ConfigMapVolumeSource(name=fb["config_map"])))
            sidecar_volume_mounts.append(
                client.V1VolumeMount(name="filebeat-conf", mount_path=fb["config_path"], sub_path=fb["config_subpath"]))

        # Create the container for the component
        # Make it the first container in the pod
        containers.insert(0, _create_container_object(component_name, image, always_pull, kwargs.get("env", {}), container_ports, volume_mounts, kwargs["readiness"]))

        # Build the k8s Deployment object
        labels = kwargs.get("labels", {})
        labels.update({"app": component_name})
        dep = _create_deployment_object(component_name, containers, replicas, volumes, labels, pull_secrets=k8sconfig["image_pull_secrets"])

        # Have k8s deploy it
        ext.create_namespaced_deployment(namespace, dep)
        deployment_ok = True
        deployment_description["deployment"] = _create_deployment_name(component_name)

        # Create service(s), if a port mapping is specified
        if port_map:
            service_ports = []      # Ports exposed internally on the k8s network
            exposed_ports = []      # Ports to be mapped to ports on the k8s nodes via NodePort
            # items() instead of Python-2-only iteritems(), so this also runs under Python 3
            for cport, hport in port_map.items():
                service_ports.append(client.V1ServicePort(port=int(cport), name="port-{}".format(cport)))
                if int(hport) != 0:
                    exposed_ports.append(client.V1ServicePort(port=int(cport), node_port=int(hport), name="xport-{}".format(cport)))

            # If there are ports to be exposed via MSB, set up the annotation for the service
            msb_list = kwargs.get("msb_list")
            annotations = msb.create_msb_annotation(msb_list) if msb_list else ''

            # Create a ClusterIP service for access via the k8s network
            service = _create_service_object(_create_service_name(component_name), component_name, service_ports, annotations, labels, "ClusterIP")
            core.create_namespaced_service(namespace, service)
            cip_service_created = True
            deployment_description["services"].append(_create_service_name(component_name))

            # If there are ports to be exposed on the k8s nodes, create a "NodePort" service
            if len(exposed_ports) > 0:
                exposed_service = \
                    _create_service_object(_create_exposed_service_name(component_name), component_name, exposed_ports, '', labels, "NodePort")
                core.create_namespaced_service(namespace, exposed_service)
                deployment_description["services"].append(_create_exposed_service_name(component_name))

    except Exception as e:
        # Clean up any k8s entities created before the failure, then re-raise.
        # If the ClusterIP service was created, delete the service:
        if cip_service_created:
            core.delete_namespaced_service(_create_service_name(component_name), namespace)
        # If the deployment was created but not the service, delete the deployment
        if deployment_ok:
            client.ExtensionsV1beta1Api().delete_namespaced_deployment(_create_deployment_name(component_name), namespace, client.V1DeleteOptions())
        raise e

    return dep, deployment_description
def undeploy(deployment_description):
    '''
    Delete the k8s entities created by deploy(): any services for the component,
    then the Deployment itself (which takes its pods and replicaset with it).
    deployment_description: the dict returned by deploy() ("namespace", "deployment", "services").
    '''
    namespace = deployment_description["namespace"]

    # remove any services associated with the component
    for service in deployment_description["services"]:
        client.CoreV1Api().delete_namespaced_service(service, namespace)

    # Have k8s delete the underlying pods and replicaset when deleting the deployment.
    options = client.V1DeleteOptions(propagation_policy="Foreground")
    client.ExtensionsV1beta1Api().delete_namespaced_deployment(deployment_description["deployment"], namespace, options)
def is_available(namespace, component_name):
    '''
    Report whether a component's deployment is fully available.
    True when the available and up-to-date replica counts both equal the
    requested replica count; usable to verify completion of an initial
    deployment, a scale operation, or an update operation.
    '''
    dep = client.AppsV1beta1Api().read_namespaced_deployment_status(_create_deployment_name(component_name), namespace)
    wanted = dep.spec.replicas
    current = dep.status
    return current.available_replicas == wanted and current.updated_replicas == wanted
def scale(deployment_description, replicas):
    ''' Trigger a scaling operation by updating the replica count for the Deployment '''

    def _set_replica_count(spec):
        # Mutates the fetched deployment spec in place; _patch_deployment sends it back.
        spec.spec.replicas = replicas

    _patch_deployment(deployment_description["namespace"], deployment_description["deployment"], _set_replica_count)
def upgrade(deployment_description, image, container_index = 0):
    ''' Trigger a rolling upgrade by sending a new image name/tag to k8s '''

    def _set_image(spec):
        # Replaces the image of one container (by default the main app container, index 0).
        spec.spec.template.spec.containers[container_index].image = image

    _patch_deployment(deployment_description["namespace"], deployment_description["deployment"], _set_image)
def rollback(deployment_description, rollback_to=0):
    '''
    Undo upgrade by rolling back to a previous revision of the deployment.
    By default, go back one revision.
    rollback_to can be used to supply a specific revision number.
    Returns the image for the app container and the replica count from the rolled-back deployment

    NOTE(review): currently this does not work due to a bug in the create_namespaced_deployment_rollback() method.
    The k8s python client code throws an exception while processing the response from the API.
        - https://github.com/kubernetes-client/python/issues/491
        - https://github.com/kubernetes/kubernetes/pull/63837
    The fix has been merged into the master branch but is not in the latest release.
    '''
    deployment = deployment_description["deployment"]
    namespace = deployment_description["namespace"]

    # Initiate the rollback
    client.ExtensionsV1beta1Api().create_namespaced_deployment_rollback(
        deployment,
        namespace,
        client.AppsV1beta1DeploymentRollback(name=deployment, rollback_to=client.AppsV1beta1RollbackConfig(revision=rollback_to)))

    # Read back the spec for the rolled-back deployment
    spec = client.ExtensionsV1beta1Api().read_namespaced_deployment(deployment, namespace)
    return spec.spec.template.spec.containers[0].image, spec.spec.replicas
def execute_command_in_deployment(deployment_description, command):
    '''
    Enumerates the pods in the k8s deployment identified by "deployment_description",
    then executes the command (represented as an argv-style list) in "command" in
    container 0 (the main application container) of each of those pods.

    Note that the set of pods associated with a deployment can change over time.  The
    enumeration is a snapshot at one point in time.  The command will not be executed in
    pods that are created after the initial enumeration.  If a pod disappears after the
    initial enumeration and before the command is executed, the attempt to execute the
    command will fail.  This is not treated as a fatal error.

    This approach is reasonable for the one current use case for "execute_command": running a
    script to notify a container that its configuration has changed as a result of a
    policy change.  A pod that vanishes doesn't need to be reconfigured, and a pod that
    comes up later gets its initial configuration from the updated configuration store.

    k8s provides no deployment-level exec call and no direct way to list the pods belonging
    to a deployment, so we query for pods carrying the "k8sdeployment" label that deploy()
    sets to the deployment name.

    Returns a list of {"pod": name, "output": output} dicts, one per pod.
    '''
    deployment = deployment_description["deployment"]
    namespace = deployment_description["namespace"]

    # Get names of all the running pods belonging to the deployment
    pod_names = [pod.metadata.name for pod in client.CoreV1Api().list_namespaced_pod(
        namespace = namespace,
        label_selector = "k8sdeployment={0}".format(deployment),
        field_selector = "status.phase=Running"
    ).items]

    # Execute command in the running pods.
    # Use a list comprehension rather than map(): under Python 3, map() returns a
    # lazy iterator, so the commands would only run if the caller happened to
    # consume the result.  The comprehension executes eagerly and returns a list
    # (identical to map()'s behavior under Python 2).
    return [_execute_command_in_pod(namespace, pod_name, command) for pod_name in pod_names]