1 # Copyright 2019 AT&T Intellectual Property. All rights reserved.
2 # Copyright (C) 2022 Nordix Foundation
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
16 from kafka import KafkaConsumer
17 from kafka import KafkaProducer
18 from kafka import TopicPartition
20 from robot.api.deco import keyword
21 from robot import utils
# Silence the kafka-python client's own logger: only CRITICAL records pass,
# keeping Robot Framework test output free of client-internal noise.
logging.getLogger("kafka").setLevel(logging.CRITICAL)
27 class KafkaKeywords(object):
28 """ Utilities useful for Kafka consuming and producing """
31 super(KafkaKeywords, self).__init__()
32 self._cache = utils.ConnectionCache('No Kafka Environments created')
35 def connect(self, alias, kafka_host, sasl_user, sasl_password, sasl_mechanism="PLAIN"):
36 """connect to the specified kafka server"""
38 "bootstrap_servers": kafka_host,
39 "sasl_plain_username": sasl_user,
40 "sasl_plain_password": sasl_password,
41 "security_protocol": 'SASL_PLAINTEXT',
42 "ssl_context": ssl.create_default_context(),
43 "sasl_mechanism": sasl_mechanism
45 self._cache.register(client, alias=alias)
48 def produce(self, alias, topic, key, value):
52 producer = self._get_producer(alias)
53 return producer.send(topic, value=value, key=key)
55 def _get_producer(self, alias):
56 cache = self._cache.switch(alias)
57 prod = KafkaProducer(bootstrap_servers=cache['bootstrap_servers'],
58 sasl_plain_username=cache['sasl_username'],
59 sasl_plain_password=cache['sasl_password'],
60 security_protocol=cache['security_protocol'],
61 ssl_context=cache['ssl_context'],
62 sasl_mechanism=cache['sasl_mechanism'],
63 request_timeout_ms=5000)
    def consume(self, alias, topic_name, consumer_group=None):
        """Consume from *topic_name* using the environment cached under *alias*.

        :param alias: cache alias previously registered with ``connect``
        :param topic_name: topic to read from
        :param consumer_group: optional consumer group id; ``_get_consumer``
            defaults it to the topic name when not given
        """
        consumer = self._get_consumer(alias, topic_name, consumer_group)
        # Close the consumer, committing current offsets (autocommit=True).
        consumer.close(autocommit=True)
78 def _get_consumer(self, alias, topic_name, consumer_group=None, set_offset_to_earliest=False):
81 # default to the topic as group name
87 cache = self._cache.switch(alias)
89 consumer = KafkaConsumer(bootstrap_servers=cache['bootstrap_servers'],
90 sasl_plain_username=cache['sasl_username'],
91 sasl_plain_password=cache['sasl_password'],
92 security_protocol=cache['security_protocol'],
93 ssl_context=cache['ssl_context'],
94 sasl_mechanism=cache['sasl_mechanism'],
96 request_timeout_ms=10001)
99 partition_set = consumer.partitions_for_topic(str(topic_name))
101 for val in partition_set:
102 partitions.append(TopicPartition(str(topic_name), val))
103 consumer.assign(partitions)
104 last = consumer.end_offsets(partitions)
105 offset = max(last.values())
107 if set_offset_to_earliest:
108 consumer.seek_to_beginning()
110 for tp in partitions:
111 consumer.seek(tp, offset - 1)