[DMAAP-MR] Remove redundant data
[dmaap/messagerouter/messageservice.git] / src / test / java / org / onap / dmaap / mr / cambria / embed / KafkaLocal.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP Policy Engine
4  * ================================================================================
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6  * Modification copyright (C) 2021 Nordix Foundation.
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  * 
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  * 
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21
22  package org.onap.dmaap.mr.cambria.embed;
23
24 import java.io.IOException;
25 import java.util.Properties;
26 import kafka.server.KafkaConfig;
27 import kafka.server.KafkaServer;
28 import org.apache.kafka.common.utils.Time;
29
30
31 public class KafkaLocal {
32  
33         public KafkaServer kafka;
34         public ZooKeeperLocal zookeeper;
35         
36         public KafkaLocal(Properties kafkaProperties, Properties zkProperties) throws IOException, InterruptedException{
37                 KafkaConfig kafkaConfig = new KafkaConfig(kafkaProperties);
38                 
39                 //start local zookeeper
40                 System.out.println("starting local zookeeper...");
41                 zookeeper = new ZooKeeperLocal(zkProperties);
42                 zookeeper.run();
43                 System.out.println("done");
44                 
45                 //start local kafka broker
46                 final scala.Option<String> prefix = scala.Option.apply("kafka");
47                 kafka = new KafkaServer(kafkaConfig, Time.SYSTEM, prefix, false);
48                 System.out.println("starting local kafka broker...");
49                 kafka.startup();
50                 System.out.println("done");
51         }
52         
53         
54         public void stop(){
55                 //stop kafka broker
56                 System.out.println("stopping kafka...");
57                 kafka.shutdown();
58                 kafka.awaitShutdown();
59                 System.out.println("done");
60                 System.out.println("stopping zookeeper...");
61                 zookeeper.stop();
62                 System.out.println("done");
63         }
64         
65 }