diff --git a/swift/common/direct_client.py b/swift/common/direct_client.py index 68334a3ed4..d7fde1f221 100644 --- a/swift/common/direct_client.py +++ b/swift/common/direct_client.py @@ -183,7 +183,7 @@ def direct_head_object(node, part, account, container, obj, conn_timeout=5, def direct_get_object(node, part, account, container, obj, conn_timeout=5, - response_timeout=15, resp_chunk_size=None): + response_timeout=15, resp_chunk_size=None, headers={}): """ Get object directly from the object server. @@ -195,13 +195,14 @@ def direct_get_object(node, part, account, container, obj, conn_timeout=5, :param conn_timeout: timeout in seconds for establishing the connection :param response_timeout: timeout in seconds for getting the response :param resp_chunk_size: if defined, chunk size of data to read. + :param headers: dict to be passed into HTTPConnection headers :returns: a tuple of (response headers, the object's contents) The response headers will be a dict and all header names will be lowercase. """ path = '/%s/%s/%s' % (account, container, obj) with Timeout(conn_timeout): conn = http_connect(node['ip'], node['port'], node['device'], part, - 'GET', path) + 'GET', path, headers=headers) with Timeout(response_timeout): resp = conn.getresponse() if resp.status < 200 or resp.status >= 300: @@ -286,6 +287,40 @@ def direct_put_object(node, part, account, container, name, contents, return resp.getheader('etag').strip('"') +def direct_post_object(node, part, account, container, name, headers, + conn_timeout=5, response_timeout=15): + """ + Direct update to object metadata on object server. + + :param node: node dictionary from the ring + :param part: partition the container is on + :param account: account name + :param container: container name + :param name: object name + :param headers: headers to store as metadata + :param conn_timeout: timeout in seconds for establishing the connection + :param response_timeout: timeout in seconds for getting the response + :raises ClientException: HTTP POST request failed + """ + path = '/%s/%s/%s' % (account, container, name) + headers['X-Timestamp'] = normalize_timestamp(time()) + with Timeout(conn_timeout): + conn = http_connect(node['ip'], node['port'], node['device'], part, + 'POST', path, headers=headers) + with Timeout(response_timeout): + resp = conn.getresponse() + resp.read() + if resp.status < 200 or resp.status >= 300: + raise ClientException( + 'Object server %s:%s direct POST %s gave status %s' % + (node['ip'], node['port'], + repr('/%s/%s%s' % (node['device'], part, path)), + resp.status), + http_host=node['ip'], http_port=node['port'], + http_device=node['device'], http_status=resp.status, + http_reason=resp.reason) + + def direct_delete_object(node, part, account, container, obj, conn_timeout=5, response_timeout=15, headers={}): """ diff --git a/swift/common/exceptions.py b/swift/common/exceptions.py index 81f544bfae..30af79b722 100644 --- a/swift/common/exceptions.py +++ b/swift/common/exceptions.py @@ -30,6 +30,14 @@ class AuditException(Exception): pass +class DiskFileError(Exception): + pass + + +class DiskFileNotExist(Exception): + pass + + class AuthException(Exception): pass diff --git a/swift/common/utils.py b/swift/common/utils.py index 41874e8073..8d31b69512 100644 --- a/swift/common/utils.py +++ b/swift/common/utils.py @@ -179,9 +179,9 @@ def mkdirs(path): raise -def renamer(old, new): # pragma: no cover +def renamer(old, new): """ - Attempt to fix^H^H^Hhide race conditions like empty object directories + Attempt to fix / hide race conditions like empty object directories being removed by backend processes during uploads, by retrying. :param old: old path to be renamed @@ -567,7 +567,7 @@ def storage_directory(datadir, partition, hash): :param hash: Account, container or object hash :returns: Storage directory """ - return os.path.join(datadir, partition, hash[-3:], hash) + return os.path.join(datadir, str(partition), hash[-3:], hash) def hash_path(account, container=None, object=None, raw_digest=False): diff --git a/swift/obj/auditor.py b/swift/obj/auditor.py index c5c908c190..f7026b86d5 100644 --- a/swift/obj/auditor.py +++ b/swift/obj/auditor.py @@ -23,7 +23,8 @@ from swift.obj import server as object_server from swift.obj.replicator import invalidate_hash from swift.common.utils import get_logger, renamer, audit_location_generator, \ ratelimit_sleep, TRUE_VALUES -from swift.common.exceptions import AuditException +from swift.common.exceptions import AuditException, DiskFileError, \ + DiskFileNotExist from swift.common.daemon import Daemon SLEEP_BETWEEN_AUDITS = 30 @@ -119,47 +120,39 @@ class AuditorWorker(object): except Exception, exc: raise AuditException('Error when reading metadata: %s' % exc) _junk, account, container, obj = name.split('/', 3) - df = object_server.DiskFile(self.devices, device, - partition, account, - container, obj, + df = object_server.DiskFile(self.devices, device, partition, + account, container, obj, self.logger, keep_data_fp=True) if df.data_file is None: # file is deleted, we found the tombstone return - obj_size = os.path.getsize(df.data_file) - if obj_size != int(df.metadata['Content-Length']): - raise AuditException('Content-Length of %s does not match ' - 'file size of %s' % (int(df.metadata['Content-Length']), - os.path.getsize(df.data_file))) + try: + obj_size = df.get_data_file_size() + except DiskFileError, e: + raise AuditException(str(e)) + except DiskFileNotExist: + return if self.zero_byte_only_at_fps and obj_size: return - etag = md5() for chunk in df: self.bytes_running_time = ratelimit_sleep( self.bytes_running_time, self.max_bytes_per_second, incr_by=len(chunk)) - etag.update(chunk) self.bytes_processed += len(chunk) self.total_bytes_processed += len(chunk) - etag = etag.hexdigest() - if etag != df.metadata['ETag']: - raise AuditException("ETag of %s does not match file's md5 of " - "%s" % (df.metadata['ETag'], etag)) + df.close() + if df.quarantined_dir: + self.quarantines += 1 + self.logger.error( + _("ERROR Object %(path)s failed audit and will be " + "quarantined: ETag and file's md5 do not match"), + {'path': path}) except AuditException, err: self.quarantines += 1 self.logger.error(_('ERROR Object %(obj)s failed audit and will ' 'be quarantined: %(err)s'), {'obj': path, 'err': err}) - object_dir = os.path.dirname(path) - invalidate_hash(os.path.dirname(object_dir)) - renamer_path = os.path.dirname(path) - to_path = os.path.join(self.devices, device, 'quarantined', - 'objects', os.path.basename(renamer_path)) - try: - renamer(renamer_path, to_path) - except OSError, e: - if e.errno == errno.EEXIST: - to_path = "%s-%s" % (to_path, uuid.uuid4().hex) - renamer(renamer_path, to_path) + object_server.quarantine_renamer( + os.path.join(self.devices, device), path) return except Exception: self.errors += 1 diff --git a/swift/obj/server.py b/swift/obj/server.py index 3d4843b785..74c6d2bded 100644 --- a/swift/obj/server.py +++ b/swift/obj/server.py @@ -21,6 +21,7 @@ import errno import os import time import traceback +import uuid from datetime import datetime from hashlib import md5 from tempfile import mkstemp @@ -41,7 +42,8 @@ from swift.common.utils import mkdirs, normalize_timestamp, \ from swift.common.bufferedhttp import http_connect from swift.common.constraints import check_object_creation, check_mount, \ check_float, check_utf8 -from swift.common.exceptions import ConnectionTimeout +from swift.common.exceptions import ConnectionTimeout, DiskFileError, \ + DiskFileNotExist from swift.obj.replicator import get_hashes, invalidate_hash @@ -89,6 +91,32 @@ def write_metadata(fd, metadata): key += 1 +def quarantine_renamer(device_path, corrupted_file_path): + """ + In the case that a file is corrupted, move it to a quarantined + area to allow replication to fix it. + + :params device_path: The path to the device the corrupted file is on. + :params corrupted_file_path: The path to the file you want quarantined. + + :returns: path (str) of directory the file was moved to + :raises OSError: re-raises non errno.EEXIST / errno.ENOTEMPTY + exceptions from rename + """ + from_dir = os.path.dirname(corrupted_file_path) + to_dir = os.path.join(device_path, 'quarantined', + 'objects', os.path.basename(from_dir)) + invalidate_hash(os.path.dirname(from_dir)) + try: + renamer(from_dir, to_dir) + except OSError, e: + if e.errno not in (errno.EEXIST, errno.ENOTEMPTY): + raise + to_dir = "%s-%s" % (to_dir, uuid.uuid4().hex) + renamer(from_dir, to_dir) + return to_dir + + class DiskFile(object): """ Manage object files on disk. @@ -104,17 +132,23 @@ class DiskFile(object): """ def __init__(self, path, device, partition, account, container, obj, - keep_data_fp=False, disk_chunk_size=65536): + logger, keep_data_fp=False, disk_chunk_size=65536): self.disk_chunk_size = disk_chunk_size self.name = '/' + '/'.join((account, container, obj)) name_hash = hash_path(account, container, obj) self.datadir = os.path.join(path, device, storage_directory(DATADIR, partition, name_hash)) + self.device_path = os.path.join(path, device) self.tmpdir = os.path.join(path, device, 'tmp') + self.logger = logger self.metadata = {} self.meta_file = None self.data_file = None self.fp = None + self.iter_etag = None + self.started_at_0 = False + self.read_to_eof = False + self.quarantined_dir = None self.keep_cache = False if not os.path.exists(self.datadir): return @@ -134,7 +168,7 @@ class DiskFile(object): self.fp = open(self.data_file, 'rb') self.metadata = read_metadata(self.fp) if not keep_data_fp: - self.close() + self.close(verify_file=False) if self.meta_file: with open(self.meta_file) as mfp: for key in self.metadata.keys(): @@ -147,9 +181,16 @@ class DiskFile(object): try: dropped_cache = 0 read = 0 + self.started_at_0 = False + self.read_to_eof = False + if self.fp.tell() == 0: + self.started_at_0 = True + self.iter_etag = md5() while True: chunk = self.fp.read(self.disk_chunk_size) if chunk: + if self.iter_etag: + self.iter_etag.update(chunk) read += len(chunk) if read - dropped_cache > (1024 * 1024): self.drop_cache(self.fp.fileno(), dropped_cache, @@ -157,6 +198,7 @@ class DiskFile(object): dropped_cache = read yield chunk else: + self.read_to_eof = True self.drop_cache(self.fp.fileno(), dropped_cache, read - dropped_cache) break @@ -180,11 +222,41 @@ class DiskFile(object): break yield chunk - def close(self): - """Close the file.""" + def _handle_close_quarantine(self): + """Check if file needs to be quarantined""" + try: + obj_size = self.get_data_file_size() + except DiskFileError, e: + self.quarantine() + return + except DiskFileNotExist: + return + + if (self.iter_etag and self.started_at_0 and self.read_to_eof and + 'ETag' in self.metadata and + self.iter_etag.hexdigest() != self.metadata.get('ETag')): + self.quarantine() + + def close(self, verify_file=True): + """ + Close the file. Will handle quarantining file if necessary. + + :param verify_file: Defaults to True. If false, will not check + file to see if it needs quarantining. + """ if self.fp: - self.fp.close() - self.fp = None + try: + if verify_file: + self._handle_close_quarantine() + except Exception, e: + import traceback + self.logger.error(_('ERROR DiskFile %(data_file)s in ' + '%(data_dir)s close failure: %(exc)s : %(stack)'), + {'exc': e, 'stack': ''.join(traceback.format_stack()), + 'data_file': self.data_file, 'data_dir': self.datadir}) + finally: + self.fp.close() + self.fp = None def is_deleted(self): """ @@ -255,6 +327,44 @@ class DiskFile(object): if not self.keep_cache: drop_buffer_cache(fd, offset, length) + def quarantine(self): + """ + In the case that a file is corrupted, move it to a quarantined + area to allow replication to fix it. + + :returns: if quarantine is successful, path to quarantined + directory otherwise None + """ + if not (self.is_deleted() or self.quarantined_dir): + self.quarantined_dir = quarantine_renamer(self.device_path, + self.data_file) + return self.quarantined_dir + + def get_data_file_size(self): + """ + Returns the os.path.getsize for the file. Raises an exception if this + file does not match the Content-Length stored in the metadata. Or if + self.data_file does not exist. + + :returns: file size as an int + :raises DiskFileError: on file size mismatch. + :raises DiskFileNotExist: on file not existing (including deleted) + """ + try: + file_size = 0 + if self.data_file: + file_size = os.path.getsize(self.data_file) + if 'Content-Length' in self.metadata: + metadata_size = int(self.metadata['Content-Length']) + if file_size != metadata_size: + raise DiskFileError('Content-Length of %s does not ' + 'match file size of %s' % (metadata_size, file_size)) + return file_size + except OSError, err: + if err.errno != errno.ENOENT: + raise + raise DiskFileNotExist('Data File does not exist.') + class ObjectController(object): """Implements the WSGI application for the Swift Object Server.""" @@ -349,13 +459,17 @@ class ObjectController(object): if self.mount_check and not check_mount(self.devices, device): return Response(status='507 %s is not mounted' % device) file = DiskFile(self.devices, device, partition, account, container, - obj, disk_chunk_size=self.disk_chunk_size) + obj, self.logger, disk_chunk_size=self.disk_chunk_size) if file.is_deleted(): response_class = HTTPNotFound else: response_class = HTTPAccepted - + try: + file_size = file.get_data_file_size() + except (DiskFileError, DiskFileNotExist): + file.quarantine() + return HTTPNotFound(request=request) metadata = {'X-Timestamp': request.headers['x-timestamp']} metadata.update(val for val in request.headers.iteritems() if val[0].lower().startswith('x-object-meta-')) @@ -385,7 +499,7 @@ class ObjectController(object): if error_response: return error_response file = DiskFile(self.devices, device, partition, account, container, - obj, disk_chunk_size=self.disk_chunk_size) + obj, self.logger, disk_chunk_size=self.disk_chunk_size) upload_expiration = time.time() + self.max_upload_time etag = md5() upload_size = 0 @@ -451,12 +565,18 @@ class ObjectController(object): if self.mount_check and not check_mount(self.devices, device): return Response(status='507 %s is not mounted' % device) file = DiskFile(self.devices, device, partition, account, container, - obj, keep_data_fp=True, disk_chunk_size=self.disk_chunk_size) + obj, self.logger, keep_data_fp=True, + disk_chunk_size=self.disk_chunk_size) if file.is_deleted(): if request.headers.get('if-match') == '*': return HTTPPreconditionFailed(request=request) else: return HTTPNotFound(request=request) + try: + file_size = file.get_data_file_size() + except (DiskFileError, DiskFileNotExist): + file.quarantine() + return HTTPNotFound(request=request) if request.headers.get('if-match') not in (None, '*') and \ file.metadata['ETag'] not in request.if_match: file.close() @@ -496,7 +616,7 @@ class ObjectController(object): response.headers[key] = value response.etag = file.metadata['ETag'] response.last_modified = float(file.metadata['X-Timestamp']) - response.content_length = int(file.metadata['Content-Length']) + response.content_length = file_size if response.content_length < KEEP_CACHE_SIZE and \ 'X-Auth-Token' not in request.headers and \ 'X-Storage-Token' not in request.headers: @@ -518,9 +638,14 @@ class ObjectController(object): if self.mount_check and not check_mount(self.devices, device): return Response(status='507 %s is not mounted' % device) file = DiskFile(self.devices, device, partition, account, container, - obj, disk_chunk_size=self.disk_chunk_size) + obj, self.logger, disk_chunk_size=self.disk_chunk_size) if file.is_deleted(): return HTTPNotFound(request=request) + try: + file_size = file.get_data_file_size() + except (DiskFileError, DiskFileNotExist): + file.quarantine() + return HTTPNotFound(request=request) response = Response(content_type=file.metadata['Content-Type'], request=request, conditional_response=True) for key, value in file.metadata.iteritems(): @@ -529,7 +654,7 @@ class ObjectController(object): response.headers[key] = value response.etag = file.metadata['ETag'] response.last_modified = float(file.metadata['X-Timestamp']) - response.content_length = int(file.metadata['Content-Length']) + response.content_length = file_size if 'Content-Encoding' in file.metadata: response.content_encoding = file.metadata['Content-Encoding'] return response @@ -550,7 +675,7 @@ class ObjectController(object): return Response(status='507 %s is not mounted' % device) response_class = HTTPNoContent file = DiskFile(self.devices, device, partition, account, container, - obj, disk_chunk_size=self.disk_chunk_size) + obj, self.logger, disk_chunk_size=self.disk_chunk_size) if file.is_deleted(): response_class = HTTPNotFound metadata = { diff --git a/test/probe/test_object_failures.py b/test/probe/test_object_failures.py new file mode 100755 index 0000000000..1d1627cce3 --- /dev/null +++ b/test/probe/test_object_failures.py @@ -0,0 +1,176 @@ +#!/usr/bin/python -u +# Copyright (c) 2010-2011 OpenStack, LLC. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest +import os +from uuid import uuid4 + +from swift.common import client, direct_client +from swift.common.utils import hash_path, readconf +from swift.obj.server import write_metadata, read_metadata +from test.probe.common import kill_pids, reset_environment + + +class TestObjectFailures(unittest.TestCase): + + def setUp(self): + self.pids, self.port2server, self.account_ring, self.container_ring, \ + self.object_ring, self.url, self.token, self.account = \ + reset_environment() + + def tearDown(self): + kill_pids(self.pids) + + def _get_data_file_path(self, obj_dir): + files = sorted(os.listdir(obj_dir), reverse=True) + for file in files: + return os.path.join(obj_dir, file) + + def _setup_data_file(self, container, obj, data): + client.put_container(self.url, self.token, container) + client.put_object(self.url, self.token, container, obj, data) + odata = client.get_object(self.url, self.token, container, obj)[-1] + self.assertEquals(odata, data) + opart, onodes = self.object_ring.get_nodes( + self.account, container, obj) + onode = onodes[0] + node_id = (onode['port'] - 6000) / 10 + device = onode['device'] + hash_str = hash_path(self.account, container, obj) + obj_server_conf = readconf('/etc/swift/object-server/%s.conf' % + node_id) + devices = obj_server_conf['app:object-server']['devices'] + obj_dir = '%s/%s/objects/%s/%s/%s/' % (devices, + device, opart, + hash_str[-3:], hash_str) + data_file = self._get_data_file_path(obj_dir) + return onode, opart, data_file + + def run_quarantine(self): + container = 'container-%s' % uuid4() + obj = 'object-%s' % uuid4() + onode, opart, data_file = self._setup_data_file(container, obj, + 'VERIFY') + with open(data_file) as fp: + metadata = read_metadata(fp) + metadata['ETag'] = 'badetag' + with open(data_file) as fp: + write_metadata(fp, metadata) + + odata = direct_client.direct_get_object(onode, opart, + self.account, container, obj)[-1] + self.assertEquals(odata, 'VERIFY') + try: + resp = direct_client.direct_get_object(onode, opart, self.account, + container, obj) + raise "Did not quarantine object" + except client.ClientException, e: + self.assertEquals(e.http_status, 404) + + def run_quarantine_range_etag(self): + container = 'container-range-%s' % uuid4() + obj = 'object-range-%s' % uuid4() + onode, opart, data_file = self._setup_data_file(container, obj, + 'RANGE') + with open(data_file) as fp: + metadata = read_metadata(fp) + metadata['ETag'] = 'badetag' + with open(data_file) as fp: + write_metadata(fp, metadata) + for header, result in [({'Range': 'bytes=0-2'}, 'RAN'), + ({'Range': 'bytes=1-11'}, 'ANGE'), + ({'Range': 'bytes=0-11'}, 'RANGE')]: + odata = direct_client.direct_get_object(onode, opart, + self.account, container, obj, + headers=header)[-1] + + self.assertEquals(odata, result) + + try: + resp = direct_client.direct_get_object(onode, opart, self.account, + container, obj) + raise "Did not quarantine object" + except client.ClientException, e: + self.assertEquals(e.http_status, 404) + + def run_quarantine_zero_byte_get(self): + container = 'container-zbyte-%s' % uuid4() + obj = 'object-zbyte-%s' % uuid4() + onode, opart, data_file = self._setup_data_file(container, obj, 'DATA') + with open(data_file) as fp: + metadata = read_metadata(fp) + os.unlink(data_file) + + with open(data_file, 'w') as fp: + write_metadata(fp, metadata) + try: + resp = direct_client.direct_get_object(onode, opart, self.account, + container, obj, + conn_timeout=1, + response_timeout=1) + raise "Did not quarantine object" + except client.ClientException, e: + self.assertEquals(e.http_status, 404) + + def run_quarantine_zero_byte_head(self): + container = 'container-zbyte-%s' % uuid4() + obj = 'object-zbyte-%s' % uuid4() + onode, opart, data_file = self._setup_data_file(container, obj, 'DATA') + with open(data_file) as fp: + metadata = read_metadata(fp) + os.unlink(data_file) + + with open(data_file, 'w') as fp: + write_metadata(fp, metadata) + try: + resp = direct_client.direct_head_object(onode, opart, self.account, + container, obj, + conn_timeout=1, + response_timeout=1) + raise "Did not quarantine object" + except client.ClientException, e: + self.assertEquals(e.http_status, 404) + + def run_quarantine_zero_byte_post(self): + container = 'container-zbyte-%s' % uuid4() + obj = 'object-zbyte-%s' % uuid4() + onode, opart, data_file = self._setup_data_file(container, obj, 'DATA') + with open(data_file) as fp: + metadata = read_metadata(fp) + os.unlink(data_file) + + with open(data_file, 'w') as fp: + write_metadata(fp, metadata) + try: + resp = direct_client.direct_post_object( + onode, opart, self.account, + container, obj, + {'X-Object-Meta-1': 'One', 'X-Object-Meta-Two': 'Two'}, + conn_timeout=1, + response_timeout=1) + raise "Did not quarantine object" + except client.ClientException, e: + self.assertEquals(e.http_status, 404) + + def test_runner(self): + self.run_quarantine() + self.run_quarantine_range_etag() + self.run_quarantine_zero_byte_get() + self.run_quarantine_zero_byte_head() + self.run_quarantine_zero_byte_post() + +if __name__ == '__main__': + unittest.main() diff --git a/test/unit/__init__.py b/test/unit/__init__.py index 005aabd3db..6b5b7b4d70 100644 --- a/test/unit/__init__.py +++ b/test/unit/__init__.py @@ -91,6 +91,22 @@ def temptree(files, contents=''): rmtree(tempdir) +class FakeLogger(object): + # a thread safe logger + + def __init__(self): + self.log_dict = dict(error=[], info=[], warning=[]) + + def error(self, *args, **kwargs): + self.log_dict['error'].append((args, kwargs)) + + def info(self, *args, **kwargs): + self.log_dict['info'].append((args, kwargs)) + + def warning(self, *args, **kwargs): + self.log_dict['warning'].append((args, kwargs)) + + class MockTrue(object): """ Instances of MockTrue evaluate like True diff --git a/test/unit/common/middleware/test_ratelimit.py b/test/unit/common/middleware/test_ratelimit.py index 4afefb0351..257033dc11 100644 --- a/test/unit/common/middleware/test_ratelimit.py +++ b/test/unit/common/middleware/test_ratelimit.py @@ -19,6 +19,7 @@ from contextlib import contextmanager from threading import Thread from webob import Request +from test.unit import FakeLogger from swift.common.middleware import ratelimit from swift.proxy.server import get_container_memcache_key from swift.common.memcached import MemcacheConnectionError @@ -96,19 +97,6 @@ class FakeApp(object): return ['204 No Content'] -class FakeLogger(object): - # a thread safe logger - - def error(self, *args, **kwargs): - pass - - def info(self, *args, **kwargs): - pass - - def warning(self, *args, **kwargs): - pass - - def start_response(*args): pass diff --git a/test/unit/obj/test_auditor.py b/test/unit/obj/test_auditor.py index d50f6158d7..0aa05bcce4 100644 --- a/test/unit/obj/test_auditor.py +++ b/test/unit/obj/test_auditor.py @@ -13,7 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -# TODO: Tests from test import unit import unittest import tempfile @@ -22,6 +21,7 @@ import time from shutil import rmtree from hashlib import md5 from tempfile import mkdtemp +from test.unit import FakeLogger from swift.obj import auditor from swift.obj import server as object_server from swift.obj.server import DiskFile, write_metadata, DATADIR @@ -34,13 +34,11 @@ from swift.common.exceptions import AuditException class TestAuditor(unittest.TestCase): def setUp(self): - self.testdir = \ - os.path.join(mkdtemp(), 'tmp_test_object_auditor') + self.testdir = os.path.join(mkdtemp(), 'tmp_test_object_auditor') self.devices = os.path.join(self.testdir, 'node') + self.logger = FakeLogger() rmtree(self.testdir, ignore_errors=1) - os.mkdir(self.testdir) - os.mkdir(self.devices) - os.mkdir(os.path.join(self.devices, 'sda')) + mkdirs(os.path.join(self.devices, 'sda')) self.objects = os.path.join(self.devices, 'sda', 'objects') os.mkdir(os.path.join(self.devices, 'sdb')) @@ -55,6 +53,8 @@ class TestAuditor(unittest.TestCase): self.conf = dict( devices=self.devices, mount_check='false') + self.disk_file = DiskFile(self.devices, 'sda', '0', 'a', 'c', 'o', + self.logger) def tearDown(self): rmtree(os.path.dirname(self.testdir), ignore_errors=1) @@ -62,11 +62,9 @@ class TestAuditor(unittest.TestCase): def test_object_audit_extra_data(self): self.auditor = auditor.AuditorWorker(self.conf) - cur_part = '0' - disk_file = DiskFile(self.devices, 'sda', cur_part, 'a', 'c', 'o') data = '0' * 1024 etag = md5() - with disk_file.mkstemp() as (fd, tmppath): + with self.disk_file.mkstemp() as (fd, tmppath): os.write(fd, data) etag.update(data) etag = etag.hexdigest() @@ -76,28 +74,26 @@ class TestAuditor(unittest.TestCase): 'X-Timestamp': timestamp, 'Content-Length': str(os.fstat(fd).st_size), } - disk_file.put(fd, tmppath, metadata) + self.disk_file.put(fd, tmppath, metadata) pre_quarantines = self.auditor.quarantines self.auditor.object_audit( - os.path.join(disk_file.datadir, timestamp + '.data'), - 'sda', cur_part) + os.path.join(self.disk_file.datadir, timestamp + '.data'), + 'sda', '0') self.assertEquals(self.auditor.quarantines, pre_quarantines) os.write(fd, 'extra_data') self.auditor.object_audit( - os.path.join(disk_file.datadir, timestamp + '.data'), - 'sda', cur_part) + os.path.join(self.disk_file.datadir, timestamp + '.data'), + 'sda', '0') self.assertEquals(self.auditor.quarantines, pre_quarantines + 1) def test_object_audit_diff_data(self): self.auditor = auditor.AuditorWorker(self.conf) - cur_part = '0' - disk_file = DiskFile(self.devices, 'sda', cur_part, 'a', 'c', 'o') data = '0' * 1024 etag = md5() timestamp = str(normalize_timestamp(time.time())) - with disk_file.mkstemp() as (fd, tmppath): + with self.disk_file.mkstemp() as (fd, tmppath): os.write(fd, data) etag.update(data) etag = etag.hexdigest() @@ -106,12 +102,15 @@ class TestAuditor(unittest.TestCase): 'X-Timestamp': timestamp, 'Content-Length': str(os.fstat(fd).st_size), } - disk_file.put(fd, tmppath, metadata) + self.disk_file.put(fd, tmppath, metadata) pre_quarantines = self.auditor.quarantines + # remake so it will have metadata + self.disk_file = DiskFile(self.devices, 'sda', '0', 'a', 'c', 'o', + self.logger) self.auditor.object_audit( - os.path.join(disk_file.datadir, timestamp + '.data'), - 'sda', cur_part) + os.path.join(self.disk_file.datadir, timestamp + '.data'), + 'sda', '0') self.assertEquals(self.auditor.quarantines, pre_quarantines) etag = md5() etag.update('1' + '0' * 1023) @@ -120,25 +119,23 @@ class TestAuditor(unittest.TestCase): write_metadata(fd, metadata) self.auditor.object_audit( - os.path.join(disk_file.datadir, timestamp + '.data'), - 'sda', cur_part) + os.path.join(self.disk_file.datadir, timestamp + '.data'), + 'sda', '0') self.assertEquals(self.auditor.quarantines, pre_quarantines + 1) def test_object_audit_no_meta(self): - cur_part = '0' - disk_file = DiskFile(self.devices, 'sda', cur_part, 'a', 'c', 'o') timestamp = str(normalize_timestamp(time.time())) - path = os.path.join(disk_file.datadir, timestamp + '.data') - mkdirs(disk_file.datadir) + path = os.path.join(self.disk_file.datadir, timestamp + '.data') + mkdirs(self.disk_file.datadir) fp = open(path, 'w') fp.write('0' * 1024) fp.close() - invalidate_hash(os.path.dirname(disk_file.datadir)) + invalidate_hash(os.path.dirname(self.disk_file.datadir)) self.auditor = auditor.AuditorWorker(self.conf) pre_quarantines = self.auditor.quarantines self.auditor.object_audit( - os.path.join(disk_file.datadir, timestamp + '.data'), - 'sda', cur_part) + os.path.join(self.disk_file.datadir, timestamp + '.data'), + 'sda', '0') self.assertEquals(self.auditor.quarantines, pre_quarantines + 1) def test_object_audit_bad_args(self): @@ -153,13 +150,11 @@ class TestAuditor(unittest.TestCase): def test_object_run_once_pass(self): self.auditor = auditor.AuditorWorker(self.conf) self.auditor.log_time = 0 - cur_part = '0' timestamp = str(normalize_timestamp(time.time())) pre_quarantines = self.auditor.quarantines - disk_file = DiskFile(self.devices, 'sda', cur_part, 'a', 'c', 'o') data = '0' * 1024 etag = md5() - with disk_file.mkstemp() as (fd, tmppath): + with self.disk_file.mkstemp() as (fd, tmppath): os.write(fd, data) etag.update(data) etag = etag.hexdigest() @@ -168,20 +163,18 @@ class TestAuditor(unittest.TestCase): 'X-Timestamp': timestamp, 'Content-Length': str(os.fstat(fd).st_size), } - disk_file.put(fd, tmppath, metadata) - disk_file.close() + self.disk_file.put(fd, tmppath, metadata) + self.disk_file.close() self.auditor.audit_all_objects() self.assertEquals(self.auditor.quarantines, pre_quarantines) def test_object_run_once_no_sda(self): self.auditor = auditor.AuditorWorker(self.conf) - cur_part = '0' timestamp = str(normalize_timestamp(time.time())) pre_quarantines = self.auditor.quarantines - disk_file = DiskFile(self.devices, 'sdb', cur_part, 'a', 'c', 'o') data = '0' * 1024 etag = md5() - with disk_file.mkstemp() as (fd, tmppath): + with self.disk_file.mkstemp() as (fd, tmppath): os.write(fd, data) etag.update(data) etag = etag.hexdigest() @@ -190,21 +183,19 @@ class TestAuditor(unittest.TestCase): 'X-Timestamp': timestamp, 'Content-Length': str(os.fstat(fd).st_size), } - disk_file.put(fd, tmppath, metadata) - disk_file.close() + self.disk_file.put(fd, tmppath, metadata) + self.disk_file.close() os.write(fd, 'extra_data') self.auditor.audit_all_objects() self.assertEquals(self.auditor.quarantines, pre_quarantines + 1) def test_object_run_once_multi_devices(self): self.auditor = auditor.AuditorWorker(self.conf) - cur_part = '0' timestamp = str(normalize_timestamp(time.time())) pre_quarantines = self.auditor.quarantines - disk_file = DiskFile(self.devices, 'sda', cur_part, 'a', 'c', 'o') data = '0' * 10 etag = md5() - with disk_file.mkstemp() as (fd, tmppath): + with self.disk_file.mkstemp() as (fd, tmppath): os.write(fd, data) etag.update(data) etag = etag.hexdigest() @@ -213,13 +204,14 @@ class TestAuditor(unittest.TestCase): 'X-Timestamp': timestamp, 'Content-Length': str(os.fstat(fd).st_size), } - disk_file.put(fd, tmppath, metadata) - disk_file.close() + self.disk_file.put(fd, tmppath, metadata) + self.disk_file.close() self.auditor.audit_all_objects() - disk_file = DiskFile(self.devices, 'sdb', cur_part, 'a', 'c', 'ob') + self.disk_file = DiskFile(self.devices, 'sdb', '0', 'a', 'c', + 'ob', self.logger) data = '1' * 10 etag = md5() - with disk_file.mkstemp() as (fd, tmppath): + with self.disk_file.mkstemp() as (fd, tmppath): os.write(fd, data) etag.update(data) etag = etag.hexdigest() @@ -228,8 +220,8 @@ class TestAuditor(unittest.TestCase): 'X-Timestamp': timestamp, 'Content-Length': str(os.fstat(fd).st_size), } - disk_file.put(fd, tmppath, metadata) - disk_file.close() + self.disk_file.put(fd, tmppath, metadata) + self.disk_file.close() os.write(fd, 'extra_data') self.auditor.audit_all_objects() self.assertEquals(self.auditor.quarantines, pre_quarantines + 1) @@ -237,11 +229,9 @@ class TestAuditor(unittest.TestCase): def test_object_run_fast_track_non_zero(self): self.auditor = auditor.ObjectAuditor(self.conf) self.auditor.log_time = 0 - cur_part = '0' - disk_file = DiskFile(self.devices, 'sda', cur_part, 'a', 'c', 'o') data = '0' * 1024 etag = md5() - with disk_file.mkstemp() as (fd, tmppath): + with self.disk_file.mkstemp() as (fd, tmppath): os.write(fd, data) etag.update(data) etag = etag.hexdigest() @@ -250,7 +240,7 @@ class TestAuditor(unittest.TestCase): 'X-Timestamp': str(normalize_timestamp(time.time())), 'Content-Length': str(os.fstat(fd).st_size), } - disk_file.put(fd, tmppath, metadata) + self.disk_file.put(fd, tmppath, metadata) etag = md5() etag.update('1' + '0' * 1023) etag = etag.hexdigest() @@ -267,36 +257,33 @@ class TestAuditor(unittest.TestCase): def setup_bad_zero_byte(self, with_ts=False): self.auditor = auditor.ObjectAuditor(self.conf) self.auditor.log_time = 0 - cur_part = '0' ts_file_path = '' if with_ts: name_hash = hash_path('a', 'c', 'o') dir_path = os.path.join(self.devices, 'sda', - storage_directory(DATADIR, cur_part, name_hash)) + storage_directory(DATADIR, '0', name_hash)) ts_file_path = os.path.join(dir_path, '99999.ts') if not os.path.exists(dir_path): mkdirs(dir_path) fp = open(ts_file_path, 'w') fp.close() - - disk_file = DiskFile(self.devices, 'sda', cur_part, 'a', 'c', 'o') etag = md5() - with disk_file.mkstemp() as (fd, tmppath): + with self.disk_file.mkstemp() as (fd, tmppath): etag = etag.hexdigest() metadata = { 'ETag': etag, 'X-Timestamp': str(normalize_timestamp(time.time())), 'Content-Length': 10, } - disk_file.put(fd, tmppath, metadata) + self.disk_file.put(fd, tmppath, metadata) etag = md5() etag = etag.hexdigest() metadata['ETag'] = etag write_metadata(fd, metadata) - if disk_file.data_file: - return disk_file.data_file + if self.disk_file.data_file: + return self.disk_file.data_file return ts_file_path def test_object_run_fast_track_all(self): diff --git a/test/unit/obj/test_replicator.py b/test/unit/obj/test_replicator.py index 290077b32c..50dac19559 100644 --- a/test/unit/obj/test_replicator.py +++ b/test/unit/obj/test_replicator.py @@ -25,9 +25,8 @@ import fcntl import time import tempfile from contextlib import contextmanager -from eventlet import tpool - from eventlet.green import subprocess +from test.unit import FakeLogger from swift.common import utils from swift.common.utils import hash_path, mkdirs, normalize_timestamp from swift.common import ring @@ -164,7 +163,8 @@ class TestObjectReplicator(unittest.TestCase): was_connector = object_replicator.http_connect object_replicator.http_connect = mock_http_connect(200) cur_part = '0' - df = DiskFile(self.devices, 'sda', cur_part, 'a', 'c', 'o') + df = DiskFile(self.devices, 'sda', cur_part, 'a', 'c', 'o', + FakeLogger()) mkdirs(df.datadir) f = open(os.path.join(df.datadir, normalize_timestamp(time.time()) + '.data'), @@ -189,7 +189,7 @@ class TestObjectReplicator(unittest.TestCase): object_replicator.http_connect = was_connector def test_get_hashes(self): - df = DiskFile(self.devices, 'sda', '0', 'a', 'c', 'o') + df = DiskFile(self.devices, 'sda', '0', 'a', 'c', 'o', FakeLogger()) mkdirs(df.datadir) with open(os.path.join(df.datadir, normalize_timestamp( time.time()) + '.ts'), 'wb') as f: @@ -206,7 +206,7 @@ class TestObjectReplicator(unittest.TestCase): self.assert_('a83' in hashes) def test_hash_suffix_one_file(self): - df = DiskFile(self.devices, 'sda', '0', 'a', 'c', 'o') + df = DiskFile(self.devices, 'sda', '0', 'a', 'c', 'o', FakeLogger()) mkdirs(df.datadir) f = open(os.path.join(df.datadir, normalize_timestamp(time.time() - 100) + '.ts'), @@ -223,7 +223,7 @@ class TestObjectReplicator(unittest.TestCase): self.assertEquals(len(os.listdir(self.parts['0'])), 0) def test_hash_suffix_multi_file_one(self): - df = DiskFile(self.devices, 'sda', '0', 'a', 'c', 'o') + df = DiskFile(self.devices, 'sda', '0', 'a', 'c', 'o', FakeLogger()) mkdirs(df.datadir) for tdiff in [1, 50, 100, 500]: for suff in ['.meta', '.data', '.ts']: @@ -244,7 +244,7 @@ class TestObjectReplicator(unittest.TestCase): self.assertEquals(len(os.listdir(whole_hsh_path)), 1) def test_hash_suffix_multi_file_two(self): - df = DiskFile(self.devices, 'sda', '0', 'a', 'c', 'o') + df = DiskFile(self.devices, 'sda', '0', 'a', 'c', 'o', FakeLogger()) mkdirs(df.datadir) for tdiff in [1, 50, 100, 500]: suffs = ['.meta', '.data'] @@ -274,7 +274,7 @@ class TestObjectReplicator(unittest.TestCase): fdata = fp.read() self.assertEquals(fdata, data) - df = DiskFile(self.devices, 'sda', '0', 'a', 'c', 'o') + df = DiskFile(self.devices, 'sda', '0', 'a', 'c', 'o', FakeLogger()) mkdirs(df.datadir) ohash = hash_path('a', 'c', 'o') data_dir = ohash[-3:] @@ -329,7 +329,7 @@ class TestObjectReplicator(unittest.TestCase): os.path.join(self.objects, part)) def test_delete_partition(self): - df = DiskFile(self.devices, 'sda', '0', 'a', 'c', 'o') + df = DiskFile(self.devices, 'sda', '0', 'a', 'c', 'o', FakeLogger()) mkdirs(df.datadir) ohash = hash_path('a', 'c', 'o') data_dir = ohash[-3:] @@ -347,7 +347,8 @@ class TestObjectReplicator(unittest.TestCase): # Write some files into '1' and run replicate- they should be moved # to the other partitoins and then node should get deleted. cur_part = '1' - df = DiskFile(self.devices, 'sda', cur_part, 'a', 'c', 'o') + df = DiskFile(self.devices, 'sda', cur_part, 'a', 'c', 'o', + FakeLogger()) mkdirs(df.datadir) f = open(os.path.join(df.datadir, normalize_timestamp(time.time()) + '.data'), diff --git a/test/unit/obj/test_server.py b/test/unit/obj/test_server.py index 3b2e8f6a61..6628d7dd64 100644 --- a/test/unit/obj/test_server.py +++ b/test/unit/obj/test_server.py @@ -18,34 +18,280 @@ import cPickle as pickle import os import sys +import shutil import unittest from nose import SkipTest from shutil import rmtree from StringIO import StringIO from time import gmtime, sleep, strftime, time from tempfile import mkdtemp +from hashlib import md5 from eventlet import sleep, spawn, wsgi, listen from webob import Request +from test.unit import FakeLogger from test.unit import _getxattr as getxattr from test.unit import _setxattr as setxattr - from test.unit import connect_tcp, readuntil2crlfs from swift.obj import server as object_server from swift.common.utils import hash_path, mkdirs, normalize_timestamp, \ NullLogger, storage_directory +from swift.common.exceptions import DiskFileNotExist +from eventlet import tpool + + +class TestDiskFile(unittest.TestCase): + """Test swift.obj.server.DiskFile""" + + def setUp(self): + """ Set up for testing swift.object_server.ObjectController """ + self.testdir = os.path.join(mkdtemp(), 'tmp_test_obj_server_DiskFile') + mkdirs(os.path.join(self.testdir, 'sda1', 'tmp')) + + def fake_exe(*args, **kwargs): + pass + tpool.execute = fake_exe + + def tearDown(self): + """ Tear down for testing swift.object_server.ObjectController """ + rmtree(os.path.dirname(self.testdir)) + + def test_disk_file_app_iter_corners(self): + df = object_server.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o', + FakeLogger()) + mkdirs(df.datadir) + f = open(os.path.join(df.datadir, + normalize_timestamp(time()) + '.data'), 'wb') + f.write('1234567890') + setxattr(f.fileno(), object_server.METADATA_KEY, + pickle.dumps({}, object_server.PICKLE_PROTOCOL)) + f.close() + df = object_server.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o', + FakeLogger(), keep_data_fp=True) + it = df.app_iter_range(0, None) + sio = StringIO() + for chunk in it: + sio.write(chunk) + self.assertEquals(sio.getvalue(), '1234567890') + + df = object_server.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o', + FakeLogger(), keep_data_fp=True) + it = df.app_iter_range(5, None) + sio = StringIO() + for chunk in it: + sio.write(chunk) + self.assertEquals(sio.getvalue(), '67890') + + def test_disk_file_mkstemp_creates_dir(self): + tmpdir = os.path.join(self.testdir, 'sda1', 'tmp') + os.rmdir(tmpdir) + with object_server.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', + 'o', FakeLogger()).mkstemp(): + self.assert_(os.path.exists(tmpdir)) + + def test_quarantine(self): + df = object_server.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o', + FakeLogger()) + mkdirs(df.datadir) + f = open(os.path.join(df.datadir, + normalize_timestamp(time()) + '.data'), 'wb') + setxattr(f.fileno(), object_server.METADATA_KEY, + pickle.dumps({}, object_server.PICKLE_PROTOCOL)) + df = object_server.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o', + FakeLogger()) + df.quarantine() + quar_dir = os.path.join(self.testdir, 'sda1', 'quarantined', + 'objects', os.path.basename(os.path.dirname( + df.data_file))) + self.assert_(os.path.isdir(quar_dir)) + + def test_quarantine_same_file(self): + df = object_server.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o', + FakeLogger()) + mkdirs(df.datadir) + f = open(os.path.join(df.datadir, + normalize_timestamp(time()) + '.data'), 'wb') + setxattr(f.fileno(), object_server.METADATA_KEY, + pickle.dumps({}, object_server.PICKLE_PROTOCOL)) + df = object_server.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o', + FakeLogger()) + new_dir = df.quarantine() + quar_dir = os.path.join(self.testdir, 'sda1', 'quarantined', + 'objects', os.path.basename(os.path.dirname( + df.data_file))) + self.assert_(os.path.isdir(quar_dir)) + self.assertEquals(quar_dir, new_dir) + # have to remake the datadir and file + mkdirs(df.datadir) + f = open(os.path.join(df.datadir, + normalize_timestamp(time()) + '.data'), 'wb') + setxattr(f.fileno(), object_server.METADATA_KEY, + pickle.dumps({}, object_server.PICKLE_PROTOCOL)) + + df = object_server.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o', + FakeLogger(), keep_data_fp=True) + double_uuid_path = df.quarantine() + self.assert_(os.path.isdir(double_uuid_path)) + self.assert_('-' in os.path.basename(double_uuid_path)) + + def _get_data_file(self, invalid_type=None, obj_name='o', + fsize=1024, csize=8, extension='.data', ts=None): + '''returns a DiskFile''' + df = object_server.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', + obj_name, FakeLogger()) + data = '0' * fsize + etag = md5() + if ts: + timestamp = ts + else: + timestamp = str(normalize_timestamp(time())) + with df.mkstemp() as (fd, tmppath): + os.write(fd, data) + etag.update(data) + etag = etag.hexdigest() + metadata = { + 'ETag': etag, + 'X-Timestamp': timestamp, + 'Content-Length': str(os.fstat(fd).st_size), + } + df.put(fd, tmppath, metadata, extension=extension) + if invalid_type == 'ETag': + etag = md5() + etag.update('1' + '0' * (fsize-1)) + etag = etag.hexdigest() + metadata['ETag'] = etag + object_server.write_metadata(fd, metadata) + if invalid_type == 'Content-Length': + metadata['Content-Length'] = fsize-1 + object_server.write_metadata(fd, metadata) + + df = object_server.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', + obj_name, FakeLogger(), + keep_data_fp=True, disk_chunk_size=csize) + if invalid_type == 'Zero-Byte': + os.remove(df.data_file) + fp = open(df.data_file, 'w') + fp.close() + df.unit_test_len = fsize + return df + + def test_quarantine_valids(self): + df = self._get_data_file(obj_name='1') + for chunk in df: + pass + self.assertFalse(df.quarantined_dir) + + df = self._get_data_file(obj_name='2', csize=1) + for chunk in df: + pass + self.assertFalse(df.quarantined_dir) + + df = self._get_data_file(obj_name='3', csize=100000) + for chunk in df: + pass + self.assertFalse(df.quarantined_dir) + + def run_quarantine_invalids(self, invalid_type): + df = self._get_data_file(invalid_type=invalid_type, obj_name='1') + for chunk in df: + pass + self.assertTrue(df.quarantined_dir) + df = self._get_data_file(invalid_type=invalid_type, + obj_name='2', csize=1) + for chunk in df: + pass + self.assertTrue(df.quarantined_dir) + df = self._get_data_file(invalid_type=invalid_type, + obj_name='3', csize=100000) + for chunk in df: + pass + self.assertTrue(df.quarantined_dir) + df = self._get_data_file(invalid_type=invalid_type, obj_name='4') + self.assertFalse(df.quarantined_dir) + df = self._get_data_file(invalid_type=invalid_type, obj_name='5') + for chunk in df.app_iter_range(0, df.unit_test_len): + pass + self.assertTrue(df.quarantined_dir) + df = self._get_data_file(invalid_type=invalid_type, obj_name='6') + for chunk in df.app_iter_range(0, df.unit_test_len + 100): + pass + self.assertTrue(df.quarantined_dir) + expected_quar = False + # for the following, Content-Length/Zero-Byte errors will always result + # in a quarantine, even if the whole file isn't check-summed + if invalid_type in ('Zero-Byte', 'Content-Length'): + expected_quar = True + df = self._get_data_file(invalid_type=invalid_type, obj_name='7') + for chunk in df.app_iter_range(1, df.unit_test_len): + pass + self.assertEquals(bool(df.quarantined_dir), expected_quar) + df = self._get_data_file(invalid_type=invalid_type, obj_name='8') + for chunk in df.app_iter_range(0, df.unit_test_len - 1): + pass + self.assertEquals(bool(df.quarantined_dir), expected_quar) + df = self._get_data_file(invalid_type=invalid_type, obj_name='8') + for chunk in df.app_iter_range(1, df.unit_test_len + 1): + pass + self.assertEquals(bool(df.quarantined_dir), expected_quar) + + def test_quarantine_invalids(self): + self.run_quarantine_invalids('ETag') + self.run_quarantine_invalids('Content-Length') + self.run_quarantine_invalids('Zero-Byte') + + def test_quarantine_deleted_files(self): + df = self._get_data_file(invalid_type='Content-Length', + extension='.data') + df.close() + self.assertTrue(df.quarantined_dir) + + df = self._get_data_file(invalid_type='Content-Length', + extension='.ts') + df.close() + self.assertFalse(df.quarantined_dir) + df = self._get_data_file(invalid_type='Content-Length', + extension='.ts') + self.assertRaises(DiskFileNotExist, df.get_data_file_size) + + def test_unlinkold(self): + df1 = self._get_data_file() + future_time = str(normalize_timestamp(time() + 100)) + df2 = self._get_data_file(ts=future_time) + self.assertEquals(len(os.listdir(df1.datadir)), 2) + df1.unlinkold(future_time) + self.assertEquals(len(os.listdir(df1.datadir)), 1) + self.assertEquals(os.listdir(df1.datadir)[0], "%s.data" % future_time) + + def test_close_error(self): + + def err(): + raise Exception("bad") + + df = self._get_data_file(fsize=1024 * 1024 * 2) + df._handle_close_quarantine = err + for chunk in df: + pass + # close is called at the end of the iterator + self.assertEquals(df.fp, None) + self.assertEquals(len(df.logger.log_dict['error']), 1) + + def test_quarantine_twice(self): + df = self._get_data_file(invalid_type='Content-Length', + extension='.data') + self.assert_(os.path.isfile(df.data_file)) + quar_dir = df.quarantine() + self.assertFalse(os.path.isfile(df.data_file)) + self.assert_(os.path.isdir(quar_dir)) + self.assertEquals(df.quarantine(), None) class TestObjectController(unittest.TestCase): - """ Test swift.object_server.ObjectController """ + """ Test swift.obj.server.ObjectController """ def setUp(self): """ Set up for testing swift.object_server.ObjectController """ self.testdir = \ os.path.join(mkdtemp(), 'tmp_test_object_server_ObjectController') - mkdirs(self.testdir) - rmtree(self.testdir) - mkdirs(os.path.join(self.testdir, 'sda1')) mkdirs(os.path.join(self.testdir, 'sda1', 'tmp')) conf = {'devices': self.testdir, 'mount_check': 'false'} self.object_controller = object_server.ObjectController(conf) @@ -243,6 +489,35 @@ class TestObjectController(unittest.TestCase): finally: object_server.http_connect = old_http_connect + def test_POST_quarantine_zbyte(self): + """ Test swift.object_server.ObjectController.GET """ + timestamp = normalize_timestamp(time()) + req = Request.blank('/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT'}, + headers={'X-Timestamp': timestamp, + 'Content-Type': 'application/x-test'}) + req.body = 'VERIFY' + resp = self.object_controller.PUT(req) + self.assertEquals(resp.status_int, 201) + file = object_server.DiskFile(self.testdir, 'sda1', 'p', 'a', 'c', 'o', + FakeLogger(), keep_data_fp=True) + + file_name = os.path.basename(file.data_file) + with open(file.data_file) as fp: + metadata = object_server.read_metadata(fp) + os.unlink(file.data_file) + with open(file.data_file, 'w') as fp: + object_server.write_metadata(fp, metadata) + + self.assertEquals(os.listdir(file.datadir)[0], file_name) + req = Request.blank('/sda1/p/a/c/o', + headers={'X-Timestamp': normalize_timestamp(time())}) + resp = self.object_controller.POST(req) + self.assertEquals(resp.status_int, 404) + + quar_dir = os.path.join(self.testdir, 'sda1', 'quarantined', 'objects', + os.path.basename(os.path.dirname(file.data_file))) + self.assertEquals(os.listdir(quar_dir)[0], file_name) + def test_PUT_invalid_path(self): req = Request.blank('/sda1/p/a/c', environ={'REQUEST_METHOD': 'PUT'}) resp = self.object_controller.PUT(req) @@ -512,6 +787,34 @@ class TestObjectController(unittest.TestCase): resp = self.object_controller.HEAD(req) self.assertEquals(resp.status_int, 404) + def test_HEAD_quarantine_zbyte(self): + """ Test swift.object_server.ObjectController.GET """ + timestamp = normalize_timestamp(time()) + req = Request.blank('/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT'}, + headers={'X-Timestamp': timestamp, + 'Content-Type': 'application/x-test'}) + req.body = 'VERIFY' + resp = self.object_controller.PUT(req) + self.assertEquals(resp.status_int, 201) + file = object_server.DiskFile(self.testdir, 'sda1', 'p', 'a', 'c', 'o', + FakeLogger(), keep_data_fp=True) + + file_name = os.path.basename(file.data_file) + with open(file.data_file) as fp: + metadata = object_server.read_metadata(fp) + os.unlink(file.data_file) + with open(file.data_file, 'w') as fp: + object_server.write_metadata(fp, metadata) + + self.assertEquals(os.listdir(file.datadir)[0], file_name) + req = Request.blank('/sda1/p/a/c/o') + resp = self.object_controller.HEAD(req) + self.assertEquals(resp.status_int, 404) + + quar_dir = os.path.join(self.testdir, 'sda1', 'quarantined', 'objects', + os.path.basename(os.path.dirname(file.data_file))) + self.assertEquals(os.listdir(quar_dir)[0], file_name) + def test_GET(self): """ Test swift.object_server.ObjectController.GET """ req = Request.blank('/sda1/p/a/c') @@ -773,6 +1076,116 @@ class TestObjectController(unittest.TestCase): resp = self.object_controller.GET(req) self.assertEquals(resp.status_int, 200) + def test_GET_quarantine(self): + """ Test swift.object_server.ObjectController.GET """ + timestamp = normalize_timestamp(time()) + req = Request.blank('/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT'}, + headers={'X-Timestamp': timestamp, + 'Content-Type': 'application/x-test'}) + req.body = 'VERIFY' + resp = self.object_controller.PUT(req) + self.assertEquals(resp.status_int, 201) + file = object_server.DiskFile(self.testdir, 'sda1', 'p', 'a', 'c', 'o', + FakeLogger(), keep_data_fp=True) + file_name = os.path.basename(file.data_file) + etag = md5() + etag.update('VERIF') + etag = etag.hexdigest() + metadata = {'X-Timestamp': timestamp, + 'Content-Length': 6, 'ETag': etag} + object_server.write_metadata(file.fp, metadata) + self.assertEquals(os.listdir(file.datadir)[0], file_name) + req = Request.blank('/sda1/p/a/c/o') + resp = self.object_controller.GET(req) + quar_dir = os.path.join(self.testdir, 'sda1', 'quarantined', 'objects', + os.path.basename(os.path.dirname(file.data_file))) + self.assertEquals(os.listdir(file.datadir)[0], file_name) + body = resp.body # actually does quarantining + self.assertEquals(body, 'VERIFY') + self.assertEquals(os.listdir(quar_dir)[0], file_name) + req = Request.blank('/sda1/p/a/c/o') + resp = self.object_controller.GET(req) + self.assertEquals(resp.status_int, 404) + + def test_GET_quarantine_zbyte(self): + """ Test swift.object_server.ObjectController.GET """ + timestamp = normalize_timestamp(time()) + req = Request.blank('/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT'}, + headers={'X-Timestamp': timestamp, + 'Content-Type': 'application/x-test'}) + req.body = 'VERIFY' + resp = self.object_controller.PUT(req) + self.assertEquals(resp.status_int, 201) + file = object_server.DiskFile(self.testdir, 'sda1', 'p', 'a', 'c', 'o', + FakeLogger(), keep_data_fp=True) + file_name = os.path.basename(file.data_file) + with open(file.data_file) as fp: + metadata = object_server.read_metadata(fp) + os.unlink(file.data_file) + with open(file.data_file, 'w') as fp: + object_server.write_metadata(fp, metadata) + + self.assertEquals(os.listdir(file.datadir)[0], file_name) + req = Request.blank('/sda1/p/a/c/o') + resp = self.object_controller.GET(req) + self.assertEquals(resp.status_int, 404) + + quar_dir = os.path.join(self.testdir, 'sda1', 'quarantined', 'objects', + os.path.basename(os.path.dirname(file.data_file))) + self.assertEquals(os.listdir(quar_dir)[0], file_name) + + def test_GET_quarantine_range(self): + """ Test swift.object_server.ObjectController.GET """ + timestamp = normalize_timestamp(time()) + req = Request.blank('/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT'}, + headers={'X-Timestamp': timestamp, + 'Content-Type': 'application/x-test'}) + req.body = 'VERIFY' + resp = self.object_controller.PUT(req) + self.assertEquals(resp.status_int, 201) + file = object_server.DiskFile(self.testdir, 'sda1', 'p', 'a', 'c', 'o', + FakeLogger(), keep_data_fp=True) + file_name = os.path.basename(file.data_file) + etag = md5() + etag.update('VERIF') + etag = etag.hexdigest() + metadata = {'X-Timestamp': timestamp, + 'Content-Length': 6, 'ETag': etag} + object_server.write_metadata(file.fp, metadata) + self.assertEquals(os.listdir(file.datadir)[0], file_name) + req = Request.blank('/sda1/p/a/c/o') + req.range = 'bytes=0-4' # partial + resp = self.object_controller.GET(req) + quar_dir = os.path.join(self.testdir, 'sda1', 'quarantined', 'objects', + os.path.basename(os.path.dirname(file.data_file))) + body = resp.body + self.assertEquals(os.listdir(file.datadir)[0], file_name) + self.assertFalse(os.path.isdir(quar_dir)) + req = Request.blank('/sda1/p/a/c/o') + resp = self.object_controller.GET(req) + self.assertEquals(resp.status_int, 200) + + req = Request.blank('/sda1/p/a/c/o') + req.range = 'bytes=1-6' # partial + resp = self.object_controller.GET(req) + quar_dir = os.path.join(self.testdir, 'sda1', 'quarantined', 'objects', + os.path.basename(os.path.dirname(file.data_file))) + body = resp.body + self.assertEquals(os.listdir(file.datadir)[0], file_name) + self.assertFalse(os.path.isdir(quar_dir)) + + req = Request.blank('/sda1/p/a/c/o') + req.range = 'bytes=0-14' # full + resp = self.object_controller.GET(req) + quar_dir = os.path.join(self.testdir, 'sda1', 'quarantined', 'objects', + os.path.basename(os.path.dirname(file.data_file))) + self.assertEquals(os.listdir(file.datadir)[0], file_name) + body = resp.body + self.assertTrue(os.path.isdir(quar_dir)) + req = Request.blank('/sda1/p/a/c/o') + resp = self.object_controller.GET(req) + self.assertEquals(resp.status_int, 404) + def test_DELETE(self): """ Test swift.object_server.ObjectController.DELETE """ req = Request.blank('/sda1/p/a/c', @@ -943,38 +1356,6 @@ class TestObjectController(unittest.TestCase): resp = self.object_controller.PUT(req) self.assertEquals(resp.status_int, 400) - def test_disk_file_app_iter_corners(self): - df = object_server.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o') - mkdirs(df.datadir) - f = open(os.path.join(df.datadir, - normalize_timestamp(time()) + '.data'), 'wb') - f.write('1234567890') - setxattr(f.fileno(), object_server.METADATA_KEY, - pickle.dumps({}, object_server.PICKLE_PROTOCOL)) - f.close() - df = object_server.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o', - keep_data_fp=True) - it = df.app_iter_range(0, None) - sio = StringIO() - for chunk in it: - sio.write(chunk) - self.assertEquals(sio.getvalue(), '1234567890') - - df = object_server.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o', - keep_data_fp=True) - it = df.app_iter_range(5, None) - sio = StringIO() - for chunk in it: - sio.write(chunk) - self.assertEquals(sio.getvalue(), '67890') - - def test_disk_file_mkstemp_creates_dir(self): - tmpdir = os.path.join(self.testdir, 'sda1', 'tmp') - os.rmdir(tmpdir) - with object_server.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', - 'o').mkstemp(): - self.assert_(os.path.exists(tmpdir)) - def test_max_upload_time(self): class SlowBody():