Modify MQ listener mode
authorFengLiang <feng.liang1@zte.com.cn>
Mon, 6 Mar 2017 07:04:36 +0000 (15:04 +0800)
committerFengLiang <feng.liang1@zte.com.cn>
Mon, 6 Mar 2017 07:04:36 +0000 (15:04 +0800)
Change-Id: Ided322b55ed1baff351edb03239fc06e32da3844
Issue-ID: HOLMES-47
Signed-off-by: FengLiang <feng.liang1@zte.com.cn>
engine-d/src/main/java/org/openo/holmes/engine/manager/DroolsEngine.java
engine-d/src/test/java/org/openo/holmes/engine/manager/DroolsEngineTest.java

index 0ed452a..d9f55ed 100644 (file)
@@ -16,6 +16,7 @@
 package org.openo.holmes.engine.manager;
 
 
+import java.io.Serializable;
 import java.io.StringReader;
 import java.util.List;
 import java.util.Locale;
@@ -25,11 +26,13 @@ import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
 import javax.jms.Destination;
 import javax.jms.JMSException;
+import javax.jms.Message;
 import javax.jms.MessageConsumer;
-import javax.jms.ObjectMessage;
+import javax.jms.MessageListener;
 import javax.jms.Session;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.command.ActiveMQObjectMessage;
 import org.drools.KnowledgeBase;
 import org.drools.KnowledgeBaseConfiguration;
 import org.drools.KnowledgeBaseFactory;
