1 # ============LICENSE_START=======================================================
2 # Copyright (C) 2019 Nordix Foundation.
3 # ================================================================================
4 # extended by highstreet technologies GmbH (c) 2020
5 # Copyright (c) 2021 Nokia Intellectual Property.
6 # ================================================================================
7 # Licensed under the Apache License, Version 2.0 (the "License");
8 # you may not use this file except in compliance with the License.
9 # You may obtain a copy of the License at
11 # http://www.apache.org/licenses/LICENSE-2.0
13 # Unless required by applicable law or agreed to in writing, software
14 # distributed under the License is distributed on an "AS IS" BASIS,
15 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
19 # SPDX-License-Identifier: Apache-2.0
20 # ============LICENSE_END=========================================================
# Resolve the OpenDaylight log location and bootstrap logging for this script.
odl_home = os.environ['ODL_HOME']
log_directory = odl_home + '/data/log/'
log_file = log_directory + 'installCerts.log'
log_format = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"

# The directory has to exist before anything is opened inside it; checking
# only after the first open() would fail with FileNotFoundError on a fresh
# installation.
os.makedirs(log_directory, exist_ok=True)

# Start every run with an empty log file.
with open(log_file, 'w'):
    pass

logging.basicConfig(filename=log_file, level=logging.DEBUG, filemode='w', format=log_format)
print('Start cert provisioning. Log file: ' + log_file)
# An ODL_CERT_DIR entry in the environment overrides the certificate
# directory; when the variable is absent Path keeps its current value.
try:
    Path = os.environ['ODL_CERT_DIR']
except KeyError:
    pass
# ODL admin credentials. User name and current password are mandatory (a
# missing variable raises KeyError); the replacement password is optional and
# is None when ODL_ADMIN_NEWPASSWORD is not set.
username, password = os.environ['ODL_ADMIN_USERNAME'], os.environ['ODL_ADMIN_PASSWORD']
newpassword = os.environ.get('ODL_ADMIN_NEWPASSWORD', None)
# RESTCONF RPC endpoints of the netconf-keystore model.
postKeystore = "/rests/operations/netconf-keystore:add-keystore-entry"
postPrivateKey = "/rests/operations/netconf-keystore:add-private-key"
postTrustedCertificate = "/rests/operations/netconf-keystore:add-trusted-certificate"

# Trust store and its password file, located directly in the cert directory.
truststore_pass_file = '{}/truststore.pass'.format(Path)
truststore_file = '{}/truststore.jks'.format(Path)

# Key store and its password file, located directly in the cert directory.
keystore_pass_file = '{}/keystore.pass'.format(Path)
keystore_file = '{}/keystore.jks'.format(Path)

# The JKS processing step only runs when every one of these files exists.
jks_files = [truststore_pass_file, keystore_pass_file, keystore_file, truststore_file]
envOdlFeaturesBoot = 'ODL_FEATURES_BOOT'

# Readiness probe that is polled before any certificate is posted.
# Strategy sli-api is default
# NOTE(review): the listing this block was restored from does not show where
# certreadyCmd is assigned. POST for the RPC endpoint and GET for the data
# resource are assumed here - confirm against the deployed script.
certreadyCmd = "POST"
certreadyUrl = "/rests/operations/SLI-API:healthcheck"

if "SDNRWT" in os.environ:
    sdnrWt = os.environ['SDNRWT']
    # NOTE(review): presumably only the value "true" switches the strategy -
    # the guard is not visible in the listing, confirm.
    if sdnrWt == "true":
        certreadyCmd = "GET"
        certreadyUrl = "/rests/data/network-topology:network-topology"
