Browse Source

Improve port dhcp Provisioning

Currently, the dhcp Provisioning of ports is the crucial bottleneck
of that concurrently boot multiple VM.

The root cause is that these ports will be processed one by one by dhcp
agent when they belong to the same network, And the 'Provisioning complete'
port is still blocked other port's processing in other dhcp agents. The
patch aim to optimize the dispatch strategy of the port cast to agent to
improve the Provisioning process.

In server side, I classify messages to multi levels. Especially, I classify
the port_update_end or port_create_end message to two levels, the high-level
message only cast to one agent, the low-level message cast to all agent. In
agent side I put these messages to `resource_processing_queue`, with the queue,
We can delete `_net_lock` and process these messages in order of priority.

Additonally, I modified the `resource_processing_queue` for my demand. I update
`_queue` from LIST to PriorityQueue in `ExclusiveResourceProcessor`, by this
way, we can sort all message which cached in `ExclusiveResourceProcessor` by
priority.

Conflicts:
    neutron/api/rpc/agentnotifiers/dhcp_rpc_agent_api.py

Related-Bug: #1760047
Change-Id: I255caa0571c42fb012fe882259ef181070beccef
(cherry picked from commit 99f4495c94)
(cherry picked from commit 740295d94d)
tags/12.1.0
yangjianfeng 1 year ago
committed by Slawek Kaplonski
parent
commit
dd5e89e717
6 changed files with 357 additions and 128 deletions
  1. +10
    -9
      neutron/agent/common/resource_processing_queue.py
  2. +146
    -71
      neutron/agent/dhcp/agent.py
  3. +51
    -3
      neutron/api/rpc/agentnotifiers/dhcp_rpc_agent_api.py
  4. +65
    -32
      neutron/tests/unit/agent/dhcp/test_agent.py
  5. +36
    -1
      neutron/tests/unit/api/rpc/agentnotifiers/test_dhcp_rpc_agent_api.py
  6. +49
    -12
      neutron/tests/unit/db/test_agentschedulers_db.py

+ 10
- 9
neutron/agent/common/resource_processing_queue.py View File

@@ -105,7 +105,7 @@ class ExclusiveResourceProcessor(object):

if id not in self._masters:
self._masters[id] = self
self._queue = []
self._queue = Queue.PriorityQueue(-1)

self._master = self._masters[id]

@@ -135,7 +135,7 @@ class ExclusiveResourceProcessor(object):
resource is being processed. These updates have already bubbled to
the front of the ResourceProcessingQueue.
"""
self._master._queue.append(update)
self._master._queue.put(update)

def updates(self):
"""Processes the resource until updates stop coming
@@ -144,13 +144,14 @@ class ExclusiveResourceProcessor(object):
may come in from other workers while it is in progress. This method
loops until they stop coming.
"""
if self._i_am_master():
while self._queue:
# Remove the update from the queue even if it is old.
update = self._queue.pop(0)
# Process the update only if it is fresh.
if self._get_resource_data_timestamp() < update.timestamp:
yield update
while self._i_am_master():
if self._queue.empty():
return
# Get the update from the queue even if it is old.
update = self._queue.get()
# Process the update only if it is fresh.
if self._get_resource_data_timestamp() < update.timestamp:
yield update


class ResourceProcessingQueue(object):


+ 146
- 71
neutron/agent/dhcp/agent.py View File

@@ -21,7 +21,6 @@ from neutron_lib.agent import constants as agent_consts
from neutron_lib import constants
from neutron_lib import context
from neutron_lib import exceptions
from neutron_lib.utils import runtime
from oslo_concurrency import lockutils
from oslo_config import cfg
from oslo_log import log as logging
@@ -32,6 +31,7 @@ from oslo_utils import importutils
import six

from neutron._i18n import _
from neutron.agent.common import resource_processing_queue as queue
from neutron.agent.linux import dhcp
from neutron.agent.linux import external_process
from neutron.agent.metadata import driver as metadata_driver
@@ -45,6 +45,8 @@ from neutron import manager
LOG = logging.getLogger(__name__)
_SYNC_STATE_LOCK = lockutils.ReaderWriterLock()

DEFAULT_PRIORITY = 255


def _sync_lock(f):
"""Decorator to block all operations for a global sync call."""
@@ -64,12 +66,6 @@ def _wait_if_syncing(f):
return wrapped


def _net_lock(network_id):
"""Returns a context manager lock based on network_id."""
lock_name = 'dhcp-agent-network-lock-%s' % network_id
return lockutils.lock(lock_name, runtime.SYNCHRONIZED_PREFIX)


class DhcpAgent(manager.Manager):
"""DHCP agent service manager.

@@ -100,6 +96,7 @@ class DhcpAgent(manager.Manager):
self._process_monitor = external_process.ProcessMonitor(
config=self.conf,
resource_type='dhcp')
self._queue = queue.ResourceProcessingQueue()

def init_host(self):
self.sync_state()
@@ -128,6 +125,7 @@ class DhcpAgent(manager.Manager):
"""Activate the DHCP agent."""
self.periodic_resync()
self.start_ready_ports_loop()
eventlet.spawn_n(self._process_loop)

def call_driver(self, action, network, **action_kwargs):
"""Invoke an action on a DHCP driver instance."""
@@ -354,36 +352,64 @@ class DhcpAgent(manager.Manager):
# Update the metadata proxy after the dhcp driver has been updated
self.update_isolated_metadata_proxy(network)

