Modify MQ listener mode
[holmes/engine-management.git] / engine-d / src / main / java / org / openo / holmes / engine / manager / DroolsEngine.java
index 0ed452a..d9f55ed 100644 (file)
@@ -16,6 +16,7 @@
 package org.openo.holmes.engine.manager;
 
 
+import java.io.Serializable;
 import java.io.StringReader;
 import java.util.List;
 import java.util.Locale;
@@ -25,11 +26,13 @@ import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
 import javax.jms.Destination;
 import javax.jms.JMSException;
+import javax.jms.Message;
 import javax.jms.MessageConsumer;
-import javax.jms.ObjectMessage;
+import javax.jms.MessageListener;
 import javax.jms.Session;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.command.ActiveMQObjectMessage;
 import org.drools.KnowledgeBase;
 import org.drools.KnowledgeBaseConfiguration;
 import org.drools.KnowledgeBaseFactory;
@@ -57,11 +60,6 @@ import org.openo.holmes.engine.wrapper.RuleMgtWrapper;
 @Slf4j
 @Service
 public class DroolsEngine {
-
-    private final static String CORRELATION_RULE = "CORRELATION_RULE";
-
-    private final static String CORRELATION_ALARM = "CORRELATION_ALARM";
-
     private final static int ENABLE = 1;
 
     @Inject
@@ -99,8 +97,8 @@ public class DroolsEngine {
         connectionFactory = new ActiveMQConnectionFactory(mqConfigProvider.get().brokerUsername,
             mqConfigProvider.get().brokerPassword, brokerURL);
 
-        Thread thread = new Thread(new AlarmMqMessageListener());
-        thread.start();
+        AlarmMqMessageListener listener = new AlarmMqMessageListener();
+        listener.receive();
     }
 
 
@@ -258,34 +256,56 @@ public class DroolsEngine {
         this.ksession.fireAllRules();
     }
 
-    class AlarmMqMessageListener implements Runnable {
+    class AlarmMqMessageListener implements MessageListener {
+
+        private Connection connection = null;
+        private Session session = null;
+        private Destination destination = null;
+        private MessageConsumer consumer = null;
 
-        public void run() {
-            Connection connection;
-            Session session;
-            Destination destination;
-            MessageConsumer messageConsumer;
+        private void initialize() throws JMSException {
+            connection = connectionFactory.createConnection();
+            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            destination = session.createTopic(AlarmConst.MQ_TOPIC_NAME_ALARM);
+            consumer = session.createConsumer(destination);
+            connection.start();
+        }
+
+        public void receive() {
+            try {
+                initialize();
+                consumer.setMessageListener(this);
+            } catch (JMSException e) {
+                log.error("Failed to connect to the MQ service : " + e.getMessage(), e);
+                try {
+                    close();
+                } catch (JMSException e1) {
+                    log.error("Failed close connection  " + e1.getMessage(), e1);
+                }
+            }
+        }
 
+        public void onMessage(Message arg0) {
+            ActiveMQObjectMessage objectMessage = (ActiveMQObjectMessage) arg0;
             try {
-                connection = connectionFactory.createConnection();
-                connection.start();
-                session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-                destination = session.createTopic(AlarmConst.MQ_TOPIC_NAME_ALARM);
-                messageConsumer = session.createConsumer(destination);
-
-                while (true) {
-                    ObjectMessage objMessage = (ObjectMessage) messageConsumer.receive(100000);
-                    if (objMessage != null) {
-                        putRaisedIntoStream((Alarm) objMessage.getObject());
-                    } else {
-                        break;
-                    }
+                Serializable object = objectMessage.getObject();
+
+                if (object instanceof Alarm) {
+                    Alarm alarm = (Alarm) object;
+                    putRaisedIntoStream(alarm);
                 }
             } catch (JMSException e) {
-                log.error("connection mq service Failed: " + e.getMessage(), e);
+                log.error("Failed get object : " + e.getMessage(), e);
             }
+        }
 
+        private void close() throws JMSException {
+            if (consumer != null)
+                consumer.close();
+            if (session != null)
+                session.close();
+            if (connection != null)
+                connection.close();
         }
     }
-
 }