ssync: Refactor tests to use Timestamp and new assert

Created a new assertConnectionMessages() assert that will take it a list
of message strings (without the length and added \r\n), these are added
and compared against the actual connection messages passed in.

I.e:
    self.assertEqual(
        b''.join(connection.sent),
        b'17\r\n:MISSING_CHECK: START\r\n\r\n'
        b'15\r\n:MISSING_CHECK: END\r\n\r\n')

Becomes:
    self.assertConnectionMessages(
        [b':MISSING_CHECK: START',
         b':MISSING_CHECK: END']
        connection.sent)

And the hex lengths are automatically calculated. This because more
useful when you pass in variable messages and timestamps.

Change-Id: I405e218f6e6d6f12818f611e08d5c7f8963ee8c6
Signed-off-by: Matthew Oliver <matt@oliver.net.au>
This commit is contained in:
Matthew Oliver
2025-12-02 16:41:23 +11:00
committed by Alistair Coles
parent 22605d21ee
commit 259afd2a75
2 changed files with 261 additions and 211 deletions

View File

@@ -20,6 +20,7 @@ import unittest
from swift.common import utils
from swift.common.storage_policy import POLICIES
from swift.common.utils import Timestamp, md5
from test.unit import make_timestamp_iter
def write_diskfile(df, timestamp, data=b'test data', frag_index=None,
@@ -66,10 +67,17 @@ class BaseTest(unittest.TestCase):
}
# daemon will be set in subclass setUp
self.daemon = None
self._ts_iter = make_timestamp_iter()
def tearDown(self):
shutil.rmtree(self.tmpdir, ignore_errors=True)
def ts(self):
"""
Timestamps - forever.
"""
return next(self._ts_iter)
def _make_diskfile(self, device='dev', partition='9',
account='a', container='c', obj='o', body=b'test',
extra_metadata=None, policy=None,

View File

@@ -96,8 +96,35 @@ class FakeConnection(object):
self.closed = True
class SenderBase(BaseTest):
def assertConnectionMessages(self, expected_messages, messages,
cmds_with_newlines=False):
"""Assert ssync sender messages
You only need to provide the expected lines of text. The
hex length and \\r\\n will be added automatically, eg:
[':MISSING_CHECK: START', '<obj hash> <ts>', ':MISSING_CHECK: END' ]
Commands in the ssync protocol need to end with a \\r\\n. If you
decide to provide them (because request data tends to not incude them)
then you can use the `cmds_with_newlines` flag to indicate this.
"""
expected = b""
for line in expected_messages:
if isinstance(line, str):
line = wsgi_to_bytes(line)
if line:
if not cmds_with_newlines:
line = b"%s\r\n" % line
expected += b'%x\r\n%s\r\n' % (len(line), line)
self.assertEqual(expected, b''.join(messages))
@patch_policies()
class TestSender(BaseTest):
class TestSender(SenderBase):
def setUp(self):
skip_if_no_xattrs()
@@ -560,12 +587,14 @@ class TestSender(BaseTest):
self.assertTrue(found_put)
def test_call_and_missing_check(self):
ts = self.ts()
def yield_hashes(device, partition, policy, suffixes=None, **kwargs):
if device == 'dev' and partition == '9' and suffixes == ['abc'] \
and policy == POLICIES.legacy:
yield (
'9d41d8cd98f00b204e9800998ecf0abc',
{'ts_data': Timestamp(1380144470.00000)})
{'ts_data': ts})
else:
raise Exception(
'No match for %r %r %r' % (device, partition, suffixes))
@@ -593,15 +622,17 @@ class TestSender(BaseTest):
self.assertTrue(success)
self.assertEqual(candidates,
dict([('9d41d8cd98f00b204e9800998ecf0abc',
{'ts_data': Timestamp(1380144470.00000)})]))
{'ts_data': ts})]))
def test_call_and_missing_check_with_obj_list(self):
ts = self.ts()
def yield_hashes(device, partition, policy, suffixes=None, **kwargs):
if device == 'dev' and partition == '9' and suffixes == ['abc'] \
and policy == POLICIES.legacy:
yield (
'9d41d8cd98f00b204e9800998ecf0abc',
{'ts_data': Timestamp(1380144470.00000)})
{'ts_data': ts})
else:
raise Exception(
'No match for %r %r %r' % (device, partition, suffixes))
@@ -627,7 +658,7 @@ class TestSender(BaseTest):
self.assertTrue(success)
self.assertEqual(candidates,
dict([('9d41d8cd98f00b204e9800998ecf0abc',
{'ts_data': Timestamp(1380144470.00000)})]))
{'ts_data': ts})]))
def test_call_and_missing_check_with_obj_list_but_required(self):
def yield_hashes(device, partition, policy, suffixes=None, **kwargs):
@@ -804,13 +835,16 @@ class TestSender(BaseTest):
self.assertFalse(self.sender.limited_by_max_objects)
def test_call_and_missing_check_timeout_send_line(self):
ts1 = self.ts()
ts2 = self.ts()
def yield_hashes(device, partition, policy, suffixes=None, **kwargs):
yield (
'9d41d8cd98f00b204e9800998ecf0abc',
{'ts_data': Timestamp(1380144470.00000)})
{'ts_data': ts1})
yield (
'9d41d8cd98f00b204e9800998ecf0def',
{'ts_data': Timestamp(1380144471.00000)})
{'ts_data': ts2})
response = FakeResponse()
# max_objects unlimited
@@ -836,11 +870,12 @@ class TestSender(BaseTest):
'0 lines (0 bytes) sent', log_lines)
self.assertFalse(self.sender.limited_by_max_objects)
# only the first missing check item was sent, plus a disconnect line
self.assertEqual(
b''.join(connection.sent),
b'17\r\n:MISSING_CHECK: START\r\n\r\n'
b'33\r\n9d41d8cd98f00b204e9800998ecf0abc 1380144470.00000\r\n\r\n'
b'0\r\n\r\n')
self.assertConnectionMessages(
[b':MISSING_CHECK: START',
b'9d41d8cd98f00b204e9800998ecf0abc %s' %
ts1.internal.encode('ascii'),
''],
connection.sent)
def test_missing_check_has_empty_suffixes(self):
def yield_hashes(device, partition, policy, suffixes=None, **kwargs):
@@ -867,31 +902,33 @@ class TestSender(BaseTest):
self.assertFalse(self.sender.limited_by_max_objects)
available_map, send_map = self.sender.missing_check(connection,
response)
self.assertEqual(
b''.join(connection.sent),
b'17\r\n:MISSING_CHECK: START\r\n\r\n'
b'15\r\n:MISSING_CHECK: END\r\n\r\n')
self.assertConnectionMessages(
[b':MISSING_CHECK: START',
b':MISSING_CHECK: END'],
connection.sent)
self.assertEqual(send_map, {})
self.assertEqual(available_map, {})
self.assertFalse(self.sender.limited_by_max_objects)
def test_missing_check_has_suffixes(self):
timestamps = [self.ts() for _ in range(6)]
def yield_hashes(device, partition, policy, suffixes=None, **kwargs):
if (device == 'dev' and partition == '9' and
policy == POLICIES.legacy and
suffixes == ['abc', 'def']):
yield (
'9d41d8cd98f00b204e9800998ecf0abc',
{'ts_data': Timestamp(1380144470.00000)})
{'ts_data': timestamps[0]})
yield (
'9d41d8cd98f00b204e9800998ecf0def',
{'ts_data': Timestamp(1380144472.22222),
'ts_meta': Timestamp(1380144473.22222)})
{'ts_data': timestamps[1],
'ts_meta': timestamps[2]})
yield (
'9d41d8cd98f00b204e9800998ecf1def',
{'ts_data': Timestamp(1380144474.44444),
'ts_ctype': Timestamp(1380144474.44448),
'ts_meta': Timestamp(1380144475.44444)})
{'ts_data': timestamps[3],
'ts_ctype': timestamps[4],
'ts_meta': timestamps[5]})
else:
raise Exception(
'No match for %r %r %r %r' % (device, partition,
@@ -916,30 +953,34 @@ class TestSender(BaseTest):
self.assertFalse(self.sender.limited_by_max_objects)
available_map, send_map = self.sender.missing_check(connection,
response)
self.assertEqual(
b''.join(connection.sent),
b'17\r\n:MISSING_CHECK: START\r\n\r\n'
b'33\r\n9d41d8cd98f00b204e9800998ecf0abc 1380144470.00000\r\n\r\n'
b'3b\r\n9d41d8cd98f00b204e9800998ecf0def 1380144472.22222 '
b'm:186a0\r\n\r\n'
b'3f\r\n9d41d8cd98f00b204e9800998ecf1def 1380144474.44444 '
b'm:186a0,t:4\r\n\r\n'
b'15\r\n:MISSING_CHECK: END\r\n\r\n')
expected_messages = [
b':MISSING_CHECK: START',
b'9d41d8cd98f00b204e9800998ecf0abc %s' % (
timestamps[0].internal.encode('ascii')),
b'9d41d8cd98f00b204e9800998ecf0def %s m:186a0' % (
timestamps[1].internal.encode('ascii')),
(b'9d41d8cd98f00b204e9800998ecf1def %s '
b'm:30d40,t:186a0' % (
timestamps[3].internal.encode('ascii'))),
b':MISSING_CHECK: END']
self.assertConnectionMessages(expected_messages, connection.sent)
self.assertEqual(send_map, {})
candidates = [('9d41d8cd98f00b204e9800998ecf0abc',
dict(ts_data=Timestamp(1380144470.00000))),
dict(ts_data=timestamps[0])),
('9d41d8cd98f00b204e9800998ecf0def',
dict(ts_data=Timestamp(1380144472.22222),
ts_meta=Timestamp(1380144473.22222))),
dict(ts_data=timestamps[1],
ts_meta=timestamps[2])),
('9d41d8cd98f00b204e9800998ecf1def',
dict(ts_data=Timestamp(1380144474.44444),
ts_meta=Timestamp(1380144475.44444),
ts_ctype=Timestamp(1380144474.44448)))]
dict(ts_data=timestamps[3],
ts_meta=timestamps[5],
ts_ctype=timestamps[4]))]
self.assertEqual(available_map, dict(candidates))
self.assertEqual([], self.daemon_logger.get_lines_for_level('info'))
self.assertFalse(self.sender.limited_by_max_objects)
def test_missing_check_max_objects_less_than_actual_objects(self):
timestamps = [self.ts() for _ in range(6)]
def yield_hashes(device, partition, policy, suffixes=None, **kwargs):
# verify missing_check stops after 2 objects even though more
# objects would yield
@@ -948,16 +989,16 @@ class TestSender(BaseTest):
suffixes == ['abc', 'def']):
yield (
'9d41d8cd98f00b204e9800998ecf0abc',
{'ts_data': Timestamp(1380144470.00000)})
{'ts_data': timestamps[0]})
yield (
'9d41d8cd98f00b204e9800998ecf0def',
{'ts_data': Timestamp(1380144472.22222),
'ts_meta': Timestamp(1380144473.22222)})
{'ts_data': timestamps[1],
'ts_meta': timestamps[2]})
yield (
'9d41d8cd98f00b204e9800998ecf1def',
{'ts_data': Timestamp(1380144474.44444),
'ts_ctype': Timestamp(1380144474.44448),
'ts_meta': Timestamp(1380144475.44444)})
{'ts_data': timestamps[3],
'ts_ctype': timestamps[4],
'ts_meta': timestamps[5]})
else:
raise Exception(
'No match for %r %r %r %r' % (device, partition,
@@ -982,19 +1023,20 @@ class TestSender(BaseTest):
self.assertFalse(self.sender.limited_by_max_objects)
available_map, send_map = self.sender.missing_check(connection,
response)
self.assertEqual(
b''.join(connection.sent),
b'17\r\n:MISSING_CHECK: START\r\n\r\n'
b'33\r\n9d41d8cd98f00b204e9800998ecf0abc 1380144470.00000\r\n\r\n'
b'3b\r\n9d41d8cd98f00b204e9800998ecf0def 1380144472.22222 '
b'm:186a0\r\n\r\n'
b'15\r\n:MISSING_CHECK: END\r\n\r\n')
expected_messages = [
b':MISSING_CHECK: START',
b'9d41d8cd98f00b204e9800998ecf0abc %s' % (
timestamps[0].internal.encode('ascii')),
b'9d41d8cd98f00b204e9800998ecf0def %s m:186a0' % (
timestamps[1].internal.encode('ascii')),
b':MISSING_CHECK: END']
self.assertConnectionMessages(expected_messages, connection.sent)
self.assertEqual(send_map, {})
candidates = [('9d41d8cd98f00b204e9800998ecf0abc',
dict(ts_data=Timestamp(1380144470.00000))),
dict(ts_data=timestamps[0])),
('9d41d8cd98f00b204e9800998ecf0def',
dict(ts_data=Timestamp(1380144472.22222),
ts_meta=Timestamp(1380144473.22222)))]
dict(ts_data=timestamps[1],
ts_meta=timestamps[2]))]
self.assertEqual(available_map, dict(candidates))
self.assertEqual(
['ssync missing_check truncated after 2 objects: device: dev, '
@@ -1004,17 +1046,19 @@ class TestSender(BaseTest):
self.assertTrue(self.sender.limited_by_max_objects)
def test_missing_check_max_objects_exactly_actual_objects(self):
timestamps = [self.ts() for _ in range(3)]
def yield_hashes(device, partition, policy, suffixes=None, **kwargs):
if (device == 'dev' and partition == '9' and
policy == POLICIES.legacy and
suffixes == ['abc', 'def']):
yield (
'9d41d8cd98f00b204e9800998ecf0abc',
{'ts_data': Timestamp(1380144470.00000)})
{'ts_data': timestamps[0]})
yield (
'9d41d8cd98f00b204e9800998ecf0def',
{'ts_data': Timestamp(1380144472.22222),
'ts_meta': Timestamp(1380144473.22222)})
{'ts_data': timestamps[1],
'ts_meta': timestamps[2]})
else:
raise Exception(
'No match for %r %r %r %r' % (device, partition,
@@ -1039,32 +1083,35 @@ class TestSender(BaseTest):
self.assertFalse(self.sender.limited_by_max_objects)
available_map, send_map = self.sender.missing_check(connection,
response)
self.assertEqual(
b''.join(connection.sent),
b'17\r\n:MISSING_CHECK: START\r\n\r\n'
b'33\r\n9d41d8cd98f00b204e9800998ecf0abc 1380144470.00000\r\n\r\n'
b'3b\r\n9d41d8cd98f00b204e9800998ecf0def 1380144472.22222 '
b'm:186a0\r\n\r\n'
b'15\r\n:MISSING_CHECK: END\r\n\r\n')
expected_messages = [
b':MISSING_CHECK: START',
b'9d41d8cd98f00b204e9800998ecf0abc %s' % (
timestamps[0].internal.encode('ascii')),
b'9d41d8cd98f00b204e9800998ecf0def %s m:186a0' % (
timestamps[1].internal.encode('ascii')),
b':MISSING_CHECK: END']
self.assertConnectionMessages(expected_messages, connection.sent)
self.assertEqual(send_map, {})
candidates = [('9d41d8cd98f00b204e9800998ecf0abc',
dict(ts_data=Timestamp(1380144470.00000))),
dict(ts_data=timestamps[0])),
('9d41d8cd98f00b204e9800998ecf0def',
dict(ts_data=Timestamp(1380144472.22222),
ts_meta=Timestamp(1380144473.22222)))]
dict(ts_data=timestamps[1],
ts_meta=timestamps[2]))]
self.assertEqual(available_map, dict(candidates))
# nothing logged re: truncation
self.assertEqual([], self.daemon_logger.get_lines_for_level('info'))
self.assertFalse(self.sender.limited_by_max_objects)
def test_missing_check_far_end_disconnect(self):
ts = self.ts()
def yield_hashes(device, partition, policy, suffixes=None, **kwargs):
if (device == 'dev' and partition == '9' and
policy == POLICIES.legacy and
suffixes == ['abc']):
yield (
'9d41d8cd98f00b204e9800998ecf0abc',
{'ts_data': Timestamp(1380144470.00000)})
{'ts_data': ts})
else:
raise Exception(
'No match for %r %r %r %r' % (device, partition,
@@ -1086,21 +1133,24 @@ class TestSender(BaseTest):
except exceptions.ReplicationException as err:
exc = err
self.assertEqual(str(exc), 'Early disconnect')
self.assertEqual(
b''.join(connection.sent),
b'17\r\n:MISSING_CHECK: START\r\n\r\n'
b'33\r\n9d41d8cd98f00b204e9800998ecf0abc 1380144470.00000\r\n\r\n'
b'15\r\n:MISSING_CHECK: END\r\n\r\n')
expected_messages = [
b':MISSING_CHECK: START',
b'9d41d8cd98f00b204e9800998ecf0abc %s' % (
ts.internal.encode('ascii')),
b':MISSING_CHECK: END']
self.assertConnectionMessages(expected_messages, connection.sent)
self.assertFalse(self.sender.limited_by_max_objects)
def test_missing_check_far_end_disconnect2(self):
ts = self.ts()
def yield_hashes(device, partition, policy, suffixes=None, **kwargs):
if (device == 'dev' and partition == '9' and
policy == POLICIES.legacy and
suffixes == ['abc']):
yield (
'9d41d8cd98f00b204e9800998ecf0abc',
{'ts_data': Timestamp(1380144470.00000)})
{'ts_data': ts})
else:
raise Exception(
'No match for %r %r %r %r' % (device, partition,
@@ -1123,21 +1173,24 @@ class TestSender(BaseTest):
except exceptions.ReplicationException as err:
exc = err
self.assertEqual(str(exc), 'Early disconnect')
self.assertEqual(
b''.join(connection.sent),
b'17\r\n:MISSING_CHECK: START\r\n\r\n'
b'33\r\n9d41d8cd98f00b204e9800998ecf0abc 1380144470.00000\r\n\r\n'
b'15\r\n:MISSING_CHECK: END\r\n\r\n')
expected_messages = [
b':MISSING_CHECK: START',
b'9d41d8cd98f00b204e9800998ecf0abc %s' % (
ts.internal.encode('ascii')),
b':MISSING_CHECK: END']
self.assertConnectionMessages(expected_messages, connection.sent)
self.assertFalse(self.sender.limited_by_max_objects)
def test_missing_check_far_end_unexpected(self):
ts = self.ts()
def yield_hashes(device, partition, policy, suffixes=None, **kwargs):
if (device == 'dev' and partition == '9' and
policy == POLICIES.legacy and
suffixes == ['abc']):
yield (
'9d41d8cd98f00b204e9800998ecf0abc',
{'ts_data': Timestamp(1380144470.00000)})
{'ts_data': ts})
else:
raise Exception(
'No match for %r %r %r %r' % (device, partition,
@@ -1159,21 +1212,24 @@ class TestSender(BaseTest):
except exceptions.ReplicationException as err:
exc = err
self.assertEqual(str(exc), "Unexpected response: 'OH HAI'")
self.assertEqual(
b''.join(connection.sent),
b'17\r\n:MISSING_CHECK: START\r\n\r\n'
b'33\r\n9d41d8cd98f00b204e9800998ecf0abc 1380144470.00000\r\n\r\n'
b'15\r\n:MISSING_CHECK: END\r\n\r\n')
expected_messages = [
b':MISSING_CHECK: START',
b'9d41d8cd98f00b204e9800998ecf0abc %s' % (
ts.internal.encode('ascii')),
b':MISSING_CHECK: END']
self.assertConnectionMessages(expected_messages, connection.sent)
self.assertFalse(self.sender.limited_by_max_objects)
def test_missing_check_send_map(self):
ts = self.ts()
def yield_hashes(device, partition, policy, suffixes=None, **kwargs):
if (device == 'dev' and partition == '9' and
policy == POLICIES.legacy and
suffixes == ['abc']):
yield (
'9d41d8cd98f00b204e9800998ecf0abc',
{'ts_data': Timestamp(1380144470.00000)})
{'ts_data': ts})
else:
raise Exception(
'No match for %r %r %r %r' % (device, partition,
@@ -1195,27 +1251,30 @@ class TestSender(BaseTest):
self.assertFalse(self.sender.limited_by_max_objects)
available_map, send_map = self.sender.missing_check(connection,
response)
self.assertEqual(
b''.join(connection.sent),
b'17\r\n:MISSING_CHECK: START\r\n\r\n'
b'33\r\n9d41d8cd98f00b204e9800998ecf0abc 1380144470.00000\r\n\r\n'
b'15\r\n:MISSING_CHECK: END\r\n\r\n')
expected_messages = [
b':MISSING_CHECK: START',
b'9d41d8cd98f00b204e9800998ecf0abc %s' % (
ts.internal.encode('ascii')),
b':MISSING_CHECK: END']
self.assertConnectionMessages(expected_messages, connection.sent)
self.assertEqual(send_map, {'0123abc': {'data': True, 'meta': True}})
self.assertEqual(available_map,
dict([('9d41d8cd98f00b204e9800998ecf0abc',
{'ts_data': Timestamp(1380144470.00000)})]))
{'ts_data': ts})]))
self.assertFalse(self.sender.limited_by_max_objects)
def test_missing_check_extra_line_parts(self):
# check that sender tolerates extra parts in missing check
# line responses to allow for protocol upgrades
ts = self.ts()
def yield_hashes(device, partition, policy, suffixes=None, **kwargs):
if (device == 'dev' and partition == '9' and
policy == POLICIES.legacy and
suffixes == ['abc']):
yield (
'9d41d8cd98f00b204e9800998ecf0abc',
{'ts_data': Timestamp(1380144470.00000)})
{'ts_data': ts})
else:
raise Exception(
'No match for %r %r %r %r' % (device, partition,
@@ -1240,7 +1299,7 @@ class TestSender(BaseTest):
self.assertEqual(send_map, {'0123abc': {'data': True}})
self.assertEqual(available_map,
dict([('9d41d8cd98f00b204e9800998ecf0abc',
{'ts_data': Timestamp(1380144470.00000)})]))
{'ts_data': ts})]))
self.assertFalse(self.sender.limited_by_max_objects)
def test_updates_timeout(self):
@@ -1258,10 +1317,10 @@ class TestSender(BaseTest):
':UPDATES: START\r\n'
':UPDATES: END\r\n'))
self.sender.updates(connection, response, {})
self.assertEqual(
b''.join(connection.sent),
b'11\r\n:UPDATES: START\r\n\r\n'
b'f\r\n:UPDATES: END\r\n\r\n')
expected_messages = [
b':UPDATES: START',
b':UPDATES: END']
self.assertConnectionMessages(expected_messages, connection.sent)
def test_updates_unexpected_response_lines1(self):
connection = FakeConnection()
@@ -1276,10 +1335,10 @@ class TestSender(BaseTest):
except exceptions.ReplicationException as err:
exc = err
self.assertEqual(str(exc), "Unexpected response: 'abc'")
self.assertEqual(
b''.join(connection.sent),
b'11\r\n:UPDATES: START\r\n\r\n'
b'f\r\n:UPDATES: END\r\n\r\n')
expected_messages = [
b':UPDATES: START',
b':UPDATES: END']
self.assertConnectionMessages(expected_messages, connection.sent)
def test_updates_unexpected_response_lines2(self):
connection = FakeConnection()
@@ -1294,10 +1353,10 @@ class TestSender(BaseTest):
except exceptions.ReplicationException as err:
exc = err
self.assertEqual(str(exc), "Unexpected response: 'abc'")
self.assertEqual(
b''.join(connection.sent),
b'11\r\n:UPDATES: START\r\n\r\n'
b'f\r\n:UPDATES: END\r\n\r\n')
expected_messages = [
b':UPDATES: START',
b':UPDATES: END']
self.assertConnectionMessages(expected_messages, connection.sent)
def test_updates_is_deleted(self):
device = 'dev'
@@ -1328,10 +1387,10 @@ class TestSender(BaseTest):
self.assertEqual(self.sender.send_put.mock_calls, [])
# note that the delete line isn't actually sent since we mock
# send_delete; send_delete is tested separately.
self.assertEqual(
b''.join(connection.sent),
b'11\r\n:UPDATES: START\r\n\r\n'
b'f\r\n:UPDATES: END\r\n\r\n')
expected_messages = [
b':UPDATES: START',
b':UPDATES: END']
self.assertConnectionMessages(expected_messages, connection.sent)
def test_update_send_delete(self):
device = 'dev'
@@ -1339,7 +1398,7 @@ class TestSender(BaseTest):
object_parts = ('a', 'c', 'o')
df = self._make_open_diskfile(device, part, *object_parts)
object_hash = utils.hash_path(*object_parts)
delete_timestamp = utils.normalize_timestamp(time.time())
delete_timestamp = utils.normalize_timestamp(Timestamp.now())
df.delete(delete_timestamp)
connection = FakeConnection()
self.sender.job = {
@@ -1355,15 +1414,12 @@ class TestSender(BaseTest):
':UPDATES: START\r\n'
':UPDATES: END\r\n'))
self.sender.updates(connection, response, send_map)
self.assertEqual(
b''.join(connection.sent),
b'11\r\n:UPDATES: START\r\n\r\n'
b'30\r\n'
b'DELETE /a/c/o\r\n'
b'X-Timestamp: %s\r\n\r\n\r\n'
b'f\r\n:UPDATES: END\r\n\r\n'
% delete_timestamp.encode('ascii')
)
expected_messages = [
b':UPDATES: START',
b'DELETE /a/c/o\r\nX-Timestamp: %s\r\n' %
delete_timestamp.encode('ascii'),
b':UPDATES: END']
self.assertConnectionMessages(expected_messages, connection.sent)
def test_updates_put(self):
# sender has data file and meta file
@@ -1408,10 +1464,10 @@ class TestSender(BaseTest):
self.assertEqual(expected, df.get_metadata())
# note that the put line isn't actually sent since we mock send_put;
# send_put is tested separately.
self.assertEqual(
b''.join(connection.sent),
b'11\r\n:UPDATES: START\r\n\r\n'
b'f\r\n:UPDATES: END\r\n\r\n')
expected_messages = [
b':UPDATES: START',
b':UPDATES: END']
self.assertConnectionMessages(expected_messages, connection.sent)
def test_updates_post(self):
ts_iter = make_timestamp_iter()
@@ -1455,10 +1511,10 @@ class TestSender(BaseTest):
self.assertEqual(expected, df.get_metadata())
# note that the post line isn't actually sent since we mock send_post;
# send_post is tested separately.
self.assertEqual(
b''.join(connection.sent),
b'11\r\n:UPDATES: START\r\n\r\n'
b'f\r\n:UPDATES: END\r\n\r\n')
expected_messages = [
b':UPDATES: START',
b':UPDATES: END']
self.assertConnectionMessages(expected_messages, connection.sent)
def test_updates_put_and_post(self):
ts_iter = make_timestamp_iter()
@@ -1507,10 +1563,10 @@ class TestSender(BaseTest):
self.assertEqual(path, '/a/c/o')
self.assertIsInstance(df, diskfile.DiskFile)
self.assertEqual(expected, df.get_metadata())
self.assertEqual(
b''.join(connection.sent),
b'11\r\n:UPDATES: START\r\n\r\n'
b'f\r\n:UPDATES: END\r\n\r\n')
expected_messages = [
b':UPDATES: START',
b':UPDATES: END']
self.assertConnectionMessages(expected_messages, connection.sent)
def test_updates_storage_policy_index(self):
device = 'dev'
@@ -1570,10 +1626,10 @@ class TestSender(BaseTest):
except exceptions.ReplicationException as err:
exc = err
self.assertEqual(str(exc), 'Early disconnect')
self.assertEqual(
b''.join(connection.sent),
b'11\r\n:UPDATES: START\r\n\r\n'
b'f\r\n:UPDATES: END\r\n\r\n')
expected_messages = [
b':UPDATES: START',
b':UPDATES: END']
self.assertConnectionMessages(expected_messages, connection.sent)
def test_updates_read_response_unexp_start(self):
connection = FakeConnection()
@@ -1588,10 +1644,10 @@ class TestSender(BaseTest):
except exceptions.ReplicationException as err:
exc = err
self.assertEqual(str(exc), "Unexpected response: 'anything else'")
self.assertEqual(
b''.join(connection.sent),
b'11\r\n:UPDATES: START\r\n\r\n'
b'f\r\n:UPDATES: END\r\n\r\n')
expected_messages = [
b':UPDATES: START',
b':UPDATES: END']
self.assertConnectionMessages(expected_messages, connection.sent)
def test_updates_read_response_timeout_end(self):
connection = FakeConnection()
@@ -1624,10 +1680,10 @@ class TestSender(BaseTest):
except exceptions.ReplicationException as err:
exc = err
self.assertEqual(str(exc), 'Early disconnect')
self.assertEqual(
b''.join(connection.sent),
b'11\r\n:UPDATES: START\r\n\r\n'
b'f\r\n:UPDATES: END\r\n\r\n')
expected_messages = [
b':UPDATES: START',
b':UPDATES: END']
self.assertConnectionMessages(expected_messages, connection.sent)
def test_updates_read_response_unexp_end(self):
connection = FakeConnection()
@@ -1642,10 +1698,10 @@ class TestSender(BaseTest):
except exceptions.ReplicationException as err:
exc = err
self.assertEqual(str(exc), "Unexpected response: 'anything else'")
self.assertEqual(
b''.join(connection.sent),
b'11\r\n:UPDATES: START\r\n\r\n'
b'f\r\n:UPDATES: END\r\n\r\n')
expected_messages = [
b':UPDATES: START',
b':UPDATES: END']
self.assertConnectionMessages(expected_messages, connection.sent)
def test_send_delete_timeout(self):
connection = FakeConnection()
@@ -1661,14 +1717,12 @@ class TestSender(BaseTest):
def test_send_delete(self):
connection = FakeConnection()
self.sender.send_delete(connection, '/a/c/o',
utils.Timestamp('1381679759.90941'))
self.assertEqual(
b''.join(connection.sent),
b'30\r\n'
b'DELETE /a/c/o\r\n'
b'X-Timestamp: 1381679759.90941\r\n'
b'\r\n\r\n')
ts = self.ts()
self.sender.send_delete(connection, '/a/c/o', ts)
expected_messages = [
b'DELETE /a/c/o\r\nX-Timestamp: %s\r\n'
% ts.internal.encode('ascii')]
self.assertConnectionMessages(expected_messages, connection.sent)
def test_send_put_initial_timeout(self):
df = self._make_open_diskfile()
@@ -1707,8 +1761,7 @@ class TestSender(BaseTest):
def _check_send_put(self, obj_name, meta_value,
meta_name='Unicode-Meta-Name', durable=True):
ts_iter = make_timestamp_iter()
t1 = next(ts_iter)
t1 = self.ts()
body = b'test'
extra_metadata = {'Some-Other-Header': 'value',
meta_name: meta_value}
@@ -1720,38 +1773,30 @@ class TestSender(BaseTest):
commit=durable)
expected = dict(df.get_metadata())
expected['body'] = body.decode('ascii')
expected['chunk_size'] = len(body)
expected['meta'] = meta_value
expected['meta_name'] = meta_name
path = urllib.parse.quote(expected['name'])
expected['path'] = path
no_commit = '' if durable else 'X-Backend-No-Commit: True\r\n'
expected['no_commit'] = no_commit
length = 128 + len(path) + len(meta_value) + len(no_commit) + \
len(meta_name)
expected['length'] = format(length, 'x')
# .meta file metadata is not included in expected for data only PUT
t2 = next(ts_iter)
t2 = self.ts()
metadata = {'X-Timestamp': t2.internal, 'X-Object-Meta-Fruit': 'kiwi'}
df.write_metadata(metadata)
df.open()
connection = FakeConnection()
self.sender.send_put(connection, path, df, durable=durable)
expected = (
'%(length)s\r\n'
'PUT %(path)s\r\n'
'Content-Length: %(Content-Length)s\r\n'
'ETag: %(ETag)s\r\n'
'Some-Other-Header: value\r\n'
'%(meta_name)s: %(meta)s\r\n'
'%(no_commit)s'
'X-Timestamp: %(X-Timestamp)s\r\n'
'\r\n'
'\r\n'
'%(chunk_size)s\r\n'
'%(body)s\r\n' % expected)
expected = wsgi_to_bytes(expected)
self.assertEqual(b''.join(connection.sent), expected)
expected_messages = [
('PUT %(path)s\r\n'
'Content-Length: %(Content-Length)s\r\n'
'ETag: %(ETag)s\r\n'
'Some-Other-Header: value\r\n'
'%(meta_name)s: %(meta)s\r\n'
'%(no_commit)s'
'X-Timestamp: %(X-Timestamp)s\r\n\r\n' % expected),
'%(body)s' % expected]
self.assertConnectionMessages(expected_messages, connection.sent,
cmds_with_newlines=True)
def test_send_put(self):
self._check_send_put('o', 'meta')
@@ -1788,21 +1833,19 @@ class TestSender(BaseTest):
df.write_metadata(newer_metadata)
path = urllib.parse.quote(df.read_metadata()['name'])
wire_meta = wsgi_to_bytes(meta_value)
length = format(61 + len(path) + len(wire_meta), 'x')
connection = FakeConnection()
with df.open():
self.sender.send_post(connection, path, df)
self.assertEqual(
b''.join(connection.sent),
b'%s\r\n'
b'POST %s\r\n'
b'X-Object-Meta-Foo: %s\r\n'
b'X-Timestamp: %s\r\n'
b'\r\n'
b'\r\n' % (length.encode('ascii'), path.encode('ascii'),
wire_meta,
ts_1.internal.encode('ascii')))
expected_messages = [
(b'POST %s\r\n'
b'X-Object-Meta-Foo: %s\r\n'
b'X-Timestamp: %s\r\n' % (
path.encode('ascii'),
wire_meta,
ts_1.internal.encode('ascii')
))]
self.assertConnectionMessages(expected_messages, connection.sent)
def test_send_post(self):
self._check_send_post('o', 'meta')
@@ -1828,7 +1871,7 @@ class TestSender(BaseTest):
@patch_policies(with_ec_default=True)
class TestSenderEC(BaseTest):
class TestSenderEC(SenderBase):
def setUp(self):
skip_if_no_xattrs()
super(TestSenderEC, self).setUp()
@@ -1879,12 +1922,11 @@ class TestSenderEC(BaseTest):
connection = FakeConnection()
available_map, send_map = self.sender.missing_check(connection,
response)
self.assertEqual(
b''.join(connection.sent),
b'17\r\n:MISSING_CHECK: START\r\n\r\n'
b'33\r\n' + object_hash.encode('utf8') +
b' ' + t1.internal.encode('utf8') + b'\r\n\r\n'
b'15\r\n:MISSING_CHECK: END\r\n\r\n')
expected_messages = [
b':MISSING_CHECK: START',
object_hash.encode('utf8') + b' ' + t1.internal.encode('utf8'),
b':MISSING_CHECK: END']
self.assertConnectionMessages(expected_messages, connection.sent)
self.assertEqual(
available_map, {object_hash: {'ts_data': t1, 'durable': True}})
@@ -1897,12 +1939,12 @@ class TestSenderEC(BaseTest):
connection = FakeConnection()
available_map, send_map = self.sender.missing_check(connection,
response)
self.assertEqual(
b''.join(connection.sent),
b'17\r\n:MISSING_CHECK: START\r\n\r\n'
b'41\r\n' + object_hash.encode('utf8') +
b' ' + t2.internal.encode('utf8') + b' durable:False\r\n\r\n'
b'15\r\n:MISSING_CHECK: END\r\n\r\n')
expected_messages = [
b':MISSING_CHECK: START',
object_hash.encode('utf8') + b' ' + t2.internal.encode('utf8')
+ b' durable:False',
b':MISSING_CHECK: END']
self.assertConnectionMessages(expected_messages, connection.sent)
self.assertEqual(
available_map, {object_hash: {'ts_data': t2, 'durable': False}})
@@ -1914,12 +1956,11 @@ class TestSenderEC(BaseTest):
connection = FakeConnection()
available_map, send_map = self.sender.missing_check(connection,
response)
self.assertEqual(
b''.join(connection.sent),
b'17\r\n:MISSING_CHECK: START\r\n\r\n'
b'33\r\n' + object_hash.encode('utf8') +
b' ' + t1.internal.encode('utf8') + b'\r\n\r\n'
b'15\r\n:MISSING_CHECK: END\r\n\r\n')
expected_messages = [
b':MISSING_CHECK: START',
object_hash.encode('utf8') + b' ' + t1.internal.encode('utf8'),
b':MISSING_CHECK: END']
self.assertConnectionMessages(expected_messages, connection.sent)
self.assertEqual(
available_map, {object_hash: {'ts_data': t1, 'durable': True}})
@@ -1981,10 +2022,11 @@ class TestSenderEC(BaseTest):
self.assertEqual({'durable': expected_durable_kwarg}, kwargs)
# note that the put line isn't actually sent since we mock
# send_put; send_put is tested separately.
self.assertEqual(
b''.join(connection.sent),
b'11\r\n:UPDATES: START\r\n\r\n'
b'f\r\n:UPDATES: END\r\n\r\n')
expected_messages = [
b':UPDATES: START',
b':UPDATES: END']
self.assertConnectionMessages(expected_messages, connection.sent)
# note: we never expect the (False, False) case
check_updates(include_non_durable=False, expected_durable_kwarg=True)