6 from sdcBePy.common.helpers import check_arguments_not_none
def get_url(ip, port, protocol):
    """Return the base URL ``<protocol>://<ip>:<port>``.

    Each argument is rendered with ``str()``; nothing is validated or escaped.
    """
    host_and_port = "{!s}:{!s}".format(ip, port)
    return "{!s}://{}".format(protocol, host_and_port)
# Delimiter on which get_response_from_buffer splits the decoded response text;
# the last piece after the split is returned as the body.
14 BODY_SEPARATOR = "\r\n\r\n"
def __init__(self, be_ip, be_port, header, scheme, tls_cert, tls_key, tls_key_pw, ca_cert, user_id="jh0003",
             debug=False, connector=None):
    """Validate the backend coordinates and choose the connector to use.

    ``be_ip``, ``be_port``, ``scheme`` and ``user_id`` are passed to
    ``check_arguments_not_none``; an ``AttributeError`` is raised when that
    check fails. A truthy ``connector`` is used as-is; otherwise a
    ``CurlConnector`` is built for the backend URL with the given header and
    TLS settings.
    """
    required_present = check_arguments_not_none(be_ip, be_port, scheme, user_id)
    if not required_present:
        raise AttributeError("The be_host, be_port, scheme or admin_user are missing")

    # The URL is computed even when a connector is injected, as before.
    base_url = get_url(be_ip, be_port, scheme)
    if connector:
        self.con = connector
    else:
        self.con = CurlConnector(base_url, user_id, header, tls_cert, tls_key, tls_key_pw, ca_cert,
                                 protocol=scheme, debug=debug)
def check_backend(self):
    """Probe the backend by requesting the ``jh0003`` user resource.

    Returns whatever the connector's ``get`` returns.
    """
    probe_path = '/sdc2/rest/v1/user/jh0003'
    return self.con.get(probe_path)
def check_user(self, user_name):
    """Look up a user on the backend.

    The user name is appended to ``/sdc2/rest/v1/user`` as its own path
    segment. A ``/`` is inserted when ``user_name`` does not already start
    with one, so ``"jh0003"`` and ``"/jh0003"`` both request
    ``/sdc2/rest/v1/user/jh0003``. Without the separator a bare name would
    be glued onto the resource name (``/sdc2/rest/v1/userjh0003``).

    :param user_name: user identifier, with or without a leading ``/``.
    :return: whatever the connector's ``get`` returns.
    """
    # Keep callers that already pass a leading slash working unchanged.
    segment = user_name if user_name.startswith("/") else "/" + user_name
    return self.con.get("/sdc2/rest/v1/user" + segment)
