unittests working and added probe test

This commit is contained in:
David Goetz 2011-03-15 22:12:03 -07:00
parent da1cdb1c40
commit 86c84c3bc3
11 changed files with 759 additions and 149 deletions

View File

@ -183,7 +183,7 @@ def direct_head_object(node, part, account, container, obj, conn_timeout=5,
def direct_get_object(node, part, account, container, obj, conn_timeout=5, def direct_get_object(node, part, account, container, obj, conn_timeout=5,
response_timeout=15, resp_chunk_size=None): response_timeout=15, resp_chunk_size=None, headers={}):
""" """
Get object directly from the object server. Get object directly from the object server.
@ -195,13 +195,14 @@ def direct_get_object(node, part, account, container, obj, conn_timeout=5,
:param conn_timeout: timeout in seconds for establishing the connection :param conn_timeout: timeout in seconds for establishing the connection
:param response_timeout: timeout in seconds for getting the response :param response_timeout: timeout in seconds for getting the response
:param resp_chunk_size: if defined, chunk size of data to read. :param resp_chunk_size: if defined, chunk size of data to read.
:param headers: dict to be passed into HTTPConnection headers
:returns: a tuple of (response headers, the object's contents) The response :returns: a tuple of (response headers, the object's contents) The response
headers will be a dict and all header names will be lowercase. headers will be a dict and all header names will be lowercase.
""" """
path = '/%s/%s/%s' % (account, container, obj) path = '/%s/%s/%s' % (account, container, obj)
with Timeout(conn_timeout): with Timeout(conn_timeout):
conn = http_connect(node['ip'], node['port'], node['device'], part, conn = http_connect(node['ip'], node['port'], node['device'], part,
'GET', path) 'GET', path, headers=headers)
with Timeout(response_timeout): with Timeout(response_timeout):
resp = conn.getresponse() resp = conn.getresponse()
if resp.status < 200 or resp.status >= 300: if resp.status < 200 or resp.status >= 300:

View File

@ -30,6 +30,14 @@ class AuditException(Exception):
pass pass
class DiskFileError(Exception):
pass
class DiskFileNotExist(Exception):
pass
class AuthException(Exception): class AuthException(Exception):
pass pass

View File

@ -567,7 +567,7 @@ def storage_directory(datadir, partition, hash):
:param hash: Account, container or object hash :param hash: Account, container or object hash
:returns: Storage directory :returns: Storage directory
""" """
return os.path.join(datadir, partition, hash[-3:], hash) return os.path.join(datadir, str(partition), hash[-3:], hash)
def hash_path(account, container=None, object=None, raw_digest=False): def hash_path(account, container=None, object=None, raw_digest=False):

View File

@ -23,7 +23,8 @@ from swift.obj import server as object_server
from swift.obj.replicator import invalidate_hash from swift.obj.replicator import invalidate_hash
from swift.common.utils import get_logger, renamer, audit_location_generator, \ from swift.common.utils import get_logger, renamer, audit_location_generator, \
ratelimit_sleep, TRUE_VALUES ratelimit_sleep, TRUE_VALUES
from swift.common.exceptions import AuditException from swift.common.exceptions import AuditException, DiskFileError, \
DiskFileNotExist
from swift.common.daemon import Daemon from swift.common.daemon import Daemon
SLEEP_BETWEEN_AUDITS = 30 SLEEP_BETWEEN_AUDITS = 30
@ -119,40 +120,42 @@ class AuditorWorker(object):
except Exception, exc: except Exception, exc:
raise AuditException('Error when reading metadata: %s' % exc) raise AuditException('Error when reading metadata: %s' % exc)
_junk, account, container, obj = name.split('/', 3) _junk, account, container, obj = name.split('/', 3)
df = object_server.DiskFile(self.devices, device, df = object_server.DiskFile(self.devices, device, partition,
partition, account, account, container, obj, self.logger,
container, obj,
keep_data_fp=True) keep_data_fp=True)
if df.data_file is None: if df.data_file is None:
# file is deleted, we found the tombstone # file is deleted, we found the tombstone
return return
obj_size = os.path.getsize(df.data_file) try:
if obj_size != int(df.metadata['Content-Length']): obj_size = df.get_data_file_size()
raise AuditException('Content-Length of %s does not match ' except DiskFileError, e:
'file size of %s' % (int(df.metadata['Content-Length']), raise AuditException(str(e))
os.path.getsize(df.data_file))) except DiskFileNotExist:
return
if self.zero_byte_only_at_fps and obj_size: if self.zero_byte_only_at_fps and obj_size:
return return
etag = md5()
for chunk in df: for chunk in df:
self.bytes_running_time = ratelimit_sleep( self.bytes_running_time = ratelimit_sleep(
self.bytes_running_time, self.max_bytes_per_second, self.bytes_running_time, self.max_bytes_per_second,
incr_by=len(chunk)) incr_by=len(chunk))
etag.update(chunk)
self.bytes_processed += len(chunk) self.bytes_processed += len(chunk)
self.total_bytes_processed += len(chunk) self.total_bytes_processed += len(chunk)
etag = etag.hexdigest() df.close()
if etag != df.metadata['ETag']: if df.quarantined_dir:
raise AuditException("ETag of %s does not match file's md5 of " self.quarantines += 1
"%s" % (df.metadata['ETag'], etag)) self.logger.error(
_("ERROR Object %(path)s failed audit and will be "
"quarantined: ETag and file's md5 do not match"),
{'path': path})
except AuditException, err: except AuditException, err:
self.quarantines += 1 self.quarantines += 1
self.logger.error(_('ERROR Object %(obj)s failed audit and will ' self.logger.error(_('ERROR Object %(obj)s failed audit and will '
'be quarantined: %(err)s'), {'obj': path, 'err': err}) 'be quarantined: %(err)s'), {'obj': path, 'err': err})
object_server.DiskFile.quarantine( object_server.quarantine_renamer(
os.path.join(self.devices, device), path) os.path.join(self.devices, device), path)
return return
except Exception: except Exception:
#raise
self.errors += 1 self.errors += 1
self.logger.exception(_('ERROR Trying to audit %s'), path) self.logger.exception(_('ERROR Trying to audit %s'), path)
return return

View File

@ -42,7 +42,8 @@ from swift.common.utils import mkdirs, normalize_timestamp, \
from swift.common.bufferedhttp import http_connect from swift.common.bufferedhttp import http_connect
from swift.common.constraints import check_object_creation, check_mount, \ from swift.common.constraints import check_object_creation, check_mount, \
check_float, check_utf8 check_float, check_utf8
from swift.common.exceptions import ConnectionTimeout from swift.common.exceptions import ConnectionTimeout, DiskFileError, \
DiskFileNotExist
from swift.obj.replicator import get_hashes, invalidate_hash, \ from swift.obj.replicator import get_hashes, invalidate_hash, \
recalculate_hashes recalculate_hashes
@ -89,6 +90,32 @@ def write_metadata(fd, metadata):
key += 1 key += 1
def quarantine_renamer(device_path, corrupted_file_path):
    """
    Move the directory that holds a corrupted file into the device's
    quarantine area so that replication can put a good copy back.

    The whole parent directory of the file is moved, not just the file.
    If the quarantine destination is already taken, a random uuid suffix
    is appended to the destination name and the move is retried once.

    :param device_path: The path to the device the corrupted file is on.
    :param corrupted_file_path: The path to the file you want quarantined.

    :returns: path (str) of directory the file was moved to
    :raises OSError: re-raises non errno.EEXIST / errno.ENOTEMPTY
                     exceptions from rename
    """
    object_dir = os.path.dirname(corrupted_file_path)
    quarantine_dir = os.path.join(
        device_path, 'quarantined', 'objects', os.path.basename(object_dir))
    # NOTE(review): presumably marks the cached hash of the directory that
    # contains object_dir as stale -- confirm against swift.obj.replicator.
    invalidate_hash(os.path.dirname(object_dir))
    try:
        renamer(object_dir, quarantine_dir)
    except OSError as err:
        if err.errno in (errno.EEXIST, errno.ENOTEMPTY):
            # Destination already exists: pick a unique name and retry.
            quarantine_dir = "%s-%s" % (quarantine_dir, uuid.uuid4().hex)
            renamer(object_dir, quarantine_dir)
        else:
            raise
    return quarantine_dir
class DiskFile(object): class DiskFile(object):
""" """
Manage object files on disk. Manage object files on disk.
@ -104,7 +131,8 @@ class DiskFile(object):
""" """
def __init__(self, path, device, partition, account, container, obj, def __init__(self, path, device, partition, account, container, obj,
keep_data_fp=False, disk_chunk_size=65536): logger, keep_data_fp=False, disk_chunk_size=65536):
self.disk_chunk_size = disk_chunk_size self.disk_chunk_size = disk_chunk_size
self.name = '/' + '/'.join((account, container, obj)) self.name = '/' + '/'.join((account, container, obj))
name_hash = hash_path(account, container, obj) name_hash = hash_path(account, container, obj)
@ -112,14 +140,20 @@ class DiskFile(object):
storage_directory(DATADIR, partition, name_hash)) storage_directory(DATADIR, partition, name_hash))
self.device_path = os.path.join(path, device) self.device_path = os.path.join(path, device)
self.tmpdir = os.path.join(path, device, 'tmp') self.tmpdir = os.path.join(path, device, 'tmp')
self.logger = logger
self.metadata = {} self.metadata = {}
self.meta_file = None self.meta_file = None
self.data_file = None self.data_file = None
self.fp = None self.fp = None
self.iter_etag = None
self.last_iter_pos = 0
self.read_to_eof = False
self.quarantined_dir = None
self.keep_cache = False self.keep_cache = False
if not os.path.exists(self.datadir): if not os.path.exists(self.datadir):
return return
files = sorted(os.listdir(self.datadir), reverse=True) files = sorted(os.listdir(self.datadir), reverse=True)
for file in files: for file in files:
if file.endswith('.ts'): if file.endswith('.ts'):
self.data_file = self.meta_file = None self.data_file = self.meta_file = None
@ -135,7 +169,7 @@ class DiskFile(object):
self.fp = open(self.data_file, 'rb') self.fp = open(self.data_file, 'rb')
self.metadata = read_metadata(self.fp) self.metadata = read_metadata(self.fp)
if not keep_data_fp: if not keep_data_fp:
self.close() self.close(verify_file=False)
if self.meta_file: if self.meta_file:
with open(self.meta_file) as mfp: with open(self.meta_file) as mfp:
for key in self.metadata.keys(): for key in self.metadata.keys():
@ -149,9 +183,18 @@ class DiskFile(object):
try: try:
dropped_cache = 0 dropped_cache = 0
read = 0 read = 0
self.last_iter_pos = 0
self.iter_etag = md5()
while True: while True:
pre_read_pos = self.fp.tell()
chunk = self.fp.read(self.disk_chunk_size) chunk = self.fp.read(self.disk_chunk_size)
if chunk: if chunk:
if self.iter_etag and self.last_iter_pos == pre_read_pos:
self.iter_etag.update(chunk)
self.last_iter_pos += len(chunk)
else:
# file has not been read sequentially
self.iter_etag = None
read += len(chunk) read += len(chunk)
if read - dropped_cache > (1024 * 1024): if read - dropped_cache > (1024 * 1024):
self.drop_cache(self.fp.fileno(), dropped_cache, self.drop_cache(self.fp.fileno(), dropped_cache,
@ -159,6 +202,7 @@ class DiskFile(object):
dropped_cache = read dropped_cache = read
yield chunk yield chunk
else: else:
self.read_to_eof = True
self.drop_cache(self.fp.fileno(), dropped_cache, self.drop_cache(self.fp.fileno(), dropped_cache,
read - dropped_cache) read - dropped_cache)
break break
@ -182,11 +226,42 @@ class DiskFile(object):
break break
yield chunk yield chunk
def close(self): def _handle_close_quarantine(self):
"""Close the file.""" """Check if file needs to be quarantined"""
obj_size = None
try:
obj_size = self.get_data_file_size()
except DiskFileError, e:
self.quarantine()
return
except DiskFileNotExist:
return
if (self.iter_etag and self.read_to_eof and self.metadata.get('ETag')
and obj_size == self.last_iter_pos and
self.iter_etag.hexdigest() != self.metadata['ETag']):
self.quarantine()
def close(self, verify_file=True):
"""
Close the file.
:param verify_file: Defaults to True, will handle quarantining
file if necessary.
"""
if self.fp: if self.fp:
self.fp.close() try:
self.fp = None if verify_file:
self._handle_close_quarantine()
except Exception, e:
import traceback
self.logger.error(_('ERROR DiskFile %(data_file)s in '
'%(data_dir)s close failure: %(exc)s : %(stack)'),
{'exc': e, 'stack': ''.join(traceback.format_stack()),
'data_file': self.data_file, 'data_dir': self.datadir})
finally:
self.fp.close()
self.fp = None
def is_deleted(self): def is_deleted(self):
""" """
@ -257,30 +332,42 @@ class DiskFile(object):
if not self.keep_cache: if not self.keep_cache:
drop_buffer_cache(fd, offset, length) drop_buffer_cache(fd, offset, length)
@classmethod def quarantine(self):
def quarantine(cls, device_path, corrupted_file_path):
""" """
In the case that a file is corrupted, move it to a quarantined In the case that a file is corrupted, move it to a quarantined
area to allow replication to fix it. area to allow replication to fix it.
:returns: if quarantine is successful, path to quarantined
:params device_path: The path to the device the corrupted file is on. directory otherwise None
:params corrupted_file_path: The path to the file you want quarantined. """
if not (self.is_deleted() or self.quarantined_dir):
:returns: path (str) of directory the file was moved to self.quarantined_dir = quarantine_renamer(self.device_path,
:raises OSError: re-raises non errno.EEXIST exceptions from rename self.data_file)
return self.quarantined_dir
def get_data_file_size(self):
"""
Returns the os.path.getsize for the file. Raises an exception if this
file does not match the Content-Length stored in the metadata. Or if
self.data_file does not exist.
:returns: file size as an int
:raises DiskFileError: on file size mismatch.
:raises DiskFileNotExist: on file not existing (including deleted)
""" """
from_dir = os.path.dirname(corrupted_file_path)
to_dir = os.path.join(device_path, 'quarantined',
'objects', os.path.basename(from_dir))
invalidate_hash(os.path.dirname(from_dir))
try: try:
renamer(from_dir, to_dir) file_size = 0
except OSError, e: if self.data_file:
if e.errno not in (errno.EEXIST, errno.ENOTEMPTY): file_size = int(os.path.getsize(self.data_file))
if self.metadata.has_key('Content-Length'):
metadata_size = int(self.metadata['Content-Length'])
if file_size != metadata_size:
raise DiskFileError('Content-Length of %s does not '
'match file size of %s' % (metadata_size, file_size))
return file_size
except OSError, err:
if err.errno != errno.ENOENT:
raise raise
to_dir = "%s-%s" % (to_dir, uuid.uuid4().hex) raise DiskFileNotExist('Data File does not exist.')
renamer(from_dir, to_dir)
return to_dir
class ObjectController(object): class ObjectController(object):
@ -370,13 +457,19 @@ class ObjectController(object):
if self.mount_check and not check_mount(self.devices, device): if self.mount_check and not check_mount(self.devices, device):
return Response(status='507 %s is not mounted' % device) return Response(status='507 %s is not mounted' % device)
file = DiskFile(self.devices, device, partition, account, container, file = DiskFile(self.devices, device, partition, account, container,
obj, disk_chunk_size=self.disk_chunk_size) obj, self.logger, disk_chunk_size=self.disk_chunk_size)
if file.is_deleted(): if file.is_deleted():
response_class = HTTPNotFound response_class = HTTPNotFound
else: else:
response_class = HTTPAccepted response_class = HTTPAccepted
try:
file_size = file.get_data_file_size()
except (DiskFileError, DiskFileNotExist):
file.quarantine()
return HTTPNotFound(request=request)
metadata = {'X-Timestamp': request.headers['x-timestamp']} metadata = {'X-Timestamp': request.headers['x-timestamp']}
metadata.update(val for val in request.headers.iteritems() metadata.update(val for val in request.headers.iteritems()
if val[0].lower().startswith('x-object-meta-')) if val[0].lower().startswith('x-object-meta-'))
@ -402,7 +495,7 @@ class ObjectController(object):
if error_response: if error_response:
return error_response return error_response
file = DiskFile(self.devices, device, partition, account, container, file = DiskFile(self.devices, device, partition, account, container,
obj, disk_chunk_size=self.disk_chunk_size) obj, self.logger, disk_chunk_size=self.disk_chunk_size)
upload_expiration = time.time() + self.max_upload_time upload_expiration = time.time() + self.max_upload_time
etag = md5() etag = md5()
upload_size = 0 upload_size = 0
@ -470,12 +563,18 @@ class ObjectController(object):
if self.mount_check and not check_mount(self.devices, device): if self.mount_check and not check_mount(self.devices, device):
return Response(status='507 %s is not mounted' % device) return Response(status='507 %s is not mounted' % device)
file = DiskFile(self.devices, device, partition, account, container, file = DiskFile(self.devices, device, partition, account, container,
obj, keep_data_fp=True, disk_chunk_size=self.disk_chunk_size) obj, self.logger, keep_data_fp=True,
disk_chunk_size=self.disk_chunk_size)
if file.is_deleted(): if file.is_deleted():
if request.headers.get('if-match') == '*': if request.headers.get('if-match') == '*':
return HTTPPreconditionFailed(request=request) return HTTPPreconditionFailed(request=request)
else: else:
return HTTPNotFound(request=request) return HTTPNotFound(request=request)
try:
file_size = file.get_data_file_size()
except (DiskFileError, DiskFileNotExist):
file.quarantine()
return HTTPNotFound(request=request)
if request.headers.get('if-match') not in (None, '*') and \ if request.headers.get('if-match') not in (None, '*') and \
file.metadata['ETag'] not in request.if_match: file.metadata['ETag'] not in request.if_match:
file.close() file.close()
@ -515,7 +614,7 @@ class ObjectController(object):
response.headers[key] = value response.headers[key] = value
response.etag = file.metadata['ETag'] response.etag = file.metadata['ETag']
response.last_modified = float(file.metadata['X-Timestamp']) response.last_modified = float(file.metadata['X-Timestamp'])
response.content_length = int(file.metadata['Content-Length']) response.content_length = file_size
if response.content_length < KEEP_CACHE_SIZE and \ if response.content_length < KEEP_CACHE_SIZE and \
'X-Auth-Token' not in request.headers and \ 'X-Auth-Token' not in request.headers and \
'X-Storage-Token' not in request.headers: 'X-Storage-Token' not in request.headers:
@ -537,9 +636,14 @@ class ObjectController(object):
if self.mount_check and not check_mount(self.devices, device): if self.mount_check and not check_mount(self.devices, device):
return Response(status='507 %s is not mounted' % device) return Response(status='507 %s is not mounted' % device)
file = DiskFile(self.devices, device, partition, account, container, file = DiskFile(self.devices, device, partition, account, container,
obj, disk_chunk_size=self.disk_chunk_size) obj, self.logger, disk_chunk_size=self.disk_chunk_size)
if file.is_deleted(): if file.is_deleted():
return HTTPNotFound(request=request) return HTTPNotFound(request=request)
try:
file_size = file.get_data_file_size()
except (DiskFileError, DiskFileNotExist):
file.quarantine()
return HTTPNotFound(request=request)
response = Response(content_type=file.metadata['Content-Type'], response = Response(content_type=file.metadata['Content-Type'],
request=request, conditional_response=True) request=request, conditional_response=True)
for key, value in file.metadata.iteritems(): for key, value in file.metadata.iteritems():
@ -548,7 +652,7 @@ class ObjectController(object):
response.headers[key] = value response.headers[key] = value
response.etag = file.metadata['ETag'] response.etag = file.metadata['ETag']
response.last_modified = float(file.metadata['X-Timestamp']) response.last_modified = float(file.metadata['X-Timestamp'])
response.content_length = int(file.metadata['Content-Length']) response.content_length = file_size
if 'Content-Encoding' in file.metadata: if 'Content-Encoding' in file.metadata:
response.content_encoding = file.metadata['Content-Encoding'] response.content_encoding = file.metadata['Content-Encoding']
return response return response
@ -569,7 +673,7 @@ class ObjectController(object):
return Response(status='507 %s is not mounted' % device) return Response(status='507 %s is not mounted' % device)
response_class = HTTPNoContent response_class = HTTPNoContent
file = DiskFile(self.devices, device, partition, account, container, file = DiskFile(self.devices, device, partition, account, container,
obj, disk_chunk_size=self.disk_chunk_size) obj, self.logger, disk_chunk_size=self.disk_chunk_size)
if file.is_deleted(): if file.is_deleted():
response_class = HTTPNotFound response_class = HTTPNotFound
metadata = { metadata = {

View File

@ -0,0 +1,173 @@
#!/usr/bin/python -u
# Copyright (c) 2010-2011 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
import os
from signal import SIGTERM
from subprocess import call, Popen
from time import sleep
from uuid import uuid4
from swift.common import client, direct_client
from swift.common.utils import hash_path, readconf
from swift.obj.server import write_metadata, read_metadata
from test.probe.common import kill_pids, reset_environment
from test.unit import FakeLogger
class TestObjectFailures(unittest.TestCase):
    """
    Probe tests checking that an object server quarantines a damaged
    object and answers 404 for it afterwards.

    The tests talk to the environment returned by reset_environment() and
    read /etc/swift/object-server/<node_id>.conf, so they can only run
    against a live development install.
    """

    def setUp(self):
        (self.pids, self.port2server, self.account_ring,
         self.container_ring, self.object_ring, self.url, self.token,
         self.account) = reset_environment()

    def tearDown(self):
        kill_pids(self.pids)

    def _get_data_file_path(self, obj_dir):
        """
        Return the path of the entry in obj_dir whose name sorts last.

        :param obj_dir: directory to look in
        :returns: full path of that entry, or None if obj_dir is empty
        """
        files = sorted(os.listdir(obj_dir), reverse=True)
        if not files:
            return None
        return os.path.join(obj_dir, files[0])

    def _setup_data_file(self, container, obj, data):
        """
        Upload data as container/obj through the proxy, check it reads
        back, and locate the object's file on its first primary node.

        :param container: container name to create
        :param obj: object name to create
        :param data: object body to upload
        :returns: tuple of (node dict, partition, path to on-disk file)
        """
        client.put_container(self.url, self.token, container)
        client.put_object(self.url, self.token, container, obj, data)
        odata = client.get_object(self.url, self.token, container, obj)[-1]
        self.assertEqual(odata, data)
        opart, onodes = self.object_ring.get_nodes(
            self.account, container, obj)
        onode = onodes[0]
        # Floor division keeps node_id an integer for the conf file name.
        node_id = (onode['port'] - 6000) // 10
        device = onode['device']
        hash_str = hash_path(self.account, container, obj)
        obj_server_conf = readconf('/etc/swift/object-server/%s.conf' %
                                   node_id)
        devices = obj_server_conf['app:object-server']['devices']
        obj_dir = '%s/%s/objects/%s/%s/%s/' % (devices,
                                               device, opart,
                                               hash_str[-3:], hash_str)
        return onode, opart, self._get_data_file_path(obj_dir)

    def _set_bad_etag(self, data_file):
        """Rewrite data_file's stored metadata with a bogus ETag value."""
        with open(data_file) as fp:
            metadata = read_metadata(fp)
        metadata['ETag'] = 'badetag'
        with open(data_file) as fp:
            write_metadata(fp, metadata)

    def _assert_quarantined(self, onode, opart, container, obj, **kwargs):
        """
        Fail unless a direct GET of the object raises a 404 ClientException.

        :param kwargs: extra keyword arguments for direct_get_object
        """
        try:
            direct_client.direct_get_object(onode, opart, self.account,
                                            container, obj, **kwargs)
        except client.ClientException as e:
            self.assertEqual(e.http_status, 404)
        else:
            # self.fail reports a proper test failure; raising a plain
            # string is itself a TypeError on Python 2.6+.
            self.fail("Did not quarantine object")

    def run_quarantine(self):
        """A full read of an object with a bad ETag must quarantine it."""
        container = 'container-%s' % uuid4()
        obj = 'object-%s' % uuid4()
        onode, opart, data_file = self._setup_data_file(
            container, obj, 'VERIFY')
        self._set_bad_etag(data_file)

        # The first full read still returns the body; the test expects the
        # mismatch found during that read to quarantine the object so the
        # following request gets a 404.
        odata = direct_client.direct_get_object(
            onode, opart, self.account, container, obj)[-1]
        self.assertEqual(odata, 'VERIFY')
        self._assert_quarantined(onode, opart, container, obj)

    def run_quarantine_range_etag(self):
        """Ranged reads return data; the object is gone after a full one."""
        container = 'container-range-%s' % uuid4()
        obj = 'object-range-%s' % uuid4()
        onode, opart, data_file = self._setup_data_file(
            container, obj, 'RANGE')
        self._set_bad_etag(data_file)

        for header, result in [({'Range': 'bytes=0-2'}, 'RAN'),
                               ({'Range': 'bytes=1-11'}, 'ANGE'),
                               ({'Range': 'bytes=0-11'}, 'RANGE')]:
            odata = direct_client.direct_get_object(
                onode, opart, self.account, container, obj,
                headers=header)[-1]
            self.assertEqual(odata, result)
        self._assert_quarantined(onode, opart, container, obj)

    def run_quarantine_range_zero_byte(self):
        """A data file truncated to zero bytes must be quarantined."""
        container = 'container-zbyte-%s' % uuid4()
        obj = 'object-zbyte-%s' % uuid4()
        onode, opart, data_file = self._setup_data_file(
            container, obj, 'ZBYTE')

        # Replace the data file with an empty one that carries the original
        # metadata, so the stored Content-Length no longer matches its size.
        with open(data_file) as fp:
            metadata = read_metadata(fp)
        os.unlink(data_file)
        with open(data_file, 'w') as fp:
            write_metadata(fp, metadata)

        self._assert_quarantined(onode, opart, container, obj,
                                 conn_timeout=1, response_timeout=1)

    def test_runner(self):
        """Run every scenario against a single environment reset."""
        self.run_quarantine()
        self.run_quarantine_range_etag()
        self.run_quarantine_range_zero_byte()
# Allow this probe test module to be run directly as a script.
if __name__ == '__main__':
    unittest.main()

View File

@ -91,6 +91,22 @@ def temptree(files, contents=''):
rmtree(tempdir) rmtree(tempdir)
class FakeLogger(object):
    """
    Stand-in logger for tests that records calls instead of emitting them.

    log_dict maps 'error', 'info' and 'warning' to a list; each call to the
    method of the same name appends an (args, kwargs) tuple to its list.
    """
    # a thread safe logger

    def __init__(self):
        self.log_dict = {'error': [], 'info': [], 'warning': []}

    def _record(self, level, args, kwargs):
        """Append one call's positional and keyword arguments to level."""
        self.log_dict[level].append((args, kwargs))

    def error(self, *args, **kwargs):
        self._record('error', args, kwargs)

    def info(self, *args, **kwargs):
        self._record('info', args, kwargs)

    def warning(self, *args, **kwargs):
        self._record('warning', args, kwargs)
class MockTrue(object): class MockTrue(object):
""" """
Instances of MockTrue evaluate like True Instances of MockTrue evaluate like True

View File

@ -19,6 +19,7 @@ from contextlib import contextmanager
from threading import Thread from threading import Thread
from webob import Request from webob import Request
from test.unit import FakeLogger
from swift.common.middleware import ratelimit from swift.common.middleware import ratelimit
from swift.proxy.server import get_container_memcache_key from swift.proxy.server import get_container_memcache_key
from swift.common.memcached import MemcacheConnectionError from swift.common.memcached import MemcacheConnectionError
@ -96,19 +97,6 @@ class FakeApp(object):
return ['204 No Content'] return ['204 No Content']
class FakeLogger(object):
# a thread safe logger
def error(self, *args, **kwargs):
pass
def info(self, *args, **kwargs):
pass
def warning(self, *args, **kwargs):
pass
def start_response(*args): def start_response(*args):
pass pass

View File

@ -13,7 +13,6 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
# TODO: Tests
from test import unit from test import unit
import unittest import unittest
import tempfile import tempfile
@ -22,6 +21,7 @@ import time
from shutil import rmtree from shutil import rmtree
from hashlib import md5 from hashlib import md5
from tempfile import mkdtemp from tempfile import mkdtemp
from test.unit import FakeLogger
from swift.obj import auditor from swift.obj import auditor
from swift.obj import server as object_server from swift.obj import server as object_server
from swift.obj.server import DiskFile, write_metadata, DATADIR from swift.obj.server import DiskFile, write_metadata, DATADIR
@ -34,13 +34,11 @@ from swift.common.exceptions import AuditException
class TestAuditor(unittest.TestCase): class TestAuditor(unittest.TestCase):
def setUp(self): def setUp(self):
self.testdir = \ self.testdir = os.path.join(mkdtemp(), 'tmp_test_object_auditor')
os.path.join(mkdtemp(), 'tmp_test_object_auditor')
self.devices = os.path.join(self.testdir, 'node') self.devices = os.path.join(self.testdir, 'node')
self.logger = FakeLogger()
rmtree(self.testdir, ignore_errors=1) rmtree(self.testdir, ignore_errors=1)
os.mkdir(self.testdir) mkdirs(os.path.join(self.devices, 'sda'))
os.mkdir(self.devices)
os.mkdir(os.path.join(self.devices, 'sda'))
self.objects = os.path.join(self.devices, 'sda', 'objects') self.objects = os.path.join(self.devices, 'sda', 'objects')
os.mkdir(os.path.join(self.devices, 'sdb')) os.mkdir(os.path.join(self.devices, 'sdb'))
@ -55,6 +53,8 @@ class TestAuditor(unittest.TestCase):
self.conf = dict( self.conf = dict(
devices=self.devices, devices=self.devices,
mount_check='false') mount_check='false')
self.disk_file = DiskFile(self.devices, 'sda', '0', 'a', 'c', 'o',
self.logger)
def tearDown(self): def tearDown(self):
rmtree(os.path.dirname(self.testdir), ignore_errors=1) rmtree(os.path.dirname(self.testdir), ignore_errors=1)
@ -62,11 +62,9 @@ class TestAuditor(unittest.TestCase):
def test_object_audit_extra_data(self): def test_object_audit_extra_data(self):
self.auditor = auditor.AuditorWorker(self.conf) self.auditor = auditor.AuditorWorker(self.conf)
cur_part = '0'
disk_file = DiskFile(self.devices, 'sda', cur_part, 'a', 'c', 'o')
data = '0' * 1024 data = '0' * 1024
etag = md5() etag = md5()
with disk_file.mkstemp() as (fd, tmppath): with self.disk_file.mkstemp() as (fd, tmppath):
os.write(fd, data) os.write(fd, data)
etag.update(data) etag.update(data)
etag = etag.hexdigest() etag = etag.hexdigest()
@ -76,28 +74,26 @@ class TestAuditor(unittest.TestCase):
'X-Timestamp': timestamp, 'X-Timestamp': timestamp,
'Content-Length': str(os.fstat(fd).st_size), 'Content-Length': str(os.fstat(fd).st_size),
} }
disk_file.put(fd, tmppath, metadata) self.disk_file.put(fd, tmppath, metadata)
pre_quarantines = self.auditor.quarantines pre_quarantines = self.auditor.quarantines
self.auditor.object_audit( self.auditor.object_audit(
os.path.join(disk_file.datadir, timestamp + '.data'), os.path.join(self.disk_file.datadir, timestamp + '.data'),
'sda', cur_part) 'sda', '0')
self.assertEquals(self.auditor.quarantines, pre_quarantines) self.assertEquals(self.auditor.quarantines, pre_quarantines)
os.write(fd, 'extra_data') os.write(fd, 'extra_data')
self.auditor.object_audit( self.auditor.object_audit(
os.path.join(disk_file.datadir, timestamp + '.data'), os.path.join(self.disk_file.datadir, timestamp + '.data'),
'sda', cur_part) 'sda', '0')
self.assertEquals(self.auditor.quarantines, pre_quarantines + 1) self.assertEquals(self.auditor.quarantines, pre_quarantines + 1)
def test_object_audit_diff_data(self): def test_object_audit_diff_data(self):
self.auditor = auditor.AuditorWorker(self.conf) self.auditor = auditor.AuditorWorker(self.conf)
cur_part = '0'
disk_file = DiskFile(self.devices, 'sda', cur_part, 'a', 'c', 'o')
data = '0' * 1024 data = '0' * 1024
etag = md5() etag = md5()
timestamp = str(normalize_timestamp(time.time())) timestamp = str(normalize_timestamp(time.time()))
with disk_file.mkstemp() as (fd, tmppath): with self.disk_file.mkstemp() as (fd, tmppath):
os.write(fd, data) os.write(fd, data)
etag.update(data) etag.update(data)
etag = etag.hexdigest() etag = etag.hexdigest()
@ -106,12 +102,15 @@ class TestAuditor(unittest.TestCase):
'X-Timestamp': timestamp, 'X-Timestamp': timestamp,
'Content-Length': str(os.fstat(fd).st_size), 'Content-Length': str(os.fstat(fd).st_size),
} }
disk_file.put(fd, tmppath, metadata) self.disk_file.put(fd, tmppath, metadata)
pre_quarantines = self.auditor.quarantines pre_quarantines = self.auditor.quarantines
# remake to it will have metadata
self.disk_file = DiskFile(self.devices, 'sda', '0', 'a', 'c', 'o',
self.logger)
self.auditor.object_audit( self.auditor.object_audit(
os.path.join(disk_file.datadir, timestamp + '.data'), os.path.join(self.disk_file.datadir, timestamp + '.data'),
'sda', cur_part) 'sda', '0')
self.assertEquals(self.auditor.quarantines, pre_quarantines) self.assertEquals(self.auditor.quarantines, pre_quarantines)
etag = md5() etag = md5()
etag.update('1' + '0' * 1023) etag.update('1' + '0' * 1023)
@ -120,25 +119,23 @@ class TestAuditor(unittest.TestCase):
write_metadata(fd, metadata) write_metadata(fd, metadata)
self.auditor.object_audit( self.auditor.object_audit(
os.path.join(disk_file.datadir, timestamp + '.data'), os.path.join(self.disk_file.datadir, timestamp + '.data'),
'sda', cur_part) 'sda', '0')
self.assertEquals(self.auditor.quarantines, pre_quarantines + 1) self.assertEquals(self.auditor.quarantines, pre_quarantines + 1)
def test_object_audit_no_meta(self): def test_object_audit_no_meta(self):
cur_part = '0'
disk_file = DiskFile(self.devices, 'sda', cur_part, 'a', 'c', 'o')
timestamp = str(normalize_timestamp(time.time())) timestamp = str(normalize_timestamp(time.time()))
path = os.path.join(disk_file.datadir, timestamp + '.data') path = os.path.join(self.disk_file.datadir, timestamp + '.data')
mkdirs(disk_file.datadir) mkdirs(self.disk_file.datadir)
fp = open(path, 'w') fp = open(path, 'w')
fp.write('0' * 1024) fp.write('0' * 1024)
fp.close() fp.close()
invalidate_hash(os.path.dirname(disk_file.datadir)) invalidate_hash(os.path.dirname(self.disk_file.datadir))
self.auditor = auditor.AuditorWorker(self.conf) self.auditor = auditor.AuditorWorker(self.conf)
pre_quarantines = self.auditor.quarantines pre_quarantines = self.auditor.quarantines
self.auditor.object_audit( self.auditor.object_audit(
os.path.join(disk_file.datadir, timestamp + '.data'), os.path.join(self.disk_file.datadir, timestamp + '.data'),
'sda', cur_part) 'sda', '0')
self.assertEquals(self.auditor.quarantines, pre_quarantines + 1) self.assertEquals(self.auditor.quarantines, pre_quarantines + 1)
def test_object_audit_bad_args(self): def test_object_audit_bad_args(self):
@ -153,13 +150,11 @@ class TestAuditor(unittest.TestCase):
def test_object_run_once_pass(self): def test_object_run_once_pass(self):
self.auditor = auditor.AuditorWorker(self.conf) self.auditor = auditor.AuditorWorker(self.conf)
self.auditor.log_time = 0 self.auditor.log_time = 0
cur_part = '0'
timestamp = str(normalize_timestamp(time.time())) timestamp = str(normalize_timestamp(time.time()))
pre_quarantines = self.auditor.quarantines pre_quarantines = self.auditor.quarantines
disk_file = DiskFile(self.devices, 'sda', cur_part, 'a', 'c', 'o')
data = '0' * 1024 data = '0' * 1024
etag = md5() etag = md5()
with disk_file.mkstemp() as (fd, tmppath): with self.disk_file.mkstemp() as (fd, tmppath):
os.write(fd, data) os.write(fd, data)
etag.update(data) etag.update(data)
etag = etag.hexdigest() etag = etag.hexdigest()
@ -168,20 +163,18 @@ class TestAuditor(unittest.TestCase):
'X-Timestamp': timestamp, 'X-Timestamp': timestamp,
'Content-Length': str(os.fstat(fd).st_size), 'Content-Length': str(os.fstat(fd).st_size),
} }
disk_file.put(fd, tmppath, metadata) self.disk_file.put(fd, tmppath, metadata)
disk_file.close() self.disk_file.close()
self.auditor.audit_all_objects() self.auditor.audit_all_objects()
self.assertEquals(self.auditor.quarantines, pre_quarantines) self.assertEquals(self.auditor.quarantines, pre_quarantines)
def test_object_run_once_no_sda(self): def test_object_run_once_no_sda(self):
self.auditor = auditor.AuditorWorker(self.conf) self.auditor = auditor.AuditorWorker(self.conf)
cur_part = '0'
timestamp = str(normalize_timestamp(time.time())) timestamp = str(normalize_timestamp(time.time()))
pre_quarantines = self.auditor.quarantines pre_quarantines = self.auditor.quarantines
disk_file = DiskFile(self.devices, 'sdb', cur_part, 'a', 'c', 'o')
data = '0' * 1024 data = '0' * 1024
etag = md5() etag = md5()
with disk_file.mkstemp() as (fd, tmppath): with self.disk_file.mkstemp() as (fd, tmppath):
os.write(fd, data) os.write(fd, data)
etag.update(data) etag.update(data)
etag = etag.hexdigest() etag = etag.hexdigest()
@ -190,21 +183,19 @@ class TestAuditor(unittest.TestCase):
'X-Timestamp': timestamp, 'X-Timestamp': timestamp,
'Content-Length': str(os.fstat(fd).st_size), 'Content-Length': str(os.fstat(fd).st_size),
} }
disk_file.put(fd, tmppath, metadata) self.disk_file.put(fd, tmppath, metadata)
disk_file.close() self.disk_file.close()
os.write(fd, 'extra_data') os.write(fd, 'extra_data')
self.auditor.audit_all_objects() self.auditor.audit_all_objects()
self.assertEquals(self.auditor.quarantines, pre_quarantines + 1) self.assertEquals(self.auditor.quarantines, pre_quarantines + 1)
def test_object_run_once_multi_devices(self): def test_object_run_once_multi_devices(self):
self.auditor = auditor.AuditorWorker(self.conf) self.auditor = auditor.AuditorWorker(self.conf)
cur_part = '0'
timestamp = str(normalize_timestamp(time.time())) timestamp = str(normalize_timestamp(time.time()))
pre_quarantines = self.auditor.quarantines pre_quarantines = self.auditor.quarantines
disk_file = DiskFile(self.devices, 'sda', cur_part, 'a', 'c', 'o')
data = '0' * 10 data = '0' * 10
etag = md5() etag = md5()
with disk_file.mkstemp() as (fd, tmppath): with self.disk_file.mkstemp() as (fd, tmppath):
os.write(fd, data) os.write(fd, data)
etag.update(data) etag.update(data)
etag = etag.hexdigest() etag = etag.hexdigest()
@ -213,13 +204,14 @@ class TestAuditor(unittest.TestCase):
'X-Timestamp': timestamp, 'X-Timestamp': timestamp,
'Content-Length': str(os.fstat(fd).st_size), 'Content-Length': str(os.fstat(fd).st_size),
} }
disk_file.put(fd, tmppath, metadata) self.disk_file.put(fd, tmppath, metadata)
disk_file.close() self.disk_file.close()
self.auditor.audit_all_objects() self.auditor.audit_all_objects()
disk_file = DiskFile(self.devices, 'sdb', cur_part, 'a', 'c', 'ob') self.disk_file = DiskFile(self.devices, 'sdb', '0', 'a', 'c',
'ob', self.logger)
data = '1' * 10 data = '1' * 10
etag = md5() etag = md5()
with disk_file.mkstemp() as (fd, tmppath): with self.disk_file.mkstemp() as (fd, tmppath):
os.write(fd, data) os.write(fd, data)
etag.update(data) etag.update(data)
etag = etag.hexdigest() etag = etag.hexdigest()
@ -228,8 +220,8 @@ class TestAuditor(unittest.TestCase):
'X-Timestamp': timestamp, 'X-Timestamp': timestamp,
'Content-Length': str(os.fstat(fd).st_size), 'Content-Length': str(os.fstat(fd).st_size),
} }
disk_file.put(fd, tmppath, metadata) self.disk_file.put(fd, tmppath, metadata)
disk_file.close() self.disk_file.close()
os.write(fd, 'extra_data') os.write(fd, 'extra_data')
self.auditor.audit_all_objects() self.auditor.audit_all_objects()
self.assertEquals(self.auditor.quarantines, pre_quarantines + 1) self.assertEquals(self.auditor.quarantines, pre_quarantines + 1)
@ -237,11 +229,9 @@ class TestAuditor(unittest.TestCase):
def test_object_run_fast_track_non_zero(self): def test_object_run_fast_track_non_zero(self):
self.auditor = auditor.ObjectAuditor(self.conf) self.auditor = auditor.ObjectAuditor(self.conf)
self.auditor.log_time = 0 self.auditor.log_time = 0
cur_part = '0'
disk_file = DiskFile(self.devices, 'sda', cur_part, 'a', 'c', 'o')
data = '0' * 1024 data = '0' * 1024
etag = md5() etag = md5()
with disk_file.mkstemp() as (fd, tmppath): with self.disk_file.mkstemp() as (fd, tmppath):
os.write(fd, data) os.write(fd, data)
etag.update(data) etag.update(data)
etag = etag.hexdigest() etag = etag.hexdigest()
@ -250,7 +240,7 @@ class TestAuditor(unittest.TestCase):
'X-Timestamp': str(normalize_timestamp(time.time())), 'X-Timestamp': str(normalize_timestamp(time.time())),
'Content-Length': str(os.fstat(fd).st_size), 'Content-Length': str(os.fstat(fd).st_size),
} }
disk_file.put(fd, tmppath, metadata) self.disk_file.put(fd, tmppath, metadata)
etag = md5() etag = md5()
etag.update('1' + '0' * 1023) etag.update('1' + '0' * 1023)
etag = etag.hexdigest() etag = etag.hexdigest()
@ -267,36 +257,33 @@ class TestAuditor(unittest.TestCase):
def setup_bad_zero_byte(self, with_ts=False): def setup_bad_zero_byte(self, with_ts=False):
self.auditor = auditor.ObjectAuditor(self.conf) self.auditor = auditor.ObjectAuditor(self.conf)
self.auditor.log_time = 0 self.auditor.log_time = 0
cur_part = '0'
ts_file_path = '' ts_file_path = ''
if with_ts: if with_ts:
name_hash = hash_path('a', 'c', 'o') name_hash = hash_path('a', 'c', 'o')
dir_path = os.path.join(self.devices, 'sda', dir_path = os.path.join(self.devices, 'sda',
storage_directory(DATADIR, cur_part, name_hash)) storage_directory(DATADIR, '0', name_hash))
ts_file_path = os.path.join(dir_path, '99999.ts') ts_file_path = os.path.join(dir_path, '99999.ts')
if not os.path.exists(dir_path): if not os.path.exists(dir_path):
mkdirs(dir_path) mkdirs(dir_path)
fp = open(ts_file_path, 'w') fp = open(ts_file_path, 'w')
fp.close() fp.close()
disk_file = DiskFile(self.devices, 'sda', cur_part, 'a', 'c', 'o')
etag = md5() etag = md5()
with disk_file.mkstemp() as (fd, tmppath): with self.disk_file.mkstemp() as (fd, tmppath):
etag = etag.hexdigest() etag = etag.hexdigest()
metadata = { metadata = {
'ETag': etag, 'ETag': etag,
'X-Timestamp': str(normalize_timestamp(time.time())), 'X-Timestamp': str(normalize_timestamp(time.time())),
'Content-Length': 10, 'Content-Length': 10,
} }
disk_file.put(fd, tmppath, metadata) self.disk_file.put(fd, tmppath, metadata)
etag = md5() etag = md5()
etag = etag.hexdigest() etag = etag.hexdigest()
metadata['ETag'] = etag metadata['ETag'] = etag
write_metadata(fd, metadata) write_metadata(fd, metadata)
if disk_file.data_file: if self.disk_file.data_file:
return disk_file.data_file return self.disk_file.data_file
return ts_file_path return ts_file_path
def test_object_run_fast_track_all(self): def test_object_run_fast_track_all(self):

