Sharding: Use the metadata timestamp as last_modified
This is a follow up patch from the cleaning up cleave context's patch (patch 681970). Instead of tracking a last_modified timestamp, and storing it in the context metadata, use the timestamp we use when storing any metadata. Reducing duplication is nice, but there's a more significant reason to do this: affected container DBs can start getting cleaned up as soon as they're running the new code rather than needing to wait for an additional reclaim_age. Change-Id: I2cdbe11f06ffb5574e573c4a60ba4e5d41a00c50
This commit is contained in:
parent
81a41da542
commit
370ac4cd70
@ -1996,20 +1996,24 @@ class ContainerBroker(DatabaseBroker):
|
|||||||
self.update_metadata({'X-Container-Sysmeta-Shard-' + key:
|
self.update_metadata({'X-Container-Sysmeta-Shard-' + key:
|
||||||
(value, Timestamp.now().internal)})
|
(value, Timestamp.now().internal)})
|
||||||
|
|
||||||
def get_sharding_sysmeta(self, key=None):
|
def get_sharding_sysmeta(self, key=None, include_timestamps=False):
|
||||||
"""
|
"""
|
||||||
Returns sharding specific info from the broker's metadata.
|
Returns sharding specific info from the broker's metadata.
|
||||||
|
|
||||||
:param key: if given the value stored under ``key`` in the sharding
|
:param key: if given the value stored under ``key`` in the sharding
|
||||||
info will be returned.
|
info will be returned.
|
||||||
|
:param include_timestamps: bool, return the metadata timestamps
|
||||||
|
along with the metadata. I.e last modified timestamp.
|
||||||
:return: either a dict of sharding info or the value stored under
|
:return: either a dict of sharding info or the value stored under
|
||||||
``key`` in that dict.
|
``key`` in that dict.
|
||||||
"""
|
"""
|
||||||
prefix = 'X-Container-Sysmeta-Shard-'
|
prefix = 'X-Container-Sysmeta-Shard-'
|
||||||
metadata = self.metadata
|
metadata = self.metadata
|
||||||
info = dict((k[len(prefix):], v[0]) for
|
info = dict((k[len(prefix):], v if include_timestamps else v[0])
|
||||||
k, v in metadata.items() if k.startswith(prefix))
|
for k, v in metadata.items() if k.startswith(prefix))
|
||||||
if key:
|
if key:
|
||||||
|
if include_timestamps:
|
||||||
|
return info.get(key, (None, None))
|
||||||
return info.get(key)
|
return info.get(key)
|
||||||
return info
|
return info
|
||||||
|
|
||||||
|
@ -197,8 +197,7 @@ def find_shrinking_candidates(broker, shrink_threshold, merge_size):
|
|||||||
class CleavingContext(object):
|
class CleavingContext(object):
|
||||||
def __init__(self, ref, cursor='', max_row=None, cleave_to_row=None,
|
def __init__(self, ref, cursor='', max_row=None, cleave_to_row=None,
|
||||||
last_cleave_to_row=None, cleaving_done=False,
|
last_cleave_to_row=None, cleaving_done=False,
|
||||||
misplaced_done=False, ranges_done=0, ranges_todo=0,
|
misplaced_done=False, ranges_done=0, ranges_todo=0):
|
||||||
last_modified=None):
|
|
||||||
self.ref = ref
|
self.ref = ref
|
||||||
self._cursor = None
|
self._cursor = None
|
||||||
self.cursor = cursor
|
self.cursor = cursor
|
||||||
@ -209,7 +208,6 @@ class CleavingContext(object):
|
|||||||
self.misplaced_done = misplaced_done
|
self.misplaced_done = misplaced_done
|
||||||
self.ranges_done = ranges_done
|
self.ranges_done = ranges_done
|
||||||
self.ranges_todo = ranges_todo
|
self.ranges_todo = ranges_todo
|
||||||
self.last_modified = last_modified
|
|
||||||
|
|
||||||
def __iter__(self):
|
def __iter__(self):
|
||||||
yield 'ref', self.ref
|
yield 'ref', self.ref
|
||||||
@ -221,7 +219,6 @@ class CleavingContext(object):
|
|||||||
yield 'misplaced_done', self.misplaced_done
|
yield 'misplaced_done', self.misplaced_done
|
||||||
yield 'ranges_done', self.ranges_done
|
yield 'ranges_done', self.ranges_done
|
||||||
yield 'ranges_todo', self.ranges_todo
|
yield 'ranges_todo', self.ranges_todo
|
||||||
yield 'last_modified', self.last_modified
|
|
||||||
|
|
||||||
def _encode(cls, value):
|
def _encode(cls, value):
|
||||||
if value is not None and six.PY2 and isinstance(value, six.text_type):
|
if value is not None and six.PY2 and isinstance(value, six.text_type):
|
||||||
@ -250,17 +247,17 @@ class CleavingContext(object):
|
|||||||
Returns all cleaving contexts stored in the broker.
|
Returns all cleaving contexts stored in the broker.
|
||||||
|
|
||||||
:param broker:
|
:param broker:
|
||||||
:return: list of CleavingContexts
|
:return: list of tuples of (CleavingContext, timestamp)
|
||||||
"""
|
"""
|
||||||
brokers = broker.get_brokers()
|
brokers = broker.get_brokers()
|
||||||
sysmeta = brokers[-1].get_sharding_sysmeta()
|
sysmeta = brokers[-1].get_sharding_sysmeta(include_timestamps=True)
|
||||||
|
|
||||||
for key, val in sysmeta.items():
|
for key, (val, timestamp) in sysmeta.items():
|
||||||
# If the value is of length 0, then the metadata is
|
# If the value is of length 0, then the metadata is
|
||||||
# marked for deletion
|
# marked for deletion
|
||||||
if key.startswith("Context-") and len(val) > 0:
|
if key.startswith("Context-") and len(val) > 0:
|
||||||
try:
|
try:
|
||||||
yield cls(**json.loads(val))
|
yield cls(**json.loads(val)), timestamp
|
||||||
except ValueError:
|
except ValueError:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
@ -288,7 +285,6 @@ class CleavingContext(object):
|
|||||||
return cls(**data)
|
return cls(**data)
|
||||||
|
|
||||||
def store(self, broker):
|
def store(self, broker):
|
||||||
self.last_modified = Timestamp.now().internal
|
|
||||||
broker.set_sharding_sysmeta('Context-' + self.ref,
|
broker.set_sharding_sysmeta('Context-' + self.ref,
|
||||||
json.dumps(dict(self)))
|
json.dumps(dict(self)))
|
||||||
|
|
||||||
@ -754,12 +750,9 @@ class ContainerSharder(ContainerReplicator):
|
|||||||
return True
|
return True
|
||||||
|
|
||||||
def _audit_cleave_contexts(self, broker):
|
def _audit_cleave_contexts(self, broker):
|
||||||
for context in CleavingContext.load_all(broker):
|
for context, last_mod in CleavingContext.load_all(broker):
|
||||||
now = Timestamp.now()
|
now = Timestamp.now()
|
||||||
last_mod = context.last_modified
|
if Timestamp(last_mod).timestamp + self.reclaim_age < \
|
||||||
if not last_mod:
|
|
||||||
context.store(broker)
|
|
||||||
elif Timestamp(last_mod).timestamp + self.reclaim_age < \
|
|
||||||
now.timestamp:
|
now.timestamp:
|
||||||
context.delete(broker)
|
context.delete(broker)
|
||||||
|
|
||||||
|
@ -201,7 +201,6 @@ class TestManageShardRanges(unittest.TestCase):
|
|||||||
' "cleaving_done": false,',
|
' "cleaving_done": false,',
|
||||||
' "cursor": "",',
|
' "cursor": "",',
|
||||||
' "last_cleave_to_row": null,',
|
' "last_cleave_to_row": null,',
|
||||||
' "last_modified": null,',
|
|
||||||
' "max_row": -1,',
|
' "max_row": -1,',
|
||||||
' "misplaced_done": false,',
|
' "misplaced_done": false,',
|
||||||
' "ranges_done": 0,',
|
' "ranges_done": 0,',
|
||||||
|
@ -4499,12 +4499,11 @@ class TestSharder(BaseTestSharder):
|
|||||||
'misplaced_done': True,
|
'misplaced_done': True,
|
||||||
'ranges_done': 2,
|
'ranges_done': 2,
|
||||||
'ranges_todo': 4}
|
'ranges_todo': 4}
|
||||||
if last_modified is not None:
|
|
||||||
params['last_modified'] = last_modified
|
|
||||||
key = 'X-Container-Sysmeta-Shard-Context-%s' % id
|
key = 'X-Container-Sysmeta-Shard-Context-%s' % id
|
||||||
with mock_timestamp_now(Timestamp(0)):
|
with mock_timestamp_now(Timestamp(last_modified)):
|
||||||
broker.update_metadata(
|
broker.update_metadata(
|
||||||
{key: (json.dumps(params), Timestamp.now().internal)})
|
{key: (json.dumps(params),
|
||||||
|
Timestamp(last_modified).internal)})
|
||||||
|
|
||||||
def get_context(id, broker):
|
def get_context(id, broker):
|
||||||
data = broker.get_sharding_sysmeta().get('Context-%s' % id)
|
data = broker.get_sharding_sysmeta().get('Context-%s' % id)
|
||||||
@ -4520,9 +4519,8 @@ class TestSharder(BaseTestSharder):
|
|||||||
self.assertEqual(UNSHARDED, broker.get_db_state())
|
self.assertEqual(UNSHARDED, broker.get_db_state())
|
||||||
|
|
||||||
# Setup some cleaving contexts
|
# Setup some cleaving contexts
|
||||||
id_missing_lm, id_old, id_newish = [str(uuid4()) for _ in range(3)]
|
id_old, id_newish = [str(uuid4()) for _ in range(2)]
|
||||||
contexts = ((id_missing_lm, None),
|
contexts = ((id_old, 1),
|
||||||
(id_old, 1),
|
|
||||||
(id_newish, reclaim_age // 2))
|
(id_newish, reclaim_age // 2))
|
||||||
for id, last_modified in contexts:
|
for id, last_modified in contexts:
|
||||||
add_cleave_context(id, last_modified)
|
add_cleave_context(id, last_modified)
|
||||||
@ -4531,12 +4529,6 @@ class TestSharder(BaseTestSharder):
|
|||||||
with mock_timestamp_now(Timestamp(reclaim_age + 2)):
|
with mock_timestamp_now(Timestamp(reclaim_age + 2)):
|
||||||
sharder._audit_cleave_contexts(broker)
|
sharder._audit_cleave_contexts(broker)
|
||||||
|
|
||||||
# now is reclaim_age + 1, so the old should've been removed and the
|
|
||||||
# context with the missing last modified should now be reclaim + 1
|
|
||||||
missing_lm_ctx = get_context(id_missing_lm, broker)
|
|
||||||
self.assertEqual(missing_lm_ctx.last_modified,
|
|
||||||
Timestamp(reclaim_age + 2).internal)
|
|
||||||
|
|
||||||
old_ctx = get_context(id_old, broker)
|
old_ctx = get_context(id_old, broker)
|
||||||
self.assertEqual(old_ctx, "")
|
self.assertEqual(old_ctx, "")
|
||||||
|
|
||||||
@ -4549,21 +4541,9 @@ class TestSharder(BaseTestSharder):
|
|||||||
with mock_timestamp_now(Timestamp(reclaim_age * 2)):
|
with mock_timestamp_now(Timestamp(reclaim_age * 2)):
|
||||||
sharder._audit_cleave_contexts(broker)
|
sharder._audit_cleave_contexts(broker)
|
||||||
|
|
||||||
missing_lm_ctx = get_context(id_missing_lm, broker)
|
|
||||||
self.assertEqual(missing_lm_ctx.last_modified,
|
|
||||||
Timestamp(reclaim_age + 2).internal)
|
|
||||||
|
|
||||||
newish_ctx = get_context(id_newish, broker)
|
newish_ctx = get_context(id_newish, broker)
|
||||||
self.assertEqual(newish_ctx, "")
|
self.assertEqual(newish_ctx, "")
|
||||||
|
|
||||||
# Fast forward again and they're all cleaned up
|
|
||||||
with self._mock_sharder({'reclaim_age': str(reclaim_age)}) as sharder:
|
|
||||||
with mock_timestamp_now(Timestamp(reclaim_age * 3)):
|
|
||||||
sharder._audit_cleave_contexts(broker)
|
|
||||||
|
|
||||||
missing_lm_ctx = get_context(id_missing_lm, broker)
|
|
||||||
self.assertEqual(missing_lm_ctx, "")
|
|
||||||
|
|
||||||
|
|
||||||
class TestCleavingContext(BaseTestSharder):
|
class TestCleavingContext(BaseTestSharder):
|
||||||
def test_init(self):
|
def test_init(self):
|
||||||
@ -4583,7 +4563,6 @@ class TestCleavingContext(BaseTestSharder):
|
|||||||
'max_row': 12,
|
'max_row': 12,
|
||||||
'cleave_to_row': 11,
|
'cleave_to_row': 11,
|
||||||
'last_cleave_to_row': 10,
|
'last_cleave_to_row': 10,
|
||||||
'last_modified': None,
|
|
||||||
'cleaving_done': False,
|
'cleaving_done': False,
|
||||||
'misplaced_done': True,
|
'misplaced_done': True,
|
||||||
'ranges_done': 0,
|
'ranges_done': 0,
|
||||||
@ -4603,7 +4582,6 @@ class TestCleavingContext(BaseTestSharder):
|
|||||||
'max_row': 12,
|
'max_row': 12,
|
||||||
'cleave_to_row': 11,
|
'cleave_to_row': 11,
|
||||||
'last_cleave_to_row': 10,
|
'last_cleave_to_row': 10,
|
||||||
'last_modified': None,
|
|
||||||
'cleaving_done': False,
|
'cleaving_done': False,
|
||||||
'misplaced_done': True,
|
'misplaced_done': True,
|
||||||
'ranges_done': 0,
|
'ranges_done': 0,
|
||||||
@ -4654,6 +4632,7 @@ class TestCleavingContext(BaseTestSharder):
|
|||||||
def test_load_all(self):
|
def test_load_all(self):
|
||||||
broker = self._make_broker()
|
broker = self._make_broker()
|
||||||
last_ctx = None
|
last_ctx = None
|
||||||
|
timestamp = Timestamp.now()
|
||||||
|
|
||||||
db_ids = [str(uuid4()) for _ in range(6)]
|
db_ids = [str(uuid4()) for _ in range(6)]
|
||||||
for db_id in db_ids:
|
for db_id in db_ids:
|
||||||
@ -4668,17 +4647,29 @@ class TestCleavingContext(BaseTestSharder):
|
|||||||
'ranges_todo': 4}
|
'ranges_todo': 4}
|
||||||
key = 'X-Container-Sysmeta-Shard-Context-%s' % db_id
|
key = 'X-Container-Sysmeta-Shard-Context-%s' % db_id
|
||||||
broker.update_metadata(
|
broker.update_metadata(
|
||||||
{key: (json.dumps(params), Timestamp.now().internal)})
|
{key: (json.dumps(params), timestamp.internal)})
|
||||||
for ctx in CleavingContext.load_all(broker):
|
first_ctx = None
|
||||||
|
for ctx, lm in CleavingContext.load_all(broker):
|
||||||
|
if not first_ctx:
|
||||||
|
first_ctx = ctx
|
||||||
last_ctx = ctx
|
last_ctx = ctx
|
||||||
self.assertIn(ctx.ref, db_ids)
|
self.assertIn(ctx.ref, db_ids)
|
||||||
|
self.assertEqual(lm, timestamp.internal)
|
||||||
|
|
||||||
# If a context is deleted (metadata is "") then it's skipped
|
# If a context is deleted (metadata is "") then it's skipped
|
||||||
last_ctx.delete(broker)
|
last_ctx.delete(broker)
|
||||||
db_ids.remove(last_ctx.ref)
|
db_ids.remove(last_ctx.ref)
|
||||||
|
|
||||||
for ctx in CleavingContext.load_all(broker):
|
# and let's modify the first
|
||||||
|
with mock_timestamp_now() as new_timestamp:
|
||||||
|
first_ctx.store(broker)
|
||||||
|
|
||||||
|
for ctx, lm in CleavingContext.load_all(broker):
|
||||||
self.assertIn(ctx.ref, db_ids)
|
self.assertIn(ctx.ref, db_ids)
|
||||||
|
if ctx.ref == first_ctx.ref:
|
||||||
|
self.assertEqual(lm, new_timestamp.internal)
|
||||||
|
else:
|
||||||
|
self.assertEqual(lm, timestamp.internal)
|
||||||
|
|
||||||
def test_delete(self):
|
def test_delete(self):
|
||||||
broker = self._make_broker()
|
broker = self._make_broker()
|
||||||
@ -4710,36 +4701,6 @@ class TestCleavingContext(BaseTestSharder):
|
|||||||
else:
|
else:
|
||||||
self.fail("Deleted context 'Context-%s' not found")
|
self.fail("Deleted context 'Context-%s' not found")
|
||||||
|
|
||||||
def test_last_modified(self):
|
|
||||||
broker = self._make_broker()
|
|
||||||
|
|
||||||
db_id = broker.get_info()['id']
|
|
||||||
params = {'ref': db_id,
|
|
||||||
'cursor': 'curs',
|
|
||||||
'max_row': 2,
|
|
||||||
'cleave_to_row': 2,
|
|
||||||
'last_cleave_to_row': 1,
|
|
||||||
'cleaving_done': False,
|
|
||||||
'misplaced_done': True,
|
|
||||||
'ranges_done': 2,
|
|
||||||
'ranges_todo': 4}
|
|
||||||
key = 'X-Container-Sysmeta-Shard-Context-%s' % db_id
|
|
||||||
broker.update_metadata(
|
|
||||||
{key: (json.dumps(params), Timestamp.now().internal)})
|
|
||||||
ctx = CleavingContext.load(broker)
|
|
||||||
self.assertIsNone(ctx.last_modified)
|
|
||||||
|
|
||||||
# after a store/save the last_modified will be updated
|
|
||||||
ctx.store(broker)
|
|
||||||
ctx = CleavingContext.load(broker)
|
|
||||||
self.assertIsNotNone(ctx.last_modified)
|
|
||||||
last_modified = ctx.last_modified
|
|
||||||
|
|
||||||
# Store again it'll be updated again
|
|
||||||
ctx.store(broker)
|
|
||||||
ctx = CleavingContext.load(broker)
|
|
||||||
self.assertGreater(ctx.last_modified, last_modified)
|
|
||||||
|
|
||||||
def test_store(self):
|
def test_store(self):
|
||||||
broker = self._make_sharding_broker()
|
broker = self._make_sharding_broker()
|
||||||
old_db_id = broker.get_brokers()[0].get_info()['id']
|
old_db_id = broker.get_brokers()[0].get_info()['id']
|
||||||
@ -4754,12 +4715,13 @@ class TestCleavingContext(BaseTestSharder):
|
|||||||
'max_row': 12,
|
'max_row': 12,
|
||||||
'cleave_to_row': 11,
|
'cleave_to_row': 11,
|
||||||
'last_cleave_to_row': 2,
|
'last_cleave_to_row': 2,
|
||||||
'last_modified': last_mod.internal,
|
|
||||||
'cleaving_done': True,
|
'cleaving_done': True,
|
||||||
'misplaced_done': True,
|
'misplaced_done': True,
|
||||||
'ranges_done': 2,
|
'ranges_done': 2,
|
||||||
'ranges_todo': 4}
|
'ranges_todo': 4}
|
||||||
self.assertEqual(expected, data)
|
self.assertEqual(expected, data)
|
||||||
|
# last modified is the metadata timestamp
|
||||||
|
self.assertEqual(broker.metadata[key][1], last_mod.internal)
|
||||||
|
|
||||||
def test_store_add_row_load(self):
|
def test_store_add_row_load(self):
|
||||||
# adding row to older db changes only max_row in the context
|
# adding row to older db changes only max_row in the context
|
||||||
|
Loading…
Reference in New Issue
Block a user