6 from sdcBePy.common.helpers import check_arguments_not_none
def get_url(ip, port, protocol):
    """Build the base URL of an endpoint.

    :param ip: host name or IP address of the endpoint
    :param port: port of the endpoint (any value with a string form)
    :param protocol: URL scheme placed in front of ``://``
    :return: string of the form ``<protocol>://<ip>:<port>``
    """
    url_template = "{}://{}:{}"
    return url_template.format(protocol, ip, port)
15 BODY_SEPARATOR = "\r\n\r\n"
def __init__(self, be_ip, be_port, header, scheme, user_id="jh0003",
             debug=False, connector=None):
    """Validate the connection settings and choose the connector to use.

    :param be_ip: backend host, used to build the base URL
    :param be_port: backend port, used to build the base URL
    :param header: handed to ``CurlConnector`` when no connector is supplied
    :param scheme: URL scheme; also passed to ``CurlConnector`` as ``protocol``
    :param user_id: user id handed to ``CurlConnector``
    :param debug: forwarded to ``CurlConnector``
    :param connector: ready-made connector; used as-is when truthy
    :raises AttributeError: when ``check_arguments_not_none`` rejects
        ``be_ip``, ``be_port``, ``scheme`` or ``user_id``
    """
    arguments_ok = check_arguments_not_none(be_ip, be_port, scheme, user_id)
    if not arguments_ok:
        raise AttributeError("The be_host, be_port, scheme or admin_user are missing")

    # The base URL is computed even when a connector is injected.
    base_url = get_url(be_ip, be_port, scheme)
    if connector:
        self.con = connector
    else:
        self.con = CurlConnector(base_url, user_id, header, protocol=scheme, debug=debug)
def check_backend(self):
    """Send a GET for the ``jh0003`` user resource through the connector.

    :return: whatever the connector's ``get`` returns for that path
    """
    probe_path = '/sdc2/rest/v1/user/jh0003'
    return self.con.get(probe_path)
def check_user(self, user_name):
    """Send a GET for a single user resource through the connector.

    The user name is appended to the user endpoint as its own path segment
    (``/sdc2/rest/v1/user/<user_name>``), the same URL shape that
    ``check_backend`` requests.  Leading slashes on ``user_name`` are
    dropped first, so callers that already pass ``"/<name>"`` get the same
    path instead of a doubled slash.

    :param user_name: id of the user to look up (string)
    :return: whatever the connector's ``get`` returns for that path
    """
    user_segment = user_name.lstrip("/")
    return self.con.get("/sdc2/rest/v1/user/" + user_segment)
