diff --git a/neutron/agent/common/resource_processing_queue.py b/neutron/agent/common/resource_processing_queue.py index e4662bbc69d..96404c40c73 100644 --- a/neutron/agent/common/resource_processing_queue.py +++ b/neutron/agent/common/resource_processing_queue.py @@ -89,7 +89,7 @@ class ExclusiveResourceProcessor(object): if id not in self._masters: self._masters[id] = self - self._queue = [] + self._queue = Queue.PriorityQueue(-1) self._master = self._masters[id] @@ -119,7 +119,7 @@ class ExclusiveResourceProcessor(object): resource is being processed. These updates have already bubbled to the front of the ResourceProcessingQueue. """ - self._master._queue.append(update) + self._master._queue.put(update) def updates(self): """Processes the resource until updates stop coming @@ -128,13 +128,14 @@ class ExclusiveResourceProcessor(object): may come in from other workers while it is in progress. This method loops until they stop coming. """ - if self._i_am_master(): - while self._queue: - # Remove the update from the queue even if it is old. - update = self._queue.pop(0) - # Process the update only if it is fresh. - if self._get_resource_data_timestamp() < update.timestamp: - yield update + while self._i_am_master(): + if self._queue.empty(): + return + # Get the update from the queue even if it is old. + update = self._queue.get() + # Process the update only if it is fresh. + if self._get_resource_data_timestamp() < update.timestamp: + yield update class ResourceProcessingQueue(object): diff --git a/neutron/agent/dhcp/agent.py b/neutron/agent/dhcp/agent.py index 962b4ad9c39..5b2b5dd036f 100644 --- a/neutron/agent/dhcp/agent.py +++ b/neutron/agent/dhcp/agent.py @@ -22,7 +22,6 @@ from neutron_lib.agent import topics from neutron_lib import constants from neutron_lib import context from neutron_lib import exceptions -from neutron_lib.utils import runtime from oslo_concurrency import lockutils from oslo_config import cfg from oslo_log import log as logging @@ -33,6 +32,7 @@ from oslo_utils import importutils import six from neutron._i18n import _ +from neutron.agent.common import resource_processing_queue as queue from neutron.agent.linux import dhcp from neutron.agent.linux import external_process from neutron.agent.metadata import driver as metadata_driver @@ -45,6 +45,8 @@ from neutron import manager LOG = logging.getLogger(__name__) _SYNC_STATE_LOCK = lockutils.ReaderWriterLock() +DEFAULT_PRIORITY = 255 + def _sync_lock(f): """Decorator to block all operations for a global sync call.""" @@ -64,12 +66,6 @@ def _wait_if_syncing(f): return wrapped -def _net_lock(network_id): - """Returns a context manager lock based on network_id.""" - lock_name = 'dhcp-agent-network-lock-%s' % network_id - return lockutils.lock(lock_name, runtime.SYNCHRONIZED_PREFIX) - - class DhcpAgent(manager.Manager): """DHCP agent service manager. @@ -100,6 +96,7 @@ class DhcpAgent(manager.Manager): self._process_monitor = external_process.ProcessMonitor( config=self.conf, resource_type='dhcp') + self._queue = queue.ResourceProcessingQueue() def init_host(self): self.sync_state() @@ -128,6 +125,7 @@ class DhcpAgent(manager.Manager): """Activate the DHCP agent.""" self.periodic_resync() self.start_ready_ports_loop() + eventlet.spawn_n(self._process_loop) def call_driver(self, action, network, **action_kwargs): """Invoke an action on a DHCP driver instance.""" @@ -354,36 +352,64 @@ class DhcpAgent(manager.Manager): # Update the metadata proxy after the dhcp driver has been updated self.update_isolated_metadata_proxy(network) - @_wait_if_syncing def network_create_end(self, context, payload): """Handle the network.create.end notification event.""" - network_id = payload['network']['id'] - with _net_lock(network_id): - self.enable_dhcp_helper(network_id) + update = queue.ResourceUpdate(payload['network']['id'], + payload.get('priority', + DEFAULT_PRIORITY), + action='_network_create', + resource=payload) + self._queue.add(update) @_wait_if_syncing + def _network_create(self, payload): + network_id = payload['network']['id'] + self.enable_dhcp_helper(network_id) + def network_update_end(self, context, payload): """Handle the network.update.end notification event.""" - network_id = payload['network']['id'] - with _net_lock(network_id): - if payload['network']['admin_state_up']: - self.enable_dhcp_helper(network_id) - else: - self.disable_dhcp_helper(network_id) + update = queue.ResourceUpdate(payload['network']['id'], + payload.get('priority', + DEFAULT_PRIORITY), + action='_network_update', + resource=payload) + self._queue.add(update) @_wait_if_syncing - def network_delete_end(self, context, payload): - """Handle the network.delete.end notification event.""" - network_id = payload['network_id'] - with _net_lock(network_id): + def _network_update(self, payload): + network_id = payload['network']['id'] + if payload['network']['admin_state_up']: + self.enable_dhcp_helper(network_id) + else: self.disable_dhcp_helper(network_id) + def network_delete_end(self, context, payload): + """Handle the network.delete.end notification event.""" + update = queue.ResourceUpdate(payload['network_id'], + payload.get('priority', + DEFAULT_PRIORITY), + action='_network_delete', + resource=payload) + self._queue.add(update) + @_wait_if_syncing + def _network_delete(self, payload): + network_id = payload['network_id'] + self.disable_dhcp_helper(network_id) + def subnet_update_end(self, context, payload): """Handle the subnet.update.end notification event.""" + update = queue.ResourceUpdate(payload['subnet']['network_id'], + payload.get('priority', + DEFAULT_PRIORITY), + action='_subnet_update', + resource=payload) + self._queue.add(update) + + @_wait_if_syncing + def _subnet_update(self, payload): network_id = payload['subnet']['network_id'] - with _net_lock(network_id): - self.refresh_dhcp_helper(network_id) + self.refresh_dhcp_helper(network_id) # Use the update handler for the subnet create event. subnet_create_end = subnet_update_end @@ -406,31 +432,63 @@ class DhcpAgent(manager.Manager): port = self.cache.get_port_by_id(port_id) return port.network_id if port else None - @_wait_if_syncing def subnet_delete_end(self, context, payload): """Handle the subnet.delete.end notification event.""" network_id = self._get_network_lock_id(payload) if not network_id: return - with _net_lock(network_id): - subnet_id = payload['subnet_id'] - network = self.cache.get_network_by_subnet_id(subnet_id) - if not network: - return - self.refresh_dhcp_helper(network.id) + update = queue.ResourceUpdate(network_id, + payload.get('priority', + DEFAULT_PRIORITY), + action='_subnet_delete', + resource=payload) + self._queue.add(update) @_wait_if_syncing + def _subnet_delete(self, payload): + network_id = self._get_network_lock_id(payload) + if not network_id: + return + subnet_id = payload['subnet_id'] + network = self.cache.get_network_by_subnet_id(subnet_id) + if not network: + return + self.refresh_dhcp_helper(network.id) + + def _process_loop(self): + LOG.debug("Starting _process_loop") + + pool = eventlet.GreenPool(size=8) + while True: + pool.spawn_n(self._process_resource_update) + + def _process_resource_update(self): + for tmp, update in self._queue.each_update_to_next_resource(): + method = getattr(self, update.action) + method(update.resource) + def port_update_end(self, context, payload): """Handle the port.update.end notification event.""" updated_port = dhcp.DictModel(payload['port']) - with _net_lock(updated_port.network_id): - if self.cache.is_port_message_stale(payload['port']): - LOG.debug("Discarding stale port update: %s", updated_port) - return - network = self.cache.get_network_by_id(updated_port.network_id) - if not network: - return - self.reload_allocations(updated_port, network) + if self.cache.is_port_message_stale(updated_port): + LOG.debug("Discarding stale port update: %s", updated_port) + return + update = queue.ResourceUpdate(updated_port.network_id, + payload.get('priority', + DEFAULT_PRIORITY), + action='_port_update', + resource=updated_port) + self._queue.add(update) + + @_wait_if_syncing + def _port_update(self, updated_port): + if self.cache.is_port_message_stale(updated_port): + LOG.debug("Discarding stale port update: %s", updated_port) + return + network = self.cache.get_network_by_id(updated_port.network_id) + if not network: + return + self.reload_allocations(updated_port, network) def reload_allocations(self, port, network): LOG.info("Trigger reload_allocations for port %s", port) @@ -466,50 +524,67 @@ class DhcpAgent(manager.Manager): port['network_id'], self.conf.host) return port['device_id'] == thishost - @_wait_if_syncing def port_create_end(self, context, payload): """Handle the port.create.end notification event.""" created_port = dhcp.DictModel(payload['port']) - with _net_lock(created_port.network_id): - network = self.cache.get_network_by_id(created_port.network_id) - if not network: - return - new_ips = {i['ip_address'] for i in created_port['fixed_ips']} - for port_cached in network.ports: - # if there are other ports cached with the same ip address in - # the same network this indicate that the cache is out of sync - cached_ips = {i['ip_address'] - for i in port_cached['fixed_ips']} - if new_ips.intersection(cached_ips): - self.schedule_resync("Duplicate IP addresses found, " - "DHCP cache is out of sync", - created_port.network_id) - return - self.reload_allocations(created_port, network) + update = queue.ResourceUpdate(created_port.network_id, + payload.get('priority', + DEFAULT_PRIORITY), + action='_port_create', + resource=created_port) + self._queue.add(update) @_wait_if_syncing + def _port_create(self, created_port): + network = self.cache.get_network_by_id(created_port.network_id) + if not network: + return + new_ips = {i['ip_address'] for i in created_port['fixed_ips']} + for port_cached in network.ports: + # if there are other ports cached with the same ip address in + # the same network this indicate that the cache is out of sync + cached_ips = {i['ip_address'] + for i in port_cached['fixed_ips']} + if new_ips.intersection(cached_ips): + self.schedule_resync("Duplicate IP addresses found, " + "DHCP cache is out of sync", + created_port.network_id) + return + self.reload_allocations(created_port, network) + def port_delete_end(self, context, payload): """Handle the port.delete.end notification event.""" network_id = self._get_network_lock_id(payload) if not network_id: return - with _net_lock(network_id): - port_id = payload['port_id'] - port = self.cache.get_port_by_id(port_id) - self.cache.deleted_ports.add(port_id) - if not port: - return - network = self.cache.get_network_by_id(port.network_id) - self.cache.remove_port(port) - if self._is_port_on_this_agent(port): - # the agent's port has been deleted. disable the service - # and add the network to the resync list to create - # (or acquire a reserved) port. - self.call_driver('disable', network) - self.schedule_resync("Agent port was deleted", port.network_id) - else: - self.call_driver('reload_allocations', network) - self.update_isolated_metadata_proxy(network) + update = queue.ResourceUpdate(network_id, + payload.get('priority', + DEFAULT_PRIORITY), + action='_port_delete', + resource=payload) + self._queue.add(update) + + @_wait_if_syncing + def _port_delete(self, payload): + network_id = self._get_network_lock_id(payload) + if not network_id: + return + port_id = payload['port_id'] + port = self.cache.get_port_by_id(port_id) + self.cache.deleted_ports.add(port_id) + if not port: + return + network = self.cache.get_network_by_id(port.network_id) + self.cache.remove_port(port) + if self._is_port_on_this_agent(port): + # the agent's port has been deleted. disable the service + # and add the network to the resync list to create + # (or acquire a reserved) port. + self.call_driver('disable', network) + self.schedule_resync("Agent port was deleted", port.network_id) + else: + self.call_driver('reload_allocations', network) + self.update_isolated_metadata_proxy(network) def update_isolated_metadata_proxy(self, network): """Spawn or kill metadata proxy. diff --git a/neutron/api/rpc/agentnotifiers/dhcp_rpc_agent_api.py b/neutron/api/rpc/agentnotifiers/dhcp_rpc_agent_api.py index cf18af27d3a..d862ed33024 100644 --- a/neutron/api/rpc/agentnotifiers/dhcp_rpc_agent_api.py +++ b/neutron/api/rpc/agentnotifiers/dhcp_rpc_agent_api.py @@ -13,6 +13,9 @@ # See the License for the specific language governing permissions and # limitations under the License. +import copy +import random + from neutron_lib.agent import topics from neutron_lib.api import extensions from neutron_lib.callbacks import events @@ -26,6 +29,36 @@ import oslo_messaging from neutron.common import rpc as n_rpc +# Priorities - lower value is higher priority +PRIORITY_NETWORK_CREATE = 0 +PRIORITY_NETWORK_UPDATE = 1 +PRIORITY_NETWORK_DELETE = 2 +PRIORITY_SUBNET_UPDATE = 3 +PRIORITY_SUBNET_DELETE = 4 +# In order to improve port dhcp provisioning when nova concurrently create +# multiple vms, I classify the port_create_end message to two levels, the +# high-level message only cast to one agent, the low-level message cast to all +# other agent. In this way, When there are a large number of ports that need to +# be processed, we can dispatch the high priority message of port to different +# agent, so that the processed port will not block other port's processing in +# other dhcp agents. +PRIORITY_PORT_CREATE_HIGH = 5 +PRIORITY_PORT_CREATE_LOW = 6 +PRIORITY_PORT_UPDATE = 7 +PRIORITY_PORT_DELETE = 8 + +METHOD_PRIORITY_MAP = { + 'network_create_end': PRIORITY_NETWORK_CREATE, + 'network_update_end': PRIORITY_NETWORK_UPDATE, + 'network_delete_end': PRIORITY_NETWORK_DELETE, + 'subnet_create_end': PRIORITY_SUBNET_UPDATE, + 'subnet_update_end': PRIORITY_SUBNET_UPDATE, + 'subnet_delete_end': PRIORITY_SUBNET_DELETE, + 'port_create_end': PRIORITY_PORT_CREATE_LOW, + 'port_update_end': PRIORITY_PORT_UPDATE, + 'port_delete_end': PRIORITY_PORT_DELETE +} + LOG = logging.getLogger(__name__) @@ -101,7 +134,8 @@ class DhcpAgentNotifyAPI(object): for agent in new_agents: self._cast_message( context, 'network_create_end', - {'network': {'id': network['id']}}, agent['host']) + {'network': {'id': network['id']}, + 'priority': PRIORITY_NETWORK_CREATE}, agent['host']) elif not existing_agents: LOG.warning('Unable to schedule network %s: no agents ' 'available; will retry on subsequent port ' @@ -147,6 +181,7 @@ class DhcpAgentNotifyAPI(object): def _notify_agents(self, context, method, payload, network_id): """Notify all the agents that are hosting the network.""" + payload['priority'] = METHOD_PRIORITY_MAP.get(method) # fanout is required as we do not know who is "listening" no_agents = not extensions.is_extension_supported( self.plugin, constants.DHCP_AGENT_SCHEDULER_EXT_ALIAS) @@ -184,10 +219,21 @@ class DhcpAgentNotifyAPI(object): return enabled_agents = self._get_enabled_agents( context, network, agents, method, payload) + + if method == 'port_create_end': + high_agent = enabled_agents.pop( + random.randint(0, len(enabled_agents) - 1)) + self._notify_high_priority_agent( + context, copy.deepcopy(payload), high_agent) for agent in enabled_agents: self._cast_message( context, method, payload, agent.host, agent.topic) + def _notify_high_priority_agent(self, context, payload, agent): + payload['priority'] = PRIORITY_PORT_CREATE_HIGH + self._cast_message(context, "port_create_end", + payload, agent.host, agent.topic) + def _cast_message(self, context, method, payload, host, topic=topics.DHCP_AGENT): """Cast the payload to the dhcp agent running on the host.""" @@ -201,11 +247,13 @@ class DhcpAgentNotifyAPI(object): def network_removed_from_agent(self, context, network_id, host): self._cast_message(context, 'network_delete_end', - {'network_id': network_id}, host) + {'network_id': network_id, + 'priority': PRIORITY_NETWORK_DELETE}, host) def network_added_to_agent(self, context, network_id, host): self._cast_message(context, 'network_create_end', - {'network': {'id': network_id}}, host) + {'network': {'id': network_id}, + 'priority': PRIORITY_NETWORK_CREATE}, host) def agent_updated(self, context, admin_state_up, host): self._cast_message(context, 'agent_updated', diff --git a/neutron/tests/unit/agent/dhcp/test_agent.py b/neutron/tests/unit/agent/dhcp/test_agent.py index d5a279bb40a..3139a15a05a 100644 --- a/neutron/tests/unit/agent/dhcp/test_agent.py +++ b/neutron/tests/unit/agent/dhcp/test_agent.py @@ -48,6 +48,7 @@ DHCP_PLUGIN = '%s.%s' % (rpc_api.__module__, rpc_api.__name__) FAKE_NETWORK_UUID = '12345678-1234-5678-1234567890ab' FAKE_NETWORK_DHCP_NS = "qdhcp-%s" % FAKE_NETWORK_UUID FAKE_TENANT_ID = 'aaaaaaaa-aaaa-aaaa-aaaaaaaaaaaa' +FAKE_PRIORITY = 6 fake_subnet1_allocation_pools = dhcp.DictModel(dict(id='', start='172.9.9.2', @@ -285,11 +286,15 @@ class TestDhcpAgent(base.BaseTestCase): dhcp = dhcp_agent.DhcpAgent(HOSTNAME) attrs_to_mock = dict( [(a, mock.DEFAULT) for a in - ['periodic_resync', 'start_ready_ports_loop']]) + ['periodic_resync', 'start_ready_ports_loop', + '_process_loop']]) with mock.patch.multiple(dhcp, **attrs_to_mock) as mocks: - dhcp.run() - mocks['periodic_resync'].assert_called_once_with() - mocks['start_ready_ports_loop'].assert_called_once_with() + with mock.patch.object(dhcp_agent.eventlet, + 'spawn_n') as spawn_n: + dhcp.run() + mocks['periodic_resync'].assert_called_once_with() + mocks['start_ready_ports_loop'].assert_called_once_with() + spawn_n.assert_called_once_with(mocks['_process_loop']) def test_call_driver(self): network = mock.Mock() @@ -892,29 +897,36 @@ class TestDhcpAgentEventHandler(base.BaseTestCase): self._test_disable_isolated_metadata_proxy(fake_dist_network) def test_network_create_end(self): - payload = dict(network=dict(id=fake_network.id)) + payload = dict(network=dict(id=fake_network.id), + priority=FAKE_PRIORITY) with mock.patch.object(self.dhcp, 'enable_dhcp_helper') as enable: self.dhcp.network_create_end(None, payload) + self.dhcp._process_resource_update() enable.assert_called_once_with(fake_network.id) def test_network_update_end_admin_state_up(self): - payload = dict(network=dict(id=fake_network.id, admin_state_up=True)) + payload = dict(network=dict(id=fake_network.id, admin_state_up=True), + priority=FAKE_PRIORITY) with mock.patch.object(self.dhcp, 'enable_dhcp_helper') as enable: self.dhcp.network_update_end(None, payload) + self.dhcp._process_resource_update() enable.assert_called_once_with(fake_network.id) def test_network_update_end_admin_state_down(self): - payload = dict(network=dict(id=fake_network.id, admin_state_up=False)) + payload = dict(network=dict(id=fake_network.id, admin_state_up=False), + priority=FAKE_PRIORITY) with mock.patch.object(self.dhcp, 'disable_dhcp_helper') as disable: self.dhcp.network_update_end(None, payload) + self.dhcp._process_resource_update() disable.assert_called_once_with(fake_network.id) def test_network_delete_end(self): - payload = dict(network_id=fake_network.id) + payload = dict(network_id=fake_network.id, priority=FAKE_PRIORITY) with mock.patch.object(self.dhcp, 'disable_dhcp_helper') as disable: self.dhcp.network_delete_end(None, payload) + self.dhcp._process_resource_update() disable.assert_called_once_with(fake_network.id) def test_refresh_dhcp_helper_no_dhcp_enabled_networks(self): @@ -956,13 +968,15 @@ class TestDhcpAgentEventHandler(base.BaseTestCase): # attribute isn't True. payload = dict(subnet=dhcp.DictModel( dict(network_id=fake_network.id, enable_dhcp=False, - cidr='99.99.99.0/24', ip_version=4))) + cidr='99.99.99.0/24', ip_version=4)), + priority=FAKE_PRIORITY) self.cache.get_network_by_id.return_value = fake_network new_net = copy.deepcopy(fake_network) new_net.subnets.append(payload['subnet']) self.plugin.get_network_info.return_value = new_net self.dhcp.subnet_create_end(None, payload) + self.dhcp._process_resource_update() self.cache.assert_has_calls([mock.call.put(new_net)]) self.call_driver.assert_called_once_with('reload_allocations', new_net) @@ -971,20 +985,24 @@ class TestDhcpAgentEventHandler(base.BaseTestCase): self.call_driver.reset_mock() payload = dict(subnet=dhcp.DictModel( dict(network_id=fake_network.id, enable_dhcp=True, - cidr='99.99.88.0/24', ip_version=const.IP_VERSION_4))) + cidr='99.99.88.0/24', ip_version=4)), + priority=FAKE_PRIORITY) new_net = copy.deepcopy(fake_network) new_net.subnets.append(payload['subnet']) self.plugin.get_network_info.return_value = new_net self.dhcp.subnet_create_end(None, payload) + self.dhcp._process_resource_update() self.cache.assert_has_calls([mock.call.put(new_net)]) self.call_driver.assert_called_once_with('restart', new_net) def test_subnet_update_end(self): - payload = dict(subnet=dict(network_id=fake_network.id)) + payload = dict(subnet=dict(network_id=fake_network.id), + priority=FAKE_PRIORITY) self.cache.get_network_by_id.return_value = fake_network self.plugin.get_network_info.return_value = fake_network self.dhcp.subnet_update_end(None, payload) + self.dhcp._process_resource_update() self.cache.assert_has_calls([mock.call.put(fake_network)]) self.call_driver.assert_called_once_with('reload_allocations', @@ -994,7 +1012,8 @@ class TestDhcpAgentEventHandler(base.BaseTestCase): self.dhcp.dhcp_ready_ports) def test_subnet_update_dhcp(self): - payload = dict(subnet=dict(network_id=fake_network.id)) + payload = dict(subnet=dict(network_id=fake_network.id), + priority=FAKE_PRIORITY) self.cache.get_network_by_id.return_value = fake_network new_net = copy.deepcopy(fake_network) new_subnet1 = copy.deepcopy(fake_subnet1) @@ -1003,6 +1022,7 @@ class TestDhcpAgentEventHandler(base.BaseTestCase): new_net.subnets = [new_subnet1, new_subnet2] self.plugin.get_network_info.return_value = new_net self.dhcp.subnet_update_end(None, payload) + self.dhcp._process_resource_update() self.call_driver.assert_called_once_with('restart', new_net) self.call_driver.reset_mock() @@ -1014,6 +1034,7 @@ class TestDhcpAgentEventHandler(base.BaseTestCase): new_net2.subnets = [new_subnet1, new_subnet2] self.plugin.get_network_info.return_value = new_net2 self.dhcp.subnet_update_end(None, payload) + self.dhcp._process_resource_update() self.call_driver.assert_called_once_with('restart', new_net2) def test_subnet_update_end_restart(self): @@ -1023,11 +1044,13 @@ class TestDhcpAgentEventHandler(base.BaseTestCase): subnets=[fake_subnet1, fake_subnet3], ports=[fake_port1])) - payload = dict(subnet=dict(network_id=fake_network.id)) + payload = dict(subnet=dict(network_id=fake_network.id), + priority=FAKE_PRIORITY) self.cache.get_network_by_id.return_value = fake_network self.plugin.get_network_info.return_value = new_state self.dhcp.subnet_update_end(None, payload) + self.dhcp._process_resource_update() self.cache.assert_has_calls([mock.call.put(new_state)]) self.call_driver.assert_called_once_with('restart', @@ -1040,12 +1063,13 @@ class TestDhcpAgentEventHandler(base.BaseTestCase): subnets=[fake_subnet1, fake_subnet3], ports=[fake_port1])) - payload = dict(subnet_id=fake_subnet1.id) + payload = dict(subnet_id=fake_subnet1.id, priority=FAKE_PRIORITY) self.cache.get_network_by_subnet_id.return_value = prev_state self.cache.get_network_by_id.return_value = prev_state self.plugin.get_network_info.return_value = fake_network self.dhcp.subnet_delete_end(None, payload) + self.dhcp._process_resource_update() self.cache.assert_has_calls([ mock.call.get_network_by_subnet_id( @@ -1064,12 +1088,14 @@ class TestDhcpAgentEventHandler(base.BaseTestCase): subnets=[fake_subnet1, fake_subnet3], ports=[fake_port1])) - payload = dict(subnet_id=fake_subnet1.id, network_id=fake_network.id) + payload = dict(subnet_id=fake_subnet1.id, network_id=fake_network.id, + priority=FAKE_PRIORITY) self.cache.get_network_by_subnet_id.return_value = prev_state self.cache.get_network_by_id.return_value = prev_state self.plugin.get_network_info.return_value = fake_network self.dhcp.subnet_delete_end(None, payload) + self.dhcp._process_resource_update() self.cache.assert_has_calls([ mock.call.get_network_by_subnet_id( @@ -1086,6 +1112,7 @@ class TestDhcpAgentEventHandler(base.BaseTestCase): payload = dict(port=copy.deepcopy(fake_port2)) self.cache.get_network_by_id.return_value = fake_network self.dhcp.port_update_end(None, payload) + self.dhcp._process_resource_update() self.reload_allocations.assert_called_once_with(fake_port2, fake_network) @@ -1106,19 +1133,12 @@ class TestDhcpAgentEventHandler(base.BaseTestCase): payload = dict(port=copy.deepcopy(fake_port2)) self.cache.get_network_by_id.return_value = fake_network self.dhcp.port_create_end(None, payload) + self.dhcp._process_resource_update() self.reload_allocations.assert_called_once_with(fake_port2, fake_network) - def test_port_update_end_grabs_lock(self): - payload = dict(port=fake_port2) - self.cache.get_network_by_id.return_value = None - self.cache.get_port_by_id.return_value = fake_port2 - with mock.patch('neutron.agent.dhcp.agent._net_lock') as nl: - self.dhcp.port_update_end(None, payload) - nl.assert_called_once_with(fake_port2.network_id) - def test_port_update_change_ip_on_port(self): - payload = dict(port=fake_port1) + payload = dict(port=fake_port1, priority=FAKE_PRIORITY) self.cache.get_network_by_id.return_value = fake_network updated_fake_port1 = copy.deepcopy(fake_port1) updated_fake_port1.fixed_ips[0].ip_address = '172.9.9.99' @@ -1126,6 +1146,7 @@ class TestDhcpAgentEventHandler(base.BaseTestCase): with mock.patch.object( self.dhcp, 'update_isolated_metadata_proxy') as ump: self.dhcp.port_update_end(None, payload) + self.dhcp._process_resource_update() self.cache.assert_has_calls( [mock.call.get_network_by_id(fake_port1.network_id), mock.call.put_port(mock.ANY)]) @@ -1136,35 +1157,38 @@ class TestDhcpAgentEventHandler(base.BaseTestCase): def test_port_update_change_subnet_on_dhcp_agents_port(self): self.cache.get_network_by_id.return_value = fake_network self.cache.get_port_by_id.return_value = fake_port1 - payload = dict(port=copy.deepcopy(fake_port1)) + payload = dict(port=copy.deepcopy(fake_port1), priority=FAKE_PRIORITY) device_id = utils.get_dhcp_agent_device_id( payload['port']['network_id'], self.dhcp.conf.host) payload['port']['fixed_ips'][0]['subnet_id'] = '77777-7777' payload['port']['device_id'] = device_id self.dhcp.port_update_end(None, payload) + self.dhcp._process_resource_update() self.assertFalse(self.call_driver.called) def test_port_update_change_ip_on_dhcp_agents_port(self): self.cache.get_network_by_id.return_value = fake_network self.cache.get_port_by_id.return_value = fake_port1 - payload = dict(port=copy.deepcopy(fake_port1)) + payload = dict(port=copy.deepcopy(fake_port1), priority=FAKE_PRIORITY) device_id = utils.get_dhcp_agent_device_id( payload['port']['network_id'], self.dhcp.conf.host) payload['port']['fixed_ips'][0]['ip_address'] = '172.9.9.99' payload['port']['device_id'] = device_id self.dhcp.port_update_end(None, payload) + self.dhcp._process_resource_update() self.call_driver.assert_has_calls( [mock.call.call_driver('restart', fake_network)]) def test_port_update_change_ip_on_dhcp_agents_port_cache_miss(self): self.cache.get_network_by_id.return_value = fake_network self.cache.get_port_by_id.return_value = None - payload = dict(port=copy.deepcopy(fake_port1)) + payload = dict(port=copy.deepcopy(fake_port1), priority=FAKE_PRIORITY) device_id = utils.get_dhcp_agent_device_id( payload['port']['network_id'], self.dhcp.conf.host) payload['port']['fixed_ips'][0]['ip_address'] = '172.9.9.99' payload['port']['device_id'] = device_id self.dhcp.port_update_end(None, payload) + self.dhcp._process_resource_update() self.schedule_resync.assert_called_once_with(mock.ANY, fake_port1.network_id) @@ -1174,28 +1198,31 @@ class TestDhcpAgentEventHandler(base.BaseTestCase): duplicate_ip = fake_port1['fixed_ips'][0]['ip_address'] payload['port']['fixed_ips'][0]['ip_address'] = duplicate_ip self.dhcp.port_create_end(None, payload) + self.dhcp._process_resource_update() self.schedule_resync.assert_called_once_with(mock.ANY, fake_port2.network_id) def test_port_update_on_dhcp_agents_port_no_ip_change(self): self.cache.get_network_by_id.return_value = fake_network self.cache.get_port_by_id.return_value = fake_port1 - payload = dict(port=fake_port1) + payload = dict(port=fake_port1, priority=FAKE_PRIORITY) device_id = utils.get_dhcp_agent_device_id( payload['port']['network_id'], self.dhcp.conf.host) payload['port']['device_id'] = device_id self.dhcp.port_update_end(None, payload) + self.dhcp._process_resource_update() self.call_driver.assert_has_calls( [mock.call.call_driver('reload_allocations', fake_network)]) def test_port_delete_end_no_network_id(self): - payload = dict(port_id=fake_port2.id) + payload = dict(port_id=fake_port2.id, priority=FAKE_PRIORITY) self.cache.get_network_by_id.return_value = fake_network self.cache.get_port_by_id.return_value = fake_port2 with mock.patch.object( self.dhcp, 'update_isolated_metadata_proxy') as ump: self.dhcp.port_delete_end(None, payload) + self.dhcp._process_resource_update() self.cache.assert_has_calls( [mock.call.get_port_by_id(fake_port2.id), mock.call.get_port_by_id(fake_port2.id), @@ -1207,13 +1234,15 @@ class TestDhcpAgentEventHandler(base.BaseTestCase): self.assertTrue(ump.called) def test_port_delete_end(self): - payload = dict(port_id=fake_port2.id, network_id=fake_network.id) + payload = dict(port_id=fake_port2.id, network_id=fake_network.id, + priority=FAKE_PRIORITY) self.cache.get_network_by_id.return_value = fake_network self.cache.get_port_by_id.return_value = fake_port2 with mock.patch.object( self.dhcp, 'update_isolated_metadata_proxy') as ump: self.dhcp.port_delete_end(None, payload) + self.dhcp._process_resource_update() self.cache.assert_has_calls( [mock.call.get_port_by_id(fake_port2.id), mock.call.deleted_ports.add(fake_port2.id), @@ -1224,10 +1253,12 @@ class TestDhcpAgentEventHandler(base.BaseTestCase): self.assertTrue(ump.called) def test_port_delete_end_unknown_port(self): - payload = dict(port_id='unknown', network_id='unknown') + payload = dict(port_id='unknown', network_id='unknown', + priority=FAKE_PRIORITY) self.cache.get_port_by_id.return_value = None self.dhcp.port_delete_end(None, payload) + self.dhcp._process_resource_update() self.cache.assert_has_calls([mock.call.get_port_by_id('unknown')]) self.assertEqual(self.call_driver.call_count, 0) @@ -1240,7 +1271,9 @@ class TestDhcpAgentEventHandler(base.BaseTestCase): self.cache.get_network_by_id.return_value = fake_network self.cache.get_port_by_id.return_value = port self.dhcp.port_delete_end(None, {'port_id': port.id, - 'network_id': fake_network.id}) + 'network_id': fake_network.id, + 'priority': FAKE_PRIORITY}) + self.dhcp._process_resource_update() self.call_driver.assert_has_calls( [mock.call.call_driver('disable', fake_network)]) diff --git a/neutron/tests/unit/api/rpc/agentnotifiers/test_dhcp_rpc_agent_api.py b/neutron/tests/unit/api/rpc/agentnotifiers/test_dhcp_rpc_agent_api.py index 26568418157..ae0f3a49c44 100644 --- a/neutron/tests/unit/api/rpc/agentnotifiers/test_dhcp_rpc_agent_api.py +++ b/neutron/tests/unit/api/rpc/agentnotifiers/test_dhcp_rpc_agent_api.py @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import copy import datetime import mock @@ -20,6 +21,7 @@ from neutron_lib.api import extensions from neutron_lib.callbacks import events from neutron_lib.callbacks import registry from neutron_lib.callbacks import resources +from neutron_lib import constants from neutron_lib.plugins import directory from oslo_utils import timeutils from oslo_utils import uuidutils @@ -138,10 +140,43 @@ class TestDhcpAgentNotifyAPI(base.BaseTestCase): network = {'id': 'foo_network_id'} self._test__get_enabled_agents(network, agents=[agent1, agent2]) + def test__notify_agents_allocate_priority(self): + mock_context = mock.MagicMock() + mock_context.is_admin = True + methods = ['network_create_end', 'network_update_end', + 'network_delete_end', 'subnet_create_end', + 'subnet_update_end', 'subnet_delete_end', + 'port_create_end', 'port_update_end', 'port_delete_end'] + with mock.patch.object(self.notifier, '_schedule_network') as f: + with mock.patch.object(self.notifier, '_get_enabled_agents') as g: + for method in methods: + f.return_value = [mock.MagicMock()] + g.return_value = [mock.MagicMock()] + payload = {} + if method.startswith('port'): + payload['port'] = \ + {'device_id': + constants.DEVICE_ID_RESERVED_DHCP_PORT} + expected_payload = copy.deepcopy(payload) + expected_payload['priority'] = \ + dhcp_rpc_agent_api.METHOD_PRIORITY_MAP.get(method) + self.notifier._notify_agents(mock_context, method, payload, + 'fake_network_id') + if method == 'network_delete_end': + self.mock_fanout.assert_called_with(mock.ANY, method, + expected_payload) + elif method != 'network_create_end': + if method == 'port_create_end': + expected_payload['priority'] = \ + dhcp_rpc_agent_api.PRIORITY_PORT_CREATE_HIGH + self.mock_cast.assert_called_with(mock.ANY, method, + expected_payload, + mock.ANY, mock.ANY) + def test__notify_agents_fanout_required(self): self.notifier._notify_agents(mock.ANY, 'network_delete_end', - mock.ANY, 'foo_network_id') + {}, 'foo_network_id') self.assertEqual(1, self.mock_fanout.call_count) def _test__notify_agents_with_function( diff --git a/neutron/tests/unit/db/test_agentschedulers_db.py b/neutron/tests/unit/db/test_agentschedulers_db.py index e6951e18ab9..d3840a501d0 100644 --- a/neutron/tests/unit/db/test_agentschedulers_db.py +++ b/neutron/tests/unit/db/test_agentschedulers_db.py @@ -28,6 +28,7 @@ from oslo_utils import uuidutils from webob import exc from neutron.api import extensions +from neutron.api.rpc.agentnotifiers import dhcp_rpc_agent_api from neutron.api.rpc.agentnotifiers import l3_rpc_agent_api from neutron.api.rpc.handlers import dhcp_rpc from neutron.api.rpc.handlers import l3_rpc @@ -46,10 +47,12 @@ from neutron.tests.unit.extensions import test_l3 from neutron.tests.unit import testlib_api from neutron import wsgi + L3_HOSTA = 'hosta' DHCP_HOSTA = 'hosta' L3_HOSTB = 'hostb' DHCP_HOSTC = 'hostc' +DHCP_HOSTD = 'hostd' DEVICE_OWNER_COMPUTE = ''.join([constants.DEVICE_OWNER_COMPUTE_PREFIX, 'test:', @@ -1380,7 +1383,9 @@ class OvsDhcpAgentNotifierTestCase(test_agent.AgentDBTestMixIn, network_id) self.dhcp_notifier_cast.assert_called_with( mock.ANY, 'network_create_end', - {'network': {'id': network_id}}, DHCP_HOSTA) + {'network': {'id': network_id}, + 'priority': dhcp_rpc_agent_api.PRIORITY_NETWORK_CREATE}, + DHCP_HOSTA) notifications = fake_notifier.NOTIFICATIONS expected_event_type = 'dhcp_agent.network.add' self._assert_notify(notifications, expected_event_type) @@ -1398,7 +1403,9 @@ class OvsDhcpAgentNotifierTestCase(test_agent.AgentDBTestMixIn, network_id) self.dhcp_notifier_cast.assert_called_with( mock.ANY, 'network_delete_end', - {'network_id': network_id}, DHCP_HOSTA) + {'network_id': network_id, + 'priority': dhcp_rpc_agent_api.PRIORITY_NETWORK_DELETE}, + DHCP_HOSTA) notifications = fake_notifier.NOTIFICATIONS expected_event_type = 'dhcp_agent.network.remove' self._assert_notify(notifications, expected_event_type) @@ -1441,17 +1448,19 @@ class OvsDhcpAgentNotifierTestCase(test_agent.AgentDBTestMixIn, ctx = context.get_admin_context() net['network'] = self.plugin.get_network(ctx, net['network']['id']) sub['subnet'] = self.plugin.get_subnet(ctx, sub['subnet']['id']) + sub['priority'] = dhcp_rpc_agent_api.PRIORITY_SUBNET_UPDATE port['port'] = self.plugin.get_port(ctx, port['port']['id']) return net, sub, port - def _notification_mocks(self, hosts, net, subnet, port): + def _notification_mocks(self, hosts, net, subnet, port, port_priority): host_calls = {} for host in hosts: expected_calls = [ mock.call( mock.ANY, 'network_create_end', - {'network': {'id': net['network']['id']}}, + {'priority': dhcp_rpc_agent_api.PRIORITY_NETWORK_CREATE, + 'network': {'id': net['network']['id']}}, host), mock.call( mock.ANY, @@ -1461,7 +1470,8 @@ class OvsDhcpAgentNotifierTestCase(test_agent.AgentDBTestMixIn, mock.call( mock.ANY, 'port_create_end', - {'port': port['port']}, + {'port': port['port'], + 'priority': port_priority}, host, 'dhcp_agent')] host_calls[host] = expected_calls return host_calls @@ -1469,19 +1479,46 @@ class OvsDhcpAgentNotifierTestCase(test_agent.AgentDBTestMixIn, def test_network_port_create_notification(self): hosts = [DHCP_HOSTA] net, subnet, port = self._network_port_create(hosts) - expected_calls = self._notification_mocks(hosts, net, subnet, port) + expected_calls = self._notification_mocks( + hosts, net, subnet, port, + dhcp_rpc_agent_api.PRIORITY_PORT_CREATE_HIGH) self.assertEqual( expected_calls[DHCP_HOSTA], self.dhcp_notifier_cast.call_args_list) def test_network_ha_port_create_notification(self): - cfg.CONF.set_override('dhcp_agents_per_network', 2) - hosts = [DHCP_HOSTA, DHCP_HOSTC] + cfg.CONF.set_override('dhcp_agents_per_network', 3) + hosts = [DHCP_HOSTA, DHCP_HOSTC, DHCP_HOSTD] net, subnet, port = self._network_port_create(hosts) - expected_calls = self._notification_mocks(hosts, net, subnet, port) - for expected in expected_calls[DHCP_HOSTA]: - self.assertIn(expected, self.dhcp_notifier_cast.call_args_list) - for expected in expected_calls[DHCP_HOSTC]: + for host_call in self.dhcp_notifier_cast.call_args_list: + if ("'priority': " + str( + dhcp_rpc_agent_api.PRIORITY_PORT_CREATE_HIGH) + in str(host_call)): + if DHCP_HOSTA in str(host_call): + expected_high_calls = self._notification_mocks( + [DHCP_HOSTA], net, subnet, port, + dhcp_rpc_agent_api.PRIORITY_PORT_CREATE_HIGH) + high_host = DHCP_HOSTA + hosts.pop(0) + elif DHCP_HOSTC in str(host_call): + expected_high_calls = self._notification_mocks( + [DHCP_HOSTC], net, subnet, port, + dhcp_rpc_agent_api.PRIORITY_PORT_CREATE_HIGH) + high_host = DHCP_HOSTC + hosts.pop(1) + elif DHCP_HOSTD in str(host_call): + expected_high_calls = self._notification_mocks( + [DHCP_HOSTD], net, subnet, port, + dhcp_rpc_agent_api.PRIORITY_PORT_CREATE_HIGH) + high_host = DHCP_HOSTD + hosts.pop(2) + expected_low_calls = self._notification_mocks( + hosts, net, subnet, port, + dhcp_rpc_agent_api.PRIORITY_PORT_CREATE_LOW) + for expected in expected_high_calls[high_host]: self.assertIn(expected, self.dhcp_notifier_cast.call_args_list) + for host, low_expecteds in expected_low_calls.items(): + for expected in low_expecteds: + self.assertIn(expected, self.dhcp_notifier_cast.call_args_list) def _is_schedule_network_called(self, device_id): dhcp_notifier_schedule = mock.patch(