7ca7b031dfaac7f7ae32650793a5b849cb401ba7
[dcaegen2/platform/plugins.git] / k8s / k8sclient / k8sclient.py
1 # ============LICENSE_START=======================================================
2 # org.onap.dcae
3 # ================================================================================
4 # Copyright (c) 2018 AT&T Intellectual Property. All rights reserved.
5 # ================================================================================
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 #      http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
17 # ============LICENSE_END=========================================================
18 #
19 # ECOMP is a trademark and service mark of AT&T Intellectual Property.
20 import os
21 import uuid
22 from msb import msb
23 from kubernetes import config, client
24
25 # Default values for readiness probe
26 PROBE_DEFAULT_PERIOD = 15
27 PROBE_DEFAULT_TIMEOUT = 1
28
29 def _create_deployment_name(component_name):
30     return "dep-{0}".format(component_name)
31
32 def _create_service_name(component_name):
33     return "{0}".format(component_name)
34
35 def _create_exposed_service_name(component_name):
36     return ("x{0}".format(component_name))[:63]
37
38 def _configure_api():
39     # Look for a kubernetes config file in ~/.kube/config
40     kubepath = os.path.join(os.environ["HOME"], '.kube/config')
41     if os.path.exists(kubepath):
42         config.load_kube_config(kubepath)
43     else:
44         # Maybe we're running in a k8s container and we can use info provided by k8s
45         # We would like to use:
46         # config.load_incluster_config()
47         # but this looks into os.environ for kubernetes host and port, and from
48         # the plugin those aren't visible.   So we use the InClusterConfigLoader class,
49         # where we can set the environment to what we like.
50         # This is probably brittle!  Maybe there's a better alternative.
51         localenv = {
52             config.incluster_config.SERVICE_HOST_ENV_NAME : "kubernetes.default.svc.cluster.local",
53             config.incluster_config.SERVICE_PORT_ENV_NAME : "443"
54         }
55         config.incluster_config.InClusterConfigLoader(
56             token_filename=config.incluster_config.SERVICE_TOKEN_FILENAME,
57             cert_filename=config.incluster_config.SERVICE_CERT_FILENAME,
58             environ=localenv
59         ).load_and_set()
60
61 def _create_probe(hc, port):
62     ''' Create a Kubernetes probe based on info in the health check dictionary hc '''
63     probe_type = hc['type']
64     probe = None
65     period = hc.get('interval', PROBE_DEFAULT_PERIOD)
66     timeout = hc.get('timeout', PROBE_DEFAULT_TIMEOUT)
67     if probe_type == 'http' or probe_type == 'https':
68         probe = client.V1Probe(
69           failure_threshold = 1,
70           initial_delay_seconds = 5,
71           period_seconds = period,
72           timeout_seconds = timeout,
73           http_get = client.V1HTTPGetAction(
74               path = hc['endpoint'],
75               port = port,
76               scheme = probe_type.upper()
77           )  
78         )
79     elif probe_type == 'script' or probe_type == 'docker':
80         probe = client.V1Probe(
81           failure_threshold = 1,
82           initial_delay_seconds = 5,
83           period_seconds = period,
84           timeout_seconds = timeout,
85           _exec = client.V1ExecAction(
86               command = [hc['script']]
87           )  
88         )
89     return probe        
90
91 def _create_container_object(name, image, always_pull, env={}, container_ports=[], volume_mounts = [], readiness = None):
92     # Set up environment variables
93     # Copy any passed in environment variables
94     env_vars = [client.V1EnvVar(name=k, value=env[k]) for k in env.keys()]
95     # Add POD_IP with the IP address of the pod running the container
96     pod_ip = client.V1EnvVarSource(field_ref = client.V1ObjectFieldSelector(field_path="status.podIP"))
97     env_vars.append(client.V1EnvVar(name="POD_IP",value_from=pod_ip))
98
99     # If a health check is specified, create a readiness probe
100     # (For an HTTP-based check, we assume it's at the first container port)
101     probe = None
102    
103     if (readiness):    
104         hc_port = None
105         if len(container_ports) > 0:
106             hc_port = container_ports[0]
107         probe = _create_probe(readiness, hc_port)
108
109     # Define container for pod
110     return client.V1Container(
111         name=name,
112         image=image,
113         image_pull_policy='Always' if always_pull else 'IfNotPresent',
114         env=env_vars,
115         ports=[client.V1ContainerPort(container_port=p) for p in container_ports],
116         volume_mounts = volume_mounts,
117         readiness_probe = probe
118     )
119
120 def _create_deployment_object(component_name,
121                               containers,
122                               replicas,
123                               volumes,
124                               labels, 
125                               pull_secrets=[]):
126
127     # pull_secrets is a list of the names of the k8s secrets containing docker registry credentials
128     # See https://kubernetes.io/docs/concepts/containers/images/#specifying-imagepullsecrets-on-a-pod
129     ips = []
130     for secret in pull_secrets:
131         ips.append(client.V1LocalObjectReference(name=secret))
132
133     # Define pod template
134     template = client.V1PodTemplateSpec(
135         metadata=client.V1ObjectMeta(labels=labels),
136         spec=client.V1PodSpec(hostname=component_name,
137                               containers=containers,
138                               volumes=volumes,
139                               image_pull_secrets=ips)
140     )
141
142     # Define deployment spec
143     spec = client.ExtensionsV1beta1DeploymentSpec(
144         replicas=replicas,
145         template=template
146     )
147     
148     # Create deployment object
149     deployment = client.ExtensionsV1beta1Deployment(
150         kind="Deployment",
151         metadata=client.V1ObjectMeta(name=_create_deployment_name(component_name)),
152         spec=spec
153     )
154     
155     return deployment
156
157 def _create_service_object(service_name, component_name, service_ports, annotations, labels, service_type):
158     service_spec = client.V1ServiceSpec(
159         ports=service_ports,
160         selector={"app" : component_name},
161         type=service_type
162     )
163     if annotations:
164         metadata = client.V1ObjectMeta(name=_create_service_name(service_name), labels=labels, annotations=annotations)
165     else:
166         metadata = client.V1ObjectMeta(name=_create_service_name(service_name), labels=labels)
167
168     service = client.V1Service(
169         kind="Service",
170         api_version="v1",
171         metadata=metadata,
172         spec=service_spec
173     )
174     return service
175
176 def _parse_ports(port_list):
177     container_ports = []
178     port_map = {}
179     for p in port_list:
180         try:
181             [container, host] = (p.strip()).split(":",2)
182             cport = int(container)
183             container_ports.append(cport)
184             hport = int(host)
185             port_map[container] = hport
186         except:
187             pass    # if something doesn't parse, we just ignore it
188         
189     return container_ports, port_map
190
191 def _parse_volumes(volume_list):
192     volumes = []
193     volume_mounts = []
194     for v in volume_list:
195         vname = str(uuid.uuid4())
196         vhost = v['host']['path']
197         vcontainer = v['container']['bind']
198         vro = (v['container']['mode'] == 'ro')
199         volumes.append(client.V1Volume(name=vname, host_path=client.V1HostPathVolumeSource(path=vhost)))
200         volume_mounts.append(client.V1VolumeMount(name=vname, mount_path=vcontainer, read_only=vro))
201             
202     return volumes, volume_mounts
203
204 def _service_exists(namespace, component_name):
205     exists = False
206     try:
207         _configure_api()
208         client.CoreV1Api().read_namespaced_service(_create_service_name(component_name), namespace)
209         exists = True
210     except client.rest.ApiException:
211         pass
212     
213     return exists
214
215 def deploy(namespace, component_name, image, replicas, always_pull, k8sconfig, **kwargs):
216     '''
217     This will create a k8s Deployment and, if needed, one or two k8s Services.
218     (We are being opinionated in our use of k8s... this code decides what k8s abstractions and features to use.
219     We're not exposing k8s to the component developer and the blueprint author.
220     This is a conscious choice.  We want to use k8s in a controlled, consistent way, and we want to hide
221     the details from the component developer and the blueprint author.)
222     
223     namespace:  the Kubernetes namespace into which the component is deployed
224     component_name:  the component name, used to derive names of Kubernetes entities
225     image: the docker image for the component being deployed
226     replica: the number of instances of the component to be deployed
227     always_pull: boolean flag, indicating that Kubernetes should always pull a new copy of
228        the Docker image for the component, even if it is already present on the Kubernetes node.
229     k8sconfig contains:
230         - image_pull_secrets: a list of names of image pull secrets that can be used for retrieving images.
231           (DON'T PANIC:  these are just the names of secrets held in the Kubernetes secret store.)
232         - filebeat: a dictionary of filebeat sidecar parameters:
233             "log_path" : mount point for log volume in filebeat container
234             "data_path" : mount point for data volume in filebeat container
235             "config_path" : mount point for config volume in filebeat container
236             "config_subpath" :  subpath for config data in filebeat container
237             "config_map" : ConfigMap holding the filebeat configuration
238             "image": Docker image to use for filebeat
239     kwargs may have:
240         - volumes:  array of volume objects, where a volume object is:
241             {"host":{"path": "/path/on/host"}, "container":{"bind":"/path/on/container","mode":"rw_or_ro"}
242         - ports: array of strings in the form "container_port:host_port"
243         - env: map of name-value pairs ( {name0: value0, name1: value1...}
244         - msb_list: array of msb objects, where an msb object is as described in msb/msb.py.
245         - log_info: an object with info for setting up ELK logging, with the form:
246             {"log_directory": "/path/to/container/log/directory", "alternate_fb_path" : "/alternate/sidecar/log/path"}
247         - labels: dict with label-name/label-value pairs, e.g. {"cfydeployment" : "lsdfkladflksdfsjkl", "cfynode":"mycomponent"}
248             These label will be set on all the pods deployed as a result of this deploy() invocation.
249         - readiness: dict with health check info; if present, used to create a readiness probe for the main container.  Includes:
250             - type: check is done by making http(s) request to an endpoint ("http", "https") or by exec'ing a script in the container ("script", "docker")
251             - interval: period (in seconds) between probes
252             - timeout:  time (in seconds) to allow a probe to complete
253             - endpoint: the path portion of the URL that points to the readiness endpoint for "http" and "https" types
254             - path: the full path to the script to be executed in the container for "script" and "docker" types
255
256     '''
257
258     deployment_ok = False
259     cip_service_created = False
260     deployment_description = {
261         "namespace": namespace,
262         "deployment": '',
263         "services": []
264     }
265
266     try:
267         _configure_api()
268
269         # Get API handles
270         core = client.CoreV1Api()
271         ext = client.ExtensionsV1beta1Api()
272
273         # Parse the port mapping into [container_port,...] and [{"host_port" : "container_port"},...]
274         container_ports, port_map = _parse_ports(kwargs.get("ports", []))
275
276         # Parse the volumes list into volumes and volume_mounts for the deployment
277         volumes, volume_mounts = _parse_volumes(kwargs.get("volumes",[]))
278
279         # Initialize the list of containers that will be part of the pod
280         containers = []
281
282         # Set up the ELK logging sidecar container, if needed
283         log_info = kwargs.get("log_info")
284         if log_info and "log_directory" in log_info:
285             log_dir = log_info["log_directory"]
286             fb = k8sconfig["filebeat"]
287             sidecar_volume_mounts = []
288
289             # Create the volume for component log files and volume mounts for the component and sidecar containers
290             volumes.append(client.V1Volume(name="component-log", empty_dir=client.V1EmptyDirVolumeSource()))
291             volume_mounts.append(client.V1VolumeMount(name="component-log", mount_path=log_dir))
292             sc_path = log_info["alternate_fb_path"] if "alternate_fb_path" in log_info  \
293                 else "{0}/{1}".format(fb["log_path"], component_name)
294             sidecar_volume_mounts.append(client.V1VolumeMount(name="component-log", mount_path=sc_path))
295
296             # Create the volume for sidecar data and the volume mount for it
297             volumes.append(client.V1Volume(name="filebeat-data", empty_dir=client.V1EmptyDirVolumeSource()))
298             sidecar_volume_mounts.append(client.V1VolumeMount(name="filebeat-data", mount_path=fb["data_path"]))
299
300             # Create the container for the sidecar
301             containers.append(_create_container_object("filebeat", fb["image"], False, {}, [], sidecar_volume_mounts))
302
303             # Create the volume for the sidecar configuration data and the volume mount for it
304             # The configuration data is in a k8s ConfigMap that should be created when DCAE is installed.
305             volumes.append(
306                 client.V1Volume(name="filebeat-conf", config_map=client.V1ConfigMapVolumeSource(name=fb["config_map"])))
307             sidecar_volume_mounts.append(
308                 client.V1VolumeMount(name="filebeat-conf", mount_path=fb["config_path"], sub_path=fb["config_subpath"]))
309
310         # Create the container for the component
311         # Make it the first container in the pod
312         containers.insert(0, _create_container_object(component_name, image, always_pull, kwargs.get("env", {}), container_ports, volume_mounts, kwargs["readiness"]))
313
314         # Build the k8s Deployment object
315         labels = kwargs.get("labels", {})
316         labels.update({"app": component_name})
317         dep = _create_deployment_object(component_name, containers, replicas, volumes, labels, pull_secrets=k8sconfig["image_pull_secrets"])
318
319         # Have k8s deploy it
320         ext.create_namespaced_deployment(namespace, dep)
321         deployment_ok = True
322         deployment_description["deployment"] = _create_deployment_name(component_name)
323
324         # Create service(s), if a port mapping is specified
325         if port_map:
326             service_ports = []      # Ports exposed internally on the k8s network
327             exposed_ports = []      # Ports to be mapped to ports on the k8s nodes via NodePort
328             for cport, hport in port_map.iteritems():
329                 service_ports.append(client.V1ServicePort(port=int(cport),name="port-{}".format(cport)))
330                 if int(hport) != 0:
331                     exposed_ports.append(client.V1ServicePort(port=int(cport), node_port=int(hport),name="xport-{}".format(cport)))
332
333             # If there are ports to be exposed via MSB, set up the annotation for the service
334             msb_list = kwargs.get("msb_list")
335             annotations = msb.create_msb_annotation(msb_list) if msb_list else ''
336
337             # Create a ClusterIP service for access via the k8s network
338             service = _create_service_object(_create_service_name(component_name), component_name, service_ports, annotations, labels, "ClusterIP")
339             core.create_namespaced_service(namespace, service)
340             cip_service_created = True
341             deployment_description["services"].append(_create_service_name(component_name))
342
343             # If there are ports to be exposed on the k8s nodes, create a "NodePort" service
344             if len(exposed_ports) > 0:
345                 exposed_service = \
346                     _create_service_object(_create_exposed_service_name(component_name), component_name, exposed_ports, '', labels, "NodePort")
347                 core.create_namespaced_service(namespace, exposed_service)
348                 deployment_description["services"].append(_create_exposed_service_name(component_name))
349
350     except Exception as e:
351         # If the ClusterIP service was created, delete the service:
352         if cip_service_created:
353             core.delete_namespaced_service(_create_service_name(component_name), namespace)
354         # If the deployment was created but not the service, delete the deployment
355         if deployment_ok:
356             client.ExtensionsV1beta1Api().delete_namespaced_deployment(_create_deployment_name(component_name), namespace, client.V1DeleteOptions())
357         raise e
358
359     return dep, deployment_description
360
361 def undeploy(deployment_description):
362     # TODO: do real configuration
363     _configure_api()
364
365     namespace = deployment_description["namespace"]
366     
367     # remove any services associated with the component
368     for service in deployment_description["services"]:
369         client.CoreV1Api().delete_namespaced_service(service, namespace)
370
371     # Have k8s delete the underlying pods and replicaset when deleting the deployment.
372     options = client.V1DeleteOptions(propagation_policy="Foreground")
373     client.ExtensionsV1beta1Api().delete_namespaced_deployment(deployment_description["deployment"], namespace, options)
374
375 def is_available(namespace, component_name):
376     _configure_api()
377     dep_status = client.AppsV1beta1Api().read_namespaced_deployment_status(_create_deployment_name(component_name), namespace)
378     # Check if the number of available replicas is equal to the number requested
379     return dep_status.status.available_replicas >= dep_status.spec.replicas
380
381 def scale(deployment_description, replicas):
382     # TODO: do real configuration
383     _configure_api()
384
385     namespace = deployment_description["namespace"]
386     name = deployment_description["deployment"]
387
388     # Get deployment spec
389     spec = client.ExtensionsV1beta1Api().read_namespaced_deployment(name, namespace)
390
391     # Update the replica count in the spec
392     spec.spec.replicas = replicas
393     client.ExtensionsV1beta1Api().patch_namespaced_deployment(name, namespace, spec)
394