Push cooperative sleep call down into ThreadPool
The PUT REST API has no idea how writes are performed, so when thread pools are in use, the sleep is not necessary, though it is still necessary when thread pools are not in use. Since the ThreadPool object knows when threads are actually in use, it can take care of being cooperative with the eventlet hub. In addition, we can hide the cooperative iterator hook, given that the only other consumer of this was the auditor, which does not need it any longer. The only consumer of the DiskFile class that wants the cooperative behavior is the REST API layer of the object server, which is also using thread pools. Change-Id: Ibc4ac672899f9a35fd68c85d7f56403c19b4f991 Signed-off-by: Peter Portante <peter.portante@redhat.com>
This commit is contained in:
committed by
Gerrit Code Review
parent
9e25d38611
commit
853853edce
@@ -2164,14 +2164,18 @@ class ThreadPool(object):
|
||||
|
||||
Exceptions thrown will be reraised in the calling thread.
|
||||
|
||||
If the threadpool was initialized with nthreads=0, just calls
|
||||
func(*args, **kwargs).
|
||||
If the threadpool was initialized with nthreads=0, it invokes
|
||||
func(*args, **kwargs) directly, followed by eventlet.sleep() to ensure
|
||||
the eventlet hub has a chance to execute. It is more likely the hub
|
||||
will be invoked when queuing operations to an external thread.
|
||||
|
||||
:returns: result of calling func
|
||||
:raises: whatever func raises
|
||||
"""
|
||||
if self.nthreads <= 0:
|
||||
return func(*args, **kwargs)
|
||||
result = func(*args, **kwargs)
|
||||
sleep()
|
||||
return result
|
||||
|
||||
ev = event.Event()
|
||||
self._run_queue.put((ev, func, args, kwargs), block=False)
|
||||
|
||||
@@ -713,12 +713,11 @@ class DiskFileReader(object):
|
||||
:param device_path: on-disk device path, used when quarantining an obj
|
||||
:param logger: logger caller wants this object to use
|
||||
:param quarantine_hook: 1-arg callable called w/reason when quarantined
|
||||
:param iter_hook: called when __iter__ returns a chunk
|
||||
:param keep_cache: should resulting reads be kept in the buffer cache
|
||||
"""
|
||||
def __init__(self, fp, data_file, obj_size, etag, threadpool,
|
||||
disk_chunk_size, keep_cache_size, device_path, logger,
|
||||
quarantine_hook, iter_hook=None, keep_cache=False):
|
||||
quarantine_hook, keep_cache=False):
|
||||
# Parameter tracking
|
||||
self._fp = fp
|
||||
self._data_file = data_file
|
||||
@@ -729,7 +728,6 @@ class DiskFileReader(object):
|
||||
self._device_path = device_path
|
||||
self._logger = logger
|
||||
self._quarantine_hook = quarantine_hook
|
||||
self._iter_hook = iter_hook
|
||||
if keep_cache:
|
||||
# Caller suggests we keep this in cache, only do it if the
|
||||
# object's size is less than the maximum.
|
||||
@@ -767,8 +765,6 @@ class DiskFileReader(object):
|
||||
self._bytes_read - dropped_cache)
|
||||
dropped_cache = self._bytes_read
|
||||
yield chunk
|
||||
if self._iter_hook:
|
||||
self._iter_hook()
|
||||
else:
|
||||
self._read_to_eof = True
|
||||
self._drop_cache(self._fp.fileno(), dropped_cache,
|
||||
@@ -1259,7 +1255,7 @@ class DiskFile(object):
|
||||
with self.open():
|
||||
return self.get_metadata()
|
||||
|
||||
def reader(self, iter_hook=None, keep_cache=False,
|
||||
def reader(self, keep_cache=False,
|
||||
_quarantine_hook=lambda m: None):
|
||||
"""
|
||||
Return a :class:`swift.common.swob.Response` class compatible
|
||||
@@ -1269,7 +1265,6 @@ class DiskFile(object):
|
||||
For this implementation, the responsibility of closing the open file
|
||||
is passed to the :class:`swift.obj.diskfile.DiskFileReader` object.
|
||||
|
||||
:param iter_hook: called when __iter__ returns a chunk
|
||||
:param keep_cache: caller's preference for keeping data read in the
|
||||
OS buffer cache
|
||||
:param _quarantine_hook: 1-arg callable called when obj quarantined;
|
||||
@@ -1282,8 +1277,7 @@ class DiskFile(object):
|
||||
self._fp, self._data_file, int(self._metadata['Content-Length']),
|
||||
self._metadata['ETag'], self._threadpool, self._disk_chunk_size,
|
||||
self._mgr.keep_cache_size, self._device_path, self._logger,
|
||||
quarantine_hook=_quarantine_hook,
|
||||
iter_hook=iter_hook, keep_cache=keep_cache)
|
||||
quarantine_hook=_quarantine_hook, keep_cache=keep_cache)
|
||||
# At this point the reader object is now responsible for closing
|
||||
# the file pointer.
|
||||
self._fp = None
|
||||
|
||||
@@ -112,14 +112,12 @@ class DiskFileReader(object):
|
||||
:param fp: open file object pointer reference
|
||||
:param obj_size: on-disk size of object in bytes
|
||||
:param etag: MD5 hash of object from metadata
|
||||
:param iter_hook: called when __iter__ returns a chunk
|
||||
"""
|
||||
def __init__(self, name, fp, obj_size, etag, iter_hook=None):
|
||||
def __init__(self, name, fp, obj_size, etag):
|
||||
self._name = name
|
||||
self._fp = fp
|
||||
self._obj_size = obj_size
|
||||
self._etag = etag
|
||||
self._iter_hook = iter_hook
|
||||
#
|
||||
self._iter_etag = None
|
||||
self._bytes_read = 0
|
||||
@@ -144,8 +142,6 @@ class DiskFileReader(object):
|
||||
self._iter_etag.update(chunk)
|
||||
self._bytes_read += len(chunk)
|
||||
yield chunk
|
||||
if self._iter_hook:
|
||||
self._iter_hook()
|
||||
else:
|
||||
self._read_to_eof = True
|
||||
break
|
||||
@@ -234,7 +230,6 @@ class DiskFile(object):
|
||||
:param account: account name for the object
|
||||
:param container: container name for the object
|
||||
:param obj: object name for the object
|
||||
:param iter_hook: called when __iter__ returns a chunk
|
||||
:param keep_cache: caller's preference for keeping data read in the cache
|
||||
"""
|
||||
|
||||
@@ -348,19 +343,17 @@ class DiskFile(object):
|
||||
with self.open():
|
||||
return self.get_metadata()
|
||||
|
||||
def reader(self, iter_hook=None, keep_cache=False):
|
||||
def reader(self, keep_cache=False):
|
||||
"""
|
||||
Return a swift.common.swob.Response class compatible "app_iter"
|
||||
object. The responsibility of closing the open file is passed to the
|
||||
DiskFileReader object.
|
||||
|
||||
:param iter_hook:
|
||||
:param keep_cache:
|
||||
"""
|
||||
dr = DiskFileReader(self._name, self._fp,
|
||||
int(self._metadata['Content-Length']),
|
||||
self._metadata['ETag'],
|
||||
iter_hook=iter_hook)
|
||||
self._metadata['ETag'])
|
||||
# At this point the reader object is now responsible for
|
||||
# the file pointer.
|
||||
self._fp = None
|
||||
|
||||
@@ -407,7 +407,6 @@ class ObjectController(object):
|
||||
return HTTPRequestTimeout(request=request)
|
||||
etag.update(chunk)
|
||||
upload_size = writer.write(chunk)
|
||||
sleep()
|
||||
elapsed_time += time.time() - start_time
|
||||
if upload_size:
|
||||
self.logger.transfer_rate(
|
||||
@@ -505,8 +504,7 @@ class ObjectController(object):
|
||||
('X-Auth-Token' not in request.headers and
|
||||
'X-Storage-Token' not in request.headers))
|
||||
response = Response(
|
||||
app_iter=disk_file.reader(
|
||||
iter_hook=sleep, keep_cache=keep_cache),
|
||||
app_iter=disk_file.reader(keep_cache=keep_cache),
|
||||
request=request, conditional_response=True)
|
||||
response.headers['Content-Type'] = metadata.get(
|
||||
'Content-Type', 'application/octet-stream')
|
||||
|
||||
@@ -790,19 +790,6 @@ class TestDiskFile(unittest.TestCase):
|
||||
df.unit_test_len = fsize
|
||||
return df
|
||||
|
||||
def test_iter_hook(self):
|
||||
hook_call_count = [0]
|
||||
|
||||
def hook():
|
||||
hook_call_count[0] += 1
|
||||
|
||||
df = self._get_open_disk_file(fsize=65, csize=8)
|
||||
with df.open():
|
||||
for _ in df.reader(iter_hook=hook):
|
||||
pass
|
||||
|
||||
self.assertEquals(hook_call_count[0], 9)
|
||||
|
||||
def test_keep_cache(self):
|
||||
df = self._get_open_disk_file(fsize=65)
|
||||
with mock.patch("swift.obj.diskfile.drop_buffer_cache") as foo:
|
||||
|
||||
Reference in New Issue
Block a user