f3d037761c620d1f7481408dcabe521e9bdfb0cd
[sdc/sdc-distribution-client.git] /
1 # -*- coding: utf-8 -*-
2 # Copyright (C) 2001,2002 Python Software Foundation
3 # csv package unit tests
4
5 import array
6 import decimal
7 import os
8 import string
9 import sys
10 import tempfile
11 import unittest2 as unittest
12 from codecs import EncodedFile
13 from io import BytesIO
14
15 import unicodecsv as csv
16
try:
    # Python 2: make ``chr`` produce unicode characters.
    chr = unichr
except NameError:
    # ``unichr`` is not defined (Python 3); keep the builtin ``chr``.
    pass


# PyPy and CPython differ in which exception is raised in some
# circumstances, e.g. depending on whether a module is written in C or not.
py_compat_exc = (TypeError, AttributeError)
27
28
class Test_Csv(unittest.TestCase):
    """
    Test the underlying C csv parser in ways that are not appropriate
    from the high level interface. Further tests of this nature are done
    in TestDialectRegistry.
    """
    def _test_arg_valid(self, ctor, arg):
        """Assert that ``ctor`` rejects missing, unknown or invalid arguments.

        ``ctor`` is the reader/writer constructor under test and ``arg`` is
        the first positional argument passed to it.
        """
        self.assertRaises(py_compat_exc, ctor)
        self.assertRaises(py_compat_exc, ctor, None)
        self.assertRaises(py_compat_exc, ctor, arg, bad_attr=0)
        self.assertRaises(py_compat_exc, ctor, arg, delimiter=0)
        self.assertRaises(py_compat_exc, ctor, arg, delimiter='XX')
        self.assertRaises(csv.Error, ctor, arg, 'foo')
        self.assertRaises(py_compat_exc, ctor, arg, delimiter=None)
        self.assertRaises(py_compat_exc, ctor, arg, delimiter=1)
        self.assertRaises(py_compat_exc, ctor, arg, quotechar=1)
        self.assertRaises(py_compat_exc, ctor, arg, lineterminator=None)
        self.assertRaises(py_compat_exc, ctor, arg, lineterminator=1)
        self.assertRaises(py_compat_exc, ctor, arg, quoting=None)
        self.assertRaises(py_compat_exc, ctor, arg,
                          quoting=csv.QUOTE_ALL, quotechar='')
        self.assertRaises(py_compat_exc, ctor, arg,
                          quoting=csv.QUOTE_ALL, quotechar=None)

    def test_reader_arg_valid(self):
        """Run the argument validation checks against ``csv.reader``."""
        self._test_arg_valid(csv.reader, [])

    def test_writer_arg_valid(self):
        """Run the argument validation checks against ``csv.writer``."""
        self._test_arg_valid(csv.writer, BytesIO())

    def _test_default_attrs(self, ctor, *args):
        """Check the default dialect attributes of ``ctor(*args)``.

        Also checks that deleting or assigning dialect attributes raises.
        """
        obj = ctor(*args)
        # Check defaults
        self.assertEqual(obj.dialect.delimiter, ',')
        self.assertEqual(obj.dialect.doublequote, True)
        self.assertEqual(obj.dialect.escapechar, None)
        self.assertEqual(obj.dialect.lineterminator, "\r\n")
        self.assertEqual(obj.dialect.quotechar, '"')
        self.assertEqual(obj.dialect.quoting, csv.QUOTE_MINIMAL)
        self.assertEqual(obj.dialect.skipinitialspace, False)
        self.assertEqual(obj.dialect.strict, False)
        # Try deleting or changing attributes (they are read-only)
        self.assertRaises(py_compat_exc, delattr,
                          obj.dialect, 'delimiter')
        self.assertRaises(py_compat_exc, setattr,
                          obj.dialect, 'delimiter', ':')
        self.assertRaises(py_compat_exc, delattr,
                          obj.dialect, 'quoting')
        self.assertRaises(py_compat_exc, setattr,
                          obj.dialect, 'quoting', None)

    def test_reader_attrs(self):
        """Check default dialect attributes on a reader."""
        self._test_default_attrs(csv.reader, [])

    def test_writer_attrs(self):
        """Check default dialect attributes on a writer."""
        self._test_default_attrs(csv.writer, BytesIO())

    def _test_kw_attrs(self, ctor, *args):
        """Check that keyword options show up on the resulting dialect."""
        # Now try with alternate options
        kwargs = dict(delimiter=':', doublequote=False, escapechar='\\',
                      lineterminator='\r', quotechar='*',
                      quoting=csv.QUOTE_NONE, skipinitialspace=True,
                      strict=True)
        obj = ctor(*args, **kwargs)
        self.assertEqual(obj.dialect.delimiter, ':')
        self.assertEqual(obj.dialect.doublequote, False)
        self.assertEqual(obj.dialect.escapechar, '\\')
        self.assertEqual(obj.dialect.lineterminator, "\r")
        self.assertEqual(obj.dialect.quotechar, '*')
        self.assertEqual(obj.dialect.quoting, csv.QUOTE_NONE)
        self.assertEqual(obj.dialect.skipinitialspace, True)
        self.assertEqual(obj.dialect.strict, True)

    def test_reader_kw_attrs(self):
        """Check keyword-supplied dialect attributes on a reader."""
        self._test_kw_attrs(csv.reader, [])

    def test_writer_kw_attrs(self):
        """Check keyword-supplied dialect attributes on a writer."""
        self._test_kw_attrs(csv.writer, BytesIO())

    def _test_dialect_attrs(self, ctor, *args):
        """Check that options taken from a dialect class are applied."""
        # Now try with dialect-derived options
        class dialect:
            delimiter = '-'
            doublequote = False
            escapechar = '^'
            lineterminator = '$'
            quotechar = '#'
            quoting = csv.QUOTE_ALL
            skipinitialspace = True
            strict = False
        args = args + (dialect,)
        obj = ctor(*args)
        self.assertEqual(obj.dialect.delimiter, '-')
        self.assertEqual(obj.dialect.doublequote, False)
        self.assertEqual(obj.dialect.escapechar, '^')
        self.assertEqual(obj.dialect.lineterminator, "$")
        self.assertEqual(obj.dialect.quotechar, '#')
        self.assertEqual(obj.dialect.quoting, csv.QUOTE_ALL)
        self.assertEqual(obj.dialect.skipinitialspace, True)
        self.assertEqual(obj.dialect.strict, False)

    def test_reader_dialect_attrs(self):
        """Check dialect-class-supplied attributes on a reader."""
        self._test_dialect_attrs(csv.reader, [])

    def test_writer_dialect_attrs(self):
        """Check dialect-class-supplied attributes on a writer."""
        self._test_dialect_attrs(csv.writer, BytesIO())

    def _write_test(self, fields, expect, **kwargs):
        """Write ``fields`` as one row to a temp file and compare the bytes.

        ``expect`` is the expected output without the line terminator; the
        writer dialect's line terminator, UTF-8 encoded, is appended before
        comparing.  ``kwargs`` are forwarded to ``csv.writer``.  The
        temporary file is always closed and removed.
        """
        fd, name = tempfile.mkstemp()
        fileobj = os.fdopen(fd, "w+b")
        try:
            writer = csv.writer(fileobj, **kwargs)
            writer.writerow(fields)
            fileobj.seek(0)
            self.assertEqual(fileobj.read(),
                             expect + writer.dialect.lineterminator.encode('utf-8'))
        finally:
            fileobj.close()
            os.unlink(name)

    def test_write_arg_valid(self):
        """Check writerow() with invalid rows and error propagation."""
        # The expected exception for a ``None`` row differs on PyPy3.
        pypy3 = hasattr(sys, 'pypy_version_info') and sys.version_info.major == 3

        self.assertRaises(TypeError if pypy3 else csv.Error, self._write_test, None, '')
        self._write_test((), b'')
        self._write_test([None], b'""')
        self.assertRaises(csv.Error, self._write_test,
                          [None], None, quoting=csv.QUOTE_NONE)

        # Check that exceptions are passed up the chain
        class BadList:
            def __len__(self):
                return 10

            def __getitem__(self, i):
                if i > 2:
                    raise IOError

        self.assertRaises(IOError, self._write_test, BadList(), '')

        class BadItem:
            def __str__(self):
                raise IOError

        self.assertRaises(IOError, self._write_test, [BadItem()], '')

    def test_write_bigfield(self):
        """Write two 50000-character fields in one row."""
        # This exercises the buffer realloc functionality
        bigstring = 'X' * 50000
        self._write_test([bigstring, bigstring],
                         b','.join([bigstring.encode('utf-8')] * 2))

    def test_write_quoting(self):
        """Check writer output for each quoting mode."""
        self._write_test(['a', 1, 'p,q'], b'a,1,"p,q"')
        self.assertRaises(csv.Error,
                          self._write_test,
                          ['a', 1, 'p,q'], b'a,1,p,q',
                          quoting=csv.QUOTE_NONE)
        self._write_test(['a', 1, 'p,q'], b'a,1,"p,q"',
                         quoting=csv.QUOTE_MINIMAL)
        self._write_test(['a', 1, 'p,q'], b'"a",1,"p,q"',
                         quoting=csv.QUOTE_NONNUMERIC)
        self._write_test(['a', 1, 'p,q'], b'"a","1","p,q"',
                         quoting=csv.QUOTE_ALL)
        self._write_test(['a\nb', 1], b'"a\nb","1"',
                         quoting=csv.QUOTE_ALL)

    def test_write_decimal(self):
        """Check that a Decimal is written unquoted under QUOTE_NONNUMERIC."""
        self._write_test(['a', decimal.Decimal("1.1"), 'p,q'], b'"a",1.1,"p,q"',
                         quoting=csv.QUOTE_NONNUMERIC)

    def test_write_escape(self):
        """Check writer output for escapechar/doublequote combinations."""
        self._write_test(['a', 1, 'p,q'], b'a,1,"p,q"',
                         escapechar='\\')
        self.assertRaises(csv.Error,
                          self._write_test,
                          ['a', 1, 'p,"q"'], b'a,1,"p,\\"q\\""',
                          escapechar=None, doublequote=False)
        self._write_test(['a', 1, 'p,"q"'], b'a,1,"p,\\"q\\""',
                         escapechar='\\', doublequote=False)
        self._write_test(['"'], b'""""',
                         escapechar='\\', quoting=csv.QUOTE_MINIMAL)
        self._write_test(['"'], b'\\"',
                         escapechar='\\', quoting=csv.QUOTE_MINIMAL,
                         doublequote=False)
        self._write_test(['"'], b'\\"',
                         escapechar='\\', quoting=csv.QUOTE_NONE)
        self._write_test(['a', 1, 'p,q'], b'a,1,p\\,q',
                         escapechar='\\', quoting=csv.QUOTE_NONE)

    def test_writerows(self):
        """Check writerows() error propagation and normal output."""
        class BrokenFile:
            def write(self, buf):
                raise IOError

        writer = csv.writer(BrokenFile())
        self.assertRaises(IOError, writer.writerows, [['a']])

        fd, name = tempfile.mkstemp()
        fileobj = os.fdopen(fd, "w+b")
        try:
            writer = csv.writer(fileobj)
            self.assertRaises(TypeError, writer.writerows, None)
            writer.writerows([['a', 'b'], ['c', 'd']])
            fileobj.seek(0)
            self.assertEqual(fileobj.read(), b"a,b\r\nc,d\r\n")
        finally:
            fileobj.close()
            os.unlink(name)

    def _read_test(self, input, expect, **kwargs):
        """Parse ``input`` with ``csv.reader`` and compare all rows.

        ``kwargs`` are forwarded to ``csv.reader``; ``expect`` is the full
        list of rows the reader should produce.
        """
        reader = csv.reader(input, **kwargs)
        result = list(reader)
        self.assertEqual(result, expect)

    def test_read_oddinputs(self):
        """Check reader behaviour on empty, malformed and NUL inputs."""
        self._read_test([], [])
        self._read_test([b''], [[]])
        self.assertRaises(csv.Error, self._read_test,
                          [b'"ab"c'], None, strict=1)
        # cannot handle null bytes for the moment
        self.assertRaises(csv.Error, self._read_test,
                          [b'ab\0c'], None, strict=1)
        self._read_test([b'"ab"c'], [['abc']], doublequote=0)

    def test_read_eol(self):
        """Check line-ending handling, including embedded line breaks."""
        self._read_test([b'a,b'], [['a', 'b']])
        self._read_test([b'a,b\n'], [['a', 'b']])
        self._read_test([b'a,b\r\n'], [['a', 'b']])
        self._read_test([b'a,b\r'], [['a', 'b']])
        self.assertRaises(csv.Error, self._read_test, [b'a,b\rc,d'], [])
        self.assertRaises(csv.Error, self._read_test, [b'a,b\nc,d'], [])
        self.assertRaises(csv.Error, self._read_test, [b'a,b\r\nc,d'], [])

    def test_read_escape(self):
        """Check reader handling of the escape character."""
        self._read_test([b'a,\\b,c'], [['a', 'b', 'c']], escapechar='\\')
        self._read_test([b'a,b\\,c'], [['a', 'b,c']], escapechar='\\')
        self._read_test([b'a,"b\\,c"'], [['a', 'b,c']], escapechar='\\')
        self._read_test([b'a,"b,\\c"'], [['a', 'b,c']], escapechar='\\')
        self._read_test([b'a,"b,c\\""'], [['a', 'b,c"']], escapechar='\\')
        self._read_test([b'a,"b,c"\\'], [['a', 'b,c\\']], escapechar='\\')

    def test_read_quoting(self):
        """Check reader output for the different quoting settings."""
        self._read_test([b'1,",3,",5'], [['1', ',3,', '5']])
        self._read_test([b'1,",3,",5'], [['1', '"', '3', '"', '5']],
                        quotechar=None, escapechar='\\')
        self._read_test([b'1,",3,",5'], [['1', '"', '3', '"', '5']],
                        quoting=csv.QUOTE_NONE, escapechar='\\')
        # will this fail where locale uses comma for decimals?
        self._read_test([b',3,"5",7.3, 9'], [['', 3, '5', 7.3, 9]],
                        quoting=csv.QUOTE_NONNUMERIC)
        self._read_test([b'"a\nb", 7'], [['a\nb', ' 7']])
        self.assertRaises(ValueError, self._read_test,
                          [b'abc,3'], [[]],
                          quoting=csv.QUOTE_NONNUMERIC)

    def test_read_linenum(self):
        """Check that ``line_num`` advances by one per row read."""
        for r in (csv.reader([b'line,1', b'line,2', b'line,3']),
                  csv.DictReader([b'line,1', b'line,2', b'line,3'],
                                 fieldnames=['a', 'b', 'c'])):
            self.assertEqual(r.line_num, 0)
            next(r)
            self.assertEqual(r.line_num, 1)
            next(r)
            self.assertEqual(r.line_num, 2)
            next(r)
            self.assertEqual(r.line_num, 3)
            self.assertRaises(StopIteration, next, r)
            self.assertEqual(r.line_num, 3)

    def test_roundtrip_quoteed_newlines(self):
        """Write rows with embedded newlines and read them back unchanged."""
        fd, name = tempfile.mkstemp()
        fileobj = os.fdopen(fd, "w+b")
        try:
            writer = csv.writer(fileobj)
            self.assertRaises(TypeError, writer.writerows, None)
            rows = [['a\nb', 'b'], ['c', 'x\r\nd']]
            writer.writerows(rows)
            fileobj.seek(0)
            # Compare the complete result so that missing rows are detected
            # instead of silently skipping the per-row comparison.
            self.assertEqual(list(csv.reader(fileobj)), rows)
        finally:
            fileobj.close()
            os.unlink(name)
