Merge "Minor refactoring of auto_schedule_routers"
This commit is contained in:
commit
ebc3ccfdd0
@ -82,6 +82,23 @@ class AgentDbMixin(ext_agent.AgentPluginBase):
|
||||
raise ext_agent.AgentNotFound(id=id)
|
||||
return agent
|
||||
|
||||
def get_enabled_agent_on_host(self, context, agent_type, host):
|
||||
"""Return agent of agent_type for the specified host."""
|
||||
query = context.session.query(Agent)
|
||||
query = query.filter(Agent.agent_type == agent_type,
|
||||
Agent.host == host,
|
||||
Agent.admin_state_up == sql.true())
|
||||
try:
|
||||
agent = query.one()
|
||||
except exc.NoResultFound:
|
||||
LOG.debug('No enabled %(agent_type)s agent on host '
|
||||
'%(host)s' % {'agent_type': agent_type, 'host': host})
|
||||
return
|
||||
if self.is_agent_down(agent.heartbeat_timestamp):
|
||||
LOG.warn(_('%(agent_type)s agent %(agent_id)s is not active')
|
||||
% {'agent_type': agent_type, 'agent_id': agent.id})
|
||||
return agent
|
||||
|
||||
@classmethod
|
||||
def is_agent_down(cls, heart_beat_time):
|
||||
return timeutils.is_older_than(heart_beat_time,
|
||||
|
@ -18,11 +18,9 @@ import random
|
||||
|
||||
from oslo.db import exception as db_exc
|
||||
import six
|
||||
from sqlalchemy.orm import exc
|
||||
from sqlalchemy import sql
|
||||
|
||||
from neutron.common import constants
|
||||
from neutron.db import agents_db
|
||||
from neutron.db import l3_agentschedulers_db
|
||||
from neutron.db import l3_db
|
||||
from neutron.openstack.common import log as logging
|
||||
@ -52,89 +50,92 @@ class L3Scheduler(object):
|
||||
|
||||
return query.count() > 0
|
||||
|
||||
def filter_unscheduled_routers(self, context, plugin, routers):
|
||||
"""Filter from list of routers the ones that are not scheduled."""
|
||||
unscheduled_routers = []
|
||||
for router in routers:
|
||||
l3_agents = plugin.get_l3_agents_hosting_routers(
|
||||
context, [router['id']], admin_state_up=True)
|
||||
# TODO(armando-migliaccio): remove dvr-related check
|
||||
if l3_agents and not router.get('distributed', False):
|
||||
LOG.debug(('Router %(router_id)s has already been '
|
||||
'hosted by L3 agent %(agent_id)s'),
|
||||
{'router_id': router['id'],
|
||||
'agent_id': l3_agents[0]['id']})
|
||||
else:
|
||||
unscheduled_routers.append(router)
|
||||
return unscheduled_routers
|
||||
|
||||
def get_unscheduled_routers(self, context, plugin):
|
||||
"""Get routers with no agent binding."""
|
||||
# TODO(gongysh) consider the disabled agent's router
|
||||
no_agent_binding = ~sql.exists().where(
|
||||
l3_db.Router.id ==
|
||||
l3_agentschedulers_db.RouterL3AgentBinding.router_id)
|
||||
query = context.session.query(l3_db.Router.id).filter(no_agent_binding)
|
||||
unscheduled_router_ids = [router_id_[0] for router_id_ in query]
|
||||
if unscheduled_router_ids:
|
||||
return plugin.get_routers(
|
||||
context, filters={'id': unscheduled_router_ids})
|
||||
return []
|
||||
|
||||
def get_routers_to_schedule(self, context, plugin, router_ids=None):
|
||||
"""Verify that the routers specified need to be scheduled.
|
||||
|
||||
:param context: the context
|
||||
:param plugin: the core plugin
|
||||
:param router_ids: the list of routers to be checked for scheduling
|
||||
:returns: the list of routers to be scheduled
|
||||
"""
|
||||
if router_ids is not None:
|
||||
routers = plugin.get_routers(context, filters={'id': router_ids})
|
||||
return self.filter_unscheduled_routers(context, plugin, routers)
|
||||
else:
|
||||
return self.get_unscheduled_routers(context, plugin)
|
||||
|
||||
def get_routers_can_schedule(self, context, plugin, routers, l3_agent):
|
||||
"""Get the subset of routers that can be scheduled on the L3 agent."""
|
||||
ids_to_discard = set()
|
||||
for router in routers:
|
||||
# check if the l3 agent is compatible with the router
|
||||
candidates = plugin.get_l3_agent_candidates(
|
||||
context, router, [l3_agent])
|
||||
if not candidates:
|
||||
ids_to_discard.add(router['id'])
|
||||
|
||||
return [r for r in routers if r['id'] not in ids_to_discard]
|
||||
|
||||
def auto_schedule_routers(self, plugin, context, host, router_ids):
|
||||
"""Schedule non-hosted routers to L3 Agent running on host.
|
||||
|
||||
If router_ids is given, each router in router_ids is scheduled
|
||||
if it is not scheduled yet. Otherwise all unscheduled routers
|
||||
are scheduled.
|
||||
Don't schedule the routers which are hosted already
|
||||
Do not schedule the routers which are hosted already
|
||||
by active l3 agents.
|
||||
|
||||
:returns: True if routers have been successfully assigned to host
|
||||
"""
|
||||
with context.session.begin(subtransactions=True):
|
||||
# query if we have valid l3 agent on the host
|
||||
query = context.session.query(agents_db.Agent)
|
||||
query = query.filter(agents_db.Agent.agent_type ==
|
||||
constants.AGENT_TYPE_L3,
|
||||
agents_db.Agent.host == host,
|
||||
agents_db.Agent.admin_state_up == sql.true())
|
||||
try:
|
||||
l3_agent = query.one()
|
||||
except (exc.MultipleResultsFound, exc.NoResultFound):
|
||||
LOG.debug(_('No enabled L3 agent on host %s'),
|
||||
host)
|
||||
l3_agent = plugin.get_enabled_agent_on_host(
|
||||
context, constants.AGENT_TYPE_L3, host)
|
||||
if not l3_agent:
|
||||
return False
|
||||
if agents_db.AgentDbMixin.is_agent_down(
|
||||
l3_agent.heartbeat_timestamp):
|
||||
LOG.warn(_('L3 agent %s is not active'), l3_agent.id)
|
||||
# check if each of the specified routers is hosted
|
||||
if router_ids:
|
||||
routers = plugin.get_routers(
|
||||
context, filters={'id': router_ids})
|
||||
unscheduled_routers = []
|
||||
for router in routers:
|
||||
l3_agents = plugin.get_l3_agents_hosting_routers(
|
||||
context, [router['id']], admin_state_up=True)
|
||||
if l3_agents and not router.get('distributed', False):
|
||||
LOG.debug(_('Router %(router_id)s has already been'
|
||||
' hosted by L3 agent %(agent_id)s'),
|
||||
{'router_id': router['id'],
|
||||
'agent_id': l3_agents[0]['id']})
|
||||
else:
|
||||
unscheduled_routers.append(router)
|
||||
if not unscheduled_routers:
|
||||
# all (specified) routers are already scheduled
|
||||
return False
|
||||
else:
|
||||
# get all routers that are not hosted
|
||||
#TODO(gongysh) consider the disabled agent's router
|
||||
stmt = ~sql.exists().where(
|
||||
l3_db.Router.id ==
|
||||
l3_agentschedulers_db.RouterL3AgentBinding.router_id)
|
||||
unscheduled_router_ids = [router_id_[0] for router_id_ in
|
||||
context.session.query(
|
||||
l3_db.Router.id).filter(stmt)]
|
||||
if not unscheduled_router_ids:
|
||||
LOG.debug(_('No non-hosted routers'))
|
||||
return False
|
||||
unscheduled_routers = plugin.get_routers(
|
||||
context, filters={'id': unscheduled_router_ids})
|
||||
|
||||
# check if the configuration of l3 agent is compatible
|
||||
# with the router
|
||||
to_removed_ids = set()
|
||||
for router in unscheduled_routers:
|
||||
candidates = plugin.get_l3_agent_candidates(context,
|
||||
router,
|
||||
[l3_agent])
|
||||
if not candidates:
|
||||
to_removed_ids.add(router['id'])
|
||||
unscheduled_routers = self.get_routers_to_schedule(
|
||||
context, plugin, router_ids)
|
||||
if not unscheduled_routers:
|
||||
return False
|
||||
|
||||
target_routers = [r for r in unscheduled_routers
|
||||
if r['id'] not in to_removed_ids]
|
||||
target_routers = self.get_routers_can_schedule(
|
||||
context, plugin, unscheduled_routers, l3_agent)
|
||||
if not target_routers:
|
||||
LOG.warn(_('No routers compatible with L3 agent configuration'
|
||||
' on host %s'), host)
|
||||
return False
|
||||
|
||||
for router_dict in target_routers:
|
||||
if (router_dict.get('distributed', False)
|
||||
and self.dvr_has_binding(context,
|
||||
router_dict['id'],
|
||||
l3_agent.id)):
|
||||
continue
|
||||
self.bind_router(context, router_dict['id'], l3_agent)
|
||||
return True
|
||||
self.bind_routers(context, target_routers, l3_agent)
|
||||
return True
|
||||
|
||||
def get_candidates(self, plugin, context, sync_router, subnet_id):
|
||||
"""Return L3 agents where a router could be scheduled."""
|
||||
@ -172,6 +173,13 @@ class L3Scheduler(object):
|
||||
|
||||
return candidates
|
||||
|
||||
def bind_routers(self, context, routers, l3_agent):
|
||||
for router in routers:
|
||||
if (router.get('distributed', False) and
|
||||
self.dvr_has_binding(context, router['id'], l3_agent.id)):
|
||||
continue
|
||||
self.bind_router(context, router['id'], l3_agent)
|
||||
|
||||
def bind_router(self, context, router_id, chosen_agent):
|
||||
"""Bind the router to the l3 agent which has been chosen."""
|
||||
try:
|
||||
|
@ -16,9 +16,11 @@
|
||||
import mock
|
||||
from oslo.db import exception as exc
|
||||
|
||||
from neutron.common import constants
|
||||
from neutron import context
|
||||
from neutron.db import agents_db
|
||||
from neutron.db import db_base_plugin_v2 as base_plugin
|
||||
from neutron.openstack.common import timeutils
|
||||
from neutron.tests.unit import testlib_api
|
||||
|
||||
|
||||
@ -40,6 +42,35 @@ class TestAgentsDbMixin(testlib_api.SqlTestCase):
|
||||
'topic': 'N/A'
|
||||
}
|
||||
|
||||
def _add_agent(self, agent_id, agent_type, agent_host):
|
||||
with self.context.session.begin(subtransactions=True):
|
||||
now = timeutils.utcnow()
|
||||
agent = agents_db.Agent(id=agent_id,
|
||||
agent_type=agent_type,
|
||||
binary='foo_binary',
|
||||
topic='foo_topic',
|
||||
host=agent_host,
|
||||
created_at=now,
|
||||
started_at=now,
|
||||
admin_state_up=True,
|
||||
heartbeat_timestamp=now,
|
||||
configurations='')
|
||||
self.context.session.add(agent)
|
||||
return agent
|
||||
|
||||
def test_get_enabled_agent_on_host_found(self):
|
||||
agent = self._add_agent('foo_id', constants.AGENT_TYPE_L3, 'foo_host')
|
||||
expected = self.plugin.get_enabled_agent_on_host(
|
||||
self.context, constants.AGENT_TYPE_L3, 'foo_host')
|
||||
self.assertEqual(expected, agent)
|
||||
|
||||
def test_get_enabled_agent_on_host_not_found(self):
|
||||
with mock.patch.object(agents_db.LOG, 'debug') as mock_log:
|
||||
agent = self.plugin.get_enabled_agent_on_host(
|
||||
self.context, constants.AGENT_TYPE_L3, 'foo_agent')
|
||||
self.assertIsNone(agent)
|
||||
self.assertTrue(mock_log.called)
|
||||
|
||||
def _assert_ref_fields_are_equal(self, reference, result):
|
||||
"""Compare (key, value) pairs of a reference dict with the result
|
||||
|
||||
|
@ -35,6 +35,7 @@ from neutron.extensions import l3 as ext_l3
|
||||
from neutron import manager
|
||||
from neutron.openstack.common import timeutils
|
||||
from neutron.scheduler import l3_agent_scheduler
|
||||
from neutron.tests import base
|
||||
from neutron.tests.unit import test_db_plugin
|
||||
from neutron.tests.unit import test_l3_plugin
|
||||
from neutron.tests.unit import testlib_api
|
||||
@ -84,6 +85,138 @@ DB_PLUGIN_KLASS = ('neutron.plugins.openvswitch.ovs_neutron_plugin.'
|
||||
'OVSNeutronPluginV2')
|
||||
|
||||
|
||||
class FakeL3Scheduler(l3_agent_scheduler.L3Scheduler):
|
||||
|
||||
def schedule(self):
|
||||
pass
|
||||
|
||||
def _choose_router_agent(self):
|
||||
pass
|
||||
|
||||
|
||||
class L3SchedulerBaseTestCase(base.BaseTestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(L3SchedulerBaseTestCase, self).setUp()
|
||||
self.scheduler = FakeL3Scheduler()
|
||||
self.plugin = mock.Mock()
|
||||
self.context = q_context.get_admin_context()
|
||||
|
||||
def test_auto_schedule_routers(self):
|
||||
self.plugin.get_enabled_agent_on_host.return_value = [mock.ANY]
|
||||
with contextlib.nested(
|
||||
mock.patch.object(self.scheduler, 'get_routers_to_schedule'),
|
||||
mock.patch.object(self.scheduler, 'get_routers_can_schedule')) as (
|
||||
gs, gr):
|
||||
result = self.scheduler.auto_schedule_routers(
|
||||
self.plugin, self.context, mock.ANY, mock.ANY)
|
||||
self.assertTrue(self.plugin.get_enabled_agent_on_host.called)
|
||||
self.assertTrue(result)
|
||||
self.assertTrue(gs.called)
|
||||
self.assertTrue(gr.called)
|
||||
|
||||
def test_auto_schedule_routers_no_agents(self):
|
||||
self.plugin.get_enabled_agent_on_host.return_value = None
|
||||
result = self.scheduler.auto_schedule_routers(
|
||||
self.plugin, self.context, mock.ANY, mock.ANY)
|
||||
self.assertTrue(self.plugin.get_enabled_agent_on_host.called)
|
||||
self.assertFalse(result)
|
||||
|
||||
def test_auto_schedule_routers_no_unscheduled_routers(self):
|
||||
with mock.patch.object(self.scheduler,
|
||||
'get_routers_to_schedule') as mock_routers:
|
||||
mock_routers.return_value = None
|
||||
result = self.scheduler.auto_schedule_routers(
|
||||
self.plugin, self.context, mock.ANY, mock.ANY)
|
||||
self.assertTrue(self.plugin.get_enabled_agent_on_host.called)
|
||||
self.assertFalse(result)
|
||||
|
||||
def test_auto_schedule_routers_no_target_routers(self):
|
||||
self.plugin.get_enabled_agent_on_host.return_value = [mock.ANY]
|
||||
with contextlib.nested(
|
||||
mock.patch.object(self.scheduler, 'get_routers_to_schedule'),
|
||||
mock.patch.object(self.scheduler, 'get_routers_can_schedule')) as (
|
||||
mock_unscheduled_routers, mock_target_routers):
|
||||
mock_unscheduled_routers.return_value = mock.ANY
|
||||
mock_target_routers.return_value = None
|
||||
result = self.scheduler.auto_schedule_routers(
|
||||
self.plugin, self.context, mock.ANY, mock.ANY)
|
||||
self.assertTrue(self.plugin.get_enabled_agent_on_host.called)
|
||||
self.assertFalse(result)
|
||||
|
||||
def test_get_routers_to_schedule_with_router_ids(self):
|
||||
router_ids = ['foo_router_1', 'foo_router_2']
|
||||
expected_routers = [
|
||||
{'id': 'foo_router1'}, {'id': 'foo_router_2'}
|
||||
]
|
||||
self.plugin.get_routers.return_value = expected_routers
|
||||
with mock.patch.object(self.scheduler,
|
||||
'filter_unscheduled_routers') as mock_filter:
|
||||
mock_filter.return_value = expected_routers
|
||||
unscheduled_routers = self.scheduler.get_routers_to_schedule(
|
||||
mock.ANY, self.plugin, router_ids)
|
||||
mock_filter.assert_called_once_with(
|
||||
mock.ANY, self.plugin, expected_routers)
|
||||
self.assertEqual(expected_routers, unscheduled_routers)
|
||||
|
||||
def test_get_routers_to_schedule_without_router_ids(self):
|
||||
expected_routers = [
|
||||
{'id': 'foo_router1'}, {'id': 'foo_router_2'}
|
||||
]
|
||||
with mock.patch.object(self.scheduler,
|
||||
'get_unscheduled_routers') as mock_get:
|
||||
mock_get.return_value = expected_routers
|
||||
unscheduled_routers = self.scheduler.get_routers_to_schedule(
|
||||
mock.ANY, self.plugin)
|
||||
mock_get.assert_called_once_with(mock.ANY, self.plugin)
|
||||
self.assertEqual(expected_routers, unscheduled_routers)
|
||||
|
||||
def _test_get_routers_can_schedule(self, routers, agent, target_routers):
|
||||
self.plugin.get_l3_agent_candidates.return_value = agent
|
||||
result = self.scheduler.get_routers_can_schedule(
|
||||
mock.ANY, self.plugin, routers, mock.ANY)
|
||||
self.assertEqual(target_routers, result)
|
||||
|
||||
def _test_filter_unscheduled_routers(self, routers, agents, expected):
|
||||
self.plugin.get_l3_agents_hosting_routers.return_value = agents
|
||||
unscheduled_routers = self.scheduler.filter_unscheduled_routers(
|
||||
mock.ANY, self.plugin, routers)
|
||||
self.assertEqual(expected, unscheduled_routers)
|
||||
|
||||
def test_filter_unscheduled_routers_already_scheduled(self):
|
||||
self._test_filter_unscheduled_routers(
|
||||
[{'id': 'foo_router1'}, {'id': 'foo_router_2'}],
|
||||
[{'id': 'foo_agent_id'}], [])
|
||||
|
||||
def test_filter_unscheduled_routers_non_scheduled(self):
|
||||
self._test_filter_unscheduled_routers(
|
||||
[{'id': 'foo_router1'}, {'id': 'foo_router_2'}],
|
||||
None, [{'id': 'foo_router1'}, {'id': 'foo_router_2'}])
|
||||
|
||||
def test_get_routers_can_schedule_with_compat_agent(self):
|
||||
routers = [{'id': 'foo_router'}]
|
||||
self._test_get_routers_can_schedule(routers, mock.ANY, routers)
|
||||
|
||||
def test_get_routers_can_schedule_with_no_compat_agent(self):
|
||||
routers = [{'id': 'foo_router'}]
|
||||
self._test_get_routers_can_schedule(routers, None, [])
|
||||
|
||||
def test_bind_routers_centralized(self):
|
||||
routers = [{'id': 'foo_router'}]
|
||||
with mock.patch.object(self.scheduler, 'bind_router') as mock_bind:
|
||||
self.scheduler.bind_routers(mock.ANY, routers, mock.ANY)
|
||||
mock_bind.assert_called_once_with(mock.ANY, 'foo_router', mock.ANY)
|
||||
|
||||
def test_bind_routers_dvr(self):
|
||||
routers = [{'id': 'foo_router', 'distributed': True}]
|
||||
agent = agents_db.Agent(id='foo_agent')
|
||||
with mock.patch.object(self.scheduler, 'dvr_has_binding') as mock_dvr:
|
||||
with mock.patch.object(self.scheduler, 'bind_router') as mock_bind:
|
||||
self.scheduler.bind_routers(mock.ANY, routers, agent)
|
||||
mock_dvr.assert_called_once_with(mock.ANY, 'foo_router', 'foo_agent')
|
||||
self.assertFalse(mock_bind.called)
|
||||
|
||||
|
||||
class L3SchedulerTestExtensionManager(object):
|
||||
|
||||
def get_resources(self):
|
||||
|
Loading…
Reference in New Issue
Block a user