}
- @Override
- public void run() {
- if (isReady) {
- isRunning = true;
- while (isRunning) {
- try {
- Response response = getMessages.invoke();
- Log.info("GET " + uri + " returned http status " + response.getStatus());
- String entity = response.readEntity(String.class);
- if (entity.contains("{")) {
- // Get rid of opening ["
- entity = entity.substring(2);
- // Get rid of closing "]
- entity = entity.substring(0, entity.length() - 2);
- // This replacement effectively un-escapes the JSON
- for (String message : entity.split("\",\"")) {
- try {
- processMsg(message.replace("\\\"", "\""));
- } catch (InvalidMessageException e) {
- Log.error("Message could not be processed", e);
- }
- }
- } else {
- Log.info("Entity doesn't appear to contain JSON elements");
- }
- } catch (Exception e) {
- Log.error("GET " + uri + " failed.", e);
- } finally {
- Log.info("Pausing " + fetchPause + " milliseconds before fetching from " + uri + " again.");
- try {
- Thread.sleep(fetchPause);
- } catch (InterruptedException e) {
- Log.error("Could not sleep thread", e);
- Thread.currentThread().interrupt();
- }
- }
- }
- }
- }
+ @Override
+ public void run() {
+ if (isReady) {
+ isRunning = true;
+ while (isRunning) {
+ try {
+ Response response = getMessages.invoke();
+ Log.info("GET " + uri + " returned http status " + response.getStatus());
+ String entity = response.readEntity(String.class);
+ if (response.getStatus() < 300) {
+ if (entity.contains("{")) {
+ // Get rid of opening ["
+ entity = entity.substring(2);
+ // Get rid of closing "]
+ entity = entity.substring(0, entity.length() - 2);
+ // This replacement effectively un-escapes the JSON
+ for (String message : entity.split("\",\"")) {
+ try {
+ processMsg(message.replace("\\\"", "\""));
+ } catch (InvalidMessageException e) {
+ Log.error("Message could not be processed", e);
+ }
+ }
+ } else {
+ if (entity.length() < 1) {
+ Log.info("GET was successful, but the server returned an empty message body.");
+ } else {
+ Log.info(
+ "GET was successful, but entity is not valid JSON. Message body will be logged, but not processed");
+ Log.info(entity);
+ }
+ }
+ } else {
+ Log.info("GET failed, message body will be logged, but not processed.");
+ Log.info(entity);
+ }
+ } catch (Exception e) {
+ Log.error("GET " + uri + " failed.", e);
+ } finally {
+ Log.info("Pausing " + fetchPause + " milliseconds before fetching from " + uri + " again.");
+ try {
+ Thread.sleep(fetchPause);
+ } catch (InterruptedException e) {
+ Log.error("Could not sleep thread", e);
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+ }
+ }
@Override
public void init(Properties baseProperties, String consumerPropertiesPath) {