2 Created on Aug 15, 2017
15 from jsonschema import validate
19 import SimpleHTTPServer
20 from robot.api import logger
24 from cStringIO import StringIO
26 from StringIO import StringIO
33 sz = DcaeVariables.VESEventQ.qsize()
36 self.evtQueue.get_nowait()
42 if DcaeVariables.VESEventQ is not None:
44 DcaeVariables.VESEventQ.put(evt)
45 if DcaeVariables.IsRobotRun:
46 logger.console("DMaaP Event enqued - size=" + str(len(evt)))
48 print ("DMaaP Event enqueued - size=" + str(len(evt)))
50 except Exception as e:
def deque_event(wait_sec=25):
    """Take the next event off ``DcaeVariables.VESEventQ``.

    Status lines go to the Robot Framework console when
    ``DcaeVariables.IsRobotRun`` is set and to stdout otherwise.

    :param wait_sec: timeout passed to ``VESEventQ.get(True, wait_sec)``.
    :return: the dequeued event, or ``None`` when the queue is not
        initialised or nothing could be dequeued.
    """

    def _report(message):
        # Send one status line to whichever output the current run uses.
        if DcaeVariables.IsRobotRun:
            logger.console(message)
        else:
            print(message)

    # The entry trace has only ever been emitted for Robot runs.
    if DcaeVariables.IsRobotRun:
        logger.console("Enter DequeEvent")

    event_queue = DcaeVariables.VESEventQ
    if event_queue is None:
        # Without this guard the AttributeError raised below would be
        # reported as a dequeue timeout, which hides the real problem.
        _report("DMaaP Event queue is not initialized")
        return None

    try:
        # NOTE(review): presumably a standard Queue, so this blocks for up to
        # wait_sec seconds -- the queue type is not visible from this module.
        evt = event_queue.get(True, wait_sec)
        _report("DMaaP Event dequeued - size=" + str(len(evt)))
        return evt
    except Exception as e:
        # Deliberately best-effort: any failure yields None. The exception
        # text is now shown in print mode too (when there is any), so that
        # non-timeout errors are no longer silently dropped there.
        reason = str(e)
        if DcaeVariables.IsRobotRun or reason:
            _report(reason)
        _report("DMaaP Event dequeue timeout")
        return None
