Reschedule a bgp speaker binded to a down dr-agent
If a bgp speaker is binded to a down dr-agent, unbind and reschedule it. If there is an unbinded bgp speaker, schedule it. Change-Id: If5de81f1ca9b7781f48fd9bea84848f4261ccbe6
This commit is contained in:
parent
fb06d1d91f
commit
0e7e93051e
|
@ -66,6 +66,24 @@ class BgpDrAgentSchedulerDbMixin(bgp_dras_ext.BgpDrSchedulerPluginBase,
|
|||
|
||||
bgp_drscheduler = None
|
||||
|
||||
def add_periodic_dragent_status_check(self):
|
||||
if self.bgp_drscheduler:
|
||||
self.add_agent_status_check_worker(
|
||||
self.remove_bgp_speaker_from_down_dragents)
|
||||
self.add_agent_status_check_worker(
|
||||
self.schedule_all_unscheduled_bgp_speakers)
|
||||
else:
|
||||
LOG.warning(_LW("Cannot schedule BgpSpeaker to DrAgent. "
|
||||
"Reason: No scheduler registered."))
|
||||
|
||||
def schedule_all_unscheduled_bgp_speakers(self, context):
|
||||
if self.bgp_drscheduler:
|
||||
return self.bgp_drscheduler.schedule_all_unscheduled_bgp_speakers(
|
||||
context)
|
||||
else:
|
||||
LOG.warning(_LW("Cannot schedule BgpSpeaker to DrAgent. "
|
||||
"Reason: No scheduler registered."))
|
||||
|
||||
def schedule_unscheduled_bgp_speakers(self, context, host):
|
||||
if self.bgp_drscheduler:
|
||||
return self.bgp_drscheduler.schedule_unscheduled_bgp_speakers(
|
||||
|
@ -117,7 +135,43 @@ class BgpDrAgentSchedulerDbMixin(bgp_dras_ext.BgpDrSchedulerPluginBase,
|
|||
|
||||
self._bgp_rpc.bgp_speaker_created(context, speaker_id, agent_db.host)
|
||||
|
||||
def remove_bgp_speaker_from_dragent(self, context, agent_id, speaker_id):
|
||||
def remove_bgp_speaker_from_down_dragents(self):
|
||||
self.reschedule_resources_from_down_agents(
|
||||
agent_type=bgp_consts.AGENT_TYPE_BGP_ROUTING,
|
||||
get_down_bindings=self.get_down_bgp_speaker_bindings,
|
||||
agent_id_attr='agent_id',
|
||||
resource_id_attr='bgp_speaker_id',
|
||||
resource_name='bgp_speaker',
|
||||
reschedule_resource=self.reschedule_bgp_speaker,
|
||||
rescheduling_failed=bgp_dras_ext.BgpSpeakerRescheduleError)
|
||||
|
||||
def get_down_bgp_speaker_bindings(self, context, agent_dead_limit):
|
||||
cutoff = self.get_cutoff_time(agent_dead_limit)
|
||||
query = (
|
||||
context.session.query(BgpSpeakerDrAgentBinding).
|
||||
join(agent_model.Agent).
|
||||
filter(agent_model.Agent.heartbeat_timestamp < cutoff,
|
||||
agent_model.Agent.admin_state_up))
|
||||
down_bindings = [b for b in query]
|
||||
return down_bindings
|
||||
|
||||
def reschedule_bgp_speaker(self, context, bgp_speaker_id):
|
||||
dragent = self.get_dragents_hosting_bgp_speakers(
|
||||
context, [bgp_speaker_id])[0]
|
||||
bgp_speaker = self.get_bgp_speaker(context, bgp_speaker_id)
|
||||
dragent_id = dragent.id
|
||||
with db_api.context_manager.writer.using(context):
|
||||
self._remove_bgp_speaker_from_dragent(
|
||||
context, dragent_id, bgp_speaker_id)
|
||||
self.schedule_bgp_speaker(context, bgp_speaker)
|
||||
new_dragents = self.get_dragents_hosting_bgp_speakers(
|
||||
context, [bgp_speaker_id])
|
||||
if new_dragents == [] or new_dragents[0].id == dragent.id:
|
||||
raise bgp_dras_ext.BgpSpeakerRescheduleError(
|
||||
bgp_speaker_id=bgp_speaker_id,
|
||||
failure_reason="no eligible dr agent found")
|
||||
|
||||
def _remove_bgp_speaker_from_dragent(self, context, agent_id, speaker_id):
|
||||
with db_api.context_manager.writer.using(context):
|
||||
agent_db = self._get_agent(context, agent_id)
|
||||
is_agent_bgp = (agent_db['agent_type'] ==
|
||||
|
@ -139,6 +193,9 @@ class BgpDrAgentSchedulerDbMixin(bgp_dras_ext.BgpDrSchedulerPluginBase,
|
|||
{'bgp_speaker_id': speaker_id,
|
||||
'agent_id': agent_id})
|
||||
|
||||
def remove_bgp_speaker_from_dragent(self, context, agent_id, speaker_id):
|
||||
self._remove_bgp_speaker_from_dragent(context, agent_id, speaker_id)
|
||||
agent_db = self._get_agent(context, agent_id)
|
||||
self._bgp_rpc.bgp_speaker_removed(context, speaker_id, agent_db.host)
|
||||
|
||||
def get_dragents_hosting_bgp_speakers(self, context, bgp_speaker_ids,
|
||||
|
|
|
@ -54,6 +54,11 @@ class DrAgentAssociationError(n_exc.Conflict):
|
|||
"to a BGP speaker.")
|
||||
|
||||
|
||||
class BgpSpeakerRescheduleError(n_exc.Invalid):
|
||||
message = _("Failed rescheduling %(bgp_speaker_id)s: "
|
||||
"%(failure_reason)s.")
|
||||
|
||||
|
||||
class BgpDrSchedulerController(wsgi.Controller):
|
||||
"""Schedule BgpSpeaker for a BgpDrAgent"""
|
||||
def get_plugin(self):
|
||||
|
|
|
@ -52,6 +52,7 @@ class BgpPlugin(service_base.ServicePluginBase,
|
|||
cfg.CONF.bgp_drscheduler_driver)
|
||||
self._setup_rpc()
|
||||
self._register_callbacks()
|
||||
self.add_periodic_dragent_status_check()
|
||||
|
||||
def get_plugin_name(self):
|
||||
return PLUGIN_NAME
|
||||
|
|
|
@ -132,6 +132,16 @@ class BgpDrAgentSchedulerBase(BgpDrAgentFilter):
|
|||
dr_resources.BGP_SPEAKER,
|
||||
events.AFTER_CREATE)
|
||||
|
||||
def schedule_all_unscheduled_bgp_speakers(self, context):
|
||||
"""Call schedule_unscheduled_bgp_speakers for all hosts.
|
||||
"""
|
||||
|
||||
with context.session.begin(subtransactions=True):
|
||||
query = context.session.query(agent_model.Agent.host).distinct()
|
||||
for agent in query:
|
||||
self.schedule_unscheduled_bgp_speakers(context, agent[0])
|
||||
return True
|
||||
|
||||
def schedule_unscheduled_bgp_speakers(self, context, host):
|
||||
"""Schedule unscheduled BgpSpeaker to a BgpDrAgent.
|
||||
"""
|
||||
|
|
|
@ -66,12 +66,15 @@ class TestBgpDrAgentSchedulerBaseTestCase(testlib_api.SqlTestCase):
|
|||
'networks': []}}
|
||||
cls._save_bgp_speaker(self.ctx, bgp_speaker_body, uuid=bgp_speaker_id)
|
||||
|
||||
def _get_dragent_bgp_speaker_bindings(self, bgp_speaker_id):
|
||||
return self.ctx.session.query(
|
||||
bgp_dras_db.BgpSpeakerDrAgentBinding).filter_by(
|
||||
bgp_speaker_id=bgp_speaker_id).all()
|
||||
|
||||
def _test_schedule_bind_bgp_speaker(self, agents, bgp_speaker_id):
|
||||
scheduler = bgp_dras.ChanceScheduler()
|
||||
scheduler.resource_filter.bind(self.ctx, agents, bgp_speaker_id)
|
||||
results = self.ctx.session.query(
|
||||
bgp_dras_db.BgpSpeakerDrAgentBinding).filter_by(
|
||||
bgp_speaker_id=bgp_speaker_id).all()
|
||||
results = self._get_dragent_bgp_speaker_bindings(bgp_speaker_id)
|
||||
|
||||
for result in results:
|
||||
self.assertEqual(bgp_speaker_id, result.bgp_speaker_id)
|
||||
|
@ -289,3 +292,63 @@ class TestAutoScheduleBgpSpeakers(TestBgpDrAgentSchedulerBaseTestCase):
|
|||
hosted_agents = self.ctx.session.query(
|
||||
bgp_dras_db.BgpSpeakerDrAgentBinding).all()
|
||||
self.assertEqual(expected_hosted_agents, len(hosted_agents))
|
||||
|
||||
|
||||
class TestRescheduleBgpSpeaker(TestBgpDrAgentSchedulerBaseTestCase,
|
||||
bgp_db.BgpDbMixin):
|
||||
|
||||
def setUp(self):
|
||||
super(TestRescheduleBgpSpeaker, self).setUp()
|
||||
bgp_notify_p = mock.patch('neutron_dynamic_routing.api.rpc.'
|
||||
'agentnotifiers.bgp_dr_rpc_agent_api.'
|
||||
'BgpDrAgentNotifyApi')
|
||||
bgp_notify_p.start()
|
||||
rpc_conn_p = mock.patch('neutron.common.rpc.create_connection')
|
||||
rpc_conn_p.start()
|
||||
admin_ctx_p = mock.patch('neutron_lib.context.get_admin_context')
|
||||
self.admin_ctx_m = admin_ctx_p.start()
|
||||
self.admin_ctx_m.return_value = self.ctx
|
||||
self.plugin = bgp_plugin.BgpPlugin()
|
||||
self.scheduler = bgp_dras.ChanceScheduler()
|
||||
self.host1 = 'host-a'
|
||||
self.host2 = 'host-b'
|
||||
|
||||
def _kill_bgp_dragent(self, hosts):
|
||||
agents = []
|
||||
for host in hosts:
|
||||
agents.append(
|
||||
helpers.register_bgp_dragent(host=host, alive=False))
|
||||
return agents
|
||||
|
||||
def _schedule_bind_bgp_speaker(self, agents, bgp_speaker_id):
|
||||
self.scheduler.resource_filter.bind(self.ctx, agents, bgp_speaker_id)
|
||||
return self._get_dragent_bgp_speaker_bindings(bgp_speaker_id)
|
||||
|
||||
def test_reschedule_bgp_speaker_bound_to_down_dragent(self):
|
||||
agents = self._create_and_set_agents_down([self.host1, self.host2])
|
||||
self._schedule_bind_bgp_speaker([agents[0]], self.bgp_speaker_id)
|
||||
self._kill_bgp_dragent([self.host1])
|
||||
self.plugin.remove_bgp_speaker_from_down_dragents()
|
||||
binds = self._get_dragent_bgp_speaker_bindings(self.bgp_speaker_id)
|
||||
self.assertEqual(binds[0].agent_id, agents[1].id)
|
||||
|
||||
def test_no_schedule_with_non_available_dragent(self):
|
||||
agents = self._create_and_set_agents_down([self.host1, self.host2])
|
||||
self._schedule_bind_bgp_speaker([agents[0]], self.bgp_speaker_id)
|
||||
self._kill_bgp_dragent([self.host1, self.host2])
|
||||
self.plugin.remove_bgp_speaker_from_down_dragents()
|
||||
binds = self._get_dragent_bgp_speaker_bindings(self.bgp_speaker_id)
|
||||
self.assertEqual(binds, [])
|
||||
|
||||
def test_schedule_unbind_bgp_speaker(self):
|
||||
agents = self._create_and_set_agents_down([self.host1, self.host2])
|
||||
self._schedule_bind_bgp_speaker([agents[0]], self.bgp_speaker_id)
|
||||
self._kill_bgp_dragent([self.host1, self.host2])
|
||||
self.plugin.remove_bgp_speaker_from_down_dragents()
|
||||
binds = self._get_dragent_bgp_speaker_bindings(self.bgp_speaker_id)
|
||||
self.assertEqual(binds, [])
|
||||
# schedule a unbind bgp speaker
|
||||
agents = self._create_and_set_agents_down([self.host1])
|
||||
self.scheduler.schedule_all_unscheduled_bgp_speakers(self.ctx)
|
||||
binds = self._get_dragent_bgp_speaker_bindings(self.bgp_speaker_id)
|
||||
self.assertEqual(binds[0].agent_id, agents[0].id)
|
||||
|
|
Loading…
Reference in New Issue