314
315
class TestDialectRegistry(unittest.TestCase):
    """Tests for registering, looking up and applying csv dialects."""

    def test_registry_badargs(self):
        """Check that the registry functions reject invalid arguments."""
        self.assertRaises(TypeError, csv.list_dialects, None)
        self.assertRaises(TypeError, csv.get_dialect)
        self.assertRaises(csv.Error, csv.get_dialect, None)
        self.assertRaises(csv.Error, csv.get_dialect, "nonesuch")
        self.assertRaises(TypeError, csv.unregister_dialect)
        self.assertRaises(csv.Error, csv.unregister_dialect, None)
        self.assertRaises(csv.Error, csv.unregister_dialect, "nonesuch")
        self.assertRaises(TypeError, csv.register_dialect, None)
        self.assertRaises(TypeError, csv.register_dialect, None, None)
        self.assertRaises(TypeError, csv.register_dialect, "nonesuch", 0, 0)
        self.assertRaises(TypeError, csv.register_dialect, "nonesuch",
                          badargument=None)
        self.assertRaises(TypeError, csv.register_dialect, "nonesuch",
                          quoting=None)
        self.assertRaises(TypeError, csv.register_dialect, [])

    def test_registry(self):
        """Register a dialect class and check it is listed and retrievable."""
        class myexceltsv(csv.excel):
            delimiter = "\t"
        name = "myexceltsv"
        expected_dialects = csv.list_dialects() + [name]
        expected_dialects.sort()
        csv.register_dialect(name, myexceltsv)
        try:
            self.assertEqual(csv.get_dialect(name).delimiter, '\t')
            got_dialects = csv.list_dialects()
            got_dialects.sort()
            self.assertEqual(expected_dialects, got_dialects)
        finally:
            csv.unregister_dialect(name)

    def test_register_kwargs(self):
        """Register a dialect from keyword arguments and read with it."""
        name = 'fedcba'
        csv.register_dialect(name, delimiter=';')
        try:
            # Assert the registered value itself; "not a tab" would also
            # hold for any wrong delimiter.
            self.assertEqual(csv.get_dialect(name).delimiter, ';')
            self.assertEqual(list(csv.reader([b'X;Y;Z'], name)), [[u'X', u'Y', u'Z']])
        finally:
            csv.unregister_dialect(name)

    def test_incomplete_dialect(self):
        """Instantiating a Dialect subclass that only sets delimiter fails."""
        class myexceltsv(csv.Dialect):
            delimiter = "\t"
        self.assertRaises(csv.Error, myexceltsv)

    def test_space_dialect(self):
        """Read space-delimited rows using an unquoted, escaped dialect."""
        class space(csv.excel):
            delimiter = " "
            quoting = csv.QUOTE_NONE
            escapechar = "\\"

        fd, name = tempfile.mkstemp()
        fileobj = os.fdopen(fd, "w+b")
        try:
            fileobj.write(b"abc def\nc1ccccc1 benzene\n")
            fileobj.seek(0)
            rdr = csv.reader(fileobj, dialect=space())
            self.assertEqual(next(rdr), ["abc", "def"])
            self.assertEqual(next(rdr), ["c1ccccc1", "benzene"])
        finally:
            fileobj.close()
            os.unlink(name)

    def _assert_row_written(self, expected, *args, **kwargs):
        """Write ``[1, 2, 3]`` to a temp file and compare the raw bytes.

        ``args`` and ``kwargs`` are passed to ``csv.writer`` after the file
        object.  The temporary file is always closed and removed.
        """
        fd, name = tempfile.mkstemp()
        fileobj = os.fdopen(fd, "w+b")
        try:
            writer = csv.writer(fileobj, *args, **kwargs)
            writer.writerow([1, 2, 3])
            fileobj.seek(0)
            self.assertEqual(fileobj.read(), expected)
        finally:
            fileobj.close()
            os.unlink(name)

    def test_dialect_apply(self):
        """Check the different ways of passing a dialect to ``csv.writer``."""
        class testA(csv.excel):
            delimiter = "\t"

        class testB(csv.excel):
            delimiter = ":"

        class testC(csv.excel):
            delimiter = "|"

        csv.register_dialect('testC', testC)
        try:
            # No dialect given.
            self._assert_row_written(b"1,2,3\r\n")
            # Dialect class passed positionally.
            self._assert_row_written(b"1\t2\t3\r\n", testA)
            # Dialect instance passed by keyword.
            self._assert_row_written(b"1:2:3\r\n", dialect=testB())
            # Registered dialect name.
            self._assert_row_written(b"1|2|3\r\n", dialect='testC')
            # Explicit keyword overrides the dialect's delimiter.
            self._assert_row_written(b"1;2;3\r\n", dialect=testA,
                                     delimiter=';')
        finally:
            csv.unregister_dialect('testC')

    def test_bad_dialect(self):
        """Check that unknown or invalid dialect parameters raise TypeError."""
        # Unknown parameter
        self.assertRaises(TypeError, csv.reader, [], bad_attr=0)
        # Bad values
        self.assertRaises(TypeError, csv.reader, [], delimiter=None)
        self.assertRaises(TypeError, csv.reader, [], quoting=-1)
        self.assertRaises(TypeError, csv.reader, [], quoting=100)
