update link to upper-constraints.txt
[dmaap/datarouter.git] / datarouter-node / src / main / java / org / onap / dmaap / datarouter / node / NodeConfigManager.java
1 /*******************************************************************************
2  * ============LICENSE_START==================================================
3  * * org.onap.dmaap
4  * * ===========================================================================
5  * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  * * ===========================================================================
7  * * Licensed under the Apache License, Version 2.0 (the "License");
8  * * you may not use this file except in compliance with the License.
9  * * You may obtain a copy of the License at
10  * *
11  *  *      http://www.apache.org/licenses/LICENSE-2.0
12  * *
13  *  * Unless required by applicable law or agreed to in writing, software
14  * * distributed under the License is distributed on an "AS IS" BASIS,
15  * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * * See the License for the specific language governing permissions and
17  * * limitations under the License.
18  * * ============LICENSE_END====================================================
19  * *
20  * * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21  * *
22  ******************************************************************************/
23
24
25 package org.onap.dmaap.datarouter.node;
26
27 import static java.lang.System.exit;
28 import static java.lang.System.getProperty;
29
30 import com.att.eelf.configuration.EELFLogger;
31 import com.att.eelf.configuration.EELFManager;
32 import java.io.File;
33 import java.io.FileInputStream;
34 import java.io.IOException;
35 import java.io.InputStreamReader;
36 import java.io.Reader;
37 import java.net.URL;
38 import java.nio.file.Files;
39 import java.util.HashSet;
40 import java.util.Iterator;
41 import java.util.Objects;
42 import java.util.Properties;
43 import java.util.Timer;
44 import org.onap.dmaap.datarouter.node.config.NodeConfig;
45 import org.onap.dmaap.datarouter.node.config.ProvData;
46 import org.onap.dmaap.datarouter.node.delivery.DeliveryQueueHelper;
47 import org.onap.dmaap.datarouter.node.eelf.EelfMsgs;
48 import org.onap.dmaap.datarouter.node.utils.NodeTlsManager;
49 import org.onap.dmaap.datarouter.node.utils.NodeUtils;
50
51
52 /**
53  * Maintain the configuration of a Data Router node
54  *
55  * <p>The NodeConfigManager is the single point of contact for servlet, delivery, event logging, and log retention
56  * subsystems to access configuration information.
57  *
58  * <p>There are two basic sets of configuration data.  The static local configuration data, stored in a local
59  * configuration file (created as part of installation by SWM), and the dynamic global configuration data fetched from
60  * the data router provisioning server.
61  */
62 public class NodeConfigManager implements DeliveryQueueHelper {
63
64     private static final String NODE_CONFIG_MANAGER = "NodeConfigManager";
65     private static final EELFLogger eelfLogger = EELFManager.getInstance().getLogger(NodeConfigManager.class);
66     private long maxfailuretimer;
67     private long initfailuretimer;
68     private long waitForFileProcessFailureTimer;
69     private long expirationtimer;
70     private double failurebackoff;
71     private long fairtimelimit;
72     private int fairfilelimit;
73     private double fdpstart;
74     private double fdpstop;
75     private int deliverythreads;
76     private final String provurl;
77     private String provhost;
78     private final int intHttpPort;
79     private final int intHttpsPort;
80     private final int extHttpsPort;
81     private final boolean tlsEnabled;
82     private String myname;
83     private final String nak;
84     private final File quiesce;
85     private final String spooldir;
86     private final String logdir;
87     private final long logretention;
88     private final String eventlogurl;
89     private final String eventlogprefix;
90     private final String eventlogsuffix;
91     private String eventloginterval;
92     private boolean followredirects;
93     private final TaskList configtasks = new TaskList();
94     private final PublishId publishId;
95     private final IsFrom provcheck;
96     private final RedirManager rdmgr;
97     private final Timer timer = new Timer("Node Configuration Timer", true);
98     private final RateLimitedOperation pfetcher;
99     private static NodeConfigManager base;
100     private static NodeTlsManager nodeTlsManager;
101     private NodeConfig nodeConfig;
102     private static Properties drNodeProperties;
103
104     public static Properties getDrNodeProperties() {
105         if (drNodeProperties == null) {
106             try (FileInputStream props = new FileInputStream(getProperty(
107                 "org.onap.dmaap.datarouter.node.properties",
108                 "/opt/app/datartr/etc/node.properties"))) {
109                 drNodeProperties = new Properties();
110                 drNodeProperties.load(props);
111             } catch (IOException e) {
112                 eelfLogger.error("Failed to load NODE properties: " + e.getMessage(), e);
113                 exit(1);
114             }
115         }
116         return drNodeProperties;
117     }
118     /**
119      * Initialize the configuration of a Data Router node.
120      */
121     private NodeConfigManager() {
122         provurl = getDrNodeProperties().getProperty("ProvisioningURL", "http://dmaap-dr-prov:8080/internal/prov");
123         try {
124             provhost = (new URL(provurl)).getHost();
125         } catch (Exception e) {
126             NodeUtils.setIpAndFqdnForEelf(NODE_CONFIG_MANAGER);
127             eelfLogger.error(EelfMsgs.MESSAGE_BAD_PROV_URL, e, provurl);
128             exit(1);
129         }
130         eelfLogger.debug("NODE0303 Provisioning server is at: " + provhost);
131         provcheck = new IsFrom(provhost);
132         tlsEnabled = Boolean.parseBoolean(getDrNodeProperties().getProperty("TlsEnabled", "true"));
133         if (isTlsEnabled()) {
134             try {
135                 nodeTlsManager = new NodeTlsManager(getDrNodeProperties());
136                 myname = nodeTlsManager.getMyNameFromCertificate();
137                 if (myname == null) {
138                     NodeUtils.setIpAndFqdnForEelf(NODE_CONFIG_MANAGER);
139                     eelfLogger.error(EelfMsgs.MESSAGE_KEYSTORE_FETCH_ERROR, nodeTlsManager.getKeyStorefile());
140                     eelfLogger.error("NODE0309 Unable to fetch canonical name from keystore file {}", nodeTlsManager.getKeyStorefile());
141                     exit(1);
142                 }
143                 eelfLogger.debug("NODE0304 My certificate says my name is {}", myname);
144             } catch (Exception e) {
145                 eelfLogger.error("NODE0314 Failed to set up TLS config. Exiting", e);
146                 exit(1);
147             }
148         }
149         myname = "dmaap-dr-node";
150         eventlogurl = getDrNodeProperties().getProperty("LogUploadURL", "https://feeds-drtr.web.att.com/internal/logs");
151         intHttpPort = Integer.parseInt(getDrNodeProperties().getProperty("IntHttpPort", "80"));
152         intHttpsPort = Integer.parseInt(getDrNodeProperties().getProperty("IntHttpsPort", "443"));
153         extHttpsPort = Integer.parseInt(getDrNodeProperties().getProperty("ExtHttpsPort", "443"));
154         spooldir = getDrNodeProperties().getProperty("SpoolDir", "spool");
155
156         File fdir = new File(spooldir + "/f");
157         fdir.mkdirs();
158         for (File junk : Objects.requireNonNull(fdir.listFiles())) {
159             try {
160                 Files.deleteIfExists(junk.toPath());
161             } catch (IOException e) {
162                 eelfLogger.error("NODE0313 Failed to clear junk files from " + fdir.getPath(), e);
163             }
164         }
165         logdir = getDrNodeProperties().getProperty("LogDir", "logs");
166         (new File(logdir)).mkdirs();
167         logretention = Long.parseLong(getDrNodeProperties().getProperty("LogRetention", "30")) * 86400000L;
168         eventlogprefix = logdir + "/events";
169         eventlogsuffix = ".log";
170         String redirfile = getDrNodeProperties().getProperty("RedirectionFile", "etc/redirections.dat");
171         publishId = new PublishId(myname);
172         nak = getDrNodeProperties().getProperty("NodeAuthKey", "Node123!");
173         quiesce = new File(getDrNodeProperties().getProperty("QuiesceFile", "etc/SHUTDOWN"));
174         rdmgr = new RedirManager(redirfile,
175             Long.parseLong(getDrNodeProperties().getProperty("MinRedirSaveInterval", "10000")), timer);
176         pfetcher = new RateLimitedOperation(
177             Long.parseLong(getDrNodeProperties().getProperty("MinProvFetchInterval", "10000")), timer) {
178             public void run() {
179                 fetchNodeConfigFromProv();
180             }
181         };
182         eelfLogger.debug("NODE0305 Attempting to fetch configuration at " + provurl);
183         pfetcher.request();
184     }
185
186     /**
187      * Get the default node configuration manager.
188      */
189     public static NodeConfigManager getInstance() {
190         if (base == null) {
191             base = new NodeConfigManager();
192         }
193         return base;
194     }
195
196     private void localconfig() {
197         followredirects = Boolean.parseBoolean(getProvParam("FOLLOW_REDIRECTS", "false"));
198         eventloginterval = getProvParam("LOGROLL_INTERVAL", "30s");
199         initfailuretimer = 10000;
200         waitForFileProcessFailureTimer = 600000;
201         maxfailuretimer = 3600000;
202         expirationtimer = 86400000;
203         failurebackoff = 2.0;
204         deliverythreads = 40;
205         fairfilelimit = 100;
206         fairtimelimit = 60000;
207         fdpstart = 0.05;
208         fdpstop = 0.2;
209         try {
210             initfailuretimer = (long) (Double.parseDouble(getProvParam("DELIVERY_INIT_RETRY_INTERVAL")) * 1000);
211         } catch (Exception e) {
212             eelfLogger.trace("Error parsing DELIVERY_INIT_RETRY_INTERVAL", e);
213         }
214         try {
215             waitForFileProcessFailureTimer = (long) (Double.parseDouble(getProvParam("DELIVERY_FILE_PROCESS_INTERVAL"))
216                     * 1000);
217         } catch (Exception e) {
218             eelfLogger.trace("Error parsing DELIVERY_FILE_PROCESS_INTERVAL", e);
219         }
220         try {
221             maxfailuretimer = (long) (Double.parseDouble(getProvParam("DELIVERY_MAX_RETRY_INTERVAL")) * 1000);
222         } catch (Exception e) {
223             eelfLogger.trace("Error parsing DELIVERY_MAX_RETRY_INTERVAL", e);
224         }
225         try {
226             expirationtimer = (long) (Double.parseDouble(getProvParam("DELIVERY_MAX_AGE")) * 1000);
227         } catch (Exception e) {
228             eelfLogger.trace("Error parsing DELIVERY_MAX_AGE", e);
229         }
230         try {
231             failurebackoff = Double.parseDouble(getProvParam("DELIVERY_RETRY_RATIO"));
232         } catch (Exception e) {
233             eelfLogger.trace("Error parsing DELIVERY_RETRY_RATIO", e);
234         }
235         try {
236             deliverythreads = Integer.parseInt(getProvParam("DELIVERY_THREADS"));
237         } catch (Exception e) {
238             eelfLogger.trace("Error parsing DELIVERY_THREADS", e);
239         }
240         try {
241             fairfilelimit = Integer.parseInt(getProvParam("FAIR_FILE_LIMIT"));
242         } catch (Exception e) {
243             eelfLogger.trace("Error parsing FAIR_FILE_LIMIT", e);
244         }
245         try {
246             fairtimelimit = (long) (Double.parseDouble(getProvParam("FAIR_TIME_LIMIT")) * 1000);
247         } catch (Exception e) {
248             eelfLogger.trace("Error parsing FAIR_TIME_LIMIT", e);
249         }
250         try {
251             fdpstart = Double.parseDouble(getProvParam("FREE_DISK_RED_PERCENT")) / 100.0;
252         } catch (Exception e) {
253             eelfLogger.trace("Error parsing FREE_DISK_RED_PERCENT", e);
254         }
255         try {
256             fdpstop = Double.parseDouble(getProvParam("FREE_DISK_YELLOW_PERCENT")) / 100.0;
257         } catch (Exception e) {
258             eelfLogger.trace("Error parsing FREE_DISK_YELLOW_PERCENT", e);
259         }
260         if (fdpstart < 0.01) {
261             fdpstart = 0.01;
262         }
263         if (fdpstart > 0.5) {
264             fdpstart = 0.5;
265         }
266         if (fdpstop < fdpstart) {
267             fdpstop = fdpstart;
268         }
269         if (fdpstop > 0.5) {
270             fdpstop = 0.5;
271         }
272     }
273
274     private void fetchNodeConfigFromProv() {
275         try {
276             eelfLogger.debug("NodeConfigMan.fetchNodeConfigFromProv: provurl:: {}", provurl);
277             URL url = new URL(provurl);
278             Reader reader = new InputStreamReader(url.openStream());
279             nodeConfig = new NodeConfig(new ProvData(reader), myname, spooldir, extHttpsPort, nak);
280             localconfig();
281             configtasks.startRun();
282             runTasks();
283         } catch (Exception e) {
284             NodeUtils.setIpAndFqdnForEelf("fetchNodeConfigFromProv");
285             eelfLogger.error(EelfMsgs.MESSAGE_CONF_FAILED, e.toString());
286             eelfLogger.error("NODE0306 Configuration failed {} - try again later", e);
287             pfetcher.request();
288         }
289     }
290
291     private void runTasks() {
292         Runnable rr;
293         while ((rr = configtasks.next()) != null) {
294             try {
295                 rr.run();
296             } catch (Exception e) {
297                 eelfLogger.error("NODE0518 Exception fetchconfig: " + e);
298             }
299         }
300     }
301
302     /**
303      * Process a gofetch request from a particular IP address.  If the IP address is not an IP address we would go to to
304      * fetch the provisioning data, ignore the request.  If the data has been fetched very recently (default 10
305      * seconds), wait a while before fetching again.
306      */
307     synchronized void gofetch(String remoteAddr) {
308         if (provcheck.isReachable(remoteAddr)) {
309             eelfLogger.debug("NODE0307 Received configuration fetch request from provisioning server " + remoteAddr);
310             pfetcher.request();
311         } else {
312             eelfLogger.debug("NODE0308 Received configuration fetch request from unexpected server " + remoteAddr);
313         }
314     }
315
316     /**
317      * Am I configured.
318      */
319     public boolean isConfigured() {
320         return nodeConfig != null;
321     }
322
323     /**
324      * Am I shut down.
325      */
326     boolean isShutdown() {
327         return quiesce.exists();
328     }
329
330     /**
331      * Given a routing string, get the targets.
332      *
333      * @param routing Target string
334      * @return array of targets
335      */
336     Target[] parseRouting(String routing) {
337         return nodeConfig.parseRouting(routing);
338     }
339
340     /**
341      * Given a set of credentials and an IP address, is this request from another node.
342      *
343      * @param credentials Credentials offered by the supposed node
344      * @param ip IP address the request came from
345      * @return If the credentials and IP address are recognized, true, otherwise false.
346      */
347     boolean isAnotherNode(String credentials, String ip) {
348         return nodeConfig.isAnotherNode(credentials, ip);
349     }
350
351     /**
352      * Check whether publication is allowed.
353      *
354      * @param feedid The ID of the feed being requested
355      * @param credentials The offered credentials
356      * @param ip The requesting IP address
357      * @return True if the IP and credentials are valid for the specified feed.
358      */
359     String isPublishPermitted(String feedid, String credentials, String ip) {
360         return nodeConfig.isPublishPermitted(feedid, credentials, ip);
361     }
362
363     /**
364      * Check whether delete file is allowed.
365      *
366      * @param subId The ID of the subscription being requested
367      * @return True if the delete file is permitted for the subscriber.
368      */
369     boolean isDeletePermitted(String subId) {
370         return nodeConfig.isDeletePermitted(subId);
371     }
372
373     /**
374      * Check who the user is given the feed ID and the offered credentials.
375      *
376      * @param feedid The ID of the feed specified
377      * @param credentials The offered credentials
378      * @return Null if the credentials are invalid or the user if they are valid.
379      */
380     String getAuthUser(String feedid, String credentials) {
381         return nodeConfig.getAuthUser(feedid, credentials);
382     }
383
384     /**
385      * Check if the publish request should be sent to another node based on the feedid, user, and source IP address.
386      *
387      * @param feedid The ID of the feed specified
388      * @param user The publishing user
389      * @param ip The IP address of the publish endpoint
390      * @return Null if the request should be accepted or the correct hostname if it should be sent to another node.
391      */
392     String getIngressNode(String feedid, String user, String ip) {
393         return nodeConfig.getIngressNode(feedid, user, ip);
394     }
395
396     /**
397      * Get a provisioned configuration parameter (from the provisioning server configuration).
398      *
399      * @param name The name of the parameter
400      * @return The value of the parameter or null if it is not defined.
401      */
402     private String getProvParam(String name) {
403         return nodeConfig.getProvParam(name);
404     }
405
406     /**
407      * Get a provisioned configuration parameter (from the provisioning server configuration).
408      *
409      * @param name The name of the parameter
410      * @param defaultValue The value to use if the parameter is not defined
411      * @return The value of the parameter or deflt if it is not defined.
412      */
413     private String getProvParam(String name, String defaultValue) {
414         name = nodeConfig.getProvParam(name);
415         if (name == null) {
416             name = defaultValue;
417         }
418         return name;
419     }
420
421     /**
422      * Generate a publish ID.
423      */
424     public String getPublishId() {
425         return publishId.next();
426     }
427
428     /**
429      * Get all the outbound spooling destinations. This will include both subscriptions and nodes.
430      */
431     public DestInfo[] getAllDests() {
432         return nodeConfig.getAllDests();
433     }
434
435     /**
436      * Register a task to run whenever the configuration changes.
437      */
438     public void registerConfigTask(Runnable task) {
439         configtasks.addTask(task);
440     }
441
442     /**
443      * Deregister a task to run whenever the configuration changes.
444      */
445     void deregisterConfigTask(Runnable task) {
446         configtasks.removeTask(task);
447     }
448
449     /**
450      * Get the URL to deliver a message to.
451      *
452      * @param destinationInfo The destination information
453      * @param fileid The file ID
454      * @return The URL to deliver to
455      */
456     public String getDestURL(DestInfo destinationInfo, String fileid) {
457         String subid = destinationInfo.getSubId();
458         String purl = destinationInfo.getURL();
459         if (followredirects && subid != null) {
460             purl = rdmgr.lookup(subid, purl);
461         }
462         return (purl + "/" + fileid);
463     }
464
465     /**
466      * Set up redirection on receipt of a 3XX from a target URL.
467      */
468     public boolean handleRedirection(DestInfo destinationInfo, String redirto, String fileid) {
469         fileid = "/" + fileid;
470         String subid = destinationInfo.getSubId();
471         String purl = destinationInfo.getURL();
472         if (followredirects && subid != null && redirto.endsWith(fileid)) {
473             redirto = redirto.substring(0, redirto.length() - fileid.length());
474             if (!redirto.equals(purl)) {
475                 rdmgr.redirect(subid, purl, redirto);
476                 return (true);
477             }
478         }
479         return (false);
480     }
481
482     /**
483      * Handle unreachable target URL.
484      */
485     public void handleUnreachable(DestInfo destinationInfo) {
486         String subid = destinationInfo.getSubId();
487         if (followredirects && subid != null) {
488             rdmgr.forget(subid);
489         }
490     }
491
492     /**
493      * Get the timeout before retrying after an initial delivery failure.
494      */
495     public long getInitFailureTimer() {
496         return initfailuretimer;
497     }
498
499     /**
500      * Get the timeout before retrying after delivery and wait for file processing.
501      */
502     public long getWaitForFileProcessFailureTimer() {
503         return waitForFileProcessFailureTimer;
504     }
505
506     /**
507      * Get the maximum timeout between delivery attempts.
508      */
509     public long getMaxFailureTimer() {
510         return maxfailuretimer;
511     }
512
513     /**
514      * Get the ratio between consecutive delivery attempts.
515      */
516     public double getFailureBackoff() {
517         return failurebackoff;
518     }
519
520     /**
521      * Get the expiration timer for deliveries.
522      */
523     public long getExpirationTimer() {
524         return expirationtimer;
525     }
526
527     /**
528      * Get the maximum number of file delivery attempts before checking if another queue has work to be performed.
529      */
530     public int getFairFileLimit() {
531         return fairfilelimit;
532     }
533
534     /**
535      * Get the maximum amount of time spent delivering files before checking if another queue has work to be performed.
536      */
537     public long getFairTimeLimit() {
538         return fairtimelimit;
539     }
540
541     /**
542      * Get the targets for a feed.
543      *
544      * @param feedid The feed ID
545      * @return The targets this feed should be delivered to
546      */
547     Target[] getTargets(String feedid) {
548         return nodeConfig.getTargets(feedid);
549     }
550
551     /**
552      * Get the spool directory for temporary files.
553      */
554     String getSpoolDir() {
555         return spooldir + "/f";
556     }
557
558     /**
559      * Get the spool directory for a subscription.
560      */
561     String getSpoolDir(String subid, String remoteaddr) {
562         if (provcheck.isFrom(remoteaddr)) {
563             String sdir = nodeConfig.getSpoolDir(subid);
564             if (sdir != null) {
565                 eelfLogger.debug("NODE0310 Received subscription reset request for subscription " + subid
566                         + " from provisioning server " + remoteaddr);
567             } else {
568                 eelfLogger.debug("NODE0311 Received subscription reset request for unknown subscription " + subid
569                         + " from provisioning server " + remoteaddr);
570             }
571             return sdir;
572         } else {
573             eelfLogger.debug("NODE0312 Received subscription reset request from unexpected server " + remoteaddr);
574             return null;
575         }
576     }
577
578     /**
579      * Get the base directory for spool directories.
580      */
581     public String getSpoolBase() {
582         return spooldir;
583     }
584
585     /**
586      * Get the http port.
587      */
588     int getHttpPort() {
589         return intHttpPort;
590     }
591
592     /**
593      * Get the https port.
594      */
595     int getHttpsPort() {
596         return intHttpsPort;
597     }
598
599     /**
600      * Get the externally visible https port.
601      */
602     int getExtHttpsPort() {
603         return extHttpsPort;
604     }
605
606     /**
607      * Get the external name of this machine.
608      */
609     public String getMyName() {
610         return myname;
611     }
612
613     /**
614      * Get the number of threads to use for delivery.
615      */
616     public int getDeliveryThreads() {
617         return deliverythreads;
618     }
619
620     /**
621      * Get the URL for uploading the event log data.
622      */
623     public String getEventLogUrl() {
624         return eventlogurl;
625     }
626
627     /**
628      * Get the prefix for the names of event log files.
629      */
630     public String getEventLogPrefix() {
631         return eventlogprefix;
632     }
633
634     /**
635      * Get the suffix for the names of the event log files.
636      */
637     public String getEventLogSuffix() {
638         return eventlogsuffix;
639     }
640
641     /**
642      * Get the interval between event log file rollovers.
643      */
644     public String getEventLogInterval() {
645         return eventloginterval;
646     }
647
648     /**
649      * Should I follow redirects from subscribers.
650      */
651     public boolean isFollowRedirects() {
652         return followredirects;
653     }
654
655     /**
656      * Get the directory where the event and node log files live.
657      */
658     public String getLogDir() {
659         return logdir;
660     }
661
662     /**
663      * How long do I keep log files (in milliseconds).
664      */
665     public long getLogRetention() {
666         return logretention;
667     }
668
669     /**
670      * Get the timer.
671      */
672     public Timer getTimer() {
673         return timer;
674     }
675
676     /**
677      * Get the feed ID for a subscription.
678      *
679      * @param subid The subscription ID
680      * @return The feed ID
681      */
682     public String getFeedId(String subid) {
683         return nodeConfig.getFeedId(subid);
684     }
685
686     /**
687      * Get the authorization string this node uses.
688      *
689      * @return The Authorization string for this node
690      */
691     public String getMyAuth() {
692         return nodeConfig.getMyAuth();
693     }
694
695     /**
696      * Get the fraction of free spool disk space where we start throwing away undelivered files.  This is
697      * FREE_DISK_RED_PERCENT / 100.0.  Default is 0.05.  Limited by 0.01 <= FreeDiskStart <= 0.5.
698      */
699     public double getFreeDiskStart() {
700         return fdpstart;
701     }
702
703     /**
704      * Get the fraction of free spool disk space where we stop throwing away undelivered files.  This is
705      * FREE_DISK_YELLOW_PERCENT / 100.0.  Default is 0.2.  Limited by FreeDiskStart <= FreeDiskStop <= 0.5.
706      */
707     public double getFreeDiskStop() {
708         return fdpstop;
709     }
710
711     protected boolean isTlsEnabled() {
712         return tlsEnabled;
713     }
714
715     public static NodeTlsManager getNodeTlsManager() {
716         return nodeTlsManager;
717     }
718
719     /**
720      * Generate publish IDs.
721      */
722     static class PublishId {
723
724         private long nextuid;
725         private final String myname;
726
727         /**
728          * Generate publish IDs for the specified name.
729          *
730          * @param myname Unique identifier for this publish ID generator (usually fqdn of server)
731          */
732         public PublishId(String myname) {
733             this.myname = myname;
734         }
735
736         /**
737          * Generate a Data Router Publish ID that uniquely identifies the particular invocation of the Publish API for log
738          * correlation purposes.
739          */
740         public synchronized String next() {
741             long now = System.currentTimeMillis();
742             if (now < nextuid) {
743                 now = nextuid;
744             }
745             nextuid = now + 1;
746             return (now + "." + myname);
747         }
748     }
749
750     /**
751      * Manage a list of tasks to be executed when an event occurs. This makes the following guarantees:
752      * <ul>
753      * <li>Tasks can be safely added and removed in the middle of a run.</li>
754      * <li>No task will be returned more than once during a run.</li>
755      * <li>No task will be returned when it is not, at that moment, in the list of tasks.</li>
756      * <li>At the moment when next() returns null, all tasks on the list have been returned during the run.</li>
757      * <li>Initially and once next() returns null during a run, next() will continue to return null until startRun() is
758      * called.
759      * </ul>
760      */
761     static class TaskList {
762
763         private Iterator<Runnable> runlist;
764         private final HashSet<Runnable> tasks = new HashSet<>();
765         private HashSet<Runnable> togo;
766         private HashSet<Runnable> sofar;
767         private HashSet<Runnable> added;
768         private HashSet<Runnable> removed;
769
770         /**
771          * Start executing the sequence of tasks.
772          */
773         synchronized void startRun() {
774             sofar = new HashSet<>();
775             added = new HashSet<>();
776             removed = new HashSet<>();
777             togo = new HashSet<>(tasks);
778             runlist = togo.iterator();
779         }
780
781         /**
782          * Get the next task to execute.
783          */
784         synchronized Runnable next() {
785             while (runlist != null) {
786                 if (runlist.hasNext()) {
787                     Runnable task = runlist.next();
788                     if (addTaskToSoFar(task)) {
789                         return task;
790                     }
791                 }
792                 if (!added.isEmpty()) {
793                     togo = added;
794                     added = new HashSet<>();
795                     removed.clear();
796                     runlist = togo.iterator();
797                     continue;
798                 }
799                 togo = null;
800                 added = null;
801                 removed = null;
802                 sofar = null;
803                 runlist = null;
804             }
805             return (null);
806         }
807
808         /**
809          * Add a task to the list of tasks to run whenever the event occurs.
810          */
811         synchronized void addTask(Runnable task) {
812             if (runlist != null) {
813                 added.add(task);
814                 removed.remove(task);
815             }
816             tasks.add(task);
817         }
818
819         /**
820          * Remove a task from the list of tasks to run whenever the event occurs.
821          */
822         synchronized void removeTask(Runnable task) {
823             if (runlist != null) {
824                 removed.add(task);
825                 added.remove(task);
826             }
827             tasks.remove(task);
828         }
829
830         private boolean addTaskToSoFar(Runnable task) {
831             if (removed.contains(task)) {
832                 return false;
833             }
834             if (sofar.contains(task)) {
835                 return false;
836             }
837             sofar.add(task);
838             return true;
839         }
840     }
841 }