import re import time from http.server import BaseHTTPRequestHandler import httpServerLib posted_event_from_bbs = b'[]' received_event_to_get_method = b'[]' class DmaapSetup(BaseHTTPRequestHandler): """ This Handler is used by the test harness to prepare the buffers: test harness places VES events using PUT from the harness into the "received_event buffer" test harness will write policy topics from the BBS us to the posted event buffer """ def do_PUT(self): # Read the event from the test harness place it in the received event buffer if re.search('/set_get_event', self.path): content_length = int(self.headers['Content-Length']) global received_event_to_get_method received_event_to_get_method = self.rfile.read(content_length) httpServerLib.header_200_and_json(self) return def do_GET(self): # The test harness receives the policy triggers from the posted event # by issuing a get and receiving the response. if re.search('/events/dcaeClOutput', self.path): global posted_event_from_bbs httpServerLib.header_200_and_json(self) self.wfile.write(posted_event_from_bbs) return def do_POST(self): if re.search('/reset', self.path): global posted_event_from_bbs global received_event_to_get_method posted_event_from_bbs = b'[]' received_event_to_get_method = b'[]' httpServerLib.header_200_and_json(self) return class DMaaPHandler(BaseHTTPRequestHandler): """ This Handler is what the BBS uS connects to - The test library has posted the the VES events in the setup Handler which are then received by the BBS uS via this handler's do_GET function. Likewise the policy trigger posted by the BBS uS is received and placed in the in the posted event buffer which the test harness retrieves using the setup handler. """ def do_POST(self): # Post of the policy triggers from the BBS uS if re.search('/events/unauthenticated.DCAE_CL_OUTPUT', self.path): global posted_event_from_bbs content_length = int(self.headers['Content-Length']) posted_event_from_bbs = self.rfile.read(content_length) httpServerLib.header_200_and_json(self) return def do_GET(self): # BBS uS issues a Get to receive VES and PNF UPdate event from DMAAP global received_event_to_get_method if re.search('/events/unauthenticated.PNF_UPDATE', self.path): httpServerLib.header_200_and_json(self) self.wfile.write(received_event_to_get_method) elif re.search('/events/unauthenticated_CPE_AUTHENTICATION/OpenDcae-c12/c12', self.path): httpServerLib.header_200_and_json(self) self.wfile.write(received_event_to_get_method) return def _main_(handler_class=DMaaPHandler, protocol="HTTP/1.0"): handler_class.protocol_version = protocol httpServerLib.start_http_endpoint(2222, DMaaPHandler) httpServerLib.start_https_endpoint(2223, DMaaPHandler, keyfile="certs/org.onap.dmaap-bc.key", certfile="certs/dmaap_bc_topic_mgr_dmaap_bc.onap.org.cer", ca_certs="certs/ca_local_0.cer") httpServerLib.start_http_endpoint(2224, DmaapSetup) while 1: time.sleep(10) if __name__ == '__main__': _main_()