Add kafka docker container for policy CSITs
[policy/docker.git] / csit / resources / scripts / kafka_consumer.py
1 #!/usr/bin/env python3
2 #
3 # ============LICENSE_START====================================================
4 #  Copyright (C) 2023 Nordix Foundation.
5 # =============================================================================
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 #      http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
17 #
18 # SPDX-License-Identifier: Apache-2.0
19 # ============LICENSE_END======================================================
20
21 # Python utility to fetch kafka topic and look for required messages.
22 # Accepts the arguments {topic_name} and {list of expected values} and {timeout} to verify the kafka topic.
23
24
25 from confluent_kafka import Consumer, KafkaException
26 import sys
27 import time
28
29 def consume_kafka_topic(topic, expected_values, timeout):
30     config = {
31             'bootstrap.servers': 'localhost:29092',
32             'group.id': 'testgrp',
33             'auto.offset.reset': 'earliest'
34     }
35     consumer = Consumer(config)
36     consumer.subscribe([topic])
37     try:
38         start_time = time.time()
39         while time.time() - start_time < timeout:
40                 msg = consumer.poll(1.0)
41                 if msg is None:
42                     continue
43                 if msg.error():
44                     if msg.error().code() == KafkaException._PARTITION_EOF:
45                         sys.stderr.write(f"Reached end of topic {msg.topic()} / partition {msg.partition()}\n")
46                         print('ERROR')
47                         sys.exit(404)
48                     else:
49                         # Error
50                         raise KafkaException(msg.error())
51                 else:
52                     # Message received
53                     message = msg.value().decode('utf-8')
54                     if verify_msg(expected_values, message):
55                         print(message)
56                         sys.exit(200)
57     finally:
58         consumer.close()
59
60 def verify_msg(expected_values, message):
61     for item in expected_values:
62         if item not in message:
63             return False
64     return True
65
66
67 if __name__ == '__main__':
68     topic_name = sys.argv[1]
69     timeout = sys.argv[2]  # timeout in seconds for verifying the kafka topic
70     expected_values = sys.argv[3:]
71     consume_kafka_topic(topic_name, expected_values, timeout)