Fix whitespace issues in Python files
[demo.git] / vnfs / DAaaS / microservices / PythonApps / python-kafkaConsumer-inference-app / src / producer / CustomKafkaProducer.py
1 import logging
2 from confluent_kafka import Producer
3 import traceback
4
# Configure the root logger when this module is imported: INFO level, with
# each record prefixed by timestamp, process id and severity.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s::%(process)d::%(levelname)s::%(message)s',
    datefmt='%d-%b-%y %H:%M:%S',
)
6
7
class CustomKafkaProducer:
    """Thin wrapper around ``confluent_kafka.Producer`` bound to one topic.

    ``produce`` queues a single message and then calls ``flush()`` on the
    underlying producer, so each call is handled synchronously. Delivery
    results are reported through ``on_delivery``.
    """

    def __init__(self, topic_name="metrics3",
                 bootstrap_servers="kafka-cluster-kafka-bootstrap:9092"):
        """Create the underlying Kafka producer.

        Args:
            topic_name: Topic every message is published to. Defaults to the
                previously hard-coded ``"metrics3"``.
            bootstrap_servers: Value for the ``bootstrap.servers`` client
                setting. Defaults to the previously hard-coded in-cluster
                address.
        """
        self.topic_name = topic_name
        conf = {'bootstrap.servers': bootstrap_servers}
        self.producer = Producer(**conf)

    def produce(self, kafka_msg, kafka_key):
        """Publish one message to ``self.topic_name`` and flush the producer.

        Errors are logged (with traceback) and swallowed so that a failed
        publish does not crash the caller; nothing is returned.

        Args:
            kafka_msg: Message payload, passed through as ``value``.
            kafka_key: Message key, passed through as ``key``.
        """
        try:
            self.producer.produce(
                topic=self.topic_name,
                value=kafka_msg,
                key=kafka_key,
                # The bound method already has the (err, msg) signature, so
                # no forwarding lambda is needed.
                callback=self.on_delivery,
            )
            # Flush after every message so the delivery callback is served
            # before returning, at the cost of batching throughput.
            self.producer.flush()
        except Exception:
            # Deliberate best-effort: record the failure with its traceback
            # through the logging system instead of raw stderr output.
            logging.exception("Error during producing to kafka topic.")

    def on_delivery(self, err, msg):
        """Log the outcome of a delivery attempt.

        Args:
            err: Error reported for the message; falsy on success.
            msg: The message object; only its ``topic()`` and ``partition()``
                accessors are used, and only on success.
        """
        if err:
            # Use logging's lazy %-formatting; the former print() call
            # emitted the literal "%s" instead of interpolating the error.
            logging.error("Message failed delivery, error: %s", err)
            return
        logging.info("Message delivered to %s on partition %s",
                     msg.topic(), msg.partition())