From dd5e89e717ef0993d1de4c97d1f0d3132f2845e5 Mon Sep 17 00:00:00 2001
From: yangjianfeng
Date: Fri, 21 Dec 2018 14:08:38 +0800
Subject: [PATCH] Improve port dhcp Provisioning

Currently, the DHCP provisioning of ports is the crucial bottleneck
when multiple VMs are booted concurrently. The root cause is that
ports belonging to the same network are processed one by one by the
DHCP agent, and a port whose provisioning is already complete still
blocks the processing of other ports in other DHCP agents.

This patch optimizes the strategy used to dispatch port messages to
the agents in order to improve the provisioning process.

On the server side, I classify messages into multiple priority levels.
In particular, I classify the port_update_end or port_create_end
message into two levels: the high-level message is cast to only one
agent, while the low-level message is cast to all the other agents.
On the agent side, I put these messages into the
`resource_processing_queue`; with this queue in place we can remove
`_net_lock` and process the messages in order of priority.

Additionally, I modified `resource_processing_queue` for this purpose:
`_queue` in `ExclusiveResourceProcessor` is changed from a list to a
PriorityQueue, so that all messages cached in
`ExclusiveResourceProcessor` can be sorted by priority.

Conflicts:
	neutron/api/rpc/agentnotifiers/dhcp_rpc_agent_api.py

Related-Bug: #1760047
Change-Id: I255caa0571c42fb012fe882259ef181070beccef
(cherry picked from commit 99f4495c940011293e3cabbb590770dc1e7b6900)
(cherry picked from commit 740295d94dad0063663da3e2db96f9920e813a90)
---
 .../agent/common/resource_processing_queue.py |  19 +-
 neutron/agent/dhcp/agent.py                   | 219 ++++++++++++------
 .../rpc/agentnotifiers/dhcp_rpc_agent_api.py  |  54 ++++-
 neutron/tests/unit/agent/dhcp/test_agent.py   |  97 +++++---
 .../agentnotifiers/test_dhcp_rpc_agent_api.py |  37 ++-
 .../tests/unit/db/test_agentschedulers_db.py  |  61 ++++-
 6 files changed, 358 insertions(+), 129 deletions(-)

diff --git a/neutron/agent/common/resource_processing_queue.py b/neutron/agent/common/resource_processing_queue.py
index e153f91eefc..56878e9f73b 100644
--- a/neutron/agent/common/resource_processing_queue.py
+++ b/neutron/agent/common/resource_processing_queue.py
@@ -105,7 +105,7 @@ class ExclusiveResourceProcessor(object):
 
         if id not in self._masters:
             self._masters[id] = self
-            self._queue = []
+            self._queue = Queue.PriorityQueue(-1)
 
         self._master = self._masters[id]
 
@@ -135,7 +135,7 @@ class ExclusiveResourceProcessor(object):
         resource is being processed. These updates have already bubbled to
         the front of the ResourceProcessingQueue.
         """
-        self._master._queue.append(update)
+        self._master._queue.put(update)
 
     def updates(self):
         """Processes the resource until updates stop coming
@@ -144,13 +144,14 @@ class ExclusiveResourceProcessor(object):
         may come in from other workers while it is in progress. This method
         loops until they stop coming.
         """
-        if self._i_am_master():
-            while self._queue:
-                # Remove the update from the queue even if it is old.
-                update = self._queue.pop(0)
-                # Process the update only if it is fresh.
-                if self._get_resource_data_timestamp() < update.timestamp:
-                    yield update
+        while self._i_am_master():
+            if self._queue.empty():
+                return
+            # Get the update from the queue even if it is old.
+            update = self._queue.get()
+            # Process the update only if it is fresh.
+ if self._get_resource_data_timestamp() < update.timestamp: + yield update class ResourceProcessingQueue(object): diff --git a/neutron/agent/dhcp/agent.py b/neutron/agent/dhcp/agent.py index 76d20272294..c6e42e30250 100644 --- a/neutron/agent/dhcp/agent.py +++ b/neutron/agent/dhcp/agent.py @@ -21,7 +21,6 @@ from neutron_lib.agent import constants as agent_consts from neutron_lib import constants from neutron_lib import context from neutron_lib import exceptions -from neutron_lib.utils import runtime from oslo_concurrency import lockutils from oslo_config import cfg from oslo_log import log as logging @@ -32,6 +31,7 @@ from oslo_utils import importutils import six from neutron._i18n import _ +from neutron.agent.common import resource_processing_queue as queue from neutron.agent.linux import dhcp from neutron.agent.linux import external_process from neutron.agent.metadata import driver as metadata_driver @@ -45,6 +45,8 @@ from neutron import manager LOG = logging.getLogger(__name__) _SYNC_STATE_LOCK = lockutils.ReaderWriterLock() +DEFAULT_PRIORITY = 255 + def _sync_lock(f): """Decorator to block all operations for a global sync call.""" @@ -64,12 +66,6 @@ def _wait_if_syncing(f): return wrapped -def _net_lock(network_id): - """Returns a context manager lock based on network_id.""" - lock_name = 'dhcp-agent-network-lock-%s' % network_id - return lockutils.lock(lock_name, runtime.SYNCHRONIZED_PREFIX) - - class DhcpAgent(manager.Manager): """DHCP agent service manager. @@ -100,6 +96,7 @@ class DhcpAgent(manager.Manager): self._process_monitor = external_process.ProcessMonitor( config=self.conf, resource_type='dhcp') + self._queue = queue.ResourceProcessingQueue() def init_host(self): self.sync_state() @@ -128,6 +125,7 @@ class DhcpAgent(manager.Manager): """Activate the DHCP agent.""" self.periodic_resync() self.start_ready_ports_loop() + eventlet.spawn_n(self._process_loop) def call_driver(self, action, network, **action_kwargs): """Invoke an action on a DHCP driver instance.""" @@ -354,36 +352,64 @@ class DhcpAgent(manager.Manager): # Update the metadata proxy after the dhcp driver has been updated self.update_isolated_metadata_proxy(network) - @_wait_if_syncing def network_create_end(self, context, payload): """Handle the network.create.end notification event.""" - network_id = payload['network']['id'] - with _net_lock(network_id): - self.enable_dhcp_helper(network_id) + update = queue.ResourceUpdate(payload['network']['id'], + payload.get('priority', + DEFAULT_PRIORITY), + action='_network_create', + resource=payload) + self._queue.add(update) @_wait_if_syncing + def _network_create(self, payload): + network_id = payload['network']['id'] + self.enable_dhcp_helper(network_id) + def network_update_end(self, context, payload): """Handle the network.update.end notification event.""" - network_id = payload['network']['id'] - with _net_lock(network_id): - if payload['network']['admin_state_up']: - self.enable_dhcp_helper(network_id) - else: - self.disable_dhcp_helper(network_id) + update = queue.ResourceUpdate(payload['network']['id'], + payload.get('priority', + DEFAULT_PRIORITY), + action='_network_update', + resource=payload) + self._queue.add(update) @_wait_if_syncing - def network_delete_end(self, context, payload): - """Handle the network.delete.end notification event.""" - network_id = payload['network_id'] - with _net_lock(network_id): + def _network_update(self, payload): + network_id = payload['network']['id'] + if payload['network']['admin_state_up']: + 
self.enable_dhcp_helper(network_id) + else: self.disable_dhcp_helper(network_id) + def network_delete_end(self, context, payload): + """Handle the network.delete.end notification event.""" + update = queue.ResourceUpdate(payload['network_id'], + payload.get('priority', + DEFAULT_PRIORITY), + action='_network_delete', + resource=payload) + self._queue.add(update) + @_wait_if_syncing + def _network_delete(self, payload): + network_id = payload['network_id'] + self.disable_dhcp_helper(network_id) + def subnet_update_end(self, context, payload): """Handle the subnet.update.end notification event.""" + update = queue.ResourceUpdate(payload['subnet']['network_id'], + payload.get('priority', + DEFAULT_PRIORITY), + action='_subnet_update', + resource=payload) + self._queue.add(update) + + @_wait_if_syncing + def _subnet_update(self, payload): network_id = payload['subnet']['network_id'] - with _net_lock(network_id): - self.refresh_dhcp_helper(network_id) + self.refresh_dhcp_helper(network_id) # Use the update handler for the subnet create event. subnet_create_end = subnet_update_end @@ -406,31 +432,63 @@ class DhcpAgent(manager.Manager): port = self.cache.get_port_by_id(port_id) return port.network_id if port else None - @_wait_if_syncing def subnet_delete_end(self, context, payload): """Handle the subnet.delete.end notification event.""" network_id = self._get_network_lock_id(payload) if not network_id: return - with _net_lock(network_id): - subnet_id = payload['subnet_id'] - network = self.cache.get_network_by_subnet_id(subnet_id) - if not network: - return - self.refresh_dhcp_helper(network.id) + update = queue.ResourceUpdate(network_id, + payload.get('priority', + DEFAULT_PRIORITY), + action='_subnet_delete', + resource=payload) + self._queue.add(update) @_wait_if_syncing + def _subnet_delete(self, payload): + network_id = self._get_network_lock_id(payload) + if not network_id: + return + subnet_id = payload['subnet_id'] + network = self.cache.get_network_by_subnet_id(subnet_id) + if not network: + return + self.refresh_dhcp_helper(network.id) + + def _process_loop(self): + LOG.debug("Starting _process_loop") + + pool = eventlet.GreenPool(size=8) + while True: + pool.spawn_n(self._process_resource_update) + + def _process_resource_update(self): + for tmp, update in self._queue.each_update_to_next_resource(): + method = getattr(self, update.action) + method(update.resource) + def port_update_end(self, context, payload): """Handle the port.update.end notification event.""" updated_port = dhcp.DictModel(payload['port']) - with _net_lock(updated_port.network_id): - if self.cache.is_port_message_stale(payload['port']): - LOG.debug("Discarding stale port update: %s", updated_port) - return - network = self.cache.get_network_by_id(updated_port.network_id) - if not network: - return - self.reload_allocations(updated_port, network) + if self.cache.is_port_message_stale(updated_port): + LOG.debug("Discarding stale port update: %s", updated_port) + return + update = queue.ResourceUpdate(updated_port.network_id, + payload.get('priority', + DEFAULT_PRIORITY), + action='_port_update', + resource=updated_port) + self._queue.add(update) + + @_wait_if_syncing + def _port_update(self, updated_port): + if self.cache.is_port_message_stale(updated_port): + LOG.debug("Discarding stale port update: %s", updated_port) + return + network = self.cache.get_network_by_id(updated_port.network_id) + if not network: + return + self.reload_allocations(updated_port, network) def reload_allocations(self, port, network): 
LOG.info("Trigger reload_allocations for port %s", port) @@ -466,50 +524,67 @@ class DhcpAgent(manager.Manager): port['network_id'], self.conf.host) return port['device_id'] == thishost - @_wait_if_syncing def port_create_end(self, context, payload): """Handle the port.create.end notification event.""" created_port = dhcp.DictModel(payload['port']) - with _net_lock(created_port.network_id): - network = self.cache.get_network_by_id(created_port.network_id) - if not network: - return - new_ips = {i['ip_address'] for i in created_port['fixed_ips']} - for port_cached in network.ports: - # if there are other ports cached with the same ip address in - # the same network this indicate that the cache is out of sync - cached_ips = {i['ip_address'] - for i in port_cached['fixed_ips']} - if new_ips.intersection(cached_ips): - self.schedule_resync("Duplicate IP addresses found, " - "DHCP cache is out of sync", - created_port.network_id) - return - self.reload_allocations(created_port, network) + update = queue.ResourceUpdate(created_port.network_id, + payload.get('priority', + DEFAULT_PRIORITY), + action='_port_create', + resource=created_port) + self._queue.add(update) @_wait_if_syncing + def _port_create(self, created_port): + network = self.cache.get_network_by_id(created_port.network_id) + if not network: + return + new_ips = {i['ip_address'] for i in created_port['fixed_ips']} + for port_cached in network.ports: + # if there are other ports cached with the same ip address in + # the same network this indicate that the cache is out of sync + cached_ips = {i['ip_address'] + for i in port_cached['fixed_ips']} + if new_ips.intersection(cached_ips): + self.schedule_resync("Duplicate IP addresses found, " + "DHCP cache is out of sync", + created_port.network_id) + return + self.reload_allocations(created_port, network) + def port_delete_end(self, context, payload): """Handle the port.delete.end notification event.""" network_id = self._get_network_lock_id(payload) if not network_id: return - with _net_lock(network_id): - port_id = payload['port_id'] - port = self.cache.get_port_by_id(port_id) - self.cache.deleted_ports.add(port_id) - if not port: - return - network = self.cache.get_network_by_id(port.network_id) - self.cache.remove_port(port) - if self._is_port_on_this_agent(port): - # the agent's port has been deleted. disable the service - # and add the network to the resync list to create - # (or acquire a reserved) port. - self.call_driver('disable', network) - self.schedule_resync("Agent port was deleted", port.network_id) - else: - self.call_driver('reload_allocations', network) - self.update_isolated_metadata_proxy(network) + update = queue.ResourceUpdate(network_id, + payload.get('priority', + DEFAULT_PRIORITY), + action='_port_delete', + resource=payload) + self._queue.add(update) + + @_wait_if_syncing + def _port_delete(self, payload): + network_id = self._get_network_lock_id(payload) + if not network_id: + return + port_id = payload['port_id'] + port = self.cache.get_port_by_id(port_id) + self.cache.deleted_ports.add(port_id) + if not port: + return + network = self.cache.get_network_by_id(port.network_id) + self.cache.remove_port(port) + if self._is_port_on_this_agent(port): + # the agent's port has been deleted. disable the service + # and add the network to the resync list to create + # (or acquire a reserved) port. 
+ self.call_driver('disable', network) + self.schedule_resync("Agent port was deleted", port.network_id) + else: + self.call_driver('reload_allocations', network) + self.update_isolated_metadata_proxy(network) def update_isolated_metadata_proxy(self, network): """Spawn or kill metadata proxy. diff --git a/neutron/api/rpc/agentnotifiers/dhcp_rpc_agent_api.py b/neutron/api/rpc/agentnotifiers/dhcp_rpc_agent_api.py index 7751583eebe..347786787a1 100644 --- a/neutron/api/rpc/agentnotifiers/dhcp_rpc_agent_api.py +++ b/neutron/api/rpc/agentnotifiers/dhcp_rpc_agent_api.py @@ -13,6 +13,9 @@ # See the License for the specific language governing permissions and # limitations under the License. +import copy +import random + from neutron_lib.callbacks import events from neutron_lib.callbacks import registry from neutron_lib.callbacks import resources @@ -26,6 +29,36 @@ from neutron.common import rpc as n_rpc from neutron.common import topics from neutron.common import utils +# Priorities - lower value is higher priority +PRIORITY_NETWORK_CREATE = 0 +PRIORITY_NETWORK_UPDATE = 1 +PRIORITY_NETWORK_DELETE = 2 +PRIORITY_SUBNET_UPDATE = 3 +PRIORITY_SUBNET_DELETE = 4 +# In order to improve port dhcp provisioning when nova concurrently create +# multiple vms, I classify the port_create_end message to two levels, the +# high-level message only cast to one agent, the low-level message cast to all +# other agent. In this way, When there are a large number of ports that need to +# be processed, we can dispatch the high priority message of port to different +# agent, so that the processed port will not block other port's processing in +# other dhcp agents. +PRIORITY_PORT_CREATE_HIGH = 5 +PRIORITY_PORT_CREATE_LOW = 6 +PRIORITY_PORT_UPDATE = 7 +PRIORITY_PORT_DELETE = 8 + +METHOD_PRIORITY_MAP = { + 'network_create_end': PRIORITY_NETWORK_CREATE, + 'network_update_end': PRIORITY_NETWORK_UPDATE, + 'network_delete_end': PRIORITY_NETWORK_DELETE, + 'subnet_create_end': PRIORITY_SUBNET_UPDATE, + 'subnet_update_end': PRIORITY_SUBNET_UPDATE, + 'subnet_delete_end': PRIORITY_SUBNET_DELETE, + 'port_create_end': PRIORITY_PORT_CREATE_LOW, + 'port_update_end': PRIORITY_PORT_UPDATE, + 'port_delete_end': PRIORITY_PORT_DELETE +} + LOG = logging.getLogger(__name__) @@ -101,7 +134,8 @@ class DhcpAgentNotifyAPI(object): for agent in new_agents: self._cast_message( context, 'network_create_end', - {'network': {'id': network['id']}}, agent['host']) + {'network': {'id': network['id']}, + 'priority': PRIORITY_NETWORK_CREATE}, agent['host']) elif not existing_agents: LOG.warning('Unable to schedule network %s: no agents ' 'available; will retry on subsequent port ' @@ -147,6 +181,7 @@ class DhcpAgentNotifyAPI(object): def _notify_agents(self, context, method, payload, network_id): """Notify all the agents that are hosting the network.""" + payload['priority'] = METHOD_PRIORITY_MAP.get(method) # fanout is required as we do not know who is "listening" no_agents = not utils.is_extension_supported( self.plugin, constants.DHCP_AGENT_SCHEDULER_EXT_ALIAS) @@ -184,10 +219,21 @@ class DhcpAgentNotifyAPI(object): return enabled_agents = self._get_enabled_agents( context, network, agents, method, payload) + + if method == 'port_create_end': + high_agent = enabled_agents.pop( + random.randint(0, len(enabled_agents) - 1)) + self._notify_high_priority_agent( + context, copy.deepcopy(payload), high_agent) for agent in enabled_agents: self._cast_message( context, method, payload, agent.host, agent.topic) + def _notify_high_priority_agent(self, context, 
payload, agent): + payload['priority'] = PRIORITY_PORT_CREATE_HIGH + self._cast_message(context, "port_create_end", + payload, agent.host, agent.topic) + def _cast_message(self, context, method, payload, host, topic=topics.DHCP_AGENT): """Cast the payload to the dhcp agent running on the host.""" @@ -201,11 +247,13 @@ class DhcpAgentNotifyAPI(object): def network_removed_from_agent(self, context, network_id, host): self._cast_message(context, 'network_delete_end', - {'network_id': network_id}, host) + {'network_id': network_id, + 'priority': PRIORITY_NETWORK_DELETE}, host) def network_added_to_agent(self, context, network_id, host): self._cast_message(context, 'network_create_end', - {'network': {'id': network_id}}, host) + {'network': {'id': network_id}, + 'priority': PRIORITY_NETWORK_CREATE}, host) def agent_updated(self, context, admin_state_up, host): self._cast_message(context, 'agent_updated', diff --git a/neutron/tests/unit/agent/dhcp/test_agent.py b/neutron/tests/unit/agent/dhcp/test_agent.py index 67b63ad5577..db1cbce263c 100644 --- a/neutron/tests/unit/agent/dhcp/test_agent.py +++ b/neutron/tests/unit/agent/dhcp/test_agent.py @@ -48,6 +48,7 @@ DHCP_PLUGIN = '%s.%s' % (rpc_api.__module__, rpc_api.__name__) FAKE_NETWORK_UUID = '12345678-1234-5678-1234567890ab' FAKE_NETWORK_DHCP_NS = "qdhcp-%s" % FAKE_NETWORK_UUID FAKE_TENANT_ID = 'aaaaaaaa-aaaa-aaaa-aaaaaaaaaaaa' +FAKE_PRIORITY = 6 fake_subnet1_allocation_pools = dhcp.DictModel(dict(id='', start='172.9.9.2', @@ -284,11 +285,15 @@ class TestDhcpAgent(base.BaseTestCase): dhcp = dhcp_agent.DhcpAgent(HOSTNAME) attrs_to_mock = dict( [(a, mock.DEFAULT) for a in - ['periodic_resync', 'start_ready_ports_loop']]) + ['periodic_resync', 'start_ready_ports_loop', + '_process_loop']]) with mock.patch.multiple(dhcp, **attrs_to_mock) as mocks: - dhcp.run() - mocks['periodic_resync'].assert_called_once_with() - mocks['start_ready_ports_loop'].assert_called_once_with() + with mock.patch.object(dhcp_agent.eventlet, + 'spawn_n') as spawn_n: + dhcp.run() + mocks['periodic_resync'].assert_called_once_with() + mocks['start_ready_ports_loop'].assert_called_once_with() + spawn_n.assert_called_once_with(mocks['_process_loop']) def test_call_driver(self): network = mock.Mock() @@ -891,29 +896,36 @@ class TestDhcpAgentEventHandler(base.BaseTestCase): self._test_disable_isolated_metadata_proxy(fake_dist_network) def test_network_create_end(self): - payload = dict(network=dict(id=fake_network.id)) + payload = dict(network=dict(id=fake_network.id), + priority=FAKE_PRIORITY) with mock.patch.object(self.dhcp, 'enable_dhcp_helper') as enable: self.dhcp.network_create_end(None, payload) + self.dhcp._process_resource_update() enable.assert_called_once_with(fake_network.id) def test_network_update_end_admin_state_up(self): - payload = dict(network=dict(id=fake_network.id, admin_state_up=True)) + payload = dict(network=dict(id=fake_network.id, admin_state_up=True), + priority=FAKE_PRIORITY) with mock.patch.object(self.dhcp, 'enable_dhcp_helper') as enable: self.dhcp.network_update_end(None, payload) + self.dhcp._process_resource_update() enable.assert_called_once_with(fake_network.id) def test_network_update_end_admin_state_down(self): - payload = dict(network=dict(id=fake_network.id, admin_state_up=False)) + payload = dict(network=dict(id=fake_network.id, admin_state_up=False), + priority=FAKE_PRIORITY) with mock.patch.object(self.dhcp, 'disable_dhcp_helper') as disable: self.dhcp.network_update_end(None, payload) + self.dhcp._process_resource_update() 
disable.assert_called_once_with(fake_network.id) def test_network_delete_end(self): - payload = dict(network_id=fake_network.id) + payload = dict(network_id=fake_network.id, priority=FAKE_PRIORITY) with mock.patch.object(self.dhcp, 'disable_dhcp_helper') as disable: self.dhcp.network_delete_end(None, payload) + self.dhcp._process_resource_update() disable.assert_called_once_with(fake_network.id) def test_refresh_dhcp_helper_no_dhcp_enabled_networks(self): @@ -955,13 +967,15 @@ class TestDhcpAgentEventHandler(base.BaseTestCase): # attribute isn't True. payload = dict(subnet=dhcp.DictModel( dict(network_id=fake_network.id, enable_dhcp=False, - cidr='99.99.99.0/24', ip_version=4))) + cidr='99.99.99.0/24', ip_version=4)), + priority=FAKE_PRIORITY) self.cache.get_network_by_id.return_value = fake_network new_net = copy.deepcopy(fake_network) new_net.subnets.append(payload['subnet']) self.plugin.get_network_info.return_value = new_net self.dhcp.subnet_create_end(None, payload) + self.dhcp._process_resource_update() self.cache.assert_has_calls([mock.call.put(new_net)]) self.call_driver.assert_called_once_with('reload_allocations', new_net) @@ -970,20 +984,24 @@ class TestDhcpAgentEventHandler(base.BaseTestCase): self.call_driver.reset_mock() payload = dict(subnet=dhcp.DictModel( dict(network_id=fake_network.id, enable_dhcp=True, - cidr='99.99.88.0/24', ip_version=const.IP_VERSION_4))) + cidr='99.99.88.0/24', ip_version=4)), + priority=FAKE_PRIORITY) new_net = copy.deepcopy(fake_network) new_net.subnets.append(payload['subnet']) self.plugin.get_network_info.return_value = new_net self.dhcp.subnet_create_end(None, payload) + self.dhcp._process_resource_update() self.cache.assert_has_calls([mock.call.put(new_net)]) self.call_driver.assert_called_once_with('restart', new_net) def test_subnet_update_end(self): - payload = dict(subnet=dict(network_id=fake_network.id)) + payload = dict(subnet=dict(network_id=fake_network.id), + priority=FAKE_PRIORITY) self.cache.get_network_by_id.return_value = fake_network self.plugin.get_network_info.return_value = fake_network self.dhcp.subnet_update_end(None, payload) + self.dhcp._process_resource_update() self.cache.assert_has_calls([mock.call.put(fake_network)]) self.call_driver.assert_called_once_with('reload_allocations', @@ -993,7 +1011,8 @@ class TestDhcpAgentEventHandler(base.BaseTestCase): self.dhcp.dhcp_ready_ports) def test_subnet_update_dhcp(self): - payload = dict(subnet=dict(network_id=fake_network.id)) + payload = dict(subnet=dict(network_id=fake_network.id), + priority=FAKE_PRIORITY) self.cache.get_network_by_id.return_value = fake_network new_net = copy.deepcopy(fake_network) new_subnet1 = copy.deepcopy(fake_subnet1) @@ -1002,6 +1021,7 @@ class TestDhcpAgentEventHandler(base.BaseTestCase): new_net.subnets = [new_subnet1, new_subnet2] self.plugin.get_network_info.return_value = new_net self.dhcp.subnet_update_end(None, payload) + self.dhcp._process_resource_update() self.call_driver.assert_called_once_with('restart', new_net) self.call_driver.reset_mock() @@ -1013,6 +1033,7 @@ class TestDhcpAgentEventHandler(base.BaseTestCase): new_net2.subnets = [new_subnet1, new_subnet2] self.plugin.get_network_info.return_value = new_net2 self.dhcp.subnet_update_end(None, payload) + self.dhcp._process_resource_update() self.call_driver.assert_called_once_with('restart', new_net2) def test_subnet_update_end_restart(self): @@ -1022,11 +1043,13 @@ class TestDhcpAgentEventHandler(base.BaseTestCase): subnets=[fake_subnet1, fake_subnet3], ports=[fake_port1])) - payload = 
dict(subnet=dict(network_id=fake_network.id)) + payload = dict(subnet=dict(network_id=fake_network.id), + priority=FAKE_PRIORITY) self.cache.get_network_by_id.return_value = fake_network self.plugin.get_network_info.return_value = new_state self.dhcp.subnet_update_end(None, payload) + self.dhcp._process_resource_update() self.cache.assert_has_calls([mock.call.put(new_state)]) self.call_driver.assert_called_once_with('restart', @@ -1039,12 +1062,13 @@ class TestDhcpAgentEventHandler(base.BaseTestCase): subnets=[fake_subnet1, fake_subnet3], ports=[fake_port1])) - payload = dict(subnet_id=fake_subnet1.id) + payload = dict(subnet_id=fake_subnet1.id, priority=FAKE_PRIORITY) self.cache.get_network_by_subnet_id.return_value = prev_state self.cache.get_network_by_id.return_value = prev_state self.plugin.get_network_info.return_value = fake_network self.dhcp.subnet_delete_end(None, payload) + self.dhcp._process_resource_update() self.cache.assert_has_calls([ mock.call.get_network_by_subnet_id( @@ -1063,12 +1087,14 @@ class TestDhcpAgentEventHandler(base.BaseTestCase): subnets=[fake_subnet1, fake_subnet3], ports=[fake_port1])) - payload = dict(subnet_id=fake_subnet1.id, network_id=fake_network.id) + payload = dict(subnet_id=fake_subnet1.id, network_id=fake_network.id, + priority=FAKE_PRIORITY) self.cache.get_network_by_subnet_id.return_value = prev_state self.cache.get_network_by_id.return_value = prev_state self.plugin.get_network_info.return_value = fake_network self.dhcp.subnet_delete_end(None, payload) + self.dhcp._process_resource_update() self.cache.assert_has_calls([ mock.call.get_network_by_subnet_id( @@ -1085,6 +1111,7 @@ class TestDhcpAgentEventHandler(base.BaseTestCase): payload = dict(port=copy.deepcopy(fake_port2)) self.cache.get_network_by_id.return_value = fake_network self.dhcp.port_update_end(None, payload) + self.dhcp._process_resource_update() self.reload_allocations.assert_called_once_with(fake_port2, fake_network) @@ -1105,19 +1132,12 @@ class TestDhcpAgentEventHandler(base.BaseTestCase): payload = dict(port=copy.deepcopy(fake_port2)) self.cache.get_network_by_id.return_value = fake_network self.dhcp.port_create_end(None, payload) + self.dhcp._process_resource_update() self.reload_allocations.assert_called_once_with(fake_port2, fake_network) - def test_port_update_end_grabs_lock(self): - payload = dict(port=fake_port2) - self.cache.get_network_by_id.return_value = None - self.cache.get_port_by_id.return_value = fake_port2 - with mock.patch('neutron.agent.dhcp.agent._net_lock') as nl: - self.dhcp.port_update_end(None, payload) - nl.assert_called_once_with(fake_port2.network_id) - def test_port_update_change_ip_on_port(self): - payload = dict(port=fake_port1) + payload = dict(port=fake_port1, priority=FAKE_PRIORITY) self.cache.get_network_by_id.return_value = fake_network updated_fake_port1 = copy.deepcopy(fake_port1) updated_fake_port1.fixed_ips[0].ip_address = '172.9.9.99' @@ -1125,6 +1145,7 @@ class TestDhcpAgentEventHandler(base.BaseTestCase): with mock.patch.object( self.dhcp, 'update_isolated_metadata_proxy') as ump: self.dhcp.port_update_end(None, payload) + self.dhcp._process_resource_update() self.cache.assert_has_calls( [mock.call.get_network_by_id(fake_port1.network_id), mock.call.put_port(mock.ANY)]) @@ -1135,35 +1156,38 @@ class TestDhcpAgentEventHandler(base.BaseTestCase): def test_port_update_change_subnet_on_dhcp_agents_port(self): self.cache.get_network_by_id.return_value = fake_network self.cache.get_port_by_id.return_value = fake_port1 - payload = 
dict(port=copy.deepcopy(fake_port1)) + payload = dict(port=copy.deepcopy(fake_port1), priority=FAKE_PRIORITY) device_id = utils.get_dhcp_agent_device_id( payload['port']['network_id'], self.dhcp.conf.host) payload['port']['fixed_ips'][0]['subnet_id'] = '77777-7777' payload['port']['device_id'] = device_id self.dhcp.port_update_end(None, payload) + self.dhcp._process_resource_update() self.assertFalse(self.call_driver.called) def test_port_update_change_ip_on_dhcp_agents_port(self): self.cache.get_network_by_id.return_value = fake_network self.cache.get_port_by_id.return_value = fake_port1 - payload = dict(port=copy.deepcopy(fake_port1)) + payload = dict(port=copy.deepcopy(fake_port1), priority=FAKE_PRIORITY) device_id = utils.get_dhcp_agent_device_id( payload['port']['network_id'], self.dhcp.conf.host) payload['port']['fixed_ips'][0]['ip_address'] = '172.9.9.99' payload['port']['device_id'] = device_id self.dhcp.port_update_end(None, payload) + self.dhcp._process_resource_update() self.call_driver.assert_has_calls( [mock.call.call_driver('restart', fake_network)]) def test_port_update_change_ip_on_dhcp_agents_port_cache_miss(self): self.cache.get_network_by_id.return_value = fake_network self.cache.get_port_by_id.return_value = None - payload = dict(port=copy.deepcopy(fake_port1)) + payload = dict(port=copy.deepcopy(fake_port1), priority=FAKE_PRIORITY) device_id = utils.get_dhcp_agent_device_id( payload['port']['network_id'], self.dhcp.conf.host) payload['port']['fixed_ips'][0]['ip_address'] = '172.9.9.99' payload['port']['device_id'] = device_id self.dhcp.port_update_end(None, payload) + self.dhcp._process_resource_update() self.schedule_resync.assert_called_once_with(mock.ANY, fake_port1.network_id) @@ -1173,28 +1197,31 @@ class TestDhcpAgentEventHandler(base.BaseTestCase): duplicate_ip = fake_port1['fixed_ips'][0]['ip_address'] payload['port']['fixed_ips'][0]['ip_address'] = duplicate_ip self.dhcp.port_create_end(None, payload) + self.dhcp._process_resource_update() self.schedule_resync.assert_called_once_with(mock.ANY, fake_port2.network_id) def test_port_update_on_dhcp_agents_port_no_ip_change(self): self.cache.get_network_by_id.return_value = fake_network self.cache.get_port_by_id.return_value = fake_port1 - payload = dict(port=fake_port1) + payload = dict(port=fake_port1, priority=FAKE_PRIORITY) device_id = utils.get_dhcp_agent_device_id( payload['port']['network_id'], self.dhcp.conf.host) payload['port']['device_id'] = device_id self.dhcp.port_update_end(None, payload) + self.dhcp._process_resource_update() self.call_driver.assert_has_calls( [mock.call.call_driver('reload_allocations', fake_network)]) def test_port_delete_end_no_network_id(self): - payload = dict(port_id=fake_port2.id) + payload = dict(port_id=fake_port2.id, priority=FAKE_PRIORITY) self.cache.get_network_by_id.return_value = fake_network self.cache.get_port_by_id.return_value = fake_port2 with mock.patch.object( self.dhcp, 'update_isolated_metadata_proxy') as ump: self.dhcp.port_delete_end(None, payload) + self.dhcp._process_resource_update() self.cache.assert_has_calls( [mock.call.get_port_by_id(fake_port2.id), mock.call.get_port_by_id(fake_port2.id), @@ -1206,13 +1233,15 @@ class TestDhcpAgentEventHandler(base.BaseTestCase): self.assertTrue(ump.called) def test_port_delete_end(self): - payload = dict(port_id=fake_port2.id, network_id=fake_network.id) + payload = dict(port_id=fake_port2.id, network_id=fake_network.id, + priority=FAKE_PRIORITY) self.cache.get_network_by_id.return_value = fake_network 
self.cache.get_port_by_id.return_value = fake_port2 with mock.patch.object( self.dhcp, 'update_isolated_metadata_proxy') as ump: self.dhcp.port_delete_end(None, payload) + self.dhcp._process_resource_update() self.cache.assert_has_calls( [mock.call.get_port_by_id(fake_port2.id), mock.call.deleted_ports.add(fake_port2.id), @@ -1223,10 +1252,12 @@ class TestDhcpAgentEventHandler(base.BaseTestCase): self.assertTrue(ump.called) def test_port_delete_end_unknown_port(self): - payload = dict(port_id='unknown', network_id='unknown') + payload = dict(port_id='unknown', network_id='unknown', + priority=FAKE_PRIORITY) self.cache.get_port_by_id.return_value = None self.dhcp.port_delete_end(None, payload) + self.dhcp._process_resource_update() self.cache.assert_has_calls([mock.call.get_port_by_id('unknown')]) self.assertEqual(self.call_driver.call_count, 0) @@ -1239,7 +1270,9 @@ class TestDhcpAgentEventHandler(base.BaseTestCase): self.cache.get_network_by_id.return_value = fake_network self.cache.get_port_by_id.return_value = port self.dhcp.port_delete_end(None, {'port_id': port.id, - 'network_id': fake_network.id}) + 'network_id': fake_network.id, + 'priority': FAKE_PRIORITY}) + self.dhcp._process_resource_update() self.call_driver.assert_has_calls( [mock.call.call_driver('disable', fake_network)]) diff --git a/neutron/tests/unit/api/rpc/agentnotifiers/test_dhcp_rpc_agent_api.py b/neutron/tests/unit/api/rpc/agentnotifiers/test_dhcp_rpc_agent_api.py index b12844c32bd..428f4568bd7 100644 --- a/neutron/tests/unit/api/rpc/agentnotifiers/test_dhcp_rpc_agent_api.py +++ b/neutron/tests/unit/api/rpc/agentnotifiers/test_dhcp_rpc_agent_api.py @@ -13,12 +13,14 @@ # See the License for the specific language governing permissions and # limitations under the License. +import copy import datetime import mock from neutron_lib.callbacks import events from neutron_lib.callbacks import registry from neutron_lib.callbacks import resources +from neutron_lib import constants from neutron_lib.plugins import directory from oslo_utils import timeutils from oslo_utils import uuidutils @@ -138,10 +140,43 @@ class TestDhcpAgentNotifyAPI(base.BaseTestCase): network = {'id': 'foo_network_id'} self._test__get_enabled_agents(network, agents=[agent1, agent2]) + def test__notify_agents_allocate_priority(self): + mock_context = mock.MagicMock() + mock_context.is_admin = True + methods = ['network_create_end', 'network_update_end', + 'network_delete_end', 'subnet_create_end', + 'subnet_update_end', 'subnet_delete_end', + 'port_create_end', 'port_update_end', 'port_delete_end'] + with mock.patch.object(self.notifier, '_schedule_network') as f: + with mock.patch.object(self.notifier, '_get_enabled_agents') as g: + for method in methods: + f.return_value = [mock.MagicMock()] + g.return_value = [mock.MagicMock()] + payload = {} + if method.startswith('port'): + payload['port'] = \ + {'device_id': + constants.DEVICE_ID_RESERVED_DHCP_PORT} + expected_payload = copy.deepcopy(payload) + expected_payload['priority'] = \ + dhcp_rpc_agent_api.METHOD_PRIORITY_MAP.get(method) + self.notifier._notify_agents(mock_context, method, payload, + 'fake_network_id') + if method == 'network_delete_end': + self.mock_fanout.assert_called_with(mock.ANY, method, + expected_payload) + elif method != 'network_create_end': + if method == 'port_create_end': + expected_payload['priority'] = \ + dhcp_rpc_agent_api.PRIORITY_PORT_CREATE_HIGH + self.mock_cast.assert_called_with(mock.ANY, method, + expected_payload, + mock.ANY, mock.ANY) + def 
test__notify_agents_fanout_required(self): self.notifier._notify_agents(mock.ANY, 'network_delete_end', - mock.ANY, 'foo_network_id') + {}, 'foo_network_id') self.assertEqual(1, self.mock_fanout.call_count) def _test__notify_agents_with_function( diff --git a/neutron/tests/unit/db/test_agentschedulers_db.py b/neutron/tests/unit/db/test_agentschedulers_db.py index 0fe458caea3..3369f3659b0 100644 --- a/neutron/tests/unit/db/test_agentschedulers_db.py +++ b/neutron/tests/unit/db/test_agentschedulers_db.py @@ -27,6 +27,7 @@ from oslo_utils import uuidutils from webob import exc from neutron.api import extensions +from neutron.api.rpc.agentnotifiers import dhcp_rpc_agent_api from neutron.api.rpc.agentnotifiers import l3_rpc_agent_api from neutron.api.rpc.handlers import dhcp_rpc from neutron.api.rpc.handlers import l3_rpc @@ -46,10 +47,12 @@ from neutron.tests.unit.extensions import test_l3 from neutron.tests.unit import testlib_api from neutron import wsgi + L3_HOSTA = 'hosta' DHCP_HOSTA = 'hosta' L3_HOSTB = 'hostb' DHCP_HOSTC = 'hostc' +DHCP_HOSTD = 'hostd' DEVICE_OWNER_COMPUTE = ''.join([constants.DEVICE_OWNER_COMPUTE_PREFIX, 'test:', @@ -1313,7 +1316,9 @@ class OvsDhcpAgentNotifierTestCase(test_agent.AgentDBTestMixIn, network_id) self.dhcp_notifier_cast.assert_called_with( mock.ANY, 'network_create_end', - {'network': {'id': network_id}}, DHCP_HOSTA) + {'network': {'id': network_id}, + 'priority': dhcp_rpc_agent_api.PRIORITY_NETWORK_CREATE}, + DHCP_HOSTA) notifications = fake_notifier.NOTIFICATIONS expected_event_type = 'dhcp_agent.network.add' self._assert_notify(notifications, expected_event_type) @@ -1331,7 +1336,9 @@ class OvsDhcpAgentNotifierTestCase(test_agent.AgentDBTestMixIn, network_id) self.dhcp_notifier_cast.assert_called_with( mock.ANY, 'network_delete_end', - {'network_id': network_id}, DHCP_HOSTA) + {'network_id': network_id, + 'priority': dhcp_rpc_agent_api.PRIORITY_NETWORK_DELETE}, + DHCP_HOSTA) notifications = fake_notifier.NOTIFICATIONS expected_event_type = 'dhcp_agent.network.remove' self._assert_notify(notifications, expected_event_type) @@ -1374,17 +1381,19 @@ class OvsDhcpAgentNotifierTestCase(test_agent.AgentDBTestMixIn, ctx = context.get_admin_context() net['network'] = self.plugin.get_network(ctx, net['network']['id']) sub['subnet'] = self.plugin.get_subnet(ctx, sub['subnet']['id']) + sub['priority'] = dhcp_rpc_agent_api.PRIORITY_SUBNET_UPDATE port['port'] = self.plugin.get_port(ctx, port['port']['id']) return net, sub, port - def _notification_mocks(self, hosts, net, subnet, port): + def _notification_mocks(self, hosts, net, subnet, port, port_priority): host_calls = {} for host in hosts: expected_calls = [ mock.call( mock.ANY, 'network_create_end', - {'network': {'id': net['network']['id']}}, + {'priority': dhcp_rpc_agent_api.PRIORITY_NETWORK_CREATE, + 'network': {'id': net['network']['id']}}, host), mock.call( mock.ANY, @@ -1394,7 +1403,8 @@ class OvsDhcpAgentNotifierTestCase(test_agent.AgentDBTestMixIn, mock.call( mock.ANY, 'port_create_end', - {'port': port['port']}, + {'port': port['port'], + 'priority': port_priority}, host, 'dhcp_agent')] host_calls[host] = expected_calls return host_calls @@ -1402,19 +1412,46 @@ class OvsDhcpAgentNotifierTestCase(test_agent.AgentDBTestMixIn, def test_network_port_create_notification(self): hosts = [DHCP_HOSTA] net, subnet, port = self._network_port_create(hosts) - expected_calls = self._notification_mocks(hosts, net, subnet, port) + expected_calls = self._notification_mocks( + hosts, net, subnet, port, + 
dhcp_rpc_agent_api.PRIORITY_PORT_CREATE_HIGH) self.assertEqual( expected_calls[DHCP_HOSTA], self.dhcp_notifier_cast.call_args_list) def test_network_ha_port_create_notification(self): - cfg.CONF.set_override('dhcp_agents_per_network', 2) - hosts = [DHCP_HOSTA, DHCP_HOSTC] + cfg.CONF.set_override('dhcp_agents_per_network', 3) + hosts = [DHCP_HOSTA, DHCP_HOSTC, DHCP_HOSTD] net, subnet, port = self._network_port_create(hosts) - expected_calls = self._notification_mocks(hosts, net, subnet, port) - for expected in expected_calls[DHCP_HOSTA]: - self.assertIn(expected, self.dhcp_notifier_cast.call_args_list) - for expected in expected_calls[DHCP_HOSTC]: + for host_call in self.dhcp_notifier_cast.call_args_list: + if ("'priority': " + str( + dhcp_rpc_agent_api.PRIORITY_PORT_CREATE_HIGH) + in str(host_call)): + if DHCP_HOSTA in str(host_call): + expected_high_calls = self._notification_mocks( + [DHCP_HOSTA], net, subnet, port, + dhcp_rpc_agent_api.PRIORITY_PORT_CREATE_HIGH) + high_host = DHCP_HOSTA + hosts.pop(0) + elif DHCP_HOSTC in str(host_call): + expected_high_calls = self._notification_mocks( + [DHCP_HOSTC], net, subnet, port, + dhcp_rpc_agent_api.PRIORITY_PORT_CREATE_HIGH) + high_host = DHCP_HOSTC + hosts.pop(1) + elif DHCP_HOSTD in str(host_call): + expected_high_calls = self._notification_mocks( + [DHCP_HOSTD], net, subnet, port, + dhcp_rpc_agent_api.PRIORITY_PORT_CREATE_HIGH) + high_host = DHCP_HOSTD + hosts.pop(2) + expected_low_calls = self._notification_mocks( + hosts, net, subnet, port, + dhcp_rpc_agent_api.PRIORITY_PORT_CREATE_LOW) + for expected in expected_high_calls[high_host]: self.assertIn(expected, self.dhcp_notifier_cast.call_args_list) + for host, low_expecteds in expected_low_calls.items(): + for expected in low_expecteds: + self.assertIn(expected, self.dhcp_notifier_cast.call_args_list) def _is_schedule_network_called(self, device_id): dhcp_notifier_schedule = mock.patch(
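
For reviewers unfamiliar with the new agent-side behaviour, here is a small
stand-alone sketch (not part of the patch) of why switching
ExclusiveResourceProcessor._queue to a PriorityQueue lets the high-priority
port_create_end message overtake lower-priority updates for the same network.
The class below is a simplified stand-in for ResourceUpdate; only the priority
constants are taken from dhcp_rpc_agent_api (lower value means higher
priority).

# Stand-alone illustration, not part of the patch.
import queue
import time

PRIORITY_PORT_CREATE_HIGH = 5
PRIORITY_PORT_CREATE_LOW = 6
PRIORITY_PORT_UPDATE = 7


class FakeResourceUpdate(object):
    """Simplified stand-in for ResourceUpdate, ordered by (priority, time)."""

    def __init__(self, resource_id, priority, action, resource):
        self.id = resource_id
        self.priority = priority
        self.timestamp = time.time()
        self.action = action
        self.resource = resource

    def __lt__(self, other):
        # PriorityQueue (a heap underneath) only needs __lt__: sort by
        # priority first, then by arrival time for equal priorities.
        if self.priority != other.priority:
            return self.priority < other.priority
        return self.timestamp < other.timestamp


# The per-network queue kept by ExclusiveResourceProcessor: with a plain
# list, updates came out in arrival order; with a PriorityQueue, the
# high-priority port_create_end sent to this one agent is handled first.
q = queue.PriorityQueue()
q.put(FakeResourceUpdate('net-1', PRIORITY_PORT_UPDATE, '_port_update', 'p1'))
q.put(FakeResourceUpdate('net-1', PRIORITY_PORT_CREATE_LOW, '_port_create', 'p2'))
q.put(FakeResourceUpdate('net-1', PRIORITY_PORT_CREATE_HIGH, '_port_create', 'p3'))

while not q.empty():
    update = q.get()
    print(update.priority, update.action, update.resource)
# Output order: p3 (priority 5), then p2 (6), then p1 (7).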