[KAFKA] Add support for scram-sha on kafka client 76/127176/2
authorefiacor <fiachra.corcoran@est.tech>
Thu, 17 Feb 2022 13:08:00 +0000 (13:08 +0000)
committerFiachra Corcoran <fiachra.corcoran@est.tech>
Tue, 1 Mar 2022 15:27:47 +0000 (15:27 +0000)
Signed-off-by: efiacor <fiachra.corcoran@est.tech>
Change-Id: I77bbfe97e6cade41b91b5b2c5fd04fd0e5aa4e34
Issue-ID: DMAAP-1705

robotframework-onap/ONAPLibrary/KafkaKeywords.py

index ba49d68..46e0b3b 100644 (file)
@@ -1,4 +1,5 @@
 # Copyright 2019 AT&T Intellectual Property. All rights reserved.
+# Copyright (C) 2022 Nordix Foundation
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -31,15 +32,15 @@ class KafkaKeywords(object):
         self._cache = utils.ConnectionCache('No Kafka Environments created')
 
     @keyword
-    def connect(self, alias, kafka_host, sasl_user, sasl_password):
+    def connect(self, alias, kafka_host, sasl_user, sasl_password, sasl_mechanism="SCRAM-SHA-512"):
         """connect to the specified kafka server"""
         client = {
             "bootstrap_servers": kafka_host,
-            "sasl_plain_username": sasl_user,
-            "sasl_plain_password": sasl_password,
+            "sasl_username": sasl_user,
+            "sasl_password": sasl_password,
             "security_protocol": 'SASL_PLAINTEXT',
             "ssl_context": ssl.create_default_context(),
-            "sasl_mechanism": 'PLAIN'
+            "sasl_mechanism": sasl_mechanism
         }
         self._cache.register(client, alias=alias)
 
@@ -54,8 +55,8 @@ class KafkaKeywords(object):
     def _get_producer(self, alias):
         cache = self._cache.switch(alias)
         prod = KafkaProducer(bootstrap_servers=cache['bootstrap_servers'],
-                             sasl_plain_username=cache['sasl_plain_username'],
-                             sasl_plain_password=cache['sasl_plain_password'],
+                             sasl_username=cache['sasl_username'],
+                             sasl_password=cache['sasl_password'],
                              security_protocol=cache['security_protocol'],
                              ssl_context=cache['ssl_context'],
                              sasl_mechanism=cache['sasl_mechanism'],
@@ -86,8 +87,8 @@ class KafkaKeywords(object):
         cache = self._cache.switch(alias)
 
         consumer = KafkaConsumer(bootstrap_servers=cache['bootstrap_servers'],
-                                 sasl_plain_username=cache['sasl_plain_username'],
-                                 sasl_plain_password=cache['sasl_plain_password'],
+                                 sasl_username=cache['sasl_username'],
+                                 sasl_password=cache['sasl_password'],
                                  security_protocol=cache['security_protocol'],
                                  ssl_context=cache['ssl_context'],
                                  sasl_mechanism=cache['sasl_mechanism'],