View File

@ -26,8 +26,8 @@ import time
import tempfile import tempfile
from contextlib import contextmanager from contextlib import contextmanager
from eventlet import tpool from eventlet import tpool
from eventlet.green import subprocess from eventlet.green import subprocess
from test.unit import FakeLogger
from swift.common import utils from swift.common import utils
from swift.common.utils import hash_path, mkdirs, normalize_timestamp from swift.common.utils import hash_path, mkdirs, normalize_timestamp
from swift.common import ring from swift.common import ring
@ -164,7 +164,8 @@ class TestObjectReplicator(unittest.TestCase):
was_connector = object_replicator.http_connect was_connector = object_replicator.http_connect
object_replicator.http_connect = mock_http_connect(200) object_replicator.http_connect = mock_http_connect(200)
cur_part = '0' cur_part = '0'
df = DiskFile(self.devices, 'sda', cur_part, 'a', 'c', 'o') df = DiskFile(self.devices, 'sda', cur_part, 'a', 'c', 'o',
FakeLogger())
mkdirs(df.datadir) mkdirs(df.datadir)
f = open(os.path.join(df.datadir, f = open(os.path.join(df.datadir,
normalize_timestamp(time.time()) + '.data'), normalize_timestamp(time.time()) + '.data'),
@ -189,7 +190,7 @@ class TestObjectReplicator(unittest.TestCase):
object_replicator.http_connect = was_connector object_replicator.http_connect = was_connector
def test_hash_suffix_one_file(self): def test_hash_suffix_one_file(self):
df = DiskFile(self.devices, 'sda', '0', 'a', 'c', 'o') df = DiskFile(self.devices, 'sda', '0', 'a', 'c', 'o', FakeLogger())
mkdirs(df.datadir) mkdirs(df.datadir)
f = open(os.path.join(df.datadir, f = open(os.path.join(df.datadir,
normalize_timestamp(time.time() - 100) + '.ts'), normalize_timestamp(time.time() - 100) + '.ts'),
@ -206,7 +207,7 @@ class TestObjectReplicator(unittest.TestCase):
self.assertEquals(len(os.listdir(self.parts['0'])), 0) self.assertEquals(len(os.listdir(self.parts['0'])), 0)
def test_hash_suffix_multi_file_one(self): def test_hash_suffix_multi_file_one(self):
df = DiskFile(self.devices, 'sda', '0', 'a', 'c', 'o') df = DiskFile(self.devices, 'sda', '0', 'a', 'c', 'o', FakeLogger())
mkdirs(df.datadir) mkdirs(df.datadir)
for tdiff in [1, 50, 100, 500]: for tdiff in [1, 50, 100, 500]:
for suff in ['.meta', '.data', '.ts']: for suff in ['.meta', '.data', '.ts']:
@ -227,7 +228,7 @@ class TestObjectReplicator(unittest.TestCase):
self.assertEquals(len(os.listdir(whole_hsh_path)), 1) self.assertEquals(len(os.listdir(whole_hsh_path)), 1)
def test_hash_suffix_multi_file_two(self): def test_hash_suffix_multi_file_two(self):
df = DiskFile(self.devices, 'sda', '0', 'a', 'c', 'o') df = DiskFile(self.devices, 'sda', '0', 'a', 'c', 'o', FakeLogger())
mkdirs(df.datadir) mkdirs(df.datadir)
for tdiff in [1, 50, 100, 500]: for tdiff in [1, 50, 100, 500]:
suffs = ['.meta', '.data'] suffs = ['.meta', '.data']
@ -257,7 +258,7 @@ class TestObjectReplicator(unittest.TestCase):
fdata = fp.read() fdata = fp.read()
self.assertEquals(fdata, data) self.assertEquals(fdata, data)
df = DiskFile(self.devices, 'sda', '0', 'a', 'c', 'o') df = DiskFile(self.devices, 'sda', '0', 'a', 'c', 'o', FakeLogger())
mkdirs(df.datadir) mkdirs(df.datadir)
ohash = hash_path('a', 'c', 'o') ohash = hash_path('a', 'c', 'o')
data_dir = ohash[-3:] data_dir = ohash[-3:]
@ -312,7 +313,7 @@ class TestObjectReplicator(unittest.TestCase):
os.path.join(self.objects, part)) os.path.join(self.objects, part))
def test_delete_partition(self): def test_delete_partition(self):
df = DiskFile(self.devices, 'sda', '0', 'a', 'c', 'o') df = DiskFile(self.devices, 'sda', '0', 'a', 'c', 'o', FakeLogger())
mkdirs(df.datadir) mkdirs(df.datadir)
ohash = hash_path('a', 'c', 'o') ohash = hash_path('a', 'c', 'o')
data_dir = ohash[-3:] data_dir = ohash[-3:]
@ -330,7 +331,8 @@ class TestObjectReplicator(unittest.TestCase):
# Write some files into '1' and run replicate- they should be moved # Write some files into '1' and run replicate- they should be moved
# to the other partitions and then node should get deleted. # to the other partitions and then node should get deleted.
cur_part = '1' cur_part = '1'
df = DiskFile(self.devices, 'sda', cur_part, 'a', 'c', 'o') df = DiskFile(self.devices, 'sda', cur_part, 'a', 'c', 'o',
FakeLogger())
mkdirs(df.datadir) mkdirs(df.datadir)
f = open(os.path.join(df.datadir, f = open(os.path.join(df.datadir,
normalize_timestamp(time.time()) + '.data'), normalize_timestamp(time.time()) + '.data'),

View File

@ -18,22 +18,25 @@
import cPickle as pickle import cPickle as pickle
import os import os
import sys import sys
import shutil
import unittest import unittest
from nose import SkipTest from nose import SkipTest
from shutil import rmtree from shutil import rmtree
from StringIO import StringIO from StringIO import StringIO
from time import gmtime, sleep, strftime, time from time import gmtime, sleep, strftime, time
from tempfile import mkdtemp from tempfile import mkdtemp
from hashlib import md5
from eventlet import sleep, spawn, wsgi, listen from eventlet import sleep, spawn, wsgi, listen
from webob import Request from webob import Request
from test.unit import FakeLogger
from test.unit import _getxattr as getxattr from test.unit import _getxattr as getxattr
from test.unit import _setxattr as setxattr from test.unit import _setxattr as setxattr
from test.unit import connect_tcp, readuntil2crlfs from test.unit import connect_tcp, readuntil2crlfs
from swift.obj import server as object_server from swift.obj import server as object_server
from swift.common.utils import hash_path, mkdirs, normalize_timestamp, \ from swift.common.utils import hash_path, mkdirs, normalize_timestamp, \
NullLogger, storage_directory NullLogger, storage_directory
from swift.common.exceptions import DiskFileNotExist
class TestDiskFile(unittest.TestCase): class TestDiskFile(unittest.TestCase):
@ -49,7 +52,8 @@ class TestDiskFile(unittest.TestCase):
rmtree(os.path.dirname(self.testdir)) rmtree(os.path.dirname(self.testdir))
def test_disk_file_app_iter_corners(self): def test_disk_file_app_iter_corners(self):
df = object_server.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o') df = object_server.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o',
FakeLogger())
mkdirs(df.datadir) mkdirs(df.datadir)
f = open(os.path.join(df.datadir, f = open(os.path.join(df.datadir,
normalize_timestamp(time()) + '.data'), 'wb') normalize_timestamp(time()) + '.data'), 'wb')
@ -58,7 +62,7 @@ class TestDiskFile(unittest.TestCase):
pickle.dumps({}, object_server.PICKLE_PROTOCOL)) pickle.dumps({}, object_server.PICKLE_PROTOCOL))
f.close() f.close()
df = object_server.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o', df = object_server.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o',
keep_data_fp=True) FakeLogger(), keep_data_fp=True)
it = df.app_iter_range(0, None) it = df.app_iter_range(0, None)
sio = StringIO() sio = StringIO()
for chunk in it: for chunk in it:
@ -66,7 +70,7 @@ class TestDiskFile(unittest.TestCase):
self.assertEquals(sio.getvalue(), '1234567890') self.assertEquals(sio.getvalue(), '1234567890')
df = object_server.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o', df = object_server.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o',
keep_data_fp=True) FakeLogger(), keep_data_fp=True)
it = df.app_iter_range(5, None) it = df.app_iter_range(5, None)
sio = StringIO() sio = StringIO()
for chunk in it: for chunk in it:
@ -77,47 +81,201 @@ class TestDiskFile(unittest.TestCase):
tmpdir = os.path.join(self.testdir, 'sda1', 'tmp') tmpdir = os.path.join(self.testdir, 'sda1', 'tmp')
os.rmdir(tmpdir) os.rmdir(tmpdir)
with object_server.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', with object_server.DiskFile(self.testdir, 'sda1', '0', 'a', 'c',
'o').mkstemp(): 'o', FakeLogger()).mkstemp():
self.assert_(os.path.exists(tmpdir)) self.assert_(os.path.exists(tmpdir))
def test_quarantine(self): def test_quarantine(self):
df = object_server.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o') df = object_server.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o',
FakeLogger())
mkdirs(df.datadir) mkdirs(df.datadir)
f = open(os.path.join(df.datadir, f = open(os.path.join(df.datadir,
normalize_timestamp(time()) + '.data'), 'wb') normalize_timestamp(time()) + '.data'), 'wb')
setxattr(f.fileno(), object_server.METADATA_KEY, setxattr(f.fileno(), object_server.METADATA_KEY,
pickle.dumps({}, object_server.PICKLE_PROTOCOL)) pickle.dumps({}, object_server.PICKLE_PROTOCOL))
df = object_server.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o') df = object_server.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o',
object_server.DiskFile.quarantine(df.device_path, df.data_file) FakeLogger())
df.quarantine()
quar_dir = os.path.join(self.testdir, 'sda1', 'quarantined', quar_dir = os.path.join(self.testdir, 'sda1', 'quarantined',
'objects', os.path.basename(os.path.dirname( 'objects', os.path.basename(os.path.dirname(
df.data_file))) df.data_file)))
self.assert_(os.path.isdir(quar_dir)) self.assert_(os.path.isdir(quar_dir))
def test_quarantine_double(self): def test_quarantine_same_file(self):
df = object_server.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o') df = object_server.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o',
FakeLogger())
mkdirs(df.datadir) mkdirs(df.datadir)
f = open(os.path.join(df.datadir, f = open(os.path.join(df.datadir,
normalize_timestamp(time()) + '.data'), 'wb') normalize_timestamp(time()) + '.data'), 'wb')
setxattr(f.fileno(), object_server.METADATA_KEY, setxattr(f.fileno(), object_server.METADATA_KEY,
pickle.dumps({}, object_server.PICKLE_PROTOCOL)) pickle.dumps({}, object_server.PICKLE_PROTOCOL))
df = object_server.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o') df = object_server.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o',
new_dir = object_server.DiskFile.quarantine(df.device_path, FakeLogger())
df.data_file) new_dir = df.quarantine()
quar_dir = os.path.join(self.testdir, 'sda1', 'quarantined', quar_dir = os.path.join(self.testdir, 'sda1', 'quarantined',
'objects', os.path.basename(os.path.dirname( 'objects', os.path.basename(os.path.dirname(
df.data_file))) df.data_file)))
self.assert_(os.path.isdir(quar_dir)) self.assert_(os.path.isdir(quar_dir))
self.assertEquals(quar_dir, new_dir) self.assertEquals(quar_dir, new_dir)
# have to remake the datadir # have to remake the datadir and file
mkdirs(df.datadir) mkdirs(df.datadir)
double_uuid_path = df.quarantine(df.device_path, df.data_file) f = open(os.path.join(df.datadir,
normalize_timestamp(time()) + '.data'), 'wb')
setxattr(f.fileno(), object_server.METADATA_KEY,
pickle.dumps({}, object_server.PICKLE_PROTOCOL))
df = object_server.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o',
FakeLogger(), keep_data_fp=True)
double_uuid_path = df.quarantine()
self.assert_(os.path.isdir(double_uuid_path)) self.assert_(os.path.isdir(double_uuid_path))
self.assert_('-' in os.path.basename(double_uuid_path)) self.assert_('-' in os.path.basename(double_uuid_path))
def _get_data_file(self, invalid_type=None, obj_name='o',
                   fsize=1024, csize=8, extension='.data', ts=None):
    """
    Write a test object to disk and return a freshly opened DiskFile for it.

    :param invalid_type: optional corruption applied after the write:
                         'ETag' stores a checksum of different content,
                         'Content-Length' stores a length one byte short,
                         'Zero-Byte' replaces the data file with an empty
                         file
    :param obj_name: object name passed to DiskFile
    :param fsize: number of '0' characters written as the object body
    :param csize: disk_chunk_size handed to the returned DiskFile
    :param extension: file extension passed to DiskFile.put
    :param ts: X-Timestamp to record; the current time is used when falsy
    :returns: a DiskFile opened with keep_data_fp=True whose
              unit_test_len attribute is set to fsize
    """
    writer = object_server.DiskFile(self.testdir, 'sda1', '0', 'a', 'c',
                                    obj_name, FakeLogger())
    payload = '0' * fsize
    timestamp = ts if ts else str(normalize_timestamp(time()))
    with writer.mkstemp() as (fd, tmppath):
        os.write(fd, payload)
        metadata = {
            'ETag': md5(payload).hexdigest(),
            'X-Timestamp': timestamp,
            'Content-Length': str(os.fstat(fd).st_size),
        }
        writer.put(fd, tmppath, metadata, extension=extension)
        # Corrupt the stored metadata while the descriptor is still open.
        if invalid_type == 'ETag':
            metadata['ETag'] = md5('1' + '0' * (fsize - 1)).hexdigest()
            object_server.write_metadata(fd, metadata)
        elif invalid_type == 'Content-Length':
            metadata['Content-Length'] = fsize - 1
            object_server.write_metadata(fd, metadata)

    reader = object_server.DiskFile(self.testdir, 'sda1', '0', 'a', 'c',
                                    obj_name, FakeLogger(),
                                    keep_data_fp=True,
                                    disk_chunk_size=csize)
    if invalid_type == 'Zero-Byte':
        # Swap the data file for an empty one after the DiskFile was opened.
        os.remove(reader.data_file)
        with open(reader.data_file, 'w'):
            pass
    # Remember the intended size so callers can build byte ranges from it.
    reader.unit_test_len = fsize
    return reader
