2 # COPYRIGHT NOTICE STARTS HERE
4 # Copyright 2020 Orange, Ltd.
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
18 # COPYRIGHT NOTICE ENDS HERE
20 # Check all the kubernetes pods, evaluate the certificate and build a
21 # certificate dashboard.
24 # See requirements.txt
25 # The dashboard is based on bulma framework
28 # This script should be run on the local machine which has network access to
29 # the onap K8S cluster.
30 # It requires k8s cluster config file on local machine
31 # It requires also the ONAP IP provided through an env variable ONAP_IP
32 # ONAP_NAMESPACE env variable is also considered
33 # if not set we set it to onap
35 # python check_certificates_validity.py
36 # the summary html page will be generated where the script is launched
38 Check ONAP certificates
46 from datetime import datetime
47 from kubernetes import client, config
48 from jinja2 import Environment, FileSystemLoader, select_autoescape
49 from socket import * # pylint: disable=W0614
57 LOGGER = logging.getLogger("Gating-Index")
58 LOGGER.setLevel(LOG_LEVEL)
59 CERT_MODES = ['nodeport', 'ingress', 'internal']
61 EXP_CRITERIA_MAX = 389
62 EXPECTED_CERT_STRING = "C=US;O=ONAP;OU=OSAAF;CN=intermediateCA_9"
67 parser = argparse.ArgumentParser()
72 help='Mode (nodeport, ingress, internal). If not set all modes are tried',
77 help='ONAP IP needed (for nodeport mode)',
78 default=os.environ.get('ONAP_IP'))
82 help='ONAP namespace',
87 help='Result directory',
90 args = parser.parse_args()
# Get the ONAP namespace
onap_namespace = args.namespace
LOGGER.info("Verification of the %s certificates started", onap_namespace)

# Create the target dir (in case it does not exist).
# A path that still climbs to a parent directory once normalised is refused.
# Whole path components are compared so that a directory whose name merely
# contains ".." (e.g. "run..1") is not rejected by accident.
if os.pardir not in os.path.normpath(args.dir).split(os.sep):
    os.makedirs(args.dir, exist_ok=True)
else:
    # Say so explicitly: the report is written into this directory later on,
    # and a silent skip here would surface as an unrelated failure there.
    LOGGER.warning(
        "Result directory %s not created: "
        "parent directory references are not allowed", args.dir)
100 # Nodeport specific section
101 # Retrieve the kubernetes IP for mode nodeport
102 if args.mode == "nodeport":
105 "In nodeport mode, the IP of the ONAP cluster is required." +
106 "The value can be set using -i option " +
107 "or retrieved from the ONAP_IP env variable")
108 exit(parser.print_usage())
110 nodeports_xfail_list = []
111 with open('nodeports_xfail.txt', 'r') as f:
112 first_line = f.readline()
114 nodeports_xfail_list.append(
115 line.split(" ", 1)[0].strip('\n'))
116 LOGGER.info("nodeports xfail list: %s",
117 nodeports_xfail_list)
119 LOGGER.error("Please set the environment variable ONAP_IP")
121 except FileNotFoundError:
122 LOGGER.warning("Nodeport xfail list not found")
# retrieve the candidate ports first
# Internal mode reads the in-cluster configuration, every other mode reads
# the local kube config file.
if args.mode == "internal":
    k8s_config = config.load_incluster_config()
else:
    k8s_config = config.load_kube_config()

core = client.CoreV1Api()
api_instance = client.ExtensionsV1beta1Api(
    client.ApiClient(k8s_config))
k8s_services = core.list_namespaced_service(onap_namespace).items
# Only query the ingress API when its result is going to be used: clusters
# that no longer serve the extensions/v1beta1 Ingress resource would
# otherwise abort the nodeport and internal scans as well.
# NOTE(review): k8s_ingress appears to be consumed in ingress mode only;
# confirm no other code relies on it being populated in the other modes.
if args.mode == "ingress":
    k8s_ingress = api_instance.list_namespaced_ingress(onap_namespace).items
else:
    k8s_ingress = []
