2 # COPYRIGHT NOTICE STARTS HERE
4 # Copyright 2020 Orange, Ltd.
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
18 # COPYRIGHT NOTICE ENDS HERE
20 # Check all the kubernetes pods, evaluate the certificate and build a
21 # certificate dashboard.
24 # See requirements.txt
25 # The dashboard is based on bulma framework
28 # This script should be run on the local machine which has network access to
29 # the onap K8S cluster.
30 # It requires k8s cluster config file on local machine
31 # It requires also the ONAP IP provided through an env variable ONAP_IP
32 # ONAP_NAMESPACE env variable is also considered
33 # if not set we set it to onap
35 # python check_certificates_validity.py
36 # the summary html page will be generated where the script is launched
38 Check ONAP certificates
46 from datetime import datetime
47 from kubernetes import client, config
48 from jinja2 import Environment, FileSystemLoader, select_autoescape
49 from socket import * # pylint: disable=W0614
57 LOGGER = logging.getLogger("Gating-Index")
58 LOGGER.setLevel(LOG_LEVEL)
59 CERT_MODES = ['nodeport', 'ingress', 'internal']
61 EXP_CRITERIA_MAX = 389
62 EXPECTED_CERT_STRING = "C=US;O=ONAP;OU=OSAAF;CN=intermediateCA_9"
63 EXPECTED_STRIMZI_CA_CERT_STRING = "O=io.strimzi;CN=cluster-ca v0"
68 parser = argparse.ArgumentParser()
73 help='Mode (nodeport, ingress, internal). If not set all modes are tried',
78 help='ONAP IP needed (for nodeport mode)',
79 default=os.environ.get('ONAP_IP'))
83 help='ONAP namespace',
88 help='Result directory',
91 args = parser.parse_args()
93 # Get the ONAP namespace
94 onap_namespace = args.namespace
95 LOGGER.info("Verification of the %s certificates started", onap_namespace)
97 # Create the target dir (in case it does not exist)
98 if os.pardir not in args.dir:
99 os.makedirs(args.dir, exist_ok=True)
101 # Nodeport specific section
102 # Retrieve the kubernetes IP for mode nodeport
103 if args.mode == "nodeport":
106 "In nodeport mode, the IP of the ONAP cluster is required." +
107 "The value can be set using -i option " +
108 "or retrieved from the ONAP_IP env variable")
109 exit(parser.print_usage())
111 nodeports_xfail_list = []
112 with open('nodeports_xfail.txt', 'r') as f:
113 first_line = f.readline()
115 nodeports_xfail_list.append(
116 line.split(" ", 1)[0].strip('\n'))
117 LOGGER.info("nodeports xfail list: %s",
118 nodeports_xfail_list)
120 LOGGER.error("Please set the environment variable ONAP_IP")
122 except FileNotFoundError:
123 LOGGER.warning("Nodeport xfail list not found")
126 # retrieve the candidate ports first
127 if args.mode == "internal":
128 k8s_config = config.load_incluster_config()
130 k8s_config = config.load_kube_config()
132 core = client.CoreV1Api()
133 api_instance = client.NetworkingV1Api(
134 client.ApiClient(k8s_config))
135 k8s_services = core.list_namespaced_service(onap_namespace).items
136 k8s_ingress = api_instance.list_namespaced_ingress(onap_namespace).items
139 def get_certifificate_info(host, port):
140 LOGGER.debug("Host: %s", host)
141 LOGGER.debug("Port: %s", port)
142 cert = ssl.get_server_certificate(
144 LOGGER.debug("get certificate")
145 x509 = OpenSSL.crypto.load_certificate(
146 OpenSSL.crypto.FILETYPE_PEM, cert)
148 LOGGER.debug("get certificate")
149 exp_date = datetime.strptime(
150 x509.get_notAfter().decode('ascii'), '%Y%m%d%H%M%SZ')
151 LOGGER.debug("Expiration date retrieved %s", exp_date)
152 issuer = x509.get_issuer().get_components()
155 # format issuer nicely
156 for issuer_info_key, issuer_info_val in issuer:
157 issuer_info += (issuer_info_key.decode('utf-8') + "=" +
158 issuer_info_val.decode('utf-8') + ";")
159 cert_validity = False
160 if issuer_info[:-1] in [EXPECTED_CERT_STRING, EXPECTED_STRIMZI_CA_CERT_STRING]:
163 return {'expiration_date': exp_date,
164 'issuer': issuer_info[:-1],
165 'validity': cert_validity}
168 def test_services(k8s_services, mode):
169 success_criteria = True # success criteria per scan
170 # looks for the certificates
172 node_ports_ssl_error_list = []
173 node_ports_connection_error_list = []
174 node_ports_type_error_list = []
175 node_ports_reset_error_list = []
177 # for node ports and internal we consider the services
178 # for the ingress we consider the ingress
179 for service in k8s_services:
181 for port in service.spec.ports:
182 # For nodeport mode, we consider
183 # - the IP of the cluster
184 # - spec.port.node_port
186 # For internal mode, we consider
187 # - spec.selector.app
189 test_name = service.metadata.name
191 error_waiver = False # waiver per port
192 if mode == 'nodeport':
194 test_port = port.node_port
196 # Retrieve the nodeport xfail list
197 # to consider SECCOM waiver if needed
198 if test_port in nodeports_xfail_list:
200 else: # internal mode
201 test_port = port.port
203 # in Internal mode there are 2 types
205 # app.kubernetes.io/name
207 test_url = service.spec.selector['app']
209 test_url = service.spec.selector['app.kubernetes.io/name']
211 if test_port is not None:
213 "Look for certificate %s (%s:%s)",
217 cert_info = get_certifificate_info(test_url, test_port)
218 exp_date = cert_info['expiration_date']
219 LOGGER.info("Expiration date retrieved %s", exp_date)
220 # calculate the remaining time
221 delta_time = (exp_date - datetime.now()).days
225 LOGGER.info("Port found in the xfail list," +
226 "do not consider it for success criteria")
228 if (delta_time < EXP_CRITERIA_MIN or
229 delta_time > EXP_CRITERIA_MAX):
230 success_criteria = False
231 if cert_info['validity'] is False:
232 success_criteria = False
233 # add certificate to the list
234 node_ports_list.append(
235 {'pod_name': test_name,
236 'pod_port': test_port,
237 'expiration_date': str(exp_date),
238 'remaining_days': delta_time,
239 'cluster_ip': service.spec.cluster_ip,
240 'issuer': cert_info['issuer'],
241 'validity': cert_info['validity']})
243 LOGGER.debug("Port value retrieved as None")
244 except ssl.SSLError as e:
245 LOGGER.exception("Bad certificate for port %s" % port)
246 node_ports_ssl_error_list.append(
247 {'pod_name': test_name,
248 'pod_port': test_port,
249 'error_details': str(e)})
250 except ConnectionRefusedError as e:
251 LOGGER.exception("ConnectionrefusedError for port %s" % port)
252 node_ports_connection_error_list.append(
253 {'pod_name': test_name,
254 'pod_port': test_port,
255 'error_details': str(e)})
256 except TypeError as e:
257 LOGGER.exception("Type Error for port %s" % port)
258 node_ports_type_error_list.append(
259 {'pod_name': test_name,
260 'pod_port': test_port,
261 'error_details': str(e)})
262 except ConnectionResetError as e:
263 LOGGER.exception("ConnectionResetError for port %s" % port)
264 node_ports_reset_error_list.append(
265 {'pod_name': test_name,
266 'pod_port': test_port,
267 'error_details': str(e)})
269 LOGGER.error("Unknown error")
271 # Create html summary
272 jinja_env = Environment(
273 autoescape=select_autoescape(['html']),
274 loader=FileSystemLoader('./templates'))
275 if args.mode == 'nodeport':
276 jinja_env.get_template('cert-nodeports.html.j2').stream(
277 node_ports_list=node_ports_list,
278 node_ports_ssl_error_list=node_ports_ssl_error_list,
279 node_ports_connection_error_list=node_ports_connection_error_list,
280 node_ports_type_error_list=node_ports_type_error_list,
281 node_ports_reset_error_list=node_ports_reset_error_list).dump(
282 '{}/certificates.html'.format(args.dir))
284 jinja_env.get_template('cert-internal.html.j2').stream(
285 node_ports_list=node_ports_list,
286 node_ports_ssl_error_list=node_ports_ssl_error_list,
287 node_ports_connection_error_list=node_ports_connection_error_list,
288 node_ports_type_error_list=node_ports_type_error_list,
289 node_ports_reset_error_list=node_ports_reset_error_list).dump(
290 '{}/certificates.html'.format(args.dir))
292 return success_criteria
295 def test_ingress(k8s_ingress, mode):
296 LOGGER.debug('Test %s mode', mode)
297 for ingress in k8s_ingress:
298 LOGGER.debug(ingress)
302 # ***************************************************************************
303 # ***************************************************************************
305 # ***************************************************************************
306 # ***************************************************************************
308 if args.mode == "ingress":
309 test_routine = test_ingress
310 test_param = k8s_ingress
312 test_routine = test_services
313 test_param = k8s_services
315 LOGGER.info(">>>> Test certificates: mode = %s", args.mode)
316 if test_routine(test_param, args.mode):
317 LOGGER.warning(">>>> Test PASS")
319 LOGGER.warning(">>>> Test FAIL")
323 LOGGER.info(">>>> Test Check certificates PASS")
325 LOGGER.error(">>>> Test Check certificates FAIL")