Eliminate lookup of model query hooks by name
By registering functions directly we cut off the dependency of the query hooks on the plugin. This is a step towards the goal of removing the CommonDbMixin mixin class. Also, we register all query hooks at plugin create (in __new__) instead of in the class definition (which caused the hooks to be registered on import). This ensures the query hooks are only registered for the plugins/mixins that are actually used. Since the query hooks are decoupled from the plugin, we remove them from the extension mixins (make them global in their module). This is a step towards refactoring all extension mixins for removal. Extra: In this patch we also remove the CommonDbMixinHooksFixture test fixture and instead just clear out the hooks after each test. Related-Blueprint: neutron-lib Change-Id: Ib6c2134d29e1764de627c3355f6cdee789d6301e
This commit is contained in:
parent
143a6e8546
commit
844033a5d1
|
@ -317,13 +317,3 @@ class DbBasePluginCommon(common_db_mixin.CommonDbMixin):
|
|||
return [{'subnet_id': ip["subnet_id"],
|
||||
'ip_address': ip["ip_address"]}
|
||||
for ip in ips]
|
||||
|
||||
@staticmethod
|
||||
def _port_filter_hook(context, original_model, conditions):
|
||||
# Apply the port filter only in non-admin and non-advsvc context
|
||||
if db_utils.model_query_scope_is_project(context, original_model):
|
||||
conditions |= (models_v2.Port.network_id.in_(
|
||||
context.session.query(models_v2.Network.id).
|
||||
filter(context.project_id == models_v2.Network.project_id).
|
||||
subquery()))
|
||||
return conditions
|
||||
|
|
|
@ -109,6 +109,16 @@ def _update_subnetpool_dict(orig_pool, new_pool):
|
|||
return updated
|
||||
|
||||
|
||||
def _port_filter_hook(context, original_model, conditions):
|
||||
# Apply the port filter only in non-admin and non-advsvc context
|
||||
if ndb_utils.model_query_scope_is_project(context, original_model):
|
||||
conditions |= (models_v2.Port.network_id.in_(
|
||||
context.session.query(models_v2.Network.id).
|
||||
filter(context.project_id == models_v2.Network.project_id).
|
||||
subquery()))
|
||||
return conditions
|
||||
|
||||
|
||||
@registry.has_registry_receivers
|
||||
class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon,
|
||||
neutron_plugin_base_v2.NeutronPluginBaseV2,
|
||||
|
@ -129,6 +139,15 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon,
|
|||
__native_pagination_support = True
|
||||
__native_sorting_support = True
|
||||
|
||||
def __new__(cls, *args, **kwargs):
|
||||
model_query.register_hook(
|
||||
models_v2.Port,
|
||||
"port",
|
||||
query_hook=None,
|
||||
filter_hook=_port_filter_hook,
|
||||
result_filters=None)
|
||||
return super(NeutronDbPluginV2, cls).__new__(cls, *args, **kwargs)
|
||||
|
||||
def __init__(self):
|
||||
self.set_ipam_backend()
|
||||
if cfg.CONF.notify_nova_on_port_status_changes:
|
||||
|
@ -1393,10 +1412,3 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon,
|
|||
device_id=device_id)
|
||||
if tenant_id != router['tenant_id']:
|
||||
raise n_exc.DeviceIDNotOwnedByTenant(device_id=device_id)
|
||||
|
||||
model_query.register_hook(
|
||||
models_v2.Port,
|
||||
"port",
|
||||
None,
|
||||
'_port_filter_hook',
|
||||
None)
|
||||
|
|
|
@ -39,41 +39,44 @@ from neutron.objects import network as net_obj
|
|||
DEVICE_OWNER_ROUTER_GW = constants.DEVICE_OWNER_ROUTER_GW
|
||||
|
||||
|
||||
def _network_filter_hook(context, original_model, conditions):
|
||||
if conditions is not None and not hasattr(conditions, '__iter__'):
|
||||
conditions = (conditions, )
|
||||
# Apply the external network filter only in non-admin and non-advsvc
|
||||
# context
|
||||
if db_utils.model_query_scope_is_project(context, original_model):
|
||||
# the table will already be joined to the rbac entries for the
|
||||
# shared check so we don't need to worry about ensuring that
|
||||
rbac_model = original_model.rbac_entries.property.mapper.class_
|
||||
tenant_allowed = (
|
||||
(rbac_model.action == 'access_as_external') &
|
||||
(rbac_model.target_tenant == context.tenant_id) |
|
||||
(rbac_model.target_tenant == '*'))
|
||||
conditions = expr.or_(tenant_allowed, *conditions)
|
||||
return conditions
|
||||
|
||||
|
||||
def _network_result_filter_hook(query, filters):
|
||||
vals = filters and filters.get(external_net.EXTERNAL, [])
|
||||
if not vals:
|
||||
return query
|
||||
if vals[0]:
|
||||
return query.filter(models_v2.Network.external.has())
|
||||
return query.filter(~models_v2.Network.external.has())
|
||||
|
||||
|
||||
@registry.has_registry_receivers
|
||||
class External_net_db_mixin(object):
|
||||
"""Mixin class to add external network methods to db_base_plugin_v2."""
|
||||
|
||||
@staticmethod
|
||||
def _network_filter_hook(context, original_model, conditions):
|
||||
if conditions is not None and not hasattr(conditions, '__iter__'):
|
||||
conditions = (conditions, )
|
||||
# Apply the external network filter only in non-admin and non-advsvc
|
||||
# context
|
||||
if db_utils.model_query_scope_is_project(context, original_model):
|
||||
# the table will already be joined to the rbac entries for the
|
||||
# shared check so we don't need to worry about ensuring that
|
||||
rbac_model = original_model.rbac_entries.property.mapper.class_
|
||||
tenant_allowed = (
|
||||
(rbac_model.action == 'access_as_external') &
|
||||
(rbac_model.target_tenant == context.tenant_id) |
|
||||
(rbac_model.target_tenant == '*'))
|
||||
conditions = expr.or_(tenant_allowed, *conditions)
|
||||
return conditions
|
||||
|
||||
def _network_result_filter_hook(self, query, filters):
|
||||
vals = filters and filters.get(external_net.EXTERNAL, [])
|
||||
if not vals:
|
||||
return query
|
||||
if vals[0]:
|
||||
return query.filter(models_v2.Network.external.has())
|
||||
return query.filter(~models_v2.Network.external.has())
|
||||
|
||||
model_query.register_hook(
|
||||
models_v2.Network,
|
||||
"external_net",
|
||||
None,
|
||||
'_network_filter_hook',
|
||||
'_network_result_filter_hook')
|
||||
def __new__(cls, *args, **kwargs):
|
||||
model_query.register_hook(
|
||||
models_v2.Network,
|
||||
"external_net",
|
||||
query_hook=None,
|
||||
filter_hook=_network_filter_hook,
|
||||
result_filters=_network_result_filter_hook)
|
||||
return super(External_net_db_mixin, cls).__new__(cls, *args, **kwargs)
|
||||
|
||||
def _network_is_external(self, context, net_id):
|
||||
return net_obj.ExternalNetwork.objects_exist(
|
||||
|
|
|
@ -25,28 +25,32 @@ from neutron.db import models_v2
|
|||
from neutron.db import portbindings_base
|
||||
|
||||
|
||||
def _port_model_hook(context, original_model, query):
|
||||
query = query.outerjoin(
|
||||
pmodels.PortBindingPort,
|
||||
(original_model.id == pmodels.PortBindingPort.port_id))
|
||||
return query
|
||||
|
||||
|
||||
def _port_result_filter_hook(query, filters):
|
||||
values = filters and filters.get(portbindings.HOST_ID, [])
|
||||
if not values:
|
||||
return query
|
||||
query = query.filter(pmodels.PortBindingPort.host.in_(values))
|
||||
return query
|
||||
|
||||
|
||||
class PortBindingMixin(portbindings_base.PortBindingBaseMixin):
|
||||
extra_binding_dict = None
|
||||
|
||||
def _port_model_hook(self, context, original_model, query):
|
||||
query = query.outerjoin(
|
||||
pmodels.PortBindingPort,
|
||||
(original_model.id == pmodels.PortBindingPort.port_id))
|
||||
return query
|
||||
|
||||
def _port_result_filter_hook(self, query, filters):
|
||||
values = filters and filters.get(portbindings.HOST_ID, [])
|
||||
if not values:
|
||||
return query
|
||||
query = query.filter(pmodels.PortBindingPort.host.in_(values))
|
||||
return query
|
||||
|
||||
model_query.register_hook(
|
||||
models_v2.Port,
|
||||
"portbindings_port",
|
||||
'_port_model_hook',
|
||||
None,
|
||||
'_port_result_filter_hook')
|
||||
def __new__(cls, *args, **kwargs):
|
||||
model_query.register_hook(
|
||||
models_v2.Port,
|
||||
"portbindings_port",
|
||||
query_hook=_port_model_hook,
|
||||
filter_hook=None,
|
||||
result_filters=_port_result_filter_hook)
|
||||
return super(PortBindingMixin, cls).__new__(cls, *args, **kwargs)
|
||||
|
||||
def _process_portbindings_create_and_update(self, context, port_data,
|
||||
port):
|
||||
|
|
|
@ -101,6 +101,14 @@ SERVICE_PLUGINS_REQUIRED_DRIVERS = {
|
|||
}
|
||||
|
||||
|
||||
def _ml2_port_result_filter_hook(query, filters):
|
||||
values = filters and filters.get(portbindings.HOST_ID, [])
|
||||
if not values:
|
||||
return query
|
||||
bind_criteria = models.PortBinding.host.in_(values)
|
||||
return query.filter(models_v2.Port.port_binding.has(bind_criteria))
|
||||
|
||||
|
||||
@registry.has_registry_receivers
|
||||
class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
|
||||
dvr_mac_db.DVRDbMixin,
|
||||
|
@ -152,6 +160,15 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
|
|||
self._aliases = aliases
|
||||
return self._aliases
|
||||
|
||||
def __new__(cls, *args, **kwargs):
|
||||
model_query.register_hook(
|
||||
models_v2.Port,
|
||||
"ml2_port_bindings",
|
||||
query_hook=None,
|
||||
filter_hook=None,
|
||||
result_filters=_ml2_port_result_filter_hook)
|
||||
return super(Ml2Plugin, cls).__new__(cls, *args, **kwargs)
|
||||
|
||||
@resource_registry.tracked_resources(
|
||||
network=models_v2.Network,
|
||||
port=models_v2.Port,
|
||||
|
@ -621,25 +638,6 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
|
|||
session = db_api.get_reader_session()
|
||||
return session
|
||||
|
||||
# Note - The following hook methods have "ml2" in their names so
|
||||
# that they are not called twice during unit tests due to global
|
||||
# registration of hooks in portbindings_db.py used by other
|
||||
# plugins.
|
||||
|
||||
def _ml2_port_result_filter_hook(self, query, filters):
|
||||
values = filters and filters.get(portbindings.HOST_ID, [])
|
||||
if not values:
|
||||
return query
|
||||
bind_criteria = models.PortBinding.host.in_(values)
|
||||
return query.filter(models_v2.Port.port_binding.has(bind_criteria))
|
||||
|
||||
model_query.register_hook(
|
||||
models_v2.Port,
|
||||
"ml2_port_bindings",
|
||||
None,
|
||||
None,
|
||||
'_ml2_port_result_filter_hook')
|
||||
|
||||
def _notify_port_updated(self, mech_context):
|
||||
port = mech_context.current
|
||||
segment = mech_context.bottom_bound_segment
|
||||
|
|
|
@ -56,6 +56,19 @@ class TagPlugin(common_db_mixin.CommonDbMixin, tag_ext.TagPluginBase):
|
|||
|
||||
supported_extension_aliases = ['tag', 'tag-ext']
|
||||
|
||||
def __new__(cls, *args, **kwargs):
|
||||
inst = super(TagPlugin, cls).__new__(cls, *args, **kwargs)
|
||||
inst._filter_methods = [] # prevent GC of our partial functions
|
||||
for resource, model in resource_model_map.items():
|
||||
resource_extend.register_funcs(resource, [_extend_tags_dict])
|
||||
method = functools.partial(tag_methods.apply_tag_filters, model)
|
||||
inst._filter_methods.append(method)
|
||||
model_query.register_hook(model, "tag",
|
||||
query_hook=None,
|
||||
filter_hook=None,
|
||||
result_filters=method)
|
||||
return inst
|
||||
|
||||
def _get_resource(self, context, resource, resource_id):
|
||||
model = resource_model_map[resource]
|
||||
try:
|
||||
|
@ -124,15 +137,3 @@ class TagPlugin(common_db_mixin.CommonDbMixin, tag_ext.TagPluginBase):
|
|||
if not tag_obj.Tag.delete_objects(context,
|
||||
tag=tag, standard_attr_id=res.standard_attr_id):
|
||||
raise tag_ext.TagNotFound(tag=tag)
|
||||
|
||||
def __new__(cls, *args, **kwargs):
|
||||
inst = super(TagPlugin, cls).__new__(cls, *args, **kwargs)
|
||||
inst._filter_methods = [] # prevent GC of our partial functions
|
||||
for resource, model in resource_model_map.items():
|
||||
resource_extend.register_funcs(
|
||||
resource, [_extend_tags_dict])
|
||||
method = functools.partial(tag_methods.apply_tag_filters, model)
|
||||
inst._filter_methods.append(method)
|
||||
model_query.register_hook(
|
||||
model, "tag", None, None, method)
|
||||
return inst
|
||||
|
|
|
@ -17,73 +17,91 @@ from oslo_utils import timeutils
|
|||
from sqlalchemy.orm import session as se
|
||||
|
||||
from neutron._i18n import _LW
|
||||
from neutron.db import _model_query as model_query
|
||||
from neutron.db import _resource_extend as resource_extend
|
||||
from neutron.db import api as db_api
|
||||
from neutron.db import standard_attr
|
||||
|
||||
CHANGED_SINCE = 'changed_since'
|
||||
TIME_FORMAT_WHOLE_SECONDS = '%Y-%m-%dT%H:%M:%S'
|
||||
|
||||
|
||||
def _change_since_result_filter_hook(query, filters):
|
||||
# this block is for change_since query
|
||||
# we get the changed_since string from filters.
|
||||
# And translate it from string to datetime type.
|
||||
# Then compare with the timestamp in db which has
|
||||
# datetime type.
|
||||
values = filters and filters.get(CHANGED_SINCE, [])
|
||||
if not values:
|
||||
return query
|
||||
data = filters[CHANGED_SINCE][0]
|
||||
try:
|
||||
changed_since_string = timeutils.parse_isotime(data)
|
||||
except Exception:
|
||||
msg = _LW("The input %s must be in the "
|
||||
"following format: YYYY-MM-DDTHH:MM:SSZ") % CHANGED_SINCE
|
||||
raise n_exc.InvalidInput(error_message=msg)
|
||||
changed_since = (timeutils.
|
||||
normalize_time(changed_since_string))
|
||||
target_model_class = query.column_descriptions[0]['type']
|
||||
query = query.join(standard_attr.StandardAttribute,
|
||||
target_model_class.standard_attr_id ==
|
||||
standard_attr.StandardAttribute.id).filter(
|
||||
standard_attr.StandardAttribute.updated_at
|
||||
>= changed_since)
|
||||
return query
|
||||
|
||||
|
||||
def _update_timestamp(session, context, instances):
|
||||
objs_list = session.new.union(session.dirty)
|
||||
|
||||
while objs_list:
|
||||
obj = objs_list.pop()
|
||||
if (isinstance(obj, standard_attr.HasStandardAttributes)
|
||||
and obj.standard_attr_id):
|
||||
obj.updated_at = timeutils.utcnow()
|
||||
|
||||
|
||||
def _format_timestamp(resource_db, result):
|
||||
result['created_at'] = (resource_db.created_at.
|
||||
strftime(TIME_FORMAT_WHOLE_SECONDS)) + 'Z'
|
||||
result['updated_at'] = (resource_db.updated_at.
|
||||
strftime(TIME_FORMAT_WHOLE_SECONDS)) + 'Z'
|
||||
|
||||
|
||||
def _extend_resource_dict_timestamp(plugin_obj, resource_res, resource_db):
|
||||
if (resource_db and resource_db.created_at and
|
||||
resource_db.updated_at):
|
||||
_format_timestamp(resource_db, resource_res)
|
||||
|
||||
|
||||
def _add_timestamp(mapper, _conn, target):
|
||||
if not target.created_at and not target.updated_at:
|
||||
time = timeutils.utcnow()
|
||||
for field in ['created_at', 'updated_at']:
|
||||
setattr(target, field, time)
|
||||
return target
|
||||
|
||||
|
||||
class TimeStamp_db_mixin(object):
|
||||
"""Mixin class to add Time Stamp methods."""
|
||||
|
||||
ISO8601_TIME_FORMAT = '%Y-%m-%dT%H:%M:%S'
|
||||
|
||||
def _change_since_result_filter_hook(self, query, filters):
|
||||
# this block is for change_since query
|
||||
# we get the changed_since string from filters.
|
||||
# And translate it from string to datetime type.
|
||||
# Then compare with the timestamp in db which has
|
||||
# datetime type.
|
||||
values = filters and filters.get(CHANGED_SINCE, [])
|
||||
if not values:
|
||||
return query
|
||||
data = filters[CHANGED_SINCE][0]
|
||||
try:
|
||||
changed_since_string = timeutils.parse_isotime(data)
|
||||
except Exception:
|
||||
msg = _LW("The input %s must be in the "
|
||||
"following format: YYYY-MM-DDTHH:MM:SSZ") % CHANGED_SINCE
|
||||
raise n_exc.InvalidInput(error_message=msg)
|
||||
changed_since = (timeutils.
|
||||
normalize_time(changed_since_string))
|
||||
target_model_class = query.column_descriptions[0]['type']
|
||||
query = query.join(standard_attr.StandardAttribute,
|
||||
target_model_class.standard_attr_id ==
|
||||
standard_attr.StandardAttribute.id).filter(
|
||||
standard_attr.StandardAttribute.updated_at
|
||||
>= changed_since)
|
||||
return query
|
||||
|
||||
def update_timestamp(self, session, context, instances):
|
||||
objs_list = session.new.union(session.dirty)
|
||||
|
||||
while objs_list:
|
||||
obj = objs_list.pop()
|
||||
if (isinstance(obj, standard_attr.HasStandardAttributes)
|
||||
and obj.standard_attr_id):
|
||||
obj.updated_at = timeutils.utcnow()
|
||||
def __new__(cls, *args, **kwargs):
|
||||
rs_model_maps = standard_attr.get_standard_attr_resource_model_map()
|
||||
for rsmap, model in rs_model_maps.items():
|
||||
resource_extend.register_funcs(
|
||||
rsmap, [_extend_resource_dict_timestamp])
|
||||
model_query.register_hook(
|
||||
model,
|
||||
"change_since_query",
|
||||
query_hook=None,
|
||||
filter_hook=None,
|
||||
result_filters=_change_since_result_filter_hook)
|
||||
return super(TimeStamp_db_mixin, cls).__new__(cls, *args, **kwargs)
|
||||
|
||||
def register_db_events(self):
|
||||
listen = db_api.sqla_listen
|
||||
listen(standard_attr.StandardAttribute, 'before_insert',
|
||||
self._add_timestamp)
|
||||
listen(se.Session, 'before_flush', self.update_timestamp)
|
||||
|
||||
def _format_timestamp(self, resource_db, result):
|
||||
result['created_at'] = (resource_db.created_at.
|
||||
strftime(self.ISO8601_TIME_FORMAT)) + 'Z'
|
||||
result['updated_at'] = (resource_db.updated_at.
|
||||
strftime(self.ISO8601_TIME_FORMAT)) + 'Z'
|
||||
|
||||
def extend_resource_dict_timestamp(self, plugin_obj,
|
||||
resource_res, resource_db):
|
||||
if (resource_db and resource_db.created_at and
|
||||
resource_db.updated_at):
|
||||
self._format_timestamp(resource_db, resource_res)
|
||||
|
||||
def _add_timestamp(self, mapper, _conn, target):
|
||||
if not target.created_at and not target.updated_at:
|
||||
time = timeutils.utcnow()
|
||||
for field in ['created_at', 'updated_at']:
|
||||
setattr(target, field, time)
|
||||
return target
|
||||
_add_timestamp)
|
||||
listen(se.Session, 'before_flush', _update_timestamp)
|
||||
|
|
|
@ -14,10 +14,7 @@
|
|||
|
||||
from neutron_lib.services import base as service_base
|
||||
|
||||
from neutron.db import _model_query as model_query
|
||||
from neutron.db import _resource_extend as resource_extend
|
||||
from neutron.db import models_v2
|
||||
from neutron.db import standard_attr
|
||||
from neutron.objects import base as base_obj
|
||||
from neutron.services.timestamp import timestamp_db as ts_db
|
||||
|
||||
|
@ -31,13 +28,6 @@ class TimeStampPlugin(service_base.ServicePluginBase,
|
|||
def __init__(self):
|
||||
super(TimeStampPlugin, self).__init__()
|
||||
self.register_db_events()
|
||||
rs_model_maps = standard_attr.get_standard_attr_resource_model_map()
|
||||
for rsmap, model in rs_model_maps.items():
|
||||
resource_extend.register_funcs(
|
||||
rsmap, [self.extend_resource_dict_timestamp])
|
||||
model_query.register_hook(
|
||||
model, "change_since_query", None, None,
|
||||
self._change_since_result_filter_hook)
|
||||
# TODO(jlibosva): Move this to register_model_query_hook
|
||||
base_obj.register_filter_hook_on_model(
|
||||
models_v2.SubnetPool, ts_db.CHANGED_SINCE)
|
||||
|
|
|
@ -45,6 +45,7 @@ from neutron.callbacks import manager as registry_manager
|
|||
from neutron.callbacks import registry
|
||||
from neutron.common import config
|
||||
from neutron.common import rpc as n_rpc
|
||||
from neutron.db import _model_query as model_query
|
||||
from neutron.db import agentschedulers_db
|
||||
from neutron.db import api as db_api
|
||||
from neutron import manager
|
||||
|
@ -165,11 +166,17 @@ class DietTestCase(base.BaseTestCase):
|
|||
# six before removing the cleanup callback from here.
|
||||
self.addCleanup(mock.patch.stopall)
|
||||
|
||||
self.addCleanup(self.reset_model_query_hooks)
|
||||
|
||||
self.addOnException(self.check_for_systemexit)
|
||||
self.orig_pid = os.getpid()
|
||||
|
||||
tools.reset_random_seed()
|
||||
|
||||
@staticmethod
|
||||
def reset_model_query_hooks():
|
||||
model_query._model_query_hooks = {}
|
||||
|
||||
def addOnException(self, handler):
|
||||
|
||||
def safe_handler(*args, **kwargs):
|
||||
|
|
|
@ -13,7 +13,6 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import copy
|
||||
import datetime
|
||||
import os
|
||||
import platform
|
||||
|
@ -35,7 +34,6 @@ import unittest2
|
|||
|
||||
from neutron.api.v2 import attributes
|
||||
from neutron.common import constants as n_const
|
||||
from neutron.db import _model_query as model_query
|
||||
from neutron.plugins.common import constants as p_const
|
||||
|
||||
|
||||
|
@ -123,17 +121,6 @@ class SafeCleanupFixture(fixtures.Fixture):
|
|||
self.addCleanup(cleanUp)
|
||||
|
||||
|
||||
class CommonDbMixinHooksFixture(fixtures.Fixture):
|
||||
def _setUp(self):
|
||||
self.original_hooks = model_query._model_query_hooks
|
||||
self.addCleanup(self.restore_hooks)
|
||||
model_query._model_query_hooks = copy.copy(
|
||||
model_query._model_query_hooks)
|
||||
|
||||
def restore_hooks(self):
|
||||
model_query._model_query_hooks = self.original_hooks
|
||||
|
||||
|
||||
def setup_mock_calls(mocked_call, expected_calls_and_values):
|
||||
"""A convenient method to setup a sequence of mock calls.
|
||||
|
||||
|
|
|
@ -14,7 +14,9 @@
|
|||
# under the License.
|
||||
|
||||
import mock
|
||||
from neutron_lib import constants
|
||||
from neutron_lib import context
|
||||
from neutron_lib.plugins import directory
|
||||
|
||||
from neutron.db import common_db_mixin
|
||||
from neutron.db import extraroute_db
|
||||
|
@ -30,9 +32,7 @@ class TestExtraRouteDb(testlib_api.SqlTestCase):
|
|||
def setUp(self):
|
||||
super(TestExtraRouteDb, self).setUp()
|
||||
self._plugin = _Plugin()
|
||||
get_plugin = mock.patch('neutron_lib.plugins.directory.get_plugin',
|
||||
return_value=self._plugin)
|
||||
get_plugin.start()
|
||||
directory.add_plugin(constants.CORE, self._plugin)
|
||||
|
||||
def test_update(self):
|
||||
ctx = context.get_admin_context()
|
||||
|
|
|
@ -21,6 +21,7 @@ from oslo_utils import uuidutils
|
|||
import testtools
|
||||
from webob import exc
|
||||
|
||||
from neutron.db import external_net_db
|
||||
from neutron.db import models_v2
|
||||
from neutron.extensions import external_net as external_net
|
||||
from neutron.tests.unit.api.v2 import test_base
|
||||
|
@ -131,26 +132,25 @@ class ExtNetDBTestCase(test_db_base_plugin_v2.NeutronDbPluginV2TestCase):
|
|||
self.assertFalse(result[0]['shared'])
|
||||
|
||||
def test_network_filter_hook_admin_context(self):
|
||||
plugin = directory.get_plugin()
|
||||
ctx = context.Context(None, None, is_admin=True)
|
||||
model = models_v2.Network
|
||||
conditions = plugin._network_filter_hook(ctx, model, [])
|
||||
conditions = external_net_db._network_filter_hook(ctx, model, [])
|
||||
self.assertEqual([], conditions)
|
||||
|
||||
def test_network_filter_hook_nonadmin_context(self):
|
||||
plugin = directory.get_plugin()
|
||||
ctx = context.Context('edinson', 'cavani')
|
||||
model = models_v2.Network
|
||||
txt = ("networkrbacs.action = :action_1 AND "
|
||||
"networkrbacs.target_tenant = :target_tenant_1 OR "
|
||||
"networkrbacs.target_tenant = :target_tenant_2")
|
||||
conditions = plugin._network_filter_hook(ctx, model, [])
|
||||
conditions = external_net_db._network_filter_hook(ctx, model, [])
|
||||
self.assertEqual(conditions.__str__(), txt)
|
||||
# Try to concatenate conditions
|
||||
txt2 = (txt.replace('tenant_1', 'tenant_3').
|
||||
replace('tenant_2', 'tenant_4').
|
||||
replace('action_1', 'action_2'))
|
||||
conditions = plugin._network_filter_hook(ctx, model, conditions)
|
||||
conditions = external_net_db._network_filter_hook(ctx, model,
|
||||
conditions)
|
||||
self.assertEqual(conditions.__str__(), "%s OR %s" % (txt, txt2))
|
||||
|
||||
def test_create_port_external_network_non_admin_fails(self):
|
||||
|
|
|
@ -1245,7 +1245,6 @@ class BaseDbObjectTestCase(_BaseObjectTestCase,
|
|||
test_db_base_plugin_v2.DbOperationBoundMixin):
|
||||
def setUp(self):
|
||||
super(BaseDbObjectTestCase, self).setUp()
|
||||
self.useFixture(tools.CommonDbMixinHooksFixture())
|
||||
synthetic_fields = self._get_object_synthetic_fields(self._test_class)
|
||||
for synth_field in synthetic_fields:
|
||||
objclass = self._get_ovo_object_class(self._test_class,
|
||||
|
@ -1514,9 +1513,9 @@ class BaseDbObjectTestCase(_BaseObjectTestCase,
|
|||
model_query.register_hook(
|
||||
self._test_class.db_model,
|
||||
'foo_filter',
|
||||
None,
|
||||
None,
|
||||
foo_filter)
|
||||
query_hook=None,
|
||||
filter_hook=None,
|
||||
result_filters=foo_filter)
|
||||
base.register_filter_hook_on_model(self._test_class.db_model, 'foo')
|
||||
|
||||
self._test_class.get_objects(self.context, foo=42)
|
||||
|
|
Loading…
Reference in New Issue