+ /**
+ * fetch messages.
+ *
+ * @return list of messages
+ * @throws MRApiException when error encountered by underlying libraries
+ */
+ Iterable<String> fetch() throws MRApiException;
+
+ /**
+ * close underlying library consumer.
+ */
+ void close();
+
+ /**
+ * MR based consumer.
+ */
+ class DmaapConsumerWrapper implements BusConsumer {
+
+ /**
+ * MR Consumer.
+ */
+ protected MRConsumerImpl consumer;
+
+ /**
+ * MR Consumer Wrapper.
+ *
+ * @param servers messaging bus hosts
+ * @param topic topic
+ * @param apiKey API Key
+ * @param apiSecret API Secret
+ * @param aafLogin AAF Login
+ * @param aafPassword AAF Password
+ * @param consumerGroup Consumer Group
+ * @param consumerInstance Consumer Instance
+ * @param fetchTimeout Fetch Timeout
+ * @param fetchLimit Fetch Limit
+ */
+ public DmaapConsumerWrapper(List<String> servers, String topic, String aafLogin, String aafPassword,
+ String consumerGroup, String consumerInstance, int fetchTimeout, int fetchLimit)
+ throws MalformedURLException {
+
+ this(new MRConsumerImpl(servers, topic, consumerGroup, consumerInstance, fetchTimeout, fetchLimit, null,
+ aafLogin, aafPassword), aafLogin, aafPassword, servers.get(0));
+
+ }
+
+ DmaapConsumerWrapper(MRConsumerImpl consumer, String aafLogin, String aafPassword, String host) {
+ this.consumer = consumer;
+ this.consumer.setUsername(aafLogin);
+ this.consumer.setPassword(aafPassword);
+ this.consumer.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue());
+ this.consumer.setHost(host + ":3904");
+
+ Properties props = new Properties();
+ props.setProperty("Protocol", "http");
+ this.consumer.setProps(props);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public Iterable<String> fetch() throws MRApiException {
+ try {
+ return consumer.fetch();
+ } catch (Exception e) {
+ throw new MRApiException("Error during MR consumer Fetch ", e);
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void close() {
+ this.consumer.close();
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder builder = new StringBuilder();
+ builder.append("DmaapConsumerWrapper [").append("consumer.getAuthDate()=").append(consumer.getAuthDate())
+ .append(", consumer.getAuthKey()=").append(consumer.getAuthKey()).append(", consumer.getHost()=")
+ .append(consumer.getHost()).append(", consumer.getProtocolFlag()=")
+ .append(consumer.getProtocolFlag()).append(", consumer.getUsername()=")
+ .append(consumer.getUsername()).append("]");
+ return builder.toString();
+ }
+ }