* ============LICENSE_START=======================================================
* ONAP : APPC
* ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Copyright (C) 2017 Amdocs
* =============================================================================
* See the License for the specific language governing permissions and
* limitations under the License.
*
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
* ============LICENSE_END=========================================================
*/
public class DmaapConsumerImpl implements Consumer {
- private static final EELFLogger LOG = EELFManager.getInstance().getLogger(DmaapConsumerImpl.class);
- private final Configuration configuration = ConfigurationFactory.getConfiguration();
+ private static final EELFLogger LOG = EELFManager.getInstance().getLogger(DmaapConsumerImpl.class);
+ private final Configuration configuration = ConfigurationFactory.getConfiguration();
// Default values
- private static final int DEFAULT_TIMEOUT_MS = 60000;
- private static final int DEFAULT_LIMIT = 1000;
- private String topic;
- private boolean isMetricEnabled = false;
- private boolean useHttps = false;
- private MetricRegistry metricRegistry;
- private MRConsumer client = null;
- private Properties props = null;
-
+ private static final int DEFAULT_TIMEOUT_MS = 60000;
+ private static final int DEFAULT_LIMIT = 1000;
+ private String topic;
+ private boolean isMetricEnabled = false;
+ private boolean useHttps = false;
+ private MetricRegistry metricRegistry;
+ private MRConsumer client = null;
+ private Properties props = null;
public DmaapConsumerImpl(Collection<String> urls, String topicName, String consumerGroupName, String consumerId,
- String user, String password) {
+ String user, String password) {
- this(urls, topicName, consumerGroupName, consumerId,user, password,null);
+ this(urls, topicName, consumerGroupName, consumerId, user, password, null);
}
public DmaapConsumerImpl(Collection<String> urls, String topicName, String consumerGroupName, String consumerId,
- String user, String password, String filter) {
+ String user, String password, String filter) {
this.topic = topicName;
this.props = new Properties();
props.setProperty("host", urlsStr);
props.setProperty("group", consumerGroupName);
props.setProperty("id", consumerId);
- props.setProperty("username", user);
- props.setProperty("password", password);
+ if (user != null && password != null) {
+ props.setProperty("username", user);
+ props.setProperty("password", password);
+ } else {
+ props.setProperty("TransportType", "HTTPNOAUTH");
+ }
+
if (filter != null) {
props.setProperty("filter", filter);
}
metricRegistry = metricService.createRegistry("APPC");
DmaapRequestCounterMetric dmaapKpiMetric = metricRegistry.metricBuilderFactory()
- .dmaapRequestCounterBuilder()
- .withName("DMAAP_KPI").withType(MetricType.COUNTER)
- .withRecievedMessage(0)
- .withPublishedMessage(0)
- .build();
+ .dmaapRequestCounterBuilder().withName("DMAAP_KPI").withType(MetricType.COUNTER)
+ .withRecievedMessage(0).withPublishedMessage(0).build();
if (metricRegistry.register(dmaapKpiMetric)) {
- Metric[] metrics = new Metric[]{dmaapKpiMetric};
+ Metric[] metrics = new Metric[] { dmaapKpiMetric };
LogPublisher logPublisher = new LogPublisher(metricRegistry, metrics);
LogPublisher[] logPublishers = new LogPublisher[1];
logPublishers[0] = logPublisher;
PublishingPolicy manuallyScheduledPublishingPolicy = metricRegistry.policyBuilderFactory()
- .scheduledPolicyBuilder().withPublishers(logPublishers)
- .withMetrics(metrics)
- .build();
+ .scheduledPolicyBuilder().withPublishers(logPublishers).withMetrics(metrics).build();
LOG.debug("Policy getting initialized");
manuallyScheduledPublishingPolicy.init();
*/
private synchronized MRConsumer getClient(int waitMs, int limit) {
try {
- props.setProperty("timeout",String.valueOf(waitMs));
- props.setProperty("limit",String.valueOf(limit));
- String topicProducerPropFileName = DmaapUtil.createConsumerPropFile(topic,props);
+ props.setProperty("timeout", String.valueOf(waitMs));
+ props.setProperty("limit", String.valueOf(limit));
+ String topicProducerPropFileName = DmaapUtil.createConsumerPropFile(topic, props);
return MRClientFactory.createConsumer(topicProducerPropFileName);
} catch (IOException e1) {
- LOG.error("failed to createConsumer",e1);
+ LOG.error("failed to createConsumer", e1);
return null;
}
}
@Override
public synchronized void updateCredentials(String key, String secret) {
LOG.info(String.format("Setting auth to %s for %s", key, this.toString()));
- props.setProperty("user",String.valueOf(key));
- props.setProperty("password",String.valueOf(secret));
+ props.setProperty("username", String.valueOf(key));
+ props.setProperty("password", String.valueOf(secret));
client = null;
}
}
}
+ public Properties getProperties() {
+ return props;
+ }
+
+ public boolean isHttps() {
+ return useHttps;
+ }
}