* ONAP
* ================================================================================
* Copyright (C) 2018-2021 AT&T Intellectual Property. All rights reserved.
- * Modifications Copyright (C) 2020 Nordix Foundation
+ * Modifications Copyright (C) 2020, 2024 Nordix Foundation
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
package org.onap.policy.drools.pooling;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import com.google.gson.Gson;
import com.google.gson.JsonParseException;
-import java.util.Arrays;
import java.util.Deque;
import java.util.IdentityHashMap;
import java.util.LinkedList;
import java.util.concurrent.atomic.AtomicReference;
import lombok.Getter;
import org.apache.commons.lang3.tuple.Pair;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.onap.policy.common.endpoints.event.comm.Topic;
* <p>Invoke {@link #runSlow()}, before the test, to slow things down.
*/
-public class FeatureTest {
+class FeatureTest {
private static final Logger logger = LoggerFactory.getLogger(FeatureTest.class);
/**
* Name of the topic used for inter-host communication.
* Context for the current test case.
*/
private Context ctx;
+
/**
* Setup.
*/
- @Before
+ @BeforeEach
public void setUp() {
ctx = null;
}
+
/**
* Tear down.
*/
- @After
+ @AfterEach
public void tearDown() {
if (ctx != null) {
ctx.destroy();
}
@Test
- public void test_SingleHost() throws Exception {
+ void test_SingleHost() throws Exception {
run(70, 1);
}
@Test
- public void test_TwoHosts() throws Exception {
+ void test_TwoHosts() throws Exception {
run(200, 2);
}
@Test
- public void test_ThreeHosts() throws Exception {
+ void test_ThreeHosts() throws Exception {
run(200, 3);
}
private String makeMessage(int reqnum) {
return "{\"reqid\":\"req" + reqnum + "\", \"data\":\"hello " + reqnum + "\"}";
}
+
/**
* Invoke this to slow the timers down.
*/
stdInterPollMs = 2000;
stdEventWaitSec = 1000;
}
+
/**
* Decodes an event.
*
return null;
}
}
+
/**
* Context used for a single test case.
*/
* Number of events we're still waiting to receive.
*/
private final CountDownLatch eventCounter;
+
/**
* The current host. Set by {@link #withHost(Host, VoidFunction)} and used by
* {@link #getCurrentHost()}.
*/
+ @Getter
private Host currentHost = null;
/**
public Context(int events) {
eventCounter = new CountDownLatch(events);
}
+
/**
* Destroys the context, stopping any hosts that remain.
*/
public void checkAllSawAMsg() {
int msgs = 0;
for (Host host : hosts) {
- assertTrue("msgs=" + msgs, host.messageSeen());
+ assertTrue(host.messageSeen(), "msgs=" + msgs);
++msgs;
}
}
* Adds an internal channel to the set of channels.
*
* @param channel channel
- * @param queue the channel's queue
+ * @param queue the channel's queue
*/
public void addInternal(String channel, BlockingQueue<String> queue) {
/**
* Associates a controller with its drools controller.
*
- * @param controller controller
+ * @param controller controller
* @param droolsController drools controller
*/
/**
* Waits, for a period of time, for all events to be processed.
*
- * @param time time
+ * @param time time
* @param units units
* @return {@code true} if all events have been processed, {@code false} otherwise
* @throws InterruptedException throws interrupted
return eventCounter.await(time, units);
}
- /**
- * Gets the current host, provided this is used from within a call to
- * {@link #withHost(Host, VoidFunction)}.
- *
- * @return the current host, or {@code null} if there is no current host
- */
-
- public Host getCurrentHost() {
- return currentHost;
- }
}
/**
* Offers an event to the feature, before the policy controller handles it.
*
* @param protocol protocol
- * @param topic2 topic
- * @param event event
+ * @param topic2 topic
+ * @param event event
* @return {@code true} if the event was handled, {@code false} otherwise
*/
* Offers an event to the feature, after the policy controller handles it.
*
* @param protocol protocol
- * @param topic topic
- * @param event event
- * @param success success
+ * @param topic topic
+ * @param event event
+ * @param success success
* @return {@code true} if the event was handled, {@code false} otherwise
*/
/**
* Offers an event to the feature, after the drools controller handles it.
*
- * @param fact fact
+ * @param fact fact
* @param successInsert {@code true} if it was successfully inserted by the drools
- * controller, {@code false} otherwise
+ * controller, {@code false} otherwise
* @return {@code true} if the event was handled, {@code false} otherwise
*/
/**
* Message seen.
*
- * @return {@code true} if a message was seen for this host, {@code false}
- * otherwise
+ * @return {@code true} if a message was seen for this host, {@code false} otherwise
*/
public boolean messageSeen() {
/**
* Constructor.
*
- * @param type topic type
+ * @param type topic type
* @param queue topic from which to read
*/
/**
* Polls for messages from the topic and offers them to the listener.
*
- * @param stopped triggered if processing should stop
+ * @param stopped triggered if processing should stop
* @param listener listener
* @throws InterruptedException throws interrupted exception
*/
props.setProperty(specialize(PoolingProperties.POOLING_TOPIC, CONTROLLER1), INTERNAL_TOPIC);
props.setProperty(specialize(PoolingProperties.OFFLINE_LIMIT, CONTROLLER1), "10000");
props.setProperty(specialize(PoolingProperties.OFFLINE_AGE_MS, CONTROLLER1), "1000000");
- props.setProperty(specialize(PoolingProperties.OFFLINE_PUB_WAIT_MS, CONTROLLER1),
- "" + stdOfflinePubWaitMs);
- props.setProperty(specialize(PoolingProperties.START_HEARTBEAT_MS, CONTROLLER1),
- "" + stdStartHeartbeatMs);
+ props.setProperty(specialize(PoolingProperties.OFFLINE_PUB_WAIT_MS, CONTROLLER1), "" + stdOfflinePubWaitMs);
+ props.setProperty(specialize(PoolingProperties.START_HEARTBEAT_MS, CONTROLLER1), "" + stdStartHeartbeatMs);
props.setProperty(specialize(PoolingProperties.REACTIVATE_MS, CONTROLLER1), "" + stdReactivateWaitMs);
props.setProperty(specialize(PoolingProperties.IDENTIFICATION_MS, CONTROLLER1), "" + stdIdentificationMs);
props.setProperty(specialize(PoolingProperties.ACTIVE_HEARTBEAT_MS, CONTROLLER1),
- "" + stdActiveHeartbeatMs);
- props.setProperty(specialize(PoolingProperties.INTER_HEARTBEAT_MS, CONTROLLER1),
- "" + stdInterHeartbeatMs);
+ "" + stdActiveHeartbeatMs);
+ props.setProperty(specialize(PoolingProperties.INTER_HEARTBEAT_MS, CONTROLLER1), "" + stdInterHeartbeatMs);
return props;
}
* Embeds a specializer within a property name, after the prefix.
*
* @param propnm property name into which it should be embedded
- * @param spec specializer to be embedded
+ * @param spec specializer to be embedded
* @return the property name, with the specializer embedded within it
*/
@Override
protected PoolingManagerImpl makeManager(String host, PolicyController controller, PoolingProperties props,
- CountDownLatch activeLatch) {
+ CountDownLatch activeLatch) {
currentContext.set(context);
return new PoolingManagerTest(host, controller, props, activeLatch);
}
/**
* Constructor.
*
- * @param host the host
- * @param controller the controller
- * @param props the properties
+ * @param host the host
+ * @param controller the controller
+ * @param props the properties
* @param activeLatch the latch
*/
public PoolingManagerTest(String host, PolicyController controller, PoolingProperties props,
- CountDownLatch activeLatch) {
+ CountDownLatch activeLatch) {
super(host, controller, props, activeLatch);
}
@Override
- protected DmaapManager makeDmaapManager(String topic) throws PoolingFeatureException {
- return new DmaapManagerImpl(topic);
+ protected TopicMessageManager makeTopicMessagesManager(String topic) throws PoolingFeatureException {
+ return new TopicMessageManagerImpl(topic);
}
@Override
* DMaaP Manager with overrides.
*/
- private static class DmaapManagerImpl extends DmaapManager {
+ private static class TopicMessageManagerImpl extends TopicMessageManager {
/**
* Constructor.
* @throws PoolingFeatureException if an error occurs
*/
- public DmaapManagerImpl(String topic) throws PoolingFeatureException {
+ public TopicMessageManagerImpl(String topic) throws PoolingFeatureException {
super(topic);
}
@Override
protected List<TopicSource> getTopicSources() {
- return Arrays.asList(new TopicSourceImpl(INTERNAL_TOPIC,
- currentContext.get().getCurrentHost().getInternalQueue()));
+ return List.of(
+ new TopicSourceImpl(INTERNAL_TOPIC, currentContext.get().getCurrentHost().getInternalQueue()));
}
@Override
protected List<TopicSink> getTopicSinks() {
- return Arrays.asList(new TopicSinkImpl(currentContext.get()));
+ return List.of(new TopicSinkImpl(currentContext.get()));
}
}