[KAFKA] Add docker-compose to sample project
[dmaap/kafka11aaf.git] / sampleClient / src / main / java / org / onap / dmaap / kafka / sample / Main.java
index 37a3097..c80c87f 100644 (file)
 
 package org.onap.dmaap.kafka.sample;
 
+import java.util.List;
+import org.apache.kafka.clients.producer.RecordMetadata;
 import org.onap.dmaap.kafka.OnapKafkaClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.CommandLineRunner;
 import org.springframework.boot.SpringApplication;
@@ -29,6 +33,8 @@ import org.springframework.boot.autoconfigure.SpringBootApplication;
 @SpringBootApplication
 public class Main implements CommandLineRunner{
 
+    private final Logger log = LoggerFactory.getLogger(OnapKafkaClient.class.getName());
+
     @Autowired
     private SampleConfiguration configuration;
 
@@ -37,8 +43,22 @@ public class Main implements CommandLineRunner{
     }
 
     @Override
-    public void run(String... args) {
+    public void run(String... args) throws InterruptedException {
         OnapKafkaClient handler = new OnapKafkaClient(configuration);
-        handler.fetchFromTopic("dummy.topic.blah");
+        String testTopic = configuration.getConsumerTopics().get(0);
+        for (int i = 0; i < 5; i++) {
+            RecordMetadata recordMetadata = handler.publishToTopic(testTopic, "dummy-message-"+i);
+            if (recordMetadata != null) {
+                log.info("Topic: {}, Partition: {}, Offset: {}", recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset());
+            }
+        }
+        int fetch = 0;
+        while (true) {
+            fetch++;
+            log.info("Fetch {} from topic: {}", fetch, testTopic);
+            List<String> res = handler.fetchFromTopic(testTopic);
+            log.info("Messages from fetch {}: " + res, fetch);
+            Thread.sleep(3000);
+        }
     }
 }
\ No newline at end of file