From 23785037bc77749a748c875712db84836fdf07e1 Mon Sep 17 00:00:00 2001 From: dyh Date: Tue, 26 May 2020 09:56:56 +0800 Subject: [PATCH] add lib for Dmaap Issue-ID: MODELING-366 Change-Id: Id9a93649b49ae270a505f4aac790bc761862cf34 Signed-off-by: dyh --- catalog/pub/Dmaap-lib/__init__.py | 13 +++ catalog/pub/Dmaap-lib/dmaap/__init__.py | 10 ++ catalog/pub/Dmaap-lib/dmaap/consumer.py | 91 +++++++++++++++ catalog/pub/Dmaap-lib/dmaap/identity.py | 46 ++++++++ catalog/pub/Dmaap-lib/dmaap/publisher.py | 174 ++++++++++++++++++++++++++++ catalog/pub/Dmaap-lib/pub/__init__.py | 10 ++ catalog/pub/Dmaap-lib/pub/exceptions.py | 15 +++ catalog/pub/Dmaap-lib/test/test_consumer.py | 81 +++++++++++++ catalog/pub/Dmaap-lib/test/test_identity.py | 38 ++++++ 9 files changed, 478 insertions(+) create mode 100644 catalog/pub/Dmaap-lib/__init__.py create mode 100644 catalog/pub/Dmaap-lib/dmaap/__init__.py create mode 100644 catalog/pub/Dmaap-lib/dmaap/consumer.py create mode 100644 catalog/pub/Dmaap-lib/dmaap/identity.py create mode 100644 catalog/pub/Dmaap-lib/dmaap/publisher.py create mode 100644 catalog/pub/Dmaap-lib/pub/__init__.py create mode 100644 catalog/pub/Dmaap-lib/pub/exceptions.py create mode 100644 catalog/pub/Dmaap-lib/test/test_consumer.py create mode 100644 catalog/pub/Dmaap-lib/test/test_identity.py diff --git a/catalog/pub/Dmaap-lib/__init__.py b/catalog/pub/Dmaap-lib/__init__.py new file mode 100644 index 0000000..7ae04f0 --- /dev/null +++ b/catalog/pub/Dmaap-lib/__init__.py @@ -0,0 +1,13 @@ +# Copyright (c) 2019, CMCC Technologies Co., Ltd. + +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/catalog/pub/Dmaap-lib/dmaap/__init__.py b/catalog/pub/Dmaap-lib/dmaap/__init__.py new file mode 100644 index 0000000..0c1e8e1 --- /dev/null +++ b/catalog/pub/Dmaap-lib/dmaap/__init__.py @@ -0,0 +1,10 @@ +# Copyright (c) 2019, CMCC Technologies Co., Ltd. +# Licensed under the Apache License, Version 2.0 (the "License") +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/catalog/pub/Dmaap-lib/dmaap/consumer.py b/catalog/pub/Dmaap-lib/dmaap/consumer.py new file mode 100644 index 0000000..054791c --- /dev/null +++ b/catalog/pub/Dmaap-lib/dmaap/consumer.py @@ -0,0 +1,91 @@ +# Copyright (c) 2019, CMCC Technologies Co., Ltd. +# Licensed under the Apache License, Version 2.0 (the "License") +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import base64 +import hmac +import json +import logging +import datetime +import requests +from hashlib import sha1 + +from ..pub.exceptions import DmaapClientException + +logger = logging.getLogger(__name__) + + +class ConsumerClient: + def __init__(self, host, topic, consumer_group, consumer_id, timeout_ms=-1, limit=-1, filter='', + api_key='', api_secret=''): + self.host = host + self.topic = topic + self.group = consumer_group + self.comsumer_id = consumer_id + self.timeout_ms = timeout_ms + self.limit = limit + self.filter = filter + self.api_key = api_key + self.api_secret = api_secret + + def set_api_credentials(self, api_key, api_secret): + + self.api_key = api_key + self.api_secret = api_secret + + def create_url(self): + url = "http://%s/events/%s/%s/%s" % (self.host, self.topic, self.group, self.comsumer_id) + add_url = "" + if self.timeout_ms > -1: + add_url += "timeout=%s" % self.timeout_ms + if self.limit > -1: + if add_url: + add_url += "&" + add_url += "limit=%s" % self.limit + if self.filter: + if add_url: + add_url += "&" + add_url += "filter=%s" % self.filter.encode("utf-8") + if add_url: + url = url + "?" + add_url + + return url + + def create_headers(self): + data = datetime.datetime.now().strftime('%Y-%m-%dT%H:%M:%S') + '-04:00' + hmac_code = hmac.new(self.api_secret.encode(), data.encode(), sha1).digest() + signature = base64.b64encode(hmac_code).decode() + auth = self.api_key + ':' + signature + headers = { + 'X-CambriaDate': data, + 'X-CambriaAuth': auth + } + return headers + + def fetch(self): + try: + msgs = [] + url = self.create_url() + if self.api_key: + headers = self.create_headers() + ret = requests.get(url=url, headers=headers) + else: + ret = requests.get(url) + logger.info("Status code is %s, detail is %s.", ret.status_code, ret.json()) + if ret.status_code != 200: + raise DmaapClientException('Call dmaap failed. Status code is %s, detail is %s.' % (ret.status_code, ret.json())) + data = ret.json() + for msg in data: + msg = json.loads(msg) + msgs.append(msg) + return msgs + except Exception as e: + raise DmaapClientException(e.message) diff --git a/catalog/pub/Dmaap-lib/dmaap/identity.py b/catalog/pub/Dmaap-lib/dmaap/identity.py new file mode 100644 index 0000000..1dcaad8 --- /dev/null +++ b/catalog/pub/Dmaap-lib/dmaap/identity.py @@ -0,0 +1,46 @@ +# Copyright (c) 2019, CMCC Technologies Co., Ltd. +# Licensed under the Apache License, Version 2.0 (the "License") +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import json +import logging +import requests + +from ..pub.exceptions import DmaapClientException + +logger = logging.getLogger(__name__) + + +class IdentityClient: + def __init__(self, host): + self.host = host + + def create_apikey(self, email, description): + try: + headers = {'content-type': 'application/json;charset=UTF-8'} + data = { + 'email': email, + 'description': description + } + data = json.JSONEncoder().encode(data) + url = "http://%s/apiKeys/create" % (self.host) + ret = requests.post(url=url, data=data, headers=headers) + logger.info('create apiKey, response status_code: %s, body: %s', ret.status_code, ret.json()) + if ret.status_code != 200: + raise DmaapClientException(ret.json()) + ret = ret.json() + resp_data = { + 'apiKey': ret.get('key', ''), + 'apiSecret': ret.get('secret', '') + } + return resp_data + except Exception as e: + raise DmaapClientException('create apikey from dmaap failed: ' + e.message) diff --git a/catalog/pub/Dmaap-lib/dmaap/publisher.py b/catalog/pub/Dmaap-lib/dmaap/publisher.py new file mode 100644 index 0000000..643ba90 --- /dev/null +++ b/catalog/pub/Dmaap-lib/dmaap/publisher.py @@ -0,0 +1,174 @@ +# Copyright (c) 2019, CMCC Technologies Co., Ltd. +# Licensed under the Apache License, Version 2.0 (the "License") +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import base64 +import datetime +import hmac +import json +import logging +import time +from hashlib import sha1 + +import requests +from apscheduler.scheduler import Scheduler + +from ..pub.exceptions import DmaapClientException + +logger = logging.getLogger(__name__) + + +class BatchPublisherClient: + def __init__(self, host, topic, partition="", contenttype="text/plain", max_batch_size=100, max_batch_age_ms=1000): + self.host = host + self.topic = topic + self.partition = partition + self.contenttype = contenttype + self.max_batch_size = max_batch_size + self.max_batch_age_ms = max_batch_age_ms + self.pending = [] + self.closed = False + self.dont_send_until_ms = 0 + self.scheduler = Scheduler(standalone=False) + self.api_key = '', + self.api_secret = '' + + @self.scheduler.interval_schedule(second=1) + def crawl_job(): + self.send_message(False) + self.scheduler.start() + + def set_api_credentials(self, api_key, api_secret): + self.api_key = api_key + self.api_secret = api_secret + + def send(self, partition, msg): + try: + if self.closed: + raise DmaapClientException("The publisher was closed.") + message = Message(partition, msg) + self.pending.append(message) + return len(self.pending) + except Exception as e: + raise DmaapClientException("append message failed: " + e.message) + + def send_message(self, force): + if force or self.should_send_now(): + if not self.send_batch(): + logger.error("Send failed, %s message to send.", len(self.pending)) + + def should_send_now(self): + should_send = False + if len(self.pending) > 0: + now_ms = int(time.time() * 1000) + should_send = len(self.pending) >= self.max_batch_size + if not should_send: + send_at_ms = self.pending[0].timestamp_ms + should_send = send_at_ms <= now_ms + + should_send = should_send and now_ms >= self.dont_send_until_ms + + return should_send + + def send_batch(self): + if len(self.pending) < 1: + return True + now_ms = int(time.time() * 1000) + url = self.create_url() + logger.info("sending %s msgs to %s . Oldest: %s ms", len(self.pending), url, + str(now_ms - self.pending[0].timestamp_ms)) + try: + str_msg = '' + if self.contenttype == "application/json": + str_msg = self.parse_json() + elif self.contenttype == "text/plain": + for m in self.pending: + str_msg += m.msg + str_msg += '\n' + elif self.contenttype == "application/cambria": + for m in self.pending: + str_msg += str(len(m.partition)) + str_msg += '.' + str_msg += str(len(m.msg)) + str_msg += '.' + str_msg += m.partition + str_msg += m.msg + str_msg += '\n' + else: + for m in self.pending: + str_msg += m.msg + msg = bytearray(str_msg) + + start_ms = int(time.time() * 1000) + if self.api_key: + headers = self.create_headers() + else: + headers = {'content-type': self.contenttype} + ret = requests.post(url=url, data=msg, headers=headers) + if ret.status_code < 200 or ret.status_code > 299: + return False + logger.info("MR reply ok (%s ms): %s", start_ms - int(time.time() * 1000), ret.json()) + self.pending = [] + return True + + except Exception as e: + logger.error(e.message) + return False + + def create_headers(self): + data = datetime.datetime.now().strftime('%Y-%m-%dT%H:%M:%S') + '-04:00' + hmac_code = hmac.new(self.api_secret.encode(), data.encode(), sha1).digest() + signature = base64.b64encode(hmac_code).decode() + auth = self.api_key + ':' + signature + headers = { + 'X-CambriaDate': data, + 'X-CambriaAuth': auth, + 'content-type': self.contenttype + } + return headers + + def create_url(self): + url = "http://%s/events/%s" % (self.host, self.topic) + if self.partition: + url = url + "?partitionKey=" + self.partition + return url + + def parse_json(self): + data = [] + for message in self.pending: + msg = json.loads(message.msg) + for m in msg: + data.append(m) + return json.dumps(data) + + def close(self, timeout): + try: + self.closed = True + self.scheduler.shutdown() + now_ms = int(time.time() * 1000) + wait_in_ms = now_ms + timeout * 1000 + + while int(time.time() * 1000) < wait_in_ms and len(self.pending) > 0: + self.send_message(True) + time.sleep(0.25) + return self.pending + except Exception as e: + raise DmaapClientException("send message failed: " + e.message) + + +class Message: + def __init__(self, partition, msg): + if not partition: + self.partition = "" + else: + self.partition = partition + self.msg = msg + self.timestamp_ms = int(time.time() * 1000) diff --git a/catalog/pub/Dmaap-lib/pub/__init__.py b/catalog/pub/Dmaap-lib/pub/__init__.py new file mode 100644 index 0000000..0c1e8e1 --- /dev/null +++ b/catalog/pub/Dmaap-lib/pub/__init__.py @@ -0,0 +1,10 @@ +# Copyright (c) 2019, CMCC Technologies Co., Ltd. +# Licensed under the Apache License, Version 2.0 (the "License") +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/catalog/pub/Dmaap-lib/pub/exceptions.py b/catalog/pub/Dmaap-lib/pub/exceptions.py new file mode 100644 index 0000000..6b65fcf --- /dev/null +++ b/catalog/pub/Dmaap-lib/pub/exceptions.py @@ -0,0 +1,15 @@ +# Copyright (c) 2019, CMCC Technologies Co., Ltd. +# Licensed under the Apache License, Version 2.0 (the "License") +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +class DmaapClientException(Exception): + def __init__(self, msg): + Exception.__init__(self, msg) diff --git a/catalog/pub/Dmaap-lib/test/test_consumer.py b/catalog/pub/Dmaap-lib/test/test_consumer.py new file mode 100644 index 0000000..1f89f65 --- /dev/null +++ b/catalog/pub/Dmaap-lib/test/test_consumer.py @@ -0,0 +1,81 @@ +# Copyright (c) 2019, CMCC Technologies Co., Ltd. + +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import base64 +import datetime +import hmac +import unittest +from _sha1 import sha1 + +from ..dmaap.consumer import ConsumerClient + + +class CreateApiKeyTest(unittest.TestCase): + def setUp(self): + self.apiKey = "7TuwzpLJ4QfQs4O" + self.apiSecret = "7TuwzpLJ4QfQs4O" + self.host = '127.0.0.1' + self.topic = 'abc' + self.group = 'def' + self.comsumer_id = '123' + self.timeout_ms = 3 + self.limit = 3 + self.filter = 'test' + + def tearDown(self): + self.ret_url = "" + + def test_create_url(self): + exp_url = 'http://127.0.0.1/events/abc/def/123' + consumer = ConsumerClient(self.host, self.topic, self.group, self.comsumer_id) + ret_url = consumer.create_url() + self.assertEqual(exp_url, ret_url) + + def test_create_timeout_url(self): + exp_url = 'http://127.0.0.1/events/abc/def/123?timeout=3' + consumer = ConsumerClient(self.host, self.topic, self.group, self.comsumer_id, self.timeout_ms) + ret_url = consumer.create_url() + self.assertEqual(exp_url, ret_url) + + def test_create_limit_url(self): + + exp_url = 'http://127.0.0.1/events/abc/def/123?timeout=3&limit=3' + consumer = ConsumerClient(self.host, self.topic, self.group, self.comsumer_id, + self.timeout_ms, self.limit) + ret_url = consumer.create_url() + self.assertEqual(exp_url, ret_url) + + def test_create_filter_url(self): + + exp_url = "http://127.0.0.1/events/abc/def/123?timeout=3&limit=3&filter=b'test'" + consumer = ConsumerClient(self.host, self.topic, self.group, self.comsumer_id, + self.timeout_ms, self.limit, self.filter) + ret_url = consumer.create_url() + self.assertEqual(exp_url, ret_url) + + def test_create_headers(self): + data = datetime.datetime.now().strftime('%Y-%m-%dT%H:%M:%S') + '-04:00' + hmac_code = hmac.new(self.apiSecret.encode(), data.encode(), sha1).digest() + signature = base64.b64encode(hmac_code).decode() + auth = self.apiKey + ':' + signature + exp_headers = { + 'X-CambriaDate': data, + 'X-CambriaAuth': auth + } + + consumer = ConsumerClient(self.host, self.topic, self.group, self.comsumer_id, + self.timeout_ms, self.limit, self.filter, self.apiKey, self.apiSecret) + consumer.set_api_credentials(self.apiKey, self.apiSecret) + rea_headers = consumer.create_headers() + self.assertEqual(exp_headers, rea_headers) diff --git a/catalog/pub/Dmaap-lib/test/test_identity.py b/catalog/pub/Dmaap-lib/test/test_identity.py new file mode 100644 index 0000000..0f88a5e --- /dev/null +++ b/catalog/pub/Dmaap-lib/test/test_identity.py @@ -0,0 +1,38 @@ +# Copyright (c) 2019, CMCC Technologies Co., Ltd. + +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest +import mock + +from ..dmaap.identity import IdentityClient + + +class CreateApiKeyTest(unittest.TestCase): + def setUp(self): + self.apiKey = "7TuwzpLJ4QfQs4O" + self.apiSecret = "7TuwzpLJ4QfQs4O" + self.host = '127.0.0.1' + + def tearDown(self): + self.ret_url = "" + + @mock.patch.object(IdentityClient, 'create_apikey') + def test_create_apiKey(self, mock_create_apikey): + mock_create_apikey.return_value = { + 'apiKey': "7TuwzpLJ4QfQs4O", + 'apiSecret': "7TuwzpLJ4QfQs4O" + } + resp_data = IdentityClient(self.host).create_apikey('', 'description') + self.assertEqual(self.apiKey, resp_data.get("apiKey")) + self.assertEqual(self.apiSecret, resp_data.get("apiSecret")) -- 2.16.6