Chore: Add gerrit maven verify GHA workflow
[sdnc/oam.git] / installation / sdnc / src / main / scripts / installCerts.py
1 # ============LICENSE_START=======================================================
2 #  Copyright (C) 2019 Nordix Foundation.
3 # ================================================================================
4 #  extended by highstreet technologies GmbH (c) 2020
5 #  Copyright (c) 2021 Nokia Intellectual Property.
6 # ================================================================================
7 # Licensed under the Apache License, Version 2.0 (the "License");
8 # you may not use this file except in compliance with the License.
9 # You may obtain a copy of the License at
10 #
11 #      http://www.apache.org/licenses/LICENSE-2.0
12 #
13 # Unless required by applicable law or agreed to in writing, software
14 # distributed under the License is distributed on an "AS IS" BASIS,
15 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 #
19 # SPDX-License-Identifier: Apache-2.0
20 # ============LICENSE_END=========================================================
21 #
22
23
24 # coding=utf-8
25 import os
26 import sys
27 import re
28 import http.client
29 import base64
30 import time
31 import zipfile
32 import shutil
33 import subprocess
34 import logging
35
36 odl_home = os.environ['ODL_HOME']
37 log_directory = odl_home + '/data/log/'
38 log_file = log_directory + 'installCerts.log'
39 with open(os.path.join(log_directory, 'installCerts.log'), 'w') as fp:
40     pass
41 log_format = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
42 if not os.path.exists(log_directory):
43     os.makedirs(log_directory)
44 logging.basicConfig(filename=log_file,level=logging.DEBUG,filemode='w',format=log_format)
45 print ('Start cert provisioning. Log file: ' + log_file);
46
47 Path = "/tmp"
48 if "ODL_CERT_DIR" in os.environ:
49     Path = os.environ['ODL_CERT_DIR']
50
51 zipFileList = []
52
53 username = os.environ['ODL_ADMIN_USERNAME']
54 password = os.environ['ODL_ADMIN_PASSWORD']
55 TIMEOUT=1000
56 INTERVAL=30
57 timePassed=0
58
59 postKeystore= "/rests/operations/netconf-keystore:add-keystore-entry"
60 postPrivateKey= "/rests/operations/netconf-keystore:add-private-key"
61 postTrustedCertificate= "/rests/operations/netconf-keystore:add-trusted-certificate"
62
63 truststore_pass_file = Path + '/truststore.pass'
64 truststore_file = Path + '/truststore.jks'
65
66 keystore_pass_file = Path + '/keystore.pass'
67 keystore_file = Path + '/keystore.jks'
68
69 jks_files = [truststore_pass_file, keystore_pass_file, keystore_file, truststore_file]
70
71 envOdlFeaturesBoot='ODL_FEATURES_BOOT'
72 # Strategy sli-api is default
73 certreadyCmd="POST"
74 certreadyUrl="/rests/operations/SLI-API:healthcheck"
75
76 if "SDNRWT" in os.environ: 
77     sdnrWt = os.environ['SDNRWT']
78     if sdnrWt == "true":
79         certreadyCmd="GET"
80         certreadyUrl="/rests/data/network-topology:network-topology"
81 logging.info('ODL ready strategy with command %s and url %s', certreadyCmd, certreadyUrl)
82
83 odl_port = 8181
84 cred_string = username + ":" + password
85 headers = {'Authorization':'Basic %s' % base64.b64encode(cred_string.encode()).decode(),
86            'X-FromAppId': 'csit-sdnc',
87            'X-TransactionId': 'csit-sdnc',
88            'Accept':"application/json",
89            'Content-type':"application/yang-data+json"}
90
91 def readFile(folder, file):
92     key = open(Path + "/" + folder + "/" + file, "r")
93     fileRead = key.read()
94     key.close()
95     fileRead = "\n".join(fileRead.splitlines()[1:-1])
96     return fileRead
97
98 def readTrustedCertificate(folder, file):
99     listCert = list()
100     caPem = ""
101     startCa = False
102     key = open(folder + "/" + file, "r")
103     lines = key.readlines()
104     for line in lines:
105         if not "BEGIN CERTIFICATE" in line and not "END CERTIFICATE" in line and startCa:
106             caPem += line
107         elif "BEGIN CERTIFICATE" in line:
108             startCa = True
109         elif "END CERTIFICATE" in line:
110             startCa = False
111             listCert.append(caPem)
112             caPem = ""
113     return listCert
114
115 def makeKeystoreKey(clientKey, count):
116     odl_private_key = "ODL_private_key_%d" %count
117
118     json_keystore_key='{{\"input\": {{ \"key-credential\": {{\"key-id\": \"{odl_private_key}\", \"private-key\" : ' \
119                       '\"{clientKey}\",\"passphrase\" : \"\"}}}}}}'.format(
120         odl_private_key=odl_private_key,
121         clientKey=clientKey)
122
123     return json_keystore_key
124
125 def makePrivateKey(clientKey, clientCrt, certList, count):
126     caPem = ""
127     if certList:
128         for cert in certList:
129             caPem += '\"%s\",' % cert
130         caPem = caPem.rsplit(',', 1)[0]
131     odl_private_key="ODL_private_key_%d" %count
132
133     json_private_key='{{\"input\": {{ \"private-key\":{{\"name\": \"{odl_private_key}\", \"data\" : ' \
134                      '\"{clientKey}\",\"certificate-chain\":[\"{clientCrt}\",{caPem}]}}}}}}'.format(
135         odl_private_key=odl_private_key,
136         clientKey=clientKey,
137         clientCrt=clientCrt,
138         caPem=caPem)
139
140     return json_private_key
141
142 def makeTrustedCertificate(certList, count):
143     number = 0
144     json_cert_format = ""
145     for cert in certList:
146         cert_name = "xNF_CA_certificate_%d_%d" %(count, number)
147         json_cert_format += '{{\"name\": \"{trusted_name}\",\"certificate\":\"{cert}\"}},\n'.format(
148             trusted_name=cert_name,
149             cert=cert.strip())
150         number += 1
151
152     json_cert_format = json_cert_format.rsplit(',', 1)[0]
153     json_trusted_cert='{{\"input\": {{ \"trusted-certificate\": [{certificates}]}}}}'.format(
154         certificates=json_cert_format)
155     return json_trusted_cert
156
157
158 def makeRestconfPost(conn, json_file, apiCall):
159     req = conn.request("POST", apiCall, json_file, headers=headers)
160     res = conn.getresponse()
161     res.read()
162     if res.status != 200 and res.status != 204:
163         logging.error("Error here, response back wasnt 200: Response was : %d , %s" % (res.status, res.reason))
164         writeCertInstallStatus("NOTOK")
165     else:
166         logging.debug("Response :%s Reason :%s ",res.status, res.reason)
167
168 def extractZipFiles(zipFileList, count):
169     for zipFolder in zipFileList:
170         try:
171                 with zipfile.ZipFile(Path + "/" + zipFolder.strip(),"r") as zip_ref:
172                     zip_ref.extractall(Path)
173                 folder = zipFolder.rsplit(".")[0]
174                 processFiles(folder, count)
175         except Exception as e:
176                 logging.error("Error while extracting zip file(s). Exiting Certificate Installation.")
177                 logging.info("Error details : %s" % e)
178                 writeCertInstallStatus("NOTOK")
179
180 def processFiles(folder, count):
181     logging.info('Process folder: %d %s', count, folder)
182     for file in os.listdir(Path + "/" + folder):
183         if os.path.isfile(Path + "/" + folder + "/" + file.strip()):
184             if ".key" in file:
185                 clientKey = readFile(folder, file.strip())
186             elif "trustedCertificate" in file:
187                 certList = readTrustedCertificate(Path + "/" + folder, file.strip())
188             elif ".crt" in file:
189                 clientCrt = readFile(folder, file.strip())
190         else:
191             logging.error("Could not find file %s" % file.strip())
192             writeCertInstallStatus("NOTOK")
193     shutil.rmtree(Path + "/" + folder)
194     post_content(clientKey, clientCrt, certList, count)
195
196 def post_content(clientKey, clientCrt, certList, count):
197     logging.info('Post content: %d', count)
198     conn = http.client.HTTPConnection("localhost",odl_port)
199
200     if clientKey:
201         json_keystore_key = makeKeystoreKey(clientKey, count)
202         logging.debug("Posting private key in to ODL keystore")
203         makeRestconfPost(conn, json_keystore_key, postKeystore)
204
205     if certList:
206         json_trusted_cert = makeTrustedCertificate(certList, count)
207         logging.debug("Posting trusted cert list in to ODL")
208         makeRestconfPost(conn, json_trusted_cert, postTrustedCertificate)
209
210     if clientKey and clientCrt and certList:
211         json_private_key = makePrivateKey(clientKey, clientCrt, certList, count)
212         logging.debug("Posting the cert in to ODL")
213         makeRestconfPost(conn, json_private_key, postPrivateKey)
214
215
216 def makeHealthcheckCall(headers, timePassed):
217     connected = False
218     # WAIT 10 minutes maximum and test every 30 seconds if HealthCheck API is returning 200
219     while timePassed < TIMEOUT:
220         try:
221             conn = http.client.HTTPConnection("localhost",odl_port)
222             req = conn.request(certreadyCmd, certreadyUrl,headers=headers)
223             res = conn.getresponse()
224             res.read()
225             httpStatus = res.status
226             if httpStatus == 200:
227                 logging.debug("Healthcheck Passed in %d seconds." %timePassed)
228                 connected = True
229                 break
230             else:
231                 logging.debug("Sleep: %d seconds before testing if Healthcheck worked. Total wait time up now is: %d seconds. Timeout is: %d seconds. Problem code was: %d" %(INTERVAL, timePassed, TIMEOUT, httpStatus))
232         except:
233             logging.error("Cannot execute REST call. Sleep: %d seconds before testing if Healthcheck worked. Total wait time up now is: %d seconds. Timeout is: %d seconds." %(INTERVAL, timePassed, TIMEOUT))
234         timePassed = timeIncrement(timePassed)
235
236     if timePassed > TIMEOUT:
237         logging.error("TIME OUT: Healthcheck not passed in  %d seconds... Could cause problems for testing activities..." %TIMEOUT)
238         writeCertInstallStatus("NOTOK")
239
240     return connected
241
242
243 def timeIncrement(timePassed):
244     time.sleep(INTERVAL)
245     timePassed = timePassed + INTERVAL
246     return timePassed
247
248
249 def get_pass(file_name):
250     try:
251         with open(file_name, 'r') as file_obj:
252             password = file_obj.read().strip()
253         return "'{}'".format(password)
254     except Exception as e:
255         logging.error("Error occurred while fetching password : %s", e)
256         writeCertInstallStatus("NOTOK")
257
258 def cleanup():
259     for file in os.listdir(Path):
260         if os.path.isfile(Path + '/' + file):
261             logging.debug("Cleaning up the file %s", Path + '/'+ file)
262             os.remove(Path + '/'+ file)
263
264
265 def jks_to_p12(file, password):
266     """Converts jks format into p12"""
267     try:
268         certList = []
269         key = None
270         cert = None
271         if (file.endswith('.jks')):
272              p12_file = file.replace('.jks', '.p12')
273              jks_cmd = 'keytool -importkeystore -srckeystore {src_file} -destkeystore {dest_file} -srcstoretype JKS -srcstorepass {src_pass} -deststoretype PKCS12 -deststorepass {dest_pass}'.format(src_file=file, dest_file=p12_file, src_pass=password, dest_pass=password)
274              logging.debug("Converting %s into p12 format", file)
275              os.system(jks_cmd)
276              file = p12_file
277              return file
278     except Exception as e:
279         logging.error("Error occurred while converting jks to p12 format : %s", e)
280         writeCertInstallStatus("NOTOK")
281
282
283 def make_cert_chain(cert_chain, pattern):
284     cert_list = []
285     if cert_chain:
286         cert_chain = cert_chain.decode('utf-8')
287         matches = re.findall(pattern, cert_chain, re.DOTALL | re.MULTILINE)
288         for cert in matches:
289             cert_list.append(cert.strip())
290         return cert_list
291     else:
292         logging.debug(" Certificate Chain empty: %s " % cert_chain)
293
294
295 def process_jks_files(count):
296     ca_cert_list = []
297     logging.info("Processing JKS files found in %s directory " % Path)
298     try:
299         if all([os.path.isfile(f) for f in jks_files]):
300             keystore_pass = get_pass(keystore_pass_file)
301             keystore_file_p12 = jks_to_p12(keystore_file, keystore_pass)
302
303             client_key_cmd = 'openssl pkcs12 -in {src_file} -nocerts -nodes -passin pass:{src_pass}'.format(
304                 src_file=keystore_file_p12, src_pass=keystore_pass)
305             client_crt_cmd = 'openssl pkcs12 -in {src_file} -clcerts -nokeys  -passin pass:{src_pass}'.format(
306                 src_file=keystore_file_p12, src_pass=keystore_pass)
307
308             truststore_pass = get_pass(truststore_pass_file)
309             truststore_p12 = jks_to_p12(truststore_file, truststore_pass)
310
311             trust_cert_cmd = 'openssl pkcs12 -in {src_file} -cacerts -nokeys -passin pass:{src_pass} '.format(
312                 src_file=truststore_p12, src_pass=truststore_pass)
313
314             key_pattern = r'(?<=-----BEGIN PRIVATE KEY-----).*?(?=-----END PRIVATE KEY-----)'
315             client_key = subprocess.check_output(client_key_cmd, shell=True)
316             if client_key:
317                 client_key = make_cert_chain(client_key, key_pattern)[0]
318                 logging.debug("Key Ok")
319
320             cert_pattern = r'(?<=-----BEGIN CERTIFICATE-----).*?(?=-----END CERTIFICATE-----)'
321             client_cert = subprocess.check_output(client_crt_cmd, shell=True)
322             if client_cert:
323                 client_cert = make_cert_chain(client_cert, cert_pattern)[0]
324                 logging.debug("Client Cert Ok")
325
326             ca_cert = subprocess.check_output(trust_cert_cmd, shell=True)
327             if ca_cert:
328                 ca_cert_list = make_cert_chain(ca_cert, cert_pattern)
329                 logging.debug("CA Cert Ok")
330
331             if client_key and client_cert and ca_cert:
332                 post_content(client_key, client_cert, ca_cert_list, count)
333         else:
334             logging.debug("No JKS files found in %s directory" % Path)
335     except subprocess.CalledProcessError as err:
336         print("CalledProcessError Execution of OpenSSL command failed: %s" % err)
337         writeCertInstallStatus("NOTOK")
338     except Exception as e:
339         logging.error("UnExpected Error while processing JKS files at {0}, Caused by: {1}".format(Path, e))
340         writeCertInstallStatus("NOTOK")
341
342 def readCertProperties():
343     '''
344     This function searches for manually copied zip file
345     containing certificates. This is required as part
346     of backward compatibility.
347     If not foud, it searches for jks certificates.
348     '''
349     connected = makeHealthcheckCall(headers, timePassed)
350     logging.info('Connected status: %s', connected)
351     if connected:
352         count = 0
353         if os.path.isfile(Path + "/certs.properties"):
354             with open(Path + "/certs.properties", "r") as f:
355                 for line in f:
356                     if not "*****" in line:
357                         zipFileList.append(line)
358                     else:
359                         extractZipFiles(zipFileList, count)
360                         count += 1
361                         del zipFileList[:]
362         else:
363             logging.debug("No certs.properties/zip files exist at: " + Path)
364             logging.info("Processing any  available jks/p12 files under cert directory")
365             process_jks_files(count)
366     else:
367         logging.info('Connected status: %s', connected)
368         logging.info('Stopping SDNR due to inability to install certificates')
369         writeCertInstallStatus("NOTOK")
370         
371 def writeCertInstallStatus(installStatus):
372     if installStatus == "NOTOK":
373         with open(os.path.join(log_directory, 'INSTALLCERTSFAIL'), 'w') as fp:
374             pass
375             sys.exit(1)
376     elif installStatus == "OK":
377         with open(os.path.join(log_directory, 'INSTALLCERTSPASS'), 'w') as fp:
378             pass
379             sys.exit(0)
380
381 readCertProperties()
382 logging.info('Cert installation ending')
383 writeCertInstallStatus("OK")
384