1 # ============LICENSE_START=======================================================
3 # ================================================================================
4 # Copyright (c) 2019 AT&T Intellectual Property. All rights reserved.
5 # ================================================================================
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
17 # ============LICENSE_END=========================================================
19 # ECOMP is a trademark and service mark of AT&T Intellectual Property.
import os
import re
import uuid

from kubernetes import config, client, stream
# Default values (in seconds) for readiness/liveness probe timing,
# used by _create_probe when the health check dict omits them
PROBE_DEFAULT_PERIOD = 15
PROBE_DEFAULT_TIMEOUT = 1

# Location of k8s cluster config file ("kubeconfig")
# If this file exists, _configure_api loads it; otherwise in-cluster config is used
K8S_CONFIG_PATH="/opt/onap/kube/kubeconfig"

# Regular expression for interval/timeout specification:
# a decimal integer optionally followed by a unit ("s", "m", or "h")
INTERVAL_SPEC = re.compile("^([0-9]+)(s|m|h)?$")
# Conversion factors to seconds, keyed by the unit suffix (None = no suffix given)
FACTORS = {None: 1, "s": 1, "m": 60, "h": 3600}

# Regular expression for port mapping
# group 1: container port
# group 2: / + protocol
# group 3: protocol ("udp"/"UDP"/"tcp"/"TCP"); absent when no "/protocol" was given
# group 4: host port
PORTS = re.compile("^([0-9]+)(/(udp|UDP|tcp|TCP))?:([0-9]+)$")
45 def _create_deployment_name(component_name):
46 return "dep-{0}".format(component_name)
48 def _create_service_name(component_name):
49 return "{0}".format(component_name)
51 def _create_exposed_service_name(component_name):
52 return ("x{0}".format(component_name))[:63]
def _configure_api(location=None):
    '''
    Configure the kubernetes client library to talk to the cluster for 'location'.
    Prefers an on-disk kubeconfig (using 'location' as the context name);
    otherwise falls back to in-cluster service-account credentials.
    '''
    # Look for a kubernetes config file
    if os.path.exists(K8S_CONFIG_PATH):
        config.load_kube_config(config_file=K8S_CONFIG_PATH, context=location, persist_config=False)
    # Maybe we're running in a k8s container and we can use info provided by k8s
    # We would like to use:
    # config.load_incluster_config()
    # but this looks into os.environ for kubernetes host and port, and from
    # the plugin those aren't visible. So we use the InClusterConfigLoader class,
    # where we can set the environment to what we like.
    # This is probably brittle! Maybe there's a better alternative.
    # NOTE(review): code appears to have been truncated here -- an 'else:' branch and
    # an assignment of the fake-environment dict below to a local variable are missing;
    # confirm against version control.
        config.incluster_config.SERVICE_HOST_ENV_NAME : "kubernetes.default.svc.cluster.local",
        config.incluster_config.SERVICE_PORT_ENV_NAME : "443"
    config.incluster_config.InClusterConfigLoader(
        token_filename=config.incluster_config.SERVICE_TOKEN_FILENAME,
        cert_filename=config.incluster_config.SERVICE_CERT_FILENAME,
    # NOTE(review): the 'environment=...' argument, the closing parenthesis, and the
    # '.load_and_set()' call appear to have been truncated; confirm against version control.
def _parse_interval(t):
    '''
    Parse an interval specification, which can be:
    - a simple integer quantity, interpreted as seconds
    - a string representation of a decimal integer, interpreted as seconds
    - a string consisting of a representation of a decimal integer followed by a unit,
      with "s" representing seconds, "m" representing minutes,
      and "h" representing hours
    Used for compatibility with the Docker plugin, where time intervals
    for health checks were specified as strings with a number and a unit.
    See INTERVAL_SPEC above for the regular expression that's accepted.

    Returns the interval in seconds.
    Raises ValueError if the specification doesn't match INTERVAL_SPEC.
    '''
    m = INTERVAL_SPEC.match(str(t))
    # Guard the match: without this check a bad specification would raise
    # AttributeError (calling .group() on None) instead of the intended ValueError.
    if not m:
        raise ValueError("Bad interval specification: {0}".format(t))
    # Convert the quantity to seconds using the unit suffix (defaults to seconds)
    return int(m.group(1)) * FACTORS[m.group(2)]
def _create_probe(hc, port, use_tls=False):
    '''
    Create a Kubernetes probe based on info in the health check dictionary hc.

    hc['type'] selects the probe mechanism:
      - "http"/"https": HTTP GET against hc['endpoint'] on 'port', scheme per the type
      - "script"/"docker": exec hc['script'] (split on whitespace) in the container
    Probe timing comes from hc['interval'] / hc['timeout'] (parsed by _parse_interval),
    falling back to the module-level defaults.

    Returns a client.V1Probe, or None for an unrecognized type.
    NOTE(review): 'use_tls' is currently unused here -- confirm intended behavior.
    '''
    probe_type = hc['type']
    probe = None
    period = _parse_interval(hc.get('interval', PROBE_DEFAULT_PERIOD))
    timeout = _parse_interval(hc.get('timeout', PROBE_DEFAULT_TIMEOUT))
    if probe_type in ['http', 'https']:
        probe = client.V1Probe(
            failure_threshold=1,
            initial_delay_seconds=5,
            period_seconds=period,
            timeout_seconds=timeout,
            http_get=client.V1HTTPGetAction(
                path=hc['endpoint'],
                # Probe the supplied container port (the caller passes the first one)
                port=port,
                scheme=probe_type.upper()
            )
        )
    elif probe_type in ['script', 'docker']:
        probe = client.V1Probe(
            failure_threshold=1,
            initial_delay_seconds=5,
            period_seconds=period,
            timeout_seconds=timeout,
            # The k8s client model uses '_exec' because 'exec' is a reserved word
            _exec=client.V1ExecAction(
                command=hc['script'].split()
            )
        )
    return probe
def _create_resources(resources=None):
    '''
    Build a client.V1ResourceRequirements from a dict with optional "limits" and
    "requests" entries.  Returns None when no resources were specified.
    (As truncated, the constructed object was never returned; restore the return.)
    '''
    if resources is None:
        return None
    return client.V1ResourceRequirements(
        limits = resources.get("limits"),
        requests = resources.get("requests")
    )
def _create_container_object(name, image, always_pull, use_tls=False, env={}, container_ports=[], volume_mounts = [], resources = None, readiness = None, liveness = None):
    '''
    Build the client.V1Container object for a component (or a sidecar/init container).
    NOTE(review): the mutable default arguments (env={}, container_ports=[],
    volume_mounts=[]) are shared across calls; they are not mutated in the visible
    code, but switching to None defaults would be safer.
    '''
    # Set up environment variables
    # Copy any passed in environment variables
    env_vars = [client.V1EnvVar(name=k, value=env[k]) for k in env.keys()]
    # Add POD_IP with the IP address of the pod running the container
    pod_ip = client.V1EnvVarSource(field_ref = client.V1ObjectFieldSelector(field_path="status.podIP"))
    env_vars.append(client.V1EnvVar(name="POD_IP",value_from=pod_ip))

    # If a health check is specified, create a readiness/liveness probe
    # (For an HTTP-based check, we assume it's at the first container port)
    # NOTE(review): initialization of 'probe'/'live_probe' and 'if readiness:' /
    # 'if liveness:' guards appear to have been truncated here; confirm against
    # version control.
    if len(container_ports) > 0:
        (hc_port, proto) = container_ports[0]
    probe = _create_probe(readiness, hc_port, use_tls)
    if len(container_ports) > 0:
        (hc_port, proto) = container_ports[0]
    live_probe = _create_probe(liveness, hc_port, use_tls)
    # Optional CPU/memory requirements (None when no resources specified)
    resources_obj = _create_resources(resources)

    # Define container for pod
    return client.V1Container(
        # NOTE(review): 'name=name,', 'image=image,', and 'env=env_vars,' arguments
        # appear to have been truncated here; confirm against version control.
        image_pull_policy='Always' if always_pull else 'IfNotPresent',
        ports=[client.V1ContainerPort(container_port=p, protocol=proto) for (p, proto) in container_ports],
        volume_mounts = volume_mounts,
        resources = resources_obj,
        readiness_probe = probe,
        liveness_probe = live_probe
    # NOTE(review): the closing parenthesis of V1Container appears to have been truncated.
def _create_deployment_object(component_name,
    # NOTE(review): the rest of the parameter list appears to have been truncated --
    # the visible body references 'containers', 'init_containers', 'labels', and
    # 'pull_secrets', and a deployment spec normally needs a replica count and
    # volumes as well; confirm the full signature against version control.

    deployment_name = _create_deployment_name(component_name)

    # Label the pod with the deployment name, so we can find it easily
    labels.update({"k8sdeployment" : deployment_name})

    # pull_secrets is a list of the names of the k8s secrets containing docker registry credentials
    # See https://kubernetes.io/docs/concepts/containers/images/#specifying-imagepullsecrets-on-a-pod
    # NOTE(review): an initialization 'ips = []' appears to have been truncated here.
    for secret in pull_secrets:
        ips.append(client.V1LocalObjectReference(name=secret))

    # Define pod template
    template = client.V1PodTemplateSpec(
        metadata=client.V1ObjectMeta(labels=labels),
        spec=client.V1PodSpec(hostname=component_name,
                              containers=containers,
                              init_containers=init_containers,
                              # NOTE(review): a 'volumes=...' argument may have been truncated here.
                              image_pull_secrets=ips)
    # NOTE(review): the closing parenthesis of V1PodTemplateSpec appears to have been truncated.

    # Define deployment spec
    spec = client.ExtensionsV1beta1DeploymentSpec(
        # NOTE(review): the spec arguments (replica count, the pod 'template' built
        # above) and the closing parenthesis appear to have been truncated.

    # Create deployment object
    deployment = client.ExtensionsV1beta1Deployment(
        metadata=client.V1ObjectMeta(name=deployment_name),
        # NOTE(review): remaining arguments (e.g. 'spec=spec'), the closing
        # parenthesis, and a 'return deployment' appear to have been truncated.
def _create_service_object(service_name, component_name, service_ports, annotations, labels, service_type):
    '''
    Build a k8s V1Service named via _create_service_name(service_name), selecting
    pods labeled "app"=component_name.  service_type is passed to the service spec
    (e.g. "ClusterIP" or "NodePort"); 'annotations' carries MSB registration data
    when present.
    NOTE(review): several lines appear to have been truncated from this function --
    the 'ports'/'type' arguments to V1ServiceSpec with its closing parenthesis, an
    if/else around the two alternative metadata assignments below, the remaining
    V1Service arguments, and a return statement; confirm against version control.
    '''
    service_spec = client.V1ServiceSpec(
        selector={"app" : component_name},
    # First form: metadata including annotations; second form: metadata without them
    metadata = client.V1ObjectMeta(name=_create_service_name(service_name), labels=labels, annotations=annotations)
    metadata = client.V1ObjectMeta(name=_create_service_name(service_name), labels=labels)
    service = client.V1Service(
def _parse_ports(port_list):
    '''
    Parse the port list into a list of container ports (needed to create the container)
    and into a map from (container_port, protocol) to host port, used to set up the
    k8s services.

    Each entry has the form "container_port[/protocol]:host_port" (see the PORTS
    regular expression above); the protocol defaults to "TCP" when omitted.

    Returns (container_ports, port_map) where container_ports is a list of
    (port, protocol) tuples and port_map maps (port, protocol) -> host_port.
    Raises ValueError for an entry that doesn't match PORTS.
    '''
    container_ports = []
    port_map = {}
    for p in port_list:
        m = PORTS.match(p.strip())
        if not m:
            raise ValueError("Bad port specification: {0}".format(p))
        cport = int(m.group(1))
        hport = int(m.group(4))
        # group 3 is the protocol name; it is None when no "/protocol" was given,
        # so guard before upper-casing (unguarded, this crashed on "8080:80")
        proto = (m.group(3)).upper() if m.group(3) else "TCP"
        container_ports.append((cport, proto))
        port_map[(cport, proto)] = hport
    return container_ports, port_map
def _parse_volumes(volume_list):
    '''
    Parse the Docker-style volume list into k8s volume objects.

    Each entry has the form
      {"host": {"path": "/host/path"}, "container": {"bind": "/container/path", "mode": "rw_or_ro"}}

    Returns (volumes, volume_mounts): V1Volume objects for the pod spec and the
    matching V1VolumeMount objects for the container spec.
    (Restores the 'volumes'/'volume_mounts' initializations, which had been lost;
    without them the appends below raise NameError.)
    '''
    volumes = []
    volume_mounts = []
    for v in volume_list:
        # A random name links each volume to its mount
        vname = str(uuid.uuid4())
        vhost = v['host']['path']
        vcontainer = v['container']['bind']
        # Mount read-only iff mode is exactly 'ro'
        vro = (v['container'].get('mode') == 'ro')
        volumes.append(client.V1Volume(name=vname, host_path=client.V1HostPathVolumeSource(path=vhost)))
        volume_mounts.append(client.V1VolumeMount(name=vname, mount_path=vcontainer, read_only=vro))
    return volumes, volume_mounts
def _service_exists(location, namespace, component_name):
    '''
    Return True if a k8s Service for 'component_name' exists in 'namespace'
    at 'location', False otherwise.
    Any ApiException (including "not found") is treated as "does not exist".
    (Restores the try/except scaffolding around the lookup; the 'except' had lost
    its matching 'try' and the result was never returned.)
    '''
    exists = False
    try:
        _configure_api(location)
        client.CoreV1Api().read_namespaced_service(_create_service_name(component_name), namespace)
        exists = True
    except client.rest.ApiException:
        # Deliberate best-effort: a failed lookup simply means "no such service"
        pass
    return exists
def _patch_deployment(location, namespace, deployment, modify):
    '''
    Gets the current spec for 'deployment' in 'namespace'
    in the k8s cluster at 'location',
    uses the 'modify' function to change the spec,
    then sends the updated spec to k8s.
    '''
    _configure_api(location)

    # Get deployment spec
    spec = client.ExtensionsV1beta1Api().read_namespaced_deployment(deployment, namespace)

    # Apply changes to spec
    # (restored: the call to 'modify' had been lost, so the patch was a no-op)
    modify(spec)

    # Patch the deploy with updated spec
    client.ExtensionsV1beta1Api().patch_namespaced_deployment(deployment, namespace, spec)
def _execute_command_in_pod(location, namespace, pod_name, command):
    '''
    Execute the command (specified by an argv-style list in the "command" parameter) in
    the specified pod in the specified namespace at the specified location.
    For now at least, we use this only to
    run a notification script in a pod after a configuration change.

    The straightforward way to do this is with the V1 Core API function "connect_get_namespaced_pod_exec".
    Unfortunately due to a bug/limitation in the Python client library, we can't call it directly.
    We have to make the API call through a Websocket connection using the kubernetes.stream wrapper API.
    I'm following the example code at https://github.com/kubernetes-client/python/blob/master/examples/exec.py.
    There are several issues tracking this, in various states. It isn't clear that there will ever
    be a fix.
    - https://github.com/kubernetes-client/python/issues/58
    - https://github.com/kubernetes-client/python/issues/409
    - https://github.com/kubernetes-client/python/issues/526

    The main consequence of the workaround using "stream" is that the caller does not get an indication
    of the exit code returned by the command when it completes execution. It turns out that the
    original implementation of notification in the Docker plugin did not use this result, so we can
    still match the original notification functionality.

    The "stream" approach returns a string containing any output sent by the command to stdout or stderr.
    We'll return that so it can be logged.

    Returns a dict of the form {"pod": pod_name, "output": <command output>}.
    '''
    _configure_api(location)
    # NOTE(review): a 'try:' appears to have been truncated here (there is a matching
    # 'except' below); confirm against version control.
    output = stream.stream(client.CoreV1Api().connect_get_namespaced_pod_exec,
    # NOTE(review): the remaining arguments to stream() (pod name, namespace,
    # command=command, and the stdout/stderr/stdin/tty flags) and the closing
    # parenthesis appear to have been truncated; confirm against version control.
    except client.rest.ApiException as e:
        # If the exception indicates the pod wasn't found, it's not a fatal error.
        # It existed when we enumerated the pods for the deployment but no longer exists.
        # Unfortunately, the only way to distinguish a pod not found from any other error
        # is by looking at the reason text.
        # (The ApiException's "status" field should contain the HTTP status code, which would
        # be 404 if the pod isn't found, but empirical testing reveals that "status" is set
        # to something else in this case, so we match on the reason text instead.)
        if "404 not found" in e.reason.lower():
            output = "Pod not found"
        # NOTE(review): an 'else:' branch that re-raises the exception appears to have
        # been truncated here; as written, other API errors would leave 'output' unset.
    return {"pod" : pod_name, "output" : output}
def deploy(namespace, component_name, image, replicas, always_pull, k8sconfig, resources, **kwargs):
    '''
    Deploy a component onto Kubernetes.
    This will create a k8s Deployment and, if needed, one or two k8s Services.
    (We are being opinionated in our use of k8s... this code decides what k8s abstractions and features to use.
    We're not exposing k8s to the component developer and the blueprint author.
    This is a conscious choice. We want to use k8s in a controlled, consistent way, and we want to hide
    the details from the component developer and the blueprint author.)

    namespace: the Kubernetes namespace into which the component is deployed
    component_name: the component name, used to derive names of Kubernetes entities
    image: the docker image for the component being deployed
    replicas: the number of instances of the component to be deployed
    always_pull: boolean flag, indicating that Kubernetes should always pull a new copy of
       the Docker image for the component, even if it is already present on the Kubernetes node.
    k8sconfig: plugin-level configuration; this function reads its "filebeat", "tls",
       and "image_pull_secrets" entries:
    - image_pull_secrets: a list of names of image pull secrets that can be used for retrieving images.
      (DON'T PANIC: these are just the names of secrets held in the Kubernetes secret store.)
    - filebeat: a dictionary of filebeat sidecar parameters:
      "log_path" : mount point for log volume in filebeat container
      "data_path" : mount point for data volume in filebeat container
      "config_path" : mount point for config volume in filebeat container
      "config_subpath" : subpath for config data in filebeat container
      "config_map" : ConfigMap holding the filebeat configuration
      "image": Docker image to use for filebeat
    - tls: a dictionary of TLS init container parameters:
      "cert_path": mount point for certificate volume in init container
      "image": Docker image to use for TLS init container
    resources: dict with optional "limits"/"requests" entries (see _create_resources)
    kwargs: optional keyword arguments:
    - volumes: array of volume objects, where a volume object is:
      {"host":{"path": "/path/on/host"}, "container":{"bind":"/path/on/container","mode":"rw_or_ro"}}
    - ports: array of strings in the form "container_port:host_port"
    - env: map of name-value pairs ( {name0: value0, name1: value1...} )
    - msb_list: array of msb objects, where an msb object is as described in msb/msb.py.
    - log_info: an object with info for setting up ELK logging, with the form:
      {"log_directory": "/path/to/container/log/directory", "alternate_fb_path" : "/alternate/sidecar/log/path"}
    - tls_info: an object with info for setting up TLS (HTTPS), with the form:
      {"use_tls": true, "cert_directory": "/path/to/container/cert/directory" }
    - labels: dict with label-name/label-value pairs, e.g. {"cfydeployment" : "lsdfkladflksdfsjkl", "cfynode":"mycomponent"}
      These labels will be set on all the pods deployed as a result of this deploy() invocation.
    - readiness: dict with health check info; if present, used to create a readiness probe for the main container. Includes:
      - type: check is done by making http(s) request to an endpoint ("http", "https") or by exec'ing a script in the container ("script", "docker")
      - interval: period (in seconds) between probes
      - timeout: time (in seconds) to allow a probe to complete
      - endpoint: the path portion of the URL that points to the readiness endpoint for "http" and "https" types
      - path: the full path to the script to be executed in the container for "script" and "docker" types
    - liveness: dict with health check info; if present, used to create a liveness probe for the main container. Includes:
      - type: check is done by making http(s) request to an endpoint ("http", "https") or by exec'ing a script in the container ("script", "docker")
      - interval: period (in seconds) between probes
      - timeout: time (in seconds) to allow a probe to complete
      - endpoint: the path portion of the URL that points to the liveness endpoint for "http" and "https" types
      - path: the full path to the script to be executed in the container for "script" and "docker" types
    - k8s_location: name of the Kubernetes location (cluster) where the component is to be deployed

    Returns (deployment_object, deployment_description) where deployment_description
    records the namespace, location, deployment name, and created service names.
    '''
    deployment_ok = False
    cip_service_created = False
    deployment_description = {
        "namespace": namespace,
        "location" : kwargs.get("k8s_location"),
    # NOTE(review): the remaining entries of this dict (at least "deployment" and a
    # "services" list, both filled in below) and the closing brace appear to have
    # been truncated; confirm against version control.

    # NOTE(review): a 'try:' appears to have been truncated here (there is a matching
    # 'except Exception' near the end of this function).
    _configure_api(kwargs.get("k8s_location"))
    core = client.CoreV1Api()
    ext = client.ExtensionsV1beta1Api()

    # Parse the port mapping
    container_ports, port_map = _parse_ports(kwargs.get("ports", []))

    # Parse the volumes list into volumes and volume_mounts for the deployment
    volumes, volume_mounts = _parse_volumes(kwargs.get("volumes",[]))

    # Initialize the list of containers that will be part of the pod
    # NOTE(review): the initializations 'containers = []' and 'init_containers = []'
    # appear to have been truncated (both lists are appended to below).

    # Set up the ELK logging sidecar container, if needed
    log_info = kwargs.get("log_info")
    if log_info and "log_directory" in log_info:
        log_dir = log_info["log_directory"]
        fb = k8sconfig["filebeat"]
        sidecar_volume_mounts = []

        # Create the volume for component log files and volume mounts for the component and sidecar containers
        volumes.append(client.V1Volume(name="component-log", empty_dir=client.V1EmptyDirVolumeSource()))
        volume_mounts.append(client.V1VolumeMount(name="component-log", mount_path=log_dir))
        # The sidecar sees the component's logs at an alternate path if one was supplied
        sc_path = log_info["alternate_fb_path"] if "alternate_fb_path" in log_info \
            else "{0}/{1}".format(fb["log_path"], component_name)
        sidecar_volume_mounts.append(client.V1VolumeMount(name="component-log", mount_path=sc_path))

        # Create the volume for sidecar data and the volume mount for it
        volumes.append(client.V1Volume(name="filebeat-data", empty_dir=client.V1EmptyDirVolumeSource()))
        sidecar_volume_mounts.append(client.V1VolumeMount(name="filebeat-data", mount_path=fb["data_path"]))

        # Create the container for the sidecar
        containers.append(_create_container_object("filebeat", fb["image"], False, False, {}, [], sidecar_volume_mounts))

        # Create the volume for the sidecar configuration data and the volume mount for it
        # The configuration data is in a k8s ConfigMap that should be created when DCAE is installed.
        # NOTE(review): a 'volumes.append(' line appears to have been truncated
        # immediately before the next line; confirm against version control.
        client.V1Volume(name="filebeat-conf", config_map=client.V1ConfigMapVolumeSource(name=fb["config_map"])))
        sidecar_volume_mounts.append(
            client.V1VolumeMount(name="filebeat-conf", mount_path=fb["config_path"], sub_path=fb["config_subpath"]))

    # Set up the TLS init container, if needed
    tls_info = kwargs.get("tls_info")
    # NOTE(review): an initialization of 'use_tls' (referenced below when creating the
    # main container) appears to have been truncated; confirm against version control.
    if tls_info and "use_tls" in tls_info and tls_info["use_tls"]:
        if "cert_directory" in tls_info and len(tls_info["cert_directory"]) > 0:
            tls_config = k8sconfig["tls"]

            # Create the certificate volume and volume mounts
            volumes.append(client.V1Volume(name="tls-info", empty_dir=client.V1EmptyDirVolumeSource()))
            volume_mounts.append(client.V1VolumeMount(name="tls-info", mount_path=tls_info["cert_directory"]))
            init_volume_mounts = [client.V1VolumeMount(name="tls-info", mount_path=tls_config["cert_path"])]

            # Create the init container
            init_containers.append(_create_container_object("init-tls", tls_config["image"], False, False, {}, [], init_volume_mounts))

    # Create the container for the component
    # Make it the first container in the pod
    containers.insert(0, _create_container_object(component_name, image, always_pull, use_tls, kwargs.get("env", {}), container_ports, volume_mounts, resources, kwargs["readiness"], kwargs.get("liveness")))

    # Build the k8s Deployment object
    labels = kwargs.get("labels", {})
    labels.update({"app": component_name})
    dep = _create_deployment_object(component_name, containers, init_containers, replicas, volumes, labels, pull_secrets=k8sconfig["image_pull_secrets"])

    # Have k8s deploy it
    ext.create_namespaced_deployment(namespace, dep)
    # NOTE(review): a 'deployment_ok = True' assignment appears to have been
    # truncated here (the flag, set above, is never updated in the visible code).
    deployment_description["deployment"] = _create_deployment_name(component_name)

    # Create service(s), if a port mapping is specified
    # NOTE(review): a guard such as 'if len(port_map) > 0:' appears to have been
    # truncated here; confirm against version control.
    service_ports = []      # Ports exposed internally on the k8s network
    exposed_ports = []      # Ports to be mapped to ports on the k8s nodes via NodePort
    for (cport, proto), hport in port_map.iteritems():
        # NOTE(review): dict.iteritems() is Python 2 only; under Python 3 this must
        # be dict.items().
        service_ports.append(client.V1ServicePort(port=int(cport),protocol=proto,name="port-{0}-{1}".format(proto[0].lower(), cport)))
        # NOTE(review): a guard (e.g. 'if hport != 0:') appears to have been truncated
        # before the next line; as written, every mapped port would also get a NodePort.
        exposed_ports.append(client.V1ServicePort(port=int(cport),protocol=proto,node_port=int(hport),name="xport-{0}-{1}".format(proto[0].lower(),cport)))

    # If there are ports to be exposed via MSB, set up the annotation for the service
    msb_list = kwargs.get("msb_list")
    annotations = msb.create_msb_annotation(msb_list) if msb_list else ''

    # Create a ClusterIP service for access via the k8s network
    service = _create_service_object(_create_service_name(component_name), component_name, service_ports, annotations, labels, "ClusterIP")
    core.create_namespaced_service(namespace, service)
    cip_service_created = True
    deployment_description["services"].append(_create_service_name(component_name))

    # If there are ports to be exposed on the k8s nodes, create a "NodePort" service
    if len(exposed_ports) > 0:
        # NOTE(review): an 'exposed_service = ' assignment appears to have been
        # truncated from the next line ('exposed_service' is used just below).
        _create_service_object(_create_exposed_service_name(component_name), component_name, exposed_ports, '', labels, "NodePort")
        core.create_namespaced_service(namespace, exposed_service)
        deployment_description["services"].append(_create_exposed_service_name(component_name))

    except Exception as e:
        # If the ClusterIP service was created, delete the service:
        if cip_service_created:
            core.delete_namespaced_service(_create_service_name(component_name), namespace)
        # If the deployment was created but not the service, delete the deployment
        # NOTE(review): an 'if deployment_ok:' guard and a final re-raise of 'e'
        # appear to have been truncated around the next line; confirm against
        # version control.
        client.ExtensionsV1beta1Api().delete_namespaced_deployment(_create_deployment_name(component_name), namespace, client.V1DeleteOptions())

    return dep, deployment_description
def undeploy(deployment_description):
    ''' Remove the k8s Services and Deployment recorded in deployment_description. '''
    _configure_api(deployment_description["location"])

    namespace = deployment_description["namespace"]

    # Delete every service that was created for the component
    for service_name in deployment_description["services"]:
        client.CoreV1Api().delete_namespaced_service(service_name, namespace)

    # "Foreground" propagation makes k8s delete the underlying pods and
    # replicaset along with the deployment itself.
    delete_opts = client.V1DeleteOptions(propagation_policy="Foreground")
    client.ExtensionsV1beta1Api().delete_namespaced_deployment(
        deployment_description["deployment"], namespace, delete_opts)
def is_available(location, namespace, component_name):
    '''
    Return True when the component's deployment has all requested replicas
    available and up to date.  Usable to verify completion of an initial
    deployment, a scale operation, or an update operation.
    '''
    _configure_api(location)
    dep = client.AppsV1beta1Api().read_namespaced_deployment_status(
        _create_deployment_name(component_name), namespace)
    # Both the available and the updated replica counts must match the
    # requested count for the deployment to be considered ready.
    wanted = dep.spec.replicas
    return dep.status.available_replicas == wanted and dep.status.updated_replicas == wanted
def scale(deployment_description, replicas):
    ''' Trigger a scaling operation by updating the replica count for the Deployment '''

    def set_replicas(spec):
        # Mutate the deployment spec in place; _patch_deployment pushes it to k8s
        spec.spec.replicas = replicas

    _patch_deployment(
        deployment_description["location"],
        deployment_description["namespace"],
        deployment_description["deployment"],
        set_replicas)
def upgrade(deployment_description, image, container_index = 0):
    ''' Trigger a rolling upgrade by sending a new image name/tag to k8s '''

    def set_image(spec):
        # Point the selected container (default: the main app container, index 0)
        # at the new image; _patch_deployment pushes the change to k8s
        spec.spec.template.spec.containers[container_index].image = image

    _patch_deployment(
        deployment_description["location"],
        deployment_description["namespace"],
        deployment_description["deployment"],
        set_image)
def rollback(deployment_description, rollback_to=0):
    '''
    Undo upgrade by rolling back to a previous revision of the deployment.
    By default, go back one revision.
    rollback_to can be used to supply a specific revision number.
    Returns the image for the app container and the replica count from the rolled-back deployment.

    NOTE:
    Currently this does not work due to a bug in the create_namespaced_deployment_rollback() method.
    The k8s python client code throws an exception while processing the response from the API.
    See:
    - https://github.com/kubernetes-client/python/issues/491
    - https://github.com/kubernetes/kubernetes/pull/63837
    The fix has been merged into the master branch but is not in the latest release.
    '''
    _configure_api(deployment_description["location"])
    deployment = deployment_description["deployment"]
    namespace = deployment_description["namespace"]

    # Initiate the rollback
    client.ExtensionsV1beta1Api().create_namespaced_deployment_rollback(
        # NOTE(review): the 'deployment' and 'namespace' positional arguments appear
        # to have been truncated here; confirm against version control.
        client.AppsV1beta1DeploymentRollback(name=deployment, rollback_to=client.AppsV1beta1RollbackConfig(revision=rollback_to)))

    # Read back the spec for the rolled-back deployment
    spec = client.ExtensionsV1beta1Api().read_namespaced_deployment(deployment, namespace)
    return spec.spec.template.spec.containers[0].image, spec.spec.replicas
def execute_command_in_deployment(deployment_description, command):
    '''
    Enumerates the pods in the k8s deployment identified by "deployment_description",
    then executes the command (represented as an argv-style list) in "command" in
    container 0 (the main application container) of each of those pods.

    Note that the set of pods associated with a deployment can change over time.  The
    enumeration is a snapshot at one point in time.  The command will not be executed in
    pods that are created after the initial enumeration.  If a pod disappears after the
    initial enumeration and before the command is executed, the attempt to execute the
    command will fail.  This is not treated as a fatal error.

    This approach is reasonable for the one current use case for "execute_command": running a
    script to notify a container that its configuration has changed as a result of a
    policy change.  In this use case, the new configuration is stored into the
    configuration store (Consul), the pods are enumerated, and the command is executed.
    A pod that disappears after the enumeration doesn't need to be reconfigured; a pod
    that comes up after the enumeration gets its initial configuration from the updated
    version in the store.

    The optimal solution would be a k8s API call that executes a command in all pods of
    a deployment, but k8s only operates at the pod level.  There's also no direct way to
    list the pods belonging to a deployment, so the deploy code labels each pod with
    "k8sdeployment" = the deployment name and we query by that label here.

    Returns a list of {"pod": ..., "output": ...} results, one per pod
    (see _execute_command_in_pod).
    '''
    location = deployment_description["location"]
    _configure_api(location)
    deployment = deployment_description["deployment"]
    namespace = deployment_description["namespace"]

    # Get names of all the running pods belonging to the deployment
    pod_response = client.CoreV1Api().list_namespaced_pod(
        namespace = namespace,
        label_selector = "k8sdeployment={0}".format(deployment),
        field_selector = "status.phase=Running"
    )
    pod_names = [pod.metadata.name for pod in pod_response.items]

    # Execute the command in each running pod.
    # Use a list comprehension (not a lazy map object) so the commands are
    # guaranteed to run before this function returns, under Python 2 and 3 alike.
    return [_execute_command_in_pod(location, namespace, name, command) for name in pod_names]