Update etsicatalog code to get notification message from sdc
[modeling/etsicatalog.git] / catalog / pub / Dmaap_lib / dmaap / consumer.py
1 # Copyright (c) 2019, CMCC Technologies. Co., Ltd.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #         http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import base64
16 import datetime
17 import hmac
18 import json
19 import logging
20 from hashlib import sha1
21
22 import requests
23
24 from catalog.pub.Dmaap_lib.pub.exceptions import DmaapClientException
25
26 requests.packages.urllib3.disable_warnings()
27 logger = logging.getLogger(__name__)
28
29
30 class ConsumerClient:
31     def __init__(self, base_url, topic, consumer_group, consumer_id, timeout_ms=-1, limit=-1, filter=''):
32         self.base_url = base_url
33         self.topic = topic
34         self.group = consumer_group
35         self.comsumer_id = consumer_id
36         self.timeout_ms = timeout_ms
37         self.limit = limit
38         self.filter = filter
39
40     def set_api_credentials(self, api_key, api_secret):
41         self.api_key = api_key
42         self.api_secret = api_secret
43
44     def create_url(self):
45         url = self.base_url + "/events/%s/%s/%s" % (self.topic, self.group, self.comsumer_id)
46         add_url = ""
47         if self.timeout_ms > -1:
48             add_url += "timeout=%s" % self.timeout_ms
49         if self.limit > -1:
50             if add_url:
51                 add_url += "&"
52             add_url += "limit=%s" % self.limit
53         if self.filter:
54             if add_url:
55                 add_url += "&"
56             add_url += "filter=%s" % self.filter.encode("utf-8")
57         if add_url:
58             url = url + "?" + add_url
59
60         return url
61
62     def create_headers(self):
63         data = datetime.datetime.now().strftime('%Y-%m-%dT%H:%M:%S') + '-04:00'
64         hmac_code = hmac.new(self.api_secret.encode(), data.encode(), sha1).digest()
65         signature = base64.b64encode(hmac_code).decode()
66         auth = self.api_key + ':' + signature
67         headers = {
68             'X-CambriaDate': data,
69             'X-CambriaAuth': auth
70         }
71         return headers
72
73     def fetch(self):
74         try:
75             msgs = []
76             url = self.create_url()
77             if self.api_key:
78                 headers = self.create_headers()
79                 ret = requests.get(url=url, headers=headers, verify=False)
80             else:
81                 ret = requests.get(url)
82             logger.info("Status code is %s, detail is %s.", ret.status_code, ret.json())
83             if ret.status_code != 200:
84                 raise DmaapClientException(
85                     'Call dmaap failed. Status code is %s, detail is %s.' % (ret.status_code, ret.json()))
86             data = ret.json()
87             for msg in data:
88                 msg = json.loads(msg)
89                 msgs.append(msg)
90             return msgs
91         except Exception as e:
92             raise DmaapClientException(e.message)