6 from sdcBePy.common.helpers import check_arguments_not_none
def get_url(ip, port, protocol):
    """Build the base URL ``<protocol>://<ip>:<port>`` of a backend.

    A bare IPv6 literal (recognised by the ``:`` it contains) is wrapped in
    square brackets, as RFC 3986 requires, so that the separator in front of
    the port stays unambiguous. Host names, IPv4 addresses and IPv6 literals
    that are already bracketed are used verbatim.

    :param ip: host name or IP address of the backend
    :param port: port of the backend
    :param protocol: URL scheme, e.g. ``http`` or ``https``
    :return: the URL as a string, without a trailing slash
    """
    host = str(ip)
    # Host names and IPv4 addresses never contain ':', IPv6 literals always do.
    if ":" in host and not host.startswith("["):
        host = "[%s]" % host
    return "%s://%s:%s" % (protocol, host, port)
def __init__(self, be_ip, be_port, scheme, user_id="jh0003",
             debug=False, connector=None):
    """Validate the connection settings and choose the connector.

    :param be_ip: backend host, used to build the base URL
    :param be_port: backend port, used to build the base URL
    :param scheme: URL scheme; also forwarded to the default connector
    :param user_id: forwarded to the default connector
    :param debug: forwarded to the default connector
    :param connector: ready-made connector; when it is truthy it is used
        as is and no CurlConnector is created
    :raises AttributeError: when check_arguments_not_none returns a falsy
        result for be_ip, be_port, scheme and user_id
    """
    arguments_ok = check_arguments_not_none(be_ip, be_port, scheme, user_id)
    if not arguments_ok:
        raise AttributeError("The be_host, be_port, scheme or admin_user are missing")

    base_url = get_url(be_ip, be_port, scheme)
    if connector:
        self.con = connector
    else:
        self.con = CurlConnector(base_url, user_id, scheme=scheme, debug=debug)
def check_backend(self):
    """Probe the backend by requesting the ``jh0003`` user resource.

    :return: whatever the connector's ``get`` returns for that path
    """
    probe_path = '/sdc2/rest/v1/user/jh0003'
    return self.con.get(probe_path)
def check_user(self, user_name):
    """Request the user resource of *user_name* through the connector.

    :param user_name: appended verbatim to the user resource path
    :return: whatever the connector's ``get`` returns for that path
    """
    user_path = "/sdc2/rest/v1/user/" + user_name
    return self.con.get(user_path)
# Create a user: the attributes are serialised to a JSON document with
# json.dumps and handed to the connector's post() for the path
# /sdc2/rest/v1/user; the connector's result is returned.
# NOTE(review): only the 'firstName' and 'lastName' entries of the payload are
# visible here; presumably user_id, email and role are added too - confirm.
29 def create_user(self, first_name, last_name, user_id, email, role):
30 return self.con.post('/sdc2/rest/v1/user', json.dumps({
31 'firstName': first_name,
32 'lastName': last_name,
def post_file(self, path, multi_part_form_data):
    """Hand a multipart form upload for *path* over to the connector.

    :param path: request path, passed through unchanged
    :param multi_part_form_data: form data, passed through unchanged
    :return: whatever the connector's ``post_file`` returns
    """
    upload = self.con.post_file
    return upload(path, multi_part_form_data)
# Raw HTTP header lines; they are referenced as CurlConnector.<NAME> when the
# pycurl.HTTPHEADER option of a request is filled in.
43 CONTENT_TYPE_HEADER = "Content-Type: application/json"
44 ACCEPT_HEADER = "Accept: application/json; charset=UTF-8"
# Set up the connector: create the pycurl handle (self.c), build the user
# header and apply the scheme-specific options.
#   user_id_header - appended to "USER_ID: " to form the user header line
#   scheme         - passed to _check_schema
# NOTE(review): url, buffer and debug are not referenced by the statements
# visible here, although other methods read self.url - confirm how they are
# used.
46 def __init__(self, url, user_id_header, buffer=None, scheme="http", debug=False):
47 self.c = pycurl.Curl()
48 self.c.setopt(pycurl.HEADER, True)
# header line that every request puts into its HTTPHEADER list
50 self.user_header = "USER_ID: " + user_id_header
53 # disable printing not necessary logs in the terminal
# the write callback ignores the data it is given
54 self.c.setopt(pycurl.WRITEFUNCTION, lambda x: None)
# NOTE(review): presumably the no-op callback above and VERBOSE below are
# alternatives selected by `debug` - the condition is not visible, confirm.
56 self.c.setopt(pycurl.VERBOSE, 1)
# in-memory buffer; post_file writes into it when no buffer is passed to it
59 self.buffer = BytesIO()
62 self._check_schema(scheme)
# Configure the handle for the URL self.url + path with the user, content-type
# and accept headers, then return the handle's RESPONSE_CODE.
# NOTE(review): presumably this is the body of get(path), which is called as
# self.con.get(...); neither its def line nor the statement that performs the
# transfer is visible here - confirm.
66 self.c.setopt(pycurl.URL, self.url + path)
67 self.c.setopt(pycurl.HTTPHEADER, [self.user_header,
68 CurlConnector.CONTENT_TYPE_HEADER,
69 CurlConnector.ACCEPT_HEADER])
72 return self.c.getinfo(pycurl.RESPONSE_CODE)
# Configure the handle for a POST of `data` to self.url + path and return the
# handle's RESPONSE_CODE.
#   path - appended to self.url
#   data - passed to pycurl as POSTFIELDS
# NOTE(review): the statement that performs the transfer is not visible
# between setting POSTFIELDS and resetting POST - confirm.
76 def post(self, path, data):
78 self.c.setopt(pycurl.URL, self.url + path)
79 self.c.setopt(pycurl.POST, 1)
80 self.c.setopt(pycurl.HTTPHEADER, [self.user_header,
81 CurlConnector.CONTENT_TYPE_HEADER,
82 CurlConnector.ACCEPT_HEADER])
84 self.c.setopt(pycurl.POSTFIELDS, data)
# self.c is reused by the other request methods, so POST is switched back off
87 self.c.setopt(pycurl.POST, 0)
89 return self.c.getinfo(pycurl.RESPONSE_CODE)
# Configure the handle for a POST of `post_body` (passed as HTTPPOST) to
# self.url + path and return the handle's RESPONSE_CODE.
#   path      - appended to self.url
#   post_body - passed to pycurl as HTTPPOST; presumably multipart form
#               fields - confirm against the callers
#   buffer    - optional object whose write() is used as the write callback;
#               self.buffer.write is used when it is falsy
# NOTE(review): the statement that performs the transfer is not visible
# between installing the write callback and resetting POST - confirm.
93 def post_file(self, path, post_body, buffer=None):
95 self.c.setopt(pycurl.URL, self.url + path)
96 self.c.setopt(pycurl.POST, 1)
# only the user header is set; the JSON content-type/accept headers are not
97 self.c.setopt(pycurl.HTTPHEADER, [self.user_header])
99 self.c.setopt(pycurl.HTTPPOST, post_body)
101 write = self.buffer.write if not buffer else buffer.write
102 self.c.setopt(pycurl.WRITEFUNCTION, write)
# self.c is reused by the other request methods, so POST is switched back off
105 self.c.setopt(pycurl.POST, 0)
107 return self.c.getinfo(pycurl.RESPONSE_CODE)
111 def _check_schema(self, scheme):
112 if scheme == 'https':
113 self.c.setopt(pycurl.SSL_VERIFYPEER, 0)
114 self.c.setopt(pycurl.SSL_VERIFYHOST, 0)