return hc.returncode
def check_readiness(k8s, verbosity):
- k8s_controllers, not_ready_controllers = get_k8s_controllers(k8s)
-
- # check pods only when it is explicitly wanted (judging readiness by deployment status)
- if verbosity > 1:
- pods = k8s.get_resources('api/v1', 'pods')
- unready_pods = chain.from_iterable(
- get_names(not_ready_pods(
- pods_by_parent(pods, x)))
- for x in not_ready_controllers)
- else:
- unready_pods = []
+ k8s_controllers, not_ready_controllers = get_k8s_controllers(k8s)
+
+ # check pods only when it is explicitly wanted (judging readiness by deployment status)
+ if verbosity > 1:
+ pods = k8s.get_resources('api/v1', 'pods')
+ unready_pods = chain.from_iterable(
+ get_names(not_ready_pods(
+ pods_by_parent(pods, x)))
+ for x in not_ready_controllers)
+ else:
+ unready_pods = []
- print_status(verbosity, k8s_controllers, unready_pods)
- return not not_ready_controllers
+ print_status(verbosity, k8s_controllers, unready_pods)
+ return not not_ready_controllers
def check_in_loop(k8s, max_time, sleep_time, verbosity):
max_end_time = datetime.datetime.now() + datetime.timedelta(minutes=max_time)
req = requests.get(url, verify=False)
else:
req = requests.get(url, verify=self.crt_tmp_file.name, cert=self.crt_tmp_file.name)
- except requests.exceptions.ConnectionError as err:
+ except requests.exceptions.ConnectionError:
sys.exit('Error: Could not connect to {}'.format(self.url))
if req.status_code == 200:
json = req.json()
config['users'][0]['user']['client-certificate-data'])))
certs.update(dict(client_key=b64decode(
config['users'][0]['user']['client-key-data'])))
- except KeyError as err:
+ except KeyError:
print('Warning: could not get Kubernetes config for certificates. ' \
'Turning off SSL authentication.')
self.no_ssl_auth = True