Merge "Lock in DHCP agent based on network_id"
This commit is contained in:
commit
bd193cba73
|
@ -19,12 +19,14 @@ import os
|
|||
import eventlet
|
||||
from neutron_lib import constants
|
||||
from neutron_lib import exceptions
|
||||
from oslo_concurrency import lockutils
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log as logging
|
||||
import oslo_messaging
|
||||
from oslo_service import loopingcall
|
||||
from oslo_utils import fileutils
|
||||
from oslo_utils import importutils
|
||||
import six
|
||||
|
||||
from neutron._i18n import _, _LE, _LI, _LW
|
||||
from neutron.agent.linux import dhcp
|
||||
|
@ -39,6 +41,31 @@ from neutron import context
|
|||
from neutron import manager
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
_SYNC_STATE_LOCK = lockutils.ReaderWriterLock()
|
||||
|
||||
|
||||
def _sync_lock(f):
|
||||
"""Decorator to block all operations for a global sync call."""
|
||||
@six.wraps(f)
|
||||
def wrapped(*args, **kwargs):
|
||||
with _SYNC_STATE_LOCK.write_lock():
|
||||
return f(*args, **kwargs)
|
||||
return wrapped
|
||||
|
||||
|
||||
def _wait_if_syncing(f):
|
||||
"""Decorator to wait if any sync operations are in progress."""
|
||||
@six.wraps(f)
|
||||
def wrapped(*args, **kwargs):
|
||||
with _SYNC_STATE_LOCK.read_lock():
|
||||
return f(*args, **kwargs)
|
||||
return wrapped
|
||||
|
||||
|
||||
def _net_lock(network_id):
|
||||
"""Returns a context manager lock based on network_id."""
|
||||
lock_name = 'dhcp-agent-network-lock-%s' % network_id
|
||||
return lockutils.lock(lock_name, utils.SYNCHRONIZED_PREFIX)
|
||||
|
||||
|
||||
class DhcpAgent(manager.Manager):
|
||||
|
@ -146,7 +173,7 @@ class DhcpAgent(manager.Manager):
|
|||
"""
|
||||
self.needs_resync_reasons[network_id].append(reason)
|
||||
|
||||
@utils.synchronized('dhcp-agent')
|
||||
@_sync_lock
|
||||
def sync_state(self, networks=None):
|
||||
"""Sync the local DHCP state with Neutron. If no networks are passed,
|
||||
or 'None' is one of the networks, sync all of the networks.
|
||||
|
@ -327,44 +354,54 @@ class DhcpAgent(manager.Manager):
|
|||
# Update the metadata proxy after the dhcp driver has been updated
|
||||
self.update_isolated_metadata_proxy(network)
|
||||
|
||||
@utils.synchronized('dhcp-agent')
|
||||
@_wait_if_syncing
|
||||
def network_create_end(self, context, payload):
|
||||
"""Handle the network.create.end notification event."""
|
||||
network_id = payload['network']['id']
|
||||
self.enable_dhcp_helper(network_id)
|
||||
with _net_lock(network_id):
|
||||
self.enable_dhcp_helper(network_id)
|
||||
|
||||
@utils.synchronized('dhcp-agent')
|
||||
@_wait_if_syncing
|
||||
def network_update_end(self, context, payload):
|
||||
"""Handle the network.update.end notification event."""
|
||||
network_id = payload['network']['id']
|
||||
if payload['network']['admin_state_up']:
|
||||
self.enable_dhcp_helper(network_id)
|
||||
else:
|
||||
self.disable_dhcp_helper(network_id)
|
||||
with _net_lock(network_id):
|
||||
if payload['network']['admin_state_up']:
|
||||
self.enable_dhcp_helper(network_id)
|
||||
else:
|
||||
self.disable_dhcp_helper(network_id)
|
||||
|
||||
@utils.synchronized('dhcp-agent')
|
||||
@_wait_if_syncing
|
||||
def network_delete_end(self, context, payload):
|
||||
"""Handle the network.delete.end notification event."""
|
||||
self.disable_dhcp_helper(payload['network_id'])
|
||||
network_id = payload['network_id']
|
||||
with _net_lock(network_id):
|
||||
self.disable_dhcp_helper(network_id)
|
||||
|
||||
@utils.synchronized('dhcp-agent')
|
||||
@_wait_if_syncing
|
||||
def subnet_update_end(self, context, payload):
|
||||
"""Handle the subnet.update.end notification event."""
|
||||
network_id = payload['subnet']['network_id']
|
||||
self.refresh_dhcp_helper(network_id)
|
||||
with _net_lock(network_id):
|
||||
self.refresh_dhcp_helper(network_id)
|
||||
|
||||
# Use the update handler for the subnet create event.
|
||||
subnet_create_end = subnet_update_end
|
||||
|
||||
@utils.synchronized('dhcp-agent')
|
||||
@_wait_if_syncing
|
||||
def subnet_delete_end(self, context, payload):
|
||||
"""Handle the subnet.delete.end notification event."""
|
||||
subnet_id = payload['subnet_id']
|
||||
network = self.cache.get_network_by_subnet_id(subnet_id)
|
||||
if network:
|
||||
if not network:
|
||||
return
|
||||
with _net_lock(network.id):
|
||||
network = self.cache.get_network_by_subnet_id(subnet_id)
|
||||
if not network:
|
||||
return
|
||||
self.refresh_dhcp_helper(network.id)
|
||||
|
||||
@utils.synchronized('dhcp-agent')
|
||||
@_wait_if_syncing
|
||||
def port_update_end(self, context, payload):
|
||||
"""Handle the port.update.end notification event."""
|
||||
updated_port = dhcp.DictModel(payload['port'])
|
||||
|
@ -372,7 +409,12 @@ class DhcpAgent(manager.Manager):
|
|||
LOG.debug("Discarding stale port update: %s", updated_port)
|
||||
return
|
||||
network = self.cache.get_network_by_id(updated_port.network_id)
|
||||
if network:
|
||||
if not network:
|
||||
return
|
||||
with _net_lock(network.id):
|
||||
network = self.cache.get_network_by_id(updated_port.network_id)
|
||||
if not network:
|
||||
return
|
||||
LOG.info(_LI("Trigger reload_allocations for port %s"),
|
||||
updated_port)
|
||||
driver_action = 'reload_allocations'
|
||||
|
@ -409,12 +451,17 @@ class DhcpAgent(manager.Manager):
|
|||
# Use the update handler for the port create event.
|
||||
port_create_end = port_update_end
|
||||
|
||||
@utils.synchronized('dhcp-agent')
|
||||
@_wait_if_syncing
|
||||
def port_delete_end(self, context, payload):
|
||||
"""Handle the port.delete.end notification event."""
|
||||
port = self.cache.get_port_by_id(payload['port_id'])
|
||||
self.cache.deleted_ports.add(payload['port_id'])
|
||||
if port:
|
||||
if not port:
|
||||
return
|
||||
with _net_lock(port.network_id):
|
||||
port = self.cache.get_port_by_id(payload['port_id'])
|
||||
if not port:
|
||||
return
|
||||
network = self.cache.get_network_by_id(port.network_id)
|
||||
self.cache.remove_port(port)
|
||||
if self._is_port_on_this_agent(port):
|
||||
|
|
|
@ -1098,6 +1098,7 @@ class TestDhcpAgentEventHandler(base.BaseTestCase):
|
|||
self.cache.assert_has_calls(
|
||||
[mock.call.get_port_by_id(fake_port2.id),
|
||||
mock.call.deleted_ports.add(fake_port2.id),
|
||||
mock.call.get_port_by_id(fake_port2.id),
|
||||
mock.call.get_network_by_id(fake_network.id),
|
||||
mock.call.remove_port(fake_port2)])
|
||||
self.call_driver.assert_has_calls(
|
||||
|
|
Loading…
Reference in New Issue