*/
package org.onap.holmes.engine.manager;
-import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.stream.Collectors;
-import javax.annotation.PostConstruct;
-import javax.inject.Inject;
-
import lombok.extern.slf4j.Slf4j;
import org.drools.compiler.kie.builder.impl.InternalKieModule;
import org.drools.core.util.StringUtils;
import org.jvnet.hk2.annotations.Service;
-
import org.kie.api.KieServices;
import org.kie.api.builder.*;
import org.kie.api.builder.Message.Level;
+import org.kie.api.builder.model.KieBaseModel;
+import org.kie.api.builder.model.KieModuleModel;
+import org.kie.api.builder.model.KieSessionModel;
+import org.kie.api.conf.EqualityBehaviorOption;
import org.kie.api.io.Resource;
import org.kie.api.runtime.KieContainer;
import org.kie.api.runtime.KieSession;
import org.kie.api.runtime.rule.FactHandle;
-
import org.onap.holmes.common.api.entity.AlarmInfo;
-
+import org.onap.holmes.common.api.entity.CorrelationRule;
import org.onap.holmes.common.api.stat.VesAlarm;
-import org.onap.holmes.common.dmaap.DmaapService;
+import org.onap.holmes.common.config.MicroServiceConfig;
+import org.onap.holmes.common.dmaap.store.ClosedLoopControlNameCache;
import org.onap.holmes.common.exception.AlarmInfoException;
+import org.onap.holmes.common.exception.CorrelationException;
import org.onap.holmes.common.utils.DbDaoUtil;
+import org.onap.holmes.common.utils.ExceptionUtil;
import org.onap.holmes.engine.db.AlarmInfoDao;
import org.onap.holmes.engine.request.DeployRuleRequest;
-import org.onap.holmes.common.api.entity.CorrelationRule;
-import org.onap.holmes.common.exception.CorrelationException;
-import org.onap.holmes.common.utils.ExceptionUtil;
import org.onap.holmes.engine.wrapper.RuleMgtWrapper;
+import javax.annotation.PostConstruct;
+import javax.inject.Inject;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
@Slf4j
@Service
public class DroolsEngine {
- @Inject
+ private final static int ENABLE = 1;
+ private final Map<String, String> deployed = new ConcurrentHashMap<>();
private RuleMgtWrapper ruleMgtWrapper;
- @Inject
private DbDaoUtil daoUtil;
-
- private final static int ENABLE = 1;
+ private ClosedLoopControlNameCache closedLoopControlNameCache;
private AlarmInfoDao alarmInfoDao;
- private final Map<String, String> deployed = new ConcurrentHashMap<>();
private KieServices ks = KieServices.Factory.get();
private ReleaseId releaseId = ks.newReleaseId("org.onap.holmes", "rules", "1.0.0-SNAPSHOT");
private ReleaseId compilationRelease = ks.newReleaseId("org.onap.holmes", "compilation", "1.0.0-SNAPSHOT");
private KieContainer container;
private KieSession session;
+ private String instanceIp;
+
+ @Inject
+ public void setRuleMgtWrapper(RuleMgtWrapper ruleMgtWrapper) {
+ this.ruleMgtWrapper = ruleMgtWrapper;
+ }
+
+ @Inject
+ public void setDaoUtil(DbDaoUtil daoUtil) {
+ this.daoUtil = daoUtil;
+ }
+
+ @Inject
+ public void setClosedLoopControlNameCache(ClosedLoopControlNameCache closedLoopControlNameCache) {
+ this.closedLoopControlNameCache = closedLoopControlNameCache;
+ }
@PostConstruct
private void init() {
alarmInfoDao = daoUtil.getJdbiDaoByOnDemand(AlarmInfoDao.class);
+ instanceIp = MicroServiceConfig.getMicroServiceIpAndPort()[0];
try {
log.info("Drools engine initializing...");
initEngine();
} catch (Exception e) {
log.error("Failed to initialize the engine service module.", e);
}
- if(null!=km) {
- container = ks.newKieContainer(km.getReleaseId());
+ if (null != km) {
+ container = ks.newKieContainer(km.getReleaseId());
}
session = container.newKieSession();
deployed.clear();
}
private void initRules() throws CorrelationException {
- List<CorrelationRule> rules = ruleMgtWrapper.queryRuleByEnable(ENABLE);
+ List<CorrelationRule> rules = ruleMgtWrapper.queryRuleByEnable(ENABLE)
+ .stream()
+ .filter(r -> r.getEngineInstance().equals(instanceIp))
+ .collect(Collectors.toList());
+
if (rules.isEmpty()) {
return;
}
for (CorrelationRule rule : rules) {
if (!StringUtils.isEmpty(rule.getContent())) {
deployRule(rule.getContent());
- DmaapService.loopControlNames.put(rule.getPackageName(), rule.getClosedControlLoopName());
+ closedLoopControlNameCache.put(rule.getPackageName(), rule.getClosedControlLoopName());
}
}
}
public void syncAlarms() throws AlarmInfoException {
- alarmInfoDao.queryAllAlarm().forEach(alarmInfo -> alarmInfoDao.deleteClearedAlarm(alarmInfo));
alarmInfoDao.queryAllAlarm().forEach(alarmInfo -> putRaisedIntoStream(convertAlarmInfo2VesAlarm(alarmInfo)));
}
alarm.setRootFlag(((VesAlarm) obj).getRootFlag());
}
this.session.delete(factHandle);
-
- if (alarm.getAlarmIsCleared() == 1) {
- alarmInfoDao.deleteClearedAlarm(convertVesAlarm2AlarmInfo(alarm));
- }
- } else {
- this.session.insert(alarm);
}
+ this.session.insert(alarm);
+
this.session.fireAllRules();
}
.collect(Collectors.toList());
}
-
-
private VesAlarm convertAlarmInfo2VesAlarm(AlarmInfo alarmInfo) {
VesAlarm vesAlarm = new VesAlarm();
vesAlarm.setEventId(alarmInfo.getEventId());
}
private byte[] createJar(KieServices ks, ReleaseId releaseId, List<String> drls) throws CorrelationException {
- KieFileSystem kfs = ks.newKieFileSystem().generateAndWritePomXML(releaseId);
+ KieModuleModel kieModuleModel = ks.newKieModuleModel();
+ KieBaseModel kieBaseModel = kieModuleModel.newKieBaseModel("KBase")
+ .setDefault(true)
+ .setEqualsBehavior(EqualityBehaviorOption.EQUALITY);
+ kieBaseModel.newKieSessionModel("KSession")
+ .setDefault(true)
+ .setType(KieSessionModel.KieSessionType.STATEFUL);
+ KieFileSystem kfs = ks.newKieFileSystem().writeKModuleXML(kieModuleModel.toXML()).generateAndWritePomXML(releaseId);
+
int i = 0;
for (String drl : drls) {
if (!StringUtils.isEmpty(drl)) {