-
- /**
- * fetch messages
- *
- * @return list of messages
- * @throws MRApiException when error encountered by underlying libraries
- */
- public Iterable<String> fetch() throws MRApiException;
-
- /**
- * close underlying library consumer
- */
- public void close();
-
- /**
- * MR based consumer
- */
- public static class DmaapConsumerWrapper implements BusConsumer {
-
- /**
- * MR Consumer
- */
- protected MRConsumerImpl consumer;
-
- /**
- * MR Consumer Wrapper
- *
- * @param servers messaging bus hosts
- * @param topic topic
- * @param apiKey API Key
- * @param apiSecret API Secret
- * @param aafLogin AAF Login
- * @param aafPassword AAF Password
- * @param consumerGroup Consumer Group
- * @param consumerInstance Consumer Instance
- * @param fetchTimeout Fetch Timeout
- * @param fetchLimit Fetch Limit
- */
- public DmaapConsumerWrapper(List<String> servers, String topic,
- String aafLogin, String aafPassword,
- String consumerGroup, String consumerInstance,
- int fetchTimeout, int fetchLimit) throws MalformedURLException{
-
- this.consumer = new MRConsumerImpl(servers, topic,
- consumerGroup, consumerInstance,
- fetchTimeout, fetchLimit,
- null, aafLogin, aafPassword);
-
- this.consumer.setUsername(aafLogin);
- this.consumer.setPassword(aafPassword);
-
- this.consumer.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue());
-
- Properties props = new Properties();
- props.setProperty("Protocol", "http");
- this.consumer.setProps(props);
- this.consumer.setHost(servers.get(0) + ":3904");
- }
-
- /**
- * {@inheritDoc}
- */
- public Iterable<String> fetch() throws MRApiException {
- try {
- return this.consumer.fetch();
+
+ /**
+ * fetch messages.
+ *
+ * @return list of messages
+ * @throws MRApiException when error encountered by underlying libraries
+ */
+ Iterable<String> fetch() throws MRApiException;
+
+ /**
+ * close underlying library consumer.
+ */
+ void close();
+
+ /**
+ * MR based consumer.
+ */
+ class DmaapConsumerWrapper implements BusConsumer {
+
+ /**
+ * MR Consumer.
+ */
+ protected MRConsumerImpl consumer;
+
+ /**
+ * MR Consumer Wrapper.
+ *
+ * @param servers messaging bus hosts
+ * @param topic topic
+ * @param apiKey API Key
+ * @param apiSecret API Secret
+ * @param aafLogin AAF Login
+ * @param aafPassword AAF Password
+ * @param consumerGroup Consumer Group
+ * @param consumerInstance Consumer Instance
+ * @param fetchTimeout Fetch Timeout
+ * @param fetchLimit Fetch Limit
+ */
+ public DmaapConsumerWrapper(List<String> servers, String topic, String aafLogin, String aafPassword,
+ String consumerGroup, String consumerInstance, int fetchTimeout, int fetchLimit)
+ throws MalformedURLException {
+
+ this(new MRConsumerImpl(servers, topic, consumerGroup, consumerInstance, fetchTimeout, fetchLimit, null,
+ aafLogin, aafPassword), aafLogin, aafPassword, servers.get(0));
+
+ }
+
+ DmaapConsumerWrapper(MRConsumerImpl consumer, String aafLogin, String aafPassword, String host) {
+ this.consumer = consumer;
+ this.consumer.setUsername(aafLogin);
+ this.consumer.setPassword(aafPassword);
+ this.consumer.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue());
+ this.consumer.setHost(host + ":3904");
+
+ Properties props = new Properties();
+ props.setProperty("Protocol", "http");
+ this.consumer.setProps(props);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public Iterable<String> fetch() throws MRApiException {
+ try {
+ return consumer.fetch();