Prevent ssync writing bad fragment data to diskfile

Previously, if a reconstructor sync type job failed to provide
sufficient bytes from a reconstructed fragment body iterator to match
the content-length that the ssync sender had already sent to the ssync
receiver, the sender would still proceed to send the next
subrequest. The ssync receiver might then write the start of the next
subrequest to the partially complete diskfile for the previous
subrequest (including writing subrequest headers to that diskfile)
until it has received content-length bytes.

Since a reconstructor ssync job does not send an ETag header (it
cannot because it does not know the ETag of a reconstructed fragment
until it has been sent) then the receiving object server does not
detect the "bad" data written to the fragment diskfile, and worse,
will label it with an ETag that matches the md5 sum of the bad
data. The bad fragment file will therefore appear good to the auditor.

There is no easy way for the ssync sender to communicate a lack of
source data to the receiver other than by disconnecting the
session. So this patch adds a check in the ssync sender that the sent
byte count is equal to the sent Content-Length header value for each
subrequest, and disconnect if a mismatch is detected.

The disconnect prevents the receiver finalizing the bad diskfile, but
also prevents subsequent fragments in the ssync job being sync'd until
the next cycle.

Closes-Bug: #1631144
Co-Authored-By: Kota Tsuyuzaki <tsuyuzaki.kota@lab.ntt.co.jp>

Change-Id: I54068906efdb9cd58fcdc6eae7c2163ea92afb9d
This commit is contained in:
Alistair Coles 2016-10-12 20:00:13 +01:00
parent be1cd1ba40
commit 3218f8b064
7 changed files with 385 additions and 36 deletions

View File

@ -77,8 +77,7 @@ class RebuildingECDiskFileStream(object):
self.datafile_metadata = datafile_metadata
# the new FA is going to have the same length as others in the set
self._content_length = self.datafile_metadata['Content-Length']
self._content_length = int(self.datafile_metadata['Content-Length'])
# update the FI and delete the ETag, the obj server will
# recalc on the other side...
self.datafile_metadata['X-Object-Sysmeta-Ec-Frag-Index'] = frag_index

View File

