* limitations under the License.\r
*/\r
package org.onap.holmes.engine.manager;\r
-\r
-\r
-import java.io.Serializable;\r
import java.io.StringReader;\r
+import java.util.ArrayList;\r
import java.util.HashSet;\r
import java.util.List;\r
import java.util.Locale;\r
import java.util.Set;\r
import javax.annotation.PostConstruct;\r
import javax.inject.Inject;\r
-import javax.jms.Connection;\r
-import javax.jms.ConnectionFactory;\r
-import javax.jms.Destination;\r
-import javax.jms.JMSException;\r
-import javax.jms.Message;\r
-import javax.jms.MessageConsumer;\r
-import javax.jms.MessageListener;\r
-import javax.jms.Session;\r
import lombok.extern.slf4j.Slf4j;\r
-import org.apache.activemq.ActiveMQConnectionFactory;\r
-import org.apache.activemq.command.ActiveMQObjectMessage;\r
-import org.drools.KnowledgeBase;\r
-import org.drools.KnowledgeBaseConfiguration;\r
-import org.drools.KnowledgeBaseFactory;\r
-import org.drools.builder.KnowledgeBuilder;\r
-import org.drools.builder.KnowledgeBuilderFactory;\r
-import org.drools.builder.ResourceType;\r
-import org.drools.conf.EventProcessingOption;\r
-import org.drools.definition.KnowledgePackage;\r
-import org.drools.io.Resource;\r
-import org.drools.io.ResourceFactory;\r
-import org.drools.runtime.StatefulKnowledgeSession;\r
-import org.drools.runtime.rule.FactHandle;\r
-import org.glassfish.hk2.api.IterableProvider;\r
+import org.drools.compiler.kie.builder.impl.InternalKieModule;\r
import org.jvnet.hk2.annotations.Service;\r
+\r
+import org.kie.api.KieBase;\r
+import org.kie.api.KieServices;\r
+import org.kie.api.builder.KieBuilder;\r
+import org.kie.api.builder.KieFileSystem;\r
+import org.kie.api.builder.KieRepository;\r
+import org.kie.api.builder.Message;\r
+import org.kie.api.builder.Message.Level;\r
+import org.kie.api.builder.model.KieBaseModel;\r
+import org.kie.api.builder.model.KieModuleModel;\r
+import org.kie.api.builder.model.KieSessionModel;\r
+import org.kie.api.conf.EqualityBehaviorOption;\r
+import org.kie.api.conf.EventProcessingOption;\r
+import org.kie.api.definition.KiePackage;\r
+import org.kie.api.io.KieResources;\r
+import org.kie.api.io.ResourceType;\r
+import org.kie.api.runtime.KieContainer;\r
+import org.kie.api.runtime.KieSession;\r
+import org.kie.api.runtime.conf.ClockTypeOption;\r
+import org.kie.api.runtime.rule.FactHandle;\r
+\r
+import org.onap.holmes.common.api.entity.AlarmInfo;\r
+\r
+import org.onap.holmes.common.api.stat.VesAlarm;\r
+import org.onap.holmes.common.dmaap.DmaapService;\r
+import org.onap.holmes.common.exception.AlarmInfoException;\r
+import org.onap.holmes.common.utils.DbDaoUtil;\r
+import org.onap.holmes.engine.db.AlarmInfoDao;\r
import org.onap.holmes.engine.request.DeployRuleRequest;\r
import org.onap.holmes.common.api.entity.CorrelationRule;\r
-import org.onap.holmes.common.api.stat.Alarm;\r
-import org.onap.holmes.common.config.MQConfig;\r
-import org.onap.holmes.common.constant.AlarmConst;\r
import org.onap.holmes.common.exception.CorrelationException;\r
import org.onap.holmes.common.utils.ExceptionUtil;\r
import org.onap.holmes.engine.wrapper.RuleMgtWrapper;\r
private final Set<String> packageNames = new HashSet<String>();\r
@Inject\r
private RuleMgtWrapper ruleMgtWrapper;\r
- private KnowledgeBase kbase;\r
- private KnowledgeBaseConfiguration kconf;\r
- private StatefulKnowledgeSession ksession;\r
+\r
+\r
+ private KieBase kieBase;\r
+ private KieSession kieSession;\r
+ private KieContainer kieContainer;\r
+ private KieFileSystem kfs;\r
+ private KieServices ks;\r
+ private KieBuilder kieBuilder;\r
+ private KieResources resources;\r
+ private KieRepository kieRepository;\r
+\r
+ private AlarmInfoDao alarmInfoDao;\r
@Inject\r
- private IterableProvider<MQConfig> mqConfigProvider;\r
- private ConnectionFactory connectionFactory;\r
+ private DbDaoUtil daoUtil;\r
+\r
\r
@PostConstruct\r
private void init() {\r
+ alarmInfoDao = daoUtil.getJdbiDaoByOnDemand(AlarmInfoDao.class);\r
try {\r
- // 1. start engine\r
+ // start engine\r
start();\r
- // 2. start mq listener\r
- registerAlarmTopicListener();\r
} catch (Exception e) {\r
log.error("Failed to start the service: " + e.getMessage(), e);\r
throw ExceptionUtil.buildExceptionResponse("Failed to start the drools engine!");\r
}\r
}\r
\r
- private void registerAlarmTopicListener() {\r
- String brokerURL =\r
- "tcp://" + mqConfigProvider.get().brokerIp + ":" + mqConfigProvider.get().brokerPort;\r
- connectionFactory = new ActiveMQConnectionFactory(mqConfigProvider.get().brokerUsername,\r
- mqConfigProvider.get().brokerPassword, brokerURL);\r
-\r
- AlarmMqMessageListener listener = new AlarmMqMessageListener();\r
- listener.receive();\r
- }\r
-\r
-\r
- private void start() throws CorrelationException {\r
+ private void start() throws AlarmInfoException {\r
log.info("Drools Engine Initialize Beginning...");\r
\r
initEngineParameter();\r
- initDeployRule();\r
+ alarmSynchronization();\r
+// initDeployRule();\r
\r
- log.info("Business Rule Engine Initialize Successfully.");\r
+ log.info("Alarm synchronization Successfully.");\r
}\r
\r
public void stop() {\r
- this.ksession.dispose();\r
+ this.kieSession.dispose();\r
}\r
\r
- private void initEngineParameter() {\r
- this.kconf = KnowledgeBaseFactory.newKnowledgeBaseConfiguration();\r
-\r
- this.kconf.setOption(EventProcessingOption.STREAM);\r
+ public void initEngineParameter() {\r
+ this.ks = KieServices.Factory.get();\r
+ this.resources = ks.getResources();\r
+ this.kieRepository = ks.getRepository();\r
+ this.kfs = createKieFileSystemWithKProject(ks);\r
\r
- this.kconf.setProperty("drools.assertBehaviour", "equality");\r
+ this.kieBuilder = ks.newKieBuilder(kfs).buildAll();\r
+ this.kieContainer = ks.newKieContainer(kieRepository.getDefaultReleaseId());\r
\r
- this.kbase = KnowledgeBaseFactory.newKnowledgeBase("D-ENGINE", this.kconf);\r
-\r
- this.ksession = kbase.newStatefulKnowledgeSession();\r
+ this.kieBase = kieContainer.getKieBase();\r
+ this.kieSession = kieContainer.newKieSession();\r
}\r
\r
private void initDeployRule() throws CorrelationException {\r
for (CorrelationRule rule : rules) {\r
if (rule.getContent() != null) {\r
deployRuleFromDB(rule.getContent());\r
+ DmaapService.loopControlNames.put(rule.getPackageName(), rule.getClosedControlLoopName());\r
}\r
}\r
}\r
\r
private void deployRuleFromDB(String ruleContent) throws CorrelationException {\r
+ avoidDeployBug();\r
StringReader reader = new StringReader(ruleContent);\r
- Resource res = ResourceFactory.newReaderResource(reader);\r
-\r
- KnowledgeBuilder kbuilder = KnowledgeBuilderFactory.newKnowledgeBuilder();\r
-\r
- kbuilder.add(res, ResourceType.DRL);\r
-\r
+ kfs.write("src/main/resources/rules/rule.drl",\r
+ this.resources.newReaderResource(reader,"UTF-8").setResourceType(ResourceType.DRL));\r
+ kieBuilder = ks.newKieBuilder(kfs).buildAll();\r
try {\r
-\r
- kbase.addKnowledgePackages(kbuilder.getKnowledgePackages());\r
+ InternalKieModule internalKieModule = (InternalKieModule)kieBuilder.getKieModule();\r
+ kieContainer.updateToVersion(internalKieModule.getReleaseId());\r
} catch (Exception e) {\r
throw new CorrelationException(e.getMessage(), e);\r
}\r
- ksession.fireAllRules();\r
+ kieSession.fireAllRules();\r
}\r
\r
public synchronized String deployRule(DeployRuleRequest rule, Locale locale)\r
throws CorrelationException {\r
+ avoidDeployBug();\r
StringReader reader = new StringReader(rule.getContent());\r
- Resource res = ResourceFactory.newReaderResource(reader);\r
-\r
- KnowledgeBuilder kbuilder = KnowledgeBuilderFactory.newKnowledgeBuilder();\r
+ kfs.write("src/main/resources/rules/rule.drl",\r
+ this.resources.newReaderResource(reader,"UTF-8").setResourceType(ResourceType.DRL));\r
+ kieBuilder = ks.newKieBuilder(kfs).buildAll();\r
\r
- kbuilder.add(res, ResourceType.DRL);\r
+ judgeRuleContent(locale, kieBuilder, true);\r
\r
- judgeRuleContent(locale, kbuilder, true);\r
-\r
- String packageName = kbuilder.getKnowledgePackages().iterator().next().getName();\r
+ InternalKieModule internalKieModule = (InternalKieModule)kieBuilder.getKieModule();;\r
+ String packageName = internalKieModule.getKnowledgePackagesForKieBase("KBase").iterator().next().getName();\r
try {\r
- packageNames.add(packageName);\r
- kbase.addKnowledgePackages(kbuilder.getKnowledgePackages());\r
+ kieContainer.updateToVersion(internalKieModule.getReleaseId());\r
} catch (Exception e) {\r
throw new CorrelationException("Failed to deploy the rule.", e);\r
}\r
-\r
- ksession.fireAllRules();\r
+ packageNames.add(packageName);\r
+ kieSession.fireAllRules();\r
return packageName;\r
}\r
\r
public synchronized void undeployRule(String packageName, Locale locale)\r
throws CorrelationException {\r
-\r
- KnowledgePackage pkg = kbase.getKnowledgePackage(packageName);\r
-\r
- if (null == pkg) {\r
+ KiePackage kiePackage = kieBase.getKiePackage(packageName);\r
+ if (null == kiePackage) {\r
throw new CorrelationException("The rule " + packageName + " does not exist!");\r
}\r
-\r
try {\r
- kbase.removeKnowledgePackage(pkg.getName());\r
+ kieBase.removeKiePackage(kiePackage.getName());\r
} catch (Exception e) {\r
throw new CorrelationException("Failed to delete the rule: " + packageName, e);\r
}\r
+ packageNames.remove(kiePackage.getName());\r
}\r
\r
public void compileRule(String content, Locale locale)\r
throws CorrelationException {\r
StringReader reader = new StringReader(content);\r
- Resource res = ResourceFactory.newReaderResource(reader);\r
\r
- KnowledgeBuilder kbuilder = KnowledgeBuilderFactory.newKnowledgeBuilder();\r
+ kfs.write("src/main/resources/rules/rule.drl",\r
+ this.resources.newReaderResource(reader,"UTF-8").setResourceType(ResourceType.DRL));\r
\r
- kbuilder.add(res, ResourceType.DRL);\r
+ kieBuilder = ks.newKieBuilder(kfs).buildAll();\r
\r
- judgeRuleContent(locale, kbuilder, false);\r
+ judgeRuleContent(locale, kieBuilder, false);\r
}\r
\r
- private void judgeRuleContent(Locale locale, KnowledgeBuilder kbuilder, boolean judgePackageName)\r
+ private void judgeRuleContent(Locale locale, KieBuilder kbuilder, boolean judgePackageName)\r
throws CorrelationException {\r
- if (kbuilder.hasErrors()) {\r
- String errorMsg = "There are errors in the rule: " + kbuilder.getErrors().toString();\r
+ if (kbuilder.getResults().hasMessages(Message.Level.ERROR)) {\r
+ String errorMsg = "There are errors in the rule: " + kbuilder.getResults()\r
+ .getMessages(Level.ERROR).toString();\r
log.error(errorMsg);\r
throw new CorrelationException(errorMsg);\r
}\r
+ InternalKieModule internalKieModule = null;\r
+ try {\r
+ internalKieModule = (InternalKieModule) kbuilder.getKieModule();\r
+ } catch (Exception e) {\r
+ throw new CorrelationException("There are errors in the rule!" + e.getMessage(), e);\r
+ }\r
+ if (internalKieModule == null) {\r
+ throw new CorrelationException("There are errors in the rule!");\r
+ }\r
+ String packageName = internalKieModule.getKnowledgePackagesForKieBase("KBase").iterator().next().getName();\r
\r
- String packageName = kbuilder.getKnowledgePackages().iterator().next().getName();\r
-\r
- if (packageNames.contains(packageName) && judgePackageName) {\r
+ if (queryAllPackage().contains(packageName) && judgePackageName) {\r
throw new CorrelationException("The rule " + packageName + " already exists in the drools engine.");\r
}\r
}\r
\r
- public void putRaisedIntoStream(Alarm raiseAlarm) {\r
- FactHandle factHandle = this.ksession.getFactHandle(raiseAlarm);\r
+ public void putRaisedIntoStream(VesAlarm raiseAlarm) {\r
+ FactHandle factHandle = this.kieSession.getFactHandle(raiseAlarm);\r
if (factHandle != null) {\r
- this.ksession.retract(factHandle);\r
+ Object obj = this.kieSession.getObject(factHandle);\r
+ if (obj != null && obj instanceof VesAlarm) {\r
+ raiseAlarm.setRootFlag(((VesAlarm) obj).getRootFlag());\r
+ }\r
+ this.kieSession.delete(factHandle);\r
}\r
- this.ksession.insert(raiseAlarm);\r
- this.ksession.fireAllRules();\r
- }\r
-\r
- class AlarmMqMessageListener implements MessageListener {\r
+ this.kieSession.insert(raiseAlarm);\r
+ this.kieSession.fireAllRules();\r
\r
- private Connection connection = null;\r
- private Session session = null;\r
- private Destination destination = null;\r
- private MessageConsumer consumer = null;\r
+ }\r
\r
- private void initialize() throws JMSException {\r
- connection = connectionFactory.createConnection();\r
- session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);\r
- destination = session.createTopic(AlarmConst.MQ_TOPIC_NAME_ALARM);\r
- consumer = session.createConsumer(destination);\r
- connection.start();\r
+ public List<String> queryAllPackage() {\r
+ List<KiePackage> kiePackages = (List<KiePackage>)kieBase.getKiePackages();\r
+ List<String> list = new ArrayList<>();\r
+ for(KiePackage kiePackage : kiePackages) {\r
+ list.add(kiePackage.getName());\r
}\r
+ return list;\r
+ }\r
\r
- public void receive() {\r
- try {\r
- initialize();\r
- consumer.setMessageListener(this);\r
- } catch (JMSException e) {\r
- log.error("Failed to connect to the MQ service : " + e.getMessage(), e);\r
- try {\r
- close();\r
- } catch (JMSException e1) {\r
- log.error("Failed close connection " + e1.getMessage(), e1);\r
- }\r
- }\r
- }\r
+ private KieFileSystem createKieFileSystemWithKProject(KieServices ks) {\r
+ KieModuleModel kieModuleModel = ks.newKieModuleModel();\r
+ KieBaseModel kieBaseModel = kieModuleModel.newKieBaseModel("KBase")\r
+ .addPackage("rules")\r
+ .setDefault(true)\r
+ .setEqualsBehavior(EqualityBehaviorOption.EQUALITY)\r
+ .setEventProcessingMode(EventProcessingOption.STREAM);\r
+ KieSessionModel kieSessionModel = kieBaseModel.newKieSessionModel("KSession")\r
+ .setDefault( true )\r
+ .setType( KieSessionModel.KieSessionType.STATEFUL )\r
+ .setClockType( ClockTypeOption.get("realtime") );\r
+ KieFileSystem kfs = ks.newKieFileSystem();\r
+ kfs.writeKModuleXML(kieModuleModel.toXML());\r
+ return kfs;\r
+ }\r
\r
- public void onMessage(Message arg0) {\r
- ActiveMQObjectMessage objectMessage = (ActiveMQObjectMessage) arg0;\r
- try {\r
- Serializable object = objectMessage.getObject();\r
-\r
- if (object instanceof Alarm) {\r
- Alarm alarm = (Alarm) object;\r
- putRaisedIntoStream(alarm);\r
- }\r
- } catch (JMSException e) {\r
- log.error("Failed get object : " + e.getMessage(), e);\r
- }\r
- }\r
+ private void avoidDeployBug() {\r
+ String tmp = Math.random() + "";\r
+ String rule = "package justInOrderToAvoidDeployBug" + tmp.substring(2);\r
+ kfs.write("src/main/resources/rules/rule.drl", rule);\r
+ kieBuilder = ks.newKieBuilder(kfs).buildAll();\r
+ InternalKieModule internalKieModule = (InternalKieModule)kieBuilder.getKieModule();\r
+ String packageName = internalKieModule.getKnowledgePackagesForKieBase("KBase").iterator().next().getName();\r
+ kieRepository.addKieModule(internalKieModule);\r
+ kieContainer.updateToVersion(internalKieModule.getReleaseId());\r
+\r
+ KiePackage kiePackage = kieBase.getKiePackage(packageName);\r
+ kieBase.removeKiePackage(kiePackage.getName());\r
+ }\r
\r
- private void close() throws JMSException {\r
- if (consumer != null) {\r
- consumer.close();\r
- }\r
- if (session != null) {\r
- session.close();\r
- }\r
- if (connection != null) {\r
- connection.close();\r
- }\r
- }\r
+ public void alarmSynchronization() throws AlarmInfoException {\r
+ alarmInfoDao.queryAllAlarm().forEach(alarmInfo -> alarmInfoDao.deleteClearedAlarm(alarmInfo));\r
+ alarmInfoDao.queryAllAlarm().forEach(alarmInfo -> putRaisedIntoStream(convertAlarmInfo2VesAlarm(alarmInfo)));\r
}\r
+\r
+ private VesAlarm convertAlarmInfo2VesAlarm(AlarmInfo alarmInfo) {\r
+ VesAlarm vesAlarm = new VesAlarm();\r
+ vesAlarm.setEventId(alarmInfo.getEventId());\r
+ vesAlarm.setEventName(alarmInfo.getEventName());\r
+ vesAlarm.setStartEpochMicrosec(alarmInfo.getStartEpochMicroSec());\r
+ vesAlarm.setSourceId(alarmInfo.getSourceId());\r
+ vesAlarm.setSourceName(alarmInfo.getSourceName());\r
+ vesAlarm.setRootFlag(alarmInfo.getRootFlag());\r
+ vesAlarm.setAlarmIsCleared(alarmInfo.getAlarmIsCleared());\r
+ vesAlarm.setLastEpochMicrosec(alarmInfo.getLastEpochMicroSec());\r
+ return vesAlarm;\r
+ }\r
+\r
}\r