2 * Copyright 2017 BOCO Corporation. CMCC Technologies Co., Ltd
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
16 package org.onap.vfc.nfvo.emsdriver.collector;
18 import java.io.BufferedOutputStream;
19 import java.io.BufferedReader;
21 import java.io.FileInputStream;
22 import java.io.FileNotFoundException;
23 import java.io.FileOutputStream;
24 import java.io.IOException;
25 import java.io.InputStreamReader;
26 import java.io.OutputStream;
27 import java.io.OutputStreamWriter;
28 import java.text.SimpleDateFormat;
29 import java.util.ArrayList;
30 import java.util.Date;
31 import java.util.LinkedHashMap;
32 import java.util.List;
33 import java.util.Properties;
34 import java.util.regex.Matcher;
35 import java.util.regex.Pattern;
37 import javax.xml.stream.XMLInputFactory;
38 import javax.xml.stream.XMLStreamConstants;
39 import javax.xml.stream.XMLStreamReader;
41 import org.apache.commons.io.FileUtils;
42 import org.apache.commons.logging.Log;
43 import org.apache.commons.logging.LogFactory;
44 import org.onap.vfc.nfvo.emsdriver.commons.constant.Constant;
45 import org.onap.vfc.nfvo.emsdriver.commons.ftp.AFtpRemoteFile;
46 import org.onap.vfc.nfvo.emsdriver.commons.ftp.FTPInterface;
47 import org.onap.vfc.nfvo.emsdriver.commons.ftp.FTPSrv;
48 import org.onap.vfc.nfvo.emsdriver.commons.ftp.SFTPSrv;
49 import org.onap.vfc.nfvo.emsdriver.commons.model.CollectMsg;
50 import org.onap.vfc.nfvo.emsdriver.commons.model.CollectVo;
51 import org.onap.vfc.nfvo.emsdriver.commons.utils.Gunzip;
52 import org.onap.vfc.nfvo.emsdriver.commons.utils.StringUtil;
53 import org.onap.vfc.nfvo.emsdriver.commons.utils.UnZip;
54 import org.onap.vfc.nfvo.emsdriver.commons.utils.Zip;
55 import org.onap.vfc.nfvo.emsdriver.configmgr.ConfigurationImp;
56 import org.onap.vfc.nfvo.emsdriver.configmgr.ConfigurationInterface;
57 import org.onap.vfc.nfvo.emsdriver.messagemgr.MessageChannel;
58 import org.onap.vfc.nfvo.emsdriver.messagemgr.MessageChannelFactory;
/**
 * Runnable task that, per the methods visible in this file, downloads collected files over
 * FTP/SFTP, converts them (CM XML or PM CSV), zips and uploads the result, and publishes a
 * notification message on the collect-result channel.
 */
