2 * Copyright 2017 BOCO Corporation. CMCC Technologies Co., Ltd
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
16 package org.onap.vfc.nfvo.emsdriver.collector.alarm;
18 import java.io.BufferedInputStream;
19 import java.io.BufferedOutputStream;
20 import java.io.IOException;
21 import java.net.Socket;
22 import java.net.SocketException;
23 import java.net.UnknownHostException;
25 import org.apache.commons.logging.Log;
26 import org.apache.commons.logging.LogFactory;
27 import org.onap.vfc.nfvo.emsdriver.commons.constant.Constant;
28 import org.onap.vfc.nfvo.emsdriver.commons.model.CollectVo;
29 import org.onap.vfc.nfvo.emsdriver.commons.utils.StringUtil;
30 import org.onap.vfc.nfvo.emsdriver.messagemgr.MessageChannel;
31 import org.onap.vfc.nfvo.emsdriver.messagemgr.MessageChannelFactory;
34 public class AlarmTaskThread extends Thread{
35 public Log log = LogFactory.getLog(AlarmTaskThread.class);
37 private HeartBeat heartBeat = null;
39 private boolean isStop = false;
40 private CollectVo collectVo = null;
41 private int read_timeout = Constant.READ_TIMEOUT_MILLISECOND;
44 private Socket socket = null;
45 private BufferedInputStream is = null;
46 private BufferedOutputStream dos = null;
48 private MessageChannel alarmChannel;
51 public AlarmTaskThread() {
55 public AlarmTaskThread(CollectVo collectVo) {
57 this.collectVo = collectVo;
61 alarmChannel = MessageChannelFactory.getMessageChannel(Constant.RESULT_CHANNEL_KEY);
67 body = this.receive();
69 alarmChannel.put(body);
70 } catch (InterruptedException e) {
71 log.error(StringUtil.getStackTrace(e));
73 } catch (Exception e) {
78 } catch (Exception e) {
79 log.error(StringUtil.getStackTrace(e));
85 public String receive() throws Exception {
88 String retString = null;
90 while (retString == null && !this.isStop) {
92 msg = MessageUtil.readOneMsg(is);
93 log.debug("msg = "+msg.toString(true));
94 log.info("msg.getMsgType().name = "+msg.getMsgType().name);
95 if("ackLoginAlarm".equalsIgnoreCase(msg.getMsgType().name)){
96 log.debug("receive login ack");
97 boolean suc = this.ackLoginAlarm(msg);
100 if(reqId == Integer.MAX_VALUE){
104 Msg msgheart = MessageUtil.putHeartBeatMsg(reqId);
105 heartBeat = new HeartBeat(socket,msgheart);
106 heartBeat.setName("CMCC_JT_HeartBeat");
113 if("ackHeartBeat".equalsIgnoreCase(msg.getMsgType().name)){
114 log.debug("received heartBeat message:"+msg.getBody());
120 if("realTimeAlarm".equalsIgnoreCase(msg.getMsgType().name)){
121 log.debug("received alarm message");
122 retString = msg.getBody();
125 if(retString == null){
132 public void init() throws Exception {
135 String host = collectVo.getIP();
137 String port = collectVo.getPort();
139 String user = collectVo.getUser();
141 String password = collectVo.getPassword();
143 String read_timeout = collectVo.getRead_timeout();
144 if ((read_timeout != null) && (read_timeout.trim().length() > 0)) {
146 this.read_timeout = Integer.parseInt(read_timeout);
147 } catch (NumberFormatException e) {
148 log.error(StringUtil.getStackTrace(e));
151 log.info("socket connect host=" + host + ", port=" + port);
153 int portInt = Integer.parseInt(port);
154 socket = new Socket(host, portInt);
156 } catch (UnknownHostException e) {
157 throw new Exception("remote host [" + host + "]connect fail" + StringUtil.getStackTrace(e));
158 } catch (IOException e) {
159 throw new Exception("create socket IOException " + StringUtil.getStackTrace(e));
162 socket.setSoTimeout(this.read_timeout);
163 socket.setTcpNoDelay(true);
164 socket.setKeepAlive(true);
165 } catch (SocketException e) {
166 throw new Exception(" SocketException " + StringUtil.getStackTrace(e));
169 dos = new BufferedOutputStream(socket.getOutputStream());
171 Msg msg = MessageUtil.putLoginMsg(user,password);
174 log.debug("send login message "+msg.toString(false));
175 MessageUtil.writeMsg(msg,dos);
177 } catch (Exception e) {
178 log.error("send login message is fail "+StringUtil.getStackTrace(e));
181 is = new BufferedInputStream(socket.getInputStream());
183 } catch (SocketException e) {
184 throw new Exception(StringUtil.getStackTrace(e));
188 private boolean ackLoginAlarm(Msg msg) throws Exception {
190 boolean is_success = false;
192 String loginres = msg.getBody();
193 //ackLoginAlarm; result=fail(succ); resDesc=username-error
194 String [] loginbody = loginres.split(";");
195 if(loginbody.length > 1){
196 for(String str :loginbody){
197 if(str.contains("=")){
198 String [] paras1 = str.split("=",-1);
199 if("result".equalsIgnoreCase(paras1[0].trim())){
200 if("succ".equalsIgnoreCase(paras1[1].trim())){
209 log.error("login ack body Incorrect formatbody=" + loginres);
213 } catch (Exception e) {
214 log.error("pocess login ack fail"+StringUtil.getStackTrace(e));
217 log.info("login sucess receive login ack " + msg.getBody());
219 log.error("login fail receive login ack " + msg.getBody());
222 throw new Exception("login fail quit");
227 public void close() {
229 if(heartBeat != null){
230 heartBeat.setStop(true);
236 } catch (IOException e) {
245 } catch (IOException e) {
251 if (socket != null) {
254 } catch (IOException e) {
262 public void reinit() {
265 while(!this.isStop) {
269 Thread.sleep(1000 * 30);
272 } catch (Exception e) {
273 log.error("Number ["+time+"]reconnect ["+collectVo.getIP()+"]fail" );
279 * @param isStop the isStop to set
281 public void setStop(boolean isStop) {
282 this.isStop = isStop;
286 * @return the heartBeat
288 public HeartBeat getHeartBeat() {