Merge "Use threadpools in the object server for performance."

This commit is contained in:
Jenkins 2013-06-11 22:47:07 +00:00 committed by Gerrit Code Review
commit b63b5d590a
6 changed files with 345 additions and 53 deletions

View File

@ -378,6 +378,13 @@ mb_per_sync 512 On PUT requests, sync file every n MB
keep_cache_size 5242880 Largest object size to keep in buffer cache
keep_cache_private false Allow non-public objects to stay in
kernel's buffer cache
threads_per_disk 0 Size of the per-disk thread pool used for
performing disk I/O. The default of 0 means
to not use a per-disk thread pool. It is
recommended to keep this value small, as
large values can result in high read
latencies due to large queue depths. A good
starting point is 4 threads per disk.
================== ============= ===========================================
[object-replicator]

View File

@ -83,6 +83,8 @@ use = egg:swift#object
# verbs, set to "False". Unless you have a separate replication network, you
# should not specify any value for "replication_server".
# replication_server = False
# A value of 0 means "don't use thread pools". A reasonable starting point is 4.
# threads_per_disk = 0
[filter:healthcheck]
use = egg:swift#healthcheck

View File

@ -22,6 +22,7 @@ import os
import pwd
import re
import sys
import threading as stdlib_threading
import time
import uuid
import functools
@ -34,6 +35,7 @@ import ctypes.util
from ConfigParser import ConfigParser, NoSectionError, NoOptionError, \
RawConfigParser
from optparse import OptionParser
from Queue import Queue, Empty
from tempfile import mkstemp, NamedTemporaryFile
try:
import simplejson as json
@ -46,7 +48,8 @@ import itertools
import eventlet
import eventlet.semaphore
from eventlet import GreenPool, sleep, Timeout, tpool
from eventlet import GreenPool, sleep, Timeout, tpool, greenthread, \
greenio, event
from eventlet.green import socket, threading
import netifaces
import codecs
@ -1814,3 +1817,164 @@ def tpool_reraise(func, *args, **kwargs):
if isinstance(resp, BaseException):
raise resp
return resp
class ThreadPool(object):
BYTE = 'a'.encode('utf-8')
"""
Perform blocking operations in background threads.
Call its methods from within greenlets to green-wait for results without
blocking the eventlet reactor (hopefully).
"""
def __init__(self, nthreads=2):
self.nthreads = nthreads
self._run_queue = Queue()
self._result_queue = Queue()
self._threads = []
if nthreads <= 0:
return
# We spawn a greenthread whose job it is to pull results from the
# worker threads via a real Queue and send them to eventlet Events so
# that the calling greenthreads can be awoken.
#
# Since each OS thread has its own collection of greenthreads, it
# doesn't work to have the worker thread send stuff to the event, as
# it then notifies its own thread-local eventlet hub to wake up, which
# doesn't do anything to help out the actual calling greenthread over
# in the main thread.
#
# Thus, each worker sticks its results into a result queue and then
# writes a byte to a pipe, signaling the result-consuming greenlet (in
# the main thread) to wake up and consume results.
#
# This is all stuff that eventlet.tpool does, but that code can't have
# multiple instances instantiated. Since the object server uses one
# pool per disk, we have to reimplement this stuff.
_raw_rpipe, self.wpipe = os.pipe()
self.rpipe = greenio.GreenPipe(_raw_rpipe, 'rb', bufsize=0)
for _junk in xrange(nthreads):
thr = stdlib_threading.Thread(
target=self._worker,
args=(self._run_queue, self._result_queue))
thr.daemon = True
thr.start()
self._threads.append(thr)
# This is the result-consuming greenthread that runs in the main OS
# thread, as described above.
self._consumer_coro = greenthread.spawn_n(self._consume_results,
self._result_queue)
def _worker(self, work_queue, result_queue):
"""
Pulls an item from the queue and runs it, then puts the result into
the result queue. Repeats forever.
:param work_queue: queue from which to pull work
:param result_queue: queue into which to place results
"""
while True:
item = work_queue.get()
ev, func, args, kwargs = item
try:
result = func(*args, **kwargs)
result_queue.put((ev, True, result))
except BaseException, err:
result_queue.put((ev, False, err))
finally:
work_queue.task_done()
os.write(self.wpipe, self.BYTE)
def _consume_results(self, queue):
"""
Runs as a greenthread in the same OS thread as callers of
run_in_thread().
Takes results from the worker OS threads and sends them to the waiting
greenthreads.
"""
while True:
try:
self.rpipe.read(1)
except ValueError:
# can happen at process shutdown when pipe is closed
break
while True:
try:
ev, success, result = queue.get(block=False)
except Empty:
break
try:
if success:
ev.send(result)
else:
ev.send_exception(result)
finally:
queue.task_done()
def run_in_thread(self, func, *args, **kwargs):
"""
Runs func(*args, **kwargs) in a thread. Blocks the current greenlet
until results are available.
Exceptions thrown will be reraised in the calling thread.
If the threadpool was initialized with nthreads=0, just calls
func(*args, **kwargs).
:returns: result of calling func
:raises: whatever func raises
"""
if self.nthreads <= 0:
return func(*args, **kwargs)
ev = event.Event()
self._run_queue.put((ev, func, args, kwargs), block=False)
# blocks this greenlet (and only *this* greenlet) until the real
# thread calls ev.send().
result = ev.wait()
return result
def _run_in_eventlet_tpool(self, func, *args, **kwargs):
"""
Really run something in an external thread, even if we haven't got any
threads of our own.
"""
def inner():
try:
return (True, func(*args, **kwargs))
except (Timeout, BaseException) as err:
return (False, err)
success, result = tpool.execute(inner)
if success:
return result
else:
raise result
def force_run_in_thread(self, func, *args, **kwargs):
"""
Runs func(*args, **kwargs) in a thread. Blocks the current greenlet
until results are available.
Exceptions thrown will be reraised in the calling thread.
If the threadpool was initialized with nthreads=0, uses eventlet.tpool
to run the function. This is in contrast to run_in_thread(), which
will (in that case) simply execute func in the calling thread.
:returns: result of calling func
:raises: whatever func raises
"""
if self.nthreads <= 0:
return self._run_in_eventlet_tpool(func, *args, **kwargs)
else:
return self.run_in_thread(func, *args, **kwargs)

