b1c024ff49e6997aabbbcaa3b2766fd2854359e9
[integration/csit.git] / tests / sdnc / sdnc_netconf_tls_post_deploy / libraries / ClientManager.py
1 # ============LICENSE_START=======================================================
2 #  Copyright (C) 2020 Nordix Foundation.
3 # ================================================================================
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 #      http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 #
16 # SPDX-License-Identifier: Apache-2.0
17 # ============LICENSE_END=========================================================
18
# Module metadata: author contact, copyright holder and license name.
__author__ = "Ajay Deep Singh (ajay.deep.singh@est.tech)"
__copyright__ = "Copyright (C) 2020 Nordix Foundation"
__license__ = "Apache 2.0"
22
23 import os
24 import shutil
25 import subprocess
26
27 import docker
28 from OpenSSL import crypto
29 from docker.types import Mount
30
# Write-only handle on the null device, used as a sink for subprocess output.
# It is opened once at import time and kept for the lifetime of the module.
DEV_NULL = open(os.devnull, 'wb')
# Container name that is compared against to select the PKCS#12 code paths.
NETCONF_PNP_SIM_CONTAINER_NAME = 'netconf-simulator'
# Directory prefix for saved container logs. Fall back to the current
# directory when WORKSPACE is not set, so that importing this module does not
# fail with "NoneType + str".
ARCHIVES_PATH = os.getenv("WORKSPACE", ".") + "/archives/"
34
35
class ClientManager:
    """Run a certificate client container and validate/install what it produced.

    Every artifact path is derived from ``mount_path`` using the fixed file
    names assigned in ``__init__``.
    """

    def __init__(self, mount_path, truststore_path):
        """Store the host directories and derive the artifact file paths.

        :param mount_path: host directory bind-mounted at ``/var/certs`` in
            the client container.
        :param truststore_path: host directory bind-mounted at
            ``/etc/onap/aaf/certservice/certs/`` in the client container.
        """
        self.mount_path = mount_path
        self.truststore_path = truststore_path
        self.keyPem = mount_path + '/key.pem'
        self.caCertPem = mount_path + '/ca.pem'
        self.serverKeyPem = mount_path + '/server_key.pem'
        self.serverCertPem = mount_path + '/server_cert.pem'
        self.keystorePemPath = mount_path + '/keystore.pem'
        self.keystoreP12Path = mount_path + '/keystore.p12'
        self.keystorePassPath = mount_path + '/keystore.pass'
        self.truststorePemPath = mount_path + '/truststore.pem'
        self.truststoreP12Path = mount_path + '/truststore.p12'
        self.truststorePassPath = mount_path + '/truststore.pass'

    def run_client_container(self, client_image, container_name, path_to_env, request_url, network):
        """Create and start the client container, then wait for it to stop.

        The environment is read from ``path_to_env`` and extended with
        ``REQUEST_URL=<request_url>``.

        :return: whatever ``container.wait()`` yields, unmodified.
            NOTE(review): depending on the docker SDK version this may be an
            int or a dict carrying the status code - confirm against callers.
        """
        self.create_mount_dir()
        client = docker.from_env()
        environment = self.read_env_list_from_file(path_to_env)
        environment.append("REQUEST_URL=" + request_url)
        bind_mounts = [
            Mount(target='/var/certs', source=self.mount_path, type='bind'),
            Mount(target='/etc/onap/aaf/certservice/certs/', source=self.truststore_path, type='bind'),
        ]
        container = client.containers.run(
            image=client_image,
            name=container_name,
            environment=environment,
            network=network,
            user='root',
            mounts=bind_mounts,
            detach=True
        )
        return container.wait()

    def can_open_keystore_and_truststore_with_pass(self, container_name):
        """Validate the generated stores for ``container_name``.

        The netconf simulator container is checked through its P12 files;
        every other container is checked through its PEM files.
        """
        if container_name == NETCONF_PNP_SIM_CONTAINER_NAME:
            return self.can_open_keystore_and_truststore_p12_files()
        return self.can_open_keystore_and_truststore_pem_files()

    def can_open_keystore_and_truststore_pem_files(self):
        """Return True when key.pem, keystore.pem and truststore.pem all exist and are non-empty."""
        try:
            private_key = self.file_exist_and_not_empty(self.keyPem)
            keystore_pem = self.file_exist_and_not_empty(self.keystorePemPath)
            truststore_pem = self.file_exist_and_not_empty(self.truststorePemPath)
            return private_key and keystore_pem and truststore_pem
        except Exception as e:
            print("UnExpected Error in validating keystore.pem/truststore.pem: {0}".format(e))
            return False

    def can_open_keystore_and_truststore_p12_files(self):
        """Return True when keystore.p12 and truststore.p12 both open with their pass-phrase files."""
        # Both checks are always executed so that each failure gets reported.
        can_open_keystore = self.can_open_p12_file_with_pass_file(self.keystorePassPath, self.keystoreP12Path)
        can_open_truststore = self.can_open_p12_file_with_pass_file(self.truststorePassPath, self.truststoreP12Path)
        return can_open_keystore and can_open_truststore

    def can_install_keystore_and_truststore_certs(self, cmd, cmd_tls, container_name):
        """Push the certificate configuration to ``container_name``.

        For the netconf simulator the server key, server certificate and CA
        certificate PEM files are first generated from the P12 stores and
        ``cmd`` is executed; for any other container ``cmd_tls`` is executed.

        :return: True when the command exits with status 0, otherwise False
            (including when PEM generation fails and nothing is executed).
        """
        if container_name == NETCONF_PNP_SIM_CONTAINER_NAME:
            print("Generating PEM files for {0} from P12 files".format(container_name))
            pem_created = self.create_pem(self.keystorePassPath, self.keystoreP12Path, self.truststorePassPath,
                                          self.truststoreP12Path)
            if not pem_created:
                # Previously this path fell off the end and returned None.
                return False
        else:
            cmd = cmd_tls
        print("Initiate Configuration Push for : {0}".format(container_name))
        resp_code = self.execute_bash_config(cmd, container_name)
        if resp_code == 0:
            print("Execution Successful for: {0}".format(container_name))
            return True
        print("Execution Failed for: {0}".format(container_name))
        return False

    def create_pem(self, keystore_pass_path, keystore_p12_path, truststore_pass_path, truststore_p12_path):
        """Create server_key.pem, server_cert.pem and ca.pem from the P12 stores.

        Both stores are loaded and serialized before any output file is
        opened, so a failure while reading them does not leave empty or
        partial PEM files behind.

        :return: True on success, False when any step fails (the error is printed).
        """
        try:
            keystore = self.get_pkcs12(keystore_pass_path, keystore_p12_path)
            truststore = self.get_pkcs12(truststore_pass_path, truststore_p12_path)
            pem_outputs = (
                (self.serverKeyPem,
                 crypto.dump_privatekey(crypto.FILETYPE_PEM, keystore.get_privatekey())),
                (self.serverCertPem,
                 crypto.dump_certificate(crypto.FILETYPE_PEM, keystore.get_certificate())),
                # Only the first CA certificate of the truststore is exported.
                (self.caCertPem,
                 crypto.dump_certificate(crypto.FILETYPE_PEM, truststore.get_ca_certificates()[0])),
            )
            for pem_path, pem_bytes in pem_outputs:
                with open(pem_path, "wb") as pem_file:
                    pem_file.write(pem_bytes)
            return True
        except IOError as err:
            print("I/O Error: {0}".format(err))
            return False
        except Exception as e:
            print("UnExpected Error: {0}".format(e))
            return False

    def can_open_p12_file_with_pass_file(self, pass_file_path, p12_file_path):
        """Return True when the P12 file opens with the pass-phrase file and holds a certificate.

        A file named ``truststore.p12`` is checked for its first CA
        certificate; any other file is checked for its own certificate.
        """
        try:
            store = self.get_pkcs12(pass_file_path, p12_file_path)
            if os.path.basename(p12_file_path) == 'truststore.p12':
                certificate = store.get_ca_certificates()[0]
            else:
                certificate = store.get_certificate()
            return certificate is not None
        except IOError as err:
            print("I/O Error PKCS12 Creation failed: {0}".format(err))
            return False
        except Exception as e:
            print("UnExpected Error PKCS12 Creation failed: {0}".format(e))
            return False

    def remove_client_container_and_save_logs(self, container_name, log_file_name):
        """Save the container logs under ARCHIVES_PATH, remove the container and the mount directory."""
        client = docker.from_env()
        container = client.containers.get(container_name)
        log_path = ARCHIVES_PATH + container_name + '_' + log_file_name + ".log"
        # The docker SDK hands the logs back as a byte string, so the file is
        # opened in binary mode; this works on both Python 2 and Python 3.
        with open(log_path, "wb") as log_file:
            log_file.write(container.logs())
        container.remove()
        self.remove_mount_dir()

    def create_mount_dir(self):
        """Create ``mount_path`` (including parents) when it does not exist yet."""
        if not os.path.exists(self.mount_path):
            os.makedirs(self.mount_path)

    def remove_mount_dir(self):
        """Delete ``mount_path`` and everything below it."""
        shutil.rmtree(self.mount_path)

    def file_exist_and_not_empty(self, path_to_file):
        """Return True when ``path_to_file`` is a regular file with a size above zero."""
        return os.path.isfile(path_to_file) and os.path.getsize(path_to_file) > 0

    @staticmethod
    def get_pkcs12(pass_file_path, p12_file_path):
        """Load the PKCS12 object stored at ``p12_file_path``.

        The pass-phrase is the raw content of ``pass_file_path``, used exactly
        as read (no stripping).
        """
        with open(pass_file_path, 'rb') as pass_file:
            password = pass_file.read()
        with open(p12_file_path, 'rb') as p12_file:
            p12_bytes = p12_file.read()
        return crypto.load_pkcs12(p12_bytes, password)

    @staticmethod
    def execute_bash_config(cmd, container_name):
        """Run ``cmd`` with ``container_name`` appended, through the shell.

        Output is discarded. Returns the exit status of the command, or 1
        when the command could not be launched.
        """
        # NOTE(review): both values are interpolated into a shell command line
        # unquoted; they must come from trusted test configuration only.
        command_line = "%s %s" % (cmd, container_name)
        try:
            resp_code = subprocess.call(command_line, shell=True, stdout=DEV_NULL,
                                        stderr=subprocess.STDOUT)
            print("Response Code from Config.sh execution: {0}".format(resp_code))
            return resp_code
        except subprocess.CalledProcessError as e:
            print("CalledProcessError Certificate installation failed in SDNC-ODL Container: {0}".format(e))
            return 1  # Return Error Code
        except OSError as e:
            # subprocess.call reports launch failures as OSError rather than
            # CalledProcessError, so handle that case explicitly as well.
            print("OSError Certificate installation failed in SDNC-ODL Container: {0}".format(e))
            return 1  # Return Error Code

    @staticmethod
    def get_container_logs(container_name):
        """Return the logs of the container named ``container_name``."""
        client = docker.from_env()
        return client.containers.get(container_name).logs()

    @staticmethod
    def read_env_list_from_file(path):
        """Read an env file into a list of stripped lines.

        Blank lines and lines whose first non-blank character is ``#`` are
        skipped.
        """
        with open(path, "r") as env_file:
            stripped_lines = (raw_line.strip() for raw_line in env_file)
            return [entry for entry in stripped_lines if entry and not entry.startswith("#")]