/** * Copyright 2017 BOCO Corporation. CMCC Technologies Co., Ltd * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.onap.vfc.nfvo.emsdriver.collector; import java.io.BufferedOutputStream; import java.io.BufferedReader; import java.io.File; import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStreamReader; import java.io.OutputStream; import java.io.OutputStreamWriter; import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Date; import java.util.HashMap; import java.util.Hashtable; import java.util.LinkedHashMap; import java.util.List; import java.util.Properties; import java.util.regex.Matcher; import java.util.regex.Pattern; import javax.xml.stream.XMLInputFactory; import javax.xml.stream.XMLStreamConstants; import javax.xml.stream.XMLStreamReader; import org.apache.commons.io.FileUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.onap.vfc.nfvo.emsdriver.commons.constant.Constant; import org.onap.vfc.nfvo.emsdriver.commons.ftp.AFtpRemoteFile; import org.onap.vfc.nfvo.emsdriver.commons.ftp.FTPInterface; import org.onap.vfc.nfvo.emsdriver.commons.ftp.FTPSrv; import org.onap.vfc.nfvo.emsdriver.commons.model.CollectMsg; import org.onap.vfc.nfvo.emsdriver.commons.model.CollectVo; import org.onap.vfc.nfvo.emsdriver.commons.utils.DateUtil; import org.onap.vfc.nfvo.emsdriver.commons.utils.Gunzip; import org.onap.vfc.nfvo.emsdriver.commons.utils.StringUtil; import org.onap.vfc.nfvo.emsdriver.commons.utils.UnZip; import org.onap.vfc.nfvo.emsdriver.commons.utils.VarExprParser; import org.onap.vfc.nfvo.emsdriver.commons.utils.Zip; import org.onap.vfc.nfvo.emsdriver.configmgr.ConfigurationImp; import org.onap.vfc.nfvo.emsdriver.configmgr.ConfigurationInterface; import org.onap.vfc.nfvo.emsdriver.messagemgr.MessageChannel; import org.onap.vfc.nfvo.emsdriver.messagemgr.MessageChannelFactory; public class TaskThread implements Runnable{ public Log log = LogFactory.getLog(TaskThread.class); private MessageChannel cmResultChannel; public MessageChannel pmResultChannel; private CollectMsg data; private ConfigurationInterface configurationInterface = new ConfigurationImp(); private String localPath = Constant.SYS_DATA_TEMP; private String resultPath = Constant.SYS_DATA_RESULT; private SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMMddHHmmss"); private SimpleDateFormat dateFormat2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); // private String csvpathAndFileName; // private String xmlPathAndFileName; // private int countNum = 0 ; public TaskThread(CollectMsg data) { this.data = data; } public TaskThread() { super(); } @Override public void run(){ cmResultChannel = MessageChannelFactory.getMessageChannel(Constant.COLLECT_RESULT_CHANNEL_KEY); pmResultChannel = MessageChannelFactory.getMessageChannel(Constant.COLLECT_CHANNEL_KEY); try { collectMsgHandle(data); } catch (Exception e) { log.error("",e); } } private void collectMsgHandle(CollectMsg collectMsg) { String emsName = collectMsg.getEmsName(); String type = collectMsg.getType(); CollectVo collectVo = configurationInterface.getCollectVoByEmsNameAndType(emsName, type); //ftp download List downloadfiles = this.ftpDownload(collectVo); //paser ftp update message send for(String fileName :downloadfiles){ this.parseFtpAndSendMessage(fileName,collectVo); } } public void parseFtpAndSendMessage(String fileName, CollectVo collectVo) { // List filelist = decompressed(fileName); for (File tempfile : filelist) { String unfileName = tempfile.getName(); Pattern pa = Pattern.compile(".*-(.*)-\\w{2}-"); Matcher ma = pa.matcher(unfileName); if (!ma.find()) continue; String nename = ma.group(1); boolean parseResult = false; if("ems-resource".equalsIgnoreCase(collectVo.getType())){ parseResult = processCMXml(tempfile, nename,"CM"); }else{ parseResult = processPMCsv(tempfile); } if (parseResult){ log.info("parser "+tempfile+" sucess"); }else { log.info("parser "+tempfile+" fail"); } } } public boolean processPMCsv(File tempfile) { FileInputStream brs = null; InputStreamReader isr = null; BufferedReader br = null; List columnNames = new ArrayList(); List commonValues = new ArrayList(); try { brs = new FileInputStream(tempfile); isr = new InputStreamReader(brs, Constant.ENCODING_UTF8); br = new BufferedReader(isr); //common field String commonField = br.readLine(); String[] fields = commonField.split("\\|",-1); for(String com : fields){ String[] comNameAndValue = com.split("=",2); columnNames.add(comNameAndValue[0].trim()); commonValues.add(comNameAndValue[1]); } //column names String columnName = br.readLine(); String[] names = columnName.split("\\|",-1); for(String name : names){ columnNames.add(name); } // xmlPathAndFileName = this.setColumnNames(nename, columnNames,type); String valueLine = ""; List valuelist = new ArrayList(); while ((valueLine = br.readLine()) != null) { if (valueLine.trim().equals("")) { continue; } // countNum ++; String [] values = valueLine.split("\\|",-1); valuelist.addAll(commonValues); for(String value : values){ valuelist.add(value); } // this.appendLine(valuelist, bos); //resultMap HashMap resultMap = this.resultMap(columnNames,valuelist); try { pmResultChannel.put(resultMap); } catch (InterruptedException e) { log.error("collectResultChannel.put(resultMap) error ",e); } valuelist.clear(); } } catch (IOException e) { log.error("processPMCsv is fail ",e); return false; }finally{ try{ if (br != null) br.close(); if (isr != null) isr.close(); if (brs != null) brs.close(); } catch (Exception e){ log.error(e); } } return true; } private HashMap resultMap(List columnNames, List valuelist) { HashMap resultMap = new HashMap(); if(columnNames.size() == valuelist.size()){ for(int i =0;i names = new ArrayList();// colname LinkedHashMap nameAndValue = new LinkedHashMap(); FileInputStream fis = null; InputStreamReader isr = null; XMLStreamReader reader = null; try{ fis = new FileInputStream(tempfile); isr = new InputStreamReader(fis, Constant.ENCODING_UTF8); XMLInputFactory fac = XMLInputFactory.newInstance(); reader = fac.createXMLStreamReader(isr); int event = -1; boolean setcolum = true; while (reader.hasNext()){ try{ event = reader.next(); switch (event){ case XMLStreamConstants.START_ELEMENT: localName = reader.getLocalName(); if ("FieldName".equalsIgnoreCase(localName)){ FieldNameFlag = true; } if (FieldNameFlag){ if ("N".equalsIgnoreCase(localName)){ String colName = reader.getElementText().trim(); names.add(colName); } } if ("FieldValue".equalsIgnoreCase(localName)){ FieldValueFlag = true; } if (FieldValueFlag){ if(setcolum){ xmlPathAndFileName = this.setColumnNames(nename, names,type); setcolum = false; } if ("Object".equalsIgnoreCase(localName)){ int ac = reader.getAttributeCount(); for (int i = 0; i < ac; i++){ if ("rmUID".equalsIgnoreCase(reader.getAttributeLocalName(i))){ rmUID = reader.getAttributeValue(i).trim(); } } nameAndValue.put("rmUID", rmUID); } if ("V".equalsIgnoreCase(localName)) { index = Integer.parseInt(reader .getAttributeValue(0)) - 1; String currentName = names.get(index); String v = reader.getElementText().trim(); nameAndValue.put(currentName, v); } } break; case XMLStreamConstants.CHARACTERS: break; case XMLStreamConstants.END_ELEMENT: endLocalName = reader.getLocalName(); if ("FieldName".equalsIgnoreCase(endLocalName)){ FieldNameFlag = false; } if ("FieldValue".equalsIgnoreCase(endLocalName)){ FieldValueFlag = false; } if ("Object".equalsIgnoreCase(endLocalName)){ countNum ++; this.appendLine(nameAndValue,bos); nameAndValue.clear(); } break; } } catch (Exception e) { log.error(""+StringUtil.getStackTrace(e)); event = reader.next(); } } if(bos != null){ bos.close(); bos = null; } if(fos != null){ fos.close(); fos = null; } String[] fileKeys = this.createZipFile(csvpathAndFileName,xmlPathAndFileName,nename); //ftp store Properties ftpPro = configurationInterface.getProperties(); String ip = ftpPro.getProperty("ftp_ip"); String port = ftpPro.getProperty("ftp_port"); String ftp_user = ftpPro.getProperty("ftp_user"); String ftp_password = ftpPro.getProperty("ftp_password"); String ftp_passive = ftpPro.getProperty("ftp_passive"); String ftp_type = ftpPro.getProperty("ftp_type"); String remoteFile = ftpPro.getProperty("ftp_remote_path"); this.ftpStore(fileKeys,ip,port,ftp_user,ftp_password,ftp_passive,ftp_type,remoteFile); //create Message String message = this.createMessage(fileKeys[1], ftp_user, ftp_password, ip, port, countNum,nename); //set message this.setMessage(message); } catch (Exception e){ log.error(""+StringUtil.getStackTrace(e)); return false; } finally{ try{ if (reader != null){ reader.close(); } if (isr != null){ isr.close(); } if (fis != null){ fis.close(); } if(bos != null){ bos.close(); } if(fos != null){ fos.close(); } } catch (Exception e){ log.error(e); } } return true; } private void setMessage(String message) { try { cmResultChannel.put(message); } catch (Exception e) { log.error("collectResultChannel.put(message) is error "+StringUtil.getStackTrace(e)); } } public String createMessage(String zipName,String user,String pwd,String ip, String port,int countNum, String nename) { StringBuffer strBuffer = new StringBuffer(); strBuffer .append("" + "" + "
"); strBuffer.append(""); strBuffer.append(""); strBuffer.append(""); strBuffer.append(nename); strBuffer.append(""); strBuffer.append(""); strBuffer.append(""); strBuffer.append(""); strBuffer.append(""); strBuffer.append("csv"); strBuffer.append(""); strBuffer.append("gbk"); strBuffer.append(""); strBuffer.append("|"); strBuffer.append(""); strBuffer.append("true"); strBuffer.append(""); strBuffer.append(dateFormat2.format(new Date())); strBuffer.append(""); strBuffer.append(""); strBuffer.append(""); strBuffer.append(zipName); strBuffer.append(""); strBuffer.append("ftp://" + user + ":" + pwd + "@" + ip + ":" + port); strBuffer.append(""); strBuffer.append(""); strBuffer.append(countNum); strBuffer.append(""); strBuffer.append("").append("").append(""); strBuffer.append("").append("").append(""); strBuffer.append(""); return strBuffer.toString(); } private void ftpStore(String[] fileKeys, String ip, String port, String ftp_user, String ftp_password, String ftp_passive, String ftp_type, String remoteFile) { String zipFilePath = fileKeys[0]; FTPInterface ftpClient; ftpClient = new FTPSrv(); //login try { ftpClient.login(ip, Integer.parseInt(port), ftp_user, ftp_password, "GBK", Boolean.parseBoolean(ftp_passive), 5*60*1000); } catch (Exception e) { log.error("login fail,ip=["+ip+"] port=["+port+"] user=["+ftp_user+"]pwd=["+ftp_password+"]"+StringUtil.getStackTrace(e)); return; } // ftpClient.store(zipFilePath, remoteFile); log.debug("store ["+zipFilePath+"]to["+remoteFile+"]"); FileUtils.deleteQuietly(new File(zipFilePath)); } private String[] createZipFile(String csvpathAndFileName,String xmlPathAndFileName,String nename) { String zipPath = resultPath+nename +dateFormat.format(new Date())+"_"+System.nanoTime(); File destDir = new File(zipPath); destDir.mkdirs(); try { FileUtils.copyFileToDirectory(new File(csvpathAndFileName), destDir); FileUtils.copyFileToDirectory(new File(xmlPathAndFileName), destDir); } catch (IOException e) { } String destFilePath = zipPath + ".zip"; try { Zip zip = new Zip(destDir.getAbsolutePath(), destFilePath); zip.setCompressLevel(9); zip.compress(); FileUtils.deleteDirectory(destDir); } catch (IOException e) { log.error("zip.compress() is fail "+StringUtil.getStackTrace(e)); } return new String[] { destFilePath, zipPath + ".zip"}; } private String setColumnNames(String nename, List names,String type) { //write xml String xmlpath = localPath+nename +"/"+type+"/"; File xmlpathfile = new File(xmlpath); if(!xmlpathfile.exists()){ xmlpathfile.mkdirs(); } String xmlFileName = nename +dateFormat.format(new Date())+ System.nanoTime(); String fieldLine = ""; for (int i = 0; i < names.size(); i++) { String field = "\t\r\n" + "\t\t" + i + "\r\n" + "\t\t" + names.get(i) + "\r\n" + "\t\t" + names.get(i) + "\r\n" + "\t\t" + names.get(i) + "\r\n" + "\t\r\n"; fieldLine = fieldLine + field; } String str = "\r\n" + "\r\n" + "\r\n" + fieldLine + "\r\n" + "\r\n"; String xmlPathAndFileName = xmlpath+xmlFileName+".xml"; try { this.writeDetail(xmlPathAndFileName,str); } catch (Exception e) { log.error("writeDetail is fail ,xmlFileName="+xmlFileName +StringUtil.getStackTrace(e)); } return xmlPathAndFileName; } private void writeDetail(String detailFileName,String str) throws Exception { OutputStreamWriter writer = null; OutputStream readOut = null; try { readOut = new FileOutputStream(new File(detailFileName), false); writer = new OutputStreamWriter(readOut); writer.write(str); writer.flush(); } finally { if(null != writer){ writer.close(); } if(readOut != null){ readOut.close(); } } } private void appendLine(LinkedHashMap nameAndValue,BufferedOutputStream bos) { StringBuilder lineDatas = new StringBuilder(); for (String key : nameAndValue.keySet()) { lineDatas.append(nameAndValue.get(key)).append("|"); } try { bos.write(lineDatas.toString().getBytes()); bos.write("\n".getBytes()); } catch (IOException e) { log.error("appendLine error "+StringUtil.getStackTrace(e)); } } // private void appendLine(List values,BufferedOutputStream bos) { // StringBuilder lineDatas = new StringBuilder(); // // for (String value : values) { // lineDatas.append(value).append("|"); // } // try { // bos.write(lineDatas.toString().getBytes()); // bos.write("\n".getBytes()); // } catch (IOException e) { // log.error("appendLine error "+StringUtil.getStackTrace(e)); // } // } public List decompressed(String fileName){ List filelist = new ArrayList(); if (fileName.indexOf(".gz") > 1) { try { File decompressFile = deGz(fileName); filelist.add(decompressFile); } catch (IOException e) { log.error("decompressed is fail "+StringUtil.getStackTrace(e)); } } else if (fileName.indexOf(".zip") > 1) { try { File[] files = deZip(new File(fileName)); for(File temp :files){ filelist.add(temp); } } catch (Exception e) { log.error("decompressed is fail "+StringUtil.getStackTrace(e)); } } else { filelist.add(new File(fileName)); } return filelist; } private File deGz(String gzFileName) throws IOException { Gunzip gunzip = new Gunzip(); String orgFile = gzFileName.replace(".gz", ""); gunzip.unCompress(gzFileName, orgFile); return new File(orgFile); } public File[] deZip(File file) throws Exception{ String regx = "(.*).zip"; Pattern p = Pattern.compile(regx); Matcher m = p.matcher(file.getName()); if (m.find()) { String orgFile = localPath + m.group(1) + "/"; UnZip unzip = new UnZip(file.getAbsolutePath(), orgFile); unzip.deCompress(); file = new File(orgFile); } File[] files = file.listFiles(); return files; } private List ftpDownload(CollectVo collectVo) { List fileList = new ArrayList(); //IP String ip = collectVo.getIP(); //port String port = collectVo.getPort(); //user String user = collectVo.getUser(); //password String password = collectVo.getPassword(); //isPassiveMode String passivemode = collectVo.getPassive(); FTPInterface ftpClient = new FTPSrv(); //login try { ftpClient.login(ip, Integer.parseInt(port), user, password, "GBK", Boolean.parseBoolean(passivemode), 5*60*1000); } catch (Exception e) { log.error("login fail,ip=["+ip+"] port=["+port+"] user=["+user+"]password=["+password+"]"+StringUtil.getStackTrace(e)); return fileList; } //download String dir = collectVo.getRemotepath(); List searchExprList = new ArrayList(); String []FPath = dir.split(";"); for(int i=0;i varMap = new Hashtable(); long[] d = DateUtil.getScanScope(new Date(), 900); searchExprList.add(VarExprParser.replaceVar(conpath,d[0],d[1])); varMap.clear(); varMap = null; log.info("["+conpath+"],result["+(searchExprList.size()-oldSize)+"] path"); conpath = null; } searchExprList =getLocalPathNoRegular(searchExprList); List remoteFiles = new ArrayList(); for(String expr :searchExprList){ String keys[] = parseExprKeys(expr); String ftpRegular = keys[1]; String ftpDir = keys[0]; boolean cdsucess = ftpClient.chdir(expr); if(cdsucess){ AFtpRemoteFile[] arf = (AFtpRemoteFile[]) ftpClient.list(); log.info(" list ["+ftpDir+"],result["+(arf==null?"null":arf.length)+"] files"); //filter rfileFilter(remoteFiles,arf,ftpRegular); keys = null; ftpRegular=ftpDir = null; for(AFtpRemoteFile ftpRemoteFile: remoteFiles){ if(!new File(localPath).exists()){ try { new File(localPath).mkdir(); } catch (Exception e) { log.error("create localPath is fail localPath="+localPath+" "+StringUtil.getStackTrace(e)); } } if(!new File(localPath).exists()){ new File(localPath).mkdirs(); } String localFileName = localPath + ftpRemoteFile.getFileName(); File loaclFile = new File(localFileName); if (loaclFile.exists()) { loaclFile.delete(); } boolean flag = ftpClient.downloadFile(ftpRemoteFile.getAbsFileName(), localFileName); if(flag){ fileList.add(localFileName); }else{ log.error("download file fail fileName="+ftpRemoteFile.getAbsFileName()); } } }else{ log.error("chdir is faill dir =["+dir+"]"); } } return fileList; } private void rfileFilter(List fileContainer, AFtpRemoteFile[] arfs, String ftpRegular) { if (ftpRegular!=null && ftpRegular.length()>0) { Pattern pattern = null; try { pattern = Pattern.compile(ftpRegular, Pattern.CASE_INSENSITIVE); } catch (Exception e) { log.info("["+ftpRegular+"]Pattern.compile exception:"+e.getMessage()); } int hisSize = fileContainer.size(); for (int j=0; arfs!=null&&j -1){ source = source.substring(0, source.indexOf(";")); } if (source.endsWith("/")) return new String[]{source,""}; int idx = source.lastIndexOf("/"); String[] dirkeys = new String[2]; dirkeys[0] = source.substring(0, idx+1); dirkeys[1] = source.substring(idx+1, source.length()); return dirkeys; } public List getLocalPathNoRegular(List searchExprList){ boolean isregular = false; List regularList = new ArrayList(); for(String regular : searchExprList){ Pattern lpattern = null; try{ lpattern = Pattern.compile("(.*/)<([^/]+)>(/.*)"); }catch (Exception e) { log.info("["+regular+"]compile fails:"+e.getMessage()); } Matcher matcher = lpattern.matcher(regular); if(matcher.find()){ isregular = true; String parpath = matcher.group(1); File[] arryFile = new File(parpath).listFiles(); for(File file :arryFile){ if(file.isDirectory()&&file.getName().matches(matcher.group(2))){ regularList.add(matcher.group(1)+file.getName()+matcher.group(3)); } } }else{ regularList.add(regular); } } if(isregular==true){ getLocalPathNoRegular(regularList); } return regularList; } }