Merge "blueprint host-aggregates: xenapi implementation"

This commit is contained in:
Jenkins 2012-02-22 03:22:30 +00:00 committed by Gerrit Code Review
commit c4ff7ef07c
15 changed files with 749 additions and 112 deletions

View File

@ -63,17 +63,18 @@ class AggregateController(object):
raise exc.HTTPBadRequest
try:
host_aggregate = body["aggregate"]
aggregate_name = host_aggregate["name"]
availability_zone = host_aggregate["availability_zone"]
name = host_aggregate["name"]
avail_zone = host_aggregate["availability_zone"]
except KeyError:
raise exc.HTTPBadRequest
if len(host_aggregate) != 2:
raise exc.HTTPBadRequest
try:
aggregate = self.api.create_aggregate(context, aggregate_name,
availability_zone)
aggregate = self.api.create_aggregate(context, name, avail_zone)
except exception.AggregateNameExists:
LOG.exception(_("Cannot create aggregate with name %(name)s and "
"availability zone %(avail_zone)s") % locals())
raise exc.HTTPConflict
return self._marshall_aggregate(aggregate)
@ -84,6 +85,7 @@ class AggregateController(object):
try:
aggregate = self.api.get_aggregate(context, id)
except exception.AggregateNotFound:
LOG.exception(_("Cannot show aggregate: %(id)s") % locals())
raise exc.HTTPNotFound
return self._marshall_aggregate(aggregate)
@ -91,7 +93,6 @@ class AggregateController(object):
"""Updates the name and/or availbility_zone of given aggregate."""
context = _get_context(req)
authorize(context)
aggregate = id
if len(body) != 1:
raise exc.HTTPBadRequest
@ -108,8 +109,9 @@ class AggregateController(object):
raise exc.HTTPBadRequest
try:
aggregate = self.api.update_aggregate(context, aggregate, updates)
aggregate = self.api.update_aggregate(context, id, updates)
except exception.AggregateNotFound:
LOG.exception(_("Cannot update aggregate: %(id)s") % locals())
raise exc.HTTPNotFound
return self._marshall_aggregate(aggregate)
@ -118,10 +120,10 @@ class AggregateController(object):
"""Removes an aggregate by id."""
context = _get_context(req)
authorize(context)
aggregate_id = id
try:
self.api.delete_aggregate(context, aggregate_id)
self.api.delete_aggregate(context, id)
except exception.AggregateNotFound:
LOG.exception(_("Cannot delete aggregate: %(id)s") % locals())
raise exc.HTTPNotFound
def action(self, req, id, body):
@ -144,19 +146,17 @@ class AggregateController(object):
"""Adds a host to the specified aggregate."""
context = _get_context(req)
authorize(context)
aggregate = id
try:
aggregate = self.api.add_host_to_aggregate(context,
aggregate, host)
except exception.AggregateNotFound:
aggregate = self.api.add_host_to_aggregate(context, id, host)
except (exception.AggregateNotFound, exception.ComputeHostNotFound):
LOG.exception(_("Cannot add host %(host)s in aggregate "
"%(id)s") % locals())
raise exc.HTTPNotFound
except exception.ComputeHostNotFound:
raise exc.HTTPNotFound
except exception.AggregateHostConflict:
raise exc.HTTPConflict
except exception.AggregateHostExists:
raise exc.HTTPConflict
except exception.InvalidAggregateAction:
except (exception.AggregateHostConflict,
exception.AggregateHostExists,
exception.InvalidAggregateAction):
LOG.exception(_("Cannot add host %(host)s in aggregate "
"%(id)s") % locals())
raise exc.HTTPConflict
return self._marshall_aggregate(aggregate)
@ -165,15 +165,15 @@ class AggregateController(object):
"""Removes a host from the specified aggregate."""
context = _get_context(req)
authorize(context)
aggregate = id
try:
aggregate = self.api.remove_host_from_aggregate(context,
aggregate, host)
except exception.AggregateNotFound:
raise exc.HTTPNotFound
except exception.AggregateHostNotFound:
aggregate = self.api.remove_host_from_aggregate(context, id, host)
except (exception.AggregateNotFound, exception.AggregateHostNotFound):
LOG.exception(_("Cannot remove host %(host)s in aggregate "
"%(id)s") % locals())
raise exc.HTTPNotFound
except exception.InvalidAggregateAction:
LOG.exception(_("Cannot remove host %(host)s in aggregate "
"%(id)s") % locals())
raise exc.HTTPConflict
return self._marshall_aggregate(aggregate)
@ -181,20 +181,19 @@ class AggregateController(object):
"""Replaces the aggregate's existing metadata with new metadata."""
context = _get_context(req)
authorize(context)
aggregate = id
if len(body) != 1:
raise exc.HTTPBadRequest
try:
metadata = body["metadata"]
except KeyError:
raise exc.HTTPBadRequest
try:
aggregate = self.api.update_aggregate_metadata(context,
aggregate, metadata)
id, metadata)
except exception.AggregateNotFound:
LOG.exception(_("Cannot set metadata %(metadata)s in aggregate "
"%(id)s") % locals())
raise exc.HTTPNotFound
return self._marshall_aggregate(aggregate)

View File

