Merge "Add unique constraint to AggregateMetadata"

This commit is contained in:
Jenkins
2013-08-22 12:21:08 +00:00
committed by Gerrit Code Review
5 changed files with 146 additions and 28 deletions

View File

@@ -57,6 +57,7 @@ from nova import exception
from nova.openstack.common.db import exception as db_exc
from nova.openstack.common.db.sqlalchemy import session as db_session
from nova.openstack.common.db.sqlalchemy import utils as sqlalchemyutils
from nova.openstack.common import excutils
from nova.openstack.common.gettextutils import _
from nova.openstack.common import log as logging
from nova.openstack.common import timeutils
@@ -4951,38 +4952,49 @@ def aggregate_metadata_delete(context, aggregate_id, key):
@require_admin_context
@require_aggregate_exists
def aggregate_metadata_add(context, aggregate_id, metadata, set_delete=False,
                           max_retries=10):
    """Add or update metadata for an aggregate, retrying on duplicate-key
    races.

    Relies on the (aggregate_id, key, deleted) unique constraint on
    aggregate_metadata: if a concurrent transaction inserts the same key
    first, the commit raises DBDuplicateEntry and the whole transaction is
    retried from scratch.

    :param context: request context (must be admin; enforced by decorator)
    :param aggregate_id: id of the aggregate to attach metadata to
    :param metadata: dict of key/value pairs to set
    :param set_delete: when True, soft-delete any existing metadata keys
                       that are not present in ``metadata``
    :param max_retries: number of attempts before giving up and re-raising
    :returns: the ``metadata`` dict that was passed in
    :raises: db_exc.DBDuplicateEntry if every attempt loses the race
    """
    all_keys = metadata.keys()
    for attempt in xrange(max_retries):
        try:
            session = get_session()
            with session.begin():
                query = _aggregate_metadata_get_query(context, aggregate_id,
                                                      read_deleted='no',
                                                      session=session)
                if set_delete:
                    # Remove keys the caller did not supply.
                    query.filter(~models.AggregateMetadata.key.in_(all_keys)).\
                        soft_delete(synchronize_session=False)

                query = \
                    query.filter(models.AggregateMetadata.key.in_(all_keys))
                # Update rows that already exist for the requested keys.
                already_existing_keys = set()
                for meta_ref in query.all():
                    key = meta_ref.key
                    meta_ref.update({"value": metadata[key]})
                    already_existing_keys.add(key)

                # Insert the keys that were not found above.
                for key, value in metadata.iteritems():
                    if key in already_existing_keys:
                        continue
                    meta_ref = models.AggregateMetadata()
                    meta_ref.update({"key": key,
                                     "value": value,
                                     "aggregate_id": aggregate_id})
                    session.add(meta_ref)

                return metadata
        except db_exc.DBDuplicateEntry:
            # A concurrent transaction has been committed; try again
            # unless this was the last attempt.
            with excutils.save_and_reraise_exception() as ctxt:
                if attempt < max_retries - 1:
                    ctxt.reraise = False
                else:
                    msg = _("Add metadata failed for aggregate %(id)s after "
                            "%(retries)s retries") % {"id": aggregate_id,
                                                      "retries": max_retries}
                    LOG.warn(msg)
@require_admin_context

View File

@@ -0,0 +1,40 @@
# Copyright 2013 Mirantis Inc.
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# vim: tabstop=4 shiftwidth=4 softtabstop=4
from migrate.changeset import UniqueConstraint
from sqlalchemy import MetaData, Table
from nova.db.sqlalchemy import utils
UC_NAME = 'uniq_aggregate_metadata0aggregate_id0key0deleted'
COLUMNS = ('aggregate_id', 'key', 'deleted')
TABLE_NAME = 'aggregate_metadata'
def upgrade(migrate_engine):
    """Add a unique constraint on (aggregate_id, key, deleted) to
    aggregate_metadata.

    Pre-existing duplicate rows are removed first via
    drop_old_duplicate_entries_from_table (the True flag presumably selects
    soft-delete semantics — confirm against nova.db.sqlalchemy.utils) so the
    constraint can be created.
    """
    meta = MetaData(bind=migrate_engine)
    t = Table(TABLE_NAME, meta, autoload=True)

    utils.drop_old_duplicate_entries_from_table(migrate_engine, TABLE_NAME,
                                                True, *COLUMNS)
    uc = UniqueConstraint(*COLUMNS, table=t, name=UC_NAME)
    uc.create()
