Modify MQ listener mode
[holmes/engine-management.git] / engine-d / src / main / java / org / openo / holmes / engine / manager / DroolsEngine.java
index b6d867f..d9f55ed 100644 (file)
@@ -16,6 +16,7 @@
 package org.openo.holmes.engine.manager;
 
 
+import java.io.Serializable;
 import java.io.StringReader;
 import java.util.List;
 import java.util.Locale;
@@ -25,11 +26,13 @@ import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
 import javax.jms.Destination;
 import javax.jms.JMSException;
+import javax.jms.Message;
 import javax.jms.MessageConsumer;
-import javax.jms.ObjectMessage;
+import javax.jms.MessageListener;
 import javax.jms.Session;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.command.ActiveMQObjectMessage;
 import org.drools.KnowledgeBase;
 import org.drools.KnowledgeBaseConfiguration;
 import org.drools.KnowledgeBaseFactory;
@@ -49,9 +52,6 @@ import org.openo.holmes.common.api.stat.Alarm;
 import org.openo.holmes.common.config.MQConfig;
 import org.openo.holmes.common.constant.AlarmConst;
 import org.openo.holmes.common.exception.CorrelationException;
-import org.openo.holmes.common.exception.DbException;
-import org.openo.holmes.common.exception.EngineException;
-import org.openo.holmes.common.exception.RuleIllegalityException;
 import org.openo.holmes.common.utils.ExceptionUtil;
 import org.openo.holmes.common.utils.I18nProxy;
 import org.openo.holmes.engine.request.DeployRuleRequest;
@@ -60,11 +60,6 @@ import org.openo.holmes.engine.wrapper.RuleMgtWrapper;
 @Slf4j
 @Service
 public class DroolsEngine {
-
-    private final static String CORRELATION_RULE = "CORRELATION_RULE";
-
-    private final static String CORRELATION_ALARM = "CORRELATION_ALARM";
-
     private final static int ENABLE = 1;
 
     @Inject
@@ -102,12 +97,12 @@ public class DroolsEngine {
         connectionFactory = new ActiveMQConnectionFactory(mqConfigProvider.get().brokerUsername,
             mqConfigProvider.get().brokerPassword, brokerURL);
 
-        Thread thread = new Thread(new AlarmMqMessageListener());
-        thread.start();
+        AlarmMqMessageListener listener = new AlarmMqMessageListener();
+        listener.receive();
     }
 
 
-    private void start() throws EngineException, RuleIllegalityException, DbException {
+    private void start() throws CorrelationException {
         log.info("Drools Engine Initialize Beginning...");
 
         initEngineParameter();
@@ -120,7 +115,7 @@ public class DroolsEngine {
         this.ksession.dispose();
     }
 
-    private void initEngineParameter() throws EngineException {
+    private void initEngineParameter(){
         this.kconf = KnowledgeBaseFactory.newKnowledgeBaseConfiguration();
 
         this.kconf.setOption(EventProcessingOption.STREAM);
@@ -134,19 +129,20 @@ public class DroolsEngine {
         this.ksession = kbase.newStatefulKnowledgeSession();
     }
 
-    private void initDeployRule() throws RuleIllegalityException, EngineException, DbException {
+    private void initDeployRule() throws CorrelationException {
         List<CorrelationRule> rules = ruleMgtWrapper.queryRuleByEnable(ENABLE);
 
-        if (!rules.isEmpty()) {
-            for (CorrelationRule rule : rules) {
-                if (rule.getContent() != null) {
-                    deployRuleFromCache(rule.getContent());
-                }
+        if (rules.isEmpty()) {
+            return;
+        }
+        for (CorrelationRule rule : rules) {
+            if (rule.getContent() != null) {
+                deployRuleFromDB(rule.getContent());
             }
         }
     }
 
-    private void deployRuleFromCache(String ruleContent) throws EngineException {
+    private void deployRuleFromDB(String ruleContent) throws CorrelationException {
         StringReader reader = new StringReader(ruleContent);
         Resource res = ResourceFactory.newReaderResource(reader);
 
@@ -160,7 +156,7 @@ public class DroolsEngine {
 
             kbase.addKnowledgePackages(kbuilder.getKnowledgePackages());
         } catch (Exception e) {
-            throw new EngineException(e);
+            throw new CorrelationException(e.getMessage(), e);
         }
 
         ksession.fireAllRules();
@@ -260,34 +256,56 @@ public class DroolsEngine {
         this.ksession.fireAllRules();
     }
 
-    class AlarmMqMessageListener implements Runnable {
+    class AlarmMqMessageListener implements MessageListener {
+
+        private Connection connection = null;
+        private Session session = null;
+        private Destination destination = null;
+        private MessageConsumer consumer = null;
+
+        private void initialize() throws JMSException {
+            connection = connectionFactory.createConnection();
+            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            destination = session.createTopic(AlarmConst.MQ_TOPIC_NAME_ALARM);
+            consumer = session.createConsumer(destination);
+            connection.start();
+        }
 
-        public void run() {
-            Connection connection;
-            Session session;
-            Destination destination;
-            MessageConsumer messageConsumer;
+        public void receive() {
+            try {
+                initialize();
+                consumer.setMessageListener(this);
+            } catch (JMSException e) {
+                log.error("Failed to connect to the MQ service : " + e.getMessage(), e);
+                try {
+                    close();
+                } catch (JMSException e1) {
+                    log.error("Failed close connection  " + e1.getMessage(), e1);
+                }
+            }
+        }
 
+        public void onMessage(Message arg0) {
+            ActiveMQObjectMessage objectMessage = (ActiveMQObjectMessage) arg0;
             try {
-                connection = connectionFactory.createConnection();
-                connection.start();
-                session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-                destination = session.createTopic(AlarmConst.MQ_TOPIC_NAME_ALARM);
-                messageConsumer = session.createConsumer(destination);
-
-                while (true) {
-                    ObjectMessage objMessage = (ObjectMessage) messageConsumer.receive(100000);
-                    if (objMessage != null) {
-                        putRaisedIntoStream((Alarm) objMessage.getObject());
-                    } else {
-                        break;
-                    }
+                Serializable object = objectMessage.getObject();
+
+                if (object instanceof Alarm) {
+                    Alarm alarm = (Alarm) object;
+                    putRaisedIntoStream(alarm);
                 }
             } catch (JMSException e) {
-                log.error("connection mq service Failed: " + e.getMessage(), e);
+                log.error("Failed get object : " + e.getMessage(), e);
             }
+        }
 
+        private void close() throws JMSException {
+            if (consumer != null)
+                consumer.close();
+            if (session != null)
+                session.close();
+            if (connection != null)
+                connection.close();
         }
     }
-
 }