Remove SDNC CSIT tests
[integration/csit.git] / tests / sdnc / sdnc_netconf_tls_post_deploy / libraries / ClientManager.py
diff --git a/tests/sdnc/sdnc_netconf_tls_post_deploy/libraries/ClientManager.py b/tests/sdnc/sdnc_netconf_tls_post_deploy/libraries/ClientManager.py
deleted file mode 100644 (file)
index b1c024f..0000000
+++ /dev/null
@@ -1,207 +0,0 @@
-# ============LICENSE_START=======================================================
-#  Copyright (C) 2020 Nordix Foundation.
-# ================================================================================
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-# SPDX-License-Identifier: Apache-2.0
-# ============LICENSE_END=========================================================
-
-__author__ = "Ajay Deep Singh (ajay.deep.singh@est.tech)"
-__copyright__ = "Copyright (C) 2020 Nordix Foundation"
-__license__ = "Apache 2.0"
-
-import os
-import shutil
-import subprocess
-
-import docker
-from OpenSSL import crypto
-from docker.types import Mount
-
-DEV_NULL = open(os.devnull, 'wb')
-NETCONF_PNP_SIM_CONTAINER_NAME = 'netconf-simulator'
-ARCHIVES_PATH = os.getenv("WORKSPACE") + "/archives/"
-
-
-class ClientManager:
-
-    def __init__(self, mount_path, truststore_path):
-        self.mount_path = mount_path
-        self.truststore_path = truststore_path
-        self.keyPem = mount_path + '/key.pem'
-        self.caCertPem = mount_path + '/ca.pem'
-        self.serverKeyPem = mount_path + '/server_key.pem'
-        self.serverCertPem = mount_path + '/server_cert.pem'
-        self.keystorePemPath = mount_path + '/keystore.pem'
-        self.keystoreP12Path = mount_path + '/keystore.p12'
-        self.keystorePassPath = mount_path + '/keystore.pass'
-        self.truststorePemPath = mount_path + '/truststore.pem'
-        self.truststoreP12Path = mount_path + '/truststore.p12'
-        self.truststorePassPath = mount_path + '/truststore.pass'
-
-    # Function Create docker container.
-    def run_client_container(self, client_image, container_name, path_to_env, request_url, network):
-        self.create_mount_dir()
-        client = docker.from_env()
-        environment = self.read_env_list_from_file(path_to_env)
-        environment.append("REQUEST_URL=" + request_url)
-        container = client.containers.run(
-            image=client_image,
-            name=container_name,
-            environment=environment,
-            network=network,
-            user='root',
-            mounts=[Mount(target='/var/certs', source=self.mount_path, type='bind'),
-                    Mount(target='/etc/onap/aaf/certservice/certs/', source=self.truststore_path, type='bind')],
-            detach=True
-        )
-        exitcode = container.wait()
-        return exitcode
-
-    # Function to validate keystore/truststore can be opened with generated pass-phrase.
-    def can_open_keystore_and_truststore_with_pass(self, container_name):
-        if container_name != NETCONF_PNP_SIM_CONTAINER_NAME:
-            return self.can_open_keystore_and_truststore_pem_files()
-        else:
-            return self.can_open_keystore_and_truststore_p12_files()
-
-    # Function to validate keystore.pem/truststore.pem exist and are not empty.
-    def can_open_keystore_and_truststore_pem_files(self):
-        try:
-            private_key = self.file_exist_and_not_empty(self.keyPem)
-            keystore_pem = self.file_exist_and_not_empty(self.keystorePemPath)
-            truststore_pem = self.file_exist_and_not_empty(self.truststorePemPath)
-            return private_key and keystore_pem and truststore_pem
-        except Exception as e:
-            print("UnExpected Error in validating keystore.pem/truststore.pem: {0}".format(e))
-            return False
-
-    # Function to validate keystore.p12/truststore.p12 can be opened with generated pass-phrase.
-    def can_open_keystore_and_truststore_p12_files(self):
-        can_open_keystore = self.can_open_p12_file_with_pass_file(self.keystorePassPath, self.keystoreP12Path)
-        can_open_truststore = self.can_open_p12_file_with_pass_file(self.truststorePassPath, self.truststoreP12Path)
-        return can_open_keystore & can_open_truststore
-
-    # Method for Uploading Certificate in SDNC-Container.
-    # Creating/Uploading Server-key, Server-cert, Ca-cert PEM files in Netconf-Pnp-Simulator.
-    def can_install_keystore_and_truststore_certs(self, cmd, cmd_tls, container_name):
-        continue_exec = True
-        if container_name == NETCONF_PNP_SIM_CONTAINER_NAME:
-            print("Generating PEM files for {0} from P12 files".format(container_name))
-            continue_exec = self.create_pem(self.keystorePassPath, self.keystoreP12Path, self.truststorePassPath,
-                                            self.truststoreP12Path)
-        else:
-            cmd = cmd_tls
-        if continue_exec:
-            print("Initiate Configuration Push for : {0}".format(container_name))
-            resp_code = self.execute_bash_config(cmd, container_name)
-            if resp_code == 0:
-                print("Execution Successful for: {0}".format(container_name))
-                return True
-            else:
-                print("Execution Failed for: {0}".format(container_name))
-                return False
-
-    def create_pem(self, keystore_pass_path, keystore_p12_path, truststore_pass_path, truststore_p12_path):
-        # Create [server_key.pem, server_cert.pem, ca.pem] files for Netconf-Pnp-Simulation/TLS Configuration.
-        try:
-            with open(self.serverKeyPem, "wb+") as key_file:
-                key_file.write(crypto.dump_privatekey(crypto.FILETYPE_PEM,
-                                                      self.get_pkcs12(keystore_pass_path,
-                                                                      keystore_p12_path).get_privatekey()))
-            with open(self.serverCertPem, "wb+") as server_cert_file:
-                server_cert_file.write(crypto.dump_certificate(crypto.FILETYPE_PEM,
-                                                               self.get_pkcs12(keystore_pass_path,
-                                                                               keystore_p12_path).get_certificate()))
-            with open(self.caCertPem, "wb+") as ca_cert_file:
-                ca_cert_file.write(
-                    crypto.dump_certificate(crypto.FILETYPE_PEM,
-                                            self.get_pkcs12(truststore_pass_path,
-                                                            truststore_p12_path).get_ca_certificates()[0]))
-            return True
-        except IOError as err:
-            print("I/O Error: {0}".format(err))
-            return False
-        except Exception as e:
-            print("UnExpected Error: {0}".format(e))
-            return False
-
-    def can_open_p12_file_with_pass_file(self, pass_file_path, p12_file_path):
-        try:
-            if p12_file_path.split('/')[-1] == 'truststore.p12':
-                pkcs12 = self.get_pkcs12(pass_file_path, p12_file_path).get_ca_certificates()[0]
-            else:
-                pkcs12 = self.get_pkcs12(pass_file_path, p12_file_path).get_certificate()
-            if pkcs12 is None:
-                return False
-            return True
-        except IOError as err:
-            print("I/O Error PKCS12 Creation failed: {0}".format(err))
-            return False
-        except Exception as e:
-            print("UnExpected Error PKCS12 Creation failed: {0}".format(e))
-            return False
-
-    def remove_client_container_and_save_logs(self, container_name, log_file_name):
-        client = docker.from_env()
-        container = client.containers.get(container_name)
-        text_file = open(ARCHIVES_PATH + container_name + '_' + log_file_name + ".log", "w")
-        text_file.write(container.logs())
-        text_file.close()
-        container.remove()
-        self.remove_mount_dir()
-
-    def create_mount_dir(self):
-        if not os.path.exists(self.mount_path):
-            os.makedirs(self.mount_path)
-
-    def remove_mount_dir(self):
-        shutil.rmtree(self.mount_path)
-
-    def file_exist_and_not_empty(self, path_to_file):
-        return os.path.isfile(path_to_file) and os.path.getsize(path_to_file) > 0
-
-    @staticmethod
-    def get_pkcs12(pass_file_path, p12_file_path):
-        # Load PKCS12 Object
-        password = open(pass_file_path, 'rb').read()
-        return crypto.load_pkcs12(open(p12_file_path, 'rb').read(), password)
-
-    @staticmethod
-    def execute_bash_config(cmd, container_name):
-        # Run command with arguments. Wait for command to complete or timeout, return code attribute.
-        try:
-            resp_code = subprocess.call(["%s %s" % (cmd, container_name)], shell=True, stdout=DEV_NULL,
-                                        stderr=subprocess.STDOUT)
-            print("Response Code from Config.sh execution: {0}".format(resp_code))
-            return resp_code
-        except subprocess.CalledProcessError as e:
-            print("CalledProcessError Certificate installation failed in SDNC-ODL Container: {0}".format(e))
-            return 1  # Return Error Code
-
-    @staticmethod
-    def get_container_logs(container_name):
-        client = docker.from_env()
-        container = client.containers.get(container_name)
-        logs = container.logs()
-        return logs
-
-    @staticmethod
-    def read_env_list_from_file(path):
-        f = open(path, "r")
-        r_list = []
-        for line in f:
-            line = line.strip()
-            if line[0] != "#":
-                r_list.append(line)
-        return r_list