2 * Copyright 2017 BOCO Corporation. CMCC Technologies Co., Ltd
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
16 package org.onap.vfc.nfvo.emsdriver.collector;
18 import org.apache.commons.io.FileUtils;
19 import org.apache.commons.logging.Log;
20 import org.apache.commons.logging.LogFactory;
21 import org.onap.vfc.nfvo.emsdriver.commons.constant.Constant;
22 import org.onap.vfc.nfvo.emsdriver.commons.ftp.AFtpRemoteFile;
23 import org.onap.vfc.nfvo.emsdriver.commons.ftp.FTPInterface;
24 import org.onap.vfc.nfvo.emsdriver.commons.ftp.FTPSrv;
25 import org.onap.vfc.nfvo.emsdriver.commons.ftp.RemoteFile;
26 import org.onap.vfc.nfvo.emsdriver.commons.model.CollectMsg;
27 import org.onap.vfc.nfvo.emsdriver.commons.model.CollectVo;
28 import org.onap.vfc.nfvo.emsdriver.commons.utils.*;
29 import org.onap.vfc.nfvo.emsdriver.configmgr.ConfigurationImp;
30 import org.onap.vfc.nfvo.emsdriver.configmgr.ConfigurationInterface;
31 import org.onap.vfc.nfvo.emsdriver.messagemgr.MessageChannel;
32 import org.onap.vfc.nfvo.emsdriver.messagemgr.MessageChannelFactory;
34 import javax.xml.stream.XMLInputFactory;
35 import javax.xml.stream.XMLStreamConstants;
36 import javax.xml.stream.XMLStreamReader;
38 import java.text.SimpleDateFormat;
40 import java.util.regex.Matcher;
41 import java.util.regex.Pattern;
/**
 * Collection task for one {@link CollectMsg}: looks up the collect settings
 * for the message's EMS name and type, downloads the matching files over FTP,
 * decompresses and parses them, and puts the parsed records on the result
 * channels.
 */
