First working draft of Kafka-based inference app 36/97836/4
author Rajamohan Raj <rajamohan.raj@intel.com>
Fri, 1 Nov 2019 00:30:30 +0000 (00:30 +0000)
committer Marco Platania <platania@research.att.com>
Thu, 14 Nov 2019 14:10:09 +0000 (14:10 +0000)
Created a Python-based inference app which can query a given metric for
a given duration from a Kafka topic.
The consumer runs on a separate thread and doesn't interfere with the main app.
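
Illustrative query flow, as wired up in src/main.py:

    consumer = CustomKafkaConsumer.CustomKafkaConsumer()
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
    executor.submit(consumer.consume)  # consume on a background thread
    records = consumer.executeQuery("go_gc_duration_seconds_count", "10.42.1.93:8686")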

Issue-ID: ONAPARC-528
Signed-off-by: Rajamohan Raj <rajamohan.raj@intel.com>
Change-Id: Ic84ea137b134385246bf11dee2ed6d34b593b956

12 files changed:
vnfs/DAaaS/microservices/PythonApps/python-kafkaConsumer-inference-app/README.md [new file with mode: 0644]
vnfs/DAaaS/microservices/PythonApps/python-kafkaConsumer-inference-app/kubernetes-manifests/kafka.inference.deploy.yaml [new file with mode: 0644]
vnfs/DAaaS/microservices/PythonApps/python-kafkaConsumer-inference-app/skaffold.yaml [new file with mode: 0644]
vnfs/DAaaS/microservices/PythonApps/python-kafkaConsumer-inference-app/src/Dockerfile [new file with mode: 0644]
vnfs/DAaaS/microservices/PythonApps/python-kafkaConsumer-inference-app/src/__init__.py [new file with mode: 0644]
vnfs/DAaaS/microservices/PythonApps/python-kafkaConsumer-inference-app/src/consumer/CustomKafkaConsumer.py [new file with mode: 0644]
vnfs/DAaaS/microservices/PythonApps/python-kafkaConsumer-inference-app/src/consumer/__init__.py [new file with mode: 0644]
vnfs/DAaaS/microservices/PythonApps/python-kafkaConsumer-inference-app/src/main.py [new file with mode: 0755]
vnfs/DAaaS/microservices/PythonApps/python-kafkaConsumer-inference-app/src/producer/CustomKafkaProducer.py [new file with mode: 0644]
vnfs/DAaaS/microservices/PythonApps/python-kafkaConsumer-inference-app/src/producer/__init__.py [new file with mode: 0644]
vnfs/DAaaS/microservices/PythonApps/python-kafkaConsumer-inference-app/src/requirements.txt [new file with mode: 0644]
vnfs/DAaaS/microservices/PythonApps/python-kafkaConsumer-inference-app/src/utils/utils.py [new file with mode: 0644]

diff --git a/vnfs/DAaaS/microservices/PythonApps/python-kafkaConsumer-inference-app/README.md b/vnfs/DAaaS/microservices/PythonApps/python-kafkaConsumer-inference-app/README.md
new file mode 100644 (file)
index 0000000..e1e30f3
--- /dev/null
@@ -0,0 +1 @@
+### Kafka based Inference App
\ No newline at end of file
diff --git a/vnfs/DAaaS/microservices/PythonApps/python-kafkaConsumer-inference-app/kubernetes-manifests/kafka.inference.deploy.yaml b/vnfs/DAaaS/microservices/PythonApps/python-kafkaConsumer-inference-app/kubernetes-manifests/kafka.inference.deploy.yaml
new file mode 100644 (file)
index 0000000..bbd3d55
--- /dev/null
@@ -0,0 +1,32 @@
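+# Deployment for the inference app; skaffold replaces the bare image name below with the built image tag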
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+  name: python-kafkaconsumer-inference-app
+  labels:
+    app: python-kafkaconsumer-inference-app
+    tier: app
+spec:
+  replicas: 1
+  selector:
+    matchLabels:
+      app: python-kafkaconsumer-inference-app
+      tier: app
+  template:
+    metadata:
+      labels:
+        app: python-kafkaconsumer-inference-app
+        tier: app
+    spec:
+      containers:
+      - name: python-kafkaconsumer-inference-app
+        image: python-kafkaconsumer-inference-app
+        ports:
+        - containerPort: 8080
+        resources:
+          requests:
+            memory: "640Mi"
+            cpu: "2500m"
+          limits:
+            memory: "1280Mi"
+            cpu: "5000m"
diff --git a/vnfs/DAaaS/microservices/PythonApps/python-kafkaConsumer-inference-app/skaffold.yaml b/vnfs/DAaaS/microservices/PythonApps/python-kafkaConsumer-inference-app/skaffold.yaml
new file mode 100644 (file)
index 0000000..e8891c6
--- /dev/null
@@ -0,0 +1,19 @@
+apiVersion: skaffold/v1beta11
+kind: Config
+build:
+  tagPolicy:
+    sha256: {}
+  # defines where to find the code at build time and where to push the resulting image
+  artifacts:
+    - context: src
+      image: python-kafkaconsumer-inference-app
+# defines the Kubernetes manifests to deploy on each run
+deploy:
+  kubectl:
+    manifests:
+      - kubernetes-manifests/**.yaml
+# use the cloudbuild profile to build images using Google Cloud Build
+profiles:
+- name: cloudbuild
+  build:
+    googleCloudBuild: {}
diff --git a/vnfs/DAaaS/microservices/PythonApps/python-kafkaConsumer-inference-app/src/Dockerfile b/vnfs/DAaaS/microservices/PythonApps/python-kafkaConsumer-inference-app/src/Dockerfile
new file mode 100644 (file)
index 0000000..8c5d822
--- /dev/null
@@ -0,0 +1,32 @@
+# Python image to use.
+FROM python:3.8
+
+# Set the working directory to /src/hdfs-writer
+WORKDIR /src/inferenceApp
+
+# Build and install librdkafka, the C client library required by confluent-kafka
+RUN git clone https://github.com/edenhill/librdkafka.git /librdkafka-dir && \
+    cd /librdkafka-dir && \
+    ./configure --prefix /usr && \
+    make && \
+    make install
+
+#RUN export PYTHONPATH="/usr/bin/python3:/src/python-kafkaconsumer-inference-app/"
+
+# copy the requirements file used for dependencies
+COPY requirements.txt .
+
+# Install any needed packages specified in requirements.txt
+RUN pip install --trusted-host pypi.python.org -r requirements.txt
+
+
+# Install ptvsd for debugging
+RUN pip install ptvsd
+
+
+
+# Copy the rest of the working directory contents into the container at /app
+COPY . ./
+
+# Start main.py under ptvsd and wait for a debugger to attach on port 5000
+CMD ["python3", "-m", "ptvsd", "--host", "localhost", "--port", "5000", "--wait", "/src/inferenceApp/main.py"]
\ No newline at end of file
diff --git a/vnfs/DAaaS/microservices/PythonApps/python-kafkaConsumer-inference-app/src/__init__.py b/vnfs/DAaaS/microservices/PythonApps/python-kafkaConsumer-inference-app/src/__init__.py
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/vnfs/DAaaS/microservices/PythonApps/python-kafkaConsumer-inference-app/src/consumer/CustomKafkaConsumer.py b/vnfs/DAaaS/microservices/PythonApps/python-kafkaConsumer-inference-app/src/consumer/CustomKafkaConsumer.py
new file mode 100644 (file)
index 0000000..1e311bf
--- /dev/null
@@ -0,0 +1,127 @@
+import logging
+from confluent_kafka import Consumer
+import json
+
+logging.basicConfig(format='%(asctime)s::%(process)d::%(levelname)s::%(message)s', level=logging.INFO, datefmt='%d-%b-%y %H:%M:%S')
+
+
+class CustomKafkaConsumer:
+    def __init__(self):
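+        # output_map: metric_name -> instance ip -> list of parsed records (in-memory time-series buffer)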
+        self.output_map = dict()
+        self.topic_name = "metrics3"
+        #self.topic_name = "adatopic1"
+        self.consumer = Consumer({
+            'bootstrap.servers': 'kafka-cluster-kafka-bootstrap:9092',
+            #'bootstrap.servers': '172.25.103.6:31610',
+            'group.id': 'grp1',
+            'auto.offset.reset': 'earliest'
+        })
+        self.duration = 31536000  # query window in seconds (31536000s = 1 year; e.g. 50 for quick tests)
+        self.time_format = 'timestamp'  # or 'iso'
+        # duration may be equal to no_of_recs_wanted, e.g. we guarantee 50 secs generate 50 recs
+        self.no_of_recs_wanted = 3
+
+
+    def processMessage(self, msg_key, msg_val):
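+        # A record may arrive as the message key or the value; whichever parses as JSON
+        # last wins. Records timestamped within [now - duration, now] are appended to the
+        # per-metric, per-instance buffer, which is bounded by no_of_recs_wanted.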
+        python_obj = {}
+        try:
+            python_obj = json.loads(msg_key)
+        except ValueError:
+            pass
+        try:
+            python_obj = json.loads(msg_val)
+        except ValueError:
+            pass
+        #print(python_obj["labels"]["__name__"])
+        metric_name = python_obj["labels"]["__name__"]
+        ip = python_obj["labels"]["instance"]
+        if self.time_format == 'iso':
+            logging.info("Time_format is ISO-FORMAT")
+            iso_time = python_obj["timestamp"]
+            logging.info("iso_time:: {}".format(iso_time))
+            import dateutil.parser as dp
+            parsed_datetime_obj = dp.parse(iso_time)
+            from datetime import datetime, timedelta
+            now_datetime_obj = datetime.now()
+            st_datetime_obj = now_datetime_obj - timedelta(seconds=self.duration)
+            en_datetime_obj = now_datetime_obj
+            if st_datetime_obj <= parsed_datetime_obj <= en_datetime_obj:
+                logging.info("Parsed a relevant record")
+                if metric_name in self.output_map:
+                    if ip in self.output_map[metric_name]:
+                        self.output_map[metric_name][ip].append(python_obj)
+                        logging.info("::Appended a record to existing time series data::")
+                    else:
+                        self.output_map[metric_name][ip] = list()
+                        self.output_map[metric_name][ip].append(python_obj)
+                        logging.info("::Appended a recorded to existing time series data with a new ip::")
+                else:
+                    self.output_map[metric_name] = dict()
+                    self.output_map[metric_name][ip] = list()
+                    self.output_map[metric_name][ip].append(python_obj)
+                    logging.info("::Inserted the first record to a new time series::")
+        else:
+            logging.info("Time_format is timestamp")
+            parsed_timestamp = python_obj["timestamp"]
+            logging.info("parsed_timestamp:: {}".format(parsed_timestamp))
+            from datetime import datetime, timedelta
+            now_datetime_obj = datetime.now()
+            st_datetime_obj = now_datetime_obj - timedelta(seconds=self.duration)
+            en_datetime_obj = now_datetime_obj
+            st_timestamp = int(st_datetime_obj.timestamp()*1000)
+            en_timestamp = int(en_datetime_obj.timestamp()*1000)
+
+            logging.info("st_timestamp:: {}".format(st_timestamp))
+            logging.info("en_timestamp:: {}".format(en_timestamp))
+            if st_timestamp <= parsed_timestamp <= en_timestamp:
+                if metric_name in self.output_map:
+                    if ip in self.output_map[metric_name]:
+                        self.output_map[metric_name][ip].append(python_obj)
+                        logging.info("::Appended a record to existing time series data::")
+                    else:
+                        self.output_map[metric_name][ip] = list()
+                        self.output_map[metric_name][ip].append(python_obj)
+                        logging.info("::Appended a recorded to existing time series data with a new ip::")
+                else:
+                    self.output_map[metric_name] = dict()
+                    self.output_map[metric_name][ip] = list()
+                    self.output_map[metric_name][ip].append(python_obj)
+                    logging.info("::Inserted the first record to a new time series::")
+
+        logging.info("The size of the o/p map :: {}".format(len(self.output_map[metric_name][ip])))
+        if len(self.output_map[metric_name][ip]) == self.no_of_recs_wanted:
+            logging.info("Size of the q {}-{} exceeded ".format(metric_name, ip))
+            logging.info("Poping out the record: {}".format(self.output_map[metric_name][ip].pop(0)))
+
+
+    def executeQuery(self, metric_name, ip):
+        if metric_name in self.output_map:
+            if ip in self.output_map[metric_name]:
+                return self.output_map[metric_name][ip]
+        return []  # no data collected yet for this metric/ip pair
+
+
+    def consume(self):
+        self.consumer.subscribe([self.topic_name])
+        while True:
+            msg = self.consumer.poll(1.0)
+            if msg is None:
+                logging.info('Looking for message on topic:: {}'.format(self.topic_name))
+                continue
+            if msg.error():
+                print("Consumer error: {}".format(msg.error()))
+                continue
+            # print("msg type:: {} and msg:: {}".format(type(msg), msg))
+            # print('Received message key from producer: {}'.format(msg.key().decode('utf-8')))
+            # print('Received message val from producer: {}'.format(msg.value().decode('utf-8')))
+            # print("mes-key-type:: {}".format(type(msg.key().decode('utf-8'))))
+            # print("msg-value-type:: {}".format(type(msg.value().decode('utf-8'))))
+
+            self.processMessage(msg.key(), msg.value())
+        self.consumer.close()
\ No newline at end of file
diff --git a/vnfs/DAaaS/microservices/PythonApps/python-kafkaConsumer-inference-app/src/consumer/__init__.py b/vnfs/DAaaS/microservices/PythonApps/python-kafkaConsumer-inference-app/src/consumer/__init__.py
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/vnfs/DAaaS/microservices/PythonApps/python-kafkaConsumer-inference-app/src/main.py b/vnfs/DAaaS/microservices/PythonApps/python-kafkaConsumer-inference-app/src/main.py
new file mode 100755 (executable)
index 0000000..bf62f50
--- /dev/null
@@ -0,0 +1,47 @@
+#!/usr/bin/env python3
+
+# from .consumer.CustomKafkaConsumer import CustomKafkaConsumer
+# from .producer.CustomKafkaProducer import CustomKafkaProducer
+
+import json
+import time
+import concurrent.futures
+import logging
+
+from consumer import CustomKafkaConsumer
+from producer import CustomKafkaProducer
+
+logging.basicConfig(format='%(asctime)s::%(process)d::%(levelname)s::%(message)s', level=logging.INFO, datefmt='%d-%b-%y %H:%M:%S')
+
+def main():
+    #Begin: Sample producer based on file
+    customKafkaProducer = CustomKafkaProducer.CustomKafkaProducer()
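+    # NOTE: multithreading-metrics.json is a sample input file expected next to main.py; it is not part of this change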
+    with open("./multithreading-metrics.json") as input_file:
+        for each_line in input_file:
+            python_obj = json.loads(each_line)
+            # print(python_obj["labels"]["__name__"])
+            customKafkaProducer.produce(each_line, python_obj["labels"]["__name__"])
+    #END: Sample producer based on file
+
+    customKafkaConsumer = CustomKafkaConsumer.CustomKafkaConsumer()
+
+    #Form a data structure for query formation
+    queries = []
+    queries.append({"metric_name" : "go_gc_duration_seconds_count", "ip": "10.42.1.93:8686"})
+    queries.append({"metric_name" : 'go_gc_duration_seconds_count', "ip": "10.42.1.92:8686"})
+
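+    # Run the consumer loop on a single background thread so the query loop below is not blocked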
+    executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
+    executor.submit(customKafkaConsumer.consume)
+
+    while True:
+        for each_record in queries:
+            list_of_records = customKafkaConsumer.executeQuery(each_record["metric_name"], each_record["ip"])
+            logging.info("The records collected :: {}".format(list_of_records))
+            logging.info("The number of records collected: {}".format(len(list_of_records)))
+        time.sleep(5)  # assumed polling interval; avoids a busy loop
+
+
+if __name__ == '__main__':
+    main()
\ No newline at end of file
diff --git a/vnfs/DAaaS/microservices/PythonApps/python-kafkaConsumer-inference-app/src/producer/CustomKafkaProducer.py b/vnfs/DAaaS/microservices/PythonApps/python-kafkaConsumer-inference-app/src/producer/CustomKafkaProducer.py
new file mode 100644 (file)
index 0000000..8f726bd
--- /dev/null
@@ -0,0 +1,36 @@
+import logging
+from confluent_kafka import Producer
+
+logging.basicConfig(format='%(asctime)s::%(process)d::%(levelname)s::%(message)s', level=logging.INFO, datefmt='%d-%b-%y %H:%M:%S')
+
+
+class CustomKafkaProducer:
+    def __init__(self):
+        self.topic_name = "metrics3"
+        #self.topic_name = "adatopic1"
+        conf = {'bootstrap.servers': 'kafka-cluster-kafka-bootstrap:9092'
+                }
+        self.producer = Producer(**conf)
+
+
+    def produce(self, kafka_msg, kafka_key):
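+        # The key is the metric name (see main.py), so all records for a metric land on the same partition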
+        try:
+            self.producer.produce(topic=self.topic_name,
+                              value=kafka_msg,
+                              key=kafka_key,
+                              callback=self.on_delivery
+            )
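+            # Flushing after every message makes the send synchronous; simple, but it limits throughput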
+            self.producer.flush()
+
+        except Exception:
+            logging.exception("Error while producing to the kafka topic.")
+
+
+    def on_delivery(self, err, msg):
+        if err:
+            logging.error("Message failed delivery: %s", err)
+        else:
+            logging.info("Message delivered to %s on partition %s",
+                        msg.topic(), msg.partition())
\ No newline at end of file
diff --git a/vnfs/DAaaS/microservices/PythonApps/python-kafkaConsumer-inference-app/src/producer/__init__.py b/vnfs/DAaaS/microservices/PythonApps/python-kafkaConsumer-inference-app/src/producer/__init__.py
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/vnfs/DAaaS/microservices/PythonApps/python-kafkaConsumer-inference-app/src/requirements.txt b/vnfs/DAaaS/microservices/PythonApps/python-kafkaConsumer-inference-app/src/requirements.txt
new file mode 100644 (file)
index 0000000..78cdc97
--- /dev/null
@@ -0,0 +1,2 @@
+confluent-kafka
+python-dateutil
\ No newline at end of file
diff --git a/vnfs/DAaaS/microservices/PythonApps/python-kafkaConsumer-inference-app/src/utils/utils.py b/vnfs/DAaaS/microservices/PythonApps/python-kafkaConsumer-inference-app/src/utils/utils.py
new file mode 100644 (file)
index 0000000..4ed3b47
--- /dev/null
@@ -0,0 +1,8 @@
+class utils:
+
+    def __init__(self):
+        pass
+
+    def readFile(self, fileName):
+        pass
+