3 Program which acts as the collector for the Vendor Event Listener REST API.
5 Only intended for test purposes.
10 Copyright(c) <2016>, AT&T Intellectual Property. All other rights reserved.
12 Redistribution and use in source and binary forms, with or without
13 modification, are permitted provided that the following conditions are met:
15 1. Redistributions of source code must retain the above copyright notice,
16 this list of conditions and the following disclaimer.
17 2. Redistributions in binary form must reproduce the above copyright notice,
18 this list of conditions and the following disclaimer in the documentation
19 and/or other materials provided with the distribution.
20 3. All advertising materials mentioning features or use of this software
21 must display the following acknowledgement: This product includes
22 software developed by the AT&T.
23 4. Neither the name of AT&T nor the names of its contributors may be used to
24 endorse or promote products derived from this software without specific
25 prior written permission.
27 THIS SOFTWARE IS PROVIDED BY AT&T INTELLECTUAL PROPERTY ''AS IS'' AND ANY
28 EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
29 WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
30 DISCLAIMED. IN NO EVENT SHALL AT&T INTELLECTUAL PROPERTY BE LIABLE FOR ANY
31 DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
32 (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
33 LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
34 ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
35 (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
36 THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
39 from rest_dispatcher import PathDispatcher, set_404_content
40 from wsgiref.simple_server import make_server
46 from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter
48 import logging.handlers
49 from base64 import b64decode
53 from jsonschema import Draft4Validator
54 from functools import partial
59 <title>Hello {name}</title>
62 <h1>Hello {name}!</h1>
66 _localtime_resp = '''\
69 <year>{t.tm_year}</year>
70 <month>{t.tm_mon}</month>
71 <day>{t.tm_mday}</day>
72 <hour>{t.tm_hour}</hour>
73 <minute>{t.tm_min}</minute>
74 <second>{t.tm_sec}</second>
79 __date__ = '2015-12-04'
80 __updated__ = '2015-12-04'
86 #------------------------------------------------------------------------------
87 # Credentials we expect clients to authenticate themselves with.
88 #------------------------------------------------------------------------------
92 #------------------------------------------------------------------------------
93 # The JSON schema which we will use to validate events.
94 #------------------------------------------------------------------------------
97 #------------------------------------------------------------------------------
98 # The JSON schema which we will use to validate client throttle state.
99 #------------------------------------------------------------------------------
100 throttle_schema = None
102 #------------------------------------------------------------------------------
103 # The JSON schema which we will use to provoke throttling commands for testing.
104 #------------------------------------------------------------------------------
105 test_control_schema = None
107 #------------------------------------------------------------------------------
108 # Pending command list from the testControl API
109 # This is sent as a response commandList to the next received event.
110 #------------------------------------------------------------------------------
111 pending_command_list = None
113 #------------------------------------------------------------------------------
114 # Logger for this module.
115 #------------------------------------------------------------------------------
def listener(environ, start_response, schema):
    '''
    Handler for the Vendor Event Listener REST API.

    Extract headers and the body and check that:

      1)  The client authenticated themselves correctly.
      2)  The body validates against the provided schema for the API.

    Problems with the body are only logged and printed; the HTTP status
    returned depends solely on whether authentication succeeded.

    :param environ: WSGI environment.  ``CONTENT_LENGTH``, ``wsgi.input``
                    and ``HTTP_AUTHORIZATION`` are read from it.
    :param start_response: WSGI ``start_response`` callable.
    :param schema: JSON schema to validate the body against, or ``None`` to
                   check only that the body is well-formed JSON.
    :returns: generator yielding the response body: the pending commandList
              as JSON (202), an empty body (202) or a ``requestError`` JSON
              document (401).
    '''
    global pending_command_list

    logger.info('Got a Vendor Event request')
    print('==== ' + time.asctime() + ' ' + '=' * 49)

    #--------------------------------------------------------------------------
    # Extract the content from the request.  CONTENT_LENGTH may be absent,
    # empty or garbage; none of those should take the handler down.
    #--------------------------------------------------------------------------
    try:
        length = int(environ.get('CONTENT_LENGTH') or 0)
    except ValueError:
        logger.warning('Ignoring malformed Content-Length header')
        length = 0
    if length < 0:
        # A negative size would mean "read until EOF" to the input stream.
        length = 0
    logger.debug('Content Length: {0}'.format(length))
    body = environ['wsgi.input'].read(length)
    logger.debug('Content Body: {0}'.format(body))

    #--------------------------------------------------------------------------
    # Extract the credentials.  We expect "<mode> <base64 credentials>"; any
    # other shape is treated as "no credentials" so that the caller gets a
    # 401 rather than an unhandled exception.  Credentials are deliberately
    # never written to the log.
    #--------------------------------------------------------------------------
    auth_fields = environ.get('HTTP_AUTHORIZATION', 'None None').split()
    if len(auth_fields) == 2:
        mode, b64_credentials = auth_fields
    else:
        logger.warning('Malformed Authorization header')
        mode, b64_credentials = 'None', 'None'
    logger.debug('Auth. Mode: {0} Credentials: ****'.format(mode))

    credentials = None
    if (b64_credentials != 'None'):
        try:
            credentials = b64decode(b64_credentials)
            if not isinstance(credentials, str):
                # b64decode returns bytes on Python 3; compare as text.
                credentials = credentials.decode('utf-8')
        except (TypeError, ValueError) as e:
            # TypeError: bad base64 on Python 2.  ValueError covers
            # binascii.Error and UnicodeDecodeError on Python 3.
            logger.warning('Could not decode credentials: {0}'.format(e))
            credentials = None
    logger.debug('Credentials: ****')

    #--------------------------------------------------------------------------
    # If we have a schema file then check that the event matches that expected.
    #--------------------------------------------------------------------------
    if (schema is not None):
        logger.debug('Attempting to validate data: {0}\n'
                     'Against schema: {1}'.format(body, schema))
        try:
            decoded_body = json.loads(body)
            jsonschema.validate(decoded_body, schema)
            logger.info('Event is valid!')
            print('Valid body decoded & checked against schema OK:\n'
                  '{0}'.format(json.dumps(decoded_body,
                                          sort_keys=True,
                                          indent=4,
                                          separators=(',', ': '))))

        except jsonschema.SchemaError as e:
            logger.error('Schema is not valid! {0}'.format(e))
            print('Schema is not valid! {0}'.format(e))

        except jsonschema.ValidationError as e:
            # decoded_body is bound here: validate() only runs after a
            # successful json.loads().
            logger.warning('Event is not valid against schema! {0}'.format(e))
            print('Event is not valid against schema! {0}'.format(e))
            print('Bad JSON body decoded:\n'
                  '{0}'.format(json.dumps(decoded_body,
                                          sort_keys=True,
                                          indent=4,
                                          separators=(',', ': '))))

        except Exception as e:
            logger.error('Event invalid for unexpected reason! {0}'.format(e))
            print('Schema is not valid for unexpected reason! {0}'.format(e))
    else:
        logger.debug('No schema so just decode JSON: {0}'.format(body))
        try:
            decoded_body = json.loads(body)
            print('Valid JSON body (no schema checking) decoded:\n'
                  '{0}'.format(json.dumps(decoded_body,
                                          sort_keys=True,
                                          indent=4,
                                          separators=(',', ': '))))
            logger.info('Event is valid JSON but not checked against schema!')

        except Exception as e:
            logger.error('Event invalid for unexpected reason! {0}'.format(e))
            print('JSON body not valid for unexpected reason! {0}'.format(e))

    #--------------------------------------------------------------------------
    # See whether the user authenticated themselves correctly.
    #--------------------------------------------------------------------------
    if (credentials == (vel_username + ':' + vel_password)):
        logger.debug('Authenticated OK')
        print('Authenticated OK')

        #----------------------------------------------------------------------
        # Respond to the caller. If we have a pending commandList from the
        # testControl API, send it in response (once only).
        #----------------------------------------------------------------------
        if pending_command_list is not None:
            start_response('202 Accepted',
                           [('Content-type', 'application/json')])
            response = pending_command_list
            pending_command_list = None

            print('Sending pending commandList in the response:\n'
                  '{0}'.format(json.dumps(response,
                                          sort_keys=True,
                                          indent=4,
                                          separators=(',', ': '))))

            # WSGI bodies are byte strings; json.dumps output is ASCII.
            yield json.dumps(response).encode('utf-8')
        else:
            start_response('202 Accepted', [])
            yield b''
    else:
        logger.warning('Failed to authenticate OK')
        print('Failed to authenticate OK')

        #----------------------------------------------------------------------
        # Respond to the caller.
        #----------------------------------------------------------------------
        start_response('401 Unauthorized', [('Content-type',
                                             'application/json')])
        req_error = {'requestError': {
                        'policyException': {
                            'messageId': 'POL0001',
                            'text': 'Failed to authenticate'
                            }
                        }
                    }
        yield json.dumps(req_error).encode('utf-8')
def test_listener(environ, start_response, schema):
    '''
    Handler for the Test Collector Test Control API.

    There is no authentication on this interface.

    This simply stores a commandList which will be sent in response to the next
    incoming event on the EVEL interface.

    A GET returns the currently pending commandList (JSON ``null`` if there
    is none).  Any other method stores the decoded request body as the new
    pending commandList.  A body which decodes as JSON but fails schema
    validation is reported and still stored; a body which cannot be decoded
    at all is rejected with 400 and the pending commandList is left alone.

    :param environ: WSGI environment.  ``REQUEST_METHOD``, ``CONTENT_LENGTH``
                    and ``wsgi.input`` are read from it.
    :param start_response: WSGI ``start_response`` callable.
    :param schema: JSON schema to validate the body against, or ``None`` to
                   check only that the body is well-formed JSON.
    :returns: generator yielding the response body.
    '''
    global pending_command_list
    logger.info('Got a Test Control input')
    print('============================')
    print('==== TEST CONTROL INPUT ====')

    #--------------------------------------------------------------------------
    # GET allows us to get the current pending request.
    #--------------------------------------------------------------------------
    if environ.get('REQUEST_METHOD') == 'GET':
        start_response('200 OK', [('Content-type', 'application/json')])
        # WSGI bodies are byte strings; json.dumps output is ASCII.
        yield json.dumps(pending_command_list).encode('utf-8')
        return

    #--------------------------------------------------------------------------
    # Extract the content from the request.  CONTENT_LENGTH may be absent,
    # empty or garbage; none of those should take the handler down.
    #--------------------------------------------------------------------------
    try:
        length = int(environ.get('CONTENT_LENGTH') or 0)
    except ValueError:
        logger.warning('Ignoring malformed Content-Length header')
        length = 0
    if length < 0:
        # A negative size would mean "read until EOF" to the input stream.
        length = 0
    logger.debug('TestControl Content Length: {0}'.format(length))
    body = environ['wsgi.input'].read(length)
    logger.debug('TestControl Content Body: {0}'.format(body))

    #--------------------------------------------------------------------------
    # If we have a schema file then check that the event matches that expected.
    # body_decoded records whether json.loads() succeeded; None cannot be
    # used as the marker because a JSON "null" body legitimately decodes to it.
    #--------------------------------------------------------------------------
    body_decoded = False
    decoded_body = None
    if (schema is not None):
        logger.debug('Attempting to validate data: {0}\n'
                     'Against schema: {1}'.format(body, schema))
        try:
            decoded_body = json.loads(body)
            body_decoded = True
            jsonschema.validate(decoded_body, schema)
            logger.info('TestControl is valid!')
            print('TestControl:\n'
                  '{0}'.format(json.dumps(decoded_body,
                                          sort_keys=True,
                                          indent=4,
                                          separators=(',', ': '))))

        except jsonschema.SchemaError as e:
            logger.error('TestControl Schema is not valid: {0}'.format(e))
            print('TestControl Schema is not valid: {0}'.format(e))

        except jsonschema.ValidationError as e:
            logger.warning('TestControl input not valid: {0}'.format(e))
            print('TestControl input not valid: {0}'.format(e))
            print('Bad JSON body decoded:\n'
                  '{0}'.format(json.dumps(decoded_body,
                                          sort_keys=True,
                                          indent=4,
                                          separators=(',', ': '))))

        except Exception as e:
            logger.error('TestControl input not valid: {0}'.format(e))
            print('TestControl Schema not valid: {0}'.format(e))
    else:
        logger.debug('Missing schema just decode JSON: {0}'.format(body))
        try:
            decoded_body = json.loads(body)
            body_decoded = True
            print('Valid JSON body (no schema checking) decoded:\n'
                  '{0}'.format(json.dumps(decoded_body,
                                          sort_keys=True,
                                          indent=4,
                                          separators=(',', ': '))))
            logger.info('TestControl input not checked against schema!')

        except Exception as e:
            logger.error('TestControl input not valid: {0}'.format(e))
            print('TestControl input not valid: {0}'.format(e))

    print('===== TEST CONTROL END =====')
    print('============================')

    #--------------------------------------------------------------------------
    # Respond to the caller.  Without a decoded body there is nothing to
    # store, so reject the request and keep whatever was pending before.
    #--------------------------------------------------------------------------
    if not body_decoded:
        start_response('400 Bad Request', [])
        yield b''
        return

    #--------------------------------------------------------------------------
    # Remember the commandList; the event listener hands it out in response
    # to the next authenticated event.
    #--------------------------------------------------------------------------
    pending_command_list = decoded_body
    start_response('202 Accepted', [])
    yield b''
335 Main function for the collector start-up.
337 Called with command-line arguments:
339 * --section *<section>*
344 *<file>* specifies the path to the configuration file.
346 *<section>* specifies the section within that config file.
348 *verbose* generates more information in the log files.
350 The process listens for REST API invocations and checks them. Errors are
351 displayed to stdout and logged.
357 sys.argv.extend(argv)
359 program_name = os.path.basename(sys.argv[0])
360 program_version = 'v{0}'.format(__version__)
361 program_build_date = str(__updated__)
362 program_version_message = '%%(prog)s {0} ({1})'.format(program_version,
364 if (__import__('__main__').__doc__ is not None):
365 program_shortdesc = __import__('__main__').__doc__.split('\n')[1]
367 program_shortdesc = 'Running in test harness'
368 program_license = '''{0}
371 Copyright 2015 Metaswitch Networks Ltd. All rights reserved.
373 Distributed on an "AS IS" basis without warranties
374 or conditions of any kind, either express or implied.
377 '''.format(program_shortdesc, str(__date__))
380 #----------------------------------------------------------------------
381 # Setup argument parser so we can parse the command-line.
382 #----------------------------------------------------------------------
383 parser = ArgumentParser(description=program_license,
384 formatter_class=ArgumentDefaultsHelpFormatter)
385 parser.add_argument('-v', '--verbose',
388 help='set verbosity level')
389 parser.add_argument('-V', '--version',
391 version=program_version_message,
392 help='Display version information')
393 parser.add_argument('-a', '--api-version',
396 help='set API version')
397 parser.add_argument('-c', '--config',
399 default='/etc/opt/att/collector.conf',
400 help='Use this config file.',
402 parser.add_argument('-s', '--section',
406 help='section to use in the config file')
408 #----------------------------------------------------------------------
409 # Process arguments received.
410 #----------------------------------------------------------------------
411 args = parser.parse_args()
412 verbose = args.verbose
413 api_version = args.api_version
414 config_file = args.config
415 config_section = args.section
417 #----------------------------------------------------------------------
418 # Now read the config file, using command-line supplied values as overrides.
420 #----------------------------------------------------------------------
421 defaults = {'log_file': 'collector.log',
427 config = ConfigParser.SafeConfigParser(defaults)
428 config.read(config_file)
430 #----------------------------------------------------------------------
431 # extract the values we want.
432 #----------------------------------------------------------------------
433 log_file = config.get(config_section, 'log_file', vars=overrides)
434 vel_port = config.get(config_section, 'vel_port', vars=overrides)
435 vel_path = config.get(config_section, 'vel_path', vars=overrides)
436 vel_topic_name = config.get(config_section,
441 vel_username = config.get(config_section,
444 vel_password = config.get(config_section,
447 vel_schema_file = config.get(config_section,
450 base_schema_file = config.get(config_section,
453 throttle_schema_file = config.get(config_section,
454 'throttle_schema_file',
456 test_control_schema_file = config.get(config_section,
457 'test_control_schema_file',
460 #----------------------------------------------------------------------
461 # Finally we have enough info to start a proper flow trace.
462 #----------------------------------------------------------------------
464 print('Logfile: {0}'.format(log_file))
465 logger = logging.getLogger('collector')
467 print('Verbose mode on')
468 logger.setLevel(logging.DEBUG)
470 logger.setLevel(logging.INFO)
471 handler = logging.handlers.RotatingFileHandler(log_file,
474 if (platform.system() == 'Windows'):
475 date_format = '%Y-%m-%d %H:%M:%S'
477 date_format = '%Y-%m-%d %H:%M:%S.%f %z'
478 formatter = logging.Formatter('%(asctime)s %(name)s - '
479 '%(levelname)s - %(message)s',
481 handler.setFormatter(formatter)
482 logger.addHandler(handler)
483 logger.info('Started')
485 #----------------------------------------------------------------------
486 # Log the details of the configuration.
487 #----------------------------------------------------------------------
488 logger.debug('Log file = {0}'.format(log_file))
489 logger.debug('Event Listener Port = {0}'.format(vel_port))
490 logger.debug('Event Listener Path = {0}'.format(vel_path))
491 logger.debug('Event Listener Topic = {0}'.format(vel_topic_name))
492 logger.debug('Event Listener Username = {0}'.format(vel_username))
493 # logger.debug('Event Listener Password = {0}'.format(vel_password))
494 logger.debug('Event Listener JSON Schema File = {0}'.format(
496 logger.debug('Base JSON Schema File = {0}'.format(base_schema_file))
497 logger.debug('Throttle JSON Schema File = {0}'.format(
498 throttle_schema_file))
499 logger.debug('Test Control JSON Schema File = {0}'.format(
500 test_control_schema_file))
502 #----------------------------------------------------------------------
503 # Perform some basic error checking on the config.
504 #----------------------------------------------------------------------
505 if (int(vel_port) < 1024 or int(vel_port) > 65535):
506 logger.error('Invalid Vendor Event Listener port ({0}) '
507 'specified'.format(vel_port))
508 raise RuntimeError('Invalid Vendor Event Listener port ({0}) '
509 'specified'.format(vel_port))
511 if (len(vel_path) > 0 and vel_path[-1] != '/'):
512 logger.warning('Event Listener Path ({0}) should have terminating '
513 '"/"! Adding one on to configured string.'.format(
517 #----------------------------------------------------------------------
518 # Load up the vel_schema, if it exists.
519 #----------------------------------------------------------------------
520 if not os.path.exists(vel_schema_file):
521 logger.warning('Event Listener Schema File ({0}) not found. '
522 'No validation will be undertaken.'.format(
526 global throttle_schema
527 global test_control_schema
528 vel_schema = json.load(open(vel_schema_file, 'r'))
529 Draft4Validator.check_schema(vel_schema)
530 logger.debug('Loaded the JSON schema file')
532 #------------------------------------------------------------------
533 # Load up the throttle_schema, if it exists.
534 #------------------------------------------------------------------
535 if (os.path.exists(throttle_schema_file)):
536 logger.debug('Loading throttle schema')
537 throttle_fragment = json.load(open(throttle_schema_file, 'r'))
539 throttle_schema.update(vel_schema)
540 throttle_schema.update(throttle_fragment)
541 logger.debug('Loaded the throttle schema')
543 #------------------------------------------------------------------
544 # Load up the test control schema, if it exists.
545 #------------------------------------------------------------------
546 if (os.path.exists(test_control_schema_file)):
547 logger.debug('Loading test control schema')
548 test_control_fragment = json.load(
549 open(test_control_schema_file, 'r'))
550 test_control_schema = {}
551 test_control_schema.update(vel_schema)
552 test_control_schema.update(test_control_fragment)
553 logger.debug('Loaded the test control schema')
555 #------------------------------------------------------------------
556 # Load up the base_schema, if it exists.
557 #------------------------------------------------------------------
558 if (os.path.exists(base_schema_file)):
559 logger.debug('Updating the schema with base definition')
560 base_schema = json.load(open(base_schema_file, 'r'))
561 vel_schema.update(base_schema)
562 logger.debug('Updated the JSON schema file')
564 #----------------------------------------------------------------------
565 # We are now ready to get started with processing. Start-up the various
566 # components of the system in order:
568 # 1) Create the dispatcher.
569 # 2) Register the functions for the URLs of interest.
570 # 3) Run the webserver.
571 #----------------------------------------------------------------------
572 root_url = '/{0}eventListener/v{1}{2}'.\
576 if len(vel_topic_name) > 0
578 throttle_url = '/{0}eventListener/v{1}/clientThrottlingState'.\
579 format(vel_path, api_version)
580 set_404_content(root_url)
581 dispatcher = PathDispatcher()
582 vendor_event_listener = partial(listener, schema = vel_schema)
583 dispatcher.register('GET', root_url, vendor_event_listener)
584 dispatcher.register('POST', root_url, vendor_event_listener)
585 vendor_throttle_listener = partial(listener, schema = throttle_schema)
586 dispatcher.register('GET', throttle_url, vendor_throttle_listener)
587 dispatcher.register('POST', throttle_url, vendor_throttle_listener)
589 #----------------------------------------------------------------------
590 # We also add a test control mechanism (POST sets the commandList, GET
591 # reads it back), so that we can send commands to a single attached client.
592 #----------------------------------------------------------------------
593 test_control_url = '/testControl/v{0}/commandList'.format(api_version)
594 test_control_listener = partial(test_listener,
595 schema = test_control_schema)
596 dispatcher.register('POST', test_control_url, test_control_listener)
597 dispatcher.register('GET', test_control_url, test_control_listener)
599 httpd = make_server('', int(vel_port), dispatcher)
600 print('Serving on port {0}...'.format(vel_port))
601 httpd.serve_forever()
603 logger.error('Main loop exited unexpectedly!')
606 except KeyboardInterrupt:
607 #----------------------------------------------------------------------
608 # handle keyboard interrupt
609 #----------------------------------------------------------------------
610 logger.info('Exiting on keyboard interrupt!')
613 except Exception as e:
614 #----------------------------------------------------------------------
615 # Handle unexpected exceptions.
616 #----------------------------------------------------------------------
619 indent = len(program_name) * ' '
620 sys.stderr.write(program_name + ': ' + repr(e) + '\n')
621 sys.stderr.write(indent + ' for help use --help\n')
622 sys.stderr.write(traceback.format_exc())
623 logger.critical('Exiting because of exception: {0}'.format(e))
624 logger.critical(traceback.format_exc())
627 #------------------------------------------------------------------------------
628 # MAIN SCRIPT ENTRY POINT.
629 #------------------------------------------------------------------------------
630 if __name__ == '__main__':
632 #----------------------------------------------------------------------
633 # Running tests - note that doctest comments haven't been included so
634 # this is a hook for future improvements.
635 #----------------------------------------------------------------------
640 #----------------------------------------------------------------------
641 # Profiling performance. Performance isn't expected to be a major
642 # issue, but this should all work as expected.
643 #----------------------------------------------------------------------
646 profile_filename = 'collector_profile.txt'
647 cProfile.run('main()', profile_filename)
648 statsfile = open('collector_profile_stats.txt', 'wb')
649 p = pstats.Stats(profile_filename, stream=statsfile)
650 stats = p.strip_dirs().sort_stats('cumulative')
655 #--------------------------------------------------------------------------
656 # Normal operation - call through to the main function.
657 #--------------------------------------------------------------------------