b2399dfba0c40c78a1c60df33dff15ef4b0e7b26
[integration/csit.git] / tests / sdnc / sdnc_netconf_tls_post_deploy / libraries / ClientManager.py
1 # ============LICENSE_START=======================================================
2 #  Copyright (C) 2020 Nordix Foundation.
3 # ================================================================================
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 #      http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 #
16 # SPDX-License-Identifier: Apache-2.0
17 # ============LICENSE_END=========================================================
18
19 __author__ = "Ajay Deep Singh (ajay.deep.singh@est.tech)"
20 __copyright__ = "Copyright (C) 2020 Nordix Foundation"
21 __license__ = "Apache 2.0"
22
23 import os
24 import shutil
25 import subprocess
26
27 import docker
28 import jks
29 from OpenSSL import crypto
30 from docker.types import Mount
31
32 DEV_NULL = open(os.devnull, 'wb')
33 NETCONF_PNP_SIM_CONTAINER_NAME = 'netconf-simulator'
34 ARCHIVES_PATH = os.getenv("WORKSPACE") + "/archives/"
35
36
37 class ClientManager:
38
39     def __init__(self, mount_path, truststore_path):
40         self.mount_path = mount_path
41         self.truststore_path = truststore_path
42         self.caCertPem = mount_path + '/ca.pem'
43         self.serverKeyPem = mount_path + '/server_key.pem'
44         self.serverCertPem = mount_path + '/server_cert.pem'
45         self.keystoreJksPath = mount_path + '/keystore.jks'
46         self.keystoreP12Path = mount_path + '/keystore.p12'
47         self.keystorePassPath = mount_path + '/keystore.pass'
48         self.truststoreJksPath = mount_path + '/truststore.jks'
49         self.truststoreP12Path = mount_path + '/truststore.p12'
50         self.truststorePassPath = mount_path + '/truststore.pass'
51
52     # Function Create docker container.
53     def run_client_container(self, client_image, container_name, path_to_env, request_url, network):
54         self.create_mount_dir()
55         client = docker.from_env()
56         environment = self.read_env_list_from_file(path_to_env)
57         environment.append("REQUEST_URL=" + request_url)
58         container = client.containers.run(
59             image=client_image,
60             name=container_name,
61             environment=environment,
62             network=network,
63             user='root',
64             mounts=[Mount(target='/var/certs', source=self.mount_path, type='bind'),
65                     Mount(target='/etc/onap/aaf/certservice/certs/', source=self.truststore_path, type='bind')],
66             detach=True
67         )
68         exitcode = container.wait()
69         return exitcode
70
71     # Function to validate keystore/truststore can be opened with generated pass-phrase.
72     def can_open_keystore_and_truststore_with_pass(self, container_name):
73         if container_name != NETCONF_PNP_SIM_CONTAINER_NAME:
74             return self.can_open_keystore_and_truststore_jks_files()
75         else:
76             return self.can_open_keystore_and_truststore_p12_files()
77
78     # Function to validate keystore.jks/truststore.jks can be opened with generated pass-phrase.
79     def can_open_keystore_and_truststore_jks_files(self):
80         try:
81             jks.KeyStore.load(self.keystoreJksPath, open(self.keystorePassPath, 'rb').read())
82             jks.KeyStore.load(self.truststoreJksPath, open(self.truststorePassPath, 'rb').read())
83             return True
84         except Exception as e:
85             print("UnExpected Error in validating keystore.jks/truststore.jks: {0}".format(e))
86             return False
87
88     # Function to validate keystore.p12/truststore.p12 can be opened with generated pass-phrase.
89     def can_open_keystore_and_truststore_p12_files(self):
90         can_open_keystore = self.can_open_p12_file_with_pass_file(self.keystorePassPath, self.keystoreP12Path)
91         can_open_truststore = self.can_open_p12_file_with_pass_file(self.truststorePassPath, self.truststoreP12Path)
92         return can_open_keystore & can_open_truststore
93
94     # Method for Uploading Certificate in SDNC-Container.
95     # Creating/Uploading Server-key, Server-cert, Ca-cert PEM files in Netconf-Pnp-Simulator.
96     def can_install_keystore_and_truststore_certs(self, cmd, container_name):
97         continue_exec = True
98         if container_name == NETCONF_PNP_SIM_CONTAINER_NAME:
99             print("Generating PEM files for {0} from P12 files".format(container_name))
100             continue_exec = self.create_pem(self.keystorePassPath, self.keystoreP12Path, self.truststorePassPath,
101                                             self.truststoreP12Path)
102         if continue_exec:
103             print("Initiate Configuration Push for : {0}".format(container_name))
104             resp_code = self.execute_bash_config(cmd, container_name)
105             if resp_code == 0:
106                 print("Execution Successful for: {0}".format(container_name))
107                 return True
108             else:
109                 print("Execution Failed for: {0}".format(container_name))
110                 return False
111
112     def create_pem(self, keystore_pass_path, keystore_p12_path, truststore_pass_path, truststore_p12_path):
113         # Create [server_key.pem, server_cert.pem, ca.pem] files for Netconf-Pnp-Simulation/TLS Configuration.
114         try:
115             with open(self.serverKeyPem, "wb+") as key_file:
116                 key_file.write(crypto.dump_privatekey(crypto.FILETYPE_PEM,
117                                                       self.get_pkcs12(keystore_pass_path,
118                                                                       keystore_p12_path).get_privatekey()))
119             with open(self.serverCertPem, "wb+") as server_cert_file:
120                 server_cert_file.write(crypto.dump_certificate(crypto.FILETYPE_PEM,
121                                                                self.get_pkcs12(keystore_pass_path,
122                                                                                keystore_p12_path).get_certificate()))
123             with open(self.caCertPem, "wb+") as ca_cert_file:
124                 ca_cert_file.write(
125                     crypto.dump_certificate(crypto.FILETYPE_PEM,
126                                             self.get_pkcs12(truststore_pass_path,
127                                                             truststore_p12_path).get_ca_certificates()[0]))
128             return True
129         except IOError as err:
130             print("I/O Error: {0}".format(err))
131             return False
132         except Exception as e:
133             print("UnExpected Error: {0}".format(e))
134             return False
135
136     def can_open_p12_file_with_pass_file(self, pass_file_path, p12_file_path):
137         try:
138             if p12_file_path.split('/')[-1] == 'truststore.p12':
139                 pkcs12 = self.get_pkcs12(pass_file_path, p12_file_path).get_ca_certificates()[0]
140             else:
141                 pkcs12 = self.get_pkcs12(pass_file_path, p12_file_path).get_certificate()
142             if pkcs12 is None:
143                 return False
144             return True
145         except IOError as err:
146             print("I/O Error PKCS12 Creation failed: {0}".format(err))
147             return False
148         except Exception as e:
149             print("UnExpected Error PKCS12 Creation failed: {0}".format(e))
150             return False
151
152     def remove_client_container_and_save_logs(self, container_name, log_file_name):
153         client = docker.from_env()
154         container = client.containers.get(container_name)
155         text_file = open(ARCHIVES_PATH + container_name + '_' + log_file_name + ".log", "w")
156         text_file.write(container.logs())
157         text_file.close()
158         container.remove()
159         self.remove_mount_dir()
160
161     def create_mount_dir(self):
162         if not os.path.exists(self.mount_path):
163             os.makedirs(self.mount_path)
164
165     def remove_mount_dir(self):
166         shutil.rmtree(self.mount_path)
167
168     @staticmethod
169     def get_pkcs12(pass_file_path, p12_file_path):
170         # Load PKCS12 Object
171         password = open(pass_file_path, 'rb').read()
172         return crypto.load_pkcs12(open(p12_file_path, 'rb').read(), password)
173
174     @staticmethod
175     def execute_bash_config(cmd, container_name):
176         # Run command with arguments. Wait for command to complete or timeout, return code attribute.
177         try:
178             resp_code = subprocess.call(["%s %s" % (cmd, container_name)], shell=True, stdout=DEV_NULL,
179                                         stderr=subprocess.STDOUT)
180             print("Response Code from Config.sh execution: {0}".format(resp_code))
181             return resp_code
182         except subprocess.CalledProcessError as e:
183             print("CalledProcessError Certificate installation failed in SDNC-ODL Container: {0}".format(e))
184             return 1  # Return Error Code
185
186     @staticmethod
187     def get_container_logs(container_name):
188         client = docker.from_env()
189         container = client.containers.get(container_name)
190         logs = container.logs()
191         return logs
192
193     @staticmethod
194     def read_env_list_from_file(path):
195         f = open(path, "r")
196         r_list = []
197         for line in f:
198             line = line.strip()
199             if line[0] != "#":
200                 r_list.append(line)
201         return r_list