Merge "Improve port dhcp Provisioning" into stable/queens

This commit is contained in:
Zuul 2019-04-11 12:13:18 +00:00 committed by Gerrit Code Review
commit 8c4431562e
6 changed files with 358 additions and 129 deletions

View File

@ -105,7 +105,7 @@ class ExclusiveResourceProcessor(object):
if id not in self._masters:
self._masters[id] = self
self._queue = []
self._queue = Queue.PriorityQueue(-1)
self._master = self._masters[id]
@ -135,7 +135,7 @@ class ExclusiveResourceProcessor(object):
resource is being processed. These updates have already bubbled to
the front of the ResourceProcessingQueue.
"""
self._master._queue.append(update)
self._master._queue.put(update)
def updates(self):
"""Processes the resource until updates stop coming
@ -144,13 +144,14 @@ class ExclusiveResourceProcessor(object):
may come in from other workers while it is in progress. This method
loops until they stop coming.
"""
if self._i_am_master():
while self._queue:
# Remove the update from the queue even if it is old.
update = self._queue.pop(0)
# Process the update only if it is fresh.
if self._get_resource_data_timestamp() < update.timestamp:
yield update
while self._i_am_master():
if self._queue.empty():
return
# Get the update from the queue even if it is old.
update = self._queue.get()
# Process the update only if it is fresh.
if self._get_resource_data_timestamp() < update.timestamp:
yield update
class ResourceProcessingQueue(object):

View File

@ -21,7 +21,6 @@ from neutron_lib.agent import constants as agent_consts
from neutron_lib import constants
from neutron_lib import context
from neutron_lib import exceptions
from neutron_lib.utils import runtime
from oslo_concurrency import lockutils
from oslo_config import cfg
from oslo_log import log as logging
@ -32,6 +31,7 @@ from oslo_utils import importutils
import six
from neutron._i18n import _
from neutron.agent.common import resource_processing_queue as queue
from neutron.agent.linux import dhcp
from neutron.agent.linux import external_process
from neutron.agent.metadata import driver as metadata_driver
@ -45,6 +45,8 @@ from neutron import manager
LOG = logging.getLogger(__name__)
_SYNC_STATE_LOCK = lockutils.ReaderWriterLock()
DEFAULT_PRIORITY = 255
def _sync_lock(f):
"""Decorator to block all operations for a global sync call."""
@ -64,12 +66,6 @@ def _wait_if_syncing(f):
return wrapped
def _net_lock(network_id):
"""Returns a context manager lock based on network_id."""
lock_name = 'dhcp-agent-network-lock-%s' % network_id
return lockutils.lock(lock_name, runtime.SYNCHRONIZED_PREFIX)
class DhcpAgent(manager.Manager):
"""DHCP agent service manager.
@ -100,6 +96,7 @@ class DhcpAgent(manager.Manager):
self._process_monitor = external_process.ProcessMonitor(
config=self.conf,
resource_type='dhcp')
self._queue = queue.ResourceProcessingQueue()
def init_host(self):
self.sync_state()
@ -128,6 +125,7 @@ class DhcpAgent(manager.Manager):
"""Activate the DHCP agent."""
self.periodic_resync()
self.start_ready_ports_loop()
eventlet.spawn_n(self._process_loop)
def call_driver(self, action, network, **action_kwargs):
"""Invoke an action on a DHCP driver instance."""
@ -354,36 +352,64 @@ class DhcpAgent(manager.Manager):
# Update the metadata proxy after the dhcp driver has been updated
self.update_isolated_metadata_proxy(network)
@_wait_if_syncing
def network_create_end(self, context, payload):
"""Handle the network.create.end notification event."""
network_id = payload['network']['id']
with _net_lock(network_id):
self.enable_dhcp_helper(network_id)
update = queue.ResourceUpdate(payload['network']['id'],
payload.get('priority',
DEFAULT_PRIORITY),
action='_network_create',
resource=payload)
self._queue.add(update)
@_wait_if_syncing
def _network_create(self, payload):
network_id = payload['network']['id']
self.enable_dhcp_helper(network_id)
def network_update_end(self, context, payload):
"""Handle the network.update.end notification event."""
network_id = payload['network']['id']
with _net_lock(network_id):
if payload['network']['admin_state_up']:
self.enable_dhcp_helper(network_id)
else:
self.disable_dhcp_helper(network_id)
update = queue.ResourceUpdate(payload['network']['id'],
payload.get('priority',
DEFAULT_PRIORITY),
action='_network_update',
resource=payload)
self._queue.add(update)
@_wait_if_syncing
def network_delete_end(self, context, payload):
"""Handle the network.delete.end notification event."""
network_id = payload['network_id']
with _net_lock(network_id):
def _network_update(self, payload):
network_id = payload['network']['id']
if payload['network']['admin_state_up']:
self.enable_dhcp_helper(network_id)
else:
self.disable_dhcp_helper(network_id)
def network_delete_end(self, context, payload):
"""Handle the network.delete.end notification event."""
update = queue.ResourceUpdate(payload['network_id'],
payload.get('priority',
DEFAULT_PRIORITY),
action='_network_delete',
resource=payload)
self._queue.add(update)
@_wait_if_syncing
def _network_delete(self, payload):
network_id = payload['network_id']
self.disable_dhcp_helper(network_id)
def subnet_update_end(self, context, payload):
"""Handle the subnet.update.end notification event."""
update = queue.ResourceUpdate(payload['subnet']['network_id'],
payload.get('priority',
DEFAULT_PRIORITY),
action='_subnet_update',
resource=payload)
self._queue.add(update)
@_wait_if_syncing
def _subnet_update(self, payload):
network_id = payload['subnet']['network_id']
with _net_lock(network_id):
self.refresh_dhcp_helper(network_id)
self.refresh_dhcp_helper(network_id)
# Use the update handler for the subnet create event.
subnet_create_end = subnet_update_end
@ -406,31 +432,63 @@ class DhcpAgent(manager.Manager):
port = self.cache.get_port_by_id(port_id)
return port.network_id if port else None
@_wait_if_syncing
def subnet_delete_end(self, context, payload):
"""Handle the subnet.delete.end notification event."""
network_id = self._get_network_lock_id(payload)
if not network_id:
return
with _net_lock(network_id):
subnet_id = payload['subnet_id']
network = self.cache.get_network_by_subnet_id(subnet_id)
if not network:
return
self.refresh_dhcp_helper(network.id)
update = queue.ResourceUpdate(network_id,
payload.get('priority',
DEFAULT_PRIORITY),
action='_subnet_delete',
resource=payload)
self._queue.add(update)
@_wait_if_syncing
def _subnet_delete(self, payload):
network_id = self._get_network_lock_id(payload)
if not network_id:
return
subnet_id = payload['subnet_id']
network = self.cache.get_network_by_subnet_id(subnet_id)
if not network:
return
self.refresh_dhcp_helper(network.id)
def _process_loop(self):
LOG.debug("Starting _process_loop")
pool = eventlet.GreenPool(size=8)
while True:
pool.spawn_n(self._process_resource_update)
def _process_resource_update(self):
for tmp, update in self._queue.each_update_to_next_resource():
method = getattr(self, update.action)
method(update.resource)
def port_update_end(self, context, payload):
"""Handle the port.update.end notification event."""
updated_port = dhcp.DictModel(payload['port'])
with _net_lock(updated_port.network_id):
if self.cache.is_port_message_stale(payload['port']):
LOG.debug("Discarding stale port update: %s", updated_port)
return
network = self.cache.get_network_by_id(updated_port.network_id)
if not network:
return
self.reload_allocations(updated_port, network)
if self.cache.is_port_message_stale(updated_port):
LOG.debug("Discarding stale port update: %s", updated_port)
return
update = queue.ResourceUpdate(updated_port.network_id,
payload.get('priority',
DEFAULT_PRIORITY),
action='_port_update',
resource=updated_port)
self._queue.add(update)
@_wait_if_syncing
def _port_update(self, updated_port):
if self.cache.is_port_message_stale(updated_port):
LOG.debug("Discarding stale port update: %s", updated_port)
return
network = self.cache.get_network_by_id(updated_port.network_id)
if not network:
return
self.reload_allocations(updated_port, network)
def reload_allocations(self, port, network):
LOG.info("Trigger reload_allocations for port %s", port)
@ -466,50 +524,67 @@ class DhcpAgent(manager.Manager):
port['network_id'], self.conf.host)
return port['device_id'] == thishost
@_wait_if_syncing
def port_create_end(self, context, payload):
"""Handle the port.create.end notification event."""
created_port = dhcp.DictModel(payload['port'])
with _net_lock(created_port.network_id):
network = self.cache.get_network_by_id(created_port.network_id)
if not network:
return
new_ips = {i['ip_address'] for i in created_port['fixed_ips']}
for port_cached in network.ports:
# if there are other ports cached with the same ip address in
# the same network this indicate that the cache is out of sync
cached_ips = {i['ip_address']
for i in port_cached['fixed_ips']}
if new_ips.intersection(cached_ips):
self.schedule_resync("Duplicate IP addresses found, "
"DHCP cache is out of sync",
created_port.network_id)
return
self.reload_allocations(created_port, network)
update = queue.ResourceUpdate(created_port.network_id,
payload.get('priority',
DEFAULT_PRIORITY),
action='_port_create',
resource=created_port)
self._queue.add(update)
@_wait_if_syncing
def _port_create(self, created_port):
network = self.cache.get_network_by_id(created_port.network_id)
if not network:
return
new_ips = {i['ip_address'] for i in created_port['fixed_ips']}
for port_cached in network.ports:
# if there are other ports cached with the same ip address in
# the same network this indicate that the cache is out of sync
cached_ips = {i['ip_address']
for i in port_cached['fixed_ips']}
if new_ips.intersection(cached_ips):
self.schedule_resync("Duplicate IP addresses found, "
"DHCP cache is out of sync",
created_port.network_id)
return
self.reload_allocations(created_port, network)
def port_delete_end(self, context, payload):
"""Handle the port.delete.end notification event."""
network_id = self._get_network_lock_id(payload)
if not network_id:
return
with _net_lock(network_id):
port_id = payload['port_id']
port = self.cache.get_port_by_id(port_id)
self.cache.deleted_ports.add(port_id)
if not port:
return
network = self.cache.get_network_by_id(port.network_id)
self.cache.remove_port(port)
if self._is_port_on_this_agent(port):
# the agent's port has been deleted. disable the service
# and add the network to the resync list to create
# (or acquire a reserved) port.
self.call_driver('disable', network)
self.schedule_resync("Agent port was deleted", port.network_id)
else:
self.call_driver('reload_allocations', network)
self.update_isolated_metadata_proxy(network)
update = queue.ResourceUpdate(network_id,
payload.get('priority',
DEFAULT_PRIORITY),
action='_port_delete',
resource=payload)
self._queue.add(update)
@_wait_if_syncing
def _port_delete(self, payload):
network_id = self._get_network_lock_id(payload)
if not network_id:
return
port_id = payload['port_id']
port = self.cache.get_port_by_id(port_id)
self.cache.deleted_ports.add(port_id)
if not port:
return
network = self.cache.get_network_by_id(port.network_id)
self.cache.remove_port(port)
if self._is_port_on_this_agent(port):
# the agent's port has been deleted. disable the service
# and add the network to the resync list to create
# (or acquire a reserved) port.
self.call_driver('disable', network)
self.schedule_resync("Agent port was deleted", port.network_id)
else:
self.call_driver('reload_allocations', network)
self.update_isolated_metadata_proxy(network)
def update_isolated_metadata_proxy(self, network):
"""Spawn or kill metadata proxy.

View File

@ -13,6 +13,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import copy
import random
from neutron_lib.callbacks import events
from neutron_lib.callbacks import registry
from neutron_lib.callbacks import resources
@ -26,6 +29,36 @@ from neutron.common import rpc as n_rpc
from neutron.common import topics
from neutron.common import utils
# Priorities - lower value is higher priority
PRIORITY_NETWORK_CREATE = 0
PRIORITY_NETWORK_UPDATE = 1
PRIORITY_NETWORK_DELETE = 2
PRIORITY_SUBNET_UPDATE = 3
PRIORITY_SUBNET_DELETE = 4
# In order to improve port dhcp provisioning when nova concurrently create
# multiple vms, I classify the port_create_end message to two levels, the
# high-level message only cast to one agent, the low-level message cast to all
# other agent. In this way, When there are a large number of ports that need to
# be processed, we can dispatch the high priority message of port to different
# agent, so that the processed port will not block other port's processing in
# other dhcp agents.
PRIORITY_PORT_CREATE_HIGH = 5
PRIORITY_PORT_CREATE_LOW = 6
PRIORITY_PORT_UPDATE = 7
PRIORITY_PORT_DELETE = 8
METHOD_PRIORITY_MAP = {
'network_create_end': PRIORITY_NETWORK_CREATE,
'network_update_end': PRIORITY_NETWORK_UPDATE,
'network_delete_end': PRIORITY_NETWORK_DELETE,
'subnet_create_end': PRIORITY_SUBNET_UPDATE,
'subnet_update_end': PRIORITY_SUBNET_UPDATE,
'subnet_delete_end': PRIORITY_SUBNET_DELETE,
'port_create_end': PRIORITY_PORT_CREATE_LOW,
'port_update_end': PRIORITY_PORT_UPDATE,
'port_delete_end': PRIORITY_PORT_DELETE
}
LOG = logging.getLogger(__name__)
@ -101,7 +134,8 @@ class DhcpAgentNotifyAPI(object):
for agent in new_agents:
self._cast_message(
context, 'network_create_end',
{'network': {'id': network['id']}}, agent['host'])
{'network': {'id': network['id']},
'priority': PRIORITY_NETWORK_CREATE}, agent['host'])
elif not existing_agents:
LOG.warning('Unable to schedule network %s: no agents '
'available; will retry on subsequent port '
@ -147,6 +181,7 @@ class DhcpAgentNotifyAPI(object):
def _notify_agents(self, context, method, payload, network_id):
"""Notify all the agents that are hosting the network."""
payload['priority'] = METHOD_PRIORITY_MAP.get(method)
# fanout is required as we do not know who is "listening"
no_agents = not utils.is_extension_supported(
self.plugin, constants.DHCP_AGENT_SCHEDULER_EXT_ALIAS)
@ -184,10 +219,21 @@ class DhcpAgentNotifyAPI(object):
return
enabled_agents = self._get_enabled_agents(
context, network, agents, method, payload)
if method == 'port_create_end':
high_agent = enabled_agents.pop(
random.randint(0, len(enabled_agents) - 1))
self._notify_high_priority_agent(
context, copy.deepcopy(payload), high_agent)
for agent in enabled_agents:
self._cast_message(
context, method, payload, agent.host, agent.topic)
def _notify_high_priority_agent(self, context, payload, agent):
payload['priority'] = PRIORITY_PORT_CREATE_HIGH
self._cast_message(context, "port_create_end",
payload, agent.host, agent.topic)
def _cast_message(self, context, method, payload, host,
topic=topics.DHCP_AGENT):
"""Cast the payload to the dhcp agent running on the host."""
@ -201,11 +247,13 @@ class DhcpAgentNotifyAPI(object):
def network_removed_from_agent(self, context, network_id, host):
self._cast_message(context, 'network_delete_end',
{'network_id': network_id}, host)
{'network_id': network_id,
'priority': PRIORITY_NETWORK_DELETE}, host)
def network_added_to_agent(self, context, network_id, host):
self._cast_message(context, 'network_create_end',
{'network': {'id': network_id}}, host)
{'network': {'id': network_id},
'priority': PRIORITY_NETWORK_CREATE}, host)
def agent_updated(self, context, admin_state_up, host):
self._cast_message(context, 'agent_updated',

View File

@ -48,6 +48,7 @@ DHCP_PLUGIN = '%s.%s' % (rpc_api.__module__, rpc_api.__name__)
FAKE_NETWORK_UUID = '12345678-1234-5678-1234567890ab'
FAKE_NETWORK_DHCP_NS = "qdhcp-%s" % FAKE_NETWORK_UUID
FAKE_TENANT_ID = 'aaaaaaaa-aaaa-aaaa-aaaaaaaaaaaa'
FAKE_PRIORITY = 6
fake_subnet1_allocation_pools = dhcp.DictModel(dict(id='', start='172.9.9.2',
@ -284,11 +285,15 @@ class TestDhcpAgent(base.BaseTestCase):
dhcp = dhcp_agent.DhcpAgent(HOSTNAME)
attrs_to_mock = dict(
[(a, mock.DEFAULT) for a in
['periodic_resync', 'start_ready_ports_loop']])
['periodic_resync', 'start_ready_ports_loop',
'_process_loop']])
with mock.patch.multiple(dhcp, **attrs_to_mock) as mocks:
dhcp.run()
mocks['periodic_resync'].assert_called_once_with()
mocks['start_ready_ports_loop'].assert_called_once_with()
with mock.patch.object(dhcp_agent.eventlet,
'spawn_n') as spawn_n:
dhcp.run()
mocks['periodic_resync'].assert_called_once_with()
mocks['start_ready_ports_loop'].assert_called_once_with()
spawn_n.assert_called_once_with(mocks['_process_loop'])
def test_call_driver(self):
network = mock.Mock()
@ -891,29 +896,36 @@ class TestDhcpAgentEventHandler(base.BaseTestCase):
self._test_disable_isolated_metadata_proxy(fake_dist_network)
def test_network_create_end(self):
payload = dict(network=dict(id=fake_network.id))
payload = dict(network=dict(id=fake_network.id),
priority=FAKE_PRIORITY)
with mock.patch.object(self.dhcp, 'enable_dhcp_helper') as enable:
self.dhcp.network_create_end(None, payload)
self.dhcp._process_resource_update()
enable.assert_called_once_with(fake_network.id)
def test_network_update_end_admin_state_up(self):
payload = dict(network=dict(id=fake_network.id, admin_state_up=True))
payload = dict(network=dict(id=fake_network.id, admin_state_up=True),
priority=FAKE_PRIORITY)
with mock.patch.object(self.dhcp, 'enable_dhcp_helper') as enable:
self.dhcp.network_update_end(None, payload)
self.dhcp._process_resource_update()
enable.assert_called_once_with(fake_network.id)
def test_network_update_end_admin_state_down(self):
payload = dict(network=dict(id=fake_network.id, admin_state_up=False))
payload = dict(network=dict(id=fake_network.id, admin_state_up=False),
priority=FAKE_PRIORITY)
with mock.patch.object(self.dhcp, 'disable_dhcp_helper') as disable:
self.dhcp.network_update_end(None, payload)
self.dhcp._process_resource_update()
disable.assert_called_once_with(fake_network.id)
def test_network_delete_end(self):
payload = dict(network_id=fake_network.id)
payload = dict(network_id=fake_network.id, priority=FAKE_PRIORITY)
with mock.patch.object(self.dhcp, 'disable_dhcp_helper') as disable:
self.dhcp.network_delete_end(None, payload)
self.dhcp._process_resource_update()
disable.assert_called_once_with(fake_network.id)
def test_refresh_dhcp_helper_no_dhcp_enabled_networks(self):
@ -955,13 +967,15 @@ class TestDhcpAgentEventHandler(base.BaseTestCase):
# attribute isn't True.
payload = dict(subnet=dhcp.DictModel(
dict(network_id=fake_network.id, enable_dhcp=False,
cidr='99.99.99.0/24', ip_version=4)))
cidr='99.99.99.0/24', ip_version=4)),
priority=FAKE_PRIORITY)
self.cache.get_network_by_id.return_value = fake_network
new_net = copy.deepcopy(fake_network)
new_net.subnets.append(payload['subnet'])
self.plugin.get_network_info.return_value = new_net
self.dhcp.subnet_create_end(None, payload)
self.dhcp._process_resource_update()
self.cache.assert_has_calls([mock.call.put(new_net)])
self.call_driver.assert_called_once_with('reload_allocations', new_net)
@ -970,20 +984,24 @@ class TestDhcpAgentEventHandler(base.BaseTestCase):
self.call_driver.reset_mock()
payload = dict(subnet=dhcp.DictModel(
dict(network_id=fake_network.id, enable_dhcp=True,
cidr='99.99.88.0/24', ip_version=const.IP_VERSION_4)))
cidr='99.99.88.0/24', ip_version=4)),
priority=FAKE_PRIORITY)
new_net = copy.deepcopy(fake_network)
new_net.subnets.append(payload['subnet'])
self.plugin.get_network_info.return_value = new_net
self.dhcp.subnet_create_end(None, payload)
self.dhcp._process_resource_update()
self.cache.assert_has_calls([mock.call.put(new_net)])
self.call_driver.assert_called_once_with('restart', new_net)
def test_subnet_update_end(self):
payload = dict(subnet=dict(network_id=fake_network.id))
payload = dict(subnet=dict(network_id=fake_network.id),
priority=FAKE_PRIORITY)
self.cache.get_network_by_id.return_value = fake_network
self.plugin.get_network_info.return_value = fake_network
self.dhcp.subnet_update_end(None, payload)
self.dhcp._process_resource_update()
self.cache.assert_has_calls([mock.call.put(fake_network)])
self.call_driver.assert_called_once_with('reload_allocations',
@ -993,7 +1011,8 @@ class TestDhcpAgentEventHandler(base.BaseTestCase):
self.dhcp.dhcp_ready_ports)
def test_subnet_update_dhcp(self):
payload = dict(subnet=dict(network_id=fake_network.id))
payload = dict(subnet=dict(network_id=fake_network.id),
priority=FAKE_PRIORITY)
self.cache.get_network_by_id.return_value = fake_network
new_net = copy.deepcopy(fake_network)
new_subnet1 = copy.deepcopy(fake_subnet1)
@ -1002,6 +1021,7 @@ class TestDhcpAgentEventHandler(base.BaseTestCase):
new_net.subnets = [new_subnet1, new_subnet2]
self.plugin.get_network_info.return_value = new_net
self.dhcp.subnet_update_end(None, payload)
self.dhcp._process_resource_update()
self.call_driver.assert_called_once_with('restart', new_net)
self.call_driver.reset_mock()
@ -1013,6 +1033,7 @@ class TestDhcpAgentEventHandler(base.BaseTestCase):
new_net2.subnets = [new_subnet1, new_subnet2]
self.plugin.get_network_info.return_value = new_net2
self.dhcp.subnet_update_end(None, payload)
self.dhcp._process_resource_update()
self.call_driver.assert_called_once_with('restart', new_net2)
def test_subnet_update_end_restart(self):
@ -1022,11 +1043,13 @@ class TestDhcpAgentEventHandler(base.BaseTestCase):
subnets=[fake_subnet1, fake_subnet3],
ports=[fake_port1]))
payload = dict(subnet=dict(network_id=fake_network.id))
payload = dict(subnet=dict(network_id=fake_network.id),
priority=FAKE_PRIORITY)
self.cache.get_network_by_id.return_value = fake_network
self.plugin.get_network_info.return_value = new_state
self.dhcp.subnet_update_end(None, payload)
self.dhcp._process_resource_update()
self.cache.assert_has_calls([mock.call.put(new_state)])
self.call_driver.assert_called_once_with('restart',
@ -1039,12 +1062,13 @@ class TestDhcpAgentEventHandler(base.BaseTestCase):
subnets=[fake_subnet1, fake_subnet3],
ports=[fake_port1]))
payload = dict(subnet_id=fake_subnet1.id)
payload = dict(subnet_id=fake_subnet1.id, priority=FAKE_PRIORITY)
self.cache.get_network_by_subnet_id.return_value = prev_state
self.cache.get_network_by_id.return_value = prev_state
self.plugin.get_network_info.return_value = fake_network
self.dhcp.subnet_delete_end(None, payload)
self.dhcp._process_resource_update()
self.cache.assert_has_calls([
mock.call.get_network_by_subnet_id(
@ -1063,12 +1087,14 @@ class TestDhcpAgentEventHandler(base.BaseTestCase):
subnets=[fake_subnet1, fake_subnet3],
ports=[fake_port1]))
payload = dict(subnet_id=fake_subnet1.id, network_id=fake_network.id)
payload = dict(subnet_id=fake_subnet1.id, network_id=fake_network.id,
priority=FAKE_PRIORITY)
self.cache.get_network_by_subnet_id.return_value = prev_state
self.cache.get_network_by_id.return_value = prev_state
self.plugin.get_network_info.return_value = fake_network
self.dhcp.subnet_delete_end(None, payload)
self.dhcp._process_resource_update()
self.cache.assert_has_calls([
mock.call.get_network_by_subnet_id(
@ -1085,6 +1111,7 @@ class TestDhcpAgentEventHandler(base.BaseTestCase):
payload = dict(port=copy.deepcopy(fake_port2))
self.cache.get_network_by_id.return_value = fake_network
self.dhcp.port_update_end(None, payload)
self.dhcp._process_resource_update()
self.reload_allocations.assert_called_once_with(fake_port2,
fake_network)
@ -1105,19 +1132,12 @@ class TestDhcpAgentEventHandler(base.BaseTestCase):
payload = dict(port=copy.deepcopy(fake_port2))
self.cache.get_network_by_id.return_value = fake_network
self.dhcp.port_create_end(None, payload)
self.dhcp._process_resource_update()
self.reload_allocations.assert_called_once_with(fake_port2,
fake_network)
def test_port_update_end_grabs_lock(self):
payload = dict(port=fake_port2)
self.cache.get_network_by_id.return_value = None
self.cache.get_port_by_id.return_value = fake_port2
with mock.patch('neutron.agent.dhcp.agent._net_lock') as nl:
self.dhcp.port_update_end(None, payload)
nl.assert_called_once_with(fake_port2.network_id)
def test_port_update_change_ip_on_port(self):
payload = dict(port=fake_port1)
payload = dict(port=fake_port1, priority=FAKE_PRIORITY)
self.cache.get_network_by_id.return_value = fake_network
updated_fake_port1 = copy.deepcopy(fake_port1)
updated_fake_port1.fixed_ips[0].ip_address = '172.9.9.99'
@ -1125,6 +1145,7 @@ class TestDhcpAgentEventHandler(base.BaseTestCase):
with mock.patch.object(
self.dhcp, 'update_isolated_metadata_proxy') as ump:
self.dhcp.port_update_end(None, payload)
self.dhcp._process_resource_update()
self.cache.assert_has_calls(
[mock.call.get_network_by_id(fake_port1.network_id),
mock.call.put_port(mock.ANY)])
@ -1135,35 +1156,38 @@ class TestDhcpAgentEventHandler(base.BaseTestCase):
def test_port_update_change_subnet_on_dhcp_agents_port(self):
self.cache.get_network_by_id.return_value = fake_network
self.cache.get_port_by_id.return_value = fake_port1
payload = dict(port=copy.deepcopy(fake_port1))
payload = dict(port=copy.deepcopy(fake_port1), priority=FAKE_PRIORITY)
device_id = utils.get_dhcp_agent_device_id(
payload['port']['network_id'], self.dhcp.conf.host)
payload['port']['fixed_ips'][0]['subnet_id'] = '77777-7777'
payload['port']['device_id'] = device_id
self.dhcp.port_update_end(None, payload)
self.dhcp._process_resource_update()
self.assertFalse(self.call_driver.called)
def test_port_update_change_ip_on_dhcp_agents_port(self):
self.cache.get_network_by_id.return_value = fake_network
self.cache.get_port_by_id.return_value = fake_port1
payload = dict(port=copy.deepcopy(fake_port1))
payload = dict(port=copy.deepcopy(fake_port1), priority=FAKE_PRIORITY)
device_id = utils.get_dhcp_agent_device_id(
payload['port']['network_id'], self.dhcp.conf.host)
payload['port']['fixed_ips'][0]['ip_address'] = '172.9.9.99'
payload['port']['device_id'] = device_id
self.dhcp.port_update_end(None, payload)
self.dhcp._process_resource_update()
self.call_driver.assert_has_calls(
[mock.call.call_driver('restart', fake_network)])
def test_port_update_change_ip_on_dhcp_agents_port_cache_miss(self):
self.cache.get_network_by_id.return_value = fake_network
self.cache.get_port_by_id.return_value = None
payload = dict(port=copy.deepcopy(fake_port1))
payload = dict(port=copy.deepcopy(fake_port1), priority=FAKE_PRIORITY)
device_id = utils.get_dhcp_agent_device_id(
payload['port']['network_id'], self.dhcp.conf.host)
payload['port']['fixed_ips'][0]['ip_address'] = '172.9.9.99'
payload['port']['device_id'] = device_id
self.dhcp.port_update_end(None, payload)
self.dhcp._process_resource_update()
self.schedule_resync.assert_called_once_with(mock.ANY,
fake_port1.network_id)
@ -1173,28 +1197,31 @@ class TestDhcpAgentEventHandler(base.BaseTestCase):
duplicate_ip = fake_port1['fixed_ips'][0]['ip_address']
payload['port']['fixed_ips'][0]['ip_address'] = duplicate_ip
self.dhcp.port_create_end(None, payload)
self.dhcp._process_resource_update()
self.schedule_resync.assert_called_once_with(mock.ANY,
fake_port2.network_id)
def test_port_update_on_dhcp_agents_port_no_ip_change(self):
self.cache.get_network_by_id.return_value = fake_network
self.cache.get_port_by_id.return_value = fake_port1
payload = dict(port=fake_port1)
payload = dict(port=fake_port1, priority=FAKE_PRIORITY)
device_id = utils.get_dhcp_agent_device_id(
payload['port']['network_id'], self.dhcp.conf.host)
payload['port']['device_id'] = device_id
self.dhcp.port_update_end(None, payload)
self.dhcp._process_resource_update()
self.call_driver.assert_has_calls(
[mock.call.call_driver('reload_allocations', fake_network)])
def test_port_delete_end_no_network_id(self):
payload = dict(port_id=fake_port2.id)
payload = dict(port_id=fake_port2.id, priority=FAKE_PRIORITY)
self.cache.get_network_by_id.return_value = fake_network
self.cache.get_port_by_id.return_value = fake_port2
with mock.patch.object(
self.dhcp, 'update_isolated_metadata_proxy') as ump:
self.dhcp.port_delete_end(None, payload)
self.dhcp._process_resource_update()
self.cache.assert_has_calls(
[mock.call.get_port_by_id(fake_port2.id),
mock.call.get_port_by_id(fake_port2.id),
@ -1206,13 +1233,15 @@ class TestDhcpAgentEventHandler(base.BaseTestCase):
self.assertTrue(ump.called)
def test_port_delete_end(self):
payload = dict(port_id=fake_port2.id, network_id=fake_network.id)
payload = dict(port_id=fake_port2.id, network_id=fake_network.id,
priority=FAKE_PRIORITY)
self.cache.get_network_by_id.return_value = fake_network
self.cache.get_port_by_id.return_value = fake_port2
with mock.patch.object(
self.dhcp, 'update_isolated_metadata_proxy') as ump:
self.dhcp.port_delete_end(None, payload)
self.dhcp._process_resource_update()
self.cache.assert_has_calls(
[mock.call.get_port_by_id(fake_port2.id),
mock.call.deleted_ports.add(fake_port2.id),
@ -1223,10 +1252,12 @@ class TestDhcpAgentEventHandler(base.BaseTestCase):
self.assertTrue(ump.called)
def test_port_delete_end_unknown_port(self):
payload = dict(port_id='unknown', network_id='unknown')
payload = dict(port_id='unknown', network_id='unknown',
priority=FAKE_PRIORITY)
self.cache.get_port_by_id.return_value = None
self.dhcp.port_delete_end(None, payload)
self.dhcp._process_resource_update()
self.cache.assert_has_calls([mock.call.get_port_by_id('unknown')])
self.assertEqual(self.call_driver.call_count, 0)
@ -1239,7 +1270,9 @@ class TestDhcpAgentEventHandler(base.BaseTestCase):
self.cache.get_network_by_id.return_value = fake_network
self.cache.get_port_by_id.return_value = port
self.dhcp.port_delete_end(None, {'port_id': port.id,
'network_id': fake_network.id})
'network_id': fake_network.id,
'priority': FAKE_PRIORITY})
self.dhcp._process_resource_update()
self.call_driver.assert_has_calls(
[mock.call.call_driver('disable', fake_network)])

View File

@ -13,12 +13,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import copy
import datetime
import mock
from neutron_lib.callbacks import events
from neutron_lib.callbacks import registry
from neutron_lib.callbacks import resources
from neutron_lib import constants
from neutron_lib.plugins import directory
from oslo_utils import timeutils
from oslo_utils import uuidutils
@ -138,10 +140,43 @@ class TestDhcpAgentNotifyAPI(base.BaseTestCase):
network = {'id': 'foo_network_id'}
self._test__get_enabled_agents(network, agents=[agent1, agent2])
def test__notify_agents_allocate_priority(self):
mock_context = mock.MagicMock()
mock_context.is_admin = True
methods = ['network_create_end', 'network_update_end',
'network_delete_end', 'subnet_create_end',
'subnet_update_end', 'subnet_delete_end',
'port_create_end', 'port_update_end', 'port_delete_end']
with mock.patch.object(self.notifier, '_schedule_network') as f:
with mock.patch.object(self.notifier, '_get_enabled_agents') as g:
for method in methods:
f.return_value = [mock.MagicMock()]
g.return_value = [mock.MagicMock()]
payload = {}
if method.startswith('port'):
payload['port'] = \
{'device_id':
constants.DEVICE_ID_RESERVED_DHCP_PORT}
expected_payload = copy.deepcopy(payload)
expected_payload['priority'] = \
dhcp_rpc_agent_api.METHOD_PRIORITY_MAP.get(method)
self.notifier._notify_agents(mock_context, method, payload,
'fake_network_id')
if method == 'network_delete_end':
self.mock_fanout.assert_called_with(mock.ANY, method,
expected_payload)
elif method != 'network_create_end':
if method == 'port_create_end':
expected_payload['priority'] = \
dhcp_rpc_agent_api.PRIORITY_PORT_CREATE_HIGH
self.mock_cast.assert_called_with(mock.ANY, method,
expected_payload,
mock.ANY, mock.ANY)
def test__notify_agents_fanout_required(self):
self.notifier._notify_agents(mock.ANY,
'network_delete_end',
mock.ANY, 'foo_network_id')
{}, 'foo_network_id')
self.assertEqual(1, self.mock_fanout.call_count)
def _test__notify_agents_with_function(

View File

@ -27,6 +27,7 @@ from oslo_utils import uuidutils
from webob import exc
from neutron.api import extensions
from neutron.api.rpc.agentnotifiers import dhcp_rpc_agent_api
from neutron.api.rpc.agentnotifiers import l3_rpc_agent_api
from neutron.api.rpc.handlers import dhcp_rpc
from neutron.api.rpc.handlers import l3_rpc
@ -46,10 +47,12 @@ from neutron.tests.unit.extensions import test_l3
from neutron.tests.unit import testlib_api
from neutron import wsgi
L3_HOSTA = 'hosta'
DHCP_HOSTA = 'hosta'
L3_HOSTB = 'hostb'
DHCP_HOSTC = 'hostc'
DHCP_HOSTD = 'hostd'
DEVICE_OWNER_COMPUTE = ''.join([constants.DEVICE_OWNER_COMPUTE_PREFIX,
'test:',
@ -1313,7 +1316,9 @@ class OvsDhcpAgentNotifierTestCase(test_agent.AgentDBTestMixIn,
network_id)
self.dhcp_notifier_cast.assert_called_with(
mock.ANY, 'network_create_end',
{'network': {'id': network_id}}, DHCP_HOSTA)
{'network': {'id': network_id},
'priority': dhcp_rpc_agent_api.PRIORITY_NETWORK_CREATE},
DHCP_HOSTA)
notifications = fake_notifier.NOTIFICATIONS
expected_event_type = 'dhcp_agent.network.add'
self._assert_notify(notifications, expected_event_type)
@ -1331,7 +1336,9 @@ class OvsDhcpAgentNotifierTestCase(test_agent.AgentDBTestMixIn,
network_id)
self.dhcp_notifier_cast.assert_called_with(
mock.ANY, 'network_delete_end',
{'network_id': network_id}, DHCP_HOSTA)
{'network_id': network_id,
'priority': dhcp_rpc_agent_api.PRIORITY_NETWORK_DELETE},
DHCP_HOSTA)
notifications = fake_notifier.NOTIFICATIONS
expected_event_type = 'dhcp_agent.network.remove'
self._assert_notify(notifications, expected_event_type)
@ -1374,17 +1381,19 @@ class OvsDhcpAgentNotifierTestCase(test_agent.AgentDBTestMixIn,
ctx = context.get_admin_context()
net['network'] = self.plugin.get_network(ctx, net['network']['id'])
sub['subnet'] = self.plugin.get_subnet(ctx, sub['subnet']['id'])
sub['priority'] = dhcp_rpc_agent_api.PRIORITY_SUBNET_UPDATE
port['port'] = self.plugin.get_port(ctx, port['port']['id'])
return net, sub, port
def _notification_mocks(self, hosts, net, subnet, port):
def _notification_mocks(self, hosts, net, subnet, port, port_priority):
host_calls = {}
for host in hosts:
expected_calls = [
mock.call(
mock.ANY,
'network_create_end',
{'network': {'id': net['network']['id']}},
{'priority': dhcp_rpc_agent_api.PRIORITY_NETWORK_CREATE,
'network': {'id': net['network']['id']}},
host),
mock.call(
mock.ANY,
@ -1394,7 +1403,8 @@ class OvsDhcpAgentNotifierTestCase(test_agent.AgentDBTestMixIn,
mock.call(
mock.ANY,
'port_create_end',
{'port': port['port']},
{'port': port['port'],
'priority': port_priority},
host, 'dhcp_agent')]
host_calls[host] = expected_calls
return host_calls
@ -1402,19 +1412,46 @@ class OvsDhcpAgentNotifierTestCase(test_agent.AgentDBTestMixIn,
def test_network_port_create_notification(self):
hosts = [DHCP_HOSTA]
net, subnet, port = self._network_port_create(hosts)
expected_calls = self._notification_mocks(hosts, net, subnet, port)
expected_calls = self._notification_mocks(
hosts, net, subnet, port,
dhcp_rpc_agent_api.PRIORITY_PORT_CREATE_HIGH)
self.assertEqual(
expected_calls[DHCP_HOSTA], self.dhcp_notifier_cast.call_args_list)
def test_network_ha_port_create_notification(self):
cfg.CONF.set_override('dhcp_agents_per_network', 2)
hosts = [DHCP_HOSTA, DHCP_HOSTC]
cfg.CONF.set_override('dhcp_agents_per_network', 3)
hosts = [DHCP_HOSTA, DHCP_HOSTC, DHCP_HOSTD]
net, subnet, port = self._network_port_create(hosts)
expected_calls = self._notification_mocks(hosts, net, subnet, port)
for expected in expected_calls[DHCP_HOSTA]:
self.assertIn(expected, self.dhcp_notifier_cast.call_args_list)
for expected in expected_calls[DHCP_HOSTC]:
for host_call in self.dhcp_notifier_cast.call_args_list:
if ("'priority': " + str(
dhcp_rpc_agent_api.PRIORITY_PORT_CREATE_HIGH)
in str(host_call)):
if DHCP_HOSTA in str(host_call):
expected_high_calls = self._notification_mocks(
[DHCP_HOSTA], net, subnet, port,
dhcp_rpc_agent_api.PRIORITY_PORT_CREATE_HIGH)
high_host = DHCP_HOSTA
hosts.pop(0)
elif DHCP_HOSTC in str(host_call):
expected_high_calls = self._notification_mocks(
[DHCP_HOSTC], net, subnet, port,
dhcp_rpc_agent_api.PRIORITY_PORT_CREATE_HIGH)
high_host = DHCP_HOSTC
hosts.pop(1)
elif DHCP_HOSTD in str(host_call):
expected_high_calls = self._notification_mocks(
[DHCP_HOSTD], net, subnet, port,
dhcp_rpc_agent_api.PRIORITY_PORT_CREATE_HIGH)
high_host = DHCP_HOSTD
hosts.pop(2)
expected_low_calls = self._notification_mocks(
hosts, net, subnet, port,
dhcp_rpc_agent_api.PRIORITY_PORT_CREATE_LOW)
for expected in expected_high_calls[high_host]:
self.assertIn(expected, self.dhcp_notifier_cast.call_args_list)
for host, low_expecteds in expected_low_calls.items():
for expected in low_expecteds:
self.assertIn(expected, self.dhcp_notifier_cast.call_args_list)
def _is_schedule_network_called(self, device_id):
dhcp_notifier_schedule = mock.patch(