Add Storage Policy support to Object Updates

The object server will now send its storage policy index to the
container server synchronously and asynchronously (via async_pending).

Each storage policy gets its own async_pending directory under
/srv/node/$disk/objects-$N, so there's no need to change the on-disk
pickle format; the policy index comes from the async_pending's
filename. This avoids any hassle on upgrade. (Recall that policy 0's
objects live in /srv/node/$disk/objects, not objects-0.)  Per-policy
tempdir as well.

Also clean up a couple little things in the object updater. Now it
won't abort processing when it encounters a file (not directory) named
"async_pending-\d+", and it won't process updates in a directory that
does not correspond to a storage policy.

That is, if you have policies 1, 2, and 3, but there's a directory on
your disk named "async_pending-5", the updater will now skip over that
entirely. It won't even bother doing directory listings at all. This
is a good idea, believe it or not, because there's nothing good that
the container server can do with an update from some unknown storage
policy. It can't update the listing, it can't move the object if it's
misplaced... all it can do is ignore the request, so it's better to
just not send it in the first place. Plus, if this is due to a
misconfiguration on one storage node, then the updates will get
processed once the configuration is fixed.

There's also a drive-by fix to update some backend HTTP mocks for container
update tests that were not fully exercising their request fakes.
Because the object server container update code is resilient to all manner
of failure from backend requests, the general intent of the tests was
unaffected, but this change cleans up some confusing logging in the debug
logger output.

The object-server will send X-Storage-Policy-Index headers with all
requests to container servers, including X-Delete containers and all
object PUT/DELETE requests.  This header value is persisted in the
pickle file for the update and sent along with async requests from the
object-updater as well.

The container server will extract the X-Storage-Policy-Index header from
incoming requests and apply it to container broker calls as appropriate,
defaulting to the legacy storage policy 0 to support seamless migration.

DocImpact
Implements: blueprint storage-policies
Change-Id: I07c730bebaee068f75024fa9c2fa9e11e295d9bd

add to object updates

Change-Id: Ic97a422238a0d7bc2a411a71a7aba3f8b42fce4d
This commit is contained in:
Samuel Merritt 2014-03-17 17:54:42 -07:00 committed by Clay Gerrard
parent 3824ff3df7
commit d5ca365965
9 changed files with 646 additions and 207 deletions

View File

