sonar fix:Exception handling in emsdriver
[vfc/nfvo/driver/ems.git] / ems / boco / src / main / java / org / onap / vfc / nfvo / emsdriver / collector / TaskThread.java
1 /*
2  * Copyright 2017 BOCO Corporation.  CMCC Technologies Co., Ltd
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package org.onap.vfc.nfvo.emsdriver.collector;
17
18 import org.apache.commons.io.FileUtils;
19 import org.apache.commons.logging.Log;
20 import org.apache.commons.logging.LogFactory;
21 import org.onap.vfc.nfvo.emsdriver.commons.constant.Constant;
22 import org.onap.vfc.nfvo.emsdriver.commons.ftp.AFtpRemoteFile;
23 import org.onap.vfc.nfvo.emsdriver.commons.ftp.FTPInterface;
24 import org.onap.vfc.nfvo.emsdriver.commons.ftp.FTPSrv;
25 import org.onap.vfc.nfvo.emsdriver.commons.ftp.RemoteFile;
26 import org.onap.vfc.nfvo.emsdriver.commons.model.CollectMsg;
27 import org.onap.vfc.nfvo.emsdriver.commons.model.CollectVo;
28 import org.onap.vfc.nfvo.emsdriver.commons.utils.*;
29 import org.onap.vfc.nfvo.emsdriver.configmgr.ConfigurationImp;
30 import org.onap.vfc.nfvo.emsdriver.configmgr.ConfigurationInterface;
31 import org.onap.vfc.nfvo.emsdriver.messagemgr.MessageChannel;
32 import org.onap.vfc.nfvo.emsdriver.messagemgr.MessageChannelFactory;
33
34 import javax.xml.stream.XMLInputFactory;
35 import javax.xml.stream.XMLStreamConstants;
36 import javax.xml.stream.XMLStreamReader;
37 import java.io.*;
38 import java.text.SimpleDateFormat;
39 import java.util.*;
40 import java.util.regex.Matcher;
41 import java.util.regex.Pattern;
42
43 public class TaskThread implements Runnable {
44
45         private static final Log log = LogFactory.getLog(TaskThread.class);
46         private MessageChannel pmResultChannel; 
47         private MessageChannel cmResultChannel;
48         private CollectMsg data;
49
50         private ConfigurationInterface configurationInterface = new ConfigurationImp();
51
52         private String localPath = Constant.SYS_DATA_TEMP;
53         private String resultPath = Constant.SYS_DATA_RESULT;
54
55         private SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMMddHHmmss");
56
57         private SimpleDateFormat dateFormat2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
58
/**
 * Creates a task for one collect request.
 *
 * @param data the collect message that {@link #run()} hands to the collect handler
 */
