Merge "Prioritize port create and update ready messages" into stable/rocky
This commit is contained in:
commit
941e6f0513
|
@ -83,6 +83,7 @@ class DhcpAgent(manager.Manager):
|
||||||
super(DhcpAgent, self).__init__(host=host)
|
super(DhcpAgent, self).__init__(host=host)
|
||||||
self.needs_resync_reasons = collections.defaultdict(list)
|
self.needs_resync_reasons = collections.defaultdict(list)
|
||||||
self.dhcp_ready_ports = set()
|
self.dhcp_ready_ports = set()
|
||||||
|
self.dhcp_prio_ready_ports = set()
|
||||||
self.conf = conf or cfg.CONF
|
self.conf = conf or cfg.CONF
|
||||||
self.cache = NetworkCache()
|
self.cache = NetworkCache()
|
||||||
self.dhcp_driver_cls = importutils.import_class(self.conf.dhcp_driver)
|
self.dhcp_driver_cls = importutils.import_class(self.conf.dhcp_driver)
|
||||||
|
@ -221,23 +222,29 @@ class DhcpAgent(manager.Manager):
|
||||||
def _dhcp_ready_ports_loop(self):
|
def _dhcp_ready_ports_loop(self):
|
||||||
"""Notifies the server of any ports that had reservations setup."""
|
"""Notifies the server of any ports that had reservations setup."""
|
||||||
while True:
|
while True:
|
||||||
# this is just watching a set so we can do it really frequently
|
# this is just watching sets so we can do it really frequently
|
||||||
eventlet.sleep(0.1)
|
eventlet.sleep(0.1)
|
||||||
if self.dhcp_ready_ports:
|
prio_ports_to_send = set()
|
||||||
ports_to_send = set()
|
ports_to_send = set()
|
||||||
for port_count in range(min(len(self.dhcp_ready_ports),
|
for port_count in range(min(len(self.dhcp_prio_ready_ports) +
|
||||||
DHCP_READY_PORTS_SYNC_MAX)):
|
len(self.dhcp_ready_ports),
|
||||||
ports_to_send.add(self.dhcp_ready_ports.pop())
|
DHCP_READY_PORTS_SYNC_MAX)):
|
||||||
|
if self.dhcp_prio_ready_ports:
|
||||||
|
prio_ports_to_send.add(self.dhcp_prio_ready_ports.pop())
|
||||||
|
continue
|
||||||
|
ports_to_send.add(self.dhcp_ready_ports.pop())
|
||||||
|
if prio_ports_to_send or ports_to_send:
|
||||||
try:
|
try:
|
||||||
self.plugin_rpc.dhcp_ready_on_ports(ports_to_send)
|
self.plugin_rpc.dhcp_ready_on_ports(prio_ports_to_send |
|
||||||
|
ports_to_send)
|
||||||
LOG.info("DHCP configuration for ports %s is completed",
|
LOG.info("DHCP configuration for ports %s is completed",
|
||||||
ports_to_send)
|
prio_ports_to_send | ports_to_send)
|
||||||
continue
|
continue
|
||||||
except Exception:
|
except Exception:
|
||||||
LOG.exception("Failure notifying DHCP server of "
|
LOG.exception("Failure notifying DHCP server of "
|
||||||
"ready DHCP ports. Will retry on next "
|
"ready DHCP ports. Will retry on next "
|
||||||
"iteration.")
|
"iteration.")
|
||||||
|
self.dhcp_prio_ready_ports |= prio_ports_to_send
|
||||||
self.dhcp_ready_ports |= ports_to_send
|
self.dhcp_ready_ports |= ports_to_send
|
||||||
|
|
||||||
def start_ready_ports_loop(self):
|
def start_ready_ports_loop(self):
|
||||||
|
@ -493,9 +500,10 @@ class DhcpAgent(manager.Manager):
|
||||||
network = self.cache.get_network_by_id(updated_port.network_id)
|
network = self.cache.get_network_by_id(updated_port.network_id)
|
||||||
if not network:
|
if not network:
|
||||||
return
|
return
|
||||||
self.reload_allocations(updated_port, network)
|
# treat update as a create event
|
||||||
|
self.reload_allocations(updated_port, network, prio=True)
|
||||||
|
|
||||||
def reload_allocations(self, port, network):
|
def reload_allocations(self, port, network, prio=False):
|
||||||
LOG.info("Trigger reload_allocations for port %s", port)
|
LOG.info("Trigger reload_allocations for port %s", port)
|
||||||
driver_action = 'reload_allocations'
|
driver_action = 'reload_allocations'
|
||||||
if self._is_port_on_this_agent(port):
|
if self._is_port_on_this_agent(port):
|
||||||
|
@ -521,7 +529,10 @@ class DhcpAgent(manager.Manager):
|
||||||
driver_action = 'restart'
|
driver_action = 'restart'
|
||||||
self.cache.put_port(port)
|
self.cache.put_port(port)
|
||||||
self.call_driver(driver_action, network)
|
self.call_driver(driver_action, network)
|
||||||
self.dhcp_ready_ports.add(port.id)
|
if prio:
|
||||||
|
self.dhcp_prio_ready_ports.add(port.id)
|
||||||
|
else:
|
||||||
|
self.dhcp_ready_ports.add(port.id)
|
||||||
self.update_isolated_metadata_proxy(network)
|
self.update_isolated_metadata_proxy(network)
|
||||||
|
|
||||||
def _is_port_on_this_agent(self, port):
|
def _is_port_on_this_agent(self, port):
|
||||||
|
@ -558,7 +569,7 @@ class DhcpAgent(manager.Manager):
|
||||||
"DHCP cache is out of sync",
|
"DHCP cache is out of sync",
|
||||||
created_port.network_id)
|
created_port.network_id)
|
||||||
return
|
return
|
||||||
self.reload_allocations(created_port, network)
|
self.reload_allocations(created_port, network, prio=True)
|
||||||
|
|
||||||
def port_delete_end(self, context, payload):
|
def port_delete_end(self, context, payload):
|
||||||
"""Handle the port.delete.end notification event."""
|
"""Handle the port.delete.end notification event."""
|
||||||
|
|
|
@ -471,6 +471,55 @@ class TestDhcpAgent(base.BaseTestCase):
|
||||||
ready.call_args_list[1][0][0])
|
ready.call_args_list[1][0][0])
|
||||||
self.assertEqual(set(range(port_count)), ports_ready)
|
self.assertEqual(set(range(port_count)), ports_ready)
|
||||||
|
|
||||||
|
def test_dhcp_ready_ports_loop_with_limit_ports_per_call_prio(self):
|
||||||
|
dhcp = dhcp_agent.DhcpAgent(HOSTNAME)
|
||||||
|
sync_max = dhcp_agent.DHCP_READY_PORTS_SYNC_MAX
|
||||||
|
port_count = 4
|
||||||
|
# port set ranges must be unique to differentiate results
|
||||||
|
dhcp.dhcp_prio_ready_ports = set(range(sync_max))
|
||||||
|
dhcp.dhcp_ready_ports = set(range(sync_max, sync_max + port_count))
|
||||||
|
|
||||||
|
with mock.patch.object(dhcp.plugin_rpc,
|
||||||
|
'dhcp_ready_on_ports') as ready:
|
||||||
|
# exit after 1 iteration
|
||||||
|
with mock.patch.object(dhcp_agent.eventlet, 'sleep',
|
||||||
|
side_effect=[0, RuntimeError]):
|
||||||
|
with testtools.ExpectedException(RuntimeError):
|
||||||
|
dhcp._dhcp_ready_ports_loop()
|
||||||
|
|
||||||
|
# only priority ports should have been processed
|
||||||
|
self.assertEqual(set(), dhcp.dhcp_prio_ready_ports)
|
||||||
|
self.assertEqual(set(range(sync_max, sync_max + port_count)),
|
||||||
|
dhcp.dhcp_ready_ports)
|
||||||
|
# one call is expected, with DHCP_READY_PORTS_SYNC_MAX ports
|
||||||
|
self.assertEqual(1, ready.call_count)
|
||||||
|
self.assertEqual(sync_max, len(ready.call_args_list[0][0][0]))
|
||||||
|
# priority ports need to be ready
|
||||||
|
ports_ready = ready.call_args_list[0][0][0]
|
||||||
|
self.assertEqual(set(range(sync_max)), ports_ready)
|
||||||
|
|
||||||
|
# add some priority ports, to make sure they are processed
|
||||||
|
dhcp.dhcp_prio_ready_ports = set(range(port_count))
|
||||||
|
with mock.patch.object(dhcp.plugin_rpc,
|
||||||
|
'dhcp_ready_on_ports') as ready:
|
||||||
|
# exit after 1 iteration
|
||||||
|
with mock.patch.object(dhcp_agent.eventlet, 'sleep',
|
||||||
|
side_effect=[0, RuntimeError]):
|
||||||
|
with testtools.ExpectedException(RuntimeError):
|
||||||
|
dhcp._dhcp_ready_ports_loop()
|
||||||
|
|
||||||
|
# all ports should have been processed
|
||||||
|
self.assertEqual(set(), dhcp.dhcp_prio_ready_ports)
|
||||||
|
self.assertEqual(set(), dhcp.dhcp_ready_ports)
|
||||||
|
# one call is expected, with (port_count * 2) ports
|
||||||
|
self.assertEqual(1, ready.call_count)
|
||||||
|
self.assertEqual(port_count * 2, len(ready.call_args_list[0][0][0]))
|
||||||
|
# all ports need to be ready
|
||||||
|
ports_ready = ready.call_args_list[0][0][0]
|
||||||
|
all_ports = (set(range(port_count)) |
|
||||||
|
set(range(sync_max, sync_max + port_count)))
|
||||||
|
self.assertEqual(all_ports, ports_ready)
|
||||||
|
|
||||||
def test_dhcp_ready_ports_updates_after_enable_dhcp(self):
|
def test_dhcp_ready_ports_updates_after_enable_dhcp(self):
|
||||||
dhcp = dhcp_agent.DhcpAgent(HOSTNAME)
|
dhcp = dhcp_agent.DhcpAgent(HOSTNAME)
|
||||||
self.assertEqual(set(), dhcp.dhcp_ready_ports)
|
self.assertEqual(set(), dhcp.dhcp_ready_ports)
|
||||||
|
@ -1126,7 +1175,8 @@ class TestDhcpAgentEventHandler(base.BaseTestCase):
|
||||||
self.dhcp.port_update_end(None, payload)
|
self.dhcp.port_update_end(None, payload)
|
||||||
self.dhcp._process_resource_update()
|
self.dhcp._process_resource_update()
|
||||||
self.reload_allocations.assert_called_once_with(fake_port2,
|
self.reload_allocations.assert_called_once_with(fake_port2,
|
||||||
fake_network)
|
fake_network,
|
||||||
|
prio=True)
|
||||||
|
|
||||||
def test_reload_allocations(self):
|
def test_reload_allocations(self):
|
||||||
self.cache.get_port_by_id.return_value = fake_port2
|
self.cache.get_port_by_id.return_value = fake_port2
|
||||||
|
@ -1147,7 +1197,8 @@ class TestDhcpAgentEventHandler(base.BaseTestCase):
|
||||||
self.dhcp.port_create_end(None, payload)
|
self.dhcp.port_create_end(None, payload)
|
||||||
self.dhcp._process_resource_update()
|
self.dhcp._process_resource_update()
|
||||||
self.reload_allocations.assert_called_once_with(fake_port2,
|
self.reload_allocations.assert_called_once_with(fake_port2,
|
||||||
fake_network)
|
fake_network,
|
||||||
|
prio=True)
|
||||||
|
|
||||||
def test_port_create_end_no_resync_if_same_port_already_in_cache(self):
|
def test_port_create_end_no_resync_if_same_port_already_in_cache(self):
|
||||||
self.reload_allocations_p = mock.patch.object(self.dhcp,
|
self.reload_allocations_p = mock.patch.object(self.dhcp,
|
||||||
|
@ -1161,7 +1212,8 @@ class TestDhcpAgentEventHandler(base.BaseTestCase):
|
||||||
self.dhcp.port_create_end(None, payload)
|
self.dhcp.port_create_end(None, payload)
|
||||||
self.dhcp._process_resource_update()
|
self.dhcp._process_resource_update()
|
||||||
self.reload_allocations.assert_called_once_with(fake_port2,
|
self.reload_allocations.assert_called_once_with(fake_port2,
|
||||||
new_fake_network)
|
new_fake_network,
|
||||||
|
prio=True)
|
||||||
self.schedule_resync.assert_not_called()
|
self.schedule_resync.assert_not_called()
|
||||||
|
|
||||||
def test_port_update_change_ip_on_port(self):
|
def test_port_update_change_ip_on_port(self):
|
||||||
|
|
Loading…
Reference in New Issue