458
459
class TestCsvBase(unittest.TestCase):
    """Shared helpers for dialect tests; subclasses supply ``dialect``."""

    def readerAssertEqual(self, input, expected_result):
        """Store ``input`` in a temp file, parse it, and compare the rows."""
        handle, path = tempfile.mkstemp()
        stream = os.fdopen(handle, "w+b")
        try:
            stream.write(input)
            stream.seek(0)
            parsed = list(csv.reader(stream, dialect=self.dialect))
            self.assertEqual(parsed, expected_result)
        finally:
            stream.close()
            os.unlink(path)

    def writerAssertEqual(self, input, expected_result):
        """Write the rows in ``input`` to a temp file and compare the bytes."""
        handle, path = tempfile.mkstemp()
        stream = os.fdopen(handle, "w+b")
        try:
            csv.writer(stream, dialect=self.dialect).writerows(input)
            stream.seek(0)
            written = stream.read()
            self.assertEqual(written, expected_result)
        finally:
            stream.close()
            os.unlink(path)
485
486
class TestDialectExcel(TestCsvBase):
    """Reader and writer checks for the registered 'excel' dialect."""

    dialect = 'excel'

    # ---- reader ----

    def test_single(self):
        """A single unquoted field."""
        self.readerAssertEqual(b'abc', [['abc']])

    def test_simple(self):
        """Several unquoted fields."""
        expected = [['1', '2', '3', '4', '5']]
        self.readerAssertEqual(b'1,2,3,4,5', expected)

    def test_blankline(self):
        """Empty input yields no rows."""
        self.readerAssertEqual(b'', [])

    def test_empty_fields(self):
        """A lone delimiter yields two empty fields."""
        self.readerAssertEqual(b',', [['', '']])

    def test_singlequoted(self):
        """An empty quoted field."""
        self.readerAssertEqual(b'""', [['']])

    def test_singlequoted_left_empty(self):
        """An empty quoted field followed by an empty field."""
        self.readerAssertEqual(b'"",', [['', '']])

    def test_singlequoted_right_empty(self):
        """An empty field followed by an empty quoted field."""
        self.readerAssertEqual(b',""', [['', '']])

    def test_single_quoted_quote(self):
        """A quoted field holding one doubled quote."""
        self.readerAssertEqual(b'""""', [['"']])

    def test_quoted_quotes(self):
        """A quoted field holding two doubled quotes."""
        self.readerAssertEqual(b'""""""', [['""']])

    def test_inline_quote(self):
        """Doubled quotes inside an unquoted field are kept as-is."""
        self.readerAssertEqual(b'a""b', [['a""b']])

    def test_inline_quotes(self):
        """Quotes inside an unquoted field are kept as-is."""
        self.readerAssertEqual(b'a"b"c', [['a"b"c']])

    def test_quotes_and_more(self):
        """Text trailing a closed quoted field is appended to it."""
        # Excel would never write a field containing '"a"b', but when
        # reading one, it will return 'ab'.
        self.readerAssertEqual(b'"a"b', [['ab']])

    def test_lone_quote(self):
        """A single quote inside an unquoted field is kept as-is."""
        self.readerAssertEqual(b'a"b', [['a"b']])

    def test_quote_and_quote(self):
        """A second quoted run after a closed quoted field is kept literally."""
        # Excel would never write a field containing '"a" "b"', but when
        # reading one, it will return 'a "b"'.
        self.readerAssertEqual(b'"a" "b"', [['a "b"']])

    def test_space_and_quote(self):
        """A quote after a leading space does not start a quoted field."""
        self.readerAssertEqual(b' "a"', [[' "a"']])

    def test_quoted(self):
        """A quoted field containing the delimiter."""
        expected = [['1', '2', '3', 'I think, therefore I am', '5', '6']]
        self.readerAssertEqual(b'1,2,3,"I think, therefore I am",5,6',
                               expected)

    def test_quoted_quote(self):
        """Quoted fields containing doubled quotes and delimiters."""
        raw = b'1,2,3,"""I see,"" said the blind man","as he picked up his hammer and saw"'
        expected = [[
            '1', '2', '3',
            '"I see," said the blind man',
            'as he picked up his hammer and saw',
        ]]
        self.readerAssertEqual(raw, expected)

    def test_quoted_nl(self):
        """Quoted fields spanning several physical lines."""
        raw = (b'1,2,3,"""I see,""\n'
               b'said the blind man","as he picked up his\n'
               b'hammer and saw"\n'
               b'9,8,7,6')
        first_row = ['1', '2', '3',
                     '"I see,"\nsaid the blind man',
                     'as he picked up his\nhammer and saw']
        second_row = ['9', '8', '7', '6']
        self.readerAssertEqual(raw, [first_row, second_row])

    def test_dubious_quote(self):
        """A quote at the end of an unquoted field is kept as-is."""
        expected = [['12', '12', '1"', '']]
        self.readerAssertEqual(b'12,12,1",', expected)

    # ---- writer ----

    def test_null(self):
        """Writing no rows produces no output."""
        self.writerAssertEqual([], b'')

    def test_single_writer(self):
        """Writing one single-field row."""
        self.writerAssertEqual([['abc']], b'abc\r\n')

    def test_simple_writer(self):
        """Writing mixed ints and strings without quoting."""
        rows = [[1, 2, 'abc', 3, 4]]
        self.writerAssertEqual(rows, b'1,2,abc,3,4\r\n')

    def test_quotes(self):
        """Embedded quotes are doubled and the field is quoted."""
        rows = [[1, 2, 'a"bc"', 3, 4]]
        self.writerAssertEqual(rows, b'1,2,"a""bc""",3,4\r\n')

    def test_quote_fieldsep(self):
        """A field containing the delimiter is quoted."""
        rows = [['abc,def']]
        self.writerAssertEqual(rows, b'"abc,def"\r\n')

    def test_newlines(self):
        """A field containing a newline is quoted."""
        rows = [[1, 2, 'a\nbc', 3, 4]]
        self.writerAssertEqual(rows, b'1,2,"a\nbc",3,4\r\n')
