Move CSIT to integration/csit repo
[integration/csit.git] / tests / dcaegen2 / testcases / resources / DMaaP.py
1 '''
2 Created on Aug 15, 2017
3
4 @author: sw6830
5 '''
6 import os
7 import posixpath
8 import BaseHTTPServer
9 import urllib
10 import urlparse
11 import cgi
12 import sys
13 import shutil
14 import mimetypes
15 from jsonschema import validate
16 import jsonschema
17 import json
18 import DcaeVariables
19 import SimpleHTTPServer
20 from robot.api import logger
21
22
23 try:
24     from cStringIO import StringIO
25 except ImportError:
26     from StringIO import StringIO
27
28 EvtSchema = None
29 DMaaPHttpd = None
30
31
32 def clean_up_event():
33     sz = DcaeVariables.VESEventQ.qsize()
34     for i in range(sz):
35         try:
36             self.evtQueue.get_nowait()
37         except:
38             pass
39
40
41 def enque_event(evt):
42     if DcaeVariables.VESEventQ is not None:
43         try:
44             DcaeVariables.VESEventQ.put(evt)
45             if DcaeVariables.IsRobotRun:
46                 logger.console("DMaaP Event enqued - size=" + str(len(evt)))
47             else:
48                 print ("DMaaP Event enqueued - size=" + str(len(evt)))
49             return True
50         except Exception as e:
51             print (str(e))
52             return False
53     return False
54
55
56 def deque_event(wait_sec=25):
57     if DcaeVariables.IsRobotRun:
58         logger.console("Enter DequeEvent")
59     try:
60         evt = DcaeVariables.VESEventQ.get(True, wait_sec)
61         if DcaeVariables.IsRobotRun:
62             logger.console("DMaaP Event dequeued - size=" + str(len(evt)))
63         else:
64             print("DMaaP Event dequeued - size=" + str(len(evt)))
65         return evt
66     except Exception as e:
67         if DcaeVariables.IsRobotRun:
68             logger.console(str(e))
69             logger.console("DMaaP Event dequeue timeout")
70         else:
71             print("DMaaP Event dequeue timeout")
72         return None
73
74
75 class DMaaPHandler(BaseHTTPServer.BaseHTTPRequestHandler):
76       
77     def do_PUT(self):
78         self.send_response(405)
79         return
80         
81     def do_POST(self):
82         
83         resp_code = 0
84         # Parse the form data posted
85         '''
86         form = cgi.FieldStorage(
87             fp=self.rfile, 
88             headers=self.headers,
89             environ={'REQUEST_METHOD':'POST',
90                      'CONTENT_TYPE':self.headers['Content-Type'],
91                      })
92         
93         
94         form = cgi.FieldStorage(
95         fp=self.rfile,
96         headers=self.headers,
97         environ={"REQUEST_METHOD": "POST"})
98
99         for item in form.list:
100             print "%s=%s" % (item.name, item.value)
101             
102         '''
103         
104         if 'POST' not in self.requestline:
105             resp_code = 405
106             
107         '''
108         if resp_code == 0:
109             if '/eventlistener/v5' not in self.requestline and '/eventlistener/v5/eventBatch' not in self.requestline and \
110                         '/eventlistener/v5/clientThrottlingState' not in self.requestline:
111                 resp_code = 404
112          
113         
114         if resp_code == 0:
115             if 'Y29uc29sZTpaakprWWpsbE1qbGpNVEkyTTJJeg==' not in str(self.headers):
116                 resp_code = 401
117         '''  
118         
119         if resp_code == 0:
120             content_len = int(self.headers.getheader('content-length', 0))
121             post_body = self.rfile.read(content_len)
122             
123             if DcaeVariables.IsRobotRun:
124                 logger.console("\n" + "DMaaP Receive Event:\n" + post_body)
125             else:
126                 print("\n" + "DMaaP Receive Event:")
127                 print (post_body)
128             
129             indx = post_body.index("{")
130             if indx != 0:
131                 post_body = post_body[indx:]
132             
133             if not enque_event(post_body):
134                 print "enque event fails"
135                    
136             global EvtSchema
137             try:
138                 if EvtSchema is None:
139                     with open(DcaeVariables.CommonEventSchemaV5) as opened_file:
140                         EvtSchema = json.load(opened_file)
141                 decoded_body = json.loads(post_body)
142                 jsonschema.validate(decoded_body, EvtSchema)
143             except:
144                 resp_code = 400
145         
146         # Begin the response
147         if not DcaeVariables.IsRobotRun:
148             print ("Response Message:")
149         
150         '''
151         {
152           "200" : {
153             "description" : "Success",
154             "schema" : {
155               "$ref" : "#/definitions/DR_Pub"
156             }
157         }
158         
159         rspStr = "{'responses' : {'200' : {'description' : 'Success'}}}"
160         rspStr1 = "{'count': 1, 'serverTimeMs': 3}"
161
162         '''
163         
164         if resp_code == 0:
165             if 'clientThrottlingState' in self.requestline:
166                 self.send_response(204)
167             else:
168                 self.send_response(200)
169                 self.send_header('Content-Type', 'application/json')
170                 self.end_headers()
171                 # self.wfile.write("{'responses' : {'200' : {'description' : 'Success'}}}")
172                 self.wfile.write("{'count': 1, 'serverTimeMs': 3}")
173                 self.wfile.close()
174         else:
175             self.send_response(resp_code)
176         
177         '''
178         self.end_headers()
179         self.wfile.write('Client: %s\n' % str(self.client_address))
180         self.wfile.write('User-agent: %s\n' % str(self.headers['user-agent']))
181         self.wfile.write('Path: %s\n' % self.path)
182         self.wfile.write('Form data:\n')
183         self.wfile.close()
184
185         # Echo back information about what was posted in the form
186         for field in form.keys():
187             field_item = form[field]
188             if field_item.filename:
189                 # The field contains an uploaded file
190                 file_data = field_item.file.read()
191                 file_len = len(file_data)
192                 del file_data
193                 self.wfile.write('\tUploaded %s as "%s" (%d bytes)\n' % \
194                         (field, field_item.filename, file_len))
195             else:
196                 # Regular form value
197                 self.wfile.write('\t%s=%s\n' % (field, form[field].value))
198         '''
199         return
200
201     def do_GET(self):
202         """Serve a GET request."""
203         f = self.send_head()
204         if f:
205             try:
206                 self.copyfile(f, self.wfile)
207             finally:
208                 f.close()
209
210     def do_HEAD(self):
211         """Serve a HEAD request."""
212         f = self.send_head()
213         if f:
214             f.close()
215
216     def send_head(self):
217         """Common code for GET and HEAD commands.
218
219         This sends the response code and MIME headers.
220
221         Return value is either a file object (which has to be copied
222         to the outputfile by the caller unless the command was HEAD,
223         and must be closed by the caller under all circumstances), or
224         None, in which case the caller has nothing further to do.
225
226         """
227         path = self.translate_path(self.path)
228         if os.path.isdir(path):
229             parts = urlparse.urlsplit(self.path)
230             if not parts.path.endswith('/'):
231                 # redirect browser - doing basically what apache does
232                 self.send_response(301)
233                 new_parts = (parts[0], parts[1], parts[2] + '/',
234                              parts[3], parts[4])
235                 new_url = urlparse.urlunsplit(new_parts)
236                 self.send_header("Location", new_url)
237                 self.end_headers()
238                 return None
239             for index in "index.html", "index.htm":
240                 index = os.path.join(path, index)
241                 if os.path.exists(index):
242                     path = index
243                     break
244             else:
245                 return self.list_directory(path)
246         ctype = self.guess_type(path)
247         try:
248             # Always read in binary mode. Opening files in text mode may cause
249             # newline translations, making the actual size of the content
250             # transmitted *less* than the content-length!
251             f = open(path, 'rb')
252         except IOError:
253             self.send_error(404, "File not found")
254             return None
255         try:
256             self.send_response(200)
257             self.send_header("Content-type", ctype)
258             fs = os.fstat(f.fileno())
259             self.send_header("Content-Length", str(fs[6]))
260             self.send_header("Last-Modified", self.date_time_string(fs.st_mtime))
261             self.end_headers()
262             return f
263         except:
264             f.close()
265             raise
266
267     def list_directory(self, path):
268         """Helper to produce a directory listing (absent index.html).
269
270         Return value is either a file object, or None (indicating an
271         error).  In either case, the headers are sent, making the
272         interface the same as for send_head().
273
274         """
275         try:
276             list_dir = os.listdir(path)
277         except os.error:
278             self.send_error(404, "No permission to list directory")
279             return None
280         list_dir.sort(key=lambda a: a.lower())
281         f = StringIO()
282         displaypath = cgi.escape(urllib.unquote(self.path))
283         f.write('<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 3.2 Final//EN">')
284         f.write("<html>\n<title>Directory listing for %s</title>\n" % displaypath)
285         f.write("<body>\n<h2>Directory listing for %s</h2>\n" % displaypath)
286         f.write("<hr>\n<ul>\n")
287         for name in list_dir:
288             fullname = os.path.join(path, name)
289             displayname = linkname = name
290             # Append / for directories or @ for symbolic links
291             if os.path.isdir(fullname):
292                 displayname = name + "/"
293                 linkname = name + "/"
294             if os.path.islink(fullname):
295                 displayname = name + "@"
296                 # Note: a link to a directory displays with @ and links with /
297             f.write('<li><a href="%s">%s</a>\n'
298                     % (urllib.quote(linkname), cgi.escape(displayname)))
299         f.write("</ul>\n<hr>\n</body>\n</html>\n")
300         length = f.tell()
301         f.seek(0)
302         self.send_response(200)
303         encoding = sys.getfilesystemencoding()
304         self.send_header("Content-type", "text/html; charset=%s" % encoding)
305         self.send_header("Content-Length", str(length))
306         self.end_headers()
307         return f
308
309     @staticmethod
310     def translate_path(path):
311         """Translate a /-separated PATH to the local filename syntax.
312
313         Components that mean special things to the local file system
314         (e.g. drive or directory names) are ignored.  (XXX They should
315         probably be diagnosed.)
316
317         """
318         # abandon query parameters
319         path = path.split('?', 1)[0]
320         path = path.split('#', 1)[0]
321         # Don't forget explicit trailing slash when normalizing. Issue17324
322         trailing_slash = path.rstrip().endswith('/')
323         path = posixpath.normpath(urllib.unquote(path))
324         words = path.split('/')
325         words = filter(None, words)
326         path = os.getcwd()
327         for word in words:
328             if os.path.dirname(word) or word in (os.curdir, os.pardir):
329                 # Ignore components that are not a simple file/directory name
330                 continue
331             path = os.path.join(path, word)
332         if trailing_slash:
333             path += '/'
334         return path
335
336     @staticmethod
337     def copyfile(source, outputfile):
338         """Copy all data between two file objects.
339
340         The SOURCE argument is a file object open for reading
341         (or anything with a read() method) and the DESTINATION
342         argument is a file object open for writing (or
343         anything with a write() method).
344
345         The only reason for overriding this would be to change
346         the block size or perhaps to replace newlines by CRLF
347         -- note however that this the default server uses this
348         to copy binary data as well.
349
350         """
351         shutil.copyfileobj(source, outputfile)
352
353     def guess_type(self, path):
354         """Guess the type of a file.
355
356         Argument is a PATH (a filename).
357
358         Return value is a string of the form type/subtype,
359         usable for a MIME Content-type header.
360
361         The default implementation looks the file's extension
362         up in the table self.extensions_map, using application/octet-stream
363         as a default; however it would be permissible (if
364         slow) to look inside the data to make a better guess.
365
366         """
367
368         base, ext = posixpath.splitext(path)
369         if ext in self.extensions_map:
370             return self.extensions_map[ext]
371         ext = ext.lower()
372         if ext in self.extensions_map:
373             return self.extensions_map[ext]
374         else:
375             return self.extensions_map['']
376
377     if not mimetypes.inited:
378         mimetypes.init()  # try to read system mime.types
379     extensions_map = mimetypes.types_map.copy()
380     extensions_map.update({
381         '': 'application/octet-stream',  # Default
382         '.py': 'text/plain',
383         '.c': 'text/plain',
384         '.h': 'text/plain',
385         })
386
387
388 def test(handler_class=DMaaPHandler, server_class=BaseHTTPServer.HTTPServer, protocol="HTTP/1.0", port=3904):
389     print "Load event schema file: " + DcaeVariables.CommonEventSchemaV5
390     with open(DcaeVariables.CommonEventSchemaV5) as opened_file:
391         global EvtSchema
392         EvtSchema = json.load(opened_file)
393         
394     server_address = ('', port)
395
396     handler_class.protocol_version = protocol
397     httpd = server_class(server_address, handler_class)
398     
399     global DMaaPHttpd
400     DMaaPHttpd = httpd
401     DcaeVariables.HTTPD = httpd
402
403     sa = httpd.socket.getsockname()
404     print "Serving HTTP on", sa[0], "port", sa[1], "..."
405     # httpd.serve_forever()
406
407
408 def _main_(handler_class=DMaaPHandler, server_class=BaseHTTPServer.HTTPServer, protocol="HTTP/1.0"):
409     
410     if sys.argv[1:]:
411         port = int(sys.argv[1])
412     else:
413         port = 3904
414     
415     print "Load event schema file: " + DcaeVariables.CommonEventSchemaV5
416     with open(DcaeVariables.CommonEventSchemaV5) as opened_file:
417         global EvtSchema
418         EvtSchema = json.load(opened_file)
419         
420     server_address = ('', port)
421
422     handler_class.protocol_version = protocol
423     httpd = server_class(server_address, handler_class)
424
425     sa = httpd.socket.getsockname()
426     print "Serving HTTP on", sa[0], "port", sa[1], "..."
427     httpd.serve_forever()
428
429
430 if __name__ == '__main__':
431     _main_()