6 from sdcBePy.common.helpers import check_arguments_not_none
def get_url(ip, port, protocol):
    """Build the base URL ``<protocol>://<ip>:<port>`` for a backend.

    :param ip: host name or address of the backend
    :param port: port the backend listens on
    :param protocol: URL scheme, e.g. ``http`` or ``https``
    :return: the assembled URL as a string
    """
    # %s formatting converts every part with str(); do the same explicitly.
    pieces = [str(protocol), "://", str(ip), ":", str(port)]
    return "".join(pieces)
# Blank-line delimiter (CRLF CRLF) on which get_response_from_buffer splits
# the decoded response text.
15 BODY_SEPARATOR = "\r\n\r\n"
def __init__(self, be_ip, be_port, header, scheme, user_id="jh0003",
             debug=False, connector=None):
    """Create a proxy that talks to the backend through a connector.

    :param be_ip: backend host name or address
    :param be_port: backend port
    :param header: value handed to ``CurlConnector`` as its ``header``
        argument (only used when no ``connector`` is supplied)
    :param scheme: URL scheme, also passed to the connector as ``protocol``
    :param user_id: user id passed to the connector
    :param debug: forwarded to ``CurlConnector``
    :param connector: ready-made connector; when truthy it is used as-is
        and no ``CurlConnector`` is built
    :raises AttributeError: when ``check_arguments_not_none`` rejects
        ``be_ip``, ``be_port``, ``scheme`` or ``user_id``
    """
    mandatory = (be_ip, be_port, scheme, user_id)
    if not check_arguments_not_none(*mandatory):
        raise AttributeError("The be_host, be_port, scheme or admin_user are missing")

    if connector:
        self.con = connector
    else:
        base_url = get_url(be_ip, be_port, scheme)
        self.con = CurlConnector(base_url, user_id, header,
                                 protocol=scheme, debug=debug)
def check_backend(self):
    """Probe the backend by requesting the ``jh0003`` user resource.

    :return: whatever the connector's ``get`` returns for the request
    """
    probe_path = '/sdc2/rest/v1/user/jh0003'
    return self.con.get(probe_path)
def check_user(self, user_name):
    """Request a single user resource from the backend.

    The user name is appended to ``/sdc2/rest/v1/user`` as its own path
    segment. A name that already starts with ``/`` is used unchanged, so
    callers that supply the separator themselves keep working; an empty
    name requests the collection path itself, as before.

    :param user_name: identifier of the user to look up
    :return: whatever the connector's ``get`` returns for the request
    """
    segment = user_name
    if segment and not segment.startswith("/"):
        # Without the separator the request would target e.g.
        # "/sdc2/rest/v1/userjh0003" instead of "/sdc2/rest/v1/user/jh0003".
        segment = "/" + segment
    return self.con.get("/sdc2/rest/v1/user" + segment)