@ -1706,10 +1706,18 @@ class AggregateAPI(base.Base):
def create_aggregate(self, context, aggregate_name, availability_zone):
"""Creates the model for the aggregate."""
values = {"name": aggregate_name,
"availability_zone": availability_zone}
aggregate = self.db.aggregate_create(context, values)
return dict(aggregate.iteritems())
zones = [s.availability_zone for s in
self.db.service_get_all_by_topic(context,
FLAGS.compute_topic)]
if availability_zone in zones:
values = {"name": aggregate_name,
"availability_zone": availability_zone}
aggregate = self.db.aggregate_create(context, values)
return dict(aggregate.iteritems())
else:
raise exception.InvalidAggregateAction(action='create_aggregate',
aggregate_id="'N/A'",
reason='invalid zone')
def get_aggregate(self, context, aggregate_id):
"""Get an aggregate by id."""
@ -1805,7 +1813,8 @@ class AggregateAPI(base.Base):
"host": host}, })
return self.get_aggregate(context, aggregate_id)
else:
invalid = {aggregate_states.CHANGING: 'setup in progress',
invalid = {aggregate_states.CREATED: 'no hosts to remove',
aggregate_states.CHANGING: 'setup in progress',
aggregate_states.DISMISSED: 'aggregate deleted', }
if aggregate.operational_state in invalid.keys():
raise exception.InvalidAggregateAction(

View File

@ -46,6 +46,7 @@ from eventlet import greenthread
from nova import block_device
import nova.context
from nova.compute import aggregate_states
from nova.compute import instance_types
from nova.compute import power_state
from nova.compute import task_states
@ -2320,13 +2321,40 @@ class ComputeManager(manager.SchedulerDependentManager):
LOG.error(msg % error)
self._set_instance_error_state(context, instance_uuid)
def add_aggregate_host(self, context, aggregate_id, host):
@exception.wrap_exception(notifier=notifier, publisher_id=publisher_id())
def add_aggregate_host(self, context, aggregate_id, host, **kwargs):
"""Adds a host to a physical hypervisor pool."""
raise NotImplementedError()
aggregate = self.db.aggregate_get(context, aggregate_id)
try:
self.driver.add_to_aggregate(context, aggregate, host, **kwargs)
except exception.AggregateError:
error = sys.exc_info()
self._undo_aggregate_operation(context,
self.db.aggregate_host_delete,
aggregate.id, host)
raise error[0], error[1], error[2]
def remove_aggregate_host(self, context, aggregate_id, host):
@exception.wrap_exception(notifier=notifier, publisher_id=publisher_id())
def remove_aggregate_host(self, context, aggregate_id, host, **kwargs):
"""Removes a host from a physical hypervisor pool."""
raise NotImplementedError()
aggregate = self.db.aggregate_get(context, aggregate_id)
try:
self.driver.remove_from_aggregate(context,
aggregate, host, **kwargs)
except exception.AggregateError:
error = sys.exc_info()
self._undo_aggregate_operation(context, self.db.aggregate_host_add,
aggregate.id, host)
raise error[0], error[1], error[2]
def _undo_aggregate_operation(self, context, op, aggregate_id, host):
try:
status = {'operational_state': aggregate_states.ERROR}
self.db.aggregate_update(context, aggregate_id, status)
op(context, aggregate_id, host)
except Exception:
LOG.exception(_('Aggregate %(aggregate_id)s: unrecoverable state '
'during operation on %(host)s') % locals())
@manager.periodic_task(
ticks_between_runs=FLAGS.image_cache_manager_interval)

View File

@ -4273,8 +4273,8 @@ def _aggregate_get_query(context, model_class, id_field, id,
def aggregate_create(context, values, metadata=None):
try:
aggregate = models.Aggregate()
values.setdefault('operational_state', aggregate_states.CREATED)
aggregate.update(values)
aggregate.operational_state = aggregate_states.CREATED
aggregate.save()
except exception.DBError:
raise exception.AggregateNameExists(aggregate_name=values['name'])
@ -4409,8 +4409,7 @@ def aggregate_metadata_add(context, aggregate_id, metadata, set_delete=False):
meta_ref = aggregate_metadata_get_item(context, aggregate_id,
meta_key, session)
if meta_ref.deleted:
item.update({'deleted': False, 'deleted_at': None,
'updated_at': literal_column('updated_at')})
item.update({'deleted': False, 'deleted_at': None})
except exception.AggregateMetadataNotFound:
meta_ref = models.AggregateMetadata()
item.update({"key": meta_key, "aggregate_id": aggregate_id})
@ -4469,9 +4468,7 @@ def aggregate_host_add(context, aggregate_id, host):
except exception.DBError:
raise exception.AggregateHostConflict(host=host)
elif host_ref.deleted:
host_ref.update({'deleted': False,
'deleted_at': None,
'updated_at': literal_column('updated_at')})
host_ref.update({'deleted': False, 'deleted_at': None})
host_ref.save(session=session)
else:
raise exception.AggregateHostExists(host=host,

View File

@ -885,26 +885,12 @@ class Zone(BASE, NovaBase):
rpc_virtual_host = Column(String(255))
class Aggregate(BASE, NovaBase):
"""Represents a cluster of hosts that exists in this zone."""
__tablename__ = 'aggregates'
id = Column(Integer, primary_key=True, autoincrement=True)
name = Column(String(255), unique=True)
operational_state = Column(String(255), nullable=False)
availability_zone = Column(String(255), nullable=False)
class AggregateHost(BASE, NovaBase):
"""Represents a host that is member of an aggregate."""
__tablename__ = 'aggregate_hosts'
id = Column(Integer, primary_key=True, autoincrement=True)
host = Column(String(255), unique=True)
aggregate_id = Column(Integer, ForeignKey('aggregates.id'), nullable=False)
aggregate = relationship(Aggregate, backref=backref('aggregates'),
foreign_keys=aggregate_id,
primaryjoin='and_('
'AggregateHost.aggregate_id == Aggregate.id,'
'AggregateHost.deleted == False)')
class AggregateMetadata(BASE, NovaBase):
@ -914,11 +900,46 @@ class AggregateMetadata(BASE, NovaBase):
key = Column(String(255), nullable=False)
value = Column(String(255), nullable=False)
aggregate_id = Column(Integer, ForeignKey('aggregates.id'), nullable=False)
aggregate = relationship(Aggregate, backref="metadata",
foreign_keys=aggregate_id,
primaryjoin='and_('
'AggregateMetadata.aggregate_id == Aggregate.id,'
'AggregateMetadata.deleted == False)')
class Aggregate(BASE, NovaBase):
"""Represents a cluster of hosts that exists in this zone."""
__tablename__ = 'aggregates'
id = Column(Integer, primary_key=True, autoincrement=True)
name = Column(String(255), unique=True)
operational_state = Column(String(255), nullable=False)
availability_zone = Column(String(255), nullable=False)
_hosts = relationship(AggregateHost,
secondary="aggregate_hosts",
primaryjoin='and_('
'Aggregate.id == AggregateHost.aggregate_id,'
'AggregateHost.deleted == False,'
'Aggregate.deleted == False)',
secondaryjoin='and_('
'AggregateHost.aggregate_id == Aggregate.id, '
'AggregateHost.deleted == False,'
'Aggregate.deleted == False)',
backref='aggregates')
_metadata = relationship(AggregateMetadata,
secondary="aggregate_metadata",
primaryjoin='and_('
'Aggregate.id == AggregateMetadata.aggregate_id,'
'AggregateMetadata.deleted == False,'
'Aggregate.deleted == False)',
secondaryjoin='and_('
'AggregateMetadata.aggregate_id == Aggregate.id, '
'AggregateMetadata.deleted == False,'
'Aggregate.deleted == False)',
backref='aggregates')
@property
def hosts(self):
return [h.host for h in self._hosts]
@property
def metadetails(self):
return dict([(m.key, m.value) for m in self._metadata])
class AgentBuild(BASE, NovaBase):
@ -997,15 +1018,41 @@ def register_models():
connection is lost and needs to be reestablished.
"""
from sqlalchemy import create_engine
models = (Service, Instance, InstanceActions, InstanceTypes,
Volume, IscsiTarget, FixedIp, FloatingIp,
Network, SecurityGroup, SecurityGroupIngressRule,
SecurityGroupInstanceAssociation, AuthToken, User,
Project, Certificate, ConsolePool, Console, Zone,
VolumeMetadata, VolumeTypes, VolumeTypeExtraSpecs,
AgentBuild, InstanceMetadata, InstanceTypeExtraSpecs, Migration,
VirtualStorageArray, SMFlavors, SMBackendConf, SMVolume,
InstanceFault)
models = (AgentBuild,
Aggregate,
AggregateHost,
AggregateMetadata,
AuthToken,
Certificate,
Console,
ConsolePool,
FixedIp,
FloatingIp,
Instance,
InstanceActions,
InstanceFault,
InstanceMetadata,
InstanceTypeExtraSpecs,
InstanceTypes,
IscsiTarget,
Migration,
Network,
Project,
SecurityGroup,
SecurityGroupIngressRule,
SecurityGroupInstanceAssociation,
Service,
SMBackendConf,
SMFlavors,
SMVolume,
User,
VirtualStorageArray,
Volume,
VolumeMetadata,
VolumeTypeExtraSpecs,
VolumeTypes,
Zone,
)
engine = create_engine(FLAGS.sql_connection, echo=False)
for model in models:
model.metadata.create_all(engine)

View File

@ -929,6 +929,11 @@ class QuotaError(NovaException):
message = _("Quota exceeded") + ": code=%(code)s"
class AggregateError(NovaException):
message = _("Aggregate %(aggregate_id)s: action '%(action)s' "
"caused an error: %(reason)s.")
class AggregateNotFound(NotFound):
message = _("Aggregate %(aggregate_id)s could not be found.")

View File

@ -3153,6 +3153,8 @@ def _create_service_entries(context, values={'avail_zone1': ['fake_host1',
class ComputeAPIAggrTestCase(test.TestCase):
"""This is for unit coverage of aggregate-related methods
defined in nova.compute.api."""
def setUp(self):
super(ComputeAPIAggrTestCase, self).setUp()
@ -3164,9 +3166,16 @@ class ComputeAPIAggrTestCase(test.TestCase):
def tearDown(self):
super(ComputeAPIAggrTestCase, self).tearDown()
def test_create_invalid_availability_zone(self):
"""Ensure InvalidAggregateAction is raised with wrong avail_zone."""
self.assertRaises(exception.InvalidAggregateAction,
self.api.create_aggregate,
self.context, 'fake_aggr', 'fake_avail_zone')
def test_update_aggregate_metadata(self):
_create_service_entries(self.context, {'fake_zone': ['fake_host']})
aggr = self.api.create_aggregate(self.context, 'fake_aggregate',
'fake_availability_zone')
'fake_zone')
metadata = {'foo_key1': 'foo_value1',
'foo_key2': 'foo_value2', }
aggr = self.api.update_aggregate_metadata(self.context, aggr['id'],
@ -3178,8 +3187,9 @@ class ComputeAPIAggrTestCase(test.TestCase):
def test_delete_aggregate(self):
"""Ensure we can delete an aggregate."""
_create_service_entries(self.context, {'fake_zone': ['fake_host']})
aggr = self.api.create_aggregate(self.context, 'fake_aggregate',
'fake_availability_zone')
'fake_zone')
self.api.delete_aggregate(self.context, aggr['id'])
expected = db.aggregate_get(self.context, aggr['id'],
read_deleted='yes')
@ -3188,10 +3198,10 @@ class ComputeAPIAggrTestCase(test.TestCase):
def test_delete_non_empty_aggregate(self):
"""Ensure InvalidAggregateAction is raised when non empty aggregate."""
aggr = self.api.create_aggregate(self.context, 'fake_aggregate',
'fake_availability_zone')
_create_service_entries(self.context,
{'fake_availability_zone': ['fake_host']})
aggr = self.api.create_aggregate(self.context, 'fake_aggregate',
'fake_availability_zone')
self.api.add_host_to_aggregate(self.context, aggr['id'], 'fake_host')
self.assertRaises(exception.InvalidAggregateAction,
self.api.delete_aggregate, self.context, aggr['id'])
@ -3242,9 +3252,9 @@ class ComputeAPIAggrTestCase(test.TestCase):
def test_add_host_to_aggregate_invalid_dismissed_status(self):
"""Ensure InvalidAggregateAction is raised when aggregate is
deleted."""
_create_service_entries(self.context, {'fake_zone': ['fake_host']})
aggr = self.api.create_aggregate(self.context,
'fake_aggregate', 'fake_zone')
_create_service_entries(self.context, {'fake_zone': ['fake_host']})
# let's mock the fact that the aggregate is dismissed!
status = {'operational_state': aggregate_states.DISMISSED}
db.aggregate_update(self.context, aggr['id'], status)
@ -3255,9 +3265,9 @@ class ComputeAPIAggrTestCase(test.TestCase):
def test_add_host_to_aggregate_invalid_error_status(self):
"""Ensure InvalidAggregateAction is raised when aggregate is
in error."""
_create_service_entries(self.context, {'fake_zone': ['fake_host']})
aggr = self.api.create_aggregate(self.context,
'fake_aggregate', 'fake_zone')
_create_service_entries(self.context, {'fake_zone': ['fake_host']})
# let's mock the fact that the aggregate is in error!
status = {'operational_state': aggregate_states.ERROR}
db.aggregate_update(self.context, aggr['id'], status)
@ -3267,17 +3277,19 @@ class ComputeAPIAggrTestCase(test.TestCase):
def test_add_host_to_aggregate_zones_mismatch(self):
"""Ensure InvalidAggregateAction is raised when zones don't match."""
_create_service_entries(self.context, {'fake_zoneX': ['fake_host']})
_create_service_entries(self.context, {'fake_zoneX': ['fake_host1'],
'fake_zoneY': ['fake_host2']})
aggr = self.api.create_aggregate(self.context,
'fake_aggregate', 'fake_zoneY')
self.assertRaises(exception.InvalidAggregateAction,
self.api.add_host_to_aggregate,
self.context, aggr['id'], 'fake_host')
self.context, aggr['id'], 'fake_host1')
def test_add_host_to_aggregate_raise_not_found(self):
"""Ensure ComputeHostNotFound is raised when adding invalid host."""
_create_service_entries(self.context, {'fake_zone': ['fake_host']})
aggr = self.api.create_aggregate(self.context, 'fake_aggregate',
'fake_availability_zone')
'fake_zone')
self.assertRaises(exception.ComputeHostNotFound,
self.api.add_host_to_aggregate,
self.context, aggr['id'], 'invalid_host')
@ -3325,9 +3337,9 @@ class ComputeAPIAggrTestCase(test.TestCase):
def test_remove_host_from_aggregate_invalid_dismissed_status(self):
"""Ensure InvalidAggregateAction is raised when aggregate is
deleted."""
_create_service_entries(self.context, {'fake_zone': ['fake_host']})
aggr = self.api.create_aggregate(self.context,
'fake_aggregate', 'fake_zone')
_create_service_entries(self.context, {'fake_zone': ['fake_host']})
# let's mock the fact that the aggregate is dismissed!
status = {'operational_state': aggregate_states.DISMISSED}
db.aggregate_update(self.context, aggr['id'], status)
@ -3338,9 +3350,9 @@ class ComputeAPIAggrTestCase(test.TestCase):
def test_remove_host_from_aggregate_invalid_changing_status(self):
"""Ensure InvalidAggregateAction is raised when aggregate is
changing."""
_create_service_entries(self.context, {'fake_zone': ['fake_host']})
aggr = self.api.create_aggregate(self.context,
'fake_aggregate', 'fake_zone')
_create_service_entries(self.context, {'fake_zone': ['fake_host']})
# let's mock the fact that the aggregate is changing!
status = {'operational_state': aggregate_states.CHANGING}
db.aggregate_update(self.context, aggr['id'], status)
@ -3350,13 +3362,85 @@ class ComputeAPIAggrTestCase(test.TestCase):
def test_remove_host_from_aggregate_raise_not_found(self):
"""Ensure ComputeHostNotFound is raised when removing invalid host."""
_create_service_entries(self.context, {'fake_zone': ['fake_host']})
aggr = self.api.create_aggregate(self.context, 'fake_aggregate',
'fake_availability_zone')
'fake_zone')
self.assertRaises(exception.ComputeHostNotFound,
self.api.remove_host_from_aggregate,
self.context, aggr['id'], 'invalid_host')
class ComputeAggrTestCase(BaseTestCase):
"""This is for unit coverage of aggregate-related methods
defined in nova.compute.manager."""
def setUp(self):
super(ComputeAggrTestCase, self).setUp()
self.context = context.get_admin_context()
values = {'name': 'test_aggr',
'availability_zone': 'test_zone', }
self.aggr = db.aggregate_create(self.context, values)
def tearDown(self):
super(ComputeAggrTestCase, self).tearDown()
def test_add_aggregate_host(self):
def fake_driver_add_to_aggregate(context, aggregate, host):
fake_driver_add_to_aggregate.called = True
return {"foo": "bar"}
self.stubs.Set(self.compute.driver, "add_to_aggregate",
fake_driver_add_to_aggregate)
self.compute.add_aggregate_host(self.context, self.aggr.id, "host")
self.assertTrue(fake_driver_add_to_aggregate.called)
def test_add_aggregate_host_raise_err(self):
"""Ensure the undo operation works correctly on add."""
def fake_driver_add_to_aggregate(context, aggregate, host):
raise exception.AggregateError
self.stubs.Set(self.compute.driver, "add_to_aggregate",
fake_driver_add_to_aggregate)
state = {'operational_state': aggregate_states.ACTIVE}
db.aggregate_update(self.context, self.aggr.id, state)
db.aggregate_host_add(self.context, self.aggr.id, 'fake_host')
self.assertRaises(exception.AggregateError,
self.compute.add_aggregate_host,
self.context, self.aggr.id, "fake_host")
excepted = db.aggregate_get(self.context, self.aggr.id)
self.assertEqual(excepted.operational_state, aggregate_states.ERROR)
self.assertEqual(excepted.hosts, [])
def test_remove_aggregate_host(self):
def fake_driver_remove_from_aggregate(context, aggregate, host):
fake_driver_remove_from_aggregate.called = True
self.assertEqual("host", host, "host")
return {"foo": "bar"}
self.stubs.Set(self.compute.driver, "remove_from_aggregate",
fake_driver_remove_from_aggregate)
self.compute.remove_aggregate_host(self.context, self.aggr.id, "host")
self.assertTrue(fake_driver_remove_from_aggregate.called)
def test_remove_aggregate_host_raise_err(self):
"""Ensure the undo operation works correctly on remove."""
def fake_driver_remove_from_aggregate(context, aggregate, host):
raise exception.AggregateError
self.stubs.Set(self.compute.driver, "remove_from_aggregate",
fake_driver_remove_from_aggregate)
state = {'operational_state': aggregate_states.ACTIVE}
db.aggregate_update(self.context, self.aggr.id, state)
self.assertRaises(exception.AggregateError,
self.compute.remove_aggregate_host,
self.context, self.aggr.id, "fake_host")
excepted = db.aggregate_get(self.context, self.aggr.id)
self.assertEqual(excepted.operational_state, aggregate_states.ERROR)
self.assertEqual(excepted.hosts, ['fake_host'])
class ComputePolicyTestCase(BaseTestCase):
def setUp(self):

View File

@ -267,6 +267,30 @@ class DbApiTestCase(test.TestCase):
expected = {uuids[0]: [], uuids[1]: []}
self.assertEqual(expected, instance_faults)
def test_dns_registration(self):
domain1 = 'test.domain.one'
domain2 = 'test.domain.two'
testzone = 'testzone'
ctxt = context.get_admin_context()
db.dnsdomain_register_for_zone(ctxt, domain1, testzone)
domain_ref = db.dnsdomain_get(ctxt, domain1)
zone = domain_ref.availability_zone
scope = domain_ref.scope
self.assertEqual(scope, 'private')
self.assertEqual(zone, testzone)
db.dnsdomain_register_for_project(ctxt, domain2,
self.project_id)
domain_ref = db.dnsdomain_get(ctxt, domain2)
project = domain_ref.project_id
scope = domain_ref.scope
self.assertEqual(project, self.project_id)
self.assertEqual(scope, 'public')
db.dnsdomain_unregister(ctxt, domain1)
db.dnsdomain_unregister(ctxt, domain2)
def _get_fake_aggr_values():
return {'name': 'fake_aggregate',
@ -351,6 +375,14 @@ class AggregateDBApiTestCase(test.TestCase):
db.aggregate_create,
self.context, _get_fake_aggr_values())
def test_aggregate_get(self):
"""Ensure we can get aggregate with all its relations."""
ctxt = context.get_admin_context()
result = _create_aggregate_with_hosts(context=ctxt)
expected = db.aggregate_get(ctxt, result.id)
self.assertEqual(_get_fake_aggr_hosts(), expected.hosts)
self.assertEqual(_get_fake_aggr_metadata(), expected.metadetails)
def test_aggregate_delete_raise_not_found(self):
"""Ensure AggregateNotFound is raised when deleting an aggregate."""
ctxt = context.get_admin_context()
@ -541,30 +573,6 @@ class AggregateDBApiTestCase(test.TestCase):
db.aggregate_host_delete,
ctxt, result.id, _get_fake_aggr_hosts()[0])
def test_dns_registration(self):
domain1 = 'test.domain.one'
domain2 = 'test.domain.two'
testzone = 'testzone'
ctxt = context.get_admin_context()
db.dnsdomain_register_for_zone(ctxt, domain1, testzone)
domain_ref = db.dnsdomain_get(ctxt, domain1)
zone = domain_ref.availability_zone
scope = domain_ref.scope
self.assertEqual(scope, 'private')
self.assertEqual(zone, testzone)
db.dnsdomain_register_for_project(ctxt, domain2,
self.project_id)
domain_ref = db.dnsdomain_get(ctxt, domain2)
project = domain_ref.project_id
scope = domain_ref.scope
self.assertEqual(project, self.project_id)
self.assertEqual(scope, 'public')
db.dnsdomain_unregister(ctxt, domain1)
db.dnsdomain_unregister(ctxt, domain2)
class CapacityTestCase(test.TestCase):
def setUp(self):

