blueprint host-aggregates: OSAPI/virt integration, via nova.compute.api

This commit introduces the first cut of integration between the OSAPI
Admin extensions for host aggregates and the virt layer.

This is part of a series of commits that have started with change:

https://review.openstack.org/#change,3035

Change-Id: I75d8b616e3b8f8cef75d40d937e0dce9f29b16db
This commit is contained in:
Armando Migliaccio 2012-01-18 19:47:36 +00:00
parent fcf73f90c3
commit 0a617713b8
8 changed files with 393 additions and 10 deletions

View File

@ -17,3 +17,4 @@
# under the License.
from nova.compute.api import API
from nova.compute.api import AggregateAPI

View File

@ -17,12 +17,28 @@
"""Possible states for host aggregates.
An aggregate may be 'building', in which case the admin has triggered its
creation, but the underlying hypervisor pool has not actually being created
yet. An aggregate may be 'active', in which case the underlying hypervisor
pool is up and running. An aggregate may be in 'error' in all other cases.
An aggregate may be 'created', in which case the admin has triggered its
creation, but the underlying hypervisor pool has not actually being set up
yet. An aggregate may be 'changing', meaning that the underlying hypervisor
pool is being setup. An aggregate may be 'active', in which case the underlying
hypervisor pool is up and running. An aggregate may be 'dismissed' when it has
no hosts and it has been deleted. An aggregate may be in 'error' in all other
cases.
A 'created' aggregate becomes 'changing' during the first request of
adding a host. During a 'changing' status no other requests will be accepted;
this is to allow the hypervisor layer to instantiate the underlying pool
without any potential race condition that may incur in master/slave-based
configurations. The aggregate goes into the 'active' state when the underlying
pool has been correctly instantiated.
All other operations (e.g. add/remove hosts) that succeed will keep the
aggregate in the 'active' state. If a number of continuous requests fail,
an 'active' aggregate goes into an 'error' state. To recover from such a state,
admin intervention is required. Currently an error state is irreversible,
that is, in order to recover from it an aggregate must be deleted.
"""
BUILDING = 'building'
CREATED = 'created'
CHANGING = 'changing'
ACTIVE = 'active'
ERROR = 'error'
DISMISSED = 'dismissed'

View File