@_wait_if_syncing
def network_create_end(self, context, payload):
"""Handle the network.create.end notification event."""
network_id = payload['network']['id']
with _net_lock(network_id):
self.enable_dhcp_helper(network_id)
update = queue.ResourceUpdate(payload['network']['id'],
payload.get('priority',
DEFAULT_PRIORITY),
action='_network_create',
resource=payload)
self._queue.add(update)

@_wait_if_syncing
def _network_create(self, payload):
network_id = payload['network']['id']
self.enable_dhcp_helper(network_id)

def network_update_end(self, context, payload):
"""Handle the network.update.end notification event."""
network_id = payload['network']['id']
with _net_lock(network_id):
if payload['network']['admin_state_up']:
self.enable_dhcp_helper(network_id)
else:
self.disable_dhcp_helper(network_id)
update = queue.ResourceUpdate(payload['network']['id'],
payload.get('priority',
DEFAULT_PRIORITY),
action='_network_update',
resource=payload)
self._queue.add(update)

@_wait_if_syncing
def _network_update(self, payload):
network_id = payload['network']['id']
if payload['network']['admin_state_up']:
self.enable_dhcp_helper(network_id)
else:
self.disable_dhcp_helper(network_id)

def network_delete_end(self, context, payload):
"""Handle the network.delete.end notification event."""
network_id = payload['network_id']
with _net_lock(network_id):
self.disable_dhcp_helper(network_id)
update = queue.ResourceUpdate(payload['network_id'],
payload.get('priority',
DEFAULT_PRIORITY),
action='_network_delete',
resource=payload)
self._queue.add(update)

@_wait_if_syncing
def _network_delete(self, payload):
network_id = payload['network_id']
self.disable_dhcp_helper(network_id)

def subnet_update_end(self, context, payload):
"""Handle the subnet.update.end notification event."""
update = queue.ResourceUpdate(payload['subnet']['network_id'],
payload.get('priority',
DEFAULT_PRIORITY),
action='_subnet_update',
resource=payload)
self._queue.add(update)

@_wait_if_syncing
def _subnet_update(self, payload):
network_id = payload['subnet']['network_id']
with _net_lock(network_id):
self.refresh_dhcp_helper(network_id)
self.refresh_dhcp_helper(network_id)

# Use the update handler for the subnet create event.
subnet_create_end = subnet_update_end
@@ -406,31 +432,63 @@ class DhcpAgent(manager.Manager):
port = self.cache.get_port_by_id(port_id)
return port.network_id if port else None

@_wait_if_syncing
def subnet_delete_end(self, context, payload):
"""Handle the subnet.delete.end notification event."""
network_id = self._get_network_lock_id(payload)
if not network_id:
return
with _net_lock(network_id):
subnet_id = payload['subnet_id']
network = self.cache.get_network_by_subnet_id(subnet_id)
if not network:
return
self.refresh_dhcp_helper(network.id)
update = queue.ResourceUpdate(network_id,
payload.get('priority',
DEFAULT_PRIORITY),
action='_subnet_delete',
resource=payload)
self._queue.add(update)

@_wait_if_syncing
def _subnet_delete(self, payload):
network_id = self._get_network_lock_id(payload)
if not network_id:
return
subnet_id = payload['subnet_id']
network = self.cache.get_network_by_subnet_id(subnet_id)
if not network:
return
self.refresh_dhcp_helper(network.id)

def _process_loop(self):
LOG.debug("Starting _process_loop")

pool = eventlet.GreenPool(size=8)
while True:
pool.spawn_n(self._process_resource_update)

def _process_resource_update(self):
for tmp, update in self._queue.each_update_to_next_resource():
method = getattr(self, update.action)
method(update.resource)

def port_update_end(self, context, payload):
"""Handle the port.update.end notification event."""
updated_port = dhcp.DictModel(payload['port'])
with _net_lock(updated_port.network_id):
if self.cache.is_port_message_stale(payload['port']):
LOG.debug("Discarding stale port update: %s", updated_port)
return
network = self.cache.get_network_by_id(updated_port.network_id)
if not network:
return
self.reload_allocations(updated_port, network)
if self.cache.is_port_message_stale(updated_port):
LOG.debug("Discarding stale port update: %s", updated_port)
return
update = queue.ResourceUpdate(updated_port.network_id,
payload.get('priority',
DEFAULT_PRIORITY),
action='_port_update',
resource=updated_port)
self._queue.add(update)

@_wait_if_syncing
def _port_update(self, updated_port):
if self.cache.is_port_message_stale(updated_port):
LOG.debug("Discarding stale port update: %s", updated_port)
return
network = self.cache.get_network_by_id(updated_port.network_id)
if not network:
return
self.reload_allocations(updated_port, network)

def reload_allocations(self, port, network):
LOG.info("Trigger reload_allocations for port %s", port)
@@ -466,50 +524,67 @@ class DhcpAgent(manager.Manager):
port['network_id'], self.conf.host)
return port['device_id'] == thishost

@_wait_if_syncing
def port_create_end(self, context, payload):
"""Handle the port.create.end notification event."""
created_port = dhcp.DictModel(payload['port'])
with _net_lock(created_port.network_id):
network = self.cache.get_network_by_id(created_port.network_id)
if not network:
return
new_ips = {i['ip_address'] for i in created_port['fixed_ips']}
for port_cached in network.ports:
# if there are other ports cached with the same ip address in
# the same network this indicate that the cache is out of sync
cached_ips = {i['ip_address']
for i in port_cached['fixed_ips']}
if new_ips.intersection(cached_ips):
self.schedule_resync("Duplicate IP addresses found, "
"DHCP cache is out of sync",
created_port.network_id)
return
self.reload_allocations(created_port, network)
update = queue.ResourceUpdate(created_port.network_id,
payload.get('priority',
DEFAULT_PRIORITY),
action='_port_create',
resource=created_port)
self._queue.add(update)

