Add junit coverage to policy-common 61/68961/4
authorJim Hahn <jrh3@att.com>
Tue, 25 Sep 2018 15:29:16 +0000 (11:29 -0400)
committerJim Hahn <jrh3@att.com>
Wed, 26 Sep 2018 19:03:56 +0000 (15:03 -0400)
Added coverage tests for policy-endpoints.
Fixed new checkstyle warnings.
Use powermock version from parent pom.
Replaced literals with constants in new tests.
Moved test superclass higher up the class hierarchy so it can be re-used.
Removed powermock version.

Change-Id: I7d3d45132cd0973f4d02d3af320a1d53a1234e4d
Issue-ID: POLICY-1148
Signed-off-by: Jim Hahn <jrh3@att.com>
policy-endpoints/pom.xml
policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java
policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java
policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSource.java
policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/BusTopicTestBase.java [new file with mode: 0644]
policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumerTest.java [new file with mode: 0644]
policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisherTest.java [new file with mode: 0644]
policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicBaseTest.java [new file with mode: 0644]
policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicParamsTest.java [new file with mode: 0644]
policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/TopicBaseTest.java [new file with mode: 0644]

index a4ac93d..1ef859c 100644 (file)
             <artifactId>junit</artifactId>
             <scope>test</scope>
         </dependency>
+        
+        <dependency>
+            <groupId>org.powermock</groupId>
+            <artifactId>powermock-api-mockito</artifactId>
+            <scope>test</scope>
+        </dependency>
 
         <dependency>
             <groupId>com.openpojo</groupId>
