- final DMaaPMRSubscriber subscriber =
- DMaaPMRFactory.create().createSubscriber(DMaaPSourceConfigMapper.map(pluginConfig));
-
- // Start a new thread with indefinite loop until receiver is stopped
- new Thread() {
- @Override
- public void run() {
- while (!isStopped()) {
- storeStructuredRecords(subscriber);
- try {
- final Integer pollingInterval = pluginConfig.getPollingInterval();
- LOG.debug("DMaaP MR Receiver sleeping for polling interval: {}", pollingInterval);
- TimeUnit.MILLISECONDS.sleep(pollingInterval);
- } catch (InterruptedException e) {
- final String errorMessage = String.format(
- "Interrupted Exception while DMaaP MR Receiver sleeping polling interval: %s", e);
- Thread.currentThread().interrupt();
- throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);
- }
- }
- }
- }.start();
-
+ try(final DMaaPMRSubscriber subscriber =
+ DMaaPMRFactory.create().createSubscriber(DMaaPSourceConfigMapper.map(pluginConfig))){
+
+ // Start a new thread with indefinite loop until receiver is stopped
+ new Thread() {
+ @Override
+ public void run() {
+ while (!isStopped()) {
+ storeStructuredRecords(subscriber);
+ try {
+ final Integer pollingInterval = pluginConfig.getPollingInterval();
+ LOG.debug("DMaaP MR Receiver sleeping for polling interval: {}", pollingInterval);
+ TimeUnit.MILLISECONDS.sleep(pollingInterval);
+ } catch (InterruptedException e) {
+ final String errorMessage = String.format(
+ "Interrupted Exception while DMaaP MR Receiver sleeping polling interval: %s", e);
+ Thread.currentThread().interrupt();
+ throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);
+ }
+ }
+ }
+ }.start();
+ } catch (Exception e) {
+ LOG.error("Exception in DMaaPMRReceiver onStart",e);
+ }