16ae49c9fc666b965136f7e7f137d0eef6699081
[holmes/engine-management.git] / engine-d / src / main / java / org / openo / holmes / engine / manager / DroolsEngine.java
1 /**
2  * Copyright 2017 ZTE Corporation.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package org.openo.holmes.engine.manager;
17
18
19 import java.io.StringReader;
20 import java.util.List;
21 import java.util.Locale;
22 import javax.annotation.PostConstruct;
23 import javax.inject.Inject;
24 import javax.jms.Connection;
25 import javax.jms.ConnectionFactory;
26 import javax.jms.Destination;
27 import javax.jms.JMSException;
28 import javax.jms.MessageConsumer;
29 import javax.jms.ObjectMessage;
30 import javax.jms.Session;
31 import lombok.extern.slf4j.Slf4j;
32 import org.apache.activemq.ActiveMQConnectionFactory;
33 import org.drools.KnowledgeBase;
34 import org.drools.KnowledgeBaseConfiguration;
35 import org.drools.KnowledgeBaseFactory;
36 import org.drools.builder.KnowledgeBuilder;
37 import org.drools.builder.KnowledgeBuilderFactory;
38 import org.drools.builder.ResourceType;
39 import org.drools.conf.EventProcessingOption;
40 import org.drools.definition.KnowledgePackage;
41 import org.drools.io.Resource;
42 import org.drools.io.ResourceFactory;
43 import org.drools.runtime.StatefulKnowledgeSession;
44 import org.drools.runtime.rule.FactHandle;
45 import org.glassfish.hk2.api.IterableProvider;
46 import org.jvnet.hk2.annotations.Service;
47 import org.openo.holmes.common.api.entity.CorrelationRule;
48 import org.openo.holmes.common.api.stat.Alarm;
49 import org.openo.holmes.common.config.MQConfig;
50 import org.openo.holmes.common.constant.AlarmConst;
51 import org.openo.holmes.common.exception.CorrelationException;
52 import org.openo.holmes.common.exception.DbException;
53 import org.openo.holmes.common.exception.EngineException;
54 import org.openo.holmes.common.exception.RuleIllegalityException;
55 import org.openo.holmes.common.utils.ExceptionUtil;
56 import org.openo.holmes.common.utils.I18nProxy;
57 import org.openo.holmes.engine.request.DeployRuleRequest;
58 import org.openo.holmes.engine.wrapper.RuleMgtWrapper;
59
60 @Slf4j
61 @Service
62 public class DroolsEngine {
63
64     private final static String CORRELATION_RULE = "CORRELATION_RULE";
65
66     private final static String CORRELATION_ALARM = "CORRELATION_ALARM";
67
68     private final static int ENABLE = 1;
69
70     @Inject
71     private RuleMgtWrapper ruleMgtWrapper;
72
73     private KnowledgeBase kbase;
74
75     private KnowledgeBaseConfiguration kconf;
76
77     private StatefulKnowledgeSession ksession;
78
79     private KnowledgeBuilder kbuilder;
80
81     @Inject
82     private IterableProvider<MQConfig> mqConfigProvider;
83
84     private ConnectionFactory connectionFactory;
85
86     @PostConstruct
87     private void init() {
88         try {
89             // 1. start engine
90             start();
91             // 2. start mq listener
92             registerAlarmTopicListener();
93         } catch (Exception e) {
94             log.error("Start service failed: " + e.getMessage());
95             throw ExceptionUtil.buildExceptionResponse("Start service failed!");
96         }
97     }
98
99     private void registerAlarmTopicListener() {
100         String brokerURL =
101             "tcp://" + mqConfigProvider.get().brokerIp + ":" + mqConfigProvider.get().brokerPort;
102         connectionFactory = new ActiveMQConnectionFactory(mqConfigProvider.get().brokerUsername,
103             mqConfigProvider.get().brokerPassword, brokerURL);
104
105         Thread thread = new Thread(new AlarmMqMessageListener());
106         thread.start();
107     }
108
109
110     private void start() throws EngineException, RuleIllegalityException, DbException {
111         log.info("Drools Egine Initialize Begining ... ");
112
113         initEngineParameter();
114 //        initDeployRule();
115
116         log.info("Business Rule Egine Initialize Successfully ");
117     }
118
119     public void stop() throws Exception {
120         this.ksession.dispose();
121     }
122
123     private void initEngineParameter() throws EngineException {
124         this.kconf = KnowledgeBaseFactory.newKnowledgeBaseConfiguration();
125
126         this.kconf.setOption(EventProcessingOption.STREAM);
127
128         this.kconf.setProperty("drools.assertBehaviour", "equality");
129
130         this.kbuilder = KnowledgeBuilderFactory.newKnowledgeBuilder();
131
132         this.kbase = KnowledgeBaseFactory.newKnowledgeBase("D-ENGINE", this.kconf);
133
134         this.ksession = kbase.newStatefulKnowledgeSession();
135     }
136
137     private void initDeployRule() throws RuleIllegalityException, EngineException, DbException {
138         List<CorrelationRule> rules = ruleMgtWrapper.queryRuleByEnable(ENABLE);
139
140         if (rules.size() > 0) {
141             for (CorrelationRule rule : rules) {
142                 if (rule.getContent() != null) {
143                     deployRuleFromCache(rule.getContent());
144                 }
145             }
146         }
147     }
148
149     private void deployRuleFromCache(String ruleContent) throws EngineException {
150         StringReader reader = new StringReader(ruleContent);
151         Resource res = ResourceFactory.newReaderResource(reader);
152
153         if (kbuilder == null) {
154             kbuilder = KnowledgeBuilderFactory.newKnowledgeBuilder();
155         }
156
157         kbuilder.add(res, ResourceType.DRL);
158
159         try {
160
161             kbase.addKnowledgePackages(kbuilder.getKnowledgePackages());
162         } catch (Exception e) {
163             throw new EngineException(e);
164         }
165
166         ksession.fireAllRules();
167     }
168
169     public synchronized String deployRule(DeployRuleRequest rule, Locale locale)
170         throws CorrelationException {
171         StringReader reader = new StringReader(rule.getContent());
172         Resource res = ResourceFactory.newReaderResource(reader);
173
174         if (kbuilder == null) {
175             kbuilder = KnowledgeBuilderFactory.newKnowledgeBuilder();
176         }
177
178         kbuilder.add(res, ResourceType.DRL);
179
180         if (kbuilder.hasErrors()) {
181
182             String errorMsg = I18nProxy.getInstance().getValueByArgs(locale,
183                 I18nProxy.ENGINE_CONTENT_ILLEGALITY,
184                 new String[]{kbuilder.getErrors().toString()});
185             throw new CorrelationException(errorMsg);
186         }
187
188         KnowledgePackage kpackage = kbuilder.getKnowledgePackages().iterator().next();
189
190         if (kbase.getKnowledgePackages().contains(kpackage)) {
191
192             String errorMsg = I18nProxy.getInstance().getValueByArgs(locale,
193                 I18nProxy.ENGINE_CONTENT_ILLEGALITY,new String[]{
194                     I18nProxy.getInstance().getValue(locale, I18nProxy.ENGINE_CONTAINS_PACKAGE)});
195
196             throw new CorrelationException(errorMsg);
197         }
198         try {
199
200             kbase.addKnowledgePackages(kbuilder.getKnowledgePackages());
201         } catch (Exception e) {
202
203             String errorMsg =
204                 I18nProxy.getInstance().getValue(locale, I18nProxy.ENGINE_DEPLOY_RULE_FAILED);
205             throw new CorrelationException(errorMsg, e);
206         }
207
208         ksession.fireAllRules();
209         return kpackage.getName();
210     }
211
212     public synchronized void undeployRule(String packageName, Locale locale)
213         throws CorrelationException {
214
215         KnowledgePackage pkg = kbase.getKnowledgePackage(packageName);
216
217         if (null == pkg) {
218             String errorMsg = I18nProxy.getInstance().getValueByArgs(locale,
219                 I18nProxy.ENGINE_DELETE_RULE_NULL,
220                 new String[]{packageName});
221             throw new CorrelationException(errorMsg);
222         }
223
224         try {
225
226             kbase.removeKnowledgePackage(pkg.getName());
227         } catch (Exception e) {
228             String errorMsg = I18nProxy.getInstance().getValueByArgs(locale,
229                 I18nProxy.ENGINE_DELETE_RULE_FAILED, new String[]{packageName});
230             throw new CorrelationException(errorMsg, e);
231         }
232     }
233
234     public void compileRule(String content, Locale locale)
235         throws CorrelationException {
236         StringReader reader = new StringReader(content);
237         Resource res = ResourceFactory.newReaderResource(reader);
238
239         if (kbuilder == null) {
240             kbuilder = KnowledgeBuilderFactory.newKnowledgeBuilder();
241         }
242
243         kbuilder.add(res, ResourceType.DRL);
244
245         if (kbuilder.hasErrors()) {
246             String errorMsg = I18nProxy.getInstance().getValueByArgs(locale,
247                 I18nProxy.ENGINE_CONTENT_ILLEGALITY,
248                 new String[]{kbuilder.getErrors().toString()});
249             log.error(errorMsg);
250             throw new CorrelationException(errorMsg);
251         }
252     }
253
254     public void putRaisedIntoStream(Alarm raiseAlarm) {
255         FactHandle factHandle = this.ksession.getFactHandle(raiseAlarm);
256         if (factHandle != null) {
257             this.ksession.retract(factHandle);
258         }
259         this.ksession.insert(raiseAlarm);
260         this.ksession.fireAllRules();
261     }
262
263     class AlarmMqMessageListener implements Runnable {
264
265         public void run() {
266             Connection connection;
267             Session session;
268             Destination destination;
269             MessageConsumer messageConsumer;
270
271             try {
272                 connection = connectionFactory.createConnection();
273                 connection.start();
274                 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
275                 destination = session.createTopic(AlarmConst.MQ_TOPIC_NAME_ALARM);
276                 messageConsumer = session.createConsumer(destination);
277
278                 while (true) {
279                     ObjectMessage objMessage = (ObjectMessage) messageConsumer.receive(100000);
280                     if (objMessage != null) {
281                         putRaisedIntoStream((Alarm) objMessage.getObject());
282                     } else {
283                         break;
284                     }
285                 }
286             } catch (JMSException e) {
287                 log.error("connection mq service Failed: " + e.getMessage());
288             }
289
290         }
291     }
292
293 }