# Create a user: POST a JSON-encoded dict of the user's attributes to
# /sdc2/rest/v1/user and return the connector's result.
# NOTE(review): this listing skips source lines 32 and 36-39, so only the
# firstName/lastName entries are visible; user_id, email and role are presumably
# serialised on the lines not shown - confirm against the full file.
31 def create_user(self, first_name, last_name, user_id, email, role):
33 return self.con.post('/sdc2/rest/v1/user', json.dumps({
34 'firstName': first_name,
35 'lastName': last_name,
def check_consumer(self, consumer_name):
    """Look up a consumer on the backend.

    The consumer name is appended to ``/sdc2/rest/v1/consumers`` as its own
    path segment. A ``/`` is inserted when ``consumer_name`` does not already
    start with one, so ``"ci"`` and ``"/ci"`` both request
    ``/sdc2/rest/v1/consumers/ci``. Without the separator a bare name would
    be glued onto the resource name (``/sdc2/rest/v1/consumersci``).

    :param consumer_name: consumer identifier, with or without a leading ``/``.
    :return: whatever the connector's ``get`` returns.
    """
    # Keep callers that already pass a leading slash working unchanged.
    segment = consumer_name if consumer_name.startswith("/") else "/" + consumer_name
    return self.con.get("/sdc2/rest/v1/consumers" + segment)
# Create a consumer: POST a JSON-encoded dict to /sdc2/rest/v1/consumers and
# return the connector's result.
# NOTE(review): source lines 47 and 49 are not shown in this listing. `slat` is
# not used on any visible line - presumably a misspelling of "salt" that is
# serialised on the hidden line; confirm against the full file.
44 def create_consumer(self, consumer_name, slat, password):
45 return self.con.post("/sdc2/rest/v1/consumers", json.dumps({
46 'consumerName': consumer_name,
48 'consumerPassword': password
def disable_locking(self, disable):
    """POST ``disable`` as the request body to the catalog lock endpoint.

    ``disable`` is forwarded to the connector untouched; the connector's
    result is returned.
    """
    lock_endpoint = "/sdc2/rest/v1/catalog/lock"
    return self.con.post(lock_endpoint, disable)
def get_normatives(self):
    """GET ``/sdc2/rest/v1/screen`` with ``with_buffer=True``.

    Returns whatever the connector's ``get`` returns.
    """
    screen_endpoint = "/sdc2/rest/v1/screen"
    return self.con.get(screen_endpoint, with_buffer=True)
def get_model_list(self):
    """GET ``/sdc2/rest/v1/catalog/model`` with ``with_buffer=True``.

    Returns whatever the connector's ``get`` returns.
    """
    model_endpoint = "/sdc2/rest/v1/catalog/model"
    return self.con.get(model_endpoint, with_buffer=True)
def post_file(self, path, multi_part_form_data, buffer=None):
    """Delegate a multipart form upload to the connector's ``post_file``.

    All three arguments are forwarded positionally and unchanged; the
    connector's result is returned.
    """
    connector = self.con
    return connector.post_file(path, multi_part_form_data, buffer)
def put_file(self, path, multi_part_form_data, buffer=None):
    """Delegate a multipart form PUT to the connector's ``put_file``.

    All three arguments are forwarded positionally and unchanged; the
    connector's result is returned.
    """
    connector = self.con
    return connector.put_file(path, multi_part_form_data, buffer)
def get_response_from_buffer(self):
    """Drain the connector's buffer and return the response body text.

    The buffered bytes are read, the buffer is emptied and rewound, and the
    bytes are decoded with ``self.CHARTSET``. The text is split on
    ``self.BODY_SEPARATOR`` and the last piece is returned; when the
    separator does not occur, that is the whole text.
    """
    shared_buffer = self.con.buffer
    raw = shared_buffer.getvalue()

    # Reset the buffer before decoding so it is cleared even if decoding fails.
    shared_buffer.truncate(0)
    shared_buffer.seek(0)

    sections = raw.decode(self.CHARTSET).split(self.BODY_SEPARATOR)
    # str.split never returns an empty list, so the last element always exists.
    return sections[-1]
# Header lines added to the JSON requests issued by get() and post(); the
# multipart requests in post_file()/put_file() do not send them.
75 CONTENT_TYPE_HEADER = "Content-Type: application/json"
76 ACCEPT_HEADER = "Accept: application/json; charset=UTF-8"
# Set up the connector: remember the TLS settings, build the shared curl handle
# and prepare the header lines that the request methods send.
# NOTE(review): source lines 79, 86, 88-90, 92-93 and 95 are not shown in this
# listing. The assignment of self.url (read by get/post/post_file/put_file), the
# use of the `buffer` and `debug` arguments, and the conditions around source
# lines 91 and 96 are therefore not visible here.
78 def __init__(self, url, user_id_header, header, tls_cert, tls_key, tls_key_pw, ca_cert, buffer=None, protocol="http", debug=False):
# The protocol/TLS fields are set before __build_default_curl() is called
# because that method reads them.
80 self.__protocol = protocol
81 self.__tls_cert = tls_cert
82 self.__tls_key = tls_key
83 self.__tls_key_pw = tls_key_pw
84 self.__ca_cert = ca_cert
# Shared handle reused by get(), post() and post_file().
85 self.c = self.__build_default_curl()
# Header line carrying the acting user's id.
87 self.user_header = "USER_ID: " + user_id_header
# Sink for response data when a request method is given no buffer of its own.
91 self.buffer = BytesIO()
# `header` is concatenated as-is, so it presumably already holds the encoded
# Basic credentials - confirm with the caller.
94 self.basicauth_header = ""
96 self.basicauth_header = "Authorization: Basic " + header
# Configure the shared curl handle for a request to self.url + path with the
# user, JSON content-type/accept and auth header lines, and return the HTTP
# response code reported by curl.
# NOTE(review): source lines 99, 105-106 and 109-110 are not shown in this
# listing; the call that performs the transfer and the role of `with_buffer`
# (presumably guarding the WRITEFUNCTION setup below) are not visible - confirm.
98 def get(self, path, buffer=None, with_buffer=False):
100 self.c.setopt(pycurl.URL, self.url + path)
101 self.c.setopt(pycurl.HTTPHEADER, [self.user_header,
102 CurlConnector.CONTENT_TYPE_HEADER,
103 CurlConnector.ACCEPT_HEADER,
104 self.basicauth_header])
# Response data is written to the caller's buffer when a truthy one is passed,
# otherwise to the connector's own self.buffer.
107 write = self.buffer.write if not buffer else buffer.write
108 self.c.setopt(pycurl.WRITEFUNCTION, write)
111 return self.c.getinfo(pycurl.RESPONSE_CODE)
# Configure the shared curl handle to POST `data` to self.url + path with the
# user, JSON content-type/accept and auth header lines, and return the HTTP
# response code reported by curl.
# NOTE(review): source lines 116, 119, 124, 126-127 and 129 are not shown in
# this listing; the call that performs the transfer is not visible here.
115 def post(self, path, data):
117 self.c.setopt(pycurl.URL, self.url + path)
# Switch the shared handle to POST for this request.
118 self.c.setopt(pycurl.POST, 1)
120 self.c.setopt(pycurl.HTTPHEADER, [self.user_header,
121 CurlConnector.CONTENT_TYPE_HEADER,
122 CurlConnector.ACCEPT_HEADER,
123 self.basicauth_header])
# `data` is sent as the request body.
125 self.c.setopt(pycurl.POSTFIELDS, data)
# Clear the POST flag again on the shared handle before returning.
128 self.c.setopt(pycurl.POST, 0)
130 return self.c.getinfo(pycurl.RESPONSE_CODE)
# Configure the shared curl handle to POST `post_body` as form data
# (pycurl.HTTPPOST) to self.url + path and return the HTTP response code.
# NOTE(review): source lines 135, 139, 141, 144-145 and 149-150 are not shown in
# this listing; the `try:` opening, the call that performs the transfer and the
# body of the pycurl.error handler are not visible here.
134 def post_file(self, path, post_body, buffer=None):
136 self.c.setopt(pycurl.URL, self.url + path)
137 self.c.setopt(pycurl.POST, 1)
# Only the user and auth header lines are set; the JSON content-type/accept
# lines used by get()/post() are not sent with form uploads.
138 self.c.setopt(pycurl.HTTPHEADER, [self.user_header, self.basicauth_header])
140 self.c.setopt(pycurl.HTTPPOST, post_body)
# Response data is written to the caller's buffer when a truthy one is passed,
# otherwise to the connector's own self.buffer.
142 write = self.buffer.write if not buffer else buffer.write
143 self.c.setopt(pycurl.WRITEFUNCTION, write)
# Clear the POST flag again on the shared handle before returning.
146 self.c.setopt(pycurl.POST, 0)
147 return self.c.getinfo(pycurl.RESPONSE_CODE)
148 except pycurl.error as ex:
# Send `post_body` as form data (pycurl.HTTPPOST) to self.url + path using the
# PUT method.
# NOTE(review): source lines 157, 159, 162-163 and 165-166 are not shown in this
# listing; the call that performs the transfer, what is done with
# `response_code` (presumably returned) and whether the handle is closed are
# not visible here.
152 def put_file(self, path, post_body, response_write_buffer=None):
# A fresh handle is built for this request instead of reusing the shared self.c.
153 curl = self.__build_default_curl()
154 curl.setopt(pycurl.URL, self.url + path)
155 curl.setopt(pycurl.HTTPHEADER, [self.user_header, self.basicauth_header])
# Override the request method; the body is still supplied as form data below.
156 curl.setopt(pycurl.CUSTOMREQUEST, "PUT")
158 curl.setopt(pycurl.HTTPPOST, post_body)
# Response data is written to the caller's buffer when a truthy one is passed,
# otherwise to the connector's own self.buffer.
160 write = self.buffer.write if not response_write_buffer else response_write_buffer.write
161 curl.setopt(pycurl.WRITEFUNCTION, write)
164 response_code = curl.getinfo(pycurl.RESPONSE_CODE)
# Apply the connector's default options to a curl handle: a discard-everything
# write callback, verbosity, TLS settings for https, and header output.
# NOTE(review): source lines 169-170, 173, 175 and 189-190 are not shown in this
# listing, and indentation is lost. The creation of `curl`, the conditions
# guarding the WRITEFUNCTION/VERBOSE options, the nesting of the TLS `if`
# statements and the return statement are therefore not visible here.
168 def __build_default_curl(self):
171 # disable printing not necessary logs in the terminal
# Default write callback that drops response data; the request methods replace
# it with a buffer's write when they set WRITEFUNCTION themselves.
172 curl.setopt(pycurl.WRITEFUNCTION, lambda x: None)
174 curl.setopt(pycurl.VERBOSE, 1)
176 if self.__protocol == 'https':
# NOTE(review): peer and host verification are switched off here and only
# switched back on below when a CA certificate is configured - confirm that
# unverified TLS is intended as the default.
177 curl.setopt(pycurl.SSL_VERIFYPEER, 0)
178 curl.setopt(pycurl.SSL_VERIFYHOST, 0)
# A client certificate is configured only when both the certificate and its key
# are given; the key password is optional.
179 if self.__tls_cert is not None and self.__tls_key is not None:
180 curl.setopt(curl.SSLCERT, self.__tls_cert)
181 curl.setopt(curl.SSLKEY, self.__tls_key)
182 if self.__tls_key_pw is not None:
183 curl.setopt(curl.KEYPASSWD, self.__tls_key_pw)
# With a CA certificate, verification is enabled against that CA file.
184 if self.__ca_cert is not None:
185 curl.setopt(pycurl.SSL_VERIFYPEER, 1)
186 curl.setopt(pycurl.SSL_VERIFYHOST, 2)
187 curl.setopt(curl.CAINFO, self.__ca_cert)
# Ask curl to include the response headers in the written output; this is
# presumably why callers split the buffered text on "\r\n\r\n" to reach the body.
188 curl.setopt(pycurl.HEADER, True)