588
589
class EscapedExcel(csv.excel):
    """Excel dialect with quoting disabled and backslash as escape character."""

    escapechar = '\\'
    quoting = csv.QUOTE_NONE
593
594
class TestEscapedExcel(TestCsvBase):
    """Reader and writer checks for the EscapedExcel dialect."""

    dialect = EscapedExcel()

    def test_escape_fieldsep(self):
        """The delimiter inside a field is escaped on output."""
        rows = [['abc,def']]
        self.writerAssertEqual(rows, b'abc\\,def\r\n')

    def test_read_escape_fieldsep(self):
        """An escaped delimiter is read back as part of the field."""
        expected = [['abc,def']]
        self.readerAssertEqual(b'abc\\,def\r\n', expected)
603
604
class QuotedEscapedExcel(csv.excel):
    """Excel dialect using QUOTE_NONNUMERIC and backslash as escape character."""

    escapechar = '\\'
    quoting = csv.QUOTE_NONNUMERIC
608
609
class TestQuotedEscapedExcel(TestCsvBase):
    """Reader and writer checks for the QuotedEscapedExcel dialect."""

    dialect = QuotedEscapedExcel()

    def test_write_escape_fieldsep(self):
        """A string field containing the delimiter is quoted on output."""
        rows = [['abc,def']]
        self.writerAssertEqual(rows, b'"abc,def"\r\n')

    def test_read_escape_fieldsep(self):
        """An escaped delimiter inside a quoted field is read as the delimiter."""
        expected = [['abc,def']]
        self.readerAssertEqual(b'"abc\\,def"\r\n', expected)
