1 # ============LICENSE_START=======================================================
3 # ================================================================================
4 # Copyright (c) 2018 AT&T Intellectual Property. All rights reserved.
5 # ================================================================================
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
17 # ============LICENSE_END=========================================================
19 # ECOMP is a trademark and service mark of AT&T Intellectual Property.
import os
import uuid

from kubernetes import config, client
# Default values for readiness probe
PROBE_DEFAULT_PERIOD = 15   # seconds between probe attempts
PROBE_DEFAULT_TIMEOUT = 1   # seconds before a single probe attempt is considered failed
29 def _create_deployment_name(component_name):
30 return "dep-{0}".format(component_name)
32 def _create_service_name(component_name):
33 return "{0}".format(component_name)
35 def _create_exposed_service_name(component_name):
36 return ("x{0}".format(component_name))[:63]
# Module-level setup: configure the kubernetes client, preferring a local
# kubeconfig file and falling back to in-cluster configuration.

# Look for a kubernetes config file in ~/.kube/config
kubepath = os.path.join(os.environ["HOME"], '.kube/config')
if os.path.exists(kubepath):
    config.load_kube_config(kubepath)
# Maybe we're running in a k8s container and we can use info provided by k8s
# We would like to use:
# config.load_incluster_config()
# but this looks into os.environ for kubernetes host and port, and from
# the plugin those aren't visible. So we use the InClusterConfigLoader class,
# where we can set the environment to what we like.
# This is probably brittle! Maybe there's a better alternative.
# NOTE(review): the remainder of this block appears truncated in this copy of
# the source -- the two dict entries below are not assigned to any variable
# (presumably an 'else:' branch building an environment dict is missing), and
# the InClusterConfigLoader(...) call is never closed or invoked.
# TODO: confirm against the full source file.
    config.incluster_config.SERVICE_HOST_ENV_NAME : "kubernetes.default.svc.cluster.local",
    config.incluster_config.SERVICE_PORT_ENV_NAME : "443"
