diff --git a/neutron/db/db_base_plugin_v2.py b/neutron/db/db_base_plugin_v2.py index 3bb8dee3e6f..e36c1228c47 100644 --- a/neutron/db/db_base_plugin_v2.py +++ b/neutron/db/db_base_plugin_v2.py @@ -322,14 +322,12 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon, objects = [] collection = "%ss" % resource items = request_items[collection] - context.session.begin(subtransactions=True) try: - for item in items: - obj_creator = getattr(self, 'create_%s' % resource) - objects.append(obj_creator(context, item)) - context.session.commit() + with db_api.context_manager.writer.using(context): + for item in items: + obj_creator = getattr(self, 'create_%s' % resource) + objects.append(obj_creator(context, item)) except Exception: - context.session.rollback() with excutils.save_and_reraise_exception(): LOG.error(_LE("An exception occurred while creating " "the %(resource)s:%(item)s"), @@ -350,7 +348,7 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon, def create_network_db(self, context, network): # single request processing n = network['network'] - with context.session.begin(subtransactions=True): + with db_api.context_manager.writer.using(context): args = {'tenant_id': n['tenant_id'], 'id': n.get('id') or uuidutils.generate_uuid(), 'name': n['name'], @@ -369,7 +367,7 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon, @db_api.retry_if_session_inactive() def update_network(self, context, id, network): n = network['network'] - with context.session.begin(subtransactions=True): + with db_api.context_manager.writer.using(context): network = self._get_network(context, id) # validate 'shared' parameter if 'shared' in n: @@ -408,9 +406,10 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon, registry.notify(resources.NETWORK, events.BEFORE_DELETE, self, context=context, network_id=id) self._ensure_network_not_in_use(context, id) - auto_delete_port_ids = [p.id for p in context.session.query( - models_v2.Port.id).filter_by(network_id=id).filter( - models_v2.Port.device_owner.in_(AUTO_DELETE_PORT_OWNERS))] + with db_api.context_manager.reader.using(context): + auto_delete_port_ids = [p.id for p in context.session.query( + models_v2.Port.id).filter_by(network_id=id).filter( + models_v2.Port.device_owner.in_(AUTO_DELETE_PORT_OWNERS))] for port_id in auto_delete_port_ids: self.delete_port(context.elevated(), port_id) # clean up subnets @@ -420,7 +419,7 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon, # cleanup if a network-owned port snuck in without failing for subnet in subnets: self.delete_subnet(context, subnet['id']) - with context.session.begin(subtransactions=True): + with db_api.context_manager.writer.using(context): network_db = self._get_network(context, id) network = self._make_network_dict(network_db, context=context) registry.notify(resources.NETWORK, events.PRECOMMIT_DELETE, @@ -739,7 +738,7 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon, raise exc.BadRequest(resource='subnets', msg=msg) self._validate_subnet(context, s) - with context.session.begin(subtransactions=True): + with db_api.context_manager.writer.using(context): network = self._get_network(context, subnet['subnet']['network_id']) subnet, ipam_subnet = self.ipam.allocate_subnet(context, @@ -823,13 +822,16 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon, registry.notify(resources.SUBNET_GATEWAY, events.BEFORE_UPDATE, self, **kwargs) - with context.session.begin(subtransactions=True): + with db_api.context_manager.writer.using(context): subnet, changes = self.ipam.update_db_subnet(context, id, s, db_pools) # we expire here since ipam may have made changes to relationships - # that will be stale on any subsequent lookups while the subnet object - # is in the session otherwise. - context.session.expire(subnet) + # that will be stale on any subsequent lookups while the subnet + # object is in the session otherwise. + # Check if subnet attached to session before expire. + if subnet in context.session: + context.session.expire(subnet) + return self._make_subnet_dict(subnet, context=context) @property @@ -962,7 +964,7 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon, self._remove_subnet_ip_allocations_from_ports(context, id) # retry integrity errors to catch ip allocation races with db_api.exc_to_retry(sql_exc.IntegrityError), \ - context.session.begin(subtransactions=True): + db_api.context_manager.writer.using(context): subnet_db = self._get_subnet(context, id) subnet = self._make_subnet_dict(subnet_db, context=context) registry.notify(resources.SUBNET, events.PRECOMMIT_DELETE, @@ -1098,7 +1100,7 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon, def update_subnetpool(self, context, id, subnetpool): new_sp = subnetpool['subnetpool'] - with context.session.begin(subtransactions=True): + with db_api.context_manager.writer.using(context): orig_sp = self._get_subnetpool(context, id=id) updated = _update_subnetpool_dict(orig_sp, new_sp) reader = subnet_alloc.SubnetPoolReader(updated) @@ -1161,7 +1163,7 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon, @db_api.retry_if_session_inactive() def delete_subnetpool(self, context, id): - with context.session.begin(subtransactions=True): + with db_api.context_manager.writer.using(context): subnetpool = self._get_subnetpool(context, id=id) subnets = self._get_subnets_by_subnetpool(context, id) if subnets: diff --git a/neutron/db/network_ip_availability_db.py b/neutron/db/network_ip_availability_db.py index 4855be2c9b2..10c067f82b4 100644 --- a/neutron/db/network_ip_availability_db.py +++ b/neutron/db/network_ip_availability_db.py @@ -17,6 +17,7 @@ import netaddr import six from sqlalchemy import func +from neutron.db import api as db_api import neutron.db.models_v2 as mod NETWORK_ID = 'network_id' @@ -84,6 +85,7 @@ class IpAvailabilityMixin(object): return net_ip_availabilities @classmethod + @db_api.context_manager.reader def _build_network_used_ip_query(cls, context, filters): # Generate a query to gather network/subnet/used_ips. # Ensure query is tolerant of missing child table data (outerjoins) @@ -100,6 +102,7 @@ class IpAvailabilityMixin(object): return cls._adjust_query_for_filters(query, filters) @classmethod + @db_api.context_manager.reader def _build_total_ips_query(cls, context, filters): query = context.session.query() query = query.add_columns(*cls.total_ips_columns) diff --git a/neutron/db/segments_db.py b/neutron/db/segments_db.py index b1745d8163f..c6dd3d8b9df 100644 --- a/neutron/db/segments_db.py +++ b/neutron/db/segments_db.py @@ -18,6 +18,7 @@ from neutron._i18n import _LI from neutron.callbacks import events from neutron.callbacks import registry from neutron.callbacks import resources +from neutron.db import api as db_api from neutron.db.models import segment as segments_model LOG = logging.getLogger(__name__) @@ -37,7 +38,7 @@ def _make_segment_dict(record): def add_network_segment(context, network_id, segment, segment_index=0, is_dynamic=False): - with context.session.begin(subtransactions=True): + with db_api.context_manager.writer.using(context): record = segments_model.NetworkSegment( id=uuidutils.generate_uuid(), network_id=network_id, @@ -70,7 +71,7 @@ def get_networks_segments(context, network_ids, filter_dynamic=False): if not network_ids: return {} - with context.session.begin(subtransactions=True): + with db_api.context_manager.reader.using(context): query = (context.session.query(segments_model.NetworkSegment). filter(segments_model.NetworkSegment.network_id .in_(network_ids)). @@ -85,7 +86,7 @@ def get_networks_segments(context, network_ids, filter_dynamic=False): def get_segment_by_id(context, segment_id): - with context.session.begin(subtransactions=True): + with db_api.context_manager.reader.using(context): try: record = (context.session.query(segments_model.NetworkSegment). filter_by(id=segment_id). @@ -98,7 +99,7 @@ def get_segment_by_id(context, segment_id): def get_dynamic_segment(context, network_id, physical_network=None, segmentation_id=None): """Return a dynamic segment for the filters provided if one exists.""" - with context.session.begin(subtransactions=True): + with db_api.context_manager.reader.using(context): query = (context.session.query(segments_model.NetworkSegment). filter_by(network_id=network_id, is_dynamic=True)) if physical_network: @@ -122,6 +123,6 @@ def get_dynamic_segment(context, network_id, physical_network=None, def delete_network_segment(context, segment_id): """Release a dynamic segment for the params provided if one exists.""" - with context.session.begin(subtransactions=True): + with db_api.context_manager.writer.using(context): (context.session.query(segments_model.NetworkSegment). filter_by(id=segment_id).delete()) diff --git a/neutron/ipam/drivers/neutrondb_ipam/driver.py b/neutron/ipam/drivers/neutrondb_ipam/driver.py index 4e6f3884827..c969c3c305e 100644 --- a/neutron/ipam/drivers/neutrondb_ipam/driver.py +++ b/neutron/ipam/drivers/neutrondb_ipam/driver.py @@ -201,6 +201,8 @@ class NeutronDbSubnet(ipam_base.Subnet): # The only defined status at this stage is 'ALLOCATED'. # More states will be available in the future - e.g.: RECYCLABLE try: + # TODO(ataraday): revisit this after objects switched to + # new enginefacade with self._context.session.begin(subtransactions=True): # NOTE(kevinbenton): we use a subtransaction to force # a flush here so we can capture DBReferenceErrors due diff --git a/neutron/ipam/subnet_alloc.py b/neutron/ipam/subnet_alloc.py index 00bb4b3e98b..6be42014241 100644 --- a/neutron/ipam/subnet_alloc.py +++ b/neutron/ipam/subnet_alloc.py @@ -24,6 +24,7 @@ from oslo_utils import uuidutils from neutron._i18n import _ from neutron.common import exceptions as n_exc +from neutron.db import api as db_api from neutron.db import models_v2 from neutron.ipam import driver from neutron.ipam import exceptions as ipam_exc @@ -49,9 +50,10 @@ class SubnetAllocator(driver.Pool): subnetpool, it's required to ensure non-overlapping cidrs in the same subnetpool. """ - - current_hash = (self._context.session.query(models_v2.SubnetPool.hash) - .filter_by(id=self._subnetpool['id']).scalar()) + with db_api.context_manager.reader.using(self._context): + current_hash = ( + self._context.session.query(models_v2.SubnetPool.hash) + .filter_by(id=self._subnetpool['id']).scalar()) if current_hash is None: # NOTE(cbrandily): subnetpool has been deleted raise n_exc.SubnetPoolNotFound( @@ -61,17 +63,21 @@ class SubnetAllocator(driver.Pool): # NOTE(cbrandily): the update disallows 2 concurrent subnet allocation # to succeed: at most 1 transaction will succeed, others will be # rolled back and be caught in neutron.db.v2.base - query = self._context.session.query(models_v2.SubnetPool).filter_by( - id=self._subnetpool['id'], hash=current_hash) - count = query.update({'hash': new_hash}) + with db_api.context_manager.writer.using(self._context): + query = ( + self._context.session.query(models_v2.SubnetPool).filter_by( + id=self._subnetpool['id'], hash=current_hash)) + + count = query.update({'hash': new_hash}) if not count: raise db_exc.RetryRequest(lib_exc.SubnetPoolInUse( subnet_pool_id=self._subnetpool['id'])) def _get_allocated_cidrs(self): - query = self._context.session.query(models_v2.Subnet) - subnets = query.filter_by(subnetpool_id=self._subnetpool['id']) - return (x.cidr for x in subnets) + with db_api.context_manager.reader.using(self._context): + query = self._context.session.query(models_v2.Subnet) + subnets = query.filter_by(subnetpool_id=self._subnetpool['id']) + return (x.cidr for x in subnets) def _get_available_prefix_list(self): prefixes = (x.cidr for x in self._subnetpool.prefixes) @@ -90,7 +96,7 @@ class SubnetAllocator(driver.Pool): def _allocations_used_by_tenant(self, quota_unit): subnetpool_id = self._subnetpool['id'] tenant_id = self._subnetpool['tenant_id'] - with self._context.session.begin(subtransactions=True): + with db_api.context_manager.reader.using(self._context): qry = self._context.session.query(models_v2.Subnet) allocations = qry.filter_by(subnetpool_id=subnetpool_id, tenant_id=tenant_id) @@ -115,7 +121,7 @@ class SubnetAllocator(driver.Pool): raise n_exc.SubnetPoolQuotaExceeded() def _allocate_any_subnet(self, request): - with self._context.session.begin(subtransactions=True): + with db_api.context_manager.writer.using(self._context): self._lock_subnetpool() self._check_subnetpool_tenant_quota(request.tenant_id, request.prefixlen) @@ -139,7 +145,7 @@ class SubnetAllocator(driver.Pool): str(request.prefixlen)) def _allocate_specific_subnet(self, request): - with self._context.session.begin(subtransactions=True): + with db_api.context_manager.writer.using(self._context): self._lock_subnetpool() self._check_subnetpool_tenant_quota(request.tenant_id, request.prefixlen) diff --git a/neutron/plugins/ml2/db.py b/neutron/plugins/ml2/db.py index ca9ccfdfd12..e98ba5f3e22 100644 --- a/neutron/plugins/ml2/db.py +++ b/neutron/plugins/ml2/db.py @@ -27,6 +27,7 @@ from neutron._i18n import _, _LE from neutron.callbacks import events from neutron.callbacks import registry from neutron.callbacks import resources +from neutron.db import api as db_api from neutron.db.models import securitygroup as sg_models from neutron.db import models_v2 from neutron.extensions import portbindings @@ -40,13 +41,13 @@ LOG = log.getLogger(__name__) MAX_PORTS_PER_QUERY = 500 +@db_api.context_manager.writer def add_port_binding(context, port_id): - with context.session.begin(subtransactions=True): - record = models.PortBinding( - port_id=port_id, - vif_type=portbindings.VIF_TYPE_UNBOUND) - context.session.add(record) - return record + record = models.PortBinding( + port_id=port_id, + vif_type=portbindings.VIF_TYPE_UNBOUND) + context.session.add(record) + return record @removals.remove( @@ -77,6 +78,7 @@ def get_locked_port_and_binding(context, port_id): return None, None +@db_api.context_manager.writer def set_binding_levels(context, levels): if levels: for level in levels: @@ -90,6 +92,7 @@ def set_binding_levels(context, levels): LOG.debug("Attempted to set empty binding levels") +@db_api.context_manager.reader def get_binding_levels(context, port_id, host): if host: result = (context.session.query(models.PortBindingLevel). @@ -104,6 +107,7 @@ def get_binding_levels(context, port_id, host): return result +@db_api.context_manager.writer def clear_binding_levels(context, port_id, host): if host: (context.session.query(models.PortBindingLevel). @@ -116,13 +120,14 @@ def clear_binding_levels(context, port_id, host): def ensure_distributed_port_binding(context, port_id, host, router_id=None): - record = (context.session.query(models.DistributedPortBinding). - filter_by(port_id=port_id, host=host).first()) + with db_api.context_manager.reader.using(context): + record = (context.session.query(models.DistributedPortBinding). + filter_by(port_id=port_id, host=host).first()) if record: return record try: - with context.session.begin(subtransactions=True): + with db_api.context_manager.writer.using(context): record = models.DistributedPortBinding( port_id=port_id, host=host, @@ -134,13 +139,14 @@ def ensure_distributed_port_binding(context, port_id, host, router_id=None): return record except db_exc.DBDuplicateEntry: LOG.debug("Distributed Port %s already bound", port_id) - return (context.session.query(models.DistributedPortBinding). - filter_by(port_id=port_id, host=host).one()) + with db_api.context_manager.reader.using(context): + return (context.session.query(models.DistributedPortBinding). + filter_by(port_id=port_id, host=host).one()) def delete_distributed_port_binding_if_stale(context, binding): if not binding.router_id and binding.status == n_const.PORT_STATUS_DOWN: - with context.session.begin(subtransactions=True): + with db_api.context_manager.writer.using(context): LOG.debug("Distributed port: Deleting binding %s", binding) context.session.delete(binding) @@ -148,10 +154,12 @@ def delete_distributed_port_binding_if_stale(context, binding): def get_port(context, port_id): """Get port record for update within transaction.""" - with context.session.begin(subtransactions=True): + with db_api.context_manager.reader.using(context): try: + # Set enable_eagerloads to True, so that lazy load can be + # proceed later. record = (context.session.query(models_v2.Port). - enable_eagerloads(False). + enable_eagerloads(True). filter(models_v2.Port.id.startswith(port_id)). one()) return record @@ -163,6 +171,7 @@ def get_port(context, port_id): return +@db_api.context_manager.reader def get_port_from_device_mac(context, device_mac): LOG.debug("get_port_from_device_mac() called for mac %s", device_mac) ports = port_obj.Port.get_objects(context, mac_address=device_mac) @@ -194,7 +203,7 @@ def get_sg_ids_grouped_by_port(context, port_ids): sg_ids_grouped_by_port = {} sg_binding_port = sg_models.SecurityGroupPortBinding.port_id - with context.session.begin(subtransactions=True): + with db_api.context_manager.reader.using(context): # partial UUIDs must be individually matched with startswith. # full UUIDs may be matched directly in an IN statement partial_uuids = set(port_id for port_id in port_ids @@ -233,7 +242,7 @@ def make_port_dict_with_security_groups(port, sec_groups): def get_port_binding_host(context, port_id): try: - with context.session.begin(subtransactions=True): + with db_api.context_manager.reader.using(context): query = (context.session.query(models.PortBinding). filter(models.PortBinding.port_id.startswith(port_id)). one()) @@ -248,6 +257,7 @@ def get_port_binding_host(context, port_id): return query.host +@db_api.context_manager.reader def generate_distributed_port_status(context, port_id): # an OR'ed value of status assigned to parent port from the # distributedportbinding bucket @@ -262,7 +272,7 @@ def generate_distributed_port_status(context, port_id): def get_distributed_port_binding_by_host(context, port_id, host): - with context.session.begin(subtransactions=True): + with db_api.context_manager.reader.using(context): binding = (context.session.query(models.DistributedPortBinding). filter(models.DistributedPortBinding.port_id.startswith(port_id), models.DistributedPortBinding.host == host).first()) @@ -273,7 +283,7 @@ def get_distributed_port_binding_by_host(context, port_id, host): def get_distributed_port_bindings(context, port_id): - with context.session.begin(subtransactions=True): + with db_api.context_manager.reader.using(context): bindings = (context.session.query(models.DistributedPortBinding). filter(models.DistributedPortBinding.port_id.startswith( port_id)).all()) @@ -282,6 +292,7 @@ def get_distributed_port_bindings(context, port_id): return bindings +@db_api.context_manager.reader def is_dhcp_active_on_any_subnet(context, subnet_ids): if not subnet_ids: return False @@ -297,13 +308,15 @@ def _prevent_segment_delete_with_port_bound(resource, event, trigger, if for_net_delete: # don't check for network deletes return - segment_id = segment['id'] - query = context.session.query(models_v2.Port) - query = query.join( - models.PortBindingLevel, - models.PortBindingLevel.port_id == models_v2.Port.id) - query = query.filter(models.PortBindingLevel.segment_id == segment_id) - port_ids = [p.id for p in query] + + with db_api.context_manager.reader.using(context): + segment_id = segment['id'] + query = context.session.query(models_v2.Port) + query = query.join( + models.PortBindingLevel, + models.PortBindingLevel.port_id == models_v2.Port.id) + query = query.filter(models.PortBindingLevel.segment_id == segment_id) + port_ids = [p.id for p in query] # There are still some ports in the segment, segment should not be deleted # TODO(xiaohhui): Should we delete the dhcp port automatically here? diff --git a/neutron/plugins/ml2/managers.py b/neutron/plugins/ml2/managers.py index 896f1a9a823..ffc6f92a98a 100644 --- a/neutron/plugins/ml2/managers.py +++ b/neutron/plugins/ml2/managers.py @@ -190,8 +190,7 @@ class TypeManager(stevedore.named.NamedExtensionManager): def create_network_segments(self, context, network, tenant_id): """Call type drivers to create network segments.""" segments = self._process_provider_create(network) - session = context.session - with session.begin(subtransactions=True): + with db_api.context_manager.writer.using(context): network_id = network['id'] if segments: for segment_index, segment in enumerate(segments): @@ -224,7 +223,7 @@ class TypeManager(stevedore.named.NamedExtensionManager): self.validate_provider_segment(segment) # Reserve segment in type driver - with context.session.begin(subtransactions=True): + with db_api.context_manager.writer.using(context): return self.reserve_provider_segment(context, segment) def is_partial_segment(self, segment): diff --git a/neutron/plugins/ml2/plugin.py b/neutron/plugins/ml2/plugin.py index 2018deb6010..0063de7b71e 100644 --- a/neutron/plugins/ml2/plugin.py +++ b/neutron/plugins/ml2/plugin.py @@ -433,7 +433,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2, # After we've attempted to bind the port, we begin a # transaction, get the current port state, and decide whether # to commit the binding results. - with plugin_context.session.begin(subtransactions=True): + with db_api.context_manager.writer.using(plugin_context): # Get the current port state and build a new PortContext # reflecting this state as original state for subsequent # mechanism driver update_port_*commit() calls. @@ -594,17 +594,24 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2, attributes.SUBNETS, ['_ml2_md_extend_subnet_dict']) def _ml2_md_extend_network_dict(self, result, netdb): - session = sqlalchemy.inspect(netdb).session + session = self._object_session_or_new_session(netdb) self.extension_manager.extend_network_dict(session, netdb, result) def _ml2_md_extend_port_dict(self, result, portdb): - session = sqlalchemy.inspect(portdb).session + session = self._object_session_or_new_session(portdb) self.extension_manager.extend_port_dict(session, portdb, result) def _ml2_md_extend_subnet_dict(self, result, subnetdb): - session = sqlalchemy.inspect(subnetdb).session + session = self._object_session_or_new_session(subnetdb) self.extension_manager.extend_subnet_dict(session, subnetdb, result) + @staticmethod + def _object_session_or_new_session(sql_obj): + session = sqlalchemy.inspect(sql_obj).session + if not session: + session = db_api.get_reader_session() + return session + # Note - The following hook methods have "ml2" in their names so # that they are not called twice during unit tests due to global # registration of hooks in portbindings_db.py used by other @@ -656,7 +663,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2, objects = [] collection = "%ss" % resource items = request_items[collection] - with context.session.begin(subtransactions=True): + with db_api.context_manager.writer.using(context): obj_creator = getattr(self, '_create_%s_db' % resource) for item in items: try: @@ -733,10 +740,9 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2, def _create_network_db(self, context, network): net_data = network[attributes.NETWORK] tenant_id = net_data['tenant_id'] - session = context.session registry.notify(resources.NETWORK, events.BEFORE_CREATE, self, context=context, network=net_data) - with session.begin(subtransactions=True): + with db_api.context_manager.writer.using(context): net_db = self.create_network_db(context, network) result = self._make_network_dict(net_db, process_extensions=False, context=context) @@ -766,7 +772,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2, net_db[az_ext.AZ_HINTS] = az_hints result[az_ext.AZ_HINTS] = az_hints - self._apply_dict_extend_functions('networks', result, net_db) + self._apply_dict_extend_functions('networks', result, net_db) return result, mech_context @utils.transaction_guard @@ -797,8 +803,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2, net_data = network[attributes.NETWORK] provider._raise_if_updates_provider_attributes(net_data) - session = context.session - with session.begin(subtransactions=True): + with db_api.context_manager.writer.using(context): original_network = super(Ml2Plugin, self).get_network(context, id) updated_network = super(Ml2Plugin, self).update_network(context, id, @@ -841,8 +846,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2, @db_api.retry_if_session_inactive() def get_network(self, context, id, fields=None): - session = context.session - with session.begin(subtransactions=True): + with db_api.context_manager.reader.using(context): result = super(Ml2Plugin, self).get_network(context, id, None) self.type_manager.extend_network_dict_provider(context, result) result[api.MTU] = self._get_network_mtu(result) @@ -852,8 +856,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2, @db_api.retry_if_session_inactive() def get_networks(self, context, filters=None, fields=None, sorts=None, limit=None, marker=None, page_reverse=False): - session = context.session - with session.begin(subtransactions=True): + with db_api.context_manager.reader.using(context): nets = super(Ml2Plugin, self).get_networks(context, filters, None, sorts, limit, marker, page_reverse) @@ -900,8 +903,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2, self.notifier.network_delete(context, network['id']) def _create_subnet_db(self, context, subnet): - session = context.session - with session.begin(subtransactions=True): + with db_api.context_manager.writer.using(context): result, net_db, ipam_sub = self._create_subnet_precommit( context, subnet) self.extension_manager.process_create_subnet( @@ -912,6 +914,11 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2, mech_context = driver_context.SubnetContext(self, context, result, network) self.mechanism_manager.create_subnet_precommit(mech_context) + # TODO(ataraday): temporary flush until this will be available + # in oslo.db (change https://review.openstack.org/#/c/433758/ + # got merged) + context.session.flush() + # db base plugin post commit ops self._create_subnet_postcommit(context, result, net_db, ipam_sub) @@ -941,8 +948,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2, @utils.transaction_guard @db_api.retry_if_session_inactive() def update_subnet(self, context, id, subnet): - session = context.session - with session.begin(subtransactions=True): + with db_api.context_manager.writer.using(context): original_subnet = self.get_subnet(context, id) updated_subnet = self._update_subnet_precommit( context, id, subnet) @@ -1042,8 +1048,10 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2, registry.notify(resources.PORT, events.BEFORE_CREATE, self, context=context, port=attrs) - session = context.session - with session.begin(subtransactions=True): + # NOTE(kevinbenton): triggered outside of transaction since it + # emits 'AFTER' events if it creates. + self._ensure_default_security_group(context, attrs['tenant_id']) + with db_api.context_manager.writer.using(context): dhcp_opts = attrs.get(edo_ext.EXTRADHCPOPTS, []) port_db = self.create_port_db(context, port) result = self._make_port_dict(port_db, process_extensions=False) @@ -1180,10 +1188,8 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2, def update_port(self, context, id, port): attrs = port[attributes.PORT] need_port_update_notify = False - session = context.session bound_mech_contexts = [] - - with session.begin(subtransactions=True): + with db_api.context_manager.writer.using(context): port_db = self._get_port(context, id) binding = port_db.port_binding if not binding: @@ -1343,7 +1349,6 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2, LOG.error(_LE("No Host supplied to bind DVR Port %s"), id) return - session = context.session binding = db.get_distributed_port_binding_by_host(context, id, host) device_id = attrs and attrs.get('device_id') @@ -1353,7 +1358,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2, router_id != device_id) if update_required: try: - with session.begin(subtransactions=True): + with db_api.context_manager.writer.using(context): orig_port = self.get_port(context, id) if not binding: binding = db.ensure_distributed_port_binding( @@ -1398,8 +1403,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2, router_ids = [] l3plugin = directory.get_plugin(const.L3) - session = context.session - with session.begin(subtransactions=True): + with db_api.context_manager.writer.using(context): try: port_db = self._get_port(context, id) binding = port_db.port_binding @@ -1466,8 +1470,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2, @db_api.retry_if_session_inactive(context_var_name='plugin_context') def get_bound_port_context(self, plugin_context, port_id, host=None, cached_networks=None): - session = plugin_context.session - with session.begin(subtransactions=True): + with db_api.context_manager.reader.using(plugin_context) as session: try: port_db = (session.query(models_v2.Port). enable_eagerloads(False). @@ -1527,8 +1530,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2, one was already performed by the caller. """ updated = False - session = context.session - with session.begin(subtransactions=True): + with db_api.context_manager.writer.using(context): port = db.get_port(context, port_id) if not port: LOG.debug("Port %(port)s update to %(val)s by agent not found", @@ -1560,7 +1562,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2, if (updated and port['device_owner'] == const.DEVICE_OWNER_DVR_INTERFACE): - with session.begin(subtransactions=True): + with db_api.context_manager.writer.using(context): port = db.get_port(context, port_id) if not port: LOG.warning(_LW("Port %s not found during update"), diff --git a/neutron/services/segments/db.py b/neutron/services/segments/db.py index 8e8b303473a..6f96050f553 100644 --- a/neutron/services/segments/db.py +++ b/neutron/services/segments/db.py @@ -77,7 +77,7 @@ class SegmentDbMixin(common_db_mixin.CommonDbMixin): return self._make_segment_dict(new_segment) def _create_segment_db(self, context, segment_id, segment): - with context.session.begin(subtransactions=True): + with db_api.context_manager.writer.using(context): network_id = segment['network_id'] physical_network = segment[extension.PHYSICAL_NETWORK] if physical_network == constants.ATTR_NOT_SPECIFIED: @@ -125,7 +125,7 @@ class SegmentDbMixin(common_db_mixin.CommonDbMixin): def update_segment(self, context, uuid, segment): """Update an existing segment.""" segment = segment['segment'] - with context.session.begin(subtransactions=True): + with db_api.context_manager.writer.using(context): curr_segment = self._get_segment(context, uuid) curr_segment.update(segment) return self._make_segment_dict(curr_segment) @@ -175,7 +175,7 @@ class SegmentDbMixin(common_db_mixin.CommonDbMixin): segment=segment, for_net_delete=for_net_delete) # Delete segment in DB - with context.session.begin(subtransactions=True): + with db_api.context_manager.writer.using(context): query = self._model_query(context, segment_model.NetworkSegment) query = query.filter(segment_model.NetworkSegment.id == uuid) if 0 == query.delete(): @@ -191,7 +191,7 @@ class SegmentDbMixin(common_db_mixin.CommonDbMixin): def update_segment_host_mapping(context, host, current_segment_ids): - with context.session.begin(subtransactions=True): + with db_api.context_manager.writer.using(context): segment_host_mapping = network.SegmentHostMapping.get_objects( context, host=host) previous_segment_ids = { @@ -241,7 +241,7 @@ def get_segments_with_phys_nets(context, phys_nets): if not phys_nets: return [] - with context.session.begin(subtransactions=True): + with db_api.context_manager.reader.using(context): segments = context.session.query(segment_model.NetworkSegment).filter( segment_model.NetworkSegment.physical_network.in_(phys_nets)) return segments @@ -249,7 +249,7 @@ def get_segments_with_phys_nets(context, phys_nets): def map_segment_to_hosts(context, segment_id, hosts): """Map segment to a collection of hosts.""" - with db_api.autonested_transaction(context.session): + with db_api.context_manager.writer.using(context): for host in hosts: network.SegmentHostMapping( context, segment_id=segment_id, host=host).create() diff --git a/neutron/services/segments/plugin.py b/neutron/services/segments/plugin.py index 5d637b39ceb..b4167553c11 100644 --- a/neutron/services/segments/plugin.py +++ b/neutron/services/segments/plugin.py @@ -30,6 +30,7 @@ from neutron.callbacks import registry from neutron.callbacks import resources from neutron.common import exceptions as n_exc from neutron.db import _resource_extend as resource_extend +from neutron.db import api as db_api from neutron.db.models import segment as segment_model from neutron.db import models_v2 from neutron.extensions import ip_allocation @@ -102,14 +103,17 @@ class Plugin(db.SegmentDbMixin, segment.SegmentPluginBase): if for_net_delete: # don't check if this is a part of a network delete operation return - segment_id = segment['id'] - query = context.session.query(models_v2.Subnet.id) - query = query.filter(models_v2.Subnet.segment_id == segment_id) - subnet_ids = [s[0] for s in query] + with db_api.context_manager.reader.using(context): + segment_id = segment['id'] + query = context.session.query(models_v2.Subnet.id) + query = query.filter(models_v2.Subnet.segment_id == segment_id) + subnet_ids = [s[0] for s in query] + if subnet_ids: reason = _("The segment is still associated with subnet(s) " "%s") % ", ".join(subnet_ids) - raise exceptions.SegmentInUse(segment_id=segment_id, reason=reason) + raise exceptions.SegmentInUse(segment_id=segment_id, + reason=reason) class Event(object): diff --git a/neutron/tests/unit/db/test_db_base_plugin_v2.py b/neutron/tests/unit/db/test_db_base_plugin_v2.py index b6d561e5269..7e385892468 100644 --- a/neutron/tests/unit/db/test_db_base_plugin_v2.py +++ b/neutron/tests/unit/db/test_db_base_plugin_v2.py @@ -6295,7 +6295,7 @@ class DbModelMixin(object): class DbModelTenantTestCase(DbModelMixin, testlib_api.SqlTestCase): def _make_network(self, ctx): - with ctx.session.begin(): + with db_api.context_manager.writer.using(ctx): network = models_v2.Network(name="net_net", status="OK", tenant_id='dbcheck', admin_state_up=True) @@ -6303,7 +6303,7 @@ class DbModelTenantTestCase(DbModelMixin, testlib_api.SqlTestCase): return network def _make_subnet(self, ctx, network_id): - with ctx.session.begin(): + with db_api.context_manager.writer.using(ctx): subnet = models_v2.Subnet(name="subsub", ip_version=4, tenant_id='dbcheck', cidr='turn_down_for_what', @@ -6321,7 +6321,7 @@ class DbModelTenantTestCase(DbModelMixin, testlib_api.SqlTestCase): return port def _make_subnetpool(self, ctx): - with ctx.session.begin(): + with db_api.context_manager.writer.using(ctx): subnetpool = models_v2.SubnetPool( ip_version=4, default_prefixlen=4, min_prefixlen=4, max_prefixlen=4, shared=False, default_quota=4, @@ -6334,7 +6334,7 @@ class DbModelTenantTestCase(DbModelMixin, testlib_api.SqlTestCase): class DbModelProjectTestCase(DbModelMixin, testlib_api.SqlTestCase): def _make_network(self, ctx): - with ctx.session.begin(): + with db_api.context_manager.writer.using(ctx): network = models_v2.Network(name="net_net", status="OK", project_id='dbcheck', admin_state_up=True) @@ -6342,7 +6342,7 @@ class DbModelProjectTestCase(DbModelMixin, testlib_api.SqlTestCase): return network def _make_subnet(self, ctx, network_id): - with ctx.session.begin(): + with db_api.context_manager.writer.using(ctx): subnet = models_v2.Subnet(name="subsub", ip_version=4, project_id='dbcheck', cidr='turn_down_for_what', @@ -6360,7 +6360,7 @@ class DbModelProjectTestCase(DbModelMixin, testlib_api.SqlTestCase): return port def _make_subnetpool(self, ctx): - with ctx.session.begin(): + with db_api.context_manager.writer.using(ctx): subnetpool = models_v2.SubnetPool( ip_version=4, default_prefixlen=4, min_prefixlen=4, max_prefixlen=4, shared=False, default_quota=4, diff --git a/neutron/tests/unit/db/test_l3_dvr_db.py b/neutron/tests/unit/db/test_l3_dvr_db.py index 7ba7fba3b3b..b7ac4eaa0f5 100644 --- a/neutron/tests/unit/db/test_l3_dvr_db.py +++ b/neutron/tests/unit/db/test_l3_dvr_db.py @@ -810,7 +810,9 @@ class L3DvrTestCase(test_db_base_plugin_v2.NeutronDbPluginV2TestCase): RuntimeError, self.mixin.add_router_interface, self.ctx, router['id'], {'port_id': port['port']['id']}) - + # expire since we are re-using the session which might have stale + # ports in it + self.ctx.session.expire_all() port_info = self.core_plugin.get_port(self.ctx, port['port']['id']) self.assertEqual(port_dict['device_id'], port_info['device_id']) self.assertEqual(port_dict['device_owner'], diff --git a/neutron/tests/unit/ipam/test_subnet_alloc.py b/neutron/tests/unit/ipam/test_subnet_alloc.py index eaf51229203..ff1f853ff75 100644 --- a/neutron/tests/unit/ipam/test_subnet_alloc.py +++ b/neutron/tests/unit/ipam/test_subnet_alloc.py @@ -23,6 +23,7 @@ from oslo_db import exception as db_exc from oslo_utils import uuidutils from neutron.common import exceptions as n_exc +from neutron.db import api as db_api from neutron.ipam import requests as ipam_req from neutron.ipam import subnet_alloc from neutron.tests.unit.db import test_db_base_plugin_v2 @@ -64,7 +65,7 @@ class TestSubnetAllocation(testlib_api.SqlTestCase): sp = self._create_subnet_pool(self.plugin, self.ctx, 'test-sp', prefix_list, 21, 4) sp = self.plugin._get_subnetpool(self.ctx, sp['id']) - with self.ctx.session.begin(subtransactions=True): + with db_api.context_manager.writer.using(self.ctx): sa = subnet_alloc.SubnetAllocator(sp, self.ctx) req = ipam_req.AnySubnetRequest(self._tenant_id, uuidutils.generate_uuid(), @@ -80,7 +81,7 @@ class TestSubnetAllocation(testlib_api.SqlTestCase): sp = self._create_subnet_pool(self.plugin, self.ctx, 'test-sp', ['10.1.0.0/16', '192.168.1.0/24'], 21, 4) - with self.ctx.session.begin(subtransactions=True): + with db_api.context_manager.writer.using(self.ctx): sp = self.plugin._get_subnetpool(self.ctx, sp['id']) sa = subnet_alloc.SubnetAllocator(sp, self.ctx) req = ipam_req.SpecificSubnetRequest(self._tenant_id, @@ -122,7 +123,7 @@ class TestSubnetAllocation(testlib_api.SqlTestCase): ['10.1.0.0/16', '192.168.1.0/24'], 21, 4) sp = self.plugin._get_subnetpool(self.ctx, sp['id']) - with self.ctx.session.begin(subtransactions=True): + with db_api.context_manager.writer.using(self.ctx): sa = subnet_alloc.SubnetAllocator(sp, self.ctx) req = ipam_req.AnySubnetRequest(self._tenant_id, uuidutils.generate_uuid(), @@ -137,7 +138,7 @@ class TestSubnetAllocation(testlib_api.SqlTestCase): ['10.1.0.0/16', '192.168.1.0/24'], 21, 4) sp = self.plugin._get_subnetpool(self.ctx, sp['id']) - with self.ctx.session.begin(subtransactions=True): + with db_api.context_manager.writer.using(self.ctx): sa = subnet_alloc.SubnetAllocator(sp, self.ctx) req = ipam_req.SpecificSubnetRequest(self._tenant_id, uuidutils.generate_uuid(), @@ -154,7 +155,7 @@ class TestSubnetAllocation(testlib_api.SqlTestCase): ['2210::/64'], 64, 6) sp = self.plugin._get_subnetpool(self.ctx, sp['id']) - with self.ctx.session.begin(subtransactions=True): + with db_api.context_manager.writer.using(self.ctx): sa = subnet_alloc.SubnetAllocator(sp, self.ctx) req = ipam_req.SpecificSubnetRequest(self._tenant_id, uuidutils.generate_uuid(), diff --git a/neutron/tests/unit/plugins/ml2/drivers/ext_test.py b/neutron/tests/unit/plugins/ml2/drivers/ext_test.py index 6c39020b9b8..4571f38900f 100644 --- a/neutron/tests/unit/plugins/ml2/drivers/ext_test.py +++ b/neutron/tests/unit/plugins/ml2/drivers/ext_test.py @@ -129,7 +129,8 @@ class TestNetworkExtension(model_base.BASEV2): value = sa.Column(sa.String(64)) network = orm.relationship( models_v2.Network, - backref=orm.backref('extension', cascade='delete', uselist=False)) + backref=orm.backref('extension', cascade='delete', uselist=False, + lazy='joined')) class TestSubnetExtension(model_base.BASEV2): @@ -139,7 +140,8 @@ class TestSubnetExtension(model_base.BASEV2): value = sa.Column(sa.String(64)) subnet = orm.relationship( models_v2.Subnet, - backref=orm.backref('extension', cascade='delete', uselist=False)) + backref=orm.backref('extension', cascade='delete', uselist=False, + lazy='joined')) class TestPortExtension(model_base.BASEV2): @@ -149,7 +151,8 @@ class TestPortExtension(model_base.BASEV2): value = sa.Column(sa.String(64)) port = orm.relationship( models_v2.Port, - backref=orm.backref('extension', cascade='delete', uselist=False)) + backref=orm.backref('extension', cascade='delete', uselist=False, + lazy='joined')) class TestDBExtensionDriver(TestExtensionDriverBase): diff --git a/neutron/tests/unit/plugins/ml2/test_db.py b/neutron/tests/unit/plugins/ml2/test_db.py index 3423cb9dbd7..19471766fef 100644 --- a/neutron/tests/unit/plugins/ml2/test_db.py +++ b/neutron/tests/unit/plugins/ml2/test_db.py @@ -23,6 +23,7 @@ from oslo_utils import uuidutils from sqlalchemy.orm import exc from sqlalchemy.orm import query +from neutron.db import api as db_api from neutron.db import db_base_plugin_v2 from neutron.db.models import l3 as l3_models from neutron.db import models_v2 @@ -63,7 +64,7 @@ class Ml2DBTestCase(testlib_api.SqlTestCase): return port def _setup_neutron_portbinding(self, port_id, vif_type, host): - with self.ctx.session.begin(subtransactions=True): + with db_api.context_manager.writer.using(self.ctx): self.ctx.session.add(models.PortBinding(port_id=port_id, vif_type=vif_type, host=host)) @@ -285,7 +286,7 @@ class Ml2DvrDBTestCase(testlib_api.SqlTestCase): self.setup_coreplugin(PLUGIN_NAME) def _setup_neutron_network(self, network_id, port_ids): - with self.ctx.session.begin(subtransactions=True): + with db_api.context_manager.writer.using(self.ctx): network_obj.Network(self.ctx, id=network_id).create() ports = [] for port_id in port_ids: @@ -311,7 +312,7 @@ class Ml2DvrDBTestCase(testlib_api.SqlTestCase): def _setup_distributed_binding(self, network_id, port_id, router_id, host_id): - with self.ctx.session.begin(subtransactions=True): + with db_api.context_manager.writer.using(self.ctx): record = models.DistributedPortBinding( port_id=port_id, host=host_id, @@ -403,7 +404,7 @@ class Ml2DvrDBTestCase(testlib_api.SqlTestCase): def test_distributed_port_binding_deleted_by_port_deletion(self): network_id = uuidutils.generate_uuid() network_obj.Network(self.ctx, id=network_id).create() - with self.ctx.session.begin(subtransactions=True): + with db_api.context_manager.writer.using(self.ctx): device_owner = constants.DEVICE_OWNER_DVR_INTERFACE port = models_v2.Port( id='port_id', @@ -428,7 +429,7 @@ class Ml2DvrDBTestCase(testlib_api.SqlTestCase): self.ctx.session.add(models.DistributedPortBinding( **binding_kwarg)) with warnings.catch_warnings(record=True) as warning_list: - with self.ctx.session.begin(subtransactions=True): + with db_api.context_manager.writer.using(self.ctx): self.ctx.session.delete(port) self.assertEqual([], warning_list) ports = ml2_db.get_distributed_port_bindings(self.ctx, diff --git a/neutron/tests/unit/plugins/ml2/test_plugin.py b/neutron/tests/unit/plugins/ml2/test_plugin.py index 091f0b53f87..15018531b24 100644 --- a/neutron/tests/unit/plugins/ml2/test_plugin.py +++ b/neutron/tests/unit/plugins/ml2/test_plugin.py @@ -660,7 +660,7 @@ class TestMl2PortsV2(test_plugin.TestPortsV2, Ml2PluginV2TestCase): status='ACTIVE', device_id='vm_id', device_owner=DEVICE_OWNER_COMPUTE ) - with self.context.session.begin(): + with db_api.context_manager.writer.using(self.context): self.context.session.add(port_db) self.assertIsNone(plugin._port_provisioned('port', 'evt', 'trigger', self.context, port_id)) @@ -1404,7 +1404,7 @@ class TestMl2DvrPortsV2(TestMl2PortsV2): {'subnet_id': s['subnet']['id']}) # lie to turn the port into an SNAT interface - with self.context.session.begin(): + with db_api.context_manager.reader.using(self.context): rp = self.context.session.query(l3_models.RouterPort).filter_by( port_id=p['port_id']).first() rp.port_type = constants.DEVICE_OWNER_ROUTER_SNAT @@ -2560,17 +2560,21 @@ class TestML2Segments(Ml2PluginV2TestCase): ml2_db.subscribe() plugin = directory.get_plugin() with self.port(device_owner=fake_owner_compute) as port: - binding = plugin._get_port( - self.context, port['port']['id']).port_binding - binding['host'] = 'host-ovs-no_filter' - mech_context = driver_context.PortContext( - plugin, self.context, port['port'], - plugin.get_network(self.context, port['port']['network_id']), - binding, None) - plugin._bind_port_if_needed(mech_context) - segment = segments_db.get_network_segments( - self.context, port['port']['network_id'])[0] - segment['network_id'] = port['port']['network_id'] + # add writer here to make sure that the following operations are + # performed in the same session + with db_api.context_manager.writer.using(self.context): + binding = plugin._get_port( + self.context, port['port']['id']).port_binding + binding['host'] = 'host-ovs-no_filter' + mech_context = driver_context.PortContext( + plugin, self.context, port['port'], + plugin.get_network(self.context, + port['port']['network_id']), + binding, None) + plugin._bind_port_if_needed(mech_context) + segment = segments_db.get_network_segments( + self.context, port['port']['network_id'])[0] + segment['network_id'] = port['port']['network_id'] self.assertRaises(c_exc.CallbackFailure, registry.notify, resources.SEGMENT, events.BEFORE_DELETE, mock.ANY,