TLS sdc-be-init: truststore & keystore handling
[sdc.git] / catalog-be / src / main / resources / scripts / sdcBePy / common / sdcBeProxy.py
index 6fea657..2a1d310 100755 (executable)
@@ -5,28 +5,31 @@ import pycurl
 
 from sdcBePy.common.helpers import check_arguments_not_none
 
-
 def get_url(ip, port, protocol):
     return "%s://%s:%s" % (protocol, ip, port)
 
 
 class SdcBeProxy:
 
-    def __init__(self, be_ip, be_port, scheme, user_id="jh0003",
+    BODY_SEPARATOR = "\r\n\r\n"
+    CHARTSET = 'UTF-8'
+
+    def __init__(self, be_ip, be_port, header, scheme, tls_cert, tls_key, tls_key_pw, ca_cert, user_id="jh0003",
                  debug=False, connector=None):
         if not check_arguments_not_none(be_ip, be_port, scheme, user_id):
             raise AttributeError("The be_host, be_port, scheme or admin_user are missing")
         url = get_url(be_ip, be_port, scheme)
         self.con = connector if connector \
-            else CurlConnector(url, user_id, scheme=scheme, debug=debug)
+            else CurlConnector(url, user_id, header, tls_cert, tls_key, tls_key_pw, ca_cert, protocol=scheme, debug=debug)
 
     def check_backend(self):
         return self.con.get('/sdc2/rest/v1/user/jh0003')
 
     def check_user(self, user_name):
-        return self.con.get("/sdc2/rest/v1/user/" + user_name)
+          return self.con.get("/sdc2/rest/v1/user" + user_name)
 
     def create_user(self, first_name, last_name, user_id, email, role):
+
         return self.con.post('/sdc2/rest/v1/user', json.dumps({
             'firstName': first_name,
             'lastName': last_name,
@@ -36,47 +39,73 @@ class SdcBeProxy:
         }))
 
     def check_consumer(self, consumer_name):
-        return self.con.get("/sdc2/rest/v1/consumers/" + consumer_name)
+        return self.con.get("/sdc2/rest/v1/consumers" + consumer_name)
 
     def create_consumer(self, consumer_name, slat, password):
-        return self.con.post("/sdc2/rest/v1/consumers/", json.dumps({
+        return self.con.post("/sdc2/rest/v1/consumers", json.dumps({
             'consumerName': consumer_name,
             'consumerSalt': slat,
             'consumerPassword': password
         }))
 
-    def post_file(self, path, multi_part_form_data):
-        return self.con.post_file(path, multi_part_form_data)
+    def disable_locking(self, disable):
+        return self.con.post("/sdc2/rest/v1/catalog/lock", disable)
+
+    def get_normatives(self):
+        return self.con.get("/sdc2/rest/v1/screen", with_buffer=True)
+
+    def get_model_list(self):
+        return self.con.get("/sdc2/rest/v1/catalog/model", with_buffer=True)
+
+    def post_file(self, path, multi_part_form_data, buffer=None):
+        return self.con.post_file(path, multi_part_form_data, buffer)
 
+    def put_file(self, path, multi_part_form_data, buffer=None):
+        return self.con.put_file(path, multi_part_form_data, buffer)
+
+    def get_response_from_buffer(self):
+        value = self.con.buffer.getvalue()
+        self.con.buffer.truncate(0)
+        self.con.buffer.seek(0)
+
+        response = value.decode(self.CHARTSET).split(self.BODY_SEPARATOR)
+        return response[len(response) - 1] if len(response) > 1 else response[0]
 
 class CurlConnector:
     CONTENT_TYPE_HEADER = "Content-Type: application/json"
     ACCEPT_HEADER = "Accept: application/json; charset=UTF-8"
 
-    def __init__(self, url, user_id_header, buffer=None, scheme="http", debug=False):
-        self.c = pycurl.Curl()
-        self.c.setopt(pycurl.HEADER, True)
+    def __init__(self, url, user_id_header, header, tls_cert, tls_key, tls_key_pw, ca_cert, buffer=None, protocol="http", debug=False):
+        self.__debug = debug
+        self.__protocol = protocol
+        self.__tls_cert = tls_cert
+        self.__tls_key = tls_key
+        self.__tls_key_pw = tls_key_pw
+        self.__ca_cert = ca_cert
+        self.c = self.__build_default_curl()
 
         self.user_header = "USER_ID: " + user_id_header
-
-        if not debug:
-            # disable printing not necessary logs in the terminal
-            self.c.setopt(pycurl.WRITEFUNCTION, lambda x: None)
-        else:
-            self.c.setopt(pycurl.VERBOSE, 1)
+        self.url = url
 
         if not buffer:
             self.buffer = BytesIO()
 
-        self.url = url
-        self._check_schema(scheme)
+        if header is None:
+            self.basicauth_header = ""
+        else:
+            self.basicauth_header = "Authorization: Basic " + header
 
-    def get(self, path):
+    def get(self, path, buffer=None, with_buffer=False):
         try:
             self.c.setopt(pycurl.URL, self.url + path)
             self.c.setopt(pycurl.HTTPHEADER, [self.user_header,
                                               CurlConnector.CONTENT_TYPE_HEADER,
-                                              CurlConnector.ACCEPT_HEADER])
+                                              CurlConnector.ACCEPT_HEADER,
+                                              self.basicauth_header])
+
+            if with_buffer:
+                write = self.buffer.write if not buffer else buffer.write
+                self.c.setopt(pycurl.WRITEFUNCTION, write)
 
             self.c.perform()
             return self.c.getinfo(pycurl.RESPONSE_CODE)
@@ -87,9 +116,11 @@ class CurlConnector:
         try:
             self.c.setopt(pycurl.URL, self.url + path)
             self.c.setopt(pycurl.POST, 1)
+
             self.c.setopt(pycurl.HTTPHEADER, [self.user_header,
                                               CurlConnector.CONTENT_TYPE_HEADER,
-                                              CurlConnector.ACCEPT_HEADER])
+                                              CurlConnector.ACCEPT_HEADER,
+                                              self.basicauth_header])
 
             self.c.setopt(pycurl.POSTFIELDS, data)
 
