1 # ============LICENSE_START=======================================================
2 # Copyright (C) 2019 Nordix Foundation.
3 # ================================================================================
4 # extended by highstreet technologies GmbH (c) 2020
5 # ================================================================================
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
18 # SPDX-License-Identifier: Apache-2.0
19 # ============LICENSE_END=========================================================
# Resolve the ODL installation directory and derive the log location from it.
odl_home = os.environ['ODL_HOME']
log_directory = odl_home + '/data/log/'
log_file = log_directory + 'installCerts.log'

# The directory must exist before any file is created inside it; creating it
# first avoids a FileNotFoundError on a fresh installation.
os.makedirs(log_directory, exist_ok=True)

# Create (or truncate) the log file up front.
with open(log_file, 'w'):
    pass

log_format = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
logging.basicConfig(filename=log_file, level=logging.DEBUG, filemode='w', format=log_format)
print('Start cert provisioning. Log file: ' + log_file)
47 if "ODL_CERT_DIR" in os.environ:
48 Path = os.environ['ODL_CERT_DIR']
52 username = os.environ['ODL_ADMIN_USERNAME']
53 password = os.environ['ODL_ADMIN_PASSWORD']
# RESTCONF RPC paths of the netconf-keystore used when posting key material.
postKeystore = "/rests/operations/netconf-keystore:add-keystore-entry"
postPrivateKey = "/rests/operations/netconf-keystore:add-private-key"
postTrustedCertificate = "/rests/operations/netconf-keystore:add-trusted-certificate"

# Truststore and its password file inside the certificate directory.
truststore_pass_file = "{}/truststore.pass".format(Path)
truststore_file = "{}/truststore.jks".format(Path)

# Keystore and its password file inside the certificate directory.
keystore_pass_file = "{}/keystore.pass".format(Path)
keystore_file = "{}/keystore.jks".format(Path)

# All four files must be present for the JKS processing path to run.
jks_files = [
    truststore_pass_file,
    keystore_pass_file,
    keystore_file,
    truststore_file,
]
70 envOdlFeaturesBoot='ODL_FEATURES_BOOT'
71 # Strategy sli-api is default
73 certreadyUrl="/rests/operations/SLI-API:healthcheck"
75 if "SDNRWT" in os.environ:
76 sdnrWt = os.environ['SDNRWT']
79 certreadyUrl="/rests/data/network-topology:network-topology"
80 logging.info('ODL ready strategy with command %s and url %s', certreadyCmd, certreadyUrl)
# HTTP Basic credentials and the default headers sent with every request.
cred_string = "{}:{}".format(username, password)
headers = {
    'Authorization': 'Basic ' + base64.b64encode(cred_string.encode()).decode(),
    'X-FromAppId': 'csit-sdnc',
    'X-TransactionId': 'csit-sdnc',
    'Accept': "application/json",
    'Content-type': "application/yang-data+json",
}
def readFile(folder, file):
    """Return the text of ``Path/folder/file`` without its first and last line.

    The two outer lines are dropped (presumably the PEM BEGIN/END marker
    lines -- confirm against the files actually shipped) and the remaining
    lines are re-joined with ``\\n``.

    :param folder: sub-directory of the certificate directory ``Path``
    :param file: file name inside ``folder``
    :return: the inner lines joined by newlines; empty string if the file has
             two lines or fewer
    """
    # Context manager guarantees the handle is closed even if read() fails.
    with open(Path + "/" + folder + "/" + file, "r") as key:
        content = key.read()
    return "\n".join(content.splitlines()[1:-1])
def readTrustedCertificate(folder, file):
    """Split a PEM bundle into the bodies of its individual certificates.

    Lines between a ``BEGIN CERTIFICATE`` and the following
    ``END CERTIFICATE`` marker are concatenated unchanged (line endings
    included); the marker lines themselves are not part of the result.

    :param folder: directory containing the bundle
    :param file: bundle file name
    :return: list with one string per certificate, in file order
    """
    listCert = []
    body_lines = []
    inside_cert = False
    # Context manager closes the bundle even when reading raises.
    with open(folder + "/" + file, "r") as key:
        for line in key:
            if "BEGIN CERTIFICATE" in line:
                inside_cert = True
            elif "END CERTIFICATE" in line:
                inside_cert = False
                listCert.append("".join(body_lines))
                body_lines = []
            elif inside_cert:
                body_lines.append(line)
    return listCert
def makeKeystoreKey(clientKey, count):
    """Build the JSON request body that registers a key credential.

    The body is serialized with ``json.dumps`` so that newlines, quotes and
    backslashes inside the key are escaped and the result is always valid
    JSON.

    :param clientKey: private key text
    :param count: integer used to form the key id ``ODL_private_key_<count>``
    :return: JSON document as a string
    """
    import json

    payload = {
        "input": {
            "key-credential": {
                "key-id": "ODL_private_key_%d" % count,
                "private-key": clientKey,
                "passphrase": "",
            }
        }
    }
    return json.dumps(payload)
