- () -> ref.set(buildRestServer(dmaapName, restsim)),
- () -> ref.get().shutdown());
- }
-
- // NOTE: topics must be started AFTER the (dmaap) rest servers
-
- // topic sinks
- Map<String, TopicSink> sinks = new HashMap<>();
- for (TopicParameters topicParams : params.getTopicSinks()) {
- String topic = topicParams.getTopic();
- addAction("Sink " + topic,
- () -> sinks.put(topic, startSink(topicParams)),
- () -> sinks.get(topic).shutdown());
- }
-
- // topic sources
- Map<String, TopicSource> sources = new HashMap<>();
- for (TopicParameters topicParams : params.getTopicSources()) {
- String topic = topicParams.getTopic();
- addAction("Source " + topic,
- () -> sources.put(topic, startSource(topicParams)),
- () -> sources.get(topic).shutdown());
- }
-
- // topic server simulators
- for (TopicServerParameters topicsim : params.getTopicServers()) {
- AtomicReference<TopicServer<?>> ref = new AtomicReference<>();
- addAction(topicsim.getName(),
- () -> ref.set(buildTopicServer(topicsim, sinks, sources)),