1 # ============LICENSE_START=======================================================
2 # Copyright (C) 2019 Nordix Foundation.
3 # ================================================================================
4 # extended by highstreet technologies GmbH (c) 2020
5 # ================================================================================
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
18 # SPDX-License-Identifier: Apache-2.0
19 # ============LICENSE_END=========================================================
# Resolve the log location below the ODL installation directory.
# ODL_HOME is mandatory: a missing variable raises KeyError at start-up.
odl_home = os.environ['ODL_HOME']
log_directory = odl_home + '/data/log/'
log_file = log_directory + 'installCerts.log'
log_format = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"

# Create the directory BEFORE touching the log file: opening a file inside a
# directory that does not exist yet raises FileNotFoundError.
os.makedirs(log_directory, exist_ok=True)

# Opening in 'w' mode truncates the log so every run starts with an empty file.
# NOTE(review): the body of this `with` is not visible in this view; `pass` assumed.
with open(os.path.join(log_directory, 'installCerts.log'), 'w') as fp:
    pass

logging.basicConfig(filename=log_file, level=logging.DEBUG, filemode='w', format=log_format)
print('Start cert provisioning. Log file: ' + log_file)
# Directory holding the certificate material; taken from ODL_CERT_DIR when set.
# NOTE(review): the fallback assignment of Path is not visible in this view — confirm a default exists.
47 if "ODL_CERT_DIR" in os.environ:
48 Path = os.environ['ODL_CERT_DIR']
# Admin credentials used for the REST calls. The first two are mandatory
# (KeyError when unset); the replacement password is optional (None when unset).
52 username = os.environ['ODL_ADMIN_USERNAME']
53 password = os.environ['ODL_ADMIN_PASSWORD']
54 newpassword = os.environ.get('ODL_ADMIN_NEWPASSWORD')
# RESTCONF operations of the netconf-keystore used to install keys and certificates.
59 postKeystore= "/rests/operations/netconf-keystore:add-keystore-entry"
60 postPrivateKey= "/rests/operations/netconf-keystore:add-private-key"
61 postTrustedCertificate= "/rests/operations/netconf-keystore:add-trusted-certificate"
# JKS stores and their password files, all expected directly under Path.
63 truststore_pass_file = Path + '/truststore.pass'
64 truststore_file = Path + '/truststore.jks'
66 keystore_pass_file = Path + '/keystore.pass'
67 keystore_file = Path + '/keystore.jks'
# All four files must exist for the JKS processing path to run.
69 jks_files = [truststore_pass_file, keystore_pass_file, keystore_file, truststore_file]
71 envOdlFeaturesBoot='ODL_FEATURES_BOOT'
72 # Strategy sli-api is default
# URL probed to decide whether ODL is ready; switched to the network-topology
# resource in the SDNRWT branch below.
# NOTE(review): certreadyCmd (logged below) and the statements between reading
# SDNRWT and the URL override are not visible in this view — confirm the condition.
74 certreadyUrl="/rests/operations/SLI-API:healthcheck"
76 if "SDNRWT" in os.environ:
77 sdnrWt = os.environ['SDNRWT']
80 certreadyUrl="/rests/data/network-topology:network-topology"
81 logging.info('ODL ready strategy with command %s and url %s', certreadyCmd, certreadyUrl)
# HTTP Basic credentials: base64 of "username:password".
84 cred_string = username + ":" + password
# Headers sent with every REST request made by this script.
85 headers = {'Authorization':'Basic %s' % base64.b64encode(cred_string.encode()).decode(),
86 'X-FromAppId': 'csit-sdnc',
87 'X-TransactionId': 'csit-sdnc',
88 'Accept':"application/json",
89 'Content-type':"application/yang-data+json"}
def readFile(folder, file):
    """Return the content of ``Path/<folder>/<file>`` without its first and last line.

    Dropping the outer lines presumably removes the PEM BEGIN/END marker
    lines, leaving only the encoded body.

    :param folder: sub-directory of ``Path`` that contains the file
    :param file: name of the file to read
    :return: the remaining lines joined with a newline (no trailing newline)
    :raises OSError: if the file cannot be opened or read
    """
    # The context manager closes the handle even when read() raises.
    with open(Path + "/" + folder + "/" + file, "r") as key:
        fileRead = key.read()
    return "\n".join(fileRead.splitlines()[1:-1])
