Lock in DHCP agent based on network_id

All cache operations and dnsmasq process operations
are scoped to a network ID so we can always safely
perform concurrent actions on different network IDs.
This patch adjusts the DHCP agent to lock based on
network ID rather than having a global lock for every
operation.

sync_state calls are still protected with a reader/writer
lock to ensure that when sync_state needs to run, all
other operations are blocked.

Related-Bug: #1548190
Change-Id: I56010dc801d82be56f12e834c5164316872c2f8b
This commit is contained in:
Kevin Benton 2016-11-18 04:25:32 -07:00
parent b9d0a5b885
commit d1930cefd2
2 changed files with 66 additions and 18 deletions

View File

@ -19,12 +19,14 @@ import os
import eventlet import eventlet
from neutron_lib import constants from neutron_lib import constants
from neutron_lib import exceptions from neutron_lib import exceptions
from oslo_concurrency import lockutils
from oslo_config import cfg from oslo_config import cfg
from oslo_log import log as logging from oslo_log import log as logging
import oslo_messaging import oslo_messaging
from oslo_service import loopingcall from oslo_service import loopingcall
from oslo_utils import fileutils from oslo_utils import fileutils
from oslo_utils import importutils from oslo_utils import importutils
import six
from neutron._i18n import _, _LE, _LI, _LW from neutron._i18n import _, _LE, _LI, _LW
from neutron.agent.linux import dhcp from neutron.agent.linux import dhcp
@ -39,6 +41,31 @@ from neutron import context
from neutron import manager from neutron import manager
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
# Module-wide reader/writer lock shared by the decorators below:
# _sync_lock takes the write side (applied to sync_state), while
# _wait_if_syncing takes the read side (applied to the notification
# handlers).
_SYNC_STATE_LOCK = lockutils.ReaderWriterLock()
def _sync_lock(f):
    """Wrap ``f`` so it executes while holding the sync write lock.

    The returned callable acquires the write side of ``_SYNC_STATE_LOCK``
    for the full duration of the call to ``f`` and hands back whatever
    ``f`` returns.
    """
    @six.wraps(f)
    def _run_as_writer(*args, **kwargs):
        with _SYNC_STATE_LOCK.write_lock():
            outcome = f(*args, **kwargs)
            return outcome
    return _run_as_writer
def _wait_if_syncing(f):
    """Wrap ``f`` so it executes while holding the sync read lock.

    The returned callable acquires the read side of ``_SYNC_STATE_LOCK``
    around the call to ``f`` and hands back whatever ``f`` returns.
    """
    @six.wraps(f)
    def _run_as_reader(*args, **kwargs):
        with _SYNC_STATE_LOCK.read_lock():
            outcome = f(*args, **kwargs)
            return outcome
    return _run_as_reader
def _net_lock(network_id):
    """Build the lock context manager dedicated to one network.

    The lock name is derived from ``network_id``, and the lock is requested
    from ``lockutils.lock`` with ``utils.SYNCHRONIZED_PREFIX`` as the
    second argument.
    """
    return lockutils.lock('dhcp-agent-network-lock-%s' % network_id,
                          utils.SYNCHRONIZED_PREFIX)
