package org.onap.holmes.engine.manager;\r
\r
\r
-import java.io.Serializable;\r
import java.io.StringReader;\r
import java.util.HashSet;\r
import java.util.List;\r
import java.util.Set;\r
import javax.annotation.PostConstruct;\r
import javax.inject.Inject;\r
-import javax.jms.Connection;\r
-import javax.jms.ConnectionFactory;\r
-import javax.jms.Destination;\r
-import javax.jms.JMSException;\r
-import javax.jms.Message;\r
-import javax.jms.MessageConsumer;\r
-import javax.jms.MessageListener;\r
-import javax.jms.Session;\r
import lombok.extern.slf4j.Slf4j;\r
-import org.apache.activemq.ActiveMQConnectionFactory;\r
-import org.apache.activemq.command.ActiveMQObjectMessage;\r
import org.drools.KnowledgeBase;\r
import org.drools.KnowledgeBaseConfiguration;\r
import org.drools.KnowledgeBaseFactory;\r
import org.drools.io.ResourceFactory;\r
import org.drools.runtime.StatefulKnowledgeSession;\r
import org.drools.runtime.rule.FactHandle;\r
-import org.glassfish.hk2.api.IterableProvider;\r
import org.jvnet.hk2.annotations.Service;\r
+import org.onap.holmes.common.api.stat.VesAlarm;\r
import org.onap.holmes.engine.request.DeployRuleRequest;\r
import org.onap.holmes.common.api.entity.CorrelationRule;\r
-import org.onap.holmes.common.api.stat.Alarm;\r
-import org.onap.holmes.common.config.MQConfig;\r
-import org.onap.holmes.common.constant.AlarmConst;\r
import org.onap.holmes.common.exception.CorrelationException;\r
import org.onap.holmes.common.utils.ExceptionUtil;\r
import org.onap.holmes.engine.wrapper.RuleMgtWrapper;\r
private KnowledgeBase kbase;\r
private KnowledgeBaseConfiguration kconf;\r
private StatefulKnowledgeSession ksession;\r
- @Inject\r
- private IterableProvider<MQConfig> mqConfigProvider;\r
- private ConnectionFactory connectionFactory;\r
\r
@PostConstruct\r
private void init() {\r
try {\r
- // 1. start engine\r
+ // start engine\r
start();\r
- // 2. start mq listener\r
- registerAlarmTopicListener();\r
} catch (Exception e) {\r
log.error("Failed to start the service: " + e.getMessage(), e);\r
throw ExceptionUtil.buildExceptionResponse("Failed to start the drools engine!");\r
}\r
}\r
\r
- private void registerAlarmTopicListener() {\r
- String brokerURL =\r
- "tcp://" + mqConfigProvider.get().brokerIp + ":" + mqConfigProvider.get().brokerPort;\r
- connectionFactory = new ActiveMQConnectionFactory(mqConfigProvider.get().brokerUsername,\r
- mqConfigProvider.get().brokerPassword, brokerURL);\r
-\r
- AlarmMqMessageListener listener = new AlarmMqMessageListener();\r
- listener.receive();\r
- }\r
-\r
-\r
private void start() throws CorrelationException {\r
log.info("Drools Engine Initialize Beginning...");\r
\r
}\r
}\r
\r
- public void putRaisedIntoStream(Alarm raiseAlarm) {\r
+ public void putRaisedIntoStream(VesAlarm raiseAlarm) {\r
FactHandle factHandle = this.ksession.getFactHandle(raiseAlarm);\r
if (factHandle != null) {\r
this.ksession.retract(factHandle);\r
this.ksession.fireAllRules();\r
}\r
\r
- class AlarmMqMessageListener implements MessageListener {\r
-\r
- private Connection connection = null;\r
- private Session session = null;\r
- private Destination destination = null;\r
- private MessageConsumer consumer = null;\r
-\r
- private void initialize() throws JMSException {\r
- connection = connectionFactory.createConnection();\r
- session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);\r
- destination = session.createTopic(AlarmConst.MQ_TOPIC_NAME_ALARM);\r
- consumer = session.createConsumer(destination);\r
- connection.start();\r
- }\r
-\r
- public void receive() {\r
- try {\r
- initialize();\r
- consumer.setMessageListener(this);\r
- } catch (JMSException e) {\r
- log.error("Failed to connect to the MQ service : " + e.getMessage(), e);\r
- try {\r
- close();\r
- } catch (JMSException e1) {\r
- log.error("Failed close connection " + e1.getMessage(), e1);\r
- }\r
- }\r
- }\r
-\r
- public void onMessage(Message arg0) {\r
- ActiveMQObjectMessage objectMessage = (ActiveMQObjectMessage) arg0;\r
- try {\r
- Serializable object = objectMessage.getObject();\r
-\r
- if (object instanceof Alarm) {\r
- Alarm alarm = (Alarm) object;\r
- putRaisedIntoStream(alarm);\r
- }\r
- } catch (JMSException e) {\r
- log.error("Failed get object : " + e.getMessage(), e);\r
- }\r
- }\r
-\r
- private void close() throws JMSException {\r
- if (consumer != null) {\r
- consumer.close();\r
- }\r
- if (session != null) {\r
- session.close();\r
- }\r
- if (connection != null) {\r
- connection.close();\r
- }\r
- }\r
- }\r
}\r
\r
package org.onap.holmes.engine.manager;\r
\r
-import static org.easymock.EasyMock.anyBoolean;\r
import static org.easymock.EasyMock.anyInt;\r
-import static org.easymock.EasyMock.anyObject;\r
import static org.easymock.EasyMock.expect;\r
\r
import java.lang.reflect.Method;\r
import java.util.ArrayList;\r
import java.util.List;\r
import java.util.Locale;\r
-import javax.jms.Connection;\r
-import javax.jms.ConnectionFactory;\r
-import javax.jms.Destination;\r
-import javax.jms.JMSException;\r
-import javax.jms.MessageConsumer;\r
-import javax.jms.Session;\r
-import javax.jms.Topic;\r
-import org.apache.activemq.command.ActiveMQObjectMessage;\r
import org.drools.KnowledgeBase;\r
import org.drools.KnowledgeBaseConfiguration;\r
import org.drools.KnowledgeBaseFactory;\r
import org.drools.conf.EventProcessingOption;\r
import org.drools.runtime.StatefulKnowledgeSession;\r
-import org.easymock.EasyMock;\r
-import org.glassfish.hk2.api.IterableProvider;\r
import org.junit.Before;\r
import org.junit.Rule;\r
import org.junit.Test;\r
import org.junit.rules.ExpectedException;\r
+import org.onap.holmes.common.api.stat.VesAlarm;\r
import org.onap.holmes.engine.request.DeployRuleRequest;\r
import org.onap.holmes.common.api.entity.CorrelationRule;\r
-import org.onap.holmes.common.api.stat.Alarm;\r
-import org.onap.holmes.common.config.MQConfig;\r
import org.onap.holmes.common.constant.AlarmConst;\r
import org.onap.holmes.common.exception.CorrelationException;\r
import org.onap.holmes.engine.wrapper.RuleMgtWrapper;\r
\r
private StatefulKnowledgeSession ksession;\r
\r
- private IterableProvider<MQConfig> mqConfigProvider;\r
-\r
- private ConnectionFactory connectionFactory;\r
\r
private DroolsEngine droolsEngine;\r
\r
this.ksession = kbase.newStatefulKnowledgeSession();\r
\r
ruleMgtWrapper = PowerMock.createMock(RuleMgtWrapper.class);\r
- mqConfigProvider = PowerMock.createMock(IterableProvider.class);\r
- connectionFactory = PowerMock.createMock(ConnectionFactory.class);\r
\r
Whitebox.setInternalState(droolsEngine, "ruleMgtWrapper", ruleMgtWrapper);\r
- Whitebox.setInternalState(droolsEngine, "mqConfigProvider", mqConfigProvider);\r
+\r
Whitebox.setInternalState(droolsEngine, "kconf", kconf);\r
Whitebox.setInternalState(droolsEngine, "kbase", kbase);\r
Whitebox.setInternalState(droolsEngine, "ksession", ksession);\r
- Whitebox.setInternalState(droolsEngine, "connectionFactory", connectionFactory);\r
\r
PowerMock.resetAll();\r
}\r
\r
@Test\r
public void init() throws Exception {\r
- MQConfig mqConfig = new MQConfig();\r
- mqConfig.brokerIp = "127.0.0.1";\r
- mqConfig.brokerPort = 4567;\r
- mqConfig.brokerUsername = "admin";\r
- mqConfig.brokerPassword = "admin";\r
+\r
List<CorrelationRule> rules = new ArrayList<CorrelationRule>();\r
CorrelationRule rule = new CorrelationRule();\r
rule.setContent("content");\r
rules.add(rule);\r
\r
- expect(mqConfigProvider.get()).andReturn(mqConfig).anyTimes();\r
expect(ruleMgtWrapper.queryRuleByEnable(anyInt())).andReturn(rules);\r
PowerMock.replayAll();\r
\r
droolsEngine.compileRule(content, locale);\r
}\r
\r
-\r
@Test\r
public void putRaisedIntoStream_facthandle_is_null() {\r
- Alarm raiseAlarm = new Alarm();\r
+ VesAlarm raiseAlarm = new VesAlarm();\r
+ raiseAlarm.setVersion((long) 245235);\r
droolsEngine.putRaisedIntoStream(raiseAlarm);\r
droolsEngine.putRaisedIntoStream(raiseAlarm);\r
}\r
\r
@Test\r
public void putRaisedIntoStream_factHandle_is_not_null() {\r
- droolsEngine.putRaisedIntoStream(new Alarm());\r
- }\r
-\r
-\r
- @Test\r
- public void listener_receive() throws JMSException {\r
- DroolsEngine.AlarmMqMessageListener listener = droolsEngine.new AlarmMqMessageListener();\r
-\r
- Connection connection = PowerMock.createMock(Connection.class);\r
- Session session = PowerMock.createMock(Session.class);\r
- Destination destination = PowerMock.createMock(Topic.class);\r
- MessageConsumer consumer = PowerMock.createMock(MessageConsumer.class);\r
-\r
- Whitebox.setInternalState(listener, "connection", connection);\r
- Whitebox.setInternalState(listener, "session", session);\r
- Whitebox.setInternalState(listener, "destination", destination);\r
- Whitebox.setInternalState(listener, "consumer", consumer);\r
-\r
- PowerMock.reset();\r
-\r
- expect(connectionFactory.createConnection()).andReturn(connection);\r
- connection.start();\r
- expect(connection.createSession(anyBoolean(), anyInt())).andReturn(session);\r
- expect(session.createTopic(anyObject(String.class))).andReturn((Topic) destination);\r
- expect(session.createConsumer(anyObject(Destination.class))).andReturn(consumer);\r
- consumer.setMessageListener(listener);\r
-\r
- PowerMock.replayAll();\r
-\r
- listener.receive();\r
-\r
- PowerMock.verifyAll();\r
- }\r
-\r
- @Test\r
- public void listener_exception() throws JMSException {\r
- DroolsEngine.AlarmMqMessageListener listener = droolsEngine.new AlarmMqMessageListener();\r
-\r
- Connection connection = PowerMock.createMock(Connection.class);\r
- Session session = PowerMock.createMock(Session.class);\r
- Destination destination = PowerMock.createMock(Topic.class);\r
- MessageConsumer consumer = PowerMock.createMock(MessageConsumer.class);\r
-\r
- Whitebox.setInternalState(listener, "connection", connection);\r
- Whitebox.setInternalState(listener, "session", session);\r
- Whitebox.setInternalState(listener, "destination", destination);\r
- Whitebox.setInternalState(listener, "consumer", consumer);\r
-\r
- PowerMock.reset();\r
-\r
- expect(connectionFactory.createConnection()).andReturn(connection);\r
- connection.start();\r
- expect(connection.createSession(anyBoolean(), anyInt())).andReturn(session);\r
- expect(session.createTopic(anyObject(String.class))).andReturn((Topic) destination);\r
- expect(session.createConsumer(anyObject(Destination.class))).andReturn(consumer);\r
- consumer.setMessageListener(listener);\r
- EasyMock.expectLastCall().andThrow(new JMSException(""));\r
-\r
- consumer.close();\r
- session.close();\r
- connection.close();\r
-\r
- PowerMock.replayAll();\r
-\r
- listener.receive();\r
-\r
- PowerMock.verifyAll();\r
- }\r
-\r
- @Test\r
- public void listener_close_exception() throws JMSException {\r
- DroolsEngine.AlarmMqMessageListener listener = droolsEngine.new AlarmMqMessageListener();\r
-\r
- Connection connection = PowerMock.createMock(Connection.class);\r
- Session session = PowerMock.createMock(Session.class);\r
- Destination destination = PowerMock.createMock(Topic.class);\r
- MessageConsumer consumer = PowerMock.createMock(MessageConsumer.class);\r
-\r
- Whitebox.setInternalState(listener, "connection", connection);\r
- Whitebox.setInternalState(listener, "session", session);\r
- Whitebox.setInternalState(listener, "destination", destination);\r
- Whitebox.setInternalState(listener, "consumer", consumer);\r
-\r
- PowerMock.reset();\r
-\r
- expect(connectionFactory.createConnection()).andReturn(connection);\r
- connection.start();\r
- expect(connection.createSession(anyBoolean(), anyInt())).andReturn(session);\r
- expect(session.createTopic(anyObject(String.class))).andReturn((Topic) destination);\r
- expect(session.createConsumer(anyObject(Destination.class))).andReturn(consumer);\r
- consumer.setMessageListener(listener);\r
- EasyMock.expectLastCall().andThrow(new JMSException(""));\r
-\r
- consumer.close();\r
- EasyMock.expectLastCall().andThrow(new JMSException(""));\r
-\r
- PowerMock.replayAll();\r
-\r
- listener.receive();\r
-\r
- PowerMock.verifyAll();\r
- }\r
-\r
- @Test\r
- public void listener_on_message() throws JMSException {\r
- DroolsEngine.AlarmMqMessageListener listener = droolsEngine.new AlarmMqMessageListener();\r
- Alarm alarm = new Alarm();\r
- alarm.setAlarmKey("alarmKey");\r
- ActiveMQObjectMessage objectMessage = new ActiveMQObjectMessage();\r
- objectMessage.setObject(alarm);\r
-\r
- listener.onMessage(objectMessage);\r
+ VesAlarm raiseAlarm = new VesAlarm();\r
+ raiseAlarm.setVersion((long) 245235);\r
+ droolsEngine.putRaisedIntoStream(raiseAlarm);\r
}\r
\r
@Test\r