def readTrustedCertificate(folder, file):
    """Split a certificate bundle into the bodies of its individual certificates.

    Lines between a "BEGIN CERTIFICATE" marker and the following
    "END CERTIFICATE" marker are collected unchanged (line endings included);
    the marker lines themselves and anything outside a marker pair are dropped.

    :param folder: directory that contains the bundle
    :param file: file name of the bundle
    :return: list with one string per certificate, in file order
    :raises OSError: if the file cannot be opened or read
    """
    listCert = []
    bodyLines = []
    startCa = False
    # The context manager guarantees the handle is released.
    with open(folder + "/" + file, "r") as key:
        for line in key:
            if "BEGIN CERTIFICATE" in line:
                startCa = True
            elif "END CERTIFICATE" in line:
                startCa = False
                listCert.append("".join(bodyLines))
                bodyLines = []
            elif startCa:
                bodyLines.append(line)
    return listCert
def makeKeystoreKey(clientKey, count):
    """Build the JSON request body that stores a private key as a keystore entry.

    The document is produced with ``json.dumps`` so that line breaks, quotes
    and backslashes inside the key are escaped and the result is always
    valid JSON.

    :param clientKey: private key text to store
    :param count: integer used to derive the key id ``ODL_private_key_<count>``
    :return: JSON document as a string
    """
    import json  # local import: the file's import block is not touched by this change

    payload = {
        "input": {
            "key-credential": {
                "key-id": "ODL_private_key_%d" % count,
                "private-key": clientKey,
                "passphrase": "",
            }
        }
    }
    return json.dumps(payload)
def makePrivateKey(clientKey, clientCrt, certList, count):
    """Build the JSON request body that installs a private key with its certificate chain.

    The chain starts with the client certificate, followed by every entry of
    ``certList`` in order. ``json.dumps`` takes care of escaping, and an empty
    ``certList`` yields a one-element chain instead of a dangling comma.

    :param clientKey: private key text
    :param clientCrt: client certificate text (first element of the chain)
    :param certList: iterable of CA certificate texts appended to the chain
    :param count: integer used to derive the name ``ODL_private_key_<count>``
    :return: JSON document as a string
    """
    import json  # local import: the file's import block is not touched by this change

    payload = {
        "input": {
            "private-key": {
                "name": "ODL_private_key_%d" % count,
                "data": clientKey,
                "certificate-chain": [clientCrt] + list(certList),
            }
        }
    }
    return json.dumps(payload)
def makeTrustedCertificate(certList, count):
    """Build the JSON request body that installs a list of trusted certificates.

    Each certificate is named ``xNF_CA_certificate_<count>_<number>`` where
    ``number`` is its position in ``certList``.

    :param certList: iterable of certificate texts
    :param count: integer identifying the certificate set
    :return: JSON document as a string
    """
    import json  # local import: the file's import block is not touched by this change

    # NOTE(review): the counter's initial value is not visible in this view;
    # numbering is assumed to start at 0 — confirm.
    certificates = [
        {
            "name": "xNF_CA_certificate_%d_%d" % (count, number),
            "certificate": cert,
        }
        for number, cert in enumerate(certList)
    ]
    return json.dumps({"input": {"trusted-certificate": certificates}})
def makeRestconfPost(conn, json_file, apiCall):
    """POST a JSON document to ODL and record the outcome.

    Any status other than 200 or 204 is logged as an error and reported via
    ``writeCertInstallStatus("NOTOK")``.

    :param conn: open ``http.client.HTTPConnection`` to the controller
    :param json_file: request body (JSON text)
    :param apiCall: URL path of the RESTCONF operation
    """
    conn.request("POST", apiCall, json_file, headers=headers)
    res = conn.getresponse()
    # The response must be read completely before the same connection can
    # carry another request.
    res.read()
    if res.status not in (200, 204):
        logging.error("Error here, response back wasnt 200: Response was : %d , %s", res.status, res.reason)
        writeCertInstallStatus("NOTOK")
    else:
        logging.debug("Response :%s Reason :%s ", res.status, res.reason)
