shuffle nodes and cache files for public access performance
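This commit makes two related changes aimed at objects that are read often by anonymous clients: the proxy server now shuffles the ring's primary nodes before an object GET or HEAD, so read load is spread across replicas instead of always landing on the first node in ring order, and the object server stops dropping the buffer cache for small objects served without an auth token, so genuinely public, frequently re-read files stay cached.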
@@ -51,6 +51,7 @@ ASYNCDIR = 'async_pending'
 PICKLE_PROTOCOL = 2
 METADATA_KEY = 'user.swift.metadata'
 MAX_OBJECT_NAME_LENGTH = 1024
+KEEP_CACHE_SIZE = (5 * 1024 * 1024)


 def read_metadata(fd):
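(5 * 1024 * 1024 bytes is 5 MiB; only objects smaller than this are eligible to stay in the buffer cache. The constant is consumed by the GET handler hunk further down.)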
@@ -113,6 +114,7 @@ class DiskFile(object):
         self.meta_file = None
         self.data_file = None
         self.fp = None
+        self.keep_cache = False
         if not os.path.exists(self.datadir):
             return
         files = sorted(os.listdir(self.datadir), reverse=True)
@@ -150,12 +152,12 @@ class DiskFile(object):
                 if chunk:
                     read += len(chunk)
                     if read - dropped_cache > (1024 * 1024):
-                        drop_buffer_cache(self.fp.fileno(), dropped_cache,
+                        self.drop_cache(self.fp.fileno(), dropped_cache,
                             read - dropped_cache)
                         dropped_cache = read
                     yield chunk
                 else:
-                    drop_buffer_cache(self.fp.fileno(), dropped_cache,
+                    self.drop_cache(self.fp.fileno(), dropped_cache,
                         read - dropped_cache)
                     break
         finally:
@@ -226,7 +228,7 @@ class DiskFile(object):
         timestamp = normalize_timestamp(metadata['X-Timestamp'])
         write_metadata(fd, metadata)
         if 'Content-Length' in metadata:
-            drop_buffer_cache(fd, 0, int(metadata['Content-Length']))
+            self.drop_cache(fd, 0, int(metadata['Content-Length']))
         tpool.execute(os.fsync, fd)
         invalidate_hash(os.path.dirname(self.datadir))
         renamer(tmppath, os.path.join(self.datadir, timestamp + extension))
@@ -248,6 +250,11 @@ class DiskFile(object):
                 if err.errno != errno.ENOENT:
                     raise

+    def drop_cache(self, fd, offset, length):
+        """Method for no-oping buffer cache drop method."""
+        if not self.keep_cache:
+            drop_buffer_cache(fd, offset, length)
+

 class ObjectController(object):
     """Implements the WSGI application for the Swift Object Server."""
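drop_buffer_cache() itself is not part of this diff (it lives in Swift's shared utils). As a rough sketch of what such a helper does on Linux, assuming ctypes access to libc's posix_fadvise; names and wiring here are illustrative, not Swift's exact code:

    import ctypes
    import ctypes.util

    POSIX_FADV_DONTNEED = 4  # advice constant from <fcntl.h> on Linux

    _libc = ctypes.CDLL(ctypes.util.find_library('c'), use_errno=True)

    def drop_buffer_cache(fd, offset, length):
        # Advise the kernel that the page-cache pages backing
        # fd[offset:offset+length] will not be needed again, so it may
        # evict them immediately instead of waiting for memory pressure.
        _libc.posix_fadvise(fd, ctypes.c_uint64(offset),
                            ctypes.c_uint64(length), POSIX_FADV_DONTNEED)

With keep_cache set to True, the new drop_cache() wrapper skips that call, so a small public object's pages survive for the next reader.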
@@ -482,6 +489,10 @@ class ObjectController(object):
|
|||||||
response.etag = file.metadata['ETag']
|
response.etag = file.metadata['ETag']
|
||||||
response.last_modified = float(file.metadata['X-Timestamp'])
|
response.last_modified = float(file.metadata['X-Timestamp'])
|
||||||
response.content_length = int(file.metadata['Content-Length'])
|
response.content_length = int(file.metadata['Content-Length'])
|
||||||
|
if response.content_length < KEEP_CACHE_SIZE and \
|
||||||
|
'X-Auth-Token' not in request.headers and \
|
||||||
|
'X-Storage-Token' not in request.headers:
|
||||||
|
file.keep_cache = True
|
||||||
if 'Content-Encoding' in file.metadata:
|
if 'Content-Encoding' in file.metadata:
|
||||||
response.content_encoding = file.metadata['Content-Encoding']
|
response.content_encoding = file.metadata['Content-Encoding']
|
||||||
return request.get_response(response)
|
return request.get_response(response)
|
||||||
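The condition above is the whole caching heuristic: a response under KEEP_CACHE_SIZE requested with neither X-Auth-Token nor X-Storage-Token is treated as public, likely-hot content whose pages are worth keeping. Restated as a standalone predicate (an illustrative paraphrase, not code from this commit):

    def worth_keeping(content_length, headers):
        return (content_length < KEEP_CACHE_SIZE
                and 'X-Auth-Token' not in headers
                and 'X-Storage-Token' not in headers)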
@@ -29,6 +29,7 @@ from urllib import unquote, quote
 import uuid
 import functools
 from hashlib import md5
+from random import shuffle

 from eventlet import sleep
 from eventlet.timeout import Timeout
@@ -707,6 +708,7 @@ class ObjectController(Controller):
             return aresp
         partition, nodes = self.app.object_ring.get_nodes(
             self.account_name, self.container_name, self.object_name)
+        shuffle(nodes)
         resp = self.GETorHEAD_base(req, _('Object'), partition,
             self.iter_nodes(partition, nodes, self.app.object_ring),
             req.path_info, self.app.object_ring.replica_count)
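Without the shuffle, every request for a given object walks the primaries in the same ring order, so the first replica absorbs nearly all read traffic for a hot object; shuffle() randomizes which replica is tried first on each request. A toy illustration (hypothetical node dicts standing in for real ring entries):

    from random import shuffle

    nodes = [{'id': 0}, {'id': 1}, {'id': 2}]  # stand-ins for ring primaries
    shuffle(nodes)  # in-place; a different replica may now be tried first
    print([n['id'] for n in nodes])  # e.g. [2, 0, 1]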
@@ -1044,6 +1044,7 @@ class TestObjectController(unittest.TestCase):

     def test_error_limiting(self):
         with save_globals():
+            proxy_server.shuffle = lambda l: None
             controller = proxy_server.ObjectController(self.app, 'account',
                                                        'container', 'object')
             self.assert_status_map(controller.HEAD, (503, 200, 200), 200)
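Because the proxy pulled shuffle into its own namespace (from random import shuffle), rebinding proxy_server.shuffle to a no-op lambda inside save_globals() keeps node order deterministic for this test; otherwise the (503, 200, 200) responses could be dealt to the nodes in a different order on each run and the error-limiting assertions would be flaky.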