Change the API Path
[holmes/engine-management.git] / engine-d / src / main / java / org / onap / holmes / engine / manager / DroolsEngine.java
1 /**\r
2  * Copyright 2017 ZTE Corporation.\r
3  *\r
4  * Licensed under the Apache License, Version 2.0 (the "License");\r
5  * you may not use this file except in compliance with the License.\r
6  * You may obtain a copy of the License at\r
7  *\r
8  * http://www.apache.org/licenses/LICENSE-2.0\r
9  *\r
10  * Unless required by applicable law or agreed to in writing, software\r
11  * distributed under the License is distributed on an "AS IS" BASIS,\r
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
13  * See the License for the specific language governing permissions and\r
14  * limitations under the License.\r
15  */\r
16 package org.onap.holmes.engine.manager;\r
17 \r
18 \r
19 import java.io.Serializable;\r
20 import java.io.StringReader;\r
21 import java.util.HashSet;\r
22 import java.util.List;\r
23 import java.util.Locale;\r
24 import java.util.Set;\r
25 import javax.annotation.PostConstruct;\r
26 import javax.inject.Inject;\r
27 import javax.jms.Connection;\r
28 import javax.jms.ConnectionFactory;\r
29 import javax.jms.Destination;\r
30 import javax.jms.JMSException;\r
31 import javax.jms.Message;\r
32 import javax.jms.MessageConsumer;\r
33 import javax.jms.MessageListener;\r
34 import javax.jms.Session;\r
35 import lombok.extern.slf4j.Slf4j;\r
36 import org.apache.activemq.ActiveMQConnectionFactory;\r
37 import org.apache.activemq.command.ActiveMQObjectMessage;\r
38 import org.drools.KnowledgeBase;\r
39 import org.drools.KnowledgeBaseConfiguration;\r
40 import org.drools.KnowledgeBaseFactory;\r
41 import org.drools.builder.KnowledgeBuilder;\r
42 import org.drools.builder.KnowledgeBuilderFactory;\r
43 import org.drools.builder.ResourceType;\r
44 import org.drools.conf.EventProcessingOption;\r
45 import org.drools.definition.KnowledgePackage;\r
46 import org.drools.io.Resource;\r
47 import org.drools.io.ResourceFactory;\r
48 import org.drools.runtime.StatefulKnowledgeSession;\r
49 import org.drools.runtime.rule.FactHandle;\r
50 import org.glassfish.hk2.api.IterableProvider;\r
51 import org.jvnet.hk2.annotations.Service;\r
52 import org.onap.holmes.engine.request.DeployRuleRequest;\r
53 import org.onap.holmes.common.api.entity.CorrelationRule;\r
54 import org.onap.holmes.common.api.stat.Alarm;\r
55 import org.onap.holmes.common.config.MQConfig;\r
56 import org.onap.holmes.common.constant.AlarmConst;\r
57 import org.onap.holmes.common.exception.CorrelationException;\r
58 import org.onap.holmes.common.utils.ExceptionUtil;\r
59 import org.onap.holmes.engine.wrapper.RuleMgtWrapper;\r
60 \r
61 @Slf4j\r
62 @Service\r
63 public class DroolsEngine {\r
64 \r
65     private final static int ENABLE = 1;\r
66     private final Set<String> packageNames = new HashSet<String>();\r
67     @Inject\r
68     private RuleMgtWrapper ruleMgtWrapper;\r
69     private KnowledgeBase kbase;\r
70     private KnowledgeBaseConfiguration kconf;\r
71     private StatefulKnowledgeSession ksession;\r
72     @Inject\r
73     private IterableProvider<MQConfig> mqConfigProvider;\r
74     private ConnectionFactory connectionFactory;\r
75 \r
76     @PostConstruct\r
77     private void init() {\r
78         try {\r
79             // 1. start engine\r
80             start();\r
81             // 2. start mq listener\r
82             registerAlarmTopicListener();\r
83         } catch (Exception e) {\r
84             log.error("Failed to start the service: " + e.getMessage(), e);\r
85             throw ExceptionUtil.buildExceptionResponse("Failed to start the drools engine!");\r
86         }\r
87     }\r
88 \r
89     private void registerAlarmTopicListener() {\r
90         String brokerURL =\r
91             "tcp://" + mqConfigProvider.get().brokerIp + ":" + mqConfigProvider.get().brokerPort;\r
92         connectionFactory = new ActiveMQConnectionFactory(mqConfigProvider.get().brokerUsername,\r
93             mqConfigProvider.get().brokerPassword, brokerURL);\r
94 \r
95         AlarmMqMessageListener listener = new AlarmMqMessageListener();\r
96         listener.receive();\r
97     }\r
98 \r
99 \r
100     private void start() throws CorrelationException {\r
101         log.info("Drools Engine Initialize Beginning...");\r
102 \r
103         initEngineParameter();\r
104         initDeployRule();\r
105 \r
106         log.info("Business Rule Engine Initialize Successfully.");\r
107     }\r
108 \r
109     public void stop() {\r
110         this.ksession.dispose();\r
111     }\r
112 \r
113     private void initEngineParameter() {\r
114         this.kconf = KnowledgeBaseFactory.newKnowledgeBaseConfiguration();\r
115 \r
116         this.kconf.setOption(EventProcessingOption.STREAM);\r
117 \r
118         this.kconf.setProperty("drools.assertBehaviour", "equality");\r
119 \r
120         this.kbase = KnowledgeBaseFactory.newKnowledgeBase("D-ENGINE", this.kconf);\r
121 \r
122         this.ksession = kbase.newStatefulKnowledgeSession();\r
123     }\r
124 \r
125     private void initDeployRule() throws CorrelationException {\r
126         List<CorrelationRule> rules = ruleMgtWrapper.queryRuleByEnable(ENABLE);\r
127 \r
128         if (rules.isEmpty()) {\r
129             return;\r
130         }\r
131         for (CorrelationRule rule : rules) {\r
132             if (rule.getContent() != null) {\r
133                 deployRuleFromDB(rule.getContent());\r
134             }\r
135         }\r
136     }\r
137 \r
138     private void deployRuleFromDB(String ruleContent) throws CorrelationException {\r
139         StringReader reader = new StringReader(ruleContent);\r
140         Resource res = ResourceFactory.newReaderResource(reader);\r
141 \r
142         KnowledgeBuilder kbuilder = KnowledgeBuilderFactory.newKnowledgeBuilder();\r
143 \r
144         kbuilder.add(res, ResourceType.DRL);\r
145 \r
146         try {\r
147 \r
148             kbase.addKnowledgePackages(kbuilder.getKnowledgePackages());\r
149         } catch (Exception e) {\r
150             throw new CorrelationException(e.getMessage(), e);\r
151         }\r
152         ksession.fireAllRules();\r
153     }\r
154 \r
155     public synchronized String deployRule(DeployRuleRequest rule, Locale locale)\r
156         throws CorrelationException {\r
157         StringReader reader = new StringReader(rule.getContent());\r
158         Resource res = ResourceFactory.newReaderResource(reader);\r
159 \r
160         KnowledgeBuilder kbuilder = KnowledgeBuilderFactory.newKnowledgeBuilder();\r
161 \r
162         kbuilder.add(res, ResourceType.DRL);\r
163 \r
164         judgeRuleContent(locale, kbuilder, true);\r
165 \r
166         String packageName = kbuilder.getKnowledgePackages().iterator().next().getName();\r
167         try {\r
168             packageNames.add(packageName);\r
169             kbase.addKnowledgePackages(kbuilder.getKnowledgePackages());\r
170         } catch (Exception e) {\r
171             throw new CorrelationException("Failed to deploy the rule.", e);\r
172         }\r
173 \r
174         ksession.fireAllRules();\r
175         return packageName;\r
176     }\r
177 \r
178     public synchronized void undeployRule(String packageName, Locale locale)\r
179         throws CorrelationException {\r
180 \r
181         KnowledgePackage pkg = kbase.getKnowledgePackage(packageName);\r
182 \r
183         if (null == pkg) {\r
184             throw new CorrelationException("The rule " + packageName + " does not exist!");\r
185         }\r
186 \r
187         try {\r
188             kbase.removeKnowledgePackage(pkg.getName());\r
189         } catch (Exception e) {\r
190             throw new CorrelationException("Failed to delete the rule: " + packageName, e);\r
191         }\r
192         packageNames.remove(pkg.getName());\r
193     }\r
194 \r
195     public void compileRule(String content, Locale locale)\r
196         throws CorrelationException {\r
197         StringReader reader = new StringReader(content);\r
198         Resource res = ResourceFactory.newReaderResource(reader);\r
199 \r
200         KnowledgeBuilder kbuilder = KnowledgeBuilderFactory.newKnowledgeBuilder();\r
201 \r
202         kbuilder.add(res, ResourceType.DRL);\r
203 \r
204         judgeRuleContent(locale, kbuilder, false);\r
205     }\r
206 \r
207     private void judgeRuleContent(Locale locale, KnowledgeBuilder kbuilder, boolean judgePackageName)\r
208         throws CorrelationException {\r
209         if (kbuilder.hasErrors()) {\r
210             String errorMsg = "There are errors in the rule: " + kbuilder.getErrors().toString();\r
211             log.error(errorMsg);\r
212             throw new CorrelationException(errorMsg);\r
213         }\r
214 \r
215         String packageName = kbuilder.getKnowledgePackages().iterator().next().getName();\r
216 \r
217         if (packageNames.contains(packageName) && judgePackageName) {\r
218             throw new CorrelationException("The rule " + packageName + " already exists in the drools engine.");\r
219         }\r
220     }\r
221 \r
222     public void putRaisedIntoStream(Alarm raiseAlarm) {\r
223         FactHandle factHandle = this.ksession.getFactHandle(raiseAlarm);\r
224         if (factHandle != null) {\r
225             this.ksession.retract(factHandle);\r
226         }\r
227         this.ksession.insert(raiseAlarm);\r
228         this.ksession.fireAllRules();\r
229     }\r
230 \r
231     class AlarmMqMessageListener implements MessageListener {\r
232 \r
233         private Connection connection = null;\r
234         private Session session = null;\r
235         private Destination destination = null;\r
236         private MessageConsumer consumer = null;\r
237 \r
238         private void initialize() throws JMSException {\r
239             connection = connectionFactory.createConnection();\r
240             session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);\r
241             destination = session.createTopic(AlarmConst.MQ_TOPIC_NAME_ALARM);\r
242             consumer = session.createConsumer(destination);\r
243             connection.start();\r
244         }\r
245 \r
246         public void receive() {\r
247             try {\r
248                 initialize();\r
249                 consumer.setMessageListener(this);\r
250             } catch (JMSException e) {\r
251                 log.error("Failed to connect to the MQ service : " + e.getMessage(), e);\r
252                 try {\r
253                     close();\r
254                 } catch (JMSException e1) {\r
255                     log.error("Failed close connection  " + e1.getMessage(), e1);\r
256                 }\r
257             }\r
258         }\r
259 \r
260         public void onMessage(Message arg0) {\r
261             ActiveMQObjectMessage objectMessage = (ActiveMQObjectMessage) arg0;\r
262             try {\r
263                 Serializable object = objectMessage.getObject();\r
264 \r
265                 if (object instanceof Alarm) {\r
266                     Alarm alarm = (Alarm) object;\r
267                     putRaisedIntoStream(alarm);\r
268                 }\r
269             } catch (JMSException e) {\r
270                 log.error("Failed get object : " + e.getMessage(), e);\r
271             }\r
272         }\r
273 \r
274         private void close() throws JMSException {\r
275             if (consumer != null) {\r
276                 consumer.close();\r
277             }\r
278             if (session != null) {\r
279                 session.close();\r
280             }\r
281             if (connection != null) {\r
282                 connection.close();\r
283             }\r
284         }\r
285     }\r
286 }\r