Merge "Fix PM_MAPPER_01 test - use a non-generic log"
[integration/csit.git] / tests / dcaegen2 / testcases / resources / DMaaP.py
1 '''
2 Created on Aug 15, 2017
3
4 @author: sw6830
5 '''
6 import os
7 import posixpath
8 import BaseHTTPServer
9 import urllib
10 import urlparse
11 import cgi
12 import sys
13 import shutil
14 import mimetypes
15 from jsonschema import validate
16 import jsonschema
17 import json
18 import DcaeVariables
19 import SimpleHTTPServer
20
21 try:
22     from cStringIO import StringIO
23 except ImportError:
24     from StringIO import StringIO
25
26 EvtSchema = None
27 DMaaPHttpd = None
28
29
30 def clean_up_event():
31     sz = DcaeVariables.VESEventQ.qsize()
32     for i in range(sz):
33         try:
34             self.evtQueue.get_nowait()
35         except:
36             pass
37
38
39 def enque_event(evt):
40     if DcaeVariables.VESEventQ is not None:
41         try:
42             DcaeVariables.VESEventQ.put(evt)
43             return True
44         except Exception as e:
45             print (str(e))
46             return False
47     return False
48
49
50 def deque_event(wait_sec=25):
51     if DcaeVariables.IsRobotRun:
52         pass
53     try:
54         evt = DcaeVariables.VESEventQ.get(True, wait_sec)
55         return evt
56     except Exception as e:
57         if DcaeVariables.IsRobotRun:
58             pass
59
60         else:
61             print("DMaaP Event dequeue timeout")
62         return None
63
64
65 class DMaaPHandler(BaseHTTPServer.BaseHTTPRequestHandler):
66       
67     def do_PUT(self):
68         self.send_response(405)
69         return
70         
71     def do_POST(self):
72         resp_code = 0
73         # Parse the form data posted
74         '''
75         form = cgi.FieldStorage(
76             fp=self.rfile, 
77             headers=self.headers,
78             environ={'REQUEST_METHOD':'POST',
79                      'CONTENT_TYPE':self.headers['Content-Type'],
80                      })
81         
82         
83         form = cgi.FieldStorage(
84         fp=self.rfile,
85         headers=self.headers,
86         environ={"REQUEST_METHOD": "POST"})
87
88         for item in form.list:
89             print "%s=%s" % (item.name, item.value)
90             
91         '''
92         
93         if 'POST' not in self.requestline:
94             resp_code = 405
95             
96         '''
97         if resp_code == 0:
98             if '/eventlistener/v5' not in self.requestline and '/eventlistener/v5/eventBatch' not in self.requestline and \
99                         '/eventlistener/v5/clientThrottlingState' not in self.requestline:
100                 resp_code = 404
101          
102         
103         if resp_code == 0:
104             if 'Y29uc29sZTpaakprWWpsbE1qbGpNVEkyTTJJeg==' not in str(self.headers):
105                 resp_code = 401
106         '''  
107         
108         if resp_code == 0:
109             topic = self.extract_topic_from_path()
110             content_len = int(self.headers.getheader('content-length', 0))
111             post_body = self.rfile.read(content_len)
112             
113             indx = post_body.index("{")
114             if indx != 0:
115                 post_body = post_body[indx:]
116
117             event = "\""+topic+"\":" + post_body
118             if not enque_event(event):
119                 print "enque event fails"
120                    
121             global EvtSchema
122             try:
123                 if EvtSchema is None:
124                     with open(DcaeVariables.CommonEventSchema) as opened_file:
125                         EvtSchema = json.load(opened_file)
126                 decoded_body = json.loads(post_body)
127                 jsonschema.validate(decoded_body, EvtSchema)
128             except:
129                 resp_code = 400
130         
131         # Begin the response
132         if not DcaeVariables.IsRobotRun:
133             print ("Response Message:")
134         
135         '''
136         {
137           "200" : {
138             "description" : "Success",
139             "schema" : {
140               "$ref" : "#/definitions/DR_Pub"
141             }
142         }
143         
144         rspStr = "{'responses' : {'200' : {'description' : 'Success'}}}"
145         rspStr1 = "{'count': 1, 'serverTimeMs': 3}"
146
147         '''
148         
149         if resp_code == 0:
150             if 'clientThrottlingState' in self.requestline:
151                 self.send_response(204)
152             else:
153                 self.send_response(200)
154                 self.send_header('Content-Type', 'application/json')
155                 self.end_headers()
156                 self.wfile.write("{'count': 1, 'serverTimeMs': 3}")
157                 self.wfile.close()
158         else:
159             self.send_response(resp_code)
160         
161         '''
162         self.end_headers()
163         self.wfile.write('Client: %s\n' % str(self.client_address))
164         self.wfile.write('User-agent: %s\n' % str(self.headers['user-agent']))
165         self.wfile.write('Path: %s\n' % self.path)
166         self.wfile.write('Form data:\n')
167         self.wfile.close()
168
169         # Echo back information about what was posted in the form
170         for field in form.keys():
171             field_item = form[field]
172             if field_item.filename:
173                 # The field contains an uploaded file
174                 file_data = field_item.file.read()
175                 file_len = len(file_data)
176                 del file_data
177                 self.wfile.write('\tUploaded %s as "%s" (%d bytes)\n' % \
178                         (field, field_item.filename, file_len))
179             else:
180                 # Regular form value
181                 self.wfile.write('\t%s=%s\n' % (field, form[field].value))
182         '''
183         return
184
185     def extract_topic_from_path(self):
186         return self.path["/events/".__len__():]
187
188     def do_GET(self):
189         """Serve a GET request."""
190         f = self.send_head()
191         if f:
192             try:
193                 self.copyfile(f, self.wfile)
194             finally:
195                 f.close()
196
197     def do_HEAD(self):
198         """Serve a HEAD request."""
199         f = self.send_head()
200         if f:
201             f.close()
202
203     def send_head(self):
204         """Common code for GET and HEAD commands.
205
206         This sends the response code and MIME headers.
207
208         Return value is either a file object (which has to be copied
209         to the outputfile by the caller unless the command was HEAD,
210         and must be closed by the caller under all circumstances), or
211         None, in which case the caller has nothing further to do.
212
213         """
214         path = self.translate_path(self.path)
215         if os.path.isdir(path):
216             parts = urlparse.urlsplit(self.path)
217             if not parts.path.endswith('/'):
218                 # redirect browser - doing basically what apache does
219                 self.send_response(301)
220                 new_parts = (parts[0], parts[1], parts[2] + '/',
221                              parts[3], parts[4])
222                 new_url = urlparse.urlunsplit(new_parts)
223                 self.send_header("Location", new_url)
224                 self.end_headers()
225                 return None
226             for index in "index.html", "index.htm":
227                 index = os.path.join(path, index)
228                 if os.path.exists(index):
229                     path = index
230                     break
231             else:
232                 return self.list_directory(path)
233         ctype = self.guess_type(path)
234         try:
235             # Always read in binary mode. Opening files in text mode may cause
236             # newline translations, making the actual size of the content
237             # transmitted *less* than the content-length!
238             f = open(path, 'rb')
239         except IOError:
240             self.send_error(404, "File not found")
241             return None
242         try:
243             self.send_response(200)
244             self.send_header("Content-type", ctype)
245             fs = os.fstat(f.fileno())
246             self.send_header("Content-Length", str(fs[6]))
247             self.send_header("Last-Modified", self.date_time_string(fs.st_mtime))
248             self.end_headers()
249             return f
250         except:
251             f.close()
252             raise
253
254     def list_directory(self, path):
255         """Helper to produce a directory listing (absent index.html).
256
257         Return value is either a file object, or None (indicating an
258         error).  In either case, the headers are sent, making the
259         interface the same as for send_head().
260
261         """
262         try:
263             list_dir = os.listdir(path)
264         except os.error:
265             self.send_error(404, "No permission to list directory")
266             return None
267         list_dir.sort(key=lambda a: a.lower())
268         f = StringIO()
269         displaypath = cgi.escape(urllib.unquote(self.path))
270         f.write('<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 3.2 Final//EN">')
271         f.write("<html>\n<title>Directory listing for %s</title>\n" % displaypath)
272         f.write("<body>\n<h2>Directory listing for %s</h2>\n" % displaypath)
273         f.write("<hr>\n<ul>\n")
274         for name in list_dir:
275             fullname = os.path.join(path, name)
276             displayname = linkname = name
277             # Append / for directories or @ for symbolic links
278             if os.path.isdir(fullname):
279                 displayname = name + "/"
280                 linkname = name + "/"
281             if os.path.islink(fullname):
282                 displayname = name + "@"
283                 # Note: a link to a directory displays with @ and links with /
284             f.write('<li><a href="%s">%s</a>\n'
285                     % (urllib.quote(linkname), cgi.escape(displayname)))
286         f.write("</ul>\n<hr>\n</body>\n</html>\n")
287         length = f.tell()
288         f.seek(0)
289         self.send_response(200)
290         encoding = sys.getfilesystemencoding()
291         self.send_header("Content-type", "text/html; charset=%s" % encoding)
292         self.send_header("Content-Length", str(length))
293         self.end_headers()
294         return f
295
296     @staticmethod
297     def translate_path(path):
298         """Translate a /-separated PATH to the local filename syntax.
299
300         Components that mean special things to the local file system
301         (e.g. drive or directory names) are ignored.  (XXX They should
302         probably be diagnosed.)
303
304         """
305         # abandon query parameters
306         path = path.split('?', 1)[0]
307         path = path.split('#', 1)[0]
308         # Don't forget explicit trailing slash when normalizing. Issue17324
309         trailing_slash = path.rstrip().endswith('/')
310         path = posixpath.normpath(urllib.unquote(path))
311         words = path.split('/')
312         words = filter(None, words)
313         path = os.getcwd()
314         for word in words:
315             if os.path.dirname(word) or word in (os.curdir, os.pardir):
316                 # Ignore components that are not a simple file/directory name
317                 continue
318             path = os.path.join(path, word)
319         if trailing_slash:
320             path += '/'
321         return path
322
323     @staticmethod
324     def copyfile(source, outputfile):
325         """Copy all data between two file objects.
326
327         The SOURCE argument is a file object open for reading
328         (or anything with a read() method) and the DESTINATION
329         argument is a file object open for writing (or
330         anything with a write() method).
331
332         The only reason for overriding this would be to change
333         the block size or perhaps to replace newlines by CRLF
334         -- note however that this the default server uses this
335         to copy binary data as well.
336
337         """
338         shutil.copyfileobj(source, outputfile)
339
340     def guess_type(self, path):
341         """Guess the type of a file.
342
343         Argument is a PATH (a filename).
344
345         Return value is a string of the form type/subtype,
346         usable for a MIME Content-type header.
347
348         The default implementation looks the file's extension
349         up in the table self.extensions_map, using application/octet-stream
350         as a default; however it would be permissible (if
351         slow) to look inside the data to make a better guess.
352
353         """
354
355         base, ext = posixpath.splitext(path)
356         if ext in self.extensions_map:
357             return self.extensions_map[ext]
358         ext = ext.lower()
359         if ext in self.extensions_map:
360             return self.extensions_map[ext]
361         else:
362             return self.extensions_map['']
363
364     if not mimetypes.inited:
365         mimetypes.init()  # try to read system mime.types
366     extensions_map = mimetypes.types_map.copy()
367     extensions_map.update({
368         '': 'application/octet-stream',  # Default
369         '.py': 'text/plain',
370         '.c': 'text/plain',
371         '.h': 'text/plain',
372         })
373
374
375 def test(handler_class=DMaaPHandler, server_class=BaseHTTPServer.HTTPServer, protocol="HTTP/1.0", port=3904):
376     print "Load event schema file: " + DcaeVariables.CommonEventSchema
377     with open(DcaeVariables.CommonEventSchema) as opened_file:
378         global EvtSchema
379         EvtSchema = json.load(opened_file)
380         
381     server_address = ('', port)
382
383     handler_class.protocol_version = protocol
384     httpd = server_class(server_address, handler_class)
385     
386     global DMaaPHttpd
387     DMaaPHttpd = httpd
388     DcaeVariables.HTTPD = httpd
389
390     sa = httpd.socket.getsockname()
391     print "Serving HTTP on", sa[0], "port", sa[1], "..."
392     # httpd.serve_forever()
393
394
395 def _main_(handler_class=DMaaPHandler, server_class=BaseHTTPServer.HTTPServer, protocol="HTTP/1.0"):
396     
397     if sys.argv[1:]:
398         port = int(sys.argv[1])
399     else:
400         port = 3904
401     
402     print "Load event schema file: " + DcaeVariables.CommonEventSchema
403     with open(DcaeVariables.CommonEventSchema) as opened_file:
404         global EvtSchema
405         EvtSchema = json.load(opened_file)
406         
407     server_address = ('', port)
408
409     handler_class.protocol_version = protocol
410     httpd = server_class(server_address, handler_class)
411
412     sa = httpd.socket.getsockname()
413     print "Serving HTTP on", sa[0], "port", sa[1], "..."
414     httpd.serve_forever()
415
416
417 if __name__ == '__main__':
418     _main_()