Merge "Remove deprecated aggregate DB compatibility"

This commit is contained in:
Zuul 2018-02-19 22:43:12 +00:00 committed by Gerrit Code Review
commit 8883bd11e7
5 changed files with 59 additions and 884 deletions

View File

@@ -1790,91 +1790,6 @@ def s3_image_create(context, image_uuid):
####################
def aggregate_create(context, values, metadata=None):
"""Create a new aggregate with metadata."""
return IMPL.aggregate_create(context, values, metadata)
def aggregate_get(context, aggregate_id):
"""Get a specific aggregate by id."""
return IMPL.aggregate_get(context, aggregate_id)
def aggregate_get_by_host(context, host, key=None):
"""Get a list of aggregates that host belongs to."""
return IMPL.aggregate_get_by_host(context, host, key)
def aggregate_get_by_uuid(context, uuid):
"""Get a specific aggregate by uuid."""
return IMPL.aggregate_get_by_uuid(context, uuid)
def aggregate_metadata_get_by_host(context, host, key=None):
"""Get metadata for all aggregates that host belongs to.
Returns a dictionary where each value is a set, this is to cover the case
where there two aggregates have different values for the same key.
Optional key filter
"""
return IMPL.aggregate_metadata_get_by_host(context, host, key)
def aggregate_get_by_metadata_key(context, key):
    """Get aggregates that have the given metadata key.

    Delegates to the configured database backend (IMPL).
    """
    return IMPL.aggregate_get_by_metadata_key(context, key)
def aggregate_update(context, aggregate_id, values):
"""Update the attributes of an aggregates.
If values contains a metadata key, it updates the aggregate metadata too.
"""
return IMPL.aggregate_update(context, aggregate_id, values)
def aggregate_delete(context, aggregate_id):
"""Delete an aggregate."""
return IMPL.aggregate_delete(context, aggregate_id)
def aggregate_get_all(context):
"""Get all aggregates."""
return IMPL.aggregate_get_all(context)
def aggregate_metadata_add(context, aggregate_id, metadata, set_delete=False):
"""Add/update metadata. If set_delete=True, it adds only."""
IMPL.aggregate_metadata_add(context, aggregate_id, metadata, set_delete)
def aggregate_metadata_get(context, aggregate_id):
"""Get metadata for the specified aggregate."""
return IMPL.aggregate_metadata_get(context, aggregate_id)
def aggregate_metadata_delete(context, aggregate_id, key):
"""Delete the given metadata key."""
IMPL.aggregate_metadata_delete(context, aggregate_id, key)
def aggregate_host_add(context, aggregate_id, host):
"""Add host to the aggregate."""
IMPL.aggregate_host_add(context, aggregate_id, host)
def aggregate_host_get_all(context, aggregate_id):
"""Get hosts for the specified aggregate."""
return IMPL.aggregate_host_get_all(context, aggregate_id)
def aggregate_host_delete(context, aggregate_id, host):
"""Delete the given host from the aggregate."""
IMPL.aggregate_host_delete(context, aggregate_id, host)
####################
def instance_fault_create(context, values):
"""Create a new Instance Fault."""
return IMPL.instance_fault_create(context, values)

View File

@@ -30,7 +30,6 @@ from oslo_db.sqlalchemy import enginefacade
from oslo_db.sqlalchemy import update_match
from oslo_db.sqlalchemy import utils as sqlalchemyutils
from oslo_log import log as logging
from oslo_utils import excutils
from oslo_utils import importutils
from oslo_utils import timeutils
from oslo_utils import uuidutils
@@ -184,20 +183,6 @@ def require_instance_exists_using_uuid(f):
return wrapper
def require_aggregate_exists(f):
    """Decorator to require the specified aggregate to exist.

    Requires the wrapped function to use context and aggregate_id as
    their first two arguments.
    """
    @functools.wraps(f)
    def wrapper(context, aggregate_id, *args, **kwargs):
        # aggregate_get raises AggregateNotFound for an unknown id; the
        # returned aggregate itself is deliberately discarded.
        aggregate_get(context, aggregate_id)
        return f(context, aggregate_id, *args, **kwargs)
    return wrapper
