-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathrStringIO.py
More file actions
198 lines (181 loc) · 6.53 KB
/
rStringIO.py
File metadata and controls
198 lines (181 loc) · 6.53 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
from rpython.rlib.rstring import StringBuilder
from rpython.rlib.objectmodel import we_are_translated
# Sentinel for the file position: AT_END means "positioned at the end of the
# data", which is the common state during a run of sequential write()s and
# lets write() take the fast append-only path.  Any value >= 0 is a real
# byte offset into the data.
AT_END = -1
class RStringIO(object):
    """RPython-level StringIO object.

    The fastest path through this code is for the case of a bunch of write()
    followed by getvalue().

    Internal representation (the real content is the join of the two parts):
      * self.__bigbuffer -- a list of single characters, or None; only
        materialized once a read/seek/overwrite forces random access;
      * self.__strings   -- a StringBuilder accumulating sequential writes,
        or None;
      * self.__pos       -- current position as an int >= 0, or the AT_END
        sentinel (-1) meaning "at the end of the data".

    NOTE: this is RPython code (a restricted Python-2 subset); constructs
    such as the flow-analysis asserts below are needed by the translator.
    """

    def __init__(self):
        self.init()

    def init(self):
        # (Re-)initialize to an empty, open buffer.
        # The real content is the join of the following data:
        #  * the list of characters self.__bigbuffer;
        #  * each of the strings in self.__strings.
        #
        self.__closed = False
        self.__strings = None        # StringBuilder, or None
        self.__bigbuffer = None      # list of 1-char strings, or None
        self.__pos = AT_END          # int offset, or the AT_END sentinel

    def close(self):
        # Mark closed and release both buffers.  Note that this class does
        # not itself guard read()/write() against use after close(); callers
        # are expected to check is_closed().
        self.__closed = True
        self.__strings = None
        self.__bigbuffer = None
        self.__pos = AT_END

    def is_closed(self):
        # Return True once close() has been called.
        return self.__closed

    def __copy_into_bigbuffer(self):
        """Copy all the data into the list of characters self.__bigbuffer."""
        if self.__bigbuffer is None:
            self.__bigbuffer = []
        if self.__strings is not None:
            # 'list += str' extends the list with the string's characters.
            self.__bigbuffer += self.__strings.build()
            self.__strings = None

    def getvalue(self):
        """If self.__strings contains more than 1 string, join all the
        strings together.  Return the final single string."""
        if self.__bigbuffer is not None:
            # Random-access buffer exists: fold pending writes in and join.
            self.__copy_into_bigbuffer()
            return ''.join(self.__bigbuffer)
        if self.__strings is not None:
            # Fast path: only sequential writes happened so far.
            return self.__strings.build()
        return ''

    def getsize(self):
        # Total length of the data: characters already in the bigbuffer
        # plus characters still pending in the StringBuilder.
        result = 0
        if self.__bigbuffer is not None:
            result += len(self.__bigbuffer)
        if self.__strings is not None:
            result += self.__strings.getlength()
        return result

    def write(self, buffer):
        # Idea: for the common case of a sequence of write() followed
        # by only getvalue(), self.__bigbuffer remains empty.  It is only
        # used to handle the more complicated cases.
        if self.__pos == AT_END:
            self.__fast_write(buffer)
        else:
            self.__slow_write(buffer)

    def __fast_write(self, buffer):
        # Append-at-end path: just accumulate into the StringBuilder.
        if self.__strings is None:
            self.__strings = StringBuilder()
        self.__strings.append(buffer)

    def __slow_write(self, buffer):
        # Write at an arbitrary position p >= 0, possibly overwriting
        # existing data and/or extending (zero-padding) past the end.
        assert buffer is not None    # help annotator
        p = self.__pos
        assert p >= 0
        endp = p + len(buffer)
        if self.__bigbuffer is not None and len(self.__bigbuffer) >= endp:
            # semi-fast path: the write is entirely inside self.__bigbuffer
            for i in range(len(buffer)):
                self.__bigbuffer[p + i] = buffer[i]
        else:
            # slow path: collect all data into self.__bigbuffer and
            # handle the various cases
            self.__copy_into_bigbuffer()
            fitting = len(self.__bigbuffer) - p
            if fitting > 0:
                # the write starts before the end of the data
                fitting = min(len(buffer), fitting)
                for i in range(fitting):
                    self.__bigbuffer[p + i] = buffer[i]
                if len(buffer) > fitting:
                    # the write extends beyond the end of the data
                    self.__bigbuffer += buffer[fitting:]
                    endp = AT_END
            else:
                # the write starts at or beyond the end of the data;
                # pad the gap (if any) with NUL characters, like cStringIO
                self.__bigbuffer += '\x00' * (-fitting) + buffer
                endp = AT_END
        self.__pos = endp

    def seek(self, position, mode=0):
        # mode 0: absolute; mode 1: relative to current; mode 2: from end.
        # Positions that land exactly at the end are normalized to AT_END
        # so that subsequent write()s take the fast path.
        if mode == 0:
            if position == self.getsize():
                self.__pos = AT_END
                return
        elif mode == 1:
            if self.__pos == AT_END:
                # Resolve the sentinel to a concrete offset first.
                self.__pos = self.getsize()
            position += self.__pos
        elif mode == 2:
            if position == 0:
                self.__pos = AT_END
                return
            position += self.getsize()
        # Clamp negative offsets to the start of the data.
        if position < 0:
            position = 0
        self.__pos = position

    def tell(self):
        # Current offset; the AT_END sentinel is resolved to the real size.
        if self.__pos == AT_END:
            result = self.getsize()
        else:
            result = self.__pos
        assert result >= 0
        return result

    def read(self, size=-1):
        # Read up to 'size' characters (all remaining if size < 0).
        p = self.__pos
        if p == 0 and size < 0:
            self.__pos = AT_END
            return self.getvalue()    # reading everything
        if p == AT_END or size == 0:
            return ''
        assert p >= 0
        # Partial reads need random access, so force the bigbuffer.
        self.__copy_into_bigbuffer()
        mysize = len(self.__bigbuffer)
        count = mysize - p
        if size >= 0:
            count = min(size, count)
        if count <= 0:
            return ''
        if p == 0 and count == mysize:
            # Consumed everything: normalize position back to the sentinel.
            self.__pos = AT_END
            return ''.join(self.__bigbuffer)
        else:
            self.__pos = p + count
            return ''.join(self.__bigbuffer[p:p+count])

    def readline(self, size=-1):
        # Read up to and including the next '\n', limited to 'size'
        # characters when size >= 0.
        p = self.__pos
        if p == AT_END or size == 0:
            return ''
        assert p >= 0
        self.__copy_into_bigbuffer()
        end = len(self.__bigbuffer)
        count = end - p
        if size >= 0 and size < count:
            end = p + size
        if count <= 0:
            return ''
        i = p
        while i < end:
            # Advance past the '\n' (it is included in the result).
            finished = self.__bigbuffer[i] == '\n'
            i += 1
            if finished:
                break
        self.__pos = i
        if not we_are_translated():
            # assert that we read within the bounds!
            bl = len(self.__bigbuffer)
            assert p <= bl
            assert i <= bl
        return ''.join(self.__bigbuffer[p:i])

    def truncate(self, size):
        """Warning, this gets us slightly strange behavior from the
        point of view of a traditional Unix file, but consistent with
        Python 2.7's cStringIO module: it will not enlarge the file,
        and it will always seek to the (new) end of the file."""
        assert size >= 0
        if size == 0:
            # Truncate to empty: drop both buffers entirely.
            self.__bigbuffer = None
            self.__strings = None
        else:
            if self.__bigbuffer is None or size > len(self.__bigbuffer):
                # Need the full data materialized to know where to cut.
                self.__copy_into_bigbuffer()
            else:
                # we can drop all extra strings
                if self.__strings is not None:
                    self.__strings = None
            if size < len(self.__bigbuffer):
                del self.__bigbuffer[size:]
            if len(self.__bigbuffer) == 0:
                self.__bigbuffer = None
        # it always has the effect of seeking at the new end
        self.__pos = AT_END