Fix whitespace issues in Python files
[demo.git] / vnfs / DAaaS / microservices / PythonApps / python-kafkaConsumer-inference-app / src / consumer / CustomKafkaConsumer.py
1 import logging
2 from confluent_kafka import Consumer
3 import json
4
5 logging.basicConfig(format='%(asctime)s::%(process)d::%(levelname)s::%(message)s', level=logging.INFO, datefmt='%d-%b-%y %H:%M:%S')
6
7
8 class CustomKafkaConsumer:
9     def __init__(self):
10         self.output_map = dict()
11         self.topic_name = "metrics3"
12         #self.topic_name = "adatopic1"
13         self.consumer = Consumer({
14             'bootstrap.servers': 'kafka-cluster-kafka-bootstrap:9092',
15             #'bootstrap.servers': '172.25.103.6:31610',
16             'group.id': 'grp1',
17             'auto.offset.reset': 'earliest'
18         })
19         self.duration = 31536000 #50
20         self.time_format = 'timestamp' #or 'iso'
21         # duration may be equal to no_of_recs_wanted, say we gurantee 50 secs generate 50 recs
22         self.no_of_recs_wanted = 3
23
24
25     def processMessage(self, msg_key, msg_val):
26         python_obj = {}
27         try:
28             python_obj = json.loads(msg_key)
29         except ValueError:
30             pass
31         try:
32             python_obj = json.loads(msg_val)
33         except ValueError:
34             pass
35         #print(python_obj["labels"]["__name__"])
36         metric_name = python_obj["labels"]["__name__"]
37         ip = python_obj["labels"]["instance"]
38         if self.time_format == 'iso':
39             logging.info("Time_format is ISO-FORMAT")
40             iso_time = python_obj["timestamp"]
41             logging.info("iso_time:: {}".format(iso_time))
42             import dateutil.parser as dp
43             parsed_datetime_obj = dp.parse(iso_time)
44             from datetime import datetime
45             now_datetime_obj = datetime.now()
46             st_datetime_obj = now_datetime_obj - datetime.timedelta(seconds= self.duration)
47             en_datetime_obj = now_datetime_obj
48             if st_datetime_obj <= parsed_datetime_obj and parsed_datetime_obj <= en_datetime_obj:
49                 logging.info("Parsed a relevant record")
50                 if metric_name in self.output_map:
51                     if ip in self.output_map[metric_name]:
52                         self.output_map[metric_name][ip].append(python_obj)
53                         logging.info("::Appended a record to existing time series data::")
54                     else:
55                         self.output_map[metric_name][ip] = list()
56                         self.output_map[metric_name][ip].append(python_obj)
57                         logging.info("::Appended a recorded to existing time series data with a new ip::")
58                 else:
59                     self.output_map[metric_name] = dict()
60                     self.output_map[metric_name][ip] = list()
61                     self.output_map[metric_name][ip].append(python_obj)
62                     logging.info("::Inserted the first record to a new time series::")
63         else:
64             logging.info("Time_format is timestamp")
65             parsed_timestamp = python_obj["timestamp"]
66             logging.info("parsed_timestamp:: {}".format(parsed_timestamp))
67             from datetime import datetime, timedelta
68             now_datetime_obj = datetime.now()
69             st_datetime_obj = now_datetime_obj - timedelta(seconds=self.duration)
70             en_datetime_obj = now_datetime_obj
71             st_timestamp = int(st_datetime_obj.timestamp()*1000)
72             en_timestamp = int(en_datetime_obj.timestamp()*1000)
73
74             logging.info("st_timestamp:: {}".format(st_timestamp))
75             logging.info("en_timestamp:: {}".format(en_timestamp))
76             if st_timestamp <= parsed_timestamp and en_timestamp>=parsed_timestamp:
77                 if metric_name in self.output_map:
78                     if ip in self.output_map[metric_name]:
79                         self.output_map[metric_name][ip].append(python_obj)
80                         logging.info("::Appended a record to existing time series data::")
81                     else:
82                         self.output_map[metric_name][ip] = list()
83                         self.output_map[metric_name][ip].append(python_obj)
84                         logging.info("::Appended a recorded to existing time series data with a new ip::")
85                 else:
86                     self.output_map[metric_name] = dict()
87                     self.output_map[metric_name][ip] = list()
88                     self.output_map[metric_name][ip].append(python_obj)
89                     logging.info("::Inserted the first record to a new time series::")
90
91         logging.info("The size of the o/p map :: {}".format(len(self.output_map[metric_name][ip])))
92         if len(self.output_map[metric_name][ip]) == self.no_of_recs_wanted:
93             logging.info("Size of the q {}-{} exceeded ".format(metric_name, ip))
94             logging.info("Poping out the record: {}".format(self.output_map[metric_name][ip].pop(0)))
95
96
97     def executeQuery(self, metric_name, ip):
98         if metric_name in self.output_map:
99             if ip in self.output_map[metric_name]:
100                 return self.output_map[metric_name][ip]
101
102
103     def consume(self):
104         self.consumer.subscribe([self.topic_name])
105         while True:
106             msg = self.consumer.poll(1.0)
107             if msg is None:
108                 logging.info('Looking for message on topic:: {}'.format(self.topic_name))
109                 continue
110             if msg.error():
111                 print("Consumer error: {}".format(msg.error()))
112                 continue
113             # print("msg type:: {} and msg:: {}".format(type(msg), msg))
114             # print('Received message key from producer: {}'.format(msg.key().decode('utf-8')))
115             # print('Received message val from producer: {}'.format(msg.value().decode('utf-8')))
116             # print("mes-key-type:: {}".format(type(msg.key().decode('utf-8'))))
117             # print("msg-value-type:: {}".format(type(msg.value().decode('utf-8'))))
118
119             self.processMessage(msg.key(), msg.value())
120         self.consumer.close()