Merge "fix oauth code"
[ccsdk/features.git] / sdnr / wt / mountpoint-registrar / provider / src / main / java / org / onap / ccsdk / features / sdnr / wt / mountpointregistrar / kafka / VESMsgKafkaConsumer.java
index 80e232a..06e32e4 100644 (file)
@@ -33,10 +33,10 @@ public class VESMsgKafkaConsumer {
      */
     public VESMsgKafkaConsumer(Properties strimziKafkaProperties, Properties consumerProperties) {
         Properties props = new Properties();
-        props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, strimziKafkaProperties.getProperty("bootstrapServers"));
-        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, strimziKafkaProperties.getProperty("securityProtocol"));
-        props.put(SaslConfigs.SASL_MECHANISM, strimziKafkaProperties.getProperty("saslMechanism"));
-        props.put(SaslConfigs.SASL_JAAS_CONFIG, strimziKafkaProperties.getProperty("saslJaasConfig"));
+        props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, strimziKafkaProperties.getProperty("bootstrap.servers"));
+        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, strimziKafkaProperties.getProperty("security.protocol"));
+        props.put(SaslConfigs.SASL_MECHANISM, strimziKafkaProperties.getProperty("sasl.mechanism"));
+        props.put(SaslConfigs.SASL_JAAS_CONFIG, strimziKafkaProperties.getProperty("sasl.jaas.config"));
         props.put(ConsumerConfig.GROUP_ID_CONFIG,
                 consumerProperties.getProperty("consumerGroup") + "-" + consumerProperties.getProperty("topic"));
         props.put(ConsumerConfig.CLIENT_ID_CONFIG,
@@ -78,4 +78,9 @@ public class VESMsgKafkaConsumer {
     public String getTopicName() {
         return topicName;
     }
+
+    public void stop() {
+        consumer.unsubscribe();
+        consumer.close();
+    }
 }