1 # ============LICENSE_START=======================================================
2 # Copyright (C) 2019 Nordix Foundation.
3 # ================================================================================
4 # extended by highstreet technologies GmbH (c) 2020
5 # Copyright (c) 2021 Nokia Intellectual Property.
6 # ================================================================================
7 # Licensed under the Apache License, Version 2.0 (the "License");
8 # you may not use this file except in compliance with the License.
9 # You may obtain a copy of the License at
11 # http://www.apache.org/licenses/LICENSE-2.0
13 # Unless required by applicable law or agreed to in writing, software
14 # distributed under the License is distributed on an "AS IS" BASIS,
15 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
19 # SPDX-License-Identifier: Apache-2.0
20 # ============LICENSE_END=========================================================
# Logging bootstrap: everything is written to $ODL_HOME/data/log/installCerts.log.
odl_home = os.environ['ODL_HOME']
log_directory = odl_home + '/data/log/'
log_file = log_directory + 'installCerts.log'
log_format = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"

# The directory must exist before the log file is opened; creating it first
# avoids a FileNotFoundError on a fresh installation. exist_ok=True also
# removes the check-then-create race of os.path.exists() + os.makedirs().
os.makedirs(log_directory, exist_ok=True)

# Create/truncate the log file up front.
# NOTE(review): assumes the original `with` body was a no-op; anything written
# here would be truncated again by basicConfig(filemode='w') below - confirm.
with open(log_file, 'w'):
    pass

logging.basicConfig(filename=log_file, level=logging.DEBUG, filemode='w', format=log_format)
print('Start cert provisioning. Log file: ' + log_file)
48 if "ODL_CERT_DIR" in os.environ:
49 Path = os.environ['ODL_CERT_DIR']
53 username = os.environ['ODL_ADMIN_USERNAME']
54 password = os.environ['ODL_ADMIN_PASSWORD']
59 postKeystore= "/rests/operations/netconf-keystore:add-keystore-entry"
60 postPrivateKey= "/rests/operations/netconf-keystore:add-private-key"
61 postTrustedCertificate= "/rests/operations/netconf-keystore:add-trusted-certificate"
63 truststore_pass_file = Path + '/truststore.pass'
64 truststore_file = Path + '/truststore.jks'
66 keystore_pass_file = Path + '/keystore.pass'
67 keystore_file = Path + '/keystore.jks'
69 jks_files = [truststore_pass_file, keystore_pass_file, keystore_file, truststore_file]
71 envOdlFeaturesBoot='ODL_FEATURES_BOOT'
72 # Strategy sli-api is default
74 certreadyUrl="/rests/operations/SLI-API:healthcheck"
76 if "SDNRWT" in os.environ:
77 sdnrWt = os.environ['SDNRWT']
80 certreadyUrl="/rests/data/network-topology:network-topology"
81 logging.info('ODL ready strategy with command %s and url %s', certreadyCmd, certreadyUrl)
# HTTP Basic credentials and the fixed header set passed to the HTTP
# requests issued by this script.
cred_string = ":".join((username, password))
headers = {
    'Authorization': 'Basic ' + base64.b64encode(cred_string.encode()).decode(),
    'X-FromAppId': 'csit-sdnc',
    'X-TransactionId': 'csit-sdnc',
    'Accept': "application/json",
    'Content-type': "application/yang-data+json",
}
91 def readFile(folder, file):
92 key = open(Path + "/" + folder + "/" + file, "r")
95 fileRead = "\n".join(fileRead.splitlines()[1:-1])
98 def readTrustedCertificate(folder, file):
102 key = open(folder + "/" + file, "r")
103 lines = key.readlines()
105 if not "BEGIN CERTIFICATE" in line and not "END CERTIFICATE" in line and startCa:
107 elif "BEGIN CERTIFICATE" in line:
109 elif "END CERTIFICATE" in line:
111 listCert.append(caPem)
def makeKeystoreKey(clientKey, count):
    """Build the JSON request body carrying a private key for the ODL keystore.

    Args:
        clientKey: Private key text to store under the generated key id.
        count: Integer index; the key id becomes ``ODL_private_key_<count>``.

    Returns:
        The payload serialized as a JSON string, shaped as
        ``{"input": {"key-credential": {"key-id", "private-key", "passphrase"}}}``
        with an empty passphrase.
    """
    # Imported locally so the function does not rely on a module-level import.
    import json

    odl_private_key = "ODL_private_key_%d" % count

    # json.dumps escapes newlines, quotes and backslashes inside the key;
    # interpolating the key into a hand-written template emits them raw and
    # produces a body that is not valid JSON for multi-line key material.
    return json.dumps({
        "input": {
            "key-credential": {
                "key-id": odl_private_key,
                "private-key": clientKey,
                "passphrase": "",
            }
        }
    })