618
619
class TestDictFields(unittest.TestCase):
    """Tests for ``csv.DictReader`` and ``csv.DictWriter``."""

    # "long" means the row is longer than the number of fieldnames
    # "short" means there are fewer elements in the row than fieldnames
    def test_write_simple_dict(self):
        """Write a header and one row with a missing key."""
        fd, name = tempfile.mkstemp()
        # Wrap the descriptor returned by mkstemp() instead of reopening the
        # file by name, so the descriptor is closed with the file object.
        fileobj = os.fdopen(fd, "w+b")
        try:
            writer = csv.DictWriter(fileobj, fieldnames=["f1", "f2", "f3"])
            writer.writeheader()
            fileobj.seek(0)
            self.assertEqual(fileobj.readline(), b"f1,f2,f3\r\n")
            writer.writerow({"f1": 10, "f3": "abc"})
            fileobj.seek(0)
            fileobj.readline()  # header
            self.assertEqual(fileobj.read(), b"10,,abc\r\n")
        finally:
            fileobj.close()
            os.unlink(name)

    def test_write_unicode_header_dict(self):
        """Write a header made of non-ASCII field names."""
        fd, name = tempfile.mkstemp()
        # Wrap the descriptor returned by mkstemp() instead of reopening the
        # file by name, so the descriptor is closed with the file object.
        fileobj = os.fdopen(fd, "w+b")
        try:
            writer = csv.DictWriter(fileobj, fieldnames=[u"ñ", u"ö"])
            writer.writeheader()
            fileobj.seek(0)
            self.assertEqual(fileobj.readline().decode('utf-8'), u"ñ,ö\r\n")
        finally:
            fileobj.close()
            os.unlink(name)

    def test_write_no_fields(self):
        """DictWriter without fieldnames raises TypeError."""
        fileobj = BytesIO()
        self.assertRaises(TypeError, csv.DictWriter, fileobj)

    def test_read_dict_fields(self):
        """Read one row using explicit fieldnames."""
        fd, name = tempfile.mkstemp()
        fileobj = os.fdopen(fd, "w+b")
        try:
            fileobj.write(b"1,2,abc\r\n")
            fileobj.seek(0)
            reader = csv.DictReader(fileobj,
                                    fieldnames=["f1", "f2", "f3"])
            self.assertEqual(next(reader),
                             {"f1": '1', "f2": '2', "f3": 'abc'})
        finally:
            fileobj.close()
            os.unlink(name)

    def test_read_dict_no_fieldnames(self):
        """Fieldnames are taken from the first row when not supplied."""
        fd, name = tempfile.mkstemp()
        fileobj = os.fdopen(fd, "w+b")
        try:
            fileobj.write(b"f1,f2,f3\r\n1,2,abc\r\n")
            fileobj.seek(0)
            reader = csv.DictReader(fileobj)
            self.assertEqual(reader.fieldnames,
                             ["f1", "f2", "f3"])
            self.assertEqual(next(reader),
                             {"f1": '1', "f2": '2', "f3": 'abc'})
        finally:
            fileobj.close()
            os.unlink(name)

    # Two test cases to make sure existing ways of implicitly setting
    # fieldnames continue to work.  Both arise from discussion in issue3436.
    def test_read_dict_fieldnames_from_file(self):
        """Fieldnames read from the same file via a plain reader."""
        fd, name = tempfile.mkstemp()
        f = os.fdopen(fd, "w+b")
        try:
            f.write(b"f1,f2,f3\r\n1,2,abc\r\n")
            f.seek(0)
            reader = csv.DictReader(f, fieldnames=next(csv.reader(f)))
            self.assertEqual(reader.fieldnames,
                             ["f1", "f2", "f3"])
            self.assertEqual(next(reader),
                             {"f1": '1', "f2": '2', "f3": 'abc'})
        finally:
            f.close()
            os.unlink(name)

    def test_read_dict_fieldnames_chain(self):
        """Fieldnames stay available when the reader is chained after next()."""
        import itertools
        fd, name = tempfile.mkstemp()
        f = os.fdopen(fd, "w+b")
        try:
            f.write(b"f1,f2,f3\r\n1,2,abc\r\n")
            f.seek(0)
            reader = csv.DictReader(f)
            first = next(reader)
            for row in itertools.chain([first], reader):
                self.assertEqual(reader.fieldnames, ["f1", "f2", "f3"])
                self.assertEqual(row, {"f1": '1', "f2": '2', "f3": 'abc'})
        finally:
            f.close()
            os.unlink(name)

    def test_read_long(self):
        """Extra values are collected under the ``None`` key by default."""
        fd, name = tempfile.mkstemp()
        fileobj = os.fdopen(fd, "w+b")
        try:
            fileobj.write(b"1,2,abc,4,5,6\r\n")
            fileobj.seek(0)
            reader = csv.DictReader(fileobj,
                                    fieldnames=["f1", "f2"])
            self.assertEqual(next(reader), {"f1": '1', "f2": '2',
                                            None: ["abc", "4", "5", "6"]})
        finally:
            fileobj.close()
            os.unlink(name)

    def test_read_long_with_rest(self):
        """Extra values are collected under ``restkey`` when given."""
        fd, name = tempfile.mkstemp()
        fileobj = os.fdopen(fd, "w+b")
        try:
            fileobj.write(b"1,2,abc,4,5,6\r\n")
            fileobj.seek(0)
            reader = csv.DictReader(fileobj,
                                    fieldnames=["f1", "f2"], restkey="_rest")
            self.assertEqual(next(reader), {"f1": '1', "f2": '2',
                                            "_rest": ["abc", "4", "5", "6"]})
        finally:
            fileobj.close()
            os.unlink(name)

    def test_read_long_with_rest_no_fieldnames(self):
        """``restkey`` also applies when fieldnames come from the file."""
        fd, name = tempfile.mkstemp()
        fileobj = os.fdopen(fd, "w+b")
        try:
            fileobj.write(b"f1,f2\r\n1,2,abc,4,5,6\r\n")
            fileobj.seek(0)
            reader = csv.DictReader(fileobj, restkey="_rest")
            self.assertEqual(reader.fieldnames, ["f1", "f2"])
            self.assertEqual(next(reader), {"f1": '1', "f2": '2',
                                            "_rest": ["abc", "4", "5", "6"]})
        finally:
            fileobj.close()
            os.unlink(name)

    def test_read_short(self):
        """Missing values are filled with ``restval``."""
        fd, name = tempfile.mkstemp()
        fileobj = os.fdopen(fd, "w+b")
        try:
            fileobj.write(b"1,2,abc,4,5,6\r\n1,2,abc\r\n")
            fileobj.seek(0)
            reader = csv.DictReader(fileobj,
                                    fieldnames="1 2 3 4 5 6".split(),
                                    restval="DEFAULT")
            self.assertEqual(next(reader), {"1": '1', "2": '2', "3": 'abc',
                                            "4": '4', "5": '5', "6": '6'})
            self.assertEqual(next(reader), {"1": '1', "2": '2', "3": 'abc',
                                            "4": 'DEFAULT', "5": 'DEFAULT',
                                            "6": 'DEFAULT'})
        finally:
            fileobj.close()
            os.unlink(name)

    def test_read_multi(self):
        """Read the first of several rows from a list of byte strings."""
        sample = [
            b'2147483648,43.0e12,17,abc,def\r\n',
            b'147483648,43.0e2,17,abc,def\r\n',
            b'47483648,43.0,170,abc,def\r\n'
            ]

        reader = csv.DictReader(sample,
                                fieldnames="i1 float i2 s1 s2".split())
        self.assertEqual(next(reader), {"i1": '2147483648',
                                        "float": '43.0e12',
                                        "i2": '17',
                                        "s1": 'abc',
                                        "s2": 'def'})

    def test_read_with_blanks(self):
        """A blank line between rows is skipped."""
        reader = csv.DictReader([b"1,2,abc,4,5,6\r\n", b"\r\n",
                                 b"1,2,abc,4,5,6\r\n"],
                                fieldnames="1 2 3 4 5 6".split())
        self.assertEqual(next(reader), {"1": '1', "2": '2', "3": 'abc',
                                        "4": '4', "5": '5', "6": '6'})
        self.assertEqual(next(reader), {"1": '1', "2": '2', "3": 'abc',
                                        "4": '4', "5": '5', "6": '6'})

    def test_read_semi_sep(self):
        """Read a semicolon-delimited row via the ``delimiter`` keyword."""
        reader = csv.DictReader([b"1;2;abc;4;5;6\r\n"],
                                fieldnames="1 2 3 4 5 6".split(),
                                delimiter=';')
        self.assertEqual(next(reader), {"1": '1', "2": '2', "3": 'abc',
                                        "4": '4', "5": '5', "6": '6'})

    def test_empty_file(self):
        """Constructing a DictReader over an empty stream must not raise."""
        csv.DictReader(BytesIO())
