Merge "Refactor the L3 agent batch notifier" into stable/stein

This commit is contained in:
Zuul 2019-09-10 12:57:36 +00:00 committed by Gerrit Code Review
commit a18213987d
4 changed files with 71 additions and 55 deletions

View File

@ -182,9 +182,6 @@ class AgentMixin(object):
ri.disable_radvd() ri.disable_radvd()
def notify_server(self, batched_events): def notify_server(self, batched_events):
eventlet.spawn_n(self._notify_server, batched_events)
def _notify_server(self, batched_events):
translated_states = dict((router_id, TRANSLATION_MAP[state]) for translated_states = dict((router_id, TRANSLATION_MAP[state]) for
router_id, state in batched_events) router_id, state in batched_events)
LOG.debug('Updating server with HA routers states %s', LOG.debug('Updating server with HA routers states %s',

View File

@ -10,17 +10,17 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import threading
import eventlet import eventlet
from neutron_lib.utils import runtime
from oslo_utils import uuidutils
class BatchNotifier(object): class BatchNotifier(object):
def __init__(self, batch_interval, callback): def __init__(self, batch_interval, callback):
self.pending_events = [] self._pending_events = eventlet.Queue()
self.callback = callback self.callback = callback
self.batch_interval = batch_interval self.batch_interval = batch_interval
self._lock_identifier = 'notifier-%s' % uuidutils.generate_uuid() self._mutex = threading.Lock()
def queue_event(self, event): def queue_event(self, event):
"""Called to queue sending an event with the next batch of events. """Called to queue sending an event with the next batch of events.
@ -33,32 +33,35 @@ class BatchNotifier(object):
This replaces the loopingcall with a mechanism that creates a This replaces the loopingcall with a mechanism that creates a
short-lived thread on demand whenever an event is queued. That thread short-lived thread on demand whenever an event is queued. That thread
will wait for a lock, send all queued events and then sleep for will check if the lock is released, send all queued events and then
'batch_interval' seconds to allow other events to queue up. sleep for 'batch_interval' seconds. If at the end of this sleep time,
other threads have added new events to the event queue, the same thread
will process them.
This effectively acts as a rate limiter to only allow 1 batch per At the same time, other threads will be able to add new events to the
'batch_interval' seconds. queue and will spawn new "synced_send" threads to process them. But if
the mutex is locked, the spawned thread will end immediately.
:param event: the event that occurred. :param event: the event that occurred.
""" """
if not event: if not event:
return return
self.pending_events.append(event) self._pending_events.put(event)
@runtime.synchronized(self._lock_identifier)
def synced_send(): def synced_send():
self._notify() if not self._mutex.locked():
# sleeping after send while holding the lock allows subsequent with self._mutex:
# events to batch up while not self._pending_events.empty():
eventlet.sleep(self.batch_interval) self._notify()
# sleeping after send while holding the lock allows
# subsequent events to batch up
eventlet.sleep(self.batch_interval)
eventlet.spawn_n(synced_send) eventlet.spawn_n(synced_send)
def _notify(self): def _notify(self):
if not self.pending_events: batched_events = []
return while not self._pending_events.empty():
batched_events.append(self._pending_events.get())
batched_events = self.pending_events
self.pending_events = []
self.callback(batched_events) self.callback(batched_events)

View File

@ -16,6 +16,7 @@
import eventlet import eventlet
import mock import mock
from neutron.common import utils
from neutron.notifiers import batch_notifier from neutron.notifiers import batch_notifier
from neutron.tests import base from neutron.tests import base
@ -23,41 +24,54 @@ from neutron.tests import base
class TestBatchNotifier(base.BaseTestCase): class TestBatchNotifier(base.BaseTestCase):
def setUp(self): def setUp(self):
super(TestBatchNotifier, self).setUp() super(TestBatchNotifier, self).setUp()
self.notifier = batch_notifier.BatchNotifier(0.1, lambda x: x) self._received_events = eventlet.Queue()
self.spawn_n_p = mock.patch('eventlet.spawn_n') self.notifier = batch_notifier.BatchNotifier(2, self._queue_events)
self.spawn_n = self.spawn_n_p.start() self.spawn_n_p = mock.patch.object(eventlet, 'spawn_n')
def _queue_events(self, events):
for event in events:
self._received_events.put(event)
def test_queue_event_no_event(self): def test_queue_event_no_event(self):
spawn_n = self.spawn_n_p.start()
self.notifier.queue_event(None) self.notifier.queue_event(None)
self.assertEqual(0, len(self.notifier.pending_events)) self.assertEqual(0, len(self.notifier._pending_events.queue))
self.assertEqual(0, self.spawn_n.call_count) self.assertEqual(0, spawn_n.call_count)
def test_queue_event_first_event(self): def test_queue_event_first_event(self):
spawn_n = self.spawn_n_p.start()
self.notifier.queue_event(mock.Mock()) self.notifier.queue_event(mock.Mock())
self.assertEqual(1, len(self.notifier.pending_events)) self.assertEqual(1, len(self.notifier._pending_events.queue))
self.assertEqual(1, self.spawn_n.call_count) self.assertEqual(1, spawn_n.call_count)
def test_queue_event_multiple_events(self): def test_queue_event_multiple_events_notify_method(self):
self.spawn_n_p.stop() def _batch_notifier_dequeue():
c_mock = mock.patch.object(self.notifier, 'callback').start() while not self.notifier._pending_events.empty():
events = 6 self.notifier._pending_events.get()
for i in range(0, events):
self.notifier.queue_event(mock.Mock()) c_mock = mock.patch.object(self.notifier, '_notify',
side_effect=_batch_notifier_dequeue).start()
events = 20
for i in range(events):
self.notifier.queue_event('Event %s' % i)
eventlet.sleep(0) # yield to let coro execute eventlet.sleep(0) # yield to let coro execute
while self.notifier.pending_events: utils.wait_until_true(self.notifier._pending_events.empty,
# wait for coroutines to finish timeout=5)
eventlet.sleep(0.1) # Called twice: when the first thread calls "synced_send" and then,
# in the same loop, when self._pending_events is not empty(). All
# self.notifier.queue_event calls are done in just one
# "batch_interval" (2 secs).
self.assertEqual(2, c_mock.call_count) self.assertEqual(2, c_mock.call_count)
self.assertEqual(6, sum(len(c[0][0]) for c in c_mock.call_args_list))
self.assertEqual(0, len(self.notifier.pending_events))
def test_queue_event_call_send_events(self): def test_queue_event_multiple_events_callback_method(self):
with mock.patch.object(self.notifier, events = 20
'callback') as send_events: for i in range(events):
self.spawn_n.side_effect = lambda func: func() self.notifier.queue_event('Event %s' % i)
self.notifier.queue_event(mock.Mock()) eventlet.sleep(0) # yield to let coro execute
while self.notifier.pending_events:
# wait for coroutines to finish utils.wait_until_true(self.notifier._pending_events.empty,
eventlet.sleep(0.1) timeout=5)
self.assertTrue(send_events.called) expected = ['Event %s' % i for i in range(events)]
# Check the events have been handled in the same input order.
self.assertEqual(expected, list(self._received_events.queue))

View File

@ -294,7 +294,7 @@ class TestNovaNotify(base.BaseTestCase):
self.nova_notifier.send_network_change( self.nova_notifier.send_network_change(
'update_floatingip', original_obj, returned_obj) 'update_floatingip', original_obj, returned_obj)
self.assertEqual( self.assertEqual(
2, len(self.nova_notifier.batch_notifier.pending_events)) 2, len(self.nova_notifier.batch_notifier._pending_events.queue))
returned_obj_non = {'floatingip': {'port_id': None}} returned_obj_non = {'floatingip': {'port_id': None}}
event_dis = self.nova_notifier.create_port_changed_event( event_dis = self.nova_notifier.create_port_changed_event(
@ -302,9 +302,10 @@ class TestNovaNotify(base.BaseTestCase):
event_assoc = self.nova_notifier.create_port_changed_event( event_assoc = self.nova_notifier.create_port_changed_event(
'update_floatingip', original_obj, returned_obj) 'update_floatingip', original_obj, returned_obj)
self.assertEqual( self.assertEqual(
self.nova_notifier.batch_notifier.pending_events[0], event_dis) self.nova_notifier.batch_notifier._pending_events.get(), event_dis)
self.assertEqual( self.assertEqual(
self.nova_notifier.batch_notifier.pending_events[1], event_assoc) self.nova_notifier.batch_notifier._pending_events.get(),
event_assoc)
def test_delete_port_notify(self): def test_delete_port_notify(self):
device_id = '32102d7b-1cf4-404d-b50a-97aae1f55f87' device_id = '32102d7b-1cf4-404d-b50a-97aae1f55f87'
@ -365,6 +366,7 @@ class TestNovaNotify(base.BaseTestCase):
self.nova_notifier.notify_port_active_direct(port) self.nova_notifier.notify_port_active_direct(port)
self.assertEqual( self.assertEqual(
1, len(self.nova_notifier.batch_notifier.pending_events)) 1, len(self.nova_notifier.batch_notifier._pending_events.queue))
self.assertEqual(expected_event, self.assertEqual(
self.nova_notifier.batch_notifier.pending_events[0]) expected_event,
self.nova_notifier.batch_notifier._pending_events.get())