3 Program which acts as the collector for the Vendor Event Listener REST API.
5 Only intended for test purposes.
10 ===================================================================
11 Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
12 ===================================================================
13 Licensed under the Apache License, Version 2.0 (the "License");
14 you may not use this file except in compliance with the License.
15 You may obtain a copy of the License at
17 http://www.apache.org/licenses/LICENSE-2.0
19 Unless required by applicable law or agreed to in writing, software
20 distributed under the License is distributed on an "AS IS" BASIS,
21 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
22 See the License for the specific language governing permissions and
23 limitations under the License.
26 from rest_dispatcher import PathDispatcher, set_404_content
27 from wsgiref.simple_server import make_server
33 from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter
35 import logging.handlers
36 from base64 import b64decode
40 from functools import partial
46 <title>Hello {name}</title>
49 <h1>Hello {name}!</h1>
53 _localtime_resp = '''\
56 <year>{t.tm_year}</year>
57 <month>{t.tm_mon}</month>
58 <day>{t.tm_mday}</day>
59 <hour>{t.tm_hour}</hour>
60 <minute>{t.tm_min}</minute>
61 <second>{t.tm_sec}</second>
66 __date__ = '2015-12-04'
67 __updated__ = '2015-12-04'
73 #------------------------------------------------------------------------------
74 # Credentials we expect clients to authenticate themselves with.
75 #------------------------------------------------------------------------------
79 #------------------------------------------------------------------------------
80 # The JSON schema which we will use to validate events.
81 #------------------------------------------------------------------------------
84 #------------------------------------------------------------------------------
85 # The JSON schema which we will use to validate client throttle state.
86 #------------------------------------------------------------------------------
87 throttle_schema = None
89 #------------------------------------------------------------------------------
90 # The JSON schema which we will use to provoke throttling commands for testing.
91 #------------------------------------------------------------------------------
92 test_control_schema = None
94 #------------------------------------------------------------------------------
95 # Pending command list from the testControl API
96 # This is sent as a response commandList to the next received event.
97 #------------------------------------------------------------------------------
98 pending_command_list = None
100 #------------------------------------------------------------------------------
101 # Logger for this module.
102 #------------------------------------------------------------------------------
def _pretty_json(obj):
    '''
    Serialise obj as key-sorted, 4-space indented JSON for console output.
    '''
    return json.dumps(obj, sort_keys=True, indent=4, separators=(',', ': '))


def _basic_auth_credentials(header):
    '''
    Split an HTTP Authorization header value into (mode, credentials).

    credentials is the base64-decoded second token of the header.  It is None
    when the header is empty, does not consist of exactly two
    whitespace-separated tokens, or the second token is not valid base64, so
    that a malformed header is treated as "not authenticated" instead of
    raising inside the request handler.
    '''
    parts = header.split()
    if len(parts) != 2:
        return None, None

    mode, b64_credentials = parts
    try:
        return mode, b64decode(b64_credentials)
    except (TypeError, ValueError):
        # Python 2 raises TypeError for bad padding, Python 3 raises
        # binascii.Error (a ValueError subclass).
        return mode, None


def listener(environ, start_response, schema):
    '''
    Handler for the Vendor Event Listener REST API.

    Extract headers and the body and check that:

      1)  The client authenticated themselves correctly.
      2)  The body validates against the provided schema for the API.

    The result of the body check is only logged and printed.  The HTTP
    response depends solely on authentication:

      * credentials equal to vel_username:vel_password -> '202 Accepted'.
        If a commandList is pending from the testControl API it is sent as
        the JSON response body and then cleared.
      * anything else -> '401 Unauthorized' with a requestError JSON body.

    :param environ: WSGI environment of the request.
    :param start_response: WSGI start_response callable.
    :param schema: JSON schema to validate the body against, or None to only
                   check that the body decodes as JSON.
    :returns: generator yielding the response body.
    '''
    global pending_command_list

    logger.info('Got a Vendor Event request')
    print('==== ' + time.asctime() + ' ' + '=' * 49)

    #--------------------------------------------------------------------------
    # Extract the content from the request.  CONTENT_LENGTH may be absent,
    # empty or garbage; treat all of those as "no body" rather than failing.
    #--------------------------------------------------------------------------
    try:
        length = max(int(environ.get('CONTENT_LENGTH') or 0), 0)
    except ValueError:
        length = 0
    logger.debug('Content Length: {0}'.format(length))
    body = environ['wsgi.input'].read(length)
    logger.debug('Content Body: {0}'.format(body))

    #--------------------------------------------------------------------------
    # Extract the credentials.  They are deliberately never written to the
    # log or to stdout.
    #--------------------------------------------------------------------------
    mode, credentials = _basic_auth_credentials(
        environ.get('HTTP_AUTHORIZATION', ''))
    logger.debug('Auth. Mode: {0} Credentials: ****'.format(mode))

    #--------------------------------------------------------------------------
    # If we have a schema file then check that the event matches that expected.
    #--------------------------------------------------------------------------
    if schema is not None:
        logger.debug('Attempting to validate data: {0}\n'
                     'Against schema: {1}'.format(body, schema))
        try:
            decoded_body = json.loads(body)
            jsonschema.validate(decoded_body, schema)
            logger.info('Event is valid!')
            print('Valid body decoded & checked against schema OK:\n'
                  '{0}'.format(_pretty_json(decoded_body)))

        except jsonschema.SchemaError as e:
            logger.error('Schema is not valid! {0}'.format(e))
            print('Schema is not valid! {0}'.format(e))

        except jsonschema.ValidationError as e:
            # decoded_body is bound here: validation only runs after a
            # successful json.loads().
            logger.warning('Event is not valid against schema! {0}'.format(e))
            print('Event is not valid against schema! {0}'.format(e))
            print('Bad JSON body decoded:\n'
                  '{0}'.format(_pretty_json(decoded_body)))

        except Exception as e:
            logger.error('Event invalid for unexpected reason! {0}'.format(e))
            print('Schema is not valid for unexpected reason! {0}'.format(e))
    else:
        logger.debug('No schema so just decode JSON: {0}'.format(body))
        try:
            decoded_body = json.loads(body)
            print('Valid JSON body (no schema checking) decoded:\n'
                  '{0}'.format(_pretty_json(decoded_body)))
            logger.info('Event is valid JSON but not checked against schema!')

        except Exception as e:
            logger.error('Event invalid for unexpected reason! {0}'.format(e))
            print('JSON body not valid for unexpected reason! {0}'.format(e))

    #--------------------------------------------------------------------------
    # See whether the user authenticated themselves correctly.
    #--------------------------------------------------------------------------
    if credentials == (vel_username + ':' + vel_password):
        logger.debug('Authenticated OK')
        print('Authenticated OK')

        #----------------------------------------------------------------------
        # Respond to the caller. If we have a pending commandList from the
        # testControl API, send it in response.
        #----------------------------------------------------------------------
        if pending_command_list is not None:
            start_response('202 Accepted',
                           [('Content-type', 'application/json')])
            response = pending_command_list
            pending_command_list = None

            print('\n' + '=' * 80)
            print('Sending pending commandList in the response:\n'
                  '{0}'.format(_pretty_json(response)))
            print('=' * 80 + '\n')
            yield json.dumps(response)
        else:
            start_response('202 Accepted', [])
            yield ''
    else:
        # Report the failure without echoing the expected password.
        logger.warning('Failed to authenticate (expected user: {0})'.format(
            vel_username))
        print('Failed to authenticate (expected user: {0})'.format(
            vel_username))

        #----------------------------------------------------------------------
        # Respond to the caller.
        #----------------------------------------------------------------------
        start_response('401 Unauthorized',
                       [('Content-type', 'application/json')])
        req_error = {
            'requestError': {
                'policyException': {
                    'messageId': 'POL0001',
                    'text': 'Failed to authenticate'
                }
            }
        }
        yield json.dumps(req_error)
def test_listener(environ, start_response, schema):
    '''
    Handler for the Test Collector Test Control API.

    There is no authentication on this interface.

    This simply stores a commandList which will be sent in response to the next
    incoming event on the EVEL interface.

      * GET returns the currently pending commandList as JSON ('200 OK').
      * Any other method reads the body as JSON, stores it as the pending
        commandList and answers '202 Accepted'.  A schema validation failure
        is logged and printed but the decoded body is still stored.
      * A body which cannot be decoded as JSON is answered with
        '400 Bad Request' and leaves the pending commandList untouched.

    :param environ: WSGI environment of the request.
    :param start_response: WSGI start_response callable.
    :param schema: JSON schema to validate the body against, or None to only
                   check that the body decodes as JSON.
    :returns: generator yielding the response body.
    '''
    global pending_command_list

    def pretty(obj):
        # Key-sorted, indented JSON for console output.
        return json.dumps(obj, sort_keys=True, indent=4,
                          separators=(',', ': '))

    logger.info('Got a Test Control input')
    print('============================')
    print('==== TEST CONTROL INPUT ====')

    #--------------------------------------------------------------------------
    # GET allows us to get the current pending request.
    #--------------------------------------------------------------------------
    if environ.get('REQUEST_METHOD') == 'GET':
        start_response('200 OK', [('Content-type', 'application/json')])
        yield json.dumps(pending_command_list)
        return

    #--------------------------------------------------------------------------
    # Extract the content from the request.  CONTENT_LENGTH may be absent,
    # empty or garbage; treat all of those as "no body" rather than failing.
    #--------------------------------------------------------------------------
    try:
        length = max(int(environ.get('CONTENT_LENGTH') or 0), 0)
    except ValueError:
        length = 0
    logger.debug('TestControl Content Length: {0}'.format(length))
    body = environ['wsgi.input'].read(length)
    logger.debug('TestControl Content Body: {0}'.format(body))

    #--------------------------------------------------------------------------
    # Decode the body and, if we have a schema file, check that it matches
    # that expected.  A flag (not a None check) records whether decoding
    # succeeded, because JSON null is a legitimate decoded value.
    #--------------------------------------------------------------------------
    decoded_ok = False
    decoded_body = None
    if schema is not None:
        logger.debug('Attempting to validate data: {0}\n'
                     'Against schema: {1}'.format(body, schema))
        try:
            decoded_body = json.loads(body)
            decoded_ok = True
            jsonschema.validate(decoded_body, schema)
            logger.info('TestControl is valid!')
            print('TestControl:\n'
                  '{0}'.format(pretty(decoded_body)))

        except jsonschema.SchemaError as e:
            logger.error('TestControl Schema is not valid: {0}'.format(e))
            print('TestControl Schema is not valid: {0}'.format(e))

        except jsonschema.ValidationError as e:
            logger.warning('TestControl input not valid: {0}'.format(e))
            print('TestControl input not valid: {0}'.format(e))
            print('Bad JSON body decoded:\n'
                  '{0}'.format(pretty(decoded_body)))

        except Exception as e:
            logger.error('TestControl input not valid: {0}'.format(e))
            print('TestControl Schema not valid: {0}'.format(e))
    else:
        logger.debug('Missing schema just decode JSON: {0}'.format(body))
        try:
            decoded_body = json.loads(body)
            decoded_ok = True
            print('Valid JSON body (no schema checking) decoded:\n'
                  '{0}'.format(pretty(decoded_body)))
            logger.info('TestControl input not checked against schema!')

        except Exception as e:
            logger.error('TestControl input not valid: {0}'.format(e))
            print('TestControl input not valid: {0}'.format(e))

    #--------------------------------------------------------------------------
    # Respond to the caller.  Only a body that actually decoded as JSON is
    # stored as the commandList for the next incoming event.
    #--------------------------------------------------------------------------
    if decoded_ok:
        pending_command_list = decoded_body
        status = '202 Accepted'
    else:
        status = '400 Bad Request'

    print('===== TEST CONTROL END =====')
    print('============================')
    start_response(status, [])
    yield ''
322 Main function for the collector start-up.
324 Called with command-line arguments:
326 * --section *<section>*
331 *<file>* specifies the path to the configuration file.
333 *<section>* specifies the section within that config file.
335 *verbose* generates more information in the log files.
337 The process listens for REST API invocations and checks them. Errors are
338 displayed to stdout and logged.
344 sys.argv.extend(argv)
346 program_name = os.path.basename(sys.argv[0])
347 program_version = 'v{0}'.format(__version__)
348 program_build_date = str(__updated__)
349 program_version_message = '%%(prog)s {0} ({1})'.format(program_version,
351 if (__import__('__main__').__doc__ is not None):
352 program_shortdesc = __import__('__main__').__doc__.split('\n')[1]
354 program_shortdesc = 'Running in test harness'
355 program_license = '''{0}
358 Copyright 2015 Metaswitch Networks Ltd. All rights reserved.
360 Distributed on an "AS IS" basis without warranties
361 or conditions of any kind, either express or implied.
364 '''.format(program_shortdesc, str(__date__))
367 #----------------------------------------------------------------------
368 # Setup argument parser so we can parse the command-line.
369 #----------------------------------------------------------------------
370 parser = ArgumentParser(description=program_license,
371 formatter_class=ArgumentDefaultsHelpFormatter)
372 parser.add_argument('-v', '--verbose',
375 help='set verbosity level')
376 parser.add_argument('-V', '--version',
378 version=program_version_message,
379 help='Display version information')
380 parser.add_argument('-a', '--api-version',
383 help='set API version')
384 parser.add_argument('-c', '--config',
386 default='/etc/opt/att/collector.conf',
387 help='Use this config file.',
389 parser.add_argument('-s', '--section',
393 help='section to use in the config file')
395 #----------------------------------------------------------------------
396 # Process arguments received.
397 #----------------------------------------------------------------------
398 args = parser.parse_args()
399 verbose = args.verbose
400 api_version = args.api_version
401 config_file = args.config
402 config_section = args.section
404 #----------------------------------------------------------------------
405 # Now read the config file, using command-line supplied values as
407 #----------------------------------------------------------------------
408 defaults = {'log_file': 'collector.log',
411 'vel_topic_name': '',
412 'transport_prot': 'http'
415 config = ConfigParser.SafeConfigParser(defaults)
416 config.read(config_file)
418 #----------------------------------------------------------------------
419 # extract the values we want.
420 #----------------------------------------------------------------------
421 log_file = config.get(config_section, 'log_file', vars=overrides)
422 vel_port = config.get(config_section, 'vel_port', vars=overrides)
423 vel_path = config.get(config_section, 'vel_path', vars=overrides)
424 transport_prot = config.get(config_section, 'protocol', vars=overrides)
425 vel_topic_name = config.get(config_section,
429 if (transport_prot.lower() != 'http' and transport_prot.lower() != 'https' ):
430 logger.error('Invalid Transport must be http or https ({0}) '
431 'specified'.format(transport_prot))
432 raise RuntimeError('Invalid Transport protcol specified ({0}) '
433 'specified'.format(transport_prot))
436 vel_username = config.get(config_section,
439 vel_password = config.get(config_section,
442 vel_schema_file = config.get(config_section,
445 base_schema_file = config.get(config_section,
448 throttle_schema_file = config.get(config_section,
449 'throttle_schema_file',
451 test_control_schema_file = config.get(config_section,
452 'test_control_schema_file',
455 #----------------------------------------------------------------------
456 # Finally we have enough info to start a proper flow trace.
457 #----------------------------------------------------------------------
459 print('Logfile: {0}'.format(log_file))
460 logger = logging.getLogger('collector')
462 print('Verbose mode on')
463 logger.setLevel(logging.DEBUG)
465 logger.setLevel(logging.INFO)
466 handler = logging.handlers.RotatingFileHandler(log_file,
470 if (transport_prot.lower() == 'https' ):
471 transport_prot = transport_prot.lower()
472 ca_file = config.get(config_section, 'ca_file', vars=overrides)
473 cert_file = config.get(config_section, 'cert_file', vars=overrides)
474 key_file = config.get(config_section, 'key_file', vars=overrides)
475 if not os.path.exists(ca_file):
476 logger.error('Event Listener SSL CA File ({0}) not found. '
477 'No validation will be undertaken.'.format(ca_file))
478 raise RuntimeError('Invalid CA file ({0}) '
479 'specified'.format(ca_file))
480 if not os.path.exists(cert_file):
481 logger.error('Event Listener SSL Certificate File ({0}) not found. '
482 'No validation will be undertaken.'.format(cert_file))
483 raise RuntimeError('Invalid Certificate file ({0}) '
484 'specified'.format(cert_file))
485 if not os.path.exists(key_file):
486 logger.error('Event Listener SSL Key File ({0}) not found. '
487 'No validation will be undertaken.'.format(key_file))
488 raise RuntimeError('Invalid Key file ({0}) '
489 'specified'.format(key_file))
491 if (platform.system() == 'Windows'):
492 date_format = '%Y-%m-%d %H:%M:%S'
494 date_format = '%Y-%m-%d %H:%M:%S.%f %z'
495 formatter = logging.Formatter('%(asctime)s %(name)s - '
496 '%(levelname)s - %(message)s',
498 handler.setFormatter(formatter)
499 logger.addHandler(handler)
500 logger.info('Started')
502 #----------------------------------------------------------------------
503 # Log the details of the configuration.
504 #----------------------------------------------------------------------
505 logger.debug('Log file = {0}'.format(log_file))
506 logger.debug('Event Listener Transport = {0}'.format(transport_prot))
507 logger.debug('Event Listener Port = {0}'.format(vel_port))
508 logger.debug('Event Listener Path = {0}'.format(vel_path))
509 logger.debug('Event Listener Topic = {0}'.format(vel_topic_name))
510 logger.debug('Event Listener Username = {0}'.format(vel_username))
511 # logger.debug('Event Listener Password = {0}'.format(vel_password))
512 logger.debug('Event Listener JSON Schema File = {0}'.format(
514 logger.debug('Base JSON Schema File = {0}'.format(base_schema_file))
515 logger.debug('Throttle JSON Schema File = {0}'.format(
516 throttle_schema_file))
517 logger.debug('Test Control JSON Schema File = {0}'.format(
518 test_control_schema_file))
520 #----------------------------------------------------------------------
521 # Perform some basic error checking on the config.
522 #----------------------------------------------------------------------
523 if (int(vel_port) < 1024 or int(vel_port) > 65535):
524 logger.error('Invalid Vendor Event Listener port ({0}) '
525 'specified'.format(vel_port))
526 raise RuntimeError('Invalid Vendor Event Listener port ({0}) '
527 'specified'.format(vel_port))
529 if (vel_path and vel_path[-1] != '/'):
530 logger.warning('Event Listener Path ({0}) should have terminating '
531 '"/"! Adding one on to configured string.'.format(
535 #----------------------------------------------------------------------
536 # Load up the vel_schema, if it exists.
537 #----------------------------------------------------------------------
538 if not os.path.exists(vel_schema_file):
539 logger.warning('Event Listener Schema File ({0}) not found. '
540 'No validation will be undertaken.'.format(
544 global throttle_schema
545 global test_control_schema
546 vel_schema = json.load(open(vel_schema_file, 'r'))
547 logger.debug('Loaded the JSON schema file')
549 #------------------------------------------------------------------
550 # Load up the throttle_schema, if it exists.
551 #------------------------------------------------------------------
552 if (os.path.exists(throttle_schema_file)):
553 logger.debug('Loading throttle schema')
554 throttle_fragment = json.load(open(throttle_schema_file, 'r'))
556 throttle_schema.update(vel_schema)
557 throttle_schema.update(throttle_fragment)
558 logger.debug('Loaded the throttle schema')
560 #------------------------------------------------------------------
561 # Load up the test control _schema, if it exists.
562 #------------------------------------------------------------------
563 if (os.path.exists(test_control_schema_file)):
564 logger.debug('Loading test control schema')
565 test_control_fragment = json.load(
566 open(test_control_schema_file, 'r'))
567 test_control_schema = {}
568 test_control_schema.update(vel_schema)
569 test_control_schema.update(test_control_fragment)
570 logger.debug('Loaded the test control schema')
572 #------------------------------------------------------------------
573 # Load up the base_schema, if it exists.
574 #------------------------------------------------------------------
575 if (os.path.exists(base_schema_file)):
576 logger.debug('Updating the schema with base definition')
577 base_schema = json.load(open(base_schema_file, 'r'))
578 vel_schema.update(base_schema)
579 logger.debug('Updated the JSON schema file')
581 #----------------------------------------------------------------------
582 # We are now ready to get started with processing. Start-up the various
583 # components of the system in order:
585 # 1) Create the dispatcher.
586 # 2) Register the functions for the URLs of interest.
587 # 3) Run the webserver.
588 #----------------------------------------------------------------------
589 root_url = '/{0}eventListener/v{1}{2}'.\
593 if vel_topic_name else '')
594 throttle_url = '/{0}eventListener/v{1}/clientThrottlingState'.\
595 format(vel_path, api_version)
596 batch_url = '/{0}eventListener/v{1}/eventBatch'.\
597 format(vel_path, api_version)
598 set_404_content(root_url)
599 dispatcher = PathDispatcher()
600 vendor_event_listener = partial(listener, schema = vel_schema)
601 dispatcher.register('GET', root_url, vendor_event_listener)
602 dispatcher.register('POST', root_url, vendor_event_listener)
603 batch_event_listener = partial(listener, schema = vel_schema)
604 dispatcher.register('GET', batch_url, batch_event_listener)
605 dispatcher.register('POST', batch_url, batch_event_listener)
606 vendor_throttle_listener = partial(listener, schema = throttle_schema)
607 dispatcher.register('GET', throttle_url, vendor_throttle_listener)
608 dispatcher.register('POST', throttle_url, vendor_throttle_listener)
610 #----------------------------------------------------------------------
611 # We also add a mechanism for test control (POST to store a commandList,
612 # GET to read back the pending one), so that we can send commands to a
612 # single attached client.
613 #----------------------------------------------------------------------
614 test_control_url = '/testControl/v{0}/commandList'.format(api_version)
615 test_control_listener = partial(test_listener,
616 schema = test_control_schema)
617 dispatcher.register('POST', test_control_url, test_control_listener)
618 dispatcher.register('GET', test_control_url, test_control_listener)
620 httpd = make_server('', int(vel_port), dispatcher)
621 if (transport_prot == 'https' ):
622 #httpd.socket = ssl.wrap_socket(httpd.socket, server_side=True, ca_certs = "../../../sslcerts/test.ca.pem", certfile="../../../sslcerts/www.testsite.com.crt", keyfile="../../../sslcerts/www.testsite.com.key", cert_reqs=ssl.CERT_REQUIRED, ssl_version=ssl.PROTOCOL_TLSv1_2)
623 logger.debug('Invoking HTTP Secure mode : ca file {0} cert file {1} key file {2} '.format(ca_file,cert_file,key_file))
624 httpd.socket = ssl.wrap_socket(httpd.socket, server_side=True, ca_certs=ca_file, certfile=cert_file, keyfile=key_file, cert_reqs=ssl.CERT_REQUIRED, ssl_version=ssl.PROTOCOL_TLSv1_2)
625 print('Serving on port {0}...'.format(vel_port))
626 httpd.serve_forever()
628 logger.error('Main loop exited unexpectedly!')
631 except KeyboardInterrupt:
632 #----------------------------------------------------------------------
633 # handle keyboard interrupt
634 #----------------------------------------------------------------------
635 logger.info('Exiting on keyboard interrupt!')
638 except Exception as e:
639 #----------------------------------------------------------------------
640 # Handle unexpected exceptions.
641 #----------------------------------------------------------------------
644 indent = len(program_name) * ' '
645 sys.stderr.write(program_name + ': ' + repr(e) + '\n')
646 sys.stderr.write(indent + ' for help use --help\n')
647 sys.stderr.write(traceback.format_exc())
648 logger.critical('Exiting because of exception: {0}'.format(e))
649 logger.critical(traceback.format_exc())
652 #------------------------------------------------------------------------------
653 # MAIN SCRIPT ENTRY POINT.
654 #------------------------------------------------------------------------------
655 if __name__ == '__main__':
657 #----------------------------------------------------------------------
658 # Running tests - note that doctest comments haven't been included so
659 # this is a hook for future improvements.
660 #----------------------------------------------------------------------
665 #----------------------------------------------------------------------
666 # Profiling performance. Performance isn't expected to be a major
667 # issue, but this should all work as expected.
668 #----------------------------------------------------------------------
671 profile_filename = 'collector_profile.txt'
672 cProfile.run('main()', profile_filename)
673 statsfile = open('collector_profile_stats.txt', 'wb')
674 p = pstats.Stats(profile_filename, stream=statsfile)
675 stats = p.strip_dirs().sort_stats('cumulative')
680 #--------------------------------------------------------------------------
681 # Normal operation - call through to the main function.
682 #--------------------------------------------------------------------------