@@ -104,7 +135,7 @@ class CurlConnector:
         try:
             self.c.setopt(pycurl.URL, self.url + path)
             self.c.setopt(pycurl.POST, 1)
-            self.c.setopt(pycurl.HTTPHEADER, [self.user_header])
+            self.c.setopt(pycurl.HTTPHEADER, [self.user_header, self.basicauth_header])
 
             self.c.setopt(pycurl.HTTPPOST, post_body)
 
@@ -113,15 +144,49 @@ class CurlConnector:
 
             self.c.perform()
             self.c.setopt(pycurl.POST, 0)
-
             return self.c.getinfo(pycurl.RESPONSE_CODE)
-        except pycurl.error:
+        except pycurl.error as ex:
+            print(ex)
             return 111
 
-    def _check_schema(self, scheme):
-        if scheme == 'https':
-            self.c.setopt(pycurl.SSL_VERIFYPEER, 0)
-            self.c.setopt(pycurl.SSL_VERIFYHOST, 0)
+    def put_file(self, path, post_body, response_write_buffer=None):
+        curl = self.__build_default_curl()
+        curl.setopt(pycurl.URL, self.url + path)
+        curl.setopt(pycurl.HTTPHEADER, [self.user_header, self.basicauth_header])
+        curl.setopt(pycurl.CUSTOMREQUEST, "PUT")
+
+        curl.setopt(pycurl.HTTPPOST, post_body)
+
+        write = self.buffer.write if not response_write_buffer else response_write_buffer.write
+        curl.setopt(pycurl.WRITEFUNCTION, write)
+
+        curl.perform()
+        response_code = curl.getinfo(pycurl.RESPONSE_CODE)
+        curl.close()
+        return response_code
+
+    def __build_default_curl(self):
+        curl = pycurl.Curl()
+        if not self.__debug:
+            # disable printing not necessary logs in the terminal
+            curl.setopt(pycurl.WRITEFUNCTION, lambda x: None)
+        else:
+            curl.setopt(pycurl.VERBOSE, 1)
+
+        if self.__protocol == 'https':
+            curl.setopt(pycurl.SSL_VERIFYPEER, 0)
+            curl.setopt(pycurl.SSL_VERIFYHOST, 0)
+            if self.__tls_cert is not None and self.__tls_key is not None:
+                curl.setopt(curl.SSLCERT, self.__tls_cert)
+                curl.setopt(curl.SSLKEY, self.__tls_key)
+                if self.__tls_key_pw is not None:
+                  curl.setopt(curl.KEYPASSWD, self.__tls_key_pw)
+                if self.__ca_cert is not None:
+                    curl.setopt(pycurl.SSL_VERIFYPEER, 1)
+                    curl.setopt(pycurl.SSL_VERIFYHOST, 2)
+                    curl.setopt(curl.CAINFO, self.__ca_cert)
+        curl.setopt(pycurl.HEADER, True)
+        return curl
 
     def __del__(self):
         self.c.close()