def makePrivateKey(clientKey, clientCrt, certList, count):
    """Build the JSON request body that registers a private key with its chain.

    The certificate chain starts with the client certificate followed by
    every entry of ``certList``. Serializing through ``json.dumps`` escapes
    the PEM text properly and avoids a dangling comma when ``certList`` is
    empty.

    :param clientKey: private key text
    :param clientCrt: client certificate text
    :param certList: iterable of CA certificate texts (may be empty or None)
    :param count: integer used to form the name ``ODL_private_key_<count>``
    :return: JSON document as a string
    """
    import json

    chain = [clientCrt]
    chain.extend(certList or [])
    payload = {
        "input": {
            "private-key": {
                "name": "ODL_private_key_%d" % count,
                "data": clientKey,
                "certificate-chain": chain,
            }
        }
    }
    return json.dumps(payload)
def makeTrustedCertificate(certList, count):
    """Build the JSON request body that registers trusted certificates.

    Each certificate is named ``xNF_CA_certificate_<count>_<index>``.
    Serializing through ``json.dumps`` escapes the certificate text and
    produces a valid (possibly empty) JSON array.

    NOTE(review): the index is assumed to start at 0 and surrounding
    whitespace of each certificate is trimmed -- confirm against the
    deployed version of this script.

    :param certList: iterable of certificate texts
    :param count: integer identifying the certificate set
    :return: JSON document as a string
    """
    import json

    entries = [
        {
            "name": "xNF_CA_certificate_%d_%d" % (count, number),
            "certificate": cert.strip(),
        }
        for number, cert in enumerate(certList)
    ]
    return json.dumps({"input": {"trusted-certificate": entries}})
def makeRestconfPost(conn, json_file, apiCall):
    """POST ``json_file`` to ``apiCall`` on ``conn`` and log the outcome.

    A status other than 200 or 204 is logged as an error and reported through
    ``writeCertInstallStatus("NOTOK")``.

    :param conn: open ``http.client`` connection
    :param json_file: request body (JSON text)
    :param apiCall: request path
    """
    # HTTPConnection.request() returns None, so there is nothing to keep.
    conn.request("POST", apiCall, json_file, headers=headers)
    res = conn.getresponse()
    # Drain the body so the same connection can serve the next request.
    res.read()
    if res.status not in (200, 204):
        logging.error("Error here, response back wasnt 200: Response was : %d , %s", res.status, res.reason)
        writeCertInstallStatus("NOTOK")
    else:
        logging.debug("Response :%s Reason :%s ", res.status, res.reason)
def extractZipFiles(zipFileList, count):
    """Extract every listed zip archive into ``Path`` and process its folder.

    The folder handed to ``processFiles`` is the archive name without its
    final extension. Any failure is logged and reported through
    ``writeCertInstallStatus("NOTOK")``.

    :param zipFileList: iterable of archive file names relative to ``Path``
                        (surrounding whitespace is ignored)
    :param count: integer passed through to ``processFiles``
    """
    for zipFolder in zipFileList:
        zip_name = zipFolder.strip()
        try:
            with zipfile.ZipFile(Path + "/" + zip_name, "r") as zip_ref:
                zip_ref.extractall(Path)
            # Remove only the last extension so dotted names such as
            # "my.certs.zip" map to "my.certs" instead of "my".
            folder = zip_name.rsplit(".", 1)[0]
            processFiles(folder, count)
        except Exception as e:
            logging.error("Error while extracting zip file(s). Exiting Certificate Installation.")
            logging.info("Error details : %s", e)
            writeCertInstallStatus("NOTOK")
