2 * Copyright 2017 ZTE Corporation.
\r
4 * Licensed under the Apache License, Version 2.0 (the "License");
\r
5 * you may not use this file except in compliance with the License.
\r
6 * You may obtain a copy of the License at
\r
8 * http://www.apache.org/licenses/LICENSE-2.0
\r
10 * Unless required by applicable law or agreed to in writing, software
\r
11 * distributed under the License is distributed on an "AS IS" BASIS,
\r
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
\r
13 * See the License for the specific language governing permissions and
\r
14 * limitations under the License.
\r
16 package org.onap.holmes.engine.manager;
\r
17 import java.io.StringReader;
\r
18 import java.util.ArrayList;
\r
19 import java.util.HashSet;
\r
20 import java.util.List;
\r
21 import java.util.Locale;
\r
22 import java.util.Set;
\r
23 import javax.annotation.PostConstruct;
\r
24 import javax.inject.Inject;
\r
25 import lombok.extern.slf4j.Slf4j;
\r
26 import org.drools.compiler.kie.builder.impl.InternalKieModule;
\r
27 import org.jvnet.hk2.annotations.Service;
\r
29 import org.kie.api.KieBase;
\r
30 import org.kie.api.KieServices;
\r
31 import org.kie.api.builder.KieBuilder;
\r
32 import org.kie.api.builder.KieFileSystem;
\r
33 import org.kie.api.builder.KieRepository;
\r
34 import org.kie.api.builder.Message;
\r
35 import org.kie.api.builder.Message.Level;
\r
36 import org.kie.api.builder.model.KieBaseModel;
\r
37 import org.kie.api.builder.model.KieModuleModel;
\r
38 import org.kie.api.builder.model.KieSessionModel;
\r
39 import org.kie.api.conf.EqualityBehaviorOption;
\r
40 import org.kie.api.conf.EventProcessingOption;
\r
41 import org.kie.api.definition.KiePackage;
\r
42 import org.kie.api.io.KieResources;
\r
43 import org.kie.api.io.ResourceType;
\r
44 import org.kie.api.runtime.KieContainer;
\r
45 import org.kie.api.runtime.KieSession;
\r
46 import org.kie.api.runtime.conf.ClockTypeOption;
\r
47 import org.kie.api.runtime.rule.FactHandle;
\r
49 import org.onap.holmes.common.api.entity.AlarmInfo;
\r
51 import org.onap.holmes.common.api.stat.VesAlarm;
\r
52 import org.onap.holmes.common.dmaap.DmaapService;
\r
53 import org.onap.holmes.common.exception.AlarmInfoException;
\r
54 import org.onap.holmes.common.utils.DbDaoUtil;
\r
55 import org.onap.holmes.engine.db.AlarmInfoDao;
\r
56 import org.onap.holmes.engine.request.DeployRuleRequest;
\r
57 import org.onap.holmes.common.api.entity.CorrelationRule;
\r
58 import org.onap.holmes.common.exception.CorrelationException;
\r
59 import org.onap.holmes.common.utils.ExceptionUtil;
\r
60 import org.onap.holmes.engine.wrapper.RuleMgtWrapper;
\r
64 public class DroolsEngine {
\r
// Value passed to RuleMgtWrapper.queryRuleByEnable when loading rules.
66 private static final int ENABLE = 1;
\r
// Encoding name handed to KieResources.newReaderResource for rule text.
67 public static final String UTF_8 = "UTF-8";
\r
// Name of the KieBase model; also used to look up the packages of a built module.
68 public static final String K_BASE = "KBase";
\r
// Path inside the KieFileSystem that rule content is written to.
69 private static final String RULES_FILE_NAME = "src/main/resources/rules/rule.drl";
\r
// Package names added by deployRule and removed by undeployRule.
70 private final Set<String> packageNames = new HashSet<String>();
\r
// NOTE(review): any annotations on the fields below are not visible in this excerpt - confirm how they are populated.
73 private RuleMgtWrapper ruleMgtWrapper;
\r
// KIE objects assigned in initEngineParameter().
76 private KieBase kieBase;
\r
77 private KieSession kieSession;
\r
78 private KieContainer kieContainer;
\r
79 private KieFileSystem kfs;
\r
80 private KieServices ks;
\r
81 private KieBuilder kieBuilder;
\r
82 private KieResources resources;
\r
83 private KieRepository kieRepository;
\r
// Assigned in init() from daoUtil.
85 private AlarmInfoDao alarmInfoDao;
\r
87 private DbDaoUtil daoUtil;
\r
// Obtains the AlarmInfoDao from daoUtil; any exception caught below is logged and
// rethrown through ExceptionUtil.buildExceptionResponse.
// NOTE(review): the statements guarded by the catch clause (presumably a try block
// that calls start()) are not visible in this excerpt - confirm against the full file.
91 private void init() {
\r
92 alarmInfoDao = daoUtil.getJdbiDaoByOnDemand(AlarmInfoDao.class);
\r
96 } catch (Exception e) {
\r
97 log.error("Failed to start the service: " + e.getMessage(), e);
\r
98 throw ExceptionUtil.buildExceptionResponse("Failed to start the drools engine!");
\r
102 private void start() throws AlarmInfoException {
\r
103 log.info("Drools Engine Initialize Beginning...");
\r
105 initEngineParameter();
\r
106 alarmSynchronization();
\r
107 // initDeployRule();
\r
109 log.info("Alarm synchronization Successfully.");
\r
112 public void stop() {
\r
113 this.kieSession.dispose();
\r
116 public void initEngineParameter() {
\r
117 this.ks = KieServices.Factory.get();
\r
118 this.resources = ks.getResources();
\r
119 this.kieRepository = ks.getRepository();
\r
120 this.kfs = createKieFileSystemWithKProject(ks);
\r
122 this.kieBuilder = ks.newKieBuilder(kfs).buildAll();
\r
123 this.kieContainer = ks.newKieContainer(kieRepository.getDefaultReleaseId());
\r
125 this.kieBase = kieContainer.getKieBase();
\r
126 this.kieSession = kieContainer.newKieSession();
\r
// Queries the rules via ruleMgtWrapper.queryRuleByEnable(ENABLE) and, for every rule whose
// content is non-null, deploys that content and stores the rule's closed-control-loop name in
// DmaapService.loopControlNames under the rule's package name.
// NOTE(review): the body of the isEmpty() branch is not visible in this excerpt
// (presumably an early return) - confirm against the full file.
129 private void initDeployRule() throws CorrelationException {
\r
130 List<CorrelationRule> rules = ruleMgtWrapper.queryRuleByEnable(ENABLE);
\r
132 if (rules.isEmpty()) {
\r
135 for (CorrelationRule rule : rules) {
\r
136 if (rule.getContent() != null) {
\r
137 deployRuleFromDB(rule.getContent());
\r
138 DmaapService.loopControlNames.put(rule.getPackageName(), rule.getClosedControlLoopName());
\r
// Writes the given rule text into the KieFileSystem at RULES_FILE_NAME as a DRL resource,
// rebuilds, moves the container to the built module's release id and fires all rules.
// Exceptions caught below are rethrown as CorrelationException carrying the original message and cause.
// NOTE(review): the line directly after the signature and the opening of the try block are
// not visible in this excerpt - confirm what runs before the reader is created.
143 private void deployRuleFromDB(String ruleContent) throws CorrelationException {
\r
145 StringReader reader = new StringReader(ruleContent);
\r
146 kfs.write(RULES_FILE_NAME,
\r
147 this.resources.newReaderResource(reader, UTF_8).setResourceType(ResourceType.DRL));
\r
148 kieBuilder = ks.newKieBuilder(kfs).buildAll();
\r
150 InternalKieModule internalKieModule = (InternalKieModule)kieBuilder.getKieModule();
\r
151 kieContainer.updateToVersion(internalKieModule.getReleaseId());
\r
152 } catch (Exception e) {
\r
153 throw new CorrelationException(e.getMessage(), e);
\r
155 kieSession.fireAllRules();
\r
// Deploys the rule carried by the request: writes its content to RULES_FILE_NAME as DRL,
// rebuilds, validates the build with judgeRuleContent (duplicate-package check enabled),
// updates the container to the built module's release id, records the package name in
// packageNames, fires all rules and returns the package name.
// The package name is the name of the first package the built module reports for K_BASE.
// Exceptions caught below are rethrown as CorrelationException("Failed to deploy the rule.").
// NOTE(review): the line directly after the signature and the opening of the try block are
// not visible in this excerpt - confirm what runs before the reader is created.
158 public synchronized String deployRule(DeployRuleRequest rule, Locale locale)
\r
159 throws CorrelationException {
\r
161 StringReader reader = new StringReader(rule.getContent());
\r
162 kfs.write(RULES_FILE_NAME,
\r
163 this.resources.newReaderResource(reader, UTF_8).setResourceType(ResourceType.DRL));
\r
164 kieBuilder = ks.newKieBuilder(kfs).buildAll();
\r
166 judgeRuleContent(locale, kieBuilder, true);
\r
168 InternalKieModule internalKieModule = (InternalKieModule)kieBuilder.getKieModule();;
\r
169 String packageName = internalKieModule.getKnowledgePackagesForKieBase(K_BASE).iterator().next().getName();
\r
171 kieContainer.updateToVersion(internalKieModule.getReleaseId());
\r
172 } catch (Exception e) {
\r
173 throw new CorrelationException("Failed to deploy the rule.", e);
\r
175 packageNames.add(packageName);
\r
176 kieSession.fireAllRules();
\r
177 return packageName;
\r
180 public synchronized void undeployRule(String packageName, Locale locale)
\r
181 throws CorrelationException {
\r
182 KiePackage kiePackage = kieBase.getKiePackage(packageName);
\r
183 if (null == kiePackage) {
\r
184 throw new CorrelationException("The rule " + packageName + " does not exist!");
\r
187 kieBase.removeKiePackage(kiePackage.getName());
\r
188 } catch (Exception e) {
\r
189 throw new CorrelationException("Failed to delete the rule: " + packageName, e);
\r
191 packageNames.remove(kiePackage.getName());
\r
194 public void compileRule(String content, Locale locale)
\r
195 throws CorrelationException {
\r
196 StringReader reader = new StringReader(content);
\r
198 kfs.write(RULES_FILE_NAME,
\r
199 this.resources.newReaderResource(reader, UTF_8).setResourceType(ResourceType.DRL));
\r
201 kieBuilder = ks.newKieBuilder(kfs).buildAll();
\r
203 judgeRuleContent(locale, kieBuilder, false);
\r
206 private void judgeRuleContent(Locale locale, KieBuilder kbuilder, boolean judgePackageName)
\r
207 throws CorrelationException {
\r
208 if (kbuilder.getResults().hasMessages(Message.Level.ERROR)) {
\r
209 String errorMsg = "There are errors in the rule: " + kbuilder.getResults()
\r
210 .getMessages(Level.ERROR).toString();
\r
211 log.error(errorMsg);
\r
212 throw new CorrelationException(errorMsg);
\r
214 InternalKieModule internalKieModule = null;
\r
216 internalKieModule = (InternalKieModule) kbuilder.getKieModule();
\r
217 } catch (Exception e) {
\r
218 throw new CorrelationException("There are errors in the rule!" + e.getMessage(), e);
\r
220 if (internalKieModule == null) {
\r
221 throw new CorrelationException("There are errors in the rule!");
\r
223 String packageName = internalKieModule.getKnowledgePackagesForKieBase(K_BASE).iterator().next().getName();
\r
225 if (queryAllPackage().contains(packageName) && judgePackageName) {
\r
226 throw new CorrelationException("The rule " + packageName + " already exists in the drools engine.");
\r
// Feeds an alarm into the KIE session and fires all rules.
// When the session already holds a fact handle for the alarm, the root flag of the stored
// VesAlarm is copied onto the incoming alarm and the stored fact is deleted.
// An alarm whose getAlarmIsCleared() equals 1 is also removed through alarmInfoDao.
// NOTE(review): the closing braces/else keywords between these statements are not visible in
// this excerpt, so it cannot be told from here whether the cleared-alarm check is nested in the
// fact-handle branch or whether insert() runs unconditionally or only in an else branch - confirm.
230 public void putRaisedIntoStream(VesAlarm alarm) {
\r
231 FactHandle factHandle = this.kieSession.getFactHandle(alarm);
\r
232 if (factHandle != null) {
\r
233 Object obj = this.kieSession.getObject(factHandle);
\r
234 if (obj != null && obj instanceof VesAlarm) {
\r
235 alarm.setRootFlag(((VesAlarm) obj).getRootFlag());
\r
237 this.kieSession.delete(factHandle);
\r
239 if (alarm.getAlarmIsCleared() == 1) {
\r
240 alarmInfoDao.deleteClearedAlarm(convertVesAlarm2AlarmInfo(alarm));
\r
243 this.kieSession.insert(alarm);
\r
246 this.kieSession.fireAllRules();
\r
250 public List<String> queryAllPackage() {
\r
251 List<KiePackage> kiePackages = (List<KiePackage>)kieBase.getKiePackages();
\r
252 List<String> list = new ArrayList<>();
\r
253 for(KiePackage kiePackage : kiePackages) {
\r
254 list.add(kiePackage.getName());
\r
// Builds a kmodule definition with one KieBase model named K_BASE (package "rules",
// EQUALITY equals behavior, STREAM event processing) and one default, stateful session
// model named "KSession" using the "realtime" clock, then writes the definition as
// kmodule XML into a new KieFileSystem.
// NOTE(review): one chained call on the KieBase model (between addPackage and
// setEqualsBehavior) and the method's return statement are not visible in this excerpt -
// confirm against the full file.
259 private KieFileSystem createKieFileSystemWithKProject(KieServices ks) {
\r
260 KieModuleModel kieModuleModel = ks.newKieModuleModel();
\r
261 KieBaseModel kieBaseModel = kieModuleModel.newKieBaseModel(K_BASE)
\r
262 .addPackage("rules")
\r
264 .setEqualsBehavior(EqualityBehaviorOption.EQUALITY)
\r
265 .setEventProcessingMode(EventProcessingOption.STREAM);
\r
266 KieSessionModel kieSessionModel = kieBaseModel.newKieSessionModel("KSession")
\r
267 .setDefault( true )
\r
268 .setType( KieSessionModel.KieSessionType.STATEFUL )
\r
269 .setClockType( ClockTypeOption.get("realtime") );
\r
270 KieFileSystem kfs = ks.newKieFileSystem();
\r
271 kfs.writeKModuleXML(kieModuleModel.toXML());
\r
275 private void avoidDeployBug() {
\r
276 String tmp = Math.random() + "";
\r
277 String rule = "package justInOrderToAvoidDeployBug" + tmp.substring(2);
\r
278 kfs.write(RULES_FILE_NAME, rule);
\r
279 kieBuilder = ks.newKieBuilder(kfs).buildAll();
\r
280 InternalKieModule internalKieModule = (InternalKieModule)kieBuilder.getKieModule();
\r
281 String packageName = internalKieModule.getKnowledgePackagesForKieBase(K_BASE).iterator().next().getName();
\r
282 kieRepository.addKieModule(internalKieModule);
\r
283 kieContainer.updateToVersion(internalKieModule.getReleaseId());
\r
285 KiePackage kiePackage = kieBase.getKiePackage(packageName);
\r
286 kieBase.removeKiePackage(kiePackage.getName());
\r
289 public void alarmSynchronization() throws AlarmInfoException {
\r
290 alarmInfoDao.queryAllAlarm().forEach(alarmInfo -> alarmInfoDao.deleteClearedAlarm(alarmInfo));
\r
291 alarmInfoDao.queryAllAlarm().forEach(alarmInfo -> putRaisedIntoStream(convertAlarmInfo2VesAlarm(alarmInfo)));
\r
294 private VesAlarm convertAlarmInfo2VesAlarm(AlarmInfo alarmInfo) {
\r
295 VesAlarm vesAlarm = new VesAlarm();
\r
296 vesAlarm.setEventId(alarmInfo.getEventId());
\r
297 vesAlarm.setEventName(alarmInfo.getEventName());
\r
298 vesAlarm.setStartEpochMicrosec(alarmInfo.getStartEpochMicroSec());
\r
299 vesAlarm.setSourceId(alarmInfo.getSourceId());
\r
300 vesAlarm.setSourceName(alarmInfo.getSourceName());
\r
301 vesAlarm.setRootFlag(alarmInfo.getRootFlag());
\r
302 vesAlarm.setAlarmIsCleared(alarmInfo.getAlarmIsCleared());
\r
303 vesAlarm.setLastEpochMicrosec(alarmInfo.getLastEpochMicroSec());
\r
307 private AlarmInfo convertVesAlarm2AlarmInfo(VesAlarm vesAlarm){
\r
308 AlarmInfo alarmInfo = new AlarmInfo();
\r
309 alarmInfo.setEventId(vesAlarm.getEventId());
\r
310 alarmInfo.setEventName(vesAlarm.getEventName());
\r
311 alarmInfo.setStartEpochMicroSec(vesAlarm.getStartEpochMicrosec());
\r
312 alarmInfo.setLastEpochMicroSec(vesAlarm.getLastEpochMicrosec());
\r
313 alarmInfo.setSourceId(vesAlarm.getSourceId());
\r
314 alarmInfo.setSourceName(vesAlarm.getSourceName());
\r
315 alarmInfo.setAlarmIsCleared(vesAlarm.getAlarmIsCleared());
\r
316 alarmInfo.setRootFlag(vesAlarm.getRootFlag());
\r