Removing deprecated DMAAP library
[policy/drools-pdp.git] / feature-pooling-messages / src / test / java / org / onap / policy / drools / pooling / PoolingManagerImplTest.java
@@ -3,6 +3,7 @@
  * ONAP
  * ================================================================================
  * Copyright (C) 2018-2020 AT&T Intellectual Property. All rights reserved.
+ * Modifications Copyright (C) 2024 Nordix Foundation.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -22,9 +23,10 @@ package org.onap.policy.drools.pooling;
 
 import static org.assertj.core.api.Assertions.assertThatCode;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.contains;
 import static org.mockito.Mockito.doThrow;
@@ -41,8 +43,8 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
 import org.mockito.ArgumentCaptor;
 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
 import org.onap.policy.common.endpoints.event.comm.TopicListener;
@@ -59,7 +61,7 @@ import org.onap.policy.drools.pooling.state.StartState;
 import org.onap.policy.drools.pooling.state.State;
 import org.onap.policy.drools.system.PolicyController;
 
-public class PoolingManagerImplTest {
+class PoolingManagerImplTest {
 
     protected static final long STD_HEARTBEAT_WAIT_MS = 10;
     protected static final long STD_REACTIVATE_WAIT_MS = STD_HEARTBEAT_WAIT_MS + 1;
@@ -81,7 +83,7 @@ public class PoolingManagerImplTest {
     private static final Object DECODED_EVENT = new Object();
 
     /**
-     * Number of dmaap.publish() invocations that should be issued when the manager is
+     * Number of publish() invocations that should be issued when the manager is
      * started.
      */
     private static final int START_PUB = 1;
@@ -93,8 +95,8 @@ public class PoolingManagerImplTest {
 
     private PoolingProperties poolProps;
     private ListeningController controller;
-    private DmaapManager dmaap;
-    private boolean gotDmaap;
+    private TopicMessageManager topicMessageManager;
+    private boolean gotManager;
     private ScheduledThreadPoolExecutor sched;
     private int schedCount;
     private DroolsController drools;
@@ -108,7 +110,7 @@ public class PoolingManagerImplTest {
      *
      * @throws Exception throws exception
      */
-    @Before
+    @BeforeEach
     public void setUp() throws Exception {
         Properties plainProps = new Properties();
 
@@ -126,8 +128,8 @@ public class PoolingManagerImplTest {
         ser = new Serializer();
         active = new CountDownLatch(1);
 
-        dmaap = mock(DmaapManager.class);
-        gotDmaap = false;
+        topicMessageManager = mock(TopicMessageManager.class);
+        gotManager = false;
         controller = mock(ListeningController.class);
         sched = mock(ScheduledThreadPoolExecutor.class);
         schedCount = 0;
@@ -156,31 +158,31 @@ public class PoolingManagerImplTest {
     }
 
     @Test
-    public void testPoolingManagerImpl() throws Exception {
-        assertTrue(gotDmaap);
+    void testPoolingManagerImpl() {
+        assertTrue(gotManager);
 
         State st = mgr.getCurrent();
-        assertTrue(st instanceof IdleState);
+        assertInstanceOf(IdleState.class, st);
 
         // ensure the state is attached to the manager
         assertEquals(mgr.getHost(), st.getHost());
     }
 
     @Test
-    public void testPoolingManagerImpl_PoolEx() throws PoolingFeatureException {
-        // throw an exception when we try to create the dmaap manager
+    void testPoolingManagerImpl_PoolEx() {
+        // throw an exception when we try to create the topic messages manager
         PoolingFeatureException ex = new PoolingFeatureException();
 
         assertThatThrownBy(() -> new PoolingManagerTest(MY_HOST, controller, poolProps, active) {
             @Override
-            protected DmaapManager makeDmaapManager(String topic) throws PoolingFeatureException {
+            protected TopicMessageManager makeTopicMessagesManager(String topic) throws PoolingFeatureException {
                 throw ex;
             }
         }).isInstanceOf(PoolingFeatureRtException.class).hasCause(ex);
     }
 
     @Test
-    public void testGetCurrent() throws Exception {
+    void testGetCurrent() throws Exception {
         assertEquals(IdleState.class, mgr.getCurrent().getClass());
 
         startMgr();
@@ -189,7 +191,7 @@ public class PoolingManagerImplTest {
     }
 
     @Test
-    public void testGetHost() {
+    void testGetHost() {
         assertEquals(MY_HOST, mgr.getHost());
 
         mgr = new PoolingManagerTest(HOST2, controller, poolProps, active);
@@ -197,21 +199,21 @@ public class PoolingManagerImplTest {
     }
 
     @Test
-    public void testGetTopic() {
+    void testGetTopic() {
         assertEquals(MY_TOPIC, mgr.getTopic());
     }
 
     @Test
-    public void testGetProperties() {
+    void testGetProperties() {
         assertEquals(poolProps, mgr.getProperties());
     }
 
     @Test
-    public void testBeforeStart() throws Exception {
+    void testBeforeStart() {
         // not running yet
         mgr.beforeStart();
 
-        verify(dmaap).startPublisher();
+        verify(topicMessageManager).startPublisher();
 
         assertEquals(1, schedCount);
         verify(sched).setMaximumPoolSize(1);
@@ -221,7 +223,7 @@ public class PoolingManagerImplTest {
         // try again - nothing should happen
         mgr.beforeStart();
 
-        verify(dmaap).startPublisher();
+        verify(topicMessageManager).startPublisher();
 
         assertEquals(1, schedCount);
         verify(sched).setMaximumPoolSize(1);
@@ -229,13 +231,13 @@ public class PoolingManagerImplTest {
     }
 
     @Test
-    public void testAfterStart() throws Exception {
+    void testAfterStart() throws Exception {
         startMgr();
 
-        verify(dmaap).startConsumer(mgr);
+        verify(topicMessageManager).startConsumer(mgr);
 
         State st = mgr.getCurrent();
-        assertTrue(st instanceof StartState);
+        assertInstanceOf(StartState.class, st);
 
         // ensure the state is attached to the manager
         assertEquals(mgr.getHost(), st.getHost());
@@ -251,41 +253,41 @@ public class PoolingManagerImplTest {
         // already started - nothing else happens
         mgr.afterStart();
 
-        verify(dmaap).startConsumer(mgr);
+        verify(topicMessageManager).startConsumer(mgr);
 
-        assertTrue(mgr.getCurrent() instanceof StartState);
+        assertInstanceOf(StartState.class, mgr.getCurrent());
 
         verify(sched).schedule(any(Runnable.class), any(Long.class), any(TimeUnit.class));
     }
 
     @Test
-    public void testBeforeStop() throws Exception {
+    void testBeforeStop() throws Exception {
         startMgr();
         mgr.startDistributing(makeAssignments(false));
 
-        verify(dmaap, times(START_PUB)).publish(any());
+        verify(topicMessageManager, times(START_PUB)).publish(any());
 
         mgr.beforeStop();
 
-        verify(dmaap).stopConsumer(mgr);
+        verify(topicMessageManager).stopConsumer(mgr);
         verify(sched).shutdownNow();
-        verify(dmaap, times(START_PUB + 1)).publish(any());
-        verify(dmaap).publish(contains("offline"));
+        verify(topicMessageManager, times(START_PUB + 1)).publish(any());
+        verify(topicMessageManager).publish(contains("offline"));
 
-        assertTrue(mgr.getCurrent() instanceof IdleState);
+        assertInstanceOf(IdleState.class, mgr.getCurrent());
 
         // verify that next message is handled locally
         assertFalse(mgr.beforeInsert(TOPIC2, DECODED_EVENT));
-        verify(dmaap, times(START_PUB + 1)).publish(any());
+        verify(topicMessageManager, times(START_PUB + 1)).publish(any());
     }
 
     @Test
-    public void testBeforeStop_NotRunning() throws Exception {
+    void testBeforeStop_NotRunning() {
         final State st = mgr.getCurrent();
 
         mgr.beforeStop();
 
-        verify(dmaap, never()).stopConsumer(any());
+        verify(topicMessageManager, never()).stopConsumer(any());
         verify(sched, never()).shutdownNow();
 
         // hasn't changed states either
@@ -293,7 +295,7 @@ public class PoolingManagerImplTest {
     }
 
     @Test
-    public void testBeforeStop_AfterPartialStart() throws Exception {
+    void testBeforeStop_AfterPartialStart() {
         // call beforeStart but not afterStart
         mgr.beforeStart();
 
@@ -304,33 +306,33 @@ public class PoolingManagerImplTest {
         // should still shut the scheduler down
         verify(sched).shutdownNow();
 
-        verify(dmaap, never()).stopConsumer(any());
+        verify(topicMessageManager, never()).stopConsumer(any());
 
         // hasn't changed states
         assertEquals(st, mgr.getCurrent());
     }
 
     @Test
-    public void testAfterStop() throws Exception {
+    void testAfterStop() throws Exception {
         startMgr();
         mgr.beforeStop();
 
         mgr.afterStop();
 
-        verify(dmaap).stopPublisher(STD_OFFLINE_PUB_WAIT_MS);
+        verify(topicMessageManager).stopPublisher(STD_OFFLINE_PUB_WAIT_MS);
     }
 
     @Test
-    public void testBeforeLock() throws Exception {
+    void testBeforeLock() throws Exception {
         startMgr();
 
         mgr.beforeLock();
 
-        assertTrue(mgr.getCurrent() instanceof IdleState);
+        assertInstanceOf(IdleState.class, mgr.getCurrent());
     }
 
     @Test
-    public void testAfterUnlock_AliveIdle() throws Exception {
+    void testAfterUnlock_AliveIdle() {
         // this really shouldn't happen
 
         lockMgr();
@@ -338,21 +340,21 @@ public class PoolingManagerImplTest {
         mgr.afterUnlock();
 
         // stays in idle state, because it has no scheduler
-        assertTrue(mgr.getCurrent() instanceof IdleState);
+        assertInstanceOf(IdleState.class, mgr.getCurrent());
     }
 
     @Test
-    public void testAfterUnlock_AliveStarted() throws Exception {
+    void testAfterUnlock_AliveStarted() throws Exception {
         startMgr();
         lockMgr();
 
         mgr.afterUnlock();
 
-        assertTrue(mgr.getCurrent() instanceof StartState);
+        assertInstanceOf(StartState.class, mgr.getCurrent());
     }
 
     @Test
-    public void testAfterUnlock_StoppedIdle() throws Exception {
+    void testAfterUnlock_StoppedIdle() throws Exception {
         startMgr();
         lockMgr();
 
@@ -361,11 +363,11 @@ public class PoolingManagerImplTest {
 
         mgr.afterUnlock();
 
-        assertTrue(mgr.getCurrent() instanceof IdleState);
+        assertInstanceOf(IdleState.class, mgr.getCurrent());
     }
 
     @Test
-    public void testAfterUnlock_StoppedStarted() throws Exception {
+    void testAfterUnlock_StoppedStarted() throws Exception {
         startMgr();
 
         // Note: don't lockMgr()
@@ -375,11 +377,11 @@ public class PoolingManagerImplTest {
 
         mgr.afterUnlock();
 
-        assertTrue(mgr.getCurrent() instanceof StartState);
+        assertInstanceOf(StartState.class, mgr.getCurrent());
     }
 
     @Test
-    public void testChangeState() throws Exception {
+    void testChangeState() throws Exception {
         // start should invoke changeState()
         startMgr();
 
@@ -405,7 +407,7 @@ public class PoolingManagerImplTest {
     }
 
     @Test
-    public void testSchedule() throws Exception {
+    void testSchedule() throws Exception {
         // must start the scheduler
         startMgr();
 
@@ -433,7 +435,7 @@ public class PoolingManagerImplTest {
     }
 
     @Test
-    public void testScheduleWithFixedDelay() throws Exception {
+    void testScheduleWithFixedDelay() throws Exception {
         // must start the scheduler
         startMgr();
 
@@ -464,45 +466,45 @@ public class PoolingManagerImplTest {
     }
 
     @Test
-    public void testPublishAdmin() throws Exception {
+    void testPublishAdmin() throws Exception {
         Offline msg = new Offline(mgr.getHost());
         mgr.publishAdmin(msg);
 
         assertEquals(Message.ADMIN, msg.getChannel());
 
-        verify(dmaap).publish(any());
+        verify(topicMessageManager).publish(any());
     }
 
     @Test
-    public void testPublish() throws Exception {
+    void testPublish() throws Exception {
         Offline msg = new Offline(mgr.getHost());
         mgr.publish("my.channel", msg);
 
         assertEquals("my.channel", msg.getChannel());
 
-        verify(dmaap).publish(any());
+        verify(topicMessageManager).publish(any());
     }
 
     @Test
-    public void testPublish_InvalidMsg() throws Exception {
+    void testPublish_InvalidMsg() throws Exception {
         // message is missing data
         mgr.publish(Message.ADMIN, new Offline());
 
         // should not have attempted to publish it
-        verify(dmaap, never()).publish(any());
+        verify(topicMessageManager, never()).publish(any());
     }
 
     @Test
-    public void testPublish_DmaapEx() throws Exception {
+    void testPublish_TopicMessageMngEx() throws Exception {
 
         // generate exception
-        doThrow(new PoolingFeatureException()).when(dmaap).publish(any());
+        doThrow(new PoolingFeatureException()).when(topicMessageManager).publish(any());
 
         assertThatCode(() -> mgr.publish(Message.ADMIN, new Offline(mgr.getHost()))).doesNotThrowAnyException();
     }
 
     @Test
-    public void testOnTopicEvent() throws Exception {
+    void testOnTopicEvent() throws Exception {
         startMgr();
 
         StartState st = (StartState) mgr.getCurrent();
@@ -517,18 +519,18 @@ public class PoolingManagerImplTest {
 
         mgr.onTopicEvent(CommInfrastructure.UEB, MY_TOPIC, msg);
 
-        assertTrue(mgr.getCurrent() instanceof QueryState);
+        assertInstanceOf(QueryState.class, mgr.getCurrent());
     }
 
     @Test
-    public void testOnTopicEvent_NullEvent() throws Exception {
+    void testOnTopicEvent_NullEvent() throws Exception {
         startMgr();
 
         assertThatCode(() -> mgr.onTopicEvent(CommInfrastructure.UEB, TOPIC2, null)).doesNotThrowAnyException();
     }
 
     @Test
-    public void testBeforeOffer_Unlocked() throws Exception {
+    void testBeforeOffer_Unlocked() throws Exception {
         startMgr();
 
         // route the message to another host
@@ -538,7 +540,7 @@ public class PoolingManagerImplTest {
     }
 
     @Test
-    public void testBeforeOffer_Locked() throws Exception {
+    void testBeforeOffer_Locked() throws Exception {
         startMgr();
         lockMgr();
 
@@ -549,7 +551,7 @@ public class PoolingManagerImplTest {
     }
 
     @Test
-    public void testBeforeInsert() throws Exception {
+    void testBeforeInsert() throws Exception {
         startMgr();
         lockMgr();
 
@@ -560,56 +562,56 @@ public class PoolingManagerImplTest {
     }
 
     @Test
-    public void testHandleExternalCommInfrastructureStringStringString_NullReqId() throws Exception {
+    void testHandleExternalCommInfrastructureStringStringString_NullReqId() throws Exception {
         validateHandleReqId(null);
     }
 
     @Test
-    public void testHandleExternalCommInfrastructureStringStringString_EmptyReqId() throws Exception {
+    void testHandleExternalCommInfrastructureStringStringString_EmptyReqId() throws Exception {
         validateHandleReqId("");
     }
 
     @Test
-    public void testHandleExternalCommInfrastructureStringStringString_InvalidMsg() throws Exception {
+    void testHandleExternalCommInfrastructureStringStringString_InvalidMsg() throws Exception {
         startMgr();
 
         assertFalse(mgr.beforeInsert(TOPIC2, "invalid message"));
     }
 
     @Test
-    public void testHandleExternalCommInfrastructureStringStringString() throws Exception {
+    void testHandleExternalCommInfrastructureStringStringString() throws Exception {
         validateUnhandled();
     }
 
     @Test
-    public void testHandleExternalForward_NoAssignments() throws Exception {
+    void testHandleExternalForward_NoAssignments() throws Exception {
         validateUnhandled();
     }
 
     @Test
-    public void testHandleExternalForward() throws Exception {
+    void testHandleExternalForward() throws Exception {
         validateNoForward();
     }
 
     @Test
-    public void testHandleEvent_NullTarget() throws Exception {
+    void testHandleEvent_NullTarget() throws Exception {
         // buckets have null targets
         validateDiscarded(new BucketAssignments(new String[] {null, null}));
     }
 
     @Test
-    public void testHandleEvent_SameHost() throws Exception {
+    void testHandleEvent_SameHost() throws Exception {
         validateNoForward();
     }
 
     @Test
-    public void testHandleEvent_DiffHost() throws Exception {
+    void testHandleEvent_DiffHost() throws Exception {
         // route the message to the *OTHER* host
         validateDiscarded(makeAssignments(false));
     }
 
     @Test
-    public void testDecodeEvent_CannotDecode() throws Exception {
+    void testDecodeEvent_CannotDecode() throws Exception {
 
         mgr = new PoolingManagerTest(MY_HOST, controller, poolProps, active) {
             @Override
@@ -629,7 +631,7 @@ public class PoolingManagerImplTest {
     }
 
     @Test
-    public void testDecodeEvent_UnsuppEx() throws Exception {
+    void testDecodeEvent_UnsuppEx() throws Exception {
 
         // generate exception
         mgr = new PoolingManagerTest(MY_HOST, controller, poolProps, active) {
@@ -650,7 +652,7 @@ public class PoolingManagerImplTest {
     }
 
     @Test
-    public void testDecodeEvent_ArgEx() throws Exception {
+    void testDecodeEvent_ArgEx() throws Exception {
         // generate exception
         mgr = new PoolingManagerTest(MY_HOST, controller, poolProps, active) {
             @Override
@@ -670,7 +672,7 @@ public class PoolingManagerImplTest {
     }
 
     @Test
-    public void testDecodeEvent_StateEx() throws Exception {
+    void testDecodeEvent_StateEx() throws Exception {
         // generate exception
         mgr = new PoolingManagerTest(MY_HOST, controller, poolProps, active) {
             @Override
@@ -690,7 +692,7 @@ public class PoolingManagerImplTest {
     }
 
     @Test
-    public void testDecodeEvent() throws Exception {
+    void testDecodeEvent() throws Exception {
         startMgr();
 
         when(controller.isLocked()).thenReturn(true);
@@ -702,7 +704,7 @@ public class PoolingManagerImplTest {
     }
 
     @Test
-    public void testHandleInternal() throws Exception {
+    void testHandleInternal() throws Exception {
         startMgr();
 
         StartState st = (StartState) mgr.getCurrent();
@@ -717,20 +719,20 @@ public class PoolingManagerImplTest {
 
         mgr.onTopicEvent(CommInfrastructure.UEB, MY_TOPIC, msg);
 
-        assertTrue(mgr.getCurrent() instanceof QueryState);
+        assertInstanceOf(QueryState.class, mgr.getCurrent());
     }
 
     @Test
-    public void testHandleInternal_IoEx() throws Exception {
+    void testHandleInternal_IoEx() throws Exception {
         startMgr();
 
         mgr.onTopicEvent(CommInfrastructure.UEB, MY_TOPIC, "invalid message");
 
-        assertTrue(mgr.getCurrent() instanceof StartState);
+        assertInstanceOf(StartState.class, mgr.getCurrent());
     }
 
     @Test
-    public void testHandleInternal_PoolEx() throws Exception {
+    void testHandleInternal_PoolEx() throws Exception {
         startMgr();
 
         StartState st = (StartState) mgr.getCurrent();
@@ -746,18 +748,18 @@ public class PoolingManagerImplTest {
 
         mgr.onTopicEvent(CommInfrastructure.UEB, MY_TOPIC, msg);
 
-        assertTrue(mgr.getCurrent() instanceof StartState);
+        assertInstanceOf(StartState.class, mgr.getCurrent());
     }
 
     @Test
-    public void testStartDistributing() throws Exception {
+    void testStartDistributing() throws Exception {
         validateNoForward();
 
 
         // null assignments should cause message to be processed locally
         mgr.startDistributing(null);
         assertFalse(mgr.beforeInsert(TOPIC2, DECODED_EVENT));
-        verify(dmaap, times(START_PUB)).publish(any());
+        verify(topicMessageManager, times(START_PUB)).publish(any());
 
 
         // message for this host
@@ -771,47 +773,47 @@ public class PoolingManagerImplTest {
     }
 
     @Test
-    public void testGoStart() {
+    void testGoStart() {
         State st = mgr.goStart();
-        assertTrue(st instanceof StartState);
+        assertInstanceOf(StartState.class, st);
         assertEquals(mgr.getHost(), st.getHost());
     }
 
     @Test
-    public void testGoQuery() {
+    void testGoQuery() {
         BucketAssignments asgn = new BucketAssignments(new String[] {HOST2});
         mgr.startDistributing(asgn);
 
         State st = mgr.goQuery();
 
-        assertTrue(st instanceof QueryState);
+        assertInstanceOf(QueryState.class, st);
         assertEquals(mgr.getHost(), st.getHost());
         assertEquals(asgn, mgr.getAssignments());
     }
 
     @Test
-    public void testGoActive() {
+    void testGoActive() {
         BucketAssignments asgn = new BucketAssignments(new String[] {HOST2});
         mgr.startDistributing(asgn);
 
         State st = mgr.goActive();
 
-        assertTrue(st instanceof ActiveState);
+        assertInstanceOf(ActiveState.class, st);
         assertEquals(mgr.getHost(), st.getHost());
         assertEquals(asgn, mgr.getAssignments());
         assertEquals(0, active.getCount());
     }
 
     @Test
-    public void testGoInactive() {
+    void testGoInactive() {
         State st = mgr.goInactive();
-        assertTrue(st instanceof InactiveState);
+        assertInstanceOf(InactiveState.class, st);
         assertEquals(mgr.getHost(), st.getHost());
         assertEquals(1, active.getCount());
     }
 
     @Test
-    public void testTimerActionRun() throws Exception {
+    void testTimerActionRun() throws Exception {
         // must start the scheduler
         startMgr();
 
@@ -834,7 +836,7 @@ public class PoolingManagerImplTest {
     }
 
     @Test
-    public void testTimerActionRun_DiffState() throws Exception {
+    void testTimerActionRun_DiffState() throws Exception {
         // must start the scheduler
         startMgr();
 
@@ -859,7 +861,7 @@ public class PoolingManagerImplTest {
 
         mgr.onTopicEvent(CommInfrastructure.UEB, MY_TOPIC, msg);
 
-        assertTrue(mgr.getCurrent() instanceof QueryState);
+        assertInstanceOf(QueryState.class, mgr.getCurrent());
 
         // execute it
         taskCap.getValue().run();
@@ -882,7 +884,7 @@ public class PoolingManagerImplTest {
 
         assertFalse(mgr.beforeInsert(TOPIC2, DECODED_EVENT));
 
-        verify(dmaap, times(START_PUB)).publish(any());
+        verify(topicMessageManager, times(START_PUB)).publish(any());
     }
 
     private void validateUnhandled() throws PoolingFeatureException {
@@ -902,7 +904,7 @@ public class PoolingManagerImplTest {
     /**
      * Makes an assignment with two buckets.
      *
-     * @param sameHost {@code true} if the {@link #REQUEST_ID} should hash to the
+     * @param sameHost {@code true} if the REQUEST_ID should hash to the
      *        manager's bucket, {@code false} if it should hash to the other host's bucket
      * @return a new bucket assignment
      */
@@ -924,9 +926,8 @@ public class PoolingManagerImplTest {
     /**
      * Invokes methods necessary to start the manager.
      *
-     * @throws PoolingFeatureException if an error occurs
      */
-    private void startMgr() throws PoolingFeatureException {
+    private void startMgr() {
         mgr.beforeStart();
         mgr.afterStart();
     }
@@ -966,9 +967,9 @@ public class PoolingManagerImplTest {
         }
 
         @Override
-        protected DmaapManager makeDmaapManager(String topic) throws PoolingFeatureException {
-            gotDmaap = true;
-            return dmaap;
+        protected TopicMessageManager makeTopicMessagesManager(String topic) throws PoolingFeatureException {
+            gotManager = true;
+            return topicMessageManager;
         }
 
         @Override