2 * Copyright 2017 ZTE Corporation.
\r
4 * Licensed under the Apache License, Version 2.0 (the "License");
\r
5 * you may not use this file except in compliance with the License.
\r
6 * You may obtain a copy of the License at
\r
8 * http://www.apache.org/licenses/LICENSE-2.0
\r
10 * Unless required by applicable law or agreed to in writing, software
\r
11 * distributed under the License is distributed on an "AS IS" BASIS,
\r
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
\r
13 * See the License for the specific language governing permissions and
\r
14 * limitations under the License.
\r
16 package org.openo.holmes.engine.manager;
\r
19 import java.io.Serializable;
\r
20 import java.io.StringReader;
\r
21 import java.util.HashSet;
\r
22 import java.util.List;
\r
23 import java.util.Locale;
\r
24 import java.util.Set;
\r
25 import javax.annotation.PostConstruct;
\r
26 import javax.inject.Inject;
\r
27 import javax.jms.Connection;
\r
28 import javax.jms.ConnectionFactory;
\r
29 import javax.jms.Destination;
\r
30 import javax.jms.JMSException;
\r
31 import javax.jms.Message;
\r
32 import javax.jms.MessageConsumer;
\r
33 import javax.jms.MessageListener;
\r
34 import javax.jms.Session;
\r
35 import lombok.extern.slf4j.Slf4j;
\r
36 import org.apache.activemq.ActiveMQConnectionFactory;
\r
37 import org.apache.activemq.command.ActiveMQObjectMessage;
\r
38 import org.drools.KnowledgeBase;
\r
39 import org.drools.KnowledgeBaseConfiguration;
\r
40 import org.drools.KnowledgeBaseFactory;
\r
41 import org.drools.builder.KnowledgeBuilder;
\r
42 import org.drools.builder.KnowledgeBuilderFactory;
\r
43 import org.drools.builder.ResourceType;
\r
44 import org.drools.conf.EventProcessingOption;
\r
45 import org.drools.definition.KnowledgePackage;
\r
46 import org.drools.io.Resource;
\r
47 import org.drools.io.ResourceFactory;
\r
48 import org.drools.runtime.StatefulKnowledgeSession;
\r
49 import org.drools.runtime.rule.FactHandle;
\r
50 import org.glassfish.hk2.api.IterableProvider;
\r
51 import org.jvnet.hk2.annotations.Service;
\r
52 import org.openo.holmes.common.api.entity.CorrelationRule;
\r
53 import org.openo.holmes.common.api.stat.Alarm;
\r
54 import org.openo.holmes.common.config.MQConfig;
\r
55 import org.openo.holmes.common.constant.AlarmConst;
\r
56 import org.openo.holmes.common.exception.CorrelationException;
\r
57 import org.openo.holmes.common.utils.ExceptionUtil;
\r
58 import org.openo.holmes.common.utils.I18nProxy;
\r
59 import org.openo.holmes.engine.request.DeployRuleRequest;
\r
60 import org.openo.holmes.engine.wrapper.RuleMgtWrapper;
\r
64 public class DroolsEngine {
\r
66 private final static int ENABLE = 1;
\r
67 private final Set<String> packageNames = new HashSet<String>();
\r
69 private RuleMgtWrapper ruleMgtWrapper;
\r
70 private KnowledgeBase kbase;
\r
71 private KnowledgeBaseConfiguration kconf;
\r
72 private StatefulKnowledgeSession ksession;
\r
74 private IterableProvider<MQConfig> mqConfigProvider;
\r
75 private ConnectionFactory connectionFactory;
\r
78 private void init() {
\r
82 // 2. start mq listener
\r
83 registerAlarmTopicListener();
\r
84 } catch (Exception e) {
\r
85 log.error("Start service failed: " + e.getMessage(), e);
\r
86 throw ExceptionUtil.buildExceptionResponse("Start service failed!");
\r
90 private void registerAlarmTopicListener() {
\r
92 "tcp://" + mqConfigProvider.get().brokerIp + ":" + mqConfigProvider.get().brokerPort;
\r
93 connectionFactory = new ActiveMQConnectionFactory(mqConfigProvider.get().brokerUsername,
\r
94 mqConfigProvider.get().brokerPassword, brokerURL);
\r
96 AlarmMqMessageListener listener = new AlarmMqMessageListener();
\r
101 private void start() throws CorrelationException {
\r
102 log.info("Drools Engine Initialize Beginning...");
\r
104 initEngineParameter();
\r
107 log.info("Business Rule Engine Initialize Successfully.");
\r
110 public void stop() {
\r
111 this.ksession.dispose();
\r
114 private void initEngineParameter() {
\r
115 this.kconf = KnowledgeBaseFactory.newKnowledgeBaseConfiguration();
\r
117 this.kconf.setOption(EventProcessingOption.STREAM);
\r
119 this.kconf.setProperty("drools.assertBehaviour", "equality");
\r
121 this.kbase = KnowledgeBaseFactory.newKnowledgeBase("D-ENGINE", this.kconf);
\r
123 this.ksession = kbase.newStatefulKnowledgeSession();
\r
126 private void initDeployRule() throws CorrelationException {
\r
127 List<CorrelationRule> rules = ruleMgtWrapper.queryRuleByEnable(ENABLE);
\r
129 if (rules.isEmpty()) {
\r
132 for (CorrelationRule rule : rules) {
\r
133 if (rule.getContent() != null) {
\r
134 deployRuleFromDB(rule.getContent());
\r
139 private void deployRuleFromDB(String ruleContent) throws CorrelationException {
\r
140 StringReader reader = new StringReader(ruleContent);
\r
141 Resource res = ResourceFactory.newReaderResource(reader);
\r
143 KnowledgeBuilder kbuilder = KnowledgeBuilderFactory.newKnowledgeBuilder();
\r
145 kbuilder.add(res, ResourceType.DRL);
\r
149 kbase.addKnowledgePackages(kbuilder.getKnowledgePackages());
\r
150 } catch (Exception e) {
\r
151 throw new CorrelationException(e.getMessage(), e);
\r
153 ksession.fireAllRules();
\r
156 public synchronized String deployRule(DeployRuleRequest rule, Locale locale)
\r
157 throws CorrelationException {
\r
158 StringReader reader = new StringReader(rule.getContent());
\r
159 Resource res = ResourceFactory.newReaderResource(reader);
\r
161 KnowledgeBuilder kbuilder = KnowledgeBuilderFactory.newKnowledgeBuilder();
\r
163 kbuilder.add(res, ResourceType.DRL);
\r
165 judgeRuleContent(locale, kbuilder, true);
\r
167 String packageName = kbuilder.getKnowledgePackages().iterator().next().getName();
\r
169 packageNames.add(packageName);
\r
170 kbase.addKnowledgePackages(kbuilder.getKnowledgePackages());
\r
171 } catch (Exception e) {
\r
174 I18nProxy.getInstance().getValue(locale, I18nProxy.ENGINE_DEPLOY_RULE_FAILED);
\r
175 throw new CorrelationException(errorMsg, e);
\r
178 ksession.fireAllRules();
\r
179 return packageName;
\r
182 public synchronized void undeployRule(String packageName, Locale locale)
\r
183 throws CorrelationException {
\r
185 KnowledgePackage pkg = kbase.getKnowledgePackage(packageName);
\r
188 String errorMsg = I18nProxy.getInstance().getValueByArgs(locale,
\r
189 I18nProxy.ENGINE_DELETE_RULE_NULL,
\r
190 new String[]{packageName});
\r
191 throw new CorrelationException(errorMsg);
\r
196 kbase.removeKnowledgePackage(pkg.getName());
\r
197 } catch (Exception e) {
\r
198 String errorMsg = I18nProxy.getInstance().getValueByArgs(locale,
\r
199 I18nProxy.ENGINE_DELETE_RULE_FAILED, new String[]{packageName});
\r
200 throw new CorrelationException(errorMsg, e);
\r
204 public void compileRule(String content, Locale locale)
\r
205 throws CorrelationException {
\r
206 StringReader reader = new StringReader(content);
\r
207 Resource res = ResourceFactory.newReaderResource(reader);
\r
209 KnowledgeBuilder kbuilder = KnowledgeBuilderFactory.newKnowledgeBuilder();
\r
211 kbuilder.add(res, ResourceType.DRL);
\r
213 judgeRuleContent(locale, kbuilder, false);
\r
216 private void judgeRuleContent(Locale locale, KnowledgeBuilder kbuilder, boolean judgePackageName)
\r
217 throws CorrelationException {
\r
218 if (kbuilder.hasErrors()) {
\r
219 String errorMsg = I18nProxy.getInstance().getValueByArgs(locale,
\r
220 I18nProxy.ENGINE_CONTENT_ILLEGALITY,
\r
221 new String[]{kbuilder.getErrors().toString()});
\r
222 log.error(errorMsg);
\r
223 throw new CorrelationException(errorMsg);
\r
226 String packageName = kbuilder.getKnowledgePackages().iterator().next().getName();
\r
228 if (packageNames.contains(packageName) && judgePackageName) {
\r
229 String errorMsg = I18nProxy.getInstance().getValueByArgs(locale,
\r
230 I18nProxy.ENGINE_CONTENT_ILLEGALITY, new String[]{
\r
231 I18nProxy.getInstance().getValue(locale, I18nProxy.ENGINE_CONTAINS_PACKAGE)});
\r
233 throw new CorrelationException(errorMsg);
\r
237 public void putRaisedIntoStream(Alarm raiseAlarm) {
\r
238 FactHandle factHandle = this.ksession.getFactHandle(raiseAlarm);
\r
239 if (factHandle != null) {
\r
240 this.ksession.retract(factHandle);
\r
242 this.ksession.insert(raiseAlarm);
\r
243 this.ksession.fireAllRules();
\r
246 class AlarmMqMessageListener implements MessageListener {
\r
248 private Connection connection = null;
\r
249 private Session session = null;
\r
250 private Destination destination = null;
\r
251 private MessageConsumer consumer = null;
\r
253 private void initialize() throws JMSException {
\r
254 connection = connectionFactory.createConnection();
\r
255 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
\r
256 destination = session.createTopic(AlarmConst.MQ_TOPIC_NAME_ALARM);
\r
257 consumer = session.createConsumer(destination);
\r
258 connection.start();
\r
261 public void receive() {
\r
264 consumer.setMessageListener(this);
\r
265 } catch (JMSException e) {
\r
266 log.error("Failed to connect to the MQ service : " + e.getMessage(), e);
\r
269 } catch (JMSException e1) {
\r
270 log.error("Failed close connection " + e1.getMessage(), e1);
\r
275 public void onMessage(Message arg0) {
\r
276 ActiveMQObjectMessage objectMessage = (ActiveMQObjectMessage) arg0;
\r
278 Serializable object = objectMessage.getObject();
\r
280 if (object instanceof Alarm) {
\r
281 Alarm alarm = (Alarm) object;
\r
282 putRaisedIntoStream(alarm);
\r
284 } catch (JMSException e) {
\r
285 log.error("Failed get object : " + e.getMessage(), e);
\r
289 private void close() throws JMSException {
\r
290 if (consumer != null) {
\r
293 if (session != null) {
\r
296 if (connection != null) {
\r
297 connection.close();
\r