From d5ca365965fc20117cf7bcebeedf81f965abff76 Mon Sep 17 00:00:00 2001 From: Samuel Merritt Date: Mon, 17 Mar 2014 17:54:42 -0700 Subject: [PATCH] Add Storage Policy support to Object Updates The object server will now send its storage policy index to the container server synchronously and asynchronously (via async_pending). Each storage policy gets its own async_pending directory under /srv/node/$disk/objects-$N, so there's no need to change the on-disk pickle format; the policy index comes from the async_pending's filename. This avoids any hassle on upgrade. (Recall that policy 0's objects live in /srv/node/$disk/objects, not objects-0.) Per-policy tempdir as well. Also clean up a couple little things in the object updater. Now it won't abort processing when it encounters a file (not directory) named "async_pending-\d+", and it won't process updates in a directory that does not correspond to a storage policy. That is, if you have policies 1, 2, and 3, but there's a directory on your disk named "async_pending-5", the updater will now skip over that entirely. It won't even bother doing directory listings at all. This is a good idea, believe it or not, because there's nothing good that the container server can do with an update from some unknown storage policy. It can't update the listing, it can't move the object if it's misplaced... all it can do is ignore the request, so it's better to just not send it in the first place. Plus, if this is due to a misconfiguration on one storage node, then the updates will get processed once the configuration is fixed. There's also a drive by fix to update some backend http mocks for container update tests that we're not fully exercising their their request fakes. Because the object server container update code is resilient to to all manor of failure from backend requests the general intent of the tests was unaffected but this change cleans up some confusing logging in the debug logger output. The object-server will send X-Storage-Policy-Index headers with all requests to container severs, including X-Delete containers and all object PUT/DELETE requests. This header value is persisted in the pickle file for the update and sent along with async requests from the object-updater as well. The container server will extract the X-Storage-Policy-Index header from incoming requests and apply it to container broker calls as appropriate defaulting to the legacy storage policy 0 to support seemless migration. DocImpact Implements: blueprint storage-policies Change-Id: I07c730bebaee068f75024fa9c2fa9e11e295d9bd add to object updates Change-Id: Ic97a422238a0d7bc2a411a71a7aba3f8b42fce4d --- swift/container/server.py | 28 ++- swift/obj/diskfile.py | 8 +- swift/obj/mem_server.py | 3 +- swift/obj/server.py | 40 ++-- swift/obj/updater.py | 98 +++++---- test/unit/container/test_server.py | 49 ++++- test/unit/obj/test_diskfile.py | 53 +++-- test/unit/obj/test_server.py | 312 +++++++++++++++++++++++------ test/unit/obj/test_updater.py | 262 ++++++++++++++++++------ 9 files changed, 646 insertions(+), 207 deletions(-) diff --git a/swift/container/server.py b/swift/container/server.py index 2a4eb375ad..2432f2c0c6 100644 --- a/swift/container/server.py +++ b/swift/container/server.py @@ -250,22 +250,24 @@ class ContainerController(object): content_type='text/plain') if self.mount_check and not check_mount(self.root, drive): return HTTPInsufficientStorage(drive=drive, request=req) + # policy index is only relevant for delete_obj (and transitively for + # auto create accounts) + obj_policy_index = self.get_and_validate_policy_index(req) or 0 broker = self._get_container_broker(drive, part, account, container) if account.startswith(self.auto_create_account_prefix) and obj and \ not os.path.exists(broker.db_file): - requested_policy_index = (self.get_and_validate_policy_index(req) - or POLICIES.default.idx) try: broker.initialize( normalize_timestamp( req.headers.get('x-timestamp') or time.time()), - requested_policy_index) + obj_policy_index) except DatabaseAlreadyExists: pass if not os.path.exists(broker.db_file): return HTTPNotFound() if obj: # delete object - broker.delete_object(obj, req.headers.get('x-timestamp')) + broker.delete_object(obj, req.headers.get('x-timestamp'), + obj_policy_index) return HTTPNoContent(request=req) else: # delete container @@ -342,17 +344,22 @@ class ContainerController(object): timestamp = normalize_timestamp(req.headers['x-timestamp']) broker = self._get_container_broker(drive, part, account, container) if obj: # put container object + # obj put expects the policy_index header, default is for + # legacy support during upgrade. + obj_policy_index = requested_policy_index or 0 if account.startswith(self.auto_create_account_prefix) and \ not os.path.exists(broker.db_file): try: - broker.initialize(timestamp, 0) + broker.initialize(timestamp, obj_policy_index) except DatabaseAlreadyExists: pass if not os.path.exists(broker.db_file): return HTTPNotFound() - broker.put_object(obj, timestamp, int(req.headers['x-size']), + broker.put_object(obj, timestamp, + int(req.headers['x-size']), req.headers['x-content-type'], - req.headers['x-etag']) + req.headers['x-etag'], 0, + obj_policy_index) return HTTPCreated(request=req) else: # put container if requested_policy_index is None: @@ -420,7 +427,7 @@ class ContainerController(object): :params record: object entry record :returns: modified record """ - (name, created, size, content_type, etag) = record + (name, created, size, content_type, etag) = record[:5] if content_type is None: return {'subdir': name} response = {'bytes': size, 'hash': etag, 'name': name, @@ -466,8 +473,9 @@ class ContainerController(object): resp_headers = gen_resp_headers(info, is_deleted=is_deleted) if is_deleted: return HTTPNotFound(request=req, headers=resp_headers) - container_list = broker.list_objects_iter(limit, marker, end_marker, - prefix, delimiter, path) + container_list = broker.list_objects_iter( + limit, marker, end_marker, prefix, delimiter, path, + storage_policy_index=info['storage_policy_index']) return self.create_listing(req, out_content_type, info, resp_headers, broker.metadata, container_list, container) diff --git a/swift/obj/diskfile.py b/swift/obj/diskfile.py index 1ea3fe35c8..2cee7aa646 100644 --- a/swift/obj/diskfile.py +++ b/swift/obj/diskfile.py @@ -531,16 +531,16 @@ class DiskFileManager(object): yield True def pickle_async_update(self, device, account, container, obj, data, - timestamp): + timestamp, policy_idx): device_path = self.construct_dev_path(device) - async_dir = os.path.join(device_path, ASYNCDIR_BASE) + async_dir = os.path.join(device_path, get_async_dir(policy_idx)) ohash = hash_path(account, container, obj) self.threadpools[device].run_in_thread( write_pickle, data, os.path.join(async_dir, ohash[-3:], ohash + '-' + normalize_timestamp(timestamp)), - os.path.join(device_path, 'tmp')) + os.path.join(device_path, get_tmp_dir(policy_idx))) self.logger.increment('async_pendings') def get_diskfile(self, device, partition, account, container, obj, @@ -1005,7 +1005,7 @@ class DiskFile(object): self._container = None self._obj = None self._datadir = None - self._tmpdir = join(device_path, 'tmp') + self._tmpdir = join(device_path, get_tmp_dir(policy_idx)) self._metadata = None self._data_file = None self._fp = None diff --git a/swift/obj/mem_server.py b/swift/obj/mem_server.py index aa61a18ef0..7b3baf1b59 100644 --- a/swift/obj/mem_server.py +++ b/swift/obj/mem_server.py @@ -54,7 +54,7 @@ class ObjectController(server.ObjectController): return self._filesystem.get_diskfile(account, container, obj, **kwargs) def async_update(self, op, account, container, obj, host, partition, - contdevice, headers_out, objdevice): + contdevice, headers_out, objdevice, policy_idx): """ Sends or saves an async update. @@ -68,6 +68,7 @@ class ObjectController(server.ObjectController): :param headers_out: dictionary of headers to send in the container request :param objdevice: device name that the object is in + :param policy_idx: the associated storage policy index """ headers_out['user-agent'] = 'obj-server %s' % os.getpid() full_path = '/%s/%s/%s' % (account, container, obj) diff --git a/swift/obj/server.py b/swift/obj/server.py index 6fa70a40ae..48758bc67f 100644 --- a/swift/obj/server.py +++ b/swift/obj/server.py @@ -47,6 +47,7 @@ from swift.common.swob import HTTPAccepted, HTTPBadRequest, HTTPCreated, \ HTTPInsufficientStorage, HTTPForbidden, HTTPException, HeaderKeyDict, \ HTTPConflict from swift.obj.diskfile import DATAFILE_SYSTEM_META, DiskFileManager +from swift.common.storage_policy import POLICY_INDEX class ObjectController(object): @@ -150,11 +151,10 @@ class ObjectController(object): behavior. """ return self._diskfile_mgr.get_diskfile( - device, partition, account, container, obj, - policy_idx=policy_idx, **kwargs) + device, partition, account, container, obj, policy_idx, **kwargs) def async_update(self, op, account, container, obj, host, partition, - contdevice, headers_out, objdevice): + contdevice, headers_out, objdevice, policy_index): """ Sends or saves an async update. @@ -168,6 +168,7 @@ class ObjectController(object): :param headers_out: dictionary of headers to send in the container request :param objdevice: device name that the object is in + :param policy_index: the associated storage policy index """ headers_out['user-agent'] = 'obj-server %s' % os.getpid() full_path = '/%s/%s/%s' % (account, container, obj) @@ -198,10 +199,11 @@ class ObjectController(object): 'obj': obj, 'headers': headers_out} timestamp = headers_out['x-timestamp'] self._diskfile_mgr.pickle_async_update(objdevice, account, container, - obj, data, timestamp) + obj, data, timestamp, + policy_index) def container_update(self, op, account, container, obj, request, - headers_out, objdevice): + headers_out, objdevice, policy_idx): """ Update the container when objects are updated. @@ -224,7 +226,7 @@ class ObjectController(object): if len(conthosts) != len(contdevices): # This shouldn't happen unless there's a bug in the proxy, # but if there is, we want to know about it. - self.logger.error(_('ERROR Container update failed: different ' + self.logger.error(_('ERROR Container update failed: different ' 'numbers of hosts and devices in request: ' '"%s" vs "%s"') % (headers_in.get('X-Container-Host', ''), @@ -238,13 +240,14 @@ class ObjectController(object): headers_out['x-trans-id'] = headers_in.get('x-trans-id', '-') headers_out['referer'] = request.as_referer() + headers_out[POLICY_INDEX] = policy_idx for conthost, contdevice in updates: self.async_update(op, account, container, obj, conthost, contpartition, contdevice, headers_out, - objdevice) + objdevice, policy_idx) def delete_at_update(self, op, delete_at, account, container, obj, - request, objdevice): + request, objdevice, policy_index): """ Update the expiring objects container when objects are updated. @@ -255,6 +258,7 @@ class ObjectController(object): :param obj: object name :param request: the original request driving the update :param objdevice: device name that the object is in + :param policy_index: the policy index to be used for tmp dir """ if config_true_value( request.headers.get('x-backend-replication', 'f')): @@ -266,6 +270,7 @@ class ObjectController(object): hosts = contdevices = [None] headers_in = request.headers headers_out = HeaderKeyDict({ + POLICY_INDEX: 0, # system accounts are always Policy-0 'x-timestamp': headers_in['x-timestamp'], 'x-trans-id': headers_in.get('x-trans-id', '-'), 'referer': request.as_referer()}) @@ -311,7 +316,8 @@ class ObjectController(object): self.async_update( op, self.expiring_objects_account, delete_at_container, '%s-%s/%s/%s' % (delete_at, account, container, obj), - host, partition, contdevice, headers_out, objdevice) + host, partition, contdevice, headers_out, objdevice, + policy_index) @public @timing_stats() @@ -351,10 +357,11 @@ class ObjectController(object): if orig_delete_at != new_delete_at: if new_delete_at: self.delete_at_update('PUT', new_delete_at, account, container, - obj, request, device) + obj, request, device, policy_idx) if orig_delete_at: self.delete_at_update('DELETE', orig_delete_at, account, - container, obj, request, device) + container, obj, request, device, + policy_idx) disk_file.write_metadata(metadata) return HTTPAccepted(request=request) @@ -458,11 +465,11 @@ class ObjectController(object): if new_delete_at: self.delete_at_update( 'PUT', new_delete_at, account, container, obj, request, - device) + device, policy_idx) if orig_delete_at: self.delete_at_update( 'DELETE', orig_delete_at, account, container, obj, - request, device) + request, device, policy_idx) self.container_update( 'PUT', account, container, obj, request, HeaderKeyDict({ @@ -470,7 +477,7 @@ class ObjectController(object): 'x-content-type': metadata['Content-Type'], 'x-timestamp': metadata['X-Timestamp'], 'x-etag': metadata['ETag']}), - device) + device, policy_idx) return HTTPCreated(request=request, etag=etag) @public @@ -608,14 +615,15 @@ class ObjectController(object): body='X-If-Delete-At and X-Delete-At do not match') if orig_delete_at: self.delete_at_update('DELETE', orig_delete_at, account, - container, obj, request, device) + container, obj, request, device, + policy_idx) req_timestamp = request.headers['X-Timestamp'] if orig_timestamp < req_timestamp: disk_file.delete(req_timestamp) self.container_update( 'DELETE', account, container, obj, request, HeaderKeyDict({'x-timestamp': req_timestamp}), - device) + device, policy_idx) return response_class(request=request) @public diff --git a/swift/obj/updater.py b/swift/obj/updater.py index c3ba44635a..95f4357ec3 100644 --- a/swift/obj/updater.py +++ b/swift/obj/updater.py @@ -26,10 +26,11 @@ from eventlet import patcher, Timeout from swift.common.bufferedhttp import http_connect from swift.common.exceptions import ConnectionTimeout from swift.common.ring import Ring +from swift.common.storage_policy import POLICY_INDEX from swift.common.utils import get_logger, renamer, write_pickle, \ dump_recon_cache, config_true_value, ismount from swift.common.daemon import Daemon -from swift.obj.diskfile import ASYNCDIR_BASE +from swift.obj.diskfile import get_tmp_dir, get_async_dir, ASYNCDIR_BASE from swift.common.http import is_success, HTTP_NOT_FOUND, \ HTTP_INTERNAL_SERVER_ERROR @@ -137,45 +138,69 @@ class ObjectUpdater(Daemon): :param device: path to device """ start_time = time.time() - async_pending = os.path.join(device, ASYNCDIR_BASE) - if not os.path.isdir(async_pending): - return - for prefix in os.listdir(async_pending): - prefix_path = os.path.join(async_pending, prefix) - if not os.path.isdir(prefix_path): + # loop through async pending dirs for all policies + for asyncdir in os.listdir(device): + # skip stuff like "accounts", "containers", etc. + if not (asyncdir == ASYNCDIR_BASE or + asyncdir.startswith(ASYNCDIR_BASE + '-')): continue - last_obj_hash = None - for update in sorted(os.listdir(prefix_path), reverse=True): - update_path = os.path.join(prefix_path, update) - if not os.path.isfile(update_path): - continue - try: - obj_hash, timestamp = update.split('-') - except ValueError: - self.logger.increment('errors') - self.logger.error( - _('ERROR async pending file with unexpected name %s') - % (update_path)) - continue - if obj_hash == last_obj_hash: - self.logger.increment("unlinks") - os.unlink(update_path) - else: - self.process_object_update(update_path, device) - last_obj_hash = obj_hash - time.sleep(self.slowdown) - try: - os.rmdir(prefix_path) - except OSError: - pass - self.logger.timing_since('timing', start_time) - def process_object_update(self, update_path, device): + # we only care about directories + async_pending = os.path.join(device, asyncdir) + if not os.path.isdir(async_pending): + continue + + if asyncdir == ASYNCDIR_BASE: + policy_idx = 0 + else: + _junk, policy_idx = asyncdir.split('-', 1) + try: + policy_idx = int(policy_idx) + get_async_dir(policy_idx) + except ValueError: + self.logger.warn(_('Directory %s does not map to a ' + 'valid policy') % asyncdir) + continue + + for prefix in os.listdir(async_pending): + prefix_path = os.path.join(async_pending, prefix) + if not os.path.isdir(prefix_path): + continue + last_obj_hash = None + for update in sorted(os.listdir(prefix_path), reverse=True): + update_path = os.path.join(prefix_path, update) + if not os.path.isfile(update_path): + continue + try: + obj_hash, timestamp = update.split('-') + except ValueError: + self.logger.increment('errors') + self.logger.error( + _('ERROR async pending file with unexpected ' + 'name %s') + % (update_path)) + continue + if obj_hash == last_obj_hash: + self.logger.increment("unlinks") + os.unlink(update_path) + else: + self.process_object_update(update_path, device, + policy_idx) + last_obj_hash = obj_hash + time.sleep(self.slowdown) + try: + os.rmdir(prefix_path) + except OSError: + pass + self.logger.timing_since('timing', start_time) + + def process_object_update(self, update_path, device, policy_idx): """ Process the object information to be updated and update. :param update_path: path to pickled object update file :param device: path to device + :param policy_idx: storage policy index of object update """ try: update = pickle.load(open(update_path, 'rb')) @@ -196,8 +221,10 @@ class ObjectUpdater(Daemon): new_successes = False for node in nodes: if node['id'] not in successes: + headers = update['headers'].copy() + headers.setdefault(POLICY_INDEX, str(policy_idx)) status = self.object_update(node, part, update['op'], obj, - update['headers']) + headers) if not is_success(status) and status != HTTP_NOT_FOUND: success = False else: @@ -217,7 +244,8 @@ class ObjectUpdater(Daemon): {'obj': obj, 'path': update_path}) if new_successes: update['successes'] = successes - write_pickle(update, update_path, os.path.join(device, 'tmp')) + write_pickle(update, update_path, os.path.join( + device, get_tmp_dir(policy_idx))) def object_update(self, node, part, op, obj, headers): """ diff --git a/test/unit/container/test_server.py b/test/unit/container/test_server.py index 7495b96648..d9e77d353c 100644 --- a/test/unit/container/test_server.py +++ b/test/unit/container/test_server.py @@ -38,7 +38,8 @@ from swift.common import constraints from swift.common.utils import (normalize_timestamp, mkdirs, public, replication, lock_parent_directory) from test.unit import fake_http_connect -from swift.common.storage_policy import POLICY_INDEX, POLICIES +from swift.common.storage_policy import (POLICY_INDEX, POLICIES, + StoragePolicy) from swift.common.request_helpers import get_sys_meta_prefix from test.unit import patch_policies @@ -73,6 +74,13 @@ class TestContainerController(unittest.TestCase): def tearDown(self): rmtree(os.path.dirname(self.testdir), ignore_errors=1) + def _update_object_put_headers(self, req): + """ + Override this method in test subclasses to test post upgrade + behavior. + """ + pass + def _check_put_container_storage_policy(self, req, policy_index): resp = req.get_response(self.controller) self.assertEqual(201, resp.status_int) @@ -179,7 +187,9 @@ class TestContainerController(unittest.TestCase): 'x-content-type': 'text/plain', 'x-etag': 'x', }) - obj_put_request.get_response(self.controller) + self._update_object_put_headers(obj_put_request) + obj_put_resp = obj_put_request.get_response(self.controller) + self.assertEqual(obj_put_resp.status_int // 100, 2) # re-issue HEAD request response = req.get_response(self.controller) self.assertEqual(response.status_int // 100, 2) @@ -1296,6 +1306,7 @@ class TestContainerController(unittest.TestCase): environ={'REQUEST_METHOD': 'PUT', 'HTTP_X_TIMESTAMP': '0', 'HTTP_X_SIZE': 1, 'HTTP_X_CONTENT_TYPE': 'text/plain', 'HTTP_X_ETAG': 'x'}) + self._update_object_put_headers(req) resp = req.get_response(self.controller) self.assertEquals(resp.status_int, 201) req = Request.blank( @@ -1306,6 +1317,7 @@ class TestContainerController(unittest.TestCase): req = Request.blank( '/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'DELETE'}, headers={'X-Timestamp': '4'}) + self._update_object_put_headers(req) resp = req.get_response(self.controller) self.assertEquals(resp.status_int, 204) req = Request.blank( @@ -1469,6 +1481,7 @@ class TestContainerController(unittest.TestCase): 'HTTP_X_CONTENT_TYPE': 'text/plain', 'HTTP_X_ETAG': 'x', 'HTTP_X_SIZE': 0}) + self._update_object_put_headers(req) resp = req.get_response(self.controller) self.assertEquals(resp.status_int, 201) # test format @@ -1545,6 +1558,7 @@ class TestContainerController(unittest.TestCase): 'HTTP_X_CONTENT_TYPE': 'text/plain', 'HTTP_X_ETAG': 'x', 'HTTP_X_SIZE': 0}) + self._update_object_put_headers(req) resp = req.get_response(self.controller) self.assertEquals(resp.status_int, 201) plain_body = '0\n1\n2\n' @@ -1618,6 +1632,7 @@ class TestContainerController(unittest.TestCase): 'HTTP_X_CONTENT_TYPE': 'text/plain', 'HTTP_X_ETAG': 'x', 'HTTP_X_SIZE': 0}) + self._update_object_put_headers(req) resp = req.get_response(self.controller) self.assertEquals(resp.status_int, 201) # test format @@ -1657,6 +1672,7 @@ class TestContainerController(unittest.TestCase): 'HTTP_X_CONTENT_TYPE': 'text/plain', 'HTTP_X_ETAG': 'x', 'HTTP_X_SIZE': 0}) + self._update_object_put_headers(req) resp = req.get_response(self.controller) self.assertEquals(resp.status_int, 201) xml_body = '\n' \ @@ -1736,6 +1752,7 @@ class TestContainerController(unittest.TestCase): 'HTTP_X_TIMESTAMP': '1', 'HTTP_X_CONTENT_TYPE': 'text/plain', 'HTTP_X_ETAG': 'x', 'HTTP_X_SIZE': 0}) + self._update_object_put_headers(req) resp = req.get_response(self.controller) self.assertEquals(resp.status_int, 201) # test limit with marker @@ -1758,6 +1775,7 @@ class TestContainerController(unittest.TestCase): 'REQUEST_METHOD': 'PUT', 'HTTP_X_TIMESTAMP': '1', 'HTTP_X_CONTENT_TYPE': ctype, 'HTTP_X_ETAG': 'x', 'HTTP_X_SIZE': 0}) + self._update_object_put_headers(req) resp = req.get_response(self.controller) self.assertEquals(resp.status_int, 201) req = Request.blank('/sda1/p/a/c?format=json', @@ -1799,6 +1817,7 @@ class TestContainerController(unittest.TestCase): 'HTTP_X_CONTENT_TYPE': 'text/plain', 'HTTP_X_ETAG': 'x', 'HTTP_X_SIZE': 0}) + self._update_object_put_headers(req) resp = req.get_response(self.controller) self.assertEquals(resp.status_int, 201) # test limit @@ -1822,6 +1841,7 @@ class TestContainerController(unittest.TestCase): 'HTTP_X_CONTENT_TYPE': 'text/plain', 'HTTP_X_ETAG': 'x', 'HTTP_X_SIZE': 0}) + self._update_object_put_headers(req) resp = req.get_response(self.controller) self.assertEquals(resp.status_int, 201) req = Request.blank( @@ -1848,6 +1868,7 @@ class TestContainerController(unittest.TestCase): 'REQUEST_METHOD': 'PUT', 'HTTP_X_TIMESTAMP': '1', 'HTTP_X_CONTENT_TYPE': 'text/plain', 'HTTP_X_ETAG': 'x', 'HTTP_X_SIZE': 0}) + self._update_object_put_headers(req) resp = req.get_response(self.controller) self.assertEquals(resp.status_int, 201) req = Request.blank( @@ -1872,6 +1893,7 @@ class TestContainerController(unittest.TestCase): 'REQUEST_METHOD': 'PUT', 'HTTP_X_TIMESTAMP': '1', 'HTTP_X_CONTENT_TYPE': 'text/plain', 'HTTP_X_ETAG': 'x', 'HTTP_X_SIZE': 0}) + self._update_object_put_headers(req) resp = req.get_response(self.controller) self.assertEquals(resp.status_int, 201) req = Request.blank( @@ -1896,6 +1918,7 @@ class TestContainerController(unittest.TestCase): 'REQUEST_METHOD': 'PUT', 'HTTP_X_TIMESTAMP': '1', 'HTTP_X_CONTENT_TYPE': 'text/plain', 'HTTP_X_ETAG': 'x', 'HTTP_X_SIZE': 0}) + self._update_object_put_headers(req) resp = req.get_response(self.controller) self.assertEquals(resp.status_int, 201) req = Request.blank( @@ -1926,6 +1949,7 @@ class TestContainerController(unittest.TestCase): 'REQUEST_METHOD': 'PUT', 'HTTP_X_TIMESTAMP': '1', 'HTTP_X_CONTENT_TYPE': 'text/plain', 'HTTP_X_ETAG': 'x', 'HTTP_X_SIZE': 0}) + self._update_object_put_headers(req) resp = req.get_response(self.controller) self.assertEquals(resp.status_int, 201) req = Request.blank( @@ -2397,5 +2421,26 @@ class TestContainerController(unittest.TestCase): '404 - "-" "-" "-" 2.0000 "-"',), {})]) +@patch_policies([ + StoragePolicy(0, 'legacy'), + StoragePolicy(1, 'one'), + StoragePolicy(2, 'two', True), + StoragePolicy(3, 'three'), + StoragePolicy(4, 'four'), +]) +class TestNonLegacyDefaultStoragePolicy(TestContainerController): + """ + Test swift.container.server.ContainerController with a non-legacy default + Storage Policy. + """ + + def _update_object_put_headers(self, req): + """ + Add policy index headers for containers created with default POLICY + - which in this TestCase is 1. + """ + req.headers[POLICY_INDEX] = str(POLICIES.default.idx) + + if __name__ == '__main__': unittest.main() diff --git a/test/unit/obj/test_diskfile.py b/test/unit/obj/test_diskfile.py index c62e24ae15..7f45fbb9e8 100644 --- a/test/unit/obj/test_diskfile.py +++ b/test/unit/obj/test_diskfile.py @@ -43,15 +43,12 @@ from swift.common.exceptions import DiskFileNotExist, DiskFileQuarantined, \ DiskFileDeviceUnavailable, DiskFileDeleted, DiskFileNotOpen, \ DiskFileError, ReplicationLockTimeout, PathNotDir, DiskFileCollision, \ DiskFileExpired, SwiftException, DiskFileNoSpace -from swift.common.storage_policy import StoragePolicy, POLICIES, \ - get_policy_string +from swift.common.storage_policy import POLICIES, get_policy_string from functools import partial get_data_dir = partial(get_policy_string, diskfile.DATADIR_BASE) get_tmp_dir = partial(get_policy_string, diskfile.TMP_BASE) -_mocked_policies = [StoragePolicy(0, 'zero', False), - StoragePolicy(1, 'one', True)] def _create_test_ring(path): @@ -81,6 +78,7 @@ def _create_test_ring(path): reload_time=intended_reload_time) +@patch_policies class TestDiskFileModuleMethods(unittest.TestCase): def setUp(self): @@ -115,7 +113,6 @@ class TestDiskFileModuleMethods(unittest.TestCase): '0', 'a', 'c', 'o', policy_idx) - @patch_policies(_mocked_policies) def test_extract_policy_index(self): # good path names pn = 'objects/0/606/1984527ed7ef6247c78606/1401379842.14643.data' @@ -157,7 +154,6 @@ class TestDiskFileModuleMethods(unittest.TestCase): bad_path = '/srv/node/sda1/obj1/1/abc/def/1234.data' self.assertEqual(diskfile.extract_policy_index(bad_path), 0) - @patch_policies(_mocked_policies) def test_quarantine_renamer(self): for policy in POLICIES: # we use this for convenience, not really about a diskfile layout @@ -185,7 +181,6 @@ class TestDiskFileModuleMethods(unittest.TestCase): self.assertRaises(OSError, diskfile.hash_suffix, os.path.join(self.testdir, "doesnotexist"), 101) - @patch_policies(_mocked_policies) def test_get_data_dir(self): self.assertEquals(diskfile.get_data_dir(0), diskfile.DATADIR_BASE) self.assertEquals(diskfile.get_data_dir(1), @@ -194,7 +189,6 @@ class TestDiskFileModuleMethods(unittest.TestCase): self.assertRaises(ValueError, diskfile.get_data_dir, 99) - @patch_policies(_mocked_policies) def test_get_async_dir(self): self.assertEquals(diskfile.get_async_dir(0), diskfile.ASYNCDIR_BASE) @@ -204,7 +198,6 @@ class TestDiskFileModuleMethods(unittest.TestCase): self.assertRaises(ValueError, diskfile.get_async_dir, 99) - @patch_policies(_mocked_policies) def test_get_tmp_dir(self): self.assertEquals(diskfile.get_tmp_dir(0), diskfile.TMP_BASE) @@ -214,6 +207,26 @@ class TestDiskFileModuleMethods(unittest.TestCase): self.assertRaises(ValueError, diskfile.get_tmp_dir, 99) + def test_pickle_async_update_tmp_dir(self): + for policy in POLICIES: + if int(policy) == 0: + tmp_part = 'tmp' + else: + tmp_part = 'tmp-%d' % policy + tmp_path = os.path.join( + self.devices, self.existing_device, tmp_part) + self.assertFalse(os.path.isdir(tmp_path)) + pickle_args = (self.existing_device, 'a', 'c', 'o', + 'data', 0.0, int(policy)) + # async updates don't create their tmpdir on their own + self.assertRaises(OSError, self.df_mgr.pickle_async_update, + *pickle_args) + os.makedirs(tmp_path) + # now create a async update + self.df_mgr.pickle_async_update(*pickle_args) + # check tempdir + self.assertTrue(os.path.isdir(tmp_path)) + def test_hash_suffix_hash_dir_is_file_quarantine(self): df = self._create_diskfile() mkdirs(os.path.dirname(df._datadir)) @@ -647,6 +660,7 @@ class TestDiskFileModuleMethods(unittest.TestCase): self.check_hash_cleanup_listdir(file_list, [file2]) +@patch_policies class TestObjectAuditLocationGenerator(unittest.TestCase): def _make_file(self, path): try: @@ -813,11 +827,11 @@ class TestDiskFileManager(unittest.TestCase): with mock.patch('swift.obj.diskfile.write_pickle') as wp: self.df_mgr.pickle_async_update(self.existing_device1, 'a', 'c', 'o', - dict(a=1, b=2), ts) + dict(a=1, b=2), ts, 0) dp = self.df_mgr.construct_dev_path(self.existing_device1) ohash = diskfile.hash_path('a', 'c', 'o') wp.assert_called_with({'a': 1, 'b': 2}, - os.path.join(dp, diskfile.ASYNCDIR_BASE, + os.path.join(dp, diskfile.get_async_dir(0), ohash[-3:], ohash + '-' + ts), os.path.join(dp, 'tmp')) self.df_mgr.logger.increment.assert_called_with('async_pendings') @@ -895,6 +909,7 @@ class TestDiskFileManager(unittest.TestCase): self.assertTrue(lock_exc is None) +@patch_policies class TestDiskFile(unittest.TestCase): """Test swift.obj.diskfile.DiskFile""" @@ -904,7 +919,9 @@ class TestDiskFile(unittest.TestCase): self.testdir = os.path.join( self.tmpdir, 'tmp_test_obj_server_DiskFile') self.existing_device = 'sda1' - mkdirs(os.path.join(self.testdir, self.existing_device, 'tmp')) + for policy in POLICIES: + mkdirs(os.path.join(self.testdir, self.existing_device, + get_tmp_dir(policy.idx))) self._orig_tpool_exc = tpool.execute tpool.execute = lambda f, *args, **kwargs: f(*args, **kwargs) self.conf = dict(devices=self.testdir, mount_check='false', @@ -1162,11 +1179,13 @@ class TestDiskFile(unittest.TestCase): self.assertEqual(quarantine_msgs, []) def test_disk_file_mkstemp_creates_dir(self): - tmpdir = os.path.join(self.testdir, self.existing_device, 'tmp') - os.rmdir(tmpdir) - df = self._simple_get_diskfile() - with df.create(): - self.assert_(os.path.exists(tmpdir)) + for policy in POLICIES: + tmpdir = os.path.join(self.testdir, self.existing_device, + get_tmp_dir(policy.idx)) + os.rmdir(tmpdir) + df = self._simple_get_diskfile(policy_idx=policy.idx) + with df.create(): + self.assert_(os.path.exists(tmpdir)) def _get_open_disk_file(self, invalid_type=None, obj_name='o', fsize=1024, csize=8, mark_deleted=False, prealloc=False, diff --git a/test/unit/obj/test_server.py b/test/unit/obj/test_server.py index 137a6dfa1f..8e73429268 100755 --- a/test/unit/obj/test_server.py +++ b/test/unit/obj/test_server.py @@ -23,17 +23,19 @@ import os import mock import unittest import math +import random from shutil import rmtree from StringIO import StringIO from time import gmtime, strftime, time, struct_time from tempfile import mkdtemp from hashlib import md5 +import itertools from eventlet import sleep, spawn, wsgi, listen, Timeout, tpool from nose import SkipTest -from test.unit import FakeLogger, debug_logger +from test.unit import FakeLogger, debug_logger, mocked_http_conn from test.unit import connect_tcp, readuntil2crlfs, patch_policies from swift.obj import server as object_server from swift.obj import diskfile @@ -50,6 +52,7 @@ def mock_time(*args, **kwargs): return 5000.0 +@patch_policies class TestObjectController(unittest.TestCase): """Test swift.obj.server.ObjectController""" @@ -59,7 +62,6 @@ class TestObjectController(unittest.TestCase): utils.HASH_PATH_PREFIX = 'startcap' self.testdir = \ os.path.join(mkdtemp(), 'tmp_test_object_server_ObjectController') - mkdirs(os.path.join(self.testdir, 'sda1', 'tmp')) conf = {'devices': self.testdir, 'mount_check': 'false'} self.object_controller = object_server.ObjectController( conf, logger=debug_logger()) @@ -74,6 +76,10 @@ class TestObjectController(unittest.TestCase): rmtree(os.path.dirname(self.testdir)) tpool.execute = self._orig_tpool_exc + def _stage_tmp_dir(self, policy): + mkdirs(os.path.join(self.testdir, 'sda1', + diskfile.get_tmp_dir(int(policy)))) + def check_all_api_methods(self, obj_name='o', alt_res=None): path = '/sda1/p/a/c/%s' % obj_name body = 'SPECIAL_STRING' @@ -2100,6 +2106,8 @@ class TestObjectController(unittest.TestCase): self.assertEquals(resp.headers['content-encoding'], 'gzip') def test_async_update_http_connect(self): + policy = random.choice(list(POLICIES)) + self._stage_tmp_dir(policy) given_args = [] def fake_http_connect(*args): @@ -2111,19 +2119,22 @@ class TestObjectController(unittest.TestCase): object_server.http_connect = fake_http_connect self.object_controller.async_update( 'PUT', 'a', 'c', 'o', '127.0.0.1:1234', 1, 'sdc1', - {'x-timestamp': '1', 'x-out': 'set'}, 'sda1') + {'x-timestamp': '1', 'x-out': 'set', + POLICY_INDEX: policy.idx}, 'sda1', policy.idx) finally: object_server.http_connect = orig_http_connect self.assertEquals( given_args, ['127.0.0.1', '1234', 'sdc1', 1, 'PUT', '/a/c/o', { 'x-timestamp': '1', 'x-out': 'set', - 'user-agent': 'obj-server %s' % os.getpid()}]) + 'user-agent': 'obj-server %s' % os.getpid(), + POLICY_INDEX: policy.idx}]) @patch_policies([storage_policy.StoragePolicy(0, 'zero', True), storage_policy.StoragePolicy(1, 'one'), storage_policy.StoragePolicy(37, 'fantastico')]) def test_updating_multiple_delete_at_container_servers(self): + policy = random.choice(list(POLICIES)) self.object_controller.expiring_objects_account = 'exp' self.object_controller.expiring_objects_container_divisor = 60 @@ -2153,12 +2164,15 @@ class TestObjectController(unittest.TestCase): dict((k, v) for k, v in captured_args.iteritems() if v is not None)) + return SuccessfulFakeConn() + req = Request.blank( '/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT'}, headers={'X-Timestamp': '12345', 'Content-Type': 'application/burrito', 'Content-Length': '0', + POLICY_INDEX: policy.idx, 'X-Container-Partition': '20', 'X-Container-Host': '1.2.3.4:5', 'X-Container-Device': 'sdb1', @@ -2191,8 +2205,10 @@ class TestObjectController(unittest.TestCase): 'x-etag': 'd41d8cd98f00b204e9800998ecf8427e', 'x-size': '0', 'x-timestamp': '12345', + POLICY_INDEX: '37', 'referer': 'PUT http://localhost/sda1/p/a/c/o', 'user-agent': 'obj-server %d' % os.getpid(), + POLICY_INDEX: policy.idx, 'x-trans-id': '-'})}) self.assertEquals( http_connect_args[1], @@ -2210,6 +2226,7 @@ class TestObjectController(unittest.TestCase): 'x-timestamp': '12345', 'referer': 'PUT http://localhost/sda1/p/a/c/o', 'user-agent': 'obj-server %d' % os.getpid(), + POLICY_INDEX: 0, # system account storage policy is 0 'x-trans-id': '-'})}) self.assertEquals( http_connect_args[2], @@ -2227,6 +2244,7 @@ class TestObjectController(unittest.TestCase): 'x-timestamp': '12345', 'referer': 'PUT http://localhost/sda1/p/a/c/o', 'user-agent': 'obj-server %d' % os.getpid(), + POLICY_INDEX: 0, # system account storage policy is 0 'x-trans-id': '-'})}) @patch_policies([storage_policy.StoragePolicy(0, 'zero', True), @@ -2259,13 +2277,15 @@ class TestObjectController(unittest.TestCase): dict((k, v) for k, v in captured_args.iteritems() if v is not None)) + return SuccessfulFakeConn() + req = Request.blank( '/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT'}, headers={'X-Timestamp': '12345', 'Content-Type': 'application/burrito', 'Content-Length': '0', - 'X-Storage-Policy-Index': '26', + POLICY_INDEX: '26', 'X-Container-Partition': '20', 'X-Container-Host': '1.2.3.4:5, 6.7.8.9:10', 'X-Container-Device': 'sdb1, sdf1'}) @@ -2291,6 +2311,7 @@ class TestObjectController(unittest.TestCase): 'x-etag': 'd41d8cd98f00b204e9800998ecf8427e', 'x-size': '0', 'x-timestamp': '12345', + POLICY_INDEX: '26', 'referer': 'PUT http://localhost/sda1/p/a/c/o', 'user-agent': 'obj-server %d' % os.getpid(), 'x-trans-id': '-'})}) @@ -2308,11 +2329,94 @@ class TestObjectController(unittest.TestCase): 'x-etag': 'd41d8cd98f00b204e9800998ecf8427e', 'x-size': '0', 'x-timestamp': '12345', + POLICY_INDEX: '26', 'referer': 'PUT http://localhost/sda1/p/a/c/o', 'user-agent': 'obj-server %d' % os.getpid(), 'x-trans-id': '-'})}) + def test_object_delete_at_aysnc_update(self): + policy = random.choice(list(POLICIES)) + ts = (normalize_timestamp(t) for t in + itertools.count(int(time()))) + + container_updates = [] + + def capture_updates(ip, port, method, path, headers, *args, **kwargs): + container_updates.append((ip, port, method, path, headers)) + + put_timestamp = ts.next() + delete_at_timestamp = utils.normalize_delete_at_timestamp(ts.next()) + delete_at_container = ( + int(delete_at_timestamp) / + self.object_controller.expiring_objects_container_divisor * + self.object_controller.expiring_objects_container_divisor) + req = Request.blank('/sda1/p/a/c/o', method='PUT', body='', headers={ + 'Content-Type': 'text/plain', + 'X-Timestamp': put_timestamp, + 'X-Container-Host': '10.0.0.1:6001', + 'X-Container-Device': 'sda1', + 'X-Container-Partition': 'p', + 'X-Delete-At': delete_at_timestamp, + 'X-Delete-At-Container': delete_at_container, + 'X-Delete-At-Partition': 'p', + 'X-Delete-At-Host': '10.0.0.2:6002', + 'X-Delete-At-Device': 'sda1', + POLICY_INDEX: int(policy), + }) + with mocked_http_conn( + 500, 500, give_connect=capture_updates) as fake_conn: + resp = req.get_response(self.object_controller) + self.assertRaises(StopIteration, fake_conn.code_iter.next) + self.assertEqual(resp.status_int, 201) + self.assertEquals(2, len(container_updates)) + delete_at_update, container_update = container_updates + # delete_at_update + ip, port, method, path, headers = delete_at_update + self.assertEqual(ip, '10.0.0.2') + self.assertEqual(port, '6002') + self.assertEqual(method, 'PUT') + self.assertEqual(path, '/sda1/p/.expiring_objects/%s/%s-a/c/o' % + (delete_at_container, delete_at_timestamp)) + expected = { + 'X-Timestamp': put_timestamp, + POLICY_INDEX: 0, # system account is always 0 + } + for key, value in expected.items(): + self.assertEqual(headers[key], str(value)) + # container_update + ip, port, method, path, headers = container_update + self.assertEqual(ip, '10.0.0.1') + self.assertEqual(port, '6001') + self.assertEqual(method, 'PUT') + self.assertEqual(path, '/sda1/p/a/c/o') + expected = { + 'X-Timestamp': put_timestamp, + POLICY_INDEX: int(policy), + } + for key, value in expected.items(): + self.assertEqual(headers[key], str(value)) + # check async pendings + async_dir = os.path.join(self.testdir, 'sda1', + diskfile.get_async_dir(policy.idx)) + found_files = [] + for root, dirs, files in os.walk(async_dir): + for f in files: + async_file = os.path.join(root, f) + found_files.append(async_file) + data = pickle.load(open(async_file)) + if data['account'] == 'a': + self.assertEquals( + int(data['headers'][POLICY_INDEX]), policy.idx) + elif data['account'] == '.expiring_objects': + self.assertEquals( + int(data['headers'][POLICY_INDEX]), 0) + else: + self.fail('unexpected async pending data') + self.assertEqual(2, len(found_files)) + def test_async_update_saves_on_exception(self): + policy = random.choice(list(POLICIES)) + self._stage_tmp_dir(policy) _prefix = utils.HASH_PATH_PREFIX utils.HASH_PATH_PREFIX = '' @@ -2324,19 +2428,24 @@ class TestObjectController(unittest.TestCase): object_server.http_connect = fake_http_connect self.object_controller.async_update( 'PUT', 'a', 'c', 'o', '127.0.0.1:1234', 1, 'sdc1', - {'x-timestamp': '1', 'x-out': 'set'}, 'sda1') + {'x-timestamp': '1', 'x-out': 'set', + POLICY_INDEX: policy.idx}, 'sda1', policy.idx) finally: object_server.http_connect = orig_http_connect utils.HASH_PATH_PREFIX = _prefix + async_dir = diskfile.get_async_dir(policy.idx) self.assertEquals( pickle.load(open(os.path.join( - self.testdir, 'sda1', 'async_pending', 'a83', + self.testdir, 'sda1', async_dir, 'a83', '06fbf0b514e5199dfc4e00f42eb5ea83-0000000001.00000'))), {'headers': {'x-timestamp': '1', 'x-out': 'set', - 'user-agent': 'obj-server %s' % os.getpid()}, + 'user-agent': 'obj-server %s' % os.getpid(), + POLICY_INDEX: policy.idx}, 'account': 'a', 'container': 'c', 'obj': 'o', 'op': 'PUT'}) def test_async_update_saves_on_non_2xx(self): + policy = random.choice(list(POLICIES)) + self._stage_tmp_dir(policy) _prefix = utils.HASH_PATH_PREFIX utils.HASH_PATH_PREFIX = '' @@ -2361,13 +2470,16 @@ class TestObjectController(unittest.TestCase): object_server.http_connect = fake_http_connect(status) self.object_controller.async_update( 'PUT', 'a', 'c', 'o', '127.0.0.1:1234', 1, 'sdc1', - {'x-timestamp': '1', 'x-out': str(status)}, 'sda1') + {'x-timestamp': '1', 'x-out': str(status), + POLICY_INDEX: policy.idx}, 'sda1', policy.idx) + async_dir = diskfile.get_async_dir(policy.idx) self.assertEquals( pickle.load(open(os.path.join( - self.testdir, 'sda1', 'async_pending', 'a83', + self.testdir, 'sda1', async_dir, 'a83', '06fbf0b514e5199dfc4e00f42eb5ea83-0000000001.00000'))), {'headers': {'x-timestamp': '1', 'x-out': str(status), - 'user-agent': 'obj-server %s' % os.getpid()}, + 'user-agent': 'obj-server %s' % os.getpid(), + POLICY_INDEX: policy.idx}, 'account': 'a', 'container': 'c', 'obj': 'o', 'op': 'PUT'}) finally: @@ -2399,7 +2511,7 @@ class TestObjectController(unittest.TestCase): object_server.http_connect = fake_http_connect(status) self.object_controller.async_update( 'PUT', 'a', 'c', 'o', '127.0.0.1:1234', 1, 'sdc1', - {'x-timestamp': '1', 'x-out': str(status)}, 'sda1') + {'x-timestamp': '1', 'x-out': str(status)}, 'sda1', 0) self.assertFalse( os.path.exists(os.path.join( self.testdir, 'sda1', 'async_pending', 'a83', @@ -2409,6 +2521,8 @@ class TestObjectController(unittest.TestCase): utils.HASH_PATH_PREFIX = _prefix def test_async_update_saves_on_timeout(self): + policy = random.choice(list(POLICIES)) + self._stage_tmp_dir(policy) _prefix = utils.HASH_PATH_PREFIX utils.HASH_PATH_PREFIX = '' @@ -2428,10 +2542,12 @@ class TestObjectController(unittest.TestCase): self.object_controller.node_timeout = 0.001 self.object_controller.async_update( 'PUT', 'a', 'c', 'o', '127.0.0.1:1234', 1, 'sdc1', - {'x-timestamp': '1', 'x-out': str(status)}, 'sda1') + {'x-timestamp': '1', 'x-out': str(status)}, 'sda1', + policy.idx) + async_dir = diskfile.get_async_dir(int(policy)) self.assertTrue( os.path.exists(os.path.join( - self.testdir, 'sda1', 'async_pending', 'a83', + self.testdir, 'sda1', async_dir, 'a83', '06fbf0b514e5199dfc4e00f42eb5ea83-0000000001.00000'))) finally: object_server.http_connect = orig_http_connect @@ -2453,39 +2569,88 @@ class TestObjectController(unittest.TestCase): 'PUT', 'a', 'c', 'o', req, { 'x-size': '0', 'x-etag': 'd41d8cd98f00b204e9800998ecf8427e', 'x-content-type': 'text/plain', 'x-timestamp': '1'}, - 'sda1') + 'sda1', 0) self.assertEquals(given_args, []) - def test_container_update(self): - given_args = [] + def test_container_update_success(self): + container_updates = [] - def fake_async_update(*args): - given_args.extend(args) + def capture_updates(ip, port, method, path, headers, *args, **kwargs): + container_updates.append((ip, port, method, path, headers)) - self.object_controller.async_update = fake_async_update req = Request.blank( - '/v1/a/c/o', + '/sda1/0/a/c/o', environ={'REQUEST_METHOD': 'PUT'}, headers={'X-Timestamp': 1, 'X-Trans-Id': '123', - 'X-Container-Host': 'chost', + 'X-Container-Host': 'chost:cport', 'X-Container-Partition': 'cpartition', - 'X-Container-Device': 'cdevice'}) - self.object_controller.container_update( - 'PUT', 'a', 'c', 'o', req, { - 'x-size': '0', 'x-etag': 'd41d8cd98f00b204e9800998ecf8427e', - 'x-content-type': 'text/plain', 'x-timestamp': '1'}, - 'sda1') - self.assertEquals( - given_args, [ - 'PUT', 'a', 'c', 'o', 'chost', 'cpartition', 'cdevice', { - 'x-size': '0', - 'x-etag': 'd41d8cd98f00b204e9800998ecf8427e', - 'x-content-type': 'text/plain', - 'x-timestamp': '1', - 'x-trans-id': '123', - 'referer': 'PUT http://localhost/v1/a/c/o'}, - 'sda1']) + 'X-Container-Device': 'cdevice', + 'Content-Type': 'text/plain'}, body='') + with mocked_http_conn( + 200, give_connect=capture_updates) as fake_conn: + resp = req.get_response(self.object_controller) + self.assertRaises(StopIteration, fake_conn.code_iter.next) + self.assertEqual(resp.status_int, 201) + self.assertEqual(len(container_updates), 1) + ip, port, method, path, headers = container_updates[0] + self.assertEqual(ip, 'chost') + self.assertEqual(port, 'cport') + self.assertEqual(method, 'PUT') + self.assertEqual(path, '/cdevice/cpartition/a/c/o') + self.assertEqual(headers, HeaderKeyDict({ + 'user-agent': 'obj-server %s' % os.getpid(), + 'x-size': '0', + 'x-etag': 'd41d8cd98f00b204e9800998ecf8427e', + 'x-content-type': 'text/plain', + 'x-timestamp': '1', + POLICY_INDEX: '0', # default when not given + 'x-trans-id': '123', + 'referer': 'PUT http://localhost/sda1/0/a/c/o'})) + + def test_container_update_async(self): + req = Request.blank( + '/sda1/0/a/c/o', + environ={'REQUEST_METHOD': 'PUT'}, + headers={'X-Timestamp': 1, + 'X-Trans-Id': '123', + 'X-Container-Host': 'chost:cport', + 'X-Container-Partition': 'cpartition', + 'X-Container-Device': 'cdevice', + 'Content-Type': 'text/plain'}, body='') + given_args = [] + + def fake_pickle_async_update(*args): + given_args[:] = args + self.object_controller._diskfile_mgr.pickle_async_update = \ + fake_pickle_async_update + with mocked_http_conn(500) as fake_conn: + resp = req.get_response(self.object_controller) + self.assertRaises(StopIteration, fake_conn.code_iter.next) + self.assertEqual(resp.status_int, 201) + self.assertEqual(len(given_args), 7) + (objdevice, account, container, obj, data, timestamp, + policy_index) = given_args + self.assertEqual(objdevice, 'sda1') + self.assertEqual(account, 'a') + self.assertEqual(container, 'c') + self.assertEqual(obj, 'o') + self.assertEqual(timestamp, '1') + self.assertEqual(policy_index, 0) + self.assertEqual(data, { + 'headers': HeaderKeyDict({ + 'X-Size': '0', + 'User-Agent': 'obj-server %s' % os.getpid(), + 'X-Content-Type': 'text/plain', + 'X-Timestamp': '1', + 'X-Trans-Id': '123', + 'Referer': 'PUT http://localhost/sda1/0/a/c/o', + 'X-Backend-Storage-Policy-Index': '0', + 'X-Etag': 'd41d8cd98f00b204e9800998ecf8427e'}), + 'obj': 'o', + 'account': 'a', + 'container': 'c', + 'op': 'PUT'}) def test_container_update_bad_args(self): given_args = [] @@ -2493,7 +2658,6 @@ class TestObjectController(unittest.TestCase): def fake_async_update(*args): given_args.extend(args) - self.object_controller.async_update = fake_async_update req = Request.blank( '/v1/a/c/o', environ={'REQUEST_METHOD': 'PUT'}, @@ -2502,12 +2666,22 @@ class TestObjectController(unittest.TestCase): 'X-Container-Host': 'chost,badhost', 'X-Container-Partition': 'cpartition', 'X-Container-Device': 'cdevice'}) - self.object_controller.container_update( - 'PUT', 'a', 'c', 'o', req, { - 'x-size': '0', 'x-etag': 'd41d8cd98f00b204e9800998ecf8427e', - 'x-content-type': 'text/plain', 'x-timestamp': '1'}, - 'sda1') + with mock.patch.object(self.object_controller, 'async_update', + fake_async_update): + self.object_controller.container_update( + 'PUT', 'a', 'c', 'o', req, { + 'x-size': '0', + 'x-etag': 'd41d8cd98f00b204e9800998ecf8427e', + 'x-content-type': 'text/plain', 'x-timestamp': '1'}, + 'sda1', 0) self.assertEqual(given_args, []) + errors = self.object_controller.logger.get_lines_for_level('error') + self.assertEqual(len(errors), 1) + msg = errors[0] + self.assertTrue('Container update failed' in msg) + self.assertTrue('different numbers of hosts and devices' in msg) + self.assertTrue('chost,badhost' in msg) + self.assertTrue('cdevice' in msg) def test_delete_at_update_on_put(self): # Test how delete_at_update works when issued a delete for old @@ -2517,23 +2691,25 @@ class TestObjectController(unittest.TestCase): def fake_async_update(*args): given_args.extend(args) - self.object_controller.async_update = fake_async_update req = Request.blank( '/v1/a/c/o', environ={'REQUEST_METHOD': 'PUT'}, headers={'X-Timestamp': 1, 'X-Trans-Id': '123'}) - self.object_controller.delete_at_update( - 'DELETE', 2, 'a', 'c', 'o', req, 'sda1') + with mock.patch.object(self.object_controller, 'async_update', + fake_async_update): + self.object_controller.delete_at_update( + 'DELETE', 2, 'a', 'c', 'o', req, 'sda1', 0) self.assertEquals( given_args, [ 'DELETE', '.expiring_objects', '0000000000', '0000000002-a/c/o', None, None, None, HeaderKeyDict({ + POLICY_INDEX: 0, 'x-timestamp': '1', 'x-trans-id': '123', 'referer': 'PUT http://localhost/v1/a/c/o'}), - 'sda1']) + 'sda1', 0]) def test_delete_at_negative(self): # Test how delete_at_update works when issued a delete for old @@ -2551,15 +2727,16 @@ class TestObjectController(unittest.TestCase): headers={'X-Timestamp': 1, 'X-Trans-Id': '1234'}) self.object_controller.delete_at_update( - 'DELETE', -2, 'a', 'c', 'o', req, 'sda1') + 'DELETE', -2, 'a', 'c', 'o', req, 'sda1', 0) self.assertEquals(given_args, [ 'DELETE', '.expiring_objects', '0000000000', '0000000000-a/c/o', None, None, None, HeaderKeyDict({ + POLICY_INDEX: 0, 'x-timestamp': '1', 'x-trans-id': '1234', 'referer': 'PUT http://localhost/v1/a/c/o'}), - 'sda1']) + 'sda1', 0]) def test_delete_at_cap(self): # Test how delete_at_update works when issued a delete for old @@ -2577,15 +2754,16 @@ class TestObjectController(unittest.TestCase): headers={'X-Timestamp': 1, 'X-Trans-Id': '1234'}) self.object_controller.delete_at_update( - 'DELETE', 12345678901, 'a', 'c', 'o', req, 'sda1') + 'DELETE', 12345678901, 'a', 'c', 'o', req, 'sda1', 0) self.assertEquals(given_args, [ 'DELETE', '.expiring_objects', '9999936000', '9999999999-a/c/o', None, None, None, HeaderKeyDict({ + POLICY_INDEX: 0, 'x-timestamp': '1', 'x-trans-id': '1234', 'referer': 'PUT http://localhost/v1/a/c/o'}), - 'sda1']) + 'sda1', 0]) def test_delete_at_update_put_with_info(self): # Keep next test, @@ -2607,19 +2785,20 @@ class TestObjectController(unittest.TestCase): 'X-Delete-At-Partition': '3', 'X-Delete-At-Device': 'sdc1'}) self.object_controller.delete_at_update('PUT', 2, 'a', 'c', 'o', - req, 'sda1') + req, 'sda1', 0) self.assertEquals( given_args, [ 'PUT', '.expiring_objects', '0000000000', '0000000002-a/c/o', '127.0.0.1:1234', '3', 'sdc1', HeaderKeyDict({ + POLICY_INDEX: 0, 'x-size': '0', 'x-etag': 'd41d8cd98f00b204e9800998ecf8427e', 'x-content-type': 'text/plain', 'x-timestamp': '1', 'x-trans-id': '1234', 'referer': 'PUT http://localhost/v1/a/c/o'}), - 'sda1']) + 'sda1', 0]) def test_delete_at_update_put_with_info_but_missing_container(self): # Same as previous test, test_delete_at_update_put_with_info, but just @@ -2640,7 +2819,7 @@ class TestObjectController(unittest.TestCase): 'X-Delete-At-Partition': '3', 'X-Delete-At-Device': 'sdc1'}) self.object_controller.delete_at_update('PUT', 2, 'a', 'c', 'o', - req, 'sda1') + req, 'sda1', 0) self.assertEquals( self.object_controller.logger.log_dict['warning'], [(('X-Delete-At-Container header must be specified for expiring ' @@ -2660,15 +2839,16 @@ class TestObjectController(unittest.TestCase): headers={'X-Timestamp': 1, 'X-Trans-Id': '1234'}) self.object_controller.delete_at_update('DELETE', 2, 'a', 'c', 'o', - req, 'sda1') + req, 'sda1', 0) self.assertEquals( given_args, [ 'DELETE', '.expiring_objects', '0000000000', '0000000002-a/c/o', None, None, None, HeaderKeyDict({ + POLICY_INDEX: 0, 'x-timestamp': '1', 'x-trans-id': '1234', 'referer': 'DELETE http://localhost/v1/a/c/o'}), - 'sda1']) + 'sda1', 0]) def test_delete_backend_replication(self): # If X-Backend-Replication: True delete_at_update should completely @@ -2686,7 +2866,7 @@ class TestObjectController(unittest.TestCase): 'X-Trans-Id': '1234', 'X-Backend-Replication': 'True'}) self.object_controller.delete_at_update( - 'DELETE', -2, 'a', 'c', 'o', req, 'sda1') + 'DELETE', -2, 'a', 'c', 'o', req, 'sda1', 0) self.assertEquals(given_args, []) def test_POST_calls_delete_at(self): @@ -2731,7 +2911,7 @@ class TestObjectController(unittest.TestCase): self.assertEquals( given_args, [ 'PUT', int(delete_at_timestamp1), 'a', 'c', 'o', - given_args[5], 'sda1']) + given_args[5], 'sda1', 0]) while given_args: given_args.pop() @@ -2750,9 +2930,9 @@ class TestObjectController(unittest.TestCase): self.assertEquals( given_args, [ 'PUT', int(delete_at_timestamp2), 'a', 'c', 'o', - given_args[5], 'sda1', + given_args[5], 'sda1', 0, 'DELETE', int(delete_at_timestamp1), 'a', 'c', 'o', - given_args[5], 'sda1']) + given_args[5], 'sda1', 0]) def test_PUT_calls_delete_at(self): given_args = [] @@ -2787,7 +2967,7 @@ class TestObjectController(unittest.TestCase): self.assertEquals( given_args, [ 'PUT', int(delete_at_timestamp1), 'a', 'c', 'o', - given_args[5], 'sda1']) + given_args[5], 'sda1', 0]) while given_args: given_args.pop() @@ -2808,9 +2988,9 @@ class TestObjectController(unittest.TestCase): self.assertEquals( given_args, [ 'PUT', int(delete_at_timestamp2), 'a', 'c', 'o', - given_args[5], 'sda1', + given_args[5], 'sda1', 0, 'DELETE', int(delete_at_timestamp1), 'a', 'c', 'o', - given_args[5], 'sda1']) + given_args[5], 'sda1', 0]) def test_GET_but_expired(self): test_time = time() + 10000 @@ -3197,7 +3377,7 @@ class TestObjectController(unittest.TestCase): self.assertEquals(resp.status_int, 201) self.assertEquals(given_args, [ 'PUT', int(delete_at_timestamp1), 'a', 'c', 'o', - given_args[5], 'sda1']) + given_args[5], 'sda1', 0]) while given_args: given_args.pop() @@ -3213,7 +3393,7 @@ class TestObjectController(unittest.TestCase): self.assertEquals(resp.status_int, 204) self.assertEquals(given_args, [ 'DELETE', int(delete_at_timestamp1), 'a', 'c', 'o', - given_args[5], 'sda1']) + given_args[5], 'sda1', 0]) def test_PUT_delete_at_in_past(self): req = Request.blank( diff --git a/test/unit/obj/test_updater.py b/test/unit/obj/test_updater.py index 2aeff19f0f..12e3cf803f 100644 --- a/test/unit/obj/test_updater.py +++ b/test/unit/obj/test_updater.py @@ -17,6 +17,8 @@ import cPickle as pickle import mock import os import unittest +import random +import itertools from contextlib import closing from gzip import GzipFile from tempfile import mkdtemp @@ -27,14 +29,22 @@ from distutils.dir_util import mkpath from eventlet import spawn, Timeout, listen from swift.obj import updater as object_updater -from swift.obj.diskfile import ASYNCDIR_BASE +from swift.obj.diskfile import (ASYNCDIR_BASE, get_async_dir, DiskFileManager, + get_tmp_dir) from swift.common.ring import RingData from swift.common import utils from swift.common.utils import hash_path, normalize_timestamp, mkdirs, \ write_pickle -from test.unit import FakeLogger +from swift.common import swob +from test.unit import debug_logger, patch_policies, mocked_http_conn +from swift.common.storage_policy import StoragePolicy, POLICIES, POLICY_INDEX +_mocked_policies = [StoragePolicy(0, 'zero', False), + StoragePolicy(1, 'one', True)] + + +@patch_policies(_mocked_policies) class TestObjectUpdater(unittest.TestCase): def setUp(self): @@ -58,7 +68,9 @@ class TestObjectUpdater(unittest.TestCase): os.mkdir(self.devices_dir) self.sda1 = os.path.join(self.devices_dir, 'sda1') os.mkdir(self.sda1) - os.mkdir(os.path.join(self.sda1, 'tmp')) + for policy in POLICIES: + os.mkdir(os.path.join(self.sda1, get_tmp_dir(int(policy)))) + self.logger = debug_logger() def tearDown(self): rmtree(self.testdir, ignore_errors=1) @@ -80,48 +92,84 @@ class TestObjectUpdater(unittest.TestCase): self.assert_(cu.get_container_ring() is not None) def test_object_sweep(self): - prefix_dir = os.path.join(self.sda1, ASYNCDIR_BASE, 'abc') - mkpath(prefix_dir) + def check_with_idx(index, warn, should_skip): + if int(index) > 0: + asyncdir = os.path.join(self.sda1, + ASYNCDIR_BASE + "-" + index) + else: + asyncdir = os.path.join(self.sda1, ASYNCDIR_BASE) - # A non-directory where directory is expected should just be skipped... - not_a_dir_path = os.path.join(self.sda1, ASYNCDIR_BASE, 'not_a_dir') - with open(not_a_dir_path, 'w'): - pass + prefix_dir = os.path.join(asyncdir, 'abc') + mkpath(prefix_dir) - objects = { - 'a': [1089.3, 18.37, 12.83, 1.3], - 'b': [49.4, 49.3, 49.2, 49.1], - 'c': [109984.123], - } + # A non-directory where directory is expected should just be + # skipped, but should not stop processing of subsequent + # directories. + not_dirs = ( + os.path.join(self.sda1, 'not_a_dir'), + os.path.join(self.sda1, + ASYNCDIR_BASE + '-' + 'twentington'), + os.path.join(self.sda1, + ASYNCDIR_BASE + '-' + str(int(index) + 100))) - expected = set() - for o, timestamps in objects.iteritems(): - ohash = hash_path('account', 'container', o) - for t in timestamps: - o_path = os.path.join(prefix_dir, ohash + '-' + - normalize_timestamp(t)) - if t == timestamps[0]: - expected.add(o_path) - write_pickle({}, o_path) + for not_dir in not_dirs: + with open(not_dir, 'w'): + pass - seen = set() + objects = { + 'a': [1089.3, 18.37, 12.83, 1.3], + 'b': [49.4, 49.3, 49.2, 49.1], + 'c': [109984.123], + } - class MockObjectUpdater(object_updater.ObjectUpdater): - def process_object_update(self, update_path, device): - seen.add(update_path) - os.unlink(update_path) + expected = set() + for o, timestamps in objects.iteritems(): + ohash = hash_path('account', 'container', o) + for t in timestamps: + o_path = os.path.join(prefix_dir, ohash + '-' + + normalize_timestamp(t)) + if t == timestamps[0]: + expected.add((o_path, int(index))) + write_pickle({}, o_path) - cu = MockObjectUpdater({ - 'devices': self.devices_dir, - 'mount_check': 'false', - 'swift_dir': self.testdir, - 'interval': '1', - 'concurrency': '1', - 'node_timeout': '5'}) - cu.object_sweep(self.sda1) - self.assert_(not os.path.exists(prefix_dir)) - self.assert_(os.path.exists(not_a_dir_path)) - self.assertEqual(expected, seen) + seen = set() + + class MockObjectUpdater(object_updater.ObjectUpdater): + def process_object_update(self, update_path, device, idx): + seen.add((update_path, idx)) + os.unlink(update_path) + + cu = MockObjectUpdater({ + 'devices': self.devices_dir, + 'mount_check': 'false', + 'swift_dir': self.testdir, + 'interval': '1', + 'concurrency': '1', + 'node_timeout': '5'}) + cu.logger = mock_logger = mock.MagicMock() + cu.object_sweep(self.sda1) + self.assertEquals(mock_logger.warn.call_count, warn) + self.assert_(os.path.exists(os.path.join(self.sda1, 'not_a_dir'))) + if should_skip: + # if we were supposed to skip over the dir, we didn't process + # anything at all + self.assertTrue(os.path.exists(prefix_dir)) + self.assertEqual(set(), seen) + else: + self.assert_(not os.path.exists(prefix_dir)) + self.assertEqual(expected, seen) + + # test cleanup: the tempdir gets cleaned up between runs, but this + # way we can be called multiple times in a single test method + for not_dir in not_dirs: + os.unlink(not_dir) + + # first check with valid policies + for pol in POLICIES: + check_with_idx(str(pol.idx), 0, should_skip=False) + # now check with a bogus async dir policy and make sure we get + # a warning indicating that the '99' policy isn't valid + check_with_idx('99', 1, should_skip=True) @mock.patch.object(object_updater, 'ismount') def test_run_once_with_disk_unmounted(self, mock_ismount): @@ -134,7 +182,7 @@ class TestObjectUpdater(unittest.TestCase): 'concurrency': '1', 'node_timeout': '15'}) cu.run_once() - async_dir = os.path.join(self.sda1, ASYNCDIR_BASE) + async_dir = os.path.join(self.sda1, get_async_dir(0)) os.mkdir(async_dir) cu.run_once() self.assert_(os.path.exists(async_dir)) @@ -147,13 +195,13 @@ class TestObjectUpdater(unittest.TestCase): 'swift_dir': self.testdir, 'interval': '1', 'concurrency': '1', - 'node_timeout': '15'}) - odd_dir = os.path.join(async_dir, 'not really supposed to be here') + 'node_timeout': '15'}, logger=self.logger) + odd_dir = os.path.join(async_dir, 'not really supposed ' + 'to be here') os.mkdir(odd_dir) - cu.logger = FakeLogger() cu.run_once() self.assert_(os.path.exists(async_dir)) - self.assert_(os.path.exists(odd_dir)) # skipped because not mounted! + self.assert_(os.path.exists(odd_dir)) # skipped - not mounted! # mount_check == True means ismount was checked self.assertEqual([ mock.call(self.sda1), @@ -169,9 +217,9 @@ class TestObjectUpdater(unittest.TestCase): 'swift_dir': self.testdir, 'interval': '1', 'concurrency': '1', - 'node_timeout': '15'}) + 'node_timeout': '15'}, logger=self.logger) cu.run_once() - async_dir = os.path.join(self.sda1, ASYNCDIR_BASE) + async_dir = os.path.join(self.sda1, get_async_dir(0)) os.mkdir(async_dir) cu.run_once() self.assert_(os.path.exists(async_dir)) @@ -184,8 +232,9 @@ class TestObjectUpdater(unittest.TestCase): 'swift_dir': self.testdir, 'interval': '1', 'concurrency': '1', - 'node_timeout': '15'}) - odd_dir = os.path.join(async_dir, 'not really supposed to be here') + 'node_timeout': '15'}, logger=self.logger) + odd_dir = os.path.join(async_dir, 'not really supposed ' + 'to be here') os.mkdir(odd_dir) cu.run_once() self.assert_(os.path.exists(async_dir)) @@ -206,12 +255,12 @@ class TestObjectUpdater(unittest.TestCase): '%s-%s' % (ohash, normalize_timestamp(time()))) for path in (op_path, older_op_path): with open(path, 'wb') as async_pending: - pickle.dump({'op': 'PUT', 'account': 'a', 'container': 'c', + pickle.dump({'op': 'PUT', 'account': 'a', + 'container': 'c', 'obj': 'o', 'headers': { 'X-Container-Timestamp': normalize_timestamp(0)}}, async_pending) - cu.logger = FakeLogger() cu.run_once() self.assert_(not os.path.exists(older_op_path)) self.assert_(os.path.exists(op_path)) @@ -232,13 +281,14 @@ class TestObjectUpdater(unittest.TestCase): out.flush() self.assertEquals(inc.readline(), 'PUT /sda1/0/a/c/o HTTP/1.1\r\n') - headers = {} + headers = swob.HeaderKeyDict() line = inc.readline() while line and line != '\r\n': - headers[line.split(':')[0].lower()] = \ + headers[line.split(':')[0]] = \ line.split(':')[1].strip() line = inc.readline() - self.assert_('x-container-timestamp' in headers) + self.assertTrue('x-container-timestamp' in headers) + self.assertTrue(POLICY_INDEX in headers) except BaseException as err: return err return None @@ -265,7 +315,7 @@ class TestObjectUpdater(unittest.TestCase): if dev is not None: dev['port'] = bindsock.getsockname()[1] - cu.logger = FakeLogger() + cu.logger._clear() cu.run_once() err = event.wait() if err: @@ -277,7 +327,7 @@ class TestObjectUpdater(unittest.TestCase): pickle.load(open(op_path)).get('successes')) event = spawn(accept, [404, 500]) - cu.logger = FakeLogger() + cu.logger._clear() cu.run_once() err = event.wait() if err: @@ -289,7 +339,7 @@ class TestObjectUpdater(unittest.TestCase): pickle.load(open(op_path)).get('successes')) event = spawn(accept, [201]) - cu.logger = FakeLogger() + cu.logger._clear() cu.run_once() err = event.wait() if err: @@ -298,6 +348,106 @@ class TestObjectUpdater(unittest.TestCase): self.assertEqual(cu.logger.get_increment_counts(), {'unlinks': 1, 'successes': 1}) + def test_obj_put_legacy_updates(self): + ts = (normalize_timestamp(t) for t in + itertools.count(int(time()))) + policy = POLICIES.get_by_index(0) + # setup updater + conf = { + 'devices': self.devices_dir, + 'mount_check': 'false', + 'swift_dir': self.testdir, + } + async_dir = os.path.join(self.sda1, get_async_dir(policy.idx)) + os.mkdir(async_dir) + + account, container, obj = 'a', 'c', 'o' + # write an async + for op in ('PUT', 'DELETE'): + self.logger._clear() + daemon = object_updater.ObjectUpdater(conf, logger=self.logger) + dfmanager = DiskFileManager(conf, daemon.logger) + # don't include storage-policy-index in headers_out pickle + headers_out = swob.HeaderKeyDict({ + 'x-size': 0, + 'x-content-type': 'text/plain', + 'x-etag': 'd41d8cd98f00b204e9800998ecf8427e', + 'x-timestamp': ts.next(), + }) + data = {'op': op, 'account': account, 'container': container, + 'obj': obj, 'headers': headers_out} + dfmanager.pickle_async_update(self.sda1, account, container, obj, + data, ts.next(), policy.idx) + + request_log = [] + + def capture(*args, **kwargs): + request_log.append((args, kwargs)) + + # run once + fake_status_codes = [200, 200, 200] + with mocked_http_conn(*fake_status_codes, give_connect=capture): + daemon.run_once() + self.assertEqual(len(fake_status_codes), len(request_log)) + for request_args, request_kwargs in request_log: + ip, part, method, path, headers, qs, ssl = request_args + self.assertEqual(method, op) + self.assertEqual(headers[POLICY_INDEX], str(policy.idx)) + self.assertEqual(daemon.logger.get_increment_counts(), + {'successes': 1, 'unlinks': 1, + 'async_pendings': 1}) + + def test_obj_put_async_updates(self): + ts = (normalize_timestamp(t) for t in + itertools.count(int(time()))) + policy = random.choice(list(POLICIES)) + # setup updater + conf = { + 'devices': self.devices_dir, + 'mount_check': 'false', + 'swift_dir': self.testdir, + } + daemon = object_updater.ObjectUpdater(conf, logger=self.logger) + async_dir = os.path.join(self.sda1, get_async_dir(policy.idx)) + os.mkdir(async_dir) + + # write an async + dfmanager = DiskFileManager(conf, daemon.logger) + account, container, obj = 'a', 'c', 'o' + op = 'PUT' + headers_out = swob.HeaderKeyDict({ + 'x-size': 0, + 'x-content-type': 'text/plain', + 'x-etag': 'd41d8cd98f00b204e9800998ecf8427e', + 'x-timestamp': ts.next(), + POLICY_INDEX: policy.idx, + }) + data = {'op': op, 'account': account, 'container': container, + 'obj': obj, 'headers': headers_out} + dfmanager.pickle_async_update(self.sda1, account, container, obj, + data, ts.next(), policy.idx) + + request_log = [] + + def capture(*args, **kwargs): + request_log.append((args, kwargs)) + + # run once + fake_status_codes = [ + 200, # object update success + 200, # object update success + 200, # object update conflict + ] + with mocked_http_conn(*fake_status_codes, give_connect=capture): + daemon.run_once() + self.assertEqual(len(fake_status_codes), len(request_log)) + for request_args, request_kwargs in request_log: + ip, part, method, path, headers, qs, ssl = request_args + self.assertEqual(method, 'PUT') + self.assertEqual(headers[POLICY_INDEX], str(policy.idx)) + self.assertEqual(daemon.logger.get_increment_counts(), + {'successes': 1, 'unlinks': 1, 'async_pendings': 1}) + if __name__ == '__main__': unittest.main()