Remove ca-cert from docker image
[optf/osdf.git] / osdf / adapters / dcae / message_router.py
index e495331..0968812 100755 (executable)
 #
 
 import requests
-from osdf.utils.data_types import list_like
+
 from osdf.operation.exceptions import MessageBusConfigurationException
+from osdf.utils.data_types import list_like
+from osdf.utils.interfaces import RestClient
 
 
 class MessageRouterClient(object):
     def __init__(self,
-                 dmaap_url=None,
-                 mr_host_base_urls=None,
-                 topic=None,
-                 consumer_group=None, consumer_id=None,
+                 dmaap_url='',
+                 consumer_group_id=':',
                  timeout_ms=15000, fetch_limit=1000,
-                 userid=None, passwd=None):
-        """
-        :param dmaap_url: protocol, host and port; mostly for UEB
-               (e.g. https://dcae-msrt-ftl.homer.att.com:3905/)
-        :param mr_host_base_urls: for DMaaP, we get a topic URL (base_url + events/topic_name)
-               (e.g. https://dcae-msrt-ftl.homer.att.com:3905/events/com.att.dcae.dmaap.FTL.SNIRO-CM-SCHEDULER-RESPONSE)
-        :param consumer_group: DMaaP/UEB consumer group (unique for each subscriber; required for GET)
-        :param consumer_id: DMaaP/UEB consumer ID (unique for each thread/process for a subscriber; required for GET)
+                 userid_passwd=':'):
+        """Class initializer
+
+        :param dmaap_url: protocol, host and port; can also be a list of URLs
+               (e.g. https://dmaap-host.onapdemo.onap.org:3905/events/org.onap.dmaap.MULTICLOUD.URGENT),
+               can also be a list of such URLs
+        :param consumer_group_id: DMaaP consumer group and consumer id (':' separated)
+               consumer_group is unique for each subscriber; required for GET
+               consumer_id: DMaaP consumer ID (unique for each thread/process for a subscriber; required for GET)
         :param timeout_ms: (optional, default 15 seconds or 15,000 ms) server-side timeout for GET request
         :param fetch_limit: (optional, default 1000 messages per request for GET), ignored for "POST"
-        :param userid: (optional, userid for HTTP basic authentication)
-        :param passwd: (optional, password for HTTP basic authentication)
+        :param userid_passwd: (optional, userid:password for HTTP basic authentication)
         """
         mr_error = MessageBusConfigurationException
-        if dmaap_url is None:  # definitely not DMaaP, so use UEB mode
-            self.is_dmaap = False
-            if not (mr_host_base_urls and list_like(mr_host_base_urls)):
-                raise mr_error("Not a DMaaP or UEB configuration")
-            if not topic:
-                raise mr_error("Invalid topic: '{}'",format(topic))
-            self.topic_urls = ["{}/events/{}".format(base_url, topic) for base_url in mr_host_base_urls]
-        else:
-            self.is_dmaap = True
-            self.topic_urls = [dmaap_url]
-
+        if not dmaap_url:  # definitely not DMaaP, so use UEB mode
+            raise mr_error("Not a valid DMaaP configuration")
+        self.topic_urls = [dmaap_url] if not list_like(dmaap_url) else dmaap_url
         self.timeout_ms = timeout_ms
         self.fetch_limit = fetch_limit
-        self.auth = (userid, passwd) if userid and passwd else None
+        self.userid, self.passwd = userid_passwd.split(':')
+        consumer_group, consumer_id = consumer_group_id.split(':')
         self.consumer_group = consumer_group
         self.consumer_id = consumer_id
 
     def get(self, outputjson=True):
         """Fetch messages from message router (DMaaP or UEB)
+
         :param outputjson: (optional, specifies if response is expected to be in json format), ignored for "POST"
         :return: response as a json object (if outputjson is True) or as a string
         """
@@ -70,7 +64,7 @@ class MessageRouterClient(object):
         for url in urls[:-1]:
             try:
                 return self.http_request(method='GET', url=url, outputjson=outputjson)
-            except:
+            except Exception:
                 pass
         return self.http_request(method='GET', url=urls[-1], outputjson=outputjson)
 
@@ -78,13 +72,13 @@ class MessageRouterClient(object):
         for url in self.topic_urls[:-1]:
             try:
                 return self.http_request(method='POST', url=url, inputjson=inputjson, msg=msg)
-            except:
+            except Exception:
                 pass
         return self.http_request(method='POST', url=self.topic_urls[-1], inputjson=inputjson, msg=msg)
 
     def http_request(self, url, method, inputjson=True, outputjson=True, msg=None, **kwargs):
-        """
-        Perform the actual URL request (GET or POST), and do error handling
+        """Perform the actual URL request (GET or POST), and do error handling
+
         :param url: full URL (including topic, limit, timeout, etc.)
         :param method: GET or POST
         :param inputjson: Specify whether input is in json format (valid only for POST)
@@ -92,9 +86,15 @@ class MessageRouterClient(object):
         :param msg: content to be posted (valid only for POST)
         :return: response as a json object (if outputjson or POST) or as a string; None if error
         """
-        res = requests.request(url=url, method=method, auth=self.auth, **kwargs)
-        if res.status_code == requests.codes.ok:
-            return res.json() if outputjson or method == "POST" else res.content
-        else:
-            raise Exception("HTTP Response Error: code {}; headers:{}, content: {}".format(
-                res.status_code, res.headers, res.content))
+
+        rc = RestClient(userid=self.userid, passwd=self.passwd, url=url, method=method)
+        try:
+            res = rc.request(raw_response=True, data=msg, **kwargs)
+            if res.status_code == requests.codes.ok:
+                return res.json() if outputjson or method == "POST" else res.content
+            else:
+                raise Exception("HTTP Response Error: code {}; headers:{}, content: {}".format(
+                    res.status_code, res.headers, res.content))
+
+        except requests.RequestException as ex:
+            raise Exception("Request Exception occurred {}".format(str(ex)))