@ -250,22 +250,24 @@ class ContainerController(object):
content_type='text/plain')
if self.mount_check and not check_mount(self.root, drive):
return HTTPInsufficientStorage(drive=drive, request=req)
# policy index is only relevant for delete_obj (and transitively for
# auto create accounts)
obj_policy_index = self.get_and_validate_policy_index(req) or 0
broker = self._get_container_broker(drive, part, account, container)
if account.startswith(self.auto_create_account_prefix) and obj and \
not os.path.exists(broker.db_file):
requested_policy_index = (self.get_and_validate_policy_index(req)
or POLICIES.default.idx)
try:
broker.initialize(
normalize_timestamp(
req.headers.get('x-timestamp') or time.time()),
requested_policy_index)
obj_policy_index)
except DatabaseAlreadyExists:
pass
if not os.path.exists(broker.db_file):
return HTTPNotFound()
if obj: # delete object
broker.delete_object(obj, req.headers.get('x-timestamp'))
broker.delete_object(obj, req.headers.get('x-timestamp'),
obj_policy_index)
return HTTPNoContent(request=req)
else:
# delete container
@ -342,17 +344,22 @@ class ContainerController(object):
timestamp = normalize_timestamp(req.headers['x-timestamp'])
broker = self._get_container_broker(drive, part, account, container)
if obj: # put container object
# obj put expects the policy_index header, default is for
# legacy support during upgrade.
obj_policy_index = requested_policy_index or 0
if account.startswith(self.auto_create_account_prefix) and \
not os.path.exists(broker.db_file):
try:
broker.initialize(timestamp, 0)
broker.initialize(timestamp, obj_policy_index)
except DatabaseAlreadyExists:
pass
if not os.path.exists(broker.db_file):
return HTTPNotFound()
broker.put_object(obj, timestamp, int(req.headers['x-size']),
broker.put_object(obj, timestamp,
int(req.headers['x-size']),
req.headers['x-content-type'],
req.headers['x-etag'])
req.headers['x-etag'], 0,
obj_policy_index)
return HTTPCreated(request=req)
else: # put container
if requested_policy_index is None:
@ -420,7 +427,7 @@ class ContainerController(object):
:params record: object entry record
:returns: modified record
"""
(name, created, size, content_type, etag) = record
(name, created, size, content_type, etag) = record[:5]
if content_type is None:
return {'subdir': name}
response = {'bytes': size, 'hash': etag, 'name': name,
@ -466,8 +473,9 @@ class ContainerController(object):
resp_headers = gen_resp_headers(info, is_deleted=is_deleted)
if is_deleted:
return HTTPNotFound(request=req, headers=resp_headers)
container_list = broker.list_objects_iter(limit, marker, end_marker,
prefix, delimiter, path)
container_list = broker.list_objects_iter(
limit, marker, end_marker, prefix, delimiter, path,
storage_policy_index=info['storage_policy_index'])
return self.create_listing(req, out_content_type, info, resp_headers,
broker.metadata, container_list, container)

View File

@ -531,16 +531,16 @@ class DiskFileManager(object):
yield True
def pickle_async_update(self, device, account, container, obj, data,
timestamp):
timestamp, policy_idx):
device_path = self.construct_dev_path(device)
async_dir = os.path.join(device_path, ASYNCDIR_BASE)
async_dir = os.path.join(device_path, get_async_dir(policy_idx))
ohash = hash_path(account, container, obj)
self.threadpools[device].run_in_thread(
write_pickle,
data,
os.path.join(async_dir, ohash[-3:], ohash + '-' +
normalize_timestamp(timestamp)),
os.path.join(device_path, 'tmp'))
os.path.join(device_path, get_tmp_dir(policy_idx)))
self.logger.increment('async_pendings')
def get_diskfile(self, device, partition, account, container, obj,
@ -1005,7 +1005,7 @@ class DiskFile(object):
self._container = None
self._obj = None
self._datadir = None
self._tmpdir = join(device_path, 'tmp')
self._tmpdir = join(device_path, get_tmp_dir(policy_idx))
self._metadata = None
self._data_file = None
self._fp = None

View File

@ -54,7 +54,7 @@ class ObjectController(server.ObjectController):
return self._filesystem.get_diskfile(account, container, obj, **kwargs)
def async_update(self, op, account, container, obj, host, partition,
contdevice, headers_out, objdevice):
contdevice, headers_out, objdevice, policy_idx):
"""
Sends or saves an async update.
@ -68,6 +68,7 @@ class ObjectController(server.ObjectController):
:param headers_out: dictionary of headers to send in the container
request
:param objdevice: device name that the object is in
:param policy_idx: the associated storage policy index
"""
headers_out['user-agent'] = 'obj-server %s' % os.getpid()
full_path = '/%s/%s/%s' % (account, container, obj)

View File

@ -47,6 +47,7 @@ from swift.common.swob import HTTPAccepted, HTTPBadRequest, HTTPCreated, \
HTTPInsufficientStorage, HTTPForbidden, HTTPException, HeaderKeyDict, \
HTTPConflict
from swift.obj.diskfile import DATAFILE_SYSTEM_META, DiskFileManager
from swift.common.storage_policy import POLICY_INDEX
class ObjectController(object):
@ -150,11 +151,10 @@ class ObjectController(object):
behavior.
"""
return self._diskfile_mgr.get_diskfile(
device, partition, account, container, obj,
policy_idx=policy_idx, **kwargs)
device, partition, account, container, obj, policy_idx, **kwargs)
def async_update(self, op, account, container, obj, host, partition,
contdevice, headers_out, objdevice):
contdevice, headers_out, objdevice, policy_index):
"""
Sends or saves an async update.
@ -168,6 +168,7 @@ class ObjectController(object):
:param headers_out: dictionary of headers to send in the container
request
:param objdevice: device name that the object is in
:param policy_index: the associated storage policy index
"""
headers_out['user-agent'] = 'obj-server %s' % os.getpid()
full_path = '/%s/%s/%s' % (account, container, obj)
@ -198,10 +199,11 @@ class ObjectController(object):
'obj': obj, 'headers': headers_out}
timestamp = headers_out['x-timestamp']
self._diskfile_mgr.pickle_async_update(objdevice, account, container,
obj, data, timestamp)
obj, data, timestamp,
policy_index)
def container_update(self, op, account, container, obj, request,
headers_out, objdevice):
headers_out, objdevice, policy_idx):
"""
Update the container when objects are updated.
@ -224,7 +226,7 @@ class ObjectController(object):
if len(conthosts) != len(contdevices):
# This shouldn't happen unless there's a bug in the proxy,
# but if there is, we want to know about it.
self.logger.error(_('ERROR Container update failed: different '
self.logger.error(_('ERROR Container update failed: different '
'numbers of hosts and devices in request: '
'"%s" vs "%s"') %
(headers_in.get('X-Container-Host', ''),
@ -238,13 +240,14 @@ class ObjectController(object):
headers_out['x-trans-id'] = headers_in.get('x-trans-id', '-')
headers_out['referer'] = request.as_referer()
headers_out[POLICY_INDEX] = policy_idx
for conthost, contdevice in updates:
self.async_update(op, account, container, obj, conthost,
contpartition, contdevice, headers_out,
objdevice)
objdevice, policy_idx)
def delete_at_update(self, op, delete_at, account, container, obj,
request, objdevice):
request, objdevice, policy_index):
"""
Update the expiring objects container when objects are updated.
@ -255,6 +258,7 @@ class ObjectController(object):
:param obj: object name
:param request: the original request driving the update
:param objdevice: device name that the object is in
:param policy_index: the policy index to be used for tmp dir
"""
if config_true_value(
request.headers.get('x-backend-replication', 'f')):
@ -266,6 +270,7 @@ class ObjectController(object):
hosts = contdevices = [None]
headers_in = request.headers
headers_out = HeaderKeyDict({
POLICY_INDEX: 0, # system accounts are always Policy-0
'x-timestamp': headers_in['x-timestamp'],
'x-trans-id': headers_in.get('x-trans-id', '-'),
'referer': request.as_referer()})
@ -311,7 +316,8 @@ class ObjectController(object):
self.async_update(
op, self.expiring_objects_account, delete_at_container,
'%s-%s/%s/%s' % (delete_at, account, container, obj),
host, partition, contdevice, headers_out, objdevice)
host, partition, contdevice, headers_out, objdevice,
policy_index)
@public
@timing_stats()
@ -351,10 +357,11 @@ class ObjectController(object):
if orig_delete_at != new_delete_at:
if new_delete_at:
self.delete_at_update('PUT', new_delete_at, account, container,
obj, request, device)
obj, request, device, policy_idx)
if orig_delete_at:
self.delete_at_update('DELETE', orig_delete_at, account,
container, obj, request, device)
container, obj, request, device,
policy_idx)
disk_file.write_metadata(metadata)
return HTTPAccepted(request=request)
@ -458,11 +465,11 @@ class ObjectController(object):
if new_delete_at:
self.delete_at_update(
'PUT', new_delete_at, account, container, obj, request,
device)
device, policy_idx)
if orig_delete_at:
self.delete_at_update(
'DELETE', orig_delete_at, account, container, obj,
request, device)
request, device, policy_idx)
self.container_update(
'PUT', account, container, obj, request,
HeaderKeyDict({
@ -470,7 +477,7 @@ class ObjectController(object):
'x-content-type': metadata['Content-Type'],
'x-timestamp': metadata['X-Timestamp'],
'x-etag': metadata['ETag']}),
device)
device, policy_idx)
return HTTPCreated(request=request, etag=etag)
@public
@ -608,14 +615,15 @@ class ObjectController(object):
body='X-If-Delete-At and X-Delete-At do not match')
if orig_delete_at:
self.delete_at_update('DELETE', orig_delete_at, account,
container, obj, request, device)
container, obj, request, device,
policy_idx)
req_timestamp = request.headers['X-Timestamp']
if orig_timestamp < req_timestamp:
disk_file.delete(req_timestamp)
self.container_update(
'DELETE', account, container, obj, request,
HeaderKeyDict({'x-timestamp': req_timestamp}),
device)
device, policy_idx)
return response_class(request=request)
@public

View File

@ -26,10 +26,11 @@ from eventlet import patcher, Timeout
from swift.common.bufferedhttp import http_connect
from swift.common.exceptions import ConnectionTimeout
from swift.common.ring import Ring
from swift.common.storage_policy import POLICY_INDEX
from swift.common.utils import get_logger, renamer, write_pickle, \
dump_recon_cache, config_true_value, ismount
from swift.common.daemon import Daemon
from swift.obj.diskfile import ASYNCDIR_BASE
from swift.obj.diskfile import get_tmp_dir, get_async_dir, ASYNCDIR_BASE
from swift.common.http import is_success, HTTP_NOT_FOUND, \
HTTP_INTERNAL_SERVER_ERROR
@ -137,45 +138,69 @@ class ObjectUpdater(Daemon):
:param device: path to device
"""
start_time = time.time()
async_pending = os.path.join(device, ASYNCDIR_BASE)
if not os.path.isdir(async_pending):
return
for prefix in os.listdir(async_pending):
prefix_path = os.path.join(async_pending, prefix)
if not os.path.isdir(prefix_path):
# loop through async pending dirs for all policies
for asyncdir in os.listdir(device):
# skip stuff like "accounts", "containers", etc.
if not (asyncdir == ASYNCDIR_BASE or
asyncdir.startswith(ASYNCDIR_BASE + '-')):
continue
last_obj_hash = None
for update in sorted(os.listdir(prefix_path), reverse=True):
update_path = os.path.join(prefix_path, update)
if not os.path.isfile(update_path):
continue
try:
obj_hash, timestamp = update.split('-')
except ValueError:
self.logger.increment('errors')
self.logger.error(
_('ERROR async pending file with unexpected name %s')
% (update_path))
continue
if obj_hash == last_obj_hash:
self.logger.increment("unlinks")
os.unlink(update_path)
else:
self.process_object_update(update_path, device)
last_obj_hash = obj_hash
time.sleep(self.slowdown)
try:
os.rmdir(prefix_path)
except OSError:
pass
self.logger.timing_since('timing', start_time)
def process_object_update(self, update_path, device):
# we only care about directories
async_pending = os.path.join(device, asyncdir)
if not os.path.isdir(async_pending):
continue
if asyncdir == ASYNCDIR_BASE:
policy_idx = 0
else:
_junk, policy_idx = asyncdir.split('-', 1)
try:
policy_idx = int(policy_idx)
get_async_dir(policy_idx)
except ValueError:
self.logger.warn(_('Directory %s does not map to a '
'valid policy') % asyncdir)
continue
for prefix in os.listdir(async_pending):
prefix_path = os.path.join(async_pending, prefix)
if not os.path.isdir(prefix_path):
continue
last_obj_hash = None
for update in sorted(os.listdir(prefix_path), reverse=True):
update_path = os.path.join(prefix_path, update)
if not os.path.isfile(update_path):
continue
try:
obj_hash, timestamp = update.split('-')
except ValueError:
self.logger.increment('errors')
self.logger.error(
_('ERROR async pending file with unexpected '
'name %s')
% (update_path))
continue
if obj_hash == last_obj_hash:
self.logger.increment("unlinks")
os.unlink(update_path)
else:
self.process_object_update(update_path, device,
policy_idx)
last_obj_hash = obj_hash
time.sleep(self.slowdown)
try:
os.rmdir(prefix_path)
except OSError:
pass
self.logger.timing_since('timing', start_time)
def process_object_update(self, update_path, device, policy_idx):
"""
Process the object information to be updated and update.
:param update_path: path to pickled object update file
:param device: path to device
:param policy_idx: storage policy index of object update
"""
try:
update = pickle.load(open(update_path, 'rb'))
@ -196,8 +221,10 @@ class ObjectUpdater(Daemon):
new_successes = False
for node in nodes:
if node['id'] not in successes:
headers = update['headers'].copy()
headers.setdefault(POLICY_INDEX, str(policy_idx))
status = self.object_update(node, part, update['op'], obj,
update['headers'])
headers)
if not is_success(status) and status != HTTP_NOT_FOUND:
success = False
else:
@ -217,7 +244,8 @@ class ObjectUpdater(Daemon):
{'obj': obj, 'path': update_path})
if new_successes:
update['successes'] = successes
write_pickle(update, update_path, os.path.join(device, 'tmp'))
write_pickle(update, update_path, os.path.join(
device, get_tmp_dir(policy_idx)))
def object_update(self, node, part, op, obj, headers):
"""

View File

@ -38,7 +38,8 @@ from swift.common import constraints
from swift.common.utils import (normalize_timestamp, mkdirs, public,
replication, lock_parent_directory)
from test.unit import fake_http_connect
from swift.common.storage_policy import POLICY_INDEX, POLICIES
from swift.common.storage_policy import (POLICY_INDEX, POLICIES,
StoragePolicy)
from swift.common.request_helpers import get_sys_meta_prefix
from test.unit import patch_policies
@ -73,6 +74,13 @@ class TestContainerController(unittest.TestCase):
def tearDown(self):
rmtree(os.path.dirname(self.testdir), ignore_errors=1)
def _update_object_put_headers(self, req):
"""
Override this method in test subclasses to test post upgrade
behavior.
"""
pass
def _check_put_container_storage_policy(self, req, policy_index):
resp = req.get_response(self.controller)
self.assertEqual(201, resp.status_int)
@ -179,7 +187,9 @@ class TestContainerController(unittest.TestCase):
'x-content-type': 'text/plain',
'x-etag': 'x',
})
obj_put_request.get_response(self.controller)
self._update_object_put_headers(obj_put_request)
obj_put_resp = obj_put_request.get_response(self.controller)
self.assertEqual(obj_put_resp.status_int // 100, 2)
# re-issue HEAD request
response = req.get_response(self.controller)
self.assertEqual(response.status_int // 100, 2)
@ -1296,6 +1306,7 @@ class TestContainerController(unittest.TestCase):
environ={'REQUEST_METHOD': 'PUT', 'HTTP_X_TIMESTAMP': '0',
'HTTP_X_SIZE': 1, 'HTTP_X_CONTENT_TYPE': 'text/plain',
'HTTP_X_ETAG': 'x'})
self._update_object_put_headers(req)
resp = req.get_response(self.controller)
self.assertEquals(resp.status_int, 201)
req = Request.blank(
@ -1306,6 +1317,7 @@ class TestContainerController(unittest.TestCase):
req = Request.blank(
'/sda1/p/a/c/o',
environ={'REQUEST_METHOD': 'DELETE'}, headers={'X-Timestamp': '4'})
self._update_object_put_headers(req)
resp = req.get_response(self.controller)
self.assertEquals(resp.status_int, 204)
req = Request.blank(
@ -1469,6 +1481,7 @@ class TestContainerController(unittest.TestCase):
'HTTP_X_CONTENT_TYPE': 'text/plain',
'HTTP_X_ETAG': 'x',
'HTTP_X_SIZE': 0})
self._update_object_put_headers(req)
resp = req.get_response(self.controller)
self.assertEquals(resp.status_int, 201)
# test format
@ -1545,6 +1558,7 @@ class TestContainerController(unittest.TestCase):
'HTTP_X_CONTENT_TYPE': 'text/plain',
'HTTP_X_ETAG': 'x',
'HTTP_X_SIZE': 0})
self._update_object_put_headers(req)
resp = req.get_response(self.controller)
self.assertEquals(resp.status_int, 201)
plain_body = '0\n1\n2\n'
@ -1618,6 +1632,7 @@ class TestContainerController(unittest.TestCase):
'HTTP_X_CONTENT_TYPE': 'text/plain',
'HTTP_X_ETAG': 'x',
'HTTP_X_SIZE': 0})
self._update_object_put_headers(req)
resp = req.get_response(self.controller)
self.assertEquals(resp.status_int, 201)
# test format
@ -1657,6 +1672,7 @@ class TestContainerController(unittest.TestCase):
'HTTP_X_CONTENT_TYPE': 'text/plain',
'HTTP_X_ETAG': 'x',
'HTTP_X_SIZE': 0})
self._update_object_put_headers(req)
resp = req.get_response(self.controller)
self.assertEquals(resp.status_int, 201)
xml_body = '<?xml version="1.0" encoding="UTF-8"?>\n' \
@ -1736,6 +1752,7 @@ class TestContainerController(unittest.TestCase):
'HTTP_X_TIMESTAMP': '1',
'HTTP_X_CONTENT_TYPE': 'text/plain',
'HTTP_X_ETAG': 'x', 'HTTP_X_SIZE': 0})
self._update_object_put_headers(req)
resp = req.get_response(self.controller)
self.assertEquals(resp.status_int, 201)
# test limit with marker
@ -1758,6 +1775,7 @@ class TestContainerController(unittest.TestCase):
'REQUEST_METHOD': 'PUT',
'HTTP_X_TIMESTAMP': '1', 'HTTP_X_CONTENT_TYPE': ctype,
'HTTP_X_ETAG': 'x', 'HTTP_X_SIZE': 0})
self._update_object_put_headers(req)
resp = req.get_response(self.controller)
self.assertEquals(resp.status_int, 201)
req = Request.blank('/sda1/p/a/c?format=json',
@ -1799,6 +1817,7 @@ class TestContainerController(unittest.TestCase):
'HTTP_X_CONTENT_TYPE': 'text/plain',
'HTTP_X_ETAG': 'x',
'HTTP_X_SIZE': 0})
self._update_object_put_headers(req)
resp = req.get_response(self.controller)
self.assertEquals(resp.status_int, 201)
# test limit
@ -1822,6 +1841,7 @@ class TestContainerController(unittest.TestCase):
'HTTP_X_CONTENT_TYPE': 'text/plain',
'HTTP_X_ETAG': 'x',
'HTTP_X_SIZE': 0})
self._update_object_put_headers(req)
resp = req.get_response(self.controller)
self.assertEquals(resp.status_int, 201)
req = Request.blank(
@ -1848,6 +1868,7 @@ class TestContainerController(unittest.TestCase):
'REQUEST_METHOD': 'PUT', 'HTTP_X_TIMESTAMP': '1',
'HTTP_X_CONTENT_TYPE': 'text/plain', 'HTTP_X_ETAG': 'x',
'HTTP_X_SIZE': 0})
self._update_object_put_headers(req)
resp = req.get_response(self.controller)
self.assertEquals(resp.status_int, 201)
req = Request.blank(
@ -1872,6 +1893,7 @@ class TestContainerController(unittest.TestCase):
'REQUEST_METHOD': 'PUT', 'HTTP_X_TIMESTAMP': '1',
'HTTP_X_CONTENT_TYPE': 'text/plain', 'HTTP_X_ETAG': 'x',
'HTTP_X_SIZE': 0})
self._update_object_put_headers(req)
resp = req.get_response(self.controller)
self.assertEquals(resp.status_int, 201)
req = Request.blank(
@ -1896,6 +1918,7 @@ class TestContainerController(unittest.TestCase):
'REQUEST_METHOD': 'PUT', 'HTTP_X_TIMESTAMP': '1',
'HTTP_X_CONTENT_TYPE': 'text/plain', 'HTTP_X_ETAG': 'x',
'HTTP_X_SIZE': 0})
self._update_object_put_headers(req)
resp = req.get_response(self.controller)
self.assertEquals(resp.status_int, 201)
req = Request.blank(
@ -1926,6 +1949,7 @@ class TestContainerController(unittest.TestCase):
'REQUEST_METHOD': 'PUT', 'HTTP_X_TIMESTAMP': '1',
'HTTP_X_CONTENT_TYPE': 'text/plain', 'HTTP_X_ETAG': 'x',
'HTTP_X_SIZE': 0})
self._update_object_put_headers(req)
resp = req.get_response(self.controller)
self.assertEquals(resp.status_int, 201)
req = Request.blank(
@ -2397,5 +2421,26 @@ class TestContainerController(unittest.TestCase):
'404 - "-" "-" "-" 2.0000 "-"',), {})])
@patch_policies([
StoragePolicy(0, 'legacy'),
StoragePolicy(1, 'one'),
StoragePolicy(2, 'two', True),
StoragePolicy(3, 'three'),
StoragePolicy(4, 'four'),
])
class TestNonLegacyDefaultStoragePolicy(TestContainerController):
"""
Test swift.container.server.ContainerController with a non-legacy default
Storage Policy.
"""
def _update_object_put_headers(self, req):
    """
    Add the storage policy index header to *req* so object updates use
    the default policy configured by this TestCase's @patch_policies
    decorator (the policy marked as default, index 2), rather than the
    legacy policy 0.
    """
    req.headers[POLICY_INDEX] = str(POLICIES.default.idx)
if __name__ == '__main__':
unittest.main()

View File

@ -43,15 +43,12 @@ from swift.common.exceptions import DiskFileNotExist, DiskFileQuarantined, \
DiskFileDeviceUnavailable, DiskFileDeleted, DiskFileNotOpen, \
DiskFileError, ReplicationLockTimeout, PathNotDir, DiskFileCollision, \
DiskFileExpired, SwiftException, DiskFileNoSpace
from swift.common.storage_policy import StoragePolicy, POLICIES, \
get_policy_string
from swift.common.storage_policy import POLICIES, get_policy_string
from functools import partial
get_data_dir = partial(get_policy_string, diskfile.DATADIR_BASE)
get_tmp_dir = partial(get_policy_string, diskfile.TMP_BASE)
_mocked_policies = [StoragePolicy(0, 'zero', False),
StoragePolicy(1, 'one', True)]
def _create_test_ring(path):
@ -81,6 +78,7 @@ def _create_test_ring(path):
reload_time=intended_reload_time)
@patch_policies
class TestDiskFileModuleMethods(unittest.TestCase):
def setUp(self):
@ -115,7 +113,6 @@ class TestDiskFileModuleMethods(unittest.TestCase):
'0', 'a', 'c', 'o',
policy_idx)
@patch_policies(_mocked_policies)
def test_extract_policy_index(self):
# good path names
pn = 'objects/0/606/1984527ed7ef6247c78606/1401379842.14643.data'
@ -157,7 +154,6 @@ class TestDiskFileModuleMethods(unittest.TestCase):
bad_path = '/srv/node/sda1/obj1/1/abc/def/1234.data'
self.assertEqual(diskfile.extract_policy_index(bad_path), 0)
@patch_policies(_mocked_policies)
def test_quarantine_renamer(self):
for policy in POLICIES:
# we use this for convenience, not really about a diskfile layout
@ -185,7 +181,6 @@ class TestDiskFileModuleMethods(unittest.TestCase):
self.assertRaises(OSError, diskfile.hash_suffix,
os.path.join(self.testdir, "doesnotexist"), 101)
@patch_policies(_mocked_policies)
def test_get_data_dir(self):
self.assertEquals(diskfile.get_data_dir(0), diskfile.DATADIR_BASE)
self.assertEquals(diskfile.get_data_dir(1),
@ -194,7 +189,6 @@ class TestDiskFileModuleMethods(unittest.TestCase):
self.assertRaises(ValueError, diskfile.get_data_dir, 99)
@patch_policies(_mocked_policies)
def test_get_async_dir(self):
self.assertEquals(diskfile.get_async_dir(0),
diskfile.ASYNCDIR_BASE)
@ -204,7 +198,6 @@ class TestDiskFileModuleMethods(unittest.TestCase):
self.assertRaises(ValueError, diskfile.get_async_dir, 99)
@patch_policies(_mocked_policies)
def test_get_tmp_dir(self):
self.assertEquals(diskfile.get_tmp_dir(0),
diskfile.TMP_BASE)
@ -214,6 +207,26 @@ class TestDiskFileModuleMethods(unittest.TestCase):
self.assertRaises(ValueError, diskfile.get_tmp_dir, 99)
def test_pickle_async_update_tmp_dir(self):
    """
    pickle_async_update() must write through the per-policy tmp dir
    and must not create that dir itself.
    """
    for policy in POLICIES:
        idx = int(policy)
        # policy 0 keeps the legacy bare "tmp" dir name
        tmp_part = 'tmp' if idx == 0 else 'tmp-%d' % policy
        tmp_path = os.path.join(
            self.devices, self.existing_device, tmp_part)
        self.assertFalse(os.path.isdir(tmp_path))
        pickle_args = (self.existing_device, 'a', 'c', 'o',
                       'data', 0.0, idx)
        # async updates rely on the tmp dir already existing
        self.assertRaises(OSError, self.df_mgr.pickle_async_update,
                          *pickle_args)
        os.makedirs(tmp_path)
        # with the tmp dir staged, the async update succeeds
        self.df_mgr.pickle_async_update(*pickle_args)
        self.assertTrue(os.path.isdir(tmp_path))
def test_hash_suffix_hash_dir_is_file_quarantine(self):
df = self._create_diskfile()
mkdirs(os.path.dirname(df._datadir))
@ -647,6 +660,7 @@ class TestDiskFileModuleMethods(unittest.TestCase):
self.check_hash_cleanup_listdir(file_list, [file2])
@patch_policies
class TestObjectAuditLocationGenerator(unittest.TestCase):
def _make_file(self, path):
try:
@ -813,11 +827,11 @@ class TestDiskFileManager(unittest.TestCase):
with mock.patch('swift.obj.diskfile.write_pickle') as wp:
self.df_mgr.pickle_async_update(self.existing_device1,
'a', 'c', 'o',
dict(a=1, b=2), ts)
dict(a=1, b=2), ts, 0)
dp = self.df_mgr.construct_dev_path(self.existing_device1)
ohash = diskfile.hash_path('a', 'c', 'o')
wp.assert_called_with({'a': 1, 'b': 2},
os.path.join(dp, diskfile.ASYNCDIR_BASE,
os.path.join(dp, diskfile.get_async_dir(0),
ohash[-3:], ohash + '-' + ts),
os.path.join(dp, 'tmp'))
self.df_mgr.logger.increment.assert_called_with('async_pendings')
@ -895,6 +909,7 @@ class TestDiskFileManager(unittest.TestCase):
self.assertTrue(lock_exc is None)
@patch_policies
class TestDiskFile(unittest.TestCase):
"""Test swift.obj.diskfile.DiskFile"""
@ -904,7 +919,9 @@ class TestDiskFile(unittest.TestCase):
self.testdir = os.path.join(
self.tmpdir, 'tmp_test_obj_server_DiskFile')
self.existing_device = 'sda1'
mkdirs(os.path.join(self.testdir, self.existing_device, 'tmp'))
for policy in POLICIES:
mkdirs(os.path.join(self.testdir, self.existing_device,
get_tmp_dir(policy.idx)))
self._orig_tpool_exc = tpool.execute
tpool.execute = lambda f, *args, **kwargs: f(*args, **kwargs)
self.conf = dict(devices=self.testdir, mount_check='false',
@ -1162,11 +1179,13 @@ class TestDiskFile(unittest.TestCase):
self.assertEqual(quarantine_msgs, [])
def test_disk_file_mkstemp_creates_dir(self):
tmpdir = os.path.join(self.testdir, self.existing_device, 'tmp')
os.rmdir(tmpdir)
df = self._simple_get_diskfile()
with df.create():
self.assert_(os.path.exists(tmpdir))
for policy in POLICIES:
tmpdir = os.path.join(self.testdir, self.existing_device,
get_tmp_dir(policy.idx))
os.rmdir(tmpdir)
df = self._simple_get_diskfile(policy_idx=policy.idx)
with df.create():
self.assert_(os.path.exists(tmpdir))
def _get_open_disk_file(self, invalid_type=None, obj_name='o', fsize=1024,
csize=8, mark_deleted=False, prealloc=False,

View File

@ -23,17 +23,19 @@ import os
import mock
import unittest
import math
import random
from shutil import rmtree
from StringIO import StringIO
from time import gmtime, strftime, time, struct_time
from tempfile import mkdtemp
from hashlib import md5
import itertools
from eventlet import sleep, spawn, wsgi, listen, Timeout, tpool
from nose import SkipTest
from test.unit import FakeLogger, debug_logger
from test.unit import FakeLogger, debug_logger, mocked_http_conn
from test.unit import connect_tcp, readuntil2crlfs, patch_policies
from swift.obj import server as object_server
from swift.obj import diskfile
@ -50,6 +52,7 @@ def mock_time(*args, **kwargs):
return 5000.0
@patch_policies
class TestObjectController(unittest.TestCase):
"""Test swift.obj.server.ObjectController"""
@ -59,7 +62,6 @@ class TestObjectController(unittest.TestCase):
utils.HASH_PATH_PREFIX = 'startcap'
self.testdir = \
os.path.join(mkdtemp(), 'tmp_test_object_server_ObjectController')
mkdirs(os.path.join(self.testdir, 'sda1', 'tmp'))
conf = {'devices': self.testdir, 'mount_check': 'false'}
self.object_controller = object_server.ObjectController(
conf, logger=debug_logger())
@ -74,6 +76,10 @@ class TestObjectController(unittest.TestCase):
rmtree(os.path.dirname(self.testdir))
tpool.execute = self._orig_tpool_exc
def _stage_tmp_dir(self, policy):
    """
    Pre-create the per-policy tmp dir for *policy* under the sda1
    device so async-update tests have somewhere to write pickles.
    """
    tmp_dir = diskfile.get_tmp_dir(int(policy))
    mkdirs(os.path.join(self.testdir, 'sda1', tmp_dir))
def check_all_api_methods(self, obj_name='o', alt_res=None):
path = '/sda1/p/a/c/%s' % obj_name
body = 'SPECIAL_STRING'
@ -2100,6 +2106,8 @@ class TestObjectController(unittest.TestCase):
self.assertEquals(resp.headers['content-encoding'], 'gzip')
def test_async_update_http_connect(self):
policy = random.choice(list(POLICIES))
self._stage_tmp_dir(policy)
given_args = []
def fake_http_connect(*args):
@ -2111,19 +2119,22 @@ class TestObjectController(unittest.TestCase):
object_server.http_connect = fake_http_connect
self.object_controller.async_update(
'PUT', 'a', 'c', 'o', '127.0.0.1:1234', 1, 'sdc1',
{'x-timestamp': '1', 'x-out': 'set'}, 'sda1')
{'x-timestamp': '1', 'x-out': 'set',
POLICY_INDEX: policy.idx}, 'sda1', policy.idx)
finally:
object_server.http_connect = orig_http_connect
self.assertEquals(
given_args,
['127.0.0.1', '1234', 'sdc1', 1, 'PUT', '/a/c/o', {
'x-timestamp': '1', 'x-out': 'set',
'user-agent': 'obj-server %s' % os.getpid()}])
'user-agent': 'obj-server %s' % os.getpid(),
POLICY_INDEX: policy.idx}])
@patch_policies([storage_policy.StoragePolicy(0, 'zero', True),
storage_policy.StoragePolicy(1, 'one'),
storage_policy.StoragePolicy(37, 'fantastico')])
def test_updating_multiple_delete_at_container_servers(self):
policy = random.choice(list(POLICIES))
self.object_controller.expiring_objects_account = 'exp'
self.object_controller.expiring_objects_container_divisor = 60
@ -2153,12 +2164,15 @@ class TestObjectController(unittest.TestCase):
dict((k, v) for k, v in captured_args.iteritems()
if v is not None))
return SuccessfulFakeConn()
req = Request.blank(
'/sda1/p/a/c/o',
environ={'REQUEST_METHOD': 'PUT'},
headers={'X-Timestamp': '12345',
'Content-Type': 'application/burrito',
'Content-Length': '0',
POLICY_INDEX: policy.idx,
'X-Container-Partition': '20',
'X-Container-Host': '1.2.3.4:5',
'X-Container-Device': 'sdb1',
@ -2191,8 +2205,10 @@ class TestObjectController(unittest.TestCase):
'x-etag': 'd41d8cd98f00b204e9800998ecf8427e',
'x-size': '0',
'x-timestamp': '12345',
POLICY_INDEX: '37',
'referer': 'PUT http://localhost/sda1/p/a/c/o',
'user-agent': 'obj-server %d' % os.getpid(),
POLICY_INDEX: policy.idx,
'x-trans-id': '-'})})
self.assertEquals(
http_connect_args[1],
@ -2210,6 +2226,7 @@ class TestObjectController(unittest.TestCase):
'x-timestamp': '12345',
'referer': 'PUT http://localhost/sda1/p/a/c/o',
'user-agent': 'obj-server %d' % os.getpid(),
POLICY_INDEX: 0, # system account storage policy is 0
'x-trans-id': '-'})})
self.assertEquals(
http_connect_args[2],
@ -2227,6 +2244,7 @@ class TestObjectController(unittest.TestCase):
'x-timestamp': '12345',
'referer': 'PUT http://localhost/sda1/p/a/c/o',
'user-agent': 'obj-server %d' % os.getpid(),
POLICY_INDEX: 0, # system account storage policy is 0
'x-trans-id': '-'})})
@patch_policies([storage_policy.StoragePolicy(0, 'zero', True),
@ -2259,13 +2277,15 @@ class TestObjectController(unittest.TestCase):
dict((k, v) for k, v in captured_args.iteritems()
if v is not None))
return SuccessfulFakeConn()
req = Request.blank(
'/sda1/p/a/c/o',
environ={'REQUEST_METHOD': 'PUT'},
headers={'X-Timestamp': '12345',
'Content-Type': 'application/burrito',
'Content-Length': '0',
'X-Storage-Policy-Index': '26',
POLICY_INDEX: '26',
'X-Container-Partition': '20',
'X-Container-Host': '1.2.3.4:5, 6.7.8.9:10',
'X-Container-Device': 'sdb1, sdf1'})
@ -2291,6 +2311,7 @@ class TestObjectController(unittest.TestCase):
'x-etag': 'd41d8cd98f00b204e9800998ecf8427e',
'x-size': '0',
'x-timestamp': '12345',
POLICY_INDEX: '26',
'referer': 'PUT http://localhost/sda1/p/a/c/o',
'user-agent': 'obj-server %d' % os.getpid(),
'x-trans-id': '-'})})
@ -2308,11 +2329,94 @@ class TestObjectController(unittest.TestCase):
'x-etag': 'd41d8cd98f00b204e9800998ecf8427e',
'x-size': '0',
'x-timestamp': '12345',
POLICY_INDEX: '26',
'referer': 'PUT http://localhost/sda1/p/a/c/o',
'user-agent': 'obj-server %d' % os.getpid(),
'x-trans-id': '-'})})
def test_object_delete_at_aysnc_update(self):
    # NOTE(review): "aysnc" typo is in the established test name; left
    # as-is so external references to the test keep working.
    #
    # PUT an expiring object while both container servers return 500:
    # the synchronous updates fail, so both the expirer-account update
    # and the container update must land in async_pending files, each
    # carrying the correct storage policy index.
    policy = random.choice(list(POLICIES))
    ts = (normalize_timestamp(t) for t in
          itertools.count(int(time())))

    container_updates = []

    def capture_updates(ip, port, method, path, headers, *args, **kwargs):
        container_updates.append((ip, port, method, path, headers))

    put_timestamp = ts.next()
    delete_at_timestamp = utils.normalize_delete_at_timestamp(ts.next())
    delete_at_container = (
        int(delete_at_timestamp) /
        self.object_controller.expiring_objects_container_divisor *
        self.object_controller.expiring_objects_container_divisor)
    req = Request.blank('/sda1/p/a/c/o', method='PUT', body='', headers={
        'Content-Type': 'text/plain',
        'X-Timestamp': put_timestamp,
        'X-Container-Host': '10.0.0.1:6001',
        'X-Container-Device': 'sda1',
        'X-Container-Partition': 'p',
        'X-Delete-At': delete_at_timestamp,
        'X-Delete-At-Container': delete_at_container,
        'X-Delete-At-Partition': 'p',
        'X-Delete-At-Host': '10.0.0.2:6002',
        'X-Delete-At-Device': 'sda1',
        POLICY_INDEX: int(policy),
    })
    with mocked_http_conn(
            500, 500, give_connect=capture_updates) as fake_conn:
        resp = req.get_response(self.object_controller)
    self.assertRaises(StopIteration, fake_conn.code_iter.next)
    self.assertEqual(resp.status_int, 201)
    self.assertEquals(2, len(container_updates))
    delete_at_update, container_update = container_updates

    # the expirer-account update always uses policy 0 (system account)
    ip, port, method, path, headers = delete_at_update
    self.assertEqual(ip, '10.0.0.2')
    self.assertEqual(port, '6002')
    self.assertEqual(method, 'PUT')
    self.assertEqual(path, '/sda1/p/.expiring_objects/%s/%s-a/c/o' %
                     (delete_at_container, delete_at_timestamp))
    for key, value in {
            'X-Timestamp': put_timestamp,
            POLICY_INDEX: 0,  # system account is always 0
    }.items():
        self.assertEqual(headers[key], str(value))

    # the container update carries the object's own policy index
    ip, port, method, path, headers = container_update
    self.assertEqual(ip, '10.0.0.1')
    self.assertEqual(port, '6001')
    self.assertEqual(method, 'PUT')
    self.assertEqual(path, '/sda1/p/a/c/o')
    for key, value in {
            'X-Timestamp': put_timestamp,
            POLICY_INDEX: int(policy),
    }.items():
        self.assertEqual(headers[key], str(value))

    # both failed updates must have been pickled into the policy's
    # async_pending directory with the right policy index persisted
    # in the headers of the pickle
    async_dir = os.path.join(self.testdir, 'sda1',
                             diskfile.get_async_dir(policy.idx))
    found_files = []
    for root, dirs, files in os.walk(async_dir):
        for f in files:
            async_file = os.path.join(root, f)
            found_files.append(async_file)
            data = pickle.load(open(async_file))
            if data['account'] == 'a':
                self.assertEquals(
                    int(data['headers'][POLICY_INDEX]), policy.idx)
            elif data['account'] == '.expiring_objects':
                self.assertEquals(
                    int(data['headers'][POLICY_INDEX]), 0)
            else:
                self.fail('unexpected async pending data')
    self.assertEqual(2, len(found_files))
def test_async_update_saves_on_exception(self):
policy = random.choice(list(POLICIES))
self._stage_tmp_dir(policy)
_prefix = utils.HASH_PATH_PREFIX
utils.HASH_PATH_PREFIX = ''
@ -2324,19 +2428,24 @@ class TestObjectController(unittest.TestCase):
object_server.http_connect = fake_http_connect
self.object_controller.async_update(
'PUT', 'a', 'c', 'o', '127.0.0.1:1234', 1, 'sdc1',
{'x-timestamp': '1', 'x-out': 'set'}, 'sda1')
{'x-timestamp': '1', 'x-out': 'set',
POLICY_INDEX: policy.idx}, 'sda1', policy.idx)
finally:
object_server.http_connect = orig_http_connect
utils.HASH_PATH_PREFIX = _prefix
async_dir = diskfile.get_async_dir(policy.idx)
self.assertEquals(
pickle.load(open(os.path.join(
self.testdir, 'sda1', 'async_pending', 'a83',
self.testdir, 'sda1', async_dir, 'a83',
'06fbf0b514e5199dfc4e00f42eb5ea83-0000000001.00000'))),
{'headers': {'x-timestamp': '1', 'x-out': 'set',
'user-agent': 'obj-server %s' % os.getpid()},
'user-agent': 'obj-server %s' % os.getpid(),
POLICY_INDEX: policy.idx},
'account': 'a', 'container': 'c', 'obj': 'o', 'op': 'PUT'})
def test_async_update_saves_on_non_2xx(self):
policy = random.choice(list(POLICIES))
self._stage_tmp_dir(policy)
_prefix = utils.HASH_PATH_PREFIX
utils.HASH_PATH_PREFIX = ''
@ -2361,13 +2470,16 @@ class TestObjectController(unittest.TestCase):
object_server.http_connect = fake_http_connect(status)
self.object_controller.async_update(
'PUT', 'a', 'c', 'o', '127.0.0.1:1234', 1, 'sdc1',
{'x-timestamp': '1', 'x-out': str(status)}, 'sda1')
{'x-timestamp': '1', 'x-out': str(status),
POLICY_INDEX: policy.idx}, 'sda1', policy.idx)
async_dir = diskfile.get_async_dir(policy.idx)
self.assertEquals(
pickle.load(open(os.path.join(
self.testdir, 'sda1', 'async_pending', 'a83',
self.testdir, 'sda1', async_dir, 'a83',
'06fbf0b514e5199dfc4e00f42eb5ea83-0000000001.00000'))),
{'headers': {'x-timestamp': '1', 'x-out': str(status),
'user-agent': 'obj-server %s' % os.getpid()},
'user-agent': 'obj-server %s' % os.getpid(),
POLICY_INDEX: policy.idx},
'account': 'a', 'container': 'c', 'obj': 'o',
'op': 'PUT'})
finally:
@ -2399,7 +2511,7 @@ class TestObjectController(unittest.TestCase):
object_server.http_connect = fake_http_connect(status)
self.object_controller.async_update(
'PUT', 'a', 'c', 'o', '127.0.0.1:1234', 1, 'sdc1',
{'x-timestamp': '1', 'x-out': str(status)}, 'sda1')
{'x-timestamp': '1', 'x-out': str(status)}, 'sda1', 0)
self.assertFalse(
os.path.exists(os.path.join(
self.testdir, 'sda1', 'async_pending', 'a83',
@ -2409,6 +2521,8 @@ class TestObjectController(unittest.TestCase):
utils.HASH_PATH_PREFIX = _prefix
def test_async_update_saves_on_timeout(self):
policy = random.choice(list(POLICIES))
self._stage_tmp_dir(policy)
_prefix = utils.HASH_PATH_PREFIX
utils.HASH_PATH_PREFIX = ''
@ -2428,10 +2542,12 @@ class TestObjectController(unittest.TestCase):
self.object_controller.node_timeout = 0.001
self.object_controller.async_update(
'PUT', 'a', 'c', 'o', '127.0.0.1:1234', 1, 'sdc1',
{'x-timestamp': '1', 'x-out': str(status)}, 'sda1')
{'x-timestamp': '1', 'x-out': str(status)}, 'sda1',
policy.idx)
async_dir = diskfile.get_async_dir(int(policy))
self.assertTrue(
os.path.exists(os.path.join(
self.testdir, 'sda1', 'async_pending', 'a83',
self.testdir, 'sda1', async_dir, 'a83',
'06fbf0b514e5199dfc4e00f42eb5ea83-0000000001.00000')))
finally:
object_server.http_connect = orig_http_connect
@ -2453,39 +2569,88 @@ class TestObjectController(unittest.TestCase):
'PUT', 'a', 'c', 'o', req, {
'x-size': '0', 'x-etag': 'd41d8cd98f00b204e9800998ecf8427e',
'x-content-type': 'text/plain', 'x-timestamp': '1'},
'sda1')
'sda1', 0)
self.assertEquals(given_args, [])
def test_container_update(self):
given_args = []
def test_container_update_success(self):
    # A PUT whose container update succeeds (200) should forward the
    # full set of metadata headers -- including a default storage
    # policy index of 0 when the request did not specify one -- to the
    # container server named by the X-Container-* headers.
    container_updates = []

    def capture_updates(ip, port, method, path, headers, *args, **kwargs):
        container_updates.append((ip, port, method, path, headers))

    req = Request.blank(
        '/sda1/0/a/c/o',
        environ={'REQUEST_METHOD': 'PUT'},
        headers={'X-Timestamp': 1,
                 'X-Trans-Id': '123',
                 'X-Container-Host': 'chost:cport',
                 'X-Container-Partition': 'cpartition',
                 'X-Container-Device': 'cdevice',
                 'Content-Type': 'text/plain'}, body='')
    with mocked_http_conn(
            200, give_connect=capture_updates) as fake_conn:
        resp = req.get_response(self.object_controller)
    self.assertRaises(StopIteration, fake_conn.code_iter.next)
    self.assertEqual(resp.status_int, 201)
    self.assertEqual(len(container_updates), 1)
    ip, port, method, path, headers = container_updates[0]
    self.assertEqual(ip, 'chost')
    self.assertEqual(port, 'cport')
    self.assertEqual(method, 'PUT')
    self.assertEqual(path, '/cdevice/cpartition/a/c/o')
    self.assertEqual(headers, HeaderKeyDict({
        'user-agent': 'obj-server %s' % os.getpid(),
        'x-size': '0',
        'x-etag': 'd41d8cd98f00b204e9800998ecf8427e',
        'x-content-type': 'text/plain',
        'x-timestamp': '1',
        POLICY_INDEX: '0',  # default when not given
        'x-trans-id': '123',
        'referer': 'PUT http://localhost/sda1/0/a/c/o'}))
def test_container_update_async(self):
    # When the container server errors (500), the object server falls
    # back to pickle_async_update; verify the pickled payload carries
    # the policy index (0 here, since the PUT did not name a policy).
    req = Request.blank(
        '/sda1/0/a/c/o',
        environ={'REQUEST_METHOD': 'PUT'},
        headers={'X-Timestamp': 1,
                 'X-Trans-Id': '123',
                 'X-Container-Host': 'chost:cport',
                 'X-Container-Partition': 'cpartition',
                 'X-Container-Device': 'cdevice',
                 'Content-Type': 'text/plain'}, body='')
    given_args = []

    def fake_pickle_async_update(*args):
        given_args[:] = args
    self.object_controller._diskfile_mgr.pickle_async_update = \
        fake_pickle_async_update
    with mocked_http_conn(500) as fake_conn:
        resp = req.get_response(self.object_controller)
    self.assertRaises(StopIteration, fake_conn.code_iter.next)
    self.assertEqual(resp.status_int, 201)

    # pickle_async_update(objdevice, account, container, obj,
    #                     data, timestamp, policy_index)
    self.assertEqual(len(given_args), 7)
    (objdevice, account, container, obj, data, timestamp,
     policy_index) = given_args
    self.assertEqual(objdevice, 'sda1')
    self.assertEqual(account, 'a')
    self.assertEqual(container, 'c')
    self.assertEqual(obj, 'o')
    self.assertEqual(timestamp, '1')
    self.assertEqual(policy_index, 0)
    self.assertEqual(data, {
        'headers': HeaderKeyDict({
            'X-Size': '0',
            'User-Agent': 'obj-server %s' % os.getpid(),
            'X-Content-Type': 'text/plain',
            'X-Timestamp': '1',
            'X-Trans-Id': '123',
            'Referer': 'PUT http://localhost/sda1/0/a/c/o',
            'X-Backend-Storage-Policy-Index': '0',
            'X-Etag': 'd41d8cd98f00b204e9800998ecf8427e'}),
        'obj': 'o',
        'account': 'a',
        'container': 'c',
        'op': 'PUT'})
def test_container_update_bad_args(self):
given_args = []
@ -2493,7 +2658,6 @@ class TestObjectController(unittest.TestCase):
def fake_async_update(*args):
given_args.extend(args)
self.object_controller.async_update = fake_async_update
req = Request.blank(
'/v1/a/c/o',
environ={'REQUEST_METHOD': 'PUT'},
@ -2502,12 +2666,22 @@ class TestObjectController(unittest.TestCase):
'X-Container-Host': 'chost,badhost',
'X-Container-Partition': 'cpartition',
'X-Container-Device': 'cdevice'})
self.object_controller.container_update(
'PUT', 'a', 'c', 'o', req, {
'x-size': '0', 'x-etag': 'd41d8cd98f00b204e9800998ecf8427e',
'x-content-type': 'text/plain', 'x-timestamp': '1'},
'sda1')
with mock.patch.object(self.object_controller, 'async_update',
fake_async_update):
self.object_controller.container_update(
'PUT', 'a', 'c', 'o', req, {
'x-size': '0',
'x-etag': 'd41d8cd98f00b204e9800998ecf8427e',
'x-content-type': 'text/plain', 'x-timestamp': '1'},
'sda1', 0)
self.assertEqual(given_args, [])
errors = self.object_controller.logger.get_lines_for_level('error')
self.assertEqual(len(errors), 1)
msg = errors[0]
self.assertTrue('Container update failed' in msg)
self.assertTrue('different numbers of hosts and devices' in msg)
self.assertTrue('chost,badhost' in msg)
self.assertTrue('cdevice' in msg)
def test_delete_at_update_on_put(self):
# Test how delete_at_update works when issued a delete for old
@ -2517,23 +2691,25 @@ class TestObjectController(unittest.TestCase):
def fake_async_update(*args):
given_args.extend(args)
self.object_controller.async_update = fake_async_update
req = Request.blank(
'/v1/a/c/o',
environ={'REQUEST_METHOD': 'PUT'},
headers={'X-Timestamp': 1,
'X-Trans-Id': '123'})
self.object_controller.delete_at_update(
'DELETE', 2, 'a', 'c', 'o', req, 'sda1')
with mock.patch.object(self.object_controller, 'async_update',
fake_async_update):
self.object_controller.delete_at_update(
'DELETE', 2, 'a', 'c', 'o', req, 'sda1', 0)
self.assertEquals(
given_args, [
'DELETE', '.expiring_objects', '0000000000',
'0000000002-a/c/o', None, None, None,
HeaderKeyDict({
POLICY_INDEX: 0,
'x-timestamp': '1',
'x-trans-id': '123',
'referer': 'PUT http://localhost/v1/a/c/o'}),
'sda1'])
'sda1', 0])
def test_delete_at_negative(self):
# Test how delete_at_update works when issued a delete for old
@ -2551,15 +2727,16 @@ class TestObjectController(unittest.TestCase):
headers={'X-Timestamp': 1,
'X-Trans-Id': '1234'})
self.object_controller.delete_at_update(
'DELETE', -2, 'a', 'c', 'o', req, 'sda1')
'DELETE', -2, 'a', 'c', 'o', req, 'sda1', 0)
self.assertEquals(given_args, [
'DELETE', '.expiring_objects', '0000000000', '0000000000-a/c/o',
None, None, None,
HeaderKeyDict({
POLICY_INDEX: 0,
'x-timestamp': '1',
'x-trans-id': '1234',
'referer': 'PUT http://localhost/v1/a/c/o'}),
'sda1'])
'sda1', 0])
def test_delete_at_cap(self):
# Test how delete_at_update works when issued a delete for old
@ -2577,15 +2754,16 @@ class TestObjectController(unittest.TestCase):
headers={'X-Timestamp': 1,
'X-Trans-Id': '1234'})
self.object_controller.delete_at_update(
'DELETE', 12345678901, 'a', 'c', 'o', req, 'sda1')
'DELETE', 12345678901, 'a', 'c', 'o', req, 'sda1', 0)
self.assertEquals(given_args, [
'DELETE', '.expiring_objects', '9999936000', '9999999999-a/c/o',
None, None, None,
HeaderKeyDict({
POLICY_INDEX: 0,
'x-timestamp': '1',
'x-trans-id': '1234',
'referer': 'PUT http://localhost/v1/a/c/o'}),
'sda1'])
'sda1', 0])
def test_delete_at_update_put_with_info(self):
# Keep next test,
@ -2607,19 +2785,20 @@ class TestObjectController(unittest.TestCase):
'X-Delete-At-Partition': '3',
'X-Delete-At-Device': 'sdc1'})
self.object_controller.delete_at_update('PUT', 2, 'a', 'c', 'o',
req, 'sda1')
req, 'sda1', 0)
self.assertEquals(
given_args, [
'PUT', '.expiring_objects', '0000000000', '0000000002-a/c/o',
'127.0.0.1:1234',
'3', 'sdc1', HeaderKeyDict({
POLICY_INDEX: 0,
'x-size': '0',
'x-etag': 'd41d8cd98f00b204e9800998ecf8427e',
'x-content-type': 'text/plain',
'x-timestamp': '1',
'x-trans-id': '1234',
'referer': 'PUT http://localhost/v1/a/c/o'}),
'sda1'])
'sda1', 0])
def test_delete_at_update_put_with_info_but_missing_container(self):
# Same as previous test, test_delete_at_update_put_with_info, but just
@ -2640,7 +2819,7 @@ class TestObjectController(unittest.TestCase):
'X-Delete-At-Partition': '3',
'X-Delete-At-Device': 'sdc1'})
self.object_controller.delete_at_update('PUT', 2, 'a', 'c', 'o',
req, 'sda1')
req, 'sda1', 0)
self.assertEquals(
self.object_controller.logger.log_dict['warning'],
[(('X-Delete-At-Container header must be specified for expiring '
@ -2660,15 +2839,16 @@ class TestObjectController(unittest.TestCase):
headers={'X-Timestamp': 1,
'X-Trans-Id': '1234'})
self.object_controller.delete_at_update('DELETE', 2, 'a', 'c', 'o',
req, 'sda1')
req, 'sda1', 0)
self.assertEquals(
given_args, [
'DELETE', '.expiring_objects', '0000000000',
'0000000002-a/c/o', None, None,
None, HeaderKeyDict({
POLICY_INDEX: 0,
'x-timestamp': '1', 'x-trans-id': '1234',
'referer': 'DELETE http://localhost/v1/a/c/o'}),
'sda1'])
'sda1', 0])
def test_delete_backend_replication(self):
# If X-Backend-Replication: True delete_at_update should completely
@ -2686,7 +2866,7 @@ class TestObjectController(unittest.TestCase):
'X-Trans-Id': '1234',
'X-Backend-Replication': 'True'})
self.object_controller.delete_at_update(
'DELETE', -2, 'a', 'c', 'o', req, 'sda1')
'DELETE', -2, 'a', 'c', 'o', req, 'sda1', 0)
self.assertEquals(given_args, [])
def test_POST_calls_delete_at(self):
@ -2731,7 +2911,7 @@ class TestObjectController(unittest.TestCase):
self.assertEquals(
given_args, [
'PUT', int(delete_at_timestamp1), 'a', 'c', 'o',
given_args[5], 'sda1'])
given_args[5], 'sda1', 0])
while given_args:
given_args.pop()
@ -2750,9 +2930,9 @@ class TestObjectController(unittest.TestCase):
self.assertEquals(
given_args, [
'PUT', int(delete_at_timestamp2), 'a', 'c', 'o',
given_args[5], 'sda1',
given_args[5], 'sda1', 0,
'DELETE', int(delete_at_timestamp1), 'a', 'c', 'o',
given_args[5], 'sda1'])
given_args[5], 'sda1', 0])
def test_PUT_calls_delete_at(self):
given_args = []
@ -2787,7 +2967,7 @@ class TestObjectController(unittest.TestCase):
self.assertEquals(
given_args, [
'PUT', int(delete_at_timestamp1), 'a', 'c', 'o',
given_args[5], 'sda1'])
given_args[5], 'sda1', 0])
while given_args:
given_args.pop()
@ -2808,9 +2988,9 @@ class TestObjectController(unittest.TestCase):
self.assertEquals(
given_args, [
'PUT', int(delete_at_timestamp2), 'a', 'c', 'o',
given_args[5], 'sda1',
given_args[5], 'sda1', 0,
'DELETE', int(delete_at_timestamp1), 'a', 'c', 'o',
given_args[5], 'sda1'])
given_args[5], 'sda1', 0])
def test_GET_but_expired(self):
test_time = time() + 10000
@ -3197,7 +3377,7 @@ class TestObjectController(unittest.TestCase):
self.assertEquals(resp.status_int, 201)
self.assertEquals(given_args, [
'PUT', int(delete_at_timestamp1), 'a', 'c', 'o',
given_args[5], 'sda1'])
given_args[5], 'sda1', 0])
while given_args:
given_args.pop()
@ -3213,7 +3393,7 @@ class TestObjectController(unittest.TestCase):
self.assertEquals(resp.status_int, 204)
self.assertEquals(given_args, [
'DELETE', int(delete_at_timestamp1), 'a', 'c', 'o',
given_args[5], 'sda1'])
given_args[5], 'sda1', 0])
def test_PUT_delete_at_in_past(self):
req = Request.blank(

View File

@ -17,6 +17,8 @@ import cPickle as pickle
import mock
import os
import unittest
import random
import itertools
from contextlib import closing
from gzip import GzipFile
from tempfile import mkdtemp
@ -27,14 +29,22 @@ from distutils.dir_util import mkpath
from eventlet import spawn, Timeout, listen
from swift.obj import updater as object_updater
from swift.obj.diskfile import ASYNCDIR_BASE
from swift.obj.diskfile import (ASYNCDIR_BASE, get_async_dir, DiskFileManager,
get_tmp_dir)
from swift.common.ring import RingData
from swift.common import utils
from swift.common.utils import hash_path, normalize_timestamp, mkdirs, \
write_pickle
from test.unit import FakeLogger
from swift.common import swob
from test.unit import debug_logger, patch_policies, mocked_http_conn
from swift.common.storage_policy import StoragePolicy, POLICIES, POLICY_INDEX
_mocked_policies = [StoragePolicy(0, 'zero', False),
StoragePolicy(1, 'one', True)]
@patch_policies(_mocked_policies)
class TestObjectUpdater(unittest.TestCase):
def setUp(self):
@ -58,7 +68,9 @@ class TestObjectUpdater(unittest.TestCase):
os.mkdir(self.devices_dir)
self.sda1 = os.path.join(self.devices_dir, 'sda1')
os.mkdir(self.sda1)
os.mkdir(os.path.join(self.sda1, 'tmp'))
for policy in POLICIES:
os.mkdir(os.path.join(self.sda1, get_tmp_dir(int(policy))))
self.logger = debug_logger()
def tearDown(self):
rmtree(self.testdir, ignore_errors=1)
@ -80,48 +92,84 @@ class TestObjectUpdater(unittest.TestCase):
self.assert_(cu.get_container_ring() is not None)
def test_object_sweep(self):
    def check_with_idx(index, warn, should_skip):
        # Build the async dir for the given policy index: policy 0
        # lives in the bare ASYNCDIR_BASE, others in ASYNCDIR_BASE-N.
        if int(index) > 0:
            asyncdir = os.path.join(self.sda1,
                                    ASYNCDIR_BASE + "-" + index)
        else:
            asyncdir = os.path.join(self.sda1, ASYNCDIR_BASE)

        prefix_dir = os.path.join(asyncdir, 'abc')
        mkpath(prefix_dir)

        # A non-directory where a directory is expected should just be
        # skipped, but should not stop processing of subsequent
        # directories.
        not_dirs = (
            os.path.join(self.sda1, 'not_a_dir'),
            os.path.join(self.sda1,
                         ASYNCDIR_BASE + '-' + 'twentington'),
            os.path.join(self.sda1,
                         ASYNCDIR_BASE + '-' + str(int(index) + 100)))
        for not_dir in not_dirs:
            with open(not_dir, 'w'):
                pass

        objects = {
            'a': [1089.3, 18.37, 12.83, 1.3],
            'b': [49.4, 49.3, 49.2, 49.1],
            'c': [109984.123],
        }

        # only the newest async_pending per object should be processed;
        # the updater unlinks the stale ones
        expected = set()
        for o, timestamps in objects.iteritems():
            ohash = hash_path('account', 'container', o)
            for t in timestamps:
                o_path = os.path.join(prefix_dir, ohash + '-' +
                                      normalize_timestamp(t))
                if t == timestamps[0]:
                    expected.add((o_path, int(index)))
                write_pickle({}, o_path)

        seen = set()

        class MockObjectUpdater(object_updater.ObjectUpdater):
            def process_object_update(self, update_path, device, idx):
                seen.add((update_path, idx))
                os.unlink(update_path)

        cu = MockObjectUpdater({
            'devices': self.devices_dir,
            'mount_check': 'false',
            'swift_dir': self.testdir,
            'interval': '1',
            'concurrency': '1',
            'node_timeout': '5'})
        cu.logger = mock_logger = mock.MagicMock()
        cu.object_sweep(self.sda1)
        self.assertEquals(mock_logger.warn.call_count, warn)
        self.assert_(os.path.exists(os.path.join(self.sda1, 'not_a_dir')))
        if should_skip:
            # if we were supposed to skip over the dir, we didn't process
            # anything at all
            self.assertTrue(os.path.exists(prefix_dir))
            self.assertEqual(set(), seen)
        else:
            self.assert_(not os.path.exists(prefix_dir))
            self.assertEqual(expected, seen)

        # test cleanup: the tempdir gets cleaned up between runs, but this
        # way we can be called multiple times in a single test method
        for not_dir in not_dirs:
            os.unlink(not_dir)

    # first check with valid policies
    for pol in POLICIES:
        check_with_idx(str(pol.idx), 0, should_skip=False)
    # now check with a bogus async dir policy and make sure we get
    # a warning indicating that the '99' policy isn't valid
    check_with_idx('99', 1, should_skip=True)
@mock.patch.object(object_updater, 'ismount')
def test_run_once_with_disk_unmounted(self, mock_ismount):
@ -134,7 +182,7 @@ class TestObjectUpdater(unittest.TestCase):
'concurrency': '1',
'node_timeout': '15'})
cu.run_once()
async_dir = os.path.join(self.sda1, ASYNCDIR_BASE)
async_dir = os.path.join(self.sda1, get_async_dir(0))
os.mkdir(async_dir)
cu.run_once()
self.assert_(os.path.exists(async_dir))
@ -147,13 +195,13 @@ class TestObjectUpdater(unittest.TestCase):
'swift_dir': self.testdir,
'interval': '1',
'concurrency': '1',
'node_timeout': '15'})
odd_dir = os.path.join(async_dir, 'not really supposed to be here')
'node_timeout': '15'}, logger=self.logger)
odd_dir = os.path.join(async_dir, 'not really supposed '
'to be here')
os.mkdir(odd_dir)
cu.logger = FakeLogger()
cu.run_once()
self.assert_(os.path.exists(async_dir))
self.assert_(os.path.exists(odd_dir)) # skipped because not mounted!
self.assert_(os.path.exists(odd_dir)) # skipped - not mounted!
# mount_check == True means ismount was checked
self.assertEqual([
mock.call(self.sda1),
@ -169,9 +217,9 @@ class TestObjectUpdater(unittest.TestCase):
'swift_dir': self.testdir,
'interval': '1',
'concurrency': '1',
'node_timeout': '15'})
'node_timeout': '15'}, logger=self.logger)
cu.run_once()
async_dir = os.path.join(self.sda1, ASYNCDIR_BASE)
async_dir = os.path.join(self.sda1, get_async_dir(0))
os.mkdir(async_dir)
cu.run_once()
self.assert_(os.path.exists(async_dir))
@ -184,8 +232,9 @@ class TestObjectUpdater(unittest.TestCase):
'swift_dir': self.testdir,
'interval': '1',
'concurrency': '1',
'node_timeout': '15'})
odd_dir = os.path.join(async_dir, 'not really supposed to be here')
'node_timeout': '15'}, logger=self.logger)
odd_dir = os.path.join(async_dir, 'not really supposed '
'to be here')
os.mkdir(odd_dir)
cu.run_once()
self.assert_(os.path.exists(async_dir))
@ -206,12 +255,12 @@ class TestObjectUpdater(unittest.TestCase):
'%s-%s' % (ohash, normalize_timestamp(time())))
for path in (op_path, older_op_path):
with open(path, 'wb') as async_pending:
pickle.dump({'op': 'PUT', 'account': 'a', 'container': 'c',
pickle.dump({'op': 'PUT', 'account': 'a',
'container': 'c',
'obj': 'o', 'headers': {
'X-Container-Timestamp':
normalize_timestamp(0)}},
async_pending)
cu.logger = FakeLogger()
cu.run_once()
self.assert_(not os.path.exists(older_op_path))
self.assert_(os.path.exists(op_path))
@ -232,13 +281,14 @@ class TestObjectUpdater(unittest.TestCase):
out.flush()
self.assertEquals(inc.readline(),
'PUT /sda1/0/a/c/o HTTP/1.1\r\n')
headers = {}
headers = swob.HeaderKeyDict()
line = inc.readline()
while line and line != '\r\n':
headers[line.split(':')[0].lower()] = \
headers[line.split(':')[0]] = \
line.split(':')[1].strip()
line = inc.readline()
self.assert_('x-container-timestamp' in headers)
self.assertTrue('x-container-timestamp' in headers)
self.assertTrue(POLICY_INDEX in headers)
except BaseException as err:
return err
return None
@ -265,7 +315,7 @@ class TestObjectUpdater(unittest.TestCase):
if dev is not None:
dev['port'] = bindsock.getsockname()[1]
cu.logger = FakeLogger()
cu.logger._clear()
cu.run_once()
err = event.wait()
if err:
@ -277,7 +327,7 @@ class TestObjectUpdater(unittest.TestCase):
pickle.load(open(op_path)).get('successes'))
event = spawn(accept, [404, 500])
cu.logger = FakeLogger()
cu.logger._clear()
cu.run_once()
err = event.wait()
if err:
@ -289,7 +339,7 @@ class TestObjectUpdater(unittest.TestCase):
pickle.load(open(op_path)).get('successes'))
event = spawn(accept, [201])
cu.logger = FakeLogger()
cu.logger._clear()
cu.run_once()
err = event.wait()
if err:
@ -298,6 +348,106 @@ class TestObjectUpdater(unittest.TestCase):
self.assertEqual(cu.logger.get_increment_counts(),
{'unlinks': 1, 'successes': 1})
def test_obj_put_legacy_updates(self):
    # Pre-policy async_pendings have no storage-policy-index in the
    # pickled headers; the updater must infer policy 0 from the
    # directory the pickle lives in and send the header anyway.
    ts = (normalize_timestamp(t) for t in
          itertools.count(int(time())))
    policy = POLICIES.get_by_index(0)
    conf = {
        'devices': self.devices_dir,
        'mount_check': 'false',
        'swift_dir': self.testdir,
    }
    async_dir = os.path.join(self.sda1, get_async_dir(policy.idx))
    os.mkdir(async_dir)
    account, container, obj = 'a', 'c', 'o'
    for op in ('PUT', 'DELETE'):
        self.logger._clear()
        daemon = object_updater.ObjectUpdater(conf, logger=self.logger)
        dfmanager = DiskFileManager(conf, daemon.logger)
        # don't include storage-policy-index in headers_out pickle
        headers_out = swob.HeaderKeyDict({
            'x-size': 0,
            'x-content-type': 'text/plain',
            'x-etag': 'd41d8cd98f00b204e9800998ecf8427e',
            'x-timestamp': ts.next(),
        })
        data = {'op': op, 'account': account, 'container': container,
                'obj': obj, 'headers': headers_out}
        dfmanager.pickle_async_update(self.sda1, account, container, obj,
                                      data, ts.next(), policy.idx)

        request_log = []

        def capture(*args, **kwargs):
            request_log.append((args, kwargs))

        # one update per container replica
        fake_status_codes = [200, 200, 200]
        with mocked_http_conn(*fake_status_codes, give_connect=capture):
            daemon.run_once()
        self.assertEqual(len(fake_status_codes), len(request_log))
        for request_args, request_kwargs in request_log:
            ip, part, method, path, headers, qs, ssl = request_args
            self.assertEqual(method, op)
            self.assertEqual(headers[POLICY_INDEX], str(policy.idx))
        self.assertEqual(daemon.logger.get_increment_counts(),
                         {'successes': 1, 'unlinks': 1,
                          'async_pendings': 1})
def test_obj_put_async_updates(self):
    # An async_pending written with a storage-policy-index header
    # should have that index forwarded verbatim to every container
    # replica when the updater processes it.
    ts = (normalize_timestamp(t) for t in
          itertools.count(int(time())))
    policy = random.choice(list(POLICIES))
    conf = {
        'devices': self.devices_dir,
        'mount_check': 'false',
        'swift_dir': self.testdir,
    }
    daemon = object_updater.ObjectUpdater(conf, logger=self.logger)
    async_dir = os.path.join(self.sda1, get_async_dir(policy.idx))
    os.mkdir(async_dir)

    # stage one async_pending under the chosen policy's async dir
    dfmanager = DiskFileManager(conf, daemon.logger)
    account, container, obj = 'a', 'c', 'o'
    op = 'PUT'
    headers_out = swob.HeaderKeyDict({
        'x-size': 0,
        'x-content-type': 'text/plain',
        'x-etag': 'd41d8cd98f00b204e9800998ecf8427e',
        'x-timestamp': ts.next(),
        POLICY_INDEX: policy.idx,
    })
    data = {'op': op, 'account': account, 'container': container,
            'obj': obj, 'headers': headers_out}
    dfmanager.pickle_async_update(self.sda1, account, container, obj,
                                  data, ts.next(), policy.idx)

    request_log = []

    def capture(*args, **kwargs):
        request_log.append((args, kwargs))

    fake_status_codes = [
        200,  # object update success
        200,  # object update success
        200,  # object update conflict
    ]
    with mocked_http_conn(*fake_status_codes, give_connect=capture):
        daemon.run_once()
    self.assertEqual(len(fake_status_codes), len(request_log))
    for request_args, request_kwargs in request_log:
        ip, part, method, path, headers, qs, ssl = request_args
        self.assertEqual(method, 'PUT')
        self.assertEqual(headers[POLICY_INDEX], str(policy.idx))
    self.assertEqual(daemon.logger.get_increment_counts(),
                     {'successes': 1, 'unlinks': 1, 'async_pendings': 1})
# Allow running this test module directly.
if __name__ == '__main__':
    unittest.main()