@@ -57,11 +60,6 @@ import org.openo.holmes.engine.wrapper.RuleMgtWrapper;
 @Slf4j
 @Service
 public class DroolsEngine {
-
-    private final static String CORRELATION_RULE = "CORRELATION_RULE";
-
-    private final static String CORRELATION_ALARM = "CORRELATION_ALARM";
-
     private final static int ENABLE = 1;
 
     @Inject
@@ -99,8 +97,8 @@ public class DroolsEngine {
         connectionFactory = new ActiveMQConnectionFactory(mqConfigProvider.get().brokerUsername,
             mqConfigProvider.get().brokerPassword, brokerURL);
 
-        Thread thread = new Thread(new AlarmMqMessageListener());
-        thread.start();
+        AlarmMqMessageListener listener = new AlarmMqMessageListener();
+        listener.receive();
     }
 
 
@@ -258,34 +256,56 @@ public class DroolsEngine {
         this.ksession.fireAllRules();
     }
 
-    class AlarmMqMessageListener implements Runnable {
+    class AlarmMqMessageListener implements MessageListener {
+
+        private Connection connection = null;
+        private Session session = null;
+        private Destination destination = null;
+        private MessageConsumer consumer = null;
 
-        public void run() {
-            Connection connection;
-            Session session;
-            Destination destination;
-            MessageConsumer messageConsumer;
+        private void initialize() throws JMSException {
+            connection = connectionFactory.createConnection();
+            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            destination = session.createTopic(AlarmConst.MQ_TOPIC_NAME_ALARM);
+            consumer = session.createConsumer(destination);
+            connection.start();
+        }
+
+        public void receive() {
+            try {
+                initialize();
+                consumer.setMessageListener(this);
+            } catch (JMSException e) {
+                log.error("Failed to connect to the MQ service : " + e.getMessage(), e);
+                try {
+                    close();
+                } catch (JMSException e1) {
+                    log.error("Failed close connection  " + e1.getMessage(), e1);
+                }
+            }
+        }
 
+        public void onMessage(Message arg0) {
+            ActiveMQObjectMessage objectMessage = (ActiveMQObjectMessage) arg0;
             try {
-                connection = connectionFactory.createConnection();
-                connection.start();
-                session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-                destination = session.createTopic(AlarmConst.MQ_TOPIC_NAME_ALARM);
-                messageConsumer = session.createConsumer(destination);
-
-                while (true) {
-                    ObjectMessage objMessage = (ObjectMessage) messageConsumer.receive(100000);
-                    if (objMessage != null) {
-                        putRaisedIntoStream((Alarm) objMessage.getObject());
-                    } else {
-                        break;
-                    }
+                Serializable object = objectMessage.getObject();
+
+                if (object instanceof Alarm) {
+                    Alarm alarm = (Alarm) object;
+                    putRaisedIntoStream(alarm);
                 }
             } catch (JMSException e) {
-                log.error("connection mq service Failed: " + e.getMessage(), e);
+                log.error("Failed get object : " + e.getMessage(), e);
             }
+        }
 
+        private void close() throws JMSException {
+            if (consumer != null)
+                consumer.close();
+            if (session != null)
+                session.close();
+            if (connection != null)
+                connection.close();
         }
     }
-
 }
index 1740d62..0f637c7 100644 (file)
@@ -36,6 +36,7 @@ import javax.jms.MessageConsumer;
 import javax.jms.ObjectMessage;\r
 import javax.jms.Session;\r
 import javax.jms.Topic;\r
+import org.apache.activemq.command.ActiveMQObjectMessage;\r
 import org.drools.KnowledgeBase;\r
 import org.drools.KnowledgeBaseConfiguration;\r
 import org.drools.builder.KnowledgeBuilder;\r
@@ -362,59 +363,124 @@ public class DroolsEngineTest {
         PowerMock.verifyAll();\r
     }\r
 \r
+\r
     @Test\r
-    public void listener_run_objmessage_is_null() throws JMSException {\r
+    public void listener_receive() throws JMSException {\r
         DroolsEngine.AlarmMqMessageListener listener = droolsEngine.new AlarmMqMessageListener();\r
 \r
         Connection connection = PowerMock.createMock(Connection.class);\r
         Session session = PowerMock.createMock(Session.class);\r
         Destination destination = PowerMock.createMock(Topic.class);\r
-        MessageConsumer messageConsumer = PowerMock.createMock(MessageConsumer.class);\r
+        MessageConsumer consumer = PowerMock.createMock(MessageConsumer.class);\r
+\r
+        Whitebox.setInternalState(listener, "connection", connection);\r
+        Whitebox.setInternalState(listener, "session", session);\r
+        Whitebox.setInternalState(listener, "destination", destination);\r
+        Whitebox.setInternalState(listener, "consumer", consumer);\r
+\r
+        PowerMock.reset();\r
 \r
         expect(connectionFactory.createConnection()).andReturn(connection);\r
         connection.start();\r
         expect(connection.createSession(anyBoolean(), anyInt())).andReturn(session);\r
         expect(session.createTopic(anyObject(String.class))).andReturn((Topic) destination);\r
-        expect(session.createConsumer(anyObject(Destination.class))).andReturn(messageConsumer);\r
-        expect(messageConsumer.receive(anyLong())).andReturn(null);\r
+        expect(session.createConsumer(anyObject(Destination.class))).andReturn(consumer);\r
+        consumer.setMessageListener(listener);\r
 \r
         PowerMock.replayAll();\r
 \r
-        listener.run();\r
+        listener.receive();\r
 \r
         PowerMock.verifyAll();\r
     }\r
 \r
     @Test\r
-    public void listener_run_objmessage_is_not_null() throws JMSException {\r
+    public void listener_exception() throws JMSException {\r
         DroolsEngine.AlarmMqMessageListener listener = droolsEngine.new AlarmMqMessageListener();\r
 \r
         Connection connection = PowerMock.createMock(Connection.class);\r
         Session session = PowerMock.createMock(Session.class);\r
         Destination destination = PowerMock.createMock(Topic.class);\r
-        MessageConsumer messageConsumer = PowerMock.createMock(MessageConsumer.class);\r
-        ObjectMessage objMessage = PowerMock.createMock(ObjectMessage.class);\r
+        MessageConsumer consumer = PowerMock.createMock(MessageConsumer.class);\r
 \r
-        FactHandle factHandle = PowerMock.createMock(FactHandle.class);\r
+        Whitebox.setInternalState(listener, "connection", connection);\r
+        Whitebox.setInternalState(listener, "session", session);\r
+        Whitebox.setInternalState(listener, "destination", destination);\r
+        Whitebox.setInternalState(listener, "consumer", consumer);\r
+\r
+        PowerMock.reset();\r
 \r
         expect(connectionFactory.createConnection()).andReturn(connection);\r
         connection.start();\r
         expect(connection.createSession(anyBoolean(), anyInt())).andReturn(session);\r
         expect(session.createTopic(anyObject(String.class))).andReturn((Topic) destination);\r
-        expect(session.createConsumer(anyObject(Destination.class))).andReturn(messageConsumer);\r
-        expect(messageConsumer.receive(anyLong())).andReturn(objMessage);\r
-        expect(objMessage.getObject()).andReturn(new Alarm());\r
+        expect(session.createConsumer(anyObject(Destination.class))).andReturn(consumer);\r
+        consumer.setMessageListener(listener);\r
+        EasyMock.expectLastCall().andThrow(new JMSException(""));\r
 \r
-        expect(ksession.getFactHandle(anyObject(Alarm.class))).andReturn(factHandle);\r
-        ksession.retract(anyObject(FactHandle.class));\r
-        expect(ksession.insert(anyObject(Alarm.class))).andReturn(null);\r
-        expect(ksession.fireAllRules()).andReturn(0);\r
+        consumer.close();\r
+        session.close();\r
+        connection.close();\r
+\r
+\r
+        PowerMock.replayAll();\r
+\r
+        listener.receive();\r
+\r
+        PowerMock.verifyAll();\r
+    }\r
+\r
+    @Test\r
+    public void listener_close_exception() throws JMSException {\r
+        DroolsEngine.AlarmMqMessageListener listener = droolsEngine.new AlarmMqMessageListener();\r
+\r
+        Connection connection = PowerMock.createMock(Connection.class);\r
+        Session session = PowerMock.createMock(Session.class);\r
+        Destination destination = PowerMock.createMock(Topic.class);\r
+        MessageConsumer consumer = PowerMock.createMock(MessageConsumer.class);\r
 \r
-        expect(messageConsumer.receive(anyLong())).andReturn(null);\r
+        Whitebox.setInternalState(listener, "connection", connection);\r
+        Whitebox.setInternalState(listener, "session", session);\r
+        Whitebox.setInternalState(listener, "destination", destination);\r
+        Whitebox.setInternalState(listener, "consumer", consumer);\r
+\r
+        PowerMock.reset();\r
+\r
+        expect(connectionFactory.createConnection()).andReturn(connection);\r
+        connection.start();\r
+        expect(connection.createSession(anyBoolean(), anyInt())).andReturn(session);\r
+        expect(session.createTopic(anyObject(String.class))).andReturn((Topic) destination);\r
+        expect(session.createConsumer(anyObject(Destination.class))).andReturn(consumer);\r
+        consumer.setMessageListener(listener);\r
+        EasyMock.expectLastCall().andThrow(new JMSException(""));\r
+\r
+        consumer.close();\r
+        EasyMock.expectLastCall().andThrow(new JMSException(""));\r
+\r
+\r
+        PowerMock.replayAll();\r
+\r
+        listener.receive();\r
+\r
+        PowerMock.verifyAll();\r
+    }\r
+\r
+    @Test\r
+    public void listener_on_message() throws JMSException {\r
+        DroolsEngine.AlarmMqMessageListener listener = droolsEngine.new AlarmMqMessageListener();\r
+        Alarm alarm = new Alarm();\r
+        alarm.setAlarmKey("alarmKey");\r
+        ActiveMQObjectMessage objectMessage = new ActiveMQObjectMessage();\r
+        objectMessage.setObject(alarm);\r
+\r
+        expect(ksession.getFactHandle(anyObject(Alarm.class))).andReturn(null);\r
+\r
+        expect(ksession.insert(anyObject(Alarm.class))).andReturn(null);\r
+        expect(ksession.fireAllRules()).andReturn(1);\r
 \r
         PowerMock.replayAll();\r
 \r
-        listener.run();\r
+        listener.onMessage(objectMessage);\r
 \r
         PowerMock.verifyAll();\r
     }\r