def select_db_reader_mode(f):
"""Decorator to select synchronous or asynchronous reader mode.
@@ -5353,309 +5338,6 @@ def s3_image_create(context, image_uuid):
####################
def _aggregate_get_query(context, model_class, id_field=None, id=None,
                         read_deleted=None):
    """Build a query over model_class, optionally filtered on one column.

    Aggregate queries eagerly join the '_hosts' and '_metadata'
    relationships so they are not lazy-loaded later.

    :param id_field: model column to filter on (e.g. Aggregate.id);
        the filter is only applied when both id_field and id are truthy.
    """
    columns_to_join = {models.Aggregate: ['_hosts', '_metadata']}
    query = model_query(context, model_class, read_deleted=read_deleted)
    for c in columns_to_join.get(model_class, []):
        query = query.options(joinedload(c))
    if id and id_field:
        query = query.filter(id_field == id)
    return query
@pick_context_manager_writer
def aggregate_create(context, values, metadata=None):
    """Create a new aggregate, optionally with metadata.

    :param values: column values for the new aggregate; must include a
        unique 'name'.
    :param metadata: optional dict of metadata key/value pairs to add.
    :returns: the created Aggregate model object.
    :raises: AggregateNameExists if a non-deleted aggregate with the
        same name already exists.
    """
    # Look up by name among non-deleted rows to enforce name uniqueness.
    query = _aggregate_get_query(context,
                                 models.Aggregate,
                                 models.Aggregate.name,
                                 values['name'],
                                 read_deleted='no')
    aggregate = query.first()
    if not aggregate:
        aggregate = models.Aggregate()
        aggregate.update(values)
        aggregate.save(context.session)
        # We don't want these to be lazy loaded later. We know there is
        # nothing here since we just created this aggregate.
        aggregate._hosts = []
        aggregate._metadata = []
    else:
        raise exception.AggregateNameExists(aggregate_name=values['name'])
    if metadata:
        aggregate_metadata_add(context, aggregate.id, metadata)
        # NOTE(pkholkin): '_metadata' attribute was updated during
        # 'aggregate_metadata_add' method, so it should be expired and
        # read from db
        context.session.expire(aggregate, ['_metadata'])
        aggregate._metadata  # touch the attribute to force the reload now
    return aggregate
@pick_context_manager_reader
def aggregate_get(context, aggregate_id):
    """Return the aggregate with the given id, hosts/metadata joined.

    :raises: AggregateNotFound if no aggregate matches aggregate_id.
    """
    query = _aggregate_get_query(context,
                                 models.Aggregate,
                                 models.Aggregate.id,
                                 aggregate_id)
    aggregate = query.first()

    if not aggregate:
        raise exception.AggregateNotFound(aggregate_id=aggregate_id)

    return aggregate
@pick_context_manager_reader
def aggregate_get_by_uuid(context, uuid):
    """Return the aggregate with the given uuid, hosts/metadata joined.

    :raises: AggregateNotFound if no aggregate matches uuid (the uuid is
        reported in the exception's aggregate_id field).
    """
    query = _aggregate_get_query(context,
                                 models.Aggregate,
                                 models.Aggregate.uuid,
                                 uuid)
    aggregate = query.first()

    if not aggregate:
        raise exception.AggregateNotFound(aggregate_id=uuid)

    return aggregate
@pick_context_manager_reader
def aggregate_get_by_host(context, host, key=None):
    """Return rows that match host (mandatory) and metadata key (optional).

    :param host: matches host, and is required.
    :param key: matches metadata key, if not None.
    """
    query = model_query(context, models.Aggregate)
    query = query.options(joinedload('_hosts'))
    query = query.options(joinedload('_metadata'))
    # Inner-join on hosts so only aggregates containing 'host' remain.
    query = query.join('_hosts')
    query = query.filter(models.AggregateHost.host == host)

    if key:
        query = query.join("_metadata").filter(
            models.AggregateMetadata.key == key)
    return query.all()
@pick_context_manager_reader
def aggregate_metadata_get_by_host(context, host, key=None):
    """Get metadata for all aggregates that host belongs to.

    :param key: optional; restrict the result to this metadata key.
    :returns: dict mapping metadata key -> set of values. Values are
        sets because two aggregates may define different values for the
        same key.
    """
    query = model_query(context, models.Aggregate)
    query = query.join("_hosts")
    query = query.join("_metadata")
    query = query.filter(models.AggregateHost.host == host)
    # contains_eager reuses the joined _metadata rows for the relationship
    # instead of issuing a second query.
    query = query.options(contains_eager("_metadata"))

    if key:
        query = query.filter(models.AggregateMetadata.key == key)
    rows = query.all()

    metadata = collections.defaultdict(set)
    for agg in rows:
        for kv in agg._metadata:
            metadata[kv['key']].add(kv['value'])
    return dict(metadata)
@pick_context_manager_reader
def aggregate_get_by_metadata_key(context, key):
    """Return rows that match metadata key.

    :param key: matches metadata key.
    """
    query = model_query(context, models.Aggregate)
    # Inner-join on metadata so only aggregates having 'key' remain;
    # contains_eager populates _metadata from the same join.
    query = query.join("_metadata")
    query = query.filter(models.AggregateMetadata.key == key)
    query = query.options(contains_eager("_metadata"))
    query = query.options(joinedload("_hosts"))
    return query.all()
@pick_context_manager_writer
def aggregate_update(context, aggregate_id, values):
    """Update an aggregate's attributes and, optionally, its metadata.

    An 'availability_zone' entry in values is stored as the
    'availability_zone' metadata key rather than as a column.

    :raises: AggregateNotFound if aggregate_id does not exist.
    :raises: AggregateNameExists if values renames the aggregate to a
        name already used by a different aggregate.
    """
    if "name" in values:
        aggregate_by_name = (_aggregate_get_query(context,
                                                  models.Aggregate,
                                                  models.Aggregate.name,
                                                  values['name'],
                                                  read_deleted='no').first())
        if aggregate_by_name and aggregate_by_name.id != aggregate_id:
            # there is another aggregate with the new name
            raise exception.AggregateNameExists(aggregate_name=values['name'])

    aggregate = (_aggregate_get_query(context,
                                      models.Aggregate,
                                      models.Aggregate.id,
                                      aggregate_id).first())

    set_delete = True
    if aggregate:
        if "availability_zone" in values:
            az = values.pop('availability_zone')
            if 'metadata' not in values:
                # Only the AZ key is being updated; keep the aggregate's
                # other metadata keys intact.
                values['metadata'] = {'availability_zone': az}
                set_delete = False
            else:
                values['metadata']['availability_zone'] = az
        metadata = values.get('metadata')
        if metadata is not None:
            # set_delete=True makes aggregate_metadata_add treat the new
            # dict as the full metadata set, removing absent keys.
            aggregate_metadata_add(context,
                                   aggregate_id,
                                   values.pop('metadata'),
                                   set_delete=set_delete)

        aggregate.update(values)
        aggregate.save(context.session)
        # Re-read so the hosts/metadata relationships are freshly loaded.
        return aggregate_get(context, aggregate.id)
    else:
        raise exception.AggregateNotFound(aggregate_id=aggregate_id)
@pick_context_manager_writer
def aggregate_delete(context, aggregate_id):
    """Soft-delete the aggregate and its metadata rows.

    :raises: AggregateNotFound if aggregate_id does not exist.
    """
    count = _aggregate_get_query(context,
                                 models.Aggregate,
                                 models.Aggregate.id,
                                 aggregate_id).\
        soft_delete()
    if count == 0:
        raise exception.AggregateNotFound(aggregate_id=aggregate_id)

    # Delete Metadata
    model_query(context, models.AggregateMetadata).\
        filter_by(aggregate_id=aggregate_id).\
        soft_delete()
@pick_context_manager_reader
def aggregate_get_all(context):
    """Return every aggregate, with hosts and metadata eagerly joined."""
    all_aggregates_query = _aggregate_get_query(context, models.Aggregate)
    return all_aggregates_query.all()
def _aggregate_metadata_get_query(context, aggregate_id, read_deleted="yes"):
    """Build a query for the AggregateMetadata rows of one aggregate.

    Soft-deleted rows are included by default (read_deleted="yes").
    """
    metadata_query = model_query(
        context, models.AggregateMetadata, read_deleted=read_deleted)
    return metadata_query.filter_by(aggregate_id=aggregate_id)
@require_aggregate_exists
@pick_context_manager_reader
def aggregate_metadata_get(context, aggregate_id):
    """Return the aggregate's metadata as a {key: value} dict.

    :raises: AggregateNotFound (via require_aggregate_exists) if
        aggregate_id does not exist.
    """
    rows = model_query(context,
                       models.AggregateMetadata).\
        filter_by(aggregate_id=aggregate_id).all()

    return {r['key']: r['value'] for r in rows}
@require_aggregate_exists
@pick_context_manager_writer
def aggregate_metadata_delete(context, aggregate_id, key):
    """Soft-delete one metadata key from the aggregate.

    :raises: AggregateNotFound (via require_aggregate_exists) if
        aggregate_id does not exist.
    :raises: AggregateMetadataNotFound if the key is not set on the
        aggregate.
    """
    count = _aggregate_get_query(context,
                                 models.AggregateMetadata,
                                 models.AggregateMetadata.aggregate_id,
                                 aggregate_id).\
        filter_by(key=key).\
        soft_delete()
    if count == 0:
        raise exception.AggregateMetadataNotFound(aggregate_id=aggregate_id,
                                                  metadata_key=key)
@require_aggregate_exists
@pick_context_manager_writer
def aggregate_metadata_add(context, aggregate_id, metadata, set_delete=False,
                           max_retries=10):
    """Add or update metadata on an aggregate, retrying on races.

    :param metadata: dict of key/value pairs to set.
    :param set_delete: if True, soft-delete existing keys not present in
        metadata, i.e. replace the aggregate's full metadata set.
    :param max_retries: attempts to make when a concurrent transaction
        inserts the same key (DBDuplicateEntry).
    :returns: the metadata dict that was passed in.
    """
    all_keys = metadata.keys()
    for attempt in range(max_retries):
        try:
            query = _aggregate_metadata_get_query(context, aggregate_id,
                                                  read_deleted='no')
            if set_delete:
                query.filter(~models.AggregateMetadata.key.in_(all_keys)).\
                    soft_delete(synchronize_session=False)

            already_existing_keys = set()
            if all_keys:
                # Update rows whose keys already exist in place.
                query = query.filter(
                    models.AggregateMetadata.key.in_(all_keys))
                for meta_ref in query.all():
                    key = meta_ref.key
                    meta_ref.update({"value": metadata[key]})
                    already_existing_keys.add(key)

            new_entries = []
            for key, value in metadata.items():
                if key in already_existing_keys:
                    continue
                new_entries.append({"key": key,
                                    "value": value,
                                    "aggregate_id": aggregate_id})
            if new_entries:
                # Bulk-insert all new rows with a single statement.
                context.session.execute(
                    models.AggregateMetadata.__table__.insert(),
                    new_entries)

            return metadata
        except db_exc.DBDuplicateEntry:
            # a concurrent transaction has been committed,
            # try again unless this was the last attempt
            with excutils.save_and_reraise_exception() as ctxt:
                if attempt < max_retries - 1:
                    ctxt.reraise = False
                else:
                    LOG.warning("Add metadata failed for aggregate %(id)s "
                                "after %(retries)s retries",
                                {"id": aggregate_id, "retries": max_retries})
@require_aggregate_exists
@pick_context_manager_reader
def aggregate_host_get_all(context, aggregate_id):
    """Return the list of host names belonging to the aggregate.

    Raises AggregateNotFound (via require_aggregate_exists) when the
    aggregate id is unknown.
    """
    host_rows = model_query(context, models.AggregateHost).filter_by(
        aggregate_id=aggregate_id).all()
    return [host_row.host for host_row in host_rows]
@require_aggregate_exists
@pick_context_manager_writer
def aggregate_host_delete(context, aggregate_id, host):
    """Soft-delete one host from the aggregate.

    :raises: AggregateNotFound (via require_aggregate_exists) if
        aggregate_id does not exist.
    :raises: AggregateHostNotFound if host is not in the aggregate.
    """
    count = _aggregate_get_query(context,
                                 models.AggregateHost,
                                 models.AggregateHost.aggregate_id,
                                 aggregate_id).\
        filter_by(host=host).\
        soft_delete()
    if count == 0:
        raise exception.AggregateHostNotFound(aggregate_id=aggregate_id,
                                              host=host)
@require_aggregate_exists
@pick_context_manager_writer
def aggregate_host_add(context, aggregate_id, host):
    """Add a host to the aggregate.

    :returns: the created AggregateHost model object.
    :raises: AggregateNotFound (via require_aggregate_exists) if
        aggregate_id does not exist.
    :raises: AggregateHostExists if the host is already in the
        aggregate (duplicate-entry error from the DB).
    """
    host_ref = models.AggregateHost()
    host_ref.update({"host": host, "aggregate_id": aggregate_id})
    try:
        host_ref.save(context.session)
    except db_exc.DBDuplicateEntry:
        raise exception.AggregateHostExists(host=host,
                                            aggregate_id=aggregate_id)
    return host_ref
################
@pick_context_manager_writer
def instance_fault_create(context, values):
"""Create a new InstanceFault."""

View File

@@ -16,17 +16,12 @@ from oslo_db import exception as db_exc
from oslo_log import log as logging
from oslo_utils import excutils
from oslo_utils import uuidutils
import six
from sqlalchemy.orm import contains_eager
from sqlalchemy.orm import joinedload
from sqlalchemy.sql import func
from sqlalchemy.sql import text
from nova.compute import utils as compute_utils
from nova import db
from nova.db.sqlalchemy import api as db_api
from nova.db.sqlalchemy import api_models
from nova.db.sqlalchemy import models as main_models
from nova import exception
from nova.i18n import _
from nova import objects
@ -249,10 +244,6 @@ class Aggregate(base.NovaPersistentObject, base.NovaObject):
obj_extra_fields = ['availability_zone']
def __init__(self, *args, **kwargs):
super(Aggregate, self).__init__(*args, **kwargs)
self._in_api = False
@staticmethod
def _from_db_object(context, aggregate, db_aggregate):
for key in aggregate.fields:
@ -264,11 +255,9 @@ class Aggregate(base.NovaPersistentObject, base.NovaObject):
db_key = key
setattr(aggregate, key, db_aggregate[db_key])
# NOTE: This can be removed when we remove compatibility with
# the old aggregate model.
if any(f not in db_aggregate for f in DEPRECATED_FIELDS):
aggregate.deleted_at = None
aggregate.deleted = False
# NOTE: This can be removed when we bump Aggregate to v2.0
aggregate.deleted_at = None
aggregate.deleted = False
aggregate._context = context
aggregate.obj_reset_changes()
@ -281,60 +270,23 @@ class Aggregate(base.NovaPersistentObject, base.NovaObject):
action=action,
reason='hosts updated inline')
@property
def in_api(self):
if self._in_api:
return True
else:
try:
_aggregate_get_from_db(self._context, self.id)
self._in_api = True
except exception.AggregateNotFound:
pass
return self._in_api
@base.remotable_classmethod
def get_by_id(cls, context, aggregate_id):
try:
db_aggregate = _aggregate_get_from_db(context, aggregate_id)
except exception.AggregateNotFound:
db_aggregate = db.aggregate_get(context, aggregate_id)
db_aggregate = _aggregate_get_from_db(context, aggregate_id)
return cls._from_db_object(context, cls(), db_aggregate)
@base.remotable_classmethod
def get_by_uuid(cls, context, aggregate_uuid):
try:
db_aggregate = _aggregate_get_from_db_by_uuid(context,
aggregate_uuid)
except exception.AggregateNotFound:
db_aggregate = db.aggregate_get_by_uuid(context, aggregate_uuid)
db_aggregate = _aggregate_get_from_db_by_uuid(context,
aggregate_uuid)
return cls._from_db_object(context, cls(), db_aggregate)
@staticmethod
@db_api.pick_context_manager_reader
def _ensure_migrated(context):
result = context.session.query(main_models.Aggregate).\
filter_by(deleted=0).count()
if result:
LOG.warning(
'Main database contains %(count)i unmigrated aggregates',
{'count': result})
return result == 0
@base.remotable
def create(self):
if self.obj_attr_is_set('id'):
raise exception.ObjectActionError(action='create',
reason='already created')
# NOTE(mdoff): Once we have made it past a point where we know
# all aggregates have been migrated, we can remove this. Ideally
# in Ocata with a blocker migration to be sure.
if not self._ensure_migrated(self._context):
raise exception.ObjectActionError(
action='create',
reason='main database still contains aggregates')
self._assert_no_hosts('create')
updates = self.obj_get_changes()
payload = dict(updates)
@ -381,12 +333,8 @@ class Aggregate(base.NovaPersistentObject, base.NovaObject):
"updateprop.start",
payload)
updates.pop('id', None)
try:
db_aggregate = _aggregate_update_to_db(self._context,
self.id, updates)
except exception.AggregateNotFound:
db_aggregate = db.aggregate_update(self._context, self.id, updates)
db_aggregate = _aggregate_update_to_db(self._context,
self.id, updates)
compute_utils.notify_about_aggregate_update(self._context,
"updateprop.end",
payload)
@ -394,13 +342,6 @@ class Aggregate(base.NovaPersistentObject, base.NovaObject):
@base.remotable
def update_metadata(self, updates):
if self.in_api:
metadata_delete = _metadata_delete_from_db
metadata_add = _metadata_add_to_db
else:
metadata_delete = db.aggregate_metadata_delete
metadata_add = db.aggregate_metadata_add
payload = {'aggregate_id': self.id,
'meta_data': updates}
compute_utils.notify_about_aggregate_update(self._context,
@ -410,7 +351,7 @@ class Aggregate(base.NovaPersistentObject, base.NovaObject):
for key, value in updates.items():
if value is None:
try:
metadata_delete(self._context, self.id, key)
_metadata_delete_from_db(self._context, self.id, key)
except exception.AggregateMetadataNotFound:
pass
try:
@ -420,7 +361,7 @@ class Aggregate(base.NovaPersistentObject, base.NovaObject):
else:
to_add[key] = value
self.metadata[key] = value
metadata_add(self._context, self.id, to_add)
_metadata_add_to_db(self._context, self.id, to_add)
compute_utils.notify_about_aggregate_update(self._context,
"updatemetadata.end",
payload)
@ -428,17 +369,11 @@ class Aggregate(base.NovaPersistentObject, base.NovaObject):
@base.remotable
def destroy(self):
try:
_aggregate_delete_from_db(self._context, self.id)
except exception.AggregateNotFound:
db.aggregate_delete(self._context, self.id)
_aggregate_delete_from_db(self._context, self.id)
@base.remotable
def add_host(self, host):
if self.in_api:
_host_add_to_db(self._context, self.id, host)
else:
db.aggregate_host_add(self._context, self.id, host)
_host_add_to_db(self._context, self.id, host)
if self.hosts is None:
self.hosts = []
@ -447,10 +382,7 @@ class Aggregate(base.NovaPersistentObject, base.NovaObject):
@base.remotable
def delete_host(self, host):
if self.in_api:
_host_delete_from_db(self._context, self.id, host)
else:
db.aggregate_host_delete(self._context, self.id, host)
_host_delete_from_db(self._context, self.id, host)
self.hosts.remove(host)
self.obj_reset_changes(fields=['hosts'])
@ -507,14 +439,6 @@ class AggregateList(base.ObjectListBase, base.NovaObject):
'objects': fields.ListOfObjectsField('Aggregate'),
}
# NOTE(mdoff): Calls to this can be removed when we remove
# compatibility with the old aggregate model.
@staticmethod
def _fill_deprecated(db_aggregate):
db_aggregate['deleted_at'] = None
db_aggregate['deleted'] = False
return db_aggregate
@classmethod
def _filter_db_aggregates(cls, db_aggregates, hosts):
if not isinstance(hosts, set):
@ -529,89 +453,20 @@ class AggregateList(base.ObjectListBase, base.NovaObject):
@base.remotable_classmethod
def get_all(cls, context):
api_db_aggregates = [cls._fill_deprecated(agg) for agg in
_get_all_from_db(context)]
db_aggregates = db.aggregate_get_all(context)
db_aggregates = _get_all_from_db(context)
return base.obj_make_list(context, cls(context), objects.Aggregate,
db_aggregates + api_db_aggregates)
db_aggregates)
@base.remotable_classmethod
def get_by_host(cls, context, host, key=None):
api_db_aggregates = [cls._fill_deprecated(agg) for agg in
_get_by_host_from_db(context, host, key=key)]
db_aggregates = db.aggregate_get_by_host(context, host, key=key)
db_aggregates = _get_by_host_from_db(context, host, key=key)
return base.obj_make_list(context, cls(context), objects.Aggregate,
db_aggregates + api_db_aggregates)
db_aggregates)
@base.remotable_classmethod
def get_by_metadata_key(cls, context, key, hosts=None):
api_db_aggregates = [cls._fill_deprecated(agg) for agg in
_get_by_metadata_key_from_db(context, key=key)]
db_aggregates = db.aggregate_get_by_metadata_key(context, key=key)
all_aggregates = db_aggregates + api_db_aggregates
db_aggregates = _get_by_metadata_key_from_db(context, key=key)
if hosts is not None:
all_aggregates = cls._filter_db_aggregates(all_aggregates, hosts)
db_aggregates = cls._filter_db_aggregates(db_aggregates, hosts)
return base.obj_make_list(context, cls(context), objects.Aggregate,
all_aggregates)
@db_api.pick_context_manager_reader
def _get_main_db_aggregate_ids(context, limit):
from nova.db.sqlalchemy import models
return [x[0] for x in context.session.query(models.Aggregate.id).
filter_by(deleted=0).
limit(limit)]
def migrate_aggregates(ctxt, count):
    """Move up to count aggregates from the main DB to the API DB.

    :param count: maximum number of main-DB aggregates to process.
    :returns: tuple (number found in the main DB, number migrated).
    """
    main_db_ids = _get_main_db_aggregate_ids(ctxt, count)
    if not main_db_ids:
        return 0, 0

    count_all = len(main_db_ids)
    count_hit = 0

    for aggregate_id in main_db_ids:
        try:
            aggregate = Aggregate.get_by_id(ctxt, aggregate_id)
            # metadata and hosts are copied separately below, not as
            # plain column values.
            remove = ['metadata', 'hosts']
            values = {field: getattr(aggregate, field)
                      for field in aggregate.fields if field not in remove}
            _aggregate_create_in_db(ctxt, values, metadata=aggregate.metadata)
            for host in aggregate.hosts:
                _host_add_to_db(ctxt, aggregate_id, host)
            count_hit += 1
            # Only remove the main-DB copy once the API-DB copy and its
            # hosts were created successfully.
            db.aggregate_delete(ctxt, aggregate.id)
        except exception.AggregateNotFound:
            LOG.warning(
                'Aggregate id %(id)i disappeared during migration',
                {'id': aggregate_id})
        except (exception.AggregateNameExists) as e:
            # Duplicate in the API DB: log and leave the main-DB row,
            # counting it as found but not migrated.
            LOG.error(six.text_type(e))

    return count_all, count_hit
def _adjust_autoincrement(context, value):
    """Restart the PostgreSQL aggregates id sequence at value.

    No-op on engines other than postgresql.
    """
    engine = db_api.get_api_engine()
    if engine.name == 'postgresql':
        # NOTE(danms): If we migrated some aggregates in the above function,
        # then we will have confused postgres' sequence for the autoincrement
        # primary key. MySQL does not care about this, but since postgres does,
        # we need to reset this to avoid a failure on the next aggregate
        # creation.
        engine.execute(
            text('ALTER SEQUENCE aggregates_id_seq RESTART WITH %i;' % (
                value)))
@db_api.api_context_manager.reader
def _get_max_aggregate_id(context):
    """Return the highest aggregate id in the API DB, or None if empty."""
    return context.session.query(func.max(api_models.Aggregate.id)).one()[0]
def migrate_aggregate_reset_autoincrement(ctxt, count):
    """Online data migration: realign the API-DB aggregate id sequence.

    :param count: unused; present for the online-migration signature.
    :returns: (0, 0) — this migration has no per-row work to report.
    """
    # 'or 0' covers an empty API DB, where the max id is None.
    max_id = _get_max_aggregate_id(ctxt) or 0
    _adjust_autoincrement(ctxt, max_id + 1)
    return 0, 0
db_aggregates)

View File

@@ -17,7 +17,6 @@ from oslo_db import exception as db_exc
from oslo_utils import timeutils
from nova import context
from nova import db
from nova.db.sqlalchemy import api as db_api
from nova.db.sqlalchemy import api_models
from nova import exception
@ -115,19 +114,6 @@ class AggregateObjectDbTestCase(test.TestCase):
super(AggregateObjectDbTestCase, self).setUp()
self.context = context.RequestContext('fake-user', 'fake-project')
def test_in_api(self):
ca1 = _create_aggregate(self.context, values={'name': 'fake_agg_1',
'id': 1, 'uuid': uuidsentinel.agg})
ca2 = db.aggregate_create(self.context, {'name': 'fake_agg_2', 'id': 2,
'uuid': uuidsentinel.agg})
api_db_agg = aggregate_obj.Aggregate.get_by_id(self.context, ca1['id'])
cell_db_agg = aggregate_obj.Aggregate.get_by_id(
self.context, ca2['id'])
self.assertTrue(api_db_agg.in_api)
self.assertFalse(cell_db_agg.in_api)
def test_aggregate_get_from_db(self):
result = _create_aggregate_with_hosts(self.context)
expected = aggregate_obj._aggregate_get_from_db(self.context,
@ -463,19 +449,12 @@ class AggregateObjectDbTestCase(test.TestCase):
self.context, result['id'], 'foo_key')
def create_aggregate(context, db_id, in_api=True):
if in_api:
fake_aggregate = _get_fake_aggregate(db_id, in_api=False, result=False)
aggregate_obj._aggregate_create_in_db(context, fake_aggregate,
metadata=_get_fake_metadata(db_id))
for host in _get_fake_hosts(db_id):
aggregate_obj._host_add_to_db(context, fake_aggregate['id'], host)
else:
fake_aggregate = _get_fake_aggregate(db_id, in_api=False, result=False)
db.aggregate_create(context, fake_aggregate,
metadata=_get_fake_metadata(db_id))
for host in _get_fake_hosts(db_id):
db.aggregate_host_add(context, fake_aggregate['id'], host)
def create_aggregate(context, db_id):
fake_aggregate = _get_fake_aggregate(db_id, in_api=False, result=False)
aggregate_obj._aggregate_create_in_db(context, fake_aggregate,
metadata=_get_fake_metadata(db_id))
for host in _get_fake_hosts(db_id):
aggregate_obj._host_add_to_db(context, fake_aggregate['id'], host)
def compare_obj(test, result, source):
@ -488,17 +467,22 @@ def compare_obj(test, result, source):
comparators={'updated_at': updated_at_comparator})
class AggregateObjectCellTestCase(test.TestCase):
"""Tests for the case where all aggregate data is in Cell DB"""
class AggregateObjectTestCase(test.TestCase):
def setUp(self):
super(AggregateObjectCellTestCase, self).setUp()
super(AggregateObjectTestCase, self).setUp()
self.context = context.RequestContext('fake-user', 'fake-project')
self._seed_data()
def _seed_data(self):
for i in range(1, 10):
create_aggregate(self.context, i, in_api=False)
create_aggregate(self.context, i)
def test_create(self):
new_agg = aggregate_obj.Aggregate(self.context)
new_agg.name = 'new-aggregate'
new_agg.create()
result = aggregate_obj.Aggregate.get_by_id(self.context, new_agg.id)
self.assertEqual(new_agg.name, result.name)
def test_get_by_id(self):
for i in range(1, 10):
@ -554,106 +538,3 @@ class AggregateObjectCellTestCase(test.TestCase):
result = aggregate_obj.Aggregate.get_by_id(self.context, i)
compare_obj(self, agg, fake_agg)
compare_obj(self, result, fake_agg)
class AggregateObjectApiTestCase(AggregateObjectCellTestCase):
"""Tests the aggregate in the case where all data is in the API DB"""
def _seed_data(self):
for i in range(1, 10):
create_aggregate(self.context, i)
def test_create(self):
new_agg = aggregate_obj.Aggregate(self.context)
new_agg.name = 'new-aggregate'
new_agg.create()
result = aggregate_obj.Aggregate.get_by_id(self.context, new_agg.id)
self.assertEqual(new_agg.name, result.name)
class AggregateObjectMixedTestCase(AggregateObjectCellTestCase):
"""Tests the aggregate in the case where data is in both databases"""
def _seed_data(self):
for i in range(1, 6):
create_aggregate(self.context, i)
for i in range(6, 10):
create_aggregate(self.context, i, in_api=False)
def test_create(self):
new_agg = aggregate_obj.Aggregate(self.context)
new_agg.name = 'new-aggregate'
self.assertRaises(exception.ObjectActionError,
new_agg.create)
class AggregateObjectMigrationTestCase(AggregateObjectCellTestCase):
"""Tests the aggregate in the case where data is migrated to the API db"""
def _seed_data(self):
for i in range(1, 10):
create_aggregate(self.context, i, in_api=False)
aggregate_obj.migrate_aggregates(self.context, 50)
def test_create(self):
new_agg = aggregate_obj.Aggregate(self.context)
new_agg.name = 'new-aggregate'
new_agg.create()
result = aggregate_obj.Aggregate.get_by_id(self.context, new_agg.id)
self.assertEqual(new_agg.name, result.name)
class AggregateMigrationTestCase(test.TestCase):
def setUp(self):
super(AggregateMigrationTestCase, self).setUp()
self.context = context.get_admin_context()
def test_migration(self):
db.aggregate_create(self.context, {'name': 'foo',
'uuid': uuidsentinel.agg_uuid})
main_aggregates_len = len(db.aggregate_get_all(self.context))
match, done = aggregate_obj.migrate_aggregates(self.context, 50)
self.assertEqual(1, main_aggregates_len)
self.assertEqual(main_aggregates_len, match)
self.assertEqual(main_aggregates_len, done)
self.assertEqual(0, len(db.aggregate_get_all(self.context)))
self.assertEqual(main_aggregates_len,
len(aggregate_obj.AggregateList.get_all(
self.context)))
def test_migrate_aggregate_reset_autoincrement(self):
agg = aggregate_obj.Aggregate(self.context, name='foo')
agg.create()
match, done = aggregate_obj.migrate_aggregate_reset_autoincrement(
self.context, 0)
self.assertEqual(0, match)
self.assertEqual(0, done)
def test_migrate_aggregate_reset_autoincrement_no_aggregates(self):
# NOTE(danms): This validates the "or 0" default if there are no
# aggregates (and thus no max id).
match, done = aggregate_obj.migrate_aggregate_reset_autoincrement(
self.context, 0)
self.assertEqual(0, match)
self.assertEqual(0, done)
@mock.patch('nova.objects.aggregate.LOG.error')
def test_migrate_aggregates_duplicate_unicode(self, mock_log_error):
"""Tests that we handle a duplicate aggregate when migrating and that
we handle when the exception message is in unicode.
"""
# First create an aggregate that will be migrated from main to API DB.
create_aggregate(self.context, 1, in_api=False)
# Now create that same aggregate in the API DB.
create_aggregate(self.context, 1, in_api=True)
# Now let's run the online data migration which will fail to create
# a duplicate aggregate in the API database and will raise
# AggregateNameExists which we want to modify to have a unicode
# message.
with mock.patch.object(exception.AggregateNameExists, 'msg_fmt',
u'\xF0\x9F\x92\xA9'):
match, done = aggregate_obj.migrate_aggregates(self.context, 50)
# we found one
self.assertEqual(1, match)
# but we didn't migrate it
self.assertEqual(0, done)
# and we logged an error for the duplicate aggregate
mock_log_error.assert_called()

View File

@@ -15,7 +15,6 @@
import mock
from oslo_utils import timeutils
from nova import db
from nova import exception
from nova.objects import aggregate
from nova.tests.unit import fake_notifier
@ -25,18 +24,8 @@ from nova.tests import uuidsentinel
NOW = timeutils.utcnow().replace(microsecond=0)
fake_aggregate = {
'created_at': NOW,
'updated_at': None,
'deleted': 0,
'deleted_at': None,
'deleted': False,
'id': 123,
'uuid': uuidsentinel.fake_aggregate,
'name': 'fake-aggregate',
'hosts': ['foo', 'bar'],
'metadetails': {'this': 'that'},
}
fake_api_aggregate = {
'created_at': NOW,
'updated_at': None,
'id': 123,
@ -52,53 +41,24 @@ SUBS = {'metadata': 'metadetails'}
class _TestAggregateObject(object):
@mock.patch('nova.objects.aggregate._aggregate_get_from_db')
@mock.patch('nova.db.aggregate_get')
def test_get_by_id_from_api(self, mock_get, mock_get_api):
mock_get_api.return_value = fake_api_aggregate
def test_get_by_id_from_api(self, mock_get_api):
mock_get_api.return_value = fake_aggregate
agg = aggregate.Aggregate.get_by_id(self.context, 123)
self.compare_obj(agg, fake_aggregate, subs=SUBS)
mock_get_api.assert_called_once_with(self.context, 123)
self.assertFalse(mock_get.called)
@mock.patch('nova.objects.aggregate._aggregate_get_from_db')
@mock.patch('nova.db.aggregate_get')
def test_get_by_id(self, mock_get, mock_get_api):
mock_get_api.side_effect = exception.AggregateNotFound(
aggregate_id=123)
mock_get.return_value = fake_aggregate
agg = aggregate.Aggregate.get_by_id(self.context, 123)
self.compare_obj(agg, fake_aggregate, subs=SUBS)
mock_get_api.assert_called_once_with(self.context, 123)
mock_get.assert_called_once_with(self.context, 123)
@mock.patch('nova.objects.aggregate._aggregate_get_from_db_by_uuid')
@mock.patch('nova.db.aggregate_get_by_uuid')
def test_get_by_uuid(self, get_by_uuid, get_by_uuid_api):
get_by_uuid_api.side_effect = exception.AggregateNotFound(
aggregate_id=123)
get_by_uuid.return_value = fake_aggregate
agg = aggregate.Aggregate.get_by_uuid(self.context,
uuidsentinel.fake_aggregate)
self.assertEqual(uuidsentinel.fake_aggregate, agg.uuid)
self.assertEqual(fake_aggregate['id'], agg.id)
@mock.patch('nova.objects.aggregate._aggregate_get_from_db_by_uuid')
@mock.patch('nova.db.aggregate_get_by_uuid')
def test_get_by_uuid_from_api(self, get_by_uuid, get_by_uuid_api):
def test_get_by_uuid_from_api(self, get_by_uuid_api):
get_by_uuid_api.return_value = fake_aggregate
agg = aggregate.Aggregate.get_by_uuid(self.context,
uuidsentinel.fake_aggregate)
self.assertEqual(uuidsentinel.fake_aggregate, agg.uuid)
self.assertEqual(fake_aggregate['id'], agg.id)
self.assertFalse(get_by_uuid.called)
@mock.patch('nova.objects.aggregate._aggregate_create_in_db')
@mock.patch('nova.db.aggregate_create')
def test_create(self, create_mock, api_create_mock):
def test_create(self, api_create_mock):
api_create_mock.return_value = fake_aggregate
agg = aggregate.Aggregate(context=self.context)
agg.name = 'foo'
@ -109,7 +69,6 @@ class _TestAggregateObject(object):
self.context,
{'name': 'foo', 'uuid': uuidsentinel.fake_agg},
metadata={'one': 'two'})
self.assertFalse(create_mock.called)
self.compare_obj(agg, fake_aggregate, subs=SUBS)
api_create_mock.assert_called_once_with(self.context,
@ -117,8 +76,7 @@ class _TestAggregateObject(object):
metadata={'one': 'two'})
@mock.patch('nova.objects.aggregate._aggregate_create_in_db')
@mock.patch.object(db, 'aggregate_create')
def test_recreate_fails(self, create_mock, api_create_mock):
def test_recreate_fails(self, api_create_mock):
api_create_mock.return_value = fake_aggregate
agg = aggregate.Aggregate(context=self.context)
agg.name = 'foo'
@ -132,44 +90,14 @@ class _TestAggregateObject(object):
metadata={'one': 'two'})
@mock.patch('nova.objects.aggregate._aggregate_delete_from_db')
def test_destroy(self, api_delete_mock):
    """destroy() deletes the aggregate through the API DB."""
    agg = aggregate.Aggregate(context=self.context)
    agg.id = 123
    agg.destroy()
    api_delete_mock.assert_called_with(self.context, 123)
@mock.patch('nova.objects.aggregate._aggregate_delete_from_db')
@mock.patch('nova.db.aggregate_delete')
def test_destroy_cell(self, delete_mock, api_delete_mock):
    """destroy() falls back to the cell DB when the API DB misses."""
    api_delete_mock.side_effect = exception.AggregateNotFound(
        aggregate_id=123)
    target = aggregate.Aggregate(context=self.context)
    target.id = 123
    target.destroy()
    # Both paths are attempted: API DB first, then the cell DB.
    api_delete_mock.assert_called_with(self.context, 123)
    delete_mock.assert_called_with(self.context, 123)
@mock.patch('nova.objects.aggregate._aggregate_update_to_db')
@mock.patch('nova.db.aggregate_update')
def test_save_to_cell(self, update_mock, api_update_mock):
    """save() updates the cell DB when the API DB raises NotFound."""
    api_update_mock.side_effect = exception.AggregateNotFound(
        aggregate_id='foo')
    update_mock.return_value = fake_aggregate
    obj = aggregate.Aggregate(context=self.context)
    obj.id = 123
    obj.name = 'fake-aggregate'
    obj.save()
    # The API DB was tried first, then the change landed in the cell DB.
    self.assertTrue(api_update_mock.called)
    update_mock.assert_called_once_with(
        self.context, 123, {'name': 'fake-aggregate'})
    self.compare_obj(obj, fake_aggregate, subs=SUBS)
@mock.patch('nova.objects.aggregate._aggregate_update_to_db')
@mock.patch('nova.db.aggregate_update')
def test_save_to_api(self, update_mock, api_update_mock):
def test_save_to_api(self, api_update_mock):
api_update_mock.return_value = fake_aggregate
agg = aggregate.Aggregate(context=self.context)
agg.id = 123
@ -179,7 +107,6 @@ class _TestAggregateObject(object):
api_update_mock.assert_called_once_with(self.context,
123,
{'name': 'fake-api-aggregate'})
self.assertFalse(update_mock.called)
api_update_mock.assert_called_once_with(self.context,
123, {'name': 'fake-api-aggregate'})
@ -195,49 +122,8 @@ class _TestAggregateObject(object):
@mock.patch('nova.objects.aggregate._metadata_delete_from_db')
@mock.patch('nova.objects.aggregate._metadata_add_to_db')
@mock.patch('nova.db.aggregate_metadata_delete')
@mock.patch('nova.db.aggregate_metadata_add')
def test_update_metadata(self,
                         mock_metadata_add,
                         mock_metadata_delete,
                         mock_api_metadata_add,
                         mock_api_metadata_delete):
    """Metadata updates on a cell aggregate touch only the cell DB."""
    fake_notifier.NOTIFICATIONS = []
    agg = aggregate.Aggregate()
    agg._context = self.context
    agg.id = 123
    agg.metadata = {'foo': 'bar'}
    agg.obj_reset_changes()
    agg.update_metadata({'todelete': None, 'toadd': 'myval'})
    # One start/end notification pair is emitted, both carrying the delta.
    self.assertEqual(2, len(fake_notifier.NOTIFICATIONS))
    start, end = fake_notifier.NOTIFICATIONS
    self.assertEqual('aggregate.updatemetadata.start', start.event_type)
    self.assertEqual({'todelete': None, 'toadd': 'myval'},
                     start.payload['meta_data'])
    self.assertEqual('aggregate.updatemetadata.end', end.event_type)
    self.assertEqual({'todelete': None, 'toadd': 'myval'},
                     end.payload['meta_data'])
    # None-valued keys are deleted; the rest merge into the metadata.
    self.assertEqual({'foo': 'bar', 'toadd': 'myval'}, agg.metadata)
    mock_metadata_add.assert_called_once_with(self.context, 123,
                                              {'toadd': 'myval'})
    mock_metadata_delete.assert_called_once_with(self.context, 123,
                                                 'todelete')
    # The API DB helpers must not be touched for a cell-only aggregate.
    self.assertFalse(mock_api_metadata_add.called)
    self.assertFalse(mock_api_metadata_delete.called)
@mock.patch('nova.objects.Aggregate.in_api')
@mock.patch('nova.objects.aggregate._metadata_delete_from_db')
@mock.patch('nova.objects.aggregate._metadata_add_to_db')
@mock.patch('nova.db.aggregate_metadata_delete')
@mock.patch('nova.db.aggregate_metadata_add')
def test_update_metadata_api(self,
mock_metadata_add,
mock_metadata_delete,
mock_api_metadata_add,
mock_api_metadata_delete,
mock_in_api):
mock_in_api.return_value = True
def test_update_metadata_api(self, mock_api_metadata_add,
mock_api_metadata_delete):
fake_notifier.NOTIFICATIONS = []
agg = aggregate.Aggregate()
agg._context = self.context
@ -259,9 +145,6 @@ class _TestAggregateObject(object):
'todelete')
mock_api_metadata_add.assert_called_once_with(self.context, 123,
{'toadd': 'myval'})
self.assertFalse(mock_metadata_add.called)
self.assertFalse(mock_metadata_delete.called)
mock_api_metadata_delete.assert_called_once_with(self.context,
123,
'todelete')
@ -269,23 +152,9 @@ class _TestAggregateObject(object):
123,
{'toadd': 'myval'})
@mock.patch.object(db, 'aggregate_host_add')
def test_add_host(self, mock_host_add):
    """add_host() persists via the cell DB and extends the host list."""
    mock_host_add.return_value = {'host': 'bar'}
    agg = aggregate.Aggregate()
    agg._context = self.context
    agg.id = 123
    agg.hosts = ['foo']
    agg.add_host('bar')
    mock_host_add.assert_called_once_with(self.context, 123, 'bar')
    self.assertEqual(['foo', 'bar'], agg.hosts)
@mock.patch('nova.db.aggregate_host_add')
@mock.patch('nova.objects.aggregate._host_add_to_db')
@mock.patch('nova.objects.Aggregate.in_api')
def test_add_host_api(self, mock_in_api, mock_host_add_api, mock_host_add):
def test_add_host_api(self, mock_host_add_api):
mock_host_add_api.return_value = {'host': 'bar'}
mock_in_api.return_value = True
agg = aggregate.Aggregate()
agg.id = 123
agg.hosts = ['foo']
@ -293,25 +162,9 @@ class _TestAggregateObject(object):
agg.add_host('bar')
self.assertEqual(agg.hosts, ['foo', 'bar'])
mock_host_add_api.assert_called_once_with(self.context, 123, 'bar')
self.assertFalse(mock_host_add.called)
@mock.patch.object(db, 'aggregate_host_delete')
def test_delete_host(self, mock_host_delete):
    """delete_host() removes via the cell DB and shrinks the host list."""
    agg = aggregate.Aggregate()
    agg._context = self.context
    agg.id = 123
    agg.hosts = ['foo', 'bar']
    agg.delete_host('foo')
    mock_host_delete.assert_called_once_with(self.context, 123, 'foo')
    self.assertEqual(['bar'], agg.hosts)
@mock.patch('nova.db.aggregate_host_delete')
@mock.patch('nova.objects.aggregate._host_delete_from_db')
@mock.patch('nova.objects.Aggregate.in_api')
def test_delete_host_api(self, mock_in_api,
mock_host_delete_api,
mock_host_delete):
mock_in_api.return_value = True
def test_delete_host_api(self, mock_host_delete_api):
agg = aggregate.Aggregate()
agg.id = 123
agg.hosts = ['foo', 'bar']
@ -319,7 +172,6 @@ class _TestAggregateObject(object):
agg.delete_host('foo')
self.assertEqual(agg.hosts, ['bar'])
mock_host_delete_api.assert_called_once_with(self.context, 123, 'foo')
self.assertFalse(mock_host_delete.called)
def test_availability_zone(self):
agg = aggregate.Aggregate()
@ -327,45 +179,35 @@ class _TestAggregateObject(object):
self.assertEqual('foo', agg.availability_zone)
@mock.patch('nova.objects.aggregate._get_all_from_db')
def test_get_all(self, mock_api_get_all):
    """get_all() returns the aggregates from the API DB."""
    mock_api_get_all.return_value = [fake_aggregate]
    aggs = aggregate.AggregateList.get_all(self.context)
    self.assertEqual(1, len(aggs))
    self.compare_obj(aggs[0], fake_aggregate, subs=SUBS)
@mock.patch('nova.objects.aggregate._get_by_host_from_db')
def test_by_host(self, mock_api_get_by_host):
    """get_by_host() returns the host's aggregates from the API DB."""
    mock_api_get_by_host.return_value = [fake_aggregate]
    aggs = aggregate.AggregateList.get_by_host(self.context, 'fake-host')
    self.assertEqual(1, len(aggs))
    self.compare_obj(aggs[0], fake_aggregate, subs=SUBS)
@mock.patch('nova.objects.aggregate._get_by_metadata_key_from_db')
def test_get_by_metadata_key(self, mock_api_get_by_metadata_key):
    """get_by_metadata_key() queries the API DB for matching aggregates."""
    mock_api_get_by_metadata_key.return_value = [fake_aggregate]
    aggs = aggregate.AggregateList.get_by_metadata_key(
        self.context, 'this')
    self.assertEqual(1, len(aggs))
    self.compare_obj(aggs[0], fake_aggregate, subs=SUBS)
@mock.patch('nova.objects.aggregate._get_by_metadata_key_from_db')
def test_get_by_metadata_key_and_hosts_no_match(self, get_by_metadata_key):
    """A hosts filter that matches no aggregate hosts yields an empty list."""
    get_by_metadata_key.return_value = [fake_aggregate]
    aggs = aggregate.AggregateList.get_by_metadata_key(
        self.context, 'this', hosts=['baz'])
    self.assertEqual(0, len(aggs))
@mock.patch('nova.db.aggregate_get_by_metadata_key')
@mock.patch('nova.objects.aggregate._get_by_metadata_key_from_db')
def test_get_by_metadata_key_and_hosts_match(self, get_by_metadata_key):
get_by_metadata_key.return_value = [fake_aggregate]
aggs = aggregate.AggregateList.get_by_metadata_key(