fallback: Rewrite buffer from array of bytes to bytearray
This commit is contained in:
parent
318ddfc052
commit
f421f59a28
@ -86,11 +86,8 @@ def unpack(stream, **kwargs):
|
||||
Raises `ExtraData` when `packed` contains extra bytes.
|
||||
See :class:`Unpacker` for options.
|
||||
"""
|
||||
unpacker = Unpacker(stream, **kwargs)
|
||||
ret = unpacker._fb_unpack()
|
||||
if unpacker._fb_got_extradata():
|
||||
raise ExtraData(ret, unpacker._fb_get_extradata())
|
||||
return ret
|
||||
data = stream.read()
|
||||
return unpackb(data, **kwargs)
|
||||
|
||||
|
||||
def unpackb(packed, **kwargs):
|
||||
@ -121,7 +118,7 @@ class Unpacker(object):
|
||||
If specified, unpacker reads serialized data from it and :meth:`feed()` is not usable.
|
||||
|
||||
:param int read_size:
|
||||
Used as `file_like.read(read_size)`. (default: `min(1024**2, max_buffer_size)`)
|
||||
Used as `file_like.read(read_size)`. (default: `min(16*1024, max_buffer_size)`)
|
||||
|
||||
:param bool use_list:
|
||||
If true, unpack msgpack array to Python list.
|
||||
@ -199,13 +196,9 @@ class Unpacker(object):
|
||||
self._fb_feeding = False
|
||||
|
||||
#: array of bytes feeded.
|
||||
self._fb_buffers = []
|
||||
#: Which buffer we currently reads
|
||||
self._fb_buf_i = 0
|
||||
self._buffer = b""
|
||||
#: Which position we currently reads
|
||||
self._fb_buf_o = 0
|
||||
#: Total size of _fb_bufferes
|
||||
self._fb_buf_n = 0
|
||||
self._buff_i = 0
|
||||
|
||||
# When Unpacker is used as an iterable, between the calls to next(),
|
||||
# the buffer is not "consumed" completely, for efficiency sake.
|
||||
@ -213,13 +206,13 @@ class Unpacker(object):
|
||||
# the correct moments, we have to keep track of how sloppy we were.
|
||||
# Furthermore, when the buffer is incomplete (that is: in the case
|
||||
# we raise an OutOfData) we need to rollback the buffer to the correct
|
||||
# state, which _fb_slopiness records.
|
||||
self._fb_sloppiness = 0
|
||||
# state, which _buf_checkpoint records.
|
||||
self._buf_checkpoint = 0
|
||||
|
||||
self._max_buffer_size = max_buffer_size or 2**31-1
|
||||
if read_size > self._max_buffer_size:
|
||||
raise ValueError("read_size must be smaller than max_buffer_size")
|
||||
self._read_size = read_size or min(self._max_buffer_size, 4096)
|
||||
self._read_size = read_size or min(self._max_buffer_size, 16*1024)
|
||||
self._encoding = encoding
|
||||
self._unicode_errors = unicode_errors
|
||||
self._use_list = use_list
|
||||
@ -248,103 +241,75 @@ class Unpacker(object):
|
||||
def feed(self, next_bytes):
|
||||
if isinstance(next_bytes, array.array):
|
||||
next_bytes = next_bytes.tostring()
|
||||
elif isinstance(next_bytes, bytearray):
|
||||
next_bytes = bytes(next_bytes)
|
||||
if not isinstance(next_bytes, (bytes, bytearray)):
|
||||
raise TypeError("next_bytes should be bytes, bytearray or array.array")
|
||||
assert self._fb_feeding
|
||||
if (self._fb_buf_n + len(next_bytes) - self._fb_sloppiness
|
||||
> self._max_buffer_size):
|
||||
raise BufferFull
|
||||
self._fb_buf_n += len(next_bytes)
|
||||
self._fb_buffers.append(next_bytes)
|
||||
|
||||
def _fb_sloppy_consume(self):
|
||||
""" Gets rid of some of the used parts of the buffer. """
|
||||
if self._fb_buf_i:
|
||||
for i in xrange(self._fb_buf_i):
|
||||
self._fb_buf_n -= len(self._fb_buffers[i])
|
||||
self._fb_buffers = self._fb_buffers[self._fb_buf_i:]
|
||||
self._fb_buf_i = 0
|
||||
if self._fb_buffers:
|
||||
self._fb_sloppiness = self._fb_buf_o
|
||||
else:
|
||||
self._fb_sloppiness = 0
|
||||
if (len(self._buffer) - self._buff_i + len(next_bytes) > self._max_buffer_size):
|
||||
raise BufferFull
|
||||
# bytes + bytearray -> bytearray
|
||||
# So cast before append
|
||||
self._buffer += bytes(next_bytes)
|
||||
|
||||
def _fb_consume(self):
|
||||
""" Gets rid of the used parts of the buffer. """
|
||||
if self._fb_buf_i:
|
||||
for i in xrange(self._fb_buf_i):
|
||||
self._fb_buf_n -= len(self._fb_buffers[i])
|
||||
self._fb_buffers = self._fb_buffers[self._fb_buf_i:]
|
||||
self._fb_buf_i = 0
|
||||
if self._fb_buffers:
|
||||
self._fb_buffers[0] = self._fb_buffers[0][self._fb_buf_o:]
|
||||
self._fb_buf_n -= self._fb_buf_o
|
||||
else:
|
||||
self._fb_buf_n = 0
|
||||
self._fb_buf_o = 0
|
||||
self._fb_sloppiness = 0
|
||||
self._buf_checkpoint = self._buff_i
|
||||
|
||||
def _fb_got_extradata(self):
|
||||
if self._fb_buf_i != len(self._fb_buffers):
|
||||
return True
|
||||
if self._fb_feeding:
|
||||
return False
|
||||
if not self.file_like:
|
||||
return False
|
||||
if self.file_like.read(1):
|
||||
return True
|
||||
return False
|
||||
return self._buff_i < len(self._buffer)
|
||||
|
||||
def __iter__(self):
|
||||
return self
|
||||
def _fb_get_extradata(self):
|
||||
return self._buffer[self._buff_i:]
|
||||
|
||||
def read_bytes(self, n):
|
||||
return self._fb_read(n)
|
||||
|
||||
def _fb_rollback(self):
|
||||
self._fb_buf_i = 0
|
||||
self._fb_buf_o = self._fb_sloppiness
|
||||
|
||||
def _fb_get_extradata(self):
|
||||
bufs = self._fb_buffers[self._fb_buf_i:]
|
||||
if bufs:
|
||||
bufs[0] = bufs[0][self._fb_buf_o:]
|
||||
return b''.join(bufs)
|
||||
|
||||
def _fb_read(self, n, write_bytes=None):
|
||||
buffs = self._fb_buffers
|
||||
# We have a redundant codepath for the most common case, such that
|
||||
# pypy optimizes it properly. This is the case that the read fits
|
||||
# in the current buffer.
|
||||
if (write_bytes is None and self._fb_buf_i < len(buffs) and
|
||||
self._fb_buf_o + n < len(buffs[self._fb_buf_i])):
|
||||
self._fb_buf_o += n
|
||||
return buffs[self._fb_buf_i][self._fb_buf_o - n:self._fb_buf_o]
|
||||
# (int, Optional[Callable]) -> bytearray
|
||||
remain_bytes = len(self._buffer) - self._buff_i - n
|
||||
|
||||
# The remaining cases.
|
||||
ret = b''
|
||||
while len(ret) != n:
|
||||
sliced = n - len(ret)
|
||||
if self._fb_buf_i == len(buffs):
|
||||
if self._fb_feeding:
|
||||
break
|
||||
to_read = sliced
|
||||
if self._read_size > to_read:
|
||||
to_read = self._read_size
|
||||
tmp = self.file_like.read(to_read)
|
||||
if not tmp:
|
||||
break
|
||||
buffs.append(tmp)
|
||||
self._fb_buf_n += len(tmp)
|
||||
continue
|
||||
ret += buffs[self._fb_buf_i][self._fb_buf_o:self._fb_buf_o + sliced]
|
||||
self._fb_buf_o += sliced
|
||||
if self._fb_buf_o >= len(buffs[self._fb_buf_i]):
|
||||
self._fb_buf_o = 0
|
||||
self._fb_buf_i += 1
|
||||
if len(ret) != n:
|
||||
self._fb_rollback()
|
||||
# Fast path: buffer has n bytes already
|
||||
if remain_bytes >= 0:
|
||||
ret = self._buffer[self._buff_i:self._buff_i+n]
|
||||
self._buff_i += n
|
||||
if write_bytes is not None:
|
||||
write_bytes(ret)
|
||||
return ret
|
||||
|
||||
if self._fb_feeding:
|
||||
self._buff_i = self._buf_checkpoint
|
||||
raise OutOfData
|
||||
|
||||
# Strip buffer before checkpoint before reading file.
|
||||
if self._buf_checkpoint > 0:
|
||||
self._buffer = self._buffer[self._buf_checkpoint:]
|
||||
self._buff_i -= self._buf_checkpoint
|
||||
self._buf_checkpoint = 0
|
||||
|
||||
# Read from file
|
||||
remain_bytes = -remain_bytes
|
||||
while remain_bytes > 0:
|
||||
to_read_bytes = max(self._read_size, remain_bytes)
|
||||
read_data = self.file_like.read(to_read_bytes)
|
||||
if not read_data:
|
||||
break
|
||||
assert isinstance(read_data, bytes)
|
||||
self._buffer += read_data
|
||||
remain_bytes -= len(read_data)
|
||||
|
||||
if len(self._buffer) < n + self._buff_i:
|
||||
self._buff_i = 0 # rollback
|
||||
raise OutOfData
|
||||
|
||||
if len(self._buffer) == n:
|
||||
# checkpoint == 0
|
||||
ret = self._buffer
|
||||
self._buffer = b""
|
||||
self._buff_i = 0
|
||||
else:
|
||||
ret = self._buffer[self._buff_i:self._buff_i+n]
|
||||
self._buff_i += n
|
||||
|
||||
if write_bytes is not None:
|
||||
write_bytes(ret)
|
||||
return ret
|
||||
@ -562,15 +527,19 @@ class Unpacker(object):
|
||||
assert typ == TYPE_IMMEDIATE
|
||||
return obj
|
||||
|
||||
def next(self):
|
||||
def __iter__(self):
|
||||
return self
|
||||
|
||||
def __next__(self):
|
||||
try:
|
||||
ret = self._fb_unpack(EX_CONSTRUCT, None)
|
||||
self._fb_sloppy_consume()
|
||||
self._fb_consume()
|
||||
return ret
|
||||
except OutOfData:
|
||||
self._fb_consume()
|
||||
raise StopIteration
|
||||
__next__ = next
|
||||
|
||||
next = __next__
|
||||
|
||||
def skip(self, write_bytes=None):
|
||||
self._fb_unpack(EX_SKIP, write_bytes)
|
||||
|
Loading…
Reference in New Issue
Block a user