public class DmaapProducerImpl implements Producer {
- private static final EELFLogger LOG = EELFManager.getInstance().getLogger(DmaapProducerImpl.class);
- private static final Configuration configuration = ConfigurationFactory.getConfiguration();
+ private static final EELFLogger LOG = EELFManager.getInstance().getLogger(DmaapProducerImpl.class);
+ private static final Configuration configuration = ConfigurationFactory.getConfiguration();
- private Set<String> topics;
+ private Set<String> topics;
- private Properties props = null;
- private MetricRegistry metricRegistry;
- private boolean useHttps = false;
- private boolean isMetricEnabled = false;
-
- private Set<MRBatchingPublisher> clients;
+ private Properties props = null;
+ private MetricRegistry metricRegistry;
+ private boolean useHttps = false;
+ private boolean isMetricEnabled = false;
+
+ private Set<MRBatchingPublisher> clients;
-
public DmaapProducerImpl(Collection<String> urls, String topicName, String user, String password) {
- this(urls, (Set<String>)null, user, password);
+ this(urls, (Set<String>) null, user, password);
this.topics = new HashSet<>();
if (topicName != null) {
Collections.addAll(topics, topicName.split(","));
public DmaapProducerImpl(Collection<String> urls, Set<String> topicNames, String user, String password) {
topics = topicNames;
- if (urls == null || user == null || password == null) {
- throw new IllegalArgumentException("one of these mandaory argument is null: urls, user, password");
+ if (urls == null) {
+ throw new IllegalArgumentException("Mandaory argument is null: urls");
}
this.props = new Properties();
String urlsStr = StringUtils.join(urls, ',');
- props.setProperty("host",urlsStr);
+ props.setProperty("host", urlsStr);
props.setProperty("id", UUID.randomUUID().toString());
- props.setProperty("username",user);
- props.setProperty("password",password);
+ if (user != null && password != null) {
+ props.setProperty("username", user);
+ props.setProperty("password", password);
+ } else {
+ props.setProperty("TransportType", "HTTPNOAUTH");
+ }
}
private void initMetric() {
metricRegistry = metricService.createRegistry("APPC");
DmaapRequestCounterMetric dmaapKpiMetric = metricRegistry.metricBuilderFactory()
- .dmaapRequestCounterBuilder()
- .withName("DMAAP_KPI").withType(MetricType.COUNTER)
- .withRecievedMessage(0)
- .withPublishedMessage(0)
- .build();
+ .dmaapRequestCounterBuilder().withName("DMAAP_KPI").withType(MetricType.COUNTER)
+ .withRecievedMessage(0).withPublishedMessage(0).build();
if (metricRegistry.register(dmaapKpiMetric)) {
- Metric[] metrics = new Metric[]{dmaapKpiMetric};
+ Metric[] metrics = new Metric[] { dmaapKpiMetric };
LogPublisher logPublisher = new LogPublisher(metricRegistry, metrics);
LogPublisher[] logPublishers = new LogPublisher[1];
logPublishers[0] = logPublisher;
PublishingPolicy manuallyScheduledPublishingPolicy = metricRegistry.policyBuilderFactory()
- .scheduledPolicyBuilder()
- .withPublishers(logPublishers)
- .withMetrics(metrics)
- .build();
+ .scheduledPolicyBuilder().withPublishers(logPublishers).withMetrics(metrics).build();
LOG.debug("Policy getting initialized");
manuallyScheduledPublishingPolicy.init();
Set<MRBatchingPublisher> out = new HashSet<>();
for (String topic : topics) {
try {
- String topicProducerPropFileName = DmaapUtil.createProducerPropFile(topic,props);
+ String topicProducerPropFileName = DmaapUtil.createProducerPropFile(topic, props);
final MRBatchingPublisher client = MRClientFactory.createBatchingPublisher(topicProducerPropFileName);
out.add(client);
} catch (Exception e) {
@Override
public synchronized void updateCredentials(String key, String secret) {
LOG.info(String.format("Setting auth to %s for %s", key, this.toString()));
- props.setProperty("user", String.valueOf(key));
+ props.setProperty("username", String.valueOf(key));
props.setProperty("password", String.valueOf(secret));
clients = null;
}
public boolean post(String partition, String data) {
boolean success = true;
Properties properties = configuration.getProperties();
- if (properties != null && properties.getProperty("metric.enabled") != null ) {
+ if (properties != null && properties.getProperty("metric.enabled") != null) {
isMetricEnabled = Boolean.valueOf(properties.getProperty("metric.enabled"));
}
if (isMetricEnabled) {
initMetric();
}
-
- // Create clients once and reuse them on subsequent posts. This is
+
+ // Create clients once and reuse them on subsequent posts. This is
// to support failover to other servers in the Dmaap cluster.
if ((clients == null) || (clients.isEmpty())) {
LOG.info("Getting CambriaBatchingPublisher Clients ...");
clients = getClients();
}
-
+
for (MRBatchingPublisher client : clients) {
try {
LOG.debug(String.format("Posting %s to %s", data, client));
for (MRBatchingPublisher client : clients) {
try {
client.close(1, TimeUnit.SECONDS);
- } catch (IOException | InterruptedException e) {
+ } catch (IOException | InterruptedException e) {
LOG.warn(String.format("Failed to cleanly close Dmaap connection for [%s]", client), e);
}
}
}
}
+ public Properties getProperties() {
+ return props;
+ }
+
+ public boolean isHttps() {
+ return useHttps;
+ }
}