Aggregate create and destroy work against API db

Make aggregate.create() and destroy() use the API rather than cell database.
Also block aggregate creation until main database empty. This makes
Aggregate.create() fail until the main database has had all of its aggreagtes
migrated. Since we want to avoid any overlap or clashes in integer ids we
need to enforce this.

Note that this includes a change to a notification sample, which encodes
the function and module of a sample exception (which happens to be during
an aggregate operation). Since the notifications are encoding internal
function names, which can and will change over time, this is an expected
change.

blueprint cells-aggregate-api-db

Co-Authored-By: Dan Smith <dansmith@redhat.com>
Change-Id: Ida70e3c05f93d6044ddef4fcbc1af999ac1b1944
This commit is contained in:
Mark Doffman 2016-05-11 13:04:30 -05:00 committed by Dan Smith
parent 2110b69656
commit 9cfd6fc760
7 changed files with 318 additions and 23 deletions

View File

@ -71,6 +71,9 @@ class AggregateController(wsgi.Controller):
aggregate = self.api.create_aggregate(context, name, avail_zone) aggregate = self.api.create_aggregate(context, name, avail_zone)
except exception.AggregateNameExists as e: except exception.AggregateNameExists as e:
raise exc.HTTPConflict(explanation=e.format_message()) raise exc.HTTPConflict(explanation=e.format_message())
except exception.ObjectActionError:
raise exc.HTTPConflict(explanation=_(
'Not all aggregates have been migrated to the API database'))
except exception.InvalidAggregateAction as e: except exception.InvalidAggregateAction as e:
raise exc.HTTPBadRequest(explanation=e.format_message()) raise exc.HTTPBadRequest(explanation=e.format_message())

View File

@ -74,9 +74,11 @@ class Aggregate(API_BASE):
uuid = Column(String(36)) uuid = Column(String(36))
name = Column(String(255)) name = Column(String(255))
_hosts = orm.relationship(AggregateHost, _hosts = orm.relationship(AggregateHost,
primaryjoin='Aggregate.id == AggregateHost.aggregate_id') primaryjoin='Aggregate.id == AggregateHost.aggregate_id',
cascade='delete')
_metadata = orm.relationship(AggregateMetadata, _metadata = orm.relationship(AggregateMetadata,
primaryjoin='Aggregate.id == AggregateMetadata.aggregate_id') primaryjoin='Aggregate.id == AggregateMetadata.aggregate_id',
cascade='delete')
@property @property
def _extra_keys(self): def _extra_keys(self):

View File

