from kafka.producer.future import FutureRecordMetadata
-
from netconf_server.netconf_kafka_client import NetconfKafkaClient
from netconf_server.netconf_kafka_message_factory import NetconfKafkaMessageFactory
from netconf_server.sysrepo_interface.config_change_data import ConfigChangeData
kafka_message = NetconfChangeListener._create_kafka_message(change)
logging.info("Sending message '{}' to Kafka '{}' topic".format(kafka_message, self.topic))
response = self.kafka_client.send(self.topic, kafka_message) # type: FutureRecordMetadata
- logging.info("Response from Kafka: {}".format(response.get(timeout=1)))
-
+ self.set_up_callbacks_for_kafka_request(response)
+ logging.info("Module changes sent to Kafka")
except Exception as e:
logger.error("Exception occurred during handling of sysrepo config change", e)
- logger.info("Module changes sent to Kafka. Operation finished.")
+
+ @staticmethod
+ def set_up_callbacks_for_kafka_request(response):
+ response.add_callback(
+ lambda val: logging.info("Response from Kafka: {}".format(val))
+ )
+ response.add_errback(
+ lambda exc: logging.error("Exception from Kafka: {}".format(exc))
+ )
@staticmethod
def _create_kafka_message(change):
server = "{}:{}".format(host, port)
producer = KafkaProducer(
bootstrap_servers=server,
+ request_timeout_ms=15000,
+ retry_backoff_ms=1000,
+ max_in_flight_requests_per_connection=1,
+ retries=3,
value_serializer=lambda x: dumps(x).encode(STANDARD_CHARSETS_UTF8)
)