Remove ca-cert from docker image
[optf/osdf.git] / osdf / adapters / dcae / message_router.py
1 # -------------------------------------------------------------------------
2 #   Copyright (c) 2015-2017 AT&T Intellectual Property
3 #
4 #   Licensed under the Apache License, Version 2.0 (the "License");
5 #   you may not use this file except in compliance with the License.
6 #   You may obtain a copy of the License at
7 #
8 #       http://www.apache.org/licenses/LICENSE-2.0
9 #
10 #   Unless required by applicable law or agreed to in writing, software
11 #   distributed under the License is distributed on an "AS IS" BASIS,
12 #   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 #   See the License for the specific language governing permissions and
14 #   limitations under the License.
15 #
16 # -------------------------------------------------------------------------
17 #
18
19 import requests
20
21 from osdf.operation.exceptions import MessageBusConfigurationException
22 from osdf.utils.data_types import list_like
23 from osdf.utils.interfaces import RestClient
24
25
26 class MessageRouterClient(object):
27     def __init__(self,
28                  dmaap_url='',
29                  consumer_group_id=':',
30                  timeout_ms=15000, fetch_limit=1000,
31                  userid_passwd=':'):
32         """Class initializer
33
34         :param dmaap_url: protocol, host and port; can also be a list of URLs
35                (e.g. https://dmaap-host.onapdemo.onap.org:3905/events/org.onap.dmaap.MULTICLOUD.URGENT),
36                can also be a list of such URLs
37         :param consumer_group_id: DMaaP consumer group and consumer id (':' separated)
38                consumer_group is unique for each subscriber; required for GET
39                consumer_id: DMaaP consumer ID (unique for each thread/process for a subscriber; required for GET)
40         :param timeout_ms: (optional, default 15 seconds or 15,000 ms) server-side timeout for GET request
41         :param fetch_limit: (optional, default 1000 messages per request for GET), ignored for "POST"
42         :param userid_passwd: (optional, userid:password for HTTP basic authentication)
43         """
44         mr_error = MessageBusConfigurationException
45         if not dmaap_url:  # definitely not DMaaP, so use UEB mode
46             raise mr_error("Not a valid DMaaP configuration")
47         self.topic_urls = [dmaap_url] if not list_like(dmaap_url) else dmaap_url
48         self.timeout_ms = timeout_ms
49         self.fetch_limit = fetch_limit
50         self.userid, self.passwd = userid_passwd.split(':')
51         consumer_group, consumer_id = consumer_group_id.split(':')
52         self.consumer_group = consumer_group
53         self.consumer_id = consumer_id
54
55     def get(self, outputjson=True):
56         """Fetch messages from message router (DMaaP or UEB)
57
58         :param outputjson: (optional, specifies if response is expected to be in json format), ignored for "POST"
59         :return: response as a json object (if outputjson is True) or as a string
60         """
61         url_fmt = "{topic_url}/{cgroup}/{cid}?timeout={timeout_ms}&limit={limit}"
62         urls = [url_fmt.format(topic_url=x, timeout_ms=self.timeout_ms, limit=self.fetch_limit,
63                                cgroup=self.consumer_group, cid=self.consumer_id) for x in self.topic_urls]
64         for url in urls[:-1]:
65             try:
66                 return self.http_request(method='GET', url=url, outputjson=outputjson)
67             except Exception:
68                 pass
69         return self.http_request(method='GET', url=urls[-1], outputjson=outputjson)
70
71     def post(self, msg, inputjson=True):
72         for url in self.topic_urls[:-1]:
73             try:
74                 return self.http_request(method='POST', url=url, inputjson=inputjson, msg=msg)
75             except Exception:
76                 pass
77         return self.http_request(method='POST', url=self.topic_urls[-1], inputjson=inputjson, msg=msg)
78
79     def http_request(self, url, method, inputjson=True, outputjson=True, msg=None, **kwargs):
80         """Perform the actual URL request (GET or POST), and do error handling
81
82         :param url: full URL (including topic, limit, timeout, etc.)
83         :param method: GET or POST
84         :param inputjson: Specify whether input is in json format (valid only for POST)
85         :param outputjson: Is response expected in a json format
86         :param msg: content to be posted (valid only for POST)
87         :return: response as a json object (if outputjson or POST) or as a string; None if error
88         """
89
90         rc = RestClient(userid=self.userid, passwd=self.passwd, url=url, method=method)
91         try:
92             res = rc.request(raw_response=True, data=msg, **kwargs)
93             if res.status_code == requests.codes.ok:
94                 return res.json() if outputjson or method == "POST" else res.content
95             else:
96                 raise Exception("HTTP Response Error: code {}; headers:{}, content: {}".format(
97                     res.status_code, res.headers, res.content))
98
99         except requests.RequestException as ex:
100             raise Exception("Request Exception occurred {}".format(str(ex)))