diff --git a/etc/object-server.conf-sample b/etc/object-server.conf-sample index fa9af4b905..d4523566bf 100644 --- a/etc/object-server.conf-sample +++ b/etc/object-server.conf-sample @@ -32,8 +32,15 @@ use = egg:swift#object # daemonize = on # run_pause = 30 # concurrency = 1 -# timeout = 300 -# stats_interval = 3600 +# stats_interval = 300 +# max duration of a partition rsync +# rsync_timeout = 300 +# passed to rsync for io op timeout +# rsync_io_timeout = 10 +# max duration of an http request +# http_timeout = 60 +# attempts to kill all workers if nothing replicates for lockup_timeout seconds +# lockup_timeout = 900 # The replicator also performs reclamation # reclaim_age = 604800 diff --git a/swift/obj/replicator.py b/swift/obj/replicator.py index 76d95e970e..963f02e375 100644 --- a/swift/obj/replicator.py +++ b/swift/obj/replicator.py @@ -216,14 +216,17 @@ class ObjectReplicator(Daemon): self.swift_dir = conf.get('swift_dir', '/etc/swift') self.port = int(conf.get('bind_port', 6000)) self.concurrency = int(conf.get('concurrency', 1)) - self.timeout = conf.get('timeout', '5') - self.stats_interval = int(conf.get('stats_interval', '3600')) + self.stats_interval = int(conf.get('stats_interval', '300')) self.object_ring = Ring(join(self.swift_dir, 'object.ring.gz')) self.ring_check_interval = int(conf.get('ring_check_interval', 15)) self.next_check = time.time() + self.ring_check_interval self.reclaim_age = int(conf.get('reclaim_age', 86400 * 7)) self.partition_times = [] self.run_pause = int(conf.get('run_pause', 30)) + self.rsync_timeout = int(conf.get('rsync_timeout', 300)) + self.rsync_io_timeout = conf.get('rsync_io_timeout', '10') + self.http_timeout = int(conf.get('http_timeout', 60)) + self.lockup_timeout = int(conf.get('lockup_timeout', 900)) def _rsync(self, args): """ @@ -234,14 +237,15 @@ class ObjectReplicator(Daemon): start_time = time.time() ret_val = None try: - with Timeout(120): + with Timeout(self.rsync_timeout): proc = subprocess.Popen(args, 
stdout=subprocess.PIPE, stderr=subprocess.STDOUT) results = proc.stdout.read() ret_val = proc.wait() - finally: - if ret_val is None: - proc.kill() + except Timeout: + self.logger.error("Killing long-running rsync: %s" % str(args)) + proc.kill() + return 1 # failure response code total_time = time.time() - start_time if results: for result in results.split('\n'): @@ -259,7 +263,7 @@ class ObjectReplicator(Daemon): args[-2], args[-1], total_time, ret_val)) if ret_val: self.logger.error('Bad rsync return code: %d' % ret_val) - return ret_val, results + return ret_val def rsync(self, node, job, suffixes): """ @@ -282,8 +286,8 @@ class ObjectReplicator(Daemon): '--xattrs', '--itemize-changes', '--ignore-existing', - '--timeout=%s' % self.timeout, - '--contimeout=%s' % self.timeout, + '--timeout=%s' % self.rsync_io_timeout, + '--contimeout=%s' % self.rsync_io_timeout, ] if self.vm_test_mode: rsync_module = '%s::object%s' % (node['ip'], node['port']) @@ -299,8 +303,7 @@ class ObjectReplicator(Daemon): return False args.append(join(rsync_module, node['device'], 'objects', job['partition'])) - ret_val, results = self._rsync(args) - return ret_val == 0 + return self._rsync(args) == 0 def check_ring(self): """ @@ -334,7 +337,7 @@ class ObjectReplicator(Daemon): for node in job['nodes']: success = self.rsync(node, job, suffixes) if success: - with Timeout(60): + with Timeout(self.http_timeout): http_connect(node['ip'], node['port'], node['device'], job['partition'], 'REPLICATE', @@ -371,7 +374,7 @@ class ObjectReplicator(Daemon): node = next(nodes) attempts_left -= 1 try: - with Timeout(60): + with Timeout(self.http_timeout): resp = http_connect(node['ip'], node['port'], node['device'], job['partition'], 'REPLICATE', '', headers={'Content-Length': '0'}).getresponse() @@ -394,7 +397,7 @@ class ObjectReplicator(Daemon): self.rsync(node, job, suffixes) recalculate_hashes(job['path'], suffixes, reclaim_age=self.reclaim_age) - with Timeout(60): + with Timeout(self.http_timeout): 
conn = http_connect(node['ip'], node['port'], node['device'], job['partition'], 'REPLICATE', '/' + '-'.join(suffixes), @@ -448,16 +451,24 @@ class ObjectReplicator(Daemon): def heartbeat(self): """ Loop that runs in the background during replication. It periodically - logs progress and attempts to detect lockups, killing any running - coroutines if the replicator hasn't made progress since last hearbeat. + logs progress. """ while True: + eventlet.sleep(self.stats_interval) + self.stats_line() + + def detect_lockups(self): + """ + In testing, the pool.waitall() call very occasionally failed to return. + This is an attempt to make sure the replicator finishes its replication + pass in some eventuality. + """ + while True: + eventlet.sleep(self.lockup_timeout) if self.replication_count == self.last_replication_count: self.logger.error("Lockup detected.. killing live coros.") self.kill_coros() self.last_replication_count = self.replication_count - eventlet.sleep(300) - self.stats_line() def replicate(self): """Run a replication pass""" @@ -470,6 +481,7 @@ class ObjectReplicator(Daemon): self.partition_times = [] jobs = [] stats = eventlet.spawn(self.heartbeat) + lockup_detector = eventlet.spawn(self.detect_lockups) try: ips = whataremyips() self.run_pool = GreenPool(size=self.concurrency) @@ -508,13 +520,15 @@ class ObjectReplicator(Daemon): self.run_pool.spawn(self.update_deleted, job) else: self.run_pool.spawn(self.update, job) - with Timeout(120): + with Timeout(self.lockup_timeout): self.run_pool.waitall() except (Exception, Timeout): - self.logger.exception("Exception while replicating") + self.logger.exception("Exception in top-level replication loop") self.kill_coros() - self.stats_line() - stats.kill() + finally: + stats.kill() + lockup_detector.kill() + self.stats_line() def run_once(self): start = time.time() diff --git a/swift/obj/server.py b/swift/obj/server.py index 6a15aef35d..e37fa7e782 100644 --- a/swift/obj/server.py +++ b/swift/obj/server.py @@ 
-538,13 +538,12 @@ class ObjectController(object): unquote(request.path), 2, 3, True) if self.mount_check and not check_mount(self.devices, device): return Response(status='507 %s is not mounted' % device) - if suffix: - recalculate_hashes(os.path.join(self.devices, device, - DATADIR, partition), suffix.split('-')) - return Response() path = os.path.join(self.devices, device, DATADIR, partition) if not os.path.exists(path): mkdirs(path) + if suffix: + recalculate_hashes(path, suffix.split('-')) + return Response() _, hashes = get_hashes(path, do_listdir=False) return Response(body=pickle.dumps(hashes))