import org.onap.vfc.nfvo.emsdriver.commons.ftp.AFtpRemoteFile;
import org.onap.vfc.nfvo.emsdriver.commons.ftp.FTPInterface;
import org.onap.vfc.nfvo.emsdriver.commons.ftp.FTPSrv;
+import org.onap.vfc.nfvo.emsdriver.commons.ftp.RemoteFile;
import org.onap.vfc.nfvo.emsdriver.commons.model.CollectMsg;
import org.onap.vfc.nfvo.emsdriver.commons.model.CollectVo;
import org.onap.vfc.nfvo.emsdriver.commons.utils.DateUtil;
public void run(){
cmResultChannel = MessageChannelFactory.getMessageChannel(Constant.COLLECT_RESULT_CHANNEL_KEY);
- pmResultChannel = MessageChannelFactory.getMessageChannel(Constant.COLLECT_CHANNEL_KEY);
+ pmResultChannel = MessageChannelFactory.getMessageChannel(Constant.COLLECT_RESULT_PM_CHANNEL_KEY);
try {
collectMsgHandle(data);
} catch (Exception e) {
continue;
String nename = ma.group(1);
boolean parseResult = false;
- if("ems-resource".equalsIgnoreCase(collectVo.getType())){
+ if(Constant.COLLECT_TYPE_CM.equalsIgnoreCase(collectVo.getType())){
parseResult = processCMXml(tempfile, nename,"CM");
}else{
if(unfileName.indexOf(".csv") > 0){
parseResult = processPMCsv(tempfile);
}else{
- parseResult = processPMCsv(tempfile);
+ parseResult = processPMXml(tempfile);
}
}
}
}
+ public boolean processPMXml(File file) {
+
+ FileInputStream fis = null;
+ InputStreamReader isr = null;
+ XMLStreamReader reader = null;
+ try {
+ fis = new FileInputStream(file);
+ isr = new InputStreamReader(fis, Constant.ENCODING_UTF8);
+
+ XMLInputFactory fac = XMLInputFactory.newInstance();
+ reader = fac.createXMLStreamReader(isr);
+
+ boolean fileHeaderStart = false;
+ boolean measurementStart = false;
+ boolean pmNameFlag = false;
+ boolean pmDataFlag = false;
+ boolean objectFlag = true;
+
+ int index = -1;
+ int nameIndex = -1;
+ String currentMea = null;
+ String subName = null;
+ String localName = null;
+ String endLocalName = null;
+ String objectType = null;
+
+
+ LinkedHashMap<String, String> commonNameAndValue = new LinkedHashMap<String, String>();
+ LinkedHashMap<String, String> pmDatas = null;
+ LinkedHashMap<Integer, String> pmNames = null;
+
+
+ int event = -1;
+ while (reader.hasNext()) {
+ try{
+ event = reader.next();
+
+ switch (event) {
+ case XMLStreamConstants.START_ELEMENT:
+ localName = reader.getLocalName();
+ if ("FileHeader".equalsIgnoreCase(localName)) {
+ fileHeaderStart = true;
+ }
+ if(fileHeaderStart){
+ if(!"FileHeader".equalsIgnoreCase(localName)){
+ commonNameAndValue.put(localName,reader.getElementText().trim());
+ }
+
+ }
+ if ("Measurements".equalsIgnoreCase(localName)) {
+ // a new Measurement starts
+ measurementStart = true;
+ }
+ if (measurementStart) {
+ // measurement handler
+ if ("ObjectType".equalsIgnoreCase(localName)) {
+ objectType = reader.getElementText().trim();
+ commonNameAndValue.put("ObjectType",objectType);
+ }
+ if ("PmName".equalsIgnoreCase(localName)) {
+ pmNameFlag = true;
+ pmNames = new LinkedHashMap<Integer, String>();
+
+ }
+ if (pmNameFlag) {
+ // pmname handler, add columnNames
+ if ("N".equalsIgnoreCase(localName)) {
+ nameIndex = Integer.parseInt(getXMLAttribute(reader, "i"));
+ String text = reader.getElementText().trim();
+ pmNames.put(nameIndex, text);
+ }
+ }
+ if ("PmData".equalsIgnoreCase(localName)) {
+ pmDataFlag = true;
+ pmDatas = new LinkedHashMap<String, String>();
+ }
+
+ if (pmDataFlag) {
+ // pmdata handler
+ if ("Object".equalsIgnoreCase(localName)) {
+ objectFlag = true;
+ int n = reader.getAttributeCount();
+ for(int i = 0; i < n; i++) {
+ String name = reader.getAttributeLocalName(i);
+ commonNameAndValue.put(name, reader.getAttributeValue(i));
+ }
+ }
+ if (objectFlag) {
+
+ // add columnValues
+ if ("V".equalsIgnoreCase(localName)) {
+ String indexStr = getXMLAttribute(reader, "i");
+ if(indexStr == null) {
+ log.error("ERROR: illegal value index");
+ continue;
+ }
+ index = Integer.parseInt(indexStr);
+ String name = pmNames.get(index);
+ if(name == null){
+ log.error("illegal data: valueIndex="+index);
+ continue;
+ }
+
+ String value = reader.getElementText().trim();
+ pmDatas.put(name, value);
+ }
+ if ("CV".equalsIgnoreCase(localName)) {
+
+ String indexStr = getXMLAttribute(reader, "i");
+ if(indexStr == null) {
+ log.error("ERROR: illegal value index");
+ continue;
+ }
+ index = Integer.parseInt(indexStr);
+
+ currentMea = pmNames.get(index);
+ if(currentMea == null){
+ log.error("illegal data: valueIndex="+index);
+ continue;
+ }
+ }
+
+ if ("SN".equalsIgnoreCase(localName)) {
+ subName = reader.getElementText().trim();
+
+ }
+ if ("SV".equalsIgnoreCase(localName)) {
+ String subValue = reader.getElementText().trim();
+// pmDatas.put(currentMea+subName, subValue);
+ pmDatas.put(subName, subValue);
+ }
+ }
+ }
+
+ }
+
+ break;
+ case XMLStreamConstants.CHARACTERS:
+ // ...
+ break;
+ case XMLStreamConstants.END_ELEMENT:
+ // ...
+ endLocalName = reader.getLocalName();
+ if("Object".equalsIgnoreCase(endLocalName)){
+ objectFlag = false;
+ pmDatas.putAll(commonNameAndValue);
+ try {
+ pmResultChannel.put(pmDatas);
+
+ } catch (InterruptedException e) {
+ log.error("collectResultChannel.put(resultMap) error ",e);
+ }
+// System.out.println(pmDatas);
+// pmDatas.clear();
+ }
+ if (endLocalName.equalsIgnoreCase("PmData")) {
+ pmDataFlag = false;
+ }
+
+ if (endLocalName.equalsIgnoreCase("PmName")) {
+ pmNameFlag = false;
+ }
+ if (endLocalName.equalsIgnoreCase("Measurements")) {
+ // a measurement over
+ measurementStart = false;
+ }
+
+ if("FileHeader".equalsIgnoreCase(endLocalName)){
+ fileHeaderStart = false;
+ }
+ break;
+ }
+ }catch (Exception e){
+ log.error("",e);
+ event = reader.next();
+ }
+ }
+
+ } catch (Exception e) {
+ log.error("processPMXml is Exception ",e);
+ return false;
+ } finally{
+ try{
+ if(reader != null)reader.close();
+ if(isr!= null)isr.close();
+ if(fis != null)fis.close();
+ }catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ return true;
+ }
+
+ private String getXMLAttribute(XMLStreamReader reader, String obj) {
+ String res = null;
+ if(obj == null || reader == null){
+ return res;
+ }
+ int n = reader.getAttributeCount();
+ for(int i = 0; i < n; i++) {
+ String name = reader.getAttributeLocalName(i);
+ if(obj.equalsIgnoreCase(name)){
+ res = reader.getAttributeValue(i);
+ }
+ }
+ return res;
+ }
+
public boolean processPMCsv(File tempfile) {
FileInputStream brs = null;
for(String name : names){
columnNames.add(name);
}
-
-// xmlPathAndFileName = this.setColumnNames(nename, columnNames,type);
-
+
String valueLine = "";
List<String> valuelist = new ArrayList<String>();
//login
try {
+ log.info("ftp login ,ip=["+ip+"] port=["+port+"] user=["+user+"]password=["+password+"]");
ftpClient.login(ip, Integer.parseInt(port), user, password, "GBK", Boolean.parseBoolean(passivemode), 5*60*1000);
} catch (Exception e) {
log.error("login fail,ip=["+ip+"] port=["+port+"] user=["+user+"]password=["+password+"]"+StringUtil.getStackTrace(e));
String []FPath = dir.split(";");
for(int i=0;i<FPath.length;i++){
int oldSize = searchExprList.size();
- String conpath = FPath[i];
+ String conpath = FPath[i] + collectVo.getMatch();
Hashtable<String,String> varMap = new Hashtable<String,String>();
- long[] d = DateUtil.getScanScope(new Date(), 900);
+ long collectPeriod = 900;
+ try {
+ collectPeriod = Long.parseLong(collectVo.getGranularity())*60;
+ log.info("collectPeriod =["+collectPeriod+"]");
+ } catch (NumberFormatException e) {
+ e.printStackTrace();
+ }
+ long[] d = DateUtil.getScanScope(new Date(), collectPeriod);
searchExprList.add(VarExprParser.replaceVar(conpath,d[0],d[1]));
varMap.clear();
varMap = null;
- log.info("["+conpath+"],result["+(searchExprList.size()-oldSize)+"] path");
+ log.info("["+conpath+"] result["+(searchExprList.size()-oldSize)+"] path");
conpath = null;
}
- searchExprList =getLocalPathNoRegular(searchExprList);
+ String nowdir = null;
+ try {
+ nowdir = ftpClient.pwd();
+ searchExprList =getPathNoRegular(searchExprList,ftpClient);
+ } catch (Exception e1) {
+ log.error(" collect fail ",e1);
+ return fileList;
+ }
List<AFtpRemoteFile> remoteFiles = new ArrayList<AFtpRemoteFile>();
for(String expr :searchExprList){
+ ftpClient.chdir(nowdir);
String keys[] = parseExprKeys(expr);
String ftpRegular = keys[1];
String ftpDir = keys[0];
- boolean cdsucess = ftpClient.chdir(expr);
+ boolean cdsucess = ftpClient.chdir(ftpDir);
if(cdsucess){
AFtpRemoteFile[] arf = (AFtpRemoteFile[]) ftpClient.list();
- log.info(" list ["+ftpDir+"],result["+(arf==null?"null":arf.length)+"] files");
+ log.info(" list ["+ftpDir+"] result["+(arf==null?"null":arf.length)+"] files");
//filter
rfileFilter(remoteFiles,arf,ftpRegular);
}
}else{
- log.error("chdir is faill dir =["+dir+"]");
+ log.error("cd dir is faill dir =["+ftpDir+"]");
}
}
return dirkeys;
}
- public List<String> getLocalPathNoRegular(List<String> searchExprList){
+
+ public List<String> getPathNoRegular(List<String> searchExprList,FTPInterface ftpCache) throws Exception{
boolean isregular = false;
List<String> regularList = new ArrayList<String>();
for(String regular : searchExprList){
try{
lpattern = Pattern.compile("(.*/)<([^/]+)>(/.*)");
}catch (Exception e) {
- log.info("["+regular+"]compile fails:"+e.getMessage());
+ log.error("["+regular+"]compile fails:"+e.getMessage());
+ e.printStackTrace();
}
Matcher matcher = lpattern.matcher(regular);
if(matcher.find()){
isregular = true;
String parpath = matcher.group(1);
- File[] arryFile = new File(parpath).listFiles();
- for(File file :arryFile){
- if(file.isDirectory()&&file.getName().matches(matcher.group(2))){
- regularList.add(matcher.group(1)+file.getName()+matcher.group(3));
+ try{
+ boolean isin = ftpCache.chdir(parpath);
+ if(isin){
+ log.info("cd dir [" + parpath + "] sucess");
+ }else{
+ log.error("cd dir [" + parpath + "] fail");
+ }
+ }catch(Exception e){
+ log.error(" cd dir ["+parpath+"]fail",e);
+ throw e;
+ }
+ RemoteFile[] remotef = ftpCache.list();
+ for(RemoteFile aremote :remotef){
+ if(aremote.isDirectory()&&aremote.getFileName().matches(matcher.group(2))){
+ regularList.add(matcher.group(1)+aremote.getFileName()+matcher.group(3));
}
}
}else{
regularList.add(regular);
- }
-
+ }
}
if(isregular==true){
- getLocalPathNoRegular(regularList);
+ getPathNoRegular(regularList,ftpCache);
}
return regularList;
}