Add periodic check resync check to DHCP agent

fixes bug 1047605

This patch adds a periodic resync check to the DHCP agent that will
resync state with the Quantum server if any notification or rpc errors
have occurred since the last check.

Change-Id: I879662ff44f2991cd2ff07062bb4e770a2981528
This commit is contained in:
Mark McClain 2012-09-10 15:28:56 -04:00
parent ba684d7ac2
commit 9890aa7c8d
3 changed files with 191 additions and 27 deletions

View File

@ -7,6 +7,10 @@
# devstack installation.
state_path = /opt/stack/data
# The DHCP agent will resync its state with Quantum to recover from any
# transient notification or rpc errors. The interval is number of
# seconds between attempts.
# resync_interval = 30
# The DHCP requires that an inteface driver be set. Choose the one that best
# matches you plugin.

View File

@ -46,6 +46,7 @@ NS_PREFIX = 'qdhcp-'
class DhcpAgent(object):
OPTS = [
cfg.StrOpt('root_helper', default='sudo'),
cfg.IntOpt('resync_interval', default=30),
cfg.StrOpt('dhcp_driver',
default='quantum.agent.linux.dhcp.Dnsmasq',
help="The driver used to manage the DHCP server."),
@ -54,6 +55,7 @@ class DhcpAgent(object):
]
def __init__(self, conf):
self.needs_resync = False
self.conf = conf
self.cache = NetworkCache()
@ -67,10 +69,8 @@ class DhcpAgent(object):
def run(self):
"""Activate the DHCP agent."""
# enable DHCP for current networks
for network_id in self.plugin_rpc.get_active_networks():
self.enable_dhcp_helper(network_id)
self.sync_state()
self.periodic_resync()
self.lease_relay.start()
self.notifications.run_dispatch(self)
@ -92,17 +92,51 @@ class DhcpAgent(object):
return True
except Exception, e:
self.needs_resync = True
LOG.exception('Unable to %s dhcp.' % action)
def update_lease(self, network_id, ip_address, time_remaining):
try:
self.plugin_rpc.update_lease_expiration(network_id, ip_address,
time_remaining)
except:
self.needs_resync = True
LOG.exception(_('Unable to update lease'))
def sync_state(self):
"""Sync the local DHCP state with Quantum."""
LOG.info(_('Synchronizing state'))
known_networks = set(self.cache.get_network_ids())
try:
active_networks = set(self.plugin_rpc.get_active_networks())
for deleted_id in known_networks - active_networks:
self.disable_dhcp_helper(deleted_id)
for network_id in active_networks:
self.refresh_dhcp_helper(network_id)
except:
self.needs_resync = True
LOG.exception(_('Unable to sync network state.'))
def _periodic_resync_helper(self):
"""Resync the dhcp state at the configured interval."""
while True:
eventlet.sleep(self.conf.resync_interval)
if self.needs_resync:
self.needs_resync = False
self.sync_state()
def periodic_resync(self):
"""Spawn a thread to periodically resync the dhcp state."""
eventlet.spawn(self._periodic_resync_helper)
def enable_dhcp_helper(self, network_id):
"""Enable DHCP for a network that meets enabling criteria."""
try:
network = self.plugin_rpc.get_network_info(network_id)
except:
self.needs_resync = True
LOG.exception(_('Network %s RPC info call failed.') % network_id)
return
@ -119,7 +153,7 @@ class DhcpAgent(object):
"""Disable DHCP for a network known to the agent."""
network = self.cache.get_network_by_id(network_id)
if network:
self.call_driver('disable', network)
if self.call_driver('disable', network):
self.cache.remove(network)
def refresh_dhcp_helper(self, network_id):
@ -132,7 +166,12 @@ class DhcpAgent(object):
# DHCP current not running for network.
return self.enable_dhcp_helper(network_id)
try:
network = self.plugin_rpc.get_network_info(network_id)
except:
self.needs_resync = True
LOG.exception(_('Network %s RPC info call failed.') % network_id)
return
old_cidrs = set(s.cidr for s in old_network.subnets if s.enable_dhcp)
new_cidrs = set(s.cidr for s in network.subnets if s.enable_dhcp)
@ -141,7 +180,7 @@ class DhcpAgent(object):
self.call_driver('reload_allocations', network)
self.cache.put(network)
elif new_cidrs:
self.call_driver('restart', network)
if self.call_driver('restart', network):
self.cache.put(network)
else:
self.disable_dhcp_helper(network.id)
@ -274,6 +313,9 @@ class NetworkCache(object):
self.subnet_lookup = {}
self.port_lookup = {}
def get_network_ids(self):
return self.cache.keys()
def get_network_by_id(self, network_id):
return self.cache.get(network_id)

