- class AlarmMqMessageListener implements MessageListener {\r
-\r
- private Connection connection = null;\r
- private Session session = null;\r
- private Destination destination = null;\r
- private MessageConsumer consumer = null;\r
-\r
- private void initialize() throws JMSException {\r
- connection = connectionFactory.createConnection();\r
- session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);\r
- destination = session.createTopic(AlarmConst.MQ_TOPIC_NAME_ALARM);\r
- consumer = session.createConsumer(destination);\r
- connection.start();\r
- }\r
-\r
- public void receive() {\r
- try {\r
- initialize();\r
- consumer.setMessageListener(this);\r
- } catch (JMSException e) {\r
- log.error("Failed to connect to the MQ service : " + e.getMessage(), e);\r
- try {\r
- close();\r
- } catch (JMSException e1) {\r
- log.error("Failed close connection " + e1.getMessage(), e1);\r
- }\r
- }\r
- }\r
-\r
- public void onMessage(Message arg0) {\r
- ActiveMQObjectMessage objectMessage = (ActiveMQObjectMessage) arg0;\r
- try {\r
- Serializable object = objectMessage.getObject();\r
-\r
- if (object instanceof Alarm) {\r
- Alarm alarm = (Alarm) object;\r
- putRaisedIntoStream(alarm);\r
- }\r
- } catch (JMSException e) {\r
- log.error("Failed get object : " + e.getMessage(), e);\r
- }\r
- }\r
-\r
- private void close() throws JMSException {\r
- if (consumer != null) {\r
- consumer.close();\r
- }\r
- if (session != null) {\r
- session.close();\r
- }\r
- if (connection != null) {\r
- connection.close();\r
- }\r
- }\r
- }\r