2 * Copyright 2017 BOCO Corporation. CMCC Technologies Co., Ltd
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
16 package org.onap.vfc.nfvo.emsdriver.collector;
18 import org.apache.commons.io.FileUtils;
19 import org.apache.commons.logging.Log;
20 import org.apache.commons.logging.LogFactory;
21 import org.onap.vfc.nfvo.emsdriver.commons.constant.Constant;
22 import org.onap.vfc.nfvo.emsdriver.commons.ftp.AFtpRemoteFile;
23 import org.onap.vfc.nfvo.emsdriver.commons.ftp.FTPInterface;
24 import org.onap.vfc.nfvo.emsdriver.commons.ftp.FTPSrv;
25 import org.onap.vfc.nfvo.emsdriver.commons.ftp.RemoteFile;
26 import org.onap.vfc.nfvo.emsdriver.commons.model.CollectMsg;
27 import org.onap.vfc.nfvo.emsdriver.commons.model.CollectVo;
28 import org.onap.vfc.nfvo.emsdriver.commons.utils.*;
29 import org.onap.vfc.nfvo.emsdriver.configmgr.ConfigurationImp;
30 import org.onap.vfc.nfvo.emsdriver.configmgr.ConfigurationInterface;
31 import org.onap.vfc.nfvo.emsdriver.messagemgr.MessageChannel;
32 import org.onap.vfc.nfvo.emsdriver.messagemgr.MessageChannelFactory;
34 import javax.xml.stream.XMLInputFactory;
35 import javax.xml.stream.XMLStreamConstants;
36 import javax.xml.stream.XMLStreamReader;
38 import java.text.SimpleDateFormat;
40 import java.util.regex.Matcher;
41 import java.util.regex.Pattern;
/**
 * Collection task: downloads files from an EMS over FTP, parses them as CM or PM
 * data and hands the results to the message channels.
 */
