Adding basic E2E test for workflow designer
[integration/csit.git] / tests / dcaegen2 / testcases / resources / DMaaP.py
1 '''
2 Created on Aug 15, 2017
3
4 @author: sw6830
5 '''
6 import os
7 import posixpath
8 import BaseHTTPServer
9 import urllib
10 import urlparse
11 import cgi
12 import sys
13 import shutil
14 import mimetypes
15 from jsonschema import validate
16 import jsonschema
17 import json
18 import DcaeVariables
19 import SimpleHTTPServer
20
21 try:
22     from cStringIO import StringIO
23 except ImportError:
24     from StringIO import StringIO
25
26 EvtSchema = None
27 DMaaPHttpd = None
28
29
30 def clean_up_event():
31     sz = DcaeVariables.VESEventQ.qsize()
32     for i in range(sz):
33         try:
34             self.evtQueue.get_nowait()
35         except:
36             pass
37
38
39 def enque_event(evt):
40     if DcaeVariables.VESEventQ is not None:
41         try:
42             DcaeVariables.VESEventQ.put(evt)
43             return True
44         except Exception as e:
45             print (str(e))
46             return False
47     return False
48
49
50 def deque_event(wait_sec=25):
51     if DcaeVariables.IsRobotRun:
52         pass
53     try:
54         evt = DcaeVariables.VESEventQ.get(True, wait_sec)
55         return evt
56     except Exception as e:
57         if DcaeVariables.IsRobotRun:
58             pass
59
60         else:
61             print("DMaaP Event dequeue timeout")
62         return None
63
64
65 class DMaaPHandler(BaseHTTPServer.BaseHTTPRequestHandler):
66       
67     def do_PUT(self):
68         self.send_response(405)
69         return
70         
71     def do_POST(self):
72         resp_code = 0
73         # Parse the form data posted
74         '''
75         form = cgi.FieldStorage(
76             fp=self.rfile, 
77             headers=self.headers,
78             environ={'REQUEST_METHOD':'POST',
79                      'CONTENT_TYPE':self.headers['Content-Type'],
80                      })
81         
82         
83         form = cgi.FieldStorage(
84         fp=self.rfile,
85         headers=self.headers,
86         environ={"REQUEST_METHOD": "POST"})
87
88         for item in form.list:
89             print "%s=%s" % (item.name, item.value)
90             
91         '''
92         
93         if 'POST' not in self.requestline:
94             resp_code = 405
95             
96         '''
97         if resp_code == 0:
98             if '/eventlistener/v5' not in self.requestline and '/eventlistener/v5/eventBatch' not in self.requestline and \
99                         '/eventlistener/v5/clientThrottlingState' not in self.requestline:
100                 resp_code = 404
101          
102         
103         if resp_code == 0:
104             if 'Y29uc29sZTpaakprWWpsbE1qbGpNVEkyTTJJeg==' not in str(self.headers):
105                 resp_code = 401
106         '''  
107         
108         if resp_code == 0:
109             content_len = int(self.headers.getheader('content-length', 0))
110             post_body = self.rfile.read(content_len)
111             
112             indx = post_body.index("{")
113             if indx != 0:
114                 post_body = post_body[indx:]
115             
116             if not enque_event(post_body):
117                 print "enque event fails"
118                    
119             global EvtSchema
120             try:
121                 if EvtSchema is None:
122                     with open(DcaeVariables.CommonEventSchema) as opened_file:
123                         EvtSchema = json.load(opened_file)
124                 decoded_body = json.loads(post_body)
125                 jsonschema.validate(decoded_body, EvtSchema)
126             except:
127                 resp_code = 400
128         
129         # Begin the response
130         if not DcaeVariables.IsRobotRun:
131             print ("Response Message:")
132         
133         '''
134         {
135           "200" : {
136             "description" : "Success",
137             "schema" : {
138               "$ref" : "#/definitions/DR_Pub"
139             }
140         }
141         
142         rspStr = "{'responses' : {'200' : {'description' : 'Success'}}}"
143         rspStr1 = "{'count': 1, 'serverTimeMs': 3}"
144
145         '''
146         
147         if resp_code == 0:
148             if 'clientThrottlingState' in self.requestline:
149                 self.send_response(204)
150             else:
151                 self.send_response(200)
152                 self.send_header('Content-Type', 'application/json')
153                 self.end_headers()
154                 self.wfile.write("{'count': 1, 'serverTimeMs': 3}")
155                 self.wfile.close()
156         else:
157             self.send_response(resp_code)
158         
159         '''
160         self.end_headers()
161         self.wfile.write('Client: %s\n' % str(self.client_address))
162         self.wfile.write('User-agent: %s\n' % str(self.headers['user-agent']))
163         self.wfile.write('Path: %s\n' % self.path)
164         self.wfile.write('Form data:\n')
165         self.wfile.close()
166
167         # Echo back information about what was posted in the form
168         for field in form.keys():
169             field_item = form[field]
170             if field_item.filename:
171                 # The field contains an uploaded file
172                 file_data = field_item.file.read()
173                 file_len = len(file_data)
174                 del file_data
175                 self.wfile.write('\tUploaded %s as "%s" (%d bytes)\n' % \
176                         (field, field_item.filename, file_len))
177             else:
178                 # Regular form value
179                 self.wfile.write('\t%s=%s\n' % (field, form[field].value))
180         '''
181         return
182
183     def do_GET(self):
184         """Serve a GET request."""
185         f = self.send_head()
186         if f:
187             try:
188                 self.copyfile(f, self.wfile)
189             finally:
190                 f.close()
191
192     def do_HEAD(self):
193         """Serve a HEAD request."""
194         f = self.send_head()
195         if f:
196             f.close()
197
198     def send_head(self):
199         """Common code for GET and HEAD commands.
200
201         This sends the response code and MIME headers.
202
203         Return value is either a file object (which has to be copied
204         to the outputfile by the caller unless the command was HEAD,
205         and must be closed by the caller under all circumstances), or
206         None, in which case the caller has nothing further to do.
207
208         """
209         path = self.translate_path(self.path)
210         if os.path.isdir(path):
211             parts = urlparse.urlsplit(self.path)
212             if not parts.path.endswith('/'):
213                 # redirect browser - doing basically what apache does
214                 self.send_response(301)
215                 new_parts = (parts[0], parts[1], parts[2] + '/',
216                              parts[3], parts[4])
217                 new_url = urlparse.urlunsplit(new_parts)
218                 self.send_header("Location", new_url)
219                 self.end_headers()
220                 return None
221             for index in "index.html", "index.htm":
222                 index = os.path.join(path, index)
223                 if os.path.exists(index):
224                     path = index
225                     break
226             else:
227                 return self.list_directory(path)
228         ctype = self.guess_type(path)
229         try:
230             # Always read in binary mode. Opening files in text mode may cause
231             # newline translations, making the actual size of the content
232             # transmitted *less* than the content-length!
233             f = open(path, 'rb')
234         except IOError:
235             self.send_error(404, "File not found")
236             return None
237         try:
238             self.send_response(200)
239             self.send_header("Content-type", ctype)
240             fs = os.fstat(f.fileno())
241             self.send_header("Content-Length", str(fs[6]))
242             self.send_header("Last-Modified", self.date_time_string(fs.st_mtime))
243             self.end_headers()
244             return f
245         except:
246             f.close()
247             raise
248
249     def list_directory(self, path):
250         """Helper to produce a directory listing (absent index.html).
251
252         Return value is either a file object, or None (indicating an
253         error).  In either case, the headers are sent, making the
254         interface the same as for send_head().
255
256         """
257         try:
258             list_dir = os.listdir(path)
259         except os.error:
260             self.send_error(404, "No permission to list directory")
261             return None
262         list_dir.sort(key=lambda a: a.lower())
263         f = StringIO()
264         displaypath = cgi.escape(urllib.unquote(self.path))
265         f.write('<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 3.2 Final//EN">')
266         f.write("<html>\n<title>Directory listing for %s</title>\n" % displaypath)
267         f.write("<body>\n<h2>Directory listing for %s</h2>\n" % displaypath)
268         f.write("<hr>\n<ul>\n")
269         for name in list_dir:
270             fullname = os.path.join(path, name)
271             displayname = linkname = name
272             # Append / for directories or @ for symbolic links
273             if os.path.isdir(fullname):
274                 displayname = name + "/"
275                 linkname = name + "/"
276             if os.path.islink(fullname):
277                 displayname = name + "@"
278                 # Note: a link to a directory displays with @ and links with /
279             f.write('<li><a href="%s">%s</a>\n'
280                     % (urllib.quote(linkname), cgi.escape(displayname)))
281         f.write("</ul>\n<hr>\n</body>\n</html>\n")
282         length = f.tell()
283         f.seek(0)
284         self.send_response(200)
285         encoding = sys.getfilesystemencoding()
286         self.send_header("Content-type", "text/html; charset=%s" % encoding)
287         self.send_header("Content-Length", str(length))
288         self.end_headers()
289         return f
290
291     @staticmethod
292     def translate_path(path):
293         """Translate a /-separated PATH to the local filename syntax.
294
295         Components that mean special things to the local file system
296         (e.g. drive or directory names) are ignored.  (XXX They should
297         probably be diagnosed.)
298
299         """
300         # abandon query parameters
301         path = path.split('?', 1)[0]
302         path = path.split('#', 1)[0]
303         # Don't forget explicit trailing slash when normalizing. Issue17324
304         trailing_slash = path.rstrip().endswith('/')
305         path = posixpath.normpath(urllib.unquote(path))
306         words = path.split('/')
307         words = filter(None, words)
308         path = os.getcwd()
309         for word in words:
310             if os.path.dirname(word) or word in (os.curdir, os.pardir):
311                 # Ignore components that are not a simple file/directory name
312                 continue
313             path = os.path.join(path, word)
314         if trailing_slash:
315             path += '/'
316         return path
317
318     @staticmethod
319     def copyfile(source, outputfile):
320         """Copy all data between two file objects.
321
322         The SOURCE argument is a file object open for reading
323         (or anything with a read() method) and the DESTINATION
324         argument is a file object open for writing (or
325         anything with a write() method).
326
327         The only reason for overriding this would be to change
328         the block size or perhaps to replace newlines by CRLF
329         -- note however that this the default server uses this
330         to copy binary data as well.
331
332         """
333         shutil.copyfileobj(source, outputfile)
334
335     def guess_type(self, path):
336         """Guess the type of a file.
337
338         Argument is a PATH (a filename).
339
340         Return value is a string of the form type/subtype,
341         usable for a MIME Content-type header.
342
343         The default implementation looks the file's extension
344         up in the table self.extensions_map, using application/octet-stream
345         as a default; however it would be permissible (if
346         slow) to look inside the data to make a better guess.
347
348         """
349
350         base, ext = posixpath.splitext(path)
351         if ext in self.extensions_map:
352             return self.extensions_map[ext]
353         ext = ext.lower()
354         if ext in self.extensions_map:
355             return self.extensions_map[ext]
356         else:
357             return self.extensions_map['']
358
359     if not mimetypes.inited:
360         mimetypes.init()  # try to read system mime.types
361     extensions_map = mimetypes.types_map.copy()
362     extensions_map.update({
363         '': 'application/octet-stream',  # Default
364         '.py': 'text/plain',
365         '.c': 'text/plain',
366         '.h': 'text/plain',
367         })
368
369
370 def test(handler_class=DMaaPHandler, server_class=BaseHTTPServer.HTTPServer, protocol="HTTP/1.0", port=3904):
371     print "Load event schema file: " + DcaeVariables.CommonEventSchema
372     with open(DcaeVariables.CommonEventSchema) as opened_file:
373         global EvtSchema
374         EvtSchema = json.load(opened_file)
375         
376     server_address = ('', port)
377
378     handler_class.protocol_version = protocol
379     httpd = server_class(server_address, handler_class)
380     
381     global DMaaPHttpd
382     DMaaPHttpd = httpd
383     DcaeVariables.HTTPD = httpd
384
385     sa = httpd.socket.getsockname()
386     print "Serving HTTP on", sa[0], "port", sa[1], "..."
387     # httpd.serve_forever()
388
389
390 def _main_(handler_class=DMaaPHandler, server_class=BaseHTTPServer.HTTPServer, protocol="HTTP/1.0"):
391     
392     if sys.argv[1:]:
393         port = int(sys.argv[1])
394     else:
395         port = 3904
396     
397     print "Load event schema file: " + DcaeVariables.CommonEventSchema
398     with open(DcaeVariables.CommonEventSchema) as opened_file:
399         global EvtSchema
400         EvtSchema = json.load(opened_file)
401         
402     server_address = ('', port)
403
404     handler_class.protocol_version = protocol
405     httpd = server_class(server_address, handler_class)
406
407     sa = httpd.socket.getsockname()
408     print "Serving HTTP on", sa[0], "port", sa[1], "..."
409     httpd.serve_forever()
410
411
412 if __name__ == '__main__':
413     _main_()