Merge "Prevent ssync writing bad fragment data to diskfile"

commit 264e728364
@@ -77,8 +77,7 @@ class RebuildingECDiskFileStream(object):
         self.datafile_metadata = datafile_metadata
 
         # the new FA is going to have the same length as others in the set
-        self._content_length = self.datafile_metadata['Content-Length']
-
+        self._content_length = int(self.datafile_metadata['Content-Length'])
         # update the FI and delete the ETag, the obj server will
         # recalc on the other side...
         self.datafile_metadata['X-Object-Sysmeta-Ec-Frag-Index'] = frag_index
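Reviewer note: the int() cast matters because the Content-Length value carried in diskfile metadata arrives as a string (the test change further down switches the fake metadata to 'Content-Length': '0'), while the new sender-side check compares it against an integer byte count. A minimal illustration of the mismatch, not Swift code:

```python
# Illustration only (not Swift code): metadata values are strings, so a
# comparison against an integer byte counter must cast first.
datafile_metadata = {'Content-Length': '0'}

bytes_read = 0
assert bytes_read != datafile_metadata['Content-Length']       # 0 != '0'
assert bytes_read == int(datafile_metadata['Content-Length'])  # cast fixes it
```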
@@ -179,7 +179,7 @@ class Receiver(object):
                 yield ':ERROR: %d %r\n' % (0, str(err))
             except exceptions.MessageTimeout as err:
                 self.app.logger.error(
-                    '%s/%s/%s TIMEOUT in replication.Receiver: %s' % (
+                    '%s/%s/%s TIMEOUT in ssync.Receiver: %s' % (
                         self.request.remote_addr, self.device, self.partition,
                         err))
                 yield ':ERROR: %d %r\n' % (408, str(err))
@@ -188,11 +188,11 @@ class Receiver(object):
                 yield ':ERROR: %d %r\n' % (err.status_int, body)
             except Exception as err:
                 self.app.logger.exception(
-                    '%s/%s/%s EXCEPTION in replication.Receiver' %
+                    '%s/%s/%s EXCEPTION in ssync.Receiver' %
                     (self.request.remote_addr, self.device, self.partition))
                 yield ':ERROR: %d %r\n' % (0, str(err))
         except Exception:
-            self.app.logger.exception('EXCEPTION in replication.Receiver')
+            self.app.logger.exception('EXCEPTION in ssync.Receiver')
         if self.disconnect:
             # This makes the socket close early so the remote side doesn't have
             # to send its whole request while the lower Eventlet-level just
@@ -287,7 +287,7 @@ class Receiver(object):
                 # if commit fails then log exception and fall back to wanting
                 # a full update
                 self.app.logger.exception(
-                    '%s/%s/%s EXCEPTION in replication.Receiver while '
+                    '%s/%s/%s EXCEPTION in ssync.Receiver while '
                     'attempting commit of %s'
                     % (self.request.remote_addr, self.device, self.partition,
                        df._datadir))
@@ -466,7 +466,7 @@ class Receiver(object):
                 chunk = self.fp.read(
                     min(left, self.app.network_chunk_size))
                 if not chunk:
-                    raise Exception(
+                    raise exceptions.ChunkReadError(
                         'Early termination for %s %s' % (method, path))
                 left -= len(chunk)
                 yield chunk
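Reviewer note: the change above gives early termination its own exception type so callers can distinguish a prematurely closed connection from other failures. A standalone sketch of that read-loop pattern (illustrative names, not the Swift implementation):

```python
class ChunkReadError(Exception):
    """Raised when the peer disconnects before the declared body length."""


def iter_request_body(fp, content_length, chunk_size=65536):
    # Read exactly content_length bytes in chunks; a zero-length read before
    # the body is complete means the sender hung up early.
    left = content_length
    while left > 0:
        chunk = fp.read(min(left, chunk_size))
        if not chunk:
            raise ChunkReadError('Early termination after %d of %d bytes'
                                 % (content_length - left, content_length))
        left -= len(chunk)
        yield chunk
```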
@@ -488,6 +488,9 @@ class Receiver(object):
                     resp.status_int == http.HTTP_NOT_FOUND:
                 successes += 1
             else:
+                self.app.logger.warning(
+                    'ssync subrequest failed with %s: %s %s' %
+                    (resp.status_int, method, subreq.path))
                 failures += 1
                 if failures >= self.app.replication_failure_threshold and (
                         not successes or
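Reviewer note: the new warning makes failed subrequests visible before the existing threshold/ratio logic decides to abort the session. A rough sketch of that decision, with assumed parameter names and defaults (the real settings are the replication failure threshold and ratio options referenced above):

```python
# Sketch with assumed defaults: abort once failures reach an absolute
# threshold and the failure/success ratio is exceeded.
def should_abort(failures, successes, failure_threshold=4, failure_ratio=1.0):
    if failures < failure_threshold:
        return False
    if not successes:
        # no successes at all, so the ratio is effectively infinite
        return True
    return float(failures) / successes > failure_ratio

# e.g. the test messages below: 4 failures / 0 successes -> abort,
#      4 failures / 2 successes -> ratio 2.0 > 1.0 -> abort
```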
@@ -149,7 +149,7 @@ class Sender(object):
             # was originally written to shell out to rsync which would do
             # no such thing.
             self.daemon.logger.exception(
-                '%s:%s/%s/%s EXCEPTION in replication.Sender',
+                '%s:%s/%s/%s EXCEPTION in ssync.Sender',
                 self.node.get('replication_ip'),
                 self.node.get('replication_port'),
                 self.node.get('device'), self.job.get('partition'))
@@ -163,7 +163,7 @@ class Sender(object):
             # This particular exception handler does the minimal amount as it
             # would only get called if the above except Exception handler
             # failed (bad node or job data).
-            self.daemon.logger.exception('EXCEPTION in replication.Sender')
+            self.daemon.logger.exception('EXCEPTION in ssync.Sender')
         return False, {}
 
     def connect(self):
@@ -350,6 +350,11 @@ class Sender(object):
                     if want.get('data'):
                         self.send_delete(url_path, err.timestamp)
                 except exceptions.DiskFileError:
+                    # DiskFileErrors are expected while opening the diskfile,
+                    # before any data is read and sent. Since there is no partial
+                    # state on the receiver it's ok to ignore this diskfile and
+                    # continue. The diskfile may however be deleted after a
+                    # successful ssync since it remains in the send_map.
                     pass
         with exceptions.MessageTimeout(
                 self.daemon.node_timeout, 'updates end'):
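Reviewer note: the comment added above draws the line this patch relies on: a diskfile that fails to open has sent nothing, so skipping it leaves the receiver in a clean state. A simplified sketch of that sender loop (structure assumed for illustration, not Swift's Sender):

```python
class DiskFileError(Exception):
    """Stand-in for the diskfile open/read errors referenced above."""


def send_wanted(send_map, open_diskfile, send_put):
    # Objects whose diskfile cannot even be opened are skipped: no bytes have
    # been sent, so the receiver holds no partial state for them.
    for url_path, df_spec in send_map.items():
        try:
            df = open_diskfile(df_spec)
        except DiskFileError:
            continue
        send_put(url_path, df)
```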
@@ -404,10 +409,21 @@ class Sender(object):
         msg = '\r\n'.join(msg) + '\r\n\r\n'
         with exceptions.MessageTimeout(self.daemon.node_timeout, 'send_put'):
             self.connection.send('%x\r\n%s\r\n' % (len(msg), msg))
+        bytes_read = 0
         for chunk in df.reader():
+            bytes_read += len(chunk)
             with exceptions.MessageTimeout(
                     self.daemon.node_timeout, 'send_put chunk'):
                 self.connection.send('%x\r\n%s\r\n' % (len(chunk), chunk))
+        if bytes_read != df.content_length:
+            # Since we may now have partial state on the receiver we have to
+            # prevent the receiver finalising what may well be a bad or
+            # partially written diskfile. Unfortunately we have no other option
+            # than to pull the plug on this ssync session. If ssync supported
+            # multiphase PUTs like the proxy uses for EC we could send a bad
+            # etag in a footer of this subrequest, but that is not supported.
+            raise exceptions.ReplicationException(
+                'Sent data length does not match content-length')
 
     def send_post(self, url_path, df):
         metadata = df.get_metafile_metadata()
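Reviewer note: this check is the heart of the fix. Once chunks have started flowing there is partial state on the receiver, so the only safe reaction to a short read is to kill the session rather than let the receiver finalise a truncated fragment. A minimal standalone sketch of the guard (illustrative names, not Swift's API):

```python
class ReplicationException(Exception):
    """Stand-in for the error that aborts the ssync session."""


def send_object(connection, reader, content_length):
    # Stream the diskfile in chunked transfer encoding while counting bytes;
    # a mismatch at the end means the receiver must not commit the object.
    bytes_read = 0
    for chunk in reader:
        bytes_read += len(chunk)
        connection.send('%x\r\n%s\r\n' % (len(chunk), chunk))
    if bytes_read != content_length:
        raise ReplicationException(
            'Sent data length does not match content-length')
```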
@@ -2506,7 +2506,7 @@ class TestObjectReconstructor(unittest.TestCase):
         node = part_nodes[1]
         metadata = {
             'name': '/a/c/o',
-            'Content-Length': 0,
+            'Content-Length': '0',
             'ETag': 'etag',
         }
 
@@ -2539,6 +2539,7 @@ class TestObjectReconstructor(unittest.TestCase):
                 *codes, body_iter=body_iter, headers=headers):
             df = self.reconstructor.reconstruct_fa(
                 job, node, metadata)
+            self.assertEqual(0, df.content_length)
             fixed_body = ''.join(df.reader())
             self.assertEqual(len(fixed_body), len(broken_body))
             self.assertEqual(md5(fixed_body).hexdigest(),
@@ -29,9 +29,10 @@ from swift.common import utils
 from swift.common.storage_policy import POLICIES
 from swift.common.utils import Timestamp
 from swift.obj import ssync_sender, server
-from swift.obj.reconstructor import RebuildingECDiskFileStream
+from swift.obj.reconstructor import RebuildingECDiskFileStream, \
+    ObjectReconstructor
 
-from test.unit import patch_policies
+from test.unit import patch_policies, debug_logger
 from test.unit.obj.common import BaseTest, FakeReplicator
 
 
@@ -60,13 +61,14 @@ class TestBaseSsync(BaseTest):
                 'mount_check': 'false',
                 'replication_one_per_device': 'false',
                 'log_requests': 'false'}
-        self.rx_controller = server.ObjectController(conf)
+        self.rx_logger = debug_logger()
+        self.rx_controller = server.ObjectController(conf, self.rx_logger)
         self.ts_iter = (Timestamp(t)
                         for t in itertools.count(int(time.time())))
         self.rx_ip = '127.0.0.1'
         sock = eventlet.listen((self.rx_ip, 0))
         self.rx_server = eventlet.spawn(
-            eventlet.wsgi.server, sock, self.rx_controller, utils.NullLogger())
+            eventlet.wsgi.server, sock, self.rx_controller, self.rx_logger)
         self.rx_port = sock.getsockname()[1]
         self.rx_node = {'replication_ip': self.rx_ip,
                         'replication_port': self.rx_port,
@@ -730,6 +732,317 @@ class TestSsyncEC(TestBaseSsync):
         self.assertFalse(results['rx_updates'])
 
 
+class FakeResponse(object):
+    def __init__(self, frag_index, data):
+        self.headers = {
+            'X-Object-Sysmeta-Ec-Frag-Index': str(frag_index),
+            'X-Object-Sysmeta-Ec-Etag': 'the etag',
+            'X-Backend-Timestamp': '1234567890.12345'
+        }
+        self.data = data
+
+    def getheaders(self):
+        return self.headers
+
+    def read(self, length):
+        if isinstance(self.data, Exception):
+            raise self.data
+        val = self.data
+        self.data = ''
+        return val
+
+
+@patch_policies(with_ec_default=True)
+class TestSsyncECReconstructorSyncJob(TestBaseSsync):
+    def setUp(self):
+        super(TestSsyncECReconstructorSyncJob, self).setUp()
+        self.policy = POLICIES.default
+        self.rx_node_index = 0
+        self.tx_node_index = 1
+
+        # create sender side diskfiles...
+        self.tx_objs = {}
+        tx_df_mgr = self.daemon._diskfile_router[self.policy]
+        t1 = next(self.ts_iter)
+        self.tx_objs['o1'] = self._create_ondisk_files(
+            tx_df_mgr, 'o1', self.policy, t1, (self.tx_node_index,))
+        t2 = next(self.ts_iter)
+        self.tx_objs['o2'] = self._create_ondisk_files(
+            tx_df_mgr, 'o2', self.policy, t2, (self.tx_node_index,))
+
+        self.suffixes = set()
+        for diskfiles in list(self.tx_objs.values()):
+            for df in diskfiles:
+                self.suffixes.add(
+                    os.path.basename(os.path.dirname(df._datadir)))
+
+        self.job_node = dict(self.rx_node)
+        self.job_node['index'] = self.rx_node_index
+
+        self.frag_length = int(
+            self.tx_objs['o1'][0].get_metadata()['Content-Length'])
+
+    def _test_reconstructor_sync_job(self, frag_responses):
+        # Helper method to mock reconstructor to consume given lists of fake
+        # responses while reconstructing a fragment for a sync type job. The
+        # tests verify that when the reconstructed fragment iter fails in some
+        # way then ssync does not mistakenly create fragments on the receiving
+        # node which have incorrect data.
+        # See https://bugs.launchpad.net/swift/+bug/1631144
+
+        # frag_responses is a list of two lists of responses to each
+        # reconstructor GET request for a fragment archive. The two items in
+        # the outer list are lists of responses for each of the two fragments
+        # to be reconstructed. Items in the inner lists are responses for each
+        # of the other fragments fetched during the reconstructor rebuild.
+        path_to_responses = {}
+        fake_get_response_calls = []
+
+        def fake_get_response(recon, node, part, path, headers, policy):
+            # select a list of fake responses for this path and return the next
+            # from the list
+            if path not in path_to_responses:
+                path_to_responses[path] = frag_responses.pop(0)
+            response = path_to_responses[path].pop()
+            fake_get_response_calls.append(path)
+            return response
+
+        def fake_get_part_nodes(part):
+            # the reconstructor will try to remove the receiver node from the
+            # object ring part nodes, but the fake node we created for our
+            # receiver is not actually in the ring part nodes, so append it
+            # here simply so that the reconstructor does not fail to remove it.
+            return (self.policy.object_ring._get_part_nodes(part) +
+                    [self.job_node])
+
+        def fake_reconstruct(self, policy, fragment_payload, frag_index):
+            # fake EC reconstruction by returning first frag, which is ok
+            # because all frags would be same length
+            return fragment_payload[0]
+
+        with mock.patch(
+                'swift.obj.reconstructor.ObjectReconstructor._get_response',
+                fake_get_response):
+            with mock.patch(
+                    'swift.obj.reconstructor.ObjectReconstructor._reconstruct',
+                    fake_reconstruct):
+                with mock.patch.object(
+                        self.policy.object_ring, 'get_part_nodes',
+                        fake_get_part_nodes):
+                    self.reconstructor = ObjectReconstructor(
+                        {}, logger=debug_logger('test_reconstructor'))
+                    job = {
+                        'device': self.device,
+                        'partition': self.partition,
+                        'policy': self.policy,
+                        'sync_diskfile_builder':
+                            self.reconstructor.reconstruct_fa
+                    }
+                    sender = ssync_sender.Sender(
+                        self.daemon, self.job_node, job, self.suffixes)
+                    sender.connect, trace = self.make_connect_wrapper(sender)
+                    sender()
+        return trace
+
+    def test_sync_reconstructor_partial_rebuild(self):
+        # First fragment to sync gets partial content from reconstructor.
+        # Expect ssync job to exit early with no file written on receiver.
+        frag_responses = [
+            [FakeResponse(i, 'x' * (self.frag_length - 1))
+             for i in range(self.policy.ec_ndata + self.policy.ec_nparity)],
+            [FakeResponse(i, 'y' * self.frag_length)
+             for i in range(self.policy.ec_ndata + self.policy.ec_nparity)]]
+
+        self._test_reconstructor_sync_job(frag_responses)
+        msgs = []
+        for obj_name in ('o1', 'o2'):
+            try:
+                df = self._open_rx_diskfile(
+                    obj_name, self.policy, self.rx_node_index)
+                msgs.append('Unexpected rx diskfile for %r with content %r' %
+                            (obj_name, ''.join([d for d in df.reader()])))
+            except DiskFileNotExist:
+                pass  # expected outcome
+        if msgs:
+            self.fail('Failed with:\n%s' % '\n'.join(msgs))
+        log_lines = self.daemon.logger.get_lines_for_level('error')
+        self.assertIn('Sent data length does not match content-length',
+                      log_lines[0])
+        self.assertFalse(log_lines[1:])
+        # trampoline for the receiver to write a log
+        eventlet.sleep(0)
+        log_lines = self.rx_logger.get_lines_for_level('warning')
+        self.assertIn('ssync subrequest failed with 499',
+                      log_lines[0])
+        self.assertFalse(log_lines[1:])
+        self.assertFalse(self.rx_logger.get_lines_for_level('error'))
+
+    def test_sync_reconstructor_no_rebuilt_content(self):
+        # First fragment to sync gets no content in any response to
+        # reconstructor. Expect ssync job to exit early with no file written on
+        # receiver.
+        frag_responses = [
+            [FakeResponse(i, '')
+             for i in range(self.policy.ec_ndata + self.policy.ec_nparity)],
+            [FakeResponse(i, 'y' * self.frag_length)
+             for i in range(self.policy.ec_ndata + self.policy.ec_nparity)]]
+
+        self._test_reconstructor_sync_job(frag_responses)
+        msgs = []
+        for obj_name in ('o1', 'o2'):
+            try:
+                df = self._open_rx_diskfile(
+                    obj_name, self.policy, self.rx_node_index)
+                msgs.append('Unexpected rx diskfile for %r with content %r' %
+                            (obj_name, ''.join([d for d in df.reader()])))
+            except DiskFileNotExist:
+                pass  # expected outcome
+        if msgs:
+            self.fail('Failed with:\n%s' % '\n'.join(msgs))
+        log_lines = self.daemon.logger.get_lines_for_level('error')
+        self.assertIn('Sent data length does not match content-length',
+                      log_lines[0])
+        self.assertFalse(log_lines[1:])
+        # trampoline for the receiver to write a log
+        eventlet.sleep(0)
+        log_lines = self.rx_logger.get_lines_for_level('warning')
+        self.assertIn('ssync subrequest failed with 499',
+                      log_lines[0])
+        self.assertFalse(log_lines[1:])
+        self.assertFalse(self.rx_logger.get_lines_for_level('error'))
+
+    def test_sync_reconstructor_exception_during_rebuild(self):
+        # First fragment to sync has some reconstructor get responses raise
+        # exception while rebuilding. Expect ssync job to exit early with no
+        # files written on receiver.
+        frag_responses = [
+            # ec_ndata responses are ok, but one of these will be ignored as
+            # it is for the frag index being rebuilt
+            [FakeResponse(i, 'x' * self.frag_length)
+             for i in range(self.policy.ec_ndata)] +
+            # ec_nparity responses will raise an Exception - at least one of
+            # these will be used during rebuild
+            [FakeResponse(i, Exception('raised in response read method'))
+             for i in range(self.policy.ec_ndata,
+                            self.policy.ec_ndata + self.policy.ec_nparity)],
+            # second set of response are all good
+            [FakeResponse(i, 'y' * self.frag_length)
+             for i in range(self.policy.ec_ndata + self.policy.ec_nparity)]]
+
+        self._test_reconstructor_sync_job(frag_responses)
+        msgs = []
+        for obj_name in ('o1', 'o2'):
+            try:
+                df = self._open_rx_diskfile(
+                    obj_name, self.policy, self.rx_node_index)
+                msgs.append('Unexpected rx diskfile for %r with content %r' %
+                            (obj_name, ''.join([d for d in df.reader()])))
+            except DiskFileNotExist:
+                pass  # expected outcome
+        if msgs:
+            self.fail('Failed with:\n%s' % '\n'.join(msgs))
+
+        log_lines = self.reconstructor.logger.get_lines_for_level('error')
+        self.assertIn('Error trying to rebuild', log_lines[0])
+        log_lines = self.daemon.logger.get_lines_for_level('error')
+        self.assertIn('Sent data length does not match content-length',
+                      log_lines[0])
+        self.assertFalse(log_lines[1:])
+        # trampoline for the receiver to write a log
+        eventlet.sleep(0)
+        log_lines = self.rx_logger.get_lines_for_level('warning')
+        self.assertIn('ssync subrequest failed with 499',
+                      log_lines[0])
+        self.assertFalse(log_lines[1:])
+        self.assertFalse(self.rx_logger.get_lines_for_level('error'))
+
+    def test_sync_reconstructor_no_responses(self):
+        # First fragment to sync gets no responses for reconstructor to rebuild
+        # with, nothing is sent to receiver so expect to skip that fragment and
+        # continue with second.
+        frag_responses = [
+            [None
+             for i in range(self.policy.ec_ndata + self.policy.ec_nparity)],
+            [FakeResponse(i, 'y' * self.frag_length)
+             for i in range(self.policy.ec_ndata + self.policy.ec_nparity)]]
+
+        trace = self._test_reconstructor_sync_job(frag_responses)
+        results = self._analyze_trace(trace)
+        self.assertEqual(2, len(results['tx_missing']))
+        self.assertEqual(2, len(results['rx_missing']))
+        self.assertEqual(1, len(results['tx_updates']))
+        self.assertFalse(results['rx_updates'])
+        self.assertEqual('PUT', results['tx_updates'][0].get('method'))
+        synced_obj_name = results['tx_updates'][0].get('path')[-2:]
+
+        msgs = []
+        obj_name = synced_obj_name
+        try:
+            df = self._open_rx_diskfile(
+                obj_name, self.policy, self.rx_node_index)
+            self.assertEqual('y' * self.frag_length,
+                             ''.join([d for d in df.reader()]))
+        except DiskFileNotExist:
+            msgs.append('Missing rx diskfile for %r' % obj_name)
+
+        obj_names = list(self.tx_objs)
+        obj_names.remove(synced_obj_name)
+        obj_name = obj_names[0]
+        try:
+            df = self._open_rx_diskfile(
+                obj_name, self.policy, self.rx_node_index)
+            msgs.append('Unexpected rx diskfile for %r with content %r' %
+                        (obj_name, ''.join([d for d in df.reader()])))
+        except DiskFileNotExist:
+            pass  # expected outcome
+        if msgs:
+            self.fail('Failed with:\n%s' % '\n'.join(msgs))
+        self.assertFalse(self.daemon.logger.get_lines_for_level('error'))
+        log_lines = self.reconstructor.logger.get_lines_for_level('error')
+        self.assertIn('Unable to get enough responses', log_lines[0])
+        # trampoline for the receiver to write a log
+        eventlet.sleep(0)
+        self.assertFalse(self.rx_logger.get_lines_for_level('warning'))
+        self.assertFalse(self.rx_logger.get_lines_for_level('error'))
+
+    def test_sync_reconstructor_rebuild_ok(self):
+        # Sanity test for this class of tests. Both fragments get a full
+        # complement of responses and rebuild correctly.
+        frag_responses = [
+            [FakeResponse(i, 'x' * self.frag_length)
+             for i in range(self.policy.ec_ndata + self.policy.ec_nparity)],
+            [FakeResponse(i, 'y' * self.frag_length)
+             for i in range(self.policy.ec_ndata + self.policy.ec_nparity)]]
+
+        trace = self._test_reconstructor_sync_job(frag_responses)
+        results = self._analyze_trace(trace)
+        self.assertEqual(2, len(results['tx_missing']))
+        self.assertEqual(2, len(results['rx_missing']))
+        self.assertEqual(2, len(results['tx_updates']))
+        self.assertFalse(results['rx_updates'])
+        msgs = []
+        rx_frags = []
+        for obj_name in self.tx_objs:
+            try:
+                df = self._open_rx_diskfile(
+                    obj_name, self.policy, self.rx_node_index)
+                rx_frags.append(''.join([d for d in df.reader()]))
+            except DiskFileNotExist:
+                msgs.append('Missing rx diskfile for %r' % obj_name)
+        if msgs:
+            self.fail('Failed with:\n%s' % '\n'.join(msgs))
+        self.assertIn('x' * self.frag_length, rx_frags)
+        self.assertIn('y' * self.frag_length, rx_frags)
+        self.assertFalse(rx_frags[2:])
+        self.assertFalse(self.daemon.logger.get_lines_for_level('error'))
+        self.assertFalse(
+            self.reconstructor.logger.get_lines_for_level('error'))
+        # trampoline for the receiver to write a log
+        eventlet.sleep(0)
+        self.assertFalse(self.rx_logger.get_lines_for_level('warning'))
+        self.assertFalse(self.rx_logger.get_lines_for_level('error'))
+
+
 @patch_policies
 class TestSsyncReplication(TestBaseSsync):
     def test_sync(self):
@@ -445,7 +445,7 @@ class TestReceiver(unittest.TestCase):
                 mock_wsgi_input.mock_socket)
             mock_wsgi_input.mock_socket.close.assert_called_once_with()
             self.controller.logger.exception.assert_called_once_with(
-                '1.2.3.4/device/partition EXCEPTION in replication.Receiver')
+                '1.2.3.4/device/partition EXCEPTION in ssync.Receiver')
 
     def test_SSYNC_Exception_Exception(self):
 
@@ -481,7 +481,7 @@ class TestReceiver(unittest.TestCase):
                 mock_wsgi_input.mock_socket)
             mock_wsgi_input.mock_socket.close.assert_called_once_with()
             self.controller.logger.exception.assert_called_once_with(
-                'EXCEPTION in replication.Receiver')
+                'EXCEPTION in ssync.Receiver')
 
     def test_MISSING_CHECK_timeout(self):
 
@@ -522,7 +522,7 @@ class TestReceiver(unittest.TestCase):
         self.assertEqual(resp.status_int, 200)
         self.assertTrue(mock_shutdown_safe.called)
         self.controller.logger.error.assert_called_once_with(
-            '2.3.4.5/sda1/1 TIMEOUT in replication.Receiver: '
+            '2.3.4.5/sda1/1 TIMEOUT in ssync.Receiver: '
            '0.01 seconds: missing_check line')
 
     def test_MISSING_CHECK_other_exception(self):
@@ -564,7 +564,7 @@ class TestReceiver(unittest.TestCase):
         self.assertEqual(resp.status_int, 200)
         self.assertTrue(mock_shutdown_safe.called)
         self.controller.logger.exception.assert_called_once_with(
-            '3.4.5.6/sda1/1 EXCEPTION in replication.Receiver')
+            '3.4.5.6/sda1/1 EXCEPTION in ssync.Receiver')
 
     def test_MISSING_CHECK_empty_list(self):
 
@@ -773,7 +773,7 @@ class TestReceiver(unittest.TestCase):
         self.assertFalse(self.controller.logger.error.called)
         self.assertTrue(self.controller.logger.exception.called)
         self.assertIn(
-            'EXCEPTION in replication.Receiver while attempting commit of',
+            'EXCEPTION in ssync.Receiver while attempting commit of',
             self.controller.logger.exception.call_args[0][0])
 
     def test_MISSING_CHECK_storage_policy(self):
@@ -970,7 +970,7 @@ class TestReceiver(unittest.TestCase):
                 mock_wsgi_input.mock_socket)
             mock_wsgi_input.mock_socket.close.assert_called_once_with()
             self.controller.logger.error.assert_called_once_with(
-                '2.3.4.5/device/partition TIMEOUT in replication.Receiver: '
+                '2.3.4.5/device/partition TIMEOUT in ssync.Receiver: '
                 '0.01 seconds: updates line')
 
     def test_UPDATES_other_exception(self):
@@ -1017,7 +1017,7 @@ class TestReceiver(unittest.TestCase):
                 mock_wsgi_input.mock_socket)
             mock_wsgi_input.mock_socket.close.assert_called_once_with()
             self.controller.logger.exception.assert_called_once_with(
-                '3.4.5.6/device/partition EXCEPTION in replication.Receiver')
+                '3.4.5.6/device/partition EXCEPTION in ssync.Receiver')
 
     def test_UPDATES_no_problems_no_hard_disconnect(self):
 
@@ -1071,7 +1071,7 @@ class TestReceiver(unittest.TestCase):
             ":ERROR: 0 'need more than 1 value to unpack'"])
         self.assertEqual(resp.status_int, 200)
         self.controller.logger.exception.assert_called_once_with(
-            'None/device/partition EXCEPTION in replication.Receiver')
+            'None/device/partition EXCEPTION in ssync.Receiver')
 
         with mock.patch.object(
                 self.controller, 'DELETE',
@@ -1093,7 +1093,7 @@ class TestReceiver(unittest.TestCase):
             ":ERROR: 0 'need more than 1 value to unpack'"])
         self.assertEqual(resp.status_int, 200)
         self.controller.logger.exception.assert_called_once_with(
-            'None/device/partition EXCEPTION in replication.Receiver')
+            'None/device/partition EXCEPTION in ssync.Receiver')
 
     def test_UPDATES_no_headers(self):
         self.controller.logger = mock.MagicMock()
@@ -1110,7 +1110,7 @@ class TestReceiver(unittest.TestCase):
             ":ERROR: 0 'Got no headers for DELETE /a/c/o'"])
         self.assertEqual(resp.status_int, 200)
         self.controller.logger.exception.assert_called_once_with(
-            'None/device/partition EXCEPTION in replication.Receiver')
+            'None/device/partition EXCEPTION in ssync.Receiver')
 
     def test_UPDATES_bad_headers(self):
         self.controller.logger = mock.MagicMock()
@@ -1128,7 +1128,7 @@ class TestReceiver(unittest.TestCase):
             ":ERROR: 0 'need more than 1 value to unpack'"])
         self.assertEqual(resp.status_int, 200)
         self.controller.logger.exception.assert_called_once_with(
-            'None/device/partition EXCEPTION in replication.Receiver')
+            'None/device/partition EXCEPTION in ssync.Receiver')
 
         self.controller.logger = mock.MagicMock()
         req = swob.Request.blank(
@@ -1146,7 +1146,7 @@ class TestReceiver(unittest.TestCase):
            ":ERROR: 0 'need more than 1 value to unpack'"])
         self.assertEqual(resp.status_int, 200)
         self.controller.logger.exception.assert_called_once_with(
-            'None/device/partition EXCEPTION in replication.Receiver')
+            'None/device/partition EXCEPTION in ssync.Receiver')
 
     def test_UPDATES_bad_content_length(self):
         self.controller.logger = mock.MagicMock()
@@ -1164,7 +1164,7 @@ class TestReceiver(unittest.TestCase):
             ':ERROR: 0 "invalid literal for int() with base 10: \'a\'"'])
         self.assertEqual(resp.status_int, 200)
         self.controller.logger.exception.assert_called_once_with(
-            'None/device/partition EXCEPTION in replication.Receiver')
+            'None/device/partition EXCEPTION in ssync.Receiver')
 
     def test_UPDATES_content_length_with_DELETE(self):
         self.controller.logger = mock.MagicMock()
@@ -1182,7 +1182,7 @@ class TestReceiver(unittest.TestCase):
             ":ERROR: 0 'DELETE subrequest with content-length /a/c/o'"])
         self.assertEqual(resp.status_int, 200)
         self.controller.logger.exception.assert_called_once_with(
-            'None/device/partition EXCEPTION in replication.Receiver')
+            'None/device/partition EXCEPTION in ssync.Receiver')
 
     def test_UPDATES_no_content_length_with_PUT(self):
         self.controller.logger = mock.MagicMock()
@@ -1199,7 +1199,7 @@ class TestReceiver(unittest.TestCase):
             ":ERROR: 0 'No content-length sent for PUT /a/c/o'"])
         self.assertEqual(resp.status_int, 200)
         self.controller.logger.exception.assert_called_once_with(
-            'None/device/partition EXCEPTION in replication.Receiver')
+            'None/device/partition EXCEPTION in ssync.Receiver')
 
     def test_UPDATES_early_termination(self):
         self.controller.logger = mock.MagicMock()
@@ -1217,7 +1217,7 @@ class TestReceiver(unittest.TestCase):
             ":ERROR: 0 'Early termination for PUT /a/c/o'"])
         self.assertEqual(resp.status_int, 200)
         self.controller.logger.exception.assert_called_once_with(
-            'None/device/partition EXCEPTION in replication.Receiver')
+            'None/device/partition EXCEPTION in ssync.Receiver')
 
     def test_UPDATES_failures(self):
 
@@ -1250,6 +1250,9 @@ class TestReceiver(unittest.TestCase):
         self.assertEqual(resp.status_int, 200)
         self.assertFalse(self.controller.logger.exception.called)
         self.assertFalse(self.controller.logger.error.called)
+        self.assertTrue(self.controller.logger.warning.called)
+        self.assertEqual(3, self.controller.logger.warning.call_count)
+        self.controller.logger.clear()
 
         # failures hit threshold and no successes, so ratio is like infinity
         with mock.patch.object(self.controller, 'DELETE', _DELETE):
@@ -1274,8 +1277,11 @@ class TestReceiver(unittest.TestCase):
             ":ERROR: 0 'Too many 4 failures to 0 successes'"])
         self.assertEqual(resp.status_int, 200)
         self.controller.logger.exception.assert_called_once_with(
-            'None/device/partition EXCEPTION in replication.Receiver')
+            'None/device/partition EXCEPTION in ssync.Receiver')
+        self.assertFalse(self.controller.logger.error.called)
+        self.assertTrue(self.controller.logger.warning.called)
+        self.assertEqual(4, self.controller.logger.warning.call_count)
         self.controller.logger.clear()
 
         # failures hit threshold and ratio hits 1.33333333333
         with mock.patch.object(self.controller, 'DELETE', _DELETE):
@@ -1304,6 +1310,9 @@ class TestReceiver(unittest.TestCase):
         self.assertEqual(resp.status_int, 200)
         self.assertFalse(self.controller.logger.exception.called)
         self.assertFalse(self.controller.logger.error.called)
+        self.assertTrue(self.controller.logger.warning.called)
+        self.assertEqual(4, self.controller.logger.warning.call_count)
+        self.controller.logger.clear()
 
         # failures hit threshold and ratio hits 2.0
         with mock.patch.object(self.controller, 'DELETE', _DELETE):
@@ -1329,8 +1338,11 @@ class TestReceiver(unittest.TestCase):
             ":ERROR: 0 'Too many 4 failures to 2 successes'"])
         self.assertEqual(resp.status_int, 200)
         self.controller.logger.exception.assert_called_once_with(
-            'None/device/partition EXCEPTION in replication.Receiver')
+            'None/device/partition EXCEPTION in ssync.Receiver')
+        self.assertFalse(self.controller.logger.error.called)
+        self.assertTrue(self.controller.logger.warning.called)
+        self.assertEqual(4, self.controller.logger.warning.call_count)
         self.controller.logger.clear()
 
     def test_UPDATES_PUT(self):
         _PUT_request = [None]
@@ -1658,7 +1670,7 @@ class TestReceiver(unittest.TestCase):
             ":ERROR: 0 'Invalid subrequest method BONK'"])
         self.assertEqual(resp.status_int, 200)
         self.controller.logger.exception.assert_called_once_with(
-            'None/device/partition EXCEPTION in replication.Receiver')
+            'None/device/partition EXCEPTION in ssync.Receiver')
         self.assertEqual(len(_BONK_request), 1)  # sanity
         self.assertEqual(_BONK_request[0], None)
 
@@ -1885,6 +1897,8 @@ class TestReceiver(unittest.TestCase):
         self.assertEqual(resp.status_int, 200)
         self.assertFalse(self.controller.logger.exception.called)
         self.assertFalse(self.controller.logger.error.called)
+        self.assertTrue(self.controller.logger.warning.called)
+        self.assertEqual(2, self.controller.logger.warning.call_count)
         self.assertEqual(len(_requests), 2)  # sanity
         req = _requests.pop(0)
         self.assertEqual(req.path, '/device/partition/a/c/o1')
@@ -143,7 +143,7 @@ class TestSender(BaseTest):
         error_lines = self.daemon.logger.get_lines_for_level('error')
         for line in error_lines:
             self.assertTrue(line.startswith(
-                '1.2.3.4:5678/sda1/9 EXCEPTION in replication.Sender:'))
+                '1.2.3.4:5678/sda1/9 EXCEPTION in ssync.Sender:'))
 
     def test_call_catches_exception_handling_exception(self):
         job = node = None  # Will cause inside exception handler to fail
@@ -156,7 +156,7 @@ class TestSender(BaseTest):
         error_lines = self.daemon.logger.get_lines_for_level('error')
         for line in error_lines:
             self.assertTrue(line.startswith(
-                'EXCEPTION in replication.Sender'))
+                'EXCEPTION in ssync.Sender'))
 
     def test_call_calls_others(self):
         self.sender.suffixes = ['abc']
@@ -461,7 +461,10 @@ class TestSender(BaseTest):
             ))
         self.sender.daemon._diskfile_mgr.yield_hashes = yield_hashes
         self.sender.connect = mock.MagicMock()
-        self.sender.df_mgr.get_diskfile_from_hash = mock.MagicMock()
+        df = mock.MagicMock()
+        df.content_length = 0
+        self.sender.df_mgr.get_diskfile_from_hash = mock.MagicMock(
+            return_value=df)
         self.sender.disconnect = mock.MagicMock()
         success, candidates = self.sender()
         self.assertTrue(success)