3 # ============LICENSE_START====================================================
4 # Copyright (C) 2023 Nordix Foundation.
5 # =============================================================================
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
18 # SPDX-License-Identifier: Apache-2.0
19 # ============LICENSE_END======================================================
# Python utility to consume a Kafka topic and look for required messages.
# Accepts the arguments {topic_name}, {timeout in seconds} and {list of expected values}, in that order, to verify the Kafka topic.
25 from confluent_kafka import Consumer, KafkaException
def consume_kafka_topic(topic, expected_values, timeout):
    """Poll a Kafka topic until one message contains every expected value.

    Subscribes to ``topic`` on the broker at ``localhost:29092`` (consumer
    group ``testgrp``, starting from the earliest offset) and polls with a
    one-second wait per call until a matching message is seen or ``timeout``
    seconds have elapsed.

    Args:
        topic: Name of the Kafka topic to subscribe to.
        expected_values: Strings that must all occur in a single decoded
            message for it to count as a match.
        timeout: Maximum time to keep polling, in seconds. Numeric strings
            (for example a raw ``sys.argv`` entry) are accepted.

    Returns:
        The first matching message decoded as UTF-8, or ``None`` when the
        timeout expires without a match.

    Raises:
        ValueError: If ``timeout`` cannot be converted to a number.
        KafkaException: If the consumer reports an error other than
            end-of-partition.
    """
    # Imported locally because the module-level import only brings in
    # Consumer and KafkaException, while the _PARTITION_EOF error code is
    # defined on KafkaError (it is not an attribute of KafkaException).
    from confluent_kafka import KafkaError

    # Convert before connecting so a bad value fails fast and a string
    # timeout does not break the numeric deadline comparison below.
    timeout = float(timeout)

    config = {
        'bootstrap.servers': 'localhost:29092',
        'group.id': 'testgrp',
        'auto.offset.reset': 'earliest',
    }
    consumer = Consumer(config)
    try:
        consumer.subscribe([topic])

        # A monotonic clock keeps the deadline immune to wall-clock jumps.
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            msg = consumer.poll(1.0)
            if msg is None:
                # Nothing arrived within this poll interval.
                continue

            error = msg.error()
            if error:
                if error.code() == KafkaError._PARTITION_EOF:
                    sys.stderr.write(f"Reached end of topic {msg.topic()} / partition {msg.partition()}\n")
                    # NOTE(review): the handling that followed this log line
                    # was not visible during review. Polling continues here,
                    # since new messages may still arrive before the
                    # deadline -- confirm against the intended behaviour.
                    continue
                raise KafkaException(error)

            payload = msg.value()
            if payload is None:
                # Messages without a value (e.g. tombstones) cannot match.
                continue

            message = payload.decode('utf-8')
            if verify_msg(expected_values, message):
                # NOTE(review): the success action was not visible during
                # review. Printing the match to stdout and returning it is
                # an assumption -- confirm what callers of this script
                # expect (stdout content and/or exit status).
                print(message)
                return message
        return None
    finally:
        # Always leave the consumer group cleanly, even on error or exit.
        consumer.close()
def verify_msg(expected_values: list[str], message: str) -> bool:
    """Return True when every expected value occurs in ``message``.

    Args:
        expected_values: Substrings to look for. An empty collection
            matches any message.
        message: The decoded message text to search.

    Returns:
        ``True`` if each item of ``expected_values`` is contained in
        ``message``, otherwise ``False``.
    """
    return all(item in message for item in expected_values)
if __name__ == '__main__':
    # Usage: <script> <topic_name> <timeout_seconds> [expected_value ...]
    if len(sys.argv) < 3:
        sys.stderr.write(
            f"Usage: {sys.argv[0]} <topic_name> <timeout_seconds> [expected_value ...]\n"
        )
        sys.exit(2)

    topic_name = sys.argv[1]
    try:
        # Command-line arguments are strings; the polling loop compares the
        # timeout against elapsed seconds, so it must be numeric.
        timeout = float(sys.argv[2])  # timeout in seconds for verifying the kafka topic
    except ValueError:
        sys.stderr.write(f"Invalid timeout (expected seconds as a number): {sys.argv[2]}\n")
        sys.exit(2)

    expected_values = sys.argv[3:]
    consume_kafka_topic(topic_name, expected_values, timeout)