Process DHCP events in order if related
When processing port events (create, update, delete), the port provisioning (port creation) has priority over the other events [1]. As reported in the related bug, if a port deletion with an IP address and another port creation with the same IP address arrive to the DHCP agent, those events can be processed in the same queue. Because of the creation event priority, even when this event arrived after the deletion event, it will be processed first. That will clash with the DHCP agent cache, that contains a port (not deleted yet) with the same IP address. That will trigger an unwanted resync. This patch implements a specific logic to store the events in "ResourceProcessingQueue" (that uses "PriorityQueue" [2]). When a port event arrives, the event comparison method checks the (subnet, fixed_ips) tuple set of both elements. If there is a coincidence, that means those ports are the same or are using the same IP addreses (the race condition explained in the bug). In this case, the priority is defined only by the timestamp; that means the events are processed in order of arrival. Because the Neutron server do not allow to have two ports in the same subnet with the same IP address, the order of the events is guaranteed. In the case explained in the bug, the deletion event will be processed first. [1]https://review.opendev.org/c/openstack/neutron/+/626830 [2]https://docs.python.org/3/library/queue.html#queue.PriorityQueue Closes-Bug: #1913723 Change-Id: I89438feae3c0244f6da5e6a2a035d45b956ac247
This commit is contained in:
parent
04e1a63979
commit
ede6db6c4c
|
@ -75,6 +75,35 @@ def _wait_if_syncing(f):
|
||||||
return wrapped
|
return wrapped
|
||||||
|
|
||||||
|
|
||||||
|
class DHCPResourceUpdate(queue.ResourceUpdate):
|
||||||
|
|
||||||
|
def __init__(self, _id, priority, action=None, resource=None,
|
||||||
|
timestamp=None, tries=5, obj_type=None):
|
||||||
|
super().__init__(_id, priority, action=action, resource=resource,
|
||||||
|
timestamp=timestamp, tries=tries)
|
||||||
|
self.obj_type = obj_type
|
||||||
|
|
||||||
|
def __lt__(self, other):
|
||||||
|
if other.obj_type == self.obj_type == 'port':
|
||||||
|
# NOTE(ralonsoh): both resources should have "fixed_ips"
|
||||||
|
# information. That key was added to the deleted ports in this
|
||||||
|
# patch but this code runs in the Neutron API (server). Both the
|
||||||
|
# server and the DHCP agent should be updated.
|
||||||
|
# This check could be removed in Y release.
|
||||||
|
if ('fixed_ips' not in self.resource or
|
||||||
|
'fixed_ips' not in other.resource):
|
||||||
|
return super().__lt__(other)
|
||||||
|
|
||||||
|
self_ips = set(str(fixed_ip['ip_address']) for
|
||||||
|
fixed_ip in self.resource['fixed_ips'])
|
||||||
|
other_ips = set(str(fixed_ip['ip_address']) for
|
||||||
|
fixed_ip in other.resource['fixed_ips'])
|
||||||
|
if self_ips & other_ips:
|
||||||
|
return self.timestamp < other.timestamp
|
||||||
|
|
||||||
|
return super().__lt__(other)
|
||||||
|
|
||||||
|
|
||||||
class DhcpAgent(manager.Manager):
|
class DhcpAgent(manager.Manager):
|
||||||
"""DHCP agent service manager.
|
"""DHCP agent service manager.
|
||||||
|
|
||||||
|
@ -446,11 +475,10 @@ class DhcpAgent(manager.Manager):
|
||||||
|
|
||||||
def network_create_end(self, context, payload):
|
def network_create_end(self, context, payload):
|
||||||
"""Handle the network.create.end notification event."""
|
"""Handle the network.create.end notification event."""
|
||||||
update = queue.ResourceUpdate(payload['network']['id'],
|
update = DHCPResourceUpdate(payload['network']['id'],
|
||||||
payload.get('priority',
|
payload.get('priority', DEFAULT_PRIORITY),
|
||||||
DEFAULT_PRIORITY),
|
action='_network_create',
|
||||||
action='_network_create',
|
resource=payload, obj_type='network')
|
||||||
resource=payload)
|
|
||||||
self._queue.add(update)
|
self._queue.add(update)
|
||||||
|
|
||||||
@_wait_if_syncing
|
@_wait_if_syncing
|
||||||
|
@ -461,11 +489,10 @@ class DhcpAgent(manager.Manager):
|
||||||
|
|
||||||
def network_update_end(self, context, payload):
|
def network_update_end(self, context, payload):
|
||||||
"""Handle the network.update.end notification event."""
|
"""Handle the network.update.end notification event."""
|
||||||
update = queue.ResourceUpdate(payload['network']['id'],
|
update = DHCPResourceUpdate(payload['network']['id'],
|
||||||
payload.get('priority',
|
payload.get('priority', DEFAULT_PRIORITY),
|
||||||
DEFAULT_PRIORITY),
|
action='_network_update',
|
||||||
action='_network_update',
|
resource=payload, obj_type='network')
|
||||||
resource=payload)
|
|
||||||
self._queue.add(update)
|
self._queue.add(update)
|
||||||
|
|
||||||
@_wait_if_syncing
|
@_wait_if_syncing
|
||||||
|
@ -479,11 +506,10 @@ class DhcpAgent(manager.Manager):
|
||||||
|
|
||||||
def network_delete_end(self, context, payload):
|
def network_delete_end(self, context, payload):
|
||||||
"""Handle the network.delete.end notification event."""
|
"""Handle the network.delete.end notification event."""
|
||||||
update = queue.ResourceUpdate(payload['network_id'],
|
update = DHCPResourceUpdate(payload['network_id'],
|
||||||
payload.get('priority',
|
payload.get('priority', DEFAULT_PRIORITY),
|
||||||
DEFAULT_PRIORITY),
|
action='_network_delete',
|
||||||
action='_network_delete',
|
resource=payload, obj_type='network')
|
||||||
resource=payload)
|
|
||||||
self._queue.add(update)
|
self._queue.add(update)
|
||||||
|
|
||||||
@_wait_if_syncing
|
@_wait_if_syncing
|
||||||
|
@ -494,11 +520,10 @@ class DhcpAgent(manager.Manager):
|
||||||
|
|
||||||
def subnet_update_end(self, context, payload):
|
def subnet_update_end(self, context, payload):
|
||||||
"""Handle the subnet.update.end notification event."""
|
"""Handle the subnet.update.end notification event."""
|
||||||
update = queue.ResourceUpdate(payload['subnet']['network_id'],
|
update = DHCPResourceUpdate(payload['subnet']['network_id'],
|
||||||
payload.get('priority',
|
payload.get('priority', DEFAULT_PRIORITY),
|
||||||
DEFAULT_PRIORITY),
|
action='_subnet_update',
|
||||||
action='_subnet_update',
|
resource=payload, obj_type='subnet')
|
||||||
resource=payload)
|
|
||||||
self._queue.add(update)
|
self._queue.add(update)
|
||||||
|
|
||||||
@_wait_if_syncing
|
@_wait_if_syncing
|
||||||
|
@ -533,11 +558,10 @@ class DhcpAgent(manager.Manager):
|
||||||
network_id = self._get_network_lock_id(payload)
|
network_id = self._get_network_lock_id(payload)
|
||||||
if not network_id:
|
if not network_id:
|
||||||
return
|
return
|
||||||
update = queue.ResourceUpdate(network_id,
|
update = DHCPResourceUpdate(network_id,
|
||||||
payload.get('priority',
|
payload.get('priority', DEFAULT_PRIORITY),
|
||||||
DEFAULT_PRIORITY),
|
action='_subnet_delete',
|
||||||
action='_subnet_delete',
|
resource=payload, obj_type='subnet')
|
||||||
resource=payload)
|
|
||||||
self._queue.add(update)
|
self._queue.add(update)
|
||||||
|
|
||||||
@_wait_if_syncing
|
@_wait_if_syncing
|
||||||
|
@ -581,11 +605,10 @@ class DhcpAgent(manager.Manager):
|
||||||
if self.cache.is_port_message_stale(updated_port):
|
if self.cache.is_port_message_stale(updated_port):
|
||||||
LOG.debug("Discarding stale port update: %s", updated_port)
|
LOG.debug("Discarding stale port update: %s", updated_port)
|
||||||
return
|
return
|
||||||
update = queue.ResourceUpdate(updated_port.network_id,
|
update = DHCPResourceUpdate(updated_port.network_id,
|
||||||
payload.get('priority',
|
payload.get('priority', DEFAULT_PRIORITY),
|
||||||
DEFAULT_PRIORITY),
|
action='_port_update',
|
||||||
action='_port_update',
|
resource=updated_port, obj_type='port')
|
||||||
resource=updated_port)
|
|
||||||
self._queue.add(update)
|
self._queue.add(update)
|
||||||
|
|
||||||
@_wait_if_syncing
|
@_wait_if_syncing
|
||||||
|
@ -641,11 +664,10 @@ class DhcpAgent(manager.Manager):
|
||||||
def port_create_end(self, context, payload):
|
def port_create_end(self, context, payload):
|
||||||
"""Handle the port.create.end notification event."""
|
"""Handle the port.create.end notification event."""
|
||||||
created_port = dhcp.DictModel(payload['port'])
|
created_port = dhcp.DictModel(payload['port'])
|
||||||
update = queue.ResourceUpdate(created_port.network_id,
|
update = DHCPResourceUpdate(created_port.network_id,
|
||||||
payload.get('priority',
|
payload.get('priority', DEFAULT_PRIORITY),
|
||||||
DEFAULT_PRIORITY),
|
action='_port_create',
|
||||||
action='_port_create',
|
resource=created_port, obj_type='port')
|
||||||
resource=created_port)
|
|
||||||
self._queue.add(update)
|
self._queue.add(update)
|
||||||
|
|
||||||
@_wait_if_syncing
|
@_wait_if_syncing
|
||||||
|
@ -684,11 +706,10 @@ class DhcpAgent(manager.Manager):
|
||||||
network_id = self._get_network_lock_id(payload)
|
network_id = self._get_network_lock_id(payload)
|
||||||
if not network_id:
|
if not network_id:
|
||||||
return
|
return
|
||||||
update = queue.ResourceUpdate(network_id,
|
update = DHCPResourceUpdate(network_id,
|
||||||
payload.get('priority',
|
payload.get('priority', DEFAULT_PRIORITY),
|
||||||
DEFAULT_PRIORITY),
|
action='_port_delete',
|
||||||
action='_port_delete',
|
resource=payload, obj_type='port')
|
||||||
resource=payload)
|
|
||||||
self._queue.add(update)
|
self._queue.add(update)
|
||||||
|
|
||||||
@_wait_if_syncing
|
@_wait_if_syncing
|
||||||
|
|
|
@ -268,7 +268,8 @@ class DhcpAgentNotifyAPI(object):
|
||||||
def _after_router_interface_deleted(self, resource, event, trigger,
|
def _after_router_interface_deleted(self, resource, event, trigger,
|
||||||
**kwargs):
|
**kwargs):
|
||||||
self._notify_agents(kwargs['context'], 'port_delete_end',
|
self._notify_agents(kwargs['context'], 'port_delete_end',
|
||||||
{'port_id': kwargs['port']['id']},
|
{'port_id': kwargs['port']['id'],
|
||||||
|
'fixed_ips': kwargs['port']['fixed_ips']},
|
||||||
kwargs['port']['network_id'])
|
kwargs['port']['network_id'])
|
||||||
|
|
||||||
def _native_event_send_dhcp_notification(self, resource, event, trigger,
|
def _native_event_send_dhcp_notification(self, resource, event, trigger,
|
||||||
|
@ -343,6 +344,8 @@ class DhcpAgentNotifyAPI(object):
|
||||||
payload = {obj_type + '_id': obj_value['id']}
|
payload = {obj_type + '_id': obj_value['id']}
|
||||||
if obj_type != 'network':
|
if obj_type != 'network':
|
||||||
payload['network_id'] = network_id
|
payload['network_id'] = network_id
|
||||||
|
if obj_type == 'port':
|
||||||
|
payload['fixed_ips'] = obj_value['fixed_ips']
|
||||||
self._notify_agents(context, method_name, payload, network_id)
|
self._notify_agents(context, method_name, payload, network_id)
|
||||||
else:
|
else:
|
||||||
self._notify_agents(context, method_name, data, network_id)
|
self._notify_agents(context, method_name, data, network_id)
|
||||||
|
|
|
@ -15,6 +15,7 @@
|
||||||
|
|
||||||
import collections
|
import collections
|
||||||
import copy
|
import copy
|
||||||
|
import datetime
|
||||||
import sys
|
import sys
|
||||||
from unittest import mock
|
from unittest import mock
|
||||||
import uuid
|
import uuid
|
||||||
|
@ -2376,3 +2377,63 @@ class TestDeviceManager(base.BaseTestCase):
|
||||||
self.assertEqual(2, device.route.get_gateway.call_count)
|
self.assertEqual(2, device.route.get_gateway.call_count)
|
||||||
self.assertFalse(device.route.delete_gateway.called)
|
self.assertFalse(device.route.delete_gateway.called)
|
||||||
device.route.add_gateway.assert_has_calls(expected)
|
device.route.add_gateway.assert_has_calls(expected)
|
||||||
|
|
||||||
|
|
||||||
|
class TestDHCPResourceUpdate(base.BaseTestCase):
|
||||||
|
|
||||||
|
date1 = datetime.datetime(year=2021, month=2, day=1, hour=9, minute=1,
|
||||||
|
second=2)
|
||||||
|
date2 = datetime.datetime(year=2021, month=2, day=1, hour=9, minute=1,
|
||||||
|
second=1) # older than date1
|
||||||
|
|
||||||
|
def test__lt__no_port_event(self):
|
||||||
|
# Lower numerical priority always gets precedence. DHCPResourceUpdate
|
||||||
|
# (and ResourceUpdate) objects with more precedence will return as
|
||||||
|
# "lower" in a "__lt__" method comparison.
|
||||||
|
update1 = dhcp_agent.DHCPResourceUpdate('id1', 5, obj_type='network')
|
||||||
|
update2 = dhcp_agent.DHCPResourceUpdate('id2', 6, obj_type='network')
|
||||||
|
self.assertLess(update1, update2)
|
||||||
|
|
||||||
|
def test__lt__no_port_event_timestamp(self):
|
||||||
|
update1 = dhcp_agent.DHCPResourceUpdate(
|
||||||
|
'id1', 5, timestamp=self.date1, obj_type='network')
|
||||||
|
update2 = dhcp_agent.DHCPResourceUpdate(
|
||||||
|
'id2', 6, timestamp=self.date2, obj_type='network')
|
||||||
|
self.assertLess(update1, update2)
|
||||||
|
|
||||||
|
def test__lt__port_no_fixed_ips(self):
|
||||||
|
update1 = dhcp_agent.DHCPResourceUpdate(
|
||||||
|
'id1', 5, timestamp=self.date1, resource={}, obj_type='port')
|
||||||
|
update2 = dhcp_agent.DHCPResourceUpdate(
|
||||||
|
'id2', 6, timestamp=self.date2, resource={}, obj_type='port')
|
||||||
|
self.assertLess(update1, update2)
|
||||||
|
|
||||||
|
def test__lt__port_fixed_ips_not_matching(self):
|
||||||
|
resource1 = {'fixed_ips': [
|
||||||
|
{'subnet_id': 'subnet1', 'ip_address': '10.0.0.1'}]}
|
||||||
|
resource2 = {'fixed_ips': [
|
||||||
|
{'subnet_id': 'subnet1', 'ip_address': '10.0.0.2'},
|
||||||
|
{'subnet_id': 'subnet2', 'ip_address': '10.0.1.1'}]}
|
||||||
|
update1 = dhcp_agent.DHCPResourceUpdate(
|
||||||
|
'id1', 5, timestamp=self.date1, resource=resource1,
|
||||||
|
obj_type='port')
|
||||||
|
update2 = dhcp_agent.DHCPResourceUpdate(
|
||||||
|
'id2', 6, timestamp=self.date2, resource=resource2,
|
||||||
|
obj_type='port')
|
||||||
|
self.assertLess(update1, update2)
|
||||||
|
|
||||||
|
def test__lt__port_fixed_ips_matching(self):
|
||||||
|
resource1 = {'fixed_ips': [
|
||||||
|
{'subnet_id': 'subnet1', 'ip_address': '10.0.0.1'}]}
|
||||||
|
resource2 = {'fixed_ips': [
|
||||||
|
{'subnet_id': 'subnet1', 'ip_address': '10.0.0.1'},
|
||||||
|
{'subnet_id': 'subnet2', 'ip_address': '10.0.0.2'}]}
|
||||||
|
update1 = dhcp_agent.DHCPResourceUpdate(
|
||||||
|
'id1', 5, timestamp=self.date1, resource=resource1,
|
||||||
|
obj_type='port')
|
||||||
|
update2 = dhcp_agent.DHCPResourceUpdate(
|
||||||
|
'id2', 6, timestamp=self.date2, resource=resource2,
|
||||||
|
obj_type='port')
|
||||||
|
# In this case, both "port" events have matching IPs. "__lt__" method
|
||||||
|
# uses the timestamp: date2 < date1
|
||||||
|
self.assertLess(update2, update1)
|
||||||
|
|
|
@ -247,7 +247,9 @@ class TestDhcpAgentNotifyAPI(base.BaseTestCase):
|
||||||
self._test__notify_agents_with_function(
|
self._test__notify_agents_with_function(
|
||||||
lambda: self.notifier._after_router_interface_deleted(
|
lambda: self.notifier._after_router_interface_deleted(
|
||||||
mock.ANY, mock.ANY, mock.ANY, context=mock.Mock(),
|
mock.ANY, mock.ANY, mock.ANY, context=mock.Mock(),
|
||||||
port={'id': 'foo_port_id', 'network_id': 'foo_network_id'}),
|
port={'id': 'foo_port_id', 'network_id': 'foo_network_id',
|
||||||
|
'fixed_ips': {'subnet_id': 'subnet1',
|
||||||
|
'ip_address': '10.0.0.1'}}),
|
||||||
expected_scheduling=0, expected_casts=1)
|
expected_scheduling=0, expected_casts=1)
|
||||||
|
|
||||||
def test__fanout_message(self):
|
def test__fanout_message(self):
|
||||||
|
|
Loading…
Reference in New Issue