@_wait_if_syncing
def _port_create(self, created_port):
network = self.cache.get_network_by_id(created_port.network_id)
if not network:
return
new_ips = {i['ip_address'] for i in created_port['fixed_ips']}
for port_cached in network.ports:
# if there are other ports cached with the same ip address in
# the same network this indicate that the cache is out of sync
cached_ips = {i['ip_address']
for i in port_cached['fixed_ips']}
if new_ips.intersection(cached_ips):
self.schedule_resync("Duplicate IP addresses found, "
"DHCP cache is out of sync",
created_port.network_id)
return
self.reload_allocations(created_port, network)

def port_delete_end(self, context, payload):
"""Handle the port.delete.end notification event."""
network_id = self._get_network_lock_id(payload)
if not network_id:
return
with _net_lock(network_id):
port_id = payload['port_id']
port = self.cache.get_port_by_id(port_id)
self.cache.deleted_ports.add(port_id)
if not port:
return
network = self.cache.get_network_by_id(port.network_id)
self.cache.remove_port(port)
if self._is_port_on_this_agent(port):
# the agent's port has been deleted. disable the service
# and add the network to the resync list to create
# (or acquire a reserved) port.
self.call_driver('disable', network)
self.schedule_resync("Agent port was deleted", port.network_id)
else:
self.call_driver('reload_allocations', network)
self.update_isolated_metadata_proxy(network)
update = queue.ResourceUpdate(network_id,
payload.get('priority',
DEFAULT_PRIORITY),
action='_port_delete',
resource=payload)
self._queue.add(update)

@_wait_if_syncing
def _port_delete(self, payload):
network_id = self._get_network_lock_id(payload)
if not network_id:
return
port_id = payload['port_id']
port = self.cache.get_port_by_id(port_id)
self.cache.deleted_ports.add(port_id)
if not port:
return
network = self.cache.get_network_by_id(port.network_id)
self.cache.remove_port(port)
if self._is_port_on_this_agent(port):
# the agent's port has been deleted. disable the service
# and add the network to the resync list to create
# (or acquire a reserved) port.
self.call_driver('disable', network)
self.schedule_resync("Agent port was deleted", port.network_id)
else:
self.call_driver('reload_allocations', network)
self.update_isolated_metadata_proxy(network)

