improved unicode encoding for the string in py2 and py3
[testsuite/python-testing-utils.git] / robotframework-onap / ONAPLibrary / KafkaKeywords.py
1 # Copyright 2019 AT&T Intellectual Property. All rights reserved.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #         http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 from kafka import KafkaConsumer
16 from kafka import KafkaProducer
17 from kafka import TopicPartition
18 import ssl
19 from robot.api.deco import keyword
20 from robot import utils
21
22
23 class KafkaKeywords(object):
24     """ Utilities useful for Kafka consuming and producing """
25
26     def __init__(self):
27         super(KafkaKeywords, self).__init__()
28         self._cache = utils.ConnectionCache('No Kafka Environments created')
29
30     @keyword
31     def connect(self, alias, kafka_host, sasl_user, sasl_password):
32         """connect to the specified kafka server"""
33         client = {
34             "bootstrap_servers": kafka_host,
35             "sasl_plain_username": sasl_user,
36             "sasl_plain_password": sasl_password,
37             "security_protocol": 'SASL_PLAINTEXT',
38             "ssl_context": ssl.create_default_context(),
39             "sasl_mechanism": 'PLAIN'
40         }
41         self._cache.register(client, alias=alias)
42
43     @keyword
44     def produce(self, alias, topic, key, value):
45         assert topic
46         assert value
47
48         producer = self._get_producer(alias)
49         return producer.send(topic, value=value, key=key)
50
51     def _get_producer(self, alias):
52         cache = self._cache.switch(alias)
53         prod = KafkaProducer(bootstrap_servers=cache['bootstrap_servers'],
54                              sasl_plain_username=cache['sasl_plain_username'],
55                              sasl_plain_password=cache['sasl_plain_password'],
56                              security_protocol=cache['security_protocol'],
57                              ssl_context=cache['ssl_context'],
58                              sasl_mechanism=cache['sasl_mechanism'],
59                              request_timeout_ms=5000)
60         return prod
61
62     @keyword
63     def consume(self, alias, topic_name, consumer_group=None):
64         assert topic_name
65
66         consumer = self._get_consumer(alias, topic_name, consumer_group)
67         msg = next(consumer)
68         consumer.close(autocommit=True)
69         if msg is None:
70             return None
71         else:
72             return msg.value
73
74     def _get_consumer(self, alias, topic_name, consumer_group=None, set_offset_to_earliest=False):
75         assert topic_name
76
77         # default to the topic as group name
78         if consumer_group:
79             cgn = consumer_group
80         else:
81             cgn = topic_name
82
83         cache = self._cache.switch(alias)
84
85         consumer = KafkaConsumer(bootstrap_servers=cache['bootstrap_servers'],
86                                  sasl_plain_username=cache['sasl_plain_username'],
87                                  sasl_plain_password=cache['sasl_plain_password'],
88                                  security_protocol=cache['security_protocol'],
89                                  ssl_context=cache['ssl_context'],
90                                  sasl_mechanism=cache['sasl_mechanism'],
91                                  group_id=cgn,
92                                  request_timeout_ms=10001)
93
94         consumer.topics()
95         partition_set = consumer.partitions_for_topic(str(topic_name))
96         partitions = []
97         for val in partition_set:
98             partitions.append(TopicPartition(str(topic_name), val))
99         consumer.assign(partitions)
100         last = consumer.end_offsets(partitions)
101         offset = max(last.values())
102
103         if set_offset_to_earliest:
104             consumer.seek_to_beginning()
105         else:
106             for tp in partitions:
107                 consumer.seek(tp, offset - 1)
108
109         return consumer