2 * Copyright 2017 ZTE Corporation.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
16 package org.openo.holmes.engine.manager;
19 import java.io.StringReader;
20 import java.util.List;
21 import java.util.Locale;
22 import javax.annotation.PostConstruct;
23 import javax.inject.Inject;
24 import javax.jms.Connection;
25 import javax.jms.ConnectionFactory;
26 import javax.jms.Destination;
27 import javax.jms.JMSException;
28 import javax.jms.MessageConsumer;
29 import javax.jms.ObjectMessage;
30 import javax.jms.Session;
31 import lombok.extern.slf4j.Slf4j;
32 import org.apache.activemq.ActiveMQConnectionFactory;
33 import org.drools.KnowledgeBase;
34 import org.drools.KnowledgeBaseConfiguration;
35 import org.drools.KnowledgeBaseFactory;
36 import org.drools.builder.KnowledgeBuilder;
37 import org.drools.builder.KnowledgeBuilderFactory;
38 import org.drools.builder.ResourceType;
39 import org.drools.conf.EventProcessingOption;
40 import org.drools.definition.KnowledgePackage;
41 import org.drools.io.Resource;
42 import org.drools.io.ResourceFactory;
43 import org.drools.runtime.StatefulKnowledgeSession;
44 import org.drools.runtime.rule.FactHandle;
45 import org.glassfish.hk2.api.IterableProvider;
46 import org.jvnet.hk2.annotations.Service;
47 import org.openo.holmes.common.api.entity.CorrelationRule;
48 import org.openo.holmes.common.api.stat.Alarm;
49 import org.openo.holmes.common.config.MQConfig;
50 import org.openo.holmes.common.constant.AlarmConst;
51 import org.openo.holmes.common.exception.CorrelationException;
52 import org.openo.holmes.common.exception.DbException;
53 import org.openo.holmes.common.exception.EngineException;
54 import org.openo.holmes.common.exception.RuleIllegalityException;
55 import org.openo.holmes.common.utils.ExceptionUtil;
56 import org.openo.holmes.common.utils.I18nProxy;
57 import org.openo.holmes.engine.request.DeployRuleRequest;
58 import org.openo.holmes.engine.wrapper.RuleMgtWrapper;
62 public class DroolsEngine {
// No use of this constant is visible in this part of the file.
64 private final static String CORRELATION_RULE = "CORRELATION_RULE";
// No use of this constant is visible in this part of the file.
66 private final static String CORRELATION_ALARM = "CORRELATION_ALARM";
// Argument passed to ruleMgtWrapper.queryRuleByEnable(...) when stored rules are loaded.
68 private final static int ENABLE = 1;
// Queried in initDeployRule() for the rules to deploy.
71 private RuleMgtWrapper ruleMgtWrapper;
// Knowledge base created in initEngineParameter(); rule packages are added to and removed from it.
73 private KnowledgeBase kbase;
// Configuration object the knowledge base is created with (see initEngineParameter()).
75 private KnowledgeBaseConfiguration kconf;
// Stateful session opened on kbase; putRaisedIntoStream() inserts alarms into it.
77 private StatefulKnowledgeSession ksession;
// Builder instance created in initEngineParameter().
79 private KnowledgeBuilder kbuilder;
// Supplies the broker ip, port, username and password read in registerAlarmTopicListener().
82 private IterableProvider<MQConfig> mqConfigProvider;
// Assigned in registerAlarmTopicListener(); AlarmMqMessageListener creates its connection from it.
84 private ConnectionFactory connectionFactory;
91 // 2. start mq listener
92 registerAlarmTopicListener();
93 } catch (Exception e) {
94 log.error("Start service failed: " + e.getMessage());
95 throw ExceptionUtil.buildExceptionResponse("Start service failed!");
99 private void registerAlarmTopicListener() {
101 "tcp://" + mqConfigProvider.get().brokerIp + ":" + mqConfigProvider.get().brokerPort;
102 connectionFactory = new ActiveMQConnectionFactory(mqConfigProvider.get().brokerUsername,
103 mqConfigProvider.get().brokerPassword, brokerURL);
105 Thread thread = new Thread(new AlarmMqMessageListener());
110 private void start() throws EngineException, RuleIllegalityException, DbException {
111 log.info("Drools Egine Initialize Begining ... ");
113 initEngineParameter();
116 log.info("Business Rule Egine Initialize Successfully ");
119 public void stop() throws Exception {
120 this.ksession.dispose();
123 private void initEngineParameter() throws EngineException {
124 this.kconf = KnowledgeBaseFactory.newKnowledgeBaseConfiguration();
126 this.kconf.setOption(EventProcessingOption.STREAM);
128 this.kconf.setProperty("drools.assertBehaviour", "equality");
130 this.kbuilder = KnowledgeBuilderFactory.newKnowledgeBuilder();
132 this.kbase = KnowledgeBaseFactory.newKnowledgeBase("D-ENGINE", this.kconf);
134 this.ksession = kbase.newStatefulKnowledgeSession();
137 private void initDeployRule() throws RuleIllegalityException, EngineException, DbException {
138 List<CorrelationRule> rules = ruleMgtWrapper.queryRuleByEnable(ENABLE);
140 if (rules.size() > 0) {
141 for (CorrelationRule rule : rules) {
142 if (rule.getContent() != null) {
143 deployRuleFromCache(rule.getContent());
149 private void deployRuleFromCache(String ruleContent) throws EngineException {
150 StringReader reader = new StringReader(ruleContent);
151 Resource res = ResourceFactory.newReaderResource(reader);
153 if (kbuilder == null) {
154 kbuilder = KnowledgeBuilderFactory.newKnowledgeBuilder();
157 kbuilder.add(res, ResourceType.DRL);
161 kbase.addKnowledgePackages(kbuilder.getKnowledgePackages());
162 } catch (Exception e) {
163 throw new EngineException(e);
166 ksession.fireAllRules();
169 public synchronized String deployRule(DeployRuleRequest rule, Locale locale)
170 throws CorrelationException {
171 StringReader reader = new StringReader(rule.getContent());
172 Resource res = ResourceFactory.newReaderResource(reader);
174 if (kbuilder == null) {
175 kbuilder = KnowledgeBuilderFactory.newKnowledgeBuilder();
178 kbuilder.add(res, ResourceType.DRL);
180 if (kbuilder.hasErrors()) {
182 String errorMsg = I18nProxy.getInstance().getValueByArgs(locale,
183 I18nProxy.ENGINE_CONTENT_ILLEGALITY,
184 new String[]{kbuilder.getErrors().toString()});
185 throw new CorrelationException(errorMsg);
188 KnowledgePackage kpackage = kbuilder.getKnowledgePackages().iterator().next();
190 if (kbase.getKnowledgePackages().contains(kpackage)) {
192 String errorMsg = I18nProxy.getInstance().getValueByArgs(locale,
193 I18nProxy.ENGINE_CONTENT_ILLEGALITY,new String[]{
194 I18nProxy.getInstance().getValue(locale, I18nProxy.ENGINE_CONTAINS_PACKAGE)});
196 throw new CorrelationException(errorMsg);
200 kbase.addKnowledgePackages(kbuilder.getKnowledgePackages());
201 } catch (Exception e) {
204 I18nProxy.getInstance().getValue(locale, I18nProxy.ENGINE_DEPLOY_RULE_FAILED);
205 throw new CorrelationException(errorMsg, e);
208 ksession.fireAllRules();
209 return kpackage.getName();
212 public synchronized void undeployRule(String packageName, Locale locale)
213 throws CorrelationException {
215 KnowledgePackage pkg = kbase.getKnowledgePackage(packageName);
218 String errorMsg = I18nProxy.getInstance().getValueByArgs(locale,
219 I18nProxy.ENGINE_DELETE_RULE_NULL,
220 new String[]{packageName});
221 throw new CorrelationException(errorMsg);
226 kbase.removeKnowledgePackage(pkg.getName());
227 } catch (Exception e) {
228 String errorMsg = I18nProxy.getInstance().getValueByArgs(locale,
229 I18nProxy.ENGINE_DELETE_RULE_FAILED, new String[]{packageName});
230 throw new CorrelationException(errorMsg, e);
234 public void compileRule(String content, Locale locale)
235 throws CorrelationException {
236 StringReader reader = new StringReader(content);
237 Resource res = ResourceFactory.newReaderResource(reader);
239 if (kbuilder == null) {
240 kbuilder = KnowledgeBuilderFactory.newKnowledgeBuilder();
243 kbuilder.add(res, ResourceType.DRL);
245 if (kbuilder.hasErrors()) {
246 String errorMsg = I18nProxy.getInstance().getValueByArgs(locale,
247 I18nProxy.ENGINE_CONTENT_ILLEGALITY,
248 new String[]{kbuilder.getErrors().toString()});
250 throw new CorrelationException(errorMsg);
254 public void putRaisedIntoStream(Alarm raiseAlarm) {
255 FactHandle factHandle = this.ksession.getFactHandle(raiseAlarm);
256 if (factHandle != null) {
257 this.ksession.retract(factHandle);
259 this.ksession.insert(raiseAlarm);
260 this.ksession.fireAllRules();
263 class AlarmMqMessageListener implements Runnable {
266 Connection connection;
268 Destination destination;
269 MessageConsumer messageConsumer;
272 connection = connectionFactory.createConnection();
274 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
275 destination = session.createTopic(AlarmConst.MQ_TOPIC_NAME_ALARM);
276 messageConsumer = session.createConsumer(destination);
279 ObjectMessage objMessage = (ObjectMessage) messageConsumer.receive(100000);
280 if (objMessage != null) {
281 putRaisedIntoStream((Alarm) objMessage.getObject());
286 } catch (JMSException e) {
287 log.error("connection mq service Failed: " + e.getMessage());