Make obj/replicator timeouts configurable

Michael Barton
2010-10-19 15:02:36 +00:00
committed by Tarmac
3 changed files with 48 additions and 28 deletions

View File

@@ -32,8 +32,15 @@ use = egg:swift#object
 # daemonize = on
 # run_pause = 30
 # concurrency = 1
-# timeout = 300
-# stats_interval = 3600
+# stats_interval = 300
+# max duration of a partition rsync
+# rsync_timeout = 600
+# passed to rsync for io op timeout
+# rsync_io_timeout = 10
+# max duration of an http request
+# http_timeout = 60
+# attempts to kill all workers if nothing replicates for lockup_timeout seconds
+# lockup_timeout = 900
 # The replicator also performs reclamation
 # reclaim_age = 604800
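For reference, these knobs sit alongside the existing replicator options, so an operator who wants different bounds just uncomments and overrides them. A minimal illustration (the values are examples, and the section name is assumed to be the stock [object-replicator] section of object-server.conf):

    [object-replicator]
    # bound on a single partition rsync
    rsync_timeout = 600
    # io/connection timeout handed to rsync itself (--timeout/--contimeout)
    rsync_io_timeout = 10
    # per-request bound on REPLICATE calls to other object servers
    http_timeout = 60
    # kill all workers if nothing replicates for this many seconds
    lockup_timeout = 900
    # how often the background heartbeat logs progress
    stats_interval = 300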

View File

@@ -216,14 +216,17 @@ class ObjectReplicator(Daemon):
         self.swift_dir = conf.get('swift_dir', '/etc/swift')
         self.port = int(conf.get('bind_port', 6000))
         self.concurrency = int(conf.get('concurrency', 1))
-        self.timeout = conf.get('timeout', '5')
-        self.stats_interval = int(conf.get('stats_interval', '3600'))
+        self.stats_interval = int(conf.get('stats_interval', '300'))
         self.object_ring = Ring(join(self.swift_dir, 'object.ring.gz'))
         self.ring_check_interval = int(conf.get('ring_check_interval', 15))
         self.next_check = time.time() + self.ring_check_interval
         self.reclaim_age = int(conf.get('reclaim_age', 86400 * 7))
         self.partition_times = []
         self.run_pause = int(conf.get('run_pause', 30))
+        self.rsync_timeout = int(conf.get('rsync_timeout', 300))
+        self.rsync_io_timeout = conf.get('rsync_io_timeout', '10')
+        self.http_timeout = int(conf.get('http_timeout', 60))
+        self.lockup_timeout = int(conf.get('lockup_timeout', 900))

     def _rsync(self, args):
         """
@@ -234,14 +237,15 @@ class ObjectReplicator(Daemon):
         start_time = time.time()
         ret_val = None
         try:
-            with Timeout(120):
+            with Timeout(self.rsync_timeout):
                 proc = subprocess.Popen(args, stdout=subprocess.PIPE,
                     stderr=subprocess.STDOUT)
                 results = proc.stdout.read()
                 ret_val = proc.wait()
-        finally:
-            if ret_val is None:
-                proc.kill()
+        except Timeout:
+            self.logger.error("Killing long-running rsync: %s" % str(args))
+            proc.kill()
+            return 1 # failure response code
         total_time = time.time() - start_time
         if results:
             for result in results.split('\n'):
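For context, the construct above is eventlet's Timeout used as a context manager around a subprocess call: if the block does not finish in time, a Timeout exception is raised inside it, the child is killed, and the call reports failure. A minimal standalone sketch of that pattern (the helper name and return convention are illustrative, and the green subprocess import is an assumption so the blocking read cooperates with eventlet's hub; the replicator relies on Swift's own eventlet setup):

    from eventlet import Timeout
    from eventlet.green import subprocess   # green pipes, so the read yields to the hub

    def run_bounded(args, timeout):
        """Run args; kill the child and return (1, b'') if it exceeds timeout seconds."""
        proc = None
        try:
            with Timeout(timeout):
                proc = subprocess.Popen(args, stdout=subprocess.PIPE,
                                        stderr=subprocess.STDOUT)
                output = proc.stdout.read()
                return proc.wait(), output
        except Timeout:
            if proc is not None:
                proc.kill()
            return 1, b''   # a timed-out transfer counts as a failure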
@@ -259,7 +263,7 @@ class ObjectReplicator(Daemon):
                         args[-2], args[-1], total_time, ret_val))
         if ret_val:
             self.logger.error('Bad rsync return code: %d' % ret_val)
-        return ret_val, results
+        return ret_val

     def rsync(self, node, job, suffixes):
         """
@@ -282,8 +286,8 @@ class ObjectReplicator(Daemon):
             '--xattrs',
             '--itemize-changes',
             '--ignore-existing',
-            '--timeout=%s' % self.timeout,
-            '--contimeout=%s' % self.timeout,
+            '--timeout=%s' % self.rsync_io_timeout,
+            '--contimeout=%s' % self.rsync_io_timeout,
         ]
         if self.vm_test_mode:
             rsync_module = '%s::object%s' % (node['ip'], node['port'])
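--timeout and --contimeout here are plain rsync options: an I/O-activity timeout and a connection timeout to the rsync daemon, both in seconds (--contimeout requires rsync 3.0 or newer). With the default rsync_io_timeout of 10, the generated command therefore looks roughly like this (other flags, hosts and paths abbreviated; placeholders are illustrative):

    rsync ... --xattrs --itemize-changes --ignore-existing \
        --timeout=10 --contimeout=10 \
        <local suffix dirs> <remote ip>::object/<device>/objects/<partition>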
@@ -299,8 +303,7 @@ class ObjectReplicator(Daemon):
                 return False
         args.append(join(rsync_module, node['device'],
                     'objects', job['partition']))
-        ret_val, results = self._rsync(args)
-        return ret_val == 0
+        return self._rsync(args) == 0

     def check_ring(self):
         """
@@ -334,7 +337,7 @@ class ObjectReplicator(Daemon):
             for node in job['nodes']:
                 success = self.rsync(node, job, suffixes)
                 if success:
-                    with Timeout(60):
+                    with Timeout(self.http_timeout):
                         http_connect(node['ip'],
                             node['port'],
                             node['device'], job['partition'], 'REPLICATE',
@@ -371,7 +374,7 @@ class ObjectReplicator(Daemon):
                 node = next(nodes)
                 attempts_left -= 1
                 try:
-                    with Timeout(60):
+                    with Timeout(self.http_timeout):
                         resp = http_connect(node['ip'], node['port'],
                             node['device'], job['partition'], 'REPLICATE',
                             '', headers={'Content-Length': '0'}).getresponse()
@@ -394,7 +397,7 @@ class ObjectReplicator(Daemon):
                     self.rsync(node, job, suffixes)
                     recalculate_hashes(job['path'], suffixes,
                         reclaim_age=self.reclaim_age)
-                    with Timeout(60):
+                    with Timeout(self.http_timeout):
                         conn = http_connect(node['ip'], node['port'],
                             node['device'], job['partition'], 'REPLICATE',
                             '/' + '-'.join(suffixes),
@@ -448,16 +451,24 @@ class ObjectReplicator(Daemon):
     def heartbeat(self):
         """
         Loop that runs in the background during replication. It periodically
-        logs progress and attempts to detect lockups, killing any running
-        coroutines if the replicator hasn't made progress since last hearbeat.
+        logs progress.
         """
         while True:
+            eventlet.sleep(self.stats_interval)
+            self.stats_line()
+
+    def detect_lockups(self):
+        """
+        In testing, the pool.waitall() call very occasionally failed to return.
+        This is an attempt to make sure the replicator finishes its replication
+        pass in some eventuality.
+        """
+        while True:
+            eventlet.sleep(self.lockup_timeout)
             if self.replication_count == self.last_replication_count:
                 self.logger.error("Lockup detected.. killing live coros.")
                 self.kill_coros()
             self.last_replication_count = self.replication_count
-            eventlet.sleep(300)
-            self.stats_line()

     def replicate(self):
         """Run a replication pass"""
@@ -470,6 +481,7 @@ class ObjectReplicator(Daemon):
         self.partition_times = []
         jobs = []
         stats = eventlet.spawn(self.heartbeat)
+        lockup_detector = eventlet.spawn(self.detect_lockups)
         try:
             ips = whataremyips()
             self.run_pool = GreenPool(size=self.concurrency)
@@ -508,13 +520,15 @@ class ObjectReplicator(Daemon):
                     self.run_pool.spawn(self.update_deleted, job)
                 else:
                     self.run_pool.spawn(self.update, job)
-            with Timeout(120):
+            with Timeout(self.lockup_timeout):
                 self.run_pool.waitall()
         except (Exception, Timeout):
-            self.logger.exception("Exception while replicating")
+            self.logger.exception("Exception in top-level replication loop")
             self.kill_coros()
-        self.stats_line()
-        stats.kill()
+        finally:
+            stats.kill()
+            lockup_detector.kill()
+            self.stats_line()

     def run_once(self):
         start = time.time()
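Taken together, the replicate() changes follow a simple shape: spawn the heartbeat and lockup-detector greenthreads for the duration of the pass, bound the final waitall() with lockup_timeout, and always tear the helpers down in a finally block. A stripped-down sketch of that shape (the job and stats callables are placeholders, not Swift's own; the real code also spawns detect_lockups alongside the heartbeat):

    import eventlet
    from eventlet import GreenPool, Timeout

    def run_pass(jobs, do_job, stats_line, concurrency=1,
                 stats_interval=300, lockup_timeout=900):
        def heartbeat():
            while True:                      # periodic progress logging
                eventlet.sleep(stats_interval)
                stats_line()

        stats = eventlet.spawn(heartbeat)
        try:
            pool = GreenPool(size=concurrency)
            for job in jobs:
                pool.spawn(do_job, job)
            with Timeout(lockup_timeout):    # don't wait forever on a wedged pool
                pool.waitall()
        except (Exception, Timeout):
            pass                             # the replicator logs and kills live coros here
        finally:
            stats.kill()                     # helpers always die with the pass
            stats_line()                     # one final progress line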

View File

@@ -538,13 +538,12 @@ class ObjectController(object):
                 unquote(request.path), 2, 3, True)
         if self.mount_check and not check_mount(self.devices, device):
             return Response(status='507 %s is not mounted' % device)
-        if suffix:
-            recalculate_hashes(os.path.join(self.devices, device,
-                DATADIR, partition), suffix.split('-'))
-            return Response()
         path = os.path.join(self.devices, device, DATADIR, partition)
         if not os.path.exists(path):
             mkdirs(path)
+        if suffix:
+            recalculate_hashes(path, suffix.split('-'))
+            return Response()
         _, hashes = get_hashes(path, do_listdir=False)
         return Response(body=pickle.dumps(hashes))