General-host-aggregates part 1.

Partially implements blueprint general-host-aggregates: Xen decoupling

* keep rpcapi to compute-controller for Hypervisor-Pools
  *  Add compute_rpcapi.add_aggregate_host to libvirt
* Remove aggregate states
* Remove One aggregate per host restriction
* Keep Xen support for both hypervisor pools and aggregates

Change-Id: I6ac0f8b6fc31dff589363331e99e0755301f2172
This commit is contained in:
Joe Gordon
2012-06-06 14:03:43 -07:00
parent b63a3e8cbd
commit c571ebb89b
16 changed files with 347 additions and 287 deletions

View File

@@ -54,7 +54,7 @@ class AggregateController(object):
return {'aggregates': aggregates}
def create(self, req, body):
"""Creates an aggregate, given its name and availablity_zone."""
"""Creates an aggregate, given its name and availability_zone."""
context = _get_context(req)
authorize(context)
@@ -90,7 +90,7 @@ class AggregateController(object):
return self._marshall_aggregate(aggregate)
def update(self, req, id, body):
"""Updates the name and/or availbility_zone of given aggregate."""
"""Updates the name and/or availability_zone of given aggregate."""
context = _get_context(req)
authorize(context)
@@ -152,8 +152,7 @@ class AggregateController(object):
LOG.exception(_("Cannot add host %(host)s in aggregate "
"%(id)s") % locals())
raise exc.HTTPNotFound
except (exception.AggregateHostConflict,
exception.AggregateHostExists,
except (exception.AggregateHostExists,
exception.InvalidAggregateAction):
LOG.exception(_("Cannot add host %(host)s in aggregate "
"%(id)s") % locals())

View File

@@ -18,8 +18,8 @@
# License for the specific language governing permissions and limitations
# under the License.
"""Handles all requests relating to compute resources (e.g. guest vms,
networking and storage of vms, and compute hosts on which they run)."""
"""Handles all requests relating to compute resources (e.g. guest VMs,
networking and storage of VMs, and compute hosts on which they run)."""
import functools
import re
@@ -28,7 +28,6 @@ import time
import urllib
from nova import block_device
from nova.compute import aggregate_states
from nova.compute import instance_types
from nova.compute import power_state
from nova.compute import rpcapi as compute_rpcapi
@@ -1709,7 +1708,7 @@ class AggregateAPI(base.Base):
return self._get_aggregate_info(context, aggregate)
def get_aggregate_list(self, context):
"""Get all the aggregates for this zone."""
"""Get all the aggregates."""
aggregates = self.db.aggregate_get_all(context)
return [self._get_aggregate_info(context, a) for a in aggregates]
@@ -1723,11 +1722,6 @@ class AggregateAPI(base.Base):
If a key is set to None, it gets removed from the aggregate metadata.
"""
# As a first release of the host aggregates blueprint, this call is
# pretty dumb, in the sense that interacts only with the model.
# In later releasses, updating metadata may trigger virt actions like
# the setup of shared storage, or more generally changes to the
# underlying hypervisor pools.
for key in metadata.keys():
if not metadata[key]:
try:
@@ -1752,52 +1746,26 @@ class AggregateAPI(base.Base):
"""Adds the host to an aggregate."""
# validates the host; ComputeHostNotFound is raised if invalid
service = self.db.service_get_all_compute_by_host(context, host)[0]
# add host, and reflects action in the aggregate operational state
aggregate = self.db.aggregate_get(context, aggregate_id)
if aggregate.operational_state in [aggregate_states.CREATED,
aggregate_states.ACTIVE]:
if service.availability_zone != aggregate.availability_zone:
raise exception.InvalidAggregateAction(
action='add host',
aggregate_id=aggregate_id,
reason='availibility zone mismatch')
self.db.aggregate_host_add(context, aggregate_id, host)
if aggregate.operational_state == aggregate_states.CREATED:
values = {'operational_state': aggregate_states.CHANGING}
self.db.aggregate_update(context, aggregate_id, values)
self.compute_rpcapi.add_aggregate_host(context,
aggregate_id=aggregate_id, host_param=host, host=host)
return self.get_aggregate(context, aggregate_id)
else:
invalid = {aggregate_states.CHANGING: 'setup in progress',
aggregate_states.DISMISSED: 'aggregate deleted',
aggregate_states.ERROR: 'aggregate in error', }
if aggregate.operational_state in invalid.keys():
raise exception.InvalidAggregateAction(
action='add host',
aggregate_id=aggregate_id,
reason=invalid[aggregate.operational_state])
if service.availability_zone != aggregate.availability_zone:
raise exception.InvalidAggregateAction(
action='add host',
aggregate_id=aggregate_id,
reason='availability zone mismatch')
self.db.aggregate_host_add(context, aggregate_id, host)
#NOTE(jogo): Send message to host to support resource pools
self.compute_rpcapi.add_aggregate_host(context,
aggregate_id=aggregate_id, host_param=host, host=host)
return self.get_aggregate(context, aggregate_id)
def remove_host_from_aggregate(self, context, aggregate_id, host):
"""Removes host from the aggregate."""
# validates the host; ComputeHostNotFound is raised if invalid
service = self.db.service_get_all_compute_by_host(context, host)[0]
aggregate = self.db.aggregate_get(context, aggregate_id)
if aggregate.operational_state in [aggregate_states.ACTIVE,
aggregate_states.ERROR]:
self.db.aggregate_host_delete(context, aggregate_id, host)
self.compute_rpcapi.remove_aggregate_host(context,
aggregate_id=aggregate_id, host_param=host, host=host)
return self.get_aggregate(context, aggregate_id)
else:
invalid = {aggregate_states.CREATED: 'no hosts to remove',
aggregate_states.CHANGING: 'setup in progress',
aggregate_states.DISMISSED: 'aggregate deleted', }
if aggregate.operational_state in invalid.keys():
raise exception.InvalidAggregateAction(
action='remove host',
aggregate_id=aggregate_id,
reason=invalid[aggregate.operational_state])
self.db.aggregate_host_delete(context, aggregate_id, host)
self.compute_rpcapi.remove_aggregate_host(context,
aggregate_id=aggregate_id, host_param=host, host=host)
return self.get_aggregate(context, aggregate_id)
def _get_aggregate_info(self, context, aggregate):
"""Builds a dictionary with aggregate props, metadata and hosts."""

View File

@@ -47,7 +47,6 @@ from eventlet import greenthread
from nova import block_device
from nova import compute
from nova.compute import aggregate_states
from nova.compute import instance_types
from nova.compute import power_state
from nova.compute import rpcapi as compute_rpcapi
@@ -2730,13 +2729,13 @@ class ComputeManager(manager.SchedulerDependentManager):
@exception.wrap_exception(notifier=notifier, publisher_id=publisher_id())
def add_aggregate_host(self, context, aggregate_id, host, **kwargs):
"""Adds a host to a physical hypervisor pool."""
"""Notify hypervisor of change (for hypervisor pools)."""
aggregate = self.db.aggregate_get(context, aggregate_id)
try:
self.driver.add_to_aggregate(context, aggregate, host, **kwargs)
except exception.AggregateError:
with excutils.save_and_reraise_exception():
self._undo_aggregate_operation(context,
self.driver.undo_aggregate_operation(context,
self.db.aggregate_host_delete,
aggregate.id, host)
@@ -2750,22 +2749,11 @@ class ComputeManager(manager.SchedulerDependentManager):
except (exception.AggregateError,
exception.InvalidAggregateAction) as e:
with excutils.save_and_reraise_exception():
self._undo_aggregate_operation(
self.driver.undo_aggregate_operation(
context, self.db.aggregate_host_add,
aggregate.id, host,
isinstance(e, exception.AggregateError))
def _undo_aggregate_operation(self, context, op, aggregate_id,
host, set_error=True):
try:
if set_error:
status = {'operational_state': aggregate_states.ERROR}
self.db.aggregate_update(context, aggregate_id, status)
op(context, aggregate_id, host)
except Exception:
LOG.exception(_('Aggregate %(aggregate_id)s: unrecoverable state '
'during operation on %(host)s') % locals())
@manager.periodic_task(
ticks_between_runs=FLAGS.image_cache_manager_interval)
def _run_image_cache_manager_pass(self, context):

View File

@@ -26,7 +26,6 @@ import re
import warnings
from nova import block_device
from nova.compute import aggregate_states
from nova.compute import vm_states
from nova import db
from nova.db.sqlalchemy import models
@@ -4662,7 +4661,6 @@ def aggregate_create(context, values, metadata=None):
values['name'],
session=session,
read_deleted='yes').first()
values.setdefault('operational_state', aggregate_states.CREATED)
if not aggregate:
aggregate = models.Aggregate()
aggregate.update(values)
@@ -4738,7 +4736,6 @@ def aggregate_delete(context, aggregate_id):
if query.first():
query.update({'deleted': True,
'deleted_at': timeutils.utcnow(),
'operational_state': aggregate_states.DISMISSED,
'updated_at': literal_column('updated_at')})
else:
raise exception.AggregateNotFound(aggregate_id=aggregate_id)
@@ -4864,13 +4861,10 @@ def aggregate_host_add(context, aggregate_id, host):
read_deleted='yes').\
filter_by(host=host).first()
if not host_ref:
try:
host_ref = models.AggregateHost()
values = {"host": host, "aggregate_id": aggregate_id, }
host_ref.update(values)
host_ref.save(session=session)
except exception.DBError:
raise exception.AggregateHostConflict(host=host)
host_ref = models.AggregateHost()
values = {"host": host, "aggregate_id": aggregate_id, }
host_ref.update(values)
host_ref.save(session=session)
elif host_ref.deleted:
host_ref.update({'deleted': False, 'deleted_at': None})
host_ref.save(session=session)

View File

@@ -0,0 +1,68 @@
# Copyright 2012 OpenStack LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from sqlalchemy import String, Column, MetaData, Table, delete, select
from migrate.changeset import UniqueConstraint
from nova.openstack.common import log as logging
LOG = logging.getLogger(__name__)
def upgrade(migrate_engine):
meta = MetaData()
meta.bind = migrate_engine
dialect = migrate_engine.url.get_dialect().name
aggregates = Table('aggregates', meta, autoload=True)
aggregate_metadata = Table('aggregate_metadata', meta, autoload=True)
record_list = list(aggregates.select().execute())
for rec in record_list:
row = aggregate_metadata.insert()
row.execute({'created_at': rec['created_at'],
'updated_at': rec['updated_at'],
'deleted_at': rec['deleted_at'],
'deleted': rec['deleted'],
'key': 'operational_state',
'value': rec['operational_state'],
'aggregate_id': rec['id'],
})
aggregates.drop_column('operational_state')
aggregate_hosts = Table('aggregate_hosts', meta, autoload=True)
if dialect.startswith('sqlite'):
aggregate_hosts.drop_column('host')
aggregate_hosts.create_column(Column('host', String(255)))
else:
col = aggregate_hosts.c.host
UniqueConstraint(col, name='host').drop()
def downgrade(migrate_engine):
meta = MetaData()
meta.bind = migrate_engine
aggregates = Table('aggregates', meta, autoload=True)
aggregate_metadata = Table('aggregate_metadata', meta, autoload=True)
operational_state = Column('operational_state', String(255))
aggregates.create_column(operational_state)
aggregates.update().values(operational_state=select(
[aggregate_metadata.c.value]).where(aggregates.c.id ==
aggregate_metadata.c.aggregate_id and aggregate_metadata.c.key ==
'operational_state')).execute()
delete(aggregate_metadata, aggregate_metadata.c.key == 'operational_state')
aggregates.c.operational_state.alter(nullable=False)
aggregate_hosts = Table('aggregate_hosts', meta, autoload=True)
aggregate_hosts.c.host.alter(unique=True)

View File

@@ -822,7 +822,7 @@ class AggregateHost(BASE, NovaBase):
"""Represents a host that is member of an aggregate."""
__tablename__ = 'aggregate_hosts'
id = Column(Integer, primary_key=True, autoincrement=True)
host = Column(String(255), unique=True)
host = Column(String(255), unique=False)
aggregate_id = Column(Integer, ForeignKey('aggregates.id'), nullable=False)
@@ -840,9 +840,9 @@ class Aggregate(BASE, NovaBase):
__tablename__ = 'aggregates'
id = Column(Integer, primary_key=True, autoincrement=True)
name = Column(String(255), unique=True)
operational_state = Column(String(255), nullable=False)
availability_zone = Column(String(255), nullable=False)
_hosts = relationship(AggregateHost,
lazy="joined",
secondary="aggregate_hosts",
primaryjoin='and_('
'Aggregate.id == AggregateHost.aggregate_id,'
@@ -907,14 +907,14 @@ class S3Image(BASE, NovaBase):
class VolumeIdMapping(BASE, NovaBase):
"""Compatability layer for the EC2 volume service"""
"""Compatibility layer for the EC2 volume service"""
__tablename__ = 'volume_id_mappings'
id = Column(Integer, primary_key=True, nullable=False, autoincrement=True)
uuid = Column(String(36), nullable=False)
class SnapshotIdMapping(BASE, NovaBase):
"""Compatability layer for the EC2 snapshot service"""
"""Compatibility layer for the EC2 snapshot service"""
__tablename__ = 'snapshot_id_mappings'
id = Column(Integer, primary_key=True, nullable=False, autoincrement=True)
uuid = Column(String(36), nullable=False)

View File

@@ -1055,10 +1055,6 @@ class AggregateMetadataNotFound(NotFound):
"key %(metadata_key)s.")
class AggregateHostConflict(Duplicate):
message = _("Host %(host)s already member of another aggregate.")
class AggregateHostExists(Duplicate):
message = _("Aggregate %(aggregate_id)s already has host %(host)s.")

View File

@@ -23,7 +23,6 @@ from nova import exception
from nova.openstack.common import log as logging
from nova import test
LOG = logging.getLogger(__name__)
AGGREGATE_LIST = [
{"name": "aggregate1", "id": "1", "availability_zone": "nova1"},
@@ -226,16 +225,6 @@ class AggregateTestCase(test.TestCase):
self.assertEqual(aggregate["aggregate"], AGGREGATE)
def test_add_host_with_already_added_to_another_aggregate(self):
def stub_add_host_to_aggregate(context, aggregate, host):
raise exception.AggregateHostConflict()
self.stubs.Set(self.controller.api, "add_host_to_aggregate",
stub_add_host_to_aggregate)
self.assertRaises(exc.HTTPConflict, self.controller.action,
self.req, "duplicate_aggregate",
body={"add_host": {"host": "host1"}})
def test_add_host_with_already_added_host(self):
def stub_add_host_to_aggregate(context, aggregate, host):
raise exception.AggregateHostExists()

View File

@@ -27,7 +27,6 @@ import mox
import nova
from nova import compute
from nova.compute import aggregate_states
from nova.compute import api as compute_api
from nova.compute import instance_types
from nova.compute import manager as compute_manager
@@ -867,7 +866,7 @@ class ComputeTestCase(BaseTestCase):
self.compute.terminate_instance(self.context, instance_uuid)
def test_run_instance_usage_notification(self):
"""Ensure run instance generates apropriate usage notification"""
"""Ensure run instance generates appropriate usage notification"""
inst_ref = self._create_fake_instance()
instance_uuid = inst_ref['uuid']
self.compute.run_instance(self.context, instance_uuid)
@@ -896,7 +895,7 @@ class ComputeTestCase(BaseTestCase):
self.compute.terminate_instance(self.context, instance_uuid)
def test_terminate_usage_notification(self):
"""Ensure terminate_instance generates apropriate usage notification"""
"""Ensure terminate_instance generates correct usage notification"""
old_time = datetime.datetime(2012, 4, 1)
cur_time = datetime.datetime(2012, 12, 21, 12, 21)
timeutils.set_time_override(old_time)
@@ -3646,7 +3645,7 @@ def _create_service_entries(context, values={'avail_zone1': ['fake_host1',
return values
class ComputeAPIAggrTestCase(test.TestCase):
class ComputeAPIAggrTestCase(BaseTestCase):
"""This is for unit coverage of aggregate-related methods
defined in nova.compute.api."""
@@ -3664,6 +3663,7 @@ class ComputeAPIAggrTestCase(test.TestCase):
self.context, 'fake_aggr', 'fake_avail_zone')
def test_update_aggregate_metadata(self):
"""Ensure metadata can be updated"""
_create_service_entries(self.context, {'fake_zone': ['fake_host']})
aggr = self.api.create_aggregate(self.context, 'fake_aggregate',
'fake_zone')
@@ -3684,8 +3684,8 @@ class ComputeAPIAggrTestCase(test.TestCase):
self.api.delete_aggregate(self.context, aggr['id'])
expected = db.aggregate_get(self.context.elevated(read_deleted='yes'),
aggr['id'])
self.assertNotEqual(aggr['operational_state'],
expected['operational_state'])
self.assertRaises(exception.AggregateNotFound,
self.api.delete_aggregate, self.context, aggr['id'])
def test_delete_non_empty_aggregate(self):
"""Ensure InvalidAggregateAction is raised when non empty aggregate."""
@@ -3706,7 +3706,7 @@ class ComputeAPIAggrTestCase(test.TestCase):
'fake_aggregate', fake_zone)
aggr = self.api.add_host_to_aggregate(self.context,
aggr['id'], fake_host)
self.assertEqual(aggr['operational_state'], aggregate_states.CHANGING)
self.assertEqual(len(aggr['hosts']), 1)
def test_add_host_to_aggregate_multiple(self):
"""Ensure we can add multiple hosts to an aggregate."""
@@ -3714,57 +3714,10 @@ class ComputeAPIAggrTestCase(test.TestCase):
fake_zone = values.keys()[0]
aggr = self.api.create_aggregate(self.context,
'fake_aggregate', fake_zone)
# let's mock the fact that the aggregate is active already!
status = {'operational_state': aggregate_states.ACTIVE}
db.aggregate_update(self.context, aggr['id'], status)
for host in values[fake_zone]:
aggr = self.api.add_host_to_aggregate(self.context,
aggr['id'], host)
self.assertEqual(len(aggr['hosts']), len(values[fake_zone]))
self.assertEqual(aggr['operational_state'],
aggregate_states.ACTIVE)
def test_add_host_to_aggregate_invalid_changing_status(self):
"""Ensure InvalidAggregateAction is raised when adding host while
aggregate is not ready."""
values = _create_service_entries(self.context)
fake_zone = values.keys()[0]
fake_host = values[fake_zone][0]
aggr = self.api.create_aggregate(self.context,
'fake_aggregate', fake_zone)
aggr = self.api.add_host_to_aggregate(self.context,
aggr['id'], fake_host)
self.assertEqual(aggr['operational_state'],
aggregate_states.CHANGING)
self.assertRaises(exception.InvalidAggregateAction,
self.api.add_host_to_aggregate, self.context,
aggr['id'], fake_host)
def test_add_host_to_aggregate_invalid_dismissed_status(self):
"""Ensure InvalidAggregateAction is raised when aggregate is
deleted."""
_create_service_entries(self.context, {'fake_zone': ['fake_host']})
aggr = self.api.create_aggregate(self.context,
'fake_aggregate', 'fake_zone')
# let's mock the fact that the aggregate is dismissed!
status = {'operational_state': aggregate_states.DISMISSED}
db.aggregate_update(self.context, aggr['id'], status)
self.assertRaises(exception.InvalidAggregateAction,
self.api.add_host_to_aggregate, self.context,
aggr['id'], 'fake_host')
def test_add_host_to_aggregate_invalid_error_status(self):
"""Ensure InvalidAggregateAction is raised when aggregate is
in error."""
_create_service_entries(self.context, {'fake_zone': ['fake_host']})
aggr = self.api.create_aggregate(self.context,
'fake_aggregate', 'fake_zone')
# let's mock the fact that the aggregate is in error!
status = {'operational_state': aggregate_states.ERROR}
db.aggregate_update(self.context, aggr['id'], status)
self.assertRaises(exception.InvalidAggregateAction,
self.api.add_host_to_aggregate, self.context,
aggr['id'], 'fake_host')
def test_add_host_to_aggregate_zones_mismatch(self):
"""Ensure InvalidAggregateAction is raised when zones don't match."""
@@ -3791,9 +3744,6 @@ class ComputeAPIAggrTestCase(test.TestCase):
fake_zone = values.keys()[0]
aggr = self.api.create_aggregate(self.context,
'fake_aggregate', fake_zone)
# let's mock the fact that the aggregate is active already!
status = {'operational_state': aggregate_states.ACTIVE}
db.aggregate_update(self.context, aggr['id'], status)
for host in values[fake_zone]:
aggr = self.api.add_host_to_aggregate(self.context,
aggr['id'], host)
@@ -3801,55 +3751,6 @@ class ComputeAPIAggrTestCase(test.TestCase):
aggr['id'],
values[fake_zone][0])
self.assertEqual(len(aggr['hosts']) - 1, len(expected['hosts']))
self.assertEqual(expected['operational_state'],
aggregate_states.ACTIVE)
def test_remove_host_from_aggregate_error(self):
"""Ensure we can remove a host from an aggregate even if in error."""
values = _create_service_entries(self.context)
fake_zone = values.keys()[0]
aggr = self.api.create_aggregate(self.context,
'fake_aggregate', fake_zone)
# let's mock the fact that the aggregate is ready!
status = {'operational_state': aggregate_states.ACTIVE}
db.aggregate_update(self.context, aggr['id'], status)
for host in values[fake_zone]:
aggr = self.api.add_host_to_aggregate(self.context,
aggr['id'], host)
# let's mock the fact that the aggregate is in error!
status = {'operational_state': aggregate_states.ERROR}
expected = self.api.remove_host_from_aggregate(self.context,
aggr['id'],
values[fake_zone][0])
self.assertEqual(len(aggr['hosts']) - 1, len(expected['hosts']))
self.assertEqual(expected['operational_state'],
aggregate_states.ACTIVE)
def test_remove_host_from_aggregate_invalid_dismissed_status(self):
"""Ensure InvalidAggregateAction is raised when aggregate is
deleted."""
_create_service_entries(self.context, {'fake_zone': ['fake_host']})
aggr = self.api.create_aggregate(self.context,
'fake_aggregate', 'fake_zone')
# let's mock the fact that the aggregate is dismissed!
status = {'operational_state': aggregate_states.DISMISSED}
db.aggregate_update(self.context, aggr['id'], status)
self.assertRaises(exception.InvalidAggregateAction,
self.api.remove_host_from_aggregate, self.context,
aggr['id'], 'fake_host')
def test_remove_host_from_aggregate_invalid_changing_status(self):
"""Ensure InvalidAggregateAction is raised when aggregate is
changing."""
_create_service_entries(self.context, {'fake_zone': ['fake_host']})
aggr = self.api.create_aggregate(self.context,
'fake_aggregate', 'fake_zone')
# let's mock the fact that the aggregate is changing!
status = {'operational_state': aggregate_states.CHANGING}
db.aggregate_update(self.context, aggr['id'], status)
self.assertRaises(exception.InvalidAggregateAction,
self.api.remove_host_from_aggregate, self.context,
aggr['id'], 'fake_host')
def test_remove_host_from_aggregate_raise_not_found(self):
"""Ensure ComputeHostNotFound is raised when removing invalid host."""
@@ -3869,7 +3770,7 @@ class ComputeAggrTestCase(BaseTestCase):
super(ComputeAggrTestCase, self).setUp()
self.context = context.get_admin_context()
values = {'name': 'test_aggr',
'availability_zone': 'test_zone', }
'availability_zone': 'test_zone'}
self.aggr = db.aggregate_create(self.context, values)
def test_add_aggregate_host(self):
@@ -3882,24 +3783,6 @@ class ComputeAggrTestCase(BaseTestCase):
self.compute.add_aggregate_host(self.context, self.aggr.id, "host")
self.assertTrue(fake_driver_add_to_aggregate.called)
def test_add_aggregate_host_raise_err(self):
"""Ensure the undo operation works correctly on add."""
def fake_driver_add_to_aggregate(context, aggregate, host):
raise exception.AggregateError
self.stubs.Set(self.compute.driver, "add_to_aggregate",
fake_driver_add_to_aggregate)
state = {'operational_state': aggregate_states.ACTIVE}
db.aggregate_update(self.context, self.aggr.id, state)
db.aggregate_host_add(self.context, self.aggr.id, 'fake_host')
self.assertRaises(exception.AggregateError,
self.compute.add_aggregate_host,
self.context, self.aggr.id, "fake_host")
excepted = db.aggregate_get(self.context, self.aggr.id)
self.assertEqual(excepted.operational_state, aggregate_states.ERROR)
self.assertEqual(excepted.hosts, [])
def test_remove_aggregate_host(self):
def fake_driver_remove_from_aggregate(context, aggregate, host):
fake_driver_remove_from_aggregate.called = True
@@ -3911,23 +3794,6 @@ class ComputeAggrTestCase(BaseTestCase):
self.compute.remove_aggregate_host(self.context, self.aggr.id, "host")
self.assertTrue(fake_driver_remove_from_aggregate.called)
def test_remove_aggregate_host_raise_err(self):
"""Ensure the undo operation works correctly on remove."""
def fake_driver_remove_from_aggregate(context, aggregate, host):
raise exception.AggregateError
self.stubs.Set(self.compute.driver, "remove_from_aggregate",
fake_driver_remove_from_aggregate)
state = {'operational_state': aggregate_states.ACTIVE}
db.aggregate_update(self.context, self.aggr.id, state)
self.assertRaises(exception.AggregateError,
self.compute.remove_aggregate_host,
self.context, self.aggr.id, "fake_host")
excepted = db.aggregate_get(self.context, self.aggr.id)
self.assertEqual(excepted.operational_state, aggregate_states.ERROR)
self.assertEqual(excepted.hosts, ['fake_host'])
class ComputePolicyTestCase(BaseTestCase):

