Address review comments
Also: - switch a few more "pivot"s to "shard"s, and - use Timestamp iters more than relying on "now". Change-Id: I4b6622a7212efd3e63787a523744e8e9859be905
This commit is contained in:
parent
0d42f7fe09
commit
462754d3ea
|
@ -338,9 +338,9 @@ use = egg:swift#xprofile
|
|||
# shard_shrink_merge_point = 75
|
||||
#
|
||||
# When sharding, one primary node becomes the scanner and scans for pivot
|
||||
# points. In one shard pass a scanner will search for up to
|
||||
# shard_scanner_batch_size pivots before moving to the next container and
|
||||
# will find more on the next pass.
|
||||
# points. In one shard pass a scanner identifies up to shard_scanner_batch_size
|
||||
# shard ranges before moving to the next container and will find more on the
|
||||
# next pass.
|
||||
# shard_scanner_batch_size = 10
|
||||
#
|
||||
# When sharding how many pivots should it shard in one shard pass before moving
|
||||
|
|
|
@ -83,7 +83,7 @@ class GatekeeperMiddleware(object):
|
|||
# Remove sharding-specific container-server query param.
|
||||
# External clients shouldn't need to worry about shards.
|
||||
try:
|
||||
ver, _acct, _cont = req.split_path(3, 3)
|
||||
ver, _acct, _cont = req.split_path(3)
|
||||
if not valid_api_version(ver):
|
||||
raise ValueError
|
||||
except ValueError:
|
||||
|
|
|
@ -1800,7 +1800,7 @@ class ContainerBroker(DatabaseBroker):
|
|||
(None, None))
|
||||
if path is None:
|
||||
return self.account, self.container
|
||||
if path.count('/') != 1:
|
||||
if path.count('/') != 1 or path.strip('/').count('/') == 0:
|
||||
raise ValueError('Expected X-Container-Sysmeta-Shard-Root to be '
|
||||
"of the form 'account/container', got %r" % path)
|
||||
return tuple(path.split('/'))
|
||||
|
@ -1857,7 +1857,7 @@ class ContainerBroker(DatabaseBroker):
|
|||
:param limit: the maximum number of shard points to be found; a
|
||||
negative value (default) implies no limit.
|
||||
:return: a tuple; the first value in the tuple is a list of
|
||||
:class:`swift.common.utils.ShardRange` instances in object name
|
||||
:class:`~swift.common.utils.ShardRange` instances in object name
|
||||
order, the second value is a boolean which is True if the last
|
||||
shard range has been found, False otherwise.
|
||||
"""
|
||||
|
|
|
@ -332,9 +332,9 @@ class ContainerController(BaseStorageServer):
|
|||
req.headers.get('x-backend-shard-upper'))
|
||||
else:
|
||||
# redirect if a shard range exists for the object name
|
||||
res = self._find_shard_location(req, broker, obj)
|
||||
if res:
|
||||
return res
|
||||
redirect = self._find_shard_location(req, broker, obj)
|
||||
if redirect:
|
||||
return redirect
|
||||
|
||||
broker.delete_object(obj, req.headers.get('x-timestamp'),
|
||||
obj_policy_index)
|
||||
|
@ -448,7 +448,7 @@ class ContainerController(BaseStorageServer):
|
|||
req.headers.get('x-backend-shard-bytes'))
|
||||
|
||||
else:
|
||||
# redirect if a shard exits for this object name
|
||||
# redirect if a shard exists for this object name
|
||||
redirect = self._find_shard_location(req, broker, obj)
|
||||
if redirect:
|
||||
return redirect
|
||||
|
@ -530,7 +530,7 @@ class ContainerController(BaseStorageServer):
|
|||
|
||||
def merge_items(old_items, new_items, reverse=False):
|
||||
# TODO: this method should compare timestamps and not assume that
|
||||
# any item in the pivot db is newer (across data, content-type and
|
||||
# any item in the shard db is newer (across data, content-type and
|
||||
# metadata) than an item in the old db.
|
||||
if old_items and isinstance(old_items[0], dict):
|
||||
# TODO (acoles): does this condition ever occur?
|
||||
|
@ -652,6 +652,7 @@ class ContainerController(BaseStorageServer):
|
|||
elif info.get('db_state') == DB_STATE_SHARDING:
|
||||
# Container is sharding, so we need to look at both brokers
|
||||
# TODO: will we ever want items=all to be supported in this case?
|
||||
# TODO: what happened to reverse?
|
||||
resp_headers, container_list = self._check_local_brokers(
|
||||
req, broker, resp_headers, marker, end_marker, prefix, limit)
|
||||
else:
|
||||
|
|
|
@ -657,7 +657,7 @@ class ContainerSharder(ContainerReplicator):
|
|||
headers_out['user-agent'] = 'container-sharder %s' % \
|
||||
os.getpid()
|
||||
if 'X-Timestamp' not in headers_out:
|
||||
headers_out['X-Timestamp'] = Timestamp(time.time()).normal
|
||||
headers_out['X-Timestamp'] = Timestamp.now().normal
|
||||
try:
|
||||
with ConnectionTimeout(self.conn_timeout):
|
||||
conn = http_connect(ip, port, contdevice, partition,
|
||||
|
@ -1019,7 +1019,7 @@ class ContainerSharder(ContainerReplicator):
|
|||
upper = None
|
||||
|
||||
return ShardRange(container, timestamp, lower, upper,
|
||||
meta_timestamp=Timestamp(time.time()).internal)
|
||||
meta_timestamp=Timestamp.now().internal)
|
||||
|
||||
def _shrink_phase_2(self, broker, root_account, root_container):
|
||||
# We've set metadata last phase. lets make sure it's still the case.
|
||||
|
@ -1080,7 +1080,7 @@ class ContainerSharder(ContainerReplicator):
|
|||
self.logger.warning(str(duex))
|
||||
return
|
||||
|
||||
timestamp = Timestamp(time.time()).internal
|
||||
timestamp = Timestamp.now().internal
|
||||
info = new_broker.get_info()
|
||||
merge_piv = ShardRange(
|
||||
merge_range.name, timestamp, merge_range.lower,
|
||||
|
@ -1139,11 +1139,9 @@ class ContainerSharder(ContainerReplicator):
|
|||
while True:
|
||||
objects = broker.get_objects(CONTAINER_LISTING_LIMIT, **qry)
|
||||
broker_to_update.merge_items(objects)
|
||||
|
||||
if len(objects) >= CONTAINER_LISTING_LIMIT:
|
||||
qry['marker'] = objects[-1]['name']
|
||||
else:
|
||||
if len(objects) < CONTAINER_LISTING_LIMIT:
|
||||
break
|
||||
qry['marker'] = objects[-1]['name']
|
||||
|
||||
def _sharding_complete(self, root_account, root_container, broker):
|
||||
broker.set_sharded_state()
|
||||
|
@ -1151,7 +1149,7 @@ class ContainerSharder(ContainerReplicator):
|
|||
# We aren't in the root container.
|
||||
self._update_shard_ranges(root_account, root_container, 'PUT',
|
||||
broker.get_shard_ranges())
|
||||
timestamp = Timestamp(time.time()).internal
|
||||
timestamp = Timestamp.now().internal
|
||||
shard_range = broker.get_own_shard_range()
|
||||
shard_range.timestamp = timestamp
|
||||
self._update_shard_ranges(root_account, root_container, 'DELETE',
|
||||
|
@ -1255,7 +1253,7 @@ class ContainerSharder(ContainerReplicator):
|
|||
info = new_broker.get_info()
|
||||
shard_range.object_count = info['object_count']
|
||||
shard_range.bytes_used = info['bytes_used']
|
||||
shard_range.meta_timestamp = Timestamp(time.time())
|
||||
shard_range.meta_timestamp = Timestamp.now()
|
||||
if not hasattr(shard_range, 'dont_save'):
|
||||
ranges_done.append(shard_range)
|
||||
|
||||
|
|
|
@ -262,7 +262,7 @@ class ObjectController(BaseStorageServer):
|
|||
logging information.
|
||||
:param container_path: optional path in the form `<account/container>`
|
||||
to which the update should be sent. If given this path will be used
|
||||
instead of constructing a path from the the ``account`` and
|
||||
instead of constructing a path from the ``account`` and
|
||||
``container`` params.
|
||||
"""
|
||||
if logger_thread_locals:
|
||||
|
|
|
@ -1997,9 +1997,9 @@ class Controller(object):
|
|||
return []
|
||||
|
||||
try:
|
||||
pivots = json.loads(response.body)
|
||||
return [ShardRange.from_dict(pivot_range)
|
||||
for pivot_range in pivots]
|
||||
shard_ranges = json.loads(response.body)
|
||||
return [ShardRange.from_dict(shard_range)
|
||||
for shard_range in shard_ranges]
|
||||
except (ValueError, TypeError, KeyError) as err:
|
||||
self.app.logger.exception(
|
||||
"Problem decoding shard ranges in response from %s: %s",
|
||||
|
|
|
@ -3697,7 +3697,7 @@ cluster_dfw1 = http://dfw1.host/v1/
|
|||
shutil.rmtree(tempdir)
|
||||
|
||||
def test_find_shard_range(self):
|
||||
ts = utils.Timestamp(time.time()).internal
|
||||
ts = utils.Timestamp.now().internal
|
||||
start = utils.ShardRange('-a', ts, '', 'a')
|
||||
atof = utils.ShardRange('a-f', ts, 'a', 'f')
|
||||
ftol = utils.ShardRange('f-l', ts, 'f', 'l')
|
||||
|
@ -6510,7 +6510,7 @@ class TestShardRange(unittest.TestCase):
|
|||
def setUp(self):
|
||||
self.ts_iter = make_timestamp_iter()
|
||||
|
||||
def test_pivot_range_initialisation(self):
|
||||
def test_shard_range_initialisation(self):
|
||||
def assert_initialisation_ok(params, expected):
|
||||
pr = utils.ShardRange(**params)
|
||||
self.assertDictEqual(dict(pr), expected)
|
||||
|
@ -6631,7 +6631,7 @@ class TestShardRange(unittest.TestCase):
|
|||
self.assertEqual(ts_4, pr.timestamp)
|
||||
self.assertEqual(ts_4, pr.meta_timestamp)
|
||||
|
||||
def test_pivot_range(self):
|
||||
def test_shard_range(self):
|
||||
# first test infinite range (no boundries)
|
||||
inf_pr = utils.ShardRange(name='test', timestamp=utils.Timestamp.now())
|
||||
self.assertEqual('', inf_pr.upper)
|
||||
|
@ -6646,7 +6646,7 @@ class TestShardRange(unittest.TestCase):
|
|||
for x in ('a', 'z', 'zzzz', '124fsdf', '', 1234):
|
||||
self.assertTrue(x in inf_pr)
|
||||
|
||||
ts = utils.Timestamp(time.time()).internal
|
||||
ts = utils.Timestamp.now().internal
|
||||
|
||||
# upper (if provided) *must* be greater than lower
|
||||
with self.assertRaises(ValueError):
|
||||
|
|
|
@ -237,9 +237,10 @@ class TestContainerBroker(unittest.TestCase):
|
|||
|
||||
def test_delete_shard(self):
|
||||
# Test ContainerBroker.delete_object
|
||||
ts_iter = make_timestamp_iter()
|
||||
broker = ContainerBroker(':memory:', account='a', container='c')
|
||||
broker.initialize(Timestamp('1').internal, 0)
|
||||
broker.put_shard('o', Timestamp(time()).internal)
|
||||
broker.put_shard('o', next(ts_iter).internal)
|
||||
with broker.get() as conn:
|
||||
self.assertEqual(conn.execute(
|
||||
"SELECT count(*) FROM shard_ranges "
|
||||
|
@ -247,8 +248,7 @@ class TestContainerBroker(unittest.TestCase):
|
|||
self.assertEqual(conn.execute(
|
||||
"SELECT count(*) FROM shard_ranges "
|
||||
"WHERE deleted = 1").fetchone()[0], 0)
|
||||
sleep(.00001)
|
||||
broker.delete_shard('o', Timestamp(time()).internal)
|
||||
broker.delete_shard('o', next(ts_iter).internal)
|
||||
with broker.get() as conn:
|
||||
self.assertEqual(conn.execute(
|
||||
"SELECT count(*) FROM shard_ranges "
|
||||
|
@ -461,9 +461,14 @@ class TestContainerBroker(unittest.TestCase):
|
|||
broker = ContainerBroker(':memory:', account='a', container='c')
|
||||
broker.initialize(Timestamp('1').internal, 0)
|
||||
|
||||
ts_iter = make_timestamp_iter()
|
||||
# Stash these for later
|
||||
old_put_timestamp = next(ts_iter).internal
|
||||
old_delete_timestamp = next(ts_iter).internal
|
||||
|
||||
# Create initial object
|
||||
timestamp = Timestamp(time()).internal
|
||||
meta_timestamp = Timestamp(time()).internal
|
||||
timestamp = next(ts_iter).internal
|
||||
meta_timestamp = next(ts_iter).internal
|
||||
broker.put_shard('"{<shardrange \'&\' name>}"', timestamp,
|
||||
meta_timestamp, lower='low', upper='up')
|
||||
with broker.get() as conn:
|
||||
|
@ -512,9 +517,8 @@ class TestContainerBroker(unittest.TestCase):
|
|||
"SELECT bytes_used FROM shard_ranges").fetchone()[0], 0)
|
||||
|
||||
# Put new event
|
||||
sleep(.00001)
|
||||
timestamp = Timestamp(time()).internal
|
||||
meta_timestamp = Timestamp(time()).internal
|
||||
timestamp = next(ts_iter).internal
|
||||
meta_timestamp = next(ts_iter).internal
|
||||
broker.put_shard('"{<shardrange \'&\' name>}"', timestamp,
|
||||
meta_timestamp, lower='lower', upper='upper',
|
||||
object_count=1, bytes_used=2)
|
||||
|
@ -540,8 +544,7 @@ class TestContainerBroker(unittest.TestCase):
|
|||
"SELECT bytes_used FROM shard_ranges").fetchone()[0], 2)
|
||||
|
||||
# Put old event
|
||||
otimestamp = Timestamp(float(Timestamp(timestamp)) - 1).internal
|
||||
broker.put_shard('"{<shardrange \'&\' name>}"', otimestamp,
|
||||
broker.put_shard('"{<shardrange \'&\' name>}"', old_put_timestamp,
|
||||
meta_timestamp, lower='lower', upper='upper',
|
||||
object_count=1, bytes_used=2)
|
||||
with broker.get() as conn:
|
||||
|
@ -550,7 +553,7 @@ class TestContainerBroker(unittest.TestCase):
|
|||
'"{<shardrange \'&\' name>}"')
|
||||
self.assertEqual(conn.execute(
|
||||
"SELECT created_at FROM shard_ranges").fetchone()[0],
|
||||
timestamp)
|
||||
timestamp) # Not old_put_timestamp!
|
||||
self.assertEqual(conn.execute(
|
||||
"SELECT meta_timestamp FROM shard_ranges").fetchone()[0],
|
||||
meta_timestamp)
|
||||
|
@ -566,8 +569,7 @@ class TestContainerBroker(unittest.TestCase):
|
|||
"SELECT bytes_used FROM shard_ranges").fetchone()[0], 2)
|
||||
|
||||
# Put old delete event
|
||||
dtimestamp = Timestamp(float(Timestamp(timestamp)) - 1).internal
|
||||
broker.put_shard('"{<shardrange \'&\' name>}"', dtimestamp,
|
||||
broker.put_shard('"{<shardrange \'&\' name>}"', old_delete_timestamp,
|
||||
meta_timestamp, lower='lower', upper='upper',
|
||||
deleted=1)
|
||||
with broker.get() as conn:
|
||||
|
@ -576,7 +578,7 @@ class TestContainerBroker(unittest.TestCase):
|
|||
'"{<shardrange \'&\' name>}"')
|
||||
self.assertEqual(conn.execute(
|
||||
"SELECT created_at FROM shard_ranges").fetchone()[0],
|
||||
timestamp)
|
||||
timestamp) # Not old_delete_timestamp!
|
||||
self.assertEqual(conn.execute(
|
||||
"SELECT meta_timestamp FROM shard_ranges").fetchone()[0],
|
||||
meta_timestamp)
|
||||
|
@ -592,8 +594,7 @@ class TestContainerBroker(unittest.TestCase):
|
|||
"SELECT bytes_used FROM shard_ranges").fetchone()[0], 2)
|
||||
|
||||
# Put new delete event
|
||||
sleep(.00001)
|
||||
timestamp = Timestamp(time()).internal
|
||||
timestamp = next(ts_iter).internal
|
||||
broker.put_shard('"{<shardrange \'&\' name>}"', timestamp,
|
||||
meta_timestamp, lower='lower', upper='upper',
|
||||
deleted=1)
|
||||
|
@ -608,9 +609,8 @@ class TestContainerBroker(unittest.TestCase):
|
|||
"SELECT deleted FROM shard_ranges").fetchone()[0], 1)
|
||||
|
||||
# Put new event
|
||||
sleep(.00001)
|
||||
timestamp = Timestamp(time()).internal
|
||||
meta_timestamp = Timestamp(time()).internal
|
||||
timestamp = next(ts_iter).internal
|
||||
meta_timestamp = next(ts_iter).internal
|
||||
broker.put_shard('"{<shardrange \'&\' name>}"', timestamp,
|
||||
meta_timestamp, lower='lowerer', upper='upperer',
|
||||
object_count=3, bytes_used=4)
|
||||
|
@ -636,15 +636,13 @@ class TestContainerBroker(unittest.TestCase):
|
|||
"SELECT bytes_used FROM shard_ranges").fetchone()[0], 4)
|
||||
|
||||
# We'll use this later
|
||||
sleep(.0001)
|
||||
in_between_timestamp = Timestamp(time()).internal
|
||||
in_between_timestamp = next(ts_iter).internal
|
||||
|
||||
# New post event
|
||||
sleep(.0001)
|
||||
previous_timestamp = timestamp
|
||||
timestamp = Timestamp(time()).internal
|
||||
timestamp = next(ts_iter).internal
|
||||
previous_meta_timestamp = meta_timestamp
|
||||
meta_timestamp = Timestamp(time()).internal
|
||||
meta_timestamp = next(ts_iter).internal
|
||||
with broker.get() as conn:
|
||||
self.assertEqual(conn.execute(
|
||||
"SELECT name FROM shard_ranges").fetchone()[0],
|
||||
|
@ -893,7 +891,7 @@ class TestContainerBroker(unittest.TestCase):
|
|||
db_shard_file = "%s_shard.db" % hsh
|
||||
db_path = os.path.join(tempdir, db_file)
|
||||
db_shard_path = os.path.join(tempdir, db_shard_file)
|
||||
ts = Timestamp(time())
|
||||
ts = Timestamp.now()
|
||||
|
||||
# First test NOTFOUND state
|
||||
broker = ContainerBroker(db_path, account=acct, container=cont)
|
||||
|
@ -931,7 +929,7 @@ class TestContainerBroker(unittest.TestCase):
|
|||
db_shard_file = "%s_shard.db" % hsh
|
||||
db_path = os.path.join(tempdir, db_file)
|
||||
db_shard_path = os.path.join(tempdir, db_shard_file)
|
||||
ts = Timestamp(time())
|
||||
ts = Timestamp.now()
|
||||
|
||||
# First test NOTFOUND state, this will return the default db_file
|
||||
broker = ContainerBroker(db_path, account=acct, container=cont)
|
||||
|
@ -2575,8 +2573,7 @@ class TestContainerBroker(unittest.TestCase):
|
|||
@with_tempdir
|
||||
def _check_find_shard_ranges(self, c_lower, c_upper, tempdir):
|
||||
ts_iter = make_timestamp_iter()
|
||||
now = time()
|
||||
ts_now = Timestamp(now)
|
||||
ts_now = Timestamp.now()
|
||||
container_name = 'test_container'
|
||||
|
||||
def do_test(expected_bounds, expected_last_found, shard_size, limit):
|
||||
|
@ -2593,7 +2590,7 @@ class TestContainerBroker(unittest.TestCase):
|
|||
expected_range_dicts.append(d)
|
||||
# call the method under test
|
||||
with mock.patch('swift.common.utils.time.time',
|
||||
return_value=now):
|
||||
return_value=float(ts_now.normal)):
|
||||
ranges, last_found = broker.find_shard_ranges(shard_size,
|
||||
limit)
|
||||
# verify results
|
||||
|
@ -2690,6 +2687,7 @@ class TestContainerBroker(unittest.TestCase):
|
|||
def test_set_sharding_states(self, tempdir):
|
||||
ts_iter = make_timestamp_iter()
|
||||
db_path = os.path.join(tempdir, 'container.db')
|
||||
new_db_path = os.path.join(tempdir, 'container_shard.db')
|
||||
broker = ContainerBroker(
|
||||
db_path, account='shard_a', container='shard_c')
|
||||
broker.initialize(next(ts_iter).internal, 0)
|
||||
|
@ -2706,6 +2704,14 @@ class TestContainerBroker(unittest.TestCase):
|
|||
} for i in range(5)]
|
||||
broker.merge_items(objects)
|
||||
|
||||
# Add some metadata
|
||||
meta = {
|
||||
'X-Container-Meta-Color': ['Blue', next(ts_iter).normal],
|
||||
'X-Container-Meta-Cleared': ['', next(ts_iter).normal],
|
||||
'X-Container-Sysmeta-Shape': ['Circle', next(ts_iter).normal],
|
||||
}
|
||||
broker.update_metadata(meta)
|
||||
|
||||
# Add some syncs
|
||||
incoming_sync = {'remote_id': 'incoming_123', 'sync_point': 1}
|
||||
outgoing_sync = {'remote_id': 'outgoing_123', 'sync_point': 2}
|
||||
|
@ -2721,47 +2727,79 @@ class TestContainerBroker(unittest.TestCase):
|
|||
|
||||
broker.merge_items(broker.shard_nodes_to_items(shard_ranges))
|
||||
|
||||
def check_broker_properties(broker):
|
||||
# these broker properties should remain unchanged as state changes
|
||||
self.assertEqual(broker.get_max_row(), 5)
|
||||
self.assertEqual(broker.metadata, meta)
|
||||
self.assertEqual(broker.get_syncs(True)[0], incoming_sync)
|
||||
self.assertEqual(broker.get_syncs(False)[0], outgoing_sync)
|
||||
self.assertEqual(broker.get_shard_ranges(), shard_ranges)
|
||||
|
||||
def check_unsharded_state(broker):
|
||||
# this are expected properties in unsharded state
|
||||
self.assertEqual(len(broker.get_brokers()), 1)
|
||||
self.assertEqual(broker.get_db_state(), DB_STATE_UNSHARDED)
|
||||
self.assertTrue(os.path.exists(db_path))
|
||||
self.assertFalse(os.path.exists(new_db_path))
|
||||
self.assertEqual(5, len(broker.list_objects_iter(
|
||||
100, None, None, None, None, include_deleted=True)))
|
||||
|
||||
# Sanity checks
|
||||
self.assertEqual(broker.get_max_row(), 5)
|
||||
self.assertDictEqual(broker.get_syncs(True)[0], incoming_sync)
|
||||
self.assertDictEqual(broker.get_syncs(False)[0], outgoing_sync)
|
||||
self.assertEqual(broker.get_shard_ranges(), shard_ranges)
|
||||
self.assertEqual(len(broker.get_brokers()), 1)
|
||||
self.assertEqual(broker.get_db_state(), DB_STATE_UNSHARDED)
|
||||
self.assertTrue(os.path.exists(os.path.join(tempdir, 'container.db')))
|
||||
self.assertFalse(os.path.exists(os.path.join(tempdir,
|
||||
'container_shard.db')))
|
||||
check_broker_properties(broker)
|
||||
check_unsharded_state(broker)
|
||||
|
||||
# first test that moving from UNSHARDED to SHARDED doesn't work
|
||||
self.assertFalse(broker.set_sharded_state())
|
||||
# check nothing changed
|
||||
check_broker_properties(broker)
|
||||
check_unsharded_state(broker)
|
||||
|
||||
# now set sharding state and make sure everything moves.
|
||||
broker.set_sharding_state()
|
||||
self.assertEqual(broker.get_max_row(), 5)
|
||||
self.assertDictEqual(broker.get_syncs(True)[0], incoming_sync)
|
||||
self.assertDictEqual(broker.get_syncs(False)[0], outgoing_sync)
|
||||
self.assertEqual(broker.get_shard_ranges(), shard_ranges)
|
||||
self.assertEqual(len(broker.get_brokers()), 2)
|
||||
self.assertEqual(broker.get_db_state(), DB_STATE_SHARDING)
|
||||
self.assertTrue(os.path.exists(os.path.join(tempdir, 'container.db')))
|
||||
self.assertTrue(os.path.exists(os.path.join(tempdir,
|
||||
'container_shard.db')))
|
||||
check_broker_properties(broker)
|
||||
|
||||
# to confirm were definitely looking at the shard db
|
||||
broker2 = ContainerBroker(broker._shard_db_file)
|
||||
self.assertEqual(broker2.get_max_row(), 5)
|
||||
self.assertDictEqual(broker2.get_syncs(True)[0], incoming_sync)
|
||||
self.assertDictEqual(broker2.get_syncs(False)[0], outgoing_sync)
|
||||
self.assertEqual(broker2.get_shard_ranges(), shard_ranges)
|
||||
def check_sharding_state(broker):
|
||||
self.assertEqual(len(broker.get_brokers()), 2)
|
||||
self.assertEqual(broker.get_db_state(), DB_STATE_SHARDING)
|
||||
self.assertTrue(os.path.exists(db_path))
|
||||
self.assertTrue(os.path.exists(new_db_path))
|
||||
self.assertEqual([], broker.list_objects_iter(
|
||||
100, None, None, None, None, include_deleted=True))
|
||||
check_sharding_state(broker)
|
||||
|
||||
# to confirm we're definitely looking at the shard db
|
||||
broker2 = ContainerBroker(new_db_path)
|
||||
# this one thinks it is already in DB_STATE_SHARDED because we opened
|
||||
# it with the _shard.db file
|
||||
self.assertEqual(broker2.get_db_state(), DB_STATE_SHARDED)
|
||||
check_broker_properties(broker2)
|
||||
self.assertEqual([], broker2.list_objects_iter(
|
||||
100, None, None, None, None, include_deleted=True))
|
||||
|
||||
# Try to set sharding state again
|
||||
self.assertFalse(broker.set_sharding_state())
|
||||
# check nothing changed
|
||||
check_broker_properties(broker)
|
||||
check_sharding_state(broker)
|
||||
|
||||
# Now move to the final state
|
||||
self.assertTrue(broker.set_sharded_state())
|
||||
self.assertFalse(os.path.exists(os.path.join(tempdir, 'container.db')))
|
||||
self.assertTrue(os.path.exists(os.path.join(tempdir,
|
||||
'container_shard.db')))
|
||||
check_broker_properties(broker)
|
||||
|
||||
def check_sharded_state(broker):
|
||||
self.assertEqual(broker.get_db_state(), DB_STATE_SHARDED)
|
||||
self.assertEqual(len(broker.get_brokers()), 1)
|
||||
self.assertFalse(os.path.exists(db_path))
|
||||
self.assertTrue(os.path.exists(new_db_path))
|
||||
self.assertEqual([], broker.list_objects_iter(
|
||||
100, None, None, None, None, include_deleted=True))
|
||||
check_sharded_state(broker)
|
||||
|
||||
# Try to set sharded state again
|
||||
self.assertFalse(broker.set_sharded_state())
|
||||
# check nothing changed
|
||||
check_broker_properties(broker)
|
||||
check_sharded_state(broker)
|
||||
|
||||
|
||||
class TestCommonContainerBroker(test_db.TestExampleBroker):
|
||||
|
|
|
@ -2247,7 +2247,7 @@ class TestContainerController(unittest.TestCase):
|
|||
broker = self.controller._get_container_broker('sda1', 'p', 'a', 'c')
|
||||
broker.set_sharding_state()
|
||||
|
||||
# these PUTS will land in the pivot db (no shard ranges yet)
|
||||
# these PUTS will land in the shard db (no shard ranges yet)
|
||||
for obj in objects[5:]:
|
||||
req = Request.blank('/sda1/p/a/c/%s' % obj['name'], method='PUT',
|
||||
headers=obj)
|
||||
|
@ -2300,7 +2300,7 @@ class TestContainerController(unittest.TestCase):
|
|||
expected[:2] + expected[4:6])
|
||||
with mock.patch('swift.common.constraints.CONTAINER_LISTING_LIMIT', 2):
|
||||
# mock listing limit to check that repeated calls are made to the
|
||||
# pivot db to replace old items that are found to be deleted
|
||||
# shard db to replace old items that are found to be deleted
|
||||
check_object_GET('/sda1/p/a/c?format=json&marker=obj_1&limit=2',
|
||||
[expected[1], expected[4]])
|
||||
|
||||
|
|
|
@ -14,7 +14,6 @@
|
|||
# limitations under the License.
|
||||
import os
|
||||
import shutil
|
||||
import time
|
||||
from contextlib import contextmanager
|
||||
from tempfile import mkdtemp
|
||||
|
||||
|
@ -34,10 +33,11 @@ from test.unit import FakeLogger, debug_logger, FakeRing, make_timestamp_iter
|
|||
|
||||
class TestRangeAnalyser(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.ts_iter = make_timestamp_iter()
|
||||
self.ranges = self._default_ranges()
|
||||
|
||||
def _default_ranges(self):
|
||||
ts = Timestamp(time.time()).internal
|
||||
ts = next(self.ts_iter).internal
|
||||
|
||||
ranges = [
|
||||
ShardRange('-d', ts, '', 'd'),
|
||||
|
@ -53,7 +53,7 @@ class TestRangeAnalyser(unittest.TestCase):
|
|||
return ranges
|
||||
|
||||
def test_simple_shard(self):
|
||||
ts = Timestamp(time.time()).internal
|
||||
ts = next(self.ts_iter).internal
|
||||
|
||||
# This simulate a shard sharding by having an older 'n-p' and
|
||||
# newer split 'n-o' and 'o-p'
|
||||
|
@ -81,7 +81,7 @@ class TestRangeAnalyser(unittest.TestCase):
|
|||
self.assertSetEqual(expected_best_path, set(path))
|
||||
|
||||
def test_2_paths_diverge_and_then_join(self):
|
||||
ts = Timestamp(time.time()).internal
|
||||
ts = next(self.ts_iter).internal
|
||||
|
||||
# second scanner that joins back up ( ie runs and dumps into
|
||||
# ShardRanges before the other scanner has a go and so takes off where
|
||||
|
@ -111,7 +111,7 @@ class TestRangeAnalyser(unittest.TestCase):
|
|||
self.assertSetEqual(expected_best_path, set(path))
|
||||
|
||||
def test_2_paths_diverge_older_ends_in_gap(self):
|
||||
ts = Timestamp(time.time()).internal
|
||||
ts = next(self.ts_iter).internal
|
||||
|
||||
# second scanner that joins back up ( ie runs and dumps into
|
||||
# ShardRanges before the other scanner has a go and so takes off where
|
||||
|
@ -143,7 +143,7 @@ class TestRangeAnalyser(unittest.TestCase):
|
|||
self.assertSetEqual(expected_best_path, set(path))
|
||||
|
||||
def test_2_paths_diverge_newer_ends_in_gap(self):
|
||||
ts = Timestamp(time.time()).internal
|
||||
ts = next(self.ts_iter).internal
|
||||
|
||||
# second scanner that joins back up ( ie runs and dumps into
|
||||
# ShardRanges before the other scanner has a go and so takes off where
|
||||
|
@ -175,7 +175,7 @@ class TestRangeAnalyser(unittest.TestCase):
|
|||
self.assertSetEqual(expected_best_path[i], set(path))
|
||||
|
||||
def test_2_paths_diverge_different_ends(self):
|
||||
ts = Timestamp(time.time()).internal
|
||||
ts = next(self.ts_iter).internal
|
||||
|
||||
# To the end with different paths
|
||||
overlap_without_gaps = [
|
||||
|
@ -205,7 +205,7 @@ class TestRangeAnalyser(unittest.TestCase):
|
|||
self.assertSetEqual(expected_best_path[i], set(path))
|
||||
|
||||
def test_2_paths_diverge_different_ends_gap_in_newer(self):
|
||||
ts = Timestamp(time.time()).internal
|
||||
ts = next(self.ts_iter).internal
|
||||
|
||||
# To the end with different paths
|
||||
overlap_without_gaps = [
|
||||
|
@ -234,7 +234,7 @@ class TestRangeAnalyser(unittest.TestCase):
|
|||
self.assertSetEqual(expected_best_path[i], set(path))
|
||||
|
||||
def test_tiebreak_newest_difference_wins(self):
|
||||
ts = Timestamp(time.time()).internal
|
||||
ts = next(self.ts_iter).internal
|
||||
|
||||
# second scanner that joins back up ( ie runs and dumps into
|
||||
# ShardRange before the other scanner has a go and so takes off where
|
||||
|
@ -252,7 +252,7 @@ class TestRangeAnalyser(unittest.TestCase):
|
|||
|
||||
# make a shard range in both paths newer then any of the difference to
|
||||
# force a tie break situation
|
||||
self.ranges[2].timestamp = Timestamp(time.time()).internal
|
||||
self.ranges[2].timestamp = next(self.ts_iter).internal
|
||||
self.ranges.extend(overlap_without_gaps)
|
||||
self.ranges.sort()
|
||||
|
||||
|
@ -267,7 +267,7 @@ class TestRangeAnalyser(unittest.TestCase):
|
|||
self.assertSetEqual(expected_best_path[i], set(path))
|
||||
|
||||
def test_tiebreak_newest_difference_wins_1_with_gap(self):
|
||||
ts = Timestamp(time.time()).internal
|
||||
ts = next(self.ts_iter).internal
|
||||
|
||||
# second scanner that joins back up ( ie runs and dumps into
|
||||
# ShardRanges before the other scanner has a go and so takes off where
|
||||
|
@ -285,7 +285,7 @@ class TestRangeAnalyser(unittest.TestCase):
|
|||
|
||||
# make a shard range in both paths newer then any of the difference to
|
||||
# force a tie break situation
|
||||
self.ranges[2].timestamp = Timestamp(time.time()).internal
|
||||
self.ranges[2].timestamp = next(self.ts_iter).internal
|
||||
self.ranges.extend(overlap_with_gaps)
|
||||
self.ranges.sort()
|
||||
|
||||
|
@ -546,7 +546,7 @@ class TestSharder(unittest.TestCase):
|
|||
self.assertFalse(os.path.exists(expected_shard_dbs[3]))
|
||||
|
||||
broker.merge_items(
|
||||
[dict(shard_range, deleted=0, storage_policy_index=0,
|
||||
[dict(shard_range, deleted=0,
|
||||
record_type=RECORD_TYPE_SHARD_NODE)
|
||||
for shard_range in initial_shard_ranges[:3]])
|
||||
|
||||
|
@ -1056,8 +1056,7 @@ class TestSharder(unittest.TestCase):
|
|||
broker.put_object(*obj)
|
||||
broker.get_info() # force updates to be committed
|
||||
# sanity check the puts landed in sharded broker
|
||||
self._check_objects(new_objects[:2] + objects[2:5] + new_objects[2:],
|
||||
broker.db_file)
|
||||
self._check_objects(sorted(new_objects + objects[2:5]), broker.db_file)
|
||||
with self._mock_sharder() as sharder:
|
||||
sharder._get_shard_ranges = lambda *a, **k: root_shard_ranges
|
||||
sharder._misplaced_objects(broker, node, 'a', 'c', own_sr)
|
||||
|
|
|
@ -5330,6 +5330,9 @@ class TestObjectController(unittest.TestCase):
|
|||
# TODO: should these cases trigger a 400 response rather than
|
||||
# defaulting to root path?
|
||||
do_test('garbage', 'a/c', None)
|
||||
do_test('/', 'a/c', None)
|
||||
do_test('/no-acct', 'a/c', None)
|
||||
do_test('no-cont/', 'a/c', None)
|
||||
do_test('too/many/parts', 'a/c', None)
|
||||
do_test('/leading/slash', 'a/c', None)
|
||||
|
||||
|
|
Loading…
Reference in New Issue