Modify MQ listener mode
[holmes/engine-management.git] / engine-d / src / main / java / org / openo / holmes / engine / manager / DroolsEngine.java
index 473b929..d9f55ed 100644 (file)
@@ -16,6 +16,7 @@
 package org.openo.holmes.engine.manager;
 
 
+import java.io.Serializable;
 import java.io.StringReader;
 import java.util.List;
 import java.util.Locale;
@@ -25,11 +26,13 @@ import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
 import javax.jms.Destination;
 import javax.jms.JMSException;
+import javax.jms.Message;
 import javax.jms.MessageConsumer;
-import javax.jms.ObjectMessage;
+import javax.jms.MessageListener;
 import javax.jms.Session;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.command.ActiveMQObjectMessage;
 import org.drools.KnowledgeBase;
 import org.drools.KnowledgeBaseConfiguration;
 import org.drools.KnowledgeBaseFactory;
@@ -49,9 +52,6 @@ import org.openo.holmes.common.api.stat.Alarm;
 import org.openo.holmes.common.config.MQConfig;
 import org.openo.holmes.common.constant.AlarmConst;
 import org.openo.holmes.common.exception.CorrelationException;
-import org.openo.holmes.common.exception.DbException;
-import org.openo.holmes.common.exception.EngineException;
-import org.openo.holmes.common.exception.RuleIllegalityException;
 import org.openo.holmes.common.utils.ExceptionUtil;
 import org.openo.holmes.common.utils.I18nProxy;
 import org.openo.holmes.engine.request.DeployRuleRequest;
@@ -60,11 +60,6 @@ import org.openo.holmes.engine.wrapper.RuleMgtWrapper;
 @Slf4j
 @Service
 public class DroolsEngine {
-
-    private final static String CORRELATION_RULE = "CORRELATION_RULE";
-
-    private final static String CORRELATION_ALARM = "CORRELATION_ALARM";
-
     private final static int ENABLE = 1;
 
     @Inject
@@ -91,7 +86,7 @@ public class DroolsEngine {
             // 2. start mq listener
             registerAlarmTopicListener();
         } catch (Exception e) {
-            log.error("Start service failed: " + e.getMessage());
+            log.error("Start service failed: " + e.getMessage(), e);
             throw ExceptionUtil.buildExceptionResponse("Start service failed!");
         }
     }
@@ -102,25 +97,25 @@ public class DroolsEngine {
         connectionFactory = new ActiveMQConnectionFactory(mqConfigProvider.get().brokerUsername,
             mqConfigProvider.get().brokerPassword, brokerURL);
 
-        Thread thread = new Thread(new AlarmMqMessageListener());
-        thread.start();
+        AlarmMqMessageListener listener = new AlarmMqMessageListener();
+        listener.receive();
     }
 
 