def extractZipFiles(zipFileList, count):
    """Extract every listed zip archive into ``Path`` and process the extracted folder.

    The folder handed to ``processFiles`` is the archive name without its
    final extension. Any failure is logged and reported via
    ``writeCertInstallStatus("NOTOK")``.

    :param zipFileList: archive file names relative to ``Path`` (surrounding
        whitespace such as a trailing newline is ignored)
    :param count: integer identifying the certificate set, passed through
    """
    for zipFolder in zipFileList:
        zipName = zipFolder.strip()
        try:
            with zipfile.ZipFile(Path + "/" + zipName, "r") as zip_ref:
                zip_ref.extractall(Path)
            # Cut only the last extension so dotted names ("a.b.zip") keep
            # their full stem ("a.b").
            folder = zipName.rsplit(".", 1)[0]
            processFiles(folder, count)
        except Exception as e:
            logging.error("Error while extracting zip file(s). Exiting Certificate Installation.")
            logging.error("Error details : %s", e)
            writeCertInstallStatus("NOTOK")
# Read the key, certificate and trusted-certificate files of one extracted
# folder below Path, remove the folder, then pass the contents to post_content().
#   folder: folder name relative to Path
#   count:  integer identifying the certificate set (passed through)
# NOTE(review): the conditions that select the clientKey and clientCrt branches
# are not visible in this view; only the "trustedCertificate" name test is.
# NOTE(review): clientKey, clientCrt and certList are assigned only inside
# branches, so the final post_content() call presumably raises NameError when
# one kind of file is missing — confirm.
180 def processFiles(folder, count):
181 logging.info('Process folder: %d %s', count, folder)
182 for file in os.listdir(Path + "/" + folder):
183 if os.path.isfile(Path + "/" + folder + "/" + file.strip()):
185 clientKey = readFile(folder, file.strip())
186 elif "trustedCertificate" in file:
187 certList = readTrustedCertificate(Path + "/" + folder, file.strip())
189 clientCrt = readFile(folder, file.strip())
191 logging.error("Could not find file %s" % file.strip())
192 writeCertInstallStatus("NOTOK")
# The extracted folder is deleted before its contents are posted.
193 shutil.rmtree(Path + "/" + folder)
194 post_content(clientKey, clientCrt, certList, count)
# Send one certificate set to ODL over a new HTTP connection to localhost:odl_port.
# Three requests are prepared in this order: keystore entry (private key),
# trusted certificate list, and private key with certificate chain.
#   clientKey: private key text
#   clientCrt: client certificate text
#   certList:  list of CA certificate texts
#   count:     integer used to derive the entry names
# NOTE(review): statements between the visible lines (for example guards around
# the first two posts) are not visible in this view — confirm.
196 def post_content(clientKey, clientCrt, certList, count):
197 logging.info('Post content: %d', count)
198 conn = http.client.HTTPConnection("localhost",odl_port)
201 json_keystore_key = makeKeystoreKey(clientKey, count)
202 logging.debug("Posting private key in to ODL keystore")
203 makeRestconfPost(conn, json_keystore_key, postKeystore)
206 json_trusted_cert = makeTrustedCertificate(certList, count)
207 logging.debug("Posting trusted cert list in to ODL")
208 makeRestconfPost(conn, json_trusted_cert, postTrustedCertificate)
# The key-with-chain request is only sent when all three inputs are non-empty.
210 if clientKey and clientCrt and certList:
211 json_private_key = makePrivateKey(clientKey, clientCrt, certList, count)
212 logging.debug("Posting the cert in to ODL")
213 makeRestconfPost(conn, json_private_key, postPrivateKey)
def makeHealthcheckCall(headers, timePassed):
    """Poll the ODL readiness URL until it answers 200 or the time budget is used up.

    Each attempt opens a new connection to ``localhost:odl_port`` and sends
    ``certreadyCmd`` to ``certreadyUrl``. Between attempts ``timeIncrement``
    advances the elapsed time. When the budget ``TIMEOUT`` is exhausted the
    failure is logged and reported via ``writeCertInstallStatus("NOTOK")``.

    :param headers: HTTP headers sent with every probe
    :param timePassed: seconds already elapsed when polling starts
    :return: True as soon as a probe returns 200, False after a timeout
    """
    # Wait at most TIMEOUT seconds, probing once per INTERVAL.
    while timePassed < TIMEOUT:
        conn = http.client.HTTPConnection("localhost", odl_port)
        try:
            conn.request(certreadyCmd, certreadyUrl, headers=headers)
            res = conn.getresponse()
            res.read()
            httpStatus = res.status
            if httpStatus == 200:
                logging.debug("Healthcheck Passed in %d seconds.", timePassed)
                return True
            logging.debug("Sleep: %d seconds before testing if Healthcheck worked. Total wait time up now is: %d seconds. Timeout is: %d seconds. Problem code was: %d", INTERVAL, timePassed, TIMEOUT, httpStatus)
        except Exception:
            # The controller may simply not be listening yet; keep retrying.
            logging.error("Cannot execute REST call. Sleep: %d seconds before testing if Healthcheck worked. Total wait time up now is: %d seconds. Timeout is: %d seconds.", INTERVAL, timePassed, TIMEOUT)
        finally:
            # Release the socket of this attempt; the next one opens a new connection.
            conn.close()
        timePassed = timeIncrement(timePassed)

    # Reaching this point means no probe succeeded, including the case where
    # timePassed lands exactly on TIMEOUT.
    logging.error("TIME OUT: Healthcheck not passed in %d seconds... Could cause problems for testing activities...", TIMEOUT)
    writeCertInstallStatus("NOTOK")
    return False
