container-updater: temporary account update suppression on errors

This commit is contained in:
gholt 2011-01-26 00:11:49 +00:00 committed by Tarmac
commit edb4e90ebb
4 changed files with 69 additions and 19 deletions

View File

@ -371,19 +371,25 @@ reclaim_age 604800 Time elapsed in seconds before a
[container-updater] [container-updater]
================== ================= ======================================= ======================== ================= ==================================
Option Default Description Option Default Description
------------------ ----------------- --------------------------------------- ------------------------ ----------------- ----------------------------------
log_name container-updater Label used when logging log_name container-updater Label used when logging
log_facility LOG_LOCAL0 Syslog log facility log_facility LOG_LOCAL0 Syslog log facility
log_level INFO Logging level log_level INFO Logging level
interval 300 Minimum time for a pass to take interval 300 Minimum time for a pass to take
concurrency 4 Number of updater workers to spawn concurrency 4 Number of updater workers to spawn
node_timeout 3 Request timeout to external services node_timeout 3 Request timeout to external
conn_timeout 0.5 Connection timeout to external services services
conn_timeout 0.5 Connection timeout to external
services
slowdown 0.01 Time in seconds to wait between slowdown 0.01 Time in seconds to wait between
containers containers
================== ================= ======================================= account_suppression_time 60 Seconds to suppress updating an
account that has generated an
error (timeout, not yet found,
etc.)
======================== ================= ==================================
[container-auditor] [container-auditor]

View File

@ -50,6 +50,8 @@ use = egg:swift#container
# conn_timeout = 0.5 # conn_timeout = 0.5
# slowdown will sleep that amount between containers # slowdown will sleep that amount between containers
# slowdown = 0.01 # slowdown = 0.01
# Seconds to suppress updating an account that has generated an error
# account_suppression_time = 60
[container-auditor] [container-auditor]
# You can override the default log routing for this app here (don't use set!): # You can override the default log routing for this app here (don't use set!):

View File

@ -19,6 +19,7 @@ import signal
import sys import sys
import time import time
from random import random, shuffle from random import random, shuffle
from tempfile import mkstemp
from eventlet import spawn, patcher, Timeout from eventlet import spawn, patcher, Timeout
@ -51,6 +52,10 @@ class ContainerUpdater(Daemon):
self.no_changes = 0 self.no_changes = 0
self.successes = 0 self.successes = 0
self.failures = 0 self.failures = 0
self.account_suppressions = {}
self.account_suppression_time = \
float(conf.get('account_suppression_time', 60))
self.new_account_suppressions = None
def get_account_ring(self): def get_account_ring(self):
"""Get the account ring. Load it if it hasn't been yet.""" """Get the account ring. Load it if it hasn't been yet."""
@ -80,6 +85,19 @@ class ContainerUpdater(Daemon):
shuffle(paths) shuffle(paths)
return paths return paths
def _load_suppressions(self, filename):
try:
with open(filename, 'r') as tmpfile:
for line in tmpfile:
account, until = line.split()
until = float(until)
self.account_suppressions[account] = until
except:
self.logger.exception(
_('ERROR with loading suppressions from %s: ') % filename)
finally:
os.unlink(filename)
def run_forever(self): # pragma: no cover def run_forever(self): # pragma: no cover
""" """
Run the updator continuously. Run the updator continuously.
@ -88,21 +106,33 @@ class ContainerUpdater(Daemon):
while True: while True:
self.logger.info(_('Begin container update sweep')) self.logger.info(_('Begin container update sweep'))
begin = time.time() begin = time.time()
pids = [] now = time.time()
expired_suppressions = \
[a for a, u in self.account_suppressions.iteritems() if u < now]
for account in expired_suppressions:
del self.account_suppressions[account]
pid2filename = {}
# read from account ring to ensure it's fresh # read from account ring to ensure it's fresh
self.get_account_ring().get_nodes('') self.get_account_ring().get_nodes('')
for path in self.get_paths(): for path in self.get_paths():
while len(pids) >= self.concurrency: while len(pid2filename) >= self.concurrency:
pids.remove(os.wait()[0]) pid = os.wait()[0]
try:
self._load_suppressions(pid2filename[pid])
finally:
del pid2filename[pid]
fd, tmpfilename = mkstemp()
os.close(fd)
pid = os.fork() pid = os.fork()
if pid: if pid:
pids.append(pid) pid2filename[pid] = tmpfilename
else: else:
signal.signal(signal.SIGTERM, signal.SIG_DFL) signal.signal(signal.SIGTERM, signal.SIG_DFL)
patcher.monkey_patch(all=False, socket=True) patcher.monkey_patch(all=False, socket=True)
self.no_changes = 0 self.no_changes = 0
self.successes = 0 self.successes = 0
self.failures = 0 self.failures = 0
self.new_account_suppressions = open(tmpfilename, 'w')
forkbegin = time.time() forkbegin = time.time()
self.container_sweep(path) self.container_sweep(path)
elapsed = time.time() - forkbegin elapsed = time.time() - forkbegin
@ -114,8 +144,12 @@ class ContainerUpdater(Daemon):
'success': self.successes, 'fail': self.failures, 'success': self.successes, 'fail': self.failures,
'no_change': self.no_changes}) 'no_change': self.no_changes})
sys.exit() sys.exit()
while pids: while pid2filename:
pids.remove(os.wait()[0]) pid = os.wait()[0]
try:
self._load_suppressions(pid2filename[pid])
finally:
del pid2filename[pid]
elapsed = time.time() - begin elapsed = time.time() - begin
self.logger.info(_('Container update sweep completed: %.02fs'), self.logger.info(_('Container update sweep completed: %.02fs'),
elapsed) elapsed)
@ -165,6 +199,8 @@ class ContainerUpdater(Daemon):
# definitely doesn't have up to date statistics. # definitely doesn't have up to date statistics.
if float(info['put_timestamp']) <= 0: if float(info['put_timestamp']) <= 0:
return return
if self.account_suppressions.get(info['account'], 0) > time.time():
return
if info['put_timestamp'] > info['reported_put_timestamp'] or \ if info['put_timestamp'] > info['reported_put_timestamp'] or \
info['delete_timestamp'] > info['reported_delete_timestamp'] \ info['delete_timestamp'] > info['reported_delete_timestamp'] \
or info['object_count'] != info['reported_object_count'] or \ or info['object_count'] != info['reported_object_count'] or \
@ -195,6 +231,11 @@ class ContainerUpdater(Daemon):
self.logger.debug( self.logger.debug(
_('Update report failed for %(container)s %(dbfile)s'), _('Update report failed for %(container)s %(dbfile)s'),
{'container': container, 'dbfile': dbfile}) {'container': container, 'dbfile': dbfile})
self.account_suppressions[info['account']] = until = \
time.time() + self.account_suppression_time
if self.new_account_suppressions:
print >>self.new_account_suppressions, \
info['account'], until
else: else:
self.no_changes += 1 self.no_changes += 1

View File

@ -78,6 +78,7 @@ class TestContainerUpdater(unittest.TestCase):
'interval': '1', 'interval': '1',
'concurrency': '1', 'concurrency': '1',
'node_timeout': '15', 'node_timeout': '15',
'account_suppression_time': 0
}) })
cu.run_once() cu.run_once()
containers_dir = os.path.join(self.sda1, container_server.DATADIR) containers_dir = os.path.join(self.sda1, container_server.DATADIR)