@ -179,7 +179,7 @@ class Receiver(object):
yield ':ERROR: %d %r\n' % (0, str(err))
except exceptions.MessageTimeout as err:
self.app.logger.error(
'%s/%s/%s TIMEOUT in replication.Receiver: %s' % (
'%s/%s/%s TIMEOUT in ssync.Receiver: %s' % (
self.request.remote_addr, self.device, self.partition,
err))
yield ':ERROR: %d %r\n' % (408, str(err))
@ -188,11 +188,11 @@ class Receiver(object):
yield ':ERROR: %d %r\n' % (err.status_int, body)
except Exception as err:
self.app.logger.exception(
'%s/%s/%s EXCEPTION in replication.Receiver' %
'%s/%s/%s EXCEPTION in ssync.Receiver' %
(self.request.remote_addr, self.device, self.partition))
yield ':ERROR: %d %r\n' % (0, str(err))
except Exception:
self.app.logger.exception('EXCEPTION in replication.Receiver')
self.app.logger.exception('EXCEPTION in ssync.Receiver')
if self.disconnect:
# This makes the socket close early so the remote side doesn't have
# to send its whole request while the lower Eventlet-level just
@ -287,7 +287,7 @@ class Receiver(object):
# if commit fails then log exception and fall back to wanting
# a full update
self.app.logger.exception(
'%s/%s/%s EXCEPTION in replication.Receiver while '
'%s/%s/%s EXCEPTION in ssync.Receiver while '
'attempting commit of %s'
% (self.request.remote_addr, self.device, self.partition,
df._datadir))
@ -466,7 +466,7 @@ class Receiver(object):
chunk = self.fp.read(
min(left, self.app.network_chunk_size))
if not chunk:
raise Exception(
raise exceptions.ChunkReadError(
'Early termination for %s %s' % (method, path))
left -= len(chunk)
yield chunk
@ -488,6 +488,9 @@ class Receiver(object):
resp.status_int == http.HTTP_NOT_FOUND:
successes += 1
else:
self.app.logger.warning(
'ssync subrequest failed with %s: %s %s' %
(resp.status_int, method, subreq.path))
failures += 1
if failures >= self.app.replication_failure_threshold and (
not successes or

View File

@ -149,7 +149,7 @@ class Sender(object):
# was originally written to shell out to rsync which would do
# no such thing.
self.daemon.logger.exception(
'%s:%s/%s/%s EXCEPTION in replication.Sender',
'%s:%s/%s/%s EXCEPTION in ssync.Sender',
self.node.get('replication_ip'),
self.node.get('replication_port'),
self.node.get('device'), self.job.get('partition'))
@ -163,7 +163,7 @@ class Sender(object):
# This particular exception handler does the minimal amount as it
# would only get called if the above except Exception handler
# failed (bad node or job data).
self.daemon.logger.exception('EXCEPTION in replication.Sender')
self.daemon.logger.exception('EXCEPTION in ssync.Sender')
return False, {}
def connect(self):
@ -350,6 +350,11 @@ class Sender(object):
if want.get('data'):
self.send_delete(url_path, err.timestamp)
except exceptions.DiskFileError:
# DiskFileErrors are expected while opening the diskfile,
# before any data is read and sent. Since there is no partial
# state on the receiver it's ok to ignore this diskfile and
# continue. The diskfile may however be deleted after a
# successful ssync since it remains in the send_map.
pass
with exceptions.MessageTimeout(
self.daemon.node_timeout, 'updates end'):
@ -404,10 +409,21 @@ class Sender(object):
msg = '\r\n'.join(msg) + '\r\n\r\n'
with exceptions.MessageTimeout(self.daemon.node_timeout, 'send_put'):
self.connection.send('%x\r\n%s\r\n' % (len(msg), msg))
bytes_read = 0
for chunk in df.reader():
bytes_read += len(chunk)
with exceptions.MessageTimeout(
self.daemon.node_timeout, 'send_put chunk'):
self.connection.send('%x\r\n%s\r\n' % (len(chunk), chunk))
if bytes_read != df.content_length:
# Since we may now have partial state on the receiver we have to
# prevent the receiver finalising what may well be a bad or
# partially written diskfile. Unfortunately we have no other option
# than to pull the plug on this ssync session. If ssync supported
# multiphase PUTs like the proxy uses for EC we could send a bad
# etag in a footer of this subrequest, but that is not supported.
raise exceptions.ReplicationException(
'Sent data length does not match content-length')
def send_post(self, url_path, df):
metadata = df.get_metafile_metadata()

View File

@ -2512,7 +2512,7 @@ class TestObjectReconstructor(unittest.TestCase):
node = part_nodes[1]
metadata = {
'name': '/a/c/o',
'Content-Length': 0,
'Content-Length': '0',
'ETag': 'etag',
}
@ -2545,6 +2545,7 @@ class TestObjectReconstructor(unittest.TestCase):
*codes, body_iter=body_iter, headers=headers):
df = self.reconstructor.reconstruct_fa(
job, node, metadata)
self.assertEqual(0, df.content_length)
fixed_body = ''.join(df.reader())
self.assertEqual(len(fixed_body), len(broken_body))
self.assertEqual(md5(fixed_body).hexdigest(),

View File

@ -29,9 +29,10 @@ from swift.common import utils
from swift.common.storage_policy import POLICIES
from swift.common.utils import Timestamp
from swift.obj import ssync_sender, server
from swift.obj.reconstructor import RebuildingECDiskFileStream
from swift.obj.reconstructor import RebuildingECDiskFileStream, \
ObjectReconstructor
from test.unit import patch_policies
from test.unit import patch_policies, debug_logger
from test.unit.obj.common import BaseTest, FakeReplicator
@ -60,13 +61,14 @@ class TestBaseSsync(BaseTest):
'mount_check': 'false',
'replication_one_per_device': 'false',
'log_requests': 'false'}
self.rx_controller = server.ObjectController(conf)
self.rx_logger = debug_logger()
self.rx_controller = server.ObjectController(conf, self.rx_logger)
self.ts_iter = (Timestamp(t)
for t in itertools.count(int(time.time())))
self.rx_ip = '127.0.0.1'
sock = eventlet.listen((self.rx_ip, 0))
self.rx_server = eventlet.spawn(
eventlet.wsgi.server, sock, self.rx_controller, utils.NullLogger())
eventlet.wsgi.server, sock, self.rx_controller, self.rx_logger)
self.rx_port = sock.getsockname()[1]
self.rx_node = {'replication_ip': self.rx_ip,
'replication_port': self.rx_port,
@ -657,6 +659,317 @@ class TestSsyncEC(TestBaseSsync):
error_msg)
class FakeResponse(object):
def __init__(self, frag_index, data):
self.headers = {
'X-Object-Sysmeta-Ec-Frag-Index': str(frag_index),
'X-Object-Sysmeta-Ec-Etag': 'the etag',
'X-Backend-Timestamp': '1234567890.12345'
}
self.data = data
def getheaders(self):
return self.headers
def read(self, length):
if isinstance(self.data, Exception):
raise self.data
val = self.data
self.data = ''
return val
@patch_policies(with_ec_default=True)
class TestSsyncECReconstructorSyncJob(TestBaseSsync):
def setUp(self):
super(TestSsyncECReconstructorSyncJob, self).setUp()
self.policy = POLICIES.default
self.rx_node_index = 0
self.tx_node_index = 1
# create sender side diskfiles...
self.tx_objs = {}
tx_df_mgr = self.daemon._diskfile_router[self.policy]
t1 = next(self.ts_iter)
self.tx_objs['o1'] = self._create_ondisk_files(
tx_df_mgr, 'o1', self.policy, t1, (self.tx_node_index,))
t2 = next(self.ts_iter)
self.tx_objs['o2'] = self._create_ondisk_files(
tx_df_mgr, 'o2', self.policy, t2, (self.tx_node_index,))
self.suffixes = set()
for diskfiles in list(self.tx_objs.values()):
for df in diskfiles:
self.suffixes.add(
os.path.basename(os.path.dirname(df._datadir)))
self.job_node = dict(self.rx_node)
self.job_node['index'] = self.rx_node_index
self.frag_length = int(
self.tx_objs['o1'][0].get_metadata()['Content-Length'])
def _test_reconstructor_sync_job(self, frag_responses):
# Helper method to mock reconstructor to consume given lists of fake
# responses while reconstructing a fragment for a sync type job. The
# tests verify that when the reconstructed fragment iter fails in some
# way then ssync does not mistakenly create fragments on the receiving
# node which have incorrect data.
# See https://bugs.launchpad.net/swift/+bug/1631144
# frag_responses is a list of two lists of responses to each
# reconstructor GET request for a fragment archive. The two items in
# the outer list are lists of responses for each of the two fragments
# to be reconstructed. Items in the inner lists are responses for each
# of the other fragments fetched during the reconstructor rebuild.
path_to_responses = {}
fake_get_response_calls = []
def fake_get_response(recon, node, part, path, headers, policy):
# select a list of fake responses for this path and return the next
# from the list
if path not in path_to_responses:
path_to_responses[path] = frag_responses.pop(0)
response = path_to_responses[path].pop()
fake_get_response_calls.append(path)
return response
def fake_get_part_nodes(part):
# the reconstructor will try to remove the receiver node from the
# object ring part nodes, but the fake node we created for our
# receiver is not actually in the ring part nodes, so append it
# here simply so that the reconstructor does not fail to remove it.
return (self.policy.object_ring._get_part_nodes(part) +
[self.job_node])
def fake_reconstruct(self, policy, fragment_payload, frag_index):
# fake EC reconstruction by returning first frag, which is ok
# because all frags would be same length
return fragment_payload[0]
with mock.patch(
'swift.obj.reconstructor.ObjectReconstructor._get_response',
fake_get_response):
with mock.patch(
'swift.obj.reconstructor.ObjectReconstructor._reconstruct',
fake_reconstruct):
with mock.patch.object(
self.policy.object_ring, 'get_part_nodes',
fake_get_part_nodes):
self.reconstructor = ObjectReconstructor(
{}, logger=debug_logger('test_reconstructor'))
job = {
'device': self.device,
'partition': self.partition,
'policy': self.policy,
'sync_diskfile_builder':
self.reconstructor.reconstruct_fa
}
sender = ssync_sender.Sender(
self.daemon, self.job_node, job, self.suffixes)
sender.connect, trace = self.make_connect_wrapper(sender)
sender()
return trace
def test_sync_reconstructor_partial_rebuild(self):
# First fragment to sync gets partial content from reconstructor.
# Expect ssync job to exit early with no file written on receiver.
frag_responses = [
[FakeResponse(i, 'x' * (self.frag_length - 1))
for i in range(self.policy.ec_ndata + self.policy.ec_nparity)],
[FakeResponse(i, 'y' * self.frag_length)
for i in range(self.policy.ec_ndata + self.policy.ec_nparity)]]
self._test_reconstructor_sync_job(frag_responses)
msgs = []
for obj_name in ('o1', 'o2'):
try:
df = self._open_rx_diskfile(
obj_name, self.policy, self.rx_node_index)
msgs.append('Unexpected rx diskfile for %r with content %r' %
(obj_name, ''.join([d for d in df.reader()])))
except DiskFileNotExist:
pass # expected outcome
if msgs:
self.fail('Failed with:\n%s' % '\n'.join(msgs))
log_lines = self.daemon.logger.get_lines_for_level('error')
self.assertIn('Sent data length does not match content-length',
log_lines[0])
self.assertFalse(log_lines[1:])
# trampoline for the receiver to write a log
eventlet.sleep(0)
log_lines = self.rx_logger.get_lines_for_level('warning')
self.assertIn('ssync subrequest failed with 499',
log_lines[0])
self.assertFalse(log_lines[1:])
self.assertFalse(self.rx_logger.get_lines_for_level('error'))
def test_sync_reconstructor_no_rebuilt_content(self):
# First fragment to sync gets no content in any response to
# reconstructor. Expect ssync job to exit early with no file written on
# receiver.
frag_responses = [
[FakeResponse(i, '')
for i in range(self.policy.ec_ndata + self.policy.ec_nparity)],
[FakeResponse(i, 'y' * self.frag_length)
for i in range(self.policy.ec_ndata + self.policy.ec_nparity)]]
self._test_reconstructor_sync_job(frag_responses)
msgs = []
for obj_name in ('o1', 'o2'):
try:
df = self._open_rx_diskfile(
obj_name, self.policy, self.rx_node_index)
msgs.append('Unexpected rx diskfile for %r with content %r' %
(obj_name, ''.join([d for d in df.reader()])))
except DiskFileNotExist:
pass # expected outcome
if msgs:
self.fail('Failed with:\n%s' % '\n'.join(msgs))
log_lines = self.daemon.logger.get_lines_for_level('error')
self.assertIn('Sent data length does not match content-length',
log_lines[0])
self.assertFalse(log_lines[1:])
# trampoline for the receiver to write a log
eventlet.sleep(0)
log_lines = self.rx_logger.get_lines_for_level('warning')
self.assertIn('ssync subrequest failed with 499',
log_lines[0])
self.assertFalse(log_lines[1:])
self.assertFalse(self.rx_logger.get_lines_for_level('error'))
def test_sync_reconstructor_exception_during_rebuild(self):
# First fragment to sync has some reconstructor get responses raise
# exception while rebuilding. Expect ssync job to exit early with no
# files written on receiver.
frag_responses = [
# ec_ndata responses are ok, but one of these will be ignored as
# it is for the frag index being rebuilt
[FakeResponse(i, 'x' * self.frag_length)
for i in range(self.policy.ec_ndata)] +
# ec_nparity responses will raise an Exception - at least one of
# these will be used during rebuild
[FakeResponse(i, Exception('raised in response read method'))
for i in range(self.policy.ec_ndata,
self.policy.ec_ndata + self.policy.ec_nparity)],
# second set of response are all good
[FakeResponse(i, 'y' * self.frag_length)
for i in range(self.policy.ec_ndata + self.policy.ec_nparity)]]
self._test_reconstructor_sync_job(frag_responses)
msgs = []
for obj_name in ('o1', 'o2'):
try:
df = self._open_rx_diskfile(
obj_name, self.policy, self.rx_node_index)
msgs.append('Unexpected rx diskfile for %r with content %r' %
(obj_name, ''.join([d for d in df.reader()])))
except DiskFileNotExist:
pass # expected outcome
if msgs:
self.fail('Failed with:\n%s' % '\n'.join(msgs))
log_lines = self.reconstructor.logger.get_lines_for_level('error')
self.assertIn('Error trying to rebuild', log_lines[0])
log_lines = self.daemon.logger.get_lines_for_level('error')
self.assertIn('Sent data length does not match content-length',
log_lines[0])
self.assertFalse(log_lines[1:])
# trampoline for the receiver to write a log
eventlet.sleep(0)
log_lines = self.rx_logger.get_lines_for_level('warning')
self.assertIn('ssync subrequest failed with 499',
log_lines[0])
self.assertFalse(log_lines[1:])
self.assertFalse(self.rx_logger.get_lines_for_level('error'))
def test_sync_reconstructor_no_responses(self):
# First fragment to sync gets no responses for reconstructor to rebuild
# with, nothing is sent to receiver so expect to skip that fragment and
# continue with second.
frag_responses = [
[None
for i in range(self.policy.ec_ndata + self.policy.ec_nparity)],
[FakeResponse(i, 'y' * self.frag_length)
for i in range(self.policy.ec_ndata + self.policy.ec_nparity)]]
trace = self._test_reconstructor_sync_job(frag_responses)
results = self._analyze_trace(trace)
self.assertEqual(2, len(results['tx_missing']))
self.assertEqual(2, len(results['rx_missing']))
self.assertEqual(1, len(results['tx_updates']))
self.assertFalse(results['rx_updates'])
self.assertEqual('PUT', results['tx_updates'][0].get('method'))
synced_obj_name = results['tx_updates'][0].get('path')[-2:]
msgs = []
obj_name = synced_obj_name
try:
df = self._open_rx_diskfile(
obj_name, self.policy, self.rx_node_index)
self.assertEqual('y' * self.frag_length,
''.join([d for d in df.reader()]))
except DiskFileNotExist:
msgs.append('Missing rx diskfile for %r' % obj_name)
obj_names = list(self.tx_objs)
obj_names.remove(synced_obj_name)
obj_name = obj_names[0]
try:
df = self._open_rx_diskfile(
obj_name, self.policy, self.rx_node_index)
msgs.append('Unexpected rx diskfile for %r with content %r' %
(obj_name, ''.join([d for d in df.reader()])))
except DiskFileNotExist:
pass # expected outcome
if msgs:
self.fail('Failed with:\n%s' % '\n'.join(msgs))
self.assertFalse(self.daemon.logger.get_lines_for_level('error'))
log_lines = self.reconstructor.logger.get_lines_for_level('error')
self.assertIn('Unable to get enough responses', log_lines[0])
# trampoline for the receiver to write a log
eventlet.sleep(0)
self.assertFalse(self.rx_logger.get_lines_for_level('warning'))
self.assertFalse(self.rx_logger.get_lines_for_level('error'))
def test_sync_reconstructor_rebuild_ok(self):
# Sanity test for this class of tests. Both fragments get a full
# complement of responses and rebuild correctly.
frag_responses = [
[FakeResponse(i, 'x' * self.frag_length)
for i in range(self.policy.ec_ndata + self.policy.ec_nparity)],
[FakeResponse(i, 'y' * self.frag_length)
for i in range(self.policy.ec_ndata + self.policy.ec_nparity)]]
trace = self._test_reconstructor_sync_job(frag_responses)
results = self._analyze_trace(trace)
self.assertEqual(2, len(results['tx_missing']))
self.assertEqual(2, len(results['rx_missing']))
self.assertEqual(2, len(results['tx_updates']))
self.assertFalse(results['rx_updates'])
msgs = []
rx_frags = []
for obj_name in self.tx_objs:
try:
df = self._open_rx_diskfile(
obj_name, self.policy, self.rx_node_index)
rx_frags.append(''.join([d for d in df.reader()]))
except DiskFileNotExist:
msgs.append('Missing rx diskfile for %r' % obj_name)
if msgs:
self.fail('Failed with:\n%s' % '\n'.join(msgs))
self.assertIn('x' * self.frag_length, rx_frags)
self.assertIn('y' * self.frag_length, rx_frags)
self.assertFalse(rx_frags[2:])
self.assertFalse(self.daemon.logger.get_lines_for_level('error'))
self.assertFalse(
self.reconstructor.logger.get_lines_for_level('error'))
# trampoline for the receiver to write a log
eventlet.sleep(0)
self.assertFalse(self.rx_logger.get_lines_for_level('warning'))
self.assertFalse(self.rx_logger.get_lines_for_level('error'))
@patch_policies
class TestSsyncReplication(TestBaseSsync):
def test_sync(self):

View File

@ -445,7 +445,7 @@ class TestReceiver(unittest.TestCase):
mock_wsgi_input.mock_socket)
mock_wsgi_input.mock_socket.close.assert_called_once_with()
self.controller.logger.exception.assert_called_once_with(
'1.2.3.4/device/partition EXCEPTION in replication.Receiver')
'1.2.3.4/device/partition EXCEPTION in ssync.Receiver')
def test_SSYNC_Exception_Exception(self):
@ -481,7 +481,7 @@ class TestReceiver(unittest.TestCase):
mock_wsgi_input.mock_socket)
mock_wsgi_input.mock_socket.close.assert_called_once_with()
self.controller.logger.exception.assert_called_once_with(
'EXCEPTION in replication.Receiver')
'EXCEPTION in ssync.Receiver')
def test_MISSING_CHECK_timeout(self):
@ -522,7 +522,7 @@ class TestReceiver(unittest.TestCase):
self.assertEqual(resp.status_int, 200)
self.assertTrue(mock_shutdown_safe.called)
self.controller.logger.error.assert_called_once_with(
'2.3.4.5/sda1/1 TIMEOUT in replication.Receiver: '
'2.3.4.5/sda1/1 TIMEOUT in ssync.Receiver: '
'0.01 seconds: missing_check line')
def test_MISSING_CHECK_other_exception(self):
@ -564,7 +564,7 @@ class TestReceiver(unittest.TestCase):
self.assertEqual(resp.status_int, 200)
self.assertTrue(mock_shutdown_safe.called)
self.controller.logger.exception.assert_called_once_with(
'3.4.5.6/sda1/1 EXCEPTION in replication.Receiver')
'3.4.5.6/sda1/1 EXCEPTION in ssync.Receiver')
def test_MISSING_CHECK_empty_list(self):
@ -773,7 +773,7 @@ class TestReceiver(unittest.TestCase):
self.assertFalse(self.controller.logger.error.called)
self.assertTrue(self.controller.logger.exception.called)
self.assertIn(
'EXCEPTION in replication.Receiver while attempting commit of',
'EXCEPTION in ssync.Receiver while attempting commit of',
self.controller.logger.exception.call_args[0][0])
def test_MISSING_CHECK_storage_policy(self):
@ -970,7 +970,7 @@ class TestReceiver(unittest.TestCase):
mock_wsgi_input.mock_socket)
mock_wsgi_input.mock_socket.close.assert_called_once_with()
self.controller.logger.error.assert_called_once_with(
'2.3.4.5/device/partition TIMEOUT in replication.Receiver: '
'2.3.4.5/device/partition TIMEOUT in ssync.Receiver: '
'0.01 seconds: updates line')
def test_UPDATES_other_exception(self):
@ -1017,7 +1017,7 @@ class TestReceiver(unittest.TestCase):
mock_wsgi_input.mock_socket)
mock_wsgi_input.mock_socket.close.assert_called_once_with()
self.controller.logger.exception.assert_called_once_with(
'3.4.5.6/device/partition EXCEPTION in replication.Receiver')
'3.4.5.6/device/partition EXCEPTION in ssync.Receiver')
def test_UPDATES_no_problems_no_hard_disconnect(self):
@ -1071,7 +1071,7 @@ class TestReceiver(unittest.TestCase):
":ERROR: 0 'need more than 1 value to unpack'"])
self.assertEqual(resp.status_int, 200)
self.controller.logger.exception.assert_called_once_with(
'None/device/partition EXCEPTION in replication.Receiver')
'None/device/partition EXCEPTION in ssync.Receiver')
with mock.patch.object(
self.controller, 'DELETE',
@ -1093,7 +1093,7 @@ class TestReceiver(unittest.TestCase):
":ERROR: 0 'need more than 1 value to unpack'"])
self.assertEqual(resp.status_int, 200)
self.controller.logger.exception.assert_called_once_with(
'None/device/partition EXCEPTION in replication.Receiver')
'None/device/partition EXCEPTION in ssync.Receiver')
def test_UPDATES_no_headers(self):
self.controller.logger = mock.MagicMock()
@ -1110,7 +1110,7 @@ class TestReceiver(unittest.TestCase):
":ERROR: 0 'Got no headers for DELETE /a/c/o'"])
self.assertEqual(resp.status_int, 200)
self.controller.logger.exception.assert_called_once_with(
'None/device/partition EXCEPTION in replication.Receiver')
'None/device/partition EXCEPTION in ssync.Receiver')
def test_UPDATES_bad_headers(self):
self.controller.logger = mock.MagicMock()
@ -1128,7 +1128,7 @@ class TestReceiver(unittest.TestCase):
":ERROR: 0 'need more than 1 value to unpack'"])
self.assertEqual(resp.status_int, 200)
self.controller.logger.exception.assert_called_once_with(
'None/device/partition EXCEPTION in replication.Receiver')
'None/device/partition EXCEPTION in ssync.Receiver')
self.controller.logger = mock.MagicMock()
req = swob.Request.blank(
@ -1146,7 +1146,7 @@ class TestReceiver(unittest.TestCase):
":ERROR: 0 'need more than 1 value to unpack'"])
self.assertEqual(resp.status_int, 200)
self.controller.logger.exception.assert_called_once_with(
'None/device/partition EXCEPTION in replication.Receiver')
'None/device/partition EXCEPTION in ssync.Receiver')
def test_UPDATES_bad_content_length(self):
self.controller.logger = mock.MagicMock()
@ -1164,7 +1164,7 @@ class TestReceiver(unittest.TestCase):
':ERROR: 0 "invalid literal for int() with base 10: \'a\'"'])
self.assertEqual(resp.status_int, 200)
self.controller.logger.exception.assert_called_once_with(
'None/device/partition EXCEPTION in replication.Receiver')
'None/device/partition EXCEPTION in ssync.Receiver')
def test_UPDATES_content_length_with_DELETE(self):
self.controller.logger = mock.MagicMock()
@ -1182,7 +1182,7 @@ class TestReceiver(unittest.TestCase):
":ERROR: 0 'DELETE subrequest with content-length /a/c/o'"])
self.assertEqual(resp.status_int, 200)
self.controller.logger.exception.assert_called_once_with(
'None/device/partition EXCEPTION in replication.Receiver')
'None/device/partition EXCEPTION in ssync.Receiver')
def test_UPDATES_no_content_length_with_PUT(self):
self.controller.logger = mock.MagicMock()
@ -1199,7 +1199,7 @@ class TestReceiver(unittest.TestCase):
":ERROR: 0 'No content-length sent for PUT /a/c/o'"])
self.assertEqual(resp.status_int, 200)
self.controller.logger.exception.assert_called_once_with(
'None/device/partition EXCEPTION in replication.Receiver')
'None/device/partition EXCEPTION in ssync.Receiver')
def test_UPDATES_early_termination(self):
self.controller.logger = mock.MagicMock()
@ -1217,7 +1217,7 @@ class TestReceiver(unittest.TestCase):
":ERROR: 0 'Early termination for PUT /a/c/o'"])
self.assertEqual(resp.status_int, 200)
self.controller.logger.exception.assert_called_once_with(
'None/device/partition EXCEPTION in replication.Receiver')
'None/device/partition EXCEPTION in ssync.Receiver')
def test_UPDATES_failures(self):
@ -1250,6 +1250,9 @@ class TestReceiver(unittest.TestCase):
self.assertEqual(resp.status_int, 200)
self.assertFalse(self.controller.logger.exception.called)
self.assertFalse(self.controller.logger.error.called)
self.assertTrue(self.controller.logger.warning.called)
self.assertEqual(3, self.controller.logger.warning.call_count)
self.controller.logger.clear()
# failures hit threshold and no successes, so ratio is like infinity
with mock.patch.object(self.controller, 'DELETE', _DELETE):
@ -1274,8 +1277,11 @@ class TestReceiver(unittest.TestCase):
":ERROR: 0 'Too many 4 failures to 0 successes'"])
self.assertEqual(resp.status_int, 200)
self.controller.logger.exception.assert_called_once_with(
'None/device/partition EXCEPTION in replication.Receiver')
'None/device/partition EXCEPTION in ssync.Receiver')
self.assertFalse(self.controller.logger.error.called)
self.assertTrue(self.controller.logger.warning.called)
self.assertEqual(4, self.controller.logger.warning.call_count)
self.controller.logger.clear()
# failures hit threshold and ratio hits 1.33333333333
with mock.patch.object(self.controller, 'DELETE', _DELETE):
@ -1304,6 +1310,9 @@ class TestReceiver(unittest.TestCase):
self.assertEqual(resp.status_int, 200)
self.assertFalse(self.controller.logger.exception.called)
self.assertFalse(self.controller.logger.error.called)
self.assertTrue(self.controller.logger.warning.called)
self.assertEqual(4, self.controller.logger.warning.call_count)
self.controller.logger.clear()
# failures hit threshold and ratio hits 2.0
with mock.patch.object(self.controller, 'DELETE', _DELETE):
@ -1329,8 +1338,11 @@ class TestReceiver(unittest.TestCase):
":ERROR: 0 'Too many 4 failures to 2 successes'"])
self.assertEqual(resp.status_int, 200)
self.controller.logger.exception.assert_called_once_with(
'None/device/partition EXCEPTION in replication.Receiver')
'None/device/partition EXCEPTION in ssync.Receiver')
self.assertFalse(self.controller.logger.error.called)
self.assertTrue(self.controller.logger.warning.called)
self.assertEqual(4, self.controller.logger.warning.call_count)
self.controller.logger.clear()
def test_UPDATES_PUT(self):
_PUT_request = [None]
@ -1658,7 +1670,7 @@ class TestReceiver(unittest.TestCase):
":ERROR: 0 'Invalid subrequest method BONK'"])
self.assertEqual(resp.status_int, 200)
self.controller.logger.exception.assert_called_once_with(
'None/device/partition EXCEPTION in replication.Receiver')
'None/device/partition EXCEPTION in ssync.Receiver')
self.assertEqual(len(_BONK_request), 1) # sanity
self.assertEqual(_BONK_request[0], None)
@ -1885,6 +1897,8 @@ class TestReceiver(unittest.TestCase):
self.assertEqual(resp.status_int, 200)
self.assertFalse(self.controller.logger.exception.called)
self.assertFalse(self.controller.logger.error.called)
self.assertTrue(self.controller.logger.warning.called)
self.assertEqual(2, self.controller.logger.warning.call_count)
self.assertEqual(len(_requests), 2) # sanity
req = _requests.pop(0)
self.assertEqual(req.path, '/device/partition/a/c/o1')

