ffc18229ed831e0d4c07cd846432fc19c1bd8368
[vfc/nfvo/driver/ems.git] / ems / boco / src / main / java / org / onap / vfc / nfvo / emsdriver / collector / alarm / AlarmTaskThread.java
1 /**
2  * Copyright 2017 BOCO Corporation.  CMCC Technologies Co., Ltd
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package org.onap.vfc.nfvo.emsdriver.collector.alarm;
17
18 import java.io.BufferedInputStream;
19 import java.io.BufferedOutputStream;
20 import java.io.IOException;
21 import java.net.Socket;
22 import java.net.SocketException;
23 import java.net.UnknownHostException;
24 import java.util.Set;
25
26 import org.apache.commons.logging.Log;
27 import org.apache.commons.logging.LogFactory;
28 import org.onap.vfc.nfvo.emsdriver.commons.constant.Constant;
29 import org.onap.vfc.nfvo.emsdriver.commons.model.CollectVo;
30 import org.onap.vfc.nfvo.emsdriver.commons.utils.StringUtil;
31 import org.onap.vfc.nfvo.emsdriver.messagemgr.MessageChannel;
32 import org.onap.vfc.nfvo.emsdriver.messagemgr.MessageChannelFactory;
33
34 import com.alibaba.fastjson.JSONObject;
35
36
37 public class AlarmTaskThread extends Thread{
38         public  Log log = LogFactory.getLog(AlarmTaskThread.class);
39         
40         private HeartBeat heartBeat = null;
41         
42         private boolean isStop = false;
43         private CollectVo collectVo = null;
44         private int read_timeout = Constant.READ_TIMEOUT_MILLISECOND;
45         private int reqId;
46         
47         private Socket socket = null;
48         private BufferedInputStream is = null;
49         private BufferedOutputStream dos = null;
50         
51         private MessageChannel alarmChannel;
52         
53         
54         public AlarmTaskThread() {
55                 super();
56         }
57
58         public AlarmTaskThread(CollectVo collectVo) {
59
60                 this.collectVo = collectVo;
61         }
62
63         public void run() {
64                 alarmChannel = MessageChannelFactory.getMessageChannel(Constant.RESULT_CHANNEL_KEY);
65                 try {
66                         this.init();
67                         while(!this.isStop){
68                                 String body;
69                                 try {
70                                         body = this.receive();
71                                         try {
72                                                 alarmChannel.put(body);
73                                         } catch (InterruptedException e) {
74                                                 log.error(StringUtil.getStackTrace(e));
75                                         }
76                                 } catch (Exception e) {
77                                         e.printStackTrace();
78                                         reinit();
79                                 }
80                         }
81                 } catch (Exception e) {
82                         log.error(StringUtil.getStackTrace(e));
83                 }
84         }
85         
86         
87
88         public String receive() throws Exception {
89
90                 Msg msg =null;
91                 String retString = null;
92                 
93                 while (retString == null && !this.isStop) {
94                         
95                         msg = MessageUtil.readOneMsg(is);
96                         
97                         if("ackLoginAlarm".equalsIgnoreCase(msg.getMsgType().name)){
98                                 log.debug("receive login ack");
99                                 boolean suc = this.ackLoginAlarm(msg);
100                                 if(suc){
101                                         
102                                         if(reqId == Integer.MAX_VALUE){
103                                                 reqId = 0;
104                                         }
105                                         reqId ++;
106                                         Msg  msgheart = MessageUtil.putHeartBeatMsg(reqId);
107                                         heartBeat =  new HeartBeat(socket,msgheart); 
108                                         heartBeat.setName("CMCC_JT_HeartBeat");
109                                         // start heartBeat
110                                         heartBeat.start();
111                                 }
112                                 retString = null;
113                         }
114                         
115                         if("ackHeartBeat".equalsIgnoreCase(msg.getMsgType().name)){
116                                 log.debug("received heartBeat message:"+msg.getBody());
117                                 retString = null;
118                         }
119                         
120                         
121                         
122                         if("realTimeAlarm".equalsIgnoreCase(msg.getMsgType().name)){
123                                 log.debug("received alarm message");
124                                 retString =  msg.getBody();
125                         }
126                         
127                         if(retString == null){
128                                 Thread.sleep(100);
129                         }
130                 }
131                 return retString;
132         }
133         
134         public void init() throws Exception {
135                 isStop = false;
136                 //host
137                 String host = collectVo.getIP();
138                 //port
139                 String port = collectVo.getPort();
140                 //user
141                 String user = collectVo.getUser();
142                 //password
143                 String password = collectVo.getPassword();
144                 
145                 String read_timeout = collectVo.getRead_timeout();
146                 if ((read_timeout != null) && (read_timeout.trim().length() > 0)) {
147                       try {
148                         this.read_timeout = Integer.parseInt(read_timeout);
149                       } catch (NumberFormatException e) {
150                         log.error(StringUtil.getStackTrace(e));
151                       }
152                     }
153                 log.debug("socket connect host=" + host + ", port=" + port);
154                 try {
155                         int portInt = Integer.parseInt(port);
156                         socket = new Socket(host, portInt);
157                         
158                 } catch (UnknownHostException e) {
159                         throw new Exception("remote host [" + host + "]connect fail" + StringUtil.getStackTrace(e));
160                 } catch (IOException e) {
161                         throw new Exception("create socket IOException " + StringUtil.getStackTrace(e));
162                 }
163                 try {
164                         socket.setSoTimeout(this.read_timeout);
165                         socket.setTcpNoDelay(true);
166                         socket.setKeepAlive(true);
167                 } catch (SocketException e) {
168                         throw new Exception(" SocketException " + StringUtil.getStackTrace(e));
169                 }
170                 try {
171                         dos = new BufferedOutputStream(socket.getOutputStream());
172                         
173                         Msg  msg = MessageUtil.putLoginMsg(user,password);
174                         
175                         try {
176                                 log.debug("send login message "+msg.toString(false));
177                                 MessageUtil.writeMsg(msg,dos);
178                                 
179                         } catch (Exception e) {
180                                 log.error("send login message is fail "+StringUtil.getStackTrace(e));
181                         }
182
183                         is = new BufferedInputStream(socket.getInputStream());
184                         
185                 } catch (SocketException e) {
186                         throw new Exception(StringUtil.getStackTrace(e));
187                 }
188         }
189         
190         private boolean ackLoginAlarm(Msg msg) throws Exception {
191                 
192                 boolean is_success = false;
193                 try {
194                         String loginres = msg.getBody();
195                         //ackLoginAlarm; result=fail(succ); resDesc=username-error
196                         String [] loginbody = loginres.split(";");
197                         if(loginbody.length > 1){
198                                 for(String str :loginbody){
199                             if(str.contains("=")){
200                                 String [] paras1 = str.split("=",-1);
201                                 if("result".equalsIgnoreCase(paras1[0].trim())){
202                                                         if("succ".equalsIgnoreCase(paras1[1].trim())){
203                                                                 is_success = true;
204                                                         }else{
205                                                                 is_success = false;
206                                                         }
207                                                 }
208                             }
209                                 }
210                         }else {
211                                 log.error("login ack body Incorrect formatbody=" + loginres);
212                         }
213                         
214                         
215                 } catch (Exception e) {
216                         log.error("pocess login ack fail"+StringUtil.getStackTrace(e));
217                 }
218                 if (is_success) {
219                         log.info("login sucess receive login ack " + msg.getBody());
220                 } else {
221                         log.error("login fail receive login ack  " + msg.getBody());
222                         this.close();
223                         this.isStop = true;
224                         throw new Exception("login fail quit");
225                 }
226                 return is_success;
227         }
228
229         public void close() {
230
231                 if(heartBeat != null){
232                         heartBeat.setStop(true);
233                 }
234                 
235                 if (is != null) {
236                         try {
237                                 is.close();
238                         } catch (IOException e) {
239                         } finally {
240                                 is = null;
241                         }
242                 }
243
244                 if (dos != null) {
245                         try {
246                                 dos.close();
247                         } catch (IOException e) {
248                         } finally {
249                                 dos = null;
250                         }
251                 }
252
253                 if (socket != null) {
254                         try {
255                                 socket.close();
256                         } catch (IOException e) {
257                         } finally {
258                                 socket = null;
259                         }
260
261                 }
262         }
263         
264         public void  reinit() {
265                 int time = 0;
266                 close();
267                 while(!this.isStop) {
268                         close();
269                         time++;
270                         try {
271                                 Thread.sleep(1000 * 10);
272                                 init();
273                                 return;
274                         } catch (Exception e) {
275                                 log.error("Number ["+time+"]reconnect ["+collectVo.getIP()+"]fail" );
276                         }
277                 }
278         }
279
280         /**
281          * @param isStop the isStop to set
282          */
283         public void setStop(boolean isStop) {
284                 this.isStop = isStop;
285         }
286
287         /**
288          * @return the heartBeat
289          */
290         public HeartBeat getHeartBeat() {
291                 return heartBeat;
292         }
293         
294         
295         
296 }