From: tang peng Date: Thu, 14 Sep 2017 04:43:01 +0000 (+0000) Subject: Merge "Change the Version of Lombok to 1.16.8" X-Git-Tag: v1.0.0~39 X-Git-Url: https://gerrit.onap.org/r/gitweb?a=commitdiff_plain;h=c196f6fc8b716f76072e0acaa0324e18f687a660;hp=4f6224aef34b58b624d087db68619ca85797c777;p=holmes%2Fengine-management.git Merge "Change the Version of Lombok to 1.16.8" --- diff --git a/engine-d/pom.xml b/engine-d/pom.xml index bc8cb20..a5f85dc 100644 --- a/engine-d/pom.xml +++ b/engine-d/pom.xml @@ -42,6 +42,10 @@ org.reflections reflections + + org.onap.holmes.dsa + dmaap-dsa + org.onap.holmes.common holmes-actions diff --git a/engine-d/src/main/java/org/onap/holmes/engine/dmaappolling/DMaaPPollingRequest.java b/engine-d/src/main/java/org/onap/holmes/engine/dmaappolling/DMaaPPollingRequest.java new file mode 100644 index 0000000..24e0817 --- /dev/null +++ b/engine-d/src/main/java/org/onap/holmes/engine/dmaappolling/DMaaPPollingRequest.java @@ -0,0 +1,47 @@ +/* + * Copyright 2017 ZTE Corporation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onap.holmes.engine.dmaappolling; + +import java.util.ArrayList; +import java.util.List; +import lombok.extern.slf4j.Slf4j; +import org.onap.holmes.common.api.stat.VesAlarm; +import org.onap.holmes.common.exception.CorrelationException; +import org.onap.holmes.dsa.dmaappolling.Subscriber; +import org.onap.holmes.engine.manager.DroolsEngine; + +@Slf4j +public class DMaaPPollingRequest implements Runnable { + + private Subscriber subscriber; + + private DroolsEngine droolsEngine; + + public DMaaPPollingRequest(Subscriber subscriber, DroolsEngine droolsEngine) { + this.subscriber = subscriber; + this.droolsEngine = droolsEngine; + } + + public void run() { + List vesAlarmList = new ArrayList<>(); + try { + vesAlarmList = subscriber.subscribe(); + } catch (CorrelationException e) { + log.error("Failed polling request alarm." + e.getMessage()); + } + vesAlarmList.forEach(vesAlarm -> droolsEngine.putRaisedIntoStream(vesAlarm)); + } +} diff --git a/engine-d/src/main/java/org/onap/holmes/engine/dmaappolling/SubscriberAction.java b/engine-d/src/main/java/org/onap/holmes/engine/dmaappolling/SubscriberAction.java new file mode 100644 index 0000000..1e71899 --- /dev/null +++ b/engine-d/src/main/java/org/onap/holmes/engine/dmaappolling/SubscriberAction.java @@ -0,0 +1,51 @@ +/* + * Copyright 2017 ZTE Corporation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onap.holmes.engine.dmaappolling; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import javax.inject.Inject; +import org.jvnet.hk2.annotations.Service; +import org.onap.holmes.dsa.dmaappolling.Subscriber; +import org.onap.holmes.engine.manager.DroolsEngine; + +@Service +public class SubscriberAction { + + @Inject + private DroolsEngine droolsEngine; + + private ConcurrentHashMap pollingRequests = new ConcurrentHashMap(); + private ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor(); + + public void addSubscriber(Subscriber subscriber) { + DMaaPPollingRequest pollingTask = new DMaaPPollingRequest(subscriber, droolsEngine); + ScheduledFuture future = service + .scheduleAtFixedRate(pollingTask, 0, subscriber.getPeriod(), TimeUnit.MILLISECONDS); + pollingRequests.put(subscriber.getTopic(), future); + } + + public void removeSubscriber(Subscriber subscriber) { + ScheduledFuture future = pollingRequests.get(subscriber.getTopic()); + if (future != null) { + future.cancel(true); + } + pollingRequests.remove(subscriber.getTopic()); + } +} diff --git a/engine-d/src/main/java/org/onap/holmes/engine/manager/DroolsEngine.java b/engine-d/src/main/java/org/onap/holmes/engine/manager/DroolsEngine.java index 5d1f442..b23dde0 100644 --- a/engine-d/src/main/java/org/onap/holmes/engine/manager/DroolsEngine.java +++ b/engine-d/src/main/java/org/onap/holmes/engine/manager/DroolsEngine.java @@ -16,7 +16,6 @@ package org.onap.holmes.engine.manager; -import java.io.Serializable; import java.io.StringReader; import java.util.HashSet; import java.util.List; @@ -24,17 +23,7 @@ import java.util.Locale; import java.util.Set; import javax.annotation.PostConstruct; import javax.inject.Inject; -import javax.jms.Connection; -import javax.jms.ConnectionFactory; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.Session; import lombok.extern.slf4j.Slf4j; -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.command.ActiveMQObjectMessage; import org.drools.KnowledgeBase; import org.drools.KnowledgeBaseConfiguration; import org.drools.KnowledgeBaseFactory; @@ -47,13 +36,10 @@ import org.drools.io.Resource; import org.drools.io.ResourceFactory; import org.drools.runtime.StatefulKnowledgeSession; import org.drools.runtime.rule.FactHandle; -import org.glassfish.hk2.api.IterableProvider; import org.jvnet.hk2.annotations.Service; +import org.onap.holmes.common.api.stat.VesAlarm; import org.onap.holmes.engine.request.DeployRuleRequest; import org.onap.holmes.common.api.entity.CorrelationRule; -import org.onap.holmes.common.api.stat.Alarm; -import org.onap.holmes.common.config.MQConfig; -import org.onap.holmes.common.constant.AlarmConst; import org.onap.holmes.common.exception.CorrelationException; import org.onap.holmes.common.utils.ExceptionUtil; import org.onap.holmes.engine.wrapper.RuleMgtWrapper; @@ -69,34 +55,18 @@ public class DroolsEngine { private KnowledgeBase kbase; private KnowledgeBaseConfiguration kconf; private StatefulKnowledgeSession ksession; - @Inject - private IterableProvider mqConfigProvider; - private ConnectionFactory connectionFactory; @PostConstruct private void init() { try { - // 1. start engine + // start engine start(); - // 2. start mq listener - registerAlarmTopicListener(); } catch (Exception e) { log.error("Failed to start the service: " + e.getMessage(), e); throw ExceptionUtil.buildExceptionResponse("Failed to start the drools engine!"); } } - private void registerAlarmTopicListener() { - String brokerURL = - "tcp://" + mqConfigProvider.get().brokerIp + ":" + mqConfigProvider.get().brokerPort; - connectionFactory = new ActiveMQConnectionFactory(mqConfigProvider.get().brokerUsername, - mqConfigProvider.get().brokerPassword, brokerURL); - - AlarmMqMessageListener listener = new AlarmMqMessageListener(); - listener.receive(); - } - - private void start() throws CorrelationException { log.info("Drools Engine Initialize Beginning..."); @@ -219,7 +189,7 @@ public class DroolsEngine { } } - public void putRaisedIntoStream(Alarm raiseAlarm) { + public void putRaisedIntoStream(VesAlarm raiseAlarm) { FactHandle factHandle = this.ksession.getFactHandle(raiseAlarm); if (factHandle != null) { this.ksession.retract(factHandle); @@ -228,59 +198,4 @@ public class DroolsEngine { this.ksession.fireAllRules(); } - class AlarmMqMessageListener implements MessageListener { - - private Connection connection = null; - private Session session = null; - private Destination destination = null; - private MessageConsumer consumer = null; - - private void initialize() throws JMSException { - connection = connectionFactory.createConnection(); - session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - destination = session.createTopic(AlarmConst.MQ_TOPIC_NAME_ALARM); - consumer = session.createConsumer(destination); - connection.start(); - } - - public void receive() { - try { - initialize(); - consumer.setMessageListener(this); - } catch (JMSException e) { - log.error("Failed to connect to the MQ service : " + e.getMessage(), e); - try { - close(); - } catch (JMSException e1) { - log.error("Failed close connection " + e1.getMessage(), e1); - } - } - } - - public void onMessage(Message arg0) { - ActiveMQObjectMessage objectMessage = (ActiveMQObjectMessage) arg0; - try { - Serializable object = objectMessage.getObject(); - - if (object instanceof Alarm) { - Alarm alarm = (Alarm) object; - putRaisedIntoStream(alarm); - } - } catch (JMSException e) { - log.error("Failed get object : " + e.getMessage(), e); - } - } - - private void close() throws JMSException { - if (consumer != null) { - consumer.close(); - } - if (session != null) { - session.close(); - } - if (connection != null) { - connection.close(); - } - } - } } diff --git a/engine-d/src/main/java/org/onap/holmes/engine/mqconsumer/MQConsumer.java b/engine-d/src/main/java/org/onap/holmes/engine/mqconsumer/MQConsumer.java new file mode 100644 index 0000000..bd77312 --- /dev/null +++ b/engine-d/src/main/java/org/onap/holmes/engine/mqconsumer/MQConsumer.java @@ -0,0 +1,114 @@ +/** + * Copyright 2017 ZTE Corporation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onap.holmes.engine.mqconsumer; + +import java.io.Serializable; +import javax.inject.Inject; +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.Session; +import lombok.NoArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.command.ActiveMQObjectMessage; +import org.glassfish.hk2.api.IterableProvider; +import org.jvnet.hk2.annotations.Service; +import org.onap.holmes.common.api.stat.VesAlarm; +import org.onap.holmes.common.config.MQConfig; +import org.onap.holmes.common.constant.AlarmConst; +import org.onap.holmes.engine.manager.DroolsEngine; + +@Service +@Slf4j +@NoArgsConstructor +public class MQConsumer { + + @Inject + private IterableProvider mqConfigProvider; + private ConnectionFactory connectionFactory; + private ConnectionFactory connectionFactory1; + @Inject + private DroolsEngine engine; + + public void registerAlarmTopicListener() { + String brokerURL = + "tcp://" + mqConfigProvider.get().brokerIp + ":" + mqConfigProvider.get().brokerPort; + connectionFactory = new ActiveMQConnectionFactory(mqConfigProvider.get().brokerUsername, + mqConfigProvider.get().brokerPassword, brokerURL); + + AlarmMqMessageListener listener = new AlarmMqMessageListener(); + listener.receive(); + } + class AlarmMqMessageListener implements MessageListener { + + private Connection connection = null; + private Session session = null; + private Destination destination = null; + private MessageConsumer consumer = null; + + private void initialize() throws JMSException { + connection = connectionFactory.createConnection(); + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + destination = session.createTopic(AlarmConst.MQ_TOPIC_NAME_ALARM); + consumer = session.createConsumer(destination); + connection.start(); + } + + public void receive() { + try { + initialize(); + consumer.setMessageListener(this); + } catch (JMSException e) { + log.error("Failed to connect to the MQ service : " + e.getMessage(), e); + try { + close(); + } catch (JMSException e1) { + log.error("Failed close connection " + e1.getMessage(), e1); + } + } + } + + public void onMessage(Message arg0) { + ActiveMQObjectMessage objectMessage = (ActiveMQObjectMessage) arg0; + try { + Serializable object = objectMessage.getObject(); + if (object instanceof VesAlarm) { + VesAlarm vesAlarm = (VesAlarm) object; + engine.putRaisedIntoStream(vesAlarm); + } + } catch (JMSException e) { + log.error("Failed get object : " + e.getMessage(), e); + } + } + + private void close() throws JMSException { + if (consumer != null) { + consumer.close(); + } + if (session != null) { + session.close(); + } + if (connection != null) { + connection.close(); + } + } + } +} diff --git a/engine-d/src/test/java/org/onap/holmes/engine/manager/DroolsEngineTest.java b/engine-d/src/test/java/org/onap/holmes/engine/manager/DroolsEngineTest.java index 5487177..885b5e4 100644 --- a/engine-d/src/test/java/org/onap/holmes/engine/manager/DroolsEngineTest.java +++ b/engine-d/src/test/java/org/onap/holmes/engine/manager/DroolsEngineTest.java @@ -16,38 +16,25 @@ package org.onap.holmes.engine.manager; -import static org.easymock.EasyMock.anyBoolean; import static org.easymock.EasyMock.anyInt; -import static org.easymock.EasyMock.anyObject; import static org.easymock.EasyMock.expect; import java.lang.reflect.Method; import java.util.ArrayList; import java.util.List; import java.util.Locale; -import javax.jms.Connection; -import javax.jms.ConnectionFactory; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.MessageConsumer; -import javax.jms.Session; -import javax.jms.Topic; -import org.apache.activemq.command.ActiveMQObjectMessage; import org.drools.KnowledgeBase; import org.drools.KnowledgeBaseConfiguration; import org.drools.KnowledgeBaseFactory; import org.drools.conf.EventProcessingOption; import org.drools.runtime.StatefulKnowledgeSession; -import org.easymock.EasyMock; -import org.glassfish.hk2.api.IterableProvider; import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; +import org.onap.holmes.common.api.stat.VesAlarm; import org.onap.holmes.engine.request.DeployRuleRequest; import org.onap.holmes.common.api.entity.CorrelationRule; -import org.onap.holmes.common.api.stat.Alarm; -import org.onap.holmes.common.config.MQConfig; import org.onap.holmes.common.constant.AlarmConst; import org.onap.holmes.common.exception.CorrelationException; import org.onap.holmes.engine.wrapper.RuleMgtWrapper; @@ -71,9 +58,6 @@ public class DroolsEngineTest { private StatefulKnowledgeSession ksession; - private IterableProvider mqConfigProvider; - - private ConnectionFactory connectionFactory; private DroolsEngine droolsEngine; @@ -88,32 +72,24 @@ public class DroolsEngineTest { this.ksession = kbase.newStatefulKnowledgeSession(); ruleMgtWrapper = PowerMock.createMock(RuleMgtWrapper.class); - mqConfigProvider = PowerMock.createMock(IterableProvider.class); - connectionFactory = PowerMock.createMock(ConnectionFactory.class); Whitebox.setInternalState(droolsEngine, "ruleMgtWrapper", ruleMgtWrapper); - Whitebox.setInternalState(droolsEngine, "mqConfigProvider", mqConfigProvider); + Whitebox.setInternalState(droolsEngine, "kconf", kconf); Whitebox.setInternalState(droolsEngine, "kbase", kbase); Whitebox.setInternalState(droolsEngine, "ksession", ksession); - Whitebox.setInternalState(droolsEngine, "connectionFactory", connectionFactory); PowerMock.resetAll(); } @Test public void init() throws Exception { - MQConfig mqConfig = new MQConfig(); - mqConfig.brokerIp = "127.0.0.1"; - mqConfig.brokerPort = 4567; - mqConfig.brokerUsername = "admin"; - mqConfig.brokerPassword = "admin"; + List rules = new ArrayList(); CorrelationRule rule = new CorrelationRule(); rule.setContent("content"); rules.add(rule); - expect(mqConfigProvider.get()).andReturn(mqConfig).anyTimes(); expect(ruleMgtWrapper.queryRuleByEnable(anyInt())).andReturn(rules); PowerMock.replayAll(); @@ -189,128 +165,19 @@ public class DroolsEngineTest { droolsEngine.compileRule(content, locale); } - @Test public void putRaisedIntoStream_facthandle_is_null() { - Alarm raiseAlarm = new Alarm(); + VesAlarm raiseAlarm = new VesAlarm(); + raiseAlarm.setVersion((long) 245235); droolsEngine.putRaisedIntoStream(raiseAlarm); droolsEngine.putRaisedIntoStream(raiseAlarm); } @Test public void putRaisedIntoStream_factHandle_is_not_null() { - droolsEngine.putRaisedIntoStream(new Alarm()); - } - - - @Test - public void listener_receive() throws JMSException { - DroolsEngine.AlarmMqMessageListener listener = droolsEngine.new AlarmMqMessageListener(); - - Connection connection = PowerMock.createMock(Connection.class); - Session session = PowerMock.createMock(Session.class); - Destination destination = PowerMock.createMock(Topic.class); - MessageConsumer consumer = PowerMock.createMock(MessageConsumer.class); - - Whitebox.setInternalState(listener, "connection", connection); - Whitebox.setInternalState(listener, "session", session); - Whitebox.setInternalState(listener, "destination", destination); - Whitebox.setInternalState(listener, "consumer", consumer); - - PowerMock.reset(); - - expect(connectionFactory.createConnection()).andReturn(connection); - connection.start(); - expect(connection.createSession(anyBoolean(), anyInt())).andReturn(session); - expect(session.createTopic(anyObject(String.class))).andReturn((Topic) destination); - expect(session.createConsumer(anyObject(Destination.class))).andReturn(consumer); - consumer.setMessageListener(listener); - - PowerMock.replayAll(); - - listener.receive(); - - PowerMock.verifyAll(); - } - - @Test - public void listener_exception() throws JMSException { - DroolsEngine.AlarmMqMessageListener listener = droolsEngine.new AlarmMqMessageListener(); - - Connection connection = PowerMock.createMock(Connection.class); - Session session = PowerMock.createMock(Session.class); - Destination destination = PowerMock.createMock(Topic.class); - MessageConsumer consumer = PowerMock.createMock(MessageConsumer.class); - - Whitebox.setInternalState(listener, "connection", connection); - Whitebox.setInternalState(listener, "session", session); - Whitebox.setInternalState(listener, "destination", destination); - Whitebox.setInternalState(listener, "consumer", consumer); - - PowerMock.reset(); - - expect(connectionFactory.createConnection()).andReturn(connection); - connection.start(); - expect(connection.createSession(anyBoolean(), anyInt())).andReturn(session); - expect(session.createTopic(anyObject(String.class))).andReturn((Topic) destination); - expect(session.createConsumer(anyObject(Destination.class))).andReturn(consumer); - consumer.setMessageListener(listener); - EasyMock.expectLastCall().andThrow(new JMSException("")); - - consumer.close(); - session.close(); - connection.close(); - - PowerMock.replayAll(); - - listener.receive(); - - PowerMock.verifyAll(); - } - - @Test - public void listener_close_exception() throws JMSException { - DroolsEngine.AlarmMqMessageListener listener = droolsEngine.new AlarmMqMessageListener(); - - Connection connection = PowerMock.createMock(Connection.class); - Session session = PowerMock.createMock(Session.class); - Destination destination = PowerMock.createMock(Topic.class); - MessageConsumer consumer = PowerMock.createMock(MessageConsumer.class); - - Whitebox.setInternalState(listener, "connection", connection); - Whitebox.setInternalState(listener, "session", session); - Whitebox.setInternalState(listener, "destination", destination); - Whitebox.setInternalState(listener, "consumer", consumer); - - PowerMock.reset(); - - expect(connectionFactory.createConnection()).andReturn(connection); - connection.start(); - expect(connection.createSession(anyBoolean(), anyInt())).andReturn(session); - expect(session.createTopic(anyObject(String.class))).andReturn((Topic) destination); - expect(session.createConsumer(anyObject(Destination.class))).andReturn(consumer); - consumer.setMessageListener(listener); - EasyMock.expectLastCall().andThrow(new JMSException("")); - - consumer.close(); - EasyMock.expectLastCall().andThrow(new JMSException("")); - - PowerMock.replayAll(); - - listener.receive(); - - PowerMock.verifyAll(); - } - - @Test - public void listener_on_message() throws JMSException { - DroolsEngine.AlarmMqMessageListener listener = droolsEngine.new AlarmMqMessageListener(); - Alarm alarm = new Alarm(); - alarm.setAlarmKey("alarmKey"); - ActiveMQObjectMessage objectMessage = new ActiveMQObjectMessage(); - objectMessage.setObject(alarm); - - listener.onMessage(objectMessage); + VesAlarm raiseAlarm = new VesAlarm(); + raiseAlarm.setVersion((long) 245235); + droolsEngine.putRaisedIntoStream(raiseAlarm); } @Test diff --git a/engine-d/src/test/java/org/onap/holmes/engine/mqconsumer/MQConsumerTest.java b/engine-d/src/test/java/org/onap/holmes/engine/mqconsumer/MQConsumerTest.java new file mode 100644 index 0000000..b1ea3cb --- /dev/null +++ b/engine-d/src/test/java/org/onap/holmes/engine/mqconsumer/MQConsumerTest.java @@ -0,0 +1,189 @@ +/** + * Copyright 2017 ZTE Corporation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.holmes.engine.mqconsumer; + +import static org.easymock.EasyMock.anyBoolean; +import static org.easymock.EasyMock.anyInt; +import static org.easymock.EasyMock.anyObject; +import static org.easymock.EasyMock.expect; + + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.MessageConsumer; +import javax.jms.Session; +import javax.jms.Topic; +import org.apache.activemq.command.ActiveMQObjectMessage; +import org.easymock.EasyMock; +import org.glassfish.hk2.api.IterableProvider; +import org.junit.Before; +import org.junit.Test; +import org.onap.holmes.common.api.stat.Alarm; +import org.onap.holmes.common.config.MQConfig; +import org.powermock.api.easymock.PowerMock; +import org.powermock.reflect.Whitebox; + +public class MQConsumerTest { + + private IterableProvider mqConfigProvider; + + private ConnectionFactory connectionFactory; + + private MQConsumer mqConsumer; + + private MQConsumer mqConsumer1; + + private MQConsumer mqConsumer2; + + @Before + public void setUp() { + + mqConsumer = new MQConsumer(); + + mqConfigProvider = PowerMock.createMock(IterableProvider.class); + connectionFactory = PowerMock.createMock(ConnectionFactory.class); + + Whitebox.setInternalState(mqConsumer, "mqConfigProvider", mqConfigProvider); + Whitebox.setInternalState(mqConsumer, "connectionFactory", connectionFactory); + } + + @Test + public void init() throws Exception { + MQConfig mqConfig = new MQConfig(); + mqConfig.brokerIp = "127.0.0.1"; + mqConfig.brokerPort = 4567; + mqConfig.brokerUsername = "admin"; + mqConfig.brokerPassword = "admin"; + + expect(mqConfigProvider.get()).andReturn(mqConfig).anyTimes(); + PowerMock.replayAll(); + + PowerMock.verifyAll(); + } + + @Test + public void listener_receive() throws JMSException { + MQConsumer.AlarmMqMessageListener listener = mqConsumer.new AlarmMqMessageListener(); + + Connection connection = PowerMock.createMock(Connection.class); + Session session = PowerMock.createMock(Session.class); + Destination destination = PowerMock.createMock(Topic.class); + MessageConsumer consumer = PowerMock.createMock(MessageConsumer.class); + + Whitebox.setInternalState(listener, "connection", connection); + Whitebox.setInternalState(listener, "session", session); + Whitebox.setInternalState(listener, "destination", destination); + Whitebox.setInternalState(listener, "consumer", consumer); + + PowerMock.reset(); + + expect(connectionFactory.createConnection()).andReturn(connection); + connection.start(); + expect(connection.createSession(anyBoolean(), anyInt())).andReturn(session); + expect(session.createTopic(anyObject(String.class))).andReturn((Topic) destination); + expect(session.createConsumer(anyObject(Destination.class))).andReturn(consumer); + consumer.setMessageListener(listener); + + PowerMock.replayAll(); + + listener.receive(); + + PowerMock.verifyAll(); + } + + @Test + public void listener_exception() throws JMSException { + MQConsumer.AlarmMqMessageListener listener = mqConsumer.new AlarmMqMessageListener(); + + Connection connection = PowerMock.createMock(Connection.class); + Session session = PowerMock.createMock(Session.class); + Destination destination = PowerMock.createMock(Topic.class); + MessageConsumer consumer = PowerMock.createMock(MessageConsumer.class); + + Whitebox.setInternalState(listener, "connection", connection); + Whitebox.setInternalState(listener, "session", session); + Whitebox.setInternalState(listener, "destination", destination); + Whitebox.setInternalState(listener, "consumer", consumer); + + PowerMock.reset(); + + expect(connectionFactory.createConnection()).andReturn(connection); + connection.start(); + expect(connection.createSession(anyBoolean(), anyInt())).andReturn(session); + expect(session.createTopic(anyObject(String.class))).andReturn((Topic) destination); + expect(session.createConsumer(anyObject(Destination.class))).andReturn(consumer); + consumer.setMessageListener(listener); + EasyMock.expectLastCall().andThrow(new JMSException("")); + + consumer.close(); + session.close(); + connection.close(); + + PowerMock.replayAll(); + + listener.receive(); + + PowerMock.verifyAll(); + } + + @Test + public void listener_close_exception() throws JMSException { + MQConsumer.AlarmMqMessageListener listener = mqConsumer.new AlarmMqMessageListener(); + + Connection connection = PowerMock.createMock(Connection.class); + Session session = PowerMock.createMock(Session.class); + Destination destination = PowerMock.createMock(Topic.class); + MessageConsumer consumer = PowerMock.createMock(MessageConsumer.class); + + Whitebox.setInternalState(listener, "connection", connection); + Whitebox.setInternalState(listener, "session", session); + Whitebox.setInternalState(listener, "destination", destination); + Whitebox.setInternalState(listener, "consumer", consumer); + + PowerMock.reset(); + + expect(connectionFactory.createConnection()).andReturn(connection); + connection.start(); + expect(connection.createSession(anyBoolean(), anyInt())).andReturn(session); + expect(session.createTopic(anyObject(String.class))).andReturn((Topic) destination); + expect(session.createConsumer(anyObject(Destination.class))).andReturn(consumer); + consumer.setMessageListener(listener); + EasyMock.expectLastCall().andThrow(new JMSException("")); + + consumer.close(); + EasyMock.expectLastCall().andThrow(new JMSException("")); + + PowerMock.replayAll(); + + listener.receive(); + + PowerMock.verifyAll(); + } + + @Test + public void listener_on_message() throws JMSException { + MQConsumer.AlarmMqMessageListener listener = mqConsumer.new AlarmMqMessageListener(); + Alarm alarm = new Alarm(); + alarm.setAlarmKey("alarmKey"); + ActiveMQObjectMessage objectMessage = new ActiveMQObjectMessage(); + objectMessage.setObject(alarm); + + listener.onMessage(objectMessage); + } +} diff --git a/pom.xml b/pom.xml index 8dcb2f1..3b517e8 100644 --- a/pom.xml +++ b/pom.xml @@ -28,7 +28,7 @@ holmes-engine-parent 1.0.0-SNAPSHOT pom - holmes-engine-parent + holmes-engine-management engine-d engine-d-standalone @@ -92,6 +92,11 @@ holmes-actions ${project.version} + + org.onap.holmes.dsa + dmaap-dsa + ${project.version} + io.dropwizard dropwizard-core