TLS sdc-be-init: truststore & keystore handling
[sdc.git] / catalog-be / src / main / resources / scripts / sdcBePy / common / sdcBeProxy.py
1 import json
2 from io import BytesIO
3
4 import pycurl
5
6 from sdcBePy.common.helpers import check_arguments_not_none
7
def get_url(ip, port, protocol):
    """Return the base URL ``<protocol>://<ip>:<port>`` for the given endpoint."""
    return "{}://{}:{}".format(protocol, ip, port)
10
11
class SdcBeProxy:
    """Client for the SDC back-end REST endpoints used by the init scripts.

    Every request is delegated to ``self.con``: either the ``connector``
    passed to the constructor or a ``CurlConnector`` built from the other
    constructor arguments. Request methods return the connector's result
    unchanged.
    """

    BODY_SEPARATOR = "\r\n\r\n"
    # Spelling kept as-is: the attribute name may be referenced by other code.
    CHARTSET = 'UTF-8'

    def __init__(self, be_ip, be_port, header, scheme, tls_cert, tls_key, tls_key_pw, ca_cert, user_id="jh0003",
                 debug=False, connector=None):
        """Validate the connection parameters and set up the connector.

        Args:
            be_ip: back-end host used to build the base URL.
            be_port: back-end port used to build the base URL.
            header: value forwarded to ``CurlConnector`` as its ``header``.
            scheme: URL scheme; also forwarded as the connector's ``protocol``.
            tls_cert, tls_key, tls_key_pw, ca_cert: TLS settings forwarded
                to ``CurlConnector``.
            user_id: forwarded to ``CurlConnector`` as its user id header.
            debug: forwarded to ``CurlConnector``.
            connector: optional ready-made connector; when truthy it is used
                instead of building a ``CurlConnector``.

        Raises:
            AttributeError: when ``check_arguments_not_none`` rejects
                ``be_ip``, ``be_port``, ``scheme`` or ``user_id``.
        """
        if not check_arguments_not_none(be_ip, be_port, scheme, user_id):
            raise AttributeError("The be_host, be_port, scheme or admin_user are missing")
        if connector:
            self.con = connector
        else:
            url = get_url(be_ip, be_port, scheme)
            self.con = CurlConnector(url, user_id, header, tls_cert, tls_key, tls_key_pw, ca_cert,
                                     protocol=scheme, debug=debug)

    @staticmethod
    def _resource_path(collection, name):
        """Append ``name`` to ``collection``, adding the '/' separator if the caller did not."""
        if name.startswith('/'):
            return collection + name
        return collection + '/' + name

    def check_backend(self):
        """GET the fixed user resource ``jh0003`` and return the connector's result."""
        return self.con.get('/sdc2/rest/v1/user/jh0003')

    def check_user(self, user_name):
        """GET ``/sdc2/rest/v1/user/<user_name>`` and return the connector's result.

        ``user_name`` may be given with or without a leading '/'.
        """
        return self.con.get(self._resource_path("/sdc2/rest/v1/user", user_name))

    def create_user(self, first_name, last_name, user_id, email, role):
        """POST a JSON user description to ``/sdc2/rest/v1/user``."""
        payload = {
            'firstName': first_name,
            'lastName': last_name,
            'userId': user_id,
            'email': email,
            'role': role,
        }
        return self.con.post('/sdc2/rest/v1/user', json.dumps(payload))

    def check_consumer(self, consumer_name):
        """GET ``/sdc2/rest/v1/consumers/<consumer_name>`` and return the connector's result.

        ``consumer_name`` may be given with or without a leading '/'.
        """
        return self.con.get(self._resource_path("/sdc2/rest/v1/consumers", consumer_name))

    def create_consumer(self, consumer_name, slat, password):
        """POST a JSON consumer description to ``/sdc2/rest/v1/consumers``.

        ``slat`` is sent as the ``consumerSalt`` field (parameter name kept
        for compatibility with keyword callers).
        """
        payload = {
            'consumerName': consumer_name,
            'consumerSalt': slat,
            'consumerPassword': password,
        }
        return self.con.post("/sdc2/rest/v1/consumers", json.dumps(payload))

    def disable_locking(self, disable):
        """POST ``disable`` unmodified as the request body to the catalog lock endpoint."""
        # NOTE(review): the value is not serialised here; presumably callers
        # pass an already-encoded string such as "true" -- confirm.
        return self.con.post("/sdc2/rest/v1/catalog/lock", disable)

    def get_normatives(self):
        """GET ``/sdc2/rest/v1/screen``, asking the connector to buffer the response."""
        return self.con.get("/sdc2/rest/v1/screen", with_buffer=True)

    def get_model_list(self):
        """GET ``/sdc2/rest/v1/catalog/model``, asking the connector to buffer the response."""
        return self.con.get("/sdc2/rest/v1/catalog/model", with_buffer=True)

    def post_file(self, path, multi_part_form_data, buffer=None):
        """Delegate a multipart POST to the connector's ``post_file``."""
        return self.con.post_file(path, multi_part_form_data, buffer)

    def put_file(self, path, multi_part_form_data, buffer=None):
        """Delegate a multipart PUT to the connector's ``put_file``."""
        return self.con.put_file(path, multi_part_form_data, buffer)

    def get_response_from_buffer(self):
        """Drain the connector's buffer and return the last response body.

        The buffered bytes are decoded as UTF-8 and split on a blank line
        (CRLF CRLF); the text after the last separator is returned, or the
        whole text when no separator is present. The buffer is emptied and
        rewound so it can be reused for the next request.
        """
        raw = self.con.buffer.getvalue()
        self.con.buffer.truncate(0)
        self.con.buffer.seek(0)

        # split() always yields at least one element, so [-1] covers both the
        # "headers + body" and the "no separator" case.
        return raw.decode(self.CHARTSET).split(self.BODY_SEPARATOR)[-1]
