Merge pull request #558 from dpkp/batch_size_zero
Support batch_size = 0 in producer buffers
This commit is contained in:
@@ -30,8 +30,6 @@ class MessageSetBuffer(object):
|
||||
'lz4': (has_lz4, lz4_encode, Message.CODEC_LZ4),
|
||||
}
|
||||
def __init__(self, buf, batch_size, compression_type=None):
|
||||
assert batch_size > 0, 'batch_size must be > 0'
|
||||
|
||||
if compression_type is not None:
|
||||
assert compression_type in self._COMPRESSORS, 'Unrecognized compression type'
|
||||
checker, encoder, attributes = self._COMPRESSORS[compression_type]
|
||||
@@ -121,7 +119,7 @@ class SimpleBufferPool(object):
|
||||
self._poolable_size = poolable_size
|
||||
self._lock = threading.RLock()
|
||||
|
||||
buffers = int(memory / poolable_size)
|
||||
buffers = int(memory / poolable_size) if poolable_size else 0
|
||||
self._free = collections.deque([io.BytesIO() for _ in range(buffers)])
|
||||
|
||||
self._waiters = collections.deque()
|
||||
@@ -130,12 +128,13 @@ class SimpleBufferPool(object):
|
||||
#MetricName metricName = metrics.metricName("bufferpool-wait-ratio", metricGrpName, "The fraction of time an appender waits for space allocation.");
|
||||
#this.waitTime.add(metricName, new Rate(TimeUnit.NANOSECONDS));
|
||||
|
||||
def allocate(self, max_time_to_block_ms):
|
||||
def allocate(self, size, max_time_to_block_ms):
|
||||
"""
|
||||
Allocate a buffer of the given size. This method blocks if there is not
|
||||
enough memory and the buffer pool is configured with blocking mode.
|
||||
|
||||
Arguments:
|
||||
size (int): The buffer size to allocate in bytes [ignored]
|
||||
max_time_to_block_ms (int): The maximum time in milliseconds to
|
||||
block for buffer memory to be available
|
||||
|
||||
@@ -147,6 +146,9 @@ class SimpleBufferPool(object):
|
||||
if self._free:
|
||||
return self._free.popleft()
|
||||
|
||||
elif self._poolable_size == 0:
|
||||
return io.BytesIO()
|
||||
|
||||
else:
|
||||
# we are out of buffers and will have to block
|
||||
buf = None
|
||||
|
||||
@@ -200,7 +200,7 @@ class RecordAccumulator(object):
|
||||
|
||||
size = max(self.config['batch_size'], message_size)
|
||||
log.debug("Allocating a new %d byte message buffer for %s", size, tp) # trace
|
||||
buf = self._free.allocate(max_time_to_block_ms)
|
||||
buf = self._free.allocate(size, max_time_to_block_ms)
|
||||
with self._tp_locks[tp]:
|
||||
# Need to check if producer is closed again after grabbing the
|
||||
# dequeue lock.
|
||||
|
||||
Reference in New Issue
Block a user