Check the md5sum against the metadata ETag on object GETs, and perform zero-byte checks on GETs, HEADs, and POSTs.

This commit is contained in:
David Goetz 2011-03-29 20:19:09 +00:00 committed by Tarmac
commit 955e6fa81d
11 changed files with 876 additions and 166 deletions

View File

@ -183,7 +183,7 @@ def direct_head_object(node, part, account, container, obj, conn_timeout=5,
def direct_get_object(node, part, account, container, obj, conn_timeout=5,
response_timeout=15, resp_chunk_size=None):
response_timeout=15, resp_chunk_size=None, headers={}):
"""
Get object directly from the object server.
@ -195,13 +195,14 @@ def direct_get_object(node, part, account, container, obj, conn_timeout=5,
:param conn_timeout: timeout in seconds for establishing the connection
:param response_timeout: timeout in seconds for getting the response
:param resp_chunk_size: if defined, chunk size of data to read.
:param headers: dict to be passed into HTTPConnection headers
:returns: a tuple of (response headers, the object's contents) The response
headers will be a dict and all header names will be lowercase.
"""
path = '/%s/%s/%s' % (account, container, obj)
with Timeout(conn_timeout):
conn = http_connect(node['ip'], node['port'], node['device'], part,
'GET', path)
'GET', path, headers=headers)
with Timeout(response_timeout):
resp = conn.getresponse()
if resp.status < 200 or resp.status >= 300:
@ -286,6 +287,40 @@ def direct_put_object(node, part, account, container, name, contents,
return resp.getheader('etag').strip('"')
def direct_post_object(node, part, account, container, name, headers,
                       conn_timeout=5, response_timeout=15):
    """
    Direct update to object metadata on object server.

    The request is sent with an ``X-Timestamp`` header set to the current
    time. The caller's ``headers`` dict is left untouched.

    :param node: node dictionary from the ring
    :param part: partition the object is on
    :param account: account name
    :param container: container name
    :param name: object name
    :param headers: headers to store as metadata
    :param conn_timeout: timeout in seconds for establishing the connection
    :param response_timeout: timeout in seconds for getting the response
    :raises ClientException: HTTP POST request failed
    """
    path = '/%s/%s/%s' % (account, container, name)
    # Copy before stamping so repeated calls with the same dict (or a dict
    # the caller reuses elsewhere) are not silently modified.
    post_headers = dict(headers)
    post_headers['X-Timestamp'] = normalize_timestamp(time())
    with Timeout(conn_timeout):
        conn = http_connect(node['ip'], node['port'], node['device'], part,
                            'POST', path, headers=post_headers)
    with Timeout(response_timeout):
        resp = conn.getresponse()
        # Drain the body so the response is fully consumed.
        resp.read()
    if resp.status < 200 or resp.status >= 300:
        raise ClientException(
            'Object server %s:%s direct POST %s gave status %s' %
            (node['ip'], node['port'],
             repr('/%s/%s%s' % (node['device'], part, path)),
             resp.status),
            http_host=node['ip'], http_port=node['port'],
            http_device=node['device'], http_status=resp.status,
            http_reason=resp.reason)
def direct_delete_object(node, part, account, container, obj,
conn_timeout=5, response_timeout=15, headers={}):
"""

View File

@ -30,6 +30,14 @@ class AuditException(Exception):
pass
class DiskFileError(Exception):
    """
    An object's data file is inconsistent with its stored metadata, e.g.
    the size on disk does not match the recorded Content-Length.
    """
class DiskFileNotExist(Exception):
    """
    An object's data file does not exist on disk (this includes objects
    that have been deleted).
    """
class AuthException(Exception):
pass

View File

@ -179,9 +179,9 @@ def mkdirs(path):
raise
def renamer(old, new): # pragma: no cover
def renamer(old, new):
"""
Attempt to fix^H^H^Hhide race conditions like empty object directories
Attempt to fix / hide race conditions like empty object directories
being removed by backend processes during uploads, by retrying.
:param old: old path to be renamed
@ -567,7 +567,7 @@ def storage_directory(datadir, partition, hash):
:param hash: Account, container or object hash
:returns: Storage directory
"""
return os.path.join(datadir, partition, hash[-3:], hash)
return os.path.join(datadir, str(partition), hash[-3:], hash)
def hash_path(account, container=None, object=None, raw_digest=False):

View File

@ -23,7 +23,8 @@ from swift.obj import server as object_server
from swift.obj.replicator import invalidate_hash
from swift.common.utils import get_logger, renamer, audit_location_generator, \
ratelimit_sleep, TRUE_VALUES
from swift.common.exceptions import AuditException
from swift.common.exceptions import AuditException, DiskFileError, \
DiskFileNotExist
from swift.common.daemon import Daemon
SLEEP_BETWEEN_AUDITS = 30
@ -119,47 +120,39 @@ class AuditorWorker(object):
except Exception, exc:
raise AuditException('Error when reading metadata: %s' % exc)
_junk, account, container, obj = name.split('/', 3)
df = object_server.DiskFile(self.devices, device,
partition, account,
container, obj,
df = object_server.DiskFile(self.devices, device, partition,
account, container, obj, self.logger,
keep_data_fp=True)
if df.data_file is None:
# file is deleted, we found the tombstone
return
obj_size = os.path.getsize(df.data_file)
if obj_size != int(df.metadata['Content-Length']):
raise AuditException('Content-Length of %s does not match '
'file size of %s' % (int(df.metadata['Content-Length']),
os.path.getsize(df.data_file)))
try:
obj_size = df.get_data_file_size()
except DiskFileError, e:
raise AuditException(str(e))
except DiskFileNotExist:
return
if self.zero_byte_only_at_fps and obj_size:
return
etag = md5()
for chunk in df:
self.bytes_running_time = ratelimit_sleep(
self.bytes_running_time, self.max_bytes_per_second,
incr_by=len(chunk))
etag.update(chunk)
self.bytes_processed += len(chunk)
self.total_bytes_processed += len(chunk)
etag = etag.hexdigest()
if etag != df.metadata['ETag']:
raise AuditException("ETag of %s does not match file's md5 of "
"%s" % (df.metadata['ETag'], etag))
df.close()
if df.quarantined_dir:
self.quarantines += 1
self.logger.error(
_("ERROR Object %(path)s failed audit and will be "
"quarantined: ETag and file's md5 do not match"),
{'path': path})
except AuditException, err:
self.quarantines += 1
self.logger.error(_('ERROR Object %(obj)s failed audit and will '
'be quarantined: %(err)s'), {'obj': path, 'err': err})
object_dir = os.path.dirname(path)
invalidate_hash(os.path.dirname(object_dir))
renamer_path = os.path.dirname(path)
to_path = os.path.join(self.devices, device, 'quarantined',
'objects', os.path.basename(renamer_path))
try:
renamer(renamer_path, to_path)
except OSError, e:
if e.errno == errno.EEXIST:
to_path = "%s-%s" % (to_path, uuid.uuid4().hex)
renamer(renamer_path, to_path)
object_server.quarantine_renamer(
os.path.join(self.devices, device), path)
return
except Exception:
self.errors += 1

View File

@ -21,6 +21,7 @@ import errno
import os
import time
import traceback
import uuid
from datetime import datetime
from hashlib import md5
from tempfile import mkstemp
@ -41,7 +42,8 @@ from swift.common.utils import mkdirs, normalize_timestamp, \
from swift.common.bufferedhttp import http_connect
from swift.common.constraints import check_object_creation, check_mount, \
check_float, check_utf8
from swift.common.exceptions import ConnectionTimeout
from swift.common.exceptions import ConnectionTimeout, DiskFileError, \
DiskFileNotExist
from swift.obj.replicator import get_hashes, invalidate_hash
@ -89,6 +91,32 @@ def write_metadata(fd, metadata):
key += 1
def quarantine_renamer(device_path, corrupted_file_path):
    """
    Move the directory holding a corrupted file into the device's
    quarantine area so that replication can restore a good copy.

    The hash of the directory's parent is invalidated before the move. If
    the quarantine target already exists (or is not empty), the move is
    retried once with a random suffix appended to the target name.

    :param device_path: The path to the device the corrupted file is on.
    :param corrupted_file_path: The path to the file you want quarantined.

    :returns: path (str) of directory the file was moved to
    :raises OSError: re-raises non errno.EEXIST / errno.ENOTEMPTY
                     exceptions from rename
    """
    object_dir = os.path.dirname(corrupted_file_path)
    quarantine_dir = os.path.join(
        device_path, 'quarantined', 'objects', os.path.basename(object_dir))
    invalidate_hash(os.path.dirname(object_dir))
    try:
        renamer(object_dir, quarantine_dir)
    except OSError as err:
        if err.errno in (errno.EEXIST, errno.ENOTEMPTY):
            # Name collision in the quarantine area: pick a unique name.
            quarantine_dir = "%s-%s" % (quarantine_dir, uuid.uuid4().hex)
            renamer(object_dir, quarantine_dir)
        else:
            raise
    return quarantine_dir
class DiskFile(object):
"""
Manage object files on disk.
@ -104,17 +132,23 @@ class DiskFile(object):
"""
def __init__(self, path, device, partition, account, container, obj,
keep_data_fp=False, disk_chunk_size=65536):
logger, keep_data_fp=False, disk_chunk_size=65536):
self.disk_chunk_size = disk_chunk_size
self.name = '/' + '/'.join((account, container, obj))
name_hash = hash_path(account, container, obj)
self.datadir = os.path.join(path, device,
storage_directory(DATADIR, partition, name_hash))
self.device_path = os.path.join(path, device)
self.tmpdir = os.path.join(path, device, 'tmp')
self.logger = logger
self.metadata = {}
self.meta_file = None
self.data_file = None
self.fp = None
self.iter_etag = None
self.started_at_0 = False
self.read_to_eof = False
self.quarantined_dir = None
self.keep_cache = False
if not os.path.exists(self.datadir):
return
@ -134,7 +168,7 @@ class DiskFile(object):
self.fp = open(self.data_file, 'rb')
self.metadata = read_metadata(self.fp)
if not keep_data_fp:
self.close()
self.close(verify_file=False)
if self.meta_file:
with open(self.meta_file) as mfp:
for key in self.metadata.keys():
@ -147,9 +181,16 @@ class DiskFile(object):
try:
dropped_cache = 0
read = 0
self.started_at_0 = False
self.read_to_eof = False
if self.fp.tell() == 0:
self.started_at_0 = True
self.iter_etag = md5()
while True:
chunk = self.fp.read(self.disk_chunk_size)
if chunk:
if self.iter_etag:
self.iter_etag.update(chunk)
read += len(chunk)
if read - dropped_cache > (1024 * 1024):
self.drop_cache(self.fp.fileno(), dropped_cache,
@ -157,6 +198,7 @@ class DiskFile(object):
dropped_cache = read
yield chunk
else:
self.read_to_eof = True
self.drop_cache(self.fp.fileno(), dropped_cache,
read - dropped_cache)
break
@ -180,11 +222,41 @@ class DiskFile(object):
break
yield chunk
def close(self):
"""Close the file."""
def _handle_close_quarantine(self):
    """
    Check if file needs to be quarantined.

    The file is quarantined when its size does not match the stored
    Content-Length, or when it was read from offset 0 through EOF and the
    md5 accumulated during that read differs from the stored ETag. Nothing
    is done when the data file no longer exists.
    """
    try:
        # Called only for its size/Content-Length consistency check.
        self.get_data_file_size()
    except DiskFileError:
        self.quarantine()
        return
    except DiskFileNotExist:
        return

    # The running md5 is only meaningful for a complete, from-the-start
    # read; partial (e.g. ranged) reads cannot be compared to the ETag.
    if (self.iter_etag and self.started_at_0 and self.read_to_eof and
            'ETag' in self.metadata and
            self.iter_etag.hexdigest() != self.metadata.get('ETag')):
        self.quarantine()
def close(self, verify_file=True):
    """
    Close the file. Will handle quarantining file if necessary.

    Any exception raised by the quarantine check is logged and swallowed
    so that the file pointer is always closed and cleared.

    :param verify_file: Defaults to True. If false, will not check
                        file to see if it needs quarantining.
    """
    if self.fp:
        try:
            if verify_file:
                self._handle_close_quarantine()
        except Exception as e:
            import traceback
            # Every mapping key needs a conversion type ("s"); a bare
            # "%(stack)" makes the log formatting itself fail.
            self.logger.error(_('ERROR DiskFile %(data_file)s in '
                '%(data_dir)s close failure: %(exc)s : %(stack)s'),
                {'exc': e, 'stack': ''.join(traceback.format_stack()),
                 'data_file': self.data_file, 'data_dir': self.datadir})
        finally:
            self.fp.close()
            self.fp = None
def is_deleted(self):
"""
@ -255,6 +327,44 @@ class DiskFile(object):
if not self.keep_cache:
drop_buffer_cache(fd, offset, length)
def quarantine(self):
    """
    Move this object's directory to the device's quarantine area so that
    replication can replace the corrupted copy.

    Nothing is moved when the object is deleted or has already been
    quarantined through this instance.

    :returns: if quarantine is successful, path to quarantined
              directory otherwise None
    """
    if self.is_deleted() or self.quarantined_dir:
        return self.quarantined_dir
    self.quarantined_dir = quarantine_renamer(self.device_path,
                                              self.data_file)
    return self.quarantined_dir