def update_isolated_metadata_proxy(self, network):
"""Spawn or kill metadata proxy.


+ 51
- 3
neutron/api/rpc/agentnotifiers/dhcp_rpc_agent_api.py View File

@@ -13,6 +13,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import copy
import random

from neutron_lib.callbacks import events
from neutron_lib.callbacks import registry
from neutron_lib.callbacks import resources
@@ -26,6 +29,36 @@ from neutron.common import rpc as n_rpc
from neutron.common import topics
from neutron.common import utils

# Priorities - lower value is higher priority
PRIORITY_NETWORK_CREATE = 0
PRIORITY_NETWORK_UPDATE = 1
PRIORITY_NETWORK_DELETE = 2
PRIORITY_SUBNET_UPDATE = 3
PRIORITY_SUBNET_DELETE = 4
# In order to improve port dhcp provisioning when nova concurrently create
# multiple vms, I classify the port_create_end message to two levels, the
# high-level message only cast to one agent, the low-level message cast to all
# other agent. In this way, When there are a large number of ports that need to
# be processed, we can dispatch the high priority message of port to different
# agent, so that the processed port will not block other port's processing in
# other dhcp agents.
PRIORITY_PORT_CREATE_HIGH = 5
PRIORITY_PORT_CREATE_LOW = 6
PRIORITY_PORT_UPDATE = 7
PRIORITY_PORT_DELETE = 8

METHOD_PRIORITY_MAP = {
'network_create_end': PRIORITY_NETWORK_CREATE,
'network_update_end': PRIORITY_NETWORK_UPDATE,
'network_delete_end': PRIORITY_NETWORK_DELETE,
'subnet_create_end': PRIORITY_SUBNET_UPDATE,
'subnet_update_end': PRIORITY_SUBNET_UPDATE,
'subnet_delete_end': PRIORITY_SUBNET_DELETE,
'port_create_end': PRIORITY_PORT_CREATE_LOW,
'port_update_end': PRIORITY_PORT_UPDATE,
'port_delete_end': PRIORITY_PORT_DELETE
}


LOG = logging.getLogger(__name__)

@@ -101,7 +134,8 @@ class DhcpAgentNotifyAPI(object):
for agent in new_agents:
self._cast_message(
context, 'network_create_end',
{'network': {'id': network['id']}}, agent['host'])
{'network': {'id': network['id']},
'priority': PRIORITY_NETWORK_CREATE}, agent['host'])
elif not existing_agents:
LOG.warning('Unable to schedule network %s: no agents '
'available; will retry on subsequent port '
@@ -147,6 +181,7 @@ class DhcpAgentNotifyAPI(object):

def _notify_agents(self, context, method, payload, network_id):
"""Notify all the agents that are hosting the network."""
payload['priority'] = METHOD_PRIORITY_MAP.get(method)
# fanout is required as we do not know who is "listening"
no_agents = not utils.is_extension_supported(
self.plugin, constants.DHCP_AGENT_SCHEDULER_EXT_ALIAS)
@@ -184,10 +219,21 @@ class DhcpAgentNotifyAPI(object):
return
enabled_agents = self._get_enabled_agents(
context, network, agents, method, payload)

if method == 'port_create_end':
high_agent = enabled_agents.pop(
random.randint(0, len(enabled_agents) - 1))
self._notify_high_priority_agent(
context, copy.deepcopy(payload), high_agent)
for agent in enabled_agents:
self._cast_message(
context, method, payload, agent.host, agent.topic)

def _notify_high_priority_agent(self, context, payload, agent):
payload['priority'] = PRIORITY_PORT_CREATE_HIGH
self._cast_message(context, "port_create_end",
payload, agent.host, agent.topic)

def _cast_message(self, context, method, payload, host,
topic=topics.DHCP_AGENT):
"""Cast the payload to the dhcp agent running on the host."""
@@ -201,11 +247,13 @@ class DhcpAgentNotifyAPI(object):

def network_removed_from_agent(self, context, network_id, host):
self._cast_message(context, 'network_delete_end',
{'network_id': network_id}, host)
{'network_id': network_id,
'priority': PRIORITY_NETWORK_DELETE}, host)

def network_added_to_agent(self, context, network_id, host):
self._cast_message(context, 'network_create_end',
{'network': {'id': network_id}}, host)
{'network': {'id': network_id},
'priority': PRIORITY_NETWORK_CREATE}, host)

def agent_updated(self, context, admin_state_up, host):
self._cast_message(context, 'agent_updated',


+ 65
- 32
neutron/tests/unit/agent/dhcp/test_agent.py View File

@@ -48,6 +48,7 @@ DHCP_PLUGIN = '%s.%s' % (rpc_api.__module__, rpc_api.__name__)
FAKE_NETWORK_UUID = '12345678-1234-5678-1234567890ab'
FAKE_NETWORK_DHCP_NS = "qdhcp-%s" % FAKE_NETWORK_UUID
FAKE_TENANT_ID = 'aaaaaaaa-aaaa-aaaa-aaaaaaaaaaaa'
FAKE_PRIORITY = 6


fake_subnet1_allocation_pools = dhcp.DictModel(dict(id='', start='172.9.9.2',
@@ -284,11 +285,15 @@ class TestDhcpAgent(base.BaseTestCase):
dhcp = dhcp_agent.DhcpAgent(HOSTNAME)
attrs_to_mock = dict(
[(a, mock.DEFAULT) for a in
['periodic_resync', 'start_ready_ports_loop']])
['periodic_resync', 'start_ready_ports_loop',
'_process_loop']])
with mock.patch.multiple(dhcp, **attrs_to_mock) as mocks:
dhcp.run()
mocks['periodic_resync'].assert_called_once_with()
mocks['start_ready_ports_loop'].assert_called_once_with()
with mock.patch.object(dhcp_agent.eventlet,
'spawn_n') as spawn_n:
dhcp.run()
mocks['periodic_resync'].assert_called_once_with()
mocks['start_ready_ports_loop'].assert_called_once_with()
spawn_n.assert_called_once_with(mocks['_process_loop'])

def test_call_driver(self):
network = mock.Mock()
@@ -891,29 +896,36 @@ class TestDhcpAgentEventHandler(base.BaseTestCase):
self._test_disable_isolated_metadata_proxy(fake_dist_network)

def test_network_create_end(self):
payload = dict(network=dict(id=fake_network.id))
payload = dict(network=dict(id=fake_network.id),
priority=FAKE_PRIORITY)

with mock.patch.object(self.dhcp, 'enable_dhcp_helper') as enable:
self.dhcp.network_create_end(None, payload)
self.dhcp._process_resource_update()
enable.assert_called_once_with(fake_network.id)

def test_network_update_end_admin_state_up(self):
payload = dict(network=dict(id=fake_network.id, admin_state_up=True))
payload = dict(network=dict(id=fake_network.id, admin_state_up=True),
priority=FAKE_PRIORITY)
with mock.patch.object(self.dhcp, 'enable_dhcp_helper') as enable:
self.dhcp.network_update_end(None, payload)
self.dhcp._process_resource_update()
enable.assert_called_once_with(fake_network.id)

def test_network_update_end_admin_state_down(self):
payload = dict(network=dict(id=fake_network.id, admin_state_up=False))
payload = dict(network=dict(id=fake_network.id, admin_state_up=False),
priority=FAKE_PRIORITY)
with mock.patch.object(self.dhcp, 'disable_dhcp_helper') as disable:
self.dhcp.network_update_end(None, payload)
self.dhcp._process_resource_update()
disable.assert_called_once_with(fake_network.id)

def test_network_delete_end(self):
payload = dict(network_id=fake_network.id)
payload = dict(network_id=fake_network.id, priority=FAKE_PRIORITY)

with mock.patch.object(self.dhcp, 'disable_dhcp_helper') as disable:
self.dhcp.network_delete_end(None, payload)
self.dhcp._process_resource_update()
disable.assert_called_once_with(fake_network.id)

def test_refresh_dhcp_helper_no_dhcp_enabled_networks(self):
@@ -955,13 +967,15 @@ class TestDhcpAgentEventHandler(base.BaseTestCase):
# attribute isn't True.
payload = dict(subnet=dhcp.DictModel(
dict(network_id=fake_network.id, enable_dhcp=False,
cidr='99.99.99.0/24', ip_version=4)))
cidr='99.99.99.0/24', ip_version=4)),
priority=FAKE_PRIORITY)
self.cache.get_network_by_id.return_value = fake_network
new_net = copy.deepcopy(fake_network)
new_net.subnets.append(payload['subnet'])
self.plugin.get_network_info.return_value = new_net

self.dhcp.subnet_create_end(None, payload)
self.dhcp._process_resource_update()

self.cache.assert_has_calls([mock.call.put(new_net)])
self.call_driver.assert_called_once_with('reload_allocations', new_net)
@@ -970,20 +984,24 @@ class TestDhcpAgentEventHandler(base.BaseTestCase):
self.call_driver.reset_mock()
payload = dict(subnet=dhcp.DictModel(
dict(network_id=fake_network.id, enable_dhcp=True,
cidr='99.99.88.0/24', ip_version=const.IP_VERSION_4)))
cidr='99.99.88.0/24', ip_version=4)),
priority=FAKE_PRIORITY)
new_net = copy.deepcopy(fake_network)
new_net.subnets.append(payload['subnet'])
self.plugin.get_network_info.return_value = new_net
self.dhcp.subnet_create_end(None, payload)
self.dhcp._process_resource_update()
self.cache.assert_has_calls([mock.call.put(new_net)])
self.call_driver.assert_called_once_with('restart', new_net)

def test_subnet_update_end(self):
payload = dict(subnet=dict(network_id=fake_network.id))
payload = dict(subnet=dict(network_id=fake_network.id),
priority=FAKE_PRIORITY)
self.cache.get_network_by_id.return_value = fake_network
self.plugin.get_network_info.return_value = fake_network

self.dhcp.subnet_update_end(None, payload)
self.dhcp._process_resource_update()

self.cache.assert_has_calls([mock.call.put(fake_network)])
self.call_driver.assert_called_once_with('reload_allocations',
@@ -993,7 +1011,8 @@ class TestDhcpAgentEventHandler(base.BaseTestCase):
self.dhcp.dhcp_ready_ports)

def test_subnet_update_dhcp(self):
payload = dict(subnet=dict(network_id=fake_network.id))
payload = dict(subnet=dict(network_id=fake_network.id),
priority=FAKE_PRIORITY)
self.cache.get_network_by_id.return_value = fake_network
new_net = copy.deepcopy(fake_network)
new_subnet1 = copy.deepcopy(fake_subnet1)
@@ -1002,6 +1021,7 @@ class TestDhcpAgentEventHandler(base.BaseTestCase):
new_net.subnets = [new_subnet1, new_subnet2]
self.plugin.get_network_info.return_value = new_net
self.dhcp.subnet_update_end(None, payload)
self.dhcp._process_resource_update()
self.call_driver.assert_called_once_with('restart', new_net)

self.call_driver.reset_mock()
@@ -1013,6 +1033,7 @@ class TestDhcpAgentEventHandler(base.BaseTestCase):
new_net2.subnets = [new_subnet1, new_subnet2]
self.plugin.get_network_info.return_value = new_net2
self.dhcp.subnet_update_end(None, payload)
self.dhcp._process_resource_update()
self.call_driver.assert_called_once_with('restart', new_net2)

def test_subnet_update_end_restart(self):
@@ -1022,11 +1043,13 @@ class TestDhcpAgentEventHandler(base.BaseTestCase):
subnets=[fake_subnet1, fake_subnet3],
ports=[fake_port1]))

payload = dict(subnet=dict(network_id=fake_network.id))
payload = dict(subnet=dict(network_id=fake_network.id),
priority=FAKE_PRIORITY)
self.cache.get_network_by_id.return_value = fake_network
self.plugin.get_network_info.return_value = new_state

self.dhcp.subnet_update_end(None, payload)
self.dhcp._process_resource_update()

self.cache.assert_has_calls([mock.call.put(new_state)])
self.call_driver.assert_called_once_with('restart',
@@ -1039,12 +1062,13 @@ class TestDhcpAgentEventHandler(base.BaseTestCase):
subnets=[fake_subnet1, fake_subnet3],
ports=[fake_port1]))

payload = dict(subnet_id=fake_subnet1.id)
payload = dict(subnet_id=fake_subnet1.id, priority=FAKE_PRIORITY)
self.cache.get_network_by_subnet_id.return_value = prev_state
self.cache.get_network_by_id.return_value = prev_state
self.plugin.get_network_info.return_value = fake_network

self.dhcp.subnet_delete_end(None, payload)
self.dhcp._process_resource_update()

self.cache.assert_has_calls([
mock.call.get_network_by_subnet_id(
@@ -1063,12 +1087,14 @@ class TestDhcpAgentEventHandler(base.BaseTestCase):
subnets=[fake_subnet1, fake_subnet3],
ports=[fake_port1]))

payload = dict(subnet_id=fake_subnet1.id, network_id=fake_network.id)
payload = dict(subnet_id=fake_subnet1.id, network_id=fake_network.id,
priority=FAKE_PRIORITY)
self.cache.get_network_by_subnet_id.return_value = prev_state
self.cache.get_network_by_id.return_value = prev_state
self.plugin.get_network_info.return_value = fake_network

self.dhcp.subnet_delete_end(None, payload)
self.dhcp._process_resource_update()

self.cache.assert_has_calls([
mock.call.get_network_by_subnet_id(
@@ -1085,6 +1111,7 @@ class TestDhcpAgentEventHandler(base.BaseTestCase):
payload = dict(port=copy.deepcopy(fake_port2))
self.cache.get_network_by_id.return_value = fake_network
self.dhcp.port_update_end(None, payload)
self.dhcp._process_resource_update()
self.reload_allocations.assert_called_once_with(fake_port2,
fake_network)

@@ -1105,19 +1132,12 @@ class TestDhcpAgentEventHandler(base.BaseTestCase):
payload = dict(port=copy.deepcopy(fake_port2))
self.cache.get_network_by_id.return_value = fake_network
self.dhcp.port_create_end(None, payload)
self.dhcp._process_resource_update()
self.reload_allocations.assert_called_once_with(fake_port2,
fake_network)

def test_port_update_end_grabs_lock(self):
payload = dict(port=fake_port2)
self.cache.get_network_by_id.return_value = None
self.cache.get_port_by_id.return_value = fake_port2
with mock.patch('neutron.agent.dhcp.agent._net_lock') as nl:
self.dhcp.port_update_end(None, payload)
nl.assert_called_once_with(fake_port2.network_id)

def test_port_update_change_ip_on_port(self):
payload = dict(port=fake_port1)
payload = dict(port=fake_port1, priority=FAKE_PRIORITY)
self.cache.get_network_by_id.return_value = fake_network
updated_fake_port1 = copy.deepcopy(fake_port1)
updated_fake_port1.fixed_ips[0].ip_address = '172.9.9.99'
@@ -1125,6 +1145,7 @@ class TestDhcpAgentEventHandler(base.BaseTestCase):
with mock.patch.object(
self.dhcp, 'update_isolated_metadata_proxy') as ump:
self.dhcp.port_update_end(None, payload)
self.dhcp._process_resource_update()
self.cache.assert_has_calls(
[mock.call.get_network_by_id(fake_port1.network_id),
mock.call.put_port(mock.ANY)])
@@ -1135,35 +1156,38 @@ class TestDhcpAgentEventHandler(base.BaseTestCase):
def test_port_update_change_subnet_on_dhcp_agents_port(self):
self.cache.get_network_by_id.return_value = fake_network
self.cache.get_port_by_id.return_value = fake_port1
payload = dict(port=copy.deepcopy(fake_port1))
payload = dict(port=copy.deepcopy(fake_port1), priority=FAKE_PRIORITY)
device_id = utils.get_dhcp_agent_device_id(
payload['port']['network_id'], self.dhcp.conf.host)
payload['port']['fixed_ips'][0]['subnet_id'] = '77777-7777'
payload['port']['device_id'] = device_id
self.dhcp.port_update_end(None, payload)
self.dhcp._process_resource_update()
self.assertFalse(self.call_driver.called)

def test_port_update_change_ip_on_dhcp_agents_port(self):
self.cache.get_network_by_id.return_value = fake_network
self.cache.get_port_by_id.return_value = fake_port1
payload = dict(port=copy.deepcopy(fake_port1))
payload = dict(port=copy.deepcopy(fake_port1), priority=FAKE_PRIORITY)
device_id = utils.get_dhcp_agent_device_id(
payload['port']['network_id'], self.dhcp.conf.host)
payload['port']['fixed_ips'][0]['ip_address'] = '172.9.9.99'
payload['port']['device_id'] = device_id
self.dhcp.port_update_end(None, payload)
self.dhcp._process_resource_update()
self.call_driver.assert_has_calls(
[mock.call.call_driver('restart', fake_network)])

def test_port_update_change_ip_on_dhcp_agents_port_cache_miss(self):
self.cache.get_network_by_id.return_value = fake_network
self.cache.get_port_by_id.return_value = None
payload = dict(port=copy.deepcopy(fake_port1))
payload = dict(port=copy.deepcopy(fake_port1), priority=FAKE_PRIORITY)
device_id = utils.get_dhcp_agent_device_id(
payload['port']['network_id'], self.dhcp.conf.host)
payload['port']['fixed_ips'][0]['ip_address'] = '172.9.9.99'
payload['port']['device_id'] = device_id
self.dhcp.port_update_end(None, payload)
self.dhcp._process_resource_update()
self.schedule_resync.assert_called_once_with(mock.ANY,
fake_port1.network_id)

@@ -1173,28 +1197,31 @@ class TestDhcpAgentEventHandler(base.BaseTestCase):
duplicate_ip = fake_port1['fixed_ips'][0]['ip_address']
payload['port']['fixed_ips'][0]['ip_address'] = duplicate_ip
self.dhcp.port_create_end(None, payload)
self.dhcp._process_resource_update()
self.schedule_resync.assert_called_once_with(mock.ANY,
fake_port2.network_id)

def test_port_update_on_dhcp_agents_port_no_ip_change(self):
self.cache.get_network_by_id.return_value = fake_network
self.cache.get_port_by_id.return_value = fake_port1
payload = dict(port=fake_port1)
payload = dict(port=fake_port1, priority=FAKE_PRIORITY)
device_id = utils.get_dhcp_agent_device_id(
payload['port']['network_id'], self.dhcp.conf.host)
payload['port']['device_id'] = device_id
self.dhcp.port_update_end(None, payload)
self.dhcp._process_resource_update()
self.call_driver.assert_has_calls(
[mock.call.call_driver('reload_allocations', fake_network)])

def test_port_delete_end_no_network_id(self):
payload = dict(port_id=fake_port2.id)
payload = dict(port_id=fake_port2.id, priority=FAKE_PRIORITY)
self.cache.get_network_by_id.return_value = fake_network
self.cache.get_port_by_id.return_value = fake_port2

with mock.patch.object(
self.dhcp, 'update_isolated_metadata_proxy') as ump:
self.dhcp.port_delete_end(None, payload)
self.dhcp._process_resource_update()
self.cache.assert_has_calls(
[mock.call.get_port_by_id(fake_port2.id),
mock.call.get_port_by_id(fake_port2.id),
@@ -1206,13 +1233,15 @@ class TestDhcpAgentEventHandler(base.BaseTestCase):
self.assertTrue(ump.called)

def test_port_delete_end(self):
payload = dict(port_id=fake_port2.id, network_id=fake_network.id)
payload = dict(port_id=fake_port2.id, network_id=fake_network.id,
priority=FAKE_PRIORITY)
self.cache.get_network_by_id.return_value = fake_network
self.cache.get_port_by_id.return_value = fake_port2

with mock.patch.object(
self.dhcp, 'update_isolated_metadata_proxy') as ump:
self.dhcp.port_delete_end(None, payload)
self.dhcp._process_resource_update()
self.cache.assert_has_calls(
[mock.call.get_port_by_id(fake_port2.id),
mock.call.deleted_ports.add(fake_port2.id),
@@ -1223,10 +1252,12 @@ class TestDhcpAgentEventHandler(base.BaseTestCase):
self.assertTrue(ump.called)

def test_port_delete_end_unknown_port(self):
payload = dict(port_id='unknown', network_id='unknown')
payload = dict(port_id='unknown', network_id='unknown',
priority=FAKE_PRIORITY)
self.cache.get_port_by_id.return_value = None

self.dhcp.port_delete_end(None, payload)
self.dhcp._process_resource_update()

self.cache.assert_has_calls([mock.call.get_port_by_id('unknown')])
self.assertEqual(self.call_driver.call_count, 0)
@@ -1239,7 +1270,9 @@ class TestDhcpAgentEventHandler(base.BaseTestCase):
self.cache.get_network_by_id.return_value = fake_network
self.cache.get_port_by_id.return_value = port
self.dhcp.port_delete_end(None, {'port_id': port.id,
'network_id': fake_network.id})
'network_id': fake_network.id,
'priority': FAKE_PRIORITY})
self.dhcp._process_resource_update()
self.call_driver.assert_has_calls(
[mock.call.call_driver('disable', fake_network)])



+ 36
- 1
neutron/tests/unit/api/rpc/agentnotifiers/test_dhcp_rpc_agent_api.py View File

@@ -13,12 +13,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import copy
import datetime

import mock
from neutron_lib.callbacks import events
from neutron_lib.callbacks import registry
from neutron_lib.callbacks import resources
from neutron_lib import constants
from neutron_lib.plugins import directory
from oslo_utils import timeutils
from oslo_utils import uuidutils
@@ -138,10 +140,43 @@ class TestDhcpAgentNotifyAPI(base.BaseTestCase):
network = {'id': 'foo_network_id'}
self._test__get_enabled_agents(network, agents=[agent1, agent2])

def test__notify_agents_allocate_priority(self):
mock_context = mock.MagicMock()
mock_context.is_admin = True
methods = ['network_create_end', 'network_update_end',
'network_delete_end', 'subnet_create_end',
'subnet_update_end', 'subnet_delete_end',
'port_create_end', 'port_update_end', 'port_delete_end']
with mock.patch.object(self.notifier, '_schedule_network') as f:
with mock.patch.object(self.notifier, '_get_enabled_agents') as g:
for method in methods:
f.return_value = [mock.MagicMock()]
g.return_value = [mock.MagicMock()]
payload = {}
if method.startswith('port'):
payload['port'] = \
{'device_id':
constants.DEVICE_ID_RESERVED_DHCP_PORT}
expected_payload = copy.deepcopy(payload)
expected_payload['priority'] = \
dhcp_rpc_agent_api.METHOD_PRIORITY_MAP.get(method)
self.notifier._notify_agents(mock_context, method, payload,
'fake_network_id')
if method == 'network_delete_end':
self.mock_fanout.assert_called_with(mock.ANY, method,
expected_payload)
elif method != 'network_create_end':
if method == 'port_create_end':
expected_payload['priority'] = \
dhcp_rpc_agent_api.PRIORITY_PORT_CREATE_HIGH
self.mock_cast.assert_called_with(mock.ANY, method,
expected_payload,
mock.ANY, mock.ANY)

def test__notify_agents_fanout_required(self):
self.notifier._notify_agents(mock.ANY,
'network_delete_end',
mock.ANY, 'foo_network_id')
{}, 'foo_network_id')
self.assertEqual(1, self.mock_fanout.call_count)

def _test__notify_agents_with_function(


+ 49
- 12
neutron/tests/unit/db/test_agentschedulers_db.py View File

@@ -27,6 +27,7 @@ from oslo_utils import uuidutils
from webob import exc

from neutron.api import extensions
from neutron.api.rpc.agentnotifiers import dhcp_rpc_agent_api
from neutron.api.rpc.agentnotifiers import l3_rpc_agent_api
from neutron.api.rpc.handlers import dhcp_rpc
from neutron.api.rpc.handlers import l3_rpc
@@ -46,10 +47,12 @@ from neutron.tests.unit.extensions import test_l3
from neutron.tests.unit import testlib_api
from neutron import wsgi


L3_HOSTA = 'hosta'
DHCP_HOSTA = 'hosta'
L3_HOSTB = 'hostb'
DHCP_HOSTC = 'hostc'
DHCP_HOSTD = 'hostd'

DEVICE_OWNER_COMPUTE = ''.join([constants.DEVICE_OWNER_COMPUTE_PREFIX,
'test:',
@@ -1313,7 +1316,9 @@ class OvsDhcpAgentNotifierTestCase(test_agent.AgentDBTestMixIn,
network_id)
self.dhcp_notifier_cast.assert_called_with(
mock.ANY, 'network_create_end',
{'network': {'id': network_id}}, DHCP_HOSTA)
{'network': {'id': network_id},
'priority': dhcp_rpc_agent_api.PRIORITY_NETWORK_CREATE},
DHCP_HOSTA)
notifications = fake_notifier.NOTIFICATIONS
expected_event_type = 'dhcp_agent.network.add'
self._assert_notify(notifications, expected_event_type)
@@ -1331,7 +1336,9 @@ class OvsDhcpAgentNotifierTestCase(test_agent.AgentDBTestMixIn,
network_id)
self.dhcp_notifier_cast.assert_called_with(
mock.ANY, 'network_delete_end',
{'network_id': network_id}, DHCP_HOSTA)
{'network_id': network_id,
'priority': dhcp_rpc_agent_api.PRIORITY_NETWORK_DELETE},
DHCP_HOSTA)
notifications = fake_notifier.NOTIFICATIONS
expected_event_type = 'dhcp_agent.network.remove'
self._assert_notify(notifications, expected_event_type)
@@ -1374,17 +1381,19 @@ class OvsDhcpAgentNotifierTestCase(test_agent.AgentDBTestMixIn,
ctx = context.get_admin_context()
net['network'] = self.plugin.get_network(ctx, net['network']['id'])
sub['subnet'] = self.plugin.get_subnet(ctx, sub['subnet']['id'])
sub['priority'] = dhcp_rpc_agent_api.PRIORITY_SUBNET_UPDATE
port['port'] = self.plugin.get_port(ctx, port['port']['id'])
return net, sub, port

def _notification_mocks(self, hosts, net, subnet, port):
def _notification_mocks(self, hosts, net, subnet, port, port_priority):
host_calls = {}
for host in hosts:
expected_calls = [
mock.call(
mock.ANY,
'network_create_end',
{'network': {'id': net['network']['id']}},
{'priority': dhcp_rpc_agent_api.PRIORITY_NETWORK_CREATE,
'network': {'id': net['network']['id']}},
host),
mock.call(
mock.ANY,
@@ -1394,7 +1403,8 @@ class OvsDhcpAgentNotifierTestCase(test_agent.AgentDBTestMixIn,
mock.call(
mock.ANY,
'port_create_end',
{'port': port['port']},
{'port': port['port'],
'priority': port_priority},
host, 'dhcp_agent')]
host_calls[host] = expected_calls
return host_calls
@@ -1402,19 +1412,46 @@ class OvsDhcpAgentNotifierTestCase(test_agent.AgentDBTestMixIn,
def test_network_port_create_notification(self):
hosts = [DHCP_HOSTA]
net, subnet, port = self._network_port_create(hosts)
expected_calls = self._notification_mocks(hosts, net, subnet, port)
expected_calls = self._notification_mocks(
hosts, net, subnet, port,
dhcp_rpc_agent_api.PRIORITY_PORT_CREATE_HIGH)
self.assertEqual(
expected_calls[DHCP_HOSTA], self.dhcp_notifier_cast.call_args_list)

def test_network_ha_port_create_notification(self):
cfg.CONF.set_override('dhcp_agents_per_network', 2)
hosts = [DHCP_HOSTA, DHCP_HOSTC]
cfg.CONF.set_override('dhcp_agents_per_network', 3)
hosts = [DHCP_HOSTA, DHCP_HOSTC, DHCP_HOSTD]
net, subnet, port = self._network_port_create(hosts)
expected_calls = self._notification_mocks(hosts, net, subnet, port)
for expected in expected_calls[DHCP_HOSTA]:
self.assertIn(expected, self.dhcp_notifier_cast.call_args_list)
for expected in expected_calls[DHCP_HOSTC]:
for host_call in self.dhcp_notifier_cast.call_args_list:
if ("'priority': " + str(
dhcp_rpc_agent_api.PRIORITY_PORT_CREATE_HIGH)
in str(host_call)):
if DHCP_HOSTA in str(host_call):
expected_high_calls = self._notification_mocks(
[DHCP_HOSTA], net, subnet, port,
dhcp_rpc_agent_api.PRIORITY_PORT_CREATE_HIGH)
high_host = DHCP_HOSTA
hosts.pop(0)
elif DHCP_HOSTC in str(host_call):
expected_high_calls = self._notification_mocks(
[DHCP_HOSTC], net, subnet, port,
dhcp_rpc_agent_api.PRIORITY_PORT_CREATE_HIGH)
high_host = DHCP_HOSTC
hosts.pop(1)
elif DHCP_HOSTD in str(host_call):
expected_high_calls = self._notification_mocks(
[DHCP_HOSTD], net, subnet, port,
dhcp_rpc_agent_api.PRIORITY_PORT_CREATE_HIGH)
high_host = DHCP_HOSTD
hosts.pop(2)
expected_low_calls = self._notification_mocks(
hosts, net, subnet, port,
dhcp_rpc_agent_api.PRIORITY_PORT_CREATE_LOW)
for expected in expected_high_calls[high_host]:
self.assertIn(expected, self.dhcp_notifier_cast.call_args_list)
for host, low_expecteds in expected_low_calls.items():
for expected in low_expecteds:
self.assertIn(expected, self.dhcp_notifier_cast.call_args_list)

def _is_schedule_network_called(self, device_id):
dhcp_notifier_schedule = mock.patch(


Loading…
Cancel
Save