/**
 * Copyright 2017 BOCO Corporation. CMCC Technologies Co., Ltd
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.onap.nfvo.emsdriver.collector;

import java.io.BufferedOutputStream;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import javax.xml.stream.XMLInputFactory;
import javax.xml.stream.XMLStreamConstants;
import javax.xml.stream.XMLStreamReader;

import org.apache.commons.io.FileUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.onap.nfvo.emsdriver.commons.constant.Constant;
import org.onap.nfvo.emsdriver.commons.ftp.AFtpRemoteFile;
import org.onap.nfvo.emsdriver.commons.ftp.FTPInterface;
import org.onap.nfvo.emsdriver.commons.ftp.FTPSrv;
import org.onap.nfvo.emsdriver.commons.ftp.SFTPSrv;
import org.onap.nfvo.emsdriver.commons.model.CollectMsg;
import org.onap.nfvo.emsdriver.commons.model.CollectVo;
import org.onap.nfvo.emsdriver.commons.utils.StringUtil;
import org.onap.nfvo.emsdriver.commons.utils.UnZip;
import org.onap.nfvo.emsdriver.commons.utils.Zip;
import org.onap.nfvo.emsdriver.configmgr.ConfigurationImp;
import org.onap.nfvo.emsdriver.configmgr.ConfigurationInterface;
import org.onap.nfvo.emsdriver.messagemgr.MessageChannel;
import org.onap.nfvo.emsdriver.messagemgr.MessageChannelFactory;
60 public class TaskThread implements Runnable{
62 public Log log = LogFactory.getLog(TaskThread.class);
64 private MessageChannel collectResultChannel;
66 private CollectMsg data;
68 private ConfigurationInterface configurationInterface = new ConfigurationImp();
70 private String localPath = Constant.SYS_DATA_TEMP;
71 private String resultPath = Constant.SYS_DATA_RESULT;
73 private SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMMddHHmmss");
75 private SimpleDateFormat dateFormat2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
77 public TaskThread(CollectMsg data) {
84 collectResultChannel = MessageChannelFactory.getMessageChannel(Constant.COLLECT_RESULT_CHANNEL_KEY);
87 collectMsgHandle(data);
88 } catch (Exception e) {
93 private void collectMsgHandle(CollectMsg collectMsg) {
94 String emsName = collectMsg.getEmsName();
95 String type = collectMsg.getType();
96 CollectVo collectVo = configurationInterface.getCollectVoByEmsNameAndType(emsName, type);
99 List<String> downloadfiles = this.ftpDownload(collectVo);
100 //paser ftp update message send
101 for(String fileName :downloadfiles){
102 this.parseFtpAndSendMessage(fileName,collectVo);
106 private void parseFtpAndSendMessage(String fileName, CollectVo collectVo) {
108 List<File> filelist = decompressed(fileName);
110 for (File tempfile : filelist) {
112 String unfileName = tempfile.getName();
114 Pattern pa = Pattern.compile(".*-(.*)-\\w{2}-");
115 Matcher ma = pa.matcher(unfileName);
118 String nename = ma.group(1);
119 boolean parseResult = false;
120 if("CM".equalsIgnoreCase(collectVo.getType())){
121 parseResult = processCMXml(tempfile, nename,"CM");
123 parseResult = processPMCsv(tempfile, nename,"PM");
127 log.info("parser "+tempfile+" sucess");
129 log.info("parser "+tempfile+" fail");
135 private boolean processPMCsv(File tempfile, String nename,String type) {
137 String csvpath = localPath+nename+"/"+type+"/";
138 File csvpathfile = new File(csvpath);
139 if(!csvpathfile.exists()){
140 csvpathfile.mkdirs();
142 String csvFileName = nename +dateFormat.format(new Date())+ System.nanoTime();
143 String csvpathAndFileName = csvpath+csvFileName;
144 BufferedOutputStream bos = null;
145 FileOutputStream fos = null;
147 fos = new FileOutputStream(csvpathAndFileName,false);
148 bos = new BufferedOutputStream(fos, 10240);
149 } catch (FileNotFoundException e1) {
150 log.error("FileNotFoundException "+StringUtil.getStackTrace(e1));
153 FileInputStream brs = null;
154 InputStreamReader isr = null;
155 BufferedReader br = null;
157 List<String> columnNames = new ArrayList<String>();
158 List<String> commonValues = new ArrayList<String>();
161 brs = new FileInputStream(tempfile);
162 isr = new InputStreamReader(brs, Constant.ENCODING_UTF8);
163 br = new BufferedReader(isr);
165 String commonField = br.readLine();
166 String[] fields = commonField.split("|",-1);
167 for(String com : fields){
168 String[] comNameAndValue = com.split("=",2);
169 columnNames.add(comNameAndValue[0].trim());
170 commonValues.add(comNameAndValue[1]);
173 String columnName = br.readLine();
174 String[] names = columnName.split("|",-1);
175 for(String name : names){
176 columnNames.add(name);
179 String xmlPathAndFileName = this.setColumnNames(nename, columnNames,type);
181 String valueLine = "";
182 List<String> valuelist = new ArrayList<String>();
184 while (br.readLine() != null) {
186 if (valueLine.trim().equals("")) {
190 String [] values = valueLine.split("|",-1);
192 valuelist.addAll(commonValues);
193 for(String value : values){
194 valuelist.add(value);
196 this.appendLine(valuelist, bos);
210 String[] fileKeys = this.createZipFile(csvpathAndFileName,xmlPathAndFileName,nename);
212 Properties ftpPro = configurationInterface.getProperties();
213 String ip = ftpPro.getProperty("ftp_ip");
214 String port = ftpPro.getProperty("ftp_port");
215 String ftp_user = ftpPro.getProperty("ftp_user");
216 String ftp_password = ftpPro.getProperty("ftp_password");
218 String ftp_passive = ftpPro.getProperty("ftp_passive");
219 String ftp_type = ftpPro.getProperty("ftp_type");
220 String remoteFile = ftpPro.getProperty("ftp_remote_path");
221 this.ftpStore(fileKeys,ip,port,ftp_user,ftp_password,ftp_passive,ftp_type,remoteFile);
223 String message = this.createMessage(fileKeys[1], ftp_user, ftp_password, ip, port, countNum,nename);
226 this.setMessage(message);
227 } catch (IOException e) {
228 log.error("processPMCsv is fail ",e);
245 } catch (Exception e){
253 private boolean processCMXml(File tempfile, String nename, String type) {
255 String csvpath = localPath+nename+"/"+type+"/";
256 File csvpathfile = new File(csvpath);
257 if(!csvpathfile.exists()){
258 csvpathfile.mkdirs();
260 String csvFileName = nename +dateFormat.format(new Date())+ System.nanoTime();
261 String csvpathAndFileName = csvpath+csvFileName+".csv";
262 BufferedOutputStream bos = null;
263 FileOutputStream fos = null;
265 fos = new FileOutputStream(csvpathAndFileName,false);
266 bos = new BufferedOutputStream(fos, 10240);
267 } catch (FileNotFoundException e1) {
268 log.error("FileNotFoundException "+StringUtil.getStackTrace(e1));
271 boolean FieldNameFlag = false;
272 boolean FieldValueFlag = false;
275 String xmlPathAndFileName = null;
276 String localName = null;
277 String endLocalName = null;
280 ArrayList<String> names = new ArrayList<String>();// colname
281 LinkedHashMap<String, String> nameAndValue = new LinkedHashMap<String, String>();
284 FileInputStream fis = null;
285 InputStreamReader isr = null;
286 XMLStreamReader reader = null;
288 fis = new FileInputStream(tempfile);
289 isr = new InputStreamReader(fis, Constant.ENCODING_UTF8);
290 XMLInputFactory fac = XMLInputFactory.newInstance();
291 reader = fac.createXMLStreamReader(isr);
293 boolean setcolum = true;
294 while (reader.hasNext()){
296 event = reader.next();
298 case XMLStreamConstants.START_ELEMENT:
299 localName = reader.getLocalName();
300 if ("FieldName".equalsIgnoreCase(localName)){
301 FieldNameFlag = true;
304 if ("N".equalsIgnoreCase(localName)){
305 String colName = reader.getElementText().trim();
309 if ("FieldValue".equalsIgnoreCase(localName)){
310 FieldValueFlag = true;
315 xmlPathAndFileName = this.setColumnNames(nename, names,type);
319 if ("Object".equalsIgnoreCase(localName)){
320 int ac = reader.getAttributeCount();
321 for (int i = 0; i < ac; i++){
322 if ("rmUID".equalsIgnoreCase(reader.getAttributeLocalName(i))){
323 rmUID = reader.getAttributeValue(i).trim();
326 nameAndValue.put("rmUID", rmUID);
328 if ("V".equalsIgnoreCase(localName)) {
329 index = Integer.parseInt(reader
330 .getAttributeValue(0)) - 1;
331 String currentName = names.get(index);
332 String v = reader.getElementText().trim();
333 nameAndValue.put(currentName, v);
337 case XMLStreamConstants.CHARACTERS:
339 case XMLStreamConstants.END_ELEMENT:
340 endLocalName = reader.getLocalName();
342 if ("FieldName".equalsIgnoreCase(endLocalName)){
343 FieldNameFlag = false;
345 if ("FieldValue".equalsIgnoreCase(endLocalName)){
346 FieldValueFlag = false;
348 if ("Object".equalsIgnoreCase(endLocalName)){
350 this.appendLine(nameAndValue,bos);
351 nameAndValue.clear();
355 } catch (Exception e)
357 log.error(""+StringUtil.getStackTrace(e));
358 event = reader.next();
372 String[] fileKeys = this.createZipFile(csvpathAndFileName,xmlPathAndFileName,nename);
374 Properties ftpPro = configurationInterface.getProperties();
375 String ip = ftpPro.getProperty("ftp_ip");
376 String port = ftpPro.getProperty("ftp_port");
377 String ftp_user = ftpPro.getProperty("ftp_user");
378 String ftp_password = ftpPro.getProperty("ftp_password");
380 String ftp_passive = ftpPro.getProperty("ftp_passive");
381 String ftp_type = ftpPro.getProperty("ftp_type");
382 String remoteFile = ftpPro.getProperty("ftp_remote_path");
383 this.ftpStore(fileKeys,ip,port,ftp_user,ftp_password,ftp_passive,ftp_type,remoteFile);
385 String message = this.createMessage(fileKeys[1], ftp_user, ftp_password, ip, port, countNum,nename);
388 this.setMessage(message);
389 } catch (Exception e){
390 log.error(""+StringUtil.getStackTrace(e));
410 } catch (Exception e){
417 private void setMessage(String message) {
420 collectResultChannel.put(message);
421 } catch (Exception e) {
422 log.error("collectResultChannel.put(message) is error "+StringUtil.getStackTrace(e));
426 private String createMessage(String zipName,String user,String pwd,String ip, String port,int countNum, String nename) {
428 StringBuffer strBuffer = new StringBuffer();
430 .append("<?xml version=\"1.0\" encoding=\"UTF-8\"?>"
431 + "<FILE_DATA_READY_UL xmlns:xsi=\" http://www.w3.org/2001/XMLSchema-instance\">"
432 + "<Header SessionID=\"");
433 strBuffer.append("");
434 strBuffer.append("\" LicenceID=\"");
435 strBuffer.append("");
436 strBuffer.append("\" SystemID=\"");
437 strBuffer.append("");
438 strBuffer.append("\" Time=\"");
439 strBuffer.append( dateFormat2.format(new Date()));
440 strBuffer.append("\" PolicyID=\"");
441 strBuffer.append("");
442 strBuffer.append("\"/><Body>");
443 strBuffer.append("<DataCatalog>");
444 strBuffer.append("");
445 strBuffer.append("</DataCatalog><GroupID>");
446 strBuffer.append(nename);
447 strBuffer.append("</GroupID><DataSourceName>");
448 strBuffer.append("");
449 strBuffer.append("</DataSourceName><InstanceID>");
450 strBuffer.append("");
451 strBuffer.append("</InstanceID><FileFormat>");
452 strBuffer.append("csv");
453 strBuffer.append("</FileFormat><CharSet>");
454 strBuffer.append("gbk");
455 strBuffer.append("</CharSet><FieldSeparator>");
456 strBuffer.append("|");
457 strBuffer.append("</FieldSeparator><IsCompressed>");
458 strBuffer.append("true");
459 strBuffer.append("</IsCompressed><StartTime>");
460 strBuffer.append(dateFormat2.format(new Date()));
461 strBuffer.append("</StartTime><EndTime>");
462 strBuffer.append("");
463 strBuffer.append("</EndTime><FileList>");
464 strBuffer.append(zipName);
465 strBuffer.append("</FileList><ConnectionString>");
466 strBuffer.append("ftp://" + user + ":" + pwd + "@" + ip + ":" + port);
467 strBuffer.append("</ConnectionString>");
468 strBuffer.append("<DataCount>");
469 strBuffer.append(countNum);
470 strBuffer.append("</DataCount>");
472 strBuffer.append("<FileSize>").append("").append("</FileSize>");
473 strBuffer.append("<DataGranularity>").append("").append("</DataGranularity>");
476 strBuffer.append("</Body></FILE_DATA_READY_UL>");
477 return strBuffer.toString();
481 private void ftpStore(String[] fileKeys, String ip, String port, String ftp_user, String ftp_password,
482 String ftp_passive, String ftp_type, String remoteFile) {
483 String zipFilePath = fileKeys[0];
486 FTPInterface ftpClient;
487 if("ftp".equalsIgnoreCase(ftp_type)){
488 ftpClient = new FTPSrv();
490 ftpClient = new SFTPSrv();
495 ftpClient.login(ip, Integer.parseInt(port), ftp_user, ftp_password, "GBK", Boolean.parseBoolean(ftp_passive), 5*60*1000);
496 } catch (Exception e) {
497 log.error("login fail,ip=["+ip+"] port=["+port+"] user=["+ftp_user+"]pwd=["+ftp_password+"]"+StringUtil.getStackTrace(e));
500 ftpClient.store(zipFilePath, remoteFile);
501 log.debug("store ["+zipFilePath+"]to["+remoteFile+"]");
503 FileUtils.deleteQuietly(new File(zipFilePath));
508 private String[] createZipFile(String csvpathAndFileName,String xmlPathAndFileName,String nename) {
510 String zipPath = resultPath+nename +dateFormat.format(new Date())+"_"+System.nanoTime();
512 File destDir = new File(zipPath);
516 FileUtils.copyFileToDirectory(new File(csvpathAndFileName), destDir);
517 FileUtils.copyFileToDirectory(new File(xmlPathAndFileName), destDir);
518 } catch (IOException e) {
522 String destFilePath = zipPath + ".zip";
524 Zip zip = new Zip(destDir.getAbsolutePath(), destFilePath);
525 zip.setCompressLevel(9);
528 FileUtils.deleteDirectory(destDir);
529 } catch (IOException e) {
530 log.error("zip.compress() is fail "+StringUtil.getStackTrace(e));
532 return new String[] { destFilePath, zipPath + ".zip"};
536 private String setColumnNames(String nename, List<String> names,String type) {
538 String xmlpath = localPath+nename +"/"+type+"/";
539 File xmlpathfile = new File(xmlpath);
540 if(!xmlpathfile.exists()){
541 xmlpathfile.mkdirs();
543 String xmlFileName = nename +dateFormat.format(new Date())+ System.nanoTime();
544 String fieldLine = "";
545 for (int i = 0; i < names.size(); i++) {
546 String field = "\t<Field>\r\n" + "\t\t<FieldNo>" + i
547 + "</FieldNo>\r\n" + "\t\t<FieldName>"
548 + names.get(i) + "</FieldName>\r\n"
549 + "\t\t<FieldType>" + names.get(i)
551 + "\t\t<FieldNameOther>" + names.get(i)
552 + "</FieldNameOther>\r\n" +
554 fieldLine = fieldLine + field;
557 String str = "<?xml version=\"1.0\" encoding=\"gbk\"?>\r\n"
558 + "<xml>\r\n" + "<FILE_STRUCTURE>\r\n" + fieldLine
559 + "</FILE_STRUCTURE>\r\n" + "</xml>\r\n";
560 String xmlPathAndFileName = xmlpath+xmlFileName+".xml";
562 this.writeDetail(xmlPathAndFileName,str);
563 } catch (Exception e) {
564 log.error("writeDetail is fail ,xmlFileName="+xmlFileName +StringUtil.getStackTrace(e));
567 return xmlPathAndFileName;
570 private void writeDetail(String detailFileName,String str) throws Exception {
571 OutputStreamWriter writer = null;
572 OutputStream readOut = null;
574 readOut = new FileOutputStream(new File(detailFileName), false);
575 writer = new OutputStreamWriter(readOut);
592 private void appendLine(LinkedHashMap<String, String> nameAndValue,BufferedOutputStream bos) {
593 StringBuilder lineDatas = new StringBuilder();
595 for (String key : nameAndValue.keySet()) {
596 lineDatas.append(nameAndValue.get(key)).append("|");
599 bos.write(lineDatas.toString().getBytes());
600 bos.write("\n".getBytes());
601 } catch (IOException e) {
602 log.error("appendLine error "+StringUtil.getStackTrace(e));
606 private void appendLine(List<String> values,BufferedOutputStream bos) {
607 StringBuilder lineDatas = new StringBuilder();
609 for (String value : values) {
610 lineDatas.append(value).append("|");
613 bos.write(lineDatas.toString().getBytes());
614 bos.write("\n".getBytes());
615 } catch (IOException e) {
616 log.error("appendLine error "+StringUtil.getStackTrace(e));
620 public List<File> decompressed(String fileName){
621 List<File> filelist = new ArrayList<File>();
623 if (fileName.indexOf(".gz") > 1)
625 // decompressFile = deGz(file);
626 } else if (fileName.indexOf(".zip") > 1)
629 File[] files = deZip(new File(fileName));
630 for(File temp :files){
633 } catch (Exception e) {
634 log.error("decompressed is fail "+StringUtil.getStackTrace(e));
638 filelist.add(new File(fileName));
644 public File[] deZip(File file) throws Exception{
646 String regx = "(.*).zip";
647 Pattern p = Pattern.compile(regx);
648 Matcher m = p.matcher(file.getName());
651 String orgFile = localPath + m.group(1) + "/";
652 UnZip unzip = new UnZip(file.getAbsolutePath(), orgFile);
654 file = new File(orgFile);
656 File[] files = file.listFiles();
662 private List<String> ftpDownload(CollectVo collectVo) {
664 List<String> fileList = new ArrayList<String>();
666 String ip = collectVo.getIP();
668 String port = collectVo.getPort();
670 String user = collectVo.getUser();
672 String password = collectVo.getPassword();
674 String passivemode = collectVo.getPassive();
676 String ftpType = collectVo.getFtptype();
677 FTPInterface ftpClient = null;
678 if("ftp".equalsIgnoreCase(ftpType)){
679 ftpClient = new FTPSrv();
681 ftpClient = new SFTPSrv();
686 ftpClient.login(ip, Integer.parseInt(port), user, password, "GBK", Boolean.parseBoolean(passivemode), 5*60*1000);
687 } catch (Exception e) {
688 log.error("login fail,ip=["+ip+"] port=["+port+"] user=["+user+"]password=["+password+"]"+StringUtil.getStackTrace(e));
693 String dir = collectVo.getRemotepath();
694 boolean cdsucess = ftpClient.chdir(dir);
696 AFtpRemoteFile[] remoteFiles = (AFtpRemoteFile[]) ftpClient.list();
698 for(AFtpRemoteFile ftpRemoteFile: remoteFiles){
699 if(!new File(localPath).exists()){
701 new File(localPath).mkdir();
702 } catch (Exception e) {
703 log.error("create localPath is fail localPath="+localPath+" "+StringUtil.getStackTrace(e));
707 if(!new File(localPath).exists()){
708 new File(localPath).mkdirs();
711 String localFileName = localPath + ftpRemoteFile.getFileName();
712 File loaclFile = new File(localFileName);
713 if (loaclFile.exists()) {
717 boolean flag = ftpClient.downloadFile(ftpRemoteFile.getAbsFileName(), localFileName);
720 fileList.add(localFileName);
722 log.error("download file fail fileName="+ftpRemoteFile.getAbsFileName());
727 log.error("chdir is faill dir =["+dir+"]");