Merge "Speed up reaper for a big account delete and some better error handling"
This commit is contained in:
@@ -15,10 +15,12 @@
|
||||
|
||||
import os
|
||||
import random
|
||||
import socket
|
||||
from swift import gettext_ as _
|
||||
from logging import DEBUG
|
||||
from math import sqrt
|
||||
from time import time
|
||||
from hashlib import md5
|
||||
import itertools
|
||||
|
||||
from eventlet import GreenPool, sleep, Timeout
|
||||
@@ -70,6 +72,7 @@ class AccountReaper(Daemon):
|
||||
self.node_timeout = int(conf.get('node_timeout', 10))
|
||||
self.conn_timeout = float(conf.get('conn_timeout', 0.5))
|
||||
self.myips = whataremyips(conf.get('bind_ip', '0.0.0.0'))
|
||||
self.bind_port = int(conf.get('bind_port', 0))
|
||||
self.concurrency = int(conf.get('concurrency', 25))
|
||||
self.container_concurrency = self.object_concurrency = \
|
||||
sqrt(self.concurrency)
|
||||
@@ -79,6 +82,7 @@ class AccountReaper(Daemon):
|
||||
self.delay_reaping = int(conf.get('delay_reaping') or 0)
|
||||
reap_warn_after = float(conf.get('reap_warn_after') or 86400 * 30)
|
||||
self.reap_not_done_after = reap_warn_after + self.delay_reaping
|
||||
self.start_time = time()
|
||||
|
||||
def get_account_ring(self):
|
||||
"""The account :class:`swift.common.ring.Ring` for the cluster."""
|
||||
@@ -161,9 +165,16 @@ class AccountReaper(Daemon):
|
||||
if not partition.isdigit():
|
||||
continue
|
||||
nodes = self.get_account_ring().get_part_nodes(int(partition))
|
||||
if (not is_local_device(self.myips, None, nodes[0]['ip'], None)
|
||||
or not os.path.isdir(partition_path)):
|
||||
if not os.path.isdir(partition_path):
|
||||
continue
|
||||
container_shard = None
|
||||
for container_shard, node in enumerate(nodes):
|
||||
if is_local_device(self.myips, None, node['ip'], None) and \
|
||||
(not self.bind_port or self.bind_port == node['port']):
|
||||
break
|
||||
else:
|
||||
continue
|
||||
|
||||
for suffix in os.listdir(partition_path):
|
||||
suffix_path = os.path.join(partition_path, suffix)
|
||||
if not os.path.isdir(suffix_path):
|
||||
@@ -181,7 +192,9 @@ class AccountReaper(Daemon):
|
||||
AccountBroker(os.path.join(hsh_path, fname))
|
||||
if broker.is_status_deleted() and \
|
||||
not broker.empty():
|
||||
self.reap_account(broker, partition, nodes)
|
||||
self.reap_account(
|
||||
broker, partition, nodes,
|
||||
container_shard=container_shard)
|
||||
|
||||
def reset_stats(self):
|
||||
self.stats_return_codes = {}
|
||||
@@ -192,7 +205,7 @@ class AccountReaper(Daemon):
|
||||
self.stats_containers_possibly_remaining = 0
|
||||
self.stats_objects_possibly_remaining = 0
|
||||
|
||||
def reap_account(self, broker, partition, nodes):
|
||||
def reap_account(self, broker, partition, nodes, container_shard=None):
|
||||
"""
|
||||
Called once per pass for each account this server is the primary for
|
||||
and attempts to delete the data for the given account. The reaper will
|
||||
@@ -219,6 +232,8 @@ class AccountReaper(Daemon):
|
||||
:param broker: The AccountBroker for the account to delete.
|
||||
:param partition: The partition in the account ring the account is on.
|
||||
:param nodes: The primary node dicts for the account to delete.
|
||||
:param container_shard: int used to shard containers reaped. If None,
|
||||
will reap all containers.
|
||||
|
||||
.. seealso::
|
||||
|
||||
@@ -237,16 +252,24 @@ class AccountReaper(Daemon):
|
||||
account = info['account']
|
||||
self.logger.info(_('Beginning pass on account %s'), account)
|
||||
self.reset_stats()
|
||||
container_limit = 1000
|
||||
if container_shard is not None:
|
||||
container_limit *= len(nodes)
|
||||
try:
|
||||
marker = ''
|
||||
while True:
|
||||
containers = \
|
||||
list(broker.list_containers_iter(1000, marker, None, None,
|
||||
None))
|
||||
list(broker.list_containers_iter(container_limit, marker,
|
||||
None, None, None))
|
||||
if not containers:
|
||||
break
|
||||
try:
|
||||
for (container, _junk, _junk, _junk) in containers:
|
||||
this_shard = int(md5(container).hexdigest(), 16) % \
|
||||
len(nodes)
|
||||
if container_shard not in (this_shard, None):
|
||||
continue
|
||||
|
||||
self.container_pool.spawn(self.reap_container, account,
|
||||
partition, nodes, container)
|
||||
self.container_pool.waitall()
|
||||
@@ -351,6 +374,10 @@ class AccountReaper(Daemon):
|
||||
self.stats_return_codes.get(err.http_status / 100, 0) + 1
|
||||
self.logger.increment(
|
||||
'return_codes.%d' % (err.http_status / 100,))
|
||||
except (Timeout, socket.error) as err:
|
||||
self.logger.error(
|
||||
_('Timeout Exception with %(ip)s:%(port)s/%(device)s'),
|
||||
node)
|
||||
if not objects:
|
||||
break
|
||||
try:
|
||||
@@ -403,6 +430,12 @@ class AccountReaper(Daemon):
|
||||
self.stats_return_codes.get(err.http_status / 100, 0) + 1
|
||||
self.logger.increment(
|
||||
'return_codes.%d' % (err.http_status / 100,))
|
||||
except (Timeout, socket.error) as err:
|
||||
self.logger.error(
|
||||
_('Timeout Exception with %(ip)s:%(port)s/%(device)s'),
|
||||
node)
|
||||
failures += 1
|
||||
self.logger.increment('containers_failures')
|
||||
if successes > failures:
|
||||
self.stats_containers_deleted += 1
|
||||
self.logger.increment('containers_deleted')
|
||||
@@ -473,6 +506,12 @@ class AccountReaper(Daemon):
|
||||
self.stats_return_codes.get(err.http_status / 100, 0) + 1
|
||||
self.logger.increment(
|
||||
'return_codes.%d' % (err.http_status / 100,))
|
||||
except (Timeout, socket.error) as err:
|
||||
failures += 1
|
||||
self.logger.increment('objects_failures')
|
||||
self.logger.error(
|
||||
_('Timeout Exception with %(ip)s:%(port)s/%(device)s'),
|
||||
node)
|
||||
if successes > failures:
|
||||
self.stats_objects_deleted += 1
|
||||
self.logger.increment('objects_deleted')
|
||||
|
@@ -77,6 +77,7 @@ class FakeBroker(object):
|
||||
class FakeAccountBroker(object):
|
||||
def __init__(self, containers):
|
||||
self.containers = containers
|
||||
self.containers_yielded = []
|
||||
|
||||
def get_info(self):
|
||||
info = {'account': 'a',
|
||||
@@ -101,11 +102,11 @@ class FakeRing(object):
|
||||
'port': 6002,
|
||||
'device': None},
|
||||
{'id': '2',
|
||||
'ip': '10.10.10.1',
|
||||
'ip': '10.10.10.2',
|
||||
'port': 6002,
|
||||
'device': None},
|
||||
{'id': '3',
|
||||
'ip': '10.10.10.1',
|
||||
'ip': '10.10.10.3',
|
||||
'port': 6002,
|
||||
'device': None},
|
||||
]
|
||||
@@ -504,24 +505,26 @@ class TestReaper(unittest.TestCase):
|
||||
self.called_amount = 0
|
||||
self.r = r = self.init_reaper({}, fakelogger=True)
|
||||
r.start_time = time.time()
|
||||
ctx = [patch('swift.account.reaper.AccountReaper.reap_container',
|
||||
self.fake_reap_container),
|
||||
patch('swift.account.reaper.AccountReaper.get_account_ring',
|
||||
self.fake_account_ring)]
|
||||
with nested(*ctx):
|
||||
with patch('swift.account.reaper.AccountReaper.reap_container',
|
||||
self.fake_reap_container), \
|
||||
patch('swift.account.reaper.AccountReaper.get_account_ring',
|
||||
self.fake_account_ring):
|
||||
nodes = r.get_account_ring().get_part_nodes()
|
||||
self.assertTrue(r.reap_account(broker, 'partition', nodes))
|
||||
for container_shard, node in enumerate(nodes):
|
||||
self.assertTrue(
|
||||
r.reap_account(broker, 'partition', nodes,
|
||||
container_shard=container_shard))
|
||||
self.assertEqual(self.called_amount, 4)
|
||||
info_lines = r.logger.get_lines_for_level('info')
|
||||
self.assertEqual(len(info_lines), 2)
|
||||
start_line, stat_line = info_lines
|
||||
self.assertEqual(start_line, 'Beginning pass on account a')
|
||||
self.assertTrue(stat_line.find('1 containers deleted'))
|
||||
self.assertTrue(stat_line.find('1 objects deleted'))
|
||||
self.assertTrue(stat_line.find('1 containers remaining'))
|
||||
self.assertTrue(stat_line.find('1 objects remaining'))
|
||||
self.assertTrue(stat_line.find('1 containers possibly remaining'))
|
||||
self.assertTrue(stat_line.find('1 objects possibly remaining'))
|
||||
self.assertEqual(len(info_lines), 6)
|
||||
for start_line, stat_line in zip(*[iter(info_lines)] * 2):
|
||||
self.assertEqual(start_line, 'Beginning pass on account a')
|
||||
self.assertTrue(stat_line.find('1 containers deleted'))
|
||||
self.assertTrue(stat_line.find('1 objects deleted'))
|
||||
self.assertTrue(stat_line.find('1 containers remaining'))
|
||||
self.assertTrue(stat_line.find('1 objects remaining'))
|
||||
self.assertTrue(stat_line.find('1 containers possibly remaining'))
|
||||
self.assertTrue(stat_line.find('1 objects possibly remaining'))
|
||||
|
||||
def test_reap_account_no_container(self):
|
||||
broker = FakeAccountBroker(tuple())
|
||||
@@ -584,6 +587,67 @@ class TestReaper(unittest.TestCase):
|
||||
r.reap_device('sda1')
|
||||
self.assertEqual(self.called_amount, 0)
|
||||
|
||||
def test_reap_device_with_sharding(self):
|
||||
devices = self.prepare_data_dir()
|
||||
conf = {'devices': devices}
|
||||
r = self.init_reaper(conf, myips=['10.10.10.2'])
|
||||
container_shard_used = [-1]
|
||||
|
||||
def fake_reap_account(*args, **kwargs):
|
||||
container_shard_used[0] = kwargs.get('container_shard')
|
||||
|
||||
with patch('swift.account.reaper.AccountBroker',
|
||||
FakeAccountBroker), \
|
||||
patch('swift.account.reaper.AccountReaper.get_account_ring',
|
||||
self.fake_account_ring), \
|
||||
patch('swift.account.reaper.AccountReaper.reap_account',
|
||||
fake_reap_account):
|
||||
r.reap_device('sda1')
|
||||
# 10.10.10.2 is second node from ring
|
||||
self.assertEqual(container_shard_used[0], 1)
|
||||
|
||||
def test_reap_account_with_sharding(self):
|
||||
devices = self.prepare_data_dir()
|
||||
self.called_amount = 0
|
||||
conf = {'devices': devices}
|
||||
r = self.init_reaper(conf, myips=['10.10.10.2'])
|
||||
|
||||
container_reaped = [0]
|
||||
|
||||
def fake_list_containers_iter(self, *args):
|
||||
for container in self.containers:
|
||||
if container in self.containers_yielded:
|
||||
continue
|
||||
|
||||
yield container, None, None, None
|
||||
self.containers_yielded.append(container)
|
||||
|
||||
def fake_reap_container(self, account, account_partition,
|
||||
account_nodes, container):
|
||||
container_reaped[0] += 1
|
||||
|
||||
ctx = [patch('swift.account.reaper.AccountBroker',
|
||||
FakeAccountBroker),
|
||||
patch('swift.account.reaper.AccountBroker.list_containers_iter',
|
||||
fake_list_containers_iter),
|
||||
patch('swift.account.reaper.AccountReaper.reap_container',
|
||||
fake_reap_container), ]
|
||||
fake_ring = FakeRing()
|
||||
with nested(*ctx):
|
||||
fake_broker = FakeAccountBroker(['c', 'd', 'e'])
|
||||
r.reap_account(fake_broker, 10, fake_ring.nodes, 0)
|
||||
self.assertEqual(container_reaped[0], 1)
|
||||
|
||||
fake_broker = FakeAccountBroker(['c', 'd', 'e'])
|
||||
container_reaped[0] = 0
|
||||
r.reap_account(fake_broker, 10, fake_ring.nodes, 1)
|
||||
self.assertEqual(container_reaped[0], 2)
|
||||
|
||||
container_reaped[0] = 0
|
||||
fake_broker = FakeAccountBroker(['c', 'd', 'e'])
|
||||
r.reap_account(fake_broker, 10, fake_ring.nodes, 2)
|
||||
self.assertEqual(container_reaped[0], 0)
|
||||
|
||||
def test_run_once(self):
|
||||
def prepare_data_dir():
|
||||
devices_path = tempfile.mkdtemp()
|
||||
|
Reference in New Issue
Block a user