c665463eabfb74e4509f12d123091f090b2e874b
[vfc/nfvo/driver/ems.git] / ems / boco / src / main / java / org / onap / vfc / nfvo / emsdriver / collector / alarm / AlarmTaskThread.java
1 /**
2  * Copyright 2017 BOCO Corporation.  CMCC Technologies Co., Ltd
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package org.onap.vfc.nfvo.emsdriver.collector.alarm;
17
18 import java.io.BufferedInputStream;
19 import java.io.BufferedOutputStream;
20 import java.io.IOException;
21 import java.net.Socket;
22 import java.net.SocketException;
23 import java.net.UnknownHostException;
24 import java.util.Set;
25
26 import org.apache.commons.logging.Log;
27 import org.apache.commons.logging.LogFactory;
28 import org.onap.vfc.nfvo.emsdriver.commons.constant.Constant;
29 import org.onap.vfc.nfvo.emsdriver.commons.model.CollectVo;
30 import org.onap.vfc.nfvo.emsdriver.commons.utils.StringUtil;
31 import org.onap.vfc.nfvo.emsdriver.messagemgr.MessageChannel;
32 import org.onap.vfc.nfvo.emsdriver.messagemgr.MessageChannelFactory;
33
34 import com.alibaba.fastjson.JSONObject;
35
36
37 public class AlarmTaskThread extends Thread{
38         public  Log log = LogFactory.getLog(AlarmTaskThread.class);
39         
40         private HeartBeat heartBeat = null;
41         
42         private boolean isStop = false;
43         private CollectVo collectVo = null;
44         private int read_timeout = Constant.READ_TIMEOUT_MILLISECOND;
45         private int reqId;
46         
47         private Socket socket = null;
48         private BufferedInputStream is = null;
49         private BufferedOutputStream dos = null;
50         
51         private MessageChannel alarmChannel;
52         
53         public AlarmTaskThread(CollectVo collectVo) {
54
55                 this.collectVo = collectVo;
56         }
57
58         public void run() {
59                 alarmChannel = MessageChannelFactory.getMessageChannel(Constant.ALARM_CHANNEL_KEY);
60                 try {
61                         this.init();
62                         while(!this.isStop){
63                                 String body;
64                                 try {
65                                         body = this.receive();
66                                         String alarm120 = this.build120Alarm(body);
67                                         
68                                         this.send120Alarm(alarm120);
69                                 } catch (Exception e) {
70                                         reinit();
71                                 }
72                         }
73                 } catch (Exception e) {
74                         log.error(StringUtil.getStackTrace(e));
75                 }
76         }
77         
78         private void send120Alarm(String alarm120) {
79                 
80                 try {
81                         alarmChannel.put(alarm120);
82                 } catch (InterruptedException e) {
83                         log.error(StringUtil.getStackTrace(e));
84                 }
85         }
86
87         private String  build120Alarm(String body) {
88                 StringBuilder content = new StringBuilder(
89                 "<?xml version='1.0' encoding='iso-8859-1'?>\n")
90                 .append("<WholeMsg MsgMark='120' Priority='2' FieldNum='5'><FM_ALARM_MSG>\n");
91         
92         
93                 JSONObject reagobj = JSONObject.parseObject(body);
94                 
95                 Set<String> keys = reagobj.keySet();
96                 
97                 for (String key : keys) {
98                         
99                         String value = (String)reagobj.get(key);
100                         content.append("<").append(key).append(">");
101                     content.append(value);
102                     content.append("</").append(key).append(">\n");
103                 }
104                 content.append("</FM_ALARM_MSG></WholeMsg>");
105
106                 return content.toString();
107                 
108         }
109
110         public String receive() throws Exception {
111
112                 Msg msg =null;
113                 String retString = null;
114                 
115                 while (retString == null && !this.isStop) {
116                         
117                         msg = MessageUtil.readOneMsg(is);
118                         
119                         if("ackLoginAlarm".equalsIgnoreCase(msg.getMsgType().name)){
120                                 log.debug("receive login ack");
121                                 boolean suc = this.ackLoginAlarm(msg);
122                                 if(suc){
123                                         
124                                         if(reqId == Integer.MAX_VALUE){
125                                                 reqId = 0;
126                                         }
127                                         reqId ++;
128                                         Msg  msgheart = MessageUtil.putHeartBeatMsg(reqId);
129                                         heartBeat =  new HeartBeat(socket,msgheart); 
130                                         heartBeat.setName("CMCC_JT_HeartBeat");
131                                         // start heartBeat
132                                         heartBeat.start();
133                                 }
134                                 retString = null;
135                         }
136                         
137                         if("ackHeartBeat".equalsIgnoreCase(msg.getMsgType().name)){
138                                 log.debug("received heartBeat��"+msg.getBody());
139                                 retString = null;
140                         }
141                         
142                         
143                         
144                         if("realTimeAlarm".equalsIgnoreCase(msg.getMsgType().name)){
145                                 log.debug("received alarm message");
146                                 retString =  msg.getBody();
147                         }
148                 }
149                 return retString;
150         }
151         
152         public void init() throws Exception {
153                 isStop = false;
154                 //host
155                 String host = collectVo.getIP();
156                 //port
157                 String port = collectVo.getPort();
158                 //user
159                 String user = collectVo.getUser();
160                 //password
161                 String password = collectVo.getPassword();
162                 
163                 String read_timeout = collectVo.getRead_timeout();
164                 if ((read_timeout != null) && (read_timeout.trim().length() > 0)) {
165                       try {
166                         this.read_timeout = Integer.parseInt(read_timeout);
167                       } catch (NumberFormatException e) {
168                         log.error(StringUtil.getStackTrace(e));
169                       }
170                     }
171                 log.debug("socket connect host=" + host + ", port=" + port);
172                 try {
173                         int portInt = Integer.parseInt(port);
174                         socket = new Socket(host, portInt);
175                         
176                 } catch (UnknownHostException e) {
177                         throw new Exception("remote host [" + host + "]connect fail" + StringUtil.getStackTrace(e));
178                 } catch (IOException e) {
179                         throw new Exception("create socket IOException " + StringUtil.getStackTrace(e));
180                 }
181                 try {
182                         socket.setSoTimeout(this.read_timeout);
183                         socket.setTcpNoDelay(true);
184                         socket.setKeepAlive(true);
185                 } catch (SocketException e) {
186                         throw new Exception(" SocketException " + StringUtil.getStackTrace(e));
187                 }
188                 try {
189                         dos = new BufferedOutputStream(socket.getOutputStream());
190                         
191                         Msg  msg = MessageUtil.putLoginMsg(user,password);
192                         
193                         try {
194                                 log.debug("send login message "+msg.toString(false));
195                                 MessageUtil.writeMsg(msg,dos);
196                                 
197                         } catch (Exception e) {
198                                 log.error("send login message is fail "+StringUtil.getStackTrace(e));
199                         }
200
201                         is = new BufferedInputStream(socket.getInputStream());
202                         
203                 } catch (SocketException e) {
204                         throw new Exception(StringUtil.getStackTrace(e));
205                 }
206         }
207         
208         private boolean ackLoginAlarm(Msg msg) throws Exception {
209                 
210                 boolean is_success = false;
211                 try {
212                         String loginres = msg.getBody();
213                         //ackLoginAlarm; result=fail(succ); resDesc=username-error
214                         String [] loginbody = loginres.split(";");
215                         if(loginbody.length > 1){
216                                 for(String str :loginbody){
217                             if(str.contains("=")){
218                                 String [] paras1 = str.split("=",-1);
219                                 if("result".equalsIgnoreCase(paras1[0].trim())){
220                                                         if("succ".equalsIgnoreCase(paras1[1].trim())){
221                                                                 is_success = true;
222                                                         }else{
223                                                                 is_success = false;
224                                                         }
225                                                 }
226                             }
227                                 }
228                         }else {
229                                 log.error("login ack body Incorrect formatbody=" + loginres);
230                         }
231                         
232                         
233                 } catch (Exception e) {
234                         log.error("pocess login ack fail"+StringUtil.getStackTrace(e));
235                 }
236                 if (is_success) {
237                         log.info("login sucess receive login ack " + msg.getBody());
238                 } else {
239                         log.error("login fail receive login ack  " + msg.getBody());
240                         this.close();
241                         this.isStop = true;
242                         throw new Exception("login fail quit");
243                 }
244                 return is_success;
245         }
246
247         public void close() {
248
249                 if(heartBeat != null){
250                         heartBeat.setStop(true);
251                 }
252                 
253                 if (is != null) {
254                         try {
255                                 is.close();
256                         } catch (IOException e) {
257                         } finally {
258                                 is = null;
259                         }
260                 }
261
262                 if (dos != null) {
263                         try {
264                                 dos.close();
265                         } catch (IOException e) {
266                         } finally {
267                                 dos = null;
268                         }
269                 }
270
271                 if (socket != null) {
272                         try {
273                                 socket.close();
274                         } catch (IOException e) {
275                         } finally {
276                                 socket = null;
277                         }
278
279                 }
280         }
281         
282         public void  reinit() {
283                 int time = 0;
284                 close();
285                 while(!this.isStop) {
286                         close();
287                         time++;
288                         try {
289                                 Thread.sleep(1000 * 10);
290                                 init();
291                                 return;
292                         } catch (Exception e) {
293                                 log.error("Number ["+time+"]reconnect ["+collectVo.getIP()+"]fail" );
294                         }
295                 }
296         }
297 }