Merge "Finish the new DB engine facade migration"

This commit is contained in:
Zuul 2020-11-27 08:55:18 +00:00 committed by Gerrit Code Review
commit 97f578c3f2
28 changed files with 383 additions and 401 deletions

View File

@ -355,48 +355,6 @@ model, the nullable parameter is by default :code:`True`, while for OVO fields,
the nullable is set to :code:`False`. Make sure you correctly map database
column nullability properties to relevant object fields.
Database session activation
---------------------------
By default, all objects use old ``oslo.db`` engine facade. To enable the new
facade for a particular object, set ``new_facade`` class attribute to ``True``:
.. code-block:: Python
@obj_base.VersionedObjectRegistry.register
class ExampleObject(base.NeutronDbObject):
new_facade = True
It will make all OVO actions - ``get_object``, ``update``, ``count`` etc. - to
use new ``reader.using`` or ``writer.using`` decorators to manage database
transactions.
Whenever you need to open a new subtransaction in scope of OVO code, use the
following database session decorators:
.. code-block:: Python
@obj_base.VersionedObjectRegistry.register
class ExampleObject(base.NeutronDbObject):
@classmethod
def get_object(cls, context, **kwargs):
with cls.db_context_reader(context):
super(ExampleObject, cls).get_object(context, **kwargs)
# fetch more data in the same transaction
def create(self):
with self.db_context_writer(self.obj_context):
super(ExampleObject, self).create()
# apply more changes in the same transaction
``db_context_reader`` and ``db_context_writer`` decorators abstract the choice
of engine facade used for particular object from action implementation.
Alternatively, you can call all OVO actions under an active ``reader.using`` /
``writer.using`` context manager (or ``session.begin``). In this case, OVO will
pick the appropriate method to open a subtransaction.
Synthetic fields
----------------
:code:`synthetic_fields` is a list of fields, that are not directly backed by

View File

@ -31,6 +31,14 @@ def _noop_context_manager():
yield
def context_if_transaction(context, transaction, writer=True):
if transaction:
return (db_api.CONTEXT_WRITER.using(context) if writer else
db_api.CONTEXT_READER.using(context))
else:
return _noop_context_manager()
def safe_creation(context, create_fn, delete_fn, create_bindings,
transaction=True):
'''This function wraps logic of object creation in safe atomic way.
@ -55,12 +63,11 @@ def safe_creation(context, create_fn, delete_fn, create_bindings,
:param transaction: if true the whole operation will be wrapped in a
transaction. if false, no transaction will be used.
'''
cm = (db_api.CONTEXT_WRITER.using(context)
if transaction else _noop_context_manager())
with cm:
with context_if_transaction(context, transaction):
obj = create_fn()
try:
value = create_bindings(obj['id'])
updated_obj, value = create_bindings(obj['id'])
obj = updated_obj or obj
except Exception:
with excutils.save_and_reraise_exception():
try:

View File

