X-Git-Url: https://gerrit.onap.org/r/gitweb?a=blobdiff_plain;f=catalog%2Fpub%2FDmaap_lib%2Fdmaap%2Fpublisher.py;h=3ffff542091b4ec109dd5bc18fde04e6edfe6169;hb=29020a784229dcdf3f0a5ad0558b5c58e00b904a;hp=643ba90b071a54048345a2fb7087e64a06768a4a;hpb=fddda96911bdb3ec9841ac71e764cb6eb8fa08d5;p=modeling%2Fetsicatalog.git diff --git a/catalog/pub/Dmaap_lib/dmaap/publisher.py b/catalog/pub/Dmaap_lib/dmaap/publisher.py index 643ba90..3ffff54 100644 --- a/catalog/pub/Dmaap_lib/dmaap/publisher.py +++ b/catalog/pub/Dmaap_lib/dmaap/publisher.py @@ -1,15 +1,17 @@ -# Copyright (c) 2019, CMCC Technologies Co., Ltd. -# Licensed under the Apache License, Version 2.0 (the "License") +# Copyright (c) 2019, CMCC Technologies. Co., Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at -# http://www.apache.org/licenses/LICENSE-2.0 +# +# http://www.apache.org/licenses/LICENSE-2.0 +# # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - import base64 import datetime import hmac @@ -19,16 +21,16 @@ import time from hashlib import sha1 import requests -from apscheduler.scheduler import Scheduler - -from ..pub.exceptions import DmaapClientException +from apscheduler.schedulers.background import BackgroundScheduler +from catalog.pub.Dmaap_lib.pub.exceptions import DmaapClientException logger = logging.getLogger(__name__) class BatchPublisherClient: - def __init__(self, host, topic, partition="", contenttype="text/plain", max_batch_size=100, max_batch_age_ms=1000): - self.host = host + def __init__(self, base_url, topic, partition="", contenttype="text/plain", max_batch_size=100, + max_batch_age_ms=1000): + self.base_url = base_url self.topic = topic self.partition = partition self.contenttype = contenttype @@ -37,13 +39,13 @@ class BatchPublisherClient: self.pending = [] self.closed = False self.dont_send_until_ms = 0 - self.scheduler = Scheduler(standalone=False) - self.api_key = '', - self.api_secret = '' + self.scheduler = BackgroundScheduler() - @self.scheduler.interval_schedule(second=1) + # @self.scheduler.interval_schedule(second=1) + @self.scheduler.scheduled_job(second=1) def crawl_job(): self.send_message(False) + self.scheduler.start() def set_api_credentials(self, api_key, api_secret): @@ -58,7 +60,7 @@ class BatchPublisherClient: self.pending.append(message) return len(self.pending) except Exception as e: - raise DmaapClientException("append message failed: " + e.message) + raise DmaapClientException("append message failed: " + str(e)) def send_message(self, force): if force or self.should_send_now(): @@ -120,7 +122,7 @@ class BatchPublisherClient: return True except Exception as e: - logger.error(e.message) + logger.error(str(e)) return False def create_headers(self): @@ -136,7 +138,7 @@ class BatchPublisherClient: return headers def create_url(self): - url = "http://%s/events/%s" % (self.host, self.topic) + url = self.base_url + "/events/%s" % self.topic if self.partition: url = url + "?partitionKey=" + self.partition return url @@ -161,7 +163,7 @@ class BatchPublisherClient: time.sleep(0.25) return self.pending except Exception as e: - raise DmaapClientException("send message failed: " + e.message) + raise DmaapClientException("send message failed: " + str(e)) class Message: