package org.onap.appc.adapter.messaging.dmaap.impl;
-
import com.att.eelf.configuration.EELFLogger;
import com.att.eelf.configuration.EELFManager;
import com.att.nsa.mr.client.MRBatchingPublisher;
import com.att.nsa.mr.client.MRClientFactory;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.onap.appc.adapter.message.Producer;
-import org.onap.appc.adapter.messaging.dmaap.impl.DmaapUtil;
import org.onap.appc.configuration.Configuration;
import org.onap.appc.configuration.ConfigurationFactory;
import org.onap.appc.metricservice.MetricRegistry;
import org.onap.appc.metricservice.MetricService;
+import org.onap.appc.metricservice.metric.DmaapRequestCounterMetric;
import org.onap.appc.metricservice.metric.Metric;
import org.onap.appc.metricservice.metric.MetricType;
-import org.onap.appc.metricservice.metric.DmaapRequestCounterMetric;
import org.onap.appc.metricservice.policy.PublishingPolicy;
import org.onap.appc.metricservice.publisher.LogPublisher;
import org.osgi.framework.BundleContext;
import org.osgi.framework.FrameworkUtil;
import org.osgi.framework.ServiceReference;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Properties;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-
public class DmaapProducerImpl implements Producer {
private static final EELFLogger LOG = EELFManager.getInstance().getLogger(DmaapProducerImpl.class);
private static final Configuration configuration = ConfigurationFactory.getConfiguration();
- private Set<String> topics = new HashSet<String>();
+ private Set<String> topics;
private Properties props = null;
- private static MetricRegistry metricRegistry;
+ private MetricRegistry metricRegistry;
private boolean useHttps = false;
- private boolean isMetricEnabled=false;
+ private boolean isMetricEnabled = false;
private Set<MRBatchingPublisher> clients;
this(urls, (Set<String>)null, user, password);
this.topics = new HashSet<>();
if (topicName != null) {
- for (String topic : topicName.split(",")) {
- topics.add(topic);
- }
+ Collections.addAll(topics, topicName.split(","));
}
}
public DmaapProducerImpl(Collection<String> urls, Set<String> topicNames, String user, String password) {
topics = topicNames;
- if(urls == null || user == null || password == null){
- throw new IllegalArgumentException("one of these mandaory argument is null: urls, user, password" );
+ if (urls == null || user == null || password == null) {
+ throw new IllegalArgumentException("one of these mandaory argument is null: urls, user, password");
}
this.props = new Properties();
String urlsStr = StringUtils.join(urls, ',');
props.setProperty("username",user);
props.setProperty("password",password);
}
+
private void initMetric() {
LOG.debug("Metric getting initialized");
MetricService metricService = getMetricservice();
- metricRegistry=metricService.createRegistry("APPC");
- DmaapRequestCounterMetric dmaapKpiMetric = metricRegistry.metricBuilderFactory().
- dmaapRequestCounterBuilder().
- withName("DMAAP_KPI").withType(MetricType.COUNTER).
- withRecievedMessage(0)
+ if (metricService != null) {
+ metricRegistry = metricService.createRegistry("APPC");
+
+ DmaapRequestCounterMetric dmaapKpiMetric = metricRegistry.metricBuilderFactory()
+ .dmaapRequestCounterBuilder()
+ .withName("DMAAP_KPI").withType(MetricType.COUNTER)
+ .withRecievedMessage(0)
.withPublishedMessage(0)
.build();
- if(metricRegistry.register(dmaapKpiMetric)) {
- Metric[] metrics = new Metric[]{dmaapKpiMetric};
- LogPublisher logPublisher = new LogPublisher(metricRegistry, metrics);
- LogPublisher[] logPublishers = new LogPublisher[1];
- logPublishers[0] = logPublisher;
- PublishingPolicy manuallyScheduledPublishingPolicy = metricRegistry.policyBuilderFactory().
- scheduledPolicyBuilder().withPublishers(logPublishers).
- withMetrics(metrics).
- build();
- LOG.debug("Policy getting initialized");
- manuallyScheduledPublishingPolicy.init();
- LOG.debug("Metric initialized");
- }
+ if (metricRegistry.register(dmaapKpiMetric)) {
+ Metric[] metrics = new Metric[]{dmaapKpiMetric};
+ LogPublisher logPublisher = new LogPublisher(metricRegistry, metrics);
+ LogPublisher[] logPublishers = new LogPublisher[1];
+ logPublishers[0] = logPublisher;
+
+ PublishingPolicy manuallyScheduledPublishingPolicy = metricRegistry.policyBuilderFactory()
+ .scheduledPolicyBuilder()
+ .withPublishers(logPublishers)
+ .withMetrics(metrics)
+ .build();
+
+ LOG.debug("Policy getting initialized");
+ manuallyScheduledPublishingPolicy.init();
+ LOG.debug("Metric initialized");
+ }
+ }
}
+
private Set<MRBatchingPublisher> getClients() {
Set<MRBatchingPublisher> out = new HashSet<>();
for (String topic : topics) {
try {
String topicProducerPropFileName = DmaapUtil.createProducerPropFile(topic,props);
- final MRBatchingPublisher client = MRClientFactory.createBatchingPublisher (topicProducerPropFileName);
+ final MRBatchingPublisher client = MRClientFactory.createBatchingPublisher(topicProducerPropFileName);
out.add(client);
} catch (Exception e) {
- LOG.error(e.getMessage());
+ LOG.error(e.getMessage(), e);
}
}
-
return out;
}
@Override
public synchronized void updateCredentials(String key, String secret) {
LOG.info(String.format("Setting auth to %s for %s", key, this.toString()));
- props.setProperty("user",String.valueOf(key));
- props.setProperty("password",String.valueOf(secret));
+ props.setProperty("user", String.valueOf(key));
+ props.setProperty("password", String.valueOf(secret));
clients = null;
}
@Override
public boolean post(String partition, String data) {
boolean success = true;
- Properties properties=configuration.getProperties();
- if(properties!=null && properties.getProperty("metric.enabled")!=null ){
- isMetricEnabled=Boolean.valueOf(properties.getProperty("metric.enabled"));
+ Properties properties = configuration.getProperties();
+ if (properties != null && properties.getProperty("metric.enabled") != null ) {
+ isMetricEnabled = Boolean.valueOf(properties.getProperty("metric.enabled"));
}
- if(isMetricEnabled){
+ if (isMetricEnabled) {
initMetric();
}
LOG.debug(String.format("Posting %s to %s", data, client));
client.send(partition, data);
} catch (IOException e) {
- LOG.error(e.getMessage());
+ LOG.error(e.getMessage(), e);
success = false;
}
}
- if(isMetricEnabled){
- ( (DmaapRequestCounterMetric) metricRegistry.metric("DMAAP_KPI")).incrementPublishedMessage();
- }
+ incrementPublishedMessage();
return success;
}
+ private void incrementPublishedMessage() {
+ if (isMetricEnabled && metricRegistry != null) {
+ ((DmaapRequestCounterMetric) metricRegistry.metric("DMAAP_KPI")).incrementPublishedMessage();
+ }
+ }
+
/**
- * Close producer Dmaap client
+ * Close producer Dmaap client.
*/
@Override
public void close() {
if ((clients == null) || (clients.isEmpty())) {
return;
}
-
LOG.debug("Closing Dmaap producer clients....");
for (MRBatchingPublisher client : clients) {
try {
client.close(1, TimeUnit.SECONDS);
} catch (IOException | InterruptedException e) {
- LOG.warn(String.format("Failed to cleanly close Dmaap connection for [%s]", client));
+ LOG.warn(String.format("Failed to cleanly close Dmaap connection for [%s]", client), e);
}
}
}
if (sref != null) {
LOG.info("Metric Service from bundlecontext");
return (MetricService) bctx.getService(sref);
-
} else {
LOG.info("Metric Service error from bundlecontext");
LOG.warn("Cannot find service reference for org.onap.appc.metricservice.MetricService");
return null;
-
}
}
- public Metric getMetric(String name){
- return metricRegistry.metric(name);
- }
}