Refactor some router-related methods

This is done in preparation for the distributed
router functionality. Breaking down and factoring
out some functionality helps the DVR work to come
as a more loosely coupled addition to the L3
centralized case. This also ensures that the two
code bases are kept separately to minimize chance
of regression, and simplify code coverage effort.

Partially-implements: blueprint neutron-ovs-dvr

Change-Id: Ide1bc193666ce03a3ae26b1869bde8093ed453ef
This commit is contained in:
armando-migliaccio 2014-05-30 14:58:30 -07:00
parent d720cb5cf6
commit 55243850d4
2 changed files with 248 additions and 200 deletions

View File

@ -44,6 +44,7 @@ EXTERNAL_GW_INFO = l3.EXTERNAL_GW_INFO
# API parameter name and Database column names may differ.
# Useful to keep the filtering between API and Database.
API_TO_DB_COLUMN_MAP = {'port_id': 'fixed_port_id'}
CORE_ROUTER_ATTRS = ('id', 'name', 'tenant_id', 'admin_state_up', 'status')
class Router(model_base.BASEV2, models_v2.HasId, models_v2.HasTenant):
@ -81,89 +82,91 @@ class L3_NAT_db_mixin(l3.RouterPluginBase):
"""Mixin class to add L3/NAT router methods to db_base_plugin_v2."""
l3_rpc_notifier = l3_rpc_agent_api.L3AgentNotify
router_device_owners = (
DEVICE_OWNER_ROUTER_INTF,
DEVICE_OWNER_ROUTER_GW,
DEVICE_OWNER_FLOATINGIP
)
@property
def _core_plugin(self):
return manager.NeutronManager.get_plugin()
def _get_router(self, context, id):
def _get_router(self, context, router_id):
try:
router = self._get_by_id(context, Router, id)
router = self._get_by_id(context, Router, router_id)
except exc.NoResultFound:
raise l3.RouterNotFound(router_id=id)
raise l3.RouterNotFound(router_id=router_id)
return router
def _make_router_dict(self, router, fields=None,
process_extensions=True):
res = {'id': router['id'],
'name': router['name'],
'tenant_id': router['tenant_id'],
'admin_state_up': router['admin_state_up'],
'status': router['status'],
EXTERNAL_GW_INFO: None,
'gw_port_id': router['gw_port_id']}
def _make_router_dict(self, router, fields=None, process_extensions=True):
res = dict((key, router[key]) for key in CORE_ROUTER_ATTRS)
if router['gw_port_id']:
nw_id = router.gw_port['network_id']
res[EXTERNAL_GW_INFO] = {'network_id': nw_id}
ext_gw_info = {'network_id': router.gw_port['network_id']}
else:
ext_gw_info = None
res.update({
EXTERNAL_GW_INFO: ext_gw_info,
'gw_port_id': router['gw_port_id'],
})
# NOTE(salv-orlando): The following assumes this mixin is used in a
# class inheriting from CommonDbMixin, which is true for all existing
# plugins.
if process_extensions:
self._apply_dict_extend_functions(
l3.ROUTERS, res, router)
self._apply_dict_extend_functions(l3.ROUTERS, res, router)
return self._fields(res, fields)
def create_router(self, context, router):
r = router['router']
has_gw_info = False
if EXTERNAL_GW_INFO in r:
has_gw_info = True
gw_info = r[EXTERNAL_GW_INFO]
del r[EXTERNAL_GW_INFO]
tenant_id = self._get_tenant_id_for_create(context, r)
def _create_router_db(self, context, router, tenant_id, gw_info):
"""Create the DB object and update gw info, if available."""
with context.session.begin(subtransactions=True):
# pre-generate id so it will be available when
# configuring external gw port
router_db = Router(id=uuidutils.generate_uuid(),
tenant_id=tenant_id,
name=r['name'],
admin_state_up=r['admin_state_up'],
name=router['name'],
admin_state_up=router['admin_state_up'],
status="ACTIVE")
context.session.add(router_db)
if has_gw_info:
return router_db
def create_router(self, context, router):
r = router['router']
gw_info = r.pop(EXTERNAL_GW_INFO, None)
tenant_id = self._get_tenant_id_for_create(context, r)
with context.session.begin(subtransactions=True):
router_db = self._create_router_db(context, r, tenant_id, gw_info)
if gw_info:
self._update_router_gw_info(context, router_db['id'], gw_info)
return self._make_router_dict(router_db, process_extensions=False)
def _update_router_db(self, context, router_id, data, gw_info):
"""Update the DB object and related gw info, if available."""
with context.session.begin(subtransactions=True):
if gw_info != attributes.ATTR_NOT_SPECIFIED:
self._update_router_gw_info(context, router_id, gw_info)
router_db = self._get_router(context, router_id)
if data:
router_db.update(data)
return router_db
def update_router(self, context, id, router):
r = router['router']
has_gw_info = False
gw_info = None
if EXTERNAL_GW_INFO in r:
has_gw_info = True
gw_info = r[EXTERNAL_GW_INFO]
del r[EXTERNAL_GW_INFO]
gw_info = r.pop(EXTERNAL_GW_INFO, attributes.ATTR_NOT_SPECIFIED)
# check whether router needs and can be rescheduled to the proper
# l3 agent (associated with given external network);
# do check before update in DB as an exception will be raised
# in case no proper l3 agent found
candidates = None
if has_gw_info:
if gw_info != attributes.ATTR_NOT_SPECIFIED:
candidates = self._check_router_needs_rescheduling(
context, id, gw_info)
with context.session.begin(subtransactions=True):
if has_gw_info:
self._update_router_gw_info(context, id, gw_info)
router_db = self._get_router(context, id)
# Ensure we actually have something to update
if r.keys():
router_db.update(r)
router_db = self._update_router_db(context, id, r, gw_info)
if candidates:
l3_plugin = manager.NeutronManager.get_service_plugins().get(
constants.L3_ROUTER_NAT)
l3_plugin.reschedule_router(context, id, candidates)
self.l3_rpc_notifier.routers_updated(
context, [router_db['id']])
self.l3_rpc_notifier.routers_updated(context, [router_db['id']])
return self._make_router_dict(router_db)
def _check_router_needs_rescheduling(self, context, router_id, gw_info):
@ -254,61 +257,73 @@ class L3_NAT_db_mixin(l3.RouterPluginBase):
gw_port['id'])
context.session.add(router)
def _validate_gw_info(self, context, gw_port, info):
network_id = info['network_id'] if info else None
if network_id:
network_db = self._core_plugin._get_network(context, network_id)
if not network_db.external:
msg = _("Network %s is not an external network") % network_id
raise n_exc.BadRequest(resource='router', msg=msg)
return network_id
def _delete_current_gw_port(self, context, router_id, router, new_network):
"""Delete gw port, if it is attached to an old network."""
is_gw_port_attached_to_existing_network = (
router.gw_port and router.gw_port['network_id'] != new_network)
admin_ctx = context.elevated()
if is_gw_port_attached_to_existing_network:
if self.get_floatingips_count(
admin_ctx, {'router_id': [router_id]}):
raise l3.RouterExternalGatewayInUseByFloatingIp(
router_id=router_id, net_id=router.gw_port['network_id'])
with context.session.begin(subtransactions=True):
gw_port_id = router.gw_port['id']
router.gw_port = None
context.session.add(router)
self._core_plugin.delete_port(
admin_ctx, gw_port_id, l3_port_check=False)
def _create_gw_port(self, context, router_id, router, new_network):
new_valid_gw_port_attachment = (
new_network and (not router.gw_port or
router.gw_port['network_id'] != new_network))
if new_valid_gw_port_attachment:
subnets = self._core_plugin._get_subnets_by_network(context,
new_network)
for subnet in subnets:
self._check_for_dup_router_subnet(context, router_id,
new_network, subnet['id'],
subnet['cidr'])
self._create_router_gw_port(context, router, new_network)
def _update_router_gw_info(self, context, router_id, info, router=None):
# TODO(salvatore-orlando): guarantee atomic behavior also across
# operations that span beyond the model classes handled by this
# class (e.g.: delete_port)
router = router or self._get_router(context, router_id)
gw_port = router.gw_port
# network_id attribute is required by API, so it must be present
network_id = info['network_id'] if info else None
if network_id:
network_db = self._core_plugin._get_network(context, network_id)
if not network_db.external:
msg = _("Network %s is not a valid external "
"network") % network_id
raise n_exc.BadRequest(resource='router', msg=msg)
network_id = self._validate_gw_info(context, gw_port, info)
self._delete_current_gw_port(context, router_id, router, network_id)
self._create_gw_port(context, router_id, router, network_id)
# figure out if we need to delete existing port
if gw_port and gw_port['network_id'] != network_id:
fip_count = self.get_floatingips_count(context.elevated(),
{'router_id': [router_id]})
if fip_count:
raise l3.RouterExternalGatewayInUseByFloatingIp(
router_id=router_id, net_id=gw_port['network_id'])
with context.session.begin(subtransactions=True):
router.gw_port = None
context.session.add(router)
self._core_plugin.delete_port(context.elevated(),
gw_port['id'],
l3_port_check=False)
if network_id is not None and (gw_port is None or
gw_port['network_id'] != network_id):
subnets = self._core_plugin._get_subnets_by_network(context,
network_id)
for subnet in subnets:
self._check_for_dup_router_subnet(context, router_id,
network_id, subnet['id'],
subnet['cidr'])
self._create_router_gw_port(context, router, network_id)
def _ensure_router_not_in_use(self, context, router_id):
admin_ctx = context.elevated()
router = self._get_router(context, router_id)
if self.get_floatingips_count(
admin_ctx, filters={'router_id': [router_id]}):
raise l3.RouterInUse(router_id=router_id)
device_owner = self._get_device_owner(context, router)
device_filter = {'device_id': [router_id],
'device_owner': [device_owner]}
port_count = self._core_plugin.get_ports_count(
admin_ctx, filters=device_filter)
if port_count:
raise l3.RouterInUse(router_id=router_id)
return router
def delete_router(self, context, id):
with context.session.begin(subtransactions=True):
router = self._get_router(context, id)
# Ensure that the router is not used
fips = self.get_floatingips_count(context.elevated(),
filters={'router_id': [id]})
if fips:
raise l3.RouterInUse(router_id=id)
device_filter = {'device_id': [id],
'device_owner': [DEVICE_OWNER_ROUTER_INTF]}
ports = self._core_plugin.get_ports_count(context.elevated(),
filters=device_filter)
if ports:
raise l3.RouterInUse(router_id=id)
router = self._ensure_router_not_in_use(context, id)
#TODO(nati) Refactor here when we have router insertion model
vpnservice = manager.NeutronManager.get_service_plugins().get(
@ -382,64 +397,79 @@ class L3_NAT_db_mixin(l3.RouterPluginBase):
except exc.NoResultFound:
pass
def add_router_interface(self, context, router_id, interface_info):
def _get_device_owner(self, context, router=None):
"""Get device_owner for the specified router."""
# NOTE(armando-migliaccio): in the base case this is invariant
return DEVICE_OWNER_ROUTER_INTF
def _validate_interface_info(self, interface_info):
if not interface_info:
msg = _("Either subnet_id or port_id must be specified")
raise n_exc.BadRequest(resource='router', msg=msg)
port_id_specified = 'port_id' in interface_info
subnet_id_specified = 'subnet_id' in interface_info
if port_id_specified and subnet_id_specified:
msg = _("Cannot specify both subnet-id and port-id")
raise n_exc.BadRequest(resource='router', msg=msg)
return port_id_specified, subnet_id_specified
if 'port_id' in interface_info:
# make sure port update is committed
with context.session.begin(subtransactions=True):
if 'subnet_id' in interface_info:
msg = _("Cannot specify both subnet-id and port-id")
raise n_exc.BadRequest(resource='router', msg=msg)
port = self._core_plugin._get_port(context,
interface_info['port_id'])
if port['device_id']:
raise n_exc.PortInUse(net_id=port['network_id'],
port_id=port['id'],
device_id=port['device_id'])
fixed_ips = [ip for ip in port['fixed_ips']]
if len(fixed_ips) != 1:
msg = _('Router port must have exactly one fixed IP')
raise n_exc.BadRequest(resource='router', msg=msg)
subnet_id = fixed_ips[0]['subnet_id']
subnet = self._core_plugin._get_subnet(context, subnet_id)
self._check_for_dup_router_subnet(context, router_id,
port['network_id'],
subnet['id'],
subnet['cidr'])
port.update({'device_id': router_id,
'device_owner': DEVICE_OWNER_ROUTER_INTF})
elif 'subnet_id' in interface_info:
subnet_id = interface_info['subnet_id']
subnet = self._core_plugin._get_subnet(context, subnet_id)
# Ensure the subnet has a gateway
if not subnet['gateway_ip']:
msg = _('Subnet for router interface must have a gateway IP')
def _add_interface_by_port(self, context, router_id, port_id, owner):
with context.session.begin(subtransactions=True):
port = self._core_plugin._get_port(context, port_id)
if port['device_id']:
raise n_exc.PortInUse(net_id=port['network_id'],
port_id=port['id'],
device_id=port['device_id'])
fixed_ips = [ip for ip in port['fixed_ips']]
if len(fixed_ips) != 1:
msg = _('Router port must have exactly one fixed IP')
raise n_exc.BadRequest(resource='router', msg=msg)
subnet_id = fixed_ips[0]['subnet_id']
subnet = self._core_plugin._get_subnet(context, subnet_id)
self._check_for_dup_router_subnet(context, router_id,
subnet['network_id'],
subnet_id,
port['network_id'],
subnet['id'],
subnet['cidr'])
fixed_ip = {'ip_address': subnet['gateway_ip'],
'subnet_id': subnet['id']}
port = self._core_plugin.create_port(context, {
'port':
{'tenant_id': subnet['tenant_id'],
'network_id': subnet['network_id'],
'fixed_ips': [fixed_ip],
'mac_address': attributes.ATTR_NOT_SPECIFIED,
'admin_state_up': True,
'device_id': router_id,
'device_owner': DEVICE_OWNER_ROUTER_INTF,
'name': ''}})
port.update({'device_id': router_id, 'device_owner': owner})
return port
def _add_interface_by_subnet(self, context, router_id, subnet_id, owner):
subnet = self._core_plugin._get_subnet(context, subnet_id)
if not subnet['gateway_ip']:
msg = _('Subnet for router interface must have a gateway IP')
raise n_exc.BadRequest(resource='router', msg=msg)
self._check_for_dup_router_subnet(context, router_id,
subnet['network_id'],
subnet_id,
subnet['cidr'])
fixed_ip = {'ip_address': subnet['gateway_ip'],
'subnet_id': subnet['id']}
return self._core_plugin.create_port(context, {
'port':
{'tenant_id': subnet['tenant_id'],
'network_id': subnet['network_id'],
'fixed_ips': [fixed_ip],
'mac_address': attributes.ATTR_NOT_SPECIFIED,
'admin_state_up': True,
'device_id': router_id,
'device_owner': owner,
'name': ''}})
def add_router_interface(self, context, router_id, interface_info):
add_by_port, add_by_sub = self._validate_interface_info(interface_info)
device_owner = self._get_device_owner(context, router_id)
if add_by_port:
port = self._add_interface_by_port(
context, router_id, interface_info['port_id'], device_owner)
elif add_by_sub:
port = self._add_interface_by_subnet(
context, router_id, interface_info['subnet_id'], device_owner)
self.l3_rpc_notifier.routers_updated(
context, [router_id], 'add_router_interface')
info = {'id': router_id,
'tenant_id': subnet['tenant_id'],
'tenant_id': port['tenant_id'],
'port_id': port['id'],
'subnet_id': port['fixed_ips'][0]['subnet_id']}
notifier_api.notify(context,
@ -459,63 +489,68 @@ class L3_NAT_db_mixin(l3.RouterPluginBase):
raise l3.RouterInterfaceInUseByFloatingIP(
router_id=router_id, subnet_id=subnet_id)
def _remove_interface_by_port(self, context, router_id,
port_id, subnet_id, owner):
port_db = self._core_plugin._get_port(context, port_id)
if not (port_db['device_owner'] == owner and
port_db['device_id'] == router_id):
raise l3.RouterInterfaceNotFound(router_id=router_id,
port_id=port_id)
port_subnet_id = port_db['fixed_ips'][0]['subnet_id']
if subnet_id and port_subnet_id != subnet_id:
raise n_exc.SubnetMismatchForPort(
port_id=port_id, subnet_id=subnet_id)
subnet = self._core_plugin._get_subnet(context, port_subnet_id)
self._confirm_router_interface_not_in_use(
context, router_id, port_subnet_id)
self._core_plugin.delete_port(context, port_db['id'],
l3_port_check=False)
return (port_db, subnet)
def _remove_interface_by_subnet(self, context,
router_id, subnet_id, owner):
self._confirm_router_interface_not_in_use(
context, router_id, subnet_id)
subnet = self._core_plugin._get_subnet(context, subnet_id)
try:
rport_qry = context.session.query(models_v2.Port)
ports = rport_qry.filter_by(
device_id=router_id,
device_owner=owner,
network_id=subnet['network_id'])
for p in ports:
if p['fixed_ips'][0]['subnet_id'] == subnet_id:
self._core_plugin.delete_port(context, p['id'],
l3_port_check=False)
return (p, subnet)
except exc.NoResultFound:
pass
raise l3.RouterInterfaceNotFoundForSubnet(router_id=router_id,
subnet_id=subnet_id)
def remove_router_interface(self, context, router_id, interface_info):
if not interface_info:
msg = _("Either subnet_id or port_id must be specified")
raise n_exc.BadRequest(resource='router', msg=msg)
if 'port_id' in interface_info:
port_id = interface_info['port_id']
port_db = self._core_plugin._get_port(context, port_id)
if not (port_db['device_owner'] == DEVICE_OWNER_ROUTER_INTF and
port_db['device_id'] == router_id):
raise l3.RouterInterfaceNotFound(router_id=router_id,
port_id=port_id)
if 'subnet_id' in interface_info:
port_subnet_id = port_db['fixed_ips'][0]['subnet_id']
if port_subnet_id != interface_info['subnet_id']:
raise n_exc.SubnetMismatchForPort(
port_id=port_id,
subnet_id=interface_info['subnet_id'])
subnet_id = port_db['fixed_ips'][0]['subnet_id']
subnet = self._core_plugin._get_subnet(context, subnet_id)
self._confirm_router_interface_not_in_use(
context, router_id, subnet_id)
self._core_plugin.delete_port(context, port_db['id'],
l3_port_check=False)
elif 'subnet_id' in interface_info:
subnet_id = interface_info['subnet_id']
self._confirm_router_interface_not_in_use(context, router_id,
subnet_id)
port_id = interface_info.get('port_id')
subnet_id = interface_info.get('subnet_id')
device_owner = self._get_device_owner(context, router_id)
if port_id:
port, subnet = self._remove_interface_by_port(context, router_id,
port_id, subnet_id,
device_owner)
elif subnet_id:
port, subnet = self._remove_interface_by_subnet(
context, router_id, subnet_id, device_owner)
subnet = self._core_plugin._get_subnet(context, subnet_id)
found = False
try:
rport_qry = context.session.query(models_v2.Port)
ports = rport_qry.filter_by(
device_id=router_id,
device_owner=DEVICE_OWNER_ROUTER_INTF,
network_id=subnet['network_id'])
for p in ports:
if p['fixed_ips'][0]['subnet_id'] == subnet_id:
port_id = p['id']
self._core_plugin.delete_port(context, p['id'],
l3_port_check=False)
found = True
break
except exc.NoResultFound:
pass
if not found:
raise l3.RouterInterfaceNotFoundForSubnet(router_id=router_id,
subnet_id=subnet_id)
self.l3_rpc_notifier.routers_updated(
context, [router_id], 'remove_router_interface')
info = {'id': router_id,
'tenant_id': subnet['tenant_id'],
'port_id': port_id,
'subnet_id': subnet_id}
'tenant_id': port['tenant_id'],
'port_id': port['id'],
'subnet_id': subnet['id']}
notifier_api.notify(context,
notifier_api.publisher_id('network'),
'router.interface.delete',
@ -541,6 +576,12 @@ class L3_NAT_db_mixin(l3.RouterPluginBase):
'status': floatingip['status']}
return self._fields(res, fields)
def _get_interface_ports_for_network(self, context, network_id):
router_intf_qry = context.session.query(models_v2.Port)
return router_intf_qry.filter_by(
network_id=network_id,
device_owner=DEVICE_OWNER_ROUTER_INTF)
def _get_router_for_floatingip(self, context, internal_port,
internal_subnet_id,
external_network_id):
@ -551,11 +592,8 @@ class L3_NAT_db_mixin(l3.RouterPluginBase):
'which has no gateway_ip') % internal_subnet_id)
raise n_exc.BadRequest(resource='floatingip', msg=msg)
# find router interface ports on this network
router_intf_qry = context.session.query(models_v2.Port)
router_intf_ports = router_intf_qry.filter_by(
network_id=internal_port['network_id'],
device_owner=DEVICE_OWNER_ROUTER_INTF)
router_intf_ports = self._get_interface_ports_for_network(
context, internal_port['network_id'])
for intf_p in router_intf_ports:
if intf_p['fixed_ips'][0]['subnet_id'] == internal_subnet_id:
@ -820,9 +858,7 @@ class L3_NAT_db_mixin(l3.RouterPluginBase):
deletion checks.
"""
port_db = self._core_plugin._get_port(context, port_id)
if port_db['device_owner'] in [DEVICE_OWNER_ROUTER_INTF,
DEVICE_OWNER_ROUTER_GW,
DEVICE_OWNER_FLOATINGIP]:
if port_db['device_owner'] in self.router_device_owners:
# Raise port in use only if the port has IP addresses
# Otherwise it's a stale port that can be removed
fixed_ips = port_db['fixed_ips']
@ -905,13 +941,13 @@ class L3_NAT_db_mixin(l3.RouterPluginBase):
self._populate_subnet_for_ports(context, gw_ports)
return gw_ports
def get_sync_interfaces(self, context, router_ids,
device_owner=DEVICE_OWNER_ROUTER_INTF):
def get_sync_interfaces(self, context, router_ids, device_owners=None):
"""Query router interfaces that relate to list of router_ids."""
device_owners = device_owners or [DEVICE_OWNER_ROUTER_INTF]
if not router_ids:
return []
filters = {'device_id': router_ids,
'device_owner': [device_owner]}
'device_owner': device_owners}
interfaces = self._core_plugin.get_ports(context, filters)
if interfaces:
self._populate_subnet_for_ports(context, interfaces)
@ -979,13 +1015,20 @@ class L3_NAT_db_mixin(l3.RouterPluginBase):
router[l3_constants.INTERFACE_KEY] = router_interfaces
return routers_dict.values()
def get_sync_data(self, context, router_ids=None, active=None):
def _get_router_info_list(self, context, router_ids=None, active=None,
device_owners=None):
"""Query routers and their related floating_ips, interfaces."""
with context.session.begin(subtransactions=True):
routers = self._get_sync_routers(context,
router_ids=router_ids,
active=active)
router_ids = [router['id'] for router in routers]
interfaces = self.get_sync_interfaces(
context, router_ids, device_owners)
floating_ips = self._get_sync_floating_ips(context, router_ids)
interfaces = self.get_sync_interfaces(context, router_ids)
return (routers, interfaces, floating_ips)
def get_sync_data(self, context, router_ids=None, active=None):
routers, interfaces, floating_ips = self._get_router_info_list(
context, router_ids=router_ids, active=active)
return self._process_sync_data(routers, interfaces, floating_ips)

View File

@ -351,10 +351,11 @@ class L3NatTestCaseMixin(object):
neutron_context=neutron_context)
def _remove_external_gateway_from_router(self, router_id, network_id,
expected_code=exc.HTTPOk.code):
expected_code=exc.HTTPOk.code,
external_gw_info=None):
return self._update('routers', router_id,
{'router': {'external_gateway_info':
{}}},
external_gw_info}},
expected_code=expected_code)
def _router_interface_action(self, action, router_id, subnet_id, port_id,
@ -615,9 +616,13 @@ class L3NatTestCaseBase(L3NatTestCaseMixin):
net_id = (body['router']
['external_gateway_info']['network_id'])
self.assertEqual(net_id, s2['subnet']['network_id'])
# Validate that we can clear the gateway with
# an empty dict, in any other case, we fall back
# on None as default value
self._remove_external_gateway_from_router(
r['router']['id'],
s2['subnet']['network_id'])
s2['subnet']['network_id'],
external_gw_info={})
def test_router_update_gateway_with_existed_floatingip(self):
with self.subnet() as subnet: