Modify emsdriver Code
[vfc/nfvo/driver/ems.git] / ems / boco / src / main / java / org / onap / vfc / nfvo / emsdriver / collector / TaskThread.java
index fdbf3ba..b4cec85 100644 (file)
@@ -47,6 +47,7 @@ import org.onap.vfc.nfvo.emsdriver.commons.constant.Constant;
 import org.onap.vfc.nfvo.emsdriver.commons.ftp.AFtpRemoteFile;
 import org.onap.vfc.nfvo.emsdriver.commons.ftp.FTPInterface;
 import org.onap.vfc.nfvo.emsdriver.commons.ftp.FTPSrv;
+import org.onap.vfc.nfvo.emsdriver.commons.ftp.RemoteFile;
 import org.onap.vfc.nfvo.emsdriver.commons.model.CollectMsg;
 import org.onap.vfc.nfvo.emsdriver.commons.model.CollectVo;
 import org.onap.vfc.nfvo.emsdriver.commons.utils.DateUtil;
@@ -94,7 +95,7 @@ public class TaskThread implements Runnable{
        public void run(){
                
                cmResultChannel = MessageChannelFactory.getMessageChannel(Constant.COLLECT_RESULT_CHANNEL_KEY);
-               pmResultChannel = MessageChannelFactory.getMessageChannel(Constant.COLLECT_CHANNEL_KEY);
+               pmResultChannel = MessageChannelFactory.getMessageChannel(Constant.COLLECT_RESULT_PM_CHANNEL_KEY);
                try {
                        collectMsgHandle(data);
                } catch (Exception e) {
@@ -129,13 +130,13 @@ public class TaskThread implements Runnable{
                          continue;
                        String nename = ma.group(1);
                        boolean parseResult = false;
-                       if("ems-resource".equalsIgnoreCase(collectVo.getType())){
+                       if(Constant.COLLECT_TYPE_CM.equalsIgnoreCase(collectVo.getType())){
                                parseResult = processCMXml(tempfile, nename,"CM");
                        }else{
                                if(unfileName.indexOf(".csv") > 0){
                                        parseResult = processPMCsv(tempfile);
                                }else{
-                                       parseResult = processPMCsv(tempfile);
+                                       parseResult = processPMXml(tempfile);
                                }
                        }
                        
@@ -148,6 +149,214 @@ public class TaskThread implements Runnable{
                }
        }
        
+       public boolean processPMXml(File file) {
+
+               FileInputStream fis = null;
+               InputStreamReader isr = null;
+               XMLStreamReader reader = null;
+               try {
+                        fis = new FileInputStream(file);
+                        isr = new InputStreamReader(fis, Constant.ENCODING_UTF8);
+
+                       XMLInputFactory fac = XMLInputFactory.newInstance();
+                        reader = fac.createXMLStreamReader(isr);
+                        
+                       boolean fileHeaderStart = false;
+                       boolean measurementStart = false;
+                       boolean pmNameFlag = false;
+                       boolean pmDataFlag = false;
+                       boolean objectFlag = true;
+
+                       int index = -1;
+                       int nameIndex = -1;
+                       String currentMea = null;
+                       String subName = null;
+                       String localName = null;
+                       String endLocalName = null;
+                       String objectType = null;
+
+                       
+                       LinkedHashMap<String, String> commonNameAndValue = new LinkedHashMap<String, String>();
+                       LinkedHashMap<String, String> pmDatas = null;
+                       LinkedHashMap<Integer, String> pmNames = null;
+
+
+                       int event = -1;
+                       while (reader.hasNext()) {
+                               try{
+                                       event = reader.next();
+       
+                                       switch (event) {
+                                       case XMLStreamConstants.START_ELEMENT:
+                                               localName = reader.getLocalName();
+                                               if ("FileHeader".equalsIgnoreCase(localName)) {
+                                                       fileHeaderStart = true;
+                                               }
+                                               if(fileHeaderStart){
+                                                       if(!"FileHeader".equalsIgnoreCase(localName)){
+                                                               commonNameAndValue.put(localName,reader.getElementText().trim());
+                                                       }
+                                                       
+                                               }
+                                               if ("Measurements".equalsIgnoreCase(localName)) {
+                                                       // a new Measurement starts
+                                                       measurementStart = true; 
+                                               }
+                                               if (measurementStart) {
+                                                       // measurement handler
+                                                       if ("ObjectType".equalsIgnoreCase(localName)) {
+                                                               objectType = reader.getElementText().trim();
+                                                               commonNameAndValue.put("ObjectType",objectType);
+                                                       }
+                                                       if ("PmName".equalsIgnoreCase(localName)) {
+                                                               pmNameFlag = true;
+                                                               pmNames = new LinkedHashMap<Integer, String>();
+       
+                                                       }
+                                                       if (pmNameFlag) {
+                                                               // pmname handler, add columnNames
+                                                               if ("N".equalsIgnoreCase(localName)) {
+                                                                       nameIndex = Integer.parseInt(getXMLAttribute(reader, "i"));
+                                                                       String text = reader.getElementText().trim();
+                                                                       pmNames.put(nameIndex, text);
+                                                               }
+                                                       }
+                                                       if ("PmData".equalsIgnoreCase(localName)) {
+                                                               pmDataFlag = true;
+                                                               pmDatas = new LinkedHashMap<String, String>();
+                                                       }
+                                                       
+                                                       if (pmDataFlag) {
+                                                               // pmdata handler
+                                                               if ("Object".equalsIgnoreCase(localName)) {
+                                                                       objectFlag = true;
+                                                                       int n = reader.getAttributeCount();
+                                                                       for(int i = 0; i < n; i++) {
+                                                                               String name = reader.getAttributeLocalName(i);
+                                                                               commonNameAndValue.put(name, reader.getAttributeValue(i));
+                                                                       }
+                                                               }
+                                                               if (objectFlag) {
+       
+                                                                       // add columnValues
+                                                                       if ("V".equalsIgnoreCase(localName)) {
+                                                                               String indexStr = getXMLAttribute(reader, "i");
+                                                                               if(indexStr == null) {
+                                                                                       log.error("ERROR: illegal value index");
+                                                                                       continue;
+                                                                               }
+                                                                               index = Integer.parseInt(indexStr);
+                                                                               String name = pmNames.get(index);
+                                                                               if(name == null){
+                                                                                       log.error("illegal data: valueIndex="+index);
+                                                                                       continue;
+                                                                               }
+                                                                               
+                                                                               String value = reader.getElementText().trim();
+                                                                               pmDatas.put(name, value);
+                                                                       }
+                                                                       if ("CV".equalsIgnoreCase(localName)) {
+                                                                       
+                                                                               String indexStr = getXMLAttribute(reader, "i");
+                                                                               if(indexStr == null) {
+                                                                                       log.error("ERROR: illegal value index");
+                                                                                       continue;
+                                                                               }
+                                                                               index = Integer.parseInt(indexStr);
+                                                                               
+                                                                               currentMea = pmNames.get(index);
+                                                                               if(currentMea == null){
+                                                                                       log.error("illegal data: valueIndex="+index);
+                                                                                       continue;
+                                                                               }
+                                                                       }
+       
+                                                                       if ("SN".equalsIgnoreCase(localName)) {
+                                                                               subName = reader.getElementText().trim();
+                                                                               
+                                                                       }
+                                                                       if ("SV".equalsIgnoreCase(localName)) {
+                                                                               String subValue = reader.getElementText().trim();
+//                                                                             pmDatas.put(currentMea+subName, subValue);
+                                                                               pmDatas.put(subName, subValue);
+                                                                       }
+                                                               }
+                                                       }
+       
+                                               }       
+       
+                                               break;
+                                       case XMLStreamConstants.CHARACTERS:
+                                               // ...
+                                               break;
+                                       case XMLStreamConstants.END_ELEMENT:
+                                               // ...
+                                               endLocalName = reader.getLocalName();
+                                               if("Object".equalsIgnoreCase(endLocalName)){
+                                                       objectFlag = false;
+                                                       pmDatas.putAll(commonNameAndValue);
+                                                       try {
+                                                               pmResultChannel.put(pmDatas);
+                                                               
+                                                       } catch (InterruptedException e) {
+                                                               log.error("collectResultChannel.put(resultMap) error ",e);
+                                                       }
+//                                                     System.out.println(pmDatas);
+//                                                     pmDatas.clear();
+                               }
+                                               if (endLocalName.equalsIgnoreCase("PmData")) {
+                                                       pmDataFlag = false;
+                                               }
+       
+                                               if (endLocalName.equalsIgnoreCase("PmName")) {
+                                                       pmNameFlag = false;
+                                               }
+                                               if (endLocalName.equalsIgnoreCase("Measurements")) {
+                                                       // a measurement over
+                                                       measurementStart = false;
+                                               }
+                                               
+                                               if("FileHeader".equalsIgnoreCase(endLocalName)){
+                                   fileHeaderStart = false;
+                               }
+                                               break;
+                                       }
+                               }catch (Exception e){
+                                       log.error("",e);
+                                       event = reader.next();
+                               }
+                       }
+                       
+               } catch (Exception e) {
+                       log.error("processPMXml is Exception ",e);
+                       return false;
+               } finally{
+                       try{
+                               if(reader != null)reader.close();
+                               if(isr!= null)isr.close();
+                               if(fis != null)fis.close();
+                       }catch (Exception e) {
+                               e.printStackTrace();
+                       }
+               }
+               return true;
+       }
+               
+       private String getXMLAttribute(XMLStreamReader reader, String obj) {
+               String res = null;
+               if(obj == null || reader == null){
+                       return res;
+               }
+               int n = reader.getAttributeCount();
+               for(int i = 0; i < n; i++) {
+                       String name = reader.getAttributeLocalName(i);
+                       if(obj.equalsIgnoreCase(name)){
+                               res = reader.getAttributeValue(i);
+                       }
+               }
+               return res;
+       }
+       
        public boolean processPMCsv(File tempfile) {
                
                FileInputStream brs = null;
@@ -175,9 +384,7 @@ public class TaskThread implements Runnable{
                        for(String name : names){
                                columnNames.add(name);
                        }
-                       
-//                     xmlPathAndFileName = this.setColumnNames(nename, columnNames,type);
-                       
+
                        String valueLine = "";
                        List<String> valuelist = new ArrayList<String>();
                        
@@ -668,6 +875,7 @@ public class TaskThread implements Runnable{
                
                //login
                try {
+                       log.info("ftp login ,ip=["+ip+"] port=["+port+"] user=["+user+"]password=["+password+"]");
                        ftpClient.login(ip, Integer.parseInt(port), user, password, "GBK", Boolean.parseBoolean(passivemode), 5*60*1000);
                } catch (Exception e) {
                        log.error("login fail,ip=["+ip+"] port=["+port+"] user=["+user+"]password=["+password+"]"+StringUtil.getStackTrace(e));
@@ -680,27 +888,42 @@ public class TaskThread implements Runnable{
                String []FPath = dir.split(";");
                for(int i=0;i<FPath.length;i++){
                        int oldSize = searchExprList.size();
-                       String conpath = FPath[i];
+                       String conpath = FPath[i] + collectVo.getMatch();
                        Hashtable<String,String> varMap = new Hashtable<String,String>();
-                       long[] d = DateUtil.getScanScope(new Date(), 900);
+                       long collectPeriod = 900;
+                       try {
+                               collectPeriod = Long.parseLong(collectVo.getGranularity())*60;
+                               log.info("collectPeriod =["+collectPeriod+"]");
+                       } catch (NumberFormatException e) {
+                               e.printStackTrace();
+                       }
+                       long[] d = DateUtil.getScanScope(new Date(), collectPeriod);
                        searchExprList.add(VarExprParser.replaceVar(conpath,d[0],d[1]));
                        
                        varMap.clear();
                        varMap = null;
-                       log.info("["+conpath+"]result["+(searchExprList.size()-oldSize)+"] path");
+                       log.info("["+conpath+"] result["+(searchExprList.size()-oldSize)+"] path");
                        conpath = null;
                }
-               searchExprList =getLocalPathNoRegular(searchExprList);
+               String nowdir = null;
+               try {
+                       nowdir = ftpClient.pwd();
+                       searchExprList =getPathNoRegular(searchExprList,ftpClient);
+               } catch (Exception e1) {
+                       log.error(" collect fail ",e1);
+                       return fileList;
+               }
                List<AFtpRemoteFile> remoteFiles = new ArrayList<AFtpRemoteFile>();
                for(String expr :searchExprList){
+                       ftpClient.chdir(nowdir);
                        String keys[] = parseExprKeys(expr);
                        String ftpRegular = keys[1];
                        String ftpDir = keys[0];
                        
-                       boolean cdsucess = ftpClient.chdir(expr);
+                       boolean cdsucess = ftpClient.chdir(ftpDir);
                        if(cdsucess){
                                AFtpRemoteFile[] arf = (AFtpRemoteFile[]) ftpClient.list();
-                               log.info(" list ["+ftpDir+"]result["+(arf==null?"null":arf.length)+"] files");
+                               log.info(" list ["+ftpDir+"] result["+(arf==null?"null":arf.length)+"] files");
                                //filter
                                
                                rfileFilter(remoteFiles,arf,ftpRegular);
@@ -737,7 +960,7 @@ public class TaskThread implements Runnable{
                                }
                                
                        }else{
-                               log.error("chdir is faill dir =["+dir+"]");
+                               log.error("cd dir is faill dir =["+ftpDir+"]");
                        }
                }
                
@@ -792,7 +1015,8 @@ public class TaskThread implements Runnable{
                return dirkeys;
        }
        
-       public  List<String> getLocalPathNoRegular(List<String> searchExprList){
+       
+       public  List<String> getPathNoRegular(List<String> searchExprList,FTPInterface ftpCache)  throws Exception{
                boolean isregular = false;
                List<String> regularList = new ArrayList<String>();
                for(String regular : searchExprList){
@@ -800,26 +1024,37 @@ public class TaskThread implements Runnable{
                        try{
                                lpattern = Pattern.compile("(.*/)<([^/]+)>(/.*)"); 
                        }catch (Exception e) {
-                               log.info("["+regular+"]compile fails:"+e.getMessage());
+                               log.error("["+regular+"]compile fails:"+e.getMessage());
+                               e.printStackTrace();
                        }
                        
                        Matcher  matcher = lpattern.matcher(regular);
                        if(matcher.find()){
                                isregular = true;
                                String parpath  = matcher.group(1);
-                               File[] arryFile = new File(parpath).listFiles();
-                               for(File file :arryFile){
-                                       if(file.isDirectory()&&file.getName().matches(matcher.group(2))){
-                                               regularList.add(matcher.group(1)+file.getName()+matcher.group(3));
+                               try{
+                                       boolean isin = ftpCache.chdir(parpath);
+                                   if(isin){
+                                       log.info("cd dir [" + parpath + "] sucess");
+                                   }else{
+                                       log.error("cd dir [" + parpath + "] fail");
+                                   }
+                               }catch(Exception e){
+                                       log.error(" cd dir ["+parpath+"]fail",e);
+                                       throw e;
+                               }
+                               RemoteFile[] remotef = ftpCache.list();
+                               for(RemoteFile aremote :remotef){
+                                       if(aremote.isDirectory()&&aremote.getFileName().matches(matcher.group(2))){
+                                               regularList.add(matcher.group(1)+aremote.getFileName()+matcher.group(3));
                                        }       
                                }
                        }else{
                                regularList.add(regular);
-                       }
-                                       
+                       }               
                }
                if(isregular==true){
-                       getLocalPathNoRegular(regularList);
+                       getPathNoRegular(regularList,ftpCache);
                }
                return regularList;
        }