* ONAP Policy Engine
* ================================================================================
* Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * Modification copyright (C) 2021 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
package org.onap.dmaap.mr.cambria.embed;
-import kafka.server.KafkaConfig;
-import kafka.server.KafkaServerStartable;
-
import java.io.IOException;
import java.util.Properties;
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServer;
+import org.apache.kafka.common.utils.Time;
public class KafkaLocal {
- public KafkaServerStartable kafka;
+ public KafkaServer kafka;
public ZooKeeperLocal zookeeper;
public KafkaLocal(Properties kafkaProperties, Properties zkProperties) throws IOException, InterruptedException{
//start local zookeeper
System.out.println("starting local zookeeper...");
zookeeper = new ZooKeeperLocal(zkProperties);
+ zookeeper.run();
System.out.println("done");
//start local kafka broker
- kafka = new KafkaServerStartable(kafkaConfig);
+ final scala.Option<String> prefix = scala.Option.apply("kafka");
+ kafka = new KafkaServer(kafkaConfig, Time.SYSTEM, prefix, false);
System.out.println("starting local kafka broker...");
kafka.startup();
System.out.println("done");
//stop kafka broker
System.out.println("stopping kafka...");
kafka.shutdown();
+ kafka.awaitShutdown();
+ System.out.println("done");
+ System.out.println("stopping zookeeper...");
+ zookeeper.stop();
System.out.println("done");
}