View File

@ -21,6 +21,7 @@ import errno
import os
import time
import traceback
from collections import defaultdict
from datetime import datetime
from hashlib import md5
from tempfile import mkstemp
@ -28,13 +29,13 @@ from urllib import unquote
from contextlib import contextmanager
from xattr import getxattr, setxattr
from eventlet import sleep, Timeout, tpool
from eventlet import sleep, Timeout
from swift.common.utils import mkdirs, normalize_timestamp, public, \
storage_directory, hash_path, renamer, fallocate, fsync, fdatasync, \
split_path, drop_buffer_cache, get_logger, write_pickle, \
config_true_value, validate_device_partition, timing_stats, \
tpool_reraise
ThreadPool
from swift.common.bufferedhttp import http_connect
from swift.common.constraints import check_object_creation, check_mount, \
check_float, check_utf8
@ -100,12 +101,13 @@ class DiskWriter(object):
requests. Serves as the context manager object for DiskFile's writer()
method.
"""
def __init__(self, disk_file, fd, tmppath):
def __init__(self, disk_file, fd, tmppath, threadpool):
self.disk_file = disk_file
self.fd = fd
self.tmppath = tmppath
self.upload_size = 0
self.last_sync = 0
self.threadpool = threadpool
def write(self, chunk):
"""
@ -113,16 +115,21 @@ class DiskWriter(object):
:param chunk: the chunk of data to write as a string object
"""
while chunk:
written = os.write(self.fd, chunk)
self.upload_size += written
chunk = chunk[written:]
# For large files sync every 512MB (by default) written
diff = self.upload_size - self.last_sync
if diff >= self.disk_file.bytes_per_sync:
tpool.execute(fdatasync, self.fd)
drop_buffer_cache(self.fd, self.last_sync, diff)
self.last_sync = self.upload_size
def _write_entire_chunk(chunk):
while chunk:
written = os.write(self.fd, chunk)
self.upload_size += written
chunk = chunk[written:]
self.threadpool.run_in_thread(_write_entire_chunk, chunk)
# For large files sync every 512MB (by default) written
diff = self.upload_size - self.last_sync
if diff >= self.disk_file.bytes_per_sync:
self.threadpool.force_run_in_thread(fdatasync, self.fd)
drop_buffer_cache(self.fd, self.last_sync, diff)
self.last_sync = self.upload_size
def put(self, metadata, extension='.data'):
"""
@ -136,22 +143,27 @@ class DiskWriter(object):
assert self.tmppath is not None
timestamp = normalize_timestamp(metadata['X-Timestamp'])
metadata['name'] = self.disk_file.name
# Write the metadata before calling fsync() so that both data and
# metadata are flushed to disk.
write_metadata(self.fd, metadata)
# We call fsync() before calling drop_cache() to lower the amount of
# redundant work the drop cache code will perform on the pages (now
# that after fsync the pages will be all clean).
tpool.execute(fsync, self.fd)
# From the Department of the Redundancy Department, make sure we
# call drop_cache() after fsync() to avoid redundant work (pages
# all clean).
drop_buffer_cache(self.fd, 0, self.upload_size)
invalidate_hash(os.path.dirname(self.disk_file.datadir))
# After the rename completes, this object will be available for other
# requests to reference.
renamer(self.tmppath,
os.path.join(self.disk_file.datadir, timestamp + extension))
def finalize_put():
# Write the metadata before calling fsync() so that both data and
# metadata are flushed to disk.
write_metadata(self.fd, metadata)
# We call fsync() before calling drop_cache() to lower the amount
# of redundant work the drop cache code will perform on the pages
# (now that after fsync the pages will be all clean).
fsync(self.fd)
# From the Department of the Redundancy Department, make sure
# we call drop_cache() after fsync() to avoid redundant work
# (pages all clean).
drop_buffer_cache(self.fd, 0, self.upload_size)
invalidate_hash(os.path.dirname(self.disk_file.datadir))
# After the rename completes, this object will be available for
# other requests to reference.
renamer(self.tmppath,
os.path.join(self.disk_file.datadir,
timestamp + extension))
self.threadpool.force_run_in_thread(finalize_put)
self.disk_file.metadata = metadata
@ -169,12 +181,15 @@ class DiskFile(object):
:param disk_chunk_size: size of chunks on file reads
:param bytes_per_sync: number of bytes between fdatasync calls
:param iter_hook: called when __iter__ returns a chunk
:param threadpool: thread pool in which to do blocking operations
:raises DiskFileCollision: on md5 collision
"""
def __init__(self, path, device, partition, account, container, obj,
logger, keep_data_fp=False, disk_chunk_size=65536,
bytes_per_sync=(512 * 1024 * 1024), iter_hook=None):
bytes_per_sync=(512 * 1024 * 1024), iter_hook=None,
threadpool=None):
self.disk_chunk_size = disk_chunk_size
self.bytes_per_sync = bytes_per_sync
self.iter_hook = iter_hook
@ -195,6 +210,7 @@ class DiskFile(object):
self.quarantined_dir = None
self.keep_cache = False
self.suppress_file_closing = False
self.threadpool = threadpool or ThreadPool(nthreads=0)
if not os.path.exists(self.datadir):
return
files = sorted(os.listdir(self.datadir), reverse=True)
@ -240,7 +256,8 @@ class DiskFile(object):
self.started_at_0 = True
self.iter_etag = md5()
while True:
chunk = self.fp.read(self.disk_chunk_size)
chunk = self.threadpool.run_in_thread(
self.fp.read, self.disk_chunk_size)
if chunk:
if self.iter_etag:
self.iter_etag.update(chunk)
@ -366,7 +383,7 @@ class DiskFile(object):
fallocate(fd, size)
except OSError:
raise DiskFileNoSpace()
yield DiskWriter(self, fd, tmppath)
yield DiskWriter(self, fd, tmppath, self.threadpool)
finally:
try:
os.close(fd)
@ -396,13 +413,16 @@ class DiskFile(object):
:param timestamp: timestamp to compare with each file
"""
timestamp = normalize_timestamp(timestamp)
for fname in os.listdir(self.datadir):
if fname < timestamp:
try:
os.unlink(os.path.join(self.datadir, fname))
except OSError, err: # pragma: no cover
if err.errno != errno.ENOENT:
raise
def _unlinkold():
for fname in os.listdir(self.datadir):
if fname < timestamp:
try:
os.unlink(os.path.join(self.datadir, fname))
except OSError, err: # pragma: no cover
if err.errno != errno.ENOENT:
raise
self.threadpool.run_in_thread(_unlinkold)
def _drop_cache(self, fd, offset, length):
"""Method for no-oping buffer cache drop method."""
@ -418,8 +438,8 @@ class DiskFile(object):
directory otherwise None
"""
if not (self.is_deleted() or self.quarantined_dir):
self.quarantined_dir = quarantine_renamer(self.device_path,
self.data_file)
self.quarantined_dir = self.threadpool.run_in_thread(
quarantine_renamer, self.device_path, self.data_file)
self.logger.increment('quarantines')
return self.quarantined_dir
@ -436,7 +456,8 @@ class DiskFile(object):
try:
file_size = 0
if self.data_file:
file_size = os.path.getsize(self.data_file)
file_size = self.threadpool.run_in_thread(
os.path.getsize, self.data_file)
if 'Content-Length' in self.metadata:
metadata_size = int(self.metadata['Content-Length'])
if file_size != metadata_size:
@ -486,6 +507,9 @@ class ObjectController(object):
allowed_methods = ['DELETE', 'PUT', 'HEAD', 'GET', 'POST']
self.replication_server = replication_server
self.allowed_methods = allowed_methods
self.threads_per_disk = int(conf.get('threads_per_disk', '0'))
self.threadpools = defaultdict(
lambda: ThreadPool(nthreads=self.threads_per_disk))
default_allowed_headers = '''
content-disposition,
content-encoding,
@ -547,7 +571,8 @@ class ObjectController(object):
async_dir = os.path.join(self.devices, objdevice, ASYNCDIR)
ohash = hash_path(account, container, obj)
self.logger.increment('async_pendings')
write_pickle(
self.threadpools[objdevice].run_in_thread(
write_pickle,
{'op': op, 'account': account, 'container': container,
'obj': obj, 'headers': headers_out},
os.path.join(async_dir, ohash[-3:], ohash + '-' +
@ -688,7 +713,8 @@ class ObjectController(object):
disk_file = DiskFile(self.devices, device, partition, account,
container, obj, self.logger,
disk_chunk_size=self.disk_chunk_size,
bytes_per_sync=self.bytes_per_sync)
bytes_per_sync=self.bytes_per_sync,
threadpool=self.threadpools[device])
if disk_file.is_deleted() or disk_file.is_expired():
return HTTPNotFound(request=request)
try:
@ -746,7 +772,8 @@ class ObjectController(object):
disk_file = DiskFile(self.devices, device, partition, account,
container, obj, self.logger,
disk_chunk_size=self.disk_chunk_size,
bytes_per_sync=self.bytes_per_sync)
bytes_per_sync=self.bytes_per_sync,
threadpool=self.threadpools[device])
old_delete_at = int(disk_file.metadata.get('X-Delete-At') or 0)
orig_timestamp = disk_file.metadata.get('X-Timestamp')
upload_expiration = time.time() + self.max_upload_time
@ -831,6 +858,7 @@ class ObjectController(object):
container, obj, self.logger, keep_data_fp=True,
disk_chunk_size=self.disk_chunk_size,
bytes_per_sync=self.bytes_per_sync,
threadpool=self.threadpools[device],
iter_hook=sleep)
if disk_file.is_deleted() or disk_file.is_expired():
if request.headers.get('if-match') == '*':
@ -913,7 +941,8 @@ class ObjectController(object):
disk_file = DiskFile(self.devices, device, partition, account,
container, obj, self.logger,
disk_chunk_size=self.disk_chunk_size,
bytes_per_sync=self.bytes_per_sync)
bytes_per_sync=self.bytes_per_sync,
threadpool=self.threadpools[device])
if disk_file.is_deleted() or disk_file.is_expired():
return HTTPNotFound(request=request)
try:
@ -958,7 +987,8 @@ class ObjectController(object):
disk_file = DiskFile(self.devices, device, partition, account,
container, obj, self.logger,
disk_chunk_size=self.disk_chunk_size,
bytes_per_sync=self.bytes_per_sync)
bytes_per_sync=self.bytes_per_sync,
threadpool=self.threadpools[device])
if 'x-if-delete-at' in request.headers and \
int(request.headers['x-if-delete-at']) != \
int(disk_file.metadata.get('X-Delete-At') or 0):
@ -1006,7 +1036,8 @@ class ObjectController(object):
if not os.path.exists(path):
mkdirs(path)
suffixes = suffix.split('-') if suffix else []
_junk, hashes = tpool_reraise(get_hashes, path, recalculate=suffixes)
_junk, hashes = self.threadpools[device].force_run_in_thread(
get_hashes, path, recalculate=suffixes)
return Response(body=pickle.dumps(hashes))
def __call__(self, env, start_response):

View File

@ -27,9 +27,9 @@ import re
import socket
import sys
from textwrap import dedent
import threading
import time
import unittest
from threading import Thread
from Queue import Queue, Empty
from getpass import getuser
from shutil import rmtree
@ -1582,7 +1582,7 @@ class TestStatsdLoggingDelegation(unittest.TestCase):
self.sock.bind(('localhost', 0))
self.port = self.sock.getsockname()[1]
self.queue = Queue()
self.reader_thread = Thread(target=self.statsd_reader)
self.reader_thread = threading.Thread(target=self.statsd_reader)
self.reader_thread.setDaemon(1)
self.reader_thread.start()
@ -1866,5 +1866,91 @@ class TestStatsdLoggingDelegation(unittest.TestCase):
self.assertEquals(called, [12345])
class TestThreadpool(unittest.TestCase):
def _thread_id(self):
return threading.current_thread().ident
def _capture_args(self, *args, **kwargs):
return {'args': args, 'kwargs': kwargs}
def _raise_valueerror(self):
return int('fishcakes')
def test_run_in_thread_with_threads(self):
tp = utils.ThreadPool(1)
my_id = self._thread_id()
other_id = tp.run_in_thread(self._thread_id)
self.assertNotEquals(my_id, other_id)
result = tp.run_in_thread(self._capture_args, 1, 2, bert='ernie')
self.assertEquals(result, {'args': (1, 2),
'kwargs': {'bert': 'ernie'}})
caught = False
try:
tp.run_in_thread(self._raise_valueerror)
except ValueError:
caught = True
self.assertTrue(caught)
def test_force_run_in_thread_with_threads(self):
# with nthreads > 0, force_run_in_thread looks just like run_in_thread
tp = utils.ThreadPool(1)
my_id = self._thread_id()
other_id = tp.force_run_in_thread(self._thread_id)
self.assertNotEquals(my_id, other_id)
result = tp.force_run_in_thread(self._capture_args, 1, 2, bert='ernie')
self.assertEquals(result, {'args': (1, 2),
'kwargs': {'bert': 'ernie'}})
caught = False
try:
tp.force_run_in_thread(self._raise_valueerror)
except ValueError:
caught = True
self.assertTrue(caught)
def test_run_in_thread_without_threads(self):
# with zero threads, run_in_thread doesn't actually do so
tp = utils.ThreadPool(0)
my_id = self._thread_id()
other_id = tp.run_in_thread(self._thread_id)
self.assertEquals(my_id, other_id)
result = tp.run_in_thread(self._capture_args, 1, 2, bert='ernie')
self.assertEquals(result, {'args': (1, 2),
'kwargs': {'bert': 'ernie'}})
caught = False
try:
tp.run_in_thread(self._raise_valueerror)
except ValueError:
caught = True
self.assertTrue(caught)
def test_force_run_in_thread_without_threads(self):
# with zero threads, force_run_in_thread uses eventlet.tpool
tp = utils.ThreadPool(0)
my_id = self._thread_id()
other_id = tp.force_run_in_thread(self._thread_id)
self.assertNotEquals(my_id, other_id)
result = tp.force_run_in_thread(self._capture_args, 1, 2, bert='ernie')
self.assertEquals(result, {'args': (1, 2),
'kwargs': {'bert': 'ernie'}})
caught = False
try:
tp.force_run_in_thread(self._raise_valueerror)
except ValueError:
caught = True
self.assertTrue(caught)
if __name__ == '__main__':
unittest.main()

View File

@ -49,13 +49,15 @@ class TestDiskFile(unittest.TestCase):
self.testdir = os.path.join(mkdtemp(), 'tmp_test_obj_server_DiskFile')
mkdirs(os.path.join(self.testdir, 'sda1', 'tmp'))
def fake_exe(*args, **kwargs):
pass
self._real_tpool_execute = tpool.execute
def fake_exe(meth, *args, **kwargs):
return meth(*args, **kwargs)
tpool.execute = fake_exe
def tearDown(self):
""" Tear down for testing swift.object_server.ObjectController """
rmtree(os.path.dirname(self.testdir))
tpool.execute = self._real_tpool_execute
def _create_test_file(self, data, keep_data_fp=True):
df = object_server.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o',