logging.info('ODL ready strategy with command %s and url %s', certreadyCmd, certreadyUrl)
# HTTP Basic credentials plus the fixed headers sent with every RESTCONF call.
cred_string = ":".join((username, password))
headers = {
    'Authorization': 'Basic ' + base64.b64encode(cred_string.encode()).decode(),
    'X-FromAppId': 'csit-sdnc',
    'X-TransactionId': 'csit-sdnc',
    'Accept': "application/json",
    'Content-type': "application/yang-data+json",
}
def readFile(folder, file):
    """Return the content of Path/folder/file without its first and last line.

    For a PEM file this drops the BEGIN/END marker lines and keeps the
    payload lines, re-joined with newline characters.

    Args:
        folder: directory name relative to the certificate directory (Path).
        file: file name inside that directory.

    Returns:
        The remaining lines joined by a newline (empty string when the file
        has two lines or fewer).
    """
    # The context manager closes the handle even if reading raises.
    with open(Path + "/" + folder + "/" + file, "r") as key:
        fileRead = key.read()
    return "\n".join(fileRead.splitlines()[1:-1])
def readTrustedCertificate(folder, file):
    """Split a PEM bundle into the payloads of its individual certificates.

    Args:
        folder: directory that contains the bundle.
        file: file name of the bundle inside ``folder``.

    Returns:
        A list with one string per certificate: the lines found between a
        BEGIN CERTIFICATE and the following END CERTIFICATE marker, line
        endings included. Text outside the markers is ignored.
    """
    listCert = []
    caPem = ""
    startCa = False
    # Stream the file line by line; the context manager closes the handle.
    with open(folder + "/" + file, "r") as key:
        for line in key:
            if "BEGIN CERTIFICATE" in line:
                startCa = True
            elif "END CERTIFICATE" in line:
                startCa = False
                listCert.append(caPem)
                # Start the next certificate from an empty buffer.
                caPem = ""
            elif startCa:
                caPem += line
    return listCert
def makeKeystoreKey(clientKey, count):
    """Build the JSON body for the netconf-keystore add-keystore-entry RPC.

    Args:
        clientKey: private key payload (text between the PEM markers).
        count: index used to derive the key id ``ODL_private_key_<count>``.

    Returns:
        The request body as a JSON string.
    """
    import json

    odl_private_key = "ODL_private_key_%d" % count

    # json.dumps escapes the line breaks and any quote inside the key
    # material; plain string formatting produced invalid JSON for them.
    return json.dumps({
        "input": {
            "key-credential": {
                "key-id": odl_private_key,
                "private-key": clientKey,
                "passphrase": "",
            }
        }
    })
def makePrivateKey(clientKey, clientCrt, certList, count):
    """Build the JSON body for the netconf-keystore add-private-key RPC.

    Args:
        clientKey: private key payload.
        clientCrt: client certificate payload; first entry of the chain.
        certList: CA certificate payloads appended to the chain in order;
            may be empty or None.
        count: index used to derive the entry name ``ODL_private_key_<count>``.

    Returns:
        The request body as a JSON string.
    """
    import json

    odl_private_key = "ODL_private_key_%d" % count
    # The chain starts with the client certificate followed by every CA.
    chain = [clientCrt] + list(certList or [])

    # Serialising through json.dumps escapes embedded line breaks and avoids
    # the dangling comma the hand-built list produced for an empty CA list.
    return json.dumps({
        "input": {
            "private-key": {
                "name": odl_private_key,
                "data": clientKey,
                "certificate-chain": chain,
            }
        }
    })
def makeTrustedCertificate(certList, count):
    """Build the JSON body for the netconf-keystore add-trusted-certificate RPC.

    Args:
        certList: CA certificate payloads.
        count: index of the certificate set; every entry is named
            ``xNF_CA_certificate_<count>_<position>``.

    Returns:
        The request body as a JSON string.
    """
    import json

    certificates = [
        {
            "name": "xNF_CA_certificate_%d_%d" % (count, number),
            # Surrounding whitespace is not part of the certificate payload.
            "certificate": cert.strip(),
        }
        for number, cert in enumerate(certList)
    ]
    # json.dumps takes care of separators and of escaping line breaks inside
    # the payloads, which the hand-assembled string did not.
    return json.dumps({"input": {"trusted-certificate": certificates}})
