rename package name
[modeling/etsicatalog.git] / catalog / pub / Dmaap_lib / dmaap / consumer.py
1 # Copyright (c) 2019, CMCC Technologies Co., Ltd.
2 # Licensed under the Apache License, Version 2.0 (the "License")
3 # you may not use this file except in compliance with the License.
4 # You may obtain a copy of the License at
5 # http://www.apache.org/licenses/LICENSE-2.0
6 # Unless required by applicable law or agreed to in writing, software
7 # distributed under the License is distributed on an "AS IS" BASIS,
8 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
9 # See the License for the specific language governing permissions and
10 # limitations under the License.
11
12
13 import base64
14 import hmac
15 import json
16 import logging
17 import datetime
18 import requests
19 from hashlib import sha1
20
21 from ..pub.exceptions import DmaapClientException
22
23 logger = logging.getLogger(__name__)
24
25
26 class ConsumerClient:
27     def __init__(self, host, topic, consumer_group, consumer_id, timeout_ms=-1, limit=-1, filter='',
28                  api_key='', api_secret=''):
29         self.host = host
30         self.topic = topic
31         self.group = consumer_group
32         self.comsumer_id = consumer_id
33         self.timeout_ms = timeout_ms
34         self.limit = limit
35         self.filter = filter
36         self.api_key = api_key
37         self.api_secret = api_secret
38
39     def set_api_credentials(self, api_key, api_secret):
40
41         self.api_key = api_key
42         self.api_secret = api_secret
43
44     def create_url(self):
45         url = "http://%s/events/%s/%s/%s" % (self.host, self.topic, self.group, self.comsumer_id)
46         add_url = ""
47         if self.timeout_ms > -1:
48             add_url += "timeout=%s" % self.timeout_ms
49         if self.limit > -1:
50             if add_url:
51                 add_url += "&"
52             add_url += "limit=%s" % self.limit
53         if self.filter:
54             if add_url:
55                 add_url += "&"
56             add_url += "filter=%s" % self.filter.encode("utf-8")
57         if add_url:
58             url = url + "?" + add_url
59
60         return url
61
62     def create_headers(self):
63         data = datetime.datetime.now().strftime('%Y-%m-%dT%H:%M:%S') + '-04:00'
64         hmac_code = hmac.new(self.api_secret.encode(), data.encode(), sha1).digest()
65         signature = base64.b64encode(hmac_code).decode()
66         auth = self.api_key + ':' + signature
67         headers = {
68             'X-CambriaDate': data,
69             'X-CambriaAuth': auth
70         }
71         return headers
72
73     def fetch(self):
74         try:
75             msgs = []
76             url = self.create_url()
77             if self.api_key:
78                 headers = self.create_headers()
79                 ret = requests.get(url=url, headers=headers)
80             else:
81                 ret = requests.get(url)
82             logger.info("Status code is %s, detail is %s.", ret.status_code, ret.json())
83             if ret.status_code != 200:
84                 raise DmaapClientException('Call dmaap failed. Status code is %s, detail is %s.' % (ret.status_code, ret.json()))
85             data = ret.json()
86             for msg in data:
87                 msg = json.loads(msg)
88                 msgs.append(msg)
89             return msgs
90         except Exception as e:
91             raise DmaapClientException(e.message)