Modify emsdriver Code
[vfc/nfvo/driver/ems.git] / ems / boco / src / main / java / org / onap / vfc / nfvo / emsdriver / collector / TaskThread.java
1 /**
2  * Copyright 2017 BOCO Corporation.  CMCC Technologies Co., Ltd
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package org.onap.vfc.nfvo.emsdriver.collector;
17
18 import java.io.BufferedOutputStream;
19 import java.io.BufferedReader;
20 import java.io.File;
21 import java.io.FileInputStream;
22 import java.io.FileNotFoundException;
23 import java.io.FileOutputStream;
24 import java.io.IOException;
25 import java.io.InputStreamReader;
26 import java.io.OutputStream;
27 import java.io.OutputStreamWriter;
28 import java.text.SimpleDateFormat;
29 import java.util.ArrayList;
30 import java.util.Date;
31 import java.util.HashMap;
32 import java.util.Hashtable;
33 import java.util.LinkedHashMap;
34 import java.util.List;
35 import java.util.Properties;
36 import java.util.regex.Matcher;
37 import java.util.regex.Pattern;
38
39 import javax.xml.stream.XMLInputFactory;
40 import javax.xml.stream.XMLStreamConstants;
41 import javax.xml.stream.XMLStreamReader;
42
43 import org.apache.commons.io.FileUtils;
44 import org.apache.commons.logging.Log;
45 import org.apache.commons.logging.LogFactory;
46 import org.onap.vfc.nfvo.emsdriver.commons.constant.Constant;
47 import org.onap.vfc.nfvo.emsdriver.commons.ftp.AFtpRemoteFile;
48 import org.onap.vfc.nfvo.emsdriver.commons.ftp.FTPInterface;
49 import org.onap.vfc.nfvo.emsdriver.commons.ftp.FTPSrv;
50 import org.onap.vfc.nfvo.emsdriver.commons.ftp.RemoteFile;
51 import org.onap.vfc.nfvo.emsdriver.commons.model.CollectMsg;
52 import org.onap.vfc.nfvo.emsdriver.commons.model.CollectVo;
53 import org.onap.vfc.nfvo.emsdriver.commons.utils.DateUtil;
54 import org.onap.vfc.nfvo.emsdriver.commons.utils.Gunzip;
55 import org.onap.vfc.nfvo.emsdriver.commons.utils.StringUtil;
56 import org.onap.vfc.nfvo.emsdriver.commons.utils.UnZip;
57 import org.onap.vfc.nfvo.emsdriver.commons.utils.VarExprParser;
58 import org.onap.vfc.nfvo.emsdriver.commons.utils.Zip;
59 import org.onap.vfc.nfvo.emsdriver.configmgr.ConfigurationImp;
60 import org.onap.vfc.nfvo.emsdriver.configmgr.ConfigurationInterface;
61 import org.onap.vfc.nfvo.emsdriver.messagemgr.MessageChannel;
62 import org.onap.vfc.nfvo.emsdriver.messagemgr.MessageChannelFactory;
63
64
65 public class TaskThread implements Runnable{
66         
67         public  Log log = LogFactory.getLog(TaskThread.class);
68         
69         private MessageChannel cmResultChannel;
70         public MessageChannel pmResultChannel;
71         
72         private CollectMsg data;
73         
74         private ConfigurationInterface configurationInterface = new ConfigurationImp();
75         
76         private String localPath = Constant.SYS_DATA_TEMP;
77         private String resultPath = Constant.SYS_DATA_RESULT;
78         
79         private SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMMddHHmmss");
80         
81         private SimpleDateFormat dateFormat2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
82         
83 //      private String csvpathAndFileName;
84 //      private String xmlPathAndFileName;
85 //      private int countNum = 0 ;
86         
87         public TaskThread(CollectMsg data) {
88                 this.data = data;
89         }
90         public TaskThread() {
91                 super();
92         }
93
94         @Override
95         public void run(){
96                 
97                 cmResultChannel = MessageChannelFactory.getMessageChannel(Constant.COLLECT_RESULT_CHANNEL_KEY);
98                 pmResultChannel = MessageChannelFactory.getMessageChannel(Constant.COLLECT_RESULT_PM_CHANNEL_KEY);
99                 try {
100                         collectMsgHandle(data);
101                 } catch (Exception e) {
102                         log.error("",e);
103                 }
104         }
105
106         private void collectMsgHandle(CollectMsg collectMsg) {
107                 String emsName = collectMsg.getEmsName();
108                 String type = collectMsg.getType();
109                 CollectVo collectVo = configurationInterface.getCollectVoByEmsNameAndType(emsName, type);
110                 
111                 //ftp download 
112                 List<String> downloadfiles = this.ftpDownload(collectVo);
113                 //paser ftp update message send
114                 for(String fileName :downloadfiles){
115                         this.parseFtpAndSendMessage(fileName,collectVo);
116                 }
117         }
118
119         public void parseFtpAndSendMessage(String fileName, CollectVo collectVo) {
120                 //
121                 List<File> filelist = decompressed(fileName);
122                 
123                 for (File tempfile : filelist) { 
124                 
125                         String unfileName = tempfile.getName();
126                         
127                         Pattern pa = Pattern.compile(".*-(.*)-\\w{2}-");
128                         Matcher ma = pa.matcher(unfileName);
129                         if (!ma.find())
130                           continue;
131                         String nename = ma.group(1);
132                         boolean parseResult = false;
133                         if(Constant.COLLECT_TYPE_CM.equalsIgnoreCase(collectVo.getType())){
134                                 parseResult = processCMXml(tempfile, nename,"CM");
135                         }else{
136                                 if(unfileName.indexOf(".csv") > 0){
137                                         parseResult = processPMCsv(tempfile);
138                                 }else{
139                                         parseResult = processPMXml(tempfile);
140                                 }
141                         }
142                         
143                         if (parseResult){
144                                 log.info("parser "+tempfile+" sucess");
145                         }else {
146                                 log.info("parser "+tempfile+" fail");
147                         }
148                         
149                 }
150         }
151         
152         public boolean processPMXml(File file) {
153
154                 FileInputStream fis = null;
155                 InputStreamReader isr = null;
156                 XMLStreamReader reader = null;
157                 try {
158                          fis = new FileInputStream(file);
159                          isr = new InputStreamReader(fis, Constant.ENCODING_UTF8);
160
161                         XMLInputFactory fac = XMLInputFactory.newInstance();
162                          reader = fac.createXMLStreamReader(isr);
163                          
164                         boolean fileHeaderStart = false;
165                         boolean measurementStart = false;
166                         boolean pmNameFlag = false;
167                         boolean pmDataFlag = false;
168                         boolean objectFlag = true;
169
170                         int index = -1;
171                         int nameIndex = -1;
172                         String currentMea = null;
173                         String subName = null;
174                         String localName = null;
175                         String endLocalName = null;
176                         String objectType = null;
177
178                         
179                         LinkedHashMap<String, String> commonNameAndValue = new LinkedHashMap<String, String>();
180                         LinkedHashMap<String, String> pmDatas = null;
181                         LinkedHashMap<Integer, String> pmNames = null;
182
183
184                         int event = -1;
185                         while (reader.hasNext()) {
186                                 try{
187                                         event = reader.next();
188         
189                                         switch (event) {
190                                         case XMLStreamConstants.START_ELEMENT:
191                                                 localName = reader.getLocalName();
192                                                 if ("FileHeader".equalsIgnoreCase(localName)) {
193                                                         fileHeaderStart = true;
194                                                 }
195                                                 if(fileHeaderStart){
196                                                         if(!"FileHeader".equalsIgnoreCase(localName)){
197                                                                 commonNameAndValue.put(localName,reader.getElementText().trim());
198                                                         }
199                                                         
200                                                 }
201                                                 if ("Measurements".equalsIgnoreCase(localName)) {
202                                                         // a new Measurement starts
203                                                         measurementStart = true; 
204                                                 }
205                                                 if (measurementStart) {
206                                                         // measurement handler
207                                                         if ("ObjectType".equalsIgnoreCase(localName)) {
208                                                                 objectType = reader.getElementText().trim();
209                                                                 commonNameAndValue.put("ObjectType",objectType);
210                                                         }
211                                                         if ("PmName".equalsIgnoreCase(localName)) {
212                                                                 pmNameFlag = true;
213                                                                 pmNames = new LinkedHashMap<Integer, String>();
214         
215                                                         }
216                                                         if (pmNameFlag) {
217                                                                 // pmname handler, add columnNames
218                                                                 if ("N".equalsIgnoreCase(localName)) {
219                                                                         nameIndex = Integer.parseInt(getXMLAttribute(reader, "i"));
220                                                                         String text = reader.getElementText().trim();
221                                                                         pmNames.put(nameIndex, text);
222                                                                 }
223                                                         }
224                                                         if ("PmData".equalsIgnoreCase(localName)) {
225                                                                 pmDataFlag = true;
226                                                                 pmDatas = new LinkedHashMap<String, String>();
227                                                         }
228                                                         
229                                                         if (pmDataFlag) {
230                                                                 // pmdata handler
231                                                                 if ("Object".equalsIgnoreCase(localName)) {
232                                                                         objectFlag = true;
233                                                                         int n = reader.getAttributeCount();
234                                                                         for(int i = 0; i < n; i++) {
235                                                                                 String name = reader.getAttributeLocalName(i);
236                                                                                 commonNameAndValue.put(name, reader.getAttributeValue(i));
237                                                                         }
238                                                                 }
239                                                                 if (objectFlag) {
240         
241                                                                         // add columnValues
242                                                                         if ("V".equalsIgnoreCase(localName)) {
243                                                                                 String indexStr = getXMLAttribute(reader, "i");
244                                                                                 if(indexStr == null) {
245                                                                                         log.error("ERROR: illegal value index");
246                                                                                         continue;
247                                                                                 }
248                                                                                 index = Integer.parseInt(indexStr);
249                                                                                 String name = pmNames.get(index);
250                                                                                 if(name == null){
251                                                                                         log.error("illegal data: valueIndex="+index);
252                                                                                         continue;
253                                                                                 }
254                                                                                 
255                                                                                 String value = reader.getElementText().trim();
256                                                                                 pmDatas.put(name, value);
257                                                                         }
258                                                                         if ("CV".equalsIgnoreCase(localName)) {
259                                                                         
260                                                                                 String indexStr = getXMLAttribute(reader, "i");
261                                                                                 if(indexStr == null) {
262                                                                                         log.error("ERROR: illegal value index");
263                                                                                         continue;
264                                                                                 }
265                                                                                 index = Integer.parseInt(indexStr);
266                                                                                 
267                                                                                 currentMea = pmNames.get(index);
268                                                                                 if(currentMea == null){
269                                                                                         log.error("illegal data: valueIndex="+index);
270                                                                                         continue;
271                                                                                 }
272                                                                         }
273         
274                                                                         if ("SN".equalsIgnoreCase(localName)) {
275                                                                                 subName = reader.getElementText().trim();
276                                                                                 
277                                                                         }
278                                                                         if ("SV".equalsIgnoreCase(localName)) {
279                                                                                 String subValue = reader.getElementText().trim();
280 //                                                                              pmDatas.put(currentMea+subName, subValue);
281                                                                                 pmDatas.put(subName, subValue);
282                                                                         }
283                                                                 }
284                                                         }
285         
286                                                 }       
287         
288                                                 break;
289                                         case XMLStreamConstants.CHARACTERS:
290                                                 // ...
291                                                 break;
292                                         case XMLStreamConstants.END_ELEMENT:
293                                                 // ...
294                                                 endLocalName = reader.getLocalName();
295                                                 if("Object".equalsIgnoreCase(endLocalName)){
296                                                         objectFlag = false;
297                                                         pmDatas.putAll(commonNameAndValue);
298                                                         try {
299                                                                 pmResultChannel.put(pmDatas);
300                                                                 
301                                                         } catch (InterruptedException e) {
302                                                                 log.error("collectResultChannel.put(resultMap) error ",e);
303                                                         }
304 //                                                      System.out.println(pmDatas);
305 //                                                      pmDatas.clear();
306                                 }
307                                                 if (endLocalName.equalsIgnoreCase("PmData")) {
308                                                         pmDataFlag = false;
309                                                 }
310         
311                                                 if (endLocalName.equalsIgnoreCase("PmName")) {
312                                                         pmNameFlag = false;
313                                                 }
314                                                 if (endLocalName.equalsIgnoreCase("Measurements")) {
315                                                         // a measurement over
316                                                         measurementStart = false;
317                                                 }
318                                                 
319                                                 if("FileHeader".equalsIgnoreCase(endLocalName)){
320                                     fileHeaderStart = false;
321                                 }
322                                                 break;
323                                         }
324                                 }catch (Exception e){
325                                         log.error("",e);
326                                         event = reader.next();
327                                 }
328                         }
329                         
330                 } catch (Exception e) {
331                         log.error("processPMXml is Exception ",e);
332                         return false;
333                 } finally{
334                         try{
335                                 if(reader != null)reader.close();
336                                 if(isr!= null)isr.close();
337                                 if(fis != null)fis.close();
338                         }catch (Exception e) {
339                                 e.printStackTrace();
340                         }
341                 }
342                 return true;
343         }
344                 
345         private String getXMLAttribute(XMLStreamReader reader, String obj) {
346                 String res = null;
347                 if(obj == null || reader == null){
348                         return res;
349                 }
350                 int n = reader.getAttributeCount();
351                 for(int i = 0; i < n; i++) {
352                         String name = reader.getAttributeLocalName(i);
353                         if(obj.equalsIgnoreCase(name)){
354                                 res = reader.getAttributeValue(i);
355                         }
356                 }
357                 return res;
358         }
359         
360         public boolean processPMCsv(File tempfile) {
361                 
362                 FileInputStream brs = null;
363                 InputStreamReader isr = null;
364                 BufferedReader br = null;
365                 
366                 List<String> columnNames = new ArrayList<String>();
367                 List<String> commonValues = new ArrayList<String>();
368                 try {
369                         
370                         brs = new FileInputStream(tempfile);
371                         isr = new InputStreamReader(brs, Constant.ENCODING_UTF8);
372                         br = new BufferedReader(isr);
373                         //common field
374                         String commonField = br.readLine();
375                         String[] fields = commonField.split("\\|",-1);
376                         for(String com : fields){
377                                 String[] comNameAndValue = com.split("=",2);
378                                 columnNames.add(comNameAndValue[0].trim());
379                                 commonValues.add(comNameAndValue[1]);
380                         }
381                         //column names
382                         String columnName = br.readLine();
383                         String[] names = columnName.split("\\|",-1);
384                         for(String name : names){
385                                 columnNames.add(name);
386                         }
387
388                         String valueLine = "";
389                         List<String> valuelist = new ArrayList<String>();
390                         
391                         while ((valueLine = br.readLine()) != null) {
392                                 if (valueLine.trim().equals("")) {
393                                         continue;
394                                 }
395 //                              countNum ++;
396                                 String [] values = valueLine.split("\\|",-1);
397                                 
398                                 valuelist.addAll(commonValues);
399                                 for(String value : values){
400                                         valuelist.add(value);
401                                 }
402 //                              this.appendLine(valuelist, bos);
403                                 //resultMap
404                                 HashMap<String,String> resultMap = this.resultMap(columnNames,valuelist);
405                                 try {
406                                         pmResultChannel.put(resultMap);
407                                 } catch (InterruptedException e) {
408                                         log.error("collectResultChannel.put(resultMap) error ",e);
409                                 }
410                                 valuelist.clear();
411                         }
412                 } catch (IOException e) {
413                         log.error("processPMCsv is fail ",e);
414                         return false;
415                 }finally{
416                         try{
417                                 if (br != null)
418                                         br.close();
419                                 if (isr != null)
420                                         isr.close();
421                                 if (brs != null)
422                                         brs.close();
423                                 
424                         } catch (Exception e){
425                                 log.error(e);
426                         }
427                 }
428                 return true;
429                 
430         }
431
432         private HashMap<String,String> resultMap(List<String> columnNames, List<String> valuelist) {
433                 
434                 HashMap<String,String>  resultMap = new HashMap<String,String>();
435                 if(columnNames.size() == valuelist.size()){
436                         for(int i =0;i<columnNames.size();i++){
437                                 resultMap.put(columnNames.get(i), valuelist.get(i));
438                         }
439                 }
440                 
441                 return resultMap;
442                 
443         }
444         private boolean processCMXml(File tempfile, String nename, String type) {
445                 
446                 String csvpath = localPath+nename+"/"+type+"/";
447                 File csvpathfile = new File(csvpath);
448                 if(!csvpathfile.exists()){
449                         csvpathfile.mkdirs();
450                 }
451                 String csvFileName = nename +dateFormat.format(new Date())+  System.nanoTime();
452                 String csvpathAndFileName = csvpath+csvFileName+".csv";
453                 BufferedOutputStream  bos = null;
454                 FileOutputStream fos = null;
455                 try {
456                         fos = new FileOutputStream(csvpathAndFileName,false);
457                         bos = new BufferedOutputStream(fos, 10240);
458                 } catch (FileNotFoundException e1) {
459                         log.error("FileNotFoundException "+StringUtil.getStackTrace(e1));
460                 }
461                 
462                 boolean FieldNameFlag = false;
463                 boolean FieldValueFlag = false;
464                 //line num
465                 int countNum = 0;
466                 String xmlPathAndFileName = null;
467                 String localName = null;
468                 String endLocalName = null;
469                 String rmUID = null;
470                 int index = -1;
471                 ArrayList<String> names = new ArrayList<String>();// colname
472                 LinkedHashMap<String, String> nameAndValue = new LinkedHashMap<String, String>();
473
474                 
475                 FileInputStream fis = null;
476                 InputStreamReader isr = null;
477                 XMLStreamReader reader = null;
478                 try{
479                         fis = new FileInputStream(tempfile);
480                         isr = new InputStreamReader(fis, Constant.ENCODING_UTF8);
481                         XMLInputFactory fac = XMLInputFactory.newInstance();
482                         reader = fac.createXMLStreamReader(isr);
483                         int event = -1;
484                         boolean setcolum = true;
485                         while (reader.hasNext()){
486                                 try{
487                                         event = reader.next();
488                                         switch (event){
489                                         case XMLStreamConstants.START_ELEMENT:
490                                                 localName = reader.getLocalName();
491                                                 if ("FieldName".equalsIgnoreCase(localName)){
492                                                         FieldNameFlag = true;
493                                                 }
494                                                 if (FieldNameFlag){
495                                                         if ("N".equalsIgnoreCase(localName)){
496                                                                 String colName = reader.getElementText().trim();
497                                                                 names.add(colName);
498                                                         }
499                                                 }
500                                                 if ("FieldValue".equalsIgnoreCase(localName)){
501                                                         FieldValueFlag = true;
502                                                         
503                                                 }
504                                                 if (FieldValueFlag){
505                                                         if(setcolum){
506                                                                 xmlPathAndFileName = this.setColumnNames(nename, names,type);
507                                                                 setcolum = false;
508                                                         }
509                                                         
510                                                         if ("Object".equalsIgnoreCase(localName)){
511                                                                 int ac = reader.getAttributeCount();
512                                                                 for (int i = 0; i < ac; i++){
513                                                                         if ("rmUID".equalsIgnoreCase(reader.getAttributeLocalName(i))){
514                                                                                 rmUID = reader.getAttributeValue(i).trim();
515                                                                         }
516                                                                 }
517                                                                 nameAndValue.put("rmUID", rmUID);
518                                                         }
519                                                         if ("V".equalsIgnoreCase(localName)) {
520                                                                 index = Integer.parseInt(reader
521                                                                                 .getAttributeValue(0)) - 1;
522                                                                 String currentName = names.get(index);
523                                                                 String v = reader.getElementText().trim();
524                                                                 nameAndValue.put(currentName, v);
525                                                         }
526                                                 }
527                                                 break;
528                                         case XMLStreamConstants.CHARACTERS:
529                                                 break;
530                                         case XMLStreamConstants.END_ELEMENT:
531                                                 endLocalName = reader.getLocalName();
532
533                                                 if ("FieldName".equalsIgnoreCase(endLocalName)){
534                                                         FieldNameFlag = false;
535                                                 }
536                                                 if ("FieldValue".equalsIgnoreCase(endLocalName)){
537                                                         FieldValueFlag = false;
538                                                 }
539                                                 if ("Object".equalsIgnoreCase(endLocalName)){
540                                                         countNum ++;
541                                                         this.appendLine(nameAndValue,bos);
542                                                         nameAndValue.clear();
543                                                 }
544                                                 break;
545                                         }
546                                 } catch (Exception e)
547                                 {
548                                         log.error(""+StringUtil.getStackTrace(e));
549                                         event = reader.next();
550                                 }
551                         }
552                         
553                         
554                         if(bos != null){
555                                 bos.close();
556                                 bos = null;
557                         }
558                         if(fos != null){
559                                 fos.close();
560                                 fos = null;
561                         }
562                         
563                         String[] fileKeys = this.createZipFile(csvpathAndFileName,xmlPathAndFileName,nename);
564                         //ftp store
565                         Properties ftpPro = configurationInterface.getProperties();
566                         String ip = ftpPro.getProperty("ftp_ip");
567                         String port = ftpPro.getProperty("ftp_port");
568                         String ftp_user = ftpPro.getProperty("ftp_user");
569                         String ftp_password = ftpPro.getProperty("ftp_password");
570                         
571                         String ftp_passive = ftpPro.getProperty("ftp_passive");
572                         String ftp_type = ftpPro.getProperty("ftp_type");
573                         String remoteFile = ftpPro.getProperty("ftp_remote_path");
574                         this.ftpStore(fileKeys,ip,port,ftp_user,ftp_password,ftp_passive,ftp_type,remoteFile);
575                         //create Message
576                         String message = this.createMessage(fileKeys[1], ftp_user, ftp_password, ip,  port, countNum,nename);
577                         
578                         //set message
579                         this.setMessage(message);
580                 } catch (Exception e){
581                         log.error(""+StringUtil.getStackTrace(e));
582                         return false;
583                 } finally{
584                         try{
585                                 if (reader != null){
586                                         reader.close();
587                                 }
588                                 if (isr != null){
589                                         isr.close();
590                                 }
591                                 if (fis != null){
592                                         fis.close();
593                                 }
594                                 if(bos != null){
595                                         bos.close();
596                                 }
597                                 
598                                 if(fos != null){
599                                         fos.close();
600                                 }
601                         } catch (Exception e){
602                                 log.error(e);
603                         }
604                 }
605                 return true;
606         }
607         
608         private void setMessage(String message) {
609
610                 try {
611                         cmResultChannel.put(message);
612                 } catch (Exception e) {
613                         log.error("collectResultChannel.put(message) is error "+StringUtil.getStackTrace(e));
614                 }
615         }
616
617         public String createMessage(String zipName,String user,String pwd,String ip, String port,int countNum, String nename) {
618
619                 StringBuffer strBuffer = new StringBuffer();
620                 strBuffer
621                                 .append("<?xml version=\"1.0\" encoding=\"UTF-8\"?>"
622                                                 + "<FILE_DATA_READY_UL xmlns:xsi=\" http://www.w3.org/2001/XMLSchema-instance\">"
623                                                 + "<Header SessionID=\"");
624                 strBuffer.append("");
625                 strBuffer.append("\" LicenceID=\"");
626                 strBuffer.append("");
627                 strBuffer.append("\" SystemID=\"");
628                 strBuffer.append("");
629                 strBuffer.append("\" Time=\"");
630                 strBuffer.append( dateFormat2.format(new Date()));
631                 strBuffer.append("\" PolicyID=\"");
632                 strBuffer.append("");
633                 strBuffer.append("\"/><Body>");
634                 strBuffer.append("<DataCatalog>");
635                 strBuffer.append("");
636                 strBuffer.append("</DataCatalog><GroupID>");
637                 strBuffer.append(nename);
638                 strBuffer.append("</GroupID><DataSourceName>");
639                 strBuffer.append("");
640                 strBuffer.append("</DataSourceName><InstanceID>");
641                 strBuffer.append("");
642                 strBuffer.append("</InstanceID><FileFormat>");
643                 strBuffer.append("csv");
644                 strBuffer.append("</FileFormat><CharSet>");
645                 strBuffer.append("gbk");
646                 strBuffer.append("</CharSet><FieldSeparator>");
647                 strBuffer.append("|");
648                 strBuffer.append("</FieldSeparator><IsCompressed>");
649                 strBuffer.append("true");
650                 strBuffer.append("</IsCompressed><StartTime>");
651                 strBuffer.append(dateFormat2.format(new Date()));
652                 strBuffer.append("</StartTime><EndTime>");
653                 strBuffer.append("");
654                 strBuffer.append("</EndTime><FileList>");
655                 strBuffer.append(zipName);
656                 strBuffer.append("</FileList><ConnectionString>");
657                 strBuffer.append("ftp://" + user + ":" + pwd + "@" + ip + ":" + port);
658                 strBuffer.append("</ConnectionString>");
659                 strBuffer.append("<DataCount>");
660                 strBuffer.append(countNum);
661                 strBuffer.append("</DataCount>");
662                 
663                 strBuffer.append("<FileSize>").append("").append("</FileSize>");
664                 strBuffer.append("<DataGranularity>").append("").append("</DataGranularity>");
665
666                 
667                 strBuffer.append("</Body></FILE_DATA_READY_UL>");
668                 return strBuffer.toString();
669
670         }
671
672         private void ftpStore(String[] fileKeys, String ip, String port, String ftp_user, String ftp_password, 
673                         String ftp_passive, String ftp_type, String remoteFile) {
674                 String zipFilePath = fileKeys[0];
675                 
676                 
677                 FTPInterface ftpClient;
678                 ftpClient = new FTPSrv();
679                 //login
680                 try {
681                         ftpClient.login(ip, Integer.parseInt(port), ftp_user, ftp_password, "GBK", Boolean.parseBoolean(ftp_passive), 5*60*1000);
682                 } catch (Exception e) {
683                         log.error("login fail,ip=["+ip+"] port=["+port+"] user=["+ftp_user+"]pwd=["+ftp_password+"]"+StringUtil.getStackTrace(e));
684                     return;
685                 } 
686 //              ftpClient.store(zipFilePath, remoteFile);
687                 log.debug("store  ["+zipFilePath+"]to["+remoteFile+"]");
688                                                                 
689                 FileUtils.deleteQuietly(new File(zipFilePath));
690                 
691                 
692         }
693
694         private String[] createZipFile(String csvpathAndFileName,String xmlPathAndFileName,String nename) {
695                 
696                 String zipPath = resultPath+nename +dateFormat.format(new Date())+"_"+System.nanoTime();
697                 
698                 File destDir = new File(zipPath);
699                 destDir.mkdirs();
700                 
701                 try {
702                         FileUtils.copyFileToDirectory(new File(csvpathAndFileName), destDir);
703                         FileUtils.copyFileToDirectory(new File(xmlPathAndFileName), destDir);
704                 } catch (IOException e) {
705                         
706                 }
707                 
708                 String destFilePath = zipPath + ".zip";
709                 try {
710                         Zip zip = new Zip(destDir.getAbsolutePath(), destFilePath);
711                         zip.setCompressLevel(9);
712                         zip.compress();
713
714                         FileUtils.deleteDirectory(destDir);
715                 } catch (IOException e) {
716                         log.error("zip.compress() is fail "+StringUtil.getStackTrace(e));
717                 }
718                 return new String[] { destFilePath, zipPath + ".zip"};
719         }
720
721
722         private String setColumnNames(String nename, List<String> names,String type) {
723                 //write xml
724                 String xmlpath = localPath+nename +"/"+type+"/";
725                 File xmlpathfile = new File(xmlpath);
726                 if(!xmlpathfile.exists()){
727                         xmlpathfile.mkdirs();
728                 }
729                 String xmlFileName = nename +dateFormat.format(new Date())+ System.nanoTime();
730                 String fieldLine = "";
731                 for (int i = 0; i < names.size(); i++) {
732                         String field = "\t<Field>\r\n" + "\t\t<FieldNo>" + i
733                                         + "</FieldNo>\r\n" + "\t\t<FieldName>"
734                                         + names.get(i) + "</FieldName>\r\n"
735                                         + "\t\t<FieldType>" + names.get(i)
736                                         + "</FieldType>\r\n"
737                                         + "\t\t<FieldNameOther>" + names.get(i)
738                                         + "</FieldNameOther>\r\n" +
739                                         "\t</Field>\r\n";
740                         fieldLine = fieldLine + field;
741                 }
742
743                 String str = "<?xml version=\"1.0\" encoding=\"gbk\"?>\r\n"
744                                 + "<xml>\r\n" + "<FILE_STRUCTURE>\r\n" + fieldLine
745                                 + "</FILE_STRUCTURE>\r\n" + "</xml>\r\n";
746                 String xmlPathAndFileName = xmlpath+xmlFileName+".xml";
747                 try {
748                         this.writeDetail(xmlPathAndFileName,str);
749                 } catch (Exception e) {
750                         log.error("writeDetail is fail ,xmlFileName="+xmlFileName +StringUtil.getStackTrace(e));
751                 }
752                 
753                 return xmlPathAndFileName;
754         }
755         
756         private void writeDetail(String detailFileName,String str) throws Exception {
757                 OutputStreamWriter writer = null;
758                 OutputStream readOut = null;
759                 try {
760                         readOut = new FileOutputStream(new File(detailFileName), false);
761                         writer = new OutputStreamWriter(readOut);
762                         writer.write(str);
763                         writer.flush();
764                 } finally {
765                         
766                         if(null != writer){
767                                 writer.close();
768                         }
769                         if(readOut != null){
770                                 readOut.close();
771                         }
772                         
773                 }
774
775         }
776         
777
778         private void appendLine(LinkedHashMap<String, String> nameAndValue,BufferedOutputStream  bos) {
779                 StringBuilder lineDatas =  new StringBuilder();
780                 
781                 for (String key : nameAndValue.keySet()) {
782                         lineDatas.append(nameAndValue.get(key)).append("|");
783                 }
784                 try {
785                         bos.write(lineDatas.toString().getBytes());
786                         bos.write("\n".getBytes());
787                 } catch (IOException e) {
788                         log.error("appendLine error "+StringUtil.getStackTrace(e));
789                 }
790         }
791         
792 //      private void appendLine(List<String> values,BufferedOutputStream  bos) {
793 //              StringBuilder lineDatas =  new StringBuilder();
794 //              
795 //              for (String value : values) {
796 //                      lineDatas.append(value).append("|");
797 //              }
798 //              try {
799 //                      bos.write(lineDatas.toString().getBytes());
800 //                      bos.write("\n".getBytes());
801 //              } catch (IOException e) {
802 //                      log.error("appendLine error "+StringUtil.getStackTrace(e));
803 //              }
804 //      }
805
806         public List<File> decompressed(String fileName){
807             List<File> filelist = new ArrayList<File>();
808         
809             if (fileName.indexOf(".gz") > 1)
810             {
811                 try {
812                                 File decompressFile = deGz(fileName);
813                                 filelist.add(decompressFile);
814                         } catch (IOException e) {
815                                 log.error("decompressed is fail "+StringUtil.getStackTrace(e));
816                         }
817             } else if (fileName.indexOf(".zip") > 1)
818             {
819                 try {
820                                 File[] files = deZip(new File(fileName));
821                                 for(File temp :files){
822                                         filelist.add(temp);
823                                 }
824                         } catch (Exception e) {
825                                 log.error("decompressed is fail "+StringUtil.getStackTrace(e));
826                         }
827             }
828             else {
829                 filelist.add(new File(fileName));
830             }
831         
832             return filelist;
833         }
834
835         private File deGz(String gzFileName) throws IOException {
836                 Gunzip gunzip = new Gunzip();
837                 String orgFile = gzFileName.replace(".gz", "");
838                 gunzip.unCompress(gzFileName, orgFile);
839                 return new File(orgFile);
840         }
841
842         public File[] deZip(File file) throws Exception{
843                   
844                 String regx = "(.*).zip";
845               Pattern p = Pattern.compile(regx);
846               Matcher m = p.matcher(file.getName());
847               if (m.find())
848               {
849                 String orgFile = localPath + m.group(1) + "/";
850                 UnZip unzip = new UnZip(file.getAbsolutePath(), orgFile);
851             unzip.deCompress();
852                 file = new File(orgFile);
853               }
854               File[] files = file.listFiles();
855               
856               return files;
857              
858         }
859
860         private List<String> ftpDownload(CollectVo collectVo) {
861                 
862                 List<String> fileList = new ArrayList<String>();
863                 //IP
864                 String ip = collectVo.getIP();
865                 //port
866                 String port = collectVo.getPort();
867                 //user
868                 String user = collectVo.getUser();
869                 //password
870                 String password = collectVo.getPassword();
871                 //isPassiveMode
872                 String passivemode = collectVo.getPassive();
873                 
874                 FTPInterface ftpClient = new FTPSrv();
875                 
876                 //login
877                 try {
878                         log.info("ftp login ,ip=["+ip+"] port=["+port+"] user=["+user+"]password=["+password+"]");
879                         ftpClient.login(ip, Integer.parseInt(port), user, password, "GBK", Boolean.parseBoolean(passivemode), 5*60*1000);
880                 } catch (Exception e) {
881                         log.error("login fail,ip=["+ip+"] port=["+port+"] user=["+user+"]password=["+password+"]"+StringUtil.getStackTrace(e));
882                     return fileList;
883                 } 
884                 
885                 //download
886                 String dir = collectVo.getRemotepath();
887                 List<String> searchExprList = new ArrayList<String>();
888                 String []FPath = dir.split(";");
889                 for(int i=0;i<FPath.length;i++){
890                         int oldSize = searchExprList.size();
891                         String conpath = FPath[i] + collectVo.getMatch();
892                         Hashtable<String,String> varMap = new Hashtable<String,String>();
893                         long collectPeriod = 900;
894                         try {
895                                 collectPeriod = Long.parseLong(collectVo.getGranularity())*60;
896                                 log.info("collectPeriod =["+collectPeriod+"]");
897                         } catch (NumberFormatException e) {
898                                 e.printStackTrace();
899                         }
900                         long[] d = DateUtil.getScanScope(new Date(), collectPeriod);
901                         searchExprList.add(VarExprParser.replaceVar(conpath,d[0],d[1]));
902                         
903                         varMap.clear();
904                         varMap = null;
905                         log.info("["+conpath+"] result["+(searchExprList.size()-oldSize)+"] path");
906                         conpath = null;
907                 }
908                 String nowdir = null;
909                 try {
910                         nowdir = ftpClient.pwd();
911                         searchExprList =getPathNoRegular(searchExprList,ftpClient);
912                 } catch (Exception e1) {
913                         log.error(" collect fail ",e1);
914                         return fileList;
915                 }
916                 List<AFtpRemoteFile> remoteFiles = new ArrayList<AFtpRemoteFile>();
917                 for(String expr :searchExprList){
918                         ftpClient.chdir(nowdir);
919                         String keys[] = parseExprKeys(expr);
920                         String ftpRegular = keys[1];
921                         String ftpDir = keys[0];
922                         
923                         boolean cdsucess = ftpClient.chdir(ftpDir);
924                         if(cdsucess){
925                                 AFtpRemoteFile[] arf = (AFtpRemoteFile[]) ftpClient.list();
926                                 log.info(" list ["+ftpDir+"] result["+(arf==null?"null":arf.length)+"] files");
927                                 //filter
928                                 
929                                 rfileFilter(remoteFiles,arf,ftpRegular);
930                                 
931                                 keys = null;
932                                 ftpRegular=ftpDir = null;
933                                 
934                                 for(AFtpRemoteFile ftpRemoteFile: remoteFiles){
935                                         if(!new File(localPath).exists()){
936                                                 try {
937                                                         new File(localPath).mkdir();
938                                                 } catch (Exception e) {
939                                                         log.error("create localPath is fail localPath="+localPath+" "+StringUtil.getStackTrace(e));
940                                                 }
941                                         }
942                                         
943                                         if(!new File(localPath).exists()){
944                                                 new File(localPath).mkdirs();
945                                         }
946                                         
947                                         String localFileName = localPath + ftpRemoteFile.getFileName();
948                                         File loaclFile = new File(localFileName);
949                                         if (loaclFile.exists()) {
950                                                 loaclFile.delete();
951                                         }
952                                         
953                                         boolean flag = ftpClient.downloadFile(ftpRemoteFile.getAbsFileName(), localFileName);
954                                         
955                                         if(flag){
956                                                 fileList.add(localFileName);
957                                         }else{
958                                                 log.error("download file fail fileName="+ftpRemoteFile.getAbsFileName());
959                                         }
960                                 }
961                                 
962                         }else{
963                                 log.error("cd dir is faill dir =["+ftpDir+"]");
964                         }
965                 }
966                 
967                 
968                 
969                 return fileList;
970         }
971         
972         private void rfileFilter(List<AFtpRemoteFile> fileContainer, AFtpRemoteFile[] arfs, String ftpRegular) {
973                 if (ftpRegular!=null && ftpRegular.length()>0) {
974                         Pattern pattern = null;
975                         try {
976                                 pattern = Pattern.compile(ftpRegular, Pattern.CASE_INSENSITIVE);
977                         } catch (Exception e) {
978                                 log.info("["+ftpRegular+"]Pattern.compile exception:"+e.getMessage());
979                         }
980                         int hisSize = fileContainer.size();
981                         for (int j=0; arfs!=null&&j<arfs.length; j++) {
982                                 String fileName = parseFileName(arfs[j].getFileName());
983                                 Matcher matcher = pattern.matcher(fileName);
984                                 if (matcher.find()) 
985                                         fileContainer.add(arfs[j]);
986                         }
987                         log.info("["+ftpRegular+"]filter["+(fileContainer.size()-hisSize)+"]filse");
988                         pattern = null;
989                 }else {
990                         for (int j=0; arfs!=null&&j<arfs.length; j++) 
991                                 fileContainer.add(arfs[j]);
992                 }
993                 
994         }
995         
996         private String parseFileName(String fileName) {
997                 int idx = fileName.lastIndexOf("/");
998                 if (idx == -1)
999                         return fileName;
1000                 return fileName.substring(idx+1, fileName.length());
1001         }
1002         
1003         private String[] parseExprKeys(String source) {
1004                 
1005                 if(source.indexOf(";") > -1){
1006                         source = source.substring(0, source.indexOf(";"));
1007                 }
1008                 if (source.endsWith("/")) 
1009                         return new String[]{source,""};
1010
1011                 int idx = source.lastIndexOf("/");
1012                 String[] dirkeys = new String[2];
1013                 dirkeys[0] = source.substring(0, idx+1);
1014                 dirkeys[1] = source.substring(idx+1, source.length());
1015                 return dirkeys;
1016         }
1017         
1018         
1019         public  List<String> getPathNoRegular(List<String> searchExprList,FTPInterface ftpCache)  throws Exception{
1020                 boolean isregular = false;
1021                 List<String> regularList = new ArrayList<String>();
1022                 for(String regular : searchExprList){
1023                         Pattern lpattern = null;
1024                         try{
1025                                 lpattern = Pattern.compile("(.*/)<([^/]+)>(/.*)"); 
1026                         }catch (Exception e) {
1027                                 log.error("["+regular+"]compile fails:"+e.getMessage());
1028                                 e.printStackTrace();
1029                         }
1030                         
1031                         Matcher  matcher = lpattern.matcher(regular);
1032                         if(matcher.find()){
1033                                 isregular = true;
1034                                 String parpath  = matcher.group(1);
1035                                 try{
1036                                         boolean isin = ftpCache.chdir(parpath);
1037                                     if(isin){
1038                                         log.info("cd dir [" + parpath + "] sucess");
1039                                     }else{
1040                                         log.error("cd dir [" + parpath + "] fail");
1041                                     }
1042                                 }catch(Exception e){
1043                                         log.error(" cd dir ["+parpath+"]fail",e);
1044                                         throw e;
1045                                 }
1046                                 RemoteFile[] remotef = ftpCache.list();
1047                                 for(RemoteFile aremote :remotef){
1048                                         if(aremote.isDirectory()&&aremote.getFileName().matches(matcher.group(2))){
1049                                                 regularList.add(matcher.group(1)+aremote.getFileName()+matcher.group(3));
1050                                         }       
1051                                 }
1052                         }else{
1053                                 regularList.add(regular);
1054                         }               
1055                 }
1056                 if(isregular==true){
1057                         getPathNoRegular(regularList,ftpCache);
1058                 }
1059                 return regularList;
1060         }
1061         
1062 }