2 Created on Aug 15, 2017
15 from jsonschema import validate
19 import SimpleHTTPServer
22 from cStringIO import StringIO
24 from StringIO import StringIO
31 sz = DcaeVariables.VESEventQ.qsize()
34 self.evtQueue.get_nowait()
40 if DcaeVariables.VESEventQ is not None:
42 DcaeVariables.VESEventQ.put(evt)
44 except Exception as e:
50 def deque_event(wait_sec=25):
51 if DcaeVariables.IsRobotRun:
54 evt = DcaeVariables.VESEventQ.get(True, wait_sec)
56 except Exception as e:
57 if DcaeVariables.IsRobotRun:
61 print("DMaaP Event dequeue timeout")
65 class DMaaPHandler(BaseHTTPServer.BaseHTTPRequestHandler):
68 self.send_response(405)
73 # Parse the form data posted
75 form = cgi.FieldStorage(
78 environ={'REQUEST_METHOD':'POST',
79 'CONTENT_TYPE':self.headers['Content-Type'],
83 form = cgi.FieldStorage(
86 environ={"REQUEST_METHOD": "POST"})
88 for item in form.list:
89 print "%s=%s" % (item.name, item.value)
93 if 'POST' not in self.requestline:
98 if '/eventlistener/v5' not in self.requestline and '/eventlistener/v5/eventBatch' not in self.requestline and \
99 '/eventlistener/v5/clientThrottlingState' not in self.requestline:
104 if 'Y29uc29sZTpaakprWWpsbE1qbGpNVEkyTTJJeg==' not in str(self.headers):
109 topic = self.extract_topic_from_path()
110 content_len = int(self.headers.getheader('content-length', 0))
111 post_body = self.rfile.read(content_len)
113 indx = post_body.index("{")
115 post_body = post_body[indx:]
117 event = "\""+topic+"\":" + post_body
118 if not enque_event(event):
119 print "enque event fails"
123 if EvtSchema is None:
124 with open(DcaeVariables.CommonEventSchema) as opened_file:
125 EvtSchema = json.load(opened_file)
126 decoded_body = json.loads(post_body)
127 jsonschema.validate(decoded_body, EvtSchema)
132 if not DcaeVariables.IsRobotRun:
133 print ("Response Message:")
138 "description" : "Success",
140 "$ref" : "#/definitions/DR_Pub"
144 rspStr = "{'responses' : {'200' : {'description' : 'Success'}}}"
145 rspStr1 = "{'count': 1, 'serverTimeMs': 3}"
150 if 'clientThrottlingState' in self.requestline:
151 self.send_response(204)
153 self.send_response(200)
154 self.send_header('Content-Type', 'application/json')
156 self.wfile.write("{'count': 1, 'serverTimeMs': 3}")
159 self.send_response(resp_code)
163 self.wfile.write('Client: %s\n' % str(self.client_address))
164 self.wfile.write('User-agent: %s\n' % str(self.headers['user-agent']))
165 self.wfile.write('Path: %s\n' % self.path)
166 self.wfile.write('Form data:\n')
169 # Echo back information about what was posted in the form
170 for field in form.keys():
171 field_item = form[field]
172 if field_item.filename:
173 # The field contains an uploaded file
174 file_data = field_item.file.read()
175 file_len = len(file_data)
177 self.wfile.write('\tUploaded %s as "%s" (%d bytes)\n' % \
178 (field, field_item.filename, file_len))
181 self.wfile.write('\t%s=%s\n' % (field, form[field].value))
185 def extract_topic_from_path(self):
186 return self.path["/events/".__len__():]
189 """Serve a GET request."""
193 self.copyfile(f, self.wfile)
198 """Serve a HEAD request."""
204 """Common code for GET and HEAD commands.
206 This sends the response code and MIME headers.
208 Return value is either a file object (which has to be copied
209 to the outputfile by the caller unless the command was HEAD,
210 and must be closed by the caller under all circumstances), or
211 None, in which case the caller has nothing further to do.
214 path = self.translate_path(self.path)
215 if os.path.isdir(path):
216 parts = urlparse.urlsplit(self.path)
217 if not parts.path.endswith('/'):
218 # redirect browser - doing basically what apache does
219 self.send_response(301)
220 new_parts = (parts[0], parts[1], parts[2] + '/',
222 new_url = urlparse.urlunsplit(new_parts)
223 self.send_header("Location", new_url)
226 for index in "index.html", "index.htm":
227 index = os.path.join(path, index)
228 if os.path.exists(index):
232 return self.list_directory(path)
233 ctype = self.guess_type(path)
235 # Always read in binary mode. Opening files in text mode may cause
236 # newline translations, making the actual size of the content
237 # transmitted *less* than the content-length!
240 self.send_error(404, "File not found")
243 self.send_response(200)
244 self.send_header("Content-type", ctype)
245 fs = os.fstat(f.fileno())
246 self.send_header("Content-Length", str(fs[6]))
247 self.send_header("Last-Modified", self.date_time_string(fs.st_mtime))
254 def list_directory(self, path):
255 """Helper to produce a directory listing (absent index.html).
257 Return value is either a file object, or None (indicating an
258 error). In either case, the headers are sent, making the
259 interface the same as for send_head().
263 list_dir = os.listdir(path)
265 self.send_error(404, "No permission to list directory")
267 list_dir.sort(key=lambda a: a.lower())
269 displaypath = cgi.escape(urllib.unquote(self.path))
270 f.write('<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 3.2 Final//EN">')
271 f.write("<html>\n<title>Directory listing for %s</title>\n" % displaypath)
272 f.write("<body>\n<h2>Directory listing for %s</h2>\n" % displaypath)
273 f.write("<hr>\n<ul>\n")
274 for name in list_dir:
275 fullname = os.path.join(path, name)
276 displayname = linkname = name
277 # Append / for directories or @ for symbolic links
278 if os.path.isdir(fullname):
279 displayname = name + "/"
280 linkname = name + "/"
281 if os.path.islink(fullname):
282 displayname = name + "@"
283 # Note: a link to a directory displays with @ and links with /
284 f.write('<li><a href="%s">%s</a>\n'
285 % (urllib.quote(linkname), cgi.escape(displayname)))
286 f.write("</ul>\n<hr>\n</body>\n</html>\n")
289 self.send_response(200)
290 encoding = sys.getfilesystemencoding()
291 self.send_header("Content-type", "text/html; charset=%s" % encoding)
292 self.send_header("Content-Length", str(length))
297 def translate_path(path):
298 """Translate a /-separated PATH to the local filename syntax.
300 Components that mean special things to the local file system
301 (e.g. drive or directory names) are ignored. (XXX They should
302 probably be diagnosed.)
305 # abandon query parameters
306 path = path.split('?', 1)[0]
307 path = path.split('#', 1)[0]
308 # Don't forget explicit trailing slash when normalizing. Issue17324
309 trailing_slash = path.rstrip().endswith('/')
310 path = posixpath.normpath(urllib.unquote(path))
311 words = path.split('/')
312 words = filter(None, words)
315 if os.path.dirname(word) or word in (os.curdir, os.pardir):
316 # Ignore components that are not a simple file/directory name
318 path = os.path.join(path, word)
324 def copyfile(source, outputfile):
325 """Copy all data between two file objects.
327 The SOURCE argument is a file object open for reading
328 (or anything with a read() method) and the DESTINATION
329 argument is a file object open for writing (or
330 anything with a write() method).
332 The only reason for overriding this would be to change
333 the block size or perhaps to replace newlines by CRLF
334 -- note however that this the default server uses this
335 to copy binary data as well.
338 shutil.copyfileobj(source, outputfile)
340 def guess_type(self, path):
341 """Guess the type of a file.
343 Argument is a PATH (a filename).
345 Return value is a string of the form type/subtype,
346 usable for a MIME Content-type header.
348 The default implementation looks the file's extension
349 up in the table self.extensions_map, using application/octet-stream
350 as a default; however it would be permissible (if
351 slow) to look inside the data to make a better guess.
355 base, ext = posixpath.splitext(path)
356 if ext in self.extensions_map:
357 return self.extensions_map[ext]
359 if ext in self.extensions_map:
360 return self.extensions_map[ext]
362 return self.extensions_map['']
364 if not mimetypes.inited:
365 mimetypes.init() # try to read system mime.types
366 extensions_map = mimetypes.types_map.copy()
367 extensions_map.update({
368 '': 'application/octet-stream', # Default
375 def test(handler_class=DMaaPHandler, server_class=BaseHTTPServer.HTTPServer, protocol="HTTP/1.0", port=3904):
376 print "Load event schema file: " + DcaeVariables.CommonEventSchema
377 with open(DcaeVariables.CommonEventSchema) as opened_file:
379 EvtSchema = json.load(opened_file)
381 server_address = ('', port)
383 handler_class.protocol_version = protocol
384 httpd = server_class(server_address, handler_class)
388 DcaeVariables.HTTPD = httpd
390 sa = httpd.socket.getsockname()
391 print "Serving HTTP on", sa[0], "port", sa[1], "..."
392 # httpd.serve_forever()
395 def _main_(handler_class=DMaaPHandler, server_class=BaseHTTPServer.HTTPServer, protocol="HTTP/1.0"):
398 port = int(sys.argv[1])
402 print "Load event schema file: " + DcaeVariables.CommonEventSchema
403 with open(DcaeVariables.CommonEventSchema) as opened_file:
405 EvtSchema = json.load(opened_file)
407 server_address = ('', port)
409 handler_class.protocol_version = protocol
410 httpd = server_class(server_address, handler_class)
412 sa = httpd.socket.getsockname()
413 print "Serving HTTP on", sa[0], "port", sa[1], "..."
414 httpd.serve_forever()
417 if __name__ == '__main__':