hv-ves ete fix warning 'Sending username and password in the clear'
[testsuite/python-testing-utils.git] robotframework-onap/ONAPLibrary/KafkaKeywords.py
# Copyright 2019 AT&T Intellectual Property. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from kafka import KafkaConsumer
from kafka import KafkaProducer
from kafka import TopicPartition
import ssl
from robot.api.deco import keyword
from robot import utils
import logging

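# kafka-python logs 'Sending username and password in the clear' at WARNING
# level when SASL/PLAIN is used without TLS; raising the "kafka" logger
# threshold to CRITICAL suppresses that warning (along with all other
# non-critical kafka-python log output).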
logging.getLogger("kafka").setLevel(logging.CRITICAL)


class KafkaKeywords(object):
    """Utilities useful for Kafka consuming and producing.
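
    Minimal Robot Framework usage sketch; the alias, host, credentials,
    topic, and payload values below are illustrative placeholders, not
    defaults of this library:

    | Connect | kafka1 | kafka.example.org:9092 | admin | admin_secret |
    | Produce | kafka1 | TEST_TOPIC | ${key_bytes} | ${value_bytes} |
    | ${msg}= | Consume | kafka1 | TEST_TOPIC |
    """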

    def __init__(self):
        super(KafkaKeywords, self).__init__()
        self._cache = utils.ConnectionCache('No Kafka Environments created')

    @keyword
    def connect(self, alias, kafka_host, sasl_user, sasl_password):
        """Register SASL/PLAIN connection settings for a Kafka host under an alias."""
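        # No connection is opened here; the settings are cached under the
        # alias and used later to build producers and consumers on demand.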
        client = {
            "bootstrap_servers": kafka_host,
            "sasl_plain_username": sasl_user,
            "sasl_plain_password": sasl_password,
            # SASL_PLAINTEXT sends credentials over an unencrypted connection;
            # the kafka-python warning this triggers is silenced via the
            # "kafka" logger configuration above.
            "security_protocol": 'SASL_PLAINTEXT',
            # kafka-python only consults ssl_context for TLS-based protocols
            # (SSL/SASL_SSL), so it is inert with SASL_PLAINTEXT.
            "ssl_context": ssl.create_default_context(),
            "sasl_mechanism": 'PLAIN'
        }
        self._cache.register(client, alias=alias)

    @keyword
    def produce(self, alias, topic, key, value):
        """Send a single record to a topic and return the resulting future."""
        assert topic
        assert value

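        # Note: with no serializers configured, kafka-python requires the
        # key and value to be bytes, so callers must pass encoded data.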
        producer = self._get_producer(alias)
        return producer.send(topic, value=value, key=key)

    def _get_producer(self, alias):
        cache = self._cache.switch(alias)
        return KafkaProducer(bootstrap_servers=cache['bootstrap_servers'],
                             sasl_plain_username=cache['sasl_plain_username'],
                             sasl_plain_password=cache['sasl_plain_password'],
                             security_protocol=cache['security_protocol'],
                             ssl_context=cache['ssl_context'],
                             sasl_mechanism=cache['sasl_mechanism'],
                             request_timeout_ms=5000)

    @keyword
    def consume(self, alias, topic_name, consumer_group=None):
        """Return the value of the most recent message on the given topic."""
        assert topic_name

        consumer = self._get_consumer(alias, topic_name, consumer_group)
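        # _get_consumer() positions the consumer one message before the end
        # of each partition, so next() yields the newest message; it blocks
        # until a message is available.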
        msg = next(consumer)
        consumer.close(autocommit=True)
        if msg is None:
            return None
        else:
            return msg.value

    def _get_consumer(self, alias, topic_name, consumer_group=None, set_offset_to_earliest=False):
        assert topic_name

        # default to the topic name as the consumer group name
        cgn = consumer_group if consumer_group else topic_name

        cache = self._cache.switch(alias)

        consumer = KafkaConsumer(bootstrap_servers=cache['bootstrap_servers'],
                                 sasl_plain_username=cache['sasl_plain_username'],
                                 sasl_plain_password=cache['sasl_plain_password'],
                                 security_protocol=cache['security_protocol'],
                                 ssl_context=cache['ssl_context'],
                                 sasl_mechanism=cache['sasl_mechanism'],
                                 group_id=cgn,
                                 request_timeout_ms=10001)

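        # Partitions are assigned manually (rather than via subscribe()) so
        # that offsets can be seeked explicitly below; topics() forces a
        # metadata fetch so partitions_for_topic() returns data.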
        consumer.topics()
        partition_set = consumer.partitions_for_topic(str(topic_name))
        partitions = []
        for val in partition_set:
            partitions.append(TopicPartition(str(topic_name), val))
        consumer.assign(partitions)
        end_offsets = consumer.end_offsets(partitions)

        if set_offset_to_earliest:
            consumer.seek_to_beginning()
        else:
            # position each partition one message before its own end so the
            # next poll returns the newest message; seeking every partition to
            # a single global maximum could run past the end of shorter
            # partitions, and an empty partition must not be seeked below 0
            for tp in partitions:
                consumer.seek(tp, max(end_offsets[tp] - 1, 0))
        return consumer