3 Program which acts as the collector for the Vendor Event Listener REST API.
5 Only intended for test purposes.
10 Copyright(c) <2016>, AT&T Intellectual Property. All other rights reserved.
12 Redistribution and use in source and binary forms, with or without
13 modification, are permitted provided that the following conditions are met:
15 1. Redistributions of source code must retain the above copyright notice,
16 this list of conditions and the following disclaimer.
17 2. Redistributions in binary form must reproduce the above copyright notice,
18 this list of conditions and the following disclaimer in the documentation
19 and/or other materials provided with the distribution.
20 3. All advertising materials mentioning features or use of this software
21 must display the following acknowledgement: This product includes
22 software developed by the AT&T.
23 4. Neither the name of AT&T nor the names of its contributors may be used to
24 endorse or promote products derived from this software without specific
25 prior written permission.
27 THIS SOFTWARE IS PROVIDED BY AT&T INTELLECTUAL PROPERTY ''AS IS'' AND ANY
28 EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
29 WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
30 DISCLAIMED. IN NO EVENT SHALL AT&T INTELLECTUAL PROPERTY BE LIABLE FOR ANY
31 DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
32 (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
33 LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
34 ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
35 (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
36 THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
39 from rest_dispatcher import PathDispatcher, set_404_content
40 from wsgiref.simple_server import make_server
46 from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter
48 import logging.handlers
49 from base64 import b64decode
53 from functools import partial
58 <title>Hello {name}</title>
61 <h1>Hello {name}!</h1>
65 _localtime_resp = '''\
68 <year>{t.tm_year}</year>
69 <month>{t.tm_mon}</month>
70 <day>{t.tm_mday}</day>
71 <hour>{t.tm_hour}</hour>
72 <minute>{t.tm_min}</minute>
73 <second>{t.tm_sec}</second>
78 __date__ = '2015-12-04'
79 __updated__ = '2015-12-04'
85 #------------------------------------------------------------------------------
86 # Credentials we expect clients to authenticate themselves with.
87 #------------------------------------------------------------------------------
91 #------------------------------------------------------------------------------
92 # The JSON schema which we will use to validate events.
93 #------------------------------------------------------------------------------
96 #------------------------------------------------------------------------------
97 # The JSON schema which we will use to validate client throttle state.
98 #------------------------------------------------------------------------------
99 throttle_schema = None
101 #------------------------------------------------------------------------------
102 # The JSON schema which we will use to provoke throttling commands for testing.
103 #------------------------------------------------------------------------------
104 test_control_schema = None
106 #------------------------------------------------------------------------------
107 # Pending command list from the testControl API
108 # This is sent as a response commandList to the next received event.
109 #------------------------------------------------------------------------------
110 pending_command_list = None
112 #------------------------------------------------------------------------------
113 # Logger for this module.
114 #------------------------------------------------------------------------------
117 def listener(environ, start_response, schema):
119 Handler for the Vendor Event Listener REST API.
121 Extract headers and the body and check that:
123 1) The client authenticated themselves correctly.
124 2) The body validates against the provided schema for the API.
127 logger.info('Got a Vendor Event request')
128 print('==== ' + time.asctime() + ' ' + '=' * 49)
130 #--------------------------------------------------------------------------
131 # Extract the content from the request.
132 #--------------------------------------------------------------------------
133 length = int(environ.get('CONTENT_LENGTH', '0'))
134 logger.debug('Content Length: {0}'.format(length))
135 body = environ['wsgi.input'].read(length)
136 logger.debug('Content Body: {0}'.format(body))
138 mode, b64_credentials = string.split(environ.get('HTTP_AUTHORIZATION',
140 # logger.debug('Auth. Mode: {0} Credentials: {1}'.format(mode,
142 logger.debug('Auth. Mode: {0} Credentials: ****'.format(mode))
143 if (b64_credentials != 'None'):
144 credentials = b64decode(b64_credentials)
148 logger.debug('Credentials: {0}'.format(credentials))
149 #logger.debug('Credentials: ****')
151 #--------------------------------------------------------------------------
152 # If we have a schema file then check that the event matches that expected.
153 #--------------------------------------------------------------------------
154 if (schema is not None):
155 logger.debug('Attempting to validate data: {0}\n'
156 'Against schema: {1}'.format(body, schema))
158 decoded_body = json.loads(body)
159 jsonschema.validate(decoded_body, schema)
160 logger.info('Event is valid!')
161 print('Valid body decoded & checked against schema OK:\n'
162 '{0}'.format(json.dumps(decoded_body,
165 separators=(',', ': '))))
167 except jsonschema.SchemaError as e:
168 logger.error('Schema is not valid! {0}'.format(e))
169 print('Schema is not valid! {0}'.format(e))
171 except jsonschema.ValidationError as e:
172 logger.warn('Event is not valid against schema! {0}'.format(e))
173 print('Event is not valid against schema! {0}'.format(e))
174 print('Bad JSON body decoded:\n'
175 '{0}'.format(json.dumps(decoded_body,
178 separators=(',', ': '))))
180 except Exception as e:
181 logger.error('Event invalid for unexpected reason! {0}'.format(e))
182 print('Schema is not valid for unexpected reason! {0}'.format(e))
184 logger.debug('No schema so just decode JSON: {0}'.format(body))
186 decoded_body = json.loads(body)
187 print('Valid JSON body (no schema checking) decoded:\n'
188 '{0}'.format(json.dumps(decoded_body,
191 separators=(',', ': '))))
192 logger.info('Event is valid JSON but not checked against schema!')
194 except Exception as e:
195 logger.error('Event invalid for unexpected reason! {0}'.format(e))
196 print('JSON body not valid for unexpected reason! {0}'.format(e))
198 #--------------------------------------------------------------------------
199 # See whether the user authenticated themselves correctly.
200 #--------------------------------------------------------------------------
201 if (credentials == (vel_username + ':' + vel_password)):
202 logger.debug('Authenticated OK')
203 print('Authenticated OK')
205 #----------------------------------------------------------------------
206 # Respond to the caller. If we have a pending commandList from the
207 # testControl API, send it in response.
208 #----------------------------------------------------------------------
209 global pending_command_list
210 if pending_command_list is not None:
211 start_response('202 Accepted',
212 [('Content-type', 'application/json')])
213 response = pending_command_list
214 pending_command_list = None
217 print('Sending pending commandList in the response:\n'
218 '{0}'.format(json.dumps(response,
221 separators=(',', ': '))))
223 yield json.dumps(response)
225 start_response('202 Accepted', [])
228 logger.warn('Failed to authenticate OK'+vel_username + ':' + vel_password)
229 print('Failed to authenticate OK'+vel_username + ':' + vel_password)
231 #----------------------------------------------------------------------
232 # Respond to the caller.
233 #----------------------------------------------------------------------
234 start_response('401 Unauthorized', [ ('Content-type',
235 'application/json')])
236 req_error = { 'requestError': {
238 'messageId': 'POL0001',
239 'text': 'Failed to authenticate'
243 yield json.dumps(req_error)
245 def test_listener(environ, start_response, schema):
247 Handler for the Test Collector Test Control API.
249 There is no authentication on this interface.
251 This simply stores a commandList which will be sent in response to the next
252 incoming event on the EVEL interface.
254 global pending_command_list
255 logger.info('Got a Test Control input')
256 print('============================')
257 print('==== TEST CONTROL INPUT ====')
259 #--------------------------------------------------------------------------
260 # GET allows us to get the current pending request.
261 #--------------------------------------------------------------------------
262 if environ.get('REQUEST_METHOD') == 'GET':
263 start_response('200 OK', [('Content-type', 'application/json')])
264 yield json.dumps(pending_command_list)
267 #--------------------------------------------------------------------------
268 # Extract the content from the request.
269 #--------------------------------------------------------------------------
270 length = int(environ.get('CONTENT_LENGTH', '0'))
271 logger.debug('TestControl Content Length: {0}'.format(length))
272 body = environ['wsgi.input'].read(length)
273 logger.debug('TestControl Content Body: {0}'.format(body))
275 #--------------------------------------------------------------------------
276 # If we have a schema file then check that the event matches that expected.
277 #--------------------------------------------------------------------------
278 if (schema is not None):
279 logger.debug('Attempting to validate data: {0}\n'
280 'Against schema: {1}'.format(body, schema))
282 decoded_body = json.loads(body)
283 jsonschema.validate(decoded_body, schema)
284 logger.info('TestControl is valid!')
285 print('TestControl:\n'
286 '{0}'.format(json.dumps(decoded_body,
289 separators=(',', ': '))))
291 except jsonschema.SchemaError as e:
292 logger.error('TestControl Schema is not valid: {0}'.format(e))
293 print('TestControl Schema is not valid: {0}'.format(e))
295 except jsonschema.ValidationError as e:
296 logger.warn('TestControl input not valid: {0}'.format(e))
297 print('TestControl input not valid: {0}'.format(e))
298 print('Bad JSON body decoded:\n'
299 '{0}'.format(json.dumps(decoded_body,
302 separators=(',', ': '))))
304 except Exception as e:
305 logger.error('TestControl input not valid: {0}'.format(e))
306 print('TestControl Schema not valid: {0}'.format(e))
308 logger.debug('Missing schema just decode JSON: {0}'.format(body))
310 decoded_body = json.loads(body)
311 print('Valid JSON body (no schema checking) decoded:\n'
312 '{0}'.format(json.dumps(decoded_body,
315 separators=(',', ': '))))
316 logger.info('TestControl input not checked against schema!')
318 except Exception as e:
319 logger.error('TestControl input not valid: {0}'.format(e))
320 print('TestControl input not valid: {0}'.format(e))
322 #--------------------------------------------------------------------------
323 # Respond to the caller. If we received otherField 'ThrottleRequest',
324 # generate the appropriate canned response.
325 #--------------------------------------------------------------------------
326 pending_command_list = decoded_body
327 print('===== TEST CONTROL END =====')
328 print('============================')
329 start_response('202 Accepted', [])
334 Main function for the collector start-up.
336 Called with command-line arguments:
338 * --section *<section>*
343 *<file>* specifies the path to the configuration file.
345 *<section>* specifies the section within that config file.
347 *verbose* generates more information in the log files.
349 The process listens for REST API invocations and checks them. Errors are
350 displayed to stdout and logged.
356 sys.argv.extend(argv)
358 program_name = os.path.basename(sys.argv[0])
359 program_version = 'v{0}'.format(__version__)
360 program_build_date = str(__updated__)
361 program_version_message = '%%(prog)s {0} ({1})'.format(program_version,
363 if (__import__('__main__').__doc__ is not None):
364 program_shortdesc = __import__('__main__').__doc__.split('\n')[1]
366 program_shortdesc = 'Running in test harness'
367 program_license = '''{0}
370 Copyright 2015 Metaswitch Networks Ltd. All rights reserved.
372 Distributed on an "AS IS" basis without warranties
373 or conditions of any kind, either express or implied.
376 '''.format(program_shortdesc, str(__date__))
379 #----------------------------------------------------------------------
380 # Setup argument parser so we can parse the command-line.
381 #----------------------------------------------------------------------
382 parser = ArgumentParser(description=program_license,
383 formatter_class=ArgumentDefaultsHelpFormatter)
384 parser.add_argument('-v', '--verbose',
387 help='set verbosity level')
388 parser.add_argument('-V', '--version',
390 version=program_version_message,
391 help='Display version information')
392 parser.add_argument('-a', '--api-version',
395 help='set API version')
396 parser.add_argument('-c', '--config',
398 default='/etc/opt/att/collector.conf',
399 help='Use this config file.',
401 parser.add_argument('-s', '--section',
405 help='section to use in the config file')
407 #----------------------------------------------------------------------
408 # Process arguments received.
409 #----------------------------------------------------------------------
410 args = parser.parse_args()
411 verbose = args.verbose
412 api_version = args.api_version
413 config_file = args.config
414 config_section = args.section
416 #----------------------------------------------------------------------
417 # Now read the config file, using command-line supplied values as
419 #----------------------------------------------------------------------
420 defaults = {'log_file': 'collector.log',
426 config = ConfigParser.SafeConfigParser(defaults)
427 config.read(config_file)
429 #----------------------------------------------------------------------
430 # extract the values we want.
431 #----------------------------------------------------------------------
432 log_file = config.get(config_section, 'log_file', vars=overrides)
433 vel_port = config.get(config_section, 'vel_port', vars=overrides)
434 vel_path = config.get(config_section, 'vel_path', vars=overrides)
435 vel_topic_name = config.get(config_section,
440 vel_username = config.get(config_section,
443 vel_password = config.get(config_section,
446 vel_schema_file = config.get(config_section,
449 base_schema_file = config.get(config_section,
452 throttle_schema_file = config.get(config_section,
453 'throttle_schema_file',
455 test_control_schema_file = config.get(config_section,
456 'test_control_schema_file',
459 #----------------------------------------------------------------------
460 # Finally we have enough info to start a proper flow trace.
461 #----------------------------------------------------------------------
463 print('Logfile: {0}'.format(log_file))
464 logger = logging.getLogger('collector')
466 print('Verbose mode on')
467 logger.setLevel(logging.DEBUG)
469 logger.setLevel(logging.INFO)
470 handler = logging.handlers.RotatingFileHandler(log_file,
473 if (platform.system() == 'Windows'):
474 date_format = '%Y-%m-%d %H:%M:%S'
476 date_format = '%Y-%m-%d %H:%M:%S.%f %z'
477 formatter = logging.Formatter('%(asctime)s %(name)s - '
478 '%(levelname)s - %(message)s',
480 handler.setFormatter(formatter)
481 logger.addHandler(handler)
482 logger.info('Started')
484 #----------------------------------------------------------------------
485 # Log the details of the configuration.
486 #----------------------------------------------------------------------
487 logger.debug('Log file = {0}'.format(log_file))
488 logger.debug('Event Listener Port = {0}'.format(vel_port))
489 logger.debug('Event Listener Path = {0}'.format(vel_path))
490 logger.debug('Event Listener Topic = {0}'.format(vel_topic_name))
491 logger.debug('Event Listener Username = {0}'.format(vel_username))
492 # logger.debug('Event Listener Password = {0}'.format(vel_password))
493 logger.debug('Event Listener JSON Schema File = {0}'.format(
495 logger.debug('Base JSON Schema File = {0}'.format(base_schema_file))
496 logger.debug('Throttle JSON Schema File = {0}'.format(
497 throttle_schema_file))
498 logger.debug('Test Control JSON Schema File = {0}'.format(
499 test_control_schema_file))
501 #----------------------------------------------------------------------
502 # Perform some basic error checking on the config.
503 #----------------------------------------------------------------------
504 if (int(vel_port) < 1024 or int(vel_port) > 65535):
505 logger.error('Invalid Vendor Event Listener port ({0}) '
506 'specified'.format(vel_port))
507 raise RuntimeError('Invalid Vendor Event Listener port ({0}) '
508 'specified'.format(vel_port))
510 if (len(vel_path) > 0 and vel_path[-1] != '/'):
511 logger.warning('Event Listener Path ({0}) should have terminating '
512 '"/"! Adding one on to configured string.'.format(
516 #----------------------------------------------------------------------
517 # Load up the vel_schema, if it exists.
518 #----------------------------------------------------------------------
519 if not os.path.exists(vel_schema_file):
520 logger.warning('Event Listener Schema File ({0}) not found. '
521 'No validation will be undertaken.'.format(
525 global throttle_schema
526 global test_control_schema
527 vel_schema = json.load(open(vel_schema_file, 'r'))
528 logger.debug('Loaded the JSON schema file')
530 #------------------------------------------------------------------
531 # Load up the throttle_schema, if it exists.
532 #------------------------------------------------------------------
533 if (os.path.exists(throttle_schema_file)):
534 logger.debug('Loading throttle schema')
535 throttle_fragment = json.load(open(throttle_schema_file, 'r'))
537 throttle_schema.update(vel_schema)
538 throttle_schema.update(throttle_fragment)
539 logger.debug('Loaded the throttle schema')
541 #------------------------------------------------------------------
542 # Load up the test control _schema, if it exists.
543 #------------------------------------------------------------------
544 if (os.path.exists(test_control_schema_file)):
545 logger.debug('Loading test control schema')
546 test_control_fragment = json.load(
547 open(test_control_schema_file, 'r'))
548 test_control_schema = {}
549 test_control_schema.update(vel_schema)
550 test_control_schema.update(test_control_fragment)
551 logger.debug('Loaded the test control schema')
553 #------------------------------------------------------------------
554 # Load up the base_schema, if it exists.
555 #------------------------------------------------------------------
556 if (os.path.exists(base_schema_file)):
557 logger.debug('Updating the schema with base definition')
558 base_schema = json.load(open(base_schema_file, 'r'))
559 vel_schema.update(base_schema)
560 logger.debug('Updated the JSON schema file')
562 #----------------------------------------------------------------------
563 # We are now ready to get started with processing. Start-up the various
564 # components of the system in order:
566 # 1) Create the dispatcher.
567 # 2) Register the functions for the URLs of interest.
568 # 3) Run the webserver.
569 #----------------------------------------------------------------------
570 root_url = '/{0}eventListener/v{1}{2}'.\
574 if len(vel_topic_name) > 0
576 throttle_url = '/{0}eventListener/v{1}/clientThrottlingState'.\
577 format(vel_path, api_version)
578 set_404_content(root_url)
579 dispatcher = PathDispatcher()
580 vendor_event_listener = partial(listener, schema = vel_schema)
581 dispatcher.register('GET', root_url, vendor_event_listener)
582 dispatcher.register('POST', root_url, vendor_event_listener)
583 vendor_throttle_listener = partial(listener, schema = throttle_schema)
584 dispatcher.register('GET', throttle_url, vendor_throttle_listener)
585 dispatcher.register('POST', throttle_url, vendor_throttle_listener)
587 #----------------------------------------------------------------------
588 # We also add a POST-only mechanism for test control, so that we can
589 # send commands to a single attached client.
590 #----------------------------------------------------------------------
591 test_control_url = '/testControl/v{0}/commandList'.format(api_version)
592 test_control_listener = partial(test_listener,
593 schema = test_control_schema)
594 dispatcher.register('POST', test_control_url, test_control_listener)
595 dispatcher.register('GET', test_control_url, test_control_listener)
597 httpd = make_server('', int(vel_port), dispatcher)
598 print('Serving on port {0}...'.format(vel_port))
599 httpd.serve_forever()
601 logger.error('Main loop exited unexpectedly!')
604 except KeyboardInterrupt:
605 #----------------------------------------------------------------------
606 # handle keyboard interrupt
607 #----------------------------------------------------------------------
608 logger.info('Exiting on keyboard interrupt!')
611 except Exception as e:
612 #----------------------------------------------------------------------
613 # Handle unexpected exceptions.
614 #----------------------------------------------------------------------
617 indent = len(program_name) * ' '
618 sys.stderr.write(program_name + ': ' + repr(e) + '\n')
619 sys.stderr.write(indent + ' for help use --help\n')
620 sys.stderr.write(traceback.format_exc())
621 logger.critical('Exiting because of exception: {0}'.format(e))
622 logger.critical(traceback.format_exc())
625 #------------------------------------------------------------------------------
626 # MAIN SCRIPT ENTRY POINT.
627 #------------------------------------------------------------------------------
628 if __name__ == '__main__':
630 #----------------------------------------------------------------------
631 # Running tests - note that doctest comments haven't been included so
632 # this is a hook for future improvements.
633 #----------------------------------------------------------------------
638 #----------------------------------------------------------------------
639 # Profiling performance. Performance isn't expected to be a major
640 # issue, but this should all work as expected.
641 #----------------------------------------------------------------------
644 profile_filename = 'collector_profile.txt'
645 cProfile.run('main()', profile_filename)
646 statsfile = open('collector_profile_stats.txt', 'wb')
647 p = pstats.Stats(profile_filename, stream=statsfile)
648 stats = p.strip_dirs().sort_stats('cumulative')
653 #--------------------------------------------------------------------------
654 # Normal operation - call through to the main function.
655 #--------------------------------------------------------------------------