Merge "Fix connection between 2 dvr routers"
This commit is contained in:
commit
02edde5cbf
neutron
agent/l3
db
tests
functional/services/l3_router
unit
@ -65,12 +65,21 @@ LOG = logging.getLogger(__name__)
|
||||
SYNC_ROUTERS_MAX_CHUNK_SIZE = 256
|
||||
SYNC_ROUTERS_MIN_CHUNK_SIZE = 32
|
||||
|
||||
# Lower value is higher priority
|
||||
PRIORITY_RPC = 0
|
||||
PRIORITY_SYNC_ROUTERS_TASK = 1
|
||||
PRIORITY_PD_UPDATE = 2
|
||||
# Priorities - lower value is higher priority
|
||||
PRIORITY_RELATED_ROUTER = 0
|
||||
PRIORITY_RPC = 1
|
||||
PRIORITY_SYNC_ROUTERS_TASK = 2
|
||||
PRIORITY_PD_UPDATE = 3
|
||||
|
||||
# Actions
|
||||
DELETE_ROUTER = 1
|
||||
PD_UPDATE = 2
|
||||
DELETE_RELATED_ROUTER = 2
|
||||
ADD_UPDATE_ROUTER = 3
|
||||
ADD_UPDATE_RELATED_ROUTER = 4
|
||||
PD_UPDATE = 5
|
||||
|
||||
RELATED_ACTION_MAP = {DELETE_ROUTER: DELETE_RELATED_ROUTER,
|
||||
ADD_UPDATE_ROUTER: ADD_UPDATE_RELATED_ROUTER}
|
||||
|
||||
|
||||
def log_verbose_exc(message, router_payload):
|
||||
@ -436,7 +445,8 @@ class L3NATAgent(ha.AgentMixin,
|
||||
if isinstance(routers[0], dict):
|
||||
routers = [router['id'] for router in routers]
|
||||
for id in routers:
|
||||
update = queue.ResourceUpdate(id, PRIORITY_RPC)
|
||||
update = queue.ResourceUpdate(
|
||||
id, PRIORITY_RPC, action=ADD_UPDATE_ROUTER)
|
||||
self._queue.add(update)
|
||||
|
||||
def router_removed_from_agent(self, context, payload):
|
||||
@ -537,8 +547,14 @@ class L3NATAgent(ha.AgentMixin,
|
||||
self.pd.process_prefix_update()
|
||||
LOG.debug("Finished a router update for %s", update.id)
|
||||
continue
|
||||
router = update.resource
|
||||
if update.action != DELETE_ROUTER and not router:
|
||||
|
||||
routers = [update.resource] if update.resource else []
|
||||
|
||||
not_delete_no_routers = (update.action != DELETE_ROUTER and
|
||||
not routers)
|
||||
related_action = update.action in (DELETE_RELATED_ROUTER,
|
||||
ADD_UPDATE_RELATED_ROUTER)
|
||||
if not_delete_no_routers or related_action:
|
||||
try:
|
||||
update.timestamp = timeutils.utcnow()
|
||||
routers = self.plugin_rpc.get_routers(self.context,
|
||||
@ -549,10 +565,13 @@ class L3NATAgent(ha.AgentMixin,
|
||||
self._resync_router(update)
|
||||
continue
|
||||
|
||||
if routers:
|
||||
router = routers[0]
|
||||
# For a related action, verify the router is still hosted here,
|
||||
# since it could have just been deleted and we don't want to
|
||||
# add it back.
|
||||
if related_action:
|
||||
routers = [r for r in routers if r['id'] == update.id]
|
||||
|
||||
if not router:
|
||||
if not routers:
|
||||
removed = self._safe_router_removed(update.id)
|
||||
if not removed:
|
||||
self._resync_router(update)
|
||||
@ -565,6 +584,41 @@ class L3NATAgent(ha.AgentMixin,
|
||||
LOG.debug("Finished a router update for %s", update.id)
|
||||
continue
|
||||
|
||||
if not self._process_routers_if_compatible(routers, update):
|
||||
self._resync_router(update)
|
||||
continue
|
||||
|
||||
rp.fetched_and_processed(update.timestamp)
|
||||
LOG.debug("Finished a router update for %s", update.id)
|
||||
|
||||
def _process_routers_if_compatible(self, routers, update):
|
||||
process_result = True
|
||||
for router in routers:
|
||||
if router['id'] != update.id:
|
||||
# Don't do the work here, instead create a new update and
|
||||
# enqueue it, since there could be another thread working
|
||||
# on it already and we don't want to race.
|
||||
new_action = RELATED_ACTION_MAP.get(
|
||||
update.action, ADD_UPDATE_RELATED_ROUTER)
|
||||
new_update = queue.ResourceUpdate(
|
||||
router['id'],
|
||||
priority=PRIORITY_RELATED_ROUTER,
|
||||
action=new_action)
|
||||
self._queue.add(new_update)
|
||||
LOG.debug('Queued a router update for %(router_id)s '
|
||||
'(related router %(related_router_id)s). '
|
||||
'Original event action %(action)s, '
|
||||
'priority %(priority)s. '
|
||||
'New event action %(new_action)s, '
|
||||
'priority %(new_priority)s',
|
||||
{'router_id': router['id'],
|
||||
'related_router_id': update.id,
|
||||
'action': update.action,
|
||||
'priority': update.priority,
|
||||
'new_action': new_update.action,
|
||||
'new_priority': new_update.priority})
|
||||
continue
|
||||
|
||||
try:
|
||||
self._process_router_if_compatible(router)
|
||||
except n_exc.RouterNotCompatibleWithAgent as e:
|
||||
@ -578,11 +632,8 @@ class L3NATAgent(ha.AgentMixin,
|
||||
log_verbose_exc(
|
||||
"Failed to process compatible router: %s" % update.id,
|
||||
router)
|
||||
self._resync_router(update)
|
||||
continue
|
||||
|
||||
LOG.debug("Finished a router update for %s", update.id)
|
||||
rp.fetched_and_processed(update.timestamp)
|
||||
process_result = False
|
||||
return process_result
|
||||
|
||||
def _process_routers_loop(self):
|
||||
LOG.debug("Starting _process_routers_loop")
|
||||
@ -645,6 +696,7 @@ class L3NATAgent(ha.AgentMixin,
|
||||
r['id'],
|
||||
PRIORITY_SYNC_ROUTERS_TASK,
|
||||
resource=r,
|
||||
action=ADD_UPDATE_ROUTER,
|
||||
timestamp=timestamp)
|
||||
self._queue.add(update)
|
||||
except oslo_messaging.MessagingTimeout:
|
||||
|
@ -65,6 +65,9 @@ class DVRResourceOperationHandler(object):
|
||||
necessary.
|
||||
"""
|
||||
|
||||
related_dvr_router_hosts = {}
|
||||
related_dvr_router_routers = {}
|
||||
|
||||
@property
|
||||
def l3plugin(self):
|
||||
return directory.get_plugin(plugin_constants.L3)
|
||||
@ -516,6 +519,33 @@ class DVRResourceOperationHandler(object):
|
||||
return True
|
||||
return False
|
||||
|
||||
@registry.receives(resources.ROUTER_INTERFACE, [events.BEFORE_DELETE])
|
||||
def _cache_related_dvr_routers_info_before_interface_removal(
|
||||
self, resource, event, trigger, context, **kwargs):
|
||||
router_id = kwargs.get("router_id")
|
||||
subnet_id = kwargs.get("subnet_id")
|
||||
|
||||
router = self.l3plugin._get_router(context, router_id)
|
||||
if not router.extra_attributes.distributed:
|
||||
return
|
||||
|
||||
cache_key = (router_id, subnet_id)
|
||||
try:
|
||||
existing_hosts = self.related_dvr_router_hosts[cache_key]
|
||||
except KeyError:
|
||||
existing_hosts = set()
|
||||
other_hosts = set(self._get_other_dvr_hosts(context, router_id))
|
||||
self.related_dvr_router_hosts[cache_key] = existing_hosts | other_hosts
|
||||
|
||||
try:
|
||||
existing_routers = self.related_dvr_router_routers[cache_key]
|
||||
except KeyError:
|
||||
existing_routers = set()
|
||||
other_routers = set(self._get_other_dvr_router_ids_connected_router(
|
||||
context, router_id))
|
||||
self.related_dvr_router_routers[cache_key] = (
|
||||
existing_routers | other_routers)
|
||||
|
||||
@registry.receives(resources.ROUTER_INTERFACE, [events.AFTER_DELETE])
|
||||
@db_api.retry_if_session_inactive()
|
||||
def _cleanup_after_interface_removal(self, resource, event, trigger,
|
||||
@ -535,27 +565,98 @@ class DVRResourceOperationHandler(object):
|
||||
context, router_id)
|
||||
removed_hosts = set(router_hosts_for_removed) - set(router_hosts_after)
|
||||
if removed_hosts:
|
||||
# Get hosts where this router is placed as "related" to other dvr
|
||||
# routers and don't remove it from such hosts
|
||||
related_hosts = self._get_other_dvr_hosts(context, router_id)
|
||||
agents = self.l3plugin.get_l3_agents(
|
||||
context, filters={'host': removed_hosts})
|
||||
bindings = rb_obj.RouterL3AgentBinding.get_objects(
|
||||
context, router_id=router_id)
|
||||
snat_binding = bindings.pop() if bindings else None
|
||||
connected_dvr_routers = set(
|
||||
self.l3plugin._get_other_dvr_router_ids_connected_router(
|
||||
context, router_id))
|
||||
for agent in agents:
|
||||
is_this_snat_agent = (
|
||||
snat_binding and snat_binding.l3_agent_id == agent['id'])
|
||||
if (not is_this_snat_agent and
|
||||
agent['host'] not in related_hosts):
|
||||
self.l3plugin.l3_rpc_notifier.router_removed_from_agent(
|
||||
context, router_id, agent['host'])
|
||||
for connected_router_id in connected_dvr_routers:
|
||||
connected_router_hosts = set(
|
||||
self.l3plugin._get_dvr_hosts_for_router(
|
||||
context, connected_router_id))
|
||||
connected_router_hosts |= set(
|
||||
self._get_other_dvr_hosts(
|
||||
context, connected_router_id))
|
||||
if agent['host'] not in connected_router_hosts:
|
||||
self.l3plugin.l3_rpc_notifier.\
|
||||
router_removed_from_agent(
|
||||
context, connected_router_id,
|
||||
agent['host'])
|
||||
# if subnet_id not in interface_info, request was to remove by port
|
||||
sub_id = (interface_info.get('subnet_id') or
|
||||
port['fixed_ips'][0]['subnet_id'])
|
||||
self._cleanup_related_hosts_after_interface_removal(
|
||||
context, router_id, sub_id)
|
||||
self._cleanup_related_routers_after_interface_removal(
|
||||
context, router_id, sub_id)
|
||||
is_multiple_prefix_csport = (
|
||||
self._check_for_multiprefix_csnat_port_and_update(
|
||||
context, router, port['network_id'], sub_id))
|
||||
if not is_multiple_prefix_csport:
|
||||
# Single prefix port - go ahead and delete the port
|
||||
self.delete_csnat_router_interface_ports(
|
||||
context.elevated(), router, subnet_id=sub_id)
|
||||
|
||||
def _cleanup_related_hosts_after_interface_removal(
|
||||
self, context, router_id, subnet_id):
|
||||
router_hosts = self.l3plugin._get_dvr_hosts_for_router(
|
||||
context, router_id)
|
||||
|
||||
cache_key = (router_id, subnet_id)
|
||||
related_dvr_router_hosts_before = self.related_dvr_router_hosts.pop(
|
||||
cache_key, set())
|
||||
related_dvr_router_hosts_after = set(self._get_other_dvr_hosts(
|
||||
context, router_id))
|
||||
related_dvr_router_hosts_before -= set(router_hosts)
|
||||
related_removed_hosts = (
|
||||
related_dvr_router_hosts_before - related_dvr_router_hosts_after)
|
||||
if related_removed_hosts:
|
||||
agents = self.l3plugin.get_l3_agents(
|
||||
context, filters={'host': related_removed_hosts})
|
||||
bindings = rb_obj.RouterL3AgentBinding.get_objects(
|
||||
context, router_id=router_id)
|
||||
snat_binding = bindings.pop() if bindings else None
|
||||
for agent in agents:
|
||||
is_this_snat_agent = (
|
||||
snat_binding and snat_binding.l3_agent_id == agent['id'])
|
||||
if not is_this_snat_agent:
|
||||
self.l3plugin.l3_rpc_notifier.router_removed_from_agent(
|
||||
context, router_id, agent['host'])
|
||||
# if subnet_id not in interface_info, request was to remove by port
|
||||
sub_id = (interface_info.get('subnet_id') or
|
||||
port['fixed_ips'][0]['subnet_id'])
|
||||
is_multiple_prefix_csport = (
|
||||
self._check_for_multiprefix_csnat_port_and_update(
|
||||
context, router, port['network_id'], sub_id))
|
||||
if not is_multiple_prefix_csport:
|
||||
# Single prefix port - go ahead and delete the port
|
||||
self.delete_csnat_router_interface_ports(
|
||||
context.elevated(), router, subnet_id=sub_id)
|
||||
|
||||
def _cleanup_related_routers_after_interface_removal(
|
||||
self, context, router_id, subnet_id):
|
||||
router_hosts = self.l3plugin._get_dvr_hosts_for_router(
|
||||
context, router_id)
|
||||
|
||||
cache_key = (router_id, subnet_id)
|
||||
related_dvr_routers_before = self.related_dvr_router_routers.pop(
|
||||
cache_key, set())
|
||||
related_dvr_routers_after = set(
|
||||
self._get_other_dvr_router_ids_connected_router(
|
||||
context, router_id))
|
||||
related_routers_to_remove = (
|
||||
related_dvr_routers_before - related_dvr_routers_after)
|
||||
|
||||
for related_router in related_routers_to_remove:
|
||||
related_router_hosts = self.l3plugin._get_dvr_hosts_for_router(
|
||||
context, related_router)
|
||||
hosts_to_remove = set(router_hosts) - set(related_router_hosts)
|
||||
for host in hosts_to_remove:
|
||||
self.l3plugin.l3_rpc_notifier.router_removed_from_agent(
|
||||
context, related_router, host)
|
||||
|
||||
def delete_csnat_router_interface_ports(self, context,
|
||||
router, subnet_id=None):
|
||||
|
@ -20,15 +20,16 @@ import neutron.db.l3_hascheduler_db as l3_ha_sch_db
|
||||
class L3_DVR_HA_scheduler_db_mixin(l3agent_dvr_sch_db.L3_DVRsch_db_mixin,
|
||||
l3_ha_sch_db.L3_HA_scheduler_db_mixin):
|
||||
|
||||
def get_dvr_routers_to_remove(self, context, port_id):
|
||||
def get_dvr_routers_to_remove(self, context, port_id,
|
||||
get_related_hosts_info=True):
|
||||
"""Returns info about which routers should be removed
|
||||
|
||||
In case dvr serviceable port was deleted we need to check
|
||||
if any dvr routers should be removed from l3 agent on port's host
|
||||
"""
|
||||
remove_router_info = super(L3_DVR_HA_scheduler_db_mixin,
|
||||
self).get_dvr_routers_to_remove(context,
|
||||
port_id)
|
||||
remove_router_info = super(
|
||||
L3_DVR_HA_scheduler_db_mixin, self).get_dvr_routers_to_remove(
|
||||
context, port_id, get_related_hosts_info)
|
||||
# Process the router information which was returned to make
|
||||
# sure we don't delete routers which have dvrhs snat bindings.
|
||||
processed_remove_router_info = []
|
||||
|
@ -30,6 +30,7 @@ from neutron.common import utils as n_utils
|
||||
from neutron.db import agentschedulers_db
|
||||
from neutron.db import l3_agentschedulers_db as l3agent_sch_db
|
||||
from neutron.db import l3_dvr_db
|
||||
from neutron.db.models import l3 as l3_models
|
||||
from neutron.db import models_v2
|
||||
from neutron.objects import l3agent as rb_obj
|
||||
from neutron.plugins.ml2 import db as ml2_db
|
||||
@ -130,11 +131,23 @@ class L3_DVRsch_db_mixin(l3agent_sch_db.L3AgentSchedulerDbMixin):
|
||||
if agent.host == port_host:
|
||||
agent_port_host_match = True
|
||||
if not agent_port_host_match:
|
||||
LOG.debug('DVR: Handle new service port, host %(host)s, '
|
||||
'router ids %(router_ids)s',
|
||||
{'host': port_host, 'router_ids': router_ids})
|
||||
self.l3_rpc_notifier.routers_updated_on_host(
|
||||
context, router_ids, port_host)
|
||||
hosts = set([port_host])
|
||||
for router_id in router_ids:
|
||||
hosts |= set(self.get_hosts_to_notify(context, router_id))
|
||||
|
||||
for host in hosts:
|
||||
updated_routers = set()
|
||||
for router_id in router_ids:
|
||||
LOG.debug('DVR: Handle new service port, host %(host)s, '
|
||||
'router ids %(router_id)s',
|
||||
{'host': host, 'router_id': router_id})
|
||||
if self._check_for_rtr_serviceable_ports(
|
||||
context.elevated(), router_id, host):
|
||||
updated_routers.add(router_id)
|
||||
|
||||
if updated_routers:
|
||||
self.l3_rpc_notifier.routers_updated_on_host(
|
||||
context, updated_routers, host)
|
||||
|
||||
def get_dvr_snat_agent_list(self, context):
|
||||
agent_filters = {'agent_modes': [n_const.L3_AGENT_MODE_DVR_SNAT]}
|
||||
@ -172,7 +185,8 @@ class L3_DVRsch_db_mixin(l3agent_sch_db.L3AgentSchedulerDbMixin):
|
||||
'for router %s', router_id)
|
||||
return subnet_ids
|
||||
|
||||
def get_dvr_routers_to_remove(self, context, deleted_port):
|
||||
def get_dvr_routers_to_remove(self, context, deleted_port,
|
||||
get_related_hosts_info=True):
|
||||
"""Returns info about which routers should be removed
|
||||
|
||||
In case dvr serviceable port was deleted we need to check
|
||||
@ -186,7 +200,6 @@ class L3_DVRsch_db_mixin(l3agent_sch_db.L3AgentSchedulerDbMixin):
|
||||
subnet_ids = [ip['subnet_id'] for ip in deleted_port['fixed_ips']]
|
||||
router_ids = self.get_dvr_routers_by_subnet_ids(admin_context,
|
||||
subnet_ids)
|
||||
|
||||
if not router_ids:
|
||||
LOG.debug('No DVR routers for this DVR port %(port)s '
|
||||
'on host %(host)s', {'port': deleted_port['id'],
|
||||
@ -195,38 +208,78 @@ class L3_DVRsch_db_mixin(l3agent_sch_db.L3AgentSchedulerDbMixin):
|
||||
agent = self._get_agent_by_type_and_host(
|
||||
context, n_const.AGENT_TYPE_L3, port_host)
|
||||
removed_router_info = []
|
||||
# NOTE(Swami): If host has any serviceable ports,
|
||||
# we should not remove the router namespace of the
|
||||
# port as well as the connected routers namespace.
|
||||
# After all serviceable ports in the host for the
|
||||
# connected routers are deleted, then we can remove
|
||||
# the router namespace.
|
||||
host_has_serviceable_port = False
|
||||
for router_id in router_ids:
|
||||
if rb_obj.RouterL3AgentBinding.objects_exist(context,
|
||||
router_id=router_id,
|
||||
l3_agent_id=agent.id):
|
||||
# not removing from the agent hosting SNAT for the router
|
||||
continue
|
||||
subnet_ids = self.get_subnet_ids_on_router(admin_context,
|
||||
router_id)
|
||||
if self._check_dvr_serviceable_ports_on_host(
|
||||
admin_context, port_host, subnet_ids):
|
||||
continue
|
||||
filter_rtr = {'device_id': [router_id],
|
||||
'device_owner':
|
||||
[n_const.DEVICE_OWNER_DVR_INTERFACE]}
|
||||
int_ports = self._core_plugin.get_ports(
|
||||
admin_context, filters=filter_rtr)
|
||||
for port in int_ports:
|
||||
dvr_binding = (ml2_db.
|
||||
get_distributed_port_binding_by_host(
|
||||
context, port['id'], port_host))
|
||||
if dvr_binding:
|
||||
# unbind this port from router
|
||||
dvr_binding['router_id'] = None
|
||||
dvr_binding.update(dvr_binding)
|
||||
|
||||
if self._check_for_rtr_serviceable_ports(
|
||||
admin_context, router_id, port_host):
|
||||
# once we found a serviceable port there is no need to
|
||||
# check further
|
||||
host_has_serviceable_port = True
|
||||
break
|
||||
self._unbind_dvr_port_before_delete(context, router_id, port_host)
|
||||
info = {'router_id': router_id, 'host': port_host,
|
||||
'agent_id': str(agent.id)}
|
||||
removed_router_info.append(info)
|
||||
LOG.debug('Router %(router_id)s on host %(host)s to be deleted',
|
||||
info)
|
||||
# Now collect the connected router info as well to remove
|
||||
# it from the agent, only if there is not a serviceable port.
|
||||
if not host_has_serviceable_port:
|
||||
related_router_ids = set()
|
||||
for router_id in router_ids:
|
||||
connected_dvr_router_ids = set(
|
||||
self._get_other_dvr_router_ids_connected_router(
|
||||
context, router_id))
|
||||
related_router_ids |= connected_dvr_router_ids
|
||||
related_router_ids = [r_id for r_id in related_router_ids
|
||||
if r_id not in list(router_ids)]
|
||||
for router_id in related_router_ids:
|
||||
if self._check_for_rtr_serviceable_ports(
|
||||
admin_context, router_id, port_host):
|
||||
# once we found a serviceable port there is no need to
|
||||
# check further
|
||||
host_has_serviceable_port = True
|
||||
break
|
||||
self._unbind_dvr_port_before_delete(context, router_id,
|
||||
port_host)
|
||||
info = {'router_id': router_id, 'host': port_host,
|
||||
'agent_id': str(agent.id)}
|
||||
removed_router_info.append(info)
|
||||
LOG.debug("Router info to be deleted: %s", removed_router_info)
|
||||
return removed_router_info
|
||||
|
||||
def _check_for_rtr_serviceable_ports(
|
||||
self, admin_context, router_id, port_host):
|
||||
subnet_ids = self.get_subnet_ids_on_router(admin_context,
|
||||
router_id)
|
||||
return self._check_dvr_serviceable_ports_on_host(
|
||||
admin_context, port_host, subnet_ids)
|
||||
|
||||
def _unbind_dvr_port_before_delete(
|
||||
self, context, router_id, port_host):
|
||||
filter_rtr = {'device_id': [router_id],
|
||||
'device_owner':
|
||||
[n_const.DEVICE_OWNER_DVR_INTERFACE]}
|
||||
int_ports = self._core_plugin.get_ports(
|
||||
context.elevated(), filters=filter_rtr)
|
||||
for port in int_ports:
|
||||
dvr_binding = (ml2_db.
|
||||
get_distributed_port_binding_by_host(
|
||||
context, port['id'], port_host))
|
||||
if dvr_binding:
|
||||
# unbind this port from router
|
||||
dvr_binding['router_id'] = None
|
||||
dvr_binding.update(dvr_binding)
|
||||
|
||||
def _get_active_l3_agent_routers_sync_data(self, context, host, agent,
|
||||
router_ids):
|
||||
if extensions.is_extension_supported(
|
||||
@ -241,10 +294,11 @@ class L3_DVRsch_db_mixin(l3agent_sch_db.L3AgentSchedulerDbMixin):
|
||||
"""Returns all hosts to send notification about router update"""
|
||||
hosts = super(L3_DVRsch_db_mixin, self).get_hosts_to_notify(
|
||||
context, router_id)
|
||||
router = self.get_router(context, router_id)
|
||||
router = self.get_router(context.elevated(), router_id)
|
||||
if router.get('distributed', False):
|
||||
dvr_hosts = self._get_dvr_hosts_for_router(context, router_id)
|
||||
dvr_hosts = set(dvr_hosts) - set(hosts)
|
||||
dvr_hosts |= self._get_other_dvr_hosts(context, router_id)
|
||||
state = agentschedulers_db.get_admin_state_up_filter()
|
||||
agents = self.get_l3_agents(context, active=state,
|
||||
filters={'host': dvr_hosts})
|
||||
@ -264,6 +318,25 @@ class L3_DVRsch_db_mixin(l3agent_sch_db.L3AgentSchedulerDbMixin):
|
||||
LOG.debug('Hosts for router %s: %s', router_id, hosts)
|
||||
return hosts
|
||||
|
||||
def _get_other_dvr_hosts(self, context, router_id):
|
||||
"""Get a list of hosts where specified DVR router should be hosted
|
||||
|
||||
It will search DVR hosts based on other dvr routers connected to the
|
||||
router.
|
||||
"""
|
||||
dvr_hosts = set()
|
||||
connected_dvr_routers = (
|
||||
self._get_other_dvr_router_ids_connected_router(
|
||||
context, router_id))
|
||||
for dvr_router in connected_dvr_routers:
|
||||
dvr_hosts |= set(
|
||||
self._get_dvr_hosts_for_router(context, dvr_router))
|
||||
|
||||
LOG.debug('Hosts for other DVR routers connected to router '
|
||||
'%(router_id)s: %(dvr_hosts)s',
|
||||
{'router_id': router_id, 'dvr_hosts': dvr_hosts})
|
||||
return dvr_hosts
|
||||
|
||||
def _get_dvr_hosts_for_subnets(self, context, subnet_ids):
|
||||
"""Get a list of hosts with DVR servicable ports on subnet_ids."""
|
||||
Binding = ml2_models.PortBinding
|
||||
@ -309,6 +382,22 @@ class L3_DVRsch_db_mixin(l3agent_sch_db.L3AgentSchedulerDbMixin):
|
||||
LOG.debug('DVR routers on host %s: %s', host, router_ids)
|
||||
return router_ids
|
||||
|
||||
def _get_other_dvr_router_ids_connected_router(self, context, router_id):
|
||||
# TODO(slaweq): move this method to RouterPort OVO object
|
||||
subnet_ids = self.get_subnet_ids_on_router(context, router_id)
|
||||
RouterPort = l3_models.RouterPort
|
||||
query = context.elevated().session.query(RouterPort.router_id)
|
||||
query = query.join(models_v2.Port)
|
||||
query = query.join(
|
||||
models_v2.Subnet,
|
||||
models_v2.Subnet.network_id == models_v2.Port.network_id)
|
||||
query = query.filter(
|
||||
models_v2.Subnet.id.in_(subnet_ids),
|
||||
RouterPort.port_type == n_const.DEVICE_OWNER_DVR_INTERFACE
|
||||
).distinct()
|
||||
query = query.filter(RouterPort.router_id != router_id)
|
||||
return [item[0] for item in query]
|
||||
|
||||
def _get_router_ids_for_agent(self, context, agent_db, router_ids):
|
||||
result_set = set(super(L3_DVRsch_db_mixin,
|
||||
self)._get_router_ids_for_agent(
|
||||
@ -325,9 +414,10 @@ class L3_DVRsch_db_mixin(l3agent_sch_db.L3AgentSchedulerDbMixin):
|
||||
[n_const.L3_AGENT_MODE_DVR,
|
||||
n_const.L3_AGENT_MODE_DVR_NO_EXTERNAL,
|
||||
n_const.L3_AGENT_MODE_DVR_SNAT]):
|
||||
dvr_routers = self._get_dvr_router_ids_for_host(context,
|
||||
agent_db['host'])
|
||||
if not router_ids:
|
||||
result_set |= set(self._get_dvr_router_ids_for_host(
|
||||
context, agent_db['host']))
|
||||
result_set |= set(dvr_routers)
|
||||
else:
|
||||
for router_id in (router_ids - result_set):
|
||||
subnet_ids = self.get_subnet_ids_on_router(
|
||||
@ -338,6 +428,11 @@ class L3_DVRsch_db_mixin(l3agent_sch_db.L3AgentSchedulerDbMixin):
|
||||
list(subnet_ids))):
|
||||
result_set.add(router_id)
|
||||
|
||||
for dvr_router in dvr_routers:
|
||||
result_set |= set(
|
||||
self._get_other_dvr_router_ids_connected_router(
|
||||
context, dvr_router))
|
||||
|
||||
return list(result_set)
|
||||
|
||||
def _check_dvr_serviceable_ports_on_host(self, context, host, subnet_ids):
|
||||
@ -414,6 +509,7 @@ def _notify_l3_agent_new_port(resource, event, trigger, **kwargs):
|
||||
def _notify_port_delete(event, resource, trigger, **kwargs):
|
||||
context = kwargs['context']
|
||||
port = kwargs['port']
|
||||
get_related_hosts_info = kwargs.get("get_related_hosts_info", True)
|
||||
l3plugin = directory.get_plugin(plugin_constants.L3)
|
||||
if port:
|
||||
port_host = port.get(portbindings.HOST_ID)
|
||||
@ -423,7 +519,8 @@ def _notify_port_delete(event, resource, trigger, **kwargs):
|
||||
_dvr_handle_unbound_allowed_addr_pair_del(
|
||||
l3plugin, context, port, address_pair)
|
||||
l3plugin.delete_arp_entry_for_dvr_service_port(context, port)
|
||||
removed_routers = l3plugin.get_dvr_routers_to_remove(context, port)
|
||||
removed_routers = l3plugin.get_dvr_routers_to_remove(
|
||||
context, port, get_related_hosts_info)
|
||||
for info in removed_routers:
|
||||
l3plugin.l3_rpc_notifier.router_removed_from_agent(
|
||||
context, info['router_id'], info['host'])
|
||||
@ -452,12 +549,14 @@ def _notify_l3_agent_port_update(resource, event, trigger, **kwargs):
|
||||
if is_bound_port_moved:
|
||||
removed_routers = l3plugin.get_dvr_routers_to_remove(
|
||||
context,
|
||||
original_port)
|
||||
original_port,
|
||||
get_related_hosts_info=False)
|
||||
if removed_routers:
|
||||
removed_router_args = {
|
||||
'context': context,
|
||||
'port': original_port,
|
||||
'removed_routers': removed_routers,
|
||||
'get_related_hosts_info': False,
|
||||
}
|
||||
_notify_port_delete(
|
||||
event, resource, trigger, **removed_router_args)
|
||||
|
@ -766,11 +766,12 @@ class L3DvrTestCase(L3DvrTestCaseBase):
|
||||
floating_ip = self.l3_plugin.create_floatingip(
|
||||
self.context, {'floatingip': floating_ip})
|
||||
expected_routers_updated_calls = [
|
||||
mock.call(self.context, mock.ANY, HOST1),
|
||||
mock.call(self.context, mock.ANY, HOST2),
|
||||
mock.call(self.context, mock.ANY, 'host0')]
|
||||
mock.call(self.context, mock.ANY, 'host0'),
|
||||
mock.call(self.context, mock.ANY, HOST1),
|
||||
mock.call(self.context, mock.ANY, HOST1),
|
||||
mock.call(self.context, mock.ANY, HOST2)]
|
||||
l3_notifier.routers_updated_on_host.assert_has_calls(
|
||||
expected_routers_updated_calls)
|
||||
expected_routers_updated_calls, any_order=True)
|
||||
self.assertFalse(l3_notifier.routers_updated.called)
|
||||
router_info = (
|
||||
self.l3_plugin.list_active_sync_routers_on_active_l3_agent(
|
||||
@ -1089,7 +1090,7 @@ class L3DvrTestCase(L3DvrTestCaseBase):
|
||||
{'port': {
|
||||
'allowed_address_pairs': allowed_address_pairs}})
|
||||
self.assertEqual(
|
||||
2, l3_notifier.routers_updated_on_host.call_count)
|
||||
3, l3_notifier.routers_updated_on_host.call_count)
|
||||
updated_vm_port1 = self.core_plugin.get_port(
|
||||
self.context, vm_port['id'])
|
||||
updated_vm_port2 = self.core_plugin.get_port(
|
||||
@ -1139,9 +1140,10 @@ class L3DvrTestCase(L3DvrTestCaseBase):
|
||||
expected_routers_updated_calls = [
|
||||
mock.call(self.context, mock.ANY, HOST1),
|
||||
mock.call(self.context, mock.ANY, HOST2),
|
||||
mock.call(self.context, mock.ANY, HOST1),
|
||||
mock.call(self.context, mock.ANY, 'host0')]
|
||||
l3_notifier.routers_updated_on_host.assert_has_calls(
|
||||
expected_routers_updated_calls)
|
||||
expected_routers_updated_calls, any_order=True)
|
||||
self.assertFalse(l3_notifier.routers_updated.called)
|
||||
router_info = (
|
||||
self.l3_plugin.list_active_sync_routers_on_active_l3_agent(
|
||||
|
@ -2472,12 +2472,17 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework):
|
||||
agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
|
||||
agent.fullsync = False
|
||||
agent._process_router_if_compatible = mock.Mock()
|
||||
router_id = _uuid()
|
||||
router = {'id': router_id,
|
||||
'external_gateway_info': {'network_id': 'aaa'}}
|
||||
self.plugin_api.get_routers.return_value = [router]
|
||||
if ext_net_call_failed:
|
||||
agent._process_router_if_compatible.side_effect = (
|
||||
oslo_messaging.MessagingTimeout)
|
||||
agent._queue = mock.Mock()
|
||||
agent._resync_router = mock.Mock()
|
||||
update = mock.Mock()
|
||||
update.id = router_id
|
||||
update.resource = None
|
||||
agent._queue.each_update_to_next_resource.side_effect = [
|
||||
[(None, update)]]
|
||||
@ -2494,6 +2499,8 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework):
|
||||
|
||||
def test_process_routers_update_resyncs_failed_router(self):
|
||||
agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
|
||||
router_id = _uuid()
|
||||
router = {'id': router_id}
|
||||
|
||||
# Attempting to configure the router will fail
|
||||
agent._process_router_if_compatible = mock.MagicMock()
|
||||
@ -2501,9 +2508,9 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework):
|
||||
|
||||
# Queue an update from a full sync
|
||||
update = resource_processing_queue.ResourceUpdate(
|
||||
42,
|
||||
router_id,
|
||||
l3_agent.PRIORITY_SYNC_ROUTERS_TASK,
|
||||
resource=mock.Mock(),
|
||||
resource=router,
|
||||
timestamp=timeutils.utcnow())
|
||||
agent._queue.add(update)
|
||||
agent._process_router_update()
|
||||
@ -2555,6 +2562,79 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework):
|
||||
def test_process_routers_update_router_deleted_error(self):
|
||||
self._test_process_routers_update_router_deleted(error=True)
|
||||
|
||||
def test_process_routers_if_compatible(self):
|
||||
agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
|
||||
router = {'id': _uuid()}
|
||||
related_router = {'id': _uuid()}
|
||||
routers = [router, related_router]
|
||||
self.plugin_api.get_routers.return_value = routers
|
||||
update = resource_processing_queue.ResourceUpdate(
|
||||
router['id'], l3_agent.PRIORITY_RPC, resource=router)
|
||||
|
||||
events_queue = []
|
||||
|
||||
def add_mock(update):
|
||||
events_queue.append(update)
|
||||
|
||||
agent._queue = mock.Mock()
|
||||
agent._queue.add.side_effect = add_mock
|
||||
|
||||
with mock.patch.object(
|
||||
agent, "_process_router_if_compatible"
|
||||
) as process_router_if_compatible, mock.patch.object(
|
||||
agent, "_safe_router_removed"
|
||||
) as safe_router_removed:
|
||||
self.assertTrue(
|
||||
agent._process_routers_if_compatible(routers, update))
|
||||
process_router_if_compatible.assert_called_once_with(
|
||||
router)
|
||||
safe_router_removed.assert_not_called()
|
||||
self.assertEqual(1, len(events_queue))
|
||||
self.assertEqual(related_router['id'], events_queue[0].id)
|
||||
self.assertEqual(l3_agent.PRIORITY_RELATED_ROUTER,
|
||||
events_queue[0].priority)
|
||||
|
||||
def test_process_routers_if_compatible_router_not_compatible(self):
|
||||
agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
|
||||
router = {'id': _uuid()}
|
||||
agent.router_info = [router['id']]
|
||||
self.plugin_api.get_routers.return_value = [router]
|
||||
update = resource_processing_queue.ResourceUpdate(
|
||||
router['id'], l3_agent.PRIORITY_RPC, resource=router)
|
||||
|
||||
with mock.patch.object(
|
||||
agent, "_process_router_if_compatible",
|
||||
side_effect=n_exc.RouterNotCompatibleWithAgent(
|
||||
router_id=router['id'])
|
||||
) as process_router_if_compatible, mock.patch.object(
|
||||
agent, "_safe_router_removed"
|
||||
) as safe_router_removed:
|
||||
self.assertTrue(
|
||||
agent._process_routers_if_compatible([router], update))
|
||||
process_router_if_compatible.assert_called_once_with(
|
||||
router)
|
||||
safe_router_removed.assert_called_once_with(router['id'])
|
||||
|
||||
def test_process_routers_if_compatible_error(self):
|
||||
agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
|
||||
router = {'id': _uuid()}
|
||||
self.plugin_api.get_routers.return_value = [router]
|
||||
update = resource_processing_queue.ResourceUpdate(
|
||||
router['id'], l3_agent.PRIORITY_RPC, resource=router)
|
||||
|
||||
with mock.patch.object(
|
||||
agent, "_process_router_if_compatible",
|
||||
side_effect=Exception(
|
||||
"Test failure during _process_routers_if_compatible")
|
||||
) as process_router_if_compatible, mock.patch.object(
|
||||
agent, "_safe_router_removed"
|
||||
) as safe_router_removed:
|
||||
self.assertFalse(
|
||||
agent._process_routers_if_compatible([router], update))
|
||||
process_router_if_compatible.assert_called_once_with(
|
||||
router)
|
||||
safe_router_removed.assert_not_called()
|
||||
|
||||
def test_process_ha_dvr_router_if_compatible_no_ha_interface(self):
|
||||
agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
|
||||
agent.conf.agent_mode = 'dvr_snat'
|
||||
|
@ -1283,16 +1283,24 @@ class L3DvrSchedulerTestCase(L3SchedulerBaseMixin,
|
||||
'.L3AgentNotifyAPI'),\
|
||||
mock.patch.object(
|
||||
self.dut, 'get_l3_agents',
|
||||
return_value=[agent_on_host]) as get_l3_agents:
|
||||
return_value=[agent_on_host]) as get_l3_agents,\
|
||||
mock.patch.object(
|
||||
self.dut, 'get_hosts_to_notify',
|
||||
return_value=['other_host']),\
|
||||
mock.patch.object(
|
||||
self.dut, '_check_for_rtr_serviceable_ports',
|
||||
return_value=True):
|
||||
|
||||
self.dut.dvr_handle_new_service_port(
|
||||
self.adminContext, port)
|
||||
|
||||
get_l3_agents.assert_called_once_with(
|
||||
self.adminContext,
|
||||
filters={'host': [port[portbindings.HOST_ID]]})
|
||||
(self.dut.l3_rpc_notifier.routers_updated_on_host.
|
||||
assert_called_once_with(
|
||||
self.adminContext, {'r1', 'r2'}, 'host1'))
|
||||
self.dut.l3_rpc_notifier.routers_updated_on_host.assert_has_calls(
|
||||
[mock.call(self.adminContext, {'r1', 'r2'}, 'host1'),
|
||||
mock.call(self.adminContext, {'r1', 'r2'}, 'other_host')],
|
||||
any_order=True)
|
||||
self.assertFalse(self.dut.l3_rpc_notifier.routers_updated.called)
|
||||
|
||||
def test_get_dvr_routers_by_subnet_ids(self):
|
||||
|
Loading…
x
Reference in New Issue
Block a user