--- /dev/null
+/*******************************************************************************\r
+ * ============LICENSE_START==================================================\r
+ * * org.onap.dmaap\r
+ * * ===========================================================================\r
+ * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.\r
+ * * ===========================================================================\r
+ * * Licensed under the Apache License, Version 2.0 (the "License");\r
+ * * you may not use this file except in compliance with the License.\r
+ * * You may obtain a copy of the License at\r
+ * * \r
+ * * http://www.apache.org/licenses/LICENSE-2.0\r
+ * * \r
+ * * Unless required by applicable law or agreed to in writing, software\r
+ * * distributed under the License is distributed on an "AS IS" BASIS,\r
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+ * * See the License for the specific language governing permissions and\r
+ * * limitations under the License.\r
+ * * ============LICENSE_END====================================================\r
+ * *\r
+ * * ECOMP is a trademark and service mark of AT&T Intellectual Property.\r
+ * *\r
+ ******************************************************************************/\r
+package org.onap.dmaap.datarouter.node;\r
+\r
+import java.util.*;\r
+import java.util.regex.*;\r
+import java.io.*;\r
+import java.nio.file.*;\r
+import java.text.*;\r
+\r
+/**\r
+ * Cleanup of old log files.\r
+ * <p>\r
+ * Periodically scan the log directory for log files that are older than\r
+ * the log file retention interval, and delete them. In a future release,\r
+ * This class will also be responsible for uploading events logs to the\r
+ * log server to support the log query APIs.\r
+ */\r
+\r
+public class LogManager extends TimerTask {\r
+ private NodeConfigManager config;\r
+ private Matcher isnodelog;\r
+ private Matcher iseventlog;\r
+ private Uploader worker;\r
+ private String uploaddir;\r
+ private String logdir;\r
+ private class Uploader extends Thread implements DeliveryQueueHelper {\r
+ public long getInitFailureTimer() { return(10000L); }\r
+ public double getFailureBackoff() { return(2.0); }\r
+ public long getMaxFailureTimer() { return(150000L); }\r
+ public long getExpirationTimer() { return(604800000L); }\r
+ public int getFairFileLimit() { return(10000); }\r
+ public long getFairTimeLimit() { return(86400000); }\r
+ public String getDestURL(DestInfo dest, String fileid) {\r
+ return(config.getEventLogUrl());\r
+ }\r
+ public void handleUnreachable(DestInfo dest) {}\r
+ public boolean handleRedirection(DestInfo dest, String location, String fileid) { return(false); }\r
+ public boolean isFollowRedirects() { return(false); }\r
+ public String getFeedId(String subid) { return(null); }\r
+ private DeliveryQueue dq;\r
+ public Uploader() {\r
+ dq = new DeliveryQueue(this, new DestInfo("LogUpload", uploaddir, null, null, null, config.getMyName(), config.getMyAuth(), false, false));\r
+ setDaemon(true);\r
+ setName("Log Uploader");\r
+ start();\r
+ }\r
+ private synchronized void snooze() {\r
+ try {\r
+ wait(10000);\r
+ } catch (Exception e) {\r
+ }\r
+ }\r
+ private synchronized void poke() {\r
+ notify();\r
+ }\r
+ public void run() {\r
+ while (true) {\r
+ scan();\r
+ dq.run();\r
+ snooze();\r
+ }\r
+ }\r
+ private void scan() {\r
+ long threshold = System.currentTimeMillis() - config.getLogRetention();\r
+ File dir = new File(logdir);\r
+ String[] fns = dir.list();\r
+ Arrays.sort(fns);\r
+ String lastqueued = "events-000000000000.log";\r
+ String curlog = StatusLog.getCurLogFile();\r
+ curlog = curlog.substring(curlog.lastIndexOf('/') + 1);\r
+ try {\r
+ Writer w = new FileWriter(uploaddir + "/.meta");\r
+ w.write("POST\tlogdata\nContent-Type\ttext/plain\n");\r
+ w.close();\r
+ BufferedReader br = new BufferedReader(new FileReader(uploaddir + "/.lastqueued"));\r
+ lastqueued = br.readLine();\r
+ br.close();\r
+ } catch (Exception e) {\r
+ }\r
+ for (String fn: fns) {\r
+ if (!isnodelog.reset(fn).matches()) {\r
+ if (!iseventlog.reset(fn).matches()) {\r
+ continue;\r
+ }\r
+ if (lastqueued.compareTo(fn) < 0 && curlog.compareTo(fn) > 0) {\r
+ lastqueued = fn;\r
+ try {\r
+ String pid = config.getPublishId();\r
+ Files.createLink(Paths.get(uploaddir + "/" + pid), Paths.get(logdir + "/" + fn));\r
+ Files.createLink(Paths.get(uploaddir + "/" + pid + ".M"), Paths.get(uploaddir + "/.meta"));\r
+ } catch (Exception e) {\r
+ }\r
+ }\r
+ }\r
+ File f = new File(dir, fn);\r
+ if (f.lastModified() < threshold) {\r
+ f.delete();\r
+ }\r
+ }\r
+ try {\r
+ (new File(uploaddir + "/.meta")).delete();\r
+ Writer w = new FileWriter(uploaddir + "/.lastqueued");\r
+ w.write(lastqueued + "\n");\r
+ w.close();\r
+ } catch (Exception e) {\r
+ }\r
+ }\r
+ }\r
+ /**\r
+ * Construct a log manager\r
+ * <p>\r
+ * The log manager will check for expired log files every 5 minutes\r
+ * at 20 seconds after the 5 minute boundary. (Actually, the\r
+ * interval is the event log rollover interval, which\r
+ * defaults to 5 minutes).\r
+ */\r
+ public LogManager(NodeConfigManager config) {\r
+ this.config = config;\r
+ try {\r
+ isnodelog = Pattern.compile("node\\.log\\.\\d{8}").matcher("");\r
+ iseventlog = Pattern.compile("events-\\d{12}\\.log").matcher("");\r
+ } catch (Exception e) {}\r
+ logdir = config.getLogDir();\r
+ uploaddir = logdir + "/.spool";\r
+ (new File(uploaddir)).mkdirs();\r
+ long now = System.currentTimeMillis();\r
+ long intvl = StatusLog.parseInterval(config.getEventLogInterval(), 300000);\r
+ long when = now - now % intvl + intvl + 20000L;\r
+ config.getTimer().scheduleAtFixedRate(this, when - now, intvl);\r
+ worker = new Uploader();\r
+ }\r
+ /**\r
+ * Trigger check for expired log files and log files to upload\r
+ */\r
+ public void run() {\r
+ worker.poke();\r
+ }\r
+}\r