810
class TestArrayWrites(unittest.TestCase):
    """Write ``array.array`` objects and a plain string with ``writerow``.

    Each test hands a non-list sequence to an excel-dialect writer backed by
    a temporary file and compares the bytes that end up in the file with a
    comma-joined, CRLF-terminated rendering of the sequence's items.
    """

    def _written_bytes(self, row):
        """Write *row* as one excel-dialect record and return the file bytes.

        A temporary file is created for the writer, read back from the
        start after the row is written, and then closed and removed.
        """
        handle, path = tempfile.mkstemp()
        tmp = os.fdopen(handle, "w+b")
        try:
            writer = csv.writer(tmp, dialect="excel")
            writer.writerow(row)
            # Rewind so read() returns everything the writer produced.
            tmp.seek(0)
            return tmp.read()
        finally:
            tmp.close()
            os.unlink(path)

    def test_int_write(self):
        """An array of C ints is written as its decimal item strings."""
        values = array.array('i', [(20-i) for i in range(20)])
        written = self._written_bytes(values)
        expected = b",".join(
            [str(item).encode('utf-8') for item in values]) + b"\r\n"
        self.assertEqual(written, expected)

    def test_double_write(self):
        """An array of typecode ``'d'`` is written item by item as text."""
        values = array.array('d', [(20-i)*0.1 for i in range(20)])
        written = self._written_bytes(values)
        # The expected float text is str() before 2.7.3 and repr() from
        # 2.7.3 on.
        to_text = repr if sys.version_info >= (2, 7, 3) else str
        expected = b",".join(
            [to_text(item).encode('utf-8') for item in values]) + b"\r\n"
        self.assertEqual(written, expected)

    def test_float_write(self):
        """An array of typecode ``'f'`` is written item by item as text."""
        values = array.array('f', [(20-i)*0.1 for i in range(20)])
        written = self._written_bytes(values)
        # The expected float text is str() before 2.7.3 and repr() from
        # 2.7.3 on.
        to_text = repr if sys.version_info >= (2, 7, 3) else str
        expected = b",".join(
            [to_text(item).encode('utf-8') for item in values]) + b"\r\n"
        self.assertEqual(written, expected)

    def test_char_write(self):
        """A string passed as the row is written one character per field."""
        letters = string.ascii_letters
        written = self._written_bytes(letters)
        expected = ",".join(letters).encode('utf-8') + b"\r\n"
        self.assertEqual(written, expected)
