Add Storage Policy Support to ssync

This patch makes ssync policy aware so that clusters using storage
policies and ssync replication will replicate objects in all policies.

DocImpact
Implements: blueprint storage-policies
Change-Id: I64879077676d764c6330e03734fc6665bb26f552
This commit is contained in:
Paul Luse 2014-03-18 11:06:52 -07:00 committed by Clay Gerrard
parent 04f2970362
commit b9707d497c
6 changed files with 274 additions and 56 deletions

View File

@ -575,7 +575,8 @@ class DiskFileManager(object):
self, audit_location.path, dev_path,
audit_location.partition)
def get_diskfile_from_hash(self, device, partition, object_hash, **kwargs):
def get_diskfile_from_hash(self, device, partition, object_hash,
policy_idx, **kwargs):
"""
Returns a DiskFile instance for an object at the given
object_hash. Just in case someone thinks of refactoring, be
@ -589,7 +590,8 @@ class DiskFileManager(object):
if not dev_path:
raise DiskFileDeviceUnavailable()
object_path = os.path.join(
dev_path, DATADIR_BASE, partition, object_hash[-3:], object_hash)
dev_path, get_data_dir(policy_idx), partition, object_hash[-3:],
object_hash)
try:
filenames = hash_cleanup_listdir(object_path, self.reclaim_age)
except OSError as err:
@ -615,7 +617,8 @@ class DiskFileManager(object):
except ValueError:
raise DiskFileNotExist()
return DiskFile(self, dev_path, self.threadpools[device],
partition, account, container, obj, **kwargs)
partition, account, container, obj,
policy_idx=policy_idx, **kwargs)
def get_hashes(self, device, partition, suffix, policy_idx):
dev_path = self.get_dev_path(device)
@ -640,7 +643,7 @@ class DiskFileManager(object):
path, err)
return []
def yield_suffixes(self, device, partition):
def yield_suffixes(self, device, partition, policy_idx):
"""
Yields tuples of (full_path, suffix_only) for suffixes stored
on the given device and partition.
@ -648,7 +651,8 @@ class DiskFileManager(object):
dev_path = self.get_dev_path(device)
if not dev_path:
raise DiskFileDeviceUnavailable()
partition_path = os.path.join(dev_path, DATADIR_BASE, partition)
partition_path = os.path.join(dev_path, get_data_dir(policy_idx),
partition)
for suffix in self._listdir(partition_path):
if len(suffix) != 3:
continue
@ -658,7 +662,7 @@ class DiskFileManager(object):
continue
yield (os.path.join(partition_path, suffix), suffix)
def yield_hashes(self, device, partition, suffixes=None):
def yield_hashes(self, device, partition, policy_idx, suffixes=None):
"""
Yields tuples of (full_path, hash_only, timestamp) for object
information stored for the given device, partition, and
@ -671,9 +675,10 @@ class DiskFileManager(object):
if not dev_path:
raise DiskFileDeviceUnavailable()
if suffixes is None:
suffixes = self.yield_suffixes(device, partition)
suffixes = self.yield_suffixes(device, partition, policy_idx)
else:
partition_path = os.path.join(dev_path, DATADIR_BASE, partition)
partition_path = os.path.join(dev_path, get_data_dir(policy_idx),
partition)
suffixes = (
(os.path.join(partition_path, suffix), suffix)
for suffix in suffixes)

View File

@ -25,6 +25,8 @@ from swift.common import http
from swift.common import swob
from swift.common import utils
from swift.common.storage_policy import POLICY_INDEX
class Receiver(object):
"""
@ -168,6 +170,7 @@ class Receiver(object):
self.request.environ['eventlet.minimum_write_chunk_size'] = 0
self.device, self.partition = utils.split_path(
urllib.unquote(self.request.path), 2, 2, False)
self.policy_idx = int(self.request.headers.get(POLICY_INDEX, 0))
utils.validate_device_partition(self.device, self.partition)
if self.app._diskfile_mgr.mount_check and \
not constraints.check_mount(
@ -228,7 +231,7 @@ class Receiver(object):
want = False
try:
df = self.app._diskfile_mgr.get_diskfile_from_hash(
self.device, self.partition, object_hash)
self.device, self.partition, object_hash, self.policy_idx)
except exceptions.DiskFileNotExist:
want = True
else:
@ -351,6 +354,7 @@ class Receiver(object):
subreq_iter())
else:
raise Exception('Invalid subrequest method %s' % method)
subreq.headers[POLICY_INDEX] = self.policy_idx
subreq.headers['X-Backend-Replication'] = 'True'
if replication_headers:
subreq.headers['X-Backend-Replication-Headers'] = \