View File

@@ -453,7 +453,7 @@ class AggregateDBApiTestCase(test.TestCase):
def test_aggregate_create(self):
"""Ensure aggregate can be created with no metadata."""
result = _create_aggregate(metadata=None)
self.assertEqual(result['operational_state'], 'created')
self.assertEquals(result.name, 'fake_aggregate')
def test_aggregate_create_avoid_name_conflict(self):
"""Test we can avoid conflict on deleted aggregates."""
@@ -463,7 +463,6 @@ class AggregateDBApiTestCase(test.TestCase):
r2 = _create_aggregate(values=values)
self.assertEqual(r2.name, values['name'])
self.assertEqual(r2.availability_zone, values['availability_zone'])
self.assertEqual(r2.operational_state, "created")
def test_aggregate_create_raise_exist_exc(self):
"""Ensure aggregate names are distinct."""
@@ -542,7 +541,7 @@ class AggregateDBApiTestCase(test.TestCase):
self.assertEqual(0, len(expected))
aggregate = db.aggregate_get(ctxt.elevated(read_deleted='yes'),
result['id'])
self.assertEqual(aggregate["operational_state"], "dismissed")
self.assertEqual(aggregate.deleted, True)
def test_aggregate_update(self):
"""Ensure an aggregate can be updated."""
@@ -670,15 +669,17 @@ class AggregateDBApiTestCase(test.TestCase):
expected = db.aggregate_host_get_all(ctxt, result.id)
self.assertEqual(len(expected), 1)
def test_aggregate_host_add_duplicate_raise_conflict(self):
"""Ensure we cannot add host to distinct aggregates."""
def test_aggregate_host_add_duplicate_works(self):
"""Ensure we can add host to distinct aggregates."""
ctxt = context.get_admin_context()
_create_aggregate_with_hosts(context=ctxt, metadata=None)
self.assertRaises(exception.AggregateHostConflict,
_create_aggregate_with_hosts, ctxt,
r1 = _create_aggregate_with_hosts(context=ctxt, metadata=None)
r2 = _create_aggregate_with_hosts(ctxt,
values={'name': 'fake_aggregate2',
'availability_zone': 'fake_avail_zone2', },
metadata=None)
h1 = db.aggregate_host_get_all(ctxt, r1.id)
h2 = db.aggregate_host_get_all(ctxt, r2.id)
self.assertEqual(h1, h2)
def test_aggregate_host_add_duplicate_raise_exist_exc(self):
"""Ensure we cannot add host to the same aggregate."""

View File

@@ -20,10 +20,11 @@ import ast
import contextlib
import cPickle as pickle
import functools
import mox
import os
import re
from nova.compute import aggregate_states
from nova.compute import api as compute_api
from nova.compute import instance_types
from nova.compute import power_state
from nova import context
@@ -43,6 +44,7 @@ from nova.tests.xenapi import stubs
from nova.virt.xenapi import agent
from nova.virt.xenapi import driver as xenapi_conn
from nova.virt.xenapi import fake as xenapi_fake
from nova.virt.xenapi import pool_states
from nova.virt.xenapi import vm_utils
from nova.virt.xenapi import vmops
from nova.virt.xenapi import volume_utils
@@ -1750,6 +1752,20 @@ class XenAPISRSelectionTestCase(stubs.XenAPITestBase):
expected)
def _create_service_entries(context, values={'avail_zone1': ['fake_host1',
'fake_host2'],
'avail_zone2': ['fake_host3'], }):
for avail_zone, hosts in values.iteritems():
for host in hosts:
db.service_create(context,
{'host': host,
'binary': 'nova-compute',
'topic': 'compute',
'report_count': 0,
'availability_zone': avail_zone})
return values
class XenAPIAggregateTestCase(stubs.XenAPITestBase):
"""Unit tests for aggregate operations."""
def setUp(self):
@@ -1760,12 +1776,22 @@ class XenAPIAggregateTestCase(stubs.XenAPITestBase):
instance_name_template='%d',
firewall_driver='nova.virt.xenapi.firewall.'
'Dom0IptablesFirewallDriver',
host='host')
host='host',
connection_type='xenapi',
compute_driver='nova.virt.xenapi.driver.XenAPIDriver')
host_ref = xenapi_fake.get_all('host')[0]
stubs.stubout_session(self.stubs, stubs.FakeSessionForVMTests)
self.context = context.get_admin_context()
self.conn = xenapi_conn.XenAPIDriver(False)
self.fake_metadata = {'master_compute': 'host',
self.compute = importutils.import_object(FLAGS.compute_manager)
self.api = compute_api.AggregateAPI()
values = {'name': 'test_aggr',
'availability_zone': 'test_zone',
'metadata': {pool_states.POOL_FLAG: 'XenAPI'}}
self.aggr = db.aggregate_create(self.context, values)
self.fake_metadata = {pool_states.POOL_FLAG: 'XenAPI',
'master_compute': 'host',
pool_states.KEY: pool_states.ACTIVE,
'host': xenapi_fake.get_record('host',
host_ref)['uuid']}
@@ -1789,7 +1815,6 @@ class XenAPIAggregateTestCase(stubs.XenAPITestBase):
result = db.aggregate_get(self.context, aggregate.id)
self.assertTrue(fake_init_pool.called)
self.assertDictMatch(self.fake_metadata, result.metadetails)
self.assertEqual(aggregate_states.ACTIVE, result.operational_state)
def test_join_slave(self):
"""Ensure join_slave gets called when the request gets to master."""
@@ -1817,10 +1842,14 @@ class XenAPIAggregateTestCase(stubs.XenAPITestBase):
values = {"name": 'fake_aggregate',
"availability_zone": 'fake_zone'}
result = db.aggregate_create(self.context, values)
metadata = {pool_states.POOL_FLAG: "XenAPI",
pool_states.KEY: pool_states.CREATED}
db.aggregate_metadata_add(self.context, result.id, metadata)
db.aggregate_host_add(self.context, result.id, "host")
aggregate = db.aggregate_get(self.context, result.id)
self.assertEqual(["host"], aggregate.hosts)
self.assertEqual({}, aggregate.metadetails)
self.assertEqual(metadata, aggregate.metadetails)
self.conn._pool.add_to_aggregate(self.context, aggregate, "host")
self.assertTrue(fake_pool_set_name_label.called)
@@ -1836,12 +1865,10 @@ class XenAPIAggregateTestCase(stubs.XenAPITestBase):
self.assertTrue(fake_remove_from_aggregate.called)
def test_remove_from_empty_aggregate(self):
values = {"name": 'fake_aggregate',
"availability_zone": 'fake_zone'}
result = db.aggregate_create(self.context, values)
self.assertRaises(exception.AggregateError,
result = self._aggregate_setup()
self.assertRaises(exception.InvalidAggregateAction,
self.conn._pool.remove_from_aggregate,
None, result, "test_host")
self.context, result, "test_host")
def test_remove_slave(self):
"""Ensure eject slave gets called."""
@@ -1851,7 +1878,7 @@ class XenAPIAggregateTestCase(stubs.XenAPITestBase):
self.fake_metadata['host2'] = 'fake_host2_uuid'
aggregate = self._aggregate_setup(hosts=['host', 'host2'],
metadata=self.fake_metadata)
metadata=self.fake_metadata, aggr_state=pool_states.ACTIVE)
self.conn._pool.remove_from_aggregate(self.context, aggregate, "host2")
self.assertTrue(fake_eject_slave.called)
@@ -1861,37 +1888,120 @@ class XenAPIAggregateTestCase(stubs.XenAPITestBase):
fake_clear_pool.called = True
self.stubs.Set(self.conn._pool, "_clear_pool", fake_clear_pool)
aggregate = self._aggregate_setup(aggr_state=aggregate_states.ACTIVE,
metadata=self.fake_metadata)
aggregate = self._aggregate_setup(metadata=self.fake_metadata)
self.conn._pool.remove_from_aggregate(self.context, aggregate, "host")
result = db.aggregate_get(self.context, aggregate.id)
self.assertTrue(fake_clear_pool.called)
self.assertDictMatch({}, result.metadetails)
self.assertEqual(aggregate_states.ACTIVE, result.operational_state)
self.assertDictMatch({pool_states.POOL_FLAG: 'XenAPI',
pool_states.KEY: pool_states.ACTIVE}, result.metadetails)
def test_remote_master_non_empty_pool(self):
"""Ensure AggregateError is raised if removing the master."""
aggregate = self._aggregate_setup(aggr_state=aggregate_states.ACTIVE,
hosts=['host', 'host2'],
aggregate = self._aggregate_setup(hosts=['host', 'host2'],
metadata=self.fake_metadata)
self.assertRaises(exception.InvalidAggregateAction,
self.conn._pool.remove_from_aggregate,
self.context, aggregate, "host")
def _aggregate_setup(self, aggr_name='fake_aggregate',
aggr_zone='fake_zone',
aggr_state=aggregate_states.CREATED,
aggr_state=pool_states.CREATED,
hosts=['host'], metadata=None):
values = {"name": aggr_name,
"availability_zone": aggr_zone,
"operational_state": aggr_state, }
"availability_zone": aggr_zone}
result = db.aggregate_create(self.context, values)
pool_flag = {pool_states.POOL_FLAG: "XenAPI",
pool_states.KEY: aggr_state}
db.aggregate_metadata_add(self.context, result.id, pool_flag)
for host in hosts:
db.aggregate_host_add(self.context, result.id, host)
if metadata:
db.aggregate_metadata_add(self.context, result.id, metadata)
return db.aggregate_get(self.context, result.id)
def test_add_host_to_aggregate_invalid_changing_status(self):
"""Ensure InvalidAggregateAction is raised when adding host while
aggregate is not ready."""
aggregate = self._aggregate_setup(aggr_state=pool_states.CHANGING)
self.assertRaises(exception.InvalidAggregateAction,
self.conn.add_to_aggregate, self.context,
aggregate, 'host')
def test_add_host_to_aggregate_invalid_dismissed_status(self):
"""Ensure InvalidAggregateAction is raised when aggregate is
deleted."""
aggregate = self._aggregate_setup(aggr_state=pool_states.DISMISSED)
self.assertRaises(exception.InvalidAggregateAction,
self.conn.add_to_aggregate, self.context,
aggregate, 'fake_host')
def test_add_host_to_aggregate_invalid_error_status(self):
"""Ensure InvalidAggregateAction is raised when aggregate is
in error."""
aggregate = self._aggregate_setup(aggr_state=pool_states.ERROR)
self.assertRaises(exception.InvalidAggregateAction,
self.conn.add_to_aggregate, self.context,
aggregate, 'fake_host')
def test_remove_host_from_aggregate_error(self):
"""Ensure we can remove a host from an aggregate even if in error."""
values = _create_service_entries(self.context)
fake_zone = values.keys()[0]
aggr = self.api.create_aggregate(self.context,
'fake_aggregate', fake_zone)
# let's mock the fact that the aggregate is ready!
metadata = {pool_states.POOL_FLAG: "XenAPI",
pool_states.KEY: pool_states.ACTIVE}
db.aggregate_metadata_add(self.context, aggr['id'], metadata)
for host in values[fake_zone]:
aggr = self.api.add_host_to_aggregate(self.context,
aggr['id'], host)
# let's mock the fact that the aggregate is in error!
status = {'operational_state': pool_states.ERROR}
expected = self.api.remove_host_from_aggregate(self.context,
aggr['id'],
values[fake_zone][0])
self.assertEqual(len(aggr['hosts']) - 1, len(expected['hosts']))
self.assertEqual(expected['metadata'][pool_states.KEY],
pool_states.ACTIVE)
def test_remove_host_from_aggregate_invalid_dismissed_status(self):
"""Ensure InvalidAggregateAction is raised when aggregate is
deleted."""
aggregate = self._aggregate_setup(aggr_state=pool_states.DISMISSED)
self.assertRaises(exception.InvalidAggregateAction,
self.conn.remove_from_aggregate, self.context,
aggregate, 'fake_host')
def test_remove_host_from_aggregate_invalid_changing_status(self):
"""Ensure InvalidAggregateAction is raised when aggregate is
changing."""
aggregate = self._aggregate_setup(aggr_state=pool_states.CHANGING)
self.assertRaises(exception.InvalidAggregateAction,
self.conn.remove_from_aggregate, self.context,
aggregate, 'fake_host')
def test_add_aggregate_host_raise_err(self):
"""Ensure the undo operation works correctly on add."""
def fake_driver_add_to_aggregate(context, aggregate, host):
raise exception.AggregateError
self.stubs.Set(self.compute.driver, "add_to_aggregate",
fake_driver_add_to_aggregate)
metadata = {pool_states.POOL_FLAG: "XenAPI",
pool_states.KEY: pool_states.ACTIVE}
db.aggregate_metadata_add(self.context, self.aggr.id, metadata)
db.aggregate_host_add(self.context, self.aggr.id, 'fake_host')
self.assertRaises(exception.AggregateError,
self.compute.add_aggregate_host,
self.context, self.aggr.id, "fake_host")
excepted = db.aggregate_get(self.context, self.aggr.id)
self.assertEqual(excepted.metadetails[pool_states.KEY],
pool_states.ERROR)
self.assertEqual(excepted.hosts, [])
class VmUtilsTestCase(test.TestCase):
"""Unit tests for xenapi utils."""

