[STRIMZI_CA] Add strimzi ca to cert check
[integration.git] / test / security / check_certificates / check_certificates / check_certificates_validity.py
1 #!/usr/bin/env python3
2 #   COPYRIGHT NOTICE STARTS HERE
3 #
4 #   Copyright 2020 Orange, Ltd.
5 #
6 #   Licensed under the Apache License, Version 2.0 (the "License");
7 #   you may not use this file except in compliance with the License.
8 #   You may obtain a copy of the License at
9 #
10 #       http://www.apache.org/licenses/LICENSE-2.0
11 #
12 #   Unless required by applicable law or agreed to in writing, software
13 #   distributed under the License is distributed on an "AS IS" BASIS,
14 #   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 #   See the License for the specific language governing permissions and
16 #   limitations under the License.
17 #
18 #   COPYRIGHT NOTICE ENDS HERE
19
20 # Check all the kubernetes pods, evaluate the certificate and build a
21 # certificate dashboard.
22 #
23 # Dependencies:
24 #     See requirements.txt
25 #     The dashboard is based on bulma framework
26 #
27 # Environment:
28 #   This script should be run on the local machine which has network access to
29 # the onap K8S cluster.
30 #   It requires k8s cluster config file on local machine
31 #   It requires also the ONAP IP provided through an env variable ONAP_IP
32 #   ONAP_NAMESPACE env variable is also considered
33 # if not set we set it to onap
34 # Example usage:
35 #       python check_certificates_validity.py
36 # the summary html page will be generated where the script is launched
37 """
38 Check ONAP certificates
39 """
40 import argparse
41 import logging
42 import os
43 import ssl
44 import sys
45 import OpenSSL
46 from datetime import datetime
47 from kubernetes import client, config
48 from jinja2 import Environment, FileSystemLoader, select_autoescape
49 from socket import *  # pylint: disable=W0614
50
51 # Set SSL timeout
52 setdefaulttimeout(10)
53
54 # Logger
55 LOG_LEVEL = 'INFO'
56 logging.basicConfig()
57 LOGGER = logging.getLogger("Gating-Index")
58 LOGGER.setLevel(LOG_LEVEL)
59 CERT_MODES = ['nodeport', 'ingress', 'internal']
60 EXP_CRITERIA_MIN = 30
61 EXP_CRITERIA_MAX = 389
62 EXPECTED_CERT_STRING = "C=US;O=ONAP;OU=OSAAF;CN=intermediateCA_9"
63 EXPECTED_STRIMZI_CA_CERT_STRING = "O=io.strimzi;CN=cluster-ca v0"
64 RESULT_PATH = "."
65
66
67 # Get arguments
68 parser = argparse.ArgumentParser()
69 parser.add_argument(
70     '-m',
71     '--mode',
72     choices=CERT_MODES,
73     help='Mode (nodeport, ingress, internal). If not set all modes are tried',
74     default='nodeport')
75 parser.add_argument(
76     '-i',
77     '--ip',
78     help='ONAP IP needed (for nodeport mode)',
79     default=os.environ.get('ONAP_IP'))
80 parser.add_argument(
81     '-n',
82     '--namespace',
83     help='ONAP namespace',
84     default='onap')
85 parser.add_argument(
86     '-d',
87     '--dir',
88     help='Result directory',
89     default=RESULT_PATH)
90
91 args = parser.parse_args()
92
93 # Get the ONAP namespace
94 onap_namespace = args.namespace
95 LOGGER.info("Verification of the %s certificates started", onap_namespace)
96
97 # Create the target dir (in case it does not exist)
98 if os.pardir not in args.dir:
99     os.makedirs(args.dir, exist_ok=True)
100
101 # Nodeport specific section
102 # Retrieve the kubernetes IP for mode nodeport
103 if args.mode == "nodeport":
104     if args.ip is None:
105         LOGGER.error(
106             "In nodeport mode, the IP of the ONAP cluster is required." +
107             "The value can be set using -i option " +
108             "or retrieved from the ONAP_IP env variable")
109         exit(parser.print_usage())
110     try:
111         nodeports_xfail_list = []
112         with open('nodeports_xfail.txt', 'r') as f:
113             first_line = f.readline()
114             for line in f:
115                 nodeports_xfail_list.append(
116                     line.split(" ", 1)[0].strip('\n'))
117                 LOGGER.info("nodeports xfail list: %s",
118                             nodeports_xfail_list)
119     except KeyError:
120         LOGGER.error("Please set the environment variable ONAP_IP")
121         sys.exit(1)
122     except FileNotFoundError:
123         LOGGER.warning("Nodeport xfail list not found")
124
125 # Kubernetes section
126 # retrieve the candidate ports first
127 if args.mode == "internal":
128     k8s_config = config.load_incluster_config()
129 else:
130     k8s_config = config.load_kube_config()
131
132 core = client.CoreV1Api()
133 api_instance = client.NetworkingV1Api(
134     client.ApiClient(k8s_config))
135 k8s_services = core.list_namespaced_service(onap_namespace).items
136 k8s_ingress = api_instance.list_namespaced_ingress(onap_namespace).items
137
138
139 def get_certifificate_info(host, port):
140     LOGGER.debug("Host: %s", host)
141     LOGGER.debug("Port: %s", port)
142     cert = ssl.get_server_certificate(
143         (host, port))
144     LOGGER.debug("get certificate")
145     x509 = OpenSSL.crypto.load_certificate(
146         OpenSSL.crypto.FILETYPE_PEM, cert)
147
148     LOGGER.debug("get certificate")
149     exp_date = datetime.strptime(
150         x509.get_notAfter().decode('ascii'), '%Y%m%d%H%M%SZ')
151     LOGGER.debug("Expiration date retrieved %s", exp_date)
152     issuer = x509.get_issuer().get_components()
153
154     issuer_info = ''
155     # format issuer nicely
156     for issuer_info_key, issuer_info_val in issuer:
157         issuer_info += (issuer_info_key.decode('utf-8') + "=" +
158                         issuer_info_val.decode('utf-8') + ";")
159     cert_validity = False
160     if issuer_info[:-1] in [EXPECTED_CERT_STRING, EXPECTED_STRIMZI_CA_CERT_STRING]:
161         cert_validity = True
162
163     return {'expiration_date': exp_date,
164             'issuer': issuer_info[:-1],
165             'validity': cert_validity}
166
167
168 def test_services(k8s_services, mode):
169     success_criteria = True  # success criteria per scan
170     # looks for the certificates
171     node_ports_list = []
172     node_ports_ssl_error_list = []
173     node_ports_connection_error_list = []
174     node_ports_type_error_list = []
175     node_ports_reset_error_list = []
176
177     # for node ports and internal we consider the services
178     # for the ingress we consider the ingress
179     for service in k8s_services:
180         try:
181             for port in service.spec.ports:
182                 # For nodeport mode, we consider
183                 # - the IP of the cluster
184                 # - spec.port.node_port
185                 #
186                 # For internal mode, we consider
187                 # - spec.selector.app
188                 # - spec.port.port
189                 test_name = service.metadata.name
190                 test_port = None
191                 error_waiver = False  # waiver per port
192                 if mode == 'nodeport':
193                     test_url = args.ip
194                     test_port = port.node_port
195
196                     # Retrieve the nodeport xfail list
197                     # to consider SECCOM waiver if needed
198                     if test_port in nodeports_xfail_list:
199                         error_waiver = True
200                 else:  # internal mode
201                     test_port = port.port
202                     test_url = ''
203                     # in Internal mode there are 2 types
204                     # app
205                     # app.kubernetes.io/name
206                     try:
207                         test_url = service.spec.selector['app']
208                     except KeyError:
209                         test_url = service.spec.selector['app.kubernetes.io/name']
210
211                 if test_port is not None:
212                     LOGGER.info(
213                         "Look for certificate %s (%s:%s)",
214                         test_name,
215                         test_url,
216                         test_port)
217                     cert_info = get_certifificate_info(test_url, test_port)
218                     exp_date = cert_info['expiration_date']
219                     LOGGER.info("Expiration date retrieved %s", exp_date)
220                     # calculate the remaining time
221                     delta_time = (exp_date - datetime.now()).days
222
223                     # Test criteria
224                     if error_waiver:
225                         LOGGER.info("Port found in the xfail list," +
226                                     "do not consider it for success criteria")
227                     else:
228                         if (delta_time < EXP_CRITERIA_MIN or
229                                 delta_time > EXP_CRITERIA_MAX):
230                             success_criteria = False
231                         if cert_info['validity'] is False:
232                             success_criteria = False
233                     # add certificate to the list
234                     node_ports_list.append(
235                         {'pod_name': test_name,
236                          'pod_port': test_port,
237                          'expiration_date': str(exp_date),
238                          'remaining_days': delta_time,
239                          'cluster_ip': service.spec.cluster_ip,
240                          'issuer': cert_info['issuer'],
241                          'validity': cert_info['validity']})
242                 else:
243                     LOGGER.debug("Port value retrieved as None")
244         except ssl.SSLError as e:
245             LOGGER.exception("Bad certificate for port %s" % port)
246             node_ports_ssl_error_list.append(
247                 {'pod_name': test_name,
248                  'pod_port': test_port,
249                  'error_details': str(e)})
250         except ConnectionRefusedError as e:
251             LOGGER.exception("ConnectionrefusedError for port %s" % port)
252             node_ports_connection_error_list.append(
253                 {'pod_name': test_name,
254                  'pod_port': test_port,
255                  'error_details': str(e)})
256         except TypeError as e:
257             LOGGER.exception("Type Error for port %s" % port)
258             node_ports_type_error_list.append(
259                 {'pod_name': test_name,
260                  'pod_port': test_port,
261                  'error_details': str(e)})
262         except ConnectionResetError as e:
263             LOGGER.exception("ConnectionResetError for port %s" % port)
264             node_ports_reset_error_list.append(
265                 {'pod_name': test_name,
266                  'pod_port': test_port,
267                  'error_details': str(e)})
268         except:
269             LOGGER.error("Unknown error")
270
271     # Create html summary
272     jinja_env = Environment(
273         autoescape=select_autoescape(['html']),
274         loader=FileSystemLoader('./templates'))
275     if args.mode == 'nodeport':
276         jinja_env.get_template('cert-nodeports.html.j2').stream(
277             node_ports_list=node_ports_list,
278             node_ports_ssl_error_list=node_ports_ssl_error_list,
279             node_ports_connection_error_list=node_ports_connection_error_list,
280             node_ports_type_error_list=node_ports_type_error_list,
281             node_ports_reset_error_list=node_ports_reset_error_list).dump(
282             '{}/certificates.html'.format(args.dir))
283     else:
284         jinja_env.get_template('cert-internal.html.j2').stream(
285             node_ports_list=node_ports_list,
286             node_ports_ssl_error_list=node_ports_ssl_error_list,
287             node_ports_connection_error_list=node_ports_connection_error_list,
288             node_ports_type_error_list=node_ports_type_error_list,
289             node_ports_reset_error_list=node_ports_reset_error_list).dump(
290             '{}/certificates.html'.format(args.dir))
291
292     return success_criteria
293
294
295 def test_ingress(k8s_ingress, mode):
296     LOGGER.debug('Test %s mode', mode)
297     for ingress in k8s_ingress:
298         LOGGER.debug(ingress)
299     return True
300
301
302 # ***************************************************************************
303 # ***************************************************************************
304 # start of the test
305 # ***************************************************************************
306 # ***************************************************************************
307 test_status = True
308 if args.mode == "ingress":
309     test_routine = test_ingress
310     test_param = k8s_ingress
311 else:
312     test_routine = test_services
313     test_param = k8s_services
314
315 LOGGER.info(">>>> Test certificates: mode = %s", args.mode)
316 if test_routine(test_param, args.mode):
317     LOGGER.warning(">>>> Test PASS")
318 else:
319     LOGGER.warning(">>>> Test FAIL")
320     test_status = False
321
322 if test_status:
323     LOGGER.info(">>>> Test Check certificates PASS")
324 else:
325     LOGGER.error(">>>> Test Check certificates FAIL")
326     sys.exit(1)