TLS sdc-be-init: truststore & keystore handling
[sdc.git] / catalog-be / src / main / resources / scripts / sdcBePy / common / normative / toscaTypes.py
1 import json
2 import zipfile
3 import os
4 import pycurl
5
6 from sdcBePy.common.errors import ResourceCreationError
7 from sdcBePy.common.logger import print_name_and_return_code, print_and_exit, log, debug
8 from sdcBePy.common.sdcBeProxy import SdcBeProxy
9
10
def process_and_create_normative_types(normative_type,
                                       scheme=None, be_host=None, be_port=None, header=None,
                                       tls_cert=None, tls_key=None, tls_key_pw=None, ca_cert=None, admin_user=None,
                                       sdc_be_proxy=None,
                                       update_version=False,
                                       debug=False,
                                       exit_on_success=False):
    """Create the normative types supplied by ``normative_type`` and check the outcome.

    :param normative_type: object whose ``get_parameters()`` returns a
        ``(file_dir, type_names)`` pair.
    :param scheme, be_host, be_port, header, tls_cert, tls_key, tls_key_pw,
        ca_cert, admin_user: connection settings, only used to build an
        ``SdcBeProxy`` when ``sdc_be_proxy`` is not supplied.
    :param sdc_be_proxy: ready-made proxy; when ``None`` a new one is created.
    :param update_version: forwarded to the creation and reporting steps.
    :param debug: forwarded to ``SdcBeProxy`` as its ``debug`` keyword.
    :param exit_on_success: forwarded to ``print_and_check_results``.
    """
    proxy = sdc_be_proxy
    if proxy is None:
        # No proxy supplied: build one from the individual connection settings.
        proxy = SdcBeProxy(be_host, be_port, header, scheme,
                           tls_cert, tls_key, tls_key_pw, ca_cert, admin_user,
                           debug=debug)

    type_dir, type_names = normative_type.get_parameters()

    print_and_check_results(
        _create_normatives_type(type_dir, proxy, type_names, update_version),
        update_version,
        exit_on_success)
23     results = _create_normatives_type(file_dir, sdc_be_proxy, normative_type_list, update_version)
24     print_and_check_results(results, update_version, exit_on_success)
25
26
def print_and_check_results(results, update_version, exit_on_success=False):
    """Print a name / return-code line per result, then validate the results.

    :param results: sequence of result tuples; items 0 and 1 of each entry are
        handed to ``print_name_and_return_code``.
    :param update_version: accepted for interface compatibility; not used here.
    :param exit_on_success: forwarded to ``check_results_and_exit``.
    :raises ResourceCreationError: when ``results`` is ``None``.
    """
    if results is None:
        raise ResourceCreationError("Results is none -> error occurred!!", 1)

    if len(results) == 0:
        # Nothing was processed, so there is nothing to print or verify.
        return

    separator = "----------------------------------------"
    print(separator)
    for entry in results:
        print_name_and_return_code(entry[0], entry[1], with_line=False)
    print(separator)

    check_results_and_exit(results, exit_on_success)
38
39
def check_results_and_exit(results, exit_on_success):
    """Validate ``results`` against the accepted response codes.

    :param results: sequence of result tuples whose item 1 is the response code.
    :param exit_on_success: when truthy and every result is acceptable,
        ``print_and_exit`` is called with code 0.
    :raises ResourceCreationError: when at least one result has a response
        code outside the accepted set.
    """
    if _results_ok(results, _get_response_code()):
        if exit_on_success:
            print_and_exit(0, "All normatives types created successfully!!")
        return

    raise ResourceCreationError("Failed to create the normatives types !!", 1)
