- LOG.warn(String.format("Sleeping for %dms to compensate for connection failure", waitMs));
- Thread.sleep(waitMs);
- } catch (InterruptedException e2) {
- LOG.warn(String.format("Failed to wait for %dms after bad fetch", waitMs));
+ for (String s : client.fetch(waitMs, limit)) {
+ out.add(s);
+ incrementReceivedMessage();
+ }
+ LOG.debug(String.format("Got %d records from %s", out.size(), this.toString()));
+ } catch (Exception e) {
+ // Connection exception
+ LOG.error(String.format("Dmaap Connection Issue Detected. %s", e.getMessage()), e);
+ try {
+ LOG.warn(String.format("Sleeping for %dms to compensate for connection failure", waitMs));
+ Thread.sleep(waitMs);
+ } catch (InterruptedException e2) {
+ LOG.warn(String.format("Failed to wait for %dms after bad fetch", waitMs));
+ Thread.currentThread().interrupt();
+ }