robotframework-onap/ONAPLibrary/KafkaKeywords.py
# Copyright 2019 AT&T Intellectual Property. All rights reserved.
# Copyright (C) 2022 Nordix Foundation
# Copyright (C) 2022 Nokia
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from kafka import KafkaConsumer
from kafka import KafkaProducer
from kafka import TopicPartition
import ssl
from robot.api.deco import keyword
from robot import utils
import logging

# kafka-python is noisy by default; suppress everything below CRITICAL.
logging.getLogger("kafka").setLevel(logging.CRITICAL)


class KafkaKeywords(object):
    """Utilities for producing to and consuming from Kafka."""

    def __init__(self):
        super(KafkaKeywords, self).__init__()
        # Robot's ConnectionCache stores the per-alias connection settings.
        self._cache = utils.ConnectionCache('No Kafka Environments created')

    @keyword
    def connect(self, alias, kafka_host, sasl_user, sasl_password, sasl_mechanism="PLAIN"):
        """Register connection settings for a Kafka cluster under the given alias.

        Producers and consumers are created lazily from these settings by the
        other keywords; no connection is opened here."""
        client = {
            "bootstrap_servers": kafka_host,
            "sasl_username": sasl_user,
            "sasl_password": sasl_password,
            "security_protocol": 'SASL_PLAINTEXT',
            # kafka-python only consults ssl_context for SSL/SASL_SSL, so this
            # entry is effectively unused with SASL_PLAINTEXT.
            "ssl_context": ssl.create_default_context(),
            "sasl_mechanism": sasl_mechanism
        }
        self._cache.register(client, alias=alias)
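
    # A minimal Robot Framework usage sketch for Connect, assuming the class is
    # imported directly as a library (broker address and credentials below are
    # illustrative placeholders, not values shipped with this library):
    #
    #   *** Settings ***
    #   Library    ONAPLibrary.KafkaKeywords
    #
    #   *** Test Cases ***
    #   Connect To Broker
    #       Connect    my_kafka    kafka.example.org:9092    demo-user    demo-pass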

    @keyword
    def produce(self, alias, topic, key, value):
        """Send one record to the topic and return the resulting
        FutureRecordMetadata. No serializers are configured, so key and value
        must be bytes (the key may also be None)."""
        assert topic
        assert value

        producer = self._get_producer(alias)
        future = producer.send(topic, value=value, key=key)
        # Flush so the record is actually delivered before the short-lived
        # producer goes out of scope.
        producer.flush()
        return future
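
    # Hedged usage note (alias, topic, and payload below are illustrative): the
    # returned future can be resolved to confirm delivery, e.g. from Python:
    #
    #   future = KafkaKeywords().produce("my_kafka", "demo.topic", b"k", b"v")
    #   record_metadata = future.get(timeout=10)  # raises KafkaError on failure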

    def _get_producer(self, alias):
        cache = self._cache.switch(alias)
        prod = KafkaProducer(bootstrap_servers=cache['bootstrap_servers'],
                             sasl_plain_username=cache['sasl_username'],
                             sasl_plain_password=cache['sasl_password'],
                             security_protocol=cache['security_protocol'],
                             ssl_context=cache['ssl_context'],
                             sasl_mechanism=cache['sasl_mechanism'],
                             request_timeout_ms=5000)
        return prod

    @keyword
    def consume(self, alias, topic_name, consumer_group=None):
        """Fetch and return the value of a single message from the topic.

        Blocks until a message is available."""
        assert topic_name

        consumer = self._get_consumer(alias, topic_name, consumer_group)
        msg = next(consumer)
        consumer.close(autocommit=True)
        # next() blocks rather than returning None, so this check is defensive.
        if msg is None:
            return None
        else:
            return msg.value
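
    # A minimal Robot Framework usage sketch for Consume (alias and topic are
    # illustrative; Connect must have been called with the same alias first):
    #
    #   ${value} =    Consume    my_kafka    demo.topic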

    def _get_consumer(self, alias, topic_name, consumer_group=None, set_offset_to_earliest=False):
        """Build a KafkaConsumer positioned either at the beginning of the
        topic or one record before the end of each partition."""
        assert topic_name

        # default to the topic as the group name
        if consumer_group:
            cgn = consumer_group
        else:
            cgn = topic_name

        cache = self._cache.switch(alias)

        consumer = KafkaConsumer(bootstrap_servers=cache['bootstrap_servers'],
                                 sasl_plain_username=cache['sasl_username'],
                                 sasl_plain_password=cache['sasl_password'],
                                 security_protocol=cache['security_protocol'],
                                 ssl_context=cache['ssl_context'],
                                 sasl_mechanism=cache['sasl_mechanism'],
                                 group_id=cgn,
                                 # must exceed the default session_timeout_ms
                                 # (10000) or kafka-python rejects the config
                                 request_timeout_ms=10001)

        # force a metadata fetch so partitions_for_topic() is populated
        consumer.topics()
        partitions = [TopicPartition(str(topic_name), p)
                      for p in consumer.partitions_for_topic(str(topic_name))]
        consumer.assign(partitions)

        if set_offset_to_earliest:
            consumer.seek_to_beginning()
        else:
            # position each partition one record before its current end, so the
            # next poll yields the most recent message on that partition
            end_offsets = consumer.end_offsets(partitions)
            for tp in partitions:
                consumer.seek(tp, max(end_offsets[tp] - 1, 0))
        return consumer
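

# A minimal smoke-test sketch, assuming a SASL/PLAIN broker is reachable at
# localhost:9092 with the placeholder credentials below (none of these values
# ship with this library):
if __name__ == "__main__":
    kafka = KafkaKeywords()
    kafka.connect("local", "localhost:9092", "demo-user", "demo-pass")
    kafka.produce("local", "demo.topic", b"demo-key", b"demo-value")
    print(kafka.consume("local", "demo.topic"))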