@ -17,7 +17,8 @@
# License for the specific language governing permissions and limitations
# under the License.
"""Handles all requests relating to instances (guest vms)."""
"""Handles all requests relating to compute resources (e.g. guest vms,
networking and storage of vms, and compute hosts on which they run)."""
import functools
import re
@ -27,6 +28,7 @@ import novaclient
import webob.exc
from nova import block_device
from nova.compute import aggregate_states
from nova.compute import instance_types
from nova.compute import power_state
from nova.compute import task_states
@ -1740,3 +1742,125 @@ class API(base.Base):
uuids = [instance['uuid'] for instance in instances]
return self.db.instance_fault_get_by_instance_uuids(context, uuids)
class AggregateAPI(base.Base):
"""Sub-set of the Compute Manager API for managing host aggregates."""
def __init__(self, **kwargs):
super(AggregateAPI, self).__init__(**kwargs)
def create_aggregate(self, context, aggregate_name, availability_zone):
"""Creates the model for the aggregate."""
values = {"name": aggregate_name,
"availability_zone": availability_zone}
aggregate = self.db.aggregate_create(context, values)
return dict(aggregate.iteritems())
def get_aggregate(self, context, aggregate_id):
"""Get an aggregate by id."""
aggregate = self.db.aggregate_get(context, aggregate_id)
return self._get_aggregate_info(context, aggregate)
def get_aggregate_list(self, context):
"""Get all the aggregates for this zone."""
aggregates = self.db.aggregate_get_all(context, read_deleted="no")
return [self._get_aggregate_info(context, a) for a in aggregates]
def update_aggregate(self, context, aggregate_id, values):
"""Update the properties of an aggregate."""
aggregate = self.db.aggregate_update(context, aggregate_id, values)
return self._get_aggregate_info(context, aggregate)
def update_aggregate_metadata(self, context, aggregate_id, metadata):
"""Updates the aggregate metadata.
If a key is set to None, it gets removed from the aggregate metadata.
"""
# As a first release of the host aggregates blueprint, this call is
# pretty dumb, in the sense that interacts only with the model.
# In later releasses, updating metadata may trigger virt actions like
# the setup of shared storage, or more generally changes to the
# underlying hypervisor pools.
for key in metadata.keys():
if not metadata[key]:
try:
self.db.aggregate_metadata_delete(context,
aggregate_id, key)
metadata.pop(key)
except exception.AggregateMetadataNotFound, e:
LOG.warn(e.message)
self.db.aggregate_metadata_add(context, aggregate_id, metadata)
return self.get_aggregate(context, aggregate_id)
def delete_aggregate(self, context, aggregate_id):
"""Deletes the aggregate."""
hosts = self.db.aggregate_host_get_all(context, aggregate_id,
read_deleted="no")
if len(hosts) > 0:
raise exception.InvalidAggregateAction(action='delete',
aggregate_id=aggregate_id,
reason='not empty')
values = {'operational_state': aggregate_states.DISMISSED}
self.db.aggregate_update(context, aggregate_id, values)
self.db.aggregate_delete(context, aggregate_id)
def add_host_to_aggregate(self, context, aggregate_id, host):
"""Adds the host to an aggregate."""
# validates the host; ComputeHostNotFound is raised if invalid
service = self.db.service_get_all_compute_by_host(context, host)[0]
# add host, and reflects action in the aggregate operational state
aggregate = self.db.aggregate_get(context, aggregate_id)
if aggregate.operational_state in [aggregate_states.CREATED,
aggregate_states.ACTIVE]:
if service.availability_zone != aggregate.availability_zone:
raise exception.\
InvalidAggregateAction(action='add host',
aggregate_id=aggregate_id,
reason='availibility zone mismatch')
self.db.aggregate_host_add(context, aggregate_id, host)
if aggregate.operational_state == aggregate_states.CREATED:
values = {'operational_state': aggregate_states.CHANGING}
self.db.aggregate_update(context, aggregate_id, values)
queue = self.db.queue_get_for(context, service.topic, host)
rpc.cast(context, queue, {"method": "add_aggregate_host",
"args": {"aggregate_id": aggregate_id,
"host": host}, })
return self.get_aggregate(context, aggregate_id)
else:
invalid = {aggregate_states.CHANGING: 'setup in progress',
aggregate_states.DISMISSED: 'aggregate deleted',
aggregate_states.ERROR: 'aggregate in error', }
if aggregate.operational_state in invalid.keys():
raise exception.\
InvalidAggregateAction(action='add host',
aggregate_id=aggregate_id,
reason=invalid[aggregate.operational_state])
def remove_host_from_aggregate(self, context, aggregate_id, host):
"""Removes host from the aggregate."""
# validates the host; ComputeHostNotFound is raised if invalid
service = self.db.service_get_all_compute_by_host(context, host)[0]
aggregate = self.db.aggregate_get(context, aggregate_id)
if aggregate.operational_state in [aggregate_states.ACTIVE,
aggregate_states.ERROR]:
self.db.aggregate_host_delete(context, aggregate_id, host)
queue = self.db.queue_get_for(context, service.topic, host)
rpc.cast(context, queue, {"method": "remove_aggregate_host",
"args": {"aggregate_id": aggregate_id,
"host": host}, })
return self.get_aggregate(context, aggregate_id)
elif aggregate.operational_state == aggregate_states.DISMISSED:
raise exception.InvalidAggregateAction(action='add host',
aggregate_id=aggregate_id,
reason='aggregate deleted')
def _get_aggregate_info(self, context, aggregate):
"""Builds a dictionary with aggregate props, metadata and hosts."""
metadata = self.db.aggregate_metadata_get(context, aggregate.id)
hosts = self.db.aggregate_host_get_all(context, aggregate.id,
read_deleted="no")
result = dict(aggregate.iteritems())
result["metadata"] = metadata
result["hosts"] = hosts
return result

View File

@ -2207,3 +2207,11 @@ class ComputeManager(manager.SchedulerDependentManager):
instance_uuid,
vm_state=vm_states.ERROR,
task_state=None)
def add_aggregate_host(self, context, aggregate_id, host):
"""Adds a host to a physical hypervisor pool."""
raise NotImplementedError()
def remove_aggregate_host(self, context, aggregate_id, host):
"""Removes a host from a physical hypervisor pool."""
raise NotImplementedError()