View File

@ -401,6 +401,14 @@ class _VirtDriverTestCase(test.TestCase):
def test_host_power_action_startup(self):
self.connection.host_power_action('a useless argument?', 'startup')
@catch_notimplementederror
def test_add_to_aggregate(self):
self.connection.add_to_aggregate(self.ctxt, 'aggregate', 'host')
@catch_notimplementederror
def test_remove_from_aggregate(self):
self.connection.remove_from_aggregate(self.ctxt, 'aggregate', 'host')
class AbstractDriverTestCase(_VirtDriverTestCase):
def setUp(self):

View File

@ -31,6 +31,7 @@ from nova import flags
from nova import log as logging
from nova import test
from nova import utils
from nova.compute import aggregate_states
from nova.compute import instance_types
from nova.compute import power_state
from nova import exception
@ -1741,3 +1742,149 @@ class XenAPISRSelectionTestCase(test.TestCase):
expected = helper.safe_find_sr(session)
self.assertEqual(session.call_xenapi('pool.get_default_SR', pool_ref),
expected)
class XenAPIAggregateTestCase(test.TestCase):
"""Unit tests for aggregate operations."""
def setUp(self):
super(XenAPIAggregateTestCase, self).setUp()
self.stubs = stubout.StubOutForTesting()
self.flags(xenapi_connection_url='http://test_url',
xenapi_connection_username='test_user',
xenapi_connection_password='test_pass',
instance_name_template='%d',
firewall_driver='nova.virt.xenapi.firewall.'
'Dom0IptablesFirewallDriver',
host='host')
xenapi_fake.reset()
stubs.stubout_session(self.stubs, stubs.FakeSessionForVMTests)
self.context = context.get_admin_context()
self.conn = xenapi_conn.get_connection(False)
self.fake_metadata = {'master_compute': 'host'}
def tearDown(self):
super(XenAPIAggregateTestCase, self).tearDown()
self.stubs.UnsetAll()
def test_add_to_aggregate_called(self):
def fake_add_to_aggregate(context, aggregate, host):
fake_add_to_aggregate.called = True
self.stubs.Set(self.conn._pool,
"add_to_aggregate",
fake_add_to_aggregate)
self.conn.add_to_aggregate(None, None, None)
self.assertTrue(fake_add_to_aggregate.called)
def test_add_to_aggregate_for_first_host_sets_metadata(self):
def fake_init_pool(id, name):
fake_init_pool.called = True
self.stubs.Set(self.conn._pool, "_init_pool", fake_init_pool)
aggregate = self._aggregate_setup()
self.conn._pool.add_to_aggregate(self.context, aggregate, "host")
result = db.aggregate_get(self.context, aggregate.id)
self.assertTrue(fake_init_pool.called)
self.assertDictMatch(self.fake_metadata, result.metadetails)
self.assertEqual(aggregate_states.ACTIVE, result.operational_state)
def test_join_slave(self):
"""Ensure join_slave gets called when the request gets to master."""
def fake_join_slave(id, compute_uuid, host, url, user, password):
fake_join_slave.called = True
self.stubs.Set(self.conn._pool, "_join_slave", fake_join_slave)
aggregate = self._aggregate_setup(hosts=['host', 'host2'],
metadata=self.fake_metadata)
self.conn._pool.add_to_aggregate(self.context, aggregate, "host2",
compute_uuid='fake_uuid',
url='fake_url',
user='fake_user',
passwd='fake_pass',
xenhost_uuid='fake_uuid')
self.assertTrue(fake_join_slave.called)
def test_add_to_aggregate_first_host(self):
def fake_pool_set_name_label(self, session, pool_ref, name):
fake_pool_set_name_label.called = True
self.stubs.Set(xenapi_fake.SessionBase, "pool_set_name_label",
fake_pool_set_name_label)
self.conn._session.call_xenapi("pool.create", {"name": "asdf"})
values = {"name": 'fake_aggregate',
"availability_zone": 'fake_zone'}
result = db.aggregate_create(self.context, values)
db.aggregate_host_add(self.context, result.id, "host")
aggregate = db.aggregate_get(self.context, result.id)
self.assertEqual(["host"], aggregate.hosts)
self.assertEqual({}, aggregate.metadetails)
self.conn._pool.add_to_aggregate(self.context, aggregate, "host")
self.assertTrue(fake_pool_set_name_label.called)
def test_remove_from_aggregate_called(self):
def fake_remove_from_aggregate(context, aggregate, host):
fake_remove_from_aggregate.called = True
self.stubs.Set(self.conn._pool,
"remove_from_aggregate",
fake_remove_from_aggregate)
self.conn.remove_from_aggregate(None, None, None)
self.assertTrue(fake_remove_from_aggregate.called)
def test_remove_from_empty_aggregate(self):
values = {"name": 'fake_aggregate',
"availability_zone": 'fake_zone'}
result = db.aggregate_create(self.context, values)
self.assertRaises(exception.AggregateError,
self.conn._pool.remove_from_aggregate,
None, result, "test_host")
def test_remove_slave(self):
"""Ensure eject slave gets called."""
def fake_eject_slave(id, compute_uuid, host_uuid):
fake_eject_slave.called = True
self.stubs.Set(self.conn._pool, "_eject_slave", fake_eject_slave)
self.fake_metadata['host2'] = 'fake_host2_uuid'
aggregate = self._aggregate_setup(hosts=['host', 'host2'],
metadata=self.fake_metadata)
self.conn._pool.remove_from_aggregate(self.context, aggregate, "host2")
self.assertTrue(fake_eject_slave.called)
def test_remove_master_solo(self):
"""Ensure metadata are cleared after removal."""
def fake_clear_pool(id):
fake_clear_pool.called = True
self.stubs.Set(self.conn._pool, "_clear_pool", fake_clear_pool)
aggregate = self._aggregate_setup(aggr_state=aggregate_states.ACTIVE,
metadata=self.fake_metadata)
self.conn._pool.remove_from_aggregate(self.context, aggregate, "host")
result = db.aggregate_get(self.context, aggregate.id)
self.assertTrue(fake_clear_pool.called)
self.assertDictMatch({}, result.metadetails)
self.assertEqual(aggregate_states.ACTIVE, result.operational_state)
def test_remote_master_non_empty_pool(self):
"""Ensure AggregateError is raised if removing the master."""
aggregate = self._aggregate_setup(aggr_state=aggregate_states.ACTIVE,
hosts=['host', 'host2'],
metadata=self.fake_metadata)
self.assertRaises(exception.AggregateError,
self.conn._pool.remove_from_aggregate,
self.context, aggregate, "host")
def _aggregate_setup(self, aggr_name='fake_aggregate',
aggr_zone='fake_zone',
aggr_state=aggregate_states.CREATED,
hosts=['host'], metadata=None):
values = {"name": aggr_name,
"availability_zone": aggr_zone,
"operational_state": aggr_state, }
result = db.aggregate_create(self.context, values)
for host in hosts:
db.aggregate_host_add(self.context, result.id, host)
if metadata:
db.aggregate_metadata_add(self.context, result.id, metadata)
return db.aggregate_get(self.context, result.id)

