edeec8cf762e1d6389232fc64800d8d696ddcd59
[integration/csit.git] / tests / dcaegen2 / bbs-testcases / resources / simulator / DMaaP.py
1 import re
2 import time
3 from http.server import BaseHTTPRequestHandler
4 import httpServerLib
5
6
7 posted_event_from_bbs = b'[]'
8 received_event_to_get_method = b'[]'
9
10
11 class DmaapSetup(BaseHTTPRequestHandler):
12
13     """
14     This Handler is used by the test harness to prepare the buffers:
15         test harness places VES events using PUT from the harness into the 
16         "received_event buffer"
17         test harness will write policy topics from the BBS us to the posted event buffer
18     """
19     def do_PUT(self):
20         # Read the event from the test harness place it in the received event buffer
21         if re.search('/set_get_event', self.path):
22             content_length = int(self.headers['Content-Length'])
23             global received_event_to_get_method
24             received_event_to_get_method = self.rfile.read(content_length)
25             httpServerLib.header_200_and_json(self)
26
27         return
28
29     def do_GET(self):
30         # The test harness receives the policy triggers from the posted event
31         # by issuing a get and receiving the response.
32         if re.search('/events/dcaeClOutput', self.path):
33             global posted_event_from_bbs
34             httpServerLib.header_200_and_json(self)
35             self.wfile.write(posted_event_from_bbs)
36
37         return
38
39     def do_POST(self):
40         if re.search('/reset', self.path):
41             global posted_event_from_bbs
42             global received_event_to_get_method
43             posted_event_from_bbs = b'[]'
44             received_event_to_get_method = b'[]'
45             httpServerLib.header_200_and_json(self)
46
47         return
48
49
50 class DMaaPHandler(BaseHTTPRequestHandler):
51     """
52     This Handler is what the BBS uS connects to - The test library has posted the
53     the VES events in the setup Handler which are then received by the BBS uS via
54     this handler's do_GET function.
55     Likewise the policy trigger posted by the BBS uS is received and placed in the
56      in the posted event buffer which the test harness retrieves using the setup handler.
57     """
58
59     def do_POST(self):
60         # Post of the policy triggers from the BBS uS
61         if re.search('/events/unauthenticated.DCAE_CL_OUTPUT', self.path):
62             global posted_event_from_bbs
63             content_length = int(self.headers['Content-Length'])
64             posted_event_from_bbs = self.rfile.read(content_length)
65             httpServerLib.header_200_and_json(self)
66
67         return
68
69     def do_GET(self):
70         # BBS uS issues a Get to receive VES and PNF UPdate event from DMAAP
71         global received_event_to_get_method
72         if re.search('/events/unauthenticated.PNF_UPDATE', self.path):
73             httpServerLib.header_200_and_json(self)
74             self.wfile.write(received_event_to_get_method)
75         elif re.search('/events/unauthenticated_CPE_AUTHENTICATION/OpenDcae-c12/c12', self.path):
76             httpServerLib.header_200_and_json(self)
77             self.wfile.write(received_event_to_get_method)
78
79         return
80
81
82 def _main_(handler_class=DMaaPHandler, protocol="HTTP/1.0"):
83     handler_class.protocol_version = protocol
84     httpServerLib.start_http_endpoint(2222, DMaaPHandler)
85     httpServerLib.start_https_endpoint(2223, DMaaPHandler, keyfile="certs/org.onap.dmaap-bc.key", certfile="certs/dmaap_bc_topic_mgr_dmaap_bc.onap.org.cer", ca_certs="certs/ca_local_0.cer")
86     httpServerLib.start_http_endpoint(2224, DmaapSetup)
87     while 1:
88         time.sleep(10)
89
90
91 if __name__ == '__main__':
92     _main_()