change kafka send method to be async. 14/120414/1
authorBartosz Gardziejewski <bartosz.gardziejewski@nokia.com>
Mon, 12 Apr 2021 12:21:34 +0000 (14:21 +0200)
committerBartosz Gardziejewski <bartosz.gardziejewski@nokia.com>
Mon, 12 Apr 2021 12:23:27 +0000 (14:23 +0200)
Issue-ID: INT-1869
Signed-off-by: Bartosz Gardziejewski <bartosz.gardziejewski@nokia.com>
Change-Id: I3301fd97f8f90443ebc4e3530b8c1fa10b5388c3

src/python/netconf_server/netconf_change_listener.py
src/python/netconf_server/netconf_kafka_client.py

index 628f757..cdb5457 100644 (file)
@@ -21,7 +21,6 @@ import logging
 
 from kafka.producer.future import FutureRecordMetadata
 
-
 from netconf_server.netconf_kafka_client import NetconfKafkaClient
 from netconf_server.netconf_kafka_message_factory import NetconfKafkaMessageFactory
 from netconf_server.sysrepo_interface.config_change_data import ConfigChangeData
@@ -53,11 +52,19 @@ class NetconfChangeListener(object):
                 kafka_message = NetconfChangeListener._create_kafka_message(change)
                 logging.info("Sending message '{}' to Kafka '{}' topic".format(kafka_message, self.topic))
                 response = self.kafka_client.send(self.topic, kafka_message)  # type: FutureRecordMetadata
-                logging.info("Response from Kafka: {}".format(response.get(timeout=1)))
-
+                self.set_up_callbacks_for_kafka_request(response)
+                logging.info("Module changes sent to Kafka")
             except Exception as e:
                 logger.error("Exception occurred during handling of sysrepo config change", e)
-        logger.info("Module changes sent to Kafka. Operation finished.")
+
+    @staticmethod
+    def set_up_callbacks_for_kafka_request(response):
+        response.add_callback(
+            lambda val: logging.info("Response from Kafka: {}".format(val))
+        )
+        response.add_errback(
+            lambda exc: logging.error("Exception from Kafka: {}".format(exc))
+        )
 
     @staticmethod
     def _create_kafka_message(change):
index 8687802..53e7ecd 100644 (file)
@@ -48,6 +48,10 @@ class NetconfKafkaClient(object):
         server = "{}:{}".format(host, port)
         producer = KafkaProducer(
             bootstrap_servers=server,
+            request_timeout_ms=15000,
+            retry_backoff_ms=1000,
+            max_in_flight_requests_per_connection=1,
+            retries=3,
             value_serializer=lambda x: dumps(x).encode(STANDARD_CHARSETS_UTF8)
         )