73
class CurlConnector:
    """pycurl-based HTTP connector.

    One curl handle (``self.c``) is created at construction time and reused
    by ``get``, ``post`` and ``post_file``; ``put_file`` uses a fresh handle
    per call. ``get``, ``post`` and ``post_file`` return the HTTP response
    code, or ``CURL_ERROR_CODE`` when pycurl raises ``pycurl.error``.
    """
    CONTENT_TYPE_HEADER = "Content-Type: application/json"
    ACCEPT_HEADER = "Accept: application/json; charset=UTF-8"
    # Returned in place of an HTTP status when curl itself fails.
    # NOTE(review): presumably mirrors errno ECONNREFUSED -- confirm.
    CURL_ERROR_CODE = 111

    def __init__(self, url, user_id_header, header, tls_cert, tls_key, tls_key_pw, ca_cert, buffer=None, protocol="http", debug=False):
        """Create the shared curl handle and precompute the request headers.

        Args:
            url: base URL that request paths are appended to.
            user_id_header: value sent in the ``USER_ID`` header.
            header: Basic-auth credential appended to ``Authorization: Basic``;
                ``None`` disables the header.
            tls_cert, tls_key, tls_key_pw, ca_cert: TLS settings applied to
                every handle when ``protocol`` is ``'https'``.
            buffer: optional default response buffer; a new ``BytesIO`` is
                created when omitted.
            protocol: ``'https'`` enables the TLS configuration.
            debug: when true, curl runs verbose and the default response
                sink is left untouched.
        """
        self.__debug = debug
        self.__protocol = protocol
        self.__tls_cert = tls_cert
        self.__tls_key = tls_key
        self.__tls_key_pw = tls_key_pw
        self.__ca_cert = ca_cert
        self.c = self.__build_default_curl()

        self.user_header = "USER_ID: " + user_id_header
        self.url = url

        # Always define self.buffer: a caller-supplied buffer used to be
        # dropped, leaving the attribute unset for later requests.
        self.buffer = BytesIO() if buffer is None else buffer

        if header is None:
            self.basicauth_header = ""
        else:
            self.basicauth_header = "Authorization: Basic " + header

    def __request_headers(self, *extra):
        """Return user header, then ``extra``, then the auth header if one is configured."""
        headers = [self.user_header]
        headers.extend(extra)
        if self.basicauth_header:
            headers.append(self.basicauth_header)
        return headers

    def __writer(self, buffer):
        """Return the write callback of ``buffer``, or of the default buffer when it is None."""
        target = self.buffer if buffer is None else buffer
        return target.write

    def get(self, path, buffer=None, with_buffer=False):
        """Perform a GET on ``url + path`` with the JSON headers.

        When ``with_buffer`` is true the response is written to ``buffer``
        (or to ``self.buffer`` when ``buffer`` is None).
        NOTE(review): the write callback is not reset afterwards, so later
        requests on the shared handle keep writing to the same buffer.

        Returns:
            The HTTP response code, or ``CURL_ERROR_CODE`` on ``pycurl.error``.
        """
        try:
            self.c.setopt(pycurl.URL, self.url + path)
            self.c.setopt(pycurl.HTTPHEADER, self.__request_headers(CurlConnector.CONTENT_TYPE_HEADER,
                                                                    CurlConnector.ACCEPT_HEADER))

            if with_buffer:
                self.c.setopt(pycurl.WRITEFUNCTION, self.__writer(buffer))

            self.c.perform()
            return self.c.getinfo(pycurl.RESPONSE_CODE)
        except pycurl.error:
            return CurlConnector.CURL_ERROR_CODE

    def post(self, path, data):
        """POST ``data`` as the request body to ``url + path`` with the JSON headers.

        Returns:
            The HTTP response code, or ``CURL_ERROR_CODE`` on ``pycurl.error``.
        """
        try:
            self.c.setopt(pycurl.URL, self.url + path)
            self.c.setopt(pycurl.POST, 1)
            self.c.setopt(pycurl.HTTPHEADER, self.__request_headers(CurlConnector.CONTENT_TYPE_HEADER,
                                                                    CurlConnector.ACCEPT_HEADER))
            self.c.setopt(pycurl.POSTFIELDS, data)

            self.c.perform()
            return self.c.getinfo(pycurl.RESPONSE_CODE)
        except pycurl.error:
            return CurlConnector.CURL_ERROR_CODE
        finally:
            # Reset even on failure so the shared handle is not left in POST
            # mode for the next get().
            self.c.setopt(pycurl.POST, 0)

    def post_file(self, path, post_body, buffer=None):
        """POST the multipart form ``post_body`` to ``url + path``.

        The response is written to ``buffer`` (or ``self.buffer`` when None).

        Returns:
            The HTTP response code, or ``CURL_ERROR_CODE`` on ``pycurl.error``
            (the error is printed).
        """
        try:
            self.c.setopt(pycurl.URL, self.url + path)
            self.c.setopt(pycurl.POST, 1)
            self.c.setopt(pycurl.HTTPHEADER, self.__request_headers())

            self.c.setopt(pycurl.HTTPPOST, post_body)
            self.c.setopt(pycurl.WRITEFUNCTION, self.__writer(buffer))

            self.c.perform()
            return self.c.getinfo(pycurl.RESPONSE_CODE)
        except pycurl.error as ex:
            print(ex)
            return CurlConnector.CURL_ERROR_CODE
        finally:
            # Reset even on failure so the shared handle is not left in POST
            # mode for the next get().
            self.c.setopt(pycurl.POST, 0)

    def put_file(self, path, post_body, response_write_buffer=None):
        """PUT the multipart form ``post_body`` to ``url + path`` on a dedicated handle.

        The response is written to ``response_write_buffer`` (or
        ``self.buffer`` when None). Unlike the other request methods, curl
        errors are not converted to a code: they propagate to the caller.

        Returns:
            The HTTP response code.
        """
        curl = self.__build_default_curl()
        try:
            curl.setopt(pycurl.URL, self.url + path)
            curl.setopt(pycurl.HTTPHEADER, self.__request_headers())
            curl.setopt(pycurl.CUSTOMREQUEST, "PUT")

            curl.setopt(pycurl.HTTPPOST, post_body)
            curl.setopt(pycurl.WRITEFUNCTION, self.__writer(response_write_buffer))

            curl.perform()
            return curl.getinfo(pycurl.RESPONSE_CODE)
        finally:
            # Release the temporary handle even when the transfer fails.
            curl.close()

    def __build_default_curl(self):
        """Create a curl handle configured with the debug and TLS settings."""
        curl = pycurl.Curl()
        if not self.__debug:
            # disable printing not necessary logs in the terminal
            curl.setopt(pycurl.WRITEFUNCTION, lambda x: None)
        else:
            curl.setopt(pycurl.VERBOSE, 1)

        if self.__protocol == 'https':
            # Peer and host verification are off unless a CA bundle is given.
            curl.setopt(pycurl.SSL_VERIFYPEER, 0)
            curl.setopt(pycurl.SSL_VERIFYHOST, 0)
            if self.__tls_cert is not None and self.__tls_key is not None:
                curl.setopt(curl.SSLCERT, self.__tls_cert)
                curl.setopt(curl.SSLKEY, self.__tls_key)
                if self.__tls_key_pw is not None:
                    curl.setopt(curl.KEYPASSWD, self.__tls_key_pw)
                # NOTE(review): the CA bundle is only honoured when a client
                # certificate and key are also configured -- confirm this
                # nesting is intentional.
                if self.__ca_cert is not None:
                    curl.setopt(pycurl.SSL_VERIFYPEER, 1)
                    curl.setopt(pycurl.SSL_VERIFYHOST, 2)
                    curl.setopt(curl.CAINFO, self.__ca_cert)
        # Include the response headers in whatever the write callback receives.
        curl.setopt(pycurl.HEADER, True)
        return curl

    def __del__(self):
        """Close the shared curl handle if it was created."""
        # __init__ may have failed before self.c was assigned.
        curl = getattr(self, "c", None)
        if curl is not None:
            curl.close()