def makeRestconfPost(conn, json_file, apiCall):
    """POST a JSON body to a RESTCONF endpoint and record the outcome.

    Args:
        conn: an open http.client connection to the controller.
        json_file: request body (JSON text).
        apiCall: request path of the RPC.

    A status other than 200 or 204 is logged as an error and reported
    through writeCertInstallStatus("NOTOK").
    """
    conn.request("POST", apiCall, json_file, headers=headers)
    res = conn.getresponse()
    # Drain the body: http.client refuses to hand out the next response on
    # this connection while the previous one is still unread.
    res.read()
    if res.status not in (200, 204):
        logging.error("Error here, response back wasnt 200: Response was : %d , %s", res.status, res.reason)
        writeCertInstallStatus("NOTOK")
    else:
        logging.debug("Response :%s Reason :%s ", res.status, res.reason)
def extractZipFiles(zipFileList, count):
    """Unpack every listed zip archive into Path and process its folder.

    Args:
        zipFileList: archive names relative to Path; surrounding whitespace
            (such as a trailing line break) is ignored.
        count: index handed on to processFiles.

    Any error while unpacking or processing is logged and reported through
    writeCertInstallStatus("NOTOK").
    """
    for zipFolder in zipFileList:
        zip_name = zipFolder.strip()
        try:
            with zipfile.ZipFile(Path + "/" + zip_name, "r") as zip_ref:
                zip_ref.extractall(Path)
            # Drop only the final extension; splitting on every dot would cut
            # a name such as "site.a.zip" down to "site".
            folder = zip_name.rsplit(".", 1)[0]
            processFiles(folder, count)
        except Exception as e:
            logging.error("Error while extracting zip file(s). Exiting Certificate Installation.")
            logging.info("Error details : %s" % e)
            writeCertInstallStatus("NOTOK")
def processFiles(folder, count):
    """Read key, certificate and CA bundle from an unpacked folder and post them.

    Args:
        folder: directory name relative to Path.
        count: index used for the names of the posted entries.

    The folder is removed after reading, then post_content is called with
    whatever was found.
    """
    logging.info('Process folder: %d %s', count, folder)
    # Start from "nothing found" so that a folder lacking one of the files
    # does not end in an UnboundLocalError at the post_content call.
    clientKey = None
    clientCrt = None
    certList = None
    folder_path = Path + "/" + folder
    for file in os.listdir(folder_path):
        name = file.strip()
        if os.path.isfile(folder_path + "/" + name):
            # NOTE(review): the ".key" and ".crt" tests are reconstructed -
            # the listing only shows the "trustedCertificate" branch. Confirm
            # the selection rules against the deployed script.
            if ".key" in file:
                clientKey = readFile(folder, name)
            elif "trustedCertificate" in file:
                certList = readTrustedCertificate(folder_path, name)
            elif ".crt" in file:
                clientCrt = readFile(folder, name)
        else:
            logging.error("Could not find file %s" % name)
            writeCertInstallStatus("NOTOK")
    shutil.rmtree(folder_path)
    post_content(clientKey, clientCrt, certList, count)
def post_content(clientKey, clientCrt, certList, count):
    """Post key, trusted certificates and certificate chain to the controller.

    Args:
        clientKey: private key payload, or a falsy value when missing.
        clientCrt: client certificate payload, or a falsy value when missing.
        certList: list of CA certificate payloads, or a falsy value.
        count: index used for the names of the posted entries.
    """
    logging.info('Post content: %d', count)
    conn = http.client.HTTPConnection("localhost", odl_port)
    try:
        # NOTE(review): the guards on the first two posts are reconstructed
        # by analogy with the visible guard on the third - confirm.
        if clientKey:
            json_keystore_key = makeKeystoreKey(clientKey, count)
            logging.debug("Posting private key in to ODL keystore")
            makeRestconfPost(conn, json_keystore_key, postKeystore)

        if certList:
            json_trusted_cert = makeTrustedCertificate(certList, count)
            logging.debug("Posting trusted cert list in to ODL")
            makeRestconfPost(conn, json_trusted_cert, postTrustedCertificate)

        if clientKey and clientCrt and certList:
            json_private_key = makePrivateKey(clientKey, clientCrt, certList, count)
            logging.debug("Posting the cert in to ODL")
            makeRestconfPost(conn, json_private_key, postPrivateKey)
    finally:
        # Release the socket whatever happened above.
        conn.close()