config.incluster_config.InClusterConfigLoader(
    token_filename=config.incluster_config.SERVICE_TOKEN_FILENAME,
    cert_filename=config.incluster_config.SERVICE_CERT_FILENAME,
def _create_probe(hc, port):
    ''' Create a Kubernetes probe based on info in the health check dictionary hc.

    hc keys used:
      'type': "http"/"https" (HTTP GET probe) or "script"/"docker" (exec probe)
      'interval' (optional): seconds between probes, default PROBE_DEFAULT_PERIOD
      'timeout' (optional): seconds per probe attempt, default PROBE_DEFAULT_TIMEOUT
      'endpoint': URL path, for the http(s) types
      'script': command to exec in the container, for the script/docker types
    '''
    probe_type = hc['type']
    # Fall back to module-level defaults when the check doesn't specify timing
    period = hc.get('interval', PROBE_DEFAULT_PERIOD)
    timeout = hc.get('timeout', PROBE_DEFAULT_TIMEOUT)
    if probe_type in ['http', 'https']:
        # HTTP(S) GET against hc['endpoint'], scheme taken from the probe type
        probe = client.V1Probe(
          failure_threshold = 1,
          initial_delay_seconds = 5,
          period_seconds = period,
          timeout_seconds = timeout,
          http_get = client.V1HTTPGetAction(
            path = hc['endpoint'],
            scheme = probe_type.upper()
    # NOTE(review): the closing parentheses for V1HTTPGetAction/V1Probe (and
    # presumably a 'port = port' argument, given the unused 'port' parameter)
    # appear to be missing from this copy of the source -- confirm.
    elif probe_type in ['script', 'docker']:
        # Exec the configured script inside the container
        probe = client.V1Probe(
          failure_threshold = 1,
          initial_delay_seconds = 5,
          period_seconds = period,
          timeout_seconds = timeout,
          _exec = client.V1ExecAction(
            command = [hc['script']]
    # NOTE(review): closing parentheses and the final 'return probe' appear to
    # be missing from this copy of the source -- confirm.
def _create_container_object(name, image, always_pull, env=None, container_ports=None, volume_mounts=None, readiness=None):
    '''
    Build the Kubernetes V1Container object for one container in the pod.

    name: container name
    image: Docker image to run
    always_pull: if True, image_pull_policy is 'Always', else 'IfNotPresent'
    env: dict of environment variable name/value pairs for the container
    container_ports: list of integer ports to expose on the container
    volume_mounts: list of V1VolumeMount objects for the container
    readiness: health check dict (see _create_probe); if present, a readiness
        probe is created against the first container port
    '''
    # None defaults instead of mutable {}/[] defaults, so state can't leak
    # between calls that rely on the defaults
    env = env or {}
    container_ports = container_ports or []
    volume_mounts = volume_mounts or []

    # Set up environment variables
    # Copy any passed in environment variables
    env_vars = [client.V1EnvVar(name=k, value=env[k]) for k in env.keys()]
    # Add POD_IP with the IP address of the pod running the container
    pod_ip = client.V1EnvVarSource(field_ref=client.V1ObjectFieldSelector(field_path="status.podIP"))
    env_vars.append(client.V1EnvVar(name="POD_IP", value_from=pod_ip))

    # If a health check is specified, create a readiness probe
    # (For an HTTP-based check, we assume it's at the first container port)
    # 'probe' must be initialized so the V1Container call below is safe when
    # there is no health check or no ports.
    probe = None
    if readiness and len(container_ports) > 0:
        hc_port = container_ports[0]
        probe = _create_probe(readiness, hc_port)

    # Define container for pod
    return client.V1Container(
        name=name,
        image=image,
        image_pull_policy='Always' if always_pull else 'IfNotPresent',
        env=env_vars,
        ports=[client.V1ContainerPort(container_port=p) for p in container_ports],
        volume_mounts=volume_mounts,
        readiness_probe=probe
    )
def _create_deployment_object(component_name,
    # NOTE(review): the remainder of the parameter list appears truncated in
    # this copy of the source; from the body it must include at least
    # 'containers', 'labels' and 'pull_secrets', and an 'ips = []'
    # initialization is also missing. Confirm against the full file.
    # pull_secrets is a list of the names of the k8s secrets containing docker registry credentials
    # See https://kubernetes.io/docs/concepts/containers/images/#specifying-imagepullsecrets-on-a-pod
    # Wrap each secret name in the reference object the pod spec expects
    for secret in pull_secrets:
        ips.append(client.V1LocalObjectReference(name=secret))

    # Define pod template
    template = client.V1PodTemplateSpec(
        metadata=client.V1ObjectMeta(labels=labels),
        spec=client.V1PodSpec(hostname=component_name,
                              containers=containers,
                              image_pull_secrets=ips)
    # NOTE(review): closing parenthesis for V1PodTemplateSpec appears missing.

    # Define deployment spec
    spec = client.ExtensionsV1beta1DeploymentSpec(
    # NOTE(review): the deployment-spec arguments (e.g. replicas, template)
    # appear to be missing from this copy of the source.

    # Create deployment object
    deployment = client.ExtensionsV1beta1Deployment(
        metadata=client.V1ObjectMeta(name=_create_deployment_name(component_name)),
    # NOTE(review): the remaining arguments and the 'return deployment'
    # statement appear to be missing from this copy of the source.
def _create_service_object(service_name, component_name, service_ports, annotations, labels, service_type):
    ''' Build a k8s V1Service selecting the component's pods by their "app" label. '''
    service_spec = client.V1ServiceSpec(
        selector={"app" : component_name},
    # NOTE(review): further V1ServiceSpec arguments (presumably 'ports' and
    # 'type') appear to be missing from this copy of the source.
    # NOTE(review): the two metadata assignments below are presumably the two
    # arms of an if/else on whether 'annotations' is non-empty; the branch
    # keywords are missing here -- confirm against the full file.
    metadata = client.V1ObjectMeta(name=_create_service_name(service_name), labels=labels, annotations=annotations)
    metadata = client.V1ObjectMeta(name=_create_service_name(service_name), labels=labels)
    service = client.V1Service(
    # NOTE(review): the V1Service arguments and the 'return service'
    # statement appear to be missing from this copy of the source.
176 def _parse_ports(port_list):
181 [container, host] = (p.strip()).split(":",2)
182 cport = int(container)
183 container_ports.append(cport)
185 port_map[container] = hport
187 pass # if something doesn't parse, we just ignore it
189 return container_ports, port_map
def _parse_volumes(volume_list):
    '''
    Convert a list of volume specs into k8s volume and volume-mount objects.

    Each entry in volume_list has the form:
      {"host": {"path": "/path/on/host"},
       "container": {"bind": "/path/in/container", "mode": "rw" or "ro"}}

    Returns (volumes, volume_mounts): parallel lists of V1Volume and
    V1VolumeMount objects, paired via a generated unique volume name.
    '''
    # These accumulators must be initialized before the loop
    volumes = []
    volume_mounts = []
    for v in volume_list:
        vname = str(uuid.uuid4())  # unique name ties each volume to its mount
        vhost = v['host']['path']
        vcontainer = v['container']['bind']
        vro = (v['container']['mode'] == 'ro')  # read-only iff mode is "ro"
        volumes.append(client.V1Volume(name=vname, host_path=client.V1HostPathVolumeSource(path=vhost)))
        volume_mounts.append(client.V1VolumeMount(name=vname, mount_path=vcontainer, read_only=vro))
    return volumes, volume_mounts
def _service_exists(namespace, component_name):
    '''
    Return True if the ClusterIP service for 'component_name' exists in
    'namespace', False otherwise.
    '''
    exists = False
    try:
        # read_namespaced_service raises ApiException (e.g. 404) if absent
        client.CoreV1Api().read_namespaced_service(_create_service_name(component_name), namespace)
        exists = True
    except client.rest.ApiException:
        pass
    return exists
def _patch_deployment(namespace, deployment, modify):
    '''
    Gets the current spec for 'deployment' in 'namespace',
    uses the 'modify' function to change the spec,
    then sends the updated spec to k8s.

    modify: a callable taking the deployment object and mutating it in place.
    '''
    # Get deployment spec
    spec = client.ExtensionsV1beta1Api().read_namespaced_deployment(deployment, namespace)

    # Apply changes to spec
    modify(spec)

    # Patch the deploy with updated spec
    client.ExtensionsV1beta1Api().patch_namespaced_deployment(deployment, namespace, spec)
def deploy(namespace, component_name, image, replicas, always_pull, k8sconfig, **kwargs):
    '''
    This will create a k8s Deployment and, if needed, one or two k8s Services.
    (We are being opinionated in our use of k8s... this code decides what k8s abstractions and features to use.
    We're not exposing k8s to the component developer and the blueprint author.
    This is a conscious choice. We want to use k8s in a controlled, consistent way, and we want to hide
    the details from the component developer and the blueprint author.)

    namespace: the Kubernetes namespace into which the component is deployed
    component_name: the component name, used to derive names of Kubernetes entities
    image: the docker image for the component being deployed
    replica: the number of instances of the component to be deployed
    always_pull: boolean flag, indicating that Kubernetes should always pull a new copy of
       the Docker image for the component, even if it is already present on the Kubernetes node.

    kwargs may include:
    - image_pull_secrets: a list of names of image pull secrets that can be used for retrieving images.
      (DON'T PANIC: these are just the names of secrets held in the Kubernetes secret store.)
    - filebeat: a dictionary of filebeat sidecar parameters:
      "log_path" : mount point for log volume in filebeat container
      "data_path" : mount point for data volume in filebeat container
      "config_path" : mount point for config volume in filebeat container
      "config_subpath" : subpath for config data in filebeat container
      "config_map" : ConfigMap holding the filebeat configuration
      "image": Docker image to use for filebeat
    - volumes: array of volume objects, where a volume object is:
      {"host":{"path": "/path/on/host"}, "container":{"bind":"/path/on/container","mode":"rw_or_ro"}
    - ports: array of strings in the form "container_port:host_port"
    - env: map of name-value pairs ( {name0: value0, name1: value1...}
    - msb_list: array of msb objects, where an msb object is as described in msb/msb.py.
    - log_info: an object with info for setting up ELK logging, with the form:
      {"log_directory": "/path/to/container/log/directory", "alternate_fb_path" : "/alternate/sidecar/log/path"}
    - labels: dict with label-name/label-value pairs, e.g. {"cfydeployment" : "lsdfkladflksdfsjkl", "cfynode":"mycomponent"}
      These label will be set on all the pods deployed as a result of this deploy() invocation.
    - readiness: dict with health check info; if present, used to create a readiness probe for the main container. Includes:
      - type: check is done by making http(s) request to an endpoint ("http", "https") or by exec'ing a script in the container ("script", "docker")
      - interval: period (in seconds) between probes
      - timeout: time (in seconds) to allow a probe to complete
      - endpoint: the path portion of the URL that points to the readiness endpoint for "http" and "https" types
      - path: the full path to the script to be executed in the container for "script" and "docker" types
    '''
    deployment_ok = False
    cip_service_created = False
    deployment_description = {
        "namespace": namespace,
    # NOTE(review): the rest of the dict (presumably "deployment" and
    # "services" entries -- both are written below) and the closing brace
    # appear to be missing from this copy of the source.

    core = client.CoreV1Api()
    ext = client.ExtensionsV1beta1Api()

    # Parse the port mapping into [container_port,...] and [{"host_port" : "container_port"},...]
    container_ports, port_map = _parse_ports(kwargs.get("ports", []))

    # Parse the volumes list into volumes and volume_mounts for the deployment
    volumes, volume_mounts = _parse_volumes(kwargs.get("volumes",[]))

    # Initialize the list of containers that will be part of the pod
    # NOTE(review): the 'containers = []' initialization appears to be missing
    # from this copy of the source; 'containers' is appended to below.

    # Set up the ELK logging sidecar container, if needed
    log_info = kwargs.get("log_info")
    if log_info and "log_directory" in log_info:
        log_dir = log_info["log_directory"]
        fb = k8sconfig["filebeat"]
        sidecar_volume_mounts = []

        # Create the volume for component log files and volume mounts for the component and sidecar containers
        volumes.append(client.V1Volume(name="component-log", empty_dir=client.V1EmptyDirVolumeSource()))
        volume_mounts.append(client.V1VolumeMount(name="component-log", mount_path=log_dir))
        sc_path = log_info["alternate_fb_path"] if "alternate_fb_path" in log_info \
            else "{0}/{1}".format(fb["log_path"], component_name)
        sidecar_volume_mounts.append(client.V1VolumeMount(name="component-log", mount_path=sc_path))

        # Create the volume for sidecar data and the volume mount for it
        volumes.append(client.V1Volume(name="filebeat-data", empty_dir=client.V1EmptyDirVolumeSource()))
        sidecar_volume_mounts.append(client.V1VolumeMount(name="filebeat-data", mount_path=fb["data_path"]))

        # Create the container for the sidecar
        containers.append(_create_container_object("filebeat", fb["image"], False, {}, [], sidecar_volume_mounts))

        # Create the volume for the sidecar configuration data and the volume mount for it
        # The configuration data is in a k8s ConfigMap that should be created when DCAE is installed.
        # NOTE(review): a 'volumes.append(' line appears to be missing before
        # the next line -- confirm against the full source.
            client.V1Volume(name="filebeat-conf", config_map=client.V1ConfigMapVolumeSource(name=fb["config_map"])))
        sidecar_volume_mounts.append(
            client.V1VolumeMount(name="filebeat-conf", mount_path=fb["config_path"], sub_path=fb["config_subpath"]))

    # Create the container for the component
    # Make it the first container in the pod
    # NOTE(review): 'readiness' is documented above as optional, but
    # kwargs["readiness"] raises KeyError when it is absent;
    # kwargs.get("readiness") would match the documented contract -- confirm.
    containers.insert(0, _create_container_object(component_name, image, always_pull, kwargs.get("env", {}), container_ports, volume_mounts, kwargs["readiness"]))

    # Build the k8s Deployment object
    labels = kwargs.get("labels", {})
    labels.update({"app": component_name})
    dep = _create_deployment_object(component_name, containers, replicas, volumes, labels, pull_secrets=k8sconfig["image_pull_secrets"])

    # NOTE(review): a 'try:' presumably opens here (there is a matching
    # 'except' block below) -- missing from this copy of the source.
    ext.create_namespaced_deployment(namespace, dep)
    deployment_description["deployment"] = _create_deployment_name(component_name)

    # Create service(s), if a port mapping is specified
    # NOTE(review): presumably guarded by a check on port_map -- missing here.
    service_ports = []      # Ports exposed internally on the k8s network
    exposed_ports = []      # Ports to be mapped to ports on the k8s nodes via NodePort
    # NOTE(review): dict.iteritems() is Python 2 only; .items() under Python 3.
    for cport, hport in port_map.iteritems():
        service_ports.append(client.V1ServicePort(port=int(cport),name="port-{}".format(cport)))
        exposed_ports.append(client.V1ServicePort(port=int(cport), node_port=int(hport),name="xport-{}".format(cport)))

    # If there are ports to be exposed via MSB, set up the annotation for the service
    msb_list = kwargs.get("msb_list")
    annotations = msb.create_msb_annotation(msb_list) if msb_list else ''

    # Create a ClusterIP service for access via the k8s network
    service = _create_service_object(_create_service_name(component_name), component_name, service_ports, annotations, labels, "ClusterIP")
    core.create_namespaced_service(namespace, service)
    cip_service_created = True
    deployment_description["services"].append(_create_service_name(component_name))

    # If there are ports to be exposed on the k8s nodes, create a "NodePort" service
    if len(exposed_ports) > 0:
        # NOTE(review): an 'exposed_service =' assignment appears to be
        # missing before the next line ('exposed_service' is used below).
        _create_service_object(_create_exposed_service_name(component_name), component_name, exposed_ports, '', labels, "NodePort")
        core.create_namespaced_service(namespace, exposed_service)
        deployment_description["services"].append(_create_exposed_service_name(component_name))

    except Exception as e:
        # If the ClusterIP service was created, delete the service:
        if cip_service_created:
            core.delete_namespaced_service(_create_service_name(component_name), namespace)
        # If the deployment was created but not the service, delete the deployment
        # NOTE(review): presumably guarded by 'if deployment_ok:' (the flag set
        # above is otherwise unused), and followed by a re-raise -- both appear
        # to be missing from this copy of the source.
        client.ExtensionsV1beta1Api().delete_namespaced_deployment(_create_deployment_name(component_name), namespace, client.V1DeleteOptions())

    return dep, deployment_description
def undeploy(deployment_description):
    ''' Delete the k8s entities (services and deployment) recorded in deployment_description. '''
    namespace = deployment_description["namespace"]

    # remove any services associated with the component
    for svc in deployment_description["services"]:
        client.CoreV1Api().delete_namespaced_service(svc, namespace)

    # Have k8s delete the underlying pods and replicaset when deleting the deployment.
    delete_opts = client.V1DeleteOptions(propagation_policy="Foreground")
    client.ExtensionsV1beta1Api().delete_namespaced_deployment(
        deployment_description["deployment"], namespace, delete_opts)
def is_available(namespace, component_name):
    ''' Return True when the component's deployment is fully up to date and available. '''
    dep = client.AppsV1beta1Api().read_namespaced_deployment_status(
        _create_deployment_name(component_name), namespace)
    # Check if the number of available replicas is equal to the number requested and that the replicas match the current spec
    # This check can be used to verify completion of an initial deployment, a scale operation, or an update operation
    wanted = dep.spec.replicas
    return dep.status.available_replicas == wanted and dep.status.updated_replicas == wanted
def scale(deployment_description, replicas):
    ''' Trigger a scaling operation by updating the replica count for the Deployment '''

    def apply_replica_count(spec):
        # Mutate the deployment spec in place with the requested replica count
        spec.spec.replicas = replicas

    _patch_deployment(deployment_description["namespace"],
                      deployment_description["deployment"],
                      apply_replica_count)
def upgrade(deployment_description, image, container_index = 0):
    ''' Trigger a rolling upgrade by sending a new image name/tag to k8s '''

    def swap_image(spec):
        # Point the selected container at the new image; k8s rolls the pods
        spec.spec.template.spec.containers[container_index].image = image

    _patch_deployment(deployment_description["namespace"],
                      deployment_description["deployment"],
                      swap_image)
def rollback(deployment_description, rollback_to=0):
    '''
    Undo upgrade by rolling back to a previous revision of the deployment.
    By default, go back one revision.
    rollback_to can be used to supply a specific revision number.
    Returns the image for the app container and the replica count from the rolled-back deployment

    Currently this does not work due to a bug in the create_namespaced_deployment_rollback() method.
    The k8s python client code throws an exception while processing the response from the API.
    - https://github.com/kubernetes-client/python/issues/491
    - https://github.com/kubernetes/kubernetes/pull/63837
    The fix has been merged into the master branch but is not in the latest release.
    '''
    deployment = deployment_description["deployment"]
    namespace = deployment_description["namespace"]

    # Initiate the rollback
    client.ExtensionsV1beta1Api().create_namespaced_deployment_rollback(
    # NOTE(review): the leading positional arguments (presumably
    # 'deployment, namespace,') appear to be missing from this copy of the
    # source -- confirm against the full file.
        client.AppsV1beta1DeploymentRollback(name=deployment, rollback_to=client.AppsV1beta1RollbackConfig(revision=rollback_to)))

    # Read back the spec for the rolled-back deployment
    spec = client.ExtensionsV1beta1Api().read_namespaced_deployment(deployment, namespace)
    return spec.spec.template.spec.containers[0].image, spec.spec.replicas