1 # -*- coding: utf-8 -*-
2 # Copyright (C) 2001,2002 Python Software Foundation
3 # csv package unit tests
11 import unittest2 as unittest
12 from codecs import EncodedFile
13 from io import BytesIO
15 import unicodecsv as csv
# PyPy and CPython differ in which exception is raised in some
# circumstances, e.g. depending on whether a module is written in C.
26 py_compat_exc = (TypeError, AttributeError)
29 class Test_Csv(unittest.TestCase):
31 Test the underlying C csv parser in ways that are not appropriate
32 from the high level interface. Further tests of this nature are done
33 in TestDialectRegistry.
def _test_arg_valid(self, ctor, arg):
    """Check that *ctor* rejects missing, unknown and malformed arguments.

    *ctor* is the callable under test and *arg* a valid first positional
    argument for it.
    """
    expect_fail = self.assertRaises

    # No argument at all, then an unusable one.
    expect_fail(py_compat_exc, ctor)
    expect_fail(py_compat_exc, ctor, None)

    for bad in ({'bad_attr': 0}, {'delimiter': 0}, {'delimiter': 'XX'}):
        expect_fail(py_compat_exc, ctor, arg, **bad)

    # A second positional argument of 'foo' must surface as csv.Error.
    expect_fail(csv.Error, ctor, arg, 'foo')

    remaining = (
        {'delimiter': None},
        {'delimiter': 1},
        {'quotechar': 1},
        {'lineterminator': None},
        {'lineterminator': 1},
        {'quoting': None},
        {'quoting': csv.QUOTE_ALL, 'quotechar': ''},
        {'quoting': csv.QUOTE_ALL, 'quotechar': None},
    )
    for bad in remaining:
        expect_fail(py_compat_exc, ctor, arg, **bad)
def test_reader_arg_valid(self):
    """Run the shared argument-validation checks against ``csv.reader``."""
    source = []
    self._test_arg_valid(csv.reader, source)
def test_writer_arg_valid(self):
    """Run the shared argument-validation checks against ``csv.writer``."""
    sink = BytesIO()
    self._test_arg_valid(csv.writer, sink)
def _test_default_attrs(self, ctor, *args):
    """Check the default dialect attributes of the object ``ctor(*args)``.

    Also checks that ``delimiter`` and ``quoting`` on the dialect can be
    neither deleted nor reassigned.
    """
    # Build the object under test; every assertion below inspects it.
    obj = ctor(*args)

    self.assertEqual(obj.dialect.delimiter, ',')
    self.assertEqual(obj.dialect.doublequote, True)
    self.assertEqual(obj.dialect.escapechar, None)
    self.assertEqual(obj.dialect.lineterminator, "\r\n")
    self.assertEqual(obj.dialect.quotechar, '"')
    self.assertEqual(obj.dialect.quoting, csv.QUOTE_MINIMAL)
    self.assertEqual(obj.dialect.skipinitialspace, False)
    self.assertEqual(obj.dialect.strict, False)

    # Try deleting or changing attributes (they are read-only)
    self.assertRaises(py_compat_exc, delattr,
                      obj.dialect, 'delimiter')
    self.assertRaises(py_compat_exc, setattr,
                      obj.dialect, 'delimiter', ':')
    self.assertRaises(py_compat_exc, delattr,
                      obj.dialect, 'quoting')
    self.assertRaises(py_compat_exc, setattr,
                      obj.dialect, 'quoting', None)
def test_reader_attrs(self):
    """Check the default dialect attributes exposed by ``csv.reader``."""
    source = []
    self._test_default_attrs(csv.reader, source)
def test_writer_attrs(self):
    """Check the default dialect attributes exposed by ``csv.writer``."""
    sink = BytesIO()
    self._test_default_attrs(csv.writer, sink)
def _test_kw_attrs(self, ctor, *args):
    """Check that keyword overrides given to *ctor* appear on ``obj.dialect``."""
    # Now try with alternate options
    kwargs = dict(delimiter=':', doublequote=False, escapechar='\\',
                  lineterminator='\r', quotechar='*',
                  quoting=csv.QUOTE_NONE, skipinitialspace=True,
                  strict=True)
    obj = ctor(*args, **kwargs)

    self.assertEqual(obj.dialect.delimiter, ':')
    self.assertEqual(obj.dialect.doublequote, False)
    self.assertEqual(obj.dialect.escapechar, '\\')
    self.assertEqual(obj.dialect.lineterminator, "\r")
    self.assertEqual(obj.dialect.quotechar, '*')
    self.assertEqual(obj.dialect.quoting, csv.QUOTE_NONE)
    self.assertEqual(obj.dialect.skipinitialspace, True)
    # Only holds because strict=True is passed above.
    self.assertEqual(obj.dialect.strict, True)
def test_reader_kw_attrs(self):
    """Check keyword dialect overrides on ``csv.reader``."""
    source = []
    self._test_kw_attrs(csv.reader, source)
def test_writer_kw_attrs(self):
    """Check keyword dialect overrides on ``csv.writer``."""
    sink = BytesIO()
    self._test_kw_attrs(csv.writer, sink)