View File

@ -654,6 +654,13 @@ class ComputeDriver(object):
related to other calls into the driver. The prime example is to clean
the cache and remove images which are no longer of interest.
"""
def add_to_aggregate(self, context, aggregate, host, **kwargs):
"""Add a compute host to an aggregate."""
raise NotImplementedError()
def remove_from_aggregate(self, context, aggregate, host, **kwargs):
"""Remove a compute host from an aggregate."""
raise NotImplementedError()
def get_volume_connector(self, instance):

View File

@ -63,7 +63,7 @@ from nova import log as logging
from nova import utils
_CLASSES = ['host', 'network', 'session', 'SR', 'VBD', 'pool',
_CLASSES = ['host', 'network', 'session', 'pool', 'SR', 'VBD',
'PBD', 'VDI', 'VIF', 'PIF', 'VM', 'VLAN', 'task']
_db_content = {}
@ -509,6 +509,15 @@ class SessionBase(object):
def VM_clean_reboot(self, *args):
return 'burp'
def pool_eject(self, session, host_ref):
pass
def pool_join(self, session, hostname, username, password):
pass
def pool_set_name_label(self, session, pool_ref, name):
pass
def network_get_all_records_where(self, _1, filter):
return self.xenapi.network.get_all_records()

214
nova/virt/xenapi/pool.py Normal file
View File

@ -0,0 +1,214 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (c) 2012 Citrix Systems, Inc.
# Copyright 2010 OpenStack LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Management class for Pool-related functions (join, eject, etc).
"""
import json
import urlparse
from nova import db
from nova import exception
from nova import flags
from nova import log as logging
from nova import rpc
from nova.compute import aggregate_states
from nova.openstack.common import cfg
from nova.virt.xenapi import vm_utils
LOG = logging.getLogger("nova.virt.xenapi.pool")
xenapi_pool_opts = [
cfg.BoolOpt('use_join_force',
default=True,
help='To use for hosts with different CPUs'),
]
FLAGS = flags.FLAGS
FLAGS.register_opts(xenapi_pool_opts)
class ResourcePool(object):
"""
Implements resource pool operations.
"""
def __init__(self, session):
self.XenAPI = session.get_imported_xenapi()
host_ref = session.get_xenapi_host()
host_rec = session.call_xenapi('host.get_record', host_ref)
self._host_name = host_rec['hostname']
self._host_addr = host_rec['address']
self._host_uuid = host_rec['uuid']
self._session = session
def add_to_aggregate(self, context, aggregate, host, **kwargs):
"""Add a compute host to an aggregate."""
if len(aggregate.hosts) == 1:
# this is the first host of the pool -> make it master
self._init_pool(aggregate.id, aggregate.name)
# save metadata so that we can find the master again:
# the password should be encrypted, really.
values = {
'operational_state': aggregate_states.ACTIVE,
'metadata': {'master_compute': host},
}
db.aggregate_update(context, aggregate.id, values)
else:
# the pool is already up and running, we need to figure out
# whether we can serve the request from this host or not.
master_compute = aggregate.metadetails['master_compute']
if master_compute == FLAGS.host and master_compute != host:
# this is the master -> do a pool-join
# To this aim, nova compute on the slave has to go down.
# NOTE: it is assumed that ONLY nova compute is running now
self._join_slave(aggregate.id, host,
kwargs.get('compute_uuid'),
kwargs.get('url'), kwargs.get('user'),
kwargs.get('passwd'))
metadata = {host: kwargs.get('xenhost_uuid'), }
db.aggregate_metadata_add(context, aggregate.id, metadata)
elif master_compute and master_compute != host:
# send rpc cast to master, asking to add the following
# host with specified credentials.
# NOTE: password in clear is not great, but it'll do for now
forward_request(context, "add_aggregate_host", master_compute,
aggregate.id, host,
self._host_addr, self._host_uuid)
def remove_from_aggregate(self, context, aggregate, host, **kwargs):
"""Remove a compute host from an aggregate."""
master_compute = aggregate.metadetails.get('master_compute')
if master_compute == FLAGS.host and master_compute != host:
# this is the master -> instruct it to eject a host from the pool
host_uuid = db.aggregate_metadata_get(context, aggregate.id)[host]
self._eject_slave(aggregate.id,
kwargs.get('compute_uuid'), host_uuid)
db.aggregate_metadata_delete(context, aggregate.id, host)
elif master_compute == host:
# Remove master from its own pool -> destroy pool only if the
# master is on its own, otherwise raise fault. Destroying a
# pool made only by master is fictional
if len(aggregate.hosts) > 1:
raise exception.AggregateError(
aggregate_id=aggregate.id,
action='remove_from_aggregate',
reason=_('Unable to eject %(host)s '
'from the pool; pool not empty')
% locals())
self._clear_pool(aggregate.id)
db.aggregate_metadata_delete(context,
aggregate.id, 'master_compute')
elif master_compute and master_compute != host:
# A master exists -> forward pool-eject request to master
forward_request(context, "remove_aggregate_host", master_compute,
aggregate.id, host,
self._host_addr, self._host_uuid)
else:
# this shouldn't have happened
raise exception.AggregateError(aggregate_id=aggregate.id,
action='remove_from_aggregate',
reason=_('Unable to eject %(host)s '
'from the pool; No master found')
% locals())
def _join_slave(self, aggregate_id, host, compute_uuid, url, user, passwd):
"""Joins a slave into a XenServer resource pool."""
try:
args = {'compute_uuid': compute_uuid,
'url': url,
'user': user,
'password': passwd,
'force': json.dumps(FLAGS.use_join_force),
'master_addr': self._host_addr,
'master_user': FLAGS.xenapi_connection_username,
'master_pass': FLAGS.xenapi_connection_password, }
task = self._session.async_call_plugin('xenhost',
'host_join', args)
self._session.wait_for_task(task)
except self.XenAPI.Failure as e:
LOG.error(_("Pool-Join failed: %(e)s") % locals())
raise exception.AggregateError(aggregate_id=aggregate_id,
action='add_to_aggregate',
reason=_('Unable to join %(host)s '
'in the pool') % locals())
def _eject_slave(self, aggregate_id, compute_uuid, host_uuid):
"""Eject a slave from a XenServer resource pool."""
try:
# shutdown nova-compute; if there are other VMs running, e.g.
# guest instances, the eject will fail. That's a precaution
# to deal with the fact that the admin should evacuate the host
# first. The eject wipes out the host completely.
vm_ref = self._session.call_xenapi('VM.get_by_uuid', compute_uuid)
self._session.call_xenapi("VM.clean_shutdown", vm_ref)
host_ref = self._session.call_xenapi('host.get_by_uuid', host_uuid)
self._session.call_xenapi("pool.eject", host_ref)
except self.XenAPI.Failure as e:
LOG.error(_("Pool-eject failed: %(e)s") % locals())
raise exception.AggregateError(aggregate_id=aggregate_id,
action='remove_from_aggregate',
reason=str(e.details))
def _init_pool(self, aggregate_id, aggregate_name):
"""Set the name label of a XenServer pool."""
try:
pool_ref = self._session.call_xenapi("pool.get_all")[0]
self._session.call_xenapi("pool.set_name_label",
pool_ref, aggregate_name)
except self.XenAPI.Failure as e:
LOG.error(_("Unable to set up pool: %(e)s.") % locals())
raise exception.AggregateError(aggregate_id=aggregate_id,
action='add_to_aggregate',
reason=str(e.details))
def _clear_pool(self, aggregate_id):
"""Clear the name label of a XenServer pool."""
try:
pool_ref = self._session.call_xenapi('pool.get_all')[0]
self._session.call_xenapi('pool.set_name_label', pool_ref, '')
except self.XenAPI.Failure as e:
LOG.error(_("Pool-set_name_label failed: %(e)s") % locals())
raise exception.AggregateError(aggregate_id=aggregate_id,
action='remove_from_aggregate',
reason=str(e.details))
def forward_request(context, request_type, master, aggregate_id,
slave_compute, slave_address, slave_uuid):
"""Casts add/remove requests to the pool master."""
# replace the address from the xenapi connection url
# because this might be 169.254.0.1, i.e. xenapi
sender_url = swap_xapi_host(FLAGS.xenapi_connection_url, slave_address)
rpc.cast(context, db.queue_get_for(context, FLAGS.compute_topic, master),
{"method": request_type,
"args": {"aggregate_id": aggregate_id,
"host": slave_compute,
"url": sender_url,
"user": FLAGS.xenapi_connection_username,
"passwd": FLAGS.xenapi_connection_password,
"compute_uuid": vm_utils.get_this_vm_uuid(),
"xenhost_uuid": slave_uuid, },
})
def swap_xapi_host(url, host_addr):
"""Replace the XenServer address present in 'url' with 'host_addr'."""
temp_url = urlparse.urlparse(url)
_, sep, port = temp_url.netloc.partition(':')
return url.replace(temp_url.netloc, '%s%s%s' % (host_addr, sep, port))

