from sdcBePy.common.helpers import check_arguments_not_none
-
def get_url(ip, port, protocol):
return "%s://%s:%s" % (protocol, ip, port)
class SdcBeProxy:
- def __init__(self, be_ip, be_port, scheme, user_id="jh0003",
+ BODY_SEPARATOR = "\r\n\r\n"
+ CHARTSET = 'UTF-8'
+
+ def __init__(self, be_ip, be_port, header, scheme, tls_cert, tls_key, tls_key_pw, ca_cert, user_id="jh0003",
debug=False, connector=None):
if not check_arguments_not_none(be_ip, be_port, scheme, user_id):
raise AttributeError("The be_host, be_port, scheme or admin_user are missing")
url = get_url(be_ip, be_port, scheme)
self.con = connector if connector \
- else CurlConnector(url, user_id, scheme=scheme, debug=debug)
+ else CurlConnector(url, user_id, header, tls_cert, tls_key, tls_key_pw, ca_cert, protocol=scheme, debug=debug)
def check_backend(self):
return self.con.get('/sdc2/rest/v1/user/jh0003')
def check_user(self, user_name):
- return self.con.get("/sdc2/rest/v1/user/" + user_name)
+ return self.con.get("/sdc2/rest/v1/user" + user_name)
def create_user(self, first_name, last_name, user_id, email, role):
+
return self.con.post('/sdc2/rest/v1/user', json.dumps({
'firstName': first_name,
'lastName': last_name,
}))
def check_consumer(self, consumer_name):
- return self.con.get("/sdc2/rest/v1/consumers/" + consumer_name)
+ return self.con.get("/sdc2/rest/v1/consumers" + consumer_name)
def create_consumer(self, consumer_name, slat, password):
- return self.con.post("/sdc2/rest/v1/consumers/", json.dumps({
+ return self.con.post("/sdc2/rest/v1/consumers", json.dumps({
'consumerName': consumer_name,
'consumerSalt': slat,
'consumerPassword': password
}))
- def post_file(self, path, multi_part_form_data):
- return self.con.post_file(path, multi_part_form_data)
+ def disable_locking(self, disable):
+ return self.con.post("/sdc2/rest/v1/catalog/lock", disable)
+
+ def get_normatives(self):
+ return self.con.get("/sdc2/rest/v1/screen", with_buffer=True)
+
+ def get_model_list(self):
+ return self.con.get("/sdc2/rest/v1/catalog/model", with_buffer=True)
+
+ def post_file(self, path, multi_part_form_data, buffer=None):
+ return self.con.post_file(path, multi_part_form_data, buffer)
+ def put_file(self, path, multi_part_form_data, buffer=None):
+ return self.con.put_file(path, multi_part_form_data, buffer)
+
+ def get_response_from_buffer(self):
+ value = self.con.buffer.getvalue()
+ self.con.buffer.truncate(0)
+ self.con.buffer.seek(0)
+
+ response = value.decode(self.CHARTSET).split(self.BODY_SEPARATOR)
+ return response[len(response) - 1] if len(response) > 1 else response[0]
class CurlConnector:
CONTENT_TYPE_HEADER = "Content-Type: application/json"
ACCEPT_HEADER = "Accept: application/json; charset=UTF-8"
- def __init__(self, url, user_id_header, buffer=None, scheme="http", debug=False):
- self.c = pycurl.Curl()
- self.c.setopt(pycurl.HEADER, True)
+ def __init__(self, url, user_id_header, header, tls_cert, tls_key, tls_key_pw, ca_cert, buffer=None, protocol="http", debug=False):
+ self.__debug = debug
+ self.__protocol = protocol
+ self.__tls_cert = tls_cert
+ self.__tls_key = tls_key
+ self.__tls_key_pw = tls_key_pw
+ self.__ca_cert = ca_cert
+ self.c = self.__build_default_curl()
self.user_header = "USER_ID: " + user_id_header
-
- if not debug:
- # disable printing not necessary logs in the terminal
- self.c.setopt(pycurl.WRITEFUNCTION, lambda x: None)
- else:
- self.c.setopt(pycurl.VERBOSE, 1)
+ self.url = url
if not buffer:
self.buffer = BytesIO()
- self.url = url
- self._check_schema(scheme)
+ if header is None:
+ self.basicauth_header = ""
+ else:
+ self.basicauth_header = "Authorization: Basic " + header
- def get(self, path):
+ def get(self, path, buffer=None, with_buffer=False):
try:
self.c.setopt(pycurl.URL, self.url + path)
self.c.setopt(pycurl.HTTPHEADER, [self.user_header,
CurlConnector.CONTENT_TYPE_HEADER,
- CurlConnector.ACCEPT_HEADER])
+ CurlConnector.ACCEPT_HEADER,
+ self.basicauth_header])
+
+ if with_buffer:
+ write = self.buffer.write if not buffer else buffer.write
+ self.c.setopt(pycurl.WRITEFUNCTION, write)
self.c.perform()
return self.c.getinfo(pycurl.RESPONSE_CODE)
try:
self.c.setopt(pycurl.URL, self.url + path)
self.c.setopt(pycurl.POST, 1)
+
self.c.setopt(pycurl.HTTPHEADER, [self.user_header,
CurlConnector.CONTENT_TYPE_HEADER,
- CurlConnector.ACCEPT_HEADER])
+ CurlConnector.ACCEPT_HEADER,
+ self.basicauth_header])
self.c.setopt(pycurl.POSTFIELDS, data)
try:
self.c.setopt(pycurl.URL, self.url + path)
self.c.setopt(pycurl.POST, 1)
- self.c.setopt(pycurl.HTTPHEADER, [self.user_header])
+ self.c.setopt(pycurl.HTTPHEADER, [self.user_header, self.basicauth_header])
self.c.setopt(pycurl.HTTPPOST, post_body)
self.c.perform()
self.c.setopt(pycurl.POST, 0)
-
return self.c.getinfo(pycurl.RESPONSE_CODE)
- except pycurl.error:
+ except pycurl.error as ex:
+ print(ex)
return 111
- def _check_schema(self, scheme):
- if scheme == 'https':
- self.c.setopt(pycurl.SSL_VERIFYPEER, 0)
- self.c.setopt(pycurl.SSL_VERIFYHOST, 0)
+ def put_file(self, path, post_body, response_write_buffer=None):
+ curl = self.__build_default_curl()
+ curl.setopt(pycurl.URL, self.url + path)
+ curl.setopt(pycurl.HTTPHEADER, [self.user_header, self.basicauth_header])
+ curl.setopt(pycurl.CUSTOMREQUEST, "PUT")
+
+ curl.setopt(pycurl.HTTPPOST, post_body)
+
+ write = self.buffer.write if not response_write_buffer else response_write_buffer.write
+ curl.setopt(pycurl.WRITEFUNCTION, write)
+
+ curl.perform()
+ response_code = curl.getinfo(pycurl.RESPONSE_CODE)
+ curl.close()
+ return response_code
+
+ def __build_default_curl(self):
+ curl = pycurl.Curl()
+ if not self.__debug:
+ # disable printing not necessary logs in the terminal
+ curl.setopt(pycurl.WRITEFUNCTION, lambda x: None)
+ else:
+ curl.setopt(pycurl.VERBOSE, 1)
+
+ if self.__protocol == 'https':
+ curl.setopt(pycurl.SSL_VERIFYPEER, 0)
+ curl.setopt(pycurl.SSL_VERIFYHOST, 0)
+ if self.__tls_cert is not None and self.__tls_key is not None:
+ curl.setopt(curl.SSLCERT, self.__tls_cert)
+ curl.setopt(curl.SSLKEY, self.__tls_key)
+ if self.__tls_key_pw is not None:
+ curl.setopt(curl.KEYPASSWD, self.__tls_key_pw)
+ if self.__ca_cert is not None:
+ curl.setopt(pycurl.SSL_VERIFYPEER, 1)
+ curl.setopt(pycurl.SSL_VERIFYHOST, 2)
+ curl.setopt(curl.CAINFO, self.__ca_cert)
+ curl.setopt(pycurl.HEADER, True)
+ return curl
def __del__(self):
self.c.close()