<groupId>org.reflections</groupId>
<artifactId>reflections</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.onap.holmes.dsa</groupId>
+ <artifactId>dmaap-dsa</artifactId>
+ </dependency>
<dependency>
<groupId>org.onap.holmes.common</groupId>
<artifactId>holmes-actions</artifactId>
--- /dev/null
+/*
+ * Copyright 2017 ZTE Corporation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onap.holmes.engine.dmaappolling;
+
+import java.util.ArrayList;
+import java.util.List;
+import lombok.extern.slf4j.Slf4j;
+import org.onap.holmes.common.api.stat.VesAlarm;
+import org.onap.holmes.common.exception.CorrelationException;
+import org.onap.holmes.dsa.dmaappolling.Subscriber;
+import org.onap.holmes.engine.manager.DroolsEngine;
+
+@Slf4j
+public class DMaaPPollingRequest implements Runnable {
+
+ private Subscriber subscriber;
+
+ private DroolsEngine droolsEngine;
+
+ public DMaaPPollingRequest(Subscriber subscriber, DroolsEngine droolsEngine) {
+ this.subscriber = subscriber;
+ this.droolsEngine = droolsEngine;
+ }
+
+ public void run() {
+ List<VesAlarm> vesAlarmList = new ArrayList<>();
+ try {
+ vesAlarmList = subscriber.subscribe();
+ } catch (CorrelationException e) {
+ log.error("Failed polling request alarm." + e.getMessage());
+ }
+ vesAlarmList.forEach(vesAlarm -> droolsEngine.putRaisedIntoStream(vesAlarm));
+ }
+}
--- /dev/null
+/*
+ * Copyright 2017 ZTE Corporation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onap.holmes.engine.dmaappolling;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import javax.inject.Inject;
+import org.jvnet.hk2.annotations.Service;
+import org.onap.holmes.dsa.dmaappolling.Subscriber;
+import org.onap.holmes.engine.manager.DroolsEngine;
+
+@Service
+public class SubscriberAction {
+
+ @Inject
+ private DroolsEngine droolsEngine;
+
+ private ConcurrentHashMap<String, ScheduledFuture> pollingRequests = new ConcurrentHashMap<String, ScheduledFuture>();
+ private ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();
+
+ public void addSubscriber(Subscriber subscriber) {
+ DMaaPPollingRequest pollingTask = new DMaaPPollingRequest(subscriber, droolsEngine);
+ ScheduledFuture future = service
+ .scheduleAtFixedRate(pollingTask, 0, subscriber.getPeriod(), TimeUnit.MILLISECONDS);
+ pollingRequests.put(subscriber.getTopic(), future);
+ }
+
+ public void removeSubscriber(Subscriber subscriber) {
+ ScheduledFuture future = pollingRequests.get(subscriber.getTopic());
+ if (future != null) {
+ future.cancel(true);
+ }
+ pollingRequests.remove(subscriber.getTopic());
+ }
+}
package org.onap.holmes.engine.manager;\r
\r
\r
-import java.io.Serializable;\r
import java.io.StringReader;\r
import java.util.HashSet;\r
import java.util.List;\r
import java.util.Set;\r
import javax.annotation.PostConstruct;\r
import javax.inject.Inject;\r
-import javax.jms.Connection;\r
-import javax.jms.ConnectionFactory;\r
-import javax.jms.Destination;\r
-import javax.jms.JMSException;\r
-import javax.jms.Message;\r
-import javax.jms.MessageConsumer;\r
-import javax.jms.MessageListener;\r
-import javax.jms.Session;\r
import lombok.extern.slf4j.Slf4j;\r
-import org.apache.activemq.ActiveMQConnectionFactory;\r
-import org.apache.activemq.command.ActiveMQObjectMessage;\r
import org.drools.KnowledgeBase;\r
import org.drools.KnowledgeBaseConfiguration;\r
import org.drools.KnowledgeBaseFactory;\r
import org.drools.io.ResourceFactory;\r
import org.drools.runtime.StatefulKnowledgeSession;\r
import org.drools.runtime.rule.FactHandle;\r
-import org.glassfish.hk2.api.IterableProvider;\r
import org.jvnet.hk2.annotations.Service;\r
+import org.onap.holmes.common.api.stat.VesAlarm;\r
import org.onap.holmes.engine.request.DeployRuleRequest;\r
import org.onap.holmes.common.api.entity.CorrelationRule;\r
-import org.onap.holmes.common.api.stat.Alarm;\r
-import org.onap.holmes.common.config.MQConfig;\r
-import org.onap.holmes.common.constant.AlarmConst;\r
import org.onap.holmes.common.exception.CorrelationException;\r
import org.onap.holmes.common.utils.ExceptionUtil;\r
import org.onap.holmes.engine.wrapper.RuleMgtWrapper;\r
private KnowledgeBase kbase;\r
private KnowledgeBaseConfiguration kconf;\r
private StatefulKnowledgeSession ksession;\r
- @Inject\r
- private IterableProvider<MQConfig> mqConfigProvider;\r
- private ConnectionFactory connectionFactory;\r
\r
@PostConstruct\r
private void init() {\r
try {\r
- // 1. start engine\r
+ // start engine\r
start();\r
- // 2. start mq listener\r
- registerAlarmTopicListener();\r
} catch (Exception e) {\r
log.error("Failed to start the service: " + e.getMessage(), e);\r
throw ExceptionUtil.buildExceptionResponse("Failed to start the drools engine!");\r
}\r
}\r
\r
- private void registerAlarmTopicListener() {\r
- String brokerURL =\r
- "tcp://" + mqConfigProvider.get().brokerIp + ":" + mqConfigProvider.get().brokerPort;\r
- connectionFactory = new ActiveMQConnectionFactory(mqConfigProvider.get().brokerUsername,\r
- mqConfigProvider.get().brokerPassword, brokerURL);\r
-\r
- AlarmMqMessageListener listener = new AlarmMqMessageListener();\r
- listener.receive();\r
- }\r
-\r
-\r
private void start() throws CorrelationException {\r
log.info("Drools Engine Initialize Beginning...");\r
\r
}\r
}\r
\r
- public void putRaisedIntoStream(Alarm raiseAlarm) {\r
+ public void putRaisedIntoStream(VesAlarm raiseAlarm) {\r
FactHandle factHandle = this.ksession.getFactHandle(raiseAlarm);\r
if (factHandle != null) {\r
this.ksession.retract(factHandle);\r
this.ksession.fireAllRules();\r
}\r
\r
- class AlarmMqMessageListener implements MessageListener {\r
-\r
- private Connection connection = null;\r
- private Session session = null;\r
- private Destination destination = null;\r
- private MessageConsumer consumer = null;\r
-\r
- private void initialize() throws JMSException {\r
- connection = connectionFactory.createConnection();\r
- session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);\r
- destination = session.createTopic(AlarmConst.MQ_TOPIC_NAME_ALARM);\r
- consumer = session.createConsumer(destination);\r
- connection.start();\r
- }\r
-\r
- public void receive() {\r
- try {\r
- initialize();\r
- consumer.setMessageListener(this);\r
- } catch (JMSException e) {\r
- log.error("Failed to connect to the MQ service : " + e.getMessage(), e);\r
- try {\r
- close();\r
- } catch (JMSException e1) {\r
- log.error("Failed close connection " + e1.getMessage(), e1);\r
- }\r
- }\r
- }\r
-\r
- public void onMessage(Message arg0) {\r
- ActiveMQObjectMessage objectMessage = (ActiveMQObjectMessage) arg0;\r
- try {\r
- Serializable object = objectMessage.getObject();\r
-\r
- if (object instanceof Alarm) {\r
- Alarm alarm = (Alarm) object;\r
- putRaisedIntoStream(alarm);\r
- }\r
- } catch (JMSException e) {\r
- log.error("Failed get object : " + e.getMessage(), e);\r
- }\r
- }\r
-\r
- private void close() throws JMSException {\r
- if (consumer != null) {\r
- consumer.close();\r
- }\r
- if (session != null) {\r
- session.close();\r
- }\r
- if (connection != null) {\r
- connection.close();\r
- }\r
- }\r
- }\r
}\r
--- /dev/null
+/**
+ * Copyright 2017 ZTE Corporation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onap.holmes.engine.mqconsumer;
+
+import java.io.Serializable;
+import javax.inject.Inject;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.Session;
+import lombok.NoArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.command.ActiveMQObjectMessage;
+import org.glassfish.hk2.api.IterableProvider;
+import org.jvnet.hk2.annotations.Service;
+import org.onap.holmes.common.api.stat.VesAlarm;
+import org.onap.holmes.common.config.MQConfig;
+import org.onap.holmes.common.constant.AlarmConst;
+import org.onap.holmes.engine.manager.DroolsEngine;
+
+@Service
+@Slf4j
+@NoArgsConstructor
+public class MQConsumer {
+
+ @Inject
+ private IterableProvider<MQConfig> mqConfigProvider;
+ private ConnectionFactory connectionFactory;
+ private ConnectionFactory connectionFactory1;
+ @Inject
+ private DroolsEngine engine;
+
+ public void registerAlarmTopicListener() {
+ String brokerURL =
+ "tcp://" + mqConfigProvider.get().brokerIp + ":" + mqConfigProvider.get().brokerPort;
+ connectionFactory = new ActiveMQConnectionFactory(mqConfigProvider.get().brokerUsername,
+ mqConfigProvider.get().brokerPassword, brokerURL);
+
+ AlarmMqMessageListener listener = new AlarmMqMessageListener();
+ listener.receive();
+ }
+ class AlarmMqMessageListener implements MessageListener {
+
+ private Connection connection = null;
+ private Session session = null;
+ private Destination destination = null;
+ private MessageConsumer consumer = null;
+
+ private void initialize() throws JMSException {
+ connection = connectionFactory.createConnection();
+ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ destination = session.createTopic(AlarmConst.MQ_TOPIC_NAME_ALARM);
+ consumer = session.createConsumer(destination);
+ connection.start();
+ }
+
+ public void receive() {
+ try {
+ initialize();
+ consumer.setMessageListener(this);
+ } catch (JMSException e) {
+ log.error("Failed to connect to the MQ service : " + e.getMessage(), e);
+ try {
+ close();
+ } catch (JMSException e1) {
+ log.error("Failed close connection " + e1.getMessage(), e1);
+ }
+ }
+ }
+
+ public void onMessage(Message arg0) {
+ ActiveMQObjectMessage objectMessage = (ActiveMQObjectMessage) arg0;
+ try {
+ Serializable object = objectMessage.getObject();
+ if (object instanceof VesAlarm) {
+ VesAlarm vesAlarm = (VesAlarm) object;
+ engine.putRaisedIntoStream(vesAlarm);
+ }
+ } catch (JMSException e) {
+ log.error("Failed get object : " + e.getMessage(), e);
+ }
+ }
+
+ private void close() throws JMSException {
+ if (consumer != null) {
+ consumer.close();
+ }
+ if (session != null) {
+ session.close();
+ }
+ if (connection != null) {
+ connection.close();
+ }
+ }
+ }
+}
\r
package org.onap.holmes.engine.manager;\r
\r
-import static org.easymock.EasyMock.anyBoolean;\r
import static org.easymock.EasyMock.anyInt;\r
-import static org.easymock.EasyMock.anyObject;\r
import static org.easymock.EasyMock.expect;\r
\r
import java.lang.reflect.Method;\r
import java.util.ArrayList;\r
import java.util.List;\r
import java.util.Locale;\r
-import javax.jms.Connection;\r
-import javax.jms.ConnectionFactory;\r
-import javax.jms.Destination;\r
-import javax.jms.JMSException;\r
-import javax.jms.MessageConsumer;\r
-import javax.jms.Session;\r
-import javax.jms.Topic;\r
-import org.apache.activemq.command.ActiveMQObjectMessage;\r
import org.drools.KnowledgeBase;\r
import org.drools.KnowledgeBaseConfiguration;\r
import org.drools.KnowledgeBaseFactory;\r
import org.drools.conf.EventProcessingOption;\r
import org.drools.runtime.StatefulKnowledgeSession;\r
-import org.easymock.EasyMock;\r
-import org.glassfish.hk2.api.IterableProvider;\r
import org.junit.Before;\r
import org.junit.Rule;\r
import org.junit.Test;\r
import org.junit.rules.ExpectedException;\r
+import org.onap.holmes.common.api.stat.VesAlarm;\r
import org.onap.holmes.engine.request.DeployRuleRequest;\r
import org.onap.holmes.common.api.entity.CorrelationRule;\r
-import org.onap.holmes.common.api.stat.Alarm;\r
-import org.onap.holmes.common.config.MQConfig;\r
import org.onap.holmes.common.constant.AlarmConst;\r
import org.onap.holmes.common.exception.CorrelationException;\r
import org.onap.holmes.engine.wrapper.RuleMgtWrapper;\r
\r
private StatefulKnowledgeSession ksession;\r
\r
- private IterableProvider<MQConfig> mqConfigProvider;\r
-\r
- private ConnectionFactory connectionFactory;\r
\r
private DroolsEngine droolsEngine;\r
\r
this.ksession = kbase.newStatefulKnowledgeSession();\r
\r
ruleMgtWrapper = PowerMock.createMock(RuleMgtWrapper.class);\r
- mqConfigProvider = PowerMock.createMock(IterableProvider.class);\r
- connectionFactory = PowerMock.createMock(ConnectionFactory.class);\r
\r
Whitebox.setInternalState(droolsEngine, "ruleMgtWrapper", ruleMgtWrapper);\r
- Whitebox.setInternalState(droolsEngine, "mqConfigProvider", mqConfigProvider);\r
+\r
Whitebox.setInternalState(droolsEngine, "kconf", kconf);\r
Whitebox.setInternalState(droolsEngine, "kbase", kbase);\r
Whitebox.setInternalState(droolsEngine, "ksession", ksession);\r
- Whitebox.setInternalState(droolsEngine, "connectionFactory", connectionFactory);\r
\r
PowerMock.resetAll();\r
}\r
\r
@Test\r
public void init() throws Exception {\r
- MQConfig mqConfig = new MQConfig();\r
- mqConfig.brokerIp = "127.0.0.1";\r
- mqConfig.brokerPort = 4567;\r
- mqConfig.brokerUsername = "admin";\r
- mqConfig.brokerPassword = "admin";\r
+\r
List<CorrelationRule> rules = new ArrayList<CorrelationRule>();\r
CorrelationRule rule = new CorrelationRule();\r
rule.setContent("content");\r
rules.add(rule);\r
\r
- expect(mqConfigProvider.get()).andReturn(mqConfig).anyTimes();\r
expect(ruleMgtWrapper.queryRuleByEnable(anyInt())).andReturn(rules);\r
PowerMock.replayAll();\r
\r
droolsEngine.compileRule(content, locale);\r
}\r
\r
-\r
@Test\r
public void putRaisedIntoStream_facthandle_is_null() {\r
- Alarm raiseAlarm = new Alarm();\r
+ VesAlarm raiseAlarm = new VesAlarm();\r
+ raiseAlarm.setVersion((long) 245235);\r
droolsEngine.putRaisedIntoStream(raiseAlarm);\r
droolsEngine.putRaisedIntoStream(raiseAlarm);\r
}\r
\r
@Test\r
public void putRaisedIntoStream_factHandle_is_not_null() {\r
- droolsEngine.putRaisedIntoStream(new Alarm());\r
- }\r
-\r
-\r
- @Test\r
- public void listener_receive() throws JMSException {\r
- DroolsEngine.AlarmMqMessageListener listener = droolsEngine.new AlarmMqMessageListener();\r
-\r
- Connection connection = PowerMock.createMock(Connection.class);\r
- Session session = PowerMock.createMock(Session.class);\r
- Destination destination = PowerMock.createMock(Topic.class);\r
- MessageConsumer consumer = PowerMock.createMock(MessageConsumer.class);\r
-\r
- Whitebox.setInternalState(listener, "connection", connection);\r
- Whitebox.setInternalState(listener, "session", session);\r
- Whitebox.setInternalState(listener, "destination", destination);\r
- Whitebox.setInternalState(listener, "consumer", consumer);\r
-\r
- PowerMock.reset();\r
-\r
- expect(connectionFactory.createConnection()).andReturn(connection);\r
- connection.start();\r
- expect(connection.createSession(anyBoolean(), anyInt())).andReturn(session);\r
- expect(session.createTopic(anyObject(String.class))).andReturn((Topic) destination);\r
- expect(session.createConsumer(anyObject(Destination.class))).andReturn(consumer);\r
- consumer.setMessageListener(listener);\r
-\r
- PowerMock.replayAll();\r
-\r
- listener.receive();\r
-\r
- PowerMock.verifyAll();\r
- }\r
-\r
- @Test\r
- public void listener_exception() throws JMSException {\r
- DroolsEngine.AlarmMqMessageListener listener = droolsEngine.new AlarmMqMessageListener();\r
-\r
- Connection connection = PowerMock.createMock(Connection.class);\r
- Session session = PowerMock.createMock(Session.class);\r
- Destination destination = PowerMock.createMock(Topic.class);\r
- MessageConsumer consumer = PowerMock.createMock(MessageConsumer.class);\r
-\r
- Whitebox.setInternalState(listener, "connection", connection);\r
- Whitebox.setInternalState(listener, "session", session);\r
- Whitebox.setInternalState(listener, "destination", destination);\r
- Whitebox.setInternalState(listener, "consumer", consumer);\r
-\r
- PowerMock.reset();\r
-\r
- expect(connectionFactory.createConnection()).andReturn(connection);\r
- connection.start();\r
- expect(connection.createSession(anyBoolean(), anyInt())).andReturn(session);\r
- expect(session.createTopic(anyObject(String.class))).andReturn((Topic) destination);\r
- expect(session.createConsumer(anyObject(Destination.class))).andReturn(consumer);\r
- consumer.setMessageListener(listener);\r
- EasyMock.expectLastCall().andThrow(new JMSException(""));\r
-\r
- consumer.close();\r
- session.close();\r
- connection.close();\r
-\r
- PowerMock.replayAll();\r
-\r
- listener.receive();\r
-\r
- PowerMock.verifyAll();\r
- }\r
-\r
- @Test\r
- public void listener_close_exception() throws JMSException {\r
- DroolsEngine.AlarmMqMessageListener listener = droolsEngine.new AlarmMqMessageListener();\r
-\r
- Connection connection = PowerMock.createMock(Connection.class);\r
- Session session = PowerMock.createMock(Session.class);\r
- Destination destination = PowerMock.createMock(Topic.class);\r
- MessageConsumer consumer = PowerMock.createMock(MessageConsumer.class);\r
-\r
- Whitebox.setInternalState(listener, "connection", connection);\r
- Whitebox.setInternalState(listener, "session", session);\r
- Whitebox.setInternalState(listener, "destination", destination);\r
- Whitebox.setInternalState(listener, "consumer", consumer);\r
-\r
- PowerMock.reset();\r
-\r
- expect(connectionFactory.createConnection()).andReturn(connection);\r
- connection.start();\r
- expect(connection.createSession(anyBoolean(), anyInt())).andReturn(session);\r
- expect(session.createTopic(anyObject(String.class))).andReturn((Topic) destination);\r
- expect(session.createConsumer(anyObject(Destination.class))).andReturn(consumer);\r
- consumer.setMessageListener(listener);\r
- EasyMock.expectLastCall().andThrow(new JMSException(""));\r
-\r
- consumer.close();\r
- EasyMock.expectLastCall().andThrow(new JMSException(""));\r
-\r
- PowerMock.replayAll();\r
-\r
- listener.receive();\r
-\r
- PowerMock.verifyAll();\r
- }\r
-\r
- @Test\r
- public void listener_on_message() throws JMSException {\r
- DroolsEngine.AlarmMqMessageListener listener = droolsEngine.new AlarmMqMessageListener();\r
- Alarm alarm = new Alarm();\r
- alarm.setAlarmKey("alarmKey");\r
- ActiveMQObjectMessage objectMessage = new ActiveMQObjectMessage();\r
- objectMessage.setObject(alarm);\r
-\r
- listener.onMessage(objectMessage);\r
+ VesAlarm raiseAlarm = new VesAlarm();\r
+ raiseAlarm.setVersion((long) 245235);\r
+ droolsEngine.putRaisedIntoStream(raiseAlarm);\r
}\r
\r
@Test\r
--- /dev/null
+/**
+ * Copyright 2017 ZTE Corporation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.holmes.engine.mqconsumer;
+
+import static org.easymock.EasyMock.anyBoolean;
+import static org.easymock.EasyMock.anyInt;
+import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.expect;
+
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+import javax.jms.Topic;
+import org.apache.activemq.command.ActiveMQObjectMessage;
+import org.easymock.EasyMock;
+import org.glassfish.hk2.api.IterableProvider;
+import org.junit.Before;
+import org.junit.Test;
+import org.onap.holmes.common.api.stat.Alarm;
+import org.onap.holmes.common.config.MQConfig;
+import org.powermock.api.easymock.PowerMock;
+import org.powermock.reflect.Whitebox;
+
+public class MQConsumerTest {
+
+ private IterableProvider<MQConfig> mqConfigProvider;
+
+ private ConnectionFactory connectionFactory;
+
+ private MQConsumer mqConsumer;
+
+ private MQConsumer mqConsumer1;
+
+ private MQConsumer mqConsumer2;
+
+ @Before
+ public void setUp() {
+
+ mqConsumer = new MQConsumer();
+
+ mqConfigProvider = PowerMock.createMock(IterableProvider.class);
+ connectionFactory = PowerMock.createMock(ConnectionFactory.class);
+
+ Whitebox.setInternalState(mqConsumer, "mqConfigProvider", mqConfigProvider);
+ Whitebox.setInternalState(mqConsumer, "connectionFactory", connectionFactory);
+ }
+
+ @Test
+ public void init() throws Exception {
+ MQConfig mqConfig = new MQConfig();
+ mqConfig.brokerIp = "127.0.0.1";
+ mqConfig.brokerPort = 4567;
+ mqConfig.brokerUsername = "admin";
+ mqConfig.brokerPassword = "admin";
+
+ expect(mqConfigProvider.get()).andReturn(mqConfig).anyTimes();
+ PowerMock.replayAll();
+
+ PowerMock.verifyAll();
+ }
+
+ @Test
+ public void listener_receive() throws JMSException {
+ MQConsumer.AlarmMqMessageListener listener = mqConsumer.new AlarmMqMessageListener();
+
+ Connection connection = PowerMock.createMock(Connection.class);
+ Session session = PowerMock.createMock(Session.class);
+ Destination destination = PowerMock.createMock(Topic.class);
+ MessageConsumer consumer = PowerMock.createMock(MessageConsumer.class);
+
+ Whitebox.setInternalState(listener, "connection", connection);
+ Whitebox.setInternalState(listener, "session", session);
+ Whitebox.setInternalState(listener, "destination", destination);
+ Whitebox.setInternalState(listener, "consumer", consumer);
+
+ PowerMock.reset();
+
+ expect(connectionFactory.createConnection()).andReturn(connection);
+ connection.start();
+ expect(connection.createSession(anyBoolean(), anyInt())).andReturn(session);
+ expect(session.createTopic(anyObject(String.class))).andReturn((Topic) destination);
+ expect(session.createConsumer(anyObject(Destination.class))).andReturn(consumer);
+ consumer.setMessageListener(listener);
+
+ PowerMock.replayAll();
+
+ listener.receive();
+
+ PowerMock.verifyAll();
+ }
+
+ @Test
+ public void listener_exception() throws JMSException {
+ MQConsumer.AlarmMqMessageListener listener = mqConsumer.new AlarmMqMessageListener();
+
+ Connection connection = PowerMock.createMock(Connection.class);
+ Session session = PowerMock.createMock(Session.class);
+ Destination destination = PowerMock.createMock(Topic.class);
+ MessageConsumer consumer = PowerMock.createMock(MessageConsumer.class);
+
+ Whitebox.setInternalState(listener, "connection", connection);
+ Whitebox.setInternalState(listener, "session", session);
+ Whitebox.setInternalState(listener, "destination", destination);
+ Whitebox.setInternalState(listener, "consumer", consumer);
+
+ PowerMock.reset();
+
+ expect(connectionFactory.createConnection()).andReturn(connection);
+ connection.start();
+ expect(connection.createSession(anyBoolean(), anyInt())).andReturn(session);
+ expect(session.createTopic(anyObject(String.class))).andReturn((Topic) destination);
+ expect(session.createConsumer(anyObject(Destination.class))).andReturn(consumer);
+ consumer.setMessageListener(listener);
+ EasyMock.expectLastCall().andThrow(new JMSException(""));
+
+ consumer.close();
+ session.close();
+ connection.close();
+
+ PowerMock.replayAll();
+
+ listener.receive();
+
+ PowerMock.verifyAll();
+ }
+
+ @Test
+ public void listener_close_exception() throws JMSException {
+ MQConsumer.AlarmMqMessageListener listener = mqConsumer.new AlarmMqMessageListener();
+
+ Connection connection = PowerMock.createMock(Connection.class);
+ Session session = PowerMock.createMock(Session.class);
+ Destination destination = PowerMock.createMock(Topic.class);
+ MessageConsumer consumer = PowerMock.createMock(MessageConsumer.class);
+
+ Whitebox.setInternalState(listener, "connection", connection);
+ Whitebox.setInternalState(listener, "session", session);
+ Whitebox.setInternalState(listener, "destination", destination);
+ Whitebox.setInternalState(listener, "consumer", consumer);
+
+ PowerMock.reset();
+
+ expect(connectionFactory.createConnection()).andReturn(connection);
+ connection.start();
+ expect(connection.createSession(anyBoolean(), anyInt())).andReturn(session);
+ expect(session.createTopic(anyObject(String.class))).andReturn((Topic) destination);
+ expect(session.createConsumer(anyObject(Destination.class))).andReturn(consumer);
+ consumer.setMessageListener(listener);
+ EasyMock.expectLastCall().andThrow(new JMSException(""));
+
+ consumer.close();
+ EasyMock.expectLastCall().andThrow(new JMSException(""));
+
+ PowerMock.replayAll();
+
+ listener.receive();
+
+ PowerMock.verifyAll();
+ }
+
+ @Test
+ public void listener_on_message() throws JMSException {
+ MQConsumer.AlarmMqMessageListener listener = mqConsumer.new AlarmMqMessageListener();
+ Alarm alarm = new Alarm();
+ alarm.setAlarmKey("alarmKey");
+ ActiveMQObjectMessage objectMessage = new ActiveMQObjectMessage();
+ objectMessage.setObject(alarm);
+
+ listener.onMessage(objectMessage);
+ }
+}
<artifactId>holmes-engine-parent</artifactId>
<version>1.0.0-SNAPSHOT</version>
<packaging>pom</packaging>
- <name>holmes-engine-parent</name>
+ <name>holmes-engine-management</name>
<modules>
<module>engine-d</module>
<module>engine-d-standalone</module>
<artifactId>holmes-actions</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.onap.holmes.dsa</groupId>
+ <artifactId>dmaap-dsa</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<dependency>
<groupId>io.dropwizard</groupId>
<artifactId>dropwizard-core</artifactId>