@ -23,8 +23,9 @@ from nova.compute import utils as compute_utils
from nova import db from nova import db
from nova.db.sqlalchemy import api as db_api from nova.db.sqlalchemy import api as db_api
from nova.db.sqlalchemy import api_models from nova.db.sqlalchemy import api_models
from nova.db.sqlalchemy import models as main_models
from nova import exception from nova import exception
from nova.i18n import _ from nova.i18n import _, _LW
from nova import objects from nova import objects
from nova.objects import base from nova.objects import base
from nova.objects import fields from nova.objects import fields
@ -159,6 +160,45 @@ def _metadata_delete_from_db(context, aggregate_id, key):
aggregate_id=aggregate_id, metadata_key=key) aggregate_id=aggregate_id, metadata_key=key)
@db_api.api_context_manager.writer
def _aggregate_create_in_db(context, values, metadata=None):
query = context.session.query(api_models.Aggregate)
query = query.filter(api_models.Aggregate.name == values['name'])
aggregate = query.first()
if not aggregate:
aggregate = api_models.Aggregate()
aggregate.update(values)
aggregate.save(context.session)
# We don't want these to be lazy loaded later. We know there is
# nothing here since we just created this aggregate.
aggregate._hosts = []
aggregate._metadata = []
else:
raise exception.AggregateNameExists(aggregate_name=values['name'])
if metadata:
_metadata_add_to_db(context, aggregate.id, metadata)
context.session.expire(aggregate, ['_metadata'])
aggregate._metadata
return aggregate
@db_api.api_context_manager.writer
def _aggregate_delete_from_db(context, aggregate_id):
# Delete Metadata first
context.session.query(api_models.AggregateMetadata).\
filter_by(aggregate_id=aggregate_id).\
delete()
count = context.session.query(api_models.Aggregate).\
filter(api_models.Aggregate.id == aggregate_id).\
delete()
if count == 0:
raise exception.AggregateNotFound(aggregate_id=aggregate_id)
@db_api.api_context_manager.writer @db_api.api_context_manager.writer
def _aggregate_update_to_db(context, aggregate_id, values): def _aggregate_update_to_db(context, aggregate_id, values):
aggregate = _aggregate_get_from_db(context, aggregate_id) aggregate = _aggregate_get_from_db(context, aggregate_id)
@ -285,11 +325,31 @@ class Aggregate(base.NovaPersistentObject, base.NovaObject):
db_aggregate = db.aggregate_get_by_uuid(context, aggregate_uuid) db_aggregate = db.aggregate_get_by_uuid(context, aggregate_uuid)
return cls._from_db_object(context, cls(), db_aggregate) return cls._from_db_object(context, cls(), db_aggregate)
@staticmethod
@db_api.main_context_manager.reader
def _ensure_migrated(context):
result = context.session.query(main_models.Aggregate).\
filter_by(deleted=0).count()
if result:
LOG.warning(
_LW('Main database contains %(count)i unmigrated aggregates'),
{'count': result})
return result == 0
@base.remotable @base.remotable
def create(self): def create(self):
if self.obj_attr_is_set('id'): if self.obj_attr_is_set('id'):
raise exception.ObjectActionError(action='create', raise exception.ObjectActionError(action='create',
reason='already created') reason='already created')
# NOTE(mdoff): Once we have made it past a point where we know
# all aggregates have been migrated, we can remove this. Ideally
# in Ocata with a blocker migration to be sure.
if not self._ensure_migrated(self._context):
raise exception.ObjectActionError(
action='create',
reason='main database still contains aggregates')
self._assert_no_hosts('create') self._assert_no_hosts('create')
updates = self.obj_get_changes() updates = self.obj_get_changes()
payload = dict(updates) payload = dict(updates)
@ -304,7 +364,7 @@ class Aggregate(base.NovaPersistentObject, base.NovaObject):
"create.start", "create.start",
payload) payload)
metadata = updates.pop('metadata', None) metadata = updates.pop('metadata', None)
db_aggregate = db.aggregate_create(self._context, updates, db_aggregate = _aggregate_create_in_db(self._context, updates,
metadata=metadata) metadata=metadata)
self._from_db_object(self._context, self, db_aggregate) self._from_db_object(self._context, self, db_aggregate)
payload['aggregate_id'] = self.id payload['aggregate_id'] = self.id
@ -372,6 +432,9 @@ class Aggregate(base.NovaPersistentObject, base.NovaObject):
@base.remotable @base.remotable
def destroy(self): def destroy(self):
try:
_aggregate_delete_from_db(self._context, self.id)
except exception.AggregateNotFound:
db.aggregate_delete(self._context, self.id) db.aggregate_delete(self._context, self.id)
@base.remotable @base.remotable

View File

