add sasl support to kafka 20/90120/1
author: DR695H <dr695h@att.com>
Tue, 18 Jun 2019 21:11:30 +0000 (17:11 -0400)
committer: DR695H <dr695h@att.com>
Tue, 18 Jun 2019 21:13:02 +0000 (17:13 -0400)
Change-Id: I372bcc8a478d137f58b6ab9017a555f584e48f30
Issue-ID: TEST-158
Signed-off-by: DR695H <dr695h@att.com>
robotframework-onap/ONAPLibrary/KafkaKeywords.py
robotframework-onap/setup.py

diff --git a/robotframework-onap/ONAPLibrary/KafkaKeywords.py b/robotframework-onap/ONAPLibrary/KafkaKeywords.py
index 6da5b52..41ef976 100644
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-from pykafka import KafkaClient
-from pykafka.common import OffsetType
+
+from kafka import KafkaConsumer
+from kafka import KafkaProducer
+import ssl
 from robot.api.deco import keyword
 from robot import utils
 
@@ -25,9 +27,16 @@ class KafkaKeywords(object):
         self._cache = utils.ConnectionCache('No Kafka Environments created')
 
     @keyword
-    def connect(self, alias, kafka_host, kafka_version="1.0.0"):
+    def connect(self, alias, kafka_host, sasl_user, sasl_password):
         """connect to the specified kafka server"""
-        client = KafkaClient(hosts=kafka_host, broker_version=kafka_version)
+        client = {
+            "bootstrap_servers": kafka_host,
+            "sasl_plain_username": sasl_user,
+            "sasl_plain_password": sasl_password,
+            "security_protocol": 'SASL_SSL',
+            "ssl_context": ssl.create_default_context(),
+            "sasl_mechanism": 'PLAIN'
+        }
         self._cache.register(client, alias=alias)
 
     @keyword
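
Note that connect() now only caches the connection settings per alias instead of opening a client eagerly; producers and consumers are built on demand. A minimal sketch (not part of the change itself) of how such a settings dict expands into a kafka-python client; the broker address and credentials are placeholders:

import ssl

from kafka import KafkaProducer

# Placeholder connection settings mirroring what connect() caches per alias.
conn = {
    "bootstrap_servers": "kafka.example.org:9093",
    "sasl_plain_username": "test-user",
    "sasl_plain_password": "test-password",
    "security_protocol": "SASL_SSL",
    "ssl_context": ssl.create_default_context(),
    "sasl_mechanism": "PLAIN",
}

# kafka-python accepts these as keyword arguments, so the cached dict can be
# expanded directly when a producer or consumer is actually constructed.
producer = KafkaProducer(request_timeout_ms=5000, **conn)
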
@@ -35,12 +44,18 @@ class KafkaKeywords(object):
         assert topic
         assert value
 
-        producer = self._get_producer(alias, topic)
-        return producer.produce(value, key)
+        producer = self._get_producer(alias)
+        return producer.send(topic, value=value, key=key)
 
-    def _get_producer(self, alias, topic_name):
-        topic = self._cache.switch(alias).topics[topic_name]
-        prod = topic.get_sync_producer()
+    def _get_producer(self, alias):
+        cache = self._cache.switch(alias)
+        prod = KafkaProducer(bootstrap_servers=cache['bootstrap_servers'],
+                             sasl_plain_username=cache['sasl_plain_username'],
+                             sasl_plain_password=cache['sasl_plain_password'],
+                             security_protocol=cache['security_protocol'],
+                             ssl_context=cache['ssl_context'],
+                             sasl_mechanism=cache['sasl_mechanism'],
+                             request_timeout_ms=5000)
         return prod
 
     @keyword
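
Unlike pykafka's sync producer, kafka-python's send() is asynchronous and returns a future; callers that need delivery confirmation can call get() on that future or flush() the producer. A hedged usage sketch, assuming a local non-SASL broker and an illustrative topic and payload (keys and values must be bytes unless serializers are configured):

from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers="localhost:9092")  # assumed local broker, no SASL
future = producer.send("example-topic", value=b'{"id": 1}', key=b"example-key")
metadata = future.get(timeout=10)  # block until the broker acknowledges the record
print(metadata.topic, metadata.partition, metadata.offset)
producer.flush()
producer.close()
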
@@ -48,7 +63,8 @@ class KafkaKeywords(object):
         assert topic_name
 
         consumer = self._get_consumer(alias, topic_name, consumer_group)
-        msg = consumer.consume()
+        msg = next(consumer)
+        consumer.close(autocommit=True)
         if msg is None:
             return None
         else:
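
next() on a KafkaConsumer blocks until a record arrives; setting consumer_timeout_ms makes iteration stop (StopIteration) after the given idle period. A small self-contained sketch, assuming a local broker and a hypothetical "example-topic":

from kafka import KafkaConsumer

# Illustrative broker, topic, and group id.
consumer = KafkaConsumer("example-topic",
                         bootstrap_servers="localhost:9092",
                         group_id="example-group",
                         auto_offset_reset="earliest",
                         consumer_timeout_ms=5000)  # stop iterating after 5s of silence
for record in consumer:
    print(record.offset, record.key, record.value)
consumer.close(autocommit=True)  # commit consumed offsets before closing
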
@@ -63,14 +79,22 @@ class KafkaKeywords(object):
         else:
             cgn = topic_name
 
-        topic = self._cache.switch(alias).topics[topic_name]
+        cache = self._cache.switch(alias)
+
+        consumer = KafkaConsumer(bootstrap_servers=cache['bootstrap_servers'],
+                                 sasl_plain_username=cache['sasl_plain_username'],
+                                 sasl_plain_password=cache['sasl_plain_password'],
+                                 security_protocol=cache['security_protocol'],
+                                 ssl_context=cache['ssl_context'],
+                                 sasl_mechanism=cache['sasl_mechanism'],
+                                 group_id=cgn,
+                                 request_timeout_ms=15000)  # must exceed session_timeout_ms (default 10000)
 
-        offset_type = OffsetType.LATEST
+        # subscribe so the group coordinator assigns partitions; the seeks
+        # below only act on partitions that are already assigned
+        consumer.subscribe([topic_name])
+        while not consumer.assignment():
+            consumer.poll(timeout_ms=500)
+
         if set_offset_to_earliest:
-            offset_type = OffsetType.EARLIEST
+            consumer.seek_to_beginning()
+        else:
+            consumer.seek_to_end()

-        c = topic.get_simple_consumer(
-                consumer_group=cgn, auto_offset_reset=offset_type, auto_commit_enable=True,
-                reset_offset_on_start=True, consumer_timeout_ms=5000)
-
-        return c
+        return consumer
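
seek_to_beginning() and seek_to_end() only act on partitions currently assigned to the consumer, which with a group subscription only happens once poll() has run. An alternative that avoids group coordination entirely is manual partition assignment; a sketch with a placeholder broker and topic:

from kafka import KafkaConsumer, TopicPartition

consumer = KafkaConsumer(bootstrap_servers="localhost:9092")  # placeholder broker
partitions = [TopicPartition("example-topic", p)
              for p in consumer.partitions_for_topic("example-topic") or set()]
consumer.assign(partitions)           # explicit assignment, no consumer group rebalance
consumer.seek_to_beginning()          # or consumer.seek_to_end()
print({tp: consumer.position(tp) for tp in partitions})
consumer.close()
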
diff --git a/robotframework-onap/setup.py b/robotframework-onap/setup.py
index ddc8f79..76a0f2c 100644
@@ -39,7 +39,7 @@ setup(
         'requests',
         'future',
         'robotframework-requests',
-        'pykafka',
+        'kafka-python',
         'urllib3'
     ],  # what we need
     packages=['eteutils', 'loadtest', 'vcpeutils', 'ONAPLibrary'],       # The name of your scripts package
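
The setup.py change swaps the pykafka dependency for kafka-python. The PyPI distribution is named kafka-python, but it installs the kafka package imported above; an illustrative check:

import kafka

# The kafka-python distribution provides the 'kafka' package used in KafkaKeywords.py.
print(kafka.__version__)
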