179 def processFiles(folder, count):
180 logging.info('Process folder: %d %s', count, folder)
181 for file in os.listdir(Path + "/" + folder):
182 if os.path.isfile(Path + "/" + folder + "/" + file.strip()):
184 clientKey = readFile(folder, file.strip())
185 elif "trustedCertificate" in file:
186 certList = readTrustedCertificate(Path + "/" + folder, file.strip())
188 clientCrt = readFile(folder, file.strip())
190 logging.error("Could not find file %s" % file.strip())
191 writeCertInstallStatus("NOTOK")
192 shutil.rmtree(Path + "/" + folder)
193 post_content(clientKey, clientCrt, certList, count)
def post_content(clientKey, clientCrt, certList, count):
    """Post the available key material to ODL on ``localhost:odl_port``.

    Up to three requests are sent over one connection: the keystore key, the
    trusted certificate list, and -- only when key, client certificate and
    CA list are all present -- the private key with its certificate chain.

    NOTE(review): the guards on the first two requests are assumed to mirror
    the third one (key present / CA list present) -- confirm.

    :param clientKey: private key text (falsy to skip)
    :param clientCrt: client certificate text (falsy to skip)
    :param certList: list of CA certificate texts (falsy to skip)
    :param count: integer used to name the posted entries
    """
    logging.info('Post content: %d', count)
    conn = http.client.HTTPConnection("localhost", odl_port)
    try:
        if clientKey:
            json_keystore_key = makeKeystoreKey(clientKey, count)
            logging.debug("Posting private key in to ODL keystore")
            makeRestconfPost(conn, json_keystore_key, postKeystore)

        if certList:
            json_trusted_cert = makeTrustedCertificate(certList, count)
            logging.debug("Posting trusted cert list in to ODL")
            makeRestconfPost(conn, json_trusted_cert, postTrustedCertificate)

        if clientKey and clientCrt and certList:
            json_private_key = makePrivateKey(clientKey, clientCrt, certList, count)
            logging.debug("Posting the cert in to ODL")
            makeRestconfPost(conn, json_private_key, postPrivateKey)
    finally:
        # Release the socket whether or not the posts succeeded.
        conn.close()
def makeHealthcheckCall(headers, timePassed):
    """Poll the ODL readiness URL until it answers 200 or ``TIMEOUT`` elapses.

    Each attempt sends ``certreadyCmd`` to ``certreadyUrl`` on
    ``localhost:odl_port``. Between attempts the elapsed time is advanced by
    ``timeIncrement``. When the loop ends without a 200 response the failure
    is logged and reported through ``writeCertInstallStatus("NOTOK")``.

    :param headers: HTTP headers sent with every probe
    :param timePassed: seconds already elapsed when polling starts
    :return: True if a 200 response was received, otherwise False
    """
    connected = False
    while timePassed < TIMEOUT:
        conn = http.client.HTTPConnection("localhost", odl_port)
        try:
            conn.request(certreadyCmd, certreadyUrl, headers=headers)
            res = conn.getresponse()
            res.read()
            httpStatus = res.status
        except Exception:
            logging.error("Cannot execute REST call. Sleep: %d seconds before testing if Healthcheck worked. Total wait time up now is: %d seconds. Timeout is: %d seconds.", INTERVAL, timePassed, TIMEOUT)
        else:
            if httpStatus == 200:
                logging.debug("Healthcheck Passed in %d seconds.", timePassed)
                connected = True
                break
            logging.debug("Sleep: %d seconds before testing if Healthcheck worked. Total wait time up now is: %d seconds. Timeout is: %d seconds. Problem code was: %d", INTERVAL, timePassed, TIMEOUT, httpStatus)
        finally:
            # One connection per probe; close it so sockets do not pile up.
            conn.close()
        timePassed = timeIncrement(timePassed)

    # Keyed on the outcome rather than on "timePassed > TIMEOUT", which
    # missed the case where the loop ends with timePassed == TIMEOUT.
    if not connected:
        logging.error("TIME OUT: Healthcheck not passed in %d seconds... Could cause problems for testing activities...", TIMEOUT)
        writeCertInstallStatus("NOTOK")
    return connected
242 def timeIncrement(timePassed):
244 timePassed = timePassed + INTERVAL
def get_pass(file_name):
    """Read a password file and return its content as a single-quoted shell word.

    Leading and trailing whitespace is removed. Embedded single quotes are
    escaped so the result stays one shell word when interpolated into a
    command line; passwords without single quotes produce ``'<password>'``.

    On any error the problem is logged and reported through
    ``writeCertInstallStatus("NOTOK")``.

    :param file_name: path of the password file
    :return: the quoted password
    """
    try:
        with open(file_name, 'r') as file_obj:
            password = file_obj.read().strip()
        # Close the quote, emit a double-quoted single quote, reopen the quote.
        return "'{}'".format(password.replace("'", "'\"'\"'"))
    except Exception as e:
        logging.error("Error occurred while fetching password : %s", e)
        writeCertInstallStatus("NOTOK")
