6 from sdcBePy.common.helpers import check_arguments_not_none
9 def get_url(ip, port, protocol):
10 return "%s://%s:%s" % (protocol, ip, port)
15 def __init__(self, be_ip, be_port, scheme, user_id="jh0003",
16 debug=False, connector=None):
17 if not check_arguments_not_none(be_ip, be_port, scheme, user_id):
18 raise AttributeError("The be_host, be_port, scheme or admin_user are missing")
19 url = get_url(be_ip, be_port, scheme)
20 self.con = connector if connector \
21 else CurlConnector(url, user_id, scheme=scheme, debug=debug)
23 def check_backend(self):
24 return self.con.get('/sdc2/rest/v1/user/jh0003')
26 def check_user(self, user_name):
27 return self.con.get("/sdc2/rest/v1/user/" + user_name)
29 def create_user(self, first_name, last_name, user_id, email, role):
30 return self.con.post('/sdc2/rest/v1/user', json.dumps({
31 'firstName': first_name,
32 'lastName': last_name,
38 def check_consumer(self, consumer_name):
39 return self.con.get("/sdc2/rest/v1/consumers/" + consumer_name)
41 def create_consumer(self, consumer_name, slat, password):
42 return self.con.post("/sdc2/rest/v1/consumers/", json.dumps({
43 'consumerName': consumer_name,
45 'consumerPassword': password
48 def post_file(self, path, multi_part_form_data):
49 return self.con.post_file(path, multi_part_form_data)
53 CONTENT_TYPE_HEADER = "Content-Type: application/json"
54 ACCEPT_HEADER = "Accept: application/json; charset=UTF-8"
56 def __init__(self, url, user_id_header, buffer=None, scheme="http", debug=False):
57 self.c = pycurl.Curl()
58 self.c.setopt(pycurl.HEADER, True)
60 self.user_header = "USER_ID: " + user_id_header
63 # disable printing not necessary logs in the terminal
64 self.c.setopt(pycurl.WRITEFUNCTION, lambda x: None)
66 self.c.setopt(pycurl.VERBOSE, 1)
69 self.buffer = BytesIO()
72 self._check_schema(scheme)
76 self.c.setopt(pycurl.URL, self.url + path)
77 self.c.setopt(pycurl.HTTPHEADER, [self.user_header,
78 CurlConnector.CONTENT_TYPE_HEADER,
79 CurlConnector.ACCEPT_HEADER])
82 return self.c.getinfo(pycurl.RESPONSE_CODE)
86 def post(self, path, data):
88 self.c.setopt(pycurl.URL, self.url + path)
89 self.c.setopt(pycurl.POST, 1)
90 self.c.setopt(pycurl.HTTPHEADER, [self.user_header,
91 CurlConnector.CONTENT_TYPE_HEADER,
92 CurlConnector.ACCEPT_HEADER])
94 self.c.setopt(pycurl.POSTFIELDS, data)
97 self.c.setopt(pycurl.POST, 0)
99 return self.c.getinfo(pycurl.RESPONSE_CODE)
103 def post_file(self, path, post_body, buffer=None):
105 self.c.setopt(pycurl.URL, self.url + path)
106 self.c.setopt(pycurl.POST, 1)
107 self.c.setopt(pycurl.HTTPHEADER, [self.user_header])
109 self.c.setopt(pycurl.HTTPPOST, post_body)
111 write = self.buffer.write if not buffer else buffer.write
112 self.c.setopt(pycurl.WRITEFUNCTION, write)
115 self.c.setopt(pycurl.POST, 0)
117 return self.c.getinfo(pycurl.RESPONSE_CODE)
121 def _check_schema(self, scheme):
122 if scheme == 'https':
123 self.c.setopt(pycurl.SSL_VERIFYPEER, 0)
124 self.c.setopt(pycurl.SSL_VERIFYHOST, 0)