X-Git-Url: https://gerrit.onap.org/r/gitweb?a=blobdiff_plain;f=dmaap-listener%2Fsrc%2Fmain%2Fjava%2Forg%2Fonap%2Fccsdk%2Fsli%2Fnorthbound%2Fdmaapclient%2FSdncDmaapConsumer.java;h=3fc769d35365c90741c43a0ea758e9f54e6b696d;hb=2a3c5a197236f7ba01b39c8bb51a6b977c964eeb;hp=ed71337a357f685b21e0a10930f4d96f5aab9487;hpb=0d27d85eb387bd56b1f8dab34a9381bcc20db7d5;p=ccsdk%2Fsli%2Fnorthbound.git diff --git a/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/SdncDmaapConsumer.java b/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/SdncDmaapConsumer.java index ed71337a..3fc769d3 100644 --- a/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/SdncDmaapConsumer.java +++ b/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/SdncDmaapConsumer.java @@ -2,8 +2,8 @@ * ============LICENSE_START======================================================= * openECOMP : SDN-C * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights - * reserved. + * Copyright (C) 2017 - 2018 AT&T Intellectual Property. All rights + * reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,137 +21,14 @@ package org.onap.ccsdk.sli.northbound.dmaapclient; -import com.att.nsa.mr.client.MRClientFactory; -import com.att.nsa.mr.client.MRConsumer; -import com.att.nsa.mr.client.response.MRConsumerResponse; -import java.io.File; -import java.io.FileInputStream; import java.util.Properties; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -public abstract class SdncDmaapConsumer implements Runnable { +public abstract interface SdncDmaapConsumer extends Runnable { + public abstract void init(Properties baseProperties, String consumerPropertiesPath); - private static final Logger LOG = LoggerFactory - .getLogger(SdncDmaapConsumer.class); + public abstract void processMsg(String msg) throws InvalidMessageException; - private String propertiesPath = ""; - private Properties properties = null; - private MRConsumer consumer = null; - private MRConsumerResponse consumerResponse = null; - private boolean running = false; - private boolean ready = false; - private int fetchPause = 5000; // Default pause between fetch - 5 seconds - private int timeout = 15000; // Default timeout - 15 seconds + public abstract boolean isReady(); - public SdncDmaapConsumer() { - - } - - public SdncDmaapConsumer(Properties properties, String propertiesPath) { - init(properties, propertiesPath); - } - - public boolean isReady() { - return ready; - } - - public boolean isRunning() { - return running; - } - - public String getProperty(String name) { - return properties.getProperty(name, ""); - } - - public void init(Properties properties, String propertiesPath) { - - this.propertiesPath = propertiesPath; - - try (FileInputStream in = new FileInputStream(new File(propertiesPath))) { - - LOG.debug("propertiesPath: " + propertiesPath); - this.properties = (Properties) properties.clone(); - this.properties.load(in); - - - String timeoutStr = this.properties.getProperty("timeout"); - LOG.debug("timeoutStr: " + timeoutStr); - - if ((timeoutStr != null) && (timeoutStr.length() > 0)) { - timeout = parseTimeOutValue(timeoutStr); - } - - String fetchPauseStr = this.properties.getProperty("fetchPause"); - LOG.debug("fetchPause(Str): " + fetchPauseStr); - if ((fetchPauseStr != null) && (fetchPauseStr.length() > 0)) { - fetchPause = parseFetchPause(fetchPauseStr); - } - LOG.debug("fetchPause: " + fetchPause); - - - this.consumer = MRClientFactory.createConsumer(propertiesPath); - ready = true; - } catch (Exception e) { - LOG.error("Error initializing DMaaP consumer from file " + propertiesPath, e); - } - } - - private int parseTimeOutValue(String timeoutStr) { - try { - return Integer.parseInt(timeoutStr); - } catch (NumberFormatException e) { - LOG.error("Non-numeric value specified for timeout (" + timeoutStr + ")"); - } - return timeout; - } - - private int parseFetchPause(String fetchPauseStr) { - try { - return Integer.parseInt(fetchPauseStr); - } catch (NumberFormatException e) { - LOG.error("Non-numeric value specified for fetchPause (" + fetchPauseStr + ")"); - } - return fetchPause; - } - - - @Override - public void run() { - if (ready) { - - running = true; - - while (running) { - - try { - boolean noData = true; - consumerResponse = consumer.fetchWithReturnConsumerResponse(timeout, -1); - for (String msg : consumerResponse.getActualMessages()) { - noData = false; - LOG.info("Received message from DMaaP:\n" + msg); - processMsg(msg); - } - - if (noData) { - pauseThread(); - } - } catch (Exception e) { - LOG.error("Caught exception reading from DMaaP", e); - running = false; - } - } - } - } - - private void pauseThread() throws InterruptedException { - if (fetchPause > 0) { - LOG.info(String.format("No data received from fetch. Pausing %d ms before retry", fetchPause)); - Thread.sleep(fetchPause); - } else { - LOG.info("No data received from fetch. No fetch pause specified - retrying immediately"); - } - } - - abstract public void processMsg(String msg) throws InvalidMessageException; + public abstract boolean isRunning(); }