def downgrade(migrate_engine):
    """Drop the unique constraint that upgrade() added."""
    utils.drop_unique_constraint(migrate_engine, TABLE_NAME, UC_NAME, *COLUMNS)

View File

@@ -1064,6 +1064,9 @@ class AggregateMetadata(BASE, NovaBase):
"""Represents a metadata key/value pair for an aggregate."""
__tablename__ = 'aggregate_metadata'
__table_args__ = (
schema.UniqueConstraint("aggregate_id", "key", "deleted",
name="uniq_aggregate_metadata0aggregate_id0key0deleted"
),
Index('aggregate_metadata_key_idx', 'key'),
)
id = Column(Integer, primary_key=True)

View File

@@ -518,6 +518,25 @@ class AggregateDBApiTestCase(test.TestCase):
expected = db.aggregate_metadata_get(ctxt, result['id'])
self.assertThat(metadata, matchers.DictMatches(expected))
def test_aggregate_metadata_add_retry(self):
ctxt = context.get_admin_context()
result = _create_aggregate(context=ctxt, metadata=None)
def counted():
def get_query(context, id, session, read_deleted):
get_query.counter += 1
raise db_exc.DBDuplicateEntry
get_query.counter = 0
return get_query
get_query = counted()
self.stubs.Set(sqlalchemy_api,
'_aggregate_metadata_get_query', get_query)
self.assertRaises(db_exc.DBDuplicateEntry, sqlalchemy_api.
aggregate_metadata_add, ctxt, result['id'], {},
max_retries=5)
self.assertEqual(get_query.counter, 5)
def test_aggregate_metadata_update(self):
ctxt = context.get_admin_context()
result = _create_aggregate(context=ctxt)

View File

@@ -2705,6 +2705,50 @@ class TestNovaMigrations(BaseMigrationTestCase, CommonTestsMixIn):
self.assertNotIn(('project_user_quotas_user_id_deleted_idx',
['user_id', 'deleted']), index_data)
def _pre_upgrade_211(self, engine):
fake_aggregates = [{'id': 7, 'name': 'name1'},
{'id': 8, 'name': 'name2'}]
aggregates = db_utils.get_table(engine, 'aggregates')
engine.execute(aggregates.insert(), fake_aggregates)
metadata = db_utils.get_table(engine, 'aggregate_metadata')
data = [
{'aggregate_id': 7, 'key': 'availability_zone',
'value': 'custom_az1', 'deleted': 0},
{'aggregate_id': 7, 'key': 'availability_zone',
'value': 'custom_az2', 'deleted': 0},
{'aggregate_id': 8, 'key': 'availability_zone',
'value': 'custom_az3', 'deleted': 0},
]
for item in data:
metadata.insert().values(item).execute()
return data
def _check_211(self, engine, data):
metadata = db_utils.get_table(engine, 'aggregate_metadata')
def get_(aggrid, deleted):
deleted_value = 0 if not deleted else metadata.c.id
return metadata.select().\
where(metadata.c.aggregate_id == aggrid).\
where(metadata.c.deleted == deleted_value).\
execute().\
fetchall()
self.assertEqual(1, len(get_(7, False)))
self.assertEqual(1, len(get_(7, True)))
self.assertEqual(1, len(get_(8, False)))
self.assertRaises(sqlalchemy.exc.IntegrityError,
metadata.insert().execute,
{'aggregate_id': 7, 'key': 'availability_zone',
'value': 'az4', 'deleted': 0})
def _post_downgrade_211(self, engine):
metadata = db_utils.get_table(engine, 'aggregate_metadata')
data = {'aggregate_id': 8, 'key': 'availability_zone',
'value': 'az', 'deleted': 0}
metadata.insert().values(data).execute()
self.assertIsNotNone(metadata.insert().values(data).execute())
class TestBaremetalMigrations(BaseMigrationTestCase, CommonTestsMixIn):
"""Test sqlalchemy-migrate migrations."""