Merge "Batch results per cell when doing cross-cell listing"
This commit is contained in:
@@ -45,9 +45,10 @@ class InstanceSortContext(multi_cell_list.RecordSortContext):
|
||||
|
||||
|
||||
class InstanceLister(multi_cell_list.CrossCellLister):
|
||||
def __init__(self, sort_keys, sort_dirs, cells=None):
|
||||
def __init__(self, sort_keys, sort_dirs, cells=None, batch_size=None):
|
||||
super(InstanceLister, self).__init__(
|
||||
InstanceSortContext(sort_keys, sort_dirs), cells=cells)
|
||||
InstanceSortContext(sort_keys, sort_dirs), cells=cells,
|
||||
batch_size=batch_size)
|
||||
|
||||
@property
|
||||
def marker_identifier(self):
|
||||
@@ -89,9 +90,11 @@ class InstanceLister(multi_cell_list.CrossCellLister):
|
||||
# NOTE(danms): These methods are here for legacy glue reasons. We should not
|
||||
# replicate these for every data type we implement.
|
||||
def get_instances_sorted(ctx, filters, limit, marker, columns_to_join,
|
||||
sort_keys, sort_dirs, cell_mappings=None):
|
||||
sort_keys, sort_dirs, cell_mappings=None,
|
||||
batch_size=None):
|
||||
return InstanceLister(sort_keys, sort_dirs,
|
||||
cells=cell_mappings).get_records_sorted(
|
||||
cells=cell_mappings,
|
||||
batch_size=batch_size).get_records_sorted(
|
||||
ctx, filters, limit, marker, columns_to_join=columns_to_join)
|
||||
|
||||
|
||||
|
||||
+131
-29
@@ -13,15 +13,18 @@
|
||||
import abc
|
||||
import copy
|
||||
import heapq
|
||||
import itertools
|
||||
|
||||
import eventlet
|
||||
import six
|
||||
|
||||
from oslo_log import log as logging
|
||||
|
||||
from nova import context
|
||||
from nova import exception
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
CELL_FAIL_SENTINELS = (context.did_not_respond_sentinel,
|
||||
context.raised_exception_sentinel)
|
||||
|
||||
|
||||
class RecordSortContext(object):
|
||||
@@ -61,12 +64,54 @@ class RecordWrapper(object):
|
||||
self._db_record = db_record
|
||||
|
||||
def __lt__(self, other):
|
||||
# NOTE(danms): This makes us always sort failure sentinels
|
||||
# higher than actual results. We do this so that they bubble
|
||||
# up in the get_records_sorted() feeder loop ahead of anything
|
||||
# else, and so that the implementation of RecordSortContext
|
||||
# never sees or has to handle the sentinels. If we did not
|
||||
# sort these to the top then we could potentially return
|
||||
# $limit results from good cells before we noticed the failed
|
||||
# cells, and would not properly report them as failed for
|
||||
# fix-up in the higher layers.
|
||||
if self._db_record in CELL_FAIL_SENTINELS:
|
||||
return True
|
||||
elif other._db_record in CELL_FAIL_SENTINELS:
|
||||
return False
|
||||
|
||||
r = self._sort_ctx.compare_records(self._db_record,
|
||||
other._db_record)
|
||||
# cmp(x, y) returns -1 if x < y
|
||||
return r == -1
|
||||
|
||||
|
||||
def query_wrapper(ctx, fn, *args, **kwargs):
|
||||
"""This is a helper to run a query with predictable fail semantics.
|
||||
|
||||
This is a generator which will mimic the scatter_gather_cells() behavior
|
||||
by honoring a timeout and catching exceptions, yielding the usual
|
||||
sentinel objects instead of raising. It wraps these in RecordWrapper
|
||||
objects, which will prioritize them to the merge sort, causing them to
|
||||
be handled by the main get_objects_sorted() feeder loop quickly and
|
||||
gracefully.
|
||||
"""
|
||||
with eventlet.timeout.Timeout(context.CELL_TIMEOUT, exception.CellTimeout):
|
||||
try:
|
||||
for record in fn(ctx, *args, **kwargs):
|
||||
yield record
|
||||
except exception.CellTimeout:
|
||||
# Here, we yield a RecordWrapper (no sort_ctx needed since
|
||||
# we won't call into the implementation's comparison routines)
|
||||
# wrapping the sentinel indicating timeout.
|
||||
yield RecordWrapper(ctx, None, context.did_not_respond_sentinel)
|
||||
raise StopIteration
|
||||
except Exception:
|
||||
# Here, we yield a RecordWrapper (no sort_ctx needed since
|
||||
# we won't call into the implementation's comparison routines)
|
||||
# wrapping the sentinel indicating failure.
|
||||
yield RecordWrapper(ctx, None, context.raised_exception_sentinel)
|
||||
raise StopIteration
|
||||
|
||||
|
||||
@six.add_metaclass(abc.ABCMeta)
|
||||
class CrossCellLister(object):
|
||||
"""An implementation of a cross-cell efficient lister.
|
||||
@@ -78,9 +123,10 @@ class CrossCellLister(object):
|
||||
your data type from cell databases.
|
||||
|
||||
"""
|
||||
def __init__(self, sort_ctx, cells=None):
|
||||
def __init__(self, sort_ctx, cells=None, batch_size=None):
|
||||
self.sort_ctx = sort_ctx
|
||||
self.cells = cells
|
||||
self.batch_size = batch_size
|
||||
|
||||
@property
|
||||
@abc.abstractmethod
|
||||
@@ -238,48 +284,104 @@ class CrossCellLister(object):
|
||||
# nothing. If we didn't have this clause, we'd
|
||||
# pass marker=None to the query below and return a
|
||||
# full unpaginated set for our cell.
|
||||
return []
|
||||
return
|
||||
|
||||
main_query_result = self.get_by_filters(
|
||||
cctx, filters,
|
||||
limit=limit, marker=local_marker,
|
||||
**kwargs)
|
||||
if local_marker_prefix:
|
||||
# Per above, if we had a matching marker object, that is
|
||||
# the first result we should generate.
|
||||
yield RecordWrapper(cctx, self.sort_ctx,
|
||||
local_marker_prefix[0])
|
||||
|
||||
return (RecordWrapper(cctx, self.sort_ctx, inst) for inst in
|
||||
itertools.chain(local_marker_prefix, main_query_result))
|
||||
# If a batch size was provided, use that as the limit per
|
||||
# batch. If not, then ask for the entire $limit in a single
|
||||
# batch.
|
||||
batch_size = self.batch_size or limit
|
||||
|
||||
# NOTE(tssurya): When the below routine provides sentinels to indicate
|
||||
# a timeout on a cell, we ignore that cell to avoid the crash when
|
||||
# doing the merge below and continue merging the results from the 'up'
|
||||
# cells.
|
||||
# TODO(tssurya): Modify this to return the minimal available info from
|
||||
# the down cells.
|
||||
# Keep track of how many we have returned in all batches
|
||||
return_count = 0
|
||||
|
||||
# If limit was unlimited then keep querying batches until
|
||||
# we run out of results. Otherwise, query until the total count
|
||||
# we have returned exceeds the limit.
|
||||
while limit is None or return_count < limit:
|
||||
batch_count = 0
|
||||
|
||||
# Do not query a full batch if it would cause our total
|
||||
# to exceed the limit
|
||||
if limit:
|
||||
query_size = min(batch_size, limit - return_count)
|
||||
else:
|
||||
query_size = batch_size
|
||||
|
||||
# Get one batch
|
||||
query_result = self.get_by_filters(
|
||||
cctx, filters,
|
||||
limit=query_size or None, marker=local_marker,
|
||||
**kwargs)
|
||||
|
||||
# Yield wrapped results from the batch, counting as we go
|
||||
# (to avoid traversing the list to count). Also, update our
|
||||
# local_marker each time so that local_marker is the end of
|
||||
# this batch in order to find the next batch.
|
||||
for item in query_result:
|
||||
local_marker = item[self.marker_identifier]
|
||||
yield RecordWrapper(cctx, self.sort_ctx, item)
|
||||
batch_count += 1
|
||||
|
||||
# No results means we are done for this cell
|
||||
if not batch_count:
|
||||
break
|
||||
|
||||
return_count += batch_count
|
||||
LOG.debug(('Listed batch of %(batch)i results from cell '
|
||||
'out of %(limit)s limit. Returned %(total)i '
|
||||
'total so far.'),
|
||||
{'batch': batch_count,
|
||||
'total': return_count,
|
||||
'limit': limit or 'no'})
|
||||
|
||||
# NOTE(danms): The calls to do_query() will return immediately
|
||||
# with a generator. There is no point in us checking the
|
||||
# results for failure or timeout since we have not actually
|
||||
# run any code in do_query() until the first iteration
|
||||
# below. The query_wrapper() utility handles inline
|
||||
# translation of failures and timeouts to sentinels which will
|
||||
# be generated and consumed just like any normal result below.
|
||||
if self.cells:
|
||||
results = context.scatter_gather_cells(ctx, self.cells,
|
||||
context.CELL_TIMEOUT,
|
||||
do_query)
|
||||
query_wrapper, do_query)
|
||||
else:
|
||||
results = context.scatter_gather_all_cells(ctx, do_query)
|
||||
for cell_uuid in list(results):
|
||||
if results[cell_uuid] in (context.did_not_respond_sentinel,
|
||||
context.raised_exception_sentinel):
|
||||
LOG.warning("Cell %s is not responding and hence skipped "
|
||||
"from the results.", cell_uuid)
|
||||
results.pop(cell_uuid)
|
||||
results = context.scatter_gather_all_cells(ctx,
|
||||
query_wrapper, do_query)
|
||||
|
||||
# If a limit was provided, it was passed to the per-cell query
|
||||
# routines. That means we have NUM_CELLS * limit items across
|
||||
# results. So, we need to consume from that limit below and
|
||||
# stop returning results.
|
||||
limit = limit or 0
|
||||
# stop returning results. Call that total_limit since we will
|
||||
# modify it in the loop below, but do_query() above also looks
|
||||
# at the original provided limit.
|
||||
total_limit = limit or 0
|
||||
|
||||
# Generate results from heapq so we can return the inner
|
||||
# instance instead of the wrapper. This is basically free
|
||||
# as it works as our caller iterates the results.
|
||||
for i in heapq.merge(*results.values()):
|
||||
yield i._db_record
|
||||
limit -= 1
|
||||
if limit == 0:
|
||||
feeder = heapq.merge(*results.values())
|
||||
while True:
|
||||
try:
|
||||
item = next(feeder)
|
||||
except StopIteration:
|
||||
return
|
||||
|
||||
if item._db_record in CELL_FAIL_SENTINELS:
|
||||
LOG.warning('Cell %s is not responding and hence is '
|
||||
'being omitted from the results',
|
||||
item.cell_uuid)
|
||||
continue
|
||||
|
||||
yield item._db_record
|
||||
total_limit -= 1
|
||||
if total_limit == 0:
|
||||
# We'll only hit this if limit was nonzero and we just
|
||||
# generated our last one
|
||||
return
|
||||
|
||||
@@ -146,6 +146,15 @@ class InstanceListTestCase(test.TestCase):
|
||||
self.assertEqual(sorted(uuids), uuids)
|
||||
self.assertEqual(len(self.instances), len(uuids))
|
||||
|
||||
def test_get_sorted_with_large_limit_batched(self):
|
||||
insts = instance_list.get_instances_sorted(self.context, {},
|
||||
5000, None,
|
||||
[], ['uuid'], ['asc'],
|
||||
batch_size=2)
|
||||
uuids = [inst['uuid'] for inst in insts]
|
||||
self.assertEqual(sorted(uuids), uuids)
|
||||
self.assertEqual(len(self.instances), len(uuids))
|
||||
|
||||
def _test_get_sorted_with_limit_marker(self, sort_by, pages=2, pagesize=2,
|
||||
sort_dir='asc'):
|
||||
"""Get multiple pages by a sort key and validate the results.
|
||||
|
||||
@@ -145,15 +145,17 @@ class TestInstanceList(test.NoDBTestCase):
|
||||
# storing the uuids of the instances from the up cell
|
||||
uuid_initial = [inst['uuid'] for inst in inst_cell0]
|
||||
|
||||
def wrap(thing):
|
||||
return multi_cell_list.RecordWrapper(ctx, self.context, thing)
|
||||
|
||||
ctx = nova_context.RequestContext()
|
||||
instances = (multi_cell_list.RecordWrapper(ctx, self.context, inst)
|
||||
for inst in inst_cell0)
|
||||
instances = [wrap(inst) for inst in inst_cell0]
|
||||
|
||||
# creating one up cell and two down cells
|
||||
ret_val = {}
|
||||
ret_val[uuids.cell0] = instances
|
||||
ret_val[uuids.cell1] = nova_context.raised_exception_sentinel
|
||||
ret_val[uuids.cell2] = nova_context.did_not_respond_sentinel
|
||||
ret_val[uuids.cell1] = [wrap(nova_context.raised_exception_sentinel)]
|
||||
ret_val[uuids.cell2] = [wrap(nova_context.did_not_respond_sentinel)]
|
||||
mock_sg.return_value = ret_val
|
||||
|
||||
res = instance_list.get_instances_sorted(self.context, {}, None, None,
|
||||
|
||||
@@ -10,10 +10,15 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from contextlib import contextmanager
|
||||
import copy
|
||||
import datetime
|
||||
import mock
|
||||
|
||||
from nova.compute import multi_cell_list
|
||||
from nova import context
|
||||
from nova import exception
|
||||
from nova import objects
|
||||
from nova import test
|
||||
from nova.tests import uuidsentinel as uuids
|
||||
|
||||
@@ -103,3 +108,238 @@ class TestUtils(test.NoDBTestCase):
|
||||
|
||||
# Make sure we can tell which cell a request came from
|
||||
self.assertEqual(uuids.cell, iw1.cell_uuid)
|
||||
|
||||
def test_wrapper_sentinels(self):
|
||||
inst1 = {'key0': 'foo', 'key1': 'd', 'key2': 456}
|
||||
|
||||
ctx = context.RequestContext()
|
||||
ctx.cell_uuid = uuids.cell
|
||||
|
||||
sort_ctx = multi_cell_list.RecordSortContext(['key0', 'key1'],
|
||||
['asc', 'asc'])
|
||||
iw1 = multi_cell_list.RecordWrapper(ctx, sort_ctx, inst1)
|
||||
|
||||
# Wrappers with sentinels
|
||||
iw2 = multi_cell_list.RecordWrapper(ctx, sort_ctx,
|
||||
context.did_not_respond_sentinel)
|
||||
iw3 = multi_cell_list.RecordWrapper(ctx, sort_ctx,
|
||||
context.raised_exception_sentinel)
|
||||
|
||||
# NOTE(danms): The sentinel wrappers always win
|
||||
self.assertTrue(iw2 < iw1)
|
||||
self.assertTrue(iw3 < iw1)
|
||||
self.assertFalse(iw1 < iw2)
|
||||
self.assertFalse(iw1 < iw3)
|
||||
|
||||
# NOTE(danms): Comparing two wrappers with sentinels will always return
|
||||
# True for less-than because we're just naive about always favoring the
|
||||
# left hand side. This is fine for our purposes but put it here to make
|
||||
# it explicit.
|
||||
self.assertTrue(iw2 < iw3)
|
||||
self.assertTrue(iw3 < iw2)
|
||||
|
||||
def test_query_wrapper_success(self):
|
||||
def test(ctx, data):
|
||||
for thing in data:
|
||||
yield thing
|
||||
|
||||
self.assertEqual([1, 2, 3],
|
||||
list(multi_cell_list.query_wrapper(
|
||||
None, test, [1, 2, 3])))
|
||||
|
||||
def test_query_wrapper_timeout(self):
|
||||
def test(ctx):
|
||||
raise exception.CellTimeout
|
||||
|
||||
self.assertEqual([context.did_not_respond_sentinel],
|
||||
[x._db_record for x in
|
||||
multi_cell_list.query_wrapper(
|
||||
mock.MagicMock(), test)])
|
||||
|
||||
def test_query_wrapper_fail(self):
|
||||
def test(ctx):
|
||||
raise test.TestingException
|
||||
|
||||
self.assertEqual([context.raised_exception_sentinel],
|
||||
[x._db_record for x in
|
||||
multi_cell_list.query_wrapper(
|
||||
mock.MagicMock(), test)])
|
||||
|
||||
|
||||
class TestListContext(multi_cell_list.RecordSortContext):
|
||||
def compare_records(self, rec1, rec2):
|
||||
return -1
|
||||
|
||||
|
||||
class TestLister(multi_cell_list.CrossCellLister):
|
||||
def __init__(self, data, sort_keys, sort_dirs,
|
||||
cells=None, batch_size=None):
|
||||
self._data = data
|
||||
self._count_by_cell = {}
|
||||
super(TestLister, self).__init__(TestListContext(sort_keys, sort_dirs),
|
||||
cells=cells, batch_size=batch_size)
|
||||
|
||||
@property
|
||||
def marker_identifier(self):
|
||||
return 'id'
|
||||
|
||||
def _method_called(self, ctx, method, limit):
|
||||
self._count_by_cell.setdefault(ctx.cell_uuid, {})
|
||||
self._count_by_cell[ctx.cell_uuid].setdefault(method, [])
|
||||
self._count_by_cell[ctx.cell_uuid][method].append(limit)
|
||||
|
||||
def call_summary(self, method):
|
||||
results = {
|
||||
'total': 0,
|
||||
'count_by_cell': [],
|
||||
'limit_by_cell': [],
|
||||
'total_by_cell': [],
|
||||
}
|
||||
for i, cell in enumerate(self._count_by_cell):
|
||||
results['total'] += len(self._count_by_cell[cell][method])
|
||||
# List of number of calls in each cell
|
||||
results['count_by_cell'].append(
|
||||
len(self._count_by_cell[cell][method]))
|
||||
# List of limits used in calls to each cell
|
||||
results['limit_by_cell'].append(
|
||||
self._count_by_cell[cell][method])
|
||||
# List of total results fetched from each cell
|
||||
results['total_by_cell'].append(sum(
|
||||
self._count_by_cell[cell][method]))
|
||||
results['count_by_cell'].sort()
|
||||
results['limit_by_cell'].sort()
|
||||
results['total_by_cell'].sort()
|
||||
return results
|
||||
|
||||
def get_marker_record(self, ctx, marker):
|
||||
pass
|
||||
|
||||
def get_marker_by_values(self, ctx, values):
|
||||
pass
|
||||
|
||||
def get_by_filters(self, ctx, filters, limit, marker, **kwargs):
|
||||
self._method_called(ctx, 'get_by_filters', limit)
|
||||
batch = self._data[:limit]
|
||||
self._data = self._data[limit:]
|
||||
return batch
|
||||
|
||||
|
||||
@contextmanager
|
||||
def target_cell_cheater(context, target_cell):
|
||||
# In order to help us do accounting, we need to mimic the real
|
||||
# behavior where at least cell_uuid gets set on the context, which
|
||||
# doesn't happen in the simple test fixture.
|
||||
context = copy.deepcopy(context)
|
||||
context.cell_uuid = target_cell.uuid
|
||||
yield context
|
||||
|
||||
|
||||
@mock.patch('nova.context.target_cell', new=target_cell_cheater)
|
||||
class TestBatching(test.NoDBTestCase):
|
||||
def setUp(self):
|
||||
super(TestBatching, self).setUp()
|
||||
|
||||
self._data = [{'id': 'foo-%i' % i}
|
||||
for i in range(0, 1000)]
|
||||
self._cells = [objects.CellMapping(uuid=getattr(uuids, 'cell%i' % i),
|
||||
name='cell%i' % i)
|
||||
for i in range(0, 10)]
|
||||
|
||||
def test_batches_not_needed(self):
|
||||
lister = TestLister(self._data, [], [],
|
||||
cells=self._cells, batch_size=10)
|
||||
ctx = context.RequestContext()
|
||||
res = list(lister.get_records_sorted(ctx, {}, 5, None))
|
||||
self.assertEqual(5, len(res))
|
||||
summary = lister.call_summary('get_by_filters')
|
||||
# We only needed one batch per cell to hit the total,
|
||||
# so we should have the same number of calls as cells
|
||||
self.assertEqual(len(self._cells), summary['total'])
|
||||
# One call per cell, hitting all cells
|
||||
self.assertEqual(len(self._cells), len(summary['count_by_cell']))
|
||||
self.assertTrue(all([
|
||||
cell_count == 1 for cell_count in summary['count_by_cell']]))
|
||||
|
||||
def test_batches(self):
|
||||
lister = TestLister(self._data, [], [],
|
||||
cells=self._cells, batch_size=10)
|
||||
ctx = context.RequestContext()
|
||||
res = list(lister.get_records_sorted(ctx, {}, 500, None))
|
||||
self.assertEqual(500, len(res))
|
||||
summary = lister.call_summary('get_by_filters')
|
||||
|
||||
# Since we got everything from one cell (due to how things are sorting)
|
||||
# we should have made 500 / 10 calls to one cell, and 1 call to
|
||||
# the rest
|
||||
calls_expected = [1 for cell in self._cells[1:]] + [500 / 10]
|
||||
self.assertEqual(calls_expected, summary['count_by_cell'])
|
||||
|
||||
# Since we got everything from one cell (due to how things are sorting)
|
||||
# we should have received 500 from one cell and 10 from the rest
|
||||
count_expected = [10 for cell in self._cells[1:]] + [500]
|
||||
self.assertEqual(count_expected, summary['total_by_cell'])
|
||||
|
||||
# Since we got everything from one cell (due to how things are sorting)
|
||||
# we should have a bunch of calls for batches of 10, one each for
|
||||
# every cell except the one that served the bulk of the requests which
|
||||
# should have 500 / 10 batches of 10.
|
||||
limit_expected = ([[10] for cell in self._cells[1:]] +
|
||||
[[10 for i in range(0, 500 // 10)]])
|
||||
self.assertEqual(limit_expected, summary['limit_by_cell'])
|
||||
|
||||
def test_no_batches(self):
|
||||
lister = TestLister(self._data, [], [],
|
||||
cells=self._cells)
|
||||
ctx = context.RequestContext()
|
||||
res = list(lister.get_records_sorted(ctx, {}, 50, None))
|
||||
self.assertEqual(50, len(res))
|
||||
summary = lister.call_summary('get_by_filters')
|
||||
|
||||
# Since we used no batches we should have one call per cell
|
||||
calls_expected = [1 for cell in self._cells]
|
||||
self.assertEqual(calls_expected, summary['count_by_cell'])
|
||||
|
||||
# Since we used no batches, each cell should have returned 50 results
|
||||
count_expected = [50 for cell in self._cells]
|
||||
self.assertEqual(count_expected, summary['total_by_cell'])
|
||||
|
||||
# Since we used no batches, each cell call should be for $limit
|
||||
limit_expected = [[count] for count in count_expected]
|
||||
self.assertEqual(limit_expected, summary['limit_by_cell'])
|
||||
|
||||
|
||||
class FailureLister(TestLister):
|
||||
def __init__(self, *a, **k):
|
||||
super(FailureLister, self).__init__(*a, **k)
|
||||
self._fails = [context.did_not_respond_sentinel,
|
||||
None,
|
||||
context.raised_exception_sentinel,
|
||||
None,
|
||||
None]
|
||||
|
||||
def get_by_filters(self, *a, **k):
|
||||
action = self._fails.pop()
|
||||
if action == context.did_not_respond_sentinel:
|
||||
raise exception.CellTimeout
|
||||
elif action == context.raised_exception_sentinel:
|
||||
raise test.TestingException
|
||||
else:
|
||||
return super(FailureLister, self).get_by_filters(*a, **k)
|
||||
|
||||
|
||||
@mock.patch('nova.context.target_cell', new=target_cell_cheater)
|
||||
class TestBaseClass(test.NoDBTestCase):
|
||||
def test_with_failing_cells(self):
|
||||
data = [{'id': 'foo-%i' % i} for i in range(0, 100)]
|
||||
cells = [objects.CellMapping(uuid=getattr(uuids, 'cell%i' % i),
|
||||
name='cell%i' % i)
|
||||
for i in range(0, 5)]
|
||||
|
||||
# Two of the five cells will fail, one with timeout and one
|
||||
# with an error
|
||||
lister = FailureLister(data, [], [], cells=cells)
|
||||
ctx = context.RequestContext()
|
||||
result = lister.get_records_sorted(ctx, {}, 50, None)
|
||||
# We should still have 50 results since there are enough from the
|
||||
# good cells to fill our limit.
|
||||
self.assertEqual(50, len(list(result)))
|
||||
|
||||
Reference in New Issue
Block a user