[DMAAP-MR] Remove redundant data
[dmaap/messagerouter/messageservice.git] / src / test / java / org / onap / dmaap / mr / cambria / embed / KafkaLocal.java
index 9f3c05a..17c5bbb 100644 (file)
@@ -3,6 +3,7 @@
  * ONAP Policy Engine
  * ================================================================================
  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * Modification copyright (C) 2021 Nordix Foundation.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
 
  package org.onap.dmaap.mr.cambria.embed;
 
-import kafka.server.KafkaConfig;
-import kafka.server.KafkaServerStartable;
-
 import java.io.IOException;
 import java.util.Properties;
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServer;
+import org.apache.kafka.common.utils.Time;
 
 
 public class KafkaLocal {
  
-       public KafkaServerStartable kafka;
+       public KafkaServer kafka;
        public ZooKeeperLocal zookeeper;
        
        public KafkaLocal(Properties kafkaProperties, Properties zkProperties) throws IOException, InterruptedException{
@@ -38,10 +39,12 @@ public class KafkaLocal {
                //start local zookeeper
                System.out.println("starting local zookeeper...");
                zookeeper = new ZooKeeperLocal(zkProperties);
+               zookeeper.run();
                System.out.println("done");
                
                //start local kafka broker
-               kafka = new KafkaServerStartable(kafkaConfig);
+               final scala.Option<String> prefix = scala.Option.apply("kafka");
+               kafka = new KafkaServer(kafkaConfig, Time.SYSTEM, prefix, false);
                System.out.println("starting local kafka broker...");
                kafka.startup();
                System.out.println("done");
@@ -52,6 +55,10 @@ public class KafkaLocal {
                //stop kafka broker
                System.out.println("stopping kafka...");
                kafka.shutdown();
+               kafka.awaitShutdown();
+               System.out.println("done");
+               System.out.println("stopping zookeeper...");
+               zookeeper.stop();
                System.out.println("done");
        }