Add support for sharding in ContainerBroker

With this patch the ContainerBroker gains several new features:

1. A shard_ranges table to persist ShardRange data, along with
methods to merge and access ShardRange instances to that table,
and to remove expired shard ranges.

2. The ability to create a fresh db file to replace the existing db
file. Fresh db files are named using the hash of the container path
plus an epoch which is a serialized Timestamp value, in the form:

  <hash>_<epoch>.db

During sharding both the fresh and retiring db files co-exist on
disk. The ContainerBroker is now able to choose the newest on disk db
file when instantiated. It also provides a method (get_brokers()) to
gain access to broker instance for either on disk file.

3. Methods to access the current state of the on disk db files i.e.
UNSHARDED (old file only), SHARDING (fresh and retiring files), or
SHARDED (fresh file only with shard ranges).

Container replication is also modified:

1. shard ranges are replicated between container db peers. Unlike
objects, shard ranges are both pushed and pulled during a REPLICATE
event.

2. If a container db is capable of being sharded (i.e. it has a set of
shard ranges) then it will no longer attempt to replicate objects to
its peers. Object record durability is achieved by sharding rather than
peer to peer replication.

Co-Authored-By: Matthew Oliver <matt@oliver.net.au>
Co-Authored-By: Tim Burke <tim.burke@gmail.com>
Co-Authored-By: Clay Gerrard <clay.gerrard@gmail.com>

Change-Id: Ie4d2816259e6c25c346976e181fb9d350f947190
This commit is contained in:
Alistair Coles 2018-05-01 15:44:18 +01:00
parent a962340dd8
commit 14af38a899
11 changed files with 5064 additions and 157 deletions

View File

