First working draft of kafka for inference app
[demo.git] / vnfs / DAaaS / microservices / PythonApps / python-kafkaConsumer-inference-app / src / producer / CustomKafkaProducer.py
diff --git a/vnfs/DAaaS/microservices/PythonApps/python-kafkaConsumer-inference-app/src/producer/CustomKafkaProducer.py b/vnfs/DAaaS/microservices/PythonApps/python-kafkaConsumer-inference-app/src/producer/CustomKafkaProducer.py
new file mode 100644 (file)
index 0000000..8f726bd
--- /dev/null
@@ -0,0 +1,38 @@
+import logging
+from confluent_kafka import Producer
+import traceback
+
# Configure the root logger once, at import time: timestamped, process-id-tagged
# records at INFO level (e.g. "01-Jan-24 12:00:00::123::INFO::message").
# NOTE(review): basicConfig here affects the whole process — fine for an app
# entry point; confirm this module is not also imported as a library.
logging.basicConfig(format='%(asctime)s::%(process)d::%(levelname)s::%(message)s', level=logging.INFO, datefmt='%d-%b-%y %H:%M:%S')
+
+
class CustomKafkaProducer:
    """Publish messages to a single Kafka topic via confluent_kafka.Producer.

    Each produce() call is flushed immediately, so delivery (success or
    failure) is reported through on_delivery() before produce() returns.
    """

    def __init__(self, topic_name='metrics3',
                 bootstrap_servers='kafka-cluster-kafka-bootstrap:9092'):
        """Create a producer bound to *topic_name*.

        :param topic_name: Kafka topic every produce() call publishes to
            (default preserves the original hard-coded "metrics3").
        :param bootstrap_servers: value for the client's
            'bootstrap.servers' property (comma-separated broker list).
        """
        self.topic_name = topic_name
        conf = {'bootstrap.servers': bootstrap_servers}
        self.producer = Producer(**conf)

    def produce(self, kafka_msg, kafka_key):
        """Publish one message with value *kafka_msg* and key *kafka_key*.

        Errors are logged (with traceback) rather than raised, so callers
        never see an exception from this method.
        """
        try:
            # Pass the bound method directly; the original lambda wrapper
            # added nothing.
            self.producer.produce(topic=self.topic_name,
                                  value=kafka_msg,
                                  key=kafka_key,
                                  callback=self.on_delivery)
            # Per-message flush trades throughput for simple, synchronous
            # delivery semantics; high-volume callers should batch instead.
            self.producer.flush()

        except Exception:
            # logging.exception records the stack trace in the same log
            # stream; the old logging.error + traceback.print_exc pair
            # duplicated output across stderr and the log.
            logging.exception("Error during producing to kafka topic.")

    def on_delivery(self, err, msg):
        """Delivery callback invoked by librdkafka for each produced message.

        :param err: KafkaError on failure, None on success.
        :param msg: the delivered (or failed) Message.
        """
        if err:
            # Single structured log line; the previous print() did not
            # interpolate "%s" and duplicated the logging.error output.
            logging.error('%s raised an error', err)
        else:
            logging.info("Message delivered to %s on partition %s",
                         msg.topic(), msg.partition())
\ No newline at end of file