Introduce new query to return all hosts for DVR router
When we remove explicit binding of dvr routers to compute nodes we'll need a way to know all hosts where a dvr router should be hosted in order to send notifications. This patch adds such a query and updates l3 rpc notifier to use it. Partially implements blueprint improve-dvr-l3-agent-binding Change-Id: Ic6680bb42455189f14c4c913b1d4adeebda83180
This commit is contained in:
parent
2c599814fb
commit
ea6ed6ab3a
|
@ -23,7 +23,6 @@ from neutron.common import constants
|
|||
from neutron.common import rpc as n_rpc
|
||||
from neutron.common import topics
|
||||
from neutron.common import utils
|
||||
from neutron.db import agentschedulers_db
|
||||
from neutron import manager
|
||||
from neutron.plugins.common import constants as service_constants
|
||||
|
||||
|
@ -54,22 +53,18 @@ class L3AgentNotifyAPI(object):
|
|||
adminContext = context if context.is_admin else context.elevated()
|
||||
plugin = manager.NeutronManager.get_service_plugins().get(
|
||||
service_constants.L3_ROUTER_NAT)
|
||||
state = agentschedulers_db.get_admin_state_up_filter()
|
||||
for router_id in router_ids:
|
||||
l3_agents = plugin.get_l3_agents_hosting_routers(
|
||||
adminContext, [router_id],
|
||||
admin_state_up=state,
|
||||
active=True)
|
||||
hosts = plugin.get_hosts_to_notify(adminContext, router_id)
|
||||
if shuffle_agents:
|
||||
random.shuffle(l3_agents)
|
||||
for l3_agent in l3_agents:
|
||||
random.shuffle(hosts)
|
||||
for host in hosts:
|
||||
LOG.debug('Notify agent at %(topic)s.%(host)s the message '
|
||||
'%(method)s',
|
||||
{'topic': l3_agent.topic,
|
||||
'host': l3_agent.host,
|
||||
{'topic': topics.L3_AGENT,
|
||||
'host': host,
|
||||
'method': method})
|
||||
cctxt = self.client.prepare(topic=l3_agent.topic,
|
||||
server=l3_agent.host,
|
||||
cctxt = self.client.prepare(topic=topics.L3_AGENT,
|
||||
server=host,
|
||||
version='1.1')
|
||||
cctxt.cast(context, method, routers=[router_id])
|
||||
|
||||
|
@ -82,22 +77,17 @@ class L3AgentNotifyAPI(object):
|
|||
context or context.elevated())
|
||||
plugin = manager.NeutronManager.get_service_plugins().get(
|
||||
service_constants.L3_ROUTER_NAT)
|
||||
state = agentschedulers_db.get_admin_state_up_filter()
|
||||
l3_agents = (plugin.
|
||||
get_l3_agents_hosting_routers(adminContext,
|
||||
[router_id],
|
||||
admin_state_up=state,
|
||||
active=True))
|
||||
hosts = plugin.get_hosts_to_notify(adminContext, router_id)
|
||||
# TODO(murali): replace cast with fanout to avoid performance
|
||||
# issues at greater scale.
|
||||
for l3_agent in l3_agents:
|
||||
log_topic = '%s.%s' % (l3_agent.topic, l3_agent.host)
|
||||
for host in hosts:
|
||||
log_topic = '%s.%s' % (topics.L3_AGENT, host)
|
||||
LOG.debug('Casting message %(method)s with topic %(topic)s',
|
||||
{'topic': log_topic, 'method': method})
|
||||
dvr_arptable = {'router_id': router_id,
|
||||
'arp_table': data}
|
||||
cctxt = self.client.prepare(topic=l3_agent.topic,
|
||||
server=l3_agent.host,
|
||||
cctxt = self.client.prepare(topic=topics.L3_AGENT,
|
||||
server=host,
|
||||
version='1.2')
|
||||
cctxt.cast(context, method, payload=dvr_arptable)
|
||||
|
||||
|
|
|
@ -555,6 +555,13 @@ class L3AgentSchedulerDbMixin(l3agentscheduler.L3AgentSchedulerPluginBase,
|
|||
res = query.filter(agents_db.Agent.id.in_(agent_ids)).first()
|
||||
return res[0]
|
||||
|
||||
def get_hosts_to_notify(self, context, router_id):
|
||||
"""Returns all hosts to send notification about router update"""
|
||||
state = agentschedulers_db.get_admin_state_up_filter()
|
||||
agents = self.get_l3_agents_hosting_routers(
|
||||
context, [router_id], admin_state_up=state, active=True)
|
||||
return [a.host for a in agents]
|
||||
|
||||
|
||||
class AZL3AgentSchedulerDbMixin(L3AgentSchedulerDbMixin,
|
||||
router_az.RouterAvailabilityZonePluginBase):
|
||||
|
|
|
@ -18,6 +18,7 @@ import random
|
|||
from oslo_db import exception as db_exc
|
||||
from oslo_log import log as logging
|
||||
import sqlalchemy as sa
|
||||
from sqlalchemy import or_
|
||||
from sqlalchemy import orm
|
||||
from sqlalchemy.orm import joinedload
|
||||
|
||||
|
@ -28,6 +29,7 @@ from neutron.callbacks import resources
|
|||
from neutron.common import constants as n_const
|
||||
from neutron.common import utils as n_utils
|
||||
from neutron.db import agents_db
|
||||
from neutron.db import agentschedulers_db
|
||||
from neutron.db import l3_agentschedulers_db as l3agent_sch_db
|
||||
from neutron.db import model_base
|
||||
from neutron.db import models_v2
|
||||
|
@ -36,6 +38,7 @@ from neutron.extensions import portbindings
|
|||
from neutron import manager
|
||||
from neutron.plugins.common import constants as service_constants
|
||||
from neutron.plugins.ml2 import db as ml2_db
|
||||
from neutron.plugins.ml2 import models as ml2_models
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
@ -114,6 +117,9 @@ class L3_DVRsch_db_mixin(l3agent_sch_db.L3AgentSchedulerDbMixin):
|
|||
|
||||
ips = port['fixed_ips']
|
||||
router_ids = self.get_dvr_routers_by_portid(context, port['id'], ips)
|
||||
if not router_ids:
|
||||
return
|
||||
|
||||
for router_id in router_ids:
|
||||
if not self.check_l3_agent_router_binding(
|
||||
context, router_id, l3_agent_on_host['id']):
|
||||
|
@ -435,6 +441,46 @@ class L3_DVRsch_db_mixin(l3agent_sch_db.L3AgentSchedulerDbMixin):
|
|||
self).remove_router_from_l3_agent(
|
||||
context, agent_id, router_id)
|
||||
|
||||
def get_hosts_to_notify(self, context, router_id):
|
||||
"""Returns all hosts to send notification about router update"""
|
||||
hosts = super(L3_DVRsch_db_mixin, self).get_hosts_to_notify(
|
||||
context, router_id)
|
||||
router = self.get_router(context, router_id)
|
||||
if router.get('distributed', False):
|
||||
dvr_hosts = self._get_dvr_hosts_for_router(context, router_id)
|
||||
dvr_hosts = set(dvr_hosts) - set(hosts)
|
||||
state = agentschedulers_db.get_admin_state_up_filter()
|
||||
agents = self.get_l3_agents(context, active=state,
|
||||
filters={'host': dvr_hosts})
|
||||
hosts += [a.host for a in agents]
|
||||
|
||||
return hosts
|
||||
|
||||
def _get_dvr_hosts_for_router(self, context, router_id):
|
||||
"""Get a list of hosts where specified DVR router should be hosted
|
||||
|
||||
It will first get IDs of all subnets connected to the router and then
|
||||
get a set of hosts where all dvr serviceable ports on those subnets
|
||||
are bound
|
||||
"""
|
||||
subnet_ids = self.get_subnet_ids_on_router(context, router_id)
|
||||
Binding = ml2_models.PortBinding
|
||||
Port = models_v2.Port
|
||||
IPAllocation = models_v2.IPAllocation
|
||||
|
||||
query = context.session.query(Binding.host).distinct()
|
||||
query = query.join(Binding.port)
|
||||
query = query.join(Port.fixed_ips)
|
||||
query = query.filter(IPAllocation.subnet_id.in_(subnet_ids))
|
||||
owner_filter = or_(
|
||||
Port.device_owner.startswith(n_const.DEVICE_OWNER_COMPUTE_PREFIX),
|
||||
Port.device_owner.in_(
|
||||
n_utils.get_other_dvr_serviced_device_owners()))
|
||||
query = query.filter(owner_filter)
|
||||
hosts = [item[0] for item in query]
|
||||
LOG.debug('Hosts for router %s: %s', router_id, hosts)
|
||||
return hosts
|
||||
|
||||
|
||||
def _notify_l3_agent_new_port(resource, event, trigger, **kwargs):
|
||||
LOG.debug('Received %(resource)s %(event)s', {
|
||||
|
|
|
@ -16,6 +16,7 @@ import mock
|
|||
|
||||
from neutron.api.v2 import attributes
|
||||
from neutron.common import constants
|
||||
from neutron.common import topics
|
||||
from neutron import context
|
||||
from neutron.extensions import external_net
|
||||
from neutron.extensions import portbindings
|
||||
|
@ -559,3 +560,97 @@ class L3DvrTestCase(ml2_test_base.ML2TestFramework):
|
|||
def test_admin_router_remove_from_agent_on_vm_port_deletion(self):
|
||||
self._test_router_remove_from_agent_on_vm_port_deletion(
|
||||
non_admin_port=True)
|
||||
|
||||
def test_dvr_router_notifications(self):
|
||||
"""Check that notifications go to the right hosts in different
|
||||
conditions
|
||||
"""
|
||||
# register l3 agents in dvr mode in addition to existing dvr_snat agent
|
||||
HOST1, HOST2, HOST3 = 'host1', 'host2', 'host3'
|
||||
for host in [HOST1, HOST2, HOST3]:
|
||||
helpers.register_l3_agent(
|
||||
host=host, agent_mode=constants.L3_AGENT_MODE_DVR)
|
||||
|
||||
router = self._create_router()
|
||||
arg_list = (portbindings.HOST_ID,)
|
||||
with self.subnet() as ext_subnet,\
|
||||
self.subnet(cidr='20.0.0.0/24') as subnet1,\
|
||||
self.subnet(cidr='30.0.0.0/24') as subnet2,\
|
||||
self.subnet(cidr='40.0.0.0/24') as subnet3,\
|
||||
self.port(subnet=subnet1,
|
||||
device_owner=DEVICE_OWNER_COMPUTE,
|
||||
arg_list=arg_list,
|
||||
**{portbindings.HOST_ID: HOST1}),\
|
||||
self.port(subnet=subnet2,
|
||||
device_owner=constants.DEVICE_OWNER_DHCP,
|
||||
arg_list=arg_list,
|
||||
**{portbindings.HOST_ID: HOST2}),\
|
||||
self.port(subnet=subnet3,
|
||||
device_owner=constants.DEVICE_OWNER_NEUTRON_PREFIX,
|
||||
arg_list=arg_list,
|
||||
**{portbindings.HOST_ID: HOST3}):
|
||||
# make net external
|
||||
ext_net_id = ext_subnet['subnet']['network_id']
|
||||
self._update('networks', ext_net_id,
|
||||
{'network': {external_net.EXTERNAL: True}})
|
||||
|
||||
with mock.patch.object(self.l3_plugin.l3_rpc_notifier.client,
|
||||
'prepare') as mock_prepare:
|
||||
# add external gateway to router
|
||||
self.l3_plugin.update_router(
|
||||
self.context, router['id'],
|
||||
{'router': {
|
||||
'external_gateway_info': {'network_id': ext_net_id}}})
|
||||
# router has no interfaces so notification goes
|
||||
# to only dvr_snat agent
|
||||
mock_prepare.assert_called_once_with(
|
||||
server=self.l3_agent['host'],
|
||||
topic=topics.L3_AGENT,
|
||||
version='1.1')
|
||||
|
||||
mock_prepare.reset_mock()
|
||||
self.l3_plugin.add_router_interface(
|
||||
self.context, router['id'],
|
||||
{'subnet_id': subnet1['subnet']['id']})
|
||||
self.assertEqual(2, mock_prepare.call_count)
|
||||
expected = [mock.call(server=self.l3_agent['host'],
|
||||
topic=topics.L3_AGENT,
|
||||
version='1.1'),
|
||||
mock.call(server=HOST1,
|
||||
topic=topics.L3_AGENT,
|
||||
version='1.1')]
|
||||
mock_prepare.assert_has_calls(expected, any_order=True)
|
||||
|
||||
mock_prepare.reset_mock()
|
||||
self.l3_plugin.add_router_interface(
|
||||
self.context, router['id'],
|
||||
{'subnet_id': subnet2['subnet']['id']})
|
||||
self.assertEqual(3, mock_prepare.call_count)
|
||||
expected = [mock.call(server=self.l3_agent['host'],
|
||||
topic=topics.L3_AGENT,
|
||||
version='1.1'),
|
||||
mock.call(server=HOST1,
|
||||
topic=topics.L3_AGENT,
|
||||
version='1.1'),
|
||||
mock.call(server=HOST2,
|
||||
topic=topics.L3_AGENT,
|
||||
version='1.1')]
|
||||
mock_prepare.assert_has_calls(expected, any_order=True)
|
||||
|
||||
mock_prepare.reset_mock()
|
||||
self.l3_plugin.add_router_interface(
|
||||
self.context, router['id'],
|
||||
{'subnet_id': subnet3['subnet']['id']})
|
||||
# there are no dvr serviceable ports on HOST3, so notification
|
||||
# goes to the same hosts
|
||||
self.assertEqual(3, mock_prepare.call_count)
|
||||
expected = [mock.call(server=self.l3_agent['host'],
|
||||
topic=topics.L3_AGENT,
|
||||
version='1.1'),
|
||||
mock.call(server=HOST1,
|
||||
topic=topics.L3_AGENT,
|
||||
version='1.1'),
|
||||
mock.call(server=HOST2,
|
||||
topic=topics.L3_AGENT,
|
||||
version='1.1')]
|
||||
mock_prepare.assert_has_calls(expected, any_order=True)
|
||||
|
|
Loading…
Reference in New Issue