Improve Batch Bulk Load 14/79214/2
authorInstrumental <jonathan.gathman@att.com>
Tue, 26 Feb 2019 19:12:00 +0000 (13:12 -0600)
committerInstrumental <jonathan.gathman@att.com>
Tue, 26 Feb 2019 20:41:05 +0000 (14:41 -0600)
Issue-ID: AAF-775
Change-Id: Ib2d6ff3607a30c6e8d74ff24498e5907237d6ff2
Signed-off-by: Instrumental <jonathan.gathman@att.com>
auth/auth-batch/src/main/java/org/onap/aaf/auth/batch/helpers/CQLBatch.java
auth/auth-batch/src/main/java/org/onap/aaf/auth/batch/helpers/CQLBatchLoop.java
auth/auth-batch/src/main/java/org/onap/aaf/auth/batch/helpers/Cred.java
auth/auth-batch/src/main/java/org/onap/aaf/auth/batch/update/Extend.java
auth/auth-batch/src/main/java/org/onap/aaf/auth/batch/update/Upload.java
auth/auth-cass/cass_init/cmd.sh
auth/auth-cass/cass_init/push.sh
auth/auth-cass/docker/dbash.sh

index efd1ec9..738c534 100644 (file)
@@ -63,12 +63,14 @@ public class CQLBatch {
        }
        
        public ResultSet execute(boolean dryRun) {
+               ResultSet rv = null;
                if(dryRun) {
                        end();
-                       return null;
                } else {
-                       return execute();
+                       rv = execute();
                }
+               sb.setLength(0);
+               return rv;
        }
        
        public void touch(String table, int begin, int end, boolean dryRun) {
index ca264d1..51a88ef 100644 (file)
@@ -27,10 +27,13 @@ public class CQLBatchLoop {
        private final StringBuilder sb;
        private final boolean dryRun;
        private int i;
+       private int count;
+       private int batches;
        
        public CQLBatchLoop(CQLBatch cb, int max, boolean dryRun) {
                cqlBatch = cb;
                i=0;
+               count = 0;
                maxBatch = max;
                sb = cqlBatch.begin();
                this.dryRun = dryRun;
@@ -43,10 +46,11 @@ public class CQLBatchLoop {
        public void preLoop() {
                if(i<0) {
                        cqlBatch.begin();
-               } else if(i>=maxBatch) {
+               } else if(i>=maxBatch || sb.length()>24000) {
                        cqlBatch.execute(dryRun);
                        cqlBatch.begin();
                        i=0;
+                       ++batches;
                }
        }
        
@@ -56,6 +60,7 @@ public class CQLBatchLoop {
         */
        public StringBuilder inc() {
                ++i;
+               ++count;
                return sb;
        }
        
@@ -63,7 +68,18 @@ public class CQLBatchLoop {
         * Close up when done.  However, can go back to "preLoop" safely.
         */
        public void flush() {
-               cqlBatch.execute(dryRun);
+               if(i>0) {
+                       cqlBatch.execute(dryRun);
+                       ++batches;
+               }
                i=-1;
        }
+
+       public int total() {
+               return count;
+       }
+       
+       public int batches() {
+               return batches;
+       }
 }
index 8db2b47..e51fcfd 100644 (file)
@@ -136,7 +136,6 @@ public class Cred  {
 
     public static void load(Trans trans, Session session, int ... types ) {
         load(trans, session,"select id, type, expires, other, writetime(cred), tag from authz.cred;",types);
-        
     }
 
     public static void loadOneNS(Trans trans, Session session, String ns,int ... types ) {
index 0179593..870dc1e 100644 (file)
@@ -25,7 +25,9 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.GregorianCalendar;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 
 import org.onap.aaf.auth.batch.Batch;
 import org.onap.aaf.auth.batch.BatchPrincipal;
@@ -78,12 +80,14 @@ public class Extend extends Batch {
         gcType = GregorianCalendar.WEEK_OF_YEAR;
         int weeks = 4;
                
+        Set<String> cmd = new HashSet<>();
                for(int i=0; i< args().length;++i) {
                        if("-weeks".equals(args()[i])) {
                                if(args().length>i+1) {
-                                       weeks = Integer.parseInt(args()[i +1]);
-                                       break;
+                                       weeks = Integer.parseInt(args()[++i]);
                                }
+                       } else {
+                               cmd.add(args()[i]);
                        }
                }
                
@@ -95,12 +99,12 @@ public class Extend extends Batch {
         // Create Intermediate Output 
         File logDir = logDir();
         extFiles = new ArrayList<>();
-        if(args().length>0) {
-               for(int i=0;i<args().length;++i) {
-                       extFiles.add(new File(logDir, args()[i]));
-               }
-        } else {
+        if(cmd.isEmpty()) {
                extFiles.add(new File(logDir,PrepExtend.PREP_EXTEND+Chrono.dateOnlyStamp()+".csv"));
+        } else {
+               for(String fn : cmd) {
+                       extFiles.add(new File(logDir, fn));
+               }
         }
         
         // Load Cred.  We don't follow Visitor, because we have to gather up everything into Identity Anyway
@@ -142,7 +146,7 @@ public class Extend extends Batch {
                                                        case "ur":
                                                                hi.set(++i);
                                                                gc = hgc.get();
-                                                               gc.setTime(new Date(Long.parseLong(row.get(5))));
+                                                               gc.setTime(new Date(Long.parseLong(row.get(6))));
                                                                if(gc.before(now)) {
                                                                        gc.setTime(now.getTime());
                                                                }
index 87bdf27..a3d37a0 100644 (file)
@@ -30,20 +30,18 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.onap.aaf.auth.batch.Batch;
+import org.onap.aaf.auth.batch.helpers.CQLBatch;
+import org.onap.aaf.auth.batch.helpers.CQLBatchLoop;
 import org.onap.aaf.auth.env.AuthzTrans;
 import org.onap.aaf.auth.org.OrganizationException;
 import org.onap.aaf.misc.env.APIException;
 import org.onap.aaf.misc.env.Env;
+import org.onap.aaf.misc.env.LogTarget;
 import org.onap.aaf.misc.env.TimeTaken;
 
-import com.datastax.driver.core.ResultSet;
-
 public class Upload extends Batch {
 
-       private static final int BATCH_LENGTH = 100;
-
-       int count;
-       int batchCnt;
+       private CQLBatchLoop cqlBatch;
 
        // APPROVALS
        private static final String APPR_INS_FMT="  INSERT INTO authz.approval "
@@ -104,7 +102,8 @@ public class Upload extends Batch {
                        } finally {
                                tt.done();
                        }
-
+                       
+                       cqlBatch = new CQLBatchLoop(new CQLBatch(LogTarget.NULL,session),50,dryRun);
            } finally {
                tt0.done();
            }
@@ -114,35 +113,36 @@ public class Upload extends Batch {
        protected void run(AuthzTrans trans) {
                String line;
                StringBuilder sb = new StringBuilder();
-               StringBuilder query = new StringBuilder();
                List<String> array = new ArrayList<String>();
                for(String feed : args()) {
-                       File file = new File(feed + ".dat");
+                       File file;
+                       if(feed.endsWith(".dat")) {
+                               file = new File(feed);
+                               feed = file.getName();
+                               feed = feed.substring(0,feed.length()-4);
+                       } else {
+                               file = new File(feed+".dat");
+                       }
                        TimeTaken tt = trans.start(file.getAbsolutePath(), Env.SUB);
-                       System.out.println("#### Running " + feed + ".dat Feed ####");
+                       String msg = String.format("#### Running %s.dat Feed ####",feed);
+                       trans.info().log(msg);
+                       System.out.println(msg);
+               BufferedReader br = null;
                    try {
-
                                if(file.exists()) {
-                                       count=batchCnt=0;
-                                       boolean justOne = false;
                                        try {
-                                               BufferedReader br = new BufferedReader(new FileReader(file));
+                                               br = new BufferedReader(new FileReader(file));
                                                try {
                                                        while((line=br.readLine())!=null) {
                                                                if(line.length()>5000) {
-                                                                       if(query.length()>0) {
-                                                                               applyBatch(query);
-                                                                               justOne=true;
-                                                                       }
-                                                               }
-                                                               if(query.length()==0) {
-                                                                       query.append("BEGIN BATCH\n");
+                                                                       cqlBatch.flush();
                                                                }
+                                                               cqlBatch.preLoop();
+
                                                                // Split into fields, first turning Escaped values into something we can convert back from
                                                                char c=0;
                                                                boolean inQuote = false;
                                                                int fldcnt = 0;
-                                                               
                                                                for(int i=0;i<line.length();++i) {
                                                                        switch(c=line.charAt(i)) {
                                                                                case '"':
@@ -160,37 +160,27 @@ public class Upload extends Batch {
                                                                        }
                                                                }
                                                                addField(feed,fldcnt,array,sb);
-                                                               query.append(build(feed, array));
-                                                               
-                                                               if((++count % BATCH_LENGTH)==0 || justOne) {
-                                                                       applyBatch(query);
-                                                                       justOne=false;
-                                                               }
-                                                       }
-                                                       if(query.length()>0) {
-                                                               applyBatch(query);
+                                                               cqlBatch.inc().append(build(feed, array));
                                                        }
-                                                       
+                                                       cqlBatch.flush();
+                                               } catch (Exception t) {
+                                                       trans.error().log(t);
                                                } finally {
                                                        br.close();
-                                                       sb.setLength(0);
-                                                       query.setLength(0);
                                                }
-                                               
                                        } catch (IOException e) {
                                                trans.error().log(e);
-                                               e.printStackTrace();
                                        }
-
                                } else {
                                        trans.error().log("No file found: ", file.getAbsolutePath());
                                }
                        } finally {
                                tt.done();
                                System.err.flush();
-                               System.out.printf("\n%d applied in %d batches\n",count,batchCnt);
+                               msg = String.format("\n%d applied in %d batches\n",cqlBatch.total(), cqlBatch.batches());
+                               trans.info().log(msg);
+                               System.out.println(msg);
                        }
-
                }
 
        }
@@ -198,32 +188,36 @@ public class Upload extends Batch {
 
        private String build(String feed, List<String> array) {
                String rv;
-               switch(feed) {
-                       case "approval":
-                               rv = String.format(APPR_INS_FMT,array.toArray());
-                               break;
-                       case "artifact":
-                               rv = String.format(ARTI_INS_FMT,array.toArray());
-                               break;
-                       case "cred":
-                               rv = String.format(CRED_INS_FMT,array.toArray());
-                               break;
-                       case "ns":
-                               rv = String.format(NS_INS_FMT,array.toArray());
-                               break;
-                       case "role":
-                               rv = String.format(ROLE_INS_FMT,array.toArray());
-                               break;
-                       case "perm":
-                               rv = String.format(PERM_INS_FMT,array.toArray());
-                               break;
-                       case "x509":
-                               rv = String.format(X509_INS_FMT,array.toArray());
-                               break;
-                       default:
-                               rv = "";
+               if(array.size()>0) {
+                       switch(feed) {
+                               case "approval":
+                                       rv = String.format(APPR_INS_FMT,array.toArray());
+                                       break;
+                               case "artifact":
+                                       rv = String.format(ARTI_INS_FMT,array.toArray());
+                                       break;
+                               case "cred":
+                                       rv = String.format(CRED_INS_FMT,array.toArray());
+                                       break;
+                               case "ns":
+                                       rv = String.format(NS_INS_FMT,array.toArray());
+                                       break;
+                               case "role":
+                                       rv = String.format(ROLE_INS_FMT,array.toArray());
+                                       break;
+                               case "perm":
+                                       rv = String.format(PERM_INS_FMT,array.toArray());
+                                       break;
+                               case "x509":
+                                       rv = String.format(X509_INS_FMT,array.toArray());
+                                       break;
+                               default:
+                                       rv = "";
+                       }
+                       array.clear();
+               } else {
+                       rv = ""; 
                }
-               array.clear();
                return rv;
        }
        
@@ -290,23 +284,6 @@ public class Upload extends Batch {
                }
        }
 
-       private void applyBatch(StringBuilder query) {
-               try {
-                       query.append("APPLY BATCH;");
-                       ResultSet rv = session.execute(query.toString());
-                       if(rv.wasApplied()) {
-                               System.out.print('.');
-                               if((++batchCnt % 60)==0) {
-                                       System.out.println();
-                               }
-                       } else {
-                               System.out.print("Data NOT APPLIED");
-                       }
-               } finally {
-                       query.setLength(0);
-               }
-       }
-
 
        @Override
        protected void _close(AuthzTrans trans) {
index ca1c0e8..b26536d 100644 (file)
@@ -97,10 +97,10 @@ function install_cql {
     wait_start cassandra responsive   
     # Now, make sure data exists
     if [ ! -e $INSTALLED_VERSION ] && [ -n "$(/usr/bin/cqlsh -e 'describe keyspaces' | grep authz)" ]; then
-      /usr/bin/cqlsh --timeout 60 -e 'DROP KEYSPACE authz' 
+      /usr/bin/cqlsh --request-timeout=60 -e 'DROP KEYSPACE authz' 
     fi
 
-    if [ -z "`/usr/bin/cqlsh --timeout 60 -e 'describe keyspaces' | grep authz`" ]; then
+    if [ -z "`/usr/bin/cqlsh --request-timeout 60 -e 'describe keyspaces' | grep authz`" ]; then
         status install 
         echo "Initializing Cassandra DB" 
         echo "Docker Installed Basic Cassandra on aaf.cass.  Executing the following "
index f3e5d80..f887f07 100644 (file)
@@ -35,11 +35,7 @@ fi
 cd dats
 for T in $(ls *.dat); do
   if [ -s $T ]; then
-    until cqlsh --request-timeout=60 -e "COPY authz.${T/.dat/} FROM '$T' WITH DELIMITER='|';";
-    do
-      echo "Unexpected failure...sleep for 10 seconds and try again"
-      sleep 10
-    done
+    cqlsh --request-timeout=100 -e "COPY authz.${T/.dat/} FROM '$T' WITH DELIMITER='|';";
   fi
 done
 cd $DIR
index 1e13d27..737cf10 100644 (file)
@@ -24,5 +24,5 @@ if [ -e ../../docker/d.props ]; then
 fi
 DOCKER=${DOCKER:-docker}
 
-$DOCKER exec -it aaf_cass bash
+$DOCKER exec -it aaf-cass bash