obj: Add option to tune down etag validation in object-server
Historically, the object-server would validate the ETag of an object whenever it was streaming the complete object. This minimizes the possibility of returning corrupted data to clients, but - Clients that only ever make ranged requests get no benefit and - MD5 can be rather CPU-intensive; this is especially noticeable in all-flash clusters/policies where Swift is not disk-constrained. Add a new `etag_validate_pct` option to tune down this validation. This takes values from 100 (default; all whole-object downloads are validated) down to 0 (none are). Note that even with etag validation turned off, the object-auditor should eventually detect and quarantine corrupted objects. However, transient read errors may cause clients to download corrupted data. Hat-tip to Jianjian for all the profiling work! Co-Authored-By: Jianjian Huo <jhuo@nvidia.com> Change-Id: Iae48e8db642f6772114c0ae7c6bdd9c653cd035b
This commit is contained in:
@@ -154,6 +154,14 @@ use = egg:swift#object
|
||||
# chunk reads. A value of '0' (the default) will turn off cooperative yielding.
|
||||
# cooperative_period = 0
|
||||
#
|
||||
# By default, the object-server will always validate the MD5 of object data
|
||||
# while streaming a complete object response. Occassionally this is identified
|
||||
# as a CPU bottleneck, consuming as much as 40% of the CPU time of the
|
||||
# object-server. Since range-request-heavy clients don't get these integrity
|
||||
# checks, it seems reasonable to give operators a chance to tune it down and
|
||||
# instead rely on the object-auditor to detect and quarantine corrupted objects.
|
||||
# etag_validate_pct = 100
|
||||
#
|
||||
# on PUTs, sync data every n MB
|
||||
# mb_per_sync = 512
|
||||
#
|
||||
|
||||
@@ -44,7 +44,7 @@ import logging
|
||||
import traceback
|
||||
import xattr
|
||||
from os.path import basename, dirname, exists, join, splitext
|
||||
from random import shuffle
|
||||
import random
|
||||
from tempfile import mkstemp
|
||||
from contextlib import contextmanager
|
||||
from collections import defaultdict
|
||||
@@ -576,7 +576,7 @@ def object_audit_location_generator(devices, datadir, mount_check=True,
|
||||
device_dirs = list(
|
||||
set(listdir(devices)).intersection(set(device_dirs)))
|
||||
# randomize devices in case of process restart before sweep completed
|
||||
shuffle(device_dirs)
|
||||
random.shuffle(device_dirs)
|
||||
|
||||
base, policy = split_policy_string(datadir)
|
||||
for device in device_dirs:
|
||||
@@ -2112,11 +2112,14 @@ class BaseDiskFileReader(object):
|
||||
:param keep_cache: should resulting reads be kept in the buffer cache
|
||||
:param cooperative_period: the period parameter when does cooperative
|
||||
yielding during file read
|
||||
:param etag_validate_frac: the probability that we should perform etag
|
||||
validation during a complete file read
|
||||
"""
|
||||
def __init__(self, fp, data_file, obj_size, etag,
|
||||
disk_chunk_size, keep_cache_size, device_path, logger,
|
||||
quarantine_hook, use_splice, pipe_size, diskfile,
|
||||
keep_cache=False, cooperative_period=0):
|
||||
keep_cache=False, cooperative_period=0,
|
||||
etag_validate_frac=1):
|
||||
# Parameter tracking
|
||||
self._fp = fp
|
||||
self._data_file = data_file
|
||||
@@ -2136,6 +2139,7 @@ class BaseDiskFileReader(object):
|
||||
else:
|
||||
self._keep_cache = False
|
||||
self._cooperative_period = cooperative_period
|
||||
self._etag_validate_frac = etag_validate_frac
|
||||
|
||||
# Internal Attributes
|
||||
self._iter_etag = None
|
||||
@@ -2153,7 +2157,8 @@ class BaseDiskFileReader(object):
|
||||
def _init_checks(self):
|
||||
if self._fp.tell() == 0:
|
||||
self._started_at_0 = True
|
||||
self._iter_etag = md5(usedforsecurity=False)
|
||||
if random.random() < self._etag_validate_frac:
|
||||
self._iter_etag = md5(usedforsecurity=False)
|
||||
|
||||
def _update_checks(self, chunk):
|
||||
if self._iter_etag:
|
||||
@@ -2983,6 +2988,7 @@ class BaseDiskFile(object):
|
||||
return self.get_metadata()
|
||||
|
||||
def reader(self, keep_cache=False, cooperative_period=0,
|
||||
etag_validate_frac=1,
|
||||
_quarantine_hook=lambda m: None):
|
||||
"""
|
||||
Return a :class:`swift.common.swob.Response` class compatible
|
||||
@@ -2996,6 +3002,8 @@ class BaseDiskFile(object):
|
||||
OS buffer cache
|
||||
:param cooperative_period: the period parameter for cooperative
|
||||
yielding during file read
|
||||
:param etag_validate_frac: the probability that we should perform etag
|
||||
validation during a complete file read
|
||||
:param _quarantine_hook: 1-arg callable called when obj quarantined;
|
||||
the arg is the reason for quarantine.
|
||||
Default is to ignore it.
|
||||
@@ -3008,7 +3016,8 @@ class BaseDiskFile(object):
|
||||
self._manager.keep_cache_size, self._device_path, self._logger,
|
||||
use_splice=self._use_splice, quarantine_hook=_quarantine_hook,
|
||||
pipe_size=self._pipe_size, diskfile=self, keep_cache=keep_cache,
|
||||
cooperative_period=cooperative_period)
|
||||
cooperative_period=cooperative_period,
|
||||
etag_validate_frac=etag_validate_frac)
|
||||
# At this point the reader object is now responsible for closing
|
||||
# the file pointer.
|
||||
self._fp = None
|
||||
@@ -3171,12 +3180,13 @@ class ECDiskFileReader(BaseDiskFileReader):
|
||||
def __init__(self, fp, data_file, obj_size, etag,
|
||||
disk_chunk_size, keep_cache_size, device_path, logger,
|
||||
quarantine_hook, use_splice, pipe_size, diskfile,
|
||||
keep_cache=False, cooperative_period=0):
|
||||
keep_cache=False, cooperative_period=0,
|
||||
etag_validate_frac=1):
|
||||
super(ECDiskFileReader, self).__init__(
|
||||
fp, data_file, obj_size, etag,
|
||||
disk_chunk_size, keep_cache_size, device_path, logger,
|
||||
quarantine_hook, use_splice, pipe_size, diskfile, keep_cache,
|
||||
cooperative_period)
|
||||
cooperative_period, etag_validate_frac)
|
||||
self.frag_buf = None
|
||||
self.frag_offset = 0
|
||||
self.frag_size = self._diskfile.policy.fragment_size
|
||||
|
||||
@@ -426,7 +426,8 @@ class DiskFile(object):
|
||||
with self.open(current_time=current_time):
|
||||
return self.get_metadata()
|
||||
|
||||
def reader(self, keep_cache=False, cooperative_period=0):
|
||||
def reader(self, keep_cache=False, cooperative_period=0,
|
||||
etag_validate_frac=1):
|
||||
"""
|
||||
Return a swift.common.swob.Response class compatible "app_iter"
|
||||
object. The responsibility of closing the open file is passed to the
|
||||
@@ -434,6 +435,7 @@ class DiskFile(object):
|
||||
|
||||
:param keep_cache:
|
||||
:param cooperative_period:
|
||||
:param etag_validate_frac:
|
||||
"""
|
||||
dr = DiskFileReader(self._name, self._fp,
|
||||
int(self._metadata['Content-Length']),
|
||||
|
||||
@@ -30,7 +30,7 @@ from eventlet import sleep, wsgi, Timeout, tpool
|
||||
from eventlet.greenthread import spawn
|
||||
|
||||
from swift.common.utils import public, get_logger, \
|
||||
config_true_value, timing_stats, replication, \
|
||||
config_true_value, config_percent_value, timing_stats, replication, \
|
||||
normalize_delete_at_timestamp, get_log_line, Timestamp, \
|
||||
get_expirer_container, parse_mime_headers, \
|
||||
iter_multipart_mime_documents, extract_swift_bytes, safe_json_loads, \
|
||||
@@ -157,6 +157,8 @@ class ObjectController(BaseStorageServer):
|
||||
self.keep_cache_slo_manifest = \
|
||||
config_true_value(conf.get('keep_cache_slo_manifest', 'false'))
|
||||
self.cooperative_period = int(conf.get("cooperative_period", 0))
|
||||
self.etag_validate_frac = config_percent_value(
|
||||
conf.get("etag_validate_pct", 100))
|
||||
|
||||
default_allowed_headers = '''
|
||||
content-disposition,
|
||||
@@ -1119,6 +1121,7 @@ class ObjectController(BaseStorageServer):
|
||||
app_iter = disk_file.reader(
|
||||
keep_cache=keep_cache,
|
||||
cooperative_period=self.cooperative_period,
|
||||
etag_validate_frac=self.etag_validate_frac,
|
||||
)
|
||||
response = Response(
|
||||
app_iter=app_iter, request=request,
|
||||
|
||||
@@ -4267,6 +4267,56 @@ class TestObjectController(BaseTestCase):
|
||||
resp = req.get_response(self.object_controller)
|
||||
self.assertEqual(resp.status_int, 404)
|
||||
|
||||
def test_GET_no_etag_validation(self):
|
||||
conf = {'devices': self.testdir, 'mount_check': 'false',
|
||||
'container_update_timeout': 0.0,
|
||||
'etag_validate_pct': '0'}
|
||||
object_controller = object_server.ObjectController(
|
||||
conf, logger=self.logger)
|
||||
timestamp = normalize_timestamp(time())
|
||||
req = Request.blank('/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
|
||||
headers={'X-Timestamp': timestamp,
|
||||
'Content-Type': 'application/x-test'})
|
||||
req.body = b'VERIFY'
|
||||
resp = req.get_response(object_controller)
|
||||
self.assertEqual(resp.status_int, 201)
|
||||
disk_file = self.df_mgr.get_diskfile('sda1', 'p', 'a', 'c', 'o',
|
||||
policy=POLICIES.legacy)
|
||||
disk_file.open()
|
||||
file_name = os.path.basename(disk_file._data_file)
|
||||
bad_etag = md5(b'VERIF', usedforsecurity=False).hexdigest()
|
||||
metadata = {'X-Timestamp': timestamp, 'name': '/a/c/o',
|
||||
'Content-Length': 6, 'ETag': bad_etag}
|
||||
diskfile.write_metadata(disk_file._fp, metadata)
|
||||
self.assertEqual(os.listdir(disk_file._datadir)[0], file_name)
|
||||
req = Request.blank('/sda1/p/a/c/o')
|
||||
resp = req.get_response(object_controller)
|
||||
quar_dir = os.path.join(
|
||||
self.testdir, 'sda1', 'quarantined', 'objects',
|
||||
os.path.basename(os.path.dirname(disk_file._data_file)))
|
||||
self.assertEqual(os.listdir(disk_file._datadir)[0], file_name)
|
||||
body = resp.body
|
||||
self.assertEqual(body, b'VERIFY')
|
||||
self.assertEqual(resp.headers['Etag'], '"%s"' % bad_etag)
|
||||
# Didn't quarantine!
|
||||
self.assertFalse(os.path.exists(quar_dir))
|
||||
req = Request.blank('/sda1/p/a/c/o')
|
||||
resp = req.get_response(object_controller)
|
||||
body = resp.body
|
||||
self.assertEqual(body, b'VERIFY')
|
||||
self.assertEqual(resp.headers['Etag'], '"%s"' % bad_etag)
|
||||
|
||||
# If there's a size mismatch, though, we *should* quarantine
|
||||
metadata = {'X-Timestamp': timestamp, 'name': '/a/c/o',
|
||||
'Content-Length': 5, 'ETag': bad_etag}
|
||||
diskfile.write_metadata(disk_file._fp, metadata)
|
||||
self.assertEqual(os.listdir(disk_file._datadir)[0], file_name)
|
||||
req = Request.blank('/sda1/p/a/c/o')
|
||||
resp = req.get_response(object_controller)
|
||||
self.assertEqual('404 Not Found', resp.status)
|
||||
self.assertFalse(os.path.exists(disk_file._datadir))
|
||||
self.assertTrue(os.path.exists(quar_dir))
|
||||
|
||||
def test_GET_quarantine_zbyte(self):
|
||||
# Test swift.obj.server.ObjectController.GET
|
||||
timestamp = normalize_timestamp(time())
|
||||
@@ -4378,7 +4428,7 @@ class TestObjectController(BaseTestCase):
|
||||
with mock.patch('swift.obj.diskfile.BaseDiskFile.reader', reader_mock):
|
||||
resp = req.get_response(obj_controller)
|
||||
reader_mock.assert_called_with(
|
||||
keep_cache=True, cooperative_period=0)
|
||||
keep_cache=True, cooperative_period=0, etag_validate_frac=1)
|
||||
self.assertEqual(resp.status_int, 200)
|
||||
etag = '"%s"' % md5(b'VERIFY', usedforsecurity=False).hexdigest()
|
||||
self.assertEqual(dict(resp.headers), {
|
||||
@@ -4402,7 +4452,7 @@ class TestObjectController(BaseTestCase):
|
||||
with mock.patch('swift.obj.diskfile.BaseDiskFile.reader', reader_mock):
|
||||
resp = req.get_response(obj_controller)
|
||||
reader_mock.assert_called_with(
|
||||
keep_cache=True, cooperative_period=0)
|
||||
keep_cache=True, cooperative_period=0, etag_validate_frac=1)
|
||||
self.assertEqual(resp.status_int, 200)
|
||||
|
||||
# Request headers have 'X-Storage-Token'.
|
||||
@@ -4413,7 +4463,7 @@ class TestObjectController(BaseTestCase):
|
||||
with mock.patch('swift.obj.diskfile.BaseDiskFile.reader', reader_mock):
|
||||
resp = req.get_response(obj_controller)
|
||||
reader_mock.assert_called_with(
|
||||
keep_cache=True, cooperative_period=0)
|
||||
keep_cache=True, cooperative_period=0, etag_validate_frac=1)
|
||||
self.assertEqual(resp.status_int, 200)
|
||||
|
||||
# Request headers have both 'X-Auth-Token' and 'X-Storage-Token'.
|
||||
@@ -4425,7 +4475,7 @@ class TestObjectController(BaseTestCase):
|
||||
with mock.patch('swift.obj.diskfile.BaseDiskFile.reader', reader_mock):
|
||||
resp = req.get_response(obj_controller)
|
||||
reader_mock.assert_called_with(
|
||||
keep_cache=True, cooperative_period=0)
|
||||
keep_cache=True, cooperative_period=0, etag_validate_frac=1)
|
||||
self.assertEqual(resp.status_int, 200)
|
||||
|
||||
def test_GET_keep_cache_private_config_false(self):
|
||||
@@ -4454,7 +4504,7 @@ class TestObjectController(BaseTestCase):
|
||||
with mock.patch('swift.obj.diskfile.BaseDiskFile.reader', reader_mock):
|
||||
resp = req.get_response(obj_controller)
|
||||
reader_mock.assert_called_with(
|
||||
keep_cache=True, cooperative_period=0)
|
||||
keep_cache=True, cooperative_period=0, etag_validate_frac=1.0)
|
||||
self.assertEqual(resp.status_int, 200)
|
||||
etag = '"%s"' % md5(b'VERIFY', usedforsecurity=False).hexdigest()
|
||||
self.assertEqual(dict(resp.headers), {
|
||||
@@ -4478,7 +4528,7 @@ class TestObjectController(BaseTestCase):
|
||||
with mock.patch('swift.obj.diskfile.BaseDiskFile.reader', reader_mock):
|
||||
resp = req.get_response(obj_controller)
|
||||
reader_mock.assert_called_with(
|
||||
keep_cache=False, cooperative_period=0)
|
||||
keep_cache=False, cooperative_period=0, etag_validate_frac=1)
|
||||
self.assertEqual(resp.status_int, 200)
|
||||
|
||||
# Request headers have 'X-Storage-Token'.
|
||||
@@ -4489,7 +4539,7 @@ class TestObjectController(BaseTestCase):
|
||||
with mock.patch('swift.obj.diskfile.BaseDiskFile.reader', reader_mock):
|
||||
resp = req.get_response(obj_controller)
|
||||
reader_mock.assert_called_with(
|
||||
keep_cache=False, cooperative_period=0)
|
||||
keep_cache=False, cooperative_period=0, etag_validate_frac=1)
|
||||
self.assertEqual(resp.status_int, 200)
|
||||
|
||||
# Request headers have both 'X-Auth-Token' and 'X-Storage-Token'.
|
||||
@@ -4501,7 +4551,7 @@ class TestObjectController(BaseTestCase):
|
||||
with mock.patch('swift.obj.diskfile.BaseDiskFile.reader', reader_mock):
|
||||
resp = req.get_response(obj_controller)
|
||||
reader_mock.assert_called_with(
|
||||
keep_cache=False, cooperative_period=0)
|
||||
keep_cache=False, cooperative_period=0, etag_validate_frac=1)
|
||||
self.assertEqual(resp.status_int, 200)
|
||||
|
||||
def test_GET_keep_cache_slo_manifest_no_config(self):
|
||||
@@ -4532,7 +4582,7 @@ class TestObjectController(BaseTestCase):
|
||||
with mock.patch('swift.obj.diskfile.BaseDiskFile.reader', reader_mock):
|
||||
resp = req.get_response(obj_controller)
|
||||
reader_mock.assert_called_with(
|
||||
keep_cache=False, cooperative_period=0)
|
||||
keep_cache=False, cooperative_period=0, etag_validate_frac=1)
|
||||
self.assertEqual(resp.status_int, 200)
|
||||
etag = '"%s"' % md5(b'VERIFY', usedforsecurity=False).hexdigest()
|
||||
self.assertEqual(dict(resp.headers), {
|
||||
@@ -4578,7 +4628,7 @@ class TestObjectController(BaseTestCase):
|
||||
with mock.patch('swift.obj.diskfile.BaseDiskFile.reader', reader_mock):
|
||||
resp = req.get_response(obj_controller)
|
||||
reader_mock.assert_called_with(
|
||||
keep_cache=False, cooperative_period=0)
|
||||
keep_cache=False, cooperative_period=0, etag_validate_frac=1)
|
||||
self.assertEqual(resp.status_int, 200)
|
||||
etag = '"%s"' % md5(b'VERIFY', usedforsecurity=False).hexdigest()
|
||||
self.assertEqual(dict(resp.headers), {
|
||||
@@ -4624,7 +4674,7 @@ class TestObjectController(BaseTestCase):
|
||||
with mock.patch('swift.obj.diskfile.BaseDiskFile.reader', reader_mock):
|
||||
resp = req.get_response(obj_controller)
|
||||
reader_mock.assert_called_with(
|
||||
keep_cache=True, cooperative_period=0)
|
||||
keep_cache=True, cooperative_period=0, etag_validate_frac=1)
|
||||
self.assertEqual(resp.status_int, 200)
|
||||
etag = '"%s"' % md5(b'VERIFY', usedforsecurity=False).hexdigest()
|
||||
self.assertEqual(dict(resp.headers), {
|
||||
@@ -4669,7 +4719,7 @@ class TestObjectController(BaseTestCase):
|
||||
with mock.patch('swift.obj.diskfile.BaseDiskFile.reader', reader_mock):
|
||||
resp = req.get_response(obj_controller)
|
||||
reader_mock.assert_called_with(
|
||||
keep_cache=False, cooperative_period=0)
|
||||
keep_cache=False, cooperative_period=0, etag_validate_frac=1)
|
||||
self.assertEqual(resp.status_int, 200)
|
||||
etag = '"%s"' % md5(b'VERIFY', usedforsecurity=False).hexdigest()
|
||||
self.assertEqual(dict(resp.headers), {
|
||||
@@ -4708,7 +4758,8 @@ class TestObjectController(BaseTestCase):
|
||||
"swift.obj.diskfile.BaseDiskFile.reader"
|
||||
) as reader_mock:
|
||||
resp = req.get_response(obj_controller)
|
||||
reader_mock.assert_called_with(keep_cache=False, cooperative_period=99)
|
||||
reader_mock.assert_called_with(keep_cache=False, cooperative_period=99,
|
||||
etag_validate_frac=1.0)
|
||||
self.assertEqual(resp.status_int, 200)
|
||||
|
||||
# Test DiskFile reader actually sleeps when reading chunks. When
|
||||
|
||||
Reference in New Issue
Block a user