New enginefacade for networks, subnets.

Usage reader and writer for db operations.

In Neutron code we have common situation like:

with context.session.begin():
   context.session.add(obj)

self._make_obj_dict(obj)

With new enginefacade we change context.session.begin() for
db.context_manager.writer(reader).using(context).

When object leaves this with-block, its reference to session is
cleared because session is discarded. To use this object later to
load some data from its dependencies, we have to provide different
session to work in. To solve this obj either can be moved under
with-block or we have to do context.session.add(obj) one more time
to be able to load relations.

This change also switches to usage of new enginefacade for some db
operations with ports, in order to pass unit and functional tests.

Partially-Implements blueprint: enginefacade-switch

Change-Id: Ia15c63f94d2c67791da3b65546e59f6929c8c685
This commit is contained in:
Ann Kamyshnikova 2016-11-24 17:16:14 +03:00 committed by Kevin Benton
parent b02bbf8ba5
commit cf34df8572
16 changed files with 184 additions and 141 deletions

View File

@ -322,14 +322,12 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon,
objects = []
collection = "%ss" % resource
items = request_items[collection]
context.session.begin(subtransactions=True)
try:
for item in items:
obj_creator = getattr(self, 'create_%s' % resource)
objects.append(obj_creator(context, item))
context.session.commit()
with db_api.context_manager.writer.using(context):
for item in items:
obj_creator = getattr(self, 'create_%s' % resource)
objects.append(obj_creator(context, item))
except Exception:
context.session.rollback()
with excutils.save_and_reraise_exception():
LOG.error(_LE("An exception occurred while creating "
"the %(resource)s:%(item)s"),
@ -350,7 +348,7 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon,
def create_network_db(self, context, network):
# single request processing
n = network['network']
with context.session.begin(subtransactions=True):
with db_api.context_manager.writer.using(context):
args = {'tenant_id': n['tenant_id'],
'id': n.get('id') or uuidutils.generate_uuid(),
'name': n['name'],
@ -369,7 +367,7 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon,
@db_api.retry_if_session_inactive()
def update_network(self, context, id, network):
n = network['network']
with context.session.begin(subtransactions=True):
with db_api.context_manager.writer.using(context):
network = self._get_network(context, id)
# validate 'shared' parameter
if 'shared' in n:
@ -408,9 +406,10 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon,
registry.notify(resources.NETWORK, events.BEFORE_DELETE, self,
context=context, network_id=id)
self._ensure_network_not_in_use(context, id)
auto_delete_port_ids = [p.id for p in context.session.query(
models_v2.Port.id).filter_by(network_id=id).filter(
models_v2.Port.device_owner.in_(AUTO_DELETE_PORT_OWNERS))]
with db_api.context_manager.reader.using(context):
auto_delete_port_ids = [p.id for p in context.session.query(
models_v2.Port.id).filter_by(network_id=id).filter(
models_v2.Port.device_owner.in_(AUTO_DELETE_PORT_OWNERS))]
for port_id in auto_delete_port_ids:
self.delete_port(context.elevated(), port_id)
# clean up subnets
@ -420,7 +419,7 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon,
# cleanup if a network-owned port snuck in without failing
for subnet in subnets:
self.delete_subnet(context, subnet['id'])
with context.session.begin(subtransactions=True):
with db_api.context_manager.writer.using(context):
network_db = self._get_network(context, id)
network = self._make_network_dict(network_db, context=context)
registry.notify(resources.NETWORK, events.PRECOMMIT_DELETE,
@ -739,7 +738,7 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon,
raise exc.BadRequest(resource='subnets', msg=msg)
self._validate_subnet(context, s)
with context.session.begin(subtransactions=True):
with db_api.context_manager.writer.using(context):
network = self._get_network(context,
subnet['subnet']['network_id'])
subnet, ipam_subnet = self.ipam.allocate_subnet(context,
@ -823,13 +822,16 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon,
registry.notify(resources.SUBNET_GATEWAY, events.BEFORE_UPDATE,
self, **kwargs)
with context.session.begin(subtransactions=True):
with db_api.context_manager.writer.using(context):
subnet, changes = self.ipam.update_db_subnet(context, id, s,
db_pools)
# we expire here since ipam may have made changes to relationships
# that will be stale on any subsequent lookups while the subnet object
# is in the session otherwise.
context.session.expire(subnet)
# that will be stale on any subsequent lookups while the subnet
# object is in the session otherwise.
# Check if subnet attached to session before expire.
if subnet in context.session:
context.session.expire(subnet)
return self._make_subnet_dict(subnet, context=context)
@property
@ -962,7 +964,7 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon,
self._remove_subnet_ip_allocations_from_ports(context, id)
# retry integrity errors to catch ip allocation races
with db_api.exc_to_retry(sql_exc.IntegrityError), \
context.session.begin(subtransactions=True):
db_api.context_manager.writer.using(context):
subnet_db = self._get_subnet(context, id)
subnet = self._make_subnet_dict(subnet_db, context=context)
registry.notify(resources.SUBNET, events.PRECOMMIT_DELETE,
@ -1098,7 +1100,7 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon,
def update_subnetpool(self, context, id, subnetpool):
new_sp = subnetpool['subnetpool']
with context.session.begin(subtransactions=True):
with db_api.context_manager.writer.using(context):
orig_sp = self._get_subnetpool(context, id=id)
updated = _update_subnetpool_dict(orig_sp, new_sp)
reader = subnet_alloc.SubnetPoolReader(updated)
@ -1161,7 +1163,7 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon,
@db_api.retry_if_session_inactive()
def delete_subnetpool(self, context, id):
with context.session.begin(subtransactions=True):
with db_api.context_manager.writer.using(context):
subnetpool = self._get_subnetpool(context, id=id)
subnets = self._get_subnets_by_subnetpool(context, id)
if subnets:

View File

@ -17,6 +17,7 @@ import netaddr
import six
from sqlalchemy import func
from neutron.db import api as db_api
import neutron.db.models_v2 as mod
NETWORK_ID = 'network_id'
@ -84,6 +85,7 @@ class IpAvailabilityMixin(object):
return net_ip_availabilities
@classmethod
@db_api.context_manager.reader
def _build_network_used_ip_query(cls, context, filters):
# Generate a query to gather network/subnet/used_ips.
# Ensure query is tolerant of missing child table data (outerjoins)
@ -100,6 +102,7 @@ class IpAvailabilityMixin(object):
return cls._adjust_query_for_filters(query, filters)
@classmethod
@db_api.context_manager.reader
def _build_total_ips_query(cls, context, filters):
query = context.session.query()
query = query.add_columns(*cls.total_ips_columns)

View File

@ -18,6 +18,7 @@ from neutron._i18n import _LI
from neutron.callbacks import events
from neutron.callbacks import registry
from neutron.callbacks import resources
from neutron.db import api as db_api
from neutron.db.models import segment as segments_model
LOG = logging.getLogger(__name__)
@ -37,7 +38,7 @@ def _make_segment_dict(record):
def add_network_segment(context, network_id, segment, segment_index=0,
is_dynamic=False):
with context.session.begin(subtransactions=True):
with db_api.context_manager.writer.using(context):
record = segments_model.NetworkSegment(
id=uuidutils.generate_uuid(),
network_id=network_id,
@ -70,7 +71,7 @@ def get_networks_segments(context, network_ids, filter_dynamic=False):
if not network_ids:
return {}
with context.session.begin(subtransactions=True):
with db_api.context_manager.reader.using(context):
query = (context.session.query(segments_model.NetworkSegment).
filter(segments_model.NetworkSegment.network_id
.in_(network_ids)).
@ -85,7 +86,7 @@ def get_networks_segments(context, network_ids, filter_dynamic=False):
def get_segment_by_id(context, segment_id):
with context.session.begin(subtransactions=True):
with db_api.context_manager.reader.using(context):
try:
record = (context.session.query(segments_model.NetworkSegment).
filter_by(id=segment_id).
@ -98,7 +99,7 @@ def get_segment_by_id(context, segment_id):
def get_dynamic_segment(context, network_id, physical_network=None,
segmentation_id=None):
"""Return a dynamic segment for the filters provided if one exists."""
with context.session.begin(subtransactions=True):
with db_api.context_manager.reader.using(context):
query = (context.session.query(segments_model.NetworkSegment).
filter_by(network_id=network_id, is_dynamic=True))
if physical_network:
@ -122,6 +123,6 @@ def get_dynamic_segment(context, network_id, physical_network=None,
def delete_network_segment(context, segment_id):
"""Release a dynamic segment for the params provided if one exists."""
with context.session.begin(subtransactions=True):
with db_api.context_manager.writer.using(context):
(context.session.query(segments_model.NetworkSegment).
filter_by(id=segment_id).delete())

View File

@ -201,6 +201,8 @@ class NeutronDbSubnet(ipam_base.Subnet):
# The only defined status at this stage is 'ALLOCATED'.
# More states will be available in the future - e.g.: RECYCLABLE
try:
# TODO(ataraday): revisit this after objects switched to
# new enginefacade
with self._context.session.begin(subtransactions=True):
# NOTE(kevinbenton): we use a subtransaction to force
# a flush here so we can capture DBReferenceErrors due

View File

@ -24,6 +24,7 @@ from oslo_utils import uuidutils
from neutron._i18n import _
from neutron.common import exceptions as n_exc
from neutron.db import api as db_api
from neutron.db import models_v2
from neutron.ipam import driver
from neutron.ipam import exceptions as ipam_exc
@ -49,9 +50,10 @@ class SubnetAllocator(driver.Pool):
subnetpool, it's required to ensure non-overlapping cidrs in the same
subnetpool.
"""
current_hash = (self._context.session.query(models_v2.SubnetPool.hash)
.filter_by(id=self._subnetpool['id']).scalar())
with db_api.context_manager.reader.using(self._context):
current_hash = (
self._context.session.query(models_v2.SubnetPool.hash)
.filter_by(id=self._subnetpool['id']).scalar())
if current_hash is None:
# NOTE(cbrandily): subnetpool has been deleted
raise n_exc.SubnetPoolNotFound(
@ -61,17 +63,21 @@ class SubnetAllocator(driver.Pool):
# NOTE(cbrandily): the update disallows 2 concurrent subnet allocation
# to succeed: at most 1 transaction will succeed, others will be
# rolled back and be caught in neutron.db.v2.base
query = self._context.session.query(models_v2.SubnetPool).filter_by(
id=self._subnetpool['id'], hash=current_hash)
count = query.update({'hash': new_hash})
with db_api.context_manager.writer.using(self._context):
query = (
self._context.session.query(models_v2.SubnetPool).filter_by(
id=self._subnetpool['id'], hash=current_hash))
count = query.update({'hash': new_hash})
if not count:
raise db_exc.RetryRequest(lib_exc.SubnetPoolInUse(
subnet_pool_id=self._subnetpool['id']))
def _get_allocated_cidrs(self):
query = self._context.session.query(models_v2.Subnet)
subnets = query.filter_by(subnetpool_id=self._subnetpool['id'])
return (x.cidr for x in subnets)
with db_api.context_manager.reader.using(self._context):
query = self._context.session.query(models_v2.Subnet)
subnets = query.filter_by(subnetpool_id=self._subnetpool['id'])
return (x.cidr for x in subnets)
def _get_available_prefix_list(self):
prefixes = (x.cidr for x in self._subnetpool.prefixes)
@ -90,7 +96,7 @@ class SubnetAllocator(driver.Pool):
def _allocations_used_by_tenant(self, quota_unit):
subnetpool_id = self._subnetpool['id']
tenant_id = self._subnetpool['tenant_id']
with self._context.session.begin(subtransactions=True):
with db_api.context_manager.reader.using(self._context):
qry = self._context.session.query(models_v2.Subnet)
allocations = qry.filter_by(subnetpool_id=subnetpool_id,
tenant_id=tenant_id)
@ -115,7 +121,7 @@ class SubnetAllocator(driver.Pool):
raise n_exc.SubnetPoolQuotaExceeded()
def _allocate_any_subnet(self, request):
with self._context.session.begin(subtransactions=True):
with db_api.context_manager.writer.using(self._context):
self._lock_subnetpool()
self._check_subnetpool_tenant_quota(request.tenant_id,
request.prefixlen)
@ -139,7 +145,7 @@ class SubnetAllocator(driver.Pool):
str(request.prefixlen))
def _allocate_specific_subnet(self, request):
with self._context.session.begin(subtransactions=True):
with db_api.context_manager.writer.using(self._context):
self._lock_subnetpool()
self._check_subnetpool_tenant_quota(request.tenant_id,
request.prefixlen)

View File

@ -27,6 +27,7 @@ from neutron._i18n import _, _LE
from neutron.callbacks import events
from neutron.callbacks import registry
from neutron.callbacks import resources
from neutron.db import api as db_api
from neutron.db.models import securitygroup as sg_models
from neutron.db import models_v2
from neutron.extensions import portbindings
@ -40,13 +41,13 @@ LOG = log.getLogger(__name__)
MAX_PORTS_PER_QUERY = 500
@db_api.context_manager.writer
def add_port_binding(context, port_id):
with context.session.begin(subtransactions=True):
record = models.PortBinding(
port_id=port_id,
vif_type=portbindings.VIF_TYPE_UNBOUND)
context.session.add(record)
return record
record = models.PortBinding(
port_id=port_id,
vif_type=portbindings.VIF_TYPE_UNBOUND)
context.session.add(record)
return record
@removals.remove(
@ -77,6 +78,7 @@ def get_locked_port_and_binding(context, port_id):
return None, None
@db_api.context_manager.writer
def set_binding_levels(context, levels):
if levels:
for level in levels:
@ -90,6 +92,7 @@ def set_binding_levels(context, levels):
LOG.debug("Attempted to set empty binding levels")
@db_api.context_manager.reader
def get_binding_levels(context, port_id, host):
if host:
result = (context.session.query(models.PortBindingLevel).
@ -104,6 +107,7 @@ def get_binding_levels(context, port_id, host):
return result
@db_api.context_manager.writer
def clear_binding_levels(context, port_id, host):
if host:
(context.session.query(models.PortBindingLevel).
@ -116,13 +120,14 @@ def clear_binding_levels(context, port_id, host):
def ensure_distributed_port_binding(context, port_id, host, router_id=None):
record = (context.session.query(models.DistributedPortBinding).
filter_by(port_id=port_id, host=host).first())
with db_api.context_manager.reader.using(context):
record = (context.session.query(models.DistributedPortBinding).
filter_by(port_id=port_id, host=host).first())
if record:
return record
try:
with context.session.begin(subtransactions=True):
with db_api.context_manager.writer.using(context):
record = models.DistributedPortBinding(
port_id=port_id,
host=host,
@ -134,13 +139,14 @@ def ensure_distributed_port_binding(context, port_id, host, router_id=None):
return record
except db_exc.DBDuplicateEntry:
LOG.debug("Distributed Port %s already bound", port_id)
return (context.session.query(models.DistributedPortBinding).
filter_by(port_id=port_id, host=host).one())
with db_api.context_manager.reader.using(context):
return (context.session.query(models.DistributedPortBinding).
filter_by(port_id=port_id, host=host).one())
def delete_distributed_port_binding_if_stale(context, binding):
if not binding.router_id and binding.status == n_const.PORT_STATUS_DOWN:
with context.session.begin(subtransactions=True):
with db_api.context_manager.writer.using(context):
LOG.debug("Distributed port: Deleting binding %s", binding)
context.session.delete(binding)
@ -148,10 +154,12 @@ def delete_distributed_port_binding_if_stale(context, binding):
def get_port(context, port_id):
"""Get port record for update within transaction."""
with context.session.begin(subtransactions=True):
with db_api.context_manager.reader.using(context):
try:
# Set enable_eagerloads to True, so that lazy load can be
# proceed later.
record = (context.session.query(models_v2.Port).
enable_eagerloads(False).
enable_eagerloads(True).
filter(models_v2.Port.id.startswith(port_id)).
one())
return record
@ -163,6 +171,7 @@ def get_port(context, port_id):
return
@db_api.context_manager.reader
def get_port_from_device_mac(context, device_mac):
LOG.debug("get_port_from_device_mac() called for mac %s", device_mac)
ports = port_obj.Port.get_objects(context, mac_address=device_mac)
@ -194,7 +203,7 @@ def get_sg_ids_grouped_by_port(context, port_ids):
sg_ids_grouped_by_port = {}
sg_binding_port = sg_models.SecurityGroupPortBinding.port_id
with context.session.begin(subtransactions=True):
with db_api.context_manager.reader.using(context):
# partial UUIDs must be individually matched with startswith.
# full UUIDs may be matched directly in an IN statement
partial_uuids = set(port_id for port_id in port_ids
@ -233,7 +242,7 @@ def make_port_dict_with_security_groups(port, sec_groups):
def get_port_binding_host(context, port_id):
try:
with context.session.begin(subtransactions=True):
with db_api.context_manager.reader.using(context):
query = (context.session.query(models.PortBinding).
filter(models.PortBinding.port_id.startswith(port_id)).
one())
@ -248,6 +257,7 @@ def get_port_binding_host(context, port_id):
return query.host
@db_api.context_manager.reader
def generate_distributed_port_status(context, port_id):
# an OR'ed value of status assigned to parent port from the
# distributedportbinding bucket
@ -262,7 +272,7 @@ def generate_distributed_port_status(context, port_id):
def get_distributed_port_binding_by_host(context, port_id, host):
with context.session.begin(subtransactions=True):
with db_api.context_manager.reader.using(context):
binding = (context.session.query(models.DistributedPortBinding).
filter(models.DistributedPortBinding.port_id.startswith(port_id),
models.DistributedPortBinding.host == host).first())
@ -273,7 +283,7 @@ def get_distributed_port_binding_by_host(context, port_id, host):
def get_distributed_port_bindings(context, port_id):
with context.session.begin(subtransactions=True):
with db_api.context_manager.reader.using(context):
bindings = (context.session.query(models.DistributedPortBinding).
filter(models.DistributedPortBinding.port_id.startswith(
port_id)).all())
@ -282,6 +292,7 @@ def get_distributed_port_bindings(context, port_id):
return bindings
@db_api.context_manager.reader
def is_dhcp_active_on_any_subnet(context, subnet_ids):
if not subnet_ids:
return False
@ -297,13 +308,15 @@ def _prevent_segment_delete_with_port_bound(resource, event, trigger,
if for_net_delete:
# don't check for network deletes
return
segment_id = segment['id']
query = context.session.query(models_v2.Port)
query = query.join(
models.PortBindingLevel,
models.PortBindingLevel.port_id == models_v2.Port.id)
query = query.filter(models.PortBindingLevel.segment_id == segment_id)
port_ids = [p.id for p in query]
with db_api.context_manager.reader.using(context):
segment_id = segment['id']
query = context.session.query(models_v2.Port)
query = query.join(
models.PortBindingLevel,
models.PortBindingLevel.port_id == models_v2.Port.id)
query = query.filter(models.PortBindingLevel.segment_id == segment_id)
port_ids = [p.id for p in query]
# There are still some ports in the segment, segment should not be deleted
# TODO(xiaohhui): Should we delete the dhcp port automatically here?

View File

@ -190,8 +190,7 @@ class TypeManager(stevedore.named.NamedExtensionManager):
def create_network_segments(self, context, network, tenant_id):
"""Call type drivers to create network segments."""
segments = self._process_provider_create(network)
session = context.session
with session.begin(subtransactions=True):
with db_api.context_manager.writer.using(context):
network_id = network['id']
if segments:
for segment_index, segment in enumerate(segments):
@ -224,7 +223,7 @@ class TypeManager(stevedore.named.NamedExtensionManager):
self.validate_provider_segment(segment)
# Reserve segment in type driver
with context.session.begin(subtransactions=True):
with db_api.context_manager.writer.using(context):
return self.reserve_provider_segment(context, segment)
def is_partial_segment(self, segment):

View File

@ -433,7 +433,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
# After we've attempted to bind the port, we begin a
# transaction, get the current port state, and decide whether
# to commit the binding results.
with plugin_context.session.begin(subtransactions=True):
with db_api.context_manager.writer.using(plugin_context):
# Get the current port state and build a new PortContext
# reflecting this state as original state for subsequent
# mechanism driver update_port_*commit() calls.
@ -594,17 +594,24 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
attributes.SUBNETS, ['_ml2_md_extend_subnet_dict'])
def _ml2_md_extend_network_dict(self, result, netdb):
session = sqlalchemy.inspect(netdb).session
session = self._object_session_or_new_session(netdb)
self.extension_manager.extend_network_dict(session, netdb, result)
def _ml2_md_extend_port_dict(self, result, portdb):
session = sqlalchemy.inspect(portdb).session
session = self._object_session_or_new_session(portdb)
self.extension_manager.extend_port_dict(session, portdb, result)
def _ml2_md_extend_subnet_dict(self, result, subnetdb):
session = sqlalchemy.inspect(subnetdb).session
session = self._object_session_or_new_session(subnetdb)
self.extension_manager.extend_subnet_dict(session, subnetdb, result)
@staticmethod
def _object_session_or_new_session(sql_obj):
session = sqlalchemy.inspect(sql_obj).session
if not session:
session = db_api.get_reader_session()
return session
# Note - The following hook methods have "ml2" in their names so
# that they are not called twice during unit tests due to global
# registration of hooks in portbindings_db.py used by other
@ -656,7 +663,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
objects = []
collection = "%ss" % resource
items = request_items[collection]
with context.session.begin(subtransactions=True):
with db_api.context_manager.writer.using(context):
obj_creator = getattr(self, '_create_%s_db' % resource)
for item in items:
try:
@ -733,10 +740,9 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
def _create_network_db(self, context, network):
net_data = network[attributes.NETWORK]
tenant_id = net_data['tenant_id']
session = context.session
registry.notify(resources.NETWORK, events.BEFORE_CREATE, self,
context=context, network=net_data)
with session.begin(subtransactions=True):
with db_api.context_manager.writer.using(context):
net_db = self.create_network_db(context, network)
result = self._make_network_dict(net_db, process_extensions=False,
context=context)
@ -766,7 +772,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
net_db[az_ext.AZ_HINTS] = az_hints
result[az_ext.AZ_HINTS] = az_hints
self._apply_dict_extend_functions('networks', result, net_db)
self._apply_dict_extend_functions('networks', result, net_db)
return result, mech_context
@utils.transaction_guard
@ -797,8 +803,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
net_data = network[attributes.NETWORK]
provider._raise_if_updates_provider_attributes(net_data)
session = context.session
with session.begin(subtransactions=True):
with db_api.context_manager.writer.using(context):
original_network = super(Ml2Plugin, self).get_network(context, id)
updated_network = super(Ml2Plugin, self).update_network(context,
id,
@ -841,8 +846,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
@db_api.retry_if_session_inactive()
def get_network(self, context, id, fields=None):
session = context.session
with session.begin(subtransactions=True):
with db_api.context_manager.reader.using(context):
result = super(Ml2Plugin, self).get_network(context, id, None)
self.type_manager.extend_network_dict_provider(context, result)
result[api.MTU] = self._get_network_mtu(result)
@ -852,8 +856,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
@db_api.retry_if_session_inactive()
def get_networks(self, context, filters=None, fields=None,
sorts=None, limit=None, marker=None, page_reverse=False):
session = context.session
with session.begin(subtransactions=True):
with db_api.context_manager.reader.using(context):
nets = super(Ml2Plugin,
self).get_networks(context, filters, None, sorts,
limit, marker, page_reverse)
@ -900,8 +903,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
self.notifier.network_delete(context, network['id'])
def _create_subnet_db(self, context, subnet):
session = context.session
with session.begin(subtransactions=True):
with db_api.context_manager.writer.using(context):
result, net_db, ipam_sub = self._create_subnet_precommit(
context, subnet)
self.extension_manager.process_create_subnet(
@ -912,6 +914,11 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
mech_context = driver_context.SubnetContext(self, context,
result, network)
self.mechanism_manager.create_subnet_precommit(mech_context)
# TODO(ataraday): temporary flush until this will be available
# in oslo.db (change https://review.openstack.org/#/c/433758/
# got merged)
context.session.flush()
# db base plugin post commit ops
self._create_subnet_postcommit(context, result, net_db, ipam_sub)
@ -941,8 +948,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
@utils.transaction_guard
@db_api.retry_if_session_inactive()
def update_subnet(self, context, id, subnet):
session = context.session
with session.begin(subtransactions=True):
with db_api.context_manager.writer.using(context):
original_subnet = self.get_subnet(context, id)
updated_subnet = self._update_subnet_precommit(
context, id, subnet)
@ -1042,8 +1048,10 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
registry.notify(resources.PORT, events.BEFORE_CREATE, self,
context=context, port=attrs)
session = context.session
with session.begin(subtransactions=True):
# NOTE(kevinbenton): triggered outside of transaction since it
# emits 'AFTER' events if it creates.
self._ensure_default_security_group(context, attrs['tenant_id'])
with db_api.context_manager.writer.using(context):
dhcp_opts = attrs.get(edo_ext.EXTRADHCPOPTS, [])
port_db = self.create_port_db(context, port)
result = self._make_port_dict(port_db, process_extensions=False)
@ -1180,10 +1188,8 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
def update_port(self, context, id, port):
attrs = port[attributes.PORT]
need_port_update_notify = False
session = context.session
bound_mech_contexts = []
with session.begin(subtransactions=True):
with db_api.context_manager.writer.using(context):
port_db = self._get_port(context, id)
binding = port_db.port_binding
if not binding:
@ -1343,7 +1349,6 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
LOG.error(_LE("No Host supplied to bind DVR Port %s"), id)
return
session = context.session
binding = db.get_distributed_port_binding_by_host(context,
id, host)
device_id = attrs and attrs.get('device_id')
@ -1353,7 +1358,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
router_id != device_id)
if update_required:
try:
with session.begin(subtransactions=True):
with db_api.context_manager.writer.using(context):
orig_port = self.get_port(context, id)
if not binding:
binding = db.ensure_distributed_port_binding(
@ -1398,8 +1403,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
router_ids = []
l3plugin = directory.get_plugin(const.L3)
session = context.session
with session.begin(subtransactions=True):
with db_api.context_manager.writer.using(context):
try:
port_db = self._get_port(context, id)
binding = port_db.port_binding
@ -1466,8 +1470,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
@db_api.retry_if_session_inactive(context_var_name='plugin_context')
def get_bound_port_context(self, plugin_context, port_id, host=None,
cached_networks=None):
session = plugin_context.session
with session.begin(subtransactions=True):
with db_api.context_manager.reader.using(plugin_context) as session:
try:
port_db = (session.query(models_v2.Port).
enable_eagerloads(False).
@ -1527,8 +1530,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
one was already performed by the caller.
"""
updated = False
session = context.session
with session.begin(subtransactions=True):
with db_api.context_manager.writer.using(context):
port = db.get_port(context, port_id)
if not port:
LOG.debug("Port %(port)s update to %(val)s by agent not found",
@ -1560,7 +1562,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
if (updated and
port['device_owner'] == const.DEVICE_OWNER_DVR_INTERFACE):
with session.begin(subtransactions=True):
with db_api.context_manager.writer.using(context):
port = db.get_port(context, port_id)
if not port:
LOG.warning(_LW("Port %s not found during update"),

View File

@ -77,7 +77,7 @@ class SegmentDbMixin(common_db_mixin.CommonDbMixin):
return self._make_segment_dict(new_segment)
def _create_segment_db(self, context, segment_id, segment):
with context.session.begin(subtransactions=True):
with db_api.context_manager.writer.using(context):
network_id = segment['network_id']
physical_network = segment[extension.PHYSICAL_NETWORK]
if physical_network == constants.ATTR_NOT_SPECIFIED:
@ -125,7 +125,7 @@ class SegmentDbMixin(common_db_mixin.CommonDbMixin):
def update_segment(self, context, uuid, segment):
"""Update an existing segment."""
segment = segment['segment']
with context.session.begin(subtransactions=True):
with db_api.context_manager.writer.using(context):
curr_segment = self._get_segment(context, uuid)
curr_segment.update(segment)
return self._make_segment_dict(curr_segment)
@ -175,7 +175,7 @@ class SegmentDbMixin(common_db_mixin.CommonDbMixin):
segment=segment, for_net_delete=for_net_delete)
# Delete segment in DB
with context.session.begin(subtransactions=True):
with db_api.context_manager.writer.using(context):
query = self._model_query(context, segment_model.NetworkSegment)
query = query.filter(segment_model.NetworkSegment.id == uuid)
if 0 == query.delete():
@ -191,7 +191,7 @@ class SegmentDbMixin(common_db_mixin.CommonDbMixin):
def update_segment_host_mapping(context, host, current_segment_ids):
with context.session.begin(subtransactions=True):
with db_api.context_manager.writer.using(context):
segment_host_mapping = network.SegmentHostMapping.get_objects(
context, host=host)
previous_segment_ids = {
@ -241,7 +241,7 @@ def get_segments_with_phys_nets(context, phys_nets):
if not phys_nets:
return []
with context.session.begin(subtransactions=True):
with db_api.context_manager.reader.using(context):
segments = context.session.query(segment_model.NetworkSegment).filter(
segment_model.NetworkSegment.physical_network.in_(phys_nets))
return segments
@ -249,7 +249,7 @@ def get_segments_with_phys_nets(context, phys_nets):
def map_segment_to_hosts(context, segment_id, hosts):
"""Map segment to a collection of hosts."""
with db_api.autonested_transaction(context.session):
with db_api.context_manager.writer.using(context):
for host in hosts:
network.SegmentHostMapping(
context, segment_id=segment_id, host=host).create()

View File

@ -30,6 +30,7 @@ from neutron.callbacks import registry
from neutron.callbacks import resources
from neutron.common import exceptions as n_exc
from neutron.db import _resource_extend as resource_extend
from neutron.db import api as db_api
from neutron.db.models import segment as segment_model
from neutron.db import models_v2
from neutron.extensions import ip_allocation
@ -102,14 +103,17 @@ class Plugin(db.SegmentDbMixin, segment.SegmentPluginBase):
if for_net_delete:
# don't check if this is a part of a network delete operation
return
segment_id = segment['id']
query = context.session.query(models_v2.Subnet.id)
query = query.filter(models_v2.Subnet.segment_id == segment_id)
subnet_ids = [s[0] for s in query]
with db_api.context_manager.reader.using(context):
segment_id = segment['id']
query = context.session.query(models_v2.Subnet.id)
query = query.filter(models_v2.Subnet.segment_id == segment_id)
subnet_ids = [s[0] for s in query]
if subnet_ids:
reason = _("The segment is still associated with subnet(s) "
"%s") % ", ".join(subnet_ids)
raise exceptions.SegmentInUse(segment_id=segment_id, reason=reason)
raise exceptions.SegmentInUse(segment_id=segment_id,
reason=reason)
class Event(object):

View File

@ -6295,7 +6295,7 @@ class DbModelMixin(object):
class DbModelTenantTestCase(DbModelMixin, testlib_api.SqlTestCase):
def _make_network(self, ctx):
with ctx.session.begin():
with db_api.context_manager.writer.using(ctx):
network = models_v2.Network(name="net_net", status="OK",
tenant_id='dbcheck',
admin_state_up=True)
@ -6303,7 +6303,7 @@ class DbModelTenantTestCase(DbModelMixin, testlib_api.SqlTestCase):
return network
def _make_subnet(self, ctx, network_id):
with ctx.session.begin():
with db_api.context_manager.writer.using(ctx):
subnet = models_v2.Subnet(name="subsub", ip_version=4,
tenant_id='dbcheck',
cidr='turn_down_for_what',
@ -6321,7 +6321,7 @@ class DbModelTenantTestCase(DbModelMixin, testlib_api.SqlTestCase):
return port
def _make_subnetpool(self, ctx):
with ctx.session.begin():
with db_api.context_manager.writer.using(ctx):
subnetpool = models_v2.SubnetPool(
ip_version=4, default_prefixlen=4, min_prefixlen=4,
max_prefixlen=4, shared=False, default_quota=4,
@ -6334,7 +6334,7 @@ class DbModelTenantTestCase(DbModelMixin, testlib_api.SqlTestCase):
class DbModelProjectTestCase(DbModelMixin, testlib_api.SqlTestCase):
def _make_network(self, ctx):
with ctx.session.begin():
with db_api.context_manager.writer.using(ctx):
network = models_v2.Network(name="net_net", status="OK",
project_id='dbcheck',
admin_state_up=True)
@ -6342,7 +6342,7 @@ class DbModelProjectTestCase(DbModelMixin, testlib_api.SqlTestCase):
return network
def _make_subnet(self, ctx, network_id):
with ctx.session.begin():
with db_api.context_manager.writer.using(ctx):
subnet = models_v2.Subnet(name="subsub", ip_version=4,
project_id='dbcheck',
cidr='turn_down_for_what',
@ -6360,7 +6360,7 @@ class DbModelProjectTestCase(DbModelMixin, testlib_api.SqlTestCase):
return port
def _make_subnetpool(self, ctx):
with ctx.session.begin():
with db_api.context_manager.writer.using(ctx):
subnetpool = models_v2.SubnetPool(
ip_version=4, default_prefixlen=4, min_prefixlen=4,
max_prefixlen=4, shared=False, default_quota=4,

View File

@ -810,7 +810,9 @@ class L3DvrTestCase(test_db_base_plugin_v2.NeutronDbPluginV2TestCase):
RuntimeError,
self.mixin.add_router_interface,
self.ctx, router['id'], {'port_id': port['port']['id']})
# expire since we are re-using the session which might have stale
# ports in it
self.ctx.session.expire_all()
port_info = self.core_plugin.get_port(self.ctx, port['port']['id'])
self.assertEqual(port_dict['device_id'], port_info['device_id'])
self.assertEqual(port_dict['device_owner'],

View File

@ -23,6 +23,7 @@ from oslo_db import exception as db_exc
from oslo_utils import uuidutils
from neutron.common import exceptions as n_exc
from neutron.db import api as db_api
from neutron.ipam import requests as ipam_req
from neutron.ipam import subnet_alloc
from neutron.tests.unit.db import test_db_base_plugin_v2
@ -64,7 +65,7 @@ class TestSubnetAllocation(testlib_api.SqlTestCase):
sp = self._create_subnet_pool(self.plugin, self.ctx, 'test-sp',
prefix_list, 21, 4)
sp = self.plugin._get_subnetpool(self.ctx, sp['id'])
with self.ctx.session.begin(subtransactions=True):
with db_api.context_manager.writer.using(self.ctx):
sa = subnet_alloc.SubnetAllocator(sp, self.ctx)
req = ipam_req.AnySubnetRequest(self._tenant_id,
uuidutils.generate_uuid(),
@ -80,7 +81,7 @@ class TestSubnetAllocation(testlib_api.SqlTestCase):
sp = self._create_subnet_pool(self.plugin, self.ctx, 'test-sp',
['10.1.0.0/16', '192.168.1.0/24'],
21, 4)
with self.ctx.session.begin(subtransactions=True):
with db_api.context_manager.writer.using(self.ctx):
sp = self.plugin._get_subnetpool(self.ctx, sp['id'])
sa = subnet_alloc.SubnetAllocator(sp, self.ctx)
req = ipam_req.SpecificSubnetRequest(self._tenant_id,
@ -122,7 +123,7 @@ class TestSubnetAllocation(testlib_api.SqlTestCase):
['10.1.0.0/16', '192.168.1.0/24'],
21, 4)
sp = self.plugin._get_subnetpool(self.ctx, sp['id'])
with self.ctx.session.begin(subtransactions=True):
with db_api.context_manager.writer.using(self.ctx):
sa = subnet_alloc.SubnetAllocator(sp, self.ctx)
req = ipam_req.AnySubnetRequest(self._tenant_id,
uuidutils.generate_uuid(),
@ -137,7 +138,7 @@ class TestSubnetAllocation(testlib_api.SqlTestCase):
['10.1.0.0/16', '192.168.1.0/24'],
21, 4)
sp = self.plugin._get_subnetpool(self.ctx, sp['id'])
with self.ctx.session.begin(subtransactions=True):
with db_api.context_manager.writer.using(self.ctx):
sa = subnet_alloc.SubnetAllocator(sp, self.ctx)
req = ipam_req.SpecificSubnetRequest(self._tenant_id,
uuidutils.generate_uuid(),
@ -154,7 +155,7 @@ class TestSubnetAllocation(testlib_api.SqlTestCase):
['2210::/64'],
64, 6)
sp = self.plugin._get_subnetpool(self.ctx, sp['id'])
with self.ctx.session.begin(subtransactions=True):
with db_api.context_manager.writer.using(self.ctx):
sa = subnet_alloc.SubnetAllocator(sp, self.ctx)
req = ipam_req.SpecificSubnetRequest(self._tenant_id,
uuidutils.generate_uuid(),

View File

@ -129,7 +129,8 @@ class TestNetworkExtension(model_base.BASEV2):
value = sa.Column(sa.String(64))
network = orm.relationship(
models_v2.Network,
backref=orm.backref('extension', cascade='delete', uselist=False))
backref=orm.backref('extension', cascade='delete', uselist=False,
lazy='joined'))
class TestSubnetExtension(model_base.BASEV2):
@ -139,7 +140,8 @@ class TestSubnetExtension(model_base.BASEV2):
value = sa.Column(sa.String(64))
subnet = orm.relationship(
models_v2.Subnet,
backref=orm.backref('extension', cascade='delete', uselist=False))
backref=orm.backref('extension', cascade='delete', uselist=False,
lazy='joined'))
class TestPortExtension(model_base.BASEV2):
@ -149,7 +151,8 @@ class TestPortExtension(model_base.BASEV2):
value = sa.Column(sa.String(64))
port = orm.relationship(
models_v2.Port,
backref=orm.backref('extension', cascade='delete', uselist=False))
backref=orm.backref('extension', cascade='delete', uselist=False,
lazy='joined'))
class TestDBExtensionDriver(TestExtensionDriverBase):

View File

@ -23,6 +23,7 @@ from oslo_utils import uuidutils
from sqlalchemy.orm import exc
from sqlalchemy.orm import query
from neutron.db import api as db_api
from neutron.db import db_base_plugin_v2
from neutron.db.models import l3 as l3_models
from neutron.db import models_v2
@ -63,7 +64,7 @@ class Ml2DBTestCase(testlib_api.SqlTestCase):
return port
def _setup_neutron_portbinding(self, port_id, vif_type, host):
with self.ctx.session.begin(subtransactions=True):
with db_api.context_manager.writer.using(self.ctx):
self.ctx.session.add(models.PortBinding(port_id=port_id,
vif_type=vif_type,
host=host))
@ -285,7 +286,7 @@ class Ml2DvrDBTestCase(testlib_api.SqlTestCase):
self.setup_coreplugin(PLUGIN_NAME)
def _setup_neutron_network(self, network_id, port_ids):
with self.ctx.session.begin(subtransactions=True):
with db_api.context_manager.writer.using(self.ctx):
network_obj.Network(self.ctx, id=network_id).create()
ports = []
for port_id in port_ids:
@ -311,7 +312,7 @@ class Ml2DvrDBTestCase(testlib_api.SqlTestCase):
def _setup_distributed_binding(self, network_id,
port_id, router_id, host_id):
with self.ctx.session.begin(subtransactions=True):
with db_api.context_manager.writer.using(self.ctx):
record = models.DistributedPortBinding(
port_id=port_id,
host=host_id,
@ -403,7 +404,7 @@ class Ml2DvrDBTestCase(testlib_api.SqlTestCase):
def test_distributed_port_binding_deleted_by_port_deletion(self):
network_id = uuidutils.generate_uuid()
network_obj.Network(self.ctx, id=network_id).create()
with self.ctx.session.begin(subtransactions=True):
with db_api.context_manager.writer.using(self.ctx):
device_owner = constants.DEVICE_OWNER_DVR_INTERFACE
port = models_v2.Port(
id='port_id',
@ -428,7 +429,7 @@ class Ml2DvrDBTestCase(testlib_api.SqlTestCase):
self.ctx.session.add(models.DistributedPortBinding(
**binding_kwarg))
with warnings.catch_warnings(record=True) as warning_list:
with self.ctx.session.begin(subtransactions=True):
with db_api.context_manager.writer.using(self.ctx):
self.ctx.session.delete(port)
self.assertEqual([], warning_list)
ports = ml2_db.get_distributed_port_bindings(self.ctx,

View File

@ -660,7 +660,7 @@ class TestMl2PortsV2(test_plugin.TestPortsV2, Ml2PluginV2TestCase):
status='ACTIVE', device_id='vm_id',
device_owner=DEVICE_OWNER_COMPUTE
)
with self.context.session.begin():
with db_api.context_manager.writer.using(self.context):
self.context.session.add(port_db)
self.assertIsNone(plugin._port_provisioned('port', 'evt', 'trigger',
self.context, port_id))
@ -1404,7 +1404,7 @@ class TestMl2DvrPortsV2(TestMl2PortsV2):
{'subnet_id': s['subnet']['id']})
# lie to turn the port into an SNAT interface
with self.context.session.begin():
with db_api.context_manager.reader.using(self.context):
rp = self.context.session.query(l3_models.RouterPort).filter_by(
port_id=p['port_id']).first()
rp.port_type = constants.DEVICE_OWNER_ROUTER_SNAT
@ -2560,17 +2560,21 @@ class TestML2Segments(Ml2PluginV2TestCase):
ml2_db.subscribe()
plugin = directory.get_plugin()
with self.port(device_owner=fake_owner_compute) as port:
binding = plugin._get_port(
self.context, port['port']['id']).port_binding
binding['host'] = 'host-ovs-no_filter'
mech_context = driver_context.PortContext(
plugin, self.context, port['port'],
plugin.get_network(self.context, port['port']['network_id']),
binding, None)
plugin._bind_port_if_needed(mech_context)
segment = segments_db.get_network_segments(
self.context, port['port']['network_id'])[0]
segment['network_id'] = port['port']['network_id']
# add writer here to make sure that the following operations are
# performed in the same session
with db_api.context_manager.writer.using(self.context):
binding = plugin._get_port(
self.context, port['port']['id']).port_binding
binding['host'] = 'host-ovs-no_filter'
mech_context = driver_context.PortContext(
plugin, self.context, port['port'],
plugin.get_network(self.context,
port['port']['network_id']),
binding, None)
plugin._bind_port_if_needed(mech_context)
segment = segments_db.get_network_segments(
self.context, port['port']['network_id'])[0]
segment['network_id'] = port['port']['network_id']
self.assertRaises(c_exc.CallbackFailure, registry.notify,
resources.SEGMENT, events.BEFORE_DELETE,
mock.ANY,