View File

@ -104,21 +104,16 @@ class TestDhcpAgent(unittest.TestCase):
def test_run_completes_single_pass(self):
with mock.patch('quantum.agent.dhcp_agent.DeviceManager') as dev_mgr:
with mock.patch('quantum.agent.dhcp_agent.DhcpPluginApi') as plug:
mock_plugin = mock.Mock()
mock_plugin.get_active_networks.return_value = ['a']
plug.return_value = mock_plugin
dhcp = dhcp_agent.DhcpAgent(cfg.CONF)
with mock.patch.object(dhcp, 'enable_dhcp_helper') as enable:
with mock.patch.object(dhcp, 'lease_relay') as relay:
attrs_to_mock = dict(
[(a, mock.DEFAULT) for a in
['sync_state', 'lease_relay', 'periodic_resync']])
with mock.patch.multiple(dhcp, **attrs_to_mock) as mocks:
dhcp.run()
enable.assert_called_once_with('a')
plug.assert_called_once_with('q-plugin', mock.ANY)
mock_plugin.assert_has_calls(
[mock.call.get_active_networks()])
relay.assert_has_mock_calls([mock.call.run()])
mocks['sync_state'].assert_called_once_with()
mocks['periodic_resync'].assert_called_once_with()
mocks['lease_relay'].assert_has_mock_calls(
[mock.call.start()])
self.notification.assert_has_calls([mock.call.run_dispatch()])
def test_call_driver(self):
@ -149,6 +144,95 @@ class TestDhcpAgent(unittest.TestCase):
mock.ANY,
'qdhcp-1')
self.assertEqual(log.call_count, 1)
self.assertTrue(dhcp.needs_resync)
def test_update_lease(self):
with mock.patch('quantum.agent.dhcp_agent.DhcpPluginApi') as plug:
dhcp = dhcp_agent.DhcpAgent(cfg.CONF)
dhcp.update_lease('net_id', '192.168.1.1', 120)
plug.assert_has_calls(
[mock.call().update_lease_expiration(
'net_id', '192.168.1.1', 120)])
def test_update_lease_failure(self):
with mock.patch('quantum.agent.dhcp_agent.DhcpPluginApi') as plug:
plug.return_value.update_lease_expiration.side_effect = Exception
with mock.patch.object(dhcp_agent.LOG, 'exception') as log:
dhcp = dhcp_agent.DhcpAgent(cfg.CONF)
dhcp.update_lease('net_id', '192.168.1.1', 120)
plug.assert_has_calls(
[mock.call().update_lease_expiration(
'net_id', '192.168.1.1', 120)])
self.assertTrue(log.called)
self.assertTrue(dhcp.needs_resync)
def _test_sync_state_helper(self, known_networks, active_networks):
with mock.patch('quantum.agent.dhcp_agent.DhcpPluginApi') as plug:
mock_plugin = mock.Mock()
mock_plugin.get_active_networks.return_value = active_networks
plug.return_value = mock_plugin
dhcp = dhcp_agent.DhcpAgent(cfg.CONF)
attrs_to_mock = dict(
[(a, mock.DEFAULT) for a in
['refresh_dhcp_helper', 'disable_dhcp_helper', 'cache']])
with mock.patch.multiple(dhcp, **attrs_to_mock) as mocks:
mocks['cache'].get_network_ids.return_value = known_networks
dhcp.sync_state()
exp_refresh = [
mock.call(net_id) for net_id in active_networks]
diff = set(known_networks) - set(active_networks)
exp_disable = [mock.call(net_id) for net_id in diff]
mocks['cache'].assert_has_calls([mock.call.get_network_ids()])
mocks['refresh_dhcp_helper'].assert_has_called(exp_refresh)
mocks['disable_dhcp_helper'].assert_has_called(exp_disable)
def test_sync_state_initial(self):
self._test_sync_state_helper([], ['a'])
def test_sync_state_same(self):
self._test_sync_state_helper(['a'], ['a'])
def test_sync_state_disabled_net(self):
self._test_sync_state_helper(['b'], ['a'])
def test_sync_state_plugin_error(self):
with mock.patch('quantum.agent.dhcp_agent.DhcpPluginApi') as plug:
mock_plugin = mock.Mock()
mock_plugin.get_active_networks.side_effect = Exception
plug.return_value = mock_plugin
with mock.patch.object(dhcp_agent.LOG, 'exception') as log:
dhcp = dhcp_agent.DhcpAgent(cfg.CONF)
dhcp.sync_state()
self.assertTrue(log.called)
self.assertTrue(dhcp.needs_resync)
def test_periodic_resync(self):
dhcp = dhcp_agent.DhcpAgent(cfg.CONF)
with mock.patch.object(dhcp_agent.eventlet, 'spawn') as spawn:
dhcp.periodic_resync()
spawn.assert_called_once_with(dhcp._periodic_resync_helper)
def test_periodoc_resync_helper(self):
with mock.patch.object(dhcp_agent.eventlet, 'sleep') as sleep:
dhcp = dhcp_agent.DhcpAgent(cfg.CONF)
dhcp.needs_resync = True
with mock.patch.object(dhcp, 'sync_state') as sync_state:
sync_state.side_effect = RuntimeError
with self.assertRaises(RuntimeError):
dhcp._periodic_resync_helper()
sync_state.assert_called_once_with()
sleep.assert_called_once_with(dhcp.conf.resync_interval)
self.assertFalse(dhcp.needs_resync)
class TestDhcpAgentEventHandler(unittest.TestCase):
@ -207,6 +291,7 @@ class TestDhcpAgentEventHandler(unittest.TestCase):
[mock.call.get_network_info(fake_network.id)])
self.assertFalse(self.call_driver.called)
self.assertTrue(log.called)
self.assertTrue(self.dhcp.needs_resync)
self.assertFalse(self.cache.called)
def test_enable_dhcp_helper_driver_failure(self):
@ -232,6 +317,16 @@ class TestDhcpAgentEventHandler(unittest.TestCase):
[mock.call.get_network_by_id('abcdef')])
self.assertEqual(self.call_driver.call_count, 0)
def test_disable_dhcp_helper_driver_failure(self):
self.cache.get_network_by_id.return_value = fake_network
self.dhcp.disable_dhcp_helper(fake_network.id)
self.call_driver.disable.return_value = False
self.cache.assert_has_calls(
[mock.call.get_network_by_id(fake_network.id)])
self.call_driver.assert_called_once_with('disable', fake_network)
self.cache.assert_has_calls(
[mock.call.get_network_by_id(fake_network.id)])
def test_network_create_end(self):
payload = dict(network=dict(id=fake_network.id))
@ -275,6 +370,23 @@ class TestDhcpAgentEventHandler(unittest.TestCase):
self.cache.assert_has_calls(
[mock.call.get_network_by_id('net-id')])
def test_refresh_dhcp_helper_exception_during_rpc(self):
network = FakeModel('net-id',
tenant_id='aaaaaaaa-aaaa-aaaa-aaaaaaaaaaaa',
admin_state_up=True,
subnets=[],
ports=[])
self.cache.get_network_by_id.return_value = network
self.plugin.get_network_info.side_effect = Exception
with mock.patch.object(dhcp_agent.LOG, 'exception') as log:
self.dhcp.refresh_dhcp_helper(network.id)
self.assertFalse(self.call_driver.called)
self.cache.assert_has_calls(
[mock.call.get_network_by_id('net-id')])
self.assertTrue(log.called)
self.assertTrue(self.dhcp.needs_resync)
def test_subnet_update_end(self):
payload = dict(subnet=dict(network_id=fake_network.id))
self.cache.get_network_by_id.return_value = fake_network
@ -472,6 +584,12 @@ class TestNetworkCache(unittest.TestCase):
self.assertEqual(nc.get_network_by_id(fake_network.id), fake_network)
def test_get_network_ids(self):
nc = dhcp_agent.NetworkCache()
nc.put(fake_network)
self.assertEqual(nc.get_network_ids(), [fake_network.id])
def test_get_network_by_subnet_id(self):
nc = dhcp_agent.NetworkCache()
nc.put(fake_network)