Merge "optimize catalog api"
[vfc/nfvo/catalog.git] / catalog / pub / Dmaap-lib / nfvo-dmaap-python-lib / nfvo-dmaap-nfvo-v4.0-devel-ns / client / dmaap / publisher.py
1 # Copyright (c) 2019, CMCC Technologies Co., Ltd.
2 # Licensed under the Apache License, Version 2.0 (the "License")
3 # you may not use this file except in compliance with the License.
4 # You may obtain a copy of the License at
5 # http://www.apache.org/licenses/LICENSE-2.0
6 # Unless required by applicable law or agreed to in writing, software
7 # distributed under the License is distributed on an "AS IS" BASIS,
8 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
9 # See the License for the specific language governing permissions and
10 # limitations under the License.
11
12
13 import base64
14 import datetime
15 import hmac
16 import json
17 import logging
18 import time
19 from hashlib import sha1
20
21 import requests
22 from apscheduler.scheduler import Scheduler
23
24 from client.pub.exceptions import DmaapClientException
25
26 logger = logging.getLogger(__name__)
27
28
29 class BatchPublisherClient:
30     def __init__(self, host, topic, partition="", contenttype="text/plain", max_batch_size=100, max_batch_age_ms=1000):
31         self.host = host
32         self.topic = topic
33         self.partition = partition
34         self.contenttype = contenttype
35         self.max_batch_size = max_batch_size
36         self.max_batch_age_ms = max_batch_age_ms
37         self.pending = []
38         self.closed = False
39         self.dont_send_until_ms = 0
40         self.scheduler = Scheduler(standalone=False)
41
42         @self.scheduler.interval_schedule(second=1)
43         def crawl_job():
44             self.send_message(False)
45         self.scheduler.start()
46
47     def set_api_credentials(self, api_key, api_secret):
48         self.api_key = api_key
49         self.api_secret = api_secret
50
51     def send(self, partition, msg):
52         try:
53             if self.closed:
54                 raise DmaapClientException("The publisher was closed.")
55             message = Message(partition, msg)
56             self.pending.append(message)
57             return len(self.pending)
58         except Exception as e:
59             raise DmaapClientException("append message failed: " + e.message)
60
61     def send_message(self, force):
62         if force or self.should_send_now():
63             if not self.send_batch():
64                 logger.error("Send failed, %s message to send.", len(self.pending))
65
66     def should_send_now(self):
67         should_send = False
68         if len(self.pending) > 0:
69             now_ms = int(time.time() * 1000)
70             should_send = len(self.pending) >= self.max_batch_size
71             if not should_send:
72                 send_at_ms = self.pending[0].timestamp_ms
73                 should_send = send_at_ms <= now_ms
74
75             should_send = should_send and now_ms >= self.dont_send_until_ms
76
77         return should_send
78
79     def send_batch(self):
80         if len(self.pending) < 1:
81             return True
82         now_ms = int(time.time() * 1000)
83         url = self.create_url()
84         logger.info("sending %s msgs to %s . Oldest: %s ms", len(self.pending), url,
85                     str(now_ms - self.pending[0].timestamp_ms))
86         try:
87             str_msg = ''
88             if self.contenttype == "application/json":
89                 str_msg = self.parse_json()
90             elif self.contenttype == "text/plain":
91                 for m in self.pending:
92                     str_msg += m.msg
93                     str_msg += '\n'
94             elif self.contenttype == "application/cambria":
95                 for m in self.pending:
96                     str_msg += str(len(m.partition))
97                     str_msg += '.'
98                     str_msg += str(len(m.msg))
99                     str_msg += '.'
100                     str_msg += m.partition
101                     str_msg += m.msg
102                     str_msg += '\n'
103             else:
104                 for m in self.pending:
105                     str_msg += m.msg
106             msg = bytearray(str_msg)
107
108             start_ms = int(time.time() * 1000)
109             if self.api_key:
110                 headers = self.create_headers()
111             else:
112                 headers = {'content-type': self.contenttype}
113             ret = requests.post(url=url, data=msg, headers=headers)
114             if ret.status_code < 200 or ret.status_code > 299:
115                 return False
116             logger.info("MR reply ok (%s ms): %s", start_ms - int(time.time() * 1000), ret.json())
117             self.pending = []
118             return True
119
120         except Exception as e:
121             logger.error(e.message)
122             return False
123
124     def create_headers(self):
125         data = datetime.datetime.now().strftime('%Y-%m-%dT%H:%M:%S') + '-04:00'
126         hmac_code = hmac.new(self.api_secret.encode(), data.encode(), sha1).digest()
127         signature = base64.b64encode(hmac_code).decode()
128         auth = self.api_key + ':' + signature
129         headers = {
130             'X-CambriaDate': data,
131             'X-CambriaAuth': auth,
132             'content-type': self.contenttype
133         }
134         return headers
135
136     def create_url(self):
137         url = "http://%s/events/%s" % (self.host, self.topic)
138         if self.partition:
139             url = url + "?partitionKey=" + self.partition
140         return url
141
142     def parse_json(self):
143         data = []
144         for message in self.pending:
145             msg = json.loads(message.msg)
146             for m in msg:
147                 data.append(m)
148         return json.dumps(data)
149
150     def close(self, timeout):
151         try:
152             self.closed = True
153             self.scheduler.shutdown()
154             now_ms = int(time.time() * 1000)
155             wait_in_ms = now_ms + timeout * 1000
156
157             while int(time.time() * 1000) < wait_in_ms and len(self.pending) > 0:
158                 self.send_message(True)
159                 time.sleep(0.25)
160             return self.pending
161         except Exception as e:
162             raise DmaapClientException("send message failed: " + e.message)
163
164
165 class Message:
166     def __init__(self, partition, msg):
167         if not partition:
168             self.partition = ""
169         else:
170             self.partition = partition
171         self.msg = msg
172         self.timestamp_ms = int(time.time() * 1000)