*/
public VESMsgKafkaConsumer(Properties strimziKafkaProperties, Properties consumerProperties) {
Properties props = new Properties();
- props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, strimziKafkaProperties.getProperty("bootstrapServers"));
- props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, strimziKafkaProperties.getProperty("securityProtocol"));
- props.put(SaslConfigs.SASL_MECHANISM, strimziKafkaProperties.getProperty("saslMechanism"));
- props.put(SaslConfigs.SASL_JAAS_CONFIG, strimziKafkaProperties.getProperty("saslJaasConfig"));
+ props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, strimziKafkaProperties.getProperty("bootstrap.servers"));
+ props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, strimziKafkaProperties.getProperty("security.protocol"));
+ props.put(SaslConfigs.SASL_MECHANISM, strimziKafkaProperties.getProperty("sasl.mechanism"));
+ props.put(SaslConfigs.SASL_JAAS_CONFIG, strimziKafkaProperties.getProperty("sasl.jaas.config"));
props.put(ConsumerConfig.GROUP_ID_CONFIG,
consumerProperties.getProperty("consumerGroup") + "-" + consumerProperties.getProperty("topic"));
props.put(ConsumerConfig.CLIENT_ID_CONFIG,
public String getTopicName() {
return topicName;
}
+
+ public void stop() {
+ consumer.unsubscribe();
+ consumer.close();
+ }
}