Finish the new DB engine facade migration

This patch implements the last code bits pending to
conclude the new DB engine facade migration.

Due to the resultant interactions in the modified code, is
not possible to submit smaller patches; this code must be
migrated at once.

Partially-Implements blueprint: enginefacade-switch

Signed-off-by: Slawek Kaplonski <skaplons@redhat.com>
Co-Authored-By: Rodolfo Alonso Hernandez <ralonsoh@redhat.com>

Change-Id: Id3f09b78c8d0a8daa7ec4fa6f5bf79f7d5ab8f8b
This commit is contained in:
Slawek Kaplonski 2020-03-26 05:32:14 +01:00 committed by Rodolfo Alonso Hernandez
parent c2e39c2ce9
commit bf35cf65c8
28 changed files with 383 additions and 401 deletions

View File

@ -355,48 +355,6 @@ model, the nullable parameter is by default :code:`True`, while for OVO fields,
the nullable is set to :code:`False`. Make sure you correctly map database
column nullability properties to relevant object fields.
Database session activation
---------------------------
By default, all objects use old ``oslo.db`` engine facade. To enable the new
facade for a particular object, set ``new_facade`` class attribute to ``True``:
.. code-block:: Python
@obj_base.VersionedObjectRegistry.register
class ExampleObject(base.NeutronDbObject):
new_facade = True
It will make all OVO actions - ``get_object``, ``update``, ``count`` etc. - to
use new ``reader.using`` or ``writer.using`` decorators to manage database
transactions.
Whenever you need to open a new subtransaction in scope of OVO code, use the
following database session decorators:
.. code-block:: Python
@obj_base.VersionedObjectRegistry.register
class ExampleObject(base.NeutronDbObject):
@classmethod
def get_object(cls, context, **kwargs):
with cls.db_context_reader(context):
super(ExampleObject, cls).get_object(context, **kwargs)
# fetch more data in the same transaction
def create(self):
with self.db_context_writer(self.obj_context):
super(ExampleObject, self).create()
# apply more changes in the same transaction
``db_context_reader`` and ``db_context_writer`` decorators abstract the choice
of engine facade used for particular object from action implementation.
Alternatively, you can call all OVO actions under an active ``reader.using`` /
``writer.using`` context manager (or ``session.begin``). In this case, OVO will
pick the appropriate method to open a subtransaction.
Synthetic fields
----------------
:code:`synthetic_fields` is a list of fields, that are not directly backed by

View File

@ -31,6 +31,14 @@ def _noop_context_manager():
yield
def context_if_transaction(context, transaction, writer=True):
if transaction:
return (db_api.CONTEXT_WRITER.using(context) if writer else
db_api.CONTEXT_READER.using(context))
else:
return _noop_context_manager()
def safe_creation(context, create_fn, delete_fn, create_bindings,
transaction=True):
'''This function wraps logic of object creation in safe atomic way.
@ -55,12 +63,11 @@ def safe_creation(context, create_fn, delete_fn, create_bindings,
:param transaction: if true the whole operation will be wrapped in a
transaction. if false, no transaction will be used.
'''
cm = (db_api.CONTEXT_WRITER.using(context)
if transaction else _noop_context_manager())
with cm:
with context_if_transaction(context, transaction):
obj = create_fn()
try:
value = create_bindings(obj['id'])
updated_obj, value = create_bindings(obj['id'])
obj = updated_obj or obj
except Exception:
with excutils.save_and_reraise_exception():
try:

View File

