6 from Queue import Queue
9 from robot.api import logger
12 from cStringIO import StringIO
14 from StringIO import StringIO
16 CommonEventSchemaV5 = "./CommonEventFormat_28.3.json"
18 EventQueue = {"defaultTopic": Queue()}
21 def cleanUpEvent(topic="defaultTopic"):
23 EventQueue.get(topic).empty()
24 except Exception as e:
25 logger.console(str(e))
26 logger.console("DMaaP Event enqueue failed")
29 def enqueEvent(evt, topic="defaultTopic"):
30 if topic not in EventQueue.keys():
31 EventQueue.update({topic: Queue()})
34 EventQueue.get(topic).put(evt)
35 logger.console("DMaaP Event enqued - size=" + str(len(evt)))
37 except Exception as e:
38 logger.console(str(e))
39 logger.console("DMaaP Event enqueue failed")
43 def dequeEvent(topic="defaultTopic", waitSec=10):
45 evt = EventQueue.get(topic).get(True, waitSec)
46 logger.console("DMaaP Event dequeued - size=" + str(len(evt)))
48 except Exception as e:
49 logger.console(str(e))
50 logger.console("DMaaP Event dequeue failed")
54 class DMaaPHandler(BaseHTTPServer.BaseHTTPRequestHandler):
57 self.send_response(405)
61 self.send_response(200)
68 if 'POST' not in self.requestline:
72 content_len = int(self.headers.getheader('content-length', 0))
73 post_body = self.rfile.read(content_len)
75 logger.console("DMaaP Receive Event:\n" + post_body)
77 indx = post_body.index("{")
79 post_body = post_body[indx:]
81 topic = self.getTopicName(self.path)
84 logger.console("DMaaP Topic Name: " + topic)
85 if enqueEvent(post_body, topic) == False:
86 print "enque event fails"
91 with open(CommonEventSchemaV5) as file:
92 EvtSchema = json.load(file)
93 decoded_body = json.loads(post_body)
94 jsonschema.validate(decoded_body, EvtSchema)
99 if 'clientThrottlingState' in self.requestline:
100 self.send_response(204)
102 self.send_response(200)
103 self.send_header('Content-Type', 'application/json')
105 self.wfile.write("{\"count\": 1, \"serverTimeMs\": 3}")
108 self.send_response(respCode)
113 self.send_response(200)
114 self.send_header('Content-Type', 'application/json')
116 self.wfile.write(dequeEvent(self.getTopicName(self.path)))
121 def getTopicName(self, path):
122 # abandon query parameters
123 path = path.split('?',1)[0]
124 path = path.split('#',1)[0]
126 path = posixpath.normpath(urllib.unquote(path))
127 parts = filter(None, path.split('/'))
129 if len(parts) > 1 and parts[0] == "events":
134 def _main_ (HandlerClass = DMaaPHandler,
135 ServerClass = BaseHTTPServer.HTTPServer, protocol="HTTP/1.0"):
138 port = int(sys.argv[1])
142 print "Load event schema file: " + CommonEventSchemaV5
143 with open(CommonEventSchemaV5) as file:
145 EvtSchema = json.load(file)
147 server_address = ('', port)
149 HandlerClass.protocol_version = protocol
150 httpd = ServerClass(server_address, HandlerClass)
152 sa = httpd.socket.getsockname()
153 print "Serving HTTP on", sa[0], "port", sa[1], "..."
154 httpd.serve_forever()
156 if __name__ == '__main__':