Merge "Reschedule a bgp speaker binded to a down dr-agent"
This commit is contained in:
commit
68fa2cee0d
|
@ -66,6 +66,24 @@ class BgpDrAgentSchedulerDbMixin(bgp_dras_ext.BgpDrSchedulerPluginBase,
|
||||||
|
|
||||||
bgp_drscheduler = None
|
bgp_drscheduler = None
|
||||||
|
|
||||||
|
def add_periodic_dragent_status_check(self):
|
||||||
|
if self.bgp_drscheduler:
|
||||||
|
self.add_agent_status_check_worker(
|
||||||
|
self.remove_bgp_speaker_from_down_dragents)
|
||||||
|
self.add_agent_status_check_worker(
|
||||||
|
self.schedule_all_unscheduled_bgp_speakers)
|
||||||
|
else:
|
||||||
|
LOG.warning(_LW("Cannot schedule BgpSpeaker to DrAgent. "
|
||||||
|
"Reason: No scheduler registered."))
|
||||||
|
|
||||||
|
def schedule_all_unscheduled_bgp_speakers(self, context):
|
||||||
|
if self.bgp_drscheduler:
|
||||||
|
return self.bgp_drscheduler.schedule_all_unscheduled_bgp_speakers(
|
||||||
|
context)
|
||||||
|
else:
|
||||||
|
LOG.warning(_LW("Cannot schedule BgpSpeaker to DrAgent. "
|
||||||
|
"Reason: No scheduler registered."))
|
||||||
|
|
||||||
def schedule_unscheduled_bgp_speakers(self, context, host):
|
def schedule_unscheduled_bgp_speakers(self, context, host):
|
||||||
if self.bgp_drscheduler:
|
if self.bgp_drscheduler:
|
||||||
return self.bgp_drscheduler.schedule_unscheduled_bgp_speakers(
|
return self.bgp_drscheduler.schedule_unscheduled_bgp_speakers(
|
||||||
|
@ -117,7 +135,43 @@ class BgpDrAgentSchedulerDbMixin(bgp_dras_ext.BgpDrSchedulerPluginBase,
|
||||||
|
|
||||||
self._bgp_rpc.bgp_speaker_created(context, speaker_id, agent_db.host)
|
self._bgp_rpc.bgp_speaker_created(context, speaker_id, agent_db.host)
|
||||||
|
|
||||||
def remove_bgp_speaker_from_dragent(self, context, agent_id, speaker_id):
|
def remove_bgp_speaker_from_down_dragents(self):
|
||||||
|
self.reschedule_resources_from_down_agents(
|
||||||
|
agent_type=bgp_consts.AGENT_TYPE_BGP_ROUTING,
|
||||||
|
get_down_bindings=self.get_down_bgp_speaker_bindings,
|
||||||
|
agent_id_attr='agent_id',
|
||||||
|
resource_id_attr='bgp_speaker_id',
|
||||||
|
resource_name='bgp_speaker',
|
||||||
|
reschedule_resource=self.reschedule_bgp_speaker,
|
||||||
|
rescheduling_failed=bgp_dras_ext.BgpSpeakerRescheduleError)
|
||||||
|
|
||||||
|
def get_down_bgp_speaker_bindings(self, context, agent_dead_limit):
|
||||||
|
cutoff = self.get_cutoff_time(agent_dead_limit)
|
||||||
|
query = (
|
||||||
|
context.session.query(BgpSpeakerDrAgentBinding).
|
||||||
|
join(agent_model.Agent).
|
||||||
|
filter(agent_model.Agent.heartbeat_timestamp < cutoff,
|
||||||
|
agent_model.Agent.admin_state_up))
|
||||||
|
down_bindings = [b for b in query]
|
||||||
|
return down_bindings
|
||||||
|
|
||||||
|
def reschedule_bgp_speaker(self, context, bgp_speaker_id):
|
||||||
|
dragent = self.get_dragents_hosting_bgp_speakers(
|
||||||
|
context, [bgp_speaker_id])[0]
|
||||||
|
bgp_speaker = self.get_bgp_speaker(context, bgp_speaker_id)
|
||||||
|
dragent_id = dragent.id
|
||||||
|
with db_api.context_manager.writer.using(context):
|
||||||
|
self._remove_bgp_speaker_from_dragent(
|
||||||
|
context, dragent_id, bgp_speaker_id)
|
||||||
|
self.schedule_bgp_speaker(context, bgp_speaker)
|
||||||
|
new_dragents = self.get_dragents_hosting_bgp_speakers(
|
||||||
|
context, [bgp_speaker_id])
|
||||||
|
if new_dragents == [] or new_dragents[0].id == dragent.id:
|
||||||
|
raise bgp_dras_ext.BgpSpeakerRescheduleError(
|
||||||
|
bgp_speaker_id=bgp_speaker_id,
|
||||||
|
failure_reason="no eligible dr agent found")
|
||||||
|
|
||||||
|
def _remove_bgp_speaker_from_dragent(self, context, agent_id, speaker_id):
|
||||||
with db_api.context_manager.writer.using(context):
|
with db_api.context_manager.writer.using(context):
|
||||||
agent_db = self._get_agent(context, agent_id)
|
agent_db = self._get_agent(context, agent_id)
|
||||||
is_agent_bgp = (agent_db['agent_type'] ==
|
is_agent_bgp = (agent_db['agent_type'] ==
|
||||||
|
@ -139,6 +193,9 @@ class BgpDrAgentSchedulerDbMixin(bgp_dras_ext.BgpDrSchedulerPluginBase,
|
||||||
{'bgp_speaker_id': speaker_id,
|
{'bgp_speaker_id': speaker_id,
|
||||||
'agent_id': agent_id})
|
'agent_id': agent_id})
|
||||||
|
|
||||||
|
def remove_bgp_speaker_from_dragent(self, context, agent_id, speaker_id):
|
||||||
|
self._remove_bgp_speaker_from_dragent(context, agent_id, speaker_id)
|
||||||
|
agent_db = self._get_agent(context, agent_id)
|
||||||
self._bgp_rpc.bgp_speaker_removed(context, speaker_id, agent_db.host)
|
self._bgp_rpc.bgp_speaker_removed(context, speaker_id, agent_db.host)
|
||||||
|
|
||||||
def get_dragents_hosting_bgp_speakers(self, context, bgp_speaker_ids,
|
def get_dragents_hosting_bgp_speakers(self, context, bgp_speaker_ids,
|
||||||
|
|
|
@ -54,6 +54,11 @@ class DrAgentAssociationError(n_exc.Conflict):
|
||||||
"to a BGP speaker.")
|
"to a BGP speaker.")
|
||||||
|
|
||||||
|
|
||||||
|
class BgpSpeakerRescheduleError(n_exc.Invalid):
|
||||||
|
message = _("Failed rescheduling %(bgp_speaker_id)s: "
|
||||||
|
"%(failure_reason)s.")
|
||||||
|
|
||||||
|
|
||||||
class BgpDrSchedulerController(wsgi.Controller):
|
class BgpDrSchedulerController(wsgi.Controller):
|
||||||
"""Schedule BgpSpeaker for a BgpDrAgent"""
|
"""Schedule BgpSpeaker for a BgpDrAgent"""
|
||||||
def get_plugin(self):
|
def get_plugin(self):
|
||||||
|
|
|
@ -52,6 +52,7 @@ class BgpPlugin(service_base.ServicePluginBase,
|
||||||
cfg.CONF.bgp_drscheduler_driver)
|
cfg.CONF.bgp_drscheduler_driver)
|
||||||
self._setup_rpc()
|
self._setup_rpc()
|
||||||
self._register_callbacks()
|
self._register_callbacks()
|
||||||
|
self.add_periodic_dragent_status_check()
|
||||||
|
|
||||||
def get_plugin_name(self):
|
def get_plugin_name(self):
|
||||||
return PLUGIN_NAME
|
return PLUGIN_NAME
|
||||||
|
|
|
@ -132,6 +132,16 @@ class BgpDrAgentSchedulerBase(BgpDrAgentFilter):
|
||||||
dr_resources.BGP_SPEAKER,
|
dr_resources.BGP_SPEAKER,
|
||||||
events.AFTER_CREATE)
|
events.AFTER_CREATE)
|
||||||
|
|
||||||
|
def schedule_all_unscheduled_bgp_speakers(self, context):
|
||||||
|
"""Call schedule_unscheduled_bgp_speakers for all hosts.
|
||||||
|
"""
|
||||||
|
|
||||||
|
with context.session.begin(subtransactions=True):
|
||||||
|
query = context.session.query(agent_model.Agent.host).distinct()
|
||||||
|
for agent in query:
|
||||||
|
self.schedule_unscheduled_bgp_speakers(context, agent[0])
|
||||||
|
return True
|
||||||
|
|
||||||
def schedule_unscheduled_bgp_speakers(self, context, host):
|
def schedule_unscheduled_bgp_speakers(self, context, host):
|
||||||
"""Schedule unscheduled BgpSpeaker to a BgpDrAgent.
|
"""Schedule unscheduled BgpSpeaker to a BgpDrAgent.
|
||||||
"""
|
"""
|
||||||
|
|
|
@ -66,12 +66,15 @@ class TestBgpDrAgentSchedulerBaseTestCase(testlib_api.SqlTestCase):
|
||||||
'networks': []}}
|
'networks': []}}
|
||||||
cls._save_bgp_speaker(self.ctx, bgp_speaker_body, uuid=bgp_speaker_id)
|
cls._save_bgp_speaker(self.ctx, bgp_speaker_body, uuid=bgp_speaker_id)
|
||||||
|
|
||||||
|
def _get_dragent_bgp_speaker_bindings(self, bgp_speaker_id):
|
||||||
|
return self.ctx.session.query(
|
||||||
|
bgp_dras_db.BgpSpeakerDrAgentBinding).filter_by(
|
||||||
|
bgp_speaker_id=bgp_speaker_id).all()
|
||||||
|
|
||||||
def _test_schedule_bind_bgp_speaker(self, agents, bgp_speaker_id):
|
def _test_schedule_bind_bgp_speaker(self, agents, bgp_speaker_id):
|
||||||
scheduler = bgp_dras.ChanceScheduler()
|
scheduler = bgp_dras.ChanceScheduler()
|
||||||
scheduler.resource_filter.bind(self.ctx, agents, bgp_speaker_id)
|
scheduler.resource_filter.bind(self.ctx, agents, bgp_speaker_id)
|
||||||
results = self.ctx.session.query(
|
results = self._get_dragent_bgp_speaker_bindings(bgp_speaker_id)
|
||||||
bgp_dras_db.BgpSpeakerDrAgentBinding).filter_by(
|
|
||||||
bgp_speaker_id=bgp_speaker_id).all()
|
|
||||||
|
|
||||||
for result in results:
|
for result in results:
|
||||||
self.assertEqual(bgp_speaker_id, result.bgp_speaker_id)
|
self.assertEqual(bgp_speaker_id, result.bgp_speaker_id)
|
||||||
|
@ -289,3 +292,63 @@ class TestAutoScheduleBgpSpeakers(TestBgpDrAgentSchedulerBaseTestCase):
|
||||||
hosted_agents = self.ctx.session.query(
|
hosted_agents = self.ctx.session.query(
|
||||||
bgp_dras_db.BgpSpeakerDrAgentBinding).all()
|
bgp_dras_db.BgpSpeakerDrAgentBinding).all()
|
||||||
self.assertEqual(expected_hosted_agents, len(hosted_agents))
|
self.assertEqual(expected_hosted_agents, len(hosted_agents))
|
||||||
|
|
||||||
|
|
||||||
|
class TestRescheduleBgpSpeaker(TestBgpDrAgentSchedulerBaseTestCase,
|
||||||
|
bgp_db.BgpDbMixin):
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
super(TestRescheduleBgpSpeaker, self).setUp()
|
||||||
|
bgp_notify_p = mock.patch('neutron_dynamic_routing.api.rpc.'
|
||||||
|
'agentnotifiers.bgp_dr_rpc_agent_api.'
|
||||||
|
'BgpDrAgentNotifyApi')
|
||||||
|
bgp_notify_p.start()
|
||||||
|
rpc_conn_p = mock.patch('neutron.common.rpc.create_connection')
|
||||||
|
rpc_conn_p.start()
|
||||||
|
admin_ctx_p = mock.patch('neutron_lib.context.get_admin_context')
|
||||||
|
self.admin_ctx_m = admin_ctx_p.start()
|
||||||
|
self.admin_ctx_m.return_value = self.ctx
|
||||||
|
self.plugin = bgp_plugin.BgpPlugin()
|
||||||
|
self.scheduler = bgp_dras.ChanceScheduler()
|
||||||
|
self.host1 = 'host-a'
|
||||||
|
self.host2 = 'host-b'
|
||||||
|
|
||||||
|
def _kill_bgp_dragent(self, hosts):
|
||||||
|
agents = []
|
||||||
|
for host in hosts:
|
||||||
|
agents.append(
|
||||||
|
helpers.register_bgp_dragent(host=host, alive=False))
|
||||||
|
return agents
|
||||||
|
|
||||||
|
def _schedule_bind_bgp_speaker(self, agents, bgp_speaker_id):
|
||||||
|
self.scheduler.resource_filter.bind(self.ctx, agents, bgp_speaker_id)
|
||||||
|
return self._get_dragent_bgp_speaker_bindings(bgp_speaker_id)
|
||||||
|
|
||||||
|
def test_reschedule_bgp_speaker_bound_to_down_dragent(self):
|
||||||
|
agents = self._create_and_set_agents_down([self.host1, self.host2])
|
||||||
|
self._schedule_bind_bgp_speaker([agents[0]], self.bgp_speaker_id)
|
||||||
|
self._kill_bgp_dragent([self.host1])
|
||||||
|
self.plugin.remove_bgp_speaker_from_down_dragents()
|
||||||
|
binds = self._get_dragent_bgp_speaker_bindings(self.bgp_speaker_id)
|
||||||
|
self.assertEqual(binds[0].agent_id, agents[1].id)
|
||||||
|
|
||||||
|
def test_no_schedule_with_non_available_dragent(self):
|
||||||
|
agents = self._create_and_set_agents_down([self.host1, self.host2])
|
||||||
|
self._schedule_bind_bgp_speaker([agents[0]], self.bgp_speaker_id)
|
||||||
|
self._kill_bgp_dragent([self.host1, self.host2])
|
||||||
|
self.plugin.remove_bgp_speaker_from_down_dragents()
|
||||||
|
binds = self._get_dragent_bgp_speaker_bindings(self.bgp_speaker_id)
|
||||||
|
self.assertEqual(binds, [])
|
||||||
|
|
||||||
|
def test_schedule_unbind_bgp_speaker(self):
|
||||||
|
agents = self._create_and_set_agents_down([self.host1, self.host2])
|
||||||
|
self._schedule_bind_bgp_speaker([agents[0]], self.bgp_speaker_id)
|
||||||
|
self._kill_bgp_dragent([self.host1, self.host2])
|
||||||
|
self.plugin.remove_bgp_speaker_from_down_dragents()
|
||||||
|
binds = self._get_dragent_bgp_speaker_bindings(self.bgp_speaker_id)
|
||||||
|
self.assertEqual(binds, [])
|
||||||
|
# schedule a unbind bgp speaker
|
||||||
|
agents = self._create_and_set_agents_down([self.host1])
|
||||||
|
self.scheduler.schedule_all_unscheduled_bgp_speakers(self.ctx)
|
||||||
|
binds = self._get_dragent_bgp_speaker_bindings(self.bgp_speaker_id)
|
||||||
|
self.assertEqual(binds[0].agent_id, agents[0].id)
|
||||||
|
|
Loading…
Reference in New Issue