@ -192,6 +192,7 @@ class L3_NAT_dbonly_mixin(l3.RouterPluginBase,
'device_owner': [DEVICE_OWNER_FLOATINGIP]}
return {p['id'] for p in self._core_plugin.get_ports(context, filters)}
@db_api.CONTEXT_READER
def _get_router(self, context, router_id):
try:
router = model_query.get_by_id(
@ -227,7 +228,7 @@ class L3_NAT_dbonly_mixin(l3.RouterPluginBase,
router['tenant_id'] = tenant_id
registry.notify(resources.ROUTER, events.BEFORE_CREATE,
self, context=context, router=router)
with context.session.begin(subtransactions=True):
with db_api.CONTEXT_WRITER.using(context):
# pre-generate id so it will be available when
# configuring external gw port
router_db = l3_models.Router(
@ -245,10 +246,15 @@ class L3_NAT_dbonly_mixin(l3.RouterPluginBase,
def _update_gw_for_create_router(self, context, gw_info, router_id):
if gw_info:
router_db = self._get_router(context, router_id)
with db_utils.context_if_transaction(
context, not context.session.is_active, writer=False):
router_db = self._get_router(context, router_id)
self._update_router_gw_info(context, router_id,
gw_info, router=router_db)
return self._get_router(context, router_id), None
return None, None
@db_api.retry_if_session_inactive()
def create_router(self, context, router):
r = router['router']
@ -288,9 +294,6 @@ class L3_NAT_dbonly_mixin(l3.RouterPluginBase,
gw_info = r.pop(EXTERNAL_GW_INFO, constants.ATTR_NOT_SPECIFIED)
original = self.get_router(context, id)
if gw_info != constants.ATTR_NOT_SPECIFIED:
# Update the gateway outside of the DB update since it involves L2
# calls that don't make sense to rollback and may cause deadlocks
# in a transaction.
self._update_router_gw_info(context, id, gw_info)
router_db = self._update_router_db(context, id, r)
updated = self._make_router_dict(router_db)
@ -308,6 +311,13 @@ class L3_NAT_dbonly_mixin(l3.RouterPluginBase,
'device_owner': DEVICE_OWNER_ROUTER_GW,
'admin_state_up': True,
'name': ''}
if context.session.is_active:
# TODO(ralonsoh): ML2 plugin "create_port" should be called outside
# a DB transaction. In this case an exception is made but in order
# to prevent future errors, this call should be moved outside
# the current transaction.
context.GUARD_TRANSACTION = False
gw_port = plugin_utils.create_port(
self._core_plugin, context.elevated(), {'port': port_data})
@ -316,7 +326,8 @@ class L3_NAT_dbonly_mixin(l3.RouterPluginBase,
network_id)
with plugin_utils.delete_port_on_error(
self._core_plugin, context.elevated(), gw_port['id']):
with context.session.begin(subtransactions=True):
with db_api.CONTEXT_WRITER.using(context):
router = self._get_router(context, router['id'])
router.gw_port = self._core_plugin._get_port(
context.elevated(), gw_port['id'])
router_port = l3_obj.RouterPort(
@ -325,7 +336,6 @@ class L3_NAT_dbonly_mixin(l3.RouterPluginBase,
port_id=gw_port['id'],
port_type=DEVICE_OWNER_ROUTER_GW
)
context.session.add(router)
router_port.create()
def _validate_gw_info(self, context, gw_port, info, ext_ips):
@ -371,10 +381,14 @@ class L3_NAT_dbonly_mixin(l3.RouterPluginBase,
gw_ips = [x['ip_address'] for x in router.gw_port['fixed_ips']]
gw_port_id = router.gw_port['id']
self._delete_router_gw_port_db(context, router)
if admin_ctx.session.is_active:
# TODO(ralonsoh): ML2 plugin "delete_port" should be called outside
# a DB transaction. In this case an exception is made but in order
# to prevent future errors, this call should be moved outside
# the current transaction.
admin_ctx.GUARD_TRANSACTION = False
self._core_plugin.delete_port(
admin_ctx, gw_port_id, l3_port_check=False)
with context.session.begin(subtransactions=True):
context.session.refresh(router)
# TODO(boden): normalize metadata
metadata = {'network_id': old_network_id,
'new_network_id': new_network_id,
@ -387,7 +401,7 @@ class L3_NAT_dbonly_mixin(l3.RouterPluginBase,
resource_id=router_id))
def _delete_router_gw_port_db(self, context, router):
with context.session.begin(subtransactions=True):
with db_api.CONTEXT_WRITER.using(context):
router.gw_port = None
if router not in context.session:
context.session.add(router)
@ -405,10 +419,12 @@ class L3_NAT_dbonly_mixin(l3.RouterPluginBase,
def _create_gw_port(self, context, router_id, router, new_network_id,
ext_ips):
new_valid_gw_port_attachment = (
new_network_id and
(not router.gw_port or
router.gw_port['network_id'] != new_network_id))
with db_api.CONTEXT_READER.using(context):
router = self._get_router(context, router_id)
new_valid_gw_port_attachment = (
new_network_id and
(not router.gw_port or
router.gw_port['network_id'] != new_network_id))
if new_valid_gw_port_attachment:
subnets = self._core_plugin.get_subnets_by_network(context,
new_network_id)
@ -432,7 +448,9 @@ class L3_NAT_dbonly_mixin(l3.RouterPluginBase,
self._create_router_gw_port(context, router,
new_network_id, ext_ips)
gw_ips = [x['ip_address'] for x in router.gw_port['fixed_ips']]
with db_api.CONTEXT_READER.using(context):
router = self._get_router(context, router_id)
gw_ips = [x['ip_address'] for x in router.gw_port['fixed_ips']]
registry.publish(resources.ROUTER_GATEWAY,
events.AFTER_CREATE,
@ -446,12 +464,8 @@ class L3_NAT_dbonly_mixin(l3.RouterPluginBase,
def _update_current_gw_port(self, context, router_id, router, ext_ips):
self._core_plugin.update_port(context.elevated(), router.gw_port['id'],
{'port': {'fixed_ips': ext_ips}})
context.session.expire(router.gw_port)
def _update_router_gw_info(self, context, router_id, info, router=None):
# TODO(salvatore-orlando): guarantee atomic behavior also across
# operations that span beyond the model classes handled by this
# class (e.g.: delete_port)
router = router or self._get_router(context, router_id)
gw_port = router.gw_port
ext_ips = info.get('external_fixed_ips') if info else []
@ -504,27 +518,30 @@ class L3_NAT_dbonly_mixin(l3.RouterPluginBase,
payload=events.DBEventPayload(
context, resource_id=id))
# TODO(nati) Refactor here when we have router insertion model
router = self._ensure_router_not_in_use(context, id)
original = self._make_router_dict(router)
self._delete_current_gw_port(context, id, router, None)
with context.session.begin(subtransactions=True):
context.session.refresh(router)
with db_api.CONTEXT_WRITER.using(context):
router = self._ensure_router_not_in_use(context, id)
original = self._make_router_dict(router)
self._delete_current_gw_port(context, id, router, None)
router_ports = router.attached_ports
for rp in router_ports:
self._core_plugin.delete_port(context.elevated(),
rp.port.id,
l3_port_check=False)
with context.session.begin(subtransactions=True):
context.session.refresh(router)
# TODO(ralonsoh): move this section (port deletion) out of the DB
# transaction.
router_ports_ids = (rp.port.id for rp in router.attached_ports)
if context.session.is_active:
context.GUARD_TRANSACTION = False
for rp_id in router_ports_ids:
self._core_plugin.delete_port(context.elevated(), rp_id,
l3_port_check=False)
router = self._get_router(context, id)
registry.notify(resources.ROUTER, events.PRECOMMIT_DELETE,
self, context=context, router_db=router,
router_id=id)
# we bump the revision even though we are about to delete to throw
# staledataerror if something snuck in with a new interface
# staledataerror if something stuck in with a new interface
router.bump_revision()
context.session.flush()
context.session.delete(router)
registry.notify(resources.ROUTER, events.AFTER_DELETE, self,
context=context, router_id=id, original=original)
@ -667,7 +684,7 @@ class L3_NAT_dbonly_mixin(l3.RouterPluginBase,
raise n_exc.BadRequest(resource='router', msg=msg)
def _validate_router_port_info(self, context, router, port_id):
with db_api.autonested_transaction(context.session):
with db_api.CONTEXT_READER.using(context):
# check again within transaction to mitigate race
port = self._check_router_port(context, port_id, router.id)
@ -873,8 +890,6 @@ class L3_NAT_dbonly_mixin(l3.RouterPluginBase,
new_interface=new_router_intf,
interface_info=interface_info)
with context.session.begin(subtransactions=True):
context.session.refresh(router)
return self._make_router_interface_info(
router.id, port['tenant_id'], port['id'], port['network_id'],
subnets[-1]['id'], [subnet['id'] for subnet in subnets])
@ -1018,8 +1033,6 @@ class L3_NAT_dbonly_mixin(l3.RouterPluginBase,
port=port,
router_id=router_id,
interface_info=interface_info)
with context.session.begin(subtransactions=True):
context.session.refresh(router)
return self._make_router_interface_info(router_id, port['tenant_id'],
port['id'], port['network_id'],
subnets[0]['id'],
@ -1321,7 +1334,7 @@ class L3_NAT_dbonly_mixin(l3.RouterPluginBase,
with plugin_utils.delete_port_on_error(
self._core_plugin, context.elevated(),
external_port['id']),\
context.session.begin(subtransactions=True):
db_api.CONTEXT_WRITER.using(context):
# Ensure IPv4 addresses are allocated on external port
external_ipv4_ips = self._port_ipv4_fixed_ips(external_port)
if not external_ipv4_ips:
@ -1396,7 +1409,7 @@ class L3_NAT_dbonly_mixin(l3.RouterPluginBase,
raise e.errors[0].error
fip = floatingip['floatingip']
with context.session.begin(subtransactions=True):
with db_api.CONTEXT_WRITER.using(context):
floatingip_obj = self._get_floatingip(context, id)
old_floatingip = self._make_floatingip_dict(floatingip_obj)
old_fixed_port_id = floatingip_obj.fixed_port_id
@ -1591,7 +1604,7 @@ class L3_NAT_dbonly_mixin(l3.RouterPluginBase,
This parameter is ignored.
@return: set of router-ids that require notification updates
"""
with context.session.begin(subtransactions=True):
with db_api.CONTEXT_WRITER.using(context):
floating_ip_objs = l3_obj.FloatingIP.get_objects(
context, fixed_port_id=port_id)
router_ids = {fip.router_id for fip in floating_ip_objs}
@ -1831,7 +1844,7 @@ class L3_NAT_dbonly_mixin(l3.RouterPluginBase,
def _get_router_info_list(self, context, router_ids=None, active=None,
device_owners=None):
"""Query routers and their related floating_ips, interfaces."""
with context.session.begin(subtransactions=True):
with db_api.CONTEXT_WRITER.using(context):
routers = self._get_sync_routers(context,
router_ids=router_ids,
active=active)

View File

@ -18,6 +18,7 @@
from neutron_lib.api.definitions import l3 as l3_apidef
from neutron_lib.api.definitions import qos_gateway_ip
from neutron_lib.api import extensions
from neutron_lib.db import api as db_api
from neutron_lib.db import resource_extend
from neutron_lib.services.qos import constants as qos_consts
from oslo_log import log as logging
@ -65,9 +66,10 @@ class L3_gw_ip_qos_dbonly_mixin(l3_gwmode_db.L3_NAT_dbonly_mixin):
self)._update_router_gw_info(
context, router_id, info, router)
if self._is_gw_ip_qos_supported and router.gw_port:
self._update_router_gw_qos_policy(context, router_id,
info, router)
with db_api.CONTEXT_WRITER.using(context):
if self._is_gw_ip_qos_supported and router.gw_port:
self._update_router_gw_qos_policy(context, router_id,
info, router)
return router
@ -94,9 +96,6 @@ class L3_gw_ip_qos_dbonly_mixin(l3_gwmode_db.L3_NAT_dbonly_mixin):
router_id,
old_qos_policy_id)
with context.session.begin(subtransactions=True):
context.session.refresh(router)
if new_qos_policy_id:
self._create_gw_ip_qos_db(
context, router_id, new_qos_policy_id)

View File

@ -17,6 +17,7 @@ from neutron_lib.api.definitions import l3 as l3_apidef
from neutron_lib.callbacks import events
from neutron_lib.callbacks import registry
from neutron_lib.callbacks import resources
from neutron_lib.db import api as db_api
from neutron_lib.db import resource_extend
from oslo_config import cfg
import sqlalchemy as sa
@ -56,10 +57,9 @@ class L3_NAT_dbonly_mixin(l3_db.L3_NAT_dbonly_mixin):
})
def _update_router_gw_info(self, context, router_id, info, router=None):
# Load the router only if necessary
if not router:
with db_api.CONTEXT_WRITER.using(context):
# Always load the router inside the DB context.
router = self._get_router(context, router_id)
with context.session.begin(subtransactions=True):
old_router = self._make_router_dict(router)
router.enable_snat = self._get_enable_snat(info)
router_body = {l3_apidef.ROUTER:
@ -75,7 +75,7 @@ class L3_NAT_dbonly_mixin(l3_db.L3_NAT_dbonly_mixin):
context, router_id, info, router=router)
# Returning the router might come back useful if this
# method is overridden in child classes
return router
return self._get_router(context, router_id)
@staticmethod
def _get_enable_snat(info):

View File

@ -143,7 +143,10 @@ class L3_HA_NAT_db_mixin(l3_dvr_db.L3_NAT_with_dvr_db_mixin,
context, network_id=network_id, vr_id=vr_id)
allocation.create()
router_db.extra_attributes.ha_vr_id = allocation.vr_id
router_db.extra_attributes.update(
{'ha_vr_id': allocation.vr_id})
context.session.add(router_db.extra_attributes)
LOG.debug(
"Router %(router_id)s has been allocated a ha_vr_id "
"%(ha_vr_id)d.",
@ -200,7 +203,7 @@ class L3_HA_NAT_db_mixin(l3_dvr_db.L3_NAT_with_dvr_db_mixin,
# and the process is started over where the existing
# network will be selected.
raise db_exc.DBDuplicateEntry(columns=['tenant_id'])
return ha_network
return None, ha_network
def _add_ha_network_settings(self, network):
if cfg.CONF.l3_ha_network_type:
@ -267,7 +270,7 @@ class L3_HA_NAT_db_mixin(l3_dvr_db.L3_NAT_with_dvr_db_mixin,
context, port_id=port_id, router_id=router_id)
portbinding.create()
return portbinding
return None, portbinding
except db_exc.DBReferenceError as e:
with excutils.save_and_reraise_exception() as ctxt:
if isinstance(e.inner_exception, sql_exc.IntegrityError):

View File

@ -104,7 +104,7 @@ def create_initial_revision(context, resource_uuid, resource_type,
LOG.debug('create_initial_revision uuid=%s, type=%s, rev=%s',
resource_uuid, resource_type, revision_number)
db_func = context.session.merge if may_exist else context.session.add
with context.session.begin(subtransactions=True):
with db_api.CONTEXT_WRITER.using(context):
std_attr_id = std_attr_id or _get_standard_attr_id(
context, resource_uuid, resource_type)
row = ovn_models.OVNRevisionNumbers(
@ -116,7 +116,7 @@ def create_initial_revision(context, resource_uuid, resource_type,
@db_api.retry_if_session_inactive()
def delete_revision(context, resource_uuid, resource_type):
LOG.debug('delete_revision(%s)', resource_uuid)
with context.session.begin(subtransactions=True):
with db_api.CONTEXT_WRITER.using(context):
row = context.session.query(ovn_models.OVNRevisionNumbers).filter_by(
resource_uuid=resource_uuid,
resource_type=resource_type).one_or_none()
@ -136,7 +136,7 @@ def _ensure_revision_row_exist(context, resource, resource_type, std_attr_id):
# deal with objects that already existed before the sync work. I believe
# that we can remove this method after few development cycles. Or,
# if we decide to make a migration script as well.
with context.session.begin(subtransactions=True):
with db_api.CONTEXT_READER.using(context):
if not context.session.query(ovn_models.OVNRevisionNumbers).filter_by(
resource_uuid=resource['id'],
resource_type=resource_type).one_or_none():
@ -152,7 +152,7 @@ def _ensure_revision_row_exist(context, resource, resource_type, std_attr_id):
@db_api.retry_if_session_inactive()
def get_revision_row(context, resource_uuid):
try:
with context.session.begin(subtransactions=True):
with db_api.CONTEXT_READER.using(context):
return context.session.query(
ovn_models.OVNRevisionNumbers).filter_by(
resource_uuid=resource_uuid).one()
@ -163,7 +163,7 @@ def get_revision_row(context, resource_uuid):
@db_api.retry_if_session_inactive()
def bump_revision(context, resource, resource_type):
revision_number = ovn_utils.get_revision_number(resource, resource_type)
with context.session.begin(subtransactions=True):
with db_api.CONTEXT_WRITER.using(context):
# NOTE(ralonsoh): "resource" could be a dict or an OVO.
try:
std_attr_id = resource.db_obj.standard_attr.id
@ -203,7 +203,7 @@ def get_inconsistent_resources(context):
whens=MAINTENANCE_CREATE_UPDATE_TYPE_ORDER)
time_ = (timeutils.utcnow() -
datetime.timedelta(seconds=INCONSISTENCIES_OLDER_THAN))
with context.session.begin(subtransactions=True):
with db_api.CONTEXT_READER.using(context):
query = context.session.query(ovn_models.OVNRevisionNumbers).join(
standard_attr.StandardAttribute,
ovn_models.OVNRevisionNumbers.standard_attr_id ==
@ -232,6 +232,6 @@ def get_deleted_resources(context):
"""
sort_order = sa.case(value=ovn_models.OVNRevisionNumbers.resource_type,
whens=MAINTENANCE_DELETE_TYPE_ORDER)
with context.session.begin(subtransactions=True):
with db_api.CONTEXT_READER.using(context):
return context.session.query(ovn_models.OVNRevisionNumbers).filter_by(
standard_attr_id=None).order_by(sort_order).all()

View File

@ -109,18 +109,18 @@ class SecurityGroupDbMixin(ext_sg.SecurityGroupPluginBase,
return self.get_security_group(context, existing_def_sg_id)
with db_api.CONTEXT_WRITER.using(context):
sg = sg_obj.SecurityGroup(
context, id=s.get('id') or uuidutils.generate_uuid(),
description=s['description'], project_id=tenant_id,
name=s['name'], is_default=default_sg, stateful=stateful)
sg.create()
delta = len(ext_sg.sg_supported_ethertypes)
delta = delta * 2 if default_sg else delta
reservation = quota.QUOTAS.make_reservation(
context, tenant_id, {'security_group_rule': delta},
self)
sg = sg_obj.SecurityGroup(
context, id=s.get('id') or uuidutils.generate_uuid(),
description=s['description'], project_id=tenant_id,
name=s['name'], is_default=default_sg, stateful=stateful)
sg.create()
for ethertype in ext_sg.sg_supported_ethertypes:
if default_sg:
# Allow intercommunication
@ -739,6 +739,17 @@ class SecurityGroupDbMixin(ext_sg.SecurityGroupPluginBase,
raise ext_sg.SecurityGroupRuleParameterConflict(
ethertype=rule['ethertype'], cidr=input_prefix)
@db_api.retry_if_session_inactive()
def get_security_group_rules_count(self, context, filters=None):
filters = filters if filters else {}
if not filters and context.project_id and not context.is_admin:
rule_ids = sg_obj.SecurityGroupRule.get_security_group_rule_ids(
context.project_id)
filters = {'id': rule_ids}
return sg_obj.SecurityGroupRule.count(context_lib.get_admin_context(),
**filters)
@db_api.retry_if_session_inactive()
def get_security_group_rules(self, context, filters=None, fields=None,
sorts=None, limit=None, marker=None,

View File

@ -405,9 +405,6 @@ class NeutronDbObject(NeutronObject, metaclass=DeclarativeObject):
# should be overridden for all rbac aware objects
rbac_db_cls = None
# whether to use new engine facade for the object
new_facade = False
primary_keys = ['id']
# 'unique_keys' is a list of unique keys that can be used with get_object
@ -571,16 +568,12 @@ class NeutronDbObject(NeutronObject, metaclass=DeclarativeObject):
@classmethod
def db_context_writer(cls, context):
"""Return read-write session activation decorator."""
if cls.new_facade or cls._use_db_facade(context):
return db_api.CONTEXT_WRITER.using(context)
return db_api.autonested_transaction(context.session)
return db_api.CONTEXT_WRITER.using(context)
@classmethod
def db_context_reader(cls, context):
"""Return read-only session activation decorator."""
if cls.new_facade or cls._use_db_facade(context):
return db_api.CONTEXT_READER.using(context)
return db_api.autonested_transaction(context.session)
return db_api.CONTEXT_READER.using(context)
@classmethod
def get_object(cls, context, fields=None, **kwargs):

View File

@ -37,7 +37,7 @@ class L3HARouterAgentPortBinding(base.NeutronDbObject):
}
primary_keys = ['port_id']
fields_no_update = ['router_id', 'port_id', 'l3_agent_id']
fields_no_update = ['router_id', 'port_id']
@classmethod
def get_l3ha_filter_host_router(cls, context, router_ids, host):

View File

@ -363,7 +363,6 @@ class DvrFipGatewayPortAgentBinding(base.NeutronDbObject):
VERSION = '1.0'
db_model = dvr_models.DvrFipGatewayPortAgentBinding
new_facade = True
primary_keys = ['network_id', 'agent_id']

View File

@ -24,8 +24,6 @@ class StandardAttribute(base.NeutronDbObject):
# Version 1.0: Initial version
VERSION = '1.0'
new_facade = True
db_model = standard_attr.StandardAttribute
fields = {

View File

@ -69,8 +69,6 @@ class Route(base.NeutronDbObject):
# Version 1.0: Initial version
VERSION = '1.0'
new_facade = True
db_model = models_v2.SubnetRoute
primary_keys = ['destination', 'nexthop', 'subnet_id']
@ -196,7 +194,6 @@ class Subnet(base.NeutronDbObject):
VERSION = '1.1'
db_model = models_v2.Subnet
new_facade = True
fields = {
'id': common_types.UUIDField(),
@ -500,7 +497,6 @@ class NetworkSubnetLock(base.NeutronDbObject):
VERSION = '1.0'
db_model = models_v2.NetworkSubnetLock
new_facade = True
primary_keys = ['network_id']
fields = {

View File

@ -30,7 +30,6 @@ class SubPort(base.NeutronDbObject):
VERSION = '1.0'
db_model = models.SubPort
new_facade = True
primary_keys = ['port_id']
foreign_keys = {'Trunk': {'trunk_id': 'id'}}
@ -89,7 +88,6 @@ class Trunk(base.NeutronDbObject):
VERSION = '1.1'
db_model = models.Trunk
new_facade = True
fields = {
'admin_state_up': obj_fields.BooleanField(),

View File

@ -30,6 +30,7 @@ from oslo_log import log as logging
from neutron.common import utils
from neutron.conf.db import l3_hamode_db
from neutron.db.models import l3agent as rb_model
from neutron.objects import l3_hamode as l3_hamode_obj
from neutron.objects import l3agent as rb_obj
@ -284,10 +285,12 @@ class L3Scheduler(object, metaclass=abc.ABCMeta):
port_binding = utils.create_object_with_dependency(
creator, dep_getter, dep_creator,
dep_id_attr, dep_deleter)[0]
# NOTE(ralonsoh): to be migrated to the new facade that can't be
# used with "create_object_with_dependency".
with lib_db_api.autonested_transaction(context.session):
with lib_db_api.CONTEXT_WRITER.using(context):
port_binding = (
l3_hamode_obj.L3HARouterAgentPortBinding.get_object(
context, port_id=port_binding['port_id']))
port_binding.l3_agent_id = agent['id']
port_binding.update()
except db_exc.DBDuplicateEntry:
LOG.debug("Router %(router)s already scheduled for agent "
"%(agent)s", {'router': router_id,

View File

@ -382,13 +382,15 @@ class PortForwardingPlugin(fip_pf.PortForwardingPluginBase):
raise lib_exc.BadRequest(resource=apidef.RESOURCE_NAME,
msg=message)
if self._rpc_notifications_required:
self.push_api.push(context, [pf_obj], rpc_events.CREATED)
registry.notify(pf_consts.PORT_FORWARDING, events.AFTER_CREATE,
self,
payload=[callbacks.PortForwardingPayload(context,
current_pf=pf_obj)])
return pf_obj
registry.notify(pf_consts.PORT_FORWARDING, events.AFTER_CREATE,
self,
payload=[callbacks.PortForwardingPayload(context,
current_pf=pf_obj)])
if self._rpc_notifications_required:
self.push_api.push(context, [pf_obj], rpc_events.CREATED)
return pf_obj
@db_base_plugin_common.convert_result_to_dict
def update_floatingip_port_forwarding(self, context, id, floatingip_id,

View File

@ -546,7 +546,7 @@ class QoSPlugin(qos.QoSPluginBase):
return rule
def _get_policy_id(self, context, rule_cls, rule_id):
with db_api.autonested_transaction(context.session):
with db_api.CONTEXT_READER.using(context):
rule_object = rule_cls.get_object(context, id=rule_id)
if not rule_object:
raise qos_exc.QosRuleNotFound(policy_id="", rule_id=rule_id)

View File

@ -226,6 +226,9 @@ class TestDhcpAgentHARaceCondition(BaseDhcpAgentTest):
self._simulate_concurrent_requests_process_and_raise(funcs, args)
def test_dhcp_agent_ha_with_race_condition(self):
# NOTE(ralonsoh): the concurrent creation in the same thread could
# fail because the context and the session is the same for all DB
# calls.
network_dhcp_agents = self.client.list_dhcp_agent_hosting_networks(
self.network['id'])['agents']
self.assertEqual(1, len(network_dhcp_agents))

View File

@ -14,7 +14,6 @@ from unittest import mock
from neutron_lib.api.definitions import fip_pf_description as ext_apidef
from neutron_lib.api.definitions import floating_ip_port_forwarding as apidef
from neutron_lib.callbacks import exceptions as c_exc
from neutron_lib import exceptions as lib_exc
from neutron_lib.exceptions import l3 as lib_l3_exc
from neutron_lib.plugins import constants as plugin_constants
@ -387,70 +386,6 @@ class PortForwardingTestCase(PortForwardingTestCaseBase):
self.pf_plugin.delete_floatingip_port_forwarding,
self.context, res['id'], uuidutils.generate_uuid())
def test_concurrent_create_port_forwarding_delete_fip(self):
func1 = self.pf_plugin.create_floatingip_port_forwarding
func2 = self._delete_floatingip
funcs = [func1, func2]
args_list = [(self.context, self.fip['id'], self.port_forwarding),
(self.fip['id'],)]
self.assertRaises(c_exc.CallbackFailure,
self._simulate_concurrent_requests_process_and_raise,
funcs, args_list)
port_forwardings = self.pf_plugin.get_floatingip_port_forwardings(
self.context, floatingip_id=self.fip['id'], fields=['id'])
self.pf_plugin.delete_floatingip_port_forwarding(
self.context, port_forwardings[0][apidef.ID],
floatingip_id=self.fip['id'])
funcs.reverse()
args_list.reverse()
self.assertRaises(lib_l3_exc.FloatingIPNotFound,
self._simulate_concurrent_requests_process_and_raise,
funcs, args_list)
def test_concurrent_create_port_forwarding_update_fip(self):
newport = self._create_port(self.fmt, self.net['id']).json['port']
func1 = self.pf_plugin.create_floatingip_port_forwarding
func2 = self._update_floatingip
funcs = [func1, func2]
args_list = [(self.context, self.fip['id'], self.port_forwarding),
(self.fip['id'], {'port_id': newport['id']})]
self.assertRaises(c_exc.CallbackFailure,
self._simulate_concurrent_requests_process_and_raise,
funcs, args_list)
funcs.reverse()
args_list.reverse()
self.assertRaises(c_exc.CallbackFailure,
self._simulate_concurrent_requests_process_and_raise,
funcs, args_list)
def test_concurrent_create_port_forwarding_update_port(self):
new_ip = self._find_ip_address(
self.subnet,
exclude=self._get_network_port_ips(),
is_random=True)
funcs = [self.pf_plugin.create_floatingip_port_forwarding,
self._update_port]
args_list = [(self.context, self.fip['id'], self.port_forwarding),
(self.port['id'], {
'fixed_ips': [{'subnet_id': self.subnet['id'],
'ip_address': new_ip}]})]
self._simulate_concurrent_requests_process_and_raise(funcs, args_list)
self.assertEqual([], self.pf_plugin.get_floatingip_port_forwardings(
self.context, floatingip_id=self.fip['id']))
def test_concurrent_create_port_forwarding_delete_port(self):
funcs = [self.pf_plugin.create_floatingip_port_forwarding,
self._delete_port]
args_list = [(self.context, self.fip['id'], self.port_forwarding),
(self.port['id'],)]
self._simulate_concurrent_requests_process_and_raise(funcs, args_list)
self.assertEqual([], self.pf_plugin.get_floatingip_port_forwardings(
self.context, floatingip_id=self.fip['id']))
def test_create_floatingip_port_forwarding_port_in_use(self):
res = self.pf_plugin.create_floatingip_port_forwarding(
self.context, self.fip['id'], self.port_forwarding)

View File

@ -21,6 +21,7 @@ from neutron_lib.callbacks import registry
from neutron_lib.callbacks import resources
from neutron_lib import constants as n_const
from neutron_lib import context
from neutron_lib.db import api as db_api
from neutron_lib import exceptions as n_exc
from neutron_lib.plugins import constants as plugin_constants
from neutron_lib.plugins import directory
@ -39,9 +40,11 @@ from neutron.tests import base
from neutron.tests.unit.db import test_db_base_plugin_v2
class TestL3_NAT_dbonly_mixin(base.BaseTestCase):
def setUp(self):
super(TestL3_NAT_dbonly_mixin, self).setUp()
class TestL3_NAT_dbonly_mixin(
test_db_base_plugin_v2.NeutronDbPluginV2TestCase):
def setUp(self, *args, **kwargs):
super(TestL3_NAT_dbonly_mixin, self).setUp(*args, **kwargs)
self.db = l3_db.L3_NAT_dbonly_mixin()
def test__each_port_having_fixed_ips_none(self):
@ -289,6 +292,9 @@ class TestL3_NAT_dbonly_mixin(base.BaseTestCase):
**kwargs)
def test__create_gw_port(self):
# NOTE(slaweq): this test is probably wrong
# returing dict as gw_port breaks test later in L334 in
# neutron.db.l3_db file
router_id = '2afb8434-7380-43a2-913f-ba3a5ad5f349'
router = l3_models.Router(id=router_id)
new_network_id = 'net-id'
@ -298,37 +304,42 @@ class TestL3_NAT_dbonly_mixin(base.BaseTestCase):
'id': '8742d007-6f05-4b7e-abdb-11818f608959'}
ctx = context.get_admin_context()
with mock.patch.object(directory, 'get_plugin') as get_p, \
mock.patch.object(get_p(), 'get_subnets_by_network',
return_value=mock.ANY), \
mock.patch.object(get_p(), '_get_port',
return_value=gw_port), \
mock.patch.object(l3_db.L3_NAT_dbonly_mixin,
'_check_for_dup_router_subnets') as cfdrs,\
mock.patch.object(plugin_utils, 'create_port',
return_value=gw_port), \
mock.patch.object(ctx.session, 'add'), \
mock.patch.object(base_obj.NeutronDbObject, 'create'), \
mock.patch.object(l3_db.registry, 'publish') as mock_notify:
with db_api.CONTEXT_WRITER.using(ctx):
with mock.patch.object(directory, 'get_plugin') as get_p, \
mock.patch.object(get_p(), 'get_subnets_by_network',
return_value=mock.ANY), \
mock.patch.object(get_p(), '_get_port',
return_value=gw_port), \
mock.patch.object(l3_db.L3_NAT_dbonly_mixin,
'_check_for_dup_router_subnets') as \
cfdrs, \
mock.patch.object(plugin_utils, 'create_port',
return_value=gw_port), \
mock.patch.object(ctx.session, 'add'), \
mock.patch.object(base_obj.NeutronDbObject, 'create'), \
mock.patch.object(l3_db.registry, 'publish') as \
mock_notify, \
mock.patch.object(l3_db.L3_NAT_dbonly_mixin, '_get_router',
return_value=router):
self.db._create_gw_port(ctx, router_id=router_id,
router=router,
new_network_id=new_network_id,
ext_ips=ext_ips)
self.db._create_gw_port(ctx, router_id=router_id,
router=router,
new_network_id=new_network_id,
ext_ips=ext_ips)
expected_gw_ips = ['1.1.1.1']
expected_gw_ips = ['1.1.1.1']
self.assertTrue(cfdrs.called)
mock_notify.assert_called_with(
resources.ROUTER_GATEWAY, events.AFTER_CREATE,
self.db._create_gw_port, payload=mock.ANY)
cb_payload = mock_notify.mock_calls[1][2]['payload']
self.assertEqual(ctx, cb_payload.context)
self.assertEqual(expected_gw_ips,
cb_payload.metadata.get('gateway_ips'))
self.assertEqual(new_network_id,
cb_payload.metadata.get('network_id'))
self.assertEqual(router_id, cb_payload.resource_id)
self.assertTrue(cfdrs.called)
mock_notify.assert_called_with(
resources.ROUTER_GATEWAY, events.AFTER_CREATE,
self.db._create_gw_port, payload=mock.ANY)
cb_payload = mock_notify.mock_calls[1][2]['payload']
self.assertEqual(ctx, cb_payload.context)
self.assertEqual(expected_gw_ips,
cb_payload.metadata.get('gateway_ips'))
self.assertEqual(new_network_id,
cb_payload.metadata.get('network_id'))
self.assertEqual(router_id, cb_payload.resource_id)
class L3_NAT_db_mixin(base.BaseTestCase):
@ -428,20 +439,20 @@ class L3TestCase(test_db_base_plugin_v2.NeutronDbPluginV2TestCase):
l3_obj.Router.get_object(self.ctx, id=self.router['id']).delete()
def create_router(self, router):
with self.ctx.session.begin(subtransactions=True):
with db_api.CONTEXT_WRITER.using(self.ctx):
return self.mixin.create_router(self.ctx, router)
def create_port(self, net_id, port_info):
with self.ctx.session.begin(subtransactions=True):
with db_api.CONTEXT_WRITER.using(self.ctx):
return self._make_port(self.fmt, net_id, **port_info)
def create_network(self, name=None, **kwargs):
name = name or 'network1'
with self.ctx.session.begin(subtransactions=True):
with db_api.CONTEXT_WRITER.using(self.ctx):
return self._make_network(self.fmt, name, True, **kwargs)
def create_subnet(self, network, gateway, cidr, **kwargs):
with self.ctx.session.begin(subtransactions=True):
with db_api.CONTEXT_WRITER.using(self.ctx):
return self._make_subnet(self.fmt, network, gateway, cidr,
**kwargs)

View File

@ -21,6 +21,7 @@ from neutron_lib.callbacks import registry
from neutron_lib.callbacks import resources
from neutron_lib import constants as const
from neutron_lib import context
from neutron_lib.db import api as db_api
from neutron_lib import exceptions
from neutron_lib.exceptions import l3 as l3_exc
from neutron_lib.objects import exceptions as o_exc
@ -60,11 +61,11 @@ class L3DvrTestCase(test_db_base_plugin_v2.NeutronDbPluginV2TestCase):
directory.add_plugin(plugin_constants.L3, self.mixin)
def _create_router(self, router):
with self.ctx.session.begin(subtransactions=True):
with db_api.CONTEXT_WRITER.using(self.ctx):
return self.mixin._create_router_db(self.ctx, router, 'foo_tenant')
def create_port(self, net_id, port_info):
with self.ctx.session.begin(subtransactions=True):
with db_api.CONTEXT_WRITER.using(self.ctx):
return self._create_port(self.fmt, net_id, **port_info)
def _test__create_router_db(self, expected=False, distributed=None):
@ -463,20 +464,20 @@ class L3DvrTestCase(test_db_base_plugin_v2.NeutronDbPluginV2TestCase):
'admin_state_up': True,
'distributed': True
}
router = self._create_router(router_db)
if gw_port:
with self.subnet(cidr='10.10.10.0/24') as subnet:
port_dict = {
'device_id': router.id,
'device_owner': const.DEVICE_OWNER_ROUTER_GW,
'admin_state_up': True,
'fixed_ips': [{'subnet_id': subnet['subnet']['id'],
'ip_address': '10.10.10.100'}]
}
net_id = subnet['subnet']['network_id']
port_res = self.create_port(net_id, port_dict)
port_res_dict = self.deserialize(self.fmt, port_res)
with self.ctx.session.begin(subtransactions=True):
with db_api.CONTEXT_WRITER.using(self.ctx):
router = self._create_router(router_db)
if gw_port:
with self.subnet(cidr='10.10.10.0/24') as subnet:
port_dict = {
'device_id': router.id,
'device_owner': const.DEVICE_OWNER_ROUTER_GW,
'admin_state_up': True,
'fixed_ips': [{'subnet_id': subnet['subnet']['id'],
'ip_address': '10.10.10.100'}]
}
net_id = subnet['subnet']['network_id']
port_res = self.create_port(net_id, port_dict)
port_res_dict = self.deserialize(self.fmt, port_res)
port_db = self.ctx.session.query(models_v2.Port).filter_by(
id=port_res_dict['port']['id']).one()
router.gw_port = port_db
@ -487,9 +488,8 @@ class L3DvrTestCase(test_db_base_plugin_v2.NeutronDbPluginV2TestCase):
)
self.ctx.session.add(router)
self.ctx.session.add(router_port)
else:
net_id = None
else:
net_id = None
plugin = mock.Mock()
directory.add_plugin(plugin_constants.CORE, plugin)
@ -1132,6 +1132,10 @@ class L3DvrTestCase(test_db_base_plugin_v2.NeutronDbPluginV2TestCase):
mock_notify.assert_called_once_with(
'router', 'before_update', self.mixin, **kwargs)
def _assert_mock_called_with_router(self, mock_fn, router_id):
router = mock_fn.call_args[1].get('router_db')
self.assertEqual(router_id, router.id)
def test__validate_router_migration_notify_advanced_services_mocked(self):
# call test with admin_state_down_before_update ENABLED
self._test__validate_router_migration_notify_advanced_services()
@ -1152,9 +1156,16 @@ class L3DvrTestCase(test_db_base_plugin_v2.NeutronDbPluginV2TestCase):
interface_info = {'subnet_id': sub['subnet']['id']}
self.mixin.add_router_interface(self.ctx, router_db.id,
interface_info)
mock_notify.assert_called_once_with(self.ctx, router_db=router_db,
# NOTE(slaweq): here we are just checking if mock_notify was called
# with kwargs which we are expecting, but we can't check exactly if
# router_db was object which we are expecting and because of that
# below we are checking if router_db used as argument in
# mock_notify call is the has same id as the one which we are
# expecting
mock_notify.assert_called_once_with(self.ctx, router_db=mock.ANY,
port=mock.ANY,
interface_info=interface_info)
self._assert_mock_called_with_router(mock_notify, router_db.id)
def test_validate_add_router_interface_by_port_notify_advanced_services(
self):
@ -1169,9 +1180,16 @@ class L3DvrTestCase(test_db_base_plugin_v2.NeutronDbPluginV2TestCase):
interface_info = {'port_id': port['port']['id']}
self.mixin.add_router_interface(self.ctx, router_db.id,
interface_info)
mock_notify.assert_called_once_with(self.ctx, router_db=router_db,
# NOTE(slaweq): here we are just checking if mock_notify was called
# with kwargs which we are expecting, but we can't check exactly if
# router_db was object which we are expecting and because of that
# below we are checking if router_db used as argument in
# mock_notify call is the has same id as the one which we are
# expecting.
mock_notify.assert_called_once_with(self.ctx, router_db=mock.ANY,
port=mock.ANY,
interface_info=interface_info)
self._assert_mock_called_with_router(mock_notify, router_db.id)
def test_add_router_interface_csnat_ports_failure(self):
router_dict = {'name': 'test_router', 'admin_state_up': True,

View File

@ -54,57 +54,72 @@ class TestRevisionNumber(test_db_base_plugin_v2.NeutronDbPluginV2TestCase):
def _create_initial_revision(self, resource_uuid, resource_type,
revision_number=ovn_rn_db.INITIAL_REV_NUM,
may_exist=False):
with self.ctx.session.begin(subtransactions=True):
ovn_rn_db.create_initial_revision(
self.ctx, resource_uuid, resource_type,
revision_number=revision_number, may_exist=may_exist)
ovn_rn_db.create_initial_revision(
self.ctx, resource_uuid, resource_type,
revision_number=revision_number, may_exist=may_exist)
def test_bump_revision(self):
self._create_initial_revision(self.net['id'], ovn_rn_db.TYPE_NETWORKS)
self.net['revision_number'] = 123
ovn_rn_db.bump_revision(self.ctx, self.net,
ovn_rn_db.TYPE_NETWORKS)
row = ovn_rn_db.get_revision_row(self.ctx, self.net['id'])
self.assertEqual(123, row.revision_number)
with db_api.CONTEXT_WRITER.using(self.ctx):
self._create_initial_revision(self.net['id'],
ovn_rn_db.TYPE_NETWORKS)
self.net['revision_number'] = 123
ovn_rn_db.bump_revision(self.ctx, self.net,
ovn_rn_db.TYPE_NETWORKS)
row = ovn_rn_db.get_revision_row(self.ctx, self.net['id'])
self.assertEqual(123, row.revision_number)
def test_bump_older_revision(self):
self._create_initial_revision(self.net['id'], ovn_rn_db.TYPE_NETWORKS,
revision_number=124)
self.net['revision_number'] = 1
ovn_rn_db.bump_revision(self.ctx, self.net,
ovn_rn_db.TYPE_NETWORKS)
row = ovn_rn_db.get_revision_row(self.ctx, self.net['id'])
self.assertEqual(124, row.revision_number)
with db_api.CONTEXT_WRITER.using(self.ctx):
self._create_initial_revision(
self.net['id'], ovn_rn_db.TYPE_NETWORKS,
revision_number=124)
self.net['revision_number'] = 1
ovn_rn_db.bump_revision(self.ctx, self.net,
ovn_rn_db.TYPE_NETWORKS)
row = ovn_rn_db.get_revision_row(self.ctx, self.net['id'])
self.assertEqual(124, row.revision_number)
@mock.patch.object(ovn_rn_db.LOG, 'warning')
def test_bump_revision_row_not_found(self, mock_log):
self.net['revision_number'] = 123
ovn_rn_db.bump_revision(self.ctx, self.net, ovn_rn_db.TYPE_NETWORKS)
# Assert the revision number wasn't bumped
row = ovn_rn_db.get_revision_row(self.ctx, self.net['id'])
self.assertEqual(123, row.revision_number)
self.assertIn('No revision row found for', mock_log.call_args[0][0])
with db_api.CONTEXT_WRITER.using(self.ctx):
self.net['revision_number'] = 123
ovn_rn_db.bump_revision(self.ctx, self.net,
ovn_rn_db.TYPE_NETWORKS)
# Assert the revision number wasn't bumped
row = ovn_rn_db.get_revision_row(self.ctx, self.net['id'])
self.assertEqual(123, row.revision_number)
self.assertIn('No revision row found for',
mock_log.call_args[0][0])
def test_delete_revision(self):
self._create_initial_revision(self.net['id'], ovn_rn_db.TYPE_NETWORKS)
ovn_rn_db.delete_revision(self.ctx, self.net['id'],
ovn_rn_db.TYPE_NETWORKS)
row = ovn_rn_db.get_revision_row(self.ctx, self.net['id'])
self.assertIsNone(row)
with db_api.CONTEXT_WRITER.using(self.ctx):
self._create_initial_revision(self.net['id'],
ovn_rn_db.TYPE_NETWORKS)
ovn_rn_db.delete_revision(self.ctx, self.net['id'],
ovn_rn_db.TYPE_NETWORKS)
row = ovn_rn_db.get_revision_row(self.ctx, self.net['id'])
self.assertIsNone(row)
def test_create_initial_revision_may_exist_duplicated_entry(self):
args = (self.net['id'], ovn_rn_db.TYPE_NETWORKS)
self._create_initial_revision(*args)
# Assert DBDuplicateEntry is raised when may_exist is False (default)
self.assertRaises(db_exc.DBDuplicateEntry,
self._create_initial_revision, *args)
try:
self._create_initial_revision(*args, may_exist=True)
except db_exc.DBDuplicateEntry:
self.fail("create_initial_revision shouldn't raise "
"DBDuplicateEntry when may_exist is True")
with db_api.CONTEXT_WRITER.using(self.ctx):
args = (self.net['id'], ovn_rn_db.TYPE_NETWORKS)
self._create_initial_revision(*args)
# DBDuplicateEntry is raised when may_exist is False (default)
self._create_initial_revision(*args)
except Exception as exc:
if type(exc) is not db_exc.DBDuplicateEntry:
self.fail("create_initial_revision with the same parameters "
"should have raisen a DBDuplicateEntry exception")
with db_api.CONTEXT_WRITER.using(self.ctx):
args = (self.net['id'], ovn_rn_db.TYPE_NETWORKS)
self._create_initial_revision(*args)
try:
self._create_initial_revision(*args, may_exist=True)
except db_exc.DBDuplicateEntry:
self.fail("create_initial_revision shouldn't raise "
"DBDuplicateEntry when may_exist is True")
class TestMaintenancePlugin(test_securitygroup.SecurityGroupTestPlugin,
@ -149,7 +164,7 @@ class TestRevisionNumberMaintenance(test_securitygroup.SecurityGroupsTestCase,
def _create_initial_revision(self, resource_uuid, resource_type,
revision_number=ovn_rn_db.INITIAL_REV_NUM,
may_exist=False):
with self.ctx.session.begin(subtransactions=True):
with db_api.CONTEXT_WRITER.using(self.ctx):
ovn_rn_db.create_initial_revision(
self.ctx, resource_uuid, resource_type,
revision_number=revision_number, may_exist=may_exist)

View File

@ -821,8 +821,8 @@ class L3NatTestCaseBase(L3NatTestCaseMixin):
def test_router_update_gateway(self):
with self.router() as r:
with self.subnet() as s1:
with self.subnet() as s2:
with self.subnet(cidr='10.51.0.0/24') as s1:
with self.subnet(cidr='10.52.0.0/24') as s2:
self._set_net_external(s1['subnet']['network_id'])
self._add_external_gateway_to_router(
r['router']['id'],

View File

@ -14,6 +14,7 @@
# limitations under the License.
import copy
from unittest import mock
from neutron_lib.api.definitions import port_security as psec
from neutron_lib.api import validators
@ -28,6 +29,7 @@ from neutron.db import db_base_plugin_v2
from neutron.db import portsecurity_db
from neutron.db import securitygroups_db
from neutron.extensions import securitygroup as ext_sg
from neutron import quota
from neutron.tests.unit.db import test_db_base_plugin_v2
from neutron.tests.unit.extensions import test_securitygroup
@ -65,9 +67,9 @@ class PortSecurityTestPlugin(db_base_plugin_v2.NeutronDbPluginV2,
supported_extension_aliases = ["security-group", psec.ALIAS]
def create_network(self, context, network):
tenant_id = network['network'].get('tenant_id')
self._ensure_default_security_group(context, tenant_id)
with db_api.CONTEXT_WRITER.using(context):
tenant_id = network['network'].get('tenant_id')
self._ensure_default_security_group(context, tenant_id)
neutron_db = super(PortSecurityTestPlugin, self).create_network(
context, network)
neutron_db.update(network['network'])
@ -177,6 +179,14 @@ class PortSecurityDBTestCase(PortSecurityTestCase):
class TestPortSecurity(PortSecurityDBTestCase):
def setUp(self, plugin=None, service_plugins=None):
super().setUp(plugin)
make_res = mock.patch.object(quota.QuotaEngine, 'make_reservation')
commit_res = mock.patch.object(quota.QuotaEngine, 'commit_reservation')
self.mock_quota_make_res = make_res.start()
self.mock_quota_commit_res = commit_res.start()
def test_create_network_with_portsecurity_mac(self):
res = self._create_network('json', 'net1', True)
net = self.deserialize('json', res)

View File

@ -99,56 +99,61 @@ class CRUDScenarioTestCase(testlib_api.SqlTestCase):
self.obj_cls = network.Network
self.ctxt = context.get_admin_context()
def _compare_objs(self, obj1, obj2):
for field in (field for field in self.obj_cls.fields if
field not in ('updated_at', 'created_at')):
self.assertEqual(getattr(obj1, field, None),
getattr(obj2, field, None))
def test_get_object_with_None_value_in_filters(self):
obj = api.create_object(self.obj_cls, self.ctxt, {'name': 'foo'})
new_obj = api.get_object(
self.obj_cls, self.ctxt, name='foo', status=None)
self.assertEqual(obj, new_obj)
self._compare_objs(obj, new_obj)
def test_get_objects_with_None_value_in_filters(self):
obj = api.create_object(self.obj_cls, self.ctxt, {'name': 'foo'})
new_objs = api.get_objects(
self.obj_cls, self.ctxt, name='foo', status=None)
self.assertEqual(obj, new_objs[0])
self._compare_objs(obj, new_objs[0])
def test_get_objects_with_string_matching_filters_contains(self):
obj1 = api.create_object(
self.obj_cls, self.ctxt, {'name': 'obj_con_1'})
obj2 = api.create_object(
self.obj_cls, self.ctxt, {'name': 'obj_con_2'})
obj3 = api.create_object(
self.obj_cls, self.ctxt, {'name': 'obj_3'})
api.create_object(self.obj_cls, self.ctxt, {'name': 'obj_3'})
objs = api.get_objects(
self.obj_cls, self.ctxt, name=obj_utils.StringContains('con'))
self.obj_cls, self.ctxt, name=obj_utils.StringContains('con'),
_pager=base.Pager(sorts=[('name', True)]))
self.assertEqual(2, len(objs))
self.assertIn(obj1, objs)
self.assertIn(obj2, objs)
self.assertNotIn(obj3, objs)
self._compare_objs(obj1, objs[0])
self._compare_objs(obj2, objs[1])
def test_get_objects_with_string_matching_filters_starts(self):
obj1 = api.create_object(self.obj_cls, self.ctxt, {'name': 'pre_obj1'})
obj2 = api.create_object(self.obj_cls, self.ctxt, {'name': 'pre_obj2'})
obj3 = api.create_object(self.obj_cls, self.ctxt, {'name': 'obj_3'})
api.create_object(self.obj_cls, self.ctxt, {'name': 'obj_3'})
objs = api.get_objects(
self.obj_cls, self.ctxt, name=obj_utils.StringStarts('pre'))
self.obj_cls, self.ctxt, name=obj_utils.StringStarts('pre'),
_pager=base.Pager(sorts=[('name', True)]))
self.assertEqual(2, len(objs))
self.assertIn(obj1, objs)
self.assertIn(obj2, objs)
self.assertNotIn(obj3, objs)
self._compare_objs(obj1, objs[0])
self._compare_objs(obj2, objs[1])
def test_get_objects_with_string_matching_filters_ends(self):
obj1 = api.create_object(self.obj_cls, self.ctxt, {'name': 'obj1_end'})
obj2 = api.create_object(self.obj_cls, self.ctxt, {'name': 'obj2_end'})
obj3 = api.create_object(self.obj_cls, self.ctxt, {'name': 'obj_3'})
api.create_object(self.obj_cls, self.ctxt, {'name': 'obj_3'})
objs = api.get_objects(
self.obj_cls, self.ctxt, name=obj_utils.StringEnds('end'))
self.obj_cls, self.ctxt, name=obj_utils.StringEnds('end'),
_pager=base.Pager(sorts=[('name', True)]))
self.assertEqual(2, len(objs))
self.assertIn(obj1, objs)
self.assertIn(obj2, objs)
self.assertNotIn(obj3, objs)
self._compare_objs(obj1, objs[0])
self._compare_objs(obj2, objs[1])
def test_get_values_with_None_value_in_filters(self):
api.create_object(self.obj_cls, self.ctxt, {'name': 'foo'})
@ -201,15 +206,14 @@ class CRUDScenarioTestCase(testlib_api.SqlTestCase):
obj = api.create_object(self.obj_cls, self.ctxt, {'name': 'foo'})
new_obj = api.get_object(self.obj_cls, self.ctxt, id=obj.id)
self.assertEqual(obj, new_obj)
self._compare_objs(obj, new_obj)
obj = new_obj
obj.name = 'bar'
api.update_object(self.obj_cls, self.ctxt, {'name': 'bar'}, id=obj.id)
new_obj = api.get_object(self.obj_cls, self.ctxt, id=obj.id)
self.assertEqual(obj, new_obj)
self._compare_objs(obj, new_obj)
obj = new_obj
api.delete_object(self.obj_cls, self.ctxt, id=obj.id)
new_obj = api.get_object(self.obj_cls, self.ctxt, id=obj.id)

View File

@ -101,8 +101,6 @@ class FakeSmallNeutronObjectNewEngineFacade(base.NeutronDbObject):
db_model = ObjectFieldsModel
new_facade = True
primary_keys = ['field1']
foreign_keys = {
@ -1796,7 +1794,7 @@ class BaseDbObjectTestCase(_BaseObjectTestCase,
def test_get_objects_single_transaction(self):
with mock.patch(self._get_ro_txn_exit_func_name()) as mock_exit:
with db_api.autonested_transaction(self.context.session):
with db_api.CONTEXT_READER.using(self.context):
self._test_class.get_objects(self.context)
self.assertEqual(1, mock_exit.call_count)
@ -1811,7 +1809,7 @@ class BaseDbObjectTestCase(_BaseObjectTestCase,
obj.create()
with mock.patch(self._get_ro_txn_exit_func_name()) as mock_exit:
with db_api.autonested_transaction(self.context.session):
with db_api.CONTEXT_READER.using(self.context):
obj = self._test_class.get_object(self.context,
**obj._get_composite_keys())
self.assertEqual(1, mock_exit.call_count)

View File

@ -18,6 +18,7 @@ import netaddr
from neutron_lib.api.definitions import qos as qos_api
from neutron_lib import constants
from neutron_lib import context
from neutron_lib.db import api as db_api
from neutron_lib.services.qos import constants as qos_constants
from oslo_config import cfg
from oslo_utils import uuidutils
@ -435,7 +436,9 @@ class TestOVNClientQosExtension(test_plugin.Ml2PluginV2TestCase):
# We can't ensure the call order because we are not enforcing any order
# when retrieving the port and the network list.
self.mock_rules.assert_has_calls(calls, any_order=True)
fip = self.qos_driver._plugin_l3._make_floatingip_dict(self.fips[0])
with db_api.CONTEXT_READER.using(self.ctx):
fip = self.qos_driver._plugin_l3.get_floatingip(self.ctx,
self.fips[0].id)
mock_update_fip.assert_called_once_with(self.txn, fip)
def test_update_floatingip(self):

View File

@ -17,6 +17,7 @@ from unittest import mock
from futurist import periodics
from neutron_lib import context
from neutron_lib.db import api as db_api
from oslo_config import cfg
from neutron.common.ovn import constants
@ -139,39 +140,40 @@ class TestDBInconsistenciesPeriodics(testlib_api.SqlTestCaseLight,
never_again=False)
def _test_fix_create_update_network(self, ovn_rev, neutron_rev):
self.net['revision_number'] = neutron_rev
with db_api.CONTEXT_WRITER.using(self.ctx):
self.net['revision_number'] = neutron_rev
# Create an entry to the revision_numbers table and assert the
# initial revision_number for our test object is the expected
ovn_revision_numbers_db.create_initial_revision(
self.ctx, self.net['id'], constants.TYPE_NETWORKS,
revision_number=ovn_rev)
row = ovn_revision_numbers_db.get_revision_row(self.ctx,
self.net['id'])
self.assertEqual(ovn_rev, row.revision_number)
# Create an entry to the revision_numbers table and assert the
# initial revision_number for our test object is the expected
ovn_revision_numbers_db.create_initial_revision(
self.ctx, self.net['id'], constants.TYPE_NETWORKS,
revision_number=ovn_rev)
row = ovn_revision_numbers_db.get_revision_row(self.ctx,
self.net['id'])
self.assertEqual(ovn_rev, row.revision_number)
if ovn_rev < 0:
self.fake_ovn_client._nb_idl.get_lswitch.return_value = None
else:
fake_ls = mock.Mock(external_ids={
constants.OVN_REV_NUM_EXT_ID_KEY: ovn_rev})
self.fake_ovn_client._nb_idl.get_lswitch.return_value = fake_ls
if ovn_rev < 0:
self.fake_ovn_client._nb_idl.get_lswitch.return_value = None
else:
fake_ls = mock.Mock(external_ids={
constants.OVN_REV_NUM_EXT_ID_KEY: ovn_rev})
self.fake_ovn_client._nb_idl.get_lswitch.return_value = fake_ls
self.fake_ovn_client._plugin.get_network.return_value = self.net
self.periodic._fix_create_update(self.ctx, row)
self.fake_ovn_client._plugin.get_network.return_value = self.net
self.periodic._fix_create_update(self.ctx, row)
# Since the revision number was < 0, make sure create_network()
# is invoked with the latest version of the object in the neutron
# database
if ovn_rev < 0:
self.fake_ovn_client.create_network.assert_called_once_with(
self.ctx, self.net)
# If the revision number is > 0 it means that the object already
# exist and we just need to update to match the latest in the
# neutron database so, update_network() should be called.
else:
self.fake_ovn_client.update_network.assert_called_once_with(
self.ctx, self.net)
# Since the revision number was < 0, make sure create_network()
# is invoked with the latest version of the object in the neutron
# database
if ovn_rev < 0:
self.fake_ovn_client.create_network.assert_called_once_with(
self.ctx, self.net)
# If the revision number is > 0 it means that the object already
# exist and we just need to update to match the latest in the
# neutron database so, update_network() should be called.
else:
self.fake_ovn_client.update_network.assert_called_once_with(
self.ctx, self.net)
def test_fix_network_create(self):
self._test_fix_create_update_network(ovn_rev=-1, neutron_rev=2)
@ -180,40 +182,41 @@ class TestDBInconsistenciesPeriodics(testlib_api.SqlTestCaseLight,
self._test_fix_create_update_network(ovn_rev=5, neutron_rev=7)
def _test_fix_create_update_port(self, ovn_rev, neutron_rev):
self.port['revision_number'] = neutron_rev
_nb_idl = self.fake_ovn_client._nb_idl
with db_api.CONTEXT_WRITER.using(self.ctx):
self.port['revision_number'] = neutron_rev
# Create an entry to the revision_numbers table and assert the
# initial revision_number for our test object is the expected
ovn_revision_numbers_db.create_initial_revision(
self.ctx, self.port['id'], constants.TYPE_PORTS,
revision_number=ovn_rev)
row = ovn_revision_numbers_db.get_revision_row(self.ctx,
self.port['id'])
self.assertEqual(ovn_rev, row.revision_number)
# Create an entry to the revision_numbers table and assert the
# initial revision_number for our test object is the expected
ovn_revision_numbers_db.create_initial_revision(
self.ctx, self.port['id'], constants.TYPE_PORTS,
revision_number=ovn_rev)
row = ovn_revision_numbers_db.get_revision_row(self.ctx,
self.port['id'])
self.assertEqual(ovn_rev, row.revision_number)
if ovn_rev < 0:
self.fake_ovn_client._nb_idl.get_lswitch_port.return_value = None
else:
fake_lsp = mock.Mock(external_ids={
constants.OVN_REV_NUM_EXT_ID_KEY: ovn_rev})
self.fake_ovn_client._nb_idl.get_lswitch_port.return_value = (
fake_lsp)
if ovn_rev < 0:
_nb_idl.get_lswitch_port.return_value = None
else:
fake_lsp = mock.Mock(external_ids={
constants.OVN_REV_NUM_EXT_ID_KEY: ovn_rev})
_nb_idl.get_lswitch_port.return_value = fake_lsp
self.fake_ovn_client._plugin.get_port.return_value = self.port
self.periodic._fix_create_update(self.ctx, row)
self.fake_ovn_client._plugin.get_port.return_value = self.port
self.periodic._fix_create_update(self.ctx, row)
# Since the revision number was < 0, make sure create_port()
# is invoked with the latest version of the object in the neutron
# database
if ovn_rev < 0:
self.fake_ovn_client.create_port.assert_called_once_with(
self.ctx, self.port)
# If the revision number is > 0 it means that the object already
# exist and we just need to update to match the latest in the
# neutron database so, update_port() should be called.
else:
self.fake_ovn_client.update_port.assert_called_once_with(
self.ctx, self.port)
# Since the revision number was < 0, make sure create_port()
# is invoked with the latest version of the object in the neutron
# database
if ovn_rev < 0:
self.fake_ovn_client.create_port.assert_called_once_with(
self.ctx, self.port)
# If the revision number is > 0 it means that the object already
# exist and we just need to update to match the latest in the
# neutron database so, update_port() should be called.
else:
self.fake_ovn_client.update_port.assert_called_once_with(
self.ctx, self.port)
def test_fix_port_create(self):
self._test_fix_create_update_port(ovn_rev=-1, neutron_rev=2)
@ -223,14 +226,16 @@ class TestDBInconsistenciesPeriodics(testlib_api.SqlTestCaseLight,
@mock.patch.object(ovn_revision_numbers_db, 'bump_revision')
def _test_fix_security_group_create(self, mock_bump, revision_number):
sg_name = utils.ovn_addrset_name('fake_id', 'ip4')
sg = self._make_security_group(self.fmt, sg_name, '')['security_group']
with db_api.CONTEXT_WRITER.using(self.ctx):
sg_name = utils.ovn_addrset_name('fake_id', 'ip4')
sg = self._make_security_group(
self.fmt, sg_name, '')['security_group']
ovn_revision_numbers_db.create_initial_revision(
self.ctx, sg['id'], constants.TYPE_SECURITY_GROUPS,
revision_number=revision_number)
row = ovn_revision_numbers_db.get_revision_row(self.ctx, sg['id'])
self.assertEqual(revision_number, row.revision_number)
ovn_revision_numbers_db.create_initial_revision(
self.ctx, sg['id'], constants.TYPE_SECURITY_GROUPS,
revision_number=revision_number)
row = ovn_revision_numbers_db.get_revision_row(self.ctx, sg['id'])
self.assertEqual(revision_number, row.revision_number)
if revision_number < 0:
self.fake_ovn_client._nb_idl.get_address_set.return_value = None