2 * Copyright 2017 ZTE Corporation.
\r
4 * Licensed under the Apache License, Version 2.0 (the "License");
\r
5 * you may not use this file except in compliance with the License.
\r
6 * You may obtain a copy of the License at
\r
8 * http://www.apache.org/licenses/LICENSE-2.0
\r
10 * Unless required by applicable law or agreed to in writing, software
\r
11 * distributed under the License is distributed on an "AS IS" BASIS,
\r
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
\r
13 * See the License for the specific language governing permissions and
\r
14 * limitations under the License.
\r
16 package org.onap.holmes.engine.manager;
\r
17 import java.io.StringReader;
\r
18 import java.util.ArrayList;
\r
19 import java.util.HashSet;
\r
20 import java.util.List;
\r
21 import java.util.Locale;
\r
22 import java.util.Set;
\r
23 import javax.annotation.PostConstruct;
\r
24 import javax.inject.Inject;
\r
25 import lombok.extern.slf4j.Slf4j;
\r
26 import org.drools.compiler.kie.builder.impl.InternalKieModule;
\r
27 import org.jvnet.hk2.annotations.Service;
\r
29 import org.kie.api.KieBase;
\r
30 import org.kie.api.KieServices;
\r
31 import org.kie.api.builder.KieBuilder;
\r
32 import org.kie.api.builder.KieFileSystem;
\r
33 import org.kie.api.builder.KieRepository;
\r
34 import org.kie.api.builder.Message;
\r
35 import org.kie.api.builder.Message.Level;
\r
36 import org.kie.api.builder.model.KieBaseModel;
\r
37 import org.kie.api.builder.model.KieModuleModel;
\r
38 import org.kie.api.builder.model.KieSessionModel;
\r
39 import org.kie.api.conf.EqualityBehaviorOption;
\r
40 import org.kie.api.conf.EventProcessingOption;
\r
41 import org.kie.api.definition.KiePackage;
\r
42 import org.kie.api.io.KieResources;
\r
43 import org.kie.api.io.ResourceType;
\r
44 import org.kie.api.runtime.KieContainer;
\r
45 import org.kie.api.runtime.KieSession;
\r
46 import org.kie.api.runtime.conf.ClockTypeOption;
\r
47 import org.kie.api.runtime.rule.FactHandle;
\r
49 import org.onap.holmes.common.api.entity.AlarmInfo;
\r
51 import org.onap.holmes.common.api.stat.VesAlarm;
\r
52 import org.onap.holmes.common.dmaap.DmaapService;
\r
53 import org.onap.holmes.common.exception.AlarmInfoException;
\r
54 import org.onap.holmes.common.utils.DbDaoUtil;
\r
55 import org.onap.holmes.engine.db.AlarmInfoDao;
\r
56 import org.onap.holmes.engine.request.DeployRuleRequest;
\r
57 import org.onap.holmes.common.api.entity.CorrelationRule;
\r
58 import org.onap.holmes.common.exception.CorrelationException;
\r
59 import org.onap.holmes.common.utils.ExceptionUtil;
\r
60 import org.onap.holmes.engine.wrapper.RuleMgtWrapper;
\r
64 public class DroolsEngine {
\r
66 private final static int ENABLE = 1;
\r
67 private final Set<String> packageNames = new HashSet<String>();
\r
69 private RuleMgtWrapper ruleMgtWrapper;
\r
72 private KieBase kieBase;
\r
73 private KieSession kieSession;
\r
74 private KieContainer kieContainer;
\r
75 private KieFileSystem kfs;
\r
76 private KieServices ks;
\r
77 private KieBuilder kieBuilder;
\r
78 private KieResources resources;
\r
79 private KieRepository kieRepository;
\r
81 private AlarmInfoDao alarmInfoDao;
\r
83 private DbDaoUtil daoUtil;
\r
87 private void init() {
\r
88 alarmInfoDao = daoUtil.getJdbiDaoByOnDemand(AlarmInfoDao.class);
\r
92 } catch (Exception e) {
\r
93 log.error("Failed to start the service: " + e.getMessage(), e);
\r
94 throw ExceptionUtil.buildExceptionResponse("Failed to start the drools engine!");
\r
98 private void start() throws AlarmInfoException {
\r
99 log.info("Drools Engine Initialize Beginning...");
\r
101 initEngineParameter();
\r
102 alarmSynchronization();
\r
103 // initDeployRule();
\r
105 log.info("Alarm synchronization Successfully.");
\r
108 public void stop() {
\r
109 this.kieSession.dispose();
\r
112 public void initEngineParameter() {
\r
113 this.ks = KieServices.Factory.get();
\r
114 this.resources = ks.getResources();
\r
115 this.kieRepository = ks.getRepository();
\r
116 this.kfs = createKieFileSystemWithKProject(ks);
\r
118 this.kieBuilder = ks.newKieBuilder(kfs).buildAll();
\r
119 this.kieContainer = ks.newKieContainer(kieRepository.getDefaultReleaseId());
\r
121 this.kieBase = kieContainer.getKieBase();
\r
122 this.kieSession = kieContainer.newKieSession();
\r
125 private void initDeployRule() throws CorrelationException {
\r
126 List<CorrelationRule> rules = ruleMgtWrapper.queryRuleByEnable(ENABLE);
\r
128 if (rules.isEmpty()) {
\r
131 for (CorrelationRule rule : rules) {
\r
132 if (rule.getContent() != null) {
\r
133 deployRuleFromDB(rule.getContent());
\r
134 DmaapService.loopControlNames.put(rule.getPackageName(), rule.getClosedControlLoopName());
\r
139 private void deployRuleFromDB(String ruleContent) throws CorrelationException {
\r
141 StringReader reader = new StringReader(ruleContent);
\r
142 kfs.write("src/main/resources/rules/rule.drl",
\r
143 this.resources.newReaderResource(reader,"UTF-8").setResourceType(ResourceType.DRL));
\r
144 kieBuilder = ks.newKieBuilder(kfs).buildAll();
\r
146 InternalKieModule internalKieModule = (InternalKieModule)kieBuilder.getKieModule();
\r
147 kieContainer.updateToVersion(internalKieModule.getReleaseId());
\r
148 } catch (Exception e) {
\r
149 throw new CorrelationException(e.getMessage(), e);
\r
151 kieSession.fireAllRules();
\r
154 public synchronized String deployRule(DeployRuleRequest rule, Locale locale)
\r
155 throws CorrelationException {
\r
157 StringReader reader = new StringReader(rule.getContent());
\r
158 kfs.write("src/main/resources/rules/rule.drl",
\r
159 this.resources.newReaderResource(reader,"UTF-8").setResourceType(ResourceType.DRL));
\r
160 kieBuilder = ks.newKieBuilder(kfs).buildAll();
\r
162 judgeRuleContent(locale, kieBuilder, true);
\r
164 InternalKieModule internalKieModule = (InternalKieModule)kieBuilder.getKieModule();;
\r
165 String packageName = internalKieModule.getKnowledgePackagesForKieBase("KBase").iterator().next().getName();
\r
167 kieContainer.updateToVersion(internalKieModule.getReleaseId());
\r
168 } catch (Exception e) {
\r
169 throw new CorrelationException("Failed to deploy the rule.", e);
\r
171 packageNames.add(packageName);
\r
172 kieSession.fireAllRules();
\r
173 return packageName;
\r
176 public synchronized void undeployRule(String packageName, Locale locale)
\r
177 throws CorrelationException {
\r
178 KiePackage kiePackage = kieBase.getKiePackage(packageName);
\r
179 if (null == kiePackage) {
\r
180 throw new CorrelationException("The rule " + packageName + " does not exist!");
\r
183 kieBase.removeKiePackage(kiePackage.getName());
\r
184 } catch (Exception e) {
\r
185 throw new CorrelationException("Failed to delete the rule: " + packageName, e);
\r
187 packageNames.remove(kiePackage.getName());
\r
190 public void compileRule(String content, Locale locale)
\r
191 throws CorrelationException {
\r
192 StringReader reader = new StringReader(content);
\r
194 kfs.write("src/main/resources/rules/rule.drl",
\r
195 this.resources.newReaderResource(reader,"UTF-8").setResourceType(ResourceType.DRL));
\r
197 kieBuilder = ks.newKieBuilder(kfs).buildAll();
\r
199 judgeRuleContent(locale, kieBuilder, false);
\r
202 private void judgeRuleContent(Locale locale, KieBuilder kbuilder, boolean judgePackageName)
\r
203 throws CorrelationException {
\r
204 if (kbuilder.getResults().hasMessages(Message.Level.ERROR)) {
\r
205 String errorMsg = "There are errors in the rule: " + kbuilder.getResults()
\r
206 .getMessages(Level.ERROR).toString();
\r
207 log.error(errorMsg);
\r
208 throw new CorrelationException(errorMsg);
\r
210 InternalKieModule internalKieModule = null;
\r
212 internalKieModule = (InternalKieModule) kbuilder.getKieModule();
\r
213 } catch (Exception e) {
\r
214 throw new CorrelationException("There are errors in the rule!" + e.getMessage(), e);
\r
216 if (internalKieModule == null) {
\r
217 throw new CorrelationException("There are errors in the rule!");
\r
219 String packageName = internalKieModule.getKnowledgePackagesForKieBase("KBase").iterator().next().getName();
\r
221 if (queryAllPackage().contains(packageName) && judgePackageName) {
\r
222 throw new CorrelationException("The rule " + packageName + " already exists in the drools engine.");
\r
226 public void putRaisedIntoStream(VesAlarm alarm) {
\r
227 FactHandle factHandle = this.kieSession.getFactHandle(alarm);
\r
228 if (factHandle != null) {
\r
229 Object obj = this.kieSession.getObject(factHandle);
\r
230 if (obj != null && obj instanceof VesAlarm) {
\r
231 alarm.setRootFlag(((VesAlarm) obj).getRootFlag());
\r
233 this.kieSession.delete(factHandle);
\r
235 if (alarm.getAlarmIsCleared() == 1) {
\r
236 alarmInfoDao.deleteClearedAlarm(convertVesAlarm2AlarmInfo(alarm));
\r
239 this.kieSession.insert(alarm);
\r
242 this.kieSession.fireAllRules();
\r
246 public List<String> queryAllPackage() {
\r
247 List<KiePackage> kiePackages = (List<KiePackage>)kieBase.getKiePackages();
\r
248 List<String> list = new ArrayList<>();
\r
249 for(KiePackage kiePackage : kiePackages) {
\r
250 list.add(kiePackage.getName());
\r
255 private KieFileSystem createKieFileSystemWithKProject(KieServices ks) {
\r
256 KieModuleModel kieModuleModel = ks.newKieModuleModel();
\r
257 KieBaseModel kieBaseModel = kieModuleModel.newKieBaseModel("KBase")
\r
258 .addPackage("rules")
\r
260 .setEqualsBehavior(EqualityBehaviorOption.EQUALITY)
\r
261 .setEventProcessingMode(EventProcessingOption.STREAM);
\r
262 KieSessionModel kieSessionModel = kieBaseModel.newKieSessionModel("KSession")
\r
263 .setDefault( true )
\r
264 .setType( KieSessionModel.KieSessionType.STATEFUL )
\r
265 .setClockType( ClockTypeOption.get("realtime") );
\r
266 KieFileSystem kfs = ks.newKieFileSystem();
\r
267 kfs.writeKModuleXML(kieModuleModel.toXML());
\r
271 private void avoidDeployBug() {
\r
272 String tmp = Math.random() + "";
\r
273 String rule = "package justInOrderToAvoidDeployBug" + tmp.substring(2);
\r
274 kfs.write("src/main/resources/rules/rule.drl", rule);
\r
275 kieBuilder = ks.newKieBuilder(kfs).buildAll();
\r
276 InternalKieModule internalKieModule = (InternalKieModule)kieBuilder.getKieModule();
\r
277 String packageName = internalKieModule.getKnowledgePackagesForKieBase("KBase").iterator().next().getName();
\r
278 kieRepository.addKieModule(internalKieModule);
\r
279 kieContainer.updateToVersion(internalKieModule.getReleaseId());
\r
281 KiePackage kiePackage = kieBase.getKiePackage(packageName);
\r
282 kieBase.removeKiePackage(kiePackage.getName());
\r
285 public void alarmSynchronization() throws AlarmInfoException {
\r
286 alarmInfoDao.queryAllAlarm().forEach(alarmInfo -> alarmInfoDao.deleteClearedAlarm(alarmInfo));
\r
287 alarmInfoDao.queryAllAlarm().forEach(alarmInfo -> putRaisedIntoStream(convertAlarmInfo2VesAlarm(alarmInfo)));
\r
290 private VesAlarm convertAlarmInfo2VesAlarm(AlarmInfo alarmInfo) {
\r
291 VesAlarm vesAlarm = new VesAlarm();
\r
292 vesAlarm.setEventId(alarmInfo.getEventId());
\r
293 vesAlarm.setEventName(alarmInfo.getEventName());
\r
294 vesAlarm.setStartEpochMicrosec(alarmInfo.getStartEpochMicroSec());
\r
295 vesAlarm.setSourceId(alarmInfo.getSourceId());
\r
296 vesAlarm.setSourceName(alarmInfo.getSourceName());
\r
297 vesAlarm.setRootFlag(alarmInfo.getRootFlag());
\r
298 vesAlarm.setAlarmIsCleared(alarmInfo.getAlarmIsCleared());
\r
299 vesAlarm.setLastEpochMicrosec(alarmInfo.getLastEpochMicroSec());
\r
303 private AlarmInfo convertVesAlarm2AlarmInfo(VesAlarm vesAlarm){
\r
304 AlarmInfo alarmInfo = new AlarmInfo();
\r
305 alarmInfo.setEventId(vesAlarm.getEventId());
\r
306 alarmInfo.setEventName(vesAlarm.getEventName());
\r
307 alarmInfo.setStartEpochMicroSec(vesAlarm.getStartEpochMicrosec());
\r
308 alarmInfo.setLastEpochMicroSec(vesAlarm.getLastEpochMicrosec());
\r
309 alarmInfo.setSourceId(vesAlarm.getSourceId());
\r
310 alarmInfo.setSourceName(vesAlarm.getSourceName());
\r
311 alarmInfo.setAlarmIsCleared(vesAlarm.getAlarmIsCleared());
\r
312 alarmInfo.setRootFlag(vesAlarm.getRootFlag());
\r