1 # ============LICENSE_START=======================================================
2 # Copyright (C) 2020 Nordix Foundation.
3 # ================================================================================
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
16 # SPDX-License-Identifier: Apache-2.0
17 # ============LICENSE_END=========================================================
19 __author__ = "Ajay Deep Singh (ajay.deep.singh@est.tech)"
20 __copyright__ = "Copyright (C) 2020 Nordix Foundation"
21 __license__ = "Apache 2.0"
29 from OpenSSL import crypto
30 from docker.types import Mount
32 DEV_NULL = open(os.devnull, 'wb')
33 NETCONF_PNP_SIM_CONTAINER_NAME = 'netconf-simulator'
34 ARCHIVES_PATH = os.getenv("WORKSPACE") + "/archives/"
39 def __init__(self, mount_path, truststore_path):
40 self.mount_path = mount_path
41 self.truststore_path = truststore_path
42 self.caCertPem = mount_path + '/ca.pem'
43 self.serverKeyPem = mount_path + '/server_key.pem'
44 self.serverCertPem = mount_path + '/server_cert.pem'
45 self.keystoreJksPath = mount_path + '/keystore.jks'
46 self.keystoreP12Path = mount_path + '/keystore.p12'
47 self.keystorePassPath = mount_path + '/keystore.pass'
48 self.truststoreJksPath = mount_path + '/truststore.jks'
49 self.truststoreP12Path = mount_path + '/truststore.p12'
50 self.truststorePassPath = mount_path + '/truststore.pass'
52 # Function Create docker container.
53 def run_client_container(self, client_image, container_name, path_to_env, request_url, network):
54 self.create_mount_dir()
55 client = docker.from_env()
56 environment = self.read_env_list_from_file(path_to_env)
57 environment.append("REQUEST_URL=" + request_url)
58 container = client.containers.run(
61 environment=environment,
64 mounts=[Mount(target='/var/certs', source=self.mount_path, type='bind'),
65 Mount(target='/etc/onap/aaf/certservice/certs/', source=self.truststore_path, type='bind')],
68 exitcode = container.wait()
71 # Function to validate keystore/truststore can be opened with generated pass-phrase.
72 def can_open_keystore_and_truststore_with_pass(self, container_name):
73 if container_name != NETCONF_PNP_SIM_CONTAINER_NAME:
74 return self.can_open_keystore_and_truststore_jks_files()
76 return self.can_open_keystore_and_truststore_p12_files()
78 # Function to validate keystore.jks/truststore.jks can be opened with generated pass-phrase.
79 def can_open_keystore_and_truststore_jks_files(self):
81 jks.KeyStore.load(self.keystoreJksPath, open(self.keystorePassPath, 'rb').read())
82 jks.KeyStore.load(self.truststoreJksPath, open(self.truststorePassPath, 'rb').read())
84 except Exception as e:
85 print("UnExpected Error in validating keystore.jks/truststore.jks: {0}".format(e))
88 # Function to validate keystore.p12/truststore.p12 can be opened with generated pass-phrase.
89 def can_open_keystore_and_truststore_p12_files(self):
90 can_open_keystore = self.can_open_p12_file_with_pass_file(self.keystorePassPath, self.keystoreP12Path)
91 can_open_truststore = self.can_open_p12_file_with_pass_file(self.truststorePassPath, self.truststoreP12Path)
92 return can_open_keystore & can_open_truststore
94 # Method for Uploading Certificate in SDNC-Container.
95 # Creating/Uploading Server-key, Server-cert, Ca-cert PEM files in Netconf-Pnp-Simulator.
96 def can_install_keystore_and_truststore_certs(self, cmd, container_name):
98 if container_name == NETCONF_PNP_SIM_CONTAINER_NAME:
99 print("Generating PEM files for {0} from P12 files".format(container_name))
100 continue_exec = self.create_pem(self.keystorePassPath, self.keystoreP12Path, self.truststorePassPath,
101 self.truststoreP12Path)
103 print("Initiate Configuration Push for : {0}".format(container_name))
104 resp_code = self.execute_bash_config(cmd, container_name)
106 print("Execution Successful for: {0}".format(container_name))
109 print("Execution Failed for: {0}".format(container_name))
112 def create_pem(self, keystore_pass_path, keystore_p12_path, truststore_pass_path, truststore_p12_path):
113 # Create [server_key.pem, server_cert.pem, ca.pem] files for Netconf-Pnp-Simulation/TLS Configuration.
115 with open(self.serverKeyPem, "wb+") as key_file:
116 key_file.write(crypto.dump_privatekey(crypto.FILETYPE_PEM,
117 self.get_pkcs12(keystore_pass_path,
118 keystore_p12_path).get_privatekey()))
119 with open(self.serverCertPem, "wb+") as server_cert_file:
120 server_cert_file.write(crypto.dump_certificate(crypto.FILETYPE_PEM,
121 self.get_pkcs12(keystore_pass_path,
122 keystore_p12_path).get_certificate()))
123 with open(self.caCertPem, "wb+") as ca_cert_file:
125 crypto.dump_certificate(crypto.FILETYPE_PEM,
126 self.get_pkcs12(truststore_pass_path,
127 truststore_p12_path).get_ca_certificates()[0]))
129 except IOError as err:
130 print("I/O Error: {0}".format(err))
132 except Exception as e:
133 print("UnExpected Error: {0}".format(e))
136 def can_open_p12_file_with_pass_file(self, pass_file_path, p12_file_path):
138 if p12_file_path.split('/')[-1] == 'truststore.p12':
139 pkcs12 = self.get_pkcs12(pass_file_path, p12_file_path).get_ca_certificates()[0]
141 pkcs12 = self.get_pkcs12(pass_file_path, p12_file_path).get_certificate()
145 except IOError as err:
146 print("I/O Error PKCS12 Creation failed: {0}".format(err))
148 except Exception as e:
149 print("UnExpected Error PKCS12 Creation failed: {0}".format(e))
152 def remove_client_container_and_save_logs(self, container_name, log_file_name):
153 client = docker.from_env()
154 container = client.containers.get(container_name)
155 text_file = open(ARCHIVES_PATH + container_name + '_' + log_file_name + ".log", "w")
156 text_file.write(container.logs())
159 self.remove_mount_dir()
161 def create_mount_dir(self):
162 if not os.path.exists(self.mount_path):
163 os.makedirs(self.mount_path)
165 def remove_mount_dir(self):
166 shutil.rmtree(self.mount_path)
169 def get_pkcs12(pass_file_path, p12_file_path):
171 password = open(pass_file_path, 'rb').read()
172 return crypto.load_pkcs12(open(p12_file_path, 'rb').read(), password)
175 def execute_bash_config(cmd, container_name):
176 # Run command with arguments. Wait for command to complete or timeout, return code attribute.
178 resp_code = subprocess.call(["%s %s" % (cmd, container_name)], shell=True, stdout=DEV_NULL,
179 stderr=subprocess.STDOUT)
180 print("Response Code from Config.sh execution: {0}".format(resp_code))
182 except subprocess.CalledProcessError as e:
183 print("CalledProcessError Certificate installation failed in SDNC-ODL Container: {0}".format(e))
184 return 1 # Return Error Code
187 def get_container_logs(container_name):
188 client = docker.from_env()
189 container = client.containers.get(container_name)
190 logs = container.logs()
194 def read_env_list_from_file(path):