package org.openo.holmes.engine.manager;
+import java.io.Serializable;
import java.io.StringReader;
import java.util.List;
import java.util.Locale;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
+import javax.jms.Message;
import javax.jms.MessageConsumer;
-import javax.jms.ObjectMessage;
+import javax.jms.MessageListener;
import javax.jms.Session;
import lombok.extern.slf4j.Slf4j;
import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.command.ActiveMQObjectMessage;
import org.drools.KnowledgeBase;
import org.drools.KnowledgeBaseConfiguration;
import org.drools.KnowledgeBaseFactory;
import org.openo.holmes.common.config.MQConfig;
import org.openo.holmes.common.constant.AlarmConst;
import org.openo.holmes.common.exception.CorrelationException;
-import org.openo.holmes.common.exception.DbException;
-import org.openo.holmes.common.exception.EngineException;
-import org.openo.holmes.common.exception.RuleIllegalityException;
import org.openo.holmes.common.utils.ExceptionUtil;
import org.openo.holmes.common.utils.I18nProxy;
import org.openo.holmes.engine.request.DeployRuleRequest;
@Slf4j
@Service
public class DroolsEngine {
-
- private final static String CORRELATION_RULE = "CORRELATION_RULE";
-
- private final static String CORRELATION_ALARM = "CORRELATION_ALARM";
-
private final static int ENABLE = 1;
@Inject
// 2. start mq listener
registerAlarmTopicListener();
} catch (Exception e) {
- log.error("Start service failed: " + e.getMessage());
+ log.error("Start service failed: " + e.getMessage(), e);
throw ExceptionUtil.buildExceptionResponse("Start service failed!");
}
}
connectionFactory = new ActiveMQConnectionFactory(mqConfigProvider.get().brokerUsername,
mqConfigProvider.get().brokerPassword, brokerURL);
- Thread thread = new Thread(new AlarmMqMessageListener());
- thread.start();
+ AlarmMqMessageListener listener = new AlarmMqMessageListener();
+ listener.receive();
}
- private void start() throws EngineException, RuleIllegalityException, DbException {
- log.info("Drools Egine Initialize Begining ... ");
+ private void start() throws CorrelationException {
+ log.info("Drools Engine Initialize Beginning...");
initEngineParameter();
-// initDeployRule();
+ initDeployRule();
- log.info("Business Rule Egine Initialize Successfully ");
+ log.info("Business Rule Engine Initialize Successfully.");
}
- public void stop() throws Exception {
+ public void stop() {
this.ksession.dispose();
}
- private void initEngineParameter() throws EngineException {
+ private void initEngineParameter(){
this.kconf = KnowledgeBaseFactory.newKnowledgeBaseConfiguration();
this.kconf.setOption(EventProcessingOption.STREAM);
this.ksession = kbase.newStatefulKnowledgeSession();
}
- private void initDeployRule() throws RuleIllegalityException, EngineException, DbException {
+ private void initDeployRule() throws CorrelationException {
List<CorrelationRule> rules = ruleMgtWrapper.queryRuleByEnable(ENABLE);
- if (rules.size() > 0) {
- for (CorrelationRule rule : rules) {
- if (rule.getContent() != null) {
- deployRuleFromCache(rule.getContent());
- }
+ if (rules.isEmpty()) {
+ return;
+ }
+ for (CorrelationRule rule : rules) {
+ if (rule.getContent() != null) {
+ deployRuleFromDB(rule.getContent());
}
}
}
- private void deployRuleFromCache(String ruleContent) throws EngineException {
+ private void deployRuleFromDB(String ruleContent) throws CorrelationException {
StringReader reader = new StringReader(ruleContent);
Resource res = ResourceFactory.newReaderResource(reader);
- kbuilder = KnowledgeBuilderFactory.newKnowledgeBuilder();
+ if (kbuilder == null) {
+ kbuilder = KnowledgeBuilderFactory.newKnowledgeBuilder();
+ }
kbuilder.add(res, ResourceType.DRL);
kbase.addKnowledgePackages(kbuilder.getKnowledgePackages());
} catch (Exception e) {
- throw new EngineException(e);
+ throw new CorrelationException(e.getMessage(), e);
}
- kbuilder = null;
-
ksession.fireAllRules();
}
StringReader reader = new StringReader(rule.getContent());
Resource res = ResourceFactory.newReaderResource(reader);
- kbuilder = KnowledgeBuilderFactory.newKnowledgeBuilder();
+ if (kbuilder == null) {
+ kbuilder = KnowledgeBuilderFactory.newKnowledgeBuilder();
+ }
kbuilder.add(res, ResourceType.DRL);
throw new CorrelationException(errorMsg);
}
- String packageName = kbuilder.getKnowledgePackages().iterator().next().getName();
+ KnowledgePackage kpackage = kbuilder.getKnowledgePackages().iterator().next();
- if (kbase.getKnowledgePackages().contains(packageName)) {
+ if (kbase.getKnowledgePackages().contains(kpackage)) {
String errorMsg = I18nProxy.getInstance().getValueByArgs(locale,
I18nProxy.ENGINE_CONTENT_ILLEGALITY,new String[]{
throw new CorrelationException(errorMsg, e);
}
- kbuilder = null;
-
ksession.fireAllRules();
- return packageName;
+ return kpackage.getName();
}
-
public synchronized void undeployRule(String packageName, Locale locale)
throws CorrelationException {
StringReader reader = new StringReader(content);
Resource res = ResourceFactory.newReaderResource(reader);
- kbuilder = KnowledgeBuilderFactory.newKnowledgeBuilder();
+ if (kbuilder == null) {
+ kbuilder = KnowledgeBuilderFactory.newKnowledgeBuilder();
+ }
kbuilder.add(res, ResourceType.DRL);
log.error(errorMsg);
throw new CorrelationException(errorMsg);
}
- kbuilder = null;
}
public void putRaisedIntoStream(Alarm raiseAlarm) {
this.ksession.fireAllRules();
}
- class AlarmMqMessageListener implements Runnable {
+ class AlarmMqMessageListener implements MessageListener {
- public void run() {
- Connection connection;
- Session session;
- Destination destination;
- MessageConsumer messageConsumer;
+ private Connection connection = null;
+ private Session session = null;
+ private Destination destination = null;
+ private MessageConsumer consumer = null;
+ private void initialize() throws JMSException {
+ connection = connectionFactory.createConnection();
+ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ destination = session.createTopic(AlarmConst.MQ_TOPIC_NAME_ALARM);
+ consumer = session.createConsumer(destination);
+ connection.start();
+ }
+
+ public void receive() {
try {
- connection = connectionFactory.createConnection();
- connection.start();
- session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- destination = session.createTopic(AlarmConst.MQ_TOPIC_NAME_ALARM);
- messageConsumer = session.createConsumer(destination);
-
- while (true) {
- ObjectMessage objMessage = (ObjectMessage) messageConsumer.receive(100000);
- if (objMessage != null) {
- putRaisedIntoStream((Alarm) objMessage.getObject());
- } else {
- break;
- }
+ initialize();
+ consumer.setMessageListener(this);
+ } catch (JMSException e) {
+ log.error("Failed to connect to the MQ service : " + e.getMessage(), e);
+ try {
+ close();
+ } catch (JMSException e1) {
+ log.error("Failed close connection " + e1.getMessage(), e1);
+ }
+ }
+ }
+
+ public void onMessage(Message arg0) {
+ ActiveMQObjectMessage objectMessage = (ActiveMQObjectMessage) arg0;
+ try {
+ Serializable object = objectMessage.getObject();
+
+ if (object instanceof Alarm) {
+ Alarm alarm = (Alarm) object;
+ putRaisedIntoStream(alarm);
}
} catch (JMSException e) {
- log.error("connection mq service Failed: " + e.getMessage());
+ log.error("Failed get object : " + e.getMessage(), e);
}
+ }
+ private void close() throws JMSException {
+ if (consumer != null)
+ consumer.close();
+ if (session != null)
+ session.close();
+ if (connection != null)
+ connection.close();
}
}
-
}