Switch to DMaaP kafka image
[integration/csit.git] / tests / dcaegen2-collectors-hv-ves / testcases / libraries / KafkaLibrary.py
1 # ============LICENSE_START====================================
2 # csit-dcaegen2-collectors-hv-ves
3 # =========================================================
4 # Copyright (C) 2019 Nokia. All rights reserved.
5 # =========================================================
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 #       http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
17 # ============LICENSE_END=====================================
18
19 import docker
20 import os
21 from robot.api import logger
22
23 KAFKA_IMAGE_FULL_NAME = os.getenv("KAFKA_IMAGE_FULL_NAME")
24 KAFKA_ADDRESS = "kafka:9092"
25 ZOOKEEPER_ADDRESS = "zookeeper:2181"
26
27 LIST_TOPICS_COMMAND = "kafka-topics.sh --list --zookeeper %s" % ZOOKEEPER_ADDRESS
28 TOPIC_STATUS_COMMAND = "kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list " + KAFKA_ADDRESS + " --topic %s --time -1"
29 DELETE_TOPIC_COMMAND = "kafka-topics.sh --zookeeper " + ZOOKEEPER_ADDRESS + " --delete --topic %s"
30
31
32 class KafkaLibrary:
33
34     def log_kafka_status(self):
35         dockerClient = docker.from_env()
36         kafka = dockerClient.containers.list(filters={"ancestor": KAFKA_IMAGE_FULL_NAME}, all=True)[0]
37
38         topics = self.get_topics(kafka)
39         logger.info("Topics initialized in Kafka cluster: " + str(topics))
40         for topic in topics:
41             if topic == "__consumer_offsets":
42                 # kafka-internal topic, ignore it
43                 continue
44
45             self.log_topic_status(kafka, topic)
46             self.reset_topic(kafka, topic)
47
48         dockerClient.close()
49
50     def get_topics(self, kafka):
51         exitCode, output = kafka.exec_run(LIST_TOPICS_COMMAND)
52         return output.splitlines()
53
54     def log_topic_status(self, kafka, topic):
55         _, topic_status = kafka.exec_run(TOPIC_STATUS_COMMAND % topic)
56         logger.info("Messages on topic: " + str(topic_status))
57
58     def reset_topic(self, kafka, topic):
59         logger.info("Removing topic " + str(
60             topic) + " (note that it will be recreated by dcae-app-simulator/hv-ves-collector, however the offset will be reseted)")
61         _, output = kafka.exec_run(DELETE_TOPIC_COMMAND % topic)
62         logger.info(str(output))