2 * Copyright 2017 ZTE Corporation.
\r
4 * Licensed under the Apache License, Version 2.0 (the "License");
\r
5 * you may not use this file except in compliance with the License.
\r
6 * You may obtain a copy of the License at
\r
8 * http://www.apache.org/licenses/LICENSE-2.0
\r
10 * Unless required by applicable law or agreed to in writing, software
\r
11 * distributed under the License is distributed on an "AS IS" BASIS,
\r
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
\r
13 * See the License for the specific language governing permissions and
\r
14 * limitations under the License.
\r
16 package org.onap.holmes.engine.manager;
\r
19 import java.io.Serializable;
\r
20 import java.io.StringReader;
\r
21 import java.util.HashSet;
\r
22 import java.util.List;
\r
23 import java.util.Locale;
\r
24 import java.util.Set;
\r
25 import javax.annotation.PostConstruct;
\r
26 import javax.inject.Inject;
\r
27 import javax.jms.Connection;
\r
28 import javax.jms.ConnectionFactory;
\r
29 import javax.jms.Destination;
\r
30 import javax.jms.JMSException;
\r
31 import javax.jms.Message;
\r
32 import javax.jms.MessageConsumer;
\r
33 import javax.jms.MessageListener;
\r
34 import javax.jms.Session;
\r
35 import lombok.extern.slf4j.Slf4j;
\r
36 import org.apache.activemq.ActiveMQConnectionFactory;
\r
37 import org.apache.activemq.command.ActiveMQObjectMessage;
\r
38 import org.drools.KnowledgeBase;
\r
39 import org.drools.KnowledgeBaseConfiguration;
\r
40 import org.drools.KnowledgeBaseFactory;
\r
41 import org.drools.builder.KnowledgeBuilder;
\r
42 import org.drools.builder.KnowledgeBuilderFactory;
\r
43 import org.drools.builder.ResourceType;
\r
44 import org.drools.conf.EventProcessingOption;
\r
45 import org.drools.definition.KnowledgePackage;
\r
46 import org.drools.io.Resource;
\r
47 import org.drools.io.ResourceFactory;
\r
48 import org.drools.runtime.StatefulKnowledgeSession;
\r
49 import org.drools.runtime.rule.FactHandle;
\r
50 import org.glassfish.hk2.api.IterableProvider;
\r
51 import org.jvnet.hk2.annotations.Service;
\r
52 import org.onap.holmes.engine.request.DeployRuleRequest;
\r
53 import org.onap.holmes.common.api.entity.CorrelationRule;
\r
54 import org.onap.holmes.common.api.stat.Alarm;
\r
55 import org.onap.holmes.common.config.MQConfig;
\r
56 import org.onap.holmes.common.constant.AlarmConst;
\r
57 import org.onap.holmes.common.exception.CorrelationException;
\r
58 import org.onap.holmes.common.utils.ExceptionUtil;
\r
59 import org.onap.holmes.engine.wrapper.RuleMgtWrapper;
\r
// Rule engine component: owns the Drools knowledge base/session and the MQ connection factory
// used by the alarm listener declared further down.
// NOTE(review): listing lines 60-62 are absent from this view; the `log` used by the methods below is
// presumably supplied by the imported Lombok @Slf4j annotation — confirm against the full file.
63 public class DroolsEngine {
\r
// Argument passed to ruleMgtWrapper.queryRuleByEnable(...) in initDeployRule().
65 private final static int ENABLE = 1;
\r
// Package names recorded by deployRule(); judgeRuleContent() consults this set to reject duplicates.
66 private final Set<String> packageNames = new HashSet<String>();
\r
// Queried for rules in initDeployRule().
// NOTE(review): listing line 67 is absent; presumably an injection annotation — confirm.
68 private RuleMgtWrapper ruleMgtWrapper;
\r
// The three Drools handles below are all assigned in initEngineParameter().
69 private KnowledgeBase kbase;
\r
70 private KnowledgeBaseConfiguration kconf;
\r
71 private StatefulKnowledgeSession ksession;
\r
// Source of broker IP/port/credentials read in registerAlarmTopicListener().
// NOTE(review): listing line 72 is absent; presumably an injection annotation — confirm.
73 private IterableProvider<MQConfig> mqConfigProvider;
\r
// Assigned in registerAlarmTopicListener(); used by AlarmMqMessageListener.initialize() to open the connection.
74 private ConnectionFactory connectionFactory;
\r
// Start-up routine: registers the alarm topic listener. Any Exception raised is logged and
// re-thrown as whatever ExceptionUtil.buildExceptionResponse(...) produces.
// NOTE(review): listing lines 78-80 (and the closing lines 86-87) are absent from this view. The
// "2." in the comment below and the `} catch` imply an opening `try {` and an earlier first step —
// confirm against the full file before relying on this description.
77 private void init() {
\r
81 // 2. start mq listener
\r
82 registerAlarmTopicListener();
\r
83 } catch (Exception e) {
\r
84 log.error("Failed to start the service: " + e.getMessage(), e);
\r
85 throw ExceptionUtil.buildExceptionResponse("Failed to start the drools engine!");
\r
// Builds the ActiveMQ connection factory from the MQConfig provider (username, password, broker URL)
// and instantiates the alarm listener.
// NOTE(review): listing line 90 is absent; line 91 is the tail of an expression producing
// "tcp://<brokerIp>:<brokerPort>", presumably assigned to the `brokerURL` used on line 93 — confirm.
// NOTE(review): listing line 96 is absent; `listener` is not used in any visible line, so whatever
// starts it (if anything) cannot be seen from here.
89 private void registerAlarmTopicListener() {
\r
91 "tcp://" + mqConfigProvider.get().brokerIp + ":" + mqConfigProvider.get().brokerPort;
\r
92 connectionFactory = new ActiveMQConnectionFactory(mqConfigProvider.get().brokerUsername,
\r
93 mqConfigProvider.get().brokerPassword, brokerURL);
\r
95 AlarmMqMessageListener listener = new AlarmMqMessageListener();
\r
// Engine bootstrap: logs the start, creates the knowledge base and session via
// initEngineParameter(), then logs success. Declares CorrelationException.
// NOTE(review): listing lines 104-105 are absent from this view. initDeployRule() has no visible
// caller, so it is presumably invoked there — confirm against the full file.
100 private void start() throws CorrelationException {
\r
101 log.info("Drools Engine Initialize Beginning...");
\r
103 initEngineParameter();
\r
106 log.info("Business Rule Engine Initialize Successfully.");
\r
109 public void stop() {
\r
110 this.ksession.dispose();
\r
113 private void initEngineParameter() {
\r
114 this.kconf = KnowledgeBaseFactory.newKnowledgeBaseConfiguration();
\r
116 this.kconf.setOption(EventProcessingOption.STREAM);
\r
118 this.kconf.setProperty("drools.assertBehaviour", "equality");
\r
120 this.kbase = KnowledgeBaseFactory.newKnowledgeBase("D-ENGINE", this.kconf);
\r
122 this.ksession = kbase.newStatefulKnowledgeSession();
\r
// Loads the rules returned by ruleMgtWrapper.queryRuleByEnable(ENABLE) and hands the content of
// every rule whose content is non-null to deployRuleFromDB(...).
// NOTE(review): listing lines 129-130 (the body of the isEmpty() branch) are absent from this
// view; what happens for an empty result cannot be stated from here — presumably an early return.
125 private void initDeployRule() throws CorrelationException {
\r
126 List<CorrelationRule> rules = ruleMgtWrapper.queryRuleByEnable(ENABLE);
\r
128 if (rules.isEmpty()) {
\r
131 for (CorrelationRule rule : rules) {
\r
// Rules without content are skipped.
132 if (rule.getContent() != null) {
\r
133 deployRuleFromDB(rule.getContent());
\r
138 private void deployRuleFromDB(String ruleContent) throws CorrelationException {
\r
139 StringReader reader = new StringReader(ruleContent);
\r
140 Resource res = ResourceFactory.newReaderResource(reader);
\r
142 KnowledgeBuilder kbuilder = KnowledgeBuilderFactory.newKnowledgeBuilder();
\r
144 kbuilder.add(res, ResourceType.DRL);
\r
148 kbase.addKnowledgePackages(kbuilder.getKnowledgePackages());
\r
149 } catch (Exception e) {
\r
150 throw new CorrelationException(e.getMessage(), e);
\r
152 ksession.fireAllRules();
\r
155 public synchronized String deployRule(DeployRuleRequest rule, Locale locale)
\r
156 throws CorrelationException {
\r
157 StringReader reader = new StringReader(rule.getContent());
\r
158 Resource res = ResourceFactory.newReaderResource(reader);
\r
160 KnowledgeBuilder kbuilder = KnowledgeBuilderFactory.newKnowledgeBuilder();
\r
162 kbuilder.add(res, ResourceType.DRL);
\r
164 judgeRuleContent(locale, kbuilder, true);
\r
166 String packageName = kbuilder.getKnowledgePackages().iterator().next().getName();
\r
168 packageNames.add(packageName);
\r
169 kbase.addKnowledgePackages(kbuilder.getKnowledgePackages());
\r
170 } catch (Exception e) {
\r
171 throw new CorrelationException("Failed to deploy the rule.", e);
\r
174 ksession.fireAllRules();
\r
175 return packageName;
\r
178 public synchronized void undeployRule(String packageName, Locale locale)
\r
179 throws CorrelationException {
\r
181 KnowledgePackage pkg = kbase.getKnowledgePackage(packageName);
\r
184 throw new CorrelationException("The rule " + packageName + " does not exist!");
\r
188 kbase.removeKnowledgePackage(pkg.getName());
\r
189 } catch (Exception e) {
\r
190 throw new CorrelationException("Failed to delete the rule: " + packageName, e);
\r
194 public void compileRule(String content, Locale locale)
\r
195 throws CorrelationException {
\r
196 StringReader reader = new StringReader(content);
\r
197 Resource res = ResourceFactory.newReaderResource(reader);
\r
199 KnowledgeBuilder kbuilder = KnowledgeBuilderFactory.newKnowledgeBuilder();
\r
201 kbuilder.add(res, ResourceType.DRL);
\r
203 judgeRuleContent(locale, kbuilder, false);
\r
206 private void judgeRuleContent(Locale locale, KnowledgeBuilder kbuilder, boolean judgePackageName)
\r
207 throws CorrelationException {
\r
208 if (kbuilder.hasErrors()) {
\r
209 String errorMsg = "There are errors in the rule: " + kbuilder.getErrors().toString();
\r
210 log.error(errorMsg);
\r
211 throw new CorrelationException(errorMsg);
\r
214 String packageName = kbuilder.getKnowledgePackages().iterator().next().getName();
\r
216 if (packageNames.contains(packageName) && judgePackageName) {
\r
217 throw new CorrelationException("The rule " + packageName + " already exists in the drools engine.");
\r
221 public void putRaisedIntoStream(Alarm raiseAlarm) {
\r
222 FactHandle factHandle = this.ksession.getFactHandle(raiseAlarm);
\r
223 if (factHandle != null) {
\r
224 this.ksession.retract(factHandle);
\r
226 this.ksession.insert(raiseAlarm);
\r
227 this.ksession.fireAllRules();
\r
// JMS MessageListener for the alarm topic. Its methods use the engine's connectionFactory
// (in initialize()) and forward received Alarm objects to putRaisedIntoStream(...) (in onMessage()).
230 class AlarmMqMessageListener implements MessageListener {
\r
// JMS handles: all four are assigned in initialize(); close() null-checks consumer, session and connection.
232 private Connection connection = null;
\r
233 private Session session = null;
\r
234 private Destination destination = null;
\r
235 private MessageConsumer consumer = null;
\r
237 private void initialize() throws JMSException {
\r
238 connection = connectionFactory.createConnection();
\r
239 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
\r
240 destination = session.createTopic(AlarmConst.MQ_TOPIC_NAME_ALARM);
\r
241 consumer = session.createConsumer(destination);
\r
242 connection.start();
\r
// Registers this object as the consumer's message listener. A JMSException is logged as a
// connection failure; a second JMSException raised inside that handler is logged as a close failure.
// NOTE(review): listing lines 246-247 and 251-252 are absent from this view. The two `catch`
// clauses imply two `try {` openings, presumably wrapping initialize() and close() — confirm
// against the full file.
245 public void receive() {
\r
248 consumer.setMessageListener(this);
\r
249 } catch (JMSException e) {
\r
250 log.error("Failed to connect to the MQ service : " + e.getMessage(), e);
\r
253 } catch (JMSException e1) {
\r
254 log.error("Failed close connection " + e1.getMessage(), e1);
\r
259 public void onMessage(Message arg0) {
\r
260 ActiveMQObjectMessage objectMessage = (ActiveMQObjectMessage) arg0;
\r
262 Serializable object = objectMessage.getObject();
\r
264 if (object instanceof Alarm) {
\r
265 Alarm alarm = (Alarm) object;
\r
266 putRaisedIntoStream(alarm);
\r
268 } catch (JMSException e) {
\r
269 log.error("Failed get object : " + e.getMessage(), e);
\r
// Releases the JMS handles, checking consumer, session and connection for null in that order.
// Declares JMSException.
// NOTE(review): listing lines 275 and 278 (the bodies of the first two `if` blocks) are absent
// from this view; presumably consumer.close() and session.close(), mirroring the visible
// connection.close() — confirm against the full file.
273 private void close() throws JMSException {
\r
274 if (consumer != null) {
\r
277 if (session != null) {
\r
280 if (connection != null) {
\r
281 connection.close();
\r