X-Git-Url: https://gerrit.onap.org/r/gitweb?a=blobdiff_plain;f=tests%2Fdcaegen2%2Ftestcases%2Fresources%2FDMaaP.py;fp=tests%2Fdcaegen2%2Ftestcases%2Fresources%2FDMaaP.py;h=0000000000000000000000000000000000000000;hb=131d4dda822b637e816ef225d8a4c9981ccf56a7;hp=4c245614065d57d1437ad3ef45099a58deb9e52d;hpb=bbd96bd93c8d3c773aee53a69c44b3ac9cc3696d;p=integration%2Fcsit.git diff --git a/tests/dcaegen2/testcases/resources/DMaaP.py b/tests/dcaegen2/testcases/resources/DMaaP.py deleted file mode 100644 index 4c245614..00000000 --- a/tests/dcaegen2/testcases/resources/DMaaP.py +++ /dev/null @@ -1,418 +0,0 @@ -''' -Created on Aug 15, 2017 - -@author: sw6830 -''' -import os -import posixpath -import BaseHTTPServer -import urllib -import urlparse -import cgi -import sys -import shutil -import mimetypes -from jsonschema import validate -import jsonschema -import json -import DcaeVariables -import SimpleHTTPServer - -try: - from cStringIO import StringIO -except ImportError: - from StringIO import StringIO - -EvtSchema = None -DMaaPHttpd = None - - -def clean_up_event(): - sz = DcaeVariables.VESEventQ.qsize() - for i in range(sz): - try: - self.evtQueue.get_nowait() - except: - pass - - -def enque_event(evt): - if DcaeVariables.VESEventQ is not None: - try: - DcaeVariables.VESEventQ.put(evt) - return True - except Exception as e: - print (str(e)) - return False - return False - - -def deque_event(wait_sec=25): - if DcaeVariables.IsRobotRun: - pass - try: - evt = DcaeVariables.VESEventQ.get(True, wait_sec) - return evt - except Exception as e: - if DcaeVariables.IsRobotRun: - pass - - else: - print("DMaaP Event dequeue timeout") - return None - - -class DMaaPHandler(BaseHTTPServer.BaseHTTPRequestHandler): - - def do_PUT(self): - self.send_response(405) - return - - def do_POST(self): - resp_code = 0 - # Parse the form data posted - ''' - form = cgi.FieldStorage( - fp=self.rfile, - headers=self.headers, - environ={'REQUEST_METHOD':'POST', - 'CONTENT_TYPE':self.headers['Content-Type'], - }) - - - form = cgi.FieldStorage( - fp=self.rfile, - headers=self.headers, - environ={"REQUEST_METHOD": "POST"}) - - for item in form.list: - print "%s=%s" % (item.name, item.value) - - ''' - - if 'POST' not in self.requestline: - resp_code = 405 - - ''' - if resp_code == 0: - if '/eventlistener/v5' not in self.requestline and '/eventlistener/v5/eventBatch' not in self.requestline and \ - '/eventlistener/v5/clientThrottlingState' not in self.requestline: - resp_code = 404 - - - if resp_code == 0: - if 'Y29uc29sZTpaakprWWpsbE1qbGpNVEkyTTJJeg==' not in str(self.headers): - resp_code = 401 - ''' - - if resp_code == 0: - topic = self.extract_topic_from_path() - content_len = int(self.headers.getheader('content-length', 0)) - post_body = self.rfile.read(content_len) - - indx = post_body.index("{") - if indx != 0: - post_body = post_body[indx:] - - event = "\""+topic+"\":" + post_body - if not enque_event(event): - print "enque event fails" - - global EvtSchema - try: - if EvtSchema is None: - with open(DcaeVariables.CommonEventSchema) as opened_file: - EvtSchema = json.load(opened_file) - decoded_body = json.loads(post_body) - jsonschema.validate(decoded_body, EvtSchema) - except: - resp_code = 400 - - # Begin the response - if not DcaeVariables.IsRobotRun: - print ("Response Message:") - - ''' - { - "200" : { - "description" : "Success", - "schema" : { - "$ref" : "#/definitions/DR_Pub" - } - } - - rspStr = "{'responses' : {'200' : {'description' : 'Success'}}}" - rspStr1 = "{'count': 1, 'serverTimeMs': 3}" - - ''' - - if resp_code == 0: - if 'clientThrottlingState' in self.requestline: - self.send_response(204) - else: - self.send_response(200) - self.send_header('Content-Type', 'application/json') - self.end_headers() - self.wfile.write("{'count': 1, 'serverTimeMs': 3}") - self.wfile.close() - else: - self.send_response(resp_code) - - ''' - self.end_headers() - self.wfile.write('Client: %s\n' % str(self.client_address)) - self.wfile.write('User-agent: %s\n' % str(self.headers['user-agent'])) - self.wfile.write('Path: %s\n' % self.path) - self.wfile.write('Form data:\n') - self.wfile.close() - - # Echo back information about what was posted in the form - for field in form.keys(): - field_item = form[field] - if field_item.filename: - # The field contains an uploaded file - file_data = field_item.file.read() - file_len = len(file_data) - del file_data - self.wfile.write('\tUploaded %s as "%s" (%d bytes)\n' % \ - (field, field_item.filename, file_len)) - else: - # Regular form value - self.wfile.write('\t%s=%s\n' % (field, form[field].value)) - ''' - return - - def extract_topic_from_path(self): - return self.path["/events/".__len__():] - - def do_GET(self): - """Serve a GET request.""" - f = self.send_head() - if f: - try: - self.copyfile(f, self.wfile) - finally: - f.close() - - def do_HEAD(self): - """Serve a HEAD request.""" - f = self.send_head() - if f: - f.close() - - def send_head(self): - """Common code for GET and HEAD commands. - - This sends the response code and MIME headers. - - Return value is either a file object (which has to be copied - to the outputfile by the caller unless the command was HEAD, - and must be closed by the caller under all circumstances), or - None, in which case the caller has nothing further to do. - - """ - path = self.translate_path(self.path) - if os.path.isdir(path): - parts = urlparse.urlsplit(self.path) - if not parts.path.endswith('/'): - # redirect browser - doing basically what apache does - self.send_response(301) - new_parts = (parts[0], parts[1], parts[2] + '/', - parts[3], parts[4]) - new_url = urlparse.urlunsplit(new_parts) - self.send_header("Location", new_url) - self.end_headers() - return None - for index in "index.html", "index.htm": - index = os.path.join(path, index) - if os.path.exists(index): - path = index - break - else: - return self.list_directory(path) - ctype = self.guess_type(path) - try: - # Always read in binary mode. Opening files in text mode may cause - # newline translations, making the actual size of the content - # transmitted *less* than the content-length! - f = open(path, 'rb') - except IOError: - self.send_error(404, "File not found") - return None - try: - self.send_response(200) - self.send_header("Content-type", ctype) - fs = os.fstat(f.fileno()) - self.send_header("Content-Length", str(fs[6])) - self.send_header("Last-Modified", self.date_time_string(fs.st_mtime)) - self.end_headers() - return f - except: - f.close() - raise - - def list_directory(self, path): - """Helper to produce a directory listing (absent index.html). - - Return value is either a file object, or None (indicating an - error). In either case, the headers are sent, making the - interface the same as for send_head(). - - """ - try: - list_dir = os.listdir(path) - except os.error: - self.send_error(404, "No permission to list directory") - return None - list_dir.sort(key=lambda a: a.lower()) - f = StringIO() - displaypath = cgi.escape(urllib.unquote(self.path)) - f.write('') - f.write("\n