* ONAP
* ================================================================================
* Copyright (C) 2018-2020 AT&T Intellectual Property. All rights reserved.
+ * Modifications Copyright (C) 2024 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
import static org.assertj.core.api.Assertions.assertThatCode;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.contains;
import static org.mockito.Mockito.doThrow;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
import org.onap.policy.common.endpoints.event.comm.TopicListener;
import org.onap.policy.drools.pooling.state.State;
import org.onap.policy.drools.system.PolicyController;
-public class PoolingManagerImplTest {
+class PoolingManagerImplTest {
protected static final long STD_HEARTBEAT_WAIT_MS = 10;
protected static final long STD_REACTIVATE_WAIT_MS = STD_HEARTBEAT_WAIT_MS + 1;
private static final Object DECODED_EVENT = new Object();
/**
- * Number of dmaap.publish() invocations that should be issued when the manager is
+ * Number of publish() invocations that should be issued when the manager is
* started.
*/
private static final int START_PUB = 1;
private PoolingProperties poolProps;
private ListeningController controller;
- private DmaapManager dmaap;
- private boolean gotDmaap;
+ private TopicMessageManager topicMessageManager;
+ private boolean gotManager;
private ScheduledThreadPoolExecutor sched;
private int schedCount;
private DroolsController drools;
*
* @throws Exception throws exception
*/
- @Before
+ @BeforeEach
public void setUp() throws Exception {
Properties plainProps = new Properties();
ser = new Serializer();
active = new CountDownLatch(1);
- dmaap = mock(DmaapManager.class);
- gotDmaap = false;
+ topicMessageManager = mock(TopicMessageManager.class);
+ gotManager = false;
controller = mock(ListeningController.class);
sched = mock(ScheduledThreadPoolExecutor.class);
schedCount = 0;
}
@Test
- public void testPoolingManagerImpl() throws Exception {
- assertTrue(gotDmaap);
+ void testPoolingManagerImpl() {
+ assertTrue(gotManager);
State st = mgr.getCurrent();
- assertTrue(st instanceof IdleState);
+ assertInstanceOf(IdleState.class, st);
// ensure the state is attached to the manager
assertEquals(mgr.getHost(), st.getHost());
}
@Test
- public void testPoolingManagerImpl_PoolEx() throws PoolingFeatureException {
- // throw an exception when we try to create the dmaap manager
+ void testPoolingManagerImpl_PoolEx() {
+ // throw an exception when we try to create the topic messages manager
PoolingFeatureException ex = new PoolingFeatureException();
assertThatThrownBy(() -> new PoolingManagerTest(MY_HOST, controller, poolProps, active) {
@Override
- protected DmaapManager makeDmaapManager(String topic) throws PoolingFeatureException {
+ protected TopicMessageManager makeTopicMessagesManager(String topic) throws PoolingFeatureException {
throw ex;
}
}).isInstanceOf(PoolingFeatureRtException.class).hasCause(ex);
}
@Test
- public void testGetCurrent() throws Exception {
+ void testGetCurrent() throws Exception {
assertEquals(IdleState.class, mgr.getCurrent().getClass());
startMgr();
}
@Test
- public void testGetHost() {
+ void testGetHost() {
assertEquals(MY_HOST, mgr.getHost());
mgr = new PoolingManagerTest(HOST2, controller, poolProps, active);
}
@Test
- public void testGetTopic() {
+ void testGetTopic() {
assertEquals(MY_TOPIC, mgr.getTopic());
}
@Test
- public void testGetProperties() {
+ void testGetProperties() {
assertEquals(poolProps, mgr.getProperties());
}
@Test
- public void testBeforeStart() throws Exception {
+ void testBeforeStart() {
// not running yet
mgr.beforeStart();
- verify(dmaap).startPublisher();
+ verify(topicMessageManager).startPublisher();
assertEquals(1, schedCount);
verify(sched).setMaximumPoolSize(1);
// try again - nothing should happen
mgr.beforeStart();
- verify(dmaap).startPublisher();
+ verify(topicMessageManager).startPublisher();
assertEquals(1, schedCount);
verify(sched).setMaximumPoolSize(1);
}
@Test
- public void testAfterStart() throws Exception {
+ void testAfterStart() throws Exception {
startMgr();
- verify(dmaap).startConsumer(mgr);
+ verify(topicMessageManager).startConsumer(mgr);
State st = mgr.getCurrent();
- assertTrue(st instanceof StartState);
+ assertInstanceOf(StartState.class, st);
// ensure the state is attached to the manager
assertEquals(mgr.getHost(), st.getHost());
// already started - nothing else happens
mgr.afterStart();
- verify(dmaap).startConsumer(mgr);
+ verify(topicMessageManager).startConsumer(mgr);
- assertTrue(mgr.getCurrent() instanceof StartState);
+ assertInstanceOf(StartState.class, mgr.getCurrent());
verify(sched).schedule(any(Runnable.class), any(Long.class), any(TimeUnit.class));
}
@Test
- public void testBeforeStop() throws Exception {
+ void testBeforeStop() throws Exception {
startMgr();
mgr.startDistributing(makeAssignments(false));
- verify(dmaap, times(START_PUB)).publish(any());
+ verify(topicMessageManager, times(START_PUB)).publish(any());
mgr.beforeStop();
- verify(dmaap).stopConsumer(mgr);
+ verify(topicMessageManager).stopConsumer(mgr);
verify(sched).shutdownNow();
- verify(dmaap, times(START_PUB + 1)).publish(any());
- verify(dmaap).publish(contains("offline"));
+ verify(topicMessageManager, times(START_PUB + 1)).publish(any());
+ verify(topicMessageManager).publish(contains("offline"));
- assertTrue(mgr.getCurrent() instanceof IdleState);
+ assertInstanceOf(IdleState.class, mgr.getCurrent());
// verify that next message is handled locally
assertFalse(mgr.beforeInsert(TOPIC2, DECODED_EVENT));
- verify(dmaap, times(START_PUB + 1)).publish(any());
+ verify(topicMessageManager, times(START_PUB + 1)).publish(any());
}
@Test
- public void testBeforeStop_NotRunning() throws Exception {
+ void testBeforeStop_NotRunning() {
final State st = mgr.getCurrent();
mgr.beforeStop();
- verify(dmaap, never()).stopConsumer(any());
+ verify(topicMessageManager, never()).stopConsumer(any());
verify(sched, never()).shutdownNow();
// hasn't changed states either
}
@Test
- public void testBeforeStop_AfterPartialStart() throws Exception {
+ void testBeforeStop_AfterPartialStart() {
// call beforeStart but not afterStart
mgr.beforeStart();
// should still shut the scheduler down
verify(sched).shutdownNow();
- verify(dmaap, never()).stopConsumer(any());
+ verify(topicMessageManager, never()).stopConsumer(any());
// hasn't changed states
assertEquals(st, mgr.getCurrent());
}
@Test
- public void testAfterStop() throws Exception {
+ void testAfterStop() throws Exception {
startMgr();
mgr.beforeStop();
mgr.afterStop();
- verify(dmaap).stopPublisher(STD_OFFLINE_PUB_WAIT_MS);
+ verify(topicMessageManager).stopPublisher(STD_OFFLINE_PUB_WAIT_MS);
}
@Test
- public void testBeforeLock() throws Exception {
+ void testBeforeLock() throws Exception {
startMgr();
mgr.beforeLock();
- assertTrue(mgr.getCurrent() instanceof IdleState);
+ assertInstanceOf(IdleState.class, mgr.getCurrent());
}
@Test
- public void testAfterUnlock_AliveIdle() throws Exception {
+ void testAfterUnlock_AliveIdle() {
// this really shouldn't happen
lockMgr();
mgr.afterUnlock();
// stays in idle state, because it has no scheduler
- assertTrue(mgr.getCurrent() instanceof IdleState);
+ assertInstanceOf(IdleState.class, mgr.getCurrent());
}
@Test
- public void testAfterUnlock_AliveStarted() throws Exception {
+ void testAfterUnlock_AliveStarted() throws Exception {
startMgr();
lockMgr();
mgr.afterUnlock();
- assertTrue(mgr.getCurrent() instanceof StartState);
+ assertInstanceOf(StartState.class, mgr.getCurrent());
}
@Test
- public void testAfterUnlock_StoppedIdle() throws Exception {
+ void testAfterUnlock_StoppedIdle() throws Exception {
startMgr();
lockMgr();
mgr.afterUnlock();
- assertTrue(mgr.getCurrent() instanceof IdleState);
+ assertInstanceOf(IdleState.class, mgr.getCurrent());
}
@Test
- public void testAfterUnlock_StoppedStarted() throws Exception {
+ void testAfterUnlock_StoppedStarted() throws Exception {
startMgr();
// Note: don't lockMgr()
mgr.afterUnlock();
- assertTrue(mgr.getCurrent() instanceof StartState);
+ assertInstanceOf(StartState.class, mgr.getCurrent());
}
@Test
- public void testChangeState() throws Exception {
+ void testChangeState() throws Exception {
// start should invoke changeState()
startMgr();
}
@Test
- public void testSchedule() throws Exception {
+ void testSchedule() throws Exception {
// must start the scheduler
startMgr();
}
@Test
- public void testScheduleWithFixedDelay() throws Exception {
+ void testScheduleWithFixedDelay() throws Exception {
// must start the scheduler
startMgr();
}
@Test
- public void testPublishAdmin() throws Exception {
+ void testPublishAdmin() throws Exception {
Offline msg = new Offline(mgr.getHost());
mgr.publishAdmin(msg);
assertEquals(Message.ADMIN, msg.getChannel());
- verify(dmaap).publish(any());
+ verify(topicMessageManager).publish(any());
}
@Test
- public void testPublish() throws Exception {
+ void testPublish() throws Exception {
Offline msg = new Offline(mgr.getHost());
mgr.publish("my.channel", msg);
assertEquals("my.channel", msg.getChannel());
- verify(dmaap).publish(any());
+ verify(topicMessageManager).publish(any());
}
@Test
- public void testPublish_InvalidMsg() throws Exception {
+ void testPublish_InvalidMsg() throws Exception {
// message is missing data
mgr.publish(Message.ADMIN, new Offline());
// should not have attempted to publish it
- verify(dmaap, never()).publish(any());
+ verify(topicMessageManager, never()).publish(any());
}
@Test
- public void testPublish_DmaapEx() throws Exception {
+ void testPublish_TopicMessageMngEx() throws Exception {
// generate exception
- doThrow(new PoolingFeatureException()).when(dmaap).publish(any());
+ doThrow(new PoolingFeatureException()).when(topicMessageManager).publish(any());
assertThatCode(() -> mgr.publish(Message.ADMIN, new Offline(mgr.getHost()))).doesNotThrowAnyException();
}
@Test
- public void testOnTopicEvent() throws Exception {
+ void testOnTopicEvent() throws Exception {
startMgr();
StartState st = (StartState) mgr.getCurrent();
mgr.onTopicEvent(CommInfrastructure.UEB, MY_TOPIC, msg);
- assertTrue(mgr.getCurrent() instanceof QueryState);
+ assertInstanceOf(QueryState.class, mgr.getCurrent());
}
@Test
- public void testOnTopicEvent_NullEvent() throws Exception {
+ void testOnTopicEvent_NullEvent() throws Exception {
startMgr();
assertThatCode(() -> mgr.onTopicEvent(CommInfrastructure.UEB, TOPIC2, null)).doesNotThrowAnyException();
}
@Test
- public void testBeforeOffer_Unlocked() throws Exception {
+ void testBeforeOffer_Unlocked() throws Exception {
startMgr();
// route the message to another host
}
@Test
- public void testBeforeOffer_Locked() throws Exception {
+ void testBeforeOffer_Locked() throws Exception {
startMgr();
lockMgr();
}
@Test
- public void testBeforeInsert() throws Exception {
+ void testBeforeInsert() throws Exception {
startMgr();
lockMgr();
}
@Test
- public void testHandleExternalCommInfrastructureStringStringString_NullReqId() throws Exception {
+ void testHandleExternalCommInfrastructureStringStringString_NullReqId() throws Exception {
validateHandleReqId(null);
}
@Test
- public void testHandleExternalCommInfrastructureStringStringString_EmptyReqId() throws Exception {
+ void testHandleExternalCommInfrastructureStringStringString_EmptyReqId() throws Exception {
validateHandleReqId("");
}
@Test
- public void testHandleExternalCommInfrastructureStringStringString_InvalidMsg() throws Exception {
+ void testHandleExternalCommInfrastructureStringStringString_InvalidMsg() throws Exception {
startMgr();
assertFalse(mgr.beforeInsert(TOPIC2, "invalid message"));
}
@Test
- public void testHandleExternalCommInfrastructureStringStringString() throws Exception {
+ void testHandleExternalCommInfrastructureStringStringString() throws Exception {
validateUnhandled();
}
@Test
- public void testHandleExternalForward_NoAssignments() throws Exception {
+ void testHandleExternalForward_NoAssignments() throws Exception {
validateUnhandled();
}
@Test
- public void testHandleExternalForward() throws Exception {
+ void testHandleExternalForward() throws Exception {
validateNoForward();
}
@Test
- public void testHandleEvent_NullTarget() throws Exception {
+ void testHandleEvent_NullTarget() throws Exception {
// buckets have null targets
validateDiscarded(new BucketAssignments(new String[] {null, null}));
}
@Test
- public void testHandleEvent_SameHost() throws Exception {
+ void testHandleEvent_SameHost() throws Exception {
validateNoForward();
}
@Test
- public void testHandleEvent_DiffHost() throws Exception {
+ void testHandleEvent_DiffHost() throws Exception {
// route the message to the *OTHER* host
validateDiscarded(makeAssignments(false));
}
@Test
- public void testDecodeEvent_CannotDecode() throws Exception {
+ void testDecodeEvent_CannotDecode() throws Exception {
mgr = new PoolingManagerTest(MY_HOST, controller, poolProps, active) {
@Override
}
@Test
- public void testDecodeEvent_UnsuppEx() throws Exception {
+ void testDecodeEvent_UnsuppEx() throws Exception {
// generate exception
mgr = new PoolingManagerTest(MY_HOST, controller, poolProps, active) {
}
@Test
- public void testDecodeEvent_ArgEx() throws Exception {
+ void testDecodeEvent_ArgEx() throws Exception {
// generate exception
mgr = new PoolingManagerTest(MY_HOST, controller, poolProps, active) {
@Override
}
@Test
- public void testDecodeEvent_StateEx() throws Exception {
+ void testDecodeEvent_StateEx() throws Exception {
// generate exception
mgr = new PoolingManagerTest(MY_HOST, controller, poolProps, active) {
@Override
}
@Test
- public void testDecodeEvent() throws Exception {
+ void testDecodeEvent() throws Exception {
startMgr();
when(controller.isLocked()).thenReturn(true);
}
@Test
- public void testHandleInternal() throws Exception {
+ void testHandleInternal() throws Exception {
startMgr();
StartState st = (StartState) mgr.getCurrent();
mgr.onTopicEvent(CommInfrastructure.UEB, MY_TOPIC, msg);
- assertTrue(mgr.getCurrent() instanceof QueryState);
+ assertInstanceOf(QueryState.class, mgr.getCurrent());
}
@Test
- public void testHandleInternal_IoEx() throws Exception {
+ void testHandleInternal_IoEx() throws Exception {
startMgr();
mgr.onTopicEvent(CommInfrastructure.UEB, MY_TOPIC, "invalid message");
- assertTrue(mgr.getCurrent() instanceof StartState);
+ assertInstanceOf(StartState.class, mgr.getCurrent());
}
@Test
- public void testHandleInternal_PoolEx() throws Exception {
+ void testHandleInternal_PoolEx() throws Exception {
startMgr();
StartState st = (StartState) mgr.getCurrent();
mgr.onTopicEvent(CommInfrastructure.UEB, MY_TOPIC, msg);
- assertTrue(mgr.getCurrent() instanceof StartState);
+ assertInstanceOf(StartState.class, mgr.getCurrent());
}
@Test
- public void testStartDistributing() throws Exception {
+ void testStartDistributing() throws Exception {
validateNoForward();
// null assignments should cause message to be processed locally
mgr.startDistributing(null);
assertFalse(mgr.beforeInsert(TOPIC2, DECODED_EVENT));
- verify(dmaap, times(START_PUB)).publish(any());
+ verify(topicMessageManager, times(START_PUB)).publish(any());
// message for this host
}
@Test
- public void testGoStart() {
+ void testGoStart() {
State st = mgr.goStart();
- assertTrue(st instanceof StartState);
+ assertInstanceOf(StartState.class, st);
assertEquals(mgr.getHost(), st.getHost());
}
@Test
- public void testGoQuery() {
+ void testGoQuery() {
BucketAssignments asgn = new BucketAssignments(new String[] {HOST2});
mgr.startDistributing(asgn);
State st = mgr.goQuery();
- assertTrue(st instanceof QueryState);
+ assertInstanceOf(QueryState.class, st);
assertEquals(mgr.getHost(), st.getHost());
assertEquals(asgn, mgr.getAssignments());
}
@Test
- public void testGoActive() {
+ void testGoActive() {
BucketAssignments asgn = new BucketAssignments(new String[] {HOST2});
mgr.startDistributing(asgn);
State st = mgr.goActive();
- assertTrue(st instanceof ActiveState);
+ assertInstanceOf(ActiveState.class, st);
assertEquals(mgr.getHost(), st.getHost());
assertEquals(asgn, mgr.getAssignments());
assertEquals(0, active.getCount());
}
@Test
- public void testGoInactive() {
+ void testGoInactive() {
State st = mgr.goInactive();
- assertTrue(st instanceof InactiveState);
+ assertInstanceOf(InactiveState.class, st);
assertEquals(mgr.getHost(), st.getHost());
assertEquals(1, active.getCount());
}
@Test
- public void testTimerActionRun() throws Exception {
+ void testTimerActionRun() throws Exception {
// must start the scheduler
startMgr();
}
@Test
- public void testTimerActionRun_DiffState() throws Exception {
+ void testTimerActionRun_DiffState() throws Exception {
// must start the scheduler
startMgr();
mgr.onTopicEvent(CommInfrastructure.UEB, MY_TOPIC, msg);
- assertTrue(mgr.getCurrent() instanceof QueryState);
+ assertInstanceOf(QueryState.class, mgr.getCurrent());
// execute it
taskCap.getValue().run();
assertFalse(mgr.beforeInsert(TOPIC2, DECODED_EVENT));
- verify(dmaap, times(START_PUB)).publish(any());
+ verify(topicMessageManager, times(START_PUB)).publish(any());
}
private void validateUnhandled() throws PoolingFeatureException {
/**
* Makes an assignment with two buckets.
*
- * @param sameHost {@code true} if the {@link #REQUEST_ID} should hash to the
+ * @param sameHost {@code true} if the REQUEST_ID should hash to the
* manager's bucket, {@code false} if it should hash to the other host's bucket
* @return a new bucket assignment
*/
/**
* Invokes methods necessary to start the manager.
*
- * @throws PoolingFeatureException if an error occurs
*/
- private void startMgr() throws PoolingFeatureException {
+ private void startMgr() {
mgr.beforeStart();
mgr.afterStart();
}
}
@Override
- protected DmaapManager makeDmaapManager(String topic) throws PoolingFeatureException {
- gotDmaap = true;
- return dmaap;
+ protected TopicMessageManager makeTopicMessagesManager(String topic) throws PoolingFeatureException {
+ gotManager = true;
+ return topicMessageManager;
}
@Override