Merge "Added licenses to test collector files"
[demo.git] / vnfs / VES5.0 / evel / evel-test-collector / code / collector / collector.py
1 #!/usr/bin/env python
2 '''
3 Program which acts as the collector for the Vendor Event Listener REST API.
4
5 Only intended for test purposes.
6
7 License
8 -------
9
10  * ===================================================================
11  * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
12  * ===================================================================
13  * Licensed under the Apache License, Version 2.0 (the "License");
14  * you may not use this file except in compliance with the License.
15  * You may obtain a copy of the License at
16  *
17  *        http://www.apache.org/licenses/LICENSE-2.0
18  *
19  * Unless required by applicable law or agreed to in writing, software
20  * distributed under the License is distributed on an "AS IS" BASIS,
21  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
22  * See the License for the specific language governing permissions and
23  * limitations under the License.
24 '''
25
26 from rest_dispatcher import PathDispatcher, set_404_content
27 from wsgiref.simple_server import make_server
28 import sys
29 import os
30 import platform
31 import traceback
32 import time
33 from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter
34 import ConfigParser
35 import logging.handlers
36 from base64 import b64decode
37 import string
38 import json
39 import jsonschema
40 from functools import partial
41
# HTML template greeting a caller; formatted with a ``name`` field.
_hello_resp = (
    '<html>\n'
    '  <head>\n'
    '     <title>Hello {name}</title>\n'
    '   </head>\n'
    '   <body>\n'
    '     <h1>Hello {name}!</h1>\n'
    '   </body>\n'
    '</html>'
)

# XML template reporting a time; formatted with a ``t`` field exposing the
# tm_year / tm_mon / tm_mday / tm_hour / tm_min / tm_sec attributes.
_localtime_resp = (
    '<?xml version="1.0"?>\n'
    '<time>\n'
    '  <year>{t.tm_year}</year>\n'
    '  <month>{t.tm_mon}</month>\n'
    '  <day>{t.tm_mday}</day>\n'
    '  <hour>{t.tm_hour}</hour>\n'
    '  <minute>{t.tm_min}</minute>\n'
    '  <second>{t.tm_sec}</second>\n'
    '</time>'
)

# Module metadata.  Nothing is exported by ``from collector import *``.
__all__ = []
__version__ = 0.1
__date__ = '2015-12-04'
__updated__ = '2015-12-04'

# Switches consulted by main() and by the script entry point.
TESTRUN = False
DEBUG = False
PROFILE = False

# Credentials clients must present; overwritten by main() from the config file.
vel_username = ''
vel_password = ''

# JSON schema used to validate incoming events (None: no validation).
vel_schema = None

# JSON schema used to validate client throttle state (None: no validation).
throttle_schema = None

# JSON schema used to validate test-control commands (None: no validation).
test_control_schema = None

# Command list stored by the testControl API.  It is returned as the response
# body of the next authenticated event and then cleared.
pending_command_list = None

# Logger for this module; created by main().
logger = None
103
def _format_json(obj):
    '''
    Return *obj* serialised as indented JSON with sorted keys, for display.
    '''
    return json.dumps(obj, sort_keys=True, indent=4, separators=(',', ': '))


def _read_request_body(environ):
    '''
    Read the request body described by a WSGI environ.

    A missing, empty, non-numeric or negative CONTENT_LENGTH is treated as
    zero so that a malformed request cannot raise out of the handler.

    Returns a ``(length, body)`` tuple.
    '''
    try:
        length = int(environ.get('CONTENT_LENGTH') or 0)
    except (TypeError, ValueError):
        length = 0
    length = max(length, 0)
    return length, environ['wsgi.input'].read(length)


def _parse_basic_auth(header):
    '''
    Split an HTTP Authorization header value into ``(mode, credentials)``.

    *credentials* is the base64-decoded text, or None when the header is
    absent, does not consist of exactly two tokens, or cannot be decoded.
    *mode* is the first token of the header, or None when there is none.
    '''
    tokens = (header or '').split()
    if len(tokens) != 2:
        return (tokens[0] if tokens else None), None

    mode, encoded = tokens
    try:
        decoded = b64decode(encoded)
    except (TypeError, ValueError):
        # Python 2 raises TypeError and Python 3 raises binascii.Error (a
        # ValueError subclass) for malformed base64.
        return mode, None

    if not isinstance(decoded, str):
        # Python 3 hands back bytes; compare as text against the config.
        decoded = decoded.decode('utf-8', 'replace')
    return mode, decoded