View File

@@ -629,12 +629,18 @@ class ComputeDriver(object):
def add_to_aggregate(self, context, aggregate, host, **kwargs):
"""Add a compute host to an aggregate."""
#NOTE(jogo) Currently only used for XenAPI-Pool
raise NotImplementedError()
def remove_from_aggregate(self, context, aggregate, host, **kwargs):
"""Remove a compute host from an aggregate."""
raise NotImplementedError()
def undo_aggregate_operation(self, context, op, aggregate_id,
host, set_error=True):
"""Undo for Resource Pools"""
raise NotImplementedError()
def get_volume_connector(self, instance):
"""Get connector information for the instance for attaching to volumes.

View File

@@ -2798,6 +2798,20 @@ class LibvirtDriver(driver.ComputeDriver):
pass
return output
def add_to_aggregate(self, context, aggregate, host, **kwargs):
"""Add a compute host to an aggregate."""
#NOTE(jogo) Currently only used for XenAPI-Pool
pass
def remove_from_aggregate(self, context, aggregate, host, **kwargs):
"""Remove a compute host from an aggregate."""
pass
def undo_aggregate_operation(self, context, op, aggregate_id,
host, set_error=True):
"""only used for Resource Pools"""
pass
class HostState(object):
"""Manages information about the compute node through libvirt"""

View File

@@ -493,6 +493,12 @@ class XenAPIDriver(driver.ComputeDriver):
return self._pool.remove_from_aggregate(context,
aggregate, host, **kwargs)
def undo_aggregate_operation(self, context, op, aggregate_id,
host, set_error=True):
"""Undo aggregate operation when pool error raised"""
return self._pool.undo_aggregate_operation(context, op,
aggregate_id, host, set_error)
def legacy_nwinfo(self):
"""
Indicate if the driver requires the legacy network_info format.

