2ebb902833c2bee50ca865a2641360d3a00fcf13
[testsuite/python-testing-utils.git] / robotframework-onap / ONAPLibrary / KafkaKeywords.py
# Copyright 2019 AT&T Intellectual Property. All rights reserved.
# Copyright (C) 2022 Nordix Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

16 from kafka import KafkaConsumer
17 from kafka import KafkaProducer
18 from kafka import TopicPartition
19 import ssl
20 from robot.api.deco import keyword
21 from robot import utils
22 import logging
23
24 logging.getLogger("kafka").setLevel(logging.CRITICAL)
25
26
class KafkaKeywords(object):
    """Robot Framework keywords for producing to and consuming from Kafka.

    Connection settings are registered under an alias in a Robot
    ``ConnectionCache`` so that multiple Kafka environments can be used
    within a single test run.  Producers and consumers are created lazily
    per keyword call from the cached settings.
    """

    def __init__(self):
        super(KafkaKeywords, self).__init__()
        # Maps alias -> settings dict built by `connect`.
        self._cache = utils.ConnectionCache('No Kafka Environments created')

    @keyword
    def connect(self, alias, kafka_host, sasl_user, sasl_password, sasl_mechanism="PLAIN"):
        """Register SASL_PLAINTEXT connection settings for a Kafka cluster.

        No network activity happens here; this only caches the settings
        under *alias* for later use by `produce` / `consume`.
        """
        client = {
            "bootstrap_servers": kafka_host,
            "sasl_plain_username": sasl_user,
            "sasl_plain_password": sasl_password,
            "security_protocol": 'SASL_PLAINTEXT',
            "ssl_context": ssl.create_default_context(),
            "sasl_mechanism": sasl_mechanism
        }
        self._cache.register(client, alias=alias)

    @keyword
    def produce(self, alias, topic, key, value):
        """Send *value* (with optional *key*) to *topic* on the cluster
        registered under *alias*; returns the producer's send future.
        """
        assert topic
        assert value

        producer = self._get_producer(alias)
        return producer.send(topic, value=value, key=key)

    def _get_producer(self, alias):
        """Build a KafkaProducer from the settings registered under *alias*."""
        cache = self._cache.switch(alias)
        # BUG FIX: `connect` stores the credentials under the keys
        # 'sasl_plain_username' / 'sasl_plain_password'; the previous code
        # looked up 'sasl_username' / 'sasl_password' and raised KeyError
        # on every produce.
        prod = KafkaProducer(bootstrap_servers=cache['bootstrap_servers'],
                             sasl_plain_username=cache['sasl_plain_username'],
                             sasl_plain_password=cache['sasl_plain_password'],
                             security_protocol=cache['security_protocol'],
                             ssl_context=cache['ssl_context'],
                             sasl_mechanism=cache['sasl_mechanism'],
                             request_timeout_ms=5000)
        return prod

    @keyword
    def consume(self, alias, topic_name, consumer_group=None):
        """Return the value of the next (latest) message on *topic_name*,
        or None if no message was produced.
        """
        assert topic_name

        consumer = self._get_consumer(alias, topic_name, consumer_group)
        msg = next(consumer)
        consumer.close(autocommit=True)
        if msg is None:
            return None
        else:
            return msg.value

    def _get_consumer(self, alias, topic_name, consumer_group=None, set_offset_to_earliest=False):
        """Build a KafkaConsumer assigned to all partitions of *topic_name*.

        By default each partition is positioned at its last existing message
        so the next poll returns the most recent record; pass
        ``set_offset_to_earliest=True`` to start from the beginning instead.
        """
        assert topic_name

        # Default the consumer group to the topic name.
        group = consumer_group if consumer_group else topic_name

        cache = self._cache.switch(alias)

        # Same KeyError fix as in _get_producer: use the key names that
        # `connect` actually stores.
        consumer = KafkaConsumer(bootstrap_servers=cache['bootstrap_servers'],
                                 sasl_plain_username=cache['sasl_plain_username'],
                                 sasl_plain_password=cache['sasl_plain_password'],
                                 security_protocol=cache['security_protocol'],
                                 ssl_context=cache['ssl_context'],
                                 sasl_mechanism=cache['sasl_mechanism'],
                                 group_id=group,
                                 request_timeout_ms=10001)

        # Force a metadata fetch so partitions_for_topic() is populated.
        consumer.topics()
        partition_set = consumer.partitions_for_topic(str(topic_name))
        partitions = [TopicPartition(str(topic_name), p) for p in partition_set]
        consumer.assign(partitions)
        last = consumer.end_offsets(partitions)

        if set_offset_to_earliest:
            consumer.seek_to_beginning()
        else:
            # Seek each partition to its own last message.  Fixes two
            # defects in the previous code, which seeked every partition to
            # max(end_offsets) - 1: that seeks past the end of shorter
            # partitions in multi-partition topics, and produces an invalid
            # offset of -1 when a partition is empty.
            for tp in partitions:
                consumer.seek(tp, max(last[tp] - 1, 0))
        return consumer