Merge "Fixed Blocker issue in DRRouterCLI.java"
[dmaap/datarouter.git] / datarouter-prov / src / main / java / org / onap / dmaap / datarouter / provisioning / utils / LogfileLoader.java
1 /*******************************************************************************\r
2  * ============LICENSE_START==================================================\r
3  * * org.onap.dmaap\r
4  * * ===========================================================================\r
5  * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.\r
6  * * ===========================================================================\r
7  * * Licensed under the Apache License, Version 2.0 (the "License");\r
8  * * you may not use this file except in compliance with the License.\r
9  * * You may obtain a copy of the License at\r
10  * *\r
11  *  *      http://www.apache.org/licenses/LICENSE-2.0\r
12  * *\r
13  *  * Unless required by applicable law or agreed to in writing, software\r
14  * * distributed under the License is distributed on an "AS IS" BASIS,\r
15  * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
16  * * See the License for the specific language governing permissions and\r
17  * * limitations under the License.\r
18  * * ============LICENSE_END====================================================\r
19  * *\r
20  * * ECOMP is a trademark and service mark of AT&T Intellectual Property.\r
21  * *\r
22  ******************************************************************************/\r
23 \r
24 \r
25 package org.onap.dmaap.datarouter.provisioning.utils;\r
26 \r
27 import java.io.File;\r
28 import java.io.FileInputStream;\r
29 import java.io.FileNotFoundException;\r
30 import java.io.FileReader;\r
31 import java.io.FilenameFilter;\r
32 import java.io.IOException;\r
33 import java.io.InputStreamReader;\r
34 import java.io.LineNumberReader;\r
35 import java.io.Reader;\r
36 import java.sql.Connection;\r
37 import java.sql.PreparedStatement;\r
38 import java.sql.ResultSet;\r
39 import java.sql.SQLException;\r
40 import java.sql.Statement;\r
41 import java.text.ParseException;\r
42 import java.util.Date;\r
43 import java.util.HashMap;\r
44 import java.util.Iterator;\r
45 import java.util.Map;\r
46 import java.util.TreeSet;\r
47 import java.util.zip.GZIPInputStream;\r
48 \r
49 import org.apache.log4j.Logger;\r
50 import org.onap.dmaap.datarouter.provisioning.BaseServlet;\r
51 import org.onap.dmaap.datarouter.provisioning.beans.DeliveryExtraRecord;\r
52 import org.onap.dmaap.datarouter.provisioning.beans.DeliveryRecord;\r
53 import org.onap.dmaap.datarouter.provisioning.beans.ExpiryRecord;\r
54 import org.onap.dmaap.datarouter.provisioning.beans.Loadable;\r
55 import org.onap.dmaap.datarouter.provisioning.beans.LogRecord;\r
56 import org.onap.dmaap.datarouter.provisioning.beans.Parameters;\r
57 import org.onap.dmaap.datarouter.provisioning.beans.PubFailRecord;\r
58 import org.onap.dmaap.datarouter.provisioning.beans.PublishRecord;\r
59 \r
60 /**\r
61  * This class provides methods that run in a separate thread, in order to process logfiles uploaded into the spooldir.\r
62  * These logfiles are loaded into the MariaDB LOG_RECORDS table. In a running provisioning server, there should only be\r
63  * two places where records can be loaded into this table; here, and in the method DB.retroFit4() which may be run at\r
64  * startup to load the old (1.0) style log tables into LOG_RECORDS;\r
65  * <p>This method maintains an {@link RLEBitSet} which can be used to easily see what records are presently in the\r
66  * database.\r
67  * This bit set is used to synchronize between provisioning servers.</p>\r
68  *\r
69  * @author Robert Eby\r
70  * @version $Id: LogfileLoader.java,v 1.22 2014/03/12 19:45:41 eby Exp $\r
71  */\r
72 public class LogfileLoader extends Thread {\r
73     /**\r
74      * Default number of log records to keep when pruning.  Keep 10M by default.\r
75      */\r
76     public static final long DEFAULT_LOG_RETENTION = 10000000L;\r
77     /**\r
78      * NOT USED: Percentage of free space required before old records are removed.\r
79      */\r
80     public static final int REQUIRED_FREE_PCT = 20;\r
81 \r
82     /**\r
83      * This is a singleton -- there is only one LogfileLoader object in the server\r
84      */\r
85     private static LogfileLoader p;\r
86 \r
87     /**\r
88      * Get the singleton LogfileLoader object, and start it if it is not running.\r
89      *\r
90      * @return the LogfileLoader\r
91      */\r
92     public static synchronized LogfileLoader getLoader() {\r
93         if (p == null)\r
94             p = new LogfileLoader();\r
95         if (!p.isAlive())\r
96             p.start();\r
97         return p;\r
98     }\r
99 \r
100     /**\r
101      * The PreparedStatement which is loaded by a <i>Loadable</i>.\r
102      */\r
103     public static final String INSERT_SQL = "insert into LOG_RECORDS values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";\r
104     /**\r
105      * Each server can assign this many IDs\r
106      */\r
107     private static final long SET_SIZE = (1L << 56);\r
108 \r
109     private final Logger logger;\r
110     private final DB db;\r
111     private final String spooldir;\r
112     private final long set_start;\r
113     private final long set_end;\r
114     private RLEBitSet seq_set;\r
115     private long nextid;\r
116     private boolean idle;\r
117 \r
118     private LogfileLoader() {\r
119         this.logger = Logger.getLogger("org.onap.dmaap.datarouter.provisioning.internal");\r
120         this.db = new DB();\r
121         this.spooldir = db.getProperties().getProperty("org.onap.dmaap.datarouter.provserver.spooldir");\r
122         this.set_start = getIdRange();\r
123         this.set_end = set_start + SET_SIZE - 1;\r
124         this.seq_set = new RLEBitSet();\r
125         this.nextid = 0;\r
126         this.idle = false;\r
127 \r
128         // This is a potentially lengthy operation, so has been moved to run()\r
129         //initializeNextid();\r
130         this.setDaemon(true);\r
131         this.setName("LogfileLoader");\r
132     }\r
133 \r
134     private long getIdRange() {\r
135         long n;\r
136         if (BaseServlet.isInitialActivePOD())\r
137             n = 0;\r
138         else if (BaseServlet.isInitialStandbyPOD())\r
139             n = SET_SIZE;\r
140         else\r
141             n = SET_SIZE * 2;\r
142         String r = String.format("[%X .. %X]", n, n + SET_SIZE - 1);\r
143         logger.debug("This server shall assign RECORD_IDs in the range " + r);\r
144         return n;\r
145     }\r
146 \r
147     /**\r
148      * Return the bit set representing the record ID's that are loaded in this database.\r
149      *\r
150      * @return the bit set\r
151      */\r
152     public RLEBitSet getBitSet() {\r
153         return seq_set;\r
154     }\r
155 \r
156     /**\r
157      * True if the LogfileLoader is currently waiting for work.\r
158      *\r
159      * @return true if idle\r
160      */\r
161     public boolean isIdle() {\r
162         return idle;\r
163     }\r
164 \r
165     /**\r
166      * Run continuously to look for new logfiles in the spool directory and import them into the DB.\r
167      * The spool is checked once per second.  If free space on the MariaDB filesystem falls below\r
168      * REQUIRED_FREE_PCT (normally 20%) then the oldest logfile entries are removed and the LOG_RECORDS\r
169      * table is compacted until free space rises above the threshold.\r
170      */\r
171     @Override\r
172     public void run() {\r
173         initializeNextid();    // moved from the constructor\r
174         while (true) {\r
175             try {\r
176                 File dirfile = new File(spooldir);\r
177                 while (true) {\r
178                     // process IN files\r
179                     File[] infiles = dirfile.listFiles(new FilenameFilter() {\r
180                         @Override\r
181                         public boolean accept(File dir, String name) {\r
182                             return name.startsWith("IN.");\r
183                         }\r
184                     });\r
185 \r
186                     if (infiles.length == 0) {\r
187                         idle = true;\r
188                         try {\r
189                             Thread.sleep(1000L);\r
190                         } catch (InterruptedException e) {\r
191                             Thread.currentThread().interrupt();\r
192                         }\r
193                         idle = false;\r
194                     } else {\r
195                         // Remove old rows\r
196                         if (pruneRecords()) {\r
197                             // Removed at least some entries, recompute the bit map\r
198                             initializeNextid();\r
199                         }\r
200 \r
201                         // Process incoming logfiles\r
202                         for (File f : infiles) {\r
203                             if (logger.isDebugEnabled())\r
204                                 logger.debug("PROV8001 Starting " + f + " ...");\r
205                             long time = System.currentTimeMillis();\r
206                             int[] n = process(f);\r
207                             time = System.currentTimeMillis() - time;\r
208                             logger.info(String\r
209                                     .format("PROV8000 Processed %s in %d ms; %d of %d records.",\r
210                                             f.toString(), time, n[0], n[1]));\r
211                             f.delete();\r
212                         }\r
213                     }\r
214                 }\r
215             } catch (Exception e) {\r
216                 logger.warn("PROV0020: Caught exception in LogfileLoader: " + e);\r
217                 e.printStackTrace();\r
218             }\r
219         }\r
220     }\r
221 \r
222     private boolean pruneRecords() {\r
223         boolean did1 = false;\r
224         long count = countRecords();\r
225         long threshold = DEFAULT_LOG_RETENTION;\r
226         Parameters param = Parameters.getParameter(Parameters.PROV_LOG_RETENTION);\r
227         if (param != null) {\r
228             try {\r
229                 long n = Long.parseLong(param.getValue());\r
230                 // This check is to prevent inadvertent errors from wiping the table out\r
231                 if (n > 1000000L)\r
232                     threshold = n;\r
233             } catch (NumberFormatException e) {\r
234                 // ignore\r
235             }\r
236         }\r
237         logger.debug("Pruning LOG_RECORD table: records in DB=" + count + ", threshold=" + threshold);\r
238         if (count > threshold) {\r
239             count -= threshold;                        // we need to remove this many records;\r
240             Map<Long, Long> hist = getHistogram();    // histogram of records per day\r
241             // Determine the cutoff point to remove the needed number of records\r
242             long sum = 0;\r
243             long cutoff = 0;\r
244             for (Long day : new TreeSet<Long>(hist.keySet())) {\r
245                 sum += hist.get(day);\r
246                 cutoff = day;\r
247                 if (sum >= count)\r
248                     break;\r
249             }\r
250             cutoff++;\r
251             cutoff *= 86400000L;        // convert day to ms\r
252             logger.debug("  Pruning records older than=" + (cutoff / 86400000L) + " (" + new Date(cutoff) + ")");\r
253 \r
254             Connection conn = null;\r
255             try {\r
256                 // Limit to a million at a time to avoid typing up the DB for too long.\r
257                 conn = db.getConnection();\r
258                 try(PreparedStatement ps = conn.prepareStatement("DELETE from LOG_RECORDS where EVENT_TIME < ? limit 1000000")) {\r
259                     ps.setLong(1, cutoff);\r
260                     while (count > 0) {\r
261                         if (!ps.execute()) {\r
262                             int dcount = ps.getUpdateCount();\r
263                             count -= dcount;\r
264                             logger.debug("  " + dcount + " rows deleted.");\r
265                             did1 |= (dcount != 0);\r
266                             if (dcount == 0)\r
267                                 count = 0;    // prevent inf. loops\r
268                         } else {\r
269                             count = 0;    // shouldn't happen!\r
270                         }\r
271                     }\r
272                 }\r
273              try(Statement stmt = conn.createStatement()) {\r
274                  stmt.execute("OPTIMIZE TABLE LOG_RECORDS");\r
275              }\r
276             } catch (SQLException e) {\r
277                 System.err.println(e);\r
278                 e.printStackTrace();\r
279             } finally {\r
280                 db.release(conn);\r
281             }\r
282         }\r
283         return did1;\r
284     }\r
285 \r
286     private long countRecords() {\r
287         long count = 0;\r
288         Connection conn = null;\r
289         try {\r
290             conn = db.getConnection();\r
291            try(Statement stmt = conn.createStatement()) {\r
292                try(ResultSet rs = stmt.executeQuery("SELECT COUNT(*) as COUNT from LOG_RECORDS")) {\r
293                    if (rs.next()) {\r
294                        count = rs.getLong("COUNT");\r
295                    }\r
296                }\r
297            }\r
298          } catch (SQLException e) {\r
299             System.err.println(e);\r
300             e.printStackTrace();\r
301         } finally {\r
302             db.release(conn);\r
303         }\r
304         return count;\r
305     }\r
306 \r
307     private Map<Long, Long> getHistogram() {\r
308         Map<Long, Long> map = new HashMap<Long, Long>();\r
309         Connection conn = null;\r
310         try {\r
311             logger.debug("  LOG_RECORD table histogram...");\r
312             conn = db.getConnection();\r
313             try(Statement stmt = conn.createStatement()) {\r
314                 try(ResultSet rs = stmt.executeQuery("SELECT FLOOR(EVENT_TIME/86400000) AS DAY, COUNT(*) AS COUNT FROM LOG_RECORDS GROUP BY DAY")) {\r
315                     while (rs.next()) {\r
316                         long day = rs.getLong("DAY");\r
317                         long cnt = rs.getLong("COUNT");\r
318                         map.put(day, cnt);\r
319                         logger.debug("  " + day + "  " + cnt);\r
320                     }\r
321                 }\r
322             }\r
323            } catch (SQLException e) {\r
324             System.err.println(e);\r
325             e.printStackTrace();\r
326         } finally {\r
327             db.release(conn);\r
328         }\r
329         return map;\r
330     }\r
331 \r
332     private void initializeNextid() {\r
333         Connection conn = null;\r
334         try {\r
335             conn = db.getConnection();\r
336             RLEBitSet nbs = new RLEBitSet();\r
337             try(Statement stmt = conn.createStatement()) {\r
338                 // Build a bitset of all records in the LOG_RECORDS table\r
339                 // We need to run this SELECT in stages, because otherwise we run out of memory!\r
340                 final long stepsize = 6000000L;\r
341                 boolean go_again = true;\r
342                 for (long i = 0; go_again; i += stepsize) {\r
343                     String sql = String.format("select RECORD_ID from LOG_RECORDS LIMIT %d,%d", i, stepsize);\r
344                     try (ResultSet rs = stmt.executeQuery(sql)) {\r
345                         go_again = false;\r
346                         while (rs.next()) {\r
347                             long n = rs.getLong("RECORD_ID");\r
348                             nbs.set(n);\r
349                             go_again = true;\r
350                         }\r
351                     }\r
352                 }\r
353             }\r
354             seq_set = nbs;\r
355             // Compare with the range for this server\r
356             // Determine the next ID for this set of record IDs\r
357             RLEBitSet tbs = (RLEBitSet) nbs.clone();\r
358             RLEBitSet idset = new RLEBitSet();\r
359             idset.set(set_start, set_start + SET_SIZE);\r
360             tbs.and(idset);\r
361             long t = tbs.length();\r
362             nextid = (t == 0) ? set_start : (t - 1);\r
363             if (nextid >= set_start + SET_SIZE) {\r
364                 // Handle wraparound, when the IDs reach the end of our "range"\r
365                 Long[] last = null;\r
366                 Iterator<Long[]> li = tbs.getRangeIterator();\r
367                 while (li.hasNext()) {\r
368                     last = li.next();\r
369                 }\r
370                 if (last != null) {\r
371                     tbs.clear(last[0], last[1] + 1);\r
372                     t = tbs.length();\r
373                     nextid = (t == 0) ? set_start : (t - 1);\r
374                 }\r
375             }\r
376             logger.debug(String.format("initializeNextid, next ID is %d (%x)", nextid, nextid));\r
377         } catch (SQLException e) {\r
378             System.err.println(e);\r
379             e.printStackTrace();\r
380         } finally {\r
381             db.release(conn);\r
382         }\r
383     }\r
384 \r
385     @SuppressWarnings("resource")\r
386     private int[] process(File f) {\r
387         int ok = 0, total = 0;\r
388         try {\r
389             Connection conn = db.getConnection();\r
390             PreparedStatement ps = conn.prepareStatement(INSERT_SQL);\r
391             Reader r = f.getPath().endsWith(".gz")\r
392                     ? new InputStreamReader(new GZIPInputStream(new FileInputStream(f)))\r
393                     : new FileReader(f);\r
394             try(LineNumberReader in = new LineNumberReader(r)) {\r
395                 String line;\r
396                 while ((line = in.readLine()) != null) {\r
397                     try {\r
398                         for (Loadable rec : buildRecords(line)) {\r
399                             rec.load(ps);\r
400                             if (rec instanceof LogRecord) {\r
401                                 LogRecord lr = ((LogRecord) rec);\r
402                                 if (!seq_set.get(lr.getRecordId())) {\r
403                                     ps.executeUpdate();\r
404                                     seq_set.set(lr.getRecordId());\r
405                                 } else\r
406                                     logger.debug("Duplicate record ignored: " + lr.getRecordId());\r
407                             } else {\r
408                                 if (++nextid > set_end)\r
409                                     nextid = set_start;\r
410                                 ps.setLong(18, nextid);\r
411                                 ps.executeUpdate();\r
412                                 seq_set.set(nextid);\r
413                             }\r
414                             ps.clearParameters();\r
415                             ok++;\r
416                         }\r
417                     } catch (SQLException e) {\r
418                         logger.warn("PROV8003 Invalid value in record: " + line);\r
419                         logger.debug(e);\r
420                         e.printStackTrace();\r
421                     } catch (NumberFormatException e) {\r
422                         logger.warn("PROV8004 Invalid number in record: " + line);\r
423                         logger.debug(e);\r
424                         e.printStackTrace();\r
425                     } catch (ParseException e) {\r
426                         logger.warn("PROV8005 Invalid date in record: " + line);\r
427                         logger.debug(e);\r
428                         e.printStackTrace();\r
429                     } catch (Exception e) {\r
430                         logger.warn("PROV8006 Invalid pattern in record: " + line);\r
431                         logger.debug(e);\r
432                         e.printStackTrace();\r
433                     }\r
434                     total++;\r
435                 }\r
436             }\r
437             ps.close();\r
438             db.release(conn);\r
439             conn = null;\r
440         } catch (FileNotFoundException e) {\r
441             logger.warn("PROV8007 Exception reading " + f + ": " + e);\r
442         } catch (IOException e) {\r
443             logger.warn("PROV8007 Exception reading " + f + ": " + e);\r
444         } catch (SQLException e) {\r
445             logger.warn("PROV8007 Exception reading " + f + ": " + e);\r
446         }\r
447         return new int[]{ok, total};\r
448     }\r
449 \r
450     private Loadable[] buildRecords(String line) throws ParseException {\r
451         String[] pp = line.split("\\|");\r
452         if (pp != null && pp.length >= 7) {\r
453             String rtype = pp[1].toUpperCase();\r
454             if (rtype.equals("PUB") && pp.length == 11) {\r
455                 // Fields are: date|PUB|pubid|feedid|requrl|method|ctype|clen|srcip|user|status\r
456                 return new Loadable[]{new PublishRecord(pp)};\r
457             }\r
458             if (rtype.equals("DEL") && pp.length == 12) {\r
459                 // Fields are: date|DEL|pubid|feedid|subid|requrl|method|ctype|clen|user|status|xpubid\r
460                 String[] subs = pp[4].split("\\s+");\r
461                 if (subs != null) {\r
462                     Loadable[] rv = new Loadable[subs.length];\r
463                     for (int i = 0; i < subs.length; i++) {\r
464                         // create a new record for each individual sub\r
465                         pp[4] = subs[i];\r
466                         rv[i] = new DeliveryRecord(pp);\r
467                     }\r
468                     return rv;\r
469                 }\r
470             }\r
471             if (rtype.equals("EXP") && pp.length == 11) {\r
472                 // Fields are: date|EXP|pubid|feedid|subid|requrl|method|ctype|clen|reason|attempts\r
473                 ExpiryRecord e = new ExpiryRecord(pp);\r
474                 if (e.getReason().equals("other"))\r
475                     logger.info("Invalid reason '" + pp[9] + "' changed to 'other' for record: " + e.getPublishId());\r
476                 return new Loadable[]{e};\r
477             }\r
478             if (rtype.equals("PBF") && pp.length == 12) {\r
479                 // Fields are: date|PBF|pubid|feedid|requrl|method|ctype|clen-expected|clen-received|srcip|user|error\r
480                 return new Loadable[]{new PubFailRecord(pp)};\r
481             }\r
482             if (rtype.equals("DLX") && pp.length == 7) {\r
483                 // Fields are: date|DLX|pubid|feedid|subid|clen-tosend|clen-sent\r
484                 return new Loadable[]{new DeliveryExtraRecord(pp)};\r
485             }\r
486             if (rtype.equals("LOG") && (pp.length == 19 || pp.length == 20)) {\r
487                 // Fields are: date|LOG|pubid|feedid|requrl|method|ctype|clen|type|feedFileid|remoteAddr|user|status|subid|fileid|result|attempts|reason|record_id\r
488                 return new Loadable[]{new LogRecord(pp)};\r
489             }\r
490         }\r
491         logger.warn("PROV8002 bad record: " + line);\r
492         return new Loadable[0];\r
493     }\r
494 \r
495     /**\r
496      * The LogfileLoader can be run stand-alone by invoking the main() method of this class.\r
497      *\r
498      * @param a ignored\r
499      * @throws InterruptedException\r
500      */\r
501     public static void main(String[] a) throws InterruptedException {\r
502         LogfileLoader.getLoader();\r
503         Thread.sleep(200000L);\r
504     }\r
505 }\r