Update aai-common to 1.14.0 in graphadmin
[aai/graphadmin.git] / src / main / java / org / onap / aai / datasnapshot / DataSnapshot4HistInit.java
index 8d250d7..43c31e1 100644 (file)
@@ -40,10 +40,12 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
-import org.onap.aai.config.PropertyPasswordConfiguration;
+import org.onap.aai.restclient.PropertyPasswordConfiguration;
 import org.apache.tinkerpop.gremlin.structure.Vertex;
 import org.apache.commons.configuration.PropertiesConfiguration;
 import org.apache.tinkerpop.gremlin.structure.io.IoCore;
+import org.apache.tinkerpop.gremlin.structure.io.graphson.GraphSONIo;
+import org.apache.tinkerpop.gremlin.structure.io.graphson.GraphSONVersion;
 import org.janusgraph.core.JanusGraph;
 import org.janusgraph.core.JanusGraphFactory;
 import org.janusgraph.core.util.JanusGraphCleanup;
@@ -75,17 +77,17 @@ import com.beust.jcommander.ParameterException;
 
 public class DataSnapshot4HistInit {
 
-       private static Logger LOGGER;
-       
+       private static Logger LOGGER = LoggerFactory.getLogger(DataSnapshot4HistInit.class);
+
        /* Using realtime d */
        private static final String REALTIME_DB = "realtime";
 
        private static final Set<String> SNAPSHOT_RELOAD_COMMANDS = new HashSet<>();
 
        private static final String MIGRATION_PROCESS_NAME = "migration";
-       
+
        private static boolean historyEnabled;
-       
+
        private LoaderFactory loaderFactory;
        private SchemaVersions schemaVersions;
 
@@ -93,14 +95,14 @@ public class DataSnapshot4HistInit {
                SNAPSHOT_RELOAD_COMMANDS.add("RELOAD_DATA");
                SNAPSHOT_RELOAD_COMMANDS.add("RELOAD_DATA_MULTI");
        }
-       
+
        private CommandLineArgs cArgs;
-       
+
        public DataSnapshot4HistInit(LoaderFactory loaderFactory, SchemaVersions schemaVersions){
                this.loaderFactory  = loaderFactory;
                this.schemaVersions = schemaVersions;
        }
-       
+
        /**
         * The main method.
         *
@@ -108,7 +110,14 @@ public class DataSnapshot4HistInit {
         *            the arguments
         */
        public static void main(String[] args) {
-               
+
+               // Set the logging file properties to be used by EELFManager
+               System.setProperty("aai.service.name", DataSnapshot4HistInit.class.getSimpleName());
+               Properties props = System.getProperties();
+               props.setProperty(Configuration.PROPERTY_LOGGING_FILE_NAME, AAIConstants.AAI_LOGBACK_PROPS);
+               props.setProperty(Configuration.PROPERTY_LOGGING_FILE_PATH, AAIConstants.AAI_HOME_BUNDLECONFIG);
+
+
                AnnotationConfigApplicationContext ctx = new AnnotationConfigApplicationContext();
                PropertyPasswordConfiguration initializer = new PropertyPasswordConfiguration();
                initializer.initialize(ctx);
@@ -123,20 +132,20 @@ public class DataSnapshot4HistInit {
                        ErrorLogHelper.logException(ae);
                        AAISystemExitUtil.systemExitCloseAAIGraph(1);
                }
-               
+
                historyEnabled = Boolean.parseBoolean(ctx.getEnvironment().getProperty("history.enabled","false"));
                if( !historyEnabled ) {
                        String emsg = "Error - DataSnapshot4HistInit may only be used when history.enabled=true. ";
                        System.out.println(emsg);
                        AAIException ae = new AAIException("AAI_6128", emsg);
-                       ErrorLogHelper.logException(ae);                        
+                       ErrorLogHelper.logException(ae);
                        AAISystemExitUtil.systemExitCloseAAIGraph(1);
                }
-               
+
                LoaderFactory loaderFactory = ctx.getBean(LoaderFactory.class);
                SchemaVersions schemaVersions = (SchemaVersions) ctx.getBean("schemaVersions");
                DataSnapshot4HistInit dataSnapshotHI = new DataSnapshot4HistInit(loaderFactory, schemaVersions);
-               
+
                boolean success = dataSnapshotHI.executeCommand(args);
                if(success){
                        AAISystemExitUtil.systemExitCloseAAIGraph(0);
@@ -148,21 +157,14 @@ public class DataSnapshot4HistInit {
 
 
        public boolean executeCommand(String[] args) {
-               
-               // Set the logging file properties to be used by EELFManager
-               System.setProperty("aai.service.name", DataSnapshot4HistInit.class.getSimpleName());
-               Properties props = System.getProperties();
-               props.setProperty(Configuration.PROPERTY_LOGGING_FILE_NAME, AAIConstants.AAI_LOGBACK_PROPS);
-               props.setProperty(Configuration.PROPERTY_LOGGING_FILE_PATH, AAIConstants.AAI_HOME_BUNDLECONFIG);
-               LOGGER = LoggerFactory.getLogger(DataSnapshot4HistInit.class);
-               
+
                Boolean dbClearFlag = false;
                JanusGraph graph = null;
-               String command = "UNKNOWN"; 
+               String command = "UNKNOWN";
                String oldSnapshotFileName = "";
                boolean success = true;
-                               
-               cArgs = new CommandLineArgs();          
+
+               cArgs = new CommandLineArgs();
                String itemName = "aai.datasnapshot.threads.for.create";
                try {
                        String val = AAIConfig.get(itemName);
@@ -184,7 +186,7 @@ public class DataSnapshot4HistInit {
                        LOGGER.warn("WARNING - could not get [" + itemName + "] value from aaiconfig.properties file. " + e.getMessage());
                }
                long maxNodesPerFile4Create = cArgs.maxNodesPerFile;
-                               
+
                cArgs.snapshotType = "graphson";
                Long vertAddDelayMs = GraphAdminConstants.AAI_SNAPSHOT_DEFAULT_VERTEX_ADD_DELAY_MS;
                itemName = "aai.datasnapshot.vertex.add.delay.ms";
@@ -196,7 +198,7 @@ public class DataSnapshot4HistInit {
                }catch ( Exception e ){
                        LOGGER.warn("WARNING - could not get [" + itemName + "] value from aaiconfig.properties file. " + e.getMessage());
                }
-               
+
                Long edgeAddDelayMs = GraphAdminConstants.AAI_SNAPSHOT_DEFAULT_EDGE_ADD_DELAY_MS;
                itemName = "aai.datasnapshot.edge.add.delay.ms";
                try {
@@ -207,7 +209,7 @@ public class DataSnapshot4HistInit {
                }catch ( Exception e ){
                        LOGGER.warn("WARNING - could not get [" + itemName + "] value from aaiconfig.properties file. " + e.getMessage());
                }
-               
+
                Long failureDelayMs = GraphAdminConstants.AAI_SNAPSHOT_DEFAULT_FAILURE_DELAY_MS;
                itemName = "aai.datasnapshot.failure.delay.ms";
                try {
@@ -218,7 +220,7 @@ public class DataSnapshot4HistInit {
                }catch ( Exception e ){
                        LOGGER.warn("WARNING - could not get [" + itemName + "] value from aaiconfig.properties file. " + e.getMessage());
                }
-               
+
                Long retryDelayMs = GraphAdminConstants.AAI_SNAPSHOT_DEFAULT_RETRY_DELAY_MS;
                itemName = "aai.datasnapshot.retry.delay.ms";
                try {
@@ -229,7 +231,7 @@ public class DataSnapshot4HistInit {
                }catch ( Exception e ){
                        LOGGER.warn("WARNING - could not get [" + itemName + "] value from aaiconfig.properties file. " + e.getMessage());
                }
-               
+
                int maxErrorsPerThread = GraphAdminConstants.AAI_SNAPSHOT_DEFAULT_MAX_ERRORS_PER_THREAD;
                itemName = "aai.datasnapshot.max.errors.per.thread";
                try {
@@ -240,7 +242,7 @@ public class DataSnapshot4HistInit {
                }catch ( Exception e ){
                        LOGGER.warn("WARNING - could not get [" + itemName + "] value from aaiconfig.properties file. " + e.getMessage());
                }
-               
+
                Long vertToEdgeProcDelay = GraphAdminConstants.AAI_SNAPSHOT_DEFAULT_VERTEX_TO_EDGE_PROC_DELAY_MS;
                itemName = "aai.datasnapshot.vertex.to.edge.proc.delay.ms";
                try {
@@ -251,7 +253,7 @@ public class DataSnapshot4HistInit {
                }catch ( Exception e ){
                        LOGGER.warn("WARNING - could not get [" + itemName + "] value from aaiconfig.properties file. " + e.getMessage());
                }
-               
+
                itemName = "aai.datasnapshot.stagger.thread.delay.ms";
                try {
                        String val = AAIConfig.get(itemName);
@@ -260,8 +262,8 @@ public class DataSnapshot4HistInit {
                        }
                }catch ( Exception e ){
                        LOGGER.warn("WARNING - could not get [" + itemName + "] value from aaiconfig.properties file. " + e.getMessage());
-               }               
-       
+               }
+
                long debugAddDelayTime = 1;  // Default to 1 millisecond
                Boolean debug4Create = false;  // By default we do not use debugging for snapshot creation
                JCommander jCommander;
@@ -270,15 +272,15 @@ public class DataSnapshot4HistInit {
                        jCommander.setProgramName(DataSnapshot4HistInit.class.getSimpleName());
                } catch (ParameterException e1) {
                        AAIException ae = new AAIException("AAI_6128", e1 , "Error - invalid value passed to list of args - "+args);
-                       ErrorLogHelper.logException(ae);                        
+                       ErrorLogHelper.logException(ae);
                        AAISystemExitUtil.systemExitCloseAAIGraph(1);
                }
-               
-                               
+
+
                if (args.length >= 1) {
                        command = cArgs.command;
                }
-               
+
                String source = cArgs.caller;
                String snapshotType = "graphson";
                if( SNAPSHOT_RELOAD_COMMANDS.contains(cArgs.command)){
@@ -306,7 +308,7 @@ public class DataSnapshot4HistInit {
                                        AAISystemExitUtil.systemExitCloseAAIGraph(1);
                                }
                                LOGGER.debug(" Will do Threaded Snapshot with threadCount = " + threadCount4Create );
-                               
+
                                if( maxNodesPerFile4Create < 1000 || maxNodesPerFile4Create > 1000000 ){
                                        ErrorLogHelper.logError("AAI_6128", "Out of range (1000-1000000) maxNodesPerFile passed to DataSnapshot [" + cArgs.maxNodesPerFile + "]");
                                        LOGGER.debug("Out of range (1000-1000000) maxNodesPerFile passed to DataSnapshot [" + cArgs.maxNodesPerFile + "]");
@@ -314,7 +316,7 @@ public class DataSnapshot4HistInit {
                                        AAISystemExitUtil.systemExitCloseAAIGraph(1);
                                }
                                LOGGER.debug(" Will do Threaded Snapshot with maxNodesPerFile = " + maxNodesPerFile4Create );
-                               
+
                                // If doing a "threaded" snapshot, they need to specify how many threads to use
                                // They can also use debug mode if they pass the word "DEBUG" to do the nodes one at a time to see where it breaks.
                                if( cArgs.debugFlag.equals("DEBUG") ){
@@ -322,7 +324,7 @@ public class DataSnapshot4HistInit {
                                }
                                LOGGER.debug(" Will do Threaded Snapshot with threadCount = " + threadCount4Create +
                                                ", and DEBUG-flag set to: " + debug4Create );
-                               
+
                                if (debug4Create) {
                                        // If doing a "threaded" snapshot, they need to specify how many threads to use (param 1)
                                        // They can also use debug mode if they pass the word "DEBUG" to do the nodes one (param 2)
@@ -384,9 +386,9 @@ public class DataSnapshot4HistInit {
                        }
                }
                long scriptStartTime = System.currentTimeMillis();
-               
-               threadCount4Create = cArgs.threadCount; 
-               
+
+               threadCount4Create = cArgs.threadCount;
+
                //Print Defaults
                LOGGER.debug("DataSnapshot4HistInit command is [" + cArgs.command + "]");
                LOGGER.debug("File name to reload snapshot [" + cArgs.oldFileName + "]");
@@ -401,7 +403,7 @@ public class DataSnapshot4HistInit {
                LOGGER.debug("VertToEdgeProcDelay is [" + cArgs.vertToEdgeProcDelay + "]");
                LOGGER.debug("StaggerThreadDelay is [" + cArgs.staggerThreadDelay + "]");
                LOGGER.debug("Caller process is ["+ cArgs.caller + "]");
-               
+
                //Print non-default values
                if (!AAIConfig.isEmpty(cArgs.fileName)){
                        LOGGER.debug("Snapshot file name (if not default) to use  is [" + cArgs.fileName + "]");
@@ -412,7 +414,7 @@ public class DataSnapshot4HistInit {
                if (!AAIConfig.isEmpty(cArgs.oldFileDir)){
                        LOGGER.debug("Directory path (if not default) to load the old snapshot file from is [" + cArgs.oldFileDir + "]");
                }
-               
+
                ByteArrayOutputStream baos = new ByteArrayOutputStream();
                try {
                        AAIConfig.init();
@@ -423,7 +425,14 @@ public class DataSnapshot4HistInit {
                        new File(targetDir).mkdirs();
 
                        LOGGER.debug("    ---- NOTE --- about to open graph (takes a little while) ");
-                       
+
+                       String graphsonVersionArg = cArgs.graphsonVersion;
+                       if (graphsonVersionArg != GraphSONVersion.V1_0.getVersion() && graphsonVersionArg != GraphSONVersion.V2_0.getVersion() && graphsonVersionArg != GraphSONVersion.V3_0.getVersion()) {
+                               LOGGER.warn("The graphsonVersion argument was not in the correct format, defaulting to V1. Argument value should be either one of [{}|{}|{}]", GraphSONVersion.V1_0,GraphSONVersion.V2_0,GraphSONVersion.V3_0);
+                               graphsonVersionArg = GraphSONVersion.V1_0.getVersion();
+                       }
+                       GraphSONVersion graphsonVersion = GraphSONVersion.valueOf("V" + graphsonVersionArg.replace(".", "_"));
+
                        if ( (command.equals("THREADED_SNAPSHOT") || command.equals("JUST_TAKE_SNAPSHOT"))
                                        && threadCount4Create == 1 ){
                                // -------------------------------------------------------------------------------
@@ -439,7 +448,7 @@ public class DataSnapshot4HistInit {
                                String newSnapshotOutFname = null;
                                long timeA = System.nanoTime();
                                newSnapshotOutFname = targetDir + AAIConstants.AAI_FILESEP + "dataSnapshot.graphSON." + dteStr;
-                               graph.io(IoCore.graphson()).writeGraph(newSnapshotOutFname);
+                               graph.io(GraphSONIo.build(graphsonVersion)).writeGraph(newSnapshotOutFname);
                                LOGGER.debug("Snapshot written to " + newSnapshotOutFname);
                                long timeB = System.nanoTime();
                                long diffTime =  timeB - timeA;
@@ -447,12 +456,12 @@ public class DataSnapshot4HistInit {
                                long secCount = TimeUnit.NANOSECONDS.toSeconds(diffTime) - (60 * minCount);
                                LOGGER.debug("    -- Single-Thread dataSnapshot took: " +
                                                minCount + " minutes, " + secCount + " seconds " );
-       
-                       }       
-                       else if ( (command.equals("THREADED_SNAPSHOT") || command.equals("JUST_TAKE_SNAPSHOT")) 
+
+                       }
+                       else if ( (command.equals("THREADED_SNAPSHOT") || command.equals("JUST_TAKE_SNAPSHOT"))
                                        && threadCount4Create > 1 ){
                                        // ------------------------------------------------------------
-                                       // They want the creation of the snapshot to be spread out via 
+                                       // They want the creation of the snapshot to be spread out via
                                        //    threads and go to multiple files
                                        // ------------------------------------------------------------
                                        LOGGER.debug(" Command = " + command );
@@ -479,12 +488,12 @@ public class DataSnapshot4HistInit {
                                        long secCount = TimeUnit.NANOSECONDS.toSeconds(diffTime) - (60 * minCount);
                                        LOGGER.debug("    -- To count all vertices in DB it took: " +
                                                        minCount + " minutes, " + secCount + " seconds " );
-                                       
-                                       int fileCount4Create = figureOutFileCount( totalVertCount, threadCount4Create, 
+
+                                       int fileCount4Create = figureOutFileCount( totalVertCount, threadCount4Create,
                                                        maxNodesPerFile4Create );
-                                       int threadPassesNeeded = (int) Math.ceil((double)fileCount4Create / (double)threadCount4Create);        
-                                       long nodesPerFile = (long) Math.ceil((double)totalVertCount / (double)fileCount4Create);  
-                                       
+                                       int threadPassesNeeded = (int) Math.ceil((double)fileCount4Create / (double)threadCount4Create);
+                                       long nodesPerFile = (long) Math.ceil((double)totalVertCount / (double)fileCount4Create);
+
                                        LOGGER.debug(" We will run this many simultaneous threads: " + threadCount4Create );
                                        LOGGER.debug(" Required number of passes: " + threadPassesNeeded );
                                        LOGGER.debug(" Max Nodes per file: " + maxNodesPerFile4Create );
@@ -498,8 +507,8 @@ public class DataSnapshot4HistInit {
                                                String tk = "" + t;
                                                vertIdListHash.put( tk, vIdList);
                                        }
-                                                               
-                                       int currentTNum = 0; 
+
+                                       int currentTNum = 0;
                                        String currentTKey = "0";
                                        long thisThrIndex = 0;
                                        Iterator <Vertex> vtxItr = graph.vertices();  // Getting ALL vertices!
@@ -515,11 +524,11 @@ public class DataSnapshot4HistInit {
                                                long vid = (long)(vtxItr.next()).id();
                                                (vertIdListHash.get(currentTKey)).add(vid);
                                        }
-                               
+
                                        // close this graph instance thing here since we have all the ids
                                        graph.tx().rollback();
                                        graph.tx().close();
-                                       
+
                                        long timeB = System.nanoTime();
                                        diffTime =  timeB - timeA2;
                                        minCount = TimeUnit.NANOSECONDS.toMinutes(diffTime);
@@ -530,9 +539,9 @@ public class DataSnapshot4HistInit {
                                        // Need to print out each set of vertices using it's own thread
                                        // NOTE - we may have more files to generate than number of threads - which
                                        //    just means that ALL the files won't necessarily be generated in parallel.
-                                       
+
                                        int fileNo = 0;
-                                       for( int passNo = 1; passNo <= threadPassesNeeded; passNo++ ){          
+                                       for( int passNo = 1; passNo <= threadPassesNeeded; passNo++ ){
                                                ArrayList <Thread> threadArr = new ArrayList <Thread> ();
                                                // For each Pass, kick off all the threads and wait until they finish
                                                long timeP1 = System.nanoTime();
@@ -540,14 +549,14 @@ public class DataSnapshot4HistInit {
                                                        String fileNoStr = "" + fileNo;
                                                        String subFName = newSnapshotOutFname + ".P" + fileNoStr;
                                                        LOGGER.debug(" DEBUG >>> kick off pass # " + passNo + ", thread # " + thNum);
-                                                       Thread thr = new Thread(new PrintVertexDetails(graph, subFName, 
+                                                       Thread thr = new Thread(new PrintVertexDetails(graph, subFName,
                                                                        vertIdListHash.get(fileNoStr),
-                                                                       debug4Create, debugAddDelayTime, 
+                                                                       debug4Create, debugAddDelayTime,
                                                                        snapshotType, LOGGER) );
                                                        thr.start();
                                                        threadArr.add(thr);
                                                        fileNo++;
-                                               }                                       
+                                               }
                                                // Make sure all the threads finish before considering this Pass finished.
                                                for( int thNum = 0; thNum < threadCount4Create; thNum++ ){
                                                        if( null != threadArr.get(thNum) ){
@@ -561,7 +570,7 @@ public class DataSnapshot4HistInit {
                                                LOGGER.debug(" Pass number " + passNo + " (out of " + threadPassesNeeded +
                                                                ") took " + minCount + " minutes, " + secCount + " seconds ");
                                        }
-                                       
+
                                        long timeC = System.nanoTime();
                                        diffTime =  timeC - timeB;
                                        minCount = TimeUnit.NANOSECONDS.toMinutes(diffTime);
@@ -569,26 +578,32 @@ public class DataSnapshot4HistInit {
                                        LOGGER.debug("   -- To write all the data out to snapshot files, it took: " +
                                                        minCount + " minutes, " + secCount + " seconds " );
 
-                                       
+
                        } else if( command.equals("MULTITHREAD_RELOAD") ){
                                // ---------------------------------------------------------------------
                                // They want the RELOAD of the snapshot to be spread out via threads
                                // NOTE - it will only use as many threads as the number of files the
                                //    snapshot is  written to.  Ie. if you have a single-file snapshot,
                                //    then this will be single-threaded.
+                               // If the number of files is greater than the 'threadCount' parameter,
+                               //    then we will use more than one pass to keep the number of simultaneous
+                               //    threads below the threadCount param.
                                //
                                LOGGER.debug(" Command = " + command );
-                               if (cArgs.oldFileDir != null && cArgs.oldFileDir != ""){
+
+                               if (cArgs.oldFileDir != null && !cArgs.oldFileDir.isEmpty()){
                                        targetDir = cArgs.oldFileDir;
                                }
                                ArrayList <File> snapFilesArr = getFilesToProcess(targetDir, oldSnapshotFileName, false);
                                int fCount = snapFilesArr.size();
+                               int threadPassesNeeded = (int) Math.ceil((double)fCount / (double)threadCount4Create);
+                               int filesPerPass = (int) Math.ceil((double)fCount / (double)threadPassesNeeded);
+
                                JanusGraph graph1 = AAIGraph.getInstance().getGraph();
-                               GraphAdminDBUtils.logConfigs(graph1.configuration());
                                long timeStart = System.nanoTime();
-                               HashMap <String,String> old2NewVertIdMap = new <String,String> HashMap ();
-                               HashMap <String,ArrayList<String>> nodeKeyNames = new <String,ArrayList<String>> HashMap ();
-                       
+                               GraphAdminDBUtils.logConfigs(graph1.configuration());
+                               HashMap <String,String> old2NewVertIdMap = new HashMap <String,String>  ();
+                               HashMap <String,ArrayList<String>> nodeKeyNames = new  HashMap <String,ArrayList<String>> ();
                                try {
                                        LOGGER.debug("call getNodeKeyNames ()" );
                                        nodeKeyNames = getNodeKeyNames();
@@ -597,142 +612,162 @@ public class DataSnapshot4HistInit {
                                        ErrorLogHelper.logException(ae);
                                        AAISystemExitUtil.systemExitCloseAAIGraph(1);
                                }
-                               
-                                       // We're going to try loading in the vertices - without edges or properties
-                                       //    using Separate threads
-
-                                       ExecutorService executor = Executors.newFixedThreadPool(fCount);
-                                       List<Future<HashMap<String,String>>> list = new ArrayList<Future<HashMap<String,String>>>();
-                                       for( int i=0; i < fCount; i++ ){
-                                               File f = snapFilesArr.get(i);
+
+                               ExecutorService executor = Executors.newFixedThreadPool(fCount);
+                               int threadFailCount = 0;
+
+                               LOGGER.debug(" -- vertAddDelayMs = " + vertAddDelayMs
+                                               + ", failureDelayMs = " + failureDelayMs + ", retryDelayMs = " + retryDelayMs
+                                               + ", maxErrorsPerThread = " + maxErrorsPerThread );
+
+                               // --------------------------------------
+                               // Step 1 -- Load empty vertices
+                               // --------------------------------------
+                               int fileNo = 0;
+                               for( int passNo = 1; passNo <= threadPassesNeeded; passNo++ ){
+                                       List<Future<HashMap<String,String>>> listFutV = new ArrayList<Future<HashMap<String,String>>>();
+
+                                       int thisPassCount = 0;
+                                       while( (thisPassCount < filesPerPass) && (fileNo < fCount) ){
+                                               File f = snapFilesArr.get(fileNo);
                                                String fname = f.getName();
                                                String fullSnapName = targetDir + AAIConstants.AAI_FILESEP + fname;
                                                Thread.sleep(cArgs.staggerThreadDelay);  // Stagger the threads a bit
                                                LOGGER.debug(" -- Read file: [" + fullSnapName + "]");
-                                               LOGGER.debug(" -- Call the PartialVertexLoader to just load vertices  ----");
-                                               LOGGER.debug(" -- vertAddDelayMs = " + vertAddDelayMs
-                                                               + ", failureDelayMs = " + failureDelayMs + ", retryDelayMs = " + retryDelayMs
-                                                               + ", maxErrorsPerThread = " + maxErrorsPerThread );
                                                Callable <HashMap<String,String>> vLoader = new PartialVertexLoader(graph1, fullSnapName,
                                                                vertAddDelayMs, failureDelayMs, retryDelayMs, maxErrorsPerThread, LOGGER);
                                                Future <HashMap<String,String>> future = (Future<HashMap<String, String>>) executor.submit(vLoader);
 
-                                               // add Future to the list, we can get return value using Future
-                                               list.add(future);
-                                               LOGGER.debug(" --  Starting PartialDbLoad VERT_ONLY thread # "+ i );
+                                               // add future to the list, we can get return value later
+                                               listFutV.add(future);
+                                               LOGGER.debug(" --  Starting PartialDbLoad VERT_ONLY file # "+ fileNo
+                                                               + "( passNo = " + passNo + ", passIndex = " + thisPassCount + ")");
+
+                                               thisPassCount++;
+                                               fileNo++;
                                        }
 
                                        int threadCount4Reload = 0;
-                                       int threadFailCount = 0;
-                                       for(Future<HashMap<String,String>> fut : list){
-                               threadCount4Reload++;
-                               try {
-                                       old2NewVertIdMap.putAll(fut.get());
-                                       LOGGER.debug(" -- back from PartialVertexLoader.  returned thread # " + threadCount4Reload +
-                                                       ", current size of old2NewVertMap is: " + old2NewVertIdMap.size() );
-                               }
-                               catch (InterruptedException e) {
-                                       threadFailCount++;
-                                       AAIException ae = new AAIException("AAI_6128", e , "InterruptedException");
-                                               ErrorLogHelper.logException(ae);
-                                       }
-                               catch (ExecutionException e) {
-                                       threadFailCount++;
-                                       AAIException ae = new AAIException("AAI_6128", e , "ExecutionException");
-                                               ErrorLogHelper.logException(ae);
-                               }
-                           }
-                                       executor.shutdown();
-
-                                       if( threadFailCount > 0 ) {
-                                               String emsg = " FAILURE >> " + threadFailCount + " Vertex-loader thread(s) failed to complete successfully.  ";
-                                               LOGGER.debug(emsg);
-                                               throw new Exception( emsg );
+                                       for(Future<HashMap<String,String>> fut : listFutV){
+                                               threadCount4Reload++;
+                                               try {
+                                                       old2NewVertIdMap.putAll(fut.get());
+                                                       LOGGER.debug(" -- back from PartialVertexLoader.  returned pass # "
+                                                                       + passNo + ", thread # "
+                                                                       + threadCount4Reload +
+                                                                       ", current size of old2NewVertMap is: " + old2NewVertIdMap.size() );
+                                               }
+                                               catch (InterruptedException e) {
+                                                       threadFailCount++;
+                                                       AAIException ae = new AAIException("AAI_6128", e , "InterruptedException");
+                                                       ErrorLogHelper.logException(ae);
+                                               }
+                                               catch (ExecutionException e) {
+                                                       threadFailCount++;
+                                                       AAIException ae = new AAIException("AAI_6128", e , "ExecutionException");
+                                                       ErrorLogHelper.logException(ae);
+                                               }
                                        }
+                               } // end of passes for loading empty vertices
 
-                                       long timeX = System.nanoTime();
-                                       long diffTime =  timeX - timeStart;
-                                       long minCount = TimeUnit.NANOSECONDS.toMinutes(diffTime);
-                                       long secCount = TimeUnit.NANOSECONDS.toSeconds(diffTime) - (60 * minCount);
-                                       LOGGER.debug("   -- To reload just the vertex ids from the snapshot files, it took: " +
-                                                       minCount + " minutes, " + secCount + " seconds " );
+                               executor.shutdown();
+                               if( threadFailCount > 0 ) {
+                                       String emsg = " FAILURE >> " + threadFailCount + " Vertex-loader thread(s) failed to complete successfully.  ";
+                                       LOGGER.debug(emsg);
+                                       throw new Exception( emsg );
+                               }
+
+                               long timeX = System.nanoTime();
+                               long diffTime =  timeX - timeStart;
+                               long minCount = TimeUnit.NANOSECONDS.toMinutes(diffTime);
+                               long secCount = TimeUnit.NANOSECONDS.toSeconds(diffTime) - (60 * minCount);
+                               LOGGER.debug("   -- To reload just the vertex ids from the snapshot files, it took: " +
+                                               minCount + " minutes, " + secCount + " seconds " );
 
-                                       // Give the DB a little time to chew on all those vertices
-                                       Thread.sleep(vertToEdgeProcDelay);
-
-                                       // ----------------------------------------------------------------------------------------
-                                       LOGGER.debug("\n\n\n  -- Now do the edges/props ----------------------");
-                                       // ----------------------------------------------------------------------------------------
-                                       
-                                       // We're going to try loading in the edges and missing properties
-                                       // Note - we're passing the whole oldVid2newVid mapping to the PartialPropAndEdgeLoader
-                                       //     so that the String-updates to the GraphSON will happen in the threads instead of
-                                       //     here in the un-threaded calling method.
-                                       executor = Executors.newFixedThreadPool(fCount);
-                                       ArrayList<Future<ArrayList<String>>> listEdg = new ArrayList<Future<ArrayList<String>>>();
-                                       for( int i=0; i < fCount; i++ ){
-                                               File f = snapFilesArr.get(i);
+                               // Give the DB a little time to chew on all those new vertices
+                               Thread.sleep(vertToEdgeProcDelay);
+
+
+                               // -------------------------------------------------------------
+                               // Step 2 -- Load Edges and properties onto the empty vertices
+                               // -------------------------------------------------------------
+                               LOGGER.debug("\n\n\n  -- Now load the edges/properties ----------------------");
+                               executor = Executors.newFixedThreadPool(fCount);
+
+                               fileNo = 0;
+                               for( int passNo = 1; passNo <= threadPassesNeeded; passNo++ ){
+                                       ArrayList<Future<ArrayList<String>>> listFutEdg = new ArrayList<Future<ArrayList<String>>>();
+
+                                       int thisPassCount = 0;
+                                       while( (thisPassCount < filesPerPass) && (fileNo < fCount) ){
+                                               File f = snapFilesArr.get(fileNo);
                                                String fname = f.getName();
                                                String fullSnapName = targetDir + AAIConstants.AAI_FILESEP + fname;
                                                Thread.sleep(cArgs.staggerThreadDelay);  // Stagger the threads a bit
                                                LOGGER.debug(" -- Read file: [" + fullSnapName + "]");
-                                               LOGGER.debug(" -- Call the PartialPropAndEdgeLoader4HistInit for Properties and EDGEs  ----");
-                                               LOGGER.debug(" -- edgeAddDelayMs = " + vertAddDelayMs
-                                                               + ", failureDelayMs = " + failureDelayMs + ", retryDelayMs = " + retryDelayMs
-                                                               + ", maxErrorsPerThread = " + maxErrorsPerThread );
-
-                                               
-                                               Callable  eLoader = new PartialPropAndEdgeLoader4HistInit(graph1, fullSnapName,
+                                               Callable eLoader = new PartialPropAndEdgeLoader4HistInit(graph1, fullSnapName,
                                                                edgeAddDelayMs, failureDelayMs, retryDelayMs,
                                                                old2NewVertIdMap, maxErrorsPerThread, LOGGER,
                                                                scriptStartTime, nodeKeyNames);
+
                                                Future <ArrayList<String>> future = (Future<ArrayList<String>>) executor.submit(eLoader);
 
-                                               //add Future to the list, we can get return value using Future
-                                               listEdg.add(future);
-                                               LOGGER.debug(" --  Starting PartialPropAndEdge thread # "+ i );
+                                               // add future to the list, we can wait for it below
+                                               listFutEdg.add(future);
+                                               LOGGER.debug(" --  Starting PartialPropAndEdgeLoader4HistInit file # "
+                                                               + fileNo + " (pass # " + passNo + ", passIndex "
+                                                               + thisPassCount + ")" );
+
+                                               thisPassCount++;
+                                               fileNo++;
                                        }
-                                       threadCount4Reload = 0;
-                                       for(Future<ArrayList<String>> fut : listEdg){
-                                   threadCount4Reload++;
-                                   try{
-                                       fut.get();  // DEBUG -- should be doing something with the return value if it's not empty - ie. errors
-                                       LOGGER.debug(" -- back from PartialPropAndEdgeLoader.  thread # " + threadCount4Reload  );
-                                   }
+
+                                       int threadCount4Reload = 0;
+                                       for( Future<ArrayList<String>> fut : listFutEdg ){
+                                               threadCount4Reload++;
+                                               try{
+                                                       fut.get();
+                                                       LOGGER.debug(" -- back from PartialPropAndEdgeLoader4HistInit.  pass # "
+                                                                       + passNo + ", thread # " + threadCount4Reload  );
+                                               }
                                                catch (InterruptedException e) {
                                                        threadFailCount++;
                                                        AAIException ae = new AAIException("AAI_6128", e , "InterruptedException");
-                                               ErrorLogHelper.logException(ae);
+                                                       ErrorLogHelper.logException(ae);
                                                }
                                                catch (ExecutionException e) {
                                                        threadFailCount++;
                                                        AAIException ae = new AAIException("AAI_6128", e , "ExecutionException");
-                                               ErrorLogHelper.logException(ae);
+                                                       ErrorLogHelper.logException(ae);
                                                }
                                        }
 
-                                       executor.shutdown();
-                                       if( threadFailCount > 0 ) {
-                                               String emsg = " FAILURE >> " + threadFailCount + " Property/Edge-loader thread(s) failed to complete successfully.  ";
-                                               LOGGER.debug(emsg);
-                                               throw new Exception( emsg );
-                                       }
+                               } // end of passes for reloading edges and properties
 
-                                       // This is needed so we can see the data committed by the called threads
-                                       graph1.tx().commit();
+                               executor.shutdown();
 
-                                       long timeEnd = System.nanoTime();
-                                       diffTime =  timeEnd - timeX;
-                                       minCount = TimeUnit.NANOSECONDS.toMinutes(diffTime);
-                                       secCount = TimeUnit.NANOSECONDS.toSeconds(diffTime) - (60 * minCount);
-                                       LOGGER.debug("   -- To reload the edges and properties from snapshot files, it took: " +
-                                                       minCount + " minutes, " + secCount + " seconds " );
+                               if( threadFailCount > 0 ) {
+                                       String emsg = " FAILURE >> " + threadFailCount + " Property/Edge-loader thread(s) failed to complete successfully.  ";
+                                       LOGGER.debug(emsg);
+                                       throw new Exception( emsg );
+                               }
+
+                               // This is needed so we can see the data committed by the called threads
+                               graph1.tx().commit();
+
+                               long timeEnd = System.nanoTime();
+                               diffTime =  timeEnd - timeX;
+                               minCount = TimeUnit.NANOSECONDS.toMinutes(diffTime);
+                               secCount = TimeUnit.NANOSECONDS.toSeconds(diffTime) - (60 * minCount);
+                               LOGGER.debug("   -- To reload the edges and properties from snapshot files, it took: " +
+                                               minCount + " minutes, " + secCount + " seconds " );
+
+                               long totalDiffTime =  timeEnd - timeStart;
+                               long totalMinCount = TimeUnit.NANOSECONDS.toMinutes(totalDiffTime);
+                               long totalSecCount = TimeUnit.NANOSECONDS.toSeconds(totalDiffTime) - (60 * totalMinCount);
+                               LOGGER.debug("   -- TOTAL multi-threaded reload time: " +
+                                               totalMinCount + " minutes, " + totalSecCount + " seconds " );
 
-                                       long totalDiffTime =  timeEnd - timeStart;
-                                       long totalMinCount = TimeUnit.NANOSECONDS.toMinutes(totalDiffTime);
-                                       long totalSecCount = TimeUnit.NANOSECONDS.toSeconds(totalDiffTime) - (60 * totalMinCount);
-                                       LOGGER.debug("   -- TOTAL multi-threaded reload time: " +
-                                                       totalMinCount + " minutes, " + totalSecCount + " seconds " );
                        } else if (command.equals("CLEAR_ENTIRE_DATABASE")) {
                                // ------------------------------------------------------------------
                                // They are calling this to clear the db before re-loading it
@@ -760,10 +795,10 @@ public class DataSnapshot4HistInit {
                                String rtConfig = AAIConstants.REALTIME_DB_CONFIG;
                                String serviceName = System.getProperty("aai.service.name", DataSnapshot4HistInit.class.getSimpleName());
                                LOGGER.debug("Getting new configs for clearig");
-                               
+
                                PropertiesConfiguration propertiesConfiguration = new AAIGraphConfig.Builder(rtConfig).forService(serviceName).withGraphType(REALTIME_DB).buildConfiguration();
                                LOGGER.debug("Open New Janus Graph");
-                               
+
                                JanusGraph janusGraph = JanusGraphFactory.open(propertiesConfiguration);
                                verifyGraph(janusGraph);
                                GraphAdminDBUtils.logConfigs(janusGraph.configuration());
@@ -774,7 +809,7 @@ public class DataSnapshot4HistInit {
                                LOGGER.debug("     reloading data or the data will be put in without indexes. ");
                                dbClearFlag = true;
                                LOGGER.debug("All done clearing DB");
-                               
+
                        } else if (command.equals("RELOAD_DATA")) {
                                // ---------------------------------------------------------------------------
                                // They want to restore the database from either a single file, or a group
@@ -790,7 +825,7 @@ public class DataSnapshot4HistInit {
                                        LOGGER.debug(emsg);
                                        AAISystemExitUtil.systemExitCloseAAIGraph(1);
                                }
-                               
+
                                long timeA = System.nanoTime();
 
                                ArrayList <File> snapFilesArr = new ArrayList <File> ();
@@ -813,15 +848,15 @@ public class DataSnapshot4HistInit {
                                                }
                                        }
                                }
-                               
+
                                if( snapFilesArr.isEmpty() ){
                                        String emsg = "oldSnapshotFile " + onePieceSnapshotFname + "(with or without .P0) could not be found.";
                                        LOGGER.debug(emsg);
                                        AAISystemExitUtil.systemExitCloseAAIGraph(1);
                                }
-                               
+
                                int fCount = snapFilesArr.size();
-                               Vector<InputStream> inputStreamsV = new Vector<>();                  
+                               Vector<InputStream> inputStreamsV = new Vector<>();
                                for( int i = 0; i < fCount; i++ ){
                                        File f = snapFilesArr.get(i);
                                        String fname = f.getName();
@@ -843,10 +878,10 @@ public class DataSnapshot4HistInit {
                            // inputStreams.elements() will return Enumerations
                            InputStream sis = new SequenceInputStream(inputStreamsV.elements());
                            LOGGER.debug("Begin loading data from " + fCount + " files  -----");
-                           if("gryo".equalsIgnoreCase(snapshotType)){
+                         if("gryo".equalsIgnoreCase(snapshotType)){
                                        graph.io(IoCore.gryo()).reader().create().readGraph(sis, graph);
                                } else {
-                                       graph.io(IoCore.graphson()).reader().create().readGraph(sis, graph);
+                                       graph.io(GraphSONIo.build(graphsonVersion)).reader().create().readGraph(sis, graph);
                                }
                                LOGGER.debug("Completed the inputGraph command, now try to commit()... ");
                                graph.tx().commit();
@@ -854,14 +889,14 @@ public class DataSnapshot4HistInit {
 
                                long vCount = graph.traversal().V().count().next();
                                LOGGER.debug("A little after repopulating from an old snapshot, we see: " + vCount + " vertices in the db.");
-                               
+
                                long timeB = System.nanoTime();
                                long diffTime =  timeB - timeA;
                                long minCount = TimeUnit.NANOSECONDS.toMinutes(diffTime);
                                long secCount = TimeUnit.NANOSECONDS.toSeconds(diffTime) - (60 * minCount);
                                LOGGER.debug("    -- To Reload this snapshot, it took: " +
                                                minCount + " minutes, " + secCount + " seconds " );
-                               
+
                                LOGGER.debug("A little after repopulating from an old snapshot, we see: " + vCount + " vertices in the db.");
 
                        } else {
@@ -897,11 +932,11 @@ public class DataSnapshot4HistInit {
 
                return success;
        }
-       
+
     public static HashMap <String,ArrayList<String>> getNodeKeyNames()  {
-       
+
        HashMap <String,ArrayList<String>> keyNameHash = new HashMap <String,ArrayList<String>> ();
-       Loader loader = LoaderUtil.getLatestVersion(); 
+       Loader loader = LoaderUtil.getLatestVersion();
                Set<Entry<String, Introspector>> entrySet = loader.getAllObjects().entrySet();
        // Get a collection of the names of the key properties for each nodeType
                for (Entry<String, Introspector> entry : entrySet) {
@@ -909,12 +944,12 @@ public class DataSnapshot4HistInit {
                        Set <String> keyPropsSet = entry.getValue().getKeys();
                        ArrayList <String> keyProps = new ArrayList <String> ();
                        keyProps.addAll(keyPropsSet);
-                       keyNameHash.put(nType, keyProps);       
+                       keyNameHash.put(nType, keyProps);
                }
                return keyNameHash;
     }
-    
-    
+
+
        private static ArrayList <File> getFilesToProcess(String targetDir, String oldSnapshotFileName, boolean doingClearDb)
                throws Exception {
 
@@ -993,26 +1028,26 @@ public class DataSnapshot4HistInit {
 
        }
 
-       
-       public static int figureOutFileCount( long totalVertCount, int threadCount4Create, 
+
+       public static int figureOutFileCount( long totalVertCount, int threadCount4Create,
                        long maxNodesPerFile ) {
-               
+
                // NOTE - we would always like to use all of our threads.  That is, if
                //   we could process all the data with 16 threads, but our threadCount4Create is
-               //   only 15, we will do two passes and use all 15 threads each pass which will 
+               //   only 15, we will do two passes and use all 15 threads each pass which will
                //   create a total of 30 files.  Each file will be a bit smaller so the overall
                //   time for the two passes should be faster.
                if( totalVertCount <= 0 || threadCount4Create <= 0 || maxNodesPerFile <= 0) {
                        return 1;
                }
-               
-               long maxNodesPerPass = threadCount4Create * maxNodesPerFile;    
-               int numberOfPasses = (int) Math.ceil( (double)totalVertCount / (double)maxNodesPerPass);        
+
+               long maxNodesPerPass = threadCount4Create * maxNodesPerFile;
+               int numberOfPasses = (int) Math.ceil( (double)totalVertCount / (double)maxNodesPerPass);
                int fileCt = threadCount4Create * numberOfPasses;
-               
-               return fileCt;          
+
+               return fileCt;
        }
-       
+
 
        class CommandLineArgs {
 
@@ -1028,9 +1063,12 @@ public class DataSnapshot4HistInit {
                @Parameter(names = "-snapshotType", description = "snapshot type of gryo or graphson")
                public String snapshotType = "graphson";
 
+               @Parameter(names = "-v", description = "Graphson version of the snapshot file. Defaults to 1")
+               public String graphsonVersion = GraphSONVersion.V1_0.getVersion();
+
                @Parameter(names = "-threadCount", description = "thread count for create")
                public int threadCount = GraphAdminConstants.AAI_SNAPSHOT_DEFAULT_THREADS_FOR_CREATE;
-                               
+
                @Parameter(names = "-maxNodesPerFile", description = "Max nodes per file")
                public long maxNodesPerFile = GraphAdminConstants.AAI_SNAPSHOT_DEFAULT_MAX_NODES_PER_FILE_FOR_CREATE;
 
@@ -1039,13 +1077,13 @@ public class DataSnapshot4HistInit {
 
                @Parameter(names = "-debugAddDelayTime", description = "delay in ms between each Add for debug mode")
                public long debugAddDelayTime = 1L;
-               
+
                @Parameter(names = "-vertAddDelayMs", description = "delay in ms while adding each vertex")
                public long vertAddDelayMs = GraphAdminConstants.AAI_SNAPSHOT_DEFAULT_VERTEX_ADD_DELAY_MS.longValue();
-               
+
                @Parameter(names = "-edgeAddDelayMs", description = "delay in ms while adding each edge")
                public long edgeAddDelayMs = GraphAdminConstants.AAI_SNAPSHOT_DEFAULT_EDGE_ADD_DELAY_MS.longValue();
-               
+
                @Parameter(names = "-failureDelayMs", description = "delay in ms when failure to load vertex or edge in snapshot")
                public long failureDelayMs = GraphAdminConstants.AAI_SNAPSHOT_DEFAULT_FAILURE_DELAY_MS.longValue();
 
@@ -1054,25 +1092,25 @@ public class DataSnapshot4HistInit {
 
                @Parameter(names = "-maxErrorsPerThread", description = "max errors allowed per thread")
                public int maxErrorsPerThread = GraphAdminConstants.AAI_SNAPSHOT_DEFAULT_MAX_ERRORS_PER_THREAD;
-               
+
                @Parameter(names = "-vertToEdgeProcDelay", description = "vertex to edge processing delay in ms")
                public long vertToEdgeProcDelay = GraphAdminConstants.AAI_SNAPSHOT_DEFAULT_VERTEX_TO_EDGE_PROC_DELAY_MS.longValue();
-               
+
                @Parameter(names = "-staggerThreadDelay", description = "thread delay stagger time in ms")
                public long staggerThreadDelay = GraphAdminConstants.AAI_SNAPSHOT_DEFAULT_STAGGER_THREAD_DELAY_MS;
-               
+
                @Parameter(names = "-fileName", description = "file name for generating snapshot ")
                public String fileName = "";
-               
+
                @Parameter(names = "-snapshotDir", description = "file path for generating snapshot ")
                public String snapshotDir = "";
-               
+
                @Parameter(names = "-oldFileDir", description = "directory containing the old snapshot file for reloading")
                public String oldFileDir = "";
-               
+
                @Parameter(names = "-caller", description = "process invoking the dataSnapshot")
                public String caller = "";
-               
+
        }
-       
+
 }