update Dmaap lib
[modeling/etsicatalog.git] / catalog / pub / Dmaap_lib / dmaap / consumer.py
1 # Copyright (c) 2019, CMCC Technologies. Co., Ltd.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #         http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import base64
16 import datetime
17 import hmac
18 import json
19 import logging
20 from hashlib import sha1
21
22 import requests
23
24 from catalog.pub.Dmaap_lib.pub.exceptions import DmaapClientException
25
26 logger = logging.getLogger(__name__)
27
28
29 class ConsumerClient:
30     def __init__(self, base_url, topic, consumer_group, consumer_id, timeout_ms=-1, limit=-1, filter=''):
31         self.base_url = base_url
32         self.topic = topic
33         self.group = consumer_group
34         self.comsumer_id = consumer_id
35         self.timeout_ms = timeout_ms
36         self.limit = limit
37         self.filter = filter
38
39     def set_api_credentials(self, api_key, api_secret):
40         self.api_key = api_key
41         self.api_secret = api_secret
42
43     def create_url(self):
44         url = self.base_url + "/events/%s/%s/%s" % (self.topic, self.group, self.comsumer_id)
45         add_url = ""
46         if self.timeout_ms > -1:
47             add_url += "timeout=%s" % self.timeout_ms
48         if self.limit > -1:
49             if add_url:
50                 add_url += "&"
51             add_url += "limit=%s" % self.limit
52         if self.filter:
53             if add_url:
54                 add_url += "&"
55             add_url += "filter=%s" % self.filter.encode("utf-8")
56         if add_url:
57             url = url + "?" + add_url
58
59         return url
60
61     def create_headers(self):
62         data = datetime.datetime.now().strftime('%Y-%m-%dT%H:%M:%S') + '-04:00'
63         hmac_code = hmac.new(self.api_secret.encode(), data.encode(), sha1).digest()
64         signature = base64.b64encode(hmac_code).decode()
65         auth = self.api_key + ':' + signature
66         headers = {
67             'X-CambriaDate': data,
68             'X-CambriaAuth': auth
69         }
70         return headers
71
72     def fetch(self):
73         try:
74             msgs = []
75             url = self.create_url()
76             if self.api_key:
77                 headers = self.create_headers()
78                 ret = requests.get(url=url, headers=headers)
79             else:
80                 ret = requests.get(url)
81             logger.info("Status code is %s, detail is %s.", ret.status_code, ret.json())
82             if ret.status_code != 200:
83                 raise DmaapClientException(
84                     'Call dmaap failed. Status code is %s, detail is %s.' % (ret.status_code, ret.json()))
85             data = ret.json()
86             for msg in data:
87                 msg = json.loads(msg)
88                 msgs.append(msg)
89             return msgs
90         except Exception as e:
91             raise DmaapClientException(e.message)