Merge "Update manifest"
[integration.git] / test / csit / tests / dcaegen2 / prh_testcases / resources / DMaaP.py
1 import BaseHTTPServer
2 import json
3 import posixpath
4 import sys
5 import urllib
6 from Queue import Queue
7
8 import jsonschema
9 from robot.api import logger
10
11 try:
12     from cStringIO import StringIO
13 except ImportError:
14     from StringIO import StringIO
15
16 CommonEventSchemaV5 = "./CommonEventFormat_28.3.json"
17 EvtSchema = None
18 EventQueue = {"defaultTopic": Queue()}
19
20
21 def cleanUpEvent(topic="defaultTopic"):
22     try:
23         EventQueue.get(topic).empty()
24     except Exception as e:
25         logger.console(str(e))
26         logger.console("DMaaP Event enqueue failed")
27
28
29 def enqueEvent(evt, topic="defaultTopic"):
30     if topic not in EventQueue.keys():
31         EventQueue.update({topic: Queue()})
32
33     try:
34         EventQueue.get(topic).put(evt)
35         logger.console("DMaaP Event enqued - size=" + str(len(evt)))
36         return True
37     except Exception as e:
38         logger.console(str(e))
39         logger.console("DMaaP Event enqueue failed")
40         return False
41
42
43 def dequeEvent(topic="defaultTopic", waitSec=10):
44     try:
45         evt = EventQueue.get(topic).get(True, waitSec)
46         logger.console("DMaaP Event dequeued - size=" + str(len(evt)))
47         return evt
48     except Exception as e:
49         logger.console(str(e))
50         logger.console("DMaaP Event dequeue failed")
51         return None
52
53
54 class DMaaPHandler(BaseHTTPServer.BaseHTTPRequestHandler):
55
56     def do_PUT(self):
57         self.send_response(405)
58         return
59
60     def do_PATCH(self):
61         self.send_response(200)
62         return
63         
64     def do_POST(self):
65         
66         respCode = 0
67
68         if 'POST' not in self.requestline:
69             respCode = 405
70         
71         if respCode == 0:
72             content_len = int(self.headers.getheader('content-length', 0))
73             post_body = self.rfile.read(content_len)
74
75             logger.console("DMaaP Receive Event:\n" + post_body)
76             
77             indx = post_body.index("{")
78             if indx != 0:
79                 post_body = post_body[indx:]
80
81             topic = self.getTopicName(self.path)
82
83             if topic is not None:
84                 logger.console("DMaaP Topic Name: " + topic)
85                 if enqueEvent(post_body, topic) == False:
86                     print "enque event fails"
87                    
88             global EvtSchema
89             try:
90                 if EvtSchema is None:
91                     with open(CommonEventSchemaV5) as file:
92                         EvtSchema = json.load(file)
93                 decoded_body = json.loads(post_body)
94                 jsonschema.validate(decoded_body, EvtSchema)
95             except:
96                 respCode = 400
97         
98         if respCode == 0:
99             if 'clientThrottlingState' in self.requestline:
100                 self.send_response(204)
101             else:
102                 self.send_response(200)
103                 self.send_header('Content-Type', 'application/json')
104                 self.end_headers()
105                 self.wfile.write("{\"count\": 1, \"serverTimeMs\": 3}")
106                 self.wfile.close()
107         else:
108             self.send_response(respCode)
109
110         return
111
112     def do_GET(self):
113         self.send_response(200)
114         self.send_header('Content-Type', 'application/json')
115         self.end_headers()
116         self.wfile.write(dequeEvent(self.getTopicName(self.path)))
117         self.wfile.close()
118
119         return
120
121     def getTopicName(self, path):
122         # abandon query parameters
123         path = path.split('?',1)[0]
124         path = path.split('#',1)[0]
125
126         path = posixpath.normpath(urllib.unquote(path))
127         parts = filter(None, path.split('/'))
128
129         if len(parts) > 1 and parts[0] == "events":
130             return str(parts[1])
131         else:
132             return None
133
134 def _main_ (HandlerClass = DMaaPHandler,
135          ServerClass = BaseHTTPServer.HTTPServer, protocol="HTTP/1.0"):
136     
137     if sys.argv[1:]:
138         port = int(sys.argv[1])
139     else:
140         port = 2222
141     
142     print "Load event schema file: " + CommonEventSchemaV5
143     with open(CommonEventSchemaV5) as file:
144         global EvtSchema
145         EvtSchema = json.load(file)
146         
147     server_address = ('', port)
148
149     HandlerClass.protocol_version = protocol
150     httpd = ServerClass(server_address, HandlerClass)
151
152     sa = httpd.socket.getsockname()
153     print "Serving HTTP on", sa[0], "port", sa[1], "..."
154     httpd.serve_forever()
155     
156 if __name__ == '__main__':
157     _main_()