46
47
def _create_normatives_type(file_dir, sdc_be_proxy, types, update_version):
    """Send one creation request per entry of ``types`` and collect the results.

    Processing stops quietly (returning what was collected so far) as soon as
    ``file_dir`` does not exist.

    :param file_dir: directory prefix holding the per-type folders.
    :param sdc_be_proxy: proxy used to send the requests.
    :param types: iterable of normative type names.
    :param update_version: forwarded to ``_send_request``.
    :return: list of ``(name, http_code, response)`` tuples, in request order.
    :raises ResourceCreationError: on the first result whose code is not accepted.
    """
    accepted_codes = _get_response_code()
    collected = []

    for type_name in types:
        if not os.path.exists(file_dir):
            break

        outcome = _send_request(sdc_be_proxy, file_dir, type_name, update_version)
        collected.append(outcome)

        status = outcome[1]
        # A None status (failed request) is never in the accepted list, so a
        # single membership test covers both failure modes.
        if status not in accepted_codes:
            raise ResourceCreationError("Failed creating normative type " + type_name + ". " + str(status),
                                        1,
                                        type_name)

    return collected
61
62
def _send_request(sdc_be_proxy, file_dir, element_name, update_version):
    """Upload a single normative type through ``sdc_be_proxy``.

    :param sdc_be_proxy: proxy exposing ``post_file``, ``get_response_from_buffer``
        and ``con.user_header``.
    :param file_dir: directory prefix holding the folder for ``element_name``.
    :param element_name: name of the normative type to upload.
    :param update_version: when not ``None``, appended to the URL as the
        ``createNewVersion`` query parameter ("true"/"false").
    :return: ``(element_name, http_code, response)``; on any exception the
        error is printed and ``(element_name, None, None)`` is returned.
    """
    try:
        log("create normative type ", element_name)
        debug("userId", sdc_be_proxy.con.user_header)
        debug("fileDir", file_dir)

        endpoint = '/sdc2/rest/v1/catalog/upload/multipart'
        if update_version is not None:
            endpoint = endpoint + '?createNewVersion=' + _boolean_to_string(update_version)

        form_fields = _create_send_body(file_dir, element_name)
        debug(form_fields)

        status = sdc_be_proxy.post_file(endpoint, form_fields)
        if status is not None:
            debug("http response=", status)

        body = sdc_be_proxy.get_response_from_buffer()
        debug(body)
    except Exception as error:
        # Best effort: report the problem and let the caller see a None code.
        print("ERROR=" + str(error))
        return element_name, None, None
    else:
        return element_name, status, body
87
88
def _create_send_body(file_dir, element_name):
    """Build the multipart form fields for uploading ``element_name``.

    Zips ``<element_name>.yml`` into ``<element_name>.zip`` (overwriting any
    existing archive) and reads ``<element_name>.json`` as the metadata. All
    three files live in ``file_dir + element_name + "/"``; paths are built by
    plain concatenation, so ``file_dir`` must already end with a separator.

    :param file_dir: directory prefix holding the per-type folders.
    :param element_name: name of the normative type (folder and file stem).
    :return: list with a ``('resourceMetadata', <json string>)`` field and a
        ``('resourceZip', (pycurl.FORM_FILE, <zip path>))`` field.
    :raises OSError: when the yml or json file cannot be read or the zip
        cannot be written.
    :raises json.JSONDecodeError: when the json file is not valid JSON.
    """
    element_dir = file_dir + element_name + "/"
    yml_path = element_dir + element_name + ".yml"
    path = element_dir + element_name + ".zip"

    # Context manager guarantees the archive is closed even if write() fails.
    with zipfile.ZipFile(path, "w") as zf:
        zf.write(yml_path, element_name + '.yml')

    debug(path)
    current_json_file = element_dir + element_name + ".json"

    # Context manager closes the file handle, which was previously leaked.
    with open(current_json_file, encoding='utf-8') as json_file:
        debug("before load json")
        # strict=False tolerates control characters inside JSON strings.
        json_data = json.load(json_file, strict=False)
    debug(json_data)

    json_as_str = json.dumps(json_data)

    return [('resourceMetadata', json_as_str), ('resourceZip', (pycurl.FORM_FILE, path))]
109
110
111 def _results_ok(results, response_codes):
112     for result in results:
113         if result[1] not in response_codes:
114             return False
115
116     return True
117
118
119 def _get_response_code():
120     response_codes = [200, 201, 409]
121     return response_codes
122
123
124 def _boolean_to_string(boolean_value):
125     return "true" if boolean_value else "false"