32 def create_user(self, first_name, last_name, user_id, email, role):
34 return self.con.post('/sdc2/rest/v1/user', json.dumps({
35 'firstName': first_name,
36 'lastName': last_name,
def check_consumer(self, consumer_name):
    """Send a GET for a single consumer resource through the connector.

    The consumer name is appended to the consumers endpoint as its own path
    segment (``/sdc2/rest/v1/consumers/<consumer_name>``).  Leading slashes
    on ``consumer_name`` are dropped first, so callers that already pass
    ``"/<name>"`` get the same path instead of a doubled slash.

    :param consumer_name: name of the consumer to look up (string)
    :return: whatever the connector's ``get`` returns for that path
    """
    consumer_segment = consumer_name.lstrip("/")
    return self.con.get("/sdc2/rest/v1/consumers/" + consumer_segment)
45 def create_consumer(self, consumer_name, slat, password):
46 return self.con.post("/sdc2/rest/v1/consumers", json.dumps({
47 'consumerName': consumer_name,
49 'consumerPassword': password
def get_normatives(self):
    """Send a GET for the ``screen`` resource with ``with_buffer`` enabled.

    :return: whatever the connector's ``get`` returns for that path
    """
    screen_path = "/sdc2/rest/v1/screen"
    return self.con.get(screen_path, with_buffer=True)
def post_file(self, path, multi_part_form_data, buffer=None):
    """Hand a file POST over to the connector.

    :param path: request path, passed through unchanged
    :param multi_part_form_data: form payload, passed through unchanged
    :param buffer: optional buffer, passed through unchanged
    :return: whatever the connector's ``post_file`` returns
    """
    connector = self.con
    return connector.post_file(path, multi_part_form_data, buffer)
def put_file(self, path, multi_part_form_data, buffer=None):
    """Hand a file PUT over to the connector.

    :param path: request path, passed through unchanged
    :param multi_part_form_data: form payload, passed through unchanged
    :param buffer: optional buffer, passed through unchanged
    :return: whatever the connector's ``put_file`` returns
    """
    connector = self.con
    return connector.put_file(path, multi_part_form_data, buffer)
def get_response_from_buffer(self):
    """Drain the connector's buffer and return the response body as text.

    The buffered bytes are read, then the buffer is emptied and rewound so
    it can be reused.  The bytes are decoded with ``self.CHARTSET`` and cut
    at the FIRST occurrence of ``self.BODY_SEPARATOR`` (presumably the blank
    line that ends the header section of the raw response).

    Cutting only once keeps a body that itself contains the separator
    intact; splitting on every occurrence used to make such a response fall
    back to the first chunk, i.e. the part before the body.

    :return: the text after the first separator, or the whole decoded text
        when the separator does not occur
    """
    stream = self.con.buffer
    raw = stream.getvalue()
    # Reset before decoding, so the buffer is clean even if decoding raises.
    stream.truncate(0)
    stream.seek(0)

    text = raw.decode(self.CHARTSET)
    head, separator, body = text.partition(self.BODY_SEPARATOR)
    # An empty `separator` means nothing was found: the text is all there is.
    return body if separator else head
71 CONTENT_TYPE_HEADER = "Content-Type: application/json"
72 ACCEPT_HEADER = "Accept: application/json; charset=UTF-8"
74 def __init__(self, url, user_id_header, header, buffer=None, protocol="http", debug=False):
76 self.__protocol = protocol
77 self.c = self.__build_default_curl()
79 self.user_header = "USER_ID: " + user_id_header
83 self.buffer = BytesIO()
86 self.basicauth_header = ""
88 self.basicauth_header = "Authorization: Basic " + header
90 def get(self, path, buffer=None, with_buffer=False):
92 self.c.setopt(pycurl.URL, self.url + path)
93 self.c.setopt(pycurl.HTTPHEADER, [self.user_header,
94 CurlConnector.CONTENT_TYPE_HEADER,
95 CurlConnector.ACCEPT_HEADER,
96 self.basicauth_header])
99 write = self.buffer.write if not buffer else buffer.write
100 self.c.setopt(pycurl.WRITEFUNCTION, write)
103 return self.c.getinfo(pycurl.RESPONSE_CODE)
107 def post(self, path, data):
109 self.c.setopt(pycurl.URL, self.url + path)
110 self.c.setopt(pycurl.POST, 1)
112 self.c.setopt(pycurl.HTTPHEADER, [self.user_header,
113 CurlConnector.CONTENT_TYPE_HEADER,
114 CurlConnector.ACCEPT_HEADER,
115 self.basicauth_header])
117 self.c.setopt(pycurl.POSTFIELDS, data)
120 self.c.setopt(pycurl.POST, 0)
122 return self.c.getinfo(pycurl.RESPONSE_CODE)
126 def post_file(self, path, post_body, buffer=None):
128 self.c.setopt(pycurl.URL, self.url + path)
129 self.c.setopt(pycurl.POST, 1)
130 self.c.setopt(pycurl.HTTPHEADER, [self.user_header, self.basicauth_header])
132 self.c.setopt(pycurl.HTTPPOST, post_body)
134 write = self.buffer.write if not buffer else buffer.write
135 self.c.setopt(pycurl.WRITEFUNCTION, write)
138 self.c.setopt(pycurl.POST, 0)
139 return self.c.getinfo(pycurl.RESPONSE_CODE)
140 except pycurl.error as ex:
144 def put_file(self, path, post_body, response_write_buffer=None):
145 curl = self.__build_default_curl()
146 curl.setopt(pycurl.URL, self.url + path)
147 curl.setopt(pycurl.HTTPHEADER, [self.user_header, self.basicauth_header])
148 curl.setopt(pycurl.CUSTOMREQUEST, "PUT")
150 curl.setopt(pycurl.HTTPPOST, post_body)
152 write = self.buffer.write if not response_write_buffer else response_write_buffer.write
153 curl.setopt(pycurl.WRITEFUNCTION, write)
156 response_code = curl.getinfo(pycurl.RESPONSE_CODE)
160 def __build_default_curl(self):
163 # disable printing not necessary logs in the terminal
164 curl.setopt(pycurl.WRITEFUNCTION, lambda x: None)
166 curl.setopt(pycurl.VERBOSE, 1)
168 if self.__protocol == 'https':
169 curl.setopt(pycurl.SSL_VERIFYPEER, 0)
170 curl.setopt(pycurl.SSL_VERIFYHOST, 0)
171 curl.setopt(pycurl.HEADER, True)