View File

@ -4257,7 +4257,7 @@ def aggregate_create(context, values, metadata=None):
try:
aggregate = models.Aggregate()
aggregate.update(values)
aggregate.operational_state = aggregate_states.BUILDING
aggregate.operational_state = aggregate_states.CREATED
aggregate.save()
except exception.DBError:
raise exception.AggregateNameExists(aggregate_name=values['name'])
@ -4435,20 +4435,27 @@ def aggregate_host_delete(context, aggregate_id, host):
@require_admin_context
@require_aggregate_exists
def aggregate_host_add(context, aggregate_id, host):
session = get_session()
host_ref = _aggregate_get_query(context,
models.AggregateHost,
models.AggregateHost.aggregate_id,
aggregate_id,
read_deleted='no').\
session=session,
read_deleted='yes').\
filter_by(host=host).first()
if not host_ref:
try:
host_ref = models.AggregateHost()
values = {"host": host, "aggregate_id": aggregate_id, }
host_ref.update(values)
host_ref.save()
host_ref.save(session=session)
except exception.DBError:
raise exception.AggregateHostConflict(host=host)
elif host_ref.deleted:
host_ref.update({'deleted': False,
'deleted_at': None,
'updated_at': literal_column('updated_at')})
host_ref.save(session=session)
else:
raise exception.AggregateHostExists(host=host,
aggregate_id=aggregate_id)

View File

@ -261,6 +261,11 @@ class InvalidParameterValue(Invalid):
message = _("%(err)s")
class InvalidAggregateAction(Invalid):
message = _("Cannot perform action '%(action)s' on aggregate "
"%(aggregate_id)s. Reason: %(reason)s.")
class InstanceInvalidState(Invalid):
message = _("Instance %(instance_uuid)s in %(attr)s %(state)s. Cannot "
"%(method)s while the instance is in this state.")

View File

