Refactor Prov DB handling
[dmaap/datarouter.git] / datarouter-prov / src / main / java / org / onap / dmaap / datarouter / provisioning / utils / LogfileLoader.java
1 /*******************************************************************************
2  * ============LICENSE_START==================================================
3  * * org.onap.dmaap
4  * * ===========================================================================
5  * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  * * ===========================================================================
7  * * Licensed under the Apache License, Version 2.0 (the "License");
8  * * you may not use this file except in compliance with the License.
9  * * You may obtain a copy of the License at
10  * *
11  *  *      http://www.apache.org/licenses/LICENSE-2.0
12  * *
13  *  * Unless required by applicable law or agreed to in writing, software
14  * * distributed under the License is distributed on an "AS IS" BASIS,
15  * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * * See the License for the specific language governing permissions and
17  * * limitations under the License.
18  * * ============LICENSE_END====================================================
19  * *
20  * * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21  * *
22  ******************************************************************************/
23
24
25 package org.onap.dmaap.datarouter.provisioning.utils;
26
27 import com.att.eelf.configuration.EELFLogger;
28 import com.att.eelf.configuration.EELFManager;
29 import java.io.File;
30 import java.io.FileInputStream;
31 import java.io.FileReader;
32 import java.io.IOException;
33 import java.io.InputStreamReader;
34 import java.io.LineNumberReader;
35 import java.io.Reader;
36 import java.nio.file.Files;
37 import java.sql.Connection;
38 import java.sql.PreparedStatement;
39 import java.sql.ResultSet;
40 import java.sql.SQLException;
41 import java.sql.Statement;
42 import java.text.ParseException;
43 import java.util.Date;
44 import java.util.HashMap;
45 import java.util.Iterator;
46 import java.util.Map;
47 import java.util.TreeSet;
48 import java.util.zip.GZIPInputStream;
49 import org.onap.dmaap.datarouter.provisioning.BaseServlet;
50 import org.onap.dmaap.datarouter.provisioning.ProvRunner;
51 import org.onap.dmaap.datarouter.provisioning.beans.DeliveryExtraRecord;
52 import org.onap.dmaap.datarouter.provisioning.beans.DeliveryRecord;
53 import org.onap.dmaap.datarouter.provisioning.beans.ExpiryRecord;
54 import org.onap.dmaap.datarouter.provisioning.beans.Loadable;
55 import org.onap.dmaap.datarouter.provisioning.beans.LogRecord;
56 import org.onap.dmaap.datarouter.provisioning.beans.Parameters;
57 import org.onap.dmaap.datarouter.provisioning.beans.PubFailRecord;
58 import org.onap.dmaap.datarouter.provisioning.beans.PublishRecord;
59
60 /**
61  * This class provides methods that run in a separate thread, in order to process logfiles uploaded into the spooldir.
62  * These logfiles are loaded into the MariaDB LOG_RECORDS table. In a running provisioning server, there should only be
63  * two places where records can be loaded into this table; here, and in the method DB.retroFit4() which may be run at
64  * startup to load the old (1.0) style log tables into LOG_RECORDS;
65  * <p>This method maintains an {@link RLEBitSet} which can be used to easily see what records are presently in the
66  * database.
67  * This bit set is used to synchronize between provisioning servers.</p>
68  *
69  * @author Robert Eby
70  * @version $Id: LogfileLoader.java,v 1.22 2014/03/12 19:45:41 eby Exp $
71  */
72 public class LogfileLoader extends Thread {
73     /**
74      * NOT USED: Percentage of free space required before old records are removed.
75      */
76     public static final int REQUIRED_FREE_PCT = 20;
77
78     /**
79      * This is a singleton -- there is only one LogfileLoader object in the server.
80      */
81     private static LogfileLoader logfileLoader;
82
83     /**
84      * Each server can assign this many IDs.
85      */
86     private static final long SET_SIZE = (1L << 56);
87
88     private final EELFLogger logger;
89     private final String spooldir;
90     private final long setStart;
91     private final long setEnd;
92     private RLEBitSet seqSet;
93     private long nextId;
94     private boolean idle;
95
96     private LogfileLoader() {
97         this.logger = EELFManager.getInstance().getLogger("InternalLog");
98         this.spooldir = ProvRunner.getProvProperties().getProperty("org.onap.dmaap.datarouter.provserver.spooldir");
99         this.setStart = getIdRange();
100         this.setEnd = setStart + SET_SIZE - 1;
101         this.seqSet = new RLEBitSet();
102         this.nextId = 0;
103         this.idle = false;
104         this.setDaemon(true);
105         this.setName("LogfileLoader");
106     }
107
108     /**
109      * Get the singleton LogfileLoader object, and start it if it is not running.
110      *
111      * @return the LogfileLoader
112      */
113     public static synchronized LogfileLoader getLoader() {
114         if (logfileLoader == null) {
115             logfileLoader = new LogfileLoader();
116         }
117         if (!logfileLoader.isAlive()) {
118             logfileLoader.start();
119         }
120         return logfileLoader;
121     }
122
123     private long getIdRange() {
124         long size;
125         if (BaseServlet.isInitialActivePOD()) {
126             size = 0;
127         } else if (BaseServlet.isInitialStandbyPOD()) {
128             size = SET_SIZE;
129         } else {
130             size = SET_SIZE * 2;
131         }
132         String range = String.format("[%X .. %X]", size, size + SET_SIZE - 1);
133         logger.debug("This server shall assign RECORD_IDs in the range " + range);
134         return size;
135     }
136
137     /**
138      * Return the bit set representing the record ID's that are loaded in this database.
139      *
140      * @return the bit set
141      */
142     public RLEBitSet getBitSet() {
143         return seqSet;
144     }
145
146     /**
147      * True if the LogfileLoader is currently waiting for work.
148      *
149      * @return true if idle
150      */
151     public boolean isIdle() {
152         return idle;
153     }
154
155     /**
156      * Run continuously to look for new logfiles in the spool directory and import them into the DB.
157      * The spool is checked once per second.  If free space on the MariaDB filesystem falls below
158      * REQUIRED_FREE_PCT (normally 20%) then the oldest logfile entries are removed and the LOG_RECORDS
159      * table is compacted until free space rises above the threshold.
160      */
161     @Override
162     public void run() {
163         initializeNextid();
164         while (true) {
165             try {
166                 File dirfile = new File(spooldir);
167                 while (true) {
168                     runLogFileLoad(dirfile);
169                 }
170             } catch (Exception e) {
171                 logger.warn("PROV0020: Caught exception in LogfileLoader: " + e);
172             }
173         }
174     }
175
176     private void runLogFileLoad(File filesDir) {
177         File[] inFiles = filesDir.listFiles((dir, name) -> name.startsWith("IN."));
178         if (inFiles != null) {
179             if (inFiles.length == 0) {
180                 idle = true;
181                 try {
182                     Thread.sleep(1000L);
183                 } catch (InterruptedException e) {
184                     Thread.currentThread().interrupt();
185                 }
186                 idle = false;
187             } else {
188                 // Remove old rows
189                 if (pruneRecords()) {
190                     // Removed at least some entries, recompute the bit map
191                     initializeNextid();
192                 }
193                 for (File file : inFiles) {
194                     processFile(file);
195                 }
196             }
197         }
198     }
199
200     private void processFile(File infile) {
201         if (logger.isDebugEnabled()) {
202             logger.debug("PROV8001 Starting " + infile + " ...");
203         }
204         long time = System.currentTimeMillis();
205         int[] array = process(infile);
206         time = System.currentTimeMillis() - time;
207         logger.info(String.format("PROV8000 Processed %s in %d ms; %d of %d records.",
208             infile.toString(), time, array[0], array[1]));
209         try {
210             Files.delete(infile.toPath());
211         } catch (IOException e) {
212             logger.info("PROV8001 failed to delete file " + infile.getName(), e);
213         }
214     }
215
216     boolean pruneRecords() {
217         boolean did1 = false;
218         long count = countRecords();
219         Parameters defaultLogRetention = Parameters.getParameter(Parameters.DEFAULT_LOG_RETENTION);
220         long threshold = (defaultLogRetention != null) ? Long.parseLong(defaultLogRetention.getValue()) : 1000000L;
221         Parameters provLogRetention = Parameters.getParameter(Parameters.PROV_LOG_RETENTION);
222         if (provLogRetention != null) {
223             try {
224                 long retention = Long.parseLong(provLogRetention.getValue());
225                 // This check is to prevent inadvertent errors from wiping the table out
226                 if (retention > 1000000L) {
227                     threshold = retention;
228                 }
229             } catch (NumberFormatException e) {
230                 // ignore
231             }
232         }
233         logger.debug("Pruning LOG_RECORD table: records in DB=" + count + ", threshold=" + threshold);
234         if (count > threshold) {
235             // we need to remove this many records
236             count -= threshold;
237             // histogram of records per day
238             Map<Long, Long> hist = getHistogram();
239             // Determine the cutoff point to remove the needed number of records
240             long sum = 0;
241             long cutoff = 0;
242             for (Long day : new TreeSet<>(hist.keySet())) {
243                 sum += hist.get(day);
244                 cutoff = day;
245                 if (sum >= count) {
246                     break;
247                 }
248             }
249             cutoff++;
250             // convert day to ms
251             cutoff *= 86400000L;
252             logger.debug("  Pruning records older than=" + (cutoff / 86400000L) + " (" + new Date(cutoff) + ")");
253
254             try (Connection conn = ProvDbUtils.getInstance().getConnection()) {
255                 // Limit to a million at a time to avoid typing up the DB for too long.
256                 try (PreparedStatement ps = conn.prepareStatement(
257                     "DELETE from LOG_RECORDS where EVENT_TIME < ? limit 1000000")) {
258                     ps.setLong(1, cutoff);
259                     while (count > 0) {
260                         if (!ps.execute()) {
261                             int dcount = ps.getUpdateCount();
262                             count -= dcount;
263                             logger.debug("  " + dcount + " rows deleted.");
264                             did1 |= (dcount != 0);
265                             if (dcount == 0) {
266                                 count = 0;    // prevent inf. loops
267                             }
268                         } else {
269                             count = 0;    // shouldn't happen!
270                         }
271                     }
272                 }
273                 try (Statement stmt = conn.createStatement()) {
274                     stmt.execute("OPTIMIZE TABLE LOG_RECORDS");
275                 }
276             } catch (SQLException e) {
277                 logger.error("LogfileLoader.pruneRecords: " + e.getMessage(), e);
278             }
279         }
280         return did1;
281     }
282
283     private long countRecords() {
284         long count = 0;
285         try (Connection conn = ProvDbUtils.getInstance().getConnection();
286             PreparedStatement ps = conn.prepareStatement("SELECT COUNT(*) as COUNT from LOG_RECORDS");
287             ResultSet rs = ps.executeQuery()) {
288             if (rs.next()) {
289                 count = rs.getLong("COUNT");
290             }
291         } catch (SQLException e) {
292             logger.error("LogfileLoader.countRecords: " + e.getMessage(), e);
293         }
294         return count;
295     }
296
297     private Map<Long, Long> getHistogram() {
298         Map<Long, Long> map = new HashMap<>();
299         try (Connection conn = ProvDbUtils.getInstance().getConnection();
300             PreparedStatement ps = conn.prepareStatement(
301                 "SELECT FLOOR(EVENT_TIME/86400000) AS DAY, COUNT(*) AS COUNT FROM LOG_RECORDS GROUP BY DAY");
302             ResultSet rs = ps.executeQuery()) {
303             logger.debug("  LOG_RECORD table histogram...");
304             while (rs.next()) {
305                 long day = rs.getLong("DAY");
306                 long cnt = rs.getLong("COUNT");
307                 map.put(day, cnt);
308                 logger.debug("  " + day + "  " + cnt);
309             }
310         } catch (SQLException e) {
311             logger.error("LogfileLoader.getHistogram: " + e.getMessage(), e);
312         }
313         return map;
314     }
315
316     private void initializeNextid() {
317         try (Connection conn = ProvDbUtils.getInstance().getConnection()) {
318             RLEBitSet nbs = new RLEBitSet();
319             try (Statement stmt = conn.createStatement()) {
320                 // Build a bitset of all records in the LOG_RECORDS table
321                 // We need to run this SELECT in stages, because otherwise we run out of memory!
322                 final long stepsize = 6000000L;
323                 boolean goAgain = true;
324                 for (long i = 0; goAgain; i += stepsize) {
325                     String sql = String.format("select RECORD_ID from LOG_RECORDS LIMIT %d,%d", i, stepsize);
326                     try (ResultSet rs = stmt.executeQuery(sql)) {
327                         goAgain = false;
328                         while (rs.next()) {
329                             long recordId = rs.getLong("RECORD_ID");
330                             nbs.set(recordId);
331                             goAgain = true;
332                         }
333                     }
334                 }
335             }
336             seqSet = nbs;
337             // Compare with the range for this server
338             // Determine the next ID for this set of record IDs
339             RLEBitSet tbs = (RLEBitSet) nbs.clone();
340             RLEBitSet idset = new RLEBitSet();
341             idset.set(setStart, setStart + SET_SIZE);
342             tbs.and(idset);
343             long bitLength = tbs.length();
344             nextId = (bitLength == 0) ? setStart : (bitLength - 1);
345             if (nextId >= setStart + SET_SIZE) {
346                 // Handle wraparound, when the IDs reach the end of our "range"
347                 Long[] last = null;
348                 Iterator<Long[]> li = tbs.getRangeIterator();
349                 while (li.hasNext()) {
350                     last = li.next();
351                 }
352                 if (last != null) {
353                     tbs.clear(last[0], last[1] + 1);
354                     bitLength = tbs.length();
355                     nextId = (bitLength == 0) ? setStart : (bitLength - 1);
356                 }
357             }
358             logger.debug(String.format("LogfileLoader.initializeNextid, next ID is %d (%x)", nextId, nextId));
359         } catch (SQLException e) {
360             logger.error("LogfileLoader.initializeNextid: " + e.getMessage(), e);
361         }
362     }
363
364     @SuppressWarnings("resource")
365     int[] process(File file) {
366         int ok = 0;
367         int total = 0;
368         try (Connection conn = ProvDbUtils.getInstance().getConnection();
369             PreparedStatement ps = conn.prepareStatement(
370                 "insert into LOG_RECORDS values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)")) {
371             Reader reader = file.getPath().endsWith(".gz")
372                 ? new InputStreamReader(new GZIPInputStream(new FileInputStream(file)))
373                 : new FileReader(file);
374             try (LineNumberReader in = new LineNumberReader(reader)) {
375                 String line;
376                 while ((line = in.readLine()) != null) {
377                     try {
378                         for (Loadable rec : buildRecords(line)) {
379                             rec.load(ps);
380                             if (rec instanceof LogRecord) {
381                                 LogRecord lr = ((LogRecord) rec);
382                                 if (!seqSet.get(lr.getRecordId())) {
383                                     ps.executeUpdate();
384                                     seqSet.set(lr.getRecordId());
385                                 } else {
386                                     logger.debug("Duplicate record ignored: " + lr.getRecordId());
387                                 }
388                             } else {
389                                 if (++nextId > setEnd) {
390                                     nextId = setStart;
391                                 }
392                                 ps.setLong(18, nextId);
393                                 ps.executeUpdate();
394                                 seqSet.set(nextId);
395                             }
396                             ps.clearParameters();
397                             ok++;
398                         }
399                     } catch (SQLException e) {
400                         logger.warn("PROV8003 Invalid value in record: " + line, e);
401                     } catch (NumberFormatException e) {
402                         logger.warn("PROV8004 Invalid number in record: " + line, e);
403                     } catch (ParseException e) {
404                         logger.warn("PROV8005 Invalid date in record: " + line, e);
405                     } catch (Exception e) {
406                         logger.warn("PROV8006 Invalid pattern in record: " + line, e);
407                     }
408                     total++;
409                 }
410             }
411         } catch (SQLException | IOException e) {
412             logger.warn("PROV8007 Exception reading " + file + ": " + e);
413         }
414         return new int[]{ok, total};
415     }
416
417     Loadable[] buildRecords(String line) throws ParseException {
418         String[] pp = line.split("\\|");
419         if (pp != null && pp.length >= 7) {
420             String rtype = pp[1].toUpperCase();
421             if ("PUB".equals(rtype) && pp.length == 11) {
422                 // Fields are: date|PUB|pubid|feedid|requrl|method|ctype|clen|srcip|user|status
423                 return new Loadable[]{new PublishRecord(pp)};
424             }
425             if ("DEL".equals(rtype) && pp.length == 12) {
426                 // Fields are: date|DEL|pubid|feedid|subid|requrl|method|ctype|clen|user|status|xpubid
427                 String[] subs = pp[4].split("\\s+");
428                 if (subs != null) {
429                     Loadable[] rv = new Loadable[subs.length];
430                     for (int i = 0; i < subs.length; i++) {
431                         // create a new record for each individual sub
432                         pp[4] = subs[i];
433                         rv[i] = new DeliveryRecord(pp);
434                     }
435                     return rv;
436                 }
437             }
438             if ("EXP".equals(rtype) && pp.length == 11) {
439                 // Fields are: date|EXP|pubid|feedid|subid|requrl|method|ctype|clen|reason|attempts
440                 ExpiryRecord expiryRecord = new ExpiryRecord(pp);
441                 if ("other".equals(expiryRecord.getReason())) {
442                     logger.info("Invalid reason '" + pp[9] + "' changed to 'other' for record: "
443                         + expiryRecord.getPublishId());
444                 }
445                 return new Loadable[]{expiryRecord};
446             }
447             if ("PBF".equals(rtype) && pp.length == 12) {
448                 // Fields are: date|PBF|pubid|feedid|requrl|method|ctype|clen-expected|clen-received|srcip|user|error
449                 return new Loadable[]{new PubFailRecord(pp)};
450             }
451             if ("DLX".equals(rtype) && pp.length == 7) {
452                 // Fields are: date|DLX|pubid|feedid|subid|clen-tosend|clen-sent
453                 return new Loadable[]{new DeliveryExtraRecord(pp)};
454             }
455             if ("LOG".equals(rtype) && (pp.length == 19 || pp.length == 20)) {
456                 // Fields are: date|LOG|pubid|feedid|requrl|method|ctype|clen|type|
457                 // feedFileid|remoteAddr|user|status|subid|fileid|result|attempts|reason|record_id
458                 return new Loadable[]{new LogRecord(pp)};
459             }
460         }
461         logger.warn("PROV8002 bad record: " + line);
462         return new Loadable[0];
463     }
464
465     /**
466      * The LogfileLoader can be run stand-alone by invoking the main() method of this class.
467      *
468      * @param str ignored
469      */
470     public static void main(String[] str) throws InterruptedException {
471         LogfileLoader.getLoader();
472         Thread.sleep(200000L);
473     }
474 }