package org.onap.holmes.engine.manager;\r
\r
\r
-import java.io.Serializable;\r
import java.io.StringReader;\r
import java.util.HashSet;\r
import java.util.List;\r
import java.util.Set;\r
import javax.annotation.PostConstruct;\r
import javax.inject.Inject;\r
-import javax.jms.Connection;\r
-import javax.jms.ConnectionFactory;\r
-import javax.jms.Destination;\r
-import javax.jms.JMSException;\r
-import javax.jms.Message;\r
-import javax.jms.MessageConsumer;\r
-import javax.jms.MessageListener;\r
-import javax.jms.Session;\r
import lombok.extern.slf4j.Slf4j;\r
-import org.apache.activemq.ActiveMQConnectionFactory;\r
-import org.apache.activemq.command.ActiveMQObjectMessage;\r
import org.drools.KnowledgeBase;\r
import org.drools.KnowledgeBaseConfiguration;\r
import org.drools.KnowledgeBaseFactory;\r
import org.drools.io.ResourceFactory;\r
import org.drools.runtime.StatefulKnowledgeSession;\r
import org.drools.runtime.rule.FactHandle;\r
-import org.glassfish.hk2.api.IterableProvider;\r
import org.jvnet.hk2.annotations.Service;\r
+import org.onap.holmes.common.api.stat.VesAlarm;\r
import org.onap.holmes.engine.request.DeployRuleRequest;\r
import org.onap.holmes.common.api.entity.CorrelationRule;\r
-import org.onap.holmes.common.api.stat.Alarm;\r
-import org.onap.holmes.common.config.MQConfig;\r
-import org.onap.holmes.common.constant.AlarmConst;\r
import org.onap.holmes.common.exception.CorrelationException;\r
import org.onap.holmes.common.utils.ExceptionUtil;\r
import org.onap.holmes.engine.wrapper.RuleMgtWrapper;\r
private KnowledgeBase kbase;\r
private KnowledgeBaseConfiguration kconf;\r
private StatefulKnowledgeSession ksession;\r
- @Inject\r
- private IterableProvider<MQConfig> mqConfigProvider;\r
- private ConnectionFactory connectionFactory;\r
\r
@PostConstruct\r
private void init() {\r
try {\r
- // 1. start engine\r
+ // start engine\r
start();\r
- // 2. start mq listener\r
- registerAlarmTopicListener();\r
} catch (Exception e) {\r
log.error("Failed to start the service: " + e.getMessage(), e);\r
throw ExceptionUtil.buildExceptionResponse("Failed to start the drools engine!");\r
}\r
}\r
\r
- private void registerAlarmTopicListener() {\r
- String brokerURL =\r
- "tcp://" + mqConfigProvider.get().brokerIp + ":" + mqConfigProvider.get().brokerPort;\r
- connectionFactory = new ActiveMQConnectionFactory(mqConfigProvider.get().brokerUsername,\r
- mqConfigProvider.get().brokerPassword, brokerURL);\r
-\r
- AlarmMqMessageListener listener = new AlarmMqMessageListener();\r
- listener.receive();\r
- }\r
-\r
-\r
private void start() throws CorrelationException {\r
log.info("Drools Engine Initialize Beginning...");\r
\r
}\r
}\r
\r
- public void putRaisedIntoStream(Alarm raiseAlarm) {\r
+ public void putRaisedIntoStream(VesAlarm raiseAlarm) {\r
FactHandle factHandle = this.ksession.getFactHandle(raiseAlarm);\r
if (factHandle != null) {\r
this.ksession.retract(factHandle);\r
this.ksession.fireAllRules();\r
}\r
\r
- class AlarmMqMessageListener implements MessageListener {\r
-\r
- private Connection connection = null;\r
- private Session session = null;\r
- private Destination destination = null;\r
- private MessageConsumer consumer = null;\r
-\r
- private void initialize() throws JMSException {\r
- connection = connectionFactory.createConnection();\r
- session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);\r
- destination = session.createTopic(AlarmConst.MQ_TOPIC_NAME_ALARM);\r
- consumer = session.createConsumer(destination);\r
- connection.start();\r
- }\r
-\r
- public void receive() {\r
- try {\r
- initialize();\r
- consumer.setMessageListener(this);\r
- } catch (JMSException e) {\r
- log.error("Failed to connect to the MQ service : " + e.getMessage(), e);\r
- try {\r
- close();\r
- } catch (JMSException e1) {\r
- log.error("Failed close connection " + e1.getMessage(), e1);\r
- }\r
- }\r
- }\r
-\r
- public void onMessage(Message arg0) {\r
- ActiveMQObjectMessage objectMessage = (ActiveMQObjectMessage) arg0;\r
- try {\r
- Serializable object = objectMessage.getObject();\r
-\r
- if (object instanceof Alarm) {\r
- Alarm alarm = (Alarm) object;\r
- putRaisedIntoStream(alarm);\r
- }\r
- } catch (JMSException e) {\r
- log.error("Failed get object : " + e.getMessage(), e);\r
- }\r
- }\r
-\r
- private void close() throws JMSException {\r
- if (consumer != null) {\r
- consumer.close();\r
- }\r
- if (session != null) {\r
- session.close();\r
- }\r
- if (connection != null) {\r
- connection.close();\r
- }\r
- }\r
- }\r
}\r