Merge "Update certificates in PM Mapper CSIT's"
[integration/csit.git] / tests / dcaegen2 / bbs-testcases / resources / simulator / httpServerLib.py
1 import _thread
2 import ssl
3 from http.server import HTTPServer
4
5
6 def header_200_and_json(self):
7     self.send_response(200)
8     self.send_header('Content-Type', 'application/json')
9     self.end_headers()
10     
11 def header_404_and_json(self):
12     self.send_response(404)
13     self.send_header('Content-Type', 'application/json')
14     self.end_headers()
15
16 def start_http_endpoint(port, handler_class):
17     _thread.start_new_thread(init_http_endpoints, (port, handler_class))
18
19
20 def start_https_endpoint(port, handler_class, keyfile, certfile, ca_certs):
21     _thread.start_new_thread(init_https_endpoints, (port, handler_class, keyfile, certfile, ca_certs))
22
23
24 def init_http_endpoints(port, handler_class, server_class=HTTPServer):
25     server = server_class(('', port), handler_class)
26     sa = server.socket.getsockname()
27     print("Serving HTTP on", sa[0], "port", sa[1], "for", handler_class, "...")
28     server.serve_forever()
29
30
31 def init_https_endpoints(port, handler_class, keyfile, certfile, ca_certs, server_class=HTTPServer):
32     server = server_class(('', port), handler_class)
33     server.socket = ssl.wrap_socket(server.socket, keyfile=keyfile, certfile=certfile,
34                                     ca_certs=ca_certs, server_side=True)
35     sa = server.socket.getsockname()
36     print("Serving HTTPS on", sa[0], "port", sa[1], "for", handler_class, "...")
37     server.serve_forever()