258 for file in os.listdir(Path):
259 if os.path.isfile(Path + '/' + file):
260 logging.debug("Cleaning up the file %s", Path + '/'+ file)
261 os.remove(Path + '/'+ file)
264 def jks_to_p12(file, password):
265 """Converts jks format into p12"""
270 if (file.endswith('.jks')):
271 p12_file = file.replace('.jks', '.p12')
272 jks_cmd = 'keytool -importkeystore -srckeystore {src_file} -destkeystore {dest_file} -srcstoretype JKS -srcstorepass {src_pass} -deststoretype PKCS12 -deststorepass {dest_pass}'.format(src_file=file, dest_file=p12_file, src_pass=password, dest_pass=password)
273 logging.debug("Converting %s into p12 format", file)
277 except Exception as e:
278 logging.error("Error occurred while converting jks to p12 format : %s", e)
279 writeCertInstallStatus("NOTOK")
def make_cert_chain(cert_chain, pattern):
    """Return every match of ``pattern`` in ``cert_chain``, whitespace-trimmed.

    ``cert_chain`` may be text or UTF-8 encoded bytes (e.g. the raw output of
    ``subprocess.check_output``). Empty or missing input is logged and yields
    an empty list, so callers always receive a list.

    :param cert_chain: PEM text (str or bytes), may be empty or None
    :param pattern: regular expression selecting one entry per match
    :return: list of matched strings with surrounding whitespace removed
    """
    if not cert_chain:
        logging.debug(" Certificate Chain empty: %s ", cert_chain)
        return []
    if isinstance(cert_chain, bytes):
        cert_chain = cert_chain.decode("utf-8")
    matches = re.findall(pattern, cert_chain, re.DOTALL | re.MULTILINE)
    return [cert.strip() for cert in matches]
293 def process_jks_files(count):
295 logging.info("Processing JKS files found in %s directory " % Path)
297 if all([os.path.isfile(f) for f in jks_files]):
298 keystore_pass = get_pass(keystore_pass_file)
299 keystore_file_p12 = jks_to_p12(keystore_file, keystore_pass)
301 client_key_cmd = 'openssl pkcs12 -in {src_file} -nocerts -nodes -passin pass:{src_pass}'.format(
302 src_file=keystore_file_p12, src_pass=keystore_pass)
303 client_crt_cmd = 'openssl pkcs12 -in {src_file} -clcerts -nokeys -passin pass:{src_pass}'.format(
304 src_file=keystore_file_p12, src_pass=keystore_pass)
306 truststore_pass = get_pass(truststore_pass_file)
307 truststore_p12 = jks_to_p12(truststore_file, truststore_pass)
309 trust_cert_cmd = 'openssl pkcs12 -in {src_file} -cacerts -nokeys -passin pass:{src_pass} '.format(
310 src_file=truststore_p12, src_pass=truststore_pass)
312 key_pattern = r'(?<=-----BEGIN PRIVATE KEY-----).*?(?=-----END PRIVATE KEY-----)'
313 client_key = subprocess.check_output(client_key_cmd, shell=True)
315 client_key = make_cert_chain(client_key, key_pattern)[0]
316 logging.debug("Key Ok")
318 cert_pattern = r'(?<=-----BEGIN CERTIFICATE-----).*?(?=-----END CERTIFICATE-----)'
319 client_cert = subprocess.check_output(client_crt_cmd, shell=True)
321 client_cert = make_cert_chain(client_cert, cert_pattern)[0]
322 logging.debug("Client Cert Ok")
324 ca_cert = subprocess.check_output(trust_cert_cmd, shell=True)
326 ca_cert_list = make_cert_chain(ca_cert, cert_pattern)
327 logging.debug("CA Cert Ok")
329 if client_key and client_cert and ca_cert:
330 post_content(client_key, client_cert, ca_cert_list, count)
332 logging.debug("No JKS files found in %s directory" % Path)
333 except subprocess.CalledProcessError as err:
334 print("CalledProcessError Execution of OpenSSL command failed: %s" % err)
335 writeCertInstallStatus("NOTOK")
336 except Exception as e:
337 logging.error("UnExpected Error while processing JKS files at {0}, Caused by: {1}".format(Path, e))
338 writeCertInstallStatus("NOTOK")
340 def readCertProperties():
342 This function searches for manually copied zip file
343 containing certificates. This is required as part
344 of backward compatibility.
    If not found, it searches for jks certificates.
347 connected = makeHealthcheckCall(headers, timePassed)
348 logging.info('Connected status: %s', connected)
351 if os.path.isfile(Path + "/certs.properties"):
352 with open(Path + "/certs.properties", "r") as f:
354 if not "*****" in line:
355 zipFileList.append(line)
357 extractZipFiles(zipFileList, count)
361 logging.debug("No certs.properties/zip files exist at: " + Path)
362 logging.info("Processing any available jks/p12 files under cert directory")
363 process_jks_files(count)
365 logging.info('Connected status: %s', connected)
366 logging.info('Stopping SDNR due to inability to install certificates')
367 writeCertInstallStatus("NOTOK")
369 def writeCertInstallStatus(installStatus):
370 if installStatus == "NOTOK":
371 with open(os.path.join(log_directory, 'INSTALLCERTSFAIL'), 'w') as fp:
374 elif installStatus == "OK":
375 with open(os.path.join(log_directory, 'INSTALLCERTSPASS'), 'w') as fp:
380 logging.info('Cert installation ending')
381 writeCertInstallStatus("OK")