View File

@ -78,6 +78,7 @@ from nova import flags
from nova import log as logging
from nova.openstack.common import cfg
from nova.virt import driver
from nova.virt.xenapi import pool
from nova.virt.xenapi import vm_utils
from nova.virt.xenapi.vmops import VMOps
from nova.virt.xenapi.volumeops import VolumeOps
@ -180,6 +181,7 @@ class XenAPIConnection(driver.ComputeDriver):
self._product_version = self._session.get_product_version()
self._vmops = VMOps(self._session, self._product_version)
self._initiator = None
self._pool = pool.ResourcePool(self._session)
@property
def host_state(self):
@ -488,6 +490,15 @@ class XenAPIConnection(driver.ComputeDriver):
"""Sets the specified host's ability to accept new instances."""
return self._vmops.set_host_enabled(host, enabled)
def add_to_aggregate(self, context, aggregate, host, **kwargs):
"""Add a compute host to an aggregate."""
return self._pool.add_to_aggregate(context, aggregate, host, **kwargs)
def remove_from_aggregate(self, context, aggregate, host, **kwargs):
"""Remove a compute host from an aggregate."""
return self._pool.remove_from_aggregate(context,
aggregate, host, **kwargs)
class XenAPISession(object):
"""The session to invoke XenAPI SDK calls"""
@ -498,9 +509,19 @@ class XenAPISession(object):
exception = self.XenAPI.Failure(_("Unable to log in to XenAPI "
"(is the Dom0 disk full?)"))
for i in xrange(FLAGS.xenapi_connection_concurrent):
session = self._create_session(url)
with timeout.Timeout(FLAGS.xenapi_login_timeout, exception):
session.login_with_password(user, pw)
try:
session = self._create_session(url)
with timeout.Timeout(FLAGS.xenapi_login_timeout, exception):
session.login_with_password(user, pw)
except self.XenAPI.Failure, e:
# if user and pw of the master are different, we're doomed!
if e.details[0] == 'HOST_IS_SLAVE':
master = e.details[1]
session = self.XenAPI.Session(pool.swap_xapi_host(url,
master))
session.login_with_password(user, pw)
else:
raise
self._sessions.put(session)
def get_product_version(self):