@ -23,6 +23,7 @@ from nova import exception
from nova import test from nova import test
from nova.tests import fixtures from nova.tests import fixtures
from nova.tests.unit import matchers from nova.tests.unit import matchers
from nova.tests.unit.objects.test_objects import compare_obj as base_compare
from nova.tests import uuidsentinel from nova.tests import uuidsentinel
import nova.objects.aggregate as aggregate_obj import nova.objects.aggregate as aggregate_obj
@ -205,6 +206,65 @@ class AggregateObjectDbTestCase(test.NoDBTestCase):
key='goodkey') key='goodkey')
self.assertEqual(2, len(rl1)) self.assertEqual(2, len(rl1))
def test_aggregate_create_in_db(self):
fake_create_aggregate = {
'name': 'fake-aggregate',
}
agg = aggregate_obj._aggregate_create_in_db(self.context,
fake_create_aggregate)
result = aggregate_obj._aggregate_get_from_db(self.context,
agg.id)
self.assertEqual(result.name, fake_create_aggregate['name'])
def test_aggregate_create_in_db_with_metadata(self):
fake_create_aggregate = {
'name': 'fake-aggregate',
}
agg = aggregate_obj._aggregate_create_in_db(self.context,
fake_create_aggregate,
metadata={'goodkey': 'good'})
result = aggregate_obj._aggregate_get_from_db(self.context,
agg.id)
md = aggregate_obj._get_by_metadata_key_from_db(self.context,
key='goodkey')
self.assertEqual(len(md), 1)
self.assertEqual(md[0]['id'], agg.id)
self.assertEqual(result.name, fake_create_aggregate['name'])
def test_aggregate_create_raise_exist_exc(self):
fake_create_aggregate = {
'name': 'fake-aggregate',
}
aggregate_obj._aggregate_create_in_db(self.context,
fake_create_aggregate)
self.assertRaises(exception.AggregateNameExists,
aggregate_obj._aggregate_create_in_db,
self.context,
fake_create_aggregate,
metadata=None)
def test_aggregate_delete(self):
result = _create_aggregate(self.context, metadata=None)
aggregate_obj._aggregate_delete_from_db(self.context, result['id'])
self.assertRaises(exception.AggregateNotFound,
aggregate_obj._aggregate_get_from_db,
self.context, result['id'])
def test_aggregate_delete_raise_not_found(self):
# this does not exist!
aggregate_id = 45
self.assertRaises(exception.AggregateNotFound,
aggregate_obj._aggregate_delete_from_db,
self.context, aggregate_id)
def test_aggregate_delete_with_metadata(self):
result = _create_aggregate(self.context,
metadata={'availability_zone': 'fake_avail_zone'})
aggregate_obj._aggregate_delete_from_db(self.context, result['id'])
self.assertRaises(exception.AggregateNotFound,
aggregate_obj._aggregate_get_from_db,
self.context, result['id'])
def test_aggregate_update(self): def test_aggregate_update(self):
created = _create_aggregate(self.context, created = _create_aggregate(self.context,
metadata={'availability_zone': 'fake_avail_zone'}) metadata={'availability_zone': 'fake_avail_zone'})
@ -408,3 +468,128 @@ class AggregateObjectDbTestCase(test.NoDBTestCase):
self.assertRaises(exception.AggregateMetadataNotFound, self.assertRaises(exception.AggregateMetadataNotFound,
aggregate_obj._metadata_delete_from_db, aggregate_obj._metadata_delete_from_db,
self.context, result['id'], 'foo_key') self.context, result['id'], 'foo_key')
def create_aggregate(context, db_id, in_api=True):
if in_api:
fake_aggregate = _get_fake_aggregate(db_id, in_api=False, result=False)
aggregate_obj._aggregate_create_in_db(context, fake_aggregate,
metadata=_get_fake_metadata(db_id))
for host in _get_fake_hosts(db_id):
aggregate_obj._host_add_to_db(context, fake_aggregate['id'], host)
else:
fake_aggregate = _get_fake_aggregate(db_id, in_api=False, result=False)
db.aggregate_create(context, fake_aggregate,
metadata=_get_fake_metadata(db_id))
for host in _get_fake_hosts(db_id):
db.aggregate_host_add(context, fake_aggregate['id'], host)
def compare_obj(test, result, source):
source['deleted'] = False
def updated_at_comparator(result, source):
return True
return base_compare(test, result, source, subs=SUBS,
comparators={'updated_at': updated_at_comparator})
class AggregateObjectCellTestCase(test.NoDBTestCase):
"""Tests for the case where all aggregate data is in Cell DB"""
USES_DB_SELF = True
def setUp(self):
super(AggregateObjectCellTestCase, self).setUp()
self.context = context.RequestContext('fake-user', 'fake-project')
self.useFixture(fixtures.Database())
self.useFixture(fixtures.Database(database='api'))
self._seed_data()
def _seed_data(self):
for i in range(1, 10):
create_aggregate(self.context, i, in_api=False)
def test_get_by_id(self):
for i in range(1, 10):
agg = aggregate_obj.Aggregate.get_by_id(self.context, i)
compare_obj(self, agg, _get_fake_aggregate(i))
def test_save(self):
for i in range(1, 10):
agg = aggregate_obj.Aggregate.get_by_id(self.context, i)
fake_agg = _get_fake_aggregate(i)
fake_agg['name'] = 'new-name' + str(i)
agg.name = 'new-name' + str(i)
agg.save()
result = aggregate_obj.Aggregate.get_by_id(self.context, i)
compare_obj(self, agg, fake_agg)
compare_obj(self, result, fake_agg)
def test_update_metadata(self):
for i in range(1, 10):
agg = aggregate_obj.Aggregate.get_by_id(self.context, i)
fake_agg = _get_fake_aggregate(i)
fake_agg['metadetails'] = {'constant_key': 'constant_value'}
agg.update_metadata({'unique_key': None})
agg.save()
result = aggregate_obj.Aggregate.get_by_id(self.context, i)
compare_obj(self, agg, fake_agg)
compare_obj(self, result, fake_agg)
def test_destroy(self):
for i in range(1, 10):
agg = aggregate_obj.Aggregate.get_by_id(self.context, i)
agg.destroy()
aggs = aggregate_obj.AggregateList.get_all(self.context)
self.assertEqual(len(aggs), 0)
def test_add_host(self):
for i in range(1, 10):
agg = aggregate_obj.Aggregate.get_by_id(self.context, i)
fake_agg = _get_fake_aggregate(i)
fake_agg['hosts'].append('barbar')
agg.add_host('barbar')
agg.save()
result = aggregate_obj.Aggregate.get_by_id(self.context, i)
compare_obj(self, agg, fake_agg)
compare_obj(self, result, fake_agg)
def test_delete_host(self):
for i in range(1, 10):
agg = aggregate_obj.Aggregate.get_by_id(self.context, i)
fake_agg = _get_fake_aggregate(i)
fake_agg['hosts'].remove('constant_host')
agg.delete_host('constant_host')
result = aggregate_obj.Aggregate.get_by_id(self.context, i)
compare_obj(self, agg, fake_agg)
compare_obj(self, result, fake_agg)
class AggregateObjectApiTestCase(AggregateObjectCellTestCase):
"""Tests the aggregate in the case where all data is in the API DB"""
def _seed_data(self):
for i in range(1, 10):
create_aggregate(self.context, i)
def test_create(self):
new_agg = aggregate_obj.Aggregate(self.context)
new_agg.name = 'new-aggregate'
new_agg.create()
result = aggregate_obj.Aggregate.get_by_id(self.context, new_agg.id)
self.assertEqual(new_agg.name, result.name)
class AggregateObjectMixedTestCase(AggregateObjectCellTestCase):
"""Tests the aggregate in the case where data is in both databases"""
def _seed_data(self):
for i in range(1, 6):
create_aggregate(self.context, i)
for i in range(6, 10):
create_aggregate(self.context, i, in_api=False)
def test_create(self):
new_agg = aggregate_obj.Aggregate(self.context)
new_agg.name = 'new-aggregate'
self.assertRaises(exception.ObjectActionError,
new_agg.create)