View File

@@ -21,7 +21,6 @@ Management class for Pool-related functions (join, eject, etc).
import urlparse
from nova.compute import aggregate_states
from nova import db
from nova import exception
from nova import flags
@@ -29,6 +28,7 @@ from nova.openstack.common import cfg
from nova.openstack.common import jsonutils
from nova.openstack.common import log as logging
from nova.openstack.common import rpc
from nova.virt.xenapi import pool_states
from nova.virt.xenapi import vm_utils
LOG = logging.getLogger(__name__)
@@ -55,22 +55,58 @@ class ResourcePool(object):
self._host_uuid = host_rec['uuid']
self._session = session
def undo_aggregate_operation(self, context, op, aggregate_id,
host, set_error):
"""Undo aggregate operation when pool error raised"""
try:
if set_error:
metadata = {pool_states.KEY: pool_states.ERROR}
db.aggregate_metadata_add(context, aggregate_id, metadata)
op(context, aggregate_id, host)
except Exception:
LOG.exception(_('Aggregate %(aggregate_id)s: unrecoverable state '
'during operation on %(host)s') % locals())
def _is_hv_pool(self, context, aggregate_id):
"""Checks if aggregate is a hypervisor_pool"""
metadata = db.aggregate_metadata_get(context, aggregate_id)
return pool_states.POOL_FLAG in metadata.keys()
def add_to_aggregate(self, context, aggregate, host, **kwargs):
"""Add a compute host to an aggregate."""
if not self._is_hv_pool(context, aggregate.id):
return
invalid = {pool_states.CHANGING: 'setup in progress',
pool_states.DISMISSED: 'aggregate deleted',
pool_states.ERROR: 'aggregate in error'}
if (db.aggregate_metadata_get(context, aggregate.id)[pool_states.KEY]
in invalid.keys()):
raise exception.InvalidAggregateAction(
action='add host',
aggregate_id=aggregate.id,
reason=invalid[db.aggregate_metadata_get(context,
aggregate.id)
[pool_states.KEY]])
if (db.aggregate_metadata_get(context, aggregate.id)[pool_states.KEY]
== pool_states.CREATED):
db.aggregate_metadata_add(context, aggregate.id,
{pool_states.KEY: pool_states.CHANGING})
if len(aggregate.hosts) == 1:
# this is the first host of the pool -> make it master
self._init_pool(aggregate.id, aggregate.name)
# save metadata so that we can find the master again
values = {
'operational_state': aggregate_states.ACTIVE,
'metadata': {'master_compute': host,
host: self._host_uuid},
}
db.aggregate_update(context, aggregate.id, values)
metadata = {'master_compute': host,
host: self._host_uuid,
pool_states.KEY: pool_states.ACTIVE}
db.aggregate_metadata_add(context, aggregate.id, metadata)
else:
# the pool is already up and running, we need to figure out
# whether we can serve the request from this host or not.
master_compute = aggregate.metadetails['master_compute']
master_compute = db.aggregate_metadata_get(context,
aggregate.id)['master_compute']
if master_compute == FLAGS.host and master_compute != host:
# this is the master -> do a pool-join
# To this aim, nova compute on the slave has to go down.
@@ -90,7 +126,22 @@ class ResourcePool(object):
def remove_from_aggregate(self, context, aggregate, host, **kwargs):
"""Remove a compute host from an aggregate."""
master_compute = aggregate.metadetails.get('master_compute')
if not self._is_hv_pool(context, aggregate.id):
return
invalid = {pool_states.CREATED: 'no hosts to remove',
pool_states.CHANGING: 'setup in progress',
pool_states.DISMISSED: 'aggregate deleted', }
if (db.aggregate_metadata_get(context, aggregate.id)[pool_states.KEY]
in invalid.keys()):
raise exception.InvalidAggregateAction(
action='remove host',
aggregate_id=aggregate.id,
reason=invalid[db.aggregate_metadata_get(context,
aggregate.id)[pool_states.KEY]])
master_compute = db.aggregate_metadata_get(context,
aggregate.id)['master_compute']
if master_compute == FLAGS.host and master_compute != host:
# this is the master -> instruct it to eject a host from the pool
host_uuid = db.aggregate_metadata_get(context, aggregate.id)[host]

