3 Program which acts as the collector for the Vendor Event Listener REST API.
5 Only intended for test purposes.
10 ===================================================================
11 Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
12 ===================================================================
13 Licensed under the Apache License, Version 2.0 (the "License");
14 you may not use this file except in compliance with the License.
15 You may obtain a copy of the License at
17 http://www.apache.org/licenses/LICENSE-2.0
19 Unless required by applicable law or agreed to in writing, software
20 distributed under the License is distributed on an "AS IS" BASIS,
21 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
22 See the License for the specific language governing permissions and
23 limitations under the License.
26 from rest_dispatcher import PathDispatcher, set_404_content
27 from wsgiref.simple_server import make_server
33 from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter
35 import logging.handlers
36 from base64 import b64decode
40 from functools import partial
45 <title>Hello {name}</title>
48 <h1>Hello {name}!</h1>
52 _localtime_resp = '''\
55 <year>{t.tm_year}</year>
56 <month>{t.tm_mon}</month>
57 <day>{t.tm_mday}</day>
58 <hour>{t.tm_hour}</hour>
59 <minute>{t.tm_min}</minute>
60 <second>{t.tm_sec}</second>
65 __date__ = '2015-12-04'
66 __updated__ = '2015-12-04'
72 #------------------------------------------------------------------------------
73 # Credentials we expect clients to authenticate themselves with.
74 #------------------------------------------------------------------------------
78 #------------------------------------------------------------------------------
79 # The JSON schema which we will use to validate events.
80 #------------------------------------------------------------------------------
83 #------------------------------------------------------------------------------
84 # The JSON schema which we will use to validate client throttle state.
85 #------------------------------------------------------------------------------
86 throttle_schema = None
88 #------------------------------------------------------------------------------
89 # The JSON schema which we will use to provoke throttling commands for testing.
90 #------------------------------------------------------------------------------
91 test_control_schema = None
93 #------------------------------------------------------------------------------
94 # Pending command list from the testControl API
95 # This is sent as a response commandList to the next received event.
96 #------------------------------------------------------------------------------
97 pending_command_list = None
99 #------------------------------------------------------------------------------
100 # Logger for this module.
101 #------------------------------------------------------------------------------
def listener(environ, start_response, schema):
    '''
    Handler for the Vendor Event Listener REST API.

    Extract headers and the body and check that:

      1)  The client authenticated themselves correctly.
      2)  The body validates against the provided schema for the API.

    This is a WSGI application written as a generator.  The result of the
    body check is only reported (stdout and log); it does not influence the
    HTTP status.  The status is decided by authentication alone:

      *  ``202 Accepted`` when the Basic credentials equal
         ``vel_username + ':' + vel_password``.  If a commandList is pending
         from the testControl API it is sent as the JSON body and cleared.
      *  ``401 Unauthorized`` with a JSON ``requestError`` body otherwise,
         including when the Authorization header is missing, malformed or
         not valid base64.

    :param environ: WSGI environment of the request.
    :param start_response: WSGI ``start_response`` callable.
    :param schema: JSON schema to validate the body against, or ``None`` to
                   only check that the body is well-formed JSON.
    '''
    global pending_command_list

    def pretty(obj):
        # Human-readable rendering used for every console dump below.
        return json.dumps(obj,
                          sort_keys=True,
                          indent=4,
                          separators=(',', ': '))

    logger.info('Got a Vendor Event request')
    print('==== ' + time.asctime() + ' ' + '=' * 49)

    #--------------------------------------------------------------------------
    # Extract the content from the request.  CONTENT_LENGTH may be absent or
    # present-but-empty, both of which mean "no body".
    #--------------------------------------------------------------------------
    length = int(environ.get('CONTENT_LENGTH') or 0)
    logger.debug('Content Length: {0}'.format(length))
    body = environ['wsgi.input'].read(length)
    logger.debug('Content Body: {0}'.format(body))

    #--------------------------------------------------------------------------
    # Extract the credentials.  Anything other than exactly "<mode> <token>"
    # is treated the same as a missing header, so that a malformed header
    # leads to a 401 rather than an unhandled exception.
    #--------------------------------------------------------------------------
    auth_fields = environ.get('HTTP_AUTHORIZATION', '').split()
    if len(auth_fields) == 2:
        mode, b64_credentials = auth_fields
    else:
        mode, b64_credentials = 'None', 'None'
    logger.debug('Auth. Mode: {0} Credentials: ****'.format(mode))

    credentials = None
    if b64_credentials != 'None':
        try:
            credentials = b64decode(b64_credentials)
        except (TypeError, ValueError) as e:
            # Bad padding / alphabet: leave credentials as None => 401.
            logger.warning('Credentials are not valid base64: {0}'.format(e))

    # Secrets are never written to the log.
    logger.debug('Credentials: ****')

    #--------------------------------------------------------------------------
    # If we have a schema file then check that the event matches that expected.
    #--------------------------------------------------------------------------
    if schema is not None:
        logger.debug('Attempting to validate data: {0}\n'
                     'Against schema: {1}'.format(body, schema))
        try:
            decoded_body = json.loads(body)
            jsonschema.validate(decoded_body, schema)
            logger.info('Event is valid!')
            print('Valid body decoded & checked against schema OK:\n'
                  '{0}'.format(pretty(decoded_body)))

        except jsonschema.SchemaError as e:
            logger.error('Schema is not valid! {0}'.format(e))
            print('Schema is not valid! {0}'.format(e))

        except jsonschema.ValidationError as e:
            # Only raised by validate(), so decoded_body is always bound here.
            logger.warning('Event is not valid against schema! {0}'.format(e))
            print('Event is not valid against schema! {0}'.format(e))
            print('Bad JSON body decoded:\n'
                  '{0}'.format(pretty(decoded_body)))

        except Exception as e:
            # Deliberately broad: this is a best-effort report (e.g. the body
            # is not JSON at all) and must not prevent a response being sent.
            logger.error('Event invalid for unexpected reason! {0}'.format(e))
            print('Schema is not valid for unexpected reason! {0}'.format(e))
    else:
        logger.debug('No schema so just decode JSON: {0}'.format(body))
        try:
            decoded_body = json.loads(body)
            print('Valid JSON body (no schema checking) decoded:\n'
                  '{0}'.format(pretty(decoded_body)))
            logger.info('Event is valid JSON but not checked against schema!')

        except Exception as e:
            logger.error('Event invalid for unexpected reason! {0}'.format(e))
            print('JSON body not valid for unexpected reason! {0}'.format(e))

    #--------------------------------------------------------------------------
    # See whether the user authenticated themselves correctly.
    #--------------------------------------------------------------------------
    if credentials == (vel_username + ':' + vel_password):
        logger.debug('Authenticated OK')
        print('Authenticated OK')

        #----------------------------------------------------------------------
        # Respond to the caller. If we have a pending commandList from the
        # testControl API, send it in response (once only).
        #----------------------------------------------------------------------
        if pending_command_list is not None:
            start_response('202 Accepted',
                           [('Content-type', 'application/json')])
            response = pending_command_list
            pending_command_list = None

            print('\n' + '=' * 80)
            print('Sending pending commandList in the response:\n'
                  '{0}'.format(pretty(response)))
            print('=' * 80 + '\n')
            yield json.dumps(response)
        else:
            start_response('202 Accepted', [])
            yield ''
    else:
        # Report the expected user but mask the expected password.
        failure = 'Failed to authenticate OK' + vel_username + ':****'
        logger.warning(failure)
        print(failure)

        #----------------------------------------------------------------------
        # Respond to the caller.
        #----------------------------------------------------------------------
        start_response('401 Unauthorized',
                       [('Content-type', 'application/json')])
        req_error = {
            'requestError': {
                'policyException': {
                    'messageId': 'POL0001',
                    'text': 'Failed to authenticate'
                }
            }
        }
        yield json.dumps(req_error)
def test_listener(environ, start_response, schema):
    '''
    Handler for the Test Collector Test Control API.

    There is no authentication on this interface.

    This simply stores a commandList which will be sent in response to the next
    incoming event on the EVEL interface.

    This is a WSGI application written as a generator:

      *  ``GET`` answers ``200 OK`` with the currently pending commandList
         (JSON ``null`` when there is none) and changes nothing.
      *  Any other method decodes the body as JSON, reports the result of the
         optional schema check, stores the decoded body as the pending
         commandList and answers ``202 Accepted``.  A body that fails the
         schema check is reported but still stored.
      *  A body that cannot be decoded as JSON is answered with
         ``400 Bad Request`` and the pending commandList is left unchanged.

    :param environ: WSGI environment of the request.
    :param start_response: WSGI ``start_response`` callable.
    :param schema: JSON schema to validate the body against, or ``None`` to
                   only check that the body is well-formed JSON.
    '''
    global pending_command_list

    def pretty(obj):
        # Human-readable rendering used for every console dump below.
        return json.dumps(obj,
                          sort_keys=True,
                          indent=4,
                          separators=(',', ': '))

    logger.info('Got a Test Control input')
    print('============================')
    print('==== TEST CONTROL INPUT ====')

    #--------------------------------------------------------------------------
    # GET allows us to get the current pending request.
    #--------------------------------------------------------------------------
    if environ.get('REQUEST_METHOD') == 'GET':
        start_response('200 OK', [('Content-type', 'application/json')])
        yield json.dumps(pending_command_list)
        return

    #--------------------------------------------------------------------------
    # Extract the content from the request.  CONTENT_LENGTH may be absent or
    # present-but-empty, both of which mean "no body".
    #--------------------------------------------------------------------------
    length = int(environ.get('CONTENT_LENGTH') or 0)
    logger.debug('TestControl Content Length: {0}'.format(length))
    body = environ['wsgi.input'].read(length)
    logger.debug('TestControl Content Body: {0}'.format(body))

    #--------------------------------------------------------------------------
    # Track decoding explicitly: JSON 'null' legitimately decodes to None, so
    # the decoded value itself cannot signal failure.
    #--------------------------------------------------------------------------
    decoded_body = None
    body_decoded = False

    #--------------------------------------------------------------------------
    # If we have a schema file then check that the event matches that expected.
    #--------------------------------------------------------------------------
    if schema is not None:
        logger.debug('Attempting to validate data: {0}\n'
                     'Against schema: {1}'.format(body, schema))
        try:
            decoded_body = json.loads(body)
            body_decoded = True
            jsonschema.validate(decoded_body, schema)
            logger.info('TestControl is valid!')
            print('TestControl:\n'
                  '{0}'.format(pretty(decoded_body)))

        except jsonschema.SchemaError as e:
            logger.error('TestControl Schema is not valid: {0}'.format(e))
            print('TestControl Schema is not valid: {0}'.format(e))

        except jsonschema.ValidationError as e:
            logger.warning('TestControl input not valid: {0}'.format(e))
            print('TestControl input not valid: {0}'.format(e))
            print('Bad JSON body decoded:\n'
                  '{0}'.format(pretty(decoded_body)))

        except Exception as e:
            # Deliberately broad: covers a body that is not JSON at all.
            logger.error('TestControl input not valid: {0}'.format(e))
            print('TestControl Schema not valid: {0}'.format(e))
    else:
        logger.debug('Missing schema just decode JSON: {0}'.format(body))
        try:
            decoded_body = json.loads(body)
            body_decoded = True
            print('Valid JSON body (no schema checking) decoded:\n'
                  '{0}'.format(pretty(decoded_body)))
            logger.info('TestControl input not checked against schema!')

        except Exception as e:
            logger.error('TestControl input not valid: {0}'.format(e))
            print('TestControl input not valid: {0}'.format(e))

    #--------------------------------------------------------------------------
    # Respond to the caller.  Only a body that decoded as JSON replaces the
    # pending commandList; otherwise reject the request and keep what we had.
    #--------------------------------------------------------------------------
    if body_decoded:
        pending_command_list = decoded_body
        status = '202 Accepted'
    else:
        status = '400 Bad Request'

    print('===== TEST CONTROL END =====')
    print('============================')
    start_response(status, [])
    yield ''
321 Main function for the collector start-up.
323 Called with command-line arguments:
325 * --section *<section>*
330 *<file>* specifies the path to the configuration file.
332 *<section>* specifies the section within that config file.
334 *verbose* generates more information in the log files.
336 The process listens for REST API invocations and checks them. Errors are
337 displayed to stdout and logged.
343 sys.argv.extend(argv)
345 program_name = os.path.basename(sys.argv[0])
346 program_version = 'v{0}'.format(__version__)
347 program_build_date = str(__updated__)
348 program_version_message = '%%(prog)s {0} ({1})'.format(program_version,
350 if (__import__('__main__').__doc__ is not None):
351 program_shortdesc = __import__('__main__').__doc__.split('\n')[1]
353 program_shortdesc = 'Running in test harness'
354 program_license = '''{0}
357 Copyright 2015 Metaswitch Networks Ltd. All rights reserved.
359 Distributed on an "AS IS" basis without warranties
360 or conditions of any kind, either express or implied.
363 '''.format(program_shortdesc, str(__date__))
366 #----------------------------------------------------------------------
367 # Setup argument parser so we can parse the command-line.
368 #----------------------------------------------------------------------
369 parser = ArgumentParser(description=program_license,
370 formatter_class=ArgumentDefaultsHelpFormatter)
371 parser.add_argument('-v', '--verbose',
374 help='set verbosity level')
375 parser.add_argument('-V', '--version',
377 version=program_version_message,
378 help='Display version information')
379 parser.add_argument('-a', '--api-version',
382 help='set API version')
383 parser.add_argument('-c', '--config',
385 default='/etc/opt/att/collector.conf',
386 help='Use this config file.',
388 parser.add_argument('-s', '--section',
392 help='section to use in the config file')
394 #----------------------------------------------------------------------
395 # Process arguments received.
396 #----------------------------------------------------------------------
397 args = parser.parse_args()
398 verbose = args.verbose
399 api_version = args.api_version
400 config_file = args.config
401 config_section = args.section
403 #----------------------------------------------------------------------
404 # Now read the config file, using command-line supplied values as
406 #----------------------------------------------------------------------
407 defaults = {'log_file': 'collector.log',
413 config = ConfigParser.SafeConfigParser(defaults)
414 config.read(config_file)
416 #----------------------------------------------------------------------
417 # extract the values we want.
418 #----------------------------------------------------------------------
419 log_file = config.get(config_section, 'log_file', vars=overrides)
420 vel_port = config.get(config_section, 'vel_port', vars=overrides)
421 vel_path = config.get(config_section, 'vel_path', vars=overrides)
422 vel_topic_name = config.get(config_section,
427 vel_username = config.get(config_section,
430 vel_password = config.get(config_section,
433 vel_schema_file = config.get(config_section,
436 base_schema_file = config.get(config_section,
439 throttle_schema_file = config.get(config_section,
440 'throttle_schema_file',
442 test_control_schema_file = config.get(config_section,
443 'test_control_schema_file',
446 #----------------------------------------------------------------------
447 # Finally we have enough info to start a proper flow trace.
448 #----------------------------------------------------------------------
450 print('Logfile: {0}'.format(log_file))
451 logger = logging.getLogger('collector')
453 print('Verbose mode on')
454 logger.setLevel(logging.DEBUG)
456 logger.setLevel(logging.INFO)
457 handler = logging.handlers.RotatingFileHandler(log_file,
460 if (platform.system() == 'Windows'):
461 date_format = '%Y-%m-%d %H:%M:%S'
463 date_format = '%Y-%m-%d %H:%M:%S.%f %z'
464 formatter = logging.Formatter('%(asctime)s %(name)s - '
465 '%(levelname)s - %(message)s',
467 handler.setFormatter(formatter)
468 logger.addHandler(handler)
469 logger.info('Started')
471 #----------------------------------------------------------------------
472 # Log the details of the configuration.
473 #----------------------------------------------------------------------
474 logger.debug('Log file = {0}'.format(log_file))
475 logger.debug('Event Listener Port = {0}'.format(vel_port))
476 logger.debug('Event Listener Path = {0}'.format(vel_path))
477 logger.debug('Event Listener Topic = {0}'.format(vel_topic_name))
478 logger.debug('Event Listener Username = {0}'.format(vel_username))
479 # logger.debug('Event Listener Password = {0}'.format(vel_password))
480 logger.debug('Event Listener JSON Schema File = {0}'.format(
482 logger.debug('Base JSON Schema File = {0}'.format(base_schema_file))
483 logger.debug('Throttle JSON Schema File = {0}'.format(
484 throttle_schema_file))
485 logger.debug('Test Control JSON Schema File = {0}'.format(
486 test_control_schema_file))
488 #----------------------------------------------------------------------
489 # Perform some basic error checking on the config.
490 #----------------------------------------------------------------------
491 if (int(vel_port) < 1024 or int(vel_port) > 65535):
492 logger.error('Invalid Vendor Event Listener port ({0}) '
493 'specified'.format(vel_port))
494 raise RuntimeError('Invalid Vendor Event Listener port ({0}) '
495 'specified'.format(vel_port))
497 if (len(vel_path) > 0 and vel_path[-1] != '/'):
498 logger.warning('Event Listener Path ({0}) should have terminating '
499 '"/"! Adding one on to configured string.'.format(
503 #----------------------------------------------------------------------
504 # Load up the vel_schema, if it exists.
505 #----------------------------------------------------------------------
506 if not os.path.exists(vel_schema_file):
507 logger.warning('Event Listener Schema File ({0}) not found. '
508 'No validation will be undertaken.'.format(
512 global throttle_schema
513 global test_control_schema
514 vel_schema = json.load(open(vel_schema_file, 'r'))
515 logger.debug('Loaded the JSON schema file')
517 #------------------------------------------------------------------
518 # Load up the throttle_schema, if it exists.
519 #------------------------------------------------------------------
520 if (os.path.exists(throttle_schema_file)):
521 logger.debug('Loading throttle schema')
522 throttle_fragment = json.load(open(throttle_schema_file, 'r'))
524 throttle_schema.update(vel_schema)
525 throttle_schema.update(throttle_fragment)
526 logger.debug('Loaded the throttle schema')
528 #------------------------------------------------------------------
529 # Load up the test control _schema, if it exists.
530 #------------------------------------------------------------------
531 if (os.path.exists(test_control_schema_file)):
532 logger.debug('Loading test control schema')
533 test_control_fragment = json.load(
534 open(test_control_schema_file, 'r'))
535 test_control_schema = {}
536 test_control_schema.update(vel_schema)
537 test_control_schema.update(test_control_fragment)
538 logger.debug('Loaded the test control schema')
540 #------------------------------------------------------------------
541 # Load up the base_schema, if it exists.
542 #------------------------------------------------------------------
543 if (os.path.exists(base_schema_file)):
544 logger.debug('Updating the schema with base definition')
545 base_schema = json.load(open(base_schema_file, 'r'))
546 vel_schema.update(base_schema)
547 logger.debug('Updated the JSON schema file')
549 #----------------------------------------------------------------------
550 # We are now ready to get started with processing. Start-up the various
551 # components of the system in order:
553 # 1) Create the dispatcher.
554 # 2) Register the functions for the URLs of interest.
555 # 3) Run the webserver.
556 #----------------------------------------------------------------------
557 root_url = '/{0}eventListener/v{1}{2}'.\
561 if len(vel_topic_name) > 0
563 throttle_url = '/{0}eventListener/v{1}/clientThrottlingState'.\
564 format(vel_path, api_version)
565 batch_url = '/{0}eventListener/v{1}/eventBatch'.\
566 format(vel_path, api_version)
567 set_404_content(root_url)
568 dispatcher = PathDispatcher()
569 vendor_event_listener = partial(listener, schema = vel_schema)
570 dispatcher.register('GET', root_url, vendor_event_listener)
571 dispatcher.register('POST', root_url, vendor_event_listener)
572 batch_event_listener = partial(listener, schema = vel_schema)
573 dispatcher.register('GET', batch_url, batch_event_listener)
574 dispatcher.register('POST', batch_url, batch_event_listener)
575 vendor_throttle_listener = partial(listener, schema = throttle_schema)
576 dispatcher.register('GET', throttle_url, vendor_throttle_listener)
577 dispatcher.register('POST', throttle_url, vendor_throttle_listener)
579 #----------------------------------------------------------------------
580 # We also add a POST/GET mechanism for test control, so that we can
581 # send commands to a single attached client.
582 #----------------------------------------------------------------------
583 test_control_url = '/testControl/v{0}/commandList'.format(api_version)
584 test_control_listener = partial(test_listener,
585 schema = test_control_schema)
586 dispatcher.register('POST', test_control_url, test_control_listener)
587 dispatcher.register('GET', test_control_url, test_control_listener)
589 httpd = make_server('', int(vel_port), dispatcher)
590 print('Serving on port {0}...'.format(vel_port))
591 httpd.serve_forever()
593 logger.error('Main loop exited unexpectedly!')
596 except KeyboardInterrupt:
597 #----------------------------------------------------------------------
598 # handle keyboard interrupt
599 #----------------------------------------------------------------------
600 logger.info('Exiting on keyboard interrupt!')
603 except Exception as e:
604 #----------------------------------------------------------------------
605 # Handle unexpected exceptions.
606 #----------------------------------------------------------------------
609 indent = len(program_name) * ' '
610 sys.stderr.write(program_name + ': ' + repr(e) + '\n')
611 sys.stderr.write(indent + ' for help use --help\n')
612 sys.stderr.write(traceback.format_exc())
613 logger.critical('Exiting because of exception: {0}'.format(e))
614 logger.critical(traceback.format_exc())
617 #------------------------------------------------------------------------------
618 # MAIN SCRIPT ENTRY POINT.
619 #------------------------------------------------------------------------------
620 if __name__ == '__main__':
622 #----------------------------------------------------------------------
623 # Running tests - note that doctest comments haven't been included so
624 # this is a hook for future improvements.
625 #----------------------------------------------------------------------
630 #----------------------------------------------------------------------
631 # Profiling performance. Performance isn't expected to be a major
632 # issue, but this should all work as expected.
633 #----------------------------------------------------------------------
636 profile_filename = 'collector_profile.txt'
637 cProfile.run('main()', profile_filename)
638 statsfile = open('collector_profile_stats.txt', 'wb')
639 p = pstats.Stats(profile_filename, stream=statsfile)
640 stats = p.strip_dirs().sort_stats('cumulative')
645 #--------------------------------------------------------------------------
646 # Normal operation - call through to the main function.
647 #--------------------------------------------------------------------------