Merge "use context manager from neutron-lib"

Authored by Zuul on 2018-10-27 09:39:17 +00:00, committed by Gerrit Code Review
commit 55be92e253
47 changed files with 379 additions and 395 deletions
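The pattern repeated across the 47 files is the same: the transaction helpers previously imported from neutron.db.api are now taken from neutron_lib.db.api, and the private context_manager attribute is replaced by the public CONTEXT_READER / CONTEXT_WRITER objects, so a single neutron-lib import also covers retry_if_session_inactive, sqla_listen and exc_to_retry. A minimal before/after sketch of the two usage forms seen throughout the diff (the function and argument names below are illustrative, not taken from the change):

# Old style, removed by this change:
#     from neutron.db import api as db_api
#     with db_api.context_manager.writer.using(context):
#         ...

# New style, using the helpers published by neutron-lib:
from neutron_lib.db import api as db_api


def delete_thing(context, thing_id):
    # opens (or joins) a writer transaction bound to this request context
    with db_api.CONTEXT_WRITER.using(context):
        pass  # DB writes go here


@db_api.CONTEXT_READER
def get_thing(context, thing_id):
    # the decorator form wraps the whole function in a reader transaction
    pass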

View File

@ -33,7 +33,6 @@ from neutron.api.v2 import resource as wsgi_resource
from neutron.common import constants as n_const
from neutron.common import exceptions as n_exc
from neutron.common import rpc as n_rpc
from neutron.db import api as ndb_api
from neutron import policy
from neutron import quota
from neutron.quota import resource_registry
@ -493,7 +492,7 @@ class Controller(object):
def notify(create_result):
# Ensure usage trackers for all resources affected by this API
# operation are marked as dirty
with ndb_api.context_manager.writer.using(request.context):
with db_api.CONTEXT_WRITER.using(request.context):
# Commit the reservation(s)
for reservation in reservations:
quota.QUOTAS.commit_reservation(

View File

@ -15,13 +15,13 @@
from neutron_lib.api.definitions import address_scope as apidef
from neutron_lib.api.definitions import network as net_def
from neutron_lib import constants
from neutron_lib.db import api as db_api
from neutron_lib.db import utils as db_utils
from neutron_lib.exceptions import address_scope as api_err
from oslo_utils import uuidutils
from neutron._i18n import _
from neutron.db import _resource_extend as resource_extend
from neutron.db import api as db_api
from neutron.extensions import address_scope as ext_address_scope
from neutron.objects import address_scope as obj_addr_scope
from neutron.objects import base as base_obj
@ -113,7 +113,7 @@ class AddressScopeDbMixin(ext_address_scope.AddressScopePluginBase):
return obj_addr_scope.AddressScope.count(context, **filters)
def delete_address_scope(self, context, id):
with db_api.context_manager.writer.using(context):
with db_api.CONTEXT_WRITER.using(context):
if subnetpool_obj.SubnetPool.get_objects(context,
address_scope_id=id):
raise api_err.AddressScopeInUse(address_scope_id=id)

View File

@ -16,13 +16,13 @@
from neutron_lib.api.definitions import allowedaddresspairs as addr_apidef
from neutron_lib.api.definitions import port as port_def
from neutron_lib.api import validators
from neutron_lib.db import api as db_api
from neutron_lib.db import utils as db_utils
from neutron_lib.exceptions import allowedaddresspairs as addr_exc
from neutron_lib.objects import exceptions
from neutron.common import utils
from neutron.db import _resource_extend as resource_extend
from neutron.db import api as db_api
from neutron.objects.port.extensions import (allowedaddresspairs
as obj_addr_pair)
@ -36,7 +36,7 @@ class AllowedAddressPairsMixin(object):
if not validators.is_attr_set(allowed_address_pairs):
return []
try:
with db_api.context_manager.writer.using(context):
with db_api.CONTEXT_WRITER.using(context):
for address_pair in allowed_address_pairs:
# use port.mac_address if no mac address in address pair
if 'mac_address' not in address_pair:

View File

@ -21,6 +21,7 @@ from neutron_lib.api.definitions import subnet as subnet_def
from neutron_lib.api.definitions import subnetpool as subnetpool_def
from neutron_lib.api import validators
from neutron_lib import constants
from neutron_lib.db import api as db_api
from neutron_lib.db import utils as db_utils
from neutron_lib import exceptions as n_exc
from neutron_lib.utils import net
@ -32,7 +33,6 @@ from neutron.common import constants as n_const
from neutron.common import exceptions
from neutron.db import _model_query as model_query
from neutron.db import _resource_extend as resource_extend
from neutron.db import api as db_api
from neutron.db import common_db_mixin
from neutron.db import models_v2
from neutron.objects import base as base_obj
@ -91,7 +91,7 @@ class DbBasePluginCommon(common_db_mixin.CommonDbMixin):
def _generate_mac():
return net.get_random_mac(cfg.CONF.base_mac.split(':'))
@db_api.context_manager.reader
@db_api.CONTEXT_READER
def _is_mac_in_use(self, context, network_id, mac_address):
return port_obj.Port.objects_exist(context, network_id=network_id,
mac_address=mac_address)
@ -110,7 +110,7 @@ class DbBasePluginCommon(common_db_mixin.CommonDbMixin):
subnet_id=subnet_id)
@staticmethod
@db_api.context_manager.writer
@db_api.CONTEXT_WRITER
def _store_ip_allocation(context, ip_address, network_id, subnet_id,
port_id):
LOG.debug("Allocated IP %(ip_address)s "
@ -263,11 +263,11 @@ class DbBasePluginCommon(common_db_mixin.CommonDbMixin):
context, network_id=network_id,
device_owner=constants.DEVICE_OWNER_ROUTER_GW)
@db_api.context_manager.reader
@db_api.CONTEXT_READER
def _get_subnets_by_network(self, context, network_id):
return subnet_obj.Subnet.get_objects(context, network_id=network_id)
@db_api.context_manager.reader
@db_api.CONTEXT_READER
def _get_subnets_by_subnetpool(self, context, subnetpool_id):
return subnet_obj.Subnet.get_objects(context,
subnetpool_id=subnetpool_id)
@ -279,7 +279,7 @@ class DbBasePluginCommon(common_db_mixin.CommonDbMixin):
filters = filters or {}
# TODO(ihrachys) remove explicit reader usage when subnet OVO switches
# to engine facade by default
with db_api.context_manager.reader.using(context):
with db_api.CONTEXT_READER.using(context):
return subnet_obj.Subnet.get_objects(context, _pager=pager,
validate_filters=False,
**filters)

View File

@ -26,7 +26,7 @@ from neutron_lib.callbacks import registry
from neutron_lib.callbacks import resources
from neutron_lib import constants
from neutron_lib import context as ctx
from neutron_lib.db import api as lib_db_api
from neutron_lib.db import api as db_api
from neutron_lib.db import utils as ndb_utils
from neutron_lib import exceptions as exc
from neutron_lib.exceptions import l3 as l3_exc
@ -48,7 +48,6 @@ from neutron.common import ipv6_utils
from neutron.common import utils
from neutron.db import _model_query as model_query
from neutron.db import _resource_extend as resource_extend
from neutron.db import api as db_api
from neutron.db import db_base_plugin_common
from neutron.db import ipam_pluggable_backend
from neutron.db import models_v2
@ -166,18 +165,18 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon,
# NOTE(arosen): These event listeners hook into port status changes
# and notify nova about them.
self.nova_notifier = nova.Notifier.get_instance()
lib_db_api.sqla_listen(models_v2.Port, 'after_insert',
self.nova_notifier.send_port_status)
lib_db_api.sqla_listen(models_v2.Port, 'after_update',
self.nova_notifier.send_port_status)
lib_db_api.sqla_listen(
db_api.sqla_listen(models_v2.Port, 'after_insert',
self.nova_notifier.send_port_status)
db_api.sqla_listen(models_v2.Port, 'after_update',
self.nova_notifier.send_port_status)
db_api.sqla_listen(
models_v2.Port.status, 'set',
self.nova_notifier.record_port_status_changed)
@registry.receives(resources.RBAC_POLICY, [events.BEFORE_CREATE,
events.BEFORE_UPDATE,
events.BEFORE_DELETE])
@lib_db_api.retry_if_session_inactive()
@db_api.retry_if_session_inactive()
def validate_network_rbac_policy_change(self, resource, event, trigger,
context, object_type, policy,
**kwargs):
@ -369,7 +368,7 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon,
collection = "%ss" % resource
items = request_items[collection]
try:
with db_api.context_manager.writer.using(context):
with db_api.CONTEXT_WRITER.using(context):
for item in items:
obj_creator = getattr(self, 'create_%s' % resource)
objects.append(obj_creator(context, item))
@ -380,11 +379,11 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon,
{'resource': resource, 'item': item})
return objects
@lib_db_api.retry_if_session_inactive()
@db_api.retry_if_session_inactive()
def create_network_bulk(self, context, networks):
return self._create_bulk('network', context, networks)
@lib_db_api.retry_if_session_inactive()
@db_api.retry_if_session_inactive()
def create_network(self, context, network):
"""Handle creation of a single network."""
net_db = self.create_network_db(context, network)
@ -394,7 +393,7 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon,
def create_network_db(self, context, network):
# single request processing
n = network['network']
with db_api.context_manager.writer.using(context):
with db_api.CONTEXT_WRITER.using(context):
args = {'tenant_id': n['tenant_id'],
'id': n.get('id') or uuidutils.generate_uuid(),
'name': n['name'],
@ -411,10 +410,10 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon,
context.session.add(network)
return network
@lib_db_api.retry_if_session_inactive()
@db_api.retry_if_session_inactive()
def update_network(self, context, id, network):
n = network['network']
with db_api.context_manager.writer.using(context):
with db_api.CONTEXT_WRITER.using(context):
network = self._get_network(context, id)
# validate 'shared' parameter
if 'shared' in n:
@ -459,13 +458,13 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon,
if non_auto_ports.count():
raise exc.NetworkInUse(net_id=net_id)
@lib_db_api.retry_if_session_inactive()
@db_api.retry_if_session_inactive()
def delete_network(self, context, id):
registry.publish(resources.NETWORK, events.BEFORE_DELETE, self,
payload=events.DBEventPayload(
context, resource_id=id))
self._ensure_network_not_in_use(context, id)
with db_api.context_manager.reader.using(context):
with db_api.CONTEXT_READER.using(context):
auto_delete_port_ids = [p.id for p in context.session.query(
models_v2.Port.id).filter_by(network_id=id).filter(
models_v2.Port.device_owner.in_(AUTO_DELETE_PORT_OWNERS))]
@ -478,12 +477,12 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon,
"The port has already been deleted.", port_id)
# clean up subnets
subnets = self._get_subnets_by_network(context, id)
with lib_db_api.exc_to_retry(os_db_exc.DBReferenceError):
with db_api.exc_to_retry(os_db_exc.DBReferenceError):
# retry reference errors so we can check the port type and
# clean up if a network-owned port snuck in without failing
for subnet in subnets:
self._delete_subnet(context, subnet)
with db_api.context_manager.writer.using(context):
with db_api.CONTEXT_WRITER.using(context):
network_db = self._get_network(context, id)
network = self._make_network_dict(network_db, context=context)
registry.notify(resources.NETWORK, events.PRECOMMIT_DELETE,
@ -497,12 +496,12 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon,
registry.notify(resources.NETWORK, events.AFTER_DELETE,
self, context=context, network=network)
@lib_db_api.retry_if_session_inactive()
@db_api.retry_if_session_inactive()
def get_network(self, context, id, fields=None):
network = self._get_network(context, id)
return self._make_network_dict(network, fields, context=context)
@lib_db_api.retry_if_session_inactive()
@db_api.retry_if_session_inactive()
def _get_networks(self, context, filters=None, fields=None,
sorts=None, limit=None, marker=None,
page_reverse=False):
@ -518,7 +517,7 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon,
marker_obj=marker_obj,
page_reverse=page_reverse)
@lib_db_api.retry_if_session_inactive()
@db_api.retry_if_session_inactive()
def get_networks(self, context, filters=None, fields=None,
sorts=None, limit=None, marker=None,
page_reverse=False):
@ -531,12 +530,12 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon,
limit=limit, marker=marker, page_reverse=page_reverse)
]
@lib_db_api.retry_if_session_inactive()
@db_api.retry_if_session_inactive()
def get_networks_count(self, context, filters=None):
return model_query.get_collection_count(context, models_v2.Network,
filters=filters)
@lib_db_api.retry_if_session_inactive()
@db_api.retry_if_session_inactive()
def create_subnet_bulk(self, context, subnets):
return self._create_bulk('subnet', context, subnets)
@ -608,7 +607,7 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon,
# a subnet-update and a router-interface-add operation are
# executed concurrently
if cur_subnet and not ipv6_utils.is_ipv6_pd_enabled(s):
with db_api.context_manager.reader.using(context):
with db_api.CONTEXT_READER.using(context):
# TODO(electrocucaracha): Look a solution for Join in OVO
ipal = models_v2.IPAllocation
alloc_qry = context.session.query(ipal.port_id)
@ -717,7 +716,7 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon,
external_gateway_info}}
l3plugin.update_router(context, router_id, info)
@lib_db_api.retry_if_session_inactive()
@db_api.retry_if_session_inactive()
def _create_subnet_postcommit(self, context, result, network, ipam_subnet):
if hasattr(network, 'external') and network.external:
self._update_router_gw_ports(context,
@ -780,7 +779,7 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon,
msg = _('No default subnetpool found for IPv%s') % ip_version
raise exc.BadRequest(resource='subnets', msg=msg)
@lib_db_api.retry_if_session_inactive()
@db_api.retry_if_session_inactive()
def create_subnet(self, context, subnet):
result, net, ipam_sub = self._create_subnet_precommit(context, subnet)
self._create_subnet_postcommit(context, result, net, ipam_sub)
@ -828,7 +827,7 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon,
raise exc.BadRequest(resource='subnets', msg=msg)
self._validate_subnet(context, s)
with db_api.context_manager.writer.using(context):
with db_api.CONTEXT_WRITER.using(context):
network = self._get_network(context,
subnet['subnet']['network_id'])
subnet, ipam_subnet = self.ipam.allocate_subnet(context,
@ -849,7 +848,7 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon,
'end': str(netaddr.IPAddress(p.last, subnet['ip_version']))}
for p in allocation_pools]
@lib_db_api.retry_if_session_inactive()
@db_api.retry_if_session_inactive()
def update_subnet(self, context, id, subnet):
"""Update the subnet with new info.
@ -917,7 +916,7 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon,
registry.notify(resources.SUBNET, events.BEFORE_UPDATE,
self, **kwargs)
with db_api.context_manager.writer.using(context):
with db_api.CONTEXT_WRITER.using(context):
subnet, changes = self.ipam.update_db_subnet(context, id, s,
db_pools)
return self._make_subnet_dict(subnet, context=context), orig
@ -972,13 +971,13 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon,
**kwargs)
return result
@db_api.context_manager.reader
@db_api.CONTEXT_READER
def _subnet_get_user_allocation(self, context, subnet_id):
"""Check if there are any user ports on subnet and return first."""
return port_obj.IPAllocation.get_alloc_by_subnet_id(
context, subnet_id, AUTO_DELETE_PORT_OWNERS)
@db_api.context_manager.reader
@db_api.CONTEXT_READER
def _subnet_check_ip_allocations_internal_router_ports(self, context,
subnet_id):
# Do not delete the subnet if IP allocations for internal
@ -990,7 +989,7 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon,
"cannot delete", subnet_id)
raise exc.SubnetInUse(subnet_id=subnet_id)
@lib_db_api.retry_if_session_inactive()
@db_api.retry_if_session_inactive()
def _remove_subnet_from_port(self, context, sub_id, port_id, auto_subnet):
try:
fixed = [f for f in self.get_port(context, port_id)['fixed_ips']
@ -1018,7 +1017,7 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon,
'subnet': id})
raise exc.SubnetInUse(subnet_id=id)
@lib_db_api.retry_if_session_inactive()
@db_api.retry_if_session_inactive()
def _remove_subnet_ip_allocations_from_ports(self, context, id):
# Do not allow a subnet to be deleted if a router is attached to it
self._subnet_check_ip_allocations_internal_router_ports(
@ -1036,7 +1035,7 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon,
self._remove_subnet_from_port(context, id, port_id,
auto_subnet=is_auto_addr_subnet)
@lib_db_api.retry_if_session_inactive()
@db_api.retry_if_session_inactive()
def delete_subnet(self, context, id):
LOG.debug("Deleting subnet %s", id)
# Make sure the subnet isn't used by other resources
@ -1046,8 +1045,8 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon,
self._delete_subnet(context, subnet)
def _delete_subnet(self, context, subnet):
with lib_db_api.exc_to_retry(sql_exc.IntegrityError), \
db_api.context_manager.writer.using(context):
with db_api.exc_to_retry(sql_exc.IntegrityError), \
db_api.CONTEXT_WRITER.using(context):
registry.notify(resources.SUBNET, events.PRECOMMIT_DELETE,
self, context=context, subnet_id=subnet.id)
subnet.delete()
@ -1057,12 +1056,12 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon,
registry.notify(resources.SUBNET, events.AFTER_DELETE,
self, context=context, subnet=subnet.to_dict())
@lib_db_api.retry_if_session_inactive()
@db_api.retry_if_session_inactive()
def get_subnet(self, context, id, fields=None):
subnet_obj = self._get_subnet_object(context, id)
return self._make_subnet_dict(subnet_obj, fields, context=context)
@lib_db_api.retry_if_session_inactive()
@db_api.retry_if_session_inactive()
def get_subnets(self, context, filters=None, fields=None,
sorts=None, limit=None, marker=None,
page_reverse=False):
@ -1073,13 +1072,13 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon,
for subnet_object in subnet_objs
]
@lib_db_api.retry_if_session_inactive()
@db_api.retry_if_session_inactive()
def get_subnets_count(self, context, filters=None):
filters = filters or {}
return subnet_obj.Subnet.count(context, validate_filters=False,
**filters)
@lib_db_api.retry_if_session_inactive()
@db_api.retry_if_session_inactive()
def get_subnets_by_network(self, context, network_id):
return [self._make_subnet_dict(subnet_obj) for subnet_obj in
self._get_subnets_by_network(context, network_id)]
@ -1153,7 +1152,7 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon,
"been set. Only one default may exist per IP family")
raise exc.InvalidInput(error_message=msg)
@lib_db_api.retry_if_session_inactive()
@db_api.retry_if_session_inactive()
def create_subnetpool(self, context, subnetpool):
sp = subnetpool['subnetpool']
sp_reader = subnet_alloc.SubnetPoolReader(sp)
@ -1182,11 +1181,11 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon,
return self._make_subnetpool_dict(subnetpool.db_obj)
@lib_db_api.retry_if_session_inactive()
@db_api.retry_if_session_inactive()
def update_subnetpool(self, context, id, subnetpool):
new_sp = subnetpool['subnetpool']
with db_api.context_manager.writer.using(context):
with db_api.CONTEXT_WRITER.using(context):
orig_sp = self._get_subnetpool(context, id=id)
updated = _update_subnetpool_dict(orig_sp, new_sp)
reader = subnet_alloc.SubnetPoolReader(updated)
@ -1220,12 +1219,12 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon,
updated, orig_sp.db_obj)
return updated
@lib_db_api.retry_if_session_inactive()
@db_api.retry_if_session_inactive()
def get_subnetpool(self, context, id, fields=None):
subnetpool = self._get_subnetpool(context, id)
return self._make_subnetpool_dict(subnetpool.db_obj, fields)
@lib_db_api.retry_if_session_inactive()
@db_api.retry_if_session_inactive()
def get_subnetpools(self, context, filters=None, fields=None,
sorts=None, limit=None, marker=None,
page_reverse=False):
@ -1238,7 +1237,7 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon,
for pool in subnetpools
]
@lib_db_api.retry_if_session_inactive()
@db_api.retry_if_session_inactive()
def get_default_subnetpool(self, context, ip_version):
"""Retrieve the default subnetpool for the given IP version."""
filters = {'is_default': True,
@ -1247,9 +1246,9 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon,
if subnetpool:
return subnetpool[0]
@lib_db_api.retry_if_session_inactive()
@db_api.retry_if_session_inactive()
def delete_subnetpool(self, context, id):
with db_api.context_manager.writer.using(context):
with db_api.CONTEXT_WRITER.using(context):
subnetpool = self._get_subnetpool(context, id=id)
if subnet_obj.Subnet.objects_exist(context, subnetpool_id=id):
reason = _("Subnet pool has existing allocations")
@ -1264,7 +1263,7 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon,
op=_("mac address update"), port_id=id,
device_owner=device_owner)
@lib_db_api.retry_if_session_inactive()
@db_api.retry_if_session_inactive()
def create_port_bulk(self, context, ports):
return self._create_bulk('port', context, ports)
@ -1281,7 +1280,7 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon,
context.session.add(db_port)
return db_port
@lib_db_api.retry_if_session_inactive()
@db_api.retry_if_session_inactive()
def create_port(self, context, port):
db_port = self.create_port_db(context, port)
return self._make_port_dict(db_port, process_extensions=False)
@ -1306,7 +1305,7 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon,
description=p.get('description'))
if p.get('mac_address') is not constants.ATTR_NOT_SPECIFIED:
port_data['mac_address'] = p.get('mac_address')
with db_api.context_manager.writer.using(context):
with db_api.CONTEXT_WRITER.using(context):
# Ensure that the network exists.
self._get_network(context, network_id)
@ -1345,11 +1344,11 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon,
self._check_mac_addr_update(context, db_port,
new_mac, current_owner)
@lib_db_api.retry_if_session_inactive()
@db_api.retry_if_session_inactive()
def update_port(self, context, id, port):
new_port = port['port']
with db_api.context_manager.writer.using(context):
with db_api.CONTEXT_WRITER.using(context):
db_port = self._get_port(context, id)
new_mac = new_port.get('mac_address')
self._validate_port_for_update(context, db_port, new_port, new_mac)
@ -1380,13 +1379,13 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon,
raise os_db_exc.RetryRequest(e)
return self._make_port_dict(db_port)
@lib_db_api.retry_if_session_inactive()
@db_api.retry_if_session_inactive()
def delete_port(self, context, id):
with db_api.context_manager.writer.using(context):
with db_api.CONTEXT_WRITER.using(context):
self.ipam.delete_port(context, id)
def delete_ports_by_device_id(self, context, device_id, network_id=None):
with db_api.context_manager.reader.using(context):
with db_api.CONTEXT_READER.using(context):
query = (context.session.query(models_v2.Port.id)
.enable_eagerloads(False)
.filter(models_v2.Port.device_id == device_id))
@ -1402,8 +1401,8 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon,
"The port has already been deleted.",
port_id)
@lib_db_api.retry_if_session_inactive()
@db_api.context_manager.reader
@db_api.retry_if_session_inactive()
@db_api.CONTEXT_READER
def get_port(self, context, id, fields=None):
port = self._get_port(context, id)
return self._make_port_dict(port, fields)
@ -1427,7 +1426,7 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon,
Port.fixed_ips.any(IPAllocation.subnet_id.in_(subnet_ids)))
return query
@lib_db_api.retry_if_session_inactive()
@db_api.retry_if_session_inactive()
def get_ports(self, context, filters=None, fields=None,
sorts=None, limit=None, marker=None,
page_reverse=False):
@ -1442,7 +1441,7 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon,
items.reverse()
return items
@lib_db_api.retry_if_session_inactive()
@db_api.retry_if_session_inactive()
def get_ports_count(self, context, filters=None):
return self._get_ports_query(context, filters).count()
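
For reference, _delete_subnet above combines the transaction context manager with exc_to_retry, both now coming from neutron_lib.db.api. A minimal sketch of that pattern (the object and function names here are placeholders, not part of the change):

from neutron_lib.db import api as db_api
from sqlalchemy import exc as sql_exc


def _delete_row(context, obj):
    # An IntegrityError raised inside this block is converted into a
    # retriable error, so an outer retry decorator such as
    # db_api.retry_if_session_inactive() can re-run the whole operation.
    with db_api.exc_to_retry(sql_exc.IntegrityError), \
            db_api.CONTEXT_WRITER.using(context):
        obj.delete()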

View File

@ -20,7 +20,7 @@ from neutron_lib.callbacks import events
from neutron_lib.callbacks import registry
from neutron_lib.callbacks import resources
from neutron_lib import constants
from neutron_lib.db import api as lib_db_api
from neutron_lib.db import api as db_api
from neutron_lib import exceptions as n_exc
from neutron_lib.exceptions import dvr as dvr_exc
from neutron_lib.objects import exceptions
@ -33,7 +33,6 @@ from sqlalchemy import or_
from neutron.common import utils
from neutron.conf.db import dvr_mac_db
from neutron.db import api as db_api
from neutron.db import models_v2
from neutron.extensions import dvr as ext_dvr
from neutron.objects import router
@ -60,7 +59,7 @@ class DVRDbMixin(ext_dvr.DVRMacAddressPluginBase):
return self._plugin
@staticmethod
@lib_db_api.retry_if_session_inactive()
@db_api.retry_if_session_inactive()
def _db_delete_mac_associated_with_agent(context, agent):
host = agent['host']
plugin = directory.get_plugin()
@ -85,7 +84,7 @@ class DVRDbMixin(ext_dvr.DVRMacAddressPluginBase):
DVRDbMixin._db_delete_mac_associated_with_agent(
payload.context, payload.latest_state)
@db_api.context_manager.reader
@db_api.CONTEXT_READER
def _get_dvr_mac_address_by_host(self, context, host):
dvr_obj = router.DVRMacAddress.get_object(context, host=host)
if not dvr_obj:
@ -93,9 +92,9 @@ class DVRDbMixin(ext_dvr.DVRMacAddressPluginBase):
return self._make_dvr_mac_address_dict(dvr_obj)
@utils.transaction_guard
@lib_db_api.retry_if_session_inactive()
@db_api.retry_if_session_inactive()
def _create_dvr_mac_address_retry(self, context, host, base_mac):
with db_api.context_manager.writer.using(context):
with db_api.CONTEXT_WRITER.using(context):
mac_address = net.get_random_mac(base_mac)
dvr_mac_binding = router.DVRMacAddress(
context, host=host, mac_address=netaddr.EUI(mac_address))
@ -120,7 +119,7 @@ class DVRDbMixin(ext_dvr.DVRMacAddressPluginBase):
db_api.MAX_RETRIES)
raise n_exc.HostMacAddressGenerationFailure(host=host)
@db_api.context_manager.reader
@db_api.CONTEXT_READER
def get_dvr_mac_address_list(self, context):
return [
dvr_mac.to_dict()
@ -142,7 +141,7 @@ class DVRDbMixin(ext_dvr.DVRMacAddressPluginBase):
'mac_address': str(dvr_mac_entry['mac_address'])}
@log_helpers.log_method_call
@lib_db_api.retry_if_session_inactive()
@db_api.retry_if_session_inactive()
def get_ports_on_host_by_subnet(self, context, host, subnet):
"""Returns DVR serviced ports on a given subnet in the input host
@ -172,7 +171,7 @@ class DVRDbMixin(ext_dvr.DVRMacAddressPluginBase):
return ports
@log_helpers.log_method_call
@lib_db_api.retry_if_session_inactive()
@db_api.retry_if_session_inactive()
def get_subnet_for_dvr(self, context, subnet, fixed_ips=None):
if fixed_ips:
subnet_data = fixed_ips[0]['subnet_id']

View File

@ -15,9 +15,9 @@
from neutron_lib.api.definitions import extra_dhcp_opt as edo_ext
from neutron_lib.api.definitions import port as port_def
from neutron_lib.db import api as db_api
from neutron.db import _resource_extend as resource_extend
from neutron.db import api as db_api
from neutron.objects.port.extensions import extra_dhcp_opt as obj_extra_dhcp
@ -41,7 +41,7 @@ class ExtraDhcpOptMixin(object):
extra_dhcp_opts):
if not extra_dhcp_opts:
return port
with db_api.context_manager.writer.using(context):
with db_api.CONTEXT_WRITER.using(context):
for dopt in extra_dhcp_opts:
if self._is_valid_opt_value(dopt['opt_name'],
dopt['opt_value']):
@ -79,7 +79,7 @@ class ExtraDhcpOptMixin(object):
context, port_id=id)
# if there are currently no dhcp_options associated with
# this port, then just insert the new ones and be done.
with db_api.context_manager.writer.using(context):
with db_api.CONTEXT_WRITER.using(context):
for upd_rec in dopts:
for opt in opts:
if (opt['opt_name'] == upd_rec['opt_name'] and

View File

@ -12,12 +12,12 @@
# License for the specific language governing permissions and limitations
# under the License.
from neutron_lib.db import api as db_api
from neutron_lib.db import utils as db_utils
from neutron_lib.exceptions import flavors as flav_exc
from oslo_db import exception as db_exc
from oslo_log import log as logging
from neutron.db import api as db_api
from neutron.db import common_db_mixin
from neutron.db import servicetype_db as sdb
from neutron.objects import base as base_obj
@ -101,7 +101,7 @@ class FlavorsDbMixin(common_db_mixin.CommonDbMixin):
return self._make_flavor_dict(obj)
def update_flavor(self, context, flavor_id, flavor):
with db_api.context_manager.writer.using(context):
with db_api.CONTEXT_WRITER.using(context):
self._ensure_flavor_not_in_use(context, flavor_id)
fl_obj = self._get_flavor(context, flavor_id)
fl_obj.update_fields(flavor['flavor'])
@ -118,7 +118,7 @@ class FlavorsDbMixin(common_db_mixin.CommonDbMixin):
# flavors so for now we just capture the foreign key violation
# to detect if it's in use.
try:
with db_api.context_manager.writer.using(context):
with db_api.CONTEXT_WRITER.using(context):
self._ensure_flavor_not_in_use(context, flavor_id)
self._get_flavor(context, flavor_id).delete()
except db_exc.DBReferenceError:
@ -136,7 +136,7 @@ class FlavorsDbMixin(common_db_mixin.CommonDbMixin):
def create_flavor_service_profile(self, context,
service_profile, flavor_id):
sp = service_profile['service_profile']
with db_api.context_manager.writer.using(context):
with db_api.CONTEXT_WRITER.using(context):
if obj_flavor.FlavorServiceProfileBinding.objects_exist(
context, service_profile_id=sp['id'], flavor_id=flavor_id):
raise flav_exc.FlavorServiceProfileBindingExists(
@ -189,7 +189,7 @@ class FlavorsDbMixin(common_db_mixin.CommonDbMixin):
if sp.get('driver'):
self._validate_driver(context, sp['driver'])
with db_api.context_manager.writer.using(context):
with db_api.CONTEXT_WRITER.using(context):
self._ensure_service_profile_not_in_use(context,
service_profile_id)
sp_obj = self._get_service_profile(context, service_profile_id)
@ -202,7 +202,7 @@ class FlavorsDbMixin(common_db_mixin.CommonDbMixin):
return self._make_service_profile_dict(sp_db, fields)
def delete_service_profile(self, context, sp_id):
with db_api.context_manager.writer.using(context):
with db_api.CONTEXT_WRITER.using(context):
self._ensure_service_profile_not_in_use(context, sp_id)
self._get_service_profile(context, sp_id).delete()

View File

@ -22,6 +22,7 @@ from neutron_lib.api.definitions import ip_allocation as ipalloc_apidef
from neutron_lib.api.definitions import portbindings
from neutron_lib.api import validators
from neutron_lib import constants as const
from neutron_lib.db import api as db_api
from neutron_lib.db import utils as db_utils
from neutron_lib import exceptions as exc
from oslo_config import cfg
@ -32,7 +33,6 @@ from neutron._i18n import _
from neutron.common import exceptions as n_exc
from neutron.common import ipv6_utils
from neutron.common import utils as common_utils
from neutron.db import api as db_api
from neutron.db import db_base_plugin_common
from neutron.db import models_v2
from neutron.extensions import segment
@ -159,7 +159,7 @@ class IpamBackendMixin(db_base_plugin_common.DbBasePluginCommon):
del s["dns_nameservers"]
return new_dns_addr_list
@db_api.context_manager.writer
@db_api.CONTEXT_WRITER
def _update_subnet_allocation_pools(self, context, subnet_id, s):
subnet_obj.IPAllocationPool.delete_objects(context,
subnet_id=subnet_id)

View File

@ -18,6 +18,7 @@ import copy
import netaddr
from neutron_lib.api.definitions import portbindings
from neutron_lib import constants
from neutron_lib.db import api as db_api
from neutron_lib import exceptions as n_exc
from oslo_db import exception as db_exc
from oslo_log import log as logging
@ -25,7 +26,6 @@ from oslo_utils import excutils
from sqlalchemy import and_
from neutron.common import ipv6_utils
from neutron.db import api as db_api
from neutron.db import ipam_backend_mixin
from neutron.db import models_v2
from neutron.ipam import driver
@ -322,7 +322,7 @@ class IpamPluggableBackend(ipam_backend_mixin.IpamBackendMixin):
original=changes.original,
remove=removed)
@db_api.context_manager.writer
@db_api.CONTEXT_WRITER
def save_allocation_pools(self, context, subnet, allocation_pools):
for pool in allocation_pools:
first_ip = str(netaddr.IPAddress(pool.first, pool.version))

View File

@ -13,6 +13,7 @@
# under the License.
import netaddr
from neutron_lib.db import api as db_api
from neutron_lib.db import utils as db_utils
from neutron_lib.exceptions import metering as metering_exc
from oslo_db import exception as db_exc
@ -20,7 +21,6 @@ from oslo_utils import uuidutils
from neutron.api.rpc.agentnotifiers import metering_rpc_agent_api
from neutron.common import constants
from neutron.db import api as db_api
from neutron.db import common_db_mixin as base_db
from neutron.db import l3_dvr_db
from neutron.extensions import metering
@ -131,7 +131,7 @@ class MeteringDbMixin(metering.MeteringPluginBase,
def create_metering_label_rule(self, context, metering_label_rule):
m = metering_label_rule['metering_label_rule']
try:
with db_api.context_manager.writer.using(context):
with db_api.CONTEXT_WRITER.using(context):
label_id = m['metering_label_id']
ip_prefix = m['remote_ip_prefix']
direction = m['direction']
@ -151,7 +151,7 @@ class MeteringDbMixin(metering.MeteringPluginBase,
return self._make_metering_label_rule_dict(rule)
def delete_metering_label_rule(self, context, rule_id):
with db_api.context_manager.writer.using(context):
with db_api.CONTEXT_WRITER.using(context):
rule = self._get_metering_label_rule(context, rule_id)
rule.delete()

View File

@ -14,10 +14,10 @@
# limitations under the License.
import netaddr
from neutron_lib.db import api as db_api
import six
from sqlalchemy import func
from neutron.db import api as db_api
import neutron.db.models_v2 as mod
NETWORK_ID = 'network_id'
@ -86,7 +86,7 @@ class IpAvailabilityMixin(object):
return net_ip_availabilities
@classmethod
@db_api.context_manager.reader
@db_api.CONTEXT_READER
def _build_network_used_ip_query(cls, context, filters):
# Generate a query to gather network/subnet/used_ips.
# Ensure query is tolerant of missing child table data (outerjoins)
@ -103,7 +103,7 @@ class IpAvailabilityMixin(object):
return cls._adjust_query_for_filters(query, filters)
@classmethod
@db_api.context_manager.reader
@db_api.CONTEXT_READER
def _build_total_ips_query(cls, context, filters):
query = context.session.query()
query = query.add_columns(*cls.total_ips_columns)

View File

@ -16,11 +16,11 @@
from neutron_lib.api.definitions import port as port_def
from neutron_lib.api.definitions import portbindings
from neutron_lib.api import validators
from neutron_lib.db import api as db_api
from neutron_lib.plugins import directory
from neutron.db import _model_query as model_query
from neutron.db import _resource_extend as resource_extend
from neutron.db import api as db_api
from neutron.db.models import portbinding as pmodels
from neutron.db import models_v2
from neutron.db import portbindings_base
@ -71,7 +71,7 @@ class PortBindingMixin(portbindings_base.PortBindingBaseMixin):
host = port_data.get(portbindings.HOST_ID)
host_set = validators.is_attr_set(host)
with db_api.context_manager.writer.using(context):
with db_api.CONTEXT_WRITER.using(context):
bind_port = context.session.query(
pmodels.PortBindingPort).filter_by(port_id=port['id']).first()
if host_set:
@ -85,7 +85,7 @@ class PortBindingMixin(portbindings_base.PortBindingBaseMixin):
self._extend_port_dict_binding_host(port, host)
def get_port_host(self, context, port_id):
with db_api.context_manager.reader.using(context):
with db_api.CONTEXT_READER.using(context):
bind_port = (
context.session.query(pmodels.PortBindingPort.host).
filter_by(port_id=port_id).

View File

@ -15,9 +15,8 @@
import collections
import datetime
from neutron_lib.db import api as lib_db_api
from neutron_lib.db import api as db_api
from neutron.db import api as db_api
from neutron.objects import quota as quota_obj
@ -37,7 +36,7 @@ class ReservationInfo(collections.namedtuple(
"""Information about a resource reservation."""
@lib_db_api.retry_if_session_inactive()
@db_api.retry_if_session_inactive()
def get_quota_usage_by_resource_and_tenant(context, resource, tenant_id):
"""Return usage info for a given resource and tenant.
@ -55,7 +54,7 @@ def get_quota_usage_by_resource_and_tenant(context, resource, tenant_id):
result.dirty)
@lib_db_api.retry_if_session_inactive()
@db_api.retry_if_session_inactive()
def get_quota_usage_by_resource(context, resource):
objs = quota_obj.QuotaUsage.get_objects(context, resource=resource)
return [QuotaUsageInfo(item.resource,
@ -64,7 +63,7 @@ def get_quota_usage_by_resource(context, resource):
item.dirty) for item in objs]
@lib_db_api.retry_if_session_inactive()
@db_api.retry_if_session_inactive()
def get_quota_usage_by_tenant_id(context, tenant_id):
objs = quota_obj.QuotaUsage.get_objects(context, project_id=tenant_id)
return [QuotaUsageInfo(item.resource,
@ -73,7 +72,7 @@ def get_quota_usage_by_tenant_id(context, tenant_id):
item.dirty) for item in objs]
@lib_db_api.retry_if_session_inactive()
@db_api.retry_if_session_inactive()
def set_quota_usage(context, resource, tenant_id,
in_use=None, delta=False):
"""Set resource quota usage.
@ -87,7 +86,7 @@ def set_quota_usage(context, resource, tenant_id,
:param delta: Specifies whether in_use is an absolute number
or a delta (defaults to False)
"""
with db_api.context_manager.writer.using(context):
with db_api.CONTEXT_WRITER.using(context):
usage_data = quota_obj.QuotaUsage.get_object(
context, resource=resource, project_id=tenant_id)
if not usage_data:
@ -107,8 +106,8 @@ def set_quota_usage(context, resource, tenant_id,
usage_data.in_use, usage_data.dirty)
@lib_db_api.retry_if_session_inactive()
@db_api.context_manager.writer
@db_api.retry_if_session_inactive()
@db_api.CONTEXT_WRITER
def set_quota_usage_dirty(context, resource, tenant_id, dirty=True):
"""Set quota usage dirty bit for a given resource and tenant.
@ -126,8 +125,8 @@ def set_quota_usage_dirty(context, resource, tenant_id, dirty=True):
return 0
@lib_db_api.retry_if_session_inactive()
@db_api.context_manager.writer
@db_api.retry_if_session_inactive()
@db_api.CONTEXT_WRITER
def set_resources_quota_usage_dirty(context, resources, tenant_id, dirty=True):
"""Set quota usage dirty bit for a given tenant and multiple resources.
@ -147,8 +146,8 @@ def set_resources_quota_usage_dirty(context, resources, tenant_id, dirty=True):
return len(objs)
@lib_db_api.retry_if_session_inactive()
@db_api.context_manager.writer
@db_api.retry_if_session_inactive()
@db_api.CONTEXT_WRITER
def set_all_quota_usage_dirty(context, resource, dirty=True):
"""Set the dirty bit on quota usage for all tenants.
@ -165,7 +164,7 @@ def set_all_quota_usage_dirty(context, resource, dirty=True):
return len(objs)
@lib_db_api.retry_if_session_inactive()
@db_api.retry_if_session_inactive()
def create_reservation(context, tenant_id, deltas, expiration=None):
# This method is usually called from within another transaction.
# Consider using begin_nested
@ -185,7 +184,7 @@ def create_reservation(context, tenant_id, deltas, expiration=None):
for delta in reserv_obj.resource_deltas))
@lib_db_api.retry_if_session_inactive()
@db_api.retry_if_session_inactive()
def get_reservation(context, reservation_id):
reserv_obj = quota_obj.Reservation.get_object(context, id=reservation_id)
if not reserv_obj:
@ -197,8 +196,8 @@ def get_reservation(context, reservation_id):
for delta in reserv_obj.resource_deltas))
@lib_db_api.retry_if_session_inactive()
@db_api.context_manager.writer
@db_api.retry_if_session_inactive()
@db_api.CONTEXT_WRITER
def remove_reservation(context, reservation_id, set_dirty=False):
reservation = quota_obj.Reservation.get_object(context, id=reservation_id)
if not reservation:
@ -214,7 +213,7 @@ def remove_reservation(context, reservation_id, set_dirty=False):
return 1
@lib_db_api.retry_if_session_inactive()
@db_api.retry_if_session_inactive()
def get_reservations_for_resources(context, tenant_id, resources,
expired=False):
"""Retrieve total amount of reservations for specified resources.
@ -233,7 +232,7 @@ def get_reservations_for_resources(context, tenant_id, resources,
context, utcnow(), tenant_id, resources, expired)
@lib_db_api.retry_if_session_inactive()
@db_api.context_manager.writer
@db_api.retry_if_session_inactive()
@db_api.CONTEXT_WRITER
def remove_expired_reservations(context, tenant_id=None):
return quota_obj.Reservation.delete_expired(context, utcnow(), tenant_id)
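
The quota usage functions above also show how the retry and transaction decorators stack after the switch. A minimal sketch of that arrangement (the function below is a placeholder, not part of the change):

from neutron_lib.db import api as db_api


@db_api.retry_if_session_inactive()
@db_api.CONTEXT_WRITER
def mark_resource_dirty(context, resource, tenant_id):
    # CONTEXT_WRITER opens a writer transaction around the call, keyed on the
    # context argument; retry_if_session_inactive sits outside it and re-runs
    # the function on retriable DB errors, skipping the retry when the caller
    # already holds an active session.
    pass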

View File

@ -14,14 +14,13 @@
# under the License.
from neutron_lib.api import attributes
from neutron_lib.db import api as lib_db_api
from neutron_lib.db import api as db_api
from neutron_lib import exceptions
from neutron_lib.plugins import constants
from neutron_lib.plugins import directory
from oslo_log import log
from neutron.common import exceptions as n_exc
from neutron.db import api as db_api
from neutron.db.quota import api as quota_api
from neutron.objects import quota as quota_obj
from neutron.quota import resource as res
@ -53,7 +52,7 @@ class DbQuotaDriver(object):
for key, resource in resources.items())
@staticmethod
@lib_db_api.retry_if_session_inactive()
@db_api.retry_if_session_inactive()
def get_tenant_quotas(context, resources, tenant_id):
"""Given a list of resources, retrieve the quotas for the given
tenant. If no limits are found for the specified tenant, the operation
@ -77,7 +76,7 @@ class DbQuotaDriver(object):
return tenant_quota
@staticmethod
@lib_db_api.retry_if_session_inactive()
@db_api.retry_if_session_inactive()
def get_detailed_tenant_quotas(context, resources, tenant_id):
"""Given a list of resources and a sepecific tenant, retrieve
the detailed quotas (limit, used, reserved).
@ -112,7 +111,7 @@ class DbQuotaDriver(object):
return tenant_quota_ext
@staticmethod
@lib_db_api.retry_if_session_inactive()
@db_api.retry_if_session_inactive()
def delete_tenant_quota(context, tenant_id):
"""Delete the quota entries for a given tenant_id.
@ -126,7 +125,7 @@ class DbQuotaDriver(object):
raise n_exc.TenantQuotaNotFound(tenant_id=tenant_id)
@staticmethod
@lib_db_api.retry_if_session_inactive()
@db_api.retry_if_session_inactive()
def get_all_quotas(context, resources):
"""Given a list of resources, retrieve the quotas for the all tenants.
@ -159,7 +158,7 @@ class DbQuotaDriver(object):
return list(all_tenant_quotas.values())
@staticmethod
@lib_db_api.retry_if_session_inactive()
@db_api.retry_if_session_inactive()
def update_quota_limit(context, tenant_id, resource, limit):
tenant_quotas = quota_obj.Quota.get_objects(
context, project_id=tenant_id, resource=resource)
@ -194,7 +193,7 @@ class DbQuotaDriver(object):
quota_api.remove_expired_reservations(
context, tenant_id=tenant_id)
@lib_db_api.retry_if_session_inactive()
@db_api.retry_if_session_inactive()
def make_reservation(self, context, tenant_id, resources, deltas, plugin):
# Lock current reservation table
# NOTE(salv-orlando): This routine uses DB write locks.
@ -209,7 +208,7 @@ class DbQuotaDriver(object):
# locks should be ok to use when support for sending "hotspot" writes
# to a single node will be available.
requested_resources = deltas.keys()
with db_api.context_manager.writer.using(context):
with db_api.CONTEXT_WRITER.using(context):
# get_tenant_quotas needs as input a dictionary mapping resource
# name to BaseResource instances so that the default quota can be
# retrieved

View File

@ -20,7 +20,7 @@ from neutron_lib.callbacks import exceptions
from neutron_lib.callbacks import registry
from neutron_lib.callbacks import resources
from neutron_lib import constants
from neutron_lib.db import api as lib_db_api
from neutron_lib.db import api as db_api
from neutron_lib.db import utils as db_utils
from neutron_lib import exceptions as n_exc
from neutron_lib.utils import helpers
@ -34,7 +34,6 @@ from neutron.common import constants as n_const
from neutron.common import utils
from neutron.db import _model_query as model_query
from neutron.db import _resource_extend as resource_extend
from neutron.db import api as db_api
from neutron.db.models import securitygroup as sg_models
from neutron.extensions import securitygroup as ext_sg
from neutron.objects import base as base_obj
@ -68,7 +67,7 @@ class SecurityGroupDbMixin(ext_sg.SecurityGroupPluginBase):
{'event': event, 'reason': e})
raise exc_cls(reason=reason, id=id)
@lib_db_api.retry_if_session_inactive()
@db_api.retry_if_session_inactive()
def create_security_group(self, context, security_group, default_sg=False):
"""Create security group.
@ -95,7 +94,7 @@ class SecurityGroupDbMixin(ext_sg.SecurityGroupPluginBase):
# default already exists, return it
return self.get_security_group(context, existing_def_sg_id)
with db_api.context_manager.writer.using(context):
with db_api.CONTEXT_WRITER.using(context):
sg = sg_obj.SecurityGroup(
context, id=s.get('id') or uuidutils.generate_uuid(),
description=s['description'], project_id=tenant_id,
@ -134,7 +133,7 @@ class SecurityGroupDbMixin(ext_sg.SecurityGroupPluginBase):
**kwargs)
return secgroup_dict
@lib_db_api.retry_if_session_inactive()
@db_api.retry_if_session_inactive()
def get_security_groups(self, context, filters=None, fields=None,
sorts=None, limit=None,
marker=None, page_reverse=False, default_sg=False):
@ -160,13 +159,13 @@ class SecurityGroupDbMixin(ext_sg.SecurityGroupPluginBase):
return [self._make_security_group_dict(obj, fields) for obj in sg_objs]
@lib_db_api.retry_if_session_inactive()
@db_api.retry_if_session_inactive()
def get_security_groups_count(self, context, filters=None):
filters = filters or {}
return sg_obj.SecurityGroup.count(
context, validate_filters=False, **filters)
@lib_db_api.retry_if_session_inactive()
@db_api.retry_if_session_inactive()
def get_security_group(self, context, id, fields=None, tenant_id=None):
"""Tenant id is given to handle the case when creating a security
group rule on behalf of another user.
@ -177,7 +176,7 @@ class SecurityGroupDbMixin(ext_sg.SecurityGroupPluginBase):
context.tenant_id = tenant_id
try:
with db_api.context_manager.reader.using(context):
with db_api.CONTEXT_READER.using(context):
ret = self._make_security_group_dict(self._get_security_group(
context, id), fields)
ret['security_group_rules'] = self.get_security_group_rules(
@ -193,10 +192,10 @@ class SecurityGroupDbMixin(ext_sg.SecurityGroupPluginBase):
raise ext_sg.SecurityGroupNotFound(id=id)
return sg
@lib_db_api.retry_if_session_inactive()
@db_api.retry_if_session_inactive()
def delete_security_group(self, context, id):
filters = {'security_group_id': [id]}
with db_api.context_manager.reader.using(context):
with db_api.CONTEXT_READER.using(context):
ports = self._get_port_security_group_bindings(context, filters)
if ports:
raise ext_sg.SecurityGroupInUse(id=id)
@ -215,7 +214,7 @@ class SecurityGroupDbMixin(ext_sg.SecurityGroupPluginBase):
exc_cls=ext_sg.SecurityGroupInUse, id=id,
**kwargs)
with db_api.context_manager.writer.using(context):
with db_api.CONTEXT_WRITER.using(context):
# pass security_group_rule_ids to ensure
# consistency with deleted rules
# get security_group_bindings and security_group one more time
@ -235,7 +234,7 @@ class SecurityGroupDbMixin(ext_sg.SecurityGroupPluginBase):
registry.notify(resources.SECURITY_GROUP, events.AFTER_DELETE,
self, **kwargs)
@lib_db_api.retry_if_session_inactive()
@db_api.retry_if_session_inactive()
def update_security_group(self, context, id, security_group):
s = security_group['security_group']
@ -247,7 +246,7 @@ class SecurityGroupDbMixin(ext_sg.SecurityGroupPluginBase):
self._registry_notify(resources.SECURITY_GROUP, events.BEFORE_UPDATE,
exc_cls=ext_sg.SecurityGroupConflict, **kwargs)
with db_api.context_manager.writer.using(context):
with db_api.CONTEXT_WRITER.using(context):
sg = self._get_security_group(context, id)
if sg.name == 'default' and 'name' in s:
raise ext_sg.SecurityGroupCannotUpdateDefault()
@ -288,10 +287,10 @@ class SecurityGroupDbMixin(ext_sg.SecurityGroupPluginBase):
'security_group_id': security_group['security_group_id']}
return db_utils.resource_fields(res, fields)
@lib_db_api.retry_if_session_inactive()
@db_api.retry_if_session_inactive()
def _create_port_security_group_binding(self, context, port_id,
security_group_id):
with db_api.context_manager.writer.using(context):
with db_api.CONTEXT_WRITER.using(context):
db = sg_models.SecurityGroupPortBinding(port_id=port_id,
security_group_id=security_group_id)
context.session.add(db)
@ -303,9 +302,9 @@ class SecurityGroupDbMixin(ext_sg.SecurityGroupPluginBase):
self._make_security_group_binding_dict,
filters=filters, fields=fields)
@lib_db_api.retry_if_session_inactive()
@db_api.retry_if_session_inactive()
def _delete_port_security_group_bindings(self, context, port_id):
with db_api.context_manager.writer.using(context):
with db_api.CONTEXT_WRITER.using(context):
query = model_query.query_with_hooks(
context, sg_models.SecurityGroupPortBinding)
bindings = query.filter(
@ -313,19 +312,19 @@ class SecurityGroupDbMixin(ext_sg.SecurityGroupPluginBase):
for binding in bindings:
context.session.delete(binding)
@lib_db_api.retry_if_session_inactive()
@db_api.retry_if_session_inactive()
def create_security_group_rule_bulk(self, context, security_group_rules):
return self._create_bulk('security_group_rule', context,
security_group_rules)
@lib_db_api.retry_if_session_inactive()
@db_api.retry_if_session_inactive()
def create_security_group_rule_bulk_native(self, context,
security_group_rules):
rules = security_group_rules['security_group_rules']
scoped_session(context.session)
security_group_id = self._validate_security_group_rules(
context, security_group_rules)
with db_api.context_manager.writer.using(context):
with db_api.CONTEXT_WRITER.using(context):
if not self.get_security_group(context, security_group_id):
raise ext_sg.SecurityGroupNotFound(id=security_group_id)
@ -341,7 +340,7 @@ class SecurityGroupDbMixin(ext_sg.SecurityGroupPluginBase):
context=context, security_group_rule=rdict)
return ret
@lib_db_api.retry_if_session_inactive()
@db_api.retry_if_session_inactive()
def create_security_group_rule(self, context, security_group_rule):
res = self._create_security_group_rule(context, security_group_rule)
registry.notify(
@ -390,7 +389,7 @@ class SecurityGroupDbMixin(ext_sg.SecurityGroupPluginBase):
self._registry_notify(resources.SECURITY_GROUP_RULE,
events.BEFORE_CREATE,
exc_cls=ext_sg.SecurityGroupConflict, **kwargs)
with db_api.context_manager.writer.using(context):
with db_api.CONTEXT_WRITER.using(context):
if validate:
self._check_for_duplicate_rules_in_db(context,
security_group_rule)
@ -663,7 +662,7 @@ class SecurityGroupDbMixin(ext_sg.SecurityGroupPluginBase):
raise ext_sg.SecurityGroupRuleParameterConflict(
ethertype=rule['ethertype'], cidr=input_prefix)
@lib_db_api.retry_if_session_inactive()
@db_api.retry_if_session_inactive()
def get_security_group_rules(self, context, filters=None, fields=None,
sorts=None, limit=None, marker=None,
page_reverse=False):
@ -679,13 +678,13 @@ class SecurityGroupDbMixin(ext_sg.SecurityGroupPluginBase):
for obj in rule_objs
]
@lib_db_api.retry_if_session_inactive()
@db_api.retry_if_session_inactive()
def get_security_group_rules_count(self, context, filters=None):
filters = filters or {}
return sg_obj.SecurityGroupRule.count(
context, validate_filters=False, **filters)
@lib_db_api.retry_if_session_inactive()
@db_api.retry_if_session_inactive()
def get_security_group_rule(self, context, id, fields=None):
security_group_rule = self._get_security_group_rule(context, id)
return self._make_security_group_rule_dict(
@ -697,7 +696,7 @@ class SecurityGroupDbMixin(ext_sg.SecurityGroupPluginBase):
raise ext_sg.SecurityGroupRuleNotFound(id=id)
return sgr
@lib_db_api.retry_if_session_inactive()
@db_api.retry_if_session_inactive()
def delete_security_group_rule(self, context, id):
kwargs = {
'context': context,
@ -707,7 +706,7 @@ class SecurityGroupDbMixin(ext_sg.SecurityGroupPluginBase):
events.BEFORE_DELETE, id=id,
exc_cls=ext_sg.SecurityGroupRuleInUse, **kwargs)
with db_api.context_manager.writer.using(context):
with db_api.CONTEXT_WRITER.using(context):
sgr = self._get_security_group_rule(context, id)
kwargs['security_group_id'] = sgr['security_group_id']
self._registry_notify(resources.SECURITY_GROUP_RULE,

View File

@ -14,10 +14,10 @@ from neutron_lib.api.definitions import segment as segment_def
from neutron_lib.callbacks import events
from neutron_lib.callbacks import registry
from neutron_lib.callbacks import resources
from neutron_lib.db import api as db_api
from oslo_log import log as logging
from oslo_utils import uuidutils
from neutron.db import api as db_api
from neutron.objects import base as base_obj
from neutron.objects import network as network_obj
@ -40,7 +40,7 @@ def _make_segment_dict(obj):
def add_network_segment(context, network_id, segment, segment_index=0,
is_dynamic=False):
with db_api.context_manager.writer.using(context):
with db_api.CONTEXT_WRITER.using(context):
netseg_obj = network_obj.NetworkSegment(
context, id=uuidutils.generate_uuid(), network_id=network_id,
network_type=segment.get(NETWORK_TYPE),
@ -70,7 +70,7 @@ def get_networks_segments(context, network_ids, filter_dynamic=False):
if not network_ids:
return {}
with db_api.context_manager.reader.using(context):
with db_api.CONTEXT_READER.using(context):
filters = {
'network_id': network_ids,
}
@ -84,7 +84,7 @@ def get_networks_segments(context, network_ids, filter_dynamic=False):
def get_segment_by_id(context, segment_id):
with db_api.context_manager.reader.using(context):
with db_api.CONTEXT_READER.using(context):
net_obj = network_obj.NetworkSegment.get_object(context, id=segment_id)
if net_obj:
return _make_segment_dict(net_obj)
@ -93,7 +93,7 @@ def get_segment_by_id(context, segment_id):
def get_dynamic_segment(context, network_id, physical_network=None,
segmentation_id=None):
"""Return a dynamic segment for the filters provided if one exists."""
with db_api.context_manager.reader.using(context):
with db_api.CONTEXT_READER.using(context):
filters = {
'network_id': network_id,
'is_dynamic': True,
@ -120,5 +120,5 @@ def get_dynamic_segment(context, network_id, physical_network=None,
def delete_network_segment(context, segment_id):
"""Release a dynamic segment for the params provided if one exists."""
with db_api.context_manager.writer.using(context):
with db_api.CONTEXT_WRITER.using(context):
network_obj.NetworkSegment.delete_objects(context, id=segment_id)

View File

@ -18,13 +18,13 @@ import operator
import netaddr
from neutron_lib import constants
from neutron_lib.db import api as db_api
from neutron_lib import exceptions as lib_exc
from oslo_db import exception as db_exc
from oslo_utils import uuidutils
from neutron._i18n import _
from neutron.common import exceptions as n_exc
from neutron.db import api as db_api
from neutron.db import models_v2
from neutron.ipam import driver
from neutron.ipam import exceptions as ipam_exc
@ -50,7 +50,7 @@ class SubnetAllocator(driver.Pool):
subnetpool, it's required to ensure non-overlapping cidrs in the same
subnetpool.
"""
with db_api.context_manager.reader.using(self._context):
with db_api.CONTEXT_READER.using(self._context):
current_hash = (
self._context.session.query(models_v2.SubnetPool.hash)
.filter_by(id=self._subnetpool['id']).scalar())
@ -63,7 +63,7 @@ class SubnetAllocator(driver.Pool):
# NOTE(cbrandily): the update disallows two concurrent subnet allocations
# from succeeding: at most one transaction will succeed, the others will be
# rolled back and caught in neutron.db.v2.base
with db_api.context_manager.writer.using(self._context):
with db_api.CONTEXT_WRITER.using(self._context):
query = (
self._context.session.query(models_v2.SubnetPool).filter_by(
id=self._subnetpool['id'], hash=current_hash))
@ -74,7 +74,7 @@ class SubnetAllocator(driver.Pool):
subnet_pool_id=self._subnetpool['id']))
def _get_allocated_cidrs(self):
with db_api.context_manager.reader.using(self._context):
with db_api.CONTEXT_READER.using(self._context):
query = self._context.session.query(models_v2.Subnet.cidr)
subnets = query.filter_by(subnetpool_id=self._subnetpool['id'])
return (x.cidr for x in subnets)
@ -96,7 +96,7 @@ class SubnetAllocator(driver.Pool):
def _allocations_used_by_tenant(self, quota_unit):
subnetpool_id = self._subnetpool['id']
tenant_id = self._subnetpool['tenant_id']
with db_api.context_manager.reader.using(self._context):
with db_api.CONTEXT_READER.using(self._context):
qry = self._context.session.query(models_v2.Subnet.cidr)
allocations = qry.filter_by(subnetpool_id=subnetpool_id,
tenant_id=tenant_id)
@ -121,7 +121,7 @@ class SubnetAllocator(driver.Pool):
raise n_exc.SubnetPoolQuotaExceeded()
def _allocate_any_subnet(self, request):
with db_api.context_manager.writer.using(self._context):
with db_api.CONTEXT_WRITER.using(self._context):
self._lock_subnetpool()
self._check_subnetpool_tenant_quota(request.tenant_id,
request.prefixlen)
@ -145,7 +145,7 @@ class SubnetAllocator(driver.Pool):
str(request.prefixlen))
def _allocate_specific_subnet(self, request):
with db_api.context_manager.writer.using(self._context):
with db_api.CONTEXT_WRITER.using(self._context):
self._lock_subnetpool()
self._check_subnetpool_tenant_quota(request.tenant_id,
request.prefixlen)

View File

@ -16,7 +16,7 @@ import copy
import functools
import itertools
from neutron_lib.db import api as lib_db_api
from neutron_lib.db import api as db_api
from neutron_lib import exceptions as n_exc
from neutron_lib.objects import exceptions as o_exc
from oslo_db import exception as obj_exc
@ -32,7 +32,6 @@ import six
from sqlalchemy import orm
from neutron._i18n import _
from neutron.db import api as db_api
from neutron.db import standard_attr
from neutron.objects.db import api as obj_db_api
from neutron.objects.extensions import standardattributes
@ -528,15 +527,15 @@ class NeutronDbObject(NeutronObject):
def db_context_writer(cls, context):
"""Return read-write session activation decorator."""
if cls.new_facade or cls._use_db_facade(context):
return db_api.context_manager.writer.using(context)
return lib_db_api.autonested_transaction(context.session)
return db_api.CONTEXT_WRITER.using(context)
return db_api.autonested_transaction(context.session)
@classmethod
def db_context_reader(cls, context):
"""Return read-only session activation decorator."""
if cls.new_facade or cls._use_db_facade(context):
return db_api.context_manager.reader.using(context)
return lib_db_api.autonested_transaction(context.session)
return db_api.CONTEXT_READER.using(context)
return db_api.autonested_transaction(context.session)
@classmethod
def get_object(cls, context, **kwargs):

View File

@ -15,11 +15,11 @@
import collections
from neutron_lib.db import api as db_api
from oslo_log import log as logging
from pecan import hooks
from neutron.common import exceptions
from neutron.db import api as db_api
from neutron import manager
from neutron import quota
from neutron.quota import resource_registry
@ -75,7 +75,7 @@ class QuotaEnforcementHook(hooks.PecanHook):
reservations = state.request.context.get('reservations') or []
if not reservations and state.request.method != 'DELETE':
return
with db_api.context_manager.writer.using(neutron_context):
with db_api.CONTEXT_WRITER.using(neutron_context):
# Commit the reservation(s)
for reservation in reservations:
quota.QUOTAS.commit_reservation(

View File

@ -19,6 +19,7 @@ from neutron_lib.callbacks import events
from neutron_lib.callbacks import registry
from neutron_lib.callbacks import resources
from neutron_lib import constants as n_const
from neutron_lib.db import api as db_api
from neutron_lib.plugins import directory
from oslo_db import exception as db_exc
from oslo_log import log
@ -28,7 +29,6 @@ from sqlalchemy import or_
from sqlalchemy.orm import exc
from neutron._i18n import _
from neutron.db import api as db_api
from neutron.db.models import securitygroup as sg_models
from neutron.db import models_v2
from neutron.objects import base as objects_base
@ -42,7 +42,7 @@ LOG = log.getLogger(__name__)
MAX_PORTS_PER_QUERY = 500
@db_api.context_manager.writer
@db_api.CONTEXT_WRITER
def add_port_binding(context, port_id):
record = models.PortBinding(
port_id=port_id,
@ -51,7 +51,7 @@ def add_port_binding(context, port_id):
return record
@db_api.context_manager.writer
@db_api.CONTEXT_WRITER
def set_binding_levels(context, levels):
if levels:
for level in levels:
@ -69,7 +69,7 @@ def set_binding_levels(context, levels):
version="Stein", removal_version="T",
message="Function get_binding_levels is deprecated. Please use "
"get_binding_level_objs instead as it makes use of OVOs.")
@db_api.context_manager.reader
@db_api.CONTEXT_READER
def get_binding_levels(context, port_id, host):
if host:
result = (context.session.query(models.PortBindingLevel).
@ -84,7 +84,7 @@ def get_binding_levels(context, port_id, host):
return result
@db_api.context_manager.reader
@db_api.CONTEXT_READER
def get_binding_level_objs(context, port_id, host):
if host:
pager = objects_base.Pager(sorts=[('level', True)])
@ -98,7 +98,7 @@ def get_binding_level_objs(context, port_id, host):
return port_bl_objs
@db_api.context_manager.writer
@db_api.CONTEXT_WRITER
def clear_binding_levels(context, port_id, host):
if host:
port_obj.PortBindingLevel.delete_objects(
@ -110,14 +110,14 @@ def clear_binding_levels(context, port_id, host):
def ensure_distributed_port_binding(context, port_id, host, router_id=None):
with db_api.context_manager.reader.using(context):
with db_api.CONTEXT_READER.using(context):
record = (context.session.query(models.DistributedPortBinding).
filter_by(port_id=port_id, host=host).first())
if record:
return record
try:
with db_api.context_manager.writer.using(context):
with db_api.CONTEXT_WRITER.using(context):
record = models.DistributedPortBinding(
port_id=port_id,
host=host,
@ -129,14 +129,14 @@ def ensure_distributed_port_binding(context, port_id, host, router_id=None):
return record
except db_exc.DBDuplicateEntry:
LOG.debug("Distributed Port %s already bound", port_id)
with db_api.context_manager.reader.using(context):
with db_api.CONTEXT_READER.using(context):
return (context.session.query(models.DistributedPortBinding).
filter_by(port_id=port_id, host=host).one())
def delete_distributed_port_binding_if_stale(context, binding):
if not binding.router_id and binding.status == n_const.PORT_STATUS_DOWN:
with db_api.context_manager.writer.using(context):
with db_api.CONTEXT_WRITER.using(context):
LOG.debug("Distributed port: Deleting binding %s", binding)
context.session.delete(binding)
@ -144,7 +144,7 @@ def delete_distributed_port_binding_if_stale(context, binding):
def get_port(context, port_id):
"""Get port record for update within transaction."""
with db_api.context_manager.reader.using(context):
with db_api.CONTEXT_READER.using(context):
try:
# Set enable_eagerloads to True, so that lazy load can be
# proceed later.
@ -161,7 +161,7 @@ def get_port(context, port_id):
return
@db_api.context_manager.reader
@db_api.CONTEXT_READER
def get_port_from_device_mac(context, device_mac):
LOG.debug("get_port_from_device_mac() called for mac %s", device_mac)
ports = port_obj.Port.get_objects(context, mac_address=device_mac)
@ -193,7 +193,7 @@ def get_sg_ids_grouped_by_port(context, port_ids):
sg_ids_grouped_by_port = {}
sg_binding_port = sg_models.SecurityGroupPortBinding.port_id
with db_api.context_manager.reader.using(context):
with db_api.CONTEXT_READER.using(context):
# partial UUIDs must be individually matched with startswith.
# full UUIDs may be matched directly in an IN statement
partial_uuids = set(port_id for port_id in port_ids
@ -232,7 +232,7 @@ def make_port_dict_with_security_groups(port, sec_groups):
def get_port_binding_host(context, port_id):
try:
with db_api.context_manager.reader.using(context):
with db_api.CONTEXT_READER.using(context):
query = (context.session.query(models.PortBinding.host).
filter(models.PortBinding.port_id.startswith(port_id)))
query = query.filter(
@ -248,7 +248,7 @@ def get_port_binding_host(context, port_id):
return query.host
@db_api.context_manager.reader
@db_api.CONTEXT_READER
def generate_distributed_port_status(context, port_id):
# an OR'ed value of status assigned to parent port from the
# distributedportbinding bucket
@ -263,7 +263,7 @@ def generate_distributed_port_status(context, port_id):
def get_distributed_port_binding_by_host(context, port_id, host):
with db_api.context_manager.reader.using(context):
with db_api.CONTEXT_READER.using(context):
binding = (context.session.query(models.DistributedPortBinding).
filter(models.DistributedPortBinding.port_id.startswith(port_id),
models.DistributedPortBinding.host == host).first())
@ -274,7 +274,7 @@ def get_distributed_port_binding_by_host(context, port_id, host):
def get_distributed_port_bindings(context, port_id):
with db_api.context_manager.reader.using(context):
with db_api.CONTEXT_READER.using(context):
bindings = (context.session.query(models.DistributedPortBinding).
filter(models.DistributedPortBinding.port_id.startswith(
port_id)).all())
@ -283,7 +283,7 @@ def get_distributed_port_bindings(context, port_id):
return bindings
@db_api.context_manager.reader
@db_api.CONTEXT_READER
def partial_port_ids_to_full_ids(context, partial_ids):
"""Takes a list of the start of port IDs and returns full IDs.
@ -308,7 +308,7 @@ def partial_port_ids_to_full_ids(context, partial_ids):
return result
@db_api.context_manager.reader
@db_api.CONTEXT_READER
def get_port_db_objects(context, port_ids):
"""Takes a list of port_ids and returns matching port db objects.
@ -323,7 +323,7 @@ def get_port_db_objects(context, port_ids):
return result
@db_api.context_manager.reader
@db_api.CONTEXT_READER
def is_dhcp_active_on_any_subnet(context, subnet_ids):
if not subnet_ids:
return False
@ -340,7 +340,7 @@ def _prevent_segment_delete_with_port_bound(resource, event, trigger,
# don't check for network deletes
return
with db_api.context_manager.reader.using(context):
with db_api.CONTEXT_READER.using(context):
port_ids = port_obj.Port.get_port_ids_filter_by_segment_id(
context, segment_id=segment['id'])
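
As the helpers above show, CONTEXT_READER and CONTEXT_WRITER also work directly as decorators on module-level functions that take the context as their first argument. A small sketch of that form, assuming this module's existing ML2 models import (the counting helper itself is hypothetical):

    from neutron_lib.db import api as db_api

    from neutron.plugins.ml2 import models

    @db_api.CONTEXT_READER
    def count_bindings_for_host(context, host):
        # The decorator wraps the whole call in a reader transaction keyed
        # on the ``context`` argument, mirroring the old
        # @db_api.context_manager.reader form it replaces.
        return (context.session.query(models.PortBinding).
                filter_by(host=host).count())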

View File

@ -16,6 +16,7 @@
import random
from neutron_lib import context as neutron_ctx
from neutron_lib.db import api as db_api
from neutron_lib.plugins.ml2 import api
from neutron_lib.plugins import utils as p_utils
from neutron_lib.utils import helpers
@ -24,7 +25,6 @@ from oslo_db import exception as db_exc
from oslo_log import log
from neutron.common import exceptions as exc
from neutron.db import api as db_api
from neutron.objects import base as base_obj
@ -68,7 +68,7 @@ class SegmentTypeDriver(BaseTypeDriver):
# TODO(ataraday): get rid of this method when old TypeDriver won't be used
def _get_session(self, arg):
if isinstance(arg, neutron_ctx.Context):
return arg.session, db_api.context_manager.writer.using(arg)
return arg.session, db_api.CONTEXT_WRITER.using(arg)
return arg, arg.session.begin(subtransactions=True)
def allocate_fully_specified_segment(self, context, **raw_segment):

View File

@ -14,6 +14,7 @@
# under the License.
from neutron_lib import constants as p_const
from neutron_lib.db import api as db_api
from neutron_lib import exceptions as exc
from neutron_lib.objects import exceptions as obj_base
from neutron_lib.plugins.ml2 import api
@ -23,7 +24,6 @@ from oslo_log import log
from neutron._i18n import _
from neutron.common import exceptions as n_exc
from neutron.conf.plugins.ml2.drivers import driver_type
from neutron.db import api as db_api
from neutron.objects.plugins.ml2 import flatallocation as flat_obj
from neutron.plugins.ml2.drivers import helpers
@ -107,7 +107,7 @@ class FlatTypeDriver(helpers.BaseTypeDriver):
def release_segment(self, context, segment):
physical_network = segment[api.PHYSICAL_NETWORK]
with db_api.context_manager.writer.using(context):
with db_api.CONTEXT_WRITER.using(context):
obj = flat_obj.FlatAllocation.get_object(
context,
physical_network=physical_network)

View File

@ -20,7 +20,7 @@ import netaddr
from neutron_lib.agent import topics
from neutron_lib import constants as p_const
from neutron_lib import context
from neutron_lib.db import api as lib_db_api
from neutron_lib.db import api as db_api
from neutron_lib import exceptions as exc
from neutron_lib.plugins.ml2 import api
from neutron_lib.plugins import utils as plugin_utils
@ -32,7 +32,6 @@ from six import moves
from sqlalchemy import or_
from neutron._i18n import _
from neutron.db import api as db_api
from neutron.objects import base as base_obj
from neutron.plugins.ml2.drivers import helpers
@ -137,7 +136,7 @@ class _TunnelTypeDriverBase(helpers.SegmentTypeDriver):
LOG.info("%(type)s ID ranges: %(range)s",
{'type': self.get_type(), 'range': current_range})
@lib_db_api.retry_db_errors
@db_api.retry_db_errors
def sync_allocations(self):
# determine current configured allocatable tunnel ids
tunnel_ids = set()
@ -147,7 +146,7 @@ class _TunnelTypeDriverBase(helpers.SegmentTypeDriver):
tunnel_id_getter = operator.attrgetter(self.segmentation_key)
tunnel_col = getattr(self.model, self.segmentation_key)
ctx = context.get_admin_context()
with db_api.context_manager.writer.using(ctx):
with db_api.CONTEXT_WRITER.using(ctx):
# remove from table unallocated tunnels not currently allocatable
# fetch results as list via all() because we'll be iterating
# through them twice
@ -313,7 +312,7 @@ class ML2TunnelTypeDriver(_TunnelTypeDriverBase):
inside = any(lo <= tunnel_id <= hi for lo, hi in self.tunnel_ranges)
info = {'type': self.get_type(), 'id': tunnel_id}
with db_api.context_manager.writer.using(context):
with db_api.CONTEXT_WRITER.using(context):
query = (context.session.query(self.model).
filter_by(**{self.segmentation_key: tunnel_id}))
if inside:
@ -330,7 +329,7 @@ class ML2TunnelTypeDriver(_TunnelTypeDriverBase):
if not count:
LOG.warning("%(type)s tunnel %(id)s not found", info)
@db_api.context_manager.reader
@db_api.CONTEXT_READER
def get_allocation(self, context, tunnel_id):
return (context.session.query(self.model).
filter_by(**{self.segmentation_key: tunnel_id}).
@ -349,37 +348,37 @@ class EndpointTunnelTypeDriver(ML2TunnelTypeDriver):
def get_endpoint_by_host(self, host):
LOG.debug("get_endpoint_by_host() called for host %s", host)
session = lib_db_api.get_reader_session()
session = db_api.get_reader_session()
return (session.query(self.endpoint_model).
filter_by(host=host).first())
def get_endpoint_by_ip(self, ip):
LOG.debug("get_endpoint_by_ip() called for ip %s", ip)
session = lib_db_api.get_reader_session()
session = db_api.get_reader_session()
return (session.query(self.endpoint_model).
filter_by(ip_address=ip).first())
def delete_endpoint(self, ip):
LOG.debug("delete_endpoint() called for ip %s", ip)
session = lib_db_api.get_writer_session()
session = db_api.get_writer_session()
session.query(self.endpoint_model).filter_by(ip_address=ip).delete()
def delete_endpoint_by_host_or_ip(self, host, ip):
LOG.debug("delete_endpoint_by_host_or_ip() called for "
"host %(host)s or %(ip)s", {'host': host, 'ip': ip})
session = lib_db_api.get_writer_session()
session = db_api.get_writer_session()
session.query(self.endpoint_model).filter(
or_(self.endpoint_model.host == host,
self.endpoint_model.ip_address == ip)).delete()
def _get_endpoints(self):
LOG.debug("_get_endpoints() called")
session = lib_db_api.get_reader_session()
session = db_api.get_reader_session()
return session.query(self.endpoint_model)
def _add_endpoint(self, ip, host, **kwargs):
LOG.debug("_add_endpoint() called for ip %s", ip)
session = lib_db_api.get_writer_session()
session = db_api.get_writer_session()
try:
endpoint = self.endpoint_model(ip_address=ip, host=host, **kwargs)
endpoint.save(session)
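
Where no request context is available, the endpoint helpers above fall back to plain sessions, which neutron-lib also provides. A compressed sketch of that pattern (standalone functions standing in for the driver methods, with the model passed explicitly):

    from neutron_lib.db import api as db_api

    def get_endpoint_by_host(endpoint_model, host):
        # Standalone read outside any request context.
        session = db_api.get_reader_session()
        return session.query(endpoint_model).filter_by(host=host).first()

    def delete_endpoint(endpoint_model, ip):
        # Standalone write; get_writer_session() returns a session bound
        # to the writer engine.
        session = db_api.get_writer_session()
        session.query(endpoint_model).filter_by(ip_address=ip).delete()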

View File

@ -17,7 +17,7 @@ import sys
from neutron_lib import constants as p_const
from neutron_lib import context
from neutron_lib.db import api as lib_db_api
from neutron_lib.db import api as db_api
from neutron_lib import exceptions as exc
from neutron_lib.plugins.ml2 import api
from neutron_lib.plugins import utils as plugin_utils
@ -27,7 +27,6 @@ from six import moves
from neutron._i18n import _
from neutron.conf.plugins.ml2.drivers import driver_type
from neutron.db import api as db_api
from neutron.objects.plugins.ml2 import vlanallocation as vlanalloc
from neutron.plugins.ml2.drivers import helpers
@ -61,10 +60,10 @@ class VlanTypeDriver(helpers.SegmentTypeDriver):
sys.exit(1)
LOG.info("Network VLAN ranges: %s", self.network_vlan_ranges)
@lib_db_api.retry_db_errors
@db_api.retry_db_errors
def _sync_vlan_allocations(self):
ctx = context.get_admin_context()
with db_api.context_manager.writer.using(ctx):
with db_api.CONTEXT_WRITER.using(ctx):
# get existing allocations for all physical networks
allocations = dict()
allocs = vlanalloc.VlanAllocation.get_objects(ctx)
@ -222,7 +221,7 @@ class VlanTypeDriver(helpers.SegmentTypeDriver):
inside = any(lo <= vlan_id <= hi for lo, hi in ranges)
count = False
with db_api.context_manager.writer.using(context):
with db_api.CONTEXT_WRITER.using(context):
alloc = vlanalloc.VlanAllocation.get_object(
context, physical_network=physical_network, vlan_id=vlan_id)
if alloc:

View File

@ -19,7 +19,7 @@ from neutron_lib.api.definitions import portbindings
from neutron_lib.api.definitions import provider_net as provider
from neutron_lib.api import validators
from neutron_lib import constants
from neutron_lib.db import api as lib_db_api
from neutron_lib.db import api as db_api
from neutron_lib import exceptions as exc
from neutron_lib.exceptions import multiprovidernet as mpnet_exc
from neutron_lib.exceptions import vlantransparent as vlan_exc
@ -31,7 +31,6 @@ import stevedore
from neutron._i18n import _
from neutron.conf.plugins.ml2 import config
from neutron.db import api as db_api
from neutron.db import segments_db
from neutron.objects import ports
from neutron.plugins.ml2.common import exceptions as ml2_exc
@ -196,7 +195,7 @@ class TypeManager(stevedore.named.NamedExtensionManager):
def create_network_segments(self, context, network, tenant_id):
"""Call type drivers to create network segments."""
segments = self._process_provider_create(network)
with db_api.context_manager.writer.using(context):
with db_api.CONTEXT_WRITER.using(context):
network_id = network['id']
if segments:
for segment_index, segment in enumerate(segments):
@ -229,7 +228,7 @@ class TypeManager(stevedore.named.NamedExtensionManager):
self.validate_provider_segment(segment)
# Reserve segment in type driver
with db_api.context_manager.writer.using(context):
with db_api.CONTEXT_WRITER.using(context):
return self.reserve_provider_segment(context, segment)
def is_partial_segment(self, segment):
@ -429,7 +428,7 @@ class MechanismManager(stevedore.named.NamedExtensionManager):
try:
getattr(driver.obj, method_name)(context)
except Exception as e:
if raise_db_retriable and lib_db_api.is_retriable(e):
if raise_db_retriable and db_api.is_retriable(e):
with excutils.save_and_reraise_exception():
LOG.debug("DB exception raised by Mechanism driver "
"'%(name)s' in %(method)s",

View File

@ -20,13 +20,13 @@ from neutron_lib.callbacks import events
from neutron_lib.callbacks import registry
from neutron_lib.callbacks import resources
from neutron_lib import context as n_ctx
from neutron_lib.db import api as db_api
from oslo_concurrency import lockutils
from oslo_log import log as logging
from neutron._i18n import _
from neutron.api.rpc.callbacks import events as rpc_events
from neutron.api.rpc.handlers import resources_rpc
from neutron.db import api as db_api
from neutron.objects import network
from neutron.objects import ports
from neutron.objects import securitygroup
@ -105,7 +105,8 @@ class _ObjectChangeHandler(object):
context = n_ctx.Context.from_dict(context_dict)
# attempt to get regardless of event type so concurrent delete
# after create/update is the same code-path as a delete event
with db_api.context_manager.independent.reader.using(context):
with db_api.get_context_manager().independent.reader.using(
context):
obj = self._obj_class.get_object(context, id=resource_id)
# CREATE events are always treated as UPDATE events to ensure
# listeners are written to handle out-of-order messages
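
The one caller that needs the independent facade keeps it, now reached through neutron_lib's get_context_manager(). A minimal sketch of that form (the fetch helper and obj_class argument are illustrative):

    from neutron_lib.db import api as db_api

    def fetch_fresh_copy(context, obj_class, resource_id):
        # ``independent`` starts a brand-new reader transaction instead of
        # joining whatever transaction the context is already in, so the
        # object is re-read from the database.
        with db_api.get_context_manager().independent.reader.using(context):
            return obj_class.get_object(context, id=resource_id)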

View File

@ -35,7 +35,7 @@ from neutron_lib.callbacks import exceptions
from neutron_lib.callbacks import registry
from neutron_lib.callbacks import resources
from neutron_lib import constants as const
from neutron_lib.db import api as lib_db_api
from neutron_lib.db import api as db_api
from neutron_lib.db import utils as db_utils
from neutron_lib import exceptions as exc
from neutron_lib.exceptions import allowedaddresspairs as addr_exc
@ -75,7 +75,6 @@ from neutron.db import address_scope_db
from neutron.db import agents_db
from neutron.db import agentschedulers_db
from neutron.db import allowedaddresspairs_db as addr_pair_db
from neutron.db import api as db_api
from neutron.db import db_base_plugin_v2
from neutron.db import dvr_mac_db
from neutron.db import external_net_db
@ -447,7 +446,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
binding.persist_state_to_session(plugin_context.session)
return changes
@lib_db_api.retry_db_errors
@db_api.retry_db_errors
def _bind_port_if_needed(self, context, allow_notify=False,
need_notify=False, allow_commit=True):
if not context.network.network_segments:
@ -553,7 +552,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
# After we've attempted to bind the port, we begin a
# transaction, get the current port state, and decide whether
# to commit the binding results.
with db_api.context_manager.writer.using(plugin_context):
with db_api.CONTEXT_WRITER.using(plugin_context):
# Get the current port state and build a new PortContext
# reflecting this state as original state for subsequent
# mechanism driver update_port_*commit() calls.
@ -749,7 +748,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
def _object_session_or_new_session(sql_obj):
session = sqlalchemy.inspect(sql_obj).session
if not session:
session = lib_db_api.get_reader_session()
session = db_api.get_reader_session()
return session
def _notify_port_updated(self, mech_context):
@ -787,7 +786,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
obj_before_create = getattr(self, '_before_create_%s' % resource)
for item in items:
obj_before_create(context, item)
with db_api.context_manager.writer.using(context):
with db_api.CONTEXT_WRITER.using(context):
obj_creator = getattr(self, '_create_%s_db' % resource)
for item in items:
try:
@ -883,7 +882,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
def _create_network_db(self, context, network):
net_data = network[net_def.RESOURCE_NAME]
tenant_id = net_data['tenant_id']
with db_api.context_manager.writer.using(context):
with db_api.CONTEXT_WRITER.using(context):
net_db = self.create_network_db(context, network)
net_data['id'] = net_db.id
self.type_manager.create_network_segments(context, net_data,
@ -925,7 +924,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
return result, mech_context
@utils.transaction_guard
@lib_db_api.retry_if_session_inactive()
@db_api.retry_if_session_inactive()
def create_network(self, context, network):
self._before_create_network(context, network)
result, mech_context = self._create_network_db(context, network)
@ -945,20 +944,20 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
return result
@utils.transaction_guard
@lib_db_api.retry_if_session_inactive()
@db_api.retry_if_session_inactive()
def create_network_bulk(self, context, networks):
objects = self._create_bulk_ml2(
net_def.RESOURCE_NAME, context, networks)
return [obj['result'] for obj in objects]
@utils.transaction_guard
@lib_db_api.retry_if_session_inactive()
@db_api.retry_if_session_inactive()
def update_network(self, context, id, network):
net_data = network[net_def.RESOURCE_NAME]
provider._raise_if_updates_provider_attributes(net_data)
need_network_update_notify = False
with db_api.context_manager.writer.using(context):
with db_api.CONTEXT_WRITER.using(context):
original_network = super(Ml2Plugin, self).get_network(context, id)
updated_network = super(Ml2Plugin, self).update_network(context,
id,
@ -1019,11 +1018,11 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
self.notifier.network_update(context, updated_network)
return updated_network
@lib_db_api.retry_if_session_inactive()
@db_api.retry_if_session_inactive()
def get_network(self, context, id, fields=None):
# NOTE(ihrachys) use writer manager to be able to update mtu
# TODO(ihrachys) remove in Queens+ when mtu is not nullable
with db_api.context_manager.writer.using(context):
with db_api.CONTEXT_WRITER.using(context):
net_db = self._get_network(context, id)
# NOTE(ihrachys) pre Pike networks may have null mtus; update them
@ -1037,12 +1036,12 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
return db_utils.resource_fields(net_data, fields)
@lib_db_api.retry_if_session_inactive()
@db_api.retry_if_session_inactive()
def get_networks(self, context, filters=None, fields=None,
sorts=None, limit=None, marker=None, page_reverse=False):
# NOTE(ihrachys) use writer manager to be able to update mtu
# TODO(ihrachys) remove in Queens when mtu is not nullable
with db_api.context_manager.writer.using(context):
with db_api.CONTEXT_WRITER.using(context):
nets_db = super(Ml2Plugin, self)._get_networks(
context, filters, None, sorts, limit, marker, page_reverse)
@ -1115,7 +1114,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
context=context, subnet=subnet_data)
def _create_subnet_db(self, context, subnet):
with db_api.context_manager.writer.using(context):
with db_api.CONTEXT_WRITER.using(context):
result, net_db, ipam_sub = self._create_subnet_precommit(
context, subnet)
@ -1140,7 +1139,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
return result, mech_context
@utils.transaction_guard
@lib_db_api.retry_if_session_inactive()
@db_api.retry_if_session_inactive()
def create_subnet(self, context, subnet):
self._before_create_subnet(context, subnet)
result, mech_context = self._create_subnet_db(context, subnet)
@ -1159,16 +1158,16 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
return result
@utils.transaction_guard
@lib_db_api.retry_if_session_inactive()
@db_api.retry_if_session_inactive()
def create_subnet_bulk(self, context, subnets):
objects = self._create_bulk_ml2(
subnet_def.RESOURCE_NAME, context, subnets)
return [obj['result'] for obj in objects]
@utils.transaction_guard
@lib_db_api.retry_if_session_inactive()
@db_api.retry_if_session_inactive()
def update_subnet(self, context, id, subnet):
with db_api.context_manager.writer.using(context):
with db_api.CONTEXT_WRITER.using(context):
updated_subnet, original_subnet = self._update_subnet_precommit(
context, id, subnet)
self.extension_manager.process_update_subnet(
@ -1268,7 +1267,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
def _create_port_db(self, context, port):
attrs = port[port_def.RESOURCE_NAME]
with db_api.context_manager.writer.using(context):
with db_api.CONTEXT_WRITER.using(context):
dhcp_opts = attrs.get(edo_ext.EXTRADHCPOPTS, [])
port_db = self.create_port_db(context, port)
result = self._make_port_dict(port_db, process_extensions=False)
@ -1300,7 +1299,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
return result, mech_context
@utils.transaction_guard
@lib_db_api.retry_if_session_inactive()
@db_api.retry_if_session_inactive()
def create_port(self, context, port):
self._before_create_port(context, port)
result, mech_context = self._create_port_db(context, port)
@ -1329,7 +1328,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
return bound_context.current
@utils.transaction_guard
@lib_db_api.retry_if_session_inactive()
@db_api.retry_if_session_inactive()
def create_port_bulk(self, context, ports):
objects = self._create_bulk_ml2(port_def.RESOURCE_NAME, context, ports)
return [obj['result'] for obj in objects]
@ -1378,7 +1377,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
raise psec_exc.PortSecurityPortHasSecurityGroup()
@utils.transaction_guard
@lib_db_api.retry_if_session_inactive()
@db_api.retry_if_session_inactive()
def update_port(self, context, id, port):
attrs = port[port_def.RESOURCE_NAME]
need_port_update_notify = False
@ -1387,7 +1386,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
registry.notify(resources.PORT, events.BEFORE_UPDATE, self,
context=context, port=attrs,
original_port=original_port)
with db_api.context_manager.writer.using(context):
with db_api.CONTEXT_WRITER.using(context):
port_db = self._get_port(context, id)
binding = p_utils.get_port_binding_by_status_and_host(
port_db.port_bindings, const.ACTIVE)
@ -1540,7 +1539,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
binding.persist_state_to_session(plugin_context.session)
@utils.transaction_guard
@lib_db_api.retry_if_session_inactive()
@db_api.retry_if_session_inactive()
def update_distributed_port_binding(self, context, id, port):
attrs = port[port_def.RESOURCE_NAME]
@ -1560,7 +1559,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
router_id != device_id)
if update_required:
try:
with db_api.context_manager.writer.using(context):
with db_api.CONTEXT_WRITER.using(context):
orig_port = self.get_port(context, id)
if not binding:
binding = db.ensure_distributed_port_binding(
@ -1598,14 +1597,14 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
raise exc.ServicePortInUse(port_id=port_id, reason=e)
@utils.transaction_guard
@lib_db_api.retry_if_session_inactive()
@db_api.retry_if_session_inactive()
def delete_port(self, context, id, l3_port_check=True):
self._pre_delete_port(context, id, l3_port_check)
# TODO(armax): get rid of the l3 dependency in the with block
router_ids = []
l3plugin = directory.get_plugin(plugin_constants.L3)
with db_api.context_manager.writer.using(context):
with db_api.CONTEXT_WRITER.using(context):
try:
port_db = self._get_port(context, id)
binding = p_utils.get_port_binding_by_status_and_host(
@ -1684,13 +1683,13 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
self.notifier.port_delete(context, port['id'])
@utils.transaction_guard
@lib_db_api.retry_if_session_inactive(context_var_name='plugin_context')
@db_api.retry_if_session_inactive(context_var_name='plugin_context')
def get_bound_port_context(self, plugin_context, port_id, host=None,
cached_networks=None):
# NOTE(ihrachys) use writer manager to be able to update mtu when
# fetching network
# TODO(ihrachys) remove in Queens+ when mtu is not nullable
with db_api.context_manager.writer.using(plugin_context) as session:
with db_api.CONTEXT_WRITER.using(plugin_context) as session:
try:
port_db = (session.query(models_v2.Port).
enable_eagerloads(False).
@ -1741,13 +1740,13 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
return self._bind_port_if_needed(port_context)
@utils.transaction_guard
@lib_db_api.retry_if_session_inactive(context_var_name='plugin_context')
@db_api.retry_if_session_inactive(context_var_name='plugin_context')
def get_bound_ports_contexts(self, plugin_context, dev_ids, host=None):
result = {}
# NOTE(ihrachys) use writer manager to be able to update mtu when
# fetching network
# TODO(ihrachys) remove in Queens+ when mtu is not nullable
with db_api.context_manager.writer.using(plugin_context):
with db_api.CONTEXT_WRITER.using(plugin_context):
dev_to_full_pids = db.partial_port_ids_to_full_ids(
plugin_context, dev_ids)
# get all port objects for IDs
@ -1806,7 +1805,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
context, {port_id: status}, host)[port_id]
@utils.transaction_guard
@lib_db_api.retry_if_session_inactive()
@db_api.retry_if_session_inactive()
def update_port_statuses(self, context, port_id_to_status, host=None):
result = {}
port_ids = port_id_to_status.keys()
@ -1847,7 +1846,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
registry.notify(resources.PORT, events.BEFORE_UPDATE, self,
original_port=port,
context=context, port=attr)
with db_api.context_manager.writer.using(context):
with db_api.CONTEXT_WRITER.using(context):
context.session.add(port) # bring port into writer session
if (port.status != status and
port['device_owner'] != const.DEVICE_OWNER_DVR_INTERFACE):
@ -1878,7 +1877,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
if (updated and
port['device_owner'] == const.DEVICE_OWNER_DVR_INTERFACE):
with db_api.context_manager.writer.using(context):
with db_api.CONTEXT_WRITER.using(context):
port = db.get_port(context, port_id)
if not port:
LOG.warning("Port %s not found during update",
@ -1914,7 +1913,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
return port['id']
@lib_db_api.retry_if_session_inactive()
@db_api.retry_if_session_inactive()
def port_bound_to_host(self, context, port_id, host):
if not host:
return
@ -1934,7 +1933,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
port_host = db.get_port_binding_host(context, port_id)
return port if (port_host == host) else None
@lib_db_api.retry_if_session_inactive()
@db_api.retry_if_session_inactive()
def get_ports_from_devices(self, context, devices):
port_ids_to_devices = dict(
(self._device_to_port_id(context, device), device)
@ -2099,10 +2098,10 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
mech_context._plugin_context.session)
@utils.transaction_guard
@lib_db_api.retry_if_session_inactive()
@db_api.retry_if_session_inactive()
def create_port_binding(self, context, port_id, binding):
attrs = binding[pbe_ext.RESOURCE_NAME]
with db_api.context_manager.writer.using(context):
with db_api.CONTEXT_WRITER.using(context):
port_db = self._get_port(context, port_id)
self._validate_compute_port(port_db)
if self._get_binding_for_host(port_db.port_bindings,
@ -2139,13 +2138,13 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
bind_context._binding.port_id = port_id
bind_context._binding.status = status
if not is_active_binding:
with db_api.context_manager.writer.using(context):
with db_api.CONTEXT_WRITER.using(context):
bind_context._binding.persist_state_to_session(context.session)
db.set_binding_levels(context, bind_context._binding_levels)
return self._make_port_binding_dict(bind_context._binding)
@utils.transaction_guard
@lib_db_api.retry_if_session_inactive()
@db_api.retry_if_session_inactive()
def get_port_bindings(self, context, port_id, filters=None, fields=None,
sorts=None, limit=None, marker=None,
page_reverse=False):
@ -2162,7 +2161,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
for binding in bindings]
@utils.transaction_guard
@lib_db_api.retry_if_session_inactive()
@db_api.retry_if_session_inactive()
def get_port_binding(self, context, host, port_id, fields=None):
port = ports_obj.Port.get_object(context, id=port_id)
if not port:
@ -2180,10 +2179,10 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
return binding
@utils.transaction_guard
@lib_db_api.retry_if_session_inactive()
@db_api.retry_if_session_inactive()
def update_port_binding(self, context, host, port_id, binding):
attrs = binding[pbe_ext.RESOURCE_NAME]
with db_api.context_manager.writer.using(context):
with db_api.CONTEXT_WRITER.using(context):
port_db = self._get_port(context, port_id)
self._validate_compute_port(port_db)
original_binding = self._get_binding_for_host(
@ -2208,15 +2207,15 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
portbindings.VIF_TYPE_BINDING_FAILED):
raise n_exc.PortBindingError(port_id=port_id, host=host)
if not is_active_binding:
with db_api.context_manager.writer.using(context):
with db_api.CONTEXT_WRITER.using(context):
bind_context._binding.persist_state_to_session(context.session)
db.set_binding_levels(context, bind_context._binding_levels)
return self._make_port_binding_dict(bind_context._binding)
@utils.transaction_guard
@lib_db_api.retry_if_session_inactive()
@db_api.retry_if_session_inactive()
def activate(self, context, host, port_id):
with db_api.context_manager.writer.using(context):
with db_api.CONTEXT_WRITER.using(context):
# TODO(mlavalle) Next two lines can be removed when bug #1770267 is
# fixed
if isinstance(port_id, dict):
@ -2266,7 +2265,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
raise n_exc.PortBindingError(port_id=port_id, host=host)
@utils.transaction_guard
@lib_db_api.retry_if_session_inactive()
@db_api.retry_if_session_inactive()
def delete_port_binding(self, context, host, port_id):
ports_obj.PortBinding.delete_objects(context, host=host,
port_id=port_id)
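
The retry decorators move over with the same import change: retry_db_errors and retry_if_session_inactive now come from neutron_lib.db.api. A condensed sketch of how they pair with the writer context in this plugin (hypothetical helper; in the real methods a @utils.transaction_guard sits on top as well):

    from neutron_lib.db import api as db_api

    from neutron.db import models_v2

    @db_api.retry_if_session_inactive()
    def rename_port(context, port_id, new_name):
        # Retried as a whole on retriable DB errors such as deadlocks,
        # and skipped when the caller already holds an active session;
        # the write itself runs under the writer context manager.
        with db_api.CONTEXT_WRITER.using(context):
            port = context.session.query(models_v2.Port).get(port_id)
            port.name = new_name
            return port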

View File

@ -12,7 +12,7 @@
# License for the specific language governing permissions and limitations
# under the License.
from neutron_lib.db import api as lib_db_api
from neutron_lib.db import api as db_api
from neutron_lib.plugins import constants
from neutron_lib.plugins import directory
from oslo_config import cfg
@ -22,7 +22,6 @@ from sqlalchemy import exc as sql_exc
from sqlalchemy.orm import session as se
from neutron._i18n import _
from neutron.db import api as db_api
from neutron.db.quota import api as quota_api
LOG = log.getLogger(__name__)
@ -193,7 +192,7 @@ class TrackedResource(BaseResource):
def mark_dirty(self, context):
if not self._dirty_tenants:
return
with db_api.context_manager.writer.using(context):
with db_api.CONTEXT_WRITER.using(context):
# It is not necessary to protect this operation with a lock.
# Indeed when this method is called the request has been processed
# and therefore all resources created or deleted.
@ -220,7 +219,7 @@ class TrackedResource(BaseResource):
# can happen is two or more workers are trying to create a resource of a
# give kind for the same tenant concurrently. Retrying the operation will
# ensure that an UPDATE statement is emitted rather than an INSERT one
@lib_db_api.retry_if_session_inactive()
@db_api.retry_if_session_inactive()
def _set_quota_usage(self, context, tenant_id, in_use):
return quota_api.set_quota_usage(
context, self.name, tenant_id, in_use=in_use)
@ -322,19 +321,19 @@ class TrackedResource(BaseResource):
self._model_class)
def register_events(self):
listen = lib_db_api.sqla_listen
listen = db_api.sqla_listen
listen(self._model_class, 'after_insert', self._db_event_handler)
listen(self._model_class, 'after_delete', self._db_event_handler)
listen(se.Session, 'after_bulk_delete', self._except_bulk_delete)
def unregister_events(self):
try:
lib_db_api.sqla_remove(self._model_class, 'after_insert',
self._db_event_handler)
lib_db_api.sqla_remove(self._model_class, 'after_delete',
self._db_event_handler)
lib_db_api.sqla_remove(se.Session, 'after_bulk_delete',
self._except_bulk_delete)
db_api.sqla_remove(self._model_class, 'after_insert',
self._db_event_handler)
db_api.sqla_remove(self._model_class, 'after_delete',
self._db_event_handler)
db_api.sqla_remove(se.Session, 'after_bulk_delete',
self._except_bulk_delete)
except sql_exc.InvalidRequestError:
LOG.warning("No sqlalchemy event for resource %s found",
self.name)
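
The SQLAlchemy event helpers follow the same move: sqla_listen and sqla_remove are consumed from neutron_lib.db.api, so listeners registered by the quota tracker can still be torn down later. A small sketch of the register/unregister pair (hypothetical wrapper functions; handler is any callable with the usual SQLAlchemy event signature):

    from neutron_lib.db import api as db_api

    def register_insert_handler(model_class, handler):
        # Wraps sqlalchemy.event.listen so the listener can be removed
        # again with sqla_remove.
        db_api.sqla_listen(model_class, 'after_insert', handler)

    def unregister_insert_handler(model_class, handler):
        db_api.sqla_remove(model_class, 'after_insert', handler)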

View File

@ -10,12 +10,12 @@
# License for the specific language governing permissions and limitations
# under the License.
from neutron_lib.db import api as db_api
from oslo_config import cfg
from oslo_log import log
import six
from neutron._i18n import _
from neutron.db import api as db_api
from neutron.quota import resource
LOG = log.getLogger(__name__)
@ -72,7 +72,7 @@ def set_resources_dirty(context):
return
for res in get_all_resources().values():
with db_api.context_manager.writer.using(context):
with db_api.CONTEXT_WRITER.using(context):
if is_tracked(res.name) and res.dirty:
res.mark_dirty(context)

View File

@ -14,10 +14,10 @@
# under the License.
from neutron_lib import constants as const
from neutron_lib.db import api as db_api
from oslo_log import log as logging
from sqlalchemy.orm import exc as orm_exc
from neutron.db import api as db_api
from neutron.db.models import securitygroup as sg_db
from neutron.objects.logapi import logging_resource as log_object
from neutron.objects import ports as port_objects
@ -31,7 +31,7 @@ LOG = logging.getLogger(__name__)
def _get_ports_attached_to_sg(context, sg_id):
"""Return a list of ports attached to a security group"""
with db_api.context_manager.reader.using(context):
with db_api.CONTEXT_READER.using(context):
ports = context.session.query(
sg_db.SecurityGroupPortBinding.port_id).filter(
sg_db.SecurityGroupPortBinding.security_group_id ==
@ -44,7 +44,7 @@ def _get_ports_filter_in_tenant(context, tenant_id):
try:
sg_id = sg_db.SecurityGroupPortBinding.security_group_id
with db_api.context_manager.reader.using(context):
with db_api.CONTEXT_READER.using(context):
ports = context.session.query(
sg_db.SecurityGroupPortBinding.port_id).join(
sg_db.SecurityGroup, sg_db.SecurityGroup.id == sg_id).filter(
@ -57,7 +57,7 @@ def _get_ports_filter_in_tenant(context, tenant_id):
def _get_sgs_attached_to_port(context, port_id):
"""Return a list of security groups are associated to a port"""
with db_api.context_manager.reader.using(context):
with db_api.CONTEXT_READER.using(context):
sg_ids = context.session.query(
sg_db.SecurityGroupPortBinding.security_group_id).filter(
sg_db.SecurityGroupPortBinding.port_id == port_id).all()

View File

@ -13,7 +13,8 @@
# License for the specific language governing permissions and limitations
# under the License.
from neutron.db import api as db_api
from neutron_lib.db import api as db_api
from neutron.db import db_base_plugin_common
from neutron.extensions import logging as log_ext
from neutron.objects import base as base_obj
@ -69,7 +70,7 @@ class LoggingPlugin(log_ext.LoggingPluginBase):
"""Create a log object"""
log_data = log['log']
self.validator_mgr.validate_request(context, log_data)
with db_api.context_manager.writer.using(context):
with db_api.CONTEXT_WRITER.using(context):
# body 'log' contains both tenant_id and project_id
# but only the latter is needed to create the Log object.
# We need to remove the redundant keyword.
@ -88,7 +89,7 @@ class LoggingPlugin(log_ext.LoggingPluginBase):
def update_log(self, context, log_id, log):
"""Update information for the specified log object"""
log_data = log['log']
with db_api.context_manager.writer.using(context):
with db_api.CONTEXT_WRITER.using(context):
log_obj = log_object.Log(context, id=log_id)
log_obj.update_fields(log_data, reset_changes=True)
log_obj.update()
@ -103,7 +104,7 @@ class LoggingPlugin(log_ext.LoggingPluginBase):
def delete_log(self, context, log_id):
"""Delete the specified log object"""
with db_api.context_manager.writer.using(context):
with db_api.CONTEXT_WRITER.using(context):
log_obj = self._get_log(context, log_id)
log_obj.delete()
self.driver_manager.call(

View File

@ -23,7 +23,7 @@ from neutron_lib.callbacks import events
from neutron_lib.callbacks import registry
from neutron_lib.callbacks import resources
from neutron_lib import constants as lib_consts
from neutron_lib.db import api as lib_db_api
from neutron_lib.db import api as db_api
from neutron_lib.db import utils as db_utils
from neutron_lib import exceptions as lib_exc
from neutron_lib.exceptions import l3 as lib_l3_exc
@ -37,7 +37,6 @@ from neutron.api.rpc.callbacks import events as rpc_events
from neutron.api.rpc.handlers import resources_rpc
from neutron.common import utils
from neutron.db import _resource_extend as resource_extend
from neutron.db import api as db_api
from neutron.db import db_base_plugin_common
from neutron.extensions import floating_ip_port_forwarding as fip_pf
from neutron.objects import base as base_obj
@ -137,7 +136,7 @@ class PortForwardingPlugin(fip_pf.PortForwardingPluginBase):
@registry.receives(resources.PORT, [events.AFTER_UPDATE,
events.PRECOMMIT_DELETE])
@lib_db_api.retry_if_session_inactive()
@db_api.retry_if_session_inactive()
def _process_port_request(self, resource, event, trigger, context,
**kwargs):
# Deleting floatingip will receive port resource with precommit_delete
@ -198,7 +197,7 @@ class PortForwardingPlugin(fip_pf.PortForwardingPluginBase):
# here, if event is AFTER_UPDATE, and remove_ip_set is empty, the
# following block won't be processed.
remove_port_forwarding_list = []
with db_api.context_manager.writer.using(context):
with db_api.CONTEXT_WRITER.using(context):
for pf_resource in pf_resources:
if str(pf_resource.internal_ip_address) in remove_ip_set:
pf_objs = pf.PortForwarding.get_objects(
@ -284,7 +283,7 @@ class PortForwardingPlugin(fip_pf.PortForwardingPluginBase):
port_forwarding = port_forwarding.get(apidef.RESOURCE_NAME)
port_forwarding['floatingip_id'] = floatingip_id
with db_api.context_manager.writer.using(context):
with db_api.CONTEXT_WRITER.using(context):
fip_obj = self._get_fip_obj(context, floatingip_id)
if fip_obj.fixed_port_id:
raise lib_l3_exc.FloatingIPPortAlreadyAssociated(
@ -331,7 +330,7 @@ class PortForwardingPlugin(fip_pf.PortForwardingPluginBase):
if port_forwarding and port_forwarding.get('internal_port_id'):
new_internal_port_id = port_forwarding.get('internal_port_id')
try:
with db_api.context_manager.writer.using(context):
with db_api.CONTEXT_WRITER.using(context):
fip_obj = self._get_fip_obj(context, floatingip_id)
pf_obj = pf.PortForwarding.get_object(context, id=id)
if not pf_obj:
@ -455,7 +454,7 @@ class PortForwardingPlugin(fip_pf.PortForwardingPluginBase):
if not pf_obj or pf_obj.floatingip_id != floatingip_id:
raise pf_exc.PortForwardingNotFound(id=id)
with db_api.context_manager.writer.using(context):
with db_api.CONTEXT_WRITER.using(context):
fip_obj = self._get_fip_obj(context, pf_obj.floatingip_id)
pf_objs = pf.PortForwarding.get_objects(
context, floatingip_id=pf_obj.floatingip_id)

View File

@ -18,12 +18,11 @@ from neutron_lib.api.definitions import qos_bw_minimum_ingress
from neutron_lib.callbacks import events as callbacks_events
from neutron_lib.callbacks import registry as callbacks_registry
from neutron_lib.callbacks import resources as callbacks_resources
from neutron_lib.db import api as lib_db_api
from neutron_lib.db import api as db_api
from neutron_lib import exceptions as lib_exc
from neutron_lib.services.qos import constants as qos_consts
from neutron.common import exceptions as n_exc
from neutron.db import api as db_api
from neutron.db import db_base_plugin_common
from neutron.extensions import qos
from neutron.objects import base as base_obj
@ -172,7 +171,7 @@ class QoSPlugin(qos.QoSPluginBase):
# needs to be backward compatible.
policy['policy'].pop('tenant_id', None)
policy_obj = policy_object.QosPolicy(context, **policy['policy'])
with db_api.context_manager.writer.using(context):
with db_api.CONTEXT_WRITER.using(context):
policy_obj.create()
self.driver_manager.call(qos_consts.CREATE_POLICY_PRECOMMIT,
context, policy_obj)
@ -195,7 +194,7 @@ class QoSPlugin(qos.QoSPluginBase):
:returns: a QosPolicy object
"""
policy_data = policy['policy']
with db_api.context_manager.writer.using(context):
with db_api.CONTEXT_WRITER.using(context):
policy_obj = policy_object.QosPolicy.get_policy_obj(
context, policy_id)
policy_obj.update_fields(policy_data, reset_changes=True)
@ -218,7 +217,7 @@ class QoSPlugin(qos.QoSPluginBase):
:returns: None
"""
with db_api.context_manager.writer.using(context):
with db_api.CONTEXT_WRITER.using(context):
policy = policy_object.QosPolicy(context)
policy.id = policy_id
policy.delete()
@ -301,7 +300,7 @@ class QoSPlugin(qos.QoSPluginBase):
rule_type = rule_cls.rule_type
rule_data = rule_data[rule_type + '_rule']
with lib_db_api.autonested_transaction(context.session):
with db_api.autonested_transaction(context.session):
# Ensure that we have access to the policy.
policy = policy_object.QosPolicy.get_policy_obj(context, policy_id)
checker.check_bandwidth_rule_conflict(policy, rule_data)
@ -338,7 +337,7 @@ class QoSPlugin(qos.QoSPluginBase):
rule_type = rule_cls.rule_type
rule_data = rule_data[rule_type + '_rule']
with lib_db_api.autonested_transaction(context.session):
with db_api.autonested_transaction(context.session):
# Ensure we have access to the policy.
policy = policy_object.QosPolicy.get_policy_obj(context, policy_id)
# Ensure the rule belongs to the policy.
@ -371,7 +370,7 @@ class QoSPlugin(qos.QoSPluginBase):
:returns: None
"""
with lib_db_api.autonested_transaction(context.session):
with db_api.autonested_transaction(context.session):
# Ensure we have access to the policy.
policy = policy_object.QosPolicy.get_policy_obj(context, policy_id)
rule = policy.get_rule_by_id(rule_id)
@ -400,7 +399,7 @@ class QoSPlugin(qos.QoSPluginBase):
:returns: a QoS policy rule object
:raises: n_exc.QosRuleNotFound
"""
with lib_db_api.autonested_transaction(context.session):
with db_api.autonested_transaction(context.session):
# Ensure we have access to the policy.
policy_object.QosPolicy.get_policy_obj(context, policy_id)
rule = rule_cls.get_object(context, id=rule_id)
@ -425,7 +424,7 @@ class QoSPlugin(qos.QoSPluginBase):
:returns: QoS policy rule objects meeting the search criteria
"""
with lib_db_api.autonested_transaction(context.session):
with db_api.autonested_transaction(context.session):
# Ensure we have access to the policy.
policy_object.QosPolicy.get_policy_obj(context, policy_id)
filters = filters or dict()
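
autonested_transaction is likewise taken from neutron_lib now; it nests inside an existing transaction on the given session when one is active and opens a new one otherwise. A minimal sketch (illustrative helper around the policy object API used above):

    from neutron_lib.db import api as db_api

    def get_rule_for_policy(context, policy_obj, rule_id):
        # Nested if the caller already holds a transaction on this
        # session, top-level otherwise.
        with db_api.autonested_transaction(context.session):
            return policy_obj.get_rule_by_id(rule_id)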

View File

@ -18,7 +18,7 @@ from neutron_lib.callbacks import events
from neutron_lib.callbacks import registry
from neutron_lib.callbacks import resources
from neutron_lib import constants
from neutron_lib.db import api as lib_db_api
from neutron_lib.db import api as db_api
from neutron_lib.db import utils as db_utils
from neutron_lib import exceptions as n_exc
from neutron_lib.plugins import directory
@ -28,7 +28,6 @@ from oslo_log import helpers as log_helpers
from oslo_utils import uuidutils
from neutron.db import _resource_extend as resource_extend
from neutron.db import api as db_api
from neutron.db import common_db_mixin
from neutron.db import segments_db as db
from neutron.extensions import segment as extension
@ -75,7 +74,7 @@ class SegmentDbMixin(common_db_mixin.CommonDbMixin):
return self._make_segment_dict(new_segment)
def _create_segment_db(self, context, segment_id, segment):
with db_api.context_manager.writer.using(context):
with db_api.CONTEXT_WRITER.using(context):
network_id = segment['network_id']
physical_network = segment[extension.PHYSICAL_NETWORK]
if physical_network == constants.ATTR_NOT_SPECIFIED:
@ -127,7 +126,7 @@ class SegmentDbMixin(common_db_mixin.CommonDbMixin):
def update_segment(self, context, uuid, segment):
"""Update an existing segment."""
segment = segment['segment']
with db_api.context_manager.writer.using(context):
with db_api.CONTEXT_WRITER.using(context):
curr_segment = self._get_segment(context, uuid)
curr_segment.update_fields(segment)
curr_segment.update()
@ -171,7 +170,7 @@ class SegmentDbMixin(common_db_mixin.CommonDbMixin):
segment=segment_dict, for_net_delete=for_net_delete)
# Delete segment in DB
with db_api.context_manager.writer.using(context):
with db_api.CONTEXT_WRITER.using(context):
if not network.NetworkSegment.delete_objects(context, id=uuid):
raise exceptions.SegmentNotFound(segment_id=uuid)
# Do some preliminary operations before deleting segment in db
@ -184,10 +183,10 @@ class SegmentDbMixin(common_db_mixin.CommonDbMixin):
segment=segment_dict)
@lib_db_api.retry_if_session_inactive()
@db_api.retry_if_session_inactive()
@lockutils.synchronized('update_segment_host_mapping')
def update_segment_host_mapping(context, host, current_segment_ids):
with db_api.context_manager.writer.using(context):
with db_api.CONTEXT_WRITER.using(context):
segment_host_mapping = network.SegmentHostMapping.get_objects(
context, host=host)
previous_segment_ids = {
@ -238,14 +237,14 @@ def get_segments_with_phys_nets(context, phys_nets):
if not phys_nets:
return []
with db_api.context_manager.reader.using(context):
with db_api.CONTEXT_READER.using(context):
return network.NetworkSegment.get_objects(
context, physical_network=phys_nets)
def map_segment_to_hosts(context, segment_id, hosts):
"""Map segment to a collection of hosts."""
with db_api.context_manager.writer.using(context):
with db_api.CONTEXT_WRITER.using(context):
for host in hosts:
network.SegmentHostMapping(
context, segment_id=segment_id, host=host).create()

View File

@ -12,7 +12,7 @@
# under the License.
#
from neutron_lib.db import api as lib_db_api
from neutron_lib.db import api as db_api
from neutron_lib.objects import exceptions as obj_exc
from neutron_lib.plugins import directory
from oslo_log import helpers as log_helpers
@ -20,7 +20,6 @@ from sqlalchemy.orm import exc
from neutron.db import _model_query as model_query
from neutron.db import _resource_extend as resource_extend
from neutron.db import api as db_api
from neutron.db import common_db_mixin
from neutron.db import standard_attr
from neutron.extensions import tagging
@ -73,9 +72,9 @@ class TagPlugin(common_db_mixin.CommonDbMixin, tagging.TagPluginBase):
raise tagging.TagNotFound(tag=tag)
@log_helpers.log_method_call
@lib_db_api.retry_if_session_inactive()
@db_api.retry_if_session_inactive()
def update_tags(self, context, resource, resource_id, body):
with db_api.context_manager.writer.using(context):
with db_api.CONTEXT_WRITER.using(context):
# We get and do all operations with objects in one session
res = self._get_resource(context, resource, resource_id)
new_tags = set(body['tags'])

View File

@ -25,7 +25,7 @@ from neutron_lib.callbacks import exceptions
from neutron_lib.callbacks import registry
from neutron_lib import constants
from neutron_lib import context
from neutron_lib.db import api as lib_db_api
from neutron_lib.db import api as db_api
from neutron_lib import exceptions as lib_exc
from neutron_lib import fixture
from neutron_lib.plugins import directory
@ -49,7 +49,6 @@ from neutron.common import exceptions as n_exc
from neutron.common import ipv6_utils
from neutron.common import test_lib
from neutron.common import utils
from neutron.db import api as db_api
from neutron.db import db_base_plugin_common
from neutron.db import ipam_backend_mixin
from neutron.db.models import l3 as l3_models
@ -2703,7 +2702,7 @@ class TestNetworksV2(NeutronDbPluginV2TestCase):
def test_update_network_set_not_shared_other_tenant_access_via_rbac(self):
with self.network(shared=True) as network:
ctx = context.get_admin_context()
with db_api.context_manager.writer.using(ctx):
with db_api.CONTEXT_WRITER.using(ctx):
ctx.session.add(
rbac_db_models.NetworkRBAC(
object_id=network['network']['id'],
@ -4617,7 +4616,7 @@ class TestSubnetsV2(NeutronDbPluginV2TestCase):
# this protection only applies to router ports so we need
# to make this port belong to a router
ctx = context.get_admin_context()
with db_api.context_manager.writer.using(ctx):
with db_api.CONTEXT_WRITER.using(ctx):
router = l3_models.Router()
ctx.session.add(router)
rp = l3_obj.RouterPort(ctx, router_id=router.id,
@ -4631,7 +4630,7 @@ class TestSubnetsV2(NeutronDbPluginV2TestCase):
self.assertEqual(409, res.status_int)
# should work fine if it's not a router port
rp.delete()
with db_api.context_manager.writer.using(ctx):
with db_api.CONTEXT_WRITER.using(ctx):
ctx.session.delete(router)
res = req.get_response(self.api)
self.assertEqual(res.status_int, 200)
@ -6312,11 +6311,11 @@ class DbModelMixin(object):
def test_make_network_dict_outside_engine_facade_manager(self):
mock.patch.object(directory, 'get_plugin').start()
ctx = context.get_admin_context()
with db_api.context_manager.writer.using(ctx):
with db_api.CONTEXT_WRITER.using(ctx):
network = models_v2.Network(name="net_net", status="OK",
admin_state_up=True)
ctx.session.add(network)
with lib_db_api.autonested_transaction(ctx.session):
with db_api.autonested_transaction(ctx.session):
sg = sg_models.SecurityGroup(name='sg', description='sg')
ctx.session.add(sg)
# ensure db rels aren't loaded until commit for network object
@ -6352,7 +6351,7 @@ class DbModelMixin(object):
self.assertEqual(final_exp, actual_repr_output)
def _make_security_group_and_rule(self, ctx):
with db_api.context_manager.writer.using(ctx):
with db_api.CONTEXT_WRITER.using(ctx):
sg = sg_models.SecurityGroup(name='sg', description='sg')
rule = sg_models.SecurityGroupRule(
security_group=sg, port_range_min=1,
@ -6372,7 +6371,7 @@ class DbModelMixin(object):
return flip
def _make_router(self, ctx):
with db_api.context_manager.writer.using(ctx):
with db_api.CONTEXT_WRITER.using(ctx):
router = l3_models.Router()
ctx.session.add(router)
return router
@ -6455,7 +6454,7 @@ class DbModelMixin(object):
def _lock_blocked_name_update():
ctx = context.get_admin_context()
with db_api.context_manager.writer.using(ctx):
with db_api.CONTEXT_WRITER.using(ctx):
thing = ctx.session.query(model).filter_by(id=dbid).one()
thing.bump_revision()
thing.name = 'newname'
@ -6470,7 +6469,7 @@ class DbModelMixin(object):
while not self._blocked_on_lock:
eventlet.sleep(0)
ctx = context.get_admin_context()
with db_api.context_manager.writer.using(ctx):
with db_api.CONTEXT_WRITER.using(ctx):
thing = ctx.session.query(model).filter_by(id=dbid).one()
thing.bump_revision()
thing.description = 'a description'
@ -6545,7 +6544,7 @@ class DbModelMixin(object):
class DbModelTenantTestCase(DbModelMixin, testlib_api.SqlTestCase):
def _make_network(self, ctx):
with db_api.context_manager.writer.using(ctx):
with db_api.CONTEXT_WRITER.using(ctx):
network = models_v2.Network(name="net_net", status="OK",
tenant_id='dbcheck',
admin_state_up=True)
@ -6553,7 +6552,7 @@ class DbModelTenantTestCase(DbModelMixin, testlib_api.SqlTestCase):
return network
def _make_subnet(self, ctx, network_id):
with db_api.context_manager.writer.using(ctx):
with db_api.CONTEXT_WRITER.using(ctx):
subnet = models_v2.Subnet(name="subsub",
ip_version=constants.IP_VERSION_4,
tenant_id='dbcheck',
@ -6572,7 +6571,7 @@ class DbModelTenantTestCase(DbModelMixin, testlib_api.SqlTestCase):
return port
def _make_subnetpool(self, ctx):
with db_api.context_manager.writer.using(ctx):
with db_api.CONTEXT_WRITER.using(ctx):
subnetpool = models_v2.SubnetPool(
ip_version=constants.IP_VERSION_4, default_prefixlen=4,
min_prefixlen=4, max_prefixlen=4, shared=False,
@ -6585,7 +6584,7 @@ class DbModelTenantTestCase(DbModelMixin, testlib_api.SqlTestCase):
class DbModelProjectTestCase(DbModelMixin, testlib_api.SqlTestCase):
def _make_network(self, ctx):
with db_api.context_manager.writer.using(ctx):
with db_api.CONTEXT_WRITER.using(ctx):
network = models_v2.Network(name="net_net", status="OK",
project_id='dbcheck',
admin_state_up=True)
@ -6593,7 +6592,7 @@ class DbModelProjectTestCase(DbModelMixin, testlib_api.SqlTestCase):
return network
def _make_subnet(self, ctx, network_id):
with db_api.context_manager.writer.using(ctx):
with db_api.CONTEXT_WRITER.using(ctx):
subnet = models_v2.Subnet(name="subsub",
ip_version=constants.IP_VERSION_4,
project_id='dbcheck',
@ -6612,7 +6611,7 @@ class DbModelProjectTestCase(DbModelMixin, testlib_api.SqlTestCase):
return port
def _make_subnetpool(self, ctx):
with db_api.context_manager.writer.using(ctx):
with db_api.CONTEXT_WRITER.using(ctx):
subnetpool = models_v2.SubnetPool(
ip_version=constants.IP_VERSION_4, default_prefixlen=4,
min_prefixlen=4, max_prefixlen=4, shared=False,
@@ -6756,8 +6755,8 @@ class DbOperationBoundMixin(object):
def _event_incrementer(conn, clauseelement, *args, **kwargs):
self._recorded_statements.append(str(clauseelement))
engine = db_api.context_manager.writer.get_engine()
lib_db_api.sqla_listen(engine, 'after_execute', _event_incrementer)
engine = db_api.CONTEXT_WRITER.get_engine()
db_api.sqla_listen(engine, 'after_execute', _event_incrementer)
def _get_context(self):
if self.admin:
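(Illustrative sketch, not part of the patch; it assumes a configured Neutron database and a mapped model class. It shows the two neutron-lib helpers this file now relies on: the writer transaction context and sqla_listen() for attaching a SQLAlchemy event to the writer engine.)

from neutron_lib import context as n_context
from neutron_lib.db import api as db_api


def update_name_with_statement_recording(model, dbid, new_name, recorded):
    # Attach a listener to the writer engine; neutron-lib tracks it so
    # tests can remove it again on cleanup.
    engine = db_api.CONTEXT_WRITER.get_engine()

    def _recorder(conn, clauseelement, *args, **kwargs):
        recorded.append(str(clauseelement))

    db_api.sqla_listen(engine, 'after_execute', _recorder)

    # Everything below runs in a single writer transaction bound to ctx.
    ctx = n_context.get_admin_context()
    with db_api.CONTEXT_WRITER.using(ctx):
        row = ctx.session.query(model).filter_by(id=dbid).one()
        row.name = new_name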


@@ -18,6 +18,7 @@ import copy
import fixtures
import mock
from neutron_lib import context
from neutron_lib.db import api as db_api
from neutron_lib.db import constants as db_const
from neutron_lib.exceptions import flavors as flav_exc
from neutron_lib.plugins import constants
@@ -25,7 +26,6 @@ from oslo_config import cfg
from oslo_utils import uuidutils
from webob import exc
from neutron.db import api as dbapi
from neutron.db.models import l3 as l3_models
from neutron.db import servicetype_db
from neutron.extensions import flavors
@@ -460,7 +460,7 @@ class FlavorPluginTestCase(test_db_base_plugin_v2.NeutronDbPluginV2TestCase,
self.service_manager.add_provider_configuration(
provider.split(':')[0], provconf.ProviderConfiguration())
dbapi.context_manager.writer.get_engine()
db_api.CONTEXT_WRITER.get_engine()
def _create_flavor(self, description=None):
flavor = {'flavor': {'name': 'GOLD',


@@ -18,12 +18,12 @@ import copy
from neutron_lib.api.definitions import port_security as psec
from neutron_lib.api import validators
from neutron_lib import context
from neutron_lib.db import api as db_api
from neutron_lib.db import utils as db_utils
from neutron_lib.exceptions import port_security as psec_exc
from neutron_lib.plugins import directory
from webob import exc
from neutron.db import api as db_api
from neutron.db import db_base_plugin_v2
from neutron.db import portsecurity_db
from neutron.db import securitygroups_db
@@ -68,7 +68,7 @@ class PortSecurityTestPlugin(db_base_plugin_v2.NeutronDbPluginV2,
def create_network(self, context, network):
tenant_id = network['network'].get('tenant_id')
self._ensure_default_security_group(context, tenant_id)
with db_api.context_manager.writer.using(context):
with db_api.CONTEXT_WRITER.using(context):
neutron_db = super(PortSecurityTestPlugin, self).create_network(
context, network)
neutron_db.update(network['network'])
@@ -77,7 +77,7 @@ class PortSecurityTestPlugin(db_base_plugin_v2.NeutronDbPluginV2,
return neutron_db
def update_network(self, context, id, network):
with db_api.context_manager.writer.using(context):
with db_api.CONTEXT_WRITER.using(context):
neutron_db = super(PortSecurityTestPlugin, self).update_network(
context, id, network)
if psec.PORTSECURITY in network['network']:
@@ -86,7 +86,7 @@ class PortSecurityTestPlugin(db_base_plugin_v2.NeutronDbPluginV2,
return neutron_db
def get_network(self, context, id, fields=None):
with db_api.context_manager.reader.using(context):
with db_api.CONTEXT_READER.using(context):
net = super(PortSecurityTestPlugin, self).get_network(
context, id)
return db_utils.resource_fields(net, fields)
@@ -122,7 +122,7 @@ class PortSecurityTestPlugin(db_base_plugin_v2.NeutronDbPluginV2,
delete_security_groups = self._check_update_deletes_security_groups(
port)
has_security_groups = self._check_update_has_security_groups(port)
with db_api.context_manager.writer.using(context):
with db_api.CONTEXT_WRITER.using(context):
ret_port = super(PortSecurityTestPlugin, self).update_port(
context, id, port)
# copy values over - but not fixed_ips
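(Illustrative sketch only: the plugin methods above wrap real DB plugin calls, while the toy in-memory store here stands in for them. It shows the pattern being migrated to: CONTEXT_WRITER.using() around mutations, CONTEXT_READER.using() around lookups, and db_utils.resource_fields() trimming the returned dict.)

from neutron_lib.db import api as db_api
from neutron_lib.db import utils as db_utils


class ExampleThingPlugin(object):
    """Toy plugin; real plugins issue DB queries inside these blocks."""

    def __init__(self):
        self._things = {}

    def update_thing(self, context, thing_id, values):
        # Everything here shares one writer transaction bound to `context`.
        with db_api.CONTEXT_WRITER.using(context):
            self._things.setdefault(thing_id, {}).update(values)
            return dict(self._things[thing_id])

    def get_thing(self, context, thing_id, fields=None):
        # Reader context: lookups only, no writes on this session.
        with db_api.CONTEXT_READER.using(context):
            thing = dict(self._things.get(thing_id, {}))
        return db_utils.resource_fields(thing, fields)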


@@ -20,6 +20,7 @@ import mock
from neutron_lib.api import validators
from neutron_lib import constants as const
from neutron_lib import context
from neutron_lib.db import api as db_api
from neutron_lib.db import constants as db_const
from neutron_lib.plugins import directory
from oslo_config import cfg
@@ -28,7 +29,6 @@ import testtools
import webob.exc
from neutron.common import exceptions as n_exc
from neutron.db import api as db_api
from neutron.db import db_base_plugin_v2
from neutron.db import securitygroups_db
from neutron.extensions import securitygroup as ext_sg
@@ -205,7 +205,7 @@ class SecurityGroupTestPlugin(db_base_plugin_v2.NeutronDbPluginV2,
default_sg = self._ensure_default_security_group(context, tenant_id)
if not validators.is_attr_set(port['port'].get(ext_sg.SECURITYGROUPS)):
port['port'][ext_sg.SECURITYGROUPS] = [default_sg]
with db_api.context_manager.writer.using(context):
with db_api.CONTEXT_WRITER.using(context):
sgids = self._get_security_groups_on_port(context, port)
port = super(SecurityGroupTestPlugin, self).create_port(context,
port)
@@ -214,7 +214,7 @@ class SecurityGroupTestPlugin(db_base_plugin_v2.NeutronDbPluginV2,
return port
def update_port(self, context, id, port):
with db_api.context_manager.writer.using(context):
with db_api.CONTEXT_WRITER.using(context):
if ext_sg.SECURITYGROUPS in port['port']:
port['port'][ext_sg.SECURITYGROUPS] = (
self._get_security_groups_on_port(context, port))


@@ -17,13 +17,13 @@ import mock
import netaddr
from neutron_lib import constants
from neutron_lib import context
from neutron_lib.db import api as db_api
from neutron_lib.plugins import directory
from oslo_config import cfg
from oslo_db import exception as db_exc
from oslo_utils import uuidutils
from neutron.common import exceptions as n_exc
from neutron.db import api as db_api
from neutron.ipam import requests as ipam_req
from neutron.ipam import subnet_alloc
from neutron.tests.unit.db import test_db_base_plugin_v2
@@ -65,7 +65,7 @@ class TestSubnetAllocation(testlib_api.SqlTestCase):
sp = self._create_subnet_pool(self.plugin, self.ctx, 'test-sp',
prefix_list, 21, 4)
sp = self.plugin._get_subnetpool(self.ctx, sp['id'])
with db_api.context_manager.writer.using(self.ctx):
with db_api.CONTEXT_WRITER.using(self.ctx):
sa = subnet_alloc.SubnetAllocator(sp, self.ctx)
req = ipam_req.AnySubnetRequest(self._tenant_id,
uuidutils.generate_uuid(),
@@ -81,7 +81,7 @@ class TestSubnetAllocation(testlib_api.SqlTestCase):
sp = self._create_subnet_pool(self.plugin, self.ctx, 'test-sp',
['10.1.0.0/16', '192.168.1.0/24'],
21, 4)
with db_api.context_manager.writer.using(self.ctx):
with db_api.CONTEXT_WRITER.using(self.ctx):
sp = self.plugin._get_subnetpool(self.ctx, sp['id'])
sa = subnet_alloc.SubnetAllocator(sp, self.ctx)
req = ipam_req.SpecificSubnetRequest(self._tenant_id,
@@ -123,7 +123,7 @@ class TestSubnetAllocation(testlib_api.SqlTestCase):
['10.1.0.0/16', '192.168.1.0/24'],
21, 4)
sp = self.plugin._get_subnetpool(self.ctx, sp['id'])
with db_api.context_manager.writer.using(self.ctx):
with db_api.CONTEXT_WRITER.using(self.ctx):
sa = subnet_alloc.SubnetAllocator(sp, self.ctx)
req = ipam_req.AnySubnetRequest(self._tenant_id,
uuidutils.generate_uuid(),
@@ -138,7 +138,7 @@ class TestSubnetAllocation(testlib_api.SqlTestCase):
['10.1.0.0/16', '192.168.1.0/24'],
21, 4)
sp = self.plugin._get_subnetpool(self.ctx, sp['id'])
with db_api.context_manager.writer.using(self.ctx):
with db_api.CONTEXT_WRITER.using(self.ctx):
sa = subnet_alloc.SubnetAllocator(sp, self.ctx)
req = ipam_req.SpecificSubnetRequest(self._tenant_id,
uuidutils.generate_uuid(),
@@ -155,7 +155,7 @@ class TestSubnetAllocation(testlib_api.SqlTestCase):
['2210::/64'],
64, 6)
sp = self.plugin._get_subnetpool(self.ctx, sp['id'])
with db_api.context_manager.writer.using(self.ctx):
with db_api.CONTEXT_WRITER.using(self.ctx):
sa = subnet_alloc.SubnetAllocator(sp, self.ctx)
req = ipam_req.SpecificSubnetRequest(self._tenant_id,
uuidutils.generate_uuid(),


@@ -20,7 +20,7 @@ import mock
import netaddr
from neutron_lib import constants
from neutron_lib import context
from neutron_lib.db import api as lib_db_api
from neutron_lib.db import api as db_api
from neutron_lib import exceptions as n_exc
from neutron_lib.objects import exceptions as o_exc
from neutron_lib.objects import utils as obj_utils
@@ -34,7 +34,6 @@ from oslo_versionedobjects import fields as obj_fields
import testtools
from neutron.db import _model_query as model_query
from neutron.db import api as db_api
from neutron import objects
from neutron.objects import agent
from neutron.objects import base
@@ -1707,13 +1706,13 @@ class BaseDbObjectTestCase(_BaseObjectTestCase,
def test_get_objects_single_transaction(self):
with mock.patch(self._get_ro_txn_exit_func_name()) as mock_exit:
with lib_db_api.autonested_transaction(self.context.session):
with db_api.autonested_transaction(self.context.session):
self._test_class.get_objects(self.context)
self.assertEqual(1, mock_exit.call_count)
def test_get_objects_single_transaction_enginefacade(self):
with mock.patch(self._get_ro_txn_exit_func_name()) as mock_exit:
with db_api.context_manager.reader.using(self.context):
with db_api.CONTEXT_READER.using(self.context):
self._test_class.get_objects(self.context)
self.assertEqual(1, mock_exit.call_count)
@@ -1722,7 +1721,7 @@ class BaseDbObjectTestCase(_BaseObjectTestCase,
obj.create()
with mock.patch(self._get_ro_txn_exit_func_name()) as mock_exit:
with lib_db_api.autonested_transaction(self.context.session):
with db_api.autonested_transaction(self.context.session):
obj = self._test_class.get_object(self.context,
**obj._get_composite_keys())
self.assertEqual(1, mock_exit.call_count)
@@ -1732,7 +1731,7 @@ class BaseDbObjectTestCase(_BaseObjectTestCase,
obj.create()
with mock.patch(self._get_ro_txn_exit_func_name()) as mock_exit:
with db_api.context_manager.reader.using(self.context):
with db_api.CONTEXT_READER.using(self.context):
obj = self._test_class.get_object(self.context,
**obj._get_composite_keys())
self.assertEqual(1, mock_exit.call_count)
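(Illustrative sketch of the neutron-lib autonested_transaction() helper the tests above now call; the function name and object class are hypothetical. The helper opens a transaction on the given session, nesting it if one is already active.)

from neutron_lib import context as n_context
from neutron_lib.db import api as db_api


def get_objects_in_one_transaction(obj_class):
    ctx = n_context.get_admin_context()
    # Nested if ctx.session already has a transaction, otherwise a new one.
    with db_api.autonested_transaction(ctx.session):
        return obj_class.get_objects(ctx)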


@@ -16,13 +16,13 @@
import mock
from neutron_lib import constants as p_const
from neutron_lib import context
from neutron_lib.db import api as db_api
from neutron_lib import exceptions as exc
from neutron_lib.plugins.ml2 import api
from neutron_lib.plugins import utils as plugin_utils
from oslo_config import cfg
from testtools import matchers
from neutron.db import api as db_api
from neutron.objects.plugins.ml2 import vlanallocation as vlan_alloc_obj
from neutron.plugins.ml2.drivers import type_vlan
from neutron.tests.unit import testlib_api
@@ -65,7 +65,7 @@ class VlanTypeTest(testlib_api.SqlTestCase):
self.assertRaises(SystemExit,
self.driver._parse_network_vlan_ranges)
@db_api.context_manager.reader
@db_api.CONTEXT_READER
def _get_allocation(self, context, segment):
return vlan_alloc_obj.VlanAllocation.get_object(
context,
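(Illustrative sketch of CONTEXT_READER used as a decorator, mirroring the _get_allocation() change above; the function name and filter keywords below are assumptions, not part of the patch.)

from neutron_lib.db import api as db_api

from neutron.objects.plugins.ml2 import vlanallocation as vlan_alloc_obj


@db_api.CONTEXT_READER
def get_vlan_allocation(context, physical_network, vlan_id):
    # The decorator wraps the call in a reader transaction bound to the
    # `context` argument.
    return vlan_alloc_obj.VlanAllocation.get_object(
        context, physical_network=physical_network, vlan_id=vlan_id)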


@@ -20,12 +20,12 @@ import netaddr
from neutron_lib.api.definitions import portbindings
from neutron_lib import constants
from neutron_lib import context
from neutron_lib.db import api as db_api
from neutron_lib.plugins.ml2 import api
from oslo_utils import uuidutils
from sqlalchemy.orm import exc
from sqlalchemy.orm import query
from neutron.db import api as db_api
from neutron.db import db_base_plugin_v2
from neutron.db.models import l3 as l3_models
from neutron.db import segments_db
@@ -64,7 +64,7 @@ class Ml2DBTestCase(testlib_api.SqlTestCase):
def _setup_neutron_portbinding(self, port_id, vif_type, host,
status=constants.ACTIVE):
with db_api.context_manager.writer.using(self.ctx):
with db_api.CONTEXT_WRITER.using(self.ctx):
self.ctx.session.add(models.PortBinding(port_id=port_id,
vif_type=vif_type,
host=host,
@@ -296,7 +296,7 @@ class Ml2DvrDBTestCase(testlib_api.SqlTestCase):
self.setup_coreplugin(PLUGIN_NAME)
def _setup_neutron_network(self, network_id, port_ids):
with db_api.context_manager.writer.using(self.ctx):
with db_api.CONTEXT_WRITER.using(self.ctx):
network_obj.Network(self.ctx, id=network_id).create()
ports = []
for port_id in port_ids:
@@ -322,7 +322,7 @@ class Ml2DvrDBTestCase(testlib_api.SqlTestCase):
def _setup_distributed_binding(self, network_id,
port_id, router_id, host_id):
with db_api.context_manager.writer.using(self.ctx):
with db_api.CONTEXT_WRITER.using(self.ctx):
record = models.DistributedPortBinding(
port_id=port_id,
host=host_id,


@@ -30,7 +30,7 @@ from neutron_lib.callbacks import registry
from neutron_lib.callbacks import resources
from neutron_lib import constants
from neutron_lib import context
from neutron_lib.db import api as lib_db_api
from neutron_lib.db import api as db_api
from neutron_lib import exceptions as exc
from neutron_lib import fixture
from neutron_lib.plugins import constants as plugin_constants
@@ -46,7 +46,6 @@ import webob
from neutron._i18n import _
from neutron.common import utils
from neutron.db import agents_db
from neutron.db import api as db_api
from neutron.db import provisioning_blocks
from neutron.db import securitygroups_db as sg_db
from neutron.db import segments_db
@@ -1383,10 +1382,10 @@ class TestMl2PortsV2(test_plugin.TestPortsV2, Ml2PluginV2TestCase):
raise db_exc.DBDuplicateEntry()
listener = IPAllocationsGrenade()
engine = db_api.context_manager.writer.get_engine()
lib_db_api.sqla_listen(engine, 'before_cursor_execute',
listener.execute)
lib_db_api.sqla_listen(engine, 'commit', listener.commit)
engine = db_api.CONTEXT_WRITER.get_engine()
db_api.sqla_listen(engine, 'before_cursor_execute',
listener.execute)
db_api.sqla_listen(engine, 'commit', listener.commit)
func()
# make sure that the grenade went off during the commit
self.assertTrue(listener.except_raised)
@@ -1847,7 +1846,7 @@ class TestMl2DvrPortsV2(TestMl2PortsV2):
{'subnet_id': s['subnet']['id']})
# lie to turn the port into an SNAT interface
with db_api.context_manager.writer.using(self.context):
with db_api.CONTEXT_WRITER.using(self.context):
pager = base_obj.Pager(limit=1)
rp = l3_obj.RouterPort.get_objects(
self.context, _pager=pager, port_id=p['port_id'])
@@ -2933,14 +2932,14 @@ class TestTransactionGuard(Ml2PluginV2TestCase):
def test_delete_network_guard(self):
plugin = directory.get_plugin()
ctx = context.get_admin_context()
with db_api.context_manager.writer.using(ctx):
with db_api.CONTEXT_WRITER.using(ctx):
with testtools.ExpectedException(RuntimeError):
plugin.delete_network(ctx, 'id')
def test_delete_subnet_guard(self):
plugin = directory.get_plugin()
ctx = context.get_admin_context()
with db_api.context_manager.writer.using(ctx):
with db_api.CONTEXT_WRITER.using(ctx):
with testtools.ExpectedException(RuntimeError):
plugin.delete_subnet(ctx, 'id')
@@ -3078,7 +3077,7 @@ class TestML2Segments(Ml2PluginV2TestCase):
with self.port(device_owner=fake_owner_compute) as port:
# add writer here to make sure that the following operations are
# performed in the same session
with db_api.context_manager.writer.using(self.context):
with db_api.CONTEXT_WRITER.using(self.context):
binding = p_utils.get_port_binding_by_status_and_host(
plugin._get_port(self.context,
port['port']['id']).port_bindings,


@@ -19,6 +19,7 @@ import testresources
import testscenarios
import testtools
from neutron_lib.db import api as db_api
from neutron_lib.db import model_base
from oslo_config import cfg
from oslo_db import exception as oslodb_exception
@@ -26,7 +27,6 @@ from oslo_db.sqlalchemy import enginefacade
from oslo_db.sqlalchemy import provision
from oslo_db.sqlalchemy import session
from neutron.db import api as db_api
from neutron.db.migration import cli as migration
# Import all data models
from neutron.db.migration.models import head # noqa
@@ -101,19 +101,19 @@ class SqlFixture(fixtures.Fixture):
self.sessionmaker = session.get_maker(engine)
_restore_factory = db_api.context_manager._root_factory
_restore_factory = db_api.get_context_manager()._root_factory
self.enginefacade_factory = enginefacade._TestTransactionFactory(
self.engine, self.sessionmaker, from_factory=_restore_factory,
apply_global=False)
db_api.context_manager._root_factory = self.enginefacade_factory
db_api.get_context_manager()._root_factory = self.enginefacade_factory
engine = db_api.context_manager.writer.get_engine()
engine = db_api.CONTEXT_WRITER.get_engine()
self.addCleanup(
lambda: setattr(
db_api.context_manager,
db_api.get_context_manager(),
"_root_factory", _restore_factory))
self.useFixture(EnableSQLiteFKsFixture(engine))
@@ -163,7 +163,8 @@ class StaticSqlFixture(SqlFixture):
else:
cls._GLOBAL_RESOURCES = True
cls.schema_resource = provision.SchemaResource(
provision.DatabaseResource("sqlite", db_api.context_manager),
provision.DatabaseResource(
"sqlite", db_api.get_context_manager()),
cls._generate_schema, teardown=False)
dependency_resources = {}
for name, resource in cls.schema_resource.resources:
@@ -187,7 +188,7 @@ class StaticSqlFixtureNoSchema(SqlFixture):
else:
cls._GLOBAL_RESOURCES = True
cls.database_resource = provision.DatabaseResource(
"sqlite", db_api.context_manager)
"sqlite", db_api.get_context_manager())
dependency_resources = {}
for name, resource in cls.database_resource.resources:
dependency_resources[name] = resource.getResource()
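(Illustrative sketch of the accessor pattern the fixtures above switch to: neutron-lib exposes the enginefacade through get_context_manager(), so the save/swap/restore of its _root_factory goes through that call instead of a module attribute. The helper name below is hypothetical, and it deliberately mirrors the fixture's own use of the private attribute.)

from neutron_lib.db import api as db_api


def swap_root_factory(test_factory):
    """Install a test transaction factory; return a restore callable."""
    manager = db_api.get_context_manager()
    saved = manager._root_factory
    manager._root_factory = test_factory
    return lambda: setattr(manager, '_root_factory', saved)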


@@ -23,6 +23,7 @@ import time
import eventlet.wsgi
from neutron_lib import context
from neutron_lib.db import api as db_api
from neutron_lib import exceptions as exception
from neutron_lib import worker as neutron_worker
from oslo_config import cfg
@@ -43,7 +44,6 @@ from neutron._i18n import _
from neutron.common import config
from neutron.common import exceptions as n_exc
from neutron.conf import wsgi as wsgi_config
from neutron.db import api
CONF = cfg.CONF
wsgi_config.register_socket_opts()
@@ -188,7 +188,7 @@ class Server(object):
# dispose the whole pool before os.fork, otherwise there will
# be shared DB connections in child processes which may cause
# DB errors.
api.context_manager.dispose_pool()
db_api.get_context_manager().dispose_pool()
# The API service runs in a number of child processes.
# Minimize the cost of checking for child exit by extending the
# wait interval past the default of 0.01s.
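(Illustrative sketch of the pre-fork pattern above, with a hypothetical worker bootstrap function: dispose of the parent's connection pool via get_context_manager() before os.fork() so child processes do not inherit shared DB connections.)

import os

from neutron_lib.db import api as db_api


def fork_worker():
    # Drop pooled connections in the parent first; each child opens its
    # own connections on first use.
    db_api.get_context_manager().dispose_pool()
    return os.fork()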