From c5705cdbe98f13e1dd4b64040390ea389a6065c2 Mon Sep 17 00:00:00 2001 From: Instrumental Date: Tue, 26 Feb 2019 13:12:00 -0600 Subject: [PATCH] Improve Batch Bulk Load Issue-ID: AAF-775 Change-Id: Ib2d6ff3607a30c6e8d74ff24498e5907237d6ff2 Signed-off-by: Instrumental --- .../org/onap/aaf/auth/batch/helpers/CQLBatch.java | 6 +- .../onap/aaf/auth/batch/helpers/CQLBatchLoop.java | 20 ++- .../java/org/onap/aaf/auth/batch/helpers/Cred.java | 1 - .../org/onap/aaf/auth/batch/update/Extend.java | 20 +-- .../org/onap/aaf/auth/batch/update/Upload.java | 139 +++++++++------------ auth/auth-cass/cass_init/cmd.sh | 4 +- auth/auth-cass/cass_init/push.sh | 6 +- auth/auth-cass/docker/dbash.sh | 2 +- 8 files changed, 96 insertions(+), 102 deletions(-) diff --git a/auth/auth-batch/src/main/java/org/onap/aaf/auth/batch/helpers/CQLBatch.java b/auth/auth-batch/src/main/java/org/onap/aaf/auth/batch/helpers/CQLBatch.java index efd1ec9e..738c5345 100644 --- a/auth/auth-batch/src/main/java/org/onap/aaf/auth/batch/helpers/CQLBatch.java +++ b/auth/auth-batch/src/main/java/org/onap/aaf/auth/batch/helpers/CQLBatch.java @@ -63,12 +63,14 @@ public class CQLBatch { } public ResultSet execute(boolean dryRun) { + ResultSet rv = null; if(dryRun) { end(); - return null; } else { - return execute(); + rv = execute(); } + sb.setLength(0); + return rv; } public void touch(String table, int begin, int end, boolean dryRun) { diff --git a/auth/auth-batch/src/main/java/org/onap/aaf/auth/batch/helpers/CQLBatchLoop.java b/auth/auth-batch/src/main/java/org/onap/aaf/auth/batch/helpers/CQLBatchLoop.java index ca264d14..51a88ef2 100644 --- a/auth/auth-batch/src/main/java/org/onap/aaf/auth/batch/helpers/CQLBatchLoop.java +++ b/auth/auth-batch/src/main/java/org/onap/aaf/auth/batch/helpers/CQLBatchLoop.java @@ -27,10 +27,13 @@ public class CQLBatchLoop { private final StringBuilder sb; private final boolean dryRun; private int i; + private int count; + private int batches; public CQLBatchLoop(CQLBatch cb, int max, boolean dryRun) { cqlBatch = cb; i=0; + count = 0; maxBatch = max; sb = cqlBatch.begin(); this.dryRun = dryRun; @@ -43,10 +46,11 @@ public class CQLBatchLoop { public void preLoop() { if(i<0) { cqlBatch.begin(); - } else if(i>=maxBatch) { + } else if(i>=maxBatch || sb.length()>24000) { cqlBatch.execute(dryRun); cqlBatch.begin(); i=0; + ++batches; } } @@ -56,6 +60,7 @@ public class CQLBatchLoop { */ public StringBuilder inc() { ++i; + ++count; return sb; } @@ -63,7 +68,18 @@ public class CQLBatchLoop { * Close up when done. However, can go back to "preLoop" safely. */ public void flush() { - cqlBatch.execute(dryRun); + if(i>0) { + cqlBatch.execute(dryRun); + ++batches; + } i=-1; } + + public int total() { + return count; + } + + public int batches() { + return batches; + } } diff --git a/auth/auth-batch/src/main/java/org/onap/aaf/auth/batch/helpers/Cred.java b/auth/auth-batch/src/main/java/org/onap/aaf/auth/batch/helpers/Cred.java index 8db2b47a..e51fcfdc 100644 --- a/auth/auth-batch/src/main/java/org/onap/aaf/auth/batch/helpers/Cred.java +++ b/auth/auth-batch/src/main/java/org/onap/aaf/auth/batch/helpers/Cred.java @@ -136,7 +136,6 @@ public class Cred { public static void load(Trans trans, Session session, int ... types ) { load(trans, session,"select id, type, expires, other, writetime(cred), tag from authz.cred;",types); - } public static void loadOneNS(Trans trans, Session session, String ns,int ... types ) { diff --git a/auth/auth-batch/src/main/java/org/onap/aaf/auth/batch/update/Extend.java b/auth/auth-batch/src/main/java/org/onap/aaf/auth/batch/update/Extend.java index 01795935..870dc1e5 100644 --- a/auth/auth-batch/src/main/java/org/onap/aaf/auth/batch/update/Extend.java +++ b/auth/auth-batch/src/main/java/org/onap/aaf/auth/batch/update/Extend.java @@ -25,7 +25,9 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Date; import java.util.GregorianCalendar; +import java.util.HashSet; import java.util.List; +import java.util.Set; import org.onap.aaf.auth.batch.Batch; import org.onap.aaf.auth.batch.BatchPrincipal; @@ -78,12 +80,14 @@ public class Extend extends Batch { gcType = GregorianCalendar.WEEK_OF_YEAR; int weeks = 4; + Set cmd = new HashSet<>(); for(int i=0; i< args().length;++i) { if("-weeks".equals(args()[i])) { if(args().length>i+1) { - weeks = Integer.parseInt(args()[i +1]); - break; + weeks = Integer.parseInt(args()[++i]); } + } else { + cmd.add(args()[i]); } } @@ -95,12 +99,12 @@ public class Extend extends Batch { // Create Intermediate Output File logDir = logDir(); extFiles = new ArrayList<>(); - if(args().length>0) { - for(int i=0;i array = new ArrayList(); for(String feed : args()) { - File file = new File(feed + ".dat"); + File file; + if(feed.endsWith(".dat")) { + file = new File(feed); + feed = file.getName(); + feed = feed.substring(0,feed.length()-4); + } else { + file = new File(feed+".dat"); + } TimeTaken tt = trans.start(file.getAbsolutePath(), Env.SUB); - System.out.println("#### Running " + feed + ".dat Feed ####"); + String msg = String.format("#### Running %s.dat Feed ####",feed); + trans.info().log(msg); + System.out.println(msg); + BufferedReader br = null; try { - if(file.exists()) { - count=batchCnt=0; - boolean justOne = false; try { - BufferedReader br = new BufferedReader(new FileReader(file)); + br = new BufferedReader(new FileReader(file)); try { while((line=br.readLine())!=null) { if(line.length()>5000) { - if(query.length()>0) { - applyBatch(query); - justOne=true; - } - } - if(query.length()==0) { - query.append("BEGIN BATCH\n"); + cqlBatch.flush(); } + cqlBatch.preLoop(); + // Split into fields, first turning Escaped values into something we can convert back from char c=0; boolean inQuote = false; int fldcnt = 0; - for(int i=0;i0) { - applyBatch(query); + cqlBatch.inc().append(build(feed, array)); } - + cqlBatch.flush(); + } catch (Exception t) { + trans.error().log(t); } finally { br.close(); - sb.setLength(0); - query.setLength(0); } - } catch (IOException e) { trans.error().log(e); - e.printStackTrace(); } - } else { trans.error().log("No file found: ", file.getAbsolutePath()); } } finally { tt.done(); System.err.flush(); - System.out.printf("\n%d applied in %d batches\n",count,batchCnt); + msg = String.format("\n%d applied in %d batches\n",cqlBatch.total(), cqlBatch.batches()); + trans.info().log(msg); + System.out.println(msg); } - } } @@ -198,32 +188,36 @@ public class Upload extends Batch { private String build(String feed, List array) { String rv; - switch(feed) { - case "approval": - rv = String.format(APPR_INS_FMT,array.toArray()); - break; - case "artifact": - rv = String.format(ARTI_INS_FMT,array.toArray()); - break; - case "cred": - rv = String.format(CRED_INS_FMT,array.toArray()); - break; - case "ns": - rv = String.format(NS_INS_FMT,array.toArray()); - break; - case "role": - rv = String.format(ROLE_INS_FMT,array.toArray()); - break; - case "perm": - rv = String.format(PERM_INS_FMT,array.toArray()); - break; - case "x509": - rv = String.format(X509_INS_FMT,array.toArray()); - break; - default: - rv = ""; + if(array.size()>0) { + switch(feed) { + case "approval": + rv = String.format(APPR_INS_FMT,array.toArray()); + break; + case "artifact": + rv = String.format(ARTI_INS_FMT,array.toArray()); + break; + case "cred": + rv = String.format(CRED_INS_FMT,array.toArray()); + break; + case "ns": + rv = String.format(NS_INS_FMT,array.toArray()); + break; + case "role": + rv = String.format(ROLE_INS_FMT,array.toArray()); + break; + case "perm": + rv = String.format(PERM_INS_FMT,array.toArray()); + break; + case "x509": + rv = String.format(X509_INS_FMT,array.toArray()); + break; + default: + rv = ""; + } + array.clear(); + } else { + rv = ""; } - array.clear(); return rv; } @@ -290,23 +284,6 @@ public class Upload extends Batch { } } - private void applyBatch(StringBuilder query) { - try { - query.append("APPLY BATCH;"); - ResultSet rv = session.execute(query.toString()); - if(rv.wasApplied()) { - System.out.print('.'); - if((++batchCnt % 60)==0) { - System.out.println(); - } - } else { - System.out.print("Data NOT APPLIED"); - } - } finally { - query.setLength(0); - } - } - @Override protected void _close(AuthzTrans trans) { diff --git a/auth/auth-cass/cass_init/cmd.sh b/auth/auth-cass/cass_init/cmd.sh index ca1c0e82..b26536d6 100644 --- a/auth/auth-cass/cass_init/cmd.sh +++ b/auth/auth-cass/cass_init/cmd.sh @@ -97,10 +97,10 @@ function install_cql { wait_start cassandra responsive # Now, make sure data exists if [ ! -e $INSTALLED_VERSION ] && [ -n "$(/usr/bin/cqlsh -e 'describe keyspaces' | grep authz)" ]; then - /usr/bin/cqlsh --timeout 60 -e 'DROP KEYSPACE authz' + /usr/bin/cqlsh --request-timeout=60 -e 'DROP KEYSPACE authz' fi - if [ -z "`/usr/bin/cqlsh --timeout 60 -e 'describe keyspaces' | grep authz`" ]; then + if [ -z "`/usr/bin/cqlsh --request-timeout 60 -e 'describe keyspaces' | grep authz`" ]; then status install echo "Initializing Cassandra DB" echo "Docker Installed Basic Cassandra on aaf.cass. Executing the following " diff --git a/auth/auth-cass/cass_init/push.sh b/auth/auth-cass/cass_init/push.sh index f3e5d80d..f887f077 100644 --- a/auth/auth-cass/cass_init/push.sh +++ b/auth/auth-cass/cass_init/push.sh @@ -35,11 +35,7 @@ fi cd dats for T in $(ls *.dat); do if [ -s $T ]; then - until cqlsh --request-timeout=60 -e "COPY authz.${T/.dat/} FROM '$T' WITH DELIMITER='|';"; - do - echo "Unexpected failure...sleep for 10 seconds and try again" - sleep 10 - done + cqlsh --request-timeout=100 -e "COPY authz.${T/.dat/} FROM '$T' WITH DELIMITER='|';"; fi done cd $DIR diff --git a/auth/auth-cass/docker/dbash.sh b/auth/auth-cass/docker/dbash.sh index 1e13d27d..737cf102 100644 --- a/auth/auth-cass/docker/dbash.sh +++ b/auth/auth-cass/docker/dbash.sh @@ -24,5 +24,5 @@ if [ -e ../../docker/d.props ]; then fi DOCKER=${DOCKER:-docker} -$DOCKER exec -it aaf_cass bash +$DOCKER exec -it aaf-cass bash -- 2.16.6