def makeHealthcheckCall(headers, timePassed):
    """Poll the readiness URL until it answers 200 or the time budget is used up.

    Args:
        headers: HTTP headers sent with every probe.
        timePassed: seconds already spent waiting before this call.

    Returns:
        True when a probe returned status 200, otherwise False. A failed wait
        is additionally reported through writeCertInstallStatus("NOTOK").
    """
    connected = False
    # Probe until TIMEOUT seconds have passed; timeIncrement advances the
    # clock by INTERVAL after every unsuccessful attempt.
    while timePassed < TIMEOUT:
        conn = None
        try:
            conn = http.client.HTTPConnection("localhost", odl_port)
            conn.request(certreadyCmd, certreadyUrl, headers=headers)
            res = conn.getresponse()
            res.read()
            httpStatus = res.status
            if httpStatus == 200:
                logging.debug("Healthcheck Passed in %d seconds." % timePassed)
                connected = True
                break
            logging.debug("Sleep: %d seconds before testing if Healthcheck worked. Total wait time up now is: %d seconds. Timeout is: %d seconds. Problem code was: %d" % (INTERVAL, timePassed, TIMEOUT, httpStatus))
        except Exception:
            # Typically the controller is not listening yet; keep polling.
            logging.error("Cannot execute REST call. Sleep: %d seconds before testing if Healthcheck worked. Total wait time up now is: %d seconds. Timeout is: %d seconds." % (INTERVAL, timePassed, TIMEOUT))
        finally:
            # One connection per probe - close it so sockets do not pile up.
            if conn is not None:
                conn.close()
        timePassed = timeIncrement(timePassed)

    # Decide on the outcome itself: comparing timePassed > TIMEOUT misses the
    # case where the loop ends with timePassed exactly equal to TIMEOUT.
    if not connected:
        logging.error("TIME OUT: Healthcheck not passed in %d seconds... Could cause problems for testing activities..." % TIMEOUT)
        writeCertInstallStatus("NOTOK")
    return connected
def timeIncrement(timePassed):
    """Wait one polling interval and return the updated elapsed time.

    Args:
        timePassed: seconds spent waiting so far.

    Returns:
        ``timePassed`` increased by INTERVAL.
    """
    import time

    # NOTE(review): the sleep is reconstructed - the health-check log messages
    # announce a pause of INTERVAL seconds, but the statement itself is not in
    # the listing. Confirm that no other place sleeps as well.
    time.sleep(INTERVAL)
    return timePassed + INTERVAL
def get_pass(file_name):
    """Read a store password from a file and quote it for use in a shell command.

    Args:
        file_name: path of the password file.

    Returns:
        The password, stripped of surrounding whitespace and quoted so that a
        shell passes it through as exactly one word. Returns None after an
        error has been logged and reported via writeCertInstallStatus("NOTOK").
    """
    import shlex

    try:
        with open(file_name, 'r') as file_obj:
            password = file_obj.read().strip()
        # shlex.quote also copes with passwords that contain a single quote,
        # which wrapping the text in '...' by hand does not.
        return shlex.quote(password)
    except Exception as e:
        logging.error("Error occurred while fetching password : %s", e)
        writeCertInstallStatus("NOTOK")
