Merge "Make the reaper use same timestamp for replica deletes"
This commit is contained in:
commit
d45b575534
@ -376,6 +376,7 @@ class AccountReaper(Daemon):
|
|||||||
break
|
break
|
||||||
successes = 0
|
successes = 0
|
||||||
failures = 0
|
failures = 0
|
||||||
|
timestamp = Timestamp(time())
|
||||||
for node in nodes:
|
for node in nodes:
|
||||||
anode = account_nodes.pop()
|
anode = account_nodes.pop()
|
||||||
try:
|
try:
|
||||||
@ -386,7 +387,8 @@ class AccountReaper(Daemon):
|
|||||||
headers={'X-Account-Host': '%(ip)s:%(port)s' % anode,
|
headers={'X-Account-Host': '%(ip)s:%(port)s' % anode,
|
||||||
'X-Account-Partition': str(account_partition),
|
'X-Account-Partition': str(account_partition),
|
||||||
'X-Account-Device': anode['device'],
|
'X-Account-Device': anode['device'],
|
||||||
'X-Account-Override-Deleted': 'yes'})
|
'X-Account-Override-Deleted': 'yes',
|
||||||
|
'X-Timestamp': timestamp.internal})
|
||||||
successes += 1
|
successes += 1
|
||||||
self.stats_return_codes[2] = \
|
self.stats_return_codes[2] = \
|
||||||
self.stats_return_codes.get(2, 0) + 1
|
self.stats_return_codes.get(2, 0) + 1
|
||||||
@ -443,6 +445,8 @@ class AccountReaper(Daemon):
|
|||||||
part, nodes = ring.get_nodes(account, container, obj)
|
part, nodes = ring.get_nodes(account, container, obj)
|
||||||
successes = 0
|
successes = 0
|
||||||
failures = 0
|
failures = 0
|
||||||
|
timestamp = Timestamp(time())
|
||||||
|
|
||||||
for node in nodes:
|
for node in nodes:
|
||||||
cnode = next(cnodes)
|
cnode = next(cnodes)
|
||||||
try:
|
try:
|
||||||
@ -453,7 +457,8 @@ class AccountReaper(Daemon):
|
|||||||
headers={'X-Container-Host': '%(ip)s:%(port)s' % cnode,
|
headers={'X-Container-Host': '%(ip)s:%(port)s' % cnode,
|
||||||
'X-Container-Partition': str(container_partition),
|
'X-Container-Partition': str(container_partition),
|
||||||
'X-Container-Device': cnode['device'],
|
'X-Container-Device': cnode['device'],
|
||||||
'X-Backend-Storage-Policy-Index': policy_index})
|
'X-Backend-Storage-Policy-Index': policy_index,
|
||||||
|
'X-Timestamp': timestamp.internal})
|
||||||
successes += 1
|
successes += 1
|
||||||
self.stats_return_codes[2] = \
|
self.stats_return_codes[2] = \
|
||||||
self.stats_return_codes.get(2, 0) + 1
|
self.stats_return_codes.get(2, 0) + 1
|
||||||
|
@ -204,10 +204,11 @@ def direct_delete_container(node, part, account, container, conn_timeout=5,
|
|||||||
headers = {}
|
headers = {}
|
||||||
|
|
||||||
path = '/%s/%s' % (account, container)
|
path = '/%s/%s' % (account, container)
|
||||||
|
add_timestamp = 'x-timestamp' not in (k.lower() for k in headers)
|
||||||
with Timeout(conn_timeout):
|
with Timeout(conn_timeout):
|
||||||
conn = http_connect(node['ip'], node['port'], node['device'], part,
|
conn = http_connect(node['ip'], node['port'], node['device'], part,
|
||||||
'DELETE', path,
|
'DELETE', path,
|
||||||
headers=gen_headers(headers, True))
|
headers=gen_headers(headers, add_timestamp))
|
||||||
with Timeout(response_timeout):
|
with Timeout(response_timeout):
|
||||||
resp = conn.getresponse()
|
resp = conn.getresponse()
|
||||||
resp.read()
|
resp.read()
|
||||||
|
@ -53,33 +53,96 @@ class TestAccountReaper(ReplProbeTest):
|
|||||||
for node in nodes:
|
for node in nodes:
|
||||||
direct_delete_account(node, part, self.account)
|
direct_delete_account(node, part, self.account)
|
||||||
|
|
||||||
|
# run the reaper
|
||||||
Manager(['account-reaper']).once()
|
Manager(['account-reaper']).once()
|
||||||
|
|
||||||
self.get_to_final_state()
|
|
||||||
|
|
||||||
for policy, container, obj in all_objects:
|
for policy, container, obj in all_objects:
|
||||||
|
# verify that any container deletes were at same timestamp
|
||||||
cpart, cnodes = self.container_ring.get_nodes(
|
cpart, cnodes = self.container_ring.get_nodes(
|
||||||
self.account, container)
|
self.account, container)
|
||||||
|
delete_times = set()
|
||||||
for cnode in cnodes:
|
for cnode in cnodes:
|
||||||
try:
|
try:
|
||||||
direct_head_container(cnode, cpart, self.account,
|
direct_head_container(cnode, cpart, self.account,
|
||||||
container)
|
container)
|
||||||
except ClientException as err:
|
except ClientException as err:
|
||||||
self.assertEquals(err.http_status, 404)
|
self.assertEquals(err.http_status, 404)
|
||||||
|
delete_time = err.http_headers.get(
|
||||||
|
'X-Backend-DELETE-Timestamp')
|
||||||
|
# 'X-Backend-DELETE-Timestamp' confirms it was deleted
|
||||||
|
self.assertTrue(delete_time)
|
||||||
|
delete_times.add(delete_time)
|
||||||
|
|
||||||
else:
|
else:
|
||||||
self.fail('Found un-reaped /%s/%s on %r' %
|
# Container replicas may not yet be deleted if we have a
|
||||||
(self.account, container, node))
|
# policy with object replicas < container replicas, so
|
||||||
|
# ignore successful HEAD. We'll check for all replicas to
|
||||||
|
# be deleted again after running the replicators.
|
||||||
|
pass
|
||||||
|
self.assertEqual(1, len(delete_times), delete_times)
|
||||||
|
|
||||||
|
# verify that all object deletes were at same timestamp
|
||||||
object_ring = POLICIES.get_object_ring(policy.idx, '/etc/swift/')
|
object_ring = POLICIES.get_object_ring(policy.idx, '/etc/swift/')
|
||||||
part, nodes = object_ring.get_nodes(self.account, container, obj)
|
part, nodes = object_ring.get_nodes(self.account, container, obj)
|
||||||
|
headers = {'X-Backend-Storage-Policy-Index': int(policy)}
|
||||||
|
delete_times = set()
|
||||||
for node in nodes:
|
for node in nodes:
|
||||||
try:
|
try:
|
||||||
direct_get_object(node, part, self.account,
|
direct_get_object(node, part, self.account,
|
||||||
container, obj)
|
container, obj, headers=headers)
|
||||||
except ClientException as err:
|
except ClientException as err:
|
||||||
self.assertEquals(err.http_status, 404)
|
self.assertEquals(err.http_status, 404)
|
||||||
|
delete_time = err.http_headers.get('X-Backend-Timestamp')
|
||||||
|
# 'X-Backend-Timestamp' confirms obj was deleted
|
||||||
|
self.assertTrue(delete_time)
|
||||||
|
delete_times.add(delete_time)
|
||||||
else:
|
else:
|
||||||
self.fail('Found un-reaped /%s/%s/%s on %r in %s!' %
|
self.fail('Found un-reaped /%s/%s/%s on %r in %s!' %
|
||||||
(self.account, container, obj, node, policy))
|
(self.account, container, obj, node, policy))
|
||||||
|
self.assertEqual(1, len(delete_times))
|
||||||
|
|
||||||
|
# run replicators and updaters
|
||||||
|
self.get_to_final_state()
|
||||||
|
|
||||||
|
for policy, container, obj in all_objects:
|
||||||
|
# verify that ALL container replicas are now deleted
|
||||||
|
cpart, cnodes = self.container_ring.get_nodes(
|
||||||
|
self.account, container)
|
||||||
|
delete_times = set()
|
||||||
|
for cnode in cnodes:
|
||||||
|
try:
|
||||||
|
direct_head_container(cnode, cpart, self.account,
|
||||||
|
container)
|
||||||
|
except ClientException as err:
|
||||||
|
self.assertEquals(err.http_status, 404)
|
||||||
|
delete_time = err.http_headers.get(
|
||||||
|
'X-Backend-DELETE-Timestamp')
|
||||||
|
# 'X-Backend-DELETE-Timestamp' confirms it was deleted
|
||||||
|
self.assertTrue(delete_time)
|
||||||
|
delete_times.add(delete_time)
|
||||||
|
else:
|
||||||
|
self.fail('Found un-reaped /%s/%s on %r' %
|
||||||
|
(self.account, container, cnode))
|
||||||
|
|
||||||
|
# sanity check that object state is still consistent...
|
||||||
|
object_ring = POLICIES.get_object_ring(policy.idx, '/etc/swift/')
|
||||||
|
part, nodes = object_ring.get_nodes(self.account, container, obj)
|
||||||
|
headers = {'X-Backend-Storage-Policy-Index': int(policy)}
|
||||||
|
delete_times = set()
|
||||||
|
for node in nodes:
|
||||||
|
try:
|
||||||
|
direct_get_object(node, part, self.account,
|
||||||
|
container, obj, headers=headers)
|
||||||
|
except ClientException as err:
|
||||||
|
self.assertEquals(err.http_status, 404)
|
||||||
|
delete_time = err.http_headers.get('X-Backend-Timestamp')
|
||||||
|
# 'X-Backend-Timestamp' confirms obj was deleted
|
||||||
|
self.assertTrue(delete_time)
|
||||||
|
delete_times.add(delete_time)
|
||||||
|
else:
|
||||||
|
self.fail('Found un-reaped /%s/%s/%s on %r in %s!' %
|
||||||
|
(self.account, container, obj, node, policy))
|
||||||
|
self.assertEqual(1, len(delete_times))
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
|
@ -278,30 +278,34 @@ class TestReaper(unittest.TestCase):
|
|||||||
'mount_check': 'false',
|
'mount_check': 'false',
|
||||||
}
|
}
|
||||||
r = reaper.AccountReaper(conf, logger=unit.debug_logger())
|
r = reaper.AccountReaper(conf, logger=unit.debug_logger())
|
||||||
ring = unit.FakeRing()
|
|
||||||
mock_path = 'swift.account.reaper.direct_delete_object'
|
mock_path = 'swift.account.reaper.direct_delete_object'
|
||||||
for policy in POLICIES:
|
for policy in POLICIES:
|
||||||
r.reset_stats()
|
r.reset_stats()
|
||||||
with patch(mock_path) as fake_direct_delete:
|
with patch(mock_path) as fake_direct_delete:
|
||||||
r.reap_object('a', 'c', 'partition', cont_nodes, 'o',
|
with patch('swift.account.reaper.time') as mock_time:
|
||||||
policy.idx)
|
mock_time.return_value = 1429117638.86767
|
||||||
for i, call_args in enumerate(
|
r.reap_object('a', 'c', 'partition', cont_nodes, 'o',
|
||||||
fake_direct_delete.call_args_list):
|
policy.idx)
|
||||||
cnode = cont_nodes[i % len(cont_nodes)]
|
mock_time.assert_called_once_with()
|
||||||
host = '%(ip)s:%(port)s' % cnode
|
for i, call_args in enumerate(
|
||||||
device = cnode['device']
|
fake_direct_delete.call_args_list):
|
||||||
headers = {
|
cnode = cont_nodes[i % len(cont_nodes)]
|
||||||
'X-Container-Host': host,
|
host = '%(ip)s:%(port)s' % cnode
|
||||||
'X-Container-Partition': 'partition',
|
device = cnode['device']
|
||||||
'X-Container-Device': device,
|
headers = {
|
||||||
'X-Backend-Storage-Policy-Index': policy.idx
|
'X-Container-Host': host,
|
||||||
}
|
'X-Container-Partition': 'partition',
|
||||||
ring = r.get_object_ring(policy.idx)
|
'X-Container-Device': device,
|
||||||
expected = call(dict(ring.devs[i], index=i), 0,
|
'X-Backend-Storage-Policy-Index': policy.idx,
|
||||||
'a', 'c', 'o',
|
'X-Timestamp': '1429117638.86767'
|
||||||
headers=headers, conn_timeout=0.5,
|
}
|
||||||
response_timeout=10)
|
ring = r.get_object_ring(policy.idx)
|
||||||
self.assertEqual(call_args, expected)
|
expected = call(dict(ring.devs[i], index=i), 0,
|
||||||
|
'a', 'c', 'o',
|
||||||
|
headers=headers, conn_timeout=0.5,
|
||||||
|
response_timeout=10)
|
||||||
|
self.assertEqual(call_args, expected)
|
||||||
|
self.assertEqual(policy.object_ring.replicas - 1, i)
|
||||||
self.assertEqual(r.stats_objects_deleted,
|
self.assertEqual(r.stats_objects_deleted,
|
||||||
policy.object_ring.replicas)
|
policy.object_ring.replicas)
|
||||||
|
|
||||||
@ -366,7 +370,11 @@ class TestReaper(unittest.TestCase):
|
|||||||
return headers, obj_list
|
return headers, obj_list
|
||||||
|
|
||||||
mocks['direct_get_container'].side_effect = fake_get_container
|
mocks['direct_get_container'].side_effect = fake_get_container
|
||||||
r.reap_container('a', 'partition', acc_nodes, 'c')
|
with patch('swift.account.reaper.time') as mock_time:
|
||||||
|
mock_time.side_effect = [1429117638.86767, 1429117639.67676]
|
||||||
|
r.reap_container('a', 'partition', acc_nodes, 'c')
|
||||||
|
|
||||||
|
# verify calls to direct_delete_object
|
||||||
mock_calls = mocks['direct_delete_object'].call_args_list
|
mock_calls = mocks['direct_delete_object'].call_args_list
|
||||||
self.assertEqual(policy.object_ring.replicas, len(mock_calls))
|
self.assertEqual(policy.object_ring.replicas, len(mock_calls))
|
||||||
for call_args in mock_calls:
|
for call_args in mock_calls:
|
||||||
@ -374,8 +382,29 @@ class TestReaper(unittest.TestCase):
|
|||||||
self.assertEqual(kwargs['headers']
|
self.assertEqual(kwargs['headers']
|
||||||
['X-Backend-Storage-Policy-Index'],
|
['X-Backend-Storage-Policy-Index'],
|
||||||
policy.idx)
|
policy.idx)
|
||||||
|
self.assertEqual(kwargs['headers']
|
||||||
|
['X-Timestamp'],
|
||||||
|
'1429117638.86767')
|
||||||
|
|
||||||
|
# verify calls to direct_delete_container
|
||||||
self.assertEquals(mocks['direct_delete_container'].call_count, 3)
|
self.assertEquals(mocks['direct_delete_container'].call_count, 3)
|
||||||
|
for i, call_args in enumerate(
|
||||||
|
mocks['direct_delete_container'].call_args_list):
|
||||||
|
anode = acc_nodes[i % len(acc_nodes)]
|
||||||
|
host = '%(ip)s:%(port)s' % anode
|
||||||
|
device = anode['device']
|
||||||
|
headers = {
|
||||||
|
'X-Account-Host': host,
|
||||||
|
'X-Account-Partition': 'partition',
|
||||||
|
'X-Account-Device': device,
|
||||||
|
'X-Account-Override-Deleted': 'yes',
|
||||||
|
'X-Timestamp': '1429117639.67676'
|
||||||
|
}
|
||||||
|
ring = r.get_object_ring(policy.idx)
|
||||||
|
expected = call(dict(ring.devs[i], index=i), 0, 'a', 'c',
|
||||||
|
headers=headers, conn_timeout=0.5,
|
||||||
|
response_timeout=10)
|
||||||
|
self.assertEqual(call_args, expected)
|
||||||
self.assertEqual(r.stats_objects_deleted, policy.object_ring.replicas)
|
self.assertEqual(r.stats_objects_deleted, policy.object_ring.replicas)
|
||||||
|
|
||||||
def test_reap_container_get_object_fail(self):
|
def test_reap_container_get_object_fail(self):
|
||||||
|
@ -341,6 +341,19 @@ class TestDirectClient(unittest.TestCase):
|
|||||||
self.assertEqual(conn.method, 'DELETE')
|
self.assertEqual(conn.method, 'DELETE')
|
||||||
self.assertEqual(conn.path, self.container_path)
|
self.assertEqual(conn.path, self.container_path)
|
||||||
|
|
||||||
|
def test_direct_delete_container_with_timestamp(self):
|
||||||
|
# ensure timestamp is different from any that might be auto-generated
|
||||||
|
timestamp = Timestamp(time.time() - 100)
|
||||||
|
headers = {'X-Timestamp': timestamp.internal}
|
||||||
|
with mocked_http_conn(200) as conn:
|
||||||
|
direct_client.direct_delete_container(
|
||||||
|
self.node, self.part, self.account, self.container,
|
||||||
|
headers=headers)
|
||||||
|
self.assertEqual(conn.method, 'DELETE')
|
||||||
|
self.assertEqual(conn.path, self.container_path)
|
||||||
|
self.assertTrue('X-Timestamp' in conn.req_headers)
|
||||||
|
self.assertEqual(timestamp, conn.req_headers['X-Timestamp'])
|
||||||
|
|
||||||
def test_direct_delete_container_error(self):
|
def test_direct_delete_container_error(self):
|
||||||
with mocked_http_conn(500) as conn:
|
with mocked_http_conn(500) as conn:
|
||||||
try:
|
try:
|
||||||
@ -536,6 +549,19 @@ class TestDirectClient(unittest.TestCase):
|
|||||||
self.assertEqual(conn.path, self.obj_path)
|
self.assertEqual(conn.path, self.obj_path)
|
||||||
self.assertEqual(resp, None)
|
self.assertEqual(resp, None)
|
||||||
|
|
||||||
|
def test_direct_delete_object_with_timestamp(self):
|
||||||
|
# ensure timestamp is different from any that might be auto-generated
|
||||||
|
timestamp = Timestamp(time.time() - 100)
|
||||||
|
headers = {'X-Timestamp': timestamp.internal}
|
||||||
|
with mocked_http_conn(200) as conn:
|
||||||
|
direct_client.direct_delete_object(
|
||||||
|
self.node, self.part, self.account, self.container, self.obj,
|
||||||
|
headers=headers)
|
||||||
|
self.assertEqual(conn.method, 'DELETE')
|
||||||
|
self.assertEqual(conn.path, self.obj_path)
|
||||||
|
self.assertTrue('X-Timestamp' in conn.req_headers)
|
||||||
|
self.assertEqual(timestamp, conn.req_headers['X-Timestamp'])
|
||||||
|
|
||||||
def test_direct_delete_object_error(self):
|
def test_direct_delete_object_error(self):
|
||||||
with mocked_http_conn(503) as conn:
|
with mocked_http_conn(503) as conn:
|
||||||
try:
|
try:
|
||||||
|
Loading…
Reference in New Issue
Block a user