def test_quarantine_valids(self):
df = self._get_data_file(obj_name='1')
for chunk in df:
pass
self.assertFalse(df.quarantined_dir)
df = self._get_data_file(obj_name='2', csize=1)
for chunk in df:
pass
self.assertFalse(df.quarantined_dir)
df = self._get_data_file(obj_name='3', csize=100000)
for chunk in df:
pass
self.assertFalse(df.quarantined_dir)
def run_quarantine_invalids(self, invalid_type):
    """
    Check when a corrupted object ends up with quarantined_dir set.

    :param invalid_type: corruption passed through to _get_data_file
                         ('ETag', 'Content-Length' or 'Zero-Byte')
    """
    def make(obj_name, **extra):
        return self._get_data_file(invalid_type=invalid_type,
                                   obj_name=obj_name, **extra)

    def drain(chunks):
        for _ in chunks:
            pass

    # Complete reads through the iterator, at several chunk sizes.
    for obj_name, extra in (('1', {}),
                            ('2', {'csize': 1}),
                            ('3', {'csize': 100000})):
        df = make(obj_name, **extra)
        drain(df)
        self.assertTrue(df.quarantined_dir)

    # A file that is opened but never read is not expected to be flagged.
    df = make('4')
    self.assertFalse(df.quarantined_dir)

    # Ranges starting at 0 that reach (or run past) the end of the file.
    for obj_name, overshoot in (('5', 0), ('6', 100)):
        df = make(obj_name)
        drain(df.app_iter_range(0, df.unit_test_len + overshoot))
        self.assertTrue(df.quarantined_dir)

    # for the following, Content-Length/Zero-Byte errors will always result
    # in a quarantine, even if the whole file isn't check-summed
    expected_quar = invalid_type in ('Zero-Byte', 'Content-Length')
    # NOTE: obj_name '8' is used for two of these cases, as in the original.
    for obj_name, start, end_delta in (('7', 1, 0),
                                       ('8', 0, -1),
                                       ('8', 1, 1)):
        df = make(obj_name)
        drain(df.app_iter_range(start, df.unit_test_len + end_delta))
        self.assertEquals(bool(df.quarantined_dir), expected_quar)