View File

@ -154,6 +154,17 @@ class AggregateTestCaseV21(test.NoDBTestCase):
{"name": "test", {"name": "test",
"availability_zone": "nova1"}}) "availability_zone": "nova1"}})
@mock.patch.object(compute_api.AggregateAPI, 'create_aggregate')
def test_create_with_unmigrated_aggregates(self, mock_create_aggregate):
mock_create_aggregate.side_effect = \
exception.ObjectActionError(action='create',
reason='main database still contains aggregates')
self.assertRaises(exc.HTTPConflict, self.controller.create,
self.req, body={"aggregate":
{"name": "test",
"availability_zone": "nova1"}})
def test_create_with_incorrect_availability_zone(self): def test_create_with_incorrect_availability_zone(self):
def stub_create_aggregate(context, name, availability_zone): def stub_create_aggregate(context, name, availability_zone):
raise exception.InvalidAggregateAction(action='create_aggregate', raise exception.InvalidAggregateAction(action='create_aggregate',

View File

@ -110,25 +110,30 @@ class _TestAggregateObject(object):
self.assertEqual(fake_aggregate['id'], agg.id) self.assertEqual(fake_aggregate['id'], agg.id)
self.assertFalse(get_by_uuid.called) self.assertFalse(get_by_uuid.called)
@mock.patch.object(db, 'aggregate_create') @mock.patch('nova.objects.aggregate._aggregate_create_in_db')
def test_create(self, mock_aggregate_create): @mock.patch('nova.db.aggregate_create')
mock_aggregate_create.return_value = fake_aggregate def test_create(self, create_mock, api_create_mock):
api_create_mock.return_value = fake_aggregate
agg = aggregate.Aggregate(context=self.context) agg = aggregate.Aggregate(context=self.context)
agg.name = 'foo' agg.name = 'foo'
agg.metadata = {'one': 'two'} agg.metadata = {'one': 'two'}
agg.uuid = uuidsentinel.fake_agg agg.uuid = uuidsentinel.fake_agg
agg.create() agg.create()
api_create_mock.assert_called_once_with(
self.context,
{'name': 'foo', 'uuid': uuidsentinel.fake_agg},
metadata={'one': 'two'})
self.assertFalse(create_mock.called)
self.compare_obj(agg, fake_aggregate, subs=SUBS) self.compare_obj(agg, fake_aggregate, subs=SUBS)
mock_aggregate_create.assert_called_once_with(self.context, api_create_mock.assert_called_once_with(self.context,
{'name': 'foo', 'uuid': uuidsentinel.fake_agg}, {'name': 'foo', 'uuid': uuidsentinel.fake_agg},
metadata={'one': 'two'}) metadata={'one': 'two'})
@mock.patch('nova.objects.aggregate._aggregate_create_in_db')
@mock.patch.object(db, 'aggregate_create') @mock.patch.object(db, 'aggregate_create')
def test_recreate_fails(self, mock_aggregate_create): def test_recreate_fails(self, create_mock, api_create_mock):
mock_aggregate_create.return_value = fake_aggregate api_create_mock.return_value = fake_aggregate
agg = aggregate.Aggregate(context=self.context) agg = aggregate.Aggregate(context=self.context)
agg.name = 'foo' agg.name = 'foo'
agg.metadata = {'one': 'two'} agg.metadata = {'one': 'two'}
@ -136,10 +141,30 @@ class _TestAggregateObject(object):
agg.create() agg.create()
self.assertRaises(exception.ObjectActionError, agg.create) self.assertRaises(exception.ObjectActionError, agg.create)
mock_aggregate_create.assert_called_once_with(self.context, api_create_mock.assert_called_once_with(self.context,
{'name': 'foo', 'uuid': uuidsentinel.fake_agg}, {'name': 'foo', 'uuid': uuidsentinel.fake_agg},
metadata={'one': 'two'}) metadata={'one': 'two'})
@mock.patch('nova.objects.aggregate._aggregate_delete_from_db')
@mock.patch('nova.db.aggregate_delete')
def test_destroy(self, delete_mock, api_delete_mock):
agg = aggregate.Aggregate(context=self.context)
agg.id = 123
agg.destroy()
self.assertFalse(delete_mock.called)
api_delete_mock.assert_called_with(self.context, 123)
@mock.patch('nova.objects.aggregate._aggregate_delete_from_db')
@mock.patch('nova.db.aggregate_delete')
def test_destroy_cell(self, delete_mock, api_delete_mock):
api_delete_mock.side_effect = exception.AggregateNotFound(
aggregate_id=123)
agg = aggregate.Aggregate(context=self.context)
agg.id = 123
agg.destroy()
delete_mock.assert_called_with(self.context, 123)
api_delete_mock.assert_called_with(self.context, 123)
@mock.patch('nova.objects.aggregate._aggregate_update_to_db') @mock.patch('nova.objects.aggregate._aggregate_update_to_db')
@mock.patch('nova.db.aggregate_update') @mock.patch('nova.db.aggregate_update')
def test_save(self, update_mock, api_update_mock): def test_save(self, update_mock, api_update_mock):
@ -259,14 +284,6 @@ class _TestAggregateObject(object):
123, 123,
{'toadd': 'myval'}) {'toadd': 'myval'})
@mock.patch.object(db, 'aggregate_delete')
def test_destroy(self, mock_aggregate_delete):
agg = aggregate.Aggregate(context=self.context)
agg.id = 123
agg.destroy()
mock_aggregate_delete.assert_called_once_with(self.context, 123)
@mock.patch.object(db, 'aggregate_host_add') @mock.patch.object(db, 'aggregate_host_add')
def test_add_host(self, mock_host_add): def test_add_host(self, mock_host_add):
mock_host_add.return_value = {'host': 'bar'} mock_host_add.return_value = {'host': 'bar'}

View File

@ -0,0 +1,14 @@
---
upgrade:
- Aggregates are being moved to the API database for CellsV2. In this
release, the online data migrations will move any aggregates you have
in your main database to the API database, retaining all
attributes. Until this is complete, new attempts to create aggregates
will return an HTTP 409 to avoid creating aggregates in one place that
may conflict with aggregates you already have and are yet to be
migrated.
- Note that aggregates can no longer be soft-deleted as the API
database does not replicate the legacy soft-delete functionality
from the main database. As such, deleted aggregates are not migrated
and the behavior users will experience will be the same as if a
purge of deleted records was performed.