61 public class TaskThread implements Runnable{
// Commons-logging logger; note it is declared public and per instance.
63 public Log log = LogFactory.getLog(TaskThread.class);
// Channel that result messages are put on; obtained from MessageChannelFactory before use.
65 private MessageChannel collectResultChannel;
// Collect message handed to collectMsgHandle.
67 private CollectMsg data;
// Source of the per-EMS collection settings and of the upload FTP properties.
69 private ConfigurationInterface configurationInterface = new ConfigurationImp();
// Base directory for downloaded and intermediate files.
71 private String localPath = Constant.SYS_DATA_TEMP;
// Base directory under which result zip archives are built.
72 private String resultPath = Constant.SYS_DATA_RESULT;
// Compact timestamp used in generated file and directory names.
74 private SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMMddHHmmss");
// Human-readable timestamp used inside the notification message.
76 private SimpleDateFormat dateFormat2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
// Paths of the most recently generated CSV and column-description XML; written by
// processPMCsv and read by parseFtpAndSendMessage.
78 private String csvpathAndFileName;
79 private String xmlPathAndFileName;
// Value reported as DataCount in the notification message.
// NOTE(review): no statement that updates it is visible in this view - confirm where it is counted.
80 private int countNum = 0 ;
// Constructor taking the collect message to process.
// NOTE(review): the constructor body and the header of the method that contains the
// statements below are not visible in this view.
82 public TaskThread(CollectMsg data) {
// Look up the channel that result messages are published on.
92 collectResultChannel = MessageChannelFactory.getMessageChannel(Constant.COLLECT_RESULT_CHANNEL_KEY);
// Process the collect message; any exception is caught by the handler below.
95 collectMsgHandle(data);
96 } catch (Exception e) {
101 private void collectMsgHandle(CollectMsg collectMsg) {
102 String emsName = collectMsg.getEmsName();
103 String type = collectMsg.getType();
104 CollectVo collectVo = configurationInterface.getCollectVoByEmsNameAndType(emsName, type);
107 List<String> downloadfiles = this.ftpDownload(collectVo);
108 //paser ftp update message send
109 for(String fileName :downloadfiles){
110 this.parseFtpAndSendMessage(fileName,collectVo);
/**
 * Decompresses one downloaded file, parses each extracted file as CM XML or PM CSV and,
 * per the visible statements, zips the converted output, uploads it and publishes a
 * notification message.
 * NOTE(review): several lines of this method (braces and conditions) are not visible in
 * this view, so the exact branching around the statements below cannot be confirmed here.
 */
114 private void parseFtpAndSendMessage(String fileName, CollectVo collectVo) {
// Expand .gz/.zip archives into the list of files to parse.
116 List<File> filelist = decompressed(fileName);
118 for (File tempfile : filelist) {
120 String unfileName = tempfile.getName();
// The NE name is capture group 1 of this pattern applied to the file name.
122 Pattern pa = Pattern.compile(".*-(.*)-\\w{2}-");
123 Matcher ma = pa.matcher(unfileName);
126 String nename = ma.group(1);
127 boolean parseResult = false;
// "CM" collections use the XML parser; the other visible call uses the PM CSV parser.
128 if("CM".equalsIgnoreCase(collectVo.getType())){
129 parseResult = processCMXml(tempfile, nename,"CM");
131 parseResult = processPMCsv(tempfile, nename,"PM");
// Bundle the files named by the csvpathAndFileName / xmlPathAndFileName fields into a zip.
// NOTE(review): processCMXml assigns local variables with these names rather than the
// fields - confirm the fields are populated when this runs after a CM parse.
134 String[] fileKeys = this.createZipFile(csvpathAndFileName,xmlPathAndFileName,nename);
// Upload-target settings are read from the configuration properties.
136 Properties ftpPro = configurationInterface.getProperties();
137 String ip = ftpPro.getProperty("ftp_ip");
138 String port = ftpPro.getProperty("ftp_port");
139 String ftp_user = ftpPro.getProperty("ftp_user");
140 String ftp_password = ftpPro.getProperty("ftp_password");
142 String ftp_passive = ftpPro.getProperty("ftp_passive");
143 String ftp_type = ftpPro.getProperty("ftp_type");
144 String remoteFile = ftpPro.getProperty("ftp_remote_path");
145 this.ftpStore(fileKeys,ip,port,ftp_user,ftp_password,ftp_passive,ftp_type,remoteFile);
// Build the notification for the uploaded archive (fileKeys[1]) and publish it.
147 String message = this.createMessage(fileKeys[1], ftp_user, ftp_password, ip, port, countNum,nename);
150 this.setMessage(message);
// Outcome logging for the parsed file.
154 log.info("parser "+tempfile+" sucess");
156 log.info("parser "+tempfile+" fail");
/**
 * Converts a pipe-delimited PM file into a CSV file under localPath/nename/type/ and
 * writes the matching column-description XML via setColumnNames. The generated paths are
 * stored in the csvpathAndFileName and xmlPathAndFileName fields.
 * NOTE(review): the return statements and the stream clean-up are not visible in this view.
 *
 * @param tempfile input file to parse
 * @param nename   NE name used for the working directory and the output file names
 * @param type     data type label used as the sub-directory name
 */
162 public boolean processPMCsv(File tempfile, String nename,String type) {
// Make sure the per-NE, per-type working directory exists.
164 String csvpath = localPath+nename+"/"+type+"/";
165 File csvpathfile = new File(csvpath);
166 if(!csvpathfile.exists()){
167 csvpathfile.mkdirs();
// Output name: NE name + timestamp + nanoTime.
169 String csvFileName = nename +dateFormat.format(new Date())+ System.nanoTime();
170 csvpathAndFileName = csvpath+csvFileName+".csv";
171 BufferedOutputStream bos = null;
172 FileOutputStream fos = null;
// Open the CSV for overwrite (append=false) behind a 10240-byte buffer.
174 fos = new FileOutputStream(csvpathAndFileName,false);
175 bos = new BufferedOutputStream(fos, 10240);
176 } catch (FileNotFoundException e1) {
177 log.error("FileNotFoundException "+StringUtil.getStackTrace(e1));
180 FileInputStream brs = null;
181 InputStreamReader isr = null;
182 BufferedReader br = null;
184 List<String> columnNames = new ArrayList<String>();
185 List<String> commonValues = new ArrayList<String>();
188 brs = new FileInputStream(tempfile);
189 isr = new InputStreamReader(brs, Constant.ENCODING_UTF8);
190 br = new BufferedReader(isr);
// First line: '|'-separated name=value pairs; names become leading columns and the
// values are repeated on every output row.
// NOTE(review): an empty file (readLine() == null) or a pair without '=' would throw here.
192 String commonField = br.readLine();
193 String[] fields = commonField.split("\\|",-1);
194 for(String com : fields){
195 String[] comNameAndValue = com.split("=",2);
196 columnNames.add(comNameAndValue[0].trim());
197 commonValues.add(comNameAndValue[1]);
// Second line: the remaining column names.
200 String columnName = br.readLine();
201 String[] names = columnName.split("\\|",-1);
202 for(String name : names){
203 columnNames.add(name);
// Write the column-description XML for the collected column names.
206 xmlPathAndFileName = this.setColumnNames(nename, columnNames,type);
208 String valueLine = "";
209 List<String> valuelist = new ArrayList<String>();
// NOTE(review): the loop condition discards the line returned by readLine() and no
// assignment to valueLine is visible, so valueLine appears to stay "" for every
// iteration - confirm; the intent looks like (valueLine = br.readLine()) != null.
211 while (br.readLine() != null) {
212 if (valueLine.trim().equals("")) {
216 String [] values = valueLine.split("\\|",-1);
// Each output row is the common values followed by the row's own values.
218 valuelist.addAll(commonValues);
219 for(String value : values){
220 valuelist.add(value);
222 this.appendLine(valuelist, bos);
226 } catch (IOException e) {
227 log.error("processPMCsv is fail ",e);
243 } catch (Exception e){
/**
 * Streams a CM XML file with StAX, writing one '|'-separated row per Object element to a
 * CSV under localPath/nename/type/, then zips, uploads and announces the result.
 * NOTE(review): many lines of this method (declarations, braces, return statements and
 * clean-up) are not visible in this view; comments describe only the visible statements.
 *
 * @param tempfile XML file to parse
 * @param nename   NE name used for the working directory and the output file names
 * @param type     data type label used as the sub-directory name
 */
251 private boolean processCMXml(File tempfile, String nename, String type) {
// Make sure the per-NE, per-type working directory exists.
253 String csvpath = localPath+nename+"/"+type+"/";
254 File csvpathfile = new File(csvpath);
255 if(!csvpathfile.exists()){
256 csvpathfile.mkdirs();
258 String csvFileName = nename +dateFormat.format(new Date())+ System.nanoTime();
// NOTE(review): this local shadows the csvpathAndFileName field, so the field is not
// updated by this method.
259 String csvpathAndFileName = csvpath+csvFileName+".csv";
260 BufferedOutputStream bos = null;
261 FileOutputStream fos = null;
// Open the CSV for overwrite (append=false) behind a 10240-byte buffer.
263 fos = new FileOutputStream(csvpathAndFileName,false);
264 bos = new BufferedOutputStream(fos, 10240);
265 } catch (FileNotFoundException e1) {
266 log.error("FileNotFoundException "+StringUtil.getStackTrace(e1));
// Track whether the parser is currently inside a FieldName / FieldValue element.
269 boolean FieldNameFlag = false;
270 boolean FieldValueFlag = false;
// NOTE(review): this local shadows the xmlPathAndFileName field as well.
273 String xmlPathAndFileName = null;
274 String localName = null;
275 String endLocalName = null;
// Column names in declaration order, and the values of the Object being read.
278 ArrayList<String> names = new ArrayList<String>();// colname
279 LinkedHashMap<String, String> nameAndValue = new LinkedHashMap<String, String>();
282 FileInputStream fis = null;
283 InputStreamReader isr = null;
284 XMLStreamReader reader = null;
286 fis = new FileInputStream(tempfile);
287 isr = new InputStreamReader(fis, Constant.ENCODING_UTF8);
// NOTE(review): the factory is used with default settings in the visible code; consider
// disabling DTD / external-entity support if the input is not trusted.
288 XMLInputFactory fac = XMLInputFactory.newInstance();
289 reader = fac.createXMLStreamReader(isr);
291 boolean setcolum = true;
292 while (reader.hasNext()){
294 event = reader.next();
296 case XMLStreamConstants.START_ELEMENT:
297 localName = reader.getLocalName();
298 if ("FieldName".equalsIgnoreCase(localName)){
299 FieldNameFlag = true;
// N elements carry a column name as their text.
302 if ("N".equalsIgnoreCase(localName)){
303 String colName = reader.getElementText().trim();
307 if ("FieldValue".equalsIgnoreCase(localName)){
308 FieldValueFlag = true;
// Write the column-description XML from the names collected so far.
// NOTE(review): the condition guarding this call is not visible in this view.
313 xmlPathAndFileName = this.setColumnNames(nename, names,type);
// Each Object starts a new row; its rmUID attribute is stored under the "rmUID" key.
317 if ("Object".equalsIgnoreCase(localName)){
318 int ac = reader.getAttributeCount();
319 for (int i = 0; i < ac; i++){
320 if ("rmUID".equalsIgnoreCase(reader.getAttributeLocalName(i))){
321 rmUID = reader.getAttributeValue(i).trim();
324 nameAndValue.put("rmUID", rmUID);
326 if ("V".equalsIgnoreCase(localName)) {
// The first attribute of V is parsed as a 1-based column number and converted to a
// 0-based index into names.
327 index = Integer.parseInt(reader
328 .getAttributeValue(0)) - 1;
329 String currentName = names.get(index);
330 String v = reader.getElementText().trim();
331 nameAndValue.put(currentName, v);
335 case XMLStreamConstants.CHARACTERS:
337 case XMLStreamConstants.END_ELEMENT:
338 endLocalName = reader.getLocalName();
340 if ("FieldName".equalsIgnoreCase(endLocalName)){
341 FieldNameFlag = false;
343 if ("FieldValue".equalsIgnoreCase(endLocalName)){
344 FieldValueFlag = false;
// End of an Object: emit the collected values as one row and reset for the next Object.
346 if ("Object".equalsIgnoreCase(endLocalName)){
348 this.appendLine(nameAndValue,bos);
349 nameAndValue.clear();
// A failure while handling one event is logged and the reader is advanced to the next event.
353 } catch (Exception e)
355 log.error(""+StringUtil.getStackTrace(e));
356 event = reader.next();
// Zip the generated CSV and column-description XML (the locals declared above).
370 String[] fileKeys = this.createZipFile(csvpathAndFileName,xmlPathAndFileName,nename);
// Upload-target settings are read from the configuration properties.
372 Properties ftpPro = configurationInterface.getProperties();
373 String ip = ftpPro.getProperty("ftp_ip");
374 String port = ftpPro.getProperty("ftp_port");
375 String ftp_user = ftpPro.getProperty("ftp_user");
376 String ftp_password = ftpPro.getProperty("ftp_password");
378 String ftp_passive = ftpPro.getProperty("ftp_passive");
379 String ftp_type = ftpPro.getProperty("ftp_type");
380 String remoteFile = ftpPro.getProperty("ftp_remote_path");
381 this.ftpStore(fileKeys,ip,port,ftp_user,ftp_password,ftp_passive,ftp_type,remoteFile);
// Build the notification for the uploaded archive (fileKeys[1]) and publish it.
383 String message = this.createMessage(fileKeys[1], ftp_user, ftp_password, ip, port, countNum,nename);
386 this.setMessage(message);
387 } catch (Exception e){
388 log.error(""+StringUtil.getStackTrace(e));
408 } catch (Exception e){
415 private void setMessage(String message) {
418 collectResultChannel.put(message);
419 } catch (Exception e) {
420 log.error("collectResultChannel.put(message) is error "+StringUtil.getStackTrace(e));
424 private String createMessage(String zipName,String user,String pwd,String ip, String port,int countNum, String nename) {
426 StringBuffer strBuffer = new StringBuffer();
428 .append("<?xml version=\"1.0\" encoding=\"UTF-8\"?>"
429 + "<FILE_DATA_READY_UL xmlns:xsi=\" http://www.w3.org/2001/XMLSchema-instance\">"
430 + "<Header SessionID=\"");
431 strBuffer.append("");
432 strBuffer.append("\" LicenceID=\"");
433 strBuffer.append("");
434 strBuffer.append("\" SystemID=\"");
435 strBuffer.append("");
436 strBuffer.append("\" Time=\"");
437 strBuffer.append( dateFormat2.format(new Date()));
438 strBuffer.append("\" PolicyID=\"");
439 strBuffer.append("");
440 strBuffer.append("\"/><Body>");
441 strBuffer.append("<DataCatalog>");
442 strBuffer.append("");
443 strBuffer.append("</DataCatalog><GroupID>");
444 strBuffer.append(nename);
445 strBuffer.append("</GroupID><DataSourceName>");
446 strBuffer.append("");
447 strBuffer.append("</DataSourceName><InstanceID>");
448 strBuffer.append("");
449 strBuffer.append("</InstanceID><FileFormat>");
450 strBuffer.append("csv");
451 strBuffer.append("</FileFormat><CharSet>");
452 strBuffer.append("gbk");
453 strBuffer.append("</CharSet><FieldSeparator>");
454 strBuffer.append("|");
455 strBuffer.append("</FieldSeparator><IsCompressed>");
456 strBuffer.append("true");
457 strBuffer.append("</IsCompressed><StartTime>");
458 strBuffer.append(dateFormat2.format(new Date()));
459 strBuffer.append("</StartTime><EndTime>");
460 strBuffer.append("");
461 strBuffer.append("</EndTime><FileList>");
462 strBuffer.append(zipName);
463 strBuffer.append("</FileList><ConnectionString>");
464 strBuffer.append("ftp://" + user + ":" + pwd + "@" + ip + ":" + port);
465 strBuffer.append("</ConnectionString>");
466 strBuffer.append("<DataCount>");
467 strBuffer.append(countNum);
468 strBuffer.append("</DataCount>");
470 strBuffer.append("<FileSize>").append("").append("</FileSize>");
471 strBuffer.append("<DataGranularity>").append("").append("</DataGranularity>");
474 strBuffer.append("</Body></FILE_DATA_READY_UL>");
475 return strBuffer.toString();
479 private void ftpStore(String[] fileKeys, String ip, String port, String ftp_user, String ftp_password,
480 String ftp_passive, String ftp_type, String remoteFile) {
481 String zipFilePath = fileKeys[0];
484 FTPInterface ftpClient;
485 if("ftp".equalsIgnoreCase(ftp_type)){
486 ftpClient = new FTPSrv();
488 ftpClient = new SFTPSrv();
493 ftpClient.login(ip, Integer.parseInt(port), ftp_user, ftp_password, "GBK", Boolean.parseBoolean(ftp_passive), 5*60*1000);
494 } catch (Exception e) {
495 log.error("login fail,ip=["+ip+"] port=["+port+"] user=["+ftp_user+"]pwd=["+ftp_password+"]"+StringUtil.getStackTrace(e));
498 ftpClient.store(zipFilePath, remoteFile);
499 log.debug("store ["+zipFilePath+"]to["+remoteFile+"]");
501 FileUtils.deleteQuietly(new File(zipFilePath));
/**
 * Copies the CSV and the column-description XML into a fresh directory under resultPath,
 * zips that directory and removes it.
 * NOTE(review): some lines (directory creation, the compress call, one catch body) are not
 * visible in this view.
 *
 * @param csvpathAndFileName path of the CSV file to include
 * @param xmlPathAndFileName path of the column-description XML to include
 * @param nename             NE name used as the prefix of the archive name
 * @return two-element array; both elements are the archive path (zipPath + ".zip")
 */
506 private String[] createZipFile(String csvpathAndFileName,String xmlPathAndFileName,String nename) {
// Archive base name: resultPath + NE name + timestamp + "_" + nanoTime.
508 String zipPath = resultPath+nename +dateFormat.format(new Date())+"_"+System.nanoTime();
510 File destDir = new File(zipPath);
// Stage both files in the directory that will be zipped.
514 FileUtils.copyFileToDirectory(new File(csvpathAndFileName), destDir);
515 FileUtils.copyFileToDirectory(new File(xmlPathAndFileName), destDir);
516 } catch (IOException e) {
520 String destFilePath = zipPath + ".zip";
// Level 9 is passed as the compression level.
522 Zip zip = new Zip(destDir.getAbsolutePath(), destFilePath);
523 zip.setCompressLevel(9);
// The staging directory is removed afterwards.
526 FileUtils.deleteDirectory(destDir);
527 } catch (IOException e) {
528 log.error("zip.compress() is fail "+StringUtil.getStackTrace(e));
530 return new String[] { destFilePath, zipPath + ".zip"};
/**
 * Writes an XML file describing the column layout (one Field entry per name, numbered
 * from 0) under localPath/nename/type/ and returns its path.
 * NOTE(review): parts of the string concatenation are not visible in this view.
 *
 * @param nename NE name used for the directory and the file name
 * @param names  column names in output order
 * @param type   data type label used as the sub-directory name
 * @return path of the XML file; returned even when writing it failed (the failure is logged)
 */
534 private String setColumnNames(String nename, List<String> names,String type) {
// Make sure the per-NE, per-type working directory exists.
536 String xmlpath = localPath+nename +"/"+type+"/";
537 File xmlpathfile = new File(xmlpath);
538 if(!xmlpathfile.exists()){
539 xmlpathfile.mkdirs();
541 String xmlFileName = nename +dateFormat.format(new Date())+ System.nanoTime();
542 String fieldLine = "";
// The column name is used for FieldName, FieldType and FieldNameOther alike.
// NOTE(review): names are inserted without XML escaping.
543 for (int i = 0; i < names.size(); i++) {
544 String field = "\t<Field>\r\n" + "\t\t<FieldNo>" + i
545 + "</FieldNo>\r\n" + "\t\t<FieldName>"
546 + names.get(i) + "</FieldName>\r\n"
547 + "\t\t<FieldType>" + names.get(i)
549 + "\t\t<FieldNameOther>" + names.get(i)
550 + "</FieldNameOther>\r\n" +
552 fieldLine = fieldLine + field;
// The document header declares gbk as its encoding.
555 String str = "<?xml version=\"1.0\" encoding=\"gbk\"?>\r\n"
556 + "<xml>\r\n" + "<FILE_STRUCTURE>\r\n" + fieldLine
557 + "</FILE_STRUCTURE>\r\n" + "</xml>\r\n";
558 String xmlPathAndFileName = xmlpath+xmlFileName+".xml";
560 this.writeDetail(xmlPathAndFileName,str);
561 } catch (Exception e) {
562 log.error("writeDetail is fail ,xmlFileName="+xmlFileName +StringUtil.getStackTrace(e));
565 return xmlPathAndFileName;
/**
 * Writes the given text to the named file, replacing any existing content (append=false).
 * NOTE(review): the write and close statements are not visible in this view.
 *
 * @param detailFileName path of the file to write
 * @param str            text to write
 * @throws Exception declared by the signature; the throwing statements are not visible here
 */
568 private void writeDetail(String detailFileName,String str) throws Exception {
569 OutputStreamWriter writer = null;
570 OutputStream readOut = null;
572 readOut = new FileOutputStream(new File(detailFileName), false);
// NOTE(review): no charset is passed, so the platform default encoding is used; the XML
// built by setColumnNames declares gbk - confirm the two agree.
573 writer = new OutputStreamWriter(readOut);
590 private void appendLine(LinkedHashMap<String, String> nameAndValue,BufferedOutputStream bos) {
591 StringBuilder lineDatas = new StringBuilder();
593 for (String key : nameAndValue.keySet()) {
594 lineDatas.append(nameAndValue.get(key)).append("|");
597 bos.write(lineDatas.toString().getBytes());
598 bos.write("\n".getBytes());
599 } catch (IOException e) {
600 log.error("appendLine error "+StringUtil.getStackTrace(e));
604 private void appendLine(List<String> values,BufferedOutputStream bos) {
605 StringBuilder lineDatas = new StringBuilder();
607 for (String value : values) {
608 lineDatas.append(value).append("|");
611 bos.write(lineDatas.toString().getBytes());
612 bos.write("\n".getBytes());
613 } catch (IOException e) {
614 log.error("appendLine error "+StringUtil.getStackTrace(e));
618 public List<File> decompressed(String fileName){
619 List<File> filelist = new ArrayList<File>();
621 if (fileName.indexOf(".gz") > 1)
624 File decompressFile = deGz(fileName);
625 filelist.add(decompressFile);
626 } catch (IOException e) {
627 log.error("decompressed is fail "+StringUtil.getStackTrace(e));
629 } else if (fileName.indexOf(".zip") > 1)
632 File[] files = deZip(new File(fileName));
633 for(File temp :files){
636 } catch (Exception e) {
637 log.error("decompressed is fail "+StringUtil.getStackTrace(e));
641 filelist.add(new File(fileName));
647 private File deGz(String gzFileName) throws IOException {
648 Gunzip gunzip = new Gunzip();
649 String orgFile = gzFileName.replace(".gz", "");
650 gunzip.unCompress(gzFileName, orgFile);
651 return new File(orgFile);
/**
 * Unpacks a zip file into localPath/&lt;name without .zip&gt;/ and returns the entries of
 * that directory.
 * NOTE(review): the match guard and the call that performs the extraction are not visible
 * in this view.
 *
 * @param file zip file to unpack
 * @return result of File.listFiles() on the target; may be null if it is not a directory
 * @throws Exception declared by the signature; the throwing statements are not visible here
 */
654 public File[] deZip(File file) throws Exception{
// NOTE(review): the '.' before "zip" is unescaped, so it matches any character.
656 String regx = "(.*).zip";
657 Pattern p = Pattern.compile(regx);
658 Matcher m = p.matcher(file.getName());
// Target directory is named after capture group 1 of the pattern above.
661 String orgFile = localPath + m.group(1) + "/";
662 UnZip unzip = new UnZip(file.getAbsolutePath(), orgFile);
// From here on 'file' refers to the extraction directory.
664 file = new File(orgFile);
666 File[] files = file.listFiles();
672 private List<String> ftpDownload(CollectVo collectVo) {
674 List<String> fileList = new ArrayList<String>();
676 String ip = collectVo.getIP();
678 String port = collectVo.getPort();
680 String user = collectVo.getUser();
682 String password = collectVo.getPassword();
684 String passivemode = collectVo.getPassive();
686 String ftpType = collectVo.getFtptype();
687 FTPInterface ftpClient = null;
688 if("ftp".equalsIgnoreCase(ftpType)){
689 ftpClient = new FTPSrv();
691 ftpClient = new SFTPSrv();
696 ftpClient.login(ip, Integer.parseInt(port), user, password, "GBK", Boolean.parseBoolean(passivemode), 5*60*1000);
697 } catch (Exception e) {
698 log.error("login fail,ip=["+ip+"] port=["+port+"] user=["+user+"]password=["+password+"]"+StringUtil.getStackTrace(e));
703 String dir = collectVo.getRemotepath();
704 boolean cdsucess = ftpClient.chdir(dir);
706 AFtpRemoteFile[] remoteFiles = (AFtpRemoteFile[]) ftpClient.list();
708 for(AFtpRemoteFile ftpRemoteFile: remoteFiles){
709 if(!new File(localPath).exists()){
711 new File(localPath).mkdir();
712 } catch (Exception e) {
713 log.error("create localPath is fail localPath="+localPath+" "+StringUtil.getStackTrace(e));
717 if(!new File(localPath).exists()){
718 new File(localPath).mkdirs();
721 String localFileName = localPath + ftpRemoteFile.getFileName();
722 File loaclFile = new File(localFileName);
723 if (loaclFile.exists()) {
727 boolean flag = ftpClient.downloadFile(ftpRemoteFile.getAbsFileName(), localFileName);
730 fileList.add(localFileName);
732 log.error("download file fail fileName="+ftpRemoteFile.getAbsFileName());
737 log.error("chdir is faill dir =["+dir+"]");