More unit test coverage and code cleanup
[dmaap/datarouter.git] / datarouter-prov / src / main / java / org / onap / dmaap / datarouter / provisioning / utils / LogfileLoader.java
1 /*******************************************************************************
2  * ============LICENSE_START==================================================
3  * * org.onap.dmaap
4  * * ===========================================================================
5  * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  * * ===========================================================================
7  * * Licensed under the Apache License, Version 2.0 (the "License");
8  * * you may not use this file except in compliance with the License.
9  * * You may obtain a copy of the License at
10  * *
11  *  *      http://www.apache.org/licenses/LICENSE-2.0
12  * *
13  *  * Unless required by applicable law or agreed to in writing, software
14  * * distributed under the License is distributed on an "AS IS" BASIS,
15  * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * * See the License for the specific language governing permissions and
17  * * limitations under the License.
18  * * ============LICENSE_END====================================================
19  * *
20  * * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21  * *
22  ******************************************************************************/
23
24
25 package org.onap.dmaap.datarouter.provisioning.utils;
26
27 import com.att.eelf.configuration.EELFLogger;
28 import com.att.eelf.configuration.EELFManager;
29 import java.io.File;
30 import java.io.FileInputStream;
31 import java.io.FileReader;
32 import java.io.IOException;
33 import java.io.InputStreamReader;
34 import java.io.LineNumberReader;
35 import java.io.Reader;
36 import java.nio.file.Files;
37 import java.sql.Connection;
38 import java.sql.PreparedStatement;
39 import java.sql.ResultSet;
40 import java.sql.SQLException;
41 import java.sql.Statement;
42 import java.text.ParseException;
43 import java.util.Date;
44 import java.util.HashMap;
45 import java.util.Iterator;
46 import java.util.Map;
47 import java.util.TreeSet;
48 import java.util.zip.GZIPInputStream;
49 import org.onap.dmaap.datarouter.provisioning.BaseServlet;
50 import org.onap.dmaap.datarouter.provisioning.beans.DeliveryExtraRecord;
51 import org.onap.dmaap.datarouter.provisioning.beans.DeliveryRecord;
52 import org.onap.dmaap.datarouter.provisioning.beans.ExpiryRecord;
53 import org.onap.dmaap.datarouter.provisioning.beans.Loadable;
54 import org.onap.dmaap.datarouter.provisioning.beans.LogRecord;
55 import org.onap.dmaap.datarouter.provisioning.beans.Parameters;
56 import org.onap.dmaap.datarouter.provisioning.beans.PubFailRecord;
57 import org.onap.dmaap.datarouter.provisioning.beans.PublishRecord;
58
59 /**
60  * This class provides methods that run in a separate thread, in order to process logfiles uploaded into the spooldir.
61  * These logfiles are loaded into the MariaDB LOG_RECORDS table. In a running provisioning server, there should only be
62  * two places where records can be loaded into this table; here, and in the method DB.retroFit4() which may be run at
63  * startup to load the old (1.0) style log tables into LOG_RECORDS;
64  * <p>This method maintains an {@link RLEBitSet} which can be used to easily see what records are presently in the
65  * database.
66  * This bit set is used to synchronize between provisioning servers.</p>
67  *
68  * @author Robert Eby
69  * @version $Id: LogfileLoader.java,v 1.22 2014/03/12 19:45:41 eby Exp $
70  */
71 public class LogfileLoader extends Thread {
72     /**
73      * NOT USED: Percentage of free space required before old records are removed.
74      */
75     public static final int REQUIRED_FREE_PCT = 20;
76
77     /**
78      * This is a singleton -- there is only one LogfileLoader object in the server.
79      */
80     private static LogfileLoader logfileLoader;
81
82     /**
83      * The PreparedStatement which is loaded by a <i>Loadable</i>.
84      */
85     private static final String INSERT_SQL = "insert into LOG_RECORDS values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
86     /**
87      * Each server can assign this many IDs.
88      */
89     private static final long SET_SIZE = (1L << 56);
90
91     private final EELFLogger logger;
92     private final DB db;
93     private final String spooldir;
94     private final long setStart;
95     private final long setEnd;
96     private RLEBitSet seqSet;
97     private long nextId;
98     private boolean idle;
99
100     private LogfileLoader() {
101         this.logger = EELFManager.getInstance().getLogger("InternalLog");
102         this.db = new DB();
103         this.spooldir = db.getProperties().getProperty("org.onap.dmaap.datarouter.provserver.spooldir");
104         this.setStart = getIdRange();
105         this.setEnd = setStart + SET_SIZE - 1;
106         this.seqSet = new RLEBitSet();
107         this.nextId = 0;
108         this.idle = false;
109         this.setDaemon(true);
110         this.setName("LogfileLoader");
111     }
112
113     /**
114      * Get the singleton LogfileLoader object, and start it if it is not running.
115      *
116      * @return the LogfileLoader
117      */
118     public static synchronized LogfileLoader getLoader() {
119         if (logfileLoader == null) {
120             logfileLoader = new LogfileLoader();
121         }
122         if (!logfileLoader.isAlive()) {
123             logfileLoader.start();
124         }
125         return logfileLoader;
126     }
127
128     private long getIdRange() {
129         long n;
130         if (BaseServlet.isInitialActivePOD()) {
131             n = 0;
132         } else if (BaseServlet.isInitialStandbyPOD()) {
133             n = SET_SIZE;
134         } else {
135             n = SET_SIZE * 2;
136         }
137         String r = String.format("[%X .. %X]", n, n + SET_SIZE - 1);
138         logger.debug("This server shall assign RECORD_IDs in the range " + r);
139         return n;
140     }
141
142     /**
143      * Return the bit set representing the record ID's that are loaded in this database.
144      *
145      * @return the bit set
146      */
147     public RLEBitSet getBitSet() {
148         return seqSet;
149     }
150
151     /**
152      * True if the LogfileLoader is currently waiting for work.
153      *
154      * @return true if idle
155      */
156     public boolean isIdle() {
157         return idle;
158     }
159
160     /**
161      * Run continuously to look for new logfiles in the spool directory and import them into the DB.
162      * The spool is checked once per second.  If free space on the MariaDB filesystem falls below
163      * REQUIRED_FREE_PCT (normally 20%) then the oldest logfile entries are removed and the LOG_RECORDS
164      * table is compacted until free space rises above the threshold.
165      */
166     @Override
167     public void run() {
168         initializeNextid();
169         while (true) {
170             try {
171                 File dirfile = new File(spooldir);
172                 while (true) {
173                     runLogFileLoad(dirfile);
174                 }
175             } catch (Exception e) {
176                 logger.warn("PROV0020: Caught exception in LogfileLoader: " + e);
177             }
178         }
179     }
180
181     private void runLogFileLoad(File filesDir) {
182         File[] inFiles = filesDir.listFiles((dir, name) -> name.startsWith("IN."));
183         if (inFiles != null) {
184             if (inFiles.length == 0) {
185                 idle = true;
186                 try {
187                     Thread.sleep(1000L);
188                 } catch (InterruptedException e) {
189                     Thread.currentThread().interrupt();
190                 }
191                 idle = false;
192             } else {
193                 // Remove old rows
194                 if (pruneRecords()) {
195                     // Removed at least some entries, recompute the bit map
196                     initializeNextid();
197                 }
198                 for (File file : inFiles) {
199                     processFile(file);
200                 }
201             }
202         }
203     }
204
205     private void processFile(File infile) {
206         if (logger.isDebugEnabled()) {
207             logger.debug("PROV8001 Starting " + infile + " ...");
208         }
209         long time = System.currentTimeMillis();
210         int[] n = process(infile);
211         time = System.currentTimeMillis() - time;
212         logger.info(String.format("PROV8000 Processed %s in %d ms; %d of %d records.",
213             infile.toString(), time, n[0], n[1]));
214         try {
215             Files.delete(infile.toPath());
216         } catch (IOException e) {
217             logger.info("PROV8001 failed to delete file " + infile.getName(), e);
218         }
219     }
220
221     boolean pruneRecords() {
222         boolean did1 = false;
223         long count = countRecords();
224         Parameters defaultLogRetention = Parameters.getParameter(Parameters.DEFAULT_LOG_RETENTION);
225         long threshold = (defaultLogRetention != null) ? Long.parseLong(defaultLogRetention.getValue()) : 1000000L;
226         Parameters provLogRetention = Parameters.getParameter(Parameters.PROV_LOG_RETENTION);
227         if (provLogRetention != null) {
228             try {
229                 long n = Long.parseLong(provLogRetention.getValue());
230                 // This check is to prevent inadvertent errors from wiping the table out
231                 if (n > 1000000L) {
232                     threshold = n;
233                 }
234             } catch (NumberFormatException e) {
235                 // ignore
236             }
237         }
238         logger.debug("Pruning LOG_RECORD table: records in DB=" + count + ", threshold=" + threshold);
239         if (count > threshold) {
240             // we need to remove this many records
241             count -= threshold;
242             // histogram of records per day
243             Map<Long, Long> hist = getHistogram();
244             // Determine the cutoff point to remove the needed number of records
245             long sum = 0;
246             long cutoff = 0;
247             for (Long day : new TreeSet<>(hist.keySet())) {
248                 sum += hist.get(day);
249                 cutoff = day;
250                 if (sum >= count) {
251                     break;
252                 }
253             }
254             cutoff++;
255             // convert day to ms
256             cutoff *= 86400000L;
257             logger.debug("  Pruning records older than=" + (cutoff / 86400000L) + " (" + new Date(cutoff) + ")");
258
259             Connection conn = null;
260             try {
261                 // Limit to a million at a time to avoid typing up the DB for too long.
262                 conn = db.getConnection();
263                 try (PreparedStatement ps = conn.prepareStatement("DELETE from LOG_RECORDS where EVENT_TIME < ? limit 1000000")) {
264                     ps.setLong(1, cutoff);
265                     while (count > 0) {
266                         if (!ps.execute()) {
267                             int dcount = ps.getUpdateCount();
268                             count -= dcount;
269                             logger.debug("  " + dcount + " rows deleted.");
270                             did1 |= (dcount != 0);
271                             if (dcount == 0) {
272                                 count = 0;    // prevent inf. loops
273                             }
274                         } else {
275                             count = 0;    // shouldn't happen!
276                         }
277                     }
278                 }
279                 try (Statement stmt = conn.createStatement()) {
280                     stmt.execute("OPTIMIZE TABLE LOG_RECORDS");
281                 }
282             } catch (SQLException e) {
283                 logger.error("LogfileLoader.pruneRecords: " + e.getMessage(), e);
284             } finally {
285                 db.release(conn);
286             }
287         }
288         return did1;
289     }
290
291     private long countRecords() {
292         long count = 0;
293         try (Connection conn = db.getConnection();
294             Statement stmt = conn.createStatement()) {
295             try (ResultSet rs = stmt.executeQuery("SELECT COUNT(*) as COUNT from LOG_RECORDS")) {
296                 if (rs.next()) {
297                     count = rs.getLong("COUNT");
298                 }
299             } finally {
300                 db.release(conn);
301             }
302         } catch (SQLException e) {
303             logger.error("LogfileLoader.countRecords: " + e.getMessage(), e);
304         }
305         return count;
306     }
307
308     private Map<Long, Long> getHistogram() {
309         Map<Long, Long> map = new HashMap<>();
310         try (Connection conn = db.getConnection();
311             Statement stmt = conn.createStatement()) {
312             logger.debug("  LOG_RECORD table histogram...");
313             try (ResultSet rs = stmt.executeQuery("SELECT FLOOR(EVENT_TIME/86400000) AS DAY, COUNT(*) AS COUNT FROM LOG_RECORDS GROUP BY DAY")) {
314                 while (rs.next()) {
315                     long day = rs.getLong("DAY");
316                     long cnt = rs.getLong("COUNT");
317                     map.put(day, cnt);
318                     logger.debug("  " + day + "  " + cnt);
319                 }
320             } finally {
321                 db.release(conn);
322             }
323         } catch (SQLException e) {
324             logger.error("LogfileLoader.getHistogram: " + e.getMessage(), e);
325         }
326         return map;
327     }
328
329     private void initializeNextid() {
330         Connection conn = null;
331         try {
332             conn = db.getConnection();
333             RLEBitSet nbs = new RLEBitSet();
334             try (Statement stmt = conn.createStatement()) {
335                 // Build a bitset of all records in the LOG_RECORDS table
336                 // We need to run this SELECT in stages, because otherwise we run out of memory!
337                 final long stepsize = 6000000L;
338                 boolean goAgain = true;
339                 for (long i = 0; goAgain; i += stepsize) {
340                     String sql = String.format("select RECORD_ID from LOG_RECORDS LIMIT %d,%d", i, stepsize);
341                     try (ResultSet rs = stmt.executeQuery(sql)) {
342                         goAgain = false;
343                         while (rs.next()) {
344                             long n = rs.getLong("RECORD_ID");
345                             nbs.set(n);
346                             goAgain = true;
347                         }
348                     }
349                 }
350             }
351             seqSet = nbs;
352             // Compare with the range for this server
353             // Determine the next ID for this set of record IDs
354             RLEBitSet tbs = (RLEBitSet) nbs.clone();
355             RLEBitSet idset = new RLEBitSet();
356             idset.set(setStart, setStart + SET_SIZE);
357             tbs.and(idset);
358             long t = tbs.length();
359             nextId = (t == 0) ? setStart : (t - 1);
360             if (nextId >= setStart + SET_SIZE) {
361                 // Handle wraparound, when the IDs reach the end of our "range"
362                 Long[] last = null;
363                 Iterator<Long[]> li = tbs.getRangeIterator();
364                 while (li.hasNext()) {
365                     last = li.next();
366                 }
367                 if (last != null) {
368                     tbs.clear(last[0], last[1] + 1);
369                     t = tbs.length();
370                     nextId = (t == 0) ? setStart : (t - 1);
371                 }
372             }
373             logger.debug(String.format("LogfileLoader.initializeNextid, next ID is %d (%x)", nextId, nextId));
374         } catch (SQLException e) {
375             logger.error("LogfileLoader.initializeNextid: " + e.getMessage(), e);
376         } finally {
377             db.release(conn);
378         }
379     }
380
381     @SuppressWarnings("resource")
382     int[] process(File f) {
383         int ok = 0;
384         int total = 0;
385         try {
386             Connection conn = db.getConnection();
387             PreparedStatement ps = conn.prepareStatement(INSERT_SQL);
388             Reader r = f.getPath().endsWith(".gz")
389                 ? new InputStreamReader(new GZIPInputStream(new FileInputStream(f)))
390                 : new FileReader(f);
391             try (LineNumberReader in = new LineNumberReader(r)) {
392                 String line;
393                 while ((line = in.readLine()) != null) {
394                     try {
395                         for (Loadable rec : buildRecords(line)) {
396                             rec.load(ps);
397                             if (rec instanceof LogRecord) {
398                                 LogRecord lr = ((LogRecord) rec);
399                                 if (!seqSet.get(lr.getRecordId())) {
400                                     ps.executeUpdate();
401                                     seqSet.set(lr.getRecordId());
402                                 } else {
403                                     logger.debug("Duplicate record ignored: " + lr.getRecordId());
404                                 }
405                             } else {
406                                 if (++nextId > setEnd) {
407                                     nextId = setStart;
408                                 }
409                                 ps.setLong(18, nextId);
410                                 ps.executeUpdate();
411                                 seqSet.set(nextId);
412                             }
413                             ps.clearParameters();
414                             ok++;
415                         }
416                     } catch (SQLException e) {
417                         logger.warn("PROV8003 Invalid value in record: " + line, e);
418                     } catch (NumberFormatException e) {
419                         logger.warn("PROV8004 Invalid number in record: " + line, e);
420                     } catch (ParseException e) {
421                         logger.warn("PROV8005 Invalid date in record: " + line, e);
422                     } catch (Exception e) {
423                         logger.warn("PROV8006 Invalid pattern in record: " + line, e);
424                     }
425                     total++;
426                 }
427             }
428             ps.close();
429             db.release(conn);
430         } catch (SQLException | IOException e) {
431             logger.warn("PROV8007 Exception reading " + f + ": " + e);
432         }
433         return new int[]{ok, total};
434     }
435
436     Loadable[] buildRecords(String line) throws ParseException {
437         String[] pp = line.split("\\|");
438         if (pp != null && pp.length >= 7) {
439             String rtype = pp[1].toUpperCase();
440             if ("PUB".equals(rtype) && pp.length == 11) {
441                 // Fields are: date|PUB|pubid|feedid|requrl|method|ctype|clen|srcip|user|status
442                 return new Loadable[]{new PublishRecord(pp)};
443             }
444             if ("DEL".equals(rtype) && pp.length == 12) {
445                 // Fields are: date|DEL|pubid|feedid|subid|requrl|method|ctype|clen|user|status|xpubid
446                 String[] subs = pp[4].split("\\s+");
447                 if (subs != null) {
448                     Loadable[] rv = new Loadable[subs.length];
449                     for (int i = 0; i < subs.length; i++) {
450                         // create a new record for each individual sub
451                         pp[4] = subs[i];
452                         rv[i] = new DeliveryRecord(pp);
453                     }
454                     return rv;
455                 }
456             }
457             if ("EXP".equals(rtype) && pp.length == 11) {
458                 // Fields are: date|EXP|pubid|feedid|subid|requrl|method|ctype|clen|reason|attempts
459                 ExpiryRecord e = new ExpiryRecord(pp);
460                 if ("other".equals(e.getReason())) {
461                     logger.info("Invalid reason '" + pp[9] + "' changed to 'other' for record: " + e.getPublishId());
462                 }
463                 return new Loadable[]{e};
464             }
465             if ("PBF".equals(rtype) && pp.length == 12) {
466                 // Fields are: date|PBF|pubid|feedid|requrl|method|ctype|clen-expected|clen-received|srcip|user|error
467                 return new Loadable[]{new PubFailRecord(pp)};
468             }
469             if ("DLX".equals(rtype) && pp.length == 7) {
470                 // Fields are: date|DLX|pubid|feedid|subid|clen-tosend|clen-sent
471                 return new Loadable[]{new DeliveryExtraRecord(pp)};
472             }
473             if ("LOG".equals(rtype) && (pp.length == 19 || pp.length == 20)) {
474                 // Fields are: date|LOG|pubid|feedid|requrl|method|ctype|clen|type|feedFileid|remoteAddr|user|status|subid|fileid|result|attempts|reason|record_id
475                 return new Loadable[]{new LogRecord(pp)};
476             }
477         }
478         logger.warn("PROV8002 bad record: " + line);
479         return new Loadable[0];
480     }
481
482     /**
483      * The LogfileLoader can be run stand-alone by invoking the main() method of this class.
484      *
485      * @param a ignored
486      */
487     public static void main(String[] a) throws InterruptedException {
488         LogfileLoader.getLoader();
489         Thread.sleep(200000L);
490     }
491 }