X-Git-Url: https://gerrit.onap.org/r/gitweb?a=blobdiff_plain;f=datarouter-node%2Fsrc%2Fmain%2Fjava%2Forg%2Fonap%2Fdmaap%2Fdatarouter%2Fnode%2FNodeConfigManager.java;h=2e2dc5f066aaf25d0c15333d44eba795c9d8eef0;hb=8b695ac559bda388781e5ad0e4b02d1e82dc9cef;hp=7ecbaafdc302ea056f8187f4a4544d4acc8b89ec;hpb=46ef61c0fe477483be17dbf9af2ef3b1023da0d8;p=dmaap%2Fdatarouter.git diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeConfigManager.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeConfigManager.java index 7ecbaafd..2e2dc5f0 100644 --- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeConfigManager.java +++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeConfigManager.java @@ -24,37 +24,48 @@ package org.onap.dmaap.datarouter.node; -import java.net.*; -import java.util.*; -import java.io.*; - -import org.apache.log4j.Logger; -import org.onap.dmaap.datarouter.node.eelf.EelfMsgs; +import static java.lang.System.exit; +import static java.lang.System.getProperty; import com.att.eelf.configuration.EELFLogger; import com.att.eelf.configuration.EELFManager; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.Reader; +import java.net.URL; +import java.nio.file.Files; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Objects; +import java.util.Properties; +import java.util.Timer; +import org.onap.dmaap.datarouter.node.config.NodeConfig; +import org.onap.dmaap.datarouter.node.config.ProvData; +import org.onap.dmaap.datarouter.node.delivery.DeliveryQueueHelper; +import org.onap.dmaap.datarouter.node.eelf.EelfMsgs; +import org.onap.dmaap.datarouter.node.utils.NodeTlsManager; +import org.onap.dmaap.datarouter.node.utils.NodeUtils; /** * Maintain the configuration of a Data Router node - *

- * The NodeConfigManager is the single point of contact for servlet, delivery, event logging, and log retention - * subsystems to access configuration information. (Log4J has its own configuration mechanism). - *

- * There are two basic sets of configuration data. The static local configuration data, stored in a local configuration - * file (created as part of installation by SWM), and the dynamic global configuration data fetched from the data router - * provisioning server. + * + *

The NodeConfigManager is the single point of contact for servlet, delivery, event logging, and log retention + * subsystems to access configuration information. + * + *