# Create a user by POSTing a JSON document to /sdc2/rest/v1/user through the
# connector and returning the connector's result.
# NOTE(review): only the 'firstName' and 'lastName' entries of the payload
# are visible in this excerpt; the embedded line numbers jump from 36 to 42,
# so the remaining entries (presumably built from user_id, email and role)
# and the closing brackets are not shown -- confirm against the full file.
32 def create_user(self, first_name, last_name, user_id, email, role):
34 return self.con.post('/sdc2/rest/v1/user', json.dumps({
35 'firstName': first_name,
36 'lastName': last_name,
def check_consumer(self, consumer_name):
    """Request a single consumer resource from the backend.

    The consumer name is appended to ``/sdc2/rest/v1/consumers`` as its own
    path segment. A name that already starts with ``/`` is used unchanged,
    so callers that supply the separator themselves keep working; an empty
    name requests the collection path itself, as before.

    :param consumer_name: name of the consumer to look up
    :return: whatever the connector's ``get`` returns for the request
    """
    segment = consumer_name
    if segment and not segment.startswith("/"):
        # Without the separator the request would target
        # "/sdc2/rest/v1/consumers<name>" rather than ".../consumers/<name>".
        segment = "/" + segment
    return self.con.get("/sdc2/rest/v1/consumers" + segment)
# Register a consumer by POSTing a JSON document to /sdc2/rest/v1/consumers
# through the connector and returning the connector's result.
# NOTE(review): embedded lines 48 and 50-51 are missing from this excerpt.
# The `slat` parameter is not used on any visible line -- presumably it is
# sent as the consumer's salt on line 48 ("slat" looks like a typo for
# "salt"); confirm before renaming, since callers may pass it by keyword.
45 def create_consumer(self, consumer_name, slat, password):
46 return self.con.post("/sdc2/rest/v1/consumers", json.dumps({
47 'consumerName': consumer_name,
49 'consumerPassword': password
def disable_locking(self, disable):
    """POST ``disable`` as the request body to the catalog lock endpoint.

    :param disable: payload passed unchanged to the connector's ``post``
    :return: whatever the connector's ``post`` returns
    """
    lock_endpoint = "/sdc2/rest/v1/catalog/lock"
    return self.con.post(lock_endpoint, disable)
def get_normatives(self):
    """GET ``/sdc2/rest/v1/screen`` with the connector's ``with_buffer`` flag set.

    :return: whatever the connector's ``get`` returns for the request
    """
    screen_path = "/sdc2/rest/v1/screen"
    request_options = {"with_buffer": True}
    return self.con.get(screen_path, **request_options)
def post_file(self, path, multi_part_form_data, buffer=None):
    """Delegate a multipart POST to the connector.

    :param path: request path, passed through unchanged
    :param multi_part_form_data: form data handed to the connector
    :param buffer: optional third argument for the connector's ``post_file``
    :return: whatever the connector's ``post_file`` returns
    """
    upload = self.con.post_file
    return upload(path, multi_part_form_data, buffer)
def put_file(self, path, multi_part_form_data, buffer=None):
    """Delegate a multipart PUT to the connector.

    :param path: request path, passed through unchanged
    :param multi_part_form_data: form data handed to the connector
    :param buffer: optional third argument for the connector's ``put_file``
    :return: whatever the connector's ``put_file`` returns
    """
    upload = self.con.put_file
    # The third argument stays positional, exactly as the connector receives it today.
    return upload(path, multi_part_form_data, buffer)
def get_response_from_buffer(self):
    """Drain the connector's buffer and return the response body as text.

    The buffered bytes are decoded with ``self.CHARTSET`` and everything up
    to the first ``self.BODY_SEPARATOR`` is dropped as the header block.
    Header blocks of interim responses that precede the final one (for
    example ``HTTP/1.1 100 Continue``) are skipped as well, and separators
    occurring inside the body are preserved. Text that contains no
    separator at all is returned unchanged.

    The buffer is emptied and rewound as a side effect.

    :return: the decoded response body
    """
    raw = self.con.buffer.getvalue()
    # Reset the buffer so that the next response starts from an empty one.
    self.con.buffer.truncate(0)
    self.con.buffer.seek(0)

    text = raw.decode(self.CHARTSET)
    separator = self.BODY_SEPARATOR
    _headers, found, remainder = text.partition(separator)
    if not found:
        # No header/body boundary: hand the text back as it is.
        return text
    # What follows an interim response is another header block, not the
    # body; keep skipping until the real body is reached.
    while remainder.startswith("HTTP/") and separator in remainder:
        remainder = remainder.partition(separator)[2]
    return remainder
# Header lines that get() and post() send together with the USER_ID and
# basic-auth headers; post_file() and put_file() do not send them.
74 CONTENT_TYPE_HEADER = "Content-Type: application/json"
75 ACCEPT_HEADER = "Accept: application/json; charset=UTF-8"
# Set up a connector: remember the protocol, build the default curl handle
# and prepare the header strings and buffer used by the request methods.
# NOTE(review): this excerpt omits several lines (embedded numbers 78, 81,
# 83-85, 87-88 and 90). `url`, `buffer` and `debug` are not referenced on any
# visible line although the other methods read self.url, and the two
# `basicauth_header` assignments are presumably the branches of a conditional
# on `header` -- confirm against the full file.
77 def __init__(self, url, user_id_header, header, buffer=None, protocol="http", debug=False):
79 self.__protocol = protocol
# Has to run after __protocol is set: __build_default_curl reads it.
80 self.c = self.__build_default_curl()
# Complete "USER_ID: <id>" header line, sent by every request method.
82 self.user_header = "USER_ID: " + user_id_header
# In-memory sink that the request methods write to when the caller passes no
# buffer of its own.
86 self.buffer = BytesIO()
89 self.basicauth_header = ""
91 self.basicauth_header = "Authorization: Basic " + header
# Configure the shared curl handle self.c for a request to self.url + path
# with the USER_ID, JSON content-type/accept and basic-auth headers, and
# return the response code curl reports.
# Response data goes to `buffer` when one is given, otherwise to self.buffer.
# NOTE(review): lines are missing from this excerpt (embedded numbers 94,
# 100-101, 104-105 and 107-109). `with_buffer` is not referenced on any
# visible line (presumably it guards the WRITEFUNCTION setup), and the call
# that performs the transfer and any error handling are not shown.
93 def get(self, path, buffer=None, with_buffer=False):
95 self.c.setopt(pycurl.URL, self.url + path)
96 self.c.setopt(pycurl.HTTPHEADER, [self.user_header,
97 CurlConnector.CONTENT_TYPE_HEADER,
98 CurlConnector.ACCEPT_HEADER,
99 self.basicauth_header])
102 write = self.buffer.write if not buffer else buffer.write
103 self.c.setopt(pycurl.WRITEFUNCTION, write)
106 return self.c.getinfo(pycurl.RESPONSE_CODE)
# Configure the shared curl handle self.c to POST `data` to self.url + path
# with the USER_ID, JSON content-type/accept and basic-auth headers, and
# return the response code curl reports.
# NOTE(review): lines are missing from this excerpt (embedded numbers 111,
# 114, 119, 121-122, 124 and 126-128); the call that performs the transfer
# and any error handling are not shown.
110 def post(self, path, data):
112 self.c.setopt(pycurl.URL, self.url + path)
113 self.c.setopt(pycurl.POST, 1)
115 self.c.setopt(pycurl.HTTPHEADER, [self.user_header,
116 CurlConnector.CONTENT_TYPE_HEADER,
117 CurlConnector.ACCEPT_HEADER,
118 self.basicauth_header])
120 self.c.setopt(pycurl.POSTFIELDS, data)
# The POST flag is cleared again on the handle, which get() also uses.
123 self.c.setopt(pycurl.POST, 0)
125 return self.c.getinfo(pycurl.RESPONSE_CODE)
# Configure the shared curl handle self.c to POST the multipart form
# `post_body` (set through pycurl.HTTPPOST) to self.url + path and return the
# response code curl reports. Only the USER_ID and basic-auth headers are set
# here; the JSON content-type/accept headers are not.
# Response data goes to `buffer` when one is given, otherwise to self.buffer.
# NOTE(review): lines are missing from this excerpt (embedded numbers 130,
# 134, 136, 139-140 and 144-146): the `try` matching the visible `except`,
# the call that performs the transfer and the body of the pycurl.error
# handler are not shown.
129 def post_file(self, path, post_body, buffer=None):
131 self.c.setopt(pycurl.URL, self.url + path)
132 self.c.setopt(pycurl.POST, 1)
133 self.c.setopt(pycurl.HTTPHEADER, [self.user_header, self.basicauth_header])
135 self.c.setopt(pycurl.HTTPPOST, post_body)
137 write = self.buffer.write if not buffer else buffer.write
138 self.c.setopt(pycurl.WRITEFUNCTION, write)
# The POST flag is cleared again on the handle, which get() also uses.
141 self.c.setopt(pycurl.POST, 0)
142 return self.c.getinfo(pycurl.RESPONSE_CODE)
143 except pycurl.error as ex:
# Send the multipart form `post_body` to self.url + path with the request
# method overridden to "PUT".
# Unlike get/post/post_file this builds a fresh handle with
# __build_default_curl instead of reusing self.c.
# Response data goes to `response_write_buffer` when one is given, otherwise
# to self.buffer.
# NOTE(review): lines are missing from this excerpt (embedded numbers 152,
# 154, 157-158 and 160-162): the call that performs the transfer, any cleanup
# of the handle and the statement returning `response_code` are not shown.
147 def put_file(self, path, post_body, response_write_buffer=None):
148 curl = self.__build_default_curl()
149 curl.setopt(pycurl.URL, self.url + path)
150 curl.setopt(pycurl.HTTPHEADER, [self.user_header, self.basicauth_header])
151 curl.setopt(pycurl.CUSTOMREQUEST, "PUT")
153 curl.setopt(pycurl.HTTPPOST, post_body)
155 write = self.buffer.write if not response_write_buffer else response_write_buffer.write
156 curl.setopt(pycurl.WRITEFUNCTION, write)
159 response_code = curl.getinfo(pycurl.RESPONSE_CODE)
# Build the curl handle configuration shared by the connector's requests.
# NOTE(review): lines are missing from this excerpt (embedded numbers
# 164-165, 168, 170 and everything after 174): the creation of `curl`, the
# conditions around the WRITEFUNCTION/VERBOSE options (presumably the debug
# flag) and the return statement are not shown. Because indentation is lost,
# it is also not visible whether the HEADER option belongs to the https
# branch.
163 def __build_default_curl(self):
166 # disable printing not necessary logs in the terminal
167 curl.setopt(pycurl.WRITEFUNCTION, lambda x: None)
169 curl.setopt(pycurl.VERBOSE, 1)
# NOTE(review): for https both peer and host certificate verification are
# switched off, so the server's identity is not checked -- confirm this is
# intended outside test environments.
171 if self.__protocol == 'https':
172 curl.setopt(pycurl.SSL_VERIFYPEER, 0)
173 curl.setopt(pycurl.SSL_VERIFYHOST, 0)
# Presumably the reason get_response_from_buffer has to split headers from
# the body: this option makes libcurl include them in the written output --
# confirm against the libcurl documentation.
174 curl.setopt(pycurl.HEADER, True)