Change the groupid from openo to onap
[holmes/engine-management.git] / engine-d / src / main / java / org / onap / holmes / engine / manager / DroolsEngine.java
1 /**\r
2  * Copyright 2017 ZTE Corporation.\r
3  *\r
4  * Licensed under the Apache License, Version 2.0 (the "License");\r
5  * you may not use this file except in compliance with the License.\r
6  * You may obtain a copy of the License at\r
7  *\r
8  * http://www.apache.org/licenses/LICENSE-2.0\r
9  *\r
10  * Unless required by applicable law or agreed to in writing, software\r
11  * distributed under the License is distributed on an "AS IS" BASIS,\r
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
13  * See the License for the specific language governing permissions and\r
14  * limitations under the License.\r
15  */\r
16 package org.onap.holmes.engine.manager;\r
17 \r
18 \r
19 import java.io.Serializable;\r
20 import java.io.StringReader;\r
21 import java.util.HashSet;\r
22 import java.util.List;\r
23 import java.util.Locale;\r
24 import java.util.Set;\r
25 import javax.annotation.PostConstruct;\r
26 import javax.inject.Inject;\r
27 import javax.jms.Connection;\r
28 import javax.jms.ConnectionFactory;\r
29 import javax.jms.Destination;\r
30 import javax.jms.JMSException;\r
31 import javax.jms.Message;\r
32 import javax.jms.MessageConsumer;\r
33 import javax.jms.MessageListener;\r
34 import javax.jms.Session;\r
35 import lombok.extern.slf4j.Slf4j;\r
36 import org.apache.activemq.ActiveMQConnectionFactory;\r
37 import org.apache.activemq.command.ActiveMQObjectMessage;\r
38 import org.drools.KnowledgeBase;\r
39 import org.drools.KnowledgeBaseConfiguration;\r
40 import org.drools.KnowledgeBaseFactory;\r
41 import org.drools.builder.KnowledgeBuilder;\r
42 import org.drools.builder.KnowledgeBuilderFactory;\r
43 import org.drools.builder.ResourceType;\r
44 import org.drools.conf.EventProcessingOption;\r
45 import org.drools.definition.KnowledgePackage;\r
46 import org.drools.io.Resource;\r
47 import org.drools.io.ResourceFactory;\r
48 import org.drools.runtime.StatefulKnowledgeSession;\r
49 import org.drools.runtime.rule.FactHandle;\r
50 import org.glassfish.hk2.api.IterableProvider;\r
51 import org.jvnet.hk2.annotations.Service;\r
52 import org.onap.holmes.engine.request.DeployRuleRequest;\r
53 import org.onap.holmes.common.api.entity.CorrelationRule;\r
54 import org.onap.holmes.common.api.stat.Alarm;\r
55 import org.onap.holmes.common.config.MQConfig;\r
56 import org.onap.holmes.common.constant.AlarmConst;\r
57 import org.onap.holmes.common.exception.CorrelationException;\r
58 import org.onap.holmes.common.utils.ExceptionUtil;\r
59 import org.onap.holmes.engine.wrapper.RuleMgtWrapper;\r
60 \r
61 @Slf4j\r
62 @Service\r
63 public class DroolsEngine {\r
64 \r
65     private final static int ENABLE = 1;\r
66     private final Set<String> packageNames = new HashSet<String>();\r
67     @Inject\r
68     private RuleMgtWrapper ruleMgtWrapper;\r
69     private KnowledgeBase kbase;\r
70     private KnowledgeBaseConfiguration kconf;\r
71     private StatefulKnowledgeSession ksession;\r
72     @Inject\r
73     private IterableProvider<MQConfig> mqConfigProvider;\r
74     private ConnectionFactory connectionFactory;\r
75 \r
76     @PostConstruct\r
77     private void init() {\r
78         try {\r
79             // 1. start engine\r
80             start();\r
81             // 2. start mq listener\r
82             registerAlarmTopicListener();\r
83         } catch (Exception e) {\r
84             log.error("Failed to start the service: " + e.getMessage(), e);\r
85             throw ExceptionUtil.buildExceptionResponse("Failed to start the drools engine!");\r
86         }\r
87     }\r
88 \r
89     private void registerAlarmTopicListener() {\r
90         String brokerURL =\r
91             "tcp://" + mqConfigProvider.get().brokerIp + ":" + mqConfigProvider.get().brokerPort;\r
92         connectionFactory = new ActiveMQConnectionFactory(mqConfigProvider.get().brokerUsername,\r
93             mqConfigProvider.get().brokerPassword, brokerURL);\r
94 \r
95         AlarmMqMessageListener listener = new AlarmMqMessageListener();\r
96         listener.receive();\r
97     }\r
98 \r
99 \r
100     private void start() throws CorrelationException {\r
101         log.info("Drools Engine Initialize Beginning...");\r
102 \r
103         initEngineParameter();\r
104         initDeployRule();\r
105 \r
106         log.info("Business Rule Engine Initialize Successfully.");\r
107     }\r
108 \r
109     public void stop() {\r
110         this.ksession.dispose();\r
111     }\r
112 \r
113     private void initEngineParameter() {\r
114         this.kconf = KnowledgeBaseFactory.newKnowledgeBaseConfiguration();\r
115 \r
116         this.kconf.setOption(EventProcessingOption.STREAM);\r
117 \r
118         this.kconf.setProperty("drools.assertBehaviour", "equality");\r
119 \r
120         this.kbase = KnowledgeBaseFactory.newKnowledgeBase("D-ENGINE", this.kconf);\r
121 \r
122         this.ksession = kbase.newStatefulKnowledgeSession();\r
123     }\r
124 \r
125     private void initDeployRule() throws CorrelationException {\r
126         List<CorrelationRule> rules = ruleMgtWrapper.queryRuleByEnable(ENABLE);\r
127 \r
128         if (rules.isEmpty()) {\r
129             return;\r
130         }\r
131         for (CorrelationRule rule : rules) {\r
132             if (rule.getContent() != null) {\r
133                 deployRuleFromDB(rule.getContent());\r
134             }\r
135         }\r
136     }\r
137 \r
138     private void deployRuleFromDB(String ruleContent) throws CorrelationException {\r
139         StringReader reader = new StringReader(ruleContent);\r
140         Resource res = ResourceFactory.newReaderResource(reader);\r
141 \r
142         KnowledgeBuilder kbuilder = KnowledgeBuilderFactory.newKnowledgeBuilder();\r
143 \r
144         kbuilder.add(res, ResourceType.DRL);\r
145 \r
146         try {\r
147 \r
148             kbase.addKnowledgePackages(kbuilder.getKnowledgePackages());\r
149         } catch (Exception e) {\r
150             throw new CorrelationException(e.getMessage(), e);\r
151         }\r
152         ksession.fireAllRules();\r
153     }\r
154 \r
155     public synchronized String deployRule(DeployRuleRequest rule, Locale locale)\r
156         throws CorrelationException {\r
157         StringReader reader = new StringReader(rule.getContent());\r
158         Resource res = ResourceFactory.newReaderResource(reader);\r
159 \r
160         KnowledgeBuilder kbuilder = KnowledgeBuilderFactory.newKnowledgeBuilder();\r
161 \r
162         kbuilder.add(res, ResourceType.DRL);\r
163 \r
164         judgeRuleContent(locale, kbuilder, true);\r
165 \r
166         String packageName = kbuilder.getKnowledgePackages().iterator().next().getName();\r
167         try {\r
168             packageNames.add(packageName);\r
169             kbase.addKnowledgePackages(kbuilder.getKnowledgePackages());\r
170         } catch (Exception e) {\r
171             throw new CorrelationException("Failed to deploy the rule.", e);\r
172         }\r
173 \r
174         ksession.fireAllRules();\r
175         return packageName;\r
176     }\r
177 \r
178     public synchronized void undeployRule(String packageName, Locale locale)\r
179         throws CorrelationException {\r
180 \r
181         KnowledgePackage pkg = kbase.getKnowledgePackage(packageName);\r
182 \r
183         if (null == pkg) {\r
184             throw new CorrelationException("The rule " + packageName + " does not exist!");\r
185         }\r
186 \r
187         try {\r
188             kbase.removeKnowledgePackage(pkg.getName());\r
189         } catch (Exception e) {\r
190             throw new CorrelationException("Failed to delete the rule: " + packageName, e);\r
191         }\r
192     }\r
193 \r
194     public void compileRule(String content, Locale locale)\r
195         throws CorrelationException {\r
196         StringReader reader = new StringReader(content);\r
197         Resource res = ResourceFactory.newReaderResource(reader);\r
198 \r
199         KnowledgeBuilder kbuilder = KnowledgeBuilderFactory.newKnowledgeBuilder();\r
200 \r
201         kbuilder.add(res, ResourceType.DRL);\r
202 \r
203         judgeRuleContent(locale, kbuilder, false);\r
204     }\r
205 \r
206     private void judgeRuleContent(Locale locale, KnowledgeBuilder kbuilder, boolean judgePackageName)\r
207         throws CorrelationException {\r
208         if (kbuilder.hasErrors()) {\r
209             String errorMsg = "There are errors in the rule: " + kbuilder.getErrors().toString();\r
210             log.error(errorMsg);\r
211             throw new CorrelationException(errorMsg);\r
212         }\r
213 \r
214         String packageName = kbuilder.getKnowledgePackages().iterator().next().getName();\r
215 \r
216         if (packageNames.contains(packageName) && judgePackageName) {\r
217             throw new CorrelationException("The rule " + packageName + " already exists in the drools engine.");\r
218         }\r
219     }\r
220 \r
221     public void putRaisedIntoStream(Alarm raiseAlarm) {\r
222         FactHandle factHandle = this.ksession.getFactHandle(raiseAlarm);\r
223         if (factHandle != null) {\r
224             this.ksession.retract(factHandle);\r
225         }\r
226         this.ksession.insert(raiseAlarm);\r
227         this.ksession.fireAllRules();\r
228     }\r
229 \r
230     class AlarmMqMessageListener implements MessageListener {\r
231 \r
232         private Connection connection = null;\r
233         private Session session = null;\r
234         private Destination destination = null;\r
235         private MessageConsumer consumer = null;\r
236 \r
237         private void initialize() throws JMSException {\r
238             connection = connectionFactory.createConnection();\r
239             session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);\r
240             destination = session.createTopic(AlarmConst.MQ_TOPIC_NAME_ALARM);\r
241             consumer = session.createConsumer(destination);\r
242             connection.start();\r
243         }\r
244 \r
245         public void receive() {\r
246             try {\r
247                 initialize();\r
248                 consumer.setMessageListener(this);\r
249             } catch (JMSException e) {\r
250                 log.error("Failed to connect to the MQ service : " + e.getMessage(), e);\r
251                 try {\r
252                     close();\r
253                 } catch (JMSException e1) {\r
254                     log.error("Failed close connection  " + e1.getMessage(), e1);\r
255                 }\r
256             }\r
257         }\r
258 \r
259         public void onMessage(Message arg0) {\r
260             ActiveMQObjectMessage objectMessage = (ActiveMQObjectMessage) arg0;\r
261             try {\r
262                 Serializable object = objectMessage.getObject();\r
263 \r
264                 if (object instanceof Alarm) {\r
265                     Alarm alarm = (Alarm) object;\r
266                     putRaisedIntoStream(alarm);\r
267                 }\r
268             } catch (JMSException e) {\r
269                 log.error("Failed get object : " + e.getMessage(), e);\r
270             }\r
271         }\r
272 \r
273         private void close() throws JMSException {\r
274             if (consumer != null) {\r
275                 consumer.close();\r
276             }\r
277             if (session != null) {\r
278                 session.close();\r
279             }\r
280             if (connection != null) {\r
281                 connection.close();\r
282             }\r
283         }\r
284     }\r
285 }\r