public TaskThread(CollectMsg data) {
    super();
    this.data = data;
}
62
63         public TaskThread() {
64                 super();
65         }
66
67         @Override
68         public void run() {
69
70                 cmResultChannel = MessageChannelFactory.getMessageChannel(Constant.COLLECT_RESULT_CHANNEL_KEY);
71                 pmResultChannel = MessageChannelFactory.getMessageChannel(Constant.COLLECT_RESULT_PM_CHANNEL_KEY);
72                 try {
73                         collectMsgHandle(data);
74                 } catch (Exception e) {
75                         log.error(" collectMsgHandle", e);
76                 }
77         }
78
79         private void collectMsgHandle(CollectMsg collectMsg) {
80                 String emsName = collectMsg.getEmsName();
81                 String type = collectMsg.getType();
82                 CollectVo collectVo = configurationInterface.getCollectVoByEmsNameAndType(emsName, type);
83
84                 // ftp download
85                 List<String> downloadfiles = this.ftpDownload(collectVo);
86                 // paser ftp update message send
87                 for (String fileName : downloadfiles) {
88                         this.parseFtpAndSendMessage(fileName, collectVo);
89                 }
90         }
91
92         public void parseFtpAndSendMessage(String fileName, CollectVo collectVo) {
93                 //
94                 List<File> filelist = decompressed(fileName);
95
96                 for (File tempfile : filelist) {
97
98                         String unfileName = tempfile.getName();
99
100                         Pattern pa = Pattern.compile(".*-(.*)-\\w{2}-");
101                         Matcher ma = pa.matcher(unfileName);
102                         if (!ma.find())
103                                 continue;
104                         String nename = ma.group(1);
105                         boolean parseResult = false;
106                         if (Constant.COLLECT_TYPE_CM.equalsIgnoreCase(collectVo.getType())) {
107                                 parseResult = processCMXml(tempfile, nename, "CM");
108                         } else {
109                                 if (unfileName.indexOf(".csv") > -1) {//changed to -1 for coding practice as having ".csv" must have some some legal name
110                                         parseResult = processPMCsv(tempfile);
111                                 } else {
112                                         parseResult = processPMXml(tempfile);
113                                 }
114                         }
115
116                         if (parseResult) {
117                                 log.info("parser " + tempfile + " sucess");
118                                 tempfile.delete();
119                         } else {
120                                 log.info("parser " + tempfile + " fail");
121                         }
122
123                 }
124         }
125
126         public boolean processPMXml(File file) {
127                 try (   FileInputStream fis = new FileInputStream(file);
128                         InputStreamReader isr = new InputStreamReader(fis, Constant.ENCODING_UTF8)){
129
130                         XMLInputFactory fac = XMLInputFactory.newInstance();
131                         XMLStreamReader reader = fac.createXMLStreamReader(isr);
132
133                         boolean fileHeaderStart = false;
134                         boolean measurementStart = false;
135                         boolean pmNameFlag = false;
136                         boolean pmDataFlag = false;
137                         boolean objectFlag = true;
138
139                         int index = -1;
140                         int nameIndex = -1;
141                         String currentMea = null;
142                         String subName = null;
143                         String localName = null;
144                         String endLocalName = null;
145                         String objectType = null;
146
147                         LinkedHashMap<String, String> commonNameAndValue = new LinkedHashMap<>();
148                         LinkedHashMap<String, String> pmDatas = null;
149                         LinkedHashMap<Integer, String> pmNames = null;
150
151                         int event = -1;
152                         while (reader.hasNext()) {
153                                 try {
154                                         event = reader.next();
155
156                                         switch (event) {
157                                         case XMLStreamConstants.START_ELEMENT:
158                                                 localName = reader.getLocalName();
159                                                 if ("FileHeader".equalsIgnoreCase(localName)) {
160                                                         fileHeaderStart = true;
161                                                 }
162                                                 if (fileHeaderStart && !"FileHeader".equalsIgnoreCase(localName)) {
163                                                         commonNameAndValue.put(localName, reader.getElementText().trim());
164                                                 }
165                                                 if ("Measurements".equalsIgnoreCase(localName)) {
166                                                         // a new Measurement starts
167                                                         measurementStart = true;
168                                                 }
169                                                 if (measurementStart) {
170                                                         // measurement handler
171                                                         if ("ObjectType".equalsIgnoreCase(localName)) {
172                                                                 objectType = reader.getElementText().trim();
173                                                                 commonNameAndValue.put("ObjectType", objectType);
174                                                         }
175                                                         if ("PmName".equalsIgnoreCase(localName)) {
176                                                                 pmNameFlag = true;
177                                                                 pmNames = new LinkedHashMap<Integer, String>();
178
179                                                         }
180                                                         if (pmNameFlag && "N".equalsIgnoreCase(localName)) {
181                                                                 // pmname handler, add columnNames
182                                                                 nameIndex = Integer.parseInt(getXMLAttribute(reader, "i"));
183                                                                 String text = reader.getElementText().trim();
184                                                                 pmNames.put(nameIndex, text);
185                                                         }
186                                                         if ("PmData".equalsIgnoreCase(localName)) {
187                                                                 pmDataFlag = true;
188                                                                 pmDatas = new LinkedHashMap<String, String>();
189                                                         }
190
191                                                         if (pmDataFlag) {
192                                                                 // pmdata handler
193                                                                 if ("Object".equalsIgnoreCase(localName)) {
194                                                                         objectFlag = true;
195                                                                         int n = reader.getAttributeCount();
196                                                                         for (int i = 0; i < n; i++) {
197                                                                                 String name = reader.getAttributeLocalName(i);
198                                                                                 commonNameAndValue.put(name, reader.getAttributeValue(i));
199                                                                         }
200                                                                 }
201                                                                 if (objectFlag) {
202                                                                         // add columnValues
203                                                                         if ("V".equalsIgnoreCase(localName)) {
204                                                                                 String indexStr = getXMLAttribute(reader, "i");
205                                                                                 if (indexStr == null) {
206                                                                                         log.error("ERROR: illegal value index");
207                                                                                         continue;
208                                                                                 }
209                                                                                 index = Integer.parseInt(indexStr);
210                                                                                 String name = pmNames.get(index);
211                                                                                 if (name == null) {
212                                                                                         log.error("illegal data: valueIndex=" + index);
213                                                                                         continue;
214                                                                                 }
215
216                                                                                 String value = reader.getElementText().trim();
217                                                                                 pmDatas.put(name, value);
218                                                                         }
219                                                                         if ("CV".equalsIgnoreCase(localName)) {
220
221                                                                                 String indexStr = getXMLAttribute(reader, "i");
222                                                                                 if (indexStr == null) {
223                                                                                         log.error("ERROR: illegal value index");
224                                                                                         continue;
225                                                                                 }
226                                                                                 index = Integer.parseInt(indexStr);
227
228                                                                                 currentMea = pmNames.get(index);
229                                                                                 if (currentMea == null) {
230                                                                                         log.error("illegal data: valueIndex=" + index);
231                                                                                         continue;
232                                                                                 }
233                                                                         }
234
235                                                                         if ("SN".equalsIgnoreCase(localName)) {
236                                                                                 subName = reader.getElementText().trim();
237
238                                                                         }
239                                                                         if ("SV".equalsIgnoreCase(localName)) {
240                                                                                 String subValue = reader.getElementText().trim();
241                                                                                 pmDatas.put(subName, subValue);
242                                                                         }
243                                                                 }
244                                                         }
245
246                                                 }
247
248                                                 break;
249                                         case XMLStreamConstants.CHARACTERS:
250                                                 // ...
251                                                 break;
252                                         case XMLStreamConstants.END_ELEMENT:
253                                                 endLocalName = reader.getLocalName();
254                                                 if ("Object".equalsIgnoreCase(endLocalName)) {
255                                                         objectFlag = false;
256                                                         pmDatas.putAll(commonNameAndValue);
257                                                         try {
258                                                                 pmResultChannel.put(pmDatas);
259
260                                                         } catch (Exception e) {
261                                                                 pmResultChannel.clear();        
262                                                                 log.error("collectResultChannel.put(resultMap) error ", e);
263                                                         }
264                                                 }
265                                                 if ("PmData".equalsIgnoreCase(endLocalName)) {
266                                                         pmDataFlag = false;
267                                                 }
268
269                                                 if ("PmName".equalsIgnoreCase(endLocalName)) {
270                                                         pmNameFlag = false;
271                                                 }
272                                                 if ("Measurements".equalsIgnoreCase(endLocalName)) {
273                                                         // a measurement over
274                                                         measurementStart = false;
275                                                 }
276
277                                                 if ("FileHeader".equalsIgnoreCase(endLocalName)) {
278                                                         fileHeaderStart = false;
279                                                 }
280                                                 break;
281                                         }
282                                 } catch (Exception e) {
283                                         log.error("Exception: ", e);
284                                         event = reader.next();
285                                 }
286                         }
287                 reader.close();
288                 } catch (Exception e) {
289                         log.error("processPMXml is Exception ", e);
290                         return false;
291                 } 
292                 return true;
293         }
294
295         private String getXMLAttribute(XMLStreamReader reader, String obj) {
296                 String res = null;
297                 if (obj == null || reader == null) {
298                         return res;
299                 }
300                 int n = reader.getAttributeCount();
301                 for (int i = 0; i < n; i++) {
302                         String name = reader.getAttributeLocalName(i);
303                         if (obj.equalsIgnoreCase(name)) {
304                                 res = reader.getAttributeValue(i);
305                         }
306                 }
307                 return res;
308         }
309
310         public boolean processPMCsv(File tempfile) {
311
312                 List<String> columnNames = new ArrayList<>();
313                 List<String> commonValues = new ArrayList<>();
314                 try (FileInputStream brs = new FileInputStream(tempfile);
315                                 InputStreamReader isr = new InputStreamReader(brs, Constant.ENCODING_UTF8);
316                                 BufferedReader br = new BufferedReader(isr)) {
317
318                         // common field
319                         String commonField = br.readLine();
320                         String[] fields = commonField.split("\\|", -1);
321                         for (String com : fields) {
322                                 String[] comNameAndValue = com.split("=", 2);
323                                 columnNames.add(comNameAndValue[0].trim());
324                                 commonValues.add(comNameAndValue[1]);
325                         }
326                         // column names
327                         String columnName = br.readLine();
328                         String[] names = columnName.split("\\|", -1);
329                         for (String name : names) {
330                                 columnNames.add(name);
331                         }
332
333                         String valueLine = "";
334                         List<String> valuelist = new ArrayList<>();
335
336                         while ((valueLine = br.readLine()) != null) {
337                                 if ("".equals(valueLine.trim())) {
338                                         continue;
339                                 }
340                                 String[] values = valueLine.split("\\|", -1);
341
342                                 valuelist.addAll(commonValues);
343                                 for (String value : values) {
344                                         valuelist.add(value);
345                                 }
346                                 HashMap<String, String> resultMap = this.resultMap(columnNames, valuelist);
347                                 try {
348                                         pmResultChannel.put(resultMap);
349                                 } catch (Exception e) {
350                                         pmResultChannel.clear();        
351                                         log.error("collectResultChannel.put(resultMap) error ", e);
352                                 }
353                                 valuelist.clear();
354                         }
355                 } catch (IOException e) {
356                         log.error("processPMCsv is fail ", e);
357                         return false;
358                 }
359                 return true;
360
361         }
362
363         private HashMap<String, String> resultMap(List<String> columnNames, List<String> valuelist) {
364
365                 HashMap<String, String> resultMap = new HashMap<>();
366                 if (columnNames.size() == valuelist.size()) {
367                         for (int i = 0; i < columnNames.size(); i++) {
368                                 resultMap.put(columnNames.get(i), valuelist.get(i));
369                         }
370                 }
371
372                 return resultMap;
373
374         }
375
376         private boolean processCMXml(File tempfile, String nename, String type) {
377
378                 String csvpath = localPath + nename + "/" + type + "/";
379                 File csvpathfile = new File(csvpath);
380                 if (!csvpathfile.exists()) {
381                         csvpathfile.mkdirs();
382                 }
383                 String csvFileName = nename + dateFormat.format(new Date()) + System.nanoTime();
384                 String csvpathAndFileName = csvpath + csvFileName + ".csv";
385                 try(    FileOutputStream fos = new FileOutputStream(csvpathAndFileName, false);
386                         BufferedOutputStream    bos = new BufferedOutputStream(fos, 10240)){
387
388                         boolean FieldNameFlag = false;
389                         boolean FieldValueFlag = false;
390                         int countNum = 0;
391                         String xmlPathAndFileName = null;
392                         String localName = null;
393                         String endLocalName = null;
394                         String rmUID = null;
395                         int index = -1;
396                         ArrayList<String> names = new ArrayList<>();// colname
397                         LinkedHashMap<String, String> nameAndValue = new LinkedHashMap<>();
398
399                         try(    FileInputStream fis = new FileInputStream(tempfile);
400                                 InputStreamReader isr = new InputStreamReader(fis, Constant.ENCODING_UTF8)){
401
402                                 XMLInputFactory fac = XMLInputFactory.newInstance();
403                                 XMLStreamReader reader = fac.createXMLStreamReader(isr);
404                                 int event = -1;
405                                 boolean setcolum = true;
406                                 while (reader.hasNext()) {
407                                         try {
408                                                 event = reader.next();
409                                                 switch (event) {
410                                                         case XMLStreamConstants.START_ELEMENT:
411                                                                 localName = reader.getLocalName();
412                                                                 if ("FieldName".equalsIgnoreCase(localName)) {
413                                                                         FieldNameFlag = true;
414                                                                 }
415                                                                 if (FieldNameFlag && "N".equalsIgnoreCase(localName)) {
416                                                                         String colName = reader.getElementText().trim();
417                                                                         names.add(colName);
418                                                                 }
419                                                                 if ("FieldValue".equalsIgnoreCase(localName)) {
420                                                                         FieldValueFlag = true;
421
422                                                                 }
423                                                                 if (FieldValueFlag) {
424                                                                         if (setcolum) {
425                                                                                 xmlPathAndFileName = this.setColumnNames(nename, names, type);
426                                                                                 setcolum = false;
427                                                                         }
428
429                                                                         if ("Object".equalsIgnoreCase(localName)) {
430                                                                                 int ac = reader.getAttributeCount();
431                                                                                 for (int i = 0; i < ac; i++) {
432                                                                                         if ("rmUID".equalsIgnoreCase(reader.getAttributeLocalName(i))) {
433                                                                                                 rmUID = reader.getAttributeValue(i).trim();
434                                                                                         }
435                                                                                 }
436                                                                                 nameAndValue.put("rmUID", rmUID);
437                                                                         }
438                                                                         if ("V".equalsIgnoreCase(localName)) {
439                                                                                 index = Integer.parseInt(reader.getAttributeValue(0)) - 1;
440                                                                                 String currentName = names.get(index);
441                                                                                 String v = reader.getElementText().trim();
442                                                                                 nameAndValue.put(currentName, v);
443                                                                         }
444                                                                 }
445                                                                 break;
446                                                         case XMLStreamConstants.CHARACTERS:
447                                                                 break;
448                                                         case XMLStreamConstants.END_ELEMENT:
449                                                                 endLocalName = reader.getLocalName();
450
451                                                                 if ("FieldName".equalsIgnoreCase(endLocalName)) {
452                                                                         FieldNameFlag = false;
453                                                                 }
454                                                                 if ("FieldValue".equalsIgnoreCase(endLocalName)) {
455                                                                         FieldValueFlag = false;
456                                                                 }
457                                                                 if ("Object".equalsIgnoreCase(endLocalName)) {
458                                                                         countNum++;
459                                                                         this.appendLine(nameAndValue, bos);
460                                                                         nameAndValue.clear();
461                                                                 }
462                                                                 break;
463                                                 }
464                                         } catch (Exception e) {
465                                                 log.error("Exception: ",e);
466                                                 event = reader.next();
467                                         }
468                                 }
469
470                                 String[] fileKeys = this.createZipFile(csvpathAndFileName, xmlPathAndFileName, nename);
471                                 // ftp store
472                                 Properties ftpPro = configurationInterface.getProperties();
473                                 String ip = ftpPro.getProperty("ftp_ip");
474                                 String port = ftpPro.getProperty("ftp_port");
475                                 String ftp_user = ftpPro.getProperty("ftp_user");
476                                 String ftp_password = ftpPro.getProperty("ftp_password");
477
478                                 String ftp_passive = ftpPro.getProperty("ftp_passive");
479                                 String ftp_type = ftpPro.getProperty("ftp_type");
480                                 String remoteFile = ftpPro.getProperty("ftp_remote_path");
481                                 this.ftpStore(fileKeys, ip, port, ftp_user, ftp_password, ftp_passive, ftp_type, remoteFile);
482                                 // create Message
483                                 String message = this.createMessage(fileKeys[1], ftp_user, ftp_password, ip, port, countNum, nename);
484
485                                 // set message
486                                 this.setMessage(message);
487
488                         reader.close();
489                         } catch (Exception e) {
490                                 log.error("" + StringUtil.getStackTrace(e));
491                                 return false;
492                         }
493
494                 } catch (FileNotFoundException e1) {
495                         log.error("FileNotFoundException " + StringUtil.getStackTrace(e1));
496                         return false;
497                 }catch (Exception e) {
498                                 log.error("" + StringUtil.getStackTrace(e));
499                                 return false;
500                 }
501                 return true;
502         }
503
504         private void setMessage(String message) {
505                 try {
506                         cmResultChannel.put(message);
507                 } catch (Exception e) {
508                         log.error("collectResultChannel.put(message) is error " + StringUtil.getStackTrace(e));
509                 }
510         }
511
512         public String createMessage(String zipName, String user, String pwd, String ip, String port, int countNum,
513                         String nename) {
514                 StringBuilder strBuffer = new StringBuilder();
515                 strBuffer.append("<?xml version=\"1.0\" encoding=\"UTF-8\"?>"
516                                 + "<FILE_DATA_READY_UL xmlns:xsi=\" http://www.w3.org/2001/XMLSchema-instance\">"
517                                 + "<Header SessionID=\"");
518                 strBuffer.append("");
519                 strBuffer.append("\" LicenceID=\"");
520                 strBuffer.append("");
521                 strBuffer.append("\" SystemID=\"");
522                 strBuffer.append("");
523                 strBuffer.append("\" Time=\"");
524                 strBuffer.append(dateFormat2.format(new Date()));
525                 strBuffer.append("\" PolicyID=\"");
526                 strBuffer.append("");
527                 strBuffer.append("\"/><Body>");
528                 strBuffer.append("<DataCatalog>");
529                 strBuffer.append("");
530                 strBuffer.append("</DataCatalog><GroupID>");
531                 strBuffer.append(nename);
532                 strBuffer.append("</GroupID><DataSourceName>");
533                 strBuffer.append("");
534                 strBuffer.append("</DataSourceName><InstanceID>");
535                 strBuffer.append("");
536                 strBuffer.append("</InstanceID><FileFormat>");
537                 strBuffer.append("csv");
538                 strBuffer.append("</FileFormat><CharSet>");
539                 strBuffer.append("gbk");
540                 strBuffer.append("</CharSet><FieldSeparator>");
541                 strBuffer.append("|");
542                 strBuffer.append("</FieldSeparator><IsCompressed>");
543                 strBuffer.append("true");
544                 strBuffer.append("</IsCompressed><StartTime>");
545                 strBuffer.append(dateFormat2.format(new Date()));
546                 strBuffer.append("</StartTime><EndTime>");
547                 strBuffer.append("");
548                 strBuffer.append("</EndTime><FileList>");
549                 strBuffer.append(zipName);
550                 strBuffer.append("</FileList><ConnectionString>");
551                 strBuffer.append("ftp://" + user + ":" + pwd + "@" + ip + ":" + port);
552                 strBuffer.append("</ConnectionString>");
553                 strBuffer.append("<DataCount>");
554                 strBuffer.append(countNum);
555                 strBuffer.append("</DataCount>");
556
557                 strBuffer.append("<FileSize>").append("").append("</FileSize>");
558                 strBuffer.append("<DataGranularity>").append("").append("</DataGranularity>");
559
560                 strBuffer.append("</Body></FILE_DATA_READY_UL>");
561                 return strBuffer.toString();
562
563         }
564
565         private void ftpStore(String[] fileKeys, String ip, String port, String ftp_user, String ftp_password,
566                         String ftp_passive, String ftp_type, String remoteFile) {
567                 String zipFilePath = fileKeys[0];
568
569                 FTPInterface ftpClient;
570                 ftpClient = new FTPSrv();
571                 // login
572                 try {
573                         ftpClient.login(ip, Integer.parseInt(port), ftp_user, ftp_password, "GBK",
574                                         Boolean.parseBoolean(ftp_passive), 5 * 60 * 1000);
575                 } catch (Exception e) {
576                         log.error("login fail,ip=[" + ip + "] port=[" + port + "] user=[" + ftp_user + /*"]pwd=[" + ftp_password + */"]"
577                                         + StringUtil.getStackTrace(e));
578                         return;
579                 }
580                 log.debug("store  [" + zipFilePath + "]to[" + remoteFile + "]");
581
582                 FileUtils.deleteQuietly(new File(zipFilePath));
583
584         }
585
586         private String[] createZipFile(String csvpathAndFileName, String xmlPathAndFileName, String nename)
587                         throws IOException {
588                 String zipPath = resultPath + nename + dateFormat.format(new Date()) + "_" + System.nanoTime();
589
590                 File destDir = new File(zipPath);
591                 destDir.mkdirs();
592                 try {
593                         FileUtils.copyFileToDirectory(new File(csvpathAndFileName), destDir);
594                         FileUtils.copyFileToDirectory(new File(xmlPathAndFileName), destDir);
595                 } catch (IOException e) {
596                         throw new IOException("createZipFile",e);
597                 }
598
599                 String destFilePath = zipPath + ".zip";
600                 try {
601                         Zip zip = new Zip(destDir.getAbsolutePath(), destFilePath);
602                         zip.setCompressLevel(9);
603                         zip.compress();
604
605                         FileUtils.deleteDirectory(destDir);
606                 } catch (IOException e) {
607                         log.error("zip.compress() is fail " + StringUtil.getStackTrace(e));
608                         throw new IOException("createZipFile",e);
609                 }
610                 return new String[] { destFilePath, zipPath + ".zip" };
611         }
612
613         private String setColumnNames(String nename, List<String> names, String type) {
614                 // write xml
615                 String xmlpath = localPath + nename + "/" + type + "/";
616                 File xmlpathfile = new File(xmlpath);
617                 if (!xmlpathfile.exists()) {
618                         xmlpathfile.mkdirs();
619                 }
620                 String xmlFileName = nename + dateFormat.format(new Date()) + System.nanoTime();
621                 String fieldLine = "";
622                 for (int i = 0; i < names.size(); i++) {
623                         String field = "\t<Field>\r\n" + "\t\t<FieldNo>" + i + "</FieldNo>\r\n" + "\t\t<FieldName>" + names.get(i)
624                                         + "</FieldName>\r\n" + "\t\t<FieldType>" + names.get(i) + "</FieldType>\r\n"
625                                         + "\t\t<FieldNameOther>" + names.get(i) + "</FieldNameOther>\r\n" + "\t</Field>\r\n";
626                         fieldLine = fieldLine + field;
627                 }
628
629                 String str = "<?xml version=\"1.0\" encoding=\"gbk\"?>\r\n" + "<xml>\r\n" + "<FILE_STRUCTURE>\r\n" + fieldLine
630                                 + "</FILE_STRUCTURE>\r\n" + "</xml>\r\n";
631                 String xmlPathAndFileName = xmlpath + xmlFileName + ".xml";
632                 try {
633                         this.writeDetail(xmlPathAndFileName, str);
634                 } catch (Exception e) {
635                         log.error("writeDetail is fail ,xmlFileName=" + xmlFileName + StringUtil.getStackTrace(e));
636                 }
637
638                 return xmlPathAndFileName;
639         }
640
641         private void writeDetail(String detailFileName, String str) throws IOException {
642                 try (OutputStream readOut = new FileOutputStream(new File(detailFileName), false);
643                                 OutputStreamWriter writer = new OutputStreamWriter(readOut)) {
644                         writer.write(str);
645                         writer.flush();
646                 } catch (IOException e) {
647                         throw new IOException("writeDetail",e);
648                 }
649         }
650
651         private void appendLine(LinkedHashMap<String, String> nameAndValue, BufferedOutputStream bos) {
652                 try{
653                         StringBuilder lineDatas = new StringBuilder();
654                         
655                         for (String key : nameAndValue.keySet()) {
656                                 lineDatas.append(nameAndValue.get(key)).append("|");
657                         }
658
659                         /*for (HashMap.Entry<String, String> entry : nameAndValue.entrySet()) {
660                                 lineDatas.append(entry.getValue()).append("|");
661                         }*/
662                         bos.write(lineDatas.toString().getBytes());
663                         bos.write("\n".getBytes());
664                 } catch (IOException e) {
665                         log.error("appendLine error " + StringUtil.getStackTrace(e));
666                 }
667         }
668
669         public List<File> decompressed(String fileName) {
670                 List<File> filelist = new ArrayList<File>();
671
672                 if (fileName.indexOf(".gz") > 1) {
673                         try {
674                                 File decompressFile = deGz(fileName);
675                                 filelist.add(decompressFile);
676                                 new File(fileName).delete();
677                         } catch (IOException e) {
678                                 log.error("decompressed is fail " + StringUtil.getStackTrace(e));
679                         }
680                 } else if (fileName.indexOf(".zip") > 1) {
681                         try {
682                                 File[] files = deZip(new File(fileName));
683                                 new File(fileName).delete();
684                                 for (File temp : files) {
685                                         filelist.add(temp);
686                                 }
687                         } catch (Exception e) {
688                                 log.error("decompressed is fail " + StringUtil.getStackTrace(e));
689                         }
690                 } else {
691                         filelist.add(new File(fileName));
692                 }
693
694                 return filelist;
695         }
696
697         private File deGz(String gzFileName) throws IOException {
698                 Gunzip gunzip = new Gunzip();
699                 String orgFile = gzFileName.replace(".gz", "");
700                 gunzip.unCompress(gzFileName, orgFile);
701                 return new File(orgFile);
702         }
703
704         public File[] deZip(File file) throws IOException {
705
706                 String regx = "(.*).zip";
707                 Pattern p = Pattern.compile(regx);
708                 Matcher m = p.matcher(file.getName());
709                 if (m.find()) {
710                         String orgFile = localPath + m.group(1) + "/";
711                         UnZip unzip = new UnZip(file.getAbsolutePath(), orgFile);
712                         unzip.deCompress();
713                         file = new File(orgFile);
714                 }
715                 File[] files = file.listFiles();
716
717                 return files;
718
719         }
720
721         private List<String> ftpDownload(CollectVo collectVo) {
722
723                 List<String> fileList = new ArrayList<String>();
724                 // IP
725                 String ip = collectVo.getIP();
726                 // port
727                 String port = collectVo.getPort();
728                 // user
729                 String user = collectVo.getUser();
730                 // password
731                 String password = collectVo.getPassword();
732                 // isPassiveMode
733                 String passivemode = collectVo.getPassive();
734
735                 FTPInterface ftpClient = new FTPSrv();
736
737                 // login
738                 try {
739                         log.info("ftp login ,ip=[" + ip + "] port=[" + port + "] user=[" + user + /*"]password=[" + password +*/ "]");
740                         ftpClient.login(ip, Integer.parseInt(port), user, password, "GBK", Boolean.parseBoolean(passivemode),
741                                         5 * 60 * 1000);
742                 } catch (Exception e) {
743                         log.error("login fail,ip=[" + ip + "] port=[" + port + "] user=[" + user + /*"]password=[" + password +*/ "]"
744                                         + StringUtil.getStackTrace(e));
745                         return fileList;
746                 }
747
748                 // download
749                 String dir = collectVo.getRemotepath();
750                 List<String> searchExprList = new ArrayList<String>();
751                 String[] FPath = dir.split(";");
752                 for (int i = 0; i < FPath.length; i++) {
753                         int oldSize = searchExprList.size();
754                         String conpath = FPath[i] + collectVo.getMatch();
755                         HashMap<String, String> varMap = new HashMap<>();
756                         long collectPeriod = 900;
757                         try {
758                                 collectPeriod = Long.parseLong(collectVo.getGranularity()) * 60;
759                                 log.info("collectPeriod =[" + collectPeriod + "]");
760                         } catch (NumberFormatException e) {
761                                 log.error("NumberFormatException" ,e);
762                         }
763                         long[] d = DateUtil.getScanScope(new Date(), collectPeriod);
764                         searchExprList.add(VarExprParser.replaceVar(conpath, d[0], d[1]));
765
766                         varMap.clear();
767                         log.info("[" + conpath + "] result[" + (searchExprList.size() - oldSize) + "] path");
768                 }
769                 String nowdir = null;
770                 try {
771                         nowdir = ftpClient.pwd();
772                         searchExprList = getPathNoRegular(searchExprList, ftpClient);
773                 } catch (Exception e1) {
774                         log.error(" collect fail ", e1);
775                         return fileList;
776                 }
777                 List<AFtpRemoteFile> remoteFiles = new ArrayList<AFtpRemoteFile>();
778                 for (String expr : searchExprList) {
779                         ftpClient.chdir(nowdir);
780                         String keys[] = parseExprKeys(expr);
781                         String ftpRegular = keys[1];
782                         String ftpDir = keys[0];
783
784                         boolean cdsucess = ftpClient.chdir(ftpDir);
785                         if (cdsucess) {
786                                 AFtpRemoteFile[] arf = (AFtpRemoteFile[]) ftpClient.list();
787                                 log.info(" list [" + ftpDir + "] result[" + (arf == null ? "null" : arf.length) + "] files");
788                                 // filter
789
790                                 rfileFilter(remoteFiles, arf, ftpRegular);
791
792                                 for (AFtpRemoteFile ftpRemoteFile : remoteFiles) {
793                                         if (!new File(localPath).exists()) {
794                                                 try {
795                                                         new File(localPath).mkdir();
796                                                 } catch (Exception e) {
797                                                         log.error("create localPath is fail localPath=" + localPath + " "
798                                                                         + StringUtil.getStackTrace(e));
799                                                 }
800                                         }
801
802                                         if (!new File(localPath).exists()) {
803                                                 new File(localPath).mkdirs();
804                                         }
805
806                                         String localFileName = localPath + ftpRemoteFile.getFileName();
807                                         if(new File(localFileName).exists()){
808                                                 new File(localFileName).delete();
809                                         }
810
811                                         boolean flag = ftpClient.downloadFile(ftpRemoteFile.getAbsFileName(), localFileName);
812
813                                         if (flag) {
814                                                 fileList.add(localFileName);
815                                         } else {
816                                                 log.error("download file fail fileName=" + ftpRemoteFile.getAbsFileName());
817                                         }
818                                 }
819
820                         } else {
821                                 log.error("cd dir is faill dir =[" + ftpDir + "]");
822                         }
823                 }
824
825                 return fileList;
826         }
827
828         private void rfileFilter(List<AFtpRemoteFile> fileContainer, AFtpRemoteFile[] arfs, String ftpRegular) {
829                 if (ftpRegular != null && ftpRegular.length() > 0) {
830                         Pattern pattern = null;
831                         try {
832                                 pattern = Pattern.compile(ftpRegular, Pattern.CASE_INSENSITIVE);
833                         } catch (Exception e) {
834                                 log.info("[" + ftpRegular + "]Pattern.compile exception:",e);
835                                 // should rethrow exception or return from here
836                         }
837                         int hisSize = fileContainer.size();
838                         for (int j = 0; arfs != null && j < arfs.length; j++) {
839                                 String fileName = parseFileName(arfs[j].getFileName());
840                                 Matcher matcher = null;
841                                 if (pattern != null)
842                                         matcher = pattern.matcher(fileName);
843                                 else {
844                                         // define the flow when pattern is null
845                                 }
846
847                                 if (null != matcher &&  matcher.find())
848                                         fileContainer.add(arfs[j]);
849                         }
850                         log.info("[" + ftpRegular + "]filter[" + (fileContainer.size() - hisSize) + "]filse");
851                 } else {
852                         for (int j = 0; arfs != null && j < arfs.length; j++)
853                                 fileContainer.add(arfs[j]);
854                 }
855
856         }
857
858         private String parseFileName(String fileName) {
859                 int idx = fileName.lastIndexOf("/");
860                 if (idx == -1)
861                         return fileName;
862                 return fileName.substring(idx + 1, fileName.length());
863         }
864
865         private String[] parseExprKeys(String source) {
866
867                 if (source.indexOf(";") > -1) {
868                         source = source.substring(0, source.indexOf(";"));
869                 }
870                 if (source.endsWith("/"))
871                         return new String[] { source, "" };
872
873                 int idx = source.lastIndexOf("/");
874                 String[] dirkeys = new String[2];
875                 dirkeys[0] = source.substring(0, idx + 1);
876                 dirkeys[1] = source.substring(idx + 1, source.length());
877                 return dirkeys;
878         }
879
880         public List<String> getPathNoRegular(List<String> searchExprList, FTPInterface ftpCache) throws IOException {
881                 boolean isregular = false;
882                 List<String> regularList = new ArrayList<String>();
883                 for (String regular : searchExprList) {
884                         Pattern lpattern = null;
885                         try {
886                                 lpattern = Pattern.compile("(.*/)<([^/]+)>(/.*)");
887                         } catch (Exception e) {
888                                 log.error("[" + regular + "]compile fails:" + e.getMessage());
889                                 throw new IOException("getPathNoRegular", e);
890                         }
891                         Matcher matcher = null;
892                         if (lpattern != null)
893                                 matcher = lpattern.matcher(regular);
894                         else {
895                                 // define flow in case lpattern is null
896                         }
897
898                         if (null != matcher && matcher.find()) {
899                                 isregular = true;
900                                 String parpath = matcher.group(1);
901                                 try {
902                                         boolean isin = ftpCache.chdir(parpath);
903                                         if (isin) {
904                                                 log.info("cd dir [" + parpath + "] sucess");
905                                         } else {
906                                                 log.error("cd dir [" + parpath + "] fail");
907                                         }
908                                 } catch (Exception e) {
909                                         log.error(" cd dir [" + parpath + "]fail", e);
910                                         throw new IOException("ftpCache.chdir",e);
911                                 }
912                                 RemoteFile[] remotef = ftpCache.list();
913                                 for (RemoteFile aremote : remotef) {
914                                         if (aremote.isDirectory() && aremote.getFileName().matches(matcher.group(2))) {
915                                                 regularList.add(matcher.group(1) + aremote.getFileName() + matcher.group(3));
916                                         }
917                                 }
918                         } else {
919                                 regularList.add(regular);
920                         }
921                 }
922                 if (isregular == true) {
923                         getPathNoRegular(regularList, ftpCache);
924                 }
925                 return regularList;
926         }
927         
928         public MessageChannel getPmResultChannel() {
929                 return pmResultChannel;
930         }
931
932         public void setPmResultChannel(MessageChannel pmResChannel) {
933                 this.pmResultChannel = pmResChannel;
934         }
935 }