2 * Copyright 2017 BOCO Corporation. CMCC Technologies Co., Ltd
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
16 package org.onap.vfc.nfvo.emsdriver.collector.alarm;
18 import org.apache.commons.logging.Log;
19 import org.apache.commons.logging.LogFactory;
20 import org.onap.vfc.nfvo.emsdriver.commons.constant.Constant;
21 import org.onap.vfc.nfvo.emsdriver.commons.model.CollectVo;
22 import org.onap.vfc.nfvo.emsdriver.commons.utils.StringUtil;
23 import org.onap.vfc.nfvo.emsdriver.messagemgr.MessageChannel;
24 import org.onap.vfc.nfvo.emsdriver.messagemgr.MessageChannelFactory;
26 import java.io.BufferedInputStream;
27 import java.io.BufferedOutputStream;
28 import java.io.IOException;
29 import java.net.Socket;
30 import java.net.SocketException;
31 import java.net.UnknownHostException;
33 public class AlarmTaskThread extends Thread {
34 private static final Log log = LogFactory.getLog(AlarmTaskThread.class);
36 private HeartBeat heartBeat = null;
37 private boolean isStop = false;
38 private CollectVo collectVo = null;
39 private int readTimeout = Constant.READ_TIMEOUT_MILLISECOND;
41 private Socket socket = null;
42 private BufferedInputStream is = null;
43 private BufferedOutputStream dos = null;
45 private MessageChannel alarmChannel;
47 public AlarmTaskThread() {
51 public AlarmTaskThread(CollectVo collectVo) {
53 this.collectVo = collectVo;
59 alarmChannel = MessageChannelFactory.getMessageChannel(Constant.RESULT_CHANNEL_KEY);
61 while (!this.isStop) {
64 body = this.receive();
65 alarmChannel.put(body);
66 } catch (Exception e) {
67 log.error("alarmChannel.put Exception: ", e);
71 } catch (Exception e) {
72 log.error("run Exception:", e);
76 public String receive() throws IOException {
79 String retString = null;
80 while (retString == null && !this.isStop) {
81 msg = MessageUtil.readOneMsg(is);
82 log.debug("msg = " + msg.toString(true));
83 log.info("msg.getMsgType().name = " + msg.getMsgType().name);
84 if ("ackLoginAlarm".equalsIgnoreCase(msg.getMsgType().name)) {
85 log.debug("receive login ack");
86 boolean suc = this.ackLoginAlarm(msg);
88 if (reqId == Integer.MAX_VALUE)
91 Msg msgheart = MessageUtil.putHeartBeatMsg(reqId);
92 heartBeat = new HeartBeat(socket, msgheart);
93 heartBeat.setName("CMCC_JT_HeartBeat");
100 if ("ackHeartBeat".equalsIgnoreCase(msg.getMsgType().name)) {
101 log.debug("received heartBeat message:" + msg.getBody());
105 if ("realTimeAlarm".equalsIgnoreCase(msg.getMsgType().name)) {
106 log.debug("received alarm message");
107 retString = msg.getBody();
109 if (retString == null) {
116 } catch (Exception e) {
117 log.error("receive Error: ", e);
118 throw new IOException("receive Error: ", e);
122 public void init() throws IOException {
125 String host = collectVo.getIP();
127 String port = collectVo.getPort();
129 String user = collectVo.getUser();
131 String password = collectVo.getPassword();
134 if ((collectVo.getReadTimeout()).trim().length() > 0)
135 this.readTimeout = Integer.parseInt(collectVo.getReadTimeout());
137 } catch (NumberFormatException e) {
138 log.error("Unable to parse read_timout: ", e);
139 throw new NumberFormatException("Unable to parse read_timout: " + e);
142 log.info("socket connect host=" + host + ", port=" + port);
144 int portInt = Integer.parseInt(port);
145 socket = new Socket(host, portInt);
147 } catch (UnknownHostException e) {
148 log.error("remote host [" + host + "]connect fail"
149 + StringUtil.getStackTrace(e));
150 throw new UnknownHostException("remote host [" + host
151 + "]connect fail" + e);
152 } catch (IOException e1) {
153 log.error("create socket IOException ", e1);
154 throw new SocketException("create socket IOException " + e1);
157 socket.setSoTimeout(this.readTimeout);
158 socket.setTcpNoDelay(true);
159 socket.setKeepAlive(true);
160 } catch (SocketException e) {
161 log.error(" SocketException " + StringUtil.getStackTrace(e));
162 throw new SocketException(" SocketException "
163 + StringUtil.getStackTrace(e));
166 dos = new BufferedOutputStream(socket.getOutputStream());
168 Msg msg = MessageUtil.putLoginMsg(user, password);
171 log.debug("send login message " + msg.toString(false));
172 MessageUtil.writeMsg(msg, dos);
174 } catch (Exception e) {
175 log.error("send login message is fail "
176 + StringUtil.getStackTrace(e));
179 is = new BufferedInputStream(socket.getInputStream());
181 } catch (SocketException e) {
182 log.error("SocketException ", e);
183 throw new SocketException("SocketException " + e);
187 private boolean ackLoginAlarm(Msg msg) throws IOException {
190 String loginres = msg.getBody();
191 String[] loginbody = loginres.split(";");
192 if (loginbody.length > 1) {
193 for (String str : loginbody) {
194 if (str.contains("=")) {
195 String[] paras1 = str.split("=", -1);
196 if ("result".equalsIgnoreCase(paras1[0].trim())) {
197 if ("succ".equalsIgnoreCase(paras1[1].trim()))
205 log.error("login ack body Incorrect formatbody=" + loginres);
208 } catch (Exception e) {
209 log.error("pocess login ack fail" + StringUtil.getStackTrace(e));
212 log.info("login sucess receive login ack " + msg.getBody());
214 log.error("login fail receive login ack " + msg.getBody());
217 throw new IOException("pocess login ack fail");
222 public void close() {
223 if (heartBeat != null) {
224 heartBeat.setStop(true);
229 } catch (IOException e) {
230 log.error("Unable to close BufferedInput Stream", e);
238 } catch (IOException e) {
239 log.error("Unable to close BufferedOutput Stream", e);
244 if (socket != null) {
247 } catch (IOException e) {
248 log.error("Unable to close Socket", e);
256 public void reinit() {
259 while (!this.isStop) {
263 Thread.sleep(1000L * 30);
266 } catch (Exception e) {
267 log.error("Number [" + time + "]reconnect ["
268 + collectVo.getIP() + "]fail" + e);
277 public void setStop(boolean isStop) {
278 this.isStop = isStop;
282 * @return the heartBeat
284 public HeartBeat getHeartBeat() {