@ -192,6 +192,7 @@ class L3_NAT_dbonly_mixin(l3.RouterPluginBase,
'device_owner': [DEVICE_OWNER_FLOATINGIP]}
return {p['id'] for p in self._core_plugin.get_ports(context, filters)}
@db_api.CONTEXT_READER
def _get_router(self, context, router_id):
try:
router = model_query.get_by_id(
@ -227,7 +228,7 @@ class L3_NAT_dbonly_mixin(l3.RouterPluginBase,
router['tenant_id'] = tenant_id
registry.notify(resources.ROUTER, events.BEFORE_CREATE,
self, context=context, router=router)
with context.session.begin(subtransactions=True):
with db_api.CONTEXT_WRITER.using(context):
# pre-generate id so it will be available when
# configuring external gw port
router_db = l3_models.Router(
@ -245,10 +246,15 @@ class L3_NAT_dbonly_mixin(l3.RouterPluginBase,
def _update_gw_for_create_router(self, context, gw_info, router_id):
if gw_info:
router_db = self._get_router(context, router_id)
with db_utils.context_if_transaction(
context, not context.session.is_active, writer=False):
router_db = self._get_router(context, router_id)
self._update_router_gw_info(context, router_id,
gw_info, router=router_db)
return self._get_router(context, router_id), None
return None, None
@db_api.retry_if_session_inactive()
def create_router(self, context, router):
r = router['router']
@ -288,9 +294,6 @@ class L3_NAT_dbonly_mixin(l3.RouterPluginBase,
gw_info = r.pop(EXTERNAL_GW_INFO, constants.ATTR_NOT_SPECIFIED)
original = self.get_router(context, id)
if gw_info != constants.ATTR_NOT_SPECIFIED:
# Update the gateway outside of the DB update since it involves L2
# calls that don't make sense to rollback and may cause deadlocks
# in a transaction.
self._update_router_gw_info(context, id, gw_info)
router_db = self._update_router_db(context, id, r)
updated = self._make_router_dict(router_db)
@ -308,6 +311,13 @@ class L3_NAT_dbonly_mixin(l3.RouterPluginBase,
'device_owner': DEVICE_OWNER_ROUTER_GW,
'admin_state_up': True,
'name': ''}
if context.session.is_active:
# TODO(ralonsoh): ML2 plugin "create_port" should be called outside
# a DB transaction. In this case an exception is made but in order
# to prevent future errors, this call should be moved outside
# the current transaction.
context.GUARD_TRANSACTION = False
gw_port = plugin_utils.create_port(
self._core_plugin, context.elevated(), {'port': port_data})
@ -316,7 +326,8 @@ class L3_NAT_dbonly_mixin(l3.RouterPluginBase,
network_id)
with plugin_utils.delete_port_on_error(
self._core_plugin, context.elevated(), gw_port['id']):
with context.session.begin(subtransactions=True):
with db_api.CONTEXT_WRITER.using(context):
router = self._get_router(context, router['id'])
router.gw_port = self._core_plugin._get_port(
context.elevated(), gw_port['id'])
router_port = l3_obj.RouterPort(
@ -325,7 +336,6 @@ class L3_NAT_dbonly_mixin(l3.RouterPluginBase,
port_id=gw_port['id'],
port_type=DEVICE_OWNER_ROUTER_GW
)
context.session.add(router)
router_port.create()
def _validate_gw_info(self, context, gw_port, info, ext_ips):
@ -371,10 +381,14 @@ class L3_NAT_dbonly_mixin(l3.RouterPluginBase,
gw_ips = [x['ip_address'] for x in router.gw_port['fixed_ips']]
gw_port_id = router.gw_port['id']
self._delete_router_gw_port_db(context, router)
if admin_ctx.session.is_active:
# TODO(ralonsoh): ML2 plugin "delete_port" should be called outside
# a DB transaction. In this case an exception is made but in order
# to prevent future errors, this call should be moved outside
# the current transaction.
admin_ctx.GUARD_TRANSACTION = False
self._core_plugin.delete_port(
admin_ctx, gw_port_id, l3_port_check=False)
with context.session.begin(subtransactions=True):
context.session.refresh(router)
# TODO(boden): normalize metadata
metadata = {'network_id': old_network_id,
'new_network_id': new_network_id,
@ -387,7 +401,7 @@ class L3_NAT_dbonly_mixin(l3.RouterPluginBase,
resource_id=router_id))
def _delete_router_gw_port_db(self, context, router):
with context.session.begin(subtransactions=True):
with db_api.CONTEXT_WRITER.using(context):
router.gw_port = None
if router not in context.session:
context.session.add(router)
@ -405,10 +419,12 @@ class L3_NAT_dbonly_mixin(l3.RouterPluginBase,
def _create_gw_port(self, context, router_id, router, new_network_id,
ext_ips):
new_valid_gw_port_attachment = (
new_network_id and
(not router.gw_port or
router.gw_port['network_id'] != new_network_id))
with db_api.CONTEXT_READER.using(context):
router = self._get_router(context, router_id)
new_valid_gw_port_attachment = (
new_network_id and
(not router.gw_port or
router.gw_port['network_id'] != new_network_id))
if new_valid_gw_port_attachment:
subnets = self._core_plugin.get_subnets_by_network(context,
new_network_id)
@ -432,7 +448,9 @@ class L3_NAT_dbonly_mixin(l3.RouterPluginBase,
self._create_router_gw_port(context, router,
new_network_id, ext_ips)
gw_ips = [x['ip_address'] for x in router.gw_port['fixed_ips']]
with db_api.CONTEXT_READER.using(context):
router = self._get_router(context, router_id)
gw_ips = [x['ip_address'] for x in router.gw_port['fixed_ips']]
registry.publish(resources.ROUTER_GATEWAY,
events.AFTER_CREATE,
@ -446,12 +464,8 @@ class L3_NAT_dbonly_mixin(l3.RouterPluginBase,
def _update_current_gw_port(self, context, router_id, router, ext_ips):
self._core_plugin.update_port(context.elevated(), router.gw_port['id'],
{'port': {'fixed_ips': ext_ips}})
context.session.expire(router.gw_port)
def _update_router_gw_info(self, context, router_id, info, router=None):
# TODO(salvatore-orlando): guarantee atomic behavior also across
# operations that span beyond the model classes handled by this
# class (e.g.: delete_port)
router = router or self._get_router(context, router_id)
gw_port = router.gw_port
ext_ips = info.get('external_fixed_ips') if info else []
@ -504,27 +518,30 @@ class L3_NAT_dbonly_mixin(l3.RouterPluginBase,
payload=events.DBEventPayload(
context, resource_id=id))
# TODO(nati) Refactor here when we have router insertion model
router = self._ensure_router_not_in_use(context, id)
original = self._make_router_dict(router)
self._delete_current_gw_port(context, id, router, None)
with context.session.begin(subtransactions=True):
context.session.refresh(router)
with db_api.CONTEXT_WRITER.using(context):
router = self._ensure_router_not_in_use(context, id)
original = self._make_router_dict(router)
self._delete_current_gw_port(context, id, router, None)
router_ports = router.attached_ports
for rp in router_ports:
self._core_plugin.delete_port(context.elevated(),
rp.port.id,
l3_port_check=False)
with context.session.begin(subtransactions=True):
context.session.refresh(router)
# TODO(ralonsoh): move this section (port deletion) out of the DB
# transaction.
router_ports_ids = (rp.port.id for rp in router.attached_ports)
if context.session.is_active:
context.GUARD_TRANSACTION = False
for rp_id in router_ports_ids:
self._core_plugin.delete_port(context.elevated(), rp_id,
l3_port_check=False)
router = self._get_router(context, id)
registry.notify(resources.ROUTER, events.PRECOMMIT_DELETE,
self, context=context, router_db=router,
router_id=id)
# we bump the revision even though we are about to delete to throw
# staledataerror if something snuck in with a new interface
# staledataerror if something stuck in with a new interface
router.bump_revision()
context.session.flush()
context.session.delete(router)
registry.notify(resources.ROUTER, events.AFTER_DELETE, self,
context=context, router_id=id, original=original)
@ -667,7 +684,7 @@ class L3_NAT_dbonly_mixin(l3.RouterPluginBase,
raise n_exc.BadRequest(resource='router', msg=msg)
def _validate_router_port_info(self, context, router, port_id):
with db_api.autonested_transaction(context.session):
with db_api.CONTEXT_READER.using(context):
# check again within transaction to mitigate race
port = self._check_router_port(context, port_id, router.id)
@ -873,8 +890,6 @@ class L3_NAT_dbonly_mixin(l3.RouterPluginBase,
new_interface=new_router_intf,
interface_info=interface_info)
with context.session.begin(subtransactions=True):
context.session.refresh(router)
return self._make_router_interface_info(
router.id, port['tenant_id'], port['id'], port['network_id'],
subnets[-1]['id'], [subnet['id'] for subnet in subnets])
@ -1018,8 +1033,6 @@ class L3_NAT_dbonly_mixin(l3.RouterPluginBase,
port=port,
router_id=router_id,
interface_info=interface_info)
with context.session.begin(subtransactions=True):
context.session.refresh(router)
return self._make_router_interface_info(router_id, port['tenant_id'],
port['id'], port['network_id'],
subnets[0]['id'],
@ -1321,7 +1334,7 @@ class L3_NAT_dbonly_mixin(l3.RouterPluginBase,
with plugin_utils.delete_port_on_error(
self._core_plugin, context.elevated(),
external_port['id']),\
context.session.begin(subtransactions=True):
db_api.CONTEXT_WRITER.using(context):
# Ensure IPv4 addresses are allocated on external port
external_ipv4_ips = self._port_ipv4_fixed_ips(external_port)
if not external_ipv4_ips:
@ -1396,7 +1409,7 @@ class L3_NAT_dbonly_mixin(l3.RouterPluginBase,
raise e.errors[0].error
fip = floatingip['floatingip']
with context.session.begin(subtransactions=True):
with db_api.CONTEXT_WRITER.using(context):
floatingip_obj = self._get_floatingip(context, id)
old_floatingip = self._make_floatingip_dict(floatingip_obj)
old_fixed_port_id = floatingip_obj.fixed_port_id
@ -1591,7 +1604,7 @@ class L3_NAT_dbonly_mixin(l3.RouterPluginBase,
This parameter is ignored.
@return: set of router-ids that require notification updates
"""
with context.session.begin(subtransactions=True):
with db_api.CONTEXT_WRITER.using(context):
floating_ip_objs = l3_obj.FloatingIP.get_objects(
context, fixed_port_id=port_id)
router_ids = {fip.router_id for fip in floating_ip_objs}
@ -1831,7 +1844,7 @@ class L3_NAT_dbonly_mixin(l3.RouterPluginBase,
def _get_router_info_list(self, context, router_ids=None, active=None,
device_owners=None):
"""Query routers and their related floating_ips, interfaces."""
with context.session.begin(subtransactions=True):
with db_api.CONTEXT_WRITER.using(context):
routers = self._get_sync_routers(context,
router_ids=router_ids,
active=active)

View File

@ -18,6 +18,7 @@
from neutron_lib.api.definitions import l3 as l3_apidef
from neutron_lib.api.definitions import qos_gateway_ip
from neutron_lib.api import extensions
from neutron_lib.db import api as db_api
from neutron_lib.db import resource_extend
from neutron_lib.services.qos import constants as qos_consts
from oslo_log import log as logging
@ -65,9 +66,10 @@ class L3_gw_ip_qos_dbonly_mixin(l3_gwmode_db.L3_NAT_dbonly_mixin):
self)._update_router_gw_info(
context, router_id, info, router)
if self._is_gw_ip_qos_supported and router.gw_port:
self._update_router_gw_qos_policy(context, router_id,
info, router)
with db_api.CONTEXT_WRITER.using(context):
if self._is_gw_ip_qos_supported and router.gw_port:
self._update_router_gw_qos_policy(context, router_id,
info, router)
return router
@ -94,9 +96,6 @@ class L3_gw_ip_qos_dbonly_mixin(l3_gwmode_db.L3_NAT_dbonly_mixin):
router_id,
old_qos_policy_id)
with context.session.begin(subtransactions=True):
context.session.refresh(router)
if new_qos_policy_id:
self._create_gw_ip_qos_db(
context, router_id, new_qos_policy_id)

View File

@ -17,6 +17,7 @@ from neutron_lib.api.definitions import l3 as l3_apidef
from neutron_lib.callbacks import events
from neutron_lib.callbacks import registry
from neutron_lib.callbacks import resources
from neutron_lib.db import api as db_api
from neutron_lib.db import resource_extend
from oslo_config import cfg
import sqlalchemy as sa
@ -56,10 +57,9 @@ class L3_NAT_dbonly_mixin(l3_db.L3_NAT_dbonly_mixin):
})
def _update_router_gw_info(self, context, router_id, info, router=None):
# Load the router only if necessary
if not router:
with db_api.CONTEXT_WRITER.using(context):
# Always load the router inside the DB context.
router = self._get_router(context, router_id)
with context.session.begin(subtransactions=True):
old_router = self._make_router_dict(router)
router.enable_snat = self._get_enable_snat(info)
router_body = {l3_apidef.ROUTER:
@ -75,7 +75,7 @@ class L3_NAT_dbonly_mixin(l3_db.L3_NAT_dbonly_mixin):
context, router_id, info, router=router)
# Returning the router might come back useful if this
# method is overridden in child classes
return router
return self._get_router(context, router_id)
@staticmethod
def _get_enable_snat(info):

View File

@ -143,7 +143,10 @@ class L3_HA_NAT_db_mixin(l3_dvr_db.L3_NAT_with_dvr_db_mixin,
context, network_id=network_id, vr_id=vr_id)
allocation.create()
router_db.extra_attributes.ha_vr_id = allocation.vr_id
router_db.extra_attributes.update(
{'ha_vr_id': allocation.vr_id})
context.session.add(router_db.extra_attributes)
LOG.debug(
"Router %(router_id)s has been allocated a ha_vr_id "
"%(ha_vr_id)d.",
@ -200,7 +203,7 @@ class L3_HA_NAT_db_mixin(l3_dvr_db.L3_NAT_with_dvr_db_mixin,
# and the process is started over where the existing
# network will be selected.
raise db_exc.DBDuplicateEntry(columns=['tenant_id'])
return ha_network
return None, ha_network
def _add_ha_network_settings(self, network):
if cfg.CONF.l3_ha_network_type:
@ -267,7 +270,7 @@ class L3_HA_NAT_db_mixin(l3_dvr_db.L3_NAT_with_dvr_db_mixin,
context, port_id=port_id, router_id=router_id)
portbinding.create()
return portbinding
return None, portbinding
except db_exc.DBReferenceError as e:
with excutils.save_and_reraise_exception() as ctxt:
if isinstance(e.inner_exception, sql_exc.IntegrityError):

View File

@ -104,7 +104,7 @@ def create_initial_revision(context, resource_uuid, resource_type,
LOG.debug('create_initial_revision uuid=%s, type=%s, rev=%s',
resource_uuid, resource_type, revision_number)
db_func = context.session.merge if may_exist else context.session.add
with context.session.begin(subtransactions=True):
with db_api.CONTEXT_WRITER.using(context):
std_attr_id = std_attr_id or _get_standard_attr_id(
context, resource_uuid, resource_type)
row = ovn_models.OVNRevisionNumbers(
@ -116,7 +116,7 @@ def create_initial_revision(context, resource_uuid, resource_type,
@db_api.retry_if_session_inactive()
def delete_revision(context, resource_uuid, resource_type):
LOG.debug('delete_revision(%s)', resource_uuid)
with context.session.begin(subtransactions=True):
with db_api.CONTEXT_WRITER.using(context):
row = context.session.query(ovn_models.OVNRevisionNumbers).filter_by(
resource_uuid=resource_uuid,
resource_type=resource_type).one_or_none()
@ -136,7 +136,7 @@ def _ensure_revision_row_exist(context, resource, resource_type, std_attr_id):
# deal with objects that already existed before the sync work. I believe
# that we can remove this method after few development cycles. Or,
# if we decide to make a migration script as well.
with context.session.begin(subtransactions=True):
with db_api.CONTEXT_READER.using(context):
if not context.session.query(ovn_models.OVNRevisionNumbers).filter_by(
resource_uuid=resource['id'],
resource_type=resource_type).one_or_none():
@ -152,7 +152,7 @@ def _ensure_revision_row_exist(context, resource, resource_type, std_attr_id):
@db_api.retry_if_session_inactive()
def get_revision_row(context, resource_uuid):
try:
with context.session.begin(subtransactions=True):
with db_api.CONTEXT_READER.using(context):
return context.session.query(
ovn_models.OVNRevisionNumbers).filter_by(
resource_uuid=resource_uuid).one()
@ -163,7 +163,7 @@ def get_revision_row(context, resource_uuid):
@db_api.retry_if_session_inactive()
def bump_revision(context, resource, resource_type):
revision_number = ovn_utils.get_revision_number(resource, resource_type)
with context.session.begin(subtransactions=True):
with db_api.CONTEXT_WRITER.using(context):
# NOTE(ralonsoh): "resource" could be a dict or an OVO.
try:
std_attr_id = resource.db_obj.standard_attr.id
@ -203,7 +203,7 @@ def get_inconsistent_resources(context):
whens=MAINTENANCE_CREATE_UPDATE_TYPE_ORDER)
time_ = (timeutils.utcnow() -
datetime.timedelta(seconds=INCONSISTENCIES_OLDER_THAN))
with context.session.begin(subtransactions=True):
with db_api.CONTEXT_READER.using(context):
query = context.session.query(ovn_models.OVNRevisionNumbers).join(
standard_attr.StandardAttribute,
ovn_models.OVNRevisionNumbers.standard_attr_id ==
@ -232,6 +232,6 @@ def get_deleted_resources(context):
"""
sort_order = sa.case(value=ovn_models.OVNRevisionNumbers.resource_type,
whens=MAINTENANCE_DELETE_TYPE_ORDER)
with context.session.begin(subtransactions=True):
with db_api.CONTEXT_READER.using(context):
return context.session.query(ovn_models.OVNRevisionNumbers).filter_by(
standard_attr_id=None).order_by(sort_order).all()

View File

@ -109,18 +109,18 @@ class SecurityGroupDbMixin(ext_sg.SecurityGroupPluginBase,
return self.get_security_group(context, existing_def_sg_id)
with db_api.CONTEXT_WRITER.using(context):
sg = sg_obj.SecurityGroup(
context, id=s.get('id') or uuidutils.generate_uuid(),
description=s['description'], project_id=tenant_id,
name=s['name'], is_default=default_sg, stateful=stateful)
sg.create()
delta = len(ext_sg.sg_supported_ethertypes)
delta = delta * 2 if default_sg else delta
reservation = quota.QUOTAS.make_reservation(
context, tenant_id, {'security_group_rule': delta},
self)
sg = sg_obj.SecurityGroup(
context, id=s.get('id') or uuidutils.generate_uuid(),
description=s['description'], project_id=tenant_id,
name=s['name'], is_default=default_sg, stateful=stateful)
sg.create()
for ethertype in ext_sg.sg_supported_ethertypes:
if default_sg:
# Allow intercommunication
@ -739,6 +739,17 @@ class SecurityGroupDbMixin(ext_sg.SecurityGroupPluginBase,
raise ext_sg.SecurityGroupRuleParameterConflict(
ethertype=rule['ethertype'], cidr=input_prefix)
@db_api.retry_if_session_inactive()
def get_security_group_rules_count(self, context, filters=None):
filters = filters if filters else {}
if not filters and context.project_id and not context.is_admin:
rule_ids = sg_obj.SecurityGroupRule.get_security_group_rule_ids(
context.project_id)
filters = {'id': rule_ids}
return sg_obj.SecurityGroupRule.count(context_lib.get_admin_context(),
**filters)
@db_api.retry_if_session_inactive()
def get_security_group_rules(self, context, filters=None, fields=None,
sorts=None, limit=None, marker=None,

View File

@ -405,9 +405,6 @@ class NeutronDbObject(NeutronObject, metaclass=DeclarativeObject):
# should be overridden for all rbac aware objects
rbac_db_cls = None
# whether to use new engine facade for the object
new_facade = False
primary_keys = ['id']
# 'unique_keys' is a list of unique keys that can be used with get_object
@ -571,16 +568,12 @@ class NeutronDbObject(NeutronObject, metaclass=DeclarativeObject):
@classmethod
def db_context_writer(cls, context):
"""Return read-write session activation decorator."""
if cls.new_facade or cls._use_db_facade(context):
return db_api.CONTEXT_WRITER.using(context)
return db_api.autonested_transaction(context.session)
return db_api.CONTEXT_WRITER.using(context)
@classmethod
def db_context_reader(cls, context):
"""Return read-only session activation decorator."""
if cls.new_facade or cls._use_db_facade(context):
return db_api.CONTEXT_READER.using(context)
return db_api.autonested_transaction(context.session)
return db_api.CONTEXT_READER.using(context)
@classmethod
def get_object(cls, context, fields=None, **kwargs):

View File

@ -37,7 +37,7 @@ class L3HARouterAgentPortBinding(base.NeutronDbObject):
}
primary_keys = ['port_id']
fields_no_update = ['router_id', 'port_id', 'l3_agent_id']
fields_no_update = ['router_id', 'port_id']
@classmethod
def get_l3ha_filter_host_router(cls, context, router_ids, host):

View File

@ -363,7 +363,6 @@ class DvrFipGatewayPortAgentBinding(base.NeutronDbObject):
VERSION = '1.0'
db_model = dvr_models.DvrFipGatewayPortAgentBinding
new_facade = True
primary_keys = ['network_id', 'agent_id']

View File

@ -24,8 +24,6 @@ class StandardAttribute(base.NeutronDbObject):
# Version 1.0: Initial version
VERSION = '1.0'
new_facade = True
db_model = standard_attr.StandardAttribute
fields = {

View File

@ -69,8 +69,6 @@ class Route(base.NeutronDbObject):
# Version 1.0: Initial version
VERSION = '1.0'
new_facade = True
db_model = models_v2.SubnetRoute
primary_keys = ['destination', 'nexthop', 'subnet_id']
@ -196,7 +194,6 @@ class Subnet(base.NeutronDbObject):
VERSION = '1.1'
db_model = models_v2.Subnet
new_facade = True
fields = {
'id': common_types.UUIDField(),
@ -500,7 +497,6 @@ class NetworkSubnetLock(base.NeutronDbObject):
VERSION = '1.0'
db_model = models_v2.NetworkSubnetLock
new_facade = True
primary_keys = ['network_id']
fields = {

View File

@ -30,7 +30,6 @@ class SubPort(base.NeutronDbObject):
VERSION = '1.0'
db_model = models.SubPort
new_facade = True
primary_keys = ['port_id']
foreign_keys = {'Trunk': {'trunk_id': 'id'}}
@ -89,7 +88,6 @@ class Trunk(base.NeutronDbObject):
VERSION = '1.1'
db_model = models.Trunk
new_facade = True
fields = {
'admin_state_up': obj_fields.BooleanField(),

View File

@ -30,6 +30,7 @@ from oslo_log import log as logging
from neutron.common import utils
from neutron.conf.db import l3_hamode_db
from neutron.db.models import l3agent as rb_model
from neutron.objects import l3_hamode as l3_hamode_obj
from neutron.objects import l3agent as rb_obj
@ -284,10 +285,12 @@ class L3Scheduler(object, metaclass=abc.ABCMeta):
port_binding = utils.create_object_with_dependency(
creator, dep_getter, dep_creator,
dep_id_attr, dep_deleter)[0]
# NOTE(ralonsoh): to be migrated to the new facade that can't be
# used with "create_object_with_dependency".
with lib_db_api.autonested_transaction(context.session):
with lib_db_api.CONTEXT_WRITER.using(context):
port_binding = (
l3_hamode_obj.L3HARouterAgentPortBinding.get_object(
context, port_id=port_binding['port_id']))
port_binding.l3_agent_id = agent['id']
port_binding.update()
except db_exc.DBDuplicateEntry:
LOG.debug("Router %(router)s already scheduled for agent "
"%(agent)s", {'router': router_id,

View File

@ -382,13 +382,15 @@ class PortForwardingPlugin(fip_pf.PortForwardingPluginBase):
raise lib_exc.BadRequest(resource=apidef.RESOURCE_NAME,
msg=message)
if self._rpc_notifications_required:
self.push_api.push(context, [pf_obj], rpc_events.CREATED)
registry.notify(pf_consts.PORT_FORWARDING, events.AFTER_CREATE,
self,
payload=[callbacks.PortForwardingPayload(context,
current_pf=pf_obj)])
return pf_obj
registry.notify(pf_consts.PORT_FORWARDING, events.AFTER_CREATE,
self,
payload=[callbacks.PortForwardingPayload(context,
current_pf=pf_obj)])
if self._rpc_notifications_required:
self.push_api.push(context, [pf_obj], rpc_events.CREATED)
return pf_obj
@db_base_plugin_common.convert_result_to_dict
def update_floatingip_port_forwarding(self, context, id, floatingip_id,

View File

@ -546,7 +546,7 @@ class QoSPlugin(qos.QoSPluginBase):
return rule
def _get_policy_id(self, context, rule_cls, rule_id):
with db_api.autonested_transaction(context.session):
with db_api.CONTEXT_READER.using(context):
rule_object = rule_cls.get_object(context, id=rule_id)
if not rule_object:
raise qos_exc.QosRuleNotFound(policy_id="", rule_id=rule_id)

View File

@ -226,6 +226,9 @@ class TestDhcpAgentHARaceCondition(BaseDhcpAgentTest):
self._simulate_concurrent_requests_process_and_raise(funcs, args)
def test_dhcp_agent_ha_with_race_condition(self):
# NOTE(ralonsoh): the concurrent creation in the same thread could
# fail because the context and the session is the same for all DB
# calls.
network_dhcp_agents = self.client.list_dhcp_agent_hosting_networks(
self.network['id'])['agents']
self.assertEqual(1, len(network_dhcp_agents))

View File

@ -14,7 +14,6 @@ from unittest import mock
from neutron_lib.api.definitions import fip_pf_description as ext_apidef
from neutron_lib.api.definitions import floating_ip_port_forwarding as apidef
from neutron_lib.callbacks import exceptions as c_exc
from neutron_lib import exceptions as lib_exc
from neutron_lib.exceptions import l3 as lib_l3_exc
from neutron_lib.plugins import constants as plugin_constants
@ -387,70 +386,6 @@ class PortForwardingTestCase(PortForwardingTestCaseBase):
self.pf_plugin.delete_floatingip_port_forwarding,
self.context, res['id'], uuidutils.generate_uuid())
def test_concurrent_create_port_forwarding_delete_fip(self):
func1 = self.pf_plugin.create_floatingip_port_forwarding
func2 = self._delete_floatingip
funcs = [func1, func2]
args_list = [(self.context, self.fip['id'], self.port_forwarding),
(self.fip['id'],)]
self.assertRaises(c_exc.CallbackFailure,
self._simulate_concurrent_requests_process_and_raise,
funcs, args_list)
port_forwardings = self.pf_plugin.get_floatingip_port_forwardings(
self.context, floatingip_id=self.fip['id'], fields=['id'])
self.pf_plugin.delete_floatingip_port_forwarding(
self.context, port_forwardings[0][apidef.ID],
floatingip_id=self.fip['id'])
funcs.reverse()
args_list.reverse()
self.assertRaises(lib_l3_exc.FloatingIPNotFound,
self._simulate_concurrent_requests_process_and_raise,
funcs, args_list)
def test_concurrent_create_port_forwarding_update_fip(self):
newport = self._create_port(self.fmt, self.net['id']).json['port']
func1 = self.pf_plugin.create_floatingip_port_forwarding
func2 = self._update_floatingip
funcs = [func1, func2]
args_list = [(self.context, self.fip['id'], self.port_forwarding),
(self.fip['id'], {'port_id': newport['id']})]
self.assertRaises(c_exc.CallbackFailure,
self._simulate_concurrent_requests_process_and_raise,
funcs, args_list)
funcs.reverse()
args_list.reverse()
self.assertRaises(c_exc.CallbackFailure,
self._simulate_concurrent_requests_process_and_raise,
funcs, args_list)
def test_concurrent_create_port_forwarding_update_port(self):
new_ip = self._find_ip_address(
self.subnet,
exclude=self._get_network_port_ips(),
is_random=True)
funcs = [self.pf_plugin.create_floatingip_port_forwarding,
self._update_port]
args_list = [(self.context, self.fip['id'], self.port_forwarding),
(self.port['id'], {
'fixed_ips': [{'subnet_id': self.subnet['id'],
'ip_address': new_ip}]})]
self._simulate_concurrent_requests_process_and_raise(funcs, args_list)
self.assertEqual([], self.pf_plugin.get_floatingip_port_forwardings(
self.context, floatingip_id=self.fip['id']))
def test_concurrent_create_port_forwarding_delete_port(self):
funcs = [self.pf_plugin.create_floatingip_port_forwarding,
self._delete_port]
args_list = [(self.context, self.fip['id'], self.port_forwarding),
(self.port['id'],)]
self._simulate_concurrent_requests_process_and_raise(funcs, args_list)
self.assertEqual([], self.pf_plugin.get_floatingip_port_forwardings(
self.context, floatingip_id=self.fip['id']))
def test_create_floatingip_port_forwarding_port_in_use(self):
res = self.pf_plugin.create_floatingip_port_forwarding(
self.context, self.fip['id'], self.port_forwarding)

View File

@ -21,6 +21,7 @@ from neutron_lib.callbacks import registry
from neutron_lib.callbacks import resources
from neutron_lib import constants as n_const
from neutron_lib import context
from neutron_lib.db import api as db_api
from neutron_lib import exceptions as n_exc
from neutron_lib.plugins import constants as plugin_constants
from neutron_lib.plugins import directory
@ -39,9 +40,11 @@ from neutron.tests import base
from neutron.tests.unit.db import test_db_base_plugin_v2
class TestL3_NAT_dbonly_mixin(base.BaseTestCase):
def setUp(self):
super(TestL3_NAT_dbonly_mixin, self).setUp()
class TestL3_NAT_dbonly_mixin(
test_db_base_plugin_v2.NeutronDbPluginV2TestCase):
def setUp(self, *args, **kwargs):
super(TestL3_NAT_dbonly_mixin, self).setUp(*args, **kwargs)
self.db = l3_db.L3_NAT_dbonly_mixin()
def test__each_port_having_fixed_ips_none(self):
@ -289,6 +292,9 @@ class TestL3_NAT_dbonly_mixin(base.BaseTestCase):
**kwargs)
def test__create_gw_port(self):
# NOTE(slaweq): this test is probably wrong
# returing dict as gw_port breaks test later in L334 in
# neutron.db.l3_db file
router_id = '2afb8434-7380-43a2-913f-ba3a5ad5f349'
router = l3_models.Router(id=router_id)
new_network_id = 'net-id'
@ -298,37 +304,42 @@ class TestL3_NAT_dbonly_mixin(base.BaseTestCase):
'id': '8742d007-6f05-4b7e-abdb-11818f608959'}
ctx = context.get_admin_context()
with mock.patch.object(directory, 'get_plugin') as get_p, \
mock.patch.object(get_p(), 'get_subnets_by_network',
return_value=mock.ANY), \
mock.patch.object(get_p(), '_get_port',
return_value=gw_port), \
mock.patch.object(l3_db.L3_NAT_dbonly_mixin,
'_check_for_dup_router_subnets') as cfdrs,\
mock.patch.object(plugin_utils, 'create_port',
return_value=gw_port), \
mock.patch.object(ctx.session, 'add'), \
mock.patch.object(base_obj.NeutronDbObject, 'create'), \
mock.patch.object(l3_db.registry, 'publish') as mock_notify:
with db_api.CONTEXT_WRITER.using(ctx):
with mock.patch.object(directory, 'get_plugin') as get_p, \
mock.patch.object(get_p(), 'get_subnets_by_network',
return_value=mock.ANY), \
mock.patch.object(get_p(), '_get_port',
return_value=gw_port), \
mock.patch.object(l3_db.L3_NAT_dbonly_mixin,
'_check_for_dup_router_subnets') as \
cfdrs, \
mock.patch.object(plugin_utils, 'create_port',
return_value=gw_port), \
mock.patch.object(ctx.session, 'add'), \
mock.patch.object(base_obj.NeutronDbObject, 'create'), \
mock.patch.object(l3_db.registry, 'publish') as \
mock_notify, \
mock.patch.object(l3_db.L3_NAT_dbonly_mixin, '_get_router',
return_value=router):
self.db._create_gw_port(ctx, router_id=router_id,
router=router,
new_network_id=new_network_id,
ext_ips=ext_ips)
self.db._create_gw_port(ctx, router_id=router_id,
router=router,
new_network_id=new_network_id,
ext_ips=ext_ips)
expected_gw_ips = ['1.1.1.1']
expected_gw_ips = ['1.1.1.1']
self.assertTrue(cfdrs.called)
mock_notify.assert_called_with(
resources.ROUTER_GATEWAY, events.AFTER_CREATE,
self.db._create_gw_port, payload=mock.ANY)
cb_payload = mock_notify.mock_calls[1][2]['payload']
self.assertEqual(ctx, cb_payload.context)
self.assertEqual(expected_gw_ips,
cb_payload.metadata.get('gateway_ips'))
self.assertEqual(new_network_id,
cb_payload.metadata.get('network_id'))
self.assertEqual(router_id, cb_payload.resource_id)
self.assertTrue(cfdrs.called)
mock_notify.assert_called_with(
resources.ROUTER_GATEWAY, events.AFTER_CREATE,
self.db._create_gw_port, payload=mock.ANY)
cb_payload = mock_notify.mock_calls[1][2]['payload']
self.assertEqual(ctx, cb_payload.context)
self.assertEqual(expected_gw_ips,
cb_payload.metadata.get('gateway_ips'))
self.assertEqual(new_network_id,
cb_payload.metadata.get('network_id'))
self.assertEqual(router_id, cb_payload.resource_id)
class L3_NAT_db_mixin(base.BaseTestCase):
@ -428,20 +439,20 @@ class L3TestCase(test_db_base_plugin_v2.NeutronDbPluginV2TestCase):
l3_obj.Router.get_object(self.ctx, id=self.router['id']).delete()
def create_router(self, router):
with self.ctx.session.begin(subtransactions=True):
with db_api.CONTEXT_WRITER.using(self.ctx):
return self.mixin.create_router(self.ctx, router)
def create_port(self, net_id, port_info):
with self.ctx.session.begin(subtransactions=True):
with db_api.CONTEXT_WRITER.using(self.ctx):
return self._make_port(self.fmt, net_id, **port_info)
def create_network(self, name=None, **kwargs):
name = name or 'network1'
with self.ctx.session.begin(subtransactions=True):
with db_api.CONTEXT_WRITER.using(self.ctx):
return self._make_network(self.fmt, name, True, **kwargs)
def create_subnet(self, network, gateway, cidr, **kwargs):
with self.ctx.session.begin(subtransactions=True):
with db_api.CONTEXT_WRITER.using(self.ctx):
return self._make_subnet(self.fmt, network, gateway, cidr,
**kwargs)

View File

@ -21,6 +21,7 @@ from neutron_lib.callbacks import registry
from neutron_lib.callbacks import resources
from neutron_lib import constants as const
from neutron_lib import context
from neutron_lib.db import api as db_api
from neutron_lib import exceptions
from neutron_lib.exceptions import l3 as l3_exc
from neutron_lib.objects import exceptions as o_exc
@ -60,11 +61,11 @@ class L3DvrTestCase(test_db_base_plugin_v2.NeutronDbPluginV2TestCase):
directory.add_plugin(plugin_constants.L3, self.mixin)
def _create_router(self, router):
with self.ctx.session.begin(subtransactions=True):
with db_api.CONTEXT_WRITER.using(self.ctx):
return self.mixin._create_router_db(self.ctx, router, 'foo_tenant')
def create_port(self, net_id, port_info):
with self.ctx.session.begin(subtransactions=True):
with db_api.CONTEXT_WRITER.using(self.ctx):
return self._create_port(self.fmt, net_id, **port_info)
def _test__create_router_db(self, expected=False, distributed=None):
@ -463,20 +464,20 @@ class L3DvrTestCase(test_db_base_plugin_v2.NeutronDbPluginV2TestCase):
'admin_state_up': True,
'distributed': True
}
router = self._create_router(router_db)
if gw_port:
with self.subnet(cidr='10.10.10.0/24') as subnet:
port_dict = {
'device_id': router.id,
'device_owner': const.DEVICE_OWNER_ROUTER_GW,
'admin_state_up': True,
'fixed_ips': [{'subnet_id': subnet['subnet']['id'],
'ip_address': '10.10.10.100'}]
}
net_id = subnet['subnet']['network_id']
port_res = self.create_port(net_id, port_dict)
port_res_dict = self.deserialize(self.fmt, port_res)
with self.ctx.session.begin(subtransactions=True):
with db_api.CONTEXT_WRITER.using(self.ctx):
router = self._create_router(router_db)
if gw_port:
with self.subnet(cidr='10.10.10.0/24') as subnet:
port_dict = {
'device_id': router.id,
'device_owner': const.DEVICE_OWNER_ROUTER_GW,
'admin_state_up': True,
'fixed_ips': [{'subnet_id': subnet['subnet']['id'],
'ip_address': '10.10.10.100'}]
}
net_id = subnet['subnet']['network_id']
port_res = self.create_port(net_id, port_dict)
port_res_dict = self.deserialize(self.fmt, port_res)
port_db = self.ctx.session.query(models_v2.Port).filter_by(
id=port_res_dict['port']['id']).one()
router.gw_port = port_db
@ -487,9 +488,8 @@ class L3DvrTestCase(test_db_base_plugin_v2.NeutronDbPluginV2TestCase):
)
self.ctx.session.add(router)
self.ctx.session.add(router_port)
else:
net_id = None
else:
net_id = None
plugin = mock.Mock()
directory.add_plugin(plugin_constants.CORE, plugin)
@ -1132,6 +1132,10 @@ class L3DvrTestCase(test_db_base_plugin_v2.NeutronDbPluginV2TestCase):
mock_notify.assert_called_once_with(
'router', 'before_update', self.mixin, **kwargs)
def _assert_mock_called_with_router(self, mock_fn, router_id):
router = mock_fn.call_args[1].get('router_db')
self.assertEqual(router_id, router.id)
def test__validate_router_migration_notify_advanced_services_mocked(self):
# call test with admin_state_down_before_update ENABLED
self._test__validate_router_migration_notify_advanced_services()
@ -1152,9 +1156,16 @@ class L3DvrTestCase(test_db_base_plugin_v2.NeutronDbPluginV2TestCase):
interface_info = {'subnet_id': sub['subnet']['id']}
self.mixin.add_router_interface(self.ctx, router_db.id,
interface_info)
mock_notify.assert_called_once_with(self.ctx, router_db=router_db,
# NOTE(slaweq): here we are just checking if mock_notify was called
# with kwargs which we are expecting, but we can't check exactly if
# router_db was object which we are expecting and because of that
# below we are checking if router_db used as argument in
# mock_notify call is the has same id as the one which we are
# expecting
mock_notify.assert_called_once_with(self.ctx, router_db=mock.ANY,
port=mock.ANY,
interface_info=interface_info)
self._assert_mock_called_with_router(mock_notify, router_db.id)
def test_validate_add_router_interface_by_port_notify_advanced_services(
self):
@ -1169,9 +1180,16 @@ class L3DvrTestCase(test_db_base_plugin_v2.NeutronDbPluginV2TestCase):
interface_info = {'port_id': port['port']['id']}
self.mixin.add_router_interface(self.ctx, router_db.id,
interface_info)
mock_notify.assert_called_once_with(self.ctx, router_db=router_db,
# NOTE(slaweq): here we are just checking if mock_notify was called
# with kwargs which we are expecting, but we can't check exactly if
# router_db was object which we are expecting and because of that
# below we are checking if router_db used as argument in
# mock_notify call is the has same id as the one which we are
# expecting.
mock_notify.assert_called_once_with(self.ctx, router_db=mock.ANY,
port=mock.ANY,
interface_info=interface_info)
self._assert_mock_called_with_router(mock_notify, router_db.id)
def test_add_router_interface_csnat_ports_failure(self):
router_dict = {'name': 'test_router', 'admin_state_up': True,

View File

@ -54,57 +54,72 @@ class TestRevisionNumber(test_db_base_plugin_v2.NeutronDbPluginV2TestCase):
def _create_initial_revision(self, resource_uuid, resource_type,
revision_number=ovn_rn_db.INITIAL_REV_NUM,
may_exist=False):
with self.ctx.session.begin(subtransactions=True):
ovn_rn_db.create_initial_revision(
self.ctx, resource_uuid, resource_type,
revision_number=revision_number, may_exist=may_exist)
ovn_rn_db.create_initial_revision(
self.ctx, resource_uuid, resource_type,
revision_number=revision_number, may_exist=may_exist)
def test_bump_revision(self):
self._create_initial_revision(self.net['id'], ovn_rn_db.TYPE_NETWORKS)
self.net['revision_number'] = 123
ovn_rn_db.bump_revision(self.ctx, self.net,
ovn_rn_db.TYPE_NETWORKS)
row = ovn_rn_db.get_revision_row(self.ctx, self.net['id'])
self.assertEqual(123, row.revision_number)
with db_api.CONTEXT_WRITER.using(self.ctx):
self._create_initial_revision(self.net['id'],
ovn_rn_db.TYPE_NETWORKS)
self.net['revision_number'] = 123
ovn_rn_db.bump_revision(self.ctx, self.net,
ovn_rn_db.TYPE_NETWORKS)
row = ovn_rn_db.get_revision_row(self.ctx, self.net['id'])
self.assertEqual(123, row.revision_number)
def test_bump_older_revision(self):
self._create_initial_revision(self.net['id'], ovn_rn_db.TYPE_NETWORKS,
revision_number=124)
self.net['revision_number'] = 1
ovn_rn_db.bump_revision(self.ctx, self.net,
ovn_rn_db.TYPE_NETWORKS)
row = ovn_rn_db.get_revision_row(self.ctx, self.net['id'])
self.assertEqual(124, row.revision_number)
with db_api.CONTEXT_WRITER.using(self.ctx):
self._create_initial_revision(
self.net['id'], ovn_rn_db.TYPE_NETWORKS,
revision_number=124)
self.net['revision_number'] = 1
ovn_rn_db.bump_revision(self.ctx, self.net,
ovn_rn_db.TYPE_NETWORKS)
row = ovn_rn_db.get_revision_row(self.ctx, self.net['id'])
self.assertEqual(124, row.revision_number)
@mock.patch.object(ovn_rn_db.LOG, 'warning')
def test_bump_revision_row_not_found(self, mock_log):
self.net['revision_number'] = 123
ovn_rn_db.bump_revision(self.ctx, self.net, ovn_rn_db.TYPE_NETWORKS)
# Assert the revision number wasn't bumped
row = ovn_rn_db.get_revision_row(self.ctx, self.net['id'])
self.assertEqual(123, row.revision_number)
self.assertIn('No revision row found for', mock_log.call_args[0][0])
with db_api.CONTEXT_WRITER.using(self.ctx):
self.net['revision_number'] = 123
ovn_rn_db.bump_revision(self.ctx, self.net,
ovn_rn_db.TYPE_NETWORKS)
# Assert the revision number wasn't bumped
row = ovn_rn_db.get_revision_row(self.ctx, self.net['id'])
self.assertEqual(123, row.revision_number)
self.assertIn('No revision row found for',
mock_log.call_args[0][0])
def test_delete_revision(self):
self._create_initial_revision(self.net['id'], ovn_rn_db.TYPE_NETWORKS)
ovn_rn_db.delete_revision(self.ctx, self.net['id'],
ovn_rn_db.TYPE_NETWORKS)
row = ovn_rn_db.get_revision_row(self.ctx, self.net['id'])
self.assertIsNone(row)
with db_api.CONTEXT_WRITER.using(self.ctx):
self._create_initial_revision(self.net['id'],
ovn_rn_db.TYPE_NETWORKS)
ovn_rn_db.delete_revision(self.ctx, self.net['id'],
ovn_rn_db.TYPE_NETWORKS)
row = ovn_rn_db.get_revision_row(self.ctx, self.net['id'])
self.assertIsNone(row)
def test_create_initial_revision_may_exist_duplicated_entry(self):
args = (self.net['id'], ovn_rn_db.TYPE_NETWORKS)
self._create_initial_revision(*args)
# Assert DBDuplicateEntry is raised when may_exist is False (default)
self.assertRaises(db_exc.DBDuplicateEntry,
self._create_initial_revision, *args)
try:
self._create_initial_revision(*args, may_exist=True)
except db_exc.DBDuplicateEntry:
self.fail("create_initial_revision shouldn't raise "
"DBDuplicateEntry when may_exist is True")
with db_api.CONTEXT_WRITER.using(self.ctx):
args = (self.net['id'], ovn_rn_db.TYPE_NETWORKS)
self._create_initial_revision(*args)
# DBDuplicateEntry is raised when may_exist is False (default)
self._create_initial_revision(*args)
except Exception as exc:
if type(exc) is not db_exc.DBDuplicateEntry:
self.fail("create_initial_revision with the same parameters "
"should have raisen a DBDuplicateEntry exception")
with db_api.CONTEXT_WRITER.using(self.ctx):
args = (self.net['id'], ovn_rn_db.TYPE_NETWORKS)
self._create_initial_revision(*args)
try:
self._create_initial_revision(*args, may_exist=True)
except db_exc.DBDuplicateEntry:
self.fail("create_initial_revision shouldn't raise "
"DBDuplicateEntry when may_exist is True")
class TestMaintenancePlugin(test_securitygroup.SecurityGroupTestPlugin,
@ -149,7 +164,7 @@ class TestRevisionNumberMaintenance(test_securitygroup.SecurityGroupsTestCase,
def _create_initial_revision(self, resource_uuid, resource_type,
revision_number=ovn_rn_db.INITIAL_REV_NUM,
may_exist=False):
with self.ctx.session.begin(subtransactions=True):
with db_api.CONTEXT_WRITER.using(self.ctx):
ovn_rn_db.create_initial_revision(
self.ctx, resource_uuid, resource_type,
revision_number=revision_number, may_exist=may_exist)

View File

@ -821,8 +821,8 @@ class L3NatTestCaseBase(L3NatTestCaseMixin):
def test_router_update_gateway(self):
with self.router() as r:
with self.subnet() as s1:
with self.subnet() as s2:
with self.subnet(cidr='10.51.0.0/24') as s1:
with self.subnet(cidr='10.52.0.0/24') as s2:
self._set_net_external(s1['subnet']['network_id'])
self._add_external_gateway_to_router(
r['router']['id'],

View File

@ -14,6 +14,7 @@
# limitations under the License.
import copy
from unittest import mock
from neutron_lib.api.definitions import port_security as psec
from neutron_lib.api import validators
@ -28,6 +29,7 @@ from neutron.db import db_base_plugin_v2
from neutron.db import portsecurity_db
from neutron.db import securitygroups_db
from neutron.extensions import securitygroup as ext_sg
from neutron import quota
from neutron.tests.unit.db import test_db_base_plugin_v2
from neutron.tests.unit.extensions import test_securitygroup
@ -65,9 +67,9 @@ class PortSecurityTestPlugin(db_base_plugin_v2.NeutronDbPluginV2,
supported_extension_aliases = ["security-group", psec.ALIAS]
def create_network(self, context, network):
tenant_id = network['network'].get('tenant_id')
self._ensure_default_security_group(context, tenant_id)
with db_api.CONTEXT_WRITER.using(context):
tenant_id = network['network'].get('tenant_id')
self._ensure_default_security_group(context, tenant_id)
neutron_db = super(PortSecurityTestPlugin, self).create_network(
context, network)
neutron_db.update(network['network'])
@ -177,6 +179,14 @@ class PortSecurityDBTestCase(PortSecurityTestCase):
class TestPortSecurity(PortSecurityDBTestCase):
def setUp(self, plugin=None, service_plugins=None):
super().setUp(plugin)
make_res = mock.patch.object(quota.QuotaEngine, 'make_reservation')
commit_res = mock.patch.object(quota.QuotaEngine, 'commit_reservation')
self.mock_quota_make_res = make_res.start()
self.mock_quota_commit_res = commit_res.start()
def test_create_network_with_portsecurity_mac(self):
res = self._create_network('json', 'net1', True)
net = self.deserialize('json', res)

View File

@ -99,56 +99,61 @@ class CRUDScenarioTestCase(testlib_api.SqlTestCase):
self.obj_cls = network.Network
self.ctxt = context.get_admin_context()
def _compare_objs(self, obj1, obj2):
for field in (field for field in self.obj_cls.fields if
field not in ('updated_at', 'created_at')):
self.assertEqual(getattr(obj1, field, None),
getattr(obj2, field, None))
def test_get_object_with_None_value_in_filters(self):
obj = api.create_object(self.obj_cls, self.ctxt, {'name': 'foo'})
new_obj = api.get_object(
self.obj_cls, self.ctxt, name='foo', status=None)
self.assertEqual(obj, new_obj)
self._compare_objs(obj, new_obj)
def test_get_objects_with_None_value_in_filters(self):
obj = api.create_object(self.obj_cls, self.ctxt, {'name': 'foo'})
new_objs = api.get_objects(
self.obj_cls, self.ctxt, name='foo', status=None)
self.assertEqual(obj, new_objs[0])
self._compare_objs(obj, new_objs[0])
def test_get_objects_with_string_matching_filters_contains(self):
obj1 = api.create_object(
self.obj_cls, self.ctxt, {'name': 'obj_con_1'})
obj2 = api.create_object(
self.obj_cls, self.ctxt, {'name': 'obj_con_2'})
obj3 = api.create_object(
self.obj_cls, self.ctxt, {'name': 'obj_3'})
api.create_object(self.obj_cls, self.ctxt, {'name': 'obj_3'})
objs = api.get_objects(
self.obj_cls, self.ctxt, name=obj_utils.StringContains('con'))
self.obj_cls, self.ctxt, name=obj_utils.StringContains('con'),
_pager=base.Pager(sorts=[('name', True)]))
self.assertEqual(2, len(objs))
self.assertIn(obj1, objs)
self.assertIn(obj2, objs)
self.assertNotIn(obj3, objs)
self._compare_objs(obj1, objs[0])
self._compare_objs(obj2, objs[1])
def test_get_objects_with_string_matching_filters_starts(self):
obj1 = api.create_object(self.obj_cls, self.ctxt, {'name': 'pre_obj1'})
obj2 = api.create_object(self.obj_cls, self.ctxt, {'name': 'pre_obj2'})
obj3 = api.create_object(self.obj_cls, self.ctxt, {'name': 'obj_3'})
api.create_object(self.obj_cls, self.ctxt, {'name': 'obj_3'})
objs = api.get_objects(
self.obj_cls, self.ctxt, name=obj_utils.StringStarts('pre'))
self.obj_cls, self.ctxt, name=obj_utils.StringStarts('pre'),
_pager=base.Pager(sorts=[('name', True)]))
self.assertEqual(2, len(objs))
self.assertIn(obj1, objs)
self.assertIn(obj2, objs)
self.assertNotIn(obj3, objs)
self._compare_objs(obj1, objs[0])
self._compare_objs(obj2, objs[1])
def test_get_objects_with_string_matching_filters_ends(self):
obj1 = api.create_object(self.obj_cls, self.ctxt, {'name': 'obj1_end'})
obj2 = api.create_object(self.obj_cls, self.ctxt, {'name': 'obj2_end'})
obj3 = api.create_object(self.obj_cls, self.ctxt, {'name': 'obj_3'})
api.create_object(self.obj_cls, self.ctxt, {'name': 'obj_3'})
objs = api.get_objects(
self.obj_cls, self.ctxt, name=obj_utils.StringEnds('end'))
self.obj_cls, self.ctxt, name=obj_utils.StringEnds('end'),
_pager=base.Pager(sorts=[('name', True)]))
self.assertEqual(2, len(objs))
self.assertIn(obj1, objs)
self.assertIn(obj2, objs)
self.assertNotIn(obj3, objs)
self._compare_objs(obj1, objs[0])
self._compare_objs(obj2, objs[1])
def test_get_values_with_None_value_in_filters(self):
api.create_object(self.obj_cls, self.ctxt, {'name': 'foo'})
@ -201,15 +206,14 @@ class CRUDScenarioTestCase(testlib_api.SqlTestCase):
obj = api.create_object(self.obj_cls, self.ctxt, {'name': 'foo'})
new_obj = api.get_object(self.obj_cls, self.ctxt, id=obj.id)
self.assertEqual(obj, new_obj)
self._compare_objs(obj, new_obj)
obj = new_obj
obj.name = 'bar'
api.update_object(self.obj_cls, self.ctxt, {'name': 'bar'}, id=obj.id)
new_obj = api.get_object(self.obj_cls, self.ctxt, id=obj.id)
self.assertEqual(obj, new_obj)
self._compare_objs(obj, new_obj)
obj = new_obj
api.delete_object(self.obj_cls, self.ctxt, id=obj.id)
new_obj = api.get_object(self.obj_cls, self.ctxt, id=obj.id)

View File

@ -101,8 +101,6 @@ class FakeSmallNeutronObjectNewEngineFacade(base.NeutronDbObject):
db_model = ObjectFieldsModel
new_facade = True
primary_keys = ['field1']
foreign_keys = {
@ -1796,7 +1794,7 @@ class BaseDbObjectTestCase(_BaseObjectTestCase,
def test_get_objects_single_transaction(self):
with mock.patch(self._get_ro_txn_exit_func_name()) as mock_exit:
with db_api.autonested_transaction(self.context.session):
with db_api.CONTEXT_READER.using(self.context):
self._test_class.get_objects(self.context)
self.assertEqual(1, mock_exit.call_count)
@ -1811,7 +1809,7 @@ class BaseDbObjectTestCase(_BaseObjectTestCase,
obj.create()
with mock.patch(self._get_ro_txn_exit_func_name()) as mock_exit:
with db_api.autonested_transaction(self.context.session):
with db_api.CONTEXT_READER.using(self.context):
obj = self._test_class.get_object(self.context,
**obj._get_composite_keys())
self.assertEqual(1, mock_exit.call_count)

View File

@ -18,6 +18,7 @@ import netaddr
from neutron_lib.api.definitions import qos as qos_api
from neutron_lib import constants
from neutron_lib import context
from neutron_lib.db import api as db_api
from neutron_lib.services.qos import constants as qos_constants
from oslo_config import cfg
from oslo_utils import uuidutils
@ -435,7 +436,9 @@ class TestOVNClientQosExtension(test_plugin.Ml2PluginV2TestCase):
# We can't ensure the call order because we are not enforcing any order
# when retrieving the port and the network list.
self.mock_rules.assert_has_calls(calls, any_order=True)
fip = self.qos_driver._plugin_l3._make_floatingip_dict(self.fips[0])
with db_api.CONTEXT_READER.using(self.ctx):
fip = self.qos_driver._plugin_l3.get_floatingip(self.ctx,
self.fips[0].id)
mock_update_fip.assert_called_once_with(self.txn, fip)
def test_update_floatingip(self):

View File

@ -17,6 +17,7 @@ from unittest import mock
from futurist import periodics
from neutron_lib import context
from neutron_lib.db import api as db_api
from oslo_config import cfg
from neutron.common.ovn import constants
@ -139,39 +140,40 @@ class TestDBInconsistenciesPeriodics(testlib_api.SqlTestCaseLight,
never_again=False)
def _test_fix_create_update_network(self, ovn_rev, neutron_rev):
self.net['revision_number'] = neutron_rev
with db_api.CONTEXT_WRITER.using(self.ctx):
self.net['revision_number'] = neutron_rev
# Create an entry to the revision_numbers table and assert the
# initial revision_number for our test object is the expected
ovn_revision_numbers_db.create_initial_revision(
self.ctx, self.net['id'], constants.TYPE_NETWORKS,
revision_number=ovn_rev)
row = ovn_revision_numbers_db.get_revision_row(self.ctx,
self.net['id'])
self.assertEqual(ovn_rev, row.revision_number)
# Create an entry to the revision_numbers table and assert the
# initial revision_number for our test object is the expected
ovn_revision_numbers_db.create_initial_revision(
self.ctx, self.net['id'], constants.TYPE_NETWORKS,
revision_number=ovn_rev)
row = ovn_revision_numbers_db.get_revision_row(self.ctx,
self.net['id'])
self.assertEqual(ovn_rev, row.revision_number)
if ovn_rev < 0:
self.fake_ovn_client._nb_idl.get_lswitch.return_value = None
else:
fake_ls = mock.Mock(external_ids={
constants.OVN_REV_NUM_EXT_ID_KEY: ovn_rev})
self.fake_ovn_client._nb_idl.get_lswitch.return_value = fake_ls
if ovn_rev < 0:
self.fake_ovn_client._nb_idl.get_lswitch.return_value = None
else:
fake_ls = mock.Mock(external_ids={
constants.OVN_REV_NUM_EXT_ID_KEY: ovn_rev})
self.fake_ovn_client._nb_idl.get_lswitch.return_value = fake_ls
self.fake_ovn_client._plugin.get_network.return_value = self.net
self.periodic._fix_create_update(self.ctx, row)
self.fake_ovn_client._plugin.get_network.return_value = self.net
self.periodic._fix_create_update(self.ctx, row)
# Since the revision number was < 0, make sure create_network()
# is invoked with the latest version of the object in the neutron
# database
if ovn_rev < 0:
self.fake_ovn_client.create_network.assert_called_once_with(
self.ctx, self.net)
# If the revision number is > 0 it means that the object already
# exist and we just need to update to match the latest in the
# neutron database so, update_network() should be called.
else:
self.fake_ovn_client.update_network.assert_called_once_with(
self.ctx, self.net)
# Since the revision number was < 0, make sure create_network()
# is invoked with the latest version of the object in the neutron
# database
if ovn_rev < 0:
self.fake_ovn_client.create_network.assert_called_once_with(
self.ctx, self.net)
# If the revision number is > 0 it means that the object already
# exist and we just need to update to match the latest in the
# neutron database so, update_network() should be called.
else:
self.fake_ovn_client.update_network.assert_called_once_with(
self.ctx, self.net)
def test_fix_network_create(self):
self._test_fix_create_update_network(ovn_rev=-1, neutron_rev=2)
@ -180,40 +182,41 @@ class TestDBInconsistenciesPeriodics(testlib_api.SqlTestCaseLight,
self._test_fix_create_update_network(ovn_rev=5, neutron_rev=7)
def _test_fix_create_update_port(self, ovn_rev, neutron_rev):
self.port['revision_number'] = neutron_rev
_nb_idl = self.fake_ovn_client._nb_idl
with db_api.CONTEXT_WRITER.using(self.ctx):
self.port['revision_number'] = neutron_rev
# Create an entry to the revision_numbers table and assert the
# initial revision_number for our test object is the expected
ovn_revision_numbers_db.create_initial_revision(
self.ctx, self.port['id'], constants.TYPE_PORTS,
revision_number=ovn_rev)
row = ovn_revision_numbers_db.get_revision_row(self.ctx,
self.port['id'])
self.assertEqual(ovn_rev, row.revision_number)
# Create an entry to the revision_numbers table and assert the
# initial revision_number for our test object is the expected
ovn_revision_numbers_db.create_initial_revision(
self.ctx, self.port['id'], constants.TYPE_PORTS,
revision_number=ovn_rev)
row = ovn_revision_numbers_db.get_revision_row(self.ctx,
self.port['id'])
self.assertEqual(ovn_rev, row.revision_number)
if ovn_rev < 0:
self.fake_ovn_client._nb_idl.get_lswitch_port.return_value = None
else:
fake_lsp = mock.Mock(external_ids={
constants.OVN_REV_NUM_EXT_ID_KEY: ovn_rev})
self.fake_ovn_client._nb_idl.get_lswitch_port.return_value = (
fake_lsp)
if ovn_rev < 0:
_nb_idl.get_lswitch_port.return_value = None
else:
fake_lsp = mock.Mock(external_ids={
constants.OVN_REV_NUM_EXT_ID_KEY: ovn_rev})
_nb_idl.get_lswitch_port.return_value = fake_lsp
self.fake_ovn_client._plugin.get_port.return_value = self.port
self.periodic._fix_create_update(self.ctx, row)
self.fake_ovn_client._plugin.get_port.return_value = self.port
self.periodic._fix_create_update(self.ctx, row)
# Since the revision number was < 0, make sure create_port()
# is invoked with the latest version of the object in the neutron
# database
if ovn_rev < 0:
self.fake_ovn_client.create_port.assert_called_once_with(
self.ctx, self.port)
# If the revision number is > 0 it means that the object already
# exist and we just need to update to match the latest in the
# neutron database so, update_port() should be called.
else:
self.fake_ovn_client.update_port.assert_called_once_with(
self.ctx, self.port)
# Since the revision number was < 0, make sure create_port()
# is invoked with the latest version of the object in the neutron
# database
if ovn_rev < 0:
self.fake_ovn_client.create_port.assert_called_once_with(
self.ctx, self.port)
# If the revision number is > 0 it means that the object already
# exist and we just need to update to match the latest in the
# neutron database so, update_port() should be called.
else:
self.fake_ovn_client.update_port.assert_called_once_with(
self.ctx, self.port)
def test_fix_port_create(self):
self._test_fix_create_update_port(ovn_rev=-1, neutron_rev=2)
@ -223,14 +226,16 @@ class TestDBInconsistenciesPeriodics(testlib_api.SqlTestCaseLight,
@mock.patch.object(ovn_revision_numbers_db, 'bump_revision')
def _test_fix_security_group_create(self, mock_bump, revision_number):
sg_name = utils.ovn_addrset_name('fake_id', 'ip4')
sg = self._make_security_group(self.fmt, sg_name, '')['security_group']
with db_api.CONTEXT_WRITER.using(self.ctx):
sg_name = utils.ovn_addrset_name('fake_id', 'ip4')
sg = self._make_security_group(
self.fmt, sg_name, '')['security_group']
ovn_revision_numbers_db.create_initial_revision(
self.ctx, sg['id'], constants.TYPE_SECURITY_GROUPS,
revision_number=revision_number)
row = ovn_revision_numbers_db.get_revision_row(self.ctx, sg['id'])
self.assertEqual(revision_number, row.revision_number)
ovn_revision_numbers_db.create_initial_revision(
self.ctx, sg['id'], constants.TYPE_SECURITY_GROUPS,
revision_number=revision_number)
row = ovn_revision_numbers_db.get_revision_row(self.ctx, sg['id'])
self.assertEqual(revision_number, row.revision_number)
if revision_number < 0:
self.fake_ovn_client._nb_idl.get_address_set.return_value = None