View File

@ -33,6 +33,7 @@ import subprocess
import tempfile
import time
import XenAPI
import XenAPIPlugin
import pluginlib_nova as pluginlib
@ -41,6 +42,8 @@ pluginlib.configure_logging("xenhost")
host_data_pattern = re.compile(r"\s*(\S+) \([^\)]+\) *: ?(.*)")
config_file_path = "/usr/etc/xenhost.conf"
DEFAULT_TRIES = 23
DEFAULT_SLEEP = 10
def jsonify(fnc):
@ -97,6 +100,28 @@ def _run_command_with_input(cmd, process_input):
return output
def _resume_compute(session, compute_ref, compute_uuid):
"""Resume compute node on slave host after pool join. This has to
happen regardless of the success or failure of the join operation."""
try:
# session is valid if the join operation has failed
session.xenapi.VM.start(compute_ref, False, True)
except XenAPI.Failure, e:
# if session is invalid, e.g. xapi has restarted, then the pool
# join has been successful, wait for xapi to become alive again
for c in xrange(0, DEFAULT_TRIES):
try:
_run_command("xe vm-start uuid=%s" % compute_uuid)
return
except pluginlib.PluginError, e:
logging.exception('Waited %d seconds for the slave to '
'become available.' % (c * DEFAULT_SLEEP))
time.sleep(DEFAULT_SLEEP)
raise pluginlib.PluginError('Unrecoverable error: the host has '
'not come back for more than %d seconds'
% (DEFAULT_SLEEP * (DEFAULT_TRIES + 1)))
def _get_host_uuid():
cmd = "xe host-list | grep uuid"
resp = _run_command(cmd)
@ -257,6 +282,34 @@ def host_start(self, arg_dict):
return _power_action("startup")
@jsonify
def host_join(self, arg_dict):
"""Join a remote host into a pool whose master is the host
where the plugin is called from. The following constraints apply:
- The host must have no VMs running, except nova-compute, which will be
shut down (and restarted upon pool-join) automatically,
- The host must have no shared storage currently set up,
- The host must have the same license of the master,
- The host must have the same supplemental packs as the master."""
session = XenAPI.Session(arg_dict.get("url"))
session.login_with_password(arg_dict.get("user"),
arg_dict.get("password"))
compute_ref = session.xenapi.VM.get_by_uuid(arg_dict.get('compute_uuid'))
session.xenapi.VM.clean_shutdown(compute_ref)
try:
if arg_dict.get("force"):
session.xenapi.pool.join(arg_dict.get("master_addr"),
arg_dict.get("master_user"),
arg_dict.get("master_pass"))
else:
session.xenapi.pool.join_force(arg_dict.get("master_addr"),
arg_dict.get("master_user"),
arg_dict.get("master_pass"))
finally:
_resume_compute(session, compute_ref, arg_dict.get("compute_uuid"))
@jsonify
def host_data(self, arg_dict):
"""Runs the commands on the xenstore host to return the current status
@ -380,6 +433,7 @@ if __name__ == "__main__":
"host_shutdown": host_shutdown,
"host_reboot": host_reboot,
"host_start": host_start,
"host_join": host_join,
"get_config": get_config,
"set_config": set_config,
"iptables_config": iptables_config})