43 public class TaskThread implements Runnable {
// Class-wide logger.
45 private static final Log log = LogFactory.getLog(TaskThread.class);
// Channel that receives parsed PM records (see processPMXml / processPMCsv).
46 private MessageChannel pmResultChannel;
// Channel that receives CM notification messages (see setMessage).
47 private MessageChannel cmResultChannel;
// Collect message handled by this task; passed to collectMsgHandle.
48 private CollectMsg data;
// Source of per-EMS collect settings.
49 private ConfigurationInterface configurationInterface = new ConfigurationImp();
// Directory prefix for downloaded and decompressed files.
50 private String localPath = Constant.SYS_DATA_TEMP;
// Directory prefix for generated zip results (see createZipFile).
51 private String resultPath = Constant.SYS_DATA_RESULT;
// Compact timestamp used in generated file and directory names.
52 private SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMMddHHmmss");
// Human-readable timestamp used inside createMessage.
53 private SimpleDateFormat dateFormat2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
/**
 * Creates a task for the given collect message.
 * NOTE(review): the constructor body is not visible in this view; presumably
 * it stores {@code data} in the field of the same name - confirm.
 */
55 public TaskThread(CollectMsg data) {
// NOTE(review): the statements below read like the body of run() (the class
// implements Runnable), but the method declaration is not visible here - confirm.
// Look up the CM and PM result channels from the factory by their keys.
65 cmResultChannel = MessageChannelFactory.getMessageChannel(Constant.COLLECT_RESULT_CHANNEL_KEY);
66 pmResultChannel = MessageChannelFactory.getMessageChannel(Constant.COLLECT_RESULT_PM_CHANNEL_KEY);
// Process the collect message; any exception is logged here and not rethrown.
68 collectMsgHandle(data);
69 } catch (Exception e) {
70 log.error(" collectMsgHandle", e);
74 private void collectMsgHandle(CollectMsg collectMsg) {
75 String emsName = collectMsg.getEmsName();
76 String type = collectMsg.getType();
77 CollectVo collectVo = configurationInterface.getCollectVoByEmsNameAndType(emsName, type);
79 List<String> downloadfiles = this.ftpDownload(collectVo);
80 // paser ftp update message send
81 for (String fileName : downloadfiles) {
82 this.parseFtpAndSendMessage(fileName, collectVo);
86 public void parseFtpAndSendMessage(String fileName, CollectVo collectVo) {
87 List<File> filelist = decompressed(fileName);
88 for (File tempfile : filelist) {
89 String unfileName = tempfile.getName();
90 Pattern pa = Pattern.compile(".*-(.*)-\\w{2}-");
91 Matcher ma = pa.matcher(unfileName);
94 //String nename = ma.group(1);
95 boolean parseResult = false;
96 if (Constant.COLLECT_TYPE_CM.equalsIgnoreCase(collectVo.getType())) {
97 // parseResult = processCMXml(tempfile, nename, "CM");//The
98 // logic is not exist now, but need to add in the future
100 if (unfileName.indexOf(".csv") > -1) {// changed to -1 for coding practice as having ".csv" must have some some legal name
101 parseResult = processPMCsv(tempfile);
103 parseResult = processPMXml(tempfile);
107 log.info("parser " + tempfile + " sucess");
110 log.info("parser " + tempfile + " fail");
116 public boolean processPMXml(File file) {
117 try (FileInputStream fis = new FileInputStream(file);
118 InputStreamReader isr = new InputStreamReader(fis,Constant.ENCODING_UTF8)) {
119 XMLInputFactory fac = XMLInputFactory.newInstance();
120 XMLStreamReader reader = fac.createXMLStreamReader(isr);
122 boolean fileHeaderStart = false;
123 boolean measurementStart = false;
124 boolean pmNameFlag = false;
125 boolean pmDataFlag = false;
126 boolean objectFlag = true;
130 String currentMea = null;
131 String subName = null;
132 String localName = null;
133 String endLocalName = null;
134 String objectType = null;
136 LinkedHashMap<String, String> commonNameAndValue = new LinkedHashMap<>();
137 LinkedHashMap<String, String> pmDatas = null;
138 LinkedHashMap<Integer, String> pmNames = null;
141 while (reader.hasNext()) {
143 String fileHeaderConstant = "FileHeader";
144 event = reader.next();
146 case XMLStreamConstants.START_ELEMENT:
147 localName = reader.getLocalName();
148 if (fileHeaderConstant.equalsIgnoreCase(localName)) {
149 fileHeaderStart = true;
151 if (fileHeaderStart&& !fileHeaderConstant.equalsIgnoreCase(localName)) {
152 commonNameAndValue.put(localName, reader.getElementText().trim());
154 if ("Measurements".equalsIgnoreCase(localName)) {
155 // a new Measurement starts
156 measurementStart = true;
158 if (measurementStart) {
159 // measurement handler
160 if ("ObjectType".equalsIgnoreCase(localName)) {
161 objectType = reader.getElementText().trim();
162 commonNameAndValue.put("ObjectType", objectType);
164 if ("PmName".equalsIgnoreCase(localName)) {
166 pmNames = new LinkedHashMap<>();
169 if (pmNameFlag && "N".equalsIgnoreCase(localName)) {
170 // pmname handler, add columnNames
171 nameIndex = Integer.parseInt(getXMLAttribute(reader, "i"));
172 String text = reader.getElementText().trim();
173 pmNames.put(nameIndex, text);
175 if ("PmData".equalsIgnoreCase(localName)) {
177 pmDatas = new LinkedHashMap<>();
182 if ("Object".equalsIgnoreCase(localName)) {
184 int n = reader.getAttributeCount();
185 for (int i = 0; i < n; i++) {
186 String name = reader.getAttributeLocalName(i);
187 commonNameAndValue.put(name,reader.getAttributeValue(i));
192 if ("V".equalsIgnoreCase(localName)) {
193 String indexStr = getXMLAttribute(reader, "i");
194 if (indexStr == null) {
195 log.error("ERROR: illegal value index");
198 index = Integer.parseInt(indexStr);
199 String name = pmNames.get(index);
201 log.error("illegal data: valueIndex="+ index);
205 String value = reader.getElementText().trim();
206 pmDatas.put(name, value);
208 if ("CV".equalsIgnoreCase(localName)) {
210 String indexStr = getXMLAttribute(reader, "i");
211 if (indexStr == null) {
212 log.error("ERROR: illegal value index");
215 index = Integer.parseInt(indexStr);
217 currentMea = pmNames.get(index);
218 if (currentMea == null) {
219 log.error("illegal data: valueIndex="+ index);
224 if ("SN".equalsIgnoreCase(localName)) {
225 subName = reader.getElementText().trim();
228 if ("SV".equalsIgnoreCase(localName)) {
229 String subValue = reader.getElementText().trim();
230 pmDatas.put(subName, subValue);
238 case XMLStreamConstants.CHARACTERS:
241 case XMLStreamConstants.END_ELEMENT:
242 endLocalName = reader.getLocalName();
243 if ("Object".equalsIgnoreCase(endLocalName)) {
245 pmDatas.putAll(commonNameAndValue);
247 pmResultChannel.put(pmDatas);
249 } catch (Exception e) {
250 pmResultChannel.clear();
251 log.error("collectResultChannel.put(resultMap) error ",e);
254 if ("PmData".equalsIgnoreCase(endLocalName)) {
258 if ("PmName".equalsIgnoreCase(endLocalName)) {
261 if ("Measurements".equalsIgnoreCase(endLocalName)) {
262 // a measurement over
263 measurementStart = false;
266 if (fileHeaderConstant.equalsIgnoreCase(endLocalName)) {
267 fileHeaderStart = false;
271 } catch (Exception e) {
272 log.error("Exception: ", e);
273 event = reader.next();
277 } catch (Exception e) {
278 log.error("processPMXml is Exception ", e);
284 private String getXMLAttribute(XMLStreamReader reader, String obj) {
286 if (obj == null || reader == null) {
289 int n = reader.getAttributeCount();
290 for (int i = 0; i < n; i++) {
291 String name = reader.getAttributeLocalName(i);
292 if (obj.equalsIgnoreCase(name)) {
293 res = reader.getAttributeValue(i);
299 public boolean processPMCsv(File tempfile) {
301 List<String> columnNames = new ArrayList<>();
302 List<String> commonValues = new ArrayList<>();
303 try (FileInputStream brs = new FileInputStream(tempfile);
304 InputStreamReader isr = new InputStreamReader(brs,Constant.ENCODING_UTF8);
305 BufferedReader br = new BufferedReader(isr)) {
308 String commonField = br.readLine();
309 String[] fields = commonField.split("\\|", -1);
310 for (String com : fields) {
311 String[] comNameAndValue = com.split("=", 2);
312 columnNames.add(comNameAndValue[0].trim());
313 commonValues.add(comNameAndValue[1]);
316 String columnName = br.readLine();
317 String[] names = columnName.split("\\|", -1);
318 for (String name : names) {
319 columnNames.add(name);
322 String valueLine = "";
323 List<String> valuelist = new ArrayList<>();
325 while ((valueLine = br.readLine()) != null) {
326 if ("".equals(valueLine.trim())) {
329 String[] values = valueLine.split("\\|", -1);
331 valuelist.addAll(commonValues);
332 for (String value : values) {
333 valuelist.add(value);
335 HashMap<String, String> resultMap = this.resultMap(columnNames,valuelist);
337 pmResultChannel.put(resultMap);
338 } catch (Exception e) {
339 pmResultChannel.clear();
340 log.error("collectResultChannel.put(resultMap) error ", e);
344 } catch (IOException e) {
345 log.error("processPMCsv is fail ", e);
352 private HashMap<String, String> resultMap(List<String> columnNames,
353 List<String> valuelist) {
355 HashMap<String, String> resultMap = new HashMap<>();
356 if (columnNames.size() == valuelist.size()) {
357 for (int i = 0; i < columnNames.size(); i++) {
358 resultMap.put(columnNames.get(i), valuelist.get(i));
366 /* private boolean processCMXml(File tempfile, String nename, String type) {
368 String csvpath = localPath + nename + "/" + type + "/";
369 File csvpathfile = new File(csvpath);
370 if (!csvpathfile.exists()) {
371 csvpathfile.mkdirs();
373 String csvFileName = nename + dateFormat.format(new Date()) + System.nanoTime();
374 String csvpathAndFileName = csvpath + csvFileName + ".csv";
375 try( FileOutputStream fos = new FileOutputStream(csvpathAndFileName, false);
376 BufferedOutputStream bos = new BufferedOutputStream(fos, 10240)){
378 boolean FieldNameFlag = false;
379 boolean FieldValueFlag = false;
381 String xmlPathAndFileName = null;
382 String localName = null;
383 String endLocalName = null;
386 ArrayList<String> names = new ArrayList<>();// colname
387 LinkedHashMap<String, String> nameAndValue = new LinkedHashMap<>();
389 try( FileInputStream fis = new FileInputStream(tempfile);
390 InputStreamReader isr = new InputStreamReader(fis, Constant.ENCODING_UTF8)){
392 XMLInputFactory fac = XMLInputFactory.newInstance();
393 XMLStreamReader reader = fac.createXMLStreamReader(isr);
395 boolean setcolum = true;
396 while (reader.hasNext()) {
398 event = reader.next();
400 case XMLStreamConstants.START_ELEMENT:
401 localName = reader.getLocalName();
402 if ("FieldName".equalsIgnoreCase(localName)) {
403 FieldNameFlag = true;
405 if (FieldNameFlag && "N".equalsIgnoreCase(localName)) {
406 String colName = reader.getElementText().trim();
409 if ("FieldValue".equalsIgnoreCase(localName)) {
410 FieldValueFlag = true;
413 if (FieldValueFlag) {
415 xmlPathAndFileName = this.setColumnNames(nename, names, type);
419 if ("Object".equalsIgnoreCase(localName)) {
420 int ac = reader.getAttributeCount();
421 for (int i = 0; i < ac; i++) {
422 if ("rmUID".equalsIgnoreCase(reader.getAttributeLocalName(i))) {
423 rmUID = reader.getAttributeValue(i).trim();
426 nameAndValue.put("rmUID", rmUID);
428 if ("V".equalsIgnoreCase(localName)) {
429 index = Integer.parseInt(reader.getAttributeValue(0)) - 1;
430 String currentName = names.get(index);
431 String v = reader.getElementText().trim();
432 nameAndValue.put(currentName, v);
436 case XMLStreamConstants.CHARACTERS:
438 case XMLStreamConstants.END_ELEMENT:
439 endLocalName = reader.getLocalName();
441 if ("FieldName".equalsIgnoreCase(endLocalName)) {
442 FieldNameFlag = false;
444 if ("FieldValue".equalsIgnoreCase(endLocalName)) {
445 FieldValueFlag = false;
447 if ("Object".equalsIgnoreCase(endLocalName)) {
449 this.appendLine(nameAndValue, bos);
450 nameAndValue.clear();
454 } catch (Exception e) {
455 log.error("Exception: ",e);
456 event = reader.next();
460 String[] fileKeys = this.createZipFile(csvpathAndFileName, xmlPathAndFileName, nename);
462 Properties ftpPro = configurationInterface.getProperties();
463 String ip = ftpPro.getProperty("ftp_ip");
464 String port = ftpPro.getProperty("ftp_port");
465 String ftp_user = ftpPro.getProperty("ftp_user");
466 String ftp_password = ftpPro.getProperty("ftp_password");
468 String ftp_passive = ftpPro.getProperty("ftp_passive");
469 String ftp_type = ftpPro.getProperty("ftp_type");
470 String remoteFile = ftpPro.getProperty("ftp_remote_path");
471 this.ftpStore(fileKeys, ip, port, ftp_user, ftp_password, ftp_passive, ftp_type, remoteFile);
473 String message = this.createMessage(fileKeys[1], ftp_user, ftp_password, ip, port, countNum, nename);
476 this.setMessage(message);
479 } catch (Exception e) {
480 log.error("" + StringUtil.getStackTrace(e));
484 } catch (FileNotFoundException e1) {
485 log.error("FileNotFoundException " + StringUtil.getStackTrace(e1));
487 }catch (Exception e) {
488 log.error("" + StringUtil.getStackTrace(e));
494 private void setMessage(String message) {
496 cmResultChannel.put(message);
497 } catch (Exception e) {
498 log.error("collectResultChannel.put(message) is error "
499 + StringUtil.getStackTrace(e));
503 public String createMessage(String zipName, String user, String pwd,
504 String ip, String port, int countNum, String nename) {
505 StringBuilder strBuffer = new StringBuilder();
507 .append("<?xml version=\"1.0\" encoding=\"UTF-8\"?>"
508 + "<FILE_DATA_READY_UL xmlns:xsi=\" http://www.w3.org/2001/XMLSchema-instance\">"
509 + "<Header SessionID=\"");
510 strBuffer.append("");
511 strBuffer.append("\" LicenceID=\"");
512 strBuffer.append("");
513 strBuffer.append("\" SystemID=\"");
514 strBuffer.append("");
515 strBuffer.append("\" Time=\"");
516 strBuffer.append(dateFormat2.format(new Date()));
517 strBuffer.append("\" PolicyID=\"");
518 strBuffer.append("");
519 strBuffer.append("\"/><Body>");
520 strBuffer.append("<DataCatalog>");
521 strBuffer.append("");
522 strBuffer.append("</DataCatalog><GroupID>");
523 strBuffer.append(nename);
524 strBuffer.append("</GroupID><DataSourceName>");
525 strBuffer.append("");
526 strBuffer.append("</DataSourceName><InstanceID>");
527 strBuffer.append("");
528 strBuffer.append("</InstanceID><FileFormat>");
529 strBuffer.append("csv");
530 strBuffer.append("</FileFormat><CharSet>");
531 strBuffer.append("gbk");
532 strBuffer.append("</CharSet><FieldSeparator>");
533 strBuffer.append("|");
534 strBuffer.append("</FieldSeparator><IsCompressed>");
535 strBuffer.append("true");
536 strBuffer.append("</IsCompressed><StartTime>");
537 strBuffer.append(dateFormat2.format(new Date()));
538 strBuffer.append("</StartTime><EndTime>");
539 strBuffer.append("");
540 strBuffer.append("</EndTime><FileList>");
541 strBuffer.append(zipName);
542 strBuffer.append("</FileList><ConnectionString>");
543 strBuffer.append("ftp://" + user + ":" + pwd + "@" + ip + ":" + port);
544 strBuffer.append("</ConnectionString>");
545 strBuffer.append("<DataCount>");
546 strBuffer.append(countNum);
547 strBuffer.append("</DataCount>");
549 strBuffer.append("<FileSize>").append("").append("</FileSize>");
550 strBuffer.append("<DataGranularity>").append("")
551 .append("</DataGranularity>");
553 strBuffer.append("</Body></FILE_DATA_READY_UL>");
554 return strBuffer.toString();
558 private void ftpStore(String[] fileKeys, String ip, String port,
559 String ftpUser, String ftpPassword, String ftpPassive,
560 String ftpType, String remoteFile) {
561 String zipFilePath = fileKeys[0];
563 FTPInterface ftpClient;
564 ftpClient = new FTPSrv();
567 ftpClient.login(ip, Integer.parseInt(port), ftpUser, ftpPassword,
568 "GBK", Boolean.parseBoolean(ftpPassive), 5 * 60 * 1000);
569 } catch (Exception e) {
570 log.error("login fail,ip=[" + ip + "] port=[" + port + "] user=["
571 + ftpUser + "]pwd=[" + ftpPassword + "]"
572 + StringUtil.getStackTrace(e));
575 log.debug("store [" + zipFilePath + "]to[" + remoteFile + "]");
577 FileUtils.deleteQuietly(new File(zipFilePath));
581 private String[] createZipFile(String csvpathAndFileName, String xmlPathAndFileName, String nename) throws IOException {
582 String zipPath = resultPath + nename + dateFormat.format(new Date())
583 + "_" + System.nanoTime();
585 File destDir = new File(zipPath);
589 .copyFileToDirectory(new File(csvpathAndFileName), destDir);
591 .copyFileToDirectory(new File(xmlPathAndFileName), destDir);
592 } catch (IOException e) {
593 throw new IOException("createZipFile", e);
596 String destFilePath = zipPath + ".zip";
598 Zip zip = new Zip(destDir.getAbsolutePath(), destFilePath);
599 zip.setCompressLevel(9);
602 FileUtils.deleteDirectory(destDir);
603 } catch (IOException e) {
604 log.error("zip.compress() is fail " + StringUtil.getStackTrace(e));
605 throw new IOException("createZipFile", e);
607 return new String[] { destFilePath, zipPath + ".zip" };
610 private String setColumnNames(String nename, List<String> names, String type) {
612 String xmlpath = localPath + nename + "/" + type + "/";
613 File xmlpathfile = new File(xmlpath);
614 if (!xmlpathfile.exists()) {
615 xmlpathfile.mkdirs();
617 String xmlFileName = nename + dateFormat.format(new Date()) + System.nanoTime();
618 String fieldLine = "";
619 for (int i = 0; i < names.size(); i++) {
620 String field = "\t<Field>\r\n" + "\t\t<FieldNo>" + i
621 + "</FieldNo>\r\n" + "\t\t<FieldName>" + names.get(i)
622 + "</FieldName>\r\n" + "\t\t<FieldType>" + names.get(i)
623 + "</FieldType>\r\n" + "\t\t<FieldNameOther>"
624 + names.get(i) + "</FieldNameOther>\r\n" + "\t</Field>\r\n";
625 fieldLine = fieldLine + field;
628 String str = "<?xml version=\"1.0\" encoding=\"gbk\"?>\r\n"
629 + "<xml>\r\n" + "<FILE_STRUCTURE>\r\n" + fieldLine
630 + "</FILE_STRUCTURE>\r\n" + "</xml>\r\n";
631 String xmlPathAndFileName = xmlpath + xmlFileName + ".xml";
633 this.writeDetail(xmlPathAndFileName, str);
634 } catch (Exception e) {
635 log.error("writeDetail is fail ,xmlFileName=" + xmlFileName
636 + StringUtil.getStackTrace(e));
639 return xmlPathAndFileName;
642 private void writeDetail(String detailFileName, String str)
644 try (OutputStream readOut = new FileOutputStream(new File(
645 detailFileName), false);
646 OutputStreamWriter writer = new OutputStreamWriter(readOut)) {
649 } catch (IOException e) {
650 throw new IOException("writeDetail", e);
654 private void appendLine(LinkedHashMap<String, String> nameAndValue,
655 BufferedOutputStream bos) {
657 StringBuilder lineDatas = new StringBuilder();
659 for (String key : nameAndValue.keySet()) {
660 lineDatas.append(nameAndValue.get(key)).append("|");
664 * for (HashMap.Entry<String, String> entry :
665 * nameAndValue.entrySet()) {
666 * lineDatas.append(entry.getValue()).append("|"); }
668 bos.write(lineDatas.toString().getBytes());
669 bos.write("\n".getBytes());
670 } catch (IOException e) {
671 log.error("appendLine error " + StringUtil.getStackTrace(e));
675 public List<File> decompressed(String fileName) {
676 List<File> filelist = new ArrayList<>();
678 if (fileName.indexOf(".gz") > -1) {
680 File decompressFile = deGz(fileName);
681 filelist.add(decompressFile);
682 new File(fileName).delete();
683 } catch (IOException e) {
684 log.error("decompressed is fail " + StringUtil.getStackTrace(e));
686 } else if (fileName.indexOf(".zip") > -1) {
688 File[] files = deZip(new File(fileName));
689 new File(fileName).delete();
690 for (File temp : files) {
693 } catch (Exception e) {
694 log.error("decompressed is fail " + StringUtil.getStackTrace(e));
697 filelist.add(new File(fileName));
703 private File deGz(String gzFileName) throws IOException {
704 Gunzip gunzip = new Gunzip();
705 String orgFile = gzFileName.replace(".gz", "");
706 gunzip.unCompress(gzFileName, orgFile);
707 return new File(orgFile);
710 public File[] deZip(File file) throws IOException {
712 String regx = "(.*).zip";
713 Pattern p = Pattern.compile(regx);
714 Matcher m = p.matcher(file.getName());
716 String orgFile = localPath + m.group(1) + "/";
717 UnZip unzip = new UnZip(file.getAbsolutePath(), orgFile);
719 file = new File(orgFile);
721 return file.listFiles();
725 private List<String> ftpDownload(CollectVo collectVo) {
727 List<String> fileList = new ArrayList<>();
729 String ip = collectVo.getIP();
731 String port = collectVo.getPort();
733 String user = collectVo.getUser();
735 String password = collectVo.getPassword();
737 String passivemode = collectVo.getPassive();
739 FTPInterface ftpClient = new FTPSrv();
743 log.info("ftp login ,ip=[" + ip + "] port=[" + port + "] user=["
744 + user + /* "]password=[" + password + */"]");
745 ftpClient.login(ip, Integer.parseInt(port), user, password, "GBK",
746 Boolean.parseBoolean(passivemode), 5 * 60 * 1000);
747 } catch (Exception e) {
748 log.error("login fail,ip=[" + ip + "] port=[" + port + "] user=["
749 + user + /* "]password=[" + password + */"]"
750 + StringUtil.getStackTrace(e));
755 String dir = collectVo.getRemotepath();
756 List<String> searchExprList = new ArrayList<>();
757 String[] fPath = dir.split(";");
758 for (int i = 0; i < fPath.length; i++) {
759 int oldSize = searchExprList.size();
760 String conpath = fPath[i] + collectVo.getMatch();
761 HashMap<String, String> varMap = new HashMap<>();
762 long collectPeriod = 900;
764 collectPeriod = Long.parseLong(collectVo.getGranularity()) * 60;
765 log.info("collectPeriod =[" + collectPeriod + "]");
766 } catch (NumberFormatException e) {
767 log.error("NumberFormatException", e);
769 long[] d = DateUtil.getScanScope(new Date(), collectPeriod);
770 searchExprList.add(VarExprParser.replaceVar(conpath, d[0], d[1]));
773 log.info("[" + conpath + "] result["
774 + (searchExprList.size() - oldSize) + "] path");
776 String nowdir = null;
778 nowdir = ftpClient.pwd();
779 searchExprList = getPathNoRegular(searchExprList, ftpClient);
780 } catch (Exception e1) {
781 log.error(" collect fail ", e1);
784 List<AFtpRemoteFile> remoteFiles = new ArrayList<>();
785 for (String expr : searchExprList) {
786 ftpClient.chdir(nowdir);
787 String[] keys = parseExprKeys(expr);
788 String ftpRegular = keys[1];
789 String ftpDir = keys[0];
791 boolean cdsucess = ftpClient.chdir(ftpDir);
793 AFtpRemoteFile[] arf = (AFtpRemoteFile[]) ftpClient.list();
794 log.info(" list [" + ftpDir + "] result["
795 + (arf == null ? "null" : arf.length) + "] files");
798 rfileFilter(remoteFiles, arf, ftpRegular);
800 for (AFtpRemoteFile ftpRemoteFile : remoteFiles) {
801 if (!new File(localPath).exists()) {
803 new File(localPath).mkdir();
804 } catch (Exception e) {
805 log.error("create localPath is fail localPath="
807 + StringUtil.getStackTrace(e));
811 if (!new File(localPath).exists()) {
812 new File(localPath).mkdirs();
815 String localFileName = localPath
816 + ftpRemoteFile.getFileName();
817 if (new File(localFileName).exists()) {
818 new File(localFileName).delete();
821 boolean flag = ftpClient.downloadFile(
822 ftpRemoteFile.getAbsFileName(), localFileName);
825 fileList.add(localFileName);
827 log.error("download file fail fileName="
828 + ftpRemoteFile.getAbsFileName());
833 log.error("cd dir is faill dir =[" + ftpDir + "]");
840 private void rfileFilter(List<AFtpRemoteFile> fileContainer,
841 AFtpRemoteFile[] arfs, String ftpRegular) {
842 if (ftpRegular != null && ftpRegular.length() > 0) {
843 Pattern pattern = null;
845 pattern = Pattern.compile(ftpRegular, Pattern.CASE_INSENSITIVE);
846 } catch (Exception e) {
847 log.info("[" + ftpRegular + "]Pattern.compile exception:", e);
848 // should rethrow exception or return from here
850 int hisSize = fileContainer.size();
851 for (int j = 0; arfs != null && j < arfs.length; j++) {
852 String fileName = parseFileName(arfs[j].getFileName());
853 Matcher matcher = null;
855 matcher = pattern.matcher(fileName);
857 // define the flow when pattern is null
860 if (null != matcher && matcher.find())
861 fileContainer.add(arfs[j]);
863 log.info("[" + ftpRegular + "]filter["
864 + (fileContainer.size() - hisSize) + "]filse");
866 for (int j = 0; arfs != null && j < arfs.length; j++)
867 fileContainer.add(arfs[j]);
872 private String parseFileName(String fileName) {
873 int idx = fileName.lastIndexOf("/");
876 return fileName.substring(idx + 1, fileName.length());
879 private String[] parseExprKeys(String source) {
881 if (source.indexOf(";") > -1) {
882 source = source.substring(0, source.indexOf(";"));
884 if (source.endsWith("/"))
885 return new String[] { source, "" };
887 int idx = source.lastIndexOf("/");
888 String[] dirkeys = new String[2];
889 dirkeys[0] = source.substring(0, idx + 1);
890 dirkeys[1] = source.substring(idx + 1, source.length());
894 public List<String> getPathNoRegular(List<String> searchExprList,
895 FTPInterface ftpCache) throws IOException {
896 boolean isregular = false;
897 List<String> regularList = new ArrayList<>();
898 for (String regular : searchExprList) {
899 Pattern lpattern = null;
901 lpattern = Pattern.compile("(.*/)<([^/]+)>(/.*)");
902 } catch (Exception e) {
903 log.error("[" + regular + "]compile fails:" + e.getMessage());
904 throw new IOException("getPathNoRegular", e);
906 Matcher matcher = null;
907 if (lpattern != null)
908 matcher = lpattern.matcher(regular);
910 // define flow in case lpattern is null
913 if (null != matcher && matcher.find()) {
915 String parpath = matcher.group(1);
917 boolean isin = ftpCache.chdir(parpath);
919 log.info("cd dir [" + parpath + "] sucess");
921 log.error("cd dir [" + parpath + "] fail");
923 } catch (Exception e) {
924 log.error(" cd dir [" + parpath + "]fail", e);
925 throw new IOException("ftpCache.chdir", e);
927 RemoteFile[] remotef = ftpCache.list();
928 for (RemoteFile aremote : remotef) {
929 if (aremote.isDirectory()
930 && aremote.getFileName().matches(matcher.group(2))) {
931 regularList.add(matcher.group(1)
932 + aremote.getFileName() + matcher.group(3));
936 regularList.add(regular);
940 getPathNoRegular(regularList, ftpCache);
945 public MessageChannel getPmResultChannel() {
946 return pmResultChannel;
949 public void setPmResultChannel(MessageChannel pmResChannel) {
950 this.pmResultChannel = pmResChannel;