@ -31,6 +31,7 @@ import nova
import nova.common.policy
from nova import compute
import nova.compute.api
from nova.compute import aggregate_states
from nova.compute import instance_types
from nova.compute import manager as compute_manager
from nova.compute import power_state
@ -3011,6 +3012,216 @@ class ComputeAPITestCase(BaseTestCase):
db.instance_destroy(self.context, instance['id'])
def fake_rpc_method(context, topic, msg, do_cast=True):
pass
def _create_service_entries(context, values={'avail_zone1': ['fake_host1',
'fake_host2'],
'avail_zone2': ['fake_host3'], }):
for avail_zone, hosts in values.iteritems():
for host in hosts:
db.service_create(context,
{'host': host,
'binary': 'nova-compute',
'topic': 'compute',
'report_count': 0,
'availability_zone': avail_zone})
return values
class ComputeAPIAggrTestCase(test.TestCase):
def setUp(self):
super(ComputeAPIAggrTestCase, self).setUp()
self.api = compute.AggregateAPI()
self.context = context.get_admin_context()
self.stubs.Set(rpc, 'call', fake_rpc_method)
self.stubs.Set(rpc, 'cast', fake_rpc_method)
def tearDown(self):
super(ComputeAPIAggrTestCase, self).tearDown()
def test_update_aggregate_metadata(self):
aggr = self.api.create_aggregate(self.context, 'fake_aggregate',
'fake_availability_zone')
metadata = {'foo_key1': 'foo_value1',
'foo_key2': 'foo_value2', }
aggr = self.api.update_aggregate_metadata(self.context, aggr['id'],
metadata)
metadata['foo_key1'] = None
expected = self.api.update_aggregate_metadata(self.context,
aggr['id'], metadata)
self.assertDictMatch(expected['metadata'], {'foo_key2': 'foo_value2'})
def test_delete_aggregate(self):
"""Ensure we can delete an aggregate."""
aggr = self.api.create_aggregate(self.context, 'fake_aggregate',
'fake_availability_zone')
self.api.delete_aggregate(self.context, aggr['id'])
expected = db.aggregate_get(self.context, aggr['id'],
read_deleted='yes')
self.assertNotEqual(aggr['operational_state'],
expected['operational_state'])
def test_delete_non_empty_aggregate(self):
"""Ensure InvalidAggregateAction is raised when non empty aggregate."""
aggr = self.api.create_aggregate(self.context, 'fake_aggregate',
'fake_availability_zone')
_create_service_entries(self.context,
{'fake_availability_zone': ['fake_host']})
self.api.add_host_to_aggregate(self.context, aggr['id'], 'fake_host')
self.assertRaises(exception.InvalidAggregateAction,
self.api.delete_aggregate, self.context, aggr['id'])
def test_add_host_to_aggregate(self):
"""Ensure we can add a host to an aggregate."""
values = _create_service_entries(self.context)
fake_zone = values.keys()[0]
fake_host = values[fake_zone][0]
aggr = self.api.create_aggregate(self.context,
'fake_aggregate', fake_zone)
aggr = self.api.add_host_to_aggregate(self.context,
aggr['id'], fake_host)
self.assertEqual(aggr['operational_state'], aggregate_states.CHANGING)
def test_add_host_to_aggregate_multiple(self):
"""Ensure we can add multiple hosts to an aggregate."""
values = _create_service_entries(self.context)
fake_zone = values.keys()[0]
aggr = self.api.create_aggregate(self.context,
'fake_aggregate', fake_zone)
# let's mock the fact that the aggregate is active already!
status = {'operational_state': aggregate_states.ACTIVE}
db.aggregate_update(self.context, aggr['id'], status)
for host in values[fake_zone]:
aggr = self.api.add_host_to_aggregate(self.context,
aggr['id'], host)
self.assertEqual(len(aggr['hosts']), len(values[fake_zone]))
self.assertEqual(aggr['operational_state'],
aggregate_states.ACTIVE)
def test_add_host_to_aggregate_invalid_changing_status(self):
"""Ensure InvalidAggregateAction is raised when adding host while
aggregate is not ready."""
values = _create_service_entries(self.context)
fake_zone = values.keys()[0]
fake_host = values[fake_zone][0]
aggr = self.api.create_aggregate(self.context,
'fake_aggregate', fake_zone)
aggr = self.api.add_host_to_aggregate(self.context,
aggr['id'], fake_host)
self.assertEqual(aggr['operational_state'],
aggregate_states.CHANGING)
self.assertRaises(exception.InvalidAggregateAction,
self.api.add_host_to_aggregate, self.context,
aggr['id'], fake_host)
def test_add_host_to_aggregate_invalid_dismissed_status(self):
"""Ensure InvalidAggregateAction is raised when aggregate is
deleted."""
aggr = self.api.create_aggregate(self.context,
'fake_aggregate', 'fake_zone')
_create_service_entries(self.context, {'fake_zone': ['fake_host']})
# let's mock the fact that the aggregate is dismissed!
status = {'operational_state': aggregate_states.DISMISSED}
db.aggregate_update(self.context, aggr['id'], status)
self.assertRaises(exception.InvalidAggregateAction,
self.api.add_host_to_aggregate, self.context,
aggr['id'], 'fake_host')
def test_add_host_to_aggregate_invalid_error_status(self):
"""Ensure InvalidAggregateAction is raised when aggregate is
in error."""
aggr = self.api.create_aggregate(self.context,
'fake_aggregate', 'fake_zone')
_create_service_entries(self.context, {'fake_zone': ['fake_host']})
# let's mock the fact that the aggregate is in error!
status = {'operational_state': aggregate_states.ERROR}
db.aggregate_update(self.context, aggr['id'], status)
self.assertRaises(exception.InvalidAggregateAction,
self.api.add_host_to_aggregate, self.context,
aggr['id'], 'fake_host')
def test_add_host_to_aggregate_zones_mismatch(self):
"""Ensure InvalidAggregateAction is raised when zones don't match."""
_create_service_entries(self.context, {'fake_zoneX': ['fake_host']})
aggr = self.api.create_aggregate(self.context,
'fake_aggregate', 'fake_zoneY')
self.assertRaises(exception.InvalidAggregateAction,
self.api.add_host_to_aggregate,
self.context, aggr['id'], 'fake_host')
def test_add_host_to_aggregate_raise_not_found(self):
"""Ensure ComputeHostNotFound is raised when adding invalid host."""
aggr = self.api.create_aggregate(self.context, 'fake_aggregate',
'fake_availability_zone')
self.assertRaises(exception.ComputeHostNotFound,
self.api.add_host_to_aggregate,
self.context, aggr['id'], 'invalid_host')
def test_remove_host_from_aggregate_active(self):
"""Ensure we can remove a host from an aggregate."""
values = _create_service_entries(self.context)
fake_zone = values.keys()[0]
aggr = self.api.create_aggregate(self.context,
'fake_aggregate', fake_zone)
# let's mock the fact that the aggregate is active already!
status = {'operational_state': aggregate_states.ACTIVE}
db.aggregate_update(self.context, aggr['id'], status)
for host in values[fake_zone]:
aggr = self.api.add_host_to_aggregate(self.context,
aggr['id'], host)
expected = self.api.remove_host_from_aggregate(self.context,
aggr['id'],
values[fake_zone][0])
self.assertEqual(len(aggr['hosts']) - 1, len(expected['hosts']))
self.assertEqual(expected['operational_state'],
aggregate_states.ACTIVE)
def test_remove_host_from_aggregate_error(self):
"""Ensure we can remove a host from an aggregate even if in error."""
values = _create_service_entries(self.context)
fake_zone = values.keys()[0]
aggr = self.api.create_aggregate(self.context,
'fake_aggregate', fake_zone)
# let's mock the fact that the aggregate is ready!
status = {'operational_state': aggregate_states.ACTIVE}
db.aggregate_update(self.context, aggr['id'], status)
for host in values[fake_zone]:
aggr = self.api.add_host_to_aggregate(self.context,
aggr['id'], host)
# let's mock the fact that the aggregate is in error!
status = {'operational_state': aggregate_states.ERROR}
expected = self.api.remove_host_from_aggregate(self.context,
aggr['id'],
values[fake_zone][0])
self.assertEqual(len(aggr['hosts']) - 1, len(expected['hosts']))
self.assertEqual(expected['operational_state'],
aggregate_states.ACTIVE)
def test_remove_host_from_aggregate_invalid_dismissed_status(self):
"""Ensure InvalidAggregateAction is raised when aggregate is
deleted."""
aggr = self.api.create_aggregate(self.context,
'fake_aggregate', 'fake_zone')
_create_service_entries(self.context, {'fake_zone': ['fake_host']})
# let's mock the fact that the aggregate is dismissed!
status = {'operational_state': aggregate_states.DISMISSED}
db.aggregate_update(self.context, aggr['id'], status)
self.assertRaises(exception.InvalidAggregateAction,
self.api.remove_host_from_aggregate, self.context,
aggr['id'], 'fake_host')
def test_remove_host_from_aggregate_raise_not_found(self):
"""Ensure ComputeHostNotFound is raised when removing invalid host."""
aggr = self.api.create_aggregate(self.context, 'fake_aggregate',
'fake_availability_zone')
self.assertRaises(exception.ComputeHostNotFound,
self.api.remove_host_from_aggregate,
self.context, aggr['id'], 'invalid_host')
class ComputePolicyTestCase(BaseTestCase):
def setUp(self):

