diff --git a/swift/account/reaper.py b/swift/account/reaper.py index c121bf0ea5..e11fea6a47 100644 --- a/swift/account/reaper.py +++ b/swift/account/reaper.py @@ -15,10 +15,12 @@ import os import random +import socket from swift import gettext_ as _ from logging import DEBUG from math import sqrt from time import time +from hashlib import md5 import itertools from eventlet import GreenPool, sleep, Timeout @@ -70,6 +72,7 @@ class AccountReaper(Daemon): self.node_timeout = int(conf.get('node_timeout', 10)) self.conn_timeout = float(conf.get('conn_timeout', 0.5)) self.myips = whataremyips(conf.get('bind_ip', '0.0.0.0')) + self.bind_port = int(conf.get('bind_port', 0)) self.concurrency = int(conf.get('concurrency', 25)) self.container_concurrency = self.object_concurrency = \ sqrt(self.concurrency) @@ -79,6 +82,7 @@ class AccountReaper(Daemon): self.delay_reaping = int(conf.get('delay_reaping') or 0) reap_warn_after = float(conf.get('reap_warn_after') or 86400 * 30) self.reap_not_done_after = reap_warn_after + self.delay_reaping + self.start_time = time() def get_account_ring(self): """The account :class:`swift.common.ring.Ring` for the cluster.""" @@ -161,9 +165,16 @@ class AccountReaper(Daemon): if not partition.isdigit(): continue nodes = self.get_account_ring().get_part_nodes(int(partition)) - if (not is_local_device(self.myips, None, nodes[0]['ip'], None) - or not os.path.isdir(partition_path)): + if not os.path.isdir(partition_path): continue + container_shard = None + for container_shard, node in enumerate(nodes): + if is_local_device(self.myips, None, node['ip'], None) and \ + (not self.bind_port or self.bind_port == node['port']): + break + else: + continue + for suffix in os.listdir(partition_path): suffix_path = os.path.join(partition_path, suffix) if not os.path.isdir(suffix_path): @@ -181,7 +192,9 @@ class AccountReaper(Daemon): AccountBroker(os.path.join(hsh_path, fname)) if broker.is_status_deleted() and \ not broker.empty(): - self.reap_account(broker, partition, nodes) + self.reap_account( + broker, partition, nodes, + container_shard=container_shard) def reset_stats(self): self.stats_return_codes = {} @@ -192,7 +205,7 @@ class AccountReaper(Daemon): self.stats_containers_possibly_remaining = 0 self.stats_objects_possibly_remaining = 0 - def reap_account(self, broker, partition, nodes): + def reap_account(self, broker, partition, nodes, container_shard=None): """ Called once per pass for each account this server is the primary for and attempts to delete the data for the given account. The reaper will @@ -219,6 +232,8 @@ class AccountReaper(Daemon): :param broker: The AccountBroker for the account to delete. :param partition: The partition in the account ring the account is on. :param nodes: The primary node dicts for the account to delete. + :param container_shard: int used to shard containers reaped. If None, + will reap all containers. .. seealso:: @@ -237,16 +252,24 @@ class AccountReaper(Daemon): account = info['account'] self.logger.info(_('Beginning pass on account %s'), account) self.reset_stats() + container_limit = 1000 + if container_shard is not None: + container_limit *= len(nodes) try: marker = '' while True: containers = \ - list(broker.list_containers_iter(1000, marker, None, None, - None)) + list(broker.list_containers_iter(container_limit, marker, + None, None, None)) if not containers: break try: for (container, _junk, _junk, _junk) in containers: + this_shard = int(md5(container).hexdigest(), 16) % \ + len(nodes) + if container_shard not in (this_shard, None): + continue + self.container_pool.spawn(self.reap_container, account, partition, nodes, container) self.container_pool.waitall() @@ -351,6 +374,10 @@ class AccountReaper(Daemon): self.stats_return_codes.get(err.http_status / 100, 0) + 1 self.logger.increment( 'return_codes.%d' % (err.http_status / 100,)) + except (Timeout, socket.error) as err: + self.logger.error( + _('Timeout Exception with %(ip)s:%(port)s/%(device)s'), + node) if not objects: break try: @@ -403,6 +430,12 @@ class AccountReaper(Daemon): self.stats_return_codes.get(err.http_status / 100, 0) + 1 self.logger.increment( 'return_codes.%d' % (err.http_status / 100,)) + except (Timeout, socket.error) as err: + self.logger.error( + _('Timeout Exception with %(ip)s:%(port)s/%(device)s'), + node) + failures += 1 + self.logger.increment('containers_failures') if successes > failures: self.stats_containers_deleted += 1 self.logger.increment('containers_deleted') @@ -473,6 +506,12 @@ class AccountReaper(Daemon): self.stats_return_codes.get(err.http_status / 100, 0) + 1 self.logger.increment( 'return_codes.%d' % (err.http_status / 100,)) + except (Timeout, socket.error) as err: + failures += 1 + self.logger.increment('objects_failures') + self.logger.error( + _('Timeout Exception with %(ip)s:%(port)s/%(device)s'), + node) if successes > failures: self.stats_objects_deleted += 1 self.logger.increment('objects_deleted') diff --git a/test/unit/account/test_reaper.py b/test/unit/account/test_reaper.py index e9776ecc3c..658bdc02cf 100644 --- a/test/unit/account/test_reaper.py +++ b/test/unit/account/test_reaper.py @@ -77,6 +77,7 @@ class FakeBroker(object): class FakeAccountBroker(object): def __init__(self, containers): self.containers = containers + self.containers_yielded = [] def get_info(self): info = {'account': 'a', @@ -101,11 +102,11 @@ class FakeRing(object): 'port': 6002, 'device': None}, {'id': '2', - 'ip': '10.10.10.1', + 'ip': '10.10.10.2', 'port': 6002, 'device': None}, {'id': '3', - 'ip': '10.10.10.1', + 'ip': '10.10.10.3', 'port': 6002, 'device': None}, ] @@ -504,24 +505,26 @@ class TestReaper(unittest.TestCase): self.called_amount = 0 self.r = r = self.init_reaper({}, fakelogger=True) r.start_time = time.time() - ctx = [patch('swift.account.reaper.AccountReaper.reap_container', - self.fake_reap_container), - patch('swift.account.reaper.AccountReaper.get_account_ring', - self.fake_account_ring)] - with nested(*ctx): + with patch('swift.account.reaper.AccountReaper.reap_container', + self.fake_reap_container), \ + patch('swift.account.reaper.AccountReaper.get_account_ring', + self.fake_account_ring): nodes = r.get_account_ring().get_part_nodes() - self.assertTrue(r.reap_account(broker, 'partition', nodes)) + for container_shard, node in enumerate(nodes): + self.assertTrue( + r.reap_account(broker, 'partition', nodes, + container_shard=container_shard)) self.assertEqual(self.called_amount, 4) info_lines = r.logger.get_lines_for_level('info') - self.assertEqual(len(info_lines), 2) - start_line, stat_line = info_lines - self.assertEqual(start_line, 'Beginning pass on account a') - self.assertTrue(stat_line.find('1 containers deleted')) - self.assertTrue(stat_line.find('1 objects deleted')) - self.assertTrue(stat_line.find('1 containers remaining')) - self.assertTrue(stat_line.find('1 objects remaining')) - self.assertTrue(stat_line.find('1 containers possibly remaining')) - self.assertTrue(stat_line.find('1 objects possibly remaining')) + self.assertEqual(len(info_lines), 6) + for start_line, stat_line in zip(*[iter(info_lines)] * 2): + self.assertEqual(start_line, 'Beginning pass on account a') + self.assertTrue(stat_line.find('1 containers deleted')) + self.assertTrue(stat_line.find('1 objects deleted')) + self.assertTrue(stat_line.find('1 containers remaining')) + self.assertTrue(stat_line.find('1 objects remaining')) + self.assertTrue(stat_line.find('1 containers possibly remaining')) + self.assertTrue(stat_line.find('1 objects possibly remaining')) def test_reap_account_no_container(self): broker = FakeAccountBroker(tuple()) @@ -584,6 +587,67 @@ class TestReaper(unittest.TestCase): r.reap_device('sda1') self.assertEqual(self.called_amount, 0) + def test_reap_device_with_sharding(self): + devices = self.prepare_data_dir() + conf = {'devices': devices} + r = self.init_reaper(conf, myips=['10.10.10.2']) + container_shard_used = [-1] + + def fake_reap_account(*args, **kwargs): + container_shard_used[0] = kwargs.get('container_shard') + + with patch('swift.account.reaper.AccountBroker', + FakeAccountBroker), \ + patch('swift.account.reaper.AccountReaper.get_account_ring', + self.fake_account_ring), \ + patch('swift.account.reaper.AccountReaper.reap_account', + fake_reap_account): + r.reap_device('sda1') + # 10.10.10.2 is second node from ring + self.assertEqual(container_shard_used[0], 1) + + def test_reap_account_with_sharding(self): + devices = self.prepare_data_dir() + self.called_amount = 0 + conf = {'devices': devices} + r = self.init_reaper(conf, myips=['10.10.10.2']) + + container_reaped = [0] + + def fake_list_containers_iter(self, *args): + for container in self.containers: + if container in self.containers_yielded: + continue + + yield container, None, None, None + self.containers_yielded.append(container) + + def fake_reap_container(self, account, account_partition, + account_nodes, container): + container_reaped[0] += 1 + + ctx = [patch('swift.account.reaper.AccountBroker', + FakeAccountBroker), + patch('swift.account.reaper.AccountBroker.list_containers_iter', + fake_list_containers_iter), + patch('swift.account.reaper.AccountReaper.reap_container', + fake_reap_container), ] + fake_ring = FakeRing() + with nested(*ctx): + fake_broker = FakeAccountBroker(['c', 'd', 'e']) + r.reap_account(fake_broker, 10, fake_ring.nodes, 0) + self.assertEqual(container_reaped[0], 1) + + fake_broker = FakeAccountBroker(['c', 'd', 'e']) + container_reaped[0] = 0 + r.reap_account(fake_broker, 10, fake_ring.nodes, 1) + self.assertEqual(container_reaped[0], 2) + + container_reaped[0] = 0 + fake_broker = FakeAccountBroker(['c', 'd', 'e']) + r.reap_account(fake_broker, 10, fake_ring.nodes, 2) + self.assertEqual(container_reaped[0], 0) + def test_run_once(self): def prepare_data_dir(): devices_path = tempfile.mkdtemp()