enginefacade: 'migration'
Use enginefacade in 'migration' section. New 'select_db_reader_mode' decorator was introduced. It is needed to select and set synchronous or asynchronous reader mode to object's functions. The transaction mode will be passed to sqlalchemy layer and correctly supported there by enginefacade allow_async property. New decorator uses 'use_slave' parameter which is now removed from db and sqlalchemy layers. Implements: blueprint new-oslodb-enginefacade Co-Authored-By: Sergey Nikitin <snikitin@mirantis.com> Change-Id: I339a50e53652e5690ece33ee711fbea2cca61f65
This commit is contained in:
parent
997bec1bb9
commit
6eb1c0e1bc
|
@ -89,6 +89,18 @@ def not_equal(*values):
|
|||
###################
|
||||
|
||||
|
||||
def select_db_reader_mode(f):
|
||||
"""Decorator to select synchronous or asynchronous reader mode.
|
||||
|
||||
The kwarg argument 'use_slave' defines reader mode. Asynchronous reader
|
||||
will be used if 'use_slave' is True and synchronous reader otherwise.
|
||||
"""
|
||||
return IMPL.select_db_reader_mode(f)
|
||||
|
||||
|
||||
###################
|
||||
|
||||
|
||||
def service_destroy(context, service_id):
|
||||
"""Destroy the service or raise if it does not exist."""
|
||||
return IMPL.service_destroy(context, service_id)
|
||||
|
@ -470,12 +482,12 @@ def migration_get_by_instance_and_status(context, instance_uuid, status):
|
|||
|
||||
|
||||
def migration_get_unconfirmed_by_dest_compute(context, confirm_window,
|
||||
dest_compute, use_slave=False):
|
||||
dest_compute):
|
||||
"""Finds all unconfirmed migrations within the confirmation window for
|
||||
a specific destination compute host.
|
||||
"""
|
||||
return IMPL.migration_get_unconfirmed_by_dest_compute(context,
|
||||
confirm_window, dest_compute, use_slave=use_slave)
|
||||
confirm_window, dest_compute)
|
||||
|
||||
|
||||
def migration_get_in_progress_by_host_and_node(context, host, node):
|
||||
|
|
|
@ -21,6 +21,7 @@ import collections
|
|||
import copy
|
||||
import datetime
|
||||
import functools
|
||||
import inspect
|
||||
import sys
|
||||
import uuid
|
||||
|
||||
|
@ -64,6 +65,7 @@ from nova.db.sqlalchemy import models
|
|||
from nova import exception
|
||||
from nova.i18n import _, _LI, _LE, _LW
|
||||
from nova import quota
|
||||
from nova import safe_utils
|
||||
|
||||
db_opts = [
|
||||
cfg.StrOpt('osapi_compute_unique_server_name_scope',
|
||||
|
@ -232,6 +234,34 @@ def require_aggregate_exists(f):
|
|||
return wrapper
|
||||
|
||||
|
||||
def select_db_reader_mode(f):
|
||||
"""Decorator to select synchronous or asynchronous reader mode.
|
||||
|
||||
The kwarg argument 'use_slave' defines reader mode. Asynchronous reader
|
||||
will be used if 'use_slave' is True and synchronous reader otherwise.
|
||||
If 'use_slave' is not specified default value 'False' will be used.
|
||||
|
||||
Wrapped function must have a context in the arguments.
|
||||
"""
|
||||
|
||||
@functools.wraps(f)
|
||||
def wrapper(*args, **kwargs):
|
||||
wrapped_func = safe_utils.get_wrapped_function(f)
|
||||
keyed_args = inspect.getcallargs(wrapped_func, *args, **kwargs)
|
||||
|
||||
context = keyed_args['context']
|
||||
use_slave = keyed_args.get('use_slave', False)
|
||||
|
||||
if use_slave:
|
||||
reader_mode = main_context_manager.async
|
||||
else:
|
||||
reader_mode = main_context_manager.reader
|
||||
|
||||
with reader_mode.using(context):
|
||||
return f(*args, **kwargs)
|
||||
return wrapper
|
||||
|
||||
|
||||
def model_query(context, model,
|
||||
args=None,
|
||||
session=None,
|
||||
|
@ -4441,25 +4471,25 @@ def project_get_networks(context, project_id, associate=True):
|
|||
###################
|
||||
|
||||
|
||||
@main_context_manager.writer
|
||||
def migration_create(context, values):
|
||||
migration = models.Migration()
|
||||
migration.update(values)
|
||||
migration.save()
|
||||
migration.save(context.session)
|
||||
return migration
|
||||
|
||||
|
||||
@main_context_manager.writer
|
||||
def migration_update(context, id, values):
|
||||
session = get_session()
|
||||
with session.begin():
|
||||
migration = _migration_get(context, id, session=session)
|
||||
migration.update(values)
|
||||
migration = migration_get(context, id)
|
||||
migration.update(values)
|
||||
|
||||
return migration
|
||||
|
||||
|
||||
def _migration_get(context, id, session=None):
|
||||
result = model_query(context, models.Migration, session=session,
|
||||
read_deleted="yes").\
|
||||
@main_context_manager.reader
|
||||
def migration_get(context, id):
|
||||
result = model_query(context, models.Migration, read_deleted="yes").\
|
||||
filter_by(id=id).\
|
||||
first()
|
||||
|
||||
|
@ -4469,10 +4499,7 @@ def _migration_get(context, id, session=None):
|
|||
return result
|
||||
|
||||
|
||||
def migration_get(context, id):
|
||||
return _migration_get(context, id)
|
||||
|
||||
|
||||
@main_context_manager.reader
|
||||
def migration_get_by_instance_and_status(context, instance_uuid, status):
|
||||
result = model_query(context, models.Migration, read_deleted="yes").\
|
||||
filter_by(instance_uuid=instance_uuid).\
|
||||
|
@ -4486,19 +4513,20 @@ def migration_get_by_instance_and_status(context, instance_uuid, status):
|
|||
return result
|
||||
|
||||
|
||||
@main_context_manager.reader.allow_async
|
||||
def migration_get_unconfirmed_by_dest_compute(context, confirm_window,
|
||||
dest_compute, use_slave=False):
|
||||
dest_compute):
|
||||
confirm_window = (timeutils.utcnow() -
|
||||
datetime.timedelta(seconds=confirm_window))
|
||||
|
||||
return model_query(context, models.Migration, read_deleted="yes",
|
||||
use_slave=use_slave).\
|
||||
return model_query(context, models.Migration, read_deleted="yes").\
|
||||
filter(models.Migration.updated_at <= confirm_window).\
|
||||
filter_by(status="finished").\
|
||||
filter_by(dest_compute=dest_compute).\
|
||||
all()
|
||||
|
||||
|
||||
@main_context_manager.reader
|
||||
def migration_get_in_progress_by_host_and_node(context, host, node):
|
||||
|
||||
return model_query(context, models.Migration).\
|
||||
|
@ -4512,6 +4540,7 @@ def migration_get_in_progress_by_host_and_node(context, host, node):
|
|||
all()
|
||||
|
||||
|
||||
@main_context_manager.reader
|
||||
def migration_get_all_by_filters(context, filters):
|
||||
query = model_query(context, models.Migration)
|
||||
if "status" in filters:
|
||||
|
|
|
@ -134,10 +134,17 @@ class MigrationList(base.ObjectListBase, base.NovaObject):
|
|||
'objects': fields.ListOfObjectsField('Migration'),
|
||||
}
|
||||
|
||||
@staticmethod
|
||||
@db.select_db_reader_mode
|
||||
def _db_migration_get_unconfirmed_by_dest_compute(
|
||||
context, confirm_window, dest_compute, use_slave=False):
|
||||
return db.migration_get_unconfirmed_by_dest_compute(
|
||||
context, confirm_window, dest_compute)
|
||||
|
||||
@base.remotable_classmethod
|
||||
def get_unconfirmed_by_dest_compute(cls, context, confirm_window,
|
||||
dest_compute, use_slave=False):
|
||||
db_migrations = db.migration_get_unconfirmed_by_dest_compute(
|
||||
db_migrations = cls._db_migration_get_unconfirmed_by_dest_compute(
|
||||
context, confirm_window, dest_compute, use_slave=use_slave)
|
||||
return base.obj_make_list(context, cls(context), objects.Migration,
|
||||
db_migrations)
|
||||
|
|
|
@ -28,6 +28,7 @@ import netaddr
|
|||
from oslo_config import cfg
|
||||
from oslo_db import api as oslo_db_api
|
||||
from oslo_db import exception as db_exc
|
||||
from oslo_db.sqlalchemy import enginefacade
|
||||
from oslo_db.sqlalchemy import test_base
|
||||
from oslo_db.sqlalchemy import update_match
|
||||
from oslo_db.sqlalchemy import utils as sqlalchemyutils
|
||||
|
@ -180,6 +181,58 @@ class DecoratorTestCase(test.TestCase):
|
|||
self._test_decorator_wraps_helper(
|
||||
oslo_db_api.wrap_db_retry(max_retries=5, retry_on_deadlock=True))
|
||||
|
||||
@mock.patch.object(enginefacade._TransactionContextManager, 'using')
|
||||
@mock.patch.object(enginefacade._TransactionContextManager, '_clone')
|
||||
def test_select_db_reader_mode_select_sync(self, mock_clone, mock_using):
|
||||
|
||||
@db.select_db_reader_mode
|
||||
def func(self, context, value, use_slave=False):
|
||||
pass
|
||||
|
||||
mock_clone.return_value = enginefacade._TransactionContextManager(
|
||||
mode=enginefacade._READER)
|
||||
ctxt = context.get_admin_context()
|
||||
value = 'some_value'
|
||||
func(self, ctxt, value)
|
||||
|
||||
mock_clone.assert_called_once_with(mode=enginefacade._READER)
|
||||
mock_using.assert_called_once_with(ctxt)
|
||||
|
||||
@mock.patch.object(enginefacade._TransactionContextManager, 'using')
|
||||
@mock.patch.object(enginefacade._TransactionContextManager, '_clone')
|
||||
def test_select_db_reader_mode_select_async(self, mock_clone, mock_using):
|
||||
|
||||
@db.select_db_reader_mode
|
||||
def func(self, context, value, use_slave=False):
|
||||
pass
|
||||
|
||||
mock_clone.return_value = enginefacade._TransactionContextManager(
|
||||
mode=enginefacade._ASYNC_READER)
|
||||
ctxt = context.get_admin_context()
|
||||
value = 'some_value'
|
||||
func(self, ctxt, value, use_slave=True)
|
||||
|
||||
mock_clone.assert_called_once_with(mode=enginefacade._ASYNC_READER)
|
||||
mock_using.assert_called_once_with(ctxt)
|
||||
|
||||
@mock.patch.object(enginefacade._TransactionContextManager, 'using')
|
||||
@mock.patch.object(enginefacade._TransactionContextManager, '_clone')
|
||||
def test_select_db_reader_mode_no_use_slave_select_sync(self, mock_clone,
|
||||
mock_using):
|
||||
|
||||
@db.select_db_reader_mode
|
||||
def func(self, context, value):
|
||||
pass
|
||||
|
||||
mock_clone.return_value = enginefacade._TransactionContextManager(
|
||||
mode=enginefacade._READER)
|
||||
ctxt = context.get_admin_context()
|
||||
value = 'some_value'
|
||||
func(self, ctxt, value)
|
||||
|
||||
mock_clone.assert_called_once_with(mode=enginefacade._READER)
|
||||
mock_using.assert_called_once_with(ctxt)
|
||||
|
||||
|
||||
def _get_fake_aggr_values():
|
||||
return {'name': 'fake_aggregate'}
|
||||
|
|
|
@ -150,8 +150,7 @@ class _TestMigrationObject(object):
|
|||
self.mox.StubOutWithMock(
|
||||
db, 'migration_get_unconfirmed_by_dest_compute')
|
||||
db.migration_get_unconfirmed_by_dest_compute(
|
||||
ctxt, 'window', 'foo',
|
||||
use_slave=False).AndReturn(db_migrations)
|
||||
ctxt, 'window', 'foo').AndReturn(db_migrations)
|
||||
self.mox.ReplayAll()
|
||||
migrations = (
|
||||
migration.MigrationList.get_unconfirmed_by_dest_compute(
|
||||
|
|
Loading…
Reference in New Issue