Delete attribute
[holmes/engine-management.git] / engine-d / src / main / java / org / openo / holmes / engine / manager / DroolsEngine.java
1 /**
2  * Copyright 2017 ZTE Corporation.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package org.openo.holmes.engine.manager;
17
18
19 import java.io.StringReader;
20 import java.util.List;
21 import java.util.Locale;
22 import javax.annotation.PostConstruct;
23 import javax.inject.Inject;
24 import javax.jms.Connection;
25 import javax.jms.ConnectionFactory;
26 import javax.jms.Destination;
27 import javax.jms.JMSException;
28 import javax.jms.MessageConsumer;
29 import javax.jms.ObjectMessage;
30 import javax.jms.Session;
31 import lombok.extern.slf4j.Slf4j;
32 import org.apache.activemq.ActiveMQConnectionFactory;
33 import org.drools.KnowledgeBase;
34 import org.drools.KnowledgeBaseConfiguration;
35 import org.drools.KnowledgeBaseFactory;
36 import org.drools.builder.KnowledgeBuilder;
37 import org.drools.builder.KnowledgeBuilderFactory;
38 import org.drools.builder.ResourceType;
39 import org.drools.conf.EventProcessingOption;
40 import org.drools.definition.KnowledgePackage;
41 import org.drools.io.Resource;
42 import org.drools.io.ResourceFactory;
43 import org.drools.runtime.StatefulKnowledgeSession;
44 import org.drools.runtime.rule.FactHandle;
45 import org.glassfish.hk2.api.IterableProvider;
46 import org.jvnet.hk2.annotations.Service;
47 import org.openo.holmes.common.api.entity.CorrelationRule;
48 import org.openo.holmes.common.api.stat.Alarm;
49 import org.openo.holmes.common.config.MQConfig;
50 import org.openo.holmes.common.constant.AlarmConst;
51 import org.openo.holmes.common.exception.CorrelationException;
52 import org.openo.holmes.common.utils.ExceptionUtil;
53 import org.openo.holmes.common.utils.I18nProxy;
54 import org.openo.holmes.engine.request.DeployRuleRequest;
55 import org.openo.holmes.engine.wrapper.RuleMgtWrapper;
56
57 @Slf4j
58 @Service
59 public class DroolsEngine {
60
61     private final static String CORRELATION_RULE = "CORRELATION_RULE";
62
63     private final static String CORRELATION_ALARM = "CORRELATION_ALARM";
64
65     private final static int ENABLE = 1;
66
67     @Inject
68     private RuleMgtWrapper ruleMgtWrapper;
69
70     private KnowledgeBase kbase;
71
72     private KnowledgeBaseConfiguration kconf;
73
74     private StatefulKnowledgeSession ksession;
75
76     private KnowledgeBuilder kbuilder;
77
78     @Inject
79     private IterableProvider<MQConfig> mqConfigProvider;
80
81     private ConnectionFactory connectionFactory;
82
83     @PostConstruct
84     private void init() {
85         try {
86             // 1. start engine
87             start();
88             // 2. start mq listener
89             registerAlarmTopicListener();
90         } catch (Exception e) {
91             log.error("Start service failed: " + e.getMessage(), e);
92             throw ExceptionUtil.buildExceptionResponse("Start service failed!");
93         }
94     }
95
96     private void registerAlarmTopicListener() {
97         String brokerURL =
98             "tcp://" + mqConfigProvider.get().brokerIp + ":" + mqConfigProvider.get().brokerPort;
99         connectionFactory = new ActiveMQConnectionFactory(mqConfigProvider.get().brokerUsername,
100             mqConfigProvider.get().brokerPassword, brokerURL);
101
102         Thread thread = new Thread(new AlarmMqMessageListener());
103         thread.start();
104     }
105
106
107     private void start() throws CorrelationException {
108         log.info("Drools Engine Initialize Beginning...");
109
110         initEngineParameter();
111         initDeployRule();
112
113         log.info("Business Rule Engine Initialize Successfully.");
114     }
115
116     public void stop() {
117         this.ksession.dispose();
118     }
119
120     private void initEngineParameter(){
121         this.kconf = KnowledgeBaseFactory.newKnowledgeBaseConfiguration();
122
123         this.kconf.setOption(EventProcessingOption.STREAM);
124
125         this.kconf.setProperty("drools.assertBehaviour", "equality");
126
127         this.kbuilder = KnowledgeBuilderFactory.newKnowledgeBuilder();
128
129         this.kbase = KnowledgeBaseFactory.newKnowledgeBase("D-ENGINE", this.kconf);
130
131         this.ksession = kbase.newStatefulKnowledgeSession();
132     }
133
134     private void initDeployRule() throws CorrelationException {
135         List<CorrelationRule> rules = ruleMgtWrapper.queryRuleByEnable(ENABLE);
136
137         if (rules.isEmpty()) {
138             return;
139         }
140         for (CorrelationRule rule : rules) {
141             if (rule.getContent() != null) {
142                 deployRuleFromDB(rule.getContent());
143             }
144         }
145     }
146
147     private void deployRuleFromDB(String ruleContent) throws CorrelationException {
148         StringReader reader = new StringReader(ruleContent);
149         Resource res = ResourceFactory.newReaderResource(reader);
150
151         if (kbuilder == null) {
152             kbuilder = KnowledgeBuilderFactory.newKnowledgeBuilder();
153         }
154
155         kbuilder.add(res, ResourceType.DRL);
156
157         try {
158
159             kbase.addKnowledgePackages(kbuilder.getKnowledgePackages());
160         } catch (Exception e) {
161             throw new CorrelationException(e.getMessage(), e);
162         }
163
164         ksession.fireAllRules();
165     }
166
167     public synchronized String deployRule(DeployRuleRequest rule, Locale locale)
168         throws CorrelationException {
169         StringReader reader = new StringReader(rule.getContent());
170         Resource res = ResourceFactory.newReaderResource(reader);
171
172         if (kbuilder == null) {
173             kbuilder = KnowledgeBuilderFactory.newKnowledgeBuilder();
174         }
175
176         kbuilder.add(res, ResourceType.DRL);
177
178         if (kbuilder.hasErrors()) {
179
180             String errorMsg = I18nProxy.getInstance().getValueByArgs(locale,
181                 I18nProxy.ENGINE_CONTENT_ILLEGALITY,
182                 new String[]{kbuilder.getErrors().toString()});
183             throw new CorrelationException(errorMsg);
184         }
185
186         KnowledgePackage kpackage = kbuilder.getKnowledgePackages().iterator().next();
187
188         if (kbase.getKnowledgePackages().contains(kpackage)) {
189
190             String errorMsg = I18nProxy.getInstance().getValueByArgs(locale,
191                 I18nProxy.ENGINE_CONTENT_ILLEGALITY,new String[]{
192                     I18nProxy.getInstance().getValue(locale, I18nProxy.ENGINE_CONTAINS_PACKAGE)});
193
194             throw new CorrelationException(errorMsg);
195         }
196         try {
197
198             kbase.addKnowledgePackages(kbuilder.getKnowledgePackages());
199         } catch (Exception e) {
200
201             String errorMsg =
202                 I18nProxy.getInstance().getValue(locale, I18nProxy.ENGINE_DEPLOY_RULE_FAILED);
203             throw new CorrelationException(errorMsg, e);
204         }
205
206         ksession.fireAllRules();
207         return kpackage.getName();
208     }
209
210     public synchronized void undeployRule(String packageName, Locale locale)
211         throws CorrelationException {
212
213         KnowledgePackage pkg = kbase.getKnowledgePackage(packageName);
214
215         if (null == pkg) {
216             String errorMsg = I18nProxy.getInstance().getValueByArgs(locale,
217                 I18nProxy.ENGINE_DELETE_RULE_NULL,
218                 new String[]{packageName});
219             throw new CorrelationException(errorMsg);
220         }
221
222         try {
223
224             kbase.removeKnowledgePackage(pkg.getName());
225         } catch (Exception e) {
226             String errorMsg = I18nProxy.getInstance().getValueByArgs(locale,
227                 I18nProxy.ENGINE_DELETE_RULE_FAILED, new String[]{packageName});
228             throw new CorrelationException(errorMsg, e);
229         }
230     }
231
232     public void compileRule(String content, Locale locale)
233         throws CorrelationException {
234         StringReader reader = new StringReader(content);
235         Resource res = ResourceFactory.newReaderResource(reader);
236
237         if (kbuilder == null) {
238             kbuilder = KnowledgeBuilderFactory.newKnowledgeBuilder();
239         }
240
241         kbuilder.add(res, ResourceType.DRL);
242
243         if (kbuilder.hasErrors()) {
244             String errorMsg = I18nProxy.getInstance().getValueByArgs(locale,
245                 I18nProxy.ENGINE_CONTENT_ILLEGALITY,
246                 new String[]{kbuilder.getErrors().toString()});
247             log.error(errorMsg);
248             throw new CorrelationException(errorMsg);
249         }
250     }
251
252     public void putRaisedIntoStream(Alarm raiseAlarm) {
253         FactHandle factHandle = this.ksession.getFactHandle(raiseAlarm);
254         if (factHandle != null) {
255             this.ksession.retract(factHandle);
256         }
257         this.ksession.insert(raiseAlarm);
258         this.ksession.fireAllRules();
259     }
260
261     class AlarmMqMessageListener implements Runnable {
262
263         public void run() {
264             Connection connection;
265             Session session;
266             Destination destination;
267             MessageConsumer messageConsumer;
268
269             try {
270                 connection = connectionFactory.createConnection();
271                 connection.start();
272                 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
273                 destination = session.createTopic(AlarmConst.MQ_TOPIC_NAME_ALARM);
274                 messageConsumer = session.createConsumer(destination);
275
276                 while (true) {
277                     ObjectMessage objMessage = (ObjectMessage) messageConsumer.receive(100000);
278                     if (objMessage != null) {
279                         putRaisedIntoStream((Alarm) objMessage.getObject());
280                     } else {
281                         break;
282                     }
283                 }
284             } catch (JMSException e) {
285                 log.error("connection mq service Failed: " + e.getMessage(), e);
286             }
287
288         }
289     }
290
291 }