# Copyright (c) 2019, CMCC Technologies. Co., Ltd. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import base64 import datetime import hmac import json import logging import time from hashlib import sha1 import requests from apscheduler.schedulers.background import BackgroundScheduler from catalog.pub.Dmaap_lib.pub.exceptions import DmaapClientException logger = logging.getLogger(__name__) class BatchPublisherClient: def __init__(self, base_url, topic, partition="", contenttype="text/plain", max_batch_size=100, max_batch_age_ms=1000): self.base_url = base_url self.topic = topic self.partition = partition self.contenttype = contenttype self.max_batch_size = max_batch_size self.max_batch_age_ms = max_batch_age_ms self.pending = [] self.closed = False self.dont_send_until_ms = 0 self.scheduler = BackgroundScheduler() # @self.scheduler.interval_schedule(second=1) @self.scheduler.scheduled_job(second=1) def crawl_job(): self.send_message(False) self.scheduler.start() def set_api_credentials(self, api_key, api_secret): self.api_key = api_key self.api_secret = api_secret def send(self, partition, msg): try: if self.closed: raise DmaapClientException("The publisher was closed.") message = Message(partition, msg) self.pending.append(message) return len(self.pending) except Exception as e: raise DmaapClientException("append message failed: " + e.message) def send_message(self, force): if force or self.should_send_now(): if not self.send_batch(): logger.error("Send failed, %s message to send.", len(self.pending)) def should_send_now(self): should_send = False if len(self.pending) > 0: now_ms = int(time.time() * 1000) should_send = len(self.pending) >= self.max_batch_size if not should_send: send_at_ms = self.pending[0].timestamp_ms should_send = send_at_ms <= now_ms should_send = should_send and now_ms >= self.dont_send_until_ms return should_send def send_batch(self): if len(self.pending) < 1: return True now_ms = int(time.time() * 1000) url = self.create_url() logger.info("sending %s msgs to %s . Oldest: %s ms", len(self.pending), url, str(now_ms - self.pending[0].timestamp_ms)) try: str_msg = '' if self.contenttype == "application/json": str_msg = self.parse_json() elif self.contenttype == "text/plain": for m in self.pending: str_msg += m.msg str_msg += '\n' elif self.contenttype == "application/cambria": for m in self.pending: str_msg += str(len(m.partition)) str_msg += '.' str_msg += str(len(m.msg)) str_msg += '.' str_msg += m.partition str_msg += m.msg str_msg += '\n' else: for m in self.pending: str_msg += m.msg msg = bytearray(str_msg) start_ms = int(time.time() * 1000) if self.api_key: headers = self.create_headers() else: headers = {'content-type': self.contenttype} ret = requests.post(url=url, data=msg, headers=headers) if ret.status_code < 200 or ret.status_code > 299: return False logger.info("MR reply ok (%s ms): %s", start_ms - int(time.time() * 1000), ret.json()) self.pending = [] return True except Exception as e: logger.error(e.message) return False def create_headers(self): data = datetime.datetime.now().strftime('%Y-%m-%dT%H:%M:%S') + '-04:00' hmac_code = hmac.new(self.api_secret.encode(), data.encode(), sha1).digest() signature = base64.b64encode(hmac_code).decode() auth = self.api_key + ':' + signature headers = { 'X-CambriaDate': data, 'X-CambriaAuth': auth, 'content-type': self.contenttype } return headers def create_url(self): url = self.base_url + "/events/%s" % self.topic if self.partition: url = url + "?partitionKey=" + self.partition return url def parse_json(self): data = [] for message in self.pending: msg = json.loads(message.msg) for m in msg: data.append(m) return json.dumps(data) def close(self, timeout): try: self.closed = True self.scheduler.shutdown() now_ms = int(time.time() * 1000) wait_in_ms = now_ms + timeout * 1000 while int(time.time() * 1000) < wait_in_ms and len(self.pending) > 0: self.send_message(True) time.sleep(0.25) return self.pending except Exception as e: raise DmaapClientException("send message failed: " + e.message) class Message: def __init__(self, partition, msg): if not partition: self.partition = "" else: self.partition = partition self.msg = msg self.timestamp_ms = int(time.time() * 1000)