Minor refactoring of auto_schedule_routers
The method is more complicated than it needs to be, and it makes it difficult to target fixes for it. This is done in preparation of fix for DB lock timeout errors observed while dealing with DVR routers. Test coverage is already provided, and more granular coverage is added to reflect the new structure being introduced. Partial-bug: #1356121 Change-Id: Ifb7a742b64139f3a5d9b88c3c6261b1b890946f9
This commit is contained in:
parent
921969d374
commit
2fc7fd6eb1
@ -82,6 +82,23 @@ class AgentDbMixin(ext_agent.AgentPluginBase):
|
||||
raise ext_agent.AgentNotFound(id=id)
|
||||
return agent
|
||||
|
||||
def get_enabled_agent_on_host(self, context, agent_type, host):
|
||||
"""Return agent of agent_type for the specified host."""
|
||||
query = context.session.query(Agent)
|
||||
query = query.filter(Agent.agent_type == agent_type,
|
||||
Agent.host == host,
|
||||
Agent.admin_state_up == sql.true())
|
||||
try:
|
||||
agent = query.one()
|
||||
except exc.NoResultFound:
|
||||
LOG.debug('No enabled %(agent_type)s agent on host '
|
||||
'%(host)s' % {'agent_type': agent_type, 'host': host})
|
||||
return
|
||||
if self.is_agent_down(agent.heartbeat_timestamp):
|
||||
LOG.warn(_('%(agent_type)s agent %(agent_id)s is not active')
|
||||
% {'agent_type': agent_type, 'agent_id': agent.id})
|
||||
return agent
|
||||
|
||||
@classmethod
|
||||
def is_agent_down(cls, heart_beat_time):
|
||||
return timeutils.is_older_than(heart_beat_time,
|
||||
|
@ -18,11 +18,9 @@ import random
|
||||
|
||||
from oslo.db import exception as db_exc
|
||||
import six
|
||||
from sqlalchemy.orm import exc
|
||||
from sqlalchemy import sql
|
||||
|
||||
from neutron.common import constants
|
||||
from neutron.db import agents_db
|
||||
from neutron.db import l3_agentschedulers_db
|
||||
from neutron.db import l3_db
|
||||
from neutron.openstack.common import log as logging
|
||||
@ -52,89 +50,92 @@ class L3Scheduler(object):
|
||||
|
||||
return query.count() > 0
|
||||
|
||||
def filter_unscheduled_routers(self, context, plugin, routers):
|
||||
"""Filter from list of routers the ones that are not scheduled."""
|
||||
unscheduled_routers = []
|
||||
for router in routers:
|
||||
l3_agents = plugin.get_l3_agents_hosting_routers(
|
||||
context, [router['id']], admin_state_up=True)
|
||||
# TODO(armando-migliaccio): remove dvr-related check
|
||||
if l3_agents and not router.get('distributed', False):
|
||||
LOG.debug(('Router %(router_id)s has already been '
|
||||
'hosted by L3 agent %(agent_id)s'),
|
||||
{'router_id': router['id'],
|
||||
'agent_id': l3_agents[0]['id']})
|
||||
else:
|
||||
unscheduled_routers.append(router)
|
||||
return unscheduled_routers
|
||||
|
||||
def get_unscheduled_routers(self, context, plugin):
|
||||
"""Get routers with no agent binding."""
|
||||
# TODO(gongysh) consider the disabled agent's router
|
||||
no_agent_binding = ~sql.exists().where(
|
||||
l3_db.Router.id ==
|
||||
l3_agentschedulers_db.RouterL3AgentBinding.router_id)
|
||||
query = context.session.query(l3_db.Router.id).filter(no_agent_binding)
|
||||
unscheduled_router_ids = [router_id_[0] for router_id_ in query]
|
||||
if unscheduled_router_ids:
|
||||
return plugin.get_routers(
|
||||
context, filters={'id': unscheduled_router_ids})
|
||||
return []
|
||||
|
||||
def get_routers_to_schedule(self, context, plugin, router_ids=None):
|
||||
"""Verify that the routers specified need to be scheduled.
|
||||
|
||||
:param context: the context
|
||||
:param plugin: the core plugin
|
||||
:param router_ids: the list of routers to be checked for scheduling
|
||||
:returns: the list of routers to be scheduled
|
||||
"""
|
||||
if router_ids is not None:
|
||||
routers = plugin.get_routers(context, filters={'id': router_ids})
|
||||
return self.filter_unscheduled_routers(context, plugin, routers)
|
||||
else:
|
||||
return self.get_unscheduled_routers(context, plugin)
|
||||
|
||||
def get_routers_can_schedule(self, context, plugin, routers, l3_agent):
|
||||
"""Get the subset of routers that can be scheduled on the L3 agent."""
|
||||
ids_to_discard = set()
|
||||
for router in routers:
|
||||
# check if the l3 agent is compatible with the router
|
||||
candidates = plugin.get_l3_agent_candidates(
|
||||
context, router, [l3_agent])
|
||||
if not candidates:
|
||||
ids_to_discard.add(router['id'])
|
||||
|
||||
return [r for r in routers if r['id'] not in ids_to_discard]
|
||||
|
||||
def auto_schedule_routers(self, plugin, context, host, router_ids):
|
||||
"""Schedule non-hosted routers to L3 Agent running on host.
|
||||
|
||||
If router_ids is given, each router in router_ids is scheduled
|
||||
if it is not scheduled yet. Otherwise all unscheduled routers
|
||||
are scheduled.
|
||||
Don't schedule the routers which are hosted already
|
||||
Do not schedule the routers which are hosted already
|
||||
by active l3 agents.
|
||||
|
||||
:returns: True if routers have been successfully assigned to host
|
||||
"""
|
||||
with context.session.begin(subtransactions=True):
|
||||
# query if we have valid l3 agent on the host
|
||||
query = context.session.query(agents_db.Agent)
|
||||
query = query.filter(agents_db.Agent.agent_type ==
|
||||
constants.AGENT_TYPE_L3,
|
||||
agents_db.Agent.host == host,
|
||||
agents_db.Agent.admin_state_up == sql.true())
|
||||
try:
|
||||
l3_agent = query.one()
|
||||
except (exc.MultipleResultsFound, exc.NoResultFound):
|
||||
LOG.debug(_('No enabled L3 agent on host %s'),
|
||||
host)
|
||||
l3_agent = plugin.get_enabled_agent_on_host(
|
||||
context, constants.AGENT_TYPE_L3, host)
|
||||
if not l3_agent:
|
||||
return False
|
||||
if agents_db.AgentDbMixin.is_agent_down(
|
||||
l3_agent.heartbeat_timestamp):
|
||||
LOG.warn(_('L3 agent %s is not active'), l3_agent.id)
|
||||
# check if each of the specified routers is hosted
|
||||
if router_ids:
|
||||
routers = plugin.get_routers(
|
||||
context, filters={'id': router_ids})
|
||||
unscheduled_routers = []
|
||||
for router in routers:
|
||||
l3_agents = plugin.get_l3_agents_hosting_routers(
|
||||
context, [router['id']], admin_state_up=True)
|
||||
if l3_agents and not router.get('distributed', False):
|
||||
LOG.debug(_('Router %(router_id)s has already been'
|
||||
' hosted by L3 agent %(agent_id)s'),
|
||||
{'router_id': router['id'],
|
||||
'agent_id': l3_agents[0]['id']})
|
||||
else:
|
||||
unscheduled_routers.append(router)
|
||||
if not unscheduled_routers:
|
||||
# all (specified) routers are already scheduled
|
||||
return False
|
||||
else:
|
||||
# get all routers that are not hosted
|
||||
#TODO(gongysh) consider the disabled agent's router
|
||||
stmt = ~sql.exists().where(
|
||||
l3_db.Router.id ==
|
||||
l3_agentschedulers_db.RouterL3AgentBinding.router_id)
|
||||
unscheduled_router_ids = [router_id_[0] for router_id_ in
|
||||
context.session.query(
|
||||
l3_db.Router.id).filter(stmt)]
|
||||
if not unscheduled_router_ids:
|
||||
LOG.debug(_('No non-hosted routers'))
|
||||
return False
|
||||
unscheduled_routers = plugin.get_routers(
|
||||
context, filters={'id': unscheduled_router_ids})
|
||||
|
||||
# check if the configuration of l3 agent is compatible
|
||||
# with the router
|
||||
to_removed_ids = set()
|
||||
for router in unscheduled_routers:
|
||||
candidates = plugin.get_l3_agent_candidates(context,
|
||||
router,
|
||||
[l3_agent])
|
||||
if not candidates:
|
||||
to_removed_ids.add(router['id'])
|
||||
unscheduled_routers = self.get_routers_to_schedule(
|
||||
context, plugin, router_ids)
|
||||
if not unscheduled_routers:
|
||||
return False
|
||||
|
||||
target_routers = [r for r in unscheduled_routers
|
||||
if r['id'] not in to_removed_ids]
|
||||
target_routers = self.get_routers_can_schedule(
|
||||
context, plugin, unscheduled_routers, l3_agent)
|
||||
if not target_routers:
|
||||
LOG.warn(_('No routers compatible with L3 agent configuration'
|
||||
' on host %s'), host)
|
||||
return False
|
||||
|
||||
for router_dict in target_routers:
|
||||
if (router_dict.get('distributed', False)
|
||||
and self.dvr_has_binding(context,
|
||||
router_dict['id'],
|
||||
l3_agent.id)):
|
||||
continue
|
||||
self.bind_router(context, router_dict['id'], l3_agent)
|
||||
return True
|
||||
self.bind_routers(context, target_routers, l3_agent)
|
||||
return True
|
||||
|
||||
def get_candidates(self, plugin, context, sync_router, subnet_id):
|
||||
"""Return L3 agents where a router could be scheduled."""
|
||||
@ -173,6 +174,13 @@ class L3Scheduler(object):
|
||||
|
||||
return candidates
|
||||
|
||||
def bind_routers(self, context, routers, l3_agent):
|
||||
for router in routers:
|
||||
if (router.get('distributed', False) and
|
||||
self.dvr_has_binding(context, router['id'], l3_agent.id)):
|
||||
continue
|
||||
self.bind_router(context, router['id'], l3_agent)
|
||||
|
||||
def bind_router(self, context, router_id, chosen_agent):
|
||||
"""Bind the router to the l3 agent which has been chosen."""
|
||||
try:
|
||||
|
@ -16,9 +16,11 @@
|
||||
import mock
|
||||
from oslo.db import exception as exc
|
||||
|
||||
from neutron.common import constants
|
||||
from neutron import context
|
||||
from neutron.db import agents_db
|
||||
from neutron.db import db_base_plugin_v2 as base_plugin
|
||||
from neutron.openstack.common import timeutils
|
||||
from neutron.tests.unit import testlib_api
|
||||
|
||||
|
||||
@ -40,6 +42,35 @@ class TestAgentsDbMixin(testlib_api.SqlTestCase):
|
||||
'topic': 'N/A'
|
||||
}
|
||||
|
||||
def _add_agent(self, agent_id, agent_type, agent_host):
|
||||
with self.context.session.begin(subtransactions=True):
|
||||
now = timeutils.utcnow()
|
||||
agent = agents_db.Agent(id=agent_id,
|
||||
agent_type=agent_type,
|
||||
binary='foo_binary',
|
||||
topic='foo_topic',
|
||||
host=agent_host,
|
||||
created_at=now,
|
||||
started_at=now,
|
||||
admin_state_up=True,
|
||||
heartbeat_timestamp=now,
|
||||
configurations='')
|
||||
self.context.session.add(agent)
|
||||
return agent
|
||||
|
||||
def test_get_enabled_agent_on_host_found(self):
|
||||
agent = self._add_agent('foo_id', constants.AGENT_TYPE_L3, 'foo_host')
|
||||
expected = self.plugin.get_enabled_agent_on_host(
|
||||
self.context, constants.AGENT_TYPE_L3, 'foo_host')
|
||||
self.assertEqual(expected, agent)
|
||||
|
||||
def test_get_enabled_agent_on_host_not_found(self):
|
||||
with mock.patch.object(agents_db.LOG, 'debug') as mock_log:
|
||||
agent = self.plugin.get_enabled_agent_on_host(
|
||||
self.context, constants.AGENT_TYPE_L3, 'foo_agent')
|
||||
self.assertIsNone(agent)
|
||||
self.assertTrue(mock_log.called)
|
||||
|
||||
def _assert_ref_fields_are_equal(self, reference, result):
|
||||
"""Compare (key, value) pairs of a reference dict with the result
|
||||
|
||||
|
@ -35,6 +35,7 @@ from neutron.extensions import l3 as ext_l3
|
||||
from neutron import manager
|
||||
from neutron.openstack.common import timeutils
|
||||
from neutron.scheduler import l3_agent_scheduler
|
||||
from neutron.tests import base
|
||||
from neutron.tests.unit import test_db_plugin
|
||||
from neutron.tests.unit import test_l3_plugin
|
||||
from neutron.tests.unit import testlib_api
|
||||
@ -83,6 +84,138 @@ DB_PLUGIN_KLASS = ('neutron.plugins.openvswitch.ovs_neutron_plugin.'
|
||||
'OVSNeutronPluginV2')
|
||||
|
||||
|
||||
class FakeL3Scheduler(l3_agent_scheduler.L3Scheduler):
|
||||
|
||||
def schedule(self):
|
||||
pass
|
||||
|
||||
def _choose_router_agent(self):
|
||||
pass
|
||||
|
||||
|
||||
class L3SchedulerBaseTestCase(base.BaseTestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(L3SchedulerBaseTestCase, self).setUp()
|
||||
self.scheduler = FakeL3Scheduler()
|
||||
self.plugin = mock.Mock()
|
||||
self.context = q_context.get_admin_context()
|
||||
|
||||
def test_auto_schedule_routers(self):
|
||||
self.plugin.get_enabled_agent_on_host.return_value = [mock.ANY]
|
||||
with contextlib.nested(
|
||||
mock.patch.object(self.scheduler, 'get_routers_to_schedule'),
|
||||
mock.patch.object(self.scheduler, 'get_routers_can_schedule')) as (
|
||||
gs, gr):
|
||||
result = self.scheduler.auto_schedule_routers(
|
||||
self.plugin, self.context, mock.ANY, mock.ANY)
|
||||
self.assertTrue(self.plugin.get_enabled_agent_on_host.called)
|
||||
self.assertTrue(result)
|
||||
self.assertTrue(gs.called)
|
||||
self.assertTrue(gr.called)
|
||||
|
||||
def test_auto_schedule_routers_no_agents(self):
|
||||
self.plugin.get_enabled_agent_on_host.return_value = None
|
||||
result = self.scheduler.auto_schedule_routers(
|
||||
self.plugin, self.context, mock.ANY, mock.ANY)
|
||||
self.assertTrue(self.plugin.get_enabled_agent_on_host.called)
|
||||
self.assertFalse(result)
|
||||
|
||||
def test_auto_schedule_routers_no_unscheduled_routers(self):
|
||||
with mock.patch.object(self.scheduler,
|
||||
'get_routers_to_schedule') as mock_routers:
|
||||
mock_routers.return_value = None
|
||||
result = self.scheduler.auto_schedule_routers(
|
||||
self.plugin, self.context, mock.ANY, mock.ANY)
|
||||
self.assertTrue(self.plugin.get_enabled_agent_on_host.called)
|
||||
self.assertFalse(result)
|
||||
|
||||
def test_auto_schedule_routers_no_target_routers(self):
|
||||
self.plugin.get_enabled_agent_on_host.return_value = [mock.ANY]
|
||||
with contextlib.nested(
|
||||
mock.patch.object(self.scheduler, 'get_routers_to_schedule'),
|
||||
mock.patch.object(self.scheduler, 'get_routers_can_schedule')) as (
|
||||
mock_unscheduled_routers, mock_target_routers):
|
||||
mock_unscheduled_routers.return_value = mock.ANY
|
||||
mock_target_routers.return_value = None
|
||||
result = self.scheduler.auto_schedule_routers(
|
||||
self.plugin, self.context, mock.ANY, mock.ANY)
|
||||
self.assertTrue(self.plugin.get_enabled_agent_on_host.called)
|
||||
self.assertFalse(result)
|
||||
|
||||
def test_get_routers_to_schedule_with_router_ids(self):
|
||||
router_ids = ['foo_router_1', 'foo_router_2']
|
||||
expected_routers = [
|
||||
{'id': 'foo_router1'}, {'id': 'foo_router_2'}
|
||||
]
|
||||
self.plugin.get_routers.return_value = expected_routers
|
||||
with mock.patch.object(self.scheduler,
|
||||
'filter_unscheduled_routers') as mock_filter:
|
||||
mock_filter.return_value = expected_routers
|
||||
unscheduled_routers = self.scheduler.get_routers_to_schedule(
|
||||
mock.ANY, self.plugin, router_ids)
|
||||
mock_filter.assert_called_once_with(
|
||||
mock.ANY, self.plugin, expected_routers)
|
||||
self.assertEqual(expected_routers, unscheduled_routers)
|
||||
|
||||
def test_get_routers_to_schedule_without_router_ids(self):
|
||||
expected_routers = [
|
||||
{'id': 'foo_router1'}, {'id': 'foo_router_2'}
|
||||
]
|
||||
with mock.patch.object(self.scheduler,
|
||||
'get_unscheduled_routers') as mock_get:
|
||||
mock_get.return_value = expected_routers
|
||||
unscheduled_routers = self.scheduler.get_routers_to_schedule(
|
||||
mock.ANY, self.plugin)
|
||||
mock_get.assert_called_once_with(mock.ANY, self.plugin)
|
||||
self.assertEqual(expected_routers, unscheduled_routers)
|
||||
|
||||
def _test_get_routers_can_schedule(self, routers, agent, target_routers):
|
||||
self.plugin.get_l3_agent_candidates.return_value = agent
|
||||
result = self.scheduler.get_routers_can_schedule(
|
||||
mock.ANY, self.plugin, routers, mock.ANY)
|
||||
self.assertEqual(target_routers, result)
|
||||
|
||||
def _test_filter_unscheduled_routers(self, routers, agents, expected):
|
||||
self.plugin.get_l3_agents_hosting_routers.return_value = agents
|
||||
unscheduled_routers = self.scheduler.filter_unscheduled_routers(
|
||||
mock.ANY, self.plugin, routers)
|
||||
self.assertEqual(expected, unscheduled_routers)
|
||||
|
||||
def test_filter_unscheduled_routers_already_scheduled(self):
|
||||
self._test_filter_unscheduled_routers(
|
||||
[{'id': 'foo_router1'}, {'id': 'foo_router_2'}],
|
||||
[{'id': 'foo_agent_id'}], [])
|
||||
|
||||
def test_filter_unscheduled_routers_non_scheduled(self):
|
||||
self._test_filter_unscheduled_routers(
|
||||
[{'id': 'foo_router1'}, {'id': 'foo_router_2'}],
|
||||
None, [{'id': 'foo_router1'}, {'id': 'foo_router_2'}])
|
||||
|
||||
def test_get_routers_can_schedule_with_compat_agent(self):
|
||||
routers = [{'id': 'foo_router'}]
|
||||
self._test_get_routers_can_schedule(routers, mock.ANY, routers)
|
||||
|
||||
def test_get_routers_can_schedule_with_no_compat_agent(self):
|
||||
routers = [{'id': 'foo_router'}]
|
||||
self._test_get_routers_can_schedule(routers, None, [])
|
||||
|
||||
def test_bind_routers_centralized(self):
|
||||
routers = [{'id': 'foo_router'}]
|
||||
with mock.patch.object(self.scheduler, 'bind_router') as mock_bind:
|
||||
self.scheduler.bind_routers(mock.ANY, routers, mock.ANY)
|
||||
mock_bind.assert_called_once_with(mock.ANY, 'foo_router', mock.ANY)
|
||||
|
||||
def test_bind_routers_dvr(self):
|
||||
routers = [{'id': 'foo_router', 'distributed': True}]
|
||||
agent = agents_db.Agent(id='foo_agent')
|
||||
with mock.patch.object(self.scheduler, 'dvr_has_binding') as mock_dvr:
|
||||
with mock.patch.object(self.scheduler, 'bind_router') as mock_bind:
|
||||
self.scheduler.bind_routers(mock.ANY, routers, agent)
|
||||
mock_dvr.assert_called_once_with(mock.ANY, 'foo_router', 'foo_agent')
|
||||
self.assertFalse(mock_bind.called)
|
||||
|
||||
|
||||
class L3SchedulerTestExtensionManager(object):
|
||||
|
||||
def get_resources(self):
|
||||
|
Loading…
Reference in New Issue
Block a user