X-Git-Url: https://gerrit.onap.org/r/gitweb?a=blobdiff_plain;f=dcae-analytics-cdap-plugins%2Fsrc%2Fmain%2Fjava%2Forg%2Fonap%2Fdcae%2Fapod%2Fanalytics%2Fcdap%2Fplugins%2Fstreaming%2Fdmaap%2FDMaaPMRReceiver.java;fp=dcae-analytics-cdap-plugins%2Fsrc%2Fmain%2Fjava%2Forg%2Fonap%2Fdcae%2Fapod%2Fanalytics%2Fcdap%2Fplugins%2Fstreaming%2Fdmaap%2FDMaaPMRReceiver.java;h=2d4b30e9e0fd732ed38930b466c470dd2009f103;hb=53ec499d199d2ed8c50ab28554451fb532833275;hp=1c291af0ff96525b9a79548925c504069a0a36df;hpb=dc81b9204ecb1fabffb5ff863f69120d3f443d4c;p=dcaegen2%2Fanalytics%2Ftca.git diff --git a/dcae-analytics-cdap-plugins/src/main/java/org/onap/dcae/apod/analytics/cdap/plugins/streaming/dmaap/DMaaPMRReceiver.java b/dcae-analytics-cdap-plugins/src/main/java/org/onap/dcae/apod/analytics/cdap/plugins/streaming/dmaap/DMaaPMRReceiver.java index 1c291af..2d4b30e 100644 --- a/dcae-analytics-cdap-plugins/src/main/java/org/onap/dcae/apod/analytics/cdap/plugins/streaming/dmaap/DMaaPMRReceiver.java +++ b/dcae-analytics-cdap-plugins/src/main/java/org/onap/dcae/apod/analytics/cdap/plugins/streaming/dmaap/DMaaPMRReceiver.java @@ -22,6 +22,7 @@ package org.onap.dcae.apod.analytics.cdap.plugins.streaming.dmaap; import co.cask.cdap.api.data.format.StructuredRecord; import co.cask.cdap.api.metrics.Metrics; + import com.google.common.base.Optional; import org.apache.spark.storage.StorageLevel; import org.apache.spark.streaming.receiver.Receiver; @@ -63,29 +64,31 @@ public class DMaaPMRReceiver extends Receiver { public void onStart() { // create DMaaP MR Subscriber - final DMaaPMRSubscriber subscriber = - DMaaPMRFactory.create().createSubscriber(DMaaPSourceConfigMapper.map(pluginConfig)); - - // Start a new thread with indefinite loop until receiver is stopped - new Thread() { - @Override - public void run() { - while (!isStopped()) { - storeStructuredRecords(subscriber); - try { - final Integer pollingInterval = pluginConfig.getPollingInterval(); - LOG.debug("DMaaP MR Receiver sleeping for polling interval: {}", pollingInterval); - TimeUnit.MILLISECONDS.sleep(pollingInterval); - } catch (InterruptedException e) { - final String errorMessage = String.format( - "Interrupted Exception while DMaaP MR Receiver sleeping polling interval: %s", e); - Thread.currentThread().interrupt(); - throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e); - } - } - } - }.start(); - + try(final DMaaPMRSubscriber subscriber = + DMaaPMRFactory.create().createSubscriber(DMaaPSourceConfigMapper.map(pluginConfig))){ + + // Start a new thread with indefinite loop until receiver is stopped + new Thread() { + @Override + public void run() { + while (!isStopped()) { + storeStructuredRecords(subscriber); + try { + final Integer pollingInterval = pluginConfig.getPollingInterval(); + LOG.debug("DMaaP MR Receiver sleeping for polling interval: {}", pollingInterval); + TimeUnit.MILLISECONDS.sleep(pollingInterval); + } catch (InterruptedException e) { + final String errorMessage = String.format( + "Interrupted Exception while DMaaP MR Receiver sleeping polling interval: %s", e); + Thread.currentThread().interrupt(); + throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e); + } + } + } + }.start(); + } catch (Exception e) { + LOG.error("Exception in DMaaPMRReceiver onStart",e); + } } @Override