update Dmaap lib
[modeling/etsicatalog.git] / catalog / pub / Dmaap_lib / dmaap / publisher.py
1 # Copyright (c) 2019, CMCC Technologies. Co., Ltd.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #         http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import base64
16 import datetime
17 import hmac
18 import json
19 import logging
20 import time
21 from hashlib import sha1
22
23 import requests
24 from apscheduler.scheduler import Scheduler
25
26 from catalog.pub.Dmaap_lib.pub.exceptions import DmaapClientException
27
28 logger = logging.getLogger(__name__)
29
30
31 class BatchPublisherClient:
32     def __init__(self, base_url, topic, partition="", contenttype="text/plain", max_batch_size=100,
33                  max_batch_age_ms=1000):
34         self.base_url = base_url
35         self.topic = topic
36         self.partition = partition
37         self.contenttype = contenttype
38         self.max_batch_size = max_batch_size
39         self.max_batch_age_ms = max_batch_age_ms
40         self.pending = []
41         self.closed = False
42         self.dont_send_until_ms = 0
43         self.scheduler = Scheduler(standalone=False)
44
45         @self.scheduler.interval_schedule(second=1)
46         def crawl_job():
47             self.send_message(False)
48
49         self.scheduler.start()
50
51     def set_api_credentials(self, api_key, api_secret):
52         self.api_key = api_key
53         self.api_secret = api_secret
54
55     def send(self, partition, msg):
56         try:
57             if self.closed:
58                 raise DmaapClientException("The publisher was closed.")
59             message = Message(partition, msg)
60             self.pending.append(message)
61             return len(self.pending)
62         except Exception as e:
63             raise DmaapClientException("append message failed: " + e.message)
64
65     def send_message(self, force):
66         if force or self.should_send_now():
67             if not self.send_batch():
68                 logger.error("Send failed, %s message to send.", len(self.pending))
69
70     def should_send_now(self):
71         should_send = False
72         if len(self.pending) > 0:
73             now_ms = int(time.time() * 1000)
74             should_send = len(self.pending) >= self.max_batch_size
75             if not should_send:
76                 send_at_ms = self.pending[0].timestamp_ms
77                 should_send = send_at_ms <= now_ms
78
79             should_send = should_send and now_ms >= self.dont_send_until_ms
80
81         return should_send
82
83     def send_batch(self):
84         if len(self.pending) < 1:
85             return True
86         now_ms = int(time.time() * 1000)
87         url = self.create_url()
88         logger.info("sending %s msgs to %s . Oldest: %s ms", len(self.pending), url,
89                     str(now_ms - self.pending[0].timestamp_ms))
90         try:
91             str_msg = ''
92             if self.contenttype == "application/json":
93                 str_msg = self.parse_json()
94             elif self.contenttype == "text/plain":
95                 for m in self.pending:
96                     str_msg += m.msg
97                     str_msg += '\n'
98             elif self.contenttype == "application/cambria":
99                 for m in self.pending:
100                     str_msg += str(len(m.partition))
101                     str_msg += '.'
102                     str_msg += str(len(m.msg))
103                     str_msg += '.'
104                     str_msg += m.partition
105                     str_msg += m.msg
106                     str_msg += '\n'
107             else:
108                 for m in self.pending:
109                     str_msg += m.msg
110             msg = bytearray(str_msg)
111
112             start_ms = int(time.time() * 1000)
113             if self.api_key:
114                 headers = self.create_headers()
115             else:
116                 headers = {'content-type': self.contenttype}
117             ret = requests.post(url=url, data=msg, headers=headers)
118             if ret.status_code < 200 or ret.status_code > 299:
119                 return False
120             logger.info("MR reply ok (%s ms): %s", start_ms - int(time.time() * 1000), ret.json())
121             self.pending = []
122             return True
123
124         except Exception as e:
125             logger.error(e.message)
126             return False
127
128     def create_headers(self):
129         data = datetime.datetime.now().strftime('%Y-%m-%dT%H:%M:%S') + '-04:00'
130         hmac_code = hmac.new(self.api_secret.encode(), data.encode(), sha1).digest()
131         signature = base64.b64encode(hmac_code).decode()
132         auth = self.api_key + ':' + signature
133         headers = {
134             'X-CambriaDate': data,
135             'X-CambriaAuth': auth,
136             'content-type': self.contenttype
137         }
138         return headers
139
140     def create_url(self):
141         url = self.base_url + "/events/%s" % self.topic
142         if self.partition:
143             url = url + "?partitionKey=" + self.partition
144         return url
145
146     def parse_json(self):
147         data = []
148         for message in self.pending:
149             msg = json.loads(message.msg)
150             for m in msg:
151                 data.append(m)
152         return json.dumps(data)
153
154     def close(self, timeout):
155         try:
156             self.closed = True
157             self.scheduler.shutdown()
158             now_ms = int(time.time() * 1000)
159             wait_in_ms = now_ms + timeout * 1000
160
161             while int(time.time() * 1000) < wait_in_ms and len(self.pending) > 0:
162                 self.send_message(True)
163                 time.sleep(0.25)
164             return self.pending
165         except Exception as e:
166             raise DmaapClientException("send message failed: " + e.message)
167
168
169 class Message:
170     def __init__(self, partition, msg):
171         if not partition:
172             self.partition = ""
173         else:
174             self.partition = partition
175         self.msg = msg
176         self.timestamp_ms = int(time.time() * 1000)