3 from http.server import BaseHTTPRequestHandler
# Shared, module-level buffers (raw request/response bodies as bytes).
# Both start out as an empty JSON list.

# Body of the last request accepted on the DCAE_CL_OUTPUT topic; handed back
# to the test harness on GET /events/dcaeClOutput.
posted_event_from_bbs = b'[]'
# Body stored by the test harness through PUT /set_get_event; served to
# clients that GET one of the simulated event topics.
received_event_to_get_method = b'[]'
class DmaapSetup(BaseHTTPRequestHandler):
    """Setup handler used by the test harness to prepare and inspect the buffers.

    * ``PUT /set_get_event`` stores the request body in the module-level
      ``received_event_to_get_method`` buffer.
    * ``GET /events/dcaeClOutput`` writes the module-level
      ``posted_event_from_bbs`` buffer back to the caller.
    * ``/reset`` puts both buffers back to ``b'[]'``.

    Paths are matched with ``re.search``, so a pattern may occur anywhere in
    the request path. A request whose path matches nothing is left
    unanswered by this handler.
    """

    def _read_body(self):
        """Return the request body as bytes.

        A request without a ``Content-Length`` header is treated as having an
        empty body instead of failing on ``int(None)``.
        """
        content_length = int(self.headers.get('Content-Length') or 0)
        return self.rfile.read(content_length)

    def do_PUT(self):
        """Read the event from the test harness into the received event buffer."""
        global received_event_to_get_method
        if re.search('/set_get_event', self.path):
            received_event_to_get_method = self._read_body()
            httpServerLib.header_200_and_json(self)

    def do_GET(self):
        """Hand the posted event buffer to the test harness.

        The harness receives the policy triggers by issuing a GET and reading
        the response body.
        """
        if re.search('/events/dcaeClOutput', self.path):
            httpServerLib.header_200_and_json(self)
            self.wfile.write(posted_event_from_bbs)

    # NOTE(review): the HTTP verb for /reset is not visible in the listing this
    # block was restored from; POST is assumed -- confirm against the callers.
    def do_POST(self):
        """Reset both buffers to an empty JSON list."""
        global posted_event_from_bbs
        global received_event_to_get_method
        if re.search('/reset', self.path):
            posted_event_from_bbs = b'[]'
            received_event_to_get_method = b'[]'
            httpServerLib.header_200_and_json(self)
class DMaaPHandler(BaseHTTPRequestHandler):
    """Handler that the BBS uS connects to.

    The test library has placed the VES events in the module-level
    ``received_event_to_get_method`` buffer through the setup handler; the
    BBS uS receives them through this handler's ``do_GET``.
    Likewise, the policy trigger posted by the BBS uS is received by
    ``do_POST`` and placed in the module-level ``posted_event_from_bbs``
    buffer, which the test harness retrieves using the setup handler.

    Paths are matched with ``re.search``: the patterns are regular
    expressions (an unescaped ``.`` matches any character) and may occur
    anywhere in the request path. A request whose path matches nothing is
    left unanswered by this handler.
    """

    # Topic paths that are answered with the received event buffer.
    # NOTE(review): the second pattern spells the topic with an underscore
    # ("unauthenticated_CPE_AUTHENTICATION") while the other topics use a
    # dot -- confirm this is the path the client really requests.
    _GET_TOPIC_PATTERNS = (
        '/events/unauthenticated.PNF_UPDATE',
        '/events/unauthenticated_CPE_AUTHENTICATION/OpenDcae-c12/c12',
    )

    def _read_body(self):
        """Return the request body as bytes.

        A request without a ``Content-Length`` header is treated as having an
        empty body instead of failing on ``int(None)``.
        """
        content_length = int(self.headers.get('Content-Length') or 0)
        return self.rfile.read(content_length)

    def do_POST(self):
        """Store the policy trigger posted by the BBS uS in the posted event buffer."""
        global posted_event_from_bbs
        if re.search('/events/unauthenticated.DCAE_CL_OUTPUT', self.path):
            posted_event_from_bbs = self._read_body()
            httpServerLib.header_200_and_json(self)

    def do_GET(self):
        """Serve the received event buffer for the VES and PNF update topics."""
        # Both topics are answered identically, so one check covers them.
        if any(re.search(pattern, self.path)
               for pattern in self._GET_TOPIC_PATTERNS):
            httpServerLib.header_200_and_json(self)
            self.wfile.write(received_event_to_get_method)
def _main_(handler_class=DMaaPHandler, protocol="HTTP/1.0"):
    """Start the simulator endpoints.

    Three endpoints are started through ``httpServerLib``:

    * port 2222 -- plain HTTP, served by ``handler_class``
    * port 2223 -- HTTPS with the certificates under ``certs/``, served by
      ``handler_class``
    * port 2224 -- plain HTTP setup endpoint, served by ``DmaapSetup``

    Args:
        handler_class: request handler class for ports 2222 and 2223. It was
            previously only given ``protocol_version`` while the endpoints
            were hard-wired to ``DMaaPHandler``; it is now the class that is
            actually served. The default keeps the former behaviour.
        protocol: value assigned to ``handler_class.protocol_version``.
    """
    handler_class.protocol_version = protocol
    httpServerLib.start_http_endpoint(2222, handler_class)
    httpServerLib.start_https_endpoint(
        2223,
        handler_class,
        keyfile="certs/org.onap.dmaap-bc.key",
        certfile="certs/dmaap_bc_topic_mgr_dmaap_bc.onap.org.cer",
        ca_certs="certs/ca_local_0.cer",
    )
    httpServerLib.start_http_endpoint(2224, DmaapSetup)
91 if __name__ == '__main__':