import com.att.dmf.mr.constants.CambriaConstants;
import com.att.dmf.mr.exception.DMaaPErrorMessages;
import com.att.dmf.mr.utils.ConfigurationReader;
-//import org.slf4j.Logger;
-//import org.slf4j.LoggerFactory;
+
+
import com.att.eelf.configuration.EELFLogger;
import com.att.eelf.configuration.EELFManager;
import com.att.nsa.metrics.CdmTimer;
NOT_STARTED, CONNECTED, DISCONNECTED, SUSPENDED
}
- // @Qualifier("kafkalockavoid")
-
- // @Resource
- // @Qualifier("kafkalockavoid")
- // KafkaLiveLockAvoider2 kafkaLiveLockAvoider;
+
+
@Autowired
private DMaaPErrorMessages errorMessages;
- // KafkaLiveLockAvoider kafkaLiveLockAvoider = new KafkaLiveLockAvoider();
+
/**
* User defined exception class for kafka consumer cache
*
EnsurePath ensurePath = new EnsurePath(fBaseZkPath);
ensurePath.ensure(curator.getZookeeperClient());
- // final long freq = fSettings.getLong(kSetting_SweepEverySeconds,
- // kDefault_SweepEverySeconds);
+
+
long freq = kDefault_SweepEverySeconds;
String strkSetting_SweepEverySeconds = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
kSetting_SweepEverySeconds);
if (getStatus() != KafkaConsumerCache.Status.CONNECTED)
throw new KafkaConsumerCacheException("The cache service is unavailable.");
ArrayList<Kafka011Consumer> kcl = new ArrayList<>();
- // final String consumerKey = makeConsumerKey(topic, consumerGroupId,
- // clientId);
+
+
Enumeration<String> strEnum = fConsumers.keys();
String consumerLocalKey = null;
while (strEnum.hasMoreElements()) {
if (consumerLocalKey.startsWith(topicgroup) && (!consumerLocalKey.endsWith("::" + clientId))) {
- // System.out.println("consumer key returning from
- // getConsumerListforCG +++++++++ " + consumerLocalKey
- // + " " + fConsumers.get(consumerLocalKey));
+
+
+
kcl.add(fConsumers.get(consumerLocalKey));
}
if (getStatus() != KafkaConsumerCache.Status.CONNECTED)
throw new KafkaConsumerCacheException("The cache service is unavailable.");
ArrayList<Kafka011Consumer> kcl = new ArrayList<>();
- // final String consumerKey = makeConsumerKey(topic, consumerGroupId,
- // clientId);
+
Enumeration<String> strEnum = fConsumers.keys();
String consumerLocalKey = null;
while (strEnum.hasMoreElements()) {
if (consumerLocalKey.startsWith(group)) {
- // System.out.println("consumer key returning from
- // getConsumerListforCG +++++++++ " + consumerLocalKey
- // + " " + fConsumers.get(consumerLocalKey));
+
kcl.add(fConsumers.get(consumerLocalKey));
}
final String consumerKey = makeConsumerKey(topic, consumerGroupId, consumerId);
fConsumers.put(consumerKey, consumer);
- // String appId = "node-instance-"+i;
+
log.info("^@ Consumer Added to Cache Consumer Key" + consumerKey + " ApiId" + fApiId);
}
final Kafka011Consumer kc = fConsumers.get(key);
log.info("closing Kafka consumer " + key + " object " + kc);
if (kc != null) {
- // log.info("closing Kafka consumer " + key);
+
if (kc.close()) {
fConsumers.remove(key);
mustTouchEveryMs = Long.parseLong(strkSetting_TouchEveryMs);
}
- // final long mustTouchEveryMs =
- // fSettings.getLong(kSetting_TouchEveryMs, kDefault_MustTouchEveryMs);
+
final long oldestAllowedTouchMs = System.currentTimeMillis() - mustTouchEveryMs;
for (Entry<String, Kafka011Consumer> e : fConsumers.entrySet()) {
}
private static final EELFLogger log = EELFManager.getInstance().getLogger(KafkaConsumerCache.class);
- // private static final Logger log =
- // LoggerFactory.getLogger(KafkaConsumerCache.class);
+
}
\ No newline at end of file