260 for file in os.listdir(Path):
261 if os.path.isfile(Path + '/' + file):
262 logging.debug("Cleaning up the file %s", Path + '/'+ file)
263 os.remove(Path + '/'+ file)
def jks_to_p12(file, password):
    """Converts jks format into p12

    Args:
        file: path of the key store; only names ending in '.jks' are converted.
        password: store password used for source and destination. Callers in
            this file pass the value returned by get_pass, which is already
            quoted for the shell.

    Returns:
        The path of the PKCS12 file for a '.jks' input, the unchanged input
        path for any other name, or None after an unexpected error has been
        logged and reported via writeCertInstallStatus("NOTOK").
    """
    try:
        if not file.endswith('.jks'):
            return file

        # Swap only the trailing extension; str.replace would also rewrite a
        # '.jks' that happens to occur earlier in the path.
        p12_file = file[:-len('.jks')] + '.p12'
        jks_cmd = 'keytool -importkeystore -srckeystore {src_file} -destkeystore {dest_file} -srcstoretype JKS -srcstorepass {src_pass} -deststoretype PKCS12 -deststorepass {dest_pass}'.format(src_file=file, dest_file=p12_file, src_pass=password, dest_pass=password)
        logging.debug("Converting %s into p12 format", file)
        # NOTE(review): how the command was executed is not visible in the
        # listing; it is run through the shell here because the password
        # arrives shell-quoted.
        exit_code = subprocess.call(jks_cmd, shell=True)
        if exit_code != 0:
            # Best effort: record the failure and let the OpenSSL steps that
            # consume the file report the definitive error.
            logging.error("keytool exited with status %d while converting %s", exit_code, file)
        return p12_file
    except Exception as e:
        logging.error("Error occurred while converting jks to p12 format : %s", e)
        writeCertInstallStatus("NOTOK")
def make_cert_chain(cert_chain, pattern):
    """Extract every match of ``pattern`` from OpenSSL output.

    Args:
        cert_chain: command output as UTF-8 bytes (text is accepted as well).
        pattern: regular expression selecting the payload between the PEM
            markers; it is applied with re.DOTALL | re.MULTILINE.

    Returns:
        A list of the matched payloads with surrounding whitespace removed.
        The list is empty when the input is empty or nothing matches, so
        callers always receive a list.
    """
    if not cert_chain:
        logging.debug(" Certificate Chain empty: %s " % cert_chain)
        return []

    if isinstance(cert_chain, bytes):
        cert_chain = cert_chain.decode('utf-8')
    matches = re.findall(pattern, cert_chain, re.DOTALL | re.MULTILINE)
    return [cert.strip() for cert in matches]
def process_jks_files(count):
    """Convert the JKS stores to PKCS12, extract key and certificates, post them.

    Runs only when every file named in jks_files exists. Key, client
    certificate and CA certificates are read from the converted stores with
    openssl and handed to post_content.

    Args:
        count: index used for the names of the posted entries.

    Failures are logged and reported through writeCertInstallStatus("NOTOK").
    """
    try:
        logging.info("Processing JKS files found in %s directory " % Path)

        if all([os.path.isfile(f) for f in jks_files]):
            keystore_pass = get_pass(keystore_pass_file)
            keystore_file_p12 = jks_to_p12(keystore_file, keystore_pass)

            client_key_cmd = 'openssl pkcs12 -in {src_file} -nocerts -nodes -passin pass:{src_pass}'.format(
                src_file=keystore_file_p12, src_pass=keystore_pass)
            client_crt_cmd = 'openssl pkcs12 -in {src_file} -clcerts -nokeys -passin pass:{src_pass}'.format(
                src_file=keystore_file_p12, src_pass=keystore_pass)

            truststore_pass = get_pass(truststore_pass_file)
            truststore_p12 = jks_to_p12(truststore_file, truststore_pass)

            trust_cert_cmd = 'openssl pkcs12 -in {src_file} -cacerts -nokeys -passin pass:{src_pass} '.format(
                src_file=truststore_p12, src_pass=truststore_pass)

            # NOTE(review): the three "if <output>:" guards below are
            # reconstructed from the final combined check - confirm.
            key_pattern = r'(?<=-----BEGIN PRIVATE KEY-----).*?(?=-----END PRIVATE KEY-----)'
            client_key = subprocess.check_output(client_key_cmd, shell=True)
            if client_key:
                # Only the first key of the store is used.
                client_key = make_cert_chain(client_key, key_pattern)[0]
                logging.debug("Key Ok")

            cert_pattern = r'(?<=-----BEGIN CERTIFICATE-----).*?(?=-----END CERTIFICATE-----)'
            client_cert = subprocess.check_output(client_crt_cmd, shell=True)
            if client_cert:
                # Only the first client certificate is used.
                client_cert = make_cert_chain(client_cert, cert_pattern)[0]
                logging.debug("Client Cert Ok")

            ca_cert_list = []
            ca_cert = subprocess.check_output(trust_cert_cmd, shell=True)
            if ca_cert:
                ca_cert_list = make_cert_chain(ca_cert, cert_pattern)
                logging.debug("CA Cert Ok")

            if client_key and client_cert and ca_cert:
                post_content(client_key, client_cert, ca_cert_list, count)
        else:
            logging.debug("No JKS files found in %s directory" % Path)
    except subprocess.CalledProcessError as err:
        # Keep the console message and also write it to the log file, where
        # every other failure of this script is recorded.
        logging.error("CalledProcessError Execution of OpenSSL command failed: %s", err)
        print("CalledProcessError Execution of OpenSSL command failed: %s" % err)
        writeCertInstallStatus("NOTOK")
    except Exception as e:
        logging.error("UnExpected Error while processing JKS files at {0}, Caused by: {1}".format(Path, e))
        writeCertInstallStatus("NOTOK")
