package org.onap.policy.controlloop.actor.test;
+import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.when;
+import java.util.List;
import java.util.function.BiConsumer;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
+import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
+import org.onap.policy.common.endpoints.event.comm.TopicSink;
+import org.onap.policy.common.endpoints.event.comm.TopicSource;
+import org.onap.policy.common.endpoints.http.client.HttpClientFactoryInstance;
+import org.onap.policy.common.endpoints.http.server.HttpServletServerFactoryInstance;
+import org.onap.policy.common.endpoints.parameters.TopicParameters;
import org.onap.policy.common.utils.coder.CoderException;
import org.onap.policy.common.utils.coder.StandardCoderObject;
-import org.onap.policy.common.utils.time.PseudoExecutor;
-import org.onap.policy.controlloop.actorserviceprovider.impl.BidirectionalTopicOperator;
-import org.onap.policy.controlloop.actorserviceprovider.parameters.BidirectionalTopicParams;
+import org.onap.policy.controlloop.actorserviceprovider.parameters.BidirectionalTopicConfig;
import org.onap.policy.controlloop.actorserviceprovider.topic.BidirectionalTopicHandler;
+import org.onap.policy.controlloop.actorserviceprovider.topic.BidirectionalTopicManager;
import org.onap.policy.controlloop.actorserviceprovider.topic.Forwarder;
+import org.onap.policy.simulators.TopicServer;
+import org.onap.policy.simulators.Util;
/**
* Superclass for various BidirectionalTopicOperation tests.
+ *
+ * @param <Q> request type
*/
-public class BasicBidirectionalTopicOperation extends BasicOperation {
+public abstract class BasicBidirectionalTopicOperation<Q> extends BasicOperation {
protected static final String MY_SINK = "my-sink";
protected static final String MY_SOURCE = "my-source";
- protected static final int TIMEOUT = 10;
+ protected static final int TIMEOUT_SEC = 10;
+ protected static final long TIMEOUT_MS = 1000L * TIMEOUT_SEC;
+
+ // sink and source used by the TopicServer
+ private static TopicSink serverSink;
+ private static TopicSource serverSource;
+ private static BidirectionalTopicHandler realTopicHandler;
+
+ protected static BidirectionalTopicManager topicMgr = (sink, source) -> {
+ // note: the sink and source names are swapped for the simulator
+ assertEquals(serverSource.getTopic(), sink);
+ assertEquals(serverSink.getTopic(), source);
+ return realTopicHandler;
+ };
@Captor
protected ArgumentCaptor<BiConsumer<String, StandardCoderObject>> listenerCaptor;
@Mock
protected Forwarder forwarder;
@Mock
- protected BidirectionalTopicOperator operator;
+ protected BidirectionalTopicConfig config;
- protected BidirectionalTopicParams topicParams;
+ private TopicServer<Q> topicServer;
/**
* Constructs the object using a default actor and operation name.
}
/**
- * Initializes mocks and sets up.
+ * Starts the topic.
*/
- public void setUp() throws Exception {
- MockitoAnnotations.initMocks(this);
+ protected static void initBeforeClass(String sinkTopic, String sourceTopic) throws Exception {
+
+ Util.buildDmaapSim();
+
+ // note: the sink and source names are swapped for the simulator
+ TopicParameters ptopic = new TopicParameters();
+ ptopic.setTopic(sourceTopic);
+ ptopic.setManaged(true);
+ ptopic.setServers(List.of("localhost"));
+ ptopic.setTopicCommInfrastructure("dmaap");
+ ptopic.setFetchTimeout(500);
+ serverSink = TopicEndpointManager.getManager().addTopicSinks(List.of(ptopic)).get(0);
- executor = new PseudoExecutor();
+ ptopic.setTopic(sinkTopic);
+ serverSource = TopicEndpointManager.getManager().addTopicSources(List.of(ptopic)).get(0);
- makeContext();
+ serverSink.start();
+ serverSource.start();
- outcome = params.makeOutcome();
- topicParams = BidirectionalTopicParams.builder().sinkTopic(MY_SINK).sourceTopic(MY_SOURCE).timeoutSec(TIMEOUT)
- .build();
+ if (!sinkTopic.equals(sourceTopic)) {
+ // sink and source are different - create other ends for the actor
+ initActorTopics(sinkTopic, sourceTopic, ptopic);
+ }
+
+ realTopicHandler = new BidirectionalTopicHandler(sinkTopic, sourceTopic);
+ realTopicHandler.start();
+ }
+
+ private static void initActorTopics(String sinkTopic, String sourceTopic, TopicParameters ptopic) {
+ // create sink and source for the actor, too
+ ptopic.setTopic(sinkTopic);
+ TopicEndpointManager.getManager().addTopicSinks(List.of(ptopic)).get(0).start();
+
+ ptopic.setTopic(sourceTopic);
+ TopicEndpointManager.getManager().addTopicSources(List.of(ptopic)).get(0).start();
+ }
- initOperator();
+ protected static void destroyAfterClass() {
+ TopicEndpointManager.getManager().shutdown();
+ HttpServletServerFactoryInstance.getServerFactory().destroy();
+ HttpClientFactoryInstance.getClientFactory().destroy();
}
/**
- * Initializes an operator so that it is "alive" and has the given names.
+ * Initializes mocks and sets up.
+ */
+ @Override
+ public void setUpBasic() {
+ super.setUpBasic();
+ topicServer = makeServer(serverSink, serverSource);
+ initConfig();
+ }
+
+ public void tearDownBasic() {
+ topicServer.shutdown();
+ }
+
+ /**
+ * Makes a simulator for the given sink and source.
+ *
+ * @param sink topic to which the simulator should publish responses
+ * @param source topic from which the simulator should receive messages
+ * @return a new topic server/simulator
+ */
+ protected abstract TopicServer<Q> makeServer(TopicSink sink, TopicSource source);
+
+ /**
+ * Initializes a configuration.
*/
- protected void initOperator() {
- when(operator.isAlive()).thenReturn(true);
- when(operator.getFullName()).thenReturn(actorName + "." + operationName);
- when(operator.getActorName()).thenReturn(actorName);
- when(operator.getName()).thenReturn(operationName);
- when(operator.getTopicHandler()).thenReturn(topicHandler);
- when(operator.getForwarder()).thenReturn(forwarder);
- when(operator.getParams()).thenReturn(topicParams);
+ protected void initConfig() {
+ when(config.getTopicHandler()).thenReturn(topicHandler);
+ when(config.getForwarder()).thenReturn(forwarder);
+ when(config.getTimeoutMs()).thenReturn(TIMEOUT_MS);
}
/**