43 public class TaskThread implements Runnable {
// Logger for this class.
45 private static final Log log = LogFactory.getLog(TaskThread.class);
// Channel that receives the parsed PM records (one map per record).
46 private MessageChannel pmResultChannel;
// Channel that receives the CM "file data ready" XML messages.
47 private MessageChannel cmResultChannel;
// Collection message to process; presumably assigned by the constructor - confirm.
48 private CollectMsg data;
// Source of the per-EMS collection settings and of the FTP upload properties.
50 private ConfigurationInterface configurationInterface = new ConfigurationImp();
// Base directory for downloaded and intermediate files.
52 private String localPath = Constant.SYS_DATA_TEMP;
// Base directory under which the result zip files are built.
53 private String resultPath = Constant.SYS_DATA_RESULT;
// Compact timestamp used in generated file and directory names.
// SimpleDateFormat is not thread-safe; these formatters are per-instance fields.
55 private SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMMddHHmmss");
// Human-readable timestamp used inside the generated XML message.
57 private SimpleDateFormat dateFormat2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
// Creates a task for the given collection message.
// NOTE(review): the lines between this signature and the channel lookups below are
// not visible in this view; the statements that follow appear to belong to a
// separate method (presumably run()) - confirm against the full file.
59 public TaskThread(CollectMsg data) {
// Look up the CM and PM result channels by their well-known keys.
70 cmResultChannel = MessageChannelFactory.getMessageChannel(Constant.COLLECT_RESULT_CHANNEL_KEY);
71 pmResultChannel = MessageChannelFactory.getMessageChannel(Constant.COLLECT_RESULT_PM_CHANNEL_KEY);
// Process the message; any exception is logged here instead of being propagated.
73 collectMsgHandle(data);
74 } catch (Exception e) {
75 log.error(" collectMsgHandle", e);
79 private void collectMsgHandle(CollectMsg collectMsg) {
80 String emsName = collectMsg.getEmsName();
81 String type = collectMsg.getType();
82 CollectVo collectVo = configurationInterface.getCollectVoByEmsNameAndType(emsName, type);
85 List<String> downloadfiles = this.ftpDownload(collectVo);
86 // paser ftp update message send
87 for (String fileName : downloadfiles) {
88 this.parseFtpAndSendMessage(fileName, collectVo);
/**
 * Unpacks a downloaded file and parses every resulting file as CM XML, PM CSV or
 * PM XML, depending on the collection type and on the file name.
 *
 * NOTE(review): the check of the regex match and the branches around the two
 * result log statements are not visible in this view - confirm against the full file.
 *
 * @param fileName  local path of the downloaded (possibly compressed) file
 * @param collectVo collection settings; its type selects CM or PM parsing
 */
92 public void parseFtpAndSendMessage(String fileName, CollectVo collectVo) {
94 List<File> filelist = decompressed(fileName);
96 for (File tempfile : filelist) {
98 String unfileName = tempfile.getName();
// Group 1 of this pattern is used as the network element name.
// The pattern is compiled again for every file.
100 Pattern pa = Pattern.compile(".*-(.*)-\\w{2}-");
101 Matcher ma = pa.matcher(unfileName);
104 String nename = ma.group(1);
105 boolean parseResult = false;
// CM files are always handled as XML.
106 if (Constant.COLLECT_TYPE_CM.equalsIgnoreCase(collectVo.getType())) {
107 parseResult = processCMXml(tempfile, nename, "CM");
// Otherwise a name containing ".csv" anywhere selects the CSV parser, anything else the XML parser.
109 if (unfileName.indexOf(".csv") > -1) {//changed to -1 for coding practice as having ".csv" must have some legal name
110 parseResult = processPMCsv(tempfile);
112 parseResult = processPMXml(tempfile);
117 log.info("parser " + tempfile + " sucess");
120 log.info("parser " + tempfile + " fail");
/**
 * Parses a PM measurement XML file with a StAX reader and publishes the parsed
 * records on {@code pmResultChannel}.
 *
 * <p>Visible behaviour: child elements of FileHeader and the attributes of each
 * Object element are collected into a map of common values; PmName/N elements define
 * the counter names by index; V and SN/SV elements fill the data map of the current
 * object; at the end of each Object element the common values are merged into the
 * data map, which is then put on the channel.
 *
 * <p>NOTE(review): several statements of this method (declarations, flag updates,
 * branch and loop exits, return statements) are not visible in this view, so the
 * return value is not documented here - confirm against the full file.
 * NOTE(review): no XMLInputFactory property is set in the visible lines; confirm
 * whether DTD / external-entity processing should be disabled for these files.
 *
 * @param file the XML file to parse
 */
126 public boolean processPMXml(File file) {
127 try ( FileInputStream fis = new FileInputStream(file);
128 InputStreamReader isr = new InputStreamReader(fis, Constant.ENCODING_UTF8)){
130 XMLInputFactory fac = XMLInputFactory.newInstance();
131 XMLStreamReader reader = fac.createXMLStreamReader(isr);
// Parser state: which section of the document the reader is currently inside.
133 boolean fileHeaderStart = false;
134 boolean measurementStart = false;
135 boolean pmNameFlag = false;
136 boolean pmDataFlag = false;
137 boolean objectFlag = true;
141 String currentMea = null;
142 String subName = null;
143 String localName = null;
144 String endLocalName = null;
145 String objectType = null;
// commonNameAndValue: values shared by the records (header fields, object attributes).
// pmDatas: counter values of the object being read.
// pmNames: counter names keyed by the "i" index attribute.
147 LinkedHashMap<String, String> commonNameAndValue = new LinkedHashMap<>();
148 LinkedHashMap<String, String> pmDatas = null;
149 LinkedHashMap<Integer, String> pmNames = null;
152 while (reader.hasNext()) {
154 event = reader.next();
157 case XMLStreamConstants.START_ELEMENT:
158 localName = reader.getLocalName();
159 if ("FileHeader".equalsIgnoreCase(localName)) {
160 fileHeaderStart = true;
// Every element inside FileHeader becomes a common name/value pair.
162 if (fileHeaderStart && !"FileHeader".equalsIgnoreCase(localName)) {
163 commonNameAndValue.put(localName, reader.getElementText().trim());
165 if ("Measurements".equalsIgnoreCase(localName)) {
166 // a new Measurement starts
167 measurementStart = true;
169 if (measurementStart) {
170 // measurement handler
171 if ("ObjectType".equalsIgnoreCase(localName)) {
172 objectType = reader.getElementText().trim();
173 commonNameAndValue.put("ObjectType", objectType);
// A PmName section starts a fresh index -> counter-name table.
175 if ("PmName".equalsIgnoreCase(localName)) {
177 pmNames = new LinkedHashMap<Integer, String>();
180 if (pmNameFlag && "N".equalsIgnoreCase(localName)) {
181 // pmname handler, add columnNames
182 nameIndex = Integer.parseInt(getXMLAttribute(reader, "i"));
183 String text = reader.getElementText().trim();
184 pmNames.put(nameIndex, text);
// A PmData section starts a fresh data map.
186 if ("PmData".equalsIgnoreCase(localName)) {
188 pmDatas = new LinkedHashMap<String, String>();
// All attributes of an Object element are added to the common values.
193 if ("Object".equalsIgnoreCase(localName)) {
195 int n = reader.getAttributeCount();
196 for (int i = 0; i < n; i++) {
197 String name = reader.getAttributeLocalName(i);
198 commonNameAndValue.put(name, reader.getAttributeValue(i));
// V: a plain value; its "i" attribute selects the counter name from pmNames.
203 if ("V".equalsIgnoreCase(localName)) {
204 String indexStr = getXMLAttribute(reader, "i");
205 if (indexStr == null) {
206 log.error("ERROR: illegal value index");
209 index = Integer.parseInt(indexStr);
210 String name = pmNames.get(index);
212 log.error("illegal data: valueIndex=" + index);
216 String value = reader.getElementText().trim();
217 pmDatas.put(name, value);
// CV: a compound value; its "i" attribute selects the current measurement name.
219 if ("CV".equalsIgnoreCase(localName)) {
221 String indexStr = getXMLAttribute(reader, "i");
222 if (indexStr == null) {
223 log.error("ERROR: illegal value index");
226 index = Integer.parseInt(indexStr);
228 currentMea = pmNames.get(index);
229 if (currentMea == null) {
230 log.error("illegal data: valueIndex=" + index);
// SN/SV: sub-name and sub-value; the value is stored under the last sub-name read.
235 if ("SN".equalsIgnoreCase(localName)) {
236 subName = reader.getElementText().trim();
239 if ("SV".equalsIgnoreCase(localName)) {
240 String subValue = reader.getElementText().trim();
241 pmDatas.put(subName, subValue);
249 case XMLStreamConstants.CHARACTERS:
252 case XMLStreamConstants.END_ELEMENT:
253 endLocalName = reader.getLocalName();
// End of an object: merge the common values into the record and publish it.
254 if ("Object".equalsIgnoreCase(endLocalName)) {
256 pmDatas.putAll(commonNameAndValue);
258 pmResultChannel.put(pmDatas);
260 } catch (Exception e) {
// A failed put clears the channel and is logged.
261 pmResultChannel.clear();
262 log.error("collectResultChannel.put(resultMap) error ", e);
265 if ("PmData".equalsIgnoreCase(endLocalName)) {
269 if ("PmName".equalsIgnoreCase(endLocalName)) {
272 if ("Measurements".equalsIgnoreCase(endLocalName)) {
273 // a measurement over
274 measurementStart = false;
277 if ("FileHeader".equalsIgnoreCase(endLocalName)) {
278 fileHeaderStart = false;
// An exception while handling one event is logged and the reader is advanced to the next event.
282 } catch (Exception e) {
283 log.error("Exception: ", e);
284 event = reader.next();
288 } catch (Exception e) {
289 log.error("processPMXml is Exception ", e);
295 private String getXMLAttribute(XMLStreamReader reader, String obj) {
297 if (obj == null || reader == null) {
300 int n = reader.getAttributeCount();
301 for (int i = 0; i < n; i++) {
302 String name = reader.getAttributeLocalName(i);
303 if (obj.equalsIgnoreCase(name)) {
304 res = reader.getAttributeValue(i);
310 public boolean processPMCsv(File tempfile) {
312 List<String> columnNames = new ArrayList<>();
313 List<String> commonValues = new ArrayList<>();
314 try (FileInputStream brs = new FileInputStream(tempfile);
315 InputStreamReader isr = new InputStreamReader(brs, Constant.ENCODING_UTF8);
316 BufferedReader br = new BufferedReader(isr)) {
319 String commonField = br.readLine();
320 String[] fields = commonField.split("\\|", -1);
321 for (String com : fields) {
322 String[] comNameAndValue = com.split("=", 2);
323 columnNames.add(comNameAndValue[0].trim());
324 commonValues.add(comNameAndValue[1]);
327 String columnName = br.readLine();
328 String[] names = columnName.split("\\|", -1);
329 for (String name : names) {
330 columnNames.add(name);
333 String valueLine = "";
334 List<String> valuelist = new ArrayList<>();
336 while ((valueLine = br.readLine()) != null) {
337 if ("".equals(valueLine.trim())) {
340 String[] values = valueLine.split("\\|", -1);
342 valuelist.addAll(commonValues);
343 for (String value : values) {
344 valuelist.add(value);
346 HashMap<String, String> resultMap = this.resultMap(columnNames, valuelist);
348 pmResultChannel.put(resultMap);
349 } catch (Exception e) {
350 pmResultChannel.clear();
351 log.error("collectResultChannel.put(resultMap) error ", e);
355 } catch (IOException e) {
356 log.error("processPMCsv is fail ", e);
363 private HashMap<String, String> resultMap(List<String> columnNames, List<String> valuelist) {
365 HashMap<String, String> resultMap = new HashMap<>();
366 if (columnNames.size() == valuelist.size()) {
367 for (int i = 0; i < columnNames.size(); i++) {
368 resultMap.put(columnNames.get(i), valuelist.get(i));
/**
 * Converts a CM XML file into a pipe-separated CSV file, packages it together with
 * a column description file, uploads the package over FTP and announces it on the
 * CM result channel.
 *
 * <p>Visible steps: create the output directory {@code localPath/nename/type/};
 * stream the XML, reading column names from FieldName/N and, for each Object
 * element, its rmUID attribute and its V values; write one CSV line per Object;
 * then zip, upload with the configured FTP properties, build the message and put it
 * on the channel.
 *
 * <p>NOTE(review): several statements (declarations such as rmUID/index/countNum,
 * the statement that stores each column name, loop exits and the return statements)
 * are not visible in this view - confirm against the full file.
 * NOTE(review): no XMLInputFactory property is set in the visible lines; confirm
 * whether DTD / external-entity processing should be disabled for these files.
 *
 * @param tempfile the CM XML file to convert
 * @param nename   network element name, used in directory and file names
 * @param type     data type label used as sub-directory name
 */
376 private boolean processCMXml(File tempfile, String nename, String type) {
378 String csvpath = localPath + nename + "/" + type + "/";
379 File csvpathfile = new File(csvpath);
380 if (!csvpathfile.exists()) {
381 csvpathfile.mkdirs();
// Timestamp plus nanoTime keeps the generated file name unique.
383 String csvFileName = nename + dateFormat.format(new Date()) + System.nanoTime();
384 String csvpathAndFileName = csvpath + csvFileName + ".csv";
385 try( FileOutputStream fos = new FileOutputStream(csvpathAndFileName, false);
386 BufferedOutputStream bos = new BufferedOutputStream(fos, 10240)){
388 boolean FieldNameFlag = false;
389 boolean FieldValueFlag = false;
391 String xmlPathAndFileName = null;
392 String localName = null;
393 String endLocalName = null;
// names: column names in file order; nameAndValue: values of the object being read.
396 ArrayList<String> names = new ArrayList<>();// colname
397 LinkedHashMap<String, String> nameAndValue = new LinkedHashMap<>();
399 try( FileInputStream fis = new FileInputStream(tempfile);
400 InputStreamReader isr = new InputStreamReader(fis, Constant.ENCODING_UTF8)){
402 XMLInputFactory fac = XMLInputFactory.newInstance();
403 XMLStreamReader reader = fac.createXMLStreamReader(isr);
405 boolean setcolum = true;
406 while (reader.hasNext()) {
408 event = reader.next();
410 case XMLStreamConstants.START_ELEMENT:
411 localName = reader.getLocalName();
412 if ("FieldName".equalsIgnoreCase(localName)) {
413 FieldNameFlag = true;
415 if (FieldNameFlag && "N".equalsIgnoreCase(localName)) {
416 String colName = reader.getElementText().trim();
419 if ("FieldValue".equalsIgnoreCase(localName)) {
420 FieldValueFlag = true;
// Inside FieldValue the column description file is written via setColumnNames.
423 if (FieldValueFlag) {
425 xmlPathAndFileName = this.setColumnNames(nename, names, type);
// The rmUID attribute of an Object element becomes the "rmUID" value of the record.
429 if ("Object".equalsIgnoreCase(localName)) {
430 int ac = reader.getAttributeCount();
431 for (int i = 0; i < ac; i++) {
432 if ("rmUID".equalsIgnoreCase(reader.getAttributeLocalName(i))) {
433 rmUID = reader.getAttributeValue(i).trim();
436 nameAndValue.put("rmUID", rmUID);
// The first attribute of V is parsed as an index into names, minus one.
438 if ("V".equalsIgnoreCase(localName)) {
439 index = Integer.parseInt(reader.getAttributeValue(0)) - 1;
440 String currentName = names.get(index);
441 String v = reader.getElementText().trim();
442 nameAndValue.put(currentName, v);
446 case XMLStreamConstants.CHARACTERS:
448 case XMLStreamConstants.END_ELEMENT:
449 endLocalName = reader.getLocalName();
451 if ("FieldName".equalsIgnoreCase(endLocalName)) {
452 FieldNameFlag = false;
454 if ("FieldValue".equalsIgnoreCase(endLocalName)) {
455 FieldValueFlag = false;
// End of an object: write its values as one CSV line and reset the record.
457 if ("Object".equalsIgnoreCase(endLocalName)) {
459 this.appendLine(nameAndValue, bos);
460 nameAndValue.clear();
// An exception while handling one event is logged and the reader is advanced to the next event.
464 } catch (Exception e) {
465 log.error("Exception: ",e);
466 event = reader.next();
// Package the CSV and the column description into a zip file.
470 String[] fileKeys = this.createZipFile(csvpathAndFileName, xmlPathAndFileName, nename);
// Upload target and credentials come from the configuration properties.
472 Properties ftpPro = configurationInterface.getProperties();
473 String ip = ftpPro.getProperty("ftp_ip");
474 String port = ftpPro.getProperty("ftp_port");
475 String ftp_user = ftpPro.getProperty("ftp_user");
476 String ftp_password = ftpPro.getProperty("ftp_password");
478 String ftp_passive = ftpPro.getProperty("ftp_passive");
479 String ftp_type = ftpPro.getProperty("ftp_type");
480 String remoteFile = ftpPro.getProperty("ftp_remote_path");
481 this.ftpStore(fileKeys, ip, port, ftp_user, ftp_password, ftp_passive, ftp_type, remoteFile);
// Announce the uploaded file on the CM channel.
483 String message = this.createMessage(fileKeys[1], ftp_user, ftp_password, ip, port, countNum, nename);
486 this.setMessage(message);
489 } catch (Exception e) {
490 log.error("" + StringUtil.getStackTrace(e));
494 } catch (FileNotFoundException e1) {
495 log.error("FileNotFoundException " + StringUtil.getStackTrace(e1));
497 }catch (Exception e) {
498 log.error("" + StringUtil.getStackTrace(e));
504 private void setMessage(String message) {
506 cmResultChannel.put(message);
507 } catch (Exception e) {
508 log.error("collectResultChannel.put(message) is error " + StringUtil.getStackTrace(e));
512 public String createMessage(String zipName, String user, String pwd, String ip, String port, int countNum,
514 StringBuilder strBuffer = new StringBuilder();
515 strBuffer.append("<?xml version=\"1.0\" encoding=\"UTF-8\"?>"
516 + "<FILE_DATA_READY_UL xmlns:xsi=\" http://www.w3.org/2001/XMLSchema-instance\">"
517 + "<Header SessionID=\"");
518 strBuffer.append("");
519 strBuffer.append("\" LicenceID=\"");
520 strBuffer.append("");
521 strBuffer.append("\" SystemID=\"");
522 strBuffer.append("");
523 strBuffer.append("\" Time=\"");
524 strBuffer.append(dateFormat2.format(new Date()));
525 strBuffer.append("\" PolicyID=\"");
526 strBuffer.append("");
527 strBuffer.append("\"/><Body>");
528 strBuffer.append("<DataCatalog>");
529 strBuffer.append("");
530 strBuffer.append("</DataCatalog><GroupID>");
531 strBuffer.append(nename);
532 strBuffer.append("</GroupID><DataSourceName>");
533 strBuffer.append("");
534 strBuffer.append("</DataSourceName><InstanceID>");
535 strBuffer.append("");
536 strBuffer.append("</InstanceID><FileFormat>");
537 strBuffer.append("csv");
538 strBuffer.append("</FileFormat><CharSet>");
539 strBuffer.append("gbk");
540 strBuffer.append("</CharSet><FieldSeparator>");
541 strBuffer.append("|");
542 strBuffer.append("</FieldSeparator><IsCompressed>");
543 strBuffer.append("true");
544 strBuffer.append("</IsCompressed><StartTime>");
545 strBuffer.append(dateFormat2.format(new Date()));
546 strBuffer.append("</StartTime><EndTime>");
547 strBuffer.append("");
548 strBuffer.append("</EndTime><FileList>");
549 strBuffer.append(zipName);
550 strBuffer.append("</FileList><ConnectionString>");
551 strBuffer.append("ftp://" + user + ":" + pwd + "@" + ip + ":" + port);
552 strBuffer.append("</ConnectionString>");
553 strBuffer.append("<DataCount>");
554 strBuffer.append(countNum);
555 strBuffer.append("</DataCount>");
557 strBuffer.append("<FileSize>").append("").append("</FileSize>");
558 strBuffer.append("<DataGranularity>").append("").append("</DataGranularity>");
560 strBuffer.append("</Body></FILE_DATA_READY_UL>");
561 return strBuffer.toString();
/**
 * Uploads the zip file named by {@code fileKeys[0]} to the FTP server and then
 * deletes the local copy.
 *
 * NOTE(review): the statement that performs the upload, any logout and the exit
 * taken after a failed login are not visible in this view - confirm against the
 * full file. {@code ftp_type} is not used in the visible lines.
 *
 * @param fileKeys     element 0 is the local path of the zip file
 * @param ip           FTP server address
 * @param port         FTP server port, parsed as an integer
 * @param ftp_user     FTP user name
 * @param ftp_password FTP password
 * @param ftp_passive  "true" for passive mode (parsed with Boolean.parseBoolean)
 * @param ftp_type     transfer type setting
 * @param remoteFile   remote target path
 */
565 private void ftpStore(String[] fileKeys, String ip, String port, String ftp_user, String ftp_password,
566 String ftp_passive, String ftp_type, String remoteFile) {
567 String zipFilePath = fileKeys[0];
569 FTPInterface ftpClient;
570 ftpClient = new FTPSrv();
// Log in using GBK as encoding argument and 5 * 60 * 1000 as the last (presumably timeout, ms) argument.
573 ftpClient.login(ip, Integer.parseInt(port), ftp_user, ftp_password, "GBK",
574 Boolean.parseBoolean(ftp_passive), 5 * 60 * 1000);
575 } catch (Exception e) {
// The password is deliberately left out of the log message.
576 log.error("login fail,ip=[" + ip + "] port=[" + port + "] user=[" + ftp_user + /*"]pwd=[" + ftp_password + */"]"
577 + StringUtil.getStackTrace(e));
580 log.debug("store [" + zipFilePath + "]to[" + remoteFile + "]");
// Remove the local zip file; deleteQuietly does not throw on failure.
582 FileUtils.deleteQuietly(new File(zipFilePath));
/**
 * Copies the CSV file and the column description file into a fresh directory under
 * {@code resultPath}, compresses that directory into {@code <directory>.zip} and
 * deletes the directory.
 *
 * NOTE(review): the rest of the signature, the creation of the directory and the
 * call that performs the compression are not visible in this view - confirm against
 * the full file.
 *
 * @param csvpathAndFileName path of the CSV file
 * @param xmlPathAndFileName path of the column description XML file
 * @param nename             network element name, used as prefix of the zip name
 * @return a two-element array; both elements hold the same zip path
 */
586 private String[] createZipFile(String csvpathAndFileName, String xmlPathAndFileName, String nename)
// Timestamp plus nanoTime keeps the directory and zip name unique.
588 String zipPath = resultPath + nename + dateFormat.format(new Date()) + "_" + System.nanoTime();
590 File destDir = new File(zipPath);
593 FileUtils.copyFileToDirectory(new File(csvpathAndFileName), destDir);
594 FileUtils.copyFileToDirectory(new File(xmlPathAndFileName), destDir);
595 } catch (IOException e) {
596 throw new IOException("createZipFile",e);
599 String destFilePath = zipPath + ".zip";
601 Zip zip = new Zip(destDir.getAbsolutePath(), destFilePath);
602 zip.setCompressLevel(9);
// The staging directory is no longer needed once the zip exists.
605 FileUtils.deleteDirectory(destDir);
606 } catch (IOException e) {
607 log.error("zip.compress() is fail " + StringUtil.getStackTrace(e));
608 throw new IOException("createZipFile",e);
// destFilePath and zipPath + ".zip" are the same string.
610 return new String[] { destFilePath, zipPath + ".zip" };
613 private String setColumnNames(String nename, List<String> names, String type) {
615 String xmlpath = localPath + nename + "/" + type + "/";
616 File xmlpathfile = new File(xmlpath);
617 if (!xmlpathfile.exists()) {
618 xmlpathfile.mkdirs();
620 String xmlFileName = nename + dateFormat.format(new Date()) + System.nanoTime();
621 String fieldLine = "";
622 for (int i = 0; i < names.size(); i++) {
623 String field = "\t<Field>\r\n" + "\t\t<FieldNo>" + i + "</FieldNo>\r\n" + "\t\t<FieldName>" + names.get(i)
624 + "</FieldName>\r\n" + "\t\t<FieldType>" + names.get(i) + "</FieldType>\r\n"
625 + "\t\t<FieldNameOther>" + names.get(i) + "</FieldNameOther>\r\n" + "\t</Field>\r\n";
626 fieldLine = fieldLine + field;
629 String str = "<?xml version=\"1.0\" encoding=\"gbk\"?>\r\n" + "<xml>\r\n" + "<FILE_STRUCTURE>\r\n" + fieldLine
630 + "</FILE_STRUCTURE>\r\n" + "</xml>\r\n";
631 String xmlPathAndFileName = xmlpath + xmlFileName + ".xml";
633 this.writeDetail(xmlPathAndFileName, str);
634 } catch (Exception e) {
635 log.error("writeDetail is fail ,xmlFileName=" + xmlFileName + StringUtil.getStackTrace(e));
638 return xmlPathAndFileName;
/**
 * Opens the named file for writing (replacing existing content) and, presumably,
 * writes {@code str} to it.
 *
 * NOTE(review): the statement that writes the text is not visible in this view.
 * NOTE(review): the writer is created without a charset, so the platform default
 * encoding applies - confirm this matches the encoding declared in the written text.
 *
 * @param detailFileName path of the file to write
 * @param str            text to write
 * @throws IOException wrapping any I/O failure, with message "writeDetail"
 */
641 private void writeDetail(String detailFileName, String str) throws IOException {
642 try (OutputStream readOut = new FileOutputStream(new File(detailFileName), false);
643 OutputStreamWriter writer = new OutputStreamWriter(readOut)) {
646 } catch (IOException e) {
647 throw new IOException("writeDetail",e);
651 private void appendLine(LinkedHashMap<String, String> nameAndValue, BufferedOutputStream bos) {
653 StringBuilder lineDatas = new StringBuilder();
655 for (String key : nameAndValue.keySet()) {
656 lineDatas.append(nameAndValue.get(key)).append("|");
659 /*for (HashMap.Entry<String, String> entry : nameAndValue.entrySet()) {
660 lineDatas.append(entry.getValue()).append("|");
662 bos.write(lineDatas.toString().getBytes());
663 bos.write("\n".getBytes());
664 } catch (IOException e) {
665 log.error("appendLine error " + StringUtil.getStackTrace(e));
669 public List<File> decompressed(String fileName) {
670 List<File> filelist = new ArrayList<File>();
672 if (fileName.indexOf(".gz") > 1) {
674 File decompressFile = deGz(fileName);
675 filelist.add(decompressFile);
676 new File(fileName).delete();
677 } catch (IOException e) {
678 log.error("decompressed is fail " + StringUtil.getStackTrace(e));
680 } else if (fileName.indexOf(".zip") > 1) {
682 File[] files = deZip(new File(fileName));
683 new File(fileName).delete();
684 for (File temp : files) {
687 } catch (Exception e) {
688 log.error("decompressed is fail " + StringUtil.getStackTrace(e));
691 filelist.add(new File(fileName));
697 private File deGz(String gzFileName) throws IOException {
698 Gunzip gunzip = new Gunzip();
699 String orgFile = gzFileName.replace(".gz", "");
700 gunzip.unCompress(gzFileName, orgFile);
701 return new File(orgFile);
/**
 * Unzips an archive into {@code localPath/<archive name without .zip>/} and lists
 * the files of that directory.
 *
 * NOTE(review): the call that runs the matcher, the call that performs the
 * decompression and the return statement are not visible in this view - confirm
 * against the full file.
 *
 * @param file the zip archive
 * @throws IOException presumably when decompression fails - confirm
 */
704 public File[] deZip(File file) throws IOException {
// The '.' before "zip" is unescaped in this regex, so it matches any character.
706 String regx = "(.*).zip";
707 Pattern p = Pattern.compile(regx);
708 Matcher m = p.matcher(file.getName());
// Group 1 (the name without the extension) names the target directory.
710 String orgFile = localPath + m.group(1) + "/";
711 UnZip unzip = new UnZip(file.getAbsolutePath(), orgFile);
713 file = new File(orgFile);
// listFiles() returns null when the path is not a directory or cannot be read.
715 File[] files = file.listFiles();
/**
 * Downloads the files selected by the collection settings from the EMS FTP server
 * into {@code localPath}.
 *
 * <p>Visible steps: log in with the settings' address and credentials; build one
 * search expression per remote path (paths are separated by ';', each is combined
 * with the match expression and has its time variables replaced for the current
 * scan period); expand directory wildcards; for each expression change into the
 * directory, list it, filter the listing with the file expression and download the
 * accepted files, collecting their local names in {@code fileList}.
 *
 * <p>NOTE(review): several statements (try blocks, the exit after a failed login,
 * branch structure around chdir/download results, logout and the return statement)
 * are not visible in this view - confirm against the full file.
 *
 * @param collectVo collection settings (server, credentials, remote path, match, granularity)
 */
721 private List<String> ftpDownload(CollectVo collectVo) {
723 List<String> fileList = new ArrayList<String>();
725 String ip = collectVo.getIP();
727 String port = collectVo.getPort();
729 String user = collectVo.getUser();
731 String password = collectVo.getPassword();
733 String passivemode = collectVo.getPassive();
735 FTPInterface ftpClient = new FTPSrv();
// The password is deliberately left out of the log messages.
739 log.info("ftp login ,ip=[" + ip + "] port=[" + port + "] user=[" + user + /*"]password=[" + password +*/ "]");
740 ftpClient.login(ip, Integer.parseInt(port), user, password, "GBK", Boolean.parseBoolean(passivemode),
742 } catch (Exception e) {
743 log.error("login fail,ip=[" + ip + "] port=[" + port + "] user=[" + user + /*"]password=[" + password +*/ "]"
744 + StringUtil.getStackTrace(e));
// Build the search expressions: one per ';'-separated remote path.
749 String dir = collectVo.getRemotepath();
750 List<String> searchExprList = new ArrayList<String>();
751 String[] FPath = dir.split(";");
752 for (int i = 0; i < FPath.length; i++) {
753 int oldSize = searchExprList.size();
754 String conpath = FPath[i] + collectVo.getMatch();
755 HashMap<String, String> varMap = new HashMap<>();
// Default period is 900; the configured granularity is multiplied by 60
// (presumably minutes converted to seconds - confirm).
756 long collectPeriod = 900;
758 collectPeriod = Long.parseLong(collectVo.getGranularity()) * 60;
759 log.info("collectPeriod =[" + collectPeriod + "]");
760 } catch (NumberFormatException e) {
761 log.error("NumberFormatException" ,e);
// Replace the variables in the expression using the two values of the scan scope.
763 long[] d = DateUtil.getScanScope(new Date(), collectPeriod);
764 searchExprList.add(VarExprParser.replaceVar(conpath, d[0], d[1]));
767 log.info("[" + conpath + "] result[" + (searchExprList.size() - oldSize) + "] path");
// Remember the login directory, then expand directory wildcards in the expressions.
769 String nowdir = null;
771 nowdir = ftpClient.pwd();
772 searchExprList = getPathNoRegular(searchExprList, ftpClient);
773 } catch (Exception e1) {
774 log.error(" collect fail ", e1);
777 List<AFtpRemoteFile> remoteFiles = new ArrayList<AFtpRemoteFile>();
778 for (String expr : searchExprList) {
// Go back to the login directory before handling each expression.
779 ftpClient.chdir(nowdir);
// keys[0] is the directory part, keys[1] the file expression.
780 String keys[] = parseExprKeys(expr);
781 String ftpRegular = keys[1];
782 String ftpDir = keys[0];
784 boolean cdsucess = ftpClient.chdir(ftpDir);
786 AFtpRemoteFile[] arf = (AFtpRemoteFile[]) ftpClient.list();
787 log.info(" list [" + ftpDir + "] result[" + (arf == null ? "null" : arf.length) + "] files");
// Keep only the listed files that match the file expression.
790 rfileFilter(remoteFiles, arf, ftpRegular);
792 for (AFtpRemoteFile ftpRemoteFile : remoteFiles) {
// Make sure the local target directory exists.
793 if (!new File(localPath).exists()) {
795 new File(localPath).mkdir();
796 } catch (Exception e) {
797 log.error("create localPath is fail localPath=" + localPath + " "
798 + StringUtil.getStackTrace(e));
802 if (!new File(localPath).exists()) {
803 new File(localPath).mkdirs();
// An existing local file with the same name is replaced.
806 String localFileName = localPath + ftpRemoteFile.getFileName();
807 if(new File(localFileName).exists()){
808 new File(localFileName).delete();
811 boolean flag = ftpClient.downloadFile(ftpRemoteFile.getAbsFileName(), localFileName);
814 fileList.add(localFileName);
816 log.error("download file fail fileName=" + ftpRemoteFile.getAbsFileName());
821 log.error("cd dir is faill dir =[" + ftpDir + "]");
828 private void rfileFilter(List<AFtpRemoteFile> fileContainer, AFtpRemoteFile[] arfs, String ftpRegular) {
829 if (ftpRegular != null && ftpRegular.length() > 0) {
830 Pattern pattern = null;
832 pattern = Pattern.compile(ftpRegular, Pattern.CASE_INSENSITIVE);
833 } catch (Exception e) {
834 log.info("[" + ftpRegular + "]Pattern.compile exception:",e);
835 // should rethrow exception or return from here
837 int hisSize = fileContainer.size();
838 for (int j = 0; arfs != null && j < arfs.length; j++) {
839 String fileName = parseFileName(arfs[j].getFileName());
840 Matcher matcher = null;
842 matcher = pattern.matcher(fileName);
844 // define the flow when pattern is null
847 if (null != matcher && matcher.find())
848 fileContainer.add(arfs[j]);
850 log.info("[" + ftpRegular + "]filter[" + (fileContainer.size() - hisSize) + "]filse");
852 for (int j = 0; arfs != null && j < arfs.length; j++)
853 fileContainer.add(arfs[j]);
858 private String parseFileName(String fileName) {
859 int idx = fileName.lastIndexOf("/");
862 return fileName.substring(idx + 1, fileName.length());
865 private String[] parseExprKeys(String source) {
867 if (source.indexOf(";") > -1) {
868 source = source.substring(0, source.indexOf(";"));
870 if (source.endsWith("/"))
871 return new String[] { source, "" };
873 int idx = source.lastIndexOf("/");
874 String[] dirkeys = new String[2];
875 dirkeys[0] = source.substring(0, idx + 1);
876 dirkeys[1] = source.substring(idx + 1, source.length());
880 public List<String> getPathNoRegular(List<String> searchExprList, FTPInterface ftpCache) throws IOException {
881 boolean isregular = false;
882 List<String> regularList = new ArrayList<String>();
883 for (String regular : searchExprList) {
884 Pattern lpattern = null;
886 lpattern = Pattern.compile("(.*/)<([^/]+)>(/.*)");
887 } catch (Exception e) {
888 log.error("[" + regular + "]compile fails:" + e.getMessage());
889 throw new IOException("getPathNoRegular", e);
891 Matcher matcher = null;
892 if (lpattern != null)
893 matcher = lpattern.matcher(regular);
895 // define flow in case lpattern is null
898 if (null != matcher && matcher.find()) {
900 String parpath = matcher.group(1);
902 boolean isin = ftpCache.chdir(parpath);
904 log.info("cd dir [" + parpath + "] sucess");
906 log.error("cd dir [" + parpath + "] fail");
908 } catch (Exception e) {
909 log.error(" cd dir [" + parpath + "]fail", e);
910 throw new IOException("ftpCache.chdir",e);
912 RemoteFile[] remotef = ftpCache.list();
913 for (RemoteFile aremote : remotef) {
914 if (aremote.isDirectory() && aremote.getFileName().matches(matcher.group(2))) {
915 regularList.add(matcher.group(1) + aremote.getFileName() + matcher.group(3));
919 regularList.add(regular);
922 if (isregular == true) {
923 getPathNoRegular(regularList, ftpCache);
928 public MessageChannel getPmResultChannel() {
929 return pmResultChannel;
932 public void setPmResultChannel(MessageChannel pmResChannel) {
933 this.pmResultChannel = pmResChannel;