View File

@ -18,6 +18,8 @@ from swift.common import bufferedhttp
from swift.common import exceptions
from swift.common import http
from swift.common.storage_policy import POLICY_INDEX
class Sender(object):
"""
@ -40,6 +42,10 @@ class Sender(object):
self.send_list = None
self.failures = 0
@property
def policy_idx(self):
"""Return the storage policy index for this replication job, falling back to 0 (the default/legacy policy) when the job dict has no 'policy_idx' key."""
return int(self.job.get('policy_idx', 0))
def __call__(self):
if not self.suffixes:
return True
@ -94,6 +100,7 @@ class Sender(object):
self.connection.putrequest('REPLICATION', '/%s/%s' % (
self.node['device'], self.job['partition']))
self.connection.putheader('Transfer-Encoding', 'chunked')
self.connection.putheader(POLICY_INDEX, self.policy_idx)
self.connection.endheaders()
with exceptions.MessageTimeout(
self.daemon.node_timeout, 'connect receive'):
@ -163,7 +170,8 @@ class Sender(object):
self.connection.send('%x\r\n%s\r\n' % (len(msg), msg))
for path, object_hash, timestamp in \
self.daemon._diskfile_mgr.yield_hashes(
self.job['device'], self.job['partition'], self.suffixes):
self.job['device'], self.job['partition'],
self.policy_idx, self.suffixes):
with exceptions.MessageTimeout(
self.daemon.node_timeout,
'missing_check send line'):
@ -217,7 +225,8 @@ class Sender(object):
for object_hash in self.send_list:
try:
df = self.daemon._diskfile_mgr.get_diskfile_from_hash(
self.job['device'], self.job['partition'], object_hash)
self.job['device'], self.job['partition'], object_hash,
self.policy_idx)
except exceptions.DiskFileNotExist:
continue
url_path = urllib.quote(

View File

@ -1794,7 +1794,7 @@ class TestDiskFile(unittest.TestCase):
self.assertRaises(
DiskFileDeviceUnavailable,
self.df_mgr.get_diskfile_from_hash,
'dev', '9', '9a7175077c01a23ade5956b8a2bba900')
'dev', '9', '9a7175077c01a23ade5956b8a2bba900', 0)
def test_get_diskfile_from_hash_not_dir(self):
self.df_mgr.get_dev_path = mock.MagicMock(return_value='/srv/dev/')
@ -1811,7 +1811,7 @@ class TestDiskFile(unittest.TestCase):
self.assertRaises(
DiskFileNotExist,
self.df_mgr.get_diskfile_from_hash,
'dev', '9', '9a7175077c01a23ade5956b8a2bba900')
'dev', '9', '9a7175077c01a23ade5956b8a2bba900', 0)
quarantine_renamer.assert_called_once_with(
'/srv/dev/',
'/srv/dev/objects/9/900/9a7175077c01a23ade5956b8a2bba900')
@ -1830,7 +1830,7 @@ class TestDiskFile(unittest.TestCase):
self.assertRaises(
DiskFileNotExist,
self.df_mgr.get_diskfile_from_hash,
'dev', '9', '9a7175077c01a23ade5956b8a2bba900')
'dev', '9', '9a7175077c01a23ade5956b8a2bba900', 0)
def test_get_diskfile_from_hash_other_oserror(self):
self.df_mgr.get_dev_path = mock.MagicMock(return_value='/srv/dev/')
@ -1845,7 +1845,7 @@ class TestDiskFile(unittest.TestCase):
self.assertRaises(
OSError,
self.df_mgr.get_diskfile_from_hash,
'dev', '9', '9a7175077c01a23ade5956b8a2bba900')
'dev', '9', '9a7175077c01a23ade5956b8a2bba900', 0)
def test_get_diskfile_from_hash_no_actual_files(self):
self.df_mgr.get_dev_path = mock.MagicMock(return_value='/srv/dev/')
@ -1859,7 +1859,7 @@ class TestDiskFile(unittest.TestCase):
self.assertRaises(
DiskFileNotExist,
self.df_mgr.get_diskfile_from_hash,
'dev', '9', '9a7175077c01a23ade5956b8a2bba900')
'dev', '9', '9a7175077c01a23ade5956b8a2bba900', 0)
def test_get_diskfile_from_hash_read_metadata_problem(self):
self.df_mgr.get_dev_path = mock.MagicMock(return_value='/srv/dev/')
@ -1873,7 +1873,7 @@ class TestDiskFile(unittest.TestCase):
self.assertRaises(
DiskFileNotExist,
self.df_mgr.get_diskfile_from_hash,
'dev', '9', '9a7175077c01a23ade5956b8a2bba900')
'dev', '9', '9a7175077c01a23ade5956b8a2bba900', 0)
def test_get_diskfile_from_hash_no_meta_name(self):
self.df_mgr.get_dev_path = mock.MagicMock(return_value='/srv/dev/')
@ -1886,7 +1886,7 @@ class TestDiskFile(unittest.TestCase):
readmeta.return_value = {}
try:
self.df_mgr.get_diskfile_from_hash(
'dev', '9', '9a7175077c01a23ade5956b8a2bba900')
'dev', '9', '9a7175077c01a23ade5956b8a2bba900', 0)
except DiskFileNotExist as err:
exc = err
self.assertEqual(str(exc), '')
@ -1902,7 +1902,7 @@ class TestDiskFile(unittest.TestCase):
readmeta.return_value = {'name': 'bad'}
try:
self.df_mgr.get_diskfile_from_hash(
'dev', '9', '9a7175077c01a23ade5956b8a2bba900')
'dev', '9', '9a7175077c01a23ade5956b8a2bba900', 0)
except DiskFileNotExist as err:
exc = err
self.assertEqual(str(exc), '')
@ -1917,10 +1917,10 @@ class TestDiskFile(unittest.TestCase):
hclistdir.return_value = ['1381679759.90941.data']
readmeta.return_value = {'name': '/a/c/o'}
self.df_mgr.get_diskfile_from_hash(
'dev', '9', '9a7175077c01a23ade5956b8a2bba900')
'dev', '9', '9a7175077c01a23ade5956b8a2bba900', 0)
dfclass.assert_called_once_with(
self.df_mgr, '/srv/dev/', self.df_mgr.threadpools['dev'], '9',
'a', 'c', 'o')
'a', 'c', 'o', policy_idx=0)
hclistdir.assert_called_once_with(
'/srv/dev/objects/9/900/9a7175077c01a23ade5956b8a2bba900',
604800)
@ -1955,7 +1955,7 @@ class TestDiskFile(unittest.TestCase):
self.df_mgr.get_dev_path = mock.MagicMock(return_value=None)
exc = None
try:
list(self.df_mgr.yield_suffixes('dev', '9'))
list(self.df_mgr.yield_suffixes('dev', '9', 0))
except DiskFileDeviceUnavailable as err:
exc = err
self.assertEqual(str(exc), '')
@ -1964,7 +1964,7 @@ class TestDiskFile(unittest.TestCase):
self.df_mgr._listdir = mock.MagicMock(return_value=[
'abc', 'def', 'ghi', 'abcd', '012'])
self.assertEqual(
list(self.df_mgr.yield_suffixes('dev', '9')),
list(self.df_mgr.yield_suffixes('dev', '9', 0)),
[(self.testdir + '/dev/objects/9/abc', 'abc'),
(self.testdir + '/dev/objects/9/def', 'def'),
(self.testdir + '/dev/objects/9/012', '012')])
@ -1973,7 +1973,7 @@ class TestDiskFile(unittest.TestCase):
self.df_mgr.get_dev_path = mock.MagicMock(return_value=None)
exc = None
try:
list(self.df_mgr.yield_hashes('dev', '9'))
list(self.df_mgr.yield_hashes('dev', '9', 0))
except DiskFileDeviceUnavailable as err:
exc = err
self.assertEqual(str(exc), '')
@ -1983,7 +1983,7 @@ class TestDiskFile(unittest.TestCase):
return []
with mock.patch('os.listdir', _listdir):
self.assertEqual(list(self.df_mgr.yield_hashes('dev', '9')), [])
self.assertEqual(list(self.df_mgr.yield_hashes('dev', '9', 0)), [])
def test_yield_hashes_empty_suffixes(self):
def _listdir(path):
@ -1991,8 +1991,8 @@ class TestDiskFile(unittest.TestCase):
with mock.patch('os.listdir', _listdir):
self.assertEqual(
list(self.df_mgr.yield_hashes('dev', '9', suffixes=['456'])),
[])
list(self.df_mgr.yield_hashes('dev', '9', 0,
suffixes=['456'])), [])
def test_yield_hashes(self):
fresh_ts = normalize_timestamp(time() - 10)
@ -2025,7 +2025,7 @@ class TestDiskFile(unittest.TestCase):
mock.patch('os.listdir', _listdir),
mock.patch('os.unlink')):
self.assertEqual(
list(self.df_mgr.yield_hashes('dev', '9')),
list(self.df_mgr.yield_hashes('dev', '9', 0)),
[(self.testdir +
'/dev/objects/9/abc/9373a92d072897b136b3fc06595b4abc',
'9373a92d072897b136b3fc06595b4abc', fresh_ts),
@ -2068,7 +2068,7 @@ class TestDiskFile(unittest.TestCase):
mock.patch('os.unlink')):
self.assertEqual(
list(self.df_mgr.yield_hashes(
'dev', '9', suffixes=['456'])),
'dev', '9', 0, suffixes=['456'])),
[(self.testdir +
'/dev/objects/9/456/9373a92d072897b136b3fc06595b0456',
'9373a92d072897b136b3fc06595b0456', '1383180000.12345'),

View File

@ -27,6 +27,7 @@ from swift.common import constraints
from swift.common import exceptions
from swift.common import swob
from swift.common import utils
from swift.common.storage_policy import POLICY_INDEX
from swift.obj import diskfile
from swift.obj import server
from swift.obj import ssync_receiver
@ -127,6 +128,38 @@ class TestReceiver(unittest.TestCase):
self.assertEqual(resp.status_int, 200)
mocked_replication_lock.assert_called_once_with('sda1')
def test_Receiver_with_default_storage_policy(self):
"""A REPLICATION request without an X-Backend-Storage-Policy-Index header is handled with policy index 0 (the default)."""
req = swob.Request.blank(
'/sda1/1',
environ={'REQUEST_METHOD': 'REPLICATION'},
body=':MISSING_CHECK: START\r\n'
':MISSING_CHECK: END\r\n'
':UPDATES: START\r\n:UPDATES: END\r\n')
rcvr = ssync_receiver.Receiver(self.controller, req)
# Drain the receiver's generator; blank-line chunks are framing noise.
body_lines = [chunk.strip() for chunk in rcvr() if chunk.strip()]
self.assertEqual(
body_lines,
[':MISSING_CHECK: START', ':MISSING_CHECK: END',
':UPDATES: START', ':UPDATES: END'])
# No policy header was sent, so the receiver must default to index 0.
self.assertEqual(rcvr.policy_idx, 0)
@unit.patch_policies()
def test_Receiver_with_storage_policy_index_header(self):
"""An explicit X-Backend-Storage-Policy-Index header is parsed and exposed as Receiver.policy_idx (patch_policies makes index 1 a valid policy)."""
req = swob.Request.blank(
'/sda1/1',
environ={'REQUEST_METHOD': 'REPLICATION',
'HTTP_X_BACKEND_STORAGE_POLICY_INDEX': '1'},
body=':MISSING_CHECK: START\r\n'
':MISSING_CHECK: END\r\n'
':UPDATES: START\r\n:UPDATES: END\r\n')
rcvr = ssync_receiver.Receiver(self.controller, req)
body_lines = [chunk.strip() for chunk in rcvr() if chunk.strip()]
self.assertEqual(
body_lines,
[':MISSING_CHECK: START', ':MISSING_CHECK: END',
':UPDATES: START', ':UPDATES: END'])
# Header value '1' must be converted to the int policy index.
self.assertEqual(rcvr.policy_idx, 1)
def test_REPLICATION_replication_lock_fail(self):
def _mock(path):
with exceptions.ReplicationLockTimeout(0.01, '/somewhere/' + path):
@ -454,7 +487,7 @@ class TestReceiver(unittest.TestCase):
def test_MISSING_CHECK_have_one_exact(self):
object_dir = utils.storage_directory(
os.path.join(self.testdir, 'sda1', diskfile.DATADIR_BASE),
os.path.join(self.testdir, 'sda1', diskfile.get_data_dir(0)),
'1', self.hash1)
utils.mkdirs(object_dir)
fp = open(os.path.join(object_dir, self.ts1 + '.data'), 'w+')
@ -483,9 +516,42 @@ class TestReceiver(unittest.TestCase):
self.assertFalse(self.controller.logger.error.called)
self.assertFalse(self.controller.logger.exception.called)
@unit.patch_policies
def test_MISSING_CHECK_storage_policy(self):
"""MISSING_CHECK must look for objects in the policy-specific data dir: hash1 is written under policy 1's directory, so with the policy-1 header only hash2 is reported as missing."""
# Create the object for hash1 under policy index 1's data dir
# (e.g. objects-1) rather than the default objects dir.
object_dir = utils.storage_directory(
os.path.join(self.testdir, 'sda1', diskfile.get_data_dir(1)),
'1', self.hash1)
utils.mkdirs(object_dir)
fp = open(os.path.join(object_dir, self.ts1 + '.data'), 'w+')
fp.write('1')
fp.flush()
self.metadata1['Content-Length'] = '1'
diskfile.write_metadata(fp, self.metadata1)
self.controller.logger = mock.MagicMock()
req = swob.Request.blank(
'/sda1/1',
environ={'REQUEST_METHOD': 'REPLICATION',
'HTTP_X_BACKEND_STORAGE_POLICY_INDEX': '1'},
body=':MISSING_CHECK: START\r\n' +
self.hash1 + ' ' + self.ts1 + '\r\n' +
self.hash2 + ' ' + self.ts2 + '\r\n'
':MISSING_CHECK: END\r\n'
':UPDATES: START\r\n:UPDATES: END\r\n')
resp = req.get_response(self.controller)
# hash1 exists in the policy-1 dir, so only hash2 is wanted.
self.assertEqual(
self.body_lines(resp.body),
[':MISSING_CHECK: START',
self.hash2,
':MISSING_CHECK: END',
':UPDATES: START', ':UPDATES: END'])
self.assertEqual(resp.status_int, 200)
self.assertFalse(self.controller.logger.error.called)
self.assertFalse(self.controller.logger.exception.called)
def test_MISSING_CHECK_have_one_newer(self):
object_dir = utils.storage_directory(
os.path.join(self.testdir, 'sda1', diskfile.DATADIR_BASE),
os.path.join(self.testdir, 'sda1', diskfile.get_data_dir(0)),
'1', self.hash1)
utils.mkdirs(object_dir)
newer_ts1 = utils.normalize_timestamp(float(self.ts1) + 1)
@ -518,7 +584,7 @@ class TestReceiver(unittest.TestCase):
def test_MISSING_CHECK_have_one_older(self):
object_dir = utils.storage_directory(
os.path.join(self.testdir, 'sda1', diskfile.DATADIR_BASE),
os.path.join(self.testdir, 'sda1', diskfile.get_data_dir(0)),
'1', self.hash1)
utils.mkdirs(object_dir)
older_ts1 = utils.normalize_timestamp(float(self.ts1) - 1)
@ -1000,6 +1066,58 @@ class TestReceiver(unittest.TestCase):
'Content-Encoding': 'gzip',
'Specialty-Header': 'value',
'Host': 'localhost:80',
POLICY_INDEX: '0',
'X-Backend-Replication': 'True',
'X-Backend-Replication-Headers': (
'content-length x-timestamp x-object-meta-test1 '
'content-encoding specialty-header')})
self.assertEqual(req.read_body, '1')
@unit.patch_policies()
def test_UPDATES_with_storage_policy(self):
_PUT_request = [None]
@server.public
def _PUT(request):
_PUT_request[0] = request
request.read_body = request.environ['wsgi.input'].read()
return swob.HTTPOk()
with mock.patch.object(self.controller, 'PUT', _PUT):
self.controller.logger = mock.MagicMock()
req = swob.Request.blank(
'/device/partition',
environ={'REQUEST_METHOD': 'REPLICATION',
'HTTP_X_BACKEND_STORAGE_POLICY_INDEX': '1'},
body=':MISSING_CHECK: START\r\n:MISSING_CHECK: END\r\n'
':UPDATES: START\r\n'
'PUT /a/c/o\r\n'
'Content-Length: 1\r\n'
'X-Timestamp: 1364456113.12344\r\n'
'X-Object-Meta-Test1: one\r\n'
'Content-Encoding: gzip\r\n'
'Specialty-Header: value\r\n'
'\r\n'
'1')
resp = req.get_response(self.controller)
self.assertEqual(
self.body_lines(resp.body),
[':MISSING_CHECK: START', ':MISSING_CHECK: END',
':UPDATES: START', ':UPDATES: END'])
self.assertEqual(resp.status_int, 200)
self.assertFalse(self.controller.logger.exception.called)
self.assertFalse(self.controller.logger.error.called)
req = _PUT_request[0]
self.assertEqual(req.path, '/device/partition/a/c/o')
self.assertEqual(req.content_length, 1)
self.assertEqual(req.headers, {
'Content-Length': '1',
'X-Timestamp': '1364456113.12344',
'X-Object-Meta-Test1': 'one',
'Content-Encoding': 'gzip',
'Specialty-Header': 'value',
'Host': 'localhost:80',
POLICY_INDEX: '1',
'X-Backend-Replication': 'True',
'X-Backend-Replication-Headers': (
'content-length x-timestamp x-object-meta-test1 '
@ -1037,6 +1155,7 @@ class TestReceiver(unittest.TestCase):
self.assertEqual(req.headers, {
'X-Timestamp': '1364456113.76334',
'Host': 'localhost:80',
POLICY_INDEX: '0',
'X-Backend-Replication': 'True',
'X-Backend-Replication-Headers': 'x-timestamp'})
@ -1137,6 +1256,7 @@ class TestReceiver(unittest.TestCase):
'Content-Encoding': 'gzip',
'Specialty-Header': 'value',
'Host': 'localhost:80',
POLICY_INDEX: '0',
'X-Backend-Replication': 'True',
'X-Backend-Replication-Headers': (
'content-length x-timestamp x-object-meta-test1 '
@ -1148,6 +1268,7 @@ class TestReceiver(unittest.TestCase):
self.assertEqual(req.headers, {
'X-Timestamp': '1364456113.00002',
'Host': 'localhost:80',
POLICY_INDEX: '0',
'X-Backend-Replication': 'True',
'X-Backend-Replication-Headers': 'x-timestamp'})
req = _requests.pop(0)
@ -1158,6 +1279,7 @@ class TestReceiver(unittest.TestCase):
'Content-Length': '3',
'X-Timestamp': '1364456113.00003',
'Host': 'localhost:80',
POLICY_INDEX: '0',
'X-Backend-Replication': 'True',
'X-Backend-Replication-Headers': (
'content-length x-timestamp')})
@ -1170,6 +1292,7 @@ class TestReceiver(unittest.TestCase):
'Content-Length': '4',
'X-Timestamp': '1364456113.00004',
'Host': 'localhost:80',
POLICY_INDEX: '0',
'X-Backend-Replication': 'True',
'X-Backend-Replication-Headers': (
'content-length x-timestamp')})
@ -1180,6 +1303,7 @@ class TestReceiver(unittest.TestCase):
self.assertEqual(req.headers, {
'X-Timestamp': '1364456113.00005',
'Host': 'localhost:80',
POLICY_INDEX: '0',
'X-Backend-Replication': 'True',
'X-Backend-Replication-Headers': 'x-timestamp'})
req = _requests.pop(0)
@ -1188,6 +1312,7 @@ class TestReceiver(unittest.TestCase):
self.assertEqual(req.headers, {
'X-Timestamp': '1364456113.00006',
'Host': 'localhost:80',
POLICY_INDEX: '0',
'X-Backend-Replication': 'True',
'X-Backend-Replication-Headers': 'x-timestamp'})
self.assertEqual(_requests, [])
@ -1251,6 +1376,7 @@ class TestReceiver(unittest.TestCase):
'Content-Length': '3',
'X-Timestamp': '1364456113.00001',
'Host': 'localhost:80',
POLICY_INDEX: '0',
'X-Backend-Replication': 'True',
'X-Backend-Replication-Headers': (
'content-length x-timestamp')})
@ -1262,6 +1388,7 @@ class TestReceiver(unittest.TestCase):
'Content-Length': '1',
'X-Timestamp': '1364456113.00002',
'Host': 'localhost:80',
POLICY_INDEX: '0',
'X-Backend-Replication': 'True',
'X-Backend-Replication-Headers': (
'content-length x-timestamp')})

