From cf34df857273d3be289e00590d80498cc11149ee Mon Sep 17 00:00:00 2001 From: Ann Kamyshnikova Date: Thu, 24 Nov 2016 17:16:14 +0300 Subject: [PATCH] New enginefacade for networks, subnets. Usage reader and writer for db operations. In Neutron code we have common situation like: with context.session.begin(): context.session.add(obj) self._make_obj_dict(obj) With new enginefacade we change context.session.begin() for db.context_manager.writer(reader).using(context). When object leaves this with-block, its reference to session is cleared because session is discarded. To use this object later to load some data from its dependencies, we have to provide different session to work in. To solve this obj either can be moved under with-block or we have to do context.session.add(obj) one more time to be able to load relations. This change also switches to usage of new enginefacade for some db operations with ports, in order to pass unit and functional tests. Partially-Implements blueprint: enginefacade-switch Change-Id: Ia15c63f94d2c67791da3b65546e59f6929c8c685 --- neutron/db/db_base_plugin_v2.py | 42 ++++++------ neutron/db/network_ip_availability_db.py | 3 + neutron/db/segments_db.py | 11 ++-- neutron/ipam/drivers/neutrondb_ipam/driver.py | 2 + neutron/ipam/subnet_alloc.py | 30 +++++---- neutron/plugins/ml2/db.py | 63 +++++++++++------- neutron/plugins/ml2/managers.py | 5 +- neutron/plugins/ml2/plugin.py | 66 ++++++++++--------- neutron/services/segments/db.py | 12 ++-- neutron/services/segments/plugin.py | 14 ++-- .../tests/unit/db/test_db_base_plugin_v2.py | 12 ++-- neutron/tests/unit/db/test_l3_dvr_db.py | 4 +- neutron/tests/unit/ipam/test_subnet_alloc.py | 11 ++-- .../unit/plugins/ml2/drivers/ext_test.py | 9 ++- neutron/tests/unit/plugins/ml2/test_db.py | 11 ++-- neutron/tests/unit/plugins/ml2/test_plugin.py | 30 +++++---- 16 files changed, 184 insertions(+), 141 deletions(-) diff --git a/neutron/db/db_base_plugin_v2.py b/neutron/db/db_base_plugin_v2.py index 3bb8dee3e6f..e36c1228c47 100644 --- a/neutron/db/db_base_plugin_v2.py +++ b/neutron/db/db_base_plugin_v2.py @@ -322,14 +322,12 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon, objects = [] collection = "%ss" % resource items = request_items[collection] - context.session.begin(subtransactions=True) try: - for item in items: - obj_creator = getattr(self, 'create_%s' % resource) - objects.append(obj_creator(context, item)) - context.session.commit() + with db_api.context_manager.writer.using(context): + for item in items: + obj_creator = getattr(self, 'create_%s' % resource) + objects.append(obj_creator(context, item)) except Exception: - context.session.rollback() with excutils.save_and_reraise_exception(): LOG.error(_LE("An exception occurred while creating " "the %(resource)s:%(item)s"), @@ -350,7 +348,7 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon, def create_network_db(self, context, network): # single request processing n = network['network'] - with context.session.begin(subtransactions=True): + with db_api.context_manager.writer.using(context): args = {'tenant_id': n['tenant_id'], 'id': n.get('id') or uuidutils.generate_uuid(), 'name': n['name'], @@ -369,7 +367,7 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon, @db_api.retry_if_session_inactive() def update_network(self, context, id, network): n = network['network'] - with context.session.begin(subtransactions=True): + with db_api.context_manager.writer.using(context): network = self._get_network(context, id) # validate 'shared' parameter if 'shared' in n: @@ -408,9 +406,10 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon, registry.notify(resources.NETWORK, events.BEFORE_DELETE, self, context=context, network_id=id) self._ensure_network_not_in_use(context, id) - auto_delete_port_ids = [p.id for p in context.session.query( - models_v2.Port.id).filter_by(network_id=id).filter( - models_v2.Port.device_owner.in_(AUTO_DELETE_PORT_OWNERS))] + with db_api.context_manager.reader.using(context): + auto_delete_port_ids = [p.id for p in context.session.query( + models_v2.Port.id).filter_by(network_id=id).filter( + models_v2.Port.device_owner.in_(AUTO_DELETE_PORT_OWNERS))] for port_id in auto_delete_port_ids: self.delete_port(context.elevated(), port_id) # clean up subnets @@ -420,7 +419,7 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon, # cleanup if a network-owned port snuck in without failing for subnet in subnets: self.delete_subnet(context, subnet['id']) - with context.session.begin(subtransactions=True): + with db_api.context_manager.writer.using(context): network_db = self._get_network(context, id) network = self._make_network_dict(network_db, context=context) registry.notify(resources.NETWORK, events.PRECOMMIT_DELETE, @@ -739,7 +738,7 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon, raise exc.BadRequest(resource='subnets', msg=msg) self._validate_subnet(context, s) - with context.session.begin(subtransactions=True): + with db_api.context_manager.writer.using(context): network = self._get_network(context, subnet['subnet']['network_id']) subnet, ipam_subnet = self.ipam.allocate_subnet(context, @@ -823,13 +822,16 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon, registry.notify(resources.SUBNET_GATEWAY, events.BEFORE_UPDATE, self, **kwargs) - with context.session.begin(subtransactions=True): + with db_api.context_manager.writer.using(context): subnet, changes = self.ipam.update_db_subnet(context, id, s, db_pools) # we expire here since ipam may have made changes to relationships - # that will be stale on any subsequent lookups while the subnet object - # is in the session otherwise. - context.session.expire(subnet) + # that will be stale on any subsequent lookups while the subnet + # object is in the session otherwise. + # Check if subnet attached to session before expire. + if subnet in context.session: + context.session.expire(subnet) + return self._make_subnet_dict(subnet, context=context) @property @@ -962,7 +964,7 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon, self._remove_subnet_ip_allocations_from_ports(context, id) # retry integrity errors to catch ip allocation races with db_api.exc_to_retry(sql_exc.IntegrityError), \ - context.session.begin(subtransactions=True): + db_api.context_manager.writer.using(context): subnet_db = self._get_subnet(context, id) subnet = self._make_subnet_dict(subnet_db, context=context) registry.notify(resources.SUBNET, events.PRECOMMIT_DELETE, @@ -1098,7 +1100,7 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon, def update_subnetpool(self, context, id, subnetpool): new_sp = subnetpool['subnetpool'] - with context.session.begin(subtransactions=True): + with db_api.context_manager.writer.using(context): orig_sp = self._get_subnetpool(context, id=id) updated = _update_subnetpool_dict(orig_sp, new_sp) reader = subnet_alloc.SubnetPoolReader(updated) @@ -1161,7 +1163,7 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon, @db_api.retry_if_session_inactive() def delete_subnetpool(self, context, id): - with context.session.begin(subtransactions=True): + with db_api.context_manager.writer.using(context): subnetpool = self._get_subnetpool(context, id=id) subnets = self._get_subnets_by_subnetpool(context, id) if subnets: diff --git a/neutron/db/network_ip_availability_db.py b/neutron/db/network_ip_availability_db.py index 4855be2c9b2..10c067f82b4 100644 --- a/neutron/db/network_ip_availability_db.py +++ b/neutron/db/network_ip_availability_db.py @@ -17,6 +17,7 @@ import netaddr import six from sqlalchemy import func +from neutron.db import api as db_api import neutron.db.models_v2 as mod NETWORK_ID = 'network_id' @@ -84,6 +85,7 @@ class IpAvailabilityMixin(object): return net_ip_availabilities @classmethod + @db_api.context_manager.reader def _build_network_used_ip_query(cls, context, filters): # Generate a query to gather network/subnet/used_ips. # Ensure query is tolerant of missing child table data (outerjoins) @@ -100,6 +102,7 @@ class IpAvailabilityMixin(object): return cls._adjust_query_for_filters(query, filters) @classmethod + @db_api.context_manager.reader def _build_total_ips_query(cls, context, filters): query = context.session.query() query = query.add_columns(*cls.total_ips_columns) diff --git a/neutron/db/segments_db.py b/neutron/db/segments_db.py index b1745d8163f..c6dd3d8b9df 100644 --- a/neutron/db/segments_db.py +++ b/neutron/db/segments_db.py @@ -18,6 +18,7 @@ from neutron._i18n import _LI from neutron.callbacks import events from neutron.callbacks import registry from neutron.callbacks import resources +from neutron.db import api as db_api from neutron.db.models import segment as segments_model LOG = logging.getLogger(__name__) @@ -37,7 +38,7 @@ def _make_segment_dict(record): def add_network_segment(context, network_id, segment, segment_index=0, is_dynamic=False): - with context.session.begin(subtransactions=True): + with db_api.context_manager.writer.using(context): record = segments_model.NetworkSegment( id=uuidutils.generate_uuid(), network_id=network_id, @@ -70,7 +71,7 @@ def get_networks_segments(context, network_ids, filter_dynamic=False): if not network_ids: return {} - with context.session.begin(subtransactions=True): + with db_api.context_manager.reader.using(context): query = (context.session.query(segments_model.NetworkSegment). filter(segments_model.NetworkSegment.network_id .in_(network_ids)). @@ -85,7 +86,7 @@ def get_networks_segments(context, network_ids, filter_dynamic=False): def get_segment_by_id(context, segment_id): - with context.session.begin(subtransactions=True): + with db_api.context_manager.reader.using(context): try: record = (context.session.query(segments_model.NetworkSegment). filter_by(id=segment_id). @@ -98,7 +99,7 @@ def get_segment_by_id(context, segment_id): def get_dynamic_segment(context, network_id, physical_network=None, segmentation_id=None): """Return a dynamic segment for the filters provided if one exists.""" - with context.session.begin(subtransactions=True): + with db_api.context_manager.reader.using(context): query = (context.session.query(segments_model.NetworkSegment). filter_by(network_id=network_id, is_dynamic=True)) if physical_network: @@ -122,6 +123,6 @@ def get_dynamic_segment(context, network_id, physical_network=None, def delete_network_segment(context, segment_id): """Release a dynamic segment for the params provided if one exists.""" - with context.session.begin(subtransactions=True): + with db_api.context_manager.writer.using(context): (context.session.query(segments_model.NetworkSegment). filter_by(id=segment_id).delete()) diff --git a/neutron/ipam/drivers/neutrondb_ipam/driver.py b/neutron/ipam/drivers/neutrondb_ipam/driver.py index 4e6f3884827..c969c3c305e 100644 --- a/neutron/ipam/drivers/neutrondb_ipam/driver.py +++ b/neutron/ipam/drivers/neutrondb_ipam/driver.py @@ -201,6 +201,8 @@ class NeutronDbSubnet(ipam_base.Subnet): # The only defined status at this stage is 'ALLOCATED'. # More states will be available in the future - e.g.: RECYCLABLE try: + # TODO(ataraday): revisit this after objects switched to + # new enginefacade with self._context.session.begin(subtransactions=True): # NOTE(kevinbenton): we use a subtransaction to force # a flush here so we can capture DBReferenceErrors due diff --git a/neutron/ipam/subnet_alloc.py b/neutron/ipam/subnet_alloc.py index 00bb4b3e98b..6be42014241 100644 --- a/neutron/ipam/subnet_alloc.py +++ b/neutron/ipam/subnet_alloc.py @@ -24,6 +24,7 @@ from oslo_utils import uuidutils from neutron._i18n import _ from neutron.common import exceptions as n_exc +from neutron.db import api as db_api from neutron.db import models_v2 from neutron.ipam import driver from neutron.ipam import exceptions as ipam_exc @@ -49,9 +50,10 @@ class SubnetAllocator(driver.Pool): subnetpool, it's required to ensure non-overlapping cidrs in the same subnetpool. """ - - current_hash = (self._context.session.query(models_v2.SubnetPool.hash) - .filter_by(id=self._subnetpool['id']).scalar()) + with db_api.context_manager.reader.using(self._context): + current_hash = ( + self._context.session.query(models_v2.SubnetPool.hash) + .filter_by(id=self._subnetpool['id']).scalar()) if current_hash is None: # NOTE(cbrandily): subnetpool has been deleted raise n_exc.SubnetPoolNotFound( @@ -61,17 +63,21 @@ class SubnetAllocator(driver.Pool): # NOTE(cbrandily): the update disallows 2 concurrent subnet allocation # to succeed: at most 1 transaction will succeed, others will be # rolled back and be caught in neutron.db.v2.base - query = self._context.session.query(models_v2.SubnetPool).filter_by( - id=self._subnetpool['id'], hash=current_hash) - count = query.update({'hash': new_hash}) + with db_api.context_manager.writer.using(self._context): + query = ( + self._context.session.query(models_v2.SubnetPool).filter_by( + id=self._subnetpool['id'], hash=current_hash)) + + count = query.update({'hash': new_hash}) if not count: raise db_exc.RetryRequest(lib_exc.SubnetPoolInUse( subnet_pool_id=self._subnetpool['id'])) def _get_allocated_cidrs(self): - query = self._context.session.query(models_v2.Subnet) - subnets = query.filter_by(subnetpool_id=self._subnetpool['id']) - return (x.cidr for x in subnets) + with db_api.context_manager.reader.using(self._context): + query = self._context.session.query(models_v2.Subnet) + subnets = query.filter_by(subnetpool_id=self._subnetpool['id']) + return (x.cidr for x in subnets) def _get_available_prefix_list(self): prefixes = (x.cidr for x in self._subnetpool.prefixes) @@ -90,7 +96,7 @@ class SubnetAllocator(driver.Pool): def _allocations_used_by_tenant(self, quota_unit): subnetpool_id = self._subnetpool['id'] tenant_id = self._subnetpool['tenant_id'] - with self._context.session.begin(subtransactions=True): + with db_api.context_manager.reader.using(self._context): qry = self._context.session.query(models_v2.Subnet) allocations = qry.filter_by(subnetpool_id=subnetpool_id, tenant_id=tenant_id) @@ -115,7 +121,7 @@ class SubnetAllocator(driver.Pool): raise n_exc.SubnetPoolQuotaExceeded() def _allocate_any_subnet(self, request): - with self._context.session.begin(subtransactions=True): + with db_api.context_manager.writer.using(self._context): self._lock_subnetpool() self._check_subnetpool_tenant_quota(request.tenant_id, request.prefixlen) @@ -139,7 +145,7 @@ class SubnetAllocator(driver.Pool): str(request.prefixlen)) def _allocate_specific_subnet(self, request): - with self._context.session.begin(subtransactions=True): + with db_api.context_manager.writer.using(self._context): self._lock_subnetpool() self._check_subnetpool_tenant_quota(request.tenant_id, request.prefixlen) diff --git a/neutron/plugins/ml2/db.py b/neutron/plugins/ml2/db.py index ca9ccfdfd12..e98ba5f3e22 100644 --- a/neutron/plugins/ml2/db.py +++ b/neutron/plugins/ml2/db.py @@ -27,6 +27,7 @@ from neutron._i18n import _, _LE from neutron.callbacks import events from neutron.callbacks import registry from neutron.callbacks import resources +from neutron.db import api as db_api from neutron.db.models import securitygroup as sg_models from neutron.db import models_v2 from neutron.extensions import portbindings @@ -40,13 +41,13 @@ LOG = log.getLogger(__name__) MAX_PORTS_PER_QUERY = 500 +@db_api.context_manager.writer def add_port_binding(context, port_id): - with context.session.begin(subtransactions=True): - record = models.PortBinding( - port_id=port_id, - vif_type=portbindings.VIF_TYPE_UNBOUND) - context.session.add(record) - return record + record = models.PortBinding( + port_id=port_id, + vif_type=portbindings.VIF_TYPE_UNBOUND) + context.session.add(record) + return record @removals.remove( @@ -77,6 +78,7 @@ def get_locked_port_and_binding(context, port_id): return None, None +@db_api.context_manager.writer def set_binding_levels(context, levels): if levels: for level in levels: @@ -90,6 +92,7 @@ def set_binding_levels(context, levels): LOG.debug("Attempted to set empty binding levels") +@db_api.context_manager.reader def get_binding_levels(context, port_id, host): if host: result = (context.session.query(models.PortBindingLevel). @@ -104,6 +107,7 @@ def get_binding_levels(context, port_id, host): return result +@db_api.context_manager.writer def clear_binding_levels(context, port_id, host): if host: (context.session.query(models.PortBindingLevel). @@ -116,13 +120,14 @@ def clear_binding_levels(context, port_id, host): def ensure_distributed_port_binding(context, port_id, host, router_id=None): - record = (context.session.query(models.DistributedPortBinding). - filter_by(port_id=port_id, host=host).first()) + with db_api.context_manager.reader.using(context): + record = (context.session.query(models.DistributedPortBinding). + filter_by(port_id=port_id, host=host).first()) if record: return record try: - with context.session.begin(subtransactions=True): + with db_api.context_manager.writer.using(context): record = models.DistributedPortBinding( port_id=port_id, host=host, @@ -134,13 +139,14 @@ def ensure_distributed_port_binding(context, port_id, host, router_id=None): return record except db_exc.DBDuplicateEntry: LOG.debug("Distributed Port %s already bound", port_id) - return (context.session.query(models.DistributedPortBinding). - filter_by(port_id=port_id, host=host).one()) + with db_api.context_manager.reader.using(context): + return (context.session.query(models.DistributedPortBinding). + filter_by(port_id=port_id, host=host).one()) def delete_distributed_port_binding_if_stale(context, binding): if not binding.router_id and binding.status == n_const.PORT_STATUS_DOWN: - with context.session.begin(subtransactions=True): + with db_api.context_manager.writer.using(context): LOG.debug("Distributed port: Deleting binding %s", binding) context.session.delete(binding) @@ -148,10 +154,12 @@ def delete_distributed_port_binding_if_stale(context, binding): def get_port(context, port_id): """Get port record for update within transaction.""" - with context.session.begin(subtransactions=True): + with db_api.context_manager.reader.using(context): try: + # Set enable_eagerloads to True, so that lazy load can be + # proceed later. record = (context.session.query(models_v2.Port). - enable_eagerloads(False). + enable_eagerloads(True). filter(models_v2.Port.id.startswith(port_id)). one()) return record @@ -163,6 +171,7 @@ def get_port(context, port_id): return +@db_api.context_manager.reader def get_port_from_device_mac(context, device_mac): LOG.debug("get_port_from_device_mac() called for mac %s", device_mac) ports = port_obj.Port.get_objects(context, mac_address=device_mac) @@ -194,7 +203,7 @@ def get_sg_ids_grouped_by_port(context, port_ids): sg_ids_grouped_by_port = {} sg_binding_port = sg_models.SecurityGroupPortBinding.port_id - with context.session.begin(subtransactions=True): + with db_api.context_manager.reader.using(context): # partial UUIDs must be individually matched with startswith. # full UUIDs may be matched directly in an IN statement partial_uuids = set(port_id for port_id in port_ids @@ -233,7 +242,7 @@ def make_port_dict_with_security_groups(port, sec_groups): def get_port_binding_host(context, port_id): try: - with context.session.begin(subtransactions=True): + with db_api.context_manager.reader.using(context): query = (context.session.query(models.PortBinding). filter(models.PortBinding.port_id.startswith(port_id)). one()) @@ -248,6 +257,7 @@ def get_port_binding_host(context, port_id): return query.host +@db_api.context_manager.reader def generate_distributed_port_status(context, port_id): # an OR'ed value of status assigned to parent port from the # distributedportbinding bucket @@ -262,7 +272,7 @@ def generate_distributed_port_status(context, port_id): def get_distributed_port_binding_by_host(context, port_id, host): - with context.session.begin(subtransactions=True): + with db_api.context_manager.reader.using(context): binding = (context.session.query(models.DistributedPortBinding). filter(models.DistributedPortBinding.port_id.startswith(port_id), models.DistributedPortBinding.host == host).first()) @@ -273,7 +283,7 @@ def get_distributed_port_binding_by_host(context, port_id, host): def get_distributed_port_bindings(context, port_id): - with context.session.begin(subtransactions=True): + with db_api.context_manager.reader.using(context): bindings = (context.session.query(models.DistributedPortBinding). filter(models.DistributedPortBinding.port_id.startswith( port_id)).all()) @@ -282,6 +292,7 @@ def get_distributed_port_bindings(context, port_id): return bindings +@db_api.context_manager.reader def is_dhcp_active_on_any_subnet(context, subnet_ids): if not subnet_ids: return False @@ -297,13 +308,15 @@ def _prevent_segment_delete_with_port_bound(resource, event, trigger, if for_net_delete: # don't check for network deletes return - segment_id = segment['id'] - query = context.session.query(models_v2.Port) - query = query.join( - models.PortBindingLevel, - models.PortBindingLevel.port_id == models_v2.Port.id) - query = query.filter(models.PortBindingLevel.segment_id == segment_id) - port_ids = [p.id for p in query] + + with db_api.context_manager.reader.using(context): + segment_id = segment['id'] + query = context.session.query(models_v2.Port) + query = query.join( + models.PortBindingLevel, + models.PortBindingLevel.port_id == models_v2.Port.id) + query = query.filter(models.PortBindingLevel.segment_id == segment_id) + port_ids = [p.id for p in query] # There are still some ports in the segment, segment should not be deleted # TODO(xiaohhui): Should we delete the dhcp port automatically here? diff --git a/neutron/plugins/ml2/managers.py b/neutron/plugins/ml2/managers.py index 896f1a9a823..ffc6f92a98a 100644 --- a/neutron/plugins/ml2/managers.py +++ b/neutron/plugins/ml2/managers.py @@ -190,8 +190,7 @@ class TypeManager(stevedore.named.NamedExtensionManager): def create_network_segments(self, context, network, tenant_id): """Call type drivers to create network segments.""" segments = self._process_provider_create(network) - session = context.session - with session.begin(subtransactions=True): + with db_api.context_manager.writer.using(context): network_id = network['id'] if segments: for segment_index, segment in enumerate(segments): @@ -224,7 +223,7 @@ class TypeManager(stevedore.named.NamedExtensionManager): self.validate_provider_segment(segment) # Reserve segment in type driver - with context.session.begin(subtransactions=True): + with db_api.context_manager.writer.using(context): return self.reserve_provider_segment(context, segment) def is_partial_segment(self, segment): diff --git a/neutron/plugins/ml2/plugin.py b/neutron/plugins/ml2/plugin.py index 2018deb6010..0063de7b71e 100644 --- a/neutron/plugins/ml2/plugin.py +++ b/neutron/plugins/ml2/plugin.py @@ -433,7 +433,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2, # After we've attempted to bind the port, we begin a # transaction, get the current port state, and decide whether # to commit the binding results. - with plugin_context.session.begin(subtransactions=True): + with db_api.context_manager.writer.using(plugin_context): # Get the current port state and build a new PortContext # reflecting this state as original state for subsequent # mechanism driver update_port_*commit() calls. @@ -594,17 +594,24 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2, attributes.SUBNETS, ['_ml2_md_extend_subnet_dict']) def _ml2_md_extend_network_dict(self, result, netdb): - session = sqlalchemy.inspect(netdb).session + session = self._object_session_or_new_session(netdb) self.extension_manager.extend_network_dict(session, netdb, result) def _ml2_md_extend_port_dict(self, result, portdb): - session = sqlalchemy.inspect(portdb).session + session = self._object_session_or_new_session(portdb) self.extension_manager.extend_port_dict(session, portdb, result) def _ml2_md_extend_subnet_dict(self, result, subnetdb): - session = sqlalchemy.inspect(subnetdb).session + session = self._object_session_or_new_session(subnetdb) self.extension_manager.extend_subnet_dict(session, subnetdb, result) + @staticmethod + def _object_session_or_new_session(sql_obj): + session = sqlalchemy.inspect(sql_obj).session + if not session: + session = db_api.get_reader_session() + return session + # Note - The following hook methods have "ml2" in their names so # that they are not called twice during unit tests due to global # registration of hooks in portbindings_db.py used by other @@ -656,7 +663,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2, objects = [] collection = "%ss" % resource items = request_items[collection] - with context.session.begin(subtransactions=True): + with db_api.context_manager.writer.using(context): obj_creator = getattr(self, '_create_%s_db' % resource) for item in items: try: @@ -733,10 +740,9 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2, def _create_network_db(self, context, network): net_data = network[attributes.NETWORK] tenant_id = net_data['tenant_id'] - session = context.session registry.notify(resources.NETWORK, events.BEFORE_CREATE, self, context=context, network=net_data) - with session.begin(subtransactions=True): + with db_api.context_manager.writer.using(context): net_db = self.create_network_db(context, network) result = self._make_network_dict(net_db, process_extensions=False, context=context) @@ -766,7 +772,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2, net_db[az_ext.AZ_HINTS] = az_hints result[az_ext.AZ_HINTS] = az_hints - self._apply_dict_extend_functions('networks', result, net_db) + self._apply_dict_extend_functions('networks', result, net_db) return result, mech_context @utils.transaction_guard @@ -797,8 +803,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2, net_data = network[attributes.NETWORK] provider._raise_if_updates_provider_attributes(net_data) - session = context.session - with session.begin(subtransactions=True): + with db_api.context_manager.writer.using(context): original_network = super(Ml2Plugin, self).get_network(context, id) updated_network = super(Ml2Plugin, self).update_network(context, id, @@ -841,8 +846,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2, @db_api.retry_if_session_inactive() def get_network(self, context, id, fields=None): - session = context.session - with session.begin(subtransactions=True): + with db_api.context_manager.reader.using(context): result = super(Ml2Plugin, self).get_network(context, id, None) self.type_manager.extend_network_dict_provider(context, result) result[api.MTU] = self._get_network_mtu(result) @@ -852,8 +856,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2, @db_api.retry_if_session_inactive() def get_networks(self, context, filters=None, fields=None, sorts=None, limit=None, marker=None, page_reverse=False): - session = context.session - with session.begin(subtransactions=True): + with db_api.context_manager.reader.using(context): nets = super(Ml2Plugin, self).get_networks(context, filters, None, sorts, limit, marker, page_reverse) @@ -900,8 +903,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2, self.notifier.network_delete(context, network['id']) def _create_subnet_db(self, context, subnet): - session = context.session - with session.begin(subtransactions=True): + with db_api.context_manager.writer.using(context): result, net_db, ipam_sub = self._create_subnet_precommit( context, subnet) self.extension_manager.process_create_subnet( @@ -912,6 +914,11 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2, mech_context = driver_context.SubnetContext(self, context, result, network) self.mechanism_manager.create_subnet_precommit(mech_context) + # TODO(ataraday): temporary flush until this will be available + # in oslo.db (change https://review.openstack.org/#/c/433758/ + # got merged) + context.session.flush() + # db base plugin post commit ops self._create_subnet_postcommit(context, result, net_db, ipam_sub) @@ -941,8 +948,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2, @utils.transaction_guard @db_api.retry_if_session_inactive() def update_subnet(self, context, id, subnet): - session = context.session - with session.begin(subtransactions=True): + with db_api.context_manager.writer.using(context): original_subnet = self.get_subnet(context, id) updated_subnet = self._update_subnet_precommit( context, id, subnet) @@ -1042,8 +1048,10 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2, registry.notify(resources.PORT, events.BEFORE_CREATE, self, context=context, port=attrs) - session = context.session - with session.begin(subtransactions=True): + # NOTE(kevinbenton): triggered outside of transaction since it + # emits 'AFTER' events if it creates. + self._ensure_default_security_group(context, attrs['tenant_id']) + with db_api.context_manager.writer.using(context): dhcp_opts = attrs.get(edo_ext.EXTRADHCPOPTS, []) port_db = self.create_port_db(context, port) result = self._make_port_dict(port_db, process_extensions=False) @@ -1180,10 +1188,8 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2, def update_port(self, context, id, port): attrs = port[attributes.PORT] need_port_update_notify = False - session = context.session bound_mech_contexts = [] - - with session.begin(subtransactions=True): + with db_api.context_manager.writer.using(context): port_db = self._get_port(context, id) binding = port_db.port_binding if not binding: @@ -1343,7 +1349,6 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2, LOG.error(_LE("No Host supplied to bind DVR Port %s"), id) return - session = context.session binding = db.get_distributed_port_binding_by_host(context, id, host) device_id = attrs and attrs.get('device_id') @@ -1353,7 +1358,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2, router_id != device_id) if update_required: try: - with session.begin(subtransactions=True): + with db_api.context_manager.writer.using(context): orig_port = self.get_port(context, id) if not binding: binding = db.ensure_distributed_port_binding( @@ -1398,8 +1403,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2, router_ids = [] l3plugin = directory.get_plugin(const.L3) - session = context.session - with session.begin(subtransactions=True): + with db_api.context_manager.writer.using(context): try: port_db = self._get_port(context, id) binding = port_db.port_binding @@ -1466,8 +1470,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2, @db_api.retry_if_session_inactive(context_var_name='plugin_context') def get_bound_port_context(self, plugin_context, port_id, host=None, cached_networks=None): - session = plugin_context.session - with session.begin(subtransactions=True): + with db_api.context_manager.reader.using(plugin_context) as session: try: port_db = (session.query(models_v2.Port). enable_eagerloads(False). @@ -1527,8 +1530,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2, one was already performed by the caller. """ updated = False - session = context.session - with session.begin(subtransactions=True): + with db_api.context_manager.writer.using(context): port = db.get_port(context, port_id) if not port: LOG.debug("Port %(port)s update to %(val)s by agent not found", @@ -1560,7 +1562,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2, if (updated and port['device_owner'] == const.DEVICE_OWNER_DVR_INTERFACE): - with session.begin(subtransactions=True): + with db_api.context_manager.writer.using(context): port = db.get_port(context, port_id) if not port: LOG.warning(_LW("Port %s not found during update"), diff --git a/neutron/services/segments/db.py b/neutron/services/segments/db.py index 8e8b303473a..6f96050f553 100644 --- a/neutron/services/segments/db.py +++ b/neutron/services/segments/db.py @@ -77,7 +77,7 @@ class SegmentDbMixin(common_db_mixin.CommonDbMixin): return self._make_segment_dict(new_segment) def _create_segment_db(self, context, segment_id, segment): - with context.session.begin(subtransactions=True): + with db_api.context_manager.writer.using(context): network_id = segment['network_id'] physical_network = segment[extension.PHYSICAL_NETWORK] if physical_network == constants.ATTR_NOT_SPECIFIED: @@ -125,7 +125,7 @@ class SegmentDbMixin(common_db_mixin.CommonDbMixin): def update_segment(self, context, uuid, segment): """Update an existing segment.""" segment = segment['segment'] - with context.session.begin(subtransactions=True): + with db_api.context_manager.writer.using(context): curr_segment = self._get_segment(context, uuid) curr_segment.update(segment) return self._make_segment_dict(curr_segment) @@ -175,7 +175,7 @@ class SegmentDbMixin(common_db_mixin.CommonDbMixin): segment=segment, for_net_delete=for_net_delete) # Delete segment in DB - with context.session.begin(subtransactions=True): + with db_api.context_manager.writer.using(context): query = self._model_query(context, segment_model.NetworkSegment) query = query.filter(segment_model.NetworkSegment.id == uuid) if 0 == query.delete(): @@ -191,7 +191,7 @@ class SegmentDbMixin(common_db_mixin.CommonDbMixin): def update_segment_host_mapping(context, host, current_segment_ids): - with context.session.begin(subtransactions=True): + with db_api.context_manager.writer.using(context): segment_host_mapping = network.SegmentHostMapping.get_objects( context, host=host) previous_segment_ids = { @@ -241,7 +241,7 @@ def get_segments_with_phys_nets(context, phys_nets): if not phys_nets: return [] - with context.session.begin(subtransactions=True): + with db_api.context_manager.reader.using(context): segments = context.session.query(segment_model.NetworkSegment).filter( segment_model.NetworkSegment.physical_network.in_(phys_nets)) return segments @@ -249,7 +249,7 @@ def get_segments_with_phys_nets(context, phys_nets): def map_segment_to_hosts(context, segment_id, hosts): """Map segment to a collection of hosts.""" - with db_api.autonested_transaction(context.session): + with db_api.context_manager.writer.using(context): for host in hosts: network.SegmentHostMapping( context, segment_id=segment_id, host=host).create() diff --git a/neutron/services/segments/plugin.py b/neutron/services/segments/plugin.py index 5d637b39ceb..b4167553c11 100644 --- a/neutron/services/segments/plugin.py +++ b/neutron/services/segments/plugin.py @@ -30,6 +30,7 @@ from neutron.callbacks import registry from neutron.callbacks import resources from neutron.common import exceptions as n_exc from neutron.db import _resource_extend as resource_extend +from neutron.db import api as db_api from neutron.db.models import segment as segment_model from neutron.db import models_v2 from neutron.extensions import ip_allocation @@ -102,14 +103,17 @@ class Plugin(db.SegmentDbMixin, segment.SegmentPluginBase): if for_net_delete: # don't check if this is a part of a network delete operation return - segment_id = segment['id'] - query = context.session.query(models_v2.Subnet.id) - query = query.filter(models_v2.Subnet.segment_id == segment_id) - subnet_ids = [s[0] for s in query] + with db_api.context_manager.reader.using(context): + segment_id = segment['id'] + query = context.session.query(models_v2.Subnet.id) + query = query.filter(models_v2.Subnet.segment_id == segment_id) + subnet_ids = [s[0] for s in query] + if subnet_ids: reason = _("The segment is still associated with subnet(s) " "%s") % ", ".join(subnet_ids) - raise exceptions.SegmentInUse(segment_id=segment_id, reason=reason) + raise exceptions.SegmentInUse(segment_id=segment_id, + reason=reason) class Event(object): diff --git a/neutron/tests/unit/db/test_db_base_plugin_v2.py b/neutron/tests/unit/db/test_db_base_plugin_v2.py index b6d561e5269..7e385892468 100644 --- a/neutron/tests/unit/db/test_db_base_plugin_v2.py +++ b/neutron/tests/unit/db/test_db_base_plugin_v2.py @@ -6295,7 +6295,7 @@ class DbModelMixin(object): class DbModelTenantTestCase(DbModelMixin, testlib_api.SqlTestCase): def _make_network(self, ctx): - with ctx.session.begin(): + with db_api.context_manager.writer.using(ctx): network = models_v2.Network(name="net_net", status="OK", tenant_id='dbcheck', admin_state_up=True) @@ -6303,7 +6303,7 @@ class DbModelTenantTestCase(DbModelMixin, testlib_api.SqlTestCase): return network def _make_subnet(self, ctx, network_id): - with ctx.session.begin(): + with db_api.context_manager.writer.using(ctx): subnet = models_v2.Subnet(name="subsub", ip_version=4, tenant_id='dbcheck', cidr='turn_down_for_what', @@ -6321,7 +6321,7 @@ class DbModelTenantTestCase(DbModelMixin, testlib_api.SqlTestCase): return port def _make_subnetpool(self, ctx): - with ctx.session.begin(): + with db_api.context_manager.writer.using(ctx): subnetpool = models_v2.SubnetPool( ip_version=4, default_prefixlen=4, min_prefixlen=4, max_prefixlen=4, shared=False, default_quota=4, @@ -6334,7 +6334,7 @@ class DbModelTenantTestCase(DbModelMixin, testlib_api.SqlTestCase): class DbModelProjectTestCase(DbModelMixin, testlib_api.SqlTestCase): def _make_network(self, ctx): - with ctx.session.begin(): + with db_api.context_manager.writer.using(ctx): network = models_v2.Network(name="net_net", status="OK", project_id='dbcheck', admin_state_up=True) @@ -6342,7 +6342,7 @@ class DbModelProjectTestCase(DbModelMixin, testlib_api.SqlTestCase): return network def _make_subnet(self, ctx, network_id): - with ctx.session.begin(): + with db_api.context_manager.writer.using(ctx): subnet = models_v2.Subnet(name="subsub", ip_version=4, project_id='dbcheck', cidr='turn_down_for_what', @@ -6360,7 +6360,7 @@ class DbModelProjectTestCase(DbModelMixin, testlib_api.SqlTestCase): return port def _make_subnetpool(self, ctx): - with ctx.session.begin(): + with db_api.context_manager.writer.using(ctx): subnetpool = models_v2.SubnetPool( ip_version=4, default_prefixlen=4, min_prefixlen=4, max_prefixlen=4, shared=False, default_quota=4, diff --git a/neutron/tests/unit/db/test_l3_dvr_db.py b/neutron/tests/unit/db/test_l3_dvr_db.py index 7ba7fba3b3b..b7ac4eaa0f5 100644 --- a/neutron/tests/unit/db/test_l3_dvr_db.py +++ b/neutron/tests/unit/db/test_l3_dvr_db.py @@ -810,7 +810,9 @@ class L3DvrTestCase(test_db_base_plugin_v2.NeutronDbPluginV2TestCase): RuntimeError, self.mixin.add_router_interface, self.ctx, router['id'], {'port_id': port['port']['id']}) - + # expire since we are re-using the session which might have stale + # ports in it + self.ctx.session.expire_all() port_info = self.core_plugin.get_port(self.ctx, port['port']['id']) self.assertEqual(port_dict['device_id'], port_info['device_id']) self.assertEqual(port_dict['device_owner'], diff --git a/neutron/tests/unit/ipam/test_subnet_alloc.py b/neutron/tests/unit/ipam/test_subnet_alloc.py index eaf51229203..ff1f853ff75 100644 --- a/neutron/tests/unit/ipam/test_subnet_alloc.py +++ b/neutron/tests/unit/ipam/test_subnet_alloc.py @@ -23,6 +23,7 @@ from oslo_db import exception as db_exc from oslo_utils import uuidutils from neutron.common import exceptions as n_exc +from neutron.db import api as db_api from neutron.ipam import requests as ipam_req from neutron.ipam import subnet_alloc from neutron.tests.unit.db import test_db_base_plugin_v2 @@ -64,7 +65,7 @@ class TestSubnetAllocation(testlib_api.SqlTestCase): sp = self._create_subnet_pool(self.plugin, self.ctx, 'test-sp', prefix_list, 21, 4) sp = self.plugin._get_subnetpool(self.ctx, sp['id']) - with self.ctx.session.begin(subtransactions=True): + with db_api.context_manager.writer.using(self.ctx): sa = subnet_alloc.SubnetAllocator(sp, self.ctx) req = ipam_req.AnySubnetRequest(self._tenant_id, uuidutils.generate_uuid(), @@ -80,7 +81,7 @@ class TestSubnetAllocation(testlib_api.SqlTestCase): sp = self._create_subnet_pool(self.plugin, self.ctx, 'test-sp', ['10.1.0.0/16', '192.168.1.0/24'], 21, 4) - with self.ctx.session.begin(subtransactions=True): + with db_api.context_manager.writer.using(self.ctx): sp = self.plugin._get_subnetpool(self.ctx, sp['id']) sa = subnet_alloc.SubnetAllocator(sp, self.ctx) req = ipam_req.SpecificSubnetRequest(self._tenant_id, @@ -122,7 +123,7 @@ class TestSubnetAllocation(testlib_api.SqlTestCase): ['10.1.0.0/16', '192.168.1.0/24'], 21, 4) sp = self.plugin._get_subnetpool(self.ctx, sp['id']) - with self.ctx.session.begin(subtransactions=True): + with db_api.context_manager.writer.using(self.ctx): sa = subnet_alloc.SubnetAllocator(sp, self.ctx) req = ipam_req.AnySubnetRequest(self._tenant_id, uuidutils.generate_uuid(), @@ -137,7 +138,7 @@ class TestSubnetAllocation(testlib_api.SqlTestCase): ['10.1.0.0/16', '192.168.1.0/24'], 21, 4) sp = self.plugin._get_subnetpool(self.ctx, sp['id']) - with self.ctx.session.begin(subtransactions=True): + with db_api.context_manager.writer.using(self.ctx): sa = subnet_alloc.SubnetAllocator(sp, self.ctx) req = ipam_req.SpecificSubnetRequest(self._tenant_id, uuidutils.generate_uuid(), @@ -154,7 +155,7 @@ class TestSubnetAllocation(testlib_api.SqlTestCase): ['2210::/64'], 64, 6) sp = self.plugin._get_subnetpool(self.ctx, sp['id']) - with self.ctx.session.begin(subtransactions=True): + with db_api.context_manager.writer.using(self.ctx): sa = subnet_alloc.SubnetAllocator(sp, self.ctx) req = ipam_req.SpecificSubnetRequest(self._tenant_id, uuidutils.generate_uuid(), diff --git a/neutron/tests/unit/plugins/ml2/drivers/ext_test.py b/neutron/tests/unit/plugins/ml2/drivers/ext_test.py index 6c39020b9b8..4571f38900f 100644 --- a/neutron/tests/unit/plugins/ml2/drivers/ext_test.py +++ b/neutron/tests/unit/plugins/ml2/drivers/ext_test.py @@ -129,7 +129,8 @@ class TestNetworkExtension(model_base.BASEV2): value = sa.Column(sa.String(64)) network = orm.relationship( models_v2.Network, - backref=orm.backref('extension', cascade='delete', uselist=False)) + backref=orm.backref('extension', cascade='delete', uselist=False, + lazy='joined')) class TestSubnetExtension(model_base.BASEV2): @@ -139,7 +140,8 @@ class TestSubnetExtension(model_base.BASEV2): value = sa.Column(sa.String(64)) subnet = orm.relationship( models_v2.Subnet, - backref=orm.backref('extension', cascade='delete', uselist=False)) + backref=orm.backref('extension', cascade='delete', uselist=False, + lazy='joined')) class TestPortExtension(model_base.BASEV2): @@ -149,7 +151,8 @@ class TestPortExtension(model_base.BASEV2): value = sa.Column(sa.String(64)) port = orm.relationship( models_v2.Port, - backref=orm.backref('extension', cascade='delete', uselist=False)) + backref=orm.backref('extension', cascade='delete', uselist=False, + lazy='joined')) class TestDBExtensionDriver(TestExtensionDriverBase): diff --git a/neutron/tests/unit/plugins/ml2/test_db.py b/neutron/tests/unit/plugins/ml2/test_db.py index 3423cb9dbd7..19471766fef 100644 --- a/neutron/tests/unit/plugins/ml2/test_db.py +++ b/neutron/tests/unit/plugins/ml2/test_db.py @@ -23,6 +23,7 @@ from oslo_utils import uuidutils from sqlalchemy.orm import exc from sqlalchemy.orm import query +from neutron.db import api as db_api from neutron.db import db_base_plugin_v2 from neutron.db.models import l3 as l3_models from neutron.db import models_v2 @@ -63,7 +64,7 @@ class Ml2DBTestCase(testlib_api.SqlTestCase): return port def _setup_neutron_portbinding(self, port_id, vif_type, host): - with self.ctx.session.begin(subtransactions=True): + with db_api.context_manager.writer.using(self.ctx): self.ctx.session.add(models.PortBinding(port_id=port_id, vif_type=vif_type, host=host)) @@ -285,7 +286,7 @@ class Ml2DvrDBTestCase(testlib_api.SqlTestCase): self.setup_coreplugin(PLUGIN_NAME) def _setup_neutron_network(self, network_id, port_ids): - with self.ctx.session.begin(subtransactions=True): + with db_api.context_manager.writer.using(self.ctx): network_obj.Network(self.ctx, id=network_id).create() ports = [] for port_id in port_ids: @@ -311,7 +312,7 @@ class Ml2DvrDBTestCase(testlib_api.SqlTestCase): def _setup_distributed_binding(self, network_id, port_id, router_id, host_id): - with self.ctx.session.begin(subtransactions=True): + with db_api.context_manager.writer.using(self.ctx): record = models.DistributedPortBinding( port_id=port_id, host=host_id, @@ -403,7 +404,7 @@ class Ml2DvrDBTestCase(testlib_api.SqlTestCase): def test_distributed_port_binding_deleted_by_port_deletion(self): network_id = uuidutils.generate_uuid() network_obj.Network(self.ctx, id=network_id).create() - with self.ctx.session.begin(subtransactions=True): + with db_api.context_manager.writer.using(self.ctx): device_owner = constants.DEVICE_OWNER_DVR_INTERFACE port = models_v2.Port( id='port_id', @@ -428,7 +429,7 @@ class Ml2DvrDBTestCase(testlib_api.SqlTestCase): self.ctx.session.add(models.DistributedPortBinding( **binding_kwarg)) with warnings.catch_warnings(record=True) as warning_list: - with self.ctx.session.begin(subtransactions=True): + with db_api.context_manager.writer.using(self.ctx): self.ctx.session.delete(port) self.assertEqual([], warning_list) ports = ml2_db.get_distributed_port_bindings(self.ctx, diff --git a/neutron/tests/unit/plugins/ml2/test_plugin.py b/neutron/tests/unit/plugins/ml2/test_plugin.py index 091f0b53f87..15018531b24 100644 --- a/neutron/tests/unit/plugins/ml2/test_plugin.py +++ b/neutron/tests/unit/plugins/ml2/test_plugin.py @@ -660,7 +660,7 @@ class TestMl2PortsV2(test_plugin.TestPortsV2, Ml2PluginV2TestCase): status='ACTIVE', device_id='vm_id', device_owner=DEVICE_OWNER_COMPUTE ) - with self.context.session.begin(): + with db_api.context_manager.writer.using(self.context): self.context.session.add(port_db) self.assertIsNone(plugin._port_provisioned('port', 'evt', 'trigger', self.context, port_id)) @@ -1404,7 +1404,7 @@ class TestMl2DvrPortsV2(TestMl2PortsV2): {'subnet_id': s['subnet']['id']}) # lie to turn the port into an SNAT interface - with self.context.session.begin(): + with db_api.context_manager.reader.using(self.context): rp = self.context.session.query(l3_models.RouterPort).filter_by( port_id=p['port_id']).first() rp.port_type = constants.DEVICE_OWNER_ROUTER_SNAT @@ -2560,17 +2560,21 @@ class TestML2Segments(Ml2PluginV2TestCase): ml2_db.subscribe() plugin = directory.get_plugin() with self.port(device_owner=fake_owner_compute) as port: - binding = plugin._get_port( - self.context, port['port']['id']).port_binding - binding['host'] = 'host-ovs-no_filter' - mech_context = driver_context.PortContext( - plugin, self.context, port['port'], - plugin.get_network(self.context, port['port']['network_id']), - binding, None) - plugin._bind_port_if_needed(mech_context) - segment = segments_db.get_network_segments( - self.context, port['port']['network_id'])[0] - segment['network_id'] = port['port']['network_id'] + # add writer here to make sure that the following operations are + # performed in the same session + with db_api.context_manager.writer.using(self.context): + binding = plugin._get_port( + self.context, port['port']['id']).port_binding + binding['host'] = 'host-ovs-no_filter' + mech_context = driver_context.PortContext( + plugin, self.context, port['port'], + plugin.get_network(self.context, + port['port']['network_id']), + binding, None) + plugin._bind_port_if_needed(mech_context) + segment = segments_db.get_network_segments( + self.context, port['port']['network_id'])[0] + segment['network_id'] = port['port']['network_id'] self.assertRaises(c_exc.CallbackFailure, registry.notify, resources.SEGMENT, events.BEFORE_DELETE, mock.ANY,