Merge "Make docker node type consistent with k8s"
[dcaegen2/platform/plugins.git] / k8s / k8sclient / k8sclient.py
1 # ============LICENSE_START=======================================================
2 # org.onap.dcae
3 # ================================================================================
4 # Copyright (c) 2018 AT&T Intellectual Property. All rights reserved.
5 # ================================================================================
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 #      http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
17 # ============LICENSE_END=========================================================
18 #
19 # ECOMP is a trademark and service mark of AT&T Intellectual Property.
20 import os
21 import uuid
22 from msb import msb
23 from kubernetes import config, client, stream
24
25 # Default values for readiness probe
26 PROBE_DEFAULT_PERIOD = 15
27 PROBE_DEFAULT_TIMEOUT = 1
28
29 def _create_deployment_name(component_name):
30     return "dep-{0}".format(component_name)
31
32 def _create_service_name(component_name):
33     return "{0}".format(component_name)
34
35 def _create_exposed_service_name(component_name):
36     return ("x{0}".format(component_name))[:63]
37
38 def _configure_api():
39     # Look for a kubernetes config file in ~/.kube/config
40     kubepath = os.path.join(os.environ["HOME"], '.kube/config')
41     if os.path.exists(kubepath):
42         config.load_kube_config(kubepath)
43     else:
44         # Maybe we're running in a k8s container and we can use info provided by k8s
45         # We would like to use:
46         # config.load_incluster_config()
47         # but this looks into os.environ for kubernetes host and port, and from
48         # the plugin those aren't visible.   So we use the InClusterConfigLoader class,
49         # where we can set the environment to what we like.
50         # This is probably brittle!  Maybe there's a better alternative.
51         localenv = {
52             config.incluster_config.SERVICE_HOST_ENV_NAME : "kubernetes.default.svc.cluster.local",
53             config.incluster_config.SERVICE_PORT_ENV_NAME : "443"
54         }
55         config.incluster_config.InClusterConfigLoader(
56             token_filename=config.incluster_config.SERVICE_TOKEN_FILENAME,
57             cert_filename=config.incluster_config.SERVICE_CERT_FILENAME,
58             environ=localenv
59         ).load_and_set()
60
61 def _create_probe(hc, port):
62     ''' Create a Kubernetes probe based on info in the health check dictionary hc '''
63     probe_type = hc['type']
64     probe = None
65     period = hc.get('interval', PROBE_DEFAULT_PERIOD)
66     timeout = hc.get('timeout', PROBE_DEFAULT_TIMEOUT)
67     if probe_type in ['http', 'https']:
68         probe = client.V1Probe(
69           failure_threshold = 1,
70           initial_delay_seconds = 5,
71           period_seconds = period,
72           timeout_seconds = timeout,
73           http_get = client.V1HTTPGetAction(
74               path = hc['endpoint'],
75               port = port,
76               scheme = probe_type.upper()
77           )
78         )
79     elif probe_type in ['script', 'docker']:
80         probe = client.V1Probe(
81           failure_threshold = 1,
82           initial_delay_seconds = 5,
83           period_seconds = period,
84           timeout_seconds = timeout,
85           _exec = client.V1ExecAction(
86               command = [hc['script']]
87           )
88         )
89     return probe
90
91 def _create_container_object(name, image, always_pull, env={}, container_ports=[], volume_mounts = [], readiness = None):
92     # Set up environment variables
93     # Copy any passed in environment variables
94     env_vars = [client.V1EnvVar(name=k, value=env[k]) for k in env.keys()]
95     # Add POD_IP with the IP address of the pod running the container
96     pod_ip = client.V1EnvVarSource(field_ref = client.V1ObjectFieldSelector(field_path="status.podIP"))
97     env_vars.append(client.V1EnvVar(name="POD_IP",value_from=pod_ip))
98
99     # If a health check is specified, create a readiness probe
100     # (For an HTTP-based check, we assume it's at the first container port)
101     probe = None
102
103     if readiness:
104         hc_port = None
105         if len(container_ports) > 0:
106             hc_port = container_ports[0]
107         probe = _create_probe(readiness, hc_port)
108
109     # Define container for pod
110     return client.V1Container(
111         name=name,
112         image=image,
113         image_pull_policy='Always' if always_pull else 'IfNotPresent',
114         env=env_vars,
115         ports=[client.V1ContainerPort(container_port=p) for p in container_ports],
116         volume_mounts = volume_mounts,
117         readiness_probe = probe
118     )
119
120 def _create_deployment_object(component_name,
121                               containers,
122                               replicas,
123                               volumes,
124                               labels={},
125                               pull_secrets=[]):
126
127     deployment_name = _create_deployment_name(component_name)
128
129     # Label the pod with the deployment name, so we can find it easily
130     labels.update({"k8sdeployment" : deployment_name})
131
132     # pull_secrets is a list of the names of the k8s secrets containing docker registry credentials
133     # See https://kubernetes.io/docs/concepts/containers/images/#specifying-imagepullsecrets-on-a-pod
134     ips = []
135     for secret in pull_secrets:
136         ips.append(client.V1LocalObjectReference(name=secret))
137
138     # Define pod template
139     template = client.V1PodTemplateSpec(
140         metadata=client.V1ObjectMeta(labels=labels),
141         spec=client.V1PodSpec(hostname=component_name,
142                               containers=containers,
143                               volumes=volumes,
144                               image_pull_secrets=ips)
145     )
146
147     # Define deployment spec
148     spec = client.ExtensionsV1beta1DeploymentSpec(
149         replicas=replicas,
150         template=template
151     )
152
153     # Create deployment object
154     deployment = client.ExtensionsV1beta1Deployment(
155         kind="Deployment",
156         metadata=client.V1ObjectMeta(name=deployment_name),
157         spec=spec
158     )
159
160     return deployment
161
162 def _create_service_object(service_name, component_name, service_ports, annotations, labels, service_type):
163     service_spec = client.V1ServiceSpec(
164         ports=service_ports,
165         selector={"app" : component_name},
166         type=service_type
167     )
168     if annotations:
169         metadata = client.V1ObjectMeta(name=_create_service_name(service_name), labels=labels, annotations=annotations)
170     else:
171         metadata = client.V1ObjectMeta(name=_create_service_name(service_name), labels=labels)
172
173     service = client.V1Service(
174         kind="Service",
175         api_version="v1",
176         metadata=metadata,
177         spec=service_spec
178     )
179     return service
180
181 def _parse_ports(port_list):
182     container_ports = []
183     port_map = {}
184     for p in port_list:
185         try:
186             [container, host] = (p.strip()).split(":",2)
187             cport = int(container)
188             container_ports.append(cport)
189             hport = int(host)
190             port_map[container] = hport
191         except:
192             pass    # if something doesn't parse, we just ignore it
193
194     return container_ports, port_map
195
196 def _parse_volumes(volume_list):
197     volumes = []
198     volume_mounts = []
199     for v in volume_list:
200         vname = str(uuid.uuid4())
201         vhost = v['host']['path']
202         vcontainer = v['container']['bind']
203         vro = (v['container']['mode'] == 'ro')
204         volumes.append(client.V1Volume(name=vname, host_path=client.V1HostPathVolumeSource(path=vhost)))
205         volume_mounts.append(client.V1VolumeMount(name=vname, mount_path=vcontainer, read_only=vro))
206
207     return volumes, volume_mounts
208
209 def _service_exists(namespace, component_name):
210     exists = False
211     try:
212         _configure_api()
213         client.CoreV1Api().read_namespaced_service(_create_service_name(component_name), namespace)
214         exists = True
215     except client.rest.ApiException:
216         pass
217
218     return exists
219
220 def _patch_deployment(namespace, deployment, modify):
221     '''
222     Gets the current spec for 'deployment' in 'namespace',
223     uses the 'modify' function to change the spec,
224     then sends the updated spec to k8s.
225     '''
226     _configure_api()
227
228     # Get deployment spec
229     spec = client.ExtensionsV1beta1Api().read_namespaced_deployment(deployment, namespace)
230
231     # Apply changes to spec
232     spec = modify(spec)
233
234     # Patch the deploy with updated spec
235     client.ExtensionsV1beta1Api().patch_namespaced_deployment(deployment, namespace, spec)
236
237 def _execute_command_in_pod(namespace, pod_name, command):
238     '''
239     Execute the command (specified by an argv-style list in  the "command" parameter) in
240     the specified pod in the specified namespace.  For now at least, we use this only to
241     run a notification script in a pod after a configuration change.
242
243     The straightforward way to do this is with the V1 Core API function "connect_get_namespaced_pod_exec".
244     Unfortunately due to a bug/limitation in the Python client library, we can't call it directly.
245     We have to make the API call through a Websocket connection using the kubernetes.stream wrapper API.
246     I'm following the example code at https://github.com/kubernetes-client/python/blob/master/examples/exec.py.
247     There are several issues tracking this, in various states.  It isn't clear that there will ever
248     be a fix.
249         - https://github.com/kubernetes-client/python/issues/58
250         - https://github.com/kubernetes-client/python/issues/409
251         - https://github.com/kubernetes-client/python/issues/526
252
253     The main consequence of the workaround using "stream" is that the caller does not get an indication
254     of the exit code returned by the command when it completes execution.   It turns out that the
255     original implementation of notification in the Docker plugin did not use this result, so we can
256     still match the original notification functionality.
257
258     The "stream" approach returns a string containing any output sent by the command to stdout or stderr.
259     We'll return that so it can logged.
260     '''
261     _configure_api()
262     try:
263         output = stream.stream(client.CoreV1Api().connect_get_namespaced_pod_exec,
264                              name=pod_name,
265                              namespace=namespace,
266                              command=command,
267                              stdout=True,
268                              stderr=True,
269                             stdin=False,
270                             tty=False)
271     except client.rest.ApiException as e:
272         # If the exception indicates the pod wasn't found,  it's not a fatal error.
273         # It existed when we enumerated the pods for the deployment but no longer exists.
274         # Unfortunately, the only way to distinguish a pod not found from any other error
275         # is by looking at the reason text.
276         # (The ApiException's "status" field should contain the HTTP status code, which would
277         # be 404 if the pod isn't found, but empirical testing reveals that "status" is set
278         # to zero.)
279         if "404 not found" in e.reason.lower():
280             output = "Pod not found"
281         else:
282             raise e
283
284     return {"pod" : pod_name, "output" : output}
285
286 def deploy(namespace, component_name, image, replicas, always_pull, k8sconfig, **kwargs):
287     '''
288     This will create a k8s Deployment and, if needed, one or two k8s Services.
289     (We are being opinionated in our use of k8s... this code decides what k8s abstractions and features to use.
290     We're not exposing k8s to the component developer and the blueprint author.
291     This is a conscious choice.  We want to use k8s in a controlled, consistent way, and we want to hide
292     the details from the component developer and the blueprint author.)
293
294     namespace:  the Kubernetes namespace into which the component is deployed
295     component_name:  the component name, used to derive names of Kubernetes entities
296     image: the docker image for the component being deployed
297     replica: the number of instances of the component to be deployed
298     always_pull: boolean flag, indicating that Kubernetes should always pull a new copy of
299        the Docker image for the component, even if it is already present on the Kubernetes node.
300     k8sconfig contains:
301         - image_pull_secrets: a list of names of image pull secrets that can be used for retrieving images.
302           (DON'T PANIC:  these are just the names of secrets held in the Kubernetes secret store.)
303         - filebeat: a dictionary of filebeat sidecar parameters:
304             "log_path" : mount point for log volume in filebeat container
305             "data_path" : mount point for data volume in filebeat container
306             "config_path" : mount point for config volume in filebeat container
307             "config_subpath" :  subpath for config data in filebeat container
308             "config_map" : ConfigMap holding the filebeat configuration
309             "image": Docker image to use for filebeat
310     kwargs may have:
311         - volumes:  array of volume objects, where a volume object is:
312             {"host":{"path": "/path/on/host"}, "container":{"bind":"/path/on/container","mode":"rw_or_ro"}
313         - ports: array of strings in the form "container_port:host_port"
314         - env: map of name-value pairs ( {name0: value0, name1: value1...}
315         - msb_list: array of msb objects, where an msb object is as described in msb/msb.py.
316         - log_info: an object with info for setting up ELK logging, with the form:
317             {"log_directory": "/path/to/container/log/directory", "alternate_fb_path" : "/alternate/sidecar/log/path"}
318         - labels: dict with label-name/label-value pairs, e.g. {"cfydeployment" : "lsdfkladflksdfsjkl", "cfynode":"mycomponent"}
319             These label will be set on all the pods deployed as a result of this deploy() invocation.
320         - readiness: dict with health check info; if present, used to create a readiness probe for the main container.  Includes:
321             - type: check is done by making http(s) request to an endpoint ("http", "https") or by exec'ing a script in the container ("script", "docker")
322             - interval: period (in seconds) between probes
323             - timeout:  time (in seconds) to allow a probe to complete
324             - endpoint: the path portion of the URL that points to the readiness endpoint for "http" and "https" types
325             - path: the full path to the script to be executed in the container for "script" and "docker" types
326
327     '''
328
329     deployment_ok = False
330     cip_service_created = False
331     deployment_description = {
332         "namespace": namespace,
333         "deployment": '',
334         "services": []
335     }
336
337     try:
338         _configure_api()
339
340         # Get API handles
341         core = client.CoreV1Api()
342         ext = client.ExtensionsV1beta1Api()
343
344         # Parse the port mapping into [container_port,...] and [{"host_port" : "container_port"},...]
345         container_ports, port_map = _parse_ports(kwargs.get("ports", []))
346
347         # Parse the volumes list into volumes and volume_mounts for the deployment
348         volumes, volume_mounts = _parse_volumes(kwargs.get("volumes",[]))
349
350         # Initialize the list of containers that will be part of the pod
351         containers = []
352
353         # Set up the ELK logging sidecar container, if needed
354         log_info = kwargs.get("log_info")
355         if log_info and "log_directory" in log_info:
356             log_dir = log_info["log_directory"]
357             fb = k8sconfig["filebeat"]
358             sidecar_volume_mounts = []
359
360             # Create the volume for component log files and volume mounts for the component and sidecar containers
361             volumes.append(client.V1Volume(name="component-log", empty_dir=client.V1EmptyDirVolumeSource()))
362             volume_mounts.append(client.V1VolumeMount(name="component-log", mount_path=log_dir))
363             sc_path = log_info["alternate_fb_path"] if "alternate_fb_path" in log_info  \
364                 else "{0}/{1}".format(fb["log_path"], component_name)
365             sidecar_volume_mounts.append(client.V1VolumeMount(name="component-log", mount_path=sc_path))
366
367             # Create the volume for sidecar data and the volume mount for it
368             volumes.append(client.V1Volume(name="filebeat-data", empty_dir=client.V1EmptyDirVolumeSource()))
369             sidecar_volume_mounts.append(client.V1VolumeMount(name="filebeat-data", mount_path=fb["data_path"]))
370
371             # Create the container for the sidecar
372             containers.append(_create_container_object("filebeat", fb["image"], False, {}, [], sidecar_volume_mounts))
373
374             # Create the volume for the sidecar configuration data and the volume mount for it
375             # The configuration data is in a k8s ConfigMap that should be created when DCAE is installed.
376             volumes.append(
377                 client.V1Volume(name="filebeat-conf", config_map=client.V1ConfigMapVolumeSource(name=fb["config_map"])))
378             sidecar_volume_mounts.append(
379                 client.V1VolumeMount(name="filebeat-conf", mount_path=fb["config_path"], sub_path=fb["config_subpath"]))
380
381         # Create the container for the component
382         # Make it the first container in the pod
383         containers.insert(0, _create_container_object(component_name, image, always_pull, kwargs.get("env", {}), container_ports, volume_mounts, kwargs["readiness"]))
384
385         # Build the k8s Deployment object
386         labels = kwargs.get("labels", {})
387         labels.update({"app": component_name})
388         dep = _create_deployment_object(component_name, containers, replicas, volumes, labels, pull_secrets=k8sconfig["image_pull_secrets"])
389
390         # Have k8s deploy it
391         ext.create_namespaced_deployment(namespace, dep)
392         deployment_ok = True
393         deployment_description["deployment"] = _create_deployment_name(component_name)
394
395         # Create service(s), if a port mapping is specified
396         if port_map:
397             service_ports = []      # Ports exposed internally on the k8s network
398             exposed_ports = []      # Ports to be mapped to ports on the k8s nodes via NodePort
399             for cport, hport in port_map.iteritems():
400                 service_ports.append(client.V1ServicePort(port=int(cport),name="port-{}".format(cport)))
401                 if int(hport) != 0:
402                     exposed_ports.append(client.V1ServicePort(port=int(cport), node_port=int(hport),name="xport-{}".format(cport)))
403
404             # If there are ports to be exposed via MSB, set up the annotation for the service
405             msb_list = kwargs.get("msb_list")
406             annotations = msb.create_msb_annotation(msb_list) if msb_list else ''
407
408             # Create a ClusterIP service for access via the k8s network
409             service = _create_service_object(_create_service_name(component_name), component_name, service_ports, annotations, labels, "ClusterIP")
410             core.create_namespaced_service(namespace, service)
411             cip_service_created = True
412             deployment_description["services"].append(_create_service_name(component_name))
413
414             # If there are ports to be exposed on the k8s nodes, create a "NodePort" service
415             if len(exposed_ports) > 0:
416                 exposed_service = \
417                     _create_service_object(_create_exposed_service_name(component_name), component_name, exposed_ports, '', labels, "NodePort")
418                 core.create_namespaced_service(namespace, exposed_service)
419                 deployment_description["services"].append(_create_exposed_service_name(component_name))
420
421     except Exception as e:
422         # If the ClusterIP service was created, delete the service:
423         if cip_service_created:
424             core.delete_namespaced_service(_create_service_name(component_name), namespace)
425         # If the deployment was created but not the service, delete the deployment
426         if deployment_ok:
427             client.ExtensionsV1beta1Api().delete_namespaced_deployment(_create_deployment_name(component_name), namespace, client.V1DeleteOptions())
428         raise e
429
430     return dep, deployment_description
431
432 def undeploy(deployment_description):
433     _configure_api()
434
435     namespace = deployment_description["namespace"]
436
437     # remove any services associated with the component
438     for service in deployment_description["services"]:
439         client.CoreV1Api().delete_namespaced_service(service, namespace)
440
441     # Have k8s delete the underlying pods and replicaset when deleting the deployment.
442     options = client.V1DeleteOptions(propagation_policy="Foreground")
443     client.ExtensionsV1beta1Api().delete_namespaced_deployment(deployment_description["deployment"], namespace, options)
444
445 def is_available(namespace, component_name):
446     _configure_api()
447     dep_status = client.AppsV1beta1Api().read_namespaced_deployment_status(_create_deployment_name(component_name), namespace)
448     # Check if the number of available replicas is equal to the number requested and that the replicas match the current spec
449     # This check can be used to verify completion of an initial deployment, a scale operation, or an update operation
450     return dep_status.status.available_replicas == dep_status.spec.replicas and dep_status.status.updated_replicas == dep_status.spec.replicas
451
452 def scale(deployment_description, replicas):
453     ''' Trigger a scaling operation by updating the replica count for the Deployment '''
454
455     def update_replica_count(spec):
456         spec.spec.replicas = replicas
457         return spec
458
459     _patch_deployment(deployment_description["namespace"], deployment_description["deployment"], update_replica_count)
460
461 def upgrade(deployment_description, image, container_index = 0):
462     ''' Trigger a rolling upgrade by sending a new image name/tag to k8s '''
463
464     def update_image(spec):
465         spec.spec.template.spec.containers[container_index].image = image
466         return spec
467
468     _patch_deployment(deployment_description["namespace"], deployment_description["deployment"], update_image)
469
470 def rollback(deployment_description, rollback_to=0):
471     '''
472     Undo upgrade by rolling back to a previous revision of the deployment.
473     By default, go back one revision.
474     rollback_to can be used to supply a specific revision number.
475     Returns the image for the app container and the replica count from the rolled-back deployment
476     '''
477     '''
478     2018-07-13
479     Currently this does not work due to a bug in the create_namespaced_deployment_rollback() method.
480     The k8s python client code throws an exception while processing the response from the API.
481     See:
482        - https://github.com/kubernetes-client/python/issues/491
483        - https://github.com/kubernetes/kubernetes/pull/63837
484     The fix has been merged into the master branch but is not in the latest release.
485     '''
486     _configure_api()
487     deployment = deployment_description["deployment"]
488     namespace = deployment_description["namespace"]
489
490     # Initiate the rollback
491     client.ExtensionsV1beta1Api().create_namespaced_deployment_rollback(
492         deployment,
493         namespace,
494         client.AppsV1beta1DeploymentRollback(name=deployment, rollback_to=client.AppsV1beta1RollbackConfig(revision=rollback_to)))
495
496     # Read back the spec for the rolled-back deployment
497     spec = client.ExtensionsV1beta1Api().read_namespaced_deployment(deployment, namespace)
498     return spec.spec.template.spec.containers[0].image, spec.spec.replicas
499
500 def execute_command_in_deployment(deployment_description, command):
501     '''
502     Enumerates the pods in the k8s deployment identified by "deployment_description",
503     then executes the command (represented as an argv-style list) in "command" in
504     container 0 (the main application container) each of those pods.
505
506     Note that the sets of pods associated with a deployment can change over time.  The
507     enumeration is a snapshot at one point in time.  The command will not be executed in
508     pods that are created after the initial enumeration.   If a pod disappears after the
509     initial enumeration and before the command is executed, the attempt to execute the
510     command will fail.  This is not treated as a fatal error.
511
512     This approach is reasonable for the one current use case for "execute_command":  running a
513     script to notify a container that its configuration has changed as a result of a
514     policy change.  In this use case, the new configuration information is stored into
515     the configuration store (Consul), the pods are enumerated, and the command is executed.
516     If a pod disappears after the enumeration, the fact that the command cannot be run
517     doesn't matter--a nonexistent pod doesn't need to be reconfigured.  Similarly, a pod that
518     comes up after the enumeration will get its initial configuration from the updated version
519     in Consul.
520
521     The optimal solution here would be for k8s to provide an API call to execute a command in
522     all of the pods for a deployment.   Unfortunately, k8s does not provide such a call--the
523     only call provided by k8s operates at the pod level, not the deployment level.
524
525     Another interesting k8s factoid: there's no direct way to list the pods belong to a
526     particular k8s deployment.   The deployment code above sets a label ("k8sdeployment") on
527     the pod that has the k8s deployment name.  To list the pods, the code below queries for
528     pods with the label carrying the deployment name.
529     '''
530
531     _configure_api()
532     deployment = deployment_description["deployment"]
533     namespace = deployment_description["namespace"]
534
535     # Get names of all the running pods belonging to the deployment
536     pod_names = [pod.metadata.name for pod in client.CoreV1Api().list_namespaced_pod(
537         namespace = namespace,
538         label_selector = "k8sdeployment={0}".format(deployment),
539         field_selector = "status.phase=Running"
540     ).items]
541
542     def do_execute(pod_name):
543         return _execute_command_in_pod(namespace, pod_name, command)
544
545     # Execute command in the running pods
546     return map(do_execute, pod_names)