3 Program which acts as the collector for the Vendor Event Listener REST API.
5 Only intended for test purposes.
10 * ===================================================================
11 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
12 * ===================================================================
13 * Licensed under the Apache License, Version 2.0 (the "License");
14 * you may not use this file except in compliance with the License.
15 * You may obtain a copy of the License at
17 * http://www.apache.org/licenses/LICENSE-2.0
19 * Unless required by applicable law or agreed to in writing, software
20 * distributed under the License is distributed on an "AS IS" BASIS,
21 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
22 * See the License for the specific language governing permissions and
23 * limitations under the License.
26 from rest_dispatcher import PathDispatcher, set_404_content
27 from wsgiref.simple_server import make_server
33 from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter
35 import logging.handlers
36 from base64 import b64decode
40 from functools import partial
45 <title>Hello {name}</title>
48 <h1>Hello {name}!</h1>
52 _localtime_resp = '''\
55 <year>{t.tm_year}</year>
56 <month>{t.tm_mon}</month>
57 <day>{t.tm_mday}</day>
58 <hour>{t.tm_hour}</hour>
59 <minute>{t.tm_min}</minute>
60 <second>{t.tm_sec}</second>
65 __date__ = '2015-12-04'
66 __updated__ = '2015-12-04'
72 #------------------------------------------------------------------------------
73 # Credentials we expect clients to authenticate themselves with.
74 #------------------------------------------------------------------------------
78 #------------------------------------------------------------------------------
79 # The JSON schema which we will use to validate events.
80 #------------------------------------------------------------------------------
83 #------------------------------------------------------------------------------
84 # The JSON schema which we will use to validate client throttle state.
85 #------------------------------------------------------------------------------
86 throttle_schema = None
88 #------------------------------------------------------------------------------
89 # The JSON schema which we will use to provoke throttling commands for testing.
90 #------------------------------------------------------------------------------
91 test_control_schema = None
93 #------------------------------------------------------------------------------
94 # Pending command list from the testControl API
95 # This is sent as a response commandList to the next received event.
96 #------------------------------------------------------------------------------
97 pending_command_list = None
99 #------------------------------------------------------------------------------
100 # Logger for this module.
101 #------------------------------------------------------------------------------
104 def listener(environ, start_response, schema):
106 Handler for the Vendor Event Listener REST API.
108 Extract headers and the body and check that:
110 1) The client authenticated themselves correctly.
111 2) The body validates against the provided schema for the API.
114 logger.info('Got a Vendor Event request')
115 print('==== ' + time.asctime() + ' ' + '=' * 49)
117 #--------------------------------------------------------------------------
118 # Extract the content from the request.
119 #--------------------------------------------------------------------------
120 length = int(environ.get('CONTENT_LENGTH', '0'))
121 logger.debug('Content Length: {0}'.format(length))
122 body = environ['wsgi.input'].read(length)
123 logger.debug('Content Body: {0}'.format(body))
125 mode, b64_credentials = string.split(environ.get('HTTP_AUTHORIZATION',
127 # logger.debug('Auth. Mode: {0} Credentials: {1}'.format(mode,
129 logger.debug('Auth. Mode: {0} Credentials: ****'.format(mode))
130 if (b64_credentials != 'None'):
131 credentials = b64decode(b64_credentials)
135 logger.debug('Credentials: {0}'.format(credentials))
136 #logger.debug('Credentials: ****')
138 #--------------------------------------------------------------------------
139 # If we have a schema file then check that the event matches that expected.
140 #--------------------------------------------------------------------------
141 if (schema is not None):
142 logger.debug('Attempting to validate data: {0}\n'
143 'Against schema: {1}'.format(body, schema))
145 decoded_body = json.loads(body)
146 jsonschema.validate(decoded_body, schema)
147 logger.info('Event is valid!')
148 print('Valid body decoded & checked against schema OK:\n'
149 '{0}'.format(json.dumps(decoded_body,
152 separators=(',', ': '))))
154 except jsonschema.SchemaError as e:
155 logger.error('Schema is not valid! {0}'.format(e))
156 print('Schema is not valid! {0}'.format(e))
158 except jsonschema.ValidationError as e:
159 logger.warn('Event is not valid against schema! {0}'.format(e))
160 print('Event is not valid against schema! {0}'.format(e))
161 print('Bad JSON body decoded:\n'
162 '{0}'.format(json.dumps(decoded_body,
165 separators=(',', ': '))))
167 except Exception as e:
168 logger.error('Event invalid for unexpected reason! {0}'.format(e))
169 print('Schema is not valid for unexpected reason! {0}'.format(e))
171 logger.debug('No schema so just decode JSON: {0}'.format(body))
173 decoded_body = json.loads(body)
174 print('Valid JSON body (no schema checking) decoded:\n'
175 '{0}'.format(json.dumps(decoded_body,
178 separators=(',', ': '))))
179 logger.info('Event is valid JSON but not checked against schema!')
181 except Exception as e:
182 logger.error('Event invalid for unexpected reason! {0}'.format(e))
183 print('JSON body not valid for unexpected reason! {0}'.format(e))
185 #--------------------------------------------------------------------------
186 # See whether the user authenticated themselves correctly.
187 #--------------------------------------------------------------------------
188 if (credentials == (vel_username + ':' + vel_password)):
189 logger.debug('Authenticated OK')
190 print('Authenticated OK')
192 #----------------------------------------------------------------------
193 # Respond to the caller. If we have a pending commandList from the
194 # testControl API, send it in response.
195 #----------------------------------------------------------------------
196 global pending_command_list
197 if pending_command_list is not None:
198 start_response('202 Accepted',
199 [('Content-type', 'application/json')])
200 response = pending_command_list
201 pending_command_list = None
204 print('Sending pending commandList in the response:\n'
205 '{0}'.format(json.dumps(response,
208 separators=(',', ': '))))
210 yield json.dumps(response)
212 start_response('202 Accepted', [])
215 logger.warn('Failed to authenticate OK'+vel_username + ':' + vel_password)
216 print('Failed to authenticate OK'+vel_username + ':' + vel_password)
218 #----------------------------------------------------------------------
219 # Respond to the caller.
220 #----------------------------------------------------------------------
221 start_response('401 Unauthorized', [ ('Content-type',
222 'application/json')])
223 req_error = { 'requestError': {
225 'messageId': 'POL0001',
226 'text': 'Failed to authenticate'
230 yield json.dumps(req_error)
232 def test_listener(environ, start_response, schema):
234 Handler for the Test Collector Test Control API.
236 There is no authentication on this interface.
238 This simply stores a commandList which will be sent in response to the next
239 incoming event on the EVEL interface.
241 global pending_command_list
242 logger.info('Got a Test Control input')
243 print('============================')
244 print('==== TEST CONTROL INPUT ====')
246 #--------------------------------------------------------------------------
247 # GET allows us to get the current pending request.
248 #--------------------------------------------------------------------------
249 if environ.get('REQUEST_METHOD') == 'GET':
250 start_response('200 OK', [('Content-type', 'application/json')])
251 yield json.dumps(pending_command_list)
254 #--------------------------------------------------------------------------
255 # Extract the content from the request.
256 #--------------------------------------------------------------------------
257 length = int(environ.get('CONTENT_LENGTH', '0'))
258 logger.debug('TestControl Content Length: {0}'.format(length))
259 body = environ['wsgi.input'].read(length)
260 logger.debug('TestControl Content Body: {0}'.format(body))
262 #--------------------------------------------------------------------------
263 # If we have a schema file then check that the event matches that expected.
264 #--------------------------------------------------------------------------
265 if (schema is not None):
266 logger.debug('Attempting to validate data: {0}\n'
267 'Against schema: {1}'.format(body, schema))
269 decoded_body = json.loads(body)
270 jsonschema.validate(decoded_body, schema)
271 logger.info('TestControl is valid!')
272 print('TestControl:\n'
273 '{0}'.format(json.dumps(decoded_body,
276 separators=(',', ': '))))
278 except jsonschema.SchemaError as e:
279 logger.error('TestControl Schema is not valid: {0}'.format(e))
280 print('TestControl Schema is not valid: {0}'.format(e))
282 except jsonschema.ValidationError as e:
283 logger.warn('TestControl input not valid: {0}'.format(e))
284 print('TestControl input not valid: {0}'.format(e))
285 print('Bad JSON body decoded:\n'
286 '{0}'.format(json.dumps(decoded_body,
289 separators=(',', ': '))))
291 except Exception as e:
292 logger.error('TestControl input not valid: {0}'.format(e))
293 print('TestControl Schema not valid: {0}'.format(e))
295 logger.debug('Missing schema just decode JSON: {0}'.format(body))
297 decoded_body = json.loads(body)
298 print('Valid JSON body (no schema checking) decoded:\n'
299 '{0}'.format(json.dumps(decoded_body,
302 separators=(',', ': '))))
303 logger.info('TestControl input not checked against schema!')
305 except Exception as e:
306 logger.error('TestControl input not valid: {0}'.format(e))
307 print('TestControl input not valid: {0}'.format(e))
309 #--------------------------------------------------------------------------
310 # Respond to the caller. If we received otherField 'ThrottleRequest',
311 # generate the appropriate canned response.
312 #--------------------------------------------------------------------------
313 pending_command_list = decoded_body
314 print('===== TEST CONTROL END =====')
315 print('============================')
316 start_response('202 Accepted', [])
321 Main function for the collector start-up.
323 Called with command-line arguments:
325 * --section *<section>*
330 *<file>* specifies the path to the configuration file.
332 *<section>* specifies the section within that config file.
334 *verbose* generates more information in the log files.
336 The process listens for REST API invocations and checks them. Errors are
337 displayed to stdout and logged.
343 sys.argv.extend(argv)
345 program_name = os.path.basename(sys.argv[0])
346 program_version = 'v{0}'.format(__version__)
347 program_build_date = str(__updated__)
348 program_version_message = '%%(prog)s {0} ({1})'.format(program_version,
350 if (__import__('__main__').__doc__ is not None):
351 program_shortdesc = __import__('__main__').__doc__.split('\n')[1]
353 program_shortdesc = 'Running in test harness'
354 program_license = '''{0}
357 Copyright 2015 Metaswitch Networks Ltd. All rights reserved.
359 Distributed on an "AS IS" basis without warranties
360 or conditions of any kind, either express or implied.
363 '''.format(program_shortdesc, str(__date__))
366 #----------------------------------------------------------------------
367 # Setup argument parser so we can parse the command-line.
368 #----------------------------------------------------------------------
369 parser = ArgumentParser(description=program_license,
370 formatter_class=ArgumentDefaultsHelpFormatter)
371 parser.add_argument('-v', '--verbose',
374 help='set verbosity level')
375 parser.add_argument('-V', '--version',
377 version=program_version_message,
378 help='Display version information')
379 parser.add_argument('-a', '--api-version',
382 help='set API version')
383 parser.add_argument('-c', '--config',
385 default='/etc/opt/att/collector.conf',
386 help='Use this config file.',
388 parser.add_argument('-s', '--section',
392 help='section to use in the config file')
394 #----------------------------------------------------------------------
395 # Process arguments received.
396 #----------------------------------------------------------------------
397 args = parser.parse_args()
398 verbose = args.verbose
399 api_version = args.api_version
400 config_file = args.config
401 config_section = args.section
403 #----------------------------------------------------------------------
404 # Now read the config file, using command-line supplied values as
406 #----------------------------------------------------------------------
407 defaults = {'log_file': 'collector.log',
413 config = ConfigParser.SafeConfigParser(defaults)
414 config.read(config_file)
416 #----------------------------------------------------------------------
417 # extract the values we want.
418 #----------------------------------------------------------------------
419 log_file = config.get(config_section, 'log_file', vars=overrides)
420 vel_port = config.get(config_section, 'vel_port', vars=overrides)
421 vel_path = config.get(config_section, 'vel_path', vars=overrides)
422 vel_topic_name = config.get(config_section,
427 vel_username = config.get(config_section,
430 vel_password = config.get(config_section,
433 vel_schema_file = config.get(config_section,
436 base_schema_file = config.get(config_section,
439 throttle_schema_file = config.get(config_section,
440 'throttle_schema_file',
442 test_control_schema_file = config.get(config_section,
443 'test_control_schema_file',
446 #----------------------------------------------------------------------
447 # Finally we have enough info to start a proper flow trace.
448 #----------------------------------------------------------------------
450 print('Logfile: {0}'.format(log_file))
451 logger = logging.getLogger('collector')
453 print('Verbose mode on')
454 logger.setLevel(logging.DEBUG)
456 logger.setLevel(logging.INFO)
457 handler = logging.handlers.RotatingFileHandler(log_file,
460 if (platform.system() == 'Windows'):
461 date_format = '%Y-%m-%d %H:%M:%S'
463 date_format = '%Y-%m-%d %H:%M:%S.%f %z'
464 formatter = logging.Formatter('%(asctime)s %(name)s - '
465 '%(levelname)s - %(message)s',
467 handler.setFormatter(formatter)
468 logger.addHandler(handler)
469 logger.info('Started')
471 #----------------------------------------------------------------------
472 # Log the details of the configuration.
473 #----------------------------------------------------------------------
474 logger.debug('Log file = {0}'.format(log_file))
475 logger.debug('Event Listener Port = {0}'.format(vel_port))
476 logger.debug('Event Listener Path = {0}'.format(vel_path))
477 logger.debug('Event Listener Topic = {0}'.format(vel_topic_name))
478 logger.debug('Event Listener Username = {0}'.format(vel_username))
479 # logger.debug('Event Listener Password = {0}'.format(vel_password))
480 logger.debug('Event Listener JSON Schema File = {0}'.format(
482 logger.debug('Base JSON Schema File = {0}'.format(base_schema_file))
483 logger.debug('Throttle JSON Schema File = {0}'.format(
484 throttle_schema_file))
485 logger.debug('Test Control JSON Schema File = {0}'.format(
486 test_control_schema_file))
488 #----------------------------------------------------------------------
489 # Perform some basic error checking on the config.
490 #----------------------------------------------------------------------
491 if (int(vel_port) < 1024 or int(vel_port) > 65535):
492 logger.error('Invalid Vendor Event Listener port ({0}) '
493 'specified'.format(vel_port))
494 raise RuntimeError('Invalid Vendor Event Listener port ({0}) '
495 'specified'.format(vel_port))
497 if (len(vel_path) > 0 and vel_path[-1] != '/'):
498 logger.warning('Event Listener Path ({0}) should have terminating '
499 '"/"! Adding one on to configured string.'.format(
503 #----------------------------------------------------------------------
504 # Load up the vel_schema, if it exists.
505 #----------------------------------------------------------------------
506 if not os.path.exists(vel_schema_file):
507 logger.warning('Event Listener Schema File ({0}) not found. '
508 'No validation will be undertaken.'.format(
512 global throttle_schema
513 global test_control_schema
514 vel_schema = json.load(open(vel_schema_file, 'r'))
515 logger.debug('Loaded the JSON schema file')
517 #------------------------------------------------------------------
518 # Load up the throttle_schema, if it exists.
519 #------------------------------------------------------------------
520 if (os.path.exists(throttle_schema_file)):
521 logger.debug('Loading throttle schema')
522 throttle_fragment = json.load(open(throttle_schema_file, 'r'))
524 throttle_schema.update(vel_schema)
525 throttle_schema.update(throttle_fragment)
526 logger.debug('Loaded the throttle schema')
528 #------------------------------------------------------------------
529 # Load up the test control _schema, if it exists.
530 #------------------------------------------------------------------
531 if (os.path.exists(test_control_schema_file)):
532 logger.debug('Loading test control schema')
533 test_control_fragment = json.load(
534 open(test_control_schema_file, 'r'))
535 test_control_schema = {}
536 test_control_schema.update(vel_schema)
537 test_control_schema.update(test_control_fragment)
538 logger.debug('Loaded the test control schema')
540 #------------------------------------------------------------------
541 # Load up the base_schema, if it exists.
542 #------------------------------------------------------------------
543 if (os.path.exists(base_schema_file)):
544 logger.debug('Updating the schema with base definition')
545 base_schema = json.load(open(base_schema_file, 'r'))
546 vel_schema.update(base_schema)
547 logger.debug('Updated the JSON schema file')
549 #----------------------------------------------------------------------
550 # We are now ready to get started with processing. Start-up the various
551 # components of the system in order:
553 # 1) Create the dispatcher.
554 # 2) Register the functions for the URLs of interest.
555 # 3) Run the webserver.
556 #----------------------------------------------------------------------
557 root_url = '/{0}eventListener/v{1}{2}'.\
561 if len(vel_topic_name) > 0
563 throttle_url = '/{0}eventListener/v{1}/clientThrottlingState'.\
564 format(vel_path, api_version)
565 set_404_content(root_url)
566 dispatcher = PathDispatcher()
567 vendor_event_listener = partial(listener, schema = vel_schema)
568 dispatcher.register('GET', root_url, vendor_event_listener)
569 dispatcher.register('POST', root_url, vendor_event_listener)
570 vendor_throttle_listener = partial(listener, schema = throttle_schema)
571 dispatcher.register('GET', throttle_url, vendor_throttle_listener)
572 dispatcher.register('POST', throttle_url, vendor_throttle_listener)
574 #----------------------------------------------------------------------
575 # We also add a POST-only mechanism for test control, so that we can
576 # send commands to a single attached client.
577 #----------------------------------------------------------------------
578 test_control_url = '/testControl/v{0}/commandList'.format(api_version)
579 test_control_listener = partial(test_listener,
580 schema = test_control_schema)
581 dispatcher.register('POST', test_control_url, test_control_listener)
582 dispatcher.register('GET', test_control_url, test_control_listener)
584 httpd = make_server('', int(vel_port), dispatcher)
585 print('Serving on port {0}...'.format(vel_port))
586 httpd.serve_forever()
588 logger.error('Main loop exited unexpectedly!')
591 except KeyboardInterrupt:
592 #----------------------------------------------------------------------
593 # handle keyboard interrupt
594 #----------------------------------------------------------------------
595 logger.info('Exiting on keyboard interrupt!')
598 except Exception as e:
599 #----------------------------------------------------------------------
600 # Handle unexpected exceptions.
601 #----------------------------------------------------------------------
604 indent = len(program_name) * ' '
605 sys.stderr.write(program_name + ': ' + repr(e) + '\n')
606 sys.stderr.write(indent + ' for help use --help\n')
607 sys.stderr.write(traceback.format_exc())
608 logger.critical('Exiting because of exception: {0}'.format(e))
609 logger.critical(traceback.format_exc())
612 #------------------------------------------------------------------------------
613 # MAIN SCRIPT ENTRY POINT.
614 #------------------------------------------------------------------------------
615 if __name__ == '__main__':
617 #----------------------------------------------------------------------
618 # Running tests - note that doctest comments haven't been included so
619 # this is a hook for future improvements.
620 #----------------------------------------------------------------------
625 #----------------------------------------------------------------------
626 # Profiling performance. Performance isn't expected to be a major
627 # issue, but this should all work as expected.
628 #----------------------------------------------------------------------
631 profile_filename = 'collector_profile.txt'
632 cProfile.run('main()', profile_filename)
633 statsfile = open('collector_profile_stats.txt', 'wb')
634 p = pstats.Stats(profile_filename, stream=statsfile)
635 stats = p.strip_dirs().sort_stats('cumulative')
640 #--------------------------------------------------------------------------
641 # Normal operation - call through to the main function.
642 #--------------------------------------------------------------------------