SSYNC: Stop sharing a global connection

Change-Id: Id5988887f01532b27c3888a126764524d2466011
This commit is contained in:
Romain LE DISEZ
2018-10-22 10:26:59 +02:00
parent 46c6fab1cf
commit e4ad56abb1
3 changed files with 165 additions and 157 deletions

View File

@@ -84,7 +84,6 @@ class Sender(object):
self.node = node
self.job = job
self.suffixes = suffixes
self.connection = None
self.response = None
self.response_buffer = ''
self.response_chunk_left = 0
@@ -111,6 +110,7 @@ class Sender(object):
"""
if not self.suffixes:
return True, {}
connection = None
try:
# Double try blocks in case our main error handler fails.
try:
@@ -119,10 +119,10 @@ class Sender(object):
# exceptions.ReplicationException for common issues that will
# abort the replication attempt and log a simple error. All
# other exceptions will be logged with a full stack trace.
self.connect()
self.missing_check()
connection = self.connect()
self.missing_check(connection)
if self.remote_check_objs is None:
self.updates()
self.updates(connection)
can_delete_obj = self.available_map
else:
# when we are initialized with remote_check_objs we don't
@@ -150,7 +150,7 @@ class Sender(object):
self.node.get('replication_port'),
self.node.get('device'), self.job.get('partition'))
finally:
self.disconnect()
self.disconnect(connection)
except Exception:
# We don't want any exceptions to escape our code and possibly
# mess up the original replicator code that called us since it
@@ -167,16 +167,17 @@ class Sender(object):
Establishes a connection and starts an SSYNC request
with the object server.
"""
connection = None
with exceptions.MessageTimeout(
self.daemon.conn_timeout, 'connect send'):
self.connection = bufferedhttp.BufferedHTTPConnection(
connection = bufferedhttp.BufferedHTTPConnection(
'%s:%s' % (self.node['replication_ip'],
self.node['replication_port']))
self.connection.putrequest('SSYNC', '/%s/%s' % (
connection.putrequest('SSYNC', '/%s/%s' % (
self.node['device'], self.job['partition']))
self.connection.putheader('Transfer-Encoding', 'chunked')
self.connection.putheader('X-Backend-Storage-Policy-Index',
int(self.job['policy']))
connection.putheader('Transfer-Encoding', 'chunked')
connection.putheader('X-Backend-Storage-Policy-Index',
int(self.job['policy']))
# a sync job must use the node's index for the frag_index of the
# rebuilt fragments instead of the frag_index from the job which
# will be rebuilding them
@@ -188,20 +189,20 @@ class Sender(object):
# cases on the wire we write the empty string which
# ssync_receiver will translate to None
frag_index = ''
self.connection.putheader('X-Backend-Ssync-Frag-Index',
frag_index)
connection.putheader('X-Backend-Ssync-Frag-Index', frag_index)
# a revert job to a handoff will not have a node index
self.connection.putheader('X-Backend-Ssync-Node-Index',
self.node.get('index', ''))
self.connection.endheaders()
connection.putheader('X-Backend-Ssync-Node-Index',
self.node.get('index', ''))
connection.endheaders()
with exceptions.MessageTimeout(
self.daemon.node_timeout, 'connect receive'):
self.response = self.connection.getresponse()
self.response = connection.getresponse()
if self.response.status != http.HTTP_OK:
err_msg = self.response.read()[:1024]
raise exceptions.ReplicationException(
'Expected status %s; got %s (%s)' %
(http.HTTP_OK, self.response.status, err_msg))
return connection
def readline(self):
"""
@@ -248,7 +249,7 @@ class Sender(object):
data += '\n'
return data
def missing_check(self):
def missing_check(self, connection):
"""
Handles the sender-side of the MISSING_CHECK step of a
SSYNC request.
@@ -260,7 +261,7 @@ class Sender(object):
with exceptions.MessageTimeout(
self.daemon.node_timeout, 'missing_check start'):
msg = ':MISSING_CHECK: START\r\n'
self.connection.send('%x\r\n%s\r\n' % (len(msg), msg))
connection.send('%x\r\n%s\r\n' % (len(msg), msg))
hash_gen = self.df_mgr.yield_hashes(
self.job['device'], self.job['partition'],
self.job['policy'], self.suffixes,
@@ -276,11 +277,11 @@ class Sender(object):
self.daemon.node_timeout,
'missing_check send line'):
msg = '%s\r\n' % encode_missing(object_hash, **timestamps)
self.connection.send('%x\r\n%s\r\n' % (len(msg), msg))
connection.send('%x\r\n%s\r\n' % (len(msg), msg))
with exceptions.MessageTimeout(
self.daemon.node_timeout, 'missing_check end'):
msg = ':MISSING_CHECK: END\r\n'
self.connection.send('%x\r\n%s\r\n' % (len(msg), msg))
connection.send('%x\r\n%s\r\n' % (len(msg), msg))
# Now, retrieve the list of what they want.
while True:
with exceptions.MessageTimeout(
@@ -307,7 +308,7 @@ class Sender(object):
if parts:
self.send_map[parts[0]] = decode_wanted(parts[1:])
def updates(self):
def updates(self, connection):
"""
Handles the sender-side of the UPDATES step of an SSYNC
request.
@@ -319,7 +320,7 @@ class Sender(object):
with exceptions.MessageTimeout(
self.daemon.node_timeout, 'updates start'):
msg = ':UPDATES: START\r\n'
self.connection.send('%x\r\n%s\r\n' % (len(msg), msg))
connection.send('%x\r\n%s\r\n' % (len(msg), msg))
for object_hash, want in self.send_map.items():
object_hash = urllib.parse.unquote(object_hash)
try:
@@ -340,12 +341,12 @@ class Sender(object):
df_alt = self.job.get(
'sync_diskfile_builder', lambda *args: df)(
self.job, self.node, df.get_datafile_metadata())
self.send_put(url_path, df_alt)
self.send_put(connection, url_path, df_alt)
if want.get('meta') and df.data_timestamp != df.timestamp:
self.send_post(url_path, df)
self.send_post(connection, url_path, df)
except exceptions.DiskFileDeleted as err:
if want.get('data'):
self.send_delete(url_path, err.timestamp)
self.send_delete(connection, url_path, err.timestamp)
except exceptions.DiskFileError:
# DiskFileErrors are expected while opening the diskfile,
# before any data is read and sent. Since there is no partial
@@ -356,7 +357,7 @@ class Sender(object):
with exceptions.MessageTimeout(
self.daemon.node_timeout, 'updates end'):
msg = ':UPDATES: END\r\n'
self.connection.send('%x\r\n%s\r\n' % (len(msg), msg))
connection.send('%x\r\n%s\r\n' % (len(msg), msg))
# Now, read their response for any issues.
while True:
with exceptions.MessageTimeout(
@@ -383,14 +384,14 @@ class Sender(object):
raise exceptions.ReplicationException(
'Unexpected response: %r' % line[:1024])
def send_subrequest(self, method, url_path, headers, df):
def send_subrequest(self, connection, method, url_path, headers, df):
msg = ['%s %s' % (method, url_path)]
for key, value in sorted(headers.items()):
msg.append('%s: %s' % (key, value))
msg = '\r\n'.join(msg) + '\r\n\r\n'
with exceptions.MessageTimeout(self.daemon.node_timeout,
'send_%s' % method.lower()):
self.connection.send('%x\r\n%s\r\n' % (len(msg), msg))
connection.send('%x\r\n%s\r\n' % (len(msg), msg))
if df:
bytes_read = 0
@@ -399,7 +400,7 @@ class Sender(object):
with exceptions.MessageTimeout(self.daemon.node_timeout,
'send_%s chunk' %
method.lower()):
self.connection.send('%x\r\n%s\r\n' % (len(chunk), chunk))
connection.send('%x\r\n%s\r\n' % (len(chunk), chunk))
if bytes_read != df.content_length:
# Since we may now have partial state on the receiver we have
# to prevent the receiver finalising what may well be a bad or
@@ -411,14 +412,14 @@ class Sender(object):
raise exceptions.ReplicationException(
'Sent data length does not match content-length')
def send_delete(self, url_path, timestamp):
def send_delete(self, connection, url_path, timestamp):
"""
Sends a DELETE subrequest with the given information.
"""
headers = {'X-Timestamp': timestamp.internal}
self.send_subrequest('DELETE', url_path, headers, None)
self.send_subrequest(connection, 'DELETE', url_path, headers, None)
def send_put(self, url_path, df):
def send_put(self, connection, url_path, df):
"""
Sends a PUT subrequest for the url_path using the source df
(DiskFile) and content_length.
@@ -427,25 +428,25 @@ class Sender(object):
for key, value in df.get_datafile_metadata().items():
if key not in ('name', 'Content-Length'):
headers[key] = value
self.send_subrequest('PUT', url_path, headers, df)
self.send_subrequest(connection, 'PUT', url_path, headers, df)
def send_post(self, url_path, df):
def send_post(self, connection, url_path, df):
metadata = df.get_metafile_metadata()
if metadata is None:
return
self.send_subrequest('POST', url_path, metadata, None)
self.send_subrequest(connection, 'POST', url_path, metadata, None)
def disconnect(self):
def disconnect(self, connection):
"""
Closes down the connection to the object server once done
with the SSYNC request.
"""
if not self.connection:
if not connection:
return
try:
with exceptions.MessageTimeout(
self.daemon.node_timeout, 'disconnect'):
self.connection.send('0\r\n\r\n')
connection.send('0\r\n\r\n')
except (Exception, exceptions.Timeout):
pass # We're okay with the above failing.
self.connection.close()
connection.close()

View File

@@ -108,10 +108,11 @@ class TestBaseSsync(BaseTest):
return wrapped_readline
def wrapped_connect():
orig_connect()
sender.connection.send = make_send_wrapper(
sender.connection.send)
connection = orig_connect()
connection.send = make_send_wrapper(
connection.send)
sender.readline = make_readline_wrapper(sender.readline)
return connection
return wrapped_connect, trace
def _get_object_data(self, path, **kwargs):

View File

@@ -162,8 +162,9 @@ class TestSender(BaseTest):
'EXCEPTION in ssync.Sender'))
def test_call_calls_others(self):
connection = FakeConnection()
self.sender.suffixes = ['abc']
self.sender.connect = mock.MagicMock()
self.sender.connect = mock.MagicMock(return_value=connection)
self.sender.missing_check = mock.MagicMock()
self.sender.updates = mock.MagicMock()
self.sender.disconnect = mock.MagicMock()
@@ -171,9 +172,9 @@ class TestSender(BaseTest):
self.assertTrue(success)
self.assertEqual(candidates, {})
self.sender.connect.assert_called_once_with()
self.sender.missing_check.assert_called_once_with()
self.sender.updates.assert_called_once_with()
self.sender.disconnect.assert_called_once_with()
self.sender.missing_check.assert_called_once_with(connection)
self.sender.updates.assert_called_once_with(connection)
self.sender.disconnect.assert_called_once_with(connection)
def test_connect(self):
node = dict(replication_ip='1.2.3.4', replication_port=5678,
@@ -348,7 +349,8 @@ class TestSender(BaseTest):
def test_call(self):
def patch_sender(sender):
sender.connect = mock.MagicMock()
connection = FakeConnection()
sender.connect = mock.MagicMock(return_value=connection)
sender.missing_check = mock.MagicMock()
sender.updates = mock.MagicMock()
sender.disconnect = mock.MagicMock()
@@ -428,7 +430,7 @@ class TestSender(BaseTest):
raise Exception(
'No match for %r %r %r' % (device, partition, suffixes))
self.sender.connection = FakeConnection()
connection = FakeConnection()
self.sender.node = {}
self.sender.job = {
'device': 'dev',
@@ -446,7 +448,7 @@ class TestSender(BaseTest):
':UPDATES: END\r\n'
))
self.sender.df_mgr.yield_hashes = yield_hashes
self.sender.connect = mock.MagicMock()
self.sender.connect = mock.MagicMock(return_value=connection)
df = mock.MagicMock()
df.content_length = 0
self.sender.df_mgr.get_diskfile_from_hash = mock.MagicMock(
@@ -455,7 +457,7 @@ class TestSender(BaseTest):
success, candidates = self.sender()
self.assertTrue(success)
found_post = found_put = False
for chunk in self.sender.connection.sent:
for chunk in connection.sent:
if 'POST' in chunk:
found_post = True
if 'PUT' in chunk:
@@ -474,7 +476,7 @@ class TestSender(BaseTest):
raise Exception(
'No match for %r %r %r' % (device, partition, suffixes))
self.sender.connection = FakeConnection()
connection = FakeConnection()
self.sender.node = {}
self.sender.job = {
'device': 'dev',
@@ -489,7 +491,7 @@ class TestSender(BaseTest):
'9d41d8cd98f00b204e9800998ecf0abc d\r\n'
':MISSING_CHECK: END\r\n'))
self.sender.df_mgr.yield_hashes = yield_hashes
self.sender.connect = mock.MagicMock()
self.sender.connect = mock.MagicMock(return_value=connection)
self.sender.updates = mock.MagicMock()
self.sender.disconnect = mock.MagicMock()
success, candidates = self.sender()
@@ -516,13 +518,13 @@ class TestSender(BaseTest):
}
self.sender = ssync_sender.Sender(self.daemon, None, job, ['abc'],
['9d41d8cd98f00b204e9800998ecf0abc'])
self.sender.connection = FakeConnection()
connection = FakeConnection()
self.sender.response = FakeResponse(
chunk_body=(
':MISSING_CHECK: START\r\n'
':MISSING_CHECK: END\r\n'))
self.sender.df_mgr.yield_hashes = yield_hashes
self.sender.connect = mock.MagicMock()
self.sender.connect = mock.MagicMock(return_value=connection)
self.sender.updates = mock.MagicMock()
self.sender.disconnect = mock.MagicMock()
success, candidates = self.sender()
@@ -549,14 +551,14 @@ class TestSender(BaseTest):
}
self.sender = ssync_sender.Sender(self.daemon, {}, job, ['abc'],
['9d41d8cd98f00b204e9800998ecf0abc'])
self.sender.connection = FakeConnection()
connection = FakeConnection()
self.sender.response = FakeResponse(
chunk_body=(
':MISSING_CHECK: START\r\n'
'9d41d8cd98f00b204e9800998ecf0abc d\r\n'
':MISSING_CHECK: END\r\n'))
self.sender.df_mgr.yield_hashes = yield_hashes
self.sender.connect = mock.MagicMock()
self.sender.connect = mock.MagicMock(return_value=connection)
self.sender.updates = mock.MagicMock()
self.sender.disconnect = mock.MagicMock()
success, candidates = self.sender()
@@ -694,10 +696,11 @@ class TestSender(BaseTest):
self.assertEqual(self.sender.readline(), '')
def test_missing_check_timeout(self):
self.sender.connection = FakeConnection()
self.sender.connection.send = lambda d: eventlet.sleep(1)
connection = FakeConnection()
connection.send = lambda d: eventlet.sleep(1)
self.sender.daemon.node_timeout = 0.01
self.assertRaises(exceptions.MessageTimeout, self.sender.missing_check)
self.assertRaises(exceptions.MessageTimeout, self.sender.missing_check,
connection)
def test_missing_check_has_empty_suffixes(self):
def yield_hashes(device, partition, policy, suffixes=None, **kwargs):
@@ -709,7 +712,7 @@ class TestSender(BaseTest):
'No match for %r %r %r %r' % (device, partition,
policy, suffixes))
self.sender.connection = FakeConnection()
connection = FakeConnection()
self.sender.job = {
'device': 'dev',
'partition': '9',
@@ -721,9 +724,9 @@ class TestSender(BaseTest):
':MISSING_CHECK: START\r\n'
':MISSING_CHECK: END\r\n'))
self.sender.df_mgr.yield_hashes = yield_hashes
self.sender.missing_check()
self.sender.missing_check(connection)
self.assertEqual(
''.join(self.sender.connection.sent),
''.join(connection.sent),
'17\r\n:MISSING_CHECK: START\r\n\r\n'
'15\r\n:MISSING_CHECK: END\r\n\r\n')
self.assertEqual(self.sender.send_map, {})
@@ -751,7 +754,7 @@ class TestSender(BaseTest):
'No match for %r %r %r %r' % (device, partition,
policy, suffixes))
self.sender.connection = FakeConnection()
connection = FakeConnection()
self.sender.job = {
'device': 'dev',
'partition': '9',
@@ -763,9 +766,9 @@ class TestSender(BaseTest):
':MISSING_CHECK: START\r\n'
':MISSING_CHECK: END\r\n'))
self.sender.df_mgr.yield_hashes = yield_hashes
self.sender.missing_check()
self.sender.missing_check(connection)
self.assertEqual(
''.join(self.sender.connection.sent),
''.join(connection.sent),
'17\r\n:MISSING_CHECK: START\r\n\r\n'
'33\r\n9d41d8cd98f00b204e9800998ecf0abc 1380144470.00000\r\n\r\n'
'3b\r\n9d41d8cd98f00b204e9800998ecf0def 1380144472.22222 '
@@ -798,7 +801,7 @@ class TestSender(BaseTest):
'No match for %r %r %r %r' % (device, partition,
policy, suffixes))
self.sender.connection = FakeConnection()
connection = FakeConnection()
self.sender.job = {
'device': 'dev',
'partition': '9',
@@ -809,12 +812,12 @@ class TestSender(BaseTest):
self.sender.response = FakeResponse(chunk_body='\r\n')
exc = None
try:
self.sender.missing_check()
self.sender.missing_check(connection)
except exceptions.ReplicationException as err:
exc = err
self.assertEqual(str(exc), 'Early disconnect')
self.assertEqual(
''.join(self.sender.connection.sent),
''.join(connection.sent),
'17\r\n:MISSING_CHECK: START\r\n\r\n'
'33\r\n9d41d8cd98f00b204e9800998ecf0abc 1380144470.00000\r\n\r\n'
'15\r\n:MISSING_CHECK: END\r\n\r\n')
@@ -835,7 +838,7 @@ class TestSender(BaseTest):
'No match for %r %r %r %r' % (device, partition,
policy, suffixes))
self.sender.connection = FakeConnection()
connection = FakeConnection()
self.sender.job = {
'device': 'dev',
'partition': '9',
@@ -847,12 +850,12 @@ class TestSender(BaseTest):
chunk_body=':MISSING_CHECK: START\r\n')
exc = None
try:
self.sender.missing_check()
self.sender.missing_check(connection)
except exceptions.ReplicationException as err:
exc = err
self.assertEqual(str(exc), 'Early disconnect')
self.assertEqual(
''.join(self.sender.connection.sent),
''.join(connection.sent),
'17\r\n:MISSING_CHECK: START\r\n\r\n'
'33\r\n9d41d8cd98f00b204e9800998ecf0abc 1380144470.00000\r\n\r\n'
'15\r\n:MISSING_CHECK: END\r\n\r\n')
@@ -873,7 +876,7 @@ class TestSender(BaseTest):
'No match for %r %r %r %r' % (device, partition,
policy, suffixes))
self.sender.connection = FakeConnection()
connection = FakeConnection()
self.sender.job = {
'device': 'dev',
'partition': '9',
@@ -884,12 +887,12 @@ class TestSender(BaseTest):
self.sender.response = FakeResponse(chunk_body='OH HAI\r\n')
exc = None
try:
self.sender.missing_check()
self.sender.missing_check(connection)
except exceptions.ReplicationException as err:
exc = err
self.assertEqual(str(exc), "Unexpected response: 'OH HAI'")
self.assertEqual(
''.join(self.sender.connection.sent),
''.join(connection.sent),
'17\r\n:MISSING_CHECK: START\r\n\r\n'
'33\r\n9d41d8cd98f00b204e9800998ecf0abc 1380144470.00000\r\n\r\n'
'15\r\n:MISSING_CHECK: END\r\n\r\n')
@@ -910,7 +913,7 @@ class TestSender(BaseTest):
'No match for %r %r %r %r' % (device, partition,
policy, suffixes))
self.sender.connection = FakeConnection()
connection = FakeConnection()
self.sender.job = {
'device': 'dev',
'partition': '9',
@@ -923,9 +926,9 @@ class TestSender(BaseTest):
'0123abc dm\r\n'
':MISSING_CHECK: END\r\n'))
self.sender.df_mgr.yield_hashes = yield_hashes
self.sender.missing_check()
self.sender.missing_check(connection)
self.assertEqual(
''.join(self.sender.connection.sent),
''.join(connection.sent),
'17\r\n:MISSING_CHECK: START\r\n\r\n'
'33\r\n9d41d8cd98f00b204e9800998ecf0abc 1380144470.00000\r\n\r\n'
'15\r\n:MISSING_CHECK: END\r\n\r\n')
@@ -950,7 +953,7 @@ class TestSender(BaseTest):
'No match for %r %r %r %r' % (device, partition,
policy, suffixes))
self.sender.connection = FakeConnection()
connection = FakeConnection()
self.sender.job = {
'device': 'dev',
'partition': '9',
@@ -963,7 +966,7 @@ class TestSender(BaseTest):
'0123abc d extra response parts\r\n'
':MISSING_CHECK: END\r\n'))
self.sender.df_mgr.yield_hashes = yield_hashes
self.sender.missing_check()
self.sender.missing_check(connection)
self.assertEqual(self.sender.send_map,
{'0123abc': {'data': True}})
self.assertEqual(self.sender.available_map,
@@ -971,26 +974,27 @@ class TestSender(BaseTest):
{'ts_data': Timestamp(1380144470.00000)})]))
def test_updates_timeout(self):
self.sender.connection = FakeConnection()
self.sender.connection.send = lambda d: eventlet.sleep(1)
connection = FakeConnection()
connection.send = lambda d: eventlet.sleep(1)
self.sender.daemon.node_timeout = 0.01
self.assertRaises(exceptions.MessageTimeout, self.sender.updates)
self.assertRaises(exceptions.MessageTimeout, self.sender.updates,
connection)
def test_updates_empty_send_map(self):
self.sender.connection = FakeConnection()
connection = FakeConnection()
self.sender.send_map = {}
self.sender.response = FakeResponse(
chunk_body=(
':UPDATES: START\r\n'
':UPDATES: END\r\n'))
self.sender.updates()
self.sender.updates(connection)
self.assertEqual(
''.join(self.sender.connection.sent),
''.join(connection.sent),
'11\r\n:UPDATES: START\r\n\r\n'
'f\r\n:UPDATES: END\r\n\r\n')
def test_updates_unexpected_response_lines1(self):
self.sender.connection = FakeConnection()
connection = FakeConnection()
self.sender.send_map = {}
self.sender.response = FakeResponse(
chunk_body=(
@@ -999,17 +1003,17 @@ class TestSender(BaseTest):
':UPDATES: END\r\n'))
exc = None
try:
self.sender.updates()
self.sender.updates(connection)
except exceptions.ReplicationException as err:
exc = err
self.assertEqual(str(exc), "Unexpected response: 'abc'")
self.assertEqual(
''.join(self.sender.connection.sent),
''.join(connection.sent),
'11\r\n:UPDATES: START\r\n\r\n'
'f\r\n:UPDATES: END\r\n\r\n')
def test_updates_unexpected_response_lines2(self):
self.sender.connection = FakeConnection()
connection = FakeConnection()
self.sender.send_map = {}
self.sender.response = FakeResponse(
chunk_body=(
@@ -1018,12 +1022,12 @@ class TestSender(BaseTest):
':UPDATES: END\r\n'))
exc = None
try:
self.sender.updates()
self.sender.updates(connection)
except exceptions.ReplicationException as err:
exc = err
self.assertEqual(str(exc), "Unexpected response: 'abc'")
self.assertEqual(
''.join(self.sender.connection.sent),
''.join(connection.sent),
'11\r\n:UPDATES: START\r\n\r\n'
'f\r\n:UPDATES: END\r\n\r\n')
@@ -1035,7 +1039,7 @@ class TestSender(BaseTest):
object_hash = utils.hash_path(*object_parts)
delete_timestamp = utils.normalize_timestamp(time.time())
df.delete(delete_timestamp)
self.sender.connection = FakeConnection()
connection = FakeConnection()
self.sender.job = {
'device': device,
'partition': part,
@@ -1050,14 +1054,14 @@ class TestSender(BaseTest):
chunk_body=(
':UPDATES: START\r\n'
':UPDATES: END\r\n'))
self.sender.updates()
self.sender.updates(connection)
self.sender.send_delete.assert_called_once_with(
'/a/c/o', delete_timestamp)
connection, '/a/c/o', delete_timestamp)
self.assertEqual(self.sender.send_put.mock_calls, [])
# note that the delete line isn't actually sent since we mock
# send_delete; send_delete is tested separately.
self.assertEqual(
''.join(self.sender.connection.sent),
''.join(connection.sent),
'11\r\n:UPDATES: START\r\n\r\n'
'f\r\n:UPDATES: END\r\n\r\n')
@@ -1069,7 +1073,7 @@ class TestSender(BaseTest):
object_hash = utils.hash_path(*object_parts)
delete_timestamp = utils.normalize_timestamp(time.time())
df.delete(delete_timestamp)
self.sender.connection = FakeConnection()
connection = FakeConnection()
self.sender.job = {
'device': device,
'partition': part,
@@ -1082,9 +1086,9 @@ class TestSender(BaseTest):
chunk_body=(
':UPDATES: START\r\n'
':UPDATES: END\r\n'))
self.sender.updates()
self.sender.updates(connection)
self.assertEqual(
''.join(self.sender.connection.sent),
''.join(connection.sent),
'11\r\n:UPDATES: START\r\n\r\n'
'30\r\n'
'DELETE /a/c/o\r\n'
@@ -1108,7 +1112,7 @@ class TestSender(BaseTest):
object_hash = utils.hash_path(*object_parts)
df.open()
expected = df.get_metadata()
self.sender.connection = FakeConnection()
connection = FakeConnection()
self.sender.job = {
'device': device,
'partition': part,
@@ -1125,19 +1129,19 @@ class TestSender(BaseTest):
chunk_body=(
':UPDATES: START\r\n'
':UPDATES: END\r\n'))
self.sender.updates()
self.sender.updates(connection)
self.assertEqual(self.sender.send_delete.mock_calls, [])
self.assertEqual(self.sender.send_post.mock_calls, [])
self.assertEqual(1, len(self.sender.send_put.mock_calls))
args, _kwargs = self.sender.send_put.call_args
path, df = args
connection, path, df = args
self.assertEqual(path, '/a/c/o')
self.assertTrue(isinstance(df, diskfile.DiskFile))
self.assertEqual(expected, df.get_metadata())
# note that the put line isn't actually sent since we mock send_put;
# send_put is tested separately.
self.assertEqual(
''.join(self.sender.connection.sent),
''.join(connection.sent),
'11\r\n:UPDATES: START\r\n\r\n'
'f\r\n:UPDATES: END\r\n\r\n')
@@ -1155,7 +1159,7 @@ class TestSender(BaseTest):
object_hash = utils.hash_path(*object_parts)
df.open()
expected = df.get_metadata()
self.sender.connection = FakeConnection()
connection = FakeConnection()
self.sender.job = {
'device': device,
'partition': part,
@@ -1172,19 +1176,19 @@ class TestSender(BaseTest):
chunk_body=(
':UPDATES: START\r\n'
':UPDATES: END\r\n'))
self.sender.updates()
self.sender.updates(connection)
self.assertEqual(self.sender.send_delete.mock_calls, [])
self.assertEqual(self.sender.send_put.mock_calls, [])
self.assertEqual(1, len(self.sender.send_post.mock_calls))
args, _kwargs = self.sender.send_post.call_args
path, df = args
connection, path, df = args
self.assertEqual(path, '/a/c/o')
self.assertIsInstance(df, diskfile.DiskFile)
self.assertEqual(expected, df.get_metadata())
# note that the post line isn't actually sent since we mock send_post;
# send_post is tested separately.
self.assertEqual(
''.join(self.sender.connection.sent),
''.join(connection.sent),
'11\r\n:UPDATES: START\r\n\r\n'
'f\r\n:UPDATES: END\r\n\r\n')
@@ -1202,7 +1206,7 @@ class TestSender(BaseTest):
object_hash = utils.hash_path(*object_parts)
df.open()
expected = df.get_metadata()
self.sender.connection = FakeConnection()
connection = FakeConnection()
self.sender.job = {
'device': device,
'partition': part,
@@ -1219,24 +1223,24 @@ class TestSender(BaseTest):
chunk_body=(
':UPDATES: START\r\n'
':UPDATES: END\r\n'))
self.sender.updates()
self.sender.updates(connection)
self.assertEqual(self.sender.send_delete.mock_calls, [])
self.assertEqual(1, len(self.sender.send_put.mock_calls))
self.assertEqual(1, len(self.sender.send_post.mock_calls))
args, _kwargs = self.sender.send_put.call_args
path, df = args
connection, path, df = args
self.assertEqual(path, '/a/c/o')
self.assertIsInstance(df, diskfile.DiskFile)
self.assertEqual(expected, df.get_metadata())
args, _kwargs = self.sender.send_post.call_args
path, df = args
connection, path, df = args
self.assertEqual(path, '/a/c/o')
self.assertIsInstance(df, diskfile.DiskFile)
self.assertEqual(expected, df.get_metadata())
self.assertEqual(
''.join(self.sender.connection.sent),
''.join(connection.sent),
'11\r\n:UPDATES: START\r\n\r\n'
'f\r\n:UPDATES: END\r\n\r\n')
@@ -1248,7 +1252,7 @@ class TestSender(BaseTest):
policy=POLICIES[0])
object_hash = utils.hash_path(*object_parts)
expected = df.get_metadata()
self.sender.connection = FakeConnection()
connection = FakeConnection()
self.sender.job = {
'device': device,
'partition': part,
@@ -1262,9 +1266,9 @@ class TestSender(BaseTest):
chunk_body=(
':UPDATES: START\r\n'
':UPDATES: END\r\n'))
self.sender.updates()
self.sender.updates(connection)
args, _kwargs = self.sender.send_put.call_args
path, df = args
connection, path, df = args
self.assertEqual(path, '/a/c/o')
self.assertTrue(isinstance(df, diskfile.DiskFile))
self.assertEqual(expected, df.get_metadata())
@@ -1273,7 +1277,7 @@ class TestSender(BaseTest):
df._datadir)
def test_updates_read_response_timeout_start(self):
self.sender.connection = FakeConnection()
connection = FakeConnection()
self.sender.send_map = {}
self.sender.response = FakeResponse(
chunk_body=(
@@ -1287,25 +1291,26 @@ class TestSender(BaseTest):
self.sender.readline = delayed_readline
self.sender.daemon.http_timeout = 0.01
self.assertRaises(exceptions.MessageTimeout, self.sender.updates)
self.assertRaises(exceptions.MessageTimeout, self.sender.updates,
connection)
def test_updates_read_response_disconnect_start(self):
self.sender.connection = FakeConnection()
connection = FakeConnection()
self.sender.send_map = {}
self.sender.response = FakeResponse(chunk_body='\r\n')
exc = None
try:
self.sender.updates()
self.sender.updates(connection)
except exceptions.ReplicationException as err:
exc = err
self.assertEqual(str(exc), 'Early disconnect')
self.assertEqual(
''.join(self.sender.connection.sent),
''.join(connection.sent),
'11\r\n:UPDATES: START\r\n\r\n'
'f\r\n:UPDATES: END\r\n\r\n')
def test_updates_read_response_unexp_start(self):
self.sender.connection = FakeConnection()
connection = FakeConnection()
self.sender.send_map = {}
self.sender.response = FakeResponse(
chunk_body=(
@@ -1314,17 +1319,17 @@ class TestSender(BaseTest):
':UPDATES: END\r\n'))
exc = None
try:
self.sender.updates()
self.sender.updates(connection)
except exceptions.ReplicationException as err:
exc = err
self.assertEqual(str(exc), "Unexpected response: 'anything else'")
self.assertEqual(
''.join(self.sender.connection.sent),
''.join(connection.sent),
'11\r\n:UPDATES: START\r\n\r\n'
'f\r\n:UPDATES: END\r\n\r\n')
def test_updates_read_response_timeout_end(self):
self.sender.connection = FakeConnection()
connection = FakeConnection()
self.sender.send_map = {}
self.sender.response = FakeResponse(
chunk_body=(
@@ -1340,10 +1345,11 @@ class TestSender(BaseTest):
self.sender.readline = delayed_readline
self.sender.daemon.http_timeout = 0.01
self.assertRaises(exceptions.MessageTimeout, self.sender.updates)
self.assertRaises(exceptions.MessageTimeout, self.sender.updates,
connection)
def test_updates_read_response_disconnect_end(self):
self.sender.connection = FakeConnection()
connection = FakeConnection()
self.sender.send_map = {}
self.sender.response = FakeResponse(
chunk_body=(
@@ -1351,17 +1357,17 @@ class TestSender(BaseTest):
'\r\n'))
exc = None
try:
self.sender.updates()
self.sender.updates(connection)
except exceptions.ReplicationException as err:
exc = err
self.assertEqual(str(exc), 'Early disconnect')
self.assertEqual(
''.join(self.sender.connection.sent),
''.join(connection.sent),
'11\r\n:UPDATES: START\r\n\r\n'
'f\r\n:UPDATES: END\r\n\r\n')
def test_updates_read_response_unexp_end(self):
self.sender.connection = FakeConnection()
connection = FakeConnection()
self.sender.send_map = {}
self.sender.response = FakeResponse(
chunk_body=(
@@ -1370,33 +1376,33 @@ class TestSender(BaseTest):
':UPDATES: END\r\n'))
exc = None
try:
self.sender.updates()
self.sender.updates(connection)
except exceptions.ReplicationException as err:
exc = err
self.assertEqual(str(exc), "Unexpected response: 'anything else'")
self.assertEqual(
''.join(self.sender.connection.sent),
''.join(connection.sent),
'11\r\n:UPDATES: START\r\n\r\n'
'f\r\n:UPDATES: END\r\n\r\n')
def test_send_delete_timeout(self):
self.sender.connection = FakeConnection()
self.sender.connection.send = lambda d: eventlet.sleep(1)
connection = FakeConnection()
connection.send = lambda d: eventlet.sleep(1)
self.sender.daemon.node_timeout = 0.01
exc = None
try:
self.sender.send_delete('/a/c/o',
self.sender.send_delete(connection, '/a/c/o',
utils.Timestamp('1381679759.90941'))
except exceptions.MessageTimeout as err:
exc = err
self.assertEqual(str(exc), '0.01 seconds: send_delete')
def test_send_delete(self):
self.sender.connection = FakeConnection()
self.sender.send_delete('/a/c/o',
connection = FakeConnection()
self.sender.send_delete(connection, '/a/c/o',
utils.Timestamp('1381679759.90941'))
self.assertEqual(
''.join(self.sender.connection.sent),
''.join(connection.sent),
'30\r\n'
'DELETE /a/c/o\r\n'
'X-Timestamp: 1381679759.90941\r\n'
@@ -1405,19 +1411,19 @@ class TestSender(BaseTest):
def test_send_put_initial_timeout(self):
df = self._make_open_diskfile()
df._disk_chunk_size = 2
self.sender.connection = FakeConnection()
self.sender.connection.send = lambda d: eventlet.sleep(1)
connection = FakeConnection()
connection.send = lambda d: eventlet.sleep(1)
self.sender.daemon.node_timeout = 0.01
exc = None
try:
self.sender.send_put('/a/c/o', df)
self.sender.send_put(connection, '/a/c/o', df)
except exceptions.MessageTimeout as err:
exc = err
self.assertEqual(str(exc), '0.01 seconds: send_put')
def test_send_put_chunk_timeout(self):
df = self._make_open_diskfile()
self.sender.connection = FakeConnection()
connection = FakeConnection()
self.sender.daemon.node_timeout = 0.01
one_shot = [None]
@@ -1428,11 +1434,11 @@ class TestSender(BaseTest):
except IndexError:
eventlet.sleep(1)
self.sender.connection.send = mock_send
connection.send = mock_send
exc = None
try:
self.sender.send_put('/a/c/o', df)
self.sender.send_put(connection, '/a/c/o', df)
except exceptions.MessageTimeout as err:
exc = err
self.assertEqual(str(exc), '0.01 seconds: send_put chunk')
@@ -1458,10 +1464,10 @@ class TestSender(BaseTest):
metadata = {'X-Timestamp': t2.internal, 'X-Object-Meta-Fruit': 'kiwi'}
df.write_metadata(metadata)
df.open()
self.sender.connection = FakeConnection()
self.sender.send_put(path, df)
connection = FakeConnection()
self.sender.send_put(connection, path, df)
self.assertEqual(
''.join(self.sender.connection.sent),
''.join(connection.sent),
'%(length)s\r\n'
'PUT %(path)s\r\n'
'Content-Length: %(Content-Length)s\r\n'
@@ -1499,11 +1505,11 @@ class TestSender(BaseTest):
path = six.moves.urllib.parse.quote(df.read_metadata()['name'])
length = format(61 + len(path) + len(meta_value), 'x')
self.sender.connection = FakeConnection()
connection = FakeConnection()
with df.open():
self.sender.send_post(path, df)
self.sender.send_post(connection, path, df)
self.assertEqual(
''.join(self.sender.connection.sent),
''.join(connection.sent),
'%s\r\n'
'POST %s\r\n'
'X-Object-Meta-Foo: %s\r\n'
@@ -1519,18 +1525,18 @@ class TestSender(BaseTest):
'o_with_caract\xc3\xa8res_like_in_french', 'm\xc3\xa8ta')
def test_disconnect_timeout(self):
self.sender.connection = FakeConnection()
self.sender.connection.send = lambda d: eventlet.sleep(1)
connection = FakeConnection()
connection.send = lambda d: eventlet.sleep(1)
self.sender.daemon.node_timeout = 0.01
self.sender.disconnect()
self.assertEqual(''.join(self.sender.connection.sent), '')
self.assertTrue(self.sender.connection.closed)
self.sender.disconnect(connection)
self.assertEqual(''.join(connection.sent), '')
self.assertTrue(connection.closed)
def test_disconnect(self):
self.sender.connection = FakeConnection()
self.sender.disconnect()
self.assertEqual(''.join(self.sender.connection.sent), '0\r\n\r\n')
self.assertTrue(self.sender.connection.closed)
connection = FakeConnection()
self.sender.disconnect(connection)
self.assertEqual(''.join(connection.sent), '0\r\n\r\n')
self.assertTrue(connection.closed)
class TestModuleMethods(unittest.TestCase):