/**
 * Copyright 2017 ZTE Corporation.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
\r
package org.openo.holmes.engine.manager;

import java.io.Serializable;
import java.io.StringReader;
import java.util.Collection;
import java.util.List;
import java.util.Locale;
import javax.annotation.PostConstruct;
import javax.inject.Inject;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import lombok.extern.slf4j.Slf4j;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQObjectMessage;
import org.drools.KnowledgeBase;
import org.drools.KnowledgeBaseConfiguration;
import org.drools.KnowledgeBaseFactory;
import org.drools.builder.KnowledgeBuilder;
import org.drools.builder.KnowledgeBuilderFactory;
import org.drools.builder.ResourceType;
import org.drools.conf.EventProcessingOption;
import org.drools.definition.KnowledgePackage;
import org.drools.io.Resource;
import org.drools.io.ResourceFactory;
import org.drools.runtime.StatefulKnowledgeSession;
import org.drools.runtime.rule.FactHandle;
import org.glassfish.hk2.api.IterableProvider;
import org.jvnet.hk2.annotations.Service;
import org.openo.holmes.common.api.entity.CorrelationRule;
import org.openo.holmes.common.api.stat.Alarm;
import org.openo.holmes.common.config.MQConfig;
import org.openo.holmes.common.constant.AlarmConst;
import org.openo.holmes.common.exception.CorrelationException;
import org.openo.holmes.common.utils.ExceptionUtil;
import org.openo.holmes.common.utils.I18nProxy;
import org.openo.holmes.engine.request.DeployRuleRequest;
import org.openo.holmes.engine.wrapper.RuleMgtWrapper;
\r
62 public class DroolsEngine {
\r
64 private final static int ENABLE = 1;
\r
67 private RuleMgtWrapper ruleMgtWrapper;
\r
69 private KnowledgeBase kbase;
\r
71 private KnowledgeBaseConfiguration kconf;
\r
73 private StatefulKnowledgeSession ksession;
\r
75 private KnowledgeBuilder kbuilder;
\r
78 private IterableProvider<MQConfig> mqConfigProvider;
\r
80 private ConnectionFactory connectionFactory;
\r
83 private void init() {
\r
87 // 2. start mq listener
\r
88 registerAlarmTopicListener();
\r
89 } catch (Exception e) {
\r
90 log.error("Start service failed: " + e.getMessage(), e);
\r
91 throw ExceptionUtil.buildExceptionResponse("Start service failed!");
\r
95 private void registerAlarmTopicListener() {
\r
97 "tcp://" + mqConfigProvider.get().brokerIp + ":" + mqConfigProvider.get().brokerPort;
\r
98 connectionFactory = new ActiveMQConnectionFactory(mqConfigProvider.get().brokerUsername,
\r
99 mqConfigProvider.get().brokerPassword, brokerURL);
\r
101 AlarmMqMessageListener listener = new AlarmMqMessageListener();
\r
102 listener.receive();
\r
106 private void start() throws CorrelationException {
\r
107 log.info("Drools Engine Initialize Beginning...");
\r
109 initEngineParameter();
\r
112 log.info("Business Rule Engine Initialize Successfully.");
\r
115 public void stop() {
\r
116 this.ksession.dispose();
\r
119 private void initEngineParameter() {
\r
120 this.kconf = KnowledgeBaseFactory.newKnowledgeBaseConfiguration();
\r
122 this.kconf.setOption(EventProcessingOption.STREAM);
\r
124 this.kconf.setProperty("drools.assertBehaviour", "equality");
\r
126 this.kbuilder = KnowledgeBuilderFactory.newKnowledgeBuilder();
\r
128 this.kbase = KnowledgeBaseFactory.newKnowledgeBase("D-ENGINE", this.kconf);
\r
130 this.ksession = kbase.newStatefulKnowledgeSession();
\r
133 private void initDeployRule() throws CorrelationException {
\r
134 List<CorrelationRule> rules = ruleMgtWrapper.queryRuleByEnable(ENABLE);
\r
136 if (rules.isEmpty()) {
\r
139 for (CorrelationRule rule : rules) {
\r
140 if (rule.getContent() != null) {
\r
141 deployRuleFromDB(rule.getContent());
\r
146 private void deployRuleFromDB(String ruleContent) throws CorrelationException {
\r
147 StringReader reader = new StringReader(ruleContent);
\r
148 Resource res = ResourceFactory.newReaderResource(reader);
\r
150 kbuilder = KnowledgeBuilderFactory.newKnowledgeBuilder();
\r
152 kbuilder.add(res, ResourceType.DRL);
\r
156 kbase.addKnowledgePackages(kbuilder.getKnowledgePackages());
\r
157 } catch (Exception e) {
\r
158 throw new CorrelationException(e.getMessage(), e);
\r
160 ksession.fireAllRules();
\r
163 public synchronized String deployRule(DeployRuleRequest rule, Locale locale)
\r
164 throws CorrelationException {
\r
165 StringReader reader = new StringReader(rule.getContent());
\r
166 Resource res = ResourceFactory.newReaderResource(reader);
\r
168 kbuilder = KnowledgeBuilderFactory.newKnowledgeBuilder();
\r
170 kbuilder.add(res, ResourceType.DRL);
\r
172 if (kbuilder.hasErrors()) {
\r
174 String errorMsg = I18nProxy.getInstance().getValueByArgs(locale,
\r
175 I18nProxy.ENGINE_CONTENT_ILLEGALITY,
\r
176 new String[]{kbuilder.getErrors().toString()});
\r
177 throw new CorrelationException(errorMsg);
\r
180 KnowledgePackage kpackage = kbuilder.getKnowledgePackages().iterator().next();
\r
182 if (kbase.getKnowledgePackages().contains(kpackage)) {
\r
184 String errorMsg = I18nProxy.getInstance().getValueByArgs(locale,
\r
185 I18nProxy.ENGINE_CONTENT_ILLEGALITY, new String[]{
\r
186 I18nProxy.getInstance().getValue(locale, I18nProxy.ENGINE_CONTAINS_PACKAGE)});
\r
188 throw new CorrelationException(errorMsg);
\r
192 kbase.addKnowledgePackages(kbuilder.getKnowledgePackages());
\r
193 } catch (Exception e) {
\r
196 I18nProxy.getInstance().getValue(locale, I18nProxy.ENGINE_DEPLOY_RULE_FAILED);
\r
197 throw new CorrelationException(errorMsg, e);
\r
200 ksession.fireAllRules();
\r
201 return kpackage.getName();
\r
204 public synchronized void undeployRule(String packageName, Locale locale)
\r
205 throws CorrelationException {
\r
207 KnowledgePackage pkg = kbase.getKnowledgePackage(packageName);
\r
210 String errorMsg = I18nProxy.getInstance().getValueByArgs(locale,
\r
211 I18nProxy.ENGINE_DELETE_RULE_NULL,
\r
212 new String[]{packageName});
\r
213 throw new CorrelationException(errorMsg);
\r
218 kbase.removeKnowledgePackage(pkg.getName());
\r
219 } catch (Exception e) {
\r
220 String errorMsg = I18nProxy.getInstance().getValueByArgs(locale,
\r
221 I18nProxy.ENGINE_DELETE_RULE_FAILED, new String[]{packageName});
\r
222 throw new CorrelationException(errorMsg, e);
\r
226 public void compileRule(String content, Locale locale)
\r
227 throws CorrelationException {
\r
228 StringReader reader = new StringReader(content);
\r
229 Resource res = ResourceFactory.newReaderResource(reader);
\r
231 kbuilder = KnowledgeBuilderFactory.newKnowledgeBuilder();
\r
233 kbuilder.add(res, ResourceType.DRL);
\r
235 if (kbuilder.hasErrors()) {
\r
236 String errorMsg = I18nProxy.getInstance().getValueByArgs(locale,
\r
237 I18nProxy.ENGINE_CONTENT_ILLEGALITY,
\r
238 new String[]{kbuilder.getErrors().toString()});
\r
239 log.error(errorMsg);
\r
240 throw new CorrelationException(errorMsg);
\r
244 public void putRaisedIntoStream(Alarm raiseAlarm) {
\r
245 FactHandle factHandle = this.ksession.getFactHandle(raiseAlarm);
\r
246 if (factHandle != null) {
\r
247 this.ksession.retract(factHandle);
\r
249 this.ksession.insert(raiseAlarm);
\r
250 this.ksession.fireAllRules();
\r
253 class AlarmMqMessageListener implements MessageListener {
\r
255 private Connection connection = null;
\r
256 private Session session = null;
\r
257 private Destination destination = null;
\r
258 private MessageConsumer consumer = null;
\r
260 private void initialize() throws JMSException {
\r
261 connection = connectionFactory.createConnection();
\r
262 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
\r
263 destination = session.createTopic(AlarmConst.MQ_TOPIC_NAME_ALARM);
\r
264 consumer = session.createConsumer(destination);
\r
265 connection.start();
\r
268 public void receive() {
\r
271 consumer.setMessageListener(this);
\r
272 } catch (JMSException e) {
\r
273 log.error("Failed to connect to the MQ service : " + e.getMessage(), e);
\r
276 } catch (JMSException e1) {
\r
277 log.error("Failed close connection " + e1.getMessage(), e1);
\r
282 public void onMessage(Message arg0) {
\r
283 ActiveMQObjectMessage objectMessage = (ActiveMQObjectMessage) arg0;
\r
285 Serializable object = objectMessage.getObject();
\r
287 if (object instanceof Alarm) {
\r
288 Alarm alarm = (Alarm) object;
\r
289 putRaisedIntoStream(alarm);
\r
291 } catch (JMSException e) {
\r
292 log.error("Failed get object : " + e.getMessage(), e);
\r
296 private void close() throws JMSException {
\r
297 if (consumer != null) {
\r
300 if (session != null) {
\r
303 if (connection != null) {
\r
304 connection.close();
\r