KafkaKeywords update 91/92191/1
authormarekpl <marek.pondel@nokia.com>
Mon, 29 Jul 2019 11:29:49 +0000 (13:29 +0200)
committermarekpl <marek.pondel@nokia.com>
Mon, 29 Jul 2019 11:29:49 +0000 (13:29 +0200)
KafkaKeywords update

Issue-ID: DCAEGEN2-565
Signed-off-by: marekpl <marek.pondel@nokia.com>
Change-Id: I21f13a43347c510f1cb45437ae659e9b41e3f9ac

robotframework-onap/ONAPLibrary/KafkaKeywords.py

index c178bc8..f5adce5 100644 (file)
@@ -71,7 +71,7 @@ class KafkaKeywords(object):
         else:
             return msg.value
 
-    def _get_consumer(self, alias, topic_name, consumer_group=None, set_offset_to_earliest=True):
+    def _get_consumer(self, alias, topic_name, consumer_group=None, set_offset_to_earliest=False):
         assert topic_name
 
         # default to the topic as group name
@@ -91,14 +91,15 @@ class KafkaKeywords(object):
                                  group_id=cgn,
                                  request_timeout_ms=10001)
 
-       consumer.assign([TopicPartition(str(topic_name), 0),TopicPartition(str(topic_name), 1),TopicPartition(str(topic_name), 2)])
-        consumer.poll()
+       partitions = [TopicPartition(str(topic_name), 0), TopicPartition(str(topic_name), 1), TopicPartition(str(topic_name), 2)]
+       consumer.assign(partitions)
+       last = consumer.end_offsets(partitions)
+       offset = max(last.values())
 
         if set_offset_to_earliest:
             consumer.seek_to_beginning()
         else:
-            consumer.seek_to_end()
-
-        consumer.topics()
+            for tp in partitions:
+               consumer.seek(tp, offset - 1)
 
         return consumer