Merge "Make Aggregate metadata functions work with API db"
This commit is contained in:
@@ -14,6 +14,7 @@
|
|||||||
|
|
||||||
from oslo_db import exception as db_exc
|
from oslo_db import exception as db_exc
|
||||||
from oslo_log import log as logging
|
from oslo_log import log as logging
|
||||||
|
from oslo_utils import excutils
|
||||||
from oslo_utils import uuidutils
|
from oslo_utils import uuidutils
|
||||||
from sqlalchemy.orm import contains_eager
|
from sqlalchemy.orm import contains_eager
|
||||||
from sqlalchemy.orm import joinedload
|
from sqlalchemy.orm import joinedload
|
||||||
@@ -23,6 +24,7 @@ from nova import db
|
|||||||
from nova.db.sqlalchemy import api as db_api
|
from nova.db.sqlalchemy import api as db_api
|
||||||
from nova.db.sqlalchemy import api_models
|
from nova.db.sqlalchemy import api_models
|
||||||
from nova import exception
|
from nova import exception
|
||||||
|
from nova.i18n import _
|
||||||
from nova import objects
|
from nova import objects
|
||||||
from nova.objects import base
|
from nova.objects import base
|
||||||
from nova.objects import fields
|
from nova.objects import fields
|
||||||
@@ -93,6 +95,70 @@ def _host_delete_from_db(context, aggregate_id, host):
|
|||||||
host=host)
|
host=host)
|
||||||
|
|
||||||
|
|
||||||
|
def _metadata_add_to_db(context, aggregate_id, metadata, max_retries=10,
|
||||||
|
set_delete=False):
|
||||||
|
all_keys = metadata.keys()
|
||||||
|
for attempt in range(max_retries):
|
||||||
|
try:
|
||||||
|
with db_api.api_context_manager.writer.using(context):
|
||||||
|
query = context.session.query(api_models.AggregateMetadata).\
|
||||||
|
filter_by(aggregate_id=aggregate_id)
|
||||||
|
|
||||||
|
if set_delete:
|
||||||
|
query.filter(~api_models.AggregateMetadata.key.
|
||||||
|
in_(all_keys)).\
|
||||||
|
delete(synchronize_session=False)
|
||||||
|
|
||||||
|
already_existing_keys = set()
|
||||||
|
if all_keys:
|
||||||
|
query = query.filter(
|
||||||
|
api_models.AggregateMetadata.key.in_(all_keys))
|
||||||
|
for meta_ref in query.all():
|
||||||
|
key = meta_ref.key
|
||||||
|
meta_ref.update({"value": metadata[key]})
|
||||||
|
already_existing_keys.add(key)
|
||||||
|
|
||||||
|
new_entries = []
|
||||||
|
for key, value in metadata.items():
|
||||||
|
if key in already_existing_keys:
|
||||||
|
continue
|
||||||
|
new_entries.append({"key": key,
|
||||||
|
"value": value,
|
||||||
|
"aggregate_id": aggregate_id})
|
||||||
|
if new_entries:
|
||||||
|
context.session.execute(
|
||||||
|
api_models.AggregateMetadata.__table__.insert(),
|
||||||
|
new_entries)
|
||||||
|
|
||||||
|
return metadata
|
||||||
|
except db_exc.DBDuplicateEntry:
|
||||||
|
# a concurrent transaction has been committed,
|
||||||
|
# try again unless this was the last attempt
|
||||||
|
with excutils.save_and_reraise_exception() as ctxt:
|
||||||
|
if attempt < max_retries - 1:
|
||||||
|
ctxt.reraise = False
|
||||||
|
else:
|
||||||
|
msg = _("Add metadata failed for aggregate %(id)s "
|
||||||
|
"after %(retries)s retries") % \
|
||||||
|
{"id": aggregate_id, "retries": max_retries}
|
||||||
|
LOG.warning(msg)
|
||||||
|
|
||||||
|
|
||||||
|
@db_api.api_context_manager.writer
|
||||||
|
def _metadata_delete_from_db(context, aggregate_id, key):
|
||||||
|
# Check to see if the aggregate exists
|
||||||
|
_aggregate_get_from_db(context, aggregate_id)
|
||||||
|
|
||||||
|
query = context.session.query(api_models.AggregateMetadata)
|
||||||
|
query = query.filter(api_models.AggregateMetadata.aggregate_id ==
|
||||||
|
aggregate_id)
|
||||||
|
count = query.filter_by(key=key).delete()
|
||||||
|
|
||||||
|
if count == 0:
|
||||||
|
raise exception.AggregateMetadataNotFound(
|
||||||
|
aggregate_id=aggregate_id, metadata_key=key)
|
||||||
|
|
||||||
|
|
||||||
@base.NovaObjectRegistry.register
|
@base.NovaObjectRegistry.register
|
||||||
class Aggregate(base.NovaPersistentObject, base.NovaObject):
|
class Aggregate(base.NovaPersistentObject, base.NovaObject):
|
||||||
# Version 1.0: Initial version
|
# Version 1.0: Initial version
|
||||||
@@ -237,6 +303,13 @@ class Aggregate(base.NovaPersistentObject, base.NovaObject):
|
|||||||
|
|
||||||
@base.remotable
|
@base.remotable
|
||||||
def update_metadata(self, updates):
|
def update_metadata(self, updates):
|
||||||
|
if self.in_api:
|
||||||
|
metadata_delete = _metadata_delete_from_db
|
||||||
|
metadata_add = _metadata_add_to_db
|
||||||
|
else:
|
||||||
|
metadata_delete = db.aggregate_metadata_delete
|
||||||
|
metadata_add = db.aggregate_metadata_add
|
||||||
|
|
||||||
payload = {'aggregate_id': self.id,
|
payload = {'aggregate_id': self.id,
|
||||||
'meta_data': updates}
|
'meta_data': updates}
|
||||||
compute_utils.notify_about_aggregate_update(self._context,
|
compute_utils.notify_about_aggregate_update(self._context,
|
||||||
@@ -246,7 +319,7 @@ class Aggregate(base.NovaPersistentObject, base.NovaObject):
|
|||||||
for key, value in updates.items():
|
for key, value in updates.items():
|
||||||
if value is None:
|
if value is None:
|
||||||
try:
|
try:
|
||||||
db.aggregate_metadata_delete(self._context, self.id, key)
|
metadata_delete(self._context, self.id, key)
|
||||||
except exception.AggregateMetadataNotFound:
|
except exception.AggregateMetadataNotFound:
|
||||||
pass
|
pass
|
||||||
try:
|
try:
|
||||||
@@ -256,7 +329,7 @@ class Aggregate(base.NovaPersistentObject, base.NovaObject):
|
|||||||
else:
|
else:
|
||||||
to_add[key] = value
|
to_add[key] = value
|
||||||
self.metadata[key] = value
|
self.metadata[key] = value
|
||||||
db.aggregate_metadata_add(self._context, self.id, to_add)
|
metadata_add(self._context, self.id, to_add)
|
||||||
compute_utils.notify_about_aggregate_update(self._context,
|
compute_utils.notify_about_aggregate_update(self._context,
|
||||||
"updatemetadata.end",
|
"updatemetadata.end",
|
||||||
payload)
|
payload)
|
||||||
|
|||||||
@@ -10,6 +10,9 @@
|
|||||||
# License for the specific language governing permissions and limitations
|
# License for the specific language governing permissions and limitations
|
||||||
# under the License.
|
# under the License.
|
||||||
|
|
||||||
|
from copy import deepcopy
|
||||||
|
import mock
|
||||||
|
from oslo_db import exception as db_exc
|
||||||
from oslo_utils import timeutils
|
from oslo_utils import timeutils
|
||||||
|
|
||||||
from nova import context
|
from nova import context
|
||||||
@@ -19,6 +22,7 @@ from nova.db.sqlalchemy import api_models
|
|||||||
from nova import exception
|
from nova import exception
|
||||||
from nova import test
|
from nova import test
|
||||||
from nova.tests import fixtures
|
from nova.tests import fixtures
|
||||||
|
from nova.tests.unit import matchers
|
||||||
from nova.tests import uuidsentinel
|
from nova.tests import uuidsentinel
|
||||||
|
|
||||||
import nova.objects.aggregate as aggregate_obj
|
import nova.objects.aggregate as aggregate_obj
|
||||||
@@ -95,6 +99,16 @@ def _aggregate_host_get_all(context, aggregate_id):
|
|||||||
filter_by(aggregate_id=aggregate_id).all()
|
filter_by(aggregate_id=aggregate_id).all()
|
||||||
|
|
||||||
|
|
||||||
|
@db_api.api_context_manager.reader
|
||||||
|
def _aggregate_metadata_get_all(context, aggregate_id):
|
||||||
|
results = context.session.query(api_models.AggregateMetadata).\
|
||||||
|
filter_by(aggregate_id=aggregate_id).all()
|
||||||
|
metadata = {}
|
||||||
|
for r in results:
|
||||||
|
metadata[r['key']] = r['value']
|
||||||
|
return metadata
|
||||||
|
|
||||||
|
|
||||||
class AggregateObjectDbTestCase(test.NoDBTestCase):
|
class AggregateObjectDbTestCase(test.NoDBTestCase):
|
||||||
|
|
||||||
USES_DB_SELF = True
|
USES_DB_SELF = True
|
||||||
@@ -251,3 +265,78 @@ class AggregateObjectDbTestCase(test.NoDBTestCase):
|
|||||||
aggregate_obj._host_delete_from_db,
|
aggregate_obj._host_delete_from_db,
|
||||||
self.context, result['id'],
|
self.context, result['id'],
|
||||||
_get_fake_hosts(1)[0])
|
_get_fake_hosts(1)[0])
|
||||||
|
|
||||||
|
def test_aggregate_metadata_add(self):
|
||||||
|
result = _create_aggregate(self.context, metadata=None)
|
||||||
|
metadata = deepcopy(_get_fake_metadata(1))
|
||||||
|
aggregate_obj._metadata_add_to_db(self.context, result['id'], metadata)
|
||||||
|
expected = _aggregate_metadata_get_all(self.context, result['id'])
|
||||||
|
self.assertThat(metadata, matchers.DictMatches(expected))
|
||||||
|
|
||||||
|
def test_aggregate_metadata_add_empty_metadata(self):
|
||||||
|
result = _create_aggregate(self.context, metadata=None)
|
||||||
|
metadata = {}
|
||||||
|
aggregate_obj._metadata_add_to_db(self.context, result['id'], metadata)
|
||||||
|
expected = _aggregate_metadata_get_all(self.context, result['id'])
|
||||||
|
self.assertThat(metadata, matchers.DictMatches(expected))
|
||||||
|
|
||||||
|
def test_aggregate_metadata_add_and_update(self):
|
||||||
|
result = _create_aggregate(self.context)
|
||||||
|
metadata = deepcopy(_get_fake_metadata(1))
|
||||||
|
key = list(metadata.keys())[0]
|
||||||
|
new_metadata = {key: 'foo',
|
||||||
|
'fake_new_key': 'fake_new_value'}
|
||||||
|
metadata.update(new_metadata)
|
||||||
|
aggregate_obj._metadata_add_to_db(self.context,
|
||||||
|
result['id'], new_metadata)
|
||||||
|
expected = _aggregate_metadata_get_all(self.context, result['id'])
|
||||||
|
self.assertThat(metadata, matchers.DictMatches(expected))
|
||||||
|
|
||||||
|
def test_aggregate_metadata_add_retry(self):
|
||||||
|
result = _create_aggregate(self.context, metadata=None)
|
||||||
|
with mock.patch('nova.db.sqlalchemy.api_models.'
|
||||||
|
'AggregateMetadata.__table__.insert') as insert_mock:
|
||||||
|
insert_mock.side_effect = db_exc.DBDuplicateEntry
|
||||||
|
self.assertRaises(db_exc.DBDuplicateEntry,
|
||||||
|
aggregate_obj._metadata_add_to_db,
|
||||||
|
self.context,
|
||||||
|
result['id'],
|
||||||
|
{'fake_key2': 'fake_value2'},
|
||||||
|
max_retries=5)
|
||||||
|
|
||||||
|
def test_aggregate_metadata_update(self):
|
||||||
|
result = _create_aggregate(self.context)
|
||||||
|
metadata = deepcopy(_get_fake_metadata(1))
|
||||||
|
key = list(metadata.keys())[0]
|
||||||
|
aggregate_obj._metadata_delete_from_db(self.context, result['id'], key)
|
||||||
|
new_metadata = {key: 'foo'}
|
||||||
|
aggregate_obj._metadata_add_to_db(self.context,
|
||||||
|
result['id'], new_metadata)
|
||||||
|
expected = _aggregate_metadata_get_all(self.context, result['id'])
|
||||||
|
metadata[key] = 'foo'
|
||||||
|
self.assertThat(metadata, matchers.DictMatches(expected))
|
||||||
|
|
||||||
|
def test_aggregate_metadata_delete(self):
|
||||||
|
result = _create_aggregate(self.context, metadata=None)
|
||||||
|
metadata = deepcopy(_get_fake_metadata(1))
|
||||||
|
aggregate_obj._metadata_add_to_db(self.context, result['id'], metadata)
|
||||||
|
aggregate_obj._metadata_delete_from_db(self.context, result['id'],
|
||||||
|
list(metadata.keys())[0])
|
||||||
|
expected = _aggregate_metadata_get_all(self.context, result['id'])
|
||||||
|
del metadata[list(metadata.keys())[0]]
|
||||||
|
self.assertThat(metadata, matchers.DictMatches(expected))
|
||||||
|
|
||||||
|
def test_aggregate_remove_availability_zone(self):
|
||||||
|
result = _create_aggregate(self.context, metadata={'availability_zone':
|
||||||
|
'fake_avail_zone'})
|
||||||
|
aggregate_obj._metadata_delete_from_db(self.context,
|
||||||
|
result['id'],
|
||||||
|
'availability_zone')
|
||||||
|
aggr = aggregate_obj._aggregate_get_from_db(self.context, result['id'])
|
||||||
|
self.assertIsNone(aggr['availability_zone'])
|
||||||
|
|
||||||
|
def test_aggregate_metadata_delete_raise_not_found(self):
|
||||||
|
result = _create_aggregate(self.context)
|
||||||
|
self.assertRaises(exception.AggregateMetadataNotFound,
|
||||||
|
aggregate_obj._metadata_delete_from_db,
|
||||||
|
self.context, result['id'], 'foo_key')
|
||||||
|
|||||||
@@ -162,9 +162,15 @@ class _TestAggregateObject(object):
|
|||||||
self.assertRaises(exception.ObjectActionError,
|
self.assertRaises(exception.ObjectActionError,
|
||||||
agg.save)
|
agg.save)
|
||||||
|
|
||||||
@mock.patch.object(db, 'aggregate_metadata_delete')
|
@mock.patch('nova.objects.aggregate._metadata_delete_from_db')
|
||||||
@mock.patch.object(db, 'aggregate_metadata_add')
|
@mock.patch('nova.objects.aggregate._metadata_add_to_db')
|
||||||
def test_update_metadata(self, mock_add, mock_delete):
|
@mock.patch('nova.db.aggregate_metadata_delete')
|
||||||
|
@mock.patch('nova.db.aggregate_metadata_add')
|
||||||
|
def test_update_metadata(self,
|
||||||
|
mock_metadata_add,
|
||||||
|
mock_metadata_delete,
|
||||||
|
mock_api_metadata_add,
|
||||||
|
mock_api_metadata_delete):
|
||||||
fake_notifier.NOTIFICATIONS = []
|
fake_notifier.NOTIFICATIONS = []
|
||||||
agg = aggregate.Aggregate()
|
agg = aggregate.Aggregate()
|
||||||
agg._context = self.context
|
agg._context = self.context
|
||||||
@@ -182,10 +188,55 @@ class _TestAggregateObject(object):
|
|||||||
self.assertEqual({'todelete': None, 'toadd': 'myval'},
|
self.assertEqual({'todelete': None, 'toadd': 'myval'},
|
||||||
msg.payload['meta_data'])
|
msg.payload['meta_data'])
|
||||||
self.assertEqual({'foo': 'bar', 'toadd': 'myval'}, agg.metadata)
|
self.assertEqual({'foo': 'bar', 'toadd': 'myval'}, agg.metadata)
|
||||||
|
mock_metadata_add.assert_called_once_with(self.context, 123,
|
||||||
|
{'toadd': 'myval'})
|
||||||
|
mock_metadata_delete.assert_called_once_with(self.context, 123,
|
||||||
|
'todelete')
|
||||||
|
self.assertFalse(mock_api_metadata_add.called)
|
||||||
|
self.assertFalse(mock_api_metadata_delete.called)
|
||||||
|
|
||||||
mock_delete.assert_called_once_with(self.context, 123, 'todelete')
|
@mock.patch('nova.objects.Aggregate.in_api')
|
||||||
mock_add.assert_called_once_with(self.context, 123,
|
@mock.patch('nova.objects.aggregate._metadata_delete_from_db')
|
||||||
{'toadd': 'myval'})
|
@mock.patch('nova.objects.aggregate._metadata_add_to_db')
|
||||||
|
@mock.patch('nova.db.aggregate_metadata_delete')
|
||||||
|
@mock.patch('nova.db.aggregate_metadata_add')
|
||||||
|
def test_update_metadata_api(self,
|
||||||
|
mock_metadata_add,
|
||||||
|
mock_metadata_delete,
|
||||||
|
mock_api_metadata_add,
|
||||||
|
mock_api_metadata_delete,
|
||||||
|
mock_in_api):
|
||||||
|
mock_in_api.return_value = True
|
||||||
|
fake_notifier.NOTIFICATIONS = []
|
||||||
|
agg = aggregate.Aggregate()
|
||||||
|
agg._context = self.context
|
||||||
|
agg.id = 123
|
||||||
|
agg.metadata = {'foo': 'bar'}
|
||||||
|
agg.obj_reset_changes()
|
||||||
|
agg.update_metadata({'todelete': None, 'toadd': 'myval'})
|
||||||
|
self.assertEqual(2, len(fake_notifier.NOTIFICATIONS))
|
||||||
|
msg = fake_notifier.NOTIFICATIONS[0]
|
||||||
|
self.assertEqual('aggregate.updatemetadata.start', msg.event_type)
|
||||||
|
self.assertEqual({'todelete': None, 'toadd': 'myval'},
|
||||||
|
msg.payload['meta_data'])
|
||||||
|
msg = fake_notifier.NOTIFICATIONS[1]
|
||||||
|
self.assertEqual('aggregate.updatemetadata.end', msg.event_type)
|
||||||
|
self.assertEqual({'todelete': None, 'toadd': 'myval'},
|
||||||
|
msg.payload['meta_data'])
|
||||||
|
self.assertEqual({'foo': 'bar', 'toadd': 'myval'}, agg.metadata)
|
||||||
|
mock_api_metadata_delete.assert_called_once_with(self.context, 123,
|
||||||
|
'todelete')
|
||||||
|
mock_api_metadata_add.assert_called_once_with(self.context, 123,
|
||||||
|
{'toadd': 'myval'})
|
||||||
|
self.assertFalse(mock_metadata_add.called)
|
||||||
|
self.assertFalse(mock_metadata_delete.called)
|
||||||
|
|
||||||
|
mock_api_metadata_delete.assert_called_once_with(self.context,
|
||||||
|
123,
|
||||||
|
'todelete')
|
||||||
|
mock_api_metadata_add.assert_called_once_with(self.context,
|
||||||
|
123,
|
||||||
|
{'toadd': 'myval'})
|
||||||
|
|
||||||
@mock.patch.object(db, 'aggregate_delete')
|
@mock.patch.object(db, 'aggregate_delete')
|
||||||
def test_destroy(self, mock_aggregate_delete):
|
def test_destroy(self, mock_aggregate_delete):
|
||||||
|
|||||||
Reference in New Issue
Block a user