def test_quarantine_invalids(self):
self.run_quarantine_invalids('ETag')
self.run_quarantine_invalids('Content-Length')
self.run_quarantine_invalids('Zero-Byte')
def test_quarantine_deleted_files(self):
    """
    Closing a '.data' file with a bad Content-Length sets quarantined_dir,
    closing a '.ts' file does not, and get_data_file_size on a '.ts' file
    raises DiskFileNotExist.
    """
    for extension, check in (('.data', self.assertTrue),
                             ('.ts', self.assertFalse)):
        df = self._get_data_file(invalid_type='Content-Length',
                                 extension=extension)
        df.close()
        check(df.quarantined_dir)

    tombstone = self._get_data_file(invalid_type='Content-Length',
                                    extension='.ts')
    self.assertRaises(DiskFileNotExist, tombstone.get_data_file_size)
def test_unlinkold(self): def test_unlinkold(self):
df = object_server.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'ob') df1 = self._get_data_file()
future_time = str(normalize_timestamp(time()+100))
df2 = self._get_data_file(ts=future_time)
self.assertEquals(len(os.listdir(df1.datadir)), 2)
df1.unlinkold(future_time)
self.assertEquals(len(os.listdir(df1.datadir)), 1)
self.assertEquals(os.listdir(df1.datadir)[0], "%s.data" % future_time)
def test_close_error(self):
def err():
raise Exception("bad")
df = self._get_data_file(fsize=1024*1024*2)
df._handle_close_quarantine = err
for chunk in df:
pass
# close is called at the end of the iterator
self.assertEquals(df.fp, None)
self.assertEquals(len(df.logger.log_dict['error']), 1)
def test_quarantine_twice(self):
df = self._get_data_file(invalid_type='Content-Length',
extension='.data')
self.assert_(os.path.isfile(df.data_file))
quar_dir = df.quarantine()
self.assertFalse(os.path.isfile(df.data_file))
self.assert_(os.path.isdir(quar_dir))
self.assertEquals(df.quarantine(), None)
class TestObjectController(unittest.TestCase): class TestObjectController(unittest.TestCase):
@ -251,6 +409,37 @@ class TestObjectController(unittest.TestCase):
finally: finally:
object_server.http_connect = old_http_connect object_server.http_connect = old_http_connect
def test_POST_quarantine_zbyte(self):
    """
    Test swift.object_server.ObjectController.POST against an object whose
    data file was replaced by an empty file carrying the original metadata:
    the POST must return 404 and the file must show up in the quarantine
    directory.
    """
    timestamp = normalize_timestamp(time())
    req = Request.blank('/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
                        headers={'X-Timestamp': timestamp,
                                 'Content-Type': 'application/x-test'})
    req.body = 'VERIFY'
    resp = self.object_controller.PUT(req)
    self.assertEquals(resp.status_int, 201)

    # Named disk_file rather than `file` to avoid shadowing the builtin.
    disk_file = object_server.DiskFile(self.testdir, 'sda1', 'p', 'a', 'c',
                                       'o', FakeLogger(), keep_data_fp=True)
    file_name = os.path.basename(disk_file.data_file)
    # Keep the metadata, drop the contents: recreate the file empty and
    # write the saved metadata back onto it.
    with open(disk_file.data_file) as fp:
        metadata = object_server.read_metadata(fp)
    os.unlink(disk_file.data_file)
    with open(disk_file.data_file, 'w') as fp:
        object_server.write_metadata(fp, metadata)

    self.assertEquals(os.listdir(disk_file.datadir)[0], file_name)
    req = Request.blank('/sda1/p/a/c/o',
                        headers={'X-Timestamp': normalize_timestamp(time())})
    resp = self.object_controller.POST(req)
    self.assertEquals(resp.status_int, 404)

    quar_dir = os.path.join(self.testdir, 'sda1', 'quarantined', 'objects',
                            os.path.basename(
                                os.path.dirname(disk_file.data_file)))
    self.assertEquals(os.listdir(quar_dir)[0], file_name)