75 class DMaaPHandler(BaseHTTPServer.BaseHTTPRequestHandler):
78 self.send_response(405)
84 # Parse the form data posted
86 form = cgi.FieldStorage(
89 environ={'REQUEST_METHOD':'POST',
90 'CONTENT_TYPE':self.headers['Content-Type'],
94 form = cgi.FieldStorage(
97 environ={"REQUEST_METHOD": "POST"})
99 for item in form.list:
100 print "%s=%s" % (item.name, item.value)
104 if 'POST' not in self.requestline:
109 if '/eventlistener/v5' not in self.requestline and '/eventlistener/v5/eventBatch' not in self.requestline and \
110 '/eventlistener/v5/clientThrottlingState' not in self.requestline:
115 if 'Y29uc29sZTpaakprWWpsbE1qbGpNVEkyTTJJeg==' not in str(self.headers):
120 content_len = int(self.headers.getheader('content-length', 0))
121 post_body = self.rfile.read(content_len)
123 if DcaeVariables.IsRobotRun:
124 logger.console("\n" + "DMaaP Receive Event:\n" + post_body)
126 print("\n" + "DMaaP Receive Event:")
129 indx = post_body.index("{")
131 post_body = post_body[indx:]
133 if not enque_event(post_body):
134 print "enque event fails"
138 if EvtSchema is None:
139 with open(DcaeVariables.CommonEventSchemaV5) as opened_file:
140 EvtSchema = json.load(opened_file)
141 decoded_body = json.loads(post_body)
142 jsonschema.validate(decoded_body, EvtSchema)
147 if not DcaeVariables.IsRobotRun:
148 print ("Response Message:")
153 "description" : "Success",
155 "$ref" : "#/definitions/DR_Pub"
159 rspStr = "{'responses' : {'200' : {'description' : 'Success'}}}"
160 rspStr1 = "{'count': 1, 'serverTimeMs': 3}"
165 if 'clientThrottlingState' in self.requestline:
166 self.send_response(204)
168 self.send_response(200)
169 self.send_header('Content-Type', 'application/json')
171 # self.wfile.write("{'responses' : {'200' : {'description' : 'Success'}}}")
172 self.wfile.write("{'count': 1, 'serverTimeMs': 3}")
175 self.send_response(resp_code)
179 self.wfile.write('Client: %s\n' % str(self.client_address))
180 self.wfile.write('User-agent: %s\n' % str(self.headers['user-agent']))
181 self.wfile.write('Path: %s\n' % self.path)
182 self.wfile.write('Form data:\n')
185 # Echo back information about what was posted in the form
186 for field in form.keys():
187 field_item = form[field]
188 if field_item.filename:
189 # The field contains an uploaded file
190 file_data = field_item.file.read()
191 file_len = len(file_data)
193 self.wfile.write('\tUploaded %s as "%s" (%d bytes)\n' % \
194 (field, field_item.filename, file_len))
197 self.wfile.write('\t%s=%s\n' % (field, form[field].value))
202 """Serve a GET request."""
206 self.copyfile(f, self.wfile)
211 """Serve a HEAD request."""
217 """Common code for GET and HEAD commands.
219 This sends the response code and MIME headers.
221 Return value is either a file object (which has to be copied
222 to the outputfile by the caller unless the command was HEAD,
223 and must be closed by the caller under all circumstances), or
224 None, in which case the caller has nothing further to do.
227 path = self.translate_path(self.path)
228 if os.path.isdir(path):
229 parts = urlparse.urlsplit(self.path)
230 if not parts.path.endswith('/'):
231 # redirect browser - doing basically what apache does
232 self.send_response(301)
233 new_parts = (parts[0], parts[1], parts[2] + '/',
235 new_url = urlparse.urlunsplit(new_parts)
236 self.send_header("Location", new_url)
239 for index in "index.html", "index.htm":
240 index = os.path.join(path, index)
241 if os.path.exists(index):
245 return self.list_directory(path)
246 ctype = self.guess_type(path)
248 # Always read in binary mode. Opening files in text mode may cause
249 # newline translations, making the actual size of the content
250 # transmitted *less* than the content-length!
253 self.send_error(404, "File not found")
256 self.send_response(200)
257 self.send_header("Content-type", ctype)
258 fs = os.fstat(f.fileno())
259 self.send_header("Content-Length", str(fs[6]))
260 self.send_header("Last-Modified", self.date_time_string(fs.st_mtime))
def list_directory(self, path):
    """Build an HTML index page for a directory that has no index file.

    The status line and headers are always sent from here, which keeps the
    interface identical to send_head(): on success the caller gets back a
    file object positioned at the start of the page, and on failure (after
    a 404 has been sent) it gets None.
    """
    try:
        entries = os.listdir(path)
    except os.error:
        self.send_error(404, "No permission to list directory")
        return None
    entries.sort(key=lambda entry: entry.lower())

    page = StringIO()
    shown_path = cgi.escape(urllib.unquote(self.path))
    page.write('<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 3.2 Final//EN">')
    page.write("<html>\n<title>Directory listing for %s</title>\n" % shown_path)
    page.write("<body>\n<h2>Directory listing for %s</h2>\n" % shown_path)
    page.write("<hr>\n<ul>\n")
    for name in entries:
        full_name = os.path.join(path, name)
        is_dir = os.path.isdir(full_name)
        # Directories link with a trailing slash.
        href = name + "/" if is_dir else name
        if os.path.islink(full_name):
            # A symlink is shown with @, even when it points at a directory
            # (in which case it still links with /).
            label = name + "@"
        elif is_dir:
            label = name + "/"
        else:
            label = name
        page.write('<li><a href="%s">%s</a>\n'
                   % (urllib.quote(href), cgi.escape(label)))
    page.write("</ul>\n<hr>\n</body>\n</html>\n")

    # Measure the body before rewinding so Content-Length is exact.
    length = page.tell()
    page.seek(0)

    self.send_response(200)
    encoding = sys.getfilesystemencoding()
    self.send_header("Content-type", "text/html; charset=%s" % encoding)
    self.send_header("Content-Length", str(length))
    self.end_headers()
    return page
def translate_path(path):
    """Map a /-separated request PATH onto a local filesystem path.

    The result is rooted at the current working directory. Components
    that carry special meaning for the local file system (drive or
    directory names, "." and "..") are silently skipped.
    """
    # Drop the query string and fragment; they never name a file.
    path = path.split('?', 1)[0].split('#', 1)[0]
    # normpath() strips a trailing slash, so remember it now. Issue17324
    keep_slash = path.rstrip().endswith('/')
    normalized = posixpath.normpath(urllib.unquote(path))

    local = os.getcwd()
    for part in normalized.split('/'):
        if not part:
            continue
        if os.path.dirname(part) or part in (os.curdir, os.pardir):
            # Not a simple file/directory name -- ignore it.
            continue
        local = os.path.join(local, part)

    if keep_slash:
        local += '/'
    return local
def copyfile(source, outputfile):
    """Stream everything from one file object into another.

    SOURCE is a file object open for reading (or anything with a read()
    method); OUTPUTFILE is a file object open for writing (or anything
    with a write() method).

    The only reason to override this would be to change the block size
    or to rewrite newlines as CRLF -- note, however, that the default
    server uses this to copy binary data as well.
    """
    shutil.copyfileobj(source, outputfile)
def guess_type(self, path):
    """Return the MIME type to advertise for the file named by PATH.

    The result is a string of the form type/subtype, usable for a MIME
    Content-type header. The file's extension is looked up in
    self.extensions_map, first exactly as written and then lower-cased;
    the entry stored under '' is the fallback when neither form is known.
    """
    suffix = posixpath.splitext(path)[1]
    known_types = self.extensions_map
    # Exact spelling wins over the lower-cased form.
    for candidate in (suffix, suffix.lower()):
        if candidate in known_types:
            return known_types[candidate]
    return known_types['']
377 if not mimetypes.inited:
378 mimetypes.init() # try to read system mime.types
379 extensions_map = mimetypes.types_map.copy()
380 extensions_map.update({
381 '': 'application/octet-stream', # Default
def test(handler_class=DMaaPHandler, server_class=BaseHTTPServer.HTTPServer, protocol="HTTP/1.0", port=3904):
    """Create the DMaaP HTTP server without entering its serve loop.

    Loads the event schema from ``DcaeVariables.CommonEventSchemaV5`` into
    the module-level ``EvtSchema``, instantiates ``server_class`` on all
    interfaces at ``port`` and stores the server in ``DcaeVariables.HTTPD``.

    :param handler_class: request handler class; its ``protocol_version``
        attribute is overwritten with ``protocol``.
    :param server_class: server class, called as
        ``server_class(server_address, handler_class)``.
    :param protocol: HTTP protocol version string for the handler.
    :param port: TCP port to listen on.
    """
    # The request handler tests the module-level EvtSchema; without this
    # declaration the schema loaded below would only be a discarded local.
    global EvtSchema

    schema_path = DcaeVariables.CommonEventSchemaV5
    print("Load event schema file: " + schema_path)
    with open(schema_path) as opened_file:
        EvtSchema = json.load(opened_file)

    server_address = ('', port)

    handler_class.protocol_version = protocol
    httpd = server_class(server_address, handler_class)

    DcaeVariables.HTTPD = httpd

    sa = httpd.socket.getsockname()
    print("Serving HTTP on %s port %s ..." % (sa[0], sa[1]))
    # serve_forever() is intentionally not called here.
    # NOTE(review): presumably whoever reads DcaeVariables.HTTPD runs the
    # serve loop -- confirm against the caller.
def _main_(handler_class=DMaaPHandler, server_class=BaseHTTPServer.HTTPServer, protocol="HTTP/1.0"):
    """Run the DMaaP HTTP server in the foreground until interrupted.

    The listening port is read from the first command-line argument; when
    none is given, 3904 is used. The event schema is loaded from
    ``DcaeVariables.CommonEventSchemaV5`` into the module-level
    ``EvtSchema`` before the server starts.

    :param handler_class: request handler class; its ``protocol_version``
        attribute is overwritten with ``protocol``.
    :param server_class: server class, called as
        ``server_class(server_address, handler_class)``.
    :param protocol: HTTP protocol version string for the handler.
    :raises ValueError: if the command-line port is not an integer.
    """
    # The request handler tests the module-level EvtSchema; without this
    # declaration the schema loaded below would only be a discarded local.
    global EvtSchema

    # Guard the argv lookup so a bare invocation does not die with an
    # IndexError; 3904 matches the default port of test().
    if sys.argv[1:]:
        port = int(sys.argv[1])
    else:
        port = 3904

    schema_path = DcaeVariables.CommonEventSchemaV5
    print("Load event schema file: " + schema_path)
    with open(schema_path) as opened_file:
        EvtSchema = json.load(opened_file)

    server_address = ('', port)

    handler_class.protocol_version = protocol
    httpd = server_class(server_address, handler_class)

    sa = httpd.socket.getsockname()
    print("Serving HTTP on %s port %s ..." % (sa[0], sa[1]))
    httpd.serve_forever()
430 if __name__ == '__main__':