Add scatter gather utilities for cells
This adds utilities: scatter_gather_all_cells(), scatter_gather_skip_cell0(), and scatter_gather_cells() for querying cells in parallel using eventlet green threads. Change-Id: I289932176e8029b0f9a76dbfb963f8ac218fdc06
This commit is contained in:
133
nova/context.py
133
nova/context.py
@@ -20,6 +20,8 @@
|
||||
from contextlib import contextmanager
|
||||
import copy
|
||||
|
||||
import eventlet.queue
|
||||
import eventlet.timeout
|
||||
from keystoneauth1.access import service_catalog as ksa_service_catalog
|
||||
from keystoneauth1 import plugin
|
||||
from oslo_context import context
|
||||
@@ -30,6 +32,7 @@ import six
|
||||
|
||||
from nova import exception
|
||||
from nova.i18n import _
|
||||
from nova import objects
|
||||
from nova import policy
|
||||
from nova import utils
|
||||
|
||||
@@ -38,6 +41,16 @@ LOG = logging.getLogger(__name__)
|
||||
# SIGHUP and periodically based on an expiration time. Currently, none of the
|
||||
# cell caches are purged, so neither is this one, for now.
|
||||
CELL_CACHE = {}
|
||||
# NOTE(melwitt): Used for the scatter-gather utility to indicate we timed out
|
||||
# waiting for a result from a cell.
|
||||
did_not_respond_sentinel = object()
|
||||
# NOTE(melwitt): Used for the scatter-gather utility to indicate an exception
|
||||
# was raised gathering a result from a cell.
|
||||
raised_exception_sentinel = object()
|
||||
# FIXME(danms): Keep a global cache of the cells we find the
|
||||
# first time we look. This needs to be refreshed on a timer or
|
||||
# trigger.
|
||||
CELLS = []
|
||||
|
||||
|
||||
class _ContextAuthPlugin(plugin.BaseAuthPlugin):
|
||||
@@ -415,3 +428,123 @@ def target_cell(context, cell_mapping):
|
||||
cctxt = copy.copy(context)
|
||||
set_target_cell(cctxt, cell_mapping)
|
||||
yield cctxt
|
||||
|
||||
|
||||
def scatter_gather_cells(context, cell_mappings, timeout, fn, *args, **kwargs):
|
||||
"""Target cells in parallel and return their results.
|
||||
|
||||
The first parameter in the signature of the function to call for each cell
|
||||
should be of type RequestContext.
|
||||
|
||||
:param context: The RequestContext for querying cells
|
||||
:param cell_mappings: The CellMappings to target in parallel
|
||||
:param timeout: The total time in seconds to wait for all the results to be
|
||||
gathered
|
||||
:param fn: The function to call for each cell
|
||||
:param args: The args for the function to call for each cell, not including
|
||||
the RequestContext
|
||||
:param kwargs: The kwargs for the function to call for each cell
|
||||
:returns: A dict {cell_uuid: result} containing the joined results. The
|
||||
did_not_respond_sentinel will be returned if a cell did not
|
||||
respond within the timeout. The raised_exception_sentinel will
|
||||
be returned if the call to a cell raised an exception. The
|
||||
exception will be logged.
|
||||
"""
|
||||
greenthreads = []
|
||||
queue = eventlet.queue.LightQueue()
|
||||
results = {}
|
||||
|
||||
def gather_result(cell_uuid, fn, *args, **kwargs):
|
||||
try:
|
||||
result = fn(*args, **kwargs)
|
||||
except Exception:
|
||||
LOG.exception('Error gathering result from cell %s', cell_uuid)
|
||||
result = raised_exception_sentinel
|
||||
# The queue is already synchronized.
|
||||
queue.put((cell_uuid, result))
|
||||
|
||||
for cell_mapping in cell_mappings:
|
||||
with target_cell(context, cell_mapping) as cctxt:
|
||||
greenthreads.append((cell_mapping.uuid,
|
||||
utils.spawn(gather_result, cell_mapping.uuid,
|
||||
fn, cctxt, *args, **kwargs)))
|
||||
|
||||
with eventlet.timeout.Timeout(timeout, exception.CellTimeout):
|
||||
try:
|
||||
while len(results) != len(greenthreads):
|
||||
cell_uuid, result = queue.get()
|
||||
results[cell_uuid] = result
|
||||
except exception.CellTimeout:
|
||||
# NOTE(melwitt): We'll fill in did_not_respond_sentinels at the
|
||||
# same time we kill/wait for the green threads.
|
||||
pass
|
||||
|
||||
# Kill the green threads still pending and wait on those we know are done.
|
||||
for cell_uuid, greenthread in greenthreads:
|
||||
if cell_uuid not in results:
|
||||
greenthread.kill()
|
||||
results[cell_uuid] = did_not_respond_sentinel
|
||||
LOG.warning('Timed out waiting for response from cell %s',
|
||||
cell_uuid)
|
||||
else:
|
||||
greenthread.wait()
|
||||
|
||||
return results
|
||||
|
||||
|
||||
def load_cells():
|
||||
global CELLS
|
||||
if not CELLS:
|
||||
CELLS = objects.CellMappingList.get_all(get_admin_context())
|
||||
LOG.debug('Found %(count)i cells: %(cells)s',
|
||||
dict(count=len(CELLS),
|
||||
cells=','.join([c.identity for c in CELLS])))
|
||||
|
||||
if not CELLS:
|
||||
LOG.error('No cells are configured, unable to continue')
|
||||
|
||||
|
||||
def scatter_gather_skip_cell0(context, fn, *args, **kwargs):
|
||||
"""Target all cells except cell0 in parallel and return their results.
|
||||
|
||||
The first parameter in the signature of the function to call for each cell
|
||||
should be of type RequestContext. There is a 60 second timeout for waiting
|
||||
on all results to be gathered.
|
||||
|
||||
:param context: The RequestContext for querying cells
|
||||
:param fn: The function to call for each cell
|
||||
:param args: The args for the function to call for each cell, not including
|
||||
the RequestContext
|
||||
:param kwargs: The kwargs for the function to call for each cell
|
||||
:returns: A dict {cell_uuid: result} containing the joined results. The
|
||||
did_not_respond_sentinel will be returned if a cell did not
|
||||
respond within the timeout. The raised_exception_sentinel will
|
||||
be returned if the call to a cell raised an exception. The
|
||||
exception will be logged.
|
||||
"""
|
||||
load_cells()
|
||||
cell_mappings = [cell for cell in CELLS if not cell.is_cell0()]
|
||||
return scatter_gather_cells(context, cell_mappings, 60, fn, *args,
|
||||
**kwargs)
|
||||
|
||||
|
||||
def scatter_gather_all_cells(context, fn, *args, **kwargs):
|
||||
"""Target all cells in parallel and return their results.
|
||||
|
||||
The first parameter in the signature of the function to call for each cell
|
||||
should be of type RequestContext. There is a 60 second timeout for waiting
|
||||
on all results to be gathered.
|
||||
|
||||
:param context: The RequestContext for querying cells
|
||||
:param fn: The function to call for each cell
|
||||
:param args: The args for the function to call for each cell, not including
|
||||
the RequestContext
|
||||
:param kwargs: The kwargs for the function to call for each cell
|
||||
:returns: A dict {cell_uuid: result} containing the joined results. The
|
||||
did_not_respond_sentinel will be returned if a cell did not
|
||||
respond within the timeout. The raised_exception_sentinel will
|
||||
be returned if the call to a cell raised an exception. The
|
||||
exception will be logged.
|
||||
"""
|
||||
load_cells()
|
||||
return scatter_gather_cells(context, CELLS, 60, fn, *args, **kwargs)
|
||||
|
||||
@@ -265,6 +265,7 @@ class TestCase(testtools.TestCase):
|
||||
from nova.compute import api
|
||||
api.CELLS = []
|
||||
context.CELL_CACHE = {}
|
||||
context.CELLS = []
|
||||
|
||||
self.cell_mappings = {}
|
||||
self.host_mappings = {}
|
||||
|
||||
@@ -19,6 +19,7 @@ from nova import exception
|
||||
from nova import objects
|
||||
from nova import test
|
||||
from nova.tests import fixtures as nova_fixtures
|
||||
from nova.tests import uuidsentinel as uuids
|
||||
|
||||
|
||||
class ConnectionSwitchTestCase(test.NoDBTestCase):
|
||||
@@ -76,26 +77,37 @@ class CellDatabasesTestCase(test.NoDBTestCase):
|
||||
super(CellDatabasesTestCase, self).setUp()
|
||||
self.useFixture(nova_fixtures.Database(database='api'))
|
||||
fix = nova_fixtures.CellDatabases()
|
||||
fix.add_cell_database('blah')
|
||||
fix.add_cell_database('wat')
|
||||
fix.add_cell_database('cell0')
|
||||
fix.add_cell_database('cell1')
|
||||
fix.add_cell_database('cell2')
|
||||
self.useFixture(fix)
|
||||
|
||||
self.context = context.RequestContext('fake-user', 'fake-project')
|
||||
|
||||
def _create_cell_mappings(self):
|
||||
cell0_uuid = objects.CellMapping.CELL0_UUID
|
||||
self.mapping0 = objects.CellMapping(context=self.context,
|
||||
uuid=cell0_uuid,
|
||||
database_connection='cell0',
|
||||
transport_url='none:///')
|
||||
self.mapping1 = objects.CellMapping(context=self.context,
|
||||
uuid=uuidutils.generate_uuid(),
|
||||
database_connection='cell1',
|
||||
transport_url='none:///')
|
||||
self.mapping2 = objects.CellMapping(context=self.context,
|
||||
uuid=uuidutils.generate_uuid(),
|
||||
database_connection='cell2',
|
||||
transport_url='none:///')
|
||||
self.mapping0.create()
|
||||
self.mapping1.create()
|
||||
self.mapping2.create()
|
||||
|
||||
def test_cell_dbs(self):
|
||||
ctxt = context.RequestContext('fake-user', 'fake-project')
|
||||
mapping1 = objects.CellMapping(context=ctxt,
|
||||
uuid=uuidutils.generate_uuid(),
|
||||
database_connection='blah',
|
||||
transport_url='none:///')
|
||||
mapping2 = objects.CellMapping(context=ctxt,
|
||||
uuid=uuidutils.generate_uuid(),
|
||||
database_connection='wat',
|
||||
transport_url='none:///')
|
||||
mapping1.create()
|
||||
mapping2.create()
|
||||
self._create_cell_mappings()
|
||||
|
||||
# Create an instance and read it from cell1
|
||||
uuid = uuidutils.generate_uuid()
|
||||
with context.target_cell(ctxt, mapping1) as cctxt:
|
||||
with context.target_cell(self.context, self.mapping1) as cctxt:
|
||||
instance = objects.Instance(context=cctxt, uuid=uuid,
|
||||
project_id='fake-project')
|
||||
instance.create()
|
||||
@@ -104,18 +116,18 @@ class CellDatabasesTestCase(test.NoDBTestCase):
|
||||
self.assertEqual(uuid, inst.uuid)
|
||||
|
||||
# Make sure it can't be read from cell2
|
||||
with context.target_cell(ctxt, mapping2) as cctxt:
|
||||
with context.target_cell(self.context, self.mapping2) as cctxt:
|
||||
self.assertRaises(exception.InstanceNotFound,
|
||||
objects.Instance.get_by_uuid, cctxt, uuid)
|
||||
|
||||
# Make sure it can still be read from cell1
|
||||
with context.target_cell(ctxt, mapping1) as cctxt:
|
||||
with context.target_cell(self.context, self.mapping1) as cctxt:
|
||||
inst = objects.Instance.get_by_uuid(cctxt, uuid)
|
||||
self.assertEqual(uuid, inst.uuid)
|
||||
|
||||
# Create an instance and read it from cell2
|
||||
uuid = uuidutils.generate_uuid()
|
||||
with context.target_cell(ctxt, mapping2) as cctxt:
|
||||
with context.target_cell(self.context, self.mapping2) as cctxt:
|
||||
instance = objects.Instance(context=cctxt, uuid=uuid,
|
||||
project_id='fake-project')
|
||||
instance.create()
|
||||
@@ -124,6 +136,66 @@ class CellDatabasesTestCase(test.NoDBTestCase):
|
||||
self.assertEqual(uuid, inst.uuid)
|
||||
|
||||
# Make sure it can't be read from cell1
|
||||
with context.target_cell(ctxt, mapping1) as cctxt:
|
||||
with context.target_cell(self.context, self.mapping1) as cctxt:
|
||||
self.assertRaises(exception.InstanceNotFound,
|
||||
objects.Instance.get_by_uuid, cctxt, uuid)
|
||||
|
||||
def test_scatter_gather_cells(self):
|
||||
self._create_cell_mappings()
|
||||
|
||||
# Create an instance in cell0
|
||||
with context.target_cell(self.context, self.mapping0) as cctxt:
|
||||
instance = objects.Instance(context=cctxt, uuid=uuids.instance0,
|
||||
project_id='fake-project')
|
||||
instance.create()
|
||||
|
||||
# Create an instance in first cell
|
||||
with context.target_cell(self.context, self.mapping1) as cctxt:
|
||||
instance = objects.Instance(context=cctxt, uuid=uuids.instance1,
|
||||
project_id='fake-project')
|
||||
instance.create()
|
||||
|
||||
# Create an instance in second cell
|
||||
with context.target_cell(self.context, self.mapping2) as cctxt:
|
||||
instance = objects.Instance(context=cctxt, uuid=uuids.instance2,
|
||||
project_id='fake-project')
|
||||
instance.create()
|
||||
|
||||
filters = {'deleted': False, 'project_id': 'fake-project'}
|
||||
results = context.scatter_gather_all_cells(
|
||||
self.context, objects.InstanceList.get_by_filters, filters,
|
||||
sort_dir='asc')
|
||||
instances = objects.InstanceList()
|
||||
for result in results.values():
|
||||
instances = instances + result
|
||||
|
||||
# Should have 3 instances across cells
|
||||
self.assertEqual(3, len(instances))
|
||||
|
||||
# Verify we skip cell0 when specified
|
||||
results = context.scatter_gather_skip_cell0(
|
||||
self.context, objects.InstanceList.get_by_filters, filters)
|
||||
instances = objects.InstanceList()
|
||||
for result in results.values():
|
||||
instances = instances + result
|
||||
|
||||
# Should have gotten only the instances from the last two cells
|
||||
self.assertEqual(2, len(instances))
|
||||
self.assertIn(self.mapping1.uuid, results)
|
||||
self.assertIn(self.mapping2.uuid, results)
|
||||
instance_uuids = [inst.uuid for inst in instances]
|
||||
self.assertIn(uuids.instance1, instance_uuids)
|
||||
self.assertIn(uuids.instance2, instance_uuids)
|
||||
|
||||
# Try passing one cell
|
||||
results = context.scatter_gather_cells(
|
||||
self.context, [self.mapping1], 60,
|
||||
objects.InstanceList.get_by_filters, filters)
|
||||
instances = objects.InstanceList()
|
||||
for result in results.values():
|
||||
instances = instances + result
|
||||
|
||||
# Should have gotten only one instance from cell1
|
||||
self.assertEqual(1, len(instances))
|
||||
self.assertIn(self.mapping1.uuid, results)
|
||||
self.assertEqual(uuids.instance1, instances[0].uuid)
|
||||
|
||||
@@ -20,6 +20,7 @@ from nova import context
|
||||
from nova import exception
|
||||
from nova import objects
|
||||
from nova import test
|
||||
from nova.tests import fixtures as nova_fixtures
|
||||
from nova.tests import uuidsentinel as uuids
|
||||
|
||||
|
||||
@@ -302,3 +303,121 @@ class ContextTestCase(test.NoDBTestCase):
|
||||
self.assertEqual(mock.sentinel.mq_conn_obj, cctxt.mq_connection)
|
||||
mock_create_cm.assert_not_called()
|
||||
mock_create_tport.assert_not_called()
|
||||
|
||||
@mock.patch('nova.context.target_cell')
|
||||
@mock.patch('nova.objects.InstanceList.get_by_filters')
|
||||
def test_scatter_gather_cells(self, mock_get_inst, mock_target_cell):
|
||||
self.useFixture(nova_fixtures.SpawnIsSynchronousFixture())
|
||||
ctxt = context.get_context()
|
||||
mapping = objects.CellMapping(database_connection='fake://db',
|
||||
transport_url='fake://mq',
|
||||
uuid=uuids.cell)
|
||||
mappings = objects.CellMappingList(objects=[mapping])
|
||||
|
||||
filters = {'deleted': False}
|
||||
context.scatter_gather_cells(
|
||||
ctxt, mappings, 60, objects.InstanceList.get_by_filters, filters,
|
||||
sort_dir='foo')
|
||||
|
||||
mock_get_inst.assert_called_once_with(
|
||||
mock_target_cell.return_value.__enter__.return_value, filters,
|
||||
sort_dir='foo')
|
||||
|
||||
@mock.patch('nova.context.LOG.warning')
|
||||
@mock.patch('eventlet.timeout.Timeout')
|
||||
@mock.patch('eventlet.queue.LightQueue.get')
|
||||
@mock.patch('nova.objects.InstanceList.get_by_filters')
|
||||
def test_scatter_gather_cells_timeout(self, mock_get_inst,
|
||||
mock_get_result, mock_timeout,
|
||||
mock_log_warning):
|
||||
# This is needed because we're mocking get_by_filters.
|
||||
self.useFixture(nova_fixtures.SpawnIsSynchronousFixture())
|
||||
ctxt = context.get_context()
|
||||
mapping0 = objects.CellMapping(database_connection='fake://db0',
|
||||
transport_url='none:///',
|
||||
uuid=objects.CellMapping.CELL0_UUID)
|
||||
mapping1 = objects.CellMapping(database_connection='fake://db1',
|
||||
transport_url='fake://mq1',
|
||||
uuid=uuids.cell1)
|
||||
mappings = objects.CellMappingList(objects=[mapping0, mapping1])
|
||||
|
||||
# Simulate cell1 not responding.
|
||||
mock_get_result.side_effect = [(mapping0.uuid,
|
||||
mock.sentinel.instances),
|
||||
exception.CellTimeout()]
|
||||
|
||||
results = context.scatter_gather_cells(
|
||||
ctxt, mappings, 30, objects.InstanceList.get_by_filters)
|
||||
self.assertEqual(2, len(results))
|
||||
self.assertIn(mock.sentinel.instances, results.values())
|
||||
self.assertIn(context.did_not_respond_sentinel, results.values())
|
||||
mock_timeout.assert_called_once_with(30, exception.CellTimeout)
|
||||
self.assertTrue(mock_log_warning.called)
|
||||
|
||||
@mock.patch('nova.context.LOG.exception')
|
||||
@mock.patch('nova.objects.InstanceList.get_by_filters')
|
||||
def test_scatter_gather_cells_exception(self, mock_get_inst,
|
||||
mock_log_exception):
|
||||
# This is needed because we're mocking get_by_filters.
|
||||
self.useFixture(nova_fixtures.SpawnIsSynchronousFixture())
|
||||
ctxt = context.get_context()
|
||||
mapping0 = objects.CellMapping(database_connection='fake://db0',
|
||||
transport_url='none:///',
|
||||
uuid=objects.CellMapping.CELL0_UUID)
|
||||
mapping1 = objects.CellMapping(database_connection='fake://db1',
|
||||
transport_url='fake://mq1',
|
||||
uuid=uuids.cell1)
|
||||
mappings = objects.CellMappingList(objects=[mapping0, mapping1])
|
||||
|
||||
# Simulate cell1 raising an exception.
|
||||
mock_get_inst.side_effect = [mock.sentinel.instances,
|
||||
test.TestingException()]
|
||||
|
||||
results = context.scatter_gather_cells(
|
||||
ctxt, mappings, 30, objects.InstanceList.get_by_filters)
|
||||
self.assertEqual(2, len(results))
|
||||
self.assertIn(mock.sentinel.instances, results.values())
|
||||
self.assertIn(context.raised_exception_sentinel, results.values())
|
||||
self.assertTrue(mock_log_exception.called)
|
||||
|
||||
@mock.patch('nova.context.scatter_gather_cells')
|
||||
@mock.patch('nova.objects.CellMappingList.get_all')
|
||||
def test_scatter_gather_all_cells(self, mock_get_all, mock_scatter):
|
||||
ctxt = context.get_context()
|
||||
mapping0 = objects.CellMapping(database_connection='fake://db0',
|
||||
transport_url='none:///',
|
||||
uuid=objects.CellMapping.CELL0_UUID)
|
||||
mapping1 = objects.CellMapping(database_connection='fake://db1',
|
||||
transport_url='fake://mq1',
|
||||
uuid=uuids.cell1)
|
||||
mock_get_all.return_value = objects.CellMappingList(
|
||||
objects=[mapping0, mapping1])
|
||||
|
||||
filters = {'deleted': False}
|
||||
context.scatter_gather_all_cells(
|
||||
ctxt, objects.InstanceList.get_by_filters, filters, sort_dir='foo')
|
||||
|
||||
mock_scatter.assert_called_once_with(
|
||||
ctxt, mock_get_all.return_value, 60,
|
||||
objects.InstanceList.get_by_filters, filters, sort_dir='foo')
|
||||
|
||||
@mock.patch('nova.context.scatter_gather_cells')
|
||||
@mock.patch('nova.objects.CellMappingList.get_all')
|
||||
def test_scatter_gather_skip_cell0(self, mock_get_all, mock_scatter):
|
||||
ctxt = context.get_context()
|
||||
mapping0 = objects.CellMapping(database_connection='fake://db0',
|
||||
transport_url='none:///',
|
||||
uuid=objects.CellMapping.CELL0_UUID)
|
||||
mapping1 = objects.CellMapping(database_connection='fake://db1',
|
||||
transport_url='fake://mq1',
|
||||
uuid=uuids.cell1)
|
||||
mock_get_all.return_value = objects.CellMappingList(
|
||||
objects=[mapping0, mapping1])
|
||||
|
||||
filters = {'deleted': False}
|
||||
context.scatter_gather_skip_cell0(
|
||||
ctxt, objects.InstanceList.get_by_filters, filters, sort_dir='foo')
|
||||
|
||||
mock_scatter.assert_called_once_with(
|
||||
ctxt, [mapping1], 60, objects.InstanceList.get_by_filters, filters,
|
||||
sort_dir='foo')
|
||||
|
||||
Reference in New Issue
Block a user