def test_PUT_invalid_path(self): def test_PUT_invalid_path(self):
req = Request.blank('/sda1/p/a/c', environ={'REQUEST_METHOD': 'PUT'}) req = Request.blank('/sda1/p/a/c', environ={'REQUEST_METHOD': 'PUT'})
resp = self.object_controller.PUT(req) resp = self.object_controller.PUT(req)
@ -520,6 +709,34 @@ class TestObjectController(unittest.TestCase):
resp = self.object_controller.HEAD(req) resp = self.object_controller.HEAD(req)
self.assertEquals(resp.status_int, 404) self.assertEquals(resp.status_int, 404)
def test_HEAD_quarantine_zbyte(self):
    """
    Test swift.object_server.ObjectController.HEAD against an object whose
    data file was replaced by an empty file carrying the original metadata:
    the HEAD must return 404 and the file must show up in the quarantine
    directory.
    """
    timestamp = normalize_timestamp(time())
    req = Request.blank('/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
                        headers={'X-Timestamp': timestamp,
                                 'Content-Type': 'application/x-test'})
    req.body = 'VERIFY'
    resp = self.object_controller.PUT(req)
    self.assertEquals(resp.status_int, 201)

    # Named disk_file rather than `file` to avoid shadowing the builtin.
    disk_file = object_server.DiskFile(self.testdir, 'sda1', 'p', 'a', 'c',
                                       'o', FakeLogger(), keep_data_fp=True)
    file_name = os.path.basename(disk_file.data_file)
    # Keep the metadata, drop the contents: recreate the file empty and
    # write the saved metadata back onto it.
    with open(disk_file.data_file) as fp:
        metadata = object_server.read_metadata(fp)
    os.unlink(disk_file.data_file)
    with open(disk_file.data_file, 'w') as fp:
        object_server.write_metadata(fp, metadata)

    self.assertEquals(os.listdir(disk_file.datadir)[0], file_name)
    req = Request.blank('/sda1/p/a/c/o')
    resp = self.object_controller.HEAD(req)
    self.assertEquals(resp.status_int, 404)

    quar_dir = os.path.join(self.testdir, 'sda1', 'quarantined', 'objects',
                            os.path.basename(
                                os.path.dirname(disk_file.data_file)))
    self.assertEquals(os.listdir(quar_dir)[0], file_name)
