refactor the quarantine
@@ -179,9 +179,9 @@ def mkdirs(path):
                raise


def renamer(old, new): # pragma: no cover
def renamer(old, new):
    """
    Attempt to fix^H^H^Hhide race conditions like empty object directories
    Attempt to fix / hide race conditions like empty object directories
    being removed by backend processes during uploads, by retrying.

    :param old: old path to be renamed

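The hunk above only touches the renamer() docstring; the function body is not part of this diff. As a rough illustration of the retry behaviour the docstring describes, a minimal sketch might look like the following (the recreate-then-retry detail and the retrying_rename name are assumptions, not something shown in this commit):

import os

def retrying_rename(old, new):
    # Sketch only: if the rename fails because a backend process removed the
    # destination's (empty) parent directory mid-upload, recreate the parent
    # and retry once.
    try:
        os.rename(old, new)
    except OSError:
        parent = os.path.dirname(new)
        if not os.path.isdir(parent):
            os.makedirs(parent)
        os.rename(old, new)
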
@@ -149,17 +149,8 @@ class AuditorWorker(object):
            self.quarantines += 1
            self.logger.error(_('ERROR Object %(obj)s failed audit and will '
                'be quarantined: %(err)s'), {'obj': path, 'err': err})
            object_dir = os.path.dirname(path)
            invalidate_hash(os.path.dirname(object_dir))
            renamer_path = os.path.dirname(path)
            to_path = os.path.join(self.devices, device, 'quarantined',
                                   'objects', os.path.basename(renamer_path))
            try:
                renamer(renamer_path, to_path)
            except OSError, e:
                if e.errno == errno.EEXIST:
                    to_path = "%s-%s" % (to_path, uuid.uuid4().hex)
                renamer(renamer_path, to_path)
            object_server.DiskFile.quarantine(
                os.path.join(self.devices, device), path)
            return
        except Exception:
            self.errors += 1

@@ -21,6 +21,7 @@ import errno
import os
import time
import traceback
import uuid
from datetime import datetime
from hashlib import md5
from tempfile import mkstemp

@@ -109,6 +110,7 @@ class DiskFile(object):
        name_hash = hash_path(account, container, obj)
        self.datadir = os.path.join(path, device,
            storage_directory(DATADIR, partition, name_hash))
        self.device_path = os.path.join(path, device)
        self.tmpdir = os.path.join(path, device, 'tmp')
        self.metadata = {}
        self.meta_file = None

@@ -255,6 +257,31 @@ class DiskFile(object):
        if not self.keep_cache:
            drop_buffer_cache(fd, offset, length)

    @classmethod
    def quarantine(cls, device_path, corrupted_file_path):
        """
        In the case that a file is corrupted, move it to a quarantined
        area to allow replication to fix it.

        :params device_path: The path to the device the corrupted file is on.
        :params corrupted_file_path: The path to the file you want quarantined.

        :returns: path (str) of directory the file was moved to
        :raises OSError: re-raises non errno.EEXIST exceptions from rename
        """
        from_dir = os.path.dirname(corrupted_file_path)
        to_dir = os.path.join(device_path, 'quarantined',
                              'objects', os.path.basename(from_dir))
        invalidate_hash(os.path.dirname(from_dir))
        try:
            renamer(from_dir, to_dir)
        except OSError, e:
            if e.errno not in (errno.EEXIST, errno.ENOTEMPTY):
                raise
            to_dir = "%s-%s" % (to_dir, uuid.uuid4().hex)
            renamer(from_dir, to_dir)
        return to_dir


class ObjectController(object):
    """Implements the WSGI application for the Swift Object Server."""

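The server.py hunks above move quarantining into a DiskFile.quarantine classmethod, so callers such as the auditor no longer open-code the rename and the EEXIST/ENOTEMPTY fallback lives in one place. A minimal usage sketch, with purely illustrative paths that are not taken from this diff (the object_server alias matches the one used in the test hunks below):

from swift.obj import server as object_server

# Illustrative values; a real caller derives these from its device and object.
device_path = '/srv/node/sda1'
data_file = '/srv/node/sda1/objects/0/abc/hashdir/1312345678.12345.data'

# Moves the object's directory under <device>/quarantined/objects/ and
# returns the destination; if that directory already exists, a -<uuid>
# suffix is appended and the rename is retried.
quar_dir = object_server.DiskFile.quarantine(device_path, data_file)
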
@@ -36,16 +36,97 @@ from swift.common.utils import hash_path, mkdirs, normalize_timestamp, \
    NullLogger, storage_directory


class TestDiskFile(unittest.TestCase):
    """Test swift.obj.server.DiskFile"""

    def setUp(self):
        """ Set up for testing swift.object_server.ObjectController """
        self.testdir = os.path.join(mkdtemp(), 'tmp_test_obj_server_DiskFile')
        mkdirs(os.path.join(self.testdir, 'sda1', 'tmp'))

    def tearDown(self):
        """ Tear down for testing swift.object_server.ObjectController """
        rmtree(os.path.dirname(self.testdir))

    def test_disk_file_app_iter_corners(self):
        df = object_server.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o')
        mkdirs(df.datadir)
        f = open(os.path.join(df.datadir,
                 normalize_timestamp(time()) + '.data'), 'wb')
        f.write('1234567890')
        setxattr(f.fileno(), object_server.METADATA_KEY,
                 pickle.dumps({}, object_server.PICKLE_PROTOCOL))
        f.close()
        df = object_server.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o',
                                    keep_data_fp=True)
        it = df.app_iter_range(0, None)
        sio = StringIO()
        for chunk in it:
            sio.write(chunk)
        self.assertEquals(sio.getvalue(), '1234567890')

        df = object_server.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o',
                                    keep_data_fp=True)
        it = df.app_iter_range(5, None)
        sio = StringIO()
        for chunk in it:
            sio.write(chunk)
        self.assertEquals(sio.getvalue(), '67890')

    def test_disk_file_mkstemp_creates_dir(self):
        tmpdir = os.path.join(self.testdir, 'sda1', 'tmp')
        os.rmdir(tmpdir)
        with object_server.DiskFile(self.testdir, 'sda1', '0', 'a', 'c',
                                    'o').mkstemp():
            self.assert_(os.path.exists(tmpdir))

    def test_quarantine(self):
        df = object_server.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o')
        mkdirs(df.datadir)
        f = open(os.path.join(df.datadir,
                 normalize_timestamp(time()) + '.data'), 'wb')
        setxattr(f.fileno(), object_server.METADATA_KEY,
                 pickle.dumps({}, object_server.PICKLE_PROTOCOL))
        df = object_server.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o')
        object_server.DiskFile.quarantine(df.device_path, df.data_file)
        quar_dir = os.path.join(self.testdir, 'sda1', 'quarantined',
                                'objects', os.path.basename(os.path.dirname(
                                df.data_file)))
        self.assert_(os.path.isdir(quar_dir))

    def test_quarantine_double(self):
        df = object_server.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o')
        mkdirs(df.datadir)
        f = open(os.path.join(df.datadir,
                 normalize_timestamp(time()) + '.data'), 'wb')
        setxattr(f.fileno(), object_server.METADATA_KEY,
                 pickle.dumps({}, object_server.PICKLE_PROTOCOL))
        df = object_server.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o')
        new_dir = object_server.DiskFile.quarantine(df.device_path,
                                                    df.data_file)
        quar_dir = os.path.join(self.testdir, 'sda1', 'quarantined',
                                'objects', os.path.basename(os.path.dirname(
                                df.data_file)))
        self.assert_(os.path.isdir(quar_dir))
        self.assertEquals(quar_dir, new_dir)
        # have to remake the datadir
        mkdirs(df.datadir)
        double_uuid_path = df.quarantine(df.device_path, df.data_file)
        self.assert_(os.path.isdir(double_uuid_path))
        self.assert_('-' in os.path.basename(double_uuid_path))

    def test_unlinkold(self):
        df = object_server.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'ob')


class TestObjectController(unittest.TestCase):
    """ Test swift.object_server.ObjectController """
    """ Test swift.obj.server.ObjectController """

    def setUp(self):
        """ Set up for testing swift.object_server.ObjectController """
        self.testdir = \
            os.path.join(mkdtemp(), 'tmp_test_object_server_ObjectController')
        mkdirs(self.testdir)
        rmtree(self.testdir)
        mkdirs(os.path.join(self.testdir, 'sda1'))
        mkdirs(os.path.join(self.testdir, 'sda1', 'tmp'))
        conf = {'devices': self.testdir, 'mount_check': 'false'}
        self.object_controller = object_server.ObjectController(conf)

@@ -870,38 +951,6 @@ class TestObjectController(unittest.TestCase):
        resp = self.object_controller.PUT(req)
        self.assertEquals(resp.status_int, 400)

    def test_disk_file_app_iter_corners(self):
        df = object_server.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o')
        mkdirs(df.datadir)
        f = open(os.path.join(df.datadir,
                 normalize_timestamp(time()) + '.data'), 'wb')
        f.write('1234567890')
        setxattr(f.fileno(), object_server.METADATA_KEY,
                 pickle.dumps({}, object_server.PICKLE_PROTOCOL))
        f.close()
        df = object_server.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o',
                                    keep_data_fp=True)
        it = df.app_iter_range(0, None)
        sio = StringIO()
        for chunk in it:
            sio.write(chunk)
        self.assertEquals(sio.getvalue(), '1234567890')

        df = object_server.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o',
                                    keep_data_fp=True)
        it = df.app_iter_range(5, None)
        sio = StringIO()
        for chunk in it:
            sio.write(chunk)
        self.assertEquals(sio.getvalue(), '67890')

    def test_disk_file_mkstemp_creates_dir(self):
        tmpdir = os.path.join(self.testdir, 'sda1', 'tmp')
        os.rmdir(tmpdir)
        with object_server.DiskFile(self.testdir, 'sda1', '0', 'a', 'c',
                                    'o').mkstemp():
            self.assert_(os.path.exists(tmpdir))

    def test_max_upload_time(self):

        class SlowBody():