# Advance the elapsed-time counter by INTERVAL seconds.
#   timePassed: seconds elapsed so far
# The caller assigns the result back to its own counter, so the new value is
# presumably returned.
# NOTE(review): the return statement and any sleep call are not visible in
# this view — confirm.
243 def timeIncrement(timePassed):
245 timePassed = timePassed + INTERVAL
def get_pass(file_name):
    """Read a password file and return the password quoted for use in a shell command.

    Surrounding whitespace is stripped. The value is wrapped in single quotes;
    a single quote inside the password is emitted as ``'"'"'`` (close the
    quote, add an escaped quote, reopen) so the shell still sees one word.

    :param file_name: path of the password file
    :return: single-quoted password, or None after a read failure has been
        reported via ``writeCertInstallStatus("NOTOK")``
    """
    try:
        with open(file_name, 'r') as file_obj:
            password = file_obj.read().strip()
        return "'{}'".format(password.replace("'", "'\"'\"'"))
    except Exception as e:
        logging.error("Error occurred while fetching password : %s", e)
        writeCertInstallStatus("NOTOK")
# Delete every regular file located directly in Path, logging each one;
# sub-directories are left untouched.
# NOTE(review): these statements presumably form the body of a function whose
# def line is not visible in this view — confirm name and call sites.
259 for file in os.listdir(Path):
260 if os.path.isfile(Path + '/' + file):
261 logging.debug("Cleaning up the file %s", Path + '/'+ file)
262 os.remove(Path + '/'+ file)
# Build the keytool command that converts a JKS store into a PKCS12 store of
# the same name with a .p12 extension. The same password is used for the
# source and the destination store.
#   file:     path of the store; only names ending in '.jks' are handled here
#   password: store password, inserted verbatim into the command line
# Failures are logged and reported via writeCertInstallStatus("NOTOK").
# NOTE(review): the statement that executes jks_cmd and the return statement
# are not visible in this view; callers use the result as the .p12 path.
# NOTE(review): str.replace swaps every '.jks' occurrence in the path, not
# only the suffix — a directory containing '.jks' would be renamed too.
265 def jks_to_p12(file, password):
266 """Converts jks format into p12"""
271 if (file.endswith('.jks')):
272 p12_file = file.replace('.jks', '.p12')
273 jks_cmd = 'keytool -importkeystore -srckeystore {src_file} -destkeystore {dest_file} -srcstoretype JKS -srcstorepass {src_pass} -deststoretype PKCS12 -deststorepass {dest_pass}'.format(src_file=file, dest_file=p12_file, src_pass=password, dest_pass=password)
274 logging.debug("Converting %s into p12 format", file)
278 except Exception as e:
279 logging.error("Error occurred while converting jks to p12 format : %s", e)
280 writeCertInstallStatus("NOTOK")
def make_cert_chain(cert_chain, pattern):
    """Extract every section of ``cert_chain`` that matches ``pattern``.

    :param cert_chain: text to search; bytes (as returned by
        ``subprocess.check_output``) are decoded when ``pattern`` is a str
    :param pattern: regular expression, applied with DOTALL and MULTILINE
    :return: list of matches with surrounding whitespace stripped; an empty
        list when ``cert_chain`` is empty or nothing matches
    """
    if not cert_chain:
        logging.debug(" Certificate Chain empty: %s " % cert_chain)
        return []
    # A str pattern cannot be applied to bytes, so bring both to str first.
    if isinstance(cert_chain, bytes) and isinstance(pattern, str):
        cert_chain = cert_chain.decode("utf-8", "replace")
    matches = re.findall(pattern, cert_chain, re.DOTALL | re.MULTILINE)
    return [cert.strip() for cert in matches]