def get_certifificate_info(host, port):
    """Fetch the certificate served at host:port and summarise it.

    Args:
        host: host name or IP address handed to ssl.get_server_certificate.
        port: TCP port handed to ssl.get_server_certificate.

    Returns:
        dict with three keys:
            'expiration_date': datetime parsed from the certificate
                notAfter field (format '%Y%m%d%H%M%SZ', no tzinfo attached),
            'issuer': the issuer components formatted as
                "KEY=value;KEY=value",
            'validity': True only when the formatted issuer equals
                EXPECTED_CERT_STRING.

    Exceptions raised by the ssl / OpenSSL calls are not handled here and
    propagate to the caller.
    """
    LOGGER.debug("Host: %s", host)
    LOGGER.debug("Port: %s", port)
    cert = ssl.get_server_certificate((host, port))
    LOGGER.debug("get certificate")
    x509 = OpenSSL.crypto.load_certificate(
        OpenSSL.crypto.FILETYPE_PEM, cert)
    LOGGER.debug("certificate loaded")

    exp_date = datetime.strptime(
        x509.get_notAfter().decode('ascii'), '%Y%m%d%H%M%SZ')
    LOGGER.debug("Expiration date retrieved %s", exp_date)

    # format issuer nicely: KEY=value pairs separated by ';'
    issuer_info = ";".join(
        key.decode('utf-8') + "=" + value.decode('utf-8')
        for key, value in x509.get_issuer().get_components())

    return {'expiration_date': exp_date,
            'issuer': issuer_info,
            'validity': issuer_info == EXPECTED_CERT_STRING}
def _cert_error_entry(pod_name, pod_port, error):
    """Build the summary row describing a failed certificate retrieval."""
    return {'pod_name': pod_name,
            'pod_port': pod_port,
            'error_details': str(error)}


