Remove DBException
[holmes/engine-management.git] / engine-d / src / main / java / org / openo / holmes / engine / manager / DroolsEngine.java
1 /**
2  * Copyright 2017 ZTE Corporation.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package org.openo.holmes.engine.manager;
17
18
19 import java.io.StringReader;
20 import java.util.List;
21 import java.util.Locale;
22 import javax.annotation.PostConstruct;
23 import javax.inject.Inject;
24 import javax.jms.Connection;
25 import javax.jms.ConnectionFactory;
26 import javax.jms.Destination;
27 import javax.jms.JMSException;
28 import javax.jms.MessageConsumer;
29 import javax.jms.ObjectMessage;
30 import javax.jms.Session;
31 import lombok.extern.slf4j.Slf4j;
32 import org.apache.activemq.ActiveMQConnectionFactory;
33 import org.drools.KnowledgeBase;
34 import org.drools.KnowledgeBaseConfiguration;
35 import org.drools.KnowledgeBaseFactory;
36 import org.drools.builder.KnowledgeBuilder;
37 import org.drools.builder.KnowledgeBuilderFactory;
38 import org.drools.builder.ResourceType;
39 import org.drools.conf.EventProcessingOption;
40 import org.drools.definition.KnowledgePackage;
41 import org.drools.io.Resource;
42 import org.drools.io.ResourceFactory;
43 import org.drools.runtime.StatefulKnowledgeSession;
44 import org.drools.runtime.rule.FactHandle;
45 import org.glassfish.hk2.api.IterableProvider;
46 import org.jvnet.hk2.annotations.Service;
47 import org.openo.holmes.common.api.entity.CorrelationRule;
48 import org.openo.holmes.common.api.stat.Alarm;
49 import org.openo.holmes.common.config.MQConfig;
50 import org.openo.holmes.common.constant.AlarmConst;
51 import org.openo.holmes.common.exception.CorrelationException;
52 import org.openo.holmes.common.utils.ExceptionUtil;
53 import org.openo.holmes.common.utils.I18nProxy;
54 import org.openo.holmes.engine.request.DeployRuleRequest;
55 import org.openo.holmes.engine.wrapper.RuleMgtWrapper;
56
57 @Slf4j
58 @Service
59 public class DroolsEngine {
60
61     private final static String CORRELATION_RULE = "CORRELATION_RULE";
62
63     private final static String CORRELATION_ALARM = "CORRELATION_ALARM";
64
65     private final static int ENABLE = 1;
66
67     @Inject
68     private RuleMgtWrapper ruleMgtWrapper;
69
70     private KnowledgeBase kbase;
71
72     private KnowledgeBaseConfiguration kconf;
73
74     private StatefulKnowledgeSession ksession;
75
76     private KnowledgeBuilder kbuilder;
77
78     @Inject
79     private IterableProvider<MQConfig> mqConfigProvider;
80
81     private ConnectionFactory connectionFactory;
82
83     @PostConstruct
84     private void init() {
85         try {
86             // 1. start engine
87             start();
88             // 2. start mq listener
89             registerAlarmTopicListener();
90         } catch (Exception e) {
91             log.error("Start service failed: " + e.getMessage(), e);
92             throw ExceptionUtil.buildExceptionResponse("Start service failed!");
93         }
94     }
95
96     private void registerAlarmTopicListener() {
97         String brokerURL =
98             "tcp://" + mqConfigProvider.get().brokerIp + ":" + mqConfigProvider.get().brokerPort;
99         connectionFactory = new ActiveMQConnectionFactory(mqConfigProvider.get().brokerUsername,
100             mqConfigProvider.get().brokerPassword, brokerURL);
101
102         Thread thread = new Thread(new AlarmMqMessageListener());
103         thread.start();
104     }
105
106
107     private void start() throws CorrelationException {
108         log.info("Drools Engine Initialize Beginning...");
109
110         initEngineParameter();
111         initDeployRule();
112
113         log.info("Business Rule Engine Initialize Successfully.");
114     }
115
116     public void stop() {
117         this.ksession.dispose();
118     }
119
120     private void initEngineParameter(){
121         this.kconf = KnowledgeBaseFactory.newKnowledgeBaseConfiguration();
122
123         this.kconf.setOption(EventProcessingOption.STREAM);
124
125         this.kconf.setProperty("drools.assertBehaviour", "equality");
126
127         this.kbuilder = KnowledgeBuilderFactory.newKnowledgeBuilder();
128
129         this.kbase = KnowledgeBaseFactory.newKnowledgeBase("D-ENGINE", this.kconf);
130
131         this.ksession = kbase.newStatefulKnowledgeSession();
132     }
133
134     private void initDeployRule() throws CorrelationException {
135         List<CorrelationRule> rules = ruleMgtWrapper.queryRuleByEnable(ENABLE);
136
137         if (!rules.isEmpty()) {
138             for (CorrelationRule rule : rules) {
139                 if (rule.getContent() != null) {
140                     deployRuleFromDB(rule.getContent());
141                 }
142             }
143         }
144     }
145
146     private void deployRuleFromDB(String ruleContent) throws CorrelationException {
147         StringReader reader = new StringReader(ruleContent);
148         Resource res = ResourceFactory.newReaderResource(reader);
149
150         if (kbuilder == null) {
151             kbuilder = KnowledgeBuilderFactory.newKnowledgeBuilder();
152         }
153
154         kbuilder.add(res, ResourceType.DRL);
155
156         try {
157
158             kbase.addKnowledgePackages(kbuilder.getKnowledgePackages());
159         } catch (Exception e) {
160             throw new CorrelationException(e.getMessage(), e);
161         }
162
163         ksession.fireAllRules();
164     }
165
166     public synchronized String deployRule(DeployRuleRequest rule, Locale locale)
167         throws CorrelationException {
168         StringReader reader = new StringReader(rule.getContent());
169         Resource res = ResourceFactory.newReaderResource(reader);
170
171         if (kbuilder == null) {
172             kbuilder = KnowledgeBuilderFactory.newKnowledgeBuilder();
173         }
174
175         kbuilder.add(res, ResourceType.DRL);
176
177         if (kbuilder.hasErrors()) {
178
179             String errorMsg = I18nProxy.getInstance().getValueByArgs(locale,
180                 I18nProxy.ENGINE_CONTENT_ILLEGALITY,
181                 new String[]{kbuilder.getErrors().toString()});
182             throw new CorrelationException(errorMsg);
183         }
184
185         KnowledgePackage kpackage = kbuilder.getKnowledgePackages().iterator().next();
186
187         if (kbase.getKnowledgePackages().contains(kpackage)) {
188
189             String errorMsg = I18nProxy.getInstance().getValueByArgs(locale,
190                 I18nProxy.ENGINE_CONTENT_ILLEGALITY,new String[]{
191                     I18nProxy.getInstance().getValue(locale, I18nProxy.ENGINE_CONTAINS_PACKAGE)});
192
193             throw new CorrelationException(errorMsg);
194         }
195         try {
196
197             kbase.addKnowledgePackages(kbuilder.getKnowledgePackages());
198         } catch (Exception e) {
199
200             String errorMsg =
201                 I18nProxy.getInstance().getValue(locale, I18nProxy.ENGINE_DEPLOY_RULE_FAILED);
202             throw new CorrelationException(errorMsg, e);
203         }
204
205         ksession.fireAllRules();
206         return kpackage.getName();
207     }
208
209     public synchronized void undeployRule(String packageName, Locale locale)
210         throws CorrelationException {
211
212         KnowledgePackage pkg = kbase.getKnowledgePackage(packageName);
213
214         if (null == pkg) {
215             String errorMsg = I18nProxy.getInstance().getValueByArgs(locale,
216                 I18nProxy.ENGINE_DELETE_RULE_NULL,
217                 new String[]{packageName});
218             throw new CorrelationException(errorMsg);
219         }
220
221         try {
222
223             kbase.removeKnowledgePackage(pkg.getName());
224         } catch (Exception e) {
225             String errorMsg = I18nProxy.getInstance().getValueByArgs(locale,
226                 I18nProxy.ENGINE_DELETE_RULE_FAILED, new String[]{packageName});
227             throw new CorrelationException(errorMsg, e);
228         }
229     }
230
231     public void compileRule(String content, Locale locale)
232         throws CorrelationException {
233         StringReader reader = new StringReader(content);
234         Resource res = ResourceFactory.newReaderResource(reader);
235
236         if (kbuilder == null) {
237             kbuilder = KnowledgeBuilderFactory.newKnowledgeBuilder();
238         }
239
240         kbuilder.add(res, ResourceType.DRL);
241
242         if (kbuilder.hasErrors()) {
243             String errorMsg = I18nProxy.getInstance().getValueByArgs(locale,
244                 I18nProxy.ENGINE_CONTENT_ILLEGALITY,
245                 new String[]{kbuilder.getErrors().toString()});
246             log.error(errorMsg);
247             throw new CorrelationException(errorMsg);
248         }
249     }
250
251     public void putRaisedIntoStream(Alarm raiseAlarm) {
252         FactHandle factHandle = this.ksession.getFactHandle(raiseAlarm);
253         if (factHandle != null) {
254             this.ksession.retract(factHandle);
255         }
256         this.ksession.insert(raiseAlarm);
257         this.ksession.fireAllRules();
258     }
259
260     class AlarmMqMessageListener implements Runnable {
261
262         public void run() {
263             Connection connection;
264             Session session;
265             Destination destination;
266             MessageConsumer messageConsumer;
267
268             try {
269                 connection = connectionFactory.createConnection();
270                 connection.start();
271                 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
272                 destination = session.createTopic(AlarmConst.MQ_TOPIC_NAME_ALARM);
273                 messageConsumer = session.createConsumer(destination);
274
275                 while (true) {
276                     ObjectMessage objMessage = (ObjectMessage) messageConsumer.receive(100000);
277                     if (objMessage != null) {
278                         putRaisedIntoStream((Alarm) objMessage.getObject());
279                     } else {
280                         break;
281                     }
282                 }
283             } catch (JMSException e) {
284                 log.error("connection mq service Failed: " + e.getMessage(), e);
285             }
286
287         }
288     }
289
290 }