2 * Copyright 2017 ZTE Corporation.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
16 package org.onap.holmes.engine.manager;
18 import lombok.extern.slf4j.Slf4j;
19 import org.drools.compiler.kie.builder.impl.InternalKieModule;
20 import org.drools.core.util.StringUtils;
21 import org.jvnet.hk2.annotations.Service;
22 import org.kie.api.KieServices;
23 import org.kie.api.builder.*;
24 import org.kie.api.builder.Message.Level;
25 import org.kie.api.builder.model.KieBaseModel;
26 import org.kie.api.builder.model.KieModuleModel;
27 import org.kie.api.builder.model.KieSessionModel;
28 import org.kie.api.conf.EqualityBehaviorOption;
29 import org.kie.api.io.Resource;
30 import org.kie.api.runtime.KieContainer;
31 import org.kie.api.runtime.KieSession;
32 import org.kie.api.runtime.rule.FactHandle;
33 import org.onap.holmes.common.api.entity.AlarmInfo;
34 import org.onap.holmes.common.api.entity.CorrelationRule;
35 import org.onap.holmes.common.api.stat.VesAlarm;
36 import org.onap.holmes.common.config.MicroServiceConfig;
37 import org.onap.holmes.common.dmaap.store.ClosedLoopControlNameCache;
38 import org.onap.holmes.common.exception.AlarmInfoException;
39 import org.onap.holmes.common.exception.CorrelationException;
40 import org.onap.holmes.common.utils.DbDaoUtil;
41 import org.onap.holmes.common.utils.ExceptionUtil;
42 import org.onap.holmes.engine.db.AlarmInfoDao;
43 import org.onap.holmes.engine.request.DeployRuleRequest;
44 import org.onap.holmes.engine.wrapper.RuleMgtWrapper;
46 import javax.annotation.PostConstruct;
47 import javax.inject.Inject;
48 import java.util.ArrayList;
49 import java.util.List;
51 import java.util.concurrent.ConcurrentHashMap;
52 import java.util.stream.Collectors;
56 public class DroolsEngine {
58 private final static int ENABLE = 1;
59 private final Map<String, String> deployed = new ConcurrentHashMap<>();
60 private RuleMgtWrapper ruleMgtWrapper;
61 private DbDaoUtil daoUtil;
62 private ClosedLoopControlNameCache closedLoopControlNameCache;
63 private AlarmInfoDao alarmInfoDao;
64 private KieServices ks = KieServices.Factory.get();
65 private ReleaseId releaseId = ks.newReleaseId("org.onap.holmes", "rules", "1.0.0-SNAPSHOT");
66 private ReleaseId compilationRelease = ks.newReleaseId("org.onap.holmes", "compilation", "1.0.0-SNAPSHOT");
67 private KieContainer container;
68 private KieSession session;
69 private String instanceIp;
72 public void setRuleMgtWrapper(RuleMgtWrapper ruleMgtWrapper) {
73 this.ruleMgtWrapper = ruleMgtWrapper;
77 public void setDaoUtil(DbDaoUtil daoUtil) {
78 this.daoUtil = daoUtil;
82 public void setClosedLoopControlNameCache(ClosedLoopControlNameCache closedLoopControlNameCache) {
83 this.closedLoopControlNameCache = closedLoopControlNameCache;
88 alarmInfoDao = daoUtil.getJdbiDaoByOnDemand(AlarmInfoDao.class);
89 instanceIp = MicroServiceConfig.getMicroServiceIpAndPort()[0];
91 log.info("Drools engine initializing...");
93 log.info("Drools engine initialized.");
95 log.info("Start deploy existing rules...");
97 log.info("All rules were deployed.");
99 log.info("Synchronizing alarms...");
101 log.info("Alarm synchronization succeeded.");
102 } catch (Exception e) {
103 log.error("Failed to startup the engine of Holmes: " + e.getMessage(), e);
104 throw ExceptionUtil.buildExceptionResponse("Failed to startup Drools!");
112 public void initEngine() {
115 String drl = "package holmes;";
116 deployed.put(getPackageName(drl), drl);
117 km = createAndDeployJar(ks, releaseId, new ArrayList<>(deployed.values()));
118 } catch (Exception e) {
119 log.error("Failed to initialize the engine service module.", e);
122 container = ks.newKieContainer(km.getReleaseId());
124 session = container.newKieSession();
128 private void initRules() throws CorrelationException {
129 List<CorrelationRule> rules = ruleMgtWrapper.queryRuleByEnable(ENABLE)
131 .filter(r -> r.getEngineInstance().equals(instanceIp))
132 .collect(Collectors.toList());
134 if (rules.isEmpty()) {
138 for (CorrelationRule rule : rules) {
139 if (!StringUtils.isEmpty(rule.getContent())) {
140 deployRule(rule.getContent());
141 closedLoopControlNameCache.put(rule.getPackageName(), rule.getClosedControlLoopName());
145 session.fireAllRules();
148 public void syncAlarms() throws AlarmInfoException {
149 alarmInfoDao.queryAllAlarm().forEach(alarmInfo -> putRaisedIntoStream(convertAlarmInfo2VesAlarm(alarmInfo)));
152 public String deployRule(DeployRuleRequest rule) throws CorrelationException {
153 return deployRule(rule.getContent());
156 private synchronized String deployRule(String rule) throws CorrelationException {
157 final String packageName = getPackageName(rule);
159 if (StringUtils.isEmpty(packageName)) {
160 throw new CorrelationException("The package name can not be empty.");
163 if (deployed.containsKey(packageName)) {
164 throw new CorrelationException("A rule with the same package name already exists in the system.");
167 if (!StringUtils.isEmpty(rule)) {
168 deployed.put(packageName, rule);
171 } catch (CorrelationException e) {
172 deployed.remove(packageName);
175 session.fireAllRules();
181 public synchronized void undeployRule(String packageName) throws CorrelationException {
183 if (StringUtils.isEmpty(packageName)) {
184 throw new CorrelationException("The package name should not be null.");
187 if (!deployed.containsKey(packageName)) {
188 throw new CorrelationException("The rule " + packageName + " does not exist!");
191 String removed = deployed.remove(packageName);
194 } catch (Exception e) {
195 deployed.put(packageName, removed);
196 throw new CorrelationException("Failed to delete the rule: " + packageName, e);
200 private void refreshInMemRules() throws CorrelationException {
201 KieModule km = createAndDeployJar(ks, releaseId, new ArrayList<>(deployed.values()));
202 container.updateToVersion(km.getReleaseId());
205 public void compileRule(String content)
206 throws CorrelationException {
208 KieFileSystem kfs = ks.newKieFileSystem().generateAndWritePomXML(compilationRelease);
209 kfs.write("src/main/resources/rules/rule.drl", content);
210 KieBuilder builder = ks.newKieBuilder(kfs).buildAll();
211 if (builder.getResults().hasMessages(Message.Level.ERROR)) {
212 String errorMsg = "There are errors in the rule: " + builder.getResults()
213 .getMessages(Level.ERROR).toString();
214 log.info("Compilation failure: " + errorMsg);
215 throw new CorrelationException(errorMsg);
218 if (deployed.containsKey(getPackageName(content))) {
219 throw new CorrelationException("There's no compilation error. But a rule with the same package name already " +
220 "exists in the engine, which may cause a deployment failure.");
223 ks.getRepository().removeKieModule(compilationRelease);
226 public void putRaisedIntoStream(VesAlarm alarm) {
227 FactHandle factHandle = this.session.getFactHandle(alarm);
228 if (factHandle != null) {
229 Object obj = this.session.getObject(factHandle);
230 if (obj != null && obj instanceof VesAlarm) {
231 alarm.setRootFlag(((VesAlarm) obj).getRootFlag());
233 this.session.delete(factHandle);
236 this.session.insert(alarm);
238 this.session.fireAllRules();
241 public List<String> queryPackagesFromEngine() {
242 return container.getKieBase().getKiePackages().stream()
243 .filter(pkg -> pkg.getRules().size() != 0)
244 .map(pkg -> pkg.getName())
245 .collect(Collectors.toList());
248 private VesAlarm convertAlarmInfo2VesAlarm(AlarmInfo alarmInfo) {
249 VesAlarm vesAlarm = new VesAlarm();
250 vesAlarm.setEventId(alarmInfo.getEventId());
251 vesAlarm.setEventName(alarmInfo.getEventName());
252 vesAlarm.setStartEpochMicrosec(alarmInfo.getStartEpochMicroSec());
253 vesAlarm.setSourceId(alarmInfo.getSourceId());
254 vesAlarm.setSourceName(alarmInfo.getSourceName());
255 vesAlarm.setRootFlag(alarmInfo.getRootFlag());
256 vesAlarm.setAlarmIsCleared(alarmInfo.getAlarmIsCleared());
257 vesAlarm.setLastEpochMicrosec(alarmInfo.getLastEpochMicroSec());
261 private AlarmInfo convertVesAlarm2AlarmInfo(VesAlarm vesAlarm) {
262 AlarmInfo alarmInfo = new AlarmInfo();
263 alarmInfo.setEventId(vesAlarm.getEventId());
264 alarmInfo.setEventName(vesAlarm.getEventName());
265 alarmInfo.setStartEpochMicroSec(vesAlarm.getStartEpochMicrosec());
266 alarmInfo.setLastEpochMicroSec(vesAlarm.getLastEpochMicrosec());
267 alarmInfo.setSourceId(vesAlarm.getSourceId());
268 alarmInfo.setSourceName(vesAlarm.getSourceName());
269 alarmInfo.setAlarmIsCleared(vesAlarm.getAlarmIsCleared());
270 alarmInfo.setRootFlag(vesAlarm.getRootFlag());
275 private String getPackageName(String contents) {
276 String ret = contents.trim();
277 StringBuilder stringBuilder = new StringBuilder();
278 if (ret.startsWith("package")) {
279 ret = ret.substring(7).trim();
280 for (int i = 0; i < ret.length(); i++) {
281 char tmp = ret.charAt(i);
282 if (tmp == ';' || tmp == ' ' || tmp == '\n') {
285 stringBuilder.append(tmp);
288 return stringBuilder.toString();
291 private KieModule createAndDeployJar(KieServices ks, ReleaseId releaseId, List<String> drls) throws CorrelationException {
292 byte[] jar = createJar(ks, releaseId, drls);
293 KieModule km = deployJarIntoRepository(ks, jar);
297 private byte[] createJar(KieServices ks, ReleaseId releaseId, List<String> drls) throws CorrelationException {
298 KieModuleModel kieModuleModel = ks.newKieModuleModel();
299 KieBaseModel kieBaseModel = kieModuleModel.newKieBaseModel("KBase")
301 .setEqualsBehavior(EqualityBehaviorOption.EQUALITY);
302 kieBaseModel.newKieSessionModel("KSession")
304 .setType(KieSessionModel.KieSessionType.STATEFUL);
305 KieFileSystem kfs = ks.newKieFileSystem().writeKModuleXML(kieModuleModel.toXML()).generateAndWritePomXML(releaseId);
308 for (String drl : drls) {
309 if (!StringUtils.isEmpty(drl)) {
310 kfs.write("src/main/resources/" + getPackageName(drl) + ".drl", drl);
313 KieBuilder kb = ks.newKieBuilder(kfs).buildAll();
314 if (kb.getResults().hasMessages(Message.Level.ERROR)) {
315 StringBuilder sb = new StringBuilder();
316 for (Message msg : kb.getResults().getMessages()) {
317 sb.append(String.format("[%s]Line: %d, Col: %d\t%s\n", msg.getLevel().toString(), msg.getLine(),
318 msg.getColumn(), msg.getText()));
320 throw new CorrelationException("Failed to compile JAR. Details: \n" + sb.toString());
323 InternalKieModule kieModule = (InternalKieModule) ks.getRepository()
324 .getKieModule(releaseId);
326 return kieModule.getBytes();
329 private KieModule deployJarIntoRepository(KieServices ks, byte[] jar) {
330 Resource jarRes = ks.getResources().newByteArrayResource(jar);
331 return ks.getRepository().addKieModule(jarRes);