Fix Kafka consumer 15/120415/2
authorTomasz Pietruszkiewicz <tomasz.pietruszkiewicz@nokia.com>
Mon, 12 Apr 2021 12:33:41 +0000 (14:33 +0200)
committerTomasz Pietruszkiewicz <tomasz.pietruszkiewicz@nokia.com>
Mon, 12 Apr 2021 12:46:17 +0000 (14:46 +0200)
Change-Id: I8b52e2d1859b964a582289e2f63d70272d60f39a
Issue-ID: INT-1869
Signed-off-by: Tomasz Pietruszkiewicz <tomasz.pietruszkiewicz@nokia.com>
src/python/netconf_server/kafka_consumer_factory.py
src/python/netconf_server/netconf_kafka_client.py
src/python/tests/unit/test_netconf_kafka_client.py

index 332cd21..43ef1cb 100644 (file)
@@ -26,7 +26,7 @@ STANDARD_CHARSETS_UTF8 = 'utf-8'
 
 def provide_kafka_consumer(topic: str, server: str) -> KafkaConsumer:
     return KafkaConsumer(topic,
-                         consumer_timeout_ms=1000,
+                         consumer_timeout_ms=5000,
                          group_id='netconf-group',
                          auto_offset_reset='earliest',
                          enable_auto_commit=False,
index 8687802..027bde1 100644 (file)
@@ -75,5 +75,6 @@ class NetconfKafkaClient(object):
             message_value = message.value
             logger.info("Fetched config change %s" % message_value)
             messages.append(message_value)
+        consumer.close()
 
         return messages
index 9eff761..b3f45c1 100644 (file)
@@ -32,8 +32,10 @@ class TestNetconfKafkaClient(TestCase):
 
     def setUp(self):
         self.producer = MagicMock()
+        self.kafkaConsumerResponse = KafkaConsumerResponse(MagicMock(value=MESSAGE_1), MagicMock(value=MESSAGE_2))
+        self.kafkaConsumerResponse.close = MagicMock()
         self.kafka_customer_func = MagicMock(
-            return_value=[MagicMock(value=MESSAGE_1), MagicMock(value=MESSAGE_2)]
+            return_value=self.kafkaConsumerResponse
         )
         self.test_obj = NetconfKafkaClient(
             producer=self.producer,
@@ -58,3 +60,20 @@ class TestNetconfKafkaClient(TestCase):
         self.assertTrue(len(messages) == 2)
         self.assertTrue(MESSAGE_1 in messages)
         self.assertTrue(MESSAGE_2 in messages)
+
+
+class KafkaConsumerResponse(list):
+
+    def __new__(self, *args, **kwargs):
+        return super(KafkaConsumerResponse, self).__new__(self, args, kwargs)
+
+    def __init__(self, *args, **kwargs):
+        if len(args) == 1 and hasattr(args[0], '__iter__'):
+            list.__init__(self, args[0])
+        else:
+            list.__init__(self, args)
+        self.__dict__.update(kwargs)
+
+    def __call__(self, **kwargs):
+        self.__dict__.update(kwargs)
+        return self