+ String testTopic = configuration.getConsumerTopics().get(0);
+ for (int i = 0; i < 5; i++) {
+ RecordMetadata recordMetadata = handler.publishToTopic(testTopic, "dummy-message-"+i);
+ if (recordMetadata != null) {
+ log.info("Topic: {}, Partition: {}, Offset: {}", recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset());
+ }
+ }
+ int fetch = 0;
+ while (true) {
+ fetch++;
+ log.info("Fetch {} from topic: {}", fetch, testTopic);
+ List<String> res = handler.fetchFromTopic(testTopic);
+ log.info("Messages from fetch {}: " + res, fetch);
+ Thread.sleep(3000);
+ }