import com.att.eelf.configuration.EELFLogger;
import com.att.eelf.configuration.EELFManager;
-
import java.io.IOException;
import java.security.GeneralSecurityException;
-import java.util.*;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
class UEBMessagingService implements MessagingService {
- private Consumer consumer;
- private Producer producer;
+ private final EELFLogger logger = EELFManager.getInstance().getLogger(UEBMessagingService.class);
- private final String DEFAULT_READ_TIMEOUT_MS = "60000";
- private final String DEFAULT_READ_LIMIT = "1000";
+ private static final String DEFAULT_READ_TIMEOUT_MS = "60000";
+ private static final String DEFAULT_READ_LIMIT = "1000";
+ private Consumer consumer;
+ private Producer producer;
private int readLimit;
- private final EELFLogger LOG = EELFManager.getInstance().getLogger(UEBMessagingService.class);
-
+ @Override
@SuppressWarnings("Since15")
- public void init(Properties props) throws IOException, GeneralSecurityException, NoSuchFieldException, IllegalAccessException {
+ public void init(Properties props)
+ throws IOException, GeneralSecurityException, NoSuchFieldException, IllegalAccessException {
if (props != null) {
String readTopic = props.getProperty(UEBPropertiesKeys.TOPIC_READ);
String readLimitString = props.getProperty(UEBPropertiesKeys.READ_LIMIT, DEFAULT_READ_LIMIT);
readLimit = Integer.parseInt(readLimitString);
//get hosts pool
- Collection<String> pool = new HashSet<String>();
+ Collection<String> pool = new HashSet<>();
String hostNames = props.getProperty(UEBPropertiesKeys.HOSTS);
if (hostNames != null && !hostNames.isEmpty()) {
- for (String name : hostNames.split(",")) {
- pool.add(name);
- }
+ pool.addAll(Arrays.asList(hostNames.split(",")));
}
-
//generate consumer id and group - same value for both
String consumerName = UUID.randomUUID().toString();
- String consumerID = consumerName;
//create consumer and producer
- consumer = new ConsumerImpl(pool, readTopic, consumerName, consumerID, readTimeout, apiKey, apiSecret);
+ consumer = new ConsumerImpl(pool, readTopic, consumerName, consumerName, readTimeout, apiKey, apiSecret);
producer = new ProducerImpl(pool, writeTopic, apiKey, apiSecret);
//initial consumer registration
try {
consumer.registerForRead();
- }catch(Exception e){
- LOG.error("Message consumer failed to register client "+consumerID);
+ } catch (Exception e) {
+ logger.error("Message consumer failed to register client " + consumerName, e);
}
}
}
+ @Override
public void send(String partition, String body) throws IOException {
producer.post(partition, body);
}
+ @Override
public List<String> fetch() throws IOException {
return consumer.fetch(readLimit);
}
+ @Override
public List<String> fetch(int limit) throws IOException {
return consumer.fetch(limit);
}
producer.close();
}
-}
+}
\ No newline at end of file