Merge "Cleanup EC and SSYNC frag index parameters"

This commit is contained in:
Zuul 2019-02-06 19:59:09 +00:00 committed by Gerrit Code Review
commit 4a276cd111
8 changed files with 195 additions and 86 deletions

View File

@ -369,14 +369,13 @@ class ObjectReconstructor(Daemon):
:returns: a DiskFile like class for use by ssync
:raises DiskFileError: if the fragment archive cannot be reconstructed
"""
part_nodes = job['policy'].object_ring.get_part_nodes(
job['partition'])
part_nodes.remove(node)
# don't try and fetch a fragment from the node we're rebuilding to
part_nodes = [n for n in job['policy'].object_ring.get_part_nodes(
job['partition']) if n['id'] != node['id']]
# the fragment index we need to reconstruct is the position index
# of the node we're rebuilding to within the primary part list
fi_to_rebuild = job['policy'].get_backend_index(node['index'])
fi_to_rebuild = node['backend_index']
# KISS send out connection requests to all nodes, see what sticks.
# Use fragment preferences header to tell other nodes that we want
@ -829,6 +828,8 @@ class ObjectReconstructor(Daemon):
syncd_with += 1
continue
node['backend_index'] = job['policy'].get_backend_index(
node['index'])
# ssync any out-of-sync suffixes with the remote node
success, _ = ssync_sender(
self, node, job, suffixes)()
@ -850,6 +851,8 @@ class ObjectReconstructor(Daemon):
syncd_with = 0
reverted_objs = {}
for node in job['sync_to']:
node['backend_index'] = job['policy'].get_backend_index(
node['index'])
success, in_sync_objs = ssync_sender(
self, node, job, job['suffixes'])()
if success:

View File

@ -759,9 +759,8 @@ class ObjectController(BaseStorageServer):
except ValueError as e:
raise HTTPBadRequest(body=str(e), request=request,
content_type='text/plain')
# SSYNC will include Frag-Index header for subrequests to primary
# nodes; handoff nodes should 409 subrequests to over-write an
# existing data fragment until they offloaded the existing fragment
# SSYNC will include Frag-Index header for subrequests, in which case
# get_diskfile will ignore non-matching on-disk data files
frag_index = request.headers.get('X-Backend-Ssync-Frag-Index')
next_part_power = request.headers.get('X-Backend-Next-Part-Power')
try:

View File

@ -219,7 +219,7 @@ class Receiver(object):
self.device, self.partition, self.policy = \
request_helpers.get_name_and_placement(self.request, 2, 2, False)
self.frag_index = self.node_index = None
self.frag_index = None
if self.request.headers.get('X-Backend-Ssync-Frag-Index'):
try:
self.frag_index = int(
@ -228,19 +228,6 @@ class Receiver(object):
raise swob.HTTPBadRequest(
'Invalid X-Backend-Ssync-Frag-Index %r' %
self.request.headers['X-Backend-Ssync-Frag-Index'])
if self.request.headers.get('X-Backend-Ssync-Node-Index'):
try:
self.node_index = int(
self.request.headers['X-Backend-Ssync-Node-Index'])
except ValueError:
raise swob.HTTPBadRequest(
'Invalid X-Backend-Ssync-Node-Index %r' %
self.request.headers['X-Backend-Ssync-Node-Index'])
if self.node_index != self.frag_index:
# a primary node should only receive it's own fragments
raise swob.HTTPBadRequest(
'Frag-Index (%s) != Node-Index (%s)' % (
self.frag_index, self.node_index))
utils.validate_device_partition(self.device, self.partition)
self.diskfile_mgr = self.app._diskfile_router[self.policy]
if not self.diskfile_mgr.get_dev_path(self.device):
@ -476,9 +463,9 @@ class Receiver(object):
raise Exception('Invalid subrequest method %s' % method)
subreq.headers['X-Backend-Storage-Policy-Index'] = int(self.policy)
subreq.headers['X-Backend-Replication'] = 'True'
if self.node_index is not None:
if self.frag_index is not None:
# primary node should not 409 if it has a non-primary fragment
subreq.headers['X-Backend-Ssync-Frag-Index'] = self.node_index
subreq.headers['X-Backend-Ssync-Frag-Index'] = self.frag_index
if replication_headers:
subreq.headers['X-Backend-Replication-Headers'] = \
' '.join(replication_headers)

View File

@ -231,21 +231,14 @@ class Sender(object):
connection.putheader('Transfer-Encoding', 'chunked')
connection.putheader('X-Backend-Storage-Policy-Index',
int(self.job['policy']))
# a sync job must use the node's index for the frag_index of the
# rebuilt fragments instead of the frag_index from the job which
# will be rebuilding them
frag_index = self.node.get('index', self.job.get('frag_index'))
if frag_index is None:
# replication jobs will not have a frag_index key;
# reconstructor jobs with only tombstones will have a
# frag_index key explicitly set to the value of None - in both
# cases on the wire we write the empty string which
# ssync_receiver will translate to None
frag_index = ''
connection.putheader('X-Backend-Ssync-Frag-Index', frag_index)
# a revert job to a handoff will not have a node index
connection.putheader('X-Backend-Ssync-Node-Index',
self.node.get('index', ''))
# a sync job must use the node's backend_index for the frag_index
# of the rebuilt fragments instead of the frag_index from the job
# which will be rebuilding them
frag_index = self.node.get('backend_index')
if frag_index is not None:
connection.putheader('X-Backend-Ssync-Frag-Index', frag_index)
# Node-Index header is for backwards compat 2.4.0-2.20.0
connection.putheader('X-Backend-Ssync-Node-Index', frag_index)
connection.endheaders()
with exceptions.MessageTimeout(
self.daemon.node_timeout, 'connect receive'):

View File

@ -3628,6 +3628,92 @@ class TestObjectReconstructor(BaseTestObjectReconstructor):
set(c['suffixes']),
) for c in ssync_calls))
def test_sync_duplicates_to_remote_region(self):
partition = 0
part_nodes = self.policy.object_ring.get_part_nodes(partition)
# in the non-duplicate case we just pick a random node
local_dev = random.choice(part_nodes[-14:])
frag_index = self.policy.get_backend_index(local_dev['index'])
sync_to = object_reconstructor._get_partners(
local_dev['index'], part_nodes)
part_path = os.path.join(self.devices, self.local_dev['device'],
diskfile.get_data_dir(self.policy),
str(partition))
# setup left and right hashes
stub_hashes = {
'123': {frag_index: 'hash', None: 'hash'},
'abc': {frag_index: 'hash', None: 'hash'},
}
# left hand side is in sync
left_frag_index = self.policy.get_backend_index(sync_to[0]['index'])
left_hashes = {
'123': {left_frag_index: 'hash', None: 'hash'},
'abc': {left_frag_index: 'hash', None: 'hash'},
}
# right hand side needs sync
right_frag_index = self.policy.get_backend_index(sync_to[1]['index'])
right_hashes = {
'123': {right_frag_index: 'hash', None: 'hash'},
'abc': {right_frag_index: 'hashX', None: 'hash'},
}
job = {
'job_type': object_reconstructor.SYNC,
'frag_index': frag_index,
'suffixes': stub_hashes.keys(),
'sync_to': sync_to,
'partition': partition,
'path': part_path,
'hashes': stub_hashes,
'policy': self.policy,
'local_dev': self.local_dev,
'device': self.local_dev['device'],
}
responses = [
(200, pickle.dumps(left_hashes)),
(200, pickle.dumps(right_hashes)),
(200, pickle.dumps(right_hashes)),
]
codes, body_iter = zip(*responses)
# we're going to dip our mocks into the ssync layer a bit
ssync_resp = mock.MagicMock()
ssync_resp.status = 200
ssync_resp.readline.side_effect = [
':MISSING_CHECK: START',
':MISSING_CHECK: END',
':UPDATES: START',
':UPDATES: END',
]
ssync_headers = []
def capture_headers(name, value):
ssync_headers.append((name, value))
ssync_conn = mock.MagicMock()
ssync_conn.getresponse.return_value = ssync_resp
ssync_conn.putheader = capture_headers
with mock.patch('swift.obj.ssync_sender.SsyncBufferedHTTPConnection',
return_value=ssync_conn), \
mock.patch('swift.obj.diskfile.ECDiskFileManager._get_hashes',
return_value=(None, stub_hashes)), \
mock.patch('swift.obj.diskfile.ECDiskFileManager.yield_hashes',
return_value=iter([])), \
mocked_http_conn(*codes, body_iter=body_iter):
self.reconstructor.process_job(job)
# ... to make sure it sets up our headers correctly
self.assertEqual(ssync_headers, [
('Transfer-Encoding', 'chunked'),
('X-Backend-Storage-Policy-Index', 0),
('X-Backend-Ssync-Frag-Index', right_frag_index),
# we include this for backwards compat
('X-Backend-Ssync-Node-Index', right_frag_index),
])
def test_process_job_sync_missing_durable(self):
partition = 0
part_nodes = self.policy.object_ring.get_part_nodes(partition)
@ -4101,9 +4187,10 @@ class TestObjectReconstructor(BaseTestObjectReconstructor):
self.assertEqual(self.reconstructor.handoffs_remaining, 0)
def test_process_job_revert_cleanup_tombstone(self):
sync_to = [random.choice([n for n in self.policy.object_ring.devs
if n != self.local_dev])]
partition = 0
sync_to = [random.choice([
n for n in self.policy.object_ring.get_part_nodes(partition)
if n['id'] != self.local_dev['id']])]
part_path = os.path.join(self.devices, self.local_dev['device'],
diskfile.get_data_dir(self.policy),
@ -4205,6 +4292,7 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor):
}
part_nodes = self.policy.object_ring.get_part_nodes(0)
node = part_nodes[1]
node['backend_index'] = self.policy.get_backend_index(node['index'])
test_data = ('rebuild' * self.policy.ec_segment_size)[:-777]
etag = md5(test_data).hexdigest()
@ -4268,6 +4356,7 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor):
}
part_nodes = self.policy.object_ring.get_part_nodes(0)
node = part_nodes[4]
node['backend_index'] = self.policy.get_backend_index(node['index'])
test_data = ('rebuild' * self.policy.ec_segment_size)[:-777]
etag = md5(test_data).hexdigest()
@ -4304,6 +4393,7 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor):
}
part_nodes = self.policy.object_ring.get_part_nodes(0)
node = part_nodes[4]
node['backend_index'] = self.policy.get_backend_index(node['index'])
test_data = ('rebuild' * self.policy.ec_segment_size)[:-777]
etag = md5(test_data).hexdigest()
@ -4349,6 +4439,7 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor):
}
part_nodes = self.policy.object_ring.get_part_nodes(0)
node = part_nodes[-4]
node['backend_index'] = self.policy.get_backend_index(node['index'])
# make up some data (trim some amount to make it unaligned with
# segment size)
@ -4385,6 +4476,7 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor):
}
part_nodes = self.policy.object_ring.get_part_nodes(0)
node = part_nodes[1]
node['backend_index'] = self.policy.get_backend_index(node['index'])
policy = self.policy
possible_errors = [Timeout(), Exception('kaboom!')]
@ -4414,6 +4506,7 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor):
}
part_nodes = self.policy.object_ring.get_part_nodes(0)
node = part_nodes[1]
node['backend_index'] = self.policy.get_backend_index(node['index'])
policy = self.policy
codes = [404 for i in range(policy.object_ring.replicas - 1)]
@ -4438,6 +4531,7 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor):
}
part_nodes = self.policy.object_ring.get_part_nodes(0)
node = part_nodes[1]
node['backend_index'] = self.policy.get_backend_index(node['index'])
test_data = ('rebuild' * self.policy.ec_segment_size)[:-777]
etag = md5(test_data).hexdigest()
@ -4488,6 +4582,7 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor):
}
part_nodes = self.policy.object_ring.get_part_nodes(0)
node = part_nodes[1]
node['backend_index'] = self.policy.get_backend_index(node['index'])
test_data = ('rebuild' * self.policy.ec_segment_size)[:-777]
etag = md5(test_data).hexdigest()
@ -4542,6 +4637,7 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor):
}
part_nodes = self.policy.object_ring.get_part_nodes(0)
node = part_nodes[1]
node['backend_index'] = self.policy.get_backend_index(node['index'])
test_data = ('rebuild' * self.policy.ec_segment_size)[:-777]
etag = md5(test_data).hexdigest()
@ -4603,6 +4699,7 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor):
}
part_nodes = self.policy.object_ring.get_part_nodes(0)
node = part_nodes[1]
node['backend_index'] = self.policy.get_backend_index(node['index'])
test_data = ('rebuild' * self.policy.ec_segment_size)[:-777]
ec_archive_dict = dict()
@ -4677,7 +4774,9 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor):
'policy': self.policy,
}
part_nodes = self.policy.object_ring.get_part_nodes(0)
broken_node = random.randint(0, self.policy.ec_ndata - 1)
broken_index = random.randint(0, self.policy.ec_ndata - 1)
node = part_nodes[broken_index]
node['backend_index'] = self.policy.get_backend_index(node['index'])
test_data = ('rebuild' * self.policy.ec_segment_size)[:-777]
etag = md5(test_data).hexdigest()
@ -4685,7 +4784,7 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor):
# instead of popping the broken body, we'll just leave it in the list
# of responses and take away something else.
broken_body = ec_archive_bodies[broken_node]
broken_body = ec_archive_bodies[broken_index]
ec_archive_bodies = ec_archive_bodies[:-1]
def make_header(body):
@ -4698,7 +4797,7 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor):
codes, body_iter, headers = zip(*responses)
with mocked_http_conn(*codes, body_iter=body_iter, headers=headers):
df = self.reconstructor.reconstruct_fa(
job, part_nodes[broken_node], self.obj_metadata)
job, node, self.obj_metadata)
fixed_body = ''.join(df.reader())
self.assertEqual(len(fixed_body), len(broken_body))
self.assertEqual(md5(fixed_body).hexdigest(),
@ -4711,7 +4810,7 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor):
debug_log_lines = self.logger.get_lines_for_level('debug')
# redundant frag found once in first ec_ndata responses
self.assertIn(
'Found existing frag #%s at' % broken_node,
'Found existing frag #%s at' % broken_index,
debug_log_lines[0])
# N.B. in the future, we could avoid those check because
@ -4722,12 +4821,12 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor):
# liberasurecode[1].
# 1: https://github.com/openstack/liberasurecode/blob/
# master/src/erasurecode.c#L870
log_prefix = 'Reconstruct frag #%s with frag indexes' % broken_node
log_prefix = 'Reconstruct frag #%s with frag indexes' % broken_index
self.assertIn(log_prefix, debug_log_lines[1])
self.assertFalse(debug_log_lines[2:])
got_frag_index_list = json.loads(
debug_log_lines[1][len(log_prefix):])
self.assertNotIn(broken_node, got_frag_index_list)
self.assertNotIn(broken_index, got_frag_index_list)
def test_reconstruct_fa_finds_duplicate_does_not_fail(self):
job = {
@ -4736,6 +4835,7 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor):
}
part_nodes = self.policy.object_ring.get_part_nodes(0)
node = part_nodes[1]
node['backend_index'] = self.policy.get_backend_index(node['index'])
test_data = ('rebuild' * self.policy.ec_segment_size)[:-777]
etag = md5(test_data).hexdigest()
@ -4785,6 +4885,7 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor):
}
part_nodes = self.policy.object_ring.get_part_nodes(0)
node = part_nodes[1]
node['backend_index'] = self.policy.get_backend_index(node['index'])
test_data = ('rebuild' * self.policy.ec_segment_size)[:-777]
etag = md5(test_data).hexdigest()
@ -4854,6 +4955,7 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor):
}
part_nodes = self.policy.object_ring.get_part_nodes(0)
node = part_nodes[1]
node['backend_index'] = self.policy.get_backend_index(node['index'])
test_data = ('rebuild' * self.policy.ec_segment_size)[:-777]
etag = md5(test_data).hexdigest()
@ -4928,6 +5030,7 @@ class TestObjectReconstructorECDuplicationFactor(TestObjectReconstructor):
}
part_nodes = self.policy.object_ring.get_part_nodes(0)
node = part_nodes[index]
node['backend_index'] = self.policy.get_backend_index(node['index'])
metadata = {
'name': '/a/c/o',
'Content-Length': 0,

View File

@ -312,6 +312,7 @@ class TestBaseSsyncEC(TestBaseSsync):
self.policy = POLICIES.default
self.logger = debug_logger('test-ssync-sender')
self.daemon = ObjectReconstructor(self.daemon_conf, self.logger)
self.rx_node['backend_index'] = 0
def _get_object_data(self, path, frag_index=None, **kwargs):
# return a frag archive for given object name and frag index.
@ -378,7 +379,6 @@ class TestSsyncEC(TestBaseSsyncEC):
'policy': policy,
'frag_index': frag_index}
node = dict(self.rx_node)
node.update({'index': rx_node_index})
sender = ssync_sender.Sender(self.daemon, node, job, suffixes)
# wrap connection from tx to rx to capture ssync messages...
sender.connect, trace = self.make_connect_wrapper(sender)
@ -485,7 +485,6 @@ class TestSsyncEC(TestBaseSsyncEC):
'policy': policy,
'frag_index': frag_index}
node = dict(self.rx_node)
node.update({'index': rx_node_index})
sender = ssync_sender.Sender(self.daemon, node, job, suffixes)
# wrap connection from tx to rx to capture ssync messages...
sender.connect, trace = self.make_connect_wrapper(sender)
@ -583,7 +582,6 @@ class TestSsyncEC(TestBaseSsyncEC):
'frag_index': frag_index,
'sync_diskfile_builder': fake_reconstruct_fa}
node = dict(self.rx_node)
node.update({'index': rx_node_index})
sender = ssync_sender.Sender(self.daemon, node, job, suffixes)
# wrap connection from tx to rx to capture ssync messages...
sender.connect, trace = self.make_connect_wrapper(sender)
@ -667,10 +665,11 @@ class TestSsyncEC(TestBaseSsyncEC):
def test_send_invalid_frag_index(self):
policy = POLICIES.default
job = {'frag_index': 'Not a number',
job = {'frag_index': 'No one cares',
'device': self.device,
'partition': self.partition,
'policy': policy}
self.rx_node['backend_index'] = 'Not a number'
sender = ssync_sender.Sender(
self.daemon, self.rx_node, job, ['abc'])
success, _ = sender()
@ -713,7 +712,6 @@ class TestSsyncEC(TestBaseSsyncEC):
'policy': policy,
'frag_index': frag_index}
node = dict(self.rx_node)
node.update({'index': rx_node_index})
sender = ssync_sender.Sender(self.daemon, node, job, suffixes)
# wrap connection from tx to rx to capture ssync messages...
sender.connect, trace = self.make_connect_wrapper(sender)
@ -808,7 +806,7 @@ class TestSsyncECReconstructorSyncJob(TestBaseSsyncEC):
os.path.basename(os.path.dirname(df._datadir)))
self.job_node = dict(self.rx_node)
self.job_node['index'] = self.rx_node_index
self.job_node['id'] = 0
self.frag_length = int(
self.tx_objs['o1'][0].get_metadata()['Content-Length'])
@ -1082,7 +1080,6 @@ class TestSsyncReplication(TestBaseSsync):
def test_sync(self):
policy = POLICIES.default
rx_node_index = 0
# create sender side diskfiles...
tx_objs = {}
@ -1136,7 +1133,6 @@ class TestSsyncReplication(TestBaseSsync):
'partition': self.partition,
'policy': policy}
node = dict(self.rx_node)
node.update({'index': rx_node_index})
sender = ssync_sender.Sender(self.daemon, node, job, suffixes)
# wrap connection from tx to rx to capture ssync messages...
sender.connect, trace = self.make_connect_wrapper(sender)
@ -1204,7 +1200,6 @@ class TestSsyncReplication(TestBaseSsync):
def test_meta_file_sync(self):
policy = POLICIES.default
rx_node_index = 0
# create diskfiles...
tx_objs = {}
@ -1309,7 +1304,6 @@ class TestSsyncReplication(TestBaseSsync):
'partition': self.partition,
'policy': policy}
node = dict(self.rx_node)
node.update({'index': rx_node_index})
sender = ssync_sender.Sender(self.daemon, node, job, suffixes)
# wrap connection from tx to rx to capture ssync messages...
sender.connect, trace = self.make_connect_wrapper(sender)
@ -1352,7 +1346,6 @@ class TestSsyncReplication(TestBaseSsync):
def test_expired_object(self):
# verify that expired objects sync
policy = POLICIES.default
rx_node_index = 0
tx_df_mgr = self.daemon._df_router[policy]
t1 = next(self.ts_iter)
obj_name = 'o1'
@ -1370,7 +1363,6 @@ class TestSsyncReplication(TestBaseSsync):
'partition': self.partition,
'policy': policy}
node = dict(self.rx_node)
node.update({'index': rx_node_index})
sender = ssync_sender.Sender(self.daemon, node, job, suffixes)
# wrap connection from tx to rx to capture ssync messages...
sender.connect, trace = self.make_connect_wrapper(sender)
@ -1387,7 +1379,6 @@ class TestSsyncReplication(TestBaseSsync):
def _check_no_longer_expired_object(self, obj_name, df, policy):
# verify that objects with x-delete-at metadata that are not expired
# can be sync'd
rx_node_index = 0
def do_ssync():
# create ssync sender instance...
@ -1396,7 +1387,6 @@ class TestSsyncReplication(TestBaseSsync):
'partition': self.partition,
'policy': policy}
node = dict(self.rx_node)
node.update({'index': rx_node_index})
sender = ssync_sender.Sender(self.daemon, node, job, suffixes)
# wrap connection from tx to rx to capture ssync messages...
sender.connect, trace = self.make_connect_wrapper(sender)
@ -1480,7 +1470,6 @@ class TestSsyncReplication(TestBaseSsync):
# verify that the sender does sync a data file to a legacy receiver,
# but does not PUT meta file content to a legacy receiver
policy = POLICIES.default
rx_node_index = 0
# create diskfiles...
tx_df_mgr = self.daemon._df_router[policy]
@ -1504,7 +1493,6 @@ class TestSsyncReplication(TestBaseSsync):
'partition': self.partition,
'policy': policy}
node = dict(self.rx_node)
node.update({'index': rx_node_index})
sender = ssync_sender.Sender(self.daemon, node, job, suffixes)
# wrap connection from tx to rx to capture ssync messages...
sender.connect, trace = self.make_connect_wrapper(sender)
@ -1563,7 +1551,6 @@ class TestSsyncReplication(TestBaseSsync):
def test_content_type_sync(self):
policy = POLICIES.default
rx_node_index = 0
# create diskfiles...
tx_objs = {}
@ -1675,7 +1662,6 @@ class TestSsyncReplication(TestBaseSsync):
'partition': self.partition,
'policy': policy}
node = dict(self.rx_node)
node.update({'index': rx_node_index})
sender = ssync_sender.Sender(self.daemon, node, job, suffixes)
# wrap connection from tx to rx to capture ssync messages...
sender.connect, trace = self.make_connect_wrapper(sender)
@ -1721,5 +1707,6 @@ class TestSsyncReplication(TestBaseSsync):
self.device, self.partition, suffixes, policy)
self.assertEqual(tx_hashes, rx_hashes)
if __name__ == '__main__':
unittest.main()

View File

@ -211,7 +211,6 @@ class TestReceiver(unittest.TestCase):
':UPDATES: START', ':UPDATES: END'])
self.assertEqual(rcvr.policy, POLICIES[1])
self.assertEqual(rcvr.frag_index, 7)
self.assertIsNone(rcvr.node_index)
@unit.patch_policies()
def test_Receiver_with_only_node_index_header(self):
@ -226,13 +225,17 @@ class TestReceiver(unittest.TestCase):
body=':MISSING_CHECK: START\r\n'
':MISSING_CHECK: END\r\n'
':UPDATES: START\r\n:UPDATES: END\r\n')
with self.assertRaises(HTTPException) as e:
ssync_receiver.Receiver(self.controller, req)
self.assertEqual(e.exception.status_int, 400)
# if a node index is included - it *must* be
# the same value of frag index
self.assertEqual(e.exception.body,
'Frag-Index (None) != Node-Index (7)')
rcvr = ssync_receiver.Receiver(self.controller, req)
body_lines = [chunk.strip() for chunk in rcvr() if chunk.strip()]
self.assertEqual(
body_lines,
[':MISSING_CHECK: START', ':MISSING_CHECK: END',
':UPDATES: START', ':UPDATES: END'])
self.assertEqual(rcvr.policy, POLICIES[1])
# we used to require the reconstructor to send the frag_index twice as
# two different headers because of evolutionary reasons, now we ignore
# node_index
self.assertEqual(rcvr.frag_index, None)
@unit.patch_policies()
def test_Receiver_with_matched_indexes(self):
@ -256,7 +259,6 @@ class TestReceiver(unittest.TestCase):
':UPDATES: START', ':UPDATES: END'])
self.assertEqual(rcvr.policy, POLICIES[1])
self.assertEqual(rcvr.frag_index, 7)
self.assertEqual(rcvr.node_index, 7)
@unit.patch_policies()
def test_Receiver_with_invalid_indexes(self):
@ -289,8 +291,16 @@ class TestReceiver(unittest.TestCase):
body=':MISSING_CHECK: START\r\n'
':MISSING_CHECK: END\r\n'
':UPDATES: START\r\n:UPDATES: END\r\n')
self.assertRaises(HTTPException, ssync_receiver.Receiver,
self.controller, req)
rcvr = ssync_receiver.Receiver(self.controller, req)
body_lines = [chunk.strip() for chunk in rcvr() if chunk.strip()]
self.assertEqual(
body_lines,
[':MISSING_CHECK: START', ':MISSING_CHECK: END',
':UPDATES: START', ':UPDATES: END'])
self.assertEqual(rcvr.policy, POLICIES[1])
# node_index if provided should always match frag_index; but if they
# differ, frag_index takes precedence
self.assertEqual(rcvr.frag_index, 7)
def test_SSYNC_replication_lock_fail(self):
def _mock(path, policy, partition):
@ -2057,7 +2067,8 @@ class TestSsyncRxServer(unittest.TestCase):
sender = ssync_sender.Sender(self.daemon, node, job, ['abc'])
# kick off the sender and let the error trigger failure
with mock.patch('swift.obj.ssync_receiver.Receiver.initialize_request')\
with mock.patch(
'swift.obj.ssync_receiver.Receiver.initialize_request') \
as mock_initialize_request:
mock_initialize_request.side_effect = \
swob.HTTPInternalServerError()

View File

@ -182,7 +182,7 @@ class TestSender(BaseTest):
def test_connect(self):
node = dict(replication_ip='1.2.3.4', replication_port=5678,
device='sda1', index=0)
device='sda1', backend_index=0)
job = dict(partition='9', policy=POLICIES[1])
self.sender = ssync_sender.Sender(self.daemon, node, job, None)
self.sender.suffixes = ['abc']
@ -236,8 +236,6 @@ class TestSender(BaseTest):
'putheader': [
mock.call('Transfer-Encoding', 'chunked'),
mock.call('X-Backend-Storage-Policy-Index', 1),
mock.call('X-Backend-Ssync-Frag-Index', 9),
mock.call('X-Backend-Ssync-Node-Index', ''),
],
'endheaders': [mock.call()],
}
@ -270,8 +268,6 @@ class TestSender(BaseTest):
'putheader': [
mock.call('Transfer-Encoding', 'chunked'),
mock.call('X-Backend-Storage-Policy-Index', 0),
mock.call('X-Backend-Ssync-Frag-Index', ''),
mock.call('X-Backend-Ssync-Node-Index', ''),
],
'endheaders': [mock.call()],
}
@ -304,8 +300,40 @@ class TestSender(BaseTest):
'putheader': [
mock.call('Transfer-Encoding', 'chunked'),
mock.call('X-Backend-Storage-Policy-Index', 1),
mock.call('X-Backend-Ssync-Frag-Index', ''),
mock.call('X-Backend-Ssync-Node-Index', ''),
],
'endheaders': [mock.call()],
}
for method_name, expected_calls in expectations.items():
mock_method = getattr(mock_conn, method_name)
self.assertEqual(expected_calls, mock_method.mock_calls,
'connection method "%s" got %r not %r' % (
method_name, mock_method.mock_calls,
expected_calls))
def test_connect_handoff_none_frag_to_primary(self):
node = dict(replication_ip='1.2.3.4', replication_port=5678,
device='sda1', backend_index=42)
job = dict(partition='9', policy=POLICIES[1], frag_index=None)
self.sender = ssync_sender.Sender(self.daemon, node, job, None)
self.sender.suffixes = ['abc']
with mock.patch(
'swift.obj.ssync_sender.SsyncBufferedHTTPConnection'
) as mock_conn_class:
mock_conn = mock_conn_class.return_value
mock_resp = mock.MagicMock()
mock_resp.status = 200
mock_conn.getresponse.return_value = mock_resp
self.sender.connect()
mock_conn_class.assert_called_once_with('1.2.3.4:5678')
expectations = {
'putrequest': [
mock.call('SSYNC', '/sda1/9'),
],
'putheader': [
mock.call('Transfer-Encoding', 'chunked'),
mock.call('X-Backend-Storage-Policy-Index', 1),
mock.call('X-Backend-Ssync-Frag-Index', 42),
mock.call('X-Backend-Ssync-Node-Index', 42),
],
'endheaders': [mock.call()],
}
@ -339,8 +367,6 @@ class TestSender(BaseTest):
'putheader': [
mock.call('Transfer-Encoding', 'chunked'),
mock.call('X-Backend-Storage-Policy-Index', 1),
mock.call('X-Backend-Ssync-Frag-Index', ''),
mock.call('X-Backend-Ssync-Node-Index', ''),
],
'endheaders': [mock.call()],
}