There are two basic sets of configuration data. The static local configuration data, stored in a local + * configuration file (created as part of installation by SWM), and the dynamic global configuration data fetched from + * the data router provisioning server. */ public class NodeConfigManager implements DeliveryQueueHelper { - private static EELFLogger eelflogger = EELFManager.getInstance() - .getLogger("org.onap.dmaap.datarouter.node.NodeConfigManager"); - private static Logger logger = Logger.getLogger("org.onap.dmaap.datarouter.node.NodeConfigManager"); - private static NodeConfigManager base = new NodeConfigManager(); - - private Timer timer = new Timer("Node Configuration Timer", true); + private static final String NODE_CONFIG_MANAGER = "NodeConfigManager"; + private static final EELFLogger eelfLogger = EELFManager.getInstance().getLogger(NodeConfigManager.class); private long maxfailuretimer; private long initfailuretimer; + private long waitForFileProcessFailureTimer; private long expirationtimer; private double failurebackoff; private long fairtimelimit; @@ -62,128 +73,131 @@ public class NodeConfigManager implements DeliveryQueueHelper { private double fdpstart; private double fdpstop; private int deliverythreads; - private String provurl; + private final String provurl; private String provhost; - private IsFrom provcheck; - private int gfport; - private int svcport; - private int port; - private String spooldir; - private String logdir; - private long logretention; - private String redirfile; - private String kstype; - private String ksfile; - private String kspass; - private String kpass; - private String tstype; - private String tsfile; - private String tspass; + private final int intHttpPort; + private final int intHttpsPort; + private final int extHttpsPort; + private final boolean tlsEnabled; private String myname; - private RedirManager rdmgr; - private RateLimitedOperation pfetcher; - private NodeConfig config; - private File quiesce; - private PublishId pid; - private String nak; - private TaskList configtasks = new TaskList(); - private String eventlogurl; - private String eventlogprefix; - private String eventlogsuffix; + private final String nak; + private final File quiesce; + private final String spooldir; + private final String logdir; + private final long logretention; + private final String eventlogurl; + private final String eventlogprefix; + private final String eventlogsuffix; private String eventloginterval; private boolean followredirects; - - - /** - * Get the default node configuration manager - */ - public static NodeConfigManager getInstance() { - return (base); + private final TaskList configtasks = new TaskList(); + private final PublishId publishId; + private final IsFrom provcheck; + private final RedirManager rdmgr; + private final Timer timer = new Timer("Node Configuration Timer", true); + private final RateLimitedOperation pfetcher; + private static NodeConfigManager base; + private static NodeTlsManager nodeTlsManager; + private NodeConfig nodeConfig; + private static Properties drNodeProperties; + + public static Properties getDrNodeProperties() { + if (drNodeProperties == null) { + try (FileInputStream props = new FileInputStream(getProperty( + "org.onap.dmaap.datarouter.node.properties", + "/opt/app/datartr/etc/node.properties"))) { + drNodeProperties = new Properties(); + drNodeProperties.load(props); + } catch (IOException e) { + eelfLogger.error("Failed to load NODE properties: " + e.getMessage(), e); + exit(1); + } + } + return drNodeProperties; } - /** - * Initialize the configuration of a Data Router node + * Initialize the configuration of a Data Router node. */ private NodeConfigManager() { - Properties p = new Properties(); - try { - p.load(new FileInputStream(System - .getProperty("org.onap.dmaap.datarouter.node.properties", "/opt/app/datartr/etc/node.properties"))); - } catch (Exception e) { - - NodeUtils.setIpAndFqdnForEelf("NodeConfigManager"); - eelflogger.error(EelfMsgs.MESSAGE_PROPERTIES_LOAD_ERROR); - logger.error("NODE0301 Unable to load local configuration file " + System - .getProperty("org.onap.dmaap.datarouter.node.properties", "/opt/app/datartr/etc/node.properties"), e); - } - provurl = p.getProperty("ProvisioningURL", "https://feeds-drtr.web.att.com/internal/prov"); + provurl = getDrNodeProperties().getProperty("ProvisioningURL", "http://dmaap-dr-prov:8080/internal/prov"); try { provhost = (new URL(provurl)).getHost(); } catch (Exception e) { - NodeUtils.setIpAndFqdnForEelf("NodeConfigManager"); - eelflogger.error(EelfMsgs.MESSAGE_BAD_PROV_URL, provurl); - logger.error("NODE0302 Bad provisioning server URL " + provurl); - System.exit(1); + NodeUtils.setIpAndFqdnForEelf(NODE_CONFIG_MANAGER); + eelfLogger.error(EelfMsgs.MESSAGE_BAD_PROV_URL, e, provurl); + exit(1); } - logger.info("NODE0303 Provisioning server is " + provhost); - eventlogurl = p.getProperty("LogUploadURL", "https://feeds-drtr.web.att.com/internal/logs"); + eelfLogger.debug("NODE0303 Provisioning server is at: " + provhost); provcheck = new IsFrom(provhost); - gfport = Integer.parseInt(p.getProperty("IntHttpPort", "8080")); - svcport = Integer.parseInt(p.getProperty("IntHttpsPort", "8443")); - port = Integer.parseInt(p.getProperty("ExtHttpsPort", "443")); - long minpfinterval = Long.parseLong(p.getProperty("MinProvFetchInterval", "10000")); - long minrsinterval = Long.parseLong(p.getProperty("MinRedirSaveInterval", "10000")); - spooldir = p.getProperty("SpoolDir", "spool"); + tlsEnabled = Boolean.parseBoolean(getDrNodeProperties().getProperty("TlsEnabled", "true")); + if (isTlsEnabled()) { + try { + nodeTlsManager = new NodeTlsManager(getDrNodeProperties()); + myname = nodeTlsManager.getMyNameFromCertificate(); + if (myname == null) { + NodeUtils.setIpAndFqdnForEelf(NODE_CONFIG_MANAGER); + eelfLogger.error(EelfMsgs.MESSAGE_KEYSTORE_FETCH_ERROR, nodeTlsManager.getKeyStorefile()); + eelfLogger.error("NODE0309 Unable to fetch canonical name from keystore file {}", nodeTlsManager.getKeyStorefile()); + exit(1); + } + eelfLogger.debug("NODE0304 My certificate says my name is {}", myname); + } catch (Exception e) { + eelfLogger.error("NODE0314 Failed to set up TLS config. Exiting", e); + exit(1); + } + } + myname = "dmaap-dr-node"; + eventlogurl = getDrNodeProperties().getProperty("LogUploadURL", "https://feeds-drtr.web.att.com/internal/logs"); + intHttpPort = Integer.parseInt(getDrNodeProperties().getProperty("IntHttpPort", "80")); + intHttpsPort = Integer.parseInt(getDrNodeProperties().getProperty("IntHttpsPort", "443")); + extHttpsPort = Integer.parseInt(getDrNodeProperties().getProperty("ExtHttpsPort", "443")); + spooldir = getDrNodeProperties().getProperty("SpoolDir", "spool"); + File fdir = new File(spooldir + "/f"); fdir.mkdirs(); - for (File junk : fdir.listFiles()) { - if (junk.isFile()) { - junk.delete(); + for (File junk : Objects.requireNonNull(fdir.listFiles())) { + try { + Files.deleteIfExists(junk.toPath()); + } catch (IOException e) { + eelfLogger.error("NODE0313 Failed to clear junk files from " + fdir.getPath(), e); } } - logdir = p.getProperty("LogDir", "logs"); + logdir = getDrNodeProperties().getProperty("LogDir", "logs"); (new File(logdir)).mkdirs(); - logretention = Long.parseLong(p.getProperty("LogRetention", "30")) * 86400000L; + logretention = Long.parseLong(getDrNodeProperties().getProperty("LogRetention", "30")) * 86400000L; eventlogprefix = logdir + "/events"; eventlogsuffix = ".log"; - String redirfile = p.getProperty("RedirectionFile", "etc/redirections.dat"); - kstype = p.getProperty("KeyStoreType", "jks"); - ksfile = p.getProperty("KeyStoreFile", "etc/keystore"); - kspass = p.getProperty("KeyStorePassword", "changeme"); - kpass = p.getProperty("KeyPassword", "changeme"); - tstype = p.getProperty("TrustStoreType", "jks"); - tsfile = p.getProperty("TrustStoreFile"); - tspass = p.getProperty("TrustStorePassword", "changeme"); - if (tsfile != null && tsfile.length() > 0) { - System.setProperty("javax.net.ssl.trustStoreType", tstype); - System.setProperty("javax.net.ssl.trustStore", tsfile); - System.setProperty("javax.net.ssl.trustStorePassword", tspass); - } - nak = p.getProperty("NodeAuthKey", "Node123!"); - quiesce = new File(p.getProperty("QuiesceFile", "etc/SHUTDOWN")); - myname = NodeUtils.getCanonicalName(kstype, ksfile, kspass); - if (myname == null) { - NodeUtils.setIpAndFqdnForEelf("NodeConfigManager"); - eelflogger.error(EelfMsgs.MESSAGE_KEYSTORE_FETCH_ERROR, ksfile); - logger.error("NODE0309 Unable to fetch canonical name from keystore file " + ksfile); - System.exit(1); - } - logger.info("NODE0304 My certificate says my name is " + myname); - pid = new PublishId(myname); - rdmgr = new RedirManager(redirfile, minrsinterval, timer); - pfetcher = new RateLimitedOperation(minpfinterval, timer) { + String redirfile = getDrNodeProperties().getProperty("RedirectionFile", "etc/redirections.dat"); + publishId = new PublishId(myname); + nak = getDrNodeProperties().getProperty("NodeAuthKey", "Node123!"); + quiesce = new File(getDrNodeProperties().getProperty("QuiesceFile", "etc/SHUTDOWN")); + rdmgr = new RedirManager(redirfile, + Long.parseLong(getDrNodeProperties().getProperty("MinRedirSaveInterval", "10000")), timer); + pfetcher = new RateLimitedOperation( + Long.parseLong(getDrNodeProperties().getProperty("MinProvFetchInterval", "10000")), timer) { public void run() { - fetchconfig(); + fetchNodeConfigFromProv(); } }; - logger.info("NODE0305 Attempting to fetch configuration at " + provurl); + eelfLogger.debug("NODE0305 Attempting to fetch configuration at " + provurl); pfetcher.request(); } + /** + * Get the default node configuration manager. + */ + public static NodeConfigManager getInstance() { + if (base == null) { + base = new NodeConfigManager(); + } + return base; + } + private void localconfig() { followredirects = Boolean.parseBoolean(getProvParam("FOLLOW_REDIRECTS", "false")); - eventloginterval = getProvParam("LOGROLL_INTERVAL", "5m"); + eventloginterval = getProvParam("LOGROLL_INTERVAL", "30s"); initfailuretimer = 10000; + waitForFileProcessFailureTimer = 600000; maxfailuretimer = 3600000; expirationtimer = 86400000; failurebackoff = 2.0; @@ -195,38 +209,53 @@ public class NodeConfigManager implements DeliveryQueueHelper { try { initfailuretimer = (long) (Double.parseDouble(getProvParam("DELIVERY_INIT_RETRY_INTERVAL")) * 1000); } catch (Exception e) { + eelfLogger.trace("Error parsing DELIVERY_INIT_RETRY_INTERVAL", e); + } + try { + waitForFileProcessFailureTimer = (long) (Double.parseDouble(getProvParam("DELIVERY_FILE_PROCESS_INTERVAL")) + * 1000); + } catch (Exception e) { + eelfLogger.trace("Error parsing DELIVERY_FILE_PROCESS_INTERVAL", e); } try { maxfailuretimer = (long) (Double.parseDouble(getProvParam("DELIVERY_MAX_RETRY_INTERVAL")) * 1000); } catch (Exception e) { + eelfLogger.trace("Error parsing DELIVERY_MAX_RETRY_INTERVAL", e); } try { expirationtimer = (long) (Double.parseDouble(getProvParam("DELIVERY_MAX_AGE")) * 1000); } catch (Exception e) { + eelfLogger.trace("Error parsing DELIVERY_MAX_AGE", e); } try { failurebackoff = Double.parseDouble(getProvParam("DELIVERY_RETRY_RATIO")); } catch (Exception e) { + eelfLogger.trace("Error parsing DELIVERY_RETRY_RATIO", e); } try { deliverythreads = Integer.parseInt(getProvParam("DELIVERY_THREADS")); } catch (Exception e) { + eelfLogger.trace("Error parsing DELIVERY_THREADS", e); } try { fairfilelimit = Integer.parseInt(getProvParam("FAIR_FILE_LIMIT")); } catch (Exception e) { + eelfLogger.trace("Error parsing FAIR_FILE_LIMIT", e); } try { fairtimelimit = (long) (Double.parseDouble(getProvParam("FAIR_TIME_LIMIT")) * 1000); } catch (Exception e) { + eelfLogger.trace("Error parsing FAIR_TIME_LIMIT", e); } try { fdpstart = Double.parseDouble(getProvParam("FREE_DISK_RED_PERCENT")) / 100.0; } catch (Exception e) { + eelfLogger.trace("Error parsing FREE_DISK_RED_PERCENT", e); } try { fdpstop = Double.parseDouble(getProvParam("FREE_DISK_YELLOW_PERCENT")) / 100.0; } catch (Exception e) { + eelfLogger.trace("Error parsing FREE_DISK_YELLOW_PERCENT", e); } if (fdpstart < 0.01) { fdpstart = 0.01; @@ -242,54 +271,60 @@ public class NodeConfigManager implements DeliveryQueueHelper { } } - private void fetchconfig() { + private void fetchNodeConfigFromProv() { try { - System.out.println("provurl:: " + provurl); - Reader r = new InputStreamReader((new URL(provurl)).openStream()); - config = new NodeConfig(new ProvData(r), myname, spooldir, port, nak); + eelfLogger.debug("NodeConfigMan.fetchNodeConfigFromProv: provurl:: {}", provurl); + URL url = new URL(provurl); + Reader reader = new InputStreamReader(url.openStream()); + nodeConfig = new NodeConfig(new ProvData(reader), myname, spooldir, extHttpsPort, nak); localconfig(); configtasks.startRun(); - Runnable rr; - while ((rr = configtasks.next()) != null) { - try { - rr.run(); - } catch (Exception e) { - } - } + runTasks(); } catch (Exception e) { - NodeUtils.setIpAndFqdnForEelf("fetchconfigs"); - eelflogger.error(EelfMsgs.MESSAGE_CONF_FAILED, e.toString()); - logger.error("NODE0306 Configuration failed " + e.toString() + " - try again later", e); + NodeUtils.setIpAndFqdnForEelf("fetchNodeConfigFromProv"); + eelfLogger.error(EelfMsgs.MESSAGE_CONF_FAILED, e.toString()); + eelfLogger.error("NODE0306 Configuration failed {} - try again later", e); pfetcher.request(); } } + private void runTasks() { + Runnable rr; + while ((rr = configtasks.next()) != null) { + try { + rr.run(); + } catch (Exception e) { + eelfLogger.error("NODE0518 Exception fetchconfig: " + e); + } + } + } + /** * Process a gofetch request from a particular IP address. If the IP address is not an IP address we would go to to * fetch the provisioning data, ignore the request. If the data has been fetched very recently (default 10 * seconds), wait a while before fetching again. */ - public synchronized void gofetch(String remoteaddr) { - if (provcheck.isFrom(remoteaddr)) { - logger.info("NODE0307 Received configuration fetch request from provisioning server " + remoteaddr); + synchronized void gofetch(String remoteAddr) { + if (provcheck.isReachable(remoteAddr)) { + eelfLogger.debug("NODE0307 Received configuration fetch request from provisioning server " + remoteAddr); pfetcher.request(); } else { - logger.info("NODE0308 Received configuration fetch request from unexpected server " + remoteaddr); + eelfLogger.debug("NODE0308 Received configuration fetch request from unexpected server " + remoteAddr); } } /** - * Am I configured? + * Am I configured. */ public boolean isConfigured() { - return (config != null); + return nodeConfig != null; } /** - * Am I shut down? + * Am I shut down. */ - public boolean isShutdown() { - return (quiesce.exists()); + boolean isShutdown() { + return quiesce.exists(); } /** @@ -298,19 +333,19 @@ public class NodeConfigManager implements DeliveryQueueHelper { * @param routing Target string * @return array of targets */ - public Target[] parseRouting(String routing) { - return (config.parseRouting(routing)); + Target[] parseRouting(String routing) { + return nodeConfig.parseRouting(routing); } /** - * Given a set of credentials and an IP address, is this request from another node? + * Given a set of credentials and an IP address, is this request from another node. * * @param credentials Credentials offered by the supposed node * @param ip IP address the request came from * @return If the credentials and IP address are recognized, true, otherwise false. */ - public boolean isAnotherNode(String credentials, String ip) { - return (config.isAnotherNode(credentials, ip)); + boolean isAnotherNode(String credentials, String ip) { + return nodeConfig.isAnotherNode(credentials, ip); } /** @@ -321,8 +356,18 @@ public class NodeConfigManager implements DeliveryQueueHelper { * @param ip The requesting IP address * @return True if the IP and credentials are valid for the specified feed. */ - public String isPublishPermitted(String feedid, String credentials, String ip) { - return (config.isPublishPermitted(feedid, credentials, ip)); + String isPublishPermitted(String feedid, String credentials, String ip) { + return nodeConfig.isPublishPermitted(feedid, credentials, ip); + } + + /** + * Check whether delete file is allowed. + * + * @param subId The ID of the subscription being requested + * @return True if the delete file is permitted for the subscriber. + */ + boolean isDeletePermitted(String subId) { + return nodeConfig.isDeletePermitted(subId); } /** @@ -332,8 +377,8 @@ public class NodeConfigManager implements DeliveryQueueHelper { * @param credentials The offered credentials * @return Null if the credentials are invalid or the user if they are valid. */ - public String getAuthUser(String feedid, String credentials) { - return (config.getAuthUser(feedid, credentials)); + String getAuthUser(String feedid, String credentials) { + return nodeConfig.getAuthUser(feedid, credentials); } /** @@ -344,73 +389,73 @@ public class NodeConfigManager implements DeliveryQueueHelper { * @param ip The IP address of the publish endpoint * @return Null if the request should be accepted or the correct hostname if it should be sent to another node. */ - public String getIngressNode(String feedid, String user, String ip) { - return (config.getIngressNode(feedid, user, ip)); + String getIngressNode(String feedid, String user, String ip) { + return nodeConfig.getIngressNode(feedid, user, ip); } /** - * Get a provisioned configuration parameter (from the provisioning server configuration) + * Get a provisioned configuration parameter (from the provisioning server configuration). * * @param name The name of the parameter * @return The value of the parameter or null if it is not defined. */ - public String getProvParam(String name) { - return (config.getProvParam(name)); + private String getProvParam(String name) { + return nodeConfig.getProvParam(name); } /** - * Get a provisioned configuration parameter (from the provisioning server configuration) + * Get a provisioned configuration parameter (from the provisioning server configuration). * * @param name The name of the parameter - * @param deflt The value to use if the parameter is not defined + * @param defaultValue The value to use if the parameter is not defined * @return The value of the parameter or deflt if it is not defined. */ - public String getProvParam(String name, String deflt) { - name = config.getProvParam(name); + private String getProvParam(String name, String defaultValue) { + name = nodeConfig.getProvParam(name); if (name == null) { - name = deflt; + name = defaultValue; } - return (name); + return name; } /** - * Generate a publish ID + * Generate a publish ID. */ public String getPublishId() { - return (pid.next()); + return publishId.next(); } /** * Get all the outbound spooling destinations. This will include both subscriptions and nodes. */ public DestInfo[] getAllDests() { - return (config.getAllDests()); + return nodeConfig.getAllDests(); } /** - * Register a task to run whenever the configuration changes + * Register a task to run whenever the configuration changes. */ public void registerConfigTask(Runnable task) { configtasks.addTask(task); } /** - * Deregister a task to run whenever the configuration changes + * Deregister a task to run whenever the configuration changes. */ - public void deregisterConfigTask(Runnable task) { + void deregisterConfigTask(Runnable task) { configtasks.removeTask(task); } /** * Get the URL to deliver a message to. * - * @param destinfo The destination information + * @param destinationInfo The destination information * @param fileid The file ID * @return The URL to deliver to */ - public String getDestURL(DestInfo destinfo, String fileid) { - String subid = destinfo.getSubId(); - String purl = destinfo.getURL(); + public String getDestURL(DestInfo destinationInfo, String fileid) { + String subid = destinationInfo.getSubId(); + String purl = destinationInfo.getURL(); if (followredirects && subid != null) { purl = rdmgr.lookup(subid, purl); } @@ -418,19 +463,12 @@ public class NodeConfigManager implements DeliveryQueueHelper { } /** - * Is a destination redirected? - */ - public boolean isDestRedirected(DestInfo destinfo) { - return (followredirects && rdmgr.isRedirected(destinfo.getSubId())); - } - - /** - * Set up redirection on receipt of a 3XX from a target URL + * Set up redirection on receipt of a 3XX from a target URL. */ - public boolean handleRedirection(DestInfo destinfo, String redirto, String fileid) { + public boolean handleRedirection(DestInfo destinationInfo, String redirto, String fileid) { fileid = "/" + fileid; - String subid = destinfo.getSubId(); - String purl = destinfo.getURL(); + String subid = destinationInfo.getSubId(); + String purl = destinationInfo.getURL(); if (followredirects && subid != null && redirto.endsWith(fileid)) { redirto = redirto.substring(0, redirto.length() - fileid.length()); if (!redirto.equals(purl)) { @@ -442,217 +480,216 @@ public class NodeConfigManager implements DeliveryQueueHelper { } /** - * Handle unreachable target URL + * Handle unreachable target URL. */ - public void handleUnreachable(DestInfo destinfo) { - String subid = destinfo.getSubId(); + public void handleUnreachable(DestInfo destinationInfo) { + String subid = destinationInfo.getSubId(); if (followredirects && subid != null) { rdmgr.forget(subid); } } /** - * Get the timeout before retrying after an initial delivery failure + * Get the timeout before retrying after an initial delivery failure. */ public long getInitFailureTimer() { - return (initfailuretimer); + return initfailuretimer; + } + + /** + * Get the timeout before retrying after delivery and wait for file processing. + */ + public long getWaitForFileProcessFailureTimer() { + return waitForFileProcessFailureTimer; } /** - * Get the maximum timeout between delivery attempts + * Get the maximum timeout between delivery attempts. */ public long getMaxFailureTimer() { - return (maxfailuretimer); + return maxfailuretimer; } /** - * Get the ratio between consecutive delivery attempts + * Get the ratio between consecutive delivery attempts. */ public double getFailureBackoff() { - return (failurebackoff); + return failurebackoff; } /** - * Get the expiration timer for deliveries + * Get the expiration timer for deliveries. */ public long getExpirationTimer() { - return (expirationtimer); + return expirationtimer; } /** * Get the maximum number of file delivery attempts before checking if another queue has work to be performed. */ public int getFairFileLimit() { - return (fairfilelimit); + return fairfilelimit; } /** * Get the maximum amount of time spent delivering files before checking if another queue has work to be performed. */ public long getFairTimeLimit() { - return (fairtimelimit); + return fairtimelimit; } /** - * Get the targets for a feed + * Get the targets for a feed. * * @param feedid The feed ID * @return The targets this feed should be delivered to */ - public Target[] getTargets(String feedid) { - return (config.getTargets(feedid)); - } - - /** - * Get the spool directory for temporary files - */ - public String getSpoolDir() { - return (spooldir + "/f"); + Target[] getTargets(String feedid) { + return nodeConfig.getTargets(feedid); } /** - * Get the base directory for spool directories + * Get the spool directory for temporary files. */ - public String getSpoolBase() { - return (spooldir); + String getSpoolDir() { + return spooldir + "/f"; } /** - * Get the key store type + * Get the spool directory for a subscription. */ - public String getKSType() { - return (kstype); - } - - /** - * Get the key store file - */ - public String getKSFile() { - return (ksfile); - } - - /** - * Get the key store password - */ - public String getKSPass() { - return (kspass); + String getSpoolDir(String subid, String remoteaddr) { + if (provcheck.isFrom(remoteaddr)) { + String sdir = nodeConfig.getSpoolDir(subid); + if (sdir != null) { + eelfLogger.debug("NODE0310 Received subscription reset request for subscription " + subid + + " from provisioning server " + remoteaddr); + } else { + eelfLogger.debug("NODE0311 Received subscription reset request for unknown subscription " + subid + + " from provisioning server " + remoteaddr); + } + return sdir; + } else { + eelfLogger.debug("NODE0312 Received subscription reset request from unexpected server " + remoteaddr); + return null; + } } /** - * Get the key password + * Get the base directory for spool directories. */ - public String getKPass() { - return (kpass); + public String getSpoolBase() { + return spooldir; } /** - * Get the http port + * Get the http port. */ - public int getHttpPort() { - return (gfport); + int getHttpPort() { + return intHttpPort; } /** - * Get the https port + * Get the https port. */ - public int getHttpsPort() { - return (svcport); + int getHttpsPort() { + return intHttpsPort; } /** - * Get the externally visible https port + * Get the externally visible https port. */ - public int getExtHttpsPort() { - return (port); + int getExtHttpsPort() { + return extHttpsPort; } /** - * Get the external name of this machine + * Get the external name of this machine. */ public String getMyName() { - return (myname); + return myname; } /** - * Get the number of threads to use for delivery + * Get the number of threads to use for delivery. */ public int getDeliveryThreads() { - return (deliverythreads); + return deliverythreads; } /** - * Get the URL for uploading the event log data + * Get the URL for uploading the event log data. */ public String getEventLogUrl() { - return (eventlogurl); + return eventlogurl; } /** - * Get the prefix for the names of event log files + * Get the prefix for the names of event log files. */ public String getEventLogPrefix() { - return (eventlogprefix); + return eventlogprefix; } /** - * Get the suffix for the names of the event log files + * Get the suffix for the names of the event log files. */ public String getEventLogSuffix() { - return (eventlogsuffix); + return eventlogsuffix; } /** - * Get the interval between event log file rollovers + * Get the interval between event log file rollovers. */ public String getEventLogInterval() { - return (eventloginterval); + return eventloginterval; } /** - * Should I follow redirects from subscribers? + * Should I follow redirects from subscribers. */ public boolean isFollowRedirects() { - return (followredirects); + return followredirects; } /** - * Get the directory where the event and node log files live + * Get the directory where the event and node log files live. */ public String getLogDir() { - return (logdir); + return logdir; } /** - * How long do I keep log files (in milliseconds) + * How long do I keep log files (in milliseconds). */ public long getLogRetention() { - return (logretention); + return logretention; } /** - * Get the timer + * Get the timer. */ public Timer getTimer() { - return (timer); + return timer; } /** - * Get the feed ID for a subscription + * Get the feed ID for a subscription. * * @param subid The subscription ID * @return The feed ID */ public String getFeedId(String subid) { - return (config.getFeedId(subid)); + return nodeConfig.getFeedId(subid); } /** - * Get the authorization string this node uses + * Get the authorization string this node uses. * * @return The Authorization string for this node */ public String getMyAuth() { - return (config.getMyAuth()); + return nodeConfig.getMyAuth(); } /** @@ -660,7 +697,7 @@ public class NodeConfigManager implements DeliveryQueueHelper { * FREE_DISK_RED_PERCENT / 100.0. Default is 0.05. Limited by 0.01 <= FreeDiskStart <= 0.5. */ public double getFreeDiskStart() { - return (fdpstart); + return fdpstart; } /** @@ -668,26 +705,137 @@ public class NodeConfigManager implements DeliveryQueueHelper { * FREE_DISK_YELLOW_PERCENT / 100.0. Default is 0.2. Limited by FreeDiskStart <= FreeDiskStop <= 0.5. */ public double getFreeDiskStop() { - return (fdpstop); + return fdpstop; + } + + protected boolean isTlsEnabled() { + return tlsEnabled; + } + + public static NodeTlsManager getNodeTlsManager() { + return nodeTlsManager; } /** - * Get the spool directory for a subscription + * Generate publish IDs. */ - public String getSpoolDir(String subid, String remoteaddr) { - if (provcheck.isFrom(remoteaddr)) { - String sdir = config.getSpoolDir(subid); - if (sdir != null) { - logger.info("NODE0310 Received subscription reset request for subscription " + subid - + " from provisioning server " + remoteaddr); - } else { - logger.info("NODE0311 Received subscription reset request for unknown subscription " + subid - + " from provisioning server " + remoteaddr); + static class PublishId { + + private long nextuid; + private final String myname; + + /** + * Generate publish IDs for the specified name. + * + * @param myname Unique identifier for this publish ID generator (usually fqdn of server) + */ + public PublishId(String myname) { + this.myname = myname; + } + + /** + * Generate a Data Router Publish ID that uniquely identifies the particular invocation of the Publish API for log + * correlation purposes. + */ + public synchronized String next() { + long now = System.currentTimeMillis(); + if (now < nextuid) { + now = nextuid; + } + nextuid = now + 1; + return (now + "." + myname); + } + } + + /** + * Manage a list of tasks to be executed when an event occurs. This makes the following guarantees: + *

+ */ + static class TaskList { + + private Iterator runlist; + private final HashSet tasks = new HashSet<>(); + private HashSet togo; + private HashSet sofar; + private HashSet added; + private HashSet removed; + + /** + * Start executing the sequence of tasks. + */ + synchronized void startRun() { + sofar = new HashSet<>(); + added = new HashSet<>(); + removed = new HashSet<>(); + togo = new HashSet<>(tasks); + runlist = togo.iterator(); + } + + /** + * Get the next task to execute. + */ + synchronized Runnable next() { + while (runlist != null) { + if (runlist.hasNext()) { + Runnable task = runlist.next(); + if (addTaskToSoFar(task)) { + return task; + } + } + if (!added.isEmpty()) { + togo = added; + added = new HashSet<>(); + removed.clear(); + runlist = togo.iterator(); + continue; + } + togo = null; + added = null; + removed = null; + sofar = null; + runlist = null; } - return (sdir); - } else { - logger.info("NODE0312 Received subscription reset request from unexpected server " + remoteaddr); return (null); } + + /** + * Add a task to the list of tasks to run whenever the event occurs. + */ + synchronized void addTask(Runnable task) { + if (runlist != null) { + added.add(task); + removed.remove(task); + } + tasks.add(task); + } + + /** + * Remove a task from the list of tasks to run whenever the event occurs. + */ + synchronized void removeTask(Runnable task) { + if (runlist != null) { + removed.add(task); + added.remove(task); + } + tasks.remove(task); + } + + private boolean addTaskToSoFar(Runnable task) { + if (removed.contains(task)) { + return false; + } + if (sofar.contains(task)) { + return false; + } + sofar.add(task); + return true; + } } }