6 from sdcBePy.common.helpers import check_arguments_not_none
9 def get_url(ip, port, protocol):
10 return "%s://%s:%s" % (protocol, ip, port)
15 BODY_SEPARATOR = "\r\n\r\n"
18 def __init__(self, be_ip, be_port, header, scheme, user_id="jh0003",
19 debug=False, connector=None):
20 if not check_arguments_not_none(be_ip, be_port, scheme, user_id):
21 raise AttributeError("The be_host, be_port, scheme or admin_user are missing")
22 url = get_url(be_ip, be_port, scheme)
23 self.con = connector if connector \
24 else CurlConnector(url, user_id, header, scheme=scheme, debug=debug)
26 def check_backend(self):
27 return self.con.get('/sdc2/rest/v1/user/jh0003')
29 def check_user(self, user_name):
30 return self.con.get("/sdc2/rest/v1/user" + user_name)
32 def create_user(self, first_name, last_name, user_id, email, role):
34 return self.con.post('/sdc2/rest/v1/user', json.dumps({
35 'firstName': first_name,
36 'lastName': last_name,
42 def check_consumer(self, consumer_name):
43 return self.con.get("/sdc2/rest/v1/consumers" + consumer_name)
45 def create_consumer(self, consumer_name, slat, password):
46 return self.con.post("/sdc2/rest/v1/consumers", json.dumps({
47 'consumerName': consumer_name,
49 'consumerPassword': password
52 def get_normatives(self):
53 return self.con.get("/sdc2/rest/v1/screen", with_buffer=True)
55 def post_file(self, path, multi_part_form_data):
56 return self.con.post_file(path, multi_part_form_data)
58 def get_response_from_buffer(self):
59 value = self.con.buffer.getvalue()
60 self.con.buffer.truncate(0)
61 self.con.buffer.seek(0)
63 response = value.decode(self.CHARTSET).split(self.BODY_SEPARATOR)
64 return response[1] if len(response) == 2 else response[0]
68 CONTENT_TYPE_HEADER = "Content-Type: application/json"
69 ACCEPT_HEADER = "Accept: application/json; charset=UTF-8"
71 def __init__(self, url, user_id_header, header, buffer=None, scheme="http", debug=False):
72 self.c = pycurl.Curl()
73 self.c.setopt(pycurl.HEADER, True)
75 self.user_header = "USER_ID: " + user_id_header
78 # disable printing not necessary logs in the terminal
79 self.c.setopt(pycurl.WRITEFUNCTION, lambda x: None)
81 self.c.setopt(pycurl.VERBOSE, 1)
84 self.buffer = BytesIO()
87 self.basicauth_header = ""
89 self.basicauth_header = "Authorization: Basic " + header
92 self._check_schema(scheme)
94 def get(self, path, buffer=None, with_buffer=False):
96 self.c.setopt(pycurl.URL, self.url + path)
97 self.c.setopt(pycurl.HTTPHEADER, [self.user_header,
98 CurlConnector.CONTENT_TYPE_HEADER,
99 CurlConnector.ACCEPT_HEADER,
100 self.basicauth_header])
104 write = self.buffer.write if not buffer else buffer.write
105 self.c.setopt(pycurl.WRITEFUNCTION, write)
108 return self.c.getinfo(pycurl.RESPONSE_CODE)
112 def post(self, path, data):
114 self.c.setopt(pycurl.URL, self.url + path)
115 self.c.setopt(pycurl.POST, 1)
117 self.c.setopt(pycurl.HTTPHEADER, [self.user_header,
118 CurlConnector.CONTENT_TYPE_HEADER,
119 CurlConnector.ACCEPT_HEADER,
120 self.basicauth_header])
122 self.c.setopt(pycurl.POSTFIELDS, data)
125 self.c.setopt(pycurl.POST, 0)
127 return self.c.getinfo(pycurl.RESPONSE_CODE)
131 def post_file(self, path, post_body, buffer=None):
133 self.c.setopt(pycurl.URL, self.url + path)
134 self.c.setopt(pycurl.POST, 1)
135 self.c.setopt(pycurl.HTTPHEADER, [self.user_header,
136 self.basicauth_header])
139 self.c.setopt(pycurl.HTTPPOST, post_body)
141 write = self.buffer.write if not buffer else buffer.write
142 self.c.setopt(pycurl.WRITEFUNCTION, write)
145 self.c.setopt(pycurl.POST, 0)
147 return self.c.getinfo(pycurl.RESPONSE_CODE)
151 def _check_schema(self, scheme):
152 if scheme == 'https':
153 self.c.setopt(pycurl.SSL_VERIFYPEER, 0)
154 self.c.setopt(pycurl.SSL_VERIFYHOST, 0)