Merge from trunk

This commit is contained in:
gholt 2011-06-01 23:01:04 +00:00
commit 419184c854
3 changed files with 35 additions and 6 deletions

View File

@ -46,8 +46,8 @@ class AccountController(object):
self.root = conf.get('devices', '/srv/node')
self.mount_check = conf.get('mount_check', 'true').lower() in \
('true', 't', '1', 'on', 'yes', 'y')
self.replicator_rpc = \
ReplicatorRpc(self.root, DATADIR, AccountBroker, self.mount_check)
self.replicator_rpc = ReplicatorRpc(self.root, DATADIR, AccountBroker,
self.mount_check, logger=self.logger)
def _get_account_broker(self, drive, part, account):
hsh = hash_path(account)

View File

@ -38,6 +38,9 @@ from swift.common.exceptions import DriveNotMounted, ConnectionTimeout
from swift.common.daemon import Daemon
DEBUG_TIMINGS_THRESHOLD = 10
def quarantine_db(object_file, server_type):
"""
In the case that a corrupt file is found, move it to a quarantined area to
@ -448,11 +451,13 @@ class Replicator(Daemon):
class ReplicatorRpc(object):
"""Handle Replication RPC calls. TODO(redbo): document please :)"""
def __init__(self, root, datadir, broker_class, mount_check=True):
def __init__(self, root, datadir, broker_class, mount_check=True,
logger=None):
self.root = root
self.datadir = datadir
self.broker_class = broker_class
self.mount_check = mount_check
self.logger = logger or get_logger({}, log_route='replicator-rpc')
def dispatch(self, replicate_args, args):
if not hasattr(args, 'pop'):
@ -479,27 +484,51 @@ class ReplicatorRpc(object):
def sync(self, broker, args):
(remote_sync, hash_, id_, created_at, put_timestamp,
delete_timestamp, metadata) = args
timemark = time.time()
try:
info = broker.get_replication_info()
except Exception, e:
if 'no such table' in str(e):
# TODO(unknown): find a real logger
print _("Quarantining DB %s") % broker.db_file
self.logger.error(_("Quarantining DB %s") % broker.db_file)
quarantine_db(broker.db_file, broker.db_type)
return HTTPNotFound()
raise
timespan = time.time() - timemark
if timespan > DEBUG_TIMINGS_THRESHOLD:
self.logger.debug(_('replicator-rpc-sync time for info: %.02fs') %
timespan)
if metadata:
timemark = time.time()
broker.update_metadata(simplejson.loads(metadata))
timespan = time.time() - timemark
if timespan > DEBUG_TIMINGS_THRESHOLD:
self.logger.debug(_('replicator-rpc-sync time for '
'update_metadata: %.02fs') % timespan)
if info['put_timestamp'] != put_timestamp or \
info['created_at'] != created_at or \
info['delete_timestamp'] != delete_timestamp:
timemark = time.time()
broker.merge_timestamps(
created_at, put_timestamp, delete_timestamp)
timespan = time.time() - timemark
if timespan > DEBUG_TIMINGS_THRESHOLD:
self.logger.debug(_('replicator-rpc-sync time for '
'merge_timestamps: %.02fs') % timespan)
timemark = time.time()
info['point'] = broker.get_sync(id_)
timespan = time.time() - timemark
if timespan > DEBUG_TIMINGS_THRESHOLD:
self.logger.debug(_('replicator-rpc-sync time for get_sync: '
'%.02fs') % timespan)
if hash_ == info['hash'] and info['point'] < remote_sync:
timemark = time.time()
broker.merge_syncs([{'remote_id': id_,
'sync_point': remote_sync}])
info['point'] = remote_sync
timespan = time.time() - timemark
if timespan > DEBUG_TIMINGS_THRESHOLD:
self.logger.debug(_('replicator-rpc-sync time for '
'merge_syncs: %.02fs') % timespan)
return Response(simplejson.dumps(info))
def merge_syncs(self, broker, args):

View File

@ -61,7 +61,7 @@ class ContainerController(object):
for h in conf.get('allowed_sync_hosts', '127.0.0.1').split(',')
if h.strip()]
self.replicator_rpc = ReplicatorRpc(self.root, DATADIR,
ContainerBroker, self.mount_check)
ContainerBroker, self.mount_check, logger=self.logger)
def _get_container_broker(self, drive, part, account, container):
"""