blueprint host-aggregates: improvements and clean-up

This changeset addresses a number of issues found during testing:

- avoid name conflicts during aggregate creation (see db/* changes; a
  standalone sketch of the pattern follows the commit metadata below)
- avoid masking of XenAPI.Failure if pool-join fails (see plugins/* changes)
- preserve VM placement decisions made during scheduling (see xenapi/vmops.py)
- ensure plugins are called on the right hosts in XS pools (see xenapi_conn.py)
- store the master uuid in aggregate metadata for use in VM live migration, and
  raise InvalidAggregateAction rather than AggregateError if we attempt to
  remove a master (see xenapi/pool.py and compute/manager.py)
- clean-up of unit tests

Change-Id: I881a94d87efe1e81bd4f86667e75f5cbee50ce91
Author: Armando Migliaccio
Date:   2012-02-15 21:17:06 +00:00
commit 424de7eea2 (parent ae99fe81f5)

10 changed files with 119 additions and 37 deletions
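
The first bullet's fix reuses a soft-deleted aggregate row instead of failing
on the unique name (see the db/sqlalchemy/api.py hunk below). A minimal,
self-contained sketch of that create-or-undelete pattern, assuming modern
SQLAlchemy and an in-memory SQLite database; the model and names here are
illustrative, not the nova schema:

    from sqlalchemy import Boolean, Column, Integer, String, create_engine
    from sqlalchemy.orm import Session, declarative_base

    Base = declarative_base()

    class Aggregate(Base):
        __tablename__ = 'aggregates'
        id = Column(Integer, primary_key=True)
        name = Column(String, unique=True)
        availability_zone = Column(String)
        deleted = Column(Boolean, default=False)

    def aggregate_create(session, values):
        # look the name up including soft-deleted rows (read_deleted='yes')
        aggregate = session.query(Aggregate).filter_by(
            name=values['name']).first()
        if not aggregate:
            aggregate = Aggregate(**values)
            session.add(aggregate)
        elif aggregate.deleted:
            # undelete the row instead of raising a name conflict
            aggregate.deleted = False
            aggregate.availability_zone = values['availability_zone']
        else:
            raise ValueError('aggregate name %s exists' % values['name'])
        session.commit()
        return aggregate

    engine = create_engine('sqlite://')
    Base.metadata.create_all(engine)
    with Session(engine) as session:
        a1 = aggregate_create(session,
                              {'name': 'agg1', 'availability_zone': 'az1'})
        a1.deleted = True                     # soft-delete
        session.commit()
        a2 = aggregate_create(session,
                              {'name': 'agg1', 'availability_zone': 'az2'})
        assert a1.id == a2.id and a2.availability_zone == 'az2'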

nova/compute/manager.py

@@ -2341,16 +2341,21 @@ class ComputeManager(manager.SchedulerDependentManager):
         try:
             self.driver.remove_from_aggregate(context,
                                               aggregate, host, **kwargs)
-        except exception.AggregateError:
+        except (exception.AggregateError,
+                exception.InvalidAggregateAction) as e:
             error = sys.exc_info()
-            self._undo_aggregate_operation(context, self.db.aggregate_host_add,
-                                           aggregate.id, host)
+            self._undo_aggregate_operation(
+                                context, self.db.aggregate_host_add,
+                                aggregate.id, host,
+                                isinstance(e, exception.AggregateError))
             raise error[0], error[1], error[2]

-    def _undo_aggregate_operation(self, context, op, aggregate_id, host):
+    def _undo_aggregate_operation(self, context, op, aggregate_id,
+                                  host, set_error=True):
         try:
-            status = {'operational_state': aggregate_states.ERROR}
-            self.db.aggregate_update(context, aggregate_id, status)
+            if set_error:
+                status = {'operational_state': aggregate_states.ERROR}
+                self.db.aggregate_update(context, aggregate_id, status)
             op(context, aggregate_id, host)
         except Exception:
             LOG.exception(_('Aggregate %(aggregate_id)s: unrecoverable state '
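
A note on the re-raise in the hunk above: "raise error[0], error[1], error[2]"
is Python 2's three-argument raise, which re-raises the original exception with
its traceback intact once the rollback has run. A runnable sketch of the same
idiom in its Python 3 spelling (stdlib only; the rollback body is a
placeholder):

    import sys

    def rollback():
        pass  # undo work (e.g. re-adding the host) would go here

    try:
        try:
            raise ValueError('boom')
        except ValueError:
            error = sys.exc_info()
            rollback()
            # Python 3 equivalent of: raise error[0], error[1], error[2]
            raise error[1].with_traceback(error[2])
    except ValueError as exc:
        assert exc.args == ('boom',)   # original exception survived intact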

nova/db/api.py

@@ -1793,6 +1793,11 @@ def aggregate_get(context, aggregate_id, read_deleted='no'):
     return IMPL.aggregate_get(context, aggregate_id, read_deleted)


+def aggregate_get_by_host(context, host, read_deleted='no'):
+    """Get a specific aggregate by host"""
+    return IMPL.aggregate_get_by_host(context, host, read_deleted)
+
+
 def aggregate_update(context, aggregate_id, values):
     """Update the attributes of an aggregates. If values contains a metadata
     key, it updates the aggregate metadata too."""

nova/db/sqlalchemy/api.py

@@ -4271,12 +4271,24 @@ def _aggregate_get_query(context, model_class, id_field, id,

 @require_admin_context
 def aggregate_create(context, values, metadata=None):
-    try:
+    session = get_session()
+    aggregate = _aggregate_get_query(context,
+                                     models.Aggregate,
+                                     models.Aggregate.name,
+                                     values['name'],
+                                     session=session,
+                                     read_deleted='yes').first()
+    if not aggregate:
         aggregate = models.Aggregate()
         values.setdefault('operational_state', aggregate_states.CREATED)
         aggregate.update(values)
-        aggregate.save()
-    except exception.DBError:
+        aggregate.save(session=session)
+    elif aggregate.deleted:
+        aggregate.update({'deleted': False,
+                          'deleted_at': None,
+                          'availability_zone': values['availability_zone']})
+        aggregate.save(session=session)
+    else:
         raise exception.AggregateNameExists(aggregate_name=values['name'])
     if metadata:
         aggregate_metadata_add(context, aggregate.id, metadata)

@@ -4296,6 +4308,20 @@ def aggregate_get(context, aggregate_id, read_deleted='no'):
     return aggregate


+@require_admin_context
+def aggregate_get_by_host(context, host, read_deleted='no'):
+    aggregate_host = _aggregate_get_query(context,
+                                          models.AggregateHost,
+                                          models.AggregateHost.host,
+                                          host,
+                                          read_deleted='no').first()
+
+    if not aggregate_host:
+        raise exception.AggregateHostNotFound(host=host)
+
+    return aggregate_get(context, aggregate_host.aggregate_id, read_deleted)
+
+
 @require_admin_context
 def aggregate_update(context, aggregate_id, values):
     session = get_session()

nova/tests/test_db_api.py

@@ -338,6 +338,15 @@ class AggregateDBApiTestCase(test.TestCase):
         result = _create_aggregate(metadata=None)
         self.assertEqual(result['operational_state'], 'created')

+    def test_aggregate_create_avoid_name_conflict(self):
+        """Test we can avoid conflict on deleted aggregates."""
+        r1 = _create_aggregate(metadata=None)
+        db.aggregate_delete(context.get_admin_context(), r1.id)
+        values = {'name': r1.name, 'availability_zone': 'new_zone'}
+        r2 = _create_aggregate(values=values)
+        self.assertEqual(r2.name, values['name'])
+        self.assertEqual(r2.availability_zone, values['availability_zone'])
+
     def test_aggregate_create_raise_exist_exc(self):
         """Ensure aggregate names are distinct."""
         _create_aggregate(metadata=None)

@@ -383,6 +392,20 @@ class AggregateDBApiTestCase(test.TestCase):
         self.assertEqual(_get_fake_aggr_hosts(), expected.hosts)
         self.assertEqual(_get_fake_aggr_metadata(), expected.metadetails)

+    def test_aggregate_get_by_host(self):
+        """Ensure we can get an aggregate by host."""
+        ctxt = context.get_admin_context()
+        r1 = _create_aggregate_with_hosts(context=ctxt)
+        r2 = db.aggregate_get_by_host(ctxt, 'foo.openstack.org')
+        self.assertEqual(r1.id, r2.id)
+
+    def test_aggregate_get_by_host_not_found(self):
+        """Ensure AggregateHostNotFound is raised with unknown host."""
+        ctxt = context.get_admin_context()
+        _create_aggregate_with_hosts(context=ctxt)
+        self.assertRaises(exception.AggregateHostNotFound,
+                          db.aggregate_get_by_host, ctxt, 'unknown_host')
+
     def test_aggregate_delete_raise_not_found(self):
         """Ensure AggregateNotFound is raised when deleting an aggregate."""
         ctxt = context.get_admin_context()

nova/tests/test_xenapi.py

@@ -1757,10 +1757,13 @@ class XenAPIAggregateTestCase(test.TestCase):
                    'Dom0IptablesFirewallDriver',
                    host='host')
         xenapi_fake.reset()
+        host_ref = xenapi_fake.get_all('host')[0]
         stubs.stubout_session(self.stubs, stubs.FakeSessionForVMTests)
         self.context = context.get_admin_context()
         self.conn = xenapi_conn.get_connection(False)
-        self.fake_metadata = {'master_compute': 'host'}
+        self.fake_metadata = {'master_compute': 'host',
+                              'host': xenapi_fake.get_record('host',
+                                                             host_ref)['uuid']}

     def tearDown(self):
         super(XenAPIAggregateTestCase, self).tearDown()

@@ -1871,7 +1874,7 @@ class XenAPIAggregateTestCase(test.TestCase):
         aggregate = self._aggregate_setup(aggr_state=aggregate_states.ACTIVE,
                                           hosts=['host', 'host2'],
                                           metadata=self.fake_metadata)
-        self.assertRaises(exception.AggregateError,
+        self.assertRaises(exception.InvalidAggregateAction,
                           self.conn._pool.remove_from_aggregate,
                           self.context, aggregate, "host")

nova/tests/xenapi/stubs.py

@@ -202,6 +202,12 @@ class FakeSessionForVMTests(fake.SessionBase):
         vm['is_a_template'] = False
         vm['is_control_domain'] = False
         vm['domid'] = random.randrange(1, 1 << 16)
+        return vm
+
+    def VM_start_on(self, _1, vm_ref, host_ref, _2, _3):
+        vm_rec = self.VM_start(_1, vm_ref, _2, _3)
+        host_rec = fake.get_record('host', host_ref)
+        vm_rec['resident_on'] = host_rec['uuid']

     def VM_snapshot(self, session_ref, vm_ref, label):
         status = "Running"

@@ -334,7 +340,7 @@ class FakeSessionForVolumeFailedTests(FakeSessionForVolumeTests):
         pass


-class FakeSessionForMigrationTests(fake.SessionBase):
+class FakeSessionForMigrationTests(FakeSessionForVMTests):
     """Stubs out a XenAPISession for Migration tests"""
     def __init__(self, uri):
         super(FakeSessionForMigrationTests, self).__init__(uri)

@@ -342,16 +348,6 @@ class FakeSessionForMigrationTests(fake.SessionBase):
     def VDI_get_by_uuid(self, *args):
         return 'hurr'

-    def VM_start(self, _1, ref, _2, _3):
-        vm = fake.get_record('VM', ref)
-        if vm['power_state'] != 'Halted':
-            raise fake.Failure(['VM_BAD_POWER_STATE', ref, 'Halted',
-                                vm['power_state']])
-        vm['power_state'] = 'Running'
-        vm['is_a_template'] = False
-        vm['is_control_domain'] = False
-        vm['domid'] = random.randrange(1, 1 << 16)
-
     def VM_set_name_label(self, *args):
         pass

nova/virt/xenapi/pool.py

@@ -61,11 +61,11 @@ class ResourcePool(object):
         if len(aggregate.hosts) == 1:
             # this is the first host of the pool -> make it master
             self._init_pool(aggregate.id, aggregate.name)
-            # save metadata so that we can find the master again:
-            # the password should be encrypted, really.
+            # save metadata so that we can find the master again
             values = {
                 'operational_state': aggregate_states.ACTIVE,
-                'metadata': {'master_compute': host},
+                'metadata': {'master_compute': host,
+                             host: self._host_uuid},
             }
             db.aggregate_update(context, aggregate.id, values)
         else:

@@ -85,7 +85,6 @@ class ResourcePool(object):
         elif master_compute and master_compute != host:
             # send rpc cast to master, asking to add the following
             # host with specified credentials.
-            # NOTE: password in clear is not great, but it'll do for now
             forward_request(context, "add_aggregate_host", master_compute,
                             aggregate.id, host,
                             self._host_addr, self._host_uuid)

@@ -104,15 +103,17 @@ class ResourcePool(object):
             # master is on its own, otherwise raise fault. Destroying a
             # pool made only by master is fictional
             if len(aggregate.hosts) > 1:
-                raise exception.AggregateError(
+                # NOTE: this could be avoided by doing a master
+                # re-election, but this is simpler for now.
+                raise exception.InvalidAggregateAction(
                     aggregate_id=aggregate.id,
                     action='remove_from_aggregate',
                     reason=_('Unable to eject %(host)s '
                              'from the pool; pool not empty')
                              % locals())
             self._clear_pool(aggregate.id)
-            db.aggregate_metadata_delete(context,
-                                         aggregate.id, 'master_compute')
+            for key in ['master_compute', host]:
+                db.aggregate_metadata_delete(context, aggregate.id, key)
         elif master_compute and master_compute != host:
             # A master exists -> forward pool-eject request to master
             forward_request(context, "remove_aggregate_host", master_compute,

@@ -194,6 +195,7 @@ def forward_request(context, request_type, master, aggregate_id,
     """Casts add/remove requests to the pool master."""
     # replace the address from the xenapi connection url
     # because this might be 169.254.0.1, i.e. xenapi
+    # NOTE: password in clear is not great, but it'll do for now
     sender_url = swap_xapi_host(FLAGS.xenapi_connection_url, slave_address)
     rpc.cast(context, db.queue_get_for(context, FLAGS.compute_topic, master),
              {"method": request_type,

nova/virt/xenapi/vmops.py

@@ -181,7 +181,9 @@ class VMOps(object):
             raise Exception(_('Attempted to power on non-existent instance'
                               ' bad instance id %s') % instance.id)
         LOG.debug(_("Starting instance %s"), instance.name)
-        self._session.call_xenapi('VM.start', vm_ref, False, False)
+        self._session.call_xenapi('VM.start_on', vm_ref,
+                                  self._session.get_xenapi_host(),
+                                  False, False)

     def _create_disks(self, context, instance, image_meta):
         disk_image_type = VMHelper.determine_disk_image_type(image_meta)
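
The switch from VM.start to VM.start_on above is what preserves the
scheduler's decision: plain VM.start lets the Xapi pool master pick any host,
while start_on pins the VM to the given one. A toy model of the difference
(illustrative only, not the real XenAPI bindings):

    class ToyPool(object):
        def __init__(self, hosts):
            self.hosts = hosts
            self.placements = {}

        def vm_start(self, vm):
            # the pool master is free to choose any host, e.g. the first one
            self.placements[vm] = self.hosts[0]

        def vm_start_on(self, vm, host):
            # honour the caller's (scheduler's) placement decision
            self.placements[vm] = host

    pool = ToyPool(['hostA', 'hostB'])
    pool.vm_start('vm1')
    pool.vm_start_on('vm2', 'hostB')
    assert pool.placements == {'vm1': 'hostA', 'vm2': 'hostB'}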

nova/virt/xenapi_conn.py

@@ -506,8 +506,10 @@ class XenAPISession(object):
     def __init__(self, url, user, pw):
         self.XenAPI = self.get_imported_xenapi()
         self._sessions = queue.Queue()
+        self.host_uuid = None
         exception = self.XenAPI.Failure(_("Unable to log in to XenAPI "
                                           "(is the Dom0 disk full?)"))
+        is_slave = False
         for i in xrange(FLAGS.xenapi_connection_concurrent):
             try:
                 session = self._create_session(url)

@@ -520,10 +522,21 @@ class XenAPISession(object):
                     session = self.XenAPI.Session(pool.swap_xapi_host(url,
                                                   master))
                     session.login_with_password(user, pw)
+                    is_slave = True
                 else:
                     raise
             self._sessions.put(session)

+        if is_slave:
+            try:
+                aggr = db.aggregate_get_by_host(context.get_admin_context(),
+                                                FLAGS.host)
+                self.host_uuid = aggr.metadetails[FLAGS.host]
+            except exception.AggregateHostNotFound:
+                LOG.exception(_('Host is member of a pool, but DB '
+                                'says otherwise'))
+                raise
+
     def get_product_version(self):
         """Return a tuple of (major, minor, rev) for the host version"""
         host = self.get_xenapi_host()

@@ -551,9 +564,12 @@ class XenAPISession(object):
             self._sessions.put(session)

     def get_xenapi_host(self):
-        """Return the xenapi host"""
+        """Return the xenapi host on which nova-compute runs on."""
         with self._get_session() as session:
-            return session.xenapi.session.get_this_host(session.handle)
+            if self.host_uuid:
+                return session.xenapi.host.get_by_uuid(self.host_uuid)
+            else:
+                return session.xenapi.session.get_this_host(session.handle)

     def call_xenapi(self, method, *args):
         """Call the specified XenAPI method on a background thread."""

@@ -578,6 +594,10 @@ class XenAPISession(object):
         # _get_session() acquires a session too, it can result in a deadlock
         # if multiple greenthreads race with each other. See bug 924918
         host = self.get_xenapi_host()
+        # NOTE(armando): pass the host uuid along with the args so that
+        # the plugin gets executed on the right host when using XS pools
+        if self.host_uuid:
+            args['host_uuid'] = self.host_uuid
         with self._get_session() as session:
             return tpool.execute(self._unwrap_plugin_exceptions,
                                  session.xenapi.Async.host.call_plugin,
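
The resolution order get_xenapi_host now follows: prefer the uuid recorded in
aggregate metadata (the pool-slave case), otherwise fall back to the session's
own notion of 'this host'. A compact runnable sketch, with illustrative names:

    class ToySession(object):
        """Resolve the compute host the way XenAPISession now does."""
        def __init__(self, host_uuid=None):
            self.host_uuid = host_uuid  # set from aggregate metadata on slaves

        def get_xenapi_host(self):
            if self.host_uuid:
                return ('host.get_by_uuid', self.host_uuid)
            return ('session.get_this_host', 'local-handle')

    assert ToySession('u42').get_xenapi_host() == ('host.get_by_uuid', 'u42')
    assert ToySession().get_xenapi_host()[0] == 'session.get_this_host'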

plugins/xenserver/xenapi/etc/xapi.d/plugins/xenhost

@@ -117,9 +117,9 @@ def _resume_compute(session, compute_ref, compute_uuid):
                 logging.exception('Waited %d seconds for the slave to '
                                   'become available.' % (c * DEFAULT_SLEEP))
                 time.sleep(DEFAULT_SLEEP)
-                raise pluginlib.PluginError('Unrecoverable error: the host has '
-                                            'not come back for more than %d seconds'
-                                            % (DEFAULT_SLEEP * (DEFAULT_TRIES + 1)))
+        raise pluginlib.PluginError('Unrecoverable error: the host has '
+                                    'not come back for more than %d seconds'
+                                    % (DEFAULT_SLEEP * (DEFAULT_TRIES + 1)))


 def _get_host_uuid():

@@ -315,7 +315,7 @@ def host_data(self, arg_dict):
     """Runs the commands on the xenstore host to return the current status
     information.
     """
-    host_uuid = _get_host_uuid()
+    host_uuid = arg_dict.get('host_uuid', _get_host_uuid())
     cmd = "xe host-param-list uuid=%s" % host_uuid
     resp = _run_command(cmd)
     parsed_data = parse_response(resp)
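
One caveat on the host_data change above: dict.get evaluates its default
eagerly, so _get_host_uuid() is still executed even when 'host_uuid' was
passed in, its result merely discarded. Harmless here, but a lazy variant
would look like this (sketch; names are illustrative):

    def host_uuid_for(arg_dict, compute_uuid):
        # only fall back to the local lookup when the caller sent nothing
        uuid = arg_dict.get('host_uuid')
        return uuid if uuid is not None else compute_uuid()

    assert host_uuid_for({'host_uuid': 'u1'}, lambda: 'local') == 'u1'
    assert host_uuid_for({}, lambda: 'local') == 'local'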