Add the support of activate/deactivate events to RAN NSSMF simulator
[integration.git] / test / security / check_certificates / check_certificates / check_certificates_validity.py
1 #!/usr/bin/env python3
2 #   COPYRIGHT NOTICE STARTS HERE
3 #
4 #   Copyright 2020 Orange, Ltd.
5 #
6 #   Licensed under the Apache License, Version 2.0 (the "License");
7 #   you may not use this file except in compliance with the License.
8 #   You may obtain a copy of the License at
9 #
10 #       http://www.apache.org/licenses/LICENSE-2.0
11 #
12 #   Unless required by applicable law or agreed to in writing, software
13 #   distributed under the License is distributed on an "AS IS" BASIS,
14 #   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 #   See the License for the specific language governing permissions and
16 #   limitations under the License.
17 #
18 #   COPYRIGHT NOTICE ENDS HERE
19
20 # Check all the kubernetes pods, evaluate the certificate and build a
21 # certificate dashboard.
22 #
23 # Dependencies:
24 #     See requirements.txt
25 #     The dashboard is based on bulma framework
26 #
27 # Environment:
28 #   This script should be run on the local machine which has network access to
29 # the onap K8S cluster.
30 #   It requires k8s cluster config file on local machine
31 #   It requires also the ONAP IP provided through an env variable ONAP_IP
32 #   ONAP_NAMESPACE env variable is also considered
33 # if not set we set it to onap
34 # Example usage:
35 #       python check_certificates_validity.py
36 # the summary html page will be generated where the script is launched
37 """
38 Check ONAP certificates
39 """
40 import argparse
41 import logging
42 import os
43 import ssl
44 import sys
45 import OpenSSL
46 from datetime import datetime
47 from kubernetes import client, config
48 from jinja2 import Environment, FileSystemLoader, select_autoescape
49 from socket import *  # pylint: disable=W0614
50
51 # Set SSL timeout
52 setdefaulttimeout(10)
53
54 # Logger
55 LOG_LEVEL = 'INFO'
56 logging.basicConfig()
57 LOGGER = logging.getLogger("Gating-Index")
58 LOGGER.setLevel(LOG_LEVEL)
59 CERT_MODES = ['nodeport', 'ingress', 'internal']
60 EXP_CRITERIA_MIN = 30
61 EXP_CRITERIA_MAX = 389
62 EXPECTED_CERT_STRING = "C=US;O=ONAP;OU=OSAAF;CN=intermediateCA_9"
63 RESULT_PATH = "."
64
65
66 # Get arguments
67 parser = argparse.ArgumentParser()
68 parser.add_argument(
69     '-m',
70     '--mode',
71     choices=CERT_MODES,
72     help='Mode (nodeport, ingress, internal). If not set all modes are tried',
73     default='nodeport')
74 parser.add_argument(
75     '-i',
76     '--ip',
77     help='ONAP IP needed (for nodeport mode)',
78     default=os.environ.get('ONAP_IP'))
79 parser.add_argument(
80     '-n',
81     '--namespace',
82     help='ONAP namespace',
83     default='onap')
84 parser.add_argument(
85     '-d',
86     '--dir',
87     help='Result directory',
88     default=RESULT_PATH)
89
90 args = parser.parse_args()
91
92 # Get the ONAP namespace
93 onap_namespace = args.namespace
94 LOGGER.info("Verification of the %s certificates started", onap_namespace)
95
96 # Create the target dir (in case it does not exist)
97 if os.pardir not in args.dir:
98     os.makedirs(args.dir, exist_ok=True)
99
100 # Nodeport specific section
101 # Retrieve the kubernetes IP for mode nodeport
102 if args.mode == "nodeport":
103     if args.ip is None:
104         LOGGER.error(
105             "In nodeport mode, the IP of the ONAP cluster is required." +
106             "The value can be set using -i option " +
107             "or retrieved from the ONAP_IP env variable")
108         exit(parser.print_usage())
109     try:
110         nodeports_xfail_list = []
111         with open('nodeports_xfail.txt', 'r') as f:
112             first_line = f.readline()
113             for line in f:
114                 nodeports_xfail_list.append(
115                     line.split(" ", 1)[0].strip('\n'))
116                 LOGGER.info("nodeports xfail list: %s",
117                             nodeports_xfail_list)
118     except KeyError:
119         LOGGER.error("Please set the environment variable ONAP_IP")
120         sys.exit(1)
121     except FileNotFoundError:
122         LOGGER.warning("Nodeport xfail list not found")
123
124 # Kubernetes section
125 # retrieve the candidate ports first
126 if args.mode == "internal":
127     k8s_config = config.load_incluster_config()
128 else:
129     k8s_config = config.load_kube_config()
130
131 core = client.CoreV1Api()
132 api_instance = client.NetworkingV1Api(
133     client.ApiClient(k8s_config))
134 k8s_services = core.list_namespaced_service(onap_namespace).items
135 k8s_ingress = api_instance.list_namespaced_ingress(onap_namespace).items
136
137
138 def get_certifificate_info(host, port):
139     LOGGER.debug("Host: %s", host)
140     LOGGER.debug("Port: %s", port)
141     cert = ssl.get_server_certificate(
142         (host, port))
143     LOGGER.debug("get certificate")
144     x509 = OpenSSL.crypto.load_certificate(
145         OpenSSL.crypto.FILETYPE_PEM, cert)
146
147     LOGGER.debug("get certificate")
148     exp_date = datetime.strptime(
149         x509.get_notAfter().decode('ascii'), '%Y%m%d%H%M%SZ')
150     LOGGER.debug("Expiration date retrieved %s", exp_date)
151     issuer = x509.get_issuer().get_components()
152
153     issuer_info = ''
154     # format issuer nicely
155     for issuer_info_key, issuer_info_val in issuer:
156         issuer_info += (issuer_info_key.decode('utf-8') + "=" +
157                         issuer_info_val.decode('utf-8') + ";")
158     cert_validity = False
159     if issuer_info[:-1] == EXPECTED_CERT_STRING:
160         cert_validity = True
161
162     return {'expiration_date': exp_date,
163             'issuer': issuer_info[:-1],
164             'validity': cert_validity}
165
166
167 def test_services(k8s_services, mode):
168     success_criteria = True  # success criteria per scan
169     # looks for the certificates
170     node_ports_list = []
171     node_ports_ssl_error_list = []
172     node_ports_connection_error_list = []
173     node_ports_type_error_list = []
174     node_ports_reset_error_list = []
175
176     # for node ports and internal we consider the services
177     # for the ingress we consider the ingress
178     for service in k8s_services:
179         try:
180             for port in service.spec.ports:
181                 # For nodeport mode, we consider
182                 # - the IP of the cluster
183                 # - spec.port.node_port
184                 #
185                 # For internal mode, we consider
186                 # - spec.selector.app
187                 # - spec.port.port
188                 test_name = service.metadata.name
189                 test_port = None
190                 error_waiver = False  # waiver per port
191                 if mode == 'nodeport':
192                     test_url = args.ip
193                     test_port = port.node_port
194
195                     # Retrieve the nodeport xfail list
196                     # to consider SECCOM waiver if needed
197                     if test_port in nodeports_xfail_list:
198                         error_waiver = True
199                 else:  # internal mode
200                     test_port = port.port
201                     test_url = ''
202                     # in Internal mode there are 2 types
203                     # app
204                     # app.kubernetes.io/name
205                     try:
206                         test_url = service.spec.selector['app']
207                     except KeyError:
208                         test_url = service.spec.selector['app.kubernetes.io/name']
209
210                 if test_port is not None:
211                     LOGGER.info(
212                         "Look for certificate %s (%s:%s)",
213                         test_name,
214                         test_url,
215                         test_port)
216                     cert_info = get_certifificate_info(test_url, test_port)
217                     exp_date = cert_info['expiration_date']
218                     LOGGER.info("Expiration date retrieved %s", exp_date)
219                     # calculate the remaining time
220                     delta_time = (exp_date - datetime.now()).days
221
222                     # Test criteria
223                     if error_waiver:
224                         LOGGER.info("Port found in the xfail list," +
225                                     "do not consider it for success criteria")
226                     else:
227                         if (delta_time < EXP_CRITERIA_MIN or
228                                 delta_time > EXP_CRITERIA_MAX):
229                             success_criteria = False
230                         if cert_info['validity'] is False:
231                             success_criteria = False
232                     # add certificate to the list
233                     node_ports_list.append(
234                         {'pod_name': test_name,
235                          'pod_port': test_port,
236                          'expiration_date': str(exp_date),
237                          'remaining_days': delta_time,
238                          'cluster_ip': service.spec.cluster_ip,
239                          'issuer': cert_info['issuer'],
240                          'validity': cert_info['validity']})
241                 else:
242                     LOGGER.debug("Port value retrieved as None")
243         except ssl.SSLError as e:
244             LOGGER.exception("Bad certificate for port %s" % port)
245             node_ports_ssl_error_list.append(
246                 {'pod_name': test_name,
247                  'pod_port': test_port,
248                  'error_details': str(e)})
249         except ConnectionRefusedError as e:
250             LOGGER.exception("ConnectionrefusedError for port %s" % port)
251             node_ports_connection_error_list.append(
252                 {'pod_name': test_name,
253                  'pod_port': test_port,
254                  'error_details': str(e)})
255         except TypeError as e:
256             LOGGER.exception("Type Error for port %s" % port)
257             node_ports_type_error_list.append(
258                 {'pod_name': test_name,
259                  'pod_port': test_port,
260                  'error_details': str(e)})
261         except ConnectionResetError as e:
262             LOGGER.exception("ConnectionResetError for port %s" % port)
263             node_ports_reset_error_list.append(
264                 {'pod_name': test_name,
265                  'pod_port': test_port,
266                  'error_details': str(e)})
267         except:
268             LOGGER.error("Unknown error")
269
270     # Create html summary
271     jinja_env = Environment(
272         autoescape=select_autoescape(['html']),
273         loader=FileSystemLoader('./templates'))
274     if args.mode == 'nodeport':
275         jinja_env.get_template('cert-nodeports.html.j2').stream(
276             node_ports_list=node_ports_list,
277             node_ports_ssl_error_list=node_ports_ssl_error_list,
278             node_ports_connection_error_list=node_ports_connection_error_list,
279             node_ports_type_error_list=node_ports_type_error_list,
280             node_ports_reset_error_list=node_ports_reset_error_list).dump(
281             '{}/certificates.html'.format(args.dir))
282     else:
283         jinja_env.get_template('cert-internal.html.j2').stream(
284             node_ports_list=node_ports_list,
285             node_ports_ssl_error_list=node_ports_ssl_error_list,
286             node_ports_connection_error_list=node_ports_connection_error_list,
287             node_ports_type_error_list=node_ports_type_error_list,
288             node_ports_reset_error_list=node_ports_reset_error_list).dump(
289             '{}/certificates.html'.format(args.dir))
290
291     return success_criteria
292
293
294 def test_ingress(k8s_ingress, mode):
295     LOGGER.debug('Test %s mode', mode)
296     for ingress in k8s_ingress:
297         LOGGER.debug(ingress)
298     return True
299
300
301 # ***************************************************************************
302 # ***************************************************************************
303 # start of the test
304 # ***************************************************************************
305 # ***************************************************************************
306 test_status = True
307 if args.mode == "ingress":
308     test_routine = test_ingress
309     test_param = k8s_ingress
310 else:
311     test_routine = test_services
312     test_param = k8s_services
313
314 LOGGER.info(">>>> Test certificates: mode = %s", args.mode)
315 if test_routine(test_param, args.mode):
316     LOGGER.warning(">>>> Test PASS")
317 else:
318     LOGGER.warning(">>>> Test FAIL")
319     test_status = False
320
321 if test_status:
322     LOGGER.info(">>>> Test Check certificates PASS")
323 else:
324     LOGGER.error(">>>> Test Check certificates FAIL")
325     sys.exit(1)