643ba90b071a54048345a2fb7087e64a06768a4a
[modeling/etsicatalog.git] / catalog / pub / Dmaap-lib / dmaap / publisher.py
1 # Copyright (c) 2019, CMCC Technologies Co., Ltd.
2 # Licensed under the Apache License, Version 2.0 (the "License")
3 # you may not use this file except in compliance with the License.
4 # You may obtain a copy of the License at
5 # http://www.apache.org/licenses/LICENSE-2.0
6 # Unless required by applicable law or agreed to in writing, software
7 # distributed under the License is distributed on an "AS IS" BASIS,
8 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
9 # See the License for the specific language governing permissions and
10 # limitations under the License.
11
12
13 import base64
14 import datetime
15 import hmac
16 import json
17 import logging
18 import time
19 from hashlib import sha1
20
21 import requests
22 from apscheduler.scheduler import Scheduler
23
24 from ..pub.exceptions import DmaapClientException
25
26 logger = logging.getLogger(__name__)
27
28
29 class BatchPublisherClient:
30     def __init__(self, host, topic, partition="", contenttype="text/plain", max_batch_size=100, max_batch_age_ms=1000):
31         self.host = host
32         self.topic = topic
33         self.partition = partition
34         self.contenttype = contenttype
35         self.max_batch_size = max_batch_size
36         self.max_batch_age_ms = max_batch_age_ms
37         self.pending = []
38         self.closed = False
39         self.dont_send_until_ms = 0
40         self.scheduler = Scheduler(standalone=False)
41         self.api_key = '',
42         self.api_secret = ''
43
44         @self.scheduler.interval_schedule(second=1)
45         def crawl_job():
46             self.send_message(False)
47         self.scheduler.start()
48
49     def set_api_credentials(self, api_key, api_secret):
50         self.api_key = api_key
51         self.api_secret = api_secret
52
53     def send(self, partition, msg):
54         try:
55             if self.closed:
56                 raise DmaapClientException("The publisher was closed.")
57             message = Message(partition, msg)
58             self.pending.append(message)
59             return len(self.pending)
60         except Exception as e:
61             raise DmaapClientException("append message failed: " + e.message)
62
63     def send_message(self, force):
64         if force or self.should_send_now():
65             if not self.send_batch():
66                 logger.error("Send failed, %s message to send.", len(self.pending))
67
68     def should_send_now(self):
69         should_send = False
70         if len(self.pending) > 0:
71             now_ms = int(time.time() * 1000)
72             should_send = len(self.pending) >= self.max_batch_size
73             if not should_send:
74                 send_at_ms = self.pending[0].timestamp_ms
75                 should_send = send_at_ms <= now_ms
76
77             should_send = should_send and now_ms >= self.dont_send_until_ms
78
79         return should_send
80
81     def send_batch(self):
82         if len(self.pending) < 1:
83             return True
84         now_ms = int(time.time() * 1000)
85         url = self.create_url()
86         logger.info("sending %s msgs to %s . Oldest: %s ms", len(self.pending), url,
87                     str(now_ms - self.pending[0].timestamp_ms))
88         try:
89             str_msg = ''
90             if self.contenttype == "application/json":
91                 str_msg = self.parse_json()
92             elif self.contenttype == "text/plain":
93                 for m in self.pending:
94                     str_msg += m.msg
95                     str_msg += '\n'
96             elif self.contenttype == "application/cambria":
97                 for m in self.pending:
98                     str_msg += str(len(m.partition))
99                     str_msg += '.'
100                     str_msg += str(len(m.msg))
101                     str_msg += '.'
102                     str_msg += m.partition
103                     str_msg += m.msg
104                     str_msg += '\n'
105             else:
106                 for m in self.pending:
107                     str_msg += m.msg
108             msg = bytearray(str_msg)
109
110             start_ms = int(time.time() * 1000)
111             if self.api_key:
112                 headers = self.create_headers()
113             else:
114                 headers = {'content-type': self.contenttype}
115             ret = requests.post(url=url, data=msg, headers=headers)
116             if ret.status_code < 200 or ret.status_code > 299:
117                 return False
118             logger.info("MR reply ok (%s ms): %s", start_ms - int(time.time() * 1000), ret.json())
119             self.pending = []
120             return True
121
122         except Exception as e:
123             logger.error(e.message)
124             return False
125
126     def create_headers(self):
127         data = datetime.datetime.now().strftime('%Y-%m-%dT%H:%M:%S') + '-04:00'
128         hmac_code = hmac.new(self.api_secret.encode(), data.encode(), sha1).digest()
129         signature = base64.b64encode(hmac_code).decode()
130         auth = self.api_key + ':' + signature
131         headers = {
132             'X-CambriaDate': data,
133             'X-CambriaAuth': auth,
134             'content-type': self.contenttype
135         }
136         return headers
137
138     def create_url(self):
139         url = "http://%s/events/%s" % (self.host, self.topic)
140         if self.partition:
141             url = url + "?partitionKey=" + self.partition
142         return url
143
144     def parse_json(self):
145         data = []
146         for message in self.pending:
147             msg = json.loads(message.msg)
148             for m in msg:
149                 data.append(m)
150         return json.dumps(data)
151
152     def close(self, timeout):
153         try:
154             self.closed = True
155             self.scheduler.shutdown()
156             now_ms = int(time.time() * 1000)
157             wait_in_ms = now_ms + timeout * 1000
158
159             while int(time.time() * 1000) < wait_in_ms and len(self.pending) > 0:
160                 self.send_message(True)
161                 time.sleep(0.25)
162             return self.pending
163         except Exception as e:
164             raise DmaapClientException("send message failed: " + e.message)
165
166
167 class Message:
168     def __init__(self, partition, msg):
169         if not partition:
170             self.partition = ""
171         else:
172             self.partition = partition
173         self.msg = msg
174         self.timestamp_ms = int(time.time() * 1000)