Modify MQ listener mode
[holmes/engine-management.git] / engine-d / src / main / java / org / openo / holmes / engine / manager / DroolsEngine.java
1 /**
2  * Copyright 2017 ZTE Corporation.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package org.openo.holmes.engine.manager;
17
18
19 import java.io.Serializable;
20 import java.io.StringReader;
21 import java.util.List;
22 import java.util.Locale;
23 import javax.annotation.PostConstruct;
24 import javax.inject.Inject;
25 import javax.jms.Connection;
26 import javax.jms.ConnectionFactory;
27 import javax.jms.Destination;
28 import javax.jms.JMSException;
29 import javax.jms.Message;
30 import javax.jms.MessageConsumer;
31 import javax.jms.MessageListener;
32 import javax.jms.Session;
33 import lombok.extern.slf4j.Slf4j;
34 import org.apache.activemq.ActiveMQConnectionFactory;
35 import org.apache.activemq.command.ActiveMQObjectMessage;
36 import org.drools.KnowledgeBase;
37 import org.drools.KnowledgeBaseConfiguration;
38 import org.drools.KnowledgeBaseFactory;
39 import org.drools.builder.KnowledgeBuilder;
40 import org.drools.builder.KnowledgeBuilderFactory;
41 import org.drools.builder.ResourceType;
42 import org.drools.conf.EventProcessingOption;
43 import org.drools.definition.KnowledgePackage;
44 import org.drools.io.Resource;
45 import org.drools.io.ResourceFactory;
46 import org.drools.runtime.StatefulKnowledgeSession;
47 import org.drools.runtime.rule.FactHandle;
48 import org.glassfish.hk2.api.IterableProvider;
49 import org.jvnet.hk2.annotations.Service;
50 import org.openo.holmes.common.api.entity.CorrelationRule;
51 import org.openo.holmes.common.api.stat.Alarm;
52 import org.openo.holmes.common.config.MQConfig;
53 import org.openo.holmes.common.constant.AlarmConst;
54 import org.openo.holmes.common.exception.CorrelationException;
55 import org.openo.holmes.common.utils.ExceptionUtil;
56 import org.openo.holmes.common.utils.I18nProxy;
57 import org.openo.holmes.engine.request.DeployRuleRequest;
58 import org.openo.holmes.engine.wrapper.RuleMgtWrapper;
59
60 @Slf4j
61 @Service
62 public class DroolsEngine {
63     private final static int ENABLE = 1;
64
65     @Inject
66     private RuleMgtWrapper ruleMgtWrapper;
67
68     private KnowledgeBase kbase;
69
70     private KnowledgeBaseConfiguration kconf;
71
72     private StatefulKnowledgeSession ksession;
73
74     private KnowledgeBuilder kbuilder;
75
76     @Inject
77     private IterableProvider<MQConfig> mqConfigProvider;
78
79     private ConnectionFactory connectionFactory;
80
81     @PostConstruct
82     private void init() {
83         try {
84             // 1. start engine
85             start();
86             // 2. start mq listener
87             registerAlarmTopicListener();
88         } catch (Exception e) {
89             log.error("Start service failed: " + e.getMessage(), e);
90             throw ExceptionUtil.buildExceptionResponse("Start service failed!");
91         }
92     }
93
94     private void registerAlarmTopicListener() {
95         String brokerURL =
96             "tcp://" + mqConfigProvider.get().brokerIp + ":" + mqConfigProvider.get().brokerPort;
97         connectionFactory = new ActiveMQConnectionFactory(mqConfigProvider.get().brokerUsername,
98             mqConfigProvider.get().brokerPassword, brokerURL);
99
100         AlarmMqMessageListener listener = new AlarmMqMessageListener();
101         listener.receive();
102     }
103
104
105     private void start() throws CorrelationException {
106         log.info("Drools Engine Initialize Beginning...");
107
108         initEngineParameter();
109         initDeployRule();
110
111         log.info("Business Rule Engine Initialize Successfully.");
112     }
113
114     public void stop() {
115         this.ksession.dispose();
116     }
117
118     private void initEngineParameter(){
119         this.kconf = KnowledgeBaseFactory.newKnowledgeBaseConfiguration();
120
121         this.kconf.setOption(EventProcessingOption.STREAM);
122
123         this.kconf.setProperty("drools.assertBehaviour", "equality");
124
125         this.kbuilder = KnowledgeBuilderFactory.newKnowledgeBuilder();
126
127         this.kbase = KnowledgeBaseFactory.newKnowledgeBase("D-ENGINE", this.kconf);
128
129         this.ksession = kbase.newStatefulKnowledgeSession();
130     }
131
132     private void initDeployRule() throws CorrelationException {
133         List<CorrelationRule> rules = ruleMgtWrapper.queryRuleByEnable(ENABLE);
134
135         if (rules.isEmpty()) {
136             return;
137         }
138         for (CorrelationRule rule : rules) {
139             if (rule.getContent() != null) {
140                 deployRuleFromDB(rule.getContent());
141             }
142         }
143     }
144
145     private void deployRuleFromDB(String ruleContent) throws CorrelationException {
146         StringReader reader = new StringReader(ruleContent);
147         Resource res = ResourceFactory.newReaderResource(reader);
148
149         if (kbuilder == null) {
150             kbuilder = KnowledgeBuilderFactory.newKnowledgeBuilder();
151         }
152
153         kbuilder.add(res, ResourceType.DRL);
154
155         try {
156
157             kbase.addKnowledgePackages(kbuilder.getKnowledgePackages());
158         } catch (Exception e) {
159             throw new CorrelationException(e.getMessage(), e);
160         }
161
162         ksession.fireAllRules();
163     }
164
165     public synchronized String deployRule(DeployRuleRequest rule, Locale locale)
166         throws CorrelationException {
167         StringReader reader = new StringReader(rule.getContent());
168         Resource res = ResourceFactory.newReaderResource(reader);
169
170         if (kbuilder == null) {
171             kbuilder = KnowledgeBuilderFactory.newKnowledgeBuilder();
172         }
173
174         kbuilder.add(res, ResourceType.DRL);
175
176         if (kbuilder.hasErrors()) {
177
178             String errorMsg = I18nProxy.getInstance().getValueByArgs(locale,
179                 I18nProxy.ENGINE_CONTENT_ILLEGALITY,
180                 new String[]{kbuilder.getErrors().toString()});
181             throw new CorrelationException(errorMsg);
182         }
183
184         KnowledgePackage kpackage = kbuilder.getKnowledgePackages().iterator().next();
185
186         if (kbase.getKnowledgePackages().contains(kpackage)) {
187
188             String errorMsg = I18nProxy.getInstance().getValueByArgs(locale,
189                 I18nProxy.ENGINE_CONTENT_ILLEGALITY,new String[]{
190                     I18nProxy.getInstance().getValue(locale, I18nProxy.ENGINE_CONTAINS_PACKAGE)});
191
192             throw new CorrelationException(errorMsg);
193         }
194         try {
195
196             kbase.addKnowledgePackages(kbuilder.getKnowledgePackages());
197         } catch (Exception e) {
198
199             String errorMsg =
200                 I18nProxy.getInstance().getValue(locale, I18nProxy.ENGINE_DEPLOY_RULE_FAILED);
201             throw new CorrelationException(errorMsg, e);
202         }
203
204         ksession.fireAllRules();
205         return kpackage.getName();
206     }
207
208     public synchronized void undeployRule(String packageName, Locale locale)
209         throws CorrelationException {
210
211         KnowledgePackage pkg = kbase.getKnowledgePackage(packageName);
212
213         if (null == pkg) {
214             String errorMsg = I18nProxy.getInstance().getValueByArgs(locale,
215                 I18nProxy.ENGINE_DELETE_RULE_NULL,
216                 new String[]{packageName});
217             throw new CorrelationException(errorMsg);
218         }
219
220         try {
221
222             kbase.removeKnowledgePackage(pkg.getName());
223         } catch (Exception e) {
224             String errorMsg = I18nProxy.getInstance().getValueByArgs(locale,
225                 I18nProxy.ENGINE_DELETE_RULE_FAILED, new String[]{packageName});
226             throw new CorrelationException(errorMsg, e);
227         }
228     }
229
230     public void compileRule(String content, Locale locale)
231         throws CorrelationException {
232         StringReader reader = new StringReader(content);
233         Resource res = ResourceFactory.newReaderResource(reader);
234
235         if (kbuilder == null) {
236             kbuilder = KnowledgeBuilderFactory.newKnowledgeBuilder();
237         }
238
239         kbuilder.add(res, ResourceType.DRL);
240
241         if (kbuilder.hasErrors()) {
242             String errorMsg = I18nProxy.getInstance().getValueByArgs(locale,
243                 I18nProxy.ENGINE_CONTENT_ILLEGALITY,
244                 new String[]{kbuilder.getErrors().toString()});
245             log.error(errorMsg);
246             throw new CorrelationException(errorMsg);
247         }
248     }
249
250     public void putRaisedIntoStream(Alarm raiseAlarm) {
251         FactHandle factHandle = this.ksession.getFactHandle(raiseAlarm);
252         if (factHandle != null) {
253             this.ksession.retract(factHandle);
254         }
255         this.ksession.insert(raiseAlarm);
256         this.ksession.fireAllRules();
257     }
258
259     class AlarmMqMessageListener implements MessageListener {
260
261         private Connection connection = null;
262         private Session session = null;
263         private Destination destination = null;
264         private MessageConsumer consumer = null;
265
266         private void initialize() throws JMSException {
267             connection = connectionFactory.createConnection();
268             session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
269             destination = session.createTopic(AlarmConst.MQ_TOPIC_NAME_ALARM);
270             consumer = session.createConsumer(destination);
271             connection.start();
272         }
273
274         public void receive() {
275             try {
276                 initialize();
277                 consumer.setMessageListener(this);
278             } catch (JMSException e) {
279                 log.error("Failed to connect to the MQ service : " + e.getMessage(), e);
280                 try {
281                     close();
282                 } catch (JMSException e1) {
283                     log.error("Failed close connection  " + e1.getMessage(), e1);
284                 }
285             }
286         }
287
288         public void onMessage(Message arg0) {
289             ActiveMQObjectMessage objectMessage = (ActiveMQObjectMessage) arg0;
290             try {
291                 Serializable object = objectMessage.getObject();
292
293                 if (object instanceof Alarm) {
294                     Alarm alarm = (Alarm) object;
295                     putRaisedIntoStream(alarm);
296                 }
297             } catch (JMSException e) {
298                 log.error("Failed get object : " + e.getMessage(), e);
299             }
300         }
301
302         private void close() throws JMSException {
303             if (consumer != null)
304                 consumer.close();
305             if (session != null)
306                 session.close();
307             if (connection != null)
308                 connection.close();
309         }
310     }
311 }