Add aggregate_host_add and _delete to conductor.

Offload the db writes for adding and removing a host to or from
a host aggregate to the conductor service.

Part of blueprint no-db-compute.

Change-Id: I7c3da2d5c9fc6d7e72fb9371b7ea6a2b10c5218d
This commit is contained in:
Russell Bryant
2012-11-28 15:13:47 -05:00
parent 91fc376872
commit 93b8204767
10 changed files with 87 additions and 13 deletions

View File

@@ -3241,9 +3241,10 @@ class ComputeManager(manager.SchedulerDependentManager):
slave_info=slave_info)
except exception.AggregateError:
with excutils.save_and_reraise_exception():
self.driver.undo_aggregate_operation(context,
self.db.aggregate_host_delete,
aggregate['id'], host)
self.driver.undo_aggregate_operation(
context,
self.conductor_api.aggregate_host_delete,
aggregate, host)
@exception.wrap_exception(notifier=notifier, publisher_id=publisher_id())
def remove_aggregate_host(self, context, host, slave_info=None,
@@ -3259,8 +3260,9 @@ class ComputeManager(manager.SchedulerDependentManager):
exception.InvalidAggregateAction) as e:
with excutils.save_and_reraise_exception():
self.driver.undo_aggregate_operation(
context, self.db.aggregate_host_add,
aggregate['id'], host,
context,
self.conductor_api.aggregate_host_add,
aggregate, host,
isinstance(e, exception.AggregateError))
@manager.periodic_task(

View File

@@ -56,6 +56,12 @@ class LocalAPI(object):
def migration_update(self, context, migration, status):
return self._manager.migration_update(context, migration, status)
def aggregate_host_add(self, context, aggregate, host):
return self._manager.aggregate_host_add(context, aggregate, host)
def aggregate_host_delete(self, context, aggregate, host):
return self._manager.aggregate_host_delete(context, aggregate, host)
class API(object):
"""Conductor API that does updates via RPC to the ConductorManager"""
@@ -78,3 +84,11 @@ class API(object):
def migration_update(self, context, migration, status):
return self.conductor_rpcapi.migration_update(context, migration,
status)
def aggregate_host_add(self, context, aggregate, host):
return self.conductor_rpcapi.aggregate_host_add(context, aggregate,
host)
def aggregate_host_delete(self, context, aggregate, host):
return self.conductor_rpcapi.aggregate_host_delete(context, aggregate,
host)

View File

@@ -41,7 +41,7 @@ datetime_fields = ['launched_at', 'terminated_at']
class ConductorManager(manager.SchedulerDependentManager):
"""Mission: TBD"""
RPC_API_VERSION = '1.2'
RPC_API_VERSION = '1.3'
def __init__(self, *args, **kwargs):
super(ConductorManager, self).__init__(service_name='conductor',
@@ -74,3 +74,13 @@ class ConductorManager(manager.SchedulerDependentManager):
migration['id'],
{'status': status})
return jsonutils.to_primitive(migration_ref)
def aggregate_host_add(self, context, aggregate, host):
host_ref = self.db.aggregate_host_add(context.elevated(),
aggregate['id'], host)
return jsonutils.to_primitive(host_ref)
def aggregate_host_delete(self, context, aggregate, host):
self.db.aggregate_host_delete(context.elevated(),
aggregate['id'], host)

View File

@@ -29,6 +29,7 @@ class ConductorAPI(nova.openstack.common.rpc.proxy.RpcProxy):
1.0 - Initial version.
1.1 - Added migration_update
1.2 - Added instance_get_by_uuid and instance_get_all_by_host
1.3 - Added aggregate_host_add and aggregate_host_delete
"""
BASE_RPC_API_VERSION = '1.0'
@@ -59,3 +60,15 @@ class ConductorAPI(nova.openstack.common.rpc.proxy.RpcProxy):
msg = self.make_msg('migration_update', migration=migration_p,
status=status)
return self.call(context, msg, version='1.1')
def aggregate_host_add(self, context, aggregate, host):
aggregate_p = jsonutils.to_primitive(aggregate)
msg = self.make_msg('aggregate_host_add', aggregate=aggregate_p,
host=host)
return self.call(context, msg, version='1.3')
def aggregate_host_delete(self, context, aggregate, host):
aggregate_p = jsonutils.to_primitive(aggregate)
msg = self.make_msg('aggregate_host_delete', aggregate=aggregate_p,
host=host)
return self.call(context, msg, version='1.3')

View File

@@ -113,6 +113,39 @@ class ConductorTestCase(BaseTestCase):
self.assertEqual(orig_instance['name'],
all_instances[0]['name'])
def _setup_aggregate_with_host(self):
aggregate_ref = db.aggregate_create(self.context.elevated(),
{'name': 'foo', 'availability_zone': 'foo'})
self.conductor.aggregate_host_add(self.context, aggregate_ref, 'bar')
aggregate_ref = db.aggregate_get(self.context.elevated(),
aggregate_ref['id'])
return aggregate_ref
def test_aggregate_host_add(self):
aggregate_ref = self._setup_aggregate_with_host()
self.assertTrue(any([host == 'bar'
for host in aggregate_ref['hosts']]))
db.aggregate_delete(self.context.elevated(), aggregate_ref['id'])
def test_aggregate_host_delete(self):
aggregate_ref = self._setup_aggregate_with_host()
self.conductor.aggregate_host_delete(self.context, aggregate_ref,
'bar')
aggregate_ref = db.aggregate_get(self.context.elevated(),
aggregate_ref['id'])
self.assertFalse(any([host == 'bar'
for host in aggregate_ref['hosts']]))
db.aggregate_delete(self.context.elevated(), aggregate_ref['id'])
class ConductorRPCAPITestCase(ConductorTestCase):
"""Conductor RPC API Tests"""

View File

@@ -2096,6 +2096,7 @@ class XenAPIAggregateTestCase(stubs.XenAPITestBase):
host='host',
compute_driver='xenapi.XenAPIDriver',
node_availability_zone='avail_zone1')
self.flags(use_local=True, group='conductor')
host_ref = xenapi_fake.get_all('host')[0]
stubs.stubout_session(self.stubs, stubs.FakeSessionForVMTests)
self.context = context.get_admin_context()

View File

@@ -734,7 +734,7 @@ class ComputeDriver(object):
"""Remove a compute host from an aggregate."""
raise NotImplementedError()
def undo_aggregate_operation(self, context, op, aggregate_id,
def undo_aggregate_operation(self, context, op, aggregate,
host, set_error=True):
"""Undo for Resource Pools"""
raise NotImplementedError()

View File

@@ -3029,7 +3029,7 @@ class LibvirtDriver(driver.ComputeDriver):
"""Remove a compute host from an aggregate."""
pass
def undo_aggregate_operation(self, context, op, aggregate_id,
def undo_aggregate_operation(self, context, op, aggregate,
host, set_error=True):
"""only used for Resource Pools"""
pass

View File

@@ -593,11 +593,11 @@ class XenAPIDriver(driver.ComputeDriver):
return self._pool.remove_from_aggregate(context,
aggregate, host, **kwargs)
def undo_aggregate_operation(self, context, op, aggregate_id,
def undo_aggregate_operation(self, context, op, aggregate,
host, set_error=True):
"""Undo aggregate operation when pool error raised"""
return self._pool.undo_aggregate_operation(context, op,
aggregate_id, host, set_error)
aggregate, host, set_error)
def legacy_nwinfo(self):
"""

View File

@@ -64,16 +64,17 @@ class ResourcePool(object):
def _get_metadata(self, context, aggregate_id):
return self._virtapi.aggregate_metadata_get(context, aggregate_id)
def undo_aggregate_operation(self, context, op, aggregate_id,
def undo_aggregate_operation(self, context, op, aggregate,
host, set_error):
"""Undo aggregate operation when pool error raised"""
try:
if set_error:
metadata = {pool_states.KEY: pool_states.ERROR}
self._virtapi.aggregate_metadata_add(context, aggregate_id,
self._virtapi.aggregate_metadata_add(context, aggregate['id'],
metadata)
op(context, aggregate_id, host)
op(context, aggregate, host)
except Exception:
aggregate_id = aggregate['id']
LOG.exception(_('Aggregate %(aggregate_id)s: unrecoverable state '
'during operation on %(host)s') % locals())