def test_services(k8s_services, mode):
    """Check the certificate behind every port of the given services.

    For each port of each service the certificate is fetched through
    get_certifificate_info(), its remaining lifetime is computed and the
    result is collected. An HTML summary is then rendered to
    ``<args.dir>/certificates.html`` from the collected lists.

    Args:
        k8s_services: iterable of service objects exposing
            ``metadata.name``, ``spec.ports``, ``spec.selector`` and
            ``spec.cluster_ip``.
        mode: 'nodeport' tests ``args.ip`` on each port's ``node_port``;
            any other value is handled as internal mode (selector name and
            ``port.port``).

    Returns:
        bool: False when at least one non-waived certificate has a remaining
        lifetime outside [EXP_CRITERIA_MIN, EXP_CRITERIA_MAX] days or an
        unexpected issuer, True otherwise. Retrieval errors are listed in
        the summary but do not change the result.
    """
    # Function-scope import: the file-level import only brings in `datetime`.
    from datetime import timezone

    success_criteria = True  # success criteria per scan
    node_ports_list = []
    node_ports_ssl_error_list = []
    node_ports_connection_error_list = []
    node_ports_type_error_list = []
    node_ports_reset_error_list = []

    # The notAfter date is parsed from a 'Z' (UTC) timestamp without tzinfo,
    # so compare it with a naive UTC "now" rather than local time.
    now = datetime.now(timezone.utc).replace(tzinfo=None)

    # for node ports and internal we consider the services
    # for the ingress we consider the ingress
    for service in k8s_services:
        # Bound before the try block so that the handlers below never read
        # an unbound name, or a stale one left over from the previous service.
        test_name = None
        test_port = None
        try:
            test_name = service.metadata.name
            for port in service.spec.ports:
                test_port = None
                error_waiver = False  # waiver per port
                if mode == 'nodeport':
                    # nodeport mode: cluster IP + spec.port.node_port
                    # NOTE(review): the target host is taken to be args.ip
                    # ("the IP of the cluster") - confirm.
                    test_url = args.ip
                    test_port = port.node_port

                    # Retrieve the nodeport xfail list
                    # to consider SECCOM waiver if needed.
                    # The xfail entries are parsed from a text file, i.e.
                    # they are strings, while node_port is presumably an
                    # int: compare both forms so the waiver can match.
                    if (test_port in nodeports_xfail_list or
                            str(test_port) in nodeports_xfail_list):
                        error_waiver = True
                else:  # internal mode
                    test_port = port.port
                    # in Internal mode the service name comes from one of
                    # two selector keys: app or app.kubernetes.io/name
                    # NOTE(review): fallback assumed to trigger on a
                    # missing 'app' key - confirm.
                    selector = service.spec.selector
                    try:
                        test_url = selector['app']
                    except KeyError:
                        test_url = selector['app.kubernetes.io/name']

                if test_port is None:
                    LOGGER.debug("Port value retrieved as None")
                    continue

                LOGGER.info(
                    "Look for certificate %s (%s:%s)",
                    test_name, test_url, test_port)
                cert_info = get_certifificate_info(test_url, test_port)
                exp_date = cert_info['expiration_date']
                LOGGER.info("Expiration date retrieved %s", exp_date)
                # calculate the remaining time
                delta_time = (exp_date - now).days

                if error_waiver:
                    LOGGER.info("Port found in the xfail list," +
                                "do not consider it for success criteria")
                elif (delta_time < EXP_CRITERIA_MIN or
                      delta_time > EXP_CRITERIA_MAX or
                      cert_info['validity'] is False):
                    success_criteria = False
                # add certificate to the list
                node_ports_list.append(
                    {'pod_name': test_name,
                     'pod_port': test_port,
                     'expiration_date': str(exp_date),
                     'remaining_days': delta_time,
                     'cluster_ip': service.spec.cluster_ip,
                     'issuer': cert_info['issuer'],
                     'validity': cert_info['validity']})
        except ssl.SSLError as e:
            LOGGER.exception("Bad certificate for port %s", test_port)
            node_ports_ssl_error_list.append(
                _cert_error_entry(test_name, test_port, e))
        except ConnectionRefusedError as e:
            LOGGER.exception("ConnectionrefusedError for port %s", test_port)
            node_ports_connection_error_list.append(
                _cert_error_entry(test_name, test_port, e))
        except TypeError as e:
            LOGGER.exception("Type Error for port %s", test_port)
            node_ports_type_error_list.append(
                _cert_error_entry(test_name, test_port, e))
        except ConnectionResetError as e:
            LOGGER.exception("ConnectionResetError for port %s", test_port)
            node_ports_reset_error_list.append(
                _cert_error_entry(test_name, test_port, e))
        except Exception:  # keep scanning the remaining services
            LOGGER.exception("Unknown error")

    # Create html summary
    # The template names end in ".html.j2": "j2" has to be listed as well,
    # otherwise autoescaping stays off and the issuer / error strings (which
    # come from the scanned endpoints) are written to the page unescaped.
    jinja_env = Environment(
        autoescape=select_autoescape(['html', 'j2']),
        loader=FileSystemLoader('./templates'))
    if mode == 'nodeport':
        template_name = 'cert-nodeports.html.j2'
    else:
        template_name = 'cert-internal.html.j2'
    jinja_env.get_template(template_name).stream(
        node_ports_list=node_ports_list,
        node_ports_ssl_error_list=node_ports_ssl_error_list,
        node_ports_connection_error_list=node_ports_connection_error_list,
        node_ports_type_error_list=node_ports_type_error_list,
        node_ports_reset_error_list=node_ports_reset_error_list).dump(
            '{}/certificates.html'.format(args.dir))

    return success_criteria
def test_ingress(k8s_ingress, mode):
    """Placeholder for the ingress scan: log every ingress object.

    Args:
        k8s_ingress: iterable of ingress objects; each one is logged at
            debug level.
        mode: mode name, only used in the debug log.

    Returns:
        bool: always True - no certificate is evaluated here.
    """
    LOGGER.debug('Test %s mode', mode)
    for ingress in k8s_ingress:
        LOGGER.debug(ingress)
    # The caller tests the truthiness of the result to report PASS / FAIL,
    # so return an explicit boolean.
    # NOTE(review): no ingress certificate is checked yet, so this reports
    # PASS unconditionally - confirm this is the intended placeholder result.
    return True
301 # ***************************************************************************
302 # ***************************************************************************
304 # ***************************************************************************
305 # ***************************************************************************
307 if args.mode == "ingress":
308 test_routine = test_ingress
309 test_param = k8s_ingress
311 test_routine = test_services
312 test_param = k8s_services
314 LOGGER.info(">>>> Test certificates: mode = %s", args.mode)
315 if test_routine(test_param, args.mode):
316 LOGGER.warning(">>>> Test PASS")
318 LOGGER.warning(">>>> Test FAIL")
322 LOGGER.info(">>>> Test Check certificates PASS")
324 LOGGER.error(">>>> Test Check certificates FAIL")