Fix compileRule
[holmes/engine-management.git] / engine-d / src / main / java / org / openo / holmes / engine / manager / DroolsEngine.java
1 /**\r
2  * Copyright 2017 ZTE Corporation.\r
3  *\r
4  * Licensed under the Apache License, Version 2.0 (the "License");\r
5  * you may not use this file except in compliance with the License.\r
6  * You may obtain a copy of the License at\r
7  *\r
8  * http://www.apache.org/licenses/LICENSE-2.0\r
9  *\r
10  * Unless required by applicable law or agreed to in writing, software\r
11  * distributed under the License is distributed on an "AS IS" BASIS,\r
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
13  * See the License for the specific language governing permissions and\r
14  * limitations under the License.\r
15  */\r
16 package org.openo.holmes.engine.manager;\r
17 \r
18 \r
19 import java.io.Serializable;\r
20 import java.io.StringReader;\r
21 import java.util.HashSet;\r
22 import java.util.List;\r
23 import java.util.Locale;\r
24 import java.util.Set;\r
25 import javax.annotation.PostConstruct;\r
26 import javax.inject.Inject;\r
27 import javax.jms.Connection;\r
28 import javax.jms.ConnectionFactory;\r
29 import javax.jms.Destination;\r
30 import javax.jms.JMSException;\r
31 import javax.jms.Message;\r
32 import javax.jms.MessageConsumer;\r
33 import javax.jms.MessageListener;\r
34 import javax.jms.Session;\r
35 import lombok.extern.slf4j.Slf4j;\r
36 import org.apache.activemq.ActiveMQConnectionFactory;\r
37 import org.apache.activemq.command.ActiveMQObjectMessage;\r
38 import org.drools.KnowledgeBase;\r
39 import org.drools.KnowledgeBaseConfiguration;\r
40 import org.drools.KnowledgeBaseFactory;\r
41 import org.drools.builder.KnowledgeBuilder;\r
42 import org.drools.builder.KnowledgeBuilderFactory;\r
43 import org.drools.builder.ResourceType;\r
44 import org.drools.conf.EventProcessingOption;\r
45 import org.drools.definition.KnowledgePackage;\r
46 import org.drools.io.Resource;\r
47 import org.drools.io.ResourceFactory;\r
48 import org.drools.runtime.StatefulKnowledgeSession;\r
49 import org.drools.runtime.rule.FactHandle;\r
50 import org.glassfish.hk2.api.IterableProvider;\r
51 import org.jvnet.hk2.annotations.Service;\r
52 import org.openo.holmes.common.api.entity.CorrelationRule;\r
53 import org.openo.holmes.common.api.stat.Alarm;\r
54 import org.openo.holmes.common.config.MQConfig;\r
55 import org.openo.holmes.common.constant.AlarmConst;\r
56 import org.openo.holmes.common.exception.CorrelationException;\r
57 import org.openo.holmes.common.utils.ExceptionUtil;\r
58 import org.openo.holmes.common.utils.I18nProxy;\r
59 import org.openo.holmes.engine.request.DeployRuleRequest;\r
60 import org.openo.holmes.engine.wrapper.RuleMgtWrapper;\r
61 \r
62 @Slf4j\r
63 @Service\r
64 public class DroolsEngine {\r
65 \r
66     private final static int ENABLE = 1;\r
67     private final Set<String> packageNames = new HashSet<String>();\r
68     @Inject\r
69     private RuleMgtWrapper ruleMgtWrapper;\r
70     private KnowledgeBase kbase;\r
71     private KnowledgeBaseConfiguration kconf;\r
72     private StatefulKnowledgeSession ksession;\r
73     @Inject\r
74     private IterableProvider<MQConfig> mqConfigProvider;\r
75     private ConnectionFactory connectionFactory;\r
76 \r
77     @PostConstruct\r
78     private void init() {\r
79         try {\r
80             // 1. start engine\r
81             start();\r
82             // 2. start mq listener\r
83             registerAlarmTopicListener();\r
84         } catch (Exception e) {\r
85             log.error("Start service failed: " + e.getMessage(), e);\r
86             throw ExceptionUtil.buildExceptionResponse("Start service failed!");\r
87         }\r
88     }\r
89 \r
90     private void registerAlarmTopicListener() {\r
91         String brokerURL =\r
92             "tcp://" + mqConfigProvider.get().brokerIp + ":" + mqConfigProvider.get().brokerPort;\r
93         connectionFactory = new ActiveMQConnectionFactory(mqConfigProvider.get().brokerUsername,\r
94             mqConfigProvider.get().brokerPassword, brokerURL);\r
95 \r
96         AlarmMqMessageListener listener = new AlarmMqMessageListener();\r
97         listener.receive();\r
98     }\r
99 \r
100 \r
101     private void start() throws CorrelationException {\r
102         log.info("Drools Engine Initialize Beginning...");\r
103 \r
104         initEngineParameter();\r
105         initDeployRule();\r
106 \r
107         log.info("Business Rule Engine Initialize Successfully.");\r
108     }\r
109 \r
110     public void stop() {\r
111         this.ksession.dispose();\r
112     }\r
113 \r
114     private void initEngineParameter() {\r
115         this.kconf = KnowledgeBaseFactory.newKnowledgeBaseConfiguration();\r
116 \r
117         this.kconf.setOption(EventProcessingOption.STREAM);\r
118 \r
119         this.kconf.setProperty("drools.assertBehaviour", "equality");\r
120 \r
121         this.kbase = KnowledgeBaseFactory.newKnowledgeBase("D-ENGINE", this.kconf);\r
122 \r
123         this.ksession = kbase.newStatefulKnowledgeSession();\r
124     }\r
125 \r
126     private void initDeployRule() throws CorrelationException {\r
127         List<CorrelationRule> rules = ruleMgtWrapper.queryRuleByEnable(ENABLE);\r
128 \r
129         if (rules.isEmpty()) {\r
130             return;\r
131         }\r
132         for (CorrelationRule rule : rules) {\r
133             if (rule.getContent() != null) {\r
134                 deployRuleFromDB(rule.getContent());\r
135             }\r
136         }\r
137     }\r
138 \r
139     private void deployRuleFromDB(String ruleContent) throws CorrelationException {\r
140         StringReader reader = new StringReader(ruleContent);\r
141         Resource res = ResourceFactory.newReaderResource(reader);\r
142 \r
143         KnowledgeBuilder kbuilder = KnowledgeBuilderFactory.newKnowledgeBuilder();\r
144 \r
145         kbuilder.add(res, ResourceType.DRL);\r
146 \r
147         try {\r
148 \r
149             kbase.addKnowledgePackages(kbuilder.getKnowledgePackages());\r
150         } catch (Exception e) {\r
151             throw new CorrelationException(e.getMessage(), e);\r
152         }\r
153         ksession.fireAllRules();\r
154     }\r
155 \r
156     public synchronized String deployRule(DeployRuleRequest rule, Locale locale)\r
157         throws CorrelationException {\r
158         StringReader reader = new StringReader(rule.getContent());\r
159         Resource res = ResourceFactory.newReaderResource(reader);\r
160 \r
161         KnowledgeBuilder kbuilder = KnowledgeBuilderFactory.newKnowledgeBuilder();\r
162 \r
163         kbuilder.add(res, ResourceType.DRL);\r
164 \r
165         judgeRuleContent(locale, kbuilder, true);\r
166 \r
167         String packageName = kbuilder.getKnowledgePackages().iterator().next().getName();\r
168         try {\r
169             packageNames.add(packageName);\r
170             kbase.addKnowledgePackages(kbuilder.getKnowledgePackages());\r
171         } catch (Exception e) {\r
172 \r
173             String errorMsg =\r
174                 I18nProxy.getInstance().getValue(locale, I18nProxy.ENGINE_DEPLOY_RULE_FAILED);\r
175             throw new CorrelationException(errorMsg, e);\r
176         }\r
177 \r
178         ksession.fireAllRules();\r
179         return packageName;\r
180     }\r
181 \r
182     public synchronized void undeployRule(String packageName, Locale locale)\r
183         throws CorrelationException {\r
184 \r
185         KnowledgePackage pkg = kbase.getKnowledgePackage(packageName);\r
186 \r
187         if (null == pkg) {\r
188             String errorMsg = I18nProxy.getInstance().getValueByArgs(locale,\r
189                 I18nProxy.ENGINE_DELETE_RULE_NULL,\r
190                 new String[]{packageName});\r
191             throw new CorrelationException(errorMsg);\r
192         }\r
193 \r
194         try {\r
195 \r
196             kbase.removeKnowledgePackage(pkg.getName());\r
197         } catch (Exception e) {\r
198             String errorMsg = I18nProxy.getInstance().getValueByArgs(locale,\r
199                 I18nProxy.ENGINE_DELETE_RULE_FAILED, new String[]{packageName});\r
200             throw new CorrelationException(errorMsg, e);\r
201         }\r
202     }\r
203 \r
204     public void compileRule(String content, Locale locale)\r
205         throws CorrelationException {\r
206         StringReader reader = new StringReader(content);\r
207         Resource res = ResourceFactory.newReaderResource(reader);\r
208 \r
209         KnowledgeBuilder kbuilder = KnowledgeBuilderFactory.newKnowledgeBuilder();\r
210 \r
211         kbuilder.add(res, ResourceType.DRL);\r
212 \r
213         judgeRuleContent(locale, kbuilder, false);\r
214     }\r
215 \r
216     private void judgeRuleContent(Locale locale, KnowledgeBuilder kbuilder, boolean judgePackageName)\r
217         throws CorrelationException {\r
218         if (kbuilder.hasErrors()) {\r
219             String errorMsg = I18nProxy.getInstance().getValueByArgs(locale,\r
220                 I18nProxy.ENGINE_CONTENT_ILLEGALITY,\r
221                 new String[]{kbuilder.getErrors().toString()});\r
222             log.error(errorMsg);\r
223             throw new CorrelationException(errorMsg);\r
224         }\r
225 \r
226         String packageName = kbuilder.getKnowledgePackages().iterator().next().getName();\r
227 \r
228         if (packageNames.contains(packageName) && judgePackageName) {\r
229             String errorMsg = I18nProxy.getInstance().getValueByArgs(locale,\r
230                 I18nProxy.ENGINE_CONTENT_ILLEGALITY, new String[]{\r
231                     I18nProxy.getInstance().getValue(locale, I18nProxy.ENGINE_CONTAINS_PACKAGE)});\r
232 \r
233             throw new CorrelationException(errorMsg);\r
234         }\r
235     }\r
236 \r
237     public void putRaisedIntoStream(Alarm raiseAlarm) {\r
238         FactHandle factHandle = this.ksession.getFactHandle(raiseAlarm);\r
239         if (factHandle != null) {\r
240             this.ksession.retract(factHandle);\r
241         }\r
242         this.ksession.insert(raiseAlarm);\r
243         this.ksession.fireAllRules();\r
244     }\r
245 \r
246     class AlarmMqMessageListener implements MessageListener {\r
247 \r
248         private Connection connection = null;\r
249         private Session session = null;\r
250         private Destination destination = null;\r
251         private MessageConsumer consumer = null;\r
252 \r
253         private void initialize() throws JMSException {\r
254             connection = connectionFactory.createConnection();\r
255             session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);\r
256             destination = session.createTopic(AlarmConst.MQ_TOPIC_NAME_ALARM);\r
257             consumer = session.createConsumer(destination);\r
258             connection.start();\r
259         }\r
260 \r
261         public void receive() {\r
262             try {\r
263                 initialize();\r
264                 consumer.setMessageListener(this);\r
265             } catch (JMSException e) {\r
266                 log.error("Failed to connect to the MQ service : " + e.getMessage(), e);\r
267                 try {\r
268                     close();\r
269                 } catch (JMSException e1) {\r
270                     log.error("Failed close connection  " + e1.getMessage(), e1);\r
271                 }\r
272             }\r
273         }\r
274 \r
275         public void onMessage(Message arg0) {\r
276             ActiveMQObjectMessage objectMessage = (ActiveMQObjectMessage) arg0;\r
277             try {\r
278                 Serializable object = objectMessage.getObject();\r
279 \r
280                 if (object instanceof Alarm) {\r
281                     Alarm alarm = (Alarm) object;\r
282                     putRaisedIntoStream(alarm);\r
283                 }\r
284             } catch (JMSException e) {\r
285                 log.error("Failed get object : " + e.getMessage(), e);\r
286             }\r
287         }\r
288 \r
289         private void close() throws JMSException {\r
290             if (consumer != null) {\r
291                 consumer.close();\r
292             }\r
293             if (session != null) {\r
294                 session.close();\r
295             }\r
296             if (connection != null) {\r
297                 connection.close();\r
298             }\r
299         }\r
300     }\r
301 }\r