@ -298,6 +298,27 @@ def print_db_info_metadata(db_type, info, metadata, drop_prefixes=False):
else:
print('No user metadata found in db file')
if db_type == 'container':
print('Sharding Metadata:')
shard_type = 'root' if info['is_root'] else 'shard'
print(' Type: %s' % shard_type)
print(' State: %s' % info['db_state'])
if info.get('shard_ranges'):
print('Shard Ranges (%d):' % len(info['shard_ranges']))
for srange in info['shard_ranges']:
srange = dict(srange, state_text=srange.state_text)
print(' Name: %(name)s' % srange)
print(' lower: %(lower)r, upper: %(upper)r' % srange)
print(' Object Count: %(object_count)d, Bytes Used: '
'%(bytes_used)d, State: %(state_text)s (%(state)d)'
% srange)
print(' Created at: %s (%s)'
% (Timestamp(srange['timestamp']).isoformat,
srange['timestamp']))
print(' Meta Timestamp: %s (%s)'
% (Timestamp(srange['meta_timestamp']).isoformat,
srange['meta_timestamp']))
def print_obj_metadata(metadata, drop_prefixes=False):
"""
@ -406,7 +427,13 @@ def print_info(db_type, db_file, swift_dir='/etc/swift', stale_reads_ok=False,
raise InfoSystemExit()
raise
account = info['account']
container = info['container'] if db_type == 'container' else None
container = None
if db_type == 'container':
container = info['container']
info['is_root'] = broker.is_root_container()
sranges = broker.get_shard_ranges()
if sranges:
info['shard_ranges'] = sranges
print_db_info_metadata(db_type, info, broker.metadata, drop_prefixes)
try:
ring = Ring(swift_dir, ring_name=db_type)

View File

@ -33,7 +33,8 @@ from swift.common.direct_client import quote
from swift.common.utils import get_logger, whataremyips, storage_directory, \
renamer, mkdirs, lock_parent_directory, config_true_value, \
unlink_older_than, dump_recon_cache, rsync_module_interpolation, \
json, parse_override_options, round_robin_iter, Everything
json, parse_override_options, round_robin_iter, Everything, get_db_files, \
parse_db_filename
from swift.common import ring
from swift.common.ring.utils import is_local_device
from swift.common.http import HTTP_NOT_FOUND, HTTP_INSUFFICIENT_STORAGE, \
@ -120,14 +121,20 @@ def roundrobin_datadirs(datadirs):
if not os.path.isdir(hash_dir):
continue
object_file = os.path.join(hash_dir, hsh + '.db')
# common case
if os.path.exists(object_file):
yield (partition, object_file, context)
else:
try:
os.rmdir(hash_dir)
except OSError as e:
if e.errno != errno.ENOTEMPTY:
raise
continue
# look for any alternate db filenames
db_files = get_db_files(object_file)
if db_files:
yield (partition, db_files[-1], context)
continue
try:
os.rmdir(hash_dir)
except OSError as e:
if e.errno != errno.ENOTEMPTY:
raise
its = [walk_datadir(datadir, context, filt)
for datadir, context, filt in datadirs]
@ -216,7 +223,7 @@ class Replicator(Daemon):
self.stats = {'attempted': 0, 'success': 0, 'failure': 0, 'ts_repl': 0,
'no_change': 0, 'hashmatch': 0, 'rsync': 0, 'diff': 0,
'remove': 0, 'empty': 0, 'remote_merge': 0,
'start': time.time(), 'diff_capped': 0,
'start': time.time(), 'diff_capped': 0, 'deferred': 0,
'failure_nodes': {}}
def _report_stats(self):
@ -313,12 +320,13 @@ class Replicator(Daemon):
different_region=different_region):
return False
with Timeout(replicate_timeout or self.node_timeout):
response = http.replicate(replicate_method, local_id)
response = http.replicate(replicate_method, local_id,
os.path.basename(broker.db_file))
return response and 200 <= response.status < 300
def _send_merge_items(self, http, local_id, items):
def _send_replicate_request(self, http, *repl_args):
with Timeout(self.node_timeout):
response = http.replicate('merge_items', items, local_id)
response = http.replicate(*repl_args)
if not response or not is_success(response.status):
if response:
self.logger.error('ERROR Bad response %s from %s',
@ -350,7 +358,8 @@ class Replicator(Daemon):
diffs = 0
while len(objects) and diffs < self.max_diffs:
diffs += 1
if not self._send_merge_items(http, local_id, objects):
if not self._send_replicate_request(
http, 'merge_items', objects, local_id):
return False
# replication relies on db order to send the next merge batch in
# order with no gaps
@ -413,9 +422,8 @@ class Replicator(Daemon):
:returns: ReplConnection object
"""
return ReplConnection(node, partition,
os.path.basename(db_file).split('.', 1)[0],
self.logger)
hsh, other, ext = parse_db_filename(db_file)
return ReplConnection(node, partition, hsh, self.logger)
def _gather_sync_args(self, info):
"""
@ -931,6 +939,8 @@ class ReplicatorRpc(object):
def complete_rsync(self, drive, db_file, args):
old_filename = os.path.join(self.root, drive, 'tmp', args[0])
if args[1:]:
db_file = os.path.join(os.path.dirname(db_file), args[1])
if os.path.exists(db_file):
return HTTPNotFound()
if not os.path.exists(old_filename):
@ -944,6 +954,10 @@ class ReplicatorRpc(object):
return not (self._db_file_exists(db_file) and
os.path.exists(tmp_filename))
def _post_rsync_then_merge_hook(self, existing_broker, new_broker):
# subclasses may override to make custom changes to the new broker
pass
def rsync_then_merge(self, drive, db_file, args):
tmp_filename = os.path.join(self.root, drive, 'tmp', args[0])
if self._abort_rsync_then_merge(db_file, tmp_filename):
@ -959,6 +973,7 @@ class ReplicatorRpc(object):
objects = existing_broker.get_items_since(point, 1000)
sleep()
new_broker.merge_syncs(existing_broker.get_syncs())
self._post_rsync_then_merge_hook(existing_broker, new_broker)
new_broker.newid(args[0])
new_broker.update_metadata(existing_broker.metadata)
if self._abort_rsync_then_merge(db_file, tmp_filename):

View File

@ -5300,3 +5300,86 @@ def distribute_evenly(items, num_buckets):
for index, item in enumerate(items):
out[index % num_buckets].append(item)
return out
def parse_db_filename(filename):
"""
Splits a db filename into three parts: the hash, the epoch, and the
extension.
>>> parse_db_filename("ab2134.db")
('ab2134', None, '.db')
>>> parse_db_filename("ab2134_1234567890.12345.db")
('ab2134', '1234567890.12345', '.db')
:param filename: A db file basename or path to a db file.
:return: A tuple of (hash , epoch, extension). ``epoch`` may be None.
:raises ValueError: if ``filename`` is not a path to a file.
"""
filename = os.path.basename(filename)
if not filename:
raise ValueError('Path to a file required.')
name, ext = os.path.splitext(filename)
parts = name.split('_')
hash_ = parts.pop(0)
epoch = parts[0] if parts else None
return hash_, epoch, ext
def make_db_file_path(db_path, epoch):
"""
Given a path to a db file, return a modified path whose filename part has
the given epoch.
A db filename takes the form <hash>[_<epoch>].db; this method replaces the
<epoch> part of the given ``db_path`` with the given ``epoch`` value.
:param db_path: Path to a db file that does not necessarily exist.
:param epoch: A string that will be used as the epoch in the new path's
filename; the value will be normalized to the normal string
representation of a :class:`~swift.common.utils.Timestamp`.
:return: A modified path to a db file.
:raises ValueError: if the ``epoch`` is not valid for constructing a
:class:`~swift.common.utils.Timestamp`.
"""
if epoch is None:
raise ValueError('epoch must not be None')
epoch = Timestamp(epoch).normal
hash_, _, ext = parse_db_filename(db_path)
db_dir = os.path.dirname(db_path)
return os.path.join(db_dir, '%s_%s%s' % (hash_, epoch, ext))
def get_db_files(db_path):
"""
Given the path to a db file, return a sorted list of all valid db files
that actually exist in that path's dir. A valid db filename has the form:
<hash>[_<epoch>].db
where <hash> matches the <hash> part of the given db_path as would be
parsed by :meth:`~swift.utils.common.parse_db_filename`.
:param db_path: Path to a db file that does not necessarily exist.
:return: List of valid db files that do exist in the dir of the
``db_path``. This list may be empty.
"""
db_dir, db_file = os.path.split(db_path)
try:
files = os.listdir(db_dir)
except OSError as err:
if err.errno == errno.ENOENT:
return []
raise
if not files:
return []
match_hash, epoch, ext = parse_db_filename(db_file)
results = []
for f in files:
hash_, epoch, ext = parse_db_filename(f)
if ext != '.db':
continue
if hash_ != match_hash:
continue
results.append(os.path.join(db_dir, f))
return sorted(results)

File diff suppressed because it is too large Load Diff

View File

@ -26,9 +26,10 @@ from swift.container.reconciler import (
get_reconciler_container_name, get_row_to_q_entry_translator)
from swift.common import db_replicator
from swift.common.storage_policy import POLICIES
from swift.common.swob import HTTPOk, HTTPAccepted
from swift.common.exceptions import DeviceUnavailable
from swift.common.http import is_success
from swift.common.utils import Timestamp, majority_size
from swift.common.utils import Timestamp, majority_size, get_db_files
class ContainerReplicator(db_replicator.Replicator):
@ -76,9 +77,51 @@ class ContainerReplicator(db_replicator.Replicator):
if any(info[key] != remote_info[key] for key in sync_timestamps):
broker.merge_timestamps(*(remote_info[key] for key in
sync_timestamps))
# Grab remote's shard ranges, too
self._fetch_and_merge_shard_ranges(http, broker)
return super(ContainerReplicator, self)._handle_sync_response(
node, response, info, broker, http, different_region)
def _sync_shard_ranges(self, broker, http, local_id):
# TODO: currently the number of shard ranges is expected to be _much_
# less than normal objects so all are sync'd on each cycle. However, in
# future there should be sync points maintained much like for object
# syncing so that only new shard range rows are sync'd.
shard_range_data = broker.get_all_shard_range_data()
if shard_range_data:
if not self._send_replicate_request(
http, 'merge_shard_ranges', shard_range_data, local_id):
return False
self.logger.debug('%s synced %s shard ranges to %s',
broker.db_file, len(shard_range_data),
'%(ip)s:%(port)s/%(device)s' % http.node)
return True
def _choose_replication_mode(self, node, rinfo, info, local_sync, broker,
http, different_region):
# Always replicate shard ranges
shard_range_success = self._sync_shard_ranges(broker, http, info['id'])
if broker.sharding_initiated():
self.logger.warning(
'%s is able to shard -- refusing to replicate objects to peer '
'%s; have shard ranges and will wait for cleaving',
broker.db_file,
'%(ip)s:%(port)s/%(device)s' % node)
self.stats['deferred'] += 1
return shard_range_success
success = super(ContainerReplicator, self)._choose_replication_mode(
node, rinfo, info, local_sync, broker, http,
different_region)
return shard_range_success and success
def _fetch_and_merge_shard_ranges(self, http, broker):
response = http.replicate('get_shard_ranges')
if is_success(response.status):
broker.merge_shard_ranges(json.loads(response.data))
def find_local_handoff_for_part(self, part):
"""
Look through devices in the ring for the first handoff device that was
@ -202,6 +245,18 @@ class ContainerReplicator(db_replicator.Replicator):
# replication
broker.update_reconciler_sync(max_sync)
def cleanup_post_replicate(self, broker, orig_info, responses):
debug_template = 'Not deleting db %s (%%s)' % broker.db_file
if broker.sharding_required():
# despite being a handoff, since we're sharding we're not going to
# do any cleanup so we can continue cleaving - this is still
# considered "success"
reason = 'requires sharding, state %s' % broker.get_db_state()
self.logger.debug(debug_template, reason)
return True
return super(ContainerReplicator, self).cleanup_post_replicate(
broker, orig_info, responses)
def delete_db(self, broker):
"""
Ensure that reconciler databases are only cleaned up at the end of the
@ -255,9 +310,20 @@ class ContainerReplicator(db_replicator.Replicator):
self.replicate_reconcilers()
return rv
def _in_sync(self, rinfo, info, broker, local_sync):
# TODO: don't always sync shard ranges!
if broker.get_shard_ranges(include_own=True, include_deleted=True):
return False
return super(ContainerReplicator, self)._in_sync(
rinfo, info, broker, local_sync)
class ContainerReplicatorRpc(db_replicator.ReplicatorRpc):
def _db_file_exists(self, db_path):
return bool(get_db_files(db_path))
def _parse_sync_args(self, args):
parent = super(ContainerReplicatorRpc, self)
remote_info = parent._parse_sync_args(args)
@ -285,3 +351,27 @@ class ContainerReplicatorRpc(db_replicator.ReplicatorRpc):
timestamp=status_changed_at)
info = broker.get_replication_info()
return info
def _abort_rsync_then_merge(self, db_file, old_filename):
if super(ContainerReplicatorRpc, self)._abort_rsync_then_merge(
db_file, old_filename):
return True
# if the local db has started sharding since the original 'sync'
# request then abort object replication now; instantiate a fresh broker
# each time this check if performed so to get latest state
broker = ContainerBroker(db_file)
return broker.sharding_initiated()
def _post_rsync_then_merge_hook(self, existing_broker, new_broker):
# Note the following hook will need to change to using a pointer and
# limit in the future.
new_broker.merge_shard_ranges(
existing_broker.get_all_shard_range_data())
def merge_shard_ranges(self, broker, args):
broker.merge_shard_ranges(args[0])
return HTTPAccepted()
def get_shard_ranges(self, broker, args):
return HTTPOk(headers={'Content-Type': 'application/json'},
body=json.dumps(broker.get_all_shard_range_data()))

View File

@ -31,6 +31,7 @@ from swift.cli.info import (print_db_info_metadata, print_ring_locations,
parse_get_node_args)
from swift.account.server import AccountController
from swift.container.server import ContainerController
from swift.container.backend import UNSHARDED, SHARDED
from swift.obj.diskfile import write_metadata
@ -103,17 +104,18 @@ class TestCliInfo(TestCliInfoBase):
self.assertRaisesMessage(ValueError, 'Info is incomplete',
print_db_info_metadata, 'container', {}, {})
info = dict(
account='acct',
created_at=100.1,
put_timestamp=106.3,
delete_timestamp=107.9,
status_changed_at=108.3,
container_count='3',
object_count='20',
bytes_used='42')
info['hash'] = 'abaddeadbeefcafe'
info['id'] = 'abadf100d0ddba11'
info = {
'account': 'acct',
'created_at': 100.1,
'put_timestamp': 106.3,
'delete_timestamp': 107.9,
'status_changed_at': 108.3,
'container_count': '3',
'object_count': '20',
'bytes_used': '42',
'hash': 'abaddeadbeefcafe',
'id': 'abadf100d0ddba11',
}
md = {'x-account-meta-mydata': ('swift', '0000000000.00000'),
'x-other-something': ('boo', '0000000000.00000')}
out = StringIO()
@ -154,7 +156,9 @@ No system metadata found in db file
reported_object_count='20',
reported_bytes_used='42',
x_container_foo='bar',
x_container_bar='goo')
x_container_bar='goo',
db_state=UNSHARDED,
is_root=True)
info['hash'] = 'abaddeadbeefcafe'
info['id'] = 'abadf100d0ddba11'
md = {'x-container-sysmeta-mydata': ('swift', '0000000000.00000')}
@ -182,10 +186,88 @@ Metadata:
X-Container-Bar: goo
X-Container-Foo: bar
System Metadata: {'mydata': 'swift'}
No user metadata found in db file''' % POLICIES[0].name
No user metadata found in db file
Sharding Metadata:
Type: root
State: unsharded''' % POLICIES[0].name
self.assertEqual(sorted(out.getvalue().strip().split('\n')),
sorted(exp_out.split('\n')))
def test_print_db_info_metadata_with_shard_ranges(self):
shard_ranges = [utils.ShardRange(
name='.sharded_a/shard_range_%s' % i,
timestamp=utils.Timestamp(i), lower='%da' % i,
upper='%dz' % i, object_count=i, bytes_used=i,
meta_timestamp=utils.Timestamp(i)) for i in range(1, 4)]
shard_ranges[0].state = utils.ShardRange.CLEAVED
shard_ranges[1].state = utils.ShardRange.CREATED
info = dict(
account='acct',
container='cont',
storage_policy_index=0,
created_at='0000000100.10000',
put_timestamp='0000000106.30000',
delete_timestamp='0000000107.90000',
status_changed_at='0000000108.30000',
object_count='20',
bytes_used='42',
reported_put_timestamp='0000010106.30000',
reported_delete_timestamp='0000010107.90000',
reported_object_count='20',
reported_bytes_used='42',
db_state=SHARDED,
is_root=True,
shard_ranges=shard_ranges)
info['hash'] = 'abaddeadbeefcafe'
info['id'] = 'abadf100d0ddba11'
out = StringIO()
with mock.patch('sys.stdout', out):
print_db_info_metadata('container', info, {})
exp_out = '''Path: /acct/cont
Account: acct
Container: cont
Container Hash: d49d0ecbb53be1fcc49624f2f7c7ccae
Metadata:
Created at: 1970-01-01T00:01:40.100000 (0000000100.10000)
Put Timestamp: 1970-01-01T00:01:46.300000 (0000000106.30000)
Delete Timestamp: 1970-01-01T00:01:47.900000 (0000000107.90000)
Status Timestamp: 1970-01-01T00:01:48.300000 (0000000108.30000)
Object Count: 20
Bytes Used: 42
Storage Policy: %s (0)
Reported Put Timestamp: 1970-01-01T02:48:26.300000 (0000010106.30000)
Reported Delete Timestamp: 1970-01-01T02:48:27.900000 (0000010107.90000)
Reported Object Count: 20
Reported Bytes Used: 42
Chexor: abaddeadbeefcafe
UUID: abadf100d0ddba11
No system metadata found in db file
No user metadata found in db file
Sharding Metadata:
Type: root
State: sharded
Shard Ranges (3):
Name: .sharded_a/shard_range_1
lower: '1a', upper: '1z'
Object Count: 1, Bytes Used: 1, State: cleaved (30)
Created at: 1970-01-01T00:00:01.000000 (0000000001.00000)
Meta Timestamp: 1970-01-01T00:00:01.000000 (0000000001.00000)
Name: .sharded_a/shard_range_2
lower: '2a', upper: '2z'
Object Count: 2, Bytes Used: 2, State: created (20)
Created at: 1970-01-01T00:00:02.000000 (0000000002.00000)
Meta Timestamp: 1970-01-01T00:00:02.000000 (0000000002.00000)
Name: .sharded_a/shard_range_3
lower: '3a', upper: '3z'
Object Count: 3, Bytes Used: 3, State: found (10)
Created at: 1970-01-01T00:00:03.000000 (0000000003.00000)
Meta Timestamp: 1970-01-01T00:00:03.000000 (0000000003.00000)''' %\
POLICIES[0].name
self.assertEqual(sorted(out.getvalue().strip().split('\n')),
sorted(exp_out.strip().split('\n')))
def test_print_ring_locations_invalid_args(self):
self.assertRaises(ValueError, print_ring_locations,
None, 'dir', 'acct')
@ -423,14 +505,8 @@ No user metadata found in db file''' % POLICIES[0].name
'1', 'b47',
'dc5be2aa4347a22a0fee6bc7de505b47',
'dc5be2aa4347a22a0fee6bc7de505b47.db')
try:
print_info('account', db_file, swift_dir=self.testdir)
except Exception:
exp_raised = True
if exp_raised:
self.fail("Unexpected exception raised")
else:
self.assertGreater(len(out.getvalue().strip()), 800)
print_info('account', db_file, swift_dir=self.testdir)
self.assertGreater(len(out.getvalue().strip()), 800)
controller = ContainerController(
{'devices': self.testdir, 'mount_check': 'false'})

View File

@ -274,6 +274,9 @@ class FakeBroker(object):
self.put_timestamp = put_timestamp
self.delete_timestamp = delete_timestamp
def get_brokers(self):
return [self]
class FakeAccountBroker(FakeBroker):
db_type = 'account'
@ -1205,7 +1208,7 @@ class TestDBReplicator(unittest.TestCase):
unit.mock_check_drive(isdir=True):
mock_os.path.exists.side_effect = [False, True]
response = rpc.dispatch(('drive', 'part', 'hash'),
['complete_rsync', 'arg1', 'arg2'])
['complete_rsync', 'arg1'])
expected_calls = [call('/part/ash/hash/hash.db'),
call('/drive/tmp/arg1')]
self.assertEqual(mock_os.path.exists.call_args_list,
@ -1213,6 +1216,19 @@ class TestDBReplicator(unittest.TestCase):
self.assertEqual('204 No Content', response.status)
self.assertEqual(204, response.status_int)
with patch('swift.common.db_replicator.os',
new=mock.MagicMock(wraps=os)) as mock_os, \
unit.mock_check_drive(isdir=True):
mock_os.path.exists.side_effect = [False, True]
response = rpc.dispatch(('drive', 'part', 'hash'),
['complete_rsync', 'arg1', 'arg2'])
expected_calls = [call('/part/ash/hash/arg2'),
call('/drive/tmp/arg1')]
self.assertEqual(mock_os.path.exists.call_args_list,
expected_calls)
self.assertEqual('204 No Content', response.status)
self.assertEqual(204, response.status_int)
def test_rsync_then_merge_db_does_not_exist(self):
rpc = db_replicator.ReplicatorRpc('/', '/', FakeBroker,
mount_check=False)
@ -1267,13 +1283,22 @@ class TestDBReplicator(unittest.TestCase):
rpc = db_replicator.ReplicatorRpc('/', '/', FakeBroker,
mount_check=False)
with patch('swift.common.db_replicator.os',
new=mock.MagicMock(wraps=os)) as mock_os, \
unit.mock_check_drive(isdir=True):
mock_os.path.exists.return_value = True
response = rpc.complete_rsync('drive', '/data/db.db', ['arg1'])
mock_os.path.exists.assert_called_with('/data/db.db')
self.assertEqual('404 Not Found', response.status)
self.assertEqual(404, response.status_int)
with patch('swift.common.db_replicator.os',
new=mock.MagicMock(wraps=os)) as mock_os, \
unit.mock_check_drive(isdir=True):
mock_os.path.exists.return_value = True
response = rpc.complete_rsync('drive', '/data/db.db',
['arg1', 'arg2'])
mock_os.path.exists.assert_called_with('/data/db.db')
mock_os.path.exists.assert_called_with('/data/arg2')
self.assertEqual('404 Not Found', response.status)
self.assertEqual(404, response.status_int)
@ -1286,37 +1311,57 @@ class TestDBReplicator(unittest.TestCase):
unit.mock_check_drive(isdir=True):
mock_os.path.exists.return_value = False
response = rpc.complete_rsync('drive', '/data/db.db',
['arg1', 'arg2'])
['arg1'])
expected_calls = [call('/data/db.db'), call('/drive/tmp/arg1')]
self.assertEqual(expected_calls,
mock_os.path.exists.call_args_list)
self.assertEqual('404 Not Found', response.status)
self.assertEqual(404, response.status_int)
with patch('swift.common.db_replicator.os',
new=mock.MagicMock(wraps=os)) as mock_os, \
unit.mock_check_drive(isdir=True):
mock_os.path.exists.return_value = False
response = rpc.complete_rsync('drive', '/data/db.db',
['arg1', 'arg2'])
expected_calls = [call('/data/arg2'), call('/drive/tmp/arg1')]
self.assertEqual(expected_calls,
mock_os.path.exists.call_args_list)
self.assertEqual('404 Not Found', response.status)
self.assertEqual(404, response.status_int)
def test_complete_rsync_rename(self):
rpc = db_replicator.ReplicatorRpc('/', '/', FakeBroker,
mount_check=False)
def mock_exists(path):
if path == '/data/db.db':
return False
self.assertEqual('/drive/tmp/arg1', path)
return True
def mock_renamer(old, new):
self.assertEqual('/drive/tmp/arg1', old)
self.assertEqual('/data/db.db', new)
renamer_calls.append((old, new))
self._patch(patch.object, db_replicator, 'renamer', mock_renamer)
renamer_calls = []
with patch('swift.common.db_replicator.os',
new=mock.MagicMock(wraps=os)) as mock_os, \
unit.mock_check_drive(isdir=True):
mock_os.path.exists.side_effect = [False, True]
response = rpc.complete_rsync('drive', '/data/db.db',
['arg1'])
self.assertEqual('204 No Content', response.status)
self.assertEqual(204, response.status_int)
self.assertEqual(('/drive/tmp/arg1', '/data/db.db'), renamer_calls[0])
self.assertFalse(renamer_calls[1:])
renamer_calls = []
with patch('swift.common.db_replicator.os',
new=mock.MagicMock(wraps=os)) as mock_os, \
unit.mock_check_drive(isdir=True):
mock_os.path.exists.side_effect = [False, True]
response = rpc.complete_rsync('drive', '/data/db.db',
['arg1', 'arg2'])
self.assertEqual('204 No Content', response.status)
self.assertEqual(204, response.status_int)
self.assertEqual('204 No Content', response.status)
self.assertEqual(204, response.status_int)
self.assertEqual(('/drive/tmp/arg1', '/data/arg2'), renamer_calls[0])
self.assertFalse(renamer_calls[1:])
def test_replicator_sync_with_broker_replication_missing_table(self):
rpc = db_replicator.ReplicatorRpc('/', '/', FakeBroker,
@ -1675,10 +1720,10 @@ class TestDBReplicator(unittest.TestCase):
db_file = __file__
replicator = TestReplicator({})
replicator._http_connect(node, partition, db_file)
expected_hsh = os.path.basename(db_file).split('.', 1)[0]
expected_hsh = expected_hsh.split('_', 1)[0]
db_replicator.ReplConnection.assert_has_calls([
mock.call(node, partition,
os.path.basename(db_file).split('.', 1)[0],
replicator.logger)])
mock.call(node, partition, expected_hsh, replicator.logger)])
class TestHandoffsOnly(unittest.TestCase):

View File

@ -3878,6 +3878,47 @@ cluster_dfw1 = http://dfw1.host/v1/
found = utils.find_shard_range('l', overlapping_ranges)
self.assertEqual(found, ktol)
def test_parse_db_filename(self):
actual = utils.parse_db_filename('hash.db')
self.assertEqual(('hash', None, '.db'), actual)
actual = utils.parse_db_filename('hash_1234567890.12345.db')
self.assertEqual(('hash', '1234567890.12345', '.db'), actual)
actual = utils.parse_db_filename(
'/dev/containers/part/ash/hash/hash_1234567890.12345.db')
self.assertEqual(('hash', '1234567890.12345', '.db'), actual)
self.assertRaises(ValueError, utils.parse_db_filename, '/path/to/dir/')
# These shouldn't come up in practice; included for completeness
self.assertEqual(utils.parse_db_filename('hashunder_.db'),
('hashunder', '', '.db'))
self.assertEqual(utils.parse_db_filename('lots_of_underscores.db'),
('lots', 'of', '.db'))
def test_make_db_file_path(self):
epoch = utils.Timestamp.now()
actual = utils.make_db_file_path('hash.db', epoch)
self.assertEqual('hash_%s.db' % epoch.internal, actual)
actual = utils.make_db_file_path('hash_oldepoch.db', epoch)
self.assertEqual('hash_%s.db' % epoch.internal, actual)
actual = utils.make_db_file_path('/path/to/hash.db', epoch)
self.assertEqual('/path/to/hash_%s.db' % epoch.internal, actual)
epoch = utils.Timestamp.now()
actual = utils.make_db_file_path(actual, epoch)
self.assertEqual('/path/to/hash_%s.db' % epoch.internal, actual)
# epochs shouldn't have offsets
epoch = utils.Timestamp.now(offset=10)
actual = utils.make_db_file_path(actual, epoch)
self.assertEqual('/path/to/hash_%s.db' % epoch.normal, actual)
self.assertRaises(ValueError, utils.make_db_file_path,
'/path/to/hash.db', 'bad epoch')
self.assertRaises(ValueError, utils.make_db_file_path,
'/path/to/hash.db', None)
def test_modify_priority(self):
pid = os.getpid()
logger = debug_logger()
@ -4168,6 +4209,70 @@ cluster_dfw1 = http://dfw1.host/v1/
# iterators
self.assertListEqual([1, 4, 6, 2, 5, 7, 3, 8, 9], got)
@with_tempdir
def test_get_db_files(self, tempdir):
dbdir = os.path.join(tempdir, 'dbdir')
self.assertEqual([], utils.get_db_files(dbdir))
path_1 = os.path.join(dbdir, 'dbfile.db')
self.assertEqual([], utils.get_db_files(path_1))
os.mkdir(dbdir)
self.assertEqual([], utils.get_db_files(path_1))
with open(path_1, 'wb'):
pass
self.assertEqual([path_1], utils.get_db_files(path_1))
path_2 = os.path.join(dbdir, 'dbfile_2.db')
self.assertEqual([path_1], utils.get_db_files(path_2))
with open(path_2, 'wb'):
pass
self.assertEqual([path_1, path_2], utils.get_db_files(path_1))
self.assertEqual([path_1, path_2], utils.get_db_files(path_2))
path_3 = os.path.join(dbdir, 'dbfile_3.db')
self.assertEqual([path_1, path_2], utils.get_db_files(path_3))
with open(path_3, 'wb'):
pass
self.assertEqual([path_1, path_2, path_3], utils.get_db_files(path_1))
self.assertEqual([path_1, path_2, path_3], utils.get_db_files(path_2))
self.assertEqual([path_1, path_2, path_3], utils.get_db_files(path_3))
other_hash = os.path.join(dbdir, 'other.db')
self.assertEqual([], utils.get_db_files(other_hash))
other_hash = os.path.join(dbdir, 'other_1.db')
self.assertEqual([], utils.get_db_files(other_hash))
pending = os.path.join(dbdir, 'dbfile.pending')
self.assertEqual([path_1, path_2, path_3], utils.get_db_files(pending))
with open(pending, 'wb'):
pass
self.assertEqual([path_1, path_2, path_3], utils.get_db_files(pending))
self.assertEqual([path_1, path_2, path_3], utils.get_db_files(path_1))
self.assertEqual([path_1, path_2, path_3], utils.get_db_files(path_2))
self.assertEqual([path_1, path_2, path_3], utils.get_db_files(path_3))
self.assertEqual([], utils.get_db_files(dbdir))
os.unlink(path_1)
self.assertEqual([path_2, path_3], utils.get_db_files(path_1))
self.assertEqual([path_2, path_3], utils.get_db_files(path_2))
self.assertEqual([path_2, path_3], utils.get_db_files(path_3))
os.unlink(path_2)
self.assertEqual([path_3], utils.get_db_files(path_1))
self.assertEqual([path_3], utils.get_db_files(path_2))
self.assertEqual([path_3], utils.get_db_files(path_3))
os.unlink(path_3)
self.assertEqual([], utils.get_db_files(path_1))
self.assertEqual([], utils.get_db_files(path_2))
self.assertEqual([], utils.get_db_files(path_3))
self.assertEqual([], utils.get_db_files('/path/to/nowhere'))
class ResellerConfReader(unittest.TestCase):

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -1460,7 +1460,7 @@ class TestContainerController(unittest.TestCase):
self.assertEqual(True, db.is_deleted())
# now save a copy of this db (and remove it from the "current node")
db = self.controller._get_container_broker('sda1', 'p', 'a', 'c')
db_path = db.db_file
db_path = db._db_file
other_path = os.path.join(self.testdir, 'othernode.db')
os.rename(db_path, other_path)
# that should make it missing on this node
@ -1474,6 +1474,8 @@ class TestContainerController(unittest.TestCase):
def mock_exists(db_path):
rv = _real_exists(db_path)
if db_path != db._db_file:
return rv
if not mock_called:
# be as careful as we might hope backend replication can be...
with lock_parent_directory(db_path, timeout=1):