def _openssl_output(command):
    """Run a shell command and return its standard output as text."""
    # check_output returns bytes; the regular expressions used on the result
    # are str patterns, so decode once here.
    return subprocess.check_output(command, shell=True).decode("utf-8", "replace")


def process_jks_files(count):
    """Install key, client certificate and CA certificates taken from the JKS stores.

    When all files listed in ``jks_files`` exist, both stores are converted to
    PKCS12, the private key, client certificate and CA certificates are
    extracted with ``openssl pkcs12`` and handed to ``post_content``. Any
    failure is logged and reported via ``writeCertInstallStatus("NOTOK")``.

    :param count: integer identifying the certificate set, passed through
    """
    try:
        logging.info("Processing JKS files found in %s directory " % Path)

        if not all(os.path.isfile(f) for f in jks_files):
            logging.debug("No JKS files found in %s directory" % Path)
            return

        keystore_pass = get_pass(keystore_pass_file)
        keystore_file_p12 = jks_to_p12(keystore_file, keystore_pass)

        client_key_cmd = 'openssl pkcs12 -in {src_file} -nocerts -nodes -passin pass:{src_pass}'.format(
            src_file=keystore_file_p12, src_pass=keystore_pass)
        client_crt_cmd = 'openssl pkcs12 -in {src_file} -clcerts -nokeys -passin pass:{src_pass}'.format(
            src_file=keystore_file_p12, src_pass=keystore_pass)

        truststore_pass = get_pass(truststore_pass_file)
        truststore_p12 = jks_to_p12(truststore_file, truststore_pass)

        trust_cert_cmd = 'openssl pkcs12 -in {src_file} -cacerts -nokeys -passin pass:{src_pass} '.format(
            src_file=truststore_p12, src_pass=truststore_pass)

        key_pattern = r'(?<=-----BEGIN PRIVATE KEY-----).*?(?=-----END PRIVATE KEY-----)'
        cert_pattern = r'(?<=-----BEGIN CERTIFICATE-----).*?(?=-----END CERTIFICATE-----)'

        # NOTE(review): the guards on empty command output are assumed; the
        # corresponding lines are not visible in this view.
        client_key = _openssl_output(client_key_cmd)
        if client_key:
            client_key = make_cert_chain(client_key, key_pattern)[0]
            logging.debug("Key Ok")

        client_cert = _openssl_output(client_crt_cmd)
        if client_cert:
            client_cert = make_cert_chain(client_cert, cert_pattern)[0]
            logging.debug("Client Cert Ok")

        ca_cert_list = []
        ca_cert = _openssl_output(trust_cert_cmd)
        if ca_cert:
            ca_cert_list = make_cert_chain(ca_cert, cert_pattern)
            logging.debug("CA Cert Ok")

        if client_key and client_cert and ca_cert:
            post_content(client_key, client_cert, ca_cert_list, count)
    except subprocess.CalledProcessError as err:
        # Report through the log like every other failure path of this script.
        logging.error("CalledProcessError Execution of OpenSSL command failed: %s" % err)
        writeCertInstallStatus("NOTOK")
    except Exception as e:
        logging.error("UnExpected Error while processing JKS files at {0}, Caused by: {1}".format(Path, e))
        writeCertInstallStatus("NOTOK")
