26149a233c307b268a58c9f707fd0c341687de07
[sdc/sdc-distribution-client.git] /
1 # -*- encoding: utf-8 -*-
2 #
3 #  Licensed under the Apache License, Version 2.0 (the "License"); you may
4 #  not use this file except in compliance with the License. You may obtain
5 #  a copy of the License at
6 #
7 #       http://www.apache.org/licenses/LICENSE-2.0
8 #
9 #  Unless required by applicable law or agreed to in writing, software
10 #  distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 #  WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 #  License for the specific language governing permissions and limitations
13 #  under the License.
14
15 import argparse
16 try:
17     from StringIO import StringIO
18 except ImportError:
19     from io import StringIO
20
21 import codecs
22 import locale
23 import mock
24 import six
25 import sys
26
27 from cliff import app as application
28 from cliff import command as c_cmd
29 from cliff import commandmanager
30 from cliff.tests import utils as test_utils
31 from cliff import utils
32
33
34 def make_app(**kwargs):
35     cmd_mgr = commandmanager.CommandManager('cliff.tests')
36
37     # Register a command that succeeds
38     command = mock.MagicMock(spec=c_cmd.Command)
39     command_inst = mock.MagicMock(spec=c_cmd.Command)
40     command_inst.run.return_value = 0
41     command.return_value = command_inst
42     cmd_mgr.add_command('mock', command)
43
44     # Register a command that fails
45     err_command = mock.Mock(name='err_command', spec=c_cmd.Command)
46     err_command_inst = mock.Mock(spec=c_cmd.Command)
47     err_command_inst.run = mock.Mock(
48         side_effect=RuntimeError('test exception')
49     )
50     err_command.return_value = err_command_inst
51     cmd_mgr.add_command('error', err_command)
52
53     app = application.App('testing interactive mode',
54                           '1',
55                           cmd_mgr,
56                           stderr=mock.Mock(),  # suppress warning messages
57                           **kwargs
58                           )
59     return app, command
60
61
62 def test_no_args_triggers_interactive_mode():
63     app, command = make_app()
64     app.interact = mock.MagicMock(name='inspect')
65     app.run([])
66     app.interact.assert_called_once_with()
67
68
69 def test_interactive_mode_cmdloop():
70     app, command = make_app()
71     app.interactive_app_factory = mock.MagicMock(
72         name='interactive_app_factory'
73     )
74     assert app.interpreter is None
75     app.run([])
76     assert app.interpreter is not None
77     app.interactive_app_factory.return_value.cmdloop.assert_called_once_with()
78
79
80 def test_initialize_app():
81     app, command = make_app()
82     app.initialize_app = mock.MagicMock(name='initialize_app')
83     app.run(['mock'])
84     app.initialize_app.assert_called_once_with(['mock'])
85
86
87 def test_prepare_to_run_command():
88     app, command = make_app()
89     app.prepare_to_run_command = mock.MagicMock(name='prepare_to_run_command')
90     app.run(['mock'])
91     app.prepare_to_run_command.assert_called_once_with(command())
92
93
94 def test_clean_up_success():
95     app, command = make_app()
96     app.clean_up = mock.MagicMock(name='clean_up')
97     app.run(['mock'])
98     app.clean_up.assert_called_once_with(command.return_value, 0, None)
99
100
101 def test_clean_up_error():
102     app, command = make_app()
103
104     app.clean_up = mock.MagicMock(name='clean_up')
105     app.run(['error'])
106
107     app.clean_up.assert_called_once_with(mock.ANY, mock.ANY, mock.ANY)
108     call_args = app.clean_up.call_args_list[0]
109     assert call_args == mock.call(mock.ANY, 1, mock.ANY)
110     args, kwargs = call_args
111     assert isinstance(args[2], RuntimeError)
112     assert args[2].args == ('test exception',)
113
114
115 def test_clean_up_error_debug():
116     app, command = make_app()
117
118     app.clean_up = mock.MagicMock(name='clean_up')
119     try:
120         app.run(['--debug', 'error'])
121     except RuntimeError as err:
122         assert app.clean_up.call_args_list[0][0][2] is err
123     else:
124         assert False, 'Should have had an exception'
125
126     assert app.clean_up.called
127     call_args = app.clean_up.call_args_list[0]
128     assert call_args == mock.call(mock.ANY, 1, mock.ANY)
129     args, kwargs = call_args
130     assert isinstance(args[2], RuntimeError)
131     assert args[2].args == ('test exception',)
132
133
134 def test_error_handling_clean_up_raises_exception():
135     app, command = make_app()
136
137     app.clean_up = mock.MagicMock(
138         name='clean_up',
139         side_effect=RuntimeError('within clean_up'),
140     )
141     app.run(['error'])
142
143     assert app.clean_up.called
144     call_args = app.clean_up.call_args_list[0]
145     assert call_args == mock.call(mock.ANY, 1, mock.ANY)
146     args, kwargs = call_args
147     assert isinstance(args[2], RuntimeError)
148     assert args[2].args == ('test exception',)
149
150
151 def test_error_handling_clean_up_raises_exception_debug():
152     app, command = make_app()
153
154     app.clean_up = mock.MagicMock(
155         name='clean_up',
156         side_effect=RuntimeError('within clean_up'),
157     )
158     try:
159         app.run(['--debug', 'error'])
160     except RuntimeError as err:
161         if not hasattr(err, '__context__'):
162             # The exception passed to clean_up is not the exception
163             # caused *by* clean_up.  This test is only valid in python
164             # 2 because under v3 the original exception is re-raised
165             # with the new one as a __context__ attribute.
166             assert app.clean_up.call_args_list[0][0][2] is not err
167     else:
168         assert False, 'Should have had an exception'
169
170     assert app.clean_up.called
171     call_args = app.clean_up.call_args_list[0]
172     assert call_args == mock.call(mock.ANY, 1, mock.ANY)
173     args, kwargs = call_args
174     assert isinstance(args[2], RuntimeError)
175     assert args[2].args == ('test exception',)
176
177
178 def test_normal_clean_up_raises_exception():
179     app, command = make_app()
180
181     app.clean_up = mock.MagicMock(
182         name='clean_up',
183         side_effect=RuntimeError('within clean_up'),
184     )
185     app.run(['mock'])
186
187     assert app.clean_up.called
188     call_args = app.clean_up.call_args_list[0]
189     assert call_args == mock.call(mock.ANY, 0, None)
190
191
192 def test_normal_clean_up_raises_exception_debug():
193     app, command = make_app()
194
195     app.clean_up = mock.MagicMock(
196         name='clean_up',
197         side_effect=RuntimeError('within clean_up'),
198     )
199     app.run(['--debug', 'mock'])
200
201     assert app.clean_up.called
202     call_args = app.clean_up.call_args_list[0]
203     assert call_args == mock.call(mock.ANY, 0, None)
204
205
206 def test_build_option_parser_conflicting_option_should_throw():
207     class MyApp(application.App):
208         def __init__(self):
209             super(MyApp, self).__init__(
210                 description='testing',
211                 version='0.1',
212                 command_manager=commandmanager.CommandManager('tests'),
213             )
214
215         def build_option_parser(self, description, version):
216             parser = super(MyApp, self).build_option_parser(description,
217                                                             version)
218             parser.add_argument(
219                 '-h', '--help',
220                 default=self,  # tricky
221                 help="Show help message and exit.",
222             )
223
224     # TODO: tests should really use unittest2.
225     try:
226         MyApp()
227     except argparse.ArgumentError:
228         pass
229     else:
230         raise Exception('Exception was not thrown')
231
232
233 def test_option_parser_conflicting_option_custom_arguments_should_not_throw():
234     class MyApp(application.App):
235         def __init__(self):
236             super(MyApp, self).__init__(
237                 description='testing',
238                 version='0.1',
239                 command_manager=commandmanager.CommandManager('tests'),
240             )
241
242         def build_option_parser(self, description, version):
243             argparse_kwargs = {'conflict_handler': 'resolve'}
244             parser = super(MyApp, self).build_option_parser(
245                 description,
246                 version,
247                 argparse_kwargs=argparse_kwargs)
248             parser.add_argument(
249                 '-h', '--help',
250                 default=self,  # tricky
251                 help="Show help message and exit.",
252             )
253
254     MyApp()
255
256
257 def test_option_parser_abbrev_issue():
258     class MyCommand(c_cmd.Command):
259         def get_parser(self, prog_name):
260             parser = super(MyCommand, self).get_parser(prog_name)
261             parser.add_argument("--end")
262             return parser
263
264         def take_action(self, parsed_args):
265             assert(parsed_args.end == '123')
266
267     class MyCommandManager(commandmanager.CommandManager):
268         def load_commands(self, namespace):
269             self.add_command("mycommand", MyCommand)
270
271     class MyApp(application.App):
272         def __init__(self):
273             super(MyApp, self).__init__(
274                 description='testing',
275                 version='0.1',
276                 command_manager=MyCommandManager(None),
277             )
278
279         def build_option_parser(self, description, version):
280             parser = super(MyApp, self).build_option_parser(
281                 description,
282                 version,
283                 argparse_kwargs={'allow_abbrev': False})
284             parser.add_argument('--endpoint')
285             return parser
286
287     app = MyApp()
288     # NOTE(jd) --debug is necessary so assert in take_action() raises correctly
289     # here
290     app.run(['--debug', 'mycommand', '--end', '123'])
291
292
293 def _test_help(deferred_help):
294     app, _ = make_app(deferred_help=deferred_help)
295     with mock.patch.object(app, 'initialize_app') as init:
296         with mock.patch('cliff.help.HelpAction.__call__',
297                         side_effect=SystemExit(0)) as helper:
298             try:
299                 app.run(['--help'])
300             except SystemExit:
301                 pass
302             else:
303                 raise Exception('Exception was not thrown')
304             assert helper.called
305         assert init.called == deferred_help
306
307
308 def test_help():
309     _test_help(False)
310
311
312 def test_deferred_help():
313     _test_help(True)
314
315
316 def test_subcommand_help():
317     app, _ = make_app(deferred_help=False)
318
319     # Help is called immediately
320     with mock.patch('cliff.help.HelpAction.__call__') as helper:
321         app.run(['show', 'files', '--help'])
322
323     assert helper.called
324
325
326 def test_subcommand_deferred_help():
327     app, _ = make_app(deferred_help=True)
328
329     # Show that provide_help_if_requested() did not show help and exit
330     with mock.patch.object(app, 'run_subcommand') as helper:
331         app.run(['show', 'files', '--help'])
332
333     helper.assert_called_once_with(['help', 'show', 'files'])
334
335
336 def test_unknown_cmd():
337     app, command = make_app()
338     assert app.run(['hell']) == 2
339
340
341 def test_unknown_cmd_debug():
342     app, command = make_app()
343     try:
344         app.run(['--debug', 'hell']) == 2
345     except ValueError as err:
346         assert "['hell']" in ('%s' % err)
347
348
349 def test_list_matching_commands():
350     stdout = StringIO()
351     app = application.App('testing', '1',
352                           test_utils.TestCommandManager(
353                             test_utils.TEST_NAMESPACE),
354                           stdout=stdout)
355     app.NAME = 'test'
356     try:
357         assert app.run(['t']) == 2
358     except SystemExit:
359         pass
360     output = stdout.getvalue()
361     assert "test: 't' is not a test command. See 'test --help'." in output
362     assert 'Did you mean one of these?' in output
363     assert 'three word command\n  two words\n' in output
364
365
366 def test_fuzzy_no_commands():
367     cmd_mgr = commandmanager.CommandManager('cliff.fuzzy')
368     app = application.App('test', '1.0', cmd_mgr)
369     cmd_mgr.commands = {}
370     matches = app.get_fuzzy_matches('foo')
371     assert matches == []
372
373
374 def test_fuzzy_common_prefix():
375     # searched string is a prefix of all commands
376     cmd_mgr = commandmanager.CommandManager('cliff.fuzzy')
377     app = application.App('test', '1.0', cmd_mgr)
378     cmd_mgr.commands = {}
379     cmd_mgr.add_command('user list', test_utils.TestCommand)
380     cmd_mgr.add_command('user show', test_utils.TestCommand)
381     matches = app.get_fuzzy_matches('user')
382     assert matches == ['user list', 'user show']
383
384
385 def test_fuzzy_same_distance():
386     # searched string has the same distance to all commands
387     cmd_mgr = commandmanager.CommandManager('cliff.fuzzy')
388     app = application.App('test', '1.0', cmd_mgr)
389     cmd_mgr.add_command('user', test_utils.TestCommand)
390     for cmd in cmd_mgr.commands.keys():
391         assert utils.damerau_levenshtein('node', cmd, utils.COST) == 8
392     matches = app.get_fuzzy_matches('node')
393     assert matches == ['complete', 'help', 'user']
394
395
396 def test_fuzzy_no_prefix():
397     # search by distance, no common prefix with any command
398     cmd_mgr = commandmanager.CommandManager('cliff.fuzzy')
399     app = application.App('test', '1.0', cmd_mgr)
400     cmd_mgr.add_command('user', test_utils.TestCommand)
401     matches = app.get_fuzzy_matches('uesr')
402     assert matches == ['user']
403
404
405 def test_verbose():
406     app, command = make_app()
407     app.clean_up = mock.MagicMock(name='clean_up')
408     app.run(['--verbose', 'mock'])
409     app.clean_up.assert_called_once_with(command.return_value, 0, None)
410     app.clean_up.reset_mock()
411     app.run(['--quiet', 'mock'])
412     app.clean_up.assert_called_once_with(command.return_value, 0, None)
413     try:
414         app.run(['--verbose', '--quiet', 'mock'])
415     except SystemExit:
416         pass
417     else:
418         raise Exception('Exception was not thrown')
419
420
421 def test_io_streams():
422     cmd_mgr = commandmanager.CommandManager('cliff.tests')
423     io = mock.Mock()
424
425     if six.PY2:
426         stdin_save = sys.stdin
427         stdout_save = sys.stdout
428         stderr_save = sys.stderr
429         encoding = locale.getpreferredencoding() or 'utf-8'
430
431         app = application.App('no io streams', 1, cmd_mgr)
432         assert isinstance(app.stdin, codecs.StreamReader)
433         assert isinstance(app.stdout, codecs.StreamWriter)
434         assert isinstance(app.stderr, codecs.StreamWriter)
435
436         app = application.App('with stdin io stream', 1, cmd_mgr, stdin=io)
437         assert app.stdin is io
438         assert isinstance(app.stdout, codecs.StreamWriter)
439         assert isinstance(app.stderr, codecs.StreamWriter)
440
441         app = application.App('with stdout io stream', 1, cmd_mgr, stdout=io)
442         assert isinstance(app.stdin, codecs.StreamReader)
443         assert app.stdout is io
444         assert isinstance(app.stderr, codecs.StreamWriter)
445
446         app = application.App('with stderr io stream', 1, cmd_mgr, stderr=io)
447         assert isinstance(app.stdin, codecs.StreamReader)
448         assert isinstance(app.stdout, codecs.StreamWriter)
449         assert app.stderr is io
450
451         try:
452             sys.stdin = codecs.getreader(encoding)(sys.stdin)
453             app = application.App(
454                 'with wrapped sys.stdin io stream', 1, cmd_mgr)
455             assert app.stdin is sys.stdin
456             assert isinstance(app.stdout, codecs.StreamWriter)
457             assert isinstance(app.stderr, codecs.StreamWriter)
458         finally:
459             sys.stdin = stdin_save
460
461         try:
462             sys.stdout = codecs.getwriter(encoding)(sys.stdout)
463             app = application.App('with wrapped stdout io stream', 1, cmd_mgr)
464             assert isinstance(app.stdin, codecs.StreamReader)
465             assert app.stdout is sys.stdout
466             assert isinstance(app.stderr, codecs.StreamWriter)
467         finally:
468             sys.stdout = stdout_save
469
470         try:
471             sys.stderr = codecs.getwriter(encoding)(sys.stderr)
472             app = application.App('with wrapped stderr io stream', 1, cmd_mgr)
473             assert isinstance(app.stdin, codecs.StreamReader)
474             assert isinstance(app.stdout, codecs.StreamWriter)
475             assert app.stderr is sys.stderr
476         finally:
477             sys.stderr = stderr_save
478
479     else:
480         app = application.App('no io streams', 1, cmd_mgr)
481         assert app.stdin is sys.stdin
482         assert app.stdout is sys.stdout
483         assert app.stderr is sys.stderr
484
485         app = application.App('with stdin io stream', 1, cmd_mgr, stdin=io)
486         assert app.stdin is io
487         assert app.stdout is sys.stdout
488         assert app.stderr is sys.stderr
489
490         app = application.App('with stdout io stream', 1, cmd_mgr, stdout=io)
491         assert app.stdin is sys.stdin
492         assert app.stdout is io
493         assert app.stderr is sys.stderr
494
495         app = application.App('with stderr io stream', 1, cmd_mgr, stderr=io)
496         assert app.stdin is sys.stdin
497         assert app.stdout is sys.stdout
498         assert app.stderr is io