L3 scheduler: add retry indicators for bind_router
Change I3447ea5bcb7c57365c6f50efe12a1671e86588b3 added a binding_index to the RouterL3AgentBinding table. In certain (concurrent) cases, a row with the same binding_index might be used twice, which will raise DBDuplicateEntry. However, that change didn't retry on this case at all code-paths, so this patch rectifies this issue. Closes-Bug: #1633042 Change-Id: I228b0084a8e7c48e78bc2ea6a3fccf7437210e73
This commit is contained in:
parent
ff9c6e219b
commit
1e195afaf1
|
@ -183,16 +183,17 @@ class L3AgentSchedulerDbMixin(l3agentscheduler.L3AgentSchedulerPluginBase,
|
|||
router_id = router['id']
|
||||
agent_id = agent['id']
|
||||
if self.router_scheduler:
|
||||
plugin = manager.NeutronManager.get_service_plugins().get(
|
||||
service_constants.L3_ROUTER_NAT)
|
||||
try:
|
||||
if router.get('ha'):
|
||||
plugin = manager.NeutronManager.get_service_plugins().get(
|
||||
service_constants.L3_ROUTER_NAT)
|
||||
self.router_scheduler.create_ha_port_and_bind(
|
||||
plugin, context, router['id'],
|
||||
router['tenant_id'], agent, is_manual_scheduling=True)
|
||||
router['tenant_id'], agent,
|
||||
is_manual_scheduling=True)
|
||||
else:
|
||||
self.router_scheduler.bind_router(
|
||||
context, router_id, agent)
|
||||
plugin, context, router_id, agent.id)
|
||||
except db_exc.DBError:
|
||||
raise l3agentscheduler.RouterSchedulingFailed(
|
||||
router_id=router_id, agent_id=agent_id)
|
||||
|
|
|
@ -199,44 +199,72 @@ class L3Scheduler(object):
|
|||
plugin, context, router['id'],
|
||||
router['tenant_id'], l3_agent)
|
||||
else:
|
||||
self.bind_router(context, router['id'], l3_agent)
|
||||
self.bind_router(plugin, context, router['id'], l3_agent.id)
|
||||
|
||||
def bind_router(self, context, router_id, chosen_agent,
|
||||
binding_index=rb_model.LOWEST_BINDING_INDEX):
|
||||
"""Bind the router to the l3 agent which has been chosen."""
|
||||
# Pre-cache the agent's id so that if an exception is raised we can
|
||||
# safely access its value. Otherwise, sqlalchemy will try to fetch it
|
||||
# from the database during a rollback, which is bad for us.
|
||||
agent_id = chosen_agent.id
|
||||
@db_api.retry_db_errors
|
||||
def bind_router(self, plugin, context, router_id, agent_id,
|
||||
is_manual_scheduling=False, is_ha=False):
|
||||
"""Bind the router to the l3 agent which has been chosen.
|
||||
|
||||
The function tries to create a RouterL3AgentBinding object and add it
|
||||
to the database. It returns the binding that was created or None if it
|
||||
failed to create it due to some conflict.
|
||||
|
||||
In the HA router case, when creating a RouterL3AgentBinding (with some
|
||||
binding_index) fails because some other RouterL3AgentBinding was
|
||||
concurrently created using the same binding_index, then the function
|
||||
will retry to create an entry with a new binding_index. This creation
|
||||
will be retried up to db_api.MAX_RETRIES times.
|
||||
If, still in the HA router case, the creation failed because the
|
||||
router has already been bound to the l3 agent in question or has been
|
||||
removed (by a concurrent operation), then no further attempts will be
|
||||
made and the function will return None.
|
||||
|
||||
Note that for non-HA routers, the function will always perform exactly
|
||||
one try, regardless of the error preventing the addition of a new
|
||||
RouterL3AgentBinding object to the database.
|
||||
"""
|
||||
bindings = context.session.query(
|
||||
rb_model.RouterL3AgentBinding).filter_by(router_id=router_id)
|
||||
|
||||
if bindings.filter_by(l3_agent_id=agent_id).first():
|
||||
LOG.debug('Router %(router_id)s has already been scheduled '
|
||||
'to L3 agent %(agent_id)s.',
|
||||
{'router_id': router_id, 'agent_id': agent_id})
|
||||
return
|
||||
|
||||
if not is_ha:
|
||||
binding_index = rb_model.LOWEST_BINDING_INDEX
|
||||
if bindings.filter_by(binding_index=binding_index).first():
|
||||
LOG.debug('Non-HA router %s has already been scheduled',
|
||||
router_id)
|
||||
return
|
||||
else:
|
||||
binding_index = plugin.get_vacant_binding_index(
|
||||
context, router_id, is_manual_scheduling)
|
||||
if binding_index < rb_model.LOWEST_BINDING_INDEX:
|
||||
LOG.debug('Unable to find a vacant binding_index for '
|
||||
'router %(router_id)s and agent %(agent_id)s',
|
||||
{'router_id': router_id,
|
||||
'agent_id': agent_id})
|
||||
return
|
||||
|
||||
try:
|
||||
with context.session.begin(subtransactions=True):
|
||||
binding = rb_model.RouterL3AgentBinding()
|
||||
binding.l3_agent = chosen_agent
|
||||
binding.l3_agent_id = agent_id
|
||||
binding.router_id = router_id
|
||||
binding.binding_index = binding_index
|
||||
context.session.add(binding)
|
||||
except db_exc.DBDuplicateEntry as error:
|
||||
LOG.debug('Router %(router_id)s has already been scheduled '
|
||||
'to L3 agent %(agent_id)s (tried to bind with '
|
||||
'binding_index %(binding_index)d). The conflict was '
|
||||
'with columns %(columns)r.',
|
||||
{'agent_id': agent_id,
|
||||
'router_id': router_id,
|
||||
'binding_index': binding_index,
|
||||
'columns': error.columns})
|
||||
return
|
||||
LOG.debug('Router %(router_id)s is scheduled to L3 agent '
|
||||
'%(agent_id)s with binding_index %(binding_index)d',
|
||||
{'router_id': router_id,
|
||||
'agent_id': agent_id,
|
||||
'binding_index': binding_index})
|
||||
return binding
|
||||
except db_exc.DBReferenceError:
|
||||
LOG.debug('Router %s has already been removed '
|
||||
'by concurrent operation', router_id)
|
||||
return
|
||||
|
||||
LOG.debug('Router %(router_id)s is scheduled to L3 agent '
|
||||
'%(agent_id)s with binding_index %(binding_index)d',
|
||||
{'router_id': router_id,
|
||||
'agent_id': agent_id,
|
||||
'binding_index': binding_index})
|
||||
return binding
|
||||
|
||||
def _schedule_router(self, plugin, context, router_id,
|
||||
candidates=None):
|
||||
|
@ -256,7 +284,7 @@ class L3Scheduler(object):
|
|||
else:
|
||||
chosen_agent = self._choose_router_agent(
|
||||
plugin, context, candidates)
|
||||
self.bind_router(context, router_id, chosen_agent)
|
||||
self.bind_router(plugin, context, router_id, chosen_agent.id)
|
||||
return chosen_agent
|
||||
|
||||
@abc.abstractmethod
|
||||
|
@ -297,35 +325,27 @@ class L3Scheduler(object):
|
|||
dep_deleter = functools.partial(plugin._delete_ha_network, ctxt)
|
||||
dep_id_attr = 'network_id'
|
||||
|
||||
for attempts in range(1, db_api.MAX_RETRIES + 1):
|
||||
binding_index = plugin.get_vacant_binding_index(
|
||||
context, router_id, is_manual_scheduling)
|
||||
if binding_index == -1:
|
||||
LOG.debug("Couldn't find a vacant binding_index for router %s",
|
||||
router_id)
|
||||
return
|
||||
# This might fail in case of concurrent calls, which is good for us
|
||||
# as we can skip the rest of this function.
|
||||
binding = self.bind_router(
|
||||
plugin, context, router_id, agent['id'],
|
||||
is_manual_scheduling=is_manual_scheduling, is_ha=True)
|
||||
if not binding:
|
||||
return
|
||||
|
||||
# This might fail in case of concurrent calls, which is good for us
|
||||
# as we can skip the rest of this function.
|
||||
if not self.bind_router(context, router_id, agent, binding_index):
|
||||
return
|
||||
|
||||
try:
|
||||
port_binding = utils.create_object_with_dependency(
|
||||
creator, dep_getter, dep_creator,
|
||||
dep_id_attr, dep_deleter)[0]
|
||||
with db_api.autonested_transaction(context.session):
|
||||
port_binding.l3_agent_id = agent['id']
|
||||
return
|
||||
except db_exc.DBDuplicateEntry:
|
||||
LOG.debug("Router %(router)s already scheduled for agent "
|
||||
"%(agent)s", {'router': router_id,
|
||||
'agent': agent['id']})
|
||||
return
|
||||
except l3.RouterNotFound:
|
||||
LOG.debug('Router %s has already been removed '
|
||||
'by concurrent operation', router_id)
|
||||
return
|
||||
try:
|
||||
port_binding = utils.create_object_with_dependency(
|
||||
creator, dep_getter, dep_creator,
|
||||
dep_id_attr, dep_deleter)[0]
|
||||
with db_api.autonested_transaction(context.session):
|
||||
port_binding.l3_agent_id = agent['id']
|
||||
except db_exc.DBDuplicateEntry:
|
||||
LOG.debug("Router %(router)s already scheduled for agent "
|
||||
"%(agent)s", {'router': router_id,
|
||||
'agent': agent['id']})
|
||||
except l3.RouterNotFound:
|
||||
LOG.debug('Router %s has already been removed '
|
||||
'by concurrent operation', router_id)
|
||||
|
||||
def get_ha_routers_l3_agents_counts(self, plugin, context, filters=None):
|
||||
"""Return a mapping (router, # agents) matching specified filters."""
|
||||
|
@ -359,13 +379,11 @@ class L3Scheduler(object):
|
|||
chosen_agents):
|
||||
port_bindings = plugin.get_ha_router_port_bindings(context,
|
||||
[router_id])
|
||||
binding_indices = range(rb_model.LOWEST_BINDING_INDEX,
|
||||
len(port_bindings) + 1)
|
||||
for port_binding, agent, binding_index in zip(
|
||||
port_bindings, chosen_agents, binding_indices):
|
||||
for port_binding, agent in zip(port_bindings, chosen_agents):
|
||||
if not self.bind_router(plugin, context, router_id, agent.id,
|
||||
is_ha=True):
|
||||
break
|
||||
|
||||
if not self.bind_router(context, router_id, agent, binding_index):
|
||||
continue
|
||||
try:
|
||||
with db_api.autonested_transaction(context.session):
|
||||
port_binding.l3_agent_id = agent.id
|
||||
|
|
|
@ -89,7 +89,8 @@ class L3SchedulerBaseTest(test_db_base_plugin_v2.NeutronDbPluginV2TestCase):
|
|||
for i in range(count):
|
||||
router = self.routers[i]
|
||||
agent = random.choice(self.l3_agents)
|
||||
scheduler.bind_router(self.adminContext, router['id'], agent)
|
||||
scheduler.bind_router(self.l3_plugin, self.adminContext,
|
||||
router['id'], agent.id)
|
||||
hosting_agents.append(agent)
|
||||
return hosting_agents
|
||||
|
||||
|
@ -531,7 +532,8 @@ class L3AZAutoScheduleTestCaseBase(L3AZSchedulerBaseTest):
|
|||
az = 'az%s' % i
|
||||
for j in range(self.scheduled_agent_count[i]):
|
||||
agent = l3_agents[az][j + self.down_agent_count[i]]
|
||||
scheduler.bind_router(self.adminContext, router['id'], agent)
|
||||
scheduler.bind_router(self.l3_plugin, self.adminContext,
|
||||
router['id'], agent.id)
|
||||
|
||||
# activate down agent and call auto_schedule_routers
|
||||
activate_agent = l3_agents[self.agent_az][0]
|
||||
|
@ -768,9 +770,10 @@ class L3DVRSchedulerTestCase(L3DVRSchedulerBaseTest):
|
|||
|
||||
def _test_schedule_router(self):
|
||||
if self.router_already_hosted:
|
||||
self.scheduler.bind_router(self.adminContext,
|
||||
self.scheduler.bind_router(self.l3_plugin,
|
||||
self.adminContext,
|
||||
self.router_to_schedule['id'],
|
||||
self.l3_agents[0])
|
||||
self.l3_agents[0].id)
|
||||
|
||||
# schedule:
|
||||
actual_scheduled_agent = self.scheduler.schedule(
|
||||
|
@ -785,9 +788,10 @@ class L3DVRSchedulerTestCase(L3DVRSchedulerBaseTest):
|
|||
|
||||
def _test_auto_schedule_routers(self):
|
||||
if self.router_already_hosted:
|
||||
self.scheduler.bind_router(self.adminContext,
|
||||
self.scheduler.bind_router(self.l3_plugin,
|
||||
self.adminContext,
|
||||
self.router_to_schedule['id'],
|
||||
self.l3_agents[0])
|
||||
self.l3_agents[0].id)
|
||||
# schedule:
|
||||
hosting_before = self.l3_plugin.get_l3_agents_hosting_routers(
|
||||
self.adminContext, [self.router_to_schedule['id']])
|
||||
|
|
|
@ -254,9 +254,11 @@ class L3SchedulerBaseTestCase(base.BaseTestCase):
|
|||
|
||||
def test__bind_routers_centralized(self):
|
||||
routers = [{'id': 'foo_router'}]
|
||||
agent = agent_model.Agent(id='foo_agent')
|
||||
with mock.patch.object(self.scheduler, 'bind_router') as mock_bind:
|
||||
self.scheduler._bind_routers(mock.ANY, mock.ANY, routers, mock.ANY)
|
||||
mock_bind.assert_called_once_with(mock.ANY, 'foo_router', mock.ANY)
|
||||
self.scheduler._bind_routers(mock.ANY, mock.ANY, routers, agent)
|
||||
mock_bind.assert_called_once_with(mock.ANY, mock.ANY,
|
||||
'foo_router', agent.id)
|
||||
|
||||
def _test__bind_routers_ha(self, has_binding):
|
||||
routers = [{'id': 'foo_router', 'ha': True, 'tenant_id': '42'}]
|
||||
|
@ -577,7 +579,7 @@ class L3SchedulerTestBaseMixin(object):
|
|||
scheduler = l3_agent_scheduler.ChanceScheduler()
|
||||
|
||||
rid = router['router']['id']
|
||||
scheduler.bind_router(ctx, rid, agent)
|
||||
scheduler.bind_router(self.plugin, ctx, rid, agent.id)
|
||||
results = (session.query(db).filter_by(router_id=rid).all())
|
||||
self.assertGreater(len(results), 0)
|
||||
self.assertIn(agent.id, [bind.l3_agent_id for bind in results])
|
||||
|
@ -596,7 +598,8 @@ class L3SchedulerTestBaseMixin(object):
|
|||
scheduler = l3_agent_scheduler.ChanceScheduler()
|
||||
# checking that bind_router() is not throwing
|
||||
# when supplied with router_id of non-existing router
|
||||
scheduler.bind_router(self.adminContext, "dummyID", self.agent1)
|
||||
scheduler.bind_router(self.plugin, self.adminContext,
|
||||
"dummyID", self.agent_id1)
|
||||
|
||||
def test_bind_existing_router(self):
|
||||
router = self._make_router(self.fmt,
|
||||
|
@ -732,7 +735,8 @@ class L3SchedulerTestBaseMixin(object):
|
|||
name='r1')
|
||||
ctx = self.adminContext
|
||||
router_id = router['router']['id']
|
||||
self.plugin.router_scheduler.bind_router(ctx, router_id, agent)
|
||||
self.plugin.router_scheduler.bind_router(self.plugin, ctx,
|
||||
router_id, agent.id)
|
||||
agents = self.plugin.get_l3_agents_hosting_routers(ctx,
|
||||
[router_id])
|
||||
self.assertEqual([agent.id], [agt.id for agt in agents])
|
||||
|
@ -1490,8 +1494,7 @@ class L3HATestCaseMixin(testlib_api.SqlTestCase,
|
|||
instance.router_id = 'nonexistent_router'
|
||||
return orig_fn(s, instance)
|
||||
|
||||
with mock.patch.object(self.plugin.router_scheduler,
|
||||
'bind_router') as bind_router:
|
||||
with mock.patch.object(self.plugin.router_scheduler, 'bind_router'):
|
||||
with mock.patch.object(
|
||||
orm.Session, 'add',
|
||||
side_effect=db_ref_err_for_add_haportbinding,
|
||||
|
@ -1499,21 +1502,30 @@ class L3HATestCaseMixin(testlib_api.SqlTestCase,
|
|||
self.plugin.router_scheduler.create_ha_port_and_bind(
|
||||
self.plugin, self.adminContext,
|
||||
router['id'], router['tenant_id'], agent)
|
||||
self.assertFalse(bind_router.called)
|
||||
|
||||
def test_create_ha_port_and_bind_catch_router_not_found(self):
|
||||
router = self._create_ha_router(tenant_id='foo_tenant')
|
||||
agent = {'id': 'foo_agent'}
|
||||
|
||||
with mock.patch.object(self.plugin.router_scheduler,
|
||||
'bind_router') as bind_router:
|
||||
with mock.patch.object(self.plugin.router_scheduler, 'bind_router'):
|
||||
with mock.patch.object(
|
||||
self.plugin, 'add_ha_port',
|
||||
side_effect=l3.RouterNotFound(router_id='foo_router')):
|
||||
self.plugin.router_scheduler.create_ha_port_and_bind(
|
||||
self.plugin, self.adminContext,
|
||||
router['id'], router['tenant_id'], agent)
|
||||
self.assertFalse(bind_router.called)
|
||||
|
||||
def test_create_ha_port_and_bind_bind_router_returns_None(self):
|
||||
router = self._create_ha_router(tenant_id='foo_tenant')
|
||||
agent = {'id': 'foo_agent'}
|
||||
|
||||
with mock.patch.object(self.plugin.router_scheduler, 'bind_router',
|
||||
return_value=None):
|
||||
with mock.patch.object(self.plugin, 'add_ha_port') as add_ha_port:
|
||||
self.plugin.router_scheduler.create_ha_port_and_bind(
|
||||
self.plugin, self.adminContext,
|
||||
router['id'], router['tenant_id'], agent)
|
||||
self.assertFalse(add_ha_port.called)
|
||||
|
||||
|
||||
class VacantBindingIndexTestCase(L3HATestCaseMixin):
|
||||
|
@ -1763,12 +1775,14 @@ class L3AgentSchedulerDbMixinTestCase(L3HATestCaseMixin):
|
|||
|
||||
def test_bind_router_twice_for_non_ha(self):
|
||||
router = self._create_ha_router(ha=False)
|
||||
self.plugin.router_scheduler.bind_router(self.adminContext,
|
||||
self.plugin.router_scheduler.bind_router(self.plugin,
|
||||
self.adminContext,
|
||||
router['id'],
|
||||
self.agent1)
|
||||
self.plugin.router_scheduler.bind_router(self.adminContext,
|
||||
self.agent_id1)
|
||||
self.plugin.router_scheduler.bind_router(self.plugin,
|
||||
self.adminContext,
|
||||
router['id'],
|
||||
self.agent2)
|
||||
self.agent_id2)
|
||||
|
||||
# Make sure the second bind_router call didn't schedule the router to
|
||||
# more agents than allowed.
|
||||
|
|
Loading…
Reference in New Issue