# add so delete request
# [testsuite/python-testing-utils.git] / robotframework-onap / ONAPLibrary / KafkaKeywords.py
# Copyright 2019 AT&T Intellectual Property. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from kafka import KafkaConsumer
from kafka import KafkaProducer
from kafka import TopicPartition
import ssl
from robot.api.deco import keyword
from robot import utils


class KafkaKeywords(object):
    """Robot Framework keywords for producing to and consuming from Kafka.

    Connection settings are registered under an alias in a Robot
    ``ConnectionCache`` so multiple Kafka environments can be addressed
    from one test suite.  Producers/consumers are created lazily per call.
    """

    def __init__(self):
        super(KafkaKeywords, self).__init__()
        # alias -> dict of connection settings (see connect()).
        self._cache = utils.ConnectionCache('No Kafka Environments created')

    @keyword
    def connect(self, alias, kafka_host, sasl_user, sasl_password):
        """Register SASL_PLAINTEXT connection settings for *kafka_host* under *alias*.

        No network connection is opened here; the settings are only stored
        and used later by ``produce``/``consume``.
        """
        client = {
            "bootstrap_servers": kafka_host,
            "sasl_plain_username": sasl_user,
            "sasl_plain_password": sasl_password,
            "security_protocol": 'SASL_PLAINTEXT',
            "ssl_context": ssl.create_default_context(),
            "sasl_mechanism": 'PLAIN'
        }
        self._cache.register(client, alias=alias)

    @keyword
    def produce(self, alias, topic, key, value):
        """Send *value* (with optional *key*) to *topic* and return the send future.

        Raises AssertionError when *topic* or *value* is empty/None.
        """
        assert topic
        assert value

        producer = self._get_producer(alias)
        try:
            return producer.send(topic, value=value, key=key)
        finally:
            # close() flushes pending records, guaranteeing delivery before
            # the keyword returns, and releases the connection that the
            # previous implementation leaked on every call.
            producer.close()

    def _get_producer(self, alias):
        """Build a KafkaProducer from the settings registered under *alias*."""
        cache = self._cache.switch(alias)
        return KafkaProducer(bootstrap_servers=cache['bootstrap_servers'],
                             sasl_plain_username=cache['sasl_plain_username'],
                             sasl_plain_password=cache['sasl_plain_password'],
                             security_protocol=cache['security_protocol'],
                             ssl_context=cache['ssl_context'],
                             sasl_mechanism=cache['sasl_mechanism'],
                             request_timeout_ms=5000)

    @keyword
    def consume(self, alias, topic_name, consumer_group=None):
        """Return the value of the latest message on *topic_name*, blocking until one arrives."""
        assert topic_name

        consumer = self._get_consumer(alias, topic_name, consumer_group)
        try:
            # Iterating the consumer blocks until a record is available.
            msg = next(consumer)
        finally:
            consumer.close(autocommit=True)
        if msg is None:
            return None
        else:
            return msg.value

    def _get_consumer(self, alias, topic_name, consumer_group=None, set_offset_to_earliest=False):
        """Build a KafkaConsumer assigned to every partition of *topic_name*.

        Unless *set_offset_to_earliest* is true, each partition is positioned
        just before its own last record so the next poll yields the most
        recent message.
        """
        assert topic_name

        # default to the topic as group name
        if consumer_group:
            cgn = consumer_group
        else:
            cgn = topic_name

        cache = self._cache.switch(alias)

        consumer = KafkaConsumer(bootstrap_servers=cache['bootstrap_servers'],
                                 sasl_plain_username=cache['sasl_plain_username'],
                                 sasl_plain_password=cache['sasl_plain_password'],
                                 security_protocol=cache['security_protocol'],
                                 ssl_context=cache['ssl_context'],
                                 sasl_mechanism=cache['sasl_mechanism'],
                                 group_id=cgn,
                                 request_timeout_ms=10001)

        # Discover the topic's real partitions instead of assuming exactly
        # three; fall back to the historical {0, 1, 2} if metadata is not
        # available yet.
        partition_ids = consumer.partitions_for_topic(str(topic_name)) or {0, 1, 2}
        partitions = [TopicPartition(str(topic_name), p) for p in sorted(partition_ids)]
        consumer.assign(partitions)

        if set_offset_to_earliest:
            consumer.seek_to_beginning()
        else:
            end_offsets = consumer.end_offsets(partitions)
            for tp in partitions:
                # Seek each partition relative to ITS OWN end offset.  The
                # previous code used the global max across partitions, which
                # seeked shorter partitions beyond their end.  Clamp at 0 so
                # an empty partition is not seeked to the invalid offset -1.
                consumer.seek(tp, max(end_offsets[tp] - 1, 0))

        return consumer