Support exporting to the kryo format in the dataSnapshot script 13/138213/3
authorFiete Ostkamp <Fiete.Ostkamp@telekom.de>
Wed, 12 Jun 2024 14:53:33 +0000 (16:53 +0200)
committerFiete Ostkamp <Fiete.Ostkamp@telekom.de>
Thu, 13 Jun 2024 13:28:10 +0000 (15:28 +0200)
- add support for the kryo format when creating a database snapshot

Issue-ID: AAI-3872
Change-Id: Iba085f7227bbfcb928370443c2e0eac95e49bb9c
Signed-off-by: Fiete Ostkamp <Fiete.Ostkamp@telekom.de>
src/main/java/org/onap/aai/datasnapshot/DataSnapshot.java
src/test/java/org/onap/aai/datasnapshot/DataSnapshotTest.java

index a9312b2..cdb858e 100644 (file)
@@ -67,7 +67,7 @@ import com.beust.jcommander.ParameterException;
 public class DataSnapshot {
 
        private static Logger LOGGER;
-       
+
        /* Using realtime d */
        private static final String REALTIME_DB = "realtime";
 
@@ -79,10 +79,10 @@ public class DataSnapshot {
                SNAPSHOT_RELOAD_COMMANDS.add("RELOAD_DATA");
                SNAPSHOT_RELOAD_COMMANDS.add("RELOAD_DATA_MULTI");
        }
-       
+
        private CommandLineArgs cArgs;
-       
-       
+
+
        /**
         * The main method.
         *
@@ -101,7 +101,7 @@ public class DataSnapshot {
                DataSnapshot dataSnapshot = new DataSnapshot();
                success = dataSnapshot.executeCommand(args, success, dbClearFlag, graph, command,
                                oldSnapshotFileName);
-               
+
                if(success){
                        AAISystemExitUtil.systemExitCloseAAIGraph(0);
                } else {
@@ -114,7 +114,7 @@ public class DataSnapshot {
        public boolean executeCommand(String[] args, boolean success,
                        Boolean dbClearFlag, JanusGraph graph, String command,
                        String oldSnapshotFileName) {
-                               
+
                // Set the logging file properties to be used by EELFManager
                System.setProperty("aai.service.name", DataSnapshot.class.getSimpleName());
                Properties props = System.getProperties();
@@ -122,7 +122,7 @@ public class DataSnapshot {
                props.setProperty(Configuration.PROPERTY_LOGGING_FILE_PATH, AAIConstants.AAI_HOME_BUNDLECONFIG);
                LOGGER = LoggerFactory.getLogger(DataSnapshot.class);
                cArgs = new CommandLineArgs();
-               
+
                String itemName = "aai.datasnapshot.threads.for.create";
                try {
                        String val = AAIConfig.get(itemName);
@@ -144,8 +144,8 @@ public class DataSnapshot {
                        LOGGER.warn("WARNING - could not get [" + itemName + "] value from aaiconfig.properties file. " + e.getMessage());
                }
                long maxNodesPerFile4Create = cArgs.maxNodesPerFile;
-                               
-               cArgs.snapshotType = "graphson";
+
+               // cArgs.snapshotType = "graphson";
                Long vertAddDelayMs = GraphAdminConstants.AAI_SNAPSHOT_DEFAULT_VERTEX_ADD_DELAY_MS;
                itemName = "aai.datasnapshot.vertex.add.delay.ms";
                try {
@@ -156,7 +156,7 @@ public class DataSnapshot {
                }catch ( Exception e ){
                        LOGGER.warn("WARNING - could not get [" + itemName + "] value from aaiconfig.properties file. " + e.getMessage());
                }
-               
+
                Long edgeAddDelayMs = GraphAdminConstants.AAI_SNAPSHOT_DEFAULT_EDGE_ADD_DELAY_MS;
                itemName = "aai.datasnapshot.edge.add.delay.ms";
                try {
@@ -167,7 +167,7 @@ public class DataSnapshot {
                }catch ( Exception e ){
                        LOGGER.warn("WARNING - could not get [" + itemName + "] value from aaiconfig.properties file. " + e.getMessage());
                }
-               
+
                Long failureDelayMs = GraphAdminConstants.AAI_SNAPSHOT_DEFAULT_FAILURE_DELAY_MS;
                itemName = "aai.datasnapshot.failure.delay.ms";
                try {
@@ -178,7 +178,7 @@ public class DataSnapshot {
                }catch ( Exception e ){
                        LOGGER.warn("WARNING - could not get [" + itemName + "] value from aaiconfig.properties file. " + e.getMessage());
                }
-               
+
                Long retryDelayMs = GraphAdminConstants.AAI_SNAPSHOT_DEFAULT_RETRY_DELAY_MS;
                itemName = "aai.datasnapshot.retry.delay.ms";
                try {
@@ -189,7 +189,7 @@ public class DataSnapshot {
                }catch ( Exception e ){
                        LOGGER.warn("WARNING - could not get [" + itemName + "] value from aaiconfig.properties file. " + e.getMessage());
                }
-               
+
                int maxErrorsPerThread = GraphAdminConstants.AAI_SNAPSHOT_DEFAULT_MAX_ERRORS_PER_THREAD;
                itemName = "aai.datasnapshot.max.errors.per.thread";
                try {
@@ -200,7 +200,7 @@ public class DataSnapshot {
                }catch ( Exception e ){
                        LOGGER.warn("WARNING - could not get [" + itemName + "] value from aaiconfig.properties file. " + e.getMessage());
                }
-               
+
                Long vertToEdgeProcDelay = GraphAdminConstants.AAI_SNAPSHOT_DEFAULT_VERTEX_TO_EDGE_PROC_DELAY_MS;
                itemName = "aai.datasnapshot.vertex.to.edge.proc.delay.ms";
                try {
@@ -211,7 +211,7 @@ public class DataSnapshot {
                }catch ( Exception e ){
                        LOGGER.warn("WARNING - could not get [" + itemName + "] value from aaiconfig.properties file. " + e.getMessage());
                }
-               
+
                itemName = "aai.datasnapshot.stagger.thread.delay.ms";
                try {
                        String val = AAIConfig.get(itemName);
@@ -220,28 +220,29 @@ public class DataSnapshot {
                        }
                }catch ( Exception e ){
                        LOGGER.warn("WARNING - could not get [" + itemName + "] value from aaiconfig.properties file. " + e.getMessage());
-               }               
-       
+               }
+
                long debugAddDelayTime = 1;  // Default to 1 millisecond
                Boolean debug4Create = false;  // By default we do not use debugging for snapshot creation
-               
+
                JCommander jCommander;
                try {
                        jCommander = new JCommander(cArgs, args);
                        jCommander.setProgramName(DataSnapshot.class.getSimpleName());
                } catch (ParameterException e1) {
                        AAIException ae = new AAIException("AAI_6128", e1 , "Error - invalid value passed to list of args - "+args);
-                       ErrorLogHelper.logException(ae);                        
+                       ErrorLogHelper.logException(ae);
                        AAISystemExitUtil.systemExitCloseAAIGraph(1);
                }
-                               
+
                if (args.length >= 1) {
                        command = cArgs.command;
                }
-               
+
                String source = cArgs.caller;
 
-        String snapshotType = "graphson";
+    // String snapshotType = "graphson";
+    String snapshotType = cArgs.snapshotType;
                if( SNAPSHOT_RELOAD_COMMANDS.contains(cArgs.command)){
                        if (args.length >= 2) {
                                // If re-loading, they need to also pass the snapshot file name to use.
@@ -253,38 +254,10 @@ public class DataSnapshot {
                else if( command.equals("THREADED_SNAPSHOT") ){
                        if (args.length >= 2) {
                                // If doing a "threaded" snapshot, they need to specify how many threads to use
-                               try {
-                                       threadCount4Create = cArgs.threadCount;
-                               }
-                               catch ( NumberFormatException nfe ){
-                                       ErrorLogHelper.logError("AAI_6128", "Bad (non-integer) threadCount passed to DataSnapshot [" + cArgs.threadCount + "]");
-                                       LOGGER.debug("Bad (non-integer) threadCount passed to DataSnapshot [" + cArgs.threadCount + "]");
-                                       AAISystemExitUtil.systemExitCloseAAIGraph(1);
-                               }
-                               if( threadCount4Create < 1 || threadCount4Create > 100 ){
-                                       ErrorLogHelper.logError("AAI_6128", "Out of range (1-100) threadCount passed to DataSnapshot [" + cArgs.threadCount + "]");
-                                       LOGGER.debug("Out of range (1-100) threadCount passed to DataSnapshot [" + cArgs.threadCount + "]");
-                                       AAISystemExitUtil.systemExitCloseAAIGraph(1);
-                               }
-                               LOGGER.debug(" Will do Threaded Snapshot with threadCount = " + threadCount4Create );
-                               
-                               try {
-                                       maxNodesPerFile4Create = cArgs.maxNodesPerFile;
-                               }
-                               catch ( NumberFormatException nfe ){
-                                       ErrorLogHelper.logError("AAI_6128", "Bad (non-long) maxNodesPerFile passed to DataSnapshot [" + cArgs.maxNodesPerFile + "]");
-                                       LOGGER.debug("Bad (non-long) maxNodesPerFile passed to DataSnapshot [" + cArgs.maxNodesPerFile + "]");
-                                       AAISystemExitUtil.systemExitCloseAAIGraph(1);
-                               }
-                               
-                               if( maxNodesPerFile4Create < 1000 || maxNodesPerFile4Create > 1000000 ){
-                                       ErrorLogHelper.logError("AAI_6128", "Out of range (1000-1000000) maxNodesPerFile passed to DataSnapshot [" + cArgs.maxNodesPerFile + "]");
-                                       LOGGER.debug("Out of range (1000-1000000) maxNodesPerFile passed to DataSnapshot [" + cArgs.maxNodesPerFile + "]");
-                                       LOGGER.debug("Out of range (1000-1000000) maxNodesPerFile >> Recommended value = 120000)");
-                                       AAISystemExitUtil.systemExitCloseAAIGraph(1);
-                               }
-                               LOGGER.debug(" Will do Threaded Snapshot with maxNodesPerFile = " + maxNodesPerFile4Create );
-                               
+                               threadCount4Create = validateThreadCount(cArgs);
+
+                               validateMaxNodesPerFile(cArgs);
+
                                // If doing a "threaded" snapshot, they need to specify how many threads to use
                                // They can also use debug mode if they pass the word "DEBUG" to do the nodes one at a time to see where it breaks.
                                if( cArgs.debugFlag.equals("DEBUG") ){
@@ -292,21 +265,9 @@ public class DataSnapshot {
                                }
                                LOGGER.debug(" Will do Threaded Snapshot with threadCount = " + threadCount4Create +
                                                ", and DEBUG-flag set to: " + debug4Create );
-                               
+
                                if (debug4Create) {
-                                       // If doing a "threaded" snapshot, they need to specify how many threads to use (param 1)
-                                       // They can also use debug mode if they pass the word "DEBUG" to do the nodes one (param 2)
-                                       // They can also pass a delayTimer - how many milliseconds to put between each node's ADD (param 3)
-                                       try {
-                                               debugAddDelayTime = cArgs.debugAddDelayTime;
-                                       } catch (NumberFormatException nfe) {
-                                               ErrorLogHelper.logError("AAI_6128",     "Bad (non-integer) debugAddDelayTime passed to DataSnapshot ["
-                                                                               + cArgs.debugAddDelayTime + "]");
-                                               LOGGER.debug("Bad (non-integer) debugAddDelayTime passed to DataSnapshot ["+ cArgs.debugAddDelayTime + "]");
-                                               AAISystemExitUtil.systemExitCloseAAIGraph(1);
-                                       }
-                                       LOGGER.debug(" Will do Threaded Snapshot with threadCount = "+ threadCount4Create + ", DEBUG-flag set to: "
-                                                       + debug4Create + ", and addDelayTimer = " + debugAddDelayTime + " mSec. ");
+                                       debugAddDelayTime = validateDebugAddDelayTime(cArgs, threadCount4Create, debug4Create);
                                }
                        }
                        else {
@@ -324,26 +285,14 @@ public class DataSnapshot {
                                // it is a multi-part snapshot, then this should be the root of the name.
                                // We will be using the default delay timers.
                                oldSnapshotFileName = cArgs.oldFileName;
-                               
+
                                // They should be passing the timers in in this order:
                                //    vertDelay, edgeDelay, failureDelay, retryDelay
                                vertAddDelayMs = cArgs.vertAddDelayMs;
                                edgeAddDelayMs = cArgs.edgeAddDelayMs;
                                failureDelayMs = cArgs.failureDelayMs;
                                retryDelayMs = cArgs.retryDelayMs;
-                               try {
-                                       maxErrorsPerThread = cArgs.maxErrorsPerThread;
-                               }
-                               catch ( NumberFormatException nfe ){
-                                       ErrorLogHelper.logError("AAI_6128", "Bad (non-integer) maxErrorsPerThread passed to DataSnapshot [" + cArgs.maxErrorsPerThread + "]");
-                                       LOGGER.debug("Bad (non-integer) maxErrorsPerThread passed to DataSnapshot [" + cArgs.maxErrorsPerThread + "]");
-                                       AAISystemExitUtil.systemExitCloseAAIGraph(1);
-                               }
-                               if( maxErrorsPerThread < 1  ){
-                                       ErrorLogHelper.logError("AAI_6128", "Out of range (>0) maxErrorsPerThread passed to DataSnapshot [" + cArgs.maxErrorsPerThread + "]");
-                                       LOGGER.debug("Out of range (>0) maxErrorsPerThread passed to DataSnapshot [" + cArgs.maxErrorsPerThread + "]");
-                                       AAISystemExitUtil.systemExitCloseAAIGraph(1);
-                               }
+                               maxErrorsPerThread = validateMaxErrorsPerThread(cArgs);
                        }
                        else {
                                ErrorLogHelper.logError("AAI_6128", "Wrong param count (should be either 2 or 7) when using MUTLITHREAD_RELOAD.");
@@ -371,7 +320,7 @@ public class DataSnapshot {
                LOGGER.debug("File name to reload snapshot [" + cArgs.oldFileName + "]");
                LOGGER.debug("snapshotType is [" + cArgs.snapshotType + "]");
                LOGGER.debug("Thread count is [" + cArgs.threadCount + "]");
-               LOGGER.debug("Max Nodes Per File is [" + cArgs.maxNodesPerFile + "]");  
+               LOGGER.debug("Max Nodes Per File is [" + cArgs.maxNodesPerFile + "]");
                LOGGER.debug("Debug Flag is [" + cArgs.debugFlag + "]");
                LOGGER.debug("DebugAddDelayTimer is [" + cArgs.debugAddDelayTime + "]");
                LOGGER.debug("VertAddDelayMs is [" + cArgs.vertAddDelayMs + "]");
@@ -392,8 +341,8 @@ public class DataSnapshot {
                if (!AAIConfig.isEmpty(cArgs.oldFileDir)){
                        LOGGER.debug("Directory path (if not default) to load the old snapshot file from is [" + cArgs.oldFileDir + "]");
                }
-               
-               
+
+
                ByteArrayOutputStream baos = new ByteArrayOutputStream();
                try {
                        AAIConfig.init();
@@ -403,36 +352,16 @@ public class DataSnapshot {
                        // Make sure the dataSnapshots directory is there
                        new File(targetDir).mkdirs();
                        LOGGER.debug("    ---- NOTE --- about to open graph (takes a little while) ");
-                       
+
                        if ( (command.equals("THREADED_SNAPSHOT") || command.equals("JUST_TAKE_SNAPSHOT"))
                                        && threadCount4Create == 1 ){
-                               // -------------------------------------------------------------------------------
-                               // They want to take a snapshot on a single thread and have it go in a single file
-                               //   NOTE - they can't use the DEBUG option in this case.
-                               // -------------------------------------------------------------------------------
-                               LOGGER.debug(" Command = " + command );
-                               verifyGraph(AAIGraph.getInstance().getGraph());
-                               FormatDate fd = new FormatDate("yyyyMMddHHmm", "GMT");
-                               String dteStr = fd.getDateTime();
-                               graph = AAIGraph.getInstance().getGraph();
-                               GraphAdminDBUtils.logConfigs(graph.configuration());
-                               String newSnapshotOutFname = null;
-                               long timeA = System.nanoTime();
-                               newSnapshotOutFname = targetDir + AAIConstants.AAI_FILESEP + "dataSnapshot.graphSON." + dteStr;
-                               graph.io(IoCore.graphson()).writeGraph(newSnapshotOutFname);
-                               LOGGER.debug("Snapshot written to " + newSnapshotOutFname);
-                               long timeB = System.nanoTime();
-                               long diffTime =  timeB - timeA;
-                               long minCount = TimeUnit.NANOSECONDS.toMinutes(diffTime);
-                               long secCount = TimeUnit.NANOSECONDS.toSeconds(diffTime) - (60 * minCount);
-                               LOGGER.debug("    -- Single-Thread dataSnapshot took: " +
-                                               minCount + " minutes, " + secCount + " seconds " );
-       
-                       }       
-                       else if ( (command.equals("THREADED_SNAPSHOT") || command.equals("JUST_TAKE_SNAPSHOT")) 
-                                       && threadCount4Create > 1 ){                            
+                               graph = writeSnapshotFile(command, targetDir, snapshotType);
+
+                       }
+                       else if ( (command.equals("THREADED_SNAPSHOT") || command.equals("JUST_TAKE_SNAPSHOT"))
+                                       && threadCount4Create > 1 ){
                                        // ------------------------------------------------------------
-                                       // They want the creation of the snapshot to be spread out via 
+                                       // They want the creation of the snapshot to be spread out via
                                        //    threads and go to multiple files
                                        // ------------------------------------------------------------
                                        LOGGER.debug(" Command = " + command );
@@ -442,7 +371,7 @@ public class DataSnapshot {
                                        } else {
                                                FormatDate fd = new FormatDate("yyyyMMddHHmm", "GMT");
                                                String dteStr = fd.getDateTime();
-                                               newSnapshotOutFname = targetDir + AAIConstants.AAI_FILESEP 
+                                               newSnapshotOutFname = targetDir + AAIConstants.AAI_FILESEP
                                                                + "dataSnapshot.graphSON." + dteStr;
                                        }
                                        verifyGraph(AAIGraph.getInstance().getGraph());
@@ -460,12 +389,12 @@ public class DataSnapshot {
                                        LOGGER.debug("    -- To count all vertices in DB it took: " +
                                                        minCount + " minutes, " + secCount + " seconds " );
                                        LOGGER.debug(" Total Count of Nodes in DB = " + totalVertCount + ".");
-                                       
-                                       int fileCount4Create = figureOutFileCount( totalVertCount, threadCount4Create, 
+
+                                       int fileCount4Create = figureOutFileCount( totalVertCount, threadCount4Create,
                                                        maxNodesPerFile4Create );
-                                       int threadPassesNeeded = (int) Math.ceil((double)fileCount4Create / (double)threadCount4Create);        
-                                       long nodesPerFile = (long) Math.ceil((double)totalVertCount / (double)fileCount4Create);  
-                                       
+                                       int threadPassesNeeded = (int) Math.ceil((double)fileCount4Create / (double)threadCount4Create);
+                                       long nodesPerFile = (long) Math.ceil((double)totalVertCount / (double)fileCount4Create);
+
                                        LOGGER.debug(" We will run this many simultaneous threads: " + threadCount4Create );
                                        LOGGER.debug(" Required number of passes: " + threadPassesNeeded );
                                        LOGGER.debug(" Max Nodes per file: " + maxNodesPerFile4Create );
@@ -479,8 +408,8 @@ public class DataSnapshot {
                                                String tk = "" + t;
                                                vertIdListHash.put( tk, vIdList);
                                        }
-                                                               
-                                       int currentTNum = 0; 
+
+                                       int currentTNum = 0;
                                        String currentTKey = "0";
                                        long thisThrIndex = 0;
                                        Iterator <Vertex> vtxItr = graph.vertices();  // Getting ALL vertices!
@@ -496,12 +425,12 @@ public class DataSnapshot {
                                                long vid = (long)(vtxItr.next()).id();
                                                (vertIdListHash.get(currentTKey)).add(vid);
                                        }
-                                       
+
                                        // close this graph instance thing here since we have all the ids
                                        graph.tx().rollback();
                                        graph.tx().close();
-                                       
-                                       
+
+
                                        long timeB = System.nanoTime();
                                        diffTime =  timeB - timeA2;
                                        minCount = TimeUnit.NANOSECONDS.toMinutes(diffTime);
@@ -512,9 +441,9 @@ public class DataSnapshot {
                                        // Need to print out each set of vertices using it's own thread
                                        // NOTE - we may have more files to generate than number of threads - which
                                        //    just means that ALL the files won't necessarily be generated in parallel.
-                                       
+
                                        int fileNo = 0;
-                                       for( int passNo = 1; passNo <= threadPassesNeeded; passNo++ ){          
+                                       for( int passNo = 1; passNo <= threadPassesNeeded; passNo++ ){
                                                ArrayList <Thread> threadArr = new ArrayList <Thread> ();
                                                // For each Pass, kick off all the threads and wait until they finish
                                                long timeP1 = System.nanoTime();
@@ -522,14 +451,14 @@ public class DataSnapshot {
                                                        String fileNoStr = "" + fileNo;
                                                        String subFName = newSnapshotOutFname + ".P" + fileNoStr;
                                                        LOGGER.debug(" DEBUG >>> kick off pass # " + passNo + ", thread # " + thNum);
-                                                       Thread thr = new Thread(new PrintVertexDetails(graph, subFName, 
+                                                       Thread thr = new Thread(new PrintVertexDetails(graph, subFName,
                                                                        vertIdListHash.get(fileNoStr),
-                                                                       debug4Create, debugAddDelayTime, 
+                                                                       debug4Create, debugAddDelayTime,
                                                                        snapshotType, LOGGER) );
                                                        thr.start();
                                                        threadArr.add(thr);
                                                        fileNo++;
-                                               }                                       
+                                               }
                                                // Make sure all the threads finish before considering this Pass finished.
                                                for( int thNum = 0; thNum < threadCount4Create; thNum++ ){
                                                        if( null != threadArr.get(thNum) ){
@@ -543,7 +472,7 @@ public class DataSnapshot {
                                                LOGGER.debug(" Pass number " + passNo + " (out of " + threadPassesNeeded +
                                                                ") took " + minCount + " minutes, " + secCount + " seconds ");
                                        }
-                                       
+
                                        long timeC = System.nanoTime();
                                        diffTime =  timeC - timeB;
                                        minCount = TimeUnit.NANOSECONDS.toMinutes(diffTime);
@@ -551,7 +480,7 @@ public class DataSnapshot {
                                        LOGGER.debug("   -- To write all the data out to snapshot files, it took: " +
                                                        minCount + " minutes, " + secCount + " seconds " );
 
-                                       
+
                        } else if( command.equals("MULTITHREAD_RELOAD") ){
                                // ---------------------------------------------------------------------
                                // They want the RELOAD of the snapshot to be spread out via threads
@@ -559,11 +488,11 @@ public class DataSnapshot {
                                //    snapshot is  written to.  Ie. if you have a single-file snapshot,
                                //    then this will be single-threaded.
                                // If the number of files is greater than the 'threadCount' parameter,
-                               //    then we will use more than one pass to keep the number of simultaneous 
+                               //    then we will use more than one pass to keep the number of simultaneous
                                //    threads below the threadCount param.
                                //
                                LOGGER.debug(" Command = " + command );
-                               
+
                                if (cArgs.oldFileDir != null && !cArgs.oldFileDir.isEmpty()){
                                        targetDir = cArgs.oldFileDir;
                                }
@@ -579,11 +508,11 @@ public class DataSnapshot {
 
                                ExecutorService executor = Executors.newFixedThreadPool(fCount);
                                int threadFailCount = 0;
-                               
+
                                LOGGER.debug(" -- vertAddDelayMs = " + vertAddDelayMs
                                                + ", failureDelayMs = " + failureDelayMs + ", retryDelayMs = " + retryDelayMs
                                                + ", maxErrorsPerThread = " + maxErrorsPerThread );
-                                       
+
                                // --------------------------------------
                                // Step 1 -- Load empty vertices
                                // --------------------------------------
@@ -606,18 +535,18 @@ public class DataSnapshot {
                                                listFutV.add(future);
                                                LOGGER.debug(" --  Starting PartialDbLoad VERT_ONLY file # "+ fileNo
                                                                + "( passNo = " + passNo + ", passIndex = " + thisPassCount + ")");
-                                               
+
                                                thisPassCount++;
                                                fileNo++;
                                        }
-                                       
+
                                        int threadCount4Reload = 0;
                                        for(Future<HashMap<String,String>> fut : listFutV){
                                                threadCount4Reload++;
                                                try {
                                                        old2NewVertIdMap.putAll(fut.get());
-                                                       LOGGER.debug(" -- back from PartialVertexLoader.  returned pass # " 
-                                                                       + passNo + ", thread # " 
+                                                       LOGGER.debug(" -- back from PartialVertexLoader.  returned pass # "
+                                                                       + passNo + ", thread # "
                                                                        + threadCount4Reload +
                                                                        ", current size of old2NewVertMap is: " + old2NewVertIdMap.size() );
                                                }
@@ -633,7 +562,7 @@ public class DataSnapshot {
                                                }
                                        }
                                } // end of passes for loading empty vertices
-                                       
+
                                executor.shutdown();
                                if( threadFailCount > 0 ) {
                                        String emsg = " FAILURE >> " + threadFailCount + " Vertex-loader thread(s) failed to complete successfully.  ";
@@ -651,7 +580,7 @@ public class DataSnapshot {
                                // Give the DB a little time to chew on all those new vertices
                                Thread.sleep(vertToEdgeProcDelay);
 
-                                       
+
                                // -------------------------------------------------------------
                                // Step 2 -- Load Edges and properties onto the empty vertices
                                // -------------------------------------------------------------
@@ -677,9 +606,9 @@ public class DataSnapshot {
                                                // add future to the list, we can wait for it below
                                                listFutEdg.add(future);
                                                LOGGER.debug(" --  Starting PartialPropAndEdge file # "
-                                                               + fileNo + " (pass # " + passNo + ", passIndex " 
+                                                               + fileNo + " (pass # " + passNo + ", passIndex "
                                                                + thisPassCount + ")" );
-                                               
+
                                                thisPassCount++;
                                                fileNo++;
                                        }
@@ -688,7 +617,7 @@ public class DataSnapshot {
                                        for( Future<ArrayList<String>> fut : listFutEdg ){
                                                threadCount4Reload++;
                                                try{
-                                                       fut.get();  
+                                                       fut.get();
                                                        LOGGER.debug(" -- back from PartialPropAndEdgeLoader.  pass # "
                                                                        + passNo + ", thread # " + threadCount4Reload  );
                                                }
@@ -703,8 +632,8 @@ public class DataSnapshot {
                                                        ErrorLogHelper.logException(ae);
                                                }
                                        }
-                                       
-                               } // end of passes for reloading edges and properties 
+
+                               } // end of passes for reloading edges and properties
 
                                executor.shutdown();
 
@@ -770,7 +699,7 @@ public class DataSnapshot {
                                LOGGER.debug("     reloading data or the data will be put in without indexes. ");
                                dbClearFlag = true;
                                LOGGER.debug("All done clearing DB");
-                               
+
                        } else if (command.equals("RELOAD_DATA")) {
                                // ---------------------------------------------------------------------------
                                // They want to restore the database from either a single file, or a group
@@ -786,7 +715,7 @@ public class DataSnapshot {
                                        LOGGER.debug(emsg);
                                        AAISystemExitUtil.systemExitCloseAAIGraph(1);
                                }
-                               
+
                                long timeA = System.nanoTime();
 
                                ArrayList <File> snapFilesArr = new ArrayList <File> ();
@@ -809,15 +738,15 @@ public class DataSnapshot {
                                                }
                                        }
                                }
-                               
+
                                if( snapFilesArr.isEmpty() ){
                                        String emsg = "oldSnapshotFile " + onePieceSnapshotFname + "(with or without .P0) could not be found.";
                                        LOGGER.debug(emsg);
                                        AAISystemExitUtil.systemExitCloseAAIGraph(1);
                                }
-                               
+
                                int fCount = snapFilesArr.size();
-                               Vector<InputStream> inputStreamsV = new Vector<>();                  
+                               Vector<InputStream> inputStreamsV = new Vector<>();
                                for( int i = 0; i < fCount; i++ ){
                                        File f = snapFilesArr.get(i);
                                        String fname = f.getName();
@@ -850,14 +779,14 @@ public class DataSnapshot {
 
                                long vCount = graph.traversal().V().count().next();
                                LOGGER.debug("A little after repopulating from an old snapshot, we see: " + vCount + " vertices in the db.");
-                               
+
                                long timeB = System.nanoTime();
                                long diffTime =  timeB - timeA;
                                long minCount = TimeUnit.NANOSECONDS.toMinutes(diffTime);
                                long secCount = TimeUnit.NANOSECONDS.toSeconds(diffTime) - (60 * minCount);
                                LOGGER.debug("    -- To Reload this snapshot, it took: " +
                                                minCount + " minutes, " + secCount + " seconds " );
-                               
+
                                LOGGER.debug("A little after repopulating from an old snapshot, we see: " + vCount + " vertices in the db.");
 
                        } else {
@@ -892,7 +821,122 @@ public class DataSnapshot {
 
                return success;
        }
-       
+
+
+       private JanusGraph writeSnapshotFile(String command, String targetDir, String format) throws IOException {
+               JanusGraph graph;
+               // -------------------------------------------------------------------------------
+               // They want to take a snapshot on a single thread and have it go in a single file
+               //   NOTE - they can't use the DEBUG option in this case.
+               // -------------------------------------------------------------------------------
+               if (format != "graphson" && format != "gryo") {
+                       format = "graphson";
+               }
+               LOGGER.debug(" Command = " + command );
+               verifyGraph(AAIGraph.getInstance().getGraph());
+               FormatDate fd = new FormatDate("yyyyMMddHHmm", "GMT");
+               String dteStr = fd.getDateTime();
+               graph = AAIGraph.getInstance().getGraph();
+               GraphAdminDBUtils.logConfigs(graph.configuration());
+               String newSnapshotOutFname = null;
+               long timeA = System.nanoTime();
+
+               if(format == "gryo") {
+                       newSnapshotOutFname = targetDir + AAIConstants.AAI_FILESEP + "snapshot_" + dteStr + ".gryo";
+                       graph.io(IoCore.gryo()).writeGraph(newSnapshotOutFname);
+               } else {
+                       newSnapshotOutFname = targetDir + AAIConstants.AAI_FILESEP + "dataSnapshot.graphSON." + dteStr;
+                       graph.io(IoCore.graphson()).writeGraph(newSnapshotOutFname);
+               }
+               LOGGER.debug("Snapshot written to " + newSnapshotOutFname);
+               long timeB = System.nanoTime();
+               long diffTime =  timeB - timeA;
+               long minCount = TimeUnit.NANOSECONDS.toMinutes(diffTime);
+               long secCount = TimeUnit.NANOSECONDS.toSeconds(diffTime) - (60 * minCount);
+               LOGGER.debug("    -- Single-Thread dataSnapshot took: " +
+                               minCount + " minutes, " + secCount + " seconds " );
+               return graph;
+       }
+
+
+       private int validateMaxErrorsPerThread(CommandLineArgs cArgs) {
+               int maxErrorsPerThread = 0;
+               try {
+                       maxErrorsPerThread = cArgs.maxErrorsPerThread;
+               }
+               catch ( NumberFormatException nfe ){
+                       ErrorLogHelper.logError("AAI_6128", "Bad (non-integer) maxErrorsPerThread passed to DataSnapshot [" + cArgs.maxErrorsPerThread + "]");
+                       LOGGER.debug("Bad (non-integer) maxErrorsPerThread passed to DataSnapshot [" + cArgs.maxErrorsPerThread + "]");
+                       AAISystemExitUtil.systemExitCloseAAIGraph(1);
+               }
+               if( maxErrorsPerThread < 1  ){
+                       ErrorLogHelper.logError("AAI_6128", "Out of range (>0) maxErrorsPerThread passed to DataSnapshot [" + cArgs.maxErrorsPerThread + "]");
+                       LOGGER.debug("Out of range (>0) maxErrorsPerThread passed to DataSnapshot [" + cArgs.maxErrorsPerThread + "]");
+                       AAISystemExitUtil.systemExitCloseAAIGraph(1);
+               }
+               return maxErrorsPerThread;
+       }
+
+
+       private long validateDebugAddDelayTime(CommandLineArgs cArgs, int threadCount4Create, Boolean debug4Create) {
+               // If doing a "threaded" snapshot, they need to specify how many threads to use (param 1)
+               // They can also use debug mode if they pass the word "DEBUG" to do the nodes one (param 2)
+               // They can also pass a delayTimer - how many milliseconds to put between each node's ADD (param 3)
+               long debugAddDelayTime = 0;
+               try {
+                       debugAddDelayTime = cArgs.debugAddDelayTime;
+               } catch (NumberFormatException nfe) {
+                       ErrorLogHelper.logError("AAI_6128",     "Bad (non-integer) debugAddDelayTime passed to DataSnapshot ["
+                                                       + cArgs.debugAddDelayTime + "]");
+                       LOGGER.debug("Bad (non-integer) debugAddDelayTime passed to DataSnapshot ["+ cArgs.debugAddDelayTime + "]");
+                       AAISystemExitUtil.systemExitCloseAAIGraph(1);
+               }
+               LOGGER.debug(" Will do Threaded Snapshot with threadCount = "+ threadCount4Create + ", DEBUG-flag set to: "
+                               + debug4Create + ", and addDelayTimer = " + debugAddDelayTime + " mSec. ");
+               return debugAddDelayTime;
+       }
+
+
+       private void validateMaxNodesPerFile(CommandLineArgs cArgs) {
+               long maxNodesPerFile4Create = 0;
+               try {
+                       maxNodesPerFile4Create = cArgs.maxNodesPerFile;
+               }
+               catch ( NumberFormatException nfe ){
+                       ErrorLogHelper.logError("AAI_6128", "Bad (non-long) maxNodesPerFile passed to DataSnapshot [" + cArgs.maxNodesPerFile + "]");
+                       LOGGER.debug("Bad (non-long) maxNodesPerFile passed to DataSnapshot [" + cArgs.maxNodesPerFile + "]");
+                       AAISystemExitUtil.systemExitCloseAAIGraph(1);
+               }
+
+               if( maxNodesPerFile4Create < 1000 || maxNodesPerFile4Create > 1000000 ){
+                       ErrorLogHelper.logError("AAI_6128", "Out of range (1000-1000000) maxNodesPerFile passed to DataSnapshot [" + cArgs.maxNodesPerFile + "]");
+                       LOGGER.debug("Out of range (1000-1000000) maxNodesPerFile passed to DataSnapshot [" + cArgs.maxNodesPerFile + "]");
+                       LOGGER.debug("Out of range (1000-1000000) maxNodesPerFile >> Recommended value = 120000)");
+                       AAISystemExitUtil.systemExitCloseAAIGraph(1);
+               }
+               LOGGER.debug(" Will do Threaded Snapshot with maxNodesPerFile = " + maxNodesPerFile4Create );
+       }
+
+
+       private int validateThreadCount(CommandLineArgs cArgs) {
+               int threadCount4Create = 0;
+               try {
+                       threadCount4Create = cArgs.threadCount;
+               }
+               catch ( NumberFormatException nfe ){
+                       ErrorLogHelper.logError("AAI_6128", "Bad (non-integer) threadCount passed to DataSnapshot [" + cArgs.threadCount + "]");
+                       LOGGER.debug("Bad (non-integer) threadCount passed to DataSnapshot [" + cArgs.threadCount + "]");
+                       AAISystemExitUtil.systemExitCloseAAIGraph(1);
+               }
+               if( threadCount4Create < 1 || threadCount4Create > 100 ){
+                       ErrorLogHelper.logError("AAI_6128", "Out of range (1-100) threadCount passed to DataSnapshot [" + cArgs.threadCount + "]");
+                       LOGGER.debug("Out of range (1-100) threadCount passed to DataSnapshot [" + cArgs.threadCount + "]");
+                       AAISystemExitUtil.systemExitCloseAAIGraph(1);
+               }
+               LOGGER.debug(" Will do Threaded Snapshot with threadCount = " + threadCount4Create );
+               return threadCount4Create;
+       }
+
 
        private static ArrayList <File> getFilesToProcess(String targetDir, String oldSnapshotFileName, boolean doingClearDb)
                throws Exception {
@@ -972,26 +1016,26 @@ public class DataSnapshot {
 
        }
 
-       
-       public static int figureOutFileCount( long totalVertCount, int threadCount4Create, 
+
+       public static int figureOutFileCount( long totalVertCount, int threadCount4Create,
                        long maxNodesPerFile ) {
-               
+
                // NOTE - we would always like to use all of our threads.  That is, if
                //   we could process all the data with 16 threads, but our threadCount4Create is
-               //   only 15, we will do two passes and use all 15 threads each pass which will 
+               //   only 15, we will do two passes and use all 15 threads each pass which will
                //   create a total of 30 files.  Each file will be a bit smaller so the overall
                //   time for the two passes should be faster.
                if( totalVertCount <= 0 || threadCount4Create <= 0 || maxNodesPerFile <= 0) {
                        return 1;
                }
-               
-               long maxNodesPerPass = threadCount4Create * maxNodesPerFile;    
-               int numberOfPasses = (int) Math.ceil( (double)totalVertCount / (double)maxNodesPerPass);        
+
+               long maxNodesPerPass = threadCount4Create * maxNodesPerFile;
+               int numberOfPasses = (int) Math.ceil( (double)totalVertCount / (double)maxNodesPerPass);
                int fileCt = threadCount4Create * numberOfPasses;
-               
-               return fileCt;          
+
+               return fileCt;
        }
-       
+
 
        class CommandLineArgs {
 
@@ -1009,7 +1053,7 @@ public class DataSnapshot {
 
                @Parameter(names = "-threadCount", description = "thread count for create")
                public int threadCount = GraphAdminConstants.AAI_SNAPSHOT_DEFAULT_THREADS_FOR_CREATE;
-                               
+
                @Parameter(names = "-maxNodesPerFile", description = "Max nodes per file")
                public long maxNodesPerFile = GraphAdminConstants.AAI_SNAPSHOT_DEFAULT_MAX_NODES_PER_FILE_FOR_CREATE;
 
@@ -1018,13 +1062,13 @@ public class DataSnapshot {
 
                @Parameter(names = "-debugAddDelayTime", description = "delay in ms between each Add for debug mode")
                public long debugAddDelayTime = 1L;
-               
+
                @Parameter(names = "-vertAddDelayMs", description = "delay in ms while adding each vertex")
                public long vertAddDelayMs = GraphAdminConstants.AAI_SNAPSHOT_DEFAULT_VERTEX_ADD_DELAY_MS.longValue();
-               
+
                @Parameter(names = "-edgeAddDelayMs", description = "delay in ms while adding each edge")
                public long edgeAddDelayMs = GraphAdminConstants.AAI_SNAPSHOT_DEFAULT_EDGE_ADD_DELAY_MS.longValue();
-               
+
                @Parameter(names = "-failureDelayMs", description = "delay in ms when failure to load vertex or edge in snapshot")
                public long failureDelayMs = GraphAdminConstants.AAI_SNAPSHOT_DEFAULT_FAILURE_DELAY_MS.longValue();
 
@@ -1033,25 +1077,25 @@ public class DataSnapshot {
 
                @Parameter(names = "-maxErrorsPerThread", description = "max errors allowed per thread")
                public int maxErrorsPerThread = GraphAdminConstants.AAI_SNAPSHOT_DEFAULT_MAX_ERRORS_PER_THREAD;
-               
+
                @Parameter(names = "-vertToEdgeProcDelay", description = "vertex to edge processing delay in ms")
                public long vertToEdgeProcDelay = GraphAdminConstants.AAI_SNAPSHOT_DEFAULT_VERTEX_TO_EDGE_PROC_DELAY_MS.longValue();
-               
+
                @Parameter(names = "-staggerThreadDelay", description = "thread delay stagger time in ms")
                public long staggerThreadDelay = GraphAdminConstants.AAI_SNAPSHOT_DEFAULT_STAGGER_THREAD_DELAY_MS;
-                                               
+
                @Parameter(names = "-fileName", description = "file name for generating snapshot ")
                public String fileName = "";
-               
+
                @Parameter(names = "-snapshotDir", description = "file path for generating snapshot ")
                public String snapshotDir = "";
-               
+
                @Parameter(names = "-oldFileDir", description = "directory containing the old snapshot file for reloading")
                public String oldFileDir = "";
-               
+
                @Parameter(names = "-caller", description = "process invoking the dataSnapshot")
                public String caller = "";
-               
+
        }
-       
-}
\ No newline at end of file
+
+}
index 9d74651..cc61e99 100644 (file)
@@ -56,7 +56,7 @@ public class DataSnapshotTest extends AAISetup {
     private JanusGraphTransaction currentTransaction;
 
     private List<Vertex> vertexes;
-    
+
     private static final int DELAYSINGLETHREADTEST = 90;
 
     @Rule
@@ -126,15 +126,13 @@ public class DataSnapshotTest extends AAISetup {
          assertThat(outputCapture.toString(), containsString("graphson had no data."));
     }
 
-    
-    @Ignore("Unit test failing temporarily ignore")
     @Test
     public void testTakeSnapshotAndItShouldCreateASnapshotFileWithOneVertex() throws IOException, InterruptedException {
 
         String logsFolder     = System.getProperty("AJSC_HOME") + "/logs/data/dataSnapshots/";
 
         Set<Path> preSnapshotFiles = Files.walk(Paths.get(logsFolder)).collect(Collectors.toSet());
-        
+
         // previous test may have the same generated file name, this wait will ensure a new name is used for this test
         System.out.println("delay generation, seconds " + DELAYSINGLETHREADTEST);
         Thread.sleep(DELAYSINGLETHREADTEST*1000);
@@ -149,7 +147,7 @@ public class DataSnapshotTest extends AAISetup {
 
         Set<Path> postSnapshotFiles = Files.walk(Paths.get(logsFolder)).collect(Collectors.toSet());
 
-        assertThat(postSnapshotFiles.size(), is(37));
+        assertThat(postSnapshotFiles.size(), is(preSnapshotFiles.size() +1));
         postSnapshotFiles.removeAll(preSnapshotFiles);
         List<Path> snapshotPathList = postSnapshotFiles.stream().collect(Collectors.toList());
 
@@ -158,7 +156,39 @@ public class DataSnapshotTest extends AAISetup {
         List<String> fileContents = Files.readAllLines(snapshotPathList.get(0));
         assertThat(fileContents.get(0), containsString("id"));
     }
-    
+
+    @Test
+    public void testTakeKryoSnapshotAndItShouldCreateASnapshotFileWithOneVertex() throws IOException, InterruptedException {
+
+        String logsFolder = System.getProperty("AJSC_HOME") + "/logs/data/dataSnapshots/";
+
+        Set<Path> preSnapshotFiles = Files.walk(Paths.get(logsFolder)).collect(Collectors.toSet());
+
+        // previous test may have the same generated file name, this wait will ensure a new name is used for this test
+        System.out.println("delay generation, seconds " + DELAYSINGLETHREADTEST);
+        Thread.sleep(DELAYSINGLETHREADTEST*100);
+        // Run the clear dataSnapshot and this time it should fail
+        //String [] args = {"JUST_TAKE_SNAPSHOT"};  >> default behavior is now to use 15 threads
+        // To just get one file, you have to tell it to just use one.
+        String [] args = {"-c","THREADED_SNAPSHOT", "-threadCount" ,"1", "-snapshotType" , "gryo"};
+
+        DataSnapshot.main(args);
+
+        // Add sleep so the file actually gets created with the data
+
+        Set<Path> postSnapshotFiles = Files.walk(Paths.get(logsFolder)).collect(Collectors.toSet());
+
+        assertThat(postSnapshotFiles.size(), is(preSnapshotFiles.size() +1));
+        boolean gryoSnapshotExists = postSnapshotFiles.stream()
+            .map(Path::toString)
+            .anyMatch(name -> name.endsWith("gryo"));
+        assertTrue(gryoSnapshotExists);
+        postSnapshotFiles.removeAll(preSnapshotFiles);
+        List<Path> snapshotPathList = postSnapshotFiles.stream().collect(Collectors.toList());
+
+        assertThat(snapshotPathList.size(), is(1));
+    }
+
 
     @Test
     public void testTakeSnapshotMultiAndItShouldCreateMultipleSnapshotFiles() throws IOException {
@@ -180,27 +210,27 @@ public class DataSnapshotTest extends AAISetup {
         long totalVerts = 5000;
         int threadCt = 15;
         long maxNodesPerFile = 120000;
-        
-        int fileCt = DataSnapshot.figureOutFileCount( totalVerts, threadCt, 
+
+        int fileCt = DataSnapshot.figureOutFileCount( totalVerts, threadCt,
                        maxNodesPerFile );
         assertThat( fileCt, is(15));
-               
+
         totalVerts = 5000;
         threadCt = 15;
         maxNodesPerFile = 100;
-        fileCt = DataSnapshot.figureOutFileCount( totalVerts, threadCt, 
+        fileCt = DataSnapshot.figureOutFileCount( totalVerts, threadCt,
                        maxNodesPerFile );
         assertThat( fileCt, is(60));
-        
+
         totalVerts = 1500;
         threadCt = 15;
         maxNodesPerFile = 100;
-        fileCt = DataSnapshot.figureOutFileCount( totalVerts, threadCt, 
+        fileCt = DataSnapshot.figureOutFileCount( totalVerts, threadCt,
                        maxNodesPerFile );
-        assertThat( fileCt, is(15));       
-        
+        assertThat( fileCt, is(15));
+
     }
-    
+
     @Test
     public void testTakeSnapshotMultiWithDebugAndItShouldCreateMultipleSnapshotFiles() throws IOException {
 
@@ -224,7 +254,7 @@ public class DataSnapshotTest extends AAISetup {
 
         // Run the clear dataSnapshot and this time it should fail
         String [] args = {"-c","THREADED_SNAPSHOT", "-threadCount","foo","-debugFlag", "DEBUG"};
-        
+
         DataSnapshot.main(args);
 
         // For this test if there is only one vertex in the graph, not sure if it will create multiple files