class DhcpAgent(manager.Manager): class DhcpAgent(manager.Manager):
@ -146,7 +173,7 @@ class DhcpAgent(manager.Manager):
""" """
self.needs_resync_reasons[network_id].append(reason) self.needs_resync_reasons[network_id].append(reason)
@utils.synchronized('dhcp-agent') @_sync_lock
def sync_state(self, networks=None): def sync_state(self, networks=None):
"""Sync the local DHCP state with Neutron. If no networks are passed, """Sync the local DHCP state with Neutron. If no networks are passed,
or 'None' is one of the networks, sync all of the networks. or 'None' is one of the networks, sync all of the networks.
@ -327,44 +354,54 @@ class DhcpAgent(manager.Manager):
# Update the metadata proxy after the dhcp driver has been updated # Update the metadata proxy after the dhcp driver has been updated
self.update_isolated_metadata_proxy(network) self.update_isolated_metadata_proxy(network)
@utils.synchronized('dhcp-agent') @_wait_if_syncing
def network_create_end(self, context, payload): def network_create_end(self, context, payload):
"""Handle the network.create.end notification event.""" """Handle the network.create.end notification event."""
network_id = payload['network']['id'] network_id = payload['network']['id']
self.enable_dhcp_helper(network_id) with _net_lock(network_id):
self.enable_dhcp_helper(network_id)
@utils.synchronized('dhcp-agent') @_wait_if_syncing
def network_update_end(self, context, payload): def network_update_end(self, context, payload):
"""Handle the network.update.end notification event.""" """Handle the network.update.end notification event."""
network_id = payload['network']['id'] network_id = payload['network']['id']
if payload['network']['admin_state_up']: with _net_lock(network_id):
self.enable_dhcp_helper(network_id) if payload['network']['admin_state_up']:
else: self.enable_dhcp_helper(network_id)
self.disable_dhcp_helper(network_id) else:
self.disable_dhcp_helper(network_id)
@utils.synchronized('dhcp-agent') @_wait_if_syncing
def network_delete_end(self, context, payload): def network_delete_end(self, context, payload):
"""Handle the network.delete.end notification event.""" """Handle the network.delete.end notification event."""
self.disable_dhcp_helper(payload['network_id']) network_id = payload['network_id']
with _net_lock(network_id):
self.disable_dhcp_helper(network_id)
@utils.synchronized('dhcp-agent') @_wait_if_syncing
def subnet_update_end(self, context, payload): def subnet_update_end(self, context, payload):
"""Handle the subnet.update.end notification event.""" """Handle the subnet.update.end notification event."""
network_id = payload['subnet']['network_id'] network_id = payload['subnet']['network_id']
self.refresh_dhcp_helper(network_id) with _net_lock(network_id):
self.refresh_dhcp_helper(network_id)
# Use the update handler for the subnet create event. # Use the update handler for the subnet create event.
subnet_create_end = subnet_update_end subnet_create_end = subnet_update_end
@utils.synchronized('dhcp-agent') @_wait_if_syncing
def subnet_delete_end(self, context, payload): def subnet_delete_end(self, context, payload):
"""Handle the subnet.delete.end notification event.""" """Handle the subnet.delete.end notification event."""
subnet_id = payload['subnet_id'] subnet_id = payload['subnet_id']
network = self.cache.get_network_by_subnet_id(subnet_id) network = self.cache.get_network_by_subnet_id(subnet_id)
if network: if not network:
return
with _net_lock(network.id):
network = self.cache.get_network_by_subnet_id(subnet_id)
if not network:
return
self.refresh_dhcp_helper(network.id) self.refresh_dhcp_helper(network.id)
@utils.synchronized('dhcp-agent') @_wait_if_syncing
def port_update_end(self, context, payload): def port_update_end(self, context, payload):
"""Handle the port.update.end notification event.""" """Handle the port.update.end notification event."""
updated_port = dhcp.DictModel(payload['port']) updated_port = dhcp.DictModel(payload['port'])
@ -372,7 +409,12 @@ class DhcpAgent(manager.Manager):
LOG.debug("Discarding stale port update: %s", updated_port) LOG.debug("Discarding stale port update: %s", updated_port)
return return
network = self.cache.get_network_by_id(updated_port.network_id) network = self.cache.get_network_by_id(updated_port.network_id)
if network: if not network:
return
with _net_lock(network.id):
network = self.cache.get_network_by_id(updated_port.network_id)
if not network:
return
LOG.info(_LI("Trigger reload_allocations for port %s"), LOG.info(_LI("Trigger reload_allocations for port %s"),
updated_port) updated_port)
driver_action = 'reload_allocations' driver_action = 'reload_allocations'
@ -409,12 +451,17 @@ class DhcpAgent(manager.Manager):
# Use the update handler for the port create event. # Use the update handler for the port create event.
port_create_end = port_update_end port_create_end = port_update_end
@utils.synchronized('dhcp-agent') @_wait_if_syncing
def port_delete_end(self, context, payload): def port_delete_end(self, context, payload):
"""Handle the port.delete.end notification event.""" """Handle the port.delete.end notification event."""
port = self.cache.get_port_by_id(payload['port_id']) port = self.cache.get_port_by_id(payload['port_id'])
self.cache.deleted_ports.add(payload['port_id']) self.cache.deleted_ports.add(payload['port_id'])
if port: if not port:
return
with _net_lock(port.network_id):
port = self.cache.get_port_by_id(payload['port_id'])
if not port:
return
network = self.cache.get_network_by_id(port.network_id) network = self.cache.get_network_by_id(port.network_id)
self.cache.remove_port(port) self.cache.remove_port(port)
if self._is_port_on_this_agent(port): if self._is_port_on_this_agent(port):

View File

@ -1098,6 +1098,7 @@ class TestDhcpAgentEventHandler(base.BaseTestCase):
self.cache.assert_has_calls( self.cache.assert_has_calls(
[mock.call.get_port_by_id(fake_port2.id), [mock.call.get_port_by_id(fake_port2.id),
mock.call.deleted_ports.add(fake_port2.id), mock.call.deleted_ports.add(fake_port2.id),
mock.call.get_port_by_id(fake_port2.id),
mock.call.get_network_by_id(fake_network.id), mock.call.get_network_by_id(fake_network.id),
mock.call.remove_port(fake_port2)]) mock.call.remove_port(fake_port2)])
self.call_driver.assert_has_calls( self.call_driver.assert_has_calls(