Fix connection between 2 dvr routers
In case when 2 dvr routers are connected to each other with tenant network, those routers needs to be always deployed on same compute nodes. So this patch changes dvr routers scheduler that it will create dvr router on each host on which there are vms or other dvr routers connected to same subnets. Co-Authored-By: Swaminathan Vasudevan <SVasudevan@suse.com> Closes-Bug: #1786272 Change-Id: I579c2522f8aed2b4388afacba34d9ffdc26708e3
This commit is contained in:
parent
94a5026fb5
commit
5018d70241
@ -65,12 +65,21 @@ LOG = logging.getLogger(__name__)
|
||||
SYNC_ROUTERS_MAX_CHUNK_SIZE = 256
|
||||
SYNC_ROUTERS_MIN_CHUNK_SIZE = 32
|
||||
|
||||
# Lower value is higher priority
|
||||
PRIORITY_RPC = 0
|
||||
PRIORITY_SYNC_ROUTERS_TASK = 1
|
||||
PRIORITY_PD_UPDATE = 2
|
||||
# Priorities - lower value is higher priority
|
||||
PRIORITY_RELATED_ROUTER = 0
|
||||
PRIORITY_RPC = 1
|
||||
PRIORITY_SYNC_ROUTERS_TASK = 2
|
||||
PRIORITY_PD_UPDATE = 3
|
||||
|
||||
# Actions
|
||||
DELETE_ROUTER = 1
|
||||
PD_UPDATE = 2
|
||||
DELETE_RELATED_ROUTER = 2
|
||||
ADD_UPDATE_ROUTER = 3
|
||||
ADD_UPDATE_RELATED_ROUTER = 4
|
||||
PD_UPDATE = 5
|
||||
|
||||
RELATED_ACTION_MAP = {DELETE_ROUTER: DELETE_RELATED_ROUTER,
|
||||
ADD_UPDATE_ROUTER: ADD_UPDATE_RELATED_ROUTER}
|
||||
|
||||
|
||||
def log_verbose_exc(message, router_payload):
|
||||
@ -436,7 +445,8 @@ class L3NATAgent(ha.AgentMixin,
|
||||
if isinstance(routers[0], dict):
|
||||
routers = [router['id'] for router in routers]
|
||||
for id in routers:
|
||||
update = queue.ResourceUpdate(id, PRIORITY_RPC)
|
||||
update = queue.ResourceUpdate(
|
||||
id, PRIORITY_RPC, action=ADD_UPDATE_ROUTER)
|
||||
self._queue.add(update)
|
||||
|
||||
def router_removed_from_agent(self, context, payload):
|
||||
@ -537,8 +547,14 @@ class L3NATAgent(ha.AgentMixin,
|
||||
self.pd.process_prefix_update()
|
||||
LOG.debug("Finished a router update for %s", update.id)
|
||||
continue
|
||||
router = update.resource
|
||||
if update.action != DELETE_ROUTER and not router:
|
||||
|
||||
routers = [update.resource] if update.resource else []
|
||||
|
||||
not_delete_no_routers = (update.action != DELETE_ROUTER and
|
||||
not routers)
|
||||
related_action = update.action in (DELETE_RELATED_ROUTER,
|
||||
ADD_UPDATE_RELATED_ROUTER)
|
||||
if not_delete_no_routers or related_action:
|
||||
try:
|
||||
update.timestamp = timeutils.utcnow()
|
||||
routers = self.plugin_rpc.get_routers(self.context,
|
||||
@ -549,10 +565,13 @@ class L3NATAgent(ha.AgentMixin,
|
||||
self._resync_router(update)
|
||||
continue
|
||||
|
||||
if routers:
|
||||
router = routers[0]
|
||||
# For a related action, verify the router is still hosted here,
|
||||
# since it could have just been deleted and we don't want to
|
||||
# add it back.
|
||||
if related_action:
|
||||
routers = [r for r in routers if r['id'] == update.id]
|
||||
|
||||
if not router:
|
||||
if not routers:
|
||||
removed = self._safe_router_removed(update.id)
|
||||
if not removed:
|
||||
self._resync_router(update)
|
||||
@ -565,6 +584,41 @@ class L3NATAgent(ha.AgentMixin,
|
||||
LOG.debug("Finished a router update for %s", update.id)
|
||||
continue
|
||||
|
||||
if not self._process_routers_if_compatible(routers, update):
|
||||
self._resync_router(update)
|
||||
continue
|
||||
|
||||
rp.fetched_and_processed(update.timestamp)
|
||||
LOG.debug("Finished a router update for %s", update.id)
|
||||
|
||||
def _process_routers_if_compatible(self, routers, update):
|
||||
process_result = True
|
||||
for router in routers:
|
||||
if router['id'] != update.id:
|
||||
# Don't do the work here, instead create a new update and
|
||||
# enqueue it, since there could be another thread working
|
||||
# on it already and we don't want to race.
|
||||
new_action = RELATED_ACTION_MAP.get(
|
||||
update.action, ADD_UPDATE_RELATED_ROUTER)
|
||||
new_update = queue.ResourceUpdate(
|
||||
router['id'],
|
||||
priority=PRIORITY_RELATED_ROUTER,
|
||||
action=new_action)
|
||||
self._queue.add(new_update)
|
||||
LOG.debug('Queued a router update for %(router_id)s '
|
||||
'(related router %(related_router_id)s). '
|
||||
'Original event action %(action)s, '
|
||||
'priority %(priority)s. '
|
||||
'New event action %(new_action)s, '
|
||||
'priority %(new_priority)s',
|
||||
{'router_id': router['id'],
|
||||
'related_router_id': update.id,
|
||||
'action': update.action,
|
||||
'priority': update.priority,
|
||||
'new_action': new_update.action,
|
||||
'new_priority': new_update.priority})
|
||||
continue
|
||||
|
||||
try:
|
||||
self._process_router_if_compatible(router)
|
||||
except n_exc.RouterNotCompatibleWithAgent as e:
|
||||
@ -578,11 +632,8 @@ class L3NATAgent(ha.AgentMixin,
|
||||
log_verbose_exc(
|
||||
"Failed to process compatible router: %s" % update.id,
|
||||
router)
|
||||
self._resync_router(update)
|
||||
continue
|
||||
|
||||
LOG.debug("Finished a router update for %s", update.id)
|
||||
rp.fetched_and_processed(update.timestamp)
|
||||
process_result = False
|
||||
return process_result
|
||||
|
||||
def _process_routers_loop(self):
|
||||
LOG.debug("Starting _process_routers_loop")
|
||||
@ -645,6 +696,7 @@ class L3NATAgent(ha.AgentMixin,
|
||||
r['id'],
|
||||
PRIORITY_SYNC_ROUTERS_TASK,
|
||||
resource=r,
|
||||
action=ADD_UPDATE_ROUTER,
|
||||
timestamp=timestamp)
|
||||
self._queue.add(update)
|
||||
except oslo_messaging.MessagingTimeout:
|
||||
|
@ -66,6 +66,9 @@ class DVRResourceOperationHandler(object):
|
||||
necessary.
|
||||
"""
|
||||
|
||||
related_dvr_router_hosts = {}
|
||||
related_dvr_router_routers = {}
|
||||
|
||||
@property
|
||||
def l3plugin(self):
|
||||
return directory.get_plugin(plugin_constants.L3)
|
||||
@ -517,6 +520,33 @@ class DVRResourceOperationHandler(object):
|
||||
return True
|
||||
return False
|
||||
|
||||
@registry.receives(resources.ROUTER_INTERFACE, [events.BEFORE_DELETE])
|
||||
def _cache_related_dvr_routers_info_before_interface_removal(
|
||||
self, resource, event, trigger, context, **kwargs):
|
||||
router_id = kwargs.get("router_id")
|
||||
subnet_id = kwargs.get("subnet_id")
|
||||
|
||||
router = self.l3plugin._get_router(context, router_id)
|
||||
if not router.extra_attributes.distributed:
|
||||
return
|
||||
|
||||
cache_key = (router_id, subnet_id)
|
||||
try:
|
||||
existing_hosts = self.related_dvr_router_hosts[cache_key]
|
||||
except KeyError:
|
||||
existing_hosts = set()
|
||||
other_hosts = set(self._get_other_dvr_hosts(context, router_id))
|
||||
self.related_dvr_router_hosts[cache_key] = existing_hosts | other_hosts
|
||||
|
||||
try:
|
||||
existing_routers = self.related_dvr_router_routers[cache_key]
|
||||
except KeyError:
|
||||
existing_routers = set()
|
||||
other_routers = set(self._get_other_dvr_router_ids_connected_router(
|
||||
context, router_id))
|
||||
self.related_dvr_router_routers[cache_key] = (
|
||||
existing_routers | other_routers)
|
||||
|
||||
@registry.receives(resources.ROUTER_INTERFACE, [events.AFTER_DELETE])
|
||||
@db_api.retry_if_session_inactive()
|
||||
def _cleanup_after_interface_removal(self, resource, event, trigger,
|
||||
@ -536,27 +566,98 @@ class DVRResourceOperationHandler(object):
|
||||
context, router_id)
|
||||
removed_hosts = set(router_hosts_for_removed) - set(router_hosts_after)
|
||||
if removed_hosts:
|
||||
# Get hosts where this router is placed as "related" to other dvr
|
||||
# routers and don't remove it from such hosts
|
||||
related_hosts = self._get_other_dvr_hosts(context, router_id)
|
||||
agents = self.l3plugin.get_l3_agents(
|
||||
context, filters={'host': removed_hosts})
|
||||
bindings = rb_obj.RouterL3AgentBinding.get_objects(
|
||||
context, router_id=router_id)
|
||||
snat_binding = bindings.pop() if bindings else None
|
||||
connected_dvr_routers = set(
|
||||
self.l3plugin._get_other_dvr_router_ids_connected_router(
|
||||
context, router_id))
|
||||
for agent in agents:
|
||||
is_this_snat_agent = (
|
||||
snat_binding and snat_binding.l3_agent_id == agent['id'])
|
||||
if (not is_this_snat_agent and
|
||||
agent['host'] not in related_hosts):
|
||||
self.l3plugin.l3_rpc_notifier.router_removed_from_agent(
|
||||
context, router_id, agent['host'])
|
||||
for connected_router_id in connected_dvr_routers:
|
||||
connected_router_hosts = set(
|
||||
self.l3plugin._get_dvr_hosts_for_router(
|
||||
context, connected_router_id))
|
||||
connected_router_hosts |= set(
|
||||
self._get_other_dvr_hosts(
|
||||
context, connected_router_id))
|
||||
if agent['host'] not in connected_router_hosts:
|
||||
self.l3plugin.l3_rpc_notifier.\
|
||||
router_removed_from_agent(
|
||||
context, connected_router_id,
|
||||
agent['host'])
|
||||
# if subnet_id not in interface_info, request was to remove by port
|
||||
sub_id = (interface_info.get('subnet_id') or
|
||||
port['fixed_ips'][0]['subnet_id'])
|
||||
self._cleanup_related_hosts_after_interface_removal(
|
||||
context, router_id, sub_id)
|
||||
self._cleanup_related_routers_after_interface_removal(
|
||||
context, router_id, sub_id)
|
||||
is_multiple_prefix_csport = (
|
||||
self._check_for_multiprefix_csnat_port_and_update(
|
||||
context, router, port['network_id'], sub_id))
|
||||
if not is_multiple_prefix_csport:
|
||||
# Single prefix port - go ahead and delete the port
|
||||
self.delete_csnat_router_interface_ports(
|
||||
context.elevated(), router, subnet_id=sub_id)
|
||||
|
||||
def _cleanup_related_hosts_after_interface_removal(
|
||||
self, context, router_id, subnet_id):
|
||||
router_hosts = self.l3plugin._get_dvr_hosts_for_router(
|
||||
context, router_id)
|
||||
|
||||
cache_key = (router_id, subnet_id)
|
||||
related_dvr_router_hosts_before = self.related_dvr_router_hosts.pop(
|
||||
cache_key, set())
|
||||
related_dvr_router_hosts_after = set(self._get_other_dvr_hosts(
|
||||
context, router_id))
|
||||
related_dvr_router_hosts_before -= set(router_hosts)
|
||||
related_removed_hosts = (
|
||||
related_dvr_router_hosts_before - related_dvr_router_hosts_after)
|
||||
if related_removed_hosts:
|
||||
agents = self.l3plugin.get_l3_agents(
|
||||
context, filters={'host': related_removed_hosts})
|
||||
bindings = rb_obj.RouterL3AgentBinding.get_objects(
|
||||
context, router_id=router_id)
|
||||
snat_binding = bindings.pop() if bindings else None
|
||||
for agent in agents:
|
||||
is_this_snat_agent = (
|
||||
snat_binding and snat_binding.l3_agent_id == agent['id'])
|
||||
if not is_this_snat_agent:
|
||||
self.l3plugin.l3_rpc_notifier.router_removed_from_agent(
|
||||
context, router_id, agent['host'])
|
||||
# if subnet_id not in interface_info, request was to remove by port
|
||||
sub_id = (interface_info.get('subnet_id') or
|
||||
port['fixed_ips'][0]['subnet_id'])
|
||||
is_multiple_prefix_csport = (
|
||||
self._check_for_multiprefix_csnat_port_and_update(
|
||||
context, router, port['network_id'], sub_id))
|
||||
if not is_multiple_prefix_csport:
|
||||
# Single prefix port - go ahead and delete the port
|
||||
self.delete_csnat_router_interface_ports(
|
||||
context.elevated(), router, subnet_id=sub_id)
|
||||
|
||||
def _cleanup_related_routers_after_interface_removal(
|
||||
self, context, router_id, subnet_id):
|
||||
router_hosts = self.l3plugin._get_dvr_hosts_for_router(
|
||||
context, router_id)
|
||||
|
||||
cache_key = (router_id, subnet_id)
|
||||
related_dvr_routers_before = self.related_dvr_router_routers.pop(
|
||||
cache_key, set())
|
||||
related_dvr_routers_after = set(
|
||||
self._get_other_dvr_router_ids_connected_router(
|
||||
context, router_id))
|
||||
related_routers_to_remove = (
|
||||
related_dvr_routers_before - related_dvr_routers_after)
|
||||
|
||||
for related_router in related_routers_to_remove:
|
||||
related_router_hosts = self.l3plugin._get_dvr_hosts_for_router(
|
||||
context, related_router)
|
||||
hosts_to_remove = set(router_hosts) - set(related_router_hosts)
|
||||
for host in hosts_to_remove:
|
||||
self.l3plugin.l3_rpc_notifier.router_removed_from_agent(
|
||||
context, related_router, host)
|
||||
|
||||
def delete_csnat_router_interface_ports(self, context,
|
||||
router, subnet_id=None):
|
||||
|
@ -20,15 +20,16 @@ import neutron.db.l3_hascheduler_db as l3_ha_sch_db
|
||||
class L3_DVR_HA_scheduler_db_mixin(l3agent_dvr_sch_db.L3_DVRsch_db_mixin,
|
||||
l3_ha_sch_db.L3_HA_scheduler_db_mixin):
|
||||
|
||||
def get_dvr_routers_to_remove(self, context, port_id):
|
||||
def get_dvr_routers_to_remove(self, context, port_id,
|
||||
get_related_hosts_info=True):
|
||||
"""Returns info about which routers should be removed
|
||||
|
||||
In case dvr serviceable port was deleted we need to check
|
||||
if any dvr routers should be removed from l3 agent on port's host
|
||||
"""
|
||||
remove_router_info = super(L3_DVR_HA_scheduler_db_mixin,
|
||||
self).get_dvr_routers_to_remove(context,
|
||||
port_id)
|
||||
remove_router_info = super(
|
||||
L3_DVR_HA_scheduler_db_mixin, self).get_dvr_routers_to_remove(
|
||||
context, port_id, get_related_hosts_info)
|
||||
# Process the router information which was returned to make
|
||||
# sure we don't delete routers which have dvrhs snat bindings.
|
||||
processed_remove_router_info = []
|
||||
|
@ -30,6 +30,7 @@ from neutron.common import utils as n_utils
|
||||
from neutron.db import agentschedulers_db
|
||||
from neutron.db import l3_agentschedulers_db as l3agent_sch_db
|
||||
from neutron.db import l3_dvr_db
|
||||
from neutron.db.models import l3 as l3_models
|
||||
from neutron.db import models_v2
|
||||
from neutron.objects import l3agent as rb_obj
|
||||
from neutron.plugins.ml2 import db as ml2_db
|
||||
@ -130,11 +131,23 @@ class L3_DVRsch_db_mixin(l3agent_sch_db.L3AgentSchedulerDbMixin):
|
||||
if agent.host == port_host:
|
||||
agent_port_host_match = True
|
||||
if not agent_port_host_match:
|
||||
LOG.debug('DVR: Handle new service port, host %(host)s, '
|
||||
'router ids %(router_ids)s',
|
||||
{'host': port_host, 'router_ids': router_ids})
|
||||
self.l3_rpc_notifier.routers_updated_on_host(
|
||||
context, router_ids, port_host)
|
||||
hosts = set([port_host])
|
||||
for router_id in router_ids:
|
||||
hosts |= set(self.get_hosts_to_notify(context, router_id))
|
||||
|
||||
for host in hosts:
|
||||
updated_routers = set()
|
||||
for router_id in router_ids:
|
||||
LOG.debug('DVR: Handle new service port, host %(host)s, '
|
||||
'router ids %(router_id)s',
|
||||
{'host': host, 'router_id': router_id})
|
||||
if self._check_for_rtr_serviceable_ports(
|
||||
context.elevated(), router_id, host):
|
||||
updated_routers.add(router_id)
|
||||
|
||||
if updated_routers:
|
||||
self.l3_rpc_notifier.routers_updated_on_host(
|
||||
context, updated_routers, host)
|
||||
|
||||
def get_dvr_snat_agent_list(self, context):
|
||||
agent_filters = {'agent_modes': [n_const.L3_AGENT_MODE_DVR_SNAT]}
|
||||
@ -172,7 +185,8 @@ class L3_DVRsch_db_mixin(l3agent_sch_db.L3AgentSchedulerDbMixin):
|
||||
'for router %s', router_id)
|
||||
return subnet_ids
|
||||
|
||||
def get_dvr_routers_to_remove(self, context, deleted_port):
|
||||
def get_dvr_routers_to_remove(self, context, deleted_port,
|
||||
get_related_hosts_info=True):
|
||||
"""Returns info about which routers should be removed
|
||||
|
||||
In case dvr serviceable port was deleted we need to check
|
||||
@ -186,7 +200,6 @@ class L3_DVRsch_db_mixin(l3agent_sch_db.L3AgentSchedulerDbMixin):
|
||||
subnet_ids = [ip['subnet_id'] for ip in deleted_port['fixed_ips']]
|
||||
router_ids = self.get_dvr_routers_by_subnet_ids(admin_context,
|
||||
subnet_ids)
|
||||
|
||||
if not router_ids:
|
||||
LOG.debug('No DVR routers for this DVR port %(port)s '
|
||||
'on host %(host)s', {'port': deleted_port['id'],
|
||||
@ -195,38 +208,78 @@ class L3_DVRsch_db_mixin(l3agent_sch_db.L3AgentSchedulerDbMixin):
|
||||
agent = self._get_agent_by_type_and_host(
|
||||
context, n_const.AGENT_TYPE_L3, port_host)
|
||||
removed_router_info = []
|
||||
# NOTE(Swami): If host has any serviceable ports,
|
||||
# we should not remove the router namespace of the
|
||||
# port as well as the connected routers namespace.
|
||||
# After all serviceable ports in the host for the
|
||||
# connected routers are deleted, then we can remove
|
||||
# the router namespace.
|
||||
host_has_serviceable_port = False
|
||||
for router_id in router_ids:
|
||||
if rb_obj.RouterL3AgentBinding.objects_exist(context,
|
||||
router_id=router_id,
|
||||
l3_agent_id=agent.id):
|
||||
# not removing from the agent hosting SNAT for the router
|
||||
continue
|
||||
subnet_ids = self.get_subnet_ids_on_router(admin_context,
|
||||
router_id)
|
||||
if self._check_dvr_serviceable_ports_on_host(
|
||||
admin_context, port_host, subnet_ids):
|
||||
continue
|
||||
filter_rtr = {'device_id': [router_id],
|
||||
'device_owner':
|
||||
[n_const.DEVICE_OWNER_DVR_INTERFACE]}
|
||||
int_ports = self._core_plugin.get_ports(
|
||||
admin_context, filters=filter_rtr)
|
||||
for port in int_ports:
|
||||
dvr_binding = (ml2_db.
|
||||
get_distributed_port_binding_by_host(
|
||||
context, port['id'], port_host))
|
||||
if dvr_binding:
|
||||
# unbind this port from router
|
||||
dvr_binding['router_id'] = None
|
||||
dvr_binding.update(dvr_binding)
|
||||
|
||||
if self._check_for_rtr_serviceable_ports(
|
||||
admin_context, router_id, port_host):
|
||||
# once we found a serviceable port there is no need to
|
||||
# check further
|
||||
host_has_serviceable_port = True
|
||||
break
|
||||
self._unbind_dvr_port_before_delete(context, router_id, port_host)
|
||||
info = {'router_id': router_id, 'host': port_host,
|
||||
'agent_id': str(agent.id)}
|
||||
removed_router_info.append(info)
|
||||
LOG.debug('Router %(router_id)s on host %(host)s to be deleted',
|
||||
info)
|
||||
# Now collect the connected router info as well to remove
|
||||
# it from the agent, only if there is not a serviceable port.
|
||||
if not host_has_serviceable_port:
|
||||
related_router_ids = set()
|
||||
for router_id in router_ids:
|
||||
connected_dvr_router_ids = set(
|
||||
self._get_other_dvr_router_ids_connected_router(
|
||||
context, router_id))
|
||||
related_router_ids |= connected_dvr_router_ids
|
||||
related_router_ids = [r_id for r_id in related_router_ids
|
||||
if r_id not in list(router_ids)]
|
||||
for router_id in related_router_ids:
|
||||
if self._check_for_rtr_serviceable_ports(
|
||||
admin_context, router_id, port_host):
|
||||
# once we found a serviceable port there is no need to
|
||||
# check further
|
||||
host_has_serviceable_port = True
|
||||
break
|
||||
self._unbind_dvr_port_before_delete(context, router_id,
|
||||
port_host)
|
||||
info = {'router_id': router_id, 'host': port_host,
|
||||
'agent_id': str(agent.id)}
|
||||
removed_router_info.append(info)
|
||||
LOG.debug("Router info to be deleted: %s", removed_router_info)
|
||||
return removed_router_info
|
||||
|
||||
def _check_for_rtr_serviceable_ports(
|
||||
self, admin_context, router_id, port_host):
|
||||
subnet_ids = self.get_subnet_ids_on_router(admin_context,
|
||||
router_id)
|
||||
return self._check_dvr_serviceable_ports_on_host(
|
||||
admin_context, port_host, subnet_ids)
|
||||
|
||||
def _unbind_dvr_port_before_delete(
|
||||
self, context, router_id, port_host):
|
||||
filter_rtr = {'device_id': [router_id],
|
||||
'device_owner':
|
||||
[n_const.DEVICE_OWNER_DVR_INTERFACE]}
|
||||
int_ports = self._core_plugin.get_ports(
|
||||
context.elevated(), filters=filter_rtr)
|
||||
for port in int_ports:
|
||||
dvr_binding = (ml2_db.
|
||||
get_distributed_port_binding_by_host(
|
||||
context, port['id'], port_host))
|
||||
if dvr_binding:
|
||||
# unbind this port from router
|
||||
dvr_binding['router_id'] = None
|
||||
dvr_binding.update(dvr_binding)
|
||||
|
||||
def _get_active_l3_agent_routers_sync_data(self, context, host, agent,
|
||||
router_ids):
|
||||
if extensions.is_extension_supported(
|
||||
@ -241,10 +294,11 @@ class L3_DVRsch_db_mixin(l3agent_sch_db.L3AgentSchedulerDbMixin):
|
||||
"""Returns all hosts to send notification about router update"""
|
||||
hosts = super(L3_DVRsch_db_mixin, self).get_hosts_to_notify(
|
||||
context, router_id)
|
||||
router = self.get_router(context, router_id)
|
||||
router = self.get_router(context.elevated(), router_id)
|
||||
if router.get('distributed', False):
|
||||
dvr_hosts = self._get_dvr_hosts_for_router(context, router_id)
|
||||
dvr_hosts = set(dvr_hosts) - set(hosts)
|
||||
dvr_hosts |= self._get_other_dvr_hosts(context, router_id)
|
||||
state = agentschedulers_db.get_admin_state_up_filter()
|
||||
agents = self.get_l3_agents(context, active=state,
|
||||
filters={'host': dvr_hosts})
|
||||
@ -264,6 +318,25 @@ class L3_DVRsch_db_mixin(l3agent_sch_db.L3AgentSchedulerDbMixin):
|
||||
LOG.debug('Hosts for router %s: %s', router_id, hosts)
|
||||
return hosts
|
||||
|
||||
def _get_other_dvr_hosts(self, context, router_id):
|
||||
"""Get a list of hosts where specified DVR router should be hosted
|
||||
|
||||
It will search DVR hosts based on other dvr routers connected to the
|
||||
router.
|
||||
"""
|
||||
dvr_hosts = set()
|
||||
connected_dvr_routers = (
|
||||
self._get_other_dvr_router_ids_connected_router(
|
||||
context, router_id))
|
||||
for dvr_router in connected_dvr_routers:
|
||||
dvr_hosts |= set(
|
||||
self._get_dvr_hosts_for_router(context, dvr_router))
|
||||
|
||||
LOG.debug('Hosts for other DVR routers connected to router '
|
||||
'%(router_id)s: %(dvr_hosts)s',
|
||||
{'router_id': router_id, 'dvr_hosts': dvr_hosts})
|
||||
return dvr_hosts
|
||||
|
||||
def _get_dvr_hosts_for_subnets(self, context, subnet_ids):
|
||||
"""Get a list of hosts with DVR servicable ports on subnet_ids."""
|
||||
Binding = ml2_models.PortBinding
|
||||
@ -309,6 +382,22 @@ class L3_DVRsch_db_mixin(l3agent_sch_db.L3AgentSchedulerDbMixin):
|
||||
LOG.debug('DVR routers on host %s: %s', host, router_ids)
|
||||
return router_ids
|
||||
|
||||
def _get_other_dvr_router_ids_connected_router(self, context, router_id):
|
||||
# TODO(slaweq): move this method to RouterPort OVO object
|
||||
subnet_ids = self.get_subnet_ids_on_router(context, router_id)
|
||||
RouterPort = l3_models.RouterPort
|
||||
query = context.elevated().session.query(RouterPort.router_id)
|
||||
query = query.join(models_v2.Port)
|
||||
query = query.join(
|
||||
models_v2.Subnet,
|
||||
models_v2.Subnet.network_id == models_v2.Port.network_id)
|
||||
query = query.filter(
|
||||
models_v2.Subnet.id.in_(subnet_ids),
|
||||
RouterPort.port_type == n_const.DEVICE_OWNER_DVR_INTERFACE
|
||||
).distinct()
|
||||
query = query.filter(RouterPort.router_id != router_id)
|
||||
return [item[0] for item in query]
|
||||
|
||||
def _get_router_ids_for_agent(self, context, agent_db, router_ids):
|
||||
result_set = set(super(L3_DVRsch_db_mixin,
|
||||
self)._get_router_ids_for_agent(
|
||||
@ -325,9 +414,10 @@ class L3_DVRsch_db_mixin(l3agent_sch_db.L3AgentSchedulerDbMixin):
|
||||
[n_const.L3_AGENT_MODE_DVR,
|
||||
n_const.L3_AGENT_MODE_DVR_NO_EXTERNAL,
|
||||
n_const.L3_AGENT_MODE_DVR_SNAT]):
|
||||
dvr_routers = self._get_dvr_router_ids_for_host(context,
|
||||
agent_db['host'])
|
||||
if not router_ids:
|
||||
result_set |= set(self._get_dvr_router_ids_for_host(
|
||||
context, agent_db['host']))
|
||||
result_set |= set(dvr_routers)
|
||||
else:
|
||||
for router_id in (router_ids - result_set):
|
||||
subnet_ids = self.get_subnet_ids_on_router(
|
||||
@ -338,6 +428,11 @@ class L3_DVRsch_db_mixin(l3agent_sch_db.L3AgentSchedulerDbMixin):
|
||||
list(subnet_ids))):
|
||||
result_set.add(router_id)
|
||||
|
||||
for dvr_router in dvr_routers:
|
||||
result_set |= set(
|
||||
self._get_other_dvr_router_ids_connected_router(
|
||||
context, dvr_router))
|
||||
|
||||
return list(result_set)
|
||||
|
||||
def _check_dvr_serviceable_ports_on_host(self, context, host, subnet_ids):
|
||||
@ -414,6 +509,7 @@ def _notify_l3_agent_new_port(resource, event, trigger, **kwargs):
|
||||
def _notify_port_delete(event, resource, trigger, **kwargs):
|
||||
context = kwargs['context']
|
||||
port = kwargs['port']
|
||||
get_related_hosts_info = kwargs.get("get_related_hosts_info", True)
|
||||
l3plugin = directory.get_plugin(plugin_constants.L3)
|
||||
if port:
|
||||
port_host = port.get(portbindings.HOST_ID)
|
||||
@ -423,7 +519,8 @@ def _notify_port_delete(event, resource, trigger, **kwargs):
|
||||
_dvr_handle_unbound_allowed_addr_pair_del(
|
||||
l3plugin, context, port, address_pair)
|
||||
l3plugin.delete_arp_entry_for_dvr_service_port(context, port)
|
||||
removed_routers = l3plugin.get_dvr_routers_to_remove(context, port)
|
||||
removed_routers = l3plugin.get_dvr_routers_to_remove(
|
||||
context, port, get_related_hosts_info)
|
||||
for info in removed_routers:
|
||||
l3plugin.l3_rpc_notifier.router_removed_from_agent(
|
||||
context, info['router_id'], info['host'])
|
||||
@ -452,12 +549,14 @@ def _notify_l3_agent_port_update(resource, event, trigger, **kwargs):
|
||||
if is_bound_port_moved:
|
||||
removed_routers = l3plugin.get_dvr_routers_to_remove(
|
||||
context,
|
||||
original_port)
|
||||
original_port,
|
||||
get_related_hosts_info=False)
|
||||
if removed_routers:
|
||||
removed_router_args = {
|
||||
'context': context,
|
||||
'port': original_port,
|
||||
'removed_routers': removed_routers,
|
||||
'get_related_hosts_info': False,
|
||||
}
|
||||
_notify_port_delete(
|
||||
event, resource, trigger, **removed_router_args)
|
||||
|
@ -766,11 +766,12 @@ class L3DvrTestCase(L3DvrTestCaseBase):
|
||||
floating_ip = self.l3_plugin.create_floatingip(
|
||||
self.context, {'floatingip': floating_ip})
|
||||
expected_routers_updated_calls = [
|
||||
mock.call(self.context, mock.ANY, HOST1),
|
||||
mock.call(self.context, mock.ANY, HOST2),
|
||||
mock.call(self.context, mock.ANY, 'host0')]
|
||||
mock.call(self.context, mock.ANY, 'host0'),
|
||||
mock.call(self.context, mock.ANY, HOST1),
|
||||
mock.call(self.context, mock.ANY, HOST1),
|
||||
mock.call(self.context, mock.ANY, HOST2)]
|
||||
l3_notifier.routers_updated_on_host.assert_has_calls(
|
||||
expected_routers_updated_calls)
|
||||
expected_routers_updated_calls, any_order=True)
|
||||
self.assertFalse(l3_notifier.routers_updated.called)
|
||||
router_info = (
|
||||
self.l3_plugin.list_active_sync_routers_on_active_l3_agent(
|
||||
@ -1089,7 +1090,7 @@ class L3DvrTestCase(L3DvrTestCaseBase):
|
||||
{'port': {
|
||||
'allowed_address_pairs': allowed_address_pairs}})
|
||||
self.assertEqual(
|
||||
2, l3_notifier.routers_updated_on_host.call_count)
|
||||
3, l3_notifier.routers_updated_on_host.call_count)
|
||||
updated_vm_port1 = self.core_plugin.get_port(
|
||||
self.context, vm_port['id'])
|
||||
updated_vm_port2 = self.core_plugin.get_port(
|
||||
@ -1139,9 +1140,10 @@ class L3DvrTestCase(L3DvrTestCaseBase):
|
||||
expected_routers_updated_calls = [
|
||||
mock.call(self.context, mock.ANY, HOST1),
|
||||
mock.call(self.context, mock.ANY, HOST2),
|
||||
mock.call(self.context, mock.ANY, HOST1),
|
||||
mock.call(self.context, mock.ANY, 'host0')]
|
||||
l3_notifier.routers_updated_on_host.assert_has_calls(
|
||||
expected_routers_updated_calls)
|
||||
expected_routers_updated_calls, any_order=True)
|
||||
self.assertFalse(l3_notifier.routers_updated.called)
|
||||
router_info = (
|
||||
self.l3_plugin.list_active_sync_routers_on_active_l3_agent(
|
||||
|
@ -2472,12 +2472,17 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework):
|
||||
agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
|
||||
agent.fullsync = False
|
||||
agent._process_router_if_compatible = mock.Mock()
|
||||
router_id = _uuid()
|
||||
router = {'id': router_id,
|
||||
'external_gateway_info': {'network_id': 'aaa'}}
|
||||
self.plugin_api.get_routers.return_value = [router]
|
||||
if ext_net_call_failed:
|
||||
agent._process_router_if_compatible.side_effect = (
|
||||
oslo_messaging.MessagingTimeout)
|
||||
agent._queue = mock.Mock()
|
||||
agent._resync_router = mock.Mock()
|
||||
update = mock.Mock()
|
||||
update.id = router_id
|
||||
update.resource = None
|
||||
agent._queue.each_update_to_next_resource.side_effect = [
|
||||
[(None, update)]]
|
||||
@ -2494,6 +2499,8 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework):
|
||||
|
||||
def test_process_routers_update_resyncs_failed_router(self):
|
||||
agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
|
||||
router_id = _uuid()
|
||||
router = {'id': router_id}
|
||||
|
||||
# Attempting to configure the router will fail
|
||||
agent._process_router_if_compatible = mock.MagicMock()
|
||||
@ -2501,9 +2508,9 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework):
|
||||
|
||||
# Queue an update from a full sync
|
||||
update = resource_processing_queue.ResourceUpdate(
|
||||
42,
|
||||
router_id,
|
||||
l3_agent.PRIORITY_SYNC_ROUTERS_TASK,
|
||||
resource=mock.Mock(),
|
||||
resource=router,
|
||||
timestamp=timeutils.utcnow())
|
||||
agent._queue.add(update)
|
||||
agent._process_router_update()
|
||||
@ -2555,6 +2562,79 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework):
|
||||
def test_process_routers_update_router_deleted_error(self):
|
||||
self._test_process_routers_update_router_deleted(error=True)
|
||||
|
||||
def test_process_routers_if_compatible(self):
|
||||
agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
|
||||
router = {'id': _uuid()}
|
||||
related_router = {'id': _uuid()}
|
||||
routers = [router, related_router]
|
||||
self.plugin_api.get_routers.return_value = routers
|
||||
update = resource_processing_queue.ResourceUpdate(
|
||||
router['id'], l3_agent.PRIORITY_RPC, resource=router)
|
||||
|
||||
events_queue = []
|
||||
|
||||
def add_mock(update):
|
||||
events_queue.append(update)
|
||||
|
||||
agent._queue = mock.Mock()
|
||||
agent._queue.add.side_effect = add_mock
|
||||
|
||||
with mock.patch.object(
|
||||
agent, "_process_router_if_compatible"
|
||||
) as process_router_if_compatible, mock.patch.object(
|
||||
agent, "_safe_router_removed"
|
||||
) as safe_router_removed:
|
||||
self.assertTrue(
|
||||
agent._process_routers_if_compatible(routers, update))
|
||||
process_router_if_compatible.assert_called_once_with(
|
||||
router)
|
||||
safe_router_removed.assert_not_called()
|
||||
self.assertEqual(1, len(events_queue))
|
||||
self.assertEqual(related_router['id'], events_queue[0].id)
|
||||
self.assertEqual(l3_agent.PRIORITY_RELATED_ROUTER,
|
||||
events_queue[0].priority)
|
||||
|
||||
def test_process_routers_if_compatible_router_not_compatible(self):
|
||||
agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
|
||||
router = {'id': _uuid()}
|
||||
agent.router_info = [router['id']]
|
||||
self.plugin_api.get_routers.return_value = [router]
|
||||
update = resource_processing_queue.ResourceUpdate(
|
||||
router['id'], l3_agent.PRIORITY_RPC, resource=router)
|
||||
|
||||
with mock.patch.object(
|
||||
agent, "_process_router_if_compatible",
|
||||
side_effect=n_exc.RouterNotCompatibleWithAgent(
|
||||
router_id=router['id'])
|
||||
) as process_router_if_compatible, mock.patch.object(
|
||||
agent, "_safe_router_removed"
|
||||
) as safe_router_removed:
|
||||
self.assertTrue(
|
||||
agent._process_routers_if_compatible([router], update))
|
||||
process_router_if_compatible.assert_called_once_with(
|
||||
router)
|
||||
safe_router_removed.assert_called_once_with(router['id'])
|
||||
|
||||
def test_process_routers_if_compatible_error(self):
|
||||
agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
|
||||
router = {'id': _uuid()}
|
||||
self.plugin_api.get_routers.return_value = [router]
|
||||
update = resource_processing_queue.ResourceUpdate(
|
||||
router['id'], l3_agent.PRIORITY_RPC, resource=router)
|
||||
|
||||
with mock.patch.object(
|
||||
agent, "_process_router_if_compatible",
|
||||
side_effect=Exception(
|
||||
"Test failure during _process_routers_if_compatible")
|
||||
) as process_router_if_compatible, mock.patch.object(
|
||||
agent, "_safe_router_removed"
|
||||
) as safe_router_removed:
|
||||
self.assertFalse(
|
||||
agent._process_routers_if_compatible([router], update))
|
||||
process_router_if_compatible.assert_called_once_with(
|
||||
router)
|
||||
safe_router_removed.assert_not_called()
|
||||
|
||||
def test_process_ha_dvr_router_if_compatible_no_ha_interface(self):
|
||||
agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
|
||||
agent.conf.agent_mode = 'dvr_snat'
|
||||
|
@ -1283,16 +1283,24 @@ class L3DvrSchedulerTestCase(L3SchedulerBaseMixin,
|
||||
'.L3AgentNotifyAPI'),\
|
||||
mock.patch.object(
|
||||
self.dut, 'get_l3_agents',
|
||||
return_value=[agent_on_host]) as get_l3_agents:
|
||||
return_value=[agent_on_host]) as get_l3_agents,\
|
||||
mock.patch.object(
|
||||
self.dut, 'get_hosts_to_notify',
|
||||
return_value=['other_host']),\
|
||||
mock.patch.object(
|
||||
self.dut, '_check_for_rtr_serviceable_ports',
|
||||
return_value=True):
|
||||
|
||||
self.dut.dvr_handle_new_service_port(
|
||||
self.adminContext, port)
|
||||
|
||||
get_l3_agents.assert_called_once_with(
|
||||
self.adminContext,
|
||||
filters={'host': [port[portbindings.HOST_ID]]})
|
||||
(self.dut.l3_rpc_notifier.routers_updated_on_host.
|
||||
assert_called_once_with(
|
||||
self.adminContext, {'r1', 'r2'}, 'host1'))
|
||||
self.dut.l3_rpc_notifier.routers_updated_on_host.assert_has_calls(
|
||||
[mock.call(self.adminContext, {'r1', 'r2'}, 'host1'),
|
||||
mock.call(self.adminContext, {'r1', 'r2'}, 'other_host')],
|
||||
any_order=True)
|
||||
self.assertFalse(self.dut.l3_rpc_notifier.routers_updated.called)
|
||||
|
||||
def test_get_dvr_routers_by_subnet_ids(self):
|
||||
|
Loading…
Reference in New Issue
Block a user