2 * Copyright 2017 BOCO Corporation. CMCC Technologies Co., Ltd
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
16 package org.onap.vfc.nfvo.emsdriver.collector.alarm;
18 import java.io.BufferedInputStream;
19 import java.io.BufferedOutputStream;
20 import java.io.IOException;
21 import java.net.Socket;
22 import java.net.SocketException;
23 import java.net.UnknownHostException;
26 import org.apache.commons.logging.Log;
27 import org.apache.commons.logging.LogFactory;
28 import org.onap.vfc.nfvo.emsdriver.commons.constant.Constant;
29 import org.onap.vfc.nfvo.emsdriver.commons.model.CollectVo;
30 import org.onap.vfc.nfvo.emsdriver.commons.utils.StringUtil;
31 import org.onap.vfc.nfvo.emsdriver.messagemgr.MessageChannel;
32 import org.onap.vfc.nfvo.emsdriver.messagemgr.MessageChannelFactory;
34 import com.alibaba.fastjson.JSONObject;
37 public class AlarmTaskThread extends Thread{
38 public Log log = LogFactory.getLog(AlarmTaskThread.class);
40 private HeartBeat heartBeat = null;
42 private boolean isStop = false;
43 private CollectVo collectVo = null;
44 private int read_timeout = Constant.READ_TIMEOUT_MILLISECOND;
47 private Socket socket = null;
48 private BufferedInputStream is = null;
49 private BufferedOutputStream dos = null;
51 private MessageChannel alarmChannel;
53 public AlarmTaskThread(CollectVo collectVo) {
55 this.collectVo = collectVo;
59 alarmChannel = MessageChannelFactory.getMessageChannel(Constant.ALARM_CHANNEL_KEY);
65 body = this.receive();
66 String alarm120 = this.build120Alarm(body);
68 this.send120Alarm(alarm120);
69 } catch (Exception e) {
73 } catch (Exception e) {
74 log.error(StringUtil.getStackTrace(e));
78 private void send120Alarm(String alarm120) {
81 alarmChannel.put(alarm120);
82 } catch (InterruptedException e) {
83 log.error(StringUtil.getStackTrace(e));
87 private String build120Alarm(String body) {
88 StringBuilder content = new StringBuilder(
89 "<?xml version='1.0' encoding='iso-8859-1'?>\n")
90 .append("<WholeMsg MsgMark='120' Priority='2' FieldNum='5'><FM_ALARM_MSG>\n");
93 JSONObject reagobj = JSONObject.parseObject(body);
95 Set<String> keys = reagobj.keySet();
97 for (String key : keys) {
99 String value = (String)reagobj.get(key);
100 content.append("<").append(key).append(">");
101 content.append(value);
102 content.append("</").append(key).append(">\n");
104 content.append("</FM_ALARM_MSG></WholeMsg>");
106 return content.toString();
110 public String receive() throws Exception {
113 String retString = null;
115 while (retString == null && !this.isStop) {
117 msg = MessageUtil.readOneMsg(is);
119 if("ackLoginAlarm".equalsIgnoreCase(msg.getMsgType().name)){
120 log.debug("receive login ack");
121 boolean suc = this.ackLoginAlarm(msg);
124 if(reqId == Integer.MAX_VALUE){
128 Msg msgheart = MessageUtil.putHeartBeatMsg(reqId);
129 heartBeat = new HeartBeat(socket,msgheart);
130 heartBeat.setName("CMCC_JT_HeartBeat");
137 if("ackHeartBeat".equalsIgnoreCase(msg.getMsgType().name)){
138 log.debug("received heartBeat��"+msg.getBody());
144 if("realTimeAlarm".equalsIgnoreCase(msg.getMsgType().name)){
145 log.debug("received alarm message");
146 retString = msg.getBody();
152 public void init() throws Exception {
155 String host = collectVo.getIP();
157 String port = collectVo.getPort();
159 String user = collectVo.getUser();
161 String password = collectVo.getPassword();
163 String read_timeout = collectVo.getRead_timeout();
164 if ((read_timeout != null) && (read_timeout.trim().length() > 0)) {
166 this.read_timeout = Integer.parseInt(read_timeout);
167 } catch (NumberFormatException e) {
168 log.error(StringUtil.getStackTrace(e));
171 log.debug("socket connect host=" + host + ", port=" + port);
173 int portInt = Integer.parseInt(port);
174 socket = new Socket(host, portInt);
176 } catch (UnknownHostException e) {
177 throw new Exception("remote host [" + host + "]connect fail" + StringUtil.getStackTrace(e));
178 } catch (IOException e) {
179 throw new Exception("create socket IOException " + StringUtil.getStackTrace(e));
182 socket.setSoTimeout(this.read_timeout);
183 socket.setTcpNoDelay(true);
184 socket.setKeepAlive(true);
185 } catch (SocketException e) {
186 throw new Exception(" SocketException " + StringUtil.getStackTrace(e));
189 dos = new BufferedOutputStream(socket.getOutputStream());
191 Msg msg = MessageUtil.putLoginMsg(user,password);
194 log.debug("send login message "+msg.toString(false));
195 MessageUtil.writeMsg(msg,dos);
197 } catch (Exception e) {
198 log.error("send login message is fail "+StringUtil.getStackTrace(e));
201 is = new BufferedInputStream(socket.getInputStream());
203 } catch (SocketException e) {
204 throw new Exception(StringUtil.getStackTrace(e));
208 private boolean ackLoginAlarm(Msg msg) throws Exception {
210 boolean is_success = false;
212 String loginres = msg.getBody();
213 //ackLoginAlarm; result=fail(succ); resDesc=username-error
214 String [] loginbody = loginres.split(";");
215 if(loginbody.length > 1){
216 for(String str :loginbody){
217 if(str.contains("=")){
218 String [] paras1 = str.split("=",-1);
219 if("result".equalsIgnoreCase(paras1[0].trim())){
220 if("succ".equalsIgnoreCase(paras1[1].trim())){
229 log.error("login ack body Incorrect formatbody=" + loginres);
233 } catch (Exception e) {
234 log.error("pocess login ack fail"+StringUtil.getStackTrace(e));
237 log.info("login sucess receive login ack " + msg.getBody());
239 log.error("login fail receive login ack " + msg.getBody());
242 throw new Exception("login fail quit");
247 public void close() {
249 if(heartBeat != null){
250 heartBeat.setStop(true);
256 } catch (IOException e) {
265 } catch (IOException e) {
271 if (socket != null) {
274 } catch (IOException e) {
282 public void reinit() {
285 while(!this.isStop) {
289 Thread.sleep(1000 * 10);
292 } catch (Exception e) {
293 log.error("Number ["+time+"]reconnect ["+collectVo.getIP()+"]fail" );