Merge "Process DHCP events in order if related"
This commit is contained in:
commit
f9c9d102aa
@ -75,6 +75,35 @@ def _wait_if_syncing(f):
|
||||
return wrapped
|
||||
|
||||
|
||||
class DHCPResourceUpdate(queue.ResourceUpdate):
|
||||
|
||||
def __init__(self, _id, priority, action=None, resource=None,
|
||||
timestamp=None, tries=5, obj_type=None):
|
||||
super().__init__(_id, priority, action=action, resource=resource,
|
||||
timestamp=timestamp, tries=tries)
|
||||
self.obj_type = obj_type
|
||||
|
||||
def __lt__(self, other):
|
||||
if other.obj_type == self.obj_type == 'port':
|
||||
# NOTE(ralonsoh): both resources should have "fixed_ips"
|
||||
# information. That key was added to the deleted ports in this
|
||||
# patch but this code runs in the Neutron API (server). Both the
|
||||
# server and the DHCP agent should be updated.
|
||||
# This check could be removed in Y release.
|
||||
if ('fixed_ips' not in self.resource or
|
||||
'fixed_ips' not in other.resource):
|
||||
return super().__lt__(other)
|
||||
|
||||
self_ips = set(str(fixed_ip['ip_address']) for
|
||||
fixed_ip in self.resource['fixed_ips'])
|
||||
other_ips = set(str(fixed_ip['ip_address']) for
|
||||
fixed_ip in other.resource['fixed_ips'])
|
||||
if self_ips & other_ips:
|
||||
return self.timestamp < other.timestamp
|
||||
|
||||
return super().__lt__(other)
|
||||
|
||||
|
||||
class DhcpAgent(manager.Manager):
|
||||
"""DHCP agent service manager.
|
||||
|
||||
@ -446,11 +475,10 @@ class DhcpAgent(manager.Manager):
|
||||
|
||||
def network_create_end(self, context, payload):
|
||||
"""Handle the network.create.end notification event."""
|
||||
update = queue.ResourceUpdate(payload['network']['id'],
|
||||
payload.get('priority',
|
||||
DEFAULT_PRIORITY),
|
||||
action='_network_create',
|
||||
resource=payload)
|
||||
update = DHCPResourceUpdate(payload['network']['id'],
|
||||
payload.get('priority', DEFAULT_PRIORITY),
|
||||
action='_network_create',
|
||||
resource=payload, obj_type='network')
|
||||
self._queue.add(update)
|
||||
|
||||
@_wait_if_syncing
|
||||
@ -461,11 +489,10 @@ class DhcpAgent(manager.Manager):
|
||||
|
||||
def network_update_end(self, context, payload):
|
||||
"""Handle the network.update.end notification event."""
|
||||
update = queue.ResourceUpdate(payload['network']['id'],
|
||||
payload.get('priority',
|
||||
DEFAULT_PRIORITY),
|
||||
action='_network_update',
|
||||
resource=payload)
|
||||
update = DHCPResourceUpdate(payload['network']['id'],
|
||||
payload.get('priority', DEFAULT_PRIORITY),
|
||||
action='_network_update',
|
||||
resource=payload, obj_type='network')
|
||||
self._queue.add(update)
|
||||
|
||||
@_wait_if_syncing
|
||||
@ -479,11 +506,10 @@ class DhcpAgent(manager.Manager):
|
||||
|
||||
def network_delete_end(self, context, payload):
|
||||
"""Handle the network.delete.end notification event."""
|
||||
update = queue.ResourceUpdate(payload['network_id'],
|
||||
payload.get('priority',
|
||||
DEFAULT_PRIORITY),
|
||||
action='_network_delete',
|
||||
resource=payload)
|
||||
update = DHCPResourceUpdate(payload['network_id'],
|
||||
payload.get('priority', DEFAULT_PRIORITY),
|
||||
action='_network_delete',
|
||||
resource=payload, obj_type='network')
|
||||
self._queue.add(update)
|
||||
|
||||
@_wait_if_syncing
|
||||
@ -494,11 +520,10 @@ class DhcpAgent(manager.Manager):
|
||||
|
||||
def subnet_update_end(self, context, payload):
|
||||
"""Handle the subnet.update.end notification event."""
|
||||
update = queue.ResourceUpdate(payload['subnet']['network_id'],
|
||||
payload.get('priority',
|
||||
DEFAULT_PRIORITY),
|
||||
action='_subnet_update',
|
||||
resource=payload)
|
||||
update = DHCPResourceUpdate(payload['subnet']['network_id'],
|
||||
payload.get('priority', DEFAULT_PRIORITY),
|
||||
action='_subnet_update',
|
||||
resource=payload, obj_type='subnet')
|
||||
self._queue.add(update)
|
||||
|
||||
@_wait_if_syncing
|
||||
@ -533,11 +558,10 @@ class DhcpAgent(manager.Manager):
|
||||
network_id = self._get_network_lock_id(payload)
|
||||
if not network_id:
|
||||
return
|
||||
update = queue.ResourceUpdate(network_id,
|
||||
payload.get('priority',
|
||||
DEFAULT_PRIORITY),
|
||||
action='_subnet_delete',
|
||||
resource=payload)
|
||||
update = DHCPResourceUpdate(network_id,
|
||||
payload.get('priority', DEFAULT_PRIORITY),
|
||||
action='_subnet_delete',
|
||||
resource=payload, obj_type='subnet')
|
||||
self._queue.add(update)
|
||||
|
||||
@_wait_if_syncing
|
||||
@ -581,11 +605,10 @@ class DhcpAgent(manager.Manager):
|
||||
if self.cache.is_port_message_stale(updated_port):
|
||||
LOG.debug("Discarding stale port update: %s", updated_port)
|
||||
return
|
||||
update = queue.ResourceUpdate(updated_port.network_id,
|
||||
payload.get('priority',
|
||||
DEFAULT_PRIORITY),
|
||||
action='_port_update',
|
||||
resource=updated_port)
|
||||
update = DHCPResourceUpdate(updated_port.network_id,
|
||||
payload.get('priority', DEFAULT_PRIORITY),
|
||||
action='_port_update',
|
||||
resource=updated_port, obj_type='port')
|
||||
self._queue.add(update)
|
||||
|
||||
@_wait_if_syncing
|
||||
@ -641,11 +664,10 @@ class DhcpAgent(manager.Manager):
|
||||
def port_create_end(self, context, payload):
|
||||
"""Handle the port.create.end notification event."""
|
||||
created_port = dhcp.DictModel(payload['port'])
|
||||
update = queue.ResourceUpdate(created_port.network_id,
|
||||
payload.get('priority',
|
||||
DEFAULT_PRIORITY),
|
||||
action='_port_create',
|
||||
resource=created_port)
|
||||
update = DHCPResourceUpdate(created_port.network_id,
|
||||
payload.get('priority', DEFAULT_PRIORITY),
|
||||
action='_port_create',
|
||||
resource=created_port, obj_type='port')
|
||||
self._queue.add(update)
|
||||
|
||||
@_wait_if_syncing
|
||||
@ -684,11 +706,10 @@ class DhcpAgent(manager.Manager):
|
||||
network_id = self._get_network_lock_id(payload)
|
||||
if not network_id:
|
||||
return
|
||||
update = queue.ResourceUpdate(network_id,
|
||||
payload.get('priority',
|
||||
DEFAULT_PRIORITY),
|
||||
action='_port_delete',
|
||||
resource=payload)
|
||||
update = DHCPResourceUpdate(network_id,
|
||||
payload.get('priority', DEFAULT_PRIORITY),
|
||||
action='_port_delete',
|
||||
resource=payload, obj_type='port')
|
||||
self._queue.add(update)
|
||||
|
||||
@_wait_if_syncing
|
||||
|
@ -268,7 +268,8 @@ class DhcpAgentNotifyAPI(object):
|
||||
def _after_router_interface_deleted(self, resource, event, trigger,
|
||||
**kwargs):
|
||||
self._notify_agents(kwargs['context'], 'port_delete_end',
|
||||
{'port_id': kwargs['port']['id']},
|
||||
{'port_id': kwargs['port']['id'],
|
||||
'fixed_ips': kwargs['port']['fixed_ips']},
|
||||
kwargs['port']['network_id'])
|
||||
|
||||
def _native_event_send_dhcp_notification(self, resource, event, trigger,
|
||||
@ -343,6 +344,8 @@ class DhcpAgentNotifyAPI(object):
|
||||
payload = {obj_type + '_id': obj_value['id']}
|
||||
if obj_type != 'network':
|
||||
payload['network_id'] = network_id
|
||||
if obj_type == 'port':
|
||||
payload['fixed_ips'] = obj_value['fixed_ips']
|
||||
self._notify_agents(context, method_name, payload, network_id)
|
||||
else:
|
||||
self._notify_agents(context, method_name, data, network_id)
|
||||
|
@ -15,6 +15,7 @@
|
||||
|
||||
import collections
|
||||
import copy
|
||||
import datetime
|
||||
import sys
|
||||
from unittest import mock
|
||||
import uuid
|
||||
@ -2378,3 +2379,63 @@ class TestDeviceManager(base.BaseTestCase):
|
||||
self.assertEqual(2, device.route.get_gateway.call_count)
|
||||
self.assertFalse(device.route.delete_gateway.called)
|
||||
device.route.add_gateway.assert_has_calls(expected)
|
||||
|
||||
|
||||
class TestDHCPResourceUpdate(base.BaseTestCase):
|
||||
|
||||
date1 = datetime.datetime(year=2021, month=2, day=1, hour=9, minute=1,
|
||||
second=2)
|
||||
date2 = datetime.datetime(year=2021, month=2, day=1, hour=9, minute=1,
|
||||
second=1) # older than date1
|
||||
|
||||
def test__lt__no_port_event(self):
|
||||
# Lower numerical priority always gets precedence. DHCPResourceUpdate
|
||||
# (and ResourceUpdate) objects with more precedence will return as
|
||||
# "lower" in a "__lt__" method comparison.
|
||||
update1 = dhcp_agent.DHCPResourceUpdate('id1', 5, obj_type='network')
|
||||
update2 = dhcp_agent.DHCPResourceUpdate('id2', 6, obj_type='network')
|
||||
self.assertLess(update1, update2)
|
||||
|
||||
def test__lt__no_port_event_timestamp(self):
|
||||
update1 = dhcp_agent.DHCPResourceUpdate(
|
||||
'id1', 5, timestamp=self.date1, obj_type='network')
|
||||
update2 = dhcp_agent.DHCPResourceUpdate(
|
||||
'id2', 6, timestamp=self.date2, obj_type='network')
|
||||
self.assertLess(update1, update2)
|
||||
|
||||
def test__lt__port_no_fixed_ips(self):
|
||||
update1 = dhcp_agent.DHCPResourceUpdate(
|
||||
'id1', 5, timestamp=self.date1, resource={}, obj_type='port')
|
||||
update2 = dhcp_agent.DHCPResourceUpdate(
|
||||
'id2', 6, timestamp=self.date2, resource={}, obj_type='port')
|
||||
self.assertLess(update1, update2)
|
||||
|
||||
def test__lt__port_fixed_ips_not_matching(self):
|
||||
resource1 = {'fixed_ips': [
|
||||
{'subnet_id': 'subnet1', 'ip_address': '10.0.0.1'}]}
|
||||
resource2 = {'fixed_ips': [
|
||||
{'subnet_id': 'subnet1', 'ip_address': '10.0.0.2'},
|
||||
{'subnet_id': 'subnet2', 'ip_address': '10.0.1.1'}]}
|
||||
update1 = dhcp_agent.DHCPResourceUpdate(
|
||||
'id1', 5, timestamp=self.date1, resource=resource1,
|
||||
obj_type='port')
|
||||
update2 = dhcp_agent.DHCPResourceUpdate(
|
||||
'id2', 6, timestamp=self.date2, resource=resource2,
|
||||
obj_type='port')
|
||||
self.assertLess(update1, update2)
|
||||
|
||||
def test__lt__port_fixed_ips_matching(self):
|
||||
resource1 = {'fixed_ips': [
|
||||
{'subnet_id': 'subnet1', 'ip_address': '10.0.0.1'}]}
|
||||
resource2 = {'fixed_ips': [
|
||||
{'subnet_id': 'subnet1', 'ip_address': '10.0.0.1'},
|
||||
{'subnet_id': 'subnet2', 'ip_address': '10.0.0.2'}]}
|
||||
update1 = dhcp_agent.DHCPResourceUpdate(
|
||||
'id1', 5, timestamp=self.date1, resource=resource1,
|
||||
obj_type='port')
|
||||
update2 = dhcp_agent.DHCPResourceUpdate(
|
||||
'id2', 6, timestamp=self.date2, resource=resource2,
|
||||
obj_type='port')
|
||||
# In this case, both "port" events have matching IPs. "__lt__" method
|
||||
# uses the timestamp: date2 < date1
|
||||
self.assertLess(update2, update1)
|
||||
|
@ -247,7 +247,9 @@ class TestDhcpAgentNotifyAPI(base.BaseTestCase):
|
||||
self._test__notify_agents_with_function(
|
||||
lambda: self.notifier._after_router_interface_deleted(
|
||||
mock.ANY, mock.ANY, mock.ANY, context=mock.Mock(),
|
||||
port={'id': 'foo_port_id', 'network_id': 'foo_network_id'}),
|
||||
port={'id': 'foo_port_id', 'network_id': 'foo_network_id',
|
||||
'fixed_ips': {'subnet_id': 'subnet1',
|
||||
'ip_address': '10.0.0.1'}}),
|
||||
expected_scheduling=0, expected_casts=1)
|
||||
|
||||
def test__fanout_message(self):
|
||||
|
Loading…
x
Reference in New Issue
Block a user