f436397ef2c5add299f225bbe514220531857a7e
[sdc/sdc-distribution-client.git] /
1 # -*- coding: utf-8 -*-
2 import csv
3 import numbers
4
5 from itertools import izip
6
7 pass_throughs = [
8     'register_dialect',
9     'unregister_dialect',
10     'get_dialect',
11     'list_dialects',
12     'field_size_limit',
13     'Dialect',
14     'excel',
15     'excel_tab',
16     'Sniffer',
17     'QUOTE_ALL',
18     'QUOTE_MINIMAL',
19     'QUOTE_NONNUMERIC',
20     'QUOTE_NONE',
21     'Error'
22 ]
23 __all__ = [
24     'reader',
25     'writer',
26     'DictReader',
27     'DictWriter',
28 ] + pass_throughs
29
30 for prop in pass_throughs:
31     globals()[prop] = getattr(csv, prop)
32
33
34 def _stringify(s, encoding, errors):
35     if s is None:
36         return ''
37     if isinstance(s, unicode):
38         return s.encode(encoding, errors)
39     elif isinstance(s, numbers.Number):
40         pass  # let csv.QUOTE_NONNUMERIC do its thing.
41     elif not isinstance(s, str):
42         s = str(s)
43     return s
44
45
46 def _stringify_list(l, encoding, errors='strict'):
47     try:
48         return [_stringify(s, encoding, errors) for s in iter(l)]
49     except TypeError as e:
50         raise csv.Error(str(e))
51
52
53 def _unicodify(s, encoding):
54     if s is None:
55         return None
56     if isinstance(s, (unicode, int, float)):
57         return s
58     elif isinstance(s, str):
59         return s.decode(encoding)
60     return s
61
62
63 class UnicodeWriter(object):
64     """
65     >>> import unicodecsv
66     >>> from cStringIO import StringIO
67     >>> f = StringIO()
68     >>> w = unicodecsv.writer(f, encoding='utf-8')
69     >>> w.writerow((u'é', u'ñ'))
70     >>> f.seek(0)
71     >>> r = unicodecsv.reader(f, encoding='utf-8')
72     >>> row = r.next()
73     >>> row[0] == u'é'
74     True
75     >>> row[1] == u'ñ'
76     True
77     """
78     def __init__(self, f, dialect=csv.excel, encoding='utf-8', errors='strict',
79                  *args, **kwds):
80         self.encoding = encoding
81         self.writer = csv.writer(f, dialect, *args, **kwds)
82         self.encoding_errors = errors
83
84     def writerow(self, row):
85         return self.writer.writerow(
86                 _stringify_list(row, self.encoding, self.encoding_errors))
87
88     def writerows(self, rows):
89         for row in rows:
90             self.writerow(row)
91
92     @property
93     def dialect(self):
94         return self.writer.dialect
95 writer = UnicodeWriter
96
97
98 class UnicodeReader(object):
99     def __init__(self, f, dialect=None, encoding='utf-8', errors='strict',
100                  **kwds):
101
102         format_params = ['delimiter', 'doublequote', 'escapechar',
103                          'lineterminator', 'quotechar', 'quoting',
104                          'skipinitialspace']
105
106         if dialect is None:
107             if not any([kwd_name in format_params
108                         for kwd_name in kwds.keys()]):
109                 dialect = csv.excel
110         self.reader = csv.reader(f, dialect, **kwds)
111         self.encoding = encoding
112         self.encoding_errors = errors
113         self._parse_numerics = bool(
114             self.dialect.quoting & csv.QUOTE_NONNUMERIC)
115
116     def next(self):
117         row = self.reader.next()
118         encoding = self.encoding
119         encoding_errors = self.encoding_errors
120         unicode_ = unicode
121         if self._parse_numerics:
122             float_ = float
123             return [(value if isinstance(value, float_) else
124                     unicode_(value, encoding, encoding_errors))
125                     for value in row]
126         else:
127             return [unicode_(value, encoding, encoding_errors)
128                     for value in row]
129
130     def __iter__(self):
131         return self
132
133     @property
134     def dialect(self):
135         return self.reader.dialect
136
137     @property
138     def line_num(self):
139         return self.reader.line_num
140 reader = UnicodeReader
141
142
143 class DictWriter(csv.DictWriter):
144     """
145     >>> from cStringIO import StringIO
146     >>> f = StringIO()
147     >>> w = DictWriter(f, ['a', u'ñ', 'b'], restval=u'î')
148     >>> w.writerow({'a':'1', u'ñ':'2'})
149     >>> w.writerow({'a':'1', u'ñ':'2', 'b':u'ø'})
150     >>> w.writerow({'a':u'é', u'ñ':'2'})
151     >>> f.seek(0)
152     >>> r = DictReader(f, fieldnames=['a', u'ñ'], restkey='r')
153     >>> r.next() == {'a': u'1', u'ñ':'2', 'r': [u'î']}
154     True
155     >>> r.next() == {'a': u'1', u'ñ':'2', 'r': [u'\xc3\xb8']}
156     True
157     >>> r.next() == {'a': u'\xc3\xa9', u'ñ':'2', 'r': [u'\xc3\xae']}
158     True
159     """
160     def __init__(self, csvfile, fieldnames, restval='',
161                  extrasaction='raise', dialect='excel', encoding='utf-8',
162                  errors='strict', *args, **kwds):
163         self.encoding = encoding
164         csv.DictWriter.__init__(self, csvfile, fieldnames, restval,
165                                 extrasaction, dialect, *args, **kwds)
166         self.writer = UnicodeWriter(csvfile, dialect, encoding=encoding,
167                                     errors=errors, *args, **kwds)
168         self.encoding_errors = errors
169
170     def writeheader(self):
171         header = dict(zip(self.fieldnames, self.fieldnames))
172         self.writerow(header)
173
174
175 class DictReader(csv.DictReader):
176     """
177     >>> from cStringIO import StringIO
178     >>> f = StringIO()
179     >>> w = DictWriter(f, fieldnames=['name', 'place'])
180     >>> w.writerow({'name': 'Cary Grant', 'place': 'hollywood'})
181     >>> w.writerow({'name': 'Nathan Brillstone', 'place': u'øLand'})
182     >>> w.writerow({'name': u'Will ø. Unicoder', 'place': u'éSpandland'})
183     >>> f.seek(0)
184     >>> r = DictReader(f, fieldnames=['name', 'place'])
185     >>> print r.next() == {'name': 'Cary Grant', 'place': 'hollywood'}
186     True
187     >>> print r.next() == {'name': 'Nathan Brillstone', 'place': u'øLand'}
188     True
189     >>> print r.next() == {'name': u'Will ø. Unicoder', 'place': u'éSpandland'}
190     True
191     """
192     def __init__(self, csvfile, fieldnames=None, restkey=None, restval=None,
193                  dialect='excel', encoding='utf-8', errors='strict', *args,
194                  **kwds):
195         if fieldnames is not None:
196             fieldnames = _stringify_list(fieldnames, encoding)
197         csv.DictReader.__init__(self, csvfile, fieldnames, restkey, restval,
198                                 dialect, *args, **kwds)
199         self.reader = UnicodeReader(csvfile, dialect, encoding=encoding,
200                                     errors=errors, *args, **kwds)
201         if fieldnames is None and not hasattr(csv.DictReader, 'fieldnames'):
202             # Python 2.5 fieldnames workaround.
203             # See http://bugs.python.org/issue3436
204             reader = UnicodeReader(csvfile, dialect, encoding=encoding,
205                                    *args, **kwds)
206             self.fieldnames = _stringify_list(reader.next(), reader.encoding)
207
208         if self.fieldnames is not None:
209             self.unicode_fieldnames = [_unicodify(f, encoding) for f in
210                                        self.fieldnames]
211         else:
212             self.unicode_fieldnames = []
213
214         self.unicode_restkey = _unicodify(restkey, encoding)
215
216     def next(self):
217         row = csv.DictReader.next(self)
218         result = dict((uni_key, row[str_key]) for (str_key, uni_key) in
219                       izip(self.fieldnames, self.unicode_fieldnames))
220         rest = row.get(self.restkey)
221         if rest:
222             result[self.unicode_restkey] = rest
223         return result