From: marekpl Date: Mon, 29 Jul 2019 11:29:49 +0000 (+0200) Subject: KafkaKeywords update X-Git-Tag: 6.0.0-ONAP~34 X-Git-Url: https://gerrit.onap.org/r/gitweb?a=commitdiff_plain;h=7ad3fbdb3ae959261f48073ce24de289516d5b26;p=testsuite%2Fpython-testing-utils.git KafkaKeywords update KafkaKeywords update Issue-ID: DCAEGEN2-565 Signed-off-by: marekpl Change-Id: I21f13a43347c510f1cb45437ae659e9b41e3f9ac --- diff --git a/robotframework-onap/ONAPLibrary/KafkaKeywords.py b/robotframework-onap/ONAPLibrary/KafkaKeywords.py index c178bc8..f5adce5 100644 --- a/robotframework-onap/ONAPLibrary/KafkaKeywords.py +++ b/robotframework-onap/ONAPLibrary/KafkaKeywords.py @@ -71,7 +71,7 @@ class KafkaKeywords(object): else: return msg.value - def _get_consumer(self, alias, topic_name, consumer_group=None, set_offset_to_earliest=True): + def _get_consumer(self, alias, topic_name, consumer_group=None, set_offset_to_earliest=False): assert topic_name # default to the topic as group name @@ -91,14 +91,15 @@ class KafkaKeywords(object): group_id=cgn, request_timeout_ms=10001) - consumer.assign([TopicPartition(str(topic_name), 0),TopicPartition(str(topic_name), 1),TopicPartition(str(topic_name), 2)]) - consumer.poll() + partitions = [TopicPartition(str(topic_name), 0), TopicPartition(str(topic_name), 1), TopicPartition(str(topic_name), 2)] + consumer.assign(partitions) + last = consumer.end_offsets(partitions) + offset = max(last.values()) if set_offset_to_earliest: consumer.seek_to_beginning() else: - consumer.seek_to_end() - - consumer.topics() + for tp in partitions: + consumer.seek(tp, offset - 1) return consumer