-/**
- * Copyright 2017 ZTE Corporation.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.openo.holmes.engine.manager;
-
-
-import java.io.Serializable;
-import java.io.StringReader;
-import java.util.List;
-import java.util.Locale;
-import javax.annotation.PostConstruct;
-import javax.inject.Inject;
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.Session;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.command.ActiveMQObjectMessage;
-import org.drools.KnowledgeBase;
-import org.drools.KnowledgeBaseConfiguration;
-import org.drools.KnowledgeBaseFactory;
-import org.drools.builder.KnowledgeBuilder;
-import org.drools.builder.KnowledgeBuilderFactory;
-import org.drools.builder.ResourceType;
-import org.drools.conf.EventProcessingOption;
-import org.drools.definition.KnowledgePackage;
-import org.drools.io.Resource;
-import org.drools.io.ResourceFactory;
-import org.drools.runtime.StatefulKnowledgeSession;
-import org.drools.runtime.rule.FactHandle;
-import org.glassfish.hk2.api.IterableProvider;
-import org.jvnet.hk2.annotations.Service;
-import org.openo.holmes.common.api.entity.CorrelationRule;
-import org.openo.holmes.common.api.stat.Alarm;
-import org.openo.holmes.common.config.MQConfig;
-import org.openo.holmes.common.constant.AlarmConst;
-import org.openo.holmes.common.exception.CorrelationException;
-import org.openo.holmes.common.utils.ExceptionUtil;
-import org.openo.holmes.common.utils.I18nProxy;
-import org.openo.holmes.engine.request.DeployRuleRequest;
-import org.openo.holmes.engine.wrapper.RuleMgtWrapper;
-
-@Slf4j
-@Service
-public class DroolsEngine {
- private final static int ENABLE = 1;
-
- @Inject
- private RuleMgtWrapper ruleMgtWrapper;
-
- private KnowledgeBase kbase;
-
- private KnowledgeBaseConfiguration kconf;
-
- private StatefulKnowledgeSession ksession;
-
- private KnowledgeBuilder kbuilder;
-
- @Inject
- private IterableProvider<MQConfig> mqConfigProvider;
-
- private ConnectionFactory connectionFactory;
-
- @PostConstruct
- private void init() {
- try {
- // 1. start engine
- start();
- // 2. start mq listener
- registerAlarmTopicListener();
- } catch (Exception e) {
- log.error("Start service failed: " + e.getMessage(), e);
- throw ExceptionUtil.buildExceptionResponse("Start service failed!");
- }
- }
-
- private void registerAlarmTopicListener() {
- String brokerURL =
- "tcp://" + mqConfigProvider.get().brokerIp + ":" + mqConfigProvider.get().brokerPort;
- connectionFactory = new ActiveMQConnectionFactory(mqConfigProvider.get().brokerUsername,
- mqConfigProvider.get().brokerPassword, brokerURL);
-
- AlarmMqMessageListener listener = new AlarmMqMessageListener();
- listener.receive();
- }
-
-
- private void start() throws CorrelationException {
- log.info("Drools Engine Initialize Beginning...");
-
- initEngineParameter();
- initDeployRule();
-
- log.info("Business Rule Engine Initialize Successfully.");
- }
-
- public void stop() {
- this.ksession.dispose();
- }
-
- private void initEngineParameter(){
- this.kconf = KnowledgeBaseFactory.newKnowledgeBaseConfiguration();
-
- this.kconf.setOption(EventProcessingOption.STREAM);
-
- this.kconf.setProperty("drools.assertBehaviour", "equality");
-
- this.kbuilder = KnowledgeBuilderFactory.newKnowledgeBuilder();
-
- this.kbase = KnowledgeBaseFactory.newKnowledgeBase("D-ENGINE", this.kconf);
-
- this.ksession = kbase.newStatefulKnowledgeSession();
- }
-
- private void initDeployRule() throws CorrelationException {
- List<CorrelationRule> rules = ruleMgtWrapper.queryRuleByEnable(ENABLE);
-
- if (rules.isEmpty()) {
- return;
- }
- for (CorrelationRule rule : rules) {
- if (rule.getContent() != null) {
- deployRuleFromDB(rule.getContent());
- }
- }
- }
-
- private void deployRuleFromDB(String ruleContent) throws CorrelationException {
- StringReader reader = new StringReader(ruleContent);
- Resource res = ResourceFactory.newReaderResource(reader);
-
- if (kbuilder == null) {
- kbuilder = KnowledgeBuilderFactory.newKnowledgeBuilder();
- }
-
- kbuilder.add(res, ResourceType.DRL);
-
- try {
-
- kbase.addKnowledgePackages(kbuilder.getKnowledgePackages());
- } catch (Exception e) {
- throw new CorrelationException(e.getMessage(), e);
- }
-
- ksession.fireAllRules();
- }
-
- public synchronized String deployRule(DeployRuleRequest rule, Locale locale)
- throws CorrelationException {
- StringReader reader = new StringReader(rule.getContent());
- Resource res = ResourceFactory.newReaderResource(reader);
-
- if (kbuilder == null) {
- kbuilder = KnowledgeBuilderFactory.newKnowledgeBuilder();
- }
-
- kbuilder.add(res, ResourceType.DRL);
-
- if (kbuilder.hasErrors()) {
-
- String errorMsg = I18nProxy.getInstance().getValueByArgs(locale,
- I18nProxy.ENGINE_CONTENT_ILLEGALITY,
- new String[]{kbuilder.getErrors().toString()});
- throw new CorrelationException(errorMsg);
- }
-
- KnowledgePackage kpackage = kbuilder.getKnowledgePackages().iterator().next();
-
- if (kbase.getKnowledgePackages().contains(kpackage)) {
-
- String errorMsg = I18nProxy.getInstance().getValueByArgs(locale,
- I18nProxy.ENGINE_CONTENT_ILLEGALITY,new String[]{
- I18nProxy.getInstance().getValue(locale, I18nProxy.ENGINE_CONTAINS_PACKAGE)});
-
- throw new CorrelationException(errorMsg);
- }
- try {
-
- kbase.addKnowledgePackages(kbuilder.getKnowledgePackages());
- } catch (Exception e) {
-
- String errorMsg =
- I18nProxy.getInstance().getValue(locale, I18nProxy.ENGINE_DEPLOY_RULE_FAILED);
- throw new CorrelationException(errorMsg, e);
- }
-
- ksession.fireAllRules();
- return kpackage.getName();
- }
-
- public synchronized void undeployRule(String packageName, Locale locale)
- throws CorrelationException {
-
- KnowledgePackage pkg = kbase.getKnowledgePackage(packageName);
-
- if (null == pkg) {
- String errorMsg = I18nProxy.getInstance().getValueByArgs(locale,
- I18nProxy.ENGINE_DELETE_RULE_NULL,
- new String[]{packageName});
- throw new CorrelationException(errorMsg);
- }
-
- try {
-
- kbase.removeKnowledgePackage(pkg.getName());
- } catch (Exception e) {
- String errorMsg = I18nProxy.getInstance().getValueByArgs(locale,
- I18nProxy.ENGINE_DELETE_RULE_FAILED, new String[]{packageName});
- throw new CorrelationException(errorMsg, e);
- }
- }
-
- public void compileRule(String content, Locale locale)
- throws CorrelationException {
- StringReader reader = new StringReader(content);
- Resource res = ResourceFactory.newReaderResource(reader);
-
- if (kbuilder == null) {
- kbuilder = KnowledgeBuilderFactory.newKnowledgeBuilder();
- }
-
- kbuilder.add(res, ResourceType.DRL);
-
- if (kbuilder.hasErrors()) {
- String errorMsg = I18nProxy.getInstance().getValueByArgs(locale,
- I18nProxy.ENGINE_CONTENT_ILLEGALITY,
- new String[]{kbuilder.getErrors().toString()});
- log.error(errorMsg);
- throw new CorrelationException(errorMsg);
- }
- }
-
- public void putRaisedIntoStream(Alarm raiseAlarm) {
- FactHandle factHandle = this.ksession.getFactHandle(raiseAlarm);
- if (factHandle != null) {
- this.ksession.retract(factHandle);
- }
- this.ksession.insert(raiseAlarm);
- this.ksession.fireAllRules();
- }
-
- class AlarmMqMessageListener implements MessageListener {
-
- private Connection connection = null;
- private Session session = null;
- private Destination destination = null;
- private MessageConsumer consumer = null;
-
- private void initialize() throws JMSException {
- connection = connectionFactory.createConnection();
- session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- destination = session.createTopic(AlarmConst.MQ_TOPIC_NAME_ALARM);
- consumer = session.createConsumer(destination);
- connection.start();
- }
-
- public void receive() {
- try {
- initialize();
- consumer.setMessageListener(this);
- } catch (JMSException e) {
- log.error("Failed to connect to the MQ service : " + e.getMessage(), e);
- try {
- close();
- } catch (JMSException e1) {
- log.error("Failed close connection " + e1.getMessage(), e1);
- }
- }
- }
-
- public void onMessage(Message arg0) {
- ActiveMQObjectMessage objectMessage = (ActiveMQObjectMessage) arg0;
- try {
- Serializable object = objectMessage.getObject();
-
- if (object instanceof Alarm) {
- Alarm alarm = (Alarm) object;
- putRaisedIntoStream(alarm);
- }
- } catch (JMSException e) {
- log.error("Failed get object : " + e.getMessage(), e);
- }
- }
-
- private void close() throws JMSException {
- if (consumer != null)
- consumer.close();
- if (session != null)
- session.close();
- if (connection != null)
- connection.close();
- }
- }
-}
+/**\r
+ * Copyright 2017 ZTE Corporation.\r
+ *\r
+ * Licensed under the Apache License, Version 2.0 (the "License");\r
+ * you may not use this file except in compliance with the License.\r
+ * You may obtain a copy of the License at\r
+ *\r
+ * http://www.apache.org/licenses/LICENSE-2.0\r
+ *\r
+ * Unless required by applicable law or agreed to in writing, software\r
+ * distributed under the License is distributed on an "AS IS" BASIS,\r
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+ * See the License for the specific language governing permissions and\r
+ * limitations under the License.\r
+ */\r
+package org.openo.holmes.engine.manager;\r
+\r
+\r
+import java.io.Serializable;\r
+import java.io.StringReader;\r
+import java.util.List;\r
+import java.util.Locale;\r
+import javax.annotation.PostConstruct;\r
+import javax.inject.Inject;\r
+import javax.jms.Connection;\r
+import javax.jms.ConnectionFactory;\r
+import javax.jms.Destination;\r
+import javax.jms.JMSException;\r
+import javax.jms.Message;\r
+import javax.jms.MessageConsumer;\r
+import javax.jms.MessageListener;\r
+import javax.jms.Session;\r
+import lombok.extern.slf4j.Slf4j;\r
+import org.apache.activemq.ActiveMQConnectionFactory;\r
+import org.apache.activemq.command.ActiveMQObjectMessage;\r
+import org.drools.KnowledgeBase;\r
+import org.drools.KnowledgeBaseConfiguration;\r
+import org.drools.KnowledgeBaseFactory;\r
+import org.drools.builder.KnowledgeBuilder;\r
+import org.drools.builder.KnowledgeBuilderFactory;\r
+import org.drools.builder.ResourceType;\r
+import org.drools.conf.EventProcessingOption;\r
+import org.drools.definition.KnowledgePackage;\r
+import org.drools.io.Resource;\r
+import org.drools.io.ResourceFactory;\r
+import org.drools.runtime.StatefulKnowledgeSession;\r
+import org.drools.runtime.rule.FactHandle;\r
+import org.glassfish.hk2.api.IterableProvider;\r
+import org.jvnet.hk2.annotations.Service;\r
+import org.openo.holmes.common.api.entity.CorrelationRule;\r
+import org.openo.holmes.common.api.stat.Alarm;\r
+import org.openo.holmes.common.config.MQConfig;\r
+import org.openo.holmes.common.constant.AlarmConst;\r
+import org.openo.holmes.common.exception.CorrelationException;\r
+import org.openo.holmes.common.utils.ExceptionUtil;\r
+import org.openo.holmes.common.utils.I18nProxy;\r
+import org.openo.holmes.engine.request.DeployRuleRequest;\r
+import org.openo.holmes.engine.wrapper.RuleMgtWrapper;\r
+\r
+@Slf4j\r
+@Service\r
+public class DroolsEngine {\r
+\r
+ private final static int ENABLE = 1;\r
+\r
+ @Inject\r
+ private RuleMgtWrapper ruleMgtWrapper;\r
+\r
+ private KnowledgeBase kbase;\r
+\r
+ private KnowledgeBaseConfiguration kconf;\r
+\r
+ private StatefulKnowledgeSession ksession;\r
+\r
+ private KnowledgeBuilder kbuilder;\r
+\r
+ @Inject\r
+ private IterableProvider<MQConfig> mqConfigProvider;\r
+\r
+ private ConnectionFactory connectionFactory;\r
+\r
+ @PostConstruct\r
+ private void init() {\r
+ try {\r
+ // 1. start engine\r
+ start();\r
+ // 2. start mq listener\r
+ registerAlarmTopicListener();\r
+ } catch (Exception e) {\r
+ log.error("Start service failed: " + e.getMessage(), e);\r
+ throw ExceptionUtil.buildExceptionResponse("Start service failed!");\r
+ }\r
+ }\r
+\r
+ private void registerAlarmTopicListener() {\r
+ String brokerURL =\r
+ "tcp://" + mqConfigProvider.get().brokerIp + ":" + mqConfigProvider.get().brokerPort;\r
+ connectionFactory = new ActiveMQConnectionFactory(mqConfigProvider.get().brokerUsername,\r
+ mqConfigProvider.get().brokerPassword, brokerURL);\r
+\r
+ AlarmMqMessageListener listener = new AlarmMqMessageListener();\r
+ listener.receive();\r
+ }\r
+\r
+\r
+ private void start() throws CorrelationException {\r
+ log.info("Drools Engine Initialize Beginning...");\r
+\r
+ initEngineParameter();\r
+ initDeployRule();\r
+\r
+ log.info("Business Rule Engine Initialize Successfully.");\r
+ }\r
+\r
+ public void stop() {\r
+ this.ksession.dispose();\r
+ }\r
+\r
+ private void initEngineParameter() {\r
+ this.kconf = KnowledgeBaseFactory.newKnowledgeBaseConfiguration();\r
+\r
+ this.kconf.setOption(EventProcessingOption.STREAM);\r
+\r
+ this.kconf.setProperty("drools.assertBehaviour", "equality");\r
+\r
+ this.kbuilder = KnowledgeBuilderFactory.newKnowledgeBuilder();\r
+\r
+ this.kbase = KnowledgeBaseFactory.newKnowledgeBase("D-ENGINE", this.kconf);\r
+\r
+ this.ksession = kbase.newStatefulKnowledgeSession();\r
+ }\r
+\r
+ private void initDeployRule() throws CorrelationException {\r
+ List<CorrelationRule> rules = ruleMgtWrapper.queryRuleByEnable(ENABLE);\r
+\r
+ if (rules.isEmpty()) {\r
+ return;\r
+ }\r
+ for (CorrelationRule rule : rules) {\r
+ if (rule.getContent() != null) {\r
+ deployRuleFromDB(rule.getContent());\r
+ }\r
+ }\r
+ }\r
+\r
+ private void deployRuleFromDB(String ruleContent) throws CorrelationException {\r
+ StringReader reader = new StringReader(ruleContent);\r
+ Resource res = ResourceFactory.newReaderResource(reader);\r
+\r
+ kbuilder = KnowledgeBuilderFactory.newKnowledgeBuilder();\r
+\r
+ kbuilder.add(res, ResourceType.DRL);\r
+\r
+ try {\r
+\r
+ kbase.addKnowledgePackages(kbuilder.getKnowledgePackages());\r
+ } catch (Exception e) {\r
+ throw new CorrelationException(e.getMessage(), e);\r
+ }\r
+ ksession.fireAllRules();\r
+ }\r
+\r
+ public synchronized String deployRule(DeployRuleRequest rule, Locale locale)\r
+ throws CorrelationException {\r
+ StringReader reader = new StringReader(rule.getContent());\r
+ Resource res = ResourceFactory.newReaderResource(reader);\r
+\r
+ kbuilder = KnowledgeBuilderFactory.newKnowledgeBuilder();\r
+\r
+ kbuilder.add(res, ResourceType.DRL);\r
+\r
+ if (kbuilder.hasErrors()) {\r
+\r
+ String errorMsg = I18nProxy.getInstance().getValueByArgs(locale,\r
+ I18nProxy.ENGINE_CONTENT_ILLEGALITY,\r
+ new String[]{kbuilder.getErrors().toString()});\r
+ throw new CorrelationException(errorMsg);\r
+ }\r
+\r
+ KnowledgePackage kpackage = kbuilder.getKnowledgePackages().iterator().next();\r
+\r
+ if (kbase.getKnowledgePackages().contains(kpackage)) {\r
+\r
+ String errorMsg = I18nProxy.getInstance().getValueByArgs(locale,\r
+ I18nProxy.ENGINE_CONTENT_ILLEGALITY, new String[]{\r
+ I18nProxy.getInstance().getValue(locale, I18nProxy.ENGINE_CONTAINS_PACKAGE)});\r
+\r
+ throw new CorrelationException(errorMsg);\r
+ }\r
+ try {\r
+\r
+ kbase.addKnowledgePackages(kbuilder.getKnowledgePackages());\r
+ } catch (Exception e) {\r
+\r
+ String errorMsg =\r
+ I18nProxy.getInstance().getValue(locale, I18nProxy.ENGINE_DEPLOY_RULE_FAILED);\r
+ throw new CorrelationException(errorMsg, e);\r
+ }\r
+\r
+ ksession.fireAllRules();\r
+ return kpackage.getName();\r
+ }\r
+\r
+ public synchronized void undeployRule(String packageName, Locale locale)\r
+ throws CorrelationException {\r
+\r
+ KnowledgePackage pkg = kbase.getKnowledgePackage(packageName);\r
+\r
+ if (null == pkg) {\r
+ String errorMsg = I18nProxy.getInstance().getValueByArgs(locale,\r
+ I18nProxy.ENGINE_DELETE_RULE_NULL,\r
+ new String[]{packageName});\r
+ throw new CorrelationException(errorMsg);\r
+ }\r
+\r
+ try {\r
+\r
+ kbase.removeKnowledgePackage(pkg.getName());\r
+ } catch (Exception e) {\r
+ String errorMsg = I18nProxy.getInstance().getValueByArgs(locale,\r
+ I18nProxy.ENGINE_DELETE_RULE_FAILED, new String[]{packageName});\r
+ throw new CorrelationException(errorMsg, e);\r
+ }\r
+ }\r
+\r
+ public void compileRule(String content, Locale locale)\r
+ throws CorrelationException {\r
+ StringReader reader = new StringReader(content);\r
+ Resource res = ResourceFactory.newReaderResource(reader);\r
+\r
+ kbuilder = KnowledgeBuilderFactory.newKnowledgeBuilder();\r
+\r
+ kbuilder.add(res, ResourceType.DRL);\r
+\r
+ if (kbuilder.hasErrors()) {\r
+ String errorMsg = I18nProxy.getInstance().getValueByArgs(locale,\r
+ I18nProxy.ENGINE_CONTENT_ILLEGALITY,\r
+ new String[]{kbuilder.getErrors().toString()});\r
+ log.error(errorMsg);\r
+ throw new CorrelationException(errorMsg);\r
+ }\r
+ }\r
+\r
+ public void putRaisedIntoStream(Alarm raiseAlarm) {\r
+ FactHandle factHandle = this.ksession.getFactHandle(raiseAlarm);\r
+ if (factHandle != null) {\r
+ this.ksession.retract(factHandle);\r
+ }\r
+ this.ksession.insert(raiseAlarm);\r
+ this.ksession.fireAllRules();\r
+ }\r
+\r
+ class AlarmMqMessageListener implements MessageListener {\r
+\r
+ private Connection connection = null;\r
+ private Session session = null;\r
+ private Destination destination = null;\r
+ private MessageConsumer consumer = null;\r
+\r
+ private void initialize() throws JMSException {\r
+ connection = connectionFactory.createConnection();\r
+ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);\r
+ destination = session.createTopic(AlarmConst.MQ_TOPIC_NAME_ALARM);\r
+ consumer = session.createConsumer(destination);\r
+ connection.start();\r
+ }\r
+\r
+ public void receive() {\r
+ try {\r
+ initialize();\r
+ consumer.setMessageListener(this);\r
+ } catch (JMSException e) {\r
+ log.error("Failed to connect to the MQ service : " + e.getMessage(), e);\r
+ try {\r
+ close();\r
+ } catch (JMSException e1) {\r
+ log.error("Failed close connection " + e1.getMessage(), e1);\r
+ }\r
+ }\r
+ }\r
+\r
+ public void onMessage(Message arg0) {\r
+ ActiveMQObjectMessage objectMessage = (ActiveMQObjectMessage) arg0;\r
+ try {\r
+ Serializable object = objectMessage.getObject();\r
+\r
+ if (object instanceof Alarm) {\r
+ Alarm alarm = (Alarm) object;\r
+ putRaisedIntoStream(alarm);\r
+ }\r
+ } catch (JMSException e) {\r
+ log.error("Failed get object : " + e.getMessage(), e);\r
+ }\r
+ }\r
+\r
+ private void close() throws JMSException {\r
+ if (consumer != null) {\r
+ consumer.close();\r
+ }\r
+ if (session != null) {\r
+ session.close();\r
+ }\r
+ if (connection != null) {\r
+ connection.close();\r
+ }\r
+ }\r
+ }\r
+}\r