import org.junit.Test;
import org.onap.dmaap.mr.client.MRClientFactory;
-import org.onap.dmaap.mr.client.impl.MRConstants;
-import org.onap.dmaap.mr.client.impl.MRConsumerImpl;
import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants;
public class MRConsumerImplTest extends TestCase {
public void testNullFilter() throws IOException {
final LinkedList<String> hosts = new LinkedList<String>();
hosts.add("localhost:8080");
- final MRConsumerImpl c = new MRConsumerImpl(hosts, "topic", "cg", "cid", -1, -1, null, null, null);
+ final MRConsumerImpl c = new MRConsumerImpl.MRConsumerImplBuilder().setHostPart(hosts)
+ .setTopic("topic").setConsumerGroup("cg").setConsumerId("cid").setTimeoutMs(-1)
+ .setLimit(-1).setFilter(null).setApiKey_username(null).setApiSecret_password(null)
+ .createMRConsumerImpl();
final String url = c.createUrlPath(MRConstants.makeConsumerUrl("localhost:8080", "topic", "cg", "cid", "http"),
-1, -1);
assertEquals("http://localhost:8080/events/" + "topic/cg/cid", url);
public void testFilterWithNoTimeoutOrLimit() throws IOException {
final LinkedList<String> hosts = new LinkedList<String>();
hosts.add("localhost:8080");
- final MRConsumerImpl c = new MRConsumerImpl(hosts, "topic", "cg", "cid", -1, -1, "filter", null, null);
+ final MRConsumerImpl c = new MRConsumerImpl.MRConsumerImplBuilder().setHostPart(hosts)
+ .setTopic("topic").setConsumerGroup("cg").setConsumerId("cid").setTimeoutMs(-1)
+ .setLimit(-1).setFilter("filter").setApiKey_username(null)
+ .setApiSecret_password(null).createMRConsumerImpl();
final String url = c.createUrlPath(MRConstants.makeConsumerUrl("localhost:8080", "topic", "cg", "cid", "http"),
-1, -1);
assertEquals("http://localhost:8080/events/" + "topic/cg/cid?filter=filter", url);
public void testTimeoutNoLimitNoFilter() throws IOException {
final LinkedList<String> hosts = new LinkedList<String>();
hosts.add("localhost:8080");
- final MRConsumerImpl c = new MRConsumerImpl(hosts, "topic", "cg", "cid", 30000, -1, null, null, null);
+ final MRConsumerImpl c = new MRConsumerImpl.MRConsumerImplBuilder().setHostPart(hosts)
+ .setTopic("topic").setConsumerGroup("cg").setConsumerId("cid").setTimeoutMs(30000)
+ .setLimit(-1).setFilter(null).setApiKey_username(null).setApiSecret_password(null)
+ .createMRConsumerImpl();
final String url = c.createUrlPath(MRConstants.makeConsumerUrl("localhost:8080", "topic", "cg", "cid", "http"),
30000, -1);
assertEquals("http://localhost:8080/events/" + "topic/cg/cid?timeout=30000", url);
public void testNoTimeoutWithLimitNoFilter() throws IOException {
final LinkedList<String> hosts = new LinkedList<String>();
hosts.add("localhost:8080");
- final MRConsumerImpl c = new MRConsumerImpl(hosts, "topic", "cg", "cid", -1, 100, null, null, null);
+ final MRConsumerImpl c = new MRConsumerImpl.MRConsumerImplBuilder().setHostPart(hosts)
+ .setTopic("topic").setConsumerGroup("cg").setConsumerId("cid").setTimeoutMs(-1)
+ .setLimit(100).setFilter(null).setApiKey_username(null).setApiSecret_password(null)
+ .createMRConsumerImpl();
final String url = c.createUrlPath(MRConstants.makeConsumerUrl("localhost:8080", "topic", "cg", "cid", "http"),
-1, 100);
assertEquals("http://localhost:8080/events/" + "topic/cg/cid?limit=100", url);
public void testWithTimeoutWithLimitWithFilter() throws IOException {
final LinkedList<String> hosts = new LinkedList<String>();
hosts.add("localhost:8080");
- final MRConsumerImpl c = new MRConsumerImpl(hosts, "topic", "cg", "cid", 1000, 400, "f", null, null);
+ final MRConsumerImpl c = new MRConsumerImpl.MRConsumerImplBuilder().setHostPart(hosts)
+ .setTopic("topic").setConsumerGroup("cg").setConsumerId("cid").setTimeoutMs(1000)
+ .setLimit(400).setFilter("f").setApiKey_username(null).setApiSecret_password(null)
+ .createMRConsumerImpl();
final String url = c.createUrlPath(MRConstants.makeConsumerUrl("localhost:8080", "topic", "cg", "cid", "http"),
1000, 400);
assertEquals("http://localhost:8080/events/" + "topic/cg/cid?timeout=1000&limit=400&filter=f", url);
public void testFilterEncoding() throws IOException {
final LinkedList<String> hosts = new LinkedList<String>();
hosts.add("localhost:8080");
- final MRConsumerImpl c = new MRConsumerImpl(hosts, "topic", "cg", "cid", -1, -1, "{ \"foo\"=\"bar\"bar\" }",
- null, null);
+ final MRConsumerImpl c = new MRConsumerImpl.MRConsumerImplBuilder().setHostPart(hosts)
+ .setTopic("topic").setConsumerGroup("cg").setConsumerId("cid").setTimeoutMs(-1)
+ .setLimit(-1).setFilter("{ \"foo\"=\"bar\"bar\" }").setApiKey_username(null)
+ .setApiSecret_password(null).createMRConsumerImpl();
final String url = c.createUrlPath(MRConstants.makeConsumerUrl("localhost:8080", "topic", "cg", "cid", "http"),
-1, -1);
assertEquals("http://localhost:8080/events/" + "topic/cg/cid?filter=%7B+%22foo%22%3D%22bar%22bar%22+%7D", url);
MRClientFactory.prop=properties;
- final MRConsumerImpl c = new MRConsumerImpl(hosts, "topic", "cg", "cid", -1, -1, "{ \"foo\"=\"bar\"bar\" }",
- null, null);
+ final MRConsumerImpl c = new MRConsumerImpl.MRConsumerImplBuilder().setHostPart(hosts)
+ .setTopic("topic").setConsumerGroup("cg").setConsumerId("cid").setTimeoutMs(-1)
+ .setLimit(-1).setFilter("{ \"foo\"=\"bar\"bar\" }").setApiKey_username(null)
+ .setApiSecret_password(null).createMRConsumerImpl();
c.setProps(properties);
assertNotNull(c.fetchWithReturnConsumerResponse());
c.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue());
properties.store(new FileOutputStream(outFile), "");
MRClientFactory.prop=properties;
- final MRConsumerImpl c = new MRConsumerImpl(hosts, "topic", "cg", "cid", -1, -1, "{ \"foo\"=\"bar\"bar\" }",
- null, null);
+ final MRConsumerImpl c = new MRConsumerImpl.MRConsumerImplBuilder().setHostPart(hosts)
+ .setTopic("topic").setConsumerGroup("cg").setConsumerId("cid").setTimeoutMs(-1)
+ .setLimit(-1).setFilter("{ \"foo\"=\"bar\"bar\" }").setApiKey_username(null)
+ .setApiSecret_password(null).createMRConsumerImpl();
c.setProps(properties);
try {
c.fetch();