add test case
[vfc/nfvo/driver/ems.git] / ems / boco / src / main / java / org / onap / vfc / nfvo / emsdriver / collector / alarm / AlarmTaskThread.java
1 /**
2  * Copyright 2017 BOCO Corporation.  CMCC Technologies Co., Ltd
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package org.onap.vfc.nfvo.emsdriver.collector.alarm;
17
18 import java.io.BufferedInputStream;
19 import java.io.BufferedOutputStream;
20 import java.io.IOException;
21 import java.net.Socket;
22 import java.net.SocketException;
23 import java.net.UnknownHostException;
24 import java.util.Set;
25
26 import org.apache.commons.logging.Log;
27 import org.apache.commons.logging.LogFactory;
28 import org.onap.vfc.nfvo.emsdriver.commons.constant.Constant;
29 import org.onap.vfc.nfvo.emsdriver.commons.model.CollectVo;
30 import org.onap.vfc.nfvo.emsdriver.commons.utils.StringUtil;
31 import org.onap.vfc.nfvo.emsdriver.messagemgr.MessageChannel;
32 import org.onap.vfc.nfvo.emsdriver.messagemgr.MessageChannelFactory;
33
34 import com.alibaba.fastjson.JSONObject;
35
36
37 public class AlarmTaskThread extends Thread{
38         public  Log log = LogFactory.getLog(AlarmTaskThread.class);
39         
40         private HeartBeat heartBeat = null;
41         
42         private boolean isStop = false;
43         private CollectVo collectVo = null;
44         private int read_timeout = Constant.READ_TIMEOUT_MILLISECOND;
45         private int reqId;
46         
47         private Socket socket = null;
48         private BufferedInputStream is = null;
49         private BufferedOutputStream dos = null;
50         
51         private MessageChannel alarmChannel;
52         
53         
54         public AlarmTaskThread() {
55                 super();
56         }
57
58         public AlarmTaskThread(CollectVo collectVo) {
59
60                 this.collectVo = collectVo;
61         }
62
63         public void run() {
64                 alarmChannel = MessageChannelFactory.getMessageChannel(Constant.ALARM_CHANNEL_KEY);
65                 try {
66                         this.init();
67                         while(!this.isStop){
68                                 String body;
69                                 try {
70                                         body = this.receive();
71                                         String alarm120 = this.build120Alarm(body);
72                                         
73                                         this.send120Alarm(alarm120);
74                                 } catch (Exception e) {
75                                         reinit();
76                                 }
77                         }
78                 } catch (Exception e) {
79                         log.error(StringUtil.getStackTrace(e));
80                 }
81         }
82         
83         private void send120Alarm(String alarm120) {
84                 
85                 try {
86                         alarmChannel.put(alarm120);
87                 } catch (InterruptedException e) {
88                         log.error(StringUtil.getStackTrace(e));
89                 }
90         }
91
92         public String  build120Alarm(String body) {
93                 StringBuilder content = new StringBuilder(
94                 "<?xml version='1.0' encoding='iso-8859-1'?>\n")
95                 .append("<WholeMsg MsgMark='120' Priority='2' FieldNum='5'><FM_ALARM_MSG>\n");
96         
97         
98                 JSONObject reagobj = JSONObject.parseObject(body);
99                 
100                 Set<String> keys = reagobj.keySet();
101                 
102                 for (String key : keys) {
103                         
104                         String value = reagobj.get(key).toString();
105                         content.append("<").append(key).append(">");
106                     content.append(value);
107                     content.append("</").append(key).append(">\n");
108                 }
109                 content.append("</FM_ALARM_MSG></WholeMsg>");
110
111                 return content.toString();
112                 
113         }
114
115         public String receive() throws Exception {
116
117                 Msg msg =null;
118                 String retString = null;
119                 
120                 while (retString == null && !this.isStop) {
121                         
122                         msg = MessageUtil.readOneMsg(is);
123                         
124                         if("ackLoginAlarm".equalsIgnoreCase(msg.getMsgType().name)){
125                                 log.debug("receive login ack");
126                                 boolean suc = this.ackLoginAlarm(msg);
127                                 if(suc){
128                                         
129                                         if(reqId == Integer.MAX_VALUE){
130                                                 reqId = 0;
131                                         }
132                                         reqId ++;
133                                         Msg  msgheart = MessageUtil.putHeartBeatMsg(reqId);
134                                         heartBeat =  new HeartBeat(socket,msgheart); 
135                                         heartBeat.setName("CMCC_JT_HeartBeat");
136                                         // start heartBeat
137                                         heartBeat.start();
138                                 }
139                                 retString = null;
140                         }
141                         
142                         if("ackHeartBeat".equalsIgnoreCase(msg.getMsgType().name)){
143                                 log.debug("received heartBeat message:"+msg.getBody());
144                                 retString = null;
145                         }
146                         
147                         
148                         
149                         if("realTimeAlarm".equalsIgnoreCase(msg.getMsgType().name)){
150                                 log.debug("received alarm message");
151                                 retString =  msg.getBody();
152                         }
153                 }
154                 return retString;
155         }
156         
157         public void init() throws Exception {
158                 isStop = false;
159                 //host
160                 String host = collectVo.getIP();
161                 //port
162                 String port = collectVo.getPort();
163                 //user
164                 String user = collectVo.getUser();
165                 //password
166                 String password = collectVo.getPassword();
167                 
168                 String read_timeout = collectVo.getRead_timeout();
169                 if ((read_timeout != null) && (read_timeout.trim().length() > 0)) {
170                       try {
171                         this.read_timeout = Integer.parseInt(read_timeout);
172                       } catch (NumberFormatException e) {
173                         log.error(StringUtil.getStackTrace(e));
174                       }
175                     }
176                 log.debug("socket connect host=" + host + ", port=" + port);
177                 try {
178                         int portInt = Integer.parseInt(port);
179                         socket = new Socket(host, portInt);
180                         
181                 } catch (UnknownHostException e) {
182                         throw new Exception("remote host [" + host + "]connect fail" + StringUtil.getStackTrace(e));
183                 } catch (IOException e) {
184                         throw new Exception("create socket IOException " + StringUtil.getStackTrace(e));
185                 }
186                 try {
187                         socket.setSoTimeout(this.read_timeout);
188                         socket.setTcpNoDelay(true);
189                         socket.setKeepAlive(true);
190                 } catch (SocketException e) {
191                         throw new Exception(" SocketException " + StringUtil.getStackTrace(e));
192                 }
193                 try {
194                         dos = new BufferedOutputStream(socket.getOutputStream());
195                         
196                         Msg  msg = MessageUtil.putLoginMsg(user,password);
197                         
198                         try {
199                                 log.debug("send login message "+msg.toString(false));
200                                 MessageUtil.writeMsg(msg,dos);
201                                 
202                         } catch (Exception e) {
203                                 log.error("send login message is fail "+StringUtil.getStackTrace(e));
204                         }
205
206                         is = new BufferedInputStream(socket.getInputStream());
207                         
208                 } catch (SocketException e) {
209                         throw new Exception(StringUtil.getStackTrace(e));
210                 }
211         }
212         
213         private boolean ackLoginAlarm(Msg msg) throws Exception {
214                 
215                 boolean is_success = false;
216                 try {
217                         String loginres = msg.getBody();
218                         //ackLoginAlarm; result=fail(succ); resDesc=username-error
219                         String [] loginbody = loginres.split(";");
220                         if(loginbody.length > 1){
221                                 for(String str :loginbody){
222                             if(str.contains("=")){
223                                 String [] paras1 = str.split("=",-1);
224                                 if("result".equalsIgnoreCase(paras1[0].trim())){
225                                                         if("succ".equalsIgnoreCase(paras1[1].trim())){
226                                                                 is_success = true;
227                                                         }else{
228                                                                 is_success = false;
229                                                         }
230                                                 }
231                             }
232                                 }
233                         }else {
234                                 log.error("login ack body Incorrect formatbody=" + loginres);
235                         }
236                         
237                         
238                 } catch (Exception e) {
239                         log.error("pocess login ack fail"+StringUtil.getStackTrace(e));
240                 }
241                 if (is_success) {
242                         log.info("login sucess receive login ack " + msg.getBody());
243                 } else {
244                         log.error("login fail receive login ack  " + msg.getBody());
245                         this.close();
246                         this.isStop = true;
247                         throw new Exception("login fail quit");
248                 }
249                 return is_success;
250         }
251
252         public void close() {
253
254                 if(heartBeat != null){
255                         heartBeat.setStop(true);
256                 }
257                 
258                 if (is != null) {
259                         try {
260                                 is.close();
261                         } catch (IOException e) {
262                         } finally {
263                                 is = null;
264                         }
265                 }
266
267                 if (dos != null) {
268                         try {
269                                 dos.close();
270                         } catch (IOException e) {
271                         } finally {
272                                 dos = null;
273                         }
274                 }
275
276                 if (socket != null) {
277                         try {
278                                 socket.close();
279                         } catch (IOException e) {
280                         } finally {
281                                 socket = null;
282                         }
283
284                 }
285         }
286         
287         public void  reinit() {
288                 int time = 0;
289                 close();
290                 while(!this.isStop) {
291                         close();
292                         time++;
293                         try {
294                                 Thread.sleep(1000 * 10);
295                                 init();
296                                 return;
297                         } catch (Exception e) {
298                                 log.error("Number ["+time+"]reconnect ["+collectVo.getIP()+"]fail" );
299                         }
300                 }
301         }
302 }