def listener(environ, start_response, schema):
    '''
    Handler for the Vendor Event Listener REST API.

    Extract headers and the body and check that:

      1)  The client authenticated themselves correctly.
      2)  The body validates against the provided schema for the API.

    The outcome of the body check is only logged and printed; the HTTP status
    depends solely on authentication.

    Arguments:
        environ         WSGI environment of the request.
        start_response  WSGI callable used to begin the response.
        schema          JSON schema to validate the body against, or None to
                        only decode the body as JSON.

    Yields the response body as a single byte string: empty, or the pending
    commandList, on '202 Accepted'; a requestError document on
    '401 Unauthorized'.
    '''
    global pending_command_list

    logger.info('Got a Vendor Event request')
    print('==== ' + time.asctime() + ' ' + '=' * 49)

    #--------------------------------------------------------------------------
    # Extract the content from the request.
    #--------------------------------------------------------------------------
    length, body = _read_request_body(environ)
    logger.debug('Content Length: {0}'.format(length))
    logger.debug('Content Body: {0}'.format(body))

    #--------------------------------------------------------------------------
    # Decode the credentials.  They are never written to the log or stdout.
    #--------------------------------------------------------------------------
    mode, credentials = _parse_basic_auth(environ.get('HTTP_AUTHORIZATION'))
    logger.debug('Auth. Mode: {0} Credentials: ****'.format(mode))

    #--------------------------------------------------------------------------
    # If we have a schema file then check that the event matches that expected.
    #--------------------------------------------------------------------------
    if schema is not None:
        logger.debug('Attempting to validate data: {0}\n'
                     'Against schema: {1}'.format(body, schema))
        try:
            decoded_body = json.loads(body)
            jsonschema.validate(decoded_body, schema)
            logger.info('Event is valid!')
            print('Valid body decoded & checked against schema OK:\n'
                  '{0}'.format(_format_json(decoded_body)))

        except jsonschema.SchemaError as e:
            logger.error('Schema is not valid! {0}'.format(e))
            print('Schema is not valid! {0}'.format(e))

        except jsonschema.ValidationError as e:
            logger.warning('Event is not valid against schema! {0}'.format(e))
            print('Event is not valid against schema! {0}'.format(e))
            print('Bad JSON body decoded:\n'
                  '{0}'.format(_format_json(decoded_body)))

        except Exception as e:
            # Typically the body was not decodable JSON at all.
            logger.error('Event invalid for unexpected reason! {0}'.format(e))
            print('Event invalid for unexpected reason! {0}'.format(e))
    else:
        logger.debug('No schema so just decode JSON: {0}'.format(body))
        try:
            decoded_body = json.loads(body)
            print('Valid JSON body (no schema checking) decoded:\n'
                  '{0}'.format(_format_json(decoded_body)))
            logger.info('Event is valid JSON but not checked against schema!')

        except Exception as e:
            logger.error('Event invalid for unexpected reason! {0}'.format(e))
            print('JSON body not valid for unexpected reason! {0}'.format(e))

    #--------------------------------------------------------------------------
    # See whether the user authenticated themselves correctly.
    #--------------------------------------------------------------------------
    if credentials == (vel_username + ':' + vel_password):
        logger.debug('Authenticated OK')
        print('Authenticated OK')

        #----------------------------------------------------------------------
        # Respond to the caller. If we have a pending commandList from the
        # testControl API, send it in response and clear it.
        #----------------------------------------------------------------------
        if pending_command_list is not None:
            start_response('202 Accepted',
                           [('Content-type', 'application/json')])
            response = pending_command_list
            pending_command_list = None

            print('\n'+ '='*80)
            print('Sending pending commandList in the response:\n'
                  '{0}'.format(_format_json(response)))
            print('='*80 + '\n')
            yield json.dumps(response).encode('utf-8')
        else:
            start_response('202 Accepted', [])
            yield b''
    else:
        # Deliberately do not echo the expected credentials here.
        logger.warning('Failed to authenticate')
        print('Failed to authenticate')

        #----------------------------------------------------------------------
        # Respond to the caller.
        #----------------------------------------------------------------------
        start_response('401 Unauthorized', [ ('Content-type',
                                              'application/json')])
        req_error = { 'requestError': {
                        'policyException': {
                            'messageId': 'POL0001',
                            'text': 'Failed to authenticate'
                            }
                        }
                    }
        yield json.dumps(req_error).encode('utf-8')
