1 # Copyright 2019 AT&T Intellectual Property. All rights reserved.
2 # Copyright (C) 2022 Nordix Foundation
3 # Copyright (C) 2022 Nokia
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
17 from kafka import KafkaConsumer
18 from kafka import KafkaProducer
19 from kafka import TopicPartition
21 from robot.api.deco import keyword
22 from robot import utils
# Raise the kafka-python client's log threshold to CRITICAL — presumably to
# keep its very chatty connection/heartbeat logging out of the test output.
logging.getLogger("kafka").setLevel(logging.CRITICAL)
class KafkaKeywords(object):
    """ Utilities useful for Kafka consuming and producing """

    def __init__(self):
        # NOTE(review): the `def __init__(self):` header was missing in the
        # received source, leaving the constructor body orphaned at class
        # level; restored so the class definition is valid.
        super(KafkaKeywords, self).__init__()
        # Named registry of Kafka connection settings; switching to an
        # unregistered alias raises with the message below.
        self._cache = utils.ConnectionCache('No Kafka Environments created')
36 def connect(self, alias, kafka_host, sasl_user, sasl_password, sasl_mechanism="PLAIN"):
37 """connect to the specified kafka server"""
39 "bootstrap_servers": kafka_host,
40 "sasl_username": sasl_user,
41 "sasl_password": sasl_password,
42 "security_protocol": 'SASL_PLAINTEXT',
43 "ssl_context": ssl.create_default_context(),
44 "sasl_mechanism": sasl_mechanism
46 self._cache.register(client, alias=alias)
49 def produce(self, alias, topic, key, value):
53 producer = self._get_producer(alias)
54 return producer.send(topic, value=value, key=key)
56 def _get_producer(self, alias):
57 cache = self._cache.switch(alias)
58 prod = KafkaProducer(bootstrap_servers=cache['bootstrap_servers'],
59 sasl_plain_username=cache['sasl_username'],
60 sasl_plain_password=cache['sasl_password'],
61 security_protocol=cache['security_protocol'],
62 ssl_context=cache['ssl_context'],
63 sasl_mechanism=cache['sasl_mechanism'],
64 request_timeout_ms=5000)
68 def consume(self, alias, topic_name, consumer_group=None):
71 consumer = self._get_consumer(alias, topic_name, consumer_group)
73 consumer.close(autocommit=True)
    def _get_consumer(self, alias, topic_name, consumer_group=None, set_offset_to_earliest=False):
        """Build a KafkaConsumer for *topic_name* from the settings cached
        under *alias*, assign it all partitions of the topic, and position
        it (by default) one record before the current end offset.

        NOTE(review): this method looks truncated in the received source —
        `partitions` is used below without being initialized (a
        `partitions = []` line appears to be missing), *consumer_group* is
        accepted but never applied as a `group_id`, the final seek loop
        likely belongs under an `else:` branch of the
        `set_offset_to_earliest` check, and a trailing `return consumer`
        seems to be cut off. Confirm against the full file.
        """
        # default to the topic as group name
        cache = self._cache.switch(alias)
        # Same SASL/SSL settings as the producer side; slightly longer
        # request timeout (10001 ms).
        consumer = KafkaConsumer(bootstrap_servers=cache['bootstrap_servers'],
                                 sasl_plain_username=cache['sasl_username'],
                                 sasl_plain_password=cache['sasl_password'],
                                 security_protocol=cache['security_protocol'],
                                 ssl_context=cache['ssl_context'],
                                 sasl_mechanism=cache['sasl_mechanism'],
                                 request_timeout_ms=10001)
        # Discover every partition of the topic and assign them all manually
        # (no group-managed subscription).
        partition_set = consumer.partitions_for_topic(str(topic_name))
        for val in partition_set:
            # NOTE(review): `partitions` is not defined at this point in the
            # visible source — presumably `partitions = []` was dropped.
            partitions.append(TopicPartition(str(topic_name), val))
        consumer.assign(partitions)
        # Highest end offset across all partitions; used to rewind by one.
        last = consumer.end_offsets(partitions)
        offset = max(last.values())
        if set_offset_to_earliest:
            consumer.seek_to_beginning()
        # Position each partition one record before the max end offset.
        for tp in partitions:
            consumer.seek(tp, offset - 1)