def replaceAdminPassword(username, password, newpassword):
    """Set a new password for ``username`` through the controller's auth API.

    Args:
        username: account whose password is replaced.
        password: current password (not used by the request body).
        newpassword: password to set; None leaves the account untouched.

    A failure to perform the REST call is logged and reported through
    writeCertInstallStatus("NOTOK").
    """
    if newpassword is None:
        logging.info('Not to replace password for user %s', username)
        return

    import json

    logging.info('Replace password for user %s', username)
    try:
        # json.dumps replaces a str.format template whose unescaped braces
        # raised KeyError, and it escapes quotes inside the password.
        jsondata = json.dumps({"password": newpassword})
        url = '/auth/v1/users/{username}@sdn'.format(username=username)
        # Log the target only; the body contains the new password in clear text.
        logging.info("Url %s", url)
        conn = http.client.HTTPConnection("localhost", odl_port)
        try:
            conn.request("PUT", url, jsondata, headers=headers)
            res = conn.getresponse()
            res.read()
            httpStatus = res.status
        finally:
            conn.close()
        if httpStatus == 200:
            logging.debug("New password provided successfully for user %s", username)
        else:
            logging.debug("Password change was not possible. Problem code was: %d", httpStatus)
    except Exception:
        logging.error("Cannot execute REST call to set password.")
        writeCertInstallStatus("NOTOK")
def readCertProperties():
    """Wait for the controller, optionally replace the admin password, install certificates.

    This function searches for manually copied zip files containing
    certificates, listed in Path/certs.properties. This is required as part
    of backward compatibility. If that file is not found, it searches for
    JKS certificates instead.
    """
    connected = makeHealthcheckCall(headers, timePassed)
    logging.info('Connected status: %s', connected)
    if not connected:
        logging.info('Stopping SDNR due to inability to install certificates')
        writeCertInstallStatus("NOTOK")
        return

    replaceAdminPassword(username, password, newpassword)

    # NOTE(review): the start value of the index is not visible in the
    # listing; 0 is assumed.
    count = 0
    properties_file = Path + "/certs.properties"
    if os.path.isfile(properties_file):
        with open(properties_file, "r") as f:
            for line in f:
                if "*****" not in line:
                    zipFileList.append(line)
                    continue
                # NOTE(review): a "*****" line is treated as the end of one
                # group of archives - the exact handling is reconstructed,
                # confirm against the deployed script.
                extractZipFiles(zipFileList, count)
                count += 1
                del zipFileList[:]
        # Archives listed after the last separator (or in a file without any
        # separator) still have to be installed.
        if zipFileList:
            extractZipFiles(zipFileList, count)
            del zipFileList[:]
    else:
        logging.debug("No certs.properties/zip files exist at: " + Path)
        logging.info("Processing any available jks/p12 files under cert directory")
        process_jks_files(count)
396 def writeCertInstallStatus(installStatus):
397 if installStatus == "NOTOK":
398 with open(os.path.join(log_directory, 'INSTALLCERTSFAIL'), 'w') as fp:
401 elif installStatus == "OK":
402 with open(os.path.join(log_directory, 'INSTALLCERTSPASS'), 'w') as fp:
# Last step of the script: log the end of the run and write the OK status.
logging.info('Cert installation ending')
writeCertInstallStatus("OK")