Modify emsdriver Code
[vfc/nfvo/driver/ems.git] / ems / boco / src / main / java / org / onap / vfc / nfvo / emsdriver / collector / alarm / AlarmTaskThread.java
1 /**
2  * Copyright 2017 BOCO Corporation.  CMCC Technologies Co., Ltd
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package org.onap.vfc.nfvo.emsdriver.collector.alarm;
17
18 import java.io.BufferedInputStream;
19 import java.io.BufferedOutputStream;
20 import java.io.IOException;
21 import java.net.Socket;
22 import java.net.SocketException;
23 import java.net.UnknownHostException;
24
25 import org.apache.commons.logging.Log;
26 import org.apache.commons.logging.LogFactory;
27 import org.onap.vfc.nfvo.emsdriver.commons.constant.Constant;
28 import org.onap.vfc.nfvo.emsdriver.commons.model.CollectVo;
29 import org.onap.vfc.nfvo.emsdriver.commons.utils.StringUtil;
30 import org.onap.vfc.nfvo.emsdriver.messagemgr.MessageChannel;
31 import org.onap.vfc.nfvo.emsdriver.messagemgr.MessageChannelFactory;
32
33
34 public class AlarmTaskThread extends Thread{
35         public  Log log = LogFactory.getLog(AlarmTaskThread.class);
36         
37         private HeartBeat heartBeat = null;
38         
39         private boolean isStop = false;
40         private CollectVo collectVo = null;
41         private int read_timeout = Constant.READ_TIMEOUT_MILLISECOND;
42         private int reqId;
43         
44         private Socket socket = null;
45         private BufferedInputStream is = null;
46         private BufferedOutputStream dos = null;
47         
48         private MessageChannel alarmChannel;
49         
50         
51         public AlarmTaskThread() {
52                 super();
53         }
54
55         public AlarmTaskThread(CollectVo collectVo) {
56
57                 this.collectVo = collectVo;
58         }
59
60         public void run() {
61                 alarmChannel = MessageChannelFactory.getMessageChannel(Constant.RESULT_CHANNEL_KEY);
62                 try {
63                         this.init();
64                         while(!this.isStop){
65                                 String body;
66                                 try {
67                                         body = this.receive();
68                                         try {
69                                                 alarmChannel.put(body);
70                                         } catch (InterruptedException e) {
71                                                 log.error(StringUtil.getStackTrace(e));
72                                         }
73                                 } catch (Exception e) {
74                                         e.printStackTrace();
75                                         reinit();
76                                 }
77                         }
78                 } catch (Exception e) {
79                         log.error(StringUtil.getStackTrace(e));
80                 }
81         }
82         
83         
84
85         public String receive() throws Exception {
86
87                 Msg msg =null;
88                 String retString = null;
89                 
90                 while (retString == null && !this.isStop) {
91                         
92                         msg = MessageUtil.readOneMsg(is);
93                         log.debug("msg = "+msg.toString(true));
94                         log.info("msg.getMsgType().name = "+msg.getMsgType().name);
95                         if("ackLoginAlarm".equalsIgnoreCase(msg.getMsgType().name)){
96                                 log.debug("receive login ack");
97                                 boolean suc = this.ackLoginAlarm(msg);
98                                 if(suc){
99                                         
100                                         if(reqId == Integer.MAX_VALUE){
101                                                 reqId = 0;
102                                         }
103                                         reqId ++;
104                                         Msg  msgheart = MessageUtil.putHeartBeatMsg(reqId);
105                                         heartBeat =  new HeartBeat(socket,msgheart); 
106                                         heartBeat.setName("CMCC_JT_HeartBeat");
107                                         // start heartBeat
108                                         heartBeat.start();
109                                 }
110                                 retString = null;
111                         }
112                         
113                         if("ackHeartBeat".equalsIgnoreCase(msg.getMsgType().name)){
114                                 log.debug("received heartBeat message:"+msg.getBody());
115                                 retString = null;
116                         }
117                         
118                         
119                         
120                         if("realTimeAlarm".equalsIgnoreCase(msg.getMsgType().name)){
121                                 log.debug("received alarm message");
122                                 retString =  msg.getBody();
123                         }
124                         
125                         if(retString == null){
126                                 Thread.sleep(100);
127                         }
128                 }
129                 return retString;
130         }
131         
132         public void init() throws Exception {
133                 isStop = false;
134                 //host
135                 String host = collectVo.getIP();
136                 //port
137                 String port = collectVo.getPort();
138                 //user
139                 String user = collectVo.getUser();
140                 //password
141                 String password = collectVo.getPassword();
142                 
143                 String read_timeout = collectVo.getRead_timeout();
144                 if ((read_timeout != null) && (read_timeout.trim().length() > 0)) {
145                       try {
146                         this.read_timeout = Integer.parseInt(read_timeout);
147                       } catch (NumberFormatException e) {
148                         log.error(StringUtil.getStackTrace(e));
149                       }
150                     }
151                 log.info("socket connect host=" + host + ", port=" + port);
152                 try {
153                         int portInt = Integer.parseInt(port);
154                         socket = new Socket(host, portInt);
155                         
156                 } catch (UnknownHostException e) {
157                         throw new Exception("remote host [" + host + "]connect fail" + StringUtil.getStackTrace(e));
158                 } catch (IOException e) {
159                         throw new Exception("create socket IOException " + StringUtil.getStackTrace(e));
160                 }
161                 try {
162                         socket.setSoTimeout(this.read_timeout);
163                         socket.setTcpNoDelay(true);
164                         socket.setKeepAlive(true);
165                 } catch (SocketException e) {
166                         throw new Exception(" SocketException " + StringUtil.getStackTrace(e));
167                 }
168                 try {
169                         dos = new BufferedOutputStream(socket.getOutputStream());
170                         
171                         Msg  msg = MessageUtil.putLoginMsg(user,password);
172                         
173                         try {
174                                 log.debug("send login message "+msg.toString(false));
175                                 MessageUtil.writeMsg(msg,dos);
176                                 
177                         } catch (Exception e) {
178                                 log.error("send login message is fail "+StringUtil.getStackTrace(e));
179                         }
180
181                         is = new BufferedInputStream(socket.getInputStream());
182                         
183                 } catch (SocketException e) {
184                         throw new Exception(StringUtil.getStackTrace(e));
185                 }
186         }
187         
188         private boolean ackLoginAlarm(Msg msg) throws Exception {
189                 
190                 boolean is_success = false;
191                 try {
192                         String loginres = msg.getBody();
193                         //ackLoginAlarm; result=fail(succ); resDesc=username-error
194                         String [] loginbody = loginres.split(";");
195                         if(loginbody.length > 1){
196                                 for(String str :loginbody){
197                             if(str.contains("=")){
198                                 String [] paras1 = str.split("=",-1);
199                                 if("result".equalsIgnoreCase(paras1[0].trim())){
200                                                         if("succ".equalsIgnoreCase(paras1[1].trim())){
201                                                                 is_success = true;
202                                                         }else{
203                                                                 is_success = false;
204                                                         }
205                                                 }
206                             }
207                                 }
208                         }else {
209                                 log.error("login ack body Incorrect formatbody=" + loginres);
210                         }
211                         
212                         
213                 } catch (Exception e) {
214                         log.error("pocess login ack fail"+StringUtil.getStackTrace(e));
215                 }
216                 if (is_success) {
217                         log.info("login sucess receive login ack " + msg.getBody());
218                 } else {
219                         log.error("login fail receive login ack  " + msg.getBody());
220                         this.close();
221                         this.isStop = true;
222                         throw new Exception("login fail quit");
223                 }
224                 return is_success;
225         }
226
227         public void close() {
228
229                 if(heartBeat != null){
230                         heartBeat.setStop(true);
231                 }
232                 
233                 if (is != null) {
234                         try {
235                                 is.close();
236                         } catch (IOException e) {
237                         } finally {
238                                 is = null;
239                         }
240                 }
241
242                 if (dos != null) {
243                         try {
244                                 dos.close();
245                         } catch (IOException e) {
246                         } finally {
247                                 dos = null;
248                         }
249                 }
250
251                 if (socket != null) {
252                         try {
253                                 socket.close();
254                         } catch (IOException e) {
255                         } finally {
256                                 socket = null;
257                         }
258
259                 }
260         }
261         
262         public void  reinit() {
263                 int time = 0;
264                 close();
265                 while(!this.isStop) {
266                         close();
267                         time++;
268                         try {
269                                 Thread.sleep(1000 * 30);
270                                 init();
271                                 return;
272                         } catch (Exception e) {
273                                 log.error("Number ["+time+"]reconnect ["+collectVo.getIP()+"]fail" );
274                         }
275                 }
276         }
277
278         /**
279          * @param isStop the isStop to set
280          */
281         public void setStop(boolean isStop) {
282                 this.isStop = isStop;
283         }
284
285         /**
286          * @return the heartBeat
287          */
288         public HeartBeat getHeartBeat() {
289                 return heartBeat;
290         }
291         
292         
293         
294 }