def _test_dialect_attrs(self, ctor, *args):
    """Check that attributes of a dialect class passed to *ctor* are honoured.

    The dialect class is appended to *args* as the last positional argument.
    """
    # Now try with dialect-derived options
    class dialect:
        # Values mirror the assertions below.
        delimiter = '-'
        doublequote = False
        escapechar = '^'
        lineterminator = '$'
        quotechar = '#'
        quoting = csv.QUOTE_ALL
        skipinitialspace = True
        strict = False

    args = args + (dialect,)
    obj = ctor(*args)

    self.assertEqual(obj.dialect.delimiter, '-')
    self.assertEqual(obj.dialect.doublequote, False)
    self.assertEqual(obj.dialect.escapechar, '^')
    self.assertEqual(obj.dialect.lineterminator, "$")
    self.assertEqual(obj.dialect.quotechar, '#')
    self.assertEqual(obj.dialect.quoting, csv.QUOTE_ALL)
    self.assertEqual(obj.dialect.skipinitialspace, True)
    self.assertEqual(obj.dialect.strict, False)
def test_reader_dialect_attrs(self):
    """Check dialect-class options on ``csv.reader``."""
    source = []
    self._test_dialect_attrs(csv.reader, source)
def test_writer_dialect_attrs(self):
    """Check dialect-class options on ``csv.writer``."""
    sink = BytesIO()
    self._test_dialect_attrs(csv.writer, sink)
def _write_test(self, fields, expect, **kwargs):
    """Write *fields* as one row and compare the bytes produced.

    The row is written to a temporary file through
    ``csv.writer(fileobj, **kwargs)``. The file content must equal *expect*
    followed by the dialect's line terminator encoded as UTF-8.
    """
    fd, name = tempfile.mkstemp()
    try:
        with os.fdopen(fd, "w+b") as fileobj:
            writer = csv.writer(fileobj, **kwargs)
            writer.writerow(fields)
            # Rewind, otherwise read() starts after the data just written.
            fileobj.seek(0)
            self.assertEqual(
                fileobj.read(),
                expect + writer.dialect.lineterminator.encode('utf-8'))
    finally:
        # Remove the temporary file even when an assertion fails.
        os.unlink(name)
# Checks which exceptions writing raises for unusable rows and fields, and that
# IOError raised while reading the row propagates out of the writer.
149 def test_write_arg_valid(self):
# NOTE(review): `sys` is used on the next line but no import of it is visible
# in this part of the file - confirm it is imported.
151 pypy3 = hasattr(sys, 'pypy_version_info') and sys.version_info.major == 3
# Writing None as the row: TypeError is expected when pypy3 is true, csv.Error otherwise.
153 self.assertRaises(TypeError if pypy3 else csv.Error, self._write_test, None, '')
154 self._write_test((), b'')
155 self._write_test([None], b'""')
156 self.assertRaises(csv.Error, self._write_test,
157 [None], None, quoting=csv.QUOTE_NONE)
159 # Check that exceptions are passed up the chain
# NOTE(review): the class statements for BadList and BadItem, and the body of
# __getitem__ below, are not visible here; presumably they raise IOError -
# verify against the complete file.
164 def __getitem__(self, i):
168 self.assertRaises(IOError, self._write_test, BadList(), '')
174 self.assertRaises(IOError, self._write_test, [BadItem()], '')
def test_write_bigfield(self):
    """Write one row holding two 50000-character fields."""
    # This exercises the buffer realloc functionality
    big = 'X' * 50000
    encoded = big.encode('utf-8')
    self._write_test([big, big], b','.join([encoded, encoded]))
def test_write_quoting(self):
    """Check writer output for each ``quoting`` mode."""
    row = ['a', 1, 'p,q']
    self._write_test(row, b'a,1,"p,q"')
    # assertRaises needs the callable first; the field contains the delimiter
    # and QUOTE_NONE is given without an escapechar.
    self.assertRaises(csv.Error,
                      self._write_test,
                      row, b'a,1,p,q',
                      quoting=csv.QUOTE_NONE)
    self._write_test(row, b'a,1,"p,q"',
                     quoting=csv.QUOTE_MINIMAL)
    self._write_test(row, b'"a",1,"p,q"',
                     quoting=csv.QUOTE_NONNUMERIC)
    self._write_test(row, b'"a","1","p,q"',
                     quoting=csv.QUOTE_ALL)
    self._write_test(['a\nb', 1], b'"a\nb","1"',
                     quoting=csv.QUOTE_ALL)
def test_write_decimal(self):
    """A ``Decimal`` field is written unquoted under ``QUOTE_NONNUMERIC``."""
    row = ['a', decimal.Decimal("1.1"), 'p,q']
    expected = b'"a",1.1,"p,q"'
    self._write_test(row, expected, quoting=csv.QUOTE_NONNUMERIC)
def test_write_escape(self):
    """Check how ``escapechar`` interacts with ``doublequote`` and ``quoting``."""
    # The field is quoted, so the escape character does not appear in the output.
    self._write_test(['a', 1, 'p,q'], b'a,1,"p,q"',
                     escapechar='\\')
    # An embedded quote with doublequote off and no escapechar cannot be written.
    self.assertRaises(csv.Error,
                      self._write_test,
                      ['a', 1, 'p,"q"'], b'a,1,"p,\\"q\\""',
                      escapechar=None, doublequote=False)
    self._write_test(['a', 1, 'p,"q"'], b'a,1,"p,\\"q\\""',
                     escapechar='\\', doublequote=False)
    # doublequote keeps its default here, so the quote is doubled.
    self._write_test(['"'], b'""""',
                     escapechar='\\', quoting=csv.QUOTE_MINIMAL)
    # Same arguments with doublequote off: the quote is escaped instead.
    self._write_test(['"'], b'\\"',
                     escapechar='\\', quoting=csv.QUOTE_MINIMAL,
                     doublequote=False)
    self._write_test(['"'], b'\\"',
                     escapechar='\\', quoting=csv.QUOTE_NONE)
    self._write_test(['a', 1, 'p,q'], b'a,1,p\\,q',
                     escapechar='\\', quoting=csv.QUOTE_NONE)
