ad1b4e57c12ece5754c2d4e1875ab4e4e68e27d0
[holmes/engine-management.git] / engine-d / src / main / java / org / openo / holmes / engine / manager / DroolsEngine.java
1 /**\r
2  * Copyright 2017 ZTE Corporation.\r
3  *\r
4  * Licensed under the Apache License, Version 2.0 (the "License");\r
5  * you may not use this file except in compliance with the License.\r
6  * You may obtain a copy of the License at\r
7  *\r
8  * http://www.apache.org/licenses/LICENSE-2.0\r
9  *\r
10  * Unless required by applicable law or agreed to in writing, software\r
11  * distributed under the License is distributed on an "AS IS" BASIS,\r
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
13  * See the License for the specific language governing permissions and\r
14  * limitations under the License.\r
15  */\r
16 package org.openo.holmes.engine.manager;\r
17 \r
18 \r
19 import java.io.Serializable;\r
20 import java.io.StringReader;\r
21 import java.util.List;\r
22 import java.util.Locale;\r
23 import javax.annotation.PostConstruct;\r
24 import javax.inject.Inject;\r
25 import javax.jms.Connection;\r
26 import javax.jms.ConnectionFactory;\r
27 import javax.jms.Destination;\r
28 import javax.jms.JMSException;\r
29 import javax.jms.Message;\r
30 import javax.jms.MessageConsumer;\r
31 import javax.jms.MessageListener;\r
32 import javax.jms.Session;\r
33 import lombok.extern.slf4j.Slf4j;\r
34 import org.apache.activemq.ActiveMQConnectionFactory;\r
35 import org.apache.activemq.command.ActiveMQObjectMessage;\r
36 import org.drools.KnowledgeBase;\r
37 import org.drools.KnowledgeBaseConfiguration;\r
38 import org.drools.KnowledgeBaseFactory;\r
39 import org.drools.builder.KnowledgeBuilder;\r
40 import org.drools.builder.KnowledgeBuilderFactory;\r
41 import org.drools.builder.ResourceType;\r
42 import org.drools.conf.EventProcessingOption;\r
43 import org.drools.definition.KnowledgePackage;\r
44 import org.drools.io.Resource;\r
45 import org.drools.io.ResourceFactory;\r
46 import org.drools.runtime.StatefulKnowledgeSession;\r
47 import org.drools.runtime.rule.FactHandle;\r
48 import org.glassfish.hk2.api.IterableProvider;\r
49 import org.jvnet.hk2.annotations.Service;\r
50 import org.openo.holmes.common.api.entity.CorrelationRule;\r
51 import org.openo.holmes.common.api.stat.Alarm;\r
52 import org.openo.holmes.common.config.MQConfig;\r
53 import org.openo.holmes.common.constant.AlarmConst;\r
54 import org.openo.holmes.common.exception.CorrelationException;\r
55 import org.openo.holmes.common.utils.ExceptionUtil;\r
56 import org.openo.holmes.common.utils.I18nProxy;\r
57 import org.openo.holmes.engine.request.DeployRuleRequest;\r
58 import org.openo.holmes.engine.wrapper.RuleMgtWrapper;\r
59 \r
60 @Slf4j\r
61 @Service\r
62 public class DroolsEngine {\r
63 \r
64     private final static int ENABLE = 1;\r
65 \r
66     @Inject\r
67     private RuleMgtWrapper ruleMgtWrapper;\r
68 \r
69     private KnowledgeBase kbase;\r
70 \r
71     private KnowledgeBaseConfiguration kconf;\r
72 \r
73     private StatefulKnowledgeSession ksession;\r
74 \r
75     private KnowledgeBuilder kbuilder;\r
76 \r
77     @Inject\r
78     private IterableProvider<MQConfig> mqConfigProvider;\r
79 \r
80     private ConnectionFactory connectionFactory;\r
81 \r
82     @PostConstruct\r
83     private void init() {\r
84         try {\r
85             // 1. start engine\r
86             start();\r
87             // 2. start mq listener\r
88             registerAlarmTopicListener();\r
89         } catch (Exception e) {\r
90             log.error("Start service failed: " + e.getMessage(), e);\r
91             throw ExceptionUtil.buildExceptionResponse("Start service failed!");\r
92         }\r
93     }\r
94 \r
95     private void registerAlarmTopicListener() {\r
96         String brokerURL =\r
97                 "tcp://" + mqConfigProvider.get().brokerIp + ":" + mqConfigProvider.get().brokerPort;\r
98         connectionFactory = new ActiveMQConnectionFactory(mqConfigProvider.get().brokerUsername,\r
99                 mqConfigProvider.get().brokerPassword, brokerURL);\r
100 \r
101         AlarmMqMessageListener listener = new AlarmMqMessageListener();\r
102         listener.receive();\r
103     }\r
104 \r
105 \r
106     private void start() throws CorrelationException {\r
107         log.info("Drools Engine Initialize Beginning...");\r
108 \r
109         initEngineParameter();\r
110         initDeployRule();\r
111 \r
112         log.info("Business Rule Engine Initialize Successfully.");\r
113     }\r
114 \r
115     public void stop() {\r
116         this.ksession.dispose();\r
117     }\r
118 \r
119     private void initEngineParameter() {\r
120         this.kconf = KnowledgeBaseFactory.newKnowledgeBaseConfiguration();\r
121 \r
122         this.kconf.setOption(EventProcessingOption.STREAM);\r
123 \r
124         this.kconf.setProperty("drools.assertBehaviour", "equality");\r
125 \r
126         this.kbuilder = KnowledgeBuilderFactory.newKnowledgeBuilder();\r
127 \r
128         this.kbase = KnowledgeBaseFactory.newKnowledgeBase("D-ENGINE", this.kconf);\r
129 \r
130         this.ksession = kbase.newStatefulKnowledgeSession();\r
131     }\r
132 \r
133     private void initDeployRule() throws CorrelationException {\r
134         List<CorrelationRule> rules = ruleMgtWrapper.queryRuleByEnable(ENABLE);\r
135 \r
136         if (rules.isEmpty()) {\r
137             return;\r
138         }\r
139         for (CorrelationRule rule : rules) {\r
140             if (rule.getContent() != null) {\r
141                 deployRuleFromDB(rule.getContent());\r
142             }\r
143         }\r
144     }\r
145 \r
146     private void deployRuleFromDB(String ruleContent) throws CorrelationException {\r
147         StringReader reader = new StringReader(ruleContent);\r
148         Resource res = ResourceFactory.newReaderResource(reader);\r
149 \r
150         kbuilder = KnowledgeBuilderFactory.newKnowledgeBuilder();\r
151 \r
152         kbuilder.add(res, ResourceType.DRL);\r
153 \r
154         try {\r
155 \r
156             kbase.addKnowledgePackages(kbuilder.getKnowledgePackages());\r
157         } catch (Exception e) {\r
158             throw new CorrelationException(e.getMessage(), e);\r
159         }\r
160         ksession.fireAllRules();\r
161     }\r
162 \r
163     public synchronized String deployRule(DeployRuleRequest rule, Locale locale)\r
164             throws CorrelationException {\r
165         StringReader reader = new StringReader(rule.getContent());\r
166         Resource res = ResourceFactory.newReaderResource(reader);\r
167 \r
168         kbuilder = KnowledgeBuilderFactory.newKnowledgeBuilder();\r
169 \r
170         kbuilder.add(res, ResourceType.DRL);\r
171 \r
172         if (kbuilder.hasErrors()) {\r
173 \r
174             String errorMsg = I18nProxy.getInstance().getValueByArgs(locale,\r
175                     I18nProxy.ENGINE_CONTENT_ILLEGALITY,\r
176                     new String[]{kbuilder.getErrors().toString()});\r
177             throw new CorrelationException(errorMsg);\r
178         }\r
179 \r
180         KnowledgePackage kpackage = kbuilder.getKnowledgePackages().iterator().next();\r
181 \r
182         if (kbase.getKnowledgePackages().contains(kpackage)) {\r
183 \r
184             String errorMsg = I18nProxy.getInstance().getValueByArgs(locale,\r
185                     I18nProxy.ENGINE_CONTENT_ILLEGALITY, new String[]{\r
186                             I18nProxy.getInstance().getValue(locale, I18nProxy.ENGINE_CONTAINS_PACKAGE)});\r
187 \r
188             throw new CorrelationException(errorMsg);\r
189         }\r
190         try {\r
191 \r
192             kbase.addKnowledgePackages(kbuilder.getKnowledgePackages());\r
193         } catch (Exception e) {\r
194 \r
195             String errorMsg =\r
196                     I18nProxy.getInstance().getValue(locale, I18nProxy.ENGINE_DEPLOY_RULE_FAILED);\r
197             throw new CorrelationException(errorMsg, e);\r
198         }\r
199 \r
200         ksession.fireAllRules();\r
201         return kpackage.getName();\r
202     }\r
203 \r
204     public synchronized void undeployRule(String packageName, Locale locale)\r
205             throws CorrelationException {\r
206 \r
207         KnowledgePackage pkg = kbase.getKnowledgePackage(packageName);\r
208 \r
209         if (null == pkg) {\r
210             String errorMsg = I18nProxy.getInstance().getValueByArgs(locale,\r
211                     I18nProxy.ENGINE_DELETE_RULE_NULL,\r
212                     new String[]{packageName});\r
213             throw new CorrelationException(errorMsg);\r
214         }\r
215 \r
216         try {\r
217 \r
218             kbase.removeKnowledgePackage(pkg.getName());\r
219         } catch (Exception e) {\r
220             String errorMsg = I18nProxy.getInstance().getValueByArgs(locale,\r
221                     I18nProxy.ENGINE_DELETE_RULE_FAILED, new String[]{packageName});\r
222             throw new CorrelationException(errorMsg, e);\r
223         }\r
224     }\r
225 \r
226     public void compileRule(String content, Locale locale)\r
227             throws CorrelationException {\r
228         StringReader reader = new StringReader(content);\r
229         Resource res = ResourceFactory.newReaderResource(reader);\r
230 \r
231         kbuilder = KnowledgeBuilderFactory.newKnowledgeBuilder();\r
232 \r
233         kbuilder.add(res, ResourceType.DRL);\r
234 \r
235         if (kbuilder.hasErrors()) {\r
236             String errorMsg = I18nProxy.getInstance().getValueByArgs(locale,\r
237                     I18nProxy.ENGINE_CONTENT_ILLEGALITY,\r
238                     new String[]{kbuilder.getErrors().toString()});\r
239             log.error(errorMsg);\r
240             throw new CorrelationException(errorMsg);\r
241         }\r
242     }\r
243 \r
244     public void putRaisedIntoStream(Alarm raiseAlarm) {\r
245         FactHandle factHandle = this.ksession.getFactHandle(raiseAlarm);\r
246         if (factHandle != null) {\r
247             this.ksession.retract(factHandle);\r
248         }\r
249         this.ksession.insert(raiseAlarm);\r
250         this.ksession.fireAllRules();\r
251     }\r
252 \r
253     class AlarmMqMessageListener implements MessageListener {\r
254 \r
255         private Connection connection = null;\r
256         private Session session = null;\r
257         private Destination destination = null;\r
258         private MessageConsumer consumer = null;\r
259 \r
260         private void initialize() throws JMSException {\r
261             connection = connectionFactory.createConnection();\r
262             session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);\r
263             destination = session.createTopic(AlarmConst.MQ_TOPIC_NAME_ALARM);\r
264             consumer = session.createConsumer(destination);\r
265             connection.start();\r
266         }\r
267 \r
268         public void receive() {\r
269             try {\r
270                 initialize();\r
271                 consumer.setMessageListener(this);\r
272             } catch (JMSException e) {\r
273                 log.error("Failed to connect to the MQ service : " + e.getMessage(), e);\r
274                 try {\r
275                     close();\r
276                 } catch (JMSException e1) {\r
277                     log.error("Failed close connection  " + e1.getMessage(), e1);\r
278                 }\r
279             }\r
280         }\r
281 \r
282         public void onMessage(Message arg0) {\r
283             ActiveMQObjectMessage objectMessage = (ActiveMQObjectMessage) arg0;\r
284             try {\r
285                 Serializable object = objectMessage.getObject();\r
286 \r
287                 if (object instanceof Alarm) {\r
288                     Alarm alarm = (Alarm) object;\r
289                     putRaisedIntoStream(alarm);\r
290                 }\r
291             } catch (JMSException e) {\r
292                 log.error("Failed get object : " + e.getMessage(), e);\r
293             }\r
294         }\r
295 \r
296         private void close() throws JMSException {\r
297             if (consumer != null) {\r
298                 consumer.close();\r
299             }\r
300             if (session != null) {\r
301                 session.close();\r
302             }\r
303             if (connection != null) {\r
304                 connection.close();\r
305             }\r
306         }\r
307     }\r
308 }\r