X-Git-Url: https://gerrit.onap.org/r/gitweb?a=blobdiff_plain;f=engine-d%2Fsrc%2Fmain%2Fjava%2Forg%2Fonap%2Fholmes%2Fengine%2Fmanager%2FDroolsEngine.java;h=74724579c6617890ec492ee4b4e169a58cfb8a01;hb=163dd74935802f562ef7272cadf0f88b6a63960a;hp=5d1f4421ceb9a2938e17d070d969840351c8a525;hpb=8abc672e7368792cf8705e8f2ad48b28a9d4864d;p=holmes%2Fengine-management.git diff --git a/engine-d/src/main/java/org/onap/holmes/engine/manager/DroolsEngine.java b/engine-d/src/main/java/org/onap/holmes/engine/manager/DroolsEngine.java index 5d1f442..7472457 100644 --- a/engine-d/src/main/java/org/onap/holmes/engine/manager/DroolsEngine.java +++ b/engine-d/src/main/java/org/onap/holmes/engine/manager/DroolsEngine.java @@ -1,286 +1,308 @@ -/** - * Copyright 2017 ZTE Corporation. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onap.holmes.engine.manager; - - -import java.io.Serializable; -import java.io.StringReader; -import java.util.HashSet; -import java.util.List; -import java.util.Locale; -import java.util.Set; -import javax.annotation.PostConstruct; -import javax.inject.Inject; -import javax.jms.Connection; -import javax.jms.ConnectionFactory; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.Session; -import lombok.extern.slf4j.Slf4j; -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.command.ActiveMQObjectMessage; -import org.drools.KnowledgeBase; -import org.drools.KnowledgeBaseConfiguration; -import org.drools.KnowledgeBaseFactory; -import org.drools.builder.KnowledgeBuilder; -import org.drools.builder.KnowledgeBuilderFactory; -import org.drools.builder.ResourceType; -import org.drools.conf.EventProcessingOption; -import org.drools.definition.KnowledgePackage; -import org.drools.io.Resource; -import org.drools.io.ResourceFactory; -import org.drools.runtime.StatefulKnowledgeSession; -import org.drools.runtime.rule.FactHandle; -import org.glassfish.hk2.api.IterableProvider; -import org.jvnet.hk2.annotations.Service; -import org.onap.holmes.engine.request.DeployRuleRequest; -import org.onap.holmes.common.api.entity.CorrelationRule; -import org.onap.holmes.common.api.stat.Alarm; -import org.onap.holmes.common.config.MQConfig; -import org.onap.holmes.common.constant.AlarmConst; -import org.onap.holmes.common.exception.CorrelationException; -import org.onap.holmes.common.utils.ExceptionUtil; -import org.onap.holmes.engine.wrapper.RuleMgtWrapper; - -@Slf4j -@Service -public class DroolsEngine { - - private final static int ENABLE = 1; - private final Set packageNames = new HashSet(); - @Inject - private RuleMgtWrapper ruleMgtWrapper; - private KnowledgeBase kbase; - private KnowledgeBaseConfiguration kconf; - private StatefulKnowledgeSession ksession; - @Inject - private IterableProvider mqConfigProvider; - private ConnectionFactory connectionFactory; - - @PostConstruct - private void init() { - try { - // 1. start engine - start(); - // 2. start mq listener - registerAlarmTopicListener(); - } catch (Exception e) { - log.error("Failed to start the service: " + e.getMessage(), e); - throw ExceptionUtil.buildExceptionResponse("Failed to start the drools engine!"); - } - } - - private void registerAlarmTopicListener() { - String brokerURL = - "tcp://" + mqConfigProvider.get().brokerIp + ":" + mqConfigProvider.get().brokerPort; - connectionFactory = new ActiveMQConnectionFactory(mqConfigProvider.get().brokerUsername, - mqConfigProvider.get().brokerPassword, brokerURL); - - AlarmMqMessageListener listener = new AlarmMqMessageListener(); - listener.receive(); - } - - - private void start() throws CorrelationException { - log.info("Drools Engine Initialize Beginning..."); - - initEngineParameter(); - initDeployRule(); - - log.info("Business Rule Engine Initialize Successfully."); - } - - public void stop() { - this.ksession.dispose(); - } - - private void initEngineParameter() { - this.kconf = KnowledgeBaseFactory.newKnowledgeBaseConfiguration(); - - this.kconf.setOption(EventProcessingOption.STREAM); - - this.kconf.setProperty("drools.assertBehaviour", "equality"); - - this.kbase = KnowledgeBaseFactory.newKnowledgeBase("D-ENGINE", this.kconf); - - this.ksession = kbase.newStatefulKnowledgeSession(); - } - - private void initDeployRule() throws CorrelationException { - List rules = ruleMgtWrapper.queryRuleByEnable(ENABLE); - - if (rules.isEmpty()) { - return; - } - for (CorrelationRule rule : rules) { - if (rule.getContent() != null) { - deployRuleFromDB(rule.getContent()); - } - } - } - - private void deployRuleFromDB(String ruleContent) throws CorrelationException { - StringReader reader = new StringReader(ruleContent); - Resource res = ResourceFactory.newReaderResource(reader); - - KnowledgeBuilder kbuilder = KnowledgeBuilderFactory.newKnowledgeBuilder(); - - kbuilder.add(res, ResourceType.DRL); - - try { - - kbase.addKnowledgePackages(kbuilder.getKnowledgePackages()); - } catch (Exception e) { - throw new CorrelationException(e.getMessage(), e); - } - ksession.fireAllRules(); - } - - public synchronized String deployRule(DeployRuleRequest rule, Locale locale) - throws CorrelationException { - StringReader reader = new StringReader(rule.getContent()); - Resource res = ResourceFactory.newReaderResource(reader); - - KnowledgeBuilder kbuilder = KnowledgeBuilderFactory.newKnowledgeBuilder(); - - kbuilder.add(res, ResourceType.DRL); - - judgeRuleContent(locale, kbuilder, true); - - String packageName = kbuilder.getKnowledgePackages().iterator().next().getName(); - try { - packageNames.add(packageName); - kbase.addKnowledgePackages(kbuilder.getKnowledgePackages()); - } catch (Exception e) { - throw new CorrelationException("Failed to deploy the rule.", e); - } - - ksession.fireAllRules(); - return packageName; - } - - public synchronized void undeployRule(String packageName, Locale locale) - throws CorrelationException { - - KnowledgePackage pkg = kbase.getKnowledgePackage(packageName); - - if (null == pkg) { - throw new CorrelationException("The rule " + packageName + " does not exist!"); - } - - try { - kbase.removeKnowledgePackage(pkg.getName()); - } catch (Exception e) { - throw new CorrelationException("Failed to delete the rule: " + packageName, e); - } - packageNames.remove(pkg.getName()); - } - - public void compileRule(String content, Locale locale) - throws CorrelationException { - StringReader reader = new StringReader(content); - Resource res = ResourceFactory.newReaderResource(reader); - - KnowledgeBuilder kbuilder = KnowledgeBuilderFactory.newKnowledgeBuilder(); - - kbuilder.add(res, ResourceType.DRL); - - judgeRuleContent(locale, kbuilder, false); - } - - private void judgeRuleContent(Locale locale, KnowledgeBuilder kbuilder, boolean judgePackageName) - throws CorrelationException { - if (kbuilder.hasErrors()) { - String errorMsg = "There are errors in the rule: " + kbuilder.getErrors().toString(); - log.error(errorMsg); - throw new CorrelationException(errorMsg); - } - - String packageName = kbuilder.getKnowledgePackages().iterator().next().getName(); - - if (packageNames.contains(packageName) && judgePackageName) { - throw new CorrelationException("The rule " + packageName + " already exists in the drools engine."); - } - } - - public void putRaisedIntoStream(Alarm raiseAlarm) { - FactHandle factHandle = this.ksession.getFactHandle(raiseAlarm); - if (factHandle != null) { - this.ksession.retract(factHandle); - } - this.ksession.insert(raiseAlarm); - this.ksession.fireAllRules(); - } - - class AlarmMqMessageListener implements MessageListener { - - private Connection connection = null; - private Session session = null; - private Destination destination = null; - private MessageConsumer consumer = null; - - private void initialize() throws JMSException { - connection = connectionFactory.createConnection(); - session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - destination = session.createTopic(AlarmConst.MQ_TOPIC_NAME_ALARM); - consumer = session.createConsumer(destination); - connection.start(); - } - - public void receive() { - try { - initialize(); - consumer.setMessageListener(this); - } catch (JMSException e) { - log.error("Failed to connect to the MQ service : " + e.getMessage(), e); - try { - close(); - } catch (JMSException e1) { - log.error("Failed close connection " + e1.getMessage(), e1); - } - } - } - - public void onMessage(Message arg0) { - ActiveMQObjectMessage objectMessage = (ActiveMQObjectMessage) arg0; - try { - Serializable object = objectMessage.getObject(); - - if (object instanceof Alarm) { - Alarm alarm = (Alarm) object; - putRaisedIntoStream(alarm); - } - } catch (JMSException e) { - log.error("Failed get object : " + e.getMessage(), e); - } - } - - private void close() throws JMSException { - if (consumer != null) { - consumer.close(); - } - if (session != null) { - session.close(); - } - if (connection != null) { - connection.close(); - } - } - } -} +/** + * Copyright 2017 ZTE Corporation. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onap.holmes.engine.manager; + +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; +import javax.annotation.PostConstruct; +import javax.inject.Inject; + +import lombok.extern.slf4j.Slf4j; +import org.drools.compiler.kie.builder.impl.InternalKieModule; +import org.drools.core.util.StringUtils; +import org.jvnet.hk2.annotations.Service; + +import org.kie.api.KieServices; +import org.kie.api.builder.*; +import org.kie.api.builder.Message.Level; +import org.kie.api.io.Resource; +import org.kie.api.runtime.KieContainer; +import org.kie.api.runtime.KieSession; +import org.kie.api.runtime.rule.FactHandle; + +import org.onap.holmes.common.api.entity.AlarmInfo; + +import org.onap.holmes.common.api.stat.VesAlarm; +import org.onap.holmes.common.dmaap.DmaapService; +import org.onap.holmes.common.exception.AlarmInfoException; +import org.onap.holmes.common.utils.DbDaoUtil; +import org.onap.holmes.engine.db.AlarmInfoDao; +import org.onap.holmes.engine.request.DeployRuleRequest; +import org.onap.holmes.common.api.entity.CorrelationRule; +import org.onap.holmes.common.exception.CorrelationException; +import org.onap.holmes.common.utils.ExceptionUtil; +import org.onap.holmes.engine.wrapper.RuleMgtWrapper; + +@Slf4j +@Service +public class DroolsEngine { + + @Inject + private RuleMgtWrapper ruleMgtWrapper; + @Inject + private DbDaoUtil daoUtil; + + private final static int ENABLE = 1; + private AlarmInfoDao alarmInfoDao; + private final Map deployed = new ConcurrentHashMap<>(); + private KieServices ks = KieServices.Factory.get(); + private ReleaseId releaseId = ks.newReleaseId("org.onap.holmes", "rules", "1.0.0-SNAPSHOT"); + private ReleaseId compilationRelease = ks.newReleaseId("org.onap.holmes", "compilation", "1.0.0-SNAPSHOT"); + private KieContainer container; + private KieSession session; + + @PostConstruct + private void init() { + alarmInfoDao = daoUtil.getJdbiDaoByOnDemand(AlarmInfoDao.class); + try { + log.info("Drools engine initializing..."); + initEngine(); + log.info("Drools engine initialized."); + + log.info("Start deploy existing rules..."); + initRules(); + log.info("All rules were deployed."); + + log.info("Synchronizing alarms..."); + syncAlarms(); + log.info("Alarm synchronization succeeded."); + } catch (Exception e) { + log.error("Failed to startup the engine of Holmes: " + e.getMessage(), e); + throw ExceptionUtil.buildExceptionResponse("Failed to startup Drools!"); + } + } + + public void stop() { + session.dispose(); + } + + public void initEngine() { + KieModule km = null; + try { + String drl = "package holmes;"; + deployed.put(getPackageName(drl), drl); + km = createAndDeployJar(ks, releaseId, new ArrayList<>(deployed.values())); + } catch (Exception e) { + log.error("Failed to initialize the engine service module.", e); + } + container = ks.newKieContainer(km.getReleaseId()); + session = container.newKieSession(); + deployed.clear(); + } + + private void initRules() throws CorrelationException { + List rules = ruleMgtWrapper.queryRuleByEnable(ENABLE); + if (rules.isEmpty()) { + return; + } + + for (CorrelationRule rule : rules) { + if (!StringUtils.isEmpty(rule.getContent())) { + deployRule(rule.getContent()); + DmaapService.loopControlNames.put(rule.getPackageName(), rule.getClosedControlLoopName()); + } + } + + session.fireAllRules(); + } + + public void syncAlarms() throws AlarmInfoException { + alarmInfoDao.queryAllAlarm().forEach(alarmInfo -> alarmInfoDao.deleteClearedAlarm(alarmInfo)); + alarmInfoDao.queryAllAlarm().forEach(alarmInfo -> putRaisedIntoStream(convertAlarmInfo2VesAlarm(alarmInfo))); + } + + public String deployRule(DeployRuleRequest rule) throws CorrelationException { + return deployRule(rule.getContent()); + } + + private synchronized String deployRule(String rule) throws CorrelationException { + final String packageName = getPackageName(rule); + + if (StringUtils.isEmpty(packageName)) { + throw new CorrelationException("The package name can not be empty."); + } + + if (deployed.containsKey(packageName)) { + throw new CorrelationException("A rule with the same package name already exists in the system."); + } + + if (!StringUtils.isEmpty(rule)) { + deployed.put(packageName, rule); + try { + refreshInMemRules(); + } catch (CorrelationException e) { + deployed.remove(packageName); + throw e; + } + session.fireAllRules(); + } + + return packageName; + } + + public synchronized void undeployRule(String packageName) throws CorrelationException { + + if (StringUtils.isEmpty(packageName)) { + throw new CorrelationException("The package name should not be null."); + } + + if (!deployed.containsKey(packageName)) { + throw new CorrelationException("The rule " + packageName + " does not exist!"); + } + + String removed = deployed.remove(packageName); + try { + refreshInMemRules(); + } catch (Exception e) { + deployed.put(packageName, removed); + throw new CorrelationException("Failed to delete the rule: " + packageName, e); + } + } + + private void refreshInMemRules() throws CorrelationException { + KieModule km = createAndDeployJar(ks, releaseId, new ArrayList<>(deployed.values())); + container.updateToVersion(km.getReleaseId()); + } + + public void compileRule(String content) + throws CorrelationException { + + KieFileSystem kfs = ks.newKieFileSystem().generateAndWritePomXML(compilationRelease); + kfs.write("src/main/resources/rules/rule.drl", content); + KieBuilder builder = ks.newKieBuilder(kfs).buildAll(); + if (builder.getResults().hasMessages(Message.Level.ERROR)) { + String errorMsg = "There are errors in the rule: " + builder.getResults() + .getMessages(Level.ERROR).toString(); + log.info("Compilation failure: " + errorMsg); + throw new CorrelationException(errorMsg); + } + + if (deployed.containsKey(getPackageName(content))) { + throw new CorrelationException("There's no compilation error. But a rule with the same package name already " + + "exists in the engine, which may cause a deployment failure."); + } + + ks.getRepository().removeKieModule(compilationRelease); + } + + public void putRaisedIntoStream(VesAlarm alarm) { + FactHandle factHandle = this.session.getFactHandle(alarm); + if (factHandle != null) { + Object obj = this.session.getObject(factHandle); + if (obj != null && obj instanceof VesAlarm) { + alarm.setRootFlag(((VesAlarm) obj).getRootFlag()); + } + this.session.delete(factHandle); + + if (alarm.getAlarmIsCleared() == 1) { + alarmInfoDao.deleteClearedAlarm(convertVesAlarm2AlarmInfo(alarm)); + } + } else { + this.session.insert(alarm); + } + + this.session.fireAllRules(); + } + + public List queryPackagesFromEngine() { + return container.getKieBase().getKiePackages().stream() + .filter(pkg -> pkg.getRules().size() != 0) + .map(pkg -> pkg.getName()) + .collect(Collectors.toList()); + } + + + + private VesAlarm convertAlarmInfo2VesAlarm(AlarmInfo alarmInfo) { + VesAlarm vesAlarm = new VesAlarm(); + vesAlarm.setEventId(alarmInfo.getEventId()); + vesAlarm.setEventName(alarmInfo.getEventName()); + vesAlarm.setStartEpochMicrosec(alarmInfo.getStartEpochMicroSec()); + vesAlarm.setSourceId(alarmInfo.getSourceId()); + vesAlarm.setSourceName(alarmInfo.getSourceName()); + vesAlarm.setRootFlag(alarmInfo.getRootFlag()); + vesAlarm.setAlarmIsCleared(alarmInfo.getAlarmIsCleared()); + vesAlarm.setLastEpochMicrosec(alarmInfo.getLastEpochMicroSec()); + return vesAlarm; + } + + private AlarmInfo convertVesAlarm2AlarmInfo(VesAlarm vesAlarm) { + AlarmInfo alarmInfo = new AlarmInfo(); + alarmInfo.setEventId(vesAlarm.getEventId()); + alarmInfo.setEventName(vesAlarm.getEventName()); + alarmInfo.setStartEpochMicroSec(vesAlarm.getStartEpochMicrosec()); + alarmInfo.setLastEpochMicroSec(vesAlarm.getLastEpochMicrosec()); + alarmInfo.setSourceId(vesAlarm.getSourceId()); + alarmInfo.setSourceName(vesAlarm.getSourceName()); + alarmInfo.setAlarmIsCleared(vesAlarm.getAlarmIsCleared()); + alarmInfo.setRootFlag(vesAlarm.getRootFlag()); + + return alarmInfo; + } + + private String getPackageName(String contents) { + String ret = contents.trim(); + StringBuilder stringBuilder = new StringBuilder(); + if (ret.startsWith("package")) { + ret = ret.substring(7).trim(); + for (int i = 0; i < ret.length(); i++) { + char tmp = ret.charAt(i); + if (tmp == ';' || tmp == ' ' || tmp == '\n') { + break; + } + stringBuilder.append(tmp); + } + } + return stringBuilder.toString(); + } + + private KieModule createAndDeployJar(KieServices ks, ReleaseId releaseId, List drls) throws CorrelationException { + byte[] jar = createJar(ks, releaseId, drls); + KieModule km = deployJarIntoRepository(ks, jar); + return km; + } + + private byte[] createJar(KieServices ks, ReleaseId releaseId, List drls) throws CorrelationException { + KieFileSystem kfs = ks.newKieFileSystem().generateAndWritePomXML(releaseId); + int i = 0; + for (String drl : drls) { + if (!StringUtils.isEmpty(drl)) { + kfs.write("src/main/resources/" + getPackageName(drl) + ".drl", drl); + } + } + KieBuilder kb = ks.newKieBuilder(kfs).buildAll(); + if (kb.getResults().hasMessages(Message.Level.ERROR)) { + StringBuilder sb = new StringBuilder(); + for (Message msg : kb.getResults().getMessages()) { + sb.append(String.format("[%s]Line: %d, Col: %d\t%s\n", msg.getLevel().toString(), msg.getLine(), + msg.getColumn(), msg.getText())); + } + throw new CorrelationException("Failed to compile JAR. Details: \n" + sb.toString()); + } + + InternalKieModule kieModule = (InternalKieModule) ks.getRepository() + .getKieModule(releaseId); + + return kieModule.getBytes(); + } + + private KieModule deployJarIntoRepository(KieServices ks, byte[] jar) { + Resource jarRes = ks.getResources().newByteArrayResource(jar); + return ks.getRepository().addKieModule(jarRes); + } + +}