X-Git-Url: https://gerrit.onap.org/r/gitweb?a=blobdiff_plain;f=auth%2Fauth-batch%2Fsrc%2Fmain%2Fjava%2Forg%2Fonap%2Faaf%2Fauth%2Fbatch%2Fupdate%2FUpload.java;h=83de05afc545b0cd24fbc628d2363886bc4db4af;hb=HEAD;hp=76472b5f70b7caffaf6275230157427b15e0e466;hpb=dc4923a67f424149e35a77108f5b2ac3418fbaf3;p=aaf%2Fauthz.git diff --git a/auth/auth-batch/src/main/java/org/onap/aaf/auth/batch/update/Upload.java b/auth/auth-batch/src/main/java/org/onap/aaf/auth/batch/update/Upload.java index 76472b5f..83de05af 100644 --- a/auth/auth-batch/src/main/java/org/onap/aaf/auth/batch/update/Upload.java +++ b/auth/auth-batch/src/main/java/org/onap/aaf/auth/batch/update/Upload.java @@ -3,13 +3,15 @@ * org.onap.aaf * =========================================================================== * Copyright (c) 2018 AT&T Intellectual Property. All rights reserved. + * + * Modifications Copyright (C) 2018 IBM. * =========================================================================== * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -20,294 +22,206 @@ package org.onap.aaf.auth.batch.update; -import java.io.BufferedReader; import java.io.File; -import java.io.FileReader; import java.io.IOException; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import org.onap.aaf.auth.batch.Batch; +import org.onap.aaf.auth.batch.helpers.CQLBatch; +import org.onap.aaf.auth.batch.helpers.CQLBatchLoop; import org.onap.aaf.auth.env.AuthzTrans; import org.onap.aaf.auth.org.OrganizationException; +import org.onap.aaf.cadi.util.CSV; import org.onap.aaf.misc.env.APIException; import org.onap.aaf.misc.env.Env; +import org.onap.aaf.misc.env.LogTarget; import org.onap.aaf.misc.env.TimeTaken; - -import com.datastax.driver.core.ResultSet; +import org.onap.aaf.misc.env.util.Split; public class Upload extends Batch { - public Upload(AuthzTrans trans) throws APIException, IOException, OrganizationException { - super(trans.env()); - trans.info().log("Starting Connection Process"); - - TimeTaken tt0 = trans.start("Cassandra Initialization", Env.SUB); - try { - TimeTaken tt = trans.start("Connect to Cluster", Env.REMOTE); - try { - session = cluster.connect(); - } finally { - tt.done(); - } - - } finally { - tt0.done(); - } - } - - private static final int BATCH_LENGTH = 100; - - int count,batchCnt; - - @Override - protected void run(AuthzTrans trans) { - String line; - StringBuilder sb = new StringBuilder(); - StringBuilder query = new StringBuilder(); - List array = new ArrayList(); - for(String feed : args()) { - File file = new File(feed + ".dat"); - TimeTaken tt = trans.start(file.getAbsolutePath(), Env.SUB); - System.out.println("#### Running " + feed + ".dat Feed ####"); - try { - - if(file.exists()) { - count=batchCnt=0; - boolean justOne = false; - try { - BufferedReader br = new BufferedReader(new FileReader(file)); - try { - while((line=br.readLine())!=null) { - if(line.length()>5000) { - if(query.length()>0) { - applyBatch(query); - justOne=true; - } - } - if(query.length()==0) { - query.append("BEGIN BATCH\n"); - } - // Split into fields, first turning Escaped values into something we can convert back from - char c=0; - boolean inQuote = false; - int fldcnt = 0; - - for(int i=0;i0) { - applyBatch(query); - } - - } finally { - br.close(); - sb.setLength(0); - query.setLength(0); - } - - } catch (IOException e) { - trans.error().log(e); - e.printStackTrace(); - } - - } else { - trans.error().log("No file found: ", file.getAbsolutePath()); - } - } finally { - tt.done(); - System.err.flush(); - System.out.printf("\n%d applied in %d batches\n",count,batchCnt); - } - - } - - } - // APPROVALS - private static final String APPR_INS_FMT=" INSERT INTO authz.approval " - + "(id,approver,last_notified,memo,operation,status,ticket,type,user) " - + "VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s);\n"; - private static final Boolean[] APPR_QUOTES = new Boolean[]{false,true,true,true,true,true,false,true,true}; - - // ARTIFACTS - private static final String ARTI_INS_FMT=" INSERT INTO authz.artifact " - + "(mechid,machine,ca,dir,expires,notify,ns,os_user,renewdays,sans,sponsor,type) " - + "VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s);\n"; - private static final Boolean[] ARTI_QUOTES = new Boolean[] - {true,true,true,true,true,true,true,true,false,false,true,false}; - - // CREDS - private static final String CRED_INS_FMT=" INSERT INTO authz.cred " - + "(id,type,expires,cred,notes,ns,other,prev) " - + "VALUES (%s,%s,%s,%s,%s,%s,%s,%s);\n"; - private static final Boolean[] CRED_QUOTES = new Boolean[] - {true,false,true,false,true,true,false,false}; - - // NS - private static final String NS_INS_FMT=" INSERT INTO authz.ns " - + "(name,description,parent,scope,type) " - + "VALUES (%s,%s,%s,%s,%s);\n"; - private static final Boolean[] NS_QUOTES = new Boolean[] - {true,true,true,false,false}; - - // x509 - private static final String X509_INS_FMT=" INSERT INTO authz.x509 " - + "(ca,serial,id,x500,x509) " - + "VALUES (%s,%s,%s,%s,%s);\n"; - private static final Boolean[] X509_QUOTES = new Boolean[] - {true,false,true,true,true}; - - // ROLE - private static final String ROLE_INS_FMT=" INSERT INTO authz.role " - + "(ns,name,description,perms) " - + "VALUES (%s,%s,%s,%s);\n"; - private static final Boolean[] ROLE_QUOTES = new Boolean[] - {true,true,true,false}; - // ROLE - private static final String PERM_INS_FMT=" INSERT INTO authz.perm " - + "(ns,type,instance,action,description,roles) " - + "VALUES (%s,%s,%s,%s,%s,%s);\n"; - private static final Boolean[] PERM_QUOTES = new Boolean[] - {true,true,true,true,true,false}; - - - private String build(String feed, List array) { - String rv; - switch(feed) { - case "approval": - rv = String.format(APPR_INS_FMT,array.toArray()); - break; - case "artifact": - rv = String.format(ARTI_INS_FMT,array.toArray()); - break; - case "cred": - rv = String.format(CRED_INS_FMT,array.toArray()); - break; - case "ns": - rv = String.format(NS_INS_FMT,array.toArray()); - break; - case "role": - rv = String.format(ROLE_INS_FMT,array.toArray()); - break; - case "perm": - rv = String.format(PERM_INS_FMT,array.toArray()); - break; - case "x509": - rv = String.format(X509_INS_FMT,array.toArray()); - break; - default: - rv = ""; - } - array.clear(); - return rv; - } - - private void addField(String feed, int fldcnt, List array, StringBuilder sb) { - Boolean[] ba; - switch(feed) { - case "approval": - ba = APPR_QUOTES; - break; - case "artifact": - ba = ARTI_QUOTES; - break; - case "cred": - ba = CRED_QUOTES; - break; - case "ns": - ba = NS_QUOTES; - break; - case "role": - ba = ROLE_QUOTES; - break; - case "perm": - ba = PERM_QUOTES; - break; - case "x509": - ba = X509_QUOTES; - break; - default: - ba = null; - } - if(ba!=null) { - if(sb.toString().length()==0) { - array.add("null"); - } else { - if(ba[fldcnt]) { - String s = null; - if(sb.indexOf("'")>=0) { - s = sb.toString().replace("'","''"); - } - if(sb.indexOf("\\n")>=0) { - if(s==null) { - s = sb.toString().replace("\\n","\n"); - } else { - s = s.replace("\\n","\n"); - } - } - if(sb.indexOf("\\t")>=0) { - if(s==null) { - s = sb.toString().replace("\\t","\t"); - } else { - s = s.replace("\\t","\t"); - } - } - if(s==null) { - array.add("'" + sb + '\''); - } else { - array.add("'" + s + '\''); - } - } else { - array.add(sb.toString()); - } - } - sb.setLength(0); - } - } - - private void applyBatch(StringBuilder query) { - try { - query.append("APPLY BATCH;"); - ResultSet rv = session.execute(query.toString()); - if(rv.wasApplied()) { - System.out.print('.'); - if((++batchCnt % 60)==0) { - System.out.println(); - } - } else { - System.out.print("Data NOT APPLIED"); - } - } finally { - query.setLength(0); - } - } - - - @Override - protected void _close(AuthzTrans trans) { + private static final String DAT = ".dat"; + + private CQLBatch cqlBatch; + + private Map feeds; + + + public Upload(AuthzTrans trans) throws APIException, IOException, OrganizationException { + super(trans.env()); + trans.info().log("Starting Connection Process"); + + TimeTaken tt0 = trans.start("Cassandra Initialization", Env.SUB); + try { + TimeTaken tt = trans.start("Connect to Cluster", Env.REMOTE); + try { + session = cluster.connect(); + } finally { + tt.done(); + } + + cqlBatch = new CQLBatch(LogTarget.NULL,session); + + feeds=new HashMap<>(); + new Feed(feeds,"ns",1,"name,description,parent,scope=int,type=int",300); + new Feed(feeds,"notified",3,"user,target,key,last",300); + new Feed(feeds,"approval",1,"id=UUID,approver,last_notified,memo,operation,status,ticket=UUID,type,user",200); + new Feed(feeds,"artifact",2,"mechid,machine,ca,dir,expires,notify,ns,os_user,renewdays=int,sans=set,sponsor,type=set",200); + new Feed(feeds,"cred",1,"id,type=int,expires,cred=blob,notes,ns,other=int,prev=blob,tag",200); + new Feed(feeds,"x509",2,"ca,serial=blob,id,x500,x509=C/R",200); + new Feed(feeds,"role",2,"ns,name,description,perms=set",200); + new Feed(feeds,"perm",4,"ns,type,instance,action,description,roles=set",200); + new Feed(feeds,"history",1,"id=UUID,action,memo,reconstruct=blob,subject,target,user,yr_mon=int",300); + + } finally { + tt0.done(); + } + } + + + @Override + protected void run(AuthzTrans trans) { + List files = new ArrayList<>(); + for(String s : args()) { + trans.init().log(s); + } + + if(args().length>0) { + File dir = new File(args()[0]); + if(dir.isDirectory()) { + for(File f : dir.listFiles(pathname -> { + return pathname.getName().endsWith(DAT); + })) { + files.add(f); + } + } else { + File f; + for(String arg : args()) { + if(arg.endsWith(DAT)) { + f=new File(arg); + } else { + f=new File(arg+DAT); + } + files.add(f); + } + } + } + + for(File file : files) { + String f = file.getName(); + final Feed feed = feeds.get(f.substring(0,f.length()-4)); + if(feed!=null) { + TimeTaken tt = trans.start(file.getAbsolutePath(), Env.SUB); + String msg = String.format("#### Running %s.dat Feed ####",feed.getName()); + trans.info().log(msg); + System.out.println(msg); + CQLBatchLoop cbl = new CQLBatchLoop(cqlBatch,feed.batchSize,dryRun).showProgress(); + + try { + if(file.exists()) { + CSV csv = new CSV(trans.env().access(),file).setDelimiter('|'); + csv.visit( row -> { + feed.insert(cbl.inc(),row); + }); + } + cbl.flush(); + } catch (Throwable e) { + e.printStackTrace(); + } finally { + tt.done(); + System.err.flush(); + msg = String.format("\n%d applied in %d batches\n",cbl.total(), cbl.batches()); + trans.info().log(msg); + System.out.println(msg); + } + } + } + } + + @Override + protected void _close(AuthzTrans trans) { session.close(); - } - + } + + private class Feed { + private final String name; + private final String[] flds; + private final String[] types; + private final int key; + private final int batchSize; + public Feed(Map feeds, String feed, int keyLength, String fields,int batchSize) { + name=feed; + key = keyLength; + flds = Split.splitTrim(',', fields); + types = new String[flds.length]; + this.batchSize = batchSize; + int equals; + for(int i=0;i0) { + types[i]=flds[i].substring(equals+1); + flds[i]=flds[i].substring(0, equals); + } + } + feeds.put(feed,this); + } + + public String getName() { + return name; + } + + public void insert(StringBuilder sb,List row) { + sb.append("INSERT INTO authz."); + sb.append(name); + sb.append(" ("); + boolean first = true; + StringBuilder values = new StringBuilder(") VALUES ("); + String value; + String type; + for(int idx=0;idx