Move CSIT to integration/csit repo
[integration/csit.git] / tests / dcaegen2-collectors-hv-ves / testcases / libraries / VesHvContainersUtilsLibrary.py
1 from time import time
2
3 from robot.api import logger
4 import os.path
5 import docker
6 from io import BytesIO
7 from os.path import basename
8 from tarfile import TarFile, TarInfo
9
10 LOCALHOST = "localhost"
11
12
13 class VesHvContainersUtilsLibrary:
14
15     def get_consul_api_access_url(self, method, image_name, port):
16         return self.create_url(
17             method,
18             self.get_instance_address(image_name, port)
19         )
20
21     def get_xnf_sim_api_access_url(self, method, host):
22         if is_running_inside_docker():
23             return self.create_url(method, host)
24         else:
25             logger.info("File `/.dockerenv` not found. Assuming local environment and using localhost.")
26             port_from_container_name = str(host)[-4:]
27             return self.create_url(method, LOCALHOST + ":" + port_from_container_name)
28
29     def get_dcae_app_api_access_url(self, method, image_name, port):
30         return self.create_url(
31             method,
32             self.get_instance_address(image_name, port)
33         )
34
35     def get_instance_address(self, image_name, port):
36         if is_running_inside_docker():
37             return image_name + ":" + port
38         else:
39             logger.info("File `/.dockerenv` not found. Assuming local environment and using localhost.")
40             return LOCALHOST + ":" + port
41
42     def create_url(self, method, host_address):
43         return method + host_address
44
45 def is_running_inside_docker():
46     return os.path.isfile("/.dockerenv")
47
48 def copy_to_container(container_id, filepaths, path='/etc/ves-hv'):
49     with create_archive(filepaths) as archive:
50         docker.APIClient('unix:///var/run/docker.sock') \
51             .put_archive(container=container_id, path=(path), data=archive)
52
53
54 def create_archive(filepaths):
55     tarstream = BytesIO()
56     tarfile = TarFile(fileobj=tarstream, mode='w')
57     for filepath in filepaths:
58         file = open(filepath, 'r')
59         file_data = file.read()
60
61         tarinfo = TarInfo(name=basename(file.name))
62         tarinfo.size = len(file_data)
63         tarinfo.mtime = time()
64
65         tarfile.addfile(tarinfo, BytesIO(file_data))
66
67     tarfile.close()
68     tarstream.seek(0)
69     return tarstream