Merge "obj: Add option to tune down etag validation in object-server"
This commit is contained in:
commit
94d3a5dee8
@ -155,6 +155,14 @@ use = egg:swift#object
|
||||
# yielding.
|
||||
# cooperative_period = 0
|
||||
#
|
||||
# By default, the object-server will always validate the MD5 of object data
|
||||
# while streaming a complete object response. Occassionally this is identified
|
||||
# as a CPU bottleneck, consuming as much as 40% of the CPU time of the
|
||||
# object-server. Since range-request-heavy clients don't get these integrity
|
||||
# checks, it seems reasonable to give operators a chance to tune it down and
|
||||
# instead rely on the object-auditor to detect and quarantine corrupted objects.
|
||||
# etag_validate_pct = 100
|
||||
#
|
||||
# on PUTs, sync data every n MB
|
||||
# mb_per_sync = 512
|
||||
#
|
||||
|
@ -44,7 +44,7 @@ import logging
|
||||
import traceback
|
||||
import xattr
|
||||
from os.path import basename, dirname, exists, join, splitext
|
||||
from random import shuffle
|
||||
import random
|
||||
from tempfile import mkstemp
|
||||
from contextlib import contextmanager
|
||||
from collections import defaultdict
|
||||
@ -576,7 +576,7 @@ def object_audit_location_generator(devices, datadir, mount_check=True,
|
||||
device_dirs = list(
|
||||
set(listdir(devices)).intersection(set(device_dirs)))
|
||||
# randomize devices in case of process restart before sweep completed
|
||||
shuffle(device_dirs)
|
||||
random.shuffle(device_dirs)
|
||||
|
||||
base, policy = split_policy_string(datadir)
|
||||
for device in device_dirs:
|
||||
@ -2113,11 +2113,14 @@ class BaseDiskFileReader(object):
|
||||
:param keep_cache: should resulting reads be kept in the buffer cache
|
||||
:param cooperative_period: the period parameter when does cooperative
|
||||
yielding during file read
|
||||
:param etag_validate_frac: the probability that we should perform etag
|
||||
validation during a complete file read
|
||||
"""
|
||||
def __init__(self, fp, data_file, obj_size, etag,
|
||||
disk_chunk_size, keep_cache_size, device_path, logger,
|
||||
quarantine_hook, use_splice, pipe_size, diskfile,
|
||||
keep_cache=False, cooperative_period=0):
|
||||
keep_cache=False, cooperative_period=0,
|
||||
etag_validate_frac=1):
|
||||
# Parameter tracking
|
||||
self._fp = fp
|
||||
self._data_file = data_file
|
||||
@ -2137,6 +2140,7 @@ class BaseDiskFileReader(object):
|
||||
else:
|
||||
self._keep_cache = False
|
||||
self._cooperative_period = cooperative_period
|
||||
self._etag_validate_frac = etag_validate_frac
|
||||
|
||||
# Internal Attributes
|
||||
self._iter_etag = None
|
||||
@ -2154,7 +2158,8 @@ class BaseDiskFileReader(object):
|
||||
def _init_checks(self):
|
||||
if self._fp.tell() == 0:
|
||||
self._started_at_0 = True
|
||||
self._iter_etag = md5(usedforsecurity=False)
|
||||
if random.random() < self._etag_validate_frac:
|
||||
self._iter_etag = md5(usedforsecurity=False)
|
||||
|
||||
def _update_checks(self, chunk):
|
||||
if self._iter_etag:
|
||||
@ -2984,6 +2989,7 @@ class BaseDiskFile(object):
|
||||
return self.get_metadata()
|
||||
|
||||
def reader(self, keep_cache=False, cooperative_period=0,
|
||||
etag_validate_frac=1,
|
||||
_quarantine_hook=lambda m: None):
|
||||
"""
|
||||
Return a :class:`swift.common.swob.Response` class compatible
|
||||
@ -2997,6 +3003,8 @@ class BaseDiskFile(object):
|
||||
OS buffer cache
|
||||
:param cooperative_period: the period parameter for cooperative
|
||||
yielding during file read
|
||||
:param etag_validate_frac: the probability that we should perform etag
|
||||
validation during a complete file read
|
||||
:param _quarantine_hook: 1-arg callable called when obj quarantined;
|
||||
the arg is the reason for quarantine.
|
||||
Default is to ignore it.
|
||||
@ -3009,7 +3017,8 @@ class BaseDiskFile(object):
|
||||
self._manager.keep_cache_size, self._device_path, self._logger,
|
||||
use_splice=self._use_splice, quarantine_hook=_quarantine_hook,
|
||||
pipe_size=self._pipe_size, diskfile=self, keep_cache=keep_cache,
|
||||
cooperative_period=cooperative_period)
|
||||
cooperative_period=cooperative_period,
|
||||
etag_validate_frac=etag_validate_frac)
|
||||
# At this point the reader object is now responsible for closing
|
||||
# the file pointer.
|
||||
self._fp = None
|
||||
@ -3172,12 +3181,13 @@ class ECDiskFileReader(BaseDiskFileReader):
|
||||
def __init__(self, fp, data_file, obj_size, etag,
|
||||
disk_chunk_size, keep_cache_size, device_path, logger,
|
||||
quarantine_hook, use_splice, pipe_size, diskfile,
|
||||
keep_cache=False, cooperative_period=0):
|
||||
keep_cache=False, cooperative_period=0,
|
||||
etag_validate_frac=1):
|
||||
super(ECDiskFileReader, self).__init__(
|
||||
fp, data_file, obj_size, etag,
|
||||
disk_chunk_size, keep_cache_size, device_path, logger,
|
||||
quarantine_hook, use_splice, pipe_size, diskfile, keep_cache,
|
||||
cooperative_period)
|
||||
cooperative_period, etag_validate_frac)
|
||||
self.frag_buf = None
|
||||
self.frag_offset = 0
|
||||
self.frag_size = self._diskfile.policy.fragment_size
|
||||
|
@ -426,7 +426,8 @@ class DiskFile(object):
|
||||
with self.open(current_time=current_time):
|
||||
return self.get_metadata()
|
||||
|
||||
def reader(self, keep_cache=False, cooperative_period=0):
|
||||
def reader(self, keep_cache=False, cooperative_period=0,
|
||||
etag_validate_frac=1):
|
||||
"""
|
||||
Return a swift.common.swob.Response class compatible "app_iter"
|
||||
object. The responsibility of closing the open file is passed to the
|
||||
@ -434,6 +435,7 @@ class DiskFile(object):
|
||||
|
||||
:param keep_cache:
|
||||
:param cooperative_period:
|
||||
:param etag_validate_frac:
|
||||
"""
|
||||
dr = DiskFileReader(self._name, self._fp,
|
||||
int(self._metadata['Content-Length']),
|
||||
|
@ -30,7 +30,7 @@ from eventlet import sleep, wsgi, Timeout, tpool
|
||||
from eventlet.greenthread import spawn
|
||||
|
||||
from swift.common.utils import public, get_logger, \
|
||||
config_true_value, timing_stats, replication, \
|
||||
config_true_value, config_percent_value, timing_stats, replication, \
|
||||
normalize_delete_at_timestamp, get_log_line, Timestamp, \
|
||||
get_expirer_container, parse_mime_headers, \
|
||||
iter_multipart_mime_documents, extract_swift_bytes, safe_json_loads, \
|
||||
@ -157,6 +157,8 @@ class ObjectController(BaseStorageServer):
|
||||
self.keep_cache_slo_manifest = \
|
||||
config_true_value(conf.get('keep_cache_slo_manifest', 'false'))
|
||||
self.cooperative_period = int(conf.get("cooperative_period", 0))
|
||||
self.etag_validate_frac = config_percent_value(
|
||||
conf.get("etag_validate_pct", 100))
|
||||
|
||||
default_allowed_headers = '''
|
||||
content-disposition,
|
||||
@ -1126,6 +1128,7 @@ class ObjectController(BaseStorageServer):
|
||||
app_iter = disk_file.reader(
|
||||
keep_cache=keep_cache,
|
||||
cooperative_period=self.cooperative_period,
|
||||
etag_validate_frac=self.etag_validate_frac,
|
||||
)
|
||||
response = Response(
|
||||
app_iter=app_iter, request=request,
|
||||
|
@ -4268,6 +4268,56 @@ class TestObjectController(BaseTestCase):
|
||||
resp = req.get_response(self.object_controller)
|
||||
self.assertEqual(resp.status_int, 404)
|
||||
|
||||
def test_GET_no_etag_validation(self):
|
||||
conf = {'devices': self.testdir, 'mount_check': 'false',
|
||||
'container_update_timeout': 0.0,
|
||||
'etag_validate_pct': '0'}
|
||||
object_controller = object_server.ObjectController(
|
||||
conf, logger=self.logger)
|
||||
timestamp = normalize_timestamp(time())
|
||||
req = Request.blank('/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
|
||||
headers={'X-Timestamp': timestamp,
|
||||
'Content-Type': 'application/x-test'})
|
||||
req.body = b'VERIFY'
|
||||
resp = req.get_response(object_controller)
|
||||
self.assertEqual(resp.status_int, 201)
|
||||
disk_file = self.df_mgr.get_diskfile('sda1', 'p', 'a', 'c', 'o',
|
||||
policy=POLICIES.legacy)
|
||||
disk_file.open()
|
||||
file_name = os.path.basename(disk_file._data_file)
|
||||
bad_etag = md5(b'VERIF', usedforsecurity=False).hexdigest()
|
||||
metadata = {'X-Timestamp': timestamp, 'name': '/a/c/o',
|
||||
'Content-Length': 6, 'ETag': bad_etag}
|
||||
diskfile.write_metadata(disk_file._fp, metadata)
|
||||
self.assertEqual(os.listdir(disk_file._datadir)[0], file_name)
|
||||
req = Request.blank('/sda1/p/a/c/o')
|
||||
resp = req.get_response(object_controller)
|
||||
quar_dir = os.path.join(
|
||||
self.testdir, 'sda1', 'quarantined', 'objects',
|
||||
os.path.basename(os.path.dirname(disk_file._data_file)))
|
||||
self.assertEqual(os.listdir(disk_file._datadir)[0], file_name)
|
||||
body = resp.body
|
||||
self.assertEqual(body, b'VERIFY')
|
||||
self.assertEqual(resp.headers['Etag'], '"%s"' % bad_etag)
|
||||
# Didn't quarantine!
|
||||
self.assertFalse(os.path.exists(quar_dir))
|
||||
req = Request.blank('/sda1/p/a/c/o')
|
||||
resp = req.get_response(object_controller)
|
||||
body = resp.body
|
||||
self.assertEqual(body, b'VERIFY')
|
||||
self.assertEqual(resp.headers['Etag'], '"%s"' % bad_etag)
|
||||
|
||||
# If there's a size mismatch, though, we *should* quarantine
|
||||
metadata = {'X-Timestamp': timestamp, 'name': '/a/c/o',
|
||||
'Content-Length': 5, 'ETag': bad_etag}
|
||||
diskfile.write_metadata(disk_file._fp, metadata)
|
||||
self.assertEqual(os.listdir(disk_file._datadir)[0], file_name)
|
||||
req = Request.blank('/sda1/p/a/c/o')
|
||||
resp = req.get_response(object_controller)
|
||||
self.assertEqual('404 Not Found', resp.status)
|
||||
self.assertFalse(os.path.exists(disk_file._datadir))
|
||||
self.assertTrue(os.path.exists(quar_dir))
|
||||
|
||||
def test_GET_quarantine_zbyte(self):
|
||||
# Test swift.obj.server.ObjectController.GET
|
||||
timestamp = normalize_timestamp(time())
|
||||
@ -4379,7 +4429,7 @@ class TestObjectController(BaseTestCase):
|
||||
with mock.patch('swift.obj.diskfile.BaseDiskFile.reader', reader_mock):
|
||||
resp = req.get_response(obj_controller)
|
||||
reader_mock.assert_called_with(
|
||||
keep_cache=True, cooperative_period=0)
|
||||
keep_cache=True, cooperative_period=0, etag_validate_frac=1)
|
||||
self.assertEqual(resp.status_int, 200)
|
||||
etag = '"%s"' % md5(b'VERIFY', usedforsecurity=False).hexdigest()
|
||||
self.assertEqual(dict(resp.headers), {
|
||||
@ -4403,7 +4453,7 @@ class TestObjectController(BaseTestCase):
|
||||
with mock.patch('swift.obj.diskfile.BaseDiskFile.reader', reader_mock):
|
||||
resp = req.get_response(obj_controller)
|
||||
reader_mock.assert_called_with(
|
||||
keep_cache=True, cooperative_period=0)
|
||||
keep_cache=True, cooperative_period=0, etag_validate_frac=1)
|
||||
self.assertEqual(resp.status_int, 200)
|
||||
|
||||
# Request headers have 'X-Storage-Token'.
|
||||
@ -4414,7 +4464,7 @@ class TestObjectController(BaseTestCase):
|
||||
with mock.patch('swift.obj.diskfile.BaseDiskFile.reader', reader_mock):
|
||||
resp = req.get_response(obj_controller)
|
||||
reader_mock.assert_called_with(
|
||||
keep_cache=True, cooperative_period=0)
|
||||
keep_cache=True, cooperative_period=0, etag_validate_frac=1)
|
||||
self.assertEqual(resp.status_int, 200)
|
||||
|
||||
# Request headers have both 'X-Auth-Token' and 'X-Storage-Token'.
|
||||
@ -4426,7 +4476,7 @@ class TestObjectController(BaseTestCase):
|
||||
with mock.patch('swift.obj.diskfile.BaseDiskFile.reader', reader_mock):
|
||||
resp = req.get_response(obj_controller)
|
||||
reader_mock.assert_called_with(
|
||||
keep_cache=True, cooperative_period=0)
|
||||
keep_cache=True, cooperative_period=0, etag_validate_frac=1)
|
||||
self.assertEqual(resp.status_int, 200)
|
||||
|
||||
def test_GET_keep_cache_private_config_false(self):
|
||||
@ -4455,7 +4505,7 @@ class TestObjectController(BaseTestCase):
|
||||
with mock.patch('swift.obj.diskfile.BaseDiskFile.reader', reader_mock):
|
||||
resp = req.get_response(obj_controller)
|
||||
reader_mock.assert_called_with(
|
||||
keep_cache=True, cooperative_period=0)
|
||||
keep_cache=True, cooperative_period=0, etag_validate_frac=1.0)
|
||||
self.assertEqual(resp.status_int, 200)
|
||||
etag = '"%s"' % md5(b'VERIFY', usedforsecurity=False).hexdigest()
|
||||
self.assertEqual(dict(resp.headers), {
|
||||
@ -4479,7 +4529,7 @@ class TestObjectController(BaseTestCase):
|
||||
with mock.patch('swift.obj.diskfile.BaseDiskFile.reader', reader_mock):
|
||||
resp = req.get_response(obj_controller)
|
||||
reader_mock.assert_called_with(
|
||||
keep_cache=False, cooperative_period=0)
|
||||
keep_cache=False, cooperative_period=0, etag_validate_frac=1)
|
||||
self.assertEqual(resp.status_int, 200)
|
||||
|
||||
# Request headers have 'X-Storage-Token'.
|
||||
@ -4490,7 +4540,7 @@ class TestObjectController(BaseTestCase):
|
||||
with mock.patch('swift.obj.diskfile.BaseDiskFile.reader', reader_mock):
|
||||
resp = req.get_response(obj_controller)
|
||||
reader_mock.assert_called_with(
|
||||
keep_cache=False, cooperative_period=0)
|
||||
keep_cache=False, cooperative_period=0, etag_validate_frac=1)
|
||||
self.assertEqual(resp.status_int, 200)
|
||||
|
||||
# Request headers have both 'X-Auth-Token' and 'X-Storage-Token'.
|
||||
@ -4502,7 +4552,7 @@ class TestObjectController(BaseTestCase):
|
||||
with mock.patch('swift.obj.diskfile.BaseDiskFile.reader', reader_mock):
|
||||
resp = req.get_response(obj_controller)
|
||||
reader_mock.assert_called_with(
|
||||
keep_cache=False, cooperative_period=0)
|
||||
keep_cache=False, cooperative_period=0, etag_validate_frac=1)
|
||||
self.assertEqual(resp.status_int, 200)
|
||||
|
||||
def test_GET_keep_cache_slo_manifest_no_config(self):
|
||||
@ -4533,7 +4583,7 @@ class TestObjectController(BaseTestCase):
|
||||
with mock.patch('swift.obj.diskfile.BaseDiskFile.reader', reader_mock):
|
||||
resp = req.get_response(obj_controller)
|
||||
reader_mock.assert_called_with(
|
||||
keep_cache=False, cooperative_period=0)
|
||||
keep_cache=False, cooperative_period=0, etag_validate_frac=1)
|
||||
self.assertEqual(resp.status_int, 200)
|
||||
etag = '"%s"' % md5(b'VERIFY', usedforsecurity=False).hexdigest()
|
||||
self.assertEqual(dict(resp.headers), {
|
||||
@ -4579,7 +4629,7 @@ class TestObjectController(BaseTestCase):
|
||||
with mock.patch('swift.obj.diskfile.BaseDiskFile.reader', reader_mock):
|
||||
resp = req.get_response(obj_controller)
|
||||
reader_mock.assert_called_with(
|
||||
keep_cache=False, cooperative_period=0)
|
||||
keep_cache=False, cooperative_period=0, etag_validate_frac=1)
|
||||
self.assertEqual(resp.status_int, 200)
|
||||
etag = '"%s"' % md5(b'VERIFY', usedforsecurity=False).hexdigest()
|
||||
self.assertEqual(dict(resp.headers), {
|
||||
@ -4625,7 +4675,7 @@ class TestObjectController(BaseTestCase):
|
||||
with mock.patch('swift.obj.diskfile.BaseDiskFile.reader', reader_mock):
|
||||
resp = req.get_response(obj_controller)
|
||||
reader_mock.assert_called_with(
|
||||
keep_cache=True, cooperative_period=0)
|
||||
keep_cache=True, cooperative_period=0, etag_validate_frac=1)
|
||||
self.assertEqual(resp.status_int, 200)
|
||||
etag = '"%s"' % md5(b'VERIFY', usedforsecurity=False).hexdigest()
|
||||
self.assertEqual(dict(resp.headers), {
|
||||
@ -4670,7 +4720,7 @@ class TestObjectController(BaseTestCase):
|
||||
with mock.patch('swift.obj.diskfile.BaseDiskFile.reader', reader_mock):
|
||||
resp = req.get_response(obj_controller)
|
||||
reader_mock.assert_called_with(
|
||||
keep_cache=False, cooperative_period=0)
|
||||
keep_cache=False, cooperative_period=0, etag_validate_frac=1)
|
||||
self.assertEqual(resp.status_int, 200)
|
||||
etag = '"%s"' % md5(b'VERIFY', usedforsecurity=False).hexdigest()
|
||||
self.assertEqual(dict(resp.headers), {
|
||||
@ -4709,7 +4759,8 @@ class TestObjectController(BaseTestCase):
|
||||
"swift.obj.diskfile.BaseDiskFile.reader"
|
||||
) as reader_mock:
|
||||
resp = req.get_response(obj_controller)
|
||||
reader_mock.assert_called_with(keep_cache=False, cooperative_period=99)
|
||||
reader_mock.assert_called_with(keep_cache=False, cooperative_period=99,
|
||||
etag_validate_frac=1.0)
|
||||
self.assertEqual(resp.status_int, 200)
|
||||
|
||||
# Test DiskFile reader actually sleeps when reading chunks. When
|
||||
|
Loading…
x
Reference in New Issue
Block a user