2 from confluent_kafka import Producer
5 logging.basicConfig(format='%(asctime)s::%(process)d::%(levelname)s::%(message)s', level=logging.INFO, datefmt='%d-%b-%y %H:%M:%S')
8 class CustomKafkaProducer:
10 self.topic_name = "metrics3"
11 #self.topic_name = "adatopic1"
12 conf = {'bootstrap.servers': 'kafka-cluster-kafka-bootstrap:9092'
14 self.producer = Producer(**conf)
17 def produce(self, kafka_msg, kafka_key):
19 self.producer.produce(topic=self.topic_name,
22 callback=lambda err, msg: self.on_delivery(err, msg)
26 except Exception as e:
27 #print("Error during producing to kafka topic. Stacktrace is %s",e)
28 logging.error("Error during producing to kafka topic.")
32 def on_delivery(self, err, msg):
34 print("Message failed delivery, error: %s", err)
35 logging.error('%s raised an error', err)
37 logging.info("Message delivered to %s on partition %s",
38 msg.topic(), msg.partition())