- MutablePair<Integer, String> consumerGroupPair = null;
-
- synchronized (consumerGroupsMap) {
- Map<String, MutablePair<Integer, String>> consumerGroupMap = consumerGroupsMap.get(topicName);
- if (consumerGroupMap == null) {
- consumerGroupMap = new LinkedHashMap<>();
- consumerGroupsMap.put(topicName, consumerGroupMap);
- LOGGER.trace(
- TOPIC_TAG + topicName + ", Created consumer map entry for consumer group " + consumerGroup);
- }
-
- consumerGroupPair = consumerGroupMap.get(consumerGroup);
- if (consumerGroupPair == null) {
- consumerGroupPair = new MutablePair<>(-1, consumerId);
- consumerGroupMap.put(consumerGroup, consumerGroupPair);
- LOGGER.trace(TOPIC_TAG + topicName + ", Created consumer group entry for consumer group "
- + consumerGroup + ":" + consumerId);
- }
- }
+ try {
+ List<String> lst = topic2data.computeIfAbsent(topicName, this::makeTopicData).read(consumerGroup, limit,
+ timeoutMs);