231
def test_listener(environ, start_response, schema):
    '''
    Handler for the Test Collector Test Control API.

    There is no authentication on this interface.

    A GET returns the currently pending commandList as JSON.  Any other
    method stores the JSON request body as the commandList which will be sent
    in response to the next incoming event on the EVEL interface.

    Arguments:
        environ         WSGI environment of the request.
        start_response  WSGI callable used to begin the response.
        schema          JSON schema to check the body against, or None to
                        only decode the body as JSON.

    Yields the response body as a single byte string.  The status is
    '200 OK' for GET, '202 Accepted' when a commandList was stored, and
    '400 Bad Request' when the body could not be decoded as JSON (the
    previously pending commandList is then left untouched).
    '''
    global pending_command_list
    logger.info('Got a Test Control input')
    print('============================')
    print('==== TEST CONTROL INPUT ====')

    #--------------------------------------------------------------------------
    # GET allows us to get the current pending request.
    #--------------------------------------------------------------------------
    if environ.get('REQUEST_METHOD') == 'GET':
        start_response('200 OK', [('Content-type', 'application/json')])
        yield json.dumps(pending_command_list).encode('utf-8')
        return

    #--------------------------------------------------------------------------
    # Extract the content from the request.  A missing, empty or malformed
    # CONTENT_LENGTH is treated as zero.
    #--------------------------------------------------------------------------
    try:
        length = max(int(environ.get('CONTENT_LENGTH') or 0), 0)
    except (TypeError, ValueError):
        length = 0
    logger.debug('TestControl Content Length: {0}'.format(length))
    body = environ['wsgi.input'].read(length)
    logger.debug('TestControl Content Body: {0}'.format(body))

    #--------------------------------------------------------------------------
    # Sentinel telling "body never decoded" apart from a decoded JSON null.
    #--------------------------------------------------------------------------
    not_decoded = object()
    decoded_body = not_decoded

    #--------------------------------------------------------------------------
    # If we have a schema file then check that the event matches that expected.
    #--------------------------------------------------------------------------
    if schema is not None:
        logger.debug('Attempting to validate data: {0}\n'
                     'Against schema: {1}'.format(body, schema))
        try:
            decoded_body = json.loads(body)
            jsonschema.validate(decoded_body, schema)
            logger.info('TestControl is valid!')
            print('TestControl:\n'
                  '{0}'.format(json.dumps(decoded_body,
                                          sort_keys=True,
                                          indent=4,
                                          separators=(',', ': '))))

        except jsonschema.SchemaError as e:
            logger.error('TestControl Schema is not valid: {0}'.format(e))
            print('TestControl Schema is not valid: {0}'.format(e))

        except jsonschema.ValidationError as e:
            logger.warning('TestControl input not valid: {0}'.format(e))
            print('TestControl input not valid: {0}'.format(e))
            print('Bad JSON body decoded:\n'
                  '{0}'.format(json.dumps(decoded_body,
                                          sort_keys=True,
                                          indent=4,
                                          separators=(',', ': '))))

        except Exception as e:
            logger.error('TestControl input not valid: {0}'.format(e))
            print('TestControl input not valid: {0}'.format(e))
    else:
        logger.debug('Missing schema just decode JSON: {0}'.format(body))
        try:
            decoded_body = json.loads(body)
            print('Valid JSON body (no schema checking) decoded:\n'
                  '{0}'.format(json.dumps(decoded_body,
                                          sort_keys=True,
                                          indent=4,
                                          separators=(',', ': '))))
            logger.info('TestControl input not checked against schema!')

        except Exception as e:
            logger.error('TestControl input not valid: {0}'.format(e))
            print('TestControl input not valid: {0}'.format(e))

    print('===== TEST CONTROL END =====')
    print('============================')

    #--------------------------------------------------------------------------
    # Nothing could be decoded: reject the request rather than failing on an
    # unbound name, and keep whatever commandList was already pending.
    #--------------------------------------------------------------------------
    if decoded_body is not_decoded:
        start_response('400 Bad Request', [])
        yield b''
        return

    #--------------------------------------------------------------------------
    # Store the commandList for the next event and acknowledge.
    # NOTE(review): a body that decodes but fails schema validation is still
    # stored, exactly as before - confirm this is intended for a test tool.
    #--------------------------------------------------------------------------
    pending_command_list = decoded_body
    start_response('202 Accepted', [])
    yield b''

