First working draft of kafka for inference app
[demo.git] / vnfs / DAaaS / microservices / PythonApps / python-kafkaConsumer-inference-app / src / main.py
diff --git a/vnfs/DAaaS/microservices/PythonApps/python-kafkaConsumer-inference-app/src/main.py b/vnfs/DAaaS/microservices/PythonApps/python-kafkaConsumer-inference-app/src/main.py
new file mode 100755 (executable)
index 0000000..bf62f50
--- /dev/null
@@ -0,0 +1,47 @@
+#!/usr/bin/env python3
+
+# from .consumer.CustomKafkaConsumer import CustomKafkaConsumer
+# from .producer.CustomKafkaProducer import CustomKafkaProducer
+
+import sys
+import os, threading
+import traceback
+import json
+import concurrent.futures
+import logging
+
+from consumer import CustomKafkaConsumer
+from producer import CustomKafkaProducer
+
+logging.basicConfig(format='%(asctime)s::%(process)d::%(levelname)s::%(message)s', level=logging.INFO, datefmt='%d-%b-%y %H:%M:%S')
+
+def main():
+    #Begin: Sample producer based on file
+    customKafkaProducer = CustomKafkaProducer.CustomKafkaProducer()
+    with open("./multithreading-metrics.json") as input_file:
+        for each_line in input_file:
+            python_obj = json.loads(each_line)
+            # print(python_obj["labels"]["__name__"])
+            customKafkaProducer.produce(each_line, python_obj["labels"]["__name__"])
+    #END: Sample producer based on file
+
+    customKafkaConsumer = CustomKafkaConsumer.CustomKafkaConsumer()
+
+    #Form a data structure for query formation
+    queries = []
+    queries.append({"metric_name" : "go_gc_duration_seconds_count", "ip": "10.42.1.93:8686"})
+    queries.append({"metric_name" : 'go_gc_duration_seconds_count', "ip": "10.42.1.92:8686"})
+
+    executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
+    executor.submit(customKafkaConsumer.consume)
+
+    while(True):
+        for each_record in queries:
+            list_of_records = customKafkaConsumer.executeQuery(each_record["metric_name"], each_record["ip"])
+            logging.info("The records collected :: {}".format(list_of_records))
+            logging.info("The length of records collected: {}".format(len(list_of_records)))
+            print("The records :: {}".format(list_of_records))
+
+
+if __name__ == '__main__':
+    main()
\ No newline at end of file