From 3b6bd917e4b968a47a5aacb7f590143fc83816d9 Mon Sep 17 00:00:00 2001 From: Eugene Nikanorov Date: Mon, 12 Oct 2015 13:59:01 +0400 Subject: [PATCH] Resync L3, DHCP and OVS/LB agents upon revival In big and busy clusters, the rabbitmq clustering mechanism may need to synchronize queues; during this period, agents connected to that rabbitmq instance can't communicate with the server, so the server considers them dead and moves their resources away. After an agent becomes active again, it needs to clean up its state entries and synchronize its state with neutron-server. The solution is to make agents aware of their state from neutron-server's point of view. This is done by changing state reports from cast to call, so that the report returns the agent's status. When an agent that was dead becomes alive again, it receives a special AGENT_REVIVED status indicating that it should refresh its local data, which it would not do otherwise. Closes-Bug: #1505166 Change-Id: Id28248f4f75821fbacf46e2c44e40f27f59172a9 --- neutron/agent/dhcp/agent.py | 9 ++++++--- neutron/agent/l3/agent.py | 11 +++++++---- neutron/common/constants.py | 8 ++++++++ neutron/db/agents_db.py | 17 +++++++++++++++-- .../agent/linuxbridge_neutron_agent.py | 19 +++++++++++++++---- .../openvswitch/agent/ovs_neutron_agent.py | 17 ++++++++++++----- neutron/tests/unit/agent/dhcp/test_agent.py | 14 ++++++++++++++ neutron/tests/unit/agent/l3/test_agent.py | 17 +++++++++++++++-- .../agent/test_linuxbridge_neutron_agent.py | 7 +++++++ .../agent/test_ovs_neutron_agent.py | 16 ++++++++-------- 10 files changed, 107 insertions(+), 28 deletions(-) diff --git a/neutron/agent/dhcp/agent.py b/neutron/agent/dhcp/agent.py index dead17f4891..5fa5a784948 100644 --- a/neutron/agent/dhcp/agent.py +++ b/neutron/agent/dhcp/agent.py @@ -551,7 +551,6 @@ class DhcpAgentWithStateReport(DhcpAgent): 'start_flag': True, 'agent_type': constants.AGENT_TYPE_DHCP} report_interval = self.conf.AGENT.report_interval - self.use_call = True if report_interval:
self.heartbeat = loopingcall.FixedIntervalLoopingCall( self._report_state) @@ -562,8 +561,12 @@ class DhcpAgentWithStateReport(DhcpAgent): self.agent_state.get('configurations').update( self.cache.get_state()) ctx = context.get_admin_context_without_session() - self.state_rpc.report_state(ctx, self.agent_state, self.use_call) - self.use_call = False + agent_status = self.state_rpc.report_state( + ctx, self.agent_state, True) + if agent_status == constants.AGENT_REVIVED: + LOG.info(_LI("Agent has just been revived. " + "Scheduling full sync")) + self.schedule_resync("Agent has just been revived") except AttributeError: # This means the server does not support report_state LOG.warn(_LW("Neutron server does not support state report." diff --git a/neutron/agent/l3/agent.py b/neutron/agent/l3/agent.py index 8191c5a814b..95d7e2a05a5 100644 --- a/neutron/agent/l3/agent.py +++ b/neutron/agent/l3/agent.py @@ -610,7 +610,6 @@ class L3NATAgent(firewall_l3_agent.FWaaSL3AgentRpcCallback, class L3NATAgentWithStateReport(L3NATAgent): def __init__(self, host, conf=None): - self.use_call = True super(L3NATAgentWithStateReport, self).__init__(host=host, conf=conf) self.state_rpc = agent_rpc.PluginReportStateAPI(topics.REPORTS) self.agent_state = { @@ -657,10 +656,14 @@ class L3NATAgentWithStateReport(L3NATAgent): configurations['interfaces'] = num_interfaces configurations['floating_ips'] = num_floating_ips try: - self.state_rpc.report_state(self.context, self.agent_state, - self.use_call) + agent_status = self.state_rpc.report_state(self.context, + self.agent_state, + True) + if agent_status == l3_constants.AGENT_REVIVED: + LOG.info(_LI('Agent has just been revived. ' + 'Doing a full sync.')) + self.fullsync = True self.agent_state.pop('start_flag', None) - self.use_call = False except AttributeError: # This means the server does not support report_state LOG.warn(_LW("Neutron server does not support state report." 
diff --git a/neutron/common/constants.py b/neutron/common/constants.py index a5f34ee9d2d..5ac94c8b4c1 100644 --- a/neutron/common/constants.py +++ b/neutron/common/constants.py @@ -200,3 +200,11 @@ ROUTER_MARK_MASK = "0xffff" # Time format ISO8601_TIME_FORMAT = '%Y-%m-%dT%H:%M:%S.%f' + +# Agent states as detected by server, used to reply on agent's state report +# agent has just been registered +AGENT_NEW = 'new' +# agent is alive +AGENT_ALIVE = 'alive' +# agent has just returned to alive after being dead +AGENT_REVIVED = 'revived' diff --git a/neutron/db/agents_db.py b/neutron/db/agents_db.py index 2a9aeb73453..69ecb13180e 100644 --- a/neutron/db/agents_db.py +++ b/neutron/db/agents_db.py @@ -299,6 +299,12 @@ class AgentDbMixin(ext_agent.AgentPluginBase, AgentAvailabilityZoneMixin): 'delta': delta}) def _create_or_update_agent(self, context, agent_state): + """Registers new agent in the database or updates existing. + + Returns agent status from server point of view: alive, new or revived. + It could be used by agent to do some sync with the server if needed. 
+ """ + status = constants.AGENT_ALIVE with context.session.begin(subtransactions=True): res_keys = ['agent_type', 'binary', 'host', 'topic'] res = dict((k, agent_state[k]) for k in res_keys) @@ -311,6 +317,8 @@ class AgentDbMixin(ext_agent.AgentPluginBase, AgentAvailabilityZoneMixin): try: agent_db = self._get_agent_by_type_and_host( context, agent_state['agent_type'], agent_state['host']) + if not agent_db.is_active: + status = constants.AGENT_REVIVED res['heartbeat_timestamp'] = current_time if agent_state.get('start_flag'): res['started_at'] = current_time @@ -327,7 +335,9 @@ class AgentDbMixin(ext_agent.AgentPluginBase, AgentAvailabilityZoneMixin): greenthread.sleep(0) context.session.add(agent_db) self._log_heartbeat(agent_state, agent_db, configurations_dict) + status = constants.AGENT_NEW greenthread.sleep(0) + return status def create_or_update_agent(self, context, agent): """Create or update agent according to report.""" @@ -367,7 +377,10 @@ class AgentExtRpcCallback(object): self.plugin = plugin def report_state(self, context, **kwargs): - """Report state from agent to server.""" + """Report state from agent to server. + + Returns - agent's status: AGENT_NEW, AGENT_REVIVED, AGENT_ALIVE + """ time = kwargs['time'] time = timeutils.parse_strtime(time) agent_state = kwargs['agent_state']['agent_state'] @@ -382,7 +395,7 @@ class AgentExtRpcCallback(object): return if not self.plugin: self.plugin = manager.NeutronManager.get_plugin() - self.plugin.create_or_update_agent(context, agent_state) + return self.plugin.create_or_update_agent(context, agent_state) def _check_clock_sync_on_agent_start(self, agent_state, agent_time): """Checks if the server and the agent times are in sync. 
diff --git a/neutron/plugins/ml2/drivers/linuxbridge/agent/linuxbridge_neutron_agent.py b/neutron/plugins/ml2/drivers/linuxbridge/agent/linuxbridge_neutron_agent.py index 89f819be2af..6a58a28cc07 100644 --- a/neutron/plugins/ml2/drivers/linuxbridge/agent/linuxbridge_neutron_agent.py +++ b/neutron/plugins/ml2/drivers/linuxbridge/agent/linuxbridge_neutron_agent.py @@ -876,6 +876,8 @@ class LinuxBridgeNeutronAgentRPC(service.Service): # stores received port_updates for processing by the main loop self.updated_devices = set() + # flag to do a sync after revival + self.fullsync = False self.context = context.get_admin_context_without_session() self.plugin_rpc = agent_rpc.PluginApi(topics.PLUGIN) self.sg_plugin_rpc = sg_rpc.SecurityGroupServerRpcApi(topics.PLUGIN) @@ -897,8 +899,13 @@ class LinuxBridgeNeutronAgentRPC(service.Service): try: devices = len(self.br_mgr.get_tap_devices()) self.agent_state.get('configurations')['devices'] = devices - self.state_rpc.report_state(self.context, - self.agent_state) + agent_status = self.state_rpc.report_state(self.context, + self.agent_state, + True) + if agent_status == constants.AGENT_REVIVED: + LOG.info(_LI('Agent has just been revived. 
' + 'Doing a full sync.')) + self.fullsync = True self.agent_state.pop('start_flag', None) except Exception: LOG.exception(_LE("Failed reporting state!")) @@ -1101,11 +1108,15 @@ class LinuxBridgeNeutronAgentRPC(service.Service): while True: start = time.time() - device_info = self.scan_devices(previous=device_info, sync=sync) + if self.fullsync: + sync = True + self.fullsync = False if sync: LOG.info(_LI("Agent out of sync with plugin!")) - sync = False + + device_info = self.scan_devices(previous=device_info, sync=sync) + sync = False if (self._device_info_has_changes(device_info) or self.sg_agent.firewall_refresh_needed()): diff --git a/neutron/plugins/ml2/drivers/openvswitch/agent/ovs_neutron_agent.py b/neutron/plugins/ml2/drivers/openvswitch/agent/ovs_neutron_agent.py index 55846dae07f..5e00c4839fd 100644 --- a/neutron/plugins/ml2/drivers/openvswitch/agent/ovs_neutron_agent.py +++ b/neutron/plugins/ml2/drivers/openvswitch/agent/ovs_neutron_agent.py @@ -182,6 +182,7 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin, super(OVSNeutronAgent, self).__init__() self.conf = conf or cfg.CONF + self.fullsync = True # init bridge classes with configured datapath type. 
self.br_int_cls, self.br_phys_cls, self.br_tun_cls = ( functools.partial(bridge_classes[b], @@ -192,7 +193,6 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin, self.veth_mtu = veth_mtu self.available_local_vlans = set(moves.range(p_const.MIN_VLAN_TAG, p_const.MAX_VLAN_TAG)) - self.use_call = True self.tunnel_types = tunnel_types or [] self.l2_pop = l2_population # TODO(ethuleau): Change ARP responder so it's not dependent on the @@ -325,9 +325,13 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin, self.dvr_agent.in_distributed_mode()) try: - self.state_rpc.report_state(self.context, - self.agent_state, - self.use_call) + agent_status = self.state_rpc.report_state(self.context, + self.agent_state, + True) + if agent_status == n_const.AGENT_REVIVED: + LOG.info(_LI('Agent has just been revived. ' + 'Doing a full sync.')) + self.fullsync = True self.use_call = False self.agent_state.pop('start_flag', None) except Exception: @@ -1646,7 +1650,6 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin, if not polling_manager: polling_manager = polling.get_polling_manager( minimize_polling=False) - sync = True ports = set() updated_ports_copy = set() @@ -1655,6 +1658,10 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin, ovs_restarted = False consecutive_resyncs = 0 while self._check_and_handle_signal(): + if self.fullsync: + LOG.info(_LI("rpc_loop doing a full sync.")) + sync = True + self.fullsync = False port_info = {} ancillary_port_info = {} start = time.time() diff --git a/neutron/tests/unit/agent/dhcp/test_agent.py b/neutron/tests/unit/agent/dhcp/test_agent.py index fb7355f149e..51bab94af6a 100644 --- a/neutron/tests/unit/agent/dhcp/test_agent.py +++ b/neutron/tests/unit/agent/dhcp/test_agent.py @@ -421,6 +421,20 @@ class TestDhcpAgent(base.BaseTestCase): dhcp.periodic_resync() spawn.assert_called_once_with(dhcp._periodic_resync_helper) + def test_report_state_revival_logic(self): + dhcp = 
dhcp_agent.DhcpAgentWithStateReport(HOSTNAME) + with mock.patch.object(dhcp.state_rpc, + 'report_state') as report_state,\ + mock.patch.object(dhcp, "run"): + report_state.return_value = const.AGENT_ALIVE + dhcp._report_state() + self.assertEqual(dhcp.needs_resync_reasons, {}) + + report_state.return_value = const.AGENT_REVIVED + dhcp._report_state() + self.assertEqual(dhcp.needs_resync_reasons[None], + ['Agent has just been revived']) + def test_periodic_resync_helper(self): with mock.patch.object(dhcp_agent.eventlet, 'sleep') as sleep: dhcp = dhcp_agent.DhcpAgent(HOSTNAME) diff --git a/neutron/tests/unit/agent/l3/test_agent.py b/neutron/tests/unit/agent/l3/test_agent.py index a3649d9ee6c..ed94606db1c 100644 --- a/neutron/tests/unit/agent/l3/test_agent.py +++ b/neutron/tests/unit/agent/l3/test_agent.py @@ -197,13 +197,26 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework): conf=self.conf) self.assertTrue(agent.agent_state['start_flag']) - use_call_arg = agent.use_call agent.after_start() report_state.assert_called_once_with(agent.context, agent.agent_state, - use_call_arg) + True) self.assertIsNone(agent.agent_state.get('start_flag')) + def test_report_state_revival_logic(self): + with mock.patch.object(agent_rpc.PluginReportStateAPI, + 'report_state') as report_state: + agent = l3_agent.L3NATAgentWithStateReport(host=HOSTNAME, + conf=self.conf) + report_state.return_value = l3_constants.AGENT_REVIVED + agent._report_state() + self.assertTrue(agent.fullsync) + + agent.fullsync = False + report_state.return_value = l3_constants.AGENT_ALIVE + agent._report_state() + self.assertFalse(agent.fullsync) + def test_periodic_sync_routers_task_call_clean_stale_namespaces(self): agent = l3_agent.L3NATAgent(HOSTNAME, self.conf) self.plugin_api.get_routers.return_value = [] diff --git a/neutron/tests/unit/plugins/ml2/drivers/linuxbridge/agent/test_linuxbridge_neutron_agent.py 
b/neutron/tests/unit/plugins/ml2/drivers/linuxbridge/agent/test_linuxbridge_neutron_agent.py index d5eea954ee0..b75a53337bd 100644 --- a/neutron/tests/unit/plugins/ml2/drivers/linuxbridge/agent/test_linuxbridge_neutron_agent.py +++ b/neutron/tests/unit/plugins/ml2/drivers/linuxbridge/agent/test_linuxbridge_neutron_agent.py @@ -365,6 +365,13 @@ class TestLinuxBridgeAgent(base.BaseTestCase): self.agent.stop() self.assertFalse(mock_set_rpc.called) + def test_report_state_revived(self): + with mock.patch.object(self.agent.state_rpc, + "report_state") as report_st: + report_st.return_value = constants.AGENT_REVIVED + self.agent._report_state() + self.assertTrue(self.agent.fullsync) + class TestLinuxBridgeManager(base.BaseTestCase): def setUp(self): diff --git a/neutron/tests/unit/plugins/ml2/drivers/openvswitch/agent/test_ovs_neutron_agent.py b/neutron/tests/unit/plugins/ml2/drivers/openvswitch/agent/test_ovs_neutron_agent.py index 654b429430e..f7161e49049 100644 --- a/neutron/tests/unit/plugins/ml2/drivers/openvswitch/agent/test_ovs_neutron_agent.py +++ b/neutron/tests/unit/plugins/ml2/drivers/openvswitch/agent/test_ovs_neutron_agent.py @@ -145,9 +145,6 @@ class TestOvsNeutronAgent(object): return_value=[]): self.agent = self.mod_agent.OVSNeutronAgent(self._bridge_classes(), **kwargs) - # set back to true because initial report state will succeed due - # to mocked out RPC calls - self.agent.use_call = True self.agent.tun_br = self.br_tun_cls(br_name='br-tun') self.agent.sg_agent = mock.Mock() @@ -694,14 +691,13 @@ class TestOvsNeutronAgent(object): report_st.assert_called_with(self.agent.context, self.agent.agent_state, True) self.assertNotIn("start_flag", self.agent.agent_state) - self.assertFalse(self.agent.use_call) self.assertEqual( self.agent.agent_state["configurations"]["devices"], self.agent.int_br_device_count ) self.agent._report_state() report_st.assert_called_with(self.agent.context, - self.agent.agent_state, False) + self.agent.agent_state, True) def 
test_report_state_fail(self): with mock.patch.object(self.agent.state_rpc, @@ -714,6 +710,13 @@ class TestOvsNeutronAgent(object): report_st.assert_called_with(self.agent.context, self.agent.agent_state, True) + def test_report_state_revived(self): + with mock.patch.object(self.agent.state_rpc, + "report_state") as report_st: + report_st.return_value = n_const.AGENT_REVIVED + self.agent._report_state() + self.assertTrue(self.agent.fullsync) + def test_port_update(self): port = {"id": TEST_PORT_ID1, "network_id": TEST_NETWORK_ID1, @@ -1733,9 +1736,6 @@ class TestOvsDvrNeutronAgent(object): return_value=[]): self.agent = self.mod_agent.OVSNeutronAgent(self._bridge_classes(), **kwargs) - # set back to true because initial report state will succeed due - # to mocked out RPC calls - self.agent.use_call = True self.agent.tun_br = self.br_tun_cls(br_name='br-tun') self.agent.sg_agent = mock.Mock()