def makePrivateKey(clientKey, clientCrt, certList, count):
    """Build the JSON request body carrying a private key and its certificate chain.

    Args:
        clientKey: Private key text placed in the ``data`` field.
        clientCrt: Client certificate text; always the first chain element.
        certList: Iterable of CA certificate texts appended after the client
            certificate, in order. May be empty.
        count: Integer index; the entry name becomes ``ODL_private_key_<count>``.

    Returns:
        The payload serialized as a JSON string, shaped as
        ``{"input": {"private-key": {"name", "data", "certificate-chain"}}}``.
    """
    # Imported locally so the function does not rely on a module-level import.
    import json

    odl_private_key = "ODL_private_key_%d" % count

    # Building the chain as a real list avoids the dangling comma that manual
    # string concatenation leaves behind when certList is empty, and lets
    # json.dumps escape newlines/quotes inside the PEM text.
    chain = [clientCrt]
    chain.extend(certList)

    return json.dumps({
        "input": {
            "private-key": {
                "name": odl_private_key,
                "data": clientKey,
                "certificate-chain": chain,
            }
        }
    })
142 def makeTrustedCertificate(certList, count):
144 json_cert_format = ""
145 for cert in certList:
146 cert_name = "xNF_CA_certificate_%d_%d" %(count, number)
147 json_cert_format += '{{\"name\": \"{trusted_name}\",\"certificate\":\"{cert}\"}},\n'.format(
148 trusted_name=cert_name,
152 json_cert_format = json_cert_format.rsplit(',', 1)[0]
153 json_trusted_cert='{{\"input\": {{ \"trusted-certificate\": [{certificates}]}}}}'.format(
154 certificates=json_cert_format)
155 return json_trusted_cert
158 def makeRestconfPost(conn, json_file, apiCall):
159 req = conn.request("POST", apiCall, json_file, headers=headers)
160 res = conn.getresponse()
162 if res.status != 200 and res.status != 204:
163 logging.error("Error here, response back wasnt 200: Response was : %d , %s" % (res.status, res.reason))
164 writeCertInstallStatus("NOTOK")
166 logging.debug("Response :%s Reason :%s ",res.status, res.reason)
def extractZipFiles(zipFileList, count):
    """Extract each listed zip archive into ``Path`` and process its folder.

    Args:
        zipFileList: Iterable of archive file names located under ``Path``.
            Surrounding whitespace (such as a trailing newline) is ignored.
        count: Index forwarded unchanged to ``processFiles``.

    Any exception raised while extracting or processing an archive is logged
    and reported through ``writeCertInstallStatus("NOTOK")``.
    """
    for zipFolder in zipFileList:
        zipName = zipFolder.strip()
        try:
            with zipfile.ZipFile(Path + "/" + zipName, "r") as zip_ref:
                zip_ref.extractall(Path)
            # Remove only the final extension: a dotted archive name such as
            # "certs.v2.zip" must map to folder "certs.v2", not "certs".
            folder = zipName.rsplit(".", 1)[0]
            processFiles(folder, count)
        except Exception as e:
            logging.error("Error while extracting zip file(s). Exiting Certificate Installation.")
            logging.info("Error details : %s", e)
            writeCertInstallStatus("NOTOK")
180 def processFiles(folder, count):
181 logging.info('Process folder: %d %s', count, folder)
182 for file in os.listdir(Path + "/" + folder):
183 if os.path.isfile(Path + "/" + folder + "/" + file.strip()):
185 clientKey = readFile(folder, file.strip())
186 elif "trustedCertificate" in file:
187 certList = readTrustedCertificate(Path + "/" + folder, file.strip())
189 clientCrt = readFile(folder, file.strip())
191 logging.error("Could not find file %s" % file.strip())
192 writeCertInstallStatus("NOTOK")
193 shutil.rmtree(Path + "/" + folder)
194 post_content(clientKey, clientCrt, certList, count)
196 def post_content(clientKey, clientCrt, certList, count):
197 logging.info('Post content: %d', count)
198 conn = http.client.HTTPConnection("localhost",odl_port)
201 json_keystore_key = makeKeystoreKey(clientKey, count)
202 logging.debug("Posting private key in to ODL keystore")
203 makeRestconfPost(conn, json_keystore_key, postKeystore)
206 json_trusted_cert = makeTrustedCertificate(certList, count)
207 logging.debug("Posting trusted cert list in to ODL")
208 makeRestconfPost(conn, json_trusted_cert, postTrustedCertificate)
210 if clientKey and clientCrt and certList:
211 json_private_key = makePrivateKey(clientKey, clientCrt, certList, count)
212 logging.debug("Posting the cert in to ODL")
213 makeRestconfPost(conn, json_private_key, postPrivateKey)
216 def makeHealthcheckCall(headers, timePassed):
218 # WAIT 10 minutes maximum and test every 30 seconds if HealthCheck API is returning 200
219 while timePassed < TIMEOUT:
221 conn = http.client.HTTPConnection("localhost",odl_port)
222 req = conn.request(certreadyCmd, certreadyUrl,headers=headers)
223 res = conn.getresponse()
225 httpStatus = res.status
226 if httpStatus == 200:
227 logging.debug("Healthcheck Passed in %d seconds." %timePassed)
231 logging.debug("Sleep: %d seconds before testing if Healthcheck worked. Total wait time up now is: %d seconds. Timeout is: %d seconds. Problem code was: %d" %(INTERVAL, timePassed, TIMEOUT, httpStatus))
233 logging.error("Cannot execute REST call. Sleep: %d seconds before testing if Healthcheck worked. Total wait time up now is: %d seconds. Timeout is: %d seconds." %(INTERVAL, timePassed, TIMEOUT))
234 timePassed = timeIncrement(timePassed)
236 if timePassed > TIMEOUT:
237 logging.error("TIME OUT: Healthcheck not passed in %d seconds... Could cause problems for testing activities..." %TIMEOUT)
238 writeCertInstallStatus("NOTOK")
243 def timeIncrement(timePassed):
245 timePassed = timePassed + INTERVAL
def get_pass(file_name):
    """Read a password file and return its content wrapped in single quotes.

    Args:
        file_name: Path of the text file holding the password.

    Returns:
        The file content with leading/trailing whitespace removed, enclosed
        in single quotes. If reading fails, the error is logged,
        ``writeCertInstallStatus("NOTOK")`` is called and nothing is
        returned explicitly.
    """
    try:
        with open(file_name, 'r') as handle:
            secret = handle.read().strip()
        return "'" + secret + "'"
    except Exception as e:
        logging.error("Error occurred while fetching password : %s", e)
        writeCertInstallStatus("NOTOK")
259 for file in os.listdir(Path):
260 if os.path.isfile(Path + '/' + file):
261 logging.debug("Cleaning up the file %s", Path + '/'+ file)
262 os.remove(Path + '/'+ file)
265 def jks_to_p12(file, password):
266 """Converts jks format into p12"""
271 if (file.endswith('.jks')):
272 p12_file = file.replace('.jks', '.p12')
273 jks_cmd = 'keytool -importkeystore -srckeystore {src_file} -destkeystore {dest_file} -srcstoretype JKS -srcstorepass {src_pass} -deststoretype PKCS12 -deststorepass {dest_pass}'.format(src_file=file, dest_file=p12_file, src_pass=password, dest_pass=password)
274 logging.debug("Converting %s into p12 format", file)
278 except Exception as e:
279 logging.error("Error occurred while converting jks to p12 format : %s", e)
280 writeCertInstallStatus("NOTOK")
283 def make_cert_chain(cert_chain, pattern):
286 cert_chain = cert_chain.decode('utf-8')
287 matches = re.findall(pattern, cert_chain, re.DOTALL | re.MULTILINE)
289 cert_list.append(cert.strip())
292 logging.debug(" Certificate Chain empty: %s " % cert_chain)
295 def process_jks_files(count):
297 logging.info("Processing JKS files found in %s directory " % Path)
299 if all([os.path.isfile(f) for f in jks_files]):
300 keystore_pass = get_pass(keystore_pass_file)
301 keystore_file_p12 = jks_to_p12(keystore_file, keystore_pass)
303 client_key_cmd = 'openssl pkcs12 -in {src_file} -nocerts -nodes -passin pass:{src_pass}'.format(
304 src_file=keystore_file_p12, src_pass=keystore_pass)
305 client_crt_cmd = 'openssl pkcs12 -in {src_file} -clcerts -nokeys -passin pass:{src_pass}'.format(
306 src_file=keystore_file_p12, src_pass=keystore_pass)
308 truststore_pass = get_pass(truststore_pass_file)
309 truststore_p12 = jks_to_p12(truststore_file, truststore_pass)
311 trust_cert_cmd = 'openssl pkcs12 -in {src_file} -cacerts -nokeys -passin pass:{src_pass} '.format(
312 src_file=truststore_p12, src_pass=truststore_pass)
314 key_pattern = r'(?<=-----BEGIN PRIVATE KEY-----).*?(?=-----END PRIVATE KEY-----)'
315 client_key = subprocess.check_output(client_key_cmd, shell=True)
317 client_key = make_cert_chain(client_key, key_pattern)[0]
318 logging.debug("Key Ok")
320 cert_pattern = r'(?<=-----BEGIN CERTIFICATE-----).*?(?=-----END CERTIFICATE-----)'
321 client_cert = subprocess.check_output(client_crt_cmd, shell=True)
323 client_cert = make_cert_chain(client_cert, cert_pattern)[0]
324 logging.debug("Client Cert Ok")
326 ca_cert = subprocess.check_output(trust_cert_cmd, shell=True)
328 ca_cert_list = make_cert_chain(ca_cert, cert_pattern)
329 logging.debug("CA Cert Ok")
331 if client_key and client_cert and ca_cert:
332 post_content(client_key, client_cert, ca_cert_list, count)
334 logging.debug("No JKS files found in %s directory" % Path)
335 except subprocess.CalledProcessError as err:
336 print("CalledProcessError Execution of OpenSSL command failed: %s" % err)
337 writeCertInstallStatus("NOTOK")
338 except Exception as e:
339 logging.error("UnExpected Error while processing JKS files at {0}, Caused by: {1}".format(Path, e))
340 writeCertInstallStatus("NOTOK")
342 def readCertProperties():
344 This function searches for manually copied zip file
345 containing certificates. This is required as part
346 of backward compatibility.
    If not found, it searches for jks certificates.
349 connected = makeHealthcheckCall(headers, timePassed)
350 logging.info('Connected status: %s', connected)
353 if os.path.isfile(Path + "/certs.properties"):
354 with open(Path + "/certs.properties", "r") as f:
356 if not "*****" in line:
357 zipFileList.append(line)
359 extractZipFiles(zipFileList, count)
363 logging.debug("No certs.properties/zip files exist at: " + Path)
364 logging.info("Processing any available jks/p12 files under cert directory")
365 process_jks_files(count)
367 logging.info('Connected status: %s', connected)
368 logging.info('Stopping SDNR due to inability to install certificates')
369 writeCertInstallStatus("NOTOK")
371 def writeCertInstallStatus(installStatus):
372 if installStatus == "NOTOK":
373 with open(os.path.join(log_directory, 'INSTALLCERTSFAIL'), 'w') as fp:
376 elif installStatus == "OK":
377 with open(os.path.join(log_directory, 'INSTALLCERTSPASS'), 'w') as fp:
382 logging.info('Cert installation ending')
383 writeCertInstallStatus("OK")