#------------------------------------------------------------------------------
# The 'test_' prefix refers to the Test Control API.  Flag the handler so that
# test runners which collect by name (pytest, nose) do not treat it as a test.
#------------------------------------------------------------------------------
test_listener.__test__ = False
318
def _load_json_file(path):
    '''
    Return the decoded content of the JSON file at *path*.

    The file is closed as soon as it has been parsed.
    '''
    with open(path, 'r') as json_file:
        return json.load(json_file)


def main(argv=None):
    '''
    Main function for the collector start-up.

    Called with command-line arguments:
        *    --config *<file>*
        *    --section *<section>*
        *    --api-version *<version>*
        *    --verbose
        *    --version

    Where:

        *<file>* specifies the path to the configuration file.

        *<section>* specifies the section within that config file.

        *<version>* is the API version used in the URLs that are served.

        *verbose* generates more information in the log files.

    Arguments:
        argv    Optional list of extra command-line arguments. They are
                parsed after those already present in sys.argv, which is
                itself left unmodified.

    The process listens for REST API invocations and checks them. Errors are
    displayed to stdout and logged.

    Returns 0 when the server stops or is interrupted from the keyboard, and
    2 when start-up or serving fails with an exception (which is re-raised
    instead when DEBUG or TESTRUN is set).
    '''
    global logger
    global vel_username
    global vel_password
    global vel_schema
    global throttle_schema
    global test_control_schema

    #--------------------------------------------------------------------------
    # Work out the arguments to parse without touching the global sys.argv.
    #--------------------------------------------------------------------------
    if argv is None:
        cli_args = sys.argv[1:]
    else:
        cli_args = sys.argv[1:] + list(argv)

    program_name = os.path.basename(sys.argv[0])
    program_version = 'v{0}'.format(__version__)
    program_build_date = str(__updated__)
    # argparse itself substitutes %(prog)s, so it must reach it un-escaped.
    program_version_message = '%(prog)s {0} ({1})'.format(program_version,
                                                        program_build_date)

    #--------------------------------------------------------------------------
    # The short description is the second line of the running script's
    # docstring, when there is one.
    #--------------------------------------------------------------------------
    main_doc = __import__('__main__').__doc__
    doc_lines = main_doc.split('\n') if main_doc is not None else []
    if len(doc_lines) > 1:
        program_shortdesc = doc_lines[1]
    else:
        program_shortdesc = 'Running in test harness'
    program_license = '''{0}

  Created  on {1}.
  Copyright 2015 Metaswitch Networks Ltd. All rights reserved.

  Distributed on an "AS IS" basis without warranties
  or conditions of any kind, either express or implied.

USAGE
'''.format(program_shortdesc, str(__date__))

    try:
        #----------------------------------------------------------------------
        # Setup argument parser so we can parse the command-line.
        #----------------------------------------------------------------------
        parser = ArgumentParser(description=program_license,
                                formatter_class=ArgumentDefaultsHelpFormatter)
        parser.add_argument('-v', '--verbose',
                            dest='verbose',
                            action='count',
                            default=0,
                            help='set verbosity level')
        parser.add_argument('-V', '--version',
                            action='version',
                            version=program_version_message,
                            help='Display version information')
        parser.add_argument('-a', '--api-version',
                            dest='api_version',
                            default='5',
                            help='set API version')
        parser.add_argument('-c', '--config',
                            dest='config',
                            default='/etc/opt/att/collector.conf',
                            help='Use this config file.',
                            metavar='<file>')
        parser.add_argument('-s', '--section',
                            dest='section',
                            default='default',
                            metavar='<section>',
                            help='section to use in the config file')

        #----------------------------------------------------------------------
        # Process arguments received.
        #----------------------------------------------------------------------
        args = parser.parse_args(cli_args)
        verbose = args.verbose or 0
        api_version = args.api_version
        config_file = args.config
        config_section = args.section

        #----------------------------------------------------------------------
        # Now read the config file, using command-line supplied values as
        # overrides.
        #----------------------------------------------------------------------
        defaults = {'log_file': 'collector.log',
                    'vel_port': '12233',
                    'vel_path': '',
                    'vel_topic_name': ''
                   }
        overrides = {}
        config = ConfigParser.SafeConfigParser(defaults)
        config.read(config_file)

        #----------------------------------------------------------------------
        # extract the values we want.
        #----------------------------------------------------------------------
        log_file = config.get(config_section, 'log_file', vars=overrides)
        vel_port = config.get(config_section, 'vel_port', vars=overrides)
        vel_path = config.get(config_section, 'vel_path', vars=overrides)
        vel_topic_name = config.get(config_section,
                                    'vel_topic_name',
                                    vars=overrides)
        vel_username = config.get(config_section,
                                  'vel_username',
                                  vars=overrides)
        vel_password = config.get(config_section,
                                  'vel_password',
                                  vars=overrides)
        vel_schema_file = config.get(config_section,
                                     'schema_file',
                                     vars=overrides)
        base_schema_file = config.get(config_section,
                                      'base_schema_file',
                                      vars=overrides)
        throttle_schema_file = config.get(config_section,
                                          'throttle_schema_file',
                                          vars=overrides)
        test_control_schema_file = config.get(config_section,
                                           'test_control_schema_file',
                                           vars=overrides)

        #----------------------------------------------------------------------
        # Finally we have enough info to start a proper flow trace.
        #----------------------------------------------------------------------
        print('Logfile: {0}'.format(log_file))
        logger = logging.getLogger('collector')
        if verbose > 0:
            print('Verbose mode on')
            logger.setLevel(logging.DEBUG)
        else:
            logger.setLevel(logging.INFO)
        handler = logging.handlers.RotatingFileHandler(log_file,
                                                       maxBytes=1000000,
                                                       backupCount=10)
        # NOTE(review): time.strftime, which logging.Formatter uses, does not
        # document a %f directive - confirm what the non-Windows format
        # actually renders in the log.
        if (platform.system() == 'Windows'):
            date_format = '%Y-%m-%d %H:%M:%S'
        else:
            date_format = '%Y-%m-%d %H:%M:%S.%f %z'
        formatter = logging.Formatter('%(asctime)s %(name)s - '
                                      '%(levelname)s - %(message)s',
                                      date_format)
        handler.setFormatter(formatter)
        logger.addHandler(handler)
        logger.info('Started')

        #----------------------------------------------------------------------
        # Log the details of the configuration (never the password).
        #----------------------------------------------------------------------
        logger.debug('Log file = {0}'.format(log_file))
        logger.debug('Event Listener Port = {0}'.format(vel_port))
        logger.debug('Event Listener Path = {0}'.format(vel_path))
        logger.debug('Event Listener Topic = {0}'.format(vel_topic_name))
        logger.debug('Event Listener Username = {0}'.format(vel_username))
        logger.debug('Event Listener JSON Schema File = {0}'.format(
                                                              vel_schema_file))
        logger.debug('Base JSON Schema File = {0}'.format(base_schema_file))
        logger.debug('Throttle JSON Schema File = {0}'.format(
                                                         throttle_schema_file))
        logger.debug('Test Control JSON Schema File = {0}'.format(
                                                     test_control_schema_file))

        #----------------------------------------------------------------------
        # Perform some basic error checking on the config.
        #----------------------------------------------------------------------
        if (int(vel_port) < 1024 or int(vel_port) > 65535):
            logger.error('Invalid Vendor Event Listener port ({0}) '
                         'specified'.format(vel_port))
            raise RuntimeError('Invalid Vendor Event Listener port ({0}) '
                               'specified'.format(vel_port))

        if (len(vel_path) > 0 and vel_path[-1] != '/'):
            logger.warning('Event Listener Path ({0}) should have terminating '
                           '"/"!  Adding one on to configured string.'.format(
                                                                     vel_path))
            vel_path += '/'

        #----------------------------------------------------------------------
        # Load up the vel_schema, if it exists.
        #----------------------------------------------------------------------
        if not os.path.exists(vel_schema_file):
            logger.warning('Event Listener Schema File ({0}) not found. '
                           'No validation will be undertaken.'.format(
                                                              vel_schema_file))
        else:
            vel_schema = _load_json_file(vel_schema_file)
            logger.debug('Loaded the JSON schema file')

            #------------------------------------------------------------------
            # Load up the throttle_schema, if it exists. It is a copy of the
            # event schema with the throttle fragment's keys laid on top.
            #------------------------------------------------------------------
            if (os.path.exists(throttle_schema_file)):
                logger.debug('Loading throttle schema')
                throttle_fragment = _load_json_file(throttle_schema_file)
                throttle_schema = {}
                throttle_schema.update(vel_schema)
                throttle_schema.update(throttle_fragment)
                logger.debug('Loaded the throttle schema')

            #------------------------------------------------------------------
            # Load up the test control _schema, if it exists.
            #------------------------------------------------------------------
            if (os.path.exists(test_control_schema_file)):
                logger.debug('Loading test control schema')
                test_control_fragment = _load_json_file(
                    test_control_schema_file)
                test_control_schema = {}
                test_control_schema.update(vel_schema)
                test_control_schema.update(test_control_fragment)
                logger.debug('Loaded the test control schema')

            #------------------------------------------------------------------
            # Load up the base_schema, if it exists. This is done last, so the
            # throttle and test control schemas above do not include it.
            #------------------------------------------------------------------
            if (os.path.exists(base_schema_file)):
                logger.debug('Updating the schema with base definition')
                base_schema = _load_json_file(base_schema_file)
                vel_schema.update(base_schema)
                logger.debug('Updated the JSON schema file')

        #----------------------------------------------------------------------
        # We are now ready to get started with processing. Start-up the various
        # components of the system in order:
        #
        #  1) Create the dispatcher.
        #  2) Register the functions for the URLs of interest.
        #  3) Run the webserver.
        #----------------------------------------------------------------------
        root_url = '/{0}eventListener/v{1}{2}'.\
                   format(vel_path,
                          api_version,
                          '/' + vel_topic_name
                          if len(vel_topic_name) > 0
                          else '')
        throttle_url = '/{0}eventListener/v{1}/clientThrottlingState'.\
                       format(vel_path, api_version)
        set_404_content(root_url)
        dispatcher = PathDispatcher()
        vendor_event_listener = partial(listener, schema = vel_schema)
        dispatcher.register('GET', root_url, vendor_event_listener)
        dispatcher.register('POST', root_url, vendor_event_listener)
        vendor_throttle_listener = partial(listener, schema = throttle_schema)
        dispatcher.register('GET', throttle_url, vendor_throttle_listener)
        dispatcher.register('POST', throttle_url, vendor_throttle_listener)

        #----------------------------------------------------------------------
        # We also add a mechanism for test control, so that we can send
        # commands to a single attached client (POST) and inspect what is
        # currently pending (GET).
        #----------------------------------------------------------------------
        test_control_url = '/testControl/v{0}/commandList'.format(api_version)
        test_control_listener = partial(test_listener,
                                        schema = test_control_schema)
        dispatcher.register('POST', test_control_url, test_control_listener)
        dispatcher.register('GET', test_control_url, test_control_listener)

        httpd = make_server('', int(vel_port), dispatcher)
        print('Serving on port {0}...'.format(vel_port))
        httpd.serve_forever()

        logger.error('Main loop exited unexpectedly!')
        return 0

    except KeyboardInterrupt:
        #----------------------------------------------------------------------
        # handle keyboard interrupt. The logger may not exist yet if we are
        # interrupted during start-up.
        #----------------------------------------------------------------------
        if logger is not None:
            logger.info('Exiting on keyboard interrupt!')
        return 0

    except Exception as e:
        #----------------------------------------------------------------------
        # Handle unexpected exceptions.
        #----------------------------------------------------------------------
        if DEBUG or TESTRUN:
            # Bare raise keeps the original traceback.
            raise
        indent = len(program_name) * ' '
        sys.stderr.write(program_name + ': ' + repr(e) + '\n')
        sys.stderr.write(indent + '  for help use --help\n')
        sys.stderr.write(traceback.format_exc())
        # Failures before the logger is created (e.g. a missing config
        # section) must not be masked by an AttributeError on None.
        if logger is not None:
            logger.critical('Exiting because of exception: {0}'.format(e))
            logger.critical(traceback.format_exc())
        return 2
611
#------------------------------------------------------------------------------
# Script entry point: optional doctest run, optional profiling run, otherwise
# hand over to main() and exit with its return code.
#------------------------------------------------------------------------------
if __name__ == '__main__':
    if TESTRUN:
        # No doctest examples exist yet; this hook is kept for when they do.
        import doctest
        doctest.testmod()

    if PROFILE:
        # Run main() under the profiler, then write the statistics, sorted by
        # cumulative time, to a text file and exit successfully.
        import cProfile
        import pstats
        raw_profile_path = 'collector_profile.txt'
        cProfile.run('main()', raw_profile_path)
        report_file = open('collector_profile_stats.txt', 'wb')
        profile_stats = pstats.Stats(raw_profile_path, stream=report_file)
        ordered_stats = profile_stats.strip_dirs().sort_stats('cumulative')
        ordered_stats.print_stats()
        report_file.close()
        sys.exit(0)

    # Normal operation.
    exit_code = main()
    sys.exit(exit_code)