Add support for updating image.
[dcaegen2/platform/plugins.git] / k8s / k8sclient / k8sclient.py
1 # ============LICENSE_START=======================================================
2 # org.onap.dcae
3 # ================================================================================
4 # Copyright (c) 2018 AT&T Intellectual Property. All rights reserved.
5 # ================================================================================
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 #      http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
17 # ============LICENSE_END=========================================================
18 #
19 # ECOMP is a trademark and service mark of AT&T Intellectual Property.
20 import os
21 import uuid
22 from msb import msb
23 from kubernetes import config, client
24
25 # Default values for readiness probe
26 PROBE_DEFAULT_PERIOD = 15
27 PROBE_DEFAULT_TIMEOUT = 1
28
29 def _create_deployment_name(component_name):
30     return "dep-{0}".format(component_name)
31
32 def _create_service_name(component_name):
33     return "{0}".format(component_name)
34
35 def _create_exposed_service_name(component_name):
36     return ("x{0}".format(component_name))[:63]
37
38 def _configure_api():
39     # Look for a kubernetes config file in ~/.kube/config
40     kubepath = os.path.join(os.environ["HOME"], '.kube/config')
41     if os.path.exists(kubepath):
42         config.load_kube_config(kubepath)
43     else:
44         # Maybe we're running in a k8s container and we can use info provided by k8s
45         # We would like to use:
46         # config.load_incluster_config()
47         # but this looks into os.environ for kubernetes host and port, and from
48         # the plugin those aren't visible.   So we use the InClusterConfigLoader class,
49         # where we can set the environment to what we like.
50         # This is probably brittle!  Maybe there's a better alternative.
51         localenv = {
52             config.incluster_config.SERVICE_HOST_ENV_NAME : "kubernetes.default.svc.cluster.local",
53             config.incluster_config.SERVICE_PORT_ENV_NAME : "443"
54         }
55         config.incluster_config.InClusterConfigLoader(
56             token_filename=config.incluster_config.SERVICE_TOKEN_FILENAME,
57             cert_filename=config.incluster_config.SERVICE_CERT_FILENAME,
58             environ=localenv
59         ).load_and_set()
60
61 def _create_probe(hc, port):
62     ''' Create a Kubernetes probe based on info in the health check dictionary hc '''
63     probe_type = hc['type']
64     probe = None
65     period = hc.get('interval', PROBE_DEFAULT_PERIOD)
66     timeout = hc.get('timeout', PROBE_DEFAULT_TIMEOUT)
67     if probe_type in ['http', 'https']:
68         probe = client.V1Probe(
69           failure_threshold = 1,
70           initial_delay_seconds = 5,
71           period_seconds = period,
72           timeout_seconds = timeout,
73           http_get = client.V1HTTPGetAction(
74               path = hc['endpoint'],
75               port = port,
76               scheme = probe_type.upper()
77           )
78         )
79     elif probe_type in ['script', 'docker']:
80         probe = client.V1Probe(
81           failure_threshold = 1,
82           initial_delay_seconds = 5,
83           period_seconds = period,
84           timeout_seconds = timeout,
85           _exec = client.V1ExecAction(
86               command = [hc['script']]
87           )
88         )
89     return probe
90
91 def _create_container_object(name, image, always_pull, env={}, container_ports=[], volume_mounts = [], readiness = None):
92     # Set up environment variables
93     # Copy any passed in environment variables
94     env_vars = [client.V1EnvVar(name=k, value=env[k]) for k in env.keys()]
95     # Add POD_IP with the IP address of the pod running the container
96     pod_ip = client.V1EnvVarSource(field_ref = client.V1ObjectFieldSelector(field_path="status.podIP"))
97     env_vars.append(client.V1EnvVar(name="POD_IP",value_from=pod_ip))
98
99     # If a health check is specified, create a readiness probe
100     # (For an HTTP-based check, we assume it's at the first container port)
101     probe = None
102
103     if readiness:
104         hc_port = None
105         if len(container_ports) > 0:
106             hc_port = container_ports[0]
107         probe = _create_probe(readiness, hc_port)
108
109     # Define container for pod
110     return client.V1Container(
111         name=name,
112         image=image,
113         image_pull_policy='Always' if always_pull else 'IfNotPresent',
114         env=env_vars,
115         ports=[client.V1ContainerPort(container_port=p) for p in container_ports],
116         volume_mounts = volume_mounts,
117         readiness_probe = probe
118     )
119
120 def _create_deployment_object(component_name,
121                               containers,
122                               replicas,
123                               volumes,
124                               labels,
125                               pull_secrets=[]):
126
127     # pull_secrets is a list of the names of the k8s secrets containing docker registry credentials
128     # See https://kubernetes.io/docs/concepts/containers/images/#specifying-imagepullsecrets-on-a-pod
129     ips = []
130     for secret in pull_secrets:
131         ips.append(client.V1LocalObjectReference(name=secret))
132
133     # Define pod template
134     template = client.V1PodTemplateSpec(
135         metadata=client.V1ObjectMeta(labels=labels),
136         spec=client.V1PodSpec(hostname=component_name,
137                               containers=containers,
138                               volumes=volumes,
139                               image_pull_secrets=ips)
140     )
141
142     # Define deployment spec
143     spec = client.ExtensionsV1beta1DeploymentSpec(
144         replicas=replicas,
145         template=template
146     )
147
148     # Create deployment object
149     deployment = client.ExtensionsV1beta1Deployment(
150         kind="Deployment",
151         metadata=client.V1ObjectMeta(name=_create_deployment_name(component_name)),
152         spec=spec
153     )
154
155     return deployment
156
157 def _create_service_object(service_name, component_name, service_ports, annotations, labels, service_type):
158     service_spec = client.V1ServiceSpec(
159         ports=service_ports,
160         selector={"app" : component_name},
161         type=service_type
162     )
163     if annotations:
164         metadata = client.V1ObjectMeta(name=_create_service_name(service_name), labels=labels, annotations=annotations)
165     else:
166         metadata = client.V1ObjectMeta(name=_create_service_name(service_name), labels=labels)
167
168     service = client.V1Service(
169         kind="Service",
170         api_version="v1",
171         metadata=metadata,
172         spec=service_spec
173     )
174     return service
175
176 def _parse_ports(port_list):
177     container_ports = []
178     port_map = {}
179     for p in port_list:
180         try:
181             [container, host] = (p.strip()).split(":",2)
182             cport = int(container)
183             container_ports.append(cport)
184             hport = int(host)
185             port_map[container] = hport
186         except:
187             pass    # if something doesn't parse, we just ignore it
188
189     return container_ports, port_map
190
191 def _parse_volumes(volume_list):
192     volumes = []
193     volume_mounts = []
194     for v in volume_list:
195         vname = str(uuid.uuid4())
196         vhost = v['host']['path']
197         vcontainer = v['container']['bind']
198         vro = (v['container']['mode'] == 'ro')
199         volumes.append(client.V1Volume(name=vname, host_path=client.V1HostPathVolumeSource(path=vhost)))
200         volume_mounts.append(client.V1VolumeMount(name=vname, mount_path=vcontainer, read_only=vro))
201
202     return volumes, volume_mounts
203
204 def _service_exists(namespace, component_name):
205     exists = False
206     try:
207         _configure_api()
208         client.CoreV1Api().read_namespaced_service(_create_service_name(component_name), namespace)
209         exists = True
210     except client.rest.ApiException:
211         pass
212
213     return exists
214
215 def _patch_deployment(namespace, deployment, modify):
216     '''
217     Gets the current spec for 'deployment' in 'namespace',
218     uses the 'modify' function to change the spec,
219     then sends the updated spec to k8s.
220     '''
221     _configure_api()
222
223     # Get deployment spec
224     spec = client.ExtensionsV1beta1Api().read_namespaced_deployment(deployment, namespace)
225
226     # Apply changes to spec
227     spec = modify(spec)
228
229     # Patch the deploy with updated spec
230     client.ExtensionsV1beta1Api().patch_namespaced_deployment(deployment, namespace, spec)
231
232 def deploy(namespace, component_name, image, replicas, always_pull, k8sconfig, **kwargs):
233     '''
234     This will create a k8s Deployment and, if needed, one or two k8s Services.
235     (We are being opinionated in our use of k8s... this code decides what k8s abstractions and features to use.
236     We're not exposing k8s to the component developer and the blueprint author.
237     This is a conscious choice.  We want to use k8s in a controlled, consistent way, and we want to hide
238     the details from the component developer and the blueprint author.)
239
240     namespace:  the Kubernetes namespace into which the component is deployed
241     component_name:  the component name, used to derive names of Kubernetes entities
242     image: the docker image for the component being deployed
243     replica: the number of instances of the component to be deployed
244     always_pull: boolean flag, indicating that Kubernetes should always pull a new copy of
245        the Docker image for the component, even if it is already present on the Kubernetes node.
246     k8sconfig contains:
247         - image_pull_secrets: a list of names of image pull secrets that can be used for retrieving images.
248           (DON'T PANIC:  these are just the names of secrets held in the Kubernetes secret store.)
249         - filebeat: a dictionary of filebeat sidecar parameters:
250             "log_path" : mount point for log volume in filebeat container
251             "data_path" : mount point for data volume in filebeat container
252             "config_path" : mount point for config volume in filebeat container
253             "config_subpath" :  subpath for config data in filebeat container
254             "config_map" : ConfigMap holding the filebeat configuration
255             "image": Docker image to use for filebeat
256     kwargs may have:
257         - volumes:  array of volume objects, where a volume object is:
258             {"host":{"path": "/path/on/host"}, "container":{"bind":"/path/on/container","mode":"rw_or_ro"}
259         - ports: array of strings in the form "container_port:host_port"
260         - env: map of name-value pairs ( {name0: value0, name1: value1...}
261         - msb_list: array of msb objects, where an msb object is as described in msb/msb.py.
262         - log_info: an object with info for setting up ELK logging, with the form:
263             {"log_directory": "/path/to/container/log/directory", "alternate_fb_path" : "/alternate/sidecar/log/path"}
264         - labels: dict with label-name/label-value pairs, e.g. {"cfydeployment" : "lsdfkladflksdfsjkl", "cfynode":"mycomponent"}
265             These label will be set on all the pods deployed as a result of this deploy() invocation.
266         - readiness: dict with health check info; if present, used to create a readiness probe for the main container.  Includes:
267             - type: check is done by making http(s) request to an endpoint ("http", "https") or by exec'ing a script in the container ("script", "docker")
268             - interval: period (in seconds) between probes
269             - timeout:  time (in seconds) to allow a probe to complete
270             - endpoint: the path portion of the URL that points to the readiness endpoint for "http" and "https" types
271             - path: the full path to the script to be executed in the container for "script" and "docker" types
272
273     '''
274
275     deployment_ok = False
276     cip_service_created = False
277     deployment_description = {
278         "namespace": namespace,
279         "deployment": '',
280         "services": []
281     }
282
283     try:
284         _configure_api()
285
286         # Get API handles
287         core = client.CoreV1Api()
288         ext = client.ExtensionsV1beta1Api()
289
290         # Parse the port mapping into [container_port,...] and [{"host_port" : "container_port"},...]
291         container_ports, port_map = _parse_ports(kwargs.get("ports", []))
292
293         # Parse the volumes list into volumes and volume_mounts for the deployment
294         volumes, volume_mounts = _parse_volumes(kwargs.get("volumes",[]))
295
296         # Initialize the list of containers that will be part of the pod
297         containers = []
298
299         # Set up the ELK logging sidecar container, if needed
300         log_info = kwargs.get("log_info")
301         if log_info and "log_directory" in log_info:
302             log_dir = log_info["log_directory"]
303             fb = k8sconfig["filebeat"]
304             sidecar_volume_mounts = []
305
306             # Create the volume for component log files and volume mounts for the component and sidecar containers
307             volumes.append(client.V1Volume(name="component-log", empty_dir=client.V1EmptyDirVolumeSource()))
308             volume_mounts.append(client.V1VolumeMount(name="component-log", mount_path=log_dir))
309             sc_path = log_info["alternate_fb_path"] if "alternate_fb_path" in log_info  \
310                 else "{0}/{1}".format(fb["log_path"], component_name)
311             sidecar_volume_mounts.append(client.V1VolumeMount(name="component-log", mount_path=sc_path))
312
313             # Create the volume for sidecar data and the volume mount for it
314             volumes.append(client.V1Volume(name="filebeat-data", empty_dir=client.V1EmptyDirVolumeSource()))
315             sidecar_volume_mounts.append(client.V1VolumeMount(name="filebeat-data", mount_path=fb["data_path"]))
316
317             # Create the container for the sidecar
318             containers.append(_create_container_object("filebeat", fb["image"], False, {}, [], sidecar_volume_mounts))
319
320             # Create the volume for the sidecar configuration data and the volume mount for it
321             # The configuration data is in a k8s ConfigMap that should be created when DCAE is installed.
322             volumes.append(
323                 client.V1Volume(name="filebeat-conf", config_map=client.V1ConfigMapVolumeSource(name=fb["config_map"])))
324             sidecar_volume_mounts.append(
325                 client.V1VolumeMount(name="filebeat-conf", mount_path=fb["config_path"], sub_path=fb["config_subpath"]))
326
327         # Create the container for the component
328         # Make it the first container in the pod
329         containers.insert(0, _create_container_object(component_name, image, always_pull, kwargs.get("env", {}), container_ports, volume_mounts, kwargs["readiness"]))
330
331         # Build the k8s Deployment object
332         labels = kwargs.get("labels", {})
333         labels.update({"app": component_name})
334         dep = _create_deployment_object(component_name, containers, replicas, volumes, labels, pull_secrets=k8sconfig["image_pull_secrets"])
335
336         # Have k8s deploy it
337         ext.create_namespaced_deployment(namespace, dep)
338         deployment_ok = True
339         deployment_description["deployment"] = _create_deployment_name(component_name)
340
341         # Create service(s), if a port mapping is specified
342         if port_map:
343             service_ports = []      # Ports exposed internally on the k8s network
344             exposed_ports = []      # Ports to be mapped to ports on the k8s nodes via NodePort
345             for cport, hport in port_map.iteritems():
346                 service_ports.append(client.V1ServicePort(port=int(cport),name="port-{}".format(cport)))
347                 if int(hport) != 0:
348                     exposed_ports.append(client.V1ServicePort(port=int(cport), node_port=int(hport),name="xport-{}".format(cport)))
349
350             # If there are ports to be exposed via MSB, set up the annotation for the service
351             msb_list = kwargs.get("msb_list")
352             annotations = msb.create_msb_annotation(msb_list) if msb_list else ''
353
354             # Create a ClusterIP service for access via the k8s network
355             service = _create_service_object(_create_service_name(component_name), component_name, service_ports, annotations, labels, "ClusterIP")
356             core.create_namespaced_service(namespace, service)
357             cip_service_created = True
358             deployment_description["services"].append(_create_service_name(component_name))
359
360             # If there are ports to be exposed on the k8s nodes, create a "NodePort" service
361             if len(exposed_ports) > 0:
362                 exposed_service = \
363                     _create_service_object(_create_exposed_service_name(component_name), component_name, exposed_ports, '', labels, "NodePort")
364                 core.create_namespaced_service(namespace, exposed_service)
365                 deployment_description["services"].append(_create_exposed_service_name(component_name))
366
367     except Exception as e:
368         # If the ClusterIP service was created, delete the service:
369         if cip_service_created:
370             core.delete_namespaced_service(_create_service_name(component_name), namespace)
371         # If the deployment was created but not the service, delete the deployment
372         if deployment_ok:
373             client.ExtensionsV1beta1Api().delete_namespaced_deployment(_create_deployment_name(component_name), namespace, client.V1DeleteOptions())
374         raise e
375
376     return dep, deployment_description
377
378 def undeploy(deployment_description):
379     _configure_api()
380
381     namespace = deployment_description["namespace"]
382
383     # remove any services associated with the component
384     for service in deployment_description["services"]:
385         client.CoreV1Api().delete_namespaced_service(service, namespace)
386
387     # Have k8s delete the underlying pods and replicaset when deleting the deployment.
388     options = client.V1DeleteOptions(propagation_policy="Foreground")
389     client.ExtensionsV1beta1Api().delete_namespaced_deployment(deployment_description["deployment"], namespace, options)
390
391 def is_available(namespace, component_name):
392     _configure_api()
393     dep_status = client.AppsV1beta1Api().read_namespaced_deployment_status(_create_deployment_name(component_name), namespace)
394     # Check if the number of available replicas is equal to the number requested and that the replicas match the current spec
395     # This check can be used to verify completion of an initial deployment, a scale operation, or an update operation
396     return dep_status.status.available_replicas == dep_status.spec.replicas and dep_status.status.updated_replicas == dep_status.spec.replicas
397
398 def scale(deployment_description, replicas):
399     ''' Trigger a scaling operation by updating the replica count for the Deployment '''
400
401     def update_replica_count(spec):
402         spec.spec.replicas = replicas
403         return spec
404
405     _patch_deployment(deployment_description["namespace"], deployment_description["deployment"], update_replica_count)
406
407 def upgrade(deployment_description, image, container_index = 0):
408     ''' Trigger a rolling upgrade by sending a new image name/tag to k8s '''
409
410     def update_image(spec):
411         spec.spec.template.spec.containers[container_index].image = image
412         return spec
413
414     _patch_deployment(deployment_description["namespace"], deployment_description["deployment"], update_image)
415
416 def rollback(deployment_description, rollback_to=0):
417     '''
418     Undo upgrade by rolling back to a previous revision of the deployment.
419     By default, go back one revision.
420     rollback_to can be used to supply a specific revision number.
421     Returns the image for the app container and the replica count from the rolled-back deployment
422     '''
423     '''
424     2018-07-13
425     Currently this does not work due to a bug in the create_namespaced_deployment_rollback() method.
426     The k8s python client code throws an exception while processing the response from the API.
427     See:
428        - https://github.com/kubernetes-client/python/issues/491
429        - https://github.com/kubernetes/kubernetes/pull/63837
430     The fix has been merged into the master branch but is not in the latest release.
431     '''
432     _configure_api()
433     deployment = deployment_description["deployment"]
434     namespace = deployment_description["namespace"]
435
436     # Initiate the rollback
437     client.ExtensionsV1beta1Api().create_namespaced_deployment_rollback(
438         deployment,
439         namespace,
440         client.AppsV1beta1DeploymentRollback(name=deployment, rollback_to=client.AppsV1beta1RollbackConfig(revision=rollback_to)))
441
442     # Read back the spec for the rolled-back deployment
443     spec = client.ExtensionsV1beta1Api().read_namespaced_deployment(deployment, namespace)
444     return spec.spec.template.spec.containers[0].image, spec.spec.replicas