Update ves-agent dependency
[vfc/nfvo/driver/ems.git] / ems / boco / src / main / java / org / onap / vfc / nfvo / emsdriver / collector / alarm / AlarmTaskThread.java
1 /*
2  * Copyright 2017 BOCO Corporation.  CMCC Technologies Co., Ltd
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package org.onap.vfc.nfvo.emsdriver.collector.alarm;
17
18 import org.slf4j.Logger;
19 import org.slf4j.LoggerFactory;
20 import org.onap.vfc.nfvo.emsdriver.commons.constant.Constant;
21 import org.onap.vfc.nfvo.emsdriver.commons.model.CollectVo;
22 import org.onap.vfc.nfvo.emsdriver.commons.utils.StringUtil;
23 import org.onap.vfc.nfvo.emsdriver.messagemgr.MessageChannel;
24 import org.onap.vfc.nfvo.emsdriver.messagemgr.MessageChannelFactory;
25
26 import java.io.BufferedInputStream;
27 import java.io.BufferedOutputStream;
28 import java.io.IOException;
29 import java.net.Socket;
30 import java.net.SocketException;
31 import java.net.UnknownHostException;
32
33 public class AlarmTaskThread extends Thread {
34         private static final Logger log = LoggerFactory.getLogger(AlarmTaskThread.class);
35
36         private HeartBeat heartBeat = null;
37         private boolean isStop = false;
38         private CollectVo collectVo = null;
39         private int readTimeout = Constant.READ_TIMEOUT_MILLISECOND;
40         private int reqId;
41         private Socket socket = null;
42         private BufferedInputStream is = null;
43         private BufferedOutputStream dos = null;
44
45         private MessageChannel alarmChannel;
46
47         public AlarmTaskThread() {
48                 super();
49         }
50
51         public AlarmTaskThread(CollectVo collectVo) {
52
53                 this.collectVo = collectVo;
54         }
55
56         @Override
57         public void run() {
58                 try {
59                         alarmChannel = MessageChannelFactory.getMessageChannel(Constant.RESULT_CHANNEL_KEY);
60                         this.init();
61                         while (!this.isStop) {
62                                 String body;
63                                 try {
64                                         body = this.receive();
65                                         alarmChannel.put(body);
66                                 } catch (Exception e) {
67                                         log.error("alarmChannel.put Exception: ", e);
68                                         reinit();
69                                 }
70                         }
71                 } catch (Exception e) {
72                         log.error("run Exception:", e);
73                 }
74         }
75
76         public String receive() throws IOException {
77                 try {
78                         Msg msg = null;
79                         String retString = null;
80                         while (retString == null && !this.isStop) {
81                                 msg = MessageUtil.readOneMsg(is);
82                                 log.debug("msg = " + msg.toString(true));
83                                 log.info("msg.getMsgType().name = " + msg.getMsgType().name);
84                                 if ("ackLoginAlarm".equalsIgnoreCase(msg.getMsgType().name)) {
85                                         log.debug("receive login ack");
86                                         boolean suc = this.ackLoginAlarm(msg);
87                                         if (suc) {
88                                                 if (reqId == Integer.MAX_VALUE)
89                                                         reqId = 0;
90                                                 reqId++;
91                                                 Msg msgheart = MessageUtil.putHeartBeatMsg(reqId);
92                                                 heartBeat = new HeartBeat(socket, msgheart);
93                                                 heartBeat.setName("CMCC_JT_HeartBeat");
94                                                 // start heartBeat
95                                                 heartBeat.start();
96                                         }
97                                         retString = null;
98                                 }
99
100                                 if ("ackHeartBeat".equalsIgnoreCase(msg.getMsgType().name)) {
101                                         log.debug("received heartBeat message:" + msg.getBody());
102                                         retString = null;
103                                 }
104
105                                 if ("realTimeAlarm".equalsIgnoreCase(msg.getMsgType().name)) {
106                                         log.debug("received alarm message");
107                                         retString = msg.getBody();
108                                 }
109                                 if (retString == null) {
110                                         Thread.sleep(100);
111                                 }
112
113                         }// while
114                         return retString;
115
116                 } catch (Exception e) {
117                         log.error("receive Error: ", e);
118                         throw new IOException("receive Error: ", e);
119                 }
120         }
121
122         public void init() throws IOException {
123                 isStop = false;
124                 // host
125                 String host = collectVo.getIP();
126                 // port
127                 String port = collectVo.getPort();
128                 // user
129                 String user = collectVo.getUser();
130                 // password
131                 String password = collectVo.getPassword();
132
133                 try {
134                         if ((collectVo.getReadTimeout()).trim().length() > 0)
135                                 this.readTimeout = Integer.parseInt(collectVo.getReadTimeout());
136
137                 } catch (NumberFormatException e) {
138                         log.error("Unable to parse read_timout: ", e);
139                         throw new NumberFormatException("Unable to parse read_timout: " + e);
140                 }
141
142                 log.info("socket connect host=" + host + ", port=" + port);
143                 try {
144                         int portInt = Integer.parseInt(port);
145                         socket = new Socket(host, portInt);
146
147                 } catch (UnknownHostException e) {
148                         log.error("remote host [" + host + "]connect fail"
149                                         + StringUtil.getStackTrace(e));
150                         throw new UnknownHostException("remote host [" + host
151                                         + "]connect fail" + e);
152                 } catch (IOException e1) {
153                         log.error("create socket IOException ", e1);
154                         throw new SocketException("create socket IOException " + e1);
155                 }
156                 try {
157                         socket.setSoTimeout(this.readTimeout);
158                         socket.setTcpNoDelay(true);
159                         socket.setKeepAlive(true);
160                 } catch (SocketException e) {
161                         log.error(" SocketException " + StringUtil.getStackTrace(e));
162                         throw new SocketException(" SocketException "
163                                         + StringUtil.getStackTrace(e));
164                 }
165                 try {
166                         dos = new BufferedOutputStream(socket.getOutputStream());
167
168                         Msg msg = MessageUtil.putLoginMsg(user, password);
169
170                         try {
171                                 log.debug("send login message " + msg.toString(false));
172                                 MessageUtil.writeMsg(msg, dos);
173
174                         } catch (Exception e) {
175                                 log.error("send login message is fail "
176                                                 + StringUtil.getStackTrace(e));
177                         }
178
179                         is = new BufferedInputStream(socket.getInputStream());
180
181                 } catch (SocketException e) {
182                         log.error("SocketException ", e);
183                         throw new SocketException("SocketException " + e);
184                 }
185         }
186
187         private boolean ackLoginAlarm(Msg msg) throws IOException {
188                 boolean ret = false;
189                 try {
190                         String loginres = msg.getBody();
191                         String[] loginbody = loginres.split(";");
192                         if (loginbody.length > 1) {
193                                 for (String str : loginbody) {
194                                         if (str.contains("=")) {
195                                                 String[] paras1 = str.split("=", -1);
196                                                 if ("result".equalsIgnoreCase(paras1[0].trim())) {
197                                                         if ("succ".equalsIgnoreCase(paras1[1].trim()))
198                                                                 ret = true;
199                                                         else
200                                                                 ret = false;
201                                                 }
202                                         }
203                                 }
204                         } else {
205                                 log.error("login ack body Incorrect formatbody=" + loginres);
206                         }
207
208                 } catch (Exception e) {
209                         log.error("pocess login ack fail" + StringUtil.getStackTrace(e));
210                 }
211                 if (ret) {
212                         log.info("login sucess receive login ack " + msg.getBody());
213                 } else {
214                         log.error("login fail receive login ack  " + msg.getBody());
215                         this.close();
216                         this.isStop = true;
217                         throw new IOException("pocess login ack fail");
218                 }
219                 return ret;
220         }
221
222         public void close() {
223                 if (heartBeat != null) {
224                         heartBeat.setStop(true);
225                 }
226                 if (is != null) {
227                         try {
228                                 is.close();
229                         } catch (IOException e) {
230                                 log.error("Unable to close BufferedInput Stream", e);
231                         } finally {
232                                 is = null;
233                         }
234                 }
235                 if (dos != null) {
236                         try {
237                                 dos.close();
238                         } catch (IOException e) {
239                                 log.error("Unable to close BufferedOutput Stream", e);
240                         } finally {
241                                 dos = null;
242                         }
243                 }
244                 if (socket != null) {
245                         try {
246                                 socket.close();
247                         } catch (IOException e) {
248                                 log.error("Unable to close Socket", e);
249                         } finally {
250                                 socket = null;
251                         }
252
253                 }
254         }
255
256         public void reinit() {
257                 int time = 0;
258                 close();
259                 while (!this.isStop) {
260                         close();
261                         time++;
262                         try {
263                                 Thread.sleep(1000L * 30);
264                                 init();
265                                 return;
266                         } catch (Exception e) {
267                                 log.error("Number [" + time + "]reconnect ["
268                                                 + collectVo.getIP() + "]fail" + e);
269                         }
270                 }
271         }
272
273         /**
274          * @param isStop
275          *            the isStop to set
276          */
277         public void setStop(boolean isStop) {
278                 this.isStop = isStop;
279         }
280
281         /**
282          * @return the heartBeat
283          */
284         public HeartBeat getHeartBeat() {
285                 return heartBeat;
286         }
287
288 }