def test_writerows(self):
    """Check ``writerows``: error propagation, bad input and normal output."""
    class BrokenFile:
        # File-like object whose write() always fails.
        def write(self, buf):
            raise IOError

    writer = csv.writer(BrokenFile())
    self.assertRaises(IOError, writer.writerows, [['a']])

    fd, name = tempfile.mkstemp()
    try:
        with os.fdopen(fd, "w+b") as fileobj:
            writer = csv.writer(fileobj)
            self.assertRaises(TypeError, writer.writerows, None)
            writer.writerows([['a', 'b'], ['c', 'd']])
            # Rewind before reading back what was written.
            fileobj.seek(0)
            self.assertEqual(fileobj.read(), b"a,b\r\nc,d\r\n")
    finally:
        os.unlink(name)
def _read_test(self, input, expect, **kwargs):
    """Parse *input* with ``csv.reader(input, **kwargs)`` and compare every row with *expect*."""
    rows = [row for row in csv.reader(input, **kwargs)]
    self.assertEqual(rows, expect)
def test_read_oddinputs(self):
    """Check the reader on empty input, stray quotes and a null byte."""
    check = self._read_test
    must_fail = self.assertRaises

    check([], [])
    check([b''], [[]])
    must_fail(csv.Error, check, [b'"ab"c'], None, strict=1)
    # cannot handle null bytes for the moment
    must_fail(csv.Error, check, [b'ab\0c'], None, strict=1)
    check([b'"ab"c'], [['abc']], doublequote=0)
def test_read_eol(self):
    """Check line-ending handling at the end of and inside an input item."""
    # A trailing terminator (or none) yields the same single row.
    for line in (b'a,b', b'a,b\n', b'a,b\r\n', b'a,b\r'):
        self._read_test([line], [['a', 'b']])

    # A terminator followed by more data inside one item must raise csv.Error.
    for line in (b'a,b\rc,d', b'a,b\nc,d', b'a,b\r\nc,d'):
        self.assertRaises(csv.Error, self._read_test, [line], [])
def test_read_escape(self):
    """Check how the reader treats a backslash ``escapechar``."""
    cases = (
        (b'a,\\b,c', ['a', 'b', 'c']),
        (b'a,b\\,c', ['a', 'b,c']),
        (b'a,"b\\,c"', ['a', 'b,c']),
        (b'a,"b,\\c"', ['a', 'b,c']),
        (b'a,"b,c\\""', ['a', 'b,c"']),
        (b'a,"b,c"\\', ['a', 'b,c\\']),
    )
    for raw, row in cases:
        self._read_test([raw], [row], escapechar='\\')
def test_read_quoting(self):
    """Check reader results for each ``quoting`` / ``quotechar`` setting."""
    self._read_test([b'1,",3,",5'], [['1', ',3,', '5']])
    self._read_test([b'1,",3,",5'], [['1', '"', '3', '"', '5']],
                    quotechar=None, escapechar='\\')
    self._read_test([b'1,",3,",5'], [['1', '"', '3', '"', '5']],
                    quoting=csv.QUOTE_NONE, escapechar='\\')
    # will this fail where locale uses comma for decimals?
    self._read_test([b',3,"5",7.3, 9'], [['', 3, '5', 7.3, 9]],
                    quoting=csv.QUOTE_NONNUMERIC)
    self._read_test([b'"a\nb", 7'], [['a\nb', ' 7']])
    # _read_test needs an input and an expected value; the unquoted,
    # non-numeric field 'abc' is what triggers the ValueError.
    self.assertRaises(ValueError, self._read_test,
                      [b'abc,3'], [[]],
                      quoting=csv.QUOTE_NONNUMERIC)
def test_read_linenum(self):
    """Check that ``line_num`` counts lines on ``reader`` and ``DictReader``."""
    readers = (csv.reader([b'line,1', b'line,2', b'line,3']),
               csv.DictReader([b'line,1', b'line,2', b'line,3'],
                              fieldnames=['a', 'b', 'c']))
    for r in readers:
        self.assertEqual(r.line_num, 0)
        for expected in (1, 2, 3):
            # Advance one row; line_num only changes when the reader is consumed.
            next(r)
            self.assertEqual(r.line_num, expected)
        self.assertRaises(StopIteration, next, r)
        # Exhaustion must not change the count.
        self.assertEqual(r.line_num, 3)
def test_roundtrip_quoteed_newlines(self):
    """Fields containing newlines must survive a write followed by a read."""
    fd, name = tempfile.mkstemp()
    try:
        with os.fdopen(fd, "w+b") as fileobj:
            writer = csv.writer(fileobj)
            self.assertRaises(TypeError, writer.writerows, None)
            rows = [['a\nb', 'b'], ['c', 'x\r\nd']]
            writer.writerows(rows)
            # Rewind so the reader sees the rows just written.
            fileobj.seek(0)
            for i, row in enumerate(csv.reader(fileobj)):
                self.assertEqual(row, rows[i])
    finally:
        os.unlink(name)
