b6b7578190
It is more convenient to use float node_timeout for fine tunning latency. Change-Id: I7c57bba053711a27d3802efe6f2a0bf53483a54f
833 lines
36 KiB
Python
833 lines
36 KiB
Python
# Copyright (c) 2010-2012 OpenStack Foundation
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
|
# implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
import os
|
|
import random
|
|
import math
|
|
import time
|
|
import shutil
|
|
import uuid
|
|
import errno
|
|
import re
|
|
from contextlib import contextmanager
|
|
from swift import gettext_ as _
|
|
|
|
from eventlet import GreenPool, sleep, Timeout
|
|
from eventlet.green import subprocess
|
|
|
|
import swift.common.db
|
|
from swift.common.direct_client import quote
|
|
from swift.common.utils import get_logger, whataremyips, storage_directory, \
|
|
renamer, mkdirs, lock_parent_directory, config_true_value, \
|
|
unlink_older_than, dump_recon_cache, rsync_module_interpolation, ismount, \
|
|
json, Timestamp
|
|
from swift.common import ring
|
|
from swift.common.ring.utils import is_local_device
|
|
from swift.common.http import HTTP_NOT_FOUND, HTTP_INSUFFICIENT_STORAGE
|
|
from swift.common.bufferedhttp import BufferedHTTPConnection
|
|
from swift.common.exceptions import DriveNotMounted
|
|
from swift.common.daemon import Daemon
|
|
from swift.common.swob import Response, HTTPNotFound, HTTPNoContent, \
|
|
HTTPAccepted, HTTPBadRequest
|
|
|
|
|
|
DEBUG_TIMINGS_THRESHOLD = 10
|
|
|
|
|
|
def quarantine_db(object_file, server_type):
|
|
"""
|
|
In the case that a corrupt file is found, move it to a quarantined area to
|
|
allow replication to fix it.
|
|
|
|
:param object_file: path to corrupt file
|
|
:param server_type: type of file that is corrupt
|
|
('container' or 'account')
|
|
"""
|
|
object_dir = os.path.dirname(object_file)
|
|
quarantine_dir = os.path.abspath(
|
|
os.path.join(object_dir, '..', '..', '..', '..', 'quarantined',
|
|
server_type + 's', os.path.basename(object_dir)))
|
|
try:
|
|
renamer(object_dir, quarantine_dir, fsync=False)
|
|
except OSError as e:
|
|
if e.errno not in (errno.EEXIST, errno.ENOTEMPTY):
|
|
raise
|
|
quarantine_dir = "%s-%s" % (quarantine_dir, uuid.uuid4().hex)
|
|
renamer(object_dir, quarantine_dir, fsync=False)
|
|
|
|
|
|
def roundrobin_datadirs(datadirs):
|
|
"""
|
|
Generator to walk the data dirs in a round robin manner, evenly
|
|
hitting each device on the system, and yielding any .db files
|
|
found (in their proper places). The partitions within each data
|
|
dir are walked randomly, however.
|
|
|
|
:param datadirs: a list of (path, node_id) to walk
|
|
:returns: A generator of (partition, path_to_db_file, node_id)
|
|
"""
|
|
|
|
def walk_datadir(datadir, node_id):
|
|
partitions = os.listdir(datadir)
|
|
random.shuffle(partitions)
|
|
for partition in partitions:
|
|
part_dir = os.path.join(datadir, partition)
|
|
if not os.path.isdir(part_dir):
|
|
continue
|
|
suffixes = os.listdir(part_dir)
|
|
if not suffixes:
|
|
os.rmdir(part_dir)
|
|
for suffix in suffixes:
|
|
suff_dir = os.path.join(part_dir, suffix)
|
|
if not os.path.isdir(suff_dir):
|
|
continue
|
|
hashes = os.listdir(suff_dir)
|
|
for hsh in hashes:
|
|
hash_dir = os.path.join(suff_dir, hsh)
|
|
if not os.path.isdir(hash_dir):
|
|
continue
|
|
object_file = os.path.join(hash_dir, hsh + '.db')
|
|
if os.path.exists(object_file):
|
|
yield (partition, object_file, node_id)
|
|
|
|
its = [walk_datadir(datadir, node_id) for datadir, node_id in datadirs]
|
|
while its:
|
|
for it in its:
|
|
try:
|
|
yield next(it)
|
|
except StopIteration:
|
|
its.remove(it)
|
|
|
|
|
|
class ReplConnection(BufferedHTTPConnection):
|
|
"""
|
|
Helper to simplify REPLICATEing to a remote server.
|
|
"""
|
|
|
|
def __init__(self, node, partition, hash_, logger):
|
|
""
|
|
self.logger = logger
|
|
self.node = node
|
|
host = "%s:%s" % (node['replication_ip'], node['replication_port'])
|
|
BufferedHTTPConnection.__init__(self, host)
|
|
self.path = '/%s/%s/%s' % (node['device'], partition, hash_)
|
|
|
|
def replicate(self, *args):
|
|
"""
|
|
Make an HTTP REPLICATE request
|
|
|
|
:param args: list of json-encodable objects
|
|
|
|
:returns: bufferedhttp response object
|
|
"""
|
|
try:
|
|
body = json.dumps(args)
|
|
self.request('REPLICATE', self.path, body,
|
|
{'Content-Type': 'application/json'})
|
|
response = self.getresponse()
|
|
response.data = response.read()
|
|
return response
|
|
except (Exception, Timeout):
|
|
self.logger.exception(
|
|
_('ERROR reading HTTP response from %s'), self.node)
|
|
return None
|
|
|
|
|
|
class Replicator(Daemon):
|
|
"""
|
|
Implements the logic for directing db replication.
|
|
"""
|
|
|
|
def __init__(self, conf, logger=None):
|
|
self.conf = conf
|
|
self.logger = logger or get_logger(conf, log_route='replicator')
|
|
self.root = conf.get('devices', '/srv/node')
|
|
self.mount_check = config_true_value(conf.get('mount_check', 'true'))
|
|
self.bind_ip = conf.get('bind_ip', '0.0.0.0')
|
|
self.port = int(conf.get('bind_port', self.default_port))
|
|
concurrency = int(conf.get('concurrency', 8))
|
|
self.cpool = GreenPool(size=concurrency)
|
|
swift_dir = conf.get('swift_dir', '/etc/swift')
|
|
self.ring = ring.Ring(swift_dir, ring_name=self.server_type)
|
|
self._local_device_ids = set()
|
|
self.per_diff = int(conf.get('per_diff', 1000))
|
|
self.max_diffs = int(conf.get('max_diffs') or 100)
|
|
self.interval = int(conf.get('interval') or
|
|
conf.get('run_pause') or 30)
|
|
self.node_timeout = float(conf.get('node_timeout', 10))
|
|
self.conn_timeout = float(conf.get('conn_timeout', 0.5))
|
|
self.rsync_compress = config_true_value(
|
|
conf.get('rsync_compress', 'no'))
|
|
self.rsync_module = conf.get('rsync_module', '').rstrip('/')
|
|
if not self.rsync_module:
|
|
self.rsync_module = '{replication_ip}::%s' % self.server_type
|
|
if config_true_value(conf.get('vm_test_mode', 'no')):
|
|
self.logger.warn('Option %(type)s-replicator/vm_test_mode is '
|
|
'deprecated and will be removed in a future '
|
|
'version. Update your configuration to use '
|
|
'option %(type)s-replicator/rsync_module.'
|
|
% {'type': self.server_type})
|
|
self.rsync_module += '{replication_port}'
|
|
self.reclaim_age = float(conf.get('reclaim_age', 86400 * 7))
|
|
swift.common.db.DB_PREALLOCATION = \
|
|
config_true_value(conf.get('db_preallocation', 'f'))
|
|
self._zero_stats()
|
|
self.recon_cache_path = conf.get('recon_cache_path',
|
|
'/var/cache/swift')
|
|
self.recon_replicator = '%s.recon' % self.server_type
|
|
self.rcache = os.path.join(self.recon_cache_path,
|
|
self.recon_replicator)
|
|
self.extract_device_re = re.compile('%s%s([^%s]+)' % (
|
|
self.root, os.path.sep, os.path.sep))
|
|
|
|
def _zero_stats(self):
|
|
"""Zero out the stats."""
|
|
self.stats = {'attempted': 0, 'success': 0, 'failure': 0, 'ts_repl': 0,
|
|
'no_change': 0, 'hashmatch': 0, 'rsync': 0, 'diff': 0,
|
|
'remove': 0, 'empty': 0, 'remote_merge': 0,
|
|
'start': time.time(), 'diff_capped': 0,
|
|
'failure_nodes': {}}
|
|
|
|
def _report_stats(self):
|
|
"""Report the current stats to the logs."""
|
|
now = time.time()
|
|
self.logger.info(
|
|
_('Attempted to replicate %(count)d dbs in %(time).5f seconds '
|
|
'(%(rate).5f/s)'),
|
|
{'count': self.stats['attempted'],
|
|
'time': now - self.stats['start'],
|
|
'rate': self.stats['attempted'] /
|
|
(now - self.stats['start'] + 0.0000001)})
|
|
self.logger.info(_('Removed %(remove)d dbs') % self.stats)
|
|
self.logger.info(_('%(success)s successes, %(failure)s failures')
|
|
% self.stats)
|
|
dump_recon_cache(
|
|
{'replication_stats': self.stats,
|
|
'replication_time': now - self.stats['start'],
|
|
'replication_last': now},
|
|
self.rcache, self.logger)
|
|
self.logger.info(' '.join(['%s:%s' % item for item in
|
|
self.stats.items() if item[0] in
|
|
('no_change', 'hashmatch', 'rsync', 'diff', 'ts_repl',
|
|
'empty', 'diff_capped')]))
|
|
|
|
def _add_failure_stats(self, failure_devs_info):
|
|
for node, dev in failure_devs_info:
|
|
self.stats['failure'] += 1
|
|
failure_devs = self.stats['failure_nodes'].setdefault(node, {})
|
|
failure_devs.setdefault(dev, 0)
|
|
failure_devs[dev] += 1
|
|
|
|
def _rsync_file(self, db_file, remote_file, whole_file=True,
|
|
different_region=False):
|
|
"""
|
|
Sync a single file using rsync. Used by _rsync_db to handle syncing.
|
|
|
|
:param db_file: file to be synced
|
|
:param remote_file: remote location to sync the DB file to
|
|
:param whole-file: if True, uses rsync's --whole-file flag
|
|
:param different_region: if True, the destination node is in a
|
|
different region
|
|
|
|
:returns: True if the sync was successful, False otherwise
|
|
"""
|
|
popen_args = ['rsync', '--quiet', '--no-motd',
|
|
'--timeout=%s' % int(math.ceil(self.node_timeout)),
|
|
'--contimeout=%s' % int(math.ceil(self.conn_timeout))]
|
|
if whole_file:
|
|
popen_args.append('--whole-file')
|
|
|
|
if self.rsync_compress and different_region:
|
|
# Allow for compression, but only if the remote node is in
|
|
# a different region than the local one.
|
|
popen_args.append('--compress')
|
|
|
|
popen_args.extend([db_file, remote_file])
|
|
proc = subprocess.Popen(popen_args)
|
|
proc.communicate()
|
|
if proc.returncode != 0:
|
|
self.logger.error(_('ERROR rsync failed with %(code)s: %(args)s'),
|
|
{'code': proc.returncode, 'args': popen_args})
|
|
return proc.returncode == 0
|
|
|
|
def _rsync_db(self, broker, device, http, local_id,
|
|
replicate_method='complete_rsync', replicate_timeout=None,
|
|
different_region=False):
|
|
"""
|
|
Sync a whole db using rsync.
|
|
|
|
:param broker: DB broker object of DB to be synced
|
|
:param device: device to sync to
|
|
:param http: ReplConnection object
|
|
:param local_id: unique ID of the local database replica
|
|
:param replicate_method: remote operation to perform after rsync
|
|
:param replicate_timeout: timeout to wait in seconds
|
|
:param different_region: if True, the destination node is in a
|
|
different region
|
|
"""
|
|
rsync_module = rsync_module_interpolation(self.rsync_module, device)
|
|
rsync_path = '%s/tmp/%s' % (device['device'], local_id)
|
|
remote_file = '%s/%s' % (rsync_module, rsync_path)
|
|
mtime = os.path.getmtime(broker.db_file)
|
|
if not self._rsync_file(broker.db_file, remote_file,
|
|
different_region=different_region):
|
|
return False
|
|
# perform block-level sync if the db was modified during the first sync
|
|
if os.path.exists(broker.db_file + '-journal') or \
|
|
os.path.getmtime(broker.db_file) > mtime:
|
|
# grab a lock so nobody else can modify it
|
|
with broker.lock():
|
|
if not self._rsync_file(broker.db_file, remote_file,
|
|
whole_file=False,
|
|
different_region=different_region):
|
|
return False
|
|
with Timeout(replicate_timeout or self.node_timeout):
|
|
response = http.replicate(replicate_method, local_id)
|
|
return response and response.status >= 200 and response.status < 300
|
|
|
|
def _usync_db(self, point, broker, http, remote_id, local_id):
|
|
"""
|
|
Sync a db by sending all records since the last sync.
|
|
|
|
:param point: synchronization high water mark between the replicas
|
|
:param broker: database broker object
|
|
:param http: ReplConnection object for the remote server
|
|
:param remote_id: database id for the remote replica
|
|
:param local_id: database id for the local replica
|
|
|
|
:returns: boolean indicating completion and success
|
|
"""
|
|
self.stats['diff'] += 1
|
|
self.logger.increment('diffs')
|
|
self.logger.debug('Syncing chunks with %s, starting at %s',
|
|
http.host, point)
|
|
sync_table = broker.get_syncs()
|
|
objects = broker.get_items_since(point, self.per_diff)
|
|
diffs = 0
|
|
while len(objects) and diffs < self.max_diffs:
|
|
diffs += 1
|
|
with Timeout(self.node_timeout):
|
|
response = http.replicate('merge_items', objects, local_id)
|
|
if not response or response.status >= 300 or response.status < 200:
|
|
if response:
|
|
self.logger.error(_('ERROR Bad response %(status)s from '
|
|
'%(host)s'),
|
|
{'status': response.status,
|
|
'host': http.host})
|
|
return False
|
|
# replication relies on db order to send the next merge batch in
|
|
# order with no gaps
|
|
point = objects[-1]['ROWID']
|
|
objects = broker.get_items_since(point, self.per_diff)
|
|
if objects:
|
|
self.logger.debug(
|
|
'Synchronization for %s has fallen more than '
|
|
'%s rows behind; moving on and will try again next pass.',
|
|
broker, self.max_diffs * self.per_diff)
|
|
self.stats['diff_capped'] += 1
|
|
self.logger.increment('diff_caps')
|
|
else:
|
|
with Timeout(self.node_timeout):
|
|
response = http.replicate('merge_syncs', sync_table)
|
|
if response and response.status >= 200 and response.status < 300:
|
|
broker.merge_syncs([{'remote_id': remote_id,
|
|
'sync_point': point}],
|
|
incoming=False)
|
|
return True
|
|
return False
|
|
|
|
def _in_sync(self, rinfo, info, broker, local_sync):
|
|
"""
|
|
Determine whether or not two replicas of a databases are considered
|
|
to be in sync.
|
|
|
|
:param rinfo: remote database info
|
|
:param info: local database info
|
|
:param broker: database broker object
|
|
:param local_sync: cached last sync point between replicas
|
|
|
|
:returns: boolean indicating whether or not the replicas are in sync
|
|
"""
|
|
if max(rinfo['point'], local_sync) >= info['max_row']:
|
|
self.stats['no_change'] += 1
|
|
self.logger.increment('no_changes')
|
|
return True
|
|
if rinfo['hash'] == info['hash']:
|
|
self.stats['hashmatch'] += 1
|
|
self.logger.increment('hashmatches')
|
|
broker.merge_syncs([{'remote_id': rinfo['id'],
|
|
'sync_point': rinfo['point']}],
|
|
incoming=False)
|
|
return True
|
|
|
|
def _http_connect(self, node, partition, db_file):
|
|
"""
|
|
Make an http_connection using ReplConnection
|
|
|
|
:param node: node dictionary from the ring
|
|
:param partition: partition partition to send in the url
|
|
:param db_file: DB file
|
|
|
|
:returns: ReplConnection object
|
|
"""
|
|
return ReplConnection(node, partition,
|
|
os.path.basename(db_file).split('.', 1)[0],
|
|
self.logger)
|
|
|
|
def _gather_sync_args(self, info):
|
|
"""
|
|
Convert local replication_info to sync args tuple.
|
|
"""
|
|
sync_args_order = ('max_row', 'hash', 'id', 'created_at',
|
|
'put_timestamp', 'delete_timestamp', 'metadata')
|
|
return tuple(info[key] for key in sync_args_order)
|
|
|
|
def _repl_to_node(self, node, broker, partition, info,
|
|
different_region=False):
|
|
"""
|
|
Replicate a database to a node.
|
|
|
|
:param node: node dictionary from the ring to be replicated to
|
|
:param broker: DB broker for the DB to be replication
|
|
:param partition: partition on the node to replicate to
|
|
:param info: DB info as a dictionary of {'max_row', 'hash', 'id',
|
|
'created_at', 'put_timestamp', 'delete_timestamp',
|
|
'metadata'}
|
|
:param different_region: if True, the destination node is in a
|
|
different region
|
|
|
|
:returns: True if successful, False otherwise
|
|
"""
|
|
http = self._http_connect(node, partition, broker.db_file)
|
|
sync_args = self._gather_sync_args(info)
|
|
with Timeout(self.node_timeout):
|
|
response = http.replicate('sync', *sync_args)
|
|
if not response:
|
|
return False
|
|
return self._handle_sync_response(node, response, info, broker, http,
|
|
different_region=different_region)
|
|
|
|
def _handle_sync_response(self, node, response, info, broker, http,
|
|
different_region=False):
|
|
if response.status == HTTP_NOT_FOUND: # completely missing, rsync
|
|
self.stats['rsync'] += 1
|
|
self.logger.increment('rsyncs')
|
|
return self._rsync_db(broker, node, http, info['id'],
|
|
different_region=different_region)
|
|
elif response.status == HTTP_INSUFFICIENT_STORAGE:
|
|
raise DriveNotMounted()
|
|
elif response.status >= 200 and response.status < 300:
|
|
rinfo = json.loads(response.data)
|
|
local_sync = broker.get_sync(rinfo['id'], incoming=False)
|
|
if self._in_sync(rinfo, info, broker, local_sync):
|
|
return True
|
|
# if the difference in rowids between the two differs by
|
|
# more than 50% and the difference is greater than per_diff,
|
|
# rsync then do a remote merge.
|
|
# NOTE: difference > per_diff stops us from dropping to rsync
|
|
# on smaller containers, who have only a few rows to sync.
|
|
if rinfo['max_row'] / float(info['max_row']) < 0.5 and \
|
|
info['max_row'] - rinfo['max_row'] > self.per_diff:
|
|
self.stats['remote_merge'] += 1
|
|
self.logger.increment('remote_merges')
|
|
return self._rsync_db(broker, node, http, info['id'],
|
|
replicate_method='rsync_then_merge',
|
|
replicate_timeout=(info['count'] / 2000),
|
|
different_region=different_region)
|
|
# else send diffs over to the remote server
|
|
return self._usync_db(max(rinfo['point'], local_sync),
|
|
broker, http, rinfo['id'], info['id'])
|
|
|
|
def _post_replicate_hook(self, broker, info, responses):
|
|
"""
|
|
:param broker: the container that just replicated
|
|
:param info: pre-replication full info dict
|
|
:param responses: a list of bools indicating success from nodes
|
|
"""
|
|
pass
|
|
|
|
def _replicate_object(self, partition, object_file, node_id):
|
|
"""
|
|
Replicate the db, choosing method based on whether or not it
|
|
already exists on peers.
|
|
|
|
:param partition: partition to be replicated to
|
|
:param object_file: DB file name to be replicated
|
|
:param node_id: node id of the node to be replicated to
|
|
"""
|
|
start_time = now = time.time()
|
|
self.logger.debug('Replicating db %s', object_file)
|
|
self.stats['attempted'] += 1
|
|
self.logger.increment('attempts')
|
|
shouldbehere = True
|
|
try:
|
|
broker = self.brokerclass(object_file, pending_timeout=30)
|
|
broker.reclaim(now - self.reclaim_age,
|
|
now - (self.reclaim_age * 2))
|
|
info = broker.get_replication_info()
|
|
bpart = self.ring.get_part(
|
|
info['account'], info.get('container'))
|
|
if bpart != int(partition):
|
|
partition = bpart
|
|
# Important to set this false here since the later check only
|
|
# checks if it's on the proper device, not partition.
|
|
shouldbehere = False
|
|
name = '/' + quote(info['account'])
|
|
if 'container' in info:
|
|
name += '/' + quote(info['container'])
|
|
self.logger.error(
|
|
'Found %s for %s when it should be on partition %s; will '
|
|
'replicate out and remove.' % (object_file, name, bpart))
|
|
except (Exception, Timeout) as e:
|
|
if 'no such table' in str(e):
|
|
self.logger.error(_('Quarantining DB %s'), object_file)
|
|
quarantine_db(broker.db_file, broker.db_type)
|
|
else:
|
|
self.logger.exception(_('ERROR reading db %s'), object_file)
|
|
nodes = self.ring.get_part_nodes(int(partition))
|
|
self._add_failure_stats([(failure_dev['replication_ip'],
|
|
failure_dev['device'])
|
|
for failure_dev in nodes])
|
|
self.logger.increment('failures')
|
|
return
|
|
# The db is considered deleted if the delete_timestamp value is greater
|
|
# than the put_timestamp, and there are no objects.
|
|
delete_timestamp = Timestamp(info.get('delete_timestamp') or 0)
|
|
put_timestamp = Timestamp(info.get('put_timestamp') or 0)
|
|
if delete_timestamp < (now - self.reclaim_age) and \
|
|
delete_timestamp > put_timestamp and \
|
|
info['count'] in (None, '', 0, '0'):
|
|
if self.report_up_to_date(info):
|
|
self.delete_db(broker)
|
|
self.logger.timing_since('timing', start_time)
|
|
return
|
|
responses = []
|
|
failure_devs_info = set()
|
|
nodes = self.ring.get_part_nodes(int(partition))
|
|
local_dev = None
|
|
for node in nodes:
|
|
if node['id'] == node_id:
|
|
local_dev = node
|
|
break
|
|
if shouldbehere:
|
|
shouldbehere = bool([n for n in nodes if n['id'] == node_id])
|
|
# See Footnote [1] for an explanation of the repl_nodes assignment.
|
|
i = 0
|
|
while i < len(nodes) and nodes[i]['id'] != node_id:
|
|
i += 1
|
|
repl_nodes = nodes[i + 1:] + nodes[:i]
|
|
more_nodes = self.ring.get_more_nodes(int(partition))
|
|
if not local_dev:
|
|
# Check further if local device is a handoff node
|
|
for node in more_nodes:
|
|
if node['id'] == node_id:
|
|
local_dev = node
|
|
break
|
|
for node in repl_nodes:
|
|
different_region = False
|
|
if local_dev and local_dev['region'] != node['region']:
|
|
# This additional information will help later if we
|
|
# want to handle syncing to a node in different
|
|
# region with some optimizations.
|
|
different_region = True
|
|
success = False
|
|
try:
|
|
success = self._repl_to_node(node, broker, partition, info,
|
|
different_region)
|
|
except DriveNotMounted:
|
|
repl_nodes.append(next(more_nodes))
|
|
self.logger.error(_('ERROR Remote drive not mounted %s'), node)
|
|
except (Exception, Timeout):
|
|
self.logger.exception(_('ERROR syncing %(file)s with node'
|
|
' %(node)s'),
|
|
{'file': object_file, 'node': node})
|
|
if not success:
|
|
failure_devs_info.add((node['replication_ip'], node['device']))
|
|
self.logger.increment('successes' if success else 'failures')
|
|
responses.append(success)
|
|
try:
|
|
self._post_replicate_hook(broker, info, responses)
|
|
except (Exception, Timeout):
|
|
self.logger.exception('UNHANDLED EXCEPTION: in post replicate '
|
|
'hook for %s', broker.db_file)
|
|
if not shouldbehere and all(responses):
|
|
# If the db shouldn't be on this node and has been successfully
|
|
# synced to all of its peers, it can be removed.
|
|
if not self.delete_db(broker):
|
|
failure_devs_info.update(
|
|
[(failure_dev['replication_ip'], failure_dev['device'])
|
|
for failure_dev in repl_nodes])
|
|
|
|
target_devs_info = set([(target_dev['replication_ip'],
|
|
target_dev['device'])
|
|
for target_dev in repl_nodes])
|
|
self.stats['success'] += len(target_devs_info - failure_devs_info)
|
|
self._add_failure_stats(failure_devs_info)
|
|
|
|
self.logger.timing_since('timing', start_time)
|
|
|
|
def delete_db(self, broker):
|
|
object_file = broker.db_file
|
|
hash_dir = os.path.dirname(object_file)
|
|
suf_dir = os.path.dirname(hash_dir)
|
|
with lock_parent_directory(object_file):
|
|
shutil.rmtree(hash_dir, True)
|
|
try:
|
|
os.rmdir(suf_dir)
|
|
except OSError as err:
|
|
if err.errno not in (errno.ENOENT, errno.ENOTEMPTY):
|
|
self.logger.exception(
|
|
_('ERROR while trying to clean up %s') % suf_dir)
|
|
return False
|
|
self.stats['remove'] += 1
|
|
device_name = self.extract_device(object_file)
|
|
self.logger.increment('removes.' + device_name)
|
|
return True
|
|
|
|
def extract_device(self, object_file):
|
|
"""
|
|
Extract the device name from an object path. Returns "UNKNOWN" if the
|
|
path could not be extracted successfully for some reason.
|
|
|
|
:param object_file: the path to a database file.
|
|
"""
|
|
match = self.extract_device_re.match(object_file)
|
|
if match:
|
|
return match.groups()[0]
|
|
return "UNKNOWN"
|
|
|
|
def report_up_to_date(self, full_info):
|
|
return True
|
|
|
|
def run_once(self, *args, **kwargs):
|
|
"""Run a replication pass once."""
|
|
self._zero_stats()
|
|
dirs = []
|
|
ips = whataremyips(self.bind_ip)
|
|
if not ips:
|
|
self.logger.error(_('ERROR Failed to get my own IPs?'))
|
|
return
|
|
self._local_device_ids = set()
|
|
for node in self.ring.devs:
|
|
if node and is_local_device(ips, self.port,
|
|
node['replication_ip'],
|
|
node['replication_port']):
|
|
if self.mount_check and not ismount(
|
|
os.path.join(self.root, node['device'])):
|
|
self._add_failure_stats(
|
|
[(failure_dev['replication_ip'],
|
|
failure_dev['device'])
|
|
for failure_dev in self.ring.devs if failure_dev])
|
|
self.logger.warn(
|
|
_('Skipping %(device)s as it is not mounted') % node)
|
|
continue
|
|
unlink_older_than(
|
|
os.path.join(self.root, node['device'], 'tmp'),
|
|
time.time() - self.reclaim_age)
|
|
datadir = os.path.join(self.root, node['device'], self.datadir)
|
|
if os.path.isdir(datadir):
|
|
self._local_device_ids.add(node['id'])
|
|
dirs.append((datadir, node['id']))
|
|
self.logger.info(_('Beginning replication run'))
|
|
for part, object_file, node_id in roundrobin_datadirs(dirs):
|
|
self.cpool.spawn_n(
|
|
self._replicate_object, part, object_file, node_id)
|
|
self.cpool.waitall()
|
|
self.logger.info(_('Replication run OVER'))
|
|
self._report_stats()
|
|
|
|
def run_forever(self, *args, **kwargs):
|
|
"""
|
|
Replicate dbs under the given root in an infinite loop.
|
|
"""
|
|
sleep(random.random() * self.interval)
|
|
while True:
|
|
begin = time.time()
|
|
try:
|
|
self.run_once()
|
|
except (Exception, Timeout):
|
|
self.logger.exception(_('ERROR trying to replicate'))
|
|
elapsed = time.time() - begin
|
|
if elapsed < self.interval:
|
|
sleep(self.interval - elapsed)
|
|
|
|
|
|
class ReplicatorRpc(object):
|
|
"""Handle Replication RPC calls. TODO(redbo): document please :)"""
|
|
|
|
def __init__(self, root, datadir, broker_class, mount_check=True,
|
|
logger=None):
|
|
self.root = root
|
|
self.datadir = datadir
|
|
self.broker_class = broker_class
|
|
self.mount_check = mount_check
|
|
self.logger = logger or get_logger({}, log_route='replicator-rpc')
|
|
|
|
def dispatch(self, replicate_args, args):
|
|
if not hasattr(args, 'pop'):
|
|
return HTTPBadRequest(body='Invalid object type')
|
|
op = args.pop(0)
|
|
drive, partition, hsh = replicate_args
|
|
if self.mount_check and not ismount(os.path.join(self.root, drive)):
|
|
return Response(status='507 %s is not mounted' % drive)
|
|
db_file = os.path.join(self.root, drive,
|
|
storage_directory(self.datadir, partition, hsh),
|
|
hsh + '.db')
|
|
if op == 'rsync_then_merge':
|
|
return self.rsync_then_merge(drive, db_file, args)
|
|
if op == 'complete_rsync':
|
|
return self.complete_rsync(drive, db_file, args)
|
|
else:
|
|
# someone might be about to rsync a db to us,
|
|
# make sure there's a tmp dir to receive it.
|
|
mkdirs(os.path.join(self.root, drive, 'tmp'))
|
|
if not os.path.exists(db_file):
|
|
return HTTPNotFound()
|
|
return getattr(self, op)(self.broker_class(db_file), args)
|
|
|
|
@contextmanager
|
|
def debug_timing(self, name):
|
|
timemark = time.time()
|
|
yield
|
|
timespan = time.time() - timemark
|
|
if timespan > DEBUG_TIMINGS_THRESHOLD:
|
|
self.logger.debug(
|
|
'replicator-rpc-sync time for %s: %.02fs' % (
|
|
name, timespan))
|
|
|
|
def _parse_sync_args(self, args):
|
|
"""
|
|
Convert remote sync args to remote_info dictionary.
|
|
"""
|
|
(remote_sync, hash_, id_, created_at, put_timestamp,
|
|
delete_timestamp, metadata) = args[:7]
|
|
remote_metadata = {}
|
|
if metadata:
|
|
try:
|
|
remote_metadata = json.loads(metadata)
|
|
except ValueError:
|
|
self.logger.error("Unable to decode remote metadata %r",
|
|
metadata)
|
|
remote_info = {
|
|
'point': remote_sync,
|
|
'hash': hash_,
|
|
'id': id_,
|
|
'created_at': created_at,
|
|
'put_timestamp': put_timestamp,
|
|
'delete_timestamp': delete_timestamp,
|
|
'metadata': remote_metadata,
|
|
}
|
|
return remote_info
|
|
|
|
def sync(self, broker, args):
|
|
remote_info = self._parse_sync_args(args)
|
|
return self._handle_sync_request(broker, remote_info)
|
|
|
|
def _get_synced_replication_info(self, broker, remote_info):
|
|
"""
|
|
Apply any changes to the broker based on remote_info and return the
|
|
current replication info.
|
|
|
|
:param broker: the database broker
|
|
:param remote_info: the remote replication info
|
|
|
|
:returns: local broker replication info
|
|
"""
|
|
return broker.get_replication_info()
|
|
|
|
def _handle_sync_request(self, broker, remote_info):
|
|
"""
|
|
Update metadata, timestamps, sync points.
|
|
"""
|
|
with self.debug_timing('info'):
|
|
try:
|
|
info = self._get_synced_replication_info(broker, remote_info)
|
|
except (Exception, Timeout) as e:
|
|
if 'no such table' in str(e):
|
|
self.logger.error(_("Quarantining DB %s"), broker)
|
|
quarantine_db(broker.db_file, broker.db_type)
|
|
return HTTPNotFound()
|
|
raise
|
|
if remote_info['metadata']:
|
|
with self.debug_timing('update_metadata'):
|
|
broker.update_metadata(remote_info['metadata'])
|
|
sync_timestamps = ('created_at', 'put_timestamp', 'delete_timestamp')
|
|
if any(info[ts] != remote_info[ts] for ts in sync_timestamps):
|
|
with self.debug_timing('merge_timestamps'):
|
|
broker.merge_timestamps(*(remote_info[ts] for ts in
|
|
sync_timestamps))
|
|
with self.debug_timing('get_sync'):
|
|
info['point'] = broker.get_sync(remote_info['id'])
|
|
if remote_info['hash'] == info['hash'] and \
|
|
info['point'] < remote_info['point']:
|
|
with self.debug_timing('merge_syncs'):
|
|
translate = {
|
|
'remote_id': 'id',
|
|
'sync_point': 'point',
|
|
}
|
|
data = dict((k, remote_info[v]) for k, v in translate.items())
|
|
broker.merge_syncs([data])
|
|
info['point'] = remote_info['point']
|
|
return Response(json.dumps(info))
|
|
|
|
def merge_syncs(self, broker, args):
|
|
broker.merge_syncs(args[0])
|
|
return HTTPAccepted()
|
|
|
|
def merge_items(self, broker, args):
|
|
broker.merge_items(args[0], args[1])
|
|
return HTTPAccepted()
|
|
|
|
def complete_rsync(self, drive, db_file, args):
|
|
old_filename = os.path.join(self.root, drive, 'tmp', args[0])
|
|
if os.path.exists(db_file):
|
|
return HTTPNotFound()
|
|
if not os.path.exists(old_filename):
|
|
return HTTPNotFound()
|
|
broker = self.broker_class(old_filename)
|
|
broker.newid(args[0])
|
|
renamer(old_filename, db_file)
|
|
return HTTPNoContent()
|
|
|
|
def rsync_then_merge(self, drive, db_file, args):
|
|
old_filename = os.path.join(self.root, drive, 'tmp', args[0])
|
|
if not os.path.exists(db_file) or not os.path.exists(old_filename):
|
|
return HTTPNotFound()
|
|
new_broker = self.broker_class(old_filename)
|
|
existing_broker = self.broker_class(db_file)
|
|
point = -1
|
|
objects = existing_broker.get_items_since(point, 1000)
|
|
while len(objects):
|
|
new_broker.merge_items(objects)
|
|
point = objects[-1]['ROWID']
|
|
objects = existing_broker.get_items_since(point, 1000)
|
|
sleep()
|
|
new_broker.newid(args[0])
|
|
renamer(old_filename, db_file)
|
|
return HTTPNoContent()
|
|
|
|
# Footnote [1]:
|
|
# This orders the nodes so that, given nodes a b c, a will contact b then c,
|
|
# b will contact c then a, and c will contact a then b -- in other words, each
|
|
# node will always contact the next node in the list first.
|
|
# This helps in the case where databases are all way out of sync, so each
|
|
# node is likely to be sending to a different node than it's receiving from,
|
|
# rather than two nodes talking to each other, starving out the third.
|
|
# If the third didn't even have a copy and the first two nodes were way out
|
|
# of sync, such starvation would mean the third node wouldn't get any copy
|
|
# until the first two nodes finally got in sync, which could take a while.
|
|
# This new ordering ensures such starvation doesn't occur, making the data
|
|
# more durable.
|