def replaceAdminPassword(username, password, newpassword):
    """Set a new password for an ODL user through the ``/auth/v1/users`` REST resource.

    Nothing is sent when ``newpassword`` is None. A status other than 200 is
    only logged; a failure to perform the request at all is logged and
    reported via ``writeCertInstallStatus("NOTOK")``.

    :param username: user whose password is replaced (addressed as ``<username>@sdn``)
    :param password: current password; not used here, kept for interface compatibility
    :param newpassword: replacement password, or None to leave the password unchanged
    """
    import json  # local import: the file's import block is not touched by this change

    if newpassword is None:
        logging.info('Not to replace password for user %s', username)
        return

    logging.info('Replace password for user %s', username)
    try:
        # json.dumps escapes the value; str.format on a template with literal
        # braces would raise before any request is sent.
        jsondata = json.dumps({"password": newpassword})
        url = '/auth/v1/users/{username}@sdn'.format(username=username)
        # The request body holds the new password and is deliberately not logged.
        logging.info("Url %s", url)
        conn = http.client.HTTPConnection("localhost", odl_port)
        conn.request("PUT", url, jsondata, headers=headers)
        res = conn.getresponse()
        res.read()
        httpStatus = res.status
        if httpStatus == 200:
            logging.debug("New password provided successfully for user %s", username)
        else:
            logging.debug("Password change was not possible. Problem code was: %d", httpStatus)
    except Exception:
        logging.error("Cannot execute REST call to set password.")
        writeCertInstallStatus("NOTOK")
# Main provisioning sequence: wait for ODL via makeHealthcheckCall(), call
# replaceAdminPassword(), then install certificates either from the zip files
# listed in Path/certs.properties or from the JKS files via process_jks_files().
# Lines of certs.properties that do not contain "*****" are collected in
# zipFileList; presumably a "*****" line ends a group and triggers
# extractZipFiles().
# NOTE(review): the lines that initialise and advance `count`, iterate over the
# properties file and branch on `connected` are not visible in this view; the
# structure described here is inferred from the calls and log messages — confirm.
# NOTE(review): "foud" in the description text below is a typo for "found".
364 def readCertProperties():
366 This function searches for manually copied zip file
367 containing certificates. This is required as part
368 of backward compatibility.
369 If not foud, it searches for jks certificates.
371 connected = makeHealthcheckCall(headers, timePassed)
372 logging.info('Connected status: %s', connected)
374 replaceAdminPassword(username, password, newpassword)
376 if os.path.isfile(Path + "/certs.properties"):
377 with open(Path + "/certs.properties", "r") as f:
379 if not "*****" in line:
380 zipFileList.append(line)
382 extractZipFiles(zipFileList, count)
386 logging.debug("No certs.properties/zip files exist at: " + Path)
387 logging.info("Processing any available jks/p12 files under cert directory")
388 process_jks_files(count)
# Failure path: the connection status is logged again and NOTOK is recorded.
390 logging.info('Connected status: %s', connected)
391 logging.info('Stopping SDNR due to inability to install certificates')
392 writeCertInstallStatus("NOTOK")
# Record the outcome of the installation as a marker file in log_directory:
# INSTALLCERTSFAIL for "NOTOK", INSTALLCERTSPASS for "OK". Opening in 'w' mode
# creates the file or truncates an existing one.
#   installStatus: "OK" or "NOTOK"; no branch for other values is visible
# NOTE(review): the bodies of both `with` blocks are not visible in this view —
# confirm whether anything is written or the process exits here.
394 def writeCertInstallStatus(installStatus):
395 if installStatus == "NOTOK":
396 with open(os.path.join(log_directory, 'INSTALLCERTSFAIL'), 'w') as fp:
399 elif installStatus == "OK":
400 with open(os.path.join(log_directory, 'INSTALLCERTSPASS'), 'w') as fp:
# End of the script: log completion and record the OK marker.
# NOTE(review): the call that starts the provisioning sequence is not visible
# in this view — presumably it precedes these two statements; confirm.
405 logging.info('Cert installation ending')
406 writeCertInstallStatus("OK")