316 class TestDialectRegistry(unittest.TestCase):
# Checks that the dialect registry functions raise TypeError or csv.Error when
# called with missing, None, unknown or wrongly typed arguments.
317 def test_registry_badargs(self):
318 self.assertRaises(TypeError, csv.list_dialects, None)
319 self.assertRaises(TypeError, csv.get_dialect)
320 self.assertRaises(csv.Error, csv.get_dialect, None)
321 self.assertRaises(csv.Error, csv.get_dialect, "nonesuch")
322 self.assertRaises(TypeError, csv.unregister_dialect)
323 self.assertRaises(csv.Error, csv.unregister_dialect, None)
324 self.assertRaises(csv.Error, csv.unregister_dialect, "nonesuch")
325 self.assertRaises(TypeError, csv.register_dialect, None)
326 self.assertRaises(TypeError, csv.register_dialect, None, None)
327 self.assertRaises(TypeError, csv.register_dialect, "nonesuch", 0, 0)
# NOTE(review): the next two calls end with a trailing comma; the continuation
# lines carrying their remaining argument(s) are not visible here - verify
# against the complete file.
328 self.assertRaises(TypeError, csv.register_dialect, "nonesuch",
330 self.assertRaises(TypeError, csv.register_dialect, "nonesuch",
332 self.assertRaises(TypeError, csv.register_dialect, [])
def test_registry(self):
    """Register a dialect, look it up, list it, then unregister it."""
    class myexceltsv(csv.excel):
        # Tab delimiter, as asserted via get_dialect() below.
        delimiter = "\t"

    # Any name not already registered works; this one follows the class name.
    name = 'myexceltsv'
    expected_dialects = sorted(csv.list_dialects() + [name])
    csv.register_dialect(name, myexceltsv)
    try:
        self.assertEqual(csv.get_dialect(name).delimiter, '\t')
        # Sort both sides so the comparison does not depend on registry order.
        got_dialects = sorted(csv.list_dialects())
        self.assertEqual(expected_dialects, got_dialects)
    finally:
        # Always unregister so a failure does not leak into other tests.
        csv.unregister_dialect(name)
def test_register_kwargs(self):
    """Register a dialect from keyword arguments and read with it by name."""
    # Any name not already registered works.
    name = 'fedcba'
    csv.register_dialect(name, delimiter=';')
    try:
        self.assertNotEqual(csv.get_dialect(name).delimiter, '\t')
        self.assertEqual(list(csv.reader([b'X;Y;Z'], name)), [[u'X', u'Y', u'Z']])
    finally:
        # Always unregister so a failure does not leak into other tests.
        csv.unregister_dialect(name)
def test_incomplete_dialect(self):
    """Instantiating a ``csv.Dialect`` subclass that sets too few attributes raises ``csv.Error``."""
    class myexceltsv(csv.Dialect):
        # Deliberately incomplete: only one attribute is set.
        delimiter = "\t"

    self.assertRaises(csv.Error, myexceltsv)
def test_space_dialect(self):
    """Read space-separated data with an unquoted dialect."""
    class space(csv.excel):
        # A space delimiter is what splits "abc def" into two fields below.
        delimiter = " "
        quoting = csv.QUOTE_NONE

    fd, name = tempfile.mkstemp()
    try:
        with os.fdopen(fd, "w+b") as fileobj:
            fileobj.write(b"abc def\nc1ccccc1 benzene\n")
            # Rewind so the reader starts at the first line.
            fileobj.seek(0)
            rdr = csv.reader(fileobj, dialect=space())
            self.assertEqual(next(rdr), ["abc", "def"])
            self.assertEqual(next(rdr), ["c1ccccc1", "benzene"])
    finally:
        os.unlink(name)
def test_dialect_apply(self):
    """Check each way of passing a dialect to ``csv.writer``.

    Covers the default dialect, a class given positionally, an instance and
    a registered name given as ``dialect=``, and a keyword override on top
    of a dialect class.
    """
    # Delimiters mirror the expected outputs asserted below.
    class testA(csv.excel):
        delimiter = "\t"

    class testB(csv.excel):
        delimiter = ":"

    class testC(csv.excel):
        delimiter = "|"

    def written(*args, **kwargs):
        # Write the row [1, 2, 3] to a temporary file and return its bytes.
        fd, name = tempfile.mkstemp()
        try:
            with os.fdopen(fd, "w+b") as fileobj:
                writer = csv.writer(fileobj, *args, **kwargs)
                writer.writerow([1, 2, 3])
                fileobj.seek(0)
                return fileobj.read()
        finally:
            os.unlink(name)

    csv.register_dialect('testC', testC)
    try:
        self.assertEqual(written(), b"1,2,3\r\n")
        self.assertEqual(written(testA), b"1\t2\t3\r\n")
        self.assertEqual(written(dialect=testB()), b"1:2:3\r\n")
        self.assertEqual(written(dialect='testC'), b"1|2|3\r\n")
        self.assertEqual(written(dialect=testA, delimiter=';'), b"1;2;3\r\n")
    finally:
        # Always unregister so a failure does not leak into other tests.
        csv.unregister_dialect('testC')
def test_bad_dialect(self):
    """``csv.reader`` must raise ``TypeError`` for each of these keyword arguments."""
    rejected = (
        {'bad_attr': 0},
        {'delimiter': None},
        {'quoting': -1},
        {'quoting': 100},
    )
    for options in rejected:
        self.assertRaises(TypeError, csv.reader, [], **options)
class TestCsvBase(unittest.TestCase):
    """Shared helpers for dialect tests.

    Both helpers read ``self.dialect``, which this class does not define, so
    subclasses must provide it.
    """

    def readerAssertEqual(self, input, expected_result):
        """Write *input* to a temporary file, parse it and compare all rows with *expected_result*."""
        fd, name = tempfile.mkstemp()
        try:
            with os.fdopen(fd, "w+b") as fileobj:
                fileobj.write(input)
                # Rewind so the reader starts at the beginning of the data.
                fileobj.seek(0)
                reader = csv.reader(fileobj, dialect=self.dialect)
                fields = list(reader)
                self.assertEqual(fields, expected_result)
        finally:
            os.unlink(name)

    def writerAssertEqual(self, input, expected_result):
        """Write the rows in *input* to a temporary file and compare its bytes with *expected_result*."""
        fd, name = tempfile.mkstemp()
        try:
            with os.fdopen(fd, "w+b") as fileobj:
                writer = csv.writer(fileobj, dialect=self.dialect)
                writer.writerows(input)
                # Rewind before reading back what was written.
                fileobj.seek(0)
                self.assertEqual(fileobj.read(), expected_result)
        finally:
            os.unlink(name)
class TestDialectExcel(TestCsvBase):
    """Reader and writer checks run through the TestCsvBase helpers with the 'excel' dialect."""

    # Read as self.dialect by the inherited helpers.
    dialect = 'excel'

    def test_single(self):
        self.readerAssertEqual(b'abc', [['abc']])

    def test_simple(self):
        self.readerAssertEqual(b'1,2,3,4,5', [['1', '2', '3', '4', '5']])

    def test_blankline(self):
        self.readerAssertEqual(b'', [])

    def test_empty_fields(self):
        self.readerAssertEqual(b',', [['', '']])

    def test_singlequoted(self):
        self.readerAssertEqual(b'""', [['']])

    def test_singlequoted_left_empty(self):
        self.readerAssertEqual(b'"",', [['', '']])

    def test_singlequoted_right_empty(self):
        self.readerAssertEqual(b',""', [['', '']])

    def test_single_quoted_quote(self):
        self.readerAssertEqual(b'""""', [['"']])

    def test_quoted_quotes(self):
        self.readerAssertEqual(b'""""""', [['""']])

    def test_inline_quote(self):
        self.readerAssertEqual(b'a""b', [['a""b']])

    def test_inline_quotes(self):
        self.readerAssertEqual(b'a"b"c', [['a"b"c']])

    def test_quotes_and_more(self):
        # Excel would never write a field containing '"a"b', but when
        # reading one, it will return 'ab'.
        self.readerAssertEqual(b'"a"b', [['ab']])

    def test_lone_quote(self):
        self.readerAssertEqual(b'a"b', [['a"b']])

    def test_quote_and_quote(self):
        # Excel would never write a field containing '"a" "b"', but when
        # reading one, it will return 'a "b"'.
        self.readerAssertEqual(b'"a" "b"', [['a "b"']])

    def test_space_and_quote(self):
        self.readerAssertEqual(b' "a"', [[' "a"']])

    def test_quoted(self):
        self.readerAssertEqual(b'1,2,3,"I think, therefore I am",5,6',
                               [['1', '2', '3',
                                 'I think, therefore I am',
                                 '5', '6']])

    def test_quoted_quote(self):
        value = b'1,2,3,"""I see,"" said the blind man","as he picked up his hammer and saw"'
        self.readerAssertEqual(value,
                               [['1', '2', '3',
                                 '"I see," said the blind man',
                                 'as he picked up his hammer and saw']])

    def test_quoted_nl(self):
        # Quoted fields span physical lines; the embedded newlines must be kept.
        data = (b'1,2,3,"""I see,""\n'
                b'said the blind man","as he picked up his\n'
                b'hammer and saw"\n'
                b'9,8,7,6\n')
        self.readerAssertEqual(data,
                               [['1', '2', '3',
                                 '"I see,"\nsaid the blind man',
                                 'as he picked up his\nhammer and saw'],
                                ['9', '8', '7', '6']])

    def test_dubious_quote(self):
        self.readerAssertEqual(b'12,12,1",', [['12', '12', '1"', '']])

    def test_null(self):
        # Writing no rows produces no output.
        self.writerAssertEqual([], b'')

    def test_single_writer(self):
        self.writerAssertEqual([['abc']], b'abc\r\n')

    def test_simple_writer(self):
        self.writerAssertEqual([[1, 2, 'abc', 3, 4]],
                               b'1,2,abc,3,4\r\n')

    def test_quotes(self):
        self.writerAssertEqual([[1, 2, 'a"bc"', 3, 4]],
                               b'1,2,"a""bc""",3,4\r\n')

    def test_quote_fieldsep(self):
        self.writerAssertEqual([['abc,def']],
                               b'"abc,def"\r\n')

    def test_newlines(self):
        self.writerAssertEqual([[1, 2, 'a\nbc', 3, 4]],
                               b'1,2,"a\nbc",3,4\r\n')
class EscapedExcel(csv.excel):
    """Excel variant that never quotes and uses a backslash escape character."""
    quoting = csv.QUOTE_NONE
    # With QUOTE_NONE the writer needs an escape character for fields that
    # contain the delimiter.
    escapechar = '\\'
class TestEscapedExcel(TestCsvBase):
    """Checks an ``EscapedExcel`` dialect instance in both directions."""

    dialect = EscapedExcel()

    def test_escape_fieldsep(self):
        """The embedded delimiter is written with a backslash before it."""
        rows = [['abc,def']]
        self.writerAssertEqual(rows, b'abc\\,def\r\n')

    def test_read_escape_fieldsep(self):
        """A backslash-escaped delimiter is read back as part of the field."""
        raw = b'abc\\,def\r\n'
        self.readerAssertEqual(raw, [['abc,def']])
class QuotedEscapedExcel(csv.excel):
    """Excel variant that quotes non-numeric fields and uses a backslash escape character."""
    quoting = csv.QUOTE_NONNUMERIC
    # Needed for reading b'"abc\\,def"' back as 'abc,def'.
    escapechar = '\\'
class TestQuotedEscapedExcel(TestCsvBase):
    """Checks a ``QuotedEscapedExcel`` dialect instance in both directions."""

    dialect = QuotedEscapedExcel()

    def test_write_escape_fieldsep(self):
        """A field containing the delimiter is written inside quotes."""
        rows = [['abc,def']]
        self.writerAssertEqual(rows, b'"abc,def"\r\n')

    def test_read_escape_fieldsep(self):
        """An escaped delimiter inside a quoted field is read back unescaped."""
        raw = b'"abc\\,def"\r\n'
        self.readerAssertEqual(raw, [['abc,def']])
620 class TestDictFields(unittest.TestCase):
621 # "long" means the row is longer than the number of fieldnames
622 # "short" means there are fewer elements in the row than fieldnames
def test_write_simple_dict(self):
    """Write a header and one dict row with ``DictWriter`` and read both back."""
    fd, name = tempfile.mkstemp()
    try:
        # Wrap the descriptor mkstemp() already opened; opening `name` a
        # second time would leave `fd` open.
        with os.fdopen(fd, "w+b") as fileobj:
            writer = csv.DictWriter(fileobj, fieldnames=["f1", "f2", "f3"])
            writer.writeheader()
            fileobj.seek(0)
            self.assertEqual(fileobj.readline(), b"f1,f2,f3\r\n")
            # "f2" is absent from the row and is written as an empty field.
            writer.writerow({"f1": 10, "f3": "abc"})
            fileobj.seek(0)
            fileobj.readline()  # header
            self.assertEqual(fileobj.read(), b"10,,abc\r\n")
    finally:
        os.unlink(name)
def test_write_unicode_header_dict(self):
    """A non-ASCII header written by ``DictWriter`` must decode back from UTF-8."""
    fd, name = tempfile.mkstemp()
    try:
        # Wrap the descriptor mkstemp() already opened; opening `name` a
        # second time would leave `fd` open.
        with os.fdopen(fd, "w+b") as fileobj:
            writer = csv.DictWriter(fileobj, fieldnames=[u"ñ", u"ö"])
            writer.writeheader()
            fileobj.seek(0)
            self.assertEqual(fileobj.readline().decode('utf-8'), u"ñ,ö\r\n")
    finally:
        os.unlink(name)
def test_write_no_fields(self):
    """``DictWriter`` without ``fieldnames`` must raise ``TypeError``."""
    # An in-memory stream is enough; nothing is written.
    fileobj = BytesIO()
    self.assertRaises(TypeError, csv.DictWriter, fileobj)
def test_read_dict_fields(self):
    """Read one row through ``DictReader`` with explicit field names."""
    fd, name = tempfile.mkstemp()
    try:
        with os.fdopen(fd, "w+b") as fileobj:
            fileobj.write(b"1,2,abc\r\n")
            # Rewind so the reader starts at the first line.
            fileobj.seek(0)
            reader = csv.DictReader(fileobj,
                                    fieldnames=["f1", "f2", "f3"])
            self.assertEqual(next(reader),
                             {"f1": '1', "f2": '2', "f3": 'abc'})
    finally:
        os.unlink(name)
def test_read_dict_no_fieldnames(self):
    """Without ``fieldnames`` the first line of the file supplies them."""
    fd, name = tempfile.mkstemp()
    try:
        with os.fdopen(fd, "w+b") as fileobj:
            fileobj.write(b"f1,f2,f3\r\n1,2,abc\r\n")
            # Rewind so the reader starts at the header line.
            fileobj.seek(0)
            reader = csv.DictReader(fileobj)
            self.assertEqual(reader.fieldnames,
                             ["f1", "f2", "f3"])
            self.assertEqual(next(reader),
                             {"f1": '1', "f2": '2', "f3": 'abc'})
    finally:
        os.unlink(name)
684 # Two test cases to make sure existing ways of implicitly setting
685 # fieldnames continue to work. Both arise from discussion in issue3436.
def test_read_dict_fieldnames_from_file(self):
    """Field names taken from the first line via a plain reader on the same file."""
    fd, name = tempfile.mkstemp()
    try:
        with os.fdopen(fd, "w+b") as f:
            f.write(b"f1,f2,f3\r\n1,2,abc\r\n")
            # Rewind so the plain reader consumes the header line first.
            f.seek(0)
            reader = csv.DictReader(f, fieldnames=next(csv.reader(f)))
            self.assertEqual(reader.fieldnames,
                             ["f1", "f2", "f3"])
            self.assertEqual(next(reader),
                             {"f1": '1', "f2": '2', "f3": 'abc'})
    finally:
        os.unlink(name)
def test_read_dict_fieldnames_chain(self):
    """Chaining an already-read row in front of the reader keeps ``fieldnames`` intact."""
    import itertools

    fd, name = tempfile.mkstemp()
    try:
        with os.fdopen(fd, "w+b") as f:
            f.write(b"f1,f2,f3\r\n1,2,abc\r\n")
            # Rewind so the reader starts at the header line.
            f.seek(0)
            reader = csv.DictReader(f)
            # Pull the only data row, then chain it back in front of the reader.
            first = next(reader)
            for row in itertools.chain([first], reader):
                self.assertEqual(reader.fieldnames, ["f1", "f2", "f3"])
                self.assertEqual(row, {"f1": '1', "f2": '2', "f3": 'abc'})
    finally:
        os.unlink(name)
def test_read_long(self):
    """Values beyond the field names are collected under the ``None`` key."""
    fd, name = tempfile.mkstemp()
    try:
        with os.fdopen(fd, "w+b") as fileobj:
            fileobj.write(b"1,2,abc,4,5,6\r\n")
            # Rewind so the reader starts at the first line.
            fileobj.seek(0)
            reader = csv.DictReader(fileobj,
                                    fieldnames=["f1", "f2"])
            self.assertEqual(next(reader), {"f1": '1', "f2": '2',
                                            None: ["abc", "4", "5", "6"]})
    finally:
        os.unlink(name)
def test_read_long_with_rest(self):
    """Values beyond the field names are collected under ``restkey``."""
    fd, name = tempfile.mkstemp()
    try:
        with os.fdopen(fd, "w+b") as fileobj:
            fileobj.write(b"1,2,abc,4,5,6\r\n")
            # Rewind so the reader starts at the first line.
            fileobj.seek(0)
            reader = csv.DictReader(fileobj,
                                    fieldnames=["f1", "f2"], restkey="_rest")
            self.assertEqual(next(reader), {"f1": '1', "f2": '2',
                                            "_rest": ["abc", "4", "5", "6"]})
    finally:
        os.unlink(name)
def test_read_long_with_rest_no_fieldnames(self):
    """``restkey`` also applies when the field names come from the header line."""
    fd, name = tempfile.mkstemp()
    try:
        with os.fdopen(fd, "w+b") as fileobj:
            fileobj.write(b"f1,f2\r\n1,2,abc,4,5,6\r\n")
            # Rewind so the reader starts at the header line.
            fileobj.seek(0)
            reader = csv.DictReader(fileobj, restkey="_rest")
            self.assertEqual(reader.fieldnames, ["f1", "f2"])
            self.assertEqual(next(reader), {"f1": '1', "f2": '2',
                                            "_rest": ["abc", "4", "5", "6"]})
    finally:
        os.unlink(name)
def test_read_short(self):
    """Fields missing from a short row are filled with ``restval``."""
    fd, name = tempfile.mkstemp()
    try:
        with os.fdopen(fd, "w+b") as fileobj:
            fileobj.write(b"1,2,abc,4,5,6\r\n1,2,abc\r\n")
            # Rewind so the reader starts at the first line.
            fileobj.seek(0)
            reader = csv.DictReader(fileobj,
                                    fieldnames="1 2 3 4 5 6".split(),
                                    restval="DEFAULT")
            self.assertEqual(next(reader), {"1": '1', "2": '2', "3": 'abc',
                                            "4": '4', "5": '5', "6": '6'})
            # The second row has three values; the rest take the restval.
            self.assertEqual(next(reader), {"1": '1', "2": '2', "3": 'abc',
                                            "4": 'DEFAULT', "5": 'DEFAULT',
                                            "6": 'DEFAULT'})
    finally:
        os.unlink(name)
def test_read_multi(self):
    """Read the first of several in-memory lines; every value comes back as text."""
    sample = [
        b'2147483648,43.0e12,17,abc,def\r\n',
        b'147483648,43.0e2,17,abc,def\r\n',
        b'47483648,43.0,170,abc,def\r\n',
    ]
    reader = csv.DictReader(sample,
                            fieldnames="i1 float i2 s1 s2".split())
    # Expected values are the fields of the first sample line, in order.
    self.assertEqual(next(reader), {"i1": '2147483648',
                                    "float": '43.0e12',
                                    "i2": '17',
                                    "s1": 'abc',
                                    "s2": 'def'})
def test_read_with_blanks(self):
    """Two data lines separated by a blank line yield two identical rows."""
    data_line = b"1,2,abc,4,5,6\r\n"
    reader = csv.DictReader([data_line, b"\r\n", data_line],
                            fieldnames="1 2 3 4 5 6".split())
    expected = {"1": '1', "2": '2', "3": 'abc',
                "4": '4', "5": '5', "6": '6'}
    for _ in range(2):
        self.assertEqual(next(reader), expected)
def test_read_semi_sep(self):
    """``DictReader`` passes a ``delimiter`` keyword through to the reader."""
    reader = csv.DictReader([b"1;2;abc;4;5;6\r\n"],
                            fieldnames="1 2 3 4 5 6".split(),
                            # The data is semicolon-separated.
                            delimiter=';')
    self.assertEqual(next(reader), {"1": '1', "2": '2', "3": 'abc',
                                    "4": '4', "5": '5', "6": '6'})
def test_empty_file(self):
    """Constructing a ``DictReader`` over an empty stream must not raise."""
    empty = BytesIO()
    csv.DictReader(empty)
811 class TestArrayWrites(unittest.TestCase):
def test_int_write(self):
    """Write an ``array('i')`` as one row and compare with its ``str()`` values."""
    contents = [(20-i) for i in range(20)]
    a = array.array('i', contents)

    fd, name = tempfile.mkstemp()
    try:
        with os.fdopen(fd, "w+b") as fileobj:
            writer = csv.writer(fileobj, dialect="excel")
            # The array itself is the row.
            writer.writerow(a)
            expected = b",".join([str(i).encode('utf-8') for i in a])+b"\r\n"
            # Rewind before reading back what was written.
            fileobj.seek(0)
            self.assertEqual(fileobj.read(), expected)
    finally:
        os.unlink(name)
# Compares the bytes read from a temporary file with an array of doubles
# formatted by float_repr, joined by commas and terminated by CRLF.
828 def test_double_write(self):
829 contents = [(20-i)*0.1 for i in range(20)]
830 a = array.array('d', contents)
831 fd, name = tempfile.mkstemp()
832 fileobj = os.fdopen(fd, "w+b")
834 writer = csv.writer(fileobj, dialect="excel")
# NOTE(review): the lines that write the row, bind float_repr (including the
# body of the version check below), rewind the file and clean it up are not
# visible here - verify against the complete file.
837 if sys.version_info >= (2, 7, 3):
839 expected = b",".join([float_repr(i).encode('utf-8') for i in a])+b"\r\n"
841 self.assertEqual(fileobj.read(), expected)
# Compares the bytes read from a temporary file with an array of
# single-precision floats formatted by float_repr, joined by commas and
# terminated by CRLF.
846 def test_float_write(self):
847 contents = [(20-i)*0.1 for i in range(20)]
848 a = array.array('f', contents)
849 fd, name = tempfile.mkstemp()
850 fileobj = os.fdopen(fd, "w+b")
852 writer = csv.writer(fileobj, dialect="excel")
# NOTE(review): the lines that write the row, bind float_repr (including the
# body of the version check below), rewind the file and clean it up are not
# visible here - verify against the complete file.
855 if sys.version_info >= (2, 7, 3):
857 expected = b",".join([float_repr(i).encode('utf-8') for i in a])+b"\r\n"
859 self.assertEqual(fileobj.read(), expected)
def test_char_write(self):
    """Write a string as a row: each character becomes one field."""
    a = string.ascii_letters
    fd, name = tempfile.mkstemp()
    try:
        with os.fdopen(fd, "w+b") as fileobj:
            writer = csv.writer(fileobj, dialect="excel")
            # The string itself is the row.
            writer.writerow(a)
            expected = ",".join(a).encode('utf-8')+b"\r\n"
            # Rewind before reading back what was written.
            fileobj.seek(0)
            self.assertEqual(fileobj.read(), expected)
    finally:
        os.unlink(name)
class TestUnicode(unittest.TestCase):
    """Reading non-ASCII text through an ISO-8859-1 encoded stream."""

    def test_unicode_read(self):
        """Names encoded as ISO-8859-1 must come back as the same text."""
        line = (u"Martin von Löwis,"
                u"Marc André Lemburg,"
                u"François Pinard\r\n")
        raw = BytesIO(line.encode('iso-8859-1'))
        f = EncodedFile(raw, data_encoding='iso-8859-1')
        reader = csv.reader(f, encoding='iso-8859-1')
        expected_row = [u"Martin von Löwis",
                        u"Marc André Lemburg",
                        u"François Pinard"]
        self.assertEqual(list(reader), [expected_row])
class TestUnicodeErrors(unittest.TestCase):
    """Checks that the ``errors`` mode is honoured by writers and readers."""

    def test_encode_error(self):
        """An unencodable character is written as an XML character reference."""
        fd = BytesIO()
        writer = csv.writer(fd, encoding='cp1252', errors='xmlcharrefreplace')
        writer.writerow(['hello', chr(2603)])
        # 'xmlcharrefreplace' emits the code point in decimal: &#2603;
        self.assertEqual(fd.getvalue(), b'hello,&#2603;\r\n')

    def test_encode_error_dictwriter(self):
        """Same check as above, through ``DictWriter``."""
        fd = BytesIO()
        dw = csv.DictWriter(fd, ['col1'],
                            encoding='cp1252', errors='xmlcharrefreplace')
        dw.writerow({'col1': chr(2604)})
        self.assertEqual(fd.getvalue(), b'&#2604;\r\n')

    def test_decode_error(self):
        """Make sure the specified error-handling mode is obeyed on readers."""
        file = EncodedFile(BytesIO(u'Löwis,2,3'.encode('iso-8859-1')),
                           data_encoding='iso-8859-1')
        reader = csv.reader(file, encoding='ascii', errors='ignore')
        # The non-ASCII character is dropped rather than raising.
        self.assertEqual(list(reader)[0][0], 'Lwis')

    def test_decode_error_dictreader(self):
        """Make sure the error-handling mode is obeyed on DictReaders."""
        file = EncodedFile(BytesIO(u'name,height,weight\nLöwis,2,3'.encode('iso-8859-1')),
                           data_encoding='iso-8859-1')
        reader = csv.DictReader(file, encoding='ascii', errors='ignore')
        self.assertEqual(list(reader)[0]['name'], 'Lwis')