2 * Copyright 2017 BOCO Corporation. CMCC Technologies Co., Ltd
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
16 package org.onap.vfc.nfvo.emsdriver.collector;
18 import java.io.BufferedOutputStream;
19 import java.io.BufferedReader;
21 import java.io.FileInputStream;
22 import java.io.FileNotFoundException;
23 import java.io.FileOutputStream;
24 import java.io.IOException;
25 import java.io.InputStreamReader;
26 import java.io.OutputStream;
27 import java.io.OutputStreamWriter;
28 import java.text.SimpleDateFormat;
29 import java.util.ArrayList;
30 import java.util.Date;
31 import java.util.HashMap;
32 import java.util.Hashtable;
33 import java.util.LinkedHashMap;
34 import java.util.List;
35 import java.util.Properties;
36 import java.util.regex.Matcher;
37 import java.util.regex.Pattern;
39 import javax.xml.stream.XMLInputFactory;
40 import javax.xml.stream.XMLStreamConstants;
41 import javax.xml.stream.XMLStreamReader;
43 import org.apache.commons.io.FileUtils;
44 import org.apache.commons.logging.Log;
45 import org.apache.commons.logging.LogFactory;
46 import org.onap.vfc.nfvo.emsdriver.commons.constant.Constant;
47 import org.onap.vfc.nfvo.emsdriver.commons.ftp.AFtpRemoteFile;
48 import org.onap.vfc.nfvo.emsdriver.commons.ftp.FTPInterface;
49 import org.onap.vfc.nfvo.emsdriver.commons.ftp.FTPSrv;
50 import org.onap.vfc.nfvo.emsdriver.commons.ftp.RemoteFile;
51 import org.onap.vfc.nfvo.emsdriver.commons.model.CollectMsg;
52 import org.onap.vfc.nfvo.emsdriver.commons.model.CollectVo;
53 import org.onap.vfc.nfvo.emsdriver.commons.utils.DateUtil;
54 import org.onap.vfc.nfvo.emsdriver.commons.utils.Gunzip;
55 import org.onap.vfc.nfvo.emsdriver.commons.utils.StringUtil;
56 import org.onap.vfc.nfvo.emsdriver.commons.utils.UnZip;
57 import org.onap.vfc.nfvo.emsdriver.commons.utils.VarExprParser;
58 import org.onap.vfc.nfvo.emsdriver.commons.utils.Zip;
59 import org.onap.vfc.nfvo.emsdriver.configmgr.ConfigurationImp;
60 import org.onap.vfc.nfvo.emsdriver.configmgr.ConfigurationInterface;
61 import org.onap.vfc.nfvo.emsdriver.messagemgr.MessageChannel;
62 import org.onap.vfc.nfvo.emsdriver.messagemgr.MessageChannelFactory;
65 public class TaskThread implements Runnable{
67 public Log log = LogFactory.getLog(TaskThread.class);
69 private MessageChannel cmResultChannel;
70 public MessageChannel pmResultChannel;
72 private CollectMsg data;
74 private ConfigurationInterface configurationInterface = new ConfigurationImp();
76 private String localPath = Constant.SYS_DATA_TEMP;
77 private String resultPath = Constant.SYS_DATA_RESULT;
79 private SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMMddHHmmss");
81 private SimpleDateFormat dateFormat2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
83 // private String csvpathAndFileName;
84 // private String xmlPathAndFileName;
85 // private int countNum = 0 ;
87 public TaskThread(CollectMsg data) {
97 cmResultChannel = MessageChannelFactory.getMessageChannel(Constant.COLLECT_RESULT_CHANNEL_KEY);
98 pmResultChannel = MessageChannelFactory.getMessageChannel(Constant.COLLECT_RESULT_PM_CHANNEL_KEY);
100 collectMsgHandle(data);
101 } catch (Exception e) {
106 private void collectMsgHandle(CollectMsg collectMsg) {
107 String emsName = collectMsg.getEmsName();
108 String type = collectMsg.getType();
109 CollectVo collectVo = configurationInterface.getCollectVoByEmsNameAndType(emsName, type);
112 List<String> downloadfiles = this.ftpDownload(collectVo);
113 //paser ftp update message send
114 for(String fileName :downloadfiles){
115 this.parseFtpAndSendMessage(fileName,collectVo);
119 public void parseFtpAndSendMessage(String fileName, CollectVo collectVo) {
121 List<File> filelist = decompressed(fileName);
123 for (File tempfile : filelist) {
125 String unfileName = tempfile.getName();
127 Pattern pa = Pattern.compile(".*-(.*)-\\w{2}-");
128 Matcher ma = pa.matcher(unfileName);
131 String nename = ma.group(1);
132 boolean parseResult = false;
133 if(Constant.COLLECT_TYPE_CM.equalsIgnoreCase(collectVo.getType())){
134 parseResult = processCMXml(tempfile, nename,"CM");
136 if(unfileName.indexOf(".csv") > 0){
137 parseResult = processPMCsv(tempfile);
139 parseResult = processPMXml(tempfile);
144 log.info("parser "+tempfile+" sucess");
146 log.info("parser "+tempfile+" fail");
152 public boolean processPMXml(File file) {
154 FileInputStream fis = null;
155 InputStreamReader isr = null;
156 XMLStreamReader reader = null;
158 fis = new FileInputStream(file);
159 isr = new InputStreamReader(fis, Constant.ENCODING_UTF8);
161 XMLInputFactory fac = XMLInputFactory.newInstance();
162 reader = fac.createXMLStreamReader(isr);
164 boolean fileHeaderStart = false;
165 boolean measurementStart = false;
166 boolean pmNameFlag = false;
167 boolean pmDataFlag = false;
168 boolean objectFlag = true;
172 String currentMea = null;
173 String subName = null;
174 String localName = null;
175 String endLocalName = null;
176 String objectType = null;
179 LinkedHashMap<String, String> commonNameAndValue = new LinkedHashMap<String, String>();
180 LinkedHashMap<String, String> pmDatas = null;
181 LinkedHashMap<Integer, String> pmNames = null;
185 while (reader.hasNext()) {
187 event = reader.next();
190 case XMLStreamConstants.START_ELEMENT:
191 localName = reader.getLocalName();
192 if ("FileHeader".equalsIgnoreCase(localName)) {
193 fileHeaderStart = true;
196 if(!"FileHeader".equalsIgnoreCase(localName)){
197 commonNameAndValue.put(localName,reader.getElementText().trim());
201 if ("Measurements".equalsIgnoreCase(localName)) {
202 // a new Measurement starts
203 measurementStart = true;
205 if (measurementStart) {
206 // measurement handler
207 if ("ObjectType".equalsIgnoreCase(localName)) {
208 objectType = reader.getElementText().trim();
209 commonNameAndValue.put("ObjectType",objectType);
211 if ("PmName".equalsIgnoreCase(localName)) {
213 pmNames = new LinkedHashMap<Integer, String>();
217 // pmname handler, add columnNames
218 if ("N".equalsIgnoreCase(localName)) {
219 nameIndex = Integer.parseInt(getXMLAttribute(reader, "i"));
220 String text = reader.getElementText().trim();
221 pmNames.put(nameIndex, text);
224 if ("PmData".equalsIgnoreCase(localName)) {
226 pmDatas = new LinkedHashMap<String, String>();
231 if ("Object".equalsIgnoreCase(localName)) {
233 int n = reader.getAttributeCount();
234 for(int i = 0; i < n; i++) {
235 String name = reader.getAttributeLocalName(i);
236 commonNameAndValue.put(name, reader.getAttributeValue(i));
242 if ("V".equalsIgnoreCase(localName)) {
243 String indexStr = getXMLAttribute(reader, "i");
244 if(indexStr == null) {
245 log.error("ERROR: illegal value index");
248 index = Integer.parseInt(indexStr);
249 String name = pmNames.get(index);
251 log.error("illegal data: valueIndex="+index);
255 String value = reader.getElementText().trim();
256 pmDatas.put(name, value);
258 if ("CV".equalsIgnoreCase(localName)) {
260 String indexStr = getXMLAttribute(reader, "i");
261 if(indexStr == null) {
262 log.error("ERROR: illegal value index");
265 index = Integer.parseInt(indexStr);
267 currentMea = pmNames.get(index);
268 if(currentMea == null){
269 log.error("illegal data: valueIndex="+index);
274 if ("SN".equalsIgnoreCase(localName)) {
275 subName = reader.getElementText().trim();
278 if ("SV".equalsIgnoreCase(localName)) {
279 String subValue = reader.getElementText().trim();
280 // pmDatas.put(currentMea+subName, subValue);
281 pmDatas.put(subName, subValue);
289 case XMLStreamConstants.CHARACTERS:
292 case XMLStreamConstants.END_ELEMENT:
294 endLocalName = reader.getLocalName();
295 if("Object".equalsIgnoreCase(endLocalName)){
297 pmDatas.putAll(commonNameAndValue);
299 pmResultChannel.put(pmDatas);
301 } catch (InterruptedException e) {
302 log.error("collectResultChannel.put(resultMap) error ",e);
304 // System.out.println(pmDatas);
307 if (endLocalName.equalsIgnoreCase("PmData")) {
311 if (endLocalName.equalsIgnoreCase("PmName")) {
314 if (endLocalName.equalsIgnoreCase("Measurements")) {
315 // a measurement over
316 measurementStart = false;
319 if("FileHeader".equalsIgnoreCase(endLocalName)){
320 fileHeaderStart = false;
324 }catch (Exception e){
326 event = reader.next();
330 } catch (Exception e) {
331 log.error("processPMXml is Exception ",e);
335 if(reader != null)reader.close();
336 if(isr!= null)isr.close();
337 if(fis != null)fis.close();
338 }catch (Exception e) {
345 private String getXMLAttribute(XMLStreamReader reader, String obj) {
347 if(obj == null || reader == null){
350 int n = reader.getAttributeCount();
351 for(int i = 0; i < n; i++) {
352 String name = reader.getAttributeLocalName(i);
353 if(obj.equalsIgnoreCase(name)){
354 res = reader.getAttributeValue(i);
360 public boolean processPMCsv(File tempfile) {
362 FileInputStream brs = null;
363 InputStreamReader isr = null;
364 BufferedReader br = null;
366 List<String> columnNames = new ArrayList<String>();
367 List<String> commonValues = new ArrayList<String>();
370 brs = new FileInputStream(tempfile);
371 isr = new InputStreamReader(brs, Constant.ENCODING_UTF8);
372 br = new BufferedReader(isr);
374 String commonField = br.readLine();
375 String[] fields = commonField.split("\\|",-1);
376 for(String com : fields){
377 String[] comNameAndValue = com.split("=",2);
378 columnNames.add(comNameAndValue[0].trim());
379 commonValues.add(comNameAndValue[1]);
382 String columnName = br.readLine();
383 String[] names = columnName.split("\\|",-1);
384 for(String name : names){
385 columnNames.add(name);
388 String valueLine = "";
389 List<String> valuelist = new ArrayList<String>();
391 while ((valueLine = br.readLine()) != null) {
392 if (valueLine.trim().equals("")) {
396 String [] values = valueLine.split("\\|",-1);
398 valuelist.addAll(commonValues);
399 for(String value : values){
400 valuelist.add(value);
402 // this.appendLine(valuelist, bos);
404 HashMap<String,String> resultMap = this.resultMap(columnNames,valuelist);
406 pmResultChannel.put(resultMap);
407 } catch (InterruptedException e) {
408 log.error("collectResultChannel.put(resultMap) error ",e);
412 } catch (IOException e) {
413 log.error("processPMCsv is fail ",e);
424 } catch (Exception e){
432 private HashMap<String,String> resultMap(List<String> columnNames, List<String> valuelist) {
434 HashMap<String,String> resultMap = new HashMap<String,String>();
435 if(columnNames.size() == valuelist.size()){
436 for(int i =0;i<columnNames.size();i++){
437 resultMap.put(columnNames.get(i), valuelist.get(i));
444 private boolean processCMXml(File tempfile, String nename, String type) {
446 String csvpath = localPath+nename+"/"+type+"/";
447 File csvpathfile = new File(csvpath);
448 if(!csvpathfile.exists()){
449 csvpathfile.mkdirs();
451 String csvFileName = nename +dateFormat.format(new Date())+ System.nanoTime();
452 String csvpathAndFileName = csvpath+csvFileName+".csv";
453 BufferedOutputStream bos = null;
454 FileOutputStream fos = null;
456 fos = new FileOutputStream(csvpathAndFileName,false);
457 bos = new BufferedOutputStream(fos, 10240);
458 } catch (FileNotFoundException e1) {
459 log.error("FileNotFoundException "+StringUtil.getStackTrace(e1));
462 boolean FieldNameFlag = false;
463 boolean FieldValueFlag = false;
466 String xmlPathAndFileName = null;
467 String localName = null;
468 String endLocalName = null;
471 ArrayList<String> names = new ArrayList<String>();// colname
472 LinkedHashMap<String, String> nameAndValue = new LinkedHashMap<String, String>();
475 FileInputStream fis = null;
476 InputStreamReader isr = null;
477 XMLStreamReader reader = null;
479 fis = new FileInputStream(tempfile);
480 isr = new InputStreamReader(fis, Constant.ENCODING_UTF8);
481 XMLInputFactory fac = XMLInputFactory.newInstance();
482 reader = fac.createXMLStreamReader(isr);
484 boolean setcolum = true;
485 while (reader.hasNext()){
487 event = reader.next();
489 case XMLStreamConstants.START_ELEMENT:
490 localName = reader.getLocalName();
491 if ("FieldName".equalsIgnoreCase(localName)){
492 FieldNameFlag = true;
495 if ("N".equalsIgnoreCase(localName)){
496 String colName = reader.getElementText().trim();
500 if ("FieldValue".equalsIgnoreCase(localName)){
501 FieldValueFlag = true;
506 xmlPathAndFileName = this.setColumnNames(nename, names,type);
510 if ("Object".equalsIgnoreCase(localName)){
511 int ac = reader.getAttributeCount();
512 for (int i = 0; i < ac; i++){
513 if ("rmUID".equalsIgnoreCase(reader.getAttributeLocalName(i))){
514 rmUID = reader.getAttributeValue(i).trim();
517 nameAndValue.put("rmUID", rmUID);
519 if ("V".equalsIgnoreCase(localName)) {
520 index = Integer.parseInt(reader
521 .getAttributeValue(0)) - 1;
522 String currentName = names.get(index);
523 String v = reader.getElementText().trim();
524 nameAndValue.put(currentName, v);
528 case XMLStreamConstants.CHARACTERS:
530 case XMLStreamConstants.END_ELEMENT:
531 endLocalName = reader.getLocalName();
533 if ("FieldName".equalsIgnoreCase(endLocalName)){
534 FieldNameFlag = false;
536 if ("FieldValue".equalsIgnoreCase(endLocalName)){
537 FieldValueFlag = false;
539 if ("Object".equalsIgnoreCase(endLocalName)){
541 this.appendLine(nameAndValue,bos);
542 nameAndValue.clear();
546 } catch (Exception e)
548 log.error(""+StringUtil.getStackTrace(e));
549 event = reader.next();
563 String[] fileKeys = this.createZipFile(csvpathAndFileName,xmlPathAndFileName,nename);
565 Properties ftpPro = configurationInterface.getProperties();
566 String ip = ftpPro.getProperty("ftp_ip");
567 String port = ftpPro.getProperty("ftp_port");
568 String ftp_user = ftpPro.getProperty("ftp_user");
569 String ftp_password = ftpPro.getProperty("ftp_password");
571 String ftp_passive = ftpPro.getProperty("ftp_passive");
572 String ftp_type = ftpPro.getProperty("ftp_type");
573 String remoteFile = ftpPro.getProperty("ftp_remote_path");
574 this.ftpStore(fileKeys,ip,port,ftp_user,ftp_password,ftp_passive,ftp_type,remoteFile);
576 String message = this.createMessage(fileKeys[1], ftp_user, ftp_password, ip, port, countNum,nename);
579 this.setMessage(message);
580 } catch (Exception e){
581 log.error(""+StringUtil.getStackTrace(e));
601 } catch (Exception e){
608 private void setMessage(String message) {
611 cmResultChannel.put(message);
612 } catch (Exception e) {
613 log.error("collectResultChannel.put(message) is error "+StringUtil.getStackTrace(e));
617 public String createMessage(String zipName,String user,String pwd,String ip, String port,int countNum, String nename) {
619 StringBuffer strBuffer = new StringBuffer();
621 .append("<?xml version=\"1.0\" encoding=\"UTF-8\"?>"
622 + "<FILE_DATA_READY_UL xmlns:xsi=\" http://www.w3.org/2001/XMLSchema-instance\">"
623 + "<Header SessionID=\"");
624 strBuffer.append("");
625 strBuffer.append("\" LicenceID=\"");
626 strBuffer.append("");
627 strBuffer.append("\" SystemID=\"");
628 strBuffer.append("");
629 strBuffer.append("\" Time=\"");
630 strBuffer.append( dateFormat2.format(new Date()));
631 strBuffer.append("\" PolicyID=\"");
632 strBuffer.append("");
633 strBuffer.append("\"/><Body>");
634 strBuffer.append("<DataCatalog>");
635 strBuffer.append("");
636 strBuffer.append("</DataCatalog><GroupID>");
637 strBuffer.append(nename);
638 strBuffer.append("</GroupID><DataSourceName>");
639 strBuffer.append("");
640 strBuffer.append("</DataSourceName><InstanceID>");
641 strBuffer.append("");
642 strBuffer.append("</InstanceID><FileFormat>");
643 strBuffer.append("csv");
644 strBuffer.append("</FileFormat><CharSet>");
645 strBuffer.append("gbk");
646 strBuffer.append("</CharSet><FieldSeparator>");
647 strBuffer.append("|");
648 strBuffer.append("</FieldSeparator><IsCompressed>");
649 strBuffer.append("true");
650 strBuffer.append("</IsCompressed><StartTime>");
651 strBuffer.append(dateFormat2.format(new Date()));
652 strBuffer.append("</StartTime><EndTime>");
653 strBuffer.append("");
654 strBuffer.append("</EndTime><FileList>");
655 strBuffer.append(zipName);
656 strBuffer.append("</FileList><ConnectionString>");
657 strBuffer.append("ftp://" + user + ":" + pwd + "@" + ip + ":" + port);
658 strBuffer.append("</ConnectionString>");
659 strBuffer.append("<DataCount>");
660 strBuffer.append(countNum);
661 strBuffer.append("</DataCount>");
663 strBuffer.append("<FileSize>").append("").append("</FileSize>");
664 strBuffer.append("<DataGranularity>").append("").append("</DataGranularity>");
667 strBuffer.append("</Body></FILE_DATA_READY_UL>");
668 return strBuffer.toString();
672 private void ftpStore(String[] fileKeys, String ip, String port, String ftp_user, String ftp_password,
673 String ftp_passive, String ftp_type, String remoteFile) {
674 String zipFilePath = fileKeys[0];
677 FTPInterface ftpClient;
678 ftpClient = new FTPSrv();
681 ftpClient.login(ip, Integer.parseInt(port), ftp_user, ftp_password, "GBK", Boolean.parseBoolean(ftp_passive), 5*60*1000);
682 } catch (Exception e) {
683 log.error("login fail,ip=["+ip+"] port=["+port+"] user=["+ftp_user+"]pwd=["+ftp_password+"]"+StringUtil.getStackTrace(e));
686 // ftpClient.store(zipFilePath, remoteFile);
687 log.debug("store ["+zipFilePath+"]to["+remoteFile+"]");
689 FileUtils.deleteQuietly(new File(zipFilePath));
694 private String[] createZipFile(String csvpathAndFileName,String xmlPathAndFileName,String nename) {
696 String zipPath = resultPath+nename +dateFormat.format(new Date())+"_"+System.nanoTime();
698 File destDir = new File(zipPath);
702 FileUtils.copyFileToDirectory(new File(csvpathAndFileName), destDir);
703 FileUtils.copyFileToDirectory(new File(xmlPathAndFileName), destDir);
704 } catch (IOException e) {
708 String destFilePath = zipPath + ".zip";
710 Zip zip = new Zip(destDir.getAbsolutePath(), destFilePath);
711 zip.setCompressLevel(9);
714 FileUtils.deleteDirectory(destDir);
715 } catch (IOException e) {
716 log.error("zip.compress() is fail "+StringUtil.getStackTrace(e));
718 return new String[] { destFilePath, zipPath + ".zip"};
722 private String setColumnNames(String nename, List<String> names,String type) {
724 String xmlpath = localPath+nename +"/"+type+"/";
725 File xmlpathfile = new File(xmlpath);
726 if(!xmlpathfile.exists()){
727 xmlpathfile.mkdirs();
729 String xmlFileName = nename +dateFormat.format(new Date())+ System.nanoTime();
730 String fieldLine = "";
731 for (int i = 0; i < names.size(); i++) {
732 String field = "\t<Field>\r\n" + "\t\t<FieldNo>" + i
733 + "</FieldNo>\r\n" + "\t\t<FieldName>"
734 + names.get(i) + "</FieldName>\r\n"
735 + "\t\t<FieldType>" + names.get(i)
737 + "\t\t<FieldNameOther>" + names.get(i)
738 + "</FieldNameOther>\r\n" +
740 fieldLine = fieldLine + field;
743 String str = "<?xml version=\"1.0\" encoding=\"gbk\"?>\r\n"
744 + "<xml>\r\n" + "<FILE_STRUCTURE>\r\n" + fieldLine
745 + "</FILE_STRUCTURE>\r\n" + "</xml>\r\n";
746 String xmlPathAndFileName = xmlpath+xmlFileName+".xml";
748 this.writeDetail(xmlPathAndFileName,str);
749 } catch (Exception e) {
750 log.error("writeDetail is fail ,xmlFileName="+xmlFileName +StringUtil.getStackTrace(e));
753 return xmlPathAndFileName;
756 private void writeDetail(String detailFileName,String str) throws Exception {
757 OutputStreamWriter writer = null;
758 OutputStream readOut = null;
760 readOut = new FileOutputStream(new File(detailFileName), false);
761 writer = new OutputStreamWriter(readOut);
778 private void appendLine(LinkedHashMap<String, String> nameAndValue,BufferedOutputStream bos) {
779 StringBuilder lineDatas = new StringBuilder();
781 for (String key : nameAndValue.keySet()) {
782 lineDatas.append(nameAndValue.get(key)).append("|");
785 bos.write(lineDatas.toString().getBytes());
786 bos.write("\n".getBytes());
787 } catch (IOException e) {
788 log.error("appendLine error "+StringUtil.getStackTrace(e));
792 // private void appendLine(List<String> values,BufferedOutputStream bos) {
793 // StringBuilder lineDatas = new StringBuilder();
795 // for (String value : values) {
796 // lineDatas.append(value).append("|");
799 // bos.write(lineDatas.toString().getBytes());
800 // bos.write("\n".getBytes());
801 // } catch (IOException e) {
802 // log.error("appendLine error "+StringUtil.getStackTrace(e));
806 public List<File> decompressed(String fileName){
807 List<File> filelist = new ArrayList<File>();
809 if (fileName.indexOf(".gz") > 1)
812 File decompressFile = deGz(fileName);
813 filelist.add(decompressFile);
814 } catch (IOException e) {
815 log.error("decompressed is fail "+StringUtil.getStackTrace(e));
817 } else if (fileName.indexOf(".zip") > 1)
820 File[] files = deZip(new File(fileName));
821 for(File temp :files){
824 } catch (Exception e) {
825 log.error("decompressed is fail "+StringUtil.getStackTrace(e));
829 filelist.add(new File(fileName));
835 private File deGz(String gzFileName) throws IOException {
836 Gunzip gunzip = new Gunzip();
837 String orgFile = gzFileName.replace(".gz", "");
838 gunzip.unCompress(gzFileName, orgFile);
839 return new File(orgFile);
842 public File[] deZip(File file) throws Exception{
844 String regx = "(.*).zip";
845 Pattern p = Pattern.compile(regx);
846 Matcher m = p.matcher(file.getName());
849 String orgFile = localPath + m.group(1) + "/";
850 UnZip unzip = new UnZip(file.getAbsolutePath(), orgFile);
852 file = new File(orgFile);
854 File[] files = file.listFiles();
860 private List<String> ftpDownload(CollectVo collectVo) {
862 List<String> fileList = new ArrayList<String>();
864 String ip = collectVo.getIP();
866 String port = collectVo.getPort();
868 String user = collectVo.getUser();
870 String password = collectVo.getPassword();
872 String passivemode = collectVo.getPassive();
874 FTPInterface ftpClient = new FTPSrv();
878 log.info("ftp login ,ip=["+ip+"] port=["+port+"] user=["+user+"]password=["+password+"]");
879 ftpClient.login(ip, Integer.parseInt(port), user, password, "GBK", Boolean.parseBoolean(passivemode), 5*60*1000);
880 } catch (Exception e) {
881 log.error("login fail,ip=["+ip+"] port=["+port+"] user=["+user+"]password=["+password+"]"+StringUtil.getStackTrace(e));
886 String dir = collectVo.getRemotepath();
887 List<String> searchExprList = new ArrayList<String>();
888 String []FPath = dir.split(";");
889 for(int i=0;i<FPath.length;i++){
890 int oldSize = searchExprList.size();
891 String conpath = FPath[i] + collectVo.getMatch();
892 Hashtable<String,String> varMap = new Hashtable<String,String>();
893 long collectPeriod = 900;
895 collectPeriod = Long.parseLong(collectVo.getGranularity())*60;
896 log.info("collectPeriod =["+collectPeriod+"]");
897 } catch (NumberFormatException e) {
900 long[] d = DateUtil.getScanScope(new Date(), collectPeriod);
901 searchExprList.add(VarExprParser.replaceVar(conpath,d[0],d[1]));
905 log.info("["+conpath+"] result["+(searchExprList.size()-oldSize)+"] path");
908 String nowdir = null;
910 nowdir = ftpClient.pwd();
911 searchExprList =getPathNoRegular(searchExprList,ftpClient);
912 } catch (Exception e1) {
913 log.error(" collect fail ",e1);
916 List<AFtpRemoteFile> remoteFiles = new ArrayList<AFtpRemoteFile>();
917 for(String expr :searchExprList){
918 ftpClient.chdir(nowdir);
919 String keys[] = parseExprKeys(expr);
920 String ftpRegular = keys[1];
921 String ftpDir = keys[0];
923 boolean cdsucess = ftpClient.chdir(ftpDir);
925 AFtpRemoteFile[] arf = (AFtpRemoteFile[]) ftpClient.list();
926 log.info(" list ["+ftpDir+"] result["+(arf==null?"null":arf.length)+"] files");
929 rfileFilter(remoteFiles,arf,ftpRegular);
932 ftpRegular=ftpDir = null;
934 for(AFtpRemoteFile ftpRemoteFile: remoteFiles){
935 if(!new File(localPath).exists()){
937 new File(localPath).mkdir();
938 } catch (Exception e) {
939 log.error("create localPath is fail localPath="+localPath+" "+StringUtil.getStackTrace(e));
943 if(!new File(localPath).exists()){
944 new File(localPath).mkdirs();
947 String localFileName = localPath + ftpRemoteFile.getFileName();
948 File loaclFile = new File(localFileName);
949 if (loaclFile.exists()) {
953 boolean flag = ftpClient.downloadFile(ftpRemoteFile.getAbsFileName(), localFileName);
956 fileList.add(localFileName);
958 log.error("download file fail fileName="+ftpRemoteFile.getAbsFileName());
963 log.error("cd dir is faill dir =["+ftpDir+"]");
972 private void rfileFilter(List<AFtpRemoteFile> fileContainer, AFtpRemoteFile[] arfs, String ftpRegular) {
973 if (ftpRegular!=null && ftpRegular.length()>0) {
974 Pattern pattern = null;
976 pattern = Pattern.compile(ftpRegular, Pattern.CASE_INSENSITIVE);
977 } catch (Exception e) {
978 log.info("["+ftpRegular+"]Pattern.compile exception:"+e.getMessage());
980 int hisSize = fileContainer.size();
981 for (int j=0; arfs!=null&&j<arfs.length; j++) {
982 String fileName = parseFileName(arfs[j].getFileName());
983 Matcher matcher = pattern.matcher(fileName);
985 fileContainer.add(arfs[j]);
987 log.info("["+ftpRegular+"]filter["+(fileContainer.size()-hisSize)+"]filse");
990 for (int j=0; arfs!=null&&j<arfs.length; j++)
991 fileContainer.add(arfs[j]);
996 private String parseFileName(String fileName) {
997 int idx = fileName.lastIndexOf("/");
1000 return fileName.substring(idx+1, fileName.length());
1003 private String[] parseExprKeys(String source) {
1005 if(source.indexOf(";") > -1){
1006 source = source.substring(0, source.indexOf(";"));
1008 if (source.endsWith("/"))
1009 return new String[]{source,""};
1011 int idx = source.lastIndexOf("/");
1012 String[] dirkeys = new String[2];
1013 dirkeys[0] = source.substring(0, idx+1);
1014 dirkeys[1] = source.substring(idx+1, source.length());
1019 public List<String> getPathNoRegular(List<String> searchExprList,FTPInterface ftpCache) throws Exception{
1020 boolean isregular = false;
1021 List<String> regularList = new ArrayList<String>();
1022 for(String regular : searchExprList){
1023 Pattern lpattern = null;
1025 lpattern = Pattern.compile("(.*/)<([^/]+)>(/.*)");
1026 }catch (Exception e) {
1027 log.error("["+regular+"]compile fails:"+e.getMessage());
1028 e.printStackTrace();
1031 Matcher matcher = lpattern.matcher(regular);
1034 String parpath = matcher.group(1);
1036 boolean isin = ftpCache.chdir(parpath);
1038 log.info("cd dir [" + parpath + "] sucess");
1040 log.error("cd dir [" + parpath + "] fail");
1042 }catch(Exception e){
1043 log.error(" cd dir ["+parpath+"]fail",e);
1046 RemoteFile[] remotef = ftpCache.list();
1047 for(RemoteFile aremote :remotef){
1048 if(aremote.isDirectory()&&aremote.getFileName().matches(matcher.group(2))){
1049 regularList.add(matcher.group(1)+aremote.getFileName()+matcher.group(3));
1053 regularList.add(regular);
1056 if(isregular==true){
1057 getPathNoRegular(regularList,ftpCache);