2 * Copyright 2017 ZTE Corporation.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
16 package org.openo.holmes.engine.manager;
19 import java.io.StringReader;
20 import java.util.List;
21 import java.util.Locale;
22 import javax.annotation.PostConstruct;
23 import javax.inject.Inject;
24 import javax.jms.Connection;
25 import javax.jms.ConnectionFactory;
26 import javax.jms.Destination;
27 import javax.jms.JMSException;
28 import javax.jms.MessageConsumer;
29 import javax.jms.ObjectMessage;
30 import javax.jms.Session;
31 import lombok.extern.slf4j.Slf4j;
32 import org.apache.activemq.ActiveMQConnectionFactory;
33 import org.drools.KnowledgeBase;
34 import org.drools.KnowledgeBaseConfiguration;
35 import org.drools.KnowledgeBaseFactory;
36 import org.drools.builder.KnowledgeBuilder;
37 import org.drools.builder.KnowledgeBuilderFactory;
38 import org.drools.builder.ResourceType;
39 import org.drools.conf.EventProcessingOption;
40 import org.drools.definition.KnowledgePackage;
41 import org.drools.io.Resource;
42 import org.drools.io.ResourceFactory;
43 import org.drools.runtime.StatefulKnowledgeSession;
44 import org.drools.runtime.rule.FactHandle;
45 import org.glassfish.hk2.api.IterableProvider;
46 import org.jvnet.hk2.annotations.Service;
47 import org.openo.holmes.common.api.entity.CorrelationRule;
48 import org.openo.holmes.common.api.stat.Alarm;
49 import org.openo.holmes.common.config.MQConfig;
50 import org.openo.holmes.common.constant.AlarmConst;
51 import org.openo.holmes.common.exception.CorrelationException;
52 import org.openo.holmes.common.utils.ExceptionUtil;
53 import org.openo.holmes.common.utils.I18nProxy;
54 import org.openo.holmes.engine.request.DeployRuleRequest;
55 import org.openo.holmes.engine.wrapper.RuleMgtWrapper;
59 public class DroolsEngine {
// Name constants; neither is referenced in the visible part of this file.
61 private final static String CORRELATION_RULE = "CORRELATION_RULE";
63 private final static String CORRELATION_ALARM = "CORRELATION_ALARM";
// Flag value handed to RuleMgtWrapper.queryRuleByEnable(...) by initDeployRule().
65 private final static int ENABLE = 1;
// Source of the stored correlation rules (see initDeployRule()).
// NOTE(review): how this field gets assigned is not visible here — presumably injected; confirm.
68 private RuleMgtWrapper ruleMgtWrapper;
// Knowledge base that deployed rule packages are added to and removed from.
70 private KnowledgeBase kbase;
// Configuration used to create kbase; populated in initEngineParameter().
72 private KnowledgeBaseConfiguration kconf;
// Session created from kbase; alarms are inserted into it and rules are fired on it.
74 private StatefulKnowledgeSession ksession;
// Builder shared by the deploy/compile methods; created in initEngineParameter() and
// re-created lazily by those methods when it is null.
76 private KnowledgeBuilder kbuilder;
// Supplies the broker IP, port, username and password read by registerAlarmTopicListener().
// NOTE(review): how this field gets assigned is not visible here — presumably injected; confirm.
79 private IterableProvider<MQConfig> mqConfigProvider;
// Assigned in registerAlarmTopicListener(); AlarmMqMessageListener creates its connection from it.
81 private ConnectionFactory connectionFactory;
88 // 2. start mq listener
89 registerAlarmTopicListener();
90 } catch (Exception e) {
91 log.error("Start service failed: " + e.getMessage(), e);
92 throw ExceptionUtil.buildExceptionResponse("Start service failed!");
/**
 * Builds the ActiveMQ connection factory from the settings supplied by
 * {@code mqConfigProvider} and creates the thread that runs {@link AlarmMqMessageListener}.
 *
 * The broker URL has the form {@code tcp://<brokerIp>:<brokerPort>}; the broker username and
 * password are passed to the factory together with that URL.
 *
 * NOTE(review): some statements of this method are not visible in this view (the declaration
 * of brokerURL and whatever starts the thread) — confirm against the full file.
 */
96 private void registerAlarmTopicListener() {
98 "tcp://" + mqConfigProvider.get().brokerIp + ":" + mqConfigProvider.get().brokerPort;
99 connectionFactory = new ActiveMQConnectionFactory(mqConfigProvider.get().brokerUsername,
100 mqConfigProvider.get().brokerPassword, brokerURL);
// The listener is a Runnable, so it is wrapped in its own thread.
102 Thread thread = new Thread(new AlarmMqMessageListener());
/**
 * Initializes the rule engine: logs the beginning of initialization, calls
 * {@link #initEngineParameter()} to create the Drools objects, and logs completion.
 *
 * NOTE(review): some statements of this method are not visible in this view (presumably the
 * call that loads the stored rules, which would explain the declared exception) — confirm
 * against the full file.
 *
 * @throws CorrelationException declared by the signature; none of the visible statements
 *         throws it
 */
107 private void start() throws CorrelationException {
108 log.info("Drools Engine Initialize Beginning...");
// Creates kconf, kbuilder, kbase and ksession.
110 initEngineParameter();
113 log.info("Business Rule Engine Initialize Successfully.");
117 this.ksession.dispose();
120 private void initEngineParameter(){
121 this.kconf = KnowledgeBaseFactory.newKnowledgeBaseConfiguration();
123 this.kconf.setOption(EventProcessingOption.STREAM);
125 this.kconf.setProperty("drools.assertBehaviour", "equality");
127 this.kbuilder = KnowledgeBuilderFactory.newKnowledgeBuilder();
129 this.kbase = KnowledgeBaseFactory.newKnowledgeBase("D-ENGINE", this.kconf);
131 this.ksession = kbase.newStatefulKnowledgeSession();
134 private void initDeployRule() throws CorrelationException {
135 List<CorrelationRule> rules = ruleMgtWrapper.queryRuleByEnable(ENABLE);
137 if (!rules.isEmpty()) {
138 for (CorrelationRule rule : rules) {
139 if (rule.getContent() != null) {
140 deployRuleFromDB(rule.getContent());
146 private void deployRuleFromDB(String ruleContent) throws CorrelationException {
147 StringReader reader = new StringReader(ruleContent);
148 Resource res = ResourceFactory.newReaderResource(reader);
150 if (kbuilder == null) {
151 kbuilder = KnowledgeBuilderFactory.newKnowledgeBuilder();
154 kbuilder.add(res, ResourceType.DRL);
158 kbase.addKnowledgePackages(kbuilder.getKnowledgePackages());
159 } catch (Exception e) {
160 throw new CorrelationException(e.getMessage(), e);
163 ksession.fireAllRules();
166 public synchronized String deployRule(DeployRuleRequest rule, Locale locale)
167 throws CorrelationException {
168 StringReader reader = new StringReader(rule.getContent());
169 Resource res = ResourceFactory.newReaderResource(reader);
171 if (kbuilder == null) {
172 kbuilder = KnowledgeBuilderFactory.newKnowledgeBuilder();
175 kbuilder.add(res, ResourceType.DRL);
177 if (kbuilder.hasErrors()) {
179 String errorMsg = I18nProxy.getInstance().getValueByArgs(locale,
180 I18nProxy.ENGINE_CONTENT_ILLEGALITY,
181 new String[]{kbuilder.getErrors().toString()});
182 throw new CorrelationException(errorMsg);
185 KnowledgePackage kpackage = kbuilder.getKnowledgePackages().iterator().next();
187 if (kbase.getKnowledgePackages().contains(kpackage)) {
189 String errorMsg = I18nProxy.getInstance().getValueByArgs(locale,
190 I18nProxy.ENGINE_CONTENT_ILLEGALITY,new String[]{
191 I18nProxy.getInstance().getValue(locale, I18nProxy.ENGINE_CONTAINS_PACKAGE)});
193 throw new CorrelationException(errorMsg);
197 kbase.addKnowledgePackages(kbuilder.getKnowledgePackages());
198 } catch (Exception e) {
201 I18nProxy.getInstance().getValue(locale, I18nProxy.ENGINE_DEPLOY_RULE_FAILED);
202 throw new CorrelationException(errorMsg, e);
205 ksession.fireAllRules();
206 return kpackage.getName();
209 public synchronized void undeployRule(String packageName, Locale locale)
210 throws CorrelationException {
212 KnowledgePackage pkg = kbase.getKnowledgePackage(packageName);
215 String errorMsg = I18nProxy.getInstance().getValueByArgs(locale,
216 I18nProxy.ENGINE_DELETE_RULE_NULL,
217 new String[]{packageName});
218 throw new CorrelationException(errorMsg);
223 kbase.removeKnowledgePackage(pkg.getName());
224 } catch (Exception e) {
225 String errorMsg = I18nProxy.getInstance().getValueByArgs(locale,
226 I18nProxy.ENGINE_DELETE_RULE_FAILED, new String[]{packageName});
227 throw new CorrelationException(errorMsg, e);
231 public void compileRule(String content, Locale locale)
232 throws CorrelationException {
233 StringReader reader = new StringReader(content);
234 Resource res = ResourceFactory.newReaderResource(reader);
236 if (kbuilder == null) {
237 kbuilder = KnowledgeBuilderFactory.newKnowledgeBuilder();
240 kbuilder.add(res, ResourceType.DRL);
242 if (kbuilder.hasErrors()) {
243 String errorMsg = I18nProxy.getInstance().getValueByArgs(locale,
244 I18nProxy.ENGINE_CONTENT_ILLEGALITY,
245 new String[]{kbuilder.getErrors().toString()});
247 throw new CorrelationException(errorMsg);
251 public void putRaisedIntoStream(Alarm raiseAlarm) {
252 FactHandle factHandle = this.ksession.getFactHandle(raiseAlarm);
253 if (factHandle != null) {
254 this.ksession.retract(factHandle);
256 this.ksession.insert(raiseAlarm);
257 this.ksession.fireAllRules();
260 class AlarmMqMessageListener implements Runnable {
263 Connection connection;
265 Destination destination;
266 MessageConsumer messageConsumer;
269 connection = connectionFactory.createConnection();
271 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
272 destination = session.createTopic(AlarmConst.MQ_TOPIC_NAME_ALARM);
273 messageConsumer = session.createConsumer(destination);
276 ObjectMessage objMessage = (ObjectMessage) messageConsumer.receive(100000);
277 if (objMessage != null) {
278 putRaisedIntoStream((Alarm) objMessage.getObject());
283 } catch (JMSException e) {
284 log.error("connection mq service Failed: " + e.getMessage(), e);