index 690a6d0..746798f 100644 (file)
@@ -322,7 +322,7 @@ public interface BusConsumer {
                 logger.debug("DMaaP consumer received {} : {}" + response.getResponseCode(),
                         response.getResponseMessage());
 
-                if (response.getResponseCode() == null || !"200".equals(response.getResponseCode())) {
+                if (!"200".equals(response.getResponseCode())) {
 
                     logger.error("DMaaP consumer received: {} : {}", response.getResponseCode(),
                             response.getResponseMessage());
@@ -443,8 +443,9 @@ public interface BusConsumer {
             super(busTopicParams);
 
 
-            final String dme2RouteOffer = busTopicParams.getAdditionalProps()
-                    .get(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY);
+            final String dme2RouteOffer = (busTopicParams.isAdditionalPropsValid()
+                            ? busTopicParams.getAdditionalProps().get(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY)
+                            : null);
 
             if (busTopicParams.isEnvironmentInvalid()) {
                 throw parmException(busTopicParams.getTopic(),
index 716ce95..3365b4e 100644 (file)
@@ -204,6 +204,8 @@ public interface BusPublisher {
                 this.publisher = new MRSimplerBatchPublisher.Builder().againstUrls(dmaapServers).onTopic(topic).build();
 
                 this.publisher.setProtocolFlag(ProtocolTypeConstants.DME2.getValue());
+            } else {
+                throw new IllegalArgumentException("Invalid DMaaP protocol " + protocol);
             }
 
             this.publisher.logTo(LoggerFactory.getLogger(MRSimplerBatchPublisher.class.getName()));
@@ -361,12 +363,14 @@ public interface BusPublisher {
             props.setProperty("TransportType", "DME2");
             props.setProperty("MethodType", "POST");
 
-            for (Map.Entry<String, String> entry : busTopicParams.getAdditionalProps().entrySet()) {
-                String key = entry.getKey();
-                String value = entry.getValue();
+            if (busTopicParams.isAdditionalPropsValid()) {
+                for (Map.Entry<String, String> entry : busTopicParams.getAdditionalProps().entrySet()) {
+                    String key = entry.getKey();
+                    String value = entry.getValue();
 
-                if (value != null) {
-                    props.setProperty(key, value);
+                    if (value != null) {
+                        props.setProperty(key, value);
+                    }
                 }
             }
 
index b9463e8..b588d1f 100644 (file)
@@ -168,7 +168,7 @@ public abstract class SingleThreadedBusTopicSource extends BusTopicBase
                 try {
                     this.init();
                     this.alive = true;
-                    this.busPollerThread = new Thread(this);
+                    this.busPollerThread = makePollerThread();
                     this.busPollerThread.setName(this.getTopicCommInfrastructure() + "-source-" + this.getTopic());
                     busPollerThread.start();
                 } catch (Exception e) {
@@ -181,6 +181,15 @@ public abstract class SingleThreadedBusTopicSource extends BusTopicBase
         return this.alive;
     }
 
+    /**
+     * Makes a new thread to be used for polling.
+     * 
+     * @return a new Thread
+     */
+    protected Thread makePollerThread() {
+        return new Thread(this);
+    }
+
     @Override
     public boolean stop() {
         logger.info("{}: stopping", this);
diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/BusTopicTestBase.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/BusTopicTestBase.java
new file mode 100644 (file)
index 0000000..ba52f19
--- /dev/null
@@ -0,0 +1,121 @@
+/*
+ * ============LICENSE_START=======================================================
+ * policy-endpoints
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.common.endpoints.event.comm.bus;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams;
+import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams.TopicParamsBuilder;
+
+/**
+ * Base class for BusTopicXxxTest classes.
+ */
+public class BusTopicTestBase {
+
+    public static final String MY_AFT_ENV = "my-aft-env";
+    public static final String MY_API_KEY = "my-api-key";
+    public static final String MY_API_SECRET = "my-api-secret";
+    public static final String MY_BASE_PATH = "my-base";
+    public static final String MY_CLIENT_NAME = "my-client";
+    public static final String MY_CONS_GROUP = "my-cons-group";
+    public static final String MY_CONS_INST = "my-cons-inst";
+    public static final String MY_ENV = "my-env";
+    public static final int MY_FETCH_LIMIT = 100;
+    public static final int MY_FETCH_TIMEOUT = 101;
+    public static final String MY_HOST = "my-host";
+    public static final String MY_LAT = "my-lat";
+    public static final String MY_LONG = "my-long";
+    public static final String MY_PARTNER = "my-partner";
+    public static final String MY_PASSWD = "my-pass";
+    public static final int MY_PORT = 102;
+    public static final String MY_TOPIC = "my-topic";
+    public static final String MY_USERNAME = "my-user";
+
+    public static final String MY_MESSAGE = "my-message";
+    public static final String MY_PARTITION = "my-partition";
+    public static final String MY_MESSAGE2 = "my-message-2";
+    public static final String MY_PARTITION2 = "my-partition-2";
+
+    public static final String ROUTE_PROP = "routeOffer";
+    public static final String MY_ROUTE = "my-route";
+
+    /**
+     * Message used within exceptions that are expected.
+     */
+    public static final String EXPECTED = "expected exception";
+
+    /**
+     * Additional properties to be added to the parameter builder.
+     */
+    protected Map<String, String> addProps;
+
+    /**
+     * Servers to be added to the parameter builder.
+     */
+    protected List<String> servers;
+
+    /**
+     * Parameter builder used to build topic parameters.
+     */
+    protected TopicParamsBuilder builder;
+
+    /**
+     * Initializes {@link #addProps}, {@link #servers}, and {@link #builder}.
+     */
+    public void setUp() {
+        addProps = new TreeMap<>();
+        addProps.put("my-key-A", "my-value-A");
+        addProps.put("my-key-B", "my-value-B");
+
+        servers = Arrays.asList("svra", "svrb");
+
+        builder = makeBuilder();
+    }
+
+    /**
+     * Makes a fully populated parameter builder.
+     * 
+     * @return a new parameter builder
+     */
+    public TopicParamsBuilder makeBuilder() {
+        return makeBuilder(addProps, servers);
+    }
+
+    /**
+     * Makes a fully populated parameter builder.
+     * 
+     * @param addProps additional properties to be added to the builder
+     * @param servers servers to be added to the builder
+     * @return a new parameter builder
+     */
+    public TopicParamsBuilder makeBuilder(Map<String, String> addProps, List<String> servers) {
+
+        return BusTopicParams.builder().additionalProps(addProps).aftEnvironment(MY_AFT_ENV).allowSelfSignedCerts(true)
+                        .apiKey(MY_API_KEY).apiSecret(MY_API_SECRET).basePath(MY_BASE_PATH).clientName(MY_CLIENT_NAME)
+                        .consumerGroup(MY_CONS_GROUP).consumerInstance(MY_CONS_INST).environment(MY_ENV)
+                        .fetchLimit(MY_FETCH_LIMIT).fetchTimeout(MY_FETCH_TIMEOUT).hostname(MY_HOST).latitude(MY_LAT)
+                        .longitude(MY_LONG).managed(true).partitionId(MY_PARTITION).partner(MY_PARTNER)
+                        .password(MY_PASSWD).port(MY_PORT).servers(servers).topic(MY_TOPIC).useHttps(true)
+                        .userName(MY_USERNAME);
+    }
+}
diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumerTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumerTest.java
new file mode 100644 (file)
index 0000000..ef4d5a0
--- /dev/null
@@ -0,0 +1,236 @@
+/*
+ * ============LICENSE_START=======================================================
+ * policy-endpoints
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.common.endpoints.event.comm.bus.internal;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import com.att.aft.dme2.internal.apache.commons.collections.IteratorUtils;
+import com.att.nsa.cambria.client.CambriaConsumer;
+import com.att.nsa.mr.client.impl.MRConsumerImpl;
+import com.att.nsa.mr.client.response.MRConsumerResponse;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import org.junit.Before;
+import org.junit.Test;
+import org.onap.policy.common.endpoints.event.comm.bus.BusTopicTestBase;
+import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.CambriaConsumerWrapper;
+import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.DmaapAafConsumerWrapper;
+import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.DmaapConsumerWrapper;
+import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.DmaapDmeConsumerWrapper;
+import org.powermock.reflect.Whitebox;
+
+public class BusConsumerTest extends BusTopicTestBase {
+
+    @Before
+    public void setUp() {
+        super.setUp();
+    }
+
+    @Test
+    public void testCambriaConsumerWrapper() {
+        // verify that different wrappers can be built
+        new CambriaConsumerWrapper(makeBuilder().build());
+        new CambriaConsumerWrapper(makeBuilder().useHttps(false).build());
+        new CambriaConsumerWrapper(makeBuilder().useHttps(true).build());
+        new CambriaConsumerWrapper(makeBuilder().useHttps(true).allowSelfSignedCerts(false).build());
+        new CambriaConsumerWrapper(makeBuilder().useHttps(true).allowSelfSignedCerts(true).build());
+        new CambriaConsumerWrapper(makeBuilder().apiKey(null).build());
+        new CambriaConsumerWrapper(makeBuilder().apiSecret(null).build());
+        new CambriaConsumerWrapper(makeBuilder().apiKey(null).apiSecret(null).build());
+        new CambriaConsumerWrapper(makeBuilder().userName(null).build());
+        new CambriaConsumerWrapper(makeBuilder().password(null).build());
+        new CambriaConsumerWrapper(makeBuilder().userName(null).password(null).build());
+    }
+
+    @Test
+    public void testCambriaConsumerWrapperFetch() throws Exception {
+        CambriaConsumer inner = mock(CambriaConsumer.class);
+        List<String> lst = Arrays.asList(MY_MESSAGE, MY_MESSAGE2);
+        when(inner.fetch()).thenReturn(lst);
+
+        CambriaConsumerWrapper cons = new CambriaConsumerWrapper(builder.build());
+        Whitebox.setInternalState(cons, "consumer", inner);
+
+        assertEquals(lst, IteratorUtils.toList(cons.fetch().iterator()));
+
+        // arrange to throw exception next time fetch is called
+        IOException ex = new IOException(EXPECTED);
+        when(inner.fetch()).thenThrow(ex);
+
+        cons.fetchTimeout = 10;
+
+        try {
+            cons.fetch();
+            fail("missing exception");
+
+        } catch (IOException | InterruptedException e) {
+            assertEquals(ex, e);
+        }
+    }
+
+    @Test
+    public void testCambriaConsumerWrapperClose() throws Exception {
+        CambriaConsumerWrapper cons = new CambriaConsumerWrapper(builder.build());
+
+        // set filter several times to cause different branches of close() to be executed
+        for (int count = 0; count < 3; ++count) {
+            cons.close();
+            cons.setFilter("close=" + count);
+        }
+    }
+
+    @Test
+    public void testCambriaConsumerWrapperSetFilter() {
+        // set filter several times to cause different branches to be executed
+        CambriaConsumerWrapper cons = new CambriaConsumerWrapper(builder.build());
+        for (int count = 0; count < 3; ++count) {
+            cons.setFilter("set-filter=" + count);
+        }
+    }
+
+    @Test
+    public void testCambriaConsumerWrapperToString() {
+        assertNotNull(new CambriaConsumerWrapper(makeBuilder().build()).toString());
+    }
+
+    @Test
+    public void testDmaapConsumerWrapper() throws Exception {
+        // verify that different wrappers can be built
+        new DmaapAafConsumerWrapper(makeBuilder().build());
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testDmaapConsumerWrapper_InvalidTopic() throws Exception {
+        new DmaapAafConsumerWrapper(makeBuilder().topic(null).build());
+    }
+
+    @Test
+    public void testDmaapConsumerWrapperFetch() throws Exception {
+        DmaapAafConsumerWrapper dmaap = new DmaapAafConsumerWrapper(makeBuilder().build());
+        MRConsumerImpl cons = mock(MRConsumerImpl.class);
+
+        dmaap.fetchTimeout = 5;
+        dmaap.consumer = cons;
+
+        // null return
+        when(cons.fetchWithReturnConsumerResponse()).thenReturn(null);
+        assertFalse(dmaap.fetch().iterator().hasNext());
+
+        // with messages, 200
+        List<String> lst = Arrays.asList(MY_MESSAGE, MY_MESSAGE2);
+        MRConsumerResponse resp = new MRConsumerResponse();
+        resp.setResponseCode("200");
+        resp.setActualMessages(lst);
+        when(cons.fetchWithReturnConsumerResponse()).thenReturn(resp);
+
+        assertEquals(lst, IteratorUtils.toList(dmaap.fetch().iterator()));
+
+        // null messages
+        resp.setActualMessages(null);
+        when(cons.fetchWithReturnConsumerResponse()).thenReturn(resp);
+
+        assertFalse(dmaap.fetch().iterator().hasNext());
+
+        // with messages, NOT 200
+        resp.setResponseCode("400");
+        resp.setActualMessages(lst);
+        when(cons.fetchWithReturnConsumerResponse()).thenReturn(resp);
+
+        assertEquals(lst, IteratorUtils.toList(dmaap.fetch().iterator()));
+    }
+
+    @Test
+    public void testDmaapConsumerWrapperClose() throws Exception {
+        new DmaapAafConsumerWrapper(makeBuilder().build()).close();
+    }
+
+    @Test
+    public void testDmaapConsumerWrapperToString() throws Exception {
+        assertNotNull(new DmaapConsumerWrapper(makeBuilder().build()) {}.toString());
+    }
+
+    @Test
+    public void testDmaapAafConsumerWrapper() throws Exception {
+        // verify that different wrappers can be built
+        new DmaapAafConsumerWrapper(makeBuilder().useHttps(true).build());
+        new DmaapAafConsumerWrapper(makeBuilder().useHttps(false).build());
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testDmaapAafConsumerWrapper_InvalidServers() throws Exception {
+        /*
+         * Unfortunately, the MR code intercepts this and throws an exception before the
+         * wrapper gets a chance to check it, thus this test does not improve the coverage
+         * for the constructor.
+         */
+        new DmaapAafConsumerWrapper(makeBuilder().servers(Collections.emptyList()).build());
+    }
+
+    @Test
+    public void testDmaapAafConsumerWrapperToString() throws Exception {
+        assertNotNull(new DmaapAafConsumerWrapper(makeBuilder().build()).toString());
+    }
+
+    @Test
+    public void testDmaapDmeConsumerWrapper() throws Exception {
+        // verify that different wrappers can be built
+        new DmaapDmeConsumerWrapper(makeBuilder().build());
+        new DmaapDmeConsumerWrapper(makeBuilder().useHttps(true).build());
+        new DmaapDmeConsumerWrapper(makeBuilder().useHttps(false).build());
+        new DmaapDmeConsumerWrapper(makeBuilder().additionalProps(null).build());
+
+        addProps.put(ROUTE_PROP, MY_ROUTE);
+        new DmaapDmeConsumerWrapper(makeBuilder().build());
+        new DmaapDmeConsumerWrapper(makeBuilder().partner(null).build());
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testDmaapDmeConsumerWrapper_InvalidEnvironment() throws Exception {
+        new DmaapDmeConsumerWrapper(makeBuilder().environment(null).build());
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testDmaapDmeConsumerWrapper_InvalidAft() throws Exception {
+        new DmaapDmeConsumerWrapper(makeBuilder().aftEnvironment(null).build());
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testDmaapDmeConsumerWrapper_InvalidLat() throws Exception {
+        new DmaapDmeConsumerWrapper(makeBuilder().latitude(null).build());
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testDmaapDmeConsumerWrapper_InvalidLong() throws Exception {
+        new DmaapDmeConsumerWrapper(makeBuilder().longitude(null).build());
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testDmaapDmeConsumerWrapper_InvalidPartner() throws Exception {
+        new DmaapDmeConsumerWrapper(makeBuilder().partner(null).build());
+    }
+}
diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisherTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisherTest.java
new file mode 100644 (file)
index 0000000..4e78b67
--- /dev/null
@@ -0,0 +1,217 @@
+/*
+ * ============LICENSE_START=======================================================
+ * policy-endpoints
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.common.endpoints.event.comm.bus.internal;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.att.nsa.cambria.client.CambriaBatchingPublisher;
+import com.att.nsa.mr.client.impl.MRSimplerBatchPublisher;
+import com.att.nsa.mr.client.response.MRPublisherResponse;
+import com.att.nsa.mr.test.clients.ProtocolTypeConstants;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+import org.junit.Before;
+import org.junit.Test;
+import org.onap.policy.common.endpoints.event.comm.bus.BusTopicTestBase;
+import org.onap.policy.common.endpoints.event.comm.bus.internal.BusPublisher.CambriaPublisherWrapper;
+import org.onap.policy.common.endpoints.event.comm.bus.internal.BusPublisher.DmaapAafPublisherWrapper;
+import org.onap.policy.common.endpoints.event.comm.bus.internal.BusPublisher.DmaapDmePublisherWrapper;
+import org.onap.policy.common.endpoints.event.comm.bus.internal.BusPublisher.DmaapPublisherWrapper;
+
+public class BusPublisherTest extends BusTopicTestBase {
+
+    @Before
+    public void setUp() {
+        super.setUp();
+    }
+
+    @Test
+    public void testCambriaPublisherWrapper() {
+        // verify that different wrappers can be built
+        new CambriaPublisherWrapper(makeBuilder().build());
+        new CambriaPublisherWrapper(makeBuilder().useHttps(false).build());
+        new CambriaPublisherWrapper(makeBuilder().useHttps(true).build());
+        new CambriaPublisherWrapper(makeBuilder().useHttps(true).allowSelfSignedCerts(false).build());
+        new CambriaPublisherWrapper(makeBuilder().useHttps(true).allowSelfSignedCerts(true).build());
+        new CambriaPublisherWrapper(makeBuilder().apiKey(null).build());
+        new CambriaPublisherWrapper(makeBuilder().apiSecret(null).build());
+        new CambriaPublisherWrapper(makeBuilder().apiKey(null).apiSecret(null).build());
+        new CambriaPublisherWrapper(makeBuilder().userName(null).build());
+        new CambriaPublisherWrapper(makeBuilder().password(null).build());
+        new CambriaPublisherWrapper(makeBuilder().userName(null).password(null).build());
+    }
+
+    @Test
+    public void testCambriaPublisherWrapperSend() throws Exception {
+        CambriaBatchingPublisher pub = mock(CambriaBatchingPublisher.class);
+        CambriaPublisherWrapper cambria = new CambriaPublisherWrapper(makeBuilder().build());
+        cambria.publisher = pub;
+
+        assertTrue(cambria.send(MY_PARTITION, MY_MESSAGE));
+
+        // publisher exception
+        when(pub.send(anyString(), anyString())).thenThrow(new IOException(EXPECTED));
+        assertFalse(cambria.send(MY_PARTITION2, MY_MESSAGE2));
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testCambriaPublisherWrapperSend_InvalidMsg() {
+        CambriaPublisherWrapper cambria = new CambriaPublisherWrapper(makeBuilder().build());
+        cambria.publisher = mock(CambriaBatchingPublisher.class);
+
+        cambria.send(MY_PARTITION, null);
+    }
+
+    @Test
+    public void testCambriaPublisherWrapperClose() throws Exception {
+        CambriaBatchingPublisher pub = mock(CambriaBatchingPublisher.class);
+        CambriaPublisherWrapper cambria = new CambriaPublisherWrapper(makeBuilder().build());
+        cambria.publisher = pub;
+
+        cambria.close();
+        verify(pub).close();
+
+        // try again, this time with an exception
+        doThrow(new RuntimeException(EXPECTED)).when(pub).close();
+        cambria.close();
+    }
+
+    @Test
+    public void testDmaapPublisherWrapper() {
+        // verify with different constructor arguments
+        new DmaapAafPublisherWrapper(servers, MY_TOPIC, MY_USERNAME, MY_PASSWD, true);
+        new DmaapAafPublisherWrapper(servers, MY_TOPIC, MY_USERNAME, MY_PASSWD, false);
+        new DmaapPublisherWrapper(ProtocolTypeConstants.DME2, servers, MY_TOPIC, MY_USERNAME, MY_PASSWD, true) {};
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testDmaapPublisherWrapper_InvalidTopic() {
+        new DmaapPublisherWrapper(ProtocolTypeConstants.DME2, servers, "", MY_USERNAME, MY_PASSWD, true) {};
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testDmaapPublisherWrapper_Aaf_NullServers() {
+        new DmaapAafPublisherWrapper(null, MY_TOPIC, MY_USERNAME, MY_PASSWD, true);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testDmaapPublisherWrapper_Aaf_NoServers() {
+        new DmaapAafPublisherWrapper(Collections.emptyList(), MY_TOPIC, MY_USERNAME, MY_PASSWD, true);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testDmaapPublisherWrapper_InvalidProtocol() {
+        new DmaapPublisherWrapper(ProtocolTypeConstants.HTTPNOAUTH, servers, MY_TOPIC, MY_USERNAME, MY_PASSWD, true) {};
+    }
+
+    @Test
+    public void testDmaapPublisherWrapperClose() throws Exception {
+        MRSimplerBatchPublisher pub = mock(MRSimplerBatchPublisher.class);
+        DmaapPublisherWrapper dmaap = new DmaapAafPublisherWrapper(servers, MY_TOPIC, MY_USERNAME, MY_PASSWD, true);
+        dmaap.publisher = pub;
+
+        dmaap.close();
+        verify(pub).close(anyLong(), any(TimeUnit.class));
+
+        // close, but with exception from publisher
+        doThrow(new IOException(EXPECTED)).when(pub).close(anyLong(), any(TimeUnit.class));
+        dmaap.close();
+    }
+
+    @Test
+    public void testDmaapPublisherWrapperSend() throws Exception {
+        MRSimplerBatchPublisher pub = mock(MRSimplerBatchPublisher.class);
+        DmaapPublisherWrapper dmaap = new DmaapAafPublisherWrapper(servers, MY_TOPIC, MY_USERNAME, MY_PASSWD, true);
+        dmaap.publisher = pub;
+
+        // null response
+        assertTrue(dmaap.send(MY_PARTITION, MY_MESSAGE));
+        verify(pub).setPubResponse(any(MRPublisherResponse.class));
+        verify(pub).send(MY_PARTITION, MY_MESSAGE);
+
+        // with response
+        pub = mock(MRSimplerBatchPublisher.class);
+        dmaap.publisher = pub;
+
+        MRPublisherResponse resp = new MRPublisherResponse();
+        when(pub.sendBatchWithResponse()).thenReturn(resp);
+        assertTrue(dmaap.send(MY_PARTITION, MY_MESSAGE));
+        verify(pub).setPubResponse(any(MRPublisherResponse.class));
+        verify(pub).send(MY_PARTITION, MY_MESSAGE);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testDmaapPublisherWrapperSend_NullMessage() throws Exception {
+        MRSimplerBatchPublisher pub = mock(MRSimplerBatchPublisher.class);
+        DmaapPublisherWrapper dmaap = new DmaapAafPublisherWrapper(servers, MY_TOPIC, MY_USERNAME, MY_PASSWD, true);
+        dmaap.publisher = pub;
+
+        dmaap.send(MY_PARTITION, null);
+    }
+
+    @Test
+    public void testDmaapDmePublisherWrapper() {
+        // verify with different parameters
+        new DmaapDmePublisherWrapper(makeBuilder().build());
+        new DmaapDmePublisherWrapper(makeBuilder().additionalProps(null).build());
+
+        addProps.put(ROUTE_PROP, MY_ROUTE);
+        new DmaapDmePublisherWrapper(makeBuilder().build());
+        new DmaapDmePublisherWrapper(makeBuilder().partner(null).build());
+
+        addProps.put("null-value", null);
+        new DmaapDmePublisherWrapper(makeBuilder().build());
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testDmaapDmePublisherWrapper_InvalidEnv() {
+        new DmaapDmePublisherWrapper(makeBuilder().environment(null).build());
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testDmaapDmePublisherWrapper_InvalidAft() {
+        new DmaapDmePublisherWrapper(makeBuilder().aftEnvironment(null).build());
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testDmaapDmePublisherWrapper_InvalidLat() {
+        new DmaapDmePublisherWrapper(makeBuilder().latitude(null).build());
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testDmaapDmePublisherWrapper_InvalidLong() {
+        new DmaapDmePublisherWrapper(makeBuilder().longitude(null).build());
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testDmaapDmePublisherWrapper_InvalidPartner() {
+        new DmaapDmePublisherWrapper(makeBuilder().partner(null).build());
+    }
+}
diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicBaseTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicBaseTest.java
new file mode 100644 (file)
index 0000000..01e2e61
--- /dev/null
@@ -0,0 +1,124 @@
+/*
+ * ============LICENSE_START=======================================================
+ * policy-endpoints
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.common.endpoints.event.comm.bus.internal;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.onap.policy.common.endpoints.event.comm.bus.BusTopicTestBase;
+
+public class BusTopicBaseTest extends BusTopicTestBase {
+
+    private BusTopicBaseImpl base;
+
+    /**
+     * Initializes the object to be tested.
+     */
+    @Before
+    public void setUp() {
+        super.setUp();
+
+        base = new BusTopicBaseImpl(builder.build());
+    }
+
+    @Test
+    public void testToString() {
+        assertNotNull(base.toString());
+    }
+
+    @Test
+    public void testGetApiKey() {
+        assertEquals(MY_API_KEY, base.getApiKey());
+    }
+
+    @Test
+    public void testGetApiSecret() {
+        assertEquals(MY_API_SECRET, base.getApiSecret());
+    }
+
+    @Test
+    public void testIsUseHttps() {
+        assertEquals(true, base.isUseHttps());
+        assertEquals(false, new BusTopicBaseImpl(builder.useHttps(false).build()).isUseHttps());
+    }
+
+    @Test
+    public void testIsAllowSelfSignedCerts() {
+        assertEquals(true, base.isAllowSelfSignedCerts());
+        assertEquals(false, new BusTopicBaseImpl(builder.allowSelfSignedCerts(false).build()).isAllowSelfSignedCerts());
+    }
+
+    @Test
+    public void testAnyNullOrEmpty() {
+        assertFalse(base.anyNullOrEmpty());
+        assertFalse(base.anyNullOrEmpty("any-none-null", "any-none-null-B"));
+
+        assertTrue(base.anyNullOrEmpty(null, "any-first-null"));
+        assertTrue(base.anyNullOrEmpty("any-middle-null", null, "any-middle-null-B"));
+        assertTrue(base.anyNullOrEmpty("any-last-null", null));
+        assertTrue(base.anyNullOrEmpty("any-empty", ""));
+    }
+
+    @Test
+    public void testAllNullOrEmpty() {
+        assertTrue(base.allNullOrEmpty());
+        assertTrue(base.allNullOrEmpty(""));
+        assertTrue(base.allNullOrEmpty(null, ""));
+
+        assertFalse(base.allNullOrEmpty("all-ok-only-one"));
+        assertFalse(base.allNullOrEmpty("all-ok-one", "all-ok-two"));
+        assertFalse(base.allNullOrEmpty("all-ok-null", null));
+        assertFalse(base.allNullOrEmpty("", "all-ok-empty"));
+        assertFalse(base.allNullOrEmpty("", "all-one-ok", null));
+    }
+
+    private static class BusTopicBaseImpl extends BusTopicBase {
+
+        public BusTopicBaseImpl(BusTopicParams busTopicParams) {
+            super(busTopicParams);
+        }
+
+        @Override
+        public CommInfrastructure getTopicCommInfrastructure() {
+            return CommInfrastructure.NOOP;
+        }
+
+        @Override
+        public boolean start() {
+            return true;
+        }
+
+        @Override
+        public boolean stop() {
+            return true;
+        }
+
+        @Override
+        public void shutdown() {
+            // do nothing
+        }
+
+    }
+}
diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicParamsTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicParamsTest.java
new file mode 100644 (file)
index 0000000..d56374f
--- /dev/null
@@ -0,0 +1,148 @@
+/*
+ * ============LICENSE_START=======================================================
+ * policy-endpoints
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.common.endpoints.event.comm.bus.internal;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.function.BiConsumer;
+import org.junit.Before;
+import org.junit.Test;
+import org.onap.policy.common.endpoints.event.comm.bus.BusTopicTestBase;
+import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams.TopicParamsBuilder;
+
+public class BusTopicParamsTest extends BusTopicTestBase {
+
+    @Before
+    public void setUp() {
+        super.setUp();
+    }
+
+    @Test
+    public void test() {
+        BusTopicParams params = makeBuilder().build();
+
+        assertEquals(addProps, params.getAdditionalProps());
+        assertEquals(MY_AFT_ENV, params.getAftEnvironment());
+        assertEquals(true, params.isAllowSelfSignedCerts());
+        assertEquals(MY_API_KEY, params.getApiKey());
+        assertEquals(MY_API_SECRET, params.getApiSecret());
+        assertEquals(MY_BASE_PATH, params.getBasePath());
+        assertEquals(MY_CLIENT_NAME, params.getClientName());
+        assertEquals(MY_CONS_GROUP, params.getConsumerGroup());
+        assertEquals(MY_CONS_INST, params.getConsumerInstance());
+        assertEquals(MY_ENV, params.getEnvironment());
+        assertEquals(MY_FETCH_LIMIT, params.getFetchLimit());
+        assertEquals(MY_FETCH_TIMEOUT, params.getFetchTimeout());
+        assertEquals(MY_HOST, params.getHostname());
+        assertEquals(MY_LAT, params.getLatitude());
+        assertEquals(MY_LONG, params.getLongitude());
+        assertEquals(true, params.isManaged());
+        assertEquals(MY_PARTITION, params.getPartitionId());
+        assertEquals(MY_PARTNER, params.getPartner());
+        assertEquals(MY_PASSWD, params.getPassword());
+        assertEquals(MY_PORT, params.getPort());
+        assertEquals(servers, params.getServers());
+        assertEquals(MY_TOPIC, params.getTopic());
+        assertEquals(true, params.isUseHttps());
+        assertEquals(MY_USERNAME, params.getUserName());
+
+        // ensure that booleans are independent of each other
+        testBoolean("true:false:false", (bldr, flag) -> bldr.allowSelfSignedCerts(flag));
+        testBoolean("false:true:false", (bldr, flag) -> bldr.managed(flag));
+        testBoolean("false:false:true", (bldr, flag) -> bldr.useHttps(flag));
+
+        // test validity methods
+        assertTrue(params.isAdditionalPropsValid());
+        assertFalse(params.isAftEnvironmentInvalid());
+        assertTrue(params.isApiKeyValid());
+        assertTrue(params.isApiSecretValid());
+        assertFalse(params.isClientNameInvalid());
+        assertFalse(params.isConsumerGroupInvalid());
+        assertFalse(params.isConsumerInstanceInvalid());
+        assertFalse(params.isEnvironmentInvalid());
+        assertFalse(params.isHostnameInvalid());
+        assertFalse(params.isLatitudeInvalid());
+        assertFalse(params.isLongitudeInvalid());
+        assertFalse(params.isPartitionIdInvalid());
+        assertFalse(params.isPartnerInvalid());
+        assertTrue(params.isPasswordValid());
+        assertFalse(params.isPortInvalid());
+        assertFalse(params.isServersInvalid());
+        assertFalse(params.isTopicInvalid());
+        assertTrue(params.isUserNameValid());
+
+        // test inverted validity
+        assertFalse(makeBuilder().additionalProps(null).build().isAdditionalPropsValid());
+        assertTrue(makeBuilder().aftEnvironment("").build().isAftEnvironmentInvalid());
+        assertFalse(makeBuilder().apiKey("").build().isApiKeyValid());
+        assertFalse(makeBuilder().apiSecret("").build().isApiSecretValid());
+        assertTrue(makeBuilder().clientName("").build().isClientNameInvalid());
+        assertTrue(makeBuilder().consumerGroup("").build().isConsumerGroupInvalid());
+        assertTrue(makeBuilder().consumerInstance("").build().isConsumerInstanceInvalid());
+        assertTrue(makeBuilder().environment("").build().isEnvironmentInvalid());
+        assertTrue(makeBuilder().hostname("").build().isHostnameInvalid());
+        assertTrue(makeBuilder().latitude("").build().isLatitudeInvalid());
+        assertTrue(makeBuilder().longitude("").build().isLongitudeInvalid());
+        assertTrue(makeBuilder().partitionId("").build().isPartitionIdInvalid());
+        assertTrue(makeBuilder().partner("").build().isPartnerInvalid());
+        assertFalse(makeBuilder().password("").build().isPasswordValid());
+        assertTrue(makeBuilder().port(-1).build().isPortInvalid());
+        assertTrue(makeBuilder().port(65536).build().isPortInvalid());
+        assertTrue(makeBuilder().servers(null).build().isServersInvalid());
+        assertTrue(makeBuilder().servers(new LinkedList<>()).build().isServersInvalid());
+        assertTrue(makeBuilder().servers(Arrays.asList("")).build().isServersInvalid());
+        assertFalse(makeBuilder().servers(Arrays.asList("one-server")).build().isServersInvalid());
+        assertTrue(makeBuilder().topic("").build().isTopicInvalid());
+        assertFalse(makeBuilder().userName("").build().isUserNameValid());
+    }
+
+    /**
+     * Tests the boolean methods by applying a function, once with {@code false} and once
+     * with {@code true}. Verifies that all of the boolean methods return the correct
+     * value by concatenating them.
+     * 
+     * @param expectedTrue the string that is expected when {@code true} is passed to the
+     *        method
+     * @param function function to be applied to the builder
+     */
+    private void testBoolean(String expectedTrue, BiConsumer<TopicParamsBuilder, Boolean> function) {
+        TopicParamsBuilder builder = BusTopicParams.builder();
+
+        // first try the "false" case
+        function.accept(builder, false);
+
+        BusTopicParams params = builder.build();
+        assertEquals("false:false:false",
+                        "" + params.isAllowSelfSignedCerts() + ":" + params.isManaged() + ":" + params.isUseHttps());
+
+
+        // now try the "true" case
+        function.accept(builder, true);
+
+        params = builder.build();
+        assertEquals(expectedTrue,
+                        "" + params.isAllowSelfSignedCerts() + ":" + params.isManaged() + ":" + params.isUseHttps());
+    }
+}
diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/TopicBaseTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/TopicBaseTest.java
new file mode 100644 (file)
index 0000000..4634d12
--- /dev/null
@@ -0,0 +1,295 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP Policy Engine - Common Modules
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.common.endpoints.event.comm.bus.internal;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import org.junit.Before;
+import org.junit.Test;
+import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
+import org.onap.policy.common.endpoints.event.comm.TopicListener;
+import org.onap.policy.common.endpoints.event.comm.bus.BusTopicTestBase;
+
+public class TopicBaseTest extends BusTopicTestBase {
+
+    private TopicBaseImpl base;
+
+    /**
+     * Creates the object to be tested.
+     */
+    @Before
+    public void setUp() {
+        super.setUp();
+        
+        base = new TopicBaseImpl(servers, MY_TOPIC);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testTopicBase_NullServers() {
+        new TopicBaseImpl(null, MY_TOPIC);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testTopicBase_EmptyServers() {
+        new TopicBaseImpl(Collections.emptyList(), MY_TOPIC);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testTopicBase_NullTopic() {
+        new TopicBaseImpl(servers, null);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testTopicBase_EmptyTopic() {
+        new TopicBaseImpl(servers, "");
+    }
+
+    @Test
+    public void testRegister() {
+        TopicListener listener = mock(TopicListener.class);
+        base.register(listener);
+        assertEquals(Arrays.asList(listener), base.snapshotTopicListeners());
+
+        // re-register - list should be unchanged
+        base.register(listener);
+        assertEquals(Arrays.asList(listener), base.snapshotTopicListeners());
+
+        // register a new listener
+        TopicListener listener2 = mock(TopicListener.class);
+        base.register(listener2);
+        assertEquals(Arrays.asList(listener, listener2), base.snapshotTopicListeners());
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testRegister_NullListener() {
+        base.register(null);
+    }
+
+    @Test
+    public void testUnregister() {
+        // register two listeners
+        TopicListener listener = mock(TopicListener.class);
+        TopicListener listener2 = mock(TopicListener.class);
+        base.register(listener);
+        base.register(listener2);
+
+        // unregister one
+        base.unregister(listener);
+        assertEquals(Arrays.asList(listener2), base.snapshotTopicListeners());
+
+        // unregister the other
+        base.unregister(listener2);
+        assertTrue(base.snapshotTopicListeners().isEmpty());
+
+        // unregister again
+        base.unregister(listener2);
+        assertTrue(base.snapshotTopicListeners().isEmpty());
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testUnregister_NullListener() {
+        base.register(mock(TopicListener.class));
+        base.unregister(null);
+    }
+
+    @Test
+    public void testBroadcast() {
+        // register two listeners
+        TopicListener listener = mock(TopicListener.class);
+        TopicListener listener2 = mock(TopicListener.class);
+        base.register(listener);
+        base.register(listener2);
+
+        // broadcast a message
+        final String msg1 = "message-A";
+        base.broadcast(msg1);
+        verify(listener).onTopicEvent(CommInfrastructure.NOOP, MY_TOPIC, msg1);
+        verify(listener2).onTopicEvent(CommInfrastructure.NOOP, MY_TOPIC, msg1);
+
+        // broadcast another message, with an exception
+        final String msg2 = "message-B";
+        doThrow(new RuntimeException(EXPECTED)).when(listener).onTopicEvent(any(), any(), any());
+        base.broadcast(msg2);
+        verify(listener).onTopicEvent(CommInfrastructure.NOOP, MY_TOPIC, msg2);
+        verify(listener2).onTopicEvent(CommInfrastructure.NOOP, MY_TOPIC, msg2);
+    }
+
+    @Test
+    public void testLock_testUnlock() {
+        assertFalse(base.isLocked());
+        assertTrue(base.lock());
+        assertEquals(0, base.startCount);
+        assertEquals(1, base.stopCount);
+
+        // lock again - should not stop again
+        assertTrue(base.isLocked());
+        assertTrue(base.lock());
+        assertEquals(0, base.startCount);
+        assertEquals(1, base.stopCount);
+
+        assertTrue(base.isLocked());
+        assertTrue(base.unlock());
+        assertEquals(1, base.startCount);
+        assertEquals(1, base.stopCount);
+
+        // unlock again - should not start again
+        assertFalse(base.isLocked());
+        assertTrue(base.unlock());
+        assertEquals(1, base.startCount);
+        assertEquals(1, base.stopCount);
+
+        // lock, but stop returns false
+        base = new TopicBaseImpl(servers, MY_TOPIC);
+        base.stopReturn = false;
+        assertFalse(base.lock());
+        assertTrue(base.isLocked());
+        assertTrue(base.lock());
+
+        // unlock, but start returns false
+        base.startReturn = false;
+        assertFalse(base.unlock());
+        assertFalse(base.isLocked());
+        assertTrue(base.unlock());
+
+        // lock & re-lock, but start throws an exception
+        base = new TopicBaseImpl(servers, MY_TOPIC);
+        base.startEx = true;
+        assertTrue(base.lock());
+        assertFalse(base.unlock());
+        assertFalse(base.isLocked());
+        assertTrue(base.unlock());
+    }
+
+    @Test
+    public void testIsLocked() {
+        assertFalse(base.isLocked());
+        base.lock();
+        assertTrue(base.isLocked());
+        base.unlock();
+        assertFalse(base.isLocked());
+    }
+
+    @Test
+    public void testGetTopic() {
+        assertEquals(MY_TOPIC, base.getTopic());
+    }
+
+    @Test
+    public void testIsAlive() {
+        assertFalse(base.isAlive());
+        base.start();
+        assertTrue(base.isAlive());
+        base.stop();
+        assertFalse(base.isAlive());
+    }
+
+    @Test
+    public void testGetServers() {
+        assertEquals(servers, base.getServers());
+    }
+
+    @Test
+    public void testGetRecentEvents() {
+        assertEquals(0, base.getRecentEvents().length);
+
+        base.addEvent("recent-A");
+        base.addEvent("recent-B");
+
+        String[] recent = base.getRecentEvents();
+        assertEquals(2, recent.length);
+        assertEquals("recent-A", recent[0]);
+        assertEquals("recent-B", recent[1]);
+    }
+
+    @Test
+    public void testToString() {
+        assertNotNull(base.toString());
+    }
+
+    /**
+     * Implementation of TopicBase.
+     */
+    private static class TopicBaseImpl extends TopicBase {
+        private int startCount = 0;
+        private int stopCount = 0;
+        private boolean startReturn = true;
+        private boolean stopReturn = true;
+        private boolean startEx = false;
+
+        /**
+         * Constructor.
+         * 
+         * @param servers list of servers
+         * @param topic topic name
+         */
+        public TopicBaseImpl(List<String> servers, String topic) {
+            super(servers, topic);
+        }
+
+        @Override
+        public CommInfrastructure getTopicCommInfrastructure() {
+            return CommInfrastructure.NOOP;
+        }
+
+        @Override
+        public boolean start() {
+            ++startCount;
+
+            if (startEx) {
+                throw new RuntimeException(EXPECTED);
+            }
+
+            alive = true;
+            return startReturn;
+        }
+
+        @Override
+        public boolean stop() {
+            ++stopCount;
+            alive = false;
+            return stopReturn;
+        }
+
+        @Override
+        public void shutdown() {
+            // do nothing
+        }
+
+        /**
+         * Adds an event to the list of recent events.
+         * 
+         * @param event event to be added
+         */
+        public void addEvent(String event) {
+            recentEvents.add(event);
+        }
+    }
+}