View File

@ -26,8 +26,9 @@ import mock
from swift.common import exceptions, utils
from swift.obj import ssync_sender, diskfile
from swift.common.storage_policy import POLICY_INDEX
from test.unit import DebugLogger
from test.unit import DebugLogger, patch_policies
class FakeReplicator(object):
@ -103,11 +104,11 @@ class TestSender(unittest.TestCase):
def _make_open_diskfile(self, device='dev', partition='9',
account='a', container='c', obj='o', body='test',
extra_metadata=None):
extra_metadata=None, policy_idx=0):
object_parts = account, container, obj
req_timestamp = utils.normalize_timestamp(time.time())
df = self.sender.daemon._diskfile_mgr.get_diskfile(device, partition,
*object_parts)
df = self.sender.daemon._diskfile_mgr.get_diskfile(
device, partition, *object_parts, policy_idx=policy_idx)
content_length = len(body)
etag = hashlib.md5(body).hexdigest()
with df.create() as writer:
@ -206,6 +207,38 @@ class TestSender(unittest.TestCase):
self.sender.updates.assert_called_once_with()
self.sender.disconnect.assert_called_once_with()
@patch_policies
def test_connect(self):
"""Sender.connect() must issue a REPLICATION request and include the storage policy index header taken from the job dict."""
node = dict(ip='1.2.3.4', port=5678, device='sda1')
job = dict(partition='9', policy_idx=1)
self.sender = ssync_sender.Sender(self.replicator, node, job, None)
self.sender.suffixes = ['abc']
with mock.patch(
'swift.obj.ssync_sender.bufferedhttp.BufferedHTTPConnection'
) as mock_conn_class:
mock_conn = mock_conn_class.return_value
mock_resp = mock.MagicMock()
mock_resp.status = 200
mock_conn.getresponse.return_value = mock_resp
self.sender.connect()
mock_conn_class.assert_called_once_with('1.2.3.4:5678')
# Expected low-level HTTP calls: the REPLICATION verb against the
# device/partition path, chunked transfer encoding, and the policy
# index header sourced from job['policy_idx'].
expectations = {
'putrequest': [
mock.call('REPLICATION', '/sda1/9'),
],
'putheader': [
mock.call('Transfer-Encoding', 'chunked'),
mock.call(POLICY_INDEX, 1),
],
'endheaders': [mock.call()],
}
for method_name, expected_calls in expectations.items():
mock_method = getattr(mock_conn, method_name)
self.assertEquals(expected_calls, mock_method.mock_calls,
'connection method "%s" got %r not %r' % (
method_name, mock_method.mock_calls,
expected_calls))
def test_connect_send_timeout(self):
self.replicator.conn_timeout = 0.01
node = dict(ip='1.2.3.4', port=5678, device='sda1')
@ -327,12 +360,13 @@ class TestSender(unittest.TestCase):
self.assertRaises(exceptions.MessageTimeout, self.sender.missing_check)
def test_missing_check_has_empty_suffixes(self):
def yield_hashes(device, partition, suffixes=None):
if device != 'dev' or partition != '9' or suffixes != [
'abc', 'def']:
def yield_hashes(device, partition, policy_idx, suffixes=None):
if (device != 'dev' or partition != '9' or policy_idx != 0 or
suffixes != ['abc', 'def']):
yield # Just here to make this a generator
raise Exception(
'No match for %r %r %r' % (device, partition, suffixes))
'No match for %r %r %r %r' % (device, partition,
policy_idx, suffixes))
self.sender.connection = FakeConnection()
self.sender.job = {'device': 'dev', 'partition': '9'}
@ -350,9 +384,9 @@ class TestSender(unittest.TestCase):
self.assertEqual(self.sender.send_list, [])
def test_missing_check_has_suffixes(self):
def yield_hashes(device, partition, suffixes=None):
if device == 'dev' and partition == '9' and suffixes == [
'abc', 'def']:
def yield_hashes(device, partition, policy_idx, suffixes=None):
if (device == 'dev' and partition == '9' and policy_idx == 0 and
suffixes == ['abc', 'def']):
yield (
'/srv/node/dev/objects/9/abc/'
'9d41d8cd98f00b204e9800998ecf0abc',
@ -370,7 +404,8 @@ class TestSender(unittest.TestCase):
'1380144474.44444')
else:
raise Exception(
'No match for %r %r %r' % (device, partition, suffixes))
'No match for %r %r %r %r' % (device, partition,
policy_idx, suffixes))
self.sender.connection = FakeConnection()
self.sender.job = {'device': 'dev', 'partition': '9'}
@ -391,8 +426,9 @@ class TestSender(unittest.TestCase):
self.assertEqual(self.sender.send_list, [])
def test_missing_check_far_end_disconnect(self):
def yield_hashes(device, partition, suffixes=None):
if device == 'dev' and partition == '9' and suffixes == ['abc']:
def yield_hashes(device, partition, policy_idx, suffixes=None):
if (device == 'dev' and partition == '9' and policy_idx == 0 and
suffixes == ['abc']):
yield (
'/srv/node/dev/objects/9/abc/'
'9d41d8cd98f00b204e9800998ecf0abc',
@ -400,7 +436,8 @@ class TestSender(unittest.TestCase):
'1380144470.00000')
else:
raise Exception(
'No match for %r %r %r' % (device, partition, suffixes))
'No match for %r %r %r %r' % (device, partition,
policy_idx, suffixes))
self.sender.connection = FakeConnection()
self.sender.job = {'device': 'dev', 'partition': '9'}
@ -420,8 +457,9 @@ class TestSender(unittest.TestCase):
'15\r\n:MISSING_CHECK: END\r\n\r\n')
def test_missing_check_far_end_disconnect2(self):
def yield_hashes(device, partition, suffixes=None):
if device == 'dev' and partition == '9' and suffixes == ['abc']:
def yield_hashes(device, partition, policy_idx, suffixes=None):
if (device == 'dev' and partition == '9' and policy_idx == 0 and
suffixes == ['abc']):
yield (
'/srv/node/dev/objects/9/abc/'
'9d41d8cd98f00b204e9800998ecf0abc',
@ -429,7 +467,8 @@ class TestSender(unittest.TestCase):
'1380144470.00000')
else:
raise Exception(
'No match for %r %r %r' % (device, partition, suffixes))
'No match for %r %r %r %r' % (device, partition,
policy_idx, suffixes))
self.sender.connection = FakeConnection()
self.sender.job = {'device': 'dev', 'partition': '9'}
@ -450,8 +489,9 @@ class TestSender(unittest.TestCase):
'15\r\n:MISSING_CHECK: END\r\n\r\n')
def test_missing_check_far_end_unexpected(self):
def yield_hashes(device, partition, suffixes=None):
if device == 'dev' and partition == '9' and suffixes == ['abc']:
def yield_hashes(device, partition, policy_idx, suffixes=None):
if (device == 'dev' and partition == '9' and policy_idx == 0 and
suffixes == ['abc']):
yield (
'/srv/node/dev/objects/9/abc/'
'9d41d8cd98f00b204e9800998ecf0abc',
@ -459,7 +499,8 @@ class TestSender(unittest.TestCase):
'1380144470.00000')
else:
raise Exception(
'No match for %r %r %r' % (device, partition, suffixes))
'No match for %r %r %r %r' % (device, partition,
policy_idx, suffixes))
self.sender.connection = FakeConnection()
self.sender.job = {'device': 'dev', 'partition': '9'}
@ -479,8 +520,9 @@ class TestSender(unittest.TestCase):
'15\r\n:MISSING_CHECK: END\r\n\r\n')
def test_missing_check_send_list(self):
def yield_hashes(device, partition, suffixes=None):
if device == 'dev' and partition == '9' and suffixes == ['abc']:
def yield_hashes(device, partition, policy_idx, suffixes=None):
if (device == 'dev' and partition == '9' and policy_idx == 0 and
suffixes == ['abc']):
yield (
'/srv/node/dev/objects/9/abc/'
'9d41d8cd98f00b204e9800998ecf0abc',
@ -488,7 +530,8 @@ class TestSender(unittest.TestCase):
'1380144470.00000')
else:
raise Exception(
'No match for %r %r %r' % (device, partition, suffixes))
'No match for %r %r %r %r' % (device, partition,
policy_idx, suffixes))
self.sender.connection = FakeConnection()
self.sender.job = {'device': 'dev', 'partition': '9'}
@ -625,6 +668,36 @@ class TestSender(unittest.TestCase):
'11\r\n:UPDATES: START\r\n\r\n'
'f\r\n:UPDATES: END\r\n\r\n')
@patch_policies
def test_updates_storage_policy_index(self):
"""Sender.updates() must open diskfiles from the data dir that matches the job's policy index: a diskfile written with policy_idx=1 is found under the objects-1 directory."""
device = 'dev'
part = '9'
object_parts = ('a', 'c', 'o')
# Write the object under policy index 1 so updates() must use the
# policy-aware lookup to find it.
df = self._make_open_diskfile(device, part, *object_parts,
policy_idx=1)
object_hash = utils.hash_path(*object_parts)
expected = df.get_metadata()
self.sender.connection = FakeConnection()
self.sender.job = {'device': device, 'partition': part,
'policy_idx': 1}
self.sender.node = {}
self.sender.send_list = [object_hash]
self.sender.send_delete = mock.MagicMock()
self.sender.send_put = mock.MagicMock()
self.sender.response = FakeResponse(
chunk_body=(
':UPDATES: START\r\n'
':UPDATES: END\r\n'))
self.sender.updates()
args, _kwargs = self.sender.send_put.call_args
path, df = args
self.assertEqual(path, '/a/c/o')
self.assert_(isinstance(df, diskfile.DiskFile))
self.assertEqual(expected, df.get_metadata())
# The diskfile's datadir must live under the policy-1 data dir
# ('objects-1'), proving the policy index was honored end to end.
self.assertEqual(os.path.join(self.testdir, 'dev/objects-1/9/',
object_hash[-3:], object_hash),
df._datadir)
def test_updates_read_response_timeout_start(self):
self.sender.connection = FakeConnection()
self.sender.send_list = []