e49533149a83e56dcf9cc936e8e0985d21b688f8
[optf/osdf.git] / adapters / dcae / message_router.py
1 # -------------------------------------------------------------------------
2 #   Copyright (c) 2015-2017 AT&T Intellectual Property
3 #
4 #   Licensed under the Apache License, Version 2.0 (the "License");
5 #   you may not use this file except in compliance with the License.
6 #   You may obtain a copy of the License at
7 #
8 #       http://www.apache.org/licenses/LICENSE-2.0
9 #
10 #   Unless required by applicable law or agreed to in writing, software
11 #   distributed under the License is distributed on an "AS IS" BASIS,
12 #   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 #   See the License for the specific language governing permissions and
14 #   limitations under the License.
15 #
16 # -------------------------------------------------------------------------
17 #
18
19 import requests
20 from osdf.utils.data_types import list_like
21 from osdf.operation.exceptions import MessageBusConfigurationException
22
23
24 class MessageRouterClient(object):
25     def __init__(self,
26                  dmaap_url=None,
27                  mr_host_base_urls=None,
28                  topic=None,
29                  consumer_group=None, consumer_id=None,
30                  timeout_ms=15000, fetch_limit=1000,
31                  userid=None, passwd=None):
32         """
33         :param dmaap_url: protocol, host and port; mostly for UEB
34                (e.g. https://dcae-msrt-ftl.homer.att.com:3905/)
35         :param mr_host_base_urls: for DMaaP, we get a topic URL (base_url + events/topic_name)
36                (e.g. https://dcae-msrt-ftl.homer.att.com:3905/events/com.att.dcae.dmaap.FTL.SNIRO-CM-SCHEDULER-RESPONSE)
37         :param consumer_group: DMaaP/UEB consumer group (unique for each subscriber; required for GET)
38         :param consumer_id: DMaaP/UEB consumer ID (unique for each thread/process for a subscriber; required for GET)
39         :param timeout_ms: (optional, default 15 seconds or 15,000 ms) server-side timeout for GET request
40         :param fetch_limit: (optional, default 1000 messages per request for GET), ignored for "POST"
41         :param userid: (optional, userid for HTTP basic authentication)
42         :param passwd: (optional, password for HTTP basic authentication)
43         """
44         mr_error = MessageBusConfigurationException
45         if dmaap_url is None:  # definitely not DMaaP, so use UEB mode
46             self.is_dmaap = False
47             if not (mr_host_base_urls and list_like(mr_host_base_urls)):
48                 raise mr_error("Not a DMaaP or UEB configuration")
49             if not topic:
50                 raise mr_error("Invalid topic: '{}'",format(topic))
51             self.topic_urls = ["{}/events/{}".format(base_url, topic) for base_url in mr_host_base_urls]
52         else:
53             self.is_dmaap = True
54             self.topic_urls = [dmaap_url]
55
56         self.timeout_ms = timeout_ms
57         self.fetch_limit = fetch_limit
58         self.auth = (userid, passwd) if userid and passwd else None
59         self.consumer_group = consumer_group
60         self.consumer_id = consumer_id
61
62     def get(self, outputjson=True):
63         """Fetch messages from message router (DMaaP or UEB)
64         :param outputjson: (optional, specifies if response is expected to be in json format), ignored for "POST"
65         :return: response as a json object (if outputjson is True) or as a string
66         """
67         url_fmt = "{topic_url}/{cgroup}/{cid}?timeout={timeout_ms}&limit={limit}"
68         urls = [url_fmt.format(topic_url=x, timeout_ms=self.timeout_ms, limit=self.fetch_limit,
69                                cgroup=self.consumer_group, cid=self.consumer_id) for x in self.topic_urls]
70         for url in urls[:-1]:
71             try:
72                 return self.http_request(method='GET', url=url, outputjson=outputjson)
73             except:
74                 pass
75         return self.http_request(method='GET', url=urls[-1], outputjson=outputjson)
76
77     def post(self, msg, inputjson=True):
78         for url in self.topic_urls[:-1]:
79             try:
80                 return self.http_request(method='POST', url=url, inputjson=inputjson, msg=msg)
81             except:
82                 pass
83         return self.http_request(method='POST', url=self.topic_urls[-1], inputjson=inputjson, msg=msg)
84
85     def http_request(self, url, method, inputjson=True, outputjson=True, msg=None, **kwargs):
86         """
87         Perform the actual URL request (GET or POST), and do error handling
88         :param url: full URL (including topic, limit, timeout, etc.)
89         :param method: GET or POST
90         :param inputjson: Specify whether input is in json format (valid only for POST)
91         :param outputjson: Is response expected in a json format
92         :param msg: content to be posted (valid only for POST)
93         :return: response as a json object (if outputjson or POST) or as a string; None if error
94         """
95         res = requests.request(url=url, method=method, auth=self.auth, **kwargs)
96         if res.status_code == requests.codes.ok:
97             return res.json() if outputjson or method == "POST" else res.content
98         else:
99             raise Exception("HTTP Response Error: code {}; headers:{}, content: {}".format(
100                 res.status_code, res.headers, res.content))