View File

@ -304,7 +304,7 @@ class AggregateDBApiTestCase(test.TestCase):
def test_aggregate_create(self):
"""Ensure aggregate can be created with no metadata."""
result = _create_aggregate(metadata=None)
self.assertEqual(result['operational_state'], 'building')
self.assertEqual(result['operational_state'], 'created')
def test_aggregate_create_raise_exist_exc(self):
"""Ensure aggregate names are distinct."""
@ -476,6 +476,17 @@ class AggregateDBApiTestCase(test.TestCase):
expected = db.aggregate_host_get_all(ctxt, result.id)
self.assertEqual(_get_fake_aggr_hosts(), expected)
def test_aggregate_host_add_deleted(self):
"""Ensure we can add a host that was previously deleted."""
ctxt = context.get_admin_context()
result = _create_aggregate_with_hosts(context=ctxt, metadata=None)
host = _get_fake_aggr_hosts()[0]
db.aggregate_host_delete(ctxt, result.id, host)
db.aggregate_host_add(ctxt, result.id, host)
expected = db.aggregate_host_get_all(ctxt, result.id,
read_deleted='no')
self.assertEqual(len(expected), 1)
def test_aggregate_host_add_duplicate_raise_conflict(self):
"""Ensure we cannot add host to distinct aggregates."""
ctxt = context.get_admin_context()