From f0606dbb7fa4024e9f25b9b6c73e33897b5087f9 Mon Sep 17 00:00:00 2001 From: Brian Haley Date: Tue, 25 Feb 2020 15:01:47 -0500 Subject: [PATCH] Prioritize port create and update ready messages The DHCP agent prioritizes RPC messages based on the priority field send from neutron-server, but then groups them all in the same dhcp_ready_ports set when sending them back to the server to clear the provisioning block(s). Priority should be given to new and changed ports, since those are most likely to be associated with new instances which can fail to boot if they are not handled quickly when the agent is very busy, for example, right after it was restarted. Conflicts: neutron/tests/unit/agent/dhcp/test_agent.py Change-Id: Ib5074abadd7189bb4bdd5e46c677f1bfb071221e Closes-bug: #1864675 (cherry picked from commit 113dfac6083c2bda36f5186e123e4e5c82fa8097) (cherry picked from commit 18c7e3ead7e385b83c516cbe4558ddad60fbd54e) --- neutron/agent/dhcp/agent.py | 37 ++++++++----- neutron/tests/unit/agent/dhcp/test_agent.py | 58 +++++++++++++++++++-- 2 files changed, 79 insertions(+), 16 deletions(-) diff --git a/neutron/agent/dhcp/agent.py b/neutron/agent/dhcp/agent.py index 433e833a8ad..956e6eabdb5 100644 --- a/neutron/agent/dhcp/agent.py +++ b/neutron/agent/dhcp/agent.py @@ -83,6 +83,7 @@ class DhcpAgent(manager.Manager): super(DhcpAgent, self).__init__(host=host) self.needs_resync_reasons = collections.defaultdict(list) self.dhcp_ready_ports = set() + self.dhcp_prio_ready_ports = set() self.conf = conf or cfg.CONF self.cache = NetworkCache() self.dhcp_driver_cls = importutils.import_class(self.conf.dhcp_driver) @@ -221,23 +222,29 @@ class DhcpAgent(manager.Manager): def _dhcp_ready_ports_loop(self): """Notifies the server of any ports that had reservations setup.""" while True: - # this is just watching a set so we can do it really frequently + # this is just watching sets so we can do it really frequently eventlet.sleep(0.1) - if self.dhcp_ready_ports: - ports_to_send = set() - for port_count in range(min(len(self.dhcp_ready_ports), - DHCP_READY_PORTS_SYNC_MAX)): - ports_to_send.add(self.dhcp_ready_ports.pop()) - + prio_ports_to_send = set() + ports_to_send = set() + for port_count in range(min(len(self.dhcp_prio_ready_ports) + + len(self.dhcp_ready_ports), + DHCP_READY_PORTS_SYNC_MAX)): + if self.dhcp_prio_ready_ports: + prio_ports_to_send.add(self.dhcp_prio_ready_ports.pop()) + continue + ports_to_send.add(self.dhcp_ready_ports.pop()) + if prio_ports_to_send or ports_to_send: try: - self.plugin_rpc.dhcp_ready_on_ports(ports_to_send) + self.plugin_rpc.dhcp_ready_on_ports(prio_ports_to_send | + ports_to_send) LOG.info("DHCP configuration for ports %s is completed", - ports_to_send) + prio_ports_to_send | ports_to_send) continue except Exception: LOG.exception("Failure notifying DHCP server of " "ready DHCP ports. Will retry on next " "iteration.") + self.dhcp_prio_ready_ports |= prio_ports_to_send self.dhcp_ready_ports |= ports_to_send def start_ready_ports_loop(self): @@ -493,9 +500,10 @@ class DhcpAgent(manager.Manager): network = self.cache.get_network_by_id(updated_port.network_id) if not network: return - self.reload_allocations(updated_port, network) + # treat update as a create event + self.reload_allocations(updated_port, network, prio=True) - def reload_allocations(self, port, network): + def reload_allocations(self, port, network, prio=False): LOG.info("Trigger reload_allocations for port %s", port) driver_action = 'reload_allocations' if self._is_port_on_this_agent(port): @@ -521,7 +529,10 @@ class DhcpAgent(manager.Manager): driver_action = 'restart' self.cache.put_port(port) self.call_driver(driver_action, network) - self.dhcp_ready_ports.add(port.id) + if prio: + self.dhcp_prio_ready_ports.add(port.id) + else: + self.dhcp_ready_ports.add(port.id) self.update_isolated_metadata_proxy(network) def _is_port_on_this_agent(self, port): @@ -558,7 +569,7 @@ class DhcpAgent(manager.Manager): "DHCP cache is out of sync", created_port.network_id) return - self.reload_allocations(created_port, network) + self.reload_allocations(created_port, network, prio=True) def port_delete_end(self, context, payload): """Handle the port.delete.end notification event.""" diff --git a/neutron/tests/unit/agent/dhcp/test_agent.py b/neutron/tests/unit/agent/dhcp/test_agent.py index 55edf14aaae..c5fbbe4f804 100644 --- a/neutron/tests/unit/agent/dhcp/test_agent.py +++ b/neutron/tests/unit/agent/dhcp/test_agent.py @@ -470,6 +470,55 @@ class TestDhcpAgent(base.BaseTestCase): ready.call_args_list[1][0][0]) self.assertEqual(set(range(port_count)), ports_ready) + def test_dhcp_ready_ports_loop_with_limit_ports_per_call_prio(self): + dhcp = dhcp_agent.DhcpAgent(HOSTNAME) + sync_max = dhcp_agent.DHCP_READY_PORTS_SYNC_MAX + port_count = 4 + # port set ranges must be unique to differentiate results + dhcp.dhcp_prio_ready_ports = set(range(sync_max)) + dhcp.dhcp_ready_ports = set(range(sync_max, sync_max + port_count)) + + with mock.patch.object(dhcp.plugin_rpc, + 'dhcp_ready_on_ports') as ready: + # exit after 1 iteration + with mock.patch.object(dhcp_agent.eventlet, 'sleep', + side_effect=[0, RuntimeError]): + with testtools.ExpectedException(RuntimeError): + dhcp._dhcp_ready_ports_loop() + + # only priority ports should have been processed + self.assertEqual(set(), dhcp.dhcp_prio_ready_ports) + self.assertEqual(set(range(sync_max, sync_max + port_count)), + dhcp.dhcp_ready_ports) + # one call is expected, with DHCP_READY_PORTS_SYNC_MAX ports + self.assertEqual(1, ready.call_count) + self.assertEqual(sync_max, len(ready.call_args_list[0][0][0])) + # priority ports need to be ready + ports_ready = ready.call_args_list[0][0][0] + self.assertEqual(set(range(sync_max)), ports_ready) + + # add some priority ports, to make sure they are processed + dhcp.dhcp_prio_ready_ports = set(range(port_count)) + with mock.patch.object(dhcp.plugin_rpc, + 'dhcp_ready_on_ports') as ready: + # exit after 1 iteration + with mock.patch.object(dhcp_agent.eventlet, 'sleep', + side_effect=[0, RuntimeError]): + with testtools.ExpectedException(RuntimeError): + dhcp._dhcp_ready_ports_loop() + + # all ports should have been processed + self.assertEqual(set(), dhcp.dhcp_prio_ready_ports) + self.assertEqual(set(), dhcp.dhcp_ready_ports) + # one call is expected, with (port_count * 2) ports + self.assertEqual(1, ready.call_count) + self.assertEqual(port_count * 2, len(ready.call_args_list[0][0][0])) + # all ports need to be ready + ports_ready = ready.call_args_list[0][0][0] + all_ports = (set(range(port_count)) | + set(range(sync_max, sync_max + port_count))) + self.assertEqual(all_ports, ports_ready) + def test_dhcp_ready_ports_updates_after_enable_dhcp(self): dhcp = dhcp_agent.DhcpAgent(HOSTNAME) self.assertEqual(set(), dhcp.dhcp_ready_ports) @@ -1125,7 +1174,8 @@ class TestDhcpAgentEventHandler(base.BaseTestCase): self.dhcp.port_update_end(None, payload) self.dhcp._process_resource_update() self.reload_allocations.assert_called_once_with(fake_port2, - fake_network) + fake_network, + prio=True) def test_reload_allocations(self): self.cache.get_port_by_id.return_value = fake_port2 @@ -1146,7 +1196,8 @@ class TestDhcpAgentEventHandler(base.BaseTestCase): self.dhcp.port_create_end(None, payload) self.dhcp._process_resource_update() self.reload_allocations.assert_called_once_with(fake_port2, - fake_network) + fake_network, + prio=True) def test_port_create_end_no_resync_if_same_port_already_in_cache(self): self.reload_allocations_p = mock.patch.object(self.dhcp, @@ -1160,7 +1211,8 @@ class TestDhcpAgentEventHandler(base.BaseTestCase): self.dhcp.port_create_end(None, payload) self.dhcp._process_resource_update() self.reload_allocations.assert_called_once_with(fake_port2, - new_fake_network) + new_fake_network, + prio=True) self.schedule_resync.assert_not_called() def test_port_update_change_ip_on_port(self):