Add resource_config to specify CPU and menory
[dcaegen2/platform/plugins.git] / k8s / k8sclient / k8sclient.py
1 # ============LICENSE_START=======================================================
2 # org.onap.dcae
3 # ================================================================================
4 # Copyright (c) 2019 AT&T Intellectual Property. All rights reserved.
5 # ================================================================================
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 #      http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
17 # ============LICENSE_END=========================================================
18 #
19 # ECOMP is a trademark and service mark of AT&T Intellectual Property.
20 import os
21 import re
22 import uuid
23 from msb import msb
24 from kubernetes import config, client, stream
25
26 # Default values for readiness probe
27 PROBE_DEFAULT_PERIOD = 15
28 PROBE_DEFAULT_TIMEOUT = 1
29
30 # Regular expression for interval/timeout specification
31 INTERVAL_SPEC = re.compile("^([0-9]+)(s|m|h)?$")
32 # Conversion factors to seconds
33 FACTORS = {None: 1, "s": 1, "m": 60, "h": 3600}
34
35 # Regular expression for port mapping
36 # group 1: container port
37 # group 2: / + protocol
38 # group 3: protocol
39 # group 4: host port
40 PORTS = re.compile("^([0-9]+)(/(udp|UDP|tcp|TCP))?:([0-9]+)$")
41    
42 def _create_deployment_name(component_name):
43     return "dep-{0}".format(component_name)
44
45 def _create_service_name(component_name):
46     return "{0}".format(component_name)
47
48 def _create_exposed_service_name(component_name):
49     return ("x{0}".format(component_name))[:63]
50
51 def _configure_api():
52     # Look for a kubernetes config file in ~/.kube/config
53     kubepath = os.path.join(os.environ["HOME"], '.kube/config')
54     if os.path.exists(kubepath):
55         config.load_kube_config(kubepath)
56     else:
57         # Maybe we're running in a k8s container and we can use info provided by k8s
58         # We would like to use:
59         # config.load_incluster_config()
60         # but this looks into os.environ for kubernetes host and port, and from
61         # the plugin those aren't visible.   So we use the InClusterConfigLoader class,
62         # where we can set the environment to what we like.
63         # This is probably brittle!  Maybe there's a better alternative.
64         localenv = {
65             config.incluster_config.SERVICE_HOST_ENV_NAME : "kubernetes.default.svc.cluster.local",
66             config.incluster_config.SERVICE_PORT_ENV_NAME : "443"
67         }
68         config.incluster_config.InClusterConfigLoader(
69             token_filename=config.incluster_config.SERVICE_TOKEN_FILENAME,
70             cert_filename=config.incluster_config.SERVICE_CERT_FILENAME,
71             environ=localenv
72         ).load_and_set()
73
74 def _parse_interval(t):
75     """
76     Parse an interval specification
77     t can be
78        - a simple integer quantity, interpreted as seconds
79        - a string representation of a decimal integer, interpreted as seconds
80        - a string consisting of a represention of an decimal integer followed by a unit,
81          with "s" representing seconds, "m" representing minutes,
82          and "h" representing hours
83     Used for compatibility with the Docker plugin, where time intervals
84     for health checks were specified as strings with a number and a unit.
85     See 'intervalspec' above for the regular expression that's accepted.
86     """
87     m = INTERVAL_SPEC.match(str(t))
88     if m:
89         time = int(m.group(1)) * FACTORS[m.group(2)]
90     else:
91         raise ValueError("Bad interval specification: {0}".format(t))
92     return time
93
94 def _create_probe(hc, port, use_tls=False):
95     ''' Create a Kubernetes probe based on info in the health check dictionary hc '''
96     probe_type = hc['type']
97     probe = None
98     period = _parse_interval(hc.get('interval', PROBE_DEFAULT_PERIOD))
99     timeout = _parse_interval(hc.get('timeout', PROBE_DEFAULT_TIMEOUT))
100     if probe_type in ['http', 'https']:
101         probe = client.V1Probe(
102           failure_threshold = 1,
103           initial_delay_seconds = 5,
104           period_seconds = period,
105           timeout_seconds = timeout,
106           http_get = client.V1HTTPGetAction(
107               path = hc['endpoint'],
108               port = port,
109               scheme = probe_type.upper()
110           )
111         )
112     elif probe_type in ['script', 'docker']:
113         probe = client.V1Probe(
114           failure_threshold = 1,
115           initial_delay_seconds = 5,
116           period_seconds = period,
117           timeout_seconds = timeout,
118           _exec = client.V1ExecAction(
119               command = [hc['script']]
120           )
121         )
122     return probe
123
124 def _create_resources(resources=None):
125     if resources is not None:
126         resources_obj = client.V1ResourceRequirements(
127           limits = resources.get("limits"),
128           requests = resources.get("requests")
129         )   
130         return resources_obj
131     else:
132         return None
133
134 def _create_container_object(name, image, always_pull, use_tls=False, env={}, container_ports=[], volume_mounts = [], resources = None, readiness = None):
135     # Set up environment variables
136     # Copy any passed in environment variables
137     env_vars = [client.V1EnvVar(name=k, value=env[k]) for k in env.keys()]
138     # Add POD_IP with the IP address of the pod running the container
139     pod_ip = client.V1EnvVarSource(field_ref = client.V1ObjectFieldSelector(field_path="status.podIP"))
140     env_vars.append(client.V1EnvVar(name="POD_IP",value_from=pod_ip))
141
142     # If a health check is specified, create a readiness probe
143     # (For an HTTP-based check, we assume it's at the first container port)
144     probe = None
145
146     if readiness:
147         hc_port = None
148         if len(container_ports) > 0:
149             (hc_port, proto) = container_ports[0]
150         probe = _create_probe(readiness, hc_port, use_tls)
151
152     if resources:
153         resources_obj = _create_resources(resources)
154     else:
155         resources_obj = None
156     # Define container for pod
157     return client.V1Container(
158         name=name,
159         image=image,
160         image_pull_policy='Always' if always_pull else 'IfNotPresent',
161         env=env_vars,
162         ports=[client.V1ContainerPort(container_port=p, protocol=proto) for (p, proto) in container_ports],
163         volume_mounts = volume_mounts,
164         resources = resources_obj,
165         readiness_probe = probe
166     )
167
168 def _create_deployment_object(component_name,
169                               containers,
170                               init_containers,
171                               replicas,
172                               volumes,
173                               labels={},
174                               pull_secrets=[]):
175
176     deployment_name = _create_deployment_name(component_name)
177
178     # Label the pod with the deployment name, so we can find it easily
179     labels.update({"k8sdeployment" : deployment_name})
180
181     # pull_secrets is a list of the names of the k8s secrets containing docker registry credentials
182     # See https://kubernetes.io/docs/concepts/containers/images/#specifying-imagepullsecrets-on-a-pod
183     ips = []
184     for secret in pull_secrets:
185         ips.append(client.V1LocalObjectReference(name=secret))
186
187     # Define pod template
188     template = client.V1PodTemplateSpec(
189         metadata=client.V1ObjectMeta(labels=labels),
190         spec=client.V1PodSpec(hostname=component_name,
191                               containers=containers,
192                               init_containers=init_containers,
193                               volumes=volumes,
194                               image_pull_secrets=ips)
195     )
196
197     # Define deployment spec
198     spec = client.ExtensionsV1beta1DeploymentSpec(
199         replicas=replicas,
200         template=template
201     )
202
203     # Create deployment object
204     deployment = client.ExtensionsV1beta1Deployment(
205         kind="Deployment",
206         metadata=client.V1ObjectMeta(name=deployment_name),
207         spec=spec
208     )
209
210     return deployment
211
212 def _create_service_object(service_name, component_name, service_ports, annotations, labels, service_type):
213     service_spec = client.V1ServiceSpec(
214         ports=service_ports,
215         selector={"app" : component_name},
216         type=service_type
217     )
218     if annotations:
219         metadata = client.V1ObjectMeta(name=_create_service_name(service_name), labels=labels, annotations=annotations)
220     else:
221         metadata = client.V1ObjectMeta(name=_create_service_name(service_name), labels=labels)
222
223     service = client.V1Service(
224         kind="Service",
225         api_version="v1",
226         metadata=metadata,
227         spec=service_spec
228     )
229     return service
230
231 def _parse_ports(port_list):
232     '''
233     Parse the port list into a list of container ports (needed to create the container)
234     and to a set of port mappings to set up k8s services.
235     '''
236     container_ports = []
237     port_map = {}
238     for p in port_list:
239         m = PORTS.match(p.strip())
240         if m:
241             cport = int(m.group(1))
242             hport = int (m.group(4))
243             if m.group(3):
244                 proto = (m.group(3)).upper()
245             else:
246                 proto = "TCP"
247             container_ports.append((cport, proto))
248             port_map[(cport, proto)] = hport
249         else:
250             raise ValueError("Bad port specification: {0}".format(p))
251
252     return container_ports, port_map
253
254 def _parse_volumes(volume_list):
255     volumes = []
256     volume_mounts = []
257     for v in volume_list:
258         vname = str(uuid.uuid4())
259         vhost = v['host']['path']
260         vcontainer = v['container']['bind']
261         vro = (v['container']['mode'] == 'ro')
262         volumes.append(client.V1Volume(name=vname, host_path=client.V1HostPathVolumeSource(path=vhost)))
263         volume_mounts.append(client.V1VolumeMount(name=vname, mount_path=vcontainer, read_only=vro))
264
265     return volumes, volume_mounts
266
267 def _service_exists(namespace, component_name):
268     exists = False
269     try:
270         _configure_api()
271         client.CoreV1Api().read_namespaced_service(_create_service_name(component_name), namespace)
272         exists = True
273     except client.rest.ApiException:
274         pass
275
276     return exists
277
278 def _patch_deployment(namespace, deployment, modify):
279     '''
280     Gets the current spec for 'deployment' in 'namespace',
281     uses the 'modify' function to change the spec,
282     then sends the updated spec to k8s.
283     '''
284     _configure_api()
285
286     # Get deployment spec
287     spec = client.ExtensionsV1beta1Api().read_namespaced_deployment(deployment, namespace)
288
289     # Apply changes to spec
290     spec = modify(spec)
291
292     # Patch the deploy with updated spec
293     client.ExtensionsV1beta1Api().patch_namespaced_deployment(deployment, namespace, spec)
294
295 def _execute_command_in_pod(namespace, pod_name, command):
296     '''
297     Execute the command (specified by an argv-style list in  the "command" parameter) in
298     the specified pod in the specified namespace.  For now at least, we use this only to
299     run a notification script in a pod after a configuration change.
300
301     The straightforward way to do this is with the V1 Core API function "connect_get_namespaced_pod_exec".
302     Unfortunately due to a bug/limitation in the Python client library, we can't call it directly.
303     We have to make the API call through a Websocket connection using the kubernetes.stream wrapper API.
304     I'm following the example code at https://github.com/kubernetes-client/python/blob/master/examples/exec.py.
305     There are several issues tracking this, in various states.  It isn't clear that there will ever
306     be a fix.
307         - https://github.com/kubernetes-client/python/issues/58
308         - https://github.com/kubernetes-client/python/issues/409
309         - https://github.com/kubernetes-client/python/issues/526
310
311     The main consequence of the workaround using "stream" is that the caller does not get an indication
312     of the exit code returned by the command when it completes execution.   It turns out that the
313     original implementation of notification in the Docker plugin did not use this result, so we can
314     still match the original notification functionality.
315
316     The "stream" approach returns a string containing any output sent by the command to stdout or stderr.
317     We'll return that so it can logged.
318     '''
319     _configure_api()
320     try:
321         output = stream.stream(client.CoreV1Api().connect_get_namespaced_pod_exec,
322                              name=pod_name,
323                              namespace=namespace,
324                              command=command,
325                              stdout=True,
326                              stderr=True,
327                             stdin=False,
328                             tty=False)
329     except client.rest.ApiException as e:
330         # If the exception indicates the pod wasn't found,  it's not a fatal error.
331         # It existed when we enumerated the pods for the deployment but no longer exists.
332         # Unfortunately, the only way to distinguish a pod not found from any other error
333         # is by looking at the reason text.
334         # (The ApiException's "status" field should contain the HTTP status code, which would
335         # be 404 if the pod isn't found, but empirical testing reveals that "status" is set
336         # to zero.)
337         if "404 not found" in e.reason.lower():
338             output = "Pod not found"
339         else:
340             raise e
341
342     return {"pod" : pod_name, "output" : output}
343
344 def deploy(namespace, component_name, image, replicas, always_pull, k8sconfig, resources, **kwargs):
345     '''
346     This will create a k8s Deployment and, if needed, one or two k8s Services.
347     (We are being opinionated in our use of k8s... this code decides what k8s abstractions and features to use.
348     We're not exposing k8s to the component developer and the blueprint author.
349     This is a conscious choice.  We want to use k8s in a controlled, consistent way, and we want to hide
350     the details from the component developer and the blueprint author.)
351
352     namespace:  the Kubernetes namespace into which the component is deployed
353     component_name:  the component name, used to derive names of Kubernetes entities
354     image: the docker image for the component being deployed
355     replica: the number of instances of the component to be deployed
356     always_pull: boolean flag, indicating that Kubernetes should always pull a new copy of
357        the Docker image for the component, even if it is already present on the Kubernetes node.
358     k8sconfig contains:
359         - image_pull_secrets: a list of names of image pull secrets that can be used for retrieving images.
360           (DON'T PANIC:  these are just the names of secrets held in the Kubernetes secret store.)
361         - filebeat: a dictionary of filebeat sidecar parameters:
362             "log_path" : mount point for log volume in filebeat container
363             "data_path" : mount point for data volume in filebeat container
364             "config_path" : mount point for config volume in filebeat container
365             "config_subpath" :  subpath for config data in filebeat container
366             "config_map" : ConfigMap holding the filebeat configuration
367             "image": Docker image to use for filebeat
368         - tls: a dictionary of TLS init container parameters:
369             "cert_path": mount point for certificate volume in init container
370             "image": Docker image to use for TLS init container
371     kwargs may have:
372         - volumes:  array of volume objects, where a volume object is:
373             {"host":{"path": "/path/on/host"}, "container":{"bind":"/path/on/container","mode":"rw_or_ro"}
374         - ports: array of strings in the form "container_port:host_port"
375         - env: map of name-value pairs ( {name0: value0, name1: value1...}
376         - msb_list: array of msb objects, where an msb object is as described in msb/msb.py.
377         - log_info: an object with info for setting up ELK logging, with the form:
378             {"log_directory": "/path/to/container/log/directory", "alternate_fb_path" : "/alternate/sidecar/log/path"}
379         - tls_info: an object with info for setting up TLS (HTTPS), with the form:
380             {"use_tls": true, "cert_directory": "/path/to/container/cert/directory" }
381         - labels: dict with label-name/label-value pairs, e.g. {"cfydeployment" : "lsdfkladflksdfsjkl", "cfynode":"mycomponent"}
382             These label will be set on all the pods deployed as a result of this deploy() invocation.
383         - readiness: dict with health check info; if present, used to create a readiness probe for the main container.  Includes:
384             - type: check is done by making http(s) request to an endpoint ("http", "https") or by exec'ing a script in the container ("script", "docker")
385             - interval: period (in seconds) between probes
386             - timeout:  time (in seconds) to allow a probe to complete
387             - endpoint: the path portion of the URL that points to the readiness endpoint for "http" and "https" types
388             - path: the full path to the script to be executed in the container for "script" and "docker" types
389
390     '''
391
392     deployment_ok = False
393     cip_service_created = False
394     deployment_description = {
395         "namespace": namespace,
396         "deployment": '',
397         "services": []
398     }
399
400     try:
401         _configure_api()
402
403         # Get API handles
404         core = client.CoreV1Api()
405         ext = client.ExtensionsV1beta1Api()
406
407         # Parse the port mapping
408         container_ports, port_map = _parse_ports(kwargs.get("ports", []))
409
410         # Parse the volumes list into volumes and volume_mounts for the deployment
411         volumes, volume_mounts = _parse_volumes(kwargs.get("volumes",[]))
412
413         # Initialize the list of containers that will be part of the pod
414         containers = []
415         init_containers = []
416
417         # Set up the ELK logging sidecar container, if needed
418         log_info = kwargs.get("log_info")
419         if log_info and "log_directory" in log_info:
420             log_dir = log_info["log_directory"]
421             fb = k8sconfig["filebeat"]
422             sidecar_volume_mounts = []
423
424             # Create the volume for component log files and volume mounts for the component and sidecar containers
425             volumes.append(client.V1Volume(name="component-log", empty_dir=client.V1EmptyDirVolumeSource()))
426             volume_mounts.append(client.V1VolumeMount(name="component-log", mount_path=log_dir))
427             sc_path = log_info["alternate_fb_path"] if "alternate_fb_path" in log_info  \
428                 else "{0}/{1}".format(fb["log_path"], component_name)
429             sidecar_volume_mounts.append(client.V1VolumeMount(name="component-log", mount_path=sc_path))
430
431             # Create the volume for sidecar data and the volume mount for it
432             volumes.append(client.V1Volume(name="filebeat-data", empty_dir=client.V1EmptyDirVolumeSource()))
433             sidecar_volume_mounts.append(client.V1VolumeMount(name="filebeat-data", mount_path=fb["data_path"]))
434
435             # Create the container for the sidecar
436             containers.append(_create_container_object("filebeat", fb["image"], False, False, {}, [], sidecar_volume_mounts))
437
438             # Create the volume for the sidecar configuration data and the volume mount for it
439             # The configuration data is in a k8s ConfigMap that should be created when DCAE is installed.
440             volumes.append(
441                 client.V1Volume(name="filebeat-conf", config_map=client.V1ConfigMapVolumeSource(name=fb["config_map"])))
442             sidecar_volume_mounts.append(
443                 client.V1VolumeMount(name="filebeat-conf", mount_path=fb["config_path"], sub_path=fb["config_subpath"]))
444
445         # Set up the TLS init container, if needed
446         tls_info = kwargs.get("tls_info")
447         use_tls = False
448         if tls_info and "use_tls" in tls_info and tls_info["use_tls"]:
449             if "cert_directory" in tls_info and len(tls_info["cert_directory"]) > 0:
450                 use_tls = True
451                 tls_config = k8sconfig["tls"]
452
453                 # Create the certificate volume and volume mounts
454                 volumes.append(client.V1Volume(name="tls-info", empty_dir=client.V1EmptyDirVolumeSource()))
455                 volume_mounts.append(client.V1VolumeMount(name="tls-info", mount_path=tls_info["cert_directory"]))
456                 init_volume_mounts = [client.V1VolumeMount(name="tls-info", mount_path=tls_config["cert_path"])]
457
458                 # Create the init container
459                 init_containers.append(_create_container_object("init-tls", tls_config["image"], False, False, {}, [], init_volume_mounts))
460
461         # Create the container for the component
462         # Make it the first container in the pod
463         containers.insert(0, _create_container_object(component_name, image, always_pull, use_tls, kwargs.get("env", {}), container_ports, volume_mounts,  resources, kwargs["readiness"]))
464
465         # Build the k8s Deployment object
466         labels = kwargs.get("labels", {})
467         labels.update({"app": component_name})
468         dep = _create_deployment_object(component_name, containers, init_containers, replicas, volumes, labels, pull_secrets=k8sconfig["image_pull_secrets"])
469
470         # Have k8s deploy it
471         ext.create_namespaced_deployment(namespace, dep)
472         deployment_ok = True
473         deployment_description["deployment"] = _create_deployment_name(component_name)
474
475         # Create service(s), if a port mapping is specified
476         if port_map:
477             service_ports = []      # Ports exposed internally on the k8s network
478             exposed_ports = []      # Ports to be mapped to ports on the k8s nodes via NodePort
479             for (cport, proto), hport in port_map.iteritems():
480                 service_ports.append(client.V1ServicePort(port=int(cport),protocol=proto,name="port-{0}-{1}".format(proto[0].lower(), cport)))
481                 if int(hport) != 0:
482                     exposed_ports.append(client.V1ServicePort(port=int(cport),protocol=proto,node_port=int(hport),name="xport-{0}-{1}".format(proto[0].lower(),cport)))
483
484             # If there are ports to be exposed via MSB, set up the annotation for the service
485             msb_list = kwargs.get("msb_list")
486             annotations = msb.create_msb_annotation(msb_list) if msb_list else ''
487
488             # Create a ClusterIP service for access via the k8s network
489             service = _create_service_object(_create_service_name(component_name), component_name, service_ports, annotations, labels, "ClusterIP")
490             core.create_namespaced_service(namespace, service)
491             cip_service_created = True
492             deployment_description["services"].append(_create_service_name(component_name))
493
494             # If there are ports to be exposed on the k8s nodes, create a "NodePort" service
495             if len(exposed_ports) > 0:
496                 exposed_service = \
497                     _create_service_object(_create_exposed_service_name(component_name), component_name, exposed_ports, '', labels, "NodePort")
498                 core.create_namespaced_service(namespace, exposed_service)
499                 deployment_description["services"].append(_create_exposed_service_name(component_name))
500
501     except Exception as e:
502         # If the ClusterIP service was created, delete the service:
503         if cip_service_created:
504             core.delete_namespaced_service(_create_service_name(component_name), namespace)
505         # If the deployment was created but not the service, delete the deployment
506         if deployment_ok:
507             client.ExtensionsV1beta1Api().delete_namespaced_deployment(_create_deployment_name(component_name), namespace, client.V1DeleteOptions())
508         raise e
509
510     return dep, deployment_description
511
512 def undeploy(deployment_description):
513     _configure_api()
514
515     namespace = deployment_description["namespace"]
516
517     # remove any services associated with the component
518     for service in deployment_description["services"]:
519         client.CoreV1Api().delete_namespaced_service(service, namespace)
520
521     # Have k8s delete the underlying pods and replicaset when deleting the deployment.
522     options = client.V1DeleteOptions(propagation_policy="Foreground")
523     client.ExtensionsV1beta1Api().delete_namespaced_deployment(deployment_description["deployment"], namespace, options)
524
525 def is_available(namespace, component_name):
526     _configure_api()
527     dep_status = client.AppsV1beta1Api().read_namespaced_deployment_status(_create_deployment_name(component_name), namespace)
528     # Check if the number of available replicas is equal to the number requested and that the replicas match the current spec
529     # This check can be used to verify completion of an initial deployment, a scale operation, or an update operation
530     return dep_status.status.available_replicas == dep_status.spec.replicas and dep_status.status.updated_replicas == dep_status.spec.replicas
531
532 def scale(deployment_description, replicas):
533     ''' Trigger a scaling operation by updating the replica count for the Deployment '''
534
535     def update_replica_count(spec):
536         spec.spec.replicas = replicas
537         return spec
538
539     _patch_deployment(deployment_description["namespace"], deployment_description["deployment"], update_replica_count)
540
541 def upgrade(deployment_description, image, container_index = 0):
542     ''' Trigger a rolling upgrade by sending a new image name/tag to k8s '''
543
544     def update_image(spec):
545         spec.spec.template.spec.containers[container_index].image = image
546         return spec
547
548     _patch_deployment(deployment_description["namespace"], deployment_description["deployment"], update_image)
549
550 def rollback(deployment_description, rollback_to=0):
551     '''
552     Undo upgrade by rolling back to a previous revision of the deployment.
553     By default, go back one revision.
554     rollback_to can be used to supply a specific revision number.
555     Returns the image for the app container and the replica count from the rolled-back deployment
556     '''
557     '''
558     2018-07-13
559     Currently this does not work due to a bug in the create_namespaced_deployment_rollback() method.
560     The k8s python client code throws an exception while processing the response from the API.
561     See:
562        - https://github.com/kubernetes-client/python/issues/491
563        - https://github.com/kubernetes/kubernetes/pull/63837
564     The fix has been merged into the master branch but is not in the latest release.
565     '''
566     _configure_api()
567     deployment = deployment_description["deployment"]
568     namespace = deployment_description["namespace"]
569
570     # Initiate the rollback
571     client.ExtensionsV1beta1Api().create_namespaced_deployment_rollback(
572         deployment,
573         namespace,
574         client.AppsV1beta1DeploymentRollback(name=deployment, rollback_to=client.AppsV1beta1RollbackConfig(revision=rollback_to)))
575
576     # Read back the spec for the rolled-back deployment
577     spec = client.ExtensionsV1beta1Api().read_namespaced_deployment(deployment, namespace)
578     return spec.spec.template.spec.containers[0].image, spec.spec.replicas
579
580 def execute_command_in_deployment(deployment_description, command):
581     '''
582     Enumerates the pods in the k8s deployment identified by "deployment_description",
583     then executes the command (represented as an argv-style list) in "command" in
584     container 0 (the main application container) each of those pods.
585
586     Note that the sets of pods associated with a deployment can change over time.  The
587     enumeration is a snapshot at one point in time.  The command will not be executed in
588     pods that are created after the initial enumeration.   If a pod disappears after the
589     initial enumeration and before the command is executed, the attempt to execute the
590     command will fail.  This is not treated as a fatal error.
591
592     This approach is reasonable for the one current use case for "execute_command":  running a
593     script to notify a container that its configuration has changed as a result of a
594     policy change.  In this use case, the new configuration information is stored into
595     the configuration store (Consul), the pods are enumerated, and the command is executed.
596     If a pod disappears after the enumeration, the fact that the command cannot be run
597     doesn't matter--a nonexistent pod doesn't need to be reconfigured.  Similarly, a pod that
598     comes up after the enumeration will get its initial configuration from the updated version
599     in Consul.
600
601     The optimal solution here would be for k8s to provide an API call to execute a command in
602     all of the pods for a deployment.   Unfortunately, k8s does not provide such a call--the
603     only call provided by k8s operates at the pod level, not the deployment level.
604
605     Another interesting k8s factoid: there's no direct way to list the pods belong to a
606     particular k8s deployment.   The deployment code above sets a label ("k8sdeployment") on
607     the pod that has the k8s deployment name.  To list the pods, the code below queries for
608     pods with the label carrying the deployment name.
609     '''
610
611     _configure_api()
612     deployment = deployment_description["deployment"]
613     namespace = deployment_description["namespace"]
614
615     # Get names of all the running pods belonging to the deployment
616     pod_names = [pod.metadata.name for pod in client.CoreV1Api().list_namespaced_pod(
617         namespace = namespace,
618         label_selector = "k8sdeployment={0}".format(deployment),
619         field_selector = "status.phase=Running"
620     ).items]
621
622     def do_execute(pod_name):
623         return _execute_command_in_pod(namespace, pod_name, command)
624
625     # Execute command in the running pods
626     return map(do_execute, pod_names)