From 3824ff3df71975194fa1b050255c0da8c1acc598 Mon Sep 17 00:00:00 2001 From: Clay Gerrard Date: Tue, 27 May 2014 01:17:13 -0700 Subject: [PATCH] Add Storage Policy support to Object Server Objects now have a storage policy index associated with them as well; this is determined by their filesystem path. Like before, objects in policy 0 are in /srv/node/$disk/objects; this provides compatibility on upgrade. (Recall that policy 0 is given to all existing data when a cluster is upgraded.) Objects in policy 1 are in /srv/node/$disk/objects-1, objects in policy 2 are in /srv/node/$disk/objects-2, and so on. * The 'quarantined' dir already contains an 'objects' subdir; objects-N subdirs are now created at the same level as well This commit does not address replicators, auditors, or updaters except where method signatures changed. They'll still work if your cluster has only one storage policy, though. DocImpact Implements: blueprint storage-policies Change-Id: I459f3ed97df516cb0c9294477c28729c30f48e09 --- swift/common/request_helpers.py | 25 +- swift/obj/diskfile.py | 74 +- swift/obj/server.py | 49 +- swift/obj/updater.py | 4 +- swift/proxy/controllers/base.py | 1 + swift/proxy/controllers/obj.py | 45 +- swift/proxy/server.py | 19 +- test/unit/common/test_wsgi.py | 11 +- test/unit/obj/test_auditor.py | 4 +- test/unit/obj/test_diskfile.py | 121 +++- test/unit/obj/test_server.py | 139 ++-- test/unit/obj/test_ssync_receiver.py | 6 +- test/unit/obj/test_updater.py | 10 +- test/unit/proxy/controllers/test_account.py | 11 +- test/unit/proxy/controllers/test_base.py | 7 +- test/unit/proxy/controllers/test_container.py | 3 +- test/unit/proxy/controllers/test_obj.py | 59 +- test/unit/proxy/test_server.py | 657 +++++++++++++++--- 18 files changed, 970 insertions(+), 275 deletions(-) diff --git a/swift/common/request_helpers.py b/swift/common/request_helpers.py index 98a239e69a..3e3db57080 100644 --- a/swift/common/request_helpers.py +++ b/swift/common/request_helpers.py @@ -30,6 +30,7 @@ from swift.common.exceptions import ListingIterError, SegmentError from swift.common.http import is_success, HTTP_SERVICE_UNAVAILABLE from swift.common.swob import HTTPBadRequest, HTTPNotAcceptable from swift.common.utils import split_path, validate_device_partition +from swift.common.storage_policy import POLICY_INDEX from swift.common.wsgi import make_subrequest @@ -78,12 +79,34 @@ def get_listing_content_type(req): return out_content_type +def get_name_and_placement(request, minsegs=1, maxsegs=None, + rest_with_last=False): + """ + Utility function to split and validate the request path and + storage_policy_index. The storage_policy_index is extracted from + the headers of the request and converted to an integer, and then the + args are passed through to :meth:`split_and_validate_path`. + + :returns: a list, result of :meth:`split_and_validate_path` with + storage_policy_index appended on the end + :raises: HTTPBadRequest + """ + policy_idx = request.headers.get(POLICY_INDEX, '0') + policy_idx = int(policy_idx) + results = split_and_validate_path(request, minsegs=minsegs, + maxsegs=maxsegs, + rest_with_last=rest_with_last) + results.append(policy_idx) + return results + + def split_and_validate_path(request, minsegs=1, maxsegs=None, rest_with_last=False): """ Utility function to split and validate the request path. 
- :returns: result of split_path if everything's okay + :returns: result of :meth:`~swift.common.utils.split_path` if + everything's okay :raises: HTTPBadRequest if something's not okay """ try: diff --git a/swift/obj/diskfile.py b/swift/obj/diskfile.py index 5e1e74fb2a..1ea3fe35c8 100644 --- a/swift/obj/diskfile.py +++ b/swift/obj/diskfile.py @@ -58,7 +58,8 @@ from swift.common.exceptions import DiskFileQuarantined, DiskFileNotExist, \ DiskFileDeleted, DiskFileError, DiskFileNotOpen, PathNotDir, \ ReplicationLockTimeout, DiskFileExpired from swift.common.swob import multi_range_iterator - +from swift.common.storage_policy import get_policy_string, POLICIES +from functools import partial PICKLE_PROTOCOL = 2 ONE_WEEK = 604800 @@ -67,8 +68,12 @@ METADATA_KEY = 'user.swift.metadata' # These are system-set metadata keys that cannot be changed with a POST. # They should be lowercase. DATAFILE_SYSTEM_META = set('content-length content-type deleted etag'.split()) -DATADIR = 'objects' -ASYNCDIR = 'async_pending' +DATADIR_BASE = 'objects' +ASYNCDIR_BASE = 'async_pending' +TMP_BASE = 'tmp' +get_data_dir = partial(get_policy_string, DATADIR_BASE) +get_async_dir = partial(get_policy_string, ASYNCDIR_BASE) +get_tmp_dir = partial(get_policy_string, TMP_BASE) def read_metadata(fd): @@ -105,6 +110,37 @@ def write_metadata(fd, metadata): key += 1 +def extract_policy_index(obj_path): + """ + Extracts the policy index for an object (based on the name of the objects + directory) given the device-relative path to the object. Returns 0 in + the event that the path is malformed in some way. + + The device-relative path is everything after the mount point; for example: + + /srv/node/d42/objects-5/179/ + 485dc017205a81df3af616d917c90179/1401811134.873649.data + + would have device-relative path: + + objects-5/179/485dc017205a81df3af616d917c90179/1401811134.873649.data + + :param obj_path: device-relative path of an object + :returns: storage policy index + """ + policy_idx = 0 + try: + obj_portion = obj_path[obj_path.index(DATADIR_BASE):] + obj_dirname = obj_portion[:obj_portion.index('/')] + except Exception: + return policy_idx + if '-' in obj_dirname: + base, policy_idx = obj_dirname.split('-', 1) + if POLICIES.get_by_index(policy_idx) is None: + policy_idx = 0 + return int(policy_idx) + + def quarantine_renamer(device_path, corrupted_file_path): """ In the case that a file is corrupted, move it to a quarantined @@ -118,7 +154,9 @@ def quarantine_renamer(device_path, corrupted_file_path): exceptions from rename """ from_dir = dirname(corrupted_file_path) - to_dir = join(device_path, 'quarantined', 'objects', basename(from_dir)) + to_dir = join(device_path, 'quarantined', + get_data_dir(extract_policy_index(corrupted_file_path)), + basename(from_dir)) invalidate_hash(dirname(from_dir)) try: renamer(from_dir, to_dir) @@ -385,7 +423,7 @@ def object_audit_location_generator(devices, mount_check=True, logger=None, logger.debug( _('Skipping %s as it is not mounted'), device) continue - datadir_path = os.path.join(devices, device, DATADIR) + datadir_path = os.path.join(devices, device, DATADIR_BASE) partitions = listdir(datadir_path) for partition in partitions: part_path = os.path.join(datadir_path, partition) @@ -495,7 +533,7 @@ class DiskFileManager(object): def pickle_async_update(self, device, account, container, obj, data, timestamp): device_path = self.construct_dev_path(device) - async_dir = os.path.join(device_path, ASYNCDIR) + async_dir = os.path.join(device_path, ASYNCDIR_BASE) ohash = hash_path(account, 
container, obj) self.threadpools[device].run_in_thread( write_pickle, @@ -506,12 +544,13 @@ class DiskFileManager(object): self.logger.increment('async_pendings') def get_diskfile(self, device, partition, account, container, obj, - **kwargs): + policy_idx=0, **kwargs): dev_path = self.get_dev_path(device) if not dev_path: raise DiskFileDeviceUnavailable() return DiskFile(self, dev_path, self.threadpools[device], - partition, account, container, obj, **kwargs) + partition, account, container, obj, + policy_idx=policy_idx, **kwargs) def object_audit_location_generator(self, device_dirs=None): return object_audit_location_generator(self.devices, self.mount_check, @@ -537,7 +576,7 @@ class DiskFileManager(object): if not dev_path: raise DiskFileDeviceUnavailable() object_path = os.path.join( - dev_path, DATADIR, partition, object_hash[-3:], object_hash) + dev_path, DATADIR_BASE, partition, object_hash[-3:], object_hash) try: filenames = hash_cleanup_listdir(object_path, self.reclaim_age) except OSError as err: @@ -569,7 +608,7 @@ class DiskFileManager(object): dev_path = self.get_dev_path(device) if not dev_path: raise DiskFileDeviceUnavailable() - partition_path = os.path.join(dev_path, DATADIR, partition) + partition_path = os.path.join(dev_path, DATADIR_BASE, partition) if not os.path.exists(partition_path): mkdirs(partition_path) suffixes = suffix.split('-') if suffix else [] @@ -595,7 +634,7 @@ class DiskFileManager(object): dev_path = self.get_dev_path(device) if not dev_path: raise DiskFileDeviceUnavailable() - partition_path = os.path.join(dev_path, DATADIR, partition) + partition_path = os.path.join(dev_path, DATADIR_BASE, partition) for suffix in self._listdir(partition_path): if len(suffix) != 3: continue @@ -620,7 +659,7 @@ class DiskFileManager(object): if suffixes is None: suffixes = self.yield_suffixes(device, partition) else: - partition_path = os.path.join(dev_path, DATADIR, partition) + partition_path = os.path.join(dev_path, DATADIR_BASE, partition) suffixes = ( (os.path.join(partition_path, suffix), suffix) for suffix in suffixes) @@ -937,10 +976,13 @@ class DiskFile(object): :param account: account name for the object :param container: container name for the object :param obj: object name for the object + :param _datadir: override the full datadir otherwise constructed here + :param policy_idx: used to get the data dir when constructing it here """ def __init__(self, mgr, device_path, threadpool, partition, - account=None, container=None, obj=None, _datadir=None): + account=None, container=None, obj=None, _datadir=None, + policy_idx=0): self._mgr = mgr self._device_path = device_path self._threadpool = threadpool or ThreadPool(nthreads=0) @@ -954,7 +996,8 @@ class DiskFile(object): self._obj = obj name_hash = hash_path(account, container, obj) self._datadir = join( - device_path, storage_directory(DATADIR, partition, name_hash)) + device_path, storage_directory(get_data_dir(policy_idx), + partition, name_hash)) else: # gets populated when we read the metadata self._name = None @@ -973,7 +1016,8 @@ class DiskFile(object): else: name_hash = hash_path(account, container, obj) self._datadir = join( - device_path, storage_directory(DATADIR, partition, name_hash)) + device_path, storage_directory(get_data_dir(policy_idx), + partition, name_hash)) @property def account(self): diff --git a/swift/obj/server.py b/swift/obj/server.py index 49d33caeb5..6fa70a40ae 100644 --- a/swift/obj/server.py +++ b/swift/obj/server.py @@ -38,7 +38,8 @@ from swift.common.exceptions import 
ConnectionTimeout, DiskFileQuarantined, \ DiskFileDeviceUnavailable, DiskFileExpired, ChunkReadTimeout from swift.obj import ssync_receiver from swift.common.http import is_success -from swift.common.request_helpers import split_and_validate_path, is_user_meta +from swift.common.request_helpers import get_name_and_placement, \ + is_user_meta, split_and_validate_path from swift.common.swob import HTTPAccepted, HTTPBadRequest, HTTPCreated, \ HTTPInternalServerError, HTTPNoContent, HTTPNotFound, \ HTTPPreconditionFailed, HTTPRequestTimeout, HTTPUnprocessableEntity, \ @@ -139,7 +140,7 @@ class ObjectController(object): conf.get('replication_failure_ratio') or 1.0) def get_diskfile(self, device, partition, account, container, obj, - **kwargs): + policy_idx, **kwargs): """ Utility method for instantiating a DiskFile object supporting a given REST API. @@ -149,7 +150,8 @@ class ObjectController(object): behavior. """ return self._diskfile_mgr.get_diskfile( - device, partition, account, container, obj, **kwargs) + device, partition, account, container, obj, + policy_idx=policy_idx, **kwargs) def async_update(self, op, account, container, obj, host, partition, contdevice, headers_out, objdevice): @@ -315,9 +317,8 @@ class ObjectController(object): @timing_stats() def POST(self, request): """Handle HTTP POST requests for the Swift Object Server.""" - device, partition, account, container, obj = \ - split_and_validate_path(request, 5, 5, True) - + device, partition, account, container, obj, policy_idx = \ + get_name_and_placement(request, 5, 5, True) if 'x-timestamp' not in request.headers or \ not check_float(request.headers['x-timestamp']): return HTTPBadRequest(body='Missing timestamp', request=request, @@ -328,7 +329,8 @@ class ObjectController(object): content_type='text/plain') try: disk_file = self.get_diskfile( - device, partition, account, container, obj) + device, partition, account, container, obj, + policy_idx=policy_idx) except DiskFileDeviceUnavailable: return HTTPInsufficientStorage(drive=device, request=request) try: @@ -360,9 +362,8 @@ class ObjectController(object): @timing_stats() def PUT(self, request): """Handle HTTP PUT requests for the Swift Object Server.""" - device, partition, account, container, obj = \ - split_and_validate_path(request, 5, 5, True) - + device, partition, account, container, obj, policy_idx = \ + get_name_and_placement(request, 5, 5, True) if 'x-timestamp' not in request.headers or \ not check_float(request.headers['x-timestamp']): return HTTPBadRequest(body='Missing timestamp', request=request, @@ -381,7 +382,8 @@ class ObjectController(object): content_type='text/plain') try: disk_file = self.get_diskfile( - device, partition, account, container, obj) + device, partition, account, container, obj, + policy_idx=policy_idx) except DiskFileDeviceUnavailable: return HTTPInsufficientStorage(drive=device, request=request) try: @@ -455,8 +457,8 @@ class ObjectController(object): if orig_delete_at != new_delete_at: if new_delete_at: self.delete_at_update( - 'PUT', new_delete_at, account, container, obj, - request, device) + 'PUT', new_delete_at, account, container, obj, request, + device) if orig_delete_at: self.delete_at_update( 'DELETE', orig_delete_at, account, container, obj, @@ -475,14 +477,15 @@ class ObjectController(object): @timing_stats() def GET(self, request): """Handle HTTP GET requests for the Swift Object Server.""" - device, partition, account, container, obj = \ - split_and_validate_path(request, 5, 5, True) + device, partition, account, 
container, obj, policy_idx = \ + get_name_and_placement(request, 5, 5, True) keep_cache = self.keep_cache_private or ( 'X-Auth-Token' not in request.headers and 'X-Storage-Token' not in request.headers) try: disk_file = self.get_diskfile( - device, partition, account, container, obj) + device, partition, account, container, obj, + policy_idx=policy_idx) except DiskFileDeviceUnavailable: return HTTPInsufficientStorage(drive=device, request=request) try: @@ -521,11 +524,12 @@ class ObjectController(object): @timing_stats(sample_rate=0.8) def HEAD(self, request): """Handle HTTP HEAD requests for the Swift Object Server.""" - device, partition, account, container, obj = \ - split_and_validate_path(request, 5, 5, True) + device, partition, account, container, obj, policy_idx = \ + get_name_and_placement(request, 5, 5, True) try: disk_file = self.get_diskfile( - device, partition, account, container, obj) + device, partition, account, container, obj, + policy_idx=policy_idx) except DiskFileDeviceUnavailable: return HTTPInsufficientStorage(drive=device, request=request) try: @@ -555,15 +559,16 @@ class ObjectController(object): @timing_stats() def DELETE(self, request): """Handle HTTP DELETE requests for the Swift Object Server.""" - device, partition, account, container, obj = \ - split_and_validate_path(request, 5, 5, True) + device, partition, account, container, obj, policy_idx = \ + get_name_and_placement(request, 5, 5, True) if 'x-timestamp' not in request.headers or \ not check_float(request.headers['x-timestamp']): return HTTPBadRequest(body='Missing timestamp', request=request, content_type='text/plain') try: disk_file = self.get_diskfile( - device, partition, account, container, obj) + device, partition, account, container, obj, + policy_idx=policy_idx) except DiskFileDeviceUnavailable: return HTTPInsufficientStorage(drive=device, request=request) try: diff --git a/swift/obj/updater.py b/swift/obj/updater.py index 08895390d6..c3ba44635a 100644 --- a/swift/obj/updater.py +++ b/swift/obj/updater.py @@ -29,7 +29,7 @@ from swift.common.ring import Ring from swift.common.utils import get_logger, renamer, write_pickle, \ dump_recon_cache, config_true_value, ismount from swift.common.daemon import Daemon -from swift.obj.diskfile import ASYNCDIR +from swift.obj.diskfile import ASYNCDIR_BASE from swift.common.http import is_success, HTTP_NOT_FOUND, \ HTTP_INTERNAL_SERVER_ERROR @@ -137,7 +137,7 @@ class ObjectUpdater(Daemon): :param device: path to device """ start_time = time.time() - async_pending = os.path.join(device, ASYNCDIR) + async_pending = os.path.join(device, ASYNCDIR_BASE) if not os.path.isdir(async_pending): return for prefix in os.listdir(async_pending): diff --git a/swift/proxy/controllers/base.py b/swift/proxy/controllers/base.py index a45a3a2f95..af8f8737ac 100644 --- a/swift/proxy/controllers/base.py +++ b/swift/proxy/controllers/base.py @@ -162,6 +162,7 @@ def headers_to_container_info(headers, status_int=HTTP_OK): 'object_count': headers.get('x-container-object-count'), 'bytes': headers.get('x-container-bytes-used'), 'versions': headers.get('x-versions-location'), + 'storage_policy': headers.get(POLICY_INDEX.lower(), '0'), 'cors': { 'allow_origin': meta.get('access-control-allow-origin'), 'expose_headers': meta.get('access-control-expose-headers'), diff --git a/swift/proxy/controllers/obj.py b/swift/proxy/controllers/obj.py index af0efb6d34..354b5662ba 100644 --- a/swift/proxy/controllers/obj.py +++ b/swift/proxy/controllers/obj.py @@ -56,6 +56,7 @@ from swift.common.swob 
import HTTPAccepted, HTTPBadRequest, HTTPNotFound, \ HTTPPreconditionFailed, HTTPRequestEntityTooLarge, HTTPRequestTimeout, \ HTTPServerError, HTTPServiceUnavailable, Request, \ HTTPClientDisconnect, HTTPNotImplemented +from swift.common.storage_policy import POLICY_INDEX from swift.common.request_helpers import is_user_meta @@ -195,15 +196,18 @@ class ObjectController(Controller): container_info = self.container_info( self.account_name, self.container_name, req) req.acl = container_info['read_acl'] + policy_idx = container_info['storage_policy'] + obj_ring = self.app.get_object_ring(policy_idx) + # pass the policy index to storage nodes via req header + req.headers[POLICY_INDEX] = policy_idx if 'swift.authorize' in req.environ: aresp = req.environ['swift.authorize'](req) if aresp: return aresp - - partition = self.app.object_ring.get_part( + partition = obj_ring.get_part( self.account_name, self.container_name, self.object_name) resp = self.GETorHEAD_base( - req, _('Object'), self.app.object_ring, partition, + req, _('Object'), obj_ring, partition, req.swift_entity_path) if ';' in resp.headers.get('content-type', ''): @@ -297,7 +301,11 @@ class ObjectController(Controller): self.app.expiring_objects_account, delete_at_container) else: delete_at_container = delete_at_part = delete_at_nodes = None - partition, nodes = self.app.object_ring.get_nodes( + policy_idx = container_info['storage_policy'] + obj_ring = self.app.get_object_ring(policy_idx) + # pass the policy index to storage nodes via req header + req.headers[POLICY_INDEX] = policy_idx + partition, nodes = obj_ring.get_nodes( self.account_name, self.container_name, self.object_name) req.headers['X-Timestamp'] = normalize_timestamp(time.time()) @@ -305,7 +313,7 @@ class ObjectController(Controller): req, len(nodes), container_partition, containers, delete_at_container, delete_at_part, delete_at_nodes) - resp = self.make_requests(req, self.app.object_ring, partition, + resp = self.make_requests(req, obj_ring, partition, 'POST', req.swift_entity_path, headers) return resp @@ -448,6 +456,10 @@ class ObjectController(Controller): body='If-None-Match only supports *') container_info = self.container_info( self.account_name, self.container_name, req) + policy_idx = container_info['storage_policy'] + obj_ring = self.app.get_object_ring(policy_idx) + # pass the policy index to storage nodes via req header + req.headers[POLICY_INDEX] = policy_idx container_partition = container_info['partition'] containers = container_info['nodes'] req.acl = container_info['write_acl'] @@ -478,16 +490,19 @@ class ObjectController(Controller): body='Non-integer X-Delete-After') req.headers['x-delete-at'] = normalize_delete_at_timestamp( time.time() + x_delete_after) - partition, nodes = self.app.object_ring.get_nodes( + partition, nodes = obj_ring.get_nodes( self.account_name, self.container_name, self.object_name) # do a HEAD request for container sync and checking object versions if 'x-timestamp' in req.headers or \ (object_versions and not req.environ.get('swift_versioned_copy')): - hreq = Request.blank(req.path_info, headers={'X-Newest': 'True'}, + # make sure proxy-server uses the right policy index + _headers = {POLICY_INDEX: req.headers[POLICY_INDEX], + 'X-Newest': 'True'} + hreq = Request.blank(req.path_info, headers=_headers, environ={'REQUEST_METHOD': 'HEAD'}) hresp = self.GETorHEAD_base( - hreq, _('Object'), self.app.object_ring, partition, + hreq, _('Object'), obj_ring, partition, hreq.swift_entity_path) # Used by container sync feature if 
'x-timestamp' in req.headers: @@ -642,7 +657,7 @@ class ObjectController(Controller): delete_at_container = delete_at_part = delete_at_nodes = None node_iter = GreenthreadSafeIterator( - self.iter_nodes_local_first(self.app.object_ring, partition)) + self.iter_nodes_local_first(obj_ring, partition)) pile = GreenPile(len(nodes)) te = req.headers.get('transfer-encoding', '') chunked = ('chunked' in te) @@ -756,6 +771,10 @@ class ObjectController(Controller): """HTTP DELETE request handler.""" container_info = self.container_info( self.account_name, self.container_name, req) + policy_idx = container_info['storage_policy'] + obj_ring = self.app.get_object_ring(policy_idx) + # pass the policy index to storage nodes via req header + req.headers[POLICY_INDEX] = policy_idx container_partition = container_info['partition'] containers = container_info['nodes'] req.acl = container_info['write_acl'] @@ -809,6 +828,10 @@ class ObjectController(Controller): new_del_req = Request.blank(copy_path, environ=req.environ) container_info = self.container_info( self.account_name, self.container_name, req) + policy_idx = container_info['storage_policy'] + obj_ring = self.app.get_object_ring(policy_idx) + # pass the policy index to storage nodes via req header + new_del_req.headers[POLICY_INDEX] = policy_idx container_partition = container_info['partition'] containers = container_info['nodes'] new_del_req.acl = container_info['write_acl'] @@ -823,7 +846,7 @@ class ObjectController(Controller): return aresp if not containers: return HTTPNotFound(request=req) - partition, nodes = self.app.object_ring.get_nodes( + partition, nodes = obj_ring.get_nodes( self.account_name, self.container_name, self.object_name) # Used by container sync feature if 'x-timestamp' in req.headers: @@ -840,7 +863,7 @@ class ObjectController(Controller): headers = self._backend_requests( req, len(nodes), container_partition, containers) - resp = self.make_requests(req, self.app.object_ring, + resp = self.make_requests(req, obj_ring, partition, 'DELETE', req.swift_entity_path, headers) return resp diff --git a/swift/proxy/server.py b/swift/proxy/server.py index 5107a06ac2..ea6207be7f 100644 --- a/swift/proxy/server.py +++ b/swift/proxy/server.py @@ -25,13 +25,13 @@ from eventlet import Timeout from swift import __canonical_version__ as swift_version from swift.common import constraints +from swift.common.storage_policy import POLICIES from swift.common.ring import Ring from swift.common.utils import cache_from_env, get_logger, \ get_remote_client, split_path, config_true_value, generate_trans_id, \ affinity_key_function, affinity_locality_predicate, list_from_csv, \ register_swift_info from swift.common.constraints import check_utf8 -from swift.common.storage_policy import POLICIES from swift.proxy.controllers import AccountController, ObjectController, \ ContainerController, InfoController from swift.common.swob import HTTPBadRequest, HTTPForbidden, \ @@ -68,7 +68,7 @@ class Application(object): """WSGI application for the proxy server.""" def __init__(self, conf, memcache=None, logger=None, account_ring=None, - container_ring=None, object_ring=None): + container_ring=None): if conf is None: conf = {} if logger is None: @@ -77,6 +77,7 @@ class Application(object): self.logger = logger swift_dir = conf.get('swift_dir', '/etc/swift') + self.swift_dir = swift_dir self.node_timeout = int(conf.get('node_timeout', 10)) self.recoverable_node_timeout = int( conf.get('recoverable_node_timeout', self.node_timeout)) @@ -99,11 +100,13 @@ class 
Application(object): config_true_value(conf.get('allow_account_management', 'no')) self.object_post_as_copy = \ config_true_value(conf.get('object_post_as_copy', 'true')) - self.object_ring = object_ring or Ring(swift_dir, ring_name='object') self.container_ring = container_ring or Ring(swift_dir, ring_name='container') self.account_ring = account_ring or Ring(swift_dir, ring_name='account') + # ensure rings are loaded for all configured storage policies + for policy in POLICIES: + policy.load_ring(swift_dir) self.memcache = memcache mimetypes.init(mimetypes.knownfiles + [os.path.join(swift_dir, 'mime.types')]) @@ -220,6 +223,16 @@ class Application(object): "read_affinity setting will have no effect." % self.sorting_method) + def get_object_ring(self, policy_idx): + """ + Get the ring object to use to handle a request based on its policy. + + :param policy_idx: policy index as defined in swift.conf + + :returns: appropriate ring object + """ + return POLICIES.get_object_ring(policy_idx, self.swift_dir) + def get_controller(self, path): """ Get the controller to handle a request. diff --git a/test/unit/common/test_wsgi.py b/test/unit/common/test_wsgi.py index 75ddbd6f97..f1573a1f8d 100644 --- a/test/unit/common/test_wsgi.py +++ b/test/unit/common/test_wsgi.py @@ -37,6 +37,8 @@ import swift.proxy.server from swift.common.swob import Request from swift.common import wsgi, utils +from swift.common.storage_policy import StoragePolicy, \ + StoragePolicyCollection from test.unit import temptree, write_fake_ring @@ -46,7 +48,14 @@ from paste.deploy import loadwsgi def _fake_rings(tmpdir): write_fake_ring(os.path.join(tmpdir, 'account.ring.gz')) write_fake_ring(os.path.join(tmpdir, 'container.ring.gz')) - write_fake_ring(os.path.join(tmpdir, 'object.ring.gz')) + # Some storage-policy-specific fake rings. 
+ policy = [StoragePolicy(0, 'zero'), + StoragePolicy(1, 'one', is_default=True)] + policies = StoragePolicyCollection(policy) + for pol in policies: + obj_ring_path = \ + os.path.join(tmpdir, pol.ring_name + '.ring.gz') + write_fake_ring(obj_ring_path) class TestWSGI(unittest.TestCase): diff --git a/test/unit/obj/test_auditor.py b/test/unit/obj/test_auditor.py index ea14540bd2..f34ed8e18c 100644 --- a/test/unit/obj/test_auditor.py +++ b/test/unit/obj/test_auditor.py @@ -25,7 +25,7 @@ from tempfile import mkdtemp from test.unit import FakeLogger from swift.obj import auditor from swift.obj.diskfile import DiskFile, write_metadata, invalidate_hash, \ - DATADIR, DiskFileManager, AuditLocation + DATADIR_BASE, DiskFileManager, AuditLocation from swift.common.utils import hash_path, mkdirs, normalize_timestamp, \ storage_directory @@ -339,7 +339,7 @@ class TestAuditor(unittest.TestCase): name_hash = hash_path('a', 'c', 'o') dir_path = os.path.join( self.devices, 'sda', - storage_directory(DATADIR, '0', name_hash)) + storage_directory(DATADIR_BASE, '0', name_hash)) ts_file_path = os.path.join(dir_path, '99999.ts') if not os.path.exists(dir_path): mkdirs(dir_path) diff --git a/test/unit/obj/test_diskfile.py b/test/unit/obj/test_diskfile.py index cf8801c79e..c62e24ae15 100644 --- a/test/unit/obj/test_diskfile.py +++ b/test/unit/obj/test_diskfile.py @@ -33,7 +33,7 @@ from contextlib import closing, nested from gzip import GzipFile from eventlet import tpool -from test.unit import FakeLogger, mock as unit_mock, temptree +from test.unit import FakeLogger, mock as unit_mock, temptree, patch_policies from swift.obj import diskfile from swift.common import utils @@ -43,6 +43,15 @@ from swift.common.exceptions import DiskFileNotExist, DiskFileQuarantined, \ DiskFileDeviceUnavailable, DiskFileDeleted, DiskFileNotOpen, \ DiskFileError, ReplicationLockTimeout, PathNotDir, DiskFileCollision, \ DiskFileExpired, SwiftException, DiskFileNoSpace +from swift.common.storage_policy import StoragePolicy, POLICIES, \ + get_policy_string +from functools import partial + + +get_data_dir = partial(get_policy_string, diskfile.DATADIR_BASE) +get_tmp_dir = partial(get_policy_string, diskfile.TMP_BASE) +_mocked_policies = [StoragePolicy(0, 'zero', False), + StoragePolicy(1, 'one', True)] def _create_test_ring(path): @@ -101,23 +110,69 @@ class TestDiskFileModuleMethods(unittest.TestCase): def tearDown(self): rmtree(self.testdir, ignore_errors=1) - def _create_diskfile(self): + def _create_diskfile(self, policy_idx=0): return self.df_mgr.get_diskfile(self.existing_device, - '0', 'a', 'c', 'o') + '0', 'a', 'c', 'o', + policy_idx) + @patch_policies(_mocked_policies) + def test_extract_policy_index(self): + # good path names + pn = 'objects/0/606/1984527ed7ef6247c78606/1401379842.14643.data' + self.assertEqual(diskfile.extract_policy_index(pn), 0) + pn = 'objects-1/0/606/198452b6ef6247c78606/1401379842.14643.data' + self.assertEqual(diskfile.extract_policy_index(pn), 1) + good_path = '/srv/node/sda1/objects-1/1/abc/def/1234.data' + self.assertEquals(1, diskfile.extract_policy_index(good_path)) + good_path = '/srv/node/sda1/objects/1/abc/def/1234.data' + self.assertEquals(0, diskfile.extract_policy_index(good_path)) + + # short paths still ok + path = '/srv/node/sda1/objects/1/1234.data' + self.assertEqual(diskfile.extract_policy_index(path), 0) + path = '/srv/node/sda1/objects-1/1/1234.data' + self.assertEqual(diskfile.extract_policy_index(path), 1) + + # leading slash, just in case + pn = 
'/objects/0/606/1984527ed7ef6247c78606/1401379842.14643.data' + self.assertEqual(diskfile.extract_policy_index(pn), 0) + pn = '/objects-1/0/606/198452b6ef6247c78606/1401379842.14643.data' + self.assertEqual(diskfile.extract_policy_index(pn), 1) + + # bad policy index + pn = 'objects-2/0/606/198427efcff042c78606/1401379842.14643.data' + self.assertEqual(diskfile.extract_policy_index(pn), 0) + bad_path = '/srv/node/sda1/objects-t/1/abc/def/1234.data' + self.assertRaises(ValueError, + diskfile.extract_policy_index, bad_path) + + # malformed path (no objects dir or nothing at all) + pn = 'XXXX/0/606/1984527ed42b6ef6247c78606/1401379842.14643.data' + self.assertEqual(diskfile.extract_policy_index(pn), 0) + self.assertEqual(diskfile.extract_policy_index(''), 0) + + # no datadir base in path + bad_path = '/srv/node/sda1/foo-1/1/abc/def/1234.data' + self.assertEqual(diskfile.extract_policy_index(bad_path), 0) + bad_path = '/srv/node/sda1/obj1/1/abc/def/1234.data' + self.assertEqual(diskfile.extract_policy_index(bad_path), 0) + + @patch_policies(_mocked_policies) def test_quarantine_renamer(self): - # we use this for convenience, not really about a diskfile layout - df = self._create_diskfile() - mkdirs(df._datadir) - exp_dir = os.path.join(self.devices, 'quarantined', 'objects', - os.path.basename(df._datadir)) - qbit = os.path.join(df._datadir, 'qbit') - with open(qbit, 'w') as f: - f.write('abc') - to_dir = diskfile.quarantine_renamer(self.devices, qbit) - self.assertEqual(to_dir, exp_dir) - self.assertRaises(OSError, diskfile.quarantine_renamer, self.devices, - qbit) + for policy in POLICIES: + # we use this for convenience, not really about a diskfile layout + df = self._create_diskfile(policy_idx=policy.idx) + mkdirs(df._datadir) + exp_dir = os.path.join(self.devices, 'quarantined', + get_data_dir(policy.idx), + os.path.basename(df._datadir)) + qbit = os.path.join(df._datadir, 'qbit') + with open(qbit, 'w') as f: + f.write('abc') + to_dir = diskfile.quarantine_renamer(self.devices, qbit) + self.assertEqual(to_dir, exp_dir) + self.assertRaises(OSError, diskfile.quarantine_renamer, + self.devices, qbit) def test_hash_suffix_enoent(self): self.assertRaises(PathNotDir, diskfile.hash_suffix, @@ -130,6 +185,35 @@ class TestDiskFileModuleMethods(unittest.TestCase): self.assertRaises(OSError, diskfile.hash_suffix, os.path.join(self.testdir, "doesnotexist"), 101) + @patch_policies(_mocked_policies) + def test_get_data_dir(self): + self.assertEquals(diskfile.get_data_dir(0), diskfile.DATADIR_BASE) + self.assertEquals(diskfile.get_data_dir(1), + diskfile.DATADIR_BASE + "-1") + self.assertRaises(ValueError, diskfile.get_data_dir, 'junk') + + self.assertRaises(ValueError, diskfile.get_data_dir, 99) + + @patch_policies(_mocked_policies) + def test_get_async_dir(self): + self.assertEquals(diskfile.get_async_dir(0), + diskfile.ASYNCDIR_BASE) + self.assertEquals(diskfile.get_async_dir(1), + diskfile.ASYNCDIR_BASE + "-1") + self.assertRaises(ValueError, diskfile.get_async_dir, 'junk') + + self.assertRaises(ValueError, diskfile.get_async_dir, 99) + + @patch_policies(_mocked_policies) + def test_get_tmp_dir(self): + self.assertEquals(diskfile.get_tmp_dir(0), + diskfile.TMP_BASE) + self.assertEquals(diskfile.get_tmp_dir(1), + diskfile.TMP_BASE + "-1") + self.assertRaises(ValueError, diskfile.get_tmp_dir, 'junk') + + self.assertRaises(ValueError, diskfile.get_tmp_dir, 99) + def test_hash_suffix_hash_dir_is_file_quarantine(self): df = self._create_diskfile() mkdirs(os.path.dirname(df._datadir)) @@ -733,7 +817,7 @@ 
class TestDiskFileManager(unittest.TestCase): dp = self.df_mgr.construct_dev_path(self.existing_device1) ohash = diskfile.hash_path('a', 'c', 'o') wp.assert_called_with({'a': 1, 'b': 2}, - os.path.join(dp, diskfile.ASYNCDIR, + os.path.join(dp, diskfile.ASYNCDIR_BASE, ohash[-3:], ohash + '-' + ts), os.path.join(dp, 'tmp')) self.df_mgr.logger.increment.assert_called_with('async_pendings') @@ -857,9 +941,10 @@ class TestDiskFile(unittest.TestCase): pickle.dumps(metadata, diskfile.PICKLE_PROTOCOL)) def _simple_get_diskfile(self, partition='0', account='a', container='c', - obj='o'): + obj='o', policy_idx=0): return self.df_mgr.get_diskfile(self.existing_device, - partition, account, container, obj) + partition, account, container, obj, + policy_idx) def _create_test_file(self, data, timestamp=None, metadata=None, account='a', container='c', obj='o'): diff --git a/test/unit/obj/test_server.py b/test/unit/obj/test_server.py index de28f72af2..137a6dfa1f 100755 --- a/test/unit/obj/test_server.py +++ b/test/unit/obj/test_server.py @@ -34,14 +34,15 @@ from eventlet import sleep, spawn, wsgi, listen, Timeout, tpool from nose import SkipTest from test.unit import FakeLogger, debug_logger -from test.unit import connect_tcp, readuntil2crlfs +from test.unit import connect_tcp, readuntil2crlfs, patch_policies from swift.obj import server as object_server from swift.obj import diskfile -from swift.common import utils +from swift.common import utils, storage_policy from swift.common.utils import hash_path, mkdirs, normalize_timestamp, \ NullLogger, storage_directory, public, replication from swift.common import constraints from swift.common.swob import Request, HeaderKeyDict +from swift.common.storage_policy import POLICY_INDEX, POLICIES from swift.common.exceptions import DiskFileDeviceUnavailable @@ -545,8 +546,8 @@ class TestObjectController(unittest.TestCase): self.assertEquals(resp.status_int, 201) objfile = os.path.join( self.testdir, 'sda1', - storage_directory(diskfile.DATADIR, 'p', - hash_path('a', 'c', 'o')), + storage_directory(diskfile.get_data_dir(0), + 'p', hash_path('a', 'c', 'o')), timestamp + '.data') self.assert_(os.path.isfile(objfile)) self.assertEquals(open(objfile).read(), 'VERIFY') @@ -578,7 +579,7 @@ class TestObjectController(unittest.TestCase): self.assertEquals(resp.status_int, 201) objfile = os.path.join( self.testdir, 'sda1', - storage_directory(diskfile.DATADIR, 'p', + storage_directory(diskfile.get_data_dir(0), 'p', hash_path('a', 'c', 'o')), timestamp + '.data') self.assert_(os.path.isfile(objfile)) @@ -613,7 +614,7 @@ class TestObjectController(unittest.TestCase): self.assertEqual(resp.status_int, 201) objfile = os.path.join( self.testdir, 'sda1', - storage_directory(diskfile.DATADIR, 'p', + storage_directory(diskfile.get_data_dir(0), 'p', hash_path('a', 'c', 'o')), timestamp + '.data') self.assertTrue(os.path.isfile(objfile)) @@ -687,7 +688,7 @@ class TestObjectController(unittest.TestCase): self.assertEquals(resp.status_int, 201) objfile = os.path.join( self.testdir, 'sda1', - storage_directory(diskfile.DATADIR, 'p', + storage_directory(diskfile.get_data_dir(0), 'p', hash_path('a', 'c', 'o')), timestamp + '.data') self.assert_(os.path.isfile(objfile)) @@ -833,7 +834,7 @@ class TestObjectController(unittest.TestCase): objfile = os.path.join( self.testdir, 'sda1', - storage_directory(diskfile.DATADIR, 'p', + storage_directory(diskfile.get_data_dir(0), 'p', hash_path('a', 'c', 'o')), timestamp + '.data') os.unlink(objfile) @@ -957,7 +958,7 @@ class 
TestObjectController(unittest.TestCase): objfile = os.path.join( self.testdir, 'sda1', - storage_directory(diskfile.DATADIR, 'p', + storage_directory(diskfile.get_data_dir(0), 'p', hash_path('a', 'c', 'o')), timestamp + '.data') os.unlink(objfile) @@ -1252,7 +1253,7 @@ class TestObjectController(unittest.TestCase): req = Request.blank('/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'GET'}, headers={'If-Modified-Since': since}) - resp = self.object_controller.GET(req) + resp = req.get_response(self.object_controller) self.assertEquals(resp.status_int, 304) timestamp = normalize_timestamp(int(time())) @@ -1391,7 +1392,7 @@ class TestObjectController(unittest.TestCase): req = Request.blank('/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'GET'}, headers={'If-Unmodified-Since': since}) - resp = self.object_controller.GET(req) + resp = req.get_response(self.object_controller) self.assertEquals(resp.status_int, 200) def test_HEAD_if_unmodified_since(self): @@ -1568,7 +1569,7 @@ class TestObjectController(unittest.TestCase): self.assertEquals(resp.status_int, 404) ts_1000_file = os.path.join( self.testdir, 'sda1', - storage_directory(diskfile.DATADIR, 'p', + storage_directory(diskfile.get_data_dir(0), 'p', hash_path('a', 'c', 'o')), timestamp + '.ts') self.assertTrue(os.path.isfile(ts_1000_file)) @@ -1584,7 +1585,7 @@ class TestObjectController(unittest.TestCase): self.assertEquals(resp.status_int, 404) ts_999_file = os.path.join( self.testdir, 'sda1', - storage_directory(diskfile.DATADIR, 'p', + storage_directory(diskfile.get_data_dir(0), 'p', hash_path('a', 'c', 'o')), timestamp + '.ts') self.assertFalse(os.path.isfile(ts_999_file)) @@ -1604,7 +1605,7 @@ class TestObjectController(unittest.TestCase): # There should now be 1000 ts and a 1001 data file. data_1002_file = os.path.join( self.testdir, 'sda1', - storage_directory(diskfile.DATADIR, 'p', + storage_directory(diskfile.get_data_dir(0), 'p', hash_path('a', 'c', 'o')), timestamp + '.data') self.assertTrue(os.path.isfile(data_1002_file)) @@ -1619,7 +1620,7 @@ class TestObjectController(unittest.TestCase): self.assertEquals(resp.status_int, 409) ts_1001_file = os.path.join( self.testdir, 'sda1', - storage_directory(diskfile.DATADIR, 'p', + storage_directory(diskfile.get_data_dir(0), 'p', hash_path('a', 'c', 'o')), timestamp + '.ts') self.assertFalse(os.path.isfile(ts_1001_file)) @@ -1634,7 +1635,7 @@ class TestObjectController(unittest.TestCase): self.assertEquals(resp.status_int, 204) ts_1003_file = os.path.join( self.testdir, 'sda1', - storage_directory(diskfile.DATADIR, 'p', + storage_directory(diskfile.get_data_dir(0), 'p', hash_path('a', 'c', 'o')), timestamp + '.ts') self.assertTrue(os.path.isfile(ts_1003_file)) @@ -1673,7 +1674,7 @@ class TestObjectController(unittest.TestCase): self.assertEquals(resp.status_int, 409) objfile = os.path.join( self.testdir, 'sda1', - storage_directory(diskfile.DATADIR, 'p', + storage_directory(diskfile.get_data_dir(0), 'p', hash_path('a', 'c', 'o')), timestamp + '.ts') self.assertFalse(os.path.isfile(objfile)) @@ -1693,7 +1694,7 @@ class TestObjectController(unittest.TestCase): self.assertEquals(resp.status_int, 204) objfile = os.path.join( self.testdir, 'sda1', - storage_directory(diskfile.DATADIR, 'p', + storage_directory(diskfile.get_data_dir(0), 'p', hash_path('a', 'c', 'o')), timestamp + '.ts') self.assert_(os.path.isfile(objfile)) @@ -1713,7 +1714,7 @@ class TestObjectController(unittest.TestCase): self.assertEquals(resp.status_int, 404) objfile = os.path.join( self.testdir, 'sda1', - 
storage_directory(diskfile.DATADIR, 'p', + storage_directory(diskfile.get_data_dir(0), 'p', hash_path('a', 'c', 'o')), timestamp + '.ts') self.assert_(os.path.isfile(objfile)) @@ -1732,7 +1733,7 @@ class TestObjectController(unittest.TestCase): self.assertEquals(resp.status_int, 404) objfile = os.path.join( self.testdir, 'sda1', - storage_directory(diskfile.DATADIR, 'p', + storage_directory(diskfile.get_data_dir(0), 'p', hash_path('a', 'c', 'o')), timestamp + '.ts') self.assertFalse(os.path.isfile(objfile)) @@ -2119,6 +2120,9 @@ class TestObjectController(unittest.TestCase): 'x-timestamp': '1', 'x-out': 'set', 'user-agent': 'obj-server %s' % os.getpid()}]) + @patch_policies([storage_policy.StoragePolicy(0, 'zero', True), + storage_policy.StoragePolicy(1, 'one'), + storage_policy.StoragePolicy(37, 'fantastico')]) def test_updating_multiple_delete_at_container_servers(self): self.object_controller.expiring_objects_account = 'exp' self.object_controller.expiring_objects_container_divisor = 60 @@ -2164,12 +2168,9 @@ class TestObjectController(unittest.TestCase): 'X-Delete-At-Partition': '6237', 'X-Delete-At-Device': 'sdp,sdq'}) - orig_http_connect = object_server.http_connect - try: - object_server.http_connect = fake_http_connect + with mock.patch.object(object_server, 'http_connect', + fake_http_connect): resp = req.get_response(self.object_controller) - finally: - object_server.http_connect = orig_http_connect self.assertEqual(resp.status_int, 201) @@ -2228,6 +2229,9 @@ class TestObjectController(unittest.TestCase): 'user-agent': 'obj-server %d' % os.getpid(), 'x-trans-id': '-'})}) + @patch_policies([storage_policy.StoragePolicy(0, 'zero', True), + storage_policy.StoragePolicy(1, 'one'), + storage_policy.StoragePolicy(26, 'twice-thirteen')]) def test_updating_multiple_container_servers(self): http_connect_args = [] @@ -2261,16 +2265,14 @@ class TestObjectController(unittest.TestCase): headers={'X-Timestamp': '12345', 'Content-Type': 'application/burrito', 'Content-Length': '0', + 'X-Storage-Policy-Index': '26', 'X-Container-Partition': '20', 'X-Container-Host': '1.2.3.4:5, 6.7.8.9:10', 'X-Container-Device': 'sdb1, sdf1'}) - orig_http_connect = object_server.http_connect - try: - object_server.http_connect = fake_http_connect - self.object_controller.PUT(req) - finally: - object_server.http_connect = orig_http_connect + with mock.patch.object(object_server, 'http_connect', + fake_http_connect): + req.get_response(self.object_controller) http_connect_args.sort(key=operator.itemgetter('ipaddr')) @@ -2701,7 +2703,7 @@ class TestObjectController(unittest.TestCase): 'Content-Length': '4', 'Content-Type': 'application/octet-stream'}) req.body = 'TEST' - resp = self.object_controller.PUT(req) + resp = req.get_response(self.object_controller) self.assertEquals(resp.status_int, 201) self.assertEquals(given_args, []) @@ -2711,7 +2713,7 @@ class TestObjectController(unittest.TestCase): environ={'REQUEST_METHOD': 'POST'}, headers={'X-Timestamp': normalize_timestamp(time()), 'Content-Type': 'application/x-test'}) - resp = self.object_controller.POST(req) + resp = req.get_response(self.object_controller) self.assertEquals(resp.status_int, 202) self.assertEquals(given_args, []) @@ -2724,12 +2726,12 @@ class TestObjectController(unittest.TestCase): headers={'X-Timestamp': timestamp1, 'Content-Type': 'application/x-test', 'X-Delete-At': delete_at_timestamp1}) - resp = self.object_controller.POST(req) + resp = req.get_response(self.object_controller) self.assertEquals(resp.status_int, 202) self.assertEquals( 
given_args, [ 'PUT', int(delete_at_timestamp1), 'a', 'c', 'o', - req, 'sda1']) + given_args[5], 'sda1']) while given_args: given_args.pop() @@ -2743,14 +2745,14 @@ class TestObjectController(unittest.TestCase): headers={'X-Timestamp': timestamp2, 'Content-Type': 'application/x-test', 'X-Delete-At': delete_at_timestamp2}) - resp = self.object_controller.POST(req) + resp = req.get_response(self.object_controller) self.assertEquals(resp.status_int, 202) self.assertEquals( given_args, [ 'PUT', int(delete_at_timestamp2), 'a', 'c', 'o', - req, 'sda1', + given_args[5], 'sda1', 'DELETE', int(delete_at_timestamp1), 'a', 'c', 'o', - req, 'sda1']) + given_args[5], 'sda1']) def test_PUT_calls_delete_at(self): given_args = [] @@ -2766,7 +2768,7 @@ class TestObjectController(unittest.TestCase): 'Content-Length': '4', 'Content-Type': 'application/octet-stream'}) req.body = 'TEST' - resp = self.object_controller.PUT(req) + resp = req.get_response(self.object_controller) self.assertEquals(resp.status_int, 201) self.assertEquals(given_args, []) @@ -2780,12 +2782,12 @@ class TestObjectController(unittest.TestCase): 'Content-Type': 'application/octet-stream', 'X-Delete-At': delete_at_timestamp1}) req.body = 'TEST' - resp = self.object_controller.PUT(req) + resp = req.get_response(self.object_controller) self.assertEquals(resp.status_int, 201) self.assertEquals( given_args, [ 'PUT', int(delete_at_timestamp1), 'a', 'c', 'o', - req, 'sda1']) + given_args[5], 'sda1']) while given_args: given_args.pop() @@ -2801,14 +2803,14 @@ class TestObjectController(unittest.TestCase): 'Content-Type': 'application/octet-stream', 'X-Delete-At': delete_at_timestamp2}) req.body = 'TEST' - resp = self.object_controller.PUT(req) + resp = req.get_response(self.object_controller) self.assertEquals(resp.status_int, 201) self.assertEquals( given_args, [ 'PUT', int(delete_at_timestamp2), 'a', 'c', 'o', - req, 'sda1', + given_args[5], 'sda1', 'DELETE', int(delete_at_timestamp1), 'a', 'c', 'o', - req, 'sda1']) + given_args[5], 'sda1']) def test_GET_but_expired(self): test_time = time() + 10000 @@ -3057,7 +3059,7 @@ class TestObjectController(unittest.TestCase): self.assertEquals(resp.body, 'TEST') objfile = os.path.join( self.testdir, 'sda1', - storage_directory(diskfile.DATADIR, 'p', + storage_directory(diskfile.get_data_dir(0), 'p', hash_path('a', 'c', 'o')), test_timestamp + '.data') self.assert_(os.path.isfile(objfile)) @@ -3177,7 +3179,6 @@ class TestObjectController(unittest.TestCase): given_args.extend(args) self.object_controller.delete_at_update = fake_delete_at_update - timestamp1 = normalize_timestamp(time()) delete_at_timestamp1 = int(time() + 1000) delete_at_container1 = str( @@ -3192,11 +3193,11 @@ class TestObjectController(unittest.TestCase): 'X-Delete-At': str(delete_at_timestamp1), 'X-Delete-At-Container': delete_at_container1}) req.body = 'TEST' - resp = self.object_controller.PUT(req) + resp = req.get_response(self.object_controller) self.assertEquals(resp.status_int, 201) self.assertEquals(given_args, [ 'PUT', int(delete_at_timestamp1), 'a', 'c', 'o', - req, 'sda1']) + given_args[5], 'sda1']) while given_args: given_args.pop() @@ -3208,11 +3209,11 @@ class TestObjectController(unittest.TestCase): environ={'REQUEST_METHOD': 'DELETE'}, headers={'X-Timestamp': timestamp2, 'Content-Type': 'application/octet-stream'}) - resp = self.object_controller.DELETE(req) + resp = req.get_response(self.object_controller) self.assertEquals(resp.status_int, 204) self.assertEquals(given_args, [ 'DELETE', int(delete_at_timestamp1), 'a', 
'c', 'o', - req, 'sda1']) + given_args[5], 'sda1']) def test_PUT_delete_at_in_past(self): req = Request.blank( @@ -3439,7 +3440,8 @@ class TestObjectController(unittest.TestCase): 'wsgi.run_once': False} method_res = mock.MagicMock() - mock_method = public(lambda x: mock.MagicMock(return_value=method_res)) + mock_method = public(lambda x: + mock.MagicMock(return_value=method_res)) with mock.patch.object(self.object_controller, method, new=mock_method): response = self.object_controller.__call__(env, start_response) @@ -3643,6 +3645,41 @@ class TestObjectController(unittest.TestCase): [(('1.2.3.4 - - [01/Jan/1970:02:46:41 +0000] "HEAD /sda1/p/a/c/o" ' '404 - "-" "-" "-" 2.0000 "-"',), {})]) + @patch_policies([storage_policy.StoragePolicy(0, 'zero', True), + storage_policy.StoragePolicy(1, 'one', False)]) + def test_dynamic_datadir(self): + timestamp = normalize_timestamp(time()) + req = Request.blank('/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT'}, + headers={'X-Timestamp': timestamp, + 'Content-Type': 'application/x-test', + 'Foo': 'fooheader', + 'Baz': 'bazheader', + POLICY_INDEX: 1, + 'X-Object-Meta-1': 'One', + 'X-Object-Meta-Two': 'Two'}) + req.body = 'VERIFY' + object_dir = self.testdir + "/sda1/objects-1" + self.assertFalse(os.path.isdir(object_dir)) + resp = req.get_response(self.object_controller) + self.assertEquals(resp.status_int, 201) + self.assertTrue(os.path.isdir(object_dir)) + + # make sure no idx in header uses policy 0 data_dir + req = Request.blank('/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT'}, + headers={'X-Timestamp': timestamp, + 'Content-Type': 'application/x-test', + 'Foo': 'fooheader', + 'Baz': 'bazheader', + 'X-Object-Meta-1': 'One', + 'X-Object-Meta-Two': 'Two'}) + req.body = 'VERIFY' + object_dir = self.testdir + "/sda1/objects" + self.assertFalse(os.path.isdir(object_dir)) + with mock.patch.object(POLICIES, 'get_by_index', + lambda _: True): + resp = req.get_response(self.object_controller) + self.assertEquals(resp.status_int, 201) + self.assertTrue(os.path.isdir(object_dir)) if __name__ == '__main__': unittest.main() diff --git a/test/unit/obj/test_ssync_receiver.py b/test/unit/obj/test_ssync_receiver.py index 30eb4491ca..e9daf50ba7 100644 --- a/test/unit/obj/test_ssync_receiver.py +++ b/test/unit/obj/test_ssync_receiver.py @@ -454,7 +454,7 @@ class TestReceiver(unittest.TestCase): def test_MISSING_CHECK_have_one_exact(self): object_dir = utils.storage_directory( - os.path.join(self.testdir, 'sda1', diskfile.DATADIR), + os.path.join(self.testdir, 'sda1', diskfile.DATADIR_BASE), '1', self.hash1) utils.mkdirs(object_dir) fp = open(os.path.join(object_dir, self.ts1 + '.data'), 'w+') @@ -485,7 +485,7 @@ class TestReceiver(unittest.TestCase): def test_MISSING_CHECK_have_one_newer(self): object_dir = utils.storage_directory( - os.path.join(self.testdir, 'sda1', diskfile.DATADIR), + os.path.join(self.testdir, 'sda1', diskfile.DATADIR_BASE), '1', self.hash1) utils.mkdirs(object_dir) newer_ts1 = utils.normalize_timestamp(float(self.ts1) + 1) @@ -518,7 +518,7 @@ class TestReceiver(unittest.TestCase): def test_MISSING_CHECK_have_one_older(self): object_dir = utils.storage_directory( - os.path.join(self.testdir, 'sda1', diskfile.DATADIR), + os.path.join(self.testdir, 'sda1', diskfile.DATADIR_BASE), '1', self.hash1) utils.mkdirs(object_dir) older_ts1 = utils.normalize_timestamp(float(self.ts1) - 1) diff --git a/test/unit/obj/test_updater.py b/test/unit/obj/test_updater.py index 56a6b5a271..2aeff19f0f 100644 --- a/test/unit/obj/test_updater.py +++ 
b/test/unit/obj/test_updater.py @@ -27,7 +27,7 @@ from distutils.dir_util import mkpath from eventlet import spawn, Timeout, listen from swift.obj import updater as object_updater -from swift.obj.diskfile import ASYNCDIR +from swift.obj.diskfile import ASYNCDIR_BASE from swift.common.ring import RingData from swift.common import utils from swift.common.utils import hash_path, normalize_timestamp, mkdirs, \ @@ -80,11 +80,11 @@ class TestObjectUpdater(unittest.TestCase): self.assert_(cu.get_container_ring() is not None) def test_object_sweep(self): - prefix_dir = os.path.join(self.sda1, ASYNCDIR, 'abc') + prefix_dir = os.path.join(self.sda1, ASYNCDIR_BASE, 'abc') mkpath(prefix_dir) # A non-directory where directory is expected should just be skipped... - not_a_dir_path = os.path.join(self.sda1, ASYNCDIR, 'not_a_dir') + not_a_dir_path = os.path.join(self.sda1, ASYNCDIR_BASE, 'not_a_dir') with open(not_a_dir_path, 'w'): pass @@ -134,7 +134,7 @@ class TestObjectUpdater(unittest.TestCase): 'concurrency': '1', 'node_timeout': '15'}) cu.run_once() - async_dir = os.path.join(self.sda1, ASYNCDIR) + async_dir = os.path.join(self.sda1, ASYNCDIR_BASE) os.mkdir(async_dir) cu.run_once() self.assert_(os.path.exists(async_dir)) @@ -171,7 +171,7 @@ class TestObjectUpdater(unittest.TestCase): 'concurrency': '1', 'node_timeout': '15'}) cu.run_once() - async_dir = os.path.join(self.sda1, ASYNCDIR) + async_dir = os.path.join(self.sda1, ASYNCDIR_BASE) os.mkdir(async_dir) cu.run_once() self.assert_(os.path.exists(async_dir)) diff --git a/test/unit/proxy/controllers/test_account.py b/test/unit/proxy/controllers/test_account.py index 276a432a85..23ad0a1c4e 100644 --- a/test/unit/proxy/controllers/test_account.py +++ b/test/unit/proxy/controllers/test_account.py @@ -22,16 +22,19 @@ from swift.proxy import server as proxy_server from swift.proxy.controllers.base import headers_to_account_info from swift.common import constraints from test.unit import fake_http_connect, FakeRing, FakeMemcache +from swift.common.storage_policy import StoragePolicy from swift.common.request_helpers import get_sys_meta_prefix import swift.proxy.controllers.base +from test.unit import patch_policies + +@patch_policies([StoragePolicy(0, 'zero', True, object_ring=FakeRing())]) class TestAccountController(unittest.TestCase): def setUp(self): - self.app = proxy_server.Application(None, FakeMemcache(), - account_ring=FakeRing(), - container_ring=FakeRing(), - object_ring=FakeRing()) + self.app = proxy_server.Application( + None, FakeMemcache(), + account_ring=FakeRing(), container_ring=FakeRing()) def test_account_info_in_response_env(self): controller = proxy_server.AccountController(self.app, 'AUTH_bob') diff --git a/test/unit/proxy/controllers/test_base.py b/test/unit/proxy/controllers/test_base.py index 0c94f90171..cf85ffe6e4 100644 --- a/test/unit/proxy/controllers/test_base.py +++ b/test/unit/proxy/controllers/test_base.py @@ -22,10 +22,13 @@ from swift.proxy.controllers.base import headers_to_container_info, \ Controller, GetOrHeadHandler from swift.common.swob import Request, HTTPException, HeaderKeyDict from swift.common.utils import split_path +from swift.common.storage_policy import StoragePolicy from test.unit import fake_http_connect, FakeRing, FakeMemcache from swift.proxy import server as proxy_server from swift.common.request_helpers import get_sys_meta_prefix +from test.unit import patch_policies + FakeResponse_status_int = 201 @@ -81,12 +84,12 @@ class FakeCache(object): return self.val +@patch_policies([StoragePolicy(0, 
'zero', True, object_ring=FakeRing())]) class TestFuncs(unittest.TestCase): def setUp(self): self.app = proxy_server.Application(None, FakeMemcache(), account_ring=FakeRing(), - container_ring=FakeRing(), - object_ring=FakeRing) + container_ring=FakeRing()) def test_GETorHEAD_base(self): base = Controller(self.app) diff --git a/test/unit/proxy/controllers/test_container.py b/test/unit/proxy/controllers/test_container.py index eda000fe98..0dc34b5e66 100644 --- a/test/unit/proxy/controllers/test_container.py +++ b/test/unit/proxy/controllers/test_container.py @@ -33,8 +33,7 @@ class TestContainerController(TestRingBase): TestRingBase.setUp(self) self.app = proxy_server.Application(None, FakeMemcache(), account_ring=FakeRing(), - container_ring=FakeRing(), - object_ring=FakeRing()) + container_ring=FakeRing()) def test_container_info_in_response_env(self): controller = proxy_server.ContainerController(self.app, 'a', 'c') diff --git a/test/unit/proxy/controllers/test_obj.py b/test/unit/proxy/controllers/test_obj.py index 83c0c1f64b..2c5111404d 100755 --- a/test/unit/proxy/controllers/test_obj.py +++ b/test/unit/proxy/controllers/test_obj.py @@ -23,6 +23,9 @@ import swift from swift.proxy import server as proxy_server from swift.common.swob import HTTPException from test.unit import FakeRing, FakeMemcache, fake_http_connect, debug_logger +from swift.common.storage_policy import StoragePolicy + +from test.unit import patch_policies @contextmanager @@ -40,23 +43,25 @@ def set_http_connect(*args, **kwargs): swift.proxy.controllers.container.http_connect = old_connect +@patch_policies([StoragePolicy(0, 'zero', True, + object_ring=FakeRing(max_more_nodes=9))]) class TestObjControllerWriteAffinity(unittest.TestCase): def setUp(self): self.app = proxy_server.Application( None, FakeMemcache(), account_ring=FakeRing(), - container_ring=FakeRing(), object_ring=FakeRing(max_more_nodes=9)) - self.app.request_node_count = lambda replicas: 10000000 + container_ring=FakeRing(), logger=debug_logger()) + self.app.request_node_count = lambda ring: 10000000 self.app.sort_nodes = lambda l: l # stop shuffling the primary nodes def test_iter_nodes_local_first_noops_when_no_affinity(self): controller = proxy_server.ObjectController(self.app, 'a', 'c', 'o') self.app.write_affinity_is_local_fn = None - - all_nodes = self.app.object_ring.get_part_nodes(1) - all_nodes.extend(self.app.object_ring.get_more_nodes(1)) + object_ring = self.app.get_object_ring(None) + all_nodes = object_ring.get_part_nodes(1) + all_nodes.extend(object_ring.get_more_nodes(1)) local_first_nodes = list(controller.iter_nodes_local_first( - self.app.object_ring, 1)) + object_ring, 1)) self.maxDiff = None @@ -68,11 +73,12 @@ class TestObjControllerWriteAffinity(unittest.TestCase): lambda node: node['region'] == 1) self.app.write_affinity_node_count = lambda ring: 4 - all_nodes = self.app.object_ring.get_part_nodes(1) - all_nodes.extend(self.app.object_ring.get_more_nodes(1)) + object_ring = self.app.get_object_ring(None) + all_nodes = object_ring.get_part_nodes(1) + all_nodes.extend(object_ring.get_more_nodes(1)) local_first_nodes = list(controller.iter_nodes_local_first( - self.app.object_ring, 1)) + object_ring, 1)) # the local nodes move up in the ordering self.assertEqual([1, 1, 1, 1], @@ -90,14 +96,14 @@ class TestObjControllerWriteAffinity(unittest.TestCase): self.assertTrue(res is None) +@patch_policies([StoragePolicy(0, 'zero', True, object_ring=FakeRing())]) class TestObjController(unittest.TestCase): def setUp(self): logger = 
debug_logger('proxy-server') logger.thread_locals = ('txn1', '127.0.0.2') self.app = proxy_server.Application( None, FakeMemcache(), account_ring=FakeRing(), - container_ring=FakeRing(), object_ring=FakeRing(), - logger=logger) + container_ring=FakeRing(), logger=logger) self.controller = proxy_server.ObjectController(self.app, 'a', 'c', 'o') self.controller.container_info = mock.MagicMock(return_value={ @@ -109,6 +115,7 @@ class TestObjController(unittest.TestCase): ], 'write_acl': None, 'read_acl': None, + 'storage_policy': None, 'sync_key': None, 'versions': None}) @@ -155,23 +162,35 @@ class TestObjController(unittest.TestCase): resp = self.controller.DELETE(req) self.assertEquals(resp.status_int, 204) - def test_POST_simple(self): + def test_POST_as_COPY_simple(self): req = swift.common.swob.Request.blank('/v1/a/c/o') - with set_http_connect(200, 200, 200, 201, 201, 201): + with set_http_connect(200, 200, 200, 201, 201, 201) as fake_conn: resp = self.controller.POST(req) + self.assertRaises(StopIteration, fake_conn.code_iter.next) self.assertEquals(resp.status_int, 202) def test_COPY_simple(self): - req = swift.common.swob.Request.blank('/v1/a/c/o') - with set_http_connect(200, 200, 200, 201, 201, 201): - resp = self.controller.POST(req) - self.assertEquals(resp.status_int, 202) + req = swift.common.swob.Request.blank( + '/v1/a/c/o', headers={'Content-Length': 0, + 'Destination': 'c/o-copy'}) + with set_http_connect(200, 200, 200, 201, 201, 201) as fake_conn: + resp = self.controller.COPY(req) + self.assertRaises(StopIteration, fake_conn.code_iter.next) + self.assertEquals(resp.status_int, 201) def test_HEAD_simple(self): req = swift.common.swob.Request.blank('/v1/a/c/o') - with set_http_connect(200, 200, 200, 201, 201, 201): - resp = self.controller.POST(req) - self.assertEquals(resp.status_int, 202) + with set_http_connect(200): + resp = self.controller.HEAD(req) + self.assertEquals(resp.status_int, 200) + + def test_HEAD_x_newest(self): + req = swift.common.swob.Request.blank('/v1/a/c/o', + headers={'X-Newest': 'true'}) + with set_http_connect(200, 200, 200) as fake_conn: + resp = self.controller.HEAD(req) + self.assertRaises(StopIteration, fake_conn.code_iter.next) + self.assertEquals(resp.status_int, 200) def test_PUT_log_info(self): # mock out enough to get to the area of the code we want to test diff --git a/test/unit/proxy/test_server.py b/test/unit/proxy/test_server.py index 41654a0d55..d57ad2851f 100644 --- a/test/unit/proxy/test_server.py +++ b/test/unit/proxy/test_server.py @@ -21,17 +21,20 @@ from contextlib import contextmanager, nested from shutil import rmtree import gc import time +from textwrap import dedent from urllib import quote from hashlib import md5 from tempfile import mkdtemp import weakref import operator +import functools +from swift.obj import diskfile import re import random import mock from eventlet import sleep, spawn, wsgi, listen -import simplejson +from swift.common.utils import json from test.unit import ( connect_tcp, readuntil2crlfs, FakeLogger, fake_http_connect, FakeRing, @@ -46,15 +49,16 @@ from swift.common.middleware.acl import parse_acl, format_acl from swift.common.exceptions import ChunkReadTimeout from swift.common import utils, constraints from swift.common.utils import mkdirs, normalize_timestamp, NullLogger -from swift.common.wsgi import monkey_patch_mimetools +from swift.common.wsgi import monkey_patch_mimetools, loadapp from swift.proxy.controllers import base as proxy_base from swift.proxy.controllers.base import 
get_container_memcache_key, \ get_account_memcache_key, cors_validation import swift.proxy.controllers from swift.common.swob import Request, Response, HTTPUnauthorized, \ HTTPException +from swift.common import storage_policy from swift.common.storage_policy import StoragePolicy, \ - POLICIES, POLICY, POLICY_INDEX + StoragePolicyCollection, POLICIES, POLICY, POLICY_INDEX from swift.common.request_helpers import get_sys_meta_prefix # mocks @@ -63,13 +67,15 @@ logging.getLogger().addHandler(logging.StreamHandler(sys.stdout)) STATIC_TIME = time.time() _test_coros = _test_servers = _test_sockets = _orig_container_listing_limit = \ - _testdir = _orig_SysLogHandler = None + _testdir = _orig_SysLogHandler = _orig_POLICIES = _test_POLICIES = None def do_setup(the_object_server): utils.HASH_PATH_SUFFIX = 'endcap' global _testdir, _test_servers, _test_sockets, \ - _orig_container_listing_limit, _test_coros, _orig_SysLogHandler + _orig_container_listing_limit, _test_coros, _orig_SysLogHandler, \ + _orig_POLICIES, _test_POLICIES + _orig_POLICIES = storage_policy._POLICIES _orig_SysLogHandler = utils.SysLogHandler utils.SysLogHandler = mock.MagicMock() monkey_patch_mimetools() @@ -109,14 +115,31 @@ def do_setup(the_object_server): {'port': con2lis.getsockname()[1]}, ] write_fake_ring(container_ring_path, *container_devs) - object_ring_path = os.path.join(_testdir, 'object.ring.gz') - object_devs = [ - {'port': obj1lis.getsockname()[1]}, - {'port': obj2lis.getsockname()[1]}, - ] - write_fake_ring(object_ring_path, *object_devs) + storage_policy._POLICIES = StoragePolicyCollection([ + StoragePolicy(0, 'zero', True), + StoragePolicy(1, 'one', False), + StoragePolicy(2, 'two', False)]) + obj_rings = { + 0: ('sda1', 'sdb1'), + 1: ('sdc1', 'sdd1'), + 2: ('sde1', 'sdf1'), + } + for policy_index, devices in obj_rings.items(): + policy = POLICIES[policy_index] + dev1, dev2 = devices + obj_ring_path = os.path.join(_testdir, policy.ring_name + '.ring.gz') + obj_devs = [ + {'port': obj1lis.getsockname()[1], 'device': dev1}, + {'port': obj2lis.getsockname()[1], 'device': dev2}, + ] + write_fake_ring(obj_ring_path, *obj_devs) prosrv = proxy_server.Application(conf, FakeMemcacheReturnsNone(), logger=debug_logger('proxy')) + for policy in POLICIES: + # make sure all the rings are loaded + prosrv.get_object_ring(policy.idx) + # don't lose this one!
+ _test_POLICIES = storage_policy._POLICIES acc1srv = account_server.AccountController( conf, logger=debug_logger('acct1')) acc2srv = account_server.AccountController( @@ -155,7 +178,7 @@ def do_setup(the_object_server): 'x-trans-id': 'test'}) resp = conn.getresponse() assert(resp.status == 201) - # Create container + # Create containers, 1 per test policy sock = connect_tcp(('localhost', prolis.getsockname()[1])) fd = sock.makefile() fd.write('PUT /v1/a/c HTTP/1.1\r\nHost: localhost\r\n' @@ -167,6 +190,45 @@ def do_setup(the_object_server): assert headers[:len(exp)] == exp, "Expected '%s', encountered '%s'" % ( exp, headers[:len(exp)]) + sock = connect_tcp(('localhost', prolis.getsockname()[1])) + fd = sock.makefile() + fd.write( + 'PUT /v1/a/c1 HTTP/1.1\r\nHost: localhost\r\n' + 'Connection: close\r\nX-Auth-Token: t\r\nX-Storage-Policy: one\r\n' + 'Content-Length: 0\r\n\r\n') + fd.flush() + headers = readuntil2crlfs(fd) + exp = 'HTTP/1.1 201' + assert headers[:len(exp)] == exp, \ + "Expected '%s', encountered '%s'" % (exp, headers[:len(exp)]) + + sock = connect_tcp(('localhost', prolis.getsockname()[1])) + fd = sock.makefile() + fd.write( + 'PUT /v1/a/c2 HTTP/1.1\r\nHost: localhost\r\n' + 'Connection: close\r\nX-Auth-Token: t\r\nX-Storage-Policy: two\r\n' + 'Content-Length: 0\r\n\r\n') + fd.flush() + headers = readuntil2crlfs(fd) + exp = 'HTTP/1.1 201' + assert headers[:len(exp)] == exp, \ + "Expected '%s', encountered '%s'" % (exp, headers[:len(exp)]) + + +def unpatch_policies(f): + """ + This will unset a TestCase level patch_policies to use the module level + policies setup for the _test_servers instead. + + N.B. You should NEVER modify the _test_server policies or rings during a + test because they persist for the life of the entire module! + """ + @functools.wraps(f) + def wrapper(*args, **kwargs): + with patch_policies(_test_POLICIES): + return f(*args, **kwargs) + return wrapper + def setup(): do_setup(object_server) @@ -177,6 +239,7 @@ def teardown(): server.kill() rmtree(os.path.dirname(_testdir)) utils.SysLogHandler = _orig_SysLogHandler + storage_policy._POLICIES = _orig_POLICIES def sortHeaderNames(headerNames): @@ -230,11 +293,9 @@ class TestController(unittest.TestCase): self.account_ring = FakeRing() self.container_ring = FakeRing() self.memcache = FakeMemcache() - app = proxy_server.Application(None, self.memcache, account_ring=self.account_ring, - container_ring=self.container_ring, - object_ring=FakeRing()) + container_ring=self.container_ring) self.controller = swift.proxy.controllers.Controller(app) class FakeReq(object): @@ -488,8 +549,34 @@ class TestController(unittest.TestCase): test(503, 503, 503) +@patch_policies([StoragePolicy(0, 'zero', True, object_ring=FakeRing())]) class TestProxyServer(unittest.TestCase): + def test_get_object_ring(self): + baseapp = proxy_server.Application({}, + FakeMemcache(), + container_ring=FakeRing(), + account_ring=FakeRing()) + with patch_policies([ + StoragePolicy(0, 'a', False, object_ring=123), + StoragePolicy(1, 'b', True, object_ring=456), + StoragePolicy(2, 'd', False, object_ring=789) + ]): + # None means legacy so always use policy 0 + ring = baseapp.get_object_ring(None) + self.assertEqual(ring, 123) + ring = baseapp.get_object_ring('') + self.assertEqual(ring, 123) + ring = baseapp.get_object_ring('0') + self.assertEqual(ring, 123) + ring = baseapp.get_object_ring('1') + self.assertEqual(ring, 456) + ring = baseapp.get_object_ring('2') + self.assertEqual(ring, 789) + # illegal values + self.assertRaises(ValueError, 
baseapp.get_object_ring, '99') + self.assertRaises(ValueError, baseapp.get_object_ring, 'asdf') + def test_unhandled_exception(self): class MyApp(proxy_server.Application): @@ -498,7 +585,7 @@ raise Exception('this shouldnt be caught') app = MyApp(None, FakeMemcache(), account_ring=FakeRing(), - container_ring=FakeRing(), object_ring=FakeRing()) + container_ring=FakeRing()) req = Request.blank('/v1/account', environ={'REQUEST_METHOD': 'HEAD'}) app.update_request(req) resp = app.handle_request(req) @@ -508,7 +595,6 @@ baseapp = proxy_server.Application({}, FakeMemcache(), container_ring=FakeRing(), - object_ring=FakeRing(), account_ring=FakeRing()) resp = baseapp.handle_request( Request.blank('/v1/a', environ={'REQUEST_METHOD': '__init__'})) @@ -518,8 +604,7 @@ baseapp = proxy_server.Application({}, FakeMemcache(), container_ring=FakeRing(), - account_ring=FakeRing(), - object_ring=FakeRing()) + account_ring=FakeRing()) resp = baseapp.handle_request( Request.blank('/v1/a', environ={'REQUEST_METHOD': '!invalid'})) self.assertEquals(resp.status, '405 Method Not Allowed') @@ -533,8 +618,7 @@ set_http_connect(200) app = proxy_server.Application(None, FakeMemcache(), account_ring=FakeRing(), - container_ring=FakeRing(), - object_ring=FakeRing()) + container_ring=FakeRing()) req = Request.blank('/v1/a') req.environ['swift.authorize'] = authorize app.update_request(req) @@ -549,8 +633,7 @@ return HTTPUnauthorized(request=req) app = proxy_server.Application(None, FakeMemcache(), account_ring=FakeRing(), - container_ring=FakeRing(), - object_ring=FakeRing()) + container_ring=FakeRing()) req = Request.blank('/v1/a') req.environ['swift.authorize'] = authorize app.update_request(req) @@ -562,8 +645,7 @@ try: baseapp = proxy_server.Application({'swift_dir': swift_dir}, FakeMemcache(), FakeLogger(), - FakeRing(), FakeRing(), - FakeRing()) + FakeRing(), FakeRing()) resp = baseapp.handle_request( Request.blank('/', environ={'CONTENT_LENGTH': '-1'})) self.assertEquals(resp.status, '400 Bad Request') @@ -581,8 +663,8 @@ logger = FakeLogger() baseapp = proxy_server.Application({'swift_dir': swift_dir}, FakeMemcache(), logger, - FakeRing(), FakeRing(), - FakeRing()) + container_ring=FakeRing(), + account_ring=FakeRing()) baseapp.handle_request( Request.blank('/info', environ={'HTTP_X_TRANS_ID_EXTRA': 'sardine', @@ -601,8 +683,8 @@ logger = FakeLogger() baseapp = proxy_server.Application({'swift_dir': swift_dir}, FakeMemcache(), logger, - FakeRing(), FakeRing(), - FakeRing()) + container_ring=FakeRing(), + account_ring=FakeRing()) baseapp.handle_request( Request.blank('/info', environ={'HTTP_X_TRANS_ID_EXTRA': 'a' * 1000, @@ -618,9 +700,9 @@ baseapp = proxy_server.Application({'swift_dir': swift_dir, 'deny_host_headers': 'invalid_host.com'}, - FakeMemcache(), FakeLogger(), - FakeRing(), FakeRing(), - FakeRing()) + FakeMemcache(), + container_ring=FakeRing(), + account_ring=FakeRing()) resp = baseapp.handle_request( Request.blank('/v1/a/c/o', environ={'HTTP_HOST': 'invalid_host.com'})) @@ -632,7 +714,6 @@ baseapp = proxy_server.Application({'sorting_method': 'timing'}, FakeMemcache(),
container_ring=FakeRing(), - object_ring=FakeRing(), account_ring=FakeRing()) self.assertEquals(baseapp.node_timings, {}) @@ -661,7 +742,6 @@ class TestProxyServer(unittest.TestCase): 'read_affinity': 'r1=1'}, FakeMemcache(), container_ring=FakeRing(), - object_ring=FakeRing(), account_ring=FakeRing()) nodes = [{'region': 2, 'zone': 1, 'ip': '127.0.0.1'}, @@ -675,8 +755,7 @@ class TestProxyServer(unittest.TestCase): def test_info_defaults(self): app = proxy_server.Application({}, FakeMemcache(), account_ring=FakeRing(), - container_ring=FakeRing(), - object_ring=FakeRing()) + container_ring=FakeRing()) self.assertTrue(app.expose_info) self.assertTrue(isinstance(app.disallowed_sections, list)) @@ -687,8 +766,7 @@ class TestProxyServer(unittest.TestCase): path = '/info' app = proxy_server.Application({}, FakeMemcache(), account_ring=FakeRing(), - container_ring=FakeRing(), - object_ring=FakeRing()) + container_ring=FakeRing()) controller, path_parts = app.get_controller(path) @@ -701,19 +779,122 @@ class TestProxyServer(unittest.TestCase): self.assertEqual(controller.__name__, 'InfoController') +@patch_policies([ + StoragePolicy(0, 'zero', is_default=True), + StoragePolicy(1, 'one'), +]) +class TestProxyServerLoading(unittest.TestCase): + + def setUp(self): + self._orig_hash_suffix = utils.HASH_PATH_SUFFIX + utils.HASH_PATH_SUFFIX = 'endcap' + self.tempdir = mkdtemp() + + def tearDown(self): + rmtree(self.tempdir) + utils.HASH_PATH_SUFFIX = self._orig_hash_suffix + for policy in POLICIES: + policy.object_ring = None + + def test_load_policy_rings(self): + for policy in POLICIES: + self.assertFalse(policy.object_ring) + conf_path = os.path.join(self.tempdir, 'proxy-server.conf') + conf_body = """ + [DEFAULT] + swift_dir = %s + + [pipeline:main] + pipeline = catch_errors cache proxy-server + + [app:proxy-server] + use = egg:swift#proxy + + [filter:cache] + use = egg:swift#memcache + + [filter:catch_errors] + use = egg:swift#catch_errors + """ % self.tempdir + with open(conf_path, 'w') as f: + f.write(dedent(conf_body)) + account_ring_path = os.path.join(self.tempdir, 'account.ring.gz') + write_fake_ring(account_ring_path) + container_ring_path = os.path.join(self.tempdir, 'container.ring.gz') + write_fake_ring(container_ring_path) + for policy in POLICIES: + object_ring_path = os.path.join(self.tempdir, + policy.ring_name + '.ring.gz') + write_fake_ring(object_ring_path) + app = loadapp(conf_path) + # find the end of the pipeline + while hasattr(app, 'app'): + app = app.app + + # validate loaded rings + self.assertEqual(app.account_ring.serialized_path, + account_ring_path) + self.assertEqual(app.container_ring.serialized_path, + container_ring_path) + for policy in POLICIES: + self.assertEqual(policy.object_ring, + app.get_object_ring(int(policy))) + + def test_missing_rings(self): + conf_path = os.path.join(self.tempdir, 'proxy-server.conf') + conf_body = """ + [DEFAULT] + swift_dir = %s + + [pipeline:main] + pipeline = catch_errors cache proxy-server + + [app:proxy-server] + use = egg:swift#proxy + + [filter:cache] + use = egg:swift#memcache + + [filter:catch_errors] + use = egg:swift#catch_errors + """ % self.tempdir + with open(conf_path, 'w') as f: + f.write(dedent(conf_body)) + ring_paths = [ + os.path.join(self.tempdir, 'account.ring.gz'), + os.path.join(self.tempdir, 'container.ring.gz'), + ] + for policy in POLICIES: + self.assertFalse(policy.object_ring) + object_ring_path = os.path.join(self.tempdir, + policy.ring_name + '.ring.gz') + ring_paths.append(object_ring_path) + for 
policy in POLICIES: + self.assertFalse(policy.object_ring) + for ring_path in ring_paths: + self.assertFalse(os.path.exists(ring_path)) + self.assertRaises(IOError, loadapp, conf_path) + write_fake_ring(ring_path) + # all rings exist, app should load + loadapp(conf_path) + for policy in POLICIES: + self.assert_(policy.object_ring) + + +@patch_policies([StoragePolicy(0, 'zero', True, object_ring=FakeRing())]) class TestObjectController(unittest.TestCase): def setUp(self): self.app = proxy_server.Application(None, FakeMemcache(), logger=debug_logger('proxy-ut'), account_ring=FakeRing(), - container_ring=FakeRing(), - object_ring=FakeRing()) + container_ring=FakeRing()) def tearDown(self): self.app.account_ring.set_replicas(3) self.app.container_ring.set_replicas(3) - self.app.object_ring.set_replicas(3) + for policy in POLICIES: + policy.object_ring = FakeRing() def assert_status_map(self, method, statuses, expected, raise_exc=False): with save_globals(): @@ -740,6 +921,108 @@ class TestObjectController(unittest.TestCase): res = method(req) self.assertEquals(res.status_int, expected) + @unpatch_policies + def test_policy_IO(self): + if hasattr(_test_servers[-1], '_filesystem'): + # ironically, the _filesystem attribute on the object server means + # the in-memory diskfile is in use, so this test does not apply + return + + def check_file(policy_idx, cont, devs, check_val): + partition, nodes = prosrv.get_object_ring(policy_idx).get_nodes( + 'a', cont, 'o') + conf = {'devices': _testdir, 'mount_check': 'false'} + df_mgr = diskfile.DiskFileManager(conf, FakeLogger()) + for dev in devs: + file = df_mgr.get_diskfile(dev, partition, 'a', + cont, 'o', + policy_idx=policy_idx) + if check_val is True: + file.open() + + prolis = _test_sockets[0] + prosrv = _test_servers[0] + + # check policy 0: put file on c, read it back, check loc on disk + sock = connect_tcp(('localhost', prolis.getsockname()[1])) + fd = sock.makefile() + obj = 'test_object0' + path = '/v1/a/c/o' + fd.write('PUT %s HTTP/1.1\r\n' + 'Host: localhost\r\n' + 'Connection: close\r\n' + 'X-Storage-Token: t\r\n' + 'Content-Length: %s\r\n' + 'Content-Type: text/plain\r\n' + '\r\n%s' % (path, str(len(obj)), obj)) + fd.flush() + headers = readuntil2crlfs(fd) + exp = 'HTTP/1.1 201' + self.assertEqual(headers[:len(exp)], exp) + req = Request.blank(path, + environ={'REQUEST_METHOD': 'GET'}, + headers={'Content-Type': + 'text/plain'}) + res = req.get_response(prosrv) + self.assertEqual(res.status_int, 200) + self.assertEqual(res.body, obj) + + check_file(0, 'c', ['sda1', 'sdb1'], True) + check_file(0, 'c', ['sdc1', 'sdd1', 'sde1', 'sdf1'], False) + + # check policy 1: put file on c1, read it back, check loc on disk + sock = connect_tcp(('localhost', prolis.getsockname()[1])) + fd = sock.makefile() + path = '/v1/a/c1/o' + obj = 'test_object1' + fd.write('PUT %s HTTP/1.1\r\n' + 'Host: localhost\r\n' + 'Connection: close\r\n' + 'X-Storage-Token: t\r\n' + 'Content-Length: %s\r\n' + 'Content-Type: text/plain\r\n' + '\r\n%s' % (path, str(len(obj)), obj)) + fd.flush() + headers = readuntil2crlfs(fd) + self.assertEqual(headers[:len(exp)], exp) + req = Request.blank(path, + environ={'REQUEST_METHOD': 'GET'}, + headers={'Content-Type': + 'text/plain'}) + res = req.get_response(prosrv) + self.assertEqual(res.status_int, 200) + self.assertEqual(res.body, obj) + + check_file(1, 'c1', ['sdc1', 'sdd1'], True) + check_file(1, 'c1', ['sda1', 'sdb1', 'sde1', 'sdf1'], False) + + # check policy 2: put file on c2, read it back, check loc on disk + sock = 
connect_tcp(('localhost', prolis.getsockname()[1])) + fd = sock.makefile() + path = '/v1/a/c2/o' + obj = 'test_object2' + fd.write('PUT %s HTTP/1.1\r\n' + 'Host: localhost\r\n' + 'Connection: close\r\n' + 'X-Storage-Token: t\r\n' + 'Content-Length: %s\r\n' + 'Content-Type: text/plain\r\n' + '\r\n%s' % (path, str(len(obj)), obj)) + fd.flush() + headers = readuntil2crlfs(fd) + self.assertEqual(headers[:len(exp)], exp) + req = Request.blank(path, + environ={'REQUEST_METHOD': 'GET'}, + headers={'Content-Type': + 'text/plain'}) + res = req.get_response(prosrv) + self.assertEqual(res.status_int, 200) + self.assertEqual(res.body, obj) + + check_file(2, 'c2', ['sde1', 'sdf1'], True) + check_file(2, 'c2', ['sda1', 'sdb1', 'sdc1', 'sdd1'], False) + + @unpatch_policies def test_GET_newest_large_file(self): prolis = _test_sockets[0] prosrv = _test_servers[0] @@ -830,7 +1113,8 @@ class TestObjectController(unittest.TestCase): def is_r0(node): return node['region'] == 0 - self.app.object_ring.max_more_nodes = 100 + object_ring = self.app.get_object_ring(None) + object_ring.max_more_nodes = 100 self.app.write_affinity_is_local_fn = is_r0 self.app.write_affinity_node_count = lambda r: 3 @@ -864,14 +1148,15 @@ class TestObjectController(unittest.TestCase): def is_r0(node): return node['region'] == 0 - self.app.object_ring.max_more_nodes = 100 + object_ring = self.app.get_object_ring(None) + object_ring.max_more_nodes = 100 self.app.write_affinity_is_local_fn = is_r0 self.app.write_affinity_node_count = lambda r: 3 controller = \ proxy_server.ObjectController(self.app, 'a', 'c', 'o.jpg') self.app.error_limit( - self.app.object_ring.get_part_nodes(1)[0], 'test') + object_ring.get_part_nodes(1)[0], 'test') set_http_connect(200, 200, # account, container 201, 201, 201, # 3 working backends give_connect=test_connect) @@ -890,6 +1175,7 @@ class TestObjectController(unittest.TestCase): self.assertEqual(0, written_to[1][1] % 2) self.assertNotEqual(0, written_to[2][1] % 2) + @unpatch_policies def test_PUT_message_length_using_content_length(self): prolis = _test_sockets[0] sock = connect_tcp(('localhost', prolis.getsockname()[1])) @@ -907,6 +1193,7 @@ class TestObjectController(unittest.TestCase): exp = 'HTTP/1.1 201' self.assertEqual(headers[:len(exp)], exp) + @unpatch_policies def test_PUT_message_length_using_transfer_encoding(self): prolis = _test_sockets[0] sock = connect_tcp(('localhost', prolis.getsockname()[1])) @@ -939,6 +1226,7 @@ class TestObjectController(unittest.TestCase): exp = 'HTTP/1.1 201' self.assertEqual(headers[:len(exp)], exp) + @unpatch_policies def test_PUT_message_length_using_both(self): prolis = _test_sockets[0] sock = connect_tcp(('localhost', prolis.getsockname()[1])) @@ -972,6 +1260,7 @@ class TestObjectController(unittest.TestCase): exp = 'HTTP/1.1 201' self.assertEqual(headers[:len(exp)], exp) + @unpatch_policies def test_PUT_bad_message_length(self): prolis = _test_sockets[0] sock = connect_tcp(('localhost', prolis.getsockname()[1])) @@ -1005,6 +1294,7 @@ class TestObjectController(unittest.TestCase): exp = 'HTTP/1.1 400' self.assertEqual(headers[:len(exp)], exp) + @unpatch_policies def test_PUT_message_length_unsup_xfr_encoding(self): prolis = _test_sockets[0] sock = connect_tcp(('localhost', prolis.getsockname()[1])) @@ -1038,6 +1328,7 @@ class TestObjectController(unittest.TestCase): exp = 'HTTP/1.1 501' self.assertEqual(headers[:len(exp)], exp) + @unpatch_policies def test_PUT_message_length_too_large(self): with mock.patch('swift.common.constraints.MAX_FILE_SIZE', 10): prolis = 
_test_sockets[0] @@ -1055,6 +1346,7 @@ class TestObjectController(unittest.TestCase): exp = 'HTTP/1.1 413' self.assertEqual(headers[:len(exp)], exp) + @unpatch_policies def test_PUT_last_modified(self): prolis = _test_sockets[0] sock = connect_tcp(('localhost', prolis.getsockname()[1])) @@ -1114,7 +1406,7 @@ class TestObjectController(unittest.TestCase): if 'x-if-delete-at' in headers or 'X-If-Delete-At' in headers: test_errors.append('X-If-Delete-At in headers') - body = simplejson.dumps( + body = json.dumps( [{"name": "001o/1", "hash": "x", "bytes": 0, @@ -1177,8 +1469,7 @@ class TestObjectController(unittest.TestCase): fp.write('foo/bar foo\n') proxy_server.Application({'swift_dir': swift_dir}, FakeMemcache(), FakeLogger(), - FakeRing(), FakeRing(), - FakeRing()) + FakeRing(), FakeRing()) self.assertEquals(proxy_server.mimetypes.guess_type('blah.foo')[0], 'foo/bar') self.assertEquals(proxy_server.mimetypes.guess_type('blah.jpg')[0], @@ -1615,8 +1906,9 @@ class TestObjectController(unittest.TestCase): for dev in self.app.container_ring.devs: dev['ip'] = '127.0.0.1' dev['port'] = 1 - self.app.object_ring.get_nodes('account') - for dev in self.app.object_ring.devs: + object_ring = self.app.get_object_ring(None) + object_ring.get_nodes('account') + for dev in object_ring.devs: dev['ip'] = '127.0.0.1' dev['port'] = 1 @@ -1664,8 +1956,9 @@ class TestObjectController(unittest.TestCase): for dev in self.app.container_ring.devs: dev['ip'] = '127.0.0.1' dev['port'] = 1 - self.app.object_ring.get_nodes('account') - for dev in self.app.object_ring.devs: + object_ring = self.app.get_object_ring(None) + object_ring.get_nodes('account') + for dev in object_ring.devs: dev['ip'] = '127.0.0.1' dev['port'] = 1 @@ -1698,8 +1991,9 @@ class TestObjectController(unittest.TestCase): for dev in self.app.container_ring.devs: dev['ip'] = '127.0.0.1' dev['port'] = 1 - self.app.object_ring.get_nodes('account') - for dev in self.app.object_ring.devs: + object_ring = self.app.get_object_ring(None) + object_ring.get_nodes('account') + for dev in object_ring.devs: dev['ip'] = '127.0.0.1' dev['port'] = 1 req = Request.blank('/v1/a/c/o', environ={'REQUEST_METHOD': 'GET'}) @@ -1733,8 +2027,9 @@ class TestObjectController(unittest.TestCase): for dev in self.app.container_ring.devs: dev['ip'] = '127.0.0.1' dev['port'] = 1 - self.app.object_ring.get_nodes('account') - for dev in self.app.object_ring.devs: + object_ring = self.app.get_object_ring(None) + object_ring.get_nodes('account') + for dev in object_ring.devs: dev['ip'] = '127.0.0.1' dev['port'] = 1 req = Request.blank('/v1/a/c/o', environ={'REQUEST_METHOD': 'GET'}) @@ -1801,8 +2096,9 @@ class TestObjectController(unittest.TestCase): for dev in self.app.container_ring.devs: dev['ip'] = '127.0.0.1' dev['port'] = 1 - self.app.object_ring.get_nodes('account') - for dev in self.app.object_ring.devs: + object_ring = self.app.get_object_ring(None) + object_ring.get_nodes('account') + for dev in object_ring.devs: dev['ip'] = '127.0.0.1' dev['port'] = 1 req = Request.blank('/v1/a/c/o', @@ -1829,42 +2125,42 @@ class TestObjectController(unittest.TestCase): baseapp = proxy_server.Application({'request_node_count': '3'}, FakeMemcache(), container_ring=FakeRing(), - account_ring=FakeRing(), - object_ring=FakeRing()) + account_ring=FakeRing()) self.assertEquals(baseapp.request_node_count(3), 3) def test_iter_nodes(self): with save_globals(): try: - self.app.object_ring.max_more_nodes = 2 - partition, nodes = self.app.object_ring.get_nodes('account', - 'container', - 'object') + 
object_ring = self.app.get_object_ring(None) + object_ring.max_more_nodes = 2 + partition, nodes = object_ring.get_nodes('account', + 'container', + 'object') collected_nodes = [] - for node in self.app.iter_nodes(self.app.object_ring, + for node in self.app.iter_nodes(object_ring, partition): collected_nodes.append(node) self.assertEquals(len(collected_nodes), 5) - self.app.object_ring.max_more_nodes = 20 + object_ring.max_more_nodes = 20 self.app.request_node_count = lambda r: 20 - partition, nodes = self.app.object_ring.get_nodes('account', - 'container', - 'object') + partition, nodes = object_ring.get_nodes('account', + 'container', + 'object') collected_nodes = [] - for node in self.app.iter_nodes(self.app.object_ring, + for node in self.app.iter_nodes(object_ring, partition): collected_nodes.append(node) self.assertEquals(len(collected_nodes), 9) self.app.log_handoffs = True self.app.logger = FakeLogger() - self.app.object_ring.max_more_nodes = 2 - partition, nodes = self.app.object_ring.get_nodes('account', - 'container', - 'object') + object_ring.max_more_nodes = 2 + partition, nodes = object_ring.get_nodes('account', + 'container', + 'object') collected_nodes = [] - for node in self.app.iter_nodes(self.app.object_ring, + for node in self.app.iter_nodes(object_ring, partition): collected_nodes.append(node) self.assertEquals(len(collected_nodes), 5) @@ -1875,45 +2171,48 @@ class TestObjectController(unittest.TestCase): self.app.log_handoffs = False self.app.logger = FakeLogger() - self.app.object_ring.max_more_nodes = 2 - partition, nodes = self.app.object_ring.get_nodes('account', - 'container', - 'object') + object_ring.max_more_nodes = 2 + partition, nodes = object_ring.get_nodes('account', + 'container', + 'object') collected_nodes = [] - for node in self.app.iter_nodes(self.app.object_ring, + for node in self.app.iter_nodes(object_ring, partition): collected_nodes.append(node) self.assertEquals(len(collected_nodes), 5) self.assertEquals(self.app.logger.log_dict['warning'], []) finally: - self.app.object_ring.max_more_nodes = 0 + object_ring.max_more_nodes = 0 def test_iter_nodes_calls_sort_nodes(self): with mock.patch.object(self.app, 'sort_nodes') as sort_nodes: - for node in self.app.iter_nodes(self.app.object_ring, 0): + object_ring = self.app.get_object_ring(None) + for node in self.app.iter_nodes(object_ring, 0): pass sort_nodes.assert_called_once_with( - self.app.object_ring.get_part_nodes(0)) + object_ring.get_part_nodes(0)) def test_iter_nodes_skips_error_limited(self): with mock.patch.object(self.app, 'sort_nodes', lambda n: n): - first_nodes = list(self.app.iter_nodes(self.app.object_ring, 0)) - second_nodes = list(self.app.iter_nodes(self.app.object_ring, 0)) + object_ring = self.app.get_object_ring(None) + first_nodes = list(self.app.iter_nodes(object_ring, 0)) + second_nodes = list(self.app.iter_nodes(object_ring, 0)) self.assertTrue(first_nodes[0] in second_nodes) self.app.error_limit(first_nodes[0], 'test') - second_nodes = list(self.app.iter_nodes(self.app.object_ring, 0)) + second_nodes = list(self.app.iter_nodes(object_ring, 0)) self.assertTrue(first_nodes[0] not in second_nodes) def test_iter_nodes_gives_extra_if_error_limited_inline(self): + object_ring = self.app.get_object_ring(None) with nested( mock.patch.object(self.app, 'sort_nodes', lambda n: n), mock.patch.object(self.app, 'request_node_count', lambda r: 6), - mock.patch.object(self.app.object_ring, 'max_more_nodes', 99)): - first_nodes = list(self.app.iter_nodes(self.app.object_ring, 0)) + 
mock.patch.object(object_ring, 'max_more_nodes', 99)): + first_nodes = list(self.app.iter_nodes(object_ring, 0)) second_nodes = [] - for node in self.app.iter_nodes(self.app.object_ring, 0): + for node in self.app.iter_nodes(object_ring, 0): if not second_nodes: self.app.error_limit(node, 'test') second_nodes.append(node) @@ -1921,12 +2220,13 @@ class TestObjectController(unittest.TestCase): self.assertEquals(len(second_nodes), 7) def test_iter_nodes_with_custom_node_iter(self): + object_ring = self.app.get_object_ring(None) node_list = [dict(id=n) for n in xrange(10)] with nested( mock.patch.object(self.app, 'sort_nodes', lambda n: n), mock.patch.object(self.app, 'request_node_count', lambda r: 3)): - got_nodes = list(self.app.iter_nodes(self.app.object_ring, 0, + got_nodes = list(self.app.iter_nodes(object_ring, 0, node_iter=iter(node_list))) self.assertEqual(node_list[:3], got_nodes) @@ -1934,7 +2234,7 @@ class TestObjectController(unittest.TestCase): mock.patch.object(self.app, 'sort_nodes', lambda n: n), mock.patch.object(self.app, 'request_node_count', lambda r: 1000000)): - got_nodes = list(self.app.iter_nodes(self.app.object_ring, 0, + got_nodes = list(self.app.iter_nodes(object_ring, 0, node_iter=iter(node_list))) self.assertEqual(node_list, got_nodes) @@ -1999,18 +2299,19 @@ class TestObjectController(unittest.TestCase): controller = proxy_server.ObjectController(self.app, 'account', 'container', 'object') controller.app.sort_nodes = lambda l: l + object_ring = controller.app.get_object_ring(None) self.assert_status_map(controller.HEAD, (200, 200, 503, 200, 200), 200) - self.assertEquals(controller.app.object_ring.devs[0]['errors'], 2) - self.assert_('last_error' in controller.app.object_ring.devs[0]) + self.assertEquals(object_ring.devs[0]['errors'], 2) + self.assert_('last_error' in object_ring.devs[0]) for _junk in xrange(self.app.error_suppression_limit): self.assert_status_map(controller.HEAD, (200, 200, 503, 503, 503), 503) - self.assertEquals(controller.app.object_ring.devs[0]['errors'], + self.assertEquals(object_ring.devs[0]['errors'], self.app.error_suppression_limit + 1) self.assert_status_map(controller.HEAD, (200, 200, 200, 200, 200), 503) - self.assert_('last_error' in controller.app.object_ring.devs[0]) + self.assert_('last_error' in object_ring.devs[0]) self.assert_status_map(controller.PUT, (200, 200, 200, 201, 201, 201), 503) self.assert_status_map(controller.POST, @@ -2669,6 +2970,7 @@ class TestObjectController(unittest.TestCase): res = controller.PUT(req) self.assertEquals(res.status_int, 413) + @unpatch_policies def test_chunked_put_bad_version(self): # Check bad version (prolis, acc1lis, acc2lis, con1lis, con2lis, obj1lis, @@ -2682,6 +2984,7 @@ class TestObjectController(unittest.TestCase): exp = 'HTTP/1.1 412' self.assertEquals(headers[:len(exp)], exp) + @unpatch_policies def test_chunked_put_bad_path(self): # Check bad path (prolis, acc1lis, acc2lis, con1lis, con2lis, obj1lis, @@ -2695,6 +2998,7 @@ class TestObjectController(unittest.TestCase): exp = 'HTTP/1.1 404' self.assertEquals(headers[:len(exp)], exp) + @unpatch_policies def test_chunked_put_bad_utf8(self): # Check invalid utf-8 (prolis, acc1lis, acc2lis, con1lis, con2lis, obj1lis, @@ -2709,6 +3013,7 @@ class TestObjectController(unittest.TestCase): exp = 'HTTP/1.1 412' self.assertEquals(headers[:len(exp)], exp) + @unpatch_policies def test_chunked_put_bad_path_no_controller(self): # Check bad path, no controller (prolis, acc1lis, acc2lis, con1lis, con2lis, obj1lis, @@ -2723,6 +3028,7 @@ class 
TestObjectController(unittest.TestCase): exp = 'HTTP/1.1 412' self.assertEquals(headers[:len(exp)], exp) + @unpatch_policies def test_chunked_put_bad_method(self): # Check bad method (prolis, acc1lis, acc2lis, con1lis, con2lis, obj1lis, @@ -2737,6 +3043,7 @@ class TestObjectController(unittest.TestCase): exp = 'HTTP/1.1 405' self.assertEquals(headers[:len(exp)], exp) + @unpatch_policies def test_chunked_put_unhandled_exception(self): # Check unhandled exception (prosrv, acc1srv, acc2srv, con1srv, con2srv, obj1srv, @@ -2760,6 +3067,7 @@ class TestObjectController(unittest.TestCase): self.assertEquals(headers[:len(exp)], exp) prosrv.update_request = orig_update_request + @unpatch_policies def test_chunked_put_head_account(self): # Head account, just a double check and really is here to test # the part Application.log_request that 'enforces' a @@ -2777,6 +3085,7 @@ class TestObjectController(unittest.TestCase): self.assertEquals(headers[:len(exp)], exp) self.assert_('\r\nContent-Length: 0\r\n' in headers) + @unpatch_policies def test_chunked_put_utf8_all_the_way_down(self): # Test UTF-8 Unicode all the way through the system ustr = '\xe1\xbc\xb8\xce\xbf\xe1\xbd\xba \xe1\xbc\xb0\xce' \ @@ -2820,7 +3129,7 @@ class TestObjectController(unittest.TestCase): headers = readuntil2crlfs(fd) exp = 'HTTP/1.1 200' self.assertEquals(headers[:len(exp)], exp) - listing = simplejson.loads(fd.read()) + listing = json.loads(fd.read()) self.assert_(ustr.decode('utf8') in [l['name'] for l in listing]) # List account with ustr container (test xml) sock = connect_tcp(('localhost', prolis.getsockname()[1])) @@ -2868,7 +3177,7 @@ class TestObjectController(unittest.TestCase): headers = readuntil2crlfs(fd) exp = 'HTTP/1.1 200' self.assertEquals(headers[:len(exp)], exp) - listing = simplejson.loads(fd.read()) + listing = json.loads(fd.read()) self.assertEquals(listing[0]['name'], ustr.decode('utf8')) # List ustr container with ustr object (test xml) sock = connect_tcp(('localhost', prolis.getsockname()[1])) @@ -2896,6 +3205,7 @@ class TestObjectController(unittest.TestCase): self.assert_('\r\nX-Object-Meta-%s: %s\r\n' % (quote(ustr_short).lower(), quote(ustr)) in headers) + @unpatch_policies def test_chunked_put_chunked_put(self): # Do chunked object put (prolis, acc1lis, acc2lis, con1lis, con2lis, obj1lis, @@ -2925,6 +3235,7 @@ class TestObjectController(unittest.TestCase): body = fd.read() self.assertEquals(body, 'oh hai123456789abcdef') + @unpatch_policies def test_version_manifest(self, oc='versions', vc='vers', o='name'): versions_to_create = 3 # Create a container for our versioned object testing @@ -3258,48 +3569,56 @@ class TestObjectController(unittest.TestCase): exp = 'HTTP/1.1 2' # 2xx response self.assertEquals(headers[:len(exp)], exp) + @unpatch_policies def test_version_manifest_utf8(self): oc = '0_oc_non_ascii\xc2\xa3' vc = '0_vc_non_ascii\xc2\xa3' o = '0_o_non_ascii\xc2\xa3' self.test_version_manifest(oc, vc, o) + @unpatch_policies def test_version_manifest_utf8_container(self): oc = '1_oc_non_ascii\xc2\xa3' vc = '1_vc_ascii' o = '1_o_ascii' self.test_version_manifest(oc, vc, o) + @unpatch_policies def test_version_manifest_utf8_version_container(self): oc = '2_oc_ascii' vc = '2_vc_non_ascii\xc2\xa3' o = '2_o_ascii' self.test_version_manifest(oc, vc, o) + @unpatch_policies def test_version_manifest_utf8_containers(self): oc = '3_oc_non_ascii\xc2\xa3' vc = '3_vc_non_ascii\xc2\xa3' o = '3_o_ascii' self.test_version_manifest(oc, vc, o) + @unpatch_policies def test_version_manifest_utf8_object(self): oc = 
'4_oc_ascii' vc = '4_vc_ascii' o = '4_o_non_ascii\xc2\xa3' self.test_version_manifest(oc, vc, o) + @unpatch_policies def test_version_manifest_utf8_version_container_utf_object(self): oc = '5_oc_ascii' vc = '5_vc_non_ascii\xc2\xa3' o = '5_o_non_ascii\xc2\xa3' self.test_version_manifest(oc, vc, o) + @unpatch_policies def test_version_manifest_utf8_container_utf_object(self): oc = '6_oc_non_ascii\xc2\xa3' vc = '6_vc_ascii' o = '6_o_non_ascii\xc2\xa3' self.test_version_manifest(oc, vc, o) + @unpatch_policies def test_conditional_range_get(self): (prolis, acc1lis, acc2lis, con1lis, con2lis, obj1lis, obj2lis) = \ _test_sockets @@ -3711,6 +4030,123 @@ class TestObjectController(unittest.TestCase): self.assertEquals(resp.status_int, 400) self.assertTrue('X-Delete-At in past' in resp.body) + @patch_policies([ + StoragePolicy(0, 'zero', False, object_ring=FakeRing()), + StoragePolicy(1, 'one', True, object_ring=FakeRing()) + ]) + def test_PUT_versioning_with_nonzero_default_policy(self): + + def test_connect(ipaddr, port, device, partition, method, path, + headers=None, query_string=None): + if method == "HEAD": + self.assertEquals(path, '/a/c/o.jpg') + self.assertNotEquals(None, headers[POLICY_INDEX]) + self.assertEquals(1, int(headers[POLICY_INDEX])) + + def fake_container_info(account, container, req): + return {'status': 200, 'sync_key': None, 'storage_policy': '1', + 'meta': {}, 'cors': {'allow_origin': None, + 'expose_headers': None, + 'max_age': None}, + 'sysmeta': {}, 'read_acl': None, 'object_count': None, + 'write_acl': None, 'versions': 'c-versions', + 'partition': 1, 'bytes': None, + 'nodes': [{'zone': 0, 'ip': '10.0.0.0', 'region': 0, + 'id': 0, 'device': 'sda', 'port': 1000}, + {'zone': 1, 'ip': '10.0.0.1', 'region': 1, + 'id': 1, 'device': 'sdb', 'port': 1001}, + {'zone': 2, 'ip': '10.0.0.2', 'region': 0, + 'id': 2, 'device': 'sdc', 'port': 1002}]} + with save_globals(): + controller = proxy_server.ObjectController(self.app, 'a', + 'c', 'o.jpg') + + controller.container_info = fake_container_info + set_http_connect(200, 200, 200, # head: for the last version + 200, 200, 200, # get: for the last version + 201, 201, 201, # put: move the current version + 201, 201, 201, # put: save the new version + give_connect=test_connect) + req = Request.blank('/v1/a/c/o.jpg', + environ={'REQUEST_METHOD': 'PUT'}, + headers={'Content-Length': '0'}) + self.app.update_request(req) + self.app.memcache.store = {} + res = controller.PUT(req) + self.assertEquals(201, res.status_int) + + @patch_policies([ + StoragePolicy(0, 'zero', False, object_ring=FakeRing()), + StoragePolicy(1, 'one', True, object_ring=FakeRing()) + ]) + def test_cross_policy_DELETE_versioning(self): + requests = [] + + def capture_requests(ipaddr, port, device, partition, method, path, + headers=None, query_string=None): + requests.append((method, path, headers)) + + def fake_container_info(app, env, account, container, **kwargs): + info = {'status': 200, 'sync_key': None, 'storage_policy': None, + 'meta': {}, 'cors': {'allow_origin': None, + 'expose_headers': None, + 'max_age': None}, + 'sysmeta': {}, 'read_acl': None, 'object_count': None, + 'write_acl': None, 'versions': None, + 'partition': 1, 'bytes': None, + 'nodes': [{'zone': 0, 'ip': '10.0.0.0', 'region': 0, + 'id': 0, 'device': 'sda', 'port': 1000}, + {'zone': 1, 'ip': '10.0.0.1', 'region': 1, + 'id': 1, 'device': 'sdb', 'port': 1001}, + {'zone': 2, 'ip': '10.0.0.2', 'region': 0, + 'id': 2, 'device': 'sdc', 'port': 1002}]} + if container == 'c': + info['storage_policy'] = '1' 
+ info['versions'] = 'c-versions' + elif container == 'c-versions': + info['storage_policy'] = '0' + else: + self.fail('Unexpected call to get_info for %r' % container) + return info + container_listing = json.dumps([{'name': 'old_version'}]) + with save_globals(): + resp_status = ( + 200, 200, # listings for versions container + 200, 200, 200, # get: for the last version + 201, 201, 201, # put: move the last version + 200, 200, 200, # delete: for the last version + ) + body_iter = iter([container_listing] + [ + '' for x in range(len(resp_status) - 1)]) + set_http_connect(*resp_status, body_iter=body_iter, + give_connect=capture_requests) + req = Request.blank('/v1/a/c/current_version', method='DELETE') + self.app.update_request(req) + self.app.memcache.store = {} + with mock.patch('swift.proxy.controllers.base.get_info', + fake_container_info): + resp = self.app.handle_request(req) + self.assertEquals(200, resp.status_int) + expected = [('GET', '/a/c-versions')] * 2 + \ + [('GET', '/a/c-versions/old_version')] * 3 + \ + [('PUT', '/a/c/current_version')] * 3 + \ + [('DELETE', '/a/c-versions/old_version')] * 3 + self.assertEqual(expected, [(m, p) for m, p, h in requests]) + for method, path, headers in requests: + if 'current_version' in path: + expected_storage_policy = 1 + elif 'old_version' in path: + expected_storage_policy = 0 + else: + continue + storage_policy_index = int(headers[POLICY_INDEX]) + self.assertEqual( + expected_storage_policy, storage_policy_index, + 'Unexpected %s request for %s ' + 'with storage policy index %s' % ( + method, path, storage_policy_index)) + + @unpatch_policies def test_leak_1(self): _request_instances = weakref.WeakKeyDictionary() _orig_init = Request.__init__ @@ -4185,7 +4621,6 @@ class TestContainerController(unittest.TestCase): self.app = proxy_server.Application(None, FakeMemcache(), account_ring=FakeRing(), container_ring=FakeRing(), - object_ring=FakeRing(), logger=debug_logger()) def test_convert_policy_to_index(self): @@ -5293,13 +5728,13 @@ class TestContainerController(unittest.TestCase): self.assert_(got_exc) +@patch_policies([StoragePolicy(0, 'zero', True, object_ring=FakeRing())]) class TestAccountController(unittest.TestCase): def setUp(self): self.app = proxy_server.Application(None, FakeMemcache(), account_ring=FakeRing(), - container_ring=FakeRing(), - object_ring=FakeRing) + container_ring=FakeRing()) def assert_status_map(self, method, statuses, expected, env_expected=None): with save_globals(): @@ -5723,6 +6158,7 @@ class TestAccountController(unittest.TestCase): test_status_map((204, 500, 404), 400) +@patch_policies([StoragePolicy(0, 'zero', True, object_ring=FakeRing())]) class TestAccountControllerFakeGetResponse(unittest.TestCase): """ Test all the faked-out GET responses for accounts that don't exist. 
They @@ -5732,8 +6168,7 @@ class TestAccountControllerFakeGetResponse(unittest.TestCase): conf = {'account_autocreate': 'yes'} self.app = proxy_server.Application(conf, FakeMemcache(), account_ring=FakeRing(), - container_ring=FakeRing(), - object_ring=FakeRing) + container_ring=FakeRing()) self.app.memcache = FakeMemcacheReturnsNone() def test_GET_autocreate_accept_json(self): @@ -5824,7 +6259,7 @@ class TestAccountControllerFakeGetResponse(unittest.TestCase): app = proxy_server.Application( None, FakeMemcache(), account_ring=FakeRing(), - container_ring=FakeRing(), object_ring=FakeRing()) + container_ring=FakeRing()) with save_globals(): # Mock account server will provide privileged information (ACLs) @@ -5899,7 +6334,7 @@ class TestAccountControllerFakeGetResponse(unittest.TestCase): app = proxy_server.Application( None, FakeMemcache(), account_ring=FakeRing(), - container_ring=FakeRing(), object_ring=FakeRing()) + container_ring=FakeRing()) app.allow_account_management = True ext_header = 'x-account-access-control' @@ -5995,10 +6430,6 @@ class FakeObjectController(object): return -class Stub(object): - pass - - class TestProxyObjectPerformance(unittest.TestCase): def setUp(self): @@ -6058,10 +6489,11 @@ class TestProxyObjectPerformance(unittest.TestCase): print "Run %02d took %07.03f" % (i, end - start) -@patch_policies([StoragePolicy(0, 'migrated'), - StoragePolicy(1, 'ernie', True), - StoragePolicy(2, 'deprecated', is_deprecated=True), - StoragePolicy(3, 'bert')]) +@patch_policies([StoragePolicy(0, 'migrated', object_ring=FakeRing()), + StoragePolicy(1, 'ernie', True, object_ring=FakeRing()), + StoragePolicy(2, 'deprecated', is_deprecated=True, + object_ring=FakeRing()), + StoragePolicy(3, 'bert', object_ring=FakeRing())]) class TestSwiftInfo(unittest.TestCase): def setUp(self): utils._swift_info = {} @@ -6070,8 +6502,7 @@ class TestSwiftInfo(unittest.TestCase): def test_registered_defaults(self): proxy_server.Application({}, FakeMemcache(), account_ring=FakeRing(), - container_ring=FakeRing(), - object_ring=FakeRing) + container_ring=FakeRing()) si = utils.get_swift_info()['swift'] self.assertTrue('version' in si)