-    private void start() throws EngineException, RuleIllegalityException, DbException {
-        log.info("Drools Egine Initialize Begining ... ");
+    private void start() throws CorrelationException {
+        log.info("Drools Engine Initialize Beginning...");
 
         initEngineParameter();
-//        initDeployRule();
+        initDeployRule();
 
-        log.info("Business Rule Egine Initialize Successfully ");
+        log.info("Business Rule Engine Initialize Successfully.");
     }
 
-    public void stop() throws Exception {
+    public void stop() {
         this.ksession.dispose();
     }
 
-    private void initEngineParameter() throws EngineException {
+    private void initEngineParameter(){
         this.kconf = KnowledgeBaseFactory.newKnowledgeBaseConfiguration();
 
         this.kconf.setOption(EventProcessingOption.STREAM);
@@ -134,23 +129,26 @@ public class DroolsEngine {
         this.ksession = kbase.newStatefulKnowledgeSession();
     }
 
-    private void initDeployRule() throws RuleIllegalityException, EngineException, DbException {
+    private void initDeployRule() throws CorrelationException {
         List<CorrelationRule> rules = ruleMgtWrapper.queryRuleByEnable(ENABLE);
 
-        if (rules.size() > 0) {
-            for (CorrelationRule rule : rules) {
-                if (rule.getContent() != null) {
-                    deployRuleFromCache(rule.getContent());
-                }
+        if (rules.isEmpty()) {
+            return;
+        }
+        for (CorrelationRule rule : rules) {
+            if (rule.getContent() != null) {
+                deployRuleFromDB(rule.getContent());
             }
         }
     }
 
-    private void deployRuleFromCache(String ruleContent) throws EngineException {
+    private void deployRuleFromDB(String ruleContent) throws CorrelationException {
         StringReader reader = new StringReader(ruleContent);
         Resource res = ResourceFactory.newReaderResource(reader);
 
-        kbuilder = KnowledgeBuilderFactory.newKnowledgeBuilder();
+        if (kbuilder == null) {
+            kbuilder = KnowledgeBuilderFactory.newKnowledgeBuilder();
+        }
 
         kbuilder.add(res, ResourceType.DRL);
 
@@ -158,11 +156,9 @@ public class DroolsEngine {
 
             kbase.addKnowledgePackages(kbuilder.getKnowledgePackages());
         } catch (Exception e) {
-            throw new EngineException(e);
+            throw new CorrelationException(e.getMessage(), e);
         }
 
-        kbuilder = null;
-
         ksession.fireAllRules();
     }
 
@@ -171,7 +167,9 @@ public class DroolsEngine {
         StringReader reader = new StringReader(rule.getContent());
         Resource res = ResourceFactory.newReaderResource(reader);
 
-        kbuilder = KnowledgeBuilderFactory.newKnowledgeBuilder();
+        if (kbuilder == null) {
+            kbuilder = KnowledgeBuilderFactory.newKnowledgeBuilder();
+        }
 
         kbuilder.add(res, ResourceType.DRL);
 
@@ -183,9 +181,9 @@ public class DroolsEngine {
             throw new CorrelationException(errorMsg);
         }
 
-        String packageName = kbuilder.getKnowledgePackages().iterator().next().getName();
+        KnowledgePackage kpackage = kbuilder.getKnowledgePackages().iterator().next();
 
-        if (kbase.getKnowledgePackages().contains(packageName)) {
+        if (kbase.getKnowledgePackages().contains(kpackage)) {
 
             String errorMsg = I18nProxy.getInstance().getValueByArgs(locale,
                 I18nProxy.ENGINE_CONTENT_ILLEGALITY,new String[]{
@@ -203,13 +201,10 @@ public class DroolsEngine {
             throw new CorrelationException(errorMsg, e);
         }
 
-        kbuilder = null;
-
         ksession.fireAllRules();
-        return packageName;
+        return kpackage.getName();
     }
 
-
     public synchronized void undeployRule(String packageName, Locale locale)
         throws CorrelationException {
 
@@ -237,7 +232,9 @@ public class DroolsEngine {
         StringReader reader = new StringReader(content);
         Resource res = ResourceFactory.newReaderResource(reader);
 
-        kbuilder = KnowledgeBuilderFactory.newKnowledgeBuilder();
+        if (kbuilder == null) {
+            kbuilder = KnowledgeBuilderFactory.newKnowledgeBuilder();
+        }
 
         kbuilder.add(res, ResourceType.DRL);
 
@@ -248,7 +245,6 @@ public class DroolsEngine {
             log.error(errorMsg);
             throw new CorrelationException(errorMsg);
         }
-        kbuilder = null;
     }
 
     public void putRaisedIntoStream(Alarm raiseAlarm) {
@@ -260,34 +256,56 @@ public class DroolsEngine {
         this.ksession.fireAllRules();
     }
 
-    class AlarmMqMessageListener implements Runnable {
+    class AlarmMqMessageListener implements MessageListener {
 
-        public void run() {
-            Connection connection;
-            Session session;
-            Destination destination;
-            MessageConsumer messageConsumer;
+        private Connection connection = null;
+        private Session session = null;
+        private Destination destination = null;
+        private MessageConsumer consumer = null;
 
+        private void initialize() throws JMSException {
+            connection = connectionFactory.createConnection();
+            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            destination = session.createTopic(AlarmConst.MQ_TOPIC_NAME_ALARM);
+            consumer = session.createConsumer(destination);
+            connection.start();
+        }
+
+        public void receive() {
             try {
-                connection = connectionFactory.createConnection();
-                connection.start();
-                session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-                destination = session.createTopic(AlarmConst.MQ_TOPIC_NAME_ALARM);
-                messageConsumer = session.createConsumer(destination);
-
-                while (true) {
-                    ObjectMessage objMessage = (ObjectMessage) messageConsumer.receive(100000);
-                    if (objMessage != null) {
-                        putRaisedIntoStream((Alarm) objMessage.getObject());
-                    } else {
-                        break;
-                    }
+                initialize();
+                consumer.setMessageListener(this);
+            } catch (JMSException e) {
+                log.error("Failed to connect to the MQ service : " + e.getMessage(), e);
+                try {
+                    close();
+                } catch (JMSException e1) {
+                    log.error("Failed close connection  " + e1.getMessage(), e1);
+                }
+            }
+        }
+
+        public void onMessage(Message arg0) {
+            ActiveMQObjectMessage objectMessage = (ActiveMQObjectMessage) arg0;
+            try {
+                Serializable object = objectMessage.getObject();
+
+                if (object instanceof Alarm) {
+                    Alarm alarm = (Alarm) object;
+                    putRaisedIntoStream(alarm);
                 }
             } catch (JMSException e) {
-                log.error("connection mq service Failed: " + e.getMessage());
+                log.error("Failed get object : " + e.getMessage(), e);
             }
+        }
 
+        private void close() throws JMSException {
+            if (consumer != null)
+                consumer.close();
+            if (session != null)
+                session.close();
+            if (connection != null)
+                connection.close();
         }
     }
-
 }