View File

@@ -15,26 +15,26 @@
# License for the specific language governing permissions and limitations
# under the License.
"""Possible states for host aggregates.
"""Possible states for xen resource pools.
An aggregate may be 'created', in which case the admin has triggered its
A pool may be 'created', in which case the admin has triggered its
creation, but the underlying hypervisor pool has not actually being set up
yet. An aggregate may be 'changing', meaning that the underlying hypervisor
pool is being setup. An aggregate may be 'active', in which case the underlying
hypervisor pool is up and running. An aggregate may be 'dismissed' when it has
no hosts and it has been deleted. An aggregate may be in 'error' in all other
yet. An pool may be 'changing', meaning that the underlying hypervisor
pool is being setup. An pool may be 'active', in which case the underlying
hypervisor pool is up and running. An pool may be 'dismissed' when it has
no hosts and it has been deleted. An pool may be in 'error' in all other
cases.
A 'created' aggregate becomes 'changing' during the first request of
A 'created' pool becomes 'changing' during the first request of
adding a host. During a 'changing' status no other requests will be accepted;
this is to allow the hypervisor layer to instantiate the underlying pool
without any potential race condition that may incur in master/slave-based
configurations. The aggregate goes into the 'active' state when the underlying
configurations. The pool goes into the 'active' state when the underlying
pool has been correctly instantiated.
All other operations (e.g. add/remove hosts) that succeed will keep the
aggregate in the 'active' state. If a number of continuous requests fail,
an 'active' aggregate goes into an 'error' state. To recover from such a state,
pool in the 'active' state. If a number of continuous requests fail,
an 'active' pool goes into an 'error' state. To recover from such a state,
admin intervention is required. Currently an error state is irreversible,
that is, in order to recover from it an aggregate must be deleted.
that is, in order to recover from it an pool must be deleted.
"""
CREATED = 'created'
@@ -42,3 +42,7 @@ CHANGING = 'changing'
ACTIVE = 'active'
ERROR = 'error'
DISMISSED = 'dismissed'
# Metadata keys
KEY = 'operational_state'
POOL_FLAG = 'hypervisor_pool'