Merge "Process DHCP events in order if related" into stable/victoria
This commit is contained in:
commit
372573d86d
|
@ -75,6 +75,35 @@ def _wait_if_syncing(f):
|
||||||
return wrapped
|
return wrapped
|
||||||
|
|
||||||
|
|
||||||
|
class DHCPResourceUpdate(queue.ResourceUpdate):
|
||||||
|
|
||||||
|
def __init__(self, _id, priority, action=None, resource=None,
|
||||||
|
timestamp=None, tries=5, obj_type=None):
|
||||||
|
super().__init__(_id, priority, action=action, resource=resource,
|
||||||
|
timestamp=timestamp, tries=tries)
|
||||||
|
self.obj_type = obj_type
|
||||||
|
|
||||||
|
def __lt__(self, other):
|
||||||
|
if other.obj_type == self.obj_type == 'port':
|
||||||
|
# NOTE(ralonsoh): both resources should have "fixed_ips"
|
||||||
|
# information. That key was added to the deleted ports in this
|
||||||
|
# patch but this code runs in the Neutron API (server). Both the
|
||||||
|
# server and the DHCP agent should be updated.
|
||||||
|
# This check could be removed in Y release.
|
||||||
|
if ('fixed_ips' not in self.resource or
|
||||||
|
'fixed_ips' not in other.resource):
|
||||||
|
return super().__lt__(other)
|
||||||
|
|
||||||
|
self_ips = set(str(fixed_ip['ip_address']) for
|
||||||
|
fixed_ip in self.resource['fixed_ips'])
|
||||||
|
other_ips = set(str(fixed_ip['ip_address']) for
|
||||||
|
fixed_ip in other.resource['fixed_ips'])
|
||||||
|
if self_ips & other_ips:
|
||||||
|
return self.timestamp < other.timestamp
|
||||||
|
|
||||||
|
return super().__lt__(other)
|
||||||
|
|
||||||
|
|
||||||
class DhcpAgent(manager.Manager):
|
class DhcpAgent(manager.Manager):
|
||||||
"""DHCP agent service manager.
|
"""DHCP agent service manager.
|
||||||
|
|
||||||
|
@ -446,11 +475,10 @@ class DhcpAgent(manager.Manager):
|
||||||
|
|
||||||
def network_create_end(self, context, payload):
|
def network_create_end(self, context, payload):
|
||||||
"""Handle the network.create.end notification event."""
|
"""Handle the network.create.end notification event."""
|
||||||
update = queue.ResourceUpdate(payload['network']['id'],
|
update = DHCPResourceUpdate(payload['network']['id'],
|
||||||
payload.get('priority',
|
payload.get('priority', DEFAULT_PRIORITY),
|
||||||
DEFAULT_PRIORITY),
|
action='_network_create',
|
||||||
action='_network_create',
|
resource=payload, obj_type='network')
|
||||||
resource=payload)
|
|
||||||
self._queue.add(update)
|
self._queue.add(update)
|
||||||
|
|
||||||
@_wait_if_syncing
|
@_wait_if_syncing
|
||||||
|
@ -461,11 +489,10 @@ class DhcpAgent(manager.Manager):
|
||||||
|
|
||||||
def network_update_end(self, context, payload):
|
def network_update_end(self, context, payload):
|
||||||
"""Handle the network.update.end notification event."""
|
"""Handle the network.update.end notification event."""
|
||||||
update = queue.ResourceUpdate(payload['network']['id'],
|
update = DHCPResourceUpdate(payload['network']['id'],
|
||||||
payload.get('priority',
|
payload.get('priority', DEFAULT_PRIORITY),
|
||||||
DEFAULT_PRIORITY),
|
action='_network_update',
|
||||||
action='_network_update',
|
resource=payload, obj_type='network')
|
||||||
resource=payload)
|
|
||||||
self._queue.add(update)
|
self._queue.add(update)
|
||||||
|
|
||||||
@_wait_if_syncing
|
@_wait_if_syncing
|
||||||
|
@ -479,11 +506,10 @@ class DhcpAgent(manager.Manager):
|
||||||
|
|
||||||
def network_delete_end(self, context, payload):
|
def network_delete_end(self, context, payload):
|
||||||
"""Handle the network.delete.end notification event."""
|
"""Handle the network.delete.end notification event."""
|
||||||
update = queue.ResourceUpdate(payload['network_id'],
|
update = DHCPResourceUpdate(payload['network_id'],
|
||||||
payload.get('priority',
|
payload.get('priority', DEFAULT_PRIORITY),
|
||||||
DEFAULT_PRIORITY),
|
action='_network_delete',
|
||||||
action='_network_delete',
|
resource=payload, obj_type='network')
|
||||||
resource=payload)
|
|
||||||
self._queue.add(update)
|
self._queue.add(update)
|
||||||
|
|
||||||
@_wait_if_syncing
|
@_wait_if_syncing
|
||||||
|
@ -494,11 +520,10 @@ class DhcpAgent(manager.Manager):
|
||||||
|
|
||||||
def subnet_update_end(self, context, payload):
|
def subnet_update_end(self, context, payload):
|
||||||
"""Handle the subnet.update.end notification event."""
|
"""Handle the subnet.update.end notification event."""
|
||||||
update = queue.ResourceUpdate(payload['subnet']['network_id'],
|
update = DHCPResourceUpdate(payload['subnet']['network_id'],
|
||||||
payload.get('priority',
|
payload.get('priority', DEFAULT_PRIORITY),
|
||||||
DEFAULT_PRIORITY),
|
action='_subnet_update',
|
||||||
action='_subnet_update',
|
resource=payload, obj_type='subnet')
|
||||||
resource=payload)
|
|
||||||
self._queue.add(update)
|
self._queue.add(update)
|
||||||
|
|
||||||
@_wait_if_syncing
|
@_wait_if_syncing
|
||||||
|
@ -533,11 +558,10 @@ class DhcpAgent(manager.Manager):
|
||||||
network_id = self._get_network_lock_id(payload)
|
network_id = self._get_network_lock_id(payload)
|
||||||
if not network_id:
|
if not network_id:
|
||||||
return
|
return
|
||||||
update = queue.ResourceUpdate(network_id,
|
update = DHCPResourceUpdate(network_id,
|
||||||
payload.get('priority',
|
payload.get('priority', DEFAULT_PRIORITY),
|
||||||
DEFAULT_PRIORITY),
|
action='_subnet_delete',
|
||||||
action='_subnet_delete',
|
resource=payload, obj_type='subnet')
|
||||||
resource=payload)
|
|
||||||
self._queue.add(update)
|
self._queue.add(update)
|
||||||
|
|
||||||
@_wait_if_syncing
|
@_wait_if_syncing
|
||||||
|
@ -581,11 +605,10 @@ class DhcpAgent(manager.Manager):
|
||||||
if self.cache.is_port_message_stale(updated_port):
|
if self.cache.is_port_message_stale(updated_port):
|
||||||
LOG.debug("Discarding stale port update: %s", updated_port)
|
LOG.debug("Discarding stale port update: %s", updated_port)
|
||||||
return
|
return
|
||||||
update = queue.ResourceUpdate(updated_port.network_id,
|
update = DHCPResourceUpdate(updated_port.network_id,
|
||||||
payload.get('priority',
|
payload.get('priority', DEFAULT_PRIORITY),
|
||||||
DEFAULT_PRIORITY),
|
action='_port_update',
|
||||||
action='_port_update',
|
resource=updated_port, obj_type='port')
|
||||||
resource=updated_port)
|
|
||||||
self._queue.add(update)
|
self._queue.add(update)
|
||||||
|
|
||||||
@_wait_if_syncing
|
@_wait_if_syncing
|
||||||
|
@ -641,11 +664,10 @@ class DhcpAgent(manager.Manager):
|
||||||
def port_create_end(self, context, payload):
|
def port_create_end(self, context, payload):
|
||||||
"""Handle the port.create.end notification event."""
|
"""Handle the port.create.end notification event."""
|
||||||
created_port = dhcp.DictModel(payload['port'])
|
created_port = dhcp.DictModel(payload['port'])
|
||||||
update = queue.ResourceUpdate(created_port.network_id,
|
update = DHCPResourceUpdate(created_port.network_id,
|
||||||
payload.get('priority',
|
payload.get('priority', DEFAULT_PRIORITY),
|
||||||
DEFAULT_PRIORITY),
|
action='_port_create',
|
||||||
action='_port_create',
|
resource=created_port, obj_type='port')
|
||||||
resource=created_port)
|
|
||||||
self._queue.add(update)
|
self._queue.add(update)
|
||||||
|
|
||||||
@_wait_if_syncing
|
@_wait_if_syncing
|
||||||
|
@ -684,11 +706,10 @@ class DhcpAgent(manager.Manager):
|
||||||
network_id = self._get_network_lock_id(payload)
|
network_id = self._get_network_lock_id(payload)
|
||||||
if not network_id:
|
if not network_id:
|
||||||
return
|
return
|
||||||
update = queue.ResourceUpdate(network_id,
|
update = DHCPResourceUpdate(network_id,
|
||||||
payload.get('priority',
|
payload.get('priority', DEFAULT_PRIORITY),
|
||||||
DEFAULT_PRIORITY),
|
action='_port_delete',
|
||||||
action='_port_delete',
|
resource=payload, obj_type='port')
|
||||||
resource=payload)
|
|
||||||
self._queue.add(update)
|
self._queue.add(update)
|
||||||
|
|
||||||
@_wait_if_syncing
|
@_wait_if_syncing
|
||||||
|
|
|
@ -268,7 +268,8 @@ class DhcpAgentNotifyAPI(object):
|
||||||
def _after_router_interface_deleted(self, resource, event, trigger,
|
def _after_router_interface_deleted(self, resource, event, trigger,
|
||||||
**kwargs):
|
**kwargs):
|
||||||
self._notify_agents(kwargs['context'], 'port_delete_end',
|
self._notify_agents(kwargs['context'], 'port_delete_end',
|
||||||
{'port_id': kwargs['port']['id']},
|
{'port_id': kwargs['port']['id'],
|
||||||
|
'fixed_ips': kwargs['port']['fixed_ips']},
|
||||||
kwargs['port']['network_id'])
|
kwargs['port']['network_id'])
|
||||||
|
|
||||||
def _native_event_send_dhcp_notification(self, resource, event, trigger,
|
def _native_event_send_dhcp_notification(self, resource, event, trigger,
|
||||||
|
@ -343,6 +344,8 @@ class DhcpAgentNotifyAPI(object):
|
||||||
payload = {obj_type + '_id': obj_value['id']}
|
payload = {obj_type + '_id': obj_value['id']}
|
||||||
if obj_type != 'network':
|
if obj_type != 'network':
|
||||||
payload['network_id'] = network_id
|
payload['network_id'] = network_id
|
||||||
|
if obj_type == 'port':
|
||||||
|
payload['fixed_ips'] = obj_value['fixed_ips']
|
||||||
self._notify_agents(context, method_name, payload, network_id)
|
self._notify_agents(context, method_name, payload, network_id)
|
||||||
else:
|
else:
|
||||||
self._notify_agents(context, method_name, data, network_id)
|
self._notify_agents(context, method_name, data, network_id)
|
||||||
|
|
|
@ -15,6 +15,7 @@
|
||||||
|
|
||||||
import collections
|
import collections
|
||||||
import copy
|
import copy
|
||||||
|
import datetime
|
||||||
import sys
|
import sys
|
||||||
from unittest import mock
|
from unittest import mock
|
||||||
import uuid
|
import uuid
|
||||||
|
@ -2376,3 +2377,63 @@ class TestDeviceManager(base.BaseTestCase):
|
||||||
self.assertEqual(2, device.route.get_gateway.call_count)
|
self.assertEqual(2, device.route.get_gateway.call_count)
|
||||||
self.assertFalse(device.route.delete_gateway.called)
|
self.assertFalse(device.route.delete_gateway.called)
|
||||||
device.route.add_gateway.assert_has_calls(expected)
|
device.route.add_gateway.assert_has_calls(expected)
|
||||||
|
|
||||||
|
|
||||||
|
class TestDHCPResourceUpdate(base.BaseTestCase):
|
||||||
|
|
||||||
|
date1 = datetime.datetime(year=2021, month=2, day=1, hour=9, minute=1,
|
||||||
|
second=2)
|
||||||
|
date2 = datetime.datetime(year=2021, month=2, day=1, hour=9, minute=1,
|
||||||
|
second=1) # older than date1
|
||||||
|
|
||||||
|
def test__lt__no_port_event(self):
|
||||||
|
# Lower numerical priority always gets precedence. DHCPResourceUpdate
|
||||||
|
# (and ResourceUpdate) objects with more precedence will return as
|
||||||
|
# "lower" in a "__lt__" method comparison.
|
||||||
|
update1 = dhcp_agent.DHCPResourceUpdate('id1', 5, obj_type='network')
|
||||||
|
update2 = dhcp_agent.DHCPResourceUpdate('id2', 6, obj_type='network')
|
||||||
|
self.assertLess(update1, update2)
|
||||||
|
|
||||||
|
def test__lt__no_port_event_timestamp(self):
|
||||||
|
update1 = dhcp_agent.DHCPResourceUpdate(
|
||||||
|
'id1', 5, timestamp=self.date1, obj_type='network')
|
||||||
|
update2 = dhcp_agent.DHCPResourceUpdate(
|
||||||
|
'id2', 6, timestamp=self.date2, obj_type='network')
|
||||||
|
self.assertLess(update1, update2)
|
||||||
|
|
||||||
|
def test__lt__port_no_fixed_ips(self):
|
||||||
|
update1 = dhcp_agent.DHCPResourceUpdate(
|
||||||
|
'id1', 5, timestamp=self.date1, resource={}, obj_type='port')
|
||||||
|
update2 = dhcp_agent.DHCPResourceUpdate(
|
||||||
|
'id2', 6, timestamp=self.date2, resource={}, obj_type='port')
|
||||||
|
self.assertLess(update1, update2)
|
||||||
|
|
||||||
|
def test__lt__port_fixed_ips_not_matching(self):
|
||||||
|
resource1 = {'fixed_ips': [
|
||||||
|
{'subnet_id': 'subnet1', 'ip_address': '10.0.0.1'}]}
|
||||||
|
resource2 = {'fixed_ips': [
|
||||||
|
{'subnet_id': 'subnet1', 'ip_address': '10.0.0.2'},
|
||||||
|
{'subnet_id': 'subnet2', 'ip_address': '10.0.1.1'}]}
|
||||||
|
update1 = dhcp_agent.DHCPResourceUpdate(
|
||||||
|
'id1', 5, timestamp=self.date1, resource=resource1,
|
||||||
|
obj_type='port')
|
||||||
|
update2 = dhcp_agent.DHCPResourceUpdate(
|
||||||
|
'id2', 6, timestamp=self.date2, resource=resource2,
|
||||||
|
obj_type='port')
|
||||||
|
self.assertLess(update1, update2)
|
||||||
|
|
||||||
|
def test__lt__port_fixed_ips_matching(self):
|
||||||
|
resource1 = {'fixed_ips': [
|
||||||
|
{'subnet_id': 'subnet1', 'ip_address': '10.0.0.1'}]}
|
||||||
|
resource2 = {'fixed_ips': [
|
||||||
|
{'subnet_id': 'subnet1', 'ip_address': '10.0.0.1'},
|
||||||
|
{'subnet_id': 'subnet2', 'ip_address': '10.0.0.2'}]}
|
||||||
|
update1 = dhcp_agent.DHCPResourceUpdate(
|
||||||
|
'id1', 5, timestamp=self.date1, resource=resource1,
|
||||||
|
obj_type='port')
|
||||||
|
update2 = dhcp_agent.DHCPResourceUpdate(
|
||||||
|
'id2', 6, timestamp=self.date2, resource=resource2,
|
||||||
|
obj_type='port')
|
||||||
|
# In this case, both "port" events have matching IPs. "__lt__" method
|
||||||
|
# uses the timestamp: date2 < date1
|
||||||
|
self.assertLess(update2, update1)
|
||||||
|
|
|
@ -247,7 +247,9 @@ class TestDhcpAgentNotifyAPI(base.BaseTestCase):
|
||||||
self._test__notify_agents_with_function(
|
self._test__notify_agents_with_function(
|
||||||
lambda: self.notifier._after_router_interface_deleted(
|
lambda: self.notifier._after_router_interface_deleted(
|
||||||
mock.ANY, mock.ANY, mock.ANY, context=mock.Mock(),
|
mock.ANY, mock.ANY, mock.ANY, context=mock.Mock(),
|
||||||
port={'id': 'foo_port_id', 'network_id': 'foo_network_id'}),
|
port={'id': 'foo_port_id', 'network_id': 'foo_network_id',
|
||||||
|
'fixed_ips': {'subnet_id': 'subnet1',
|
||||||
|
'ip_address': '10.0.0.1'}}),
|
||||||
expected_scheduling=0, expected_casts=1)
|
expected_scheduling=0, expected_casts=1)
|
||||||
|
|
||||||
def test__fanout_message(self):
|
def test__fanout_message(self):
|
||||||
|
|
Loading…
Reference in New Issue