Batch work and client
[aaf/authz.git] / auth / auth-batch / src / main / java / org / onap / aaf / auth / batch / update / Remove.java
index bcc8591..dad03ce 100644 (file)
@@ -33,6 +33,7 @@ import org.onap.aaf.auth.batch.Batch;
 import org.onap.aaf.auth.batch.BatchPrincipal;
 import org.onap.aaf.auth.batch.helpers.Approval;
 import org.onap.aaf.auth.batch.helpers.CQLBatch;
+import org.onap.aaf.auth.batch.helpers.CQLBatchLoop;
 import org.onap.aaf.auth.batch.helpers.Cred;
 import org.onap.aaf.auth.batch.helpers.Future;
 import org.onap.aaf.auth.batch.helpers.UserRole;
@@ -53,168 +54,150 @@ import org.onap.aaf.misc.env.TimeTaken;
 import org.onap.aaf.misc.env.util.Chrono;
 
 public class Remove extends Batch {
-    private final AuthzTrans noAvg;
-    private HistoryDAO historyDAO;
+       private final AuthzTrans noAvg;
+       private HistoryDAO historyDAO;
        private CQLBatch cqlBatch;
 
-    public Remove(AuthzTrans trans) throws APIException, IOException, OrganizationException {
-        super(trans.env());
-        trans.info().log("Starting Connection Process");
-        
-        noAvg = env.newTransNoAvg();
-        noAvg.setUser(new BatchPrincipal("Remove"));
-
-        TimeTaken tt0 = trans.start("Cassandra Initialization", Env.SUB);
-        try {
-               historyDAO = new HistoryDAO(trans, cluster, CassAccess.KEYSPACE);
-            TimeTaken tt2 = trans.start("Connect to Cluster", Env.REMOTE);
-            try {
-                session = historyDAO.getSession(trans);
-            } finally {
-                tt2.done();
-            }
-            cqlBatch = new CQLBatch(noAvg.info(),session); 
-            
-
-        } finally {
-            tt0.done();
-        }
-    }
-
-    @Override
-    protected void run(AuthzTrans trans) {
-        final int maxBatch = 25;
-
-        // Create Intermediate Output 
-        File logDir = logDir();
-        
-        List<File> remove = new ArrayList<>();
-        if(args().length>0) {
-               for(int i=0;i<args().length;++i) {
-                       remove.add(new File(logDir, args()[i]));
-               }
-        } else {
-               remove.add(new File(logDir,"Delete"+Chrono.dateOnlyStamp()+".csv"));
-        }
-        
-        for(File f : remove) {
-               trans.init().log("Processing File:",f.getAbsolutePath());
-        }
-        
-        final Holder<Boolean> ur = new Holder<>(false);
-        final Holder<Boolean> cred = new Holder<>(false);
-        final Holder<Boolean> x509 = new Holder<>(false);
-        final Holder<String> memoFmt = new Holder<String>("");
-        final HistoryDAO.Data hdd = new HistoryDAO.Data();
-        final String orgName = trans.org().getName();
-        
-        hdd.action="delete";
-        hdd.reconstruct = ByteBuffer.allocate(0);
-        hdd.user = noAvg.user();
-        SimpleDateFormat sdf = new SimpleDateFormat("yyyyMM");
-        hdd.yr_mon = Integer.parseInt(sdf.format(new Date()));
-        
-        try { 
-               for(File f : remove) {
-                       trans.info().log("Processing ",f.getAbsolutePath(),"for Deletions");
-                       if(f.exists()) {
-                               CSV removeCSV = new CSV(env.access(),f);
-                                       
-                               try {
-                                       final StringBuilder sb = cqlBatch.begin();
-                                   final Holder<Integer> hi = new Holder<Integer>(0);
-                                               removeCSV.visit(new CSV.Visitor() {
-                                                       @Override
-                                                       public void visit(List<String> row) throws IOException, CadiException {
-                                                               int i = hi.get();
-                                                               if(i>=maxBatch) {
-                                                                       cqlBatch.execute(dryRun);
-                                                                       hi.set(0);
-                                                                       cqlBatch.begin();
-                                                                       i=0;
-                                                               }
-                                                               switch(row.get(0)) {
-                                                                       case "info":
-                                                                               switch(row.get(1)) {
-                                                                                       case "Delete":
-                                                                                               memoFmt.set("%s expired from %s on %s");
-                                                                                               break;
-                                                                                       case "NotInOrgDelete":
-                                                                                               memoFmt.set("Identity %s was removed from %s on %s");
-                                                                                               break;
-
-                                                                               }
-                                                                               break;
-                                                                       case "ur":
-                                                                               if(!ur.get()) {
-                                                                                       ur.set(true);
-                                                                               }
-                                                                               hi.set(++i);
-                                                                               UserRole.batchDelete(sb,row);
-                                                                               hdd.target=UserRoleDAO.TABLE; 
-                                                                               hdd.subject=UserRole.histSubject(row);
-                                                                               hdd.memo=UserRole.histMemo(memoFmt.get(), row);
-                                                                               historyDAO.createBatch(sb, hdd);
-                                                                               break;
-                                                                       case "cred":
-                                                                               if(!cred.get()) {
-                                                                                       cred.set(true);
-                                                                               }
-                                                                               hi.set(++i);
-                                                                               Cred.batchDelete(sb,row);
-                                                                               hdd.target=CredDAO.TABLE; 
-                                                                               hdd.subject=Cred.histSubject(row);
-                                                                               hdd.memo=Cred.histMemo(memoFmt.get(), orgName,row);
-                                                                               historyDAO.createBatch(sb, hdd);
-                                                                       break;
-                                                                       case "x509":
-                                                                               if(!x509.get()) {
-                                                                                       x509.set(true);
-                                                                               }
-                                                                               hi.set(++i);
-                                                                               X509.row(sb,row);
-                                                                               hdd.target=CertDAO.TABLE; 
-                                                                               hdd.subject=X509.histSubject(row);
-                                                                               hdd.memo=X509.histMemo(memoFmt.get(),row);
-                                                                               historyDAO.createBatch(sb, hdd);
-                                                                               break;
-                                                                       case "future":
-                                                                               // Not cached
-                                                                               hi.set(++i);
-                                                                               Future.deleteByIDBatch(sb,row.get(1));
-                                                                               break;
-                                                                       case "approval":
-                                                                               // Not cached
-                                                                               hi.set(++i);
-                                                                               Approval.deleteByIDBatch(sb,row.get(1));
-                                                                               break;
-                                                               }
+       public Remove(AuthzTrans trans) throws APIException, IOException, OrganizationException {
+               super(trans.env());
+               trans.info().log("Starting Connection Process");
+
+               noAvg = env.newTransNoAvg();
+               noAvg.setUser(new BatchPrincipal("Remove"));
+
+               TimeTaken tt0 = trans.start("Cassandra Initialization", Env.SUB);
+               try {
+                       historyDAO = new HistoryDAO(trans, cluster, CassAccess.KEYSPACE);
+                       TimeTaken tt2 = trans.start("Connect to Cluster", Env.REMOTE);
+                       try {
+                               session = historyDAO.getSession(trans);
+                       } finally {
+                               tt2.done();
+                       }
+                       cqlBatch = new CQLBatch(noAvg.info(),session); 
+
+
+               } finally {
+                       tt0.done();
+               }
+       }
+
+       @Override
+       protected void run(AuthzTrans trans) {
+
+               // Create Intermediate Output 
+               File logDir = logDir();
+
+               List<File> remove = new ArrayList<>();
+               if(args().length>0) {
+                       for(int i=0;i<args().length;++i) {
+                               remove.add(new File(logDir, args()[i]));
+                       }
+               } else {
+                       remove.add(new File(logDir,"Delete"+Chrono.dateOnlyStamp()+".csv"));
+               }
+
+               for(File f : remove) {
+                       trans.init().log("Processing File:",f.getAbsolutePath());
+               }
+
+               final Holder<Boolean> ur = new Holder<>(false);
+               final Holder<Boolean> cred = new Holder<>(false);
+               final Holder<Boolean> x509 = new Holder<>(false);
+               final Holder<String> memoFmt = new Holder<String>("");
+               final HistoryDAO.Data hdd = new HistoryDAO.Data();
+               final String orgName = trans.org().getName();
+
+               hdd.action="delete";
+               hdd.reconstruct = ByteBuffer.allocate(0);
+               hdd.user = noAvg.user();
+               SimpleDateFormat sdf = new SimpleDateFormat("yyyyMM");
+               hdd.yr_mon = Integer.parseInt(sdf.format(new Date()));
+
+               try { 
+                       final CQLBatchLoop cbl = new CQLBatchLoop(cqlBatch,50,dryRun);
+                       for(File f : remove) {
+                               trans.info().log("Processing ",f.getAbsolutePath(),"for Deletions");
+                               if(f.exists()) {
+                                       CSV removeCSV = new CSV(env.access(),f);
+                                       try {
+                                               removeCSV.visit( row -> {
+                                                       cbl.preLoop();
+                                                       switch(row.get(0)) {
+                                                               case "info":
+                                                                       switch(row.get(1)) {
+                                                                               case "Delete":
+                                                                                       memoFmt.set("%s expired from %s on %s");
+                                                                                       break;
+                                                                               case "NotInOrgDelete":
+                                                                                       memoFmt.set("Identity %s was removed from %s on %s");
+                                                                                       break;
+                                                                       }
+                                                                       break;
+                                                               case "ur":
+                                                                       if(!ur.get()) {
+                                                                               ur.set(true);
+                                                                       }
+                                                                       UserRole.batchDelete(cbl.inc(),row);
+                                                                       hdd.target=UserRoleDAO.TABLE; 
+                                                                       hdd.subject=UserRole.histSubject(row);
+                                                                       hdd.memo=UserRole.histMemo(memoFmt.get(), row);
+                                                                       historyDAO.createBatch(cbl.inc(), hdd);
+                                                                       break;
+                                                               case "cred":
+                                                                       if(!cred.get()) {
+                                                                               cred.set(true);
+                                                                       }
+                                                                       Cred.batchDelete(cbl.inc(),row);
+                                                                       hdd.target=CredDAO.TABLE; 
+                                                                       hdd.subject=Cred.histSubject(row);
+                                                                       hdd.memo=Cred.histMemo(memoFmt.get(), orgName,row);
+                                                                       historyDAO.createBatch(cbl.inc(), hdd);
+                                                                       break;
+                                                               case "x509":
+                                                                       if(!x509.get()) {
+                                                                               x509.set(true);
+                                                                       }
+                                                                       X509.row(cbl.inc(),row);
+                                                                       hdd.target=CertDAO.TABLE; 
+                                                                       hdd.subject=X509.histSubject(row);
+                                                                       hdd.memo=X509.histMemo(memoFmt.get(),row);
+                                                                       historyDAO.createBatch(cbl.inc(), hdd);
+                                                                       break;
+                                                               case "future":
+                                                                       // Not cached
+                                                                       Future.deleteByIDBatch(cbl.inc(),row.get(1));
+                                                                       break;
+                                                               case "approval":
+                                                                       // Not cached
+                                                                       Approval.deleteByIDBatch(cbl.inc(),row.get(1));
+                                                                       break;
                                                        }
                                                });
-                                               cqlBatch.execute(dryRun);
+                                               cbl.flush();
                                        } catch (IOException | CadiException e) {
                                                e.printStackTrace();
                                        }
-                       } else {
-                               trans.error().log("File",f.getAbsolutePath(),"does not exist.");
-                       }
-               }
-        } finally {
-               if(ur.get()) {
-                       cqlBatch.touch(UserRoleDAO.TABLE, 0, UserRoleDAO.CACHE_SEG, dryRun);
-               }
-               if(cred.get()) {
-                       cqlBatch.touch(CredDAO.TABLE, 0, CredDAO.CACHE_SEG, dryRun);
-               }
-               if(x509.get()) {
-                       cqlBatch.touch(CertDAO.TABLE, 0, CertDAO.CACHE_SEG, dryRun);
-               }
-        }
-    }
-    
-    @Override
-    protected void _close(AuthzTrans trans) {
-        session.close();
-    }
+                               } else {
+                                       trans.error().log("File",f.getAbsolutePath(),"does not exist.");
+                               }
+                       }
+               } finally {
+                       if(ur.get()) {
+                               cqlBatch.touch(UserRoleDAO.TABLE, 0, UserRoleDAO.CACHE_SEG, dryRun);
+                       }
+                       if(cred.get()) {
+                               cqlBatch.touch(CredDAO.TABLE, 0, CredDAO.CACHE_SEG, dryRun);
+                       }
+                       if(x509.get()) {
+                               cqlBatch.touch(CertDAO.TABLE, 0, CertDAO.CACHE_SEG, dryRun);
+                       }
+               }
+       }
+
+       @Override
+       protected void _close(AuthzTrans trans) {
+               session.close();
+       }
 
 }