877
878
class TestUnicode(unittest.TestCase):
    """Reading non-ASCII text through an explicitly encoded byte stream."""

    def test_unicode_read(self):
        """ISO-8859-1 encoded names are expected back as unicode strings.

        One comma-separated record of four names is encoded to ISO-8859-1,
        wrapped in an ``EncodedFile``, and read with ``encoding='iso-8859-1'``;
        the reader must yield exactly one row holding the original names.
        """
        names = [
            u"Martin von Löwis",
            u"Marc André Lemburg",
            u"Guido van Rossum",
            u"François Pinard",
        ]
        record = u",".join(names) + u"\r\n"
        stream = EncodedFile(BytesIO(record.encode('iso-8859-1')),
                             data_encoding='iso-8859-1')
        reader = csv.reader(stream, encoding='iso-8859-1')
        rows = list(reader)
        self.assertEqual(rows, [names])
891
892
class TestUnicodeErrors(unittest.TestCase):
    """Check that the ``errors`` argument is obeyed by readers and writers."""

    def test_encode_error(self):
        """A writer with ``errors='xmlcharrefreplace'`` emits a char reference.

        The second field is a single character that the cp1252 target
        encoding cannot represent, so the handler is expected to replace it
        with its decimal numeric character reference.
        """
        out = BytesIO()
        writer = csv.writer(out, encoding='cp1252', errors='xmlcharrefreplace')
        codepoint = 2603
        writer.writerow(['hello', chr(codepoint)])
        # Build the expected bytes from the code point instead of embedding
        # the replacement text literally: the output is pure ASCII (ampersand,
        # hash, decimal code point, semicolon), and a bytes literal must not
        # contain the raw non-ASCII character itself.
        expected = ('hello,&#%d;\r\n' % codepoint).encode('ascii')
        self.assertEqual(out.getvalue(), expected)

    def test_encode_error_dictwriter(self):
        """``DictWriter`` passes ``encoding`` and ``errors`` on to the writer.

        Same check as ``test_encode_error`` but through the dict-based
        interface with a single column.
        """
        out = BytesIO()
        dict_writer = csv.DictWriter(out, ['col1'],
                                     encoding='cp1252',
                                     errors='xmlcharrefreplace')
        codepoint = 2604
        dict_writer.writerow({'col1': chr(codepoint)})
        # Expected output is the ASCII decimal character reference for the
        # code point, followed by the line terminator.
        expected = ('&#%d;\r\n' % codepoint).encode('ascii')
        self.assertEqual(out.getvalue(), expected)

    def test_decode_error(self):
        """Make sure the specified error-handling mode is obeyed on readers."""
        # The data is ISO-8859-1 but the reader decodes it as ASCII with
        # errors='ignore', so the non-ASCII letter is expected to be dropped.
        stream = EncodedFile(BytesIO(u'Löwis,2,3'.encode('iso-8859-1')),
                             data_encoding='iso-8859-1')
        reader = csv.reader(stream, encoding='ascii', errors='ignore')
        first_row = list(reader)[0]
        self.assertEqual(first_row[0], 'Lwis')

    def test_decode_error_dictreader(self):
        """Make sure the error-handling mode is obeyed on DictReaders."""
        # Header row plus one data row; the undecodable letter in the
        # "name" column is expected to be dropped under errors='ignore'.
        payload = u'name,height,weight\nLöwis,2,3'.encode('iso-8859-1')
        stream = EncodedFile(BytesIO(payload), data_encoding='iso-8859-1')
        reader = csv.DictReader(stream, encoding='ascii', errors='ignore')
        first_row = list(reader)[0]
        self.assertEqual(first_row['name'], 'Lwis')