2 * Copyright 2017 ZTE Corporation.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
16 package org.openo.holmes.engine.manager;
19 import java.io.StringReader;
20 import java.util.List;
21 import java.util.Locale;
22 import javax.annotation.PostConstruct;
23 import javax.inject.Inject;
24 import javax.jms.Connection;
25 import javax.jms.ConnectionFactory;
26 import javax.jms.Destination;
27 import javax.jms.JMSException;
28 import javax.jms.MessageConsumer;
29 import javax.jms.ObjectMessage;
30 import javax.jms.Session;
31 import lombok.extern.slf4j.Slf4j;
32 import org.apache.activemq.ActiveMQConnectionFactory;
33 import org.drools.KnowledgeBase;
34 import org.drools.KnowledgeBaseConfiguration;
35 import org.drools.KnowledgeBaseFactory;
36 import org.drools.builder.KnowledgeBuilder;
37 import org.drools.builder.KnowledgeBuilderFactory;
38 import org.drools.builder.ResourceType;
39 import org.drools.conf.EventProcessingOption;
40 import org.drools.definition.KnowledgePackage;
41 import org.drools.io.Resource;
42 import org.drools.io.ResourceFactory;
43 import org.drools.runtime.StatefulKnowledgeSession;
44 import org.drools.runtime.rule.FactHandle;
45 import org.glassfish.hk2.api.IterableProvider;
46 import org.jvnet.hk2.annotations.Service;
47 import org.openo.holmes.common.api.entity.CorrelationRule;
48 import org.openo.holmes.common.api.stat.Alarm;
49 import org.openo.holmes.common.config.MQConfig;
50 import org.openo.holmes.common.constant.AlarmConst;
51 import org.openo.holmes.common.exception.CorrelationException;
52 import org.openo.holmes.common.utils.ExceptionUtil;
53 import org.openo.holmes.common.utils.I18nProxy;
54 import org.openo.holmes.engine.request.DeployRuleRequest;
55 import org.openo.holmes.engine.wrapper.RuleMgtWrapper;
59 public class DroolsEngine {
61 private final static String CORRELATION_RULE = "CORRELATION_RULE";
63 private final static String CORRELATION_ALARM = "CORRELATION_ALARM";
65 private final static int ENABLE = 1;
68 private RuleMgtWrapper ruleMgtWrapper;
70 private KnowledgeBase kbase;
72 private KnowledgeBaseConfiguration kconf;
74 private StatefulKnowledgeSession ksession;
76 private KnowledgeBuilder kbuilder;
79 private IterableProvider<MQConfig> mqConfigProvider;
81 private ConnectionFactory connectionFactory;
88 // 2. start mq listener
89 registerAlarmTopicListener();
90 } catch (Exception e) {
91 log.error("Start service failed: " + e.getMessage(), e);
92 throw ExceptionUtil.buildExceptionResponse("Start service failed!");
96 private void registerAlarmTopicListener() {
98 "tcp://" + mqConfigProvider.get().brokerIp + ":" + mqConfigProvider.get().brokerPort;
99 connectionFactory = new ActiveMQConnectionFactory(mqConfigProvider.get().brokerUsername,
100 mqConfigProvider.get().brokerPassword, brokerURL);
102 Thread thread = new Thread(new AlarmMqMessageListener());
107 private void start() throws CorrelationException {
108 log.info("Drools Engine Initialize Beginning...");
110 initEngineParameter();
113 log.info("Business Rule Engine Initialize Successfully.");
117 this.ksession.dispose();
120 private void initEngineParameter(){
121 this.kconf = KnowledgeBaseFactory.newKnowledgeBaseConfiguration();
123 this.kconf.setOption(EventProcessingOption.STREAM);
125 this.kconf.setProperty("drools.assertBehaviour", "equality");
127 this.kbuilder = KnowledgeBuilderFactory.newKnowledgeBuilder();
129 this.kbase = KnowledgeBaseFactory.newKnowledgeBase("D-ENGINE", this.kconf);
131 this.ksession = kbase.newStatefulKnowledgeSession();
134 private void initDeployRule() throws CorrelationException {
135 List<CorrelationRule> rules = ruleMgtWrapper.queryRuleByEnable(ENABLE);
137 if (rules.isEmpty()) {
140 for (CorrelationRule rule : rules) {
141 if (rule.getContent() != null) {
142 deployRuleFromDB(rule.getContent());
147 private void deployRuleFromDB(String ruleContent) throws CorrelationException {
148 StringReader reader = new StringReader(ruleContent);
149 Resource res = ResourceFactory.newReaderResource(reader);
151 if (kbuilder == null) {
152 kbuilder = KnowledgeBuilderFactory.newKnowledgeBuilder();
155 kbuilder.add(res, ResourceType.DRL);
159 kbase.addKnowledgePackages(kbuilder.getKnowledgePackages());
160 } catch (Exception e) {
161 throw new CorrelationException(e.getMessage(), e);
164 ksession.fireAllRules();
167 public synchronized String deployRule(DeployRuleRequest rule, Locale locale)
168 throws CorrelationException {
169 StringReader reader = new StringReader(rule.getContent());
170 Resource res = ResourceFactory.newReaderResource(reader);
172 if (kbuilder == null) {
173 kbuilder = KnowledgeBuilderFactory.newKnowledgeBuilder();
176 kbuilder.add(res, ResourceType.DRL);
178 if (kbuilder.hasErrors()) {
180 String errorMsg = I18nProxy.getInstance().getValueByArgs(locale,
181 I18nProxy.ENGINE_CONTENT_ILLEGALITY,
182 new String[]{kbuilder.getErrors().toString()});
183 throw new CorrelationException(errorMsg);
186 KnowledgePackage kpackage = kbuilder.getKnowledgePackages().iterator().next();
188 if (kbase.getKnowledgePackages().contains(kpackage)) {
190 String errorMsg = I18nProxy.getInstance().getValueByArgs(locale,
191 I18nProxy.ENGINE_CONTENT_ILLEGALITY,new String[]{
192 I18nProxy.getInstance().getValue(locale, I18nProxy.ENGINE_CONTAINS_PACKAGE)});
194 throw new CorrelationException(errorMsg);
198 kbase.addKnowledgePackages(kbuilder.getKnowledgePackages());
199 } catch (Exception e) {
202 I18nProxy.getInstance().getValue(locale, I18nProxy.ENGINE_DEPLOY_RULE_FAILED);
203 throw new CorrelationException(errorMsg, e);
206 ksession.fireAllRules();
207 return kpackage.getName();
210 public synchronized void undeployRule(String packageName, Locale locale)
211 throws CorrelationException {
213 KnowledgePackage pkg = kbase.getKnowledgePackage(packageName);
216 String errorMsg = I18nProxy.getInstance().getValueByArgs(locale,
217 I18nProxy.ENGINE_DELETE_RULE_NULL,
218 new String[]{packageName});
219 throw new CorrelationException(errorMsg);
224 kbase.removeKnowledgePackage(pkg.getName());
225 } catch (Exception e) {
226 String errorMsg = I18nProxy.getInstance().getValueByArgs(locale,
227 I18nProxy.ENGINE_DELETE_RULE_FAILED, new String[]{packageName});
228 throw new CorrelationException(errorMsg, e);
232 public void compileRule(String content, Locale locale)
233 throws CorrelationException {
234 StringReader reader = new StringReader(content);
235 Resource res = ResourceFactory.newReaderResource(reader);
237 if (kbuilder == null) {
238 kbuilder = KnowledgeBuilderFactory.newKnowledgeBuilder();
241 kbuilder.add(res, ResourceType.DRL);
243 if (kbuilder.hasErrors()) {
244 String errorMsg = I18nProxy.getInstance().getValueByArgs(locale,
245 I18nProxy.ENGINE_CONTENT_ILLEGALITY,
246 new String[]{kbuilder.getErrors().toString()});
248 throw new CorrelationException(errorMsg);
252 public void putRaisedIntoStream(Alarm raiseAlarm) {
253 FactHandle factHandle = this.ksession.getFactHandle(raiseAlarm);
254 if (factHandle != null) {
255 this.ksession.retract(factHandle);
257 this.ksession.insert(raiseAlarm);
258 this.ksession.fireAllRules();
261 class AlarmMqMessageListener implements Runnable {
264 Connection connection;
266 Destination destination;
267 MessageConsumer messageConsumer;
270 connection = connectionFactory.createConnection();
272 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
273 destination = session.createTopic(AlarmConst.MQ_TOPIC_NAME_ALARM);
274 messageConsumer = session.createConsumer(destination);
277 ObjectMessage objMessage = (ObjectMessage) messageConsumer.receive(100000);
278 if (objMessage != null) {
279 putRaisedIntoStream((Alarm) objMessage.getObject());
284 } catch (JMSException e) {
285 log.error("connection mq service Failed: " + e.getMessage(), e);