X-Git-Url: https://gerrit.onap.org/r/gitweb?a=blobdiff_plain;f=sampleClient%2Fsrc%2Fmain%2Fjava%2Forg%2Fonap%2Fdmaap%2Fkafka%2Fsample%2FMain.java;h=c80c87f1cd003d31be526d83e72f176e08633a01;hb=c3386ebc4fd444e810d8739f7ddd78765d36c631;hp=37a30978cb57b80658e3695563fad501016ba264;hpb=38f5b4b9dc667c52561867d4e36f940109f3e3a5;p=dmaap%2Fkafka11aaf.git diff --git a/sampleClient/src/main/java/org/onap/dmaap/kafka/sample/Main.java b/sampleClient/src/main/java/org/onap/dmaap/kafka/sample/Main.java index 37a3097..c80c87f 100644 --- a/sampleClient/src/main/java/org/onap/dmaap/kafka/sample/Main.java +++ b/sampleClient/src/main/java/org/onap/dmaap/kafka/sample/Main.java @@ -20,7 +20,11 @@ package org.onap.dmaap.kafka.sample; +import java.util.List; +import org.apache.kafka.clients.producer.RecordMetadata; import org.onap.dmaap.kafka.OnapKafkaClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; @@ -29,6 +33,8 @@ import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication public class Main implements CommandLineRunner{ + private final Logger log = LoggerFactory.getLogger(OnapKafkaClient.class.getName()); + @Autowired private SampleConfiguration configuration; @@ -37,8 +43,22 @@ public class Main implements CommandLineRunner{ } @Override - public void run(String... args) { + public void run(String... args) throws InterruptedException { OnapKafkaClient handler = new OnapKafkaClient(configuration); - handler.fetchFromTopic("dummy.topic.blah"); + String testTopic = configuration.getConsumerTopics().get(0); + for (int i = 0; i < 5; i++) { + RecordMetadata recordMetadata = handler.publishToTopic(testTopic, "dummy-message-"+i); + if (recordMetadata != null) { + log.info("Topic: {}, Partition: {}, Offset: {}", recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset()); + } + } + int fetch = 0; + while (true) { + fetch++; + log.info("Fetch {} from topic: {}", fetch, testTopic); + List res = handler.fetchFromTopic(testTopic); + log.info("Messages from fetch {}: " + res, fetch); + Thread.sleep(3000); + } } } \ No newline at end of file