2 * Copyright 2017 ZTE Corporation.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
16 package org.openo.holmes.engine.manager;
19 import java.io.Serializable;
20 import java.io.StringReader;
21 import java.util.List;
22 import java.util.Locale;
23 import javax.annotation.PostConstruct;
24 import javax.inject.Inject;
25 import javax.jms.Connection;
26 import javax.jms.ConnectionFactory;
27 import javax.jms.Destination;
28 import javax.jms.JMSException;
29 import javax.jms.Message;
30 import javax.jms.MessageConsumer;
31 import javax.jms.MessageListener;
32 import javax.jms.Session;
33 import lombok.extern.slf4j.Slf4j;
34 import org.apache.activemq.ActiveMQConnectionFactory;
35 import org.apache.activemq.command.ActiveMQObjectMessage;
36 import org.drools.KnowledgeBase;
37 import org.drools.KnowledgeBaseConfiguration;
38 import org.drools.KnowledgeBaseFactory;
39 import org.drools.builder.KnowledgeBuilder;
40 import org.drools.builder.KnowledgeBuilderFactory;
41 import org.drools.builder.ResourceType;
42 import org.drools.conf.EventProcessingOption;
43 import org.drools.definition.KnowledgePackage;
44 import org.drools.io.Resource;
45 import org.drools.io.ResourceFactory;
46 import org.drools.runtime.StatefulKnowledgeSession;
47 import org.drools.runtime.rule.FactHandle;
48 import org.glassfish.hk2.api.IterableProvider;
49 import org.jvnet.hk2.annotations.Service;
50 import org.openo.holmes.common.api.entity.CorrelationRule;
51 import org.openo.holmes.common.api.stat.Alarm;
52 import org.openo.holmes.common.config.MQConfig;
53 import org.openo.holmes.common.constant.AlarmConst;
54 import org.openo.holmes.common.exception.CorrelationException;
55 import org.openo.holmes.common.utils.ExceptionUtil;
56 import org.openo.holmes.common.utils.I18nProxy;
57 import org.openo.holmes.engine.request.DeployRuleRequest;
58 import org.openo.holmes.engine.wrapper.RuleMgtWrapper;
62 public class DroolsEngine {
63 private final static int ENABLE = 1;
66 private RuleMgtWrapper ruleMgtWrapper;
68 private KnowledgeBase kbase;
70 private KnowledgeBaseConfiguration kconf;
72 private StatefulKnowledgeSession ksession;
74 private KnowledgeBuilder kbuilder;
77 private IterableProvider<MQConfig> mqConfigProvider;
79 private ConnectionFactory connectionFactory;
86 // 2. start mq listener
87 registerAlarmTopicListener();
88 } catch (Exception e) {
89 log.error("Start service failed: " + e.getMessage(), e);
90 throw ExceptionUtil.buildExceptionResponse("Start service failed!");
94 private void registerAlarmTopicListener() {
96 "tcp://" + mqConfigProvider.get().brokerIp + ":" + mqConfigProvider.get().brokerPort;
97 connectionFactory = new ActiveMQConnectionFactory(mqConfigProvider.get().brokerUsername,
98 mqConfigProvider.get().brokerPassword, brokerURL);
100 AlarmMqMessageListener listener = new AlarmMqMessageListener();
105 private void start() throws CorrelationException {
106 log.info("Drools Engine Initialize Beginning...");
108 initEngineParameter();
111 log.info("Business Rule Engine Initialize Successfully.");
115 this.ksession.dispose();
118 private void initEngineParameter(){
119 this.kconf = KnowledgeBaseFactory.newKnowledgeBaseConfiguration();
121 this.kconf.setOption(EventProcessingOption.STREAM);
123 this.kconf.setProperty("drools.assertBehaviour", "equality");
125 this.kbuilder = KnowledgeBuilderFactory.newKnowledgeBuilder();
127 this.kbase = KnowledgeBaseFactory.newKnowledgeBase("D-ENGINE", this.kconf);
129 this.ksession = kbase.newStatefulKnowledgeSession();
132 private void initDeployRule() throws CorrelationException {
133 List<CorrelationRule> rules = ruleMgtWrapper.queryRuleByEnable(ENABLE);
135 if (rules.isEmpty()) {
138 for (CorrelationRule rule : rules) {
139 if (rule.getContent() != null) {
140 deployRuleFromDB(rule.getContent());
145 private void deployRuleFromDB(String ruleContent) throws CorrelationException {
146 StringReader reader = new StringReader(ruleContent);
147 Resource res = ResourceFactory.newReaderResource(reader);
149 if (kbuilder == null) {
150 kbuilder = KnowledgeBuilderFactory.newKnowledgeBuilder();
153 kbuilder.add(res, ResourceType.DRL);
157 kbase.addKnowledgePackages(kbuilder.getKnowledgePackages());
158 } catch (Exception e) {
159 throw new CorrelationException(e.getMessage(), e);
162 ksession.fireAllRules();
165 public synchronized String deployRule(DeployRuleRequest rule, Locale locale)
166 throws CorrelationException {
167 StringReader reader = new StringReader(rule.getContent());
168 Resource res = ResourceFactory.newReaderResource(reader);
170 if (kbuilder == null) {
171 kbuilder = KnowledgeBuilderFactory.newKnowledgeBuilder();
174 kbuilder.add(res, ResourceType.DRL);
176 if (kbuilder.hasErrors()) {
178 String errorMsg = I18nProxy.getInstance().getValueByArgs(locale,
179 I18nProxy.ENGINE_CONTENT_ILLEGALITY,
180 new String[]{kbuilder.getErrors().toString()});
181 throw new CorrelationException(errorMsg);
184 KnowledgePackage kpackage = kbuilder.getKnowledgePackages().iterator().next();
186 if (kbase.getKnowledgePackages().contains(kpackage)) {
188 String errorMsg = I18nProxy.getInstance().getValueByArgs(locale,
189 I18nProxy.ENGINE_CONTENT_ILLEGALITY,new String[]{
190 I18nProxy.getInstance().getValue(locale, I18nProxy.ENGINE_CONTAINS_PACKAGE)});
192 throw new CorrelationException(errorMsg);
196 kbase.addKnowledgePackages(kbuilder.getKnowledgePackages());
197 } catch (Exception e) {
200 I18nProxy.getInstance().getValue(locale, I18nProxy.ENGINE_DEPLOY_RULE_FAILED);
201 throw new CorrelationException(errorMsg, e);
204 ksession.fireAllRules();
205 return kpackage.getName();
208 public synchronized void undeployRule(String packageName, Locale locale)
209 throws CorrelationException {
211 KnowledgePackage pkg = kbase.getKnowledgePackage(packageName);
214 String errorMsg = I18nProxy.getInstance().getValueByArgs(locale,
215 I18nProxy.ENGINE_DELETE_RULE_NULL,
216 new String[]{packageName});
217 throw new CorrelationException(errorMsg);
222 kbase.removeKnowledgePackage(pkg.getName());
223 } catch (Exception e) {
224 String errorMsg = I18nProxy.getInstance().getValueByArgs(locale,
225 I18nProxy.ENGINE_DELETE_RULE_FAILED, new String[]{packageName});
226 throw new CorrelationException(errorMsg, e);
230 public void compileRule(String content, Locale locale)
231 throws CorrelationException {
232 StringReader reader = new StringReader(content);
233 Resource res = ResourceFactory.newReaderResource(reader);
235 if (kbuilder == null) {
236 kbuilder = KnowledgeBuilderFactory.newKnowledgeBuilder();
239 kbuilder.add(res, ResourceType.DRL);
241 if (kbuilder.hasErrors()) {
242 String errorMsg = I18nProxy.getInstance().getValueByArgs(locale,
243 I18nProxy.ENGINE_CONTENT_ILLEGALITY,
244 new String[]{kbuilder.getErrors().toString()});
246 throw new CorrelationException(errorMsg);
250 public void putRaisedIntoStream(Alarm raiseAlarm) {
251 FactHandle factHandle = this.ksession.getFactHandle(raiseAlarm);
252 if (factHandle != null) {
253 this.ksession.retract(factHandle);
255 this.ksession.insert(raiseAlarm);
256 this.ksession.fireAllRules();
259 class AlarmMqMessageListener implements MessageListener {
261 private Connection connection = null;
262 private Session session = null;
263 private Destination destination = null;
264 private MessageConsumer consumer = null;
266 private void initialize() throws JMSException {
267 connection = connectionFactory.createConnection();
268 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
269 destination = session.createTopic(AlarmConst.MQ_TOPIC_NAME_ALARM);
270 consumer = session.createConsumer(destination);
274 public void receive() {
277 consumer.setMessageListener(this);
278 } catch (JMSException e) {
279 log.error("Failed to connect to the MQ service : " + e.getMessage(), e);
282 } catch (JMSException e1) {
283 log.error("Failed close connection " + e1.getMessage(), e1);
288 public void onMessage(Message arg0) {
289 ActiveMQObjectMessage objectMessage = (ActiveMQObjectMessage) arg0;
291 Serializable object = objectMessage.getObject();
293 if (object instanceof Alarm) {
294 Alarm alarm = (Alarm) object;
295 putRaisedIntoStream(alarm);
297 } catch (JMSException e) {
298 log.error("Failed get object : " + e.getMessage(), e);
302 private void close() throws JMSException {
303 if (consumer != null)
307 if (connection != null)