1 # Copyright 2019 AT&T Intellectual Property. All rights reserved.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
import ssl

from kafka import KafkaConsumer
from kafka import KafkaProducer
from kafka import TopicPartition

from robot import utils
from robot.api.deco import keyword
class KafkaKeywords(object):
    """ Utilities useful for Kafka consuming and producing """

    def __init__(self):
        super(KafkaKeywords, self).__init__()
        # Robot Framework connection cache: maps an alias to the dict of
        # connection settings registered in connect().
        self._cache = utils.ConnectionCache('No Kafka Environments created')

    @keyword
    def connect(self, alias, kafka_host, sasl_user, sasl_password):
        """connect to the specified kafka server

        Registers the SASL_PLAINTEXT connection settings under ``alias`` for
        later use by produce/consume. No network connection is opened here.

        :param alias: name used to look these settings up again
        :param kafka_host: bootstrap server(s), e.g. ``host:9092``
        :param sasl_user: SASL PLAIN username
        :param sasl_password: SASL PLAIN password
        """
        client = {
            "bootstrap_servers": kafka_host,
            "sasl_plain_username": sasl_user,
            "sasl_plain_password": sasl_password,
            "security_protocol": 'SASL_PLAINTEXT',
            "ssl_context": ssl.create_default_context(),
            "sasl_mechanism": 'PLAIN'
        }
        self._cache.register(client, alias=alias)

    @keyword
    def produce(self, alias, topic, key, value):
        """Send one record to ``topic`` on the aliased Kafka environment.

        :param alias: alias previously registered via ``connect``
        :param topic: target topic name (must be non-empty)
        :param key: record key
        :param value: record value (must be non-empty)
        :return: the ``FutureRecordMetadata`` returned by ``KafkaProducer.send``
        """
        assert topic
        assert value
        producer = self._get_producer(alias)
        return producer.send(topic, value=value, key=key)

    def _get_producer(self, alias):
        """Build a KafkaProducer from the settings cached under ``alias``."""
        cache = self._cache.switch(alias)
        prod = KafkaProducer(bootstrap_servers=cache['bootstrap_servers'],
                             sasl_plain_username=cache['sasl_plain_username'],
                             sasl_plain_password=cache['sasl_plain_password'],
                             security_protocol=cache['security_protocol'],
                             ssl_context=cache['ssl_context'],
                             sasl_mechanism=cache['sasl_mechanism'],
                             request_timeout_ms=5000)
        # Bug fix: the producer was constructed but never returned,
        # so produce() always failed with AttributeError on None.
        return prod

    @keyword
    def consume(self, alias, topic_name, consumer_group=None):
        """Poll one batch of records from ``topic_name`` and return it.

        :param alias: alias previously registered via ``connect``
        :param topic_name: topic to read from (must be non-empty)
        :param consumer_group: optional group id; defaults to the topic name
        :return: dict of {TopicPartition: [records]} from ``poll`` (may be empty)
        """
        assert topic_name
        consumer = self._get_consumer(alias, topic_name, consumer_group)
        # NOTE(review): the extracted source showed the consumer being closed
        # without any read; polling once before close is the only way this
        # keyword can return data — confirm timeout against test expectations.
        records = consumer.poll(timeout_ms=10000, max_records=1)
        consumer.close(autocommit=True)
        return records

    def _get_consumer(self, alias, topic_name, consumer_group=None, set_offset_to_earliest=False):
        """Build a KafkaConsumer assigned to partitions 0-2 of ``topic_name``.

        Positions the consumer either at the beginning of each partition
        (``set_offset_to_earliest=True``) or one record before the current
        end, so the next poll yields the most recent message.
        """
        assert topic_name

        # default to the topic as group name
        group = consumer_group if consumer_group else topic_name

        cache = self._cache.switch(alias)

        consumer = KafkaConsumer(bootstrap_servers=cache['bootstrap_servers'],
                                 sasl_plain_username=cache['sasl_plain_username'],
                                 sasl_plain_password=cache['sasl_plain_password'],
                                 security_protocol=cache['security_protocol'],
                                 ssl_context=cache['ssl_context'],
                                 sasl_mechanism=cache['sasl_mechanism'],
                                 group_id=group,
                                 request_timeout_ms=10001)

        # Manual assignment to the first three partitions; assumes topics are
        # created with (at least) 3 partitions — TODO confirm.
        partitions = [TopicPartition(str(topic_name), 0), TopicPartition(str(topic_name), 1), TopicPartition(str(topic_name), 2)]
        consumer.assign(partitions)
        last = consumer.end_offsets(partitions)
        offset = max(last.values())

        if set_offset_to_earliest:
            consumer.seek_to_beginning()
        else:
            # Seek every partition to just before the highest end offset so a
            # single poll returns the latest record. Must be in the `else`
            # branch or it would undo seek_to_beginning().
            for tp in partitions:
                consumer.seek(tp, offset - 1)

        return consumer