9b38c9c2051ba373af972868b8375af76bac15c9
[vfc/nfvo/driver/ems.git] / ems / boco / src / main / java / org / onap / vfc / nfvo / emsdriver / collector / TaskThread.java
1 /*
2  * Copyright 2017 BOCO Corporation.  CMCC Technologies Co., Ltd
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package org.onap.vfc.nfvo.emsdriver.collector;
17
18 import org.apache.commons.io.FileUtils;
19 import org.apache.commons.logging.Log;
20 import org.apache.commons.logging.LogFactory;
21 import org.onap.vfc.nfvo.emsdriver.commons.constant.Constant;
22 import org.onap.vfc.nfvo.emsdriver.commons.ftp.AFtpRemoteFile;
23 import org.onap.vfc.nfvo.emsdriver.commons.ftp.FTPInterface;
24 import org.onap.vfc.nfvo.emsdriver.commons.ftp.FTPSrv;
25 import org.onap.vfc.nfvo.emsdriver.commons.ftp.RemoteFile;
26 import org.onap.vfc.nfvo.emsdriver.commons.model.CollectMsg;
27 import org.onap.vfc.nfvo.emsdriver.commons.model.CollectVo;
28 import org.onap.vfc.nfvo.emsdriver.commons.utils.*;
29 import org.onap.vfc.nfvo.emsdriver.configmgr.ConfigurationImp;
30 import org.onap.vfc.nfvo.emsdriver.configmgr.ConfigurationInterface;
31 import org.onap.vfc.nfvo.emsdriver.messagemgr.MessageChannel;
32 import org.onap.vfc.nfvo.emsdriver.messagemgr.MessageChannelFactory;
33
34 import javax.xml.stream.XMLInputFactory;
35 import javax.xml.stream.XMLStreamConstants;
36 import javax.xml.stream.XMLStreamReader;
37 import java.io.*;
38 import java.text.SimpleDateFormat;
39 import java.util.*;
40 import java.util.regex.Matcher;
41 import java.util.regex.Pattern;
42
43 public class TaskThread implements Runnable {
44
45         private static final Log log = LogFactory.getLog(TaskThread.class);
46         private MessageChannel pmResultChannel;
47         private MessageChannel cmResultChannel;
48         private CollectMsg data;
49         private ConfigurationInterface configurationInterface = new ConfigurationImp();
50         private String localPath = Constant.SYS_DATA_TEMP;
51         private String resultPath = Constant.SYS_DATA_RESULT;
52         private SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMMddHHmmss");
53         private SimpleDateFormat dateFormat2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
54
55         public TaskThread(CollectMsg data) {
56                 this.data = data;
57         }
58
59         public TaskThread() {
60                 super();
61         }
62
63         @Override
64         public void run() {
65                 cmResultChannel = MessageChannelFactory.getMessageChannel(Constant.COLLECT_RESULT_CHANNEL_KEY);
66                 pmResultChannel = MessageChannelFactory.getMessageChannel(Constant.COLLECT_RESULT_PM_CHANNEL_KEY);
67                 try {
68                         collectMsgHandle(data);
69                 } catch (Exception e) {
70                         log.error(" collectMsgHandle", e);
71                 }
72         }
73
74         private void collectMsgHandle(CollectMsg collectMsg) {
75                 String emsName = collectMsg.getEmsName();
76                 String type = collectMsg.getType();
77                 CollectVo collectVo = configurationInterface.getCollectVoByEmsNameAndType(emsName, type);
78                 // ftp download
79                 List<String> downloadfiles = this.ftpDownload(collectVo);
80                 // paser ftp update message send
81                 for (String fileName : downloadfiles) {
82                         this.parseFtpAndSendMessage(fileName, collectVo);
83                 }
84         }
85
86         public void parseFtpAndSendMessage(String fileName, CollectVo collectVo) {
87                 List<File> filelist = decompressed(fileName);
88                 for (File tempfile : filelist) {
89                         String unfileName = tempfile.getName();
90                         Pattern pa = Pattern.compile(".*-(.*)-\\w{2}-");
91                         Matcher ma = pa.matcher(unfileName);
92                         if (!ma.find())
93                                 continue;
94                         //String nename = ma.group(1);
95                         boolean parseResult = false;
96                         if (Constant.COLLECT_TYPE_CM.equalsIgnoreCase(collectVo.getType())) {
97                                 // parseResult = processCMXml(tempfile, nename, "CM");//The
98                                 // logic is not exist now, but need to add in the future
99                         } else {
100                                 if (unfileName.indexOf(".csv") > -1) {// changed to -1 for coding practice as having ".csv" must have some some legal name
101                                         parseResult = processPMCsv(tempfile);
102                                 } else {
103                                         parseResult = processPMXml(tempfile);
104                                 }
105                         }
106                         if (parseResult) {
107                                 log.info("parser " + tempfile + " sucess");
108                                 tempfile.delete();
109                         } else {
110                                 log.info("parser " + tempfile + " fail");
111                         }
112
113                 }
114         }
115
116         public boolean processPMXml(File file) {
117                 try (FileInputStream fis = new FileInputStream(file);
118                                 InputStreamReader isr = new InputStreamReader(fis,Constant.ENCODING_UTF8)) {
119                         XMLInputFactory fac = XMLInputFactory.newInstance();
120                         XMLStreamReader reader = fac.createXMLStreamReader(isr);
121
122                         boolean fileHeaderStart = false;
123                         boolean measurementStart = false;
124                         boolean pmNameFlag = false;
125                         boolean pmDataFlag = false;
126                         boolean objectFlag = true;
127
128                         int index = -1;
129                         int nameIndex = -1;
130                         String currentMea = null;
131                         String subName = null;
132                         String localName = null;
133                         String endLocalName = null;
134                         String objectType = null;
135
136                         LinkedHashMap<String, String> commonNameAndValue = new LinkedHashMap<>();
137                         LinkedHashMap<String, String> pmDatas = null;
138                         LinkedHashMap<Integer, String> pmNames = null;
139
140                         int event = -1;
141                         while (reader.hasNext()) {
142                                 try {
143                                         String fileHeaderConstant = "FileHeader";
144                                         event = reader.next();
145                                         switch (event) {
146                                         case XMLStreamConstants.START_ELEMENT:
147                                                 localName = reader.getLocalName();
148                                                 if (fileHeaderConstant.equalsIgnoreCase(localName)) {
149                                                         fileHeaderStart = true;
150                                                 }
151                                                 if (fileHeaderStart&& !fileHeaderConstant.equalsIgnoreCase(localName)) {
152                                                         commonNameAndValue.put(localName, reader.getElementText().trim());
153                                                 }
154                                                 if ("Measurements".equalsIgnoreCase(localName)) {
155                                                         // a new Measurement starts
156                                                         measurementStart = true;
157                                                 }
158                                                 if (measurementStart) {
159                                                         // measurement handler
160                                                         if ("ObjectType".equalsIgnoreCase(localName)) {
161                                                                 objectType = reader.getElementText().trim();
162                                                                 commonNameAndValue.put("ObjectType", objectType);
163                                                         }
164                                                         if ("PmName".equalsIgnoreCase(localName)) {
165                                                                 pmNameFlag = true;
166                                                                 pmNames = new LinkedHashMap<>();
167
168                                                         }
169                                                         if (pmNameFlag && "N".equalsIgnoreCase(localName)) {
170                                                                 // pmname handler, add columnNames
171                                                                 nameIndex = Integer.parseInt(getXMLAttribute(reader, "i"));
172                                                                 String text = reader.getElementText().trim();
173                                                                 pmNames.put(nameIndex, text);
174                                                         }
175                                                         if ("PmData".equalsIgnoreCase(localName)) {
176                                                                 pmDataFlag = true;
177                                                                 pmDatas = new LinkedHashMap<>();
178                                                         }
179
180                                                         if (pmDataFlag) {
181                                                                 // pmdata handler
182                                                                 if ("Object".equalsIgnoreCase(localName)) {
183                                                                         objectFlag = true;
184                                                                         int n = reader.getAttributeCount();
185                                                                         for (int i = 0; i < n; i++) {
186                                                                                 String name = reader.getAttributeLocalName(i);
187                                                                                 commonNameAndValue.put(name,reader.getAttributeValue(i));
188                                                                         }
189                                                                 }
190                                                                 if (objectFlag) {
191                                                                         // add columnValues
192                                                                         if ("V".equalsIgnoreCase(localName)) {
193                                                                                 String indexStr = getXMLAttribute(reader, "i");
194                                                                                 if (indexStr == null) {
195                                                                                         log.error("ERROR: illegal value index");
196                                                                                         continue;
197                                                                                 }
198                                                                                 index = Integer.parseInt(indexStr);
199                                                                                 String name = pmNames.get(index);
200                                                                                 if (name == null) {
201                                                                                         log.error("illegal data: valueIndex="+ index);
202                                                                                         continue;
203                                                                                 }
204
205                                                                                 String value = reader.getElementText().trim();
206                                                                                 pmDatas.put(name, value);
207                                                                         }
208                                                                         if ("CV".equalsIgnoreCase(localName)) {
209
210                                                                                 String indexStr = getXMLAttribute(reader, "i");
211                                                                                 if (indexStr == null) {
212                                                                                         log.error("ERROR: illegal value index");
213                                                                                         continue;
214                                                                                 }
215                                                                                 index = Integer.parseInt(indexStr);
216
217                                                                                 currentMea = pmNames.get(index);
218                                                                                 if (currentMea == null) {
219                                                                                         log.error("illegal data: valueIndex="+ index);
220                                                                                         continue;
221                                                                                 }
222                                                                         }
223
224                                                                         if ("SN".equalsIgnoreCase(localName)) {
225                                                                                 subName = reader.getElementText().trim();
226
227                                                                         }
228                                                                         if ("SV".equalsIgnoreCase(localName)) {
229                                                                                 String subValue = reader.getElementText().trim();
230                                                                                 pmDatas.put(subName, subValue);
231                                                                         }
232                                                                 }
233                                                         }
234
235                                                 }
236
237                                                 break;
238                                         case XMLStreamConstants.CHARACTERS:
239                                                 // ...
240                                                 break;
241                                         case XMLStreamConstants.END_ELEMENT:
242                                                 endLocalName = reader.getLocalName();
243                                                 if ("Object".equalsIgnoreCase(endLocalName)) {
244                                                         objectFlag = false;
245                                                         pmDatas.putAll(commonNameAndValue);
246                                                         try {
247                                                                 pmResultChannel.put(pmDatas);
248
249                                                         } catch (Exception e) {
250                                                                 pmResultChannel.clear();
251                                                                 log.error("collectResultChannel.put(resultMap) error ",e);
252                                                         }
253                                                 }
254                                                 if ("PmData".equalsIgnoreCase(endLocalName)) {
255                                                         pmDataFlag = false;
256                                                 }
257
258                                                 if ("PmName".equalsIgnoreCase(endLocalName)) {
259                                                         pmNameFlag = false;
260                                                 }
261                                                 if ("Measurements".equalsIgnoreCase(endLocalName)) {
262                                                         // a measurement over
263                                                         measurementStart = false;
264                                                 }
265
266                                                 if (fileHeaderConstant.equalsIgnoreCase(endLocalName)) {
267                                                         fileHeaderStart = false;
268                                                 }
269                                                 break;
270                                         }
271                                 } catch (Exception e) {
272                                         log.error("Exception: ", e);
273                                         event = reader.next();
274                                 }
275                         }
276                         reader.close();
277                 } catch (Exception e) {
278                         log.error("processPMXml is Exception ", e);
279                         return false;
280                 }
281                 return true;
282         }
283
284         private String getXMLAttribute(XMLStreamReader reader, String obj) {
285                 String res = null;
286                 if (obj == null || reader == null) {
287                         return res;
288                 }
289                 int n = reader.getAttributeCount();
290                 for (int i = 0; i < n; i++) {
291                         String name = reader.getAttributeLocalName(i);
292                         if (obj.equalsIgnoreCase(name)) {
293                                 res = reader.getAttributeValue(i);
294                         }
295                 }
296                 return res;
297         }
298
299         public boolean processPMCsv(File tempfile) {
300
301                 List<String> columnNames = new ArrayList<>();
302                 List<String> commonValues = new ArrayList<>();
303                 try (FileInputStream brs = new FileInputStream(tempfile);
304                                 InputStreamReader isr = new InputStreamReader(brs,Constant.ENCODING_UTF8);
305                                 BufferedReader br = new BufferedReader(isr)) {
306
307                         // common field
308                         String commonField = br.readLine();
309                         String[] fields = commonField.split("\\|", -1);
310                         for (String com : fields) {
311                                 String[] comNameAndValue = com.split("=", 2);
312                                 columnNames.add(comNameAndValue[0].trim());
313                                 commonValues.add(comNameAndValue[1]);
314                         }
315                         // column names
316                         String columnName = br.readLine();
317                         String[] names = columnName.split("\\|", -1);
318                         for (String name : names) {
319                                 columnNames.add(name);
320                         }
321
322                         String valueLine = "";
323                         List<String> valuelist = new ArrayList<>();
324
325                         while ((valueLine = br.readLine()) != null) {
326                                 if ("".equals(valueLine.trim())) {
327                                         continue;
328                                 }
329                                 String[] values = valueLine.split("\\|", -1);
330
331                                 valuelist.addAll(commonValues);
332                                 for (String value : values) {
333                                         valuelist.add(value);
334                                 }
335                                 HashMap<String, String> resultMap = this.resultMap(columnNames,valuelist);
336                                 try {
337                                         pmResultChannel.put(resultMap);
338                                 } catch (Exception e) {
339                                         pmResultChannel.clear();
340                                         log.error("collectResultChannel.put(resultMap) error ", e);
341                                 }
342                                 valuelist.clear();
343                         }
344                 } catch (IOException e) {
345                         log.error("processPMCsv is fail ", e);
346                         return false;
347                 }
348                 return true;
349
350         }
351
352         private HashMap<String, String> resultMap(List<String> columnNames,
353                         List<String> valuelist) {
354
355                 HashMap<String, String> resultMap = new HashMap<>();
356                 if (columnNames.size() == valuelist.size()) {
357                         for (int i = 0; i < columnNames.size(); i++) {
358                                 resultMap.put(columnNames.get(i), valuelist.get(i));
359                         }
360                 }
361
362                 return resultMap;
363
364         }
365
366 /*      private boolean processCMXml(File tempfile, String nename, String type) {
367
368                 String csvpath = localPath + nename + "/" + type + "/";
369                 File csvpathfile = new File(csvpath);
370                 if (!csvpathfile.exists()) {
371                         csvpathfile.mkdirs();
372                 }
373                 String csvFileName = nename + dateFormat.format(new Date()) + System.nanoTime();
374                 String csvpathAndFileName = csvpath + csvFileName + ".csv";
375                 try(    FileOutputStream fos = new FileOutputStream(csvpathAndFileName, false);
376                         BufferedOutputStream    bos = new BufferedOutputStream(fos, 10240)){
377
378                         boolean FieldNameFlag = false;
379                         boolean FieldValueFlag = false;
380                         int countNum = 0;
381                         String xmlPathAndFileName = null;
382                         String localName = null;
383                         String endLocalName = null;
384                         String rmUID = null;
385                         int index = -1;
386                         ArrayList<String> names = new ArrayList<>();// colname
387                         LinkedHashMap<String, String> nameAndValue = new LinkedHashMap<>();
388
389                         try(    FileInputStream fis = new FileInputStream(tempfile);
390                                 InputStreamReader isr = new InputStreamReader(fis, Constant.ENCODING_UTF8)){
391
392                                 XMLInputFactory fac = XMLInputFactory.newInstance();
393                                 XMLStreamReader reader = fac.createXMLStreamReader(isr);
394                                 int event = -1;
395                                 boolean setcolum = true;
396                                 while (reader.hasNext()) {
397                                         try {
398                                                 event = reader.next();
399                                                 switch (event) {
400                                                         case XMLStreamConstants.START_ELEMENT:
401                                                                 localName = reader.getLocalName();
402                                                                 if ("FieldName".equalsIgnoreCase(localName)) {
403                                                                         FieldNameFlag = true;
404                                                                 }
405                                                                 if (FieldNameFlag && "N".equalsIgnoreCase(localName)) {
406                                                                         String colName = reader.getElementText().trim();
407                                                                         names.add(colName);
408                                                                 }
409                                                                 if ("FieldValue".equalsIgnoreCase(localName)) {
410                                                                         FieldValueFlag = true;
411
412                                                                 }
413                                                                 if (FieldValueFlag) {
414                                                                         if (setcolum) {
415                                                                                 xmlPathAndFileName = this.setColumnNames(nename, names, type);
416                                                                                 setcolum = false;
417                                                                         }
418
419                                                                         if ("Object".equalsIgnoreCase(localName)) {
420                                                                                 int ac = reader.getAttributeCount();
421                                                                                 for (int i = 0; i < ac; i++) {
422                                                                                         if ("rmUID".equalsIgnoreCase(reader.getAttributeLocalName(i))) {
423                                                                                                 rmUID = reader.getAttributeValue(i).trim();
424                                                                                         }
425                                                                                 }
426                                                                                 nameAndValue.put("rmUID", rmUID);
427                                                                         }
428                                                                         if ("V".equalsIgnoreCase(localName)) {
429                                                                                 index = Integer.parseInt(reader.getAttributeValue(0)) - 1;
430                                                                                 String currentName = names.get(index);
431                                                                                 String v = reader.getElementText().trim();
432                                                                                 nameAndValue.put(currentName, v);
433                                                                         }
434                                                                 }
435                                                                 break;
436                                                         case XMLStreamConstants.CHARACTERS:
437                                                                 break;
438                                                         case XMLStreamConstants.END_ELEMENT:
439                                                                 endLocalName = reader.getLocalName();
440
441                                                                 if ("FieldName".equalsIgnoreCase(endLocalName)) {
442                                                                         FieldNameFlag = false;
443                                                                 }
444                                                                 if ("FieldValue".equalsIgnoreCase(endLocalName)) {
445                                                                         FieldValueFlag = false;
446                                                                 }
447                                                                 if ("Object".equalsIgnoreCase(endLocalName)) {
448                                                                         countNum++;
449                                                                         this.appendLine(nameAndValue, bos);
450                                                                         nameAndValue.clear();
451                                                                 }
452                                                                 break;
453                                                 }
454                                         } catch (Exception e) {
455                                                 log.error("Exception: ",e);
456                                                 event = reader.next();
457                                         }
458                                 }
459
460                                 String[] fileKeys = this.createZipFile(csvpathAndFileName, xmlPathAndFileName, nename);
461                                 // ftp store
462                                 Properties ftpPro = configurationInterface.getProperties();
463                                 String ip = ftpPro.getProperty("ftp_ip");
464                                 String port = ftpPro.getProperty("ftp_port");
465                                 String ftp_user = ftpPro.getProperty("ftp_user");
466                                 String ftp_password = ftpPro.getProperty("ftp_password");
467
468                                 String ftp_passive = ftpPro.getProperty("ftp_passive");
469                                 String ftp_type = ftpPro.getProperty("ftp_type");
470                                 String remoteFile = ftpPro.getProperty("ftp_remote_path");
471                                 this.ftpStore(fileKeys, ip, port, ftp_user, ftp_password, ftp_passive, ftp_type, remoteFile);
472                                 // create Message
473                                 String message = this.createMessage(fileKeys[1], ftp_user, ftp_password, ip, port, countNum, nename);
474
475                                 // set message
476                                 this.setMessage(message);
477
478                         reader.close();
479                         } catch (Exception e) {
480                                 log.error("" + StringUtil.getStackTrace(e));
481                                 return false;
482                         }
483
484                 } catch (FileNotFoundException e1) {
485                         log.error("FileNotFoundException " + StringUtil.getStackTrace(e1));
486                         return false;
487                 }catch (Exception e) {
488                                 log.error("" + StringUtil.getStackTrace(e));
489                                 return false;
490                 }
491                 return true;
492         }*/
493
494         private void setMessage(String message) {
495                 try {
496                         cmResultChannel.put(message);
497                 } catch (Exception e) {
498                         log.error("collectResultChannel.put(message) is error "
499                                         + StringUtil.getStackTrace(e));
500                 }
501         }
502
503         public String createMessage(String zipName, String user, String pwd,
504                         String ip, String port, int countNum, String nename) {
505                 StringBuilder strBuffer = new StringBuilder();
506                 strBuffer
507                                 .append("<?xml version=\"1.0\" encoding=\"UTF-8\"?>"
508                                                 + "<FILE_DATA_READY_UL xmlns:xsi=\" http://www.w3.org/2001/XMLSchema-instance\">"
509                                                 + "<Header SessionID=\"");
510                 strBuffer.append("");
511                 strBuffer.append("\" LicenceID=\"");
512                 strBuffer.append("");
513                 strBuffer.append("\" SystemID=\"");
514                 strBuffer.append("");
515                 strBuffer.append("\" Time=\"");
516                 strBuffer.append(dateFormat2.format(new Date()));
517                 strBuffer.append("\" PolicyID=\"");
518                 strBuffer.append("");
519                 strBuffer.append("\"/><Body>");
520                 strBuffer.append("<DataCatalog>");
521                 strBuffer.append("");
522                 strBuffer.append("</DataCatalog><GroupID>");
523                 strBuffer.append(nename);
524                 strBuffer.append("</GroupID><DataSourceName>");
525                 strBuffer.append("");
526                 strBuffer.append("</DataSourceName><InstanceID>");
527                 strBuffer.append("");
528                 strBuffer.append("</InstanceID><FileFormat>");
529                 strBuffer.append("csv");
530                 strBuffer.append("</FileFormat><CharSet>");
531                 strBuffer.append("gbk");
532                 strBuffer.append("</CharSet><FieldSeparator>");
533                 strBuffer.append("|");
534                 strBuffer.append("</FieldSeparator><IsCompressed>");
535                 strBuffer.append("true");
536                 strBuffer.append("</IsCompressed><StartTime>");
537                 strBuffer.append(dateFormat2.format(new Date()));
538                 strBuffer.append("</StartTime><EndTime>");
539                 strBuffer.append("");
540                 strBuffer.append("</EndTime><FileList>");
541                 strBuffer.append(zipName);
542                 strBuffer.append("</FileList><ConnectionString>");
543                 strBuffer.append("ftp://" + user + ":" + pwd + "@" + ip + ":" + port);
544                 strBuffer.append("</ConnectionString>");
545                 strBuffer.append("<DataCount>");
546                 strBuffer.append(countNum);
547                 strBuffer.append("</DataCount>");
548
549                 strBuffer.append("<FileSize>").append("").append("</FileSize>");
550                 strBuffer.append("<DataGranularity>").append("")
551                                 .append("</DataGranularity>");
552
553                 strBuffer.append("</Body></FILE_DATA_READY_UL>");
554                 return strBuffer.toString();
555
556         }
557
558         private void ftpStore(String[] fileKeys, String ip, String port,
559                         String ftpUser, String ftpPassword, String ftpPassive,
560                         String ftpType, String remoteFile) {
561                 String zipFilePath = fileKeys[0];
562
563                 FTPInterface ftpClient;
564                 ftpClient = new FTPSrv();
565                 // login
566                 try {
567                         ftpClient.login(ip, Integer.parseInt(port), ftpUser, ftpPassword,
568                                         "GBK", Boolean.parseBoolean(ftpPassive), 5 * 60 * 1000);
569                 } catch (Exception e) {
570                         log.error("login fail,ip=[" + ip + "] port=[" + port + "] user=["
571                                         + ftpUser + "]pwd=[" + ftpPassword + "]"
572                                         + StringUtil.getStackTrace(e));
573                         return;
574                 }
575                 log.debug("store  [" + zipFilePath + "]to[" + remoteFile + "]");
576
577                 FileUtils.deleteQuietly(new File(zipFilePath));
578
579         }
580
581         private String[] createZipFile(String csvpathAndFileName, String xmlPathAndFileName, String nename) throws IOException {
582                 String zipPath = resultPath + nename + dateFormat.format(new Date())
583                                 + "_" + System.nanoTime();
584
585                 File destDir = new File(zipPath);
586                 destDir.mkdirs();
587                 try {
588                         FileUtils
589                                         .copyFileToDirectory(new File(csvpathAndFileName), destDir);
590                         FileUtils
591                                         .copyFileToDirectory(new File(xmlPathAndFileName), destDir);
592                 } catch (IOException e) {
593                         throw new IOException("createZipFile", e);
594                 }
595
596                 String destFilePath = zipPath + ".zip";
597                 try {
598                         Zip zip = new Zip(destDir.getAbsolutePath(), destFilePath);
599                         zip.setCompressLevel(9);
600                         zip.compress();
601
602                         FileUtils.deleteDirectory(destDir);
603                 } catch (IOException e) {
604                         log.error("zip.compress() is fail " + StringUtil.getStackTrace(e));
605                         throw new IOException("createZipFile", e);
606                 }
607                 return new String[] { destFilePath, zipPath + ".zip" };
608         }
609
610         private String setColumnNames(String nename, List<String> names, String type) {
611                 // write xml
612                 String xmlpath = localPath + nename + "/" + type + "/";
613                 File xmlpathfile = new File(xmlpath);
614                 if (!xmlpathfile.exists()) {
615                         xmlpathfile.mkdirs();
616                 }
617                 String xmlFileName = nename + dateFormat.format(new Date()) + System.nanoTime();
618                 String fieldLine = "";
619                 for (int i = 0; i < names.size(); i++) {
620                         String field = "\t<Field>\r\n" + "\t\t<FieldNo>" + i
621                                         + "</FieldNo>\r\n" + "\t\t<FieldName>" + names.get(i)
622                                         + "</FieldName>\r\n" + "\t\t<FieldType>" + names.get(i)
623                                         + "</FieldType>\r\n" + "\t\t<FieldNameOther>"
624                                         + names.get(i) + "</FieldNameOther>\r\n" + "\t</Field>\r\n";
625                         fieldLine = fieldLine + field;
626                 }
627
628                 String str = "<?xml version=\"1.0\" encoding=\"gbk\"?>\r\n"
629                                 + "<xml>\r\n" + "<FILE_STRUCTURE>\r\n" + fieldLine
630                                 + "</FILE_STRUCTURE>\r\n" + "</xml>\r\n";
631                 String xmlPathAndFileName = xmlpath + xmlFileName + ".xml";
632                 try {
633                         this.writeDetail(xmlPathAndFileName, str);
634                 } catch (Exception e) {
635                         log.error("writeDetail is fail ,xmlFileName=" + xmlFileName
636                                         + StringUtil.getStackTrace(e));
637                 }
638
639                 return xmlPathAndFileName;
640         }
641
642         private void writeDetail(String detailFileName, String str)
643                         throws IOException {
644                 try (OutputStream readOut = new FileOutputStream(new File(
645                                 detailFileName), false);
646                                 OutputStreamWriter writer = new OutputStreamWriter(readOut)) {
647                         writer.write(str);
648                         writer.flush();
649                 } catch (IOException e) {
650                         throw new IOException("writeDetail", e);
651                 }
652         }
653
654         private void appendLine(LinkedHashMap<String, String> nameAndValue,
655                         BufferedOutputStream bos) {
656                 try {
657                         StringBuilder lineDatas = new StringBuilder();
658
659                         for (String key : nameAndValue.keySet()) {
660                                 lineDatas.append(nameAndValue.get(key)).append("|");
661                         }
662
663                         /*
664                          * for (HashMap.Entry<String, String> entry :
665                          * nameAndValue.entrySet()) {
666                          * lineDatas.append(entry.getValue()).append("|"); }
667                          */
668                         bos.write(lineDatas.toString().getBytes());
669                         bos.write("\n".getBytes());
670                 } catch (IOException e) {
671                         log.error("appendLine error " + StringUtil.getStackTrace(e));
672                 }
673         }
674
675         public List<File> decompressed(String fileName) {
676                 List<File> filelist = new ArrayList<>();
677
678                 if (fileName.indexOf(".gz") > -1) {
679                         try {
680                                 File decompressFile = deGz(fileName);
681                                 filelist.add(decompressFile);
682                                 new File(fileName).delete();
683                         } catch (IOException e) {
684                                 log.error("decompressed is fail " + StringUtil.getStackTrace(e));
685                         }
686                 } else if (fileName.indexOf(".zip") > -1) {
687                         try {
688                                 File[] files = deZip(new File(fileName));
689                                 new File(fileName).delete();
690                                 for (File temp : files) {
691                                         filelist.add(temp);
692                                 }
693                         } catch (Exception e) {
694                                 log.error("decompressed is fail " + StringUtil.getStackTrace(e));
695                         }
696                 } else {
697                         filelist.add(new File(fileName));
698                 }
699
700                 return filelist;
701         }
702
703         private File deGz(String gzFileName) throws IOException {
704                 Gunzip gunzip = new Gunzip();
705                 String orgFile = gzFileName.replace(".gz", "");
706                 gunzip.unCompress(gzFileName, orgFile);
707                 return new File(orgFile);
708         }
709
710         public File[] deZip(File file) throws IOException {
711
712                 String regx = "(.*).zip";
713                 Pattern p = Pattern.compile(regx);
714                 Matcher m = p.matcher(file.getName());
715                 if (m.find()) {
716                         String orgFile = localPath + m.group(1) + "/";
717                         UnZip unzip = new UnZip(file.getAbsolutePath(), orgFile);
718                         unzip.deCompress();
719                         file = new File(orgFile);
720                 }
721                 return file.listFiles();
722
723         }
724
725         private List<String> ftpDownload(CollectVo collectVo) {
726
727                 List<String> fileList = new ArrayList<>();
728                 // IP
729                 String ip = collectVo.getIP();
730                 // port
731                 String port = collectVo.getPort();
732                 // user
733                 String user = collectVo.getUser();
734                 // password
735                 String password = collectVo.getPassword();
736                 // isPassiveMode
737                 String passivemode = collectVo.getPassive();
738
739                 FTPInterface ftpClient = new FTPSrv();
740
741                 // login
742                 try {
743                         log.info("ftp login ,ip=[" + ip + "] port=[" + port + "] user=["
744                                         + user + /* "]password=[" + password + */"]");
745                         ftpClient.login(ip, Integer.parseInt(port), user, password, "GBK",
746                                         Boolean.parseBoolean(passivemode), 5 * 60 * 1000);
747                 } catch (Exception e) {
748                         log.error("login fail,ip=[" + ip + "] port=[" + port + "] user=["
749                                         + user + /* "]password=[" + password + */"]"
750                                         + StringUtil.getStackTrace(e));
751                         return fileList;
752                 }
753
754                 // download
755                 String dir = collectVo.getRemotepath();
756                 List<String> searchExprList = new ArrayList<>();
757                 String[] fPath = dir.split(";");
758                 for (int i = 0; i < fPath.length; i++) {
759                         int oldSize = searchExprList.size();
760                         String conpath = fPath[i] + collectVo.getMatch();
761                         HashMap<String, String> varMap = new HashMap<>();
762                         long collectPeriod = 900;
763                         try {
764                                 collectPeriod = Long.parseLong(collectVo.getGranularity()) * 60;
765                                 log.info("collectPeriod =[" + collectPeriod + "]");
766                         } catch (NumberFormatException e) {
767                                 log.error("NumberFormatException", e);
768                         }
769                         long[] d = DateUtil.getScanScope(new Date(), collectPeriod);
770                         searchExprList.add(VarExprParser.replaceVar(conpath, d[0], d[1]));
771
772                         varMap.clear();
773                         log.info("[" + conpath + "] result["
774                                         + (searchExprList.size() - oldSize) + "] path");
775                 }
776                 String nowdir = null;
777                 try {
778                         nowdir = ftpClient.pwd();
779                         searchExprList = getPathNoRegular(searchExprList, ftpClient);
780                 } catch (Exception e1) {
781                         log.error(" collect fail ", e1);
782                         return fileList;
783                 }
784                 List<AFtpRemoteFile> remoteFiles = new ArrayList<>();
785                 for (String expr : searchExprList) {
786                         ftpClient.chdir(nowdir);
787                         String[] keys = parseExprKeys(expr);
788                         String ftpRegular = keys[1];
789                         String ftpDir = keys[0];
790
791                         boolean cdsucess = ftpClient.chdir(ftpDir);
792                         if (cdsucess) {
793                                 AFtpRemoteFile[] arf = (AFtpRemoteFile[]) ftpClient.list();
794                                 log.info(" list [" + ftpDir + "] result["
795                                                 + (arf == null ? "null" : arf.length) + "] files");
796                                 // filter
797
798                                 rfileFilter(remoteFiles, arf, ftpRegular);
799
800                                 for (AFtpRemoteFile ftpRemoteFile : remoteFiles) {
801                                         if (!new File(localPath).exists()) {
802                                                 try {
803                                                         new File(localPath).mkdir();
804                                                 } catch (Exception e) {
805                                                         log.error("create localPath is fail localPath="
806                                                                         + localPath + " "
807                                                                         + StringUtil.getStackTrace(e));
808                                                 }
809                                         }
810
811                                         if (!new File(localPath).exists()) {
812                                                 new File(localPath).mkdirs();
813                                         }
814
815                                         String localFileName = localPath
816                                                         + ftpRemoteFile.getFileName();
817                                         if (new File(localFileName).exists()) {
818                                                 new File(localFileName).delete();
819                                         }
820
821                                         boolean flag = ftpClient.downloadFile(
822                                                         ftpRemoteFile.getAbsFileName(), localFileName);
823
824                                         if (flag) {
825                                                 fileList.add(localFileName);
826                                         } else {
827                                                 log.error("download file fail fileName="
828                                                                 + ftpRemoteFile.getAbsFileName());
829                                         }
830                                 }
831
832                         } else {
833                                 log.error("cd dir is faill dir =[" + ftpDir + "]");
834                         }
835                 }
836
837                 return fileList;
838         }
839
840         private void rfileFilter(List<AFtpRemoteFile> fileContainer,
841                         AFtpRemoteFile[] arfs, String ftpRegular) {
842                 if (ftpRegular != null && ftpRegular.length() > 0) {
843                         Pattern pattern = null;
844                         try {
845                                 pattern = Pattern.compile(ftpRegular, Pattern.CASE_INSENSITIVE);
846                         } catch (Exception e) {
847                                 log.info("[" + ftpRegular + "]Pattern.compile exception:", e);
848                                 // should rethrow exception or return from here
849                         }
850                         int hisSize = fileContainer.size();
851                         for (int j = 0; arfs != null && j < arfs.length; j++) {
852                                 String fileName = parseFileName(arfs[j].getFileName());
853                                 Matcher matcher = null;
854                                 if (pattern != null)
855                                         matcher = pattern.matcher(fileName);
856                                 else {
857                                         // define the flow when pattern is null
858                                 }
859
860                                 if (null != matcher && matcher.find())
861                                         fileContainer.add(arfs[j]);
862                         }
863                         log.info("[" + ftpRegular + "]filter["
864                                         + (fileContainer.size() - hisSize) + "]filse");
865                 } else {
866                         for (int j = 0; arfs != null && j < arfs.length; j++)
867                                 fileContainer.add(arfs[j]);
868                 }
869
870         }
871
872         private String parseFileName(String fileName) {
873                 int idx = fileName.lastIndexOf("/");
874                 if (idx == -1)
875                         return fileName;
876                 return fileName.substring(idx + 1, fileName.length());
877         }
878
879         private String[] parseExprKeys(String source) {
880
881                 if (source.indexOf(";") > -1) {
882                         source = source.substring(0, source.indexOf(";"));
883                 }
884                 if (source.endsWith("/"))
885                         return new String[] { source, "" };
886
887                 int idx = source.lastIndexOf("/");
888                 String[] dirkeys = new String[2];
889                 dirkeys[0] = source.substring(0, idx + 1);
890                 dirkeys[1] = source.substring(idx + 1, source.length());
891                 return dirkeys;
892         }
893
894         public List<String> getPathNoRegular(List<String> searchExprList,
895                         FTPInterface ftpCache) throws IOException {
896                 boolean isregular = false;
897                 List<String> regularList = new ArrayList<>();
898                 for (String regular : searchExprList) {
899                         Pattern lpattern = null;
900                         try {
901                                 lpattern = Pattern.compile("(.*/)<([^/]+)>(/.*)");
902                         } catch (Exception e) {
903                                 log.error("[" + regular + "]compile fails:" + e.getMessage());
904                                 throw new IOException("getPathNoRegular", e);
905                         }
906                         Matcher matcher = null;
907                         if (lpattern != null)
908                                 matcher = lpattern.matcher(regular);
909                         else {
910                                 // define flow in case lpattern is null
911                         }
912
913                         if (null != matcher && matcher.find()) {
914                                 isregular = true;
915                                 String parpath = matcher.group(1);
916                                 try {
917                                         boolean isin = ftpCache.chdir(parpath);
918                                         if (isin) {
919                                                 log.info("cd dir [" + parpath + "] sucess");
920                                         } else {
921                                                 log.error("cd dir [" + parpath + "] fail");
922                                         }
923                                 } catch (Exception e) {
924                                         log.error(" cd dir [" + parpath + "]fail", e);
925                                         throw new IOException("ftpCache.chdir", e);
926                                 }
927                                 RemoteFile[] remotef = ftpCache.list();
928                                 for (RemoteFile aremote : remotef) {
929                                         if (aremote.isDirectory()
930                                                         && aremote.getFileName().matches(matcher.group(2))) {
931                                                 regularList.add(matcher.group(1)
932                                                                 + aremote.getFileName() + matcher.group(3));
933                                         }
934                                 }
935                         } else {
936                                 regularList.add(regular);
937                         }
938                 }
939                 if (isregular) {
940                         getPathNoRegular(regularList, ftpCache);
941                 }
942                 return regularList;
943         }
944
945         public MessageChannel getPmResultChannel() {
946                 return pmResultChannel;
947         }
948
949         public void setPmResultChannel(MessageChannel pmResChannel) {
950                 this.pmResultChannel = pmResChannel;
951         }
952 }