Merge "New enginefacade for networks, subnets."

Jenkins 2017-04-01 12:19:20 +00:00 committed by Gerrit Code Review
commit b8d4f81b8e
16 changed files with 184 additions and 141 deletions

View File

@ -322,14 +322,12 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon,
objects = []
collection = "%ss" % resource
items = request_items[collection]
context.session.begin(subtransactions=True)
try:
for item in items:
obj_creator = getattr(self, 'create_%s' % resource)
objects.append(obj_creator(context, item))
context.session.commit()
with db_api.context_manager.writer.using(context):
for item in items:
obj_creator = getattr(self, 'create_%s' % resource)
objects.append(obj_creator(context, item))
except Exception:
context.session.rollback()
with excutils.save_and_reraise_exception():
LOG.error(_LE("An exception occurred while creating "
"the %(resource)s:%(item)s"),
@ -350,7 +348,7 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon,
def create_network_db(self, context, network):
# single request processing
n = network['network']
with context.session.begin(subtransactions=True):
with db_api.context_manager.writer.using(context):
args = {'tenant_id': n['tenant_id'],
'id': n.get('id') or uuidutils.generate_uuid(),
'name': n['name'],
@ -369,7 +367,7 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon,
@db_api.retry_if_session_inactive()
def update_network(self, context, id, network):
n = network['network']
with context.session.begin(subtransactions=True):
with db_api.context_manager.writer.using(context):
network = self._get_network(context, id)
# validate 'shared' parameter
if 'shared' in n:
@ -408,9 +406,10 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon,
registry.notify(resources.NETWORK, events.BEFORE_DELETE, self,
context=context, network_id=id)
self._ensure_network_not_in_use(context, id)
auto_delete_port_ids = [p.id for p in context.session.query(
models_v2.Port.id).filter_by(network_id=id).filter(
models_v2.Port.device_owner.in_(AUTO_DELETE_PORT_OWNERS))]
with db_api.context_manager.reader.using(context):
auto_delete_port_ids = [p.id for p in context.session.query(
models_v2.Port.id).filter_by(network_id=id).filter(
models_v2.Port.device_owner.in_(AUTO_DELETE_PORT_OWNERS))]
for port_id in auto_delete_port_ids:
self.delete_port(context.elevated(), port_id)
# clean up subnets
@ -420,7 +419,7 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon,
# cleanup if a network-owned port snuck in without failing
for subnet in subnets:
self.delete_subnet(context, subnet['id'])
with context.session.begin(subtransactions=True):
with db_api.context_manager.writer.using(context):
network_db = self._get_network(context, id)
network = self._make_network_dict(network_db, context=context)
registry.notify(resources.NETWORK, events.PRECOMMIT_DELETE,
@ -739,7 +738,7 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon,
raise exc.BadRequest(resource='subnets', msg=msg)
self._validate_subnet(context, s)
with context.session.begin(subtransactions=True):
with db_api.context_manager.writer.using(context):
network = self._get_network(context,
subnet['subnet']['network_id'])
subnet, ipam_subnet = self.ipam.allocate_subnet(context,
@ -823,13 +822,16 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon,
registry.notify(resources.SUBNET_GATEWAY, events.BEFORE_UPDATE,
self, **kwargs)
with context.session.begin(subtransactions=True):
with db_api.context_manager.writer.using(context):
subnet, changes = self.ipam.update_db_subnet(context, id, s,
db_pools)
# we expire here since ipam may have made changes to relationships
# that will be stale on any subsequent lookups while the subnet object
# is in the session otherwise.
context.session.expire(subnet)
# that will be stale on any subsequent lookups while the subnet
# object is in the session otherwise.
# Check that the subnet is attached to the session before expiring it.
if subnet in context.session:
context.session.expire(subnet)
return self._make_subnet_dict(subnet, context=context)
@property
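The membership test added above is what makes the expire() call safe under the new facade: ipam.update_db_subnet() may hand back a subnet instance that this session does not own (or that is already detached), and Session.expire() raises InvalidRequestError for such instances. "obj in session" is the SQLAlchemy idiom for "this session holds that instance"; a short sketch of the guard:

# Sketch of the guard above: only expire instances this session owns.
if subnet in context.session:          # instance is in this session's
    context.session.expire(subnet)     # identity map or pending sets
# expiring a detached or foreign instance would raise
# sqlalchemy.exc.InvalidRequestError, so it is simply left alone.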
@ -962,7 +964,7 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon,
self._remove_subnet_ip_allocations_from_ports(context, id)
# retry integrity errors to catch ip allocation races
with db_api.exc_to_retry(sql_exc.IntegrityError), \
context.session.begin(subtransactions=True):
db_api.context_manager.writer.using(context):
subnet_db = self._get_subnet(context, id)
subnet = self._make_subnet_dict(subnet_db, context=context)
registry.notify(resources.SUBNET, events.PRECOMMIT_DELETE,
@ -1098,7 +1100,7 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon,
def update_subnetpool(self, context, id, subnetpool):
new_sp = subnetpool['subnetpool']
with context.session.begin(subtransactions=True):
with db_api.context_manager.writer.using(context):
orig_sp = self._get_subnetpool(context, id=id)
updated = _update_subnetpool_dict(orig_sp, new_sp)
reader = subnet_alloc.SubnetPoolReader(updated)
@ -1161,7 +1163,7 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon,
@db_api.retry_if_session_inactive()
def delete_subnetpool(self, context, id):
with context.session.begin(subtransactions=True):
with db_api.context_manager.writer.using(context):
subnetpool = self._get_subnetpool(context, id=id)
subnets = self._get_subnets_by_subnetpool(context, id)
if subnets:
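The replacements in this file set the pattern for the whole change: blocks that used to open a nested transaction with context.session.begin(subtransactions=True) now enter the new oslo.db enginefacade through db_api.context_manager.writer.using(context), and read-only blocks use reader.using(context). A minimal standalone sketch of that facade, assuming only the public oslo.db API plus a toy Network model (not Neutron's code; the sqlite URL and names are illustrative):

# Toy sketch of the enginefacade pattern adopted by this change.
import sqlalchemy as sa
from sqlalchemy.ext import declarative
from oslo_db.sqlalchemy import enginefacade

Base = declarative.declarative_base()


class Network(Base):
    __tablename__ = 'networks'
    id = sa.Column(sa.String(36), primary_key=True)
    name = sa.Column(sa.String(255))


# In Neutron this object is neutron.db.api.context_manager.
context_manager = enginefacade.transaction_context()
context_manager.configure(connection='sqlite://')


@enginefacade.transaction_context_provider
class Context(object):
    """enginefacade attaches .session to this object inside using() blocks."""


def create_network(context, net_id, name):
    # Replaces "with context.session.begin(subtransactions=True):".
    # The transaction commits when the block exits cleanly and rolls
    # back if the block raises.
    with context_manager.writer.using(context):
        context.session.add(Network(id=net_id, name=name))


def get_network(context, net_id):
    # Read-only work goes through the reader; nested inside an outer
    # writer it reuses that session instead of opening a new transaction.
    with context_manager.reader.using(context):
        return context.session.query(Network).get(net_id)

Later sketches in this review reuse Base, Network, Context and context_manager from this one.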

View File

@ -17,6 +17,7 @@ import netaddr
import six
from sqlalchemy import func
from neutron.db import api as db_api
import neutron.db.models_v2 as mod
NETWORK_ID = 'network_id'
@ -84,6 +85,7 @@ class IpAvailabilityMixin(object):
return net_ip_availabilities
@classmethod
@db_api.context_manager.reader
def _build_network_used_ip_query(cls, context, filters):
# Generate a query to gather network/subnet/used_ips.
# Ensure query is tolerant of missing child table data (outerjoins)
@ -100,6 +102,7 @@ class IpAvailabilityMixin(object):
return cls._adjust_query_for_filters(query, filters)
@classmethod
@db_api.context_manager.reader
def _build_total_ips_query(cls, context, filters):
query = context.session.query()
query = query.add_columns(*cls.total_ips_columns)
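The two @db_api.context_manager.reader lines added in this file use the decorator form of the same facade: the wrapped function's context argument (the first positional after cls/self) carries the session, and the call runs inside a reader transaction. Continuing the toy sketch from the first file:

# Decorator form; Network and context_manager come from the earlier sketch.
@context_manager.reader
def count_networks(context):
    # The decorator opens (or joins) a reader transaction around the call,
    # using the "context" argument to carry the session.
    return context.session.query(Network).count()


@context_manager.writer
def rename_network(context, net_id, name):
    # Writer variant: the change is flushed and committed when the
    # decorated call returns.
    net = context.session.query(Network).get(net_id)
    net.name = name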

View File

@ -18,6 +18,7 @@ from neutron._i18n import _LI
from neutron.callbacks import events
from neutron.callbacks import registry
from neutron.callbacks import resources
from neutron.db import api as db_api
from neutron.db.models import segment as segments_model
LOG = logging.getLogger(__name__)
@ -37,7 +38,7 @@ def _make_segment_dict(record):
def add_network_segment(context, network_id, segment, segment_index=0,
is_dynamic=False):
with context.session.begin(subtransactions=True):
with db_api.context_manager.writer.using(context):
record = segments_model.NetworkSegment(
id=uuidutils.generate_uuid(),
network_id=network_id,
@ -70,7 +71,7 @@ def get_networks_segments(context, network_ids, filter_dynamic=False):
if not network_ids:
return {}
with context.session.begin(subtransactions=True):
with db_api.context_manager.reader.using(context):
query = (context.session.query(segments_model.NetworkSegment).
filter(segments_model.NetworkSegment.network_id
.in_(network_ids)).
@ -85,7 +86,7 @@ def get_networks_segments(context, network_ids, filter_dynamic=False):
def get_segment_by_id(context, segment_id):
with context.session.begin(subtransactions=True):
with db_api.context_manager.reader.using(context):
try:
record = (context.session.query(segments_model.NetworkSegment).
filter_by(id=segment_id).
@ -98,7 +99,7 @@ def get_segment_by_id(context, segment_id):
def get_dynamic_segment(context, network_id, physical_network=None,
segmentation_id=None):
"""Return a dynamic segment for the filters provided if one exists."""
with context.session.begin(subtransactions=True):
with db_api.context_manager.reader.using(context):
query = (context.session.query(segments_model.NetworkSegment).
filter_by(network_id=network_id, is_dynamic=True))
if physical_network:
@ -122,6 +123,6 @@ def get_dynamic_segment(context, network_id, physical_network=None,
def delete_network_segment(context, segment_id):
"""Release a dynamic segment for the params provided if one exists."""
with context.session.begin(subtransactions=True):
with db_api.context_manager.writer.using(context):
(context.session.query(segments_model.NetworkSegment).
filter_by(id=segment_id).delete())

View File

@ -201,6 +201,8 @@ class NeutronDbSubnet(ipam_base.Subnet):
# The only defined status at this stage is 'ALLOCATED'.
# More states will be available in the future - e.g.: RECYCLABLE
try:
# TODO(ataraday): revisit this after objects are switched to
# the new enginefacade
with self._context.session.begin(subtransactions=True):
# NOTE(kevinbenton): we use a subtransaction to force
# a flush here so we can capture DBReferenceErrors due
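The NOTE and TODO above are about error timing: SQLAlchemy batches INSERTs until a flush, so a broken foreign key would otherwise surface only at the outer commit, after the code that could translate or retry it has returned; opening a subtransaction (or simply flushing) forces the INSERT now. A hedged sketch continuing the toy models from the first file (Port and its handling are illustrative, and whether the backend enforces the foreign key and how oslo.db translates the error depends on configuration):

# Port references Network; flushing makes a dangling network_id fail inside
# the transaction instead of at the final commit.
class Port(Base):
    __tablename__ = 'ports'
    id = sa.Column(sa.String(36), primary_key=True)
    network_id = sa.Column(sa.String(36), sa.ForeignKey('networks.id'),
                           nullable=False)


def create_port(context, port_id, network_id):
    with context_manager.writer.using(context):
        context.session.add(Port(id=port_id, network_id=network_id))
        # Emit the pending INSERT now: a missing network surfaces here as
        # oslo_db.exception.DBReferenceError, inside the transaction where
        # it can be caught and retried, instead of at the outer commit.
        context.session.flush()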

View File

@ -24,6 +24,7 @@ from oslo_utils import uuidutils
from neutron._i18n import _
from neutron.common import exceptions as n_exc
from neutron.db import api as db_api
from neutron.db import models_v2
from neutron.ipam import driver
from neutron.ipam import exceptions as ipam_exc
@ -49,9 +50,10 @@ class SubnetAllocator(driver.Pool):
subnetpool, it's required to ensure non-overlapping cidrs in the same
subnetpool.
"""
current_hash = (self._context.session.query(models_v2.SubnetPool.hash)
.filter_by(id=self._subnetpool['id']).scalar())
with db_api.context_manager.reader.using(self._context):
current_hash = (
self._context.session.query(models_v2.SubnetPool.hash)
.filter_by(id=self._subnetpool['id']).scalar())
if current_hash is None:
# NOTE(cbrandily): subnetpool has been deleted
raise n_exc.SubnetPoolNotFound(
@ -61,17 +63,21 @@ class SubnetAllocator(driver.Pool):
# NOTE(cbrandily): the update disallows 2 concurrent subnet allocation
# to succeed: at most 1 transaction will succeed, others will be
# rolled back and be caught in neutron.db.v2.base
query = self._context.session.query(models_v2.SubnetPool).filter_by(
id=self._subnetpool['id'], hash=current_hash)
count = query.update({'hash': new_hash})
with db_api.context_manager.writer.using(self._context):
query = (
self._context.session.query(models_v2.SubnetPool).filter_by(
id=self._subnetpool['id'], hash=current_hash))
count = query.update({'hash': new_hash})
if not count:
raise db_exc.RetryRequest(lib_exc.SubnetPoolInUse(
subnet_pool_id=self._subnetpool['id']))
def _get_allocated_cidrs(self):
query = self._context.session.query(models_v2.Subnet)
subnets = query.filter_by(subnetpool_id=self._subnetpool['id'])
return (x.cidr for x in subnets)
with db_api.context_manager.reader.using(self._context):
query = self._context.session.query(models_v2.Subnet)
subnets = query.filter_by(subnetpool_id=self._subnetpool['id'])
return (x.cidr for x in subnets)
def _get_available_prefix_list(self):
prefixes = (x.cidr for x in self._subnetpool.prefixes)
@ -90,7 +96,7 @@ class SubnetAllocator(driver.Pool):
def _allocations_used_by_tenant(self, quota_unit):
subnetpool_id = self._subnetpool['id']
tenant_id = self._subnetpool['tenant_id']
with self._context.session.begin(subtransactions=True):
with db_api.context_manager.reader.using(self._context):
qry = self._context.session.query(models_v2.Subnet)
allocations = qry.filter_by(subnetpool_id=subnetpool_id,
tenant_id=tenant_id)
@ -115,7 +121,7 @@ class SubnetAllocator(driver.Pool):
raise n_exc.SubnetPoolQuotaExceeded()
def _allocate_any_subnet(self, request):
with self._context.session.begin(subtransactions=True):
with db_api.context_manager.writer.using(self._context):
self._lock_subnetpool()
self._check_subnetpool_tenant_quota(request.tenant_id,
request.prefixlen)
@ -139,7 +145,7 @@ class SubnetAllocator(driver.Pool):
str(request.prefixlen))
def _allocate_specific_subnet(self, request):
with self._context.session.begin(subtransactions=True):
with db_api.context_manager.writer.using(self._context):
self._lock_subnetpool()
self._check_subnetpool_tenant_quota(request.tenant_id,
request.prefixlen)
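The reader/writer split in this file wraps an optimistic lock: read the pool's current hash, then UPDATE ... WHERE hash = <value read> with a fresh value; when two allocations race, only one UPDATE matches and the loser raises RetryRequest so the whole allocation is replayed. A condensed, hedged sketch of the idiom (SubnetPool stands in for models_v2.SubnetPool, with just id and hash columns):

# Condensed sketch of the compare-and-swap in _lock_subnetpool above.
import uuid

from oslo_db import exception as db_exc


def lock_subnetpool(context, subnetpool_id):
    with context_manager.reader.using(context):
        current_hash = (context.session.query(SubnetPool.hash)
                        .filter_by(id=subnetpool_id).scalar())
    with context_manager.writer.using(context):
        count = (context.session.query(SubnetPool)
                 .filter_by(id=subnetpool_id, hash=current_hash)
                 .update({'hash': uuid.uuid4().hex}))
    if not count:
        # Another allocation swapped the hash first: ask the retry
        # machinery to replay the whole request with a fresh view.
        raise db_exc.RetryRequest(RuntimeError('subnet pool busy'))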

View File

@ -27,6 +27,7 @@ from neutron._i18n import _, _LE
from neutron.callbacks import events
from neutron.callbacks import registry
from neutron.callbacks import resources
from neutron.db import api as db_api
from neutron.db.models import securitygroup as sg_models
from neutron.db import models_v2
from neutron.extensions import portbindings
@ -40,13 +41,13 @@ LOG = log.getLogger(__name__)
MAX_PORTS_PER_QUERY = 500
@db_api.context_manager.writer
def add_port_binding(context, port_id):
with context.session.begin(subtransactions=True):
record = models.PortBinding(
port_id=port_id,
vif_type=portbindings.VIF_TYPE_UNBOUND)
context.session.add(record)
return record
record = models.PortBinding(
port_id=port_id,
vif_type=portbindings.VIF_TYPE_UNBOUND)
context.session.add(record)
return record
@removals.remove(
@ -77,6 +78,7 @@ def get_locked_port_and_binding(context, port_id):
return None, None
@db_api.context_manager.writer
def set_binding_levels(context, levels):
if levels:
for level in levels:
@ -90,6 +92,7 @@ def set_binding_levels(context, levels):
LOG.debug("Attempted to set empty binding levels")
@db_api.context_manager.reader
def get_binding_levels(context, port_id, host):
if host:
result = (context.session.query(models.PortBindingLevel).
@ -104,6 +107,7 @@ def get_binding_levels(context, port_id, host):
return result
@db_api.context_manager.writer
def clear_binding_levels(context, port_id, host):
if host:
(context.session.query(models.PortBindingLevel).
@ -116,13 +120,14 @@ def clear_binding_levels(context, port_id, host):
def ensure_distributed_port_binding(context, port_id, host, router_id=None):
record = (context.session.query(models.DistributedPortBinding).
filter_by(port_id=port_id, host=host).first())
with db_api.context_manager.reader.using(context):
record = (context.session.query(models.DistributedPortBinding).
filter_by(port_id=port_id, host=host).first())
if record:
return record
try:
with context.session.begin(subtransactions=True):
with db_api.context_manager.writer.using(context):
record = models.DistributedPortBinding(
port_id=port_id,
host=host,
@ -134,13 +139,14 @@ def ensure_distributed_port_binding(context, port_id, host, router_id=None):
return record
except db_exc.DBDuplicateEntry:
LOG.debug("Distributed Port %s already bound", port_id)
return (context.session.query(models.DistributedPortBinding).
filter_by(port_id=port_id, host=host).one())
with db_api.context_manager.reader.using(context):
return (context.session.query(models.DistributedPortBinding).
filter_by(port_id=port_id, host=host).one())
def delete_distributed_port_binding_if_stale(context, binding):
if not binding.router_id and binding.status == n_const.PORT_STATUS_DOWN:
with context.session.begin(subtransactions=True):
with db_api.context_manager.writer.using(context):
LOG.debug("Distributed port: Deleting binding %s", binding)
context.session.delete(binding)
@ -148,10 +154,12 @@ def delete_distributed_port_binding_if_stale(context, binding):
def get_port(context, port_id):
"""Get port record for update within transaction."""
with context.session.begin(subtransactions=True):
with db_api.context_manager.reader.using(context):
try:
# Set enable_eagerloads to True, so that lazy loads can
# proceed later.
record = (context.session.query(models_v2.Port).
enable_eagerloads(False).
enable_eagerloads(True).
filter(models_v2.Port.id.startswith(port_id)).
one())
return record
@ -163,6 +171,7 @@ def get_port(context, port_id):
return
@db_api.context_manager.reader
def get_port_from_device_mac(context, device_mac):
LOG.debug("get_port_from_device_mac() called for mac %s", device_mac)
ports = port_obj.Port.get_objects(context, mac_address=device_mac)
@ -194,7 +203,7 @@ def get_sg_ids_grouped_by_port(context, port_ids):
sg_ids_grouped_by_port = {}
sg_binding_port = sg_models.SecurityGroupPortBinding.port_id
with context.session.begin(subtransactions=True):
with db_api.context_manager.reader.using(context):
# partial UUIDs must be individually matched with startswith.
# full UUIDs may be matched directly in an IN statement
partial_uuids = set(port_id for port_id in port_ids
@ -233,7 +242,7 @@ def make_port_dict_with_security_groups(port, sec_groups):
def get_port_binding_host(context, port_id):
try:
with context.session.begin(subtransactions=True):
with db_api.context_manager.reader.using(context):
query = (context.session.query(models.PortBinding).
filter(models.PortBinding.port_id.startswith(port_id)).
one())
@ -248,6 +257,7 @@ def get_port_binding_host(context, port_id):
return query.host
@db_api.context_manager.reader
def generate_distributed_port_status(context, port_id):
# an OR'ed value of status assigned to parent port from the
# distributedportbinding bucket
@ -262,7 +272,7 @@ def generate_distributed_port_status(context, port_id):
def get_distributed_port_binding_by_host(context, port_id, host):
with context.session.begin(subtransactions=True):
with db_api.context_manager.reader.using(context):
binding = (context.session.query(models.DistributedPortBinding).
filter(models.DistributedPortBinding.port_id.startswith(port_id),
models.DistributedPortBinding.host == host).first())
@ -273,7 +283,7 @@ def get_distributed_port_binding_by_host(context, port_id, host):
def get_distributed_port_bindings(context, port_id):
with context.session.begin(subtransactions=True):
with db_api.context_manager.reader.using(context):
bindings = (context.session.query(models.DistributedPortBinding).
filter(models.DistributedPortBinding.port_id.startswith(
port_id)).all())
@ -282,6 +292,7 @@ def get_distributed_port_bindings(context, port_id):
return bindings
@db_api.context_manager.reader
def is_dhcp_active_on_any_subnet(context, subnet_ids):
if not subnet_ids:
return False
@ -297,13 +308,15 @@ def _prevent_segment_delete_with_port_bound(resource, event, trigger,
if for_net_delete:
# don't check for network deletes
return
segment_id = segment['id']
query = context.session.query(models_v2.Port)
query = query.join(
models.PortBindingLevel,
models.PortBindingLevel.port_id == models_v2.Port.id)
query = query.filter(models.PortBindingLevel.segment_id == segment_id)
port_ids = [p.id for p in query]
with db_api.context_manager.reader.using(context):
segment_id = segment['id']
query = context.session.query(models_v2.Port)
query = query.join(
models.PortBindingLevel,
models.PortBindingLevel.port_id == models_v2.Port.id)
query = query.filter(models.PortBindingLevel.segment_id == segment_id)
port_ids = [p.id for p in query]
# There are still some ports in the segment, so it should not be deleted
# TODO(xiaohhui): Should we delete the dhcp port automatically here?
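ensure_distributed_port_binding above shows the create-or-get idiom under the new facade: look the row up under a reader, insert it under a writer, and if a concurrent request wins the race, catch DBDuplicateEntry and read the winner back. A trimmed sketch of that shape (Binding stands in for models.DistributedPortBinding):

# Trimmed create-or-get sketch; Binding is an illustrative stand-in.
from oslo_db import exception as db_exc


def ensure_binding(context, port_id, host):
    with context_manager.reader.using(context):
        record = (context.session.query(Binding)
                  .filter_by(port_id=port_id, host=host).first())
    if record:
        return record
    try:
        with context_manager.writer.using(context):
            record = Binding(port_id=port_id, host=host)
            context.session.add(record)
            return record          # committed when the writer block exits
    except db_exc.DBDuplicateEntry:
        # A concurrent caller inserted the same row between our read and
        # our commit; just return what is now in the database.
        with context_manager.reader.using(context):
            return (context.session.query(Binding)
                    .filter_by(port_id=port_id, host=host).one())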

View File

@ -190,8 +190,7 @@ class TypeManager(stevedore.named.NamedExtensionManager):
def create_network_segments(self, context, network, tenant_id):
"""Call type drivers to create network segments."""
segments = self._process_provider_create(network)
session = context.session
with session.begin(subtransactions=True):
with db_api.context_manager.writer.using(context):
network_id = network['id']
if segments:
for segment_index, segment in enumerate(segments):
@ -224,7 +223,7 @@ class TypeManager(stevedore.named.NamedExtensionManager):
self.validate_provider_segment(segment)
# Reserve segment in type driver
with context.session.begin(subtransactions=True):
with db_api.context_manager.writer.using(context):
return self.reserve_provider_segment(context, segment)
def is_partial_segment(self, segment):

View File

@ -434,7 +434,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
# After we've attempted to bind the port, we begin a
# transaction, get the current port state, and decide whether
# to commit the binding results.
with plugin_context.session.begin(subtransactions=True):
with db_api.context_manager.writer.using(plugin_context):
# Get the current port state and build a new PortContext
# reflecting this state as original state for subsequent
# mechanism driver update_port_*commit() calls.
@ -595,17 +595,24 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
attributes.SUBNETS, ['_ml2_md_extend_subnet_dict'])
def _ml2_md_extend_network_dict(self, result, netdb):
session = sqlalchemy.inspect(netdb).session
session = self._object_session_or_new_session(netdb)
self.extension_manager.extend_network_dict(session, netdb, result)
def _ml2_md_extend_port_dict(self, result, portdb):
session = sqlalchemy.inspect(portdb).session
session = self._object_session_or_new_session(portdb)
self.extension_manager.extend_port_dict(session, portdb, result)
def _ml2_md_extend_subnet_dict(self, result, subnetdb):
session = sqlalchemy.inspect(subnetdb).session
session = self._object_session_or_new_session(subnetdb)
self.extension_manager.extend_subnet_dict(session, subnetdb, result)
@staticmethod
def _object_session_or_new_session(sql_obj):
session = sqlalchemy.inspect(sql_obj).session
if not session:
session = db_api.get_reader_session()
return session
# Note - The following hook methods have "ml2" in their names so
# that they are not called twice during unit tests due to global
# registration of hooks in portbindings_db.py used by other
@ -657,7 +664,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
objects = []
collection = "%ss" % resource
items = request_items[collection]
with context.session.begin(subtransactions=True):
with db_api.context_manager.writer.using(context):
obj_creator = getattr(self, '_create_%s_db' % resource)
for item in items:
try:
@ -734,10 +741,9 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
def _create_network_db(self, context, network):
net_data = network[attributes.NETWORK]
tenant_id = net_data['tenant_id']
session = context.session
registry.notify(resources.NETWORK, events.BEFORE_CREATE, self,
context=context, network=net_data)
with session.begin(subtransactions=True):
with db_api.context_manager.writer.using(context):
net_db = self.create_network_db(context, network)
result = self._make_network_dict(net_db, process_extensions=False,
context=context)
@ -767,7 +773,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
net_db[az_ext.AZ_HINTS] = az_hints
result[az_ext.AZ_HINTS] = az_hints
self._apply_dict_extend_functions('networks', result, net_db)
self._apply_dict_extend_functions('networks', result, net_db)
return result, mech_context
@utils.transaction_guard
@ -798,8 +804,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
net_data = network[attributes.NETWORK]
provider._raise_if_updates_provider_attributes(net_data)
session = context.session
with session.begin(subtransactions=True):
with db_api.context_manager.writer.using(context):
original_network = super(Ml2Plugin, self).get_network(context, id)
updated_network = super(Ml2Plugin, self).update_network(context,
id,
@ -842,8 +847,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
@db_api.retry_if_session_inactive()
def get_network(self, context, id, fields=None):
session = context.session
with session.begin(subtransactions=True):
with db_api.context_manager.reader.using(context):
result = super(Ml2Plugin, self).get_network(context, id, None)
self.type_manager.extend_network_dict_provider(context, result)
result[api.MTU] = self._get_network_mtu(result)
@ -853,8 +857,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
@db_api.retry_if_session_inactive()
def get_networks(self, context, filters=None, fields=None,
sorts=None, limit=None, marker=None, page_reverse=False):
session = context.session
with session.begin(subtransactions=True):
with db_api.context_manager.reader.using(context):
nets = super(Ml2Plugin,
self).get_networks(context, filters, None, sorts,
limit, marker, page_reverse)
@ -901,8 +904,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
self.notifier.network_delete(context, network['id'])
def _create_subnet_db(self, context, subnet):
session = context.session
with session.begin(subtransactions=True):
with db_api.context_manager.writer.using(context):
result, net_db, ipam_sub = self._create_subnet_precommit(
context, subnet)
self.extension_manager.process_create_subnet(
@ -913,6 +915,11 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
mech_context = driver_context.SubnetContext(self, context,
result, network)
self.mechanism_manager.create_subnet_precommit(mech_context)
# TODO(ataraday): temporary flush until this is available
# in oslo.db (i.e. once change https://review.openstack.org/#/c/433758/
# is merged)
context.session.flush()
# db base plugin post commit ops
self._create_subnet_postcommit(context, result, net_db, ipam_sub)
@ -942,8 +949,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
@utils.transaction_guard
@db_api.retry_if_session_inactive()
def update_subnet(self, context, id, subnet):
session = context.session
with session.begin(subtransactions=True):
with db_api.context_manager.writer.using(context):
original_subnet = self.get_subnet(context, id)
updated_subnet = self._update_subnet_precommit(
context, id, subnet)
@ -1043,8 +1049,10 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
registry.notify(resources.PORT, events.BEFORE_CREATE, self,
context=context, port=attrs)
session = context.session
with session.begin(subtransactions=True):
# NOTE(kevinbenton): triggered outside of transaction since it
# emits 'AFTER' events if it creates.
self._ensure_default_security_group(context, attrs['tenant_id'])
with db_api.context_manager.writer.using(context):
dhcp_opts = attrs.get(edo_ext.EXTRADHCPOPTS, [])
port_db = self.create_port_db(context, port)
result = self._make_port_dict(port_db, process_extensions=False)
@ -1181,10 +1189,8 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
def update_port(self, context, id, port):
attrs = port[attributes.PORT]
need_port_update_notify = False
session = context.session
bound_mech_contexts = []
with session.begin(subtransactions=True):
with db_api.context_manager.writer.using(context):
port_db = self._get_port(context, id)
binding = port_db.port_binding
if not binding:
@ -1344,7 +1350,6 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
LOG.error(_LE("No Host supplied to bind DVR Port %s"), id)
return
session = context.session
binding = db.get_distributed_port_binding_by_host(context,
id, host)
device_id = attrs and attrs.get('device_id')
@ -1354,7 +1359,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
router_id != device_id)
if update_required:
try:
with session.begin(subtransactions=True):
with db_api.context_manager.writer.using(context):
orig_port = self.get_port(context, id)
if not binding:
binding = db.ensure_distributed_port_binding(
@ -1399,8 +1404,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
router_ids = []
l3plugin = directory.get_plugin(const.L3)
session = context.session
with session.begin(subtransactions=True):
with db_api.context_manager.writer.using(context):
try:
port_db = self._get_port(context, id)
binding = port_db.port_binding
@ -1467,8 +1471,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
@db_api.retry_if_session_inactive(context_var_name='plugin_context')
def get_bound_port_context(self, plugin_context, port_id, host=None,
cached_networks=None):
session = plugin_context.session
with session.begin(subtransactions=True):
with db_api.context_manager.reader.using(plugin_context) as session:
try:
port_db = (session.query(models_v2.Port).
enable_eagerloads(False).
@ -1528,8 +1531,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
one was already performed by the caller.
"""
updated = False
session = context.session
with session.begin(subtransactions=True):
with db_api.context_manager.writer.using(context):
port = db.get_port(context, port_id)
if not port:
LOG.debug("Port %(port)s update to %(val)s by agent not found",
@ -1561,7 +1563,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
if (updated and
port['device_owner'] == const.DEVICE_OWNER_DVR_INTERFACE):
with session.begin(subtransactions=True):
with db_api.context_manager.writer.using(context):
port = db.get_port(context, port_id)
if not port:
LOG.warning(_LW("Port %s not found during update"),
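The _object_session_or_new_session helper added in this file exists because the dict-extend hooks can now receive model rows whose enginefacade transaction has already closed: sqlalchemy.inspect(obj).session is the owning session while the row is attached and None once it is detached, at which point the helper falls back to db_api.get_reader_session(). A small sketch of that behaviour, continuing the toy model from the first file:

# Once the outermost using() block exits, the session is closed and the
# rows it loaded become detached.
import sqlalchemy


def load_network(context, net_id):
    with context_manager.reader.using(context):
        net = context.session.query(Network).get(net_id)
        # while attached, inspect() reports the owning session
        assert sqlalchemy.inspect(net).session is not None
    # after the block: detached, inspect(net).session is None, so any hook
    # that still needs a session must open a new one (get_reader_session()
    # in the helper above) instead of relying on the instance.
    return net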

View File

@ -77,7 +77,7 @@ class SegmentDbMixin(common_db_mixin.CommonDbMixin):
return self._make_segment_dict(new_segment)
def _create_segment_db(self, context, segment_id, segment):
with context.session.begin(subtransactions=True):
with db_api.context_manager.writer.using(context):
network_id = segment['network_id']
physical_network = segment[extension.PHYSICAL_NETWORK]
if physical_network == constants.ATTR_NOT_SPECIFIED:
@ -125,7 +125,7 @@ class SegmentDbMixin(common_db_mixin.CommonDbMixin):
def update_segment(self, context, uuid, segment):
"""Update an existing segment."""
segment = segment['segment']
with context.session.begin(subtransactions=True):
with db_api.context_manager.writer.using(context):
curr_segment = self._get_segment(context, uuid)
curr_segment.update(segment)
return self._make_segment_dict(curr_segment)
@ -175,7 +175,7 @@ class SegmentDbMixin(common_db_mixin.CommonDbMixin):
segment=segment, for_net_delete=for_net_delete)
# Delete segment in DB
with context.session.begin(subtransactions=True):
with db_api.context_manager.writer.using(context):
query = self._model_query(context, segment_model.NetworkSegment)
query = query.filter(segment_model.NetworkSegment.id == uuid)
if 0 == query.delete():
@ -191,7 +191,7 @@ class SegmentDbMixin(common_db_mixin.CommonDbMixin):
def update_segment_host_mapping(context, host, current_segment_ids):
with context.session.begin(subtransactions=True):
with db_api.context_manager.writer.using(context):
segment_host_mapping = network.SegmentHostMapping.get_objects(
context, host=host)
previous_segment_ids = {
@ -241,7 +241,7 @@ def get_segments_with_phys_nets(context, phys_nets):
if not phys_nets:
return []
with context.session.begin(subtransactions=True):
with db_api.context_manager.reader.using(context):
segments = context.session.query(segment_model.NetworkSegment).filter(
segment_model.NetworkSegment.physical_network.in_(phys_nets))
return segments
@ -249,7 +249,7 @@ def get_segments_with_phys_nets(context, phys_nets):
def map_segment_to_hosts(context, segment_id, hosts):
"""Map segment to a collection of hosts."""
with db_api.autonested_transaction(context.session):
with db_api.context_manager.writer.using(context):
for host in hosts:
network.SegmentHostMapping(
context, segment_id=segment_id, host=host).create()
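map_segment_to_hosts previously relied on db_api.autonested_transaction to behave correctly whether or not a transaction was already open; with the facade that behaviour is built in, because writer.using(context) starts a transaction when none is active and joins the ongoing one otherwise, so caller and callee share one session and commit together. Continuing the toy sketch:

# Nested writer.using() blocks share a single session and transaction;
# nothing is committed until the outermost block exits.
def create_two_networks(context):
    with context_manager.writer.using(context):
        context.session.add(Network(id='net-1', name='first'))
        _add_second(context)                 # joins, does not commit
    # both rows are committed together here


def _add_second(context):
    with context_manager.writer.using(context):   # reuses the outer session
        context.session.add(Network(id='net-2', name='second'))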

View File

@ -30,6 +30,7 @@ from neutron.callbacks import registry
from neutron.callbacks import resources
from neutron.common import exceptions as n_exc
from neutron.db import _resource_extend as resource_extend
from neutron.db import api as db_api
from neutron.db.models import segment as segment_model
from neutron.db import models_v2
from neutron.extensions import ip_allocation
@ -102,14 +103,17 @@ class Plugin(db.SegmentDbMixin, segment.SegmentPluginBase):
if for_net_delete:
# don't check if this is a part of a network delete operation
return
segment_id = segment['id']
query = context.session.query(models_v2.Subnet.id)
query = query.filter(models_v2.Subnet.segment_id == segment_id)
subnet_ids = [s[0] for s in query]
with db_api.context_manager.reader.using(context):
segment_id = segment['id']
query = context.session.query(models_v2.Subnet.id)
query = query.filter(models_v2.Subnet.segment_id == segment_id)
subnet_ids = [s[0] for s in query]
if subnet_ids:
reason = _("The segment is still associated with subnet(s) "
"%s") % ", ".join(subnet_ids)
raise exceptions.SegmentInUse(segment_id=segment_id, reason=reason)
raise exceptions.SegmentInUse(segment_id=segment_id,
reason=reason)
class Event(object):

View File

@ -6295,7 +6295,7 @@ class DbModelMixin(object):
class DbModelTenantTestCase(DbModelMixin, testlib_api.SqlTestCase):
def _make_network(self, ctx):
with ctx.session.begin():
with db_api.context_manager.writer.using(ctx):
network = models_v2.Network(name="net_net", status="OK",
tenant_id='dbcheck',
admin_state_up=True)
@ -6303,7 +6303,7 @@ class DbModelTenantTestCase(DbModelMixin, testlib_api.SqlTestCase):
return network
def _make_subnet(self, ctx, network_id):
with ctx.session.begin():
with db_api.context_manager.writer.using(ctx):
subnet = models_v2.Subnet(name="subsub", ip_version=4,
tenant_id='dbcheck',
cidr='turn_down_for_what',
@ -6321,7 +6321,7 @@ class DbModelTenantTestCase(DbModelMixin, testlib_api.SqlTestCase):
return port
def _make_subnetpool(self, ctx):
with ctx.session.begin():
with db_api.context_manager.writer.using(ctx):
subnetpool = models_v2.SubnetPool(
ip_version=4, default_prefixlen=4, min_prefixlen=4,
max_prefixlen=4, shared=False, default_quota=4,
@ -6334,7 +6334,7 @@ class DbModelTenantTestCase(DbModelMixin, testlib_api.SqlTestCase):
class DbModelProjectTestCase(DbModelMixin, testlib_api.SqlTestCase):
def _make_network(self, ctx):
with ctx.session.begin():
with db_api.context_manager.writer.using(ctx):
network = models_v2.Network(name="net_net", status="OK",
project_id='dbcheck',
admin_state_up=True)
@ -6342,7 +6342,7 @@ class DbModelProjectTestCase(DbModelMixin, testlib_api.SqlTestCase):
return network
def _make_subnet(self, ctx, network_id):
with ctx.session.begin():
with db_api.context_manager.writer.using(ctx):
subnet = models_v2.Subnet(name="subsub", ip_version=4,
project_id='dbcheck',
cidr='turn_down_for_what',
@ -6360,7 +6360,7 @@ class DbModelProjectTestCase(DbModelMixin, testlib_api.SqlTestCase):
return port
def _make_subnetpool(self, ctx):
with ctx.session.begin():
with db_api.context_manager.writer.using(ctx):
subnetpool = models_v2.SubnetPool(
ip_version=4, default_prefixlen=4, min_prefixlen=4,
max_prefixlen=4, shared=False, default_quota=4,

View File

@ -810,7 +810,9 @@ class L3DvrTestCase(test_db_base_plugin_v2.NeutronDbPluginV2TestCase):
RuntimeError,
self.mixin.add_router_interface,
self.ctx, router['id'], {'port_id': port['port']['id']})
# expire since we are re-using the session which might have stale
# ports in it
self.ctx.session.expire_all()
port_info = self.core_plugin.get_port(self.ctx, port['port']['id'])
self.assertEqual(port_dict['device_id'], port_info['device_id'])
self.assertEqual(port_dict['device_owner'],

View File

@ -23,6 +23,7 @@ from oslo_db import exception as db_exc
from oslo_utils import uuidutils
from neutron.common import exceptions as n_exc
from neutron.db import api as db_api
from neutron.ipam import requests as ipam_req
from neutron.ipam import subnet_alloc
from neutron.tests.unit.db import test_db_base_plugin_v2
@ -64,7 +65,7 @@ class TestSubnetAllocation(testlib_api.SqlTestCase):
sp = self._create_subnet_pool(self.plugin, self.ctx, 'test-sp',
prefix_list, 21, 4)
sp = self.plugin._get_subnetpool(self.ctx, sp['id'])
with self.ctx.session.begin(subtransactions=True):
with db_api.context_manager.writer.using(self.ctx):
sa = subnet_alloc.SubnetAllocator(sp, self.ctx)
req = ipam_req.AnySubnetRequest(self._tenant_id,
uuidutils.generate_uuid(),
@ -80,7 +81,7 @@ class TestSubnetAllocation(testlib_api.SqlTestCase):
sp = self._create_subnet_pool(self.plugin, self.ctx, 'test-sp',
['10.1.0.0/16', '192.168.1.0/24'],
21, 4)
with self.ctx.session.begin(subtransactions=True):
with db_api.context_manager.writer.using(self.ctx):
sp = self.plugin._get_subnetpool(self.ctx, sp['id'])
sa = subnet_alloc.SubnetAllocator(sp, self.ctx)
req = ipam_req.SpecificSubnetRequest(self._tenant_id,
@ -122,7 +123,7 @@ class TestSubnetAllocation(testlib_api.SqlTestCase):
['10.1.0.0/16', '192.168.1.0/24'],
21, 4)
sp = self.plugin._get_subnetpool(self.ctx, sp['id'])
with self.ctx.session.begin(subtransactions=True):
with db_api.context_manager.writer.using(self.ctx):
sa = subnet_alloc.SubnetAllocator(sp, self.ctx)
req = ipam_req.AnySubnetRequest(self._tenant_id,
uuidutils.generate_uuid(),
@ -137,7 +138,7 @@ class TestSubnetAllocation(testlib_api.SqlTestCase):
['10.1.0.0/16', '192.168.1.0/24'],
21, 4)
sp = self.plugin._get_subnetpool(self.ctx, sp['id'])
with self.ctx.session.begin(subtransactions=True):
with db_api.context_manager.writer.using(self.ctx):
sa = subnet_alloc.SubnetAllocator(sp, self.ctx)
req = ipam_req.SpecificSubnetRequest(self._tenant_id,
uuidutils.generate_uuid(),
@ -154,7 +155,7 @@ class TestSubnetAllocation(testlib_api.SqlTestCase):
['2210::/64'],
64, 6)
sp = self.plugin._get_subnetpool(self.ctx, sp['id'])
with self.ctx.session.begin(subtransactions=True):
with db_api.context_manager.writer.using(self.ctx):
sa = subnet_alloc.SubnetAllocator(sp, self.ctx)
req = ipam_req.SpecificSubnetRequest(self._tenant_id,
uuidutils.generate_uuid(),

View File

@ -129,7 +129,8 @@ class TestNetworkExtension(model_base.BASEV2):
value = sa.Column(sa.String(64))
network = orm.relationship(
models_v2.Network,
backref=orm.backref('extension', cascade='delete', uselist=False))
backref=orm.backref('extension', cascade='delete', uselist=False,
lazy='joined'))
class TestSubnetExtension(model_base.BASEV2):
@ -139,7 +140,8 @@ class TestSubnetExtension(model_base.BASEV2):
value = sa.Column(sa.String(64))
subnet = orm.relationship(
models_v2.Subnet,
backref=orm.backref('extension', cascade='delete', uselist=False))
backref=orm.backref('extension', cascade='delete', uselist=False,
lazy='joined'))
class TestPortExtension(model_base.BASEV2):
@ -149,7 +151,8 @@ class TestPortExtension(model_base.BASEV2):
value = sa.Column(sa.String(64))
port = orm.relationship(
models_v2.Port,
backref=orm.backref('extension', cascade='delete', uselist=False))
backref=orm.backref('extension', cascade='delete', uselist=False,
lazy='joined'))
class TestDBExtensionDriver(TestExtensionDriverBase):
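The lazy='joined' added to these test backrefs makes each one-to-one extension row load eagerly with its parent. That matters under the new facade because the extension attribute may first be touched after the owning transaction has closed, and a lazy load on a detached instance raises DetachedInstanceError. A sketch of the option against the toy models from the first file (NetworkExtension mirrors TestNetworkExtension above):

# Eager-load the extension row with its Network so no lazy load is needed
# once the session that loaded the Network has closed.
from sqlalchemy import orm


class NetworkExtension(Base):
    __tablename__ = 'network_extensions'
    network_id = sa.Column(sa.String(36), sa.ForeignKey('networks.id'),
                           primary_key=True)
    value = sa.Column(sa.String(64))
    network = orm.relationship(
        Network,
        backref=orm.backref('extension', uselist=False, cascade='delete',
                            lazy='joined'))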

View File

@ -23,6 +23,7 @@ from oslo_utils import uuidutils
from sqlalchemy.orm import exc
from sqlalchemy.orm import query
from neutron.db import api as db_api
from neutron.db import db_base_plugin_v2
from neutron.db.models import l3 as l3_models
from neutron.db import models_v2
@ -63,7 +64,7 @@ class Ml2DBTestCase(testlib_api.SqlTestCase):
return port
def _setup_neutron_portbinding(self, port_id, vif_type, host):
with self.ctx.session.begin(subtransactions=True):
with db_api.context_manager.writer.using(self.ctx):
self.ctx.session.add(models.PortBinding(port_id=port_id,
vif_type=vif_type,
host=host))
@ -285,7 +286,7 @@ class Ml2DvrDBTestCase(testlib_api.SqlTestCase):
self.setup_coreplugin(PLUGIN_NAME)
def _setup_neutron_network(self, network_id, port_ids):
with self.ctx.session.begin(subtransactions=True):
with db_api.context_manager.writer.using(self.ctx):
network_obj.Network(self.ctx, id=network_id).create()
ports = []
for port_id in port_ids:
@ -311,7 +312,7 @@ class Ml2DvrDBTestCase(testlib_api.SqlTestCase):
def _setup_distributed_binding(self, network_id,
port_id, router_id, host_id):
with self.ctx.session.begin(subtransactions=True):
with db_api.context_manager.writer.using(self.ctx):
record = models.DistributedPortBinding(
port_id=port_id,
host=host_id,
@ -403,7 +404,7 @@ class Ml2DvrDBTestCase(testlib_api.SqlTestCase):
def test_distributed_port_binding_deleted_by_port_deletion(self):
network_id = uuidutils.generate_uuid()
network_obj.Network(self.ctx, id=network_id).create()
with self.ctx.session.begin(subtransactions=True):
with db_api.context_manager.writer.using(self.ctx):
device_owner = constants.DEVICE_OWNER_DVR_INTERFACE
port = models_v2.Port(
id='port_id',
@ -428,7 +429,7 @@ class Ml2DvrDBTestCase(testlib_api.SqlTestCase):
self.ctx.session.add(models.DistributedPortBinding(
**binding_kwarg))
with warnings.catch_warnings(record=True) as warning_list:
with self.ctx.session.begin(subtransactions=True):
with db_api.context_manager.writer.using(self.ctx):
self.ctx.session.delete(port)
self.assertEqual([], warning_list)
ports = ml2_db.get_distributed_port_bindings(self.ctx,

View File

@ -660,7 +660,7 @@ class TestMl2PortsV2(test_plugin.TestPortsV2, Ml2PluginV2TestCase):
status='ACTIVE', device_id='vm_id',
device_owner=DEVICE_OWNER_COMPUTE
)
with self.context.session.begin():
with db_api.context_manager.writer.using(self.context):
self.context.session.add(port_db)
self.assertIsNone(plugin._port_provisioned('port', 'evt', 'trigger',
self.context, port_id))
@ -1404,7 +1404,7 @@ class TestMl2DvrPortsV2(TestMl2PortsV2):
{'subnet_id': s['subnet']['id']})
# lie to turn the port into an SNAT interface
with self.context.session.begin():
with db_api.context_manager.reader.using(self.context):
rp = self.context.session.query(l3_models.RouterPort).filter_by(
port_id=p['port_id']).first()
rp.port_type = constants.DEVICE_OWNER_ROUTER_SNAT
@ -2560,17 +2560,21 @@ class TestML2Segments(Ml2PluginV2TestCase):
ml2_db.subscribe()
plugin = directory.get_plugin()
with self.port(device_owner=fake_owner_compute) as port:
binding = plugin._get_port(
self.context, port['port']['id']).port_binding
binding['host'] = 'host-ovs-no_filter'
mech_context = driver_context.PortContext(
plugin, self.context, port['port'],
plugin.get_network(self.context, port['port']['network_id']),
binding, None)
plugin._bind_port_if_needed(mech_context)
segment = segments_db.get_network_segments(
self.context, port['port']['network_id'])[0]
segment['network_id'] = port['port']['network_id']
# add writer here to make sure that the following operations are
# performed in the same session
with db_api.context_manager.writer.using(self.context):
binding = plugin._get_port(
self.context, port['port']['id']).port_binding
binding['host'] = 'host-ovs-no_filter'
mech_context = driver_context.PortContext(
plugin, self.context, port['port'],
plugin.get_network(self.context,
port['port']['network_id']),
binding, None)
plugin._bind_port_if_needed(mech_context)
segment = segments_db.get_network_segments(
self.context, port['port']['network_id'])[0]
segment['network_id'] = port['port']['network_id']
self.assertRaises(c_exc.CallbackFailure, registry.notify,
resources.SEGMENT, events.BEFORE_DELETE,
mock.ANY,