View File

@ -143,7 +143,7 @@ class TestSender(BaseTest):
error_lines = self.daemon.logger.get_lines_for_level('error')
for line in error_lines:
self.assertTrue(line.startswith(
'1.2.3.4:5678/sda1/9 EXCEPTION in replication.Sender:'))
'1.2.3.4:5678/sda1/9 EXCEPTION in ssync.Sender:'))
def test_call_catches_exception_handling_exception(self):
job = node = None # Will cause inside exception handler to fail
@ -156,7 +156,7 @@ class TestSender(BaseTest):
error_lines = self.daemon.logger.get_lines_for_level('error')
for line in error_lines:
self.assertTrue(line.startswith(
'EXCEPTION in replication.Sender'))
'EXCEPTION in ssync.Sender'))
def test_call_calls_others(self):
self.sender.suffixes = ['abc']
@ -461,7 +461,10 @@ class TestSender(BaseTest):
))
self.sender.daemon._diskfile_mgr.yield_hashes = yield_hashes
self.sender.connect = mock.MagicMock()
self.sender.df_mgr.get_diskfile_from_hash = mock.MagicMock()
df = mock.MagicMock()
df.content_length = 0
self.sender.df_mgr.get_diskfile_from_hash = mock.MagicMock(
return_value=df)
self.sender.disconnect = mock.MagicMock()
success, candidates = self.sender()
self.assertTrue(success)