+ /**
+ * Starts the topic.
+ *
+ * @throws InterruptedException if interrupted
+ * @throws BidirectionalTopicClientException if the client cannot be built
+ */
+ protected static void initBeforeClass(String sinkTopic, String sourceTopic)
+ throws InterruptedException, BidirectionalTopicClientException {
+
+ Util.buildDmaapSim();
+
+ // note: the sink and source names are swapped for the simulator
+ var ptopic = new TopicParameters();
+ ptopic.setTopic(sourceTopic);
+ ptopic.setManaged(true);
+ ptopic.setServers(List.of("localhost"));
+ ptopic.setTopicCommInfrastructure("dmaap");
+ ptopic.setFetchTimeout(500);
+ serverSink = TopicEndpointManager.getManager().addTopicSinks(List.of(ptopic)).get(0);
+
+ ptopic.setTopic(sinkTopic);
+ serverSource = TopicEndpointManager.getManager().addTopicSources(List.of(ptopic)).get(0);
+
+ serverSink.start();
+ serverSource.start();
+
+ if (!sinkTopic.equals(sourceTopic)) {
+ // sink and source are different - create other ends for the actor
+ initActorTopics(sinkTopic, sourceTopic, ptopic);
+ }
+
+ realTopicHandler = new BidirectionalTopicHandler(sinkTopic, sourceTopic);
+ realTopicHandler.start();
+ }
+
+ private static void initActorTopics(String sinkTopic, String sourceTopic, TopicParameters ptopic) {
+ // create sink and source for the actor, too
+ ptopic.setTopic(sinkTopic);
+ TopicEndpointManager.getManager().addTopicSinks(List.of(ptopic)).get(0).start();
+
+ ptopic.setTopic(sourceTopic);
+ TopicEndpointManager.getManager().addTopicSources(List.of(ptopic)).get(0).start();
+ }
+
+ protected static void destroyAfterClass() {
+ TopicEndpointManager.getManager().shutdown();
+ HttpServletServerFactoryInstance.getServerFactory().destroy();
+ HttpClientFactoryInstance.getClientFactory().destroy();
+ }
+