diff --git a/neutron_dynamic_routing/db/bgp_dragentscheduler_db.py b/neutron_dynamic_routing/db/bgp_dragentscheduler_db.py index 455e1c82..15c9d567 100644 --- a/neutron_dynamic_routing/db/bgp_dragentscheduler_db.py +++ b/neutron_dynamic_routing/db/bgp_dragentscheduler_db.py @@ -66,6 +66,24 @@ class BgpDrAgentSchedulerDbMixin(bgp_dras_ext.BgpDrSchedulerPluginBase, bgp_drscheduler = None + def add_periodic_dragent_status_check(self): + if self.bgp_drscheduler: + self.add_agent_status_check_worker( + self.remove_bgp_speaker_from_down_dragents) + self.add_agent_status_check_worker( + self.schedule_all_unscheduled_bgp_speakers) + else: + LOG.warning(_LW("Cannot schedule BgpSpeaker to DrAgent. " + "Reason: No scheduler registered.")) + + def schedule_all_unscheduled_bgp_speakers(self, context): + if self.bgp_drscheduler: + return self.bgp_drscheduler.schedule_all_unscheduled_bgp_speakers( + context) + else: + LOG.warning(_LW("Cannot schedule BgpSpeaker to DrAgent. " + "Reason: No scheduler registered.")) + def schedule_unscheduled_bgp_speakers(self, context, host): if self.bgp_drscheduler: return self.bgp_drscheduler.schedule_unscheduled_bgp_speakers( @@ -117,7 +135,43 @@ class BgpDrAgentSchedulerDbMixin(bgp_dras_ext.BgpDrSchedulerPluginBase, self._bgp_rpc.bgp_speaker_created(context, speaker_id, agent_db.host) - def remove_bgp_speaker_from_dragent(self, context, agent_id, speaker_id): + def remove_bgp_speaker_from_down_dragents(self): + self.reschedule_resources_from_down_agents( + agent_type=bgp_consts.AGENT_TYPE_BGP_ROUTING, + get_down_bindings=self.get_down_bgp_speaker_bindings, + agent_id_attr='agent_id', + resource_id_attr='bgp_speaker_id', + resource_name='bgp_speaker', + reschedule_resource=self.reschedule_bgp_speaker, + rescheduling_failed=bgp_dras_ext.BgpSpeakerRescheduleError) + + def get_down_bgp_speaker_bindings(self, context, agent_dead_limit): + cutoff = self.get_cutoff_time(agent_dead_limit) + query = ( + context.session.query(BgpSpeakerDrAgentBinding). + join(agent_model.Agent). + filter(agent_model.Agent.heartbeat_timestamp < cutoff, + agent_model.Agent.admin_state_up)) + down_bindings = [b for b in query] + return down_bindings + + def reschedule_bgp_speaker(self, context, bgp_speaker_id): + dragent = self.get_dragents_hosting_bgp_speakers( + context, [bgp_speaker_id])[0] + bgp_speaker = self.get_bgp_speaker(context, bgp_speaker_id) + dragent_id = dragent.id + with db_api.context_manager.writer.using(context): + self._remove_bgp_speaker_from_dragent( + context, dragent_id, bgp_speaker_id) + self.schedule_bgp_speaker(context, bgp_speaker) + new_dragents = self.get_dragents_hosting_bgp_speakers( + context, [bgp_speaker_id]) + if new_dragents == [] or new_dragents[0].id == dragent.id: + raise bgp_dras_ext.BgpSpeakerRescheduleError( + bgp_speaker_id=bgp_speaker_id, + failure_reason="no eligible dr agent found") + + def _remove_bgp_speaker_from_dragent(self, context, agent_id, speaker_id): with db_api.context_manager.writer.using(context): agent_db = self._get_agent(context, agent_id) is_agent_bgp = (agent_db['agent_type'] == @@ -139,6 +193,9 @@ class BgpDrAgentSchedulerDbMixin(bgp_dras_ext.BgpDrSchedulerPluginBase, {'bgp_speaker_id': speaker_id, 'agent_id': agent_id}) + def remove_bgp_speaker_from_dragent(self, context, agent_id, speaker_id): + self._remove_bgp_speaker_from_dragent(context, agent_id, speaker_id) + agent_db = self._get_agent(context, agent_id) self._bgp_rpc.bgp_speaker_removed(context, speaker_id, agent_db.host) def get_dragents_hosting_bgp_speakers(self, context, bgp_speaker_ids, diff --git a/neutron_dynamic_routing/extensions/bgp_dragentscheduler.py b/neutron_dynamic_routing/extensions/bgp_dragentscheduler.py index bae4c65d..219e3b98 100644 --- a/neutron_dynamic_routing/extensions/bgp_dragentscheduler.py +++ b/neutron_dynamic_routing/extensions/bgp_dragentscheduler.py @@ -54,6 +54,11 @@ class DrAgentAssociationError(n_exc.Conflict): "to a BGP speaker.") +class BgpSpeakerRescheduleError(n_exc.Invalid): + message = _("Failed rescheduling %(bgp_speaker_id)s: " + "%(failure_reason)s.") + + class BgpDrSchedulerController(wsgi.Controller): """Schedule BgpSpeaker for a BgpDrAgent""" def get_plugin(self): diff --git a/neutron_dynamic_routing/services/bgp/bgp_plugin.py b/neutron_dynamic_routing/services/bgp/bgp_plugin.py index becadce8..2bdd8d28 100644 --- a/neutron_dynamic_routing/services/bgp/bgp_plugin.py +++ b/neutron_dynamic_routing/services/bgp/bgp_plugin.py @@ -52,6 +52,7 @@ class BgpPlugin(service_base.ServicePluginBase, cfg.CONF.bgp_drscheduler_driver) self._setup_rpc() self._register_callbacks() + self.add_periodic_dragent_status_check() def get_plugin_name(self): return PLUGIN_NAME diff --git a/neutron_dynamic_routing/services/bgp/scheduler/bgp_dragent_scheduler.py b/neutron_dynamic_routing/services/bgp/scheduler/bgp_dragent_scheduler.py index 1a956897..f26417de 100644 --- a/neutron_dynamic_routing/services/bgp/scheduler/bgp_dragent_scheduler.py +++ b/neutron_dynamic_routing/services/bgp/scheduler/bgp_dragent_scheduler.py @@ -132,6 +132,16 @@ class BgpDrAgentSchedulerBase(BgpDrAgentFilter): dr_resources.BGP_SPEAKER, events.AFTER_CREATE) + def schedule_all_unscheduled_bgp_speakers(self, context): + """Call schedule_unscheduled_bgp_speakers for all hosts. + """ + + with context.session.begin(subtransactions=True): + query = context.session.query(agent_model.Agent.host).distinct() + for agent in query: + self.schedule_unscheduled_bgp_speakers(context, agent[0]) + return True + def schedule_unscheduled_bgp_speakers(self, context, host): """Schedule unscheduled BgpSpeaker to a BgpDrAgent. """ diff --git a/neutron_dynamic_routing/tests/unit/services/bgp/scheduler/test_bgp_dragent_scheduler.py b/neutron_dynamic_routing/tests/unit/services/bgp/scheduler/test_bgp_dragent_scheduler.py index ecad7354..c0e087d1 100644 --- a/neutron_dynamic_routing/tests/unit/services/bgp/scheduler/test_bgp_dragent_scheduler.py +++ b/neutron_dynamic_routing/tests/unit/services/bgp/scheduler/test_bgp_dragent_scheduler.py @@ -66,12 +66,15 @@ class TestBgpDrAgentSchedulerBaseTestCase(testlib_api.SqlTestCase): 'networks': []}} cls._save_bgp_speaker(self.ctx, bgp_speaker_body, uuid=bgp_speaker_id) + def _get_dragent_bgp_speaker_bindings(self, bgp_speaker_id): + return self.ctx.session.query( + bgp_dras_db.BgpSpeakerDrAgentBinding).filter_by( + bgp_speaker_id=bgp_speaker_id).all() + def _test_schedule_bind_bgp_speaker(self, agents, bgp_speaker_id): scheduler = bgp_dras.ChanceScheduler() scheduler.resource_filter.bind(self.ctx, agents, bgp_speaker_id) - results = self.ctx.session.query( - bgp_dras_db.BgpSpeakerDrAgentBinding).filter_by( - bgp_speaker_id=bgp_speaker_id).all() + results = self._get_dragent_bgp_speaker_bindings(bgp_speaker_id) for result in results: self.assertEqual(bgp_speaker_id, result.bgp_speaker_id) @@ -289,3 +292,63 @@ class TestAutoScheduleBgpSpeakers(TestBgpDrAgentSchedulerBaseTestCase): hosted_agents = self.ctx.session.query( bgp_dras_db.BgpSpeakerDrAgentBinding).all() self.assertEqual(expected_hosted_agents, len(hosted_agents)) + + +class TestRescheduleBgpSpeaker(TestBgpDrAgentSchedulerBaseTestCase, + bgp_db.BgpDbMixin): + + def setUp(self): + super(TestRescheduleBgpSpeaker, self).setUp() + bgp_notify_p = mock.patch('neutron_dynamic_routing.api.rpc.' + 'agentnotifiers.bgp_dr_rpc_agent_api.' + 'BgpDrAgentNotifyApi') + bgp_notify_p.start() + rpc_conn_p = mock.patch('neutron.common.rpc.create_connection') + rpc_conn_p.start() + admin_ctx_p = mock.patch('neutron_lib.context.get_admin_context') + self.admin_ctx_m = admin_ctx_p.start() + self.admin_ctx_m.return_value = self.ctx + self.plugin = bgp_plugin.BgpPlugin() + self.scheduler = bgp_dras.ChanceScheduler() + self.host1 = 'host-a' + self.host2 = 'host-b' + + def _kill_bgp_dragent(self, hosts): + agents = [] + for host in hosts: + agents.append( + helpers.register_bgp_dragent(host=host, alive=False)) + return agents + + def _schedule_bind_bgp_speaker(self, agents, bgp_speaker_id): + self.scheduler.resource_filter.bind(self.ctx, agents, bgp_speaker_id) + return self._get_dragent_bgp_speaker_bindings(bgp_speaker_id) + + def test_reschedule_bgp_speaker_bound_to_down_dragent(self): + agents = self._create_and_set_agents_down([self.host1, self.host2]) + self._schedule_bind_bgp_speaker([agents[0]], self.bgp_speaker_id) + self._kill_bgp_dragent([self.host1]) + self.plugin.remove_bgp_speaker_from_down_dragents() + binds = self._get_dragent_bgp_speaker_bindings(self.bgp_speaker_id) + self.assertEqual(binds[0].agent_id, agents[1].id) + + def test_no_schedule_with_non_available_dragent(self): + agents = self._create_and_set_agents_down([self.host1, self.host2]) + self._schedule_bind_bgp_speaker([agents[0]], self.bgp_speaker_id) + self._kill_bgp_dragent([self.host1, self.host2]) + self.plugin.remove_bgp_speaker_from_down_dragents() + binds = self._get_dragent_bgp_speaker_bindings(self.bgp_speaker_id) + self.assertEqual(binds, []) + + def test_schedule_unbind_bgp_speaker(self): + agents = self._create_and_set_agents_down([self.host1, self.host2]) + self._schedule_bind_bgp_speaker([agents[0]], self.bgp_speaker_id) + self._kill_bgp_dragent([self.host1, self.host2]) + self.plugin.remove_bgp_speaker_from_down_dragents() + binds = self._get_dragent_bgp_speaker_bindings(self.bgp_speaker_id) + self.assertEqual(binds, []) + # schedule a unbind bgp speaker + agents = self._create_and_set_agents_down([self.host1]) + self.scheduler.schedule_all_unscheduled_bgp_speakers(self.ctx) + binds = self._get_dragent_bgp_speaker_bindings(self.bgp_speaker_id) + self.assertEqual(binds[0].agent_id, agents[0].id)