X-Git-Url: https://gerrit.onap.org/r/gitweb?a=blobdiff_plain;f=tests%2Fsdnc%2Fsdnc_netconf_tls_post_deploy%2Flibraries%2FClientManager.py;h=b1c024ff49e6997aabbbcaa3b2766fd2854359e9;hb=de929b31c7eaeb5f8769028e4e80f265afadef39;hp=ceff974246fc23bbeb21cb2e216514ff6d6f2d0f;hpb=59ece202f8ac9d84aae4bbe17defd9d374b513b8;p=integration%2Fcsit.git
diff --git a/tests/sdnc/sdnc_netconf_tls_post_deploy/libraries/ClientManager.py b/tests/sdnc/sdnc_netconf_tls_post_deploy/libraries/ClientManager.py
index ceff9742..b1c024ff 100644
--- a/tests/sdnc/sdnc_netconf_tls_post_deploy/libraries/ClientManager.py
+++ b/tests/sdnc/sdnc_netconf_tls_post_deploy/libraries/ClientManager.py
@@ -38,12 +38,15 @@ class ClientManager:
 def __init__(self, mount_path, truststore_path):
 self.mount_path = mount_path
 self.truststore_path = truststore_path
+ self.keyPem = mount_path + '/key.pem'
 self.caCertPem = mount_path + '/ca.pem'
 self.serverKeyPem = mount_path + '/server_key.pem'
 self.serverCertPem = mount_path + '/server_cert.pem'
- self.keystoreJksPath = mount_path + '/keystore.jks'
+ self.keystorePemPath = mount_path + '/keystore.pem'
+ self.keystoreP12Path = mount_path + '/keystore.p12'
 self.keystorePassPath = mount_path + '/keystore.pass'
- self.truststoreJksPath = mount_path + '/truststore.jks'
+ self.truststorePemPath = mount_path + '/truststore.pem'
+ self.truststoreP12Path = mount_path + '/truststore.p12'
 self.truststorePassPath = mount_path + '/truststore.pass'
 # Function Create docker container.
@@ -65,20 +68,40 @@ class ClientManager:
 exitcode = container.wait()
 return exitcode
- # Function to validate keystore.jks/truststore.jks can be opened with generated pass-phrase.
- def can_open_keystore_and_truststore_with_pass(self):
- can_open_keystore = self.can_open_jks_file_with_pass_file(self.keystorePassPath, self.keystoreJksPath)
- can_open_truststore = self.can_open_jks_file_with_pass_file(self.truststorePassPath, self.truststoreJksPath)
+ # Function to validate keystore/truststore can be opened with generated pass-phrase.
+ def can_open_keystore_and_truststore_with_pass(self, container_name):
+ if container_name != NETCONF_PNP_SIM_CONTAINER_NAME:
+ return self.can_open_keystore_and_truststore_pem_files()
+ else:
+ return self.can_open_keystore_and_truststore_p12_files()
+
+ # Function to validate keystore.pem/truststore.pem exist and are not empty.
+ def can_open_keystore_and_truststore_pem_files(self):
+ try:
+ private_key = self.file_exist_and_not_empty(self.keyPem)
+ keystore_pem = self.file_exist_and_not_empty(self.keystorePemPath)
+ truststore_pem = self.file_exist_and_not_empty(self.truststorePemPath)
+ return private_key and keystore_pem and truststore_pem
+ except Exception as e:
+ print("UnExpected Error in validating keystore.pem/truststore.pem: {0}".format(e))
+ return False
+
+ # Function to validate keystore.p12/truststore.p12 can be opened with generated pass-phrase.
+ def can_open_keystore_and_truststore_p12_files(self):
+ can_open_keystore = self.can_open_p12_file_with_pass_file(self.keystorePassPath, self.keystoreP12Path)
+ can_open_truststore = self.can_open_p12_file_with_pass_file(self.truststorePassPath, self.truststoreP12Path)
 return can_open_keystore & can_open_truststore
 # Method for Uploading Certificate in SDNC-Container.
 # Creating/Uploading Server-key, Server-cert, Ca-cert PEM files in Netconf-Pnp-Simulator.
- def can_install_keystore_and_truststore_certs(self, cmd, container_name):
+ def can_install_keystore_and_truststore_certs(self, cmd, cmd_tls, container_name):
 continue_exec = True
 if container_name == NETCONF_PNP_SIM_CONTAINER_NAME:
- print("Generating PEM files for {0} from JKS files".format(container_name))
- continue_exec = self.create_pem(self.keystorePassPath, self.keystoreJksPath, self.truststorePassPath,
- self.truststoreJksPath)
+ print("Generating PEM files for {0} from P12 files".format(container_name))
+ continue_exec = self.create_pem(self.keystorePassPath, self.keystoreP12Path, self.truststorePassPath,
+ self.truststoreP12Path)
+ else:
+ cmd = cmd_tls
 if continue_exec:
 print("Initiate Configuration Push for : {0}".format(container_name))
 resp_code = self.execute_bash_config(cmd, container_name)
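Review bot: `can_open_keystore_and_truststore_pem_files` accepts any non-empty file as a valid credential, so a truncated or wrong-format `key.pem`, `keystore.pem` or `truststore.pem` passes this TLS post-deploy check, whereas the P12 branch actually parses its stores. Parsing each file through the `crypto` module this file already uses would make the two branches comparable; the sketch below assumes `key.pem` is not pass-phrase protected and checks only the first certificate in each PEM bundle. It also narrows `except Exception` to the errors these calls are expected to raise, so an unrelated failure is not reported as a validation result.

```python
    def can_open_keystore_and_truststore_pem_files(self):
        try:
            for path in (self.keyPem, self.keystorePemPath, self.truststorePemPath):
                if not self.file_exist_and_not_empty(path):
                    return False
            # Parse the material instead of only checking its size.
            with open(self.keyPem, 'rb') as key_file:
                crypto.load_privatekey(crypto.FILETYPE_PEM, key_file.read())
            for path in (self.keystorePemPath, self.truststorePemPath):
                with open(path, 'rb') as cert_file:
                    crypto.load_certificate(crypto.FILETYPE_PEM, cert_file.read())
            return True
        except (IOError, crypto.Error) as e:
            print("UnExpected Error in validating keystore.pem/truststore.pem: {0}".format(e))
            return False
```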
@@ -89,19 +112,22 @@ class ClientManager:
 print("Execution Failed for: {0}".format(container_name))
 return False
- def create_pem(self, keystore_pass_file_path, keystore_jks_file_path, truststore_pass_file_path,
- truststore_jks_file_path):
+ def create_pem(self, keystore_pass_path, keystore_p12_path, truststore_pass_path, truststore_p12_path):
 # Create [server_key.pem, server_cert.pem, ca.pem] files for Netconf-Pnp-Simulation/TLS Configuration.
 try:
- keystore_p12 = self.get_pkcs12(keystore_pass_file_path, keystore_jks_file_path)
- truststore_p12 = self.get_pkcs12(truststore_pass_file_path, truststore_jks_file_path)
 with open(self.serverKeyPem, "wb+") as key_file:
- key_file.write(crypto.dump_privatekey(crypto.FILETYPE_PEM, keystore_p12.get_privatekey()))
+ key_file.write(crypto.dump_privatekey(crypto.FILETYPE_PEM,
+ self.get_pkcs12(keystore_pass_path,
+ keystore_p12_path).get_privatekey()))
 with open(self.serverCertPem, "wb+") as server_cert_file:
- server_cert_file.write(crypto.dump_certificate(crypto.FILETYPE_PEM, keystore_p12.get_certificate()))
+ server_cert_file.write(crypto.dump_certificate(crypto.FILETYPE_PEM,
+ self.get_pkcs12(keystore_pass_path,
+ keystore_p12_path).get_certificate()))
 with open(self.caCertPem, "wb+") as ca_cert_file:
 ca_cert_file.write(
- crypto.dump_certificate(crypto.FILETYPE_PEM, truststore_p12.get_ca_certificates()[0]))
+ crypto.dump_certificate(crypto.FILETYPE_PEM,
+ self.get_pkcs12(truststore_pass_path,
+ truststore_p12_path).get_ca_certificates()[0]))
 return True
 except IOError as err:
 print("I/O Error: {0}".format(err))
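Review bot: Each `with` block in `create_pem` now calls `self.get_pkcs12(...)` on its own, so the keystore is read and decrypted twice and every call reopens the pass-phrase and P12 files through `get_pkcs12`, which does not close them. Loading each store once before the writes keeps the behaviour and the shorter lines: `keystore_p12 = self.get_pkcs12(keystore_pass_path, keystore_p12_path)` and `truststore_p12 = self.get_pkcs12(truststore_pass_path, truststore_p12_path)`. Separately, `open(self.serverKeyPem, "wb+")` writes the unencrypted server private key with whatever mode the process umask allows; if the mount directory is shared, creating the file with an explicit owner-only mode would be safer. The function's `except` clauses continue outside the hunk.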
@@ -110,12 +136,12 @@ class ClientManager:
 print("UnExpected Error: {0}".format(e))
 return False
- def can_open_jks_file_with_pass_file(self, pass_file_path, jks_file_path):
+ def can_open_p12_file_with_pass_file(self, pass_file_path, p12_file_path):
 try:
- if jks_file_path.split('/')[-1] == 'truststore.jks':
- pkcs12 = self.get_pkcs12(pass_file_path, jks_file_path).get_ca_certificates()[0]
+ if p12_file_path.split('/')[-1] == 'truststore.p12':
+ pkcs12 = self.get_pkcs12(pass_file_path, p12_file_path).get_ca_certificates()[0]
 else:
- pkcs12 = self.get_pkcs12(pass_file_path, jks_file_path).get_certificate()
+ pkcs12 = self.get_pkcs12(pass_file_path, p12_file_path).get_certificate()
 if pkcs12 is None:
 return False
 return True
@@ -142,12 +168,14 @@ class ClientManager:
 def remove_mount_dir(self):
 shutil.rmtree(self.mount_path)
+ def file_exist_and_not_empty(self, path_to_file):
+ return os.path.isfile(path_to_file) and os.path.getsize(path_to_file) > 0
+
 @staticmethod
- def get_pkcs12(pass_file_path, jks_file_path):
+ def get_pkcs12(pass_file_path, p12_file_path):
 # Load PKCS12 Object
 password = open(pass_file_path, 'rb').read()
- p12 = crypto.load_pkcs12(open(jks_file_path, 'rb').read(), password)
- return p12
+ return crypto.load_pkcs12(open(p12_file_path, 'rb').read(), password)
 @staticmethod
 def execute_bash_config(cmd, container_name):
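Review bot: `get_pkcs12` opens both the pass-phrase file and the P12 file with a bare `open(...).read()` and never closes them, and after this commit `create_pem` calls it three times per run; `with` blocks would release the handles deterministically. The pass-phrase bytes are also handed to `crypto.load_pkcs12` exactly as read, so a trailing newline in `keystore.pass` or `truststore.pass` would become part of the password; confirm the generator writes none, or strip it. `load_pkcs12` is deprecated in pyOpenSSL in favour of the `cryptography` package's PKCS#12 loader, which is worth tracking if the test image's pyOpenSSL is upgraded.

```python
    def get_pkcs12(pass_file_path, p12_file_path):
        # Load PKCS12 Object
        with open(pass_file_path, 'rb') as pass_file:
            password = pass_file.read()
        with open(p12_file_path, 'rb') as p12_file:
            return crypto.load_pkcs12(p12_file.read(), password)
```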