def get_data_file_size(self):
    """
    Return the size on disk of this object's data file, after checking it
    against the Content-Length stored in the metadata (when present).

    :returns: file size as an int
    :raises DiskFileError: on file size mismatch.
    :raises DiskFileNotExist: on file not existing (including deleted)
    """
    try:
        if self.data_file:
            actual_size = os.path.getsize(self.data_file)
            if 'Content-Length' in self.metadata:
                expected_size = int(self.metadata['Content-Length'])
                if actual_size != expected_size:
                    raise DiskFileError(
                        'Content-Length of %s does not '
                        'match file size of %s' % (expected_size,
                                                   actual_size))
            return actual_size
    except OSError as err:
        # Only "no such file" is translated below; anything else is real.
        if err.errno != errno.ENOENT:
            raise
    raise DiskFileNotExist('Data File does not exist.')
class ObjectController(object):
"""Implements the WSGI application for the Swift Object Server."""
@ -349,13 +459,17 @@ class ObjectController(object):
if self.mount_check and not check_mount(self.devices, device):
return Response(status='507 %s is not mounted' % device)
file = DiskFile(self.devices, device, partition, account, container,
obj, disk_chunk_size=self.disk_chunk_size)
obj, self.logger, disk_chunk_size=self.disk_chunk_size)
if file.is_deleted():
response_class = HTTPNotFound
else:
response_class = HTTPAccepted
try:
file_size = file.get_data_file_size()
except (DiskFileError, DiskFileNotExist):
file.quarantine()
return HTTPNotFound(request=request)
metadata = {'X-Timestamp': request.headers['x-timestamp']}
metadata.update(val for val in request.headers.iteritems()
if val[0].lower().startswith('x-object-meta-'))
@ -385,7 +499,7 @@ class ObjectController(object):
if error_response:
return error_response
file = DiskFile(self.devices, device, partition, account, container,
obj, disk_chunk_size=self.disk_chunk_size)
obj, self.logger, disk_chunk_size=self.disk_chunk_size)
upload_expiration = time.time() + self.max_upload_time
etag = md5()
upload_size = 0
@ -451,12 +565,18 @@ class ObjectController(object):
if self.mount_check and not check_mount(self.devices, device):
return Response(status='507 %s is not mounted' % device)
file = DiskFile(self.devices, device, partition, account, container,
obj, keep_data_fp=True, disk_chunk_size=self.disk_chunk_size)
obj, self.logger, keep_data_fp=True,
disk_chunk_size=self.disk_chunk_size)
if file.is_deleted():
if request.headers.get('if-match') == '*':
return HTTPPreconditionFailed(request=request)
else:
return HTTPNotFound(request=request)
try:
file_size = file.get_data_file_size()
except (DiskFileError, DiskFileNotExist):
file.quarantine()
return HTTPNotFound(request=request)
if request.headers.get('if-match') not in (None, '*') and \
file.metadata['ETag'] not in request.if_match:
file.close()
@ -496,7 +616,7 @@ class ObjectController(object):
response.headers[key] = value
response.etag = file.metadata['ETag']
response.last_modified = float(file.metadata['X-Timestamp'])
response.content_length = int(file.metadata['Content-Length'])
response.content_length = file_size
if response.content_length < KEEP_CACHE_SIZE and \
'X-Auth-Token' not in request.headers and \
'X-Storage-Token' not in request.headers:
@ -518,9 +638,14 @@ class ObjectController(object):
if self.mount_check and not check_mount(self.devices, device):
return Response(status='507 %s is not mounted' % device)
file = DiskFile(self.devices, device, partition, account, container,
obj, disk_chunk_size=self.disk_chunk_size)
obj, self.logger, disk_chunk_size=self.disk_chunk_size)
if file.is_deleted():
return HTTPNotFound(request=request)
try:
file_size = file.get_data_file_size()
except (DiskFileError, DiskFileNotExist):
file.quarantine()
return HTTPNotFound(request=request)
response = Response(content_type=file.metadata['Content-Type'],
request=request, conditional_response=True)
for key, value in file.metadata.iteritems():
@ -529,7 +654,7 @@ class ObjectController(object):
response.headers[key] = value
response.etag = file.metadata['ETag']
response.last_modified = float(file.metadata['X-Timestamp'])
response.content_length = int(file.metadata['Content-Length'])
response.content_length = file_size
if 'Content-Encoding' in file.metadata:
response.content_encoding = file.metadata['Content-Encoding']
return response
@ -550,7 +675,7 @@ class ObjectController(object):
return Response(status='507 %s is not mounted' % device)
response_class = HTTPNoContent
file = DiskFile(self.devices, device, partition, account, container,
obj, disk_chunk_size=self.disk_chunk_size)
obj, self.logger, disk_chunk_size=self.disk_chunk_size)
if file.is_deleted():
response_class = HTTPNotFound
metadata = {

View File

@ -0,0 +1,176 @@
#!/usr/bin/python -u
# Copyright (c) 2010-2011 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
import os
from uuid import uuid4
from swift.common import client, direct_client
from swift.common.utils import hash_path, readconf
from swift.obj.server import write_metadata, read_metadata
from test.probe.common import kill_pids, reset_environment
class TestObjectFailures(unittest.TestCase):
    """
    Probe tests for object quarantining.

    Each scenario stores an object, damages its data file on the first
    node returned by the object ring, and then talks directly to that
    object server to confirm the damaged copy ends up answering 404.
    """

    def setUp(self):
        (self.pids, self.port2server, self.account_ring,
         self.container_ring, self.object_ring, self.url, self.token,
         self.account) = reset_environment()

    def tearDown(self):
        kill_pids(self.pids)

    def _get_data_file_path(self, obj_dir):
        """
        Return the path of the entry in obj_dir whose name sorts last, or
        None when the directory is empty.
        """
        names = sorted(os.listdir(obj_dir), reverse=True)
        if not names:
            return None
        return os.path.join(obj_dir, names[0])

    def _setup_data_file(self, container, obj, data):
        """
        Upload an object, verify it reads back, and locate its data file.

        :returns: tuple of (node dict, partition, path to data file)
        """
        client.put_container(self.url, self.token, container)
        client.put_object(self.url, self.token, container, obj, data)
        odata = client.get_object(self.url, self.token, container, obj)[-1]
        self.assertEquals(odata, data)
        opart, onodes = self.object_ring.get_nodes(
            self.account, container, obj)
        onode = onodes[0]
        # Explicit floor division: the config file name needs an integer.
        node_id = (onode['port'] - 6000) // 10
        device = onode['device']
        hash_str = hash_path(self.account, container, obj)
        obj_server_conf = readconf('/etc/swift/object-server/%s.conf' %
                                   node_id)
        devices = obj_server_conf['app:object-server']['devices']
        obj_dir = '%s/%s/objects/%s/%s/%s/' % (devices,
                                               device, opart,
                                               hash_str[-3:], hash_str)
        data_file = self._get_data_file_path(obj_dir)
        return onode, opart, data_file

    def _set_bad_etag(self, data_file):
        """Rewrite the data file's metadata with an ETag that cannot match."""
        with open(data_file) as fp:
            metadata = read_metadata(fp)
        metadata['ETag'] = 'badetag'
        with open(data_file) as fp:
            write_metadata(fp, metadata)

    def _replace_with_empty_data_file(self, data_file):
        """
        Replace the data file with an empty file carrying the original
        metadata, so Content-Length no longer matches the size on disk.
        """
        with open(data_file) as fp:
            metadata = read_metadata(fp)
        os.unlink(data_file)
        with open(data_file, 'w') as fp:
            write_metadata(fp, metadata)

    def _assert_quarantined(self, func, *args, **kwargs):
        """
        Call a direct_client function and require that it fails with a 404
        ClientException.
        """
        try:
            func(*args, **kwargs)
        except client.ClientException as err:
            self.assertEquals(err.http_status, 404)
        else:
            # String exceptions are not valid on Python 2.6+; fail the
            # test explicitly instead.
            self.fail("Did not quarantine object")

    def run_quarantine(self):
        container = 'container-%s' % uuid4()
        obj = 'object-%s' % uuid4()
        onode, opart, data_file = self._setup_data_file(container, obj,
                                                        'VERIFY')
        self._set_bad_etag(data_file)

        # The first full read still returns the data; the object is
        # expected to be gone by the next request.
        odata = direct_client.direct_get_object(onode, opart,
                self.account, container, obj)[-1]
        self.assertEquals(odata, 'VERIFY')
        self._assert_quarantined(direct_client.direct_get_object,
                                 onode, opart, self.account, container, obj)

    def run_quarantine_range_etag(self):
        container = 'container-range-%s' % uuid4()
        obj = 'object-range-%s' % uuid4()
        onode, opart, data_file = self._setup_data_file(container, obj,
                                                        'RANGE')
        self._set_bad_etag(data_file)

        # Every ranged read must still succeed, including the last one,
        # which covers the whole object.
        for header, result in [({'Range': 'bytes=0-2'}, 'RAN'),
                               ({'Range': 'bytes=1-11'}, 'ANGE'),
                               ({'Range': 'bytes=0-11'}, 'RANGE')]:
            odata = direct_client.direct_get_object(onode, opart,
                    self.account, container, obj,
                    headers=header)[-1]
            self.assertEquals(odata, result)

        self._assert_quarantined(direct_client.direct_get_object,
                                 onode, opart, self.account, container, obj)

    def run_quarantine_zero_byte_get(self):
        container = 'container-zbyte-%s' % uuid4()
        obj = 'object-zbyte-%s' % uuid4()
        onode, opart, data_file = self._setup_data_file(container, obj,
                                                        'DATA')
        self._replace_with_empty_data_file(data_file)
        self._assert_quarantined(direct_client.direct_get_object,
                                 onode, opart, self.account, container, obj,
                                 conn_timeout=1, response_timeout=1)

    def run_quarantine_zero_byte_head(self):
        container = 'container-zbyte-%s' % uuid4()
        obj = 'object-zbyte-%s' % uuid4()
        onode, opart, data_file = self._setup_data_file(container, obj,
                                                        'DATA')
        self._replace_with_empty_data_file(data_file)
        self._assert_quarantined(direct_client.direct_head_object,
                                 onode, opart, self.account, container, obj,
                                 conn_timeout=1, response_timeout=1)

    def run_quarantine_zero_byte_post(self):
        container = 'container-zbyte-%s' % uuid4()
        obj = 'object-zbyte-%s' % uuid4()
        onode, opart, data_file = self._setup_data_file(container, obj,
                                                        'DATA')
        self._replace_with_empty_data_file(data_file)
        self._assert_quarantined(
            direct_client.direct_post_object,
            onode, opart, self.account, container, obj,
            {'X-Object-Meta-1': 'One', 'X-Object-Meta-Two': 'Two'},
            conn_timeout=1, response_timeout=1)

    def test_runner(self):
        # Run every scenario in one test so the environment is reset once.
        self.run_quarantine()
        self.run_quarantine_range_etag()
        self.run_quarantine_zero_byte_get()
        self.run_quarantine_zero_byte_head()
        self.run_quarantine_zero_byte_post()
if __name__ == '__main__':
unittest.main()

View File

@ -91,6 +91,22 @@ def temptree(files, contents=''):
rmtree(tempdir)
class FakeLogger(object):
    """
    Logger stand-in for tests: each call is recorded in ``log_dict`` under
    its level as an ``(args, kwargs)`` tuple instead of being emitted.
    """
    # NOTE(review): originally labelled "a thread safe logger"; nothing
    # here adds locking, so confirm before relying on that.

    def __init__(self):
        self.log_dict = {'error': [], 'info': [], 'warning': []}

    def _record(self, level, args, kwargs):
        self.log_dict[level].append((args, kwargs))

    def error(self, *args, **kwargs):
        self._record('error', args, kwargs)

    def info(self, *args, **kwargs):
        self._record('info', args, kwargs)

    def warning(self, *args, **kwargs):
        self._record('warning', args, kwargs)
class MockTrue(object):
"""
Instances of MockTrue evaluate like True

View File

@ -19,6 +19,7 @@ from contextlib import contextmanager
from threading import Thread
from webob import Request
from test.unit import FakeLogger
from swift.common.middleware import ratelimit
from swift.proxy.server import get_container_memcache_key
from swift.common.memcached import MemcacheConnectionError
@ -96,19 +97,6 @@ class FakeApp(object):
return ['204 No Content']
class FakeLogger(object):
# a thread safe logger
def error(self, *args, **kwargs):
pass
def info(self, *args, **kwargs):
pass
def warning(self, *args, **kwargs):
pass
def start_response(*args):
pass

View File

@ -13,7 +13,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
# TODO: Tests
from test import unit
import unittest
import tempfile
@ -22,6 +21,7 @@ import time
from shutil import rmtree
from hashlib import md5
from tempfile import mkdtemp
from test.unit import FakeLogger
from swift.obj import auditor
from swift.obj import server as object_server
from swift.obj.server import DiskFile, write_metadata, DATADIR
@ -34,13 +34,11 @@ from swift.common.exceptions import AuditException
class TestAuditor(unittest.TestCase):
def setUp(self):
self.testdir = \
os.path.join(mkdtemp(), 'tmp_test_object_auditor')
self.testdir = os.path.join(mkdtemp(), 'tmp_test_object_auditor')
self.devices = os.path.join(self.testdir, 'node')
self.logger = FakeLogger()
rmtree(self.testdir, ignore_errors=1)
os.mkdir(self.testdir)
os.mkdir(self.devices)
os.mkdir(os.path.join(self.devices, 'sda'))
mkdirs(os.path.join(self.devices, 'sda'))
self.objects = os.path.join(self.devices, 'sda', 'objects')
os.mkdir(os.path.join(self.devices, 'sdb'))
@ -55,6 +53,8 @@ class TestAuditor(unittest.TestCase):
self.conf = dict(
devices=self.devices,
mount_check='false')
self.disk_file = DiskFile(self.devices, 'sda', '0', 'a', 'c', 'o',
self.logger)
def tearDown(self):
rmtree(os.path.dirname(self.testdir), ignore_errors=1)
@ -62,11 +62,9 @@ class TestAuditor(unittest.TestCase):
def test_object_audit_extra_data(self):
self.auditor = auditor.AuditorWorker(self.conf)
cur_part = '0'
disk_file = DiskFile(self.devices, 'sda', cur_part, 'a', 'c', 'o')
data = '0' * 1024
etag = md5()
with disk_file.mkstemp() as (fd, tmppath):
with self.disk_file.mkstemp() as (fd, tmppath):
os.write(fd, data)
etag.update(data)
etag = etag.hexdigest()
@ -76,28 +74,26 @@ class TestAuditor(unittest.TestCase):
'X-Timestamp': timestamp,
'Content-Length': str(os.fstat(fd).st_size),
}
disk_file.put(fd, tmppath, metadata)
self.disk_file.put(fd, tmppath, metadata)
pre_quarantines = self.auditor.quarantines
self.auditor.object_audit(
os.path.join(disk_file.datadir, timestamp + '.data'),
'sda', cur_part)
os.path.join(self.disk_file.datadir, timestamp + '.data'),
'sda', '0')
self.assertEquals(self.auditor.quarantines, pre_quarantines)
os.write(fd, 'extra_data')
self.auditor.object_audit(
os.path.join(disk_file.datadir, timestamp + '.data'),
'sda', cur_part)
os.path.join(self.disk_file.datadir, timestamp + '.data'),
'sda', '0')
self.assertEquals(self.auditor.quarantines, pre_quarantines + 1)
def test_object_audit_diff_data(self):
self.auditor = auditor.AuditorWorker(self.conf)
cur_part = '0'
disk_file = DiskFile(self.devices, 'sda', cur_part, 'a', 'c', 'o')
data = '0' * 1024
etag = md5()
timestamp = str(normalize_timestamp(time.time()))
with disk_file.mkstemp() as (fd, tmppath):
with self.disk_file.mkstemp() as (fd, tmppath):
os.write(fd, data)
etag.update(data)
etag = etag.hexdigest()
@ -106,12 +102,15 @@ class TestAuditor(unittest.TestCase):
'X-Timestamp': timestamp,
'Content-Length': str(os.fstat(fd).st_size),
}
disk_file.put(fd, tmppath, metadata)
self.disk_file.put(fd, tmppath, metadata)
pre_quarantines = self.auditor.quarantines
# remake so it will have metadata
self.disk_file = DiskFile(self.devices, 'sda', '0', 'a', 'c', 'o',
self.logger)
self.auditor.object_audit(
os.path.join(disk_file.datadir, timestamp + '.data'),
'sda', cur_part)
os.path.join(self.disk_file.datadir, timestamp + '.data'),
'sda', '0')
self.assertEquals(self.auditor.quarantines, pre_quarantines)
etag = md5()
etag.update('1' + '0' * 1023)
@ -120,25 +119,23 @@ class TestAuditor(unittest.TestCase):
write_metadata(fd, metadata)
self.auditor.object_audit(
os.path.join(disk_file.datadir, timestamp + '.data'),
'sda', cur_part)
os.path.join(self.disk_file.datadir, timestamp + '.data'),
'sda', '0')
self.assertEquals(self.auditor.quarantines, pre_quarantines + 1)
def test_object_audit_no_meta(self):
cur_part = '0'
disk_file = DiskFile(self.devices, 'sda', cur_part, 'a', 'c', 'o')
timestamp = str(normalize_timestamp(time.time()))
path = os.path.join(disk_file.datadir, timestamp + '.data')
mkdirs(disk_file.datadir)
path = os.path.join(self.disk_file.datadir, timestamp + '.data')
mkdirs(self.disk_file.datadir)
fp = open(path, 'w')
fp.write('0' * 1024)
fp.close()
invalidate_hash(os.path.dirname(disk_file.datadir))
invalidate_hash(os.path.dirname(self.disk_file.datadir))
self.auditor = auditor.AuditorWorker(self.conf)
pre_quarantines = self.auditor.quarantines
self.auditor.object_audit(
os.path.join(disk_file.datadir, timestamp + '.data'),
'sda', cur_part)
os.path.join(self.disk_file.datadir, timestamp + '.data'),
'sda', '0')
self.assertEquals(self.auditor.quarantines, pre_quarantines + 1)
def test_object_audit_bad_args(self):
@ -153,13 +150,11 @@ class TestAuditor(unittest.TestCase):
def test_object_run_once_pass(self):
self.auditor = auditor.AuditorWorker(self.conf)
self.auditor.log_time = 0
cur_part = '0'
timestamp = str(normalize_timestamp(time.time()))
pre_quarantines = self.auditor.quarantines
disk_file = DiskFile(self.devices, 'sda', cur_part, 'a', 'c', 'o')
data = '0' * 1024
etag = md5()
with disk_file.mkstemp() as (fd, tmppath):
with self.disk_file.mkstemp() as (fd, tmppath):
os.write(fd, data)
etag.update(data)
etag = etag.hexdigest()
@ -168,20 +163,18 @@ class TestAuditor(unittest.TestCase):
'X-Timestamp': timestamp,
'Content-Length': str(os.fstat(fd).st_size),
}
disk_file.put(fd, tmppath, metadata)
disk_file.close()
self.disk_file.put(fd, tmppath, metadata)
self.disk_file.close()
self.auditor.audit_all_objects()
self.assertEquals(self.auditor.quarantines, pre_quarantines)
def test_object_run_once_no_sda(self):
self.auditor = auditor.AuditorWorker(self.conf)
cur_part = '0'
timestamp = str(normalize_timestamp(time.time()))
pre_quarantines = self.auditor.quarantines
disk_file = DiskFile(self.devices, 'sdb', cur_part, 'a', 'c', 'o')
data = '0' * 1024
etag = md5()
with disk_file.mkstemp() as (fd, tmppath):
with self.disk_file.mkstemp() as (fd, tmppath):
os.write(fd, data)
etag.update(data)
etag = etag.hexdigest()
@ -190,21 +183,19 @@ class TestAuditor(unittest.TestCase):
'X-Timestamp': timestamp,
'Content-Length': str(os.fstat(fd).st_size),
}
disk_file.put(fd, tmppath, metadata)
disk_file.close()
self.disk_file.put(fd, tmppath, metadata)
self.disk_file.close()
os.write(fd, 'extra_data')
self.auditor.audit_all_objects()
self.assertEquals(self.auditor.quarantines, pre_quarantines + 1)
def test_object_run_once_multi_devices(self):
self.auditor = auditor.AuditorWorker(self.conf)
cur_part = '0'
timestamp = str(normalize_timestamp(time.time()))
pre_quarantines = self.auditor.quarantines
disk_file = DiskFile(self.devices, 'sda', cur_part, 'a', 'c', 'o')
data = '0' * 10
etag = md5()
with disk_file.mkstemp() as (fd, tmppath):
with self.disk_file.mkstemp() as (fd, tmppath):
os.write(fd, data)
etag.update(data)
etag = etag.hexdigest()
@ -213,13 +204,14 @@ class TestAuditor(unittest.TestCase):
'X-Timestamp': timestamp,
'Content-Length': str(os.fstat(fd).st_size),
}
disk_file.put(fd, tmppath, metadata)
disk_file.close()
self.disk_file.put(fd, tmppath, metadata)
self.disk_file.close()
self.auditor.audit_all_objects()
disk_file = DiskFile(self.devices, 'sdb', cur_part, 'a', 'c', 'ob')
self.disk_file = DiskFile(self.devices, 'sdb', '0', 'a', 'c',
'ob', self.logger)
data = '1' * 10
etag = md5()
with disk_file.mkstemp() as (fd, tmppath):
with self.disk_file.mkstemp() as (fd, tmppath):
os.write(fd, data)
etag.update(data)
etag = etag.hexdigest()
@ -228,8 +220,8 @@ class TestAuditor(unittest.TestCase):
'X-Timestamp': timestamp,
'Content-Length': str(os.fstat(fd).st_size),
}
disk_file.put(fd, tmppath, metadata)
disk_file.close()
self.disk_file.put(fd, tmppath, metadata)
self.disk_file.close()
os.write(fd, 'extra_data')
self.auditor.audit_all_objects()
self.assertEquals(self.auditor.quarantines, pre_quarantines + 1)
@ -237,11 +229,9 @@ class TestAuditor(unittest.TestCase):
def test_object_run_fast_track_non_zero(self):
self.auditor = auditor.ObjectAuditor(self.conf)
self.auditor.log_time = 0
cur_part = '0'
disk_file = DiskFile(self.devices, 'sda', cur_part, 'a', 'c', 'o')
data = '0' * 1024
etag = md5()
with disk_file.mkstemp() as (fd, tmppath):
with self.disk_file.mkstemp() as (fd, tmppath):
os.write(fd, data)
etag.update(data)
etag = etag.hexdigest()
@ -250,7 +240,7 @@ class TestAuditor(unittest.TestCase):
'X-Timestamp': str(normalize_timestamp(time.time())),
'Content-Length': str(os.fstat(fd).st_size),
}
disk_file.put(fd, tmppath, metadata)
self.disk_file.put(fd, tmppath, metadata)
etag = md5()
etag.update('1' + '0' * 1023)
etag = etag.hexdigest()
@ -267,36 +257,33 @@ class TestAuditor(unittest.TestCase):
def setup_bad_zero_byte(self, with_ts=False):
self.auditor = auditor.ObjectAuditor(self.conf)
self.auditor.log_time = 0
cur_part = '0'
ts_file_path = ''
if with_ts:
name_hash = hash_path('a', 'c', 'o')
dir_path = os.path.join(self.devices, 'sda',
storage_directory(DATADIR, cur_part, name_hash))
storage_directory(DATADIR, '0', name_hash))
ts_file_path = os.path.join(dir_path, '99999.ts')
if not os.path.exists(dir_path):
mkdirs(dir_path)
fp = open(ts_file_path, 'w')
fp.close()
disk_file = DiskFile(self.devices, 'sda', cur_part, 'a', 'c', 'o')
etag = md5()
with disk_file.mkstemp() as (fd, tmppath):
with self.disk_file.mkstemp() as (fd, tmppath):
etag = etag.hexdigest()
metadata = {
'ETag': etag,
'X-Timestamp': str(normalize_timestamp(time.time())),
'Content-Length': 10,
}
disk_file.put(fd, tmppath, metadata)
self.disk_file.put(fd, tmppath, metadata)
etag = md5()
etag = etag.hexdigest()
metadata['ETag'] = etag
write_metadata(fd, metadata)
if disk_file.data_file:
return disk_file.data_file
if self.disk_file.data_file:
return self.disk_file.data_file
return ts_file_path
def test_object_run_fast_track_all(self):

View File

@ -25,9 +25,8 @@ import fcntl
import time
import tempfile
from contextlib import contextmanager
from eventlet import tpool
from eventlet.green import subprocess
from test.unit import FakeLogger
from swift.common import utils
from swift.common.utils import hash_path, mkdirs, normalize_timestamp
from swift.common import ring
@ -164,7 +163,8 @@ class TestObjectReplicator(unittest.TestCase):
was_connector = object_replicator.http_connect
object_replicator.http_connect = mock_http_connect(200)
cur_part = '0'
df = DiskFile(self.devices, 'sda', cur_part, 'a', 'c', 'o')
df = DiskFile(self.devices, 'sda', cur_part, 'a', 'c', 'o',
FakeLogger())
mkdirs(df.datadir)
f = open(os.path.join(df.datadir,
normalize_timestamp(time.time()) + '.data'),
@ -189,7 +189,7 @@ class TestObjectReplicator(unittest.TestCase):
object_replicator.http_connect = was_connector
def test_get_hashes(self):
df = DiskFile(self.devices, 'sda', '0', 'a', 'c', 'o')
df = DiskFile(self.devices, 'sda', '0', 'a', 'c', 'o', FakeLogger())
mkdirs(df.datadir)
with open(os.path.join(df.datadir, normalize_timestamp(
time.time()) + '.ts'), 'wb') as f:
@ -206,7 +206,7 @@ class TestObjectReplicator(unittest.TestCase):
self.assert_('a83' in hashes)
def test_hash_suffix_one_file(self):
df = DiskFile(self.devices, 'sda', '0', 'a', 'c', 'o')
df = DiskFile(self.devices, 'sda', '0', 'a', 'c', 'o', FakeLogger())
mkdirs(df.datadir)
f = open(os.path.join(df.datadir,
normalize_timestamp(time.time() - 100) + '.ts'),
@ -223,7 +223,7 @@ class TestObjectReplicator(unittest.TestCase):
self.assertEquals(len(os.listdir(self.parts['0'])), 0)
def test_hash_suffix_multi_file_one(self):
df = DiskFile(self.devices, 'sda', '0', 'a', 'c', 'o')
df = DiskFile(self.devices, 'sda', '0', 'a', 'c', 'o', FakeLogger())
mkdirs(df.datadir)
for tdiff in [1, 50, 100, 500]:
for suff in ['.meta', '.data', '.ts']:
@ -244,7 +244,7 @@ class TestObjectReplicator(unittest.TestCase):
self.assertEquals(len(os.listdir(whole_hsh_path)), 1)
def test_hash_suffix_multi_file_two(self):
df = DiskFile(self.devices, 'sda', '0', 'a', 'c', 'o')
df = DiskFile(self.devices, 'sda', '0', 'a', 'c', 'o', FakeLogger())
mkdirs(df.datadir)
for tdiff in [1, 50, 100, 500]:
suffs = ['.meta', '.data']
@ -274,7 +274,7 @@ class TestObjectReplicator(unittest.TestCase):
fdata = fp.read()
self.assertEquals(fdata, data)
df = DiskFile(self.devices, 'sda', '0', 'a', 'c', 'o')
df = DiskFile(self.devices, 'sda', '0', 'a', 'c', 'o', FakeLogger())
mkdirs(df.datadir)
ohash = hash_path('a', 'c', 'o')
data_dir = ohash[-3:]
@ -329,7 +329,7 @@ class TestObjectReplicator(unittest.TestCase):
os.path.join(self.objects, part))
def test_delete_partition(self):
df = DiskFile(self.devices, 'sda', '0', 'a', 'c', 'o')
df = DiskFile(self.devices, 'sda', '0', 'a', 'c', 'o', FakeLogger())
mkdirs(df.datadir)
ohash = hash_path('a', 'c', 'o')
data_dir = ohash[-3:]
@ -347,7 +347,8 @@ class TestObjectReplicator(unittest.TestCase):
# Write some files into '1' and run replicate- they should be moved
# to the other partitions and then node should get deleted.
cur_part = '1'
df = DiskFile(self.devices, 'sda', cur_part, 'a', 'c', 'o')
df = DiskFile(self.devices, 'sda', cur_part, 'a', 'c', 'o',
FakeLogger())
mkdirs(df.datadir)
f = open(os.path.join(df.datadir,
normalize_timestamp(time.time()) + '.data'),

View File

@ -18,34 +18,280 @@
import cPickle as pickle
import os
import sys
import shutil
import unittest
from nose import SkipTest
from shutil import rmtree
from StringIO import StringIO
from time import gmtime, sleep, strftime, time
from tempfile import mkdtemp
from hashlib import md5
from eventlet import sleep, spawn, wsgi, listen
from webob import Request
from test.unit import FakeLogger
from test.unit import _getxattr as getxattr
from test.unit import _setxattr as setxattr
from test.unit import connect_tcp, readuntil2crlfs
from swift.obj import server as object_server
from swift.common.utils import hash_path, mkdirs, normalize_timestamp, \
NullLogger, storage_directory
from swift.common.exceptions import DiskFileNotExist
from eventlet import tpool
class TestDiskFile(unittest.TestCase):
    """Test swift.obj.server.DiskFile"""

    def setUp(self):
        """Create a scratch device tree and stub out tpool.execute."""
        self.testdir = os.path.join(mkdtemp(), 'tmp_test_obj_server_DiskFile')
        mkdirs(os.path.join(self.testdir, 'sda1', 'tmp'))

        def fake_exe(*args, **kwargs):
            pass

        # Keep a handle on the real implementation so tearDown can put it
        # back; otherwise the no-op stub stays installed for every test
        # that runs after this class.
        self._orig_tpool_exc = tpool.execute
        tpool.execute = fake_exe

    def tearDown(self):
        """Restore tpool.execute and remove the scratch device tree."""
        tpool.execute = self._orig_tpool_exc
        rmtree(os.path.dirname(self.testdir))

    def _new_disk_file(self, obj_name='o', **kwargs):
        """Return a DiskFile for a/c/<obj_name> in partition '0' on sda1."""
        return object_server.DiskFile(self.testdir, 'sda1', '0', 'a', 'c',
                                      obj_name, FakeLogger(), **kwargs)

    def _create_ondisk_file(self, df, body=''):
        """
        Write a fresh '<timestamp>.data' file into df.datadir.

        The file contains ``body`` and carries an empty pickled metadata
        dict in its xattrs.  The handle is closed before returning.
        """
        mkdirs(df.datadir)
        path = os.path.join(df.datadir,
                            normalize_timestamp(time()) + '.data')
        with open(path, 'wb') as f:
            f.write(body)
            setxattr(f.fileno(), object_server.METADATA_KEY,
                     pickle.dumps({}, object_server.PICKLE_PROTOCOL))

    def _quarantine_dir(self, df):
        """Return the expected quarantine directory for df's data file."""
        return os.path.join(
            self.testdir, 'sda1', 'quarantined', 'objects',
            os.path.basename(os.path.dirname(df.data_file)))

    def test_disk_file_app_iter_corners(self):
        """app_iter_range with an open-ended stop reads to end of file."""
        self._create_ondisk_file(self._new_disk_file(), body='1234567890')

        for start, expected in ((0, '1234567890'), (5, '67890')):
            df = self._new_disk_file(keep_data_fp=True)
            sio = StringIO()
            for chunk in df.app_iter_range(start, None):
                sio.write(chunk)
            self.assertEquals(sio.getvalue(), expected)

    def test_disk_file_mkstemp_creates_dir(self):
        """mkstemp recreates the device tmp directory when it is missing."""
        tmpdir = os.path.join(self.testdir, 'sda1', 'tmp')
        os.rmdir(tmpdir)
        with self._new_disk_file().mkstemp():
            self.assert_(os.path.exists(tmpdir))

    def test_quarantine(self):
        """quarantine() creates the per-object quarantine directory."""
        self._create_ondisk_file(self._new_disk_file())
        df = self._new_disk_file()
        df.quarantine()
        self.assert_(os.path.isdir(self._quarantine_dir(df)))

    def test_quarantine_same_file(self):
        """Quarantining the same object twice yields a second directory."""
        self._create_ondisk_file(self._new_disk_file())
        df = self._new_disk_file()
        new_dir = df.quarantine()
        quar_dir = self._quarantine_dir(df)
        self.assert_(os.path.isdir(quar_dir))
        self.assertEquals(quar_dir, new_dir)

        # have to remake the datadir and file
        self._create_ondisk_file(df)
        df = self._new_disk_file(keep_data_fp=True)
        double_uuid_path = df.quarantine()
        self.assert_(os.path.isdir(double_uuid_path))
        self.assert_('-' in os.path.basename(double_uuid_path))

    def _get_data_file(self, invalid_type=None, obj_name='o',
                       fsize=1024, csize=8, extension='.data', ts=None):
        """
        Write an object through DiskFile.put and return a DiskFile on it.

        :param invalid_type: None for a consistent object, or one of
                             'ETag' (stored ETag does not match the data),
                             'Content-Length' (stored length is fsize - 1),
                             'Zero-Byte' (data file replaced by an empty
                             file after it has been opened)
        :param obj_name: object name to use under a/c
        :param fsize: number of '0' bytes written as the object body
        :param csize: disk_chunk_size for the returned DiskFile
        :param extension: extension handed to DiskFile.put
        :param ts: X-Timestamp value; defaults to the current time
        :returns: a DiskFile opened with keep_data_fp=True whose
                  ``unit_test_len`` attribute is set to fsize
        """
        df = self._new_disk_file(obj_name=obj_name)
        data = '0' * fsize
        timestamp = ts if ts else str(normalize_timestamp(time()))
        with df.mkstemp() as (fd, tmppath):
            os.write(fd, data)
            metadata = {
                'ETag': md5(data).hexdigest(),
                'X-Timestamp': timestamp,
                'Content-Length': str(os.fstat(fd).st_size),
            }
            df.put(fd, tmppath, metadata, extension=extension)
            # Corrupt the stored metadata only after put() has run.
            if invalid_type == 'ETag':
                metadata['ETag'] = md5('1' + '0' * (fsize - 1)).hexdigest()
                object_server.write_metadata(fd, metadata)
            if invalid_type == 'Content-Length':
                metadata['Content-Length'] = fsize - 1
                object_server.write_metadata(fd, metadata)

        df = self._new_disk_file(obj_name=obj_name, keep_data_fp=True,
                                 disk_chunk_size=csize)
        if invalid_type == 'Zero-Byte':
            # Swap the data file for an empty one behind the DiskFile's back.
            os.remove(df.data_file)
            with open(df.data_file, 'w'):
                pass
        df.unit_test_len = fsize
        return df

    def test_quarantine_valids(self):
        """Fully reading a consistent object never quarantines it."""
        for obj_name, csize in (('1', 8), ('2', 1), ('3', 100000)):
            df = self._get_data_file(obj_name=obj_name, csize=csize)
            for _ in df:
                pass
            self.assertFalse(df.quarantined_dir)

    def run_quarantine_invalids(self, invalid_type):
        """
        Check when an object corrupted per ``invalid_type`` is quarantined.

        :param invalid_type: 'ETag', 'Content-Length' or 'Zero-Byte', as
                             understood by _get_data_file
        """
        # Iterating the whole object quarantines it at every chunk size.
        for obj_name, csize in (('1', 8), ('2', 1), ('3', 100000)):
            df = self._get_data_file(invalid_type=invalid_type,
                                     obj_name=obj_name, csize=csize)
            for _ in df:
                pass
            self.assertTrue(df.quarantined_dir)

        # Merely constructing the DiskFile does not quarantine anything.
        df = self._get_data_file(invalid_type=invalid_type, obj_name='4')
        self.assertFalse(df.quarantined_dir)

        # Ranges that start at 0 and reach (or pass) the end of the file.
        for obj_name, extra in (('5', 0), ('6', 100)):
            df = self._get_data_file(invalid_type=invalid_type,
                                     obj_name=obj_name)
            for _ in df.app_iter_range(0, df.unit_test_len + extra):
                pass
            self.assertTrue(df.quarantined_dir)

        # for the following, Content-Length/Zero-Byte errors will always
        # result in a quarantine, even if the whole file isn't check-summed
        expected_quar = invalid_type in ('Zero-Byte', 'Content-Length')
        # Each partial-range case gets its own object name so that no case
        # depends on what an earlier one left on disk.
        for obj_name, start, stop_delta in (('7', 1, 0),
                                            ('8', 0, -1),
                                            ('9', 1, 1)):
            df = self._get_data_file(invalid_type=invalid_type,
                                     obj_name=obj_name)
            for _ in df.app_iter_range(start, df.unit_test_len + stop_delta):
                pass
            self.assertEquals(bool(df.quarantined_dir), expected_quar)

    def test_quarantine_invalids(self):
        """Run the quarantine matrix for every supported corruption type."""
        self.run_quarantine_invalids('ETag')
        self.run_quarantine_invalids('Content-Length')
        self.run_quarantine_invalids('Zero-Byte')

    def test_quarantine_deleted_files(self):
        """close() quarantines bad .data files but leaves .ts files alone."""
        df = self._get_data_file(invalid_type='Content-Length',
                                 extension='.data')
        df.close()
        self.assertTrue(df.quarantined_dir)

        df = self._get_data_file(invalid_type='Content-Length',
                                 extension='.ts')
        df.close()
        self.assertFalse(df.quarantined_dir)

        df = self._get_data_file(invalid_type='Content-Length',
                                 extension='.ts')
        self.assertRaises(DiskFileNotExist, df.get_data_file_size)

    def test_unlinkold(self):
        """unlinkold(ts) leaves only the file stamped with ts."""
        df1 = self._get_data_file()
        future_time = str(normalize_timestamp(time() + 100))
        # Second write of the same object, stamped in the future.
        self._get_data_file(ts=future_time)
        self.assertEquals(len(os.listdir(df1.datadir)), 2)
        df1.unlinkold(future_time)
        remaining = os.listdir(df1.datadir)
        self.assertEquals(len(remaining), 1)
        self.assertEquals(remaining[0], "%s.data" % future_time)

    def test_close_error(self):
        """An exception inside the close-time check is logged, not raised."""

        def err():
            raise Exception("bad")

        df = self._get_data_file(fsize=1024 * 1024 * 2)
        df._handle_close_quarantine = err
        for _ in df:
            pass
        # close is called at the end of the iterator
        self.assertEquals(df.fp, None)
        self.assertEquals(len(df.logger.log_dict['error']), 1)

    def test_quarantine_twice(self):
        """A second quarantine() on the same DiskFile returns None."""
        df = self._get_data_file(invalid_type='Content-Length',
                                 extension='.data')
        self.assert_(os.path.isfile(df.data_file))
        quar_dir = df.quarantine()
        self.assertFalse(os.path.isfile(df.data_file))
        self.assert_(os.path.isdir(quar_dir))
        self.assertEquals(df.quarantine(), None)
class TestObjectController(unittest.TestCase):
""" Test swift.object_server.ObjectController """
""" Test swift.obj.server.ObjectController """
def setUp(self):
""" Set up for testing swift.object_server.ObjectController """
self.testdir = \
os.path.join(mkdtemp(), 'tmp_test_object_server_ObjectController')
mkdirs(self.testdir)
rmtree(self.testdir)
mkdirs(os.path.join(self.testdir, 'sda1'))
mkdirs(os.path.join(self.testdir, 'sda1', 'tmp'))
conf = {'devices': self.testdir, 'mount_check': 'false'}
self.object_controller = object_server.ObjectController(conf)
@ -243,6 +489,35 @@ class TestObjectController(unittest.TestCase):
finally:
object_server.http_connect = old_http_connect
def test_POST_quarantine_zbyte(self):
    """ Test swift.obj.server.ObjectController.POST on a zero-byte file """
    # Store a normal object first.
    timestamp = normalize_timestamp(time())
    req = Request.blank('/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
                        headers={'X-Timestamp': timestamp,
                                 'Content-Type': 'application/x-test'})
    req.body = 'VERIFY'
    resp = self.object_controller.PUT(req)
    self.assertEquals(resp.status_int, 201)

    # Named disk_file rather than `file` to avoid shadowing the builtin.
    disk_file = object_server.DiskFile(self.testdir, 'sda1', 'p', 'a', 'c',
                                       'o', FakeLogger(), keep_data_fp=True)
    file_name = os.path.basename(disk_file.data_file)

    # Replace the data file with an empty one carrying the same metadata.
    with open(disk_file.data_file) as fp:
        metadata = object_server.read_metadata(fp)
    os.unlink(disk_file.data_file)
    with open(disk_file.data_file, 'w') as fp:
        object_server.write_metadata(fp, metadata)
    self.assertEquals(os.listdir(disk_file.datadir)[0], file_name)

    # The POST must report 404 and move the file into quarantine.
    req = Request.blank('/sda1/p/a/c/o',
                        headers={'X-Timestamp': normalize_timestamp(time())})
    resp = self.object_controller.POST(req)
    self.assertEquals(resp.status_int, 404)

    quar_dir = os.path.join(
        self.testdir, 'sda1', 'quarantined', 'objects',
        os.path.basename(os.path.dirname(disk_file.data_file)))
    self.assertEquals(os.listdir(quar_dir)[0], file_name)
def test_PUT_invalid_path(self):
req = Request.blank('/sda1/p/a/c', environ={'REQUEST_METHOD': 'PUT'})
resp = self.object_controller.PUT(req)
@ -512,6 +787,34 @@ class TestObjectController(unittest.TestCase):
resp = self.object_controller.HEAD(req)
self.assertEquals(resp.status_int, 404)
def test_HEAD_quarantine_zbyte(self):
    """ Test swift.obj.server.ObjectController.HEAD on a zero-byte file """
    # Store a normal object first.
    timestamp = normalize_timestamp(time())
    req = Request.blank('/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
                        headers={'X-Timestamp': timestamp,
                                 'Content-Type': 'application/x-test'})
    req.body = 'VERIFY'
    resp = self.object_controller.PUT(req)
    self.assertEquals(resp.status_int, 201)

    # Named disk_file rather than `file` to avoid shadowing the builtin.
    disk_file = object_server.DiskFile(self.testdir, 'sda1', 'p', 'a', 'c',
                                       'o', FakeLogger(), keep_data_fp=True)
    file_name = os.path.basename(disk_file.data_file)

    # Replace the data file with an empty one carrying the same metadata.
    with open(disk_file.data_file) as fp:
        metadata = object_server.read_metadata(fp)
    os.unlink(disk_file.data_file)
    with open(disk_file.data_file, 'w') as fp:
        object_server.write_metadata(fp, metadata)
    self.assertEquals(os.listdir(disk_file.datadir)[0], file_name)

    # The HEAD must report 404 and move the file into quarantine.
    req = Request.blank('/sda1/p/a/c/o')
    resp = self.object_controller.HEAD(req)
    self.assertEquals(resp.status_int, 404)

    quar_dir = os.path.join(
        self.testdir, 'sda1', 'quarantined', 'objects',
        os.path.basename(os.path.dirname(disk_file.data_file)))
    self.assertEquals(os.listdir(quar_dir)[0], file_name)
def test_GET(self):
""" Test swift.object_server.ObjectController.GET """
req = Request.blank('/sda1/p/a/c')
@ -773,6 +1076,116 @@ class TestObjectController(unittest.TestCase):
resp = self.object_controller.GET(req)
self.assertEquals(resp.status_int, 200)
def test_GET_quarantine(self):
    """ Test swift.object_server.ObjectController.GET """
    obj_path = '/sda1/p/a/c/o'
    timestamp = normalize_timestamp(time())
    put_headers = {'X-Timestamp': timestamp,
                   'Content-Type': 'application/x-test'}
    req = Request.blank(obj_path, environ={'REQUEST_METHOD': 'PUT'},
                        headers=put_headers)
    req.body = 'VERIFY'
    resp = self.object_controller.PUT(req)
    self.assertEquals(resp.status_int, 201)

    disk_file = object_server.DiskFile(self.testdir, 'sda1', 'p', 'a', 'c',
                                       'o', FakeLogger(), keep_data_fp=True)
    file_name = os.path.basename(disk_file.data_file)

    # Overwrite the stored metadata with an ETag computed over other data.
    bad_etag = md5('VERIF').hexdigest()
    metadata = {'X-Timestamp': timestamp,
                'Content-Length': 6, 'ETag': bad_etag}
    object_server.write_metadata(disk_file.fp, metadata)
    self.assertEquals(os.listdir(disk_file.datadir)[0], file_name)

    resp = self.object_controller.GET(Request.blank(obj_path))
    hash_dir_name = os.path.basename(os.path.dirname(disk_file.data_file))
    quar_dir = os.path.join(self.testdir, 'sda1', 'quarantined', 'objects',
                            hash_dir_name)
    # Nothing has moved until the response body is consumed.
    self.assertEquals(os.listdir(disk_file.datadir)[0], file_name)
    body = resp.body  # actually does quarantining
    self.assertEquals(body, 'VERIFY')
    self.assertEquals(os.listdir(quar_dir)[0], file_name)

    resp = self.object_controller.GET(Request.blank(obj_path))
    self.assertEquals(resp.status_int, 404)
def test_GET_quarantine_zbyte(self):
    """ Test swift.object_server.ObjectController.GET """
    obj_path = '/sda1/p/a/c/o'
    timestamp = normalize_timestamp(time())
    put_headers = {'X-Timestamp': timestamp,
                   'Content-Type': 'application/x-test'}
    req = Request.blank(obj_path, environ={'REQUEST_METHOD': 'PUT'},
                        headers=put_headers)
    req.body = 'VERIFY'
    resp = self.object_controller.PUT(req)
    self.assertEquals(resp.status_int, 201)

    disk_file = object_server.DiskFile(self.testdir, 'sda1', 'p', 'a', 'c',
                                       'o', FakeLogger(), keep_data_fp=True)
    data_path = disk_file.data_file
    file_name = os.path.basename(data_path)

    # Recreate the data file empty, keeping the metadata it had.
    with open(data_path) as fp:
        metadata = object_server.read_metadata(fp)
    os.unlink(data_path)
    with open(data_path, 'w') as fp:
        object_server.write_metadata(fp, metadata)
    self.assertEquals(os.listdir(disk_file.datadir)[0], file_name)

    resp = self.object_controller.GET(Request.blank(obj_path))
    self.assertEquals(resp.status_int, 404)

    hash_dir_name = os.path.basename(os.path.dirname(disk_file.data_file))
    quar_dir = os.path.join(self.testdir, 'sda1', 'quarantined', 'objects',
                            hash_dir_name)
    self.assertEquals(os.listdir(quar_dir)[0], file_name)
def test_GET_quarantine_range(self):
    """ Test swift.object_server.ObjectController.GET """
    obj_path = '/sda1/p/a/c/o'
    timestamp = normalize_timestamp(time())
    put_headers = {'X-Timestamp': timestamp,
                   'Content-Type': 'application/x-test'}
    req = Request.blank(obj_path, environ={'REQUEST_METHOD': 'PUT'},
                        headers=put_headers)
    req.body = 'VERIFY'
    resp = self.object_controller.PUT(req)
    self.assertEquals(resp.status_int, 201)

    disk_file = object_server.DiskFile(self.testdir, 'sda1', 'p', 'a', 'c',
                                       'o', FakeLogger(), keep_data_fp=True)
    file_name = os.path.basename(disk_file.data_file)

    # Overwrite the stored metadata with an ETag computed over other data.
    bad_etag = md5('VERIF').hexdigest()
    metadata = {'X-Timestamp': timestamp,
                'Content-Length': 6, 'ETag': bad_etag}
    object_server.write_metadata(disk_file.fp, metadata)
    self.assertEquals(os.listdir(disk_file.datadir)[0], file_name)

    # The path only depends on this test's DiskFile, so build it once.
    quar_dir = os.path.join(
        self.testdir, 'sda1', 'quarantined', 'objects',
        os.path.basename(os.path.dirname(disk_file.data_file)))

    req = Request.blank(obj_path)
    req.range = 'bytes=0-4'  # partial
    resp = self.object_controller.GET(req)
    body = resp.body
    self.assertEquals(os.listdir(disk_file.datadir)[0], file_name)
    self.assertFalse(os.path.isdir(quar_dir))

    resp = self.object_controller.GET(Request.blank(obj_path))
    self.assertEquals(resp.status_int, 200)

    req = Request.blank(obj_path)
    req.range = 'bytes=1-6'  # partial
    resp = self.object_controller.GET(req)
    body = resp.body
    self.assertEquals(os.listdir(disk_file.datadir)[0], file_name)
    self.assertFalse(os.path.isdir(quar_dir))

    req = Request.blank(obj_path)
    req.range = 'bytes=0-14'  # full
    resp = self.object_controller.GET(req)
    # Still in place until the body is read.
    self.assertEquals(os.listdir(disk_file.datadir)[0], file_name)
    body = resp.body
    self.assertTrue(os.path.isdir(quar_dir))

    resp = self.object_controller.GET(Request.blank(obj_path))
    self.assertEquals(resp.status_int, 404)
def test_DELETE(self):
""" Test swift.object_server.ObjectController.DELETE """
req = Request.blank('/sda1/p/a/c',
@ -943,38 +1356,6 @@ class TestObjectController(unittest.TestCase):
resp = self.object_controller.PUT(req)
self.assertEquals(resp.status_int, 400)
def test_disk_file_app_iter_corners(self):
    """app_iter_range with a stop of None yields data through end of file."""
    df = object_server.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o')
    mkdirs(df.datadir)
    data_path = os.path.join(df.datadir,
                             normalize_timestamp(time()) + '.data')
    f = open(data_path, 'wb')
    f.write('1234567890')
    empty_meta = pickle.dumps({}, object_server.PICKLE_PROTOCOL)
    setxattr(f.fileno(), object_server.METADATA_KEY, empty_meta)
    f.close()

    # Read from the very beginning.
    df = object_server.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o',
                                keep_data_fp=True)
    collected = StringIO()
    for piece in df.app_iter_range(0, None):
        collected.write(piece)
    self.assertEquals(collected.getvalue(), '1234567890')

    # Read from the middle.
    df = object_server.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o',
                                keep_data_fp=True)
    collected = StringIO()
    for piece in df.app_iter_range(5, None):
        collected.write(piece)
    self.assertEquals(collected.getvalue(), '67890')
def test_disk_file_mkstemp_creates_dir(self):
    """mkstemp recreates the device tmp directory when it is missing."""
    tmpdir = os.path.join(self.testdir, 'sda1', 'tmp')
    os.rmdir(tmpdir)
    disk_file = object_server.DiskFile(self.testdir, 'sda1', '0', 'a', 'c',
                                       'o')
    with disk_file.mkstemp():
        self.assert_(os.path.exists(tmpdir))
def test_max_upload_time(self):
class SlowBody():