forked from dataiku/dataiku-api-client-python
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathutils.py
More file actions
121 lines (98 loc) · 3.24 KB
/
utils.py
File metadata and controls
121 lines (98 loc) · 3.24 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
import csv, sys
from dateutil import parser as date_iso_parser
from contextlib import closing
import itertools

# Python 2/3 compatibility shims: expose a common "any string" base type and
# a common zip_longest implementation under dku_* names for the rest of the
# module (and importers) to use without version checks.
if sys.version_info > (3,0):
    import codecs  # needed on py3 to incrementally decode raw byte streams to text
    dku_basestring_type = str
    dku_zip_longest = itertools.zip_longest
else:
    dku_basestring_type = basestring
    dku_zip_longest = itertools.izip_longest
class DataikuException(Exception):
    """Raised by the Dataiku API clients whenever the API reports an error."""
class DataikuUTF8CSVReader(object):
    """
    A CSV reader which will iterate over lines in the CSV file "f",
    which is encoded in UTF-8.

    Works under both Python 2 and Python 3: the original class only
    implemented the py2 iterator protocol (``next``), which made it
    non-iterable on py3.
    """

    def __init__(self, f, **kwds):
        # f: file-like object yielding CSV text (py3) or UTF-8 bytes (py2).
        # Extra keyword args are forwarded verbatim to csv.reader.
        self.reader = csv.reader(f, **kwds)

    def __next__(self):
        # Python 3 iterator protocol: csv.reader already yields text rows,
        # so no per-cell decoding is needed.
        return next(self.reader)

    def next(self):
        # Python 2 iterator protocol: csv.reader yields byte strings,
        # decode each cell from UTF-8 to unicode.
        row = self.reader.next()
        return [unicode(s, "utf-8") for s in row]

    def __iter__(self):
        return self
def none_if_throws(f):
    """Wrap ``f`` so that any failure yields None instead of propagating.

    Used to build best-effort value casters: a cell that cannot be parsed
    becomes None rather than aborting the whole row.

    :param f: callable to wrap
    :returns: a callable with the same signature returning ``f``'s result,
              or None if ``f`` raised
    """
    def aux(*args, **kargs):
        try:
            return f(*args, **kargs)
        except Exception:
            # Catch only genuine errors; a bare except would also swallow
            # KeyboardInterrupt/SystemExit, which must propagate.
            return None
    return aux
class DataikuStreamedHttpUTF8CSVReader(object):
    """
    A CSV reader with a schema
    """

    def __init__(self, schema, csv_stream):
        # schema: list of column descriptors, each a dict with at least a
        # "type" key; order must match the CSV column order.
        # csv_stream: HTTP response-like object whose .raw attribute is the
        # undecoded byte stream (presumably a requests response — TODO confirm).
        self.schema = schema
        self.csv_stream = csv_stream

    def iter_rows(self):
        """
        Generator yielding one list per CSV row, each value cast according
        to the matching column type in the schema; values that fail to cast
        become None (via none_if_throws). Closes the stream when exhausted.
        """
        def decode(x):
            # On py3 csv.reader already yields text; on py2 it yields
            # byte strings that must be decoded from UTF-8.
            if sys.version_info > (3,0):
                return x
            else:
                return unicode(x, "utf8")
        def parse_iso_date(s):
            # An empty cell means a missing date, not a parse error.
            if s == "":
                return None
            else:
                return date_iso_parser.parse(s)
        def str_to_bool(s):
            # None (padded/missing cell) and anything but "true" map to False.
            if s is None:
                return False
            return s.lower() == "true"
        # Map schema column types to casting callables; any type not listed
        # (e.g. string types) falls back to decode (identity on py3).
        CASTERS = {
            "tinyint" : int,
            "smallint" : int,
            "int": int,
            "bigint": int,
            "float": float,
            "double": float,
            "date": parse_iso_date,
            "boolean": str_to_bool,
        }
        schema = self.schema
        # One caster per column, aligned positionally with the CSV fields.
        casters = [
            CASTERS.get(col["type"], decode) for col in schema
        ]
        # closing() guarantees the HTTP stream is released even if the
        # consumer abandons the generator mid-iteration.
        with closing(self.csv_stream) as r:
            if sys.version_info > (3,0):
                # Incrementally decode the raw byte stream to text, as
                # py3's csv module requires text input.
                raw_generator = codecs.iterdecode(r.raw, 'utf-8')
            else:
                raw_generator = r.raw
            for uncasted_tuple in csv.reader(raw_generator,
                                             delimiter='\t',
                                             quotechar='"',
                                             doublequote=True):
                # zip_longest pads with None when the row and the schema
                # have different lengths; none_if_throws turns any casting
                # failure into None rather than aborting the row.
                yield [none_if_throws(caster)(val)
                       for (caster, val) in dku_zip_longest(casters, uncasted_tuple)]
class DSSInternalDict(object):
    """
    Helper base class whose data lives in an ``_internal_dict`` dict field,
    with a few convenience accessors around it.
    """

    def __init__(self, orig_dict=None):
        # Keep a reference to the caller's dict (no copy) so mutations
        # through get_raw() are visible to both sides.
        self._internal_dict = {} if orig_dict is None else orig_dict

    def get(self, name, default=None):
        return self._internal_dict.get(name, default)

    def get_raw(self):
        """
        Gets the raw dictionary of the actual data

        :rtype: dict
        """
        return self._internal_dict

    def __repr__(self):
        return "%s(%r)" % (self.__class__.__name__, self._internal_dict)