def test_GET(self): def test_GET(self):
""" Test swift.object_server.ObjectController.GET """ """ Test swift.object_server.ObjectController.GET """
req = Request.blank('/sda1/p/a/c') req = Request.blank('/sda1/p/a/c')
@ -781,6 +998,117 @@ class TestObjectController(unittest.TestCase):
resp = self.object_controller.GET(req) resp = self.object_controller.GET(req)
self.assertEquals(resp.status_int, 200) self.assertEquals(resp.status_int, 200)
def test_GET_quarantine(self):
    """
    Test swift.object_server.ObjectController.GET on an object whose stored
    ETag does not match its contents: the body is still returned, the file
    is in the quarantine directory after the body has been read, and a
    later GET returns 404.
    """
    put_timestamp = normalize_timestamp(time())
    put_headers = {'X-Timestamp': put_timestamp,
                   'Content-Type': 'application/x-test'}
    req = Request.blank('/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
                        headers=put_headers)
    req.body = 'VERIFY'
    resp = self.object_controller.PUT(req)
    self.assertEquals(resp.status_int, 201)

    disk_file = object_server.DiskFile(self.testdir, 'sda1', 'p', 'a', 'c',
                                       'o', FakeLogger(), keep_data_fp=True)
    file_name = os.path.basename(disk_file.data_file)
    hash_dir_name = os.path.basename(os.path.dirname(disk_file.data_file))
    quar_dir = os.path.join(self.testdir, 'sda1', 'quarantined', 'objects',
                            hash_dir_name)

    # Overwrite the stored metadata with the checksum of different content.
    wrong_etag = md5('VERIF').hexdigest()
    metadata = {'X-Timestamp': put_timestamp,
                'Content-Length': 6, 'ETag': wrong_etag}
    object_server.write_metadata(disk_file.fp, metadata)
    self.assertEquals(os.listdir(disk_file.datadir)[0], file_name)

    req = Request.blank('/sda1/p/a/c/o')
    resp = self.object_controller.GET(req)
    # Building the response alone leaves the file where it was.
    self.assertEquals(os.listdir(disk_file.datadir)[0], file_name)
    body = resp.body  # actually does quarantining
    self.assertEquals(body, 'VERIFY')
    self.assertEquals(os.listdir(quar_dir)[0], file_name)

    req = Request.blank('/sda1/p/a/c/o')
    resp = self.object_controller.GET(req)
    self.assertEquals(resp.status_int, 404)
def test_GET_quarantine_zbyte(self):
    """
    Test swift.object_server.ObjectController.GET on an object whose data
    file was replaced by an empty file carrying the original metadata: the
    GET returns 404 and the file shows up in the quarantine directory.
    """
    put_headers = {'X-Timestamp': normalize_timestamp(time()),
                   'Content-Type': 'application/x-test'}
    req = Request.blank('/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
                        headers=put_headers)
    req.body = 'VERIFY'
    resp = self.object_controller.PUT(req)
    self.assertEquals(resp.status_int, 201)

    disk_file = object_server.DiskFile(self.testdir, 'sda1', 'p', 'a', 'c',
                                       'o', FakeLogger(), keep_data_fp=True)
    data_path = disk_file.data_file
    file_name = os.path.basename(data_path)
    # Save the metadata, then recreate the file empty and restore it.
    with open(data_path) as fp:
        metadata = object_server.read_metadata(fp)
    os.unlink(data_path)
    with open(data_path, 'w') as fp:
        object_server.write_metadata(fp, metadata)

    self.assertEquals(os.listdir(disk_file.datadir)[0], file_name)
    req = Request.blank('/sda1/p/a/c/o')
    resp = self.object_controller.GET(req)
    self.assertEquals(resp.status_int, 404)

    hash_dir_name = os.path.basename(os.path.dirname(disk_file.data_file))
    quar_dir = os.path.join(self.testdir, 'sda1', 'quarantined', 'objects',
                            hash_dir_name)
    self.assertEquals(os.listdir(quar_dir)[0], file_name)
def test_GET_quarantine_range(self):
    """
    Test swift.object_server.ObjectController.GET with Range requests on an
    object whose stored ETag does not match its contents: reading the
    'bytes=0-4' and 'bytes=1-6' ranges leaves the file in place, reading
    'bytes=0-14' creates the quarantine directory, and a later GET
    returns 404.
    """
    put_timestamp = normalize_timestamp(time())
    put_headers = {'X-Timestamp': put_timestamp,
                   'Content-Type': 'application/x-test'}
    req = Request.blank('/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
                        headers=put_headers)
    req.body = 'VERIFY'
    resp = self.object_controller.PUT(req)
    self.assertEquals(resp.status_int, 201)

    disk_file = object_server.DiskFile(self.testdir, 'sda1', 'p', 'a', 'c',
                                       'o', FakeLogger(), keep_data_fp=True)
    file_name = os.path.basename(disk_file.data_file)
    hash_dir_name = os.path.basename(os.path.dirname(disk_file.data_file))
    quar_dir = os.path.join(self.testdir, 'sda1', 'quarantined', 'objects',
                            hash_dir_name)

    # Overwrite the stored metadata with the checksum of different content.
    wrong_etag = md5('VERIF').hexdigest()
    metadata = {'X-Timestamp': put_timestamp,
                'Content-Length': 6, 'ETag': wrong_etag}
    object_server.write_metadata(disk_file.fp, metadata)
    self.assertEquals(os.listdir(disk_file.datadir)[0], file_name)

    req = Request.blank('/sda1/p/a/c/o')
    req.range = 'bytes=0-4'  # partial
    resp = self.object_controller.GET(req)
    body = resp.body
    self.assertEquals(os.listdir(disk_file.datadir)[0], file_name)
    self.assertFalse(os.path.isdir(quar_dir))

    # An un-ranged GET whose body is not read still answers 200.
    req = Request.blank('/sda1/p/a/c/o')
    resp = self.object_controller.GET(req)
    self.assertEquals(resp.status_int, 200)

    req = Request.blank('/sda1/p/a/c/o')
    req.range = 'bytes=1-6'  # partial
    resp = self.object_controller.GET(req)
    body = resp.body
    self.assertEquals(os.listdir(disk_file.datadir)[0], file_name)
    self.assertFalse(os.path.isdir(quar_dir))

    req = Request.blank('/sda1/p/a/c/o')
    req.range = 'bytes=0-14'  # full
    resp = self.object_controller.GET(req)
    # The file is still in place until the body is actually read.
    self.assertEquals(os.listdir(disk_file.datadir)[0], file_name)
    body = resp.body
    self.assertTrue(os.path.isdir(quar_dir))

    req = Request.blank('/sda1/p/a/c/o')
    resp = self.object_controller.GET(req)
    self.assertEquals(resp.status_int, 404)
def test_DELETE(self): def test_DELETE(self):
""" Test swift.object_server.ObjectController.DELETE """ """ Test swift.object_server.ObjectController.DELETE """
req = Request.blank('/sda1/p/a/c', req = Request.blank('/sda1/p/a/c',