Improve Batch Bulk Load
[aaf/authz.git] / auth / auth-batch / src / main / java / org / onap / aaf / auth / batch / update / Upload.java
index 87bdf27..a3d37a0 100644 (file)
@@ -30,20 +30,18 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.onap.aaf.auth.batch.Batch;
+import org.onap.aaf.auth.batch.helpers.CQLBatch;
+import org.onap.aaf.auth.batch.helpers.CQLBatchLoop;
 import org.onap.aaf.auth.env.AuthzTrans;
 import org.onap.aaf.auth.org.OrganizationException;
 import org.onap.aaf.misc.env.APIException;
 import org.onap.aaf.misc.env.Env;
+import org.onap.aaf.misc.env.LogTarget;
 import org.onap.aaf.misc.env.TimeTaken;
 
-import com.datastax.driver.core.ResultSet;
-
 public class Upload extends Batch {
 
-       private static final int BATCH_LENGTH = 100;
-
-       int count;
-       int batchCnt;
+       private CQLBatchLoop cqlBatch;
 
        // APPROVALS
        private static final String APPR_INS_FMT="  INSERT INTO authz.approval "
@@ -104,7 +102,8 @@ public class Upload extends Batch {
                        } finally {
                                tt.done();
                        }
-
+                       
+                       cqlBatch = new CQLBatchLoop(new CQLBatch(LogTarget.NULL,session),50,dryRun);
            } finally {
                tt0.done();
            }
@@ -114,35 +113,36 @@ public class Upload extends Batch {
        protected void run(AuthzTrans trans) {
                String line;
                StringBuilder sb = new StringBuilder();
-               StringBuilder query = new StringBuilder();
                List<String> array = new ArrayList<String>();
                for(String feed : args()) {
-                       File file = new File(feed + ".dat");
+                       File file;
+                       if(feed.endsWith(".dat")) {
+                               file = new File(feed);
+                               feed = file.getName();
+                               feed = feed.substring(0,feed.length()-4);
+                       } else {
+                               file = new File(feed+".dat");
+                       }
                        TimeTaken tt = trans.start(file.getAbsolutePath(), Env.SUB);
-                       System.out.println("#### Running " + feed + ".dat Feed ####");
+                       String msg = String.format("#### Running %s.dat Feed ####",feed);
+                       trans.info().log(msg);
+                       System.out.println(msg);
+               BufferedReader br = null;
                    try {
-
                                if(file.exists()) {
-                                       count=batchCnt=0;
-                                       boolean justOne = false;
                                        try {
-                                               BufferedReader br = new BufferedReader(new FileReader(file));
+                                               br = new BufferedReader(new FileReader(file));
                                                try {
                                                        while((line=br.readLine())!=null) {
                                                                if(line.length()>5000) {
-                                                                       if(query.length()>0) {
-                                                                               applyBatch(query);
-                                                                               justOne=true;
-                                                                       }
-                                                               }
-                                                               if(query.length()==0) {
-                                                                       query.append("BEGIN BATCH\n");
+                                                                       cqlBatch.flush();
                                                                }
+                                                               cqlBatch.preLoop();
+
                                                                // Split into fields, first turning Escaped values into something we can convert back from
                                                                char c=0;
                                                                boolean inQuote = false;
                                                                int fldcnt = 0;
-                                                               
                                                                for(int i=0;i<line.length();++i) {
                                                                        switch(c=line.charAt(i)) {
                                                                                case '"':
@@ -160,37 +160,27 @@ public class Upload extends Batch {
                                                                        }
                                                                }
                                                                addField(feed,fldcnt,array,sb);
-                                                               query.append(build(feed, array));
-                                                               
-                                                               if((++count % BATCH_LENGTH)==0 || justOne) {
-                                                                       applyBatch(query);
-                                                                       justOne=false;
-                                                               }
-                                                       }
-                                                       if(query.length()>0) {
-                                                               applyBatch(query);
+                                                               cqlBatch.inc().append(build(feed, array));
                                                        }
-                                                       
+                                                       cqlBatch.flush();
+                                               } catch (Exception t) {
+                                                       trans.error().log(t);
                                                } finally {
                                                        br.close();
-                                                       sb.setLength(0);
-                                                       query.setLength(0);
                                                }
-                                               
                                        } catch (IOException e) {
                                                trans.error().log(e);
-                                               e.printStackTrace();
                                        }
-
                                } else {
                                        trans.error().log("No file found: ", file.getAbsolutePath());
                                }
                        } finally {
                                tt.done();
                                System.err.flush();
-                               System.out.printf("\n%d applied in %d batches\n",count,batchCnt);
+                               msg = String.format("\n%d applied in %d batches\n",cqlBatch.total(), cqlBatch.batches());
+                               trans.info().log(msg);
+                               System.out.println(msg);
                        }
-
                }
 
        }
@@ -198,32 +188,36 @@ public class Upload extends Batch {
 
        private String build(String feed, List<String> array) {
                String rv;
-               switch(feed) {
-                       case "approval":
-                               rv = String.format(APPR_INS_FMT,array.toArray());
-                               break;
-                       case "artifact":
-                               rv = String.format(ARTI_INS_FMT,array.toArray());
-                               break;
-                       case "cred":
-                               rv = String.format(CRED_INS_FMT,array.toArray());
-                               break;
-                       case "ns":
-                               rv = String.format(NS_INS_FMT,array.toArray());
-                               break;
-                       case "role":
-                               rv = String.format(ROLE_INS_FMT,array.toArray());
-                               break;
-                       case "perm":
-                               rv = String.format(PERM_INS_FMT,array.toArray());
-                               break;
-                       case "x509":
-                               rv = String.format(X509_INS_FMT,array.toArray());
-                               break;
-                       default:
-                               rv = "";
+               if(array.size()>0) {
+                       switch(feed) {
+                               case "approval":
+                                       rv = String.format(APPR_INS_FMT,array.toArray());
+                                       break;
+                               case "artifact":
+                                       rv = String.format(ARTI_INS_FMT,array.toArray());
+                                       break;
+                               case "cred":
+                                       rv = String.format(CRED_INS_FMT,array.toArray());
+                                       break;
+                               case "ns":
+                                       rv = String.format(NS_INS_FMT,array.toArray());
+                                       break;
+                               case "role":
+                                       rv = String.format(ROLE_INS_FMT,array.toArray());
+                                       break;
+                               case "perm":
+                                       rv = String.format(PERM_INS_FMT,array.toArray());
+                                       break;
+                               case "x509":
+                                       rv = String.format(X509_INS_FMT,array.toArray());
+                                       break;
+                               default:
+                                       rv = "";
+                       }
+                       array.clear();
+               } else {
+                       rv = ""; 
                }
-               array.clear();
                return rv;
        }
        
@@ -290,23 +284,6 @@ public class Upload extends Batch {
                }
        }
 
-       private void applyBatch(StringBuilder query) {
-               try {
-                       query.append("APPLY BATCH;");
-                       ResultSet rv = session.execute(query.toString());
-                       if(rv.wasApplied()) {
-                               System.out.print('.');
-                               if((++batchCnt % 60)==0) {
-                                       System.out.println();
-                               }
-                       } else {
-                               System.out.print("Data NOT